1. บทนำ
Apache Airflow ออกแบบมาเพื่อเรียกใช้ DAG ตามกำหนดการปกติ แต่คุณยังทริกเกอร์ DAG เพื่อตอบสนองต่อเหตุการณ์ต่างๆ ได้ด้วย เช่น การเปลี่ยนแปลงในที่เก็บข้อมูล Cloud Storage หรือข้อความที่พุชไปยัง Cloud Pub/Sub หากต้องการดำเนินการนี้ คุณสามารถทริกเกอร์ DAG ของ Cloud Composer ได้โดยใช้ Cloud Functions
ตัวอย่างใน Lab นี้จะเรียกใช้ DAG อย่างง่ายทุกครั้งที่มีการเปลี่ยนแปลงใน Bucket ของ Cloud Storage DAG นี้ใช้ BashOperator เพื่อเรียกใช้คำสั่ง Bash ที่พิมพ์ข้อมูลการเปลี่ยนแปลงเกี่ยวกับสิ่งที่อัปโหลดไปยัง Bucket ของ Cloud Storage
ก่อนเริ่มแล็บนี้ ขอแนะนำให้ทำ Codelab ข้อมูลเบื้องต้นเกี่ยวกับ Cloud Composer และการเริ่มต้นใช้งาน Cloud Functions ให้เสร็จสมบูรณ์ หากสร้างสภาพแวดล้อม Composer ใน Codelab "ข้อมูลเบื้องต้นเกี่ยวกับ Cloud Composer" คุณจะใช้สภาพแวดล้อมนั้นใน Lab นี้ได้
สิ่งที่คุณจะสร้าง
ใน Codelab นี้ คุณจะได้ทำสิ่งต่อไปนี้
- อัปโหลดไฟล์ไปยัง Google Cloud Storage ซึ่งจะ
- ทริกเกอร์ Google Cloud Function โดยใช้รันไทม์ Node.JS
- ฟังก์ชันนี้จะเรียกใช้ DAG ใน Google Cloud Composer
- ซึ่งจะเรียกใช้คำสั่ง Bash อย่างง่ายที่พิมพ์การเปลี่ยนแปลงไปยัง Bucket ของ Google Cloud Storage

