Kích hoạt DAG bằng Node.JS và Google Cloud Functions

1. Giới thiệu

Apache Airflow được thiết kế để chạy DAG theo lịch trình thường xuyên, nhưng bạn cũng có thể kích hoạt DAG để phản hồi các sự kiện, chẳng hạn như thay đổi trong một bộ chứa Cloud Storage hoặc một thông báo được chuyển đến Cloud Pub/Sub. Để thực hiện việc này, bạn có thể kích hoạt DAG của Cloud Composer bằng Cloud Functions.

Ví dụ trong phòng thí nghiệm này chạy một DAG đơn giản mỗi khi có thay đổi xảy ra trong một bộ chứa Cloud Storage. DAG này sử dụng BashOperator để chạy một lệnh bash in thông tin thay đổi về nội dung đã được tải lên bộ chứa Cloud Storage.

Trước khi bắt đầu bài thực hành này, bạn nên hoàn thành lớp học lập trình Giới thiệu về Cloud ComposerBắt đầu sử dụng Cloud Functions. Nếu tạo một Môi trường Composer trong lớp học lập trình Giới thiệu về Cloud Composer, bạn có thể sử dụng môi trường đó trong phòng thí nghiệm này.

Sản phẩm bạn sẽ tạo ra

Trong lớp học lập trình này, bạn sẽ:

  1. Tải một tệp lên Google Cloud Storage. Thao tác này sẽ
  2. Kích hoạt một Hàm Google Cloud bằng thời gian chạy Node.JS
  3. Hàm này sẽ thực thi một DAG trong Google Cloud Composer
  4. Lệnh này chạy một lệnh bash đơn giản để in nội dung thay đổi vào bộ chứa Google Cloud Storage

1d3d3736624a923f.png

Kiến thức bạn sẽ học được

  • Cách kích hoạt một DAG của Apache Airflow bằng Google Cloud Functions + Node.js

Những thông tin bạn cần

  • Tài khoản GCP
  • Hiểu biết cơ bản về JavaScript
  • Kiến thức cơ bản về Cloud Composer/Airflow và Cloud Functions
  • Thoải mái khi sử dụng các lệnh CLI

2. Thiết lập GCP

Chọn hoặc tạo dự án

Chọn hoặc tạo một dự án trên Google Cloud Platform. Nếu bạn đang tạo một dự án mới, hãy làm theo các bước tại đây.

Ghi lại mã dự án để sử dụng trong các bước sau.

Nếu bạn đang tạo một dự án mới, thì mã dự án sẽ nằm ngay bên dưới Tên dự án trên trang tạo

Nếu đã tạo một dự án, bạn có thể tìm thấy mã nhận dạng trên trang chủ của bảng điều khiển trong thẻ Thông tin dự án

Bật các API

Bật Cloud Composer, Google Cloud Functions, Cloud Identity và API Quản lý danh tính và quyền truy cập (IAM) của Google.

Tạo môi trường Composer

Tạo một môi trường Cloud Composer có cấu hình sau:

  • Tên: my-composer-environment
  • Vị trí: Bất kỳ vị trí nào gần bạn nhất về mặt địa lý
  • Vùng: Mọi vùng trong khu vực đó

Bạn có thể giữ nguyên tất cả các cấu hình khác theo mặc định. Nhấp vào "Tạo" ở dưới cùng.Ghi lại tên và vị trí của Môi trường Composer – bạn sẽ cần những thông tin này trong các bước sau.

Tạo bộ chứa Cloud Storage

Trong dự án của bạn, hãy tạo một bộ chứa Cloud Storage có cấu hình như sau:

  • Tên: <your-project-id>
  • Lớp lưu trữ mặc định: Đa khu vực
  • Vị trí: Bất kỳ vị trí nào gần nhất về mặt địa lý với khu vực Cloud Composer mà bạn đang sử dụng
  • Mô hình kiểm soát quyền truy cập: Thiết lập quyền ở cấp đối tượng và cấp bộ chứa

Nhấn vào "Tạo" khi bạn đã sẵn sàng. Nhớ ghi lại tên bộ chứa Cloud Storage để dùng cho các bước sau.

3. Thiết lập Google Cloud Functions (GCF)

Để thiết lập GCF, chúng ta sẽ chạy các lệnh trong Google Cloud Shell.

