การทริกเกอร์ DAG ด้วย Node.JS และ Google Cloud Functions

1. บทนำ

Apache Airflow ได้รับการออกแบบมาให้เรียกใช้ DAG ตามกำหนดการอย่างสม่ำเสมอ แต่คุณยังทริกเกอร์ DAG เพื่อตอบสนองต่อเหตุการณ์ได้ด้วย เช่น การเปลี่ยนแปลงในที่เก็บข้อมูล Cloud Storage หรือข้อความที่พุชไปยัง Cloud Pub/Sub Cloud Functions จะทริกเกอร์ DAG ของ Cloud Composer ได้เช่นกัน

ตัวอย่างในห้องทดลองนี้จะเรียกใช้ DAG แบบง่ายทุกครั้งที่มีการเปลี่ยนแปลงในที่เก็บข้อมูล Cloud Storage DAG นี้ใช้ BashOperator เพื่อเรียกใช้คำสั่ง Bash เพื่อพิมพ์ข้อมูลการเปลี่ยนแปลงเกี่ยวกับสิ่งที่อัปโหลดไปยังที่เก็บข้อมูล Cloud Storage

ก่อนเริ่ม Lab นี้ เราขอแนะนำให้ทำ Codelab ของข้อมูลเบื้องต้นเกี่ยวกับ Cloud Composer และการเริ่มต้นใช้งาน Cloud Functions ให้เสร็จสมบูรณ์ หากคุณสร้างสภาพแวดล้อมของคอมโพสเซอร์ในการแนะนำ Codelab ของ Cloud Composer คุณจะใช้สภาพแวดล้อมดังกล่าวในห้องทดลองนี้ได้

สิ่งที่คุณจะสร้าง

ใน Codelab นี้ คุณจะ

  1. อัปโหลดไฟล์ไปยัง Google Cloud Storage ซึ่งจะ
  2. ทริกเกอร์ฟังก์ชัน Google Cloud โดยใช้รันไทม์ของ Node.JS
  3. ฟังก์ชันนี้จะดำเนินการ DAG ใน Google Cloud Composer
  4. ที่เรียกใช้คำสั่ง Bash แบบง่ายๆ เพื่อพิมพ์การเปลี่ยนแปลงไปยังที่เก็บข้อมูล Google Cloud Storage

1d3d3736624a923f.png

สิ่งที่คุณจะได้เรียนรู้

  • วิธีทริกเกอร์ Apache Airflow DAG โดยใช้ Google Cloud Functions + Node.js

สิ่งที่คุณต้องมี

  • บัญชี GCP
  • ความเข้าใจเบื้องต้นเกี่ยวกับ JavaScript
  • ความรู้พื้นฐานเกี่ยวกับ Cloud Composer/Airflow และ Cloud Functions
  • สบายเมื่อใช้คำสั่ง CLI

2. การตั้งค่า GCP

เลือกหรือสร้างโปรเจ็กต์

เลือกหรือสร้างโปรเจ็กต์ Google Cloud Platform หากคุณกำลังสร้างโปรเจ็กต์ใหม่ ให้ทำตามขั้นตอนที่นี่

จดรหัสโปรเจ็กต์ไว้ ซึ่งจะใช้ในขั้นตอนถัดไป

หากคุณกำลังสร้างโปรเจ็กต์ใหม่ รหัสโปรเจ็กต์จะอยู่ใต้ชื่อโปรเจ็กต์ในหน้าการสร้าง

หากสร้างโปรเจ็กต์ไว้แล้ว คุณจะดูรหัสได้ในหน้าแรกของคอนโซลในการ์ดข้อมูลโปรเจ็กต์

เปิดใช้ API

เปิดใช้ Cloud Composer, Google Cloud Functions และ Cloud Identity และ Google Identity and Access Management (IAM) API

สร้างสภาพแวดล้อมคอมโพสเซอร์

