こんにちは。業務委託として SRE チームのお手伝いをしている @syucream です。 本記事では Repro にて開発した、 Go 製のカラムナフォーマットへのデータ変換ツール columnify について、開発背景や技術的な取り組みを紹介します。
なぜカラムナフォーマットか?
ことのおこり
事業がスケールすると共に扱うログの量が増えることは、喜ばしい反面さまざまな悩みをもたらします。その中でも顕著なものの一つとしてコストの問題が挙げられます。 膨大なログデータはログに対するストレージ料金を増大させると共に、分析や可視化に際してクエリで求められるコンピュートのコストも無視できなくなっていきます。 近頃 Repro でもコンテナのログの管理においてこの問題が顕著になってきました。Repro のバックエンドシステムは ECS 上のコンテナで実現され、ログの閲覧・管理のため外部のログ収集サービスを使っているのですが、インフラのスケールに伴い利用コストが増大しており今後もそのコストが伸びてしまう目算が立っていました。
この問題を緩和する策として、ログの選択的な送信先の決定とログファイルのカラムナフォーマットでの保存を行おうと考えました。 前者では単純に、リアルタイム性への要件が強くないまたは膨大な量になることが見込まれるログを外部ログ収集サービスに送信せず S3 に格納していくことでコストを抑えます。 S3 にログファイルを格納さえしておけば後で Amazon Athena でクエリするなど活用ができます。 しかし素直にログファイルを JSON など行指向フォーマットで保持していくと、 Athena でクエリするなど読み込む際のクエリパフォーマンスやスキャン対象のデータ量、ストレージ使用量などが懸念されます。 そこで行指向でなく列指向であるカラムナフォーマットの出番です。 S3 に格納するログのフォーマットに Apache Parquet や Apache ORC といったフォーマットを用いることでこの問題を改善できます。
Apache Parquet とその周辺
カラムナフォーマットで、どのようにデータがエンコードされ、クエリのパフォーマンス向上などの効果が得られるか。 これについては Retty さんのこちらの記事が非常によくまとまっているのでそちらをご参照いただければと思います。 以降では Repro にてログファイルのアーカイブのために用いることにした Apache Parquet について触れます。
https://engineer.retty.me/entry/columnar-storage-format
Apache Parquet は Google の Dremel の論文を参考にしたカラムナフォーマットの実装です。 実装としては主に Java が充実しており、 C++ や Rust なども存在します。 また Ruby や Python では Apache Arrow と統合して利用されるケースもあります。
Parquet はカラム毎に 様々なエンコーディング方式 を適用できる、ネストしたカラムや繰り返しの(repeated な)カラムも列指向エンコードでされる、メタデータの走査でクエリ範囲が絞られて高速化 される場合がある 、実装言語に依るが圧縮コーデックが多様である、という特徴があります。
そして今回最も重要な事として、 Athena は賢く Parquet 形式のファイルを解釈 してくれます。 Athena はスキャン量に対する従量課金のため、ログファイルが効率よく圧縮されることによりコストを抑えられ、その上スキャンするカラムを適当に選択することで実際にスキャンされるデータブロックを絞り込むこともできます。
columnify について
Parquet 出力と既存システム
Parquet 形式のファイルを扱うためにとりうる選択肢はいくつかありますが、分かりやすい例としては Hadoop のスタックに乗ってしまうことでしょう。 たとえば Apache Spark で DataFrame API を利用して Parquet 形式ファイルを出力する のであれば、複雑な変換処理をしなければ非常に簡素なコーディングで行えます。 また前述の通り Apache Arrow を介して Parquet ファイル出力するのも有用な手です。
しかしこれらの構成ではクラスタの準備や依存ライブラリの解決、環境がシンプルでない、ポータビリティの確保に苦労するなどの問題が生まれます。 我々としては単一の CLI ツールで入力データを手軽に変換可能であることを望んでいました。 また Repro ではログ収集と S3 への保存に fluentd を使っており、 fluentd と Parquet 出力処理が容易に統合可能であると理想的です。
このユースケースに合わせるため、今回我々は Go でカラムナフォーマットへのデータ変換ツール columnify を開発しました。 社内での実際の利用例のひとつとして以下で紹介する対処法で、従来 JSON 形式でログファイルを保存していた箇所を Parquet 形式にするよう置き換えたところ 1/3 以下のファイルサイズになったケースも見られました。
columnify とは?
columnify は Go で実装した、行指向データを列指向データに変換するツールです。本記事の公開に合わせて OSS としてリポジトリを公開しています。 Go 製ということでシングルバイナリで配布可能で既存システムに統合しやすいものとなっています。 設計としてはシンプルに、スキーマ情報と行指向のレコード群を受け取り、中間表現に変換してカラムナフォーマットで出力するものとしています。 中間表現への変換ロジックを複数持つことで入力スキーマ・データフォーマットを多様化できるようにしています。
現状の実装では、スキーマと入力データに以下のような形式を取れるようにしています。ここでサポートしている形式は fluentd との統合のため fluentd の Formatter Plugins でサポートされる形式 を意識しています。出力形式は今のところ Parquet しかサポートしていませんが、 ORC のサポートが実装可能そうであればやってみるのも面白そうです。
schema
input data
中間表現としてはスキーマ部分は Apache Arrow の Go 実装 を利用しています。データ部分は愚直に Go の map[string]interface{}
型で保持しています。
データ部分も Arrow を採用したかったのですが、中間表現の実装当時に一部統合がうまくいかないケースがあり、また後述する parquet-go との連携が容易でなかったので一旦諦めています。
最終的に Parquet 形式のファイルを出力する部分は parquet-go を利用しています。中間表現のスキーマ・データを受け取ってカラムナフォーマットでデータを持ち直して Parquet 形式のバイナリファイルを出力します。
columnify の使い方
CLI での使い方
columnify は goreleaser のビルド結果のバイナリでのリリースを行っています。 go get
が簡単に実行できる環境であればそれで取得することも可能です。
$ GO111MODULE=off go get github.com/reproio/columnify/cmd/columnify
コマンドラインオプションでスキーマやデータの形式と、実際のファイルパスを指定できます。
$ ./columnify -h Usage of columnify: columnify [-flags] [input files] -output string path to output file; default: stdout -recordType string data type, [avro|csv|jsonl|ltsv|msgpack|tsv] (default "jsonl") -schemaFile string path to schema file -schemaType string schema type, [avro|bigquery]
試しに Avro で記述されたスキーマに従う CSV ファイルを Parquet に変換してみましょう。 以下のようなサンプルデータを用意してみました。 Avro で言うところのプリミティブ型だけが列挙された例となります。
# schema $ cat examples/primitives.avsc { "type": "record", "name": "Primitives", "fields" : [ {"name": "boolean", "type": "boolean"}, {"name": "int", "type": "int"}, {"name": "long", "type": "long"}, {"name": "float", "type": "float"}, {"name": "double", "type": "double"}, {"name": "bytes", "type": "bytes"}, {"name": "string", "type": "string"} ] } # record $ cat examples/primitives.csv false,1,1,1.1,1.1,"foo","foo" true,2,2,2.2,2.2,"bar","bar"
コマンドラインオプションで、 Avro 形式のスキーマと CSV ファイルを与えてみます。
$ columnify -schemaType avro -schemaFile examples/primitives.avsc -recordType csv examples/primitives.csv > out.parquet
変換結果の Parquet ファイルの確認は parquet-tools で行うと便利です。 入力で与えたデータが Parquet 形式で表現できています。
$ parquet-tools schema out.parquet message Primitives { required boolean boolean; required int32 int; required int64 long; required float float; required double double; required binary bytes; required binary string (UTF8); } $ parquet-tools cat -json out.parquet {"boolean":false,"int":1,"long":1,"float":1.1,"double":1.1,"bytes":"Zm9v","string":"foo"} {"boolean":true,"int":2,"long":2,"float":2.2,"double":2.2,"bytes":"YmFy","string":"bar"}
fluentd との統合
元々実現したかった、 fluentd でのデータパイプラインと統合して S3 に Parquet 形式のファイルを put することを考えます。
fluent-plugin-s3 は ユーザ定義の (de)compression ロジックを挿入する余地 を設けてくれているので今回この仕組みを利用します。 Parquet への変換方法は単純で、ユーザ定義の Compressor クラス内で fluentd のバッファファイルを入力に columnify コマンドを実行するだけです。 これは fluent-plugin-s3 リポジトリに存在する、 lzo コマンド呼び出しで圧縮する LZOCompressor などが参考になります。
これに従い、今回は examples/fluent-plugin-s3/s3_compressor_parquet.rb
にあるように ParquetCompressor
を実装しています。
fluentd の config で以下のように S3 plugin の compressor として認識させれば統合が完了します。
<match s3.**> @type s3 ... store_as parquet <compress> schema_type avro schema_file /path/to/columnify/examples/schema/primitives.avsc record_type msgpack </compress> ...
以上で、既存の fluentd の構成を大きく変更することなく columnify と Compressor の実装を追加するだけで Parquet 形式での出力が可能になりました。
ちなみに今回実装した Compressor は fluent-plugin-s3 本体のリポジトリに統合してもらう話 も出ています。 もし同じようなユースケースを抱く方がいれば、近い将来組み込みやすくなるかもしれません。
おわりに
というわけで、 Parquet 形式でのログファイルの保存と columnify の紹介でした。 Go 製のビッグデータ関連のツールはまだ少ないように見受けられ、深堀りすると面白くかつ運用に役立つものができるのではないかという印象を受けました。 今回開発した columnify も、紹介した以外にも単純にダミーの Parquet ファイルを他のフォーマットから生成する、みたいな別の用途にも使えるかもしれません。 Parquet ファイルを生成するのが意外と骨が折れるケースはありそうですので。
最後になりますが、 Repro ではデータエンジニアリングやデータ処理基盤を支えるエンジニアを募集しています! Repro は利用者さまのデータを預かり価値を提供する、まさにデータがビジネスの軸のひとつになっている事業を展開しており、データにまつわる面白い課題がまだまだ存在すると思います。 ご興味を抱いていただけましたらぜひ、以下ページをご参照いただければ幸いです。