Node.JS और Google Cloud Functions के साथ DAG को ट्रिगर करना

1. परिचय

Apache Airflow को नियमित शेड्यूल पर DAG चलाने के लिए डिज़ाइन किया गया है, लेकिन आप इवेंट के जवाब में DAG को ट्रिगर भी कर सकते हैं, जैसे कि Cloud Storage बकेट में बदलाव होना या Cloud Pub/Sub में पुश किया गया कोई मैसेज. ऐसा करने के लिए, Cloud Composer DAG को Cloud Functions से ट्रिगर किया जा सकता है.

इस लैब के उदाहरण में, Cloud Storage बकेट में हर बार बदलाव होने पर, एक सामान्य डीएजी चलता है. यह DAG, Cloud Storage बकेट में अपलोड किए गए कॉन्टेंट के बारे में बदलाव की जानकारी प्रिंट करने के लिए, बैश कमांड चलाने के लिए BashOperator का इस्तेमाल करता है.

हमारा सुझाव है कि इस लैब को शुरू करने से पहले, Cloud Composer के बारे में जानकारी और Cloud Functions का इस्तेमाल शुरू करना कोडलैब को पूरा कर लें. अगर आपने Cloud Composer कोडलैब के इंट्रो कार्ड में कंपोज़र एनवायरमेंट बनाया है, तो इस लैब में उस एनवायरमेंट का इस्तेमाल किया जा सकता है.

आपको क्या बनाना होगा

इस कोडलैब में, ये काम किए जा सकते हैं:

  1. Google Cloud Storage पर ऐसी फ़ाइल अपलोड करें जिससे
  2. Node.JS रनटाइम का इस्तेमाल करके, Google Cloud Function को ट्रिगर करें
  3. यह फ़ंक्शन, Google Cloud Composer में DAG को एक्ज़ीक्यूट करेगा
  4. यह Google Cloud Storage बकेट में बदलाव को प्रिंट करने के लिए एक आसान बैश कमांड चलाता है

1d3d3736624a923f.png

आपको क्या सीखने को मिलेगा

  • Google Cloud Functions + Node.js का इस्तेमाल करके, Apache Airflow DAG को ट्रिगर करने का तरीका

आपको इन चीज़ों की ज़रूरत होगी

  • GCP खाता
  • JavaScript की बुनियादी समझ
  • Cloud Composer/Airflow और Cloud Functions के बारे में बुनियादी जानकारी
  • सीएलआई निर्देशों का इस्तेमाल करके आसानी से काम करें

2. GCP सेट अप किया जा रहा है

प्रोजेक्ट चुनें या बनाएं

Google Cloud Platform प्रोजेक्ट चुनें या बनाएं. नया प्रोजेक्ट बनाने के लिए, यहां दिया गया तरीका अपनाएं.

अपना प्रोजेक्ट आईडी नोट कर लें, जिसे आपको बाद के चरणों में इस्तेमाल करना होगा.

अगर नया प्रोजेक्ट बनाया जा रहा है, तो प्रोजेक्ट आईडी, बनाने वाले पेज पर प्रोजेक्ट के नाम के ठीक नीचे दिखेगा

अगर आपने पहले ही कोई प्रोजेक्ट बना लिया है, तो आपको प्रोजेक्ट की जानकारी वाले कार्ड में, कंसोल के होम पेज पर आईडी मिल सकता है

एपीआई चालू करें

Cloud Composer, Google Cloud Functions, और Cloud Identity, और Google Identity and Access Management (IAM) API को चालू करें.

कंपोज़र एनवायरमेंट बनाएं

नीचे दिए गए कॉन्फ़िगरेशन की मदद से Cloud Composer एनवायरमेंट बनाएं:

  • नाम: my-कंपोज़र-एनवायरमेंट
  • जगह: भौगोलिक तौर पर जो भी जगह आपके आस-पास हो
  • ज़ोन: उस क्षेत्र का कोई भी ज़ोन

अन्य सभी कॉन्फ़िगरेशन अपनी डिफ़ॉल्ट सेटिंग पर बने रह सकते हैं. "बनाएं" पर क्लिक करें नीचे.अपने कंपोज़र एनवायरमेंट का नाम और जगह नोट करें - आपको आने वाले चरणों में इनकी ज़रूरत पड़ेगी.

Cloud Storage बकेट बनाना