สร้างสภาพแวดล้อม Cloud Composer ด้วยการกำหนดค่าต่อไปนี้

  • ชื่อ: my-composer-environment
  • ตำแหน่ง: อะไรก็ตามที่ตั้งอยู่ใกล้ตัวคุณมากที่สุด
  • โซน: โซนใดก็ได้ในภูมิภาคนั้น

การกําหนดค่าอื่นๆ ทั้งหมดจะยังคงเป็นค่าเริ่มต้นได้ คลิก "สร้าง" ที่ด้านล่าง ให้จดชื่อสภาพแวดล้อมของคอมโพสเซอร์และตำแหน่งไว้ เนื่องจากคุณจะต้องใช้ข้อมูลดังกล่าวในขั้นตอนต่อไป

สร้างที่เก็บข้อมูล Cloud Storage

สร้างที่เก็บข้อมูล Cloud Storage ในโปรเจ็กต์ด้วยการกำหนดค่าต่อไปนี้

  • ชื่อ: <รหัสโปรเจ็กต์ของคุณ>
  • คลาสพื้นที่เก็บข้อมูลเริ่มต้น: หลายภูมิภาค
  • ตำแหน่ง: ไม่ว่าตำแหน่งใดก็ตามจะอยู่ใกล้กับภูมิภาคของ Cloud Composer มากที่สุด
  • โมเดลการควบคุมการเข้าถึง: ตั้งค่าสิทธิ์ระดับออบเจ็กต์และระดับที่เก็บข้อมูล

กด "สร้าง" เมื่อพร้อม อย่าลืมจดชื่อของที่เก็บข้อมูล Cloud Storage ไว้สำหรับขั้นตอนถัดไป

3. การตั้งค่า Google Cloud Functions (GCF)

เราจะเรียกใช้คำสั่งใน Google Cloud Shell ในการตั้งค่า GCF

แม้ว่า Google Cloud จะทำงานจากระยะไกลได้จากแล็ปท็อปโดยใช้เครื่องมือบรรทัดคำสั่ง gcloud แต่เราจะใช้ Google Cloud Shell ซึ่งเป็นสภาพแวดล้อมของบรรทัดคำสั่งที่ทำงานในระบบคลาวด์ใน Codelab

เครื่องเสมือนแบบ Debian นี้เต็มไปด้วยเครื่องมือการพัฒนาทั้งหมดที่คุณต้องการ โดยมีไดเรกทอรีหลักขนาด 5 GB ที่ทำงานอย่างต่อเนื่องบน Google Cloud ซึ่งช่วยเพิ่มประสิทธิภาพของเครือข่ายและการตรวจสอบสิทธิ์ได้อย่างมาก ซึ่งหมายความว่าสิ่งที่คุณต้องมีสำหรับ Codelab นี้คือเบราว์เซอร์ (ใช่แล้ว ทั้งหมดนี้ทำงานได้บน Chromebook)

หากต้องการเปิดใช้งาน Google Cloud Shell จาก Play Console ให้คลิกปุ่มด้านขวาบน (ใช้เวลาไม่นานในการจัดสรรและเชื่อมต่อกับสภาพแวดล้อม) ดังนี้

ให้สิทธิ์ BLOB Signing แก่บัญชีบริการ Cloud Functions

หากต้องการให้ GCF ตรวจสอบสิทธิ์กับ Cloud IAP ซึ่งเป็นพร็อกซีที่ปกป้องเว็บเซิร์ฟเวอร์ Airflow คุณต้องมอบบทบาท Service Account Token Creator ให้บัญชีบริการ Appspot แก่ GCF โดยเรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell แทนที่ชื่อโปรเจ็กต์สำหรับ <your-project-id>

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

ตัวอย่างเช่น หากโปรเจ็กต์ของคุณชื่อ my-project คำสั่งจะเป็น

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

การรับรหัสไคลเอ็นต์

