הפעלת DAG באמצעות Node.JS ו-Google Cloud Functions

1. מבוא

‫Apache Airflow מיועד להפעלת DAGs בלוח זמנים קבוע, אבל אפשר גם להפעיל DAGs בתגובה לאירועים, כמו שינוי בקטגוריה של Cloud Storage או הודעה שנשלחת ל-Cloud Pub/Sub. כדי לעשות את זה, אפשר להפעיל DAG ב-Cloud Composer באמצעות Cloud Functions.

בדוגמה הזו מריצים DAG פשוט בכל פעם שמתרחש שינוי בקטגוריה של Cloud Storage. ה-DAG הזה משתמש ב-BashOperator כדי להריץ פקודת Bash שמדפיסה את פרטי השינוי לגבי מה שהועלה לקטגוריה של Cloud Storage.

לפני שמתחילים את שיעור ה-Lab הזה, מומלץ להשלים את שיעורי ה-Codelab‏ Intro to Cloud Composer ו-Getting Started with Cloud Functions. אם יצרתם סביבת Composer ב-Codelab בנושא מבוא ל-Cloud Composer, תוכלו להשתמש בסביבה הזו בשיעור Lab הזה.

מה תבנו

ב-Codelab הזה תלמדו:

  1. העלאת קובץ ל-Google Cloud Storage,
  2. הפעלת פונקציה של Cloud Functions באמצעות סביבת זמן הריצה של Node.JS
  3. הפונקציה הזו תבצע DAG ב-Google Cloud Composer
  4. הפקודה מריצה פקודת Bash פשוטה שמדפיסה את השינוי בקטגוריה של Google Cloud Storage

1d3d3736624a923f.png

הנושאים שתלמד

  • איך מפעילים DAG של Apache Airflow באמצעות Google Cloud Functions + Node.js

מה צריך

  • חשבון GCP
  • הבנה בסיסית של JavaScript
  • ידע בסיסי ב-Cloud Composer/Airflow וב-Cloud Functions
  • נוח לכם להשתמש בפקודות CLI

2. הגדרת GCP

בחירה או יצירה של פרויקט

בוחרים פרויקט קיים או יוצרים פרויקט חדש ב-Google Cloud Platform. אם אתם יוצרים פרויקט חדש, פועלים לפי השלבים שמופיעים כאן.

חשוב לשים לב למזהה הפרויקט, כי תצטרכו להשתמש בו בשלבים הבאים.

אם אתם יוצרים פרויקט חדש, מזהה הפרויקט מופיע ממש מתחת לשם הפרויקט בדף היצירה.

אם כבר יצרתם פרויקט, תוכלו למצוא את המזהה בדף הבית של המסוף בכרטיסיית מידע של הפרויקט.

הפעלת ממשקי ה-API

מפעילים את Cloud Composer,‏ Google Cloud Functions,‏ Cloud Identity ו-Google Identity and Access Management ‏ (IAM) API.

יצירת סביבת Composer

יוצרים סביבת Cloud Composer עם ההגדרות הבאות:

  • שם: my-composer-environment
  • מיקום: כל מיקום שקרוב אליכם מבחינה גיאוגרפית
  • אזור: כל אזור באזור הזה

אפשר להשאיר את כל שאר ההגדרות כברירת מחדל. לוחצים על 'יצירה' בחלק התחתון.חשוב לשמור את השם והמיקום של סביבת Composer, כי תצטרכו אותם בשלבים הבאים.

יצירת קטגוריה של Cloud Storage

בפרויקט, יוצרים קטגוריה של Cloud Storage עם ההגדרות הבאות:

  • שם: <your-project-id>
  • סוג האחסון (storage class) שמוגדר כברירת מחדל: Multi-regional
  • מיקום: כל מיקום שהוא הכי קרוב מבחינה גיאוגרפית לאזור Cloud Composer שבו אתם משתמשים
  • מודל בקרת גישה: הגדרת הרשאות ברמת האובייקט וברמת הקטגוריה

כשמוכנים, לוחצים על 'יצירה'. חשוב לרשום את השם של הקטגוריה של Cloud Storage כדי להשתמש בו בשלבים הבאים.

3. הגדרה של Google Cloud Functions‏ (GCF)

כדי להגדיר את GCF, נריץ פקודות ב-Google Cloud Shell.

