更新可能なデータレイクを構築するテーブルフォーマットApache Hudiについて

Reproでチーフアーキテクトを担当しているjoker1007です。

今回、社内のデータストレージの将来的な選択肢の一つとしてApache Hudiというテーブルデータフォーマットについて調査と実データでの検証を実施しました。

この記事では2回に分けて、そもそもhudiってどんなフォーマットなのか、どういうデータで検証してどんな結果が得られたのかについて紹介します。

ということで第1回は、hudiそのものについての紹介をしていきます。

この記事はhudi-0.14.1を利用して検証した時のものです。また社内向けに書いた資料の手直しであるため丁寧語でないことに御留意ください。

Hudiとは何か、その目的

hudiは更新可能なデータレイクを構築するためのテーブルフォーマットである。

ストリーミングによるデータインサートや、upsert, deleteをサポートする。

通常、データ分析に向いたデータフォーマットはカラムナー(列指向)のデータ構造を持ち、バルクリードに向いている。parquetやorcなどがその代表例である。 こういったデータ構造は、複数行のレコードをまとめてカラムごとに分け圧縮をかけることを念頭に置いているため、基本的に一件単位のデータ更新やデータ削除をサポートしない。 そのため更新や削除を伴うタイプのデータを分析に向いた形で保持するには、更新のためにデータを洗い替えするか重複を許可してクエリでカバーするといった工夫が必要になる。またデータをクエリ可能な状態に反映するためにはバッチ処理が必要でかなりの時間がかかる。 この様に一件単位のデータ更新や反映時間までの課題を解決するための工夫を、テーブルフォーマット全体の仕様としてカバーするのがhudiである。

hudiには大きく分けて二通りのデータの書き方をサポートする。

一つは、バッファに貯めたレコードを書き出す際に、既存のカラムナーフォーマット(parquetまたはorc、以降は読み易さのためparquetをベースに話をする)のファイルとマージして同一キーのものは上書きした結果に置き換えた新しいparquetファイルを生成するCopy on Write方式。

もう一つは、avroフォーマットで一件単位のレコードを書き込み一定期間蓄積する。その後parquet、もしくはorcフォーマットにデータをマージしてコンパクションを行うMerge on Read方式。

それぞれ後でもう一度解説する。

データにはユニークなキー値を持つことができ、同一のキー値を持つレコードはマージする時に統合される。削除も同様。 Merge on Readテーブルではクエリ時にはavroで保存されている一件単位のレコードとparquetファイルをまとめて読み込みクエリエンジン側でマージ処理をした上で結果を返す。 こういった動きをすることで、更新可能かつ反映時間の短いデータレイクの構築を可能にする。

hudiの対象領域が分かったので、ここからはhudiが上記の目的を達成するために供えた各構成要素や概念の解説をしていく。

ファイルとレイアウトに関連する概念

timeline

see. https://hudi.apache.org/docs/next/timeline

hudiは、parquetファイルの書き換えや書き込み途中の状態を管理するために、timelineという概念の下でファイルを管理する。

timelineはイベントの時系列を表したもので、Instant(時刻)とアクションとStateの組み合わせで表現される。 timeline上のイベントはファイルとして書き出され、アクションには、commits, cleans, deltacommit, rollback, compactionなどがある。

ファイル名は以下の命名規則に従う。(0.14.1時点の物で、今後変更される可能性が高い)

[Action timestamp].[Action type].[Action state] 

Action stateはrequested, inflight, suffix無しの3種類で、suffix無しが完了を表わす。

例えば、処理の中でcompactionがリクエストされたとすると、 20240605073236386.compaction.requestedというファイルが生成され、compaction対象がファイルの中に保存される。

実際に処理が開始すると、20240605073236386.compaction.inflightというファイルが生成される。

処理が完了すると20240605073236386.commitというファイルが生成される。compactionからcommitにAction Typeが変わっているが、これはcompaction完了時のファイル書き出しがcommitに相当するからだ。

202406050732363862024-06-05 07:32:36.386を意味し、timelineがミリ秒単位であることが分かる。

