הפעלת DAG באמצעות Node.JS ו-Google Cloud Functions

1. מבוא

Apache Airflow תוכננה להריץ DAGs לפי לוח זמנים קבוע, אבל אפשר גם להפעיל DAG בתגובה לאירועים, כמו שינוי בקטגוריה של Cloud Storage או הודעה שמועברת ל-Cloud Pub/Sub. כדי לעשות זאת, Cloud Functions יכול להפעיל DAG של Cloud Composer.

הדוגמה בשיעור ה-Lab הזה מריצה DAG פשוט בכל פעם שמתרחש שינוי בקטגוריה של Cloud Storage. ה-DAG הזה משתמש ב-BashOperator כדי להריץ פקודת bash שמדפיסה את פרטי השינוי של התוכן שהועלה לקטגוריה של Cloud Storage.

לפני התחלת שיעור ה-Lab הזה, מומלץ להשלים את ה-codelabs הקוד מבוא ל-Cloud Composer ותחילת העבודה עם Cloud Functions. אם יוצרים סביבת Composer במבוא ל-Cloud Composer, תוכלו להשתמש בסביבה הזו בשיעור ה-Lab הזה.

מה תבנו

ב-Codelab הזה:

  1. מעלים קובץ ל-Google Cloud Storage,
  2. הפעלת פונקציה של Google Cloud באמצעות זמן הריצה של Node.JS
  3. הפונקציה הזו תריץ DAG ב-Google Cloud Composer
  4. מריצה פקודת bash פשוטה שמדפיסה את השינוי לקטגוריה של Google Cloud Storage

1d3d3736624a923f.png

הנושאים שתלמד

  • איך מפעילים DAG של Apache Airflow באמצעות Google Cloud Functions + Node.js

מה צריך להכין

  • חשבון GCP
  • הבנה בסיסית של JavaScript
  • ידע בסיסי ב-Cloud Composer/Airflow ו-Cloud Functions
  • נוחות השימוש בפקודות ב-CLI

2. הגדרת GCP

בוחרים או יוצרים את הפרויקט

בוחרים או יוצרים פרויקט ב-Google Cloud Platform. אם יוצרים פרויקט חדש, צריך לפעול לפי השלבים שמפורטים כאן.

מומלץ לרשום את מזהה הפרויקט, שבו תשתמשו בשלבים הבאים.

אם יוצרים פרויקט חדש, מזהה הפרויקט מופיע ממש מתחת לשם הפרויקט בדף היצירה.

אם כבר יצרתם פרויקט, תוכלו למצוא את המזהה בדף הבית של המסוף בכרטיס 'פרטי הפרויקט'

הפעלת ממשקי ה-API

הפעלת ה-API של Cloud Composer, Google Cloud Functions, Cloud Identity ו-Google Identity and Access Management (IAM).

יצירת סביבת Composer

יוצרים סביבה של Cloud Composer עם ההגדרות הבאות:

  • שם: my-composer-Environment
  • מיקום: המיקום הקרוב ביותר למיקום הגאוגרפי שלכם
  • תחום (zone): כל תחום באותו אזור.

כל שאר ההגדרות יישארו כברירת המחדל. לוחצים על 'יצירה' בחלק התחתון.רושמים בצד את השם ואת המיקום של סביבת ה-Composer – תצטרכו אותם בשלבים הבאים.

יצירת קטגוריה של Cloud Storage

בפרויקט שלכם, יוצרים קטגוריה של Cloud Storage עם ההגדרות הבאות:

  • שם: <your-project-id>
  • סוג האחסון (storage class) המוגדר כברירת מחדל: מרובה אזורים
  • מיקום: המיקום הקרוב ביותר גיאוגרפית לאזור Cloud Composer שבו אתם משתמשים
  • מודל בקרת גישה: הגדרת הרשאות ברמת האובייקט וברמת הקטגוריה

לוחצים על 'יצירה' כשתהיו מוכנים, הקפידו לרשום את שם הקטגוריה של Cloud Storage לשלבים הבאים.

3. הגדרת Google Cloud Functions (GCF)

כדי להגדיר את GCF, נריץ פקודות ב-Google Cloud Shell.

אומנם אפשר להפעיל את Google Cloud מרחוק מהמחשב הנייד באמצעות כלי שורת הפקודה gcloud, אבל ב-Codelab הזה נשתמש ב-Google Cloud Shell, סביבת שורת הפקודה שפועלת ב-Cloud.

