Vertex Pipelines で動く Kubeflow Pipelines のサンプルを公開しました

Repro AI Labs で Software Engineer として働いている杉山阿聖です。Repro では機械学習の基盤として GCP を用いています。今回は Google I/O 2021 で発表された Vertex AI のサービスのひとつである、機械学習パイプラインの構築・運用を行える Vertex Pipelines で動かせるサンプルを作成したのでその紹介をします。サンプルは次のリンクからお試しください。

reproio/lab_sample_pipelines

この記事ではまず、機械学習パイプラインの主な要件について述べます。次に、機械学習パイプラインの構築で用いられる Kubeflow Pipelines について概要を述べます。最後に、機械学習パイプラインの構築にあたり理解が必要な Kubeflow Pipelines の仕様について、今回作成したパイプラインを例に述べます。

機械学習パイプラインの概要

機械学習パイプラインは機械学習において必要な一連の処理をパイプラインとして実装したものです。

機械学習では蓄積されたデータを用いてモデルを訓練し、その結果を本番環境にデプロイして利用しますが、この一連のプロセスは複雑であることが知られています。この一連の処理を意味のある単位をコンポーネントして区切り、実装したものが機械学習パイプラインです。次の図は論文 TFX: A TensorFlow-Based Production-Scale Machine Learning Platform からの引用です。

f:id:K_Ryuichirou:20210622124533p:plain
Figure 1: High-level component overview of machine learning platform.

この図は機械学習パイプラインの要件をよく表しています。たとえば、オーケストレーションがある点はデータパイプラインと機械学習パイプラインに共通する点ですが、データや機械学習モデルの可視化が必要になる点は機械学習パイプラインに特有な要件です。

機械学習パイプラインは次の処理を典型的に含みます。

  • データの取得
  • データの学習用/評価用データセットへの分割
  • データの前処理
  • モデルの訓練
  • モデルの評価
  • モデルのデプロイ

機械学習パイプラインについては別の資料にまとめたので参照ください。

Kubeflow Pipelines の概要

Kubeflow Pipelines機械学習パイプラインを実装するための OSS のひとつです。Kubeflow Pipelines は機械学習基盤を Kubernetes で展開する Kubeflow の一部として実装されており、機械学習パイプラインの運用に必要な次のコンポーネントを備えています。

  • パイプラインを管理し、生成物を可視化するための Web UI
  • パイプラインのコードと生成物を管理するための MLMD (Machine Learning MetaData)
  • パイプラインの実行を制御するための Orchestration

Kubeflow Pipelines では MLMD に MySQL を、Orchestration に Argo Workflow を用いています。アーキテクチャの詳細はKubeflow Pipelines のドキュメントを参照ください。

Repro ではこれまで GCP で提供される Kubeflow Pipelines のサービスである AI Platform Pipelines を活用してきました。これまでの取り組みの内容について説明した資料があるのでよければご確認ください。

https://docs.google.com/presentation/d/19P_hAZw8W9X1myeV1wuKYVEAHZsOl3-ohcV1E4wug7g/edit?usp=sharing

AI Platform Pipelines から Vertex Pipelines へ

AI Platform Pipelines では GKE 上に Kubernetes をデプロイし、自分で管理する必要があります。資料で触れているように、GKE のデフォルトのダッシュボードで管理に必要な要件を満たすことはできましたが、管理対象が増えてしまうのは悩みのタネでした。

また、機械学習パイプラインでは一部の処理で莫大なメモリーを使い、別の処理では別の計算資源 (CPU や GPU) が要求されることがあり、柔軟な計算資源の確保が必要になります。AI Platform Pipelines では Preemptible instance を用いて Kubernetesクラスターを構成でき、コンポーネントごとに実行するクラスターを指定できるため、オートスケールと組み合わせることでこのような要件への対応も可能です。しかし、管理対象のクラスターが増えてしまうこともあり、なかなか活用できずにいました。

上記の課題を解決するために、コンポーネントをすべて AI Platform Training というモデルを学習させるためのサービスで実装することも候補の1つにあがりました。AI Platform Training ではカスタムのコンテナイメージを作成し実行できるため、理論上は機械学習パイプラインのすべての処理を実行できます。また、マシンタイプも柔軟に選択でき、GPU や TPU も利用可能です。しかし、既存の実装に対する変更や検証を含めた移行コストを考えると現実的ではないと判断していました。

