Kích hoạt DAG bằng Node.JS và Google Cloud Functions

1. Giới thiệu

Apache Airflow được thiết kế để chạy DAG theo lịch trình thường xuyên, nhưng bạn cũng có thể kích hoạt DAG theo các sự kiện, chẳng hạn như thay đổi trong bộ chứa Cloud Storage hoặc khi có thông báo được đẩy lên Cloud Pub/Sub. Để thực hiện việc này, có thể kích hoạt DAG của Cloud Composer bằng Cloud Functions.

Ví dụ trong phòng thí nghiệm này chạy một DAG đơn giản mỗi khi có thay đổi xảy ra trong bộ chứa Cloud Storage. DAG này sử dụng BashOperator để chạy một lệnh bash in thông tin thay đổi về nội dung đã được tải lên bộ chứa Cloud Storage.

Trước khi bắt đầu phòng thí nghiệm này, bạn nên hoàn thành các lớp học lập trình Giới thiệu về Cloud ComposerBắt đầu sử dụng Hàm đám mây. Nếu tạo một Môi trường Compose trong lớp học lập trình Giới thiệu về Cloud Composer, thì bạn có thể sử dụng môi trường đó trong phòng thí nghiệm này.

Sản phẩm bạn sẽ tạo ra

Trong lớp học lập trình này, bạn sẽ:

  1. Tải một tệp lên Google Cloud Storage. Thao tác này sẽ
  2. Kích hoạt Google Cloud Function bằng thời gian chạy Node.JS
  3. Hàm này sẽ thực thi một DAG trong Google Cloud Composer
  4. Thao tác này chạy một lệnh bash đơn giản để in nội dung thay đổi sang bộ chứa Google Cloud Storage

1d3d3736624a923f.png.

Kiến thức bạn sẽ học được

  • Cách kích hoạt DAG Apache Airflow bằng Google Cloud Functions + Node.js

Bạn cần

  • Tài khoản GCP
  • Hiểu biết cơ bản về JavaScript
  • Kiến thức cơ bản về Cloud Composer/Airflow và Cloud Functions
  • Điều chỉnh mức độ thoải mái bằng các lệnh CLI

2. Thiết lập GCP

Chọn hoặc Tạo dự án

Chọn hoặc tạo một Dự án Google Cloud Platform. Nếu bạn tạo một dự án mới, hãy làm theo các bước tại đây.

Ghi lại Mã dự án của bạn để sử dụng trong các bước sau.

Nếu đang tạo một dự án mới, bạn sẽ thấy mã dự án ngay bên dưới Tên dự án trên trang tạo

Nếu đã tạo một dự án, bạn có thể tìm thấy mã này trên trang chủ của bảng điều khiển trong thẻ Thông tin dự án

Bật các API

Bật Cloud Composer, Google Cloud Functions, Cloud Identity và API Quản lý danh tính và quyền truy cập (IAM) của Google.

Tạo môi trường Compose

Tạo môi trường Cloud Composer với cấu hình sau:

  • Tên: my-composer-environment
  • Vị trí: Bất kể vị trí nào gần bạn nhất về địa lý
  • Vùng: Bất kỳ vùng nào trong khu vực đó

Có thể giữ nguyên tất cả cấu hình khác theo mặc định. Nhấp vào "Tạo" ở dưới cùng.Ghi lại tên và vị trí Môi trường của Trình soạn thảo - bạn sẽ cần chúng trong các bước sau này.

Tạo bộ chứa Cloud Storage

Trong dự án của bạn, hãy tạo một bộ chứa Cloud Storage với cấu hình sau:

  • Tên: <your-project-id>
  • Lớp bộ nhớ mặc định: Nhiều khu vực
  • Vị trí: Bất kể vị trí nào ở gần khu vực Cloud Composer mà bạn đang sử dụng nhất
  • Mô hình kiểm soát quyền truy cập: Đặt quyền ở cấp đối tượng và cấp bộ chứa

Nhấn "Tạo" khi đã sẵn sàng Hãy nhớ ghi lại tên bộ chứa Cloud Storage của bạn để thực hiện các bước sau.

3. Thiết lập Google Cloud Functions (GCF)

Để thiết lập GCF, chúng ta sẽ chạy các lệnh trong Google Cloud Shell.