המכונה הווירטואלית הזו שמבוססת על Debian נטענת עם כל הכלים למפתחים שדרושים לכם. יש בה ספריית בית בנפח מתמיד של 5GB, והיא פועלת ב-Google Cloud כדי לשפר משמעותית את ביצועי הרשת והאימות. כלומר, כל מה שדרוש ל-Codelab הזה הוא דפדפן (כן, הוא פועל ב-Chromebook).

כדי להפעיל את Google Cloud Shell, לוחצים ממסוף המפתחים על הלחצן בצד שמאל למעלה (ההקצאה וההתחברות לסביבה צריכות להימשך כמה דקות):

הענקת הרשאות חתימה מסוג blob לחשבון השירות של Cloud Functions

כדי ש-GCF יוכל לבצע אימות ל-Cloud IAP, שהוא שרת ה-proxy שמגן על שרת האינטרנט Airflow, צריך להקצות לחשבון השירות של Apps מדובר ב-GCF Service Account Token Creator. כדי לעשות את זה, מריצים את הפקודה הבאה ב-Cloud Shell, מחליפים את שם הפרויקט ב-<your-project-id>.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

לדוגמה, אם הפרויקט נקרא my-project, הפקודה תהיה

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

אחזור מזהה הלקוח

כדי ליצור אסימון לאימות ב-Cloud IAP, הפונקציה דורשת את מזהה הלקוח של שרת ה-proxy שמגן על שרת האינטרנט Airflow. ה-Cloud Composer API לא מספק את המידע הזה באופן ישיר. במקום זאת, צריך לשלוח בקשה לא מאומתת לשרת האינטרנט של Airflow ולתעד את מזהה הלקוח מכתובת ה-URL להפניה אוטומטית. לשם כך נריץ קובץ python באמצעות Cloud Shell כדי לתעד את מזהה הלקוח.

מריצים את הפקודה הבאה ב-Cloud Shell כדי להוריד את הקוד הנדרש מ-GitHub.

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

אם מופיעה שגיאה כי הספרייה הזו כבר קיימת, צריך לעדכן אותה לגרסה האחרונה על ידי הרצת הפקודה הבאה.

cd python-docs-samples/
git pull origin master

כדי להחליף לספרייה המתאימה, מריצים

cd python-docs-samples/composer/rest

מריצים את קוד python כדי לקבל את מזהה הלקוח, תוך החלפת שם הפרויקט ב-<your-project-id>, המיקום של סביבת ה-Composer שיצרתם קודם בשביל <your-composer-location> ואת השם של סביבת Composer שיצרתם קודם עבור <your-composer-environment>

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

לדוגמה, אם שם הפרויקט הוא my-project, מיקום ה-Composer הוא us-central1 ושם הסביבה שלכם הוא my-composer, הפקודה תהיה

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py עושה את הפעולות הבאות:

  • אימות באמצעות Google Cloud
  • שולחת בקשת HTTP לא מאומתת לשרת האינטרנט של Airflow כדי לקבל את ה-URI של ההפניה האוטומטית
  • משלים את פרמטר השאילתה client_id מההפניה הזו
  • מדפיסים את המסמך כדי להשתמש בו

מזהה הלקוח שלך יודפס בשורת הפקודה ויראה בערך כך:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. יצירת פונקציה

משכפלים ב-Cloud Shell את המאגר עם הקוד לדוגמה הדרוש באמצעות הרצת

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

עוברים לספרייה הנדרשת ומשאירים את Cloud Shell פתוח במהלך השלבים הבאים

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

כדי להיכנס לדף Google Cloud Functions, לוחצים על תפריט הניווט ואז על Cloud Functions.

לוחצים על 'יצירת פונקציה' בראש הדף

צריך לתת לפונקציה my-function ומשאירים את ערך ברירת המחדל של 256MB.

מגדירים את הטריגר בתור Cloud Storage, משאירים את סוג האירוע בתור Finalize/Create, ומעיינים לקטגוריה שיצרתם בשלב 'יצירת קטגוריה של Cloud Storage'.

לא מגדירים את קוד המקור כ'עורך מוטבע' ומגדירים את זמן הריצה ל-"Node.js 8"

