একটি Dataproc ক্লাস্টারে একটি Hadoop wordcount কাজ চালানো

1. ভূমিকা

ওয়ার্কফ্লো হল ডেটা অ্যানালিটিক্সে একটি সাধারণ ব্যবহারের ক্ষেত্রে - এর মধ্যে অর্থপূর্ণ তথ্য খোঁজার জন্য ডেটা গ্রহণ করা, রূপান্তর করা এবং বিশ্লেষণ করা জড়িত। Google ক্লাউড প্ল্যাটফর্মে, ওয়ার্কফ্লো অর্কেস্ট্রেট করার টুল হল ক্লাউড কম্পোজার, যা জনপ্রিয় ওপেন সোর্স ওয়ার্কফ্লো টুল Apache Airflow-এর হোস্ট করা সংস্করণ। এই ল্যাবে, আপনি একটি সাধারণ ওয়ার্কফ্লো তৈরি করতে ক্লাউড কম্পোজার ব্যবহার করবেন যা একটি ক্লাউড ডেটাপ্রোক ক্লাস্টার তৈরি করে, ক্লাউড ডেটাপ্রোক এবং অ্যাপাচি হ্যাডুপ ব্যবহার করে এটি বিশ্লেষণ করে, তারপরে ক্লাউড ডেটাপ্রোক ক্লাস্টার মুছে ফেলবে।

ক্লাউড কম্পোজার কি?

ক্লাউড কম্পোজার হল একটি সম্পূর্ণরূপে পরিচালিত ওয়ার্কফ্লো অর্কেস্ট্রেশন পরিষেবা যা আপনাকে ক্লাউড এবং অন-প্রিমিসেস ডেটা সেন্টার জুড়ে বিস্তৃত পাইপলাইন লেখক, সময়সূচী এবং নিরীক্ষণ করার ক্ষমতা দেয়৷ জনপ্রিয় Apache Airflow ওপেন সোর্স প্রজেক্টে নির্মিত এবং Python প্রোগ্রামিং ভাষা ব্যবহার করে পরিচালিত, ক্লাউড কম্পোজার লক-ইন থেকে মুক্ত এবং ব্যবহার করা সহজ।

Apache Airflow-এর একটি স্থানীয় উদাহরণের পরিবর্তে ক্লাউড কম্পোজার ব্যবহার করে, ব্যবহারকারীরা কোনো ইনস্টলেশন বা ব্যবস্থাপনা ওভারহেড ছাড়াই সেরা এয়ারফ্লো থেকে উপকৃত হতে পারেন।

Apache Airflow কি?

Apache Airflow হল একটি ওপেন সোর্স টুল যা প্রোগ্রাম্যাটিকভাবে লেখক, সময়সূচী এবং ওয়ার্কফ্লো নিরীক্ষণ করতে ব্যবহৃত হয়। বায়ুপ্রবাহের সাথে সম্পর্কিত মনে রাখার জন্য কয়েকটি মূল শর্ত রয়েছে যা আপনি ল্যাব জুড়ে দেখতে পাবেন:

  • DAG - একটি DAG (নির্দেশিত অ্যাসাইক্লিক গ্রাফ) হল সংগঠিত কাজের একটি সংগ্রহ যা আপনি সময়সূচী এবং চালাতে চান। DAGs, যাকে ওয়ার্কফ্লোও বলা হয়, স্ট্যান্ডার্ড পাইথন ফাইলগুলিতে সংজ্ঞায়িত করা হয়
  • অপারেটর - একটি অপারেটর একটি ওয়ার্কফ্লোতে একটি একক কাজ বর্ণনা করে

Cloud Dataproc কি?

