Node.JS 및 Google Cloud Functions로 DAG 트리거

1. 소개

Apache Airflow는 DAG를 정기적으로 실행하도록 설계되어 있지만 Cloud Storage 버킷의 변경이나 Cloud Pub/Sub에 푸시된 메시지와 같은 이벤트에 대한 응답으로 DAG를 트리거할 수도 있습니다. 이를 위해 Cloud Functions로 Cloud Composer DAG를 트리거할 수 있습니다.

이 실습의 예시는 Cloud Storage 버킷에서 변경사항이 발생할 때마다 간단한 DAG를 실행합니다. 이 DAG는 BashOperator를 사용하여 Cloud Storage 버킷에 업로드된 항목에 관한 변경 정보를 출력하는 bash 명령어를 실행합니다.

이 실습을 시작하기 전에 Cloud Composer 소개Cloud Functions 시작하기 Codelab을 완료하는 것이 좋습니다. Cloud Composer 소개 Codelab에서 Composer 환경을 만든 경우 이 실습에서 해당 환경을 사용할 수 있습니다.

빌드할 항목

이 Codelab에서는 다음 작업을 해 봅니다.

  1. Google Cloud Storage에 파일을 업로드합니다.
  2. Node.JS 런타임을 사용하여 Google Cloud 함수 트리거
  3. 이 함수는 Google Cloud Composer에서 DAG를 실행합니다.
  4. Google Cloud Storage 버킷에 대한 변경사항을 출력하는 간단한 bash 명령어를 실행합니다.

1d3d3736624a923f.png

학습할 내용

  • Google Cloud Functions + Node.js를 사용하여 Apache Airflow DAG를 트리거하는 방법

필요한 항목

  • GCP 계정
  • JavaScript에 관한 기본 이해
  • Cloud Composer/Airflow 및 Cloud Functions에 대한 기본 지식
  • CLI 명령어 사용에 능숙함

2. GCP 설정

프로젝트 선택 또는 만들기

Google Cloud Platform 프로젝트를 선택하거나 만듭니다. 새 프로젝트를 만드는 경우 여기에 나온 단계를 따르세요.

나중에 사용할 수 있도록 프로젝트 ID를 기록해 둡니다.

새 프로젝트를 만드는 경우 생성 페이지의 프로젝트 이름 바로 아래에 프로젝트 ID가 표시됩니다.

이미 프로젝트를 만든 경우 콘솔 홈페이지의 프로젝트 정보 카드에서 ID를 찾을 수 있습니다.

API 사용 설정

Cloud Composer, Google Cloud Functions, Cloud Identity, Google Identity and Access Management (IAM) API를 사용 설정합니다.

Composer 환경 만들기

다음 구성으로 Cloud Composer 환경을 만듭니다.

  • 이름: my-composer-environment
  • 위치: 지리적으로 가장 가까운 위치
  • 영역: 해당 리전의 모든 영역

다른 모든 구성은 기본값으로 유지할 수 있습니다. 하단의 '만들기'를 클릭합니다. Composer 환경 이름과 위치를 기록해 둡니다. 나중에 필요합니다.

Cloud Storage 버킷 만들기

다음 구성으로 프로젝트에서 Cloud Storage 버킷을 만듭니다.

  • 이름: <your-project-id>
  • 기본 스토리지 클래스: 멀티 리전
  • 위치: 사용 중인 Cloud Composer 리전과 지리적으로 가장 가까운 위치
  • 액세스 제어 모델: 객체 수준 및 버킷 수준 권한 설정

준비가 되면 '만들기'를 누릅니다. 나중에 사용할 수 있도록 Cloud Storage 버킷의 이름을 기록해 둡니다.

3. Google Cloud Functions (GCF) 설정

GCF를 설정하기 위해 Google Cloud Shell에서 명령어를 실행합니다.

gcloud 명령줄 도구를 사용하여 노트북에서 Google Cloud를 원격으로 운영할 수 있지만, 이 Codelab에서는 클라우드에서 실행되는 명령줄 환경인 Google Cloud Shell을 사용합니다.

