راه اندازی DAG با Node.JS و توابع ابری Google

۱. مقدمه

آپاچی ایرفلو طوری طراحی شده است که DAGها را طبق یک برنامه منظم اجرا کند، اما شما می‌توانید DAGها را در پاسخ به رویدادهایی مانند تغییر در یک باکت ذخیره‌سازی ابری یا پیامی که به Cloud Pub/Sub ارسال می‌شود، نیز فعال کنید. برای انجام این کار، DAGهای Cloud Composer می‌توانند توسط Cloud Functions فعال شوند.

مثال این آزمایش هر بار که تغییری در یک مخزن ذخیره‌سازی ابری رخ می‌دهد، یک DAG ساده اجرا می‌کند. این DAG از BashOperator برای اجرای یک دستور bash استفاده می‌کند که اطلاعات تغییر مربوط به آنچه در مخزن ذخیره‌سازی ابری آپلود شده است را چاپ می‌کند.

قبل از شروع این آزمایش، توصیه می‌شود که آزمایشگاه‌های کد «مقدمه‌ای بر Cloud Composer» و «شروع به کار با توابع ابری» را تکمیل کنید. اگر در آزمایشگاه کد «مقدمه‌ای بر Cloud Composer» یک محیط Composer ایجاد کنید، می‌توانید از آن محیط در این آزمایش استفاده کنید.

آنچه خواهید ساخت

در این آزمایشگاه کد، شما:

  1. یک فایل را در فضای ذخیره‌سازی ابری گوگل آپلود کنید، که ...
  2. فعال کردن یک تابع Google Cloud با استفاده از زمان اجرای Node.JS
  3. این تابع یک DAG را در Google Cloud Composer اجرا می‌کند.
  4. این یک دستور bash ساده را اجرا می‌کند که تغییر را در مخزن ذخیره‌سازی ابری گوگل چاپ می‌کند.

۱d3d3736624a923f.png

آنچه یاد خواهید گرفت

  • نحوه‌ی راه‌اندازی DAG آپاچی ایرفلو با استفاده از توابع گوگل کلود + نود جی اس

آنچه نیاز دارید

  • حساب GCP
  • درک اولیه از جاوا اسکریپت
  • دانش پایه در مورد Cloud Composer/Airflow و توابع ابری
  • راحتی در استفاده از دستورات CLI

۲. راه‌اندازی GCP

انتخاب یا ایجاد پروژه

یک پروژه پلتفرم ابری گوگل (Google Cloud Platform Project) انتخاب یا ایجاد کنید. اگر در حال ایجاد یک پروژه جدید هستید، مراحل موجود در اینجا را دنبال کنید.

شناسه پروژه خود را یادداشت کنید، که در مراحل بعدی از آن استفاده خواهید کرد.

اگر در حال ایجاد یک پروژه جدید هستید، شناسه پروژه درست زیر نام پروژه در صفحه ایجاد پروژه قرار دارد.

اگر قبلاً پروژه‌ای ایجاد کرده‌اید، می‌توانید شناسه آن را در صفحه اصلی کنسول در کارت اطلاعات پروژه پیدا کنید.

فعال کردن APIها

فعال کردن Cloud Composer، Google Cloud Functions و Cloud Identity و API مدیریت هویت و دسترسی گوگل (IAM).

ایجاد محیط کامپوزر

یک محیط Cloud Composer با پیکربندی زیر ایجاد کنید :

  • نام: محیط آهنگساز من
  • مکان: هر مکانی که از نظر جغرافیایی به شما نزدیک‌تر است
  • منطقه: هر منطقه‌ای در آن منطقه

تمام تنظیمات دیگر می‌توانند به صورت پیش‌فرض باقی بمانند. روی «ایجاد» در پایین کلیک کنید. نام و مکان محیط کامپوزر خود را یادداشت کنید - در مراحل بعدی به آنها نیاز خواهید داشت.

ایجاد سطل ذخیره‌سازی ابری

