Kafka のパーティションとの付き合い方

Platform Team/Core Unit の村上です。

Repro は分散メッセージングシステムである Kafka を用いたイベント駆動アーキテクチャで構築されています。
私が所属している Core Unit は、Repro のコアとなる基盤を支えているチームであり、Kafka のストリームアプリケーションを日常的に扱っています。

私は Repro に入ってから Kafka のストリームアプリケーションを触るようになりましたが、一般的な Web API や Worker アプリケーションの実装とは異なるマインドセットが求められると感じます。
その中で特に重要だと考えているのが Kafka のパーティションに関する理解です。

Kafka のパーティションは分散処理を扱う上で重要な概念であり、ここの理解が浅いまま実装を進めていくと、後になってトラブルに見舞われることがあります。私自身 Kafka に入門して日が浅いころに筋が悪い実装をしていました。

本記事では、そんなパーティションの扱いについて、ストリームアプリケーションの実装時に押さえておきたいポイントを書きました。

Kafka のパーティションとは

Kafka は、メッセージを Topic1 という単位で分類します。
例えば、ECサイトにおける注文イベントを orders という Topic に、支払いイベントを payments という Topic にそれぞれ配信する、といった具合にメッセージの種別毎に対象 Topic を分けて管理できます。

パーティションは、その Topic のサブセットで、Topic 内のメッセージの分割単位です。1 つの Topic は 1 つ以上のパーティションを持つことができます。

%%{init: {"theme": "dark"}}%%
graph LR
    PA([Producer A])
    PB([Producer B])

    %% Broker → Topic A → Partitions
    subgraph Broker["Broker"]
        subgraph TopicA["Topic A"]
            PART0["Partition 0"]
            PART1["Partition 1"]
            PART2["Partition 2"]
        end
    end
    style Broker stroke:#999,stroke-dasharray: 4 2
    
    subgraph CG2["Consumer Group B"]
      C2([Consumer C])
    end
    style CG2 stroke:#999,stroke-dasharray: 4 2

    subgraph CG1["Consumer Group A"]
        C0([Consumer A])
        C1([Consumer B])
    end
    style CG1 stroke:#999,stroke-dasharray: 4 2

    %% Producers -> Partitions
    PA -- "message_0" --> PART0
    PA -- "message_1" --> PART1
    PB -- "message_2" --> PART2

    PART0 --> C0
    PART1 --> C0
    PART2 --> C1

    PART0 --> C2
    PART1 --> C2
    PART2 --> C2

    %% Style tweaks: producer/consumer colors
    style PA fill:#eef6ff,stroke:#3399ff,color:#000
    style PB fill:#eef6ff,stroke:#3399ff,color:#000
    style C0 fill:#f6f6e9,stroke:#999933,color:#000
    style C1 fill:#f6f6e9,stroke:#999933,color:#000
    style C2 fill:#e9f6f2,stroke:#339966,color:#000

上図が Kafka の基本的な処理フローを簡略化して表したものです。用語説明は以下。

  • Producer: メッセージを送るアプリケーションで、指定した Topic にメッセージを送信する
  • Consumer: メッセージを受け取るアプリケーションで、指定した Topic のメッセージを受信する
  • Consumer Group: 複数の Consumer をグループ化したもので、同じ Consumer Group に属する Consumer は、Topic のパーティションを分担して処理する
  • Broker: Producer からメッセージを受け取って Consumer に配信するメッセージの仲介者。Topic のメッセージは Broker で保持する

どのパーティションにメッセージを格納するかを決定するのは、Producer の責務です。
図の例では、Topic に Partition 0 ~ Partition 2 の 3 つがあり、Producer はこの中からパーティションを選択して送信しています2

Kafka では、Consumer Group 単位でパーティションが割り当てられます。
図のように Consumer Group A には Consumer が 2 つ存在するため、3 つのパーティションのうち 2 つは片方の Consumer が、残り 1 つはもう片方の Consumer が担当します。
一方、Consumer Group B は Consumer が 1 つだけなので、すべてのパーティションを 1 つの Consumer が担当します。

1 つのパーティションは Consumer Group 内の 1 つの Consumer にのみ割り当てられます。 そのため、Consumer の数はパーティション数以下になります。パーティション数を超えた Consumer を立ち上げても割り当てられず何もしない Consumer が生まれます。

つまり、パーティションとは並列処理の単位であり、パーティションの数が多いほど、同時に処理できる Consumer の数を増やすことができます。

パーティションがもたらす恩恵

あるパーティションを処理するのはただ 1 つの Consumerであり、基本的に、パーティション単位では Producer が送信した順序3で直列に処理されます。
この特徴がパーティションにおける強力さであると考えます。

