Node.JS 및 Google Cloud Functions로 DAG 트리거

1. 소개

Apache Airflow는 정기적으로 DAG를 실행하도록 설계되었지만 Cloud Storage 버킷 변경이나 Cloud Pub/Sub로 푸시된 메시지와 같은 이벤트에 대한 응답으로 DAG를 트리거할 수도 있습니다. 이를 위해 Cloud Functions로 Cloud Composer DAG를 트리거할 수 있습니다.

이 실습의 예시에서는 Cloud Storage 버킷에서 변경사항이 발생할 때마다 간단한 DAG를 실행합니다. 이 DAG는 BashOperator를 사용하여 Cloud Storage 버킷에 업로드된 항목에 대한 변경 정보를 출력하는 bash 명령어를 실행합니다.

이 실습을 시작하기 전에 Cloud Composer 소개Cloud Functions 시작하기 Codelab을 완료하는 것이 좋습니다. Cloud Composer 소개 Codelab에서 Composer 환경을 만든 경우 이 실습에서는 해당 환경을 사용할 수 있습니다.

빌드할 항목

이 Codelab에서 학습할 내용은 다음과 같습니다.

  1. Google Cloud Storage에 파일을 업로드하면
  2. Node.JS 런타임을 사용하여 Google Cloud 함수 트리거
  3. 이 함수는 Google Cloud Composer에서 DAG를 실행합니다.
  4. 그러면 Google Cloud Storage 버킷에 변경사항을 출력하는 간단한 bash 명령어가 실행됩니다.

1d3d3736624a923f.png

학습할 내용

  • Google Cloud Functions + Node.js를 사용하여 Apache Airflow DAG를 트리거하는 방법

필요한 항목

  • GCP 계정
  • JavaScript에 관한 기본 이해
  • Cloud Composer/Airflow, Cloud Functions에 관한 기본 지식
  • CLI 명령어 사용에 익숙해지세요.

2. GCP 설정

프로젝트 선택 또는 만들기

Google Cloud Platform 프로젝트를 선택하거나 만듭니다. 새 프로젝트를 만드는 경우 여기에 안내된 단계를 따르세요.

이후 단계에서 사용하게 될 프로젝트 ID를 기록해 둡니다.

새 프로젝트를 만드는 경우 만들기 페이지의 프로젝트 이름 바로 아래에 프로젝트 ID가 있습니다.

이미 프로젝트를 만들었다면 콘솔 홈페이지의 프로젝트 정보 카드에서 ID를 찾을 수 있습니다.

API 사용 설정

Cloud Composer, Google Cloud Functions, Cloud ID 및 Google Identity and Access Management (IAM) API를 사용 설정합니다.

Composer 환경 만들기

다음 구성으로 Cloud Composer 환경을 만듭니다.

  • 이름: my-composer-environment
  • 위치: 지리적으로 가장 가까운 모든 위치
  • 영역: 해당 리전의 모든 영역

다른 모든 구성은 기본값으로 유지할 수 있습니다. '만들기'를 클릭합니다. Composer 환경의 이름과 위치를 기록해 둡니다.이후 단계에서 이 이름이 필요합니다.

Cloud Storage 버킷 만들기

프로젝트에서 다음 구성으로 Cloud Storage 버킷을 만듭니다.

  • 이름: <your-project-id>
  • 기본 스토리지 클래스: 멀티 리전
  • 위치: 사용 중인 Cloud Composer 리전과 지리적으로 가장 가까운 위치
  • 액세스 제어 모델: 객체 수준 및 버킷 수준 권한 설정

'만들기'를 누릅니다. 준비가 되면 이후 단계를 위해 Cloud Storage 버킷의 이름을 적어둡니다.

3. Google Cloud Functions (GCF) 설정

GCF를 설정하기 위해 Google Cloud Shell에서 명령어를 실행합니다.

Google Cloud는 gcloud 명령줄 도구를 사용하여 노트북에서 원격으로 작동할 수 있지만, 이 Codelab에서는 클라우드에서 실행되는 명령줄 환경인 Google Cloud Shell을 사용합니다.