สิ่งที่คุณจะได้เรียนรู้
- วิธีทริกเกอร์ DAG ของ Apache Airflow โดยใช้ Google Cloud Functions + Node.js
สิ่งที่คุณต้องมี
- บัญชี GCP
- ความเข้าใจพื้นฐานเกี่ยวกับ JavaScript
- ความรู้พื้นฐานเกี่ยวกับ Cloud Composer/Airflow และ Cloud Functions
- ความสะดวกในการใช้คำสั่ง CLI
2. การตั้งค่า GCP
เลือกหรือสร้างโปรเจ็กต์
เลือกหรือสร้างโปรเจ็กต์ Google Cloud Platform หากจะสร้างโปรเจ็กต์ใหม่ ให้ทำตามขั้นตอนที่นี่
จดรหัสโปรเจ็กต์ไว้เพื่อใช้ในขั้นตอนต่อๆ ไป
หากคุณสร้างโปรเจ็กต์ใหม่ คุณจะเห็นรหัสโปรเจ็กต์อยู่ใต้ชื่อโปรเจ็กต์ในหน้าการสร้าง |
|
หากสร้างโปรเจ็กต์แล้ว คุณจะดูรหัสได้ในหน้าแรกของคอนโซลในการ์ดข้อมูลโปรเจ็กต์ |
|
เปิดใช้ API
|
สร้างสภาพแวดล้อมคอมโพสเซอร์
สร้างสภาพแวดล้อม Cloud Composer ที่มีการกำหนดค่าต่อไปนี้
การกำหนดค่าอื่นๆ ทั้งหมดสามารถคงค่าเริ่มต้นไว้ได้ คลิก "สร้าง" ที่ด้านล่าง จดชื่อและตำแหน่งของสภาพแวดล้อม Composer ไว้ เนื่องจากคุณจะต้องใช้ในขั้นตอนต่อๆ ไป |
|
สร้างที่เก็บข้อมูล Cloud Storage
สร้างที่เก็บข้อมูล Cloud Storage ในโปรเจ็กต์ด้วยการกำหนดค่าต่อไปนี้
กด "สร้าง" เมื่อพร้อมแล้ว โปรดจดชื่อของ Bucket ใน Cloud Storage ไว้สำหรับขั้นตอนต่อๆ ไป |
|
3. การตั้งค่า Google Cloud Functions (GCF)
หากต้องการตั้งค่า GCF เราจะเรียกใช้คำสั่งใน Google Cloud Shell
แม้ว่าคุณจะใช้งาน Google Cloud จากระยะไกลจากแล็ปท็อปได้โดยใช้เครื่องมือบรรทัดคำสั่ง gcloud แต่ใน Codelab นี้เราจะใช้ Google Cloud Shell ซึ่งเป็นสภาพแวดล้อมบรรทัดคำสั่งที่ทำงานในระบบคลาวด์
เครื่องเสมือนที่ใช้ Debian นี้มาพร้อมเครื่องมือพัฒนาทั้งหมดที่คุณต้องการ โดยมีไดเรกทอรีหลักแบบถาวรขนาด 5 GB และทำงานบน Google Cloud ซึ่งช่วยเพิ่มประสิทธิภาพเครือข่ายและการตรวจสอบสิทธิ์ได้อย่างมาก ซึ่งหมายความว่าคุณจะต้องมีเพียงเบราว์เซอร์เท่านั้นสำหรับโค้ดแล็บนี้ (ใช่แล้ว ใช้ได้ใน Chromebook)
หากต้องการเปิดใช้งาน Google Cloud Shell ให้คลิกปุ่มที่ด้านขวาบนจากคอนโซลนักพัฒนาแอป (การจัดสรรและเชื่อมต่อกับสภาพแวดล้อมจะใช้เวลาเพียงไม่กี่นาที) |
|
ให้สิทธิ์การลงนามใน Blob แก่บัญชีบริการ Cloud Functions
หากต้องการให้ GCF ตรวจสอบสิทธิ์กับ Cloud IAP ซึ่งเป็นพร็อกซีที่ปกป้องเว็บเซิร์ฟเวอร์ Airflow คุณต้องให้บทบาท Service Account Token Creator แก่บัญชีบริการ Appspot GCF โดยเรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell และแทนที่ชื่อโปรเจ็กต์ด้วย <your-project-id>
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
เช่น หากโปรเจ็กต์ชื่อ my-project คำสั่งจะเป็น
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
การรับรหัสไคลเอ็นต์
หากต้องการสร้างโทเค็นเพื่อตรวจสอบสิทธิ์กับ Cloud IAP ฟังก์ชันจะต้องมีรหัสไคลเอ็นต์ของพร็อกซีที่ปกป้องเว็บเซิร์ฟเวอร์ Airflow Cloud Composer API ไม่ได้ให้ข้อมูลนี้โดยตรง แต่ให้ส่งคำขอที่ไม่ผ่านการตรวจสอบสิทธิ์ไปยังเว็บเซิร์ฟเวอร์ Airflow และบันทึกรหัสไคลเอ็นต์จาก URL การเปลี่ยนเส้นทางแทน เราจะดำเนินการดังกล่าวโดยการเรียกใช้ไฟล์ Python โดยใช้ Cloud Shell เพื่อบันทึกรหัสไคลเอ็นต์
ดาวน์โหลดโค้ดที่จำเป็นจาก GitHub โดยเรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
หากได้รับข้อผิดพลาดเนื่องจากมีไดเรกทอรีนี้อยู่แล้ว ให้อัปเดตเป็นเวอร์ชันล่าสุดโดยเรียกใช้คำสั่งต่อไปนี้
cd python-docs-samples/ git pull origin master
เปลี่ยนเป็นไดเรกทอรีที่เหมาะสมโดยเรียกใช้
cd python-docs-samples/composer/rest
เรียกใช้โค้ด Python เพื่อรับรหัสไคลเอ็นต์ โดยแทนที่ชื่อโปรเจ็กต์ด้วย <your-project-id> ตำแหน่งของสภาพแวดล้อม Composer ที่คุณสร้างไว้ก่อนหน้านี้ด้วย <your-composer-location> และชื่อของสภาพแวดล้อม Composer ที่คุณสร้างไว้ก่อนหน้านี้ด้วย <your-composer-environment>
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
เช่น หากชื่อโปรเจ็กต์คือ my-project, ตำแหน่ง Composer คือ us-central1 และชื่อสภาพแวดล้อมคือ my-composer คำสั่งจะเป็น
python3 get_client_id.py my-project us-central1 my-composer
get_client_id.py จะดำเนินการต่อไปนี้
- ตรวจสอบสิทธิ์กับ Google Cloud
- ส่งคำขอ HTTP ที่ไม่ได้ตรวจสอบสิทธิ์ไปยังเว็บเซิร์ฟเวอร์ Airflow เพื่อรับ URI การเปลี่ยนเส้นทาง
- ดึงพารามิเตอร์การค้นหา
client_idจากการเปลี่ยนเส้นทางนั้น - พิมพ์ออกมาให้คุณใช้
ระบบจะพิมพ์ Client ID ในบรรทัดคำสั่งและจะมีลักษณะดังนี้
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
4. สร้างฟังก์ชัน
ใน Cloud Shell ให้โคลนที่เก็บที่มีโค้ดตัวอย่างที่จำเป็นโดยเรียกใช้คำสั่งต่อไปนี้
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
เปลี่ยนไปที่ไดเรกทอรีที่จำเป็นและเปิด Cloud Shell ไว้ขณะทำตาม 2-3 ขั้นตอนถัดไป
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
ไปที่หน้า Google Cloud Functions โดยคลิกเมนูการนำทาง แล้วคลิก "Cloud Functions" |
|
คลิก "CREATE FUNCTION" ที่ด้านบนของหน้า |
|
ตั้งชื่อฟังก์ชันว่า "my-function" และปล่อยให้หน่วยความจำเป็นค่าเริ่มต้นที่ 256 MB |
|
ตั้งค่าทริกเกอร์เป็น "Cloud Storage" ปล่อยให้ประเภทเหตุการณ์เป็น "สิ้นสุด/สร้าง" และเรียกดู Bucket ที่คุณสร้างในขั้นตอนสร้าง Bucket ของ Cloud Storage |
|
ปล่อยให้ซอร์สโค้ดตั้งค่าเป็น "ตัวแก้ไขในบรรทัด" และตั้งค่ารันไทม์เป็น "Node.js 8" |
|
ใน Cloud Shell ให้เรียกใช้คำสั่งต่อไปนี้ ซึ่งจะเปิด index.js และ package.json ใน Cloud Shell Editor
cloudshell edit index.js package.json
คลิกแท็บ package.json คัดลอกโค้ดนั้น แล้ววางลงในส่วน package.json ของตัวแก้ไขแบบอินไลน์ของ Cloud Functions |
|
ตั้งค่า "ฟังก์ชันที่จะเรียกใช้" เป็น triggerDag |
|
คลิกแท็บ index.js คัดลอกโค้ด แล้ววางลงในส่วน index.js ของโปรแกรมแก้ไขแบบอินไลน์ของ Cloud Functions |
|
เปลี่ยน |
|
ใน Cloud Shell ให้เรียกใช้คำสั่งต่อไปนี้ โดยแทนที่ <your-environment-name> ด้วยชื่อของสภาพแวดล้อม Composer และ <your-composer-region> ด้วยภูมิภาคที่สภาพแวดล้อม Composer อยู่
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
เช่น หากสภาพแวดล้อมชื่อ my-composer-environment และอยู่ใน us-central1 คำสั่งจะเป็น
gcloud composer environments describe my-composer-environment --location us-central1
เอาต์พุตควรมีลักษณะดังนี้
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
ในเอาต์พุตนั้น ให้มองหาตัวแปรที่ชื่อ |
|
คลิกลิงก์แบบเลื่อนลง "เพิ่มเติม" แล้วเลือกภูมิภาคที่อยู่ใกล้คุณมากที่สุด |
|
เลือก "ลองดำเนินการในส่วนที่ล้มเหลวอีกครั้ง" |
|
คลิก "สร้าง" เพื่อสร้าง Cloud Function |
|
การก้าวผ่านโค้ด
โค้ดที่คุณคัดลอกจาก index.js จะมีลักษณะดังนี้
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
มาดูกันว่าเกิดอะไรขึ้น ฟังก์ชัน 3 อย่างในที่นี้ ได้แก่ triggerDag, authorizeIap และ makeIapPostRequest
triggerDag คือฟังก์ชันที่จะทริกเกอร์เมื่อเราอัปโหลดบางอย่างไปยังที่เก็บข้อมูล Cloud Storage ที่กำหนด ซึ่งเป็นที่ที่เรากำหนดค่าตัวแปรสำคัญที่ใช้ในคำขออื่นๆ เช่น PROJECT_ID, CLIENT_ID, WEBSERVER_ID และ DAG_NAME โดยจะโทรหา authorizeIap และ makeIapPostRequest
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
authorizeIap ส่งคำขอไปยังพร็อกซีที่ปกป้องเว็บเซิร์ฟเวอร์ Airflow โดยใช้บัญชีบริการและ "แลกเปลี่ยน" JWT เป็นโทเค็นรหัสที่จะใช้เพื่อตรวจสอบสิทธิ์ makeIapPostRequest
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
makeIapPostRequest จะเรียกใช้เว็บเซิร์ฟเวอร์ Airflow เพื่อทริกเกอร์ composer_sample_trigger_response_dag. ชื่อ DAG จะฝังอยู่ใน URL ของเว็บเซิร์ฟเวอร์ Airflow ที่ส่งมาพร้อมกับพารามิเตอร์ url และ idToken คือโทเค็นที่เราได้รับในคำขอ authorizeIap
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
5. ตั้งค่า DAG
ใน Cloud Shell ให้เปลี่ยนไปที่ไดเรกทอรีที่มีเวิร์กโฟลว์ตัวอย่าง ซึ่งเป็นส่วนหนึ่งของ python-docs-samples ที่คุณดาวน์โหลดจาก GitHub ในขั้นตอนการรับรหัสไคลเอ็นต์
cd cd python-docs-samples/composer/workflows
อัปโหลด DAG ไปยัง Composer
อัปโหลด DAG ตัวอย่างไปยัง storage bucket ของ DAG ในสภาพแวดล้อม Composer ด้วยคำสั่งต่อไปนี้ โดยที่ <environment_name> คือชื่อสภาพแวดล้อม Composer และ <location> คือชื่อภูมิภาคที่สภาพแวดล้อมนั้นอยู่ trigger_response_dag.py คือ DAG ที่เราจะใช้
gcloud composer environments storage dags import \
--environment <environment_name> \
--location <location> \
--source trigger_response_dag.py
ตัวอย่างเช่น หากสภาพแวดล้อม Composer ชื่อ my-composer และอยู่ใน us-central1 คำสั่งจะเป็น
gcloud composer environments storage dags import \
--environment my-composer \
--location us-central1 \
--source trigger_response_dag.py
การดู DAG ทีละขั้นตอน
โค้ด DAG ใน trigger_response.py มีลักษณะดังนี้
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
ส่วน default_args มีอาร์กิวเมนต์เริ่มต้นตามที่โมเดล BaseOperator ใน Apache Airflow กำหนด คุณจะเห็นส่วนนี้พร้อมพารามิเตอร์เหล่านี้ใน DAG ของ Apache Airflow ปัจจุบัน owner ตั้งค่าเป็น Composer Example แต่คุณเปลี่ยนเป็นชื่อของคุณได้หากต้องการ depends_on_past แสดงให้เห็นว่า DAG นี้ไม่ได้ขึ้นอยู่กับ DAG ก่อนหน้า ส่วนอีเมลทั้ง 3 ส่วน ได้แก่ email, email_on_failure และ email_on_retry ได้รับการตั้งค่าไว้เพื่อไม่ให้มีการแจ้งเตือนทางอีเมลตามสถานะของ DAG นี้ DAG จะลองอีกครั้งเพียงครั้งเดียวเนื่องจากตั้งค่า retries เป็น 1 และจะลองอีกครั้งหลังจากผ่านไป 5 นาทีตาม retry_delay start_date โดยปกติจะกำหนดเวลาที่ควรเรียกใช้ DAG ร่วมกับ schedule_interval (ตั้งค่าในภายหลัง) แต่ในกรณีของ DAG นี้จะไม่เกี่ยวข้อง โดยตั้งค่าเป็นวันที่ 1 มกราคม 2017 แต่คุณจะตั้งค่าเป็นวันที่ในอดีตก็ได้
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
ส่วน with airflow.DAG จะกำหนดค่า DAG ที่จะเรียกใช้ โดยจะเรียกใช้ด้วยรหัสงาน composer_sample_trigger_response_dag อาร์กิวเมนต์เริ่มต้นจากส่วน default_args และที่สำคัญที่สุดคือมี schedule_interval เป็น None เราตั้งค่า schedule_interval เป็น None เนื่องจากเราทริกเกอร์ DAG นี้ด้วย Cloud Function ด้วยเหตุนี้ start_date ใน default_args จึงไม่เกี่ยวข้อง
เมื่อเรียกใช้ DAG จะพิมพ์การกำหนดค่าตามที่ระบุไว้ในตัวแปร print_gcs_info
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
6. ทดสอบฟังก์ชัน
เปิดสภาพแวดล้อม Composer แล้วคลิกลิงก์ Airflow ในแถวที่มีชื่อสภาพแวดล้อม |
|
เปิด |
|
เปิดแท็บแยกต่างหากแล้วอัปโหลดไฟล์ไปยัง Bucket ของ Cloud Storage ที่คุณสร้างไว้ก่อนหน้านี้และระบุเป็นทริกเกอร์สำหรับ Cloud Function คุณทำได้ผ่านคอนโซลหรือใช้คำสั่ง gsutil |
|
กลับไปที่แท็บที่มี UI ของ Airflow แล้วคลิกมุมมองกราฟ |
|
คลิก |
|
คลิก "ดูบันทึก" ที่ด้านขวาบนของเมนู |
|
ในบันทึก คุณจะเห็นข้อมูลเกี่ยวกับไฟล์ที่คุณอัปโหลดไปยัง Bucket ของ Cloud Storage |
|
ยินดีด้วย คุณเพิ่งทริกเกอร์ DAG ของ Airflow โดยใช้ Node.js และ Google Cloud Functions
7. ล้างข้อมูล
โปรดดำเนินการดังนี้เพื่อเลี่ยงไม่ให้เกิดการเรียกเก็บเงินกับบัญชี GCP สำหรับทรัพยากรที่ใช้ในคู่มือเริ่มใช้งานฉบับย่อนี้
- (ไม่บังคับ) หากต้องการบันทึกข้อมูล ให้ดาวน์โหลดข้อมูลจาก Bucket ของ Cloud Storage สำหรับสภาพแวดล้อม Cloud Composer และ Bucket ของพื้นที่เก็บข้อมูลที่คุณสร้างขึ้นสำหรับคู่มือเริ่มใช้งานฉบับย่อนี้
- ลบ Bucket ของ Cloud Storage สำหรับสภาพแวดล้อมและที่คุณสร้างไว้
- ลบสภาพแวดล้อม Cloud Composer โปรดทราบว่าการลบสภาพแวดล้อมจะไม่ลบบัคเก็ตพื้นที่เก็บข้อมูลสำหรับสภาพแวดล้อม
- (ไม่บังคับ) การประมวลผลแบบ Serverless จะให้การเรียกใช้ 2 ล้านครั้งแรกต่อเดือนฟรี และเมื่อปรับขนาดฟังก์ชันเป็น 0 คุณจะไม่ถูกเรียกเก็บเงิน (ดูรายละเอียดเพิ่มเติมได้ที่ราคา) อย่างไรก็ตาม หากต้องการลบ Cloud Function ให้คลิก "ลบ" ที่ด้านขวาบนของหน้าภาพรวมของฟังก์ชัน

นอกจากนี้ คุณยังเลือกที่จะลบโปรเจกต์ได้ด้วย โดยทำดังนี้
- ในคอนโซล GCP ให้ไปที่หน้าโปรเจ็กต์
- ในรายการโปรเจ็กต์ ให้เลือกโปรเจ็กต์ที่ต้องการลบ แล้วคลิกลบ
- พิมพ์รหัสโปรเจ็กต์ในช่อง แล้วคลิกปิดเพื่อลบโปรเจ็กต์


























