テストを最大限に活用: Vertex AI による ML テストの管理

1. 概要

このラボでは、Vertex AI を使用して、TensorFlow でカスタム Keras モデルをトレーニングするパイプラインを構築します。次に、Vertex AI Experiments で使用できる新しい機能を使用して、モデル実行を追跡して比較し、最適なパフォーマンスのハイパーパラメータの組み合わせを特定します。

学習内容

次の方法を学習します。

  • プレーヤーの評価を予測するカスタム Keras モデルをトレーニングする(回帰など)
  • Kubeflow Pipelines SDK を使用してスケーラブルな ML パイプラインを構築する
  • Cloud Storage からのデータの取り込み、データのスケーリング、モデルのトレーニング、その評価、結果としてできたモデルの Cloud Storage への保存を行う 5 ステップのパイプラインを作成して実行する
  • Vertex ML Metadata を活用し、モデルやモデルの指標などのモデル アーティファクトを保存する
  • Vertex AI Experiments を使用して、さまざまなパイプライン実行の結果を比較する

このラボを Google Cloud で実行するための総費用は約 $1 です。

2. Vertex AI の概要

このラボでは、Google Cloud で利用できる最新の AI プロダクトを使用します。Vertex AI は Google Cloud 全体の ML サービスを統合してシームレスな開発エクスペリエンスを提供します。以前は、AutoML でトレーニングしたモデルやカスタムモデルにそれぞれ個別のサービスを介してアクセスする必要がありましたが、Vertex AI は、これらの個別のサービスを他の新しいプロダクトとともに 1 つの API へと結合します。既存のプロジェクトを Vertex AI に移行することもできます。

Vertex AI には、エンドツーエンドの ML ワークフローをサポートするさまざまなプロダクトが含まれています。このラボでは、以下でハイライト表示されているプロダクト(ExperimentsPipelinesML MetadataWorkbench)を中心に学習します。

Vertex プロダクトの概要

3. ユースケースの概要

ここでは、EA Sports の FIFA ビデオゲーム シリーズから取得した人気のサッカー データセットを使用します。2008 ~ 2016 年のシーズンを対象に、25,000 試合以上のサッカーの試合と 10,000 人以上の選手が含まれています。データは事前に前処理されているため、すぐに実行できます。このデータセットはラボ全体で使用します。このデータセットは、一般公開の Cloud Storage バケットにあります。データセットへのアクセス方法については、この Codelab の後半で詳しく説明します。最終的な目標は、インターセプトやペナルティなどのさまざまなゲーム内アクションに基づいて、選手の総合評価を予測することです。

Vertex AI Experiments がデータ サイエンスに役立つ理由

データ サイエンスは実験的な性質を持っています。データ サイエンティストと呼ばれるのはそのためです。優れたデータ サイエンティストは仮説主導型であり、試行錯誤を繰り返してさまざまな仮説をテストし、反復を重ねることでパフォーマンスの高いモデルが生まれることを期待しています。

データ サイエンス チームはテストを積極的に取り入れていますが、テストで得られた作業内容や「秘伝のソース」を追跡するのに苦労することがよくあります。これには、いくつかの理由があります。

  • トレーニング ジョブの追跡が煩雑になり、どの実行が有効でどの実行が有効でないかを見失いやすくなります。
  • この問題は、データ サイエンス チーム全体を見るとさらに複雑になります。すべてのメンバーがテストを追跡するわけではありません。また、一部のメンバーのみが結果を知っていて、他のメンバーに結果が共有されない場合さえあります。
  • データ収集には時間がかかり、ほとんどのチームは手動の方法(シートやドキュメントなど)を利用しているため、一貫性がなく不完全な情報しか得られず、分析しづらい

要約: Vertex AI Experiments は、テストの追跡と比較を簡単に行えるように、ユーザーの代わりに作業を行います。

ゲームに Vertex AI Experiments を使用する理由