ক্লাউড ডেটাপ্রোক হল Google ক্লাউড প্ল্যাটফর্মের সম্পূর্ণরূপে পরিচালিত Apache Spark এবং Apache Hadoop পরিষেবা৷ ক্লাউড ডেটাপ্রোক সহজেই অন্যান্য GCP পরিষেবাগুলির সাথে একীভূত হয়, আপনাকে ডেটা প্রক্রিয়াকরণ, বিশ্লেষণ এবং মেশিন লার্নিংয়ের জন্য একটি শক্তিশালী এবং সম্পূর্ণ প্ল্যাটফর্ম দেয়।

আপনি কি করবেন

এই কোডল্যাব আপনাকে দেখায় কিভাবে ক্লাউড কম্পোজারে একটি অ্যাপাচি এয়ারফ্লো ওয়ার্কফ্লো তৈরি এবং চালাতে হয় যা নিম্নলিখিত কাজগুলি সম্পূর্ণ করে:

  • একটি ক্লাউড ডেটাপ্রোক ক্লাস্টার তৈরি করে
  • ক্লাস্টারে একটি Apache Hadoop wordcount কাজ চালায় এবং ক্লাউড স্টোরেজে এর ফলাফল আউটপুট করে
  • ক্লাস্টার মুছে দেয়

আপনি কি শিখবেন

  • ক্লাউড কম্পোজারে কীভাবে অ্যাপাচি এয়ারফ্লো ওয়ার্কফ্লো তৈরি এবং চালানো যায়
  • একটি ডেটাসেটে বিশ্লেষণ চালানোর জন্য ক্লাউড কম্পোজার এবং ক্লাউড ডেটাপ্রোক কীভাবে ব্যবহার করবেন
  • গুগল ক্লাউড প্ল্যাটফর্ম কনসোল, ক্লাউড এসডিকে এবং এয়ারফ্লো ওয়েব ইন্টারফেসের মাধ্যমে কীভাবে আপনার ক্লাউড কম্পোজার পরিবেশে অ্যাক্সেস করবেন

আপনি কি প্রয়োজন হবে

  • GCP অ্যাকাউন্ট
  • প্রাথমিক CLI জ্ঞান
  • পাইথন সম্পর্কে প্রাথমিক ধারণা

2. GCP সেট আপ করা

প্রকল্পটি তৈরি করুন

একটি Google ক্লাউড প্ল্যাটফর্ম প্রকল্প নির্বাচন করুন বা তৈরি করুন৷

আপনার প্রকল্প আইডি নোট করুন, যা আপনি পরবর্তী ধাপে ব্যবহার করবেন।

আপনি যদি একটি নতুন প্রকল্প তৈরি করেন, তাহলে প্রজেক্ট আইডিটি নির্মাণ পৃষ্ঠায় প্রকল্পের নামের ঠিক নিচে পাওয়া যাবে

আপনি যদি ইতিমধ্যে একটি প্রকল্প তৈরি করে থাকেন, তাহলে আপনি প্রজেক্ট ইনফো কার্ডে কনসোল হোমপেজে আইডিটি খুঁজে পেতে পারেন

APIs সক্রিয় করুন

ক্লাউড কম্পোজার, ক্লাউড ডেটাপ্রোক, এবং ক্লাউড স্টোরেজ এপিআই সক্ষম করুন । একবার সেগুলি সক্ষম হয়ে গেলে, আপনি "শংসাপত্রে যান" বোতামটি উপেক্ষা করতে পারেন এবং টিউটোরিয়ালের পরবর্তী ধাপে এগিয়ে যেতে পারেন৷

কম্পোজার এনভায়রনমেন্ট তৈরি করুন

নিম্নলিখিত কনফিগারেশন সহ একটি ক্লাউড কম্পোজার পরিবেশ তৈরি করুন :

  • নাম: আমার-সুরকার-পরিবেশ
  • অবস্থান: us-central1
  • অঞ্চল: us-central1-a

অন্যান্য সমস্ত কনফিগারেশন তাদের ডিফল্টে থাকতে পারে। নীচে "তৈরি করুন" ক্লিক করুন।

