1. מבוא
תהליכי עבודה הם תרחיש לדוגמה נפוץ בניתוח נתונים – הם כוללים הטמעה, טרנספורמציה וניתוח של נתונים כדי למצוא את המידע המשמעותי שבתוכם. בפלטפורמת Google Cloud, הכלי לתזמור תהליכי עבודה הוא Cloud Composer, שהוא גרסה מתארחת של הכלי הפופולרי בקוד פתוח לתהליכי עבודה, Apache Airflow. במעבדה הזו תלמדו איך להשתמש ב-Cloud Composer כדי ליצור תהליך עבודה פשוט שיוצר אשכול של Cloud Dataproc, מנתח אותו באמצעות Cloud Dataproc ו-Apache Hadoop ולאחר מכן מחק את אשכול Cloud Dataproc.
מה זה Cloud Composer?
Cloud Composer הוא שירות מנוהל לתזמור תהליכי עבודה שמאפשר ליצור צינורות עיבוד נתונים, לתזמן אותם ולעקוב אחריהם משם. Cloud Composer, שמובנה על ידי פרויקט הקוד הפתוח Apache Airflow הפופולרי ומופעל באמצעות שפת התכנות Python, זמין ללא נעילה וקל לשימוש.
באמצעות Cloud Composer במקום במופע מקומי של Apache Airflow, המשתמשים יכולים ליהנות מהמיטב של Airflow ללא תקורת התקנה או ניהול.
מהו Apache Airflow?
Apache Airflow הוא כלי בקוד פתוח שמשמש ליצירה, לתזמון ולניטור של תהליכי עבודה באופן פרוגרמטי. יש כמה מונחים עיקריים שכדאי לזכור בקשר ל-Airflow שיוצגו במהלך שיעור ה-Lab:
- DAG – DAG (גרף אציקלי מכוון) הוא אוסף של משימות מאורגנות שרוצים לתזמן ולהריץ. DAG, שנקרא גם תהליכי עבודה, מוגדר בקובצי Python סטנדרטיים
- אופרטור – אופרטור מתאר משימה אחת בתהליך עבודה
מה זה Cloud Dataproc?
Cloud Dataproc הוא שירות מנוהל לחלוטין של Apache Spark ו-Apache Hadoop ב-Google Cloud Platform. Cloud Dataproc משתלב בקלות עם שירותי GCP אחרים, ומספק פלטפורמה חזקה ומלאה לעיבוד נתונים, לניתוח נתונים וללמידת מכונה.
מה תעשו
בשיעור הזה תלמדו איך ליצור ולהפעיל תהליך עבודה של Apache Airflow ב-Cloud Composer ב-Cloud Composer, ויבצע את המשימות הבאות:
- יצירת אשכול של Cloud Dataproc
- מריצה משימה של ספירת מילים של Apache Hadoop באשכול ומפיקה את התוצאות שלה ל-Cloud Storage
- מחיקה של האשכול
מה תלמדו
- איך יוצרים ומפעילים תהליך עבודה של Apache Airflow ב-Cloud Composer
- איך משתמשים ב-Cloud Composer וב-Cloud Dataproc כדי להריץ ניתוח על מערך נתונים
- איך ניגשים לסביבת Cloud Composer דרך מסוף Google Cloud Platform, Cloud SDK וממשק האינטרנט של Airflow
מה צריך להכין
- חשבון GCP
- ידע בסיסי ב-CLI
- הבנה בסיסית של Python
2. הגדרת GCP
יצירת הפרויקט
בוחרים פרויקט קיים או יוצרים פרויקט חדש ב-Google Cloud Platform.
חשוב לזכור את מזהה הפרויקט, שבו תשתמשו בשלבים הבאים.
אם אתם יוצרים פרויקט חדש, מזהה הפרויקט מופיע מתחת לשם הפרויקט בדף היצירה. | |
אם כבר יצרתם פרויקט, תוכלו למצוא את המזהה בדף הבית של המסוף בכרטיס 'פרטי הפרויקט' |
הפעלת ממשקי ה-API
מפעילים את Cloud Composer, Cloud Dataproc ו-Cloud Storage API. אחרי שהם מופעלים, אפשר להתעלם מהלחצן 'כניסה לדף פרטי הכניסה' ולהמשיך לשלב הבא במדריך. |
יצירת סביבת Composer
יוצרים סביבה של Cloud Composer עם ההגדרות הבאות:
כל שאר ההגדרות יכולות להישאר בברירת המחדל. לוחצים על 'יצירה' בחלק התחתון. |
יצירת קטגוריה של Cloud Storage
יוצרים קטגוריה ב-Cloud Storage בפרויקט עם ההגדרות הבאות:
כשהכול מוכן, אפשר ללחוץ על 'יצירה' |
3. הגדרת Apache Airflow
הצגת המידע על סביבת Composer
במסוף GCP, פותחים את הדף Environments (סביבות).
לוחצים על שם הסביבה כדי לראות את הפרטים שלה.
הדף Environment details כולל מידע כמו כתובת ה-URL של ממשק האינטרנט של Airflow, מזהה האשכול של Google Kubernetes Engine, שם הקטגוריה של Cloud Storage והנתיב של התיקייה /dags.
ב-Airflow, DAG (Directed Acyclic Graph) הוא אוסף של משימות מאורגנות שאתם רוצים לתזמן ולהפעיל. DAG, שנקרא גם תהליך עבודה, מוגדר בקובצי Python רגילים. Cloud Composer מתזמן את ה-DAG רק בתיקייה /dags. התיקייה /dags נמצאת בקטגוריה של Cloud Storage ש-Cloud Composer יוצר באופן אוטומטי כשיוצרים את הסביבה.
הגדרת משתני סביבה ב-Apache Airflow
משתני Apache Airflow הם קונספט ספציפי ל-Airflow ששונה ממשתני סביבה. בשלב הזה מגדירים את שלושת המשתנים של Airflow הבאים: gcp_project
, gcs_bucket
ו-gce_zone
.
שימוש ב-gcloud
להגדרת משתנים
קודם כול, פותחים את Cloud Shell, שבו Cloud SDK מותקן בצורה נוחה.
מגדירים את משתנה הסביבה COMPOSER_INSTANCE
לשם של סביבת Composer
COMPOSER_INSTANCE=my-composer-environment
כדי להגדיר משתני Airflow באמצעות כלי שורת הפקודה של gcloud, משתמשים בפקודה gcloud composer environments run
עם פקודת המשנה variables
. פקודת gcloud composer
הזו מפעילה את פקודת המשנה של ה-CLI של Airflow variables
. פקודת המשנה מעבירה את הארגומנטים לכלי שורת הפקודה gcloud
.
מריצים את הפקודה הזו שלוש פעמים, ומחליפים את המשתנים במשתנים הרלוונטיים לפרויקט.
מגדירים את gcp_project
באמצעות הפקודה הבאה, ומחליפים את <your-project-id> במזהה הפרויקט שציינתם בשלב 2.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcp_project <your-project-id>
הפלט שלכם ייראה בערך כך
kubeconfig entry generated for us-central1-my-composer-env-123abc-gke. Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc [2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449 [2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database). [2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor [2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied. [2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg [2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
מגדירים את gcs_bucket
באמצעות הפקודה הבאה, ומחליפים את <your-bucket-name>
במזהה הקטגוריה שציינתם בשלב 2. אם פעלתם לפי ההמלצה שלנו, שם הקטגוריה יהיה זהה למזהה הפרויקט. הפלט שלכם יהיה דומה לפקודה הקודמת.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>
מגדירים את gce_zone
באמצעות הפקודה הבאה. הפלט שלכם יהיה דומה לפקודות הקודמות.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gce_zone us-central1-a
(אופציונלי) שימוש ב-gcloud
כדי להציג משתנה
כדי לראות את הערך של משתנה, מריצים את פקודת המשנה של Airflow CLI variables
עם הארגומנט get
או משתמשים ב-ממשק המשתמש של Airflow.
לדוגמה:
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --get gcs_bucket
אפשר לעשות זאת עם כל אחד משלושת המשתנים שהגדרתם: gcp_project
, gcs_bucket
ו-gce_zone
.
4. תהליך עבודה לדוגמה
נסתכל על הקוד של DAG שבו נשתמש בשלב 5. אל תדאגו, עדיין לא צריך להוריד קבצים. פשוט פועלים לפי ההוראות הבאות.
יש כאן הרבה דברים שצריך להסביר, אז נסביר קצת.
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
נתחיל עם כמה ייבוא של Airflow:
airflow.models
– מאפשרת לנו לגשת לנתונים במסד הנתונים של Airflow וליצור נתונים בו.airflow.contrib.operators
– המיקום שבו נמצאים המפעילים מהקהילה. במקרה כזה, אנחנו צריכים אתdataproc_operator
כדי לגשת ל-Cloud Dataproc API.airflow.utils.trigger_rule
– להוספת כללי טריגר לאופרטורים שלנו. כללי טריגר מאפשרים לשלוט באופן מדויק אם אופרטורים צריכים לפעול בהתאם לסטטוס של ההורים שלהם.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
מציין את המיקום של קובץ הפלט שלנו. השורה הבולטת כאן היא models.Variable.get('gcs_bucket')
, שתאחזר את ערך המשתנה gcs_bucket
ממסד הנתונים של Airflow.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
WORDCOUNT_JAR
– מיקום קובץ ה- .jar שבסופו של דבר נריץ באשכול Cloud Dataproc. הוא כבר מתארח ב-GCP בשבילך.input_file
– המיקום של הקובץ שמכיל את הנתונים שעליהם תתבצע בסופו של דבר החישוב של המשימה ב-Hadoop. נעלה את הנתונים למיקום הזה ביחד בשלב 5.wordcount_args
– ארגומנטים שנעביר לקובץ ה-jar.
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
הפעולה הזו תספק לנו אובייקט עם תאריך ושעה מקבילים שמייצג את חצות ביום הקודם. לדוגמה, אם הפונקציה הזו תבוצע בשעה 11:00 ב-4 במרץ, אובייקט תאריך-שעה יציג את השעה 00:00 ב-3 במרץ. הבעיה הזו קשורה לאופן שבו Airflow מטפל בתזמון. מידע נוסף זמין כאן.
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
בכל פעם שיוצרים DAG חדש, יש לספק את המשתנה default_dag_args
בצורת מילון:
'email_on_failure'
– מציין אם צריך לשלוח התראות באימייל כשמשימה נכשלה'email_on_retry'
– מציין אם צריך לשלוח התראות באימייל כשמתבצע ניסיון חוזר של משימה'retries'
– מציין כמה ניסיונות חוזרים Airflow צריך לבצע במקרה של כשל ב-DAG'retry_delay'
– מציינת כמה זמן Airflow צריך להמתין לפני ניסיון חוזר'project_id'
– קובע את מזהה הפרויקט ב-GCP שאליו DAG ישויך. המזהה הזה יידרש בהמשך ל-Dataproc Operator.
with models.DAG(
'composer_hadoop_tutorial',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
השימוש ב-with models.DAG
מורה לסקריפט לכלול את כל מה שמתחתיו בתוך אותו DAG. אנחנו גם רואים שלושה ארגומנטים שהועברו:
- הראשון, מחרוזת, הוא השם שרוצים לתת ל-DAG שיצרנו. במקרה הזה, נשתמש ב-
composer_hadoop_tutorial
. schedule_interval
- אובייקטdatetime.timedelta
, שכאן הגדרנו ליום אחד. המשמעות היא שה-DAG הזה ינסה לפעול פעם ביום אחרי ה-'start_date'
שהוגדר קודם לכן ב-'default_dag_args'
default_args
- המילון שיצרנו קודם שמכיל את ארגומנטים ברירת המחדל של ה-DAG
יצירת אשכול Dataproc
בשלב הבא ניצור dataproc_operator.DataprocClusterCreateOperator
שיוצר אשכול של Cloud Dataproc.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
האופרטור הזה כולל כמה ארגומנטים, והם כולם ספציפיים לאופרטור הזה, מלבד הראשון:
task_id
– בדומה ל-BashOperator, זהו השם שאנחנו מקצים למפעיל, שאפשר לראות בממשק המשתמש של Airflow.cluster_name
– השם שאנחנו מקצים לאשכול Cloud Dataproc. כאן שינינו את השםcomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
(מידע אופציונלי נוסף בתיבת המידע)num_workers
– מספר העובדים שאנחנו מקצים לאשכול Cloud Dataproczone
– האזור הגיאוגרפי שבו רוצים שהאשכול ימוקם, כפי שהוא שמור במסד הנתונים של Airflow. הפקודה הזו תקריא את המשתנה'gce_zone'
שהגדרתם בשלב 3.master_machine_type
– סוג המכונה שאנחנו רוצים להקצות למאסטר של Cloud Dataprocworker_machine_type
– סוג המכונה שאנחנו רוצים להקצות ל-Cloud Dataproc
שליחת משימת Apache Hadoop
השדה dataproc_operator.DataProcHadoopOperator
מאפשר לנו לשלוח משימה לאשכול של Cloud Dataproc.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
אנחנו מספקים כמה פרמטרים:
task_id
- השם שאנחנו מקצים לחלק הזה ב-DAGmain_jar
– המיקום של קובץ ה- .jar שאנחנו רוצים להריץ מול האשכולcluster_name
– שם האשכול שעליו רוצים להריץ את המשימה, שתבחינו בו זהה לשם שמצאנו באופרטור הקודםarguments
– ארגומנטים שמועברים לקובץ ה-jar, כמו שאתם עושים כשמריצים את קובץ ה-jar.jar משורת הפקודה
מחיקת האשכול
האופרטור האחרון שיצרנו הוא dataproc_operator.DataprocClusterDeleteOperator
.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
כפי שרואים מהשם, האופרטור הזה ימחק אשכול נתון של Cloud Dataproc. כאן מוצגים שלושה ארגומנטים:
task_id
– בדומה ל-BashOperator, זהו השם שאנחנו מקצים למפעיל, שאפשר לראות בממשק המשתמש של Airflow.cluster_name
– השם שאנחנו מקצים לאשכול Cloud Dataproc. כאן הענקנו לו את השםcomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
(למידע נוסף, אפשר לעיין בתיבת המידע אחרי 'יצירת אשכול Dataproc'trigger_rule
– הזכרנו את כללי הטריגר בקצרה במהלך הייבוא בתחילת השלב הזה, אבל כאן אנחנו רואים כלל אחד בפעולה. כברירת מחדל, מפעיל Airflow לא יתבצע אלא אם כל האופרטורים שלו ב-upstream השלימו את התהליך. כלל הטריגרALL_DONE
רק מחייב שכל האופרטורים ב-upstream ישלימו את התהליך, גם אם הם לא הצליחו. כלומר, גם אם משימת Hadoop נכשלה, אנחנו עדיין רוצים לנתק את האשכול.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
לסיום, אנחנו רוצים שהאופרטורים האלה יופעלו בסדר מסוים, ונוכל לציין זאת באמצעות שימוש באופרטורים של העברת סיביות (bitshift) ב-Python. במקרה הזה, הפקודה create_dataproc_cluster
תמיד תבוצע קודם, אחריה run_dataproc_hadoop
ולבסוף delete_dataproc_cluster
.
כשמרכזים את כל החלקים, הקוד נראה כך:
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# Output file for Cloud Dataproc job.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
'email_on_failure': False,
'email_on_retry': False,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
# [START composer_hadoop_schedule]
with models.DAG(
'composer_hadoop_tutorial',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_hadoop_schedule]
# Create a Cloud Dataproc cluster.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
# Give the cluster a unique name by appending the date scheduled.
# See https://airflow.apache.org/code.html#default-variables
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
# Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
# master node.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
# Delete Cloud Dataproc cluster.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted
# even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]
# [END composer_hadoop]
5. העלאת קבצים מ-Airflow ל-Cloud Storage
העתקת ה-DAG לתיקייה /dags
- קודם כול, פותחים את Cloud Shell, שבו Cloud SDK מותקן בצורה נוחה.
- משכפלים את המאגר של דוגמאות Python ועוברים לספרייה composer/workflows
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
- מריצים את הפקודה הבאה כדי להגדיר את השם של תיקיית ה-DAG למשתנה סביבה
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \ --location us-central1 --format="value(config.dagGcsPrefix)")
- מריצים את הפקודה הבאה של
gsutil
כדי להעתיק את הקוד של המדריך למקום שבו נוצרת התיקייה /dags
gsutil cp hadoop_tutorial.py $DAGS_FOLDER
הפלט שלכם ייראה בערך כך:
Copying file://hadoop_tutorial.py [Content-Type=text/x-python]... / [1 files][ 4.1 KiB/ 4.1 KiB] Operation completed over 1 objects/4.1 KiB.
6. שימוש בממשק המשתמש של Airflow
כדי לגשת לממשק האינטרנט של Airflow באמצעות מסוף GCP:
|
מידע על ממשק המשתמש של Airflow זמין במאמר גישה לממשק האינטרנט.
הצגת המשתנים
המשתנים שהגדרתם קודם נשארים בסביבה. כדי להציג את המשתנים, בוחרים באפשרות אדמין > משתנים בסרגל התפריטים של ממשק המשתמש של Airflow.
סקירה כללית של הפעלות DAG
כשאתם מעלים את קובץ ה-DAG לתיקייה dags
ב-Cloud Storage, Cloud Composer מנתח את הקובץ. אם לא יתגלו שגיאות, שם תהליך העבודה יופיע ברשימת ה-DAG ותהליך העבודה יתווסף לתור להרצה מיידית. כדי להציג את ה-DAGs, לוחצים על DAGs בחלק העליון של הדף.
לוחצים על composer_hadoop_tutorial
כדי לפתוח את דף פרטי ה-DAG. הדף הזה כולל ייצוג גרפי של משימות של תהליך עבודה ויחסי תלות.
עכשיו, בסרגל הכלים, לוחצים על תצוגת תרשים ואז מעבירים את העכבר מעל התרשים של כל משימה כדי לראות את הסטטוס שלה. לתשומת ליבכם: הגבול מסביב לכל משימה מציין גם את הסטטוס (גבול ירוק = פועל, אדום = נכשל וכו').
כדי להפעיל שוב את תהליך העבודה מתצוגת התרשים:
- בתצוגת התרשים של ממשק המשתמש של Airflow, לוחצים על הגרפיקה
create_dataproc_cluster
. - לוחצים על ניקוי כדי לאפס את שלוש המשימות ואז לוחצים על אישור כדי לאשר.
אפשר גם לבדוק את הסטטוס והתוצאות של תהליך העבודה של composer-hadoop-tutorial
בדפים הבאים במסוף GCP:
- אשכולות Cloud Dataproc כדי לעקוב אחרי יצירת אשכולות ומחיקה שלהם. חשוב לזכור שהאשכול שנוצר על ידי תהליך העבודה הוא זמני: הוא קיים רק למשך תהליך העבודה, ונמחק כחלק מהמשימה האחרונה בתהליך העבודה.
- Cloud Dataproc Jobs כדי להציג או לעקוב אחרי המשימה של ספירת המילים ב-Apache Hadoop. לוחצים על מזהה המשימה כדי לראות את הפלט של יומן המשימות.
- Cloud Storage Browser כדי לראות את תוצאות ספירת המילים בתיקייה
wordcount
בקטגוריה של Cloud Storage שיצרתם לקודלאב הזה.
7. הסרת המשאבים
כדי להימנע מצבירת חיובים בחשבון GCP עבור המשאבים שבהם נעשה שימוש ב-Codelab הזה:
- (אופציונלי) כדי לשמור את הנתונים, מורידים את הנתונים מהקטגוריה של Cloud Storage לסביבת Cloud Composer ומקטגוריית האחסון שיצרתם לקודלאב הזה.
- מוחקים את הקטגוריה של Cloud Storage שיצרתם בשביל ה-Codelab הזה.
- מחיקת הקטגוריה של Cloud Storage של הסביבה.
- מחיקה של סביבת Cloud Composer. חשוב לזכור שמחיקת הסביבה לא מוחקת את קטגוריית האחסון של הסביבה.
אפשר גם למחוק את הפרויקט:
- במסוף GCP, עוברים לדף Projects.
- ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על Delete.
- כדי למחוק את הפרויקט, כותבים את מזהה הפרויקט בתיבה ולוחצים על Shut down.