이 Debian 기반 가상 머신에는 필요한 모든 개발 도구가 로드되어 있습니다. 영구적인 5GB 홈 디렉토리를 제공하고 Google Cloud에서 실행되므로 네트워크 성능과 인증이 크게 개선됩니다. 즉, 이 Codelab에 필요한 것은 브라우저뿐입니다(Chromebook에서도 작동 가능).

Google Cloud Shell을 활성화하려면 개발자 콘솔에서 오른쪽 상단에 있는 버튼을 클릭합니다. 환경을 프로비저닝하고 연결하는 데 몇 분 정도 소요됩니다.

Cloud Functions 서비스 계정에 blob 서명 권한 부여

GCF에서 Airflow 웹 서버를 보호하는 프록시인 Cloud IAP에 인증하려면 Appspot 서비스 계정 GCF에 Service Account Token Creator 역할을 부여해야 합니다. Cloud Shell에서 다음 명령어를 실행하고 <your-project-id>를 프로젝트 이름으로 바꾸면 됩니다.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

예를 들어 프로젝트 이름이 my-project이면 명령어는 다음과 같습니다.

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

클라이언트 ID 가져오기

Cloud IAP에 인증하는 토큰을 작성하려면 함수에 Airflow 웹 서버를 보호하는 프록시의 클라이언트 ID가 필요합니다. Cloud Composer API는 이 정보를 직접 제공하지 않습니다. 대신 Airflow 웹 서버에 인증되지 않은 요청을 하고 리디렉션 URL에서 클라이언트 ID를 캡처하세요. Cloud Shell로 Python 파일을 실행하여 클라이언트 ID를 캡처해 보겠습니다.

Cloud Shell에서 다음 명령어를 실행하여 GitHub에서 필요한 코드를 다운로드합니다.

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

이 디렉터리가 이미 존재하여 오류가 발생한 경우 다음 명령어를 실행하여 디렉터리를 최신 버전으로 업데이트합니다.

cd python-docs-samples/
git pull origin master

다음을 실행하여 적절한 디렉터리로 변경합니다.

cd python-docs-samples/composer/rest

Python 코드를 실행하여 클라이언트 ID를 가져옵니다. 이때 프로젝트 이름을 <your-project-id>로 대체하고, 이전에 만든 Composer 환경의 위치를 <your-composer-location>로, 이전에 생성한 Composer 환경의 이름을 <your-composer-environment>로 대체합니다.

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

예를 들어 프로젝트 이름이 my-project, Composer 위치가 us-central1, 환경 이름이 my-composer이면 명령어는 다음과 같습니다.

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py는 다음을 실행합니다.

  • Google Cloud로 인증
  • 리디렉션 URI를 가져오기 위해 Airflow 웹 서버에 인증되지 않은 HTTP 요청을 전송합니다.
  • 해당 리디렉션에서 client_id 쿼리 매개변수를 추출합니다.
  • 인쇄하여 사용할 수 있습니다.

클라이언트 ID는 명령줄에 다음과 같이 출력됩니다.

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. 함수 만들기

Cloud Shell에서 다음을 실행하여 필요한 샘플 코드로 저장소를 클론합니다.

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

필요한 디렉터리로 변경하고 다음 몇 단계를 완료하는 동안 Cloud Shell을 열어 둡니다.

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

탐색 메뉴를 클릭한 후 'Cloud Functions'를 클릭하여 Google Cloud Functions 페이지로 이동합니다.

'함수 만들기'를 클릭합니다. (페이지 상단에서)

함수 이름을 'my-function'으로 지정합니다. 메모리는 기본값인 256MB로 둡니다.

트리거를 'Cloud Storage'로 설정하고 이벤트 유형은 '완료/만들기'로 두고 Cloud Storage 버킷 만들기 단계에서 만든 버킷으로 이동합니다.

소스 코드를 '인라인 편집기'로 설정된 상태로 둡니다. 런타임을 'Node.js 8'로 설정합니다.