इस कॉन्फ़िगरेशन के साथ अपने प्रोजेक्ट में, Cloud Storage बकेट बनाएं:

  • नाम: <your-project-id>
  • डिफ़ॉल्ट स्टोरेज क्लास: एक से ज़्यादा क्षेत्र
  • जगह: आप जिस जगह का इस्तेमाल कर रहे हैं वह Cloud Composer के इलाके के सबसे नज़दीक है
  • ऐक्सेस कंट्रोल मॉडल: ऑब्जेक्ट-लेवल और बकेट-लेवल की अनुमतियां सेट करें

"बनाएं" दबाएं जब आप तैयार हों, तो बाद के चरणों के लिए अपनी Cloud Storage बकेट का नाम नोट कर लें.

3. Google Cloud Functions (GCF) सेट अप करना

जीसीएफ़ सेट अप करने के लिए, हम Google Cloud Shell में कमांड चलाएंगे.

gcloud कमांड लाइन टूल का इस्तेमाल करके, Google Cloud को आपके लैपटॉप से कहीं से भी ऑपरेट किया जा सकता है. हालांकि, इस कोडलैब में हम Google Cloud Shell का इस्तेमाल करेंगे. यह एक कमांड लाइन एनवायरमेंट है जो Cloud में चलता है.

Debian आधारित इस वर्चुअल मशीन में ऐसे सभी डेवलपमेंट टूल मौजूद हैं जिनकी आपको ज़रूरत पड़ेगी. यह पांच जीबी की स्थायी होम डायरेक्ट्री उपलब्ध कराता है और Google Cloud पर चलता है. यह नेटवर्क की परफ़ॉर्मेंस और पुष्टि करने की प्रक्रिया को बेहतर बनाता है. इसका मतलब है कि इस कोडलैब के लिए आपको सिर्फ़ एक ब्राउज़र की ज़रूरत होगी. हां, यह Chromebook पर काम करता है.

Google Cloud Shell को चालू करने के लिए, डेवलपर कंसोल में सबसे ऊपर दाईं ओर मौजूद बटन पर क्लिक करें. प्रावधान करने और एनवायरमेंट से कनेक्ट होने में कुछ ही सेकंड लगेंगे:

Cloud Functions सेवा खाते को BLOB में साइन करने की अनुमतियां दें

GCF, Airflow वेबसर्वर की सुरक्षा करने वाली प्रॉक्सी, Cloud IAP की पुष्टि करे, इसके लिए आपको Appsपॉट सेवा खाते के जीसीएफ़ को Service Account Token Creator की भूमिका देनी होगी. ऐसा करने के लिए, अपने Cloud Shell में नीचे दिया गया कमांड चलाएं और <your-project-id> की जगह अपने प्रोजेक्ट का नाम रखें.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

उदाहरण के लिए, अगर आपके प्रोजेक्ट का नाम my-project है, तो आपका निर्देश

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Client-ID पाना

Cloud IAP की पुष्टि करने वाला टोकन बनाने के लिए, फ़ंक्शन को उस प्रॉक्सी के क्लाइंट आईडी की ज़रूरत होती है जो Airflow वेबसर्वर की सुरक्षा करता है. Cloud Composer API यह जानकारी सीधे तौर पर नहीं देता. इसके बजाय, Airflow वेबसर्वर पर बिना पुष्टि वाला अनुरोध करें और रीडायरेक्ट यूआरएल से क्लाइंट आईडी कैप्चर करें. ऐसा करने के लिए, Cloud Shell का इस्तेमाल करके Python फ़ाइल चलाएं, ताकि क्लाइंट आईडी कैप्चर किया जा सके.

अपने Cloud Shell में यह कमांड चलाकर, GitHub से ज़रूरी कोड डाउनलोड करें

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

अगर इस डायरेक्ट्री पहले से मौजूद होने की वजह से, आपको गड़बड़ी का कोई मैसेज मिलता है, तो यहां दिया गया निर्देश अपनाकर, इसे नए वर्शन में अपडेट करें

cd python-docs-samples/
git pull origin master

इसे चलाकर, सही डायरेक्ट्री में बदलें

cd python-docs-samples/composer/rest

Python कोड चलाकर, <your-project-id> के लिए अपने प्रोजेक्ट का नाम, <your-composer-location> के लिए पहले बनाए गए कंपोज़र एनवायरमेंट की जगह, और <your-composer-environment> के लिए पहले बनाए गए कंपोज़र एनवायरमेंट की जगह का नाम बदलना

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