در پروژه خود، یک مخزن ذخیره‌سازی ابری با پیکربندی زیر ایجاد کنید :

  • نام: <your-project-id>
  • کلاس ذخیره‌سازی پیش‌فرض: چند منطقه‌ای
  • مکان: هر مکانی که از نظر جغرافیایی به منطقه Cloud Composer مورد استفاده شما نزدیک‌تر باشد.
  • مدل کنترل دسترسی: مجوزهای سطح شیء و سطح سطل را تنظیم کنید

وقتی آماده بودید، روی «ایجاد» کلیک کنید. حتماً نام مخزن ذخیره‌سازی ابری خود را برای مراحل بعدی یادداشت کنید.

۳. راه‌اندازی توابع ابری گوگل (GCF)

برای راه‌اندازی GCF، دستورات را در Google Cloud Shell اجرا خواهیم کرد.

اگرچه می‌توان گوگل کلود را از راه دور و با استفاده از ابزار خط فرمان gcloud از لپ‌تاپ خود مدیریت کرد، اما در این آزمایشگاه کد، ما از Google Cloud Shell ، یک محیط خط فرمان که در فضای ابری اجرا می‌شود، استفاده خواهیم کرد.

این ماشین مجازی مبتنی بر دبیان، تمام ابزارهای توسعه مورد نیاز شما را در خود جای داده است. این ماشین مجازی یک دایرکتوری خانگی دائمی ۵ گیگابایتی ارائه می‌دهد و روی فضای ابری گوگل اجرا می‌شود که عملکرد شبکه و احراز هویت را تا حد زیادی بهبود می‌بخشد. این بدان معناست که تنها چیزی که برای این آزمایشگاه کد نیاز دارید یک مرورگر است (بله، روی کروم‌بوک هم کار می‌کند).

برای فعال کردن Google Cloud Shell، از کنسول توسعه‌دهندگان روی دکمه‌ای که در سمت راست بالا قرار دارد کلیک کنید (فعال‌سازی و اتصال به محیط فقط چند لحظه طول می‌کشد):

مجوزهای امضای blob را به حساب سرویس توابع ابری اعطا کنید

برای اینکه GCF بتواند در Cloud IAP ، پروکسی که از وب‌سرور Airflow محافظت می‌کند، احراز هویت شود، باید به Appspot Service Account GCF نقش Service Account Token Creator را اعطا کنید. این کار را با اجرای دستور زیر در Cloud Shell خود انجام دهید و نام پروژه خود را به جای <your-project-id> قرار دهید.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

برای مثال، اگر پروژه شما my-project نام دارد، دستور شما به صورت زیر خواهد بود:

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

دریافت شناسه مشتری

برای ساخت یک توکن جهت احراز هویت در Cloud IAP ، این تابع به شناسه کلاینت پروکسی که از وب‌سرور Airflow محافظت می‌کند، نیاز دارد. API مربوط به Cloud Composer این اطلاعات را مستقیماً ارائه نمی‌دهد. در عوض، یک درخواست احراز هویت نشده به وب‌سرور Airflow ارسال کنید و شناسه کلاینت را از URL ریدایرکت دریافت کنید. ما این کار را با اجرای یک فایل پایتون با استفاده از Cloud Shell برای دریافت شناسه کلاینت انجام خواهیم داد.

با اجرای دستور زیر در Cloud Shell خود، کد لازم را از GitHub دانلود کنید.

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

اگر به دلیل وجود این دایرکتوری خطایی دریافت کردید، با اجرای دستور زیر آن را به آخرین نسخه به‌روزرسانی کنید.

cd python-docs-samples/
git pull origin master

با اجرای دستور زیر به دایرکتوری مناسب تغییر دهید.

cd python-docs-samples/composer/rest