אפשר להפעיל את Google Cloud מרחוק מהמחשב הנייד באמצעות כלי שורת הפקודה gcloud, אבל ב-Codelab הזה נשתמש ב-Google Cloud Shell, סביבת שורת פקודה שפועלת בענן.

המכונה הווירטואלית הזו מבוססת על Debian, וטעונים בה כל הכלים הדרושים למפתחים. יש בה ספריית בית בנפח מתמיד של 5GB והיא פועלת ב-Google Cloud, מה שמשפר מאוד את הביצועים והאימות ברשת. כלומר, כל מה שצריך כדי לבצע את ההוראות במאמר הזה הוא דפדפן (כן, זה עובד ב-Chromebook).

כדי להפעיל את Google Cloud Shell, לוחצים על הלחצן בפינה השמאלית העליונה של מסוף המפתחים (הקצאת המשאבים והחיבור לסביבה אמורים להימשך רק כמה רגעים):

מתן הרשאות לחתימה על Blob לחשבון השירות ב-Cloud Functions

כדי ש-GCF יוכל לבצע אימות מול Cloud IAP, שרת ה-proxy שמגן על שרת האינטרנט של Airflow, צריך לתת לחשבון השירות של Appspot את התפקיד Service Account Token Creator. כדי לעשות זאת, מריצים את הפקודה הבאה ב-Cloud Shell, ומחליפים את <your-project-id> בשם הפרויקט.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

לדוגמה, אם שם הפרויקט הוא my-project, הפקודה תהיה

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

איך מקבלים את מזהה הלקוח

כדי ליצור אסימון לאימות ב-Cloud IAP, הפונקציה צריכה את מזהה הלקוח של ה-proxy שמגן על שרת האינטרנט של Airflow. ‫Cloud Composer API לא מספק את המידע הזה ישירות. במקום זאת, שולחים בקשה לא מאומתת לשרת האינטרנט של Airflow ומתעדים את מזהה הלקוח מכתובת ה-URL להפניה אוטומטית. נעשה זאת על ידי הפעלת קובץ Python באמצעות Cloud Shell כדי לתעד את מזהה הלקוח.

מורידים את הקוד הדרוש מ-GitHub על ידי הרצת הפקודה הבאה ב-Cloud Shell

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

אם קיבלתם שגיאה כי הספרייה הזו כבר קיימת, צריך להריץ את הפקודה הבאה כדי לעדכן אותה לגרסה העדכנית

cd python-docs-samples/
git pull origin master

כדי לעבור לספרייה המתאימה, מריצים את הפקודה

cd python-docs-samples/composer/rest

מריצים את קוד ה-Python כדי לקבל את מזהה הלקוח. מחליפים את <your-project-id> בשם הפרויקט, את <your-composer-location> במיקום של סביבת Composer שיצרתם קודם ואת <your-composer-environment> בשם של סביבת Composer שיצרתם קודם.

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

לדוגמה, אם שם הפרויקט הוא my-project, מיקום Composer הוא us-central1 ושם הסביבה הוא my-composer, הפקודה תהיה

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py מבצע את הפעולות הבאות:

  • אימות ב-Google Cloud
  • שליחת בקשת HTTP לא מאומתת לשרת האינטרנט של Airflow כדי לקבל את כתובת ה-URI להפניה אוטומטית
  • מחילוץ פרמטר השאילתה client_id מההפניה האוטומטית
  • מדפיס אותו כדי שתוכלו להשתמש בו

מזהה הלקוח יודפס בשורת הפקודה וייראה בערך כך:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. יצירת הפונקציה

ב-Cloud Shell, משכפלים את מאגר ה-repo עם קוד לדוגמה שנדרש על ידי הרצת הפקודה

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

עוברים לספרייה הדרושה ומשאירים את Cloud Shell פתוח בזמן שמבצעים את השלבים הבאים.

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

כדי לעבור לדף Cloud Functions ב-Google Cloud, לוחצים על תפריט הניווט ואז על Cloud Functions.

לוחצים על CREATE FUNCTION (יצירת פונקציה) בחלק העליון של הדף.

נותנים לפונקציה את השם my-function ומשאירים את הזיכרון בערך ברירת המחדל, 256MB.

מגדירים את הטריגר ל-Cloud Storage, משאירים את סוג האירוע כ-Finalize/Create (סיום/יצירה) ומעיינים בקטגוריה שיצרתם בשלב 'יצירת קטגוריה של Cloud Storage'.

