Kafka Brokerのcompaction動作の詳細とチューニング方法について

Reproでチーフアーキテクトとして働いているid:joker1007です。 今回、Kafka Brokerのcompaction動作について調査しチューニングすることでパフォーマンス改善の成果が得られたため、そのノウハウをブログにまとめておきました。 かなりマニアックな内容なので、需要は多くないと思いますが、私が調査した限りでは日本語で同じ様な内容のブログ記事はほとんど存在しなかったため、Kafkaを自前で運用している人にとっては役に立つ内容かもしれません。

compactionとは

(参考: https://kafka.apache.org/documentation/#compaction)

Kafkaの基本的なデータ削除ポリシーは一定時間が経過したら過去のデータをそのまま削除するdeleteというポリシーを使う。 これは、log.retention.hoursという設定でコントロールされておりデフォルトは7日である。

一方で、データベースのchange data capture (cdc)や、イベントソーシングや、ジャーナリング (Kafka StreamsでいうStateStoreのchangelog)などの用途で利用する場合に適したcompactというポリシーがある。

compactポリシーでは、レコードのキーが同一のものが複数存在したら、過去のキーが重複しているデータを削除して最新のもの一つにまとめる動作を行う。これによって余計なデータを削除するポリシーである。

123 => bill@microsoft.com
        .
        .
        .
123 => bill@gatesfoundation.org
        .
        .
        .
123 => bill@gmail.com

こんな風になっていたら、compaction実行後にはbill@gmail.comのデータだけが残る。

動作イメージ図

(source https://kafka.apache.org/32/images/log_compaction.png)

compactionの動作詳細

Compactionの実態はLogCleanerというクラスが管理していて、その中でCleanerThreadというスレッドが1つ以上動作しており、そのCleanerThreadが実際のcompaction処理を実施する。

ソースコードに記載されているdocには以下の様に書かれている。 https://github.com/apache/kafka/blob/b61edf2037ed6fb6bfed319e9f435f83354611f7/core/src/main/scala/kafka/log/LogCleaner.scala#L41-L86

The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy. A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O O'.

Each log can be thought of being split into two sections of segments: a "clean" section which has previously been cleaned followed by a "dirty" section that has not yet been cleaned. The dirty section is further divided into the "cleanable" section followed by an "uncleanable" section. The uncleanable section is excluded from cleaning. The active log segment is always uncleanable. If there is a compaction lag time set, segments whose largest message timestamp is within the compaction lag time of the cleaning operation are also uncleanable.

The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "compact" retention policy and cleans that. The dirtiness of the log is guessed by taking the ratio of bytes in the dirty section of the log to the total bytes in the log.

To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. See kafka.log.OffsetMap for details of the implementation of the mapping.

Once the key=>last_offset map is built, the log is cleaned by recopying each log segment but omitting any key that appears in the offset map with a higher offset than what is found in the segment (i.e. messages with a key that appears in the dirty section of the log).

To avoid segments shrinking to very small sizes with repeated cleanings we implement a rule by which if we will merge successive segments when doing a cleaning if their log and index size are less than the maximum log and index size prior to the clean beginning.

Cleaned segments are swapped into the log as they become available.
One nuance that the cleaner must handle is log truncation. If a log is truncated while it is being cleaned the cleaning of that log is aborted.

Messages with null payload are treated as deletes for the purpose of log compaction. This means that they receive special treatment by the cleaner. The cleaner will only retain delete records for a period of time to avoid accumulating space indefinitely. This period of time is configurable on a per-topic basis and is measured from the time the segment enters the clean portion of the log (at which point any prior message with that key has been removed). Delete markers in the clean section of the log that are older than this time will not be retained when log segments are being recopied as part of cleaning.

Note that cleaning is more complicated with the idempotent/transactional producer capabilities. The following are the key points:
1. In order to maintain sequence number continuity for active producers, we always retain the last batch from each producerId, even if all the records from the batch have been removed. The batch will be removed once the producer either writes a new batch or is expired due to inactivity. 2. We do not clean beyond the last stable offset. This ensures that all records observed by the cleaner have been decided (i.e. committed or aborted). In particular, this allows us to use the transaction index to collect the aborted transactions ahead of time. 3. Records from aborted transactions are removed by the cleaner immediately without regard to record keys. 4. Transaction markers are retained until all record batches from the same transaction have been removed and a sufficient amount of time has passed to reasonably ensure that an active consumer wouldn't consume any data from the transaction prior to reaching the offset of the marker. This follows the same logic used for tombstone deletion.

ざっくり要約すると以下のことを行う。

  • Cleanerは、トピックのログデータをCleanセクションとDirtyセクションに分ける。そしてDirtyセクションをCleanableセクションとUncleanableセクションに分ける。Uncleanableなのは、今正に書いている最中のセグメントであるか、セグメントのタイムスタンプがmin.compaction.lag.ms以下のセグメントであるかである。
  • CleanerThreadは、Cleanableなトピックパーティションの中で、最もDirty Ratioが高いものを選択してcompactionを実行する。Dirty Ratioはパーティション全体のログのサイズとDirtyセクションの割合である。
  • compactionは以下の流れで行われる
    1. Dirtyセクションのkey => last_offsetのマッピングを構築する。これをOffsetMapと呼ぶ。OffsetMapはデフォルトではキーをMD5にした値とoffsetをペアにした値でそれが1エントリになる。つまり1エントリは16byte + 8 byteで24byteになる。
    2. CleanerThreadはパーティションのログを最小のオフセット値から順番にスキャンしていき、キーがOffsetMapと一致するレコードを省略して新しいセグメントファイルを構築する。
    3. 一定範囲のセグメント再構築が終わる度にセグメントファイルをswapして、順次置き換えながらcompactionを進める。
    4. OffsetMapを構築した地点までcompactionが進んだらそのイテレーションを完了とする。
  • ペイロードがNULLになっているレコードは削除対象として扱う。tombstoneレコードとして一定期間保持するが、その後のcompactionではレコードエントリはOffsetMapに無かったとしても削除される。

更に詳しく見ていこう。

CleanセクションとDirtyセクションの分け方

LogCleanerは前回までにCompactionした範囲をcleaner-offset-checkpointというファイルに記録している。このファイルは各log.dirに存在し、トピック名、パーティション、オフセットが記録されている。 LogCleanerは起動時にこのファイルを読んで、Cleanerがどこまで進んでいるかを復元し、それ以前と以後でCleanセクションとDirtyセクションを分けている。

OffsetMapのサイズ

OffsetMapはヒープメモリ上にアロケートされるため、サイズに上限がある。そのサイズはlog.cleaner.dedupe.buffer.sizeというコンフィグの値を、log.cleaner.threadsの値で割ったサイズで、1スレッド辺りの上限は2GBである。

つまり、一度のcompactionのイテレーションにおいては、このOffsetMapのサイズ上限に含むことができるキーの数しか対象にできないということになる。

compactionは1回実施する度に頭からログをスキャンしなおすので、ファイルサイズの大きなトピックパーティションの場合は1回のイテレーションにかなりの時間とI/O負荷をかけることになる。そのため、もしOffsetMapのサイズが十分でないと何度も余計なイテレーションを行うことになり、非常に非効率になる可能性がある。

ログによる動作の確認

KafkaはLogCleanerの動作についてデフォルトで専用ログを吐く様になっていると思う。(ディストリビューションやインストール方法に依るかもしれない)

例えば、confluent-kafkaだと/var/log/kafka/log-cleaner.logにログが出力される。 そのログを見ることで現在どの様に動作しているかを概ね把握することができる。 重要なログの例を以下に示す。

以下はLogCleanerのstatsのログで、compaction1回分のサマリが記録されている。

[2022-06-14 11:55:41,232] INFO [kafka-log-cleaner-thread-3]:
        Log cleaner thread 3 cleaned log <topic name> (dirty section = [17556518476, 17584106050])
        45,218.7 MB of log processed in 1,478.2 seconds (30.6 MB/sec).
        Indexed 7,352.6 MB in 204.9 seconds (35.9 Mb/sec, 13.9% of total time)
        Buffer utilization: 90.0%
        Cleaned 45,218.7 MB in 1273.4 seconds (35.5 Mb/sec, 86.1% of total time)
        Start size: 45,218.7 MB (161,199,718 messages)
        End size: 40,467.3 MB (142,787,217 messages)
        10.5% size reduction (11.4% fewer messages)
 (kafka.log.LogCleaner)

このdirty sectionで示されている部分が、今回のcompactionでcompaction対象となった範囲のoffsetを指している。

45,218.7 MB of log processed in 1,478.2 seconds (30.6 MB/sec). が今回のcompaction全体でかかった時間と対象になったファイルのサイズを示す。

Indexed 7,352.6 MB in 204.9 seconds (35.9 Mb/sec, 13.9% of total time) は、OffsetMapを作るためにかかった時間と読んだデータ量を示す。

Buffer utilization: 90.0% は dedupe.buffer.sizeのどれぐらいを利用したかを示している。この値はlog.cleaner.io.buffer.load.factorで何割まで使うかをコントロールできる。デフォルトは0.9(90%)に設定されている。

        Cleaned 45,218.7 MB in 1273.4 seconds (35.5 Mb/sec, 86.1% of total time)
        Start size: 45,218.7 MB (161,199,718 messages)
        End size: 40,467.3 MB (142,787,217 messages)
        10.5% size reduction (11.4% fewer messages)

上記は、実際の削除処理でどれぐらい時間がかかってどれぐらい読み込んだか、compaction前後でどれぐらいファイルサイズとレコード数が変化したかのサマリを示している。

このdirty sectionがパーティション全体のオフセットの先頭と余りに乖離していたり、流入量に比べて1回のcompactionで処理できているメッセージ量が余りに少なかったりすると、compactionが追い付いていない可能性がある。 上記の例の様に1パーティションが数十GBにもなると、実行時間もそれなりにかかるのでそれなりにチューニングが必要な処理ということが分かる。

下記に示す別のログを確認することで、今LogCleanerがどれぐらいの時期のOffsetに対して処理を行っているかをざっくり把握することができる。

Cleaning log <topic> (cleaning prior to Mon Jun 13 12:04:42 UTC 2022, discarding tombstones prior to Sat Jun 11 04:36:54 UTC 2022)... (kafka.log.LogCleaner)

このログは一つ目のtimestampがClean不可能な領域の中で最も古いoffsetを含むセグメントファイルのタイムスタンプを指している。そして二つ目のタイムスタンプは現在のイテレーションでtombstoneレコードを削除可能であるタイムスタンプを指している。

つまり、この例では6/11の4:36:54以前のtombstoneレコードは削除可能であるということになる。この日時がどうやって決まるかというと、現在のcompaction対象になっている範囲の最初のオフセットを含むセグメントファイルのtimestampとdelete.rentetion.ms(デフォルトでは1日)という設定項目によって決まる。

ソースコード上では下記の様になっている。 https://github.com/apache/kafka/blob/b61edf2037ed6fb6bfed319e9f435f83354611f7/core/src/main/scala/kafka/log/LogCleaner.scala#L504-L514

  private[log] def doClean(cleanable: LogToClean, currentTime: Long): (Long, CleanerStats) = {
    info("Beginning cleaning of log %s".format(cleanable.log.name))


    // figure out the timestamp below which it is safe to remove delete tombstones
    // this position is defined to be a configurable time beneath the last modified time of the last clean segment
    // this timestamp is only used on the older message formats older than MAGIC_VALUE_V2
    val legacyDeleteHorizonMs =
      cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
        case None => 0L
        case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
      }

つまり、上記の例だと、今削除対象になっているセグメントファイルの中で最も古いもののタイムスタンプは6/12の4:36:54になる。1つのセグメントファイルにどれぐらいのペースでデータが書き込まれるかに依るが、上記のログを見る限りでは、極端にcompactionが遅れている訳ではないことが分かる。

重要な設定項目のまとめ

ここまでで説明した様に、Kafkaのcompactionはデータ量が増えるとかなり重い処理となる可能性があり、ある程度の状況のモニタリングやチューニングが必要になる。

チューニングにおいて重要な設定項目とそれが示す意味の詳細を下記にまとめておく。

  • log.cleaner.threads: compactionを行うスレッドの数を設定する。1スレッドが同時にcompactionできるのは一つのトピックパーティションだけなので、扱うトピックやパーティションが多い程多くのスレッドが必要になる。ただしスレッド多くすればする程、CPUとディスクに負荷がかかる。
  • log.cleaner.dedupe.buffer.size: logの重複排除に使うバッファサイズで全スレッドにまたがった設定値。言い換えるとOffsetMapを保持できるメモリサイズの設定値である。各スレッドの個別のバッファサイズは単純にこの値をlog.cleaner.threadsの値で割った値になる。1GBで4スレッドはら256MBになる。256MBのOffsetMapで保持できるキーの数はおよそ1100万件なので、一度のcompactionで処理できる量の上限はそこで決まってしまう。流入量やパーティション全体のサイズが大きい場合は、この値を大きくしておかないとcompactionの1イテレーションで処理できる数が不足し、処理が遅れる可能性がある。
  • log.cleaner.delete.retention.ms: compactionを行った後、tombestoneレコードをどれぐらい保持しておくかの設定値。ログを読み解く時の情報としては便利だが、チューニングにおいてはそこまで重要ではない様に思う。但し、この設定値を余り短くすると、処理が遅いconsumerがそのレコードを読む前にnullレコード(tombstone)が削除されてしまう可能性がある。もし、そのconsumerがnullレコードを読んで別のリソースの削除をするなどの処理に利用している場合は、開発者の想定外の挙動を生む可能性があることに注意する必要がある。
  • log.cleaner.io.buffer.load.factor: log.cleaner.dedupe.buffer.sizeがどれぐらい埋まったらフルになったと見做すかの設定値。デフォルトでは90%埋まった時点でフルになったと見做す。どうも実際の設定内容と名前が合ってない気がする。
  • log.cleaner.io.buffer.size: ログをスキャンする時に1度に読み込んで保持しておくメモリ領域のサイズ。これは初期値であってメッセージのサイズが大きければ必要に応じて拡張される。これも全スレッドで共通で、各スレッドのサイズはlog.cleaner.threadsで割った値になる。
  • log.cleaner.min.cleanable.ratio: 許容できるCleanable Ratio(Dirty ratio)の最小値を設定する。この値よりCleanable Ratioが大きくなるとcompactionが必要と見なす。デフォルトでは0.5なので、半分以上のデータがDirtyセクションに含まれる様になるとcompactionの対象になる。ディスク消費量と負荷とのトレードオフになる。
  • log.cleaner.min.compaction.lag.ms: ログの中でcompactionされないまま残っているデータを最低どれぐらい保持するかの設定値。現在cleanableなセグメントファイルの中で最も新しいタイムスタンプと現在時刻とのラグが、この設定値を越えるまではcompaction対象にならない。効果が薄いcompactionの頻度を制限するのに効果がある。
  • log.cleaner.max.compaction.lag.ms: ログの中でcompactionされないまま残っている期間が、この設定値の期間を越えるとcompactionの対象になる。Cleanable ratioに関わらず、一定期間でcompactionを実行したい場合に有効な設定になる。

但し、log.cleaner.max.compaction.lag.ms及び、log.cleaner.min.compaction.lag.msクラスタレベルで設定する場合は注意が必要である。 これらの設定は全トピックに影響を与えるため、log.cleaner.max.compaction.lag.msを設定するとcompactionの効果が薄いトピックも強制的にcompactionが走る様になって本来やって欲しいトピックパーティションに対するcompactionが遅延したり、一方でlog.cleaner.min.compaction.lag.msを設定しても、頻繁にcompactionして欲しいトピックで処理スパンが長過ぎて無駄にストレージを消費したり、上述したlog.cleaner.dedupe.buffer.sizeが不足していて蓄積されているdirtyセクション分のcompactionが中々追い付かないといったことに繋がる可能性がある。

こういった問題を避けるため、log.cleaner.max.compaction.lag.mslog.cleaner.min.compaction.lag.ms (場合によってはlog.cleaner.min.cleanable.ratioも) の設定項目はkafka-configsコマンドを利用してトピックレベルの設定で上書きする様にコントロールした方が安全にチューニングできる。

有用なメトリック

JMXのMBeanで示している。

  • kafka.log:type=LogCleanerManager,name=max-dirty-percent: そのホストのdirty ratioの最大値。上手く動作していればlog.cleaner.min.cleanable.ratioに収束する。
  • kafka.log:type=LogCleaner,name=max-buffer-utilization-percent: log.cleaner.dedupe.buffer.sizeの利用率の最大値。log.cleaner.io.buffer.load.factorに貼りついていれば、bufferが足りていない可能性が高い。
  • kafka.log:type=LogCleaner,name=max-clean-time-secs: そのホストでcompactionにかかった所要時間の最大値。
  • kafka.log:type=LogCleaner,name=max-compaction-delay-secs: そのホストでcomactionが可能なタイムスタンプより遅延している時間の最大値。これがlog.cleaner.min.compaction.lag.mslog.cleaner.max.compaction.lag.msと比べてみて、大きくズレているならcompaction遅延の可能性が高い。

その他の参考資料