تشغيل مخطط DAG باستخدام دوال Node.JS وGoogle Cloud

1. مقدمة

تم تصميم Apache Airflow لتشغيل مخططات البيانات المستندة إلى البيانات وفقًا لجدول زمني منتظم، ولكن يمكنك أيضًا تشغيل هذه الرسوم استجابةً للأحداث، مثل تغيير في حزمة Cloud Storage أو إرسال رسالة إلى Cloud Pub/Sub. لتحقيق ذلك، يمكن تشغيل الرسوم البيانية المستندة إلى Cloud Composer من خلال Cloud Functions.

يقوم المثال في هذا التمرين المعملي بتشغيل DAG بسيط في كل مرة يحدث فيها تغيير في حزمة Cloud Storage. يستخدم DAG هذا مُشغِّل BashOperator لتشغيل الأمر bash الذي يطبع معلومات التغيير حول ما تم تحميله إلى حزمة Cloud Storage.

قبل بدء هذا الدرس التطبيقي، ننصحك بإكمال الدرسَين التطبيقيَّين حول الترميز مقدمة عن Cloud Composer وبدء استخدام دوال Cloud. إذا أنشأت بيئة Composer في الدرس التطبيقي حول الترميز: "مقدمة عن Cloud Composer"، يمكنك استخدام تلك البيئة في هذا الدرس التطبيقي.

ما ستنشئه

في هذا الدرس التطبيقي حول الترميز، ستتمكّن من:

  1. حمِّل ملفًا إلى Google Cloud Storage، وسيؤدي ذلك إلى
  2. تشغيل دالة Google Cloud باستخدام وقت تشغيل Node.JS
  3. ستنفِّذ هذه الدالة مخطط DAG في Google Cloud Composer
  4. يؤدي هذا إلى تشغيل أمر bash بسيط يطبع التغيير في حزمة Google Cloud Storage.

1d3d3736624a923f.png

ما ستتعلمه

  • كيفية تشغيل Apache Airflow DAG باستخدام دوال Google Cloud + Node.js

المتطلبات

  • حساب Google Cloud Platform
  • فهم أساسيات JavaScript
  • معرفة أساسية بـ Cloud Composer/Airflow وCloud Functions
  • الراحة باستخدام أوامر واجهة سطر الأوامر

2. إعداد Google Cloud Platform

اختيار المشروع أو إنشاؤه

اختَر مشروع Google Cloud Platform أو أنشئه. إذا كنت تعمل على إنشاء مشروع جديد، يُرجى اتّباع الخطوات الواردة هنا.

دوِّن رقم تعريف المشروع الذي ستستخدمه في الخطوات اللاحقة.

إذا كنت بصدد إنشاء مشروع جديد، يمكنك العثور على رقم تعريف المشروع أسفل "اسم المشروع" مباشرةً في صفحة الإنشاء.

إذا سبق لك إنشاء مشروع، يمكنك العثور على رقم التعريف على صفحة وحدة التحكّم الرئيسية في بطاقة معلومات المشروع.

تفعيل واجهات برمجة التطبيقات

تفعيل Cloud Composer وGoogle Cloud Functions وCloud Identity وواجهة برمجة التطبيقات لإدارة الهوية وإمكانية الوصول (IAM) في Google.

إنشاء بيئة Composer

أنشئ بيئة Cloud Composer بالإعدادات التالية:

  • الاسم: my-composer-environment
  • الموقع الجغرافي: مهما كان الموقع الجغرافي الأقرب إليك
  • المنطقة: أي منطقة في تلك المنطقة

ويمكن أن تظل جميع الإعدادات الأخرى على حالتها التلقائية. انقر على "إنشاء" في أسفل الصفحة.دوِّن اسم بيئة Composer وموقعها الجغرافي، ستحتاج إليها في الخطوات المستقبلية.

إنشاء حزمة Cloud Storage

في مشروعك، أنشِئ حزمة Cloud Storage باستخدام الإعدادات التالية:

  • الاسم: <your-project-id>
  • فئة التخزين التلقائية: متعددة المناطق
  • الموقع الجغرافي: أيًا كان الموقع الجغرافي الأقرب جغرافيًا إلى منطقة Cloud Composer التي تستخدمها
  • نموذج التحكّم في الوصول: ضبط الأذونات على مستوى العنصر والأذونات على مستوى الحزمة

النقر على "إنشاء" عندما تكون مستعدًا، احرص على تدوين اسم حزمتك على Cloud Storage للاطّلاع على الخطوات اللاحقة.

3- إعداد دوال Google Cloud (GCF)

لإعداد GCF، سننفّذ الأوامر في Google Cloud Shell.