Table Type

see. https://hudi.apache.org/docs/table_types

hudiのテーブルは書き込み方式に対応して2種類ある。

  • Copy on Write
  • Merge on Read

Copy on Writeは書き込みの度にベースファイル(通常はparquetファイル)を丸ごと置き換える方式でテーブルを管理する。 これは洗い替えのバッチ処理を何度も繰り返しているのと実質的には同じと言える。そのため書き込みには一定の時間がかかるが、書き込まれたデータはクエリエンジンからは通常のparquetファイルにクエリするのと変わらない。

Merge on Readは書き込み時にAvroでシリアライズされたデルタログを書き込み、クエリ時点でベースのparquetファイルとデルタログを結合してデータを読み出すことで、低レイテンシでの書き込みを実現する。Merge on Readテーブルでは一定コミットごとにcompactionまたはclusteringの処理を行うことで、デルタログをベースファイルのparquetに結合して新しいベースファイルを生成する。これにはデルタログの書き込みよりも長い時間がかかる。

また、Merge on Readテーブルはbaseとなるparquetファイルだけを対象にクエリすることも可能。その際はデルタログのレコードはまだ反映されていないものとして扱われる。これをread optimizedクエリと呼ぶ。

その他にも、hudiはある時点のコミット状態のファイルのみをリストアップ可能であるため、過去の特定時点のsnapshotに対するクエリをサポートし、また、ある時点からの更新分のみを取得できるため、Incremental Queryもサポートする。

実際にはクエリエンジンがそれらをサポートしているかどうかで実行可能かどうかが決まる。基本的にSparkが第一サポート環境なので、Sparkでクエリする分にはフル機能が利用できる。 Reproで利用しているTrinoでは残念ながら現時点でread optimizedクエリしかサポートしていない。

File Layout

see. https://hudi.apache.org/docs/file_layouts, https://hudi.apache.org/tech-specs/

hudiはファイル名の規則及びディレクトリ構成によってファイルを識別している。そのためファイル配置のレイアウトも仕様で決められている。

/data/hudi_trips/                   <== Base Path
├── .hoodie/                        <== Meta Path
|   └── hoodie.properties           <== Table Configs
│   └── metadata/                   <== Table Metadata
├── americas/
│   ├── brazil/
│   │   └── sao_paulo/              <== Partition Path 
│   │       ├── <data_files>
│   └── united_states/
│       └── san_francisco/
│           ├── <data_files>
└── asia/
    └── india/
        └── chennai/
            ├── <data_files>

.hoodie以下に前述のtimeline関係のファイルや書き込み中を示すMARKERや、indexやcolumn_statsなどを含むmetadataテーブルが含まれる。

これらのファイルの配置先として想定されるのは、HDFSなどの分散ストレージ、もしくはS3に代表されるクラウド上のオブジェクトストレージになる。

ここからは実データを持つdata_filesの中のファイルについて詳しく見ていく。

ベースファイルの例

ベースファイルは出力されたカラムナーフォーマットのファイルで、つまり大抵の場合はhudiテーブル内のparquetファイルを指す。 ベースファイルの命名規則は以下の通り。

[File Id]_[File Write Token]_[Transaction timestamp].[File Extension]

例として検証中に実際に出力されたファイルを元にした、<table_base_path>/partition_id=2563/26c6a734-90b1-4826-8ba0-9308d3294363-0_89-24-17401_20240606100956894.parquetというファイル名を見てみる。

partition_id=2563の部分がパーティションである。この例はhiveスタイルと同じに設定してあるが、デフォルトはキー名が無く上記のtree出力の様に値だけで表現される。 26c6a734-90b1-4826-8ba0-9308d3294363-0の部分がFile IDになる。hudiはこのFile IDが共通のファイル郡をFile Groupと呼ぶ。 89-24-17401の部分がFile Write Tokenになる。同じFile IDのファイルに対し書き込み施行の度にここの値が増えていく。エラーやリトライなどで同一コミットでも複数書き込みが発生する可能性があり、その場合に最新の結果を特定するために利用する。 20240606100956894の部分がTransaction Timestampで、概ねtimelineのcommitかreplacecommitの値と一致する。トランザクションのタイムスタンプであってファイルのタイムスタンプではない。