しかし、Vertex AI が発表されると状況が一変しました。Vertex Pipelines は AI Platform Pipelines 同様に Kubeflow Pipelines 互換なサービスですが、Kubernetesクラスターを必要としません。代わりにすべてのコンポーネントをモデルの訓練を行うサービスで実行します。まさしく我々が検討していたものがコストなく利用可能になりました。

Vertex Pipelines ではマシンタイプをコンポーネントごとに選べるほか、GPU も利用可能です (TPU は未対応でした)。これはまさに求めていたものであり、Repro でも現在、Vertex Pipelines へ移行するために検証を進めています。

はじめての Kubeflow Pipelines SDK

Kubeflow Pipelines ではパイプラインを記述するための SDK が提供されています。SDK を用いることで機械学習パイプラインを Python で記述できます。ここでは小さなサンプルを引用し、記述方法について述べます。

pipelines/hello_world.py でもっとも小さなパイプラインのサンプルコードが提供されています。main 関数で行っている、機械学習パイプラインの構築に必要な記述について見ていきましょう。

最初に行なわれているのはコンポーネントの処理の実装です。ここでは受け取った入力をそのまま出力するプログラムを実装しています。

def hello_world(text: str):
    print(text)
    return text

components.func_to_container_op(
    hello_world, output_component_file='hw.yaml'
)

func_to_container_opコンポーネントの仕様を表す YAML ファイルを出力します。YAML ファイルは ComponentSpec に従う形式で出力されます。出力された hw.yaml を次に示します。

name: Hello world
inputs:
- {name: text, type: String}
implementation:
  container:
    image: python:3.7
    command:
    - sh
    - -ec
    - |
      program_path=$(mktemp)
      printf "%s" "$0" > "$program_path"
      python3 -u "$program_path" "$@"
    - |
      def hello_world(text):
          print(text)
          return text

      import argparse
      _parser = argparse.ArgumentParser(prog='Hello world', description='')
      _parser.add_argument("--text", dest="text", type=str, required=True, default=argparse.SUPPRESS)
      _parsed_args = vars(_parser.parse_args())

      _outputs = hello_world(**_parsed_args)
    args:
    - --text
    - {inputValue: text}

implementation 以下でコンポーネントの内容が記述されています。Kubeflow Pipelines ではコンポーネントをコンテナとして実装します。コンテナのイメージ名を image に、コンテナに与えるコマンドを command に、引数を args に定義します。ここまででコンポーネントの実装が終了しました。

次に、パイプラインで使われるコンポーネントを作成するファクトリ関数を生成します。コンポーネントに相当するクラスとして ContainerOp が用意されていますが、Vertex Pipelines で動くパイプラインを記述する場合、直接このクラスを使うことはできません。すでに作成した YAML ファイルを用いて、コンポーネントを返す関数を定義します。

# Create a pipeline op from the component we defined above.
hw_op = components.load_component_from_file(
    './hw.yaml'
)  # you can also use load_component_from_url

load_component_from_fileYAML ファイルを読み込み、hw.yaml 中の args で定義した inputValue と同じ引数 text を持つ関数 hw_op を作成します。この hw_op をパイプラインで用いてコンポーネントを生成します。

以上でパイプラインを記述する準備が整いました。パイプラインの定義は pipeline デコレータを用いて関数を定義することにより行います。サンプルの記述は古く、現在は動作しません。正しくは次のとおりです。

@dsl.pipeline(name='hello-world',
              description='A simple intro pipeline',
              pipeline_root="gs://your-gcs-bucket-name")
def pipeline_parameter_to_consumer(text: str = 'hi there'):
    '''Pipeline that passes small pipeline parameter string to consumer op'''
    consume_task = hw_op(
        text
    )  # Passing pipeline parameter as argument to consumer op

このパイプラインは1つのコンポーネント consume_task から成り立ちます。パイプラインのデコレーターではパイプラインの名前 (任意の文字列) とその説明文、パイプラインの生成物の保存先を記述しています。

パイプラインそのものとなるのは関数 pipeline_parameter_to_consumer です。pipeline_parameter_to_consumer の引数 text はパイプライン実行時に与えられる引数を、hi there はパイプラインの引数のデフォルトの値を表します。パイプラインの引数には文字列 (string)、整数 (int)、実数 (double, float) を指定できます。

最後にパイプラインをコンパイルしてパイプラインの仕様を記述した JSON ファイルを作成します。この JSON ファイルは PipelineSpecスキーマにしたがうもので、これをバックエンドに渡すことでパイプラインを実行します。コンパイルは次のようにして行います。