Mặc dù bạn có thể vận hành Google Cloud từ xa trên máy tính xách tay bằng công cụ dòng lệnh gcloud, nhưng trong lớp học lập trình này, chúng ta sẽ sử dụng Google Cloud Shell, một môi trường dòng lệnh chạy trên Đám mây.

Máy ảo dựa trên Debian này được tải tất cả các công cụ phát triển mà bạn cần. Dịch vụ này cung cấp thư mục gốc có dung lượng ổn định 5 GB và chạy trên Google Cloud, giúp nâng cao đáng kể hiệu suất và khả năng xác thực của mạng. Tức là tất cả những gì bạn cần để thực hiện lớp học lập trình này là một trình duyệt (vâng, trình duyệt này hoạt động trên Chromebook).

Để kích hoạt Google Cloud Shell, trong bảng điều khiển dành cho nhà phát triển, hãy nhấp vào nút ở trên cùng bên phải (chỉ mất vài phút để cấp phép và kết nối với môi trường):

Cấp quyền ký blob vào Tài khoản dịch vụ Cloud Functions

Để GCF xác thực với Cloud IAP (máy chủ proxy bảo vệ máy chủ web của Airflow), bạn cần cấp cho GCF (tài khoản dịch vụ Appspot) vai trò Service Account Token Creator. Bạn có thể thực hiện việc này bằng cách chạy lệnh sau trong Cloud Shell, thay thế tên dự án bằng <your-project-id>.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Ví dụ: nếu dự án của bạn có tên là my-project, lệnh sẽ là

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Lấy mã ứng dụng khách

Để tạo mã thông báo nhằm xác thực IAP trên Cloud, hàm này sẽ yêu cầu mã ứng dụng khách của proxy giúp bảo vệ máy chủ web của Airflow. Cloud Composer API không trực tiếp cung cấp thông tin này. Thay vào đó, hãy gửi một yêu cầu chưa được xác thực đến máy chủ web Airflow và thu thập mã ứng dụng khách từ URL chuyển hướng. Chúng ta sẽ thực hiện việc đó bằng cách chạy một tệp python bằng Cloud Shell để thu thập mã ứng dụng khách.

Tải mã cần thiết từ GitHub xuống bằng cách chạy lệnh sau trong Cloud Shell

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

Nếu bạn gặp lỗi do thư mục này đã tồn tại, hãy cập nhật thư mục lên phiên bản mới nhất bằng cách chạy lệnh sau

cd python-docs-samples/
git pull origin master

Thay đổi thư mục thích hợp bằng cách chạy

cd python-docs-samples/composer/rest

Chạy mã python để lấy mã ứng dụng khách của bạn, thay thế tên dự án cho <your-project-id>, vị trí của môi trường Composer mà bạn đã tạo trước đó cho <your-composer-location> và tên của môi trường Composer mà bạn đã tạo trước đó cho <your-composer-environment>

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

Ví dụ: nếu tên dự án là my-project, vị trí Trình soạn thảo của bạn là us-central1 và tên môi trường là my-composer, thì lệnh của bạn sẽ là

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py sẽ thực hiện những việc sau:

  • Xác thực bằng Google Cloud
  • Gửi một yêu cầu HTTP chưa được xác thực đến máy chủ web Airflow để nhận URI chuyển hướng
  • Trích xuất tham số truy vấn client_id từ lệnh chuyển hướng đó
  • In để bạn sử dụng

Client-ID của bạn sẽ được in ra dòng lệnh và có dạng như sau:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. Tạo hàm

Trong Cloud Shell, hãy sao chép kho lưu trữ với mã mẫu cần thiết bằng cách chạy

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

Hãy chuyển sang thư mục cần thiết và để Cloud Shell mở trong khi bạn hoàn tất một số bước tiếp theo

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

Chuyển đến trang Google Cloud Functions bằng cách nhấp vào Trình đơn điều hướng rồi nhấp vào "Cloud Functions"

Nhấp vào "TẠO CHỨC NĂNG" ở đầu trang

Đặt tên cho hàm là "hàm của tôi" và để bộ nhớ theo mặc định là 256 MB.

Đặt Điều kiện kích hoạt thành "Cloud Storage", để loại Sự kiện là "Hoàn tất/Tạo" và duyệt đến bộ chứa bạn đã tạo ở bước Tạo bộ chứa Cloud Storage.

Đặt Mã nguồn thành "Trình chỉnh sửa cùng dòng" và đặt thời gian chạy thành "Node.js 8"

