使用 Node.JS 和 Google Cloud Functions 触发 DAG

1. 简介

Apache Airflow 旨在定期运行 DAG,但您也可以对事件(例如,Cloud Storage 存储分区发生更改或推送到 Cloud Pub/Sub 的消息)触发 DAG。为此,您可以使用 Cloud Functions 函数触发 Cloud Composer DAG。

本实验中的示例会在 Cloud Storage 存储分区每次发生更改时运行一个简单的 DAG。此 DAG 使用 BashOperator 运行 bash 命令,输出有关上传到 Cloud Storage 存储分区的内容的更改信息。

在开始本实验之前,建议您先完成 Cloud Composer 简介Cloud Functions 使用入门 Codelab。如果您在“Cloud Composer 简介”Codelab 中创建了 Composer 环境,则可以在本实验中使用该环境。

构建内容

在此 Codelab 中,您将:

  1. 将文件上传到 Google Cloud Storage
  2. 使用 Node.JS 运行时触发 Google Cloud Functions 函数
  3. 此函数将在 Google Cloud Composer 中执行 DAG
  4. 这会运行一个简单的 bash 命令,输出对 Google Cloud Storage 存储分区的更改

1d3d3736624a923f

学习内容

  • 如何使用 Google Cloud Functions 和 Node.js 触发 Apache Airflow DAG

所需条件

  • GCP 账号
  • 对 JavaScript 有基本的了解
  • 具备 Cloud Composer/Airflow 和 Cloud Functions 方面的基础知识
  • 熟练使用 CLI 命令

2. 设置 GCP

选择或创建项目

选择或创建 Google Cloud Platform 项目。如果您要创建新项目,请按照此处的步骤操作。

记下您的项目 ID,您将在后续步骤中使用它。

在创建新项目时,项目 ID 位于创建页面上的“项目名称”正下方

如果您已经创建了项目,则可以在控制台首页的“项目信息”卡片中找到该 ID

启用 API

启用 Cloud Composer、Google Cloud Functions 和 Cloud Identity and Google Identity and Access Management (IAM) API。

创建 Composer 环境

使用以下配置创建一个 Cloud Composer 环境

  • 名称:my-Compose-environment
  • 地理位置:离您最近的地理位置
  • 可用区:该区域中的任何可用区

所有其他配置可以保留默认值。点击“创建”记下您的 Composer 环境名称和位置,您在后面的步骤中需要用到这些信息。

创建 Cloud Storage 存储分区

在您的项目中,创建一个具有以下配置的 Cloud Storage 存储分区

  • 名称:<your-project-id>
  • 默认存储类别:Multi-regional
  • 位置:在地理位置上最接近您所使用的 Cloud Composer 区域的位置
  • 访问权限控制模型:设置对象级和存储分区级权限

点击“创建”请务必记下 Cloud Storage 存储分区的名称,以备后续步骤使用。

3. 设置 Google Cloud Functions (GCF)

为了设置 GCF,我们将在 Google Cloud Shell 中运行命令。

虽然可以使用 gcloud 命令行工具从笔记本电脑远程操作 Google Cloud,但在此 Codelab 中,我们将使用 Google Cloud Shell,这是一个在云端运行的命令行环境。

基于 Debian 的这个虚拟机已加载了您需要的所有开发工具。它提供了一个持久的 5GB 主目录,并且在 Google Cloud 上运行,大大增强了网络性能和身份验证功能。这意味着在本 Codelab 中,您只需要一个浏览器(没错,它适用于 Chromebook)。

如需激活 Google Cloud Shell,请在开发者控制台中点击右上角的按钮(配置和连接到环境应该只需要片刻时间):

向 Cloud Functions 服务账号授予 blob 签名权限

为了让 GCF 向 Cloud IAP(保护 Airflow Web 服务器的代理)进行身份验证,您需要为 Appspot 服务账号 GCF 授予 Service Account Token Creator 角色。为此,请在 Cloud Shell 中运行以下命令(将 <your-project-id> 替换为您的项目名称)。

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

例如,如果您的项目名为 my-project,则您的命令为

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

获取客户端 ID

