Memicu DAG dengan Node.JS dan Google Cloud Functions

1. Pengantar

Apache Airflow dirancang untuk menjalankan DAG sesuai jadwal rutin, tetapi Anda juga dapat memicu DAG sebagai respons terhadap peristiwa, seperti perubahan pada bucket Cloud Storage atau pesan yang dikirim ke Cloud Pub/Sub. Untuk melakukannya, DAG Cloud Composer dapat dipicu oleh Cloud Functions.

Contoh di lab ini menjalankan DAG sederhana setiap kali terjadi perubahan di bucket Cloud Storage. DAG ini menggunakan BashOperator untuk menjalankan perintah bash yang mencetak info perubahan tentang apa yang diupload ke bucket Cloud Storage.

Sebelum memulai lab ini, sebaiknya Anda menyelesaikan codelab Pengantar Cloud Composer dan Mulai Menggunakan Cloud Functions. Jika membuat Lingkungan Composer di codelab Pengantar Cloud Composer, Anda dapat menggunakan lingkungan tersebut di lab ini.

Yang Akan Anda Bangun

Dalam codelab ini, Anda akan:

  1. Upload File ke Google Cloud Storage, yang akan
  2. Picu Google Cloud Function menggunakan runtime Node.JS
  3. Fungsi ini akan menjalankan DAG di Google Cloud Composer
  4. Tindakan ini akan menjalankan perintah bash sederhana yang mencetak perubahan ke bucket Google Cloud Storage.

1d3d3736624a923f.pngS

Yang Akan Anda Pelajari

  • Cara memicu DAG Apache Airflow menggunakan Google Cloud Functions + Node.js

Yang Anda Butuhkan

  • Akun GCP
  • Pemahaman dasar tentang JavaScript
  • Pengetahuan dasar tentang Cloud Composer/Airflow, dan Cloud Functions
  • Memahami penggunaan perintah CLI

2. Menyiapkan GCP

Pilih atau Buat Project

Pilih atau buat Project Google Cloud Platform. Jika Anda membuat project baru, ikuti langkah-langkah yang ada di sini.

Catat Project ID Anda, yang akan digunakan pada langkah-langkah berikutnya.

Jika Anda membuat project baru, project ID dapat ditemukan tepat di bawah Nama Project pada halaman pembuatan

Jika sudah membuat project, Anda dapat menemukan ID-nya di halaman beranda konsol di kartu Info Project

Mengaktifkan API

Aktifkan Cloud Composer, Google Cloud Functions, dan Cloud Identity serta Google Identity and Access Management API (IAM).

Membuat Lingkungan Composer

Buat lingkungan Cloud Composer dengan konfigurasi berikut:

  • Nama: my-composer-environment
  • Lokasi: Di mana pun lokasi geografis yang terdekat dengan Anda
  • Zona: Zona apa pun di region tersebut

Semua konfigurasi lainnya dapat tetap pada setelan default-nya. Klik "Buat" di bagian bawah.Catat nama dan lokasi Lingkungan Composer - Anda akan membutuhkannya pada langkah berikutnya.

Membuat Bucket Cloud Storage

Di project Anda, buat bucket Cloud Storage dengan konfigurasi berikut:

  • Nama: <your-project-id>
  • Kelas penyimpanan default: Multi-regional
  • Lokasi: Mana pun lokasi yang secara geografis paling dekat dengan region Cloud Composer yang Anda gunakan
  • Model Kontrol Akses: Menetapkan izin tingkat objek dan tingkat bucket

Tekan "Buat" jika sudah siap, pastikan Anda mencatat nama bucket Cloud Storage untuk langkah-langkah berikutnya.

3. Menyiapkan Google Cloud Functions (GCF)

Untuk menyiapkan GCF, kita akan menjalankan perintah di Google Cloud Shell.

