Kafka Streams の State Store が tombstone によって遅くなってしまったので RocksDB のパラメータを調整してみた話

Platform Team の Repro Core という Unit に所属している村上と申します。

Repro Core の役割の 1 つとして、共通基盤となる Kafka Streams アプリケーションの運用があります。
この共通基盤は Repro の大量トラフィックを捌いている基盤になるため、日々の運用の中で様々な課題に直面します。
今回はそのような課題の中から、tombstone によって State Store のパフォーマンスが低下し、その解決策として RocksDB のパラメータを調整した話をします。

前半部分では tombstone によって State Store のパフォーマンスが低下した件を説明します。後半は RocksDB の compaction の挙動確認とそのパラメータ調整について説明します。

ちなみに、私が所属している Repro Core については、「続・何でも屋になっている SRE 的なチームから責務を分離するまでの道のり 〜新設チームでオンコール体制を構築するまで〜」の記事で説明がされているので、よろしければそちらもご覧になってください。

tombstone によって State Store のパフォーマンスが低下した

State Store とは

State Store は、Kafka Streams のコンポーネントのひとつで、ストリーム処理における状態を保持するための機能です。

レコードの変換や単純なフィルタリングなどの操作では、流れてきたレコードのみに対して処理を行えば良いので状態の保持は不要です。
しかし、別ストリームのレコードとの結合や複数レコードの集約といった、過去に処理したレコードの情報を使用するケースでは一時的にデータを保持する必要があり、そのようなステートフルな操作において State Store が使われます。
例えば、直近 1 分間のレコード数をカウントしてダウンストリームに転送する、という処理では過去のレコードを State Store に保存することで実現することになるでしょう。

State Store はデータの保存先にインメモリとディスクの 2 つの選択肢があります。
ディスクに保存する場合、アプリケーションに埋め込み可能な key-value store である RocksDB をバックエンドに使用します。 Kafka Streams はフォールトトレラントの実現のため、デフォルトで State Store の更新を changelog として Kafka topic に書き込むので、 仮にディスク上のデータが消えたとしても Kafka topic からデータを復元できるようになっています。

RocksDB の構造と tombstone

RocksDB では LSM Tree というデータ構造を採用していて、RocksDB での書き込みは一旦メモリ上の MemTable に保存され、それが一定サイズに達するとソート済み文字列テーブル(SSTable)として、SST ファイルがディスクに書き込まれます。 同一 key に対して複数回の書き込みを行うと複数のレコードが生成されますが、最新レコードは特定可能であり、重複するレコードは compaction という仕組みによって後で削除されます。

レコードの削除の場合に書き込まれるのは tombstone という削除マーカーです。tombstone のレコードは検索からは弾かれるようにはなりますが、データ自体はすぐには消えません。 その後、compaction がトリガーされたときに tombstone が物理的に削除されます。

問題となった処理

今回問題になった処理は State Store をバッファのように使っている箇所で、一時的に登録されたレコードを別の store に移し、移動が完了したレコードを削除するという、いわゆる flush のような処理をしていました。 flush される前にレコードが消えてしまうと不整合が発生するため、対象の State Store は永続化されており retention period 等で自動削除される設定もしていません。

State Store に登録されるレコードは、key のフォーマットによってグループを表現していて、flush はそのグループ単位で行われます。

例えば、以下のような key で登録されていたとき、prefix が storekey-1storekey-2 のグループにそれぞれ分けられ、そのグループ単位で flush が走るようなイメージです。

storekey-1:a
storekey-1:b
storekey-2:a
storekey-2:b
storekey-2:c

そして、flush 処理が行われるタイミングは次の通りです。

  • State Store にレコードを登録するタイミングで、対象のグループにおける最後の flush 処理から一定時間経過していた場合、そのグループの flush 処理を実行する
  • 定期的にすべてのグループを対象に flush 処理を実行する

今回、問題となったのは後者の定期処理の方です。

この定期処理の仕組みは後から導入したもので、State Store にはある程度データが溜まっている状態でのリリースでした。
リリース後、初回の実行は問題なく完了したのですが、2 回目以降の実行から処理時間が異様に長くなるという現象が起こり、頭を抱えることになりました。

