1. Google Cloud Storage と Dataflow を使用して Snowflake から Spanner へのリバース ETL パイプラインを構築する
はじめに
このラボでは、リバース ETL パイプラインを構築します。従来、ETL(抽出、変換、読み込み)パイプラインは、分析のためにオペレーショナル データベースから Snowflake などのデータ ウェアハウスにデータを移動していました。 リバース ETL パイプラインは、その逆を行います。キュレーションされた処理済みデータをデータ ウェアハウスからオペレーショナル システムに戻し、アプリケーションの強化、ユーザー向け機能の提供、リアルタイムの意思決定に使用できるようにします。
目標は、サンプル データセットを Snowflake テーブルから Spanner に移動することです。Spanner は、高可用性アプリケーションに最適なグローバル分散リレーショナル データベースです。
これを実現するために、Google Cloud Storage(GCS)と Dataflow を中間ステップとして使用します。フローとこのアーキテクチャの理由の内訳は次のとおりです。
- Snowflake から Google Cloud Storage(GCS)への CSV 形式:
- 最初のステップは、オープンでユニバーサルな形式で Snowflake からデータを取得することです。CSV へのエクスポートは、ポータブル データファイルを作成するための一般的で簡単な方法です。これらのファイルは、スケーラブルで耐久性のあるオブジェクト ストレージ ソリューションを提供する GCS にステージングします。
- GCS から Spanner(Dataflow 経由):
- GCS から読み取って Spanner に書き込むカスタム スクリプトを作成する代わりに、フルマネージド データ処理サービスである Google Dataflow を使用します。Dataflow には、この種のタスク専用の事前構築済みテンプレートが用意されています。「GCS Text to Cloud Spanner」テンプレートを使用すると、データ処理コードを記述せずに、高スループットの並列化されたデータ インポートが可能になり、開発時間を大幅に短縮できます。
学習内容
- データを Snowflake に読み込む方法
- GCS バケットを作成する方法
- Snowflake テーブルを CSV 形式で GCS にエクスポートする方法
- Spanner インスタンスを設定する方法
- Dataflow を使用して CSV テーブルを Spanner に読み込む方法
2. 設定、要件、制限事項
前提条件
- Snowflake アカウント。
- Spanner、Cloud Storage、Dataflow API が有効になっている Google Cloud アカウント。
- ウェブブラウザから Google Cloud コンソール にアクセスできること。
- Google Cloud CLI がインストールされたターミナル。
- Google Cloud 組織で
iam.allowedPolicyMemberDomainsポリシーが有効になっている場合は、外部ドメインのサービス アカウントを許可するために、管理者が例外を付与する必要があります。これについては、該当する場合に後のステップで説明します。
Google Cloud Platform IAM の権限
この Codelab のすべてのステップを実行するには、Google アカウントに次の権限が必要です。
サービス アカウント | ||
| サービス アカウントの作成を許可します。 | |
Spanner | ||
| 新しい Spanner インスタンスの作成を許可します。 | |
| DDL ステートメントを実行して作成することを許可します。 | |
| DDL ステートメントを実行してデータベースにテーブルを作成することを許可します。 | |
Google Cloud Storage | ||
| エクスポートされた Parquet ファイルを保存する新しい GCS バケットの作成を許可します。 | |
| エクスポートされた Parquet ファイルを GCS バケットに書き込むことを許可します。 | |
| BigQuery が GCS バケットから Parquet ファイルを読み取ることを許可します。 | |
| BigQuery が GCS バケット内の Parquet ファイルを一覧表示することを許可します。 | |
Dataflow | ||
| Dataflow からのワークアイテムの取得を許可します。 | |
| Dataflow ワーカーが Dataflow サービスにメッセージを送信することを許可します。 | |
| Dataflow ワーカーがログエントリを Google Cloud Logging に書き込むことを許可します。 | |
便宜上、これらの権限を含む事前定義ロールを使用できます。
|
|
|
|
|
|
|
|
制限事項
システム間でデータを移動する場合は、データ型の違いに注意することが重要です。
- Snowflake から CSV: エクスポート時に、Snowflake のデータ型は標準のテキスト表現に変換されます。
- CSV から Spanner: インポートする場合は、ターゲットの Spanner データ型が CSV ファイルの文字列表現と互換性があることを確認する必要があります。このラボでは、一般的な型マッピングについて説明します。
再利用可能なプロパティを設定する
このラボでは、いくつかの値を繰り返し使用します。これを簡単にするために、これらの値をシェル変数に設定して後で使用します。
- GCP_REGION - GCP リソースが配置される特定のリージョン。リージョンの一覧については、こちらをご覧ください。
- GCP_PROJECT - 使用する GCP プロジェクト ID。
- GCP_BUCKET_NAME - 作成する GCS バケットの名前。データファイルが保存されます。
- SPANNER_INSTANCE - Spanner インスタンスに割り当てる名前
- SPANNER_DB - Spanner インスタンス内のデータベースに割り当てる名前
export GCP_REGION = <GCP REGION HERE>
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>
Google Cloud
このラボでは、Google Cloud プロジェクトが必要です。
Google Cloud プロジェクト
プロジェクトは、Google Cloud の基本的な組織単位です。管理者が使用するプロジェクトを提供している場合は、このステップをスキップできます。
プロジェクトは、次のように CLI を使用して作成できます。
gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT
詳細については、プロジェクトの作成と管理をこちらでご覧ください。
3. Spanner を設定する
Spanner の使用を開始するには、インスタンスとデータベースをプロビジョニングする必要があります。Spanner インスタンスの構成と作成の詳細については、こちらをご覧ください。
インスタンスを作成する
gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE
データベースを作成する
gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE
4. Google Cloud Storage バケットを作成する
Google Cloud Storage(GCS)は、Snowflake で生成された CSV データファイルを Spanner にインポートする前に一時的に保存するために使用されます。
バケットを作成する
次のコマンドを使用して、特定のリージョン(us-central1 など)にストレージ バケットを作成します。
gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION
バケットの作成を確認する
コマンドが成功したら、すべてのバケットを一覧表示して結果を確認します。新しいバケットが結果リストに表示されます。通常、バケット参照はバケット名の前に接頭辞 gs:// が付いて表示されます。
gcloud storage ls | grep gs://$GCS_BUCKET_NAME
書き込み権限をテストする
このステップでは、ローカル環境が正しく認証され、新しく作成されたバケットにファイルを書き込むために必要な権限があることを確認します。
echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt
アップロードしたファイルを確認する
バケット内のオブジェクトを一覧表示します。アップロードしたファイルの完全パスが表示されます。
gcloud storage ls gs://$GCS_BUCKET_NAME
次の出力が表示されます。
gs://$GCS_BUCKET_NAME/hello.txt
バケット内のオブジェクトの内容を表示するには、gcloud storage cat を使用します。
gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt
ファイルの内容が表示されます。
Hello, GCS
テストファイルをクリーンアップする
Cloud Storage バケットが設定されました。一時的なテストファイルを削除できます。
gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt
出力で削除を確認します。
Removing gs://$GCS_BUCKET_NAME/hello.txt... / [1 objects] Operation completed over 1 objects.
5. Snowflake から GCS にエクスポートする
このラボでは、意思決定支援システムの業界標準ベンチマークである TPC-H データセット を使用します。このデータセットは、すべての Snowflake アカウントでデフォルトで使用できます。
Snowflake でデータを準備する
Snowflake アカウントにログインし、新しいワークシートを作成します。
Snowflake が提供するサンプル TPC-H データは、権限により共有ロケーションから直接エクスポートできません。まず、ORDERS テーブルを別のデータベースとスキーマにコピーする必要があります。
データベースの作成
- 左側のサイドメニューの [Horizon Catalog] で [カタログ] にカーソルを合わせ、[データベース エクスプローラ] をクリックします。
- [**データベース**] ページで、右上の [**\+ データベース**] ボタンをクリックします。
- 新しいデータベースに
codelabs_retl_dbという名前を付けます。
ワークシートを作成する
データベースに対して SQL コマンドを実行するには、ワークシートが必要です。
ワークシートを作成するには:
- 左側のサイドメニューの [Work with data] で [Projects] にカーソルを合わせ、[Workspaces] をクリックします。
- [My Workspaces] サイドバーで、[+ Add new] ボタンをクリックし、[SQL File] を選択します。
USE DATABASE codelabs_retl_db;
CREATE SCHEMA codelabs_retl_export;
CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT
n.n_name AS nation_name,
c.c_mktsegment AS market_segment,
YEAR(o.o_orderdate) AS order_year,
o.o_orderpriority AS order_priority,
COUNT(o.o_orderkey) AS total_order_count,
ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c
ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
ON c.c_nationkey = n.n_nationkey
GROUP BY
n.n_name,
c.c_mktsegment,
YEAR(o.o_orderdate),
o.o_orderpriority;
SELECT COUNT(*) FROM regional_sales_csv;
出力には、4375 行がコピーされたことが示されます。
GCS にアクセスするように Snowflake を構成する
Snowflake が GCS バケットにデータを書き込めるようにするには、ストレージ統合 とステージ を作成する必要があります。
- ストレージ統合: 生成されたサービス アカウントと外部クラウド ストレージの認証情報を保存する Snowflake オブジェクト。
- ステージ: ストレージ統合を使用して認証を処理する、特定のバケットとパスを参照する名前付きオブジェクト。データの読み込みとアンロードのオペレーションに便利な名前付きロケーションを提供します。
まず、ストレージ統合を作成します。
CREATE OR REPLACE STORAGE INTEGRATION gcs_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
-- Grant Snowflake permission to write to a specific path in your bucket.
STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');
次に、統合を記述して、Snowflake が作成したサービス アカウントを取得します。
DESC STORAGE INTEGRATION gcs_int;
結果で、STORAGE_GCP_SERVICE_ACCOUNT の値をコピーします。メールアドレスのように表示されます。
このサービス アカウントをシェル インスタンスの環境変数に保存して、後で再利用できるようにします。
export GCP_SERVICE_ACCOUNT=<Your service account>
Snowflake に GCS の権限を付与する
次に、Snowflake サービス アカウントに GCS バケットへの書き込み権限を付与する必要があります。
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.objectAdmin"
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.legacyBucketReader"
ステージを作成してデータをエクスポートする
権限が設定されたら、Snowflake ワークシートに戻ります。統合を使用するステージを作成し、COPY INTO コマンドを使用して SAMPLE_ORDERS テーブルデータをそのステージにエクスポートします。
CREATE OR REPLACE STAGE retl_gcs_stage
URL = 'gcs://<Your bucket name>/regional_sales_csv'
STORAGE_INTEGRATION = gcs_int
-- Define the output file format
FILE_FORMAT = (TYPE = 'CSV');
COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);
[結果] ペインに rows_unloaded が表示され、値が 1500000 になります。
GCS のデータを確認する
GCS バケットを確認して、Snowflake が作成したファイルを確認します。これにより、エクスポートが成功したことが確認できます。
gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/
1 つ以上の番号付き CSV ファイルが表示されます。
gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv ...
6. Dataflow を使用してデータを Spanner に読み込む
データが GCS に保存されたので、Dataflow を使用して Spanner にインポートします。Dataflow は、ストリーム データとバッチデータの処理を行う Google Cloud のフルマネージド サービスです。GCS から Spanner にテキスト ファイルをインポートするために特別に設計された、事前構築済みの Google テンプレートを使用します。
Spanner テーブルを作成する
まず、Spanner に宛先テーブルを作成します。スキーマは CSV ファイル内のデータと互換性がある必要があります。
gcloud spanner databases ddl update $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--ddl="$(cat <<EOF
CREATE TABLE regional_sales (
nation_name STRING(MAX),
market_segment STRING(MAX),
order_year INT64,
order_priority STRING(MAX),
total_order_count INT64,
total_revenue NUMERIC,
unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"
Dataflow マニフェストを作成する
Dataflow テンプレートには「マニフェスト」ファイルが必要です。これは、ソース データファイルの場所と、読み込む Spanner テーブルをテンプレートに伝える JSON ファイルです。
新しい regional_sales_manifest.json を定義して GCS バケットにアップロードします。
cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json
{
"tables": [
{
"table_name": "regional_sales",
"file_patterns": [
"gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
]
}
]
}
EOF
Dataflow API を有効にする
Dataflow を使用する前に、有効にする必要があります。有効にするには、
gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT
Dataflow ジョブを作成して実行する
インポート ジョブを実行する準備が整いました。このコマンドは、GCS_Text_to_Cloud_Spanner テンプレートを使用して Dataflow ジョブを起動します。
コマンドは長く、複数のパラメータがあります。内訳は次のとおりです。
| GCS 上の事前構築済みテンプレートのパス。 | |
| Dataflow ジョブが実行されるリージョン。 | |
| ||
| ターゲットの Spanner インスタンスとデータベース。 | |
| 作成したマニフェスト ファイルの GCS パス。 | |
gcloud dataflow jobs run spanner-import-from-gcs \
--gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
--region=$GCP_REGION \
--staging-location=gs://$GCS_BUCKET_NAME/staging \
--parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'
Dataflow ジョブのステータスは、次のコマンドで確認できます。
gcloud dataflow jobs list \
--filter="name:spanner-import-from-gcs" \
--region="$GCP_REGION" \
--sort-by="~creationTime" \
--limit=1
ジョブが完了するまでに約 5 分かかります。
Spanner のデータを確認する
Dataflow ジョブが成功したら、データが Spanner に読み込まれたことを確認します。
まず、行数を確認します。4375 にする必要があります。
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'
次に、いくつかの行をクエリしてデータを検査します。
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'
Snowflake テーブルからインポートされたデータが表示されます。
7. クリーンアップ
Spanner をクリーンアップする
Spanner データベースとインスタンスを削除します。
gcloud spanner instances delete $SPANNER_INSTANCE
GCS をクリーンアップする
データをホストするために作成した GCS バケットを削除します。
gcloud storage rm --recursive gs://$GCS_BUCKET_NAME
Snowflake をクリーンアップする
データベースをドロップ
- 左側のサイドメニューの [Horizon Catalog] で [カタログ] にカーソルを合わせ、[データベース エクスプローラ] をクリックします。
CODELABS_RETL_DBデータベースの右側にある [...] をクリックしてオプションを開き、[Drop] を選択します。- 確認ダイアログが表示されたら、[Drop Database] を選択します。
ワークブックを削除する
- 左側のサイドメニューの [Work with data] で [Projects] にカーソルを合わせ、[Workspaces] をクリックします。
- [My Workspace] サイドバーで、このラボで使用したさまざまなワークスペース ファイルにカーソルを合わせると、[...] その他のオプションが表示されるので、クリックします。
- [削除] を選択し、表示された確認ダイアログで [削除] をもう一度選択します。
- このラボ用に作成したすべての SQL ワークスペース ファイルに対してこれを行います。
8. 完了
以上で、この Codelab は完了です。
学習した内容
- データを Snowflake に読み込む方法
- GCS バケットを作成する方法
- Snowflake テーブルを CSV 形式で GCS にエクスポートする方法
- Spanner インスタンスを設定する方法
- Dataflow を使用して CSV テーブルを Spanner に読み込む方法