デルタログファイルの例

デルタログファイルは、Merge on Readテーブルで作成されるベースファイルからの差分更新データを指す。hudiにおいてはavroでシリアライズされた行指向のファイルを指す。 デルタログファイルの命名規則は以下の通り。

[File Id]_[Base Transaction timestamp].[Log File Extension].[Log File Version]_[File Write Token]

<table_base_path>/partition_id=2563/.26c6a734-90b1-4826-8ba0-9308d3294363-0_20240606114251267.log.1_2670-9-4044というファイルを例に見てみる。

これは実際に出力されたファイルを元にした例だが、見て分かる様に上記のベースファイルとFile Idが共通している。Merge on Readでデルタログを書き込む時は、どのFile Idに対応する書き込みか特定して書き込まないと後からマージできないので、この様になっている。

Base Transaction Timestampは20240606114251267で、これはベースファイルのどのトランザクションからの差分かを特定するもので、つまり20240606114251267というタイムスタンプを持つベースファイルに対する変更差分がこのファイルである、ということを示す。

このファイルではLog File Versionは1になる。デルタログの書き込み(deltacommit)はcompaction(またはclustering)が行われるまで、何度も実行される。そのため、deltacommitごとに差分が積み重なる。同一File Idに対する差分が積み増しされると、Log File Versionをインクリメントして新しい差分ファイルが書き込まれる。例えば、次のdeltacommitの後には.26c6a734-90b1-4826-8ba0-9308d3294363-0_20240606114251267.log.2_2667-9-4042というファイルが生成された。

Indexing

see. https://hudi.apache.org/docs/indexing

hudiの特徴の一つとしてUpsertのサポートがある。カラムナーフォーマットのparquetやorcに対してレコード単位の更新をサポートするために、レコードキーからFile Idを一意に特定する仕組みが必要になる。そのためhudiは別途indexの仕組みを持っている。

インデックスにはいくつかの種類がある。以下公式サイトより引用。

  • BLOOM: Employs bloom filters built out of the record keys, optionally also pruning candidate files using record key ranges.Key uniqueness is enforced inside partitions.
  • GLOBAL_BLOOM: Employs bloom filters built out of the record keys, optionally also pruning candidate files using record key ranges. Key uniqueness is enforced across all partitions in the table.
  • SIMPLE (default for Spark engines): Default index type for spark engine. Performs a lean join of the incoming records against keys extracted from the table on storage. Key uniqueness is enforced inside partitions.
  • GLOBAL_SIMPLE: Performs a lean join of the incoming records against keys extracted from the table on storage. Key uniqueness is enforced across all partitions in the table.
  • HBASE: Manages the index mapping in an external Apache HBase table.
  • INMEMORY (default for Flink and Java): Uses in-memory hashmap in Spark and Java engine and Flink in-memory state in Flink for indexing.
  • BUCKET: Employs bucket hashing to locates the file group containing the records. Particularly beneficial in large scale. Use hoodie.index.bucket.engine to choose bucket engine type, i.e., how buckets are generated;
    • SIMPLE(default): Uses a fixed number of buckets for file groups per partition which cannot shrink or expand. This works for both COW and MOR tables. Since the num of buckets cannot be changed and design of one-on-one mapping between buckets and file groups, this index might not suit well for highly skewed partitions.
    • CONSISTENT_HASHING: Supports dynamic number of buckets with bucket resizing to properly size each bucket. This solves potential data skew problem where partitions with high volume of data can be dynamically resized to have multiple buckets that are reasonably sized in contrast to the fixed number of buckets per partition in SIMPLE bucket engine type. This only works with MOR tables.
  • RECORD_INDEX: Index which saves the record key to location mappings in the HUDI Metadata Table. Record index is a global index, enforcing key uniqueness across all partitions in the table. Supports sharding to achieve very high scale.

