1. บทนำ
Apache Airflow ได้รับการออกแบบมาให้เรียกใช้ DAG ตามกำหนดการอย่างสม่ำเสมอ แต่คุณยังทริกเกอร์ DAG เพื่อตอบสนองต่อเหตุการณ์ได้ด้วย เช่น การเปลี่ยนแปลงในที่เก็บข้อมูล Cloud Storage หรือข้อความที่พุชไปยัง Cloud Pub/Sub Cloud Functions จะทริกเกอร์ DAG ของ Cloud Composer ได้เช่นกัน
ตัวอย่างในห้องทดลองนี้จะเรียกใช้ DAG แบบง่ายทุกครั้งที่มีการเปลี่ยนแปลงในที่เก็บข้อมูล Cloud Storage DAG นี้ใช้ BashOperator เพื่อเรียกใช้คำสั่ง Bash เพื่อพิมพ์ข้อมูลการเปลี่ยนแปลงเกี่ยวกับสิ่งที่อัปโหลดไปยังที่เก็บข้อมูล Cloud Storage
ก่อนเริ่ม Lab นี้ เราขอแนะนำให้ทำ Codelab ของข้อมูลเบื้องต้นเกี่ยวกับ Cloud Composer และการเริ่มต้นใช้งาน Cloud Functions ให้เสร็จสมบูรณ์ หากคุณสร้างสภาพแวดล้อมของคอมโพสเซอร์ในการแนะนำ Codelab ของ Cloud Composer คุณจะใช้สภาพแวดล้อมดังกล่าวในห้องทดลองนี้ได้
สิ่งที่คุณจะสร้าง
ใน Codelab นี้ คุณจะ
- อัปโหลดไฟล์ไปยัง Google Cloud Storage ซึ่งจะ
- ทริกเกอร์ฟังก์ชัน Google Cloud โดยใช้รันไทม์ของ Node.JS
- ฟังก์ชันนี้จะดำเนินการ DAG ใน Google Cloud Composer
- ที่เรียกใช้คำสั่ง Bash แบบง่ายๆ เพื่อพิมพ์การเปลี่ยนแปลงไปยังที่เก็บข้อมูล Google Cloud Storage
สิ่งที่คุณจะได้เรียนรู้
- วิธีทริกเกอร์ Apache Airflow DAG โดยใช้ Google Cloud Functions + Node.js
สิ่งที่คุณต้องมี
- บัญชี GCP
- ความเข้าใจเบื้องต้นเกี่ยวกับ JavaScript
- ความรู้พื้นฐานเกี่ยวกับ Cloud Composer/Airflow และ Cloud Functions
- สบายเมื่อใช้คำสั่ง CLI
2. การตั้งค่า GCP
เลือกหรือสร้างโปรเจ็กต์
เลือกหรือสร้างโปรเจ็กต์ Google Cloud Platform หากคุณกำลังสร้างโปรเจ็กต์ใหม่ ให้ทำตามขั้นตอนที่นี่
จดรหัสโปรเจ็กต์ไว้ ซึ่งจะใช้ในขั้นตอนถัดไป
หากคุณกำลังสร้างโปรเจ็กต์ใหม่ รหัสโปรเจ็กต์จะอยู่ใต้ชื่อโปรเจ็กต์ในหน้าการสร้าง | |
หากสร้างโปรเจ็กต์ไว้แล้ว คุณจะดูรหัสได้ในหน้าแรกของคอนโซลในการ์ดข้อมูลโปรเจ็กต์ |
เปิดใช้ API
สร้างสภาพแวดล้อมคอมโพสเซอร์
สร้างสภาพแวดล้อม Cloud Composer ด้วยการกำหนดค่าต่อไปนี้
การกําหนดค่าอื่นๆ ทั้งหมดจะยังคงเป็นค่าเริ่มต้นได้ คลิก "สร้าง" ที่ด้านล่าง ให้จดชื่อสภาพแวดล้อมของคอมโพสเซอร์และตำแหน่งไว้ เนื่องจากคุณจะต้องใช้ข้อมูลดังกล่าวในขั้นตอนต่อไป |
สร้างที่เก็บข้อมูล Cloud Storage
สร้างที่เก็บข้อมูล Cloud Storage ในโปรเจ็กต์ด้วยการกำหนดค่าต่อไปนี้
กด "สร้าง" เมื่อพร้อม อย่าลืมจดชื่อของที่เก็บข้อมูล Cloud Storage ไว้สำหรับขั้นตอนถัดไป |
3. การตั้งค่า Google Cloud Functions (GCF)
เราจะเรียกใช้คำสั่งใน Google Cloud Shell ในการตั้งค่า GCF
แม้ว่า Google Cloud จะทำงานจากระยะไกลได้จากแล็ปท็อปโดยใช้เครื่องมือบรรทัดคำสั่ง gcloud แต่เราจะใช้ Google Cloud Shell ซึ่งเป็นสภาพแวดล้อมของบรรทัดคำสั่งที่ทำงานในระบบคลาวด์ใน Codelab
เครื่องเสมือนแบบ Debian นี้เต็มไปด้วยเครื่องมือการพัฒนาทั้งหมดที่คุณต้องการ โดยมีไดเรกทอรีหลักขนาด 5 GB ที่ทำงานอย่างต่อเนื่องบน Google Cloud ซึ่งช่วยเพิ่มประสิทธิภาพของเครือข่ายและการตรวจสอบสิทธิ์ได้อย่างมาก ซึ่งหมายความว่าสิ่งที่คุณต้องมีสำหรับ Codelab นี้คือเบราว์เซอร์ (ใช่แล้ว ทั้งหมดนี้ทำงานได้บน Chromebook)
หากต้องการเปิดใช้งาน Google Cloud Shell จาก Play Console ให้คลิกปุ่มด้านขวาบน (ใช้เวลาไม่นานในการจัดสรรและเชื่อมต่อกับสภาพแวดล้อม) ดังนี้ |
ให้สิทธิ์ BLOB Signing แก่บัญชีบริการ Cloud Functions
หากต้องการให้ GCF ตรวจสอบสิทธิ์กับ Cloud IAP ซึ่งเป็นพร็อกซีที่ปกป้องเว็บเซิร์ฟเวอร์ Airflow คุณต้องมอบบทบาท Service Account Token Creator
ให้บัญชีบริการ Appspot แก่ GCF โดยเรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell แทนที่ชื่อโปรเจ็กต์สำหรับ <your-project-id>
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
ตัวอย่างเช่น หากโปรเจ็กต์ของคุณชื่อ my-project
คำสั่งจะเป็น
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
การรับรหัสไคลเอ็นต์
ในการสร้างโทเค็นเพื่อตรวจสอบสิทธิ์กับ Cloud IAP ฟังก์ชันนี้ต้องใช้รหัสไคลเอ็นต์ของพร็อกซีที่ปกป้องเว็บเซิร์ฟเวอร์ Airflow Cloud Composer API จะไม่ให้ข้อมูลนี้โดยตรง ให้ส่งคำขอที่ไม่ผ่านการตรวจสอบสิทธิ์ไปยังเว็บเซิร์ฟเวอร์ Airflow แทนและบันทึกรหัสไคลเอ็นต์จาก URL เปลี่ยนเส้นทาง โดยการเรียกใช้ไฟล์ Python โดยใช้ Cloud Shell เพื่อจับรหัสไคลเอ็นต์
ดาวน์โหลดโค้ดที่จำเป็นจาก GitHub โดยเรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
หากคุณได้รับข้อผิดพลาดเนื่องจากมีไดเรกทอรีนี้อยู่แล้ว ให้อัปเดตเป็นเวอร์ชันล่าสุดโดยเรียกใช้คำสั่งต่อไปนี้
cd python-docs-samples/ git pull origin master
เปลี่ยนเป็นไดเรกทอรีที่เหมาะสมโดยเรียกใช้
cd python-docs-samples/composer/rest
เรียกใช้โค้ด Python เพื่อรับรหัสไคลเอ็นต์ โดยแทนที่ชื่อโปรเจ็กต์สำหรับ <your-project-id>
, ตำแหน่งของสภาพแวดล้อมคอมโพสเซอร์ที่คุณสร้างไว้ก่อนหน้านี้สำหรับ <your-composer-location>
และชื่อของสภาพแวดล้อมคอมโพสเซอร์ที่คุณสร้างไว้ก่อนหน้านี้สำหรับ <your-composer-environment>
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
ตัวอย่างเช่น หากชื่อโปรเจ็กต์คือ my-project
ตำแหน่งคอมโพสเซอร์คือ us-central1
และชื่อสภาพแวดล้อมคือ my-composer
คำสั่งจะเป็น
python3 get_client_id.py my-project us-central1 my-composer
get_client_id.py
ทำสิ่งต่อไปนี้
- ตรวจสอบสิทธิ์ด้วย Google Cloud
- ส่งคำขอ HTTP ที่ไม่ได้ตรวจสอบสิทธิ์ไปยังเว็บเซิร์ฟเวอร์ Airflow เพื่อรับ URI การเปลี่ยนเส้นทาง
- ดึงข้อมูลพารามิเตอร์การค้นหา
client_id
จากการเปลี่ยนเส้นทางนั้น - พิมพ์ออกมาให้คุณใช้
รหัสลูกค้าจะพิมพ์ออกมาในบรรทัดคำสั่งและมีลักษณะดังนี้
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
4. สร้างฟังก์ชัน
โคลนที่เก็บด้วยโค้ดตัวอย่างที่จำเป็นใน Cloud Shell โดยการเรียกใช้
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
เปลี่ยนเป็นไดเรกทอรีที่จำเป็นและเปิด Cloud Shell ทิ้งไว้ในขณะที่ทำขั้นตอนถัดไป
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
ไปที่หน้า Google Cloud Functions ด้วยการคลิกเมนูการนำทาง จากนั้นคลิก "Cloud Functions" | |
คลิก "สร้างฟังก์ชัน" ที่ด้านบนของหน้า | |
ตั้งชื่อฟังก์ชันว่า "my-function" และใช้หน่วยความจำตามค่าเริ่มต้นที่ 256 MB | |
ตั้งค่าทริกเกอร์เป็น "Cloud Storage" จากนั้นปล่อยให้ประเภทเหตุการณ์เป็น "สรุป/สร้าง" จากนั้นเรียกดูที่เก็บข้อมูลที่คุณสร้างไว้ในขั้นตอนสร้างที่เก็บข้อมูล Cloud Storage | |
ปล่อยให้การตั้งค่าซอร์สโค้ดเป็น "ตัวแก้ไขในบรรทัด" และตั้งค่ารันไทม์เป็น "Node.js 8" |
เรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell ซึ่งจะเปิดindex.js และpackage.json ใน Cloud Shell Editor
cloudshell edit index.js package.json
คลิกที่แท็บpackage.json คัดลอกโค้ดนั้นแล้ววางลงในส่วนpackage.jsonของเครื่องมือแก้ไขในบรรทัดของ Cloud Functions | |
ตั้งค่า "Function to Execute" เพื่อทริกเกอร์Dag | |
คลิกที่แท็บindex.js จากนั้นคัดลอกโค้ดแล้ววางลงในส่วน index.js ของเครื่องมือแก้ไขในบรรทัดสำหรับ Cloud Functions | |
เปลี่ยน |
เรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell โดยแทนที่ <your-environment-name> ที่มีชื่อสภาพแวดล้อมคอมโพสเซอร์และ <your-composer-region> ในภูมิภาคของสภาพแวดล้อมคอมโพสเซอร์
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
ตัวอย่างเช่น หากสภาพแวดล้อมของคุณชื่อ my-composer-environment
และอยู่ใน us-central1
คำสั่งของคุณจะเป็น
gcloud composer environments describe my-composer-environment --location us-central1
ผลลัพธ์ควรมีลักษณะเช่นนี้
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
ในเอาต์พุตดังกล่าว ให้ค้นหาตัวแปรชื่อ | |
คลิกปุ่ม "เพิ่มเติม" เมนูแบบเลื่อนลง แล้วเลือก ภูมิภาค ทางภูมิศาสตร์ที่ใกล้กับคุณมากที่สุด | |
เลือก "ลองอีกครั้งเมื่อล้มเหลว" | |
คลิก "สร้าง" เพื่อสร้าง Cloud Function |
ทำความเข้าใจโค้ด
โค้ดที่คุณคัดลอกจาก index.js จะมีลักษณะดังนี้
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
มาดูกันว่ามีอะไรเกิดขึ้นบ้าง ฟังก์ชันมี 3 ฟังก์ชัน ได้แก่ triggerDag
, authorizeIap
และ makeIapPostRequest
triggerDag
คือฟังก์ชันที่ทริกเกอร์เมื่อเราอัปโหลดบางสิ่งไปยังที่เก็บข้อมูล Cloud Storage ที่กำหนดไว้ ซึ่งเป็นที่ที่เรากำหนดค่าตัวแปรสำคัญที่ใช้ในคำขออื่นๆ เช่น PROJECT_ID
, CLIENT_ID
, WEBSERVER_ID
และ DAG_NAME
โทรติดต่อ authorizeIap
และ makeIapPostRequest
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
authorizeIap
ส่งคำขอไปยังพร็อกซีที่ปกป้องเว็บเซิร์ฟเวอร์ Airflow โดยใช้บัญชีบริการและ "แลกเปลี่ยน" JWT สำหรับโทเค็นรหัสที่จะใช้ในการตรวจสอบสิทธิ์ makeIapPostRequest
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
makeIapPostRequest
เรียกเว็บเซิร์ฟเวอร์ Airflow เพื่อทริกเกอร์ composer_sample_trigger_response_dag.
ชื่อ DAG ฝังอยู่ใน URL เว็บเซิร์ฟเวอร์ Airflow ที่ส่งผ่านพารามิเตอร์ url
และ idToken
คือโทเค็นที่เราได้รับในคำขอ authorizeIap
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
5. ตั้งค่า DAG ของคุณ
ใน Cloud Shell ให้เปลี่ยนเป็นไดเรกทอรีที่มีเวิร์กโฟลว์ตัวอย่าง ซึ่งเป็นส่วนหนึ่งของตัวอย่าง Python-docs ที่คุณดาวน์โหลดจาก GitHub ในขั้นตอนการรับรหัสไคลเอ็นต์
cd cd python-docs-samples/composer/workflows
อัปโหลด DAG ไปยังคอมโพสเซอร์
อัปโหลด DAG ตัวอย่างไปยังที่เก็บข้อมูลของพื้นที่เก็บข้อมูล DAG ของสภาพแวดล้อมคอมโพสเซอร์ด้วยคำสั่งต่อไปนี้ โดยที่ <environment_name>
คือชื่อของสภาพแวดล้อม Composer ของคุณ และ <location>
คือชื่อของภูมิภาคที่ครับ trigger_response_dag.py
คือ DAG ที่เราจะร่วมงานด้วย
gcloud composer environments storage dags import \ --environment <environment_name> \ --location <location> \ --source trigger_response_dag.py
ตัวอย่างเช่น หากสภาพแวดล้อมคอมโพสเซอร์ของคุณชื่อ my-composer
และอยู่ใน us-central1
คำสั่งของคุณจะเป็น
gcloud composer environments storage dags import \ --environment my-composer \ --location us-central1 \ --source trigger_response_dag.py
การก้าวผ่าน DAG
โค้ด DAG ใน trigger_response.py
มีลักษณะดังนี้
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
ส่วน default_args
มีอาร์กิวเมนต์เริ่มต้นตามที่โมเดล BaseOperator ต้องการใน Apache Airflow คุณจะเห็นส่วนนี้พร้อมพารามิเตอร์เหล่านี้ใน Apache Airflow DAG ทั้งหมด ขณะนี้ owner
ตั้งค่าเป็น Composer Example
แต่คุณสามารถเปลี่ยนเป็นชื่อของคุณได้หากต้องการ depends_on_past
แสดงให้เราทราบว่า DAG นี้ไม่ได้อาศัย DAG ใดๆ ก่อนหน้านี้ ส่วนอีเมล 3 ส่วน ได้แก่ email
, email_on_failure
และ email_on_retry
ได้รับการตั้งค่าไว้เพื่อให้ไม่มีการแจ้งเตือนทางอีเมลตามสถานะของ DAG นี้ DAG จะลองอีกครั้งเพียงครั้งเดียว เนื่องจากมีการตั้งค่า retries
เป็น 1 และจะดำเนินการดังกล่าวหลังจากผ่านไป 5 นาทีต่อ retry_delay
โดยปกติ start_date
จะกำหนดเวลาที่ควรเรียกใช้ DAG ร่วมกับ schedule_interval
(ตั้งค่าในภายหลัง) แต่ในกรณีของ DAG นี้ไม่เกี่ยวข้อง ระบบได้ตั้งค่าให้เป็นวันที่ 1 มกราคม 2017 แต่สามารถตั้งค่าเป็นวันที่ในอดีตได้
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
ส่วน with airflow.DAG
กำหนดค่า DAG ที่จะเรียกใช้ โดยจะเรียกใช้ด้วยรหัสงาน composer_sample_trigger_response_dag
ซึ่งเป็นอาร์กิวเมนต์เริ่มต้นจากส่วน default_args
และที่สำคัญที่สุดคือ schedule_interval
ของ None
schedule_interval
ได้รับการตั้งค่าเป็น None
เนื่องจากเรากำลังทริกเกอร์ DAG นี้โดยเฉพาะด้วย Cloud Function นี่เป็นสาเหตุที่ start_date
ใน default_args
ไม่เกี่ยวข้อง
เมื่อเรียกใช้ DAG จะพิมพ์การกำหนดค่าตามที่กำหนดไว้ในตัวแปร print_gcs_info
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
6. ทดสอบฟังก์ชันของคุณ
เปิดสภาพแวดล้อมคอมโพสเซอร์ และในแถวที่มีชื่อสภาพแวดล้อมของคุณ ให้คลิกลิงก์ Airflow | |
เปิด | |
เปิดแท็บแยกต่างหากและอัปโหลดไฟล์ไปยังที่เก็บข้อมูล Cloud Storage ที่คุณสร้างไว้ก่อนหน้านี้และระบุไว้เป็นทริกเกอร์สำหรับ Cloud Function คุณสามารถดำเนินการดังกล่าวได้ผ่านคอนโซลหรือใช้คำสั่ง gsutil | |
กลับไปที่แท็บที่มี Airflow UI แล้วคลิกมุมมองกราฟ | |
คลิกงาน | |
คลิก "ดูบันทึก" ที่ด้านขวาบนของเมนู | |
คุณจะเห็นข้อมูลเกี่ยวกับไฟล์ที่อัปโหลดไปยังที่เก็บข้อมูล Cloud Storage ในบันทึก |
ยินดีด้วย คุณเพิ่งทริกเกอร์ Airflow DAG โดยใช้ Node.js และ Google Cloud Functions
7. ล้างข้อมูล
โปรดทำดังนี้เพื่อเลี่ยงไม่ให้เกิดการเรียกเก็บเงินกับบัญชี GCP สำหรับทรัพยากรที่ใช้ในการเริ่มต้นอย่างรวดเร็วนี้
- (ไม่บังคับ) หากต้องการบันทึกข้อมูล ให้ดาวน์โหลดข้อมูลจากที่เก็บข้อมูล Cloud Storage สำหรับสภาพแวดล้อม Cloud Composer และที่เก็บข้อมูลของพื้นที่เก็บข้อมูลที่คุณสร้างขึ้นสำหรับการเริ่มต้นอย่างรวดเร็วนี้
- ลบที่เก็บข้อมูล Cloud Storage สำหรับสภาพแวดล้อมและที่คุณสร้างขึ้น
- ลบสภาพแวดล้อม Cloud Composer โปรดทราบว่าการลบสภาพแวดล้อมจะไม่ลบที่เก็บข้อมูลของพื้นที่เก็บข้อมูลสำหรับสภาพแวดล้อมนั้น
- (ไม่บังคับ) เมื่อใช้การประมวลผลแบบ Serverless การเรียกใช้ 2 ล้านครั้งแรกต่อเดือนจะไม่มีค่าใช้จ่าย และเมื่อคุณปรับขนาดฟังก์ชันเป็น 0 ระบบจะไม่เรียกเก็บเงินจากคุณ (ดูรายละเอียดเพิ่มเติมที่ราคา) แต่หากต้องการลบ Cloud Function ก็ให้คลิก "ลบ" ที่ด้านขวาบนของหน้าภาพรวมสำหรับฟังก์ชัน
นอกจากนี้ คุณยังเลือกลบโปรเจ็กต์ได้ด้วย โดยทำดังนี้
- ในคอนโซล GCP ให้ไปที่หน้าโปรเจ็กต์
- ในรายการโปรเจ็กต์ ให้เลือกโปรเจ็กต์ที่ต้องการลบ แล้วคลิกลบ
- ในช่อง ให้พิมพ์รหัสโปรเจ็กต์ แล้วคลิกปิดเครื่องเพื่อลบโปรเจ็กต์