定期処理は Kafka Streams の Punctuator を使って実行していて、実行中は通常のストリーム処理が止まってしまうため、 実行時間が長くなるとその分レコードの処理が遅延してリアルタイム性が損なわれます。 そのため、この定期処理は問題が解消されるまで一時的に止めることにしました。

原因の特定

原因の特定では、単純に定期処理の各ステップの処理時間を計測をしてボトルネックを探りました。
本番で確認した際は flush の対象数を減らし、特定のパーティションのみに対して実行することで、定期処理による遅延の影響を最小限に抑えてます。

そして計測結果から、State Store の Range Iterator を取得するメソッドに時間がかかっていることがわかりました。
ここの処理は flush 対象のレコードの有無を調べるため、flush するグループ単位で呼んでいた箇所であり、呼ばれる回数も多いです。

また、次のような状況を踏まえると、初回実行の削除で 大量に積まれた tombstone が RocksDB の Scan に大きく影響していることが考えられます。

実際に手元の環境で動かしてみたところ tombstone によって seek 処理が遅くなる事象は確認できました。

tombstone によるパフォーマンスの課題は RocksDB に限らず LSM Tree モデルを採用しているデータストアでは一般的に見られる課題でしょう。

対応方針

tombstone が原因となるのであればそれらを効率的に排除することで改善が見込めるため、 RocksDB のパラメータを調整し、最適な compaction の設定を探ることにしました。
RocksDB には DeleteRangeCompactRange など、今回のケースで使えそうな API がありますが、当時の Kafka Streams の version では State Store から使えませんでした。

RocksDB のパラメータを調整する

パラメータ調整にあたって、実際の compaction の挙動を見ていきます。

RocksDB では異なるワークロードに合わせて Compaction Style を選択でき、今回確認するのは以下の Compaction Style です。

それぞれの詳細に関しては公式 Wiki をご覧ください。ここでは簡単に説明します。

Leveled Compaction は RocksDB のデフォルト Compaction Style で、サイズによるレベル分けで管理していて、各レベルには複数の SST ファイルが存在します。
MemTable から flush された SST ファイルはまず Level 0 に配置され、Level 0 のファイル数が指定数を超えたとき compaction がトリガーされ Level 1 にマージされます。 Level 1 に存在する SST ファイルの合計サイズが指定サイズを超えたら Level 2 へマージされます。以降も同様に Level N の SST ファイルの合計サイズが指定サイズを超えたら Level N+1 にマージされます。
通常、レベルが 1 つ上がると指定サイズは 10 倍に増えます。

Universal Compaction は SST ファイルのデータの更新時刻によって各レベルに配置されます。一番古いファイルが最後のレベルに配置されて最新のファイルは Level 0 に配置されます。 MemTable から flush された SST ファイルはまず Level 0 に配置され、全体の SST ファイルが指定数を超えて、かつ 4 つの条件 のうちいずれかを満たしたときに Compaction がトリガーされます。

Compaction Style の選択において、amplification-factors を考慮するのが良いです。

Universal Compaction は Leveled Compaction に比べて Write amplification が小さくなるため、書き込み性能に優れています。
しかしその一方で、Universal Compaction は Read amplification が大きくなり読み込み性能に劣ることがあります。
ref. https://github.com/facebook/rocksdb/wiki/Universal-Compaction#conceptual-basis

Kafka Streams の State Store では、高い書き込み性能を求めるために、デフォルトで Universal Compaction が選択されています。

事前準備

Kafka Streams と同様 rocksdbjni 経由で RocksDB の API 使うために、次のような簡易的な java application を用意して、RocksDB に put する量を調整しつつ compaction の状況を見ます。
RocksDB に put した後、MemTable を flush して rocksDb.getProperty("rocksdb.stats") で Level 毎の SST ファイルや compaction の結果を出力します。

なお、RocksDB の version は当時使っていた 6.19.3 にしています。