ゲームは、これまで機械学習と ML のテストの場として利用されてきました。ゲームは 1 日あたり数十億件のリアルタイム イベントを生成するだけでなく、ML と ML 実験を活用して、ゲーム内エクスペリエンスの改善、プレーヤーの維持、プラットフォーム上のさまざまなプレーヤーの評価に、そのすべてのデータを使用しています。そのため、ゲーム データセットは全体的なテスト演習にうまく適合すると考えました。

4. 環境の設定

この Codelab を実行するには、課金が有効になっている Google Cloud Platform プロジェクトが必要です。プロジェクトを作成するには、こちらの手順を行ってください。

ステップ 1: Compute Engine API を有効にする

まだ有効になっていない場合は、[Compute Engine] に移動して [有効にする] を選択します。

ステップ 2: Vertex AI API を有効にする

Cloud コンソールの [Vertex AI] セクションに移動し、[Vertex AI API を有効にする] をクリックします。

Vertex AI ダッシュボード

ステップ 3: Vertex AI Workbench インスタンスを作成する

Cloud Console の [Vertex AI] セクションで [ワークベンチ] をクリックします。

Vertex AI メニュー

Notebooks API をまだ有効にしていない場合は、有効にします。

Notebook_api

有効にしたら、[マネージド ノートブック] をクリックします。

Notebooks_UI

[新しいノートブック] を選択します。

new_notebook

ノートブックに名前を付けて、[詳細設定] をクリックします。

create_notebook

[詳細設定] で、アイドル状態でのシャットダウンを有効にして、シャットダウンまでの時間(分)を 60 に設定します。これにより、使用されていないノートブックが自動的にシャットダウンされるため、不要なコストが発生しません。

idle_timeout

ステップ 4: ノートブックを開く

インスタンスが作成されたら、[JUPYTERLAB を開く] を選択します。

open_jupyterlab

ステップ 5: 認証する(初回のみ)

新しいインスタンスを初めて使用するときに、認証が求められます。その場合は、UI で手順を行います。

authenticate

ステップ 6: 適切なカーネルを選択する

マネージド ノートブックは、1 つの UI で複数のカーネルを提供します。Tensorflow 2(ローカル)のカーネルを選択します。

tensorflow_kernel

5. ノートブックでの初期設定の手順

パイプラインを構築する前に、ノートブック内で環境を設定するための一連の追加の手順を行う必要があります。これらの手順には、追加パッケージのインストール、変数の設定、Cloud Storage バケットの作成、一般公開ストレージ バケットからのゲーム データセットのコピー、ライブラリのインポートと追加の定数の定義が含まれます。

ステップ 1: 追加のパッケージをインストールする

ノートブック環境に現在インストールされていない追加のパッケージ依存関係をインストールする必要があります。この例には KFP SDK が含まれています。

!pip3 install --user --force-reinstall 'google-cloud-aiplatform>=1.15' -q --no-warn-conflicts
!pip3 install --user kfp -q --no-warn-conflicts

次に、ノートブックのカーネルを再起動して、ダウンロードしたパッケージをノートブック内で使用できるようにします。

# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

ステップ 2: 変数を設定する

PROJECT_ID を定義します。Project_ID がわからない場合は、gcloud を使用して PROJECT_ID を取得できることがあります。

import os

PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

それ以外の場合は、ここで PROJECT_ID を設定します。

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "[your-project-id]"  # @param {type:"string"}

また、このノートブックの残りの部分で使用される REGION 変数も設定します。Vertex AI でサポートされているリージョンは次のとおりです。最も近いリージョンを選択することをおすすめします。

  • 南北アメリカ: us-central1
  • ヨーロッパ: europe-west4
  • アジア太平洋: asia-east1

Vertex AI を使用したトレーニングでは、マルチリージョン バケットを使用しないでください。すべてのリージョンがすべての Vertex AI サービスをサポートしているわけではありません。Vertex AI リージョンの詳細を確認する。

