Amazon EMR のバージョンアップ 3/3:Presto から Trino への移行

前回の続きです。

EMR 5.36.1 から EMR 6.15.0 への更新

使用するアプリケーションのバージョンは以下のようになりました。OS は Amazon Linux 2 です。

アプリケーション等 EMR 5.36.1 EMR 6.15.0
Tez 0.9.2 0.10.2
Hue 4.10.0 4.11.0
Hive 2.3.9 3.1.3
Hadoop 2.10.1 3.3.6
Presto 0.267 0.2831
Trino N/A 426

Hive, Hadoop, Tez については前の記事で確認済みなので、ここからはそれ以外の要素について検討していきます。

一番問題になったのは Presto から Trino への移行です。Trino 移行に関係して発生した事柄を列挙します。

  • Datadog Agent の設定変更
  • 任意の期間の指定したタイプのクエリを実行して結果の同一性をチェックするためのスクリプトを実行すると trino-server が再起動を繰り返す
  • presto-fluentd を trino-fluentd へ書き換え
  • BigQuery connector の動作確認とクエリの修正
  • trino-client-ruby の model_version を接続先に応じて変更する
  • embulk-input-presto を embulk-input-trino-jdbc に変更する
  • td-agent の EOL で fluent-package 5 へ移行

Datadog Agent の設定変更

Repro で Presto の監視を始めた2018年5月頃は Datadog Agent の Presto インテグレーションはまだ存在していなかったので2、 Datadog Agent の JMX インテグレーションを使用して監視できるようにしていました。

しかし、 Trino へ移行するにあたって JMX インテグレーションで使用する Bean 名が変わってしまったので、これまでの Presto 用の設定ではメトリクスが記録できなくなりました。改めて調べてみると Datadog Agent に Trino インテグレーションが追加されていることがわかりました。そちらを使うことにしたのですが、元々 Presto で監視していたメトリクスとは異なっていたので、不足分は設定を追加しました。

以下のコマンドを実行すると、メトリクスのリストを確認できます。

# 設定ファイルに記載があるメトリクスのリスト
sudo -u dd-agent datadog-agent jmx list matching | tee mathing.txt
# 設定ファイルに記載がないメトリクスのリスト
sudo -u dd-agent datadog-agent jmx list not-matching | tee not-matching.txt

trino-server が再起動を繰り返す

Trino の event listener という仕組みを使い trino-fluentd というものを作り Fluentd に直接ログを流していたのですが、先述の通り trino-server が落ちる現象が発生しました。まずは自分のコードを疑うということで trino-fluentd を外して検証用のスクリプトを流すと、完走してしまったので trino-fluentd 部分に問題があると早合点してしまいました。しかし、クエリの統計情報は必要なので、代替の仕組みを HTTP event listener を使って作ってみたものの trino-sever が落ちる現象は解決しませんでした。 再度、よく調べてみると event listener の有無によって trino-server が落ちるかどうかは変わりませんでした。つまり trino-fluentd や Trino の event listener まわりには問題がありませんでした。

その後しばらく調べると、特定のパターンのクエリを流し続けると trino-server のメモリ使用量が跳ね上がって、タイミングが悪いとヒープの拡張に失敗して EMR のマスターノードで動いている trino-server が落ちるということがわかりました。

検証用のスクリプトを実行すると約2500件のクエリを逐次実行していくのですが、調子がよいと数時間走ったあと trino-server が落ちていました。調子が悪いと30分くらいで落ちていました。trino-server が落ちてしまうと実行中のクエリの情報がログに残らないことがあります。これでは調査しづらいので検証用スクリプトに、実行予定のクエリを記録して S3 に保存する機能を追加しました。また、記録したクエリを実行するためのスクリプトは後述のように別で用意しました。

HTTP event listener を使用している場合は以下のように設定し、同じクエリ ID でステータスが QUEUED しかないクエリを抽出すると trino-server が落ちたときに実行中だったクエリがわかります。

event-listener.name=http
http-event-listener.log-created=true
http-event-listener.log-completed=true

これだけだとヒープ拡張の契機になったクエリわかりづらいので、クエリの実行前後にヒープのサイズをログに出力する Ruby スクリプトを書きました。

require "bundler/inline"

gemfile do
  source "https://rubygems.org"
  gem "trino-client"
  gem "aws-sdk-s3"
  gem "rexml"
  gem "net-ssh"
end

require "logger"

remote_host = "スクリプト実行前に書き換える"