משאירים את Source Code (קוד מקור) על Inline Editor (עורך מוטבע) ומגדירים את זמן הריצה ל-Node.js 8.

ב-Cloud Shell, מריצים את הפקודה הבאה. הקובץ index.js והקובץ package.json ייפתחו ב-Cloud Shell Editor.

cloudshell edit index.js package.json

לוחצים על הכרטיסייה package.json, מעתיקים את הקוד ומדביקים אותו בקטע package.json של העורך המוטבע של Cloud Functions.

מגדירים את האפשרות Function to Execute (פונקציה להרצה) ל-triggerDag

לוחצים על הכרטיסייה index.js, מעתיקים את הקוד ומדביקים אותו בקטע index.js של העורך המוטמע של Cloud Functions.

מחליפים את PROJECT_ID במזהה הפרויקט ואת CLIENT_ID במזהה הלקוח ששמרתם בשלב 'קבלת מזהה הלקוח'. אל תלחצו על 'יצירה' עדיין – יש עוד כמה פרטים שצריך למלא.

ב-Cloud Shell, מריצים את הפקודה הבאה, ומחליפים את <your-environment-name> בשם סביבת Composer ואת <your-composer-region> באזור שבו נמצאת סביבת Composer.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

לדוגמה, אם הסביבה נקראת my-composer-environment והיא נמצאת ב-us-central1, הפקודה תהיה

gcloud composer environments describe my-composer-environment --location us-central1

הפלט אמור להיראות כך:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

בפלט הזה, מחפשים את המשתנה שנקרא airflowUri. בקוד index.js, משנים את WEBSERVER_ID למזהה של שרת האינטרנט של Airflow – זה החלק של המשתנה airflowUri שיהיה לו ‎'-tp'‎ בסוף, לדוגמה, abc123efghi456k-tp

לוחצים על הקישור לתפריט הנפתח 'עוד' ואז בוחרים את האזור שקרוב אליכם מבחינה גיאוגרפית.

מסמנים את התיבה 'ניסיון חוזר במקרה של כשל'.

לוחצים על 'יצירה' כדי ליצור את הפונקציה של Cloud Functions.

הרצת קוד בשלבים

הקוד שהעתקתם מהקובץ index.js ייראה בערך כך:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

בואו נבדוק מה קורה. יש כאן שלוש פונקציות: triggerDag,‏ authorizeIap ו-makeIapPostRequest

triggerDag היא הפונקציה שמופעלת כשמעלים משהו לקטגוריה המיועדת של Cloud Storage. כאן אנחנו מגדירים משתנים חשובים שמשמשים בבקשות אחרות, כמו PROJECT_ID, CLIENT_ID, WEBSERVER_ID ו-DAG_NAME. הוא מתקשר אל authorizeIap ואל makeIapPostRequest.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap שולח בקשה לשרת ה-proxy שמגן על שרת האינטרנט של Airflow, באמצעות חשבון שירות ו "החלפה" של JWT באסימון מזהה שישמש לאימות של makeIapPostRequest.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest מבצע קריאה לשרת האינטרנט של Airflow כדי להפעיל את composer_sample_trigger_response_dag. שם ה-DAG מוטמע בכתובת ה-URL של שרת האינטרנט של Airflow שמועברת עם הפרמטר url, ו-idToken הוא האסימון שקיבלנו בבקשה authorizeIap.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. הגדרת תרשים DAG

ב-Cloud Shell, עוברים לספרייה עם תהליכי העבודה לדוגמה. הוא חלק מה-python-docs-samples שהורדתם מ-GitHub בשלב 'קבלת מזהה הלקוח'.

cd
cd python-docs-samples/composer/workflows

העלאת ה-DAG ל-Composer

מעלים את ה-DAG לדוגמה אל קטגוריית האחסון של ה-DAG בסביבת Composer באמצעות הפקודה הבאה, שבה <environment_name> הוא שם סביבת Composer ו-<location> הוא שם האזור שבו היא נמצאת. ‫trigger_response_dag.py הוא ה-DAG שבו נשתמש.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

לדוגמה, אם סביבת ה-Composer שלכם נקראת my-composer והיא ממוקמת ב-us-central1, הפקודה תהיה

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

מעבר שלב אחר שלב בתרשים ה-DAG

קוד ה-DAG ב-trigger_response.py נראה כך

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