ログの投入などデータウェアハウスの典型的な事例では、時系列にincrementされるキーをベースにBLOOMフィルタを使うことで、対象レコードの候補を大幅に絞り込むことができる。

一方でCDCでの利用や、ユーザーの属性情報などキーの順序がランダムな場合、BLOOMフィルタは効果が小さくなる。こういう場合はSIMPLEインデックスで対応するのが良いと公式サイトに書かれているが、データ量が一定以上になると上手くいかなくなる。理由は第2回で予定している検証時のパフォーマンスチューニングについてまとめる時に説明する。

Metadata Table

書き込み処理時のファイルリストの取得を効率化するためのfiles indexや、前述のindexingでBLOOMを利用する時のbloom filter index、もしくはRECORD_INDEXを利用する時のindexファイルなどのデータもここに配置される。

このMetadata Table自体もhudiのMerge on Readテーブルと同様の方法で更新されており、デルタログがcommitされて定期的にcompactionされベースファイルに纏められる。ベースファイルはそれぞれの目的に合わせた形式で異なる。

アクション・オペレーションに間する概念

hudiは1つのファイルで完結するフォーマットではないため、書き出し工程に複数の手順がある。それぞれがどんなものをかを知っておくことで負荷のかかり方を理解できる。

Compaction

see. https://hudi.apache.org/docs/compaction

deltacommitで作成されたデルタログを読み、既存のparquetファイルがあればそれとマージして、新しいparquetファイルを作成する処理。

compactionが終わった時点で正式なcommitとなる。 この時点までに生成されたparquetファイルは通常のparquetファイルとしてクエリ可能であり、Trinoでも問題なくクエリできる。

compactionの実施タイミングにはいくつかのパターンがある。

  • inline compaction: 主にバッチ処理で利用される。バッチ処理の何回かのdeltacommitが終了するごとに、同期的にcompactionを実行する。
  • async compaction: Stream Ingestion処理で利用される。inline compactionと同様に何回かのdeltacommitごとにcompactionが実行されるがdeltacommitの書き込み処理はブロックしない。
  • scheduling compaction: 何回かのdeltacommitごとにcompactionの実行要求だけを作成する。実行要求はcompactionの対象をリストアップしたファイルを作成することで完了となる。実行時はspark-submitでジョブを実行する(下記参照)か、hudi-clicompaction runを呼び出すことで実行する。書き込みと別のクラスタで処理が出来るため書き込み処理への影響を抑えられる。

spark-submitによるcompactionジョブの実行例

spark-submit \
  --executor-memory 48g \
  --driver-memory 16g \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf spark.driver.memoryOverhead=1024 \
  --conf spark.executor.memoryOverhead=3072 \
  --conf spark.memory.fraction=0.4 \
  --class org.apache.hudi.utilities.HoodieCompactor /usr/lib/hudi/hudi-utilities-bundle.jar \
  --spark-master yarn \
  --spark-memory 48g \
  --base-path s3://repro-hudi-experiment/test_hudi_table \
  --table-name test_hudi_table

Clustering

see. https://hudi.apache.org/docs/clustering

レコードをバケッティングして小さいファイルを結合したり、逆に大き過ぎるファイルを分割したりを行う。この時特定のキーでソートしたファイルを作成することでクエリ効率の向上にも利用できる。

現状CONSISTENT_HASHING方式のバケットインデックスを利用する際は、Clusteringを利用してバケット数の調整と再割り当てを行う必要がある。

clusteringもcompactionと同様、inline, async, schedulingの実行パターンが存在する。

clusteringは同時にcompactionと同様の処理を内包しているため、clusteringが実行されているとcompactionが実行されない(する必要がない)場合がある。

spark-submitによるclusteringジョブの実行例。

spark-submit \
--class org.apache.hudi.utilities.HoodieClusteringJob \
/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \
--props /path/to/config/clusteringjob.properties \
--mode scheduleAndExecute \
--base-path /path/to/hudi_table/basePath \
--table-name hudi_table_schedule_clustering \
--spark-memory 1g

Cleaning

see. https://hudi.apache.org/docs/hoodie_cleaner