يمكن إدارة Google Cloud عن بُعد من الكمبيوتر المحمول باستخدام أداة سطر الأوامر gcloud، إلا أنّنا سنستخدم في هذا الدرس التطبيقي Google Cloud Shell، وهي بيئة سطر أوامر يتم تشغيلها في السحابة الإلكترونية.

هذا الجهاز الافتراضي المستند إلى نظام دبيان محمل بكل أدوات التطوير التي ستحتاج إليها. وتوفِّر هذه الآلة دليلاً رئيسيًا دائمًا بسعة 5 غيغابايت ويتمّ تشغيله على Google Cloud، ما يحسّن كثيرًا أداء الشبكة والمصادقة. وهذا يعني أنّ كل ما ستحتاجه في هذا الدرس التطبيقي حول الترميز هو متصفّح (نعم، يعمل على جهاز Chromebook).

لتفعيل Google Cloud Shell، من وحدة تحكّم المطوّرين، انقر على الزرّ في أعلى يسار الصفحة (من المفترَض أن تستغرق عملية الإعداد والاتصال بالبيئة بضع لحظات فقط):

منح أذونات توقيع الكائن الثنائي الكبير (blob) في حساب خدمة Cloud Functions

من أجل مصادقة GCF على Cloud IAP، وهو الخادم الوكيل الذي يحمي خادم الويب Airflow، يجب منح دور GCF لحساب خدمة Appsبوت الدور Service Account Token Creator. يمكنك إجراء ذلك من خلال تنفيذ الأمر التالي في Cloud Shell، مع استبدال اسم مشروعك بـ <your-project-id>.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

على سبيل المثال، إذا كان مشروعك يُسمى my-project، فسيكون الأمر كالتالي:

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

الحصول على معرِّف العميل

لإنشاء رمز مميّز من أجل المصادقة على عمليات الشراء داخل التطبيق في Cloud، تتطلّب الدالة معرّف العميل للخادم الوكيل الذي يحمي خادم ويب Airflow. لا توفّر Cloud Composer API هذه المعلومات مباشرةً. بدلاً من ذلك، يمكنك إرسال طلب لم تتم مصادقته إلى خادم ويب Airflow والحصول على معرِّف العميل من عنوان URL لإعادة التوجيه. سنقوم بذلك عن طريق تشغيل ملف بايثون باستخدام Cloud Shell للحصول على معرف العميل.

قم بتنزيل الرمز اللازم من GitHub من خلال تشغيل الأمر التالي في Cloud Shell

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

إذا تلقّيت رسالة خطأ لأنّ هذا الدليل متوفّر حاليًا، يمكنك تحديثه إلى أحدث إصدار من خلال تنفيذ الأمر التالي.

cd python-docs-samples/
git pull origin master

التغيير إلى الدليل المناسب عن طريق تشغيل

cd python-docs-samples/composer/rest

شغِّل رمز Python للحصول على معرِّف العميل، واستبدِل اسم مشروعك بـ <your-project-id>، وموقع بيئة Composer التي أنشأتها سابقًا لـ <your-composer-location>، واسم بيئة Composer التي أنشأتها سابقًا للتطبيق <your-composer-environment>

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

على سبيل المثال، إذا كان اسم مشروعك هو my-project وكان موقع Composer هو us-central1 واسم بيئتك هو my-composer، سيكون الأمر على النحو التالي:

python3 get_client_id.py my-project us-central1 my-composer

تنفِّذ ميزة "get_client_id.py" ما يلي:

  • تتم المصادقة باستخدام Google Cloud
  • إرسال طلب HTTP لم تتم مصادقته إلى خادم ويب Airflow للحصول على عنوان URI لإعادة التوجيه
  • يتم استخراج معلَمة طلب البحث client_id من عملية إعادة التوجيه هذه.
  • يطبعها لك لاستخدامها

وستتم طباعة مُعرّف العميل الخاص بك في سطر الأوامر وسيظهر على النحو التالي:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. إنشاء الدالة

في Cloud Shell، استنسِخ المستودع باستخدام نموذج الرمز اللازم عن طريق تنفيذ

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

انتقِل إلى الدليل اللازم واترك Cloud Shell مفتوحًا أثناء إكمال الخطوات القليلة التالية.

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

انتقِل إلى صفحة "وظائف Google Cloud" من خلال النقر على قائمة التنقّل ثم النقر على "دوال السحابة الإلكترونية".

انقر على "إنشاء وظيفة". في أعلى الصفحة

تسمية الدالة "my-function" واترك الذاكرة افتراضيًا، 256 ميغابايت.

اضبط المشغِّل على "Cloud Storage"، واترك نوع الحدث على "إنهاء/إنشاء"، وانتقِل إلى الحزمة التي أنشأتها في خطوة "إنشاء حزمة Cloud Storage".