#set your region 
REGION = "us-central1"  # @param {type: "string"}

最後に、TIMESTAMP 変数を設定します。この変数は、作成されたリソース間でユーザー名の競合を回避するために使用されます。インスタンス セッションごとに TIMESTAMP を作成し、このチュートリアルで作成するリソースの名前に TIMESTAMP を追加します。

#set timestamp to avoid collisions between multiple users

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

ステップ 3: Cloud Storage バケットを作成する

Cloud Storage ステージング バケットを指定して活用する必要があります。ステージング バケットは、データセットとモデルのリソースに関連付けられているすべてのデータが、セッションをまたいで保持される場所です。

Cloud Storage バケットの名前を設定します。バケット名は、組織外のものも含め、すべての Google Cloud プロジェクトでグローバルに一意である必要があります。

#set cloud storage bucket 
BUCKET_NAME = "[insert bucket name here]"  # @param {type:"string"}
BUCKET_URI = f"gs://{BUCKET_NAME}"

バケットがまだ存在しない場合は、次のセルを実行して Cloud Storage バケットを作成できます。

! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

次のセルを実行して、Cloud Storage バケットへのアクセスを確認できます。

#verify access 
! gsutil ls -al $BUCKET_URI

ステップ 4: ゲーム データセットをコピーする

前述のように、EA Sports の人気ビデオゲーム FIFA のゲーム データセットを活用します。前処理は完了しているため、公開ストレージ バケットからデータセットをコピーして、作成したバケットに移動するだけです。

# copy the data over to your cloud storage bucket
DATASET_URI = "gs://cloud-samples-data/vertex-ai/structured_data/player_data" 

!gsutil cp -r $DATASET_URI $BUCKET_URI

ステップ 5: ライブラリをインポートし、追加の定数を定義する

次に、Vertex AI、KFP などのライブラリをインポートします。

import logging
import os
import time

logger = logging.getLogger("logger")
logging.basicConfig(level=logging.INFO)

import kfp.v2.compiler as compiler
# Pipeline Experiments
import kfp.v2.dsl as dsl
# Vertex AI
from google.cloud import aiplatform as vertex_ai
from kfp.v2.dsl import Artifact, Input, Metrics, Model, Output, component
from typing import NamedTuple

また、トレーニング データのファイルパスなど、ノートブックの残りの部分で参照する追加の定数も定義します。

#import libraries and define constants
# Experiments

TASK = "regression"
MODEL_TYPE = "tensorflow"
EXPERIMENT_NAME = f"{PROJECT_ID}-{TASK}-{MODEL_TYPE}-{TIMESTAMP}"

# Pipeline
PIPELINE_URI = f"{BUCKET_URI}/pipelines"
TRAIN_URI = f"{BUCKET_URI}/player_data/data.csv"
LABEL_URI = f"{BUCKET_URI}/player_data/labels.csv"
MODEL_URI = f"{BUCKET_URI}/model"
DISPLAY_NAME = "experiments-demo-gaming-data"
BQ_DATASET = "player_data"
BQ_LOCATION = "US"  
VIEW_NAME = 'dataset_test'
PIPELINE_JSON_PKG_PATH = "experiments_demo_gaming_data.json"
PIPELINE_ROOT = f"gs://{BUCKET_URI}/pipeline_root"

6. パイプラインを構築する

これで、Vertex AI を活用してトレーニング パイプラインを構築できるようになりました。Vertex AI SDK を初期化し、トレーニング ジョブをパイプライン コンポーネントとして設定し、パイプラインを構築し、パイプライン実行を送信し、Vertex AI SDK を活用してテストを表示し、ステータスをモニタリングします。

ステップ 1: Vertex AI SDK を初期化する

PROJECT_IDBUCKET_URI を設定して、Vertex AI SDK を初期化します。

#initialize vertex AI SDK 
vertex_ai.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

