راه اندازی DAG با Node.JS و توابع ابری Google

1. مقدمه

Apache Airflow برای اجرای DAG ها در یک برنامه منظم طراحی شده است، اما شما همچنین می توانید DAG ها را در پاسخ به رویدادهایی مانند تغییر در یک سطل ذخیره سازی ابری یا ارسال پیام به Cloud Pub/Sub فعال کنید. برای انجام این کار، DAG های Cloud Composer را می توان توسط Cloud Functions راه اندازی کرد.

مثال در این آزمایشگاه هر بار که تغییری در یک سطل ذخیره‌سازی ابری رخ می‌دهد، یک DAG ساده را اجرا می‌کند. این DAG از BashOperator برای اجرای دستور bash استفاده می‌کند که اطلاعات تغییر را در مورد آنچه در سطل ذخیره‌سازی ابری آپلود شده است چاپ می‌کند.

قبل از شروع این آزمایشگاه، توصیه می شود مقدمه ای برای Cloud Composer و شروع به کار با کدهای Cloud Functions را تکمیل کنید. اگر یک محیط Composer را در Intro to Cloud Composer codelab ایجاد کنید، می توانید از آن محیط در این آزمایشگاه استفاده کنید.

آنچه شما خواهید ساخت

در این نرم افزار کد، شما:

  1. یک فایل در Google Cloud Storage آپلود کنید، که این کار را انجام خواهد داد
  2. یک تابع Google Cloud را با استفاده از زمان اجرا Node.JS راه اندازی کنید
  3. این تابع یک DAG را در Google Cloud Composer اجرا می کند
  4. این یک دستور bash ساده را اجرا می‌کند که تغییر را در سطل Google Cloud Storage چاپ می‌کند

1d3d3736624a923f.png

آنچه شما یاد خواهید گرفت

  • نحوه راه اندازی Apache Airflow DAG با استفاده از Google Cloud Functions + Node.js

آنچه شما نیاز دارید

  • حساب GCP
  • درک اولیه جاوا اسکریپت
  • دانش اولیه Cloud Composer/Airflow و Cloud Functions
  • راحتی با استفاده از دستورات CLI

2. راه اندازی GCP

پروژه را انتخاب یا ایجاد کنید

یک پروژه Google Cloud Platform را انتخاب یا ایجاد کنید. اگر در حال ایجاد یک پروژه جدید هستید، مراحل موجود در اینجا را دنبال کنید.

شناسه پروژه خود را یادداشت کنید که در مراحل بعدی از آن استفاده خواهید کرد.

اگر در حال ایجاد یک پروژه جدید هستید، شناسه پروژه درست در زیر نام پروژه در صفحه ایجاد پیدا می شود

اگر قبلاً پروژه ای ایجاد کرده اید، می توانید شناسه را در صفحه اصلی کنسول در کارت اطلاعات پروژه پیدا کنید

API ها را فعال کنید

Cloud Composer، Google Cloud Functions و Cloud Identity و Google Identity and Access Management (IAM) API را فعال کنید.

ایجاد محیط آهنگساز

یک محیط Cloud Composer با پیکربندی زیر ایجاد کنید :

  • نام: my-composer-environment
  • موقعیت مکانی: هر مکانی که از نظر جغرافیایی به شما نزدیکتر باشد
  • Zone: هر منطقه در آن منطقه

تمام تنظیمات دیگر می توانند در حالت پیش فرض خود باقی بمانند. روی "ایجاد" در پایین کلیک کنید. نام و مکان Composer Environment خود را یادداشت کنید - در مراحل بعدی به آنها نیاز خواهید داشت.

ایجاد سطل ذخیره سازی ابری

در پروژه خود، یک سطل ذخیره سازی ابری با پیکربندی زیر ایجاد کنید :

  • نام: <Your-project-id>
  • کلاس ذخیره سازی پیش فرض: چند منطقه ای
  • مکان: هر مکانی که از نظر جغرافیایی به منطقه Cloud Composer که استفاده می‌کنید نزدیک‌ترین مکان باشد
  • مدل کنترل دسترسی: مجوزهای سطح شی و سطح سطل را تنظیم کنید

وقتی آماده شدید، «ایجاد» را فشار دهید. مطمئن شوید که نام سطل فضای ذخیره‌سازی ابری خود را برای مراحل بعدی یادداشت کرده‌اید.

3. راه اندازی توابع ابری Google (GCF)

برای راه اندازی GCF، دستورات را در Google Cloud Shell اجرا خواهیم کرد.