उदाहरण के लिए, अगर आपके प्रोजेक्ट का नाम my-project है, आपके कंपोज़र की जगह us-central1 है, और आपके एनवायरमेंट का नाम my-composer है, तो आपका निर्देश

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py ये काम करता है:

  • Google Cloud की मदद से पुष्टि करता है
  • रीडायरेक्ट यूआरआई पाने के लिए, Airflow वेबसर्वर से बिना पुष्टि वाला एक एचटीटीपी अनुरोध बनाता है
  • इस रीडायरेक्ट से client_id क्वेरी पैरामीटर एक्सट्रैक्ट करता है
  • आपके इस्तेमाल के लिए इसे प्रिंट करता है

आपका Client-ID कमांड लाइन पर प्रिंट हो जाएगा और कुछ ऐसा दिखेगा:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. अपना फ़ंक्शन बनाएं

अपने Cloud Shell में, ज़रूरी सैंपल कोड की मदद से रेपो का क्लोन बनाएं. इसके लिए, आपको

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

अगले कुछ चरण पूरे करने तक, ज़रूरी डायरेक्ट्री में बदलाव करें और अपना Cloud Shell खुला रखें

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

नेविगेशन मेन्यू पर क्लिक करके और फिर "Cloud Functions" पर क्लिक करके, Google Cloud Functions पेज पर जाएं

"फ़ंक्शन बनाएं" पर क्लिक करें पेज के सबसे ऊपर

अपने फ़ंक्शन को "मेरा फ़ंक्शन" नाम दें और मेमोरी को डिफ़ॉल्ट, 256 एमबी पर छोड़ दें.

ट्रिगर को "Cloud Storage" पर सेट करें, इवेंट टाइप को "फ़ाइनल बनाएं/बनाएं" के तौर पर रहने दें, और 'Cloud Storage बकेट बनाएं' चरण में बनाई गई बकेट को ब्राउज़ करें..

स्रोत कोड को "इनलाइन संपादक" पर सेट रहने दें और रनटाइम को "Node.js 8" पर सेट करें

अपने Cloud Shell में, नीचे दिया गया कमांड चलाएं. इससे Cloud Shell Editor में index.js और Package.json की फ़ाइल खुल जाएगी

cloudshell edit index.js package.json

Package.json टैब पर क्लिक करके, उस कोड को कॉपी करें और इसे Cloud Functions इनलाइन एडिटर के Package.json सेक्शन में चिपकाएं

"फ़ंक्शन को लागू करने के लिए" सेट करना ट्रिगर-डैग के लिए

index.js टैब पर क्लिक करें, कोड को कॉपी करें, और उसे Cloud Functions इनलाइन एडिटर के index.js सेक्शन में चिपकाएं

PROJECT_ID को अपने प्रोजेक्ट आईडी में बदलें. CLIENT_ID को क्लाइंट आईडी पाने के चरण से सेव किए गए क्लाइंट आईडी में बदलें. "बनाएं" पर क्लिक न करें हालांकि, कुछ और चीज़ें भरने की ज़रूरत है!

अपने Cloud Shell में, <your-environment-name> को बदलकर, नीचे दिया गया कमांड चलाएं इसमें आपको Composer एनवायरमेंट और <your-composer-region> का नाम शामिल किया जा सकता है जिसमें आपका Composer Environment मौजूद है.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

उदाहरण के लिए, अगर आपके एनवायरमेंट का नाम my-composer-environment है और यह us-central1 में मौजूद है, तो आपका निर्देश

gcloud composer environments describe my-composer-environment --location us-central1

आउटपुट कुछ ऐसा दिखना चाहिए:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

उस आउटपुट में, airflowUri नाम वाला वैरिएबल खोजें. अपने index.js कोड में, WEBSERVER_ID को Airflow वेबसर्वर आईडी में बदलें - यह airflowUri वैरिएबल का वह हिस्सा है जिसमें '-tp' होगा आखिर में, उदाहरण के लिए, abc123efghi456k-tp

"ज़्यादा" पर क्लिक करें ड्रॉपडाउन लिंक चुनें. इसके बाद, उस इलाके को चुनें जो आपके सबसे नज़दीक है

"फ़ेल होने पर फिर से कोशिश करें" चुनें

