Node.JS এবং Google ক্লাউড ফাংশন সহ একটি DAG ট্রিগার করা

1. ভূমিকা

Apache Airflow একটি নিয়মিত সময়সূচীতে DAGs চালানোর জন্য ডিজাইন করা হয়েছে, তবে আপনি ইভেন্টগুলির প্রতিক্রিয়া হিসাবে DAGগুলিকে ট্রিগার করতে পারেন, যেমন ক্লাউড স্টোরেজ বাকেটের পরিবর্তন বা ক্লাউড পাব/সাব-এ ঠেলে দেওয়া কোনও বার্তা। এটি সম্পন্ন করার জন্য, ক্লাউড কম্পোজার ডিএজি ক্লাউড ফাংশন দ্বারা ট্রিগার করা যেতে পারে।

ক্লাউড স্টোরেজ বালতিতে যখনই কোনো পরিবর্তন ঘটে তখন এই ল্যাবের উদাহরণটি একটি সাধারণ DAG চালায়। এই DAG ক্লাউড স্টোরেজ বালতিতে কি আপলোড করা হয়েছে সে সম্পর্কে পরিবর্তনের তথ্য মুদ্রণ করার জন্য একটি bash কমান্ড চালানোর জন্য BashOperator ব্যবহার করে।

এই ল্যাবটি শুরু করার আগে, ক্লাউড কম্পোজারের ইন্ট্রো এবং ক্লাউড ফাংশন কোডল্যাবগুলির সাথে শুরু করার সুপারিশ করা হয়৷ আপনি যদি ক্লাউড কম্পোজার কোডল্যাবের ভূমিকায় একটি কম্পোজার এনভায়রনমেন্ট তৈরি করেন, আপনি এই ল্যাবে সেই পরিবেশটি ব্যবহার করতে পারেন।

আপনি কি নির্মাণ করব

এই কোডল্যাবে, আপনি করবেন:

  1. গুগল ক্লাউড স্টোরেজে একটি ফাইল আপলোড করুন, যা হবে
  2. Node.JS রানটাইম ব্যবহার করে একটি Google ক্লাউড ফাংশন ট্রিগার করুন
  3. এই ফাংশনটি Google ক্লাউড কম্পোজারে একটি DAG কার্যকর করবে৷
  4. এটি Google ক্লাউড স্টোরেজ বালতিতে পরিবর্তন মুদ্রণ করার জন্য একটি সাধারণ ব্যাশ কমান্ড চালায়

1d3d3736624a923f.png

আপনি কি শিখবেন

  • গুগল ক্লাউড ফাংশন + Node.js ব্যবহার করে কীভাবে Apache Airflow DAG ট্রিগার করবেন

আপনি কি প্রয়োজন হবে

  • GCP অ্যাকাউন্ট
  • জাভাস্ক্রিপ্টের প্রাথমিক ধারণা
  • ক্লাউড কম্পোজার/এয়ারফ্লো এবং ক্লাউড ফাংশনগুলির প্রাথমিক জ্ঞান
  • CLI কমান্ড ব্যবহার করে আরাম

2. GCP সেট আপ করা

প্রকল্পটি নির্বাচন করুন বা তৈরি করুন

একটি Google ক্লাউড প্ল্যাটফর্ম প্রকল্প নির্বাচন করুন বা তৈরি করুন৷ আপনি যদি একটি নতুন প্রকল্প তৈরি করছেন, এখানে পাওয়া পদক্ষেপগুলি অনুসরণ করুন৷

আপনার প্রকল্প আইডি নোট করুন, যা আপনি পরবর্তী ধাপে ব্যবহার করবেন।

আপনি যদি একটি নতুন প্রকল্প তৈরি করেন, তাহলে প্রজেক্ট আইডিটি নির্মাণ পৃষ্ঠায় প্রকল্পের নামের ঠিক নিচে পাওয়া যাবে

আপনি যদি ইতিমধ্যে একটি প্রকল্প তৈরি করে থাকেন, তাহলে আপনি প্রজেক্ট ইনফো কার্ডে কনসোল হোমপেজে আইডিটি খুঁজে পেতে পারেন

APIs সক্রিয় করুন