Meskipun Google Cloud dapat dioperasikan secara jarak jauh dari laptop Anda menggunakan alat command line gcloud, dalam codelab ini kita akan menggunakan Google Cloud Shell, yaitu lingkungan command line yang berjalan di Cloud.

Mesin virtual berbasis Debian ini memuat semua alat pengembangan yang akan Anda perlukan. Layanan ini menawarkan direktori beranda tetap sebesar 5 GB dan beroperasi di Google Cloud, sehingga sangat meningkatkan performa dan autentikasi jaringan. Ini berarti bahwa semua yang Anda perlukan untuk codelab ini adalah browser (ya, ini berfungsi di Chromebook).

Untuk mengaktifkan Google Cloud Shell, dari Konsol Play, klik tombol di sisi kanan atas (hanya perlu waktu beberapa saat untuk melakukan penyediaan dan terhubung ke lingkungan):

Memberikan izin penandatanganan blob ke Akun Layanan Cloud Functions

Agar GCF dapat mengautentikasi ke Cloud IAP, proxy yang melindungi server web Airflow, Anda perlu memberikan peran Service Account Token Creator kepada GCF Akun Layanan Appspot. Lakukan hal tersebut dengan menjalankan perintah berikut di Cloud Shell Anda, dengan mengganti nama project Anda dengan <your-project-id>.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Misalnya, jika project Anda bernama my-project, perintah Anda akan menjadi

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Mendapatkan Client-ID

Guna membuat token untuk melakukan autentikasi ke Cloud IAP, fungsi ini memerlukan client ID proxy yang melindungi server web Airflow. Cloud Composer API tidak menyediakan informasi ini secara langsung. Sebagai gantinya, buat permintaan yang tidak diautentikasi ke server web Airflow dan ambil client ID dari URL alihan. Kita akan melakukannya dengan menjalankan file python menggunakan Cloud Shell untuk mengambil client ID.

Download kode yang diperlukan dari GitHub dengan menjalankan perintah berikut di Cloud Shell Anda

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

Jika Anda menerima error karena direktori ini sudah ada, update ke versi terbaru dengan menjalankan perintah berikut

cd python-docs-samples/
git pull origin master

Ubah ke direktori yang sesuai dengan menjalankan

cd python-docs-samples/composer/rest

Jalankan kode python untuk mendapatkan client ID Anda, dengan mengganti nama project Anda dengan <your-project-id>, lokasi lingkungan Composer yang Anda buat sebelumnya untuk <your-composer-location>, dan nama lingkungan Composer yang Anda buat sebelumnya untuk <your-composer-environment>

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

Misalnya, jika nama project Anda adalah my-project, lokasi Composer adalah us-central1, dan nama lingkungan Anda adalah my-composer, perintah Anda adalah

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py melakukan hal berikut ini:

  • Melakukan autentikasi dengan Google Cloud
  • Membuat permintaan HTTP yang tidak diautentikasi ke server web Airflow untuk mendapatkan URI pengalihan
  • Mengekstrak parameter kueri client_id dari pengalihan tersebut
  • Mencetaknya untuk Anda gunakan

Client ID Anda akan dicetak pada command line dan akan terlihat seperti ini:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. Membuat Fungsi

Di Cloud Shell, clone repo dengan kode contoh yang diperlukan dengan menjalankan

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

Buka direktori yang diperlukan dan biarkan Cloud Shell Anda tetap terbuka selagi Anda menyelesaikan beberapa langkah berikutnya

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

Buka halaman Google Cloud Functions dengan mengklik Navigation menu, lalu mengklik "Cloud Functions"

Klik "CREATE FUNCTION" di bagian atas halaman

Beri nama fungsi "my-function" dan biarkan memori sesuai default, 256 MB.

Tetapkan Pemicu ke "Cloud Storage", biarkan jenis Peristiwa sebagai "Selesaikan/Buat", dan jelajahi bucket yang Anda buat pada langkah Membuat Bucket Cloud Storage.