در حالی که Google Cloud را می‌توان از راه دور از لپ‌تاپ با استفاده از ابزار خط فرمان gcloud اداره کرد، در این کد لبه از Google Cloud Shell استفاده می‌کنیم، یک محیط خط فرمان که در Cloud اجرا می‌شود.

این ماشین مجازی مبتنی بر دبیان با تمام ابزارهای توسعه که شما نیاز دارید بارگذاری شده است. این یک فهرست اصلی 5 گیگابایتی دائمی را ارائه می دهد و در Google Cloud اجرا می شود و عملکرد و احراز هویت شبکه را بسیار افزایش می دهد. این بدان معنی است که تمام چیزی که برای این کد لبه نیاز دارید یک مرورگر است (بله، روی کروم بوک کار می کند).

برای فعال کردن Google Cloud Shell، از کنسول توسعه‌دهنده روی دکمه سمت راست بالا کلیک کنید (تنها چند لحظه طول می‌کشد تا محیط را تهیه کرده و به آن متصل شوید):

مجوزهای امضای blob را به حساب خدمات Cloud Functions اعطا کنید

برای اینکه GCF در Cloud IAP ، پروکسی که از وب سرور Airflow محافظت می‌کند، احراز هویت کند، باید به حساب Appspot Service GCF نقش Service Account Token Creator را اعطا کنید. این کار را با اجرای دستور زیر در Cloud Shell خود انجام دهید و نام پروژه خود را با <your-project-id> جایگزین کنید.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

به عنوان مثال، اگر پروژه شما my-project نام داشته باشد، دستور شما خواهد بود

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

دریافت شناسه مشتری

برای ساختن یک توکن برای احراز هویت در Cloud IAP ، این تابع به شناسه مشتری پروکسی نیاز دارد که از وب سرور Airflow محافظت می کند. Cloud Composer API این اطلاعات را مستقیماً ارائه نمی کند. در عوض، یک درخواست احراز هویت نشده به وب سرور Airflow ارسال کنید و شناسه مشتری را از URL تغییر مسیر بگیرید. ما این کار را با اجرای یک فایل پایتون با استفاده از Cloud Shell برای گرفتن شناسه مشتری انجام خواهیم داد.

با اجرای دستور زیر در Cloud Shell خود کد لازم را از GitHub دانلود کنید

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

اگر به دلیل وجود این دایرکتوری با خطا مواجه شدید، با اجرای دستور زیر آن را به آخرین نسخه به روز کنید

cd python-docs-samples/
git pull origin master

با اجرا به دایرکتوری مناسب تغییر دهید

cd python-docs-samples/composer/rest

کد پایتون را برای دریافت شناسه مشتری خود اجرا کنید و نام پروژه خود را به جای <your-project-id> ، مکان محیط Composer که قبلاً برای <your-composer-location> ایجاد کردید و نام محیط Composer را جایگزین کنید. قبلا برای <your-composer-environment> ایجاد شده است

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

برای مثال، اگر نام پروژه شما my-project ، مکان Composer شما us-central1 و نام محیط شما my-composer باشد، دستور شما این خواهد بود.

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py کارهای زیر را انجام می دهد:

  • با Google Cloud احراز هویت می شود
  • برای دریافت URI تغییر مسیر، یک درخواست HTTP تأیید نشده به سرور وب Airflow می‌کند.
  • پارامتر پرس و جو client_id را از آن تغییر مسیر استخراج می کند
  • آن را برای استفاده شما چاپ می کند

شناسه مشتری شما در خط فرمان چاپ می شود و چیزی شبیه به این خواهد بود:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. عملکرد خود را ایجاد کنید

در Cloud Shell خود، با اجرا کردن، مخزن را با کد نمونه لازم کلون کنید

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

دایرکتوری لازم را تغییر دهید و در حین انجام چند مرحله بعدی، Cloud Shell خود را باز بگذارید

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

با کلیک بر روی منوی ناوبری و سپس کلیک بر روی "توابع ابری" به صفحه Google Cloud Functions بروید.

در بالای صفحه بر روی "CREATE FUNCTION" کلیک کنید

نام تابع خود را "my-function" بگذارید و حافظه را روی حالت پیش فرض، 256 مگابایت بگذارید.

Trigger را روی "Cloud Storage" تنظیم کنید، نوع رویداد را به عنوان "Finalize/Create" بگذارید و به سطلی که در مرحله Create a Cloud Storage Bucket ایجاد کردید، بروید.

کد منبع را روی "Inline Editor" بگذارید و زمان اجرا را روی "Node.js 8" تنظیم کنید.

