Upgrade to Pro — share decks privately, control downloads, hide ads and more …

OCI Streaming 技術詳細

OCI Streaming 技術詳細

Oracle Infrastructure(OCI)のサービスの技術資料です。

oracle4engineer
PRO

July 19, 2023
Tweet

More Decks by oracle4engineer

Other Decks in Technology

Transcript

  1. Oracle Cloud Infrastructure
    Streaming Service 技術詳細
    日本オラクル株式会社
    Jul. 2023
    1

    View Slide

  2. Streamingを利用するにあたって理解しておくべき概念とその詳細
    • Stream : Partitionで区切られた、Messageの追加のみ可能なデータストア
    • Stream Pool : Streamをグループ化したもの
    • Kafka Connect利用時にはこのStream Poolの設定値が必要(コンソールから確認可能)
    • Message : Base64エンコードされたデータ(KeyとValueで構成)
    • 1MBを超えるサイズのMessageは利用できない
    • Key : Messageをグルーピングするための識別子、同じKeyを持つMessageは同じPartitionに送られる
    Streamingの概念(Stream, Message, Key, Partition)
    2
    0 1 2 3 4 5
    Partition0
    0 1 2 3 4
    Partition1
    0 1 2 3 4 5 6
    Partition2
    Stream
    Messageの追加
    (KeyによってPartitionに分配)
    Stream Pool
    Copyright © 2023, Oracle and/or its affiliates

    View Slide

  3. Streamingを利用するにあたって理解しておくべき概念とその詳細
    • Offset : Partition内でのMessageの識別子
    • Offsetを保存しておくことで、そのOffsetから読み出しを再開できる
    • 0, 1, 2, 4, 5, 7 のように、必ずしも密にはならないため、「次のOffsetを算出して読み出しを再開」という処理は書けない
    • Cursor : SDKを利用する際にStreamからMassageを読み出す際のポインタ
    • Cursorは作成後5分経つと失効する
    • 失効するため、Cursorを保存しておいて障害からの回復に使うのは危険
    • Partition : Streamの中の区切りであり、これにより並列なMessageの読み出しを実現できる
    • Partitionの名称は自動的に0,1,2,3,4,...と割り当てられる
    • Partitionの数量はStreamの作成時に指定(後から追加/削除はできない)
    Streamingの概念(Offset, Cursor)
    3
    0 1 2 4 5 7
    Partition
    Stream
    Offset
    Cursor
    Copyright © 2023, Oracle and/or its affiliates

    View Slide

  4. Producer : Messageを書き込む(Publish)役割
    • 同じKeyのMessageは、同一のPartitionに配信される
    • Producerが明示的にどのPartitionに書き込むかを指定することはできない
    • Keyを指定することで、Partition内での配信順序を保持できる(first-in/first-out)
    • Keyを指定しない(null)場合は、複数のPartitionに自動的に分散される
    Consumer : Messageを読み出す(Subscribe)役割
    • Messageが作成された順に、Messageを読み出す
    • 複数のPartitionから読み出す場合は、Messageの順序は保証されない
    • どこまでMessageを読み出したかはOffsetを用いて判断
    ProducerとConsumer – Streamingの周辺コンポーネント
    4
    0 1 2
    0 1
    0
    Stream
    Producer 1
    PutMessage API GetMessage API
    Producer 2
    Producer 3
    Consumer 1
    Consumer 2
    Consumer 3
    Partition0
    Partition1
    Partition2
    Key0
    Value
    Key2
    Value
    Key1
    Value
    Key2
    Value
    Key0
    Value
    Copyright © 2023, Oracle and/or its affiliates

    View Slide

  5. Streamの作成後、APIを利用しメッセージの書き込み(Produce)可能に
    • Streamの作成はOCIコンソールやAPIで行うことができる
    Messageの読み出し(Consume)のためには、Cursorを利用する
    • OffsetのポインターとなるCursorを、Partitionを指定して作成
    • 作成したCursorを利用して、PartitionからMessageを読み出し
    • Cursorは都度生成せず、Message取得時に取得できるNextCursorを利用する
    その他のProduce/Consume方法
    • Kafka互換性APIの利用
    • Kafka Connect構成を利用
    • Kafka Connectと接続し、Object StorageやAutonomous DBとの接続が可能
    MessageのProduceとConsume
    5 Copyright © 2023, Oracle and/or its affiliates

    View Slide

  6. • Cursorは5つの種類があり、作成時に対応するパラメータとともに指定
    • TRIM_HORIZON: 全てのMessageを取得、パラメータ無し
    • AT_OFFSET: 特定のOffset以上のMessageを取得、パラメータはOffset
    • AFTER_OFFSET: 特定のOffsetより大きなMessageを取得、パラメータはOffset
    • AT_TIME: 指定した時間以降のMessageを取得、パラメータはTime
    • LATEST: Cursorの初回生成後に追加されたMessageを取得
    • Cursor作成時の種類は、どのようにMessageを取得したいかで決める
    Cursorの詳細
    Copyright © 2023, Oracle and/or its affiliates
    6
    Offset - 0
    Timestamp – 00:00
    Offset - 1
    Timestamp – 00:01
    Offset - 2
    Timestamp – 00:02
    Offset - 3
    Timestamp – 00:03
    TRIM_HORIZON
    AT_OFFSET(1)
    AFTER_OFFSET(
    1)
    AT_TIME(00:0
    3)
    ※Cursor
    生成
    Offset – 4
    Timestamp – 00:04
    LATEST

    View Slide

  7. • 複数のConsumerでグループを構成
    • GroupNameがGroupの識別子となり、Consumerの識別子はInstanceName
    • Group内のConsumerは、それぞれ単一のPartitionから読み出す
    • 同じGroup内の複数のConsumerが、同一のPartitionからMessageを読み出すことはできない
    • この仕組みが、並列かつPartitionの重複がないデータの読み出しを可能にする
    Consumer Groupの概要
    7
    Consumer 1
    Consumer 2
    Consumer 3
    Grp. A
    0 1 2
    0 1
    0
    Stream
    Partition0
    Partition1
    Partition2 Consumer 4
    Consumer 5
    Consumer 6
    Grp. B
    Copyright © 2023, Oracle and/or its affiliates

    View Slide

  8. Consumer Groupを使う際には、GroupCursorを利用する
    • GroupNameとInstanceNameを指定し、GroupCursorを取得する
    Consumer Groupの使い方
    8
    Consumer 1
    Consumer 2
    Grp. A
    0 1 2
    Stream
    0
    1 0 1
    .groupName(“GroupA”)
    .instanceName(“Consumer1”)
    .groupName(“GroupA”)
    .instanceName(“Consumer2”)
    取得したGroupCursorをそれぞれ利用
    GroupCursorの種類は
    ・TRIM_HORIZON
    ・AT_TIME
    ・LATEST
    に限定される
    Copyright © 2023, Oracle and/or its affiliates

    View Slide

  9. • Consumer Group内のアサイン(どのPartitionから読み出すか)は自動的に決まる
    • Consumer Groupにはリバランスという仕組みがある
    • インスタンスが追加された場合には、Consumerは読み出しの負荷を分散する
    • インスタンスがエラーなどで削除された場合、残ったConsumerで全てのPartitionから読み出すように動作
    Consumer Groupの仕組み
    9
    Consumer 1
    Consumer 2
    0 1 2
    Stream
    0
    1
    2
    0
    0 1
    Consumer3,
    Consumer4
    の順番に追加
    リバランスされる
    追加後も待機状態に
    Consumer 1
    Consumer 2
    3 5
    Stream
    0
    1
    2
    1 2
    2 3
    Consumer 3
    Consumer 4
    ※図はTRIM_HORIZONの場合の動作
    3 5 6
    Stream
    0
    1
    2
    1 2 3
    4 6
    Consumer 3
    Consumer 4
    Consumer 1
    Consumer 2
    3 5
    Stream
    0
    1
    2
    1 2
    2 3
    Consumer 3
    Consumer 4
    Consumer1,
    Consumer2
    を停止
    停止したインスタンスが一度
    読み出したOffsetをもう一
    度読み出し、 リバランスさ
    れる
    (詳しくは次スライド)
    Copyright © 2023, Oracle and/or its affiliates

    View Slide

  10. • Consumer Groupには、Messageを欠落させないための
    仕組みであるCommitという概念がある
    • Commitを自動で行うCommitOnGet: Message取得時
    に、最後に取得したMessageのOffsetをCommitする
    • Consumer 1が0-99のMessageを取得し、100-199の
    Message取得時に落ちた場合、99がCommitされる
    • 結果的に引き継ぐConsumer 2は、100からMessageを取得す
    るため、データロスしないことになる
    • CommitのタイミングはgetMessages API実行時
    • 自動CommitをOffにすることも可能
    Consumer GroupにおけるCommitの概念
    Copyright © 2023, Oracle and/or its affiliates
    10
    Consumer 1
    Consumer 2
    0 … 99
    Stream
    0
    301 … 400
    1
    0-99のMessageを取得後、 100-
    199のMessageを取得しに行く
    Consumer 1
    Consumer 2
    0 … 99 100 … 199
    Stream
    0
    1 301 … 400 401 … 500
    停止
    Partition0の読み出しを
    Consumer2が引き継ぐ
    Consumer 1
    Consumer 2
    0 … 99 100 … 199 200 … 299
    Stream
    0
    301 … 400 401 … 500 501 … 600
    1
    返却されたNextCursorを利用して引き続
    き501以降のMessageを読み出し
    99がCommitされているので、100以降の読み出
    しから引き継げる(データロスを防げる)

    View Slide

  11. Consumerがエラーなどで停止した際、30秒経つとそのInstanceはConsumer Groupから除外される
    • プログラムの都合上、Consumerが30秒以上Messageを取得しない可能性がある場合には、
    Heartbeat APIを利用してConsumer Groupから除外されないようにする
    • 一度停止したConsumerを再開したとしても、自動リバランスにより、停止する前と同様のPartitionから読み出すと
    は限らない
    Consumer GroupにおけるConsumerのライフサイクル
    Copyright © 2023, Oracle and/or its affiliates
    11
    Consumer 1
    Consumer 2
    0 … 100
    Stream
    0
    301 … 400
    1
    停止
    30秒後
    Consumer 3
    Consumer 4
    Heartbeat APIの実行
    Consumer 2
    0 … 100
    Stream
    0
    301 … 400
    1
    Consumer 3
    Consumer 4
    Heartbeat APIの実行
    Groupから除外されない
    Consumer1は
    Groupから除外され、
    再度Groupに入ったと
    してもPartition0から
    読み出すわけではない

    View Slide

  12. View Slide