ステップ 2: トレーニング ジョブをパイプライン コンポーネントとして設定する

テストの実行を開始するには、パイプライン コンポーネントとして定義することでトレーニング ジョブを指定する必要があります。パイプラインは、トレーニング データとハイパーパラメータ(DROPOUT_RATELEARNING_RATEEPOCHS など)を入力として受け取り、モデル指標(MAERMSE など)とモデル アーティファクトを出力します。

@component(
    packages_to_install=[
        "numpy==1.21.0",
        "pandas==1.3.5", 
        "scikit-learn==1.0.2",
        "tensorflow==2.9.0",
    ]
)
def custom_trainer(
    train_uri: str,
    label_uri: str,
    dropout_rate: float,
    learning_rate: float,
    epochs: int,
    model_uri: str,
    metrics: Output[Metrics], 
    model_metadata: Output[Model], 
    

):

    # import libraries
    import logging
    import uuid
    from pathlib import Path as path

    import pandas as pd
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense
    from tensorflow.keras.layers import Dropout
    from tensorflow.keras.metrics import Metric 
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    from sklearn.metrics import mean_absolute_error
    import numpy as np
    from math import sqrt
    import os
    import tempfile

    # set variables and use gcsfuse to update prefixes
    gs_prefix = "gs://"
    gcsfuse_prefix = "/gcs/"
    train_path = train_uri.replace(gs_prefix, gcsfuse_prefix)
    label_path = label_uri.replace(gs_prefix, gcsfuse_prefix)
    model_path = model_uri.replace(gs_prefix, gcsfuse_prefix)

    def get_logger():

        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        logger.addHandler(handler)
        return logger

    def get_data(
        train_path: str, 
        label_path: str
    ) -> (pd.DataFrame): 
        
        
        #load data into pandas dataframe
        data_0 = pd.read_csv(train_path)
        labels_0 = pd.read_csv(label_path)
        
        #drop unnecessary leading columns
        
        data = data_0.drop('Unnamed: 0', axis=1)
        labels = labels_0.drop('Unnamed: 0', axis=1)
        
        #save as numpy array for reshaping of data 
        
        labels = labels.values
        data = data.values
    
        # Split the data
        labels = labels.reshape((labels.size,))
        train_data, test_data, train_labels, test_labels = train_test_split(data, labels, test_size=0.2, shuffle=True, random_state=7)
    
        #Convert data back to pandas dataframe for scaling
        
        train_data = pd.DataFrame(train_data)
        test_data = pd.DataFrame(test_data)
        train_labels = pd.DataFrame(train_labels)
        test_labels = pd.DataFrame(test_labels)
        
        #Scale and normalize the training dataset
        
        scaler = StandardScaler()
        scaler.fit(train_data)
        train_data = pd.DataFrame(scaler.transform(train_data), index=train_data.index, columns=train_data.columns)
        test_data = pd.DataFrame(scaler.transform(test_data), index=test_data.index, columns=test_data.columns)
        
        return train_data,train_labels, test_data, test_labels 
    
        """ Train your Keras model passing in the training data and values for learning rate, dropout rate,and the number of epochs """

    def train_model(
        learning_rate: float, 
        dropout_rate: float,
        epochs: float,
        train_data: pd.DataFrame,
        train_labels: pd.DataFrame):
 
        # Train tensorflow model
        param = {"learning_rate": learning_rate, "dropout_rate": dropout_rate, "epochs": epochs}
        model = Sequential()
        model.add(Dense(500, input_dim=train_data.shape[1], activation= "relu"))
        model.add(Dropout(param['dropout_rate']))
        model.add(Dense(100, activation= "relu"))
        model.add(Dense(50, activation= "relu"))
        model.add(Dense(1))
            
        model.compile(
        tf.keras.optimizers.Adam(learning_rate= param['learning_rate']),
        loss='mse',
        metrics=[tf.keras.metrics.RootMeanSquaredError(),tf.keras.metrics.MeanAbsoluteError()])
        
        model.fit(train_data, train_labels, epochs= param['epochs'])
        
        return model

    # Get Predictions
    def get_predictions(model, test_data):

        dtest = pd.DataFrame(test_data)
        pred = model.predict(dtest)
        return pred

    # Evaluate predictions with MAE
    def evaluate_model_mae(pred, test_labels):
        
        mae = mean_absolute_error(test_labels, pred)
        return mae
    
    # Evaluate predictions with RMSE
    def evaluate_model_rmse(pred, test_labels):

        rmse = np.sqrt(np.mean((test_labels - pred)**2))
        return rmse    
 
    
    #Save your trained model in GCS     
    def save_model(model, model_path):

        model_id = str(uuid.uuid1())
        model_path = f"{model_path}/{model_id}"        
        path(model_path).parent.mkdir(parents=True, exist_ok=True)
        model.save(model_path + '/model_tensorflow')

        
    # Main ----------------------------------------------
    
    train_data, train_labels, test_data, test_labels = get_data(train_path, label_path)
    model = train_model(learning_rate, dropout_rate, epochs, train_data,train_labels )
    pred = get_predictions(model, test_data)
    mae = evaluate_model_mae(pred, test_labels)
    rmse = evaluate_model_rmse(pred, test_labels)
    save_model(model, model_path)

    # Metadata ------------------------------------------

    #convert numpy array to pandas series
    mae = pd.Series(mae)
    rmse = pd.Series(rmse)

    #log metrics and model artifacts with ML Metadata. Save metrics as a list. 
    metrics.log_metric("mae", mae.to_list()) 
    metrics.log_metric("rmse", rmse.to_list()) 
    model_metadata.uri = model_uri