为构建一个用于向 Cloud IAP 进行身份验证的令牌,该函数需要使用保护 Airflow Web 服务器的代理的客户端 ID。Cloud Composer API 不会直接提供此信息,您可以改为向 Airflow Web 服务器发出未经身份验证的请求,并从重定向网址中捕获客户端 ID。为此,我们将使用 Cloud Shell 运行一个 Python 文件来捕获客户端 ID。

在 Cloud Shell 中运行以下命令,从 GitHub 下载必要的代码

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

如果您收到错误是因为此目录已存在,请通过运行以下命令将其更新到最新版本

cd python-docs-samples/
git pull origin master

运行以下命令切换到相应目录:

cd python-docs-samples/composer/rest

运行 Python 代码以获取客户端 ID,并将 <your-project-id> 替换为您的项目名称,将 <your-composer-location> 替换为您之前创建的 Composer 环境的位置,以及之前为 <your-composer-environment> 创建的 Composer 环境的名称

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

例如,如果您的项目名称为 my-project,Composer 位置为 us-central1,环境名称为 my-composer,则您的命令为

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py 会执行以下操作:

  • 使用 Google Cloud 进行身份验证
  • 向 Airflow Web 服务器发出未经身份验证的 HTTP 请求以获取重定向 URI
  • 从该重定向中提取 client_id 查询参数
  • 打印出来供您使用

客户端 ID 将在命令行中输出,显示如下:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. 创建函数

在 Cloud Shell 中,运行以下命令,使用必要的示例代码克隆代码库

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

切换到所需的目录,并让 Cloud Shell 保持打开状态,同时完成接下来的几个步骤

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

点击导航菜单,然后点击“Cloud Functions”,以转到 Google Cloud Functions 页面

点击“创建函数”页首

将您的函数命名为“my-function”将内存保留为默认值,即 256MB

将触发器设置为“Cloud Storage”,将“事件类型”保留为“敲定/创建”,然后找到您在“创建 Cloud Storage 存储分区”步骤中创建的存储分区。

将源代码设置为“内嵌编辑器”并将运行时设置为“Node.js 8”

在 Cloud Shell 中,运行以下命令。此操作将在 Cloud Shell 编辑器中打开 index.js 和 package.json

cloudshell edit index.js package.json

点击 package.json 标签页,复制该代码并将其粘贴到 Cloud Functions 内嵌编辑器的 package.json 部分

设置“要执行的函数”到 triggerDag 了

点击 index.js 标签页,复制代码并将其粘贴到 Cloud Functions 内嵌编辑器的 index.js 部分中

PROJECT_ID 更改为您的项目 ID,将 CLIENT_ID 更改为您在“获取客户端 ID”步骤中保存的客户端 ID。请勿点击“创建”但仍然有几个步骤要填写!

在 Cloud Shell 中,运行以下命令,将 <your-environment-name> 替换为您所使用的环境名称>替换为您的 Composer 环境的名称以及 <your- Composer-region>与您的 Composer 环境所在区域相同。

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

例如,如果您的环境名为 my-composer-environment 且位于 us-central1,则您的命令为

gcloud composer environments describe my-composer-environment --location us-central1

输出应类似如下所示:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

在该输出中,查找名为 airflowUri 的变量。在 index.js 代码中,将 WEBSERVER_ID 更改为 Airflow Web 服务器 ID,该 ID 是 airflowUri 变量中含有“-tp”的部分例如 abc123efghi456k-tp

点击“更多”图标下拉菜单链接,然后选择与您地理位置最近的区域

勾选“失败时重试”

点击“创建”创建 Cloud Functions 函数

单步调试代码

您从 index.js 复制的代码将如下所示:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

我们来看看发生了什么。此处有三个函数:triggerDagauthorizeIapmakeIapPostRequest

triggerDag 是将内容上传到指定的 Cloud Storage 存储分区时触发的函数。您可以在其中配置其他请求中使用的重要变量,例如 PROJECT_IDCLIENT_IDWEBSERVER_IDDAG_NAME。它会调用 authorizeIapmakeIapPostRequest

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap 使用服务账号向保护 Airflow Web 服务器的代理发出请求并“交换”用于对 makeIapPostRequest 进行身份验证的 ID 令牌的 JWT。

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest 会调用 Airflow Web 服务器以触发 composer_sample_trigger_response_dag.。DAG 名称会嵌入到通过 url 参数传入的 Airflow Web 服务器网址中,而 idToken 则是我们从 authorizeIap 请求中获取的令牌。

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. 设置 DAG