이 Debian 기반 가상 머신에는 필요한 모든 개발 도구가 로드되어 있습니다. 영구적인 5GB 홈 디렉토리를 제공하고 Google Cloud에서 실행되므로 네트워크 성능과 인증이 크게 개선됩니다. 즉, 이 Codelab에 필요한 것은 브라우저뿐입니다(Chromebook에서도 작동 가능).

Google Cloud Shell을 활성화하려면 개발자 콘솔에서 오른쪽 상단의 버튼을 클릭합니다. 환경을 프로비저닝하고 연결하는 데 몇 분 정도만 소요됩니다.

Cloud Functions 서비스 계정에 Blob 서명 권한 부여

GCF가 Airflow 웹 서버를 보호하는 프록시인 Cloud IAP에 인증하려면 Appspot 서비스 계정 GCF에 Service Account Token Creator 역할을 부여해야 합니다. Cloud Shell에서 다음 명령어를 실행하여 프로젝트 이름을 <your-project-id>로 대체합니다.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

예를 들어 프로젝트 이름이 my-project인 경우 명령어는 다음과 같습니다.

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

클라이언트 ID 가져오기

Cloud IAP에 인증하는 토큰을 작성하려면 함수에 Airflow 웹 서버를 보호하는 프록시의 클라이언트 ID가 필요합니다. Cloud Composer API는 이 정보를 직접 제공하지 않습니다. 대신 Airflow 웹 서버에 인증되지 않은 요청을 하고 리디렉션 URL에서 클라이언트 ID를 캡처합니다. Cloud Shell을 사용하여 Python 파일을 실행하여 클라이언트 ID를 캡처합니다.

Cloud Shell에서 다음 명령어를 실행하여 GitHub에서 필요한 코드를 다운로드합니다.

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

이 디렉터리가 이미 있어 오류가 발생한 경우 다음 명령어를 실행하여 최신 버전으로 업데이트합니다.

cd python-docs-samples/
git pull origin master

다음 명령어를 실행하여 적절한 디렉터리로 변경합니다.

cd python-docs-samples/composer/rest

Python 코드를 실행하여 클라이언트 ID를 가져옵니다. 이때 프로젝트 이름은 <your-project-id>로, 이전에 만든 Composer 환경의 위치는 <your-composer-location>로, 이전에 만든 Composer 환경의 이름은 <your-composer-environment>로 대체합니다.

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

예를 들어 프로젝트 이름이 my-project이고 Composer 위치가 us-central1이며 환경 이름이 my-composer이면 명령어는 다음과 같습니다.

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py는 다음을 실행합니다.

  • Google Cloud로 인증
  • 리디렉션 URI를 가져오기 위해 Airflow 웹 서버에 인증되지 않은 HTTP 요청을 보냅니다.
  • 해당 리디렉션에서 client_id 쿼리 매개변수를 추출합니다.
  • 사용할 수 있도록 출력합니다.

클라이언트 ID가 명령줄에 출력되며 다음과 같이 표시됩니다.

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. 함수 만들기

Cloud Shell에서 다음 명령어를 실행하여 필요한 샘플 코드가 포함된 저장소를 클론합니다.

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

필요한 디렉터리로 변경하고 다음 단계를 완료하는 동안 Cloud Shell을 열어 두세요.

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

탐색 메뉴를 클릭한 다음 'Cloud Functions'를 클릭하여 Google Cloud Functions 페이지로 이동합니다.

페이지 상단의 '함수 만들기'를 클릭합니다.

함수 이름을 'my-function'으로 지정하고 메모리를 기본값인 256MB로 둡니다.

트리거를 'Cloud Storage'로 설정하고 이벤트 유형을 'Finalize/Create'로 그대로 두고 Cloud Storage 버킷 만들기 단계에서 만든 버킷으로 이동합니다.

소스 코드를 '인라인 편집기'로 설정하고 런타임을 'Node.js 8'로 설정합니다.