ক্লাউড স্টোরেজ বাকেট তৈরি করুন

আপনার প্রকল্পে, নিম্নলিখিত কনফিগারেশন সহ একটি ক্লাউড স্টোরেজ বালতি তৈরি করুন :

  • নাম: <your-project-id>
  • ডিফল্ট স্টোরেজ ক্লাস: বহু-আঞ্চলিক
  • অবস্থান: মার্কিন যুক্তরাষ্ট্র
  • অ্যাক্সেস কন্ট্রোল মডেল: সূক্ষ্ম দানাদার

আপনি প্রস্তুত হলে "তৈরি করুন" টিপুন

3. Apache Airflow সেট আপ করা

কম্পোজার এনভায়রনমেন্ট তথ্য দেখা

GCP কনসোলে, পরিবেশ পৃষ্ঠা খুলুন

এর বিস্তারিত দেখতে পরিবেশের নামের উপর ক্লিক করুন।

এনভায়রনমেন্ট বিশদ পৃষ্ঠা তথ্য প্রদান করে, যেমন Airflow ওয়েব ইন্টারফেস URL, Google Kubernetes Engine ক্লাস্টার আইডি, ক্লাউড স্টোরেজ বাকেটের নাম এবং /dags ফোল্ডারের পথ।

এয়ারফ্লোতে, একটি DAG (নির্দেশিত অ্যাসাইক্লিক গ্রাফ) হল সংগঠিত কাজের একটি সংগ্রহ যা আপনি সময়সূচী করতে এবং চালাতে চান। DAGs, যাকে ওয়ার্কফ্লোও বলা হয়, স্ট্যান্ডার্ড পাইথন ফাইলগুলিতে সংজ্ঞায়িত করা হয়। ক্লাউড কম্পোজার শুধুমাত্র /ড্যাগস ফোল্ডারে DAGগুলি নির্ধারণ করে। /ড্যাগস ফোল্ডারটি ক্লাউড স্টোরেজ বাকেটের মধ্যে রয়েছে যা আপনি যখন আপনার পরিবেশ তৈরি করেন তখন ক্লাউড কম্পোজার স্বয়ংক্রিয়ভাবে তৈরি করে।

অ্যাপাচি এয়ারফ্লো এনভায়রনমেন্ট ভেরিয়েবল সেট করা

Apache Airflow ভেরিয়েবল হল একটি বায়ুপ্রবাহ-নির্দিষ্ট ধারণা যা পরিবেশ ভেরিয়েবল থেকে আলাদা। এই ধাপে, আপনি নিম্নলিখিত তিনটি এয়ারফ্লো ভেরিয়েবল সেট করবেন: gcp_project , gcs_bucket , এবং gce_zone

ভেরিয়েবল সেট করতে gcloud ব্যবহার করে

প্রথমে, আপনার ক্লাউড শেল খুলুন, যেখানে আপনার জন্য ক্লাউড SDK সুবিধাজনকভাবে ইনস্টল করা আছে।

এনভায়রনমেন্ট ভেরিয়েবল COMPOSER_INSTANCE আপনার কম্পোজার এনভায়রনমেন্টের নামে সেট করুন

COMPOSER_INSTANCE=my-composer-environment

gcloud কমান্ড-লাইন টুল ব্যবহার করে Airflow ভেরিয়েবল সেট করতে, variables সাব-কমান্ড সহ gcloud composer environments run কমান্ড ব্যবহার করুন। এই gcloud composer কমান্ড এয়ারফ্লো CLI সাব-কমান্ড variables চালায়। সাব-কমান্ড gcloud কমান্ড লাইন টুলে আর্গুমেন্ট পাস করে।

আপনি এই কমান্ডটি তিনবার চালাবেন, আপনার প্রকল্পের সাথে প্রাসঙ্গিকগুলির সাথে ভেরিয়েবলগুলি প্রতিস্থাপন করবেন।