package rocksdb.sample;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.UUID;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionOptionsUniversal;
import org.rocksdb.CompactionPriority;
import org.rocksdb.CompactionStyle;
import org.rocksdb.DBOptions;
import org.rocksdb.FlushOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class App {
  public static void main(String[] args) {
    // openRocksDBUniversalCompaction() or openRocksDBLevelBasedCompaction() 
    try (var rocksDb = openRocksDBUniversalCompaction()) {
      add(rocksDb, 1000);
    } catch (RocksDBException | InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  public static RocksDB openRocksDBUniversalCompaction() throws RocksDBException {
    var dbOptions = new DBOptions();
    var columnFamilyOptions = new ColumnFamilyOptions();
    columnFamilyOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
    var option = new CompactionOptionsUniversal();
    dbOptions.setCreateIfMissing(true);
    
    // write parameter settings here

    var columnFamilyDescriptors = Collections.singletonList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions));
    var columnFamilies = new ArrayList<ColumnFamilyHandle>(columnFamilyDescriptors.size());

    return RocksDB.open(dbOptions, "universal", columnFamilyDescriptors, columnFamilies);
  }

  public static RocksDB openRocksDBLevelBasedCompaction() throws RocksDBException {
    var dbOptions = new DBOptions();
    var columnFamilyOptions = new ColumnFamilyOptions();
    columnFamilyOptions.setCompactionStyle(CompactionStyle.LEVEL);
    columnFamilyOptions.setCompactionPriority(CompactionPriority.ByCompensatedSize);
    dbOptions.setCreateIfMissing(true);

    // write parameter settings here

    var columnFamilyDescriptors = Collections.singletonList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions));
    var columnFamilies = new ArrayList<ColumnFamilyHandle>(columnFamilyDescriptors.size());

    return RocksDB.open(dbOptions, "level", columnFamilyDescriptors, columnFamilies);
  }


  public static void add(RocksDB rocksDb, int amount) throws RocksDBException, InterruptedException {
    for (int i = 0; i < amount; i++) {
      var uuid = UUID.randomUUID().toString();
      rocksDb.put(uuid.getBytes(StandardCharsets.UTF_8), uuid.getBytes(StandardCharsets.UTF_8));
    }

    // flush memtable
    rocksDb.flush(new FlushOptions());

    while (true) {
      var runningCount = rocksDb.getProperty("rocksdb.num-running-compactions");

      if (runningCount.equals("0")) {
        break;
      }
      System.out.println("running: " + runningCount);
      Thread.sleep(100);
    }

    System.out.println(rocksDb.getProperty("rocksdb.stats"));
  }
}

Univarsal Compaction

まずはデフォルトのパラメータで試してみます。

public static RocksDB openRocksDBUniversalCompaction() throws RocksDBException {
  var dbOptions = new DBOptions();
  var columnFamilyOptions = new ColumnFamilyOptions();
  columnFamilyOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
  dbOptions.setCreateIfMissing(true);
  var columnFamilyDescriptors = Collections.singletonList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions));
  var columnFamilies = new ArrayList<ColumnFamilyHandle>(columnFamilyDescriptors.size());

  return RocksDB.open(dbOptions, "universal", columnFamilyDescriptors, columnFamilies);
}

今回の挙動確認において関連しそうなパラメータ値は次の通りです。

Options.level0_file_num_compaction_trigger: 4
Options.compaction_options_universal.size_ratio: 1
Options.compaction_options_universal.min_merge_width: 2
Options.compaction_options_universal.max_merge_width: 4294967295
Options.compaction_options_universal.max_size_amplification_percent: 200
Options.periodic_compaction_seconds: 2592000

挙動確認

まずは 10 万レコードを 1 回追加

Level    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  L0      1/0    4.42 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     38.9      0.11              0.11         1    0.114       0      0       0.0       0.0
 Sum      1/0    4.42 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     38.9      0.11              0.11         1    0.114       0      0       0.0       0.0
 Int      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     38.9      0.11              0.11         1    0.114       0      0       0.0       0.0

L0 に 1 ファイル生成されました。

SST ファイルが level0_file_num_compaction_trigger で設定した数以上のときに compaction がトリガーされます。
ref. https://github.com/facebook/rocksdb/wiki/Universal-Compaction#compaction-picking-algorithm

Compaction をトリガーするため、続けて 10 万レコードの追加を 3 回実行

Level    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  L0      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     33.2      0.13              0.13         1    0.131       0      0       0.0       0.0
  L6      1/0   16.20 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   0.9     28.3     26.2      0.62              0.61         1    0.617    400K      0       0.0       0.0
 Sum      1/0   16.20 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   4.7     23.4     27.5      0.75              0.74         2    0.374    400K      0       0.0       0.0
 Int      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   4.7     23.4     27.5      0.75              0.74         2    0.374    400K      0       0.0       0.0