Mặc dù bạn có thể vận hành Google Cloud từ xa trên máy tính xách tay bằng công cụ dòng lệnh gcloud, nhưng trong lớp học lập trình này, chúng ta sẽ sử dụng Google Cloud Shell, một môi trường dòng lệnh chạy trên Cloud.

Máy ảo dựa trên Debian này được trang bị tất cả các công cụ phát triển mà bạn cần. Nó cung cấp một thư mục chính có dung lượng 5 GB và chạy trên Google Cloud, giúp tăng cường đáng kể hiệu suất mạng và xác thực. Điều này có nghĩa là bạn chỉ cần một trình duyệt (có, trình duyệt này hoạt động trên Chromebook) cho lớp học lập trình này.

Để kích hoạt Google Cloud Shell, hãy nhấp vào nút ở phía trên cùng bên phải của bảng điều khiển dành cho nhà phát triển (chỉ mất vài giây để cung cấp và kết nối với môi trường):

Cấp quyền ký blob cho Tài khoản dịch vụ Cloud Functions

Để GCF xác thực với Cloud IAP, tức là proxy bảo vệ máy chủ web Airflow, bạn cần cấp cho Tài khoản dịch vụ Appspot vai trò Service Account Token Creator GCF. Để làm như vậy, hãy chạy lệnh sau trong Cloud Shell, thay thế tên dự án của bạn bằng <your-project-id>.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Ví dụ: nếu dự án của bạn có tên là my-project, thì lệnh sẽ là

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Lấy mã ứng dụng khách

Để tạo mã thông báo nhằm xác thực với Cloud IAP, hàm này yêu cầu mã ứng dụng khách của proxy bảo vệ máy chủ web Airflow. API Cloud Composer không cung cấp trực tiếp thông tin này. Thay vào đó, hãy đưa ra một yêu cầu chưa xác thực đến máy chủ web Airflow và thu thập mã ứng dụng khách từ URL chuyển hướng. Chúng ta sẽ thực hiện việc đó bằng cách chạy một tệp python bằng Cloud Shell để thu thập mã ứng dụng khách.

Tải mã cần thiết xuống từ GitHub bằng cách chạy lệnh sau trong Cloud Shell

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

Nếu bạn gặp lỗi do thư mục này đã tồn tại, hãy cập nhật thư mục lên phiên bản mới nhất bằng cách chạy lệnh sau

cd python-docs-samples/
git pull origin master

Thay đổi thành thư mục thích hợp bằng cách chạy

cd python-docs-samples/composer/rest

Chạy mã python để lấy mã ứng dụng khách của bạn, thay thế tên dự án bằng <your-project-id>, vị trí của môi trường Composer mà bạn đã tạo trước đó bằng <your-composer-location> và tên của môi trường Composer mà bạn đã tạo trước đó bằng <your-composer-environment>

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

Ví dụ: nếu tên dự án của bạn là my-project, vị trí Composer là us-central1 và tên môi trường là my-composer, thì lệnh của bạn sẽ là

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py sẽ thực hiện những việc sau:

  • Xác thực bằng Google Cloud
  • Gửi một yêu cầu HTTP chưa được xác thực đến máy chủ web Airflow để lấy URI chuyển hướng
  • Trích xuất tham số truy vấn client_id từ lệnh chuyển hướng đó
  • In ra để bạn sử dụng

Mã ứng dụng khách của bạn sẽ được in trên dòng lệnh và có dạng như sau:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. Tạo hàm

Trong Cloud Shell, hãy sao chép kho lưu trữ có mã mẫu cần thiết bằng cách chạy

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

Chuyển sang thư mục cần thiết và để Cloud Shell mở trong khi bạn hoàn tất một vài bước tiếp theo

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

Chuyển đến trang Google Cloud Functions bằng cách nhấp vào trình đơn Điều hướng, rồi nhấp vào "Cloud Functions"

Nhấp vào "CREATE FUNCTION" ở đầu trang

Đặt tên cho hàm là "my-function" và để nguyên bộ nhớ ở mức mặc định là 256 MB.

Đặt Điều kiện kích hoạt thành "Cloud Storage", giữ nguyên Loại sự kiện là "Hoàn tất/Tạo" và duyệt đến bộ chứa mà bạn đã tạo ở bước Tạo bộ chứa Cloud Storage.

Giữ Mã nguồn ở chế độ "Trình chỉnh sửa nội tuyến" và đặt thời gian chạy thành "Node.js 8"

