Vertex AI Workbench: BigQuery のデータで TensorFlow モデルをトレーニングする

1. 概要

このラボでは、データ探索と ML モデルのトレーニングに Vertex AI Workbench を使用する方法について学習します。

学習内容

次の方法を学習します。

  • Vertex AI Workbench インスタンスを作成して構成する
  • Vertex AI Workbench BigQuery コネクタを使用する
  • Vertex AI Workbench カーネルでモデルをトレーニングする

このラボを Google Cloud で実行するための総費用は約 $1 です。

2. Vertex AI の概要

このラボでは、Google Cloud で利用できる最新の AI プロダクトを使用します。Vertex AI は Google Cloud 全体の ML サービスを統合してシームレスな開発エクスペリエンスを提供します。以前は、AutoML でトレーニングしたモデルやカスタムモデルにそれぞれ個別のサービスを介してアクセスする必要がありましたが、Vertex AI は、これらの個別のサービスを他の新しいプロダクトとともに 1 つの API へと結合します。既存のプロジェクトを Vertex AI に移行することもできます。

Vertex AI には、エンドツーエンドの ML ワークフローをサポートするさまざまなプロダクトが含まれています。このラボでは、Vertex AI Workbench に焦点を当てます。

Vertex AI Workbench は、データサービス(Dataproc、Dataflow、BigQuery、Dataplex など)や Vertex AI との緊密なインテグレーションを通じて、ノートブック ベースのエンドツーエンドのワークフローをすばやく構築するのに便利です。データ サイエンティストは Vertex AI Workbench を使用して、GCP データサービスへの接続、データセットの分析、各種モデリング手法のテスト、トレーニング済みモデルの本番環境へのデプロイ、モデル ライフサイクルを通した MLOps の管理を行うことができます。

3. ユースケースの概要

このラボでは、London Bicycles Hire データセットを調べます。このデータには、2011 年以降のロンドンの公共シェアサイクル プログラムの自転車の利用状況に関する情報が含まれています。まず、Vertex AI Workbench BigQuery コネクタを使用して、BigQuery でこのデータセットを探索します。次に、pandas を使用してデータを Jupyter ノートブックに読み込み、TensorFlow モデルをトレーニングして、サイクリングの発生時期と走行距離に基づいてサイクリングの所要時間を予測します。

このラボでは、Keras 前処理レイヤを使用して、モデルのトレーニング用の入力データを変換して準備します。この API を使用すると、TensorFlow モデルグラフに前処理を直接組み込むことができます。これにより、トレーニング データとサービング データが同一の変換を受けるため、トレーニング/サービング スキューのリスクを軽減できます。TensorFlow 2.6 以降、この API は安定版となっています。古いバージョンの TensorFlow を使用している場合は、試験運用版のシンボルをインポートする必要があります。

4. 環境の設定

この Codelab を実行するには、課金が有効になっている Google Cloud Platform プロジェクトが必要です。プロジェクトを作成するには、こちらの手順を行ってください。

ステップ 1: Compute Engine API を有効にする

まだ有効になっていない場合は、[Compute Engine] に移動して [有効にする] を選択します。

ステップ 2: Vertex AI API を有効にする

Cloud コンソールの [Vertex AI] セクションに移動し、[Vertex AI API を有効にする] をクリックします。

Vertex AI ダッシュボード

ステップ 3: Vertex AI Workbench インスタンスを作成する

Cloud Console の [Vertex AI] セクションで [ワークベンチ] をクリックします。

Vertex AI メニュー

Notebooks API をまだ有効にしていない場合は、有効にします。

Notebook_api

有効にしたら、[マネージド ノートブック] をクリックします。

Notebooks_UI

[新しいノートブック] を選択します。

new_notebook

ノートブックの名前を指定して、[権限] で [Service account] を選択します。

service_account

[詳細設定] を選択します。

まだ有効になっていない場合は、[セキュリティ] で、[Enable terminal] を選択します。

enable_terminal

詳細設定のその他の設定はそのままで構いません。

[作成] をクリックします。

インスタンスが作成されたら、[JUPYTERLAB を開く] を選択します。

enable_terminal

5. BigQuery でデータセットを探索する

Vertex AI Workbench インスタンスで、左側に移動して [BigQuery in Notebooks] コネクタをクリックします。

BQ コネクタ

BigQuery コネクタを使用すると、BigQuery データセットを簡単に探索してクエリできます。プロジェクト内のデータセットに加えて、[プロジェクトを追加] ボタンをクリックして、他のプロジェクトのデータセットを探索することもできます。

固定

このラボでは、BigQuery の一般公開データセットのデータを使用します。[london_bicycles] データセットが表示されるまで下にスクロールします。このデータセットには、cycle_hirecycle_stations の 2 つのテーブルがあることがわかります。それぞれについて詳しく見ていきましょう。