নিচের কমান্ডটি ব্যবহার করে gcp_project সেট করুন, <your-project-id> কে প্রতিস্থাপন করুন যেটি প্রকল্প আইডি আপনি ধাপ 2 এ নোট করেছেন।

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

আপনার আউটপুট এই মত কিছু দেখাবে

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

নিচের কমান্ডটি ব্যবহার করে gcs_bucket সেট করুন, <your-bucket-name> এর পরিবর্তে ধাপ 2-এ আপনি যে বালতি আইডিটি নোট করেছেন তা দিয়ে। আপনি যদি আমাদের সুপারিশ অনুসরণ করেন, আপনার বালতির নাম আপনার প্রকল্প আইডির মতোই। আপনার আউটপুট আগের কমান্ডের অনুরূপ হবে।

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

নিম্নলিখিত কমান্ড ব্যবহার করে gce_zone সেট করুন। আপনার আউটপুট আগের কমান্ডের অনুরূপ হবে।

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(ঐচ্ছিক) একটি পরিবর্তনশীল দেখতে gcloud ব্যবহার করে

একটি ভেরিয়েবলের মান দেখতে, get আর্গুমেন্ট সহ Airflow CLI সাব-কমান্ড variables চালান বা Airflow UI ব্যবহার করুন।

যেমন:

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

আপনি এইমাত্র সেট করা তিনটি ভেরিয়েবলের যেকোনো একটি দিয়ে এটি করতে পারেন: gcp_project , gcs_bucket , এবং gce_zone

4. নমুনা কর্মপ্রবাহ

আসুন আমরা ধাপ 5 এ যে DAG-এর কোডটি ব্যবহার করব তা একবার দেখে নেওয়া যাক। ফাইল ডাউনলোড করার বিষয়ে এখনও চিন্তা করবেন না, শুধু এখানে অনুসরণ করুন।

এখানে আনপ্যাক করার জন্য অনেক কিছু আছে, তাই আসুন এটিকে একটু ভেঙে দেই।

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

আমরা কিছু এয়ারফ্লো আমদানি দিয়ে শুরু করি:

  • airflow.models - আমাদেরকে এয়ারফ্লো ডাটাবেসে ডেটা অ্যাক্সেস এবং তৈরি করার অনুমতি দেয়।
  • airflow.contrib.operators - সম্প্রদায়ের অপারেটররা যেখানে বাস করে। এই ক্ষেত্রে, ক্লাউড ডেটাপ্রোক API অ্যাক্সেস করার জন্য আমাদের dataproc_operator প্রয়োজন।
  • airflow.utils.trigger_rule - আমাদের অপারেটরগুলিতে ট্রিগার নিয়ম যোগ করার জন্য। ট্রিগার নিয়মগুলি একটি অপারেটরকে তার পিতামাতার অবস্থার সাথে সম্পৃক্ত করা উচিত কিনা তার উপর সূক্ষ্ম নিয়ন্ত্রণের অনুমতি দেয়।
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

এটি আমাদের আউটপুট ফাইলের অবস্থান নির্দিষ্ট করে। এখানে উল্লেখযোগ্য লাইন হল models.Variable.get('gcs_bucket') যা এয়ারফ্লো ডাটাবেস থেকে gcs_bucket ভেরিয়েবলের মান ধরবে।

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR - .jar ফাইলের অবস্থান আমরা অবশেষে ক্লাউড ডেটাপ্রোক ক্লাস্টারে চালাব। এটি ইতিমধ্যেই আপনার জন্য GCP-এ হোস্ট করা আছে।
  • input_file - ফাইলের অবস্থান যেখানে ডেটা রয়েছে আমাদের Hadoop কাজ শেষ পর্যন্ত গণনা করবে। আমরা ধাপ 5 এ একসাথে সেই অবস্থানে ডেটা আপলোড করব।
  • wordcount_args - আর্গুমেন্ট যা আমরা জার ফাইলে পাস করব।
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