Trong Cloud Shell, hãy chạy lệnh sau. Thao tác này sẽ mở index.js và package.json trong Cloud Shell Editor

cloudshell edit index.js package.json

Nhấp vào thẻ package.json, sao chép mã đó rồi dán vào phần package.json của trình chỉnh sửa nội tuyến Cloud Functions

Đặt "Hàm cần thực thi" thành triggerDag

Nhấp vào thẻ index.js, sao chép mã rồi dán vào phần index.js của trình chỉnh sửa nội tuyến Cloud Functions

Thay đổi PROJECT_ID thành mã dự án của bạn, CLIENT_ID thành mã ứng dụng khách mà bạn đã lưu ở bước Lấy mã ứng dụng khách. ĐỪNG nhấp vào "Tạo" vội, bạn vẫn cần điền thêm một số thông tin nữa!

Trong Cloud Shell, hãy chạy lệnh sau, thay <your-environment-name> bằng tên của môi trường Composer và <your-composer-region> bằng khu vực nơi Môi trường Composer của bạn được đặt.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

Ví dụ: nếu môi trường của bạn có tên là my-composer-environment và nằm trong us-central1, thì lệnh sẽ là

gcloud composer environments describe my-composer-environment --location us-central1

Kết quả đầu ra sẽ có dạng như sau:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

Trong đầu ra đó, hãy tìm biến có tên là airflowUri. Trong mã index.js, hãy thay đổi WEBSERVER_ID thành mã nhận dạng máy chủ web Airflow – đây là phần của biến airflowUri sẽ có "-tp" ở cuối, ví dụ: abc123efghi456k-tp

Nhấp vào đường liên kết thả xuống "Khác", sau đó chọn Khu vực gần bạn nhất về mặt địa lý

Chọn "Retry on Failure" (Thử lại khi thất bại)

Nhấp vào "Tạo" để tạo Cloud Function

Từng bước thực hiện mã

Mã bạn sao chép từ index.js sẽ có dạng như sau:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

Hãy cùng xem điều gì đang xảy ra. Có 3 hàm ở đây: triggerDag, authorizeIapmakeIapPostRequest

triggerDag là hàm được kích hoạt khi chúng ta tải nội dung nào đó lên bộ chứa Cloud Storage được chỉ định. Đây là nơi chúng ta định cấu hình các biến quan trọng được dùng trong các yêu cầu khác, chẳng hạn như PROJECT_ID, CLIENT_ID, WEBSERVER_IDDAG_NAME. Hàm này gọi authorizeIapmakeIapPostRequest.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap đưa ra yêu cầu cho proxy bảo vệ máy chủ web Airflow, sử dụng tài khoản dịch vụ và "trao đổi" JWT để lấy mã nhận dạng sẽ được dùng để xác thực makeIapPostRequest.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest gọi đến máy chủ web Airflow để kích hoạt composer_sample_trigger_response_dag. Tên DAG được nhúng trong URL máy chủ web Airflow được truyền vào bằng tham số urlidToken là mã thông báo mà chúng ta nhận được trong yêu cầu authorizeIap.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. Thiết lập DAG

Trong Cloud Shell, hãy chuyển sang thư mục có các quy trình mẫu. Đây là một phần của python-docs-samples mà bạn đã tải xuống từ GitHub trong bước Lấy mã nhận dạng ứng dụng khách.

cd
cd python-docs-samples/composer/workflows

Tải DAG lên Composer

Tải DAG mẫu lên thùng lưu trữ DAG của môi trường Composer bằng lệnh sau, trong đó <environment_name> là tên của môi trường Composer và <location> là tên của khu vực nơi môi trường đó được đặt. trigger_response_dag.py là DAG mà chúng ta sẽ làm việc.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

Ví dụ: nếu môi trường Composer của bạn có tên là my-composer và nằm trong us-central1, thì lệnh của bạn sẽ là

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

Từng bước thực hiện DAG

Mã DAG trong trigger_response.py sẽ có dạng như sau

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