ステップ 3: パイプラインを構築する

次に、KFP で使用可能な Domain Specific Language (DSL) を使用してワークフローを設定し、パイプラインを JSON ファイルにコンパイルします。

# define our workflow

@dsl.pipeline(name="gaming-custom-training-pipeline")
def pipeline(
    train_uri: str,
    label_uri: str,
    dropout_rate: float,
    learning_rate: float,
    epochs: int,
    model_uri: str,
):

    custom_trainer(
        train_uri,label_uri, dropout_rate,learning_rate,epochs, model_uri
    )
#compile our pipeline
compiler.Compiler().compile(pipeline_func=pipeline, package_path="gaming_pipeline.json")

ステップ 4: パイプライン実行を送信する

コンポーネントの設定とパイプラインの定義は完了しました。上記で指定したパイプラインのさまざまな実行を送信する準備が整いました。これを行うには、次のようにさまざまなハイパーパラメータの値を定義する必要があります。

runs = [
    {"dropout_rate": 0.001, "learning_rate": 0.001,"epochs": 20},
    {"dropout_rate": 0.002, "learning_rate": 0.002,"epochs": 25},
    {"dropout_rate": 0.003, "learning_rate": 0.003,"epochs": 30},
    {"dropout_rate": 0.004, "learning_rate": 0.004,"epochs": 35},
    {"dropout_rate": 0.005, "learning_rate": 0.005,"epochs": 40},
]

ハイパーパラメータを定義したら、for loop を活用してパイプラインのさまざまな実行を正常にフィードできます。

for i, run in enumerate(runs):

    job = vertex_ai.PipelineJob(
        display_name=f"{EXPERIMENT_NAME}-pipeline-run-{i}",
        template_path="gaming_pipeline.json",
        pipeline_root=PIPELINE_URI,
        parameter_values={
            "train_uri": TRAIN_URI,
            "label_uri": LABEL_URI,
            "model_uri": MODEL_URI,
            **run,
        },
    )
    job.submit(experiment=EXPERIMENT_NAME)

ステップ 5: Vertex AI SDK を活用してテストを表示する