london_bike_ds

まず、cycle_hire テーブルをダブルクリックします。テーブルが新しいタブで開き、テーブルのスキーマと、行数やサイズなどのメタデータが表示されます。

cycle_hire_ds

[プレビュー] タブをクリックすると、データのサンプルが表示されます。簡単なクエリを実行して、人気の高いジャーニーを確認してみましょう。まず、[テーブルをクエリ] ボタンをクリックします。

cycle_hire_preview_ds

次に、次のコードを SQL エディタに貼り付けて、[クエリを送信] をクリックします。

SELECT
  start_station_name,
  end_station_name,
  IF(start_station_name = end_station_name,
    TRUE,
    FALSE) same_station,
  AVG(duration) AS avg_duration,
  COUNT(*) AS total_rides
FROM
  `bigquery-public-data.london_bicycles.cycle_hire`
GROUP BY
  start_station_name,
  end_station_name,
  same_station
ORDER BY
  total_rides DESC

クエリの結果から、Hyde Park Corner 駅を発着する自転車での移動が最も人気があったことがわかります。

journey_query_results

次に、各ステーションに関する情報を提供する cycle_stations テーブルをダブルクリックします。

cycle_hire テーブルと cycle_stations テーブルを結合します。cycle_stations テーブルには、各ステーションの緯度と経度が含まれています。この情報を使用して、出発ステーションと到着ステーション間の距離を計算し、各サイクリングの移動距離を推定します。

この計算を行うには、BigQuery の地理関数を使用します。具体的には、各緯度/経度文字列を ST_GEOGPOINT に変換し、ST_DISTANCE 関数を使用して 2 点間の直線距離をメートル単位で計算します。この値は、各サイクルトリップで移動した距離のプロキシとして使用されます。

次のクエリを SQL エディタにコピーして、[クエリを送信] をクリックします。JOIN 条件には 3 つのテーブルがあります。これは、サイクルの開始ステーションと終了ステーションの両方の緯度/経度を取得するために、stations テーブルを 2 回結合する必要があるためです。

WITH staging AS (
    SELECT
        STRUCT(
            start_stn.name,
            ST_GEOGPOINT(start_stn.longitude, start_stn.latitude) AS POINT,
            start_stn.docks_count,
            start_stn.install_date
        ) AS starting,
        STRUCT(
            end_stn.name,
            ST_GEOGPOINT(end_stn.longitude, end_stn.latitude) AS point,
            end_stn.docks_count,
            end_stn.install_date
        ) AS ending,
        STRUCT(
            rental_id,
            bike_id,
            duration, --seconds
            ST_DISTANCE(
                ST_GEOGPOINT(start_stn.longitude, start_stn.latitude),
                ST_GEOGPOINT(end_stn.longitude, end_stn.latitude)
            ) AS distance, --meters
            start_date,
            end_date
        ) AS bike
        FROM `bigquery-public-data.london_bicycles.cycle_stations` AS start_stn
        LEFT JOIN `bigquery-public-data.london_bicycles.cycle_hire` as b
        ON start_stn.id = b.start_station_id
        LEFT JOIN `bigquery-public-data.london_bicycles.cycle_stations` AS end_stn
        ON end_stn.id = b.end_station_id
        LIMIT 700000)

SELECT * from STAGING

6. TensorFlow カーネルで ML モデルをトレーニングする

Vertex AI Workbench には、TensorFlow、PySpark、R などのカーネルを単一のノートブック インスタンスから起動できるコンピューティング互換性レイヤがあります。このラボでは、TensorFlow カーネルを使用してノートブックを作成します。

DataFrame を作成する

クエリが実行されたら、[Copy code for DataFrame] をクリックします。これにより、BigQuery クライアントに接続し、このデータを pandas DataFrame として抽出する Python コードをノートブックに貼り付けることができます。

copy_for_df

次に、Launcher に戻り、TensorFlow 2 ノートブックを作成します。

tf_kernel

ノートブックの最初のセルに、クエリエディタからコピーしたコードを貼り付けます。以下に例を示します。

# The following two lines are only necessary to run once.
# Comment out otherwise for speed-up.
from google.cloud.bigquery import Client, QueryJobConfig
client = Client()