Mục default_args chứa các đối số mặc định theo yêu cầu của mô hình BaseOperator trong Apache Airflow. Bạn sẽ thấy phần này cùng với các tham số sau trong mọi DAG của Apache Airflow. owner hiện được đặt thành Composer Example, nhưng bạn có thể thay đổi thành tên của mình nếu muốn. depends_on_past cho chúng ta thấy rằng DAG này không phụ thuộc vào bất kỳ DAG nào trước đó. Ba phần email email, email_on_failureemail_on_retry được thiết lập để không nhận được thông báo qua email dựa trên trạng thái của DAG này. DAG sẽ chỉ thử lại một lần, vì retries được đặt thành 1 và sẽ thử lại sau 5 phút, theo retry_delay. start_date thường chỉ định thời điểm DAG sẽ chạy, cùng với schedule_interval (được đặt sau) nhưng trong trường hợp DAG này, điều đó không liên quan. Ngày này được đặt là ngày 1 tháng 1 năm 2017, nhưng bạn có thể đặt thành bất kỳ ngày nào trong quá khứ.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

Phần with airflow.DAG định cấu hình DAG sẽ được chạy. Thao tác này sẽ được chạy với mã nhận dạng tác vụ composer_sample_trigger_response_dag, các đối số mặc định trong phần default_args và quan trọng nhất là với schedule_intervalNone. schedule_interval được đặt thành None vì chúng ta đang kích hoạt DAG cụ thể này bằng Cloud Function. Đó là lý do start_date trong default_args không liên quan.

Khi thực thi, DAG sẽ in cấu hình của nó, như được quy định trong biến print_gcs_info.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. Kiểm thử hàm

Mở Môi trường Composer và trong hàng có tên môi trường của bạn, hãy nhấp vào đường liên kết Airflow

Mở composer_sample_trigger_response_dag bằng cách nhấp vào tên của composer_sample_trigger_response_dag đó. Hiện tại, sẽ không có bằng chứng nào về các lần chạy DAG vì chúng ta chưa kích hoạt DAG để chạy.Nếu DAG này không xuất hiện hoặc không thể nhấp vào, hãy đợi một phút rồi làm mới trang.

Mở một thẻ riêng biệt rồi tải mọi tệp lên bộ chứa Cloud Storage mà bạn đã tạo trước đó và chỉ định làm điều kiện kích hoạt cho Cloud Function. Bạn có thể thực hiện việc này thông qua Bảng điều khiển hoặc dùng lệnh gsutil.

Chuyển về thẻ có giao diện người dùng Airflow rồi nhấp vào Graph View (Chế độ xem biểu đồ)

Nhấp vào việc cần làm print_gcs_info. Việc cần làm này sẽ được đánh dấu bằng màu xanh lục

Nhấp vào "Xem nhật ký" ở trên cùng bên phải của trình đơn

Trong nhật ký, bạn sẽ thấy thông tin về tệp mà bạn đã tải lên bộ chứa Cloud Storage.

Xin chúc mừng! Bạn vừa kích hoạt một DAG Airflow bằng Node.js và Google Cloud Functions!

7. Dọn dẹp

Để tránh phát sinh phí cho tài khoản GCP của bạn đối với các tài nguyên được dùng trong hướng dẫn nhanh này, hãy làm như sau:

  1. (Không bắt buộc) Để lưu dữ liệu, hãy tải dữ liệu xuống từ bộ chứa Cloud Storage cho môi trường Cloud Composer và bộ chứa bạn đã tạo cho hướng dẫn bắt đầu nhanh này.
  2. Xoá bộ chứa Cloud Storage cho môi trường mà bạn đã tạo
  3. Xoá môi trường Cloud Composer. Xin lưu ý rằng việc xoá môi trường sẽ không xoá bộ chứa lưu trữ cho môi trường đó.
  4. (Không bắt buộc) Với điện toán phi máy chủ, 2 triệu lệnh gọi đầu tiên mỗi tháng là miễn phí và khi giảm quy mô hàm xuống 0, bạn sẽ không bị tính phí (xem giá để biết thêm thông tin). Tuy nhiên, nếu bạn muốn xoá Cloud Function, hãy nhấp vào "XOÁ" ở trên cùng bên phải trang tổng quan cho hàm của bạn

4fe11e1b41b32ba2.png

Bạn cũng có thể xoá dự án (không bắt buộc):

  1. Trong Bảng điều khiển của GCP, hãy chuyển đến trang Dự án.
  2. Trong danh sách dự án, hãy chọn dự án bạn muốn xoá rồi nhấp vào Xoá.
  3. Trong hộp này, hãy nhập mã dự án rồi nhấp vào Tắt để xoá dự án.