کد پایتون را اجرا کنید تا شناسه کلاینت خود را دریافت کنید، نام پروژه خود را به جای <your-project-id> ، مکان محیط Composer که قبلاً ایجاد کرده‌اید را به جای <your-composer-location> و نام محیط Composer که قبلاً ایجاد کرده‌اید را به جای <your-composer-environment> جایگزین کنید.

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

برای مثال، اگر نام پروژه شما my-project ، مکان Composer شما us-central1 و نام محیط شما my-composer باشد، دستور شما به صورت زیر خواهد بود:

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py کارهای زیر را انجام می‌دهد:

  • با Google Cloud احراز هویت می‌شود
  • یک درخواست HTTP احراز هویت نشده به وب سرور Airflow ارسال می‌کند تا URI مربوط به تغییر مسیر را دریافت کند.
  • پارامتر کوئری client_id را از آن ریدایرکت استخراج می‌کند.
  • چاپ می‌کند تا بتوانید از آن استفاده کنید

شناسه کلاینت شما در خط فرمان چاپ می‌شود و چیزی شبیه به این خواهد بود:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

۴. تابع خود را ایجاد کنید

در Cloud Shell خود، با اجرای دستور زیر، مخزن را با کد نمونه لازم کلون کنید:

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

به دایرکتوری مورد نظر بروید و Cloud Shell خود را تا زمانی که مراحل بعدی را انجام می‌دهید، باز بگذارید.

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

با کلیک روی منوی ناوبری و سپس کلیک روی «توابع ابری» به صفحه توابع ابری گوگل بروید.

روی «ایجاد تابع» در بالای صفحه کلیک کنید

نام تابع خود را "my-function" بگذارید و حافظه را به طور پیش‌فرض، ۲۵۶ مگابایت، رها کنید.

تریگر را روی «فضای ابری» تنظیم کنید، نوع رویداد را «نهایی/ایجاد» بگذارید و به باکتی که در مرحله «ایجاد یک باکت فضای ابری» ایجاد کرده‌اید، بروید.

کد منبع را روی «ویرایشگر درون‌خطی» (Inline Editor) بگذارید و زمان اجرا را روی «Node.js 8» تنظیم کنید.

در Cloud Shell خود، دستور زیر را اجرا کنید. این دستور فایل‌های index.js و package.json را در ویرایشگر Cloud Shell باز می‌کند.

cloudshell edit index.js package.json

روی تب package.json کلیک کنید، آن کد را کپی کرده و در بخش package.json ویرایشگر درون‌خطی Cloud Functions قرار دهید.

«تابع اجرا» را روی triggerDag تنظیم کنید

روی تب index.js کلیک کنید، کد را کپی کنید و آن را در بخش index.js ویرایشگر درون‌خطی Cloud Functions قرار دهید.

PROJECT_ID به شناسه پروژه خود و CLIENT_ID را به شناسه کلاینتی که از مرحله «دریافت شناسه کلاینت» ذخیره کرده‌اید، تغییر دهید. البته هنوز روی «ایجاد» کلیک نکنید - هنوز چند مورد دیگر برای پر کردن وجود دارد!

در Cloud Shell خود، دستور زیر را اجرا کنید و <your-environment-name> را با نام محیط Composer خود و <your-composer-region> را با منطقه‌ای که محیط Composer شما در آن قرار دارد، جایگزین کنید.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

برای مثال، اگر محیط شما my-composer-environment نام دارد و در us-central1 واقع شده است، دستور شما به صورت زیر خواهد بود:

gcloud composer environments describe my-composer-environment --location us-central1

خروجی باید چیزی شبیه به این باشد:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

در آن خروجی، به دنبال متغیری به نام airflowUri بگردید. در کد index.js خود، WEBSERVER_ID را به شناسه وب‌سرور Airflow تغییر دهید - این بخشی از متغیر airflowUri است که در انتها یک '-tp' خواهد داشت، برای مثال، abc123efghi456k-tp

روی لینک کشویی «بیشتر» کلیک کنید، سپس منطقه‌ای را که از نظر جغرافیایی به شما نزدیک‌تر است انتخاب کنید.