اترك رمز المصدر معيّنًا على "المحرِّر المضمّن". واضبط بيئة التشغيل على "Node.js 8"

في Cloud Shell، شغِّل الأمر التالي. سيؤدي هذا إلى فتح index.js وpackage.json في محرّر Cloud Shell

cloudshell edit index.js package.json

انقر على علامة التبويب package.json وانسخ هذا الرمز البرمجي والصقه في قسم package.json في المحرِّر المضمَّن في دوال Cloud Functions

تعيين "الدالة المطلوب تنفيذها" لتشغيلDag

انقر على علامة التبويب index.js وانسخ الرمز، ثم الصقه في قسم index.js في المحرِّر المضمَّن في دوال Cloud.

غيِّر PROJECT_ID إلى رقم تعريف مشروعك، وCLIENT_ID إلى معرِّف العميل الذي حفظته من خطوة "الحصول على معرِّف العميل". لا تنقر على "إنشاء" على الرغم من ذلك - لا يزال هناك بعض الأشياء التي يجب ملؤها!

في Cloud Shell، شغِّل الأمر التالي، مع استبدال <your-environment-name> باسم بيئة Composer و<your-composer-region> مع المنطقة التي تقع فيها بيئة Composer.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

على سبيل المثال، إذا كانت بيئتك باسم my-composer-environment وتقع في us-central1، فسيكون الأمر كما يلي

gcloud composer environments describe my-composer-environment --location us-central1

من المفترض أن تبدو النتيجة على النحو التالي:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

في هذا الناتج، ابحث عن المتغيّر المسمى airflowUri. في رمز index.js، غيِّر WEBSERVER_ID ليصبح رقم تعريف خادم الويب Airflow، وهو جزء من متغيّر airflowUri الذي سيحتوي على " -tp". في النهاية، على سبيل المثال، abc123efghi456k-tp

انقر على "المزيد" ثم اختر "المنطقة الأقرب إليك جغرافيًا"

تحديد "إعادة المحاولة عند التعذُّر"

انقر على "إنشاء" لإنشاء دالة Cloud

لمحة عن الرمز

ستبدو التعليمة البرمجية التي نسختها من index.js على النحو التالي:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

لنلقي نظرة على ما يجري. تتوفّر ثلاث دوال هنا: triggerDag وauthorizeIap وmakeIapPostRequest.

triggerDag هي الدالة التي يتم تشغيلها عندما نحمّل ملفًا إلى حزمة Cloud Storage المحدّدة. ويمكنك من خلاله ضبط المتغيّرات المهمة المستخدَمة في الطلبات الأخرى، مثل PROJECT_ID وCLIENT_ID وWEBSERVER_ID وDAG_NAME. يتصل بـ authorizeIap وmakeIapPostRequest.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

يرسل authorizeIap طلبًا إلى الخادم الوكيل الذي يحمي خادم ويب Airflow، باستخدام حساب خدمة و"تبادل" رمز JWT للرمز المميّز للمعرّف الذي سيتم استخدامه لمصادقة makeIapPostRequest.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

يتصل makeIapPostRequest بخادم ويب Airflow لتفعيل composer_sample_trigger_response_dag.. يتم تضمين اسم DAG في عنوان URL لخادم ويب Airflow الذي يتم تمريره باستخدام المعلمة url، وidToken هو الرمز المميز الذي حصلنا عليه في طلب authorizeIap.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5- إعداد رسم بياني دائري

في Cloud Shell، انتقِل إلى الدليل باستخدام نموذج مهام سير العمل. وهو جزء من عينات مستندات python التي قمت بتنزيلها من GitHub في خطوة "الحصول على معرف العميل".

cd
cd python-docs-samples/composer/workflows

تحميل رسم بياني دائري إلى مؤلِّف

حمِّل نموذج DAG إلى حزمة تخزين DAG في بيئة Composer باستخدام الأمر التالي، حيث يشير <environment_name> إلى اسم بيئة Composer و<location> هو اسم المنطقة التي تقع فيها. trigger_response_dag.py هو DAG الذي سنعمل معه.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

على سبيل المثال، إذا كانت بيئة Composer تُسمّى my-composer وتتوفّر في us-central1، سيكون الأمر هو

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

تعديل الصور

يبدو رمز DAG في trigger_response.py على النحو التالي

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