הקטע default_args מכיל את ארגומנטי ברירת המחדל שנדרשים על ידי המודל BaseOperator ב-Apache Airflow. הקטע הזה עם הפרמטרים האלה יופיע בכל DAG של Apache Airflow. השם owner מוגדר כרגע כComposer Example, אבל אפשר לשנות אותו לשם שלכם אם רוצים. ‫depends_on_past מראה לנו ש-DAG הזה לא תלוי באף DAG קודם. שלושת החלקים של האימייל, email, email_on_failure ו-email_on_retry, מוגדרים כך שלא יתקבלו התראות באימייל על סמך הסטטוס של קבוצת ה-DAG הזו. ה-DAG ינסה לבצע שוב את הפעולה רק פעם אחת, כי הערך של retries מוגדר כ-1, והוא יעשה זאת אחרי חמש דקות, בהתאם לערך של retry_delay. בדרך כלל, start_date קובע מתי DAG צריך לפעול, בשילוב עם schedule_interval (שמוגדר מאוחר יותר), אבל במקרה של DAG הזה, הוא לא רלוונטי. התאריך מוגדר ל-1 בינואר 2017, אבל אפשר להגדיר כל תאריך בעבר.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

בקטע with airflow.DAG מגדירים את ה-DAG שיופעל. היא תופעל עם מזהה המשימה composer_sample_trigger_response_dag, עם ארגומנטים שמוגדרים כברירת מחדל מהקטע default_args, וחשוב מכל, עם schedule_interval של None. הערך של schedule_interval מוגדר ל-None כי אנחנו מפעילים את ה-DAG הספציפי הזה באמצעות פונקציה של Cloud Functions. לכן, ה-start_date ב-default_args לא רלוונטי.

כשהוא מופעל, ה-DAG מדפיס את ההגדרה שלו, כפי שנקבע במשתנה print_gcs_info.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. בדיקת הפונקציה

פותחים את סביבת ה-Composer ובשורה עם שם הסביבה, לוחצים על הקישור ל-Airflow.

פותחים את composer_sample_trigger_response_dag על ידי לחיצה על השם שלו. בשלב הזה לא יהיו הוכחות להרצת DAG, כי עוד לא הפעלנו את ה-DAG.אם ה-DAG הזה לא מוצג או שאי אפשר ללחוץ עליו, צריך לחכות דקה ולרענן את הדף.

פותחים כרטיסייה נפרדת ומעלים קובץ כלשהו לקטגוריה של Cloud Storage שיצרתם קודם וציינתם כטריגר לפונקציה של Cloud Functions. אפשר לעשות זאת דרך המסוף או באמצעות פקודת gsutil.

חוזרים לכרטיסייה עם ממשק המשתמש של Airflow ולוחצים על Graph View (תצוגת תרשים).

לוחצים על המשימה print_gcs_info, שאמורה להיות מודגשת בירוק.

לוחצים על 'הצגת היומן' בפינה השמאלית העליונה של התפריט.

בלוגים יופיע מידע על הקובץ שהעליתם לקטגוריה של Cloud Storage.

מעולה! הרצתם עכשיו DAG של Airflow באמצעות Node.js ו-Google Cloud Functions.

7. הסרת המשאבים

כדי להימנע מחיובים בחשבון GCP על המשאבים שבהם השתמשתם במדריך למתחילים הזה:

  1. (אופציונלי) כדי לשמור את הנתונים, מורידים את הנתונים מקטגוריית Cloud Storage של סביבת Cloud Composer ומקטגוריית האחסון שיצרתם עבור המדריך למתחילים הזה.
  2. מוחקים את הקטגוריה של Cloud Storage עבור הסביבה שיצרתם
  3. מחיקת סביבת Cloud Composer. שימו לב: מחיקת הסביבה לא מוחקת את דלי האחסון של הסביבה.
  4. (אופציונלי) במחשוב בלי שרת (serverless computing), שני מיליון ההפעלות הראשונות בחודש הן בחינם, וכשמצמצמים את הפונקציה לאפס לא מחויבים (פרטים נוספים זמינים בתמחור). עם זאת, אם רוצים למחוק את Cloud Function, לוחצים על 'מחיקה' בפינה השמאלית העליונה של דף הסקירה הכללית של הפונקציה.

4fe11e1b41b32ba2.png

אפשר גם למחוק את הפרויקט:

  1. במסוף GCP, נכנסים לדף Projects.
  2. ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על מחיקה.
  3. בתיבה, כותבים את מזהה הפרויקט ולוחצים על Shut down כדי למחוק את הפרויקט.