query = """WITH staging AS (
    SELECT
        STRUCT(
            start_stn.name,
            ST_GEOGPOINT(start_stn.longitude, start_stn.latitude) AS POINT,
            start_stn.docks_count,
            start_stn.install_date
        ) AS starting,
        STRUCT(
            end_stn.name,
            ST_GEOGPOINT(end_stn.longitude, end_stn.latitude) AS point,
            end_stn.docks_count,
            end_stn.install_date
        ) AS ending,
        STRUCT(
            rental_id,
            bike_id,
            duration, --seconds
            ST_DISTANCE(
                ST_GEOGPOINT(start_stn.longitude, start_stn.latitude),
                ST_GEOGPOINT(end_stn.longitude, end_stn.latitude)
            ) AS distance, --meters
            start_date,
            end_date
        ) AS bike
        FROM `bigquery-public-data.london_bicycles.cycle_stations` AS start_stn
        LEFT JOIN `bigquery-public-data.london_bicycles.cycle_hire` as b 
        ON start_stn.id = b.start_station_id
        LEFT JOIN `bigquery-public-data.london_bicycles.cycle_stations` AS end_stn
        ON end_stn.id = b.end_station_id
        LIMIT 700000)

SELECT * from STAGING"""
job = client.query(query)
df = job.to_dataframe()

このラボでは、トレーニング時間を短縮するために、データセットを 700, 000 に制限します。ただし、クエリを変更して、データセット全体を試すことは可能です。

次に、必要なライブラリをインポートします。

from datetime import datetime
import pandas as pd
import tensorflow as tf

次のコードを実行して、この演習の ML 部分に必要な列のみを含む縮小された DataFrame を作成します。

values = df['bike'].values
duration = list(map(lambda a: a['duration'], values))
distance = list(map(lambda a: a['distance'], values))
dates = list(map(lambda a: a['start_date'], values))
data = pd.DataFrame(data={'duration': duration, 'distance': distance, 'start_date':dates})
data = data.dropna()

start_date 列は Python の datetime です。この datetime をモデルで直接使用する代わりに、自転車の移動が発生した曜日と時間帯を示す 2 つの新しい特徴を作成します。

data['weekday'] = data['start_date'].apply(lambda a: a.weekday())
data['hour'] = data['start_date'].apply(lambda a: a.time().hour)
data = data.drop(columns=['start_date'])

最後に、わかりやすくするために、期間の列を秒から分に変換します。

data['duration'] = data['duration'].apply(lambda x:float(x / 60))

フォーマットされた DataFrame の最初の数行を調べます。これで、サイクリングの各移動について、移動が発生した曜日と時間、移動距離のデータが取得できるようになりました。この情報から、旅行にかかった時間を予測します。

data.head()

data_head

モデルを作成してトレーニングする前に、データをトレーニング セットと検証セットに分割する必要があります。

# Use 80/20 train/eval split
train_size = int(len(data) * .8)
print ("Train size: %d" % train_size)
print ("Evaluation size: %d" % (len(data) - train_size))

# Split data into train and test sets
train_data = data[:train_size]
val_data = data[train_size:]

TensorFlow モデルを作成する

Keras Functional API を使用して TensorFlow モデルを作成します。入力データを前処理するには、Keras 前処理レイヤ API を使用します。

次のユーティリティ関数は、pandas Dataframe から tf.data.Dataset を作成します。

def df_to_dataset(dataframe, label, shuffle=True, batch_size=32):
  dataframe = dataframe.copy()
  labels = dataframe.pop(label)
  ds = tf.data.Dataset.from_tensor_slices((dict(dataframe), labels))
  if shuffle:
    ds = ds.shuffle(buffer_size=len(dataframe))
  ds = ds.batch(batch_size)
  ds = ds.prefetch(batch_size)
  return ds

上記の関数を使用して、トレーニング用と検証用の 2 つの tf.data.Dataset を作成します。警告が表示されることがありますが、無視してかまいません。

train_dataset = df_to_dataset(train_data, 'duration')
validation_dataset = df_to_dataset(val_data, 'duration')

モデルでは次の前処理レイヤを使用します。

  • Normalization レイヤ: 入力特徴量を特徴量単位で正規化します。
  • IntegerLookup レイヤ: 整数カテゴリ値を整数インデックスに変換します。
  • CategoryEncoding レイヤ: 整数カテゴリ特徴量をワンホット、マルチホット、TF-IDF 密表現のいずれかに変換します。

これらのレイヤはトレーニングできません。代わりに、adapt() メソッドを使用して、トレーニング データに公開することで、前処理レイヤの状態を設定します。

次の関数は、距離特徴で使用できる正規化レイヤを作成します。トレーニング データで adapt() メソッドを使用して、モデルを適合させる前に状態を設定します。これにより、正規化に使用する平均と分散が計算されます。後で検証データセットをモデルに渡すときに、トレーニング データで計算された同じ平均と分散を使用して検証データがスケーリングされます。

def get_normalization_layer(name, dataset):
  # Create a Normalization layer for our feature.
  normalizer = tf.keras.layers.Normalization(axis=None)

  # Prepare a Dataset that only yields our feature.
  feature_ds = dataset.map(lambda x, y: x[name])

  # Learn the statistics of the data.
  normalizer.adapt(feature_ds)

  return normalizer