يحتوي القسم default_args على الوسيطات التلقائية على النحو الذي يطلبه نموذج BaseOperator في Apache Airflow. سترى هذا القسم يتضمّن هذه المَعلمات في أيّ Apache Airflow DAG. تم ضبط owner حاليًا على Composer Example، ولكن يمكنك تغيير هذا الإعداد ليصبح اسمك إذا أردت ذلك. يوضّح لنا depends_on_past أنّ مقياس DAG هذا لا يعتمد على أي مخططات DAG سابقة. تم ضبط أقسام البريد الإلكتروني الثلاثة، email وemail_on_failure وemail_on_retry، بحيث لا يتم تلقّي أي إشعارات عبر البريد الإلكتروني استنادًا إلى حالة DAG هذه. ستحاول أداة DAG إعادة المحاولة مرة واحدة فقط، حيث تم ضبط retries على 1، وسيتم إجراء ذلك بعد خمس دقائق، لكل retry_delay. ويحدّد start_date عادةً الوقت الذي يجب تنفيذ فيه مخطط DAG، بالتزامن مع schedule_interval (الذي تم تحديده لاحقًا)، ولكن في حالة مخطط DAG هذا، لن يكون مناسبًا. وقد تم ضبطها على 1 كانون الثاني (يناير) 2017، ولكن يمكن ضبطها على أي تاريخ في الماضي.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

ويهيئ القسم with airflow.DAG مخطط DAG الذي سيتم تشغيله. سيتم تنفيذها باستخدام معرّف المهمة composer_sample_trigger_response_dag، والوسيطات التلقائية من القسم default_args، والأهم من ذلك، باستخدام schedule_interval من None. تم ضبط schedule_interval على None لأنّنا نستخدم دالة DAG المحدّدة باستخدام دالة السحابة الإلكترونية. هذا هو سبب عدم ملاءمة start_date في default_args.

عند التنفيذ، تطبع DAG الإعدادات، كما هو موضح في المتغير print_gcs_info.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6- اختبار الدالة

افتح بيئة Composer، وفي الصف الذي يتضمّن اسم البيئة، انقر على رابط Airflow.

يمكنك فتح "composer_sample_trigger_response_dag" من خلال النقر على اسمه. لن يتوفّر في الوقت الحالي أي دليل على تنفيذ DAG، لأنّنا لم نشغّله بعد.إذا لم تكن DAG هذه مرئية أو قابلة للنقر، انتظِر لدقيقة وأعِد تحميل الصفحة.

افتح علامة تبويب منفصلة وحمِّل أي ملف إلى حزمة Cloud Storage التي أنشأتها سابقًا وحدَّدتها كمشغِّل لدالة Cloud. يمكنك إجراء ذلك من خلال وحدة التحكّم أو باستخدام أمر gsutil.

انتقل مرة أخرى إلى علامة التبويب التي تحتوي على واجهة مستخدم Airflow وانقر على "عرض الرسم البياني"

انقر على مهمة print_gcs_info التي يجب تحديدها باللون الأخضر

انقر على "عرض السجلّ". في أعلى يسار القائمة

في السجلّات، ستظهر لك معلومات عن الملف الذي حمّلته إلى حزمة Cloud Storage.

تهانينا! لقد شغّلت للتو مخطط DAG لتدفق الهواء باستخدام Node.js ودوال Google Cloud.

7. تنظيف

لتجنُّب تحمُّل الرسوم إلى حسابك على Google Cloud Platform مقابل الموارد المستخدَمة في عملية البدء السريع هذه:

  1. (اختياري) لحفظ بياناتك، يمكنك تنزيل البيانات من حزمة Cloud Storage لبيئة Cloud Composer وحزمة التخزين التي أنشأتها لعملية البدء السريع هذه.
  2. حذف حزمة Cloud Storage للبيئة التي أنشأتها
  3. احذف بيئة Cloud Composer. يُرجى العِلم أنّ حذف بيئتك لا يؤدي إلى حذف حزمة مساحة التخزين للبيئة.
  4. (اختياري) من خلال الحوسبة بدون خادم، تكون أول مليونَي استدعاء في الشهر مجانية، وعند توسيع نطاق الدالة إلى الصفر، لن يتم تحصيل رسوم منك (راجع الأسعار للحصول على مزيد من التفاصيل). مع ذلك، إذا أردت حذف دالّة السحابة الإلكترونية، يمكنك إجراء ذلك من خلال النقر على "حذف". في أعلى يسار صفحة النظرة العامة لدالتك

4fe11e1b41b32ba2.png

يمكنك أيضًا حذف المشروع اختياريًا:

  1. في "وحدة تحكّم Google Cloud Platform"، انتقِل إلى صفحة المشاريع.
  2. في قائمة المشاريع، اختَر المشروع الذي تريد حذفه وانقر على حذف.
  3. في المربع، اكتب رقم تعريف المشروع، ثم انقر على إيقاف التشغيل لحذف المشروع.