ক্লাউড কম্পোজার, Google ক্লাউড ফাংশন এবং ক্লাউড আইডেন্টিটি এবং Google আইডেন্টিটি অ্যান্ড অ্যাক্সেস ম্যানেজমেন্ট (IAM) API সক্ষম করুন৷

কম্পোজার এনভায়রনমেন্ট তৈরি করুন

নিম্নলিখিত কনফিগারেশন সহ একটি ক্লাউড কম্পোজার পরিবেশ তৈরি করুন :

  • নাম: আমার-সুরকার-পরিবেশ
  • অবস্থান: ভৌগোলিকভাবে আপনার সবচেয়ে কাছাকাছি যে অবস্থানই হোক না কেন
  • অঞ্চল: সেই অঞ্চলের যে কোনও অঞ্চল

অন্যান্য সমস্ত কনফিগারেশন তাদের ডিফল্টে থাকতে পারে। নীচে "তৈরি করুন" এ ক্লিক করুন৷ আপনার কম্পোজার এনভায়রনমেন্টের নাম এবং অবস্থানের একটি নোট করুন - ভবিষ্যতের ধাপে আপনার সেগুলি প্রয়োজন হবে৷

ক্লাউড স্টোরেজ বাকেট তৈরি করুন

আপনার প্রকল্পে, নিম্নলিখিত কনফিগারেশন সহ একটি ক্লাউড স্টোরেজ বালতি তৈরি করুন :

  • নাম: <your-project-id>
  • ডিফল্ট স্টোরেজ ক্লাস: বহু-আঞ্চলিক
  • অবস্থান: আপনি যে ক্লাউড কম্পোজার অঞ্চলটি ব্যবহার করছেন তার ভৌগলিকভাবে সবচেয়ে কাছাকাছি যে অবস্থানই হোক না কেন
  • অ্যাক্সেস কন্ট্রোল মডেল: অবজেক্ট-লেভেল এবং বালতি-স্তরের অনুমতি সেট করুন

আপনি প্রস্তুত হলে "তৈরি করুন" টিপুন, পরবর্তী পদক্ষেপের জন্য আপনি আপনার ক্লাউড স্টোরেজ বাকেটের নাম নোট করেছেন তা নিশ্চিত করুন৷

3. Google ক্লাউড ফাংশন সেট আপ করা (GCF)

GCF সেট আপ করতে, আমরা Google Cloud Shell-এ কমান্ড চালাব।

যদিও Google ক্লাউড আপনার ল্যাপটপ থেকে gcloud কমান্ড লাইন টুল ব্যবহার করে দূরবর্তীভাবে পরিচালিত হতে পারে, এই কোডল্যাবে আমরা Google Cloud Shell ব্যবহার করব, ক্লাউডে চলমান একটি কমান্ড লাইন পরিবেশ।

এই ডেবিয়ান-ভিত্তিক ভার্চুয়াল মেশিনটি আপনার প্রয়োজনীয় সমস্ত বিকাশের সরঞ্জামগুলির সাথে লোড করা হয়েছে। এটি একটি ক্রমাগত 5GB হোম ডিরেক্টরি অফার করে এবং Google ক্লাউডে চলে, যা নেটওয়ার্ক কর্মক্ষমতা এবং প্রমাণীকরণকে ব্যাপকভাবে উন্নত করে। এর মানে হল যে এই কোডল্যাবের জন্য আপনার যা দরকার তা হল একটি ব্রাউজার (হ্যাঁ, এটি একটি Chromebook এ কাজ করে)।

Google ক্লাউড শেল সক্রিয় করতে, বিকাশকারী কনসোল থেকে উপরের ডানদিকের বোতামে ক্লিক করুন (এটি পরিবেশের সাথে সংযুক্ত হতে এবং সরবরাহ করতে মাত্র কয়েক মুহূর্ত লাগবে):

ক্লাউড ফাংশন পরিষেবা অ্যাকাউন্টে ব্লব স্বাক্ষর করার অনুমতি দিন

ক্লাউড আইএপি -তে GCF প্রমাণীকরণ করার জন্য, প্রক্সি যা এয়ারফ্লো ওয়েব সার্ভারকে রক্ষা করে, আপনাকে Appspot পরিষেবা অ্যাকাউন্ট GCF-কে Service Account Token Creator ভূমিকা প্রদান করতে হবে। <your-project-id> এর জন্য আপনার প্রকল্পের নাম প্রতিস্থাপন করে, আপনার ক্লাউড শেল-এ নিম্নলিখিত কমান্ডটি চালিয়ে তা করুন।

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

