1. مقدمه
گردش کار یک مورد رایج در تجزیه و تحلیل داده ها است - آنها شامل دریافت، تبدیل و تجزیه و تحلیل داده ها برای یافتن اطلاعات معنی دار درون آن هستند. در Google Cloud Platform، ابزار هماهنگسازی گردشهای کاری، Cloud Composer است که نسخه میزبانی شده از ابزار محبوب گردش کار منبع باز Apache Airflow است. در این آزمایشگاه، از Cloud Composer برای ایجاد یک گردش کار ساده استفاده خواهید کرد که یک کلاستر Cloud Dataproc ایجاد می کند، آن را با استفاده از Cloud Dataproc و Apache Hadoop تجزیه و تحلیل می کند، سپس خوشه Cloud Dataproc را حذف می کند.
Cloud Composer چیست؟
Cloud Composer یک سرویس هماهنگسازی گردش کار کاملاً مدیریتشده است که به شما امکان میدهد تا خطوط لولهای را که در ابرها و مراکز داده در محل پخش میشوند، نویسنده، زمانبندی و نظارت کنید. Cloud Composer که بر اساس پروژه منبع باز محبوب Apache Airflow ساخته شده و با استفاده از زبان برنامه نویسی پایتون کار می کند، بدون قفل و آسان برای استفاده است.
با استفاده از Cloud Composer به جای نمونه محلی Apache Airflow، کاربران می توانند از بهترین جریان هوا بدون هزینه نصب یا مدیریت بهره ببرند.
Apache Airflow چیست؟
Apache Airflow یک ابزار منبع باز است که برای برنامهنویسی، برنامهریزی و نظارت بر گردشهای کاری استفاده میشود. چند اصطلاح کلیدی در رابطه با جریان هوا وجود دارد که در سراسر آزمایشگاه خواهید دید:
- DAG - یک DAG (Directed Acyclic Graph) مجموعه ای از وظایف سازمان یافته است که می خواهید برنامه ریزی و اجرا کنید. DAG ها که گردش کار نیز نامیده می شوند، در فایل های استاندارد پایتون تعریف شده اند
- اپراتور - یک اپراتور یک کار واحد را در یک گردش کار توصیف می کند
Cloud Dataproc چیست؟
Cloud Dataproc سرویس کاملاً مدیریت شده Apache Spark و Apache Hadoop پلتفرم ابری Google است. Cloud Dataproc به راحتی با سایر سرویس های GCP ادغام می شود و یک پلت فرم قدرتمند و کامل برای پردازش داده ها، تجزیه و تحلیل و یادگیری ماشین به شما ارائه می دهد.
کاری که خواهی کرد
این کد لبه به شما نشان می دهد که چگونه یک گردش کاری Apache Airflow را در Cloud Composer ایجاد و اجرا کنید که وظایف زیر را تکمیل می کند:
- یک خوشه Cloud Dataproc ایجاد می کند
- یک کار Apache Hadoop wordcount را روی خوشه اجرا می کند و نتایج آن را به Cloud Storage می دهد.
- خوشه را حذف می کند
چیزی که یاد خواهید گرفت
- نحوه ایجاد و اجرای جریان کاری Apache Airflow در Cloud Composer
- نحوه استفاده از Cloud Composer و Cloud Dataproc برای اجرای یک تجزیه و تحلیل بر روی یک مجموعه داده
- نحوه دسترسی به محیط Cloud Composer خود از طریق کنسول Google Cloud Platform، Cloud SDK و رابط وب Airflow
آنچه شما نیاز دارید
- حساب GCP
- دانش اولیه CLI
- درک اولیه پایتون
2. راه اندازی GCP
پروژه را ایجاد کنید
یک پروژه Google Cloud Platform را انتخاب یا ایجاد کنید.
شناسه پروژه خود را یادداشت کنید که در مراحل بعدی از آن استفاده خواهید کرد.
اگر در حال ایجاد یک پروژه جدید هستید، شناسه پروژه درست در زیر نام پروژه در صفحه ایجاد پیدا می شود | |
اگر قبلاً پروژه ای ایجاد کرده اید، می توانید شناسه را در صفحه اصلی کنسول در کارت اطلاعات پروژه پیدا کنید |
API ها را فعال کنید
APIهای Cloud Composer، Cloud Dataproc و Cloud Storage را فعال کنید . پس از فعال شدن، میتوانید دکمه «رفتن به اعتبارنامهها» را نادیده بگیرید و به مرحله بعدی آموزش بروید. |
ایجاد محیط آهنگساز
یک محیط Cloud Composer با پیکربندی زیر ایجاد کنید :
تمام تنظیمات دیگر می توانند در حالت پیش فرض خود باقی بمانند. روی "ایجاد" در پایین کلیک کنید. |
ایجاد سطل ذخیره سازی ابری
در پروژه خود، یک سطل ذخیره سازی ابری با پیکربندی زیر ایجاد کنید :
وقتی آماده شدید، «ایجاد» را فشار دهید |
3. راه اندازی Apache Airflow
مشاهده اطلاعات محیطی Composer
در کنسول GCP، صفحه Environments را باز کنید
روی نام محیط کلیک کنید تا جزئیات آن را ببینید.
صفحه جزئیات محیط ، اطلاعاتی مانند URL رابط وب جریان هوا، شناسه خوشه موتور Google Kubernetes، نام سطل فضای ذخیره سازی ابری، و مسیر پوشه /dags را ارائه می دهد.
در جریان هوا، DAG (Directed Acyclic Graph) مجموعه ای از وظایف سازمان یافته است که می خواهید برنامه ریزی و اجرا کنید. DAG ها که گردش کار نیز نامیده می شوند، در فایل های استاندارد پایتون تعریف شده اند. Cloud Composer فقط DAG ها را در پوشه /dags زمان بندی می کند. پوشه /dags در سطل Cloud Storage است که Cloud Composer به طور خودکار هنگام ایجاد محیط خود ایجاد می کند.
تنظیم متغیرهای محیط جریان هوای آپاچی
متغیرهای جریان هوای آپاچی یک مفهوم خاص جریان هوا هستند که از متغیرهای محیطی متمایز است. در این مرحله، سه متغیر جریان هوای زیر را تنظیم خواهید کرد: gcp_project
، gcs_bucket
و gce_zone
.
استفاده از gcloud
برای تنظیم متغیرها
ابتدا، Cloud Shell خود را باز کنید، که Cloud SDK به راحتی برای شما نصب شده است.
متغیر محیطی COMPOSER_INSTANCE
را به نام محیط Composer خود تنظیم کنید
COMPOSER_INSTANCE=my-composer-environment
برای تنظیم متغیرهای جریان هوا با استفاده از ابزار خط فرمان gcloud، از دستور gcloud composer environments run
با دستور فرعی variables
استفاده کنید. این دستور gcloud composer
variables
فرعی Airflow CLI را اجرا می کند. دستور فرعی آرگومان ها را به ابزار خط فرمان gcloud
منتقل می کند.
این دستور را سه بار اجرا میکنید و متغیرهای مربوط به پروژه را جایگزین میکنید.
gcp_project
با استفاده از دستور زیر تنظیم کنید و <your-project-id> را با ID پروژه ای که در مرحله 2 یادداشت کرده اید جایگزین کنید.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcp_project <your-project-id>
خروجی شما چیزی شبیه این خواهد بود
kubeconfig entry generated for us-central1-my-composer-env-123abc-gke. Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc [2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449 [2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database). [2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor [2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied. [2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg [2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
gcs_bucket
را با استفاده از دستور زیر تنظیم کنید، و <your-bucket-name>
با شناسه سطلی که در مرحله 2 یادداشت کردید جایگزین کنید. اگر از توصیه ما پیروی کردید، نام سطل شما با ID پروژه شما یکسان است. خروجی شما مشابه دستور قبلی خواهد بود.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>
gce_zone
با استفاده از دستور زیر تنظیم کنید. خروجی شما مشابه دستورات قبلی خواهد بود.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gce_zone us-central1-a
(اختیاری) استفاده از gcloud
برای مشاهده یک متغیر
برای دیدن مقدار یک متغیر، variables
فرعی Airflow CLI را با آرگومان get
اجرا کنید یا از Airflow UI استفاده کنید.
به عنوان مثال:
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --get gcs_bucket
می توانید این کار را با هر یک از سه متغیری که به تازگی تنظیم کرده اید انجام دهید: gcp_project
، gcs_bucket
و gce_zone
.
4. نمونه گردش کار
بیایید به کد DAG که در مرحله 5 استفاده خواهیم کرد نگاهی بیاندازیم. هنوز نگران دانلود فایل ها نباشید، فقط اینجا را دنبال کنید.
در اینجا چیزهای زیادی برای باز کردن وجود دارد، پس بیایید کمی آن را تجزیه کنیم.
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
ما با برخی از واردات Airflow شروع می کنیم:
-
airflow.models
- به ما امکان دسترسی و ایجاد داده ها در پایگاه داده Airflow را می دهد. -
airflow.contrib.operators
- جایی که اپراتورهای جامعه زندگی می کنند. در این مورد، ما بهdataproc_operator
برای دسترسی به Cloud Dataproc API نیاز داریم. -
airflow.utils.trigger_rule
- برای اضافه کردن قوانین ماشه به اپراتورهای ما. قوانین ماشه کنترل دقیقی را در مورد اینکه آیا یک اپراتور باید در رابطه با وضعیت والدین خود اجرا کند یا خیر اجازه می دهد.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
این محل فایل خروجی ما را مشخص می کند. خط قابل توجه در اینجا models.Variable.get('gcs_bucket')
است که مقدار متغیر gcs_bucket
را از پایگاه داده Airflow می گیرد.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
-
WORDCOUNT_JAR
- محل فایل jar. که در نهایت در کلاستر Cloud Dataproc اجرا خواهیم کرد. قبلاً در GCP برای شما میزبانی شده است. -
input_file
- محل فایل حاوی دادههایی است که کار Hadoop ما در نهایت روی آن محاسبه خواهد شد. ما داده ها را با هم در مرحله 5 در آن مکان آپلود می کنیم. -
wordcount_args
- آرگومان هایی که به فایل jar منتقل می کنیم.
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
این به ما یک شیء datetime معادل می دهد که نشان دهنده نیمه شب روز قبل است. به عنوان مثال، اگر این در ساعت 11:00 روز 4 مارس اجرا شود، شی datetime نشان دهنده 00:00 در 3 مارس خواهد بود. این به نحوه مدیریت جریان هوا در زمانبندی مربوط میشود. اطلاعات بیشتر در مورد آن را می توان در اینجا یافت.
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
متغیر default_dag_args
در قالب یک فرهنگ لغت باید هر زمان که یک DAG جدید ایجاد می شود ارائه شود:
-
'email_on_failure'
- نشان می دهد که آیا هشدارهای ایمیل باید در صورت شکست کاری ارسال شوند یا خیر -
'email_on_retry'
- نشان می دهد که آیا هشدارهای ایمیل باید هنگام تلاش مجدد ارسال شوند یا خیر -
'retries'
- نشان می دهد که در صورت خرابی DAG، Airflow باید چند بار تلاش مجدد انجام دهد -
'retry_delay'
- نشان دهنده مدت زمانی است که جریان هوا قبل از تلاش مجدد باید منتظر بماند -
'project_id'
- به DAG میگوید که چه شناسه پروژه GCP با آن مرتبط شود، که بعداً با اپراتور Dataproc مورد نیاز خواهد بود.
with models.DAG(
'composer_hadoop_tutorial',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
با استفاده with models.DAG
به اسکریپت میگوید که همه چیز زیر آن را در داخل همان DAG قرار دهد. ما همچنین شاهد سه استدلال هستیم که در:
- اولین، یک رشته، نامی است که DAG را ایجاد می کنیم. در این مورد، ما از
composer_hadoop_tutorial
استفاده می کنیم. -
schedule_interval
- یک شیdatetime.timedelta
که در اینجا آن را روی یک روز تنظیم کرده ایم. این بدان معنی است که این DAG یک بار در روز پس از'start_date'
که قبلاً در'default_dag_args'
تنظیم شده بود، تلاش خواهد کرد. -
default_args
- دیکشنری که قبلا ایجاد کردیم حاوی آرگومان های پیش فرض برای DAG است
یک خوشه Dataproc ایجاد کنید
بعد، ما یک dataproc_operator.DataprocClusterCreateOperator
ایجاد می کنیم که یک Cloud Dataproc Cluster ایجاد می کند.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
در این عملگر، چند آرگومان می بینیم که همه آنها به جز اولی مختص این عملگر است:
-
task_id
- درست مانند BashOperator، این نامی است که ما به اپراتور اختصاص می دهیم که از رابط کاربری Airflow قابل مشاهده است. -
cluster_name
- نامی که به Cloud Dataproc اختصاص می دهیم. در اینجا، نام آن راcomposer-hadoop-tutorial-cluster-
گذاشتهایم (برای اطلاعات اضافی اختیاری به کادر اطلاعات مراجعه کنید) -
num_workers
- تعداد کارگرانی که به کلاستر Cloud Dataproc اختصاص می دهیم -
zone
- منطقه جغرافیایی که می خواهیم خوشه در آن زندگی کند، همانطور که در پایگاه داده Airflow ذخیره شده است. این متغیر'gce_zone'
که در مرحله 3 تنظیم کردیم، می خواند -
master_machine_type
- نوع ماشینی که می خواهیم به Cloud Dataproc master اختصاص دهیم. -
worker_machine_type
- نوع ماشینی که می خواهیم به کارگر Cloud Dataproc اختصاص دهیم.
یک شغل Apache Hadoop ارسال کنید
dataproc_operator.DataProcHadoopOperator
به ما اجازه می دهد تا یک کار را به یک Cloud Dataproc Cluster ارسال کنیم.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
ما چندین پارامتر را ارائه می دهیم:
-
task_id
- نامی که به این قطعه از DAG اختصاص می دهیم -
main_jar
- محل فایل jar که می خواهیم در مقابل خوشه اجرا شود -
cluster_name
- نام خوشه برای اجرای کار، که متوجه خواهید شد با آنچه در عملگر قبلی پیدا کردیم یکسان است. -
arguments
- آرگومان هایی که به فایل jar منتقل می شوند، همانطور که اگر فایل jar. را از خط فرمان اجرا می کنید.
Cluster را حذف کنید
آخرین اپراتور که ایجاد خواهیم کرد dataproc_operator.DataprocClusterDeleteOperator
است.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
همانطور که از نام آن پیداست، این اپراتور یک Cloud Dataproc داده شده را حذف می کند. ما در اینجا سه استدلال می بینیم:
-
task_id
- درست مانند BashOperator، این نامی است که ما به اپراتور اختصاص می دهیم که از رابط کاربری Airflow قابل مشاهده است. -
cluster_name
- نامی که به Cloud Dataproc اختصاص می دهیم. در اینجا، نام آن راcomposer-hadoop-tutorial-cluster-
گذاشتهایم (برای اطلاعات اضافی اختیاری، کادر اطلاعات را بعد از «ایجاد یک خوشه Dataproc» ببینید) -
trigger_rule
- در ابتدای این مرحله به طور مختصر به Rules Trigger در هنگام واردات اشاره کردیم، اما در اینجا یک مورد در عمل داریم. بهطور پیشفرض، یک اپراتور Airflow اجرا نمیشود مگر اینکه همه اپراتورهای بالادست آن با موفقیت تکمیل شوند. قانون ماشهALL_DONE
فقط مستلزم آن است که تمام اپراتورهای بالادستی بدون در نظر گرفتن موفقیت یا عدم موفقیت آنها را تکمیل کرده باشند. در اینجا این بدان معنی است که حتی اگر کار Hadoop شکست بخورد، ما همچنان می خواهیم خوشه را از بین ببریم.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
در نهایت، ما میخواهیم این عملگرها به ترتیب خاصی اجرا شوند و میتوانیم با استفاده از عملگرهای bitshift پایتون آن را نشان دهیم. در این حالت، create_dataproc_cluster
همیشه ابتدا اجرا می شود، سپس run_dataproc_hadoop
و در نهایت delete_dataproc_cluster
.
با کنار هم گذاشتن همه اینها، کد به شکل زیر است:
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# Output file for Cloud Dataproc job.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
'email_on_failure': False,
'email_on_retry': False,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
# [START composer_hadoop_schedule]
with models.DAG(
'composer_hadoop_tutorial',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_hadoop_schedule]
# Create a Cloud Dataproc cluster.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
# Give the cluster a unique name by appending the date scheduled.
# See https://airflow.apache.org/code.html#default-variables
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
# Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
# master node.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
# Delete Cloud Dataproc cluster.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted
# even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]
# [END composer_hadoop]
5. فایل های جریان هوا را در فضای ذخیره سازی ابری آپلود کنید
DAG را در پوشه /dags خود کپی کنید
- ابتدا، Cloud Shell خود را باز کنید، که Cloud SDK به راحتی برای شما نصب شده است.
- مخزن نمونه پایتون را کلون کنید و به دایرکتوری composer/workflows تغییر دهید
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
- دستور زیر را اجرا کنید تا نام پوشه DAGs خود را روی یک متغیر محیطی تنظیم کنید
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \ --location us-central1 --format="value(config.dagGcsPrefix)")
- دستور
gsutil
زیر را اجرا کنید تا کد آموزشی را در جایی که پوشه /dags شما ایجاد شده است کپی کنید
gsutil cp hadoop_tutorial.py $DAGS_FOLDER
خروجی شما چیزی شبیه به این خواهد بود:
Copying file://hadoop_tutorial.py [Content-Type=text/x-python]... / [1 files][ 4.1 KiB/ 4.1 KiB] Operation completed over 1 objects/4.1 KiB.
6. استفاده از رابط کاربری Airflow
برای دسترسی به رابط وب Airflow با استفاده از کنسول GCP:
|
برای اطلاعات در مورد رابط کاربری Airflow، به دسترسی به رابط وب مراجعه کنید.
مشاهده متغیرها
متغیرهایی که قبلا تنظیم کرده اید در محیط شما باقی می مانند. با انتخاب Admin > Variables از نوار منو Airflow UI می توانید متغیرها را مشاهده کنید.
کاوش در اجراهای DAG
وقتی فایل DAG خود را در پوشه dags
در Cloud Storage آپلود می کنید، Cloud Composer فایل را تجزیه می کند. اگر خطایی پیدا نشد، نام گردش کار در لیست DAG ظاهر می شود و گردش کار در صف قرار می گیرد تا بلافاصله اجرا شود. برای مشاهده DAG های خود، روی DAG ها در بالای صفحه کلیک کنید.
روی composer_hadoop_tutorial
کلیک کنید تا صفحه جزئیات DAG باز شود. این صفحه شامل یک نمایش گرافیکی از وظایف و وابستگی های گردش کار است.
اکنون در نوار ابزار، روی Graph View کلیک کنید و سپس ماوس را روی گرافیک برای هر کار قرار دهید تا وضعیت آن را ببینید. توجه داشته باشید که حاشیه اطراف هر وظیفه نیز وضعیت را نشان می دهد (حاشیه سبز = در حال اجرا؛ قرمز = ناموفق و غیره).
برای اجرای مجدد گردش کار از نمای نمودار :
- در نمای نمودار رابط کاربری Airflow، روی گرافیک
create_dataproc_cluster
کلیک کنید. - برای بازنشانی سه کار روی Clear کلیک کنید و سپس برای تایید روی OK کلیک کنید.
همچنین می توانید با رفتن به صفحات کنسول GCP زیر وضعیت و نتایج گردش کار composer-hadoop-tutorial
را بررسی کنید:
- Cloud Dataproc Cluster برای نظارت بر ایجاد و حذف خوشه. توجه داشته باشید که خوشه ایجاد شده توسط گردش کار زودگذر است: فقط برای مدت زمان گردش کار وجود دارد و به عنوان بخشی از آخرین کار گردش کار حذف می شود.
- Cloud Dataproc Jobs برای مشاهده یا نظارت بر کار شمارش کلمات Apache Hadoop. برای مشاهده خروجی گزارش کار روی شناسه شغل کلیک کنید.
- مرورگر Cloud Storage برای مشاهده نتایج تعداد کلمات در پوشه
wordcount
در سطل Cloud Storage که برای این Codelab ایجاد کردهاید.
7. پاکسازی
برای جلوگیری از تحمیل هزینه به حساب GCP خود برای منابع استفاده شده در این کد لبه:
- (اختیاری) برای ذخیره داده های خود، داده ها را از سطل Cloud Storage برای محیط Cloud Composer و سطل ذخیره سازی که برای این Codelab ایجاد کرده اید دانلود کنید .
- سطل Cloud Storage را که برای این Codelab ایجاد کرده اید حذف کنید .
- سطل Cloud Storage برای محیط را حذف کنید .
- محیط Cloud Composer را حذف کنید . توجه داشته باشید که حذف محیط شما، سطل ذخیره سازی محیط را حذف نمی کند.
همچنین می توانید به صورت اختیاری پروژه را حذف کنید:
- در کنسول GCP، به صفحه پروژه ها بروید.
- در لیست پروژه، پروژه ای را که می خواهید حذف کنید انتخاب کنید و روی Delete کلیک کنید.
- در کادر، ID پروژه را تایپ کنید و سپس بر روی Shut down کلیک کنید تا پروژه حذف شود.