ในการสร้างโทเค็นเพื่อตรวจสอบสิทธิ์กับ Cloud IAP ฟังก์ชันนี้ต้องใช้รหัสไคลเอ็นต์ของพร็อกซีที่ปกป้องเว็บเซิร์ฟเวอร์ Airflow Cloud Composer API จะไม่ให้ข้อมูลนี้โดยตรง ให้ส่งคำขอที่ไม่ผ่านการตรวจสอบสิทธิ์ไปยังเว็บเซิร์ฟเวอร์ Airflow แทนและบันทึกรหัสไคลเอ็นต์จาก URL เปลี่ยนเส้นทาง โดยการเรียกใช้ไฟล์ Python โดยใช้ Cloud Shell เพื่อจับรหัสไคลเอ็นต์

ดาวน์โหลดโค้ดที่จำเป็นจาก GitHub โดยเรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

หากคุณได้รับข้อผิดพลาดเนื่องจากมีไดเรกทอรีนี้อยู่แล้ว ให้อัปเดตเป็นเวอร์ชันล่าสุดโดยเรียกใช้คำสั่งต่อไปนี้

cd python-docs-samples/
git pull origin master

เปลี่ยนเป็นไดเรกทอรีที่เหมาะสมโดยเรียกใช้

cd python-docs-samples/composer/rest

เรียกใช้โค้ด Python เพื่อรับรหัสไคลเอ็นต์ โดยแทนที่ชื่อโปรเจ็กต์สำหรับ <your-project-id>, ตำแหน่งของสภาพแวดล้อมคอมโพสเซอร์ที่คุณสร้างไว้ก่อนหน้านี้สำหรับ <your-composer-location> และชื่อของสภาพแวดล้อมคอมโพสเซอร์ที่คุณสร้างไว้ก่อนหน้านี้สำหรับ <your-composer-environment>

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

ตัวอย่างเช่น หากชื่อโปรเจ็กต์คือ my-project ตำแหน่งคอมโพสเซอร์คือ us-central1 และชื่อสภาพแวดล้อมคือ my-composer คำสั่งจะเป็น

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py ทำสิ่งต่อไปนี้

  • ตรวจสอบสิทธิ์ด้วย Google Cloud
  • ส่งคำขอ HTTP ที่ไม่ได้ตรวจสอบสิทธิ์ไปยังเว็บเซิร์ฟเวอร์ Airflow เพื่อรับ URI การเปลี่ยนเส้นทาง
  • ดึงข้อมูลพารามิเตอร์การค้นหา client_id จากการเปลี่ยนเส้นทางนั้น
  • พิมพ์ออกมาให้คุณใช้

รหัสลูกค้าจะพิมพ์ออกมาในบรรทัดคำสั่งและมีลักษณะดังนี้

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. สร้างฟังก์ชัน

โคลนที่เก็บด้วยโค้ดตัวอย่างที่จำเป็นใน Cloud Shell โดยการเรียกใช้

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

เปลี่ยนเป็นไดเรกทอรีที่จำเป็นและเปิด Cloud Shell ทิ้งไว้ในขณะที่ทำขั้นตอนถัดไป

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

ไปที่หน้า Google Cloud Functions ด้วยการคลิกเมนูการนำทาง จากนั้นคลิก "Cloud Functions"

คลิก "สร้างฟังก์ชัน" ที่ด้านบนของหน้า

ตั้งชื่อฟังก์ชันว่า "my-function" และใช้หน่วยความจำตามค่าเริ่มต้นที่ 256 MB

ตั้งค่าทริกเกอร์เป็น "Cloud Storage" จากนั้นปล่อยให้ประเภทเหตุการณ์เป็น "สรุป/สร้าง" จากนั้นเรียกดูที่เก็บข้อมูลที่คุณสร้างไว้ในขั้นตอนสร้างที่เก็บข้อมูล Cloud Storage

ปล่อยให้การตั้งค่าซอร์สโค้ดเป็น "ตัวแก้ไขในบรรทัด" และตั้งค่ารันไทม์เป็น "Node.js 8"

เรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell ซึ่งจะเปิดindex.js และpackage.json ใน Cloud Shell Editor

cloudshell edit index.js package.json

คลิกที่แท็บpackage.json คัดลอกโค้ดนั้นแล้ววางลงในส่วนpackage.jsonของเครื่องมือแก้ไขในบรรทัดของ Cloud Functions

ตั้งค่า "Function to Execute" เพื่อทริกเกอร์Dag

คลิกที่แท็บindex.js จากนั้นคัดลอกโค้ดแล้ววางลงในส่วน index.js ของเครื่องมือแก้ไขในบรรทัดสำหรับ Cloud Functions

เปลี่ยน PROJECT_ID เป็นรหัสโปรเจ็กต์ จากนั้นเปลี่ยน CLIENT_ID เป็นรหัสไคลเอ็นต์ที่คุณบันทึกไว้จากขั้นตอนรับรหัสไคลเอ็นต์ อย่าคลิก "สร้าง" แต่ก็ยังมีอีกหลายสิ่งที่ต้องกรอก!

เรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell โดยแทนที่ <your-environment-name> ที่มีชื่อสภาพแวดล้อมคอมโพสเซอร์และ <your-composer-region> ในภูมิภาคของสภาพแวดล้อมคอมโพสเซอร์

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

ตัวอย่างเช่น หากสภาพแวดล้อมของคุณชื่อ my-composer-environment และอยู่ใน us-central1 คำสั่งของคุณจะเป็น

gcloud composer environments describe my-composer-environment --location us-central1

ผลลัพธ์ควรมีลักษณะเช่นนี้

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

ในเอาต์พุตดังกล่าว ให้ค้นหาตัวแปรชื่อ airflowUri ในโค้ด index.js ให้เปลี่ยน WEBSERVER_ID เป็นรหัสเว็บเซิร์ฟเวอร์ Airflow โดยเป็นส่วนหนึ่งของตัวแปร airflowUri ที่จะมี "-tp" ในตอนท้าย เช่น abc123efghi456k-tp

คลิกปุ่ม "เพิ่มเติม" เมนูแบบเลื่อนลง แล้วเลือก ภูมิภาค ทางภูมิศาสตร์ที่ใกล้กับคุณมากที่สุด

เลือก "ลองอีกครั้งเมื่อล้มเหลว"

คลิก "สร้าง" เพื่อสร้าง Cloud Function

ทำความเข้าใจโค้ด

โค้ดที่คุณคัดลอกจาก index.js จะมีลักษณะดังนี้

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

มาดูกันว่ามีอะไรเกิดขึ้นบ้าง ฟังก์ชันมี 3 ฟังก์ชัน ได้แก่ triggerDag, authorizeIap และ makeIapPostRequest

triggerDag คือฟังก์ชันที่ทริกเกอร์เมื่อเราอัปโหลดบางสิ่งไปยังที่เก็บข้อมูล Cloud Storage ที่กำหนดไว้ ซึ่งเป็นที่ที่เรากำหนดค่าตัวแปรสำคัญที่ใช้ในคำขออื่นๆ เช่น PROJECT_ID, CLIENT_ID, WEBSERVER_ID และ DAG_NAME โทรติดต่อ authorizeIap และ makeIapPostRequest

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap ส่งคำขอไปยังพร็อกซีที่ปกป้องเว็บเซิร์ฟเวอร์ Airflow โดยใช้บัญชีบริการและ "แลกเปลี่ยน" JWT สำหรับโทเค็นรหัสที่จะใช้ในการตรวจสอบสิทธิ์ makeIapPostRequest

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest เรียกเว็บเซิร์ฟเวอร์ Airflow เพื่อทริกเกอร์ composer_sample_trigger_response_dag. ชื่อ DAG ฝังอยู่ใน URL เว็บเซิร์ฟเวอร์ Airflow ที่ส่งผ่านพารามิเตอร์ url และ idToken คือโทเค็นที่เราได้รับในคำขอ authorizeIap

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. ตั้งค่า DAG ของคุณ