Vertex AI SDK を使用すると、パイプライン実行のステータスをモニタリングできます。また、Vertex AI Experiment でパイプライン実行のパラメータと指標を返すためにも使用できます。次のコードを使用して、実行に関連付けられているパラメータとその現在の状態を確認します。

# see state/status of all the pipeline runs

vertex_ai.get_experiment_df(EXPERIMENT_NAME)

次のコードを使用して、パイプライン実行のステータスに関する更新情報を取得できます。

#check on current status
while True:
    pipeline_experiments_df = vertex_ai.get_experiment_df(EXPERIMENT_NAME)
    if all(
        pipeline_state != "COMPLETE" for pipeline_state in pipeline_experiments_df.state
    ):
        print("Pipeline runs are still running...")
        if any(
            pipeline_state == "FAILED"
            for pipeline_state in pipeline_experiments_df.state
        ):
            print("At least one Pipeline run failed")
            break
    else:
        print("Pipeline experiment runs have completed")
        break
    time.sleep(60)

run_name を使用して、特定のパイプライン ジョブを呼び出すこともできます。

# Call the pipeline runs based on the experiment run name
pipeline_experiments_df = vertex_ai.get_experiment_df(EXPERIMENT_NAME)
job = vertex_ai.PipelineJob.get(pipeline_experiments_df.run_name[0])
print(job.resource_name)
print(job._dashboard_uri())

最後に、設定した間隔(60 秒ごとなど)で実行の状態を更新して、状態が RUNNING から FAILED または COMPLETE に変化するのを確認します。

# wait 60 seconds and view state again
import time
time.sleep(60)
vertex_ai.get_experiment_df(EXPERIMENT_NAME)

7. 最もパフォーマンスの高い実行を特定する

これで、パイプライン実行の結果が得られました。結果から何を学べるのか、疑問に思われるかもしれません。テストの出力には、各パイプライン実行につき 1 行ずつ、合計 5 行の結果が含まれています。次のような内容になります。

Final-Results-Snapshot

MAE と RMSE はどちらも平均的モデル予測エラーの測定であるため、ほとんどの場合、この 2 つの指標の値は低い方が望ましいです。Vertex AI Experiments の出力から、両方の指標で最もパフォーマンスの高い実行は最後の実行で、dropout_rate が 0.001、learning_rate が 0.001、epochs の合計数が 20 であることがわかります。このテストに基づくと、これらのモデル パラメータは最終的に本番環境で使用されます。このテストで最も優れたモデル パフォーマンスを得られたからです。

これでラボは完了です。

お疲れさまでした

Vertex AI を使って次のことを行う方法を学びました。

  • プレーヤーの評価を予測するカスタム Keras モデルをトレーニングする(回帰など)
  • Kubeflow Pipelines SDK を使用してスケーラブルな ML パイプラインを構築する
  • 5 ステップのパイプライン(GCS からのデータの取り込み、データのスケーリング、モデルのトレーニング、その評価、結果としてできたモデルの GCS への保存)を作成して実行する
  • Vertex ML Metadata を活用し、モデルやモデルの指標などのモデル アーティファクトを保存する
  • Vertex AI Experiments を使用して、さまざまなパイプライン実行の結果を比較する

Vertex のさまざまな部分の説明については、ドキュメントをご覧ください。

8. クリーンアップ

課金されないようにするには、このラボ全体で作成したリソースを削除することをおすすめします。

ステップ 1: Notebooks インスタンスを停止または削除する

このラボで作成したノートブックを引き続き使用する場合は、未使用時にオフにすることをおすすめします。Cloud コンソールの Notebooks UI で、ノートブックを選択して [停止] を選択します。インスタンスを完全に削除する場合は、[削除] を選択します。

インスタンスの停止

ステップ 2: Cloud Storage バケットを削除する

ストレージ バケットを削除するには、Cloud コンソールのナビゲーション メニューで [ストレージ] に移動してバケットを選択し、[削除] をクリックします。

ストレージを削除