تیک گزینه "تلاش مجدد در صورت عدم موفقیت" را بزنید

برای ایجاد تابع ابری خود، روی «ایجاد» کلیک کنید

قدم به قدم کد را بررسی کنید

کدی که از index.js کپی کرده‌اید چیزی شبیه به این خواهد بود:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

بیایید نگاهی به آنچه اتفاق می‌افتد بیندازیم. در اینجا سه ​​تابع وجود دارد: triggerDag ، authorizeIap و makeIapPostRequest

triggerDag تابعی است که وقتی چیزی را در مخزن ذخیره‌سازی ابری تعیین‌شده آپلود می‌کنیم، فعال می‌شود. این تابع جایی است که متغیرهای مهم مورد استفاده در سایر درخواست‌ها، مانند PROJECT_ID ، CLIENT_ID ، WEBSERVER_ID و DAG_NAME را پیکربندی می‌کنیم. این تابع authorizeIap و makeIapPostRequest را فراخوانی می‌کند.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap با استفاده از یک حساب کاربری سرویس و "مبادله" یک JWT برای یک توکن شناسه که برای احراز هویت makeIapPostRequest استفاده خواهد شد، درخواستی را به پروکسی که از وب‌سرور Airflow محافظت می‌کند، ارسال می‌کند.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest یک فراخوانی به وب‌سرور Airflow انجام می‌دهد تا composer_sample_trigger_response_dag. نام DAG در URL وب‌سرور Airflow که با پارامتر url ارسال شده است، تعبیه شده است و idToken توکنی است که ما در درخواست authorizeIap به دست آورده‌ایم.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

۵. DAG خود را تنظیم کنید

در Cloud Shell خود، به دایرکتوری حاوی گردش‌های کاری نمونه بروید. این دایرکتوری بخشی از python-docs-samples است که در مرحله‌ی «دریافت شناسه‌ی کلاینت» از گیت‌هاب دانلود کرده‌اید.

cd
cd python-docs-samples/composer/workflows

DAG را در Composer آپلود کنید

نمونه DAG را با دستور زیر در مخزن ذخیره‌سازی DAG محیط Composer خود آپلود کنید، که در آن <environment_name> نام محیط Composer شما و <location> نام منطقه‌ای است که در آن قرار دارد. trigger_response_dag.py DAG است که با آن کار خواهیم کرد.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

برای مثال، اگر محیط Composer شما my-composer نام داشت و در us-central1 قرار داشت، دستور شما به صورت زیر خواهد بود:

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

گام برداشتن در DAG

کد DAG در trigger_response.py به این شکل است:

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

بخش default_args شامل آرگومان‌های پیش‌فرض مورد نیاز مدل BaseOperator در Apache Airflow است. این بخش را با این پارامترها در هر DAG آپاچی ایرفلو مشاهده خواهید کرد. owner در حال حاضر روی Composer Example تنظیم شده است، اما در صورت تمایل می‌توانید آن را به نام خود تغییر دهید. depends_on_past به ما نشان می‌دهد که این DAG به هیچ DAG قبلی وابسته نیست. سه بخش ایمیل، email ، email_on_failure و email_on_retry طوری تنظیم شده‌اند که هیچ اعلان ایمیلی بر اساس وضعیت این DAG ارسال نشود. DAG فقط یک بار دوباره تلاش می‌کند، زیرا retries روی ۱ تنظیم شده است و این کار را پس از پنج دقیقه، به ازای هر retry_delay انجام می‌دهد. start_date معمولاً زمان اجرای یک DAG را همراه با schedule_interval آن (که بعداً تنظیم می‌شود) تعیین می‌کند، اما در مورد این DAG، اهمیتی ندارد. روی ۱ ژانویه ۲۰۱۷ تنظیم شده است، اما می‌تواند روی هر تاریخ گذشته‌ای تنظیم شود.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