এটি আমাদের একটি ডেটটাইম অবজেক্ট দেবে যা আগের দিনের মধ্যরাতের সমতুল্য। উদাহরণস্বরূপ, যদি এটি 4 ঠা মার্চ 11:00 এ কার্যকর করা হয়, তাহলে তারিখ সময় বস্তুটি 3রা মার্চ 00:00 প্রতিনিধিত্ব করবে৷ এটি কিভাবে এয়ারফ্লো সময়সূচী পরিচালনা করে তার সাথে সম্পর্কিত। যে সম্পর্কে আরও তথ্য এখানে পাওয়া যাবে.

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

একটি অভিধান আকারে default_dag_args ভেরিয়েবলটি সরবরাহ করা উচিত যখনই একটি নতুন DAG তৈরি করা হয়:

  • 'email_on_failure' - কোনো টাস্ক ব্যর্থ হলে ইমেল সতর্কতা পাঠানো উচিত কিনা তা নির্দেশ করে
  • 'email_on_retry' - নির্দেশ করে যে একটি টাস্ক পুনরায় চেষ্টা করার সময় ইমেল সতর্কতা পাঠানো উচিত কিনা
  • 'retries' - একটি DAG ব্যর্থতার ক্ষেত্রে এয়ারফ্লোকে কতগুলি পুনরায় চেষ্টা করা উচিত তা নির্দেশ করে
  • 'retry_delay' - পুনরায় চেষ্টা করার আগে কতক্ষণ এয়ারফ্লো অপেক্ষা করতে হবে তা নির্দেশ করে
  • 'project_id' - DAG কে বলে যে এটির সাথে কোন GCP প্রোজেক্ট আইডি যুক্ত করতে হবে, যা পরে Dataproc অপারেটরের সাথে প্রয়োজন হবে
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

with models.DAG ব্যবহার করা। ডিএজি স্ক্রিপ্টকে একই DAG-এর ভিতরে তার নীচের সবকিছু অন্তর্ভুক্ত করতে বলে। আমরা তিনটি আর্গুমেন্টও দেখতে পাচ্ছি:

  • প্রথম, একটি স্ট্রিং, DAG দেওয়ার নাম যা আমরা তৈরি করছি। এই ক্ষেত্রে, আমরা composer_hadoop_tutorial ব্যবহার করছি।
  • schedule_interval - একটি datetime.timedelta অবজেক্ট, যা এখানে আমরা একদিন সেট করেছি। এর মানে হল যে এই DAG 'start_date' এর পর দিনে একবার কার্যকর করার চেষ্টা করবে যা 'default_dag_args' এ আগে সেট করা হয়েছিল
  • default_args - DAG-এর জন্য ডিফল্ট আর্গুমেন্ট ধারণ করে আমরা আগে তৈরি করা অভিধানটি

একটি Dataproc ক্লাস্টার তৈরি করুন

এরপর, আমরা একটি dataproc_operator.DataprocClusterCreateOperator তৈরি করব।DataprocClusterCreateOperator যা একটি ক্লাউড ডেটাপ্রোক ক্লাস্টার তৈরি করে।

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