ב-Cloud Shell, מריצים את הפקודה הבאה. הפעולה הזו תפתח את index.js ו-package.json ב-Cloud Shell Editor

cloudshell edit index.js package.json

לוחצים על הכרטיסייה package.json, מעתיקים את הקוד ומדביקים אותו בקטע package.json בעורך המוטבע של Cloud Functions

מגדירים את הפונקציה לביצוע כדי להפעיל Dag

לוחצים על הכרטיסייה index.js, מעתיקים את הקוד ומדביקים אותו בקטע index.js בעורך המוטבע של Cloud Functions

משנים את PROJECT_ID למזהה הפרויקט, CLIENT_ID למזהה הלקוח ששמרתם בשלב 'קבלת מזהה הלקוח'. לא ללחוץ על 'יצירה' אבל יש עוד כמה דברים להשלים!

ב-Cloud Shell, מריצים את הפקודה הבאה שמחליפה את הערך <your-environment-name> בשם של סביבת ה-Composer ובשם <your-composer-region> עם האזור שבו נמצאת סביבת ה-Composer.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

לדוגמה, אם שם הסביבה הוא my-composer-environment והיא נמצאת ב-us-central1, הפקודה תהיה

gcloud composer environments describe my-composer-environment --location us-central1

הפלט אמור להיראות ככה:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

בפלט הזה, מחפשים את המשתנה בשם airflowUri. בקוד Index.js, משנים את WEBSERVER_ID כך שיהיה מזהה שרת האינטרנט Airflow. זהו החלק במשתנה airflowUri שיקבל '-tp' בסוף, למשל, abc123efghi456k-tp

לוחצים על הסמל 'עוד' ובוחרים את האזור הקרוב ביותר למיקום הגאוגרפי שלכם

מסמנים את האפשרות 'ניסיון חוזר נכשל'

לוחצים על 'יצירה' ליצירת הפונקציה של Cloud Functions

מעבר לקוד

הקוד שהעתקתם מ-index.js ייראה בערך כך:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

בואו נראה מה קורה. יש כאן שלוש פונקציות: triggerDag, authorizeIap ו-makeIapPostRequest

triggerDag היא הפונקציה שמופעלת כשמעלים פריט לקטגוריה הייעודית של Cloud Storage. זה המקום שבו אנחנו מגדירים משתנים חשובים שנמצאים בשימוש בבקשות אחרות, כמו PROJECT_ID, CLIENT_ID, WEBSERVER_ID ו-DAG_NAME. קוראים לפונקציה authorizeIap ול-makeIapPostRequest.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap שולח בקשה לשרת ה-proxy שמגן על שרת האינטרנט Airflow, באמצעות חשבון שירות ו'החלפה' JWT לאסימון מזהה שישמש לאימות makeIapPostRequest.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest מבצע קריאה לשרת האינטרנט של Airflow כדי להפעיל את composer_sample_trigger_response_dag.. שם ה-DAG מוטמע בכתובת ה-URL של שרת האינטרנט של Airflow שמועברת עם הפרמטר url. השדה idToken הוא האסימון שקיבלנו בבקשה authorizeIap.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. הגדרת ה-DAG

ב-Cloud Shell, עוברים לספרייה שכוללת את תהליכי העבודה לדוגמה. זהו חלק מדוגמאות python-docs שהורדת מ-GitHub בשלב 'קבלת מזהה הלקוח'.

cd
cd python-docs-samples/composer/workflows

העלאת ה-DAG ל-Composer

מעלים את ה-DAG לדוגמה אל קטגוריית האחסון של DAG בסביבת ה-Composer באמצעות הפקודה הבאה, שבה <environment_name> הוא השם של סביבת Composer ו-<location> הוא שם האזור שבו הוא נמצא. trigger_response_dag.py הוא ה-DAG שאיתו נעבוד.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

לדוגמה, אם סביבת ה-Composer נקראת my-composer והיא ממוקמת ב-us-central1, הפקודה תהיה

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

עוברים דרך ה-DAG

קוד ה-DAG ב-trigger_response.py נראה כך

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