Cloud Shell에서 다음 명령어를 실행합니다. Cloud Shell 편집기에 index.js 및 package.json이 열립니다.

cloudshell edit index.js package.json

package.json 탭을 클릭하고 코드를 복사하여 Cloud Functions 인라인 편집기의 package.json 섹션에 붙여넣습니다.

'실행할 함수' 설정 triggerDag로 변경

index.js 탭을 클릭하고 코드를 복사하여 Cloud Functions 인라인 편집기의 index.js 섹션에 붙여넣습니다.

PROJECT_ID를 프로젝트 ID로, CLIENT_ID를 클라이언트 ID 가져오기 단계에서 저장한 클라이언트 ID로 변경합니다. 'Create'를 클릭하지 마세요 하지만 아직 몇 가지 더 채워야 할 것이 있습니다.

Cloud Shell에서 다음 명령어를 실행합니다. 이때 <your-environment-name>을 바꿉니다. Composer 환경의 이름과 <your-composer-region>을 Composer 환경이 위치한 리전으로 바꿉니다.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

예를 들어 환경 이름이 my-composer-environment이고 us-central1에 있는 경우 명령어는 다음과 같습니다.

gcloud composer environments describe my-composer-environment --location us-central1

출력은 다음과 같이 표시됩니다.

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

이 출력에서 airflowUri라는 변수를 찾습니다. index.js 코드에서 WEBSERVER_ID를 Airflow 웹 서버 ID로 변경합니다. 이는 airflowUri 변수의 일부로서 '-tp'가 포함됩니다. 끝(예: abc123efghi456k-tp)

'더보기' 드롭다운 링크를 클릭한 다음 지리적으로 가장 가까운 지역을 선택하세요.

'실패 시 다시 시도'를 선택합니다.

'만들기'를 클릭합니다. Cloud 함수를 만들 수 있습니다

단계별 코드 실행

index.js에서 복사한 코드는 다음과 같습니다.

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

그 이유를 살펴보도록 하겠습니다. 여기에는 triggerDag, authorizeIap, makeIapPostRequest의 세 가지 함수가 있습니다.

triggerDag는 지정된 Cloud Storage 버킷에 항목을 업로드할 때 트리거되는 함수입니다. 여기서 다른 요청에 사용되는 중요한 변수(예: PROJECT_ID, CLIENT_ID, WEBSERVER_ID, DAG_NAME)를 구성합니다. authorizeIapmakeIapPostRequest를 호출합니다.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap는 서비스 계정을 사용하고 '교환'하여 Airflow 웹 서버를 보호하는 프록시에 요청합니다. makeIapPostRequest 인증에 사용될 ID 토큰의 JWT입니다.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest는 Airflow 웹 서버를 호출하여 composer_sample_trigger_response_dag.를 트리거합니다. DAG 이름은 url 매개변수로 전달된 Airflow 웹 서버 URL에 삽입되고 idTokenauthorizeIap 요청에서 가져온 토큰입니다.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. DAG 설정

Cloud Shell에서 샘플 워크플로가 있는 디렉터리로 변경합니다. 이 파일은 클라이언트 ID 가져오기 단계에서 GitHub에서 다운로드한 python-docs-samples에 포함되어 있습니다.

cd
cd python-docs-samples/composer/workflows

Composer에 DAG 업로드

다음 명령어를 사용하여 샘플 DAG를 Composer 환경의 DAG 스토리지 버킷에 업로드합니다. 여기서 <environment_name>은 Composer 환경의 이름이고 <location>은 Composer 환경이 위치한 리전의 이름입니다. trigger_response_dag.py는 작업할 DAG입니다.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

예를 들어 Composer 환경의 이름이 my-composer이고 us-central1에 있다면 명령어는 다음과 같습니다.

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

DAG 살펴보기

trigger_response.py의 DAG 코드는 다음과 같습니다.

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