"बनाएं" पर क्लिक करें अपना Cloud फ़ंक्शन बनाने के लिए

कोड के बारे में जानना

आपने index.js से जो कोड कॉपी किया है वह कुछ ऐसा दिखेगा:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

आइए, देखते हैं कि क्या समस्या आ रही है. यहां तीन फ़ंक्शन हैं: triggerDag, authorizeIap, और makeIapPostRequest

triggerDag फ़ंक्शन तब ट्रिगर होता है, जब हम तय किए गए Cloud Storage बकेट में कुछ अपलोड करते हैं. यहां पर हम दूसरे अनुरोधों में इस्तेमाल किए जाने वाले अहम वैरिएबल, जैसे कि PROJECT_ID, CLIENT_ID, WEBSERVER_ID, और DAG_NAME को कॉन्फ़िगर करते हैं. यह authorizeIap और makeIapPostRequest को कॉल करता है.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap, सेवा खाते का इस्तेमाल करके और "एक्सचेंज" करके, Airflow वेबसर्वर की सुरक्षा करने वाली प्रॉक्सी को अनुरोध भेजता है आईडी टोकन के लिए JWT, जिसका इस्तेमाल makeIapPostRequest की पुष्टि करने के लिए किया जाएगा.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest, composer_sample_trigger_response_dag. को ट्रिगर करने के लिए Airflow वेबसर्वर को कॉल करता है. DAG नाम, url पैरामीटर के साथ पास किए गए Airflow वेबसर्वर यूआरएल में एम्बेड किया गया है और idToken वह टोकन है जो हमें authorizeIap अनुरोध में मिला है.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. अपना DAG सेट अप करें

अपने Cloud Shell में, सैंपल वर्कफ़्लो की मदद से, डायरेक्ट्री में बदलाव करें. यह क्लाइंट आईडी पाने के चरण में, GitHub से डाउनलोड किए गए Python-docs के सैंपल का हिस्सा है.

cd
cd python-docs-samples/composer/workflows

DAG को Composer पर अपलोड करना

नीचे दिए गए निर्देश का इस्तेमाल करके, अपने कंपोज़र एनवायरमेंट के DAG स्टोरेज बकेट में सैंपल DAG अपलोड करें. यहां <environment_name> आपके कंपोज़र एनवायरमेंट का नाम और <location> उस इलाके का नाम होगा जहां वह मौजूद है. trigger_response_dag.py वह DAG है जिसके साथ हम काम करेंगे.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

उदाहरण के लिए, अगर आपके Composer एनवायरमेंट का नाम my-composer है और यह us-central1 में मौजूद है, तो आपका निर्देश

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

डीएजी के ज़रिए आगे बढ़ना

trigger_response.py में डीएजी कोड ऐसा दिखता है

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

default_args सेक्शन में, Apache Airflow के BaseOperator मॉडल के मुताबिक डिफ़ॉल्ट आर्ग्युमेंट शामिल किए गए हैं. आपको किसी भी Apache Airflow DAG में, इन पैरामीटर के साथ यह सेक्शन दिखेगा. फ़िलहाल, owner को Composer Example पर सेट किया गया है. हालांकि, आपके पास इसे अपना नाम रखने का विकल्प है. depends_on_past से पता चलता है कि यह डीएजी किसी पिछले डीएजी पर निर्भर नहीं है. ईमेल के तीन सेक्शन email, email_on_failure, और email_on_retry को इस तरह सेट किया गया है कि इस DAG की स्थिति के आधार पर कोई भी ईमेल सूचना न मिले. हर retry_delay के लिए, डीएजी सिर्फ़ एक बार कोशिश करेगा, क्योंकि retries को 1 पर सेट किया गया है और पांच मिनट बाद ऐसा किया जाएगा. आम तौर पर, start_date बताता है कि डीएजी को schedule_interval (बाद में सेट किया गया) के साथ कब चलाना चाहिए. हालांकि, इस डीएजी के मामले में यह काम का नहीं है. यह 1 जनवरी, 2017 के लिए सेट है, लेकिन इसे किसी पिछली तारीख पर भी सेट किया जा सकता है.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG सेक्शन, चलाए जाने वाले DAG को कॉन्फ़िगर करता है. इसे composer_sample_trigger_response_dag टास्क आईडी के साथ चलाया जाएगा. यह default_args सेक्शन के डिफ़ॉल्ट आर्ग्युमेंट हैं. इसे None के schedule_interval के साथ चलाया जाएगा. schedule_interval को None पर सेट किया गया है, क्योंकि हम इस डीएजी को अपने क्लाउड फ़ंक्शन के साथ ट्रिगर कर रहे हैं. इसलिए, default_args में मौजूद start_date काम का नहीं है.