ใน Cloud Shell ให้เปลี่ยนเป็นไดเรกทอรีที่มีเวิร์กโฟลว์ตัวอย่าง ซึ่งเป็นส่วนหนึ่งของตัวอย่าง Python-docs ที่คุณดาวน์โหลดจาก GitHub ในขั้นตอนการรับรหัสไคลเอ็นต์

cd
cd python-docs-samples/composer/workflows

อัปโหลด DAG ไปยังคอมโพสเซอร์

อัปโหลด DAG ตัวอย่างไปยังที่เก็บข้อมูลของพื้นที่เก็บข้อมูล DAG ของสภาพแวดล้อมคอมโพสเซอร์ด้วยคำสั่งต่อไปนี้ โดยที่ <environment_name> คือชื่อของสภาพแวดล้อม Composer ของคุณ และ <location> คือชื่อของภูมิภาคที่ครับ trigger_response_dag.py คือ DAG ที่เราจะร่วมงานด้วย

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

ตัวอย่างเช่น หากสภาพแวดล้อมคอมโพสเซอร์ของคุณชื่อ my-composer และอยู่ใน us-central1 คำสั่งของคุณจะเป็น

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

การก้าวผ่าน DAG

โค้ด DAG ใน trigger_response.py มีลักษณะดังนี้

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

ส่วน default_args มีอาร์กิวเมนต์เริ่มต้นตามที่โมเดล BaseOperator ต้องการใน Apache Airflow คุณจะเห็นส่วนนี้พร้อมพารามิเตอร์เหล่านี้ใน Apache Airflow DAG ทั้งหมด ขณะนี้ owner ตั้งค่าเป็น Composer Example แต่คุณสามารถเปลี่ยนเป็นชื่อของคุณได้หากต้องการ depends_on_past แสดงให้เราทราบว่า DAG นี้ไม่ได้อาศัย DAG ใดๆ ก่อนหน้านี้ ส่วนอีเมล 3 ส่วน ได้แก่ email, email_on_failure และ email_on_retry ได้รับการตั้งค่าไว้เพื่อให้ไม่มีการแจ้งเตือนทางอีเมลตามสถานะของ DAG นี้ DAG จะลองอีกครั้งเพียงครั้งเดียว เนื่องจากมีการตั้งค่า retries เป็น 1 และจะดำเนินการดังกล่าวหลังจากผ่านไป 5 นาทีต่อ retry_delay โดยปกติ start_date จะกำหนดเวลาที่ควรเรียกใช้ DAG ร่วมกับ schedule_interval (ตั้งค่าในภายหลัง) แต่ในกรณีของ DAG นี้ไม่เกี่ยวข้อง ระบบได้ตั้งค่าให้เป็นวันที่ 1 มกราคม 2017 แต่สามารถตั้งค่าเป็นวันที่ในอดีตได้

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

ส่วน with airflow.DAG กำหนดค่า DAG ที่จะเรียกใช้ โดยจะเรียกใช้ด้วยรหัสงาน composer_sample_trigger_response_dag ซึ่งเป็นอาร์กิวเมนต์เริ่มต้นจากส่วน default_args และที่สำคัญที่สุดคือ schedule_interval ของ None schedule_interval ได้รับการตั้งค่าเป็น None เนื่องจากเรากำลังทริกเกอร์ DAG นี้โดยเฉพาะด้วย Cloud Function นี่เป็นสาเหตุที่ start_date ใน default_args ไม่เกี่ยวข้อง

เมื่อเรียกใช้ DAG จะพิมพ์การกำหนดค่าตามที่กำหนดไว้ในตัวแปร print_gcs_info

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. ทดสอบฟังก์ชันของคุณ

