1. مقدمة
تُعدّ سير العمل حالات استخدام شائعة في تحليلات البيانات، إذ تتضمن نقل البيانات وتحويلها وتحليلها للعثور على المعلومات المفيدة فيها. في Google Cloud Platform، أداة تنسيق مهام سير العمل هي Cloud Composer، وهي إصدار مستضاف من أداة سير العمل المفتوحة المصدر الشائعة Apache Airflow. في هذا التمرين، ستستخدم Cloud Composer لإنشاء سير عمل بسيط يؤدي إلى إنشاء مجموعة Cloud Dataproc وتحليلها باستخدام Cloud Dataproc وApache Hadoop، ثم حذف مجموعة Cloud Dataproc بعد ذلك.
ما هو Cloud Composer؟
Cloud Composer هي خدمة لإدارة سير العمل مُدارة بالكامل، وتتيح لك إنشاء وجدولة ومراقبة مسارات التعلّم التي تشمل السحابة الإلكترونية ومراكز البيانات داخل الشركة. تم إنشاء Cloud Composer استنادًا إلى مشروع Apache Airflow المفتوح المصدر الشهير ويتم تشغيله باستخدام لغة برمجة Python، وهو سهل الاستخدام وخالٍ من القيود.
باستخدام Cloud Composer بدلاً من نسخة محلية من Apache Airflow، يمكن للمستخدمين الاستفادة من أفضل ميزات Airflow بدون أيّ تكاليف إضافية للتثبيت أو الإدارة.
ما هو Apache Airflow؟
Apache Airflow هو أداة مفتوحة المصدر تُستخدم للمؤلف البرمجي وجدولة المهام ومراقبتها آليًا. هناك بعض المصطلحات الرئيسية التي يجب تذكرها فيما يتعلق بـ Airflow، والتي ستراها خلال التمرين المعملي:
- مخطّط سير العمل (DAG): مخطّط سير العمل (DAG) هو مجموعة من المهام المنظَّمة التي تريد جدولتها وتنفيذها. يتم تعريف مخططات DAG، المعروفة أيضًا باسم سير العمل، في ملفات بايثون القياسية
- عامل التشغيل: يصف عامل تشغيل مهمة واحدة في سير عمل
ما هي Cloud Dataproc؟
Cloud Dataproc هي خدمة Apache Spark وApache Hadoop المُدارة بالكامل من Google Cloud Platform. يمكن دمج Cloud Dataproc بسهولة مع خدمات GCP الأخرى، ما يمنحك منصة فعّالة وكاملة لمعالجة البيانات والإحصاءات وتعلُّم الآلة.
ما ستفعله
يعرض لك هذا الدرس التطبيقي كيفية إنشاء سير عمل Apache Airflow وتشغيله في Cloud Composer والذي يُكمل المهام التالية:
- إنشاء مجموعة Cloud Dataproc
- تشغيل مهمة احتساب عدد الكلمات في Apache Hadoop على المجموعة، وإخراج نتائجها إلى Cloud Storage
- حذف المجموعة
ما ستتعرّف عليه
- كيفية إنشاء سير عمل Apache Airflow وتشغيله في Cloud Composer
- كيفية استخدام Cloud Composer وCloud Dataproc لإجراء تحليل على مجموعة بيانات
- كيفية الوصول إلى بيئة Cloud Composer من خلال Google Cloud Platform Console وCloud SDK وواجهة ويب Airflow
المتطلبات
- حساب Google Cloud Platform
- معرفة واجهة سطر الأوامر (CLI) الأساسية
- فهم أساسي للغة بايثون
2. إعداد Google Cloud Platform
إنشاء المشروع
اختَر مشروعًا على Google Cloud Platform أو أنشئ مشروعًا.
دوِّن رقم تعريف المشروع الذي ستستخدمه في الخطوات اللاحقة.
إذا كنت بصدد إنشاء مشروع جديد، يمكنك العثور على رقم تعريف المشروع أسفل اسم المشروع مباشرةً في صفحة الإنشاء. | |
إذا سبق لك إنشاء مشروع، يمكنك العثور على رقم التعريف على صفحة وحدة التحكّم الرئيسية في بطاقة معلومات المشروع. |
تفعيل واجهات برمجة التطبيقات
فعِّل واجهات برمجة تطبيقات Cloud Composer وCloud Dataproc وCloud Storage. بعد تفعيلها، يمكنك تجاهل الزر "الانتقال إلى بيانات الاعتماد" والمتابعة إلى الخطوة التالية من البرنامج التعليمي. |
إنشاء بيئة Composer
أنشئ بيئة Cloud Composer باستخدام الإعدادات التالية:
ويمكن أن تظل جميع الإعدادات الأخرى على حالتها التلقائية. انقر على "إنشاء" في الأسفل. |
إنشاء حزمة Cloud Storage
في مشروعك، أنشِئ حزمة Cloud Storage باستخدام الإعدادات التالية:
اضغط على "إنشاء" عندما تصبح جاهزًا |
3- إعداد Apache Airflow
عرض معلومات بيئة Composer
في وحدة تحكُّم Google Cloud Platform، افتح صفحة البيئات.
انقر على اسم البيئة للاطّلاع على تفاصيلها.
توفّر صفحة تفاصيل البيئة معلومات، مثل عنوان URL لواجهة الويب Airflow ورقم تعريف مجموعة Google Kubernetes Engine واسم حزمة Cloud Storage ومسار مجلد /dags.
في Airflow، الرسم البياني المُوجَّه غير المُعادِل (DAG) هو مجموعة من المهام المنظَّمة التي تريد جدولتها وتنفيذها. يتم تحديد مخططات DAG، التي تُعرف أيضًا باسم سير العمل، في ملفات Python العادية. لا تحدّد خدمة Cloud Composer سوى جداول البيانات الوصفية في مجلد /dags. يتوفّر مجلد /dags في حزمة Cloud Storage التي تنشئها Cloud Composer تلقائيًا عند إنشاء بيئتك.
ضبط متغيّرات بيئة Apache Airflow
متغيّرات Apache Airflow هي مفهوم خاص بخدمة Airflow يختلف عن متغيّرات البيئة. في هذه الخطوة، عليك إعداد متغيّرات Airflow الثلاثة التالية: gcp_project
وgcs_bucket
وgce_zone
.
استخدام gcloud
لضبط المتغيرات
أولاً، افتح Cloud Shell، الذي تم تثبيت حزمة Cloud SDK عليه بسهولة من أجلك.
اضبط متغيّر البيئة COMPOSER_INSTANCE
على اسم بيئة Composer.
COMPOSER_INSTANCE=my-composer-environment
لضبط متغيّرات Airflow باستخدام أداة سطر الأوامر gcloud، استخدِم الأمر gcloud composer environments run
مع الأمر الفرعي variables
. ينفِّذ الأمر gcloud composer
الأمر الفرعي variables
في واجهة سطر أوامر Airflow. يمرر الأمر الفرعي الوسيطات إلى أداة سطر الأوامر gcloud
.
عليك تنفيذ هذا الأمر ثلاث مرات، مع استبدال المتغيّرات بالمتغيّرات ذات الصلة بمشروعك.
اضبط gcp_project
باستخدام الأمر التالي، مع استبدال <your-project-id> برقم تعريف المشروع الذي دونته في الخطوة 2.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcp_project <your-project-id>
ستبدو مخرجاتك على هذا النحو
kubeconfig entry generated for us-central1-my-composer-env-123abc-gke. Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc [2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449 [2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database). [2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor [2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied. [2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg [2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
اضبط gcs_bucket
باستخدام الأمر التالي، مع استبدال <your-bucket-name>
بمعرّف الحزمة الذي سجّلته في الخطوة 2. إذا اتّبعت الاقتراح، سيكون اسم الحزمة هو نفسه رقم تعريف المشروع. ستكون مخرجاتك مماثلة للأمر السابق.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>
اضبط gce_zone
باستخدام الأمر التالي. ستكون مخرجاتك مشابهة للأوامر السابقة.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gce_zone us-central1-a
(اختياري) استخدام gcloud
لعرض متغيّر
للاطّلاع على قيمة متغيّر، يمكنك تنفيذ الأمر الفرعي variables
في واجهة سطر أوامر Airflow مع الوسيطة get
أو استخدام واجهة مستخدم Airflow.
على سبيل المثال:
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --get gcs_bucket
يمكنك إجراء ذلك باستخدام أيّ من المتغيّرات الثلاثة التي ضبطتها للتو: gcp_project
وgcs_bucket
وgce_zone
.
4. نموذج سير عمل
لنلقِ نظرة على رمز DAG الذي سنستخدمه في الخطوة 5. لا تقلق بشأن تنزيل الملفات الآن، ما عليك سوى المتابعة هنا.
هناك الكثير مما يجب توضيحه هنا، لذا دعنا نحلل هذا الأمر قليلاً.
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
سنبدأ ببعض عمليات الاستيراد في Airflow:
airflow.models
- تتيح لنا الوصول إلى البيانات وإنشائها في قاعدة بيانات Airflow.airflow.contrib.operators
- مكان إقامة مشغّلي الشبكة من المنتدى في هذه الحالة، نحتاج إلىdataproc_operator
للوصول إلى Cloud Dataproc API.airflow.utils.trigger_rule
- لإضافة قواعد عامل التشغيل إلى عوامل التشغيل لدينا تتيح قواعد المشغِّل إمكانية تحكُّم أكثر دقة في ما إذا كان يجب تنفيذ عامل تشغيل في ما يتعلّق بحالة عناصره الرئيسية.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
يحدد هذا موقع ملف الإخراج. السطر المميّز هنا هو models.Variable.get('gcs_bucket')
الذي سيستخرج قيمة المتغيّر gcs_bucket
من قاعدة بيانات Airflow.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
WORDCOUNT_JAR
- مكان توفُّر ملف .jar الذي سنشغّله في النهاية على مجموعة Cloud Dataproc. وتتم استضافة هذه الميزة نيابةً عنك على Google Cloud Platform.input_file
- الموقع الجغرافي للملف الذي يحتوي على البيانات التي ستُجري عليها وظيفة Hadoop العمليات الحسابية في النهاية. سنحمّل البيانات إلى هذا الموقع الجغرافي معًا في الخطوة 5.wordcount_args
- الوسائط التي سنرسلها إلى ملف jar
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
سيمنحنا ذلك عنصرًا زمنيًا يعادل منتصف الليل في اليوم السابق. على سبيل المثال، إذا تم تنفيذ ذلك في الساعة 11:00 في 4 مارس، سيمثل كائن التاريخ والوقت 00:00 في 3 مارس. هذا يتعلق بكيفية معالجة Airflow للجدولة. يمكنك الاطّلاع على مزيد من المعلومات حول ذلك هنا.
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
يجب توفير المتغير default_dag_args
في شكل قاموس عند إنشاء DAG جديد:
'email_on_failure'
- يشير إلى ما إذا كان يجب إرسال تنبيهات عبر البريد الإلكتروني عند تعذُّر إكمال مهمة'email_on_retry'
- يشير إلى ما إذا كان يجب إرسال تنبيهات عبر البريد الإلكتروني عند إعادة محاولة تنفيذ مهمة'retries'
: تشير هذه القيمة إلى عدد محاولات إعادة المحاولة التي يجب أن يجريها Airflow في حال عدم نجاح DAG.'retry_delay'
: يشير إلى المدة التي يجب أن ينتظر فيها Airflow قبل محاولة إعادة المحاولة.'project_id'
: تُستخدَم هذه السمة لإخبار DAG بمعرّف مشروع Google Cloud Platform الذي سيتم ربطه به، وسيكون مطلوبًا لاحقًا مع "مشغِّل Dataproc".
with models.DAG(
'composer_hadoop_tutorial',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
يؤدي استخدام with models.DAG
إلى تضمين النص البرمجي أسفله في نفس DAG. نرى أيضًا ثلاث وسيطات تم تمريرها:
- الأولى، السلسلة، هي الاسم لإعطاء DAG الذي نقوم بإنشائه. في هذه الحالة، سنستخدم
composer_hadoop_tutorial
. -
schedule_interval
: عنصرdatetime.timedelta
، تم ضبطه هنا على يوم واحد. يعني ذلك أنّ مخطط DAG هذا سيحاول التنفيذ مرة واحدة في اليوم بعد'start_date'
التي تم ضبطها سابقًا في'default_dag_args'
default_args
- القاموس الذي أنشأناه سابقًا ويحتوي على الوسيطات التلقائية لـ DAG
إنشاء مجموعة Dataproc
بعد ذلك، سننشئ dataproc_operator.DataprocClusterCreateOperator
لإنشاء مجموعة Cloud Dataproc.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
في هذا العامل التشغيلي، نرى بعض الوسيطات، وجميعها خاصة بهذا العامل التشغيلي باستثناء الأولى:
task_id
- كما هو الحال في BashOperator، هذا هو الاسم الذي نخصّصه لعامل التشغيل، والذي يمكن رؤيته من واجهة مستخدم Airflowcluster_name
- هو الاسم الذي نخصّصه لمجموعة Cloud Dataproc. لقد أطلقنا على هذا الإجراء اسمcomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
(يمكنك الاطّلاع على مربّع المعلومات للحصول على معلومات إضافية اختيارية).num_workers
- عدد وحدات العمل التي نخصّصها لمجموعة Cloud Dataproczone
- المنطقة الجغرافية التي نريد أن نتواجد فيها المجموعة العنقودية، كما هي محفوظة في قاعدة بيانات Airflow. سيؤدي ذلك إلى قراءة المتغيّر'gce_zone'
الذي ضبطناه في الخطوة 3.master_machine_type
- نوع الجهاز الذي نريد تخصيصه لوحدة التحكّم الرئيسية في Cloud Dataprocworker_machine_type
- نوع الجهاز الذي نريد تخصيصه لعامل Cloud Dataproc
إرسال مهمة Apache Hadoop
تسمح لنا السمة dataproc_operator.DataProcHadoopOperator
بإرسال مهمة إلى مجموعة Cloud Dataproc.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
نقدّم العديد من المَعلمات:
task_id
: الاسم الذي نحدّده لهذه القطعة من مخطّط "مسار الإحالة الناجحة"main_jar
- موقع ملف .jar الذي نريد تشغيله مع المجموعةcluster_name
- اسم المجموعة التي سيتم تشغيل المهمة عليها، والذي ستلاحظ أنّه مطابق لما نعثر عليه في عامل التشغيل السابقarguments
: الوسيطات التي يتم إدخالها في ملف jar، كما تفعل في حال تنفيذ ملف .jar من سطر الأوامر
حذف المجموعة
عامل التشغيل الأخير الذي سننشئه هو dataproc_operator.DataprocClusterDeleteOperator
.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
كما يشير الاسم، سيحذف عامل التشغيل هذا مجموعة Cloud Dataproc معيّنة. نرى ثلاث وسيطات هنا:
task_id
- كما هو الحال في BashOperator، هذا هو الاسم الذي نخصّصه لعامل التشغيل، والذي يمكن رؤيته من واجهة مستخدم Airflowcluster_name
: الاسم الذي نحدّده لمجموعة Cloud Dataproc. في ما يلي، أطلقنا عليها اسمcomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
(اطّلِع على مربّع المعلومات بعد "إنشاء مجموعة Dataproc" للحصول على معلومات إضافية اختيارية).trigger_rule
- لقد ذكرنا "قواعد التفعيل" بشكل موجز أثناء عمليات الاستيراد في بداية هذه الخطوة، ولكن إليك مثالاً على تطبيقها. لا يتم تنفيذ مشغل Airflow تلقائيًا ما لم تكتمل جميع مشغلاته المتقدّمة بنجاح. لا تتطلّب قاعدة عامل التفعيلALL_DONE
سوى اكتمال جميع عوامل التشغيل في مرحلة ما قبل المعالجة، بغض النظر عمّا إذا كانت ناجحة أم لا. يعني ذلك هنا أنّه حتى إذا تعذّر إكمال وظيفة Hadoop، ما زلنا نريد إزالة المجموعة.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
أخيرًا، نريد تنفيذ هذه المشغّلات بترتيب معيّن، ويمكننا الإشارة إلى ذلك باستخدام مشغّلات تحويل البتات في Python. في هذه الحالة، سيتم تنفيذ create_dataproc_cluster
دائمًا أولاً، ثم run_dataproc_hadoop
وأخيرًا delete_dataproc_cluster
.
عند تجميع كل هذه العناصر معًا، سيظهر الرمز البرمجي على النحو التالي:
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# Output file for Cloud Dataproc job.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
'email_on_failure': False,
'email_on_retry': False,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
# [START composer_hadoop_schedule]
with models.DAG(
'composer_hadoop_tutorial',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_hadoop_schedule]
# Create a Cloud Dataproc cluster.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
# Give the cluster a unique name by appending the date scheduled.
# See https://airflow.apache.org/code.html#default-variables
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
# Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
# master node.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
# Delete Cloud Dataproc cluster.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted
# even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]
# [END composer_hadoop]
5- تحميل ملفات Airflow إلى Cloud Storage
انسخ DAG إلى مجلد /dags
- أولاً، افتح Cloud Shell، الذي تم تثبيت حزمة Cloud SDK عليه بسهولة من أجلك.
- استنسِخ مستودع نماذج Python وانتقِل إلى دليل composer/workflows.
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
- شغِّل الأمر التالي لضبط اسم مجلد DAGs على متغيّر بيئة.
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \ --location us-central1 --format="value(config.dagGcsPrefix)")
- شغِّل الأمر
gsutil
التالي لنسخ رمز البرنامج التعليمي إلى المكان الذي تم فيه إنشاء مجلد /dags.
gsutil cp hadoop_tutorial.py $DAGS_FOLDER
ستبدو النتيجة على النحو التالي:
Copying file://hadoop_tutorial.py [Content-Type=text/x-python]... / [1 files][ 4.1 KiB/ 4.1 KiB] Operation completed over 1 objects/4.1 KiB.
6- استخدام واجهة مستخدم Airflow
للوصول إلى واجهة ويب Airflow باستخدام وحدة تحكُّم Google Cloud Platform:
|
للحصول على معلومات عن واجهة مستخدم Airflow، يُرجى الاطّلاع على مقالة الوصول إلى واجهة الويب.
عرض المتغيرات
ويتم الاحتفاظ بالمتغيرات التي حددتها سابقًا في بيئتك. يمكنك عرض المتغيّرات من خلال اختيار المشرف > المتغيّرات من شريط قوائم واجهة مستخدم Airflow.
استكشاف عروض DAG
عند تحميل ملف DAG إلى مجلد dags
في Cloud Storage، يحلّل تطبيق Cloud Composer الملف. وإذا لم يتم العثور على أي أخطاء، يظهر اسم سير العمل في قائمة DAG، ويتم وضع سير العمل في قائمة الانتظار لتشغيله على الفور. للاطّلاع على مخططات DAG، انقر على مخططات DAG في أعلى الصفحة.
انقر على composer_hadoop_tutorial
لفتح صفحة تفاصيل DAG. تتضمن هذه الصفحة تمثيلاً بيانيًا لمهام سير العمل والتبعيات.
الآن، في شريط الأدوات، انقر على عرض الرسم البياني ثم مرِّر مؤشر الماوس فوق الرسم البياني لكل مهمة للاطّلاع على حالتها. لاحظ أن الحد حول كل مهمة يشير أيضًا إلى الحالة (الحد الأخضر = قيد التشغيل؛ اللون الأحمر = فشل، وما إلى ذلك).
لتنفيذ سير العمل مرة أخرى من عرض الرسم البياني:
- في عرض الرسم البياني لواجهة مستخدم Airflow، انقر على الرسم
create_dataproc_cluster
. - انقر على محو لإعادة ضبط المهام الثلاث، ثم انقر على حسنًا للتأكيد.
يمكنك أيضًا التحقّق من حالة سير عمل composer-hadoop-tutorial
ونتائجه من خلال الانتقال إلى صفحات وحدة تحكُّم Google Cloud Platform التالية:
- مجموعات Cloud Dataproc لمراقبة إنشاء المجموعات وحذفها. تجدر الإشارة إلى أنّ المجموعة التي أنشأها سير العمل هي مجموعة مؤقتة، فهي متاحة فقط طوال مدة سير العمل ويتم حذفها كجزء من آخر مهمة سير عمل.
- Cloud Dataproc Jobs لعرض مهمة عدد الكلمات في Apache Hadoop. انقر على رقم تعريف المهمة للاطّلاع على إخراج سجلّ المهام.
- متصفّح Cloud Storage للاطّلاع على نتائج عدد الكلمات في مجلد
wordcount
في حزمة Cloud Storage التي أنشأتها لهذا البرنامج التعليمي
7- تنظيف
لتجنُّب تحصيل رسوم من حسابك على Google Cloud Platform مقابل الموارد المستخدَمة في هذا الدليل التعليمي للترميز:
- (اختياري) لحفظ بياناتك، نزِّل البيانات من حزمة Cloud Storage الخاصة ببيئة Cloud Composer وحزمة التخزين التي أنشأتها لهذا الدليل التعليمي.
- احذِف حزمة Cloud Storage التي أنشأتها لهذا الدليل التعليمي.
- احذف حزمة Cloud Storage الخاصة بالبيئة.
- احذف بيئة Cloud Composer. يُرجى العِلم أنّ حذف بيئتك لا يؤدي إلى حذف حزمة مساحة التخزين للبيئة.
يمكنك أيضًا اختياريًا حذف المشروع:
- في "وحدة تحكّم Google Cloud Platform"، انتقِل إلى صفحة المشاريع.
- في قائمة المشاريع، اختَر المشروع الذي تريد حذفه وانقر على حذف.
- في المربّع، اكتب رقم تعريف المشروع، ثم انقر على إيقاف لحذف المشروع.