लागू होने पर, डीएजी print_gcs_info वैरिएबल के हिसाब से अपने कॉन्फ़िगरेशन को प्रिंट करता है.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. अपने फ़ंक्शन की जांच करें

अपना Composer Environment खोलें और अपने एनवायरमेंट के नाम वाली लाइन में, Airflow लिंक पर क्लिक करें

composer_sample_trigger_response_dag के नाम पर क्लिक करके उसे खोलें. अभी DAG के चलने का कोई सबूत नहीं है, क्योंकि हमने अभी तक DAG को चलाने के लिए ट्रिगर नहीं किया है.अगर यह DAG दिखाई नहीं दे रहा है या इस पर क्लिक नहीं किया जा सकता है, तो एक मिनट इंतज़ार करें और पेज को रीफ़्रेश करें.

एक अलग टैब खोलें और किसी भी फ़ाइल को Cloud Storage बकेट में अपलोड करें, जिसे आपने पहले बनाया हो और जिसे आपके Cloud फ़ंक्शन के लिए ट्रिगर के तौर पर बताया गया हो. ऐसा करने के लिए, कंसोल का इस्तेमाल किया जा सकता है या gsutil कमांड का इस्तेमाल किया जा सकता है.

अपने Airflow यूज़र इंटरफ़ेस (यूआई) के साथ टैब पर वापस जाएं और ग्राफ़ व्यू पर क्लिक करें

print_gcs_info टास्क पर क्लिक करें, जिसे हरे रंग में आउटलाइन किया जाना चाहिए

"लॉग देखें" पर क्लिक करें मेन्यू में सबसे ऊपर दाईं ओर

लॉग में, आपको उस फ़ाइल के बारे में जानकारी दिखेगी जिसे आपने अपने Cloud Storage बकेट में अपलोड किया है.

बधाई हो! आपने अभी-अभी Node.js और Google Cloud Functions का इस्तेमाल करके Airflow DAG को ट्रिगर किया है!

7. साफ़-सफ़ाई सेवा

इस क्विकस्टार्ट में इस्तेमाल किए गए संसाधनों के लिए, आपके GCP खाते पर शुल्क न लगे:

  1. (ज़रूरी नहीं) अपना डेटा सेव करने के लिए, Cloud Composer एनवायरमेंट के लिए Cloud Storage बकेट और इस क्विकस्टार्ट के लिए बनाई गई स्टोरेज बकेट से डेटा डाउनलोड करें.
  2. एनवायरमेंट और अपने बनाए गए एनवायरमेंट के लिए, Cloud Storage बकेट को मिटाना
  3. Cloud Composer एनवायरमेंट को मिटाएं. ध्यान दें कि अपने एनवायरमेंट को मिटाने से, एनवायरमेंट का स्टोरेज बकेट नहीं मिटता.
  4. (ज़रूरी नहीं) सर्वरलेस कंप्यूटिंग की मदद से, हर महीने पहले 20 लाख बार शुरू करने के लिए कोई शुल्क नहीं लिया जाता. साथ ही, फ़ंक्शन को शून्य पर स्केल करने पर, आपसे कोई शुल्क नहीं लिया जाता है (ज़्यादा जानकारी के लिए कीमत देखें). हालांकि, अगर आपको अपना Cloud Function मिटाना है, तो "मिटाएं" पर क्लिक करके ऐसा करें यह पेज के सबसे ऊपर दाईं ओर मौजूद होता है.

4fe11e1b41b32ba2.png

आपके पास प्रोजेक्ट को मिटाने का विकल्प भी होता है:

  1. GCP कंसोल में, प्रोजेक्ट पेज पर जाएं.
  2. प्रोजेक्ट की सूची में, वह प्रोजेक्ट चुनें जिसे आपको मिटाना है. इसके बाद, मिटाएं पर क्लिक करें.
  3. बॉक्स में, प्रोजेक्ट आईडी लिखें और फिर प्रोजेक्ट मिटाने के लिए शट डाउन करें पर क्लिक करें.