default_args 섹션에는 Apache Airflow의 BaseOperator 모델에 필요한 기본 인수가 포함되어 있습니다. 모든 Apache Airflow DAG에서 이러한 매개변수가 포함된 이 섹션을 볼 수 있습니다. 현재 ownerComposer Example로 설정되어 있지만 원하는 경우 이름으로 변경할 수 있습니다. depends_on_past는 이 DAG가 이전 DAG에 종속되지 않음을 보여줍니다. 이 DAG의 상태에 따라 이메일 알림이 오지 않도록 이메일 섹션 email, email_on_failure, email_on_retry가 설정됩니다. retries가 1로 설정되어 있으므로 DAG는 5분 후에 retry_delay별로 한 번만 재시도합니다. start_date는 일반적으로 DAG를 schedule_interval (나중에 설정)와 함께 실행해야 하는 시점을 나타내지만 이 DAG의 경우에는 관련이 없습니다. 2017년 1월 1일로 설정되어 있지만 과거 날짜로 설정할 수 있습니다.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG 섹션은 실행할 DAG를 구성합니다. 이 작업은 default_args 섹션의 기본 인수인 작업 ID composer_sample_trigger_response_dag로 실행되며 가장 중요하게는 schedule_intervalNone로 실행됩니다. Cloud 함수를 사용하여 이 특정 DAG를 트리거하므로 schedule_intervalNone로 설정됩니다. 이러한 이유로 default_argsstart_date은 관련이 없습니다.

DAG는 실행되면 print_gcs_info 변수에 명시된 대로 구성을 출력합니다.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. 함수 테스트

Composer 환경을 열고 환경 이름이 있는 행에서 Airflow 링크를 클릭합니다.

이름을 클릭하여 composer_sample_trigger_response_dag를 엽니다. 아직 DAG가 실행되도록 트리거하지 않았으므로 지금은 DAG 실행의 증거가 없습니다.이 DAG가 표시되거나 클릭할 수 없는 경우 잠시 기다렸다가 페이지를 새로고침하세요.

별도의 탭을 열고 앞서 만들고 Cloud 함수의 트리거로 지정한 Cloud Storage 버킷에 파일을 업로드합니다. 콘솔 또는 gsutil 명령어를 통해 이 작업을 수행할 수 있습니다.

Airflow UI가 있는 탭으로 돌아가서 그래프 뷰를 클릭합니다.

녹색 윤곽선으로 표시된 print_gcs_info 작업을 클릭합니다.

'로그 보기'를 클릭합니다. 을 클릭합니다.

로그에는 Cloud Storage 버킷에 업로드한 파일에 대한 정보가 표시됩니다.

축하합니다. Node.js 및 Google Cloud Functions를 사용하여 Airflow DAG를 트리거했습니다.

7. 삭제

이 빠른 시작에서 사용한 리소스 비용이 GCP 계정에 청구되지 않도록 다음을 수행합니다.

  1. (선택사항) 데이터를 저장하려면 Cloud Composer 환경의 Cloud Storage 버킷과 이 빠른 시작을 위해 만든 스토리지 버킷에서 데이터를 다운로드합니다.
  2. 내가 만든 환경의 Cloud Storage 버킷 삭제
  3. Cloud Composer 환경을 삭제합니다. 환경을 삭제해도 환경의 스토리지 버킷은 삭제되지 않습니다.
  4. (선택사항) 서버리스 컴퓨팅의 경우 매월 처음 200만 개의 호출은 무료이며, 함수를 0으로 조정해도 요금이 부과되지 않습니다 (자세한 내용은 가격 책정 참고). 하지만 Cloud 함수를 삭제하려면 '삭제'를 클릭하면 됩니다. (함수의 개요 페이지 오른쪽 상단)

4fe11e1b41b32ba2.png

필요한 경우 프로젝트를 삭제할 수도 있습니다.

  1. GCP Console에서 프로젝트 페이지로 이동합니다.
  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 상자에 프로젝트 ID를 입력한 후 종료를 클릭하여 프로젝트를 삭제합니다.