4 つの L0 のファイルが、compaction でマージされて L6 に配置されました。

level0_file_num_compaction_triggerlevel0命名付けられているものの、Universal compaction では全レベルの合計ファイル数になります。
そのため、次は L0 に 3 つ積まれた段階で compaction が走るでしょう。

追加レコード数を 1 万レコードに減らして 2 回実行

Level    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  L0      2/0   902.95 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     27.1      0.02              0.02         1    0.016       0      0       0.0       0.0
  L6      1/0   16.20 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   0.0      0.0      0.0      0.00              0.00         0    0.000       0      0       0.0       0.0
 Sum      3/0   17.09 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     27.1      0.02              0.02         1    0.016       0      0       0.0       0.0
 Int      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     27.1      0.02              0.02         1    0.016       0      0       0.0       0.0

追加レコード数 1 万で 1 回実行

Level    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  L0      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     41.8      0.01              0.01         1    0.011       0      0       0.0       0.0
  L5      1/0    1.30 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.0     36.1     35.5      0.04              0.04         1    0.037     30K      0       0.0       0.0
  L6      1/0   16.20 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   0.0      0.0      0.0      0.00              0.00         0    0.000       0      0       0.0       0.0
 Sum      2/0   17.51 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   4.0     28.0     36.9      0.05              0.05         2    0.024     30K      0       0.0       0.0
 Int      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   4.0     28.0     36.9      0.05              0.05         2    0.024     30K      0       0.0       0.0

L0 のファイルがマージされて L5 に配置されました。
このとき、L6 までマージされないのは、次の 4 つの条件をいずれも満たしていなかったからです。

  1. Compaction triggered by age of data
    • periodic_compaction_seconds は 2592000 であり、periodic_compaction_seconds よりも古いファイルは存在しない
    • ちなみに periodic_compaction_seconds を変更する API は、6.19.3 の jni version ではサポートしていなかった
  2. Compaction Triggered by Space Amplification
    • max_size_amplification_percentは 200 であり、L6 に存在する最古のファイルのサイズ(16.20MB)が、その他のファイルの合計サイズの 1/2 になっていなければいけない。
    • 1.3 / 16 = 0.0802...なので、max_size_amplification_percent が 8 以下であれば L6 までCompactionされていた
  3. Compaction Triggered by number of sorted runs while respecting size_ratio
    • compaction_options_universal.size_ratio は 1 であり、16.20MB / (CompactionされたL0の合計値) <= 101/100 を満たしていなかった
  4. Compaction Triggered by number of sorted runs without respecting size_ratio
    • max_merge_width は 4294967295 であり、全然達してない

ref. https://github.com/facebook/rocksdb/wiki/Universal-Compaction#compaction-picking-algorithm

この状況では L0 に 2 ファイル生成すると compaction がトリガーされます。

追加レコード数1万で2回実行

Level    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  L0      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     34.0      0.03              0.02         2    0.013       0      0       0.0       0.0
  L4      1/0   893.75 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.0     29.0     28.7      0.03              0.03         1    0.030     20K      0       0.0       0.0
  L5      1/0    1.30 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   0.0      0.0      0.0      0.00              0.00         0    0.000       0      0       0.0       0.0
  L6      1/0   16.20 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   0.0      0.0      0.0      0.00              0.00         0    0.000       0      0       0.0       0.0
 Sum      3/0   18.38 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   2.0     15.7     31.2      0.06              0.05         3    0.019     20K      0       0.0       0.0
 Int      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   3.0     20.7     30.9      0.04              0.04         2    0.021     20K      0       0.0       0.0

L4 にマージされました。

size_ratio による compaction によって L4 のファイルを L5 のファイルにマージしてみます。 L4 に存在するファイルのサイズが L5 に存在するファイルのサイズを上回るように追加します。

追加レコード数 1 万で 2 回実行する