Biarkan Kode Sumber ditetapkan ke "Inline Editor" dan tetapkan runtime ke "Node.js 8"

Jalankan perintah berikut di Cloud Shell Anda. Tindakan ini akan membuka index.js dan package.json di Cloud Shell Editor

cloudshell edit index.js package.json

Klik tab package.json, salin kode tersebut dan tempelkan ke bagian package.json pada editor inline Cloud Functions

Tetapkan "Function to Execute" untuk triggerDag

Klik tab index.js, salin kode, dan tempelkan ke bagian index.js pada editor inline Cloud Functions

Ubah PROJECT_ID menjadi project ID Anda, CLIENT_ID menjadi client ID yang Anda simpan dari langkah Mendapatkan Client ID. JANGAN klik "Buat" namun - masih ada beberapa hal yang harus diisi!

Di Cloud Shell, jalankan perintah berikut, dengan mengganti <your-environment-name> dengan nama lingkungan Composer dan <your-composer-region> dengan region tempat Lingkungan Composer Anda berada.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

Misalnya, jika lingkungan Anda bernama my-composer-environment dan terletak di us-central1, perintah Anda akan menjadi

gcloud composer environments describe my-composer-environment --location us-central1

Outputnya kurang lebih akan seperti ini:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

Pada output tersebut, cari variabel bernama airflowUri. Di kode index.js, ubah WEBSERVER_ID menjadi ID server web Airflow - ini merupakan bagian dari variabel airflowUri yang akan memiliki '-tp' di akhir, misalnya, abc123efghi456k-tp

Klik tombol "Lainnya" link dropdown, lalu pilih Wilayah secara geografis terdekat dengan Anda

Centang "Coba Lagi jika Gagal"

Klik "Buat" untuk membuat Cloud Function

Melewati Kode

Kode yang Anda salin dari index.js akan terlihat seperti ini:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

Mari kita lihat apa yang terjadi. Ada tiga fungsi di sini: triggerDag, authorizeIap, dan makeIapPostRequest

triggerDag adalah fungsi yang dipicu saat kita mengupload sesuatu ke bucket Cloud Storage yang ditetapkan. Di sini, kita mengonfigurasi variabel penting yang digunakan dalam permintaan lain, seperti PROJECT_ID, CLIENT_ID, WEBSERVER_ID, dan DAG_NAME. Class ini memanggil authorizeIap dan makeIapPostRequest.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap membuat permintaan ke proxy yang melindungi server web Airflow, menggunakan akun layanan dan "bertukar" JWT untuk token ID yang akan digunakan untuk mengautentikasi makeIapPostRequest.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest melakukan panggilan ke server web Airflow untuk memicu composer_sample_trigger_response_dag.. Nama DAG disematkan di URL server web Airflow yang diteruskan dengan parameter url, dan idToken adalah token yang kita peroleh dalam permintaan authorizeIap.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. Menyiapkan DAG

Di Cloud Shell, beralihlah ke direktori yang berisi contoh alur kerja. Ini adalah bagian dari python-docs-samples yang Anda download dari GitHub di langkah Getting the Client ID.

cd
cd python-docs-samples/composer/workflows

Mengupload DAG ke Composer

Upload contoh DAG ke bucket penyimpanan DAG lingkungan Composer Anda menggunakan perintah berikut, dengan <environment_name> adalah nama lingkungan Composer Anda dan <location> adalah nama region tempatnya berada. trigger_response_dag.py adalah DAG yang akan kita gunakan.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

Misalnya, jika lingkungan Composer Anda bernama my-composer dan terletak di us-central1, perintah Anda akan menjadi

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

Melewati DAG

Kode DAG di trigger_response.py terlihat seperti ini

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