উদাহরণস্বরূপ, যদি আপনার প্রজেক্টকে বলা হয় my-project , আপনার কমান্ড হবে

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

ক্লায়েন্ট আইডি পাওয়া

ক্লাউড আইএপি-তে প্রমাণীকরণের জন্য একটি টোকেন তৈরি করতে, ফাংশনের জন্য প্রক্সির ক্লায়েন্ট আইডি প্রয়োজন যা এয়ারফ্লো ওয়েব সার্ভারকে রক্ষা করে। ক্লাউড কম্পোজার API সরাসরি এই তথ্য প্রদান করে না। পরিবর্তে, এয়ারফ্লো ওয়েব সার্ভারের কাছে একটি অপ্রমাণিত অনুরোধ করুন এবং পুনঃনির্দেশ URL থেকে ক্লায়েন্ট আইডি ক্যাপচার করুন৷ আমরা ক্লায়েন্ট আইডি ক্যাপচার করতে ক্লাউড শেল ব্যবহার করে একটি পাইথন ফাইল চালানোর মাধ্যমে এটি করতে যাচ্ছি।

আপনার ক্লাউড শেলে নিম্নলিখিত কমান্ডটি চালিয়ে গিটহাব থেকে প্রয়োজনীয় কোডটি ডাউনলোড করুন

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

যদি আপনি একটি ত্রুটি পেয়ে থাকেন কারণ এই ডিরেক্টরিটি ইতিমধ্যেই বিদ্যমান, নিম্নলিখিত কমান্ডটি চালিয়ে এটিকে সর্বশেষ সংস্করণে আপডেট করুন

cd python-docs-samples/
git pull origin master

রান করে উপযুক্ত ডিরেক্টরিতে পরিবর্তন করুন

cd python-docs-samples/composer/rest

আপনার ক্লায়েন্ট আইডি পেতে পাইথন কোডটি চালান, <your-project-id> এর জন্য আপনার প্রকল্পের নাম প্রতিস্থাপন করুন, আপনি <your-composer-location> এর জন্য আগে তৈরি করা কম্পোজার পরিবেশের অবস্থান এবং আপনার কম্পোজার পরিবেশের নাম। <your-composer-environment> এর জন্য আগে তৈরি করা হয়েছে

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

উদাহরণস্বরূপ, যদি আপনার প্রজেক্টের নাম হয় my-project , আপনার কম্পোজারের অবস্থান হয় us-central1 , এবং আপনার পরিবেশের নাম হয় my-composer , আপনার কমান্ড হবে

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py নিম্নলিখিত কাজ করে:

  • Google ক্লাউড দিয়ে প্রমাণীকরণ করে
  • রিডাইরেক্ট ইউআরআই পাওয়ার জন্য এয়ারফ্লো ওয়েব সার্ভারে একটি অননুমোদিত HTTP অনুরোধ করে
  • সেই পুনঃনির্দেশ থেকে client_id ক্যোয়ারী প্যারামিটার বের করে
  • আপনি ব্যবহার করার জন্য এটি প্রিন্ট আউট

আপনার ক্লায়েন্ট আইডি কমান্ড লাইনে প্রিন্ট আউট হবে এবং এইরকম কিছু দেখাবে:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. আপনার ফাংশন তৈরি করুন

আপনার ক্লাউড শেলে, প্রয়োজনীয় নমুনা কোডটি চালিয়ে রেপো ক্লোন করুন

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

প্রয়োজনীয় ডিরেক্টরিতে পরিবর্তন করুন এবং পরবর্তী কয়েকটি ধাপ সম্পূর্ণ করার সময় আপনার ক্লাউড শেলটি খোলা রেখে দিন

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

নেভিগেশন মেনুতে ক্লিক করে এবং তারপর "ক্লাউড ফাংশন" এ ক্লিক করে Google ক্লাউড ফাংশন পৃষ্ঠাতে নেভিগেট করুন

পৃষ্ঠার শীর্ষে "CREATE FUNCTION" এ ক্লিক করুন

আপনার ফাংশনটির নাম দিন "মাই-ফাংশন" এবং মেমরিটি ডিফল্টে 256 এমবি রেখে দিন।