Level    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  L0      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     43.4      0.02              0.02         2    0.010       0      0       0.0       0.0
  L5      1/0    3.04 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.7     36.6     36.4      0.08              0.08         1    0.084     70K      0       0.0       0.0
  L6      1/0   16.20 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   0.0      0.0      0.0      0.00              0.00         0    0.000       0      0       0.0       0.0
 Sum      2/0   19.25 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   4.4     29.5     37.8      0.10              0.10         3    0.035     70K      0       0.0       0.0
 Int      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   7.9     32.7     37.2      0.09              0.09         2    0.047     70K      0       0.0       0.0

size_ratio による compaction 条件が満たられたので、L4 のファイルが L5 にマージされました。

最後に Space Amplification による full compaction を試します。
現時点だと 3.04/16.20 = 0.1876... なので、max_size_amplification_percent = 18 にします。

public static RocksDB openRocksDBUniversalCompaction() throws RocksDBException {
  var dbOptions = new DBOptions();
  var columnFamilyOptions = new ColumnFamilyOptions();
  columnFamilyOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
+ var option = new CompactionOptionsUniversal();
+ option.setMaxSizeAmplificationPercent(18);
+ columnFamilyOptions.setCompactionOptionsUniversal(option);
  dbOptions.setCreateIfMissing(true);
  var columnFamilyDescriptors = Collections.singletonList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions));
  var columnFamilies = new ArrayList<ColumnFamilyHandle>(columnFamilyDescriptors.size());  

  return RocksDB.open(dbOptions, "universal", columnFamilyDescriptors, columnFamilies);
}

追加レコード数は 1 件で、2 回実行して(SST ファイルを 2 つ生成して) compaction を実行させる

Level    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  L0      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0      1.4      0.00              0.00         1    0.001       0      0       0.0       0.0
  L6      1/0   19.00 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   6.2     27.6     27.3      0.70              0.68         1    0.697    470K      0       0.0       0.0
 Sum      1/0   19.00 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0 19670.0     27.6     27.3      0.70              0.68         2    0.349    470K      0       0.0       0.0
 Int      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0 19670.0     27.6     27.3      0.70              0.68         2    0.349    470K      0       0.0       0.0

full compaction が起こって L6 にすべてマージされました。

Leveled Compaction

以下の設定で試します。

public static RocksDB openRocksDBLevelBasedCompaction() throws RocksDBException {
  var dbOptions = new DBOptions();
  var columnFamilyOptions = new ColumnFamilyOptions();
  columnFamilyOptions.setCompactionStyle(CompactionStyle.LEVEL);
  columnFamilyOptions.setCompactionPriority(CompactionPriority.ByCompensatedSize);
  dbOptions.setCreateIfMissing(true);
  
  columnFamilyOptions.setMaxBytesForLevelBase(100000);
  columnFamilyOptions.setTargetFileSizeBase(25000);

  var columnFamilyDescriptors = Collections.singletonList(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions));
  var columnFamilies = new ArrayList<ColumnFamilyHandle>(columnFamilyDescriptors.size());

  return RocksDB.open(dbOptions, "level", columnFamilyDescriptors, columnFamilies);
}

設定値の説明

columnFamilyOptions.setCompactionPriority(CompactionPriority.ByCompensatedSize);

tombstone の多いファイルがコンパクション対象として選ばれやすくなるオプションです。 あくまで優先度の話なのでどのくらいの精度があるかはわかりません。
ref.https://rocksdb.org/blog/2016/01/29/compaction_pri.html

columnFamilyOptions.setMaxBytesForLevelBase(100000);

max_bytes_for_level_base は L1 のターゲットサイズです。
デフォルトは max_bytes_for_level_base=268435456 であり、各レベルを満たすには大量のレコードを積まないといけないので、compaction の確認を簡易化するために減らしました。

LeveledCompactionでは各レベルのターゲットサイズは次の計算で決まります。(level_compaction_dynamic_level_bytes=0のとき)

Target_Size(Ln+1) = Target_Size(Ln) * max_bytes_for_level_multiplier * max_bytes_for_level_multiplier_additional[n]

ref. https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-false

max_bytes_for_level_multiplier_additional[n] はデフォルトで 1 で、max_bytes_for_level_multiplier は10なので、
L1 が 100 KB, L2 が 1 MB, L3 が 10 MB...といったターゲットサイズになります。

columnFamilyOptions.setTargetFileSizeBase(25000);

SST 1 ファイルあたりの最大サイズです。max_bytes_for_level_base の変更に伴い調整してます。