এই অপারেটরের মধ্যে, আমরা কয়েকটি আর্গুমেন্ট দেখতে পাই, যার মধ্যে প্রথমটি এই অপারেটরের জন্য নির্দিষ্ট:

  • task_id - BashOperator-এর মতই, এই নামটি আমরা অপারেটরকে বরাদ্দ করি, যা Airflow UI থেকে দেখা যায়
  • cluster_name - যে নামটি আমরা Cloud Dataproc ক্লাস্টার নির্ধারণ করি। এখানে, আমরা এর নাম দিয়েছি composer-hadoop-tutorial-cluster- (ঐচ্ছিক অতিরিক্ত তথ্যের জন্য তথ্য বাক্স দেখুন)
  • num_workers - ক্লাউড ডেটাপ্রোক ক্লাস্টারে আমরা যত কর্মী বরাদ্দ করি
  • zone - ভৌগলিক অঞ্চল যেখানে আমরা ক্লাস্টারটি বাস করতে চাই, যেমন এয়ারফ্লো ডাটাবেসের মধ্যে সংরক্ষিত। এটি 'gce_zone' ভেরিয়েবলটি পড়বে যা আমরা ধাপ 3 এ সেট করেছি
  • master_machine_type - আমরা ক্লাউড ডেটাপ্রোক মাস্টারকে যে ধরনের মেশিন বরাদ্দ করতে চাই
  • worker_machine_type - আমরা ক্লাউড ডেটাপ্রোক কর্মীকে যে ধরনের মেশিন বরাদ্দ করতে চাই

একটি Apache Hadoop চাকরি জমা দিন

dataproc_operator.DataProcHadoopOperator আমাদের একটি ক্লাউড ডেটাপ্রোক ক্লাস্টারে একটি কাজ জমা দেওয়ার অনুমতি দেয়।

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

আমরা বেশ কয়েকটি পরামিতি প্রদান করি:

  • task_id - আমরা DAG-এর এই অংশটিকে যে নাম বরাদ্দ করি
  • main_jar - .jar ফাইলের অবস্থান যা আমরা ক্লাস্টারের বিপরীতে চালাতে চাই
  • cluster_name - এর বিরুদ্ধে কাজ চালানোর জন্য ক্লাস্টারের নাম, যা আপনি লক্ষ্য করবেন আগের অপারেটরে আমরা যা পেয়েছি তার সাথে একই রকম
  • arguments - আর্গুমেন্ট যা জার ফাইলে পাস করা হয়, যেমন আপনি যদি কমান্ড লাইন থেকে .jar ফাইলটি চালান

ক্লাস্টার মুছুন

শেষ অপারেটরটি আমরা তৈরি করব dataproc_operator.DataprocClusterDeleteOperator

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

নাম অনুসারে, এই অপারেটর একটি প্রদত্ত ক্লাউড ডেটাপ্রোক ক্লাস্টার মুছে ফেলবে। আমরা এখানে তিনটি যুক্তি দেখতে পাই:

  • task_id - BashOperator-এর মতই, এই নামটি আমরা অপারেটরকে বরাদ্দ করি, যা Airflow UI থেকে দেখা যায়
  • cluster_name - যে নামটি আমরা Cloud Dataproc ক্লাস্টার নির্ধারণ করি। এখানে, আমরা এর নাম দিয়েছি composer-hadoop-tutorial-cluster- (ঐচ্ছিক অতিরিক্ত তথ্যের জন্য "একটি ডেটাপ্রোক ক্লাস্টার তৈরি করুন" এর পরে তথ্য বাক্স দেখুন)
  • trigger_rule - আমরা এই ধাপের শুরুতে আমদানির সময় সংক্ষিপ্তভাবে ট্রিগারের নিয়ম উল্লেখ করেছি, কিন্তু এখানে আমাদের একটি কাজ আছে। ডিফল্টরূপে, একটি এয়ারফ্লো অপারেটর কার্যকর করে না যদি না তার সমস্ত আপস্ট্রিম অপারেটর সফলভাবে সম্পন্ন না হয়। ALL_DONE ট্রিগার নিয়মের জন্য শুধুমাত্র প্রয়োজন যে সমস্ত আপস্ট্রিম অপারেটর সম্পূর্ণ করেছে, তারা সফল হয়েছে কিনা তা নির্বিশেষে। এখানে এর মানে হল যে Hadoop কাজ ব্যর্থ হলেও, আমরা এখনও ক্লাস্টারটি ভেঙে ফেলতে চাই।
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