システム開発において、1 プロセスのみで直列に 1 件ずつ処理を行う前提であれば、複数ノードを跨ぐ整合性の維持やレースコンディションの回避のための複雑性が抑えられシンプルに実装できますが、一方でスケーラビリティに欠けるため実際に採用するのは難しいケースも多いでしょう。
そこに対し、Kafka はパーティションによる分割で処理の並列性をもたせつつ、 1 Partition 1 Consumer のみの割り当てによって、パーティションに閉じた範囲であれば直列処理を前提とした実装が可能です。
例えば、ユーザー ID でルーティングして「あるユーザーに関するメッセージは常に同じパーティションに送る」ようにすれば、あるユーザーのメッセージをシーケンシャルに処理できます。
そうすれば、ユーザー単位で排他制御をかけたい場面において RDB のロック機構を使わなくても済むかもしれませんし、ユーザー単位のキャッシュを扱う場面では、ローカルインメモリのキャッシュでも一貫性をもたせることができます。

順序通りに処理する特性でいうと、更新イベントを Kafka に送り、そのイベントから永続化されたリソースをインクリメンタルに更新する用途や不正検知目的でアクティビティの特定パターンを抽出する用途など、処理する順番が意味を持つケースに使えるかと思います。

また、あるユーザーのイベント発火をカウントするなど、ストリーム処理で何らかの集計処理を行う場合、関連性のあるデータをパーティション単位で集まるようにしておけば、全部のデータから読み込んで集計する必要はなくなります。 Kafka のクライアントライブラリである Kafka Streams では、State Store という機能で、そのような局所性のあるデータをストリーム処理の状態として保持できます。

関連性のあるデータを見極めてグルーピングすることは、パーティションの設計に限らず、分散処理において重要な考え方になります。

運用中に直面するかもしれないパーティションの問題

ストリームアプリケーションを運用する中で直面する可能性がある問題について取り上げます。
新しく Topic を作成する際に検討しておくと未来のトラブルが避けられるかもしれません。

スループットを上げるためにパーティション数を増やしたい

スループットを上げるためには、パーティションを増やして並列度を上げるのが手っ取り早いですが、パーティション数を増やすのはそれなりに大変な場合があります。

メッセージのフィルタリングや別のコンポーネントに HTTP リクエストを送るなど、これまでに処理したメッセージ情報を使わない、ステートレスなアプリケーションで使っている Topic であれば比較的簡単に増やせます。
一方で、過去のイベントの集計結果をパーティション単位で保持しているケースなど、状態を持つステートフルなアプリケーションで使われている Topic の場合は一筋縄ではいかないことがあります。

Topic のパーティションを変更するということは、同じメッセージでもこれまでとは異なるパーティションに送られるということなので、ステートフルなアプリケーションで使われている Topic のパーティションを変えてしまうと、パーティション単位で保持しているデータの整合性が崩れてしまいます。
例えば、元々ユーザーA のメッセージは Partition 1 へ送られてデータ集計されていたが、パーティション変更したあとにユーザーAのメッセージは Partition 2 へ送られるようになってしまい、これまでの集計結果が使えなくなる。といったことが起きます。
また、複数 Topic で結合し、関連があるデータ同士を紐づけてエンリッチしているケースでは、複数 Topic のパーティション数を一気に変える必要性もでてきます。

後からパーティション数を増やすのは手間がかかるので、最初からアプリケーションの目標スループットを定め、余裕を持たせたパーティション数に設定しておくのが理想ではあります。

パーティションを増やさずにスループットを向上させたい場合、複数のワーカースレッドを立ち上げ、メッセージの処理を別スレッドにオフロードして並列実行する手段もとれます。直列に実行されなくなるので、順序保証を求めていない場合に使える手段です。
ただし、この方法を採用する場合、自動コミットの設定だとメッセージの処理が完了していないのにも関わらず完了したと見なされてしまう点に注意してください。
Kafka のメッセージは、パーティションごとに順番で並んでいて、あるメッセージがどの位置にあるかは「オフセット」という番号で管理されています。 Consumer は、どの位置までメッセージを処理したかを記録するために、取り出したメッセージの最新オフセットの次のオフセットを Broker にコミットします。これを「オフセットコミット」と呼び、定期的に実行しています4
メッセージの処理を非同期に行うことで、処理完了を待たずして次のメッセージを取ってくるため、処理途中のメッセージのオフセットも処理済みとしてコミットされてしまいます。