挙動確認

予め各レベルに散らばるように、1000 件のレコード追加を 100 回実行する

Level    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  L0      1/0   45.96 KB   0.5      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     23.1      0.19              0.18       100    0.002       0      0       0.0       0.0
  L1      5/0   93.68 KB   1.0      0.0     0.0      0.0       0.0      0.0       0.0   1.6     16.9     17.1      0.41              0.38        33    0.013    154K      0       0.0       0.0
  L2     41/0   967.90 KB   1.0      0.0     0.0      0.0       0.0      0.0       0.0   6.3     17.3     17.3      1.60              1.49       163    0.010    623K      0       0.0       0.0
  L3    131/0    3.25 MB   0.3      0.0     0.0      0.0       0.0      0.0       0.0   4.1     16.8     16.6      0.59              0.55        86    0.007    227K      0       0.0       0.0
 Sum    178/0    4.33 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0  10.9     15.9     17.5      2.79              2.59       382    0.007   1005K      0       0.0       0.0
 Int      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     25.4      0.00              0.00         1    0.002       0      0       0.0       0.0

各レベル毎、ターゲットサイズに近いサイズで収まっているので、想定通りになっています。

次に、以下のようなコードを実行し、登録したレコード数をすべて削除して、どのくらいのtombstoneが残るのかを見てみます。

public static void delete(RocksDB rocksDb, int amount) throws RocksDBException, InterruptedException {
  var iterator = rocksDb.newIterator();
  iterator.seekToFirst();
  for (int i = 0; i < amount; i++) {
    if (!iterator.isValid()) {
      break;
    }
    rocksDb.delete(iterator.key());
    iterator.next();
  }
  rocksDb.flush(new FlushOptions());
  while (true) {
    var runningCount = rocksDb.getProperty("rocksdb.num-running-compactions");
    if (runningCount.equals("0")) {
      break;
    }
    System.out.println("running: " + runningCount);
    Thread.sleep(100);
  }
  System.out.println(rocksDb.getProperty("rocksdb.stats"));
}

1000 件のレコード削除を同じく 100 回実行して、すべてのレコードに対して削除リクエストを行う

Level    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  L0      1/0   44.14 KB   0.9      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     23.3      0.18              0.15       100    0.002       0      0       0.0       0.0
  L1      1/0   21.50 KB   0.5      0.0     0.0      0.0       0.0      0.0       0.0   1.4     20.3     19.9      0.10              0.09        33    0.003     48K   1395       0.0       0.0
  L2     19/0   465.88 KB   1.0      0.0     0.0      0.0       0.0      0.0       0.0   1.7     22.4     19.7      0.32              0.29        96    0.003    160K    19K       0.0       0.0
  L3     28/0   389.04 KB   0.0      0.0     0.0      0.0       0.0     -0.0       0.0   0.8     43.3     14.3      0.24              0.22       126    0.002    236K   158K       0.0       0.0
 Sum     49/0   920.57 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   3.7     23.1     19.0      0.84              0.75       355    0.002    445K   179K       0.0       0.0
 Int      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   2.1     13.9     24.6      0.00              0.00         2    0.002    1167     46       0.0       0.0

トータルのファイルサイズで見ると、4.33 MB → 920 KB まで減っています。残りはすべて tombstone。

ここから、もう一度 1000 件のレコード追加を 100 回実行する

Level    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  L0      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     21.1      0.21              0.20       100    0.002       0      0       0.0       0.0
  L1      3/0   57.95 KB   0.6      0.0     0.0      0.0       0.0      0.0       0.0   1.6     14.7     14.9      0.49              0.46        34    0.014    159K      0       0.0       0.0
  L2     42/0   973.97 KB   1.0      0.0     0.0      0.0       0.0      0.0       0.0   6.2     15.5     15.4      1.82              1.71       171    0.011    627K   1105       0.0       0.0
  L3    136/0    3.33 MB   0.3      0.0     0.0      0.0       0.0      0.0       0.0   2.3     16.7     15.0      0.59              0.56        87    0.007    228K    18K       0.0       0.0
 Sum    181/0    4.34 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0  10.9     14.5     15.6      3.12              2.92       392    0.008   1015K    19K       0.0       0.0
 Int      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0  46.8     15.7     16.0      0.13              0.12        12    0.011     46K      0       0.0       0.0