同様に、次の関数は、時間と曜日の特徴で使用するカテゴリ エンコードを作成します。

def get_category_encoding_layer(name, dataset, dtype, max_tokens=None):
  index = tf.keras.layers.IntegerLookup(max_tokens=max_tokens)

  # Prepare a Dataset that only yields our feature
  feature_ds = dataset.map(lambda x, y: x[name])

  # Learn the set of possible values and assign them a fixed integer index.
  index.adapt(feature_ds)

  # Create a Discretization for our integer indices.
  encoder = tf.keras.layers.CategoryEncoding(num_tokens=index.vocabulary_size())

  # Apply one-hot encoding to our indices. The lambda function captures the
  # layer so we can use them, or include them in the functional model later.
  return lambda feature: encoder(index(feature))

次に、モデルの前処理部分を作成します。まず、各特徴の tf.keras.Input レイヤを作成します。

# Create a Keras input layer for each feature
numeric_col = tf.keras.Input(shape=(1,), name='distance')
hour_col = tf.keras.Input(shape=(1,), name='hour', dtype='int64')
weekday_col = tf.keras.Input(shape=(1,), name='weekday', dtype='int64')

次に、正規化レイヤとカテゴリ エンコード レイヤを作成し、リストに保存します。

all_inputs = []
encoded_features = []

# Pass 'distance' input to normalization layer
normalization_layer = get_normalization_layer('distance', train_dataset)
encoded_numeric_col = normalization_layer(numeric_col)
all_inputs.append(numeric_col)
encoded_features.append(encoded_numeric_col)

# Pass 'hour' input to category encoding layer
encoding_layer = get_category_encoding_layer('hour', train_dataset, dtype='int64')
encoded_hour_col = encoding_layer(hour_col)
all_inputs.append(hour_col)
encoded_features.append(encoded_hour_col)

# Pass 'weekday' input to category encoding layer
encoding_layer = get_category_encoding_layer('weekday', train_dataset, dtype='int64')
encoded_weekday_col = encoding_layer(weekday_col)
all_inputs.append(weekday_col)
encoded_features.append(encoded_weekday_col)

前処理レイヤを定義したら、モデルの残りの部分を定義できます。すべての入力特徴を連結して、密なレイヤに渡します。これは回帰問題なので、出力レイヤは単一のユニットです。

all_features = tf.keras.layers.concatenate(encoded_features)
x = tf.keras.layers.Dense(64, activation="relu")(all_features)
output = tf.keras.layers.Dense(1)(x)
model = tf.keras.Model(all_inputs, output)

最後に、モデルをコンパイルします。

model.compile(optimizer = tf.keras.optimizers.Adam(0.001),
              loss='mean_squared_logarithmic_error')

モデルを定義したので、アーキテクチャを可視化できます。

tf.keras.utils.plot_model(model, show_shapes=True, rankdir="LR")

keras_model

このモデルは、この単純なデータセットに対してはかなり複雑です。これはデモ用です。

コードが実行されることを確認するために、1 エポックだけトレーニングしてみましょう。

model.fit(train_dataset, validation_data = validation_dataset, epochs = 1)

GPU を使用してモデルをトレーニングする

次に、モデルをより長くトレーニングし、ハードウェア スイッチャーを使用してトレーニングを高速化します。Vertex AI Workbench では、インスタンスをシャットダウンせずにハードウェアを変更できます。GPU は必要なときにのみ追加することで、費用を抑えることができます。

ハードウェア プロファイルを変更するには、右上にあるマシンタイプをクリックして [ハードウェアの変更] を選択します。

modify_hardware

[GPU をアタッチ] を選択し、NVIDIA T4 Tensor Core GPU を選択します。

add_gpu

ハードウェアの構成には 5 分ほどかかります。プロセスが完了したら、モデルのトレーニングをもう少し続けます。各エポックにかかる時間が短くなっていることがわかります。

model.fit(train_dataset, validation_data = validation_dataset, epochs = 5)

お疲れさまでした

Vertex AI Workbench を使用して次のことを行う方法を学びました。

  • BigQuery でデータを探索する
  • BigQuery クライアントを使用して Python にデータを読み込む
  • Keras 前処理レイヤと GPU を使用して TensorFlow モデルをトレーニングする

Vertex AI のさまざまな機能の詳細については、こちらのドキュメントをご覧ください。

7. クリーンアップ

ノートブックは、アイドル状態で 60 分が経過するとタイムアウトするように構成されています。このため、インスタンスのシャットダウンを心配する必要はありません。インスタンスを手動でシャットダウンする場合は、Console で [Vertex AI] の [ワークベンチ] セクションにある [停止] ボタンをクリックします。ノートブックを完全に削除する場合は、[削除] ボタンをクリックします。

delete