بخش with airflow.DAG DAG مورد نظر را پیکربندی می‌کند. این DAG با شناسه وظیفه composer_sample_trigger_response_dag ، آرگومان‌های پیش‌فرض از بخش default_args و از همه مهم‌تر، با مقدار schedule_interval برابر با None اجرا خواهد شد. مقدار schedule_interval برابر با None تنظیم شده است زیرا ما این DAG خاص را با تابع Cloud خود فعال می‌کنیم. به همین دلیل است که start_date در default_args اهمیتی ندارد.

هنگام اجرا، DAG پیکربندی خود را، همانطور که در متغیر print_gcs_info دیکته شده است، چاپ می‌کند.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

۶. عملکرد خود را آزمایش کنید

محیط کامپوزر خود را باز کنید و در ردیفی که نام محیط شما قرار دارد، روی لینک Airflow کلیک کنید.

با کلیک روی نام composer_sample_trigger_response_dag آن را باز کنید. در حال حاضر هیچ مدرکی از اجرای DAG وجود نخواهد داشت، زیرا ما هنوز DAG را برای اجرا فعال نکرده‌ایم. اگر این DAG قابل مشاهده یا کلیک نیست، یک دقیقه صبر کنید و صفحه را رفرش کنید.

یک تب جداگانه باز کنید و هر فایلی را که قبلاً ایجاد کرده‌اید و به عنوان تریگر برای تابع ابری خود مشخص کرده‌اید، در آن آپلود کنید. می‌توانید این کار را از طریق کنسول یا با استفاده از دستور gsutil انجام دهید.

به برگه رابط کاربری Airflow خود برگردید و روی Graph View کلیک کنید.

روی وظیفه print_gcs_info کلیک کنید، که باید با رنگ سبز مشخص شده باشد.

روی «مشاهده گزارش» در بالا سمت راست منو کلیک کنید

در گزارش‌ها، اطلاعاتی درباره فایلی که در فضای ذخیره‌سازی ابری خود آپلود کرده‌اید، مشاهده خواهید کرد.

تبریک! شما همین الان یک DAG از Airflow را با استفاده از Node.js و توابع Google Cloud راه‌اندازی کردید!

۷. پاکسازی

برای جلوگیری از تحمیل هزینه به حساب GCP خود برای منابع استفاده شده در این راهنمای سریع:

  1. (اختیاری) برای ذخیره داده‌هایتان، داده‌ها را از مخزن ذخیره‌سازی ابری برای محیط Cloud Composer و مخزن ذخیره‌سازی که برای این شروع سریع ایجاد کرده‌اید، دانلود کنید .
  2. سطل ذخیره‌سازی ابری مربوط به محیط و آنچه ایجاد کرده‌اید را حذف کنید.
  3. محیط Cloud Composer را حذف کنید . توجه داشته باشید که حذف محیط، فضای ذخیره‌سازی آن محیط را حذف نمی‌کند.
  4. (اختیاری) با محاسبات بدون سرور، ۲ میلیون فراخوانی اول در هر ماه رایگان است و وقتی عملکرد خود را به صفر می‌رسانید، هزینه‌ای از شما دریافت نمی‌شود (برای جزئیات بیشتر به قیمت‌گذاری مراجعه کنید). با این حال، اگر می‌خواهید عملکرد ابری خود را حذف کنید، با کلیک روی «حذف» در سمت راست بالای صفحه نمای کلی عملکرد خود، این کار را انجام دهید.

4fe11e1b41b32ba2.png

همچنین می‌توانید به صورت اختیاری پروژه را حذف کنید:

  1. در کنسول GCP، به صفحه پروژه‌ها بروید.
  2. در لیست پروژه‌ها، پروژه‌ای را که می‌خواهید حذف کنید انتخاب کرده و روی حذف کلیک کنید.
  3. در کادر، شناسه پروژه را تایپ کنید و سپس برای حذف پروژه، روی خاموش کردن کلیک کنید.