トータルのサイズが、最初に 1000 * 100 件のレコードを追加したときと同じくらいであり、KeyDrop の数も積まれているので残っていた tombstone が削除されています。

再び、1000 件のレコード削除 100 回実行して、すべてのレコードに対して削除リクエストを行う

Level    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  L0      1/0   44.13 KB   0.9      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     20.1      0.21              0.18       100    0.002       0      0       0.0       0.0
  L1      1/0   28.83 KB   0.6      0.0     0.0      0.0       0.0      0.0       0.0   1.5     16.3     15.5      0.04              0.03         9    0.004     13K    752       0.0       0.0
  L2     17/0   447.86 KB   1.0      0.0     0.0      0.0       0.0      0.0       0.0   1.5     19.1     16.6      0.33              0.31        87    0.004    144K    19K       0.0       0.0
  L3     23/0   359.06 KB   0.0      0.0     0.0      0.0       0.0     -0.0       0.0   0.7     41.1     11.6      0.24              0.21       122    0.002    223K   161K       0.0       0.0
 Sum     42/0   879.88 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   3.0     20.3     16.0      0.82              0.73       318    0.003    381K   180K       0.0       0.0
 Int      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   2.8     17.9     15.9      0.01              0.01         3    0.003    3093   1332       0.0       0.0

レコード追加 → 全削除のセットを 2 回行った場合、1 回目と比べて tombstone の量は増えていないので、徐々に tombstone が増え続ける事象は避けられているでしょう。

最後に compactRange を実行

Level    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  L2      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   3.0     21.0     17.1      0.01              0.01         1    0.013    6030   1118       0.0       0.0
  L3      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0     -0.0       0.0   0.0     94.1      1.1      0.01              0.01         2    0.004     18K    18K       0.0       0.0
 Sum      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0     -0.0       0.0 235274.0     51.0     10.6      0.02              0.02         3    0.007     24K    19K       0.0       0.0
 Int      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0     -0.0       0.0 235274.0     51.0     10.6      0.02              0.02         3    0.007     24K    19K       0.0       0.0

残っていた tombstone が消えて空になりました。


次は、以下のように実際のケースに近づけて試してみます。

  1. 1000 件のレコード追加を 100 回実行してレコードを溜めておく
  2. 1000 件追加 → 1000 件削除のセットを 100 回実行する (削除対象選定はソート順ではなくランダムにする)

1000 件のレコード追加を 100 回実行

Level    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  L0      1/0   45.84 KB   0.5      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     25.7      0.17              0.15       100    0.002       0      0       0.0       0.0
  L1      4/0   88.02 KB   0.9      0.0     0.0      0.0       0.0      0.0       0.0   1.6     18.9     19.2      0.37              0.34        33    0.011    155K      0       0.0       0.0
  L2     43/0   958.73 KB   1.0      0.0     0.0      0.0       0.0      0.0       0.0   6.4     19.3     19.2      1.44              1.31       163    0.009    624K      0       0.0       0.0
  L3    133/0    3.27 MB   0.3      0.0     0.0      0.0       0.0      0.0       0.0   3.4     18.6     18.3      0.42              0.39        77    0.006    182K      0       0.0       0.0
 Sum    181/0    4.33 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0  10.5     17.7     19.5      2.41              2.19       373    0.006    961K      0       0.0       0.0
 Int      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     26.6      0.00              0.00         1    0.002       0      0       0.0       0.0

1 回の追加 → 削除の実行に使うコードは以下です。

public static void add_and_delete(RocksDB rocksDb, int amount) throws RocksDBException, InterruptedException {
  for (int i = 0; i < amount; i++) {
    var uuid = UUID.randomUUID().toString();
    rocksDb.put(uuid.getBytes(StandardCharsets.UTF_8), uuid.getBytes(StandardCharsets.UTF_8));
  }

  var iterator = rocksDb.newIterator();
  iterator.seekToFirst();
  var list = new ArrayList<byte[]>();
  for (int i = 0; i < amount; i++) {
    if (!iterator.isValid()) {
      break;
    }
    list.add(iterator.key());
    iterator.next();
  }

  Collections.shuffle(list);
  for (int i = 0; i < amount; i++) {
    rocksDb.delete(list.get(i));
  }

  rocksDb.flush(new FlushOptions());

  while (true) {
    var runningCount = rocksDb.getProperty("rocksdb.num-running-compactions");

    if (runningCount.equals("0")) {
      break;
    }
    System.out.println("running: " + runningCount);
    Thread.sleep(100);
  }

  System.out.println(rocksDb.getProperty("rocksdb.stats"));
}