下図において、offset 2, 3 の処理が失敗していた場合は、Kafka の仕組み上では再実行されずメッセージがロストします。メッセージのロストを許容できない場合は、自動コミットではなく手動コミットにしてアプリケーション側でオフセットを管理する必要がありますが、複雑性は増します。

%%{init: {"theme": "dark"}}%%
sequenceDiagram
    participant ConsumerThread
    participant WorkerThread
    participant Broker

    Broker->>ConsumerThread: Fetch offset 1, 2, 3
    ConsumerThread-->>WorkerThread: Process offset 1
    ConsumerThread-->>WorkerThread: Process offset 2
    ConsumerThread-->>WorkerThread: Process offset 3

    WorkerThread-->>ConsumerThread: offset 1 done

    ConsumerThread->>Broker: Commit offset 4
    Note over ConsumerThread,Broker: offset 2, 3 は未完了のままコミット

    WorkerThread-->>ConsumerThread: offset 2 done
    WorkerThread-->>ConsumerThread: offset 3 done

パーティション数を変更しない他のやり方として、Apache Kafka 4.1 以降であれば、KIP-932 Queues for Kafka で導入された Share Groups の利用を試してみるとよいかもしれません。
Share Groups を使うことでパーティション数を超えた Consumer でメッセージを処理することができるようです。

意図しないパーティションにメッセージを送られていた

新たに Producer アプリケーションを実装したときに、パーティションの考慮が漏れていて意図していないパーティションに送られていた、というケースを考えます。
パーティションを決定するのは Producer なので、社内の Kafka の利用範囲が拡大するとこのようなことが起こり得ます。
ステートフルなアプリケーションにおいては、これまでに蓄積されたデータもあるため、誤ったパーティションに送っていたことを検知できたとしても、正しいパーティションにすんなり移し替えられるとは限りません。

そして、厄介なことに、誤ったパーティションに送られたとしても例外がスローされるわけではないため検知が難しく、時限爆弾的に一定期間経過してから問題が顕在化することもあります。

これの予防策としては、例えば、次のようなアクションが考えられます。

Repro は、後者の Consumer 側での検証に関して、ConsumerInterceptor を使って実現しています。

特定のパーティションにメッセージが偏った

特定パーティションにメッセージが集中して、そのパーティションの処理が大きく遅延する事象です。
送信先パーティションの決定をラウンドロビンやランダムに決めていたら問題になることはなさそうですが、ユーザーID や注文 ID など特定の情報を元にパーティションを決定している場合は起こる可能性があります。

こちらに関しては、まず一部のパーティションの処理が遅延していることを検知できること、そして原因となっているメッセージの特徴を把握することが重要です。
処理遅延の監視は、メッセージのタイムスタンプと処理完了した時刻のレイテンシを見るのが、ユーザー体験に近い指標であるのでよいと考えています。Kafka Streams を利用しているのであれば record-e2e-latency が使えます。

原因となるメッセージは、偏りが発生した時間帯の Topic メッセージから、パーティションを決定するときに使った値を抽出し、それらの発生数を集計することで特定できるかもしれません。
このとき、kcat を使ってコマンドライン上で集計するのもよいですが、Kafka のメッセージをデータウェアハウスに蓄積しておいて SQL で集計可能にするのが楽ではあります。

偏ったときのアプローチは状況によります。
パーティションの偏りの原因が、大量の不正なデータの投入であれば、特定パターンのデータをリジェクトする仕組みを導入すればよくて、通常のワークロードで起こり得るものであればそもそもパーティションのルーティングを変えたほうがよいかもしれません。

まとめ

今回は、Kafka のパーティションをテーマに、ストリームアプリケーションの実装ポイントや運用上の注意点を記載しました。

パーティションやメッセージのルーティングは、後から変えたいと思っても簡単に変更できないこともあり、初期実装の考慮漏れや設計の不備が、後々大きな修正コストとして跳ね返ってくることもあります。
だからこそ、今回取り上げたパーティションはもちろん、Kafka をはじめとした Repro で使っているミドルウェアへの理解を深めて先を見据えた設計を意識することで、未来の開発リソースをプロダクトの価値向上に集中できるようにしていきたいです。


  1. Pub/Sub メッセージングモデルの概念に Topic がありますが、そこから取り入れているのでしょうか。
  2. 基本的には Kafka の Client ライブラリが、メッセージの key のハッシュ値を計算して、同一 key は同じパーティションへ送るようにしています。明示的なパーティションを指定も可能です。
  3. Producer での送信の順序保証がされるかどうかは設定値によります。ref. https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#retries
  4. オフセットコミットは同期的に実行される負荷が高いオペレーションなので、一定間隔で行われます。