ট্রিগারটিকে "ক্লাউড স্টোরেজ"-এ সেট করুন, ইভেন্টের ধরনটিকে "চূড়ান্ত করুন/তৈরি করুন" হিসাবে ছেড়ে দিন এবং একটি ক্লাউড স্টোরেজ বাকেট তৈরি করুন ধাপে আপনার তৈরি করা বালতিতে ব্রাউজ করুন।

সোর্স কোডটি "ইনলাইন এডিটর" এ সেট করুন এবং রানটাইম "Node.js 8" এ সেট করুন

আপনার ক্লাউড শেলে, নিম্নলিখিত কমান্ডটি চালান। এটি ক্লাউড শেল এডিটরে index.js এবং package.json খুলবে

cloudshell edit index.js package.json

package.json ট্যাবে ক্লিক করুন, সেই কোডটি অনুলিপি করুন এবং ক্লাউড ফাংশন ইনলাইন সম্পাদকের package.json বিভাগে পেস্ট করুন

ট্রিগারড্যাগ করতে "চালানোর ফাংশন" সেট করুন

index.js ট্যাবে ক্লিক করুন, কোডটি অনুলিপি করুন এবং ক্লাউড ফাংশন ইনলাইন সম্পাদকের index.js বিভাগে পেস্ট করুন

PROJECT_ID আপনার প্রজেক্ট আইডিতে পরিবর্তন করুন, CLIENT_ID ক্লায়েন্ট আইডিতে পরিবর্তন করুন যা আপনি ক্লায়েন্ট আইডি পাওয়ার ধাপ থেকে সংরক্ষণ করেছেন। যদিও এখনও "তৈরি করুন" এ ক্লিক করবেন না - এখনও আরও কিছু জিনিস পূরণ করতে হবে!

আপনার ক্লাউড শেল-এ, আপনার কম্পোজার পরিবেশের নাম দিয়ে <your-environment-name> এবং আপনার কম্পোজার এনভায়রনমেন্ট অবস্থিত অঞ্চলের সাথে <your-composer-region> প্রতিস্থাপন করে নিম্নলিখিত কমান্ডটি চালান।

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

উদাহরণস্বরূপ, যদি আপনার পরিবেশের নাম দেওয়া হয় my-composer-environment এবং us-central1 এ অবস্থিত আপনার কমান্ড হবে

gcloud composer environments describe my-composer-environment --location us-central1

আউটপুট এই মত কিছু দেখতে হবে:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

সেই আউটপুটে, airflowUri নামক পরিবর্তনশীলটি সন্ধান করুন। আপনার index.js কোডে, WEBSERVER_ID Airflow ওয়েবসার্ভার আইডিতে পরিবর্তন করুন - এটি airflowUri ভেরিয়েবলের অংশ যার শেষে একটি '-tp' থাকবে, উদাহরণস্বরূপ, abc123efghi456k-tp

"আরো" ড্রপডাউন লিঙ্কে ক্লিক করুন, তারপর ভৌগলিকভাবে আপনার সবচেয়ে কাছের অঞ্চলটি বেছে নিন

"ব্যর্থতার উপর পুনরায় চেষ্টা করুন" চেক করুন

আপনার ক্লাউড ফাংশন তৈরি করতে "তৈরি করুন" এ ক্লিক করুন

কোড মাধ্যমে পদক্ষেপ

আপনি index.js থেকে যে কোডটি কপি করেছেন তা এইরকম দেখতে হবে:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

চলুন দেখে নেওয়া যাক কি হচ্ছে. এখানে তিনটি ফাংশন আছে: triggerDag , authorizeIap , এবং makeIapPostRequest