เปิดสภาพแวดล้อมคอมโพสเซอร์ และในแถวที่มีชื่อสภาพแวดล้อมของคุณ ให้คลิกลิงก์ Airflow

เปิด composer_sample_trigger_response_dag โดยคลิกชื่อ ขณะนี้ยังไม่มีหลักฐานใดๆ เกี่ยวกับการเรียกใช้ DAG เนื่องจากเรายังไม่ได้เรียกใช้ DAG ดังกล่าว หาก DAG นี้ไม่ปรากฏหรือคลิกไม่ได้ โปรดรอ 1 นาทีและรีเฟรชหน้าเว็บ

เปิดแท็บแยกต่างหากและอัปโหลดไฟล์ไปยังที่เก็บข้อมูล Cloud Storage ที่คุณสร้างไว้ก่อนหน้านี้และระบุไว้เป็นทริกเกอร์สำหรับ Cloud Function คุณสามารถดำเนินการดังกล่าวได้ผ่านคอนโซลหรือใช้คำสั่ง gsutil

กลับไปที่แท็บที่มี Airflow UI แล้วคลิกมุมมองกราฟ

คลิกงาน print_gcs_info ซึ่งควรเป็นสีเขียว

คลิก "ดูบันทึก" ที่ด้านขวาบนของเมนู

คุณจะเห็นข้อมูลเกี่ยวกับไฟล์ที่อัปโหลดไปยังที่เก็บข้อมูล Cloud Storage ในบันทึก

ยินดีด้วย คุณเพิ่งทริกเกอร์ Airflow DAG โดยใช้ Node.js และ Google Cloud Functions

7. ล้างข้อมูล

โปรดทำดังนี้เพื่อเลี่ยงไม่ให้เกิดการเรียกเก็บเงินกับบัญชี GCP สำหรับทรัพยากรที่ใช้ในการเริ่มต้นอย่างรวดเร็วนี้

  1. (ไม่บังคับ) หากต้องการบันทึกข้อมูล ให้ดาวน์โหลดข้อมูลจากที่เก็บข้อมูล Cloud Storage สำหรับสภาพแวดล้อม Cloud Composer และที่เก็บข้อมูลของพื้นที่เก็บข้อมูลที่คุณสร้างขึ้นสำหรับการเริ่มต้นอย่างรวดเร็วนี้
  2. ลบที่เก็บข้อมูล Cloud Storage สำหรับสภาพแวดล้อมและที่คุณสร้างขึ้น
  3. ลบสภาพแวดล้อม Cloud Composer โปรดทราบว่าการลบสภาพแวดล้อมจะไม่ลบที่เก็บข้อมูลของพื้นที่เก็บข้อมูลสำหรับสภาพแวดล้อมนั้น
  4. (ไม่บังคับ) เมื่อใช้การประมวลผลแบบ Serverless การเรียกใช้ 2 ล้านครั้งแรกต่อเดือนจะไม่มีค่าใช้จ่าย และเมื่อคุณปรับขนาดฟังก์ชันเป็น 0 ระบบจะไม่เรียกเก็บเงินจากคุณ (ดูรายละเอียดเพิ่มเติมที่ราคา) แต่หากต้องการลบ Cloud Function ก็ให้คลิก "ลบ" ที่ด้านขวาบนของหน้าภาพรวมสำหรับฟังก์ชัน

4fe11e1b41b32ba2.png

นอกจากนี้ คุณยังเลือกลบโปรเจ็กต์ได้ด้วย โดยทำดังนี้

  1. ในคอนโซล GCP ให้ไปที่หน้าโปรเจ็กต์
  2. ในรายการโปรเจ็กต์ ให้เลือกโปรเจ็กต์ที่ต้องการลบ แล้วคลิกลบ
  3. ในช่อง ให้พิมพ์รหัสโปรเจ็กต์ แล้วคลิกปิดเครื่องเพื่อลบโปรเจ็กต์