1. परिचय
डेटा के विश्लेषण में वर्कफ़्लो, इस्तेमाल के उदाहरण हैं. इनमें काम की जानकारी खोजने के लिए, डेटा को डालना, बदलना, और उसका विश्लेषण करना शामिल होता है. Google Cloud Platform में, वर्कफ़्लो को व्यवस्थित करने के लिए Cloud Composer टूल है. यह लोकप्रिय ओपन सोर्स वर्कफ़्लो टूल Apache Airflow का होस्ट किया गया वर्शन है. इस लैब में, आपको Cloud Composer का इस्तेमाल करके एक आसान वर्कफ़्लो बनाना है. इससे Cloud Dataproc क्लस्टर बनाया जा सकता है, Cloud Dataproc और Apache Hadoop का इस्तेमाल करके, इसका विश्लेषण किया जा सकता है और बाद में Cloud Dataproc क्लस्टर को मिटाया जा सकता है.
Cloud Composer क्या है?
Cloud Composer पूरी तरह से मैनेज की गई वर्कफ़्लो ऑर्केस्ट्रेशन सेवा है. इसकी मदद से, क्लाउड और कंपनी की इमारत में मौजूद डेटा सेंटर पर चलने वाले पाइपलाइन को बनाया जा सकता है, उन्हें शेड्यूल किया जा सकता है, और उनकी निगरानी की जा सकती है. Cloud Composer लोकप्रिय Apache Airflow ओपन सोर्स प्रोजेक्ट पर बना है और इसे Python प्रोग्रामिंग भाषा का इस्तेमाल करके ऑपरेट किया जाता है. Cloud Composer इस्तेमाल करने में आसान है. इसमें लॉक-इन नहीं किया जा सकता.
Apache Airflow के लोकल इंस्टेंस के बजाय Cloud Composer का इस्तेमाल करके, उपयोगकर्ता Airflow के बेहतरीन फ़ायदे पा सकते हैं. इसके लिए, उन्हें इंस्टॉल करने या मैनेज करने में कोई परेशानी नहीं होती.
Apache Airflow क्या है?
Apache Airflow एक ओपन सोर्स टूल है, जिसका इस्तेमाल प्रोग्राम के हिसाब से वर्कफ़्लो लिखने, शेड्यूल करने, और उन्हें मॉनिटर करने के लिए किया जाता है. Airflow से जुड़े कुछ मुख्य शब्दों को याद रखें. ये आपको इस लैब में दिखेंगे:
- DAG - DAG (डायरेक्टेड एकाइकिक ग्राफ़) व्यवस्थित किए गए टास्क का कलेक्शन है, जिन्हें आपको शेड्यूल करके चलाना है. डीएजी को वर्कफ़्लो भी कहा जाता है. इन्हें स्टैंडर्ड Python फ़ाइलों में तय किया जाता है
- ऑपरेटर - ऑपरेटर, वर्कफ़्लो में किसी एक टास्क के बारे में बताता है
Cloud Dataproc क्या है?
Cloud Dataproc, Google Cloud Platform की पूरी तरह से मैनेज की गई Apache Spark और Apache Hadoop सेवा है. Cloud Dataproc, GCP की अन्य सेवाओं के साथ आसानी से इंटिग्रेट हो जाता है. इससे आपको डेटा प्रोसेसिंग, आंकड़ों, और मशीन लर्निंग के लिए एक बेहतरीन और बेहतर प्लैटफ़ॉर्म मिलता है.
आपको क्या करना होगा
यह कोडलैब आपको Cloud Composer में Apache Airflow वर्कफ़्लो बनाने और चलाने का तरीका बताता है, जिससे ये काम पूरे किए जा सकते हैं:
- Cloud Dataproc क्लस्टर बनाता है
- क्लस्टर पर Apache Hadoop wordcount जॉब चलाता है और उसके नतीजों को Cloud Storage में भेजता है
- क्लस्टर मिटाता है
आपको इनके बारे में जानकारी मिलेगी
- Cloud Composer में Apache Airflow वर्कफ़्लो बनाने और उसे चलाने का तरीका
- किसी डेटासेट पर विश्लेषण करने के लिए, Cloud Composer और Cloud Dataproc का इस्तेमाल करने का तरीका
- Google Cloud Platform Console, Cloud SDK, और Airflow के वेब इंटरफ़ेस की मदद से, Cloud Composer सुविधा को ऐक्सेस करने का तरीका
आपको इन चीज़ों की ज़रूरत होगी
- GCP खाता
- सीएलआई के बारे में बुनियादी जानकारी
- Python के बारे में बुनियादी जानकारी
2. GCP सेट अप करना
प्रोजेक्ट बनाना
Google Cloud Platform प्रोजेक्ट चुनें या बनाएं.
अपने प्रोजेक्ट आईडी को नोट कर लें. इसका इस्तेमाल आपको आगे के चरणों में करना होगा.
अगर नया प्रोजेक्ट बनाया जा रहा है, तो प्रोजेक्ट आईडी, बनाने वाले पेज पर प्रोजेक्ट के नाम के ठीक नीचे दिखेगा | |
अगर आपने पहले से ही कोई प्रोजेक्ट बनाया है, तो आपको Console के होम पेज पर प्रोजेक्ट की जानकारी वाले कार्ड में आईडी दिखेगा |
एपीआई चालू करना
Cloud Composer, Cloud Dataproc, और Cloud Storage एपीआई चालू करें.इनके चालू होने के बाद, "क्रेडेंशियल पर जाएं" बटन को अनदेखा करके, ट्यूटोरियल के अगले चरण पर जाएं. |
Composer एनवायरमेंट बनाना
नीचे दिए गए कॉन्फ़िगरेशन का इस्तेमाल करके, Cloud Composer एनवायरमेंट बनाएं:
अन्य सभी कॉन्फ़िगरेशन, डिफ़ॉल्ट रूप से बने रहेंगे. नीचे "बनाएं" पर क्लिक करें. |
Cloud Storage बकेट बनाना
इस कॉन्फ़िगरेशन के साथ अपने प्रोजेक्ट में, Cloud Storage बकेट बनाएं:
जब आप तैयार हों, तब "बनाएं" दबाएं |
3. Apache Airflow सेट अप करना
Composer के एनवायरमेंट की जानकारी देखना
GCP Console में, एनवायरमेंट पेज खोलें
एनवायरमेंट की जानकारी देखने के लिए, उसके नाम पर क्लिक करें.
परिवेश की जानकारी वाले पेज पर Airflow वेब इंटरफ़ेस का यूआरएल, Google Kubernetes Engine क्लस्टर आईडी, Cloud Storage बकेट का नाम, और /dags फ़ोल्डर के लिए पाथ जैसी जानकारी दी जाती है.
Airflow में, डीएजी (डायरेक्टेड एकाइक्लिक ग्राफ़) व्यवस्थित किए गए टास्क का कलेक्शन है, जिन्हें आपको शेड्यूल करके चलाना है. डीएजी को वर्कफ़्लो भी कहा जाता है. इन्हें स्टैंडर्ड Python फ़ाइलों में तय किया जाता है. Cloud Composer, सिर्फ़ /dags फ़ोल्डर में मौजूद डीएजी को शेड्यूल करता है. /dags फ़ोल्डर, उस Cloud Storage बकेट में होता है जो Cloud Composer, आपका एनवायरमेंट बनाने पर अपने-आप बनाता है.
Apache Airflow के एनवायरमेंट वैरिएबल सेट करना
Apache Airflow वैरिएबल, Airflow का एक खास कॉन्सेप्ट है, जो एनवायरमेंट वैरिएबल से अलग है. इस चरण में, ये तीन Airflow वैरिएबल सेट किए जाएंगे: gcp_project
, gcs_bucket
, और gce_zone
.
वैरिएबल सेट करने के लिए gcloud
का इस्तेमाल करना
सबसे पहले, अपना Cloud Shell खोलें. इसमें Cloud SDK टूल पहले से इंस्टॉल होता है.
एनवायरमेंट वैरिएबल COMPOSER_INSTANCE
को अपने कंपोज़र एनवायरमेंट के नाम पर सेट करें
COMPOSER_INSTANCE=my-composer-environment
gcloud कमांड-लाइन टूल का इस्तेमाल करके, Airflow वैरिएबल सेट करने के लिए, gcloud composer environments run
कमांड के साथ variables
सब-कमांड का इस्तेमाल करें. यह gcloud composer
निर्देश, Airflow CLI सब-कमांड variables
को लागू करता है. सब-कमांड, gcloud
कमांड लाइन टूल पर आर्ग्युमेंट पास करता है.
आपको यह कमांड तीन बार चलाना होगा. साथ ही, वैरिएबल को अपने प्रोजेक्ट के हिसाब से बदलना होगा.
<your-project-id> को उस प्रोजेक्ट आईडी से बदलें जो आपने दूसरे चरण में नोट किया था. यहां दिए गए निर्देश का इस्तेमाल करके, gcp_project
को सेट करें.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcp_project <your-project-id>
आपका आउटपुट कुछ ऐसा दिखेगा
kubeconfig entry generated for us-central1-my-composer-env-123abc-gke. Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc [2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449 [2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database). [2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor [2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied. [2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg [2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
<your-bucket-name>
को नीचे दिए गए निर्देश का इस्तेमाल करके, दूसरे चरण में नोट किए गए बकेट आईडी से बदलें. gcs_bucket
को सेट करें. अगर आपने हमारे सुझाव का पालन किया है, तो आपके बकेट का नाम वही होगा जो आपके प्रोजेक्ट आईडी का है. आपको पिछले निर्देश जैसा ही आउटपुट मिलेगा.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>
नीचे दिए गए निर्देश का इस्तेमाल करके, gce_zone
को सेट करें. आपको पिछले निर्देशों जैसा ही आउटपुट मिलेगा.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gce_zone us-central1-a
(ज़रूरी नहीं) किसी वैरिएबल को देखने के लिए gcloud
का इस्तेमाल करना
किसी वैरिएबल की वैल्यू देखने के लिए, get
आर्ग्युमेंट के साथ Airflow सीएलआई सब-कमांड variables
चलाएं या Airflow यूज़र इंटरफ़ेस (यूआई) का इस्तेमाल करें.
उदाहरण के लिए:
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --get gcs_bucket
आपने अभी जिन वैरिएबल को सेट किया है उनमें से किसी के भी साथ ऐसा किया जा सकता है: gcp_project
, gcs_bucket
, और gce_zone
.
4. वर्कफ़्लो का सैंपल
चलिए, पांचवें चरण में इस्तेमाल किए जाने वाले डीएजी के कोड पर नज़र डालते हैं. अब भी फ़ाइलें डाउनलोड करने की चिंता न करें, बस यहां जाएं.
यहां अनपैक करने के लिए बहुत कुछ है, इसलिए आइए इसे थोड़ा छोटा-बड़ा करें.
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
हम कुछ Airflow इंपोर्ट से शुरू करते हैं:
airflow.models
- इससे हमें Airflow डेटाबेस में डेटा ऐक्सेस करने और बनाने की अनुमति मिलती है.airflow.contrib.operators
- वह प्लैटफ़ॉर्म जहां कम्यूनिटी के ऑपरेटर मौजूद होते हैं. इस मामले में, हमें Cloud Dataproc API को ऐक्सेस करने के लिएdataproc_operator
की ज़रूरत है.airflow.utils.trigger_rule
- हमारे ऑपरेटर में ट्रिगर नियम जोड़ने के लिए. ट्रिगर नियमों की मदद से यह कंट्रोल किया जा सकता है कि किसी ऑपरेटर को उसके पैरंट स्टेटस के हिसाब से काम करना चाहिए या नहीं.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
इससे हमारी आउटपुट फ़ाइल की जगह की जानकारी मिलती है. यहां ध्यान देने लायक लाइन models.Variable.get('gcs_bucket')
है. इसकी मदद से, Airflow के डेटाबेस से gcs_bucket
वैरिएबल की वैल्यू हासिल की जा सकती है.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
WORDCOUNT_JAR
- उस .jar फ़ाइल की जगह जिसे आखिरकार हम Cloud Dataproc क्लस्टर पर चलाएंगे. यह आपके लिए पहले ही GCP पर होस्ट किया जा चुका है.input_file
- उस फ़ाइल की जगह जहां वह डेटा मौजूद है जिस पर हमारी Hadoop जॉब आखिर में कैलकुलेट करेगी. हम पांचवें चरण में, उस जगह पर डेटा को एक साथ अपलोड कर देंगे.wordcount_args
- ऐसे तर्क जिन्हें जार फ़ाइल में भेजा जाएगा.
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
यह हमें तारीख और पिछले दिन की आधी रात के बराबर का ऑब्जेक्ट देगा. उदाहरण के लिए, अगर यह 4 मार्च को 11:00 बजे लागू होता है, तो तारीख और समय ऑब्जेक्ट ऑब्जेक्ट, 3 मार्च को 00:00 दिखाएगा. यह इस बात पर निर्भर करता है कि Airflow, शेड्यूलिंग को कैसे हैंडल करता है. इस बारे में ज़्यादा जानकारी यहां मिल सकती है.
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
जब भी नया डीएजी बनाया जाता है, तो डिक्शनरी के तौर पर default_dag_args
वैरिएबल दिया जाना चाहिए:
'email_on_failure'
- यह बताता है कि टास्क के फ़ेल होने पर ईमेल अलर्ट भेजे जाएं या नहीं'email_on_retry'
- इससे पता चलता है कि टास्क को फिर से ट्रिगर करने पर, ईमेल सूचनाएं भेजी जानी चाहिए या नहीं'retries'
- इससे पता चलता है कि DAG न होने पर, Airflow को कितनी बार कोशिश करनी चाहिए'retry_delay'
- इससे पता चलता है कि फिर से कोशिश करने से पहले, Airflow को कितने समय तक इंतज़ार करना चाहिए'project_id'
- डीएजी को यह बताता है कि उसे किस GCP प्रोजेक्ट आईडी से जोड़ना है. इसकी ज़रूरत बाद में, Dataproc ऑपरेटर के साथ होगी
with models.DAG(
'composer_hadoop_tutorial',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
with models.DAG
का इस्तेमाल करने से, स्क्रिप्ट को उससे नीचे मौजूद सभी चीज़ों को एक ही डीएजी में शामिल करने के लिए कहा जाता है. हमें इसमें दिए गए तीन तर्क भी दिखते हैं:
- पहली स्ट्रिंग, उस डीएजी का नाम है जिसे हम बना रहे हैं. इस मामले में, हम
composer_hadoop_tutorial
का इस्तेमाल कर रहे हैं. schedule_interval
- एकdatetime.timedelta
ऑब्जेक्ट, जिसे हमने यहां एक दिन पर सेट किया है. इसका मतलब है कि यह डीएजी'default_dag_args'
में पहले सेट किए गए'start_date'
के बाद, दिन में एक बार एक्ज़ीक्यूट करने की कोशिश करेगाdefault_args
- पहले बनाई गई डिक्शनरी में DAG के डिफ़ॉल्ट आर्ग्युमेंट शामिल हैं
डेटाप्रॉक क्लस्टर बनाएं
इसके बाद, हम एक dataproc_operator.DataprocClusterCreateOperator
बनाएंगे, जो Cloud Dataproc क्लस्टर बनाता है.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
इस ऑपरेटर में, हमें कुछ तर्क दिखते हैं. हालांकि, इनमें से पहले सभी आर्ग्युमेंट इस ऑपरेटर के लिए खास होते हैं:
task_id
- BashOperator की तरह ही, यह वह नाम है जिसे हमने ऑपरेटर को असाइन किया है. इसे Airflow के यूज़र इंटरफ़ेस (यूआई) से देखा जा सकता हैcluster_name
- यह वह नाम है जो हम Cloud Dataproc क्लस्टर को असाइन करते हैं. यहां हमने इसेcomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
नाम दिया है (ज़्यादा जानकारी के लिए, जानकारी बॉक्स देखें)num_workers
- Cloud Dataproc क्लस्टर को असाइन किए गए वर्कर्स की संख्याzone
- वह भौगोलिक क्षेत्र जहां हमें क्लस्टर को सेव करना है. यह Airflow डेटाबेस में सेव किया गया है. इससे तीसरे चरण में सेट किए गए'gce_zone'
वैरिएबल को पढ़ा जाएगाmaster_machine_type
- वह मशीन टाइप जिसे हमें Cloud Dataproc के मास्टर को असाइन करना हैworker_machine_type
- वह मशीन टाइप जिसे हमें Cloud Dataproc वर्कर को असाइन करना है
Apache Hadoop जॉब सबमिट करें
dataproc_operator.DataProcHadoopOperator
की मदद से, हम Cloud Dataproc क्लस्टर में कोई जॉब सबमिट कर सकते हैं.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
हम कई पैरामीटर उपलब्ध कराते हैं:
task_id
- डीएजी के इस हिस्से को असाइन किया गया नामmain_jar
- उस .J फ़ाइल की जगह जिसे हम क्लस्टर के लिए चलाना चाहते हैंcluster_name
- उस क्लस्टर का नाम जिस पर जॉब चलाना है. आपको पता चलेगा कि यह नाम, पिछले ऑपरेटर में मिले नाम से मेल खाता हैarguments
- ऐसे आर्ग्युमेंट जो जार फ़ाइल में पास किए जाते हैं, जैसा कि कमांड लाइन से .jar फ़ाइल को एक्ज़ीक्यूट करने पर किया जाता है
क्लस्टर मिटाएं
आखिरी ऑपरेटर dataproc_operator.DataprocClusterDeleteOperator
होगा.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
जैसा कि नाम से पता चलता है, यह ऑपरेटर किसी दिए गए Cloud Dataproc क्लस्टर को मिटा देगा. हमें यहां तीन तर्क दिखते हैं:
task_id
- BashOperator की तरह ही, यह वह नाम है जिसे हमने ऑपरेटर को असाइन किया है. इसे Airflow के यूज़र इंटरफ़ेस (यूआई) से देखा जा सकता हैcluster_name
- वह नाम जो हम Cloud Dataproc क्लस्टर को असाइन करते हैं. यहां, हमने इसेcomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
नाम दिया है. ज़्यादा जानकारी के लिए, "Dataproc क्लस्टर बनाएं" के बाद मौजूद जानकारी बॉक्स देखेंtrigger_rule
- हमने इस चरण की शुरुआत में, इंपोर्ट के दौरान ट्रिगर नियमों के बारे में कम शब्दों में बताया था. हालांकि, यहां हमने एक नियम को लागू किया है. डिफ़ॉल्ट रूप से, Airflow ऑपरेटर तब तक लागू नहीं होता, जब तक उसके सभी अपस्ट्रीम ऑपरेटर काम नहीं कर लेते.ALL_DONE
के ट्रिगर के नियम के लिए सिर्फ़ यह ज़रूरी है कि सभी अपस्ट्रीम ऑपरेटर पूरे हुए हों, भले ही वे पूरे हुए हों या नहीं. इसका मतलब है कि Hadoop जॉब के फ़ेल होने के बावजूद, हम क्लस्टर को हटाना चाहते हैं.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
आखिर में, हम चाहते हैं कि ये ऑपरेटर किसी खास क्रम में लागू हों. इसके लिए, Python के बिटशिफ़्ट ऑपरेटर का इस्तेमाल किया जा सकता है. इस मामले में, create_dataproc_cluster
हमेशा पहले लागू होगा. इसके बाद, run_dataproc_hadoop
और आखिर में delete_dataproc_cluster
लागू होगा.
इन सभी को एक साथ मिलाकर, कोड कुछ ऐसा दिखेगा:
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# Output file for Cloud Dataproc job.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
'email_on_failure': False,
'email_on_retry': False,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
# [START composer_hadoop_schedule]
with models.DAG(
'composer_hadoop_tutorial',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_hadoop_schedule]
# Create a Cloud Dataproc cluster.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
# Give the cluster a unique name by appending the date scheduled.
# See https://airflow.apache.org/code.html#default-variables
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
# Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
# master node.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
# Delete Cloud Dataproc cluster.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted
# even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]
# [END composer_hadoop]
5. Airflow की फ़ाइलों को Cloud Storage में अपलोड करें
DAG को अपने /dags फ़ोल्डर में कॉपी करें
- सबसे पहले, अपना Cloud Shell खोलें. इसमें Cloud SDK टूल पहले से इंस्टॉल होता है.
- Python सैंपल रेपो को क्लोन करें और कंपोज़र/वर्कफ़्लो डायरेक्ट्री में बदलें
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
- अपने DAGs फ़ोल्डर के नाम को एनवायरमेंट वैरिएबल पर सेट करने के लिए, यह निर्देश चलाएं
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \ --location us-central1 --format="value(config.dagGcsPrefix)")
- ट्यूटोरियल का कोड उस जगह पर कॉपी करने के लिए, यहां दिया गया
gsutil
कमांड चलाएं जहां आपका /dags फ़ोल्डर बनाया गया है
gsutil cp hadoop_tutorial.py $DAGS_FOLDER
आपका आउटपुट कुछ ऐसा दिखेगा:
Copying file://hadoop_tutorial.py [Content-Type=text/x-python]... / [1 files][ 4.1 KiB/ 4.1 KiB] Operation completed over 1 objects/4.1 KiB.
6. Airflow यूज़र इंटरफ़ेस (यूआई) का इस्तेमाल करना
GCP कंसोल का इस्तेमाल करके, Airflow के वेब इंटरफ़ेस को ऐक्सेस करने के लिए:
|
Airflow के यूज़र इंटरफ़ेस (यूआई) के बारे में जानकारी के लिए, वेब इंटरफ़ेस ऐक्सेस करना देखें.
वैरिएबल देखें
आपने पहले जो वैरिएबल सेट किए थे वे आपके एनवायरमेंट में सेव रहते हैं. Airflow यूज़र इंटरफ़ेस (यूआई) मेन्यू बार से, एडमिन > वैरिएबल को चुनकर, वैरिएबल देखे जा सकते हैं.
डीएजी रन को एक्सप्लोर करें
जब DAG फ़ाइल को Cloud Storage के dags
फ़ोल्डर में अपलोड किया जाता है, तो Cloud Composer उस फ़ाइल को पार्स करता है. अगर कोई गड़बड़ी नहीं मिलती है, तो वर्कफ़्लो का नाम DAG सूची में दिखता है और वर्कफ़्लो को तुरंत चलाने के लिए सूची में जोड़ दिया जाता है. अपने डीएजी देखने के लिए, पेज पर सबसे ऊपर मौजूद डीएजी पर क्लिक करें.
डीएजी की ज़्यादा जानकारी वाला पेज खोलने के लिए, composer_hadoop_tutorial
पर क्लिक करें. इस पेज पर, वर्कफ़्लो टास्क और डिपेंडेंसी को ग्राफ़िक के तौर पर दिखाया गया है.
अब, टूलबार में ग्राफ़ व्यू पर क्लिक करें. इसके बाद, हर टास्क की स्थिति देखने के लिए, उसके ग्राफ़ पर कर्सर घुमाएं. ध्यान दें कि हर टास्क के चारों ओर मौजूद बॉर्डर से भी स्टेटस का पता चलता है. जैसे, हरे रंग का बॉर्डर = चल रहा है; लाल रंग का बॉर्डर = पूरा नहीं हुआ वगैरह.
ग्राफ़ व्यू से वर्कफ़्लो को फिर से चलाने के लिए:
- Airflow के यूज़र इंटरफ़ेस (यूआई) के ग्राफ़ व्यू में,
create_dataproc_cluster
ग्राफ़िक पर क्लिक करें. - तीन टास्क को रीसेट करने के लिए, मिटाएं पर क्लिक करें. इसके बाद, पुष्टि करने के लिए ठीक है पर क्लिक करें.
GCP Console के इन पेजों पर जाकर, composer-hadoop-tutorial
वर्कफ़्लो की स्थिति और नतीजे भी देखे जा सकते हैं:
- क्लस्टर बनाने और मिटाने की निगरानी करने के लिए, Cloud Dataproc क्लस्टर. ध्यान दें कि वर्कफ़्लो से बनाया गया क्लस्टर कुछ समय के लिए ही मौजूद रहता है: यह सिर्फ़ वर्कफ़्लो के दौरान मौजूद रहता है और वर्कफ़्लो के आखिरी टास्क के हिस्से के तौर पर मिट जाता है.
- Apache Hadoop वर्ड काउंट जॉब देखने या मॉनिटर करने के लिए, Cloud Dataproc जॉब. जॉब लॉग का आउटपुट देखने के लिए, जॉब आईडी पर क्लिक करें.
- Cloud Storage ब्राउज़र, ताकि आपने इस कोडलैब के लिए जो Cloud Storage बकेट बनाई है उसके
wordcount
फ़ोल्डर में, शब्दों की संख्या के नतीजे देख सकें.
7. साफ़-सफ़ाई सेवा
इस कोडलैब में इस्तेमाल किए गए संसाधनों के लिए, आपके GCP खाते पर शुल्क न लगे:
- (ज़रूरी नहीं) अपना डेटा सेव करने के लिए, Cloud Composer एनवायरमेंट के लिए बनाई गई Cloud Storage बकेट और इस कोडलैब के लिए बनाई गई स्टोरेज बकेट से डेटा डाउनलोड करें.
- इस कोडलैब के लिए बनाई गई Cloud Storage बकेट को मिटाएं.
- एनवायरमेंट के लिए Cloud Storage बकेट मिटाएं.
- Cloud Composer एनवायरमेंट को मिटाएं. ध्यान दें कि अपने एनवायरमेंट को मिटाने से, एनवायरमेंट का स्टोरेज बकेट नहीं मिटता.
आपके पास इस प्रोजेक्ट को मिटाने का भी विकल्प है:
- GCP Console में, प्रोजेक्ट पेज पर जाएं.
- प्रोजेक्ट की सूची में, वह प्रोजेक्ट चुनें जिसे आपको मिटाना है. इसके बाद, मिटाएं पर क्लिक करें.
- बॉक्स में प्रोजेक्ट आईडी टाइप करें. इसके बाद, प्रोजेक्ट मिटाने के लिए बंद करें पर क्लिक करें.