在 Cloud Shell 中,切换到包含示例工作流的目录。它是您在“获取客户端 ID”步骤中从 GitHub 下载的 python-docs-samples 的一部分。

cd
cd python-docs-samples/composer/workflows

将 DAG 上传到 Composer

使用以下命令将示例 DAG 上传到 Composer 环境的 DAG 存储分区中,其中 <environment_name> 是 Composer 环境的名称,<location> 是 Composer 环境所在区域的名称。trigger_response_dag.py 是我们将使用的 DAG。

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

例如,如果您的 Composer 环境名为 my-composer 且位于 us-central1,则您的命令为

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

逐步执行 DAG

trigger_response.py 中的 DAG 代码如下所示

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

default_args 部分包含 Apache Airflow 中的 BaseOperator 模型所需的默认参数。您会在任何 Apache Airflow DAG 中看到包含这些参数的这一部分。owner 当前设置为 Composer Example,但您可以根据需要将其更改为自己的名称。depends_on_past 表明此 DAG 不依赖于任何先前的 DAG。三个电子邮件部分(emailemail_on_failureemail_on_retry)已设置,以确保系统不会根据此 DAG 的状态发出任何电子邮件通知。由于 retries 设置为 1,此 DAG 只会重试一次,并且会在五分钟后针对每个 retry_delay 重试。start_date 通常用于指示 DAG 应在何时运行,并结合其 schedule_interval(稍后设置),但在此 DAG 中,它并不相关。它设置为 2017 年 1 月 1 日,但可以设置为过去的任何日期。

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG 部分用于配置将要运行的 DAG。该查询将使用任务 ID composer_sample_trigger_response_dagdefault_args 部分中的默认参数)运行,最重要的是,使用 schedule_intervalNoneschedule_interval 设置为 None,因为我们将使用 Cloud Functions 函数触发此特定 DAG。因此,default_args 中的 start_date 不相关。

执行时,DAG 会输出其配置,如 print_gcs_info 变量中所述。

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. 测试您的函数

打开 Composer 环境,然后在环境名称所在的行中点击 Airflow 链接

点击 composer_sample_trigger_response_dag 的名称将其打开。目前没有任何 DAG 运行的证据,因为我们尚未触发 DAG 运行。如果此 DAG 不可见或不可点击,请稍等片刻,然后刷新页面。

打开一个单独的标签页,然后将任意文件上传到您之前创建并指定为 Cloud Functions 函数触发器的 Cloud Storage 存储分区。您可以通过控制台或使用 gsutil 命令执行此操作。

导航回 Airflow 界面所在的标签页,然后点击“Graph View”

点击 print_gcs_info 任务,该任务应有绿色轮廓

点击“查看日志”菜单右上角

在日志中,您将看到您上传到 Cloud Storage 存储分区的文件的相关信息。

恭喜!您刚刚使用 Node.js 和 Google Cloud Functions 触发了 Airflow DAG!

7. 清理

为避免系统因本快速入门中使用的资源向您的 GCP 账号收取费用,请执行以下操作:

  1. (可选)如需保存数据,请从 Cloud Composer 环境的 Cloud Storage 存储分区和您为本快速入门创建的存储分区中下载数据
  2. 为环境删除 Cloud Storage 存储分区以及您创建的存储分区
  3. 删除 Cloud Composer 环境。请注意,删除环境并不会删除环境的存储分区。
  4. (可选)使用无服务器计算时,每月前 200 万次调用是免费的,如果将函数数量缩减到零,则无需付费(如需了解详情,请参阅价格)。但是,如果您要删除 Cloud Functions 函数,请点击“删除”您函数概览页面的右上角

4fe11e1b41b32ba2

您也可以选择删除项目:

  1. 在 GCP Console 中,转到项目页面。
  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在框中输入项目 ID,然后点击关停以删除项目。