সবশেষে, আমরা চাই এই অপারেটরগুলো একটি নির্দিষ্ট ক্রমে কার্যকর করুক, এবং আমরা পাইথন বিটশিফ্ট অপারেটর ব্যবহার করে এটি বোঝাতে পারি। এই ক্ষেত্রে, create_dataproc_cluster সর্বদা প্রথমে চালাবে, তারপর run_dataproc_hadoop এবং অবশেষে delete_dataproc_cluster

এটি সব একসাথে রাখা, কোড এই মত দেখায়:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5. ক্লাউড স্টোরেজে এয়ারফ্লো ফাইল আপলোড করুন

আপনার /ড্যাগস ফোল্ডারে DAG কপি করুন

  1. প্রথমে, আপনার ক্লাউড শেল খুলুন, যেখানে আপনার জন্য ক্লাউড SDK সুবিধাজনকভাবে ইনস্টল করা আছে।
  2. পাইথন নমুনা রেপো ক্লোন করুন এবং কম্পোজার/ওয়ার্কফ্লো ডিরেক্টরিতে পরিবর্তন করুন
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. আপনার DAGs ফোল্ডারের নাম একটি এনভায়রনমেন্ট ভেরিয়েবলে সেট করতে নিম্নলিখিত কমান্ডটি চালান
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. টিউটোরিয়াল কোডটি কপি করতে নিম্নলিখিত gsutil কমান্ডটি চালান যেখানে আপনার /dags ফোল্ডার তৈরি করা হয়েছে
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

আপনার আউটপুট এই মত কিছু দেখাবে:

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6. এয়ারফ্লো UI ব্যবহার করা

GCP কনসোল ব্যবহার করে Airflow ওয়েব ইন্টারফেস অ্যাক্সেস করতে:

  1. পরিবেশ পৃষ্ঠা খুলুন।
  2. পরিবেশের জন্য এয়ারফ্লো ওয়েব সার্ভার কলামে, নতুন উইন্ডো আইকনে ক্লিক করুন। Airflow ওয়েব UI একটি নতুন ব্রাউজার উইন্ডোতে খোলে৷

এয়ারফ্লো UI সম্পর্কে তথ্যের জন্য, ওয়েব ইন্টারফেস অ্যাক্সেস করা দেখুন।

ভেরিয়েবল দেখুন

আপনি আগে সেট করা ভেরিয়েবলগুলি আপনার পরিবেশে টিকে থাকে। আপনি এয়ারফ্লো UI মেনু বার থেকে অ্যাডমিন > ভেরিয়েবল নির্বাচন করে ভেরিয়েবল দেখতে পারেন।

List tab is selected and shows a table with the following keys and values key: gcp_project, value: project-id key: gcs_bucket, value: gs://bucket-name key: gce_zone, value: zone

DAG রান অন্বেষণ

আপনি যখন ক্লাউড স্টোরেজের dags ফোল্ডারে আপনার DAG ফাইল আপলোড করেন, তখন ক্লাউড কম্পোজার ফাইলটি পার্স করে। যদি কোনো ত্রুটি পাওয়া না যায়, তাহলে ওয়ার্কফ্লোটির নাম DAG তালিকায় প্রদর্শিত হবে, এবং ওয়ার্কফ্লো অবিলম্বে চালানোর জন্য সারিবদ্ধ হবে। আপনার DAGs দেখতে, পৃষ্ঠার শীর্ষে DAGs- এ ক্লিক করুন৷

84a29c71f20bff98.png

DAG বিবরণ পৃষ্ঠা খুলতে composer_hadoop_tutorial ক্লিক করুন। এই পৃষ্ঠায় ওয়ার্কফ্লো কাজ এবং নির্ভরতাগুলির একটি গ্রাফিক্যাল উপস্থাপনা অন্তর্ভুক্ত রয়েছে।

f4f1663c7a37f47c.png