compiler.Compiler().compile(
  pipeline_func=pipeline_parameter_to_consumer,
  package_path='hw_pipeline_job.json',
)

compile を用いてパイプラインをコンパイルしています。pipeline_func にはパイプラインを定義した関数を、package_path にはパイプラインをコンパイルした JSON の出力先を指定します。コンパイル時にはパイプライン関数の評価が行われ、コンポーネントの実行順序の解決が行われます。実行順序はコンポーネントの入出力の関係を Kubeflow Pipelines SDK が読み取ることで自動的に決定されますが、実装者が手動で制御することも可能です。

このようにしてコンパイルしたパイプラインを実行してみましょう。SDK を用いて実行することもできますが、今回は Vertex Pipelines の Web UI 上から手動で実行します。GCP の Web コンソールから Vertex Pipelines を開くとこの様になっています。

f:id:K_Ryuichirou:20210622124655p:plain
Vertex Pipelines Console

ここの Create Run からパイプラインを登録します。詳細は省きますがあまり迷わずに操作できるかと思います。設定の途中でパイプラインの引数が指定でき、デフォルト値も Python の記述と一致していることを確認できます。

f:id:K_Ryuichirou:20210622124747p:plain
Create pipeline run

実行するとこのようにひとつのコンポーネントだけからなるパイプラインが確認できます。

f:id:K_Ryuichirou:20210622124818p:plain
hello world run

実態は Training Job であることもわかります。

f:id:K_Ryuichirou:20210622124841p:plain
Vertex Training

ログを見るとたしかに "hi there" と出力されているので、実装したコードが実行されたことも確認できました。

f:id:K_Ryuichirou:20210622124924p:plain
Console log.

Kubeflow Pipelines の課題

Kubeflow Pipelines SDK を用いる場合、個々のコンポーネントは単なるコンテナでしかないため、実装の自由度は非常に高いです。このため、新しいライブラリを部分的に用いたい場合への対応や、新しいコンポーネントをあとからパイプラインに追加するということも比較的容易にできます。Vertex Pipelines を用いると実行環境もサーバレスにできるので運用コストも抑えられるでしょう。

その一方で、Kubeflow Pipelines は現在過渡期にあり変化が激しいのも事実です。AI Platform Pipelines でデプロイされるのは V1 と呼ばれる環境で、SDK も V1 の API を使う必要があります。一方、Vertex Pipelines を用いるためには Kubeflow Pipelines SDK の V2 を用いなければいけません。V2 では V1 に対する破壊的な変更であり、V1 で利用できていた記述方法が使えなくなります。ContainerOp が直接使えなくなったのもその一環です。

Kubeflow Pipelines SDK V2 については現状で、ドキュメントもサンプルコードも不足しています。ドキュメントは日々更新されており、徐々に記述が進んでいますが、まだクリティカルな機能がドキュメント化されていなく、仕様の理解のためにはソースコードを読んだ上で検証するしかない状況です。また、サンプルコードも2021年6月18日現在では、上記の Hello, world 以外にもうひとつ機械学習とは関係のないサンプルしかなく、自分で機械学習パイプラインを組み上げようとすると途方に暮れてしまうことでしょう。

Vertex Pipelines のように便利なサービスがあるものの Kubeflow Pielines V2 を使い始めるための情報が不足していることが、Kubeflow Pipelines の現在の課題だと考えます。

reproio/lab_sample_pipeline

Repro でも Vertex Pipelines を活用するために検証を行っていますが、その取組の一環として検証用のパイプラインである Sample Pipeline を作成し、公開しました。

https://github.com/reproio/lab_sample_pipelines/tree/main/kfp

こちらを利用すると、機械学習パイプラインで行なわれる典型的な処理をひととおり Vertex Pipelines を用いて実行できます。

f:id:K_Ryuichirou:20210622125007p:plain
Repro lab sample pipeline

また、仕様書や単体テストを実装し、コンポーネント単体での動作確認を行うためのコマンドも記述するといった、業務で用いているパイプライン相当の品質のものになっています。

以降ではその中身について解説します。

Sample Pipeline の仕様

Sample Pipeline では4つのコンポーネントを実装しています。

  • Data Generator
  • Transform
  • Trainer
  • Evaluator

Data Generator は訓練用/評価用のデータセットを作成します。Sample Pipeline では機械学習パイプラインの実装の理解が容易なように、できるだけ単純なデータを利用する方針で実装しています。用いるデータは Palmer Penguin Dataset に前処理を加えたもの です。オリジナルの Palmer Archipelago (Antarctica) penguin data には欠損値が含まれていますが、今回のデータセットには欠損値が含まれていません。また、カテゴリカルな特徴量も削除し、すべての特徴量は 0 から 1 のあいだの数値となるようにスケールされています。