1000 件追加 → 1000 件削除のセットを 100 回実行

Level    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  L0      1/0   68.24 KB   0.9      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     28.7      0.26              0.24       100    0.003       0      0       0.0       0.0
  L1      5/0   97.26 KB   1.0      0.0     0.0      0.0       0.0      0.0       0.0   1.4     20.5     18.5      0.56              0.51        50    0.011    257K    29K       0.0       0.0
  L2     43/0   969.99 KB   1.0      0.0     0.0      0.0       0.0      0.0       0.0   6.8     18.4     18.2      1.90              1.75       177    0.011    768K   5603       0.0       0.0
  L3    133/0    3.28 MB   0.3      0.0     0.0      0.0       0.0      0.0       0.0   2.8     22.9     16.8      0.97              0.90       206    0.005    516K   132K       0.0       0.0
 Sum    182/0    4.38 MB   0.0      0.1     0.0      0.0       0.1      0.0       0.0   9.3     18.6     18.6      3.69              3.40       533    0.007   1542K   167K       0.0       0.0
 Int      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   1.0      0.0     30.1      0.00              0.00         1    0.002       0      0       0.0       0.0

トータルのファイルサイズはほとんど変わっていないので、tombstone はいい具合に消されています。

CompactRange の実行

Level    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop Rblob(GB) Wblob(GB)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  L1      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   2.1     17.5     15.4      0.01              0.01         1    0.009    3585    476       0.0       0.0
  L2      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0   7.7     13.8     13.7      0.08              0.07         1    0.079     23K      0       0.0       0.0
  L3    164/0    4.23 MB   0.4      0.0     0.0      0.0       0.0      0.0       0.0   3.9     14.0     13.6      0.31              0.28         2    0.156    101K   1046       0.0       0.0
 Sum    164/0    4.23 MB   0.0      0.0     0.0      0.0       0.0      0.0       0.0 5734027.0     14.1     13.7      0.40              0.36         4    0.100    128K   1522       0.0       0.0
 Int      0/0    0.00 KB   0.0      0.0     0.0      0.0       0.0      0.0       0.0 5734027.0     14.1     13.7      0.40              0.36         4    0.100    128K   1522       0.0       0.0

CompactRange の結果からみても、tombstone がほとんど消されていたことがわかるでしょう。

最終的に変更したパラメータ

compaction の挙動確認の内容も踏まえ、結果的に Compaction Style は Universal を維持し、 max_size_amplification_percent の値を下げることで、full compaction を起こしやすくする対応にしました。この変更は問題が生じていた State Store にのみ適用しました。
今回は追加と削除の処理が同じ割合で発生するので用途的にマッチすると考えました。 一方で State Store に追加だけを行う(もしくは、削除に比べて追加が多い)ワークロードの場合は、全体のファイルサイズにおける Rn (最古のファイル)のサイズの比率が大きくなり full compactionの 頻度は徐々に減っていく可能性が高いです。

また、Compaction Style を Leveled に変更する場合は State Store の cleanup が必要になるため、リストア処理を避ける観点からも Universal を維持するのが最適でした。

max_size_amplification_percent を下げることによって Write amplification の増加 が想定されるので、リリース後は RocksDB の I/O 関連のメトリクスを注視しておくのが良いでしょう。

その後

この記事で紹介した対応は、記事の執筆から半年以上前に行われたものなので、その後の状況について簡単にお伝えします。

パラメータを変更した後、パフォーマンスの問題は一旦解消されました。
その後、機能追加によって対象の State Store で扱うデータ量が大幅に増加しましたが、そこもなんとか耐えていました。
しかし、データ量のさらなる増加に伴い、最近またパフォーマンスの懸念が生じ始めている状況です。

こちらに関してはすでに対策を考えていて、 今度は RocksDB のパラメータ調整ではなく別のアプローチで対応する想定です。