המקטע default_args מכיל את ארגומנטים ברירת המחדל כפי שנדרש על ידי המודל BaseOperator ב-Apache Airflow. הקטע הזה יופיע עם הפרמטרים האלה בכל DAG של Apache Airflow. השדה owner מוגדר כרגע ל-Composer Example, אבל אתה יכול לשנות אותו כך שיהיה השם שלך. לפי depends_on_past, ה-DAG הזה לא תלוי במדדי DAG קודמים. שלושת הקטעים של האימייל: email, email_on_failure ו-email_on_retry מוגדרים כך שלא יישלחו התראות באימייל לפי הסטטוס של ה-DAG הזה. ה-DAG ינסה שוב פעם אחת, מאחר שב-retries מוגדר הערך 1 ויעשה זאת לאחר חמש דקות, בכל retry_delay. בדרך כלל, ה-start_date קובע מתי DAG ירוץ, בשילוב עם schedule_interval (מוגדר מאוחר יותר), אבל במקרה של ה-DAG הזה, הוא לא רלוונטי. התאריך שהחל ממנו הוא 1 בינואר 2017, אבל אפשר להגדיר אותו לכל תאריך שחל בעבר.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

הקטע with airflow.DAG מגדיר את ה-DAG שיופעל. היא תרוץ עם מזהה המשימה composer_sample_trigger_response_dag, ארגומנטים ברירת המחדל מהקטע default_args, והכי חשוב, עם schedule_interval של None. השדה schedule_interval מוגדר לערך None כי אנחנו מפעילים את ה-DAG הספציפי הזה באמצעות הפונקציה של Cloud Functions. לכן השדה start_date בdefault_args לא רלוונטי.

כשהוא מופעל, ה-DAG מדפיס את ההגדרה שלו, כפי שמוסבר במשתנה print_gcs_info.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. בדיקת הפונקציה

פותחים את סביבת ה-Composer, ובשורה עם שם הסביבה לוחצים על הקישור של Airflow.

כדי לפתוח את הקובץ composer_sample_trigger_response_dag, לוחצים על השם שלו. נכון לעכשיו לא תהיה כל הוכחה להפעלות DAG, כי עדיין לא הפעלנו את ה-DAG.אם ה-DAG הזה לא גלוי או לא קליקבילי, צריך להמתין דקה ולרענן את הדף.

פותחים כרטיסייה נפרדת ומעלים קובץ כלשהו לקטגוריה של Cloud Storage שיצרתם קודם ושהגדרתם כטריגר לפונקציה של Cloud Functions. אפשר לעשות זאת דרך המסוף או באמצעות פקודת gsutil.

חוזרים לכרטיסייה עם ממשק המשתמש של Airflow ולוחצים על 'תצוגת תרשים'

לוחצים על המשימה print_gcs_info, שאמורה להיות מסומנת בירוק

לוחצים על 'הצגת היומן'. בפינה השמאלית העליונה של התפריט

ביומנים יופיע מידע על הקובץ שהעליתם לקטגוריה של Cloud Storage.

מעולה! הפעלת DAG של Airflow באמצעות Node.js ו-Google Cloud Functions!

7. הסרת המשאבים

כדי להימנע מצבירת חיובים בחשבון GCP עבור המשאבים שבהם השתמשתם במדריך למתחילים הזה:

  1. (אופציונלי) כדי לשמור את הנתונים, מורידים את הנתונים מהקטגוריה של Cloud Storage לסביבת Cloud Composer ואת קטגוריית האחסון שיצרתם במדריך למתחילים.
  2. מוחקים את הקטגוריה של Cloud Storage של הסביבה ושיצרתם
  3. מחיקה של סביבת Cloud Composer. שימו לב שמחיקת הסביבה לא מוחקת את קטגוריית האחסון של הסביבה.
  4. (אופציונלי) כשמשתמשים במחשוב ללא שרת (serverless computing), 2 מיליון ההפעלות הראשונות בחודש הן בחינם, וכשמגדילה את הפונקציה לאפס, לא נחייב אתכם (פרטים נוספים זמינים במאמר בנושא תמחור). עם זאת, אם רוצים למחוק את הפונקציה של Cloud Functions, לוחצים על "DELETE". בפינה השמאלית העליונה של דף הסקירה הכללית של הפונקציה

4fe11e1b41b32ba2.png

אפשר גם למחוק את הפרויקט:

  1. במסוף GCP, עוברים לדף Projects.
  2. ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על מחיקה.
  3. כדי למחוק את הפרויקט, כותבים את מזהה הפרויקט בתיבה ולוחצים על Shut down.