def jmx_heap(remote_host)
  heap_info = {}
  Net::SSH.start(remote_host, "hadoop", keys_only: true, keys: ["~/.ssh/private-key.pem"]) do |ssh|
    heap = ssh.exec!("java -jar ./jmxterm-1.0.4-uber.jar -n -v silent -i heap.jmx")
    heap_info = heap.strip.sub(/HeapMemoryUsage = /, "").tr("=", ":").tr(";", ",").gsub(/ :/, ":")[0..-2]
  end
  eval(heap_info)
rescue Exception => e
  pp heap_info
  { committed: 0 }
end

log = Logger.new($stdout)

# スクリプト実行前に SSH port forwarding しておく
trino = Trino::Client.new(
  server: "localhost:8889",
  user: "trino-debug",
)

s3 = Aws::S3::Client.new
keys = s3.list_objects_v2(bucket: "bucket", prefix: "prefix").flat_map do |response|
  response.contents.map(&:key)
end
log.info("keys=#{keys.size}")

skip_keys = []

keys.each_with_index do |key, index|
  # trino-server が落ちたときのログを見て続きを実行できるようにスクリプトを書き換える
  # next if index < 2417
  if skip_keys.include?(key)
    log.info("#{index} key=#{key} skip")
    next
  end

  sql = s3.get_object(bucket: "bucket", key: key).body.read
  before = jmx_heap(remote_host)
  query = trino.query(sql)
  api = query.instance_variable_get(:@api)
  query_id = api.query_id
  log.info("#{index} #{query_id} key=#{key} committed=#{before[:committed]} size=#{sql.bytesize} queued")
  while query.advance
    sleep 2
  end
  after = jmx_heap(remote_host)
  diff = after[:committed] - before[:committed]
  diff = (diff.to_f / 1024**3).round(6)
  log.info("#{index} #{query_id} key=#{key} committed=#{after[:committed]} diff=#{diff} done")
  sleep 0.5
end

スクリト内で指定している heap.jmx の内容は以下の通りです。

open localhost:9080
bean java.lang:type=Memory
get HeapMemoryUsage
$ java -jar ./jmxterm-1.0.4-uber.jar -n -v silent -i heap.jmx
HeapMemoryUsage = {
  committed = 1577058304;
  init = 536870912;
  max = 23253221376;
  used = 1036226912;
 };

Trino の設定で JMX がリモート接続できるように設定しておけば、ポート番号を指定して jmxterm を実行できます。毎回プロセス ID を調べなくてよいので便利です。

数日かけて、約2500件のクエリのうち30件くらいを流すとほぼ確実に trino-server を落とすことができるところまで絞り込むことができました。

クエリはプログラムで動的に生成している以下のようなものでした。3

  • Cassandra に保存している20億行以上あるテーブルからデータを抽出する
  • サブクエリが4段か5段ほどネストしている
  • サブクエリの中で WITH 句を使用している
  • サブクエリの中に LEFT OUTER JOIN がある
  • サブクエリの中に INNER JOIN もある
  • サブクエリの中で DISTINCT を使っている

当初は EMR のマスターノードに c6i.4xlarge (16vCPU/32GiB memory) を使用していました。また、全てのクエリが Presto で問題なく動作していたことから Trino でも問題なく動くはずと考えて Trino の設定などを調べていました。しかし、解決できなかったためメモリ不足が原因であると疑い EMR のマスターノードを r6i.4xlarge (16vCPU/128GiB memory) に変更しました。そうすると、厳選 trino-server 絶対落とすクエリ集や検証用のスクリプトを流しても trino-server は安定して動作するようになりました。4

presto-fluentd と trino-fluentd

wyukawa/presto-fluentd の v0.0.6 では Prestosql をサポートしています。 reproio/presto-fluentd の v0.0.6 では Prestodb をサポートしています。

しかし、使用しているライブラリのパッケージ名などが異なっているため、いずれも Trino のサポートはありません。 そこで Trino をサポートした reproio/trino-fluentd を作りました。

変更点は以下の通りです。

  • EMR で使う予定だった Trino 426 をサポートした
  • Maven のかわりに Gradle を使うように変更した
  • 依存関係を更新した
  • EMR に組込みやすいように uber jar を作るようにした

EMR に組込みやすいように uber jar を作るようにしたのが、地味に便利でした。

しかし、最終的には後述する HTTP event listener と Fluentd を使う方法を採用することにしました。理由は以下の2点です。

  • メンテナンスするものを減らしたい
  • Trino のバージョンアップに追従する際の手間を減らしたい

HTTP event listener と Fluentd

ちょうど EMR のノードで Fluentd が動作している5ので Trino の HTTP event listener を使えば Fluentd の in-http plugin を使って Trino で実行したクエリの統計情報を集めることができます。