compactionやclusteringによってparquetファイルが更新されると古いparquetファイルやデルタログファイルは不要になる。

hudiはロールバックや過去のスナップショットをクエリ可能にするために、何コミット分か過去の履歴を保持する。そして保持期間が過ぎたものはcleaning処理によって削除される。

cleaning処理は基本的にはデフォルトで処理の終わりに自動で実行される。別途spark-submitやhudi-cliから手動実行もできる。

spark-submit --master local --class org.apache.hudi.utilities.HoodieCleaner `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar`\
  --target-base-path /path/to/hoodie_table \
  --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
  --hoodie-conf hoodie.cleaner.commits.retained=10 \
  --hoodie-conf hoodie.cleaner.parallelism=200

書き込みの流れ

hudiの書き込みは工程が割と複雑で、単純にフォーマットを変換して書き出す様なものではないので、今のところデータ出力にはSparkかFlinkを利用しhudiが用意している書き出しジョブを経由してデータを出力することになる。

データ書き込みの流れは以下の様になっている。

  1. inputデータ取得(parquet, csv, kafkaなど)
  2. 同一キーのものを更に時系列キーでソートし、deduplicationを行う
  3. レコードのキーでIndexをlookupし、File Groupを特定する
  4. ファイルサイズを調整するためレコードのbinpackを行う
  5. 書き込みパーティションと突き合わせる
  6. 実際にファイルを書き出す
  7. コミット処理でtimelineを確定させ、書き出したファイルを参照可能にする
  8. 必要であればcleaningを行って、不要なファイルを削除する。不要なファイルとは一定以上古いコミットのベースファイル・デルタログや、書き込みに失敗して参照されなくなったファイルなど
  9. Merge on Readテーブルなら必要に応じてcompactionを実施し、デルタログファイルとベースファイルを結合し新しいベースファイルを作成する

もし、update処理が不要でキーの重複などを考慮する必要がなく、ただデータを追記していきたいだけなら、INSERTオペレーションを指定することでindexのlookupを省略できる。

具体的な書き込み方法としては以下の様な方法が選択できる。

spark-shellを利用し、DataFrame APIを使ってhudiフォーマットの書き出しを行う

spark-shell --jars <path of hudi-spark-bundle.jar> \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog"  \
  --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import spark.implicits._

val tableName = "trips_table"
val basePath = "file:///tmp/trips_table"

val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
  Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
    (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
    (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
    (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"    ),
    (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));

var inserts = spark.createDataFrame(data).toDF(columns:_*)
inserts.write.format("hudi").
  option(PARTITIONPATH_FIELD_NAME.key(), "city").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

spark-sqlを利用し、Spark SQLで書き込みを行う

spark-sql --jars <path of hudi-spark-bundle.jar> \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog"  \
  --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
CREATE TABLE hudi_table (
    ts BIGINT,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
) USING HUDI
PARTITIONED BY (city);

INSERT INTO hudi_table
VALUES
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),
(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),
(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
(1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'    ),
(1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo'    ),
(1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai'      ),
(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');

spark-submitを利用し、組込みのHudi Streamerを使う

see. https://hudi.apache.org/docs/hoodie_streaming_ingestion

spark-submit \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf spark.streaming.kafka.allowNonConsecutiveOffsets=true \
  --conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'" \
  --conf spark.driver.extraJavaOptions="-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'" \
  --conf spark.task.maxFailures=4 \
  --conf spark.executor.maxNumFailures=100 \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer /usr/lib/hudi/hudi-utilities-bundle.jar \
  --props s3://repro-hudi-test/hudi-kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --source-ordering-field updated_at \
  --target-base-path s3://repro-hudi-test/hudi_test \
  --table-type MERGE_ON_READ \
  --target-table hudi_test \
  --enable-sync \
  --op UPSERT

次回

以上が、hudiの目的と基本的な構成要素になる。 これを踏まえた上で、次回の記事では、実際の検証に使ったデータセットとその際のhudiのパフォーマンス特性についてと、設定項目やチューニングポイントについても紹介する予定。