Trong Cloud Shell, hãy chạy lệnh sau. Thao tác này sẽ mở http://index.js và package.json trong Cloud Shell Editor (Trình chỉnh sửa Cloud Shell)

cloudshell edit index.js package.json

Nhấp vào thẻ package.json, sao chép và dán mã đó vào phần package.json của trình chỉnh sửa nội tuyến Cloud Functions

Đặt "Hàm để thực thi" để kích hoạtDag

Nhấp vào tabindex.js, sao chép mã và dán mã đó vào phầnindex.js của trình chỉnh sửa nội tuyến Hàm đám mây

Thay đổi PROJECT_ID thành mã dự án, CLIENT_ID thành mã ứng dụng khách mà bạn đã lưu ở bước Lấy mã ứng dụng khách. KHÔNG nhấp vào "Tạo" Tuy nhiên, bạn vẫn cần hoàn thành một số việc nữa!

Trong Cloud Shell, hãy chạy lệnh sau, thay thế <your-environment-name> bằng tên môi trường Composer và <your-composer-region> của bạn bằng khu vực đặt Môi trường Composer.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

Ví dụ: nếu môi trường của bạn có tên là my-composer-environment và nằm trong us-central1, lệnh của bạn sẽ là

gcloud composer environments describe my-composer-environment --location us-central1

Kết quả đầu ra sẽ có dạng như sau:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

Trong kết quả đó, hãy tìm biến có tên airflowUri. Trong mãindex.js của bạn, hãy thay đổi WEBSERVER_ID thành Mã máy chủ web Airflow – đó là một phần của biến airflowUri sẽ có "-tp" ở cuối, ví dụ: abc123efghi456k-tp

Nhấp vào nút "Thêm" đường liên kết thả xuống, sau đó chọn Khu vực địa lý gần bạn nhất

Chọn "Thử lại khi không thành công"

Nhấp vào "Tạo" để tạo Hàm đám mây

Khám phá mã

Mã mà bạn đã sao chép từindex.js sẽ trông giống như sau:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

Hãy xem điều gì đang xảy ra. Có 3 hàm ở đây: triggerDag, authorizeIapmakeIapPostRequest

triggerDag là hàm được kích hoạt khi chúng ta tải nội dung nào đó lên bộ chứa Cloud Storage được chỉ định. Đây là nơi chúng ta định cấu hình các biến quan trọng dùng trong các yêu cầu khác, chẳng hạn như PROJECT_ID, CLIENT_ID, WEBSERVER_IDDAG_NAME. Phương thức này gọi authorizeIapmakeIapPostRequest.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap gửi yêu cầu tới proxy bảo vệ máy chủ web Airflow bằng tài khoản dịch vụ và "trao đổi" JWT cho mã thông báo nhận dạng sẽ được dùng để xác thực makeIapPostRequest.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest thực hiện lệnh gọi đến máy chủ web của Airflow để kích hoạt composer_sample_trigger_response_dag.. Tên DAG được nhúng trong URL máy chủ web của Airflow được truyền vào cùng với tham số urlidToken là mã thông báo mà chúng tôi nhận được trong yêu cầu authorizeIap.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. Thiết lập DAG

Trong Cloud Shell, hãy thay đổi sang thư mục có quy trình làm việc mẫu. Đây là một phần của python-docs-samples mà bạn đã tải xuống từ GitHub trong bước Lấy mã ứng dụng khách.

cd
cd python-docs-samples/composer/workflows

Tải DAG lên Composer

Tải DAG mẫu lên bộ chứa lưu trữ DAG của môi trường Composer bằng lệnh sau, trong đó <environment_name> là tên của môi trường Composer và <location> là tên của khu vực nơi chứa DAG đó. trigger_response_dag.py là DAG mà chúng tôi sẽ làm việc cùng.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

Ví dụ: nếu môi trường Trình soạn thảo của bạn có tên là my-composer và nằm trong us-central1, lệnh của bạn sẽ là

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

Vượt qua DAG

Mã DAG trong trigger_response.py có dạng như sau

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