در Cloud Shell خود دستور زیر را اجرا کنید. با این کار index.js و package.json در ویرایشگر پوسته ابری باز می شود

cloudshell edit index.js package.json

روی تب package.json کلیک کنید، آن کد را کپی کرده و در قسمت package.json ویرایشگر درون خطی توابع ابری قرار دهید.

"Function to Execute" را روی triggerDag تنظیم کنید

روی تب index.js کلیک کنید، کد را کپی کنید و آن را در بخش index.js ویرایشگر درون خطی Cloud Functions قرار دهید.

PROJECT_ID به شناسه پروژه خود، CLIENT_ID را به شناسه مشتری که از مرحله Getting the Client ID ذخیره کرده‌اید، تغییر دهید. هنوز روی "ایجاد" کلیک نکنید - هنوز چند چیز دیگر برای پر کردن وجود دارد!

در Cloud Shell خود، دستور زیر را اجرا کنید و <your-environment-name> را با نام محیط Composer خود و <your-composer-region> را با منطقه ای که محیط Composer شما در آن قرار دارد جایگزین کنید.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

به عنوان مثال، اگر محیط شما my-composer-environment نام دارد و در us-central1 قرار دارد دستور شما به این صورت خواهد بود.

gcloud composer environments describe my-composer-environment --location us-central1

خروجی باید چیزی شبیه به این باشد:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

در آن خروجی، به دنبال متغیری به نام airflowUri بگردید. در کد index.js خود، WEBSERVER_ID به شناسه وب سرور Airflow تغییر دهید - این بخشی از متغیر airflowUri است که در انتهای آن یک '-tp' خواهد داشت، برای مثال، abc123efghi456k-tp

روی پیوند کشویی «بیشتر» کلیک کنید، سپس نزدیکترین منطقه جغرافیایی به خود را انتخاب کنید

"امتحان مجدد در صورت شکست" را علامت بزنید

برای ایجاد Cloud Function خود روی "Create" کلیک کنید

گام برداشتن از طریق کد

کدی که از index.js کپی کردید چیزی شبیه به این خواهد بود:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

بیایید نگاهی بیندازیم که چه خبر است. سه تابع در اینجا وجود دارد: triggerDag ، authorizeIap ، و makeIapPostRequest

triggerDag تابعی است که وقتی چیزی را در سطل ذخیره سازی ابری تعیین شده آپلود می کنیم، راه اندازی می شود. اینجاست که ما متغیرهای مهم مورد استفاده در سایر درخواست‌ها را پیکربندی می‌کنیم، مانند PROJECT_ID ، CLIENT_ID ، WEBSERVER_ID ، و DAG_NAME . authorizeIap و makeIapPostRequest را فراخوانی می کند.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap با استفاده از یک حساب سرویس و "مبادله" JWT برای شناسه شناسه ای که برای احراز هویت makeIapPostRequest استفاده می شود، درخواستی از پروکسی می کند که از وب سرور Airflow محافظت می کند.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest با وب سرور Airflow تماس می گیرد تا composer_sample_trigger_response_dag. نام DAG در URL وب سرور Airflow که با پارامتر url ارسال شده است، جاسازی شده است، و idToken نشانه ای است که در درخواست authorizeIap به دست آوردیم.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. DAG خود را راه اندازی کنید

در Cloud Shell خود، به دایرکتوری با نمونه گردش کار تغییر دهید. این بخشی از نمونه‌های python-docs است که از GitHub در مرحله Getting the Client Id دانلود کرده‌اید.

cd
cd python-docs-samples/composer/workflows

DAG را در Composer آپلود کنید

DAG نمونه را با دستور زیر در سطل ذخیره سازی DAG محیط Composer خود آپلود کنید، جایی که <environment_name> نام محیط Composer شما و <location> نام منطقه ای است که در آن قرار دارد. trigger_response_dag.py DAG است که ما با آن کار خواهیم کرد.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

برای مثال، اگر محیط Composer شما my-composer نام داشت و در us-central1 قرار داشت، دستور شما به این صورت خواهد بود.

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

قدم گذاشتن از طریق DAG

کد DAG در trigger_response.py شبیه این است

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

