1. 簡介
Apache Airflow 可以定期執行 DAG,但您也可以為發生事件時觸發 DAG,例如 Cloud Storage 值區中的變更或推送至 Cloud Pub/Sub 的訊息。為此,您可以透過 Cloud Functions 觸發 Cloud Composer DAG。
本研究室中的範例,每次在 Cloud Storage 值區中發生變更時,都會執行簡單的 DAG。這個 DAG 會使用 BashOperator 執行 bash 指令,其中列印已上傳至 Cloud Storage 值區的內容變更資訊。
開始本研究室前,建議您完成 Cloud Composer 簡介和開始使用 Cloud Functions 的程式碼研究室。如果您在 Cloud Composer 簡介程式碼研究室中建立 Composer 環境,則可在本研究室中使用該環境。
建構內容
在本程式碼研究室中,您將進行以下作業:
- 將檔案上傳至 Google Cloud Storage,
- 使用 Node.JS 執行階段觸發 Google Cloud 函式
- 這個函式將在 Google Cloud Composer 中執行 DAG
- 這會執行簡單的 bash 指令,將變更輸出至 Google Cloud Storage 值區
課程內容
- 如何使用 Google Cloud Functions 和 Node.js 觸發 Apache Airflow DAG
軟硬體需求
- GCP 帳戶
- 對 JavaScript 有基本瞭解
- 對 Cloud Composer/Airflow 和 Cloud Functions 有基本瞭解
- 習慣使用 CLI 指令
2. 設定 GCP
選取或建立專案
選取或建立 Google Cloud Platform 專案。如要建立新專案,請按照這裡的步驟操作。
記下專案 ID,後續步驟將會用到。
建立新專案時,專案 ID 會顯示在建立頁面的「專案名稱」下方 | |
如果您已建立專案,即可在「專案資訊卡」的控制台首頁中找到專案 ID |
啟用 API
建立 Composer 環境
使用下列設定建立 Cloud Composer 環境:
所有其他設定均可保留預設設定。按一下「建立」並記下 Composer 環境名稱和位置,後續步驟將會用到。 |
建立 Cloud Storage 值區
在專案中,使用下列設定建立 Cloud Storage 值區:
按下「建立」準備好之後,請務必記下 Cloud Storage 值區名稱以供後續步驟使用。 |
3. 設定 Google Cloud Functions (GCF)
如要設定 GCF,我們會在 Google Cloud Shell 中執行指令。
雖然您可以在筆記型電腦上使用 gcloud 指令列工具遠端操作 Google Cloud,但在本程式碼研究室中,我們會使用 Google Cloud Shell,這是 Cloud 環境中的指令列環境。
這種以 Debian 為基礎的虛擬機器,搭載各種您需要的開發工具。提供永久的 5 GB 主目錄,而且在 Google Cloud 中運作,因此能大幅提升網路效能和驗證成效。換言之,本程式碼研究室只需要在 Chromebook 上運作即可。
如要啟用 Google Cloud Shell,請點選開發人員控制台右上方的按鈕 (應該只需幾分鐘就能佈建並連線至環境): |
將 blob 簽署權限授予 Cloud Functions 服務帳戶
為了讓 GCF 向 Cloud IAP 進行驗證 (保護 Airflow 網路伺服器的 Proxy),您必須將 Service Account Token Creator
角色授予 Appspot 服務帳戶 GCF。方法是在 Cloud Shell 中執行下列指令,並將 <your-project-id>
替換成您的專案名稱。
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
舉例來說,如果專案名稱是 my-project
,指令會是:
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
取得用戶端 ID
如要建構權杖來向 Cloud IAP 進行驗證,您必須為函式提供保護 Airflow 網路伺服器的 Proxy 用戶端 ID。Cloud Composer API 不會直接提供這項資訊。而是可以向 Airflow 網路伺服器發出未經驗證的要求,並從重新導向網址擷取用戶端 ID。為此,我們使用 Cloud Shell 執行 Python 檔案來擷取用戶端 ID。
在 Cloud Shell 中執行下列指令,從 GitHub 下載必要程式碼
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
如果錯誤訊息是因為這個目錄已存在,請執行下列指令,將目錄更新為最新版本
cd python-docs-samples/ git pull origin master
執行命令,切換至適當的目錄
cd python-docs-samples/composer/rest
執行 Python 程式碼來取得用戶端 ID,將 <your-project-id>
替換成您的專案名稱、您先前為 <your-composer-location>
建立的 Composer 環境位置,以及您先前為 <your-composer-environment>
建立的 Composer 環境名稱
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
舉例來說,如果您的專案名稱為 my-project
,Composer 位置為 us-central1
,環境名稱為 my-composer
,指令就會是
python3 get_client_id.py my-project us-central1 my-composer
get_client_id.py
會執行以下動作:
- 透過 Google Cloud 進行驗證
- 向 Airflow 網路伺服器發出未經驗證的 HTTP 要求,以取得重新導向 URI
- 從該重新導向中擷取
client_id
查詢參數 - 列印後可供使用
您的用戶端 ID 會顯示在指令列中,如下所示:
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
4. 建立函式
在 Cloud Shell 中,執行下列指令以複製存放區與必要的程式碼範例
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
變更至必要的目錄,並且讓 Cloud Shell 保持開啟狀態,同時完成後續步驟
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
依序按一下導覽選單和「Cloud Functions」來前往「Google Cloud Functions」頁面。 | |
按一下「建立函式」網頁頂端 | |
將函式命名為「my-function」保留預設的 256 MB 記憶體 | |
將觸發條件設為「Cloud Storage」,將事件類型保留為「Finalize/Create」,然後瀏覽至您在「建立 Cloud Storage 值區」步驟中建立的值區。 | |
保留原始碼的「Inline Editor」(內嵌編輯器) 設定並將執行階段設為「Node.js 8」 |
在 Cloud Shell 中執行下列指令。系統會在 Cloud Shell 編輯器中開啟 index.js 和 package.json
cloudshell edit index.js package.json
按一下「package.json」分頁標籤,複製這組程式碼並貼到 Cloud Functions 內嵌編輯器中的 package.json 部分 | |
設定「Function to Execute」觸發 Dag | |
按一下 index.js 分頁標籤、複製程式碼,然後貼到 Cloud Functions 內嵌編輯器中的 index.js 部分。 | |
將 |
在 Cloud Shell 中執行下列指令,並將 <your-environment-name>命名為 Composer 環境名稱和 <your-composer-region>改成 Composer 環境所在的區域。
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
舉例來說,如果環境名為 my-composer-environment
且位於 us-central1
,指令如下:
gcloud composer environments describe my-composer-environment --location us-central1
輸出內容應如下所示:
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
在該輸出內容中,找出名為 | |
按一下「更多」下拉式選單,然後選擇離您最近的地理區域 | |
檢查「失敗時重試」 | |
按一下「建立」建立 Cloud 函式 |
逐步執行程式碼
您從 index.js 複製的程式碼如下所示:
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
一起來看看發生了什麼事。這裡有三個函式:triggerDag
、authorizeIap
和 makeIapPostRequest
triggerDag
是將內容上傳至指定的 Cloud Storage 值區時觸發的函式。在這裡,我們可以設定其他要求中使用的重要變數,例如 PROJECT_ID
、CLIENT_ID
、WEBSERVER_ID
和 DAG_NAME
。呼叫 authorizeIap
和 makeIapPostRequest
。
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
authorizeIap
會使用服務帳戶向保護 Airflow 網路伺服器的 Proxy 發出要求,並使用服務帳戶進行「交換」用於驗證 makeIapPostRequest
的 ID 權杖 JWT。
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
makeIapPostRequest
會呼叫 Airflow 網路伺服器以觸發 composer_sample_trigger_response_dag.
。DAG 名稱內嵌在透過 url
參數傳入的 Airflow 網路伺服器網址中,idToken
則是我們在 authorizeIap
要求中取得的權杖。
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
5. 設定 DAG
在 Cloud Shell 中,變更為包含工作流程範例的目錄。屬於您在「取得用戶端 ID」步驟中從 GitHub 下載的 python-docs-samples 的一部分。
cd cd python-docs-samples/composer/workflows
將 DAG 上傳至 Composer
請使用下列指令,將 DAG 範例上傳至 Composer 環境的 DAG 儲存空間值區,其中 <environment_name>
是 Composer 環境名稱,<location>
則是其所在地區的名稱。trigger_response_dag.py
是我們要使用的 DAG。
gcloud composer environments storage dags import \ --environment <environment_name> \ --location <location> \ --source trigger_response_dag.py
舉例來說,如果您的 Composer 環境名為 my-composer
,且位於 us-central1
,您的指令如下:
gcloud composer environments storage dags import \ --environment my-composer \ --location us-central1 \ --source trigger_response_dag.py
逐步完成 DAG
trigger_response.py
中的 DAG 程式碼如下所示
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
default_args
區段包含 Apache Airflow 中 BaseOperator 模型所需的預設引數。您會在任何 Apache Airflow DAG 中看到含有這些參數的部分。owner
目前設為 Composer Example
,但您可以視需求將其變更為您的名稱。depends_on_past
表示這個 DAG 未依賴任何先前的 DAG。已設定 email
、email_on_failure
和 email_on_retry
這三個電子郵件區段,以免系統根據此 DAG 的狀態傳送任何電子郵件通知。由於 retries
設為 1,且每 retry_delay
五分鐘後,DAG 只會重試一次。start_date
通常會決定 DAG 應搭配其 schedule_interval
(稍後設定) 的執行時機,但就本 DAG 而言,則不相關。日期是 2017 年 1 月 1 日,但可設為過去的日期。
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG
區段會設定要執行的 DAG。此指令將以工作 ID composer_sample_trigger_response_dag
執行,也就是 default_args
區段中的預設引數,最重要的是,其中 schedule_interval
為 None
。我們會透過 Cloud 函式觸發這個特定的 DAG,因此 schedule_interval
已設為 None
。因此 default_args
中的 start_date
與此無關。
執行時,DAG 會按照 print_gcs_info
變數中的指示輸出設定。
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
6. 測試函式
開啟 Composer 環境,然後在含有環境名稱的列中按一下 Airflow 連結 | |
按一下 | |
開啟另一個分頁,並將任何檔案上傳至先前建立且指定為 Cloud 函式觸發條件的 Cloud Storage 值區。您可以透過主控台或使用 gsutil 指令進行這項作業。 | |
返回含有 Airflow UI 的分頁,然後按一下「Graph View」 | |
按一下應以綠色外框顯示的 | |
按一下 [查看記錄](位於選單右上角) | |
您會在記錄檔中看到已上傳至 Cloud Storage 值區的檔案相關資訊。 |
恭喜!您已成功透過 Node.js 和 Google Cloud Functions 觸發 Airflow DAG!
7. 清除
如要避免系統向您的 GCP 帳戶收取您在本快速入門導覽課程中所用資源的相關費用,請按照下列指示操作:
- (選用) 如要儲存資料,請從 Cloud Composer 環境的 Cloud Storage 值區以及為本快速入門導覽課程建立的儲存空間值區下載資料。
- 刪除環境和您建立的 Cloud Storage 值區
- 刪除 Cloud Composer 環境。請注意,刪除環境並不會刪除環境的儲存空間值區。
- (選用) 採用無伺服器運算技術時,每個月前 200 萬次叫用不會產生費用,而您將函式擴充為零時,無須支付費用 (詳情請參閱定價說明)。不過,如要刪除 Cloud 函式,請點選「刪除」位於函式總覽頁面的右上角
您也可以選擇刪除專案:
- 在 GCP 控制台中,前往「專案」頁面。
- 在專案清單中,選取要刪除的專案,然後按一下「Delete」(刪除)。
- 在方塊中輸入專案 ID,然後按一下「Shut down」(關閉) 即可刪除專案。