既存の仕組みと整合性を取るためにある程度データを変換する必要がありましたが、それはログ収集基盤として動作している Fluentd でやることにしました。

Fluentd の設定例:

<source>
  @type http
  @id trino-http-event-listener
  port 9880
</source>
<match **>
  @type forward
  # snip
</match>

Fluentd の設定については以下を参照してください。 - http | 1.0 | Fluentd - forward | 1.0 | Fluentd

Trino の設定例:

event-listener.name=http
http-event-listener.log-completed=true
http-event-listener.connect-ingest-uri=http://localhost:9880/trino.query

Trino の設定については HTTP event listener を参照してください。

なお Trino の HTTP event listener で Fluentd に送信するデータサイズが大きいため Fluentd のバッファの設定を適切に行う必要があります。

trino-client-ruby の model_version を接続先に応じて変更する

Repro で開発している Rails アプリケーションや Ruby で書かれたバッチから Trino/Presto への接続には treasure-data/trino-client-ruby: Trino/Presto client library for Ruby を使用しています。trino-client-ruby 自身は Presto と Trino の両方をサポートしているのですが Trino::Client を初期化するときに指定する model_version によって送信する HTTP ヘッダーを変更しています。trino-client-ruby ではこの model_version を動的に検出する方法は提供されていません。

そこで Trino/Presto の REST API /v1/info を叩いて Trino/Presto のバージョンを取得し、その値から設定すべき model_version を導出するようにしました。 Trino への移行が完了したら削除する予定のコードなので簡単なものにしてあります。

def fetch_model_version
  version = JSON.parse(URI.open("http://#{PRESTO_HOST}:#{PRESTO_PORT}/v1/info").read).dig("nodeVersion", "version")
  version = Float(version.sub(/-amzn.*$/, ""))
  if version < 1
    0.205 # Presto で使える最新のモデルバージョン
  else
    351 # Trino で使える最新のモデルバージョン
  end
rescue
  0.205
end

注意点は $.nodeVersion.version の値に以下のように -amzn-1 のようなサフィックスが付く場合があることです。 このあたりは EMR のバージョンによって微妙に変わる可能性があるので、対象とする EMR のバージョンによってどのように実装するのか決める必要がありそうです。

EMR 5.36.1 の Presto 0.267 の場合:

{
  "nodeVersion": {
    "version": "0.267-amzn-1"
  },
  "environment": "production",
  "coordinator": true,
  "starting": false,
  "uptime": "105.88d"
}

EMR 6.15.0 の Trino 426 の場合:

{
  "nodeVersion": {
    "version": "426"
  },
  "environment": "production",
  "coordinator": true,
  "starting": false,
  "uptime": "21.10h"
}

このモデルバージョンを決めるメソッドを Trino::Client.newmodel_version を設定する箇所で呼び出せば、接続先に応じて適したモデルバージョンを設定できます。

embulk-input-presto を embulk-input-trino-jdbc に変更する

Repro では Embulk も活用しています。

主に、バッチ処理のワークフローやデータエクスポート機能に組込まれています。その中で Presto にアクセスするものがありました。

Presto へのアクセスには toyama0919/embulk-input-presto を使用していました。このプラグインRuby で実装されており presto-client-ruby を使用しています。しかし、embulk-input-presto ではモデルバージョンを設定できませんし presto-client-ruby を使っていることからわかるように Trino のサポートも入っていません。また trino-client-ruby を使うように改修しようにも、Embulk v0.11.x 以降では JRuby のサポートが第一線ではなくなって Ruby で実装されたプラグインも今後は非推奨になっていくようです6

そこで reproio/embulk-input-trino-jdbc を作りました。このプラグインembulk/embulk-input-jdbc に含まれている org.embulk.input.jdbc.AbstractJdbcInputPlugin を継承して作っています。この抽象クラスを継承すると基本的には JdbcInputConnection newConnection(PluginTask task) throws SQLException を実装するだけでよいので非常に短時間で実装できました。

まだ JDBC driver — Trino 443 Documentation にある全てのオプションをサポートしていませんが、使えると思います。

trino-client を使っても実装可能ですが、trino-client や Embulk プラグインにドキュメントがあまりないし、Trino の型から Embulk の型への変換などを自分で書かないといけないのでそこそこ大変です。

Hive クエリの問題3

やれることを全てやって、もう問題はないだろうと考えて4月某日の朝イチに EMR cluster の更新を行いました。しばらくは問題なかったのですが、数時間かかる日に数回実行するバッチが失敗しました。2時間くらい調査したのですが、すぐには修正できない感じだったので EMR 5.36.1 (Presto) に切り戻しました。簡単に切り戻せるように準備していたのが活かされました。