triggerDag হল সেই ফাংশন যা ট্রিগার হয় যখন আমরা নির্দিষ্ট ক্লাউড স্টোরেজ বালতিতে কিছু আপলোড করি। এখানেই আমরা অন্যান্য অনুরোধে ব্যবহৃত গুরুত্বপূর্ণ ভেরিয়েবলগুলি কনফিগার করি, যেমন PROJECT_ID , CLIENT_ID , WEBSERVER_ID , এবং DAG_NAME ৷ এটি authorizeIap এবং makeIapPostRequest কল করে।

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap প্রক্সির কাছে একটি অনুরোধ করে যা Airflow ওয়েব সার্ভারকে সুরক্ষা দেয়, একটি পরিষেবা অ্যাকাউন্ট ব্যবহার করে এবং একটি JWT "আদান-প্রদান" করে একটি ID টোকেনের জন্য যা makeIapPostRequest কে প্রমাণীকরণ করতে ব্যবহার করা হবে।

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest composer_sample_trigger_response_dag. DAG নামটি url প্যারামিটারের সাথে পাস করা Airflow ওয়েবসার্ভার URL-এ এমবেড করা আছে এবং idToken হল টোকেন যা আমরা authorizeIap অনুরোধে পেয়েছি।

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. আপনার DAG সেট আপ করুন

আপনার ক্লাউড শেলে, নমুনা ওয়ার্কফ্লো সহ ডিরেক্টরিতে পরিবর্তন করুন। এটি পাইথন-ডক্স-নমুনাগুলির অংশ যা আপনি ক্লায়েন্ট আইডি পাওয়ার ধাপে গিটহাব থেকে ডাউনলোড করেছেন।

cd
cd python-docs-samples/composer/workflows

কম্পোজারে DAG আপলোড করুন

নিম্নলিখিত কমান্ডের সাহায্যে আপনার কম্পোজার পরিবেশের DAG স্টোরেজ বালতিতে নমুনা DAG আপলোড করুন, যেখানে <environment_name> আপনার কম্পোজার পরিবেশের নাম এবং <location> হল সেই অঞ্চলের নাম যেখানে এটি অবস্থিত। trigger_response_dag.py হল DAG যার সাথে আমরা কাজ করব।

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

উদাহরণস্বরূপ, যদি আপনার কম্পোজার এনভায়রনমেন্টকে my-composer নাম দেওয়া হয় এবং us-central1 এ অবস্থিত হয়, তাহলে আপনার কমান্ড হবে

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

ডিএজির মাধ্যমে পদক্ষেপ

trigger_response.py এ DAG কোড দেখতে এইরকম

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

default_args বিভাগে Apache Airflow-এ BaseOperator মডেলের প্রয়োজনীয় ডিফল্ট আর্গুমেন্ট রয়েছে। আপনি যেকোন Apache Airflow DAG-তে এই পরামিতি সহ এই বিভাগটি দেখতে পাবেন। owner বর্তমানে Composer Example সেট করা আছে, কিন্তু আপনি চাইলে সেটিকে আপনার নাম হিসেবে পরিবর্তন করতে পারেন। depends_on_past আমাদের দেখায় যে এই DAG কোনো পূর্ববর্তী DAG-এর উপর নির্ভরশীল নয়। তিনটি ইমেল বিভাগ, email , email_on_failure , এবং email_on_retry সেট করা হয়েছে যাতে এই DAG-এর অবস্থার উপর ভিত্তি করে কোনো ইমেল বিজ্ঞপ্তি না আসে৷ DAG শুধুমাত্র একবার পুনরায় চেষ্টা করবে, কারণ retries 1 তে সেট করা হয়েছে, এবং প্রতি retry_delay প্রতি পাঁচ মিনিট পরে তা করবে। start_date সাধারণত নির্দেশ করে যে কখন একটি DAG চালানো উচিত, তার schedule_interval সাথে একত্রে (পরে সেট করা হবে) কিন্তু এই DAG-এর ক্ষেত্রে এটি প্রাসঙ্গিক নয়। এটি 1 জানুয়ারী, 2017 এ সেট করা হয়েছে, তবে যেকোন অতীতের তারিখে সেট করা যেতে পারে।

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG বিভাগ DAG কনফিগার করে যা চালানো হবে। এটি টাস্ক আইডি composer_sample_trigger_response_dag , default_args বিভাগ থেকে ডিফল্ট আর্গুমেন্ট এবং সবচেয়ে গুরুত্বপূর্ণভাবে, None এর একটি schedule_interval সহ চালানো হবে। schedule_interval কোনটিতে সেট করা None কারণ আমরা আমাদের ক্লাউড ফাংশন দিয়ে এই বিশেষ DAG ট্রিগার করছি। এই কারণেই default_argsstart_date প্রাসঙ্গিক নয়।

যখন এটি কার্যকর হয়, DAG তার কনফিগারেশন প্রিন্ট করে, যেমন print_gcs_info ভেরিয়েবলে নির্দেশ করা হয়েছে।

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. আপনার ফাংশন পরীক্ষা করুন