Cloud Shell에서 다음 명령어를 실행합니다. 그러면 Cloud Shell 편집기에서 index.js와 package.json이 열립니다.

cloudshell edit index.js package.json

package.json 탭을 클릭하고 해당 코드를 복사하여 Cloud Functions 인라인 편집기의 package.json 섹션에 붙여넣습니다.

'실행할 함수'를 triggerDag로 설정합니다.

index.js 탭을 클릭하고 코드를 복사하여 Cloud Functions 인라인 편집기의 index.js 섹션에 붙여넣습니다.

PROJECT_ID를 프로젝트 ID로, CLIENT_ID를 클라이언트 ID 가져오기 단계에서 저장한 클라이언트 ID로 변경합니다. 하지만 아직 '만들기'를 클릭하지 마세요. 아직 작성해야 할 항목이 몇 가지 더 있습니다.

Cloud Shell에서 다음 명령어를 실행합니다. 이때 <your-environment-name>을 Composer 환경의 이름으로, <your-composer-region>을 Composer 환경이 있는 리전으로 바꿉니다.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

예를 들어 환경 이름이 my-composer-environment이고 us-central1에 있는 경우 명령어는 다음과 같습니다.

gcloud composer environments describe my-composer-environment --location us-central1

출력은 다음과 같이 표시됩니다.

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

해당 출력에서 airflowUri이라는 변수를 찾습니다. index.js 코드에서 WEBSERVER_ID를 Airflow 웹 서버 ID로 변경합니다. airflowUri 변수에서 끝에 '-tp'가 있는 부분입니다(예: abc123efghi456k-tp).

'더보기' 드롭다운 링크를 클릭한 다음 지리적으로 가장 가까운 리전을 선택합니다.

'실패 시 재시도'를 선택합니다.

'만들기'를 클릭하여 Cloud 함수를 만듭니다.

코드 단계별 처리

index.js에서 복사한 코드는 다음과 같습니다.

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

무슨 일이 일어나고 있는지 살펴보겠습니다. 여기에는 triggerDag, authorizeIap, makeIapPostRequest의 세 가지 함수가 있습니다.

triggerDag는 지정된 Cloud Storage 버킷에 항목을 업로드할 때 트리거되는 함수입니다. 여기에서 PROJECT_ID, CLIENT_ID, WEBSERVER_ID, DAG_NAME과 같은 다른 요청에 사용되는 중요한 변수를 구성합니다. authorizeIapmakeIapPostRequest를 호출합니다.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap는 서비스 계정을 사용하여 Airflow 웹 서버를 보호하는 프록시에 요청을 하고 makeIapPostRequest를 인증하는 데 사용될 ID 토큰에 대해 JWT를 '교환'합니다.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest는 Airflow 웹 서버를 호출하여 composer_sample_trigger_response_dag.를 트리거합니다. DAG 이름은 url 매개변수와 함께 전달된 Airflow 웹 서버 URL에 삽입되고 idTokenauthorizeIap 요청에서 획득한 토큰입니다.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. DAG 설정

Cloud Shell에서 샘플 워크플로가 있는 디렉터리로 변경합니다. 이는 클라이언트 ID 가져오기 단계에서 GitHub에서 다운로드한 python-docs-samples의 일부입니다.

cd
cd python-docs-samples/composer/workflows

Composer에 DAG 업로드

다음 명령어를 사용하여 샘플 DAG를 Composer 환경의 DAG 스토리지 버킷에 업로드합니다. 여기서 <environment_name>은 Composer 환경의 이름이고 <location>은 환경이 있는 리전의 이름입니다. trigger_response_dag.py은 사용할 DAG입니다.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

예를 들어 Composer 환경의 이름이 my-composer이고 위치가 us-central1인 경우 명령어는 다음과 같습니다.

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

DAG 단계별 실행

trigger_response.py의 DAG 코드는 다음과 같습니다.

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