Transform は前処理に相当する処理を行います。今回用いたデータセットは前処理済みのものであるため、擬似的な処理として特徴量のカラム名に特定の文字列を付け加える処理としています。

Trainer はモデルを学習させます。モデルには scikit-learnRandomForestClassifier を用いています。できるだけ単純な実装となるよう、ハイパーパラメーターはいじっていません。

Evaluator はモデルを評価します。評価には accuracy と confusion matrix を用いています。

上記のコンポーネントの設計は TFX で提唱されているものを参考にしています。TFX を用いて実装したサンプルも tensorflow/tfx で提供されているため、ぜひ見比べてみてください。

ComponentSpec

ここでは、hello world よりも複雑な ComponentSpec に従う YAML ファイルの記述を見てみます。Trainer コンポーネントYAML ファイルは次のようになっています。

name: trainer
inputs:
- {name: transformed_train_data_path, type: {GCPPath: {data_type: CSV}}}
- {name: suffix, type: String}
outputs:
- {name: trained_model_path, type: {GCPPath: {data_type: PKL}}}
implementation:
  container:
    image: ${tagged_name}
    args:
      - {inputPath: transformed_train_data_path}
      - {inputValue: suffix}
      - {outputPath: trained_model_path}

inputs では基本となる string, int, float, double 以外の GCPPath という型を transformed_train_data_path に対して指定しています。これは入力値が GCS のバケットの保存先を表す文字列となっていることを期待することを示すものです。同様の型を outputstrained_model_path でも使用しています。

出力 trained_model_pathargsoutputPath として利用さています。これは実行時に Runtime (Vertex Pipelines や Kubeflow Pipelines) が生成するパスで、MLMD にパイプラインの実行 (run と呼ばれます) と紐付けられて保存されます。それぞれの出力はパイプラインの実行ごとに連続して記録されるため、モデルの訓練に用いたデータや実行に用いたコンテナを追跡できます。この様子はたとえば Vertex ML Metadata を利用すると次のように可視化できます。

f:id:K_Ryuichirou:20210622125047p:plain
Vertex ML Metadata

Vertex Pipelines や AI Platform Pipelines でパイプラインを実行する場合、outputPath には /gcs/ から始まるローカルのパスが渡されます。このパスにファイルを書き込むと、パイプラインのコンパイル時に指定された GCS のバケット内に書き込まれます。保存する際に GCS を意識する必要はありません。

また、入力 transformed_train_data_pathargsinputPath として利用されています。inputPath は典型的には他のコンポーネントからの入力であることを示します。こちらも同様に /gcs/ から始まるパスが渡されるので、それをローカルにあるファイルと同様に読み込むだけで GCS に保存されたファイルを読み込めます。

hello, world で作成した YAML ファイルと異なり、containercommand が指定されていません。実際には containercommand は省略可能で、その場合には Dockerfile で指定したコマンドが実行されます。Sample Pipeline では ENTRYPOINT を書くようにしています。コンポーネント単体での動作確認を行いやすいようにこのような記述としていますが、ドキュメントでは command を記述するよう推奨されているため、ここは一考の余地があります。

Sample Pipeline に特異な点として、containerimage${tagged_name} という文字列を指定している点が上げられます。ここには本来であれば利用するコンテナイメージ名を書き込みます。Sample Pipeline では pyproject.toml から読み込んだバージョンを利用し、イメージ名をパイプラインの実行時に動的に指定できるよう、プレースホルダーとなる文字列を指定しています。

他のコンポーネントについても同様な実装としています。ぜひ確認してみてください。

PipelineSpec

Sample Pipeline におけるパイプラインについて解説します。パイプラインの定義 を次に示します。

@kfp.dsl.pipeline(
    name=PIPELINE_NAME,
    pipeline_root=f"gs://{GCP_GCS_PIPELINE_ROOT}/",
)
def kfp_sample_pipeline(suffix: str = "_xf"):
    data_generator = _data_generator_op()
    transform = _transform_op(
        train_data_path=data_generator.outputs[GeneratedData.TrainData.value],
        eval_data_path=data_generator.outputs[GeneratedData.EvalData.value],
        suffix=suffix,
    )
    trainer = _trainer_op(
        transformed_train_data_path=transform.outputs[
            GeneratedData.TransformedTrainData.value
        ],
        suffix=suffix,
    )
    _ = _evaluator_op(
        trained_model_path=trainer.outputs[GeneratedData.TrainedModel.value],
        transformed_eval_data_path=transform.outputs[
            GeneratedData.TransformedEvalData.value
        ],
        suffix=suffix,
    )