事前の検証では、変換先のテーブルと同じ定義を持つ、空のテーブル(S3のパスだけ異なる)を作成し INSERT OVERWRITE を実行して成功することを確認していました。

失敗していた Hive クエリは、以下のようなものでした。 変換元のテーブルは、あまり細かくパーティションを切っていませんでした。変換先のテーブルは、より高速にデータを絞り込むために顧客IDや日時などでパーティションを切っていました。

INSERT OVERWRITE TABLE 変換先のテーブル PARTITION (顧客ID, 日時,...)
SELECT
-- これ以外にもいくつかのカラムがある
, 顧客ID
, 日時
,...
FROM
  変換元のテーブル
WHERE
...
;

ダイナミックパーティション7を使っているので、実行するたびにデータのある顧客IDや日時の数だけパーティションが増えます。

不要になった過去のデータを削除していなかったため、変換先のテーブルには不要なパーティションが数百万ありました。パーティション数の少ない staging 環境では、成功していたので、パーティション数が多いことが原因だと仮説を立てて検証しました。

問題の発生した変換先のテーブルと同じ定義を持つ、空のテーブル(S3のパスだけ異なる)をいくつか作成し、パーティションの数を変えて試したところ以下の結果が得られました。8

パーティション INSERT OVERWRITE の結果
200万 失敗
150万 失敗
100万 成功
50万 成功

古いデータの保持期間を見直したところ、パーティション数は100万以下に抑えられそうだったので、古いデータとパーティションを定期的に削除することにしました。

並行して AWS に調査を依頼したところ、パーティション数が多いことが原因を考えられるので HADOOP_HEAPSIZE を増やしてはどうか、という提案がありました。 HADOOP_HEAPSIZE の値をデフォルト値(1000)の 10 倍以上に増やしたところ staging 環境では INSERT OVERWRITE が成功しました。

まとめ

半年くらいかけて実施した EMR のバージョンアップに関して行ったことを一通り紹介しました。

特に重要だったのは以下の3つです。

  • バージョンアップ前後で Presto/Trino や Hive のクエリの実行結果を比較する仕組みを整備し、自信を持って進めるようにしたこと
  • バージョンアップ作業のスクリプト化を行ない、作業を進める上での心理的な障壁を減らしたこと
  • EMR cluster の入れ替えだけで Presto <=> Trino の切り替えをできるようにし、いつでも戻れるようにしたこと

非常に多くのやってみないとわからないことに遭遇したり、修正が必要なものや新たに作らないといけないものがどんどん出てきたりして大変でした。

おまけ

EMR のバージョンアップに関連して作ったものと捨てたもののリスト

  • 作ったもの
    • presto-fluentd
    • trino-fluentd
    • embulk-input-trino-jdbc
    • Hive 用 EMR cluster の入れ替えスクリプト
    • Hive で実行したクエリの結果を比較する仕組み
    • Presto <=> Trino の切り替えをシームレスにする仕組み
    • 使い捨てのスクリプトをいくつか
  • 捨てたもの
    • presto-fluentd
    • trino-fluentd
    • 独自開発の Hive UDF

EMR のバージョンアップに関連して使用した技術要素

  • Git
    • Trino/Presto の Cassandra connector へのパッチの更新
  • シェルスクリプト
    • EMR のブートストラップアクションなど
  • Ruby
  • Java
    • Embulk のプラグイン
    • Presto のイベントリスナー(presto-fluentd)
    • Trino のイベントリスナー(trino-fluentd)
    • 独自開発した Hive UDF
  • Embulk
  • MySQL
  • Fluentd
  • AWS EMR(特に重要なもののみ)
    • Hive
    • Presto
    • Trino
    • Amazon Linux2
    • ブートストラップアクション

  1. Presto については Trino へ移行するので新しい EMR cluster にはインストールしません。
  2. Datadog Agent に Presto integration が追加されたのは2019年5月以降 https://github.com/DataDog/integrations-core/pull/3131
  3. 当然ですが Presto だと全く問題なく動作していたクエリです。
  4. HeapMemoryUsagemax は最大で 70GiB 程度になりました。Presto では 20GiB 前後で安定していました。
  5. Fluentd を入れているのは Trino のログを Fluentd で集めて S3 に永続化するためです。
  6. Embulk v0.11 / v1.0 に向けて: プラグイン開発者の皆様へ
  7. https://cwiki.apache.org/confluence/display/hive/dynamicpartitions
  8. 空のパーティションを作るだけでも数時間かかるので、放置して検証できるよう工夫しました