এখন, টুলবারে, গ্রাফ ভিউ- এ ক্লিক করুন এবং তারপর প্রতিটি কাজের জন্য গ্রাফিকের স্থিতি দেখতে মাউসওভার করুন। মনে রাখবেন যে প্রতিটি টাস্কের চারপাশের সীমানাও অবস্থা নির্দেশ করে (সবুজ সীমানা = চলমান; লাল = ব্যর্থ, ইত্যাদি)।

4c5a0c6fa9f88513.png

গ্রাফ ভিউ থেকে আবার ওয়ার্কফ্লো চালানোর জন্য:

  1. এয়ারফ্লো UI গ্রাফ ভিউতে, create_dataproc_cluster গ্রাফিকে ক্লিক করুন।
  2. তিনটি কাজ পুনরায় সেট করতে সাফ ক্লিক করুন এবং তারপর নিশ্চিত করতে ঠিক আছে ক্লিক করুন।

fd1b23b462748f47.png

এছাড়াও আপনি নিম্নলিখিত GCP কনসোল পৃষ্ঠাগুলিতে গিয়ে composer-hadoop-tutorial ওয়ার্কফ্লো-এর স্থিতি এবং ফলাফল পরীক্ষা করতে পারেন:

  • ক্লাউড ডেটাপ্রোক ক্লাস্টারগুলি ক্লাস্টার তৈরি এবং মুছে ফেলার নিরীক্ষণ করতে। লক্ষ্য করুন যে ওয়ার্কফ্লো দ্বারা তৈরি ক্লাস্টারটি ক্ষণস্থায়ী: এটি শুধুমাত্র কর্মপ্রবাহের সময়কালের জন্য বিদ্যমান এবং শেষ ওয়ার্কফ্লো টাস্কের অংশ হিসাবে মুছে ফেলা হয়।
  • Apache Hadoop wordcount কাজ দেখতে বা নিরীক্ষণ করতে Cloud Dataproc জবস । কাজের লগ আউটপুট দেখতে জব আইডিতে ক্লিক করুন।
  • এই কোডল্যাবের জন্য আপনার তৈরি করা ক্লাউড স্টোরেজ বাকেটের wordcount ফোল্ডারে wordcount-এর ফলাফল দেখতে Cloud Storage Browser

7. পরিষ্কার করা

এই কোডল্যাবে ব্যবহৃত রিসোর্সের জন্য আপনার GCP অ্যাকাউন্টে চার্জ এড়াতে:

  1. (ঐচ্ছিক) আপনার ডেটা সংরক্ষণ করতে, ক্লাউড কম্পোজার পরিবেশের জন্য ক্লাউড স্টোরেজ বাকেট থেকে ডেটা ডাউনলোড করুন এবং এই কোডল্যাবের জন্য আপনার তৈরি স্টোরেজ বাকেট।
  2. এই কোডল্যাবের জন্য আপনার তৈরি করা ক্লাউড স্টোরেজ বাকেটটি মুছুন
  3. পরিবেশের জন্য ক্লাউড স্টোরেজ বালতি মুছুন
  4. ক্লাউড কম্পোজার পরিবেশ মুছুন । মনে রাখবেন যে আপনার পরিবেশ মুছে ফেলার ফলে পরিবেশের জন্য স্টোরেজ বালতি মুছে যায় না।

আপনি ঐচ্ছিকভাবে প্রকল্পটি মুছে ফেলতে পারেন:

  1. GCP কনসোলে, প্রকল্প পৃষ্ঠায় যান।
  2. প্রকল্প তালিকায়, আপনি যে প্রকল্পটি মুছতে চান সেটি নির্বাচন করুন এবং মুছুন ক্লিক করুন।
  3. বাক্সে, প্রজেক্ট আইডি টাইপ করুন এবং তারপরে প্রজেক্ট মুছে ফেলতে শাট ডাউন ক্লিক করুন।