1. はじめに
Apache Airflow は、DAG を定期的に実行するように設計されていますが、Cloud Storage バケットの変更や Cloud Pub/Sub に push されたメッセージなどのイベントに応答して DAG をトリガーすることもできます。これを実現するには、Cloud Composer DAG を Cloud Functions でトリガーします。
このラボの例では、Cloud Storage バケットで変更が発生するたびにシンプルな DAG を実行します。この DAG は、BashOperator を使用して bash コマンドを実行し、Cloud Storage バケットにアップロードされた内容に関する変更情報を出力します。
このラボを開始する前に、Cloud Composer の概要と Cloud Functions のスタートガイドの Codelab を完了しておくことをおすすめします。「Cloud Composer の概要」Codelab で Composer 環境を作成した場合は、このラボでその環境を使用できます。
作成するもの
この Codelab では、以下のことを行います。
- Google Cloud Storage にファイルをアップロードすると、
- Node.JS ランタイムを使用して Google Cloud Functions をトリガーする
- この関数は、Google Cloud Composer で DAG を実行します。
- Google Cloud Storage バケットへの変更を出力するシンプルな bash コマンドが実行されます。
学習内容
- Google Cloud Functions と Node.js を使用して Apache Airflow DAG をトリガーする方法
必要なもの
- GCP アカウント
- JavaScript に関する基本的な知識
- Cloud Composer、Airflow、Cloud Functions に関する基本的な知識
- CLI コマンドの使いやすさ
2. GCP の設定
プロジェクトを選択または作成する
Google Cloud Platform プロジェクトを選択または作成します。新しいプロジェクトを作成する場合は、こちらの手順を行ってください。
プロジェクト ID をメモしておきます。この ID は後のステップで使用します。
新しいプロジェクトを作成する場合、プロジェクト ID は作成ページのプロジェクト名のすぐ下に表示されます。 | |
すでにプロジェクトを作成している場合は、コンソールのホームページのプロジェクト情報カードで ID を確認できます。 |
API を有効にする
Composer 環境を作成する
次の構成で Cloud Composer 環境を作成します。
その他の構成はすべてデフォルトのままにできます。[作成] をクリックしますComposer 環境の名前と場所をメモしておきます。これは後の手順で必要になります。 |
Cloud Storage バケットを作成する
プロジェクトで、Cloud Storage バケットを作成して次の構成にします。
[作成] をクリックします。準備ができたら、後の手順で使用するため、Cloud Storage バケットの名前をメモしておきます。 |
3. Google Cloud Functions(GCF)の設定
GCF を設定するために、Google Cloud Shell でコマンドを実行します。
Google Cloud は、ノートパソコンから gcloud コマンドライン ツールを使用してリモートで操作できますが、この Codelab では Cloud 上で動作するコマンドライン環境である Google Cloud Shell を使用します。
この Debian ベースの仮想マシンには、必要な開発ツールがすべて揃っています。永続的なホーム ディレクトリが 5 GB 用意されており、Google Cloud で稼働します。ネットワークのパフォーマンスと認証が大幅に向上しています。つまり、この Codelab に必要なのはブラウザだけです(はい、Chromebook で動作します)。
Google Cloud Shell を有効にするには、デベロッパー コンソールの右上にあるボタンをクリックします(環境のプロビジョニングと接続には少し時間がかかります)。 |
Cloud Functions サービス アカウントに blob 署名権限を付与する
Airflow ウェブサーバーを保護する Cloud IAP に対して GCF が認証を行うには、Appspot サービス アカウント GCF に Service Account Token Creator
ロールを付与する必要があります。そのためには、Cloud Shell で次のコマンドを実行します。<your-project-id>
は実際のプロジェクトの名前に置き換えます。
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
たとえば、プロジェクトの名前が my-project
の場合、コマンドは次のようになります。
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
クライアント ID の取得
Cloud IAP に対する認証を行うためのトークンを作成するには、Airflow ウェブサーバーを保護するプロキシのクライアント ID が関数に必要です。Cloud Composer API はこの情報を直接提供しません。代わりに、Airflow ウェブサーバーに未認証のリクエストを行い、リダイレクト URL からクライアント ID を取得します。そのためには、Cloud Shell を使用して Python ファイルを実行し、クライアント ID を取得します。
Cloud Shell で次のコマンドを実行して、GitHub から必要なコードをダウンロードします。
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
このディレクトリがすでに存在することが原因でエラーが発生した場合は、次のコマンドを実行して最新バージョンに更新します。
cd python-docs-samples/ git pull origin master
次のコマンドを実行して、適切なディレクトリに移動します。
cd python-docs-samples/composer/rest
Python コードを実行してクライアント ID を取得します。<your-project-id>
はプロジェクト名、以前に <your-composer-location>
に作成した Composer 環境のロケーション、先ほど <your-composer-environment>
に作成した Composer 環境の名前に置き換えます
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
たとえば、プロジェクト名が my-project
、Composer のロケーションが us-central1
、環境名が my-composer
の場合、コマンドは次のようになります。
python3 get_client_id.py my-project us-central1 my-composer
get_client_id.py
によって、次の処理が行われます。
- Google Cloud での認証
- リダイレクト URI を取得するために、未認証の HTTP リクエストを Airflow ウェブサーバーに送信します。
- そのリダイレクトから
client_id
クエリ パラメータを抽出します - 印刷して使用できます
クライアント ID は次のような形式でコマンドラインに出力されます。
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
4. 関数を作成する
Cloud Shell で次のコマンドを実行して、必要なサンプルコードを含むリポジトリのクローンを作成します。
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
必要なディレクトリに移動し、Cloud Shell を開いたまま次の数ステップの手順を完了します
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
ナビゲーション メニューをクリックし、[Cloud Functions] をクリックして、Google Cloud Functions ページに移動する | |
[ファンクションを作成] をクリックします。ページ上部に | |
関数に「my-function」という名前を付けます。メモリはデフォルトの 256MB のままにします | |
トリガーを「Cloud Storage」に設定し、イベントタイプは「ファイナライズ/作成」のままにして、「Cloud Storage バケットを作成する」手順で作成したバケットを参照します。 | |
ソースコードは [インライン エディタ] に設定されたままにするランタイムを「Node.js 8」に設定 |
Cloud Shell で、次のコマンドを実行します。Cloud Shell エディタで index.js と package.json が開きます。
cloudshell edit index.js package.json
[package.json] タブをクリックし、そのコードをコピーして、Cloud Functions インライン エディタの package.json セクションに貼り付けます。 | |
[実行する関数] を設定するtriggerDag への | |
[index.js] タブをクリックしてコードをコピーし、Cloud Functions インライン エディタの index.js セクションに貼り付けます。 | |
|
Cloud Shell で、次のコマンドを実行します。このとき <your-environment-name> の部分をComposer 環境の名前と <your-composer-region>Cloud Composer 環境が配置されているリージョンに置き換えます。
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
たとえば、環境名が my-composer-environment
で、us-central1
に配置されている場合、コマンドは次のようになります。
gcloud composer environments describe my-composer-environment --location us-central1
出力結果は次のようになります。
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
その出力で、 | |
[もっと見る] をクリックしますプルダウンから地理的に最も近いリージョンを選択 | |
[Retry on Failure] チェックボックスをオンにする | |
[作成] をクリックしますCloud Functions 関数を作成する |
コードのステップ実行
index.js からコピーしたコードは次のようになります。
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
状況を見てみましょう。これには、triggerDag
、authorizeIap
、makeIapPostRequest
の 3 つの関数があります。
triggerDag
は、指定された Cloud Storage バケットに何かをアップロードしたときにトリガーされる関数です。ここでは、他のリクエストで使用される重要な変数(PROJECT_ID
、CLIENT_ID
、WEBSERVER_ID
、DAG_NAME
など)を構成します。authorizeIap
と makeIapPostRequest
を呼び出します。
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
authorizeIap
が、サービス アカウントを使用して「やり取り」を行い、Airflow ウェブサーバーを保護するプロキシにリクエストを行います。makeIapPostRequest
の認証に使用される ID トークンの JWT。
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
makeIapPostRequest
は Airflow ウェブサーバーを呼び出して composer_sample_trigger_response_dag.
をトリガーします。DAG 名は url
パラメータで渡された Airflow ウェブサーバー URL に埋め込まれ、idToken
は authorizeIap
リクエストで取得したトークンです。
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
5. DAG を設定する
Cloud Shell で、サンプル ワークフローがあるディレクトリに移動します。これは、クライアント ID を取得する手順で GitHub からダウンロードした python-docs-samples の一部です。
cd cd python-docs-samples/composer/workflows
DAG を Composer にアップロードする
次のコマンドを使用して、サンプル DAG を Composer 環境の DAG ストレージ バケットにアップロードします。<environment_name>
は Composer 環境の名前、<location>
は Composer 環境が配置されているリージョンの名前です。trigger_response_dag.py
は作業対象の DAG です。
gcloud composer environments storage dags import \ --environment <environment_name> \ --location <location> \ --source trigger_response_dag.py
たとえば、Composer 環境の名前が my-composer
で、us-central1
にある場合、コマンドは次のようになります。
gcloud composer environments storage dags import \ --environment my-composer \ --location us-central1 \ --source trigger_response_dag.py
DAG のステップ実行
trigger_response.py
の DAG コードは次のようになります。
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
default_args
セクションには、Apache Airflow の BaseOperator モデルで必要とされるデフォルトの引数が含まれています。Apache Airflow DAG では、これらのパラメータを含むこのセクションが表示されます。現在、owner
は Composer Example
に設定されていますが、必要に応じて名前に変更できます。depends_on_past
は、この DAG が以前の DAG に依存していないことを示しています。email
、email_on_failure
、email_on_retry
の 3 つのメール セクションは、この DAG のステータスに基づいてメール通知が送信されないように設定されています。retries
が 1 に設定されているため、DAG は 1 回だけ再試行し、retry_delay
に従って 5 分後に再試行します。start_date
は通常、DAG を実行するタイミングを schedule_interval
(後で設定)と組み合わせて使用しますが、この DAG の場合は関係ありません。2017 年 1 月 1 日に設定されていますが、過去の日付に設定することもできます。
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG
セクションでは、実行される DAG を構成します。これは、タスク ID composer_sample_trigger_response_dag
、default_args
セクションのデフォルト引数、そして最も重要なことに、None
の schedule_interval
で実行されます。この特定の DAG を Cloud Functions の関数でトリガーするため、schedule_interval
は None
に設定されています。このため、default_args
の start_date
は意味がありません。
DAG の実行時に、print_gcs_info
変数の指定に従って、DAG の構成が出力されます。
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
6. 関数をテストする
Composer 環境を開き、環境名の行で [Airflow] リンクをクリックします。 | |
| |
別のタブを開き、前に作成し、Cloud Functions の関数のトリガーとして指定した Cloud Storage バケットに任意のファイルをアップロードします。そのためには、コンソールまたは gsutil コマンドを使用します。 | |
Airflow UI のタブに戻り、[Graph View] をクリックします。 | |
緑色の枠線で囲まれている | |
[ログを表示] をクリックします。[Rules] をクリックすると | |
Cloud Storage バケットにアップロードしたファイルに関する情報がログに表示されます。 |
これで、Node.js と Google Cloud Functions を使用して Airflow DAG をトリガーしました。
7. クリーンアップ
このクイックスタートで使用したリソースについて GCP アカウントに課金されないようにする手順は次のとおりです。
- (省略可)データを保存するには、Cloud Composer 環境の Cloud Storage バケットと、このクイックスタート用に作成したストレージ バケットからデータをダウンロードします。
- 作成した環境の Cloud Storage バケットを削除します。
- Cloud Composer 環境を削除します。環境を削除しても、その環境のストレージ バケットは削除されません。
- (省略可)サーバーレス コンピューティングでは、毎月最初の 200 万回の呼び出しは無料です。関数をゼロにスケーリングしても課金されません(詳細については、料金をご覧ください)。Cloud Functions の関数を削除する場合は、[削除] をクリックします。関数の概要ページの右上で
必要に応じて、プロジェクトを削除することもできます。
- GCP コンソールで [プロジェクト] ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。