default_args 섹션에는 Apache Airflow의 BaseOperator 모델에 필요한 기본 인수가 포함되어 있습니다. Apache Airflow DAG에는 이러한 매개변수가 포함된 섹션이 표시됩니다. owner은 현재 Composer Example로 설정되어 있지만 원하는 경우 자신의 이름으로 변경할 수 있습니다. depends_on_past를 보면 이 DAG가 이전 DAG에 종속되지 않음을 알 수 있습니다. email, email_on_failure, email_on_retry 세 개의 이메일 섹션은 이 DAG의 상태에 따라 이메일 알림이 수신되지 않도록 설정되어 있습니다. retries가 1로 설정되어 있으므로 DAG는 한 번만 다시 시도하며 retry_delay에 따라 5분 후에 다시 시도합니다. start_date는 일반적으로 schedule_interval (나중에 설정)와 함께 DAG가 실행되어야 하는 시기를 지정하지만 이 DAG의 경우에는 관련이 없습니다. 2017년 1월 1일로 설정되어 있지만 과거 날짜로 설정할 수도 있습니다.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG 섹션은 실행될 DAG를 구성합니다. 작업 ID composer_sample_trigger_response_dag, default_args 섹션의 기본 인수, 가장 중요한 Noneschedule_interval로 실행됩니다. Cloud 함수로 이 특정 DAG를 트리거하므로 schedule_intervalNone로 설정됩니다. 따라서 default_argsstart_date는 관련이 없습니다.

실행되면 DAG는 print_gcs_info 변수에 지정된 대로 구성을 출력합니다.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. 함수 테스트

Composer 환경을 열고 환경 이름이 있는 행에서 Airflow 링크를 클릭합니다.

이름을 클릭하여 composer_sample_trigger_response_dag을 엽니다. 아직 DAG 실행을 트리거하지 않았으므로 현재 DAG 실행 증거는 없습니다.이 DAG가 표시되지 않거나 클릭할 수 없는 경우 잠시 기다렸다가 페이지를 새로고침하세요.

별도의 탭을 열고 이전에 만들고 Cloud 함수의 트리거로 지정한 Cloud Storage 버킷에 파일을 업로드합니다. 콘솔을 통해 또는 gsutil 명령어를 사용하여 이 작업을 수행할 수 있습니다.

Airflow UI가 있는 탭으로 돌아가 그래프 보기를 클릭합니다.

녹색으로 윤곽이 표시된 print_gcs_info 작업을 클릭합니다.

메뉴 오른쪽 상단에서 '로그 보기'를 클릭합니다.

로그에는 Cloud Storage 버킷에 업로드한 파일에 관한 정보가 표시됩니다.

축하합니다. Node.js 및 Google Cloud Functions를 사용하여 Airflow DAG를 트리거했습니다.

7. 삭제

이 빠른 시작에서 사용한 리소스 비용이 GCP 계정에 청구되지 않도록 다음을 수행합니다.

  1. (선택사항) 데이터를 저장하려면 Cloud Composer 환경의 Cloud Storage 버킷과 이 빠른 시작을 위해 만든 스토리지 버킷에서 데이터를 다운로드합니다.
  2. 환경 및 사용자가 만든 환경의 Cloud Storage 버킷을 삭제합니다.
  3. Cloud Composer 환경을 삭제합니다. 환경을 삭제해도 환경의 스토리지 버킷은 삭제되지 않습니다.
  4. (선택사항) 서버리스 컴퓨팅을 사용하면 매월 처음 2백만 건의 호출은 무료이며 함수를 0으로 확장하면 요금이 청구되지 않습니다 (자세한 내용은 가격 책정 참고). 하지만 Cloud 함수를 삭제하려면 함수 개요 페이지의 오른쪽 상단에 있는 '삭제'를 클릭하여 삭제하세요.

4fe11e1b41b32ba2.png

선택적으로 프로젝트를 삭제할 수도 있습니다.

  1. GCP 콘솔에서 프로젝트 페이지로 이동합니다.
  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 상자에 프로젝트 ID를 입력한 다음 종료를 클릭하여 프로젝트를 삭제합니다.