আপনার কম্পোজার এনভায়রনমেন্ট খুলুন এবং আপনার পরিবেশের নাম সহ সারিতে, এয়ারফ্লো লিঙ্কে ক্লিক করুন

এর নামের উপর ক্লিক করে composer_sample_trigger_response_dag খুলুন। এই মুহূর্তে DAG রানের কোনো প্রমাণ পাওয়া যাবে না, কারণ আমরা এখনও DAG-কে চালানোর জন্য ট্রিগার করিনি৷ যদি এই DAG দৃশ্যমান বা ক্লিকযোগ্য না হয়, তাহলে এক মিনিট অপেক্ষা করুন এবং পৃষ্ঠাটি রিফ্রেশ করুন৷

একটি পৃথক ট্যাব খুলুন এবং আপনার ক্লাউড ফাংশনের জন্য ট্রিগার হিসাবে নির্দিষ্ট করা আপনার আগে তৈরি করা ক্লাউড স্টোরেজ বাকেটটিতে যেকোনো ফাইল আপলোড করুন। আপনি কনসোলের মাধ্যমে বা একটি gsutil কমান্ড ব্যবহার করে এটি করতে পারেন।

আপনার এয়ারফ্লো UI দিয়ে ট্যাবে ফিরে যান এবং গ্রাফ ভিউতে ক্লিক করুন

print_gcs_info টাস্কে ক্লিক করুন, যা সবুজ রঙে আউটলাইন করা উচিত

মেনুর উপরের ডানদিকে "লগ দেখুন" এ ক্লিক করুন

লগগুলিতে, আপনি আপনার ক্লাউড স্টোরেজ বালতিতে আপলোড করা ফাইল সম্পর্কে তথ্য দেখতে পাবেন।

অভিনন্দন! আপনি Node.js এবং Google ক্লাউড ফাংশন ব্যবহার করে একটি Airflow DAG ট্রিগার করেছেন!

7. পরিষ্কার করা

এই কুইকস্টার্টে ব্যবহৃত রিসোর্সের জন্য আপনার GCP অ্যাকাউন্টে চার্জ এড়াতে:

  1. (ঐচ্ছিক) আপনার ডেটা সংরক্ষণ করতে, ক্লাউড কম্পোজার পরিবেশের জন্য ক্লাউড স্টোরেজ বাকেট থেকে ডেটা ডাউনলোড করুন এবং এই কুইকস্টার্টের জন্য আপনার তৈরি স্টোরেজ বাকেট।
  2. আপনার তৈরি করা পরিবেশের জন্য ক্লাউড স্টোরেজ বালতি মুছুন
  3. ক্লাউড কম্পোজার পরিবেশ মুছুন । মনে রাখবেন যে আপনার পরিবেশ মুছে ফেলার ফলে পরিবেশের জন্য স্টোরেজ বালতি মুছে যায় না।
  4. (ঐচ্ছিক) সার্ভারলেস কম্পিউটিং সহ, প্রতি মাসে প্রথম 2 মিলিয়ন আহ্বান বিনামূল্যে, এবং আপনি যখন আপনার কার্যকারিতা শূন্যে স্কেল করেন, তখন আপনাকে চার্জ করা হবে না (আরো বিশদ বিবরণের জন্য মূল্য দেখুন)। যাইহোক, আপনি যদি আপনার ক্লাউড ফাংশন মুছতে চান, তাহলে আপনার ফাংশনের জন্য ওভারভিউ পৃষ্ঠার উপরের ডানদিকে "মুছে ফেলুন" ক্লিক করে তা করুন

4fe11e1b41b32ba2.png

আপনি ঐচ্ছিকভাবে প্রকল্পটি মুছে ফেলতে পারেন:

  1. GCP কনসোলে, প্রকল্প পৃষ্ঠায় যান।
  2. প্রকল্প তালিকায়, আপনি যে প্রকল্পটি মুছতে চান সেটি নির্বাচন করুন এবং মুছুন ক্লিক করুন।
  3. বাক্সে, প্রজেক্ট আইডি টাইপ করুন এবং তারপরে প্রজেক্ট মুছে ফেলতে শাট ডাউন ক্লিক করুন।