Phần default_args chứa các đối số mặc định theo yêu cầu của mô hình BaseOperator trong Apache Airflow. Bạn sẽ thấy mục này cùng với các tham số này trong mọi DAG của Apache Airflow. owner hiện được đặt thành Composer Example, nhưng bạn có thể thay đổi thành tên của mình nếu muốn. depends_on_past cho thấy rằng DAG này không phụ thuộc vào bất kỳ DAG nào trước đó. Ba phần của email, email, email_on_failureemail_on_retry được thiết lập để không có thông báo qua email nào được gửi dựa trên trạng thái của DAG này. DAG sẽ chỉ thử lại một lần, vì retries được đặt thành 1 và sẽ thực hiện lại sau 5 phút, theo retry_delay. start_date thường quy định thời điểm nên chạy DAG, cùng với schedule_interval (thiết lập sau) nhưng không phù hợp đối với DAG này. Ngày này được đặt là ngày 1 tháng 1 năm 2017, nhưng bạn có thể đặt thành bất kỳ ngày nào trong quá khứ.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

Phần with airflow.DAG định cấu hình DAG sẽ được chạy. Thao tác này sẽ chạy bằng mã tác vụ composer_sample_trigger_response_dag, các đối số mặc định trong phần default_args và quan trọng nhất là với schedule_intervalNone. schedule_interval được đặt thành None vì chúng ta đang kích hoạt DAG cụ thể này bằng Chức năng đám mây của mình. Đây là lý do tại sao start_date trong default_args không liên quan.

Khi thực thi, DAG sẽ in cấu hình như quy định trong biến print_gcs_info.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. Kiểm tra chức năng

Mở Composer Environment (Môi trường Composer) và trong hàng có tên môi trường, hãy nhấp vào đường liên kết Airflow

Mở composer_sample_trigger_response_dag bằng cách nhấp vào tên của nó. Hiện tại, sẽ không có bất kỳ bằng chứng nào về việc chạy DAG, vì chúng tôi chưa kích hoạt DAG để chạy.Nếu bạn không nhìn thấy hoặc nhấp vào được DAG này, hãy đợi một phút rồi làm mới trang.

Mở một thẻ riêng và tải tệp bất kỳ lên bộ chứa Cloud Storage mà bạn đã tạo trước đó và chỉ định làm điều kiện kích hoạt cho Chức năng đám mây của bạn. Bạn có thể thực hiện việc này qua Bảng điều khiển hoặc sử dụng lệnh YT.

Quay lại thẻ có giao diện người dùng Airflow rồi nhấp vào Chế độ xem biểu đồ

Nhấp vào nhiệm vụ print_gcs_info (sẽ có đường viền màu xanh lục)

Nhấp vào "Xem nhật ký" ở góc trên bên phải trình đơn

Trong nhật ký, bạn sẽ thấy thông tin về tệp mà bạn đã tải lên bộ chứa Cloud Storage.

Xin chúc mừng! Bạn vừa kích hoạt một DAG của Airflow bằng Node.js và Google Cloud Functions!

7. Dọn dẹp

Để tránh bị tính phí vào tài khoản GCP đối với các tài nguyên được sử dụng trong phần khởi động nhanh này, hãy làm như sau:

  1. (Không bắt buộc) Để lưu dữ liệu của bạn, hãy tải dữ liệu xuống từ bộ chứa Cloud Storage cho môi trường Cloud Composer và bộ chứa lưu trữ bạn đã tạo cho bước bắt đầu nhanh này.
  2. Xoá bộ chứa Cloud Storage đối với môi trường và bộ chứa bạn đã tạo
  3. Xoá môi trường Cloud Composer. Xin lưu ý rằng việc xoá môi trường sẽ không xoá bộ chứa lưu trữ của môi trường đó.
  4. (Không bắt buộc) Với điện toán không máy chủ, 2 triệu lệnh gọi đầu tiên mỗi tháng là miễn phí và khi mở rộng quy mô hàm về 0, bạn sẽ không bị tính phí (xem giá để biết thêm chi tiết). Tuy nhiên, nếu bạn muốn xoá Hàm đám mây của mình, hãy nhấp vào "XOÁ" ở trên cùng bên phải trang tổng quan cho hàm của bạn

4fe11e1b41b32ba2.png.

Bạn cũng có thể tuỳ ý xoá dự án:

  1. Trong Bảng điều khiển GCP, hãy chuyển đến trang Dự án.
  2. Trong danh sách dự án, hãy chọn dự án mà bạn muốn xoá rồi nhấp vào Xoá.
  3. Trong hộp này, hãy nhập mã dự án rồi nhấp vào Tắt để xoá dự án.