Bagian default_args berisi argumen default seperti yang diwajibkan oleh model BaseOperator di Apache Airflow. Anda akan melihat bagian ini dengan parameter ini di DAG Apache Airflow. owner saat ini disetel ke Composer Example, tetapi Anda dapat mengubahnya menjadi nama Anda jika mau. depends_on_past menunjukkan kepada kami bahwa DAG ini tidak bergantung pada DAG sebelumnya. Tiga bagian email, yaitu email, email_on_failure, dan email_on_retry diatur sehingga tidak ada notifikasi email yang masuk berdasarkan status DAG ini. DAG hanya akan mencoba lagi sekali, karena retries disetel ke 1, dan akan melakukannya setelah lima menit, per retry_delay. start_date biasanya menentukan kapan DAG harus dijalankan, bersama dengan schedule_interval-nya (ditetapkan nanti), tetapi dalam kasus DAG ini, tidak relevan. Tanggal ini ditetapkan ke 1 Januari 2017, tetapi dapat ditetapkan ke tanggal yang sudah berlalu.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

Bagian with airflow.DAG mengonfigurasi DAG yang akan dijalankan. Fungsi ini akan dijalankan dengan ID tugas composer_sample_trigger_response_dag, argumen default dari bagian default_args, dan yang paling penting, dengan schedule_interval dari None. schedule_interval disetel ke None karena kita memicu DAG khusus ini dengan Cloud Function. Itulah sebabnya start_date di default_args tidak relevan.

Saat dieksekusi, DAG akan mencetak konfigurasinya, seperti yang ditentukan dalam variabel print_gcs_info.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. Menguji Fungsi Anda

Buka Composer Environment dan di baris yang berisi nama lingkungan Anda, klik link Airflow

Buka composer_sample_trigger_response_dag dengan mengklik namanya. Saat ini, tidak akan ada bukti bahwa DAG dijalankan karena kita belum memicu DAG untuk dijalankan.Jika DAG ini tidak terlihat atau dapat diklik, tunggu sebentar lalu muat ulang halaman.

Buka tab terpisah dan upload file apa pun ke bucket Cloud Storage yang Anda buat sebelumnya dan tentukan sebagai pemicu untuk Cloud Function Anda. Anda dapat melakukannya melalui Konsol atau menggunakan perintah gsutil.

Kembali ke tab dengan UI Airflow Anda dan klik Graph View

Klik tugas print_gcs_info, yang harus ditandai dengan warna hijau

Klik "Lihat Log" di kanan atas menu

Di log, Anda akan melihat info tentang file yang Anda upload ke bucket Cloud Storage.

Selamat! Anda baru saja memicu Airflow DAG menggunakan Node.js dan Google Cloud Functions.

7. Pembersihan

Agar tidak menimbulkan biaya pada akun GCP Anda untuk resource yang digunakan di panduan memulai ini:

  1. (Opsional) Untuk menyimpan data, download data dari bucket Cloud Storage untuk lingkungan Cloud Composer dan bucket penyimpanan yang Anda buat untuk panduan memulai ini.
  2. Hapus bucket Cloud Storage untuk lingkungan dan yang Anda buat
  3. Hapus lingkungan Cloud Composer. Perhatikan bahwa menghapus lingkungan tidak menghapus bucket penyimpanan untuk lingkungan tersebut.
  4. (Opsional) Dengan komputasi serverless, 2 juta pemanggilan pertama per bulan gratis. Anda tidak akan dikenai biaya jika melakukan penskalaan hingga nol (lihat harga untuk mengetahui detail selengkapnya). Namun, jika Anda ingin menghapus Cloud Function, lakukan dengan mengklik "HAPUS" di kanan atas halaman ringkasan untuk fungsi Anda

4fe11e1b41b32ba2.pngS

Anda juga dapat menghapus project secara opsional:

  1. Di GCP Console, buka halaman Project.
  2. Dalam daftar project, pilih project yang ingin Anda hapus, lalu klik Delete.
  3. Pada kotak, ketik project ID, lalu klik Shut down untuk menghapus project.