بخش default_args حاوی آرگومان‌های پیش‌فرض مطابق با مدل BaseOperator در Apache Airflow است. این بخش را با این پارامترها در هر Apache Airflow DAG مشاهده خواهید کرد. owner در حال حاضر روی Composer Example تنظیم شده است، اما در صورت تمایل می توانید نام خود را تغییر دهید. depends_on_past به ما نشان می دهد که این DAG به هیچ DAG قبلی وابسته نیست. سه بخش ایمیل، email ، email_on_failure ، و email_on_retry به گونه ای تنظیم شده اند که هیچ اعلان ایمیلی بر اساس وضعیت این DAG وارد نشود. DAG فقط یک بار امتحان می کند، زیرا retries روی 1 تنظیم شده است، و این کار را پس از پنج دقیقه، در هر retry_delay انجام می دهد. start_date معمولاً دیکته می‌کند که یک DAG چه زمانی باید اجرا شود، در رابطه با schedule_interval آن (تعداد بعدی) اما در مورد این DAG، مربوط نیست. برای تاریخ 1 ژانویه 2017 تنظیم شده است، اما می تواند در هر تاریخ گذشته تنظیم شود.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

بخش with airflow.DAG DAG را که اجرا خواهد شد پیکربندی می کند. با شناسه وظیفه composer_sample_trigger_response_dag ، آرگومان‌های پیش‌فرض از بخش default_args ، و مهم‌تر از همه، با یک schedule_interval None اجرا می‌شود. schedule_interval روی None تنظیم شده است زیرا ما این DAG خاص را با عملکرد Cloud خود راه اندازی می کنیم. به همین دلیل است که start_date در default_args مرتبط نیست.

هنگامی که اجرا می شود، DAG پیکربندی خود را همانطور که در متغیر print_gcs_info دیکته شده است، چاپ می کند.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. عملکرد خود را آزمایش کنید

Composer Environment خود را باز کنید و در ردیف با نام محیط خود، روی پیوند Airflow کلیک کنید

composer_sample_trigger_response_dag را با کلیک بر روی نام آن باز کنید. در حال حاضر هیچ مدرکی دال بر اجرای DAG وجود نخواهد داشت، زیرا ما هنوز DAG را برای اجرا راه اندازی نکرده ایم. اگر این DAG قابل مشاهده یا کلیک نیست، یک دقیقه صبر کنید و صفحه را بازخوانی کنید.

یک برگه جداگانه باز کنید و هر فایلی را در سطل Cloud Storage که قبلا ایجاد کرده اید و به عنوان محرک عملکرد Cloud خود مشخص کرده اید، آپلود کنید. می توانید این کار را از طریق کنسول یا با استفاده از دستور gsutil انجام دهید.

با رابط کاربری Airflow خود به برگه برگردید و روی Graph View کلیک کنید

روی کار print_gcs_info کلیک کنید، که باید با رنگ سبز مشخص شود

روی "View Log" در سمت راست بالای منو کلیک کنید

در گزارش‌ها، اطلاعاتی درباره فایلی که در سطل فضای ذخیره‌سازی ابری خود آپلود کرده‌اید، مشاهده خواهید کرد.

تبریک می گویم! شما به تازگی یک Airflow DAG را با استفاده از Node.js و Google Cloud Functions راه اندازی کرده اید!

7. پاکسازی

برای جلوگیری از تحمیل هزینه به حساب GCP خود برای منابع استفاده شده در این شروع سریع:

  1. (اختیاری) برای ذخیره داده های خود، داده ها را از سطل ذخیره سازی Cloud برای محیط Cloud Composer و سطل ذخیره سازی که برای این شروع سریع ایجاد کرده اید دانلود کنید .
  2. سطل Cloud Storage را برای محیطی که ایجاد کرده اید حذف کنید
  3. محیط Cloud Composer را حذف کنید . توجه داشته باشید که حذف محیط شما، سطل ذخیره سازی محیط را حذف نمی کند.
  4. (اختیاری) با محاسبات بدون سرور، 2 میلیون فراخوان اول در ماه رایگان است، و هنگامی که عملکرد خود را صفر کنید، هزینه ای از شما دریافت نمی شود (برای جزئیات بیشتر به قیمت مراجعه کنید). با این حال، اگر می‌خواهید عملکرد Cloud خود را حذف کنید، این کار را با کلیک کردن روی «DELETE» در سمت راست بالای صفحه نمای کلی برای عملکرد خود انجام دهید.

4fe11e1b41b32ba2.png

همچنین می توانید به صورت اختیاری پروژه را حذف کنید:

  1. در کنسول GCP، به صفحه پروژه ها بروید.
  2. در لیست پروژه، پروژه ای را که می خواهید حذف کنید انتخاب کنید و روی Delete کلیک کنید.
  3. در کادر، ID پروژه را تایپ کنید و سپس بر روی Shut down کلیک کنید تا پروژه حذف شود.