_transform_op の引数に data_generatoroutput プロパティを用いています。このように指定することで、コンパイル時に Kubeflow Pipelines SDKtransform コンポーネントdata_generator のあとに実行すべきだと判断できるようになります。また MLMD によりキャッシュが有効になり、パイプラインの実行時にコンポーネントの入力に変更がない場合、そのコンポーネントの実行はスキップされて前回実行した出力が用いられるるようになります。

実行順をパイプラインの入出力ではなく人手で制御することも可能です。たとえば、パイプラインの入力に対してバリデーションを行うコンポーネントを追加したり、前処理を BigQuery に SQL を複数回発行することで行っているケースでは、コンポーネントの実行順を人手で制御する必要があります。そのような場合では after を使うことで実行順の制御が可能です。

今回は行っていませんが、パイプラインのコンパイル時にはコンポーネントごとに計算資源の割当を設定できます。ドキュメントから該当箇所を引用します。

train = (train_op(
      training_data=generate.outputs['training_data'],
      test_data=generate.outputs['test_data'],
      config_file=generate.outputs['config_file']).
    set_cpu_limit('CPU_LIMIT').
    set_memory_limit('MEMORY_LIMIT').
    add_node_selector_constraint('SELECTOR_CONSTRAINT').
    set_gpu_limit('GPU_LIMIT'))

少し読みにくいですが、このようにメソッドチェインにより CPU やメモリ、GPU の割当を設定できます。Vertex AI を用いるとこのような計算資源の確保を柔軟かつサーバーレスに行えます。

今後の課題

Sample Pipeline はひととおり動作しますが、一部、意図通りの動作をさせられていない箇所があります。多くは Kubeflow Pipelines V2 の現状の制約事項によるものです。具体的には次の事象を解決できていません。

  • コンポーネントの出力に対する拡張子の付与
  • Vertex Pipelines での Confusion Matrix の表示
  • GCPPath から Dataset への移行

最後のものについては補足が必要でしょう。Kubeflow Pipelines V2 では string, int, double, float 以外の入出力値は Artifact とするのが望ましいとされます。たとえば、訓練用/評価用データは本来であれば Artifact やそれを継承した Dataset とすべきでしょう。しかし、Artifact についてはドキュメントでの言及はほぼなく、サンプルコードも充実していないため、今回は利用できていません。

Kubeflow Pipelines の開発も、ドキュメントの追加も急ピッチで行なわれています。状況の確認を行いながら、これらの事項に取り組んでいきたいと思います。

おわりに

今回は機械学習パイプラインだけに焦点をあてましたが、Vertex AI では Vertex Pipelines 以外にもさまざまな MLOps に役立つサービスが利用可能です。個人的にはベイズ最適化を行うための Vertex Vizier に着目しています。Vertex Vizier についてはその前身となった Cloud Vizier についての調査を行ったときの資料があるのでご参照ください。

昨今、MLOps という概念が注目を浴びています。Repro においても MLOps に関する取り組みを進めています。MLOps については私の考えをまとめた資料があるのでよければ参照ください。

この MLOps という分野はまだ概念が未整理で、ベストプラクティスも確立していません。取り組む中でも一朝一夕にはできない難しさを痛感しています。現在のところ、自分たちの課題をひとつひとつ手探りで解決していくのが近道になると信じ、日々精進しています。

今回公開した Sample Pipeline には Repro で培ったノウハウが詰まっています。この Sample Pipeline を改善することで業界に貢献するとともに、他の方々と意見交換を行いたいと考えています。Sample Pipeline をリファレンスとし、 MLOps のベストプラクティスの確立に貢献するというのが、この Sample Pipeline を公開した大きな理由です。

Repro ではデータエンジニアリングやサービスを支えるエンジニアを募集しています! Repro は利用者さまのデータを預かり価値を提供する、まさにデータがビジネスの軸のひとつになっている事業を展開しており、データにまつわるおもしろい課題がまだまだ存在すると思います。また、Repro のデータをビジネスへ活用し、価値のあるものにするのはチャレンジのしがいがある仕事だと思います。ご興味を抱いていただけましたらぜひ、以下のページをご参照いただければ幸いです。