تشغيل مهمة Hadoop عددًا من الكلمات على مجموعة Dataproc

1. مقدمة

تُعدّ سير العمل حالات استخدام شائعة في تحليلات البيانات، إذ تتضمن نقل البيانات وتحويلها وتحليلها للعثور على المعلومات المفيدة فيها. في Google Cloud Platform، أداة تنسيق مهام سير العمل هي Cloud Composer، وهي إصدار مستضاف من أداة سير العمل المفتوحة المصدر الشائعة Apache Airflow. في هذا التمرين، ستستخدم Cloud Composer لإنشاء سير عمل بسيط يؤدي إلى إنشاء مجموعة Cloud Dataproc وتحليلها باستخدام Cloud Dataproc وApache Hadoop، ثم حذف مجموعة Cloud Dataproc بعد ذلك.

ما هو Cloud Composer؟

Cloud Composer هي خدمة لإدارة سير العمل مُدارة بالكامل، وتتيح لك إنشاء وجدولة ومراقبة مسارات التعلّم التي تشمل السحابة الإلكترونية ومراكز البيانات داخل الشركة. تم إنشاء Cloud Composer استنادًا إلى مشروع Apache Airflow المفتوح المصدر الشهير ويتم تشغيله باستخدام لغة برمجة Python، وهو سهل الاستخدام وخالٍ من القيود.

باستخدام Cloud Composer بدلاً من نسخة محلية من Apache Airflow، يمكن للمستخدمين الاستفادة من أفضل ميزات Airflow بدون أيّ تكاليف إضافية للتثبيت أو الإدارة.

ما هو Apache Airflow؟

Apache Airflow هو أداة مفتوحة المصدر تُستخدم للمؤلف البرمجي وجدولة المهام ومراقبتها آليًا. هناك بعض المصطلحات الرئيسية التي يجب تذكرها فيما يتعلق بـ Airflow، والتي ستراها خلال التمرين المعملي:

  • مخطّط سير العمل (DAG): مخطّط سير العمل (DAG) هو مجموعة من المهام المنظَّمة التي تريد جدولتها وتنفيذها. يتم تعريف مخططات DAG، المعروفة أيضًا باسم سير العمل، في ملفات بايثون القياسية
  • عامل التشغيل: يصف عامل تشغيل مهمة واحدة في سير عمل

ما هي Cloud Dataproc؟

Cloud Dataproc هي خدمة Apache Spark وApache Hadoop المُدارة بالكامل من Google Cloud Platform. يمكن دمج Cloud Dataproc بسهولة مع خدمات GCP الأخرى، ما يمنحك منصة فعّالة وكاملة لمعالجة البيانات والإحصاءات وتعلُّم الآلة.

ما ستفعله

يعرض لك هذا الدرس التطبيقي كيفية إنشاء سير عمل Apache Airflow وتشغيله في Cloud Composer والذي يُكمل المهام التالية:

  • إنشاء مجموعة Cloud Dataproc
  • تشغيل مهمة احتساب عدد الكلمات في Apache Hadoop على المجموعة، وإخراج نتائجها إلى Cloud Storage
  • حذف المجموعة

ما ستتعرّف عليه

  • كيفية إنشاء سير عمل Apache Airflow وتشغيله في Cloud Composer
  • كيفية استخدام Cloud Composer وCloud Dataproc لإجراء تحليل على مجموعة بيانات
  • كيفية الوصول إلى بيئة Cloud Composer من خلال Google Cloud Platform Console وCloud SDK وواجهة ويب Airflow

المتطلبات

  • حساب Google Cloud Platform
  • معرفة واجهة سطر الأوامر (CLI) الأساسية
  • فهم أساسي للغة بايثون

2. إعداد Google Cloud Platform

إنشاء المشروع

اختَر مشروعًا على Google Cloud Platform أو أنشئ مشروعًا.

دوِّن رقم تعريف المشروع الذي ستستخدمه في الخطوات اللاحقة.

إذا كنت بصدد إنشاء مشروع جديد، يمكنك العثور على رقم تعريف المشروع أسفل اسم المشروع مباشرةً في صفحة الإنشاء.

إذا سبق لك إنشاء مشروع، يمكنك العثور على رقم التعريف على صفحة وحدة التحكّم الرئيسية في بطاقة معلومات المشروع.

تفعيل واجهات برمجة التطبيقات

فعِّل واجهات برمجة تطبيقات Cloud Composer وCloud Dataproc وCloud Storage. بعد تفعيلها، يمكنك تجاهل الزر "الانتقال إلى بيانات الاعتماد" والمتابعة إلى الخطوة التالية من البرنامج التعليمي.

إنشاء بيئة Composer

أنشئ بيئة Cloud Composer باستخدام الإعدادات التالية:

  • الاسم: my-composer-environment
  • الموقع: us-central1
  • المنطقة: us-central1-a

ويمكن أن تظل جميع الإعدادات الأخرى على حالتها التلقائية. انقر على "إنشاء" في الأسفل.

إنشاء حزمة Cloud Storage

في مشروعك، أنشِئ حزمة Cloud Storage باستخدام الإعدادات التالية:

  • الاسم: <your-project-id>
  • فئة التخزين التلقائية: متعددة المناطق
  • الموقع الجغرافي: الولايات المتحدة
  • نموذج التحكم في الوصول: دقيق

اضغط على "إنشاء" عندما تصبح جاهزًا

3- إعداد Apache Airflow

عرض معلومات بيئة Composer

في وحدة تحكُّم Google Cloud Platform، افتح صفحة البيئات.

انقر على اسم البيئة للاطّلاع على تفاصيلها.

توفّر صفحة تفاصيل البيئة معلومات، مثل عنوان URL لواجهة الويب Airflow ورقم تعريف مجموعة Google Kubernetes Engine واسم حزمة Cloud Storage ومسار مجلد /dags.

في Airflow، الرسم البياني المُوجَّه غير المُعادِل (DAG) هو مجموعة من المهام المنظَّمة التي تريد جدولتها وتنفيذها. يتم تحديد مخططات DAG، التي تُعرف أيضًا باسم سير العمل، في ملفات Python العادية. لا تحدّد خدمة Cloud Composer سوى جداول البيانات الوصفية في مجلد /dags. يتوفّر مجلد /dags في حزمة Cloud Storage التي تنشئها Cloud Composer تلقائيًا عند إنشاء بيئتك.

ضبط متغيّرات بيئة Apache Airflow

متغيّرات Apache Airflow هي مفهوم خاص بخدمة Airflow يختلف عن متغيّرات البيئة. في هذه الخطوة، عليك إعداد متغيّرات Airflow الثلاثة التالية: gcp_project وgcs_bucket وgce_zone.

استخدام gcloud لضبط المتغيرات

أولاً، افتح Cloud Shell، الذي تم تثبيت حزمة Cloud SDK عليه بسهولة من أجلك.

اضبط متغيّر البيئة COMPOSER_INSTANCE على اسم بيئة Composer.

COMPOSER_INSTANCE=my-composer-environment

لضبط متغيّرات Airflow باستخدام أداة سطر الأوامر gcloud، استخدِم الأمر gcloud composer environments run مع الأمر الفرعي variables. ينفِّذ الأمر gcloud composer الأمر الفرعي variables في واجهة سطر أوامر Airflow. يمرر الأمر الفرعي الوسيطات إلى أداة سطر الأوامر gcloud.

عليك تنفيذ هذا الأمر ثلاث مرات، مع استبدال المتغيّرات بالمتغيّرات ذات الصلة بمشروعك.

اضبط gcp_project باستخدام الأمر التالي، مع استبدال <your-project-id> برقم تعريف المشروع الذي دونته في الخطوة 2.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

ستبدو مخرجاتك على هذا النحو

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

اضبط gcs_bucket باستخدام الأمر التالي، مع استبدال <your-bucket-name> بمعرّف الحزمة الذي سجّلته في الخطوة 2. إذا اتّبعت الاقتراح، سيكون اسم الحزمة هو نفسه رقم تعريف المشروع. ستكون مخرجاتك مماثلة للأمر السابق.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

اضبط gce_zone باستخدام الأمر التالي. ستكون مخرجاتك مشابهة للأوامر السابقة.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(اختياري) استخدام gcloud لعرض متغيّر

للاطّلاع على قيمة متغيّر، يمكنك تنفيذ الأمر الفرعي variables في واجهة سطر أوامر Airflow مع الوسيطة get أو استخدام واجهة مستخدم Airflow.

على سبيل المثال:

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

يمكنك إجراء ذلك باستخدام أيّ من المتغيّرات الثلاثة التي ضبطتها للتو: gcp_project وgcs_bucket وgce_zone.

4. نموذج سير عمل

لنلقِ نظرة على رمز DAG الذي سنستخدمه في الخطوة 5. لا تقلق بشأن تنزيل الملفات الآن، ما عليك سوى المتابعة هنا.

هناك الكثير مما يجب توضيحه هنا، لذا دعنا نحلل هذا الأمر قليلاً.

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

سنبدأ ببعض عمليات الاستيراد في Airflow:

  • airflow.models - تتيح لنا الوصول إلى البيانات وإنشائها في قاعدة بيانات Airflow.
  • airflow.contrib.operators - مكان إقامة مشغّلي الشبكة من المنتدى في هذه الحالة، نحتاج إلى dataproc_operator للوصول إلى Cloud Dataproc API.
  • airflow.utils.trigger_rule - لإضافة قواعد عامل التشغيل إلى عوامل التشغيل لدينا تتيح قواعد المشغِّل إمكانية تحكُّم أكثر دقة في ما إذا كان يجب تنفيذ عامل تشغيل في ما يتعلّق بحالة عناصره الرئيسية.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

يحدد هذا موقع ملف الإخراج. السطر المميّز هنا هو models.Variable.get('gcs_bucket') الذي سيستخرج قيمة المتغيّر gcs_bucket من قاعدة بيانات Airflow.

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR - مكان توفُّر ملف ‎.jar الذي سنشغّله في النهاية على مجموعة Cloud Dataproc. وتتم استضافة هذه الميزة نيابةً عنك على Google Cloud Platform.
  • input_file - الموقع الجغرافي للملف الذي يحتوي على البيانات التي ستُجري عليها وظيفة Hadoop العمليات الحسابية في النهاية. سنحمّل البيانات إلى هذا الموقع الجغرافي معًا في الخطوة 5.
  • wordcount_args - الوسائط التي سنرسلها إلى ملف jar
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

سيمنحنا ذلك عنصرًا زمنيًا يعادل منتصف الليل في اليوم السابق. على سبيل المثال، إذا تم تنفيذ ذلك في الساعة 11:00 في 4 مارس، سيمثل كائن التاريخ والوقت 00:00 في 3 مارس. هذا يتعلق بكيفية معالجة Airflow للجدولة. يمكنك الاطّلاع على مزيد من المعلومات حول ذلك هنا.

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

يجب توفير المتغير default_dag_args في شكل قاموس عند إنشاء DAG جديد:

  • 'email_on_failure' - يشير إلى ما إذا كان يجب إرسال تنبيهات عبر البريد الإلكتروني عند تعذُّر إكمال مهمة
  • 'email_on_retry' - يشير إلى ما إذا كان يجب إرسال تنبيهات عبر البريد الإلكتروني عند إعادة محاولة تنفيذ مهمة
  • 'retries': تشير هذه القيمة إلى عدد محاولات إعادة المحاولة التي يجب أن يجريها Airflow في حال عدم نجاح DAG.
  • 'retry_delay': يشير إلى المدة التي يجب أن ينتظر فيها Airflow قبل محاولة إعادة المحاولة.
  • 'project_id': تُستخدَم هذه السمة لإخبار DAG بمعرّف مشروع Google Cloud Platform الذي سيتم ربطه به، وسيكون مطلوبًا لاحقًا مع "مشغِّل Dataproc".
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

يؤدي استخدام with models.DAG إلى تضمين النص البرمجي أسفله في نفس DAG. نرى أيضًا ثلاث وسيطات تم تمريرها:

  • الأولى، السلسلة، هي الاسم لإعطاء DAG الذي نقوم بإنشائه. في هذه الحالة، سنستخدم composer_hadoop_tutorial.
  • schedule_interval: عنصر datetime.timedelta، تم ضبطه هنا على يوم واحد. يعني ذلك أنّ مخطط DAG هذا سيحاول التنفيذ مرة واحدة في اليوم بعد 'start_date' التي تم ضبطها سابقًا في 'default_dag_args'
  • default_args - القاموس الذي أنشأناه سابقًا ويحتوي على الوسيطات التلقائية لـ DAG

إنشاء مجموعة Dataproc

بعد ذلك، سننشئ dataproc_operator.DataprocClusterCreateOperator لإنشاء مجموعة Cloud Dataproc.

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

في هذا العامل التشغيلي، نرى بعض الوسيطات، وجميعها خاصة بهذا العامل التشغيلي باستثناء الأولى:

  • task_id - كما هو الحال في BashOperator، هذا هو الاسم الذي نخصّصه لعامل التشغيل، والذي يمكن رؤيته من واجهة مستخدم Airflow
  • cluster_name - هو الاسم الذي نخصّصه لمجموعة Cloud Dataproc. لقد أطلقنا على هذا الإجراء اسم composer-hadoop-tutorial-cluster-{{ ds_nodash }} (يمكنك الاطّلاع على مربّع المعلومات للحصول على معلومات إضافية اختيارية).
  • num_workers - عدد وحدات العمل التي نخصّصها لمجموعة Cloud Dataproc
  • zone - المنطقة الجغرافية التي نريد أن نتواجد فيها المجموعة العنقودية، كما هي محفوظة في قاعدة بيانات Airflow. سيؤدي ذلك إلى قراءة المتغيّر 'gce_zone' الذي ضبطناه في الخطوة 3.
  • master_machine_type - نوع الجهاز الذي نريد تخصيصه لوحدة التحكّم الرئيسية في Cloud Dataproc
  • worker_machine_type - نوع الجهاز الذي نريد تخصيصه لعامل Cloud Dataproc

إرسال مهمة Apache Hadoop

تسمح لنا السمة dataproc_operator.DataProcHadoopOperator بإرسال مهمة إلى مجموعة Cloud Dataproc.

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

نقدّم العديد من المَعلمات:

  • task_id: الاسم الذي نحدّده لهذه القطعة من مخطّط "مسار الإحالة الناجحة"
  • main_jar - موقع ملف .jar الذي نريد تشغيله مع المجموعة
  • cluster_name - اسم المجموعة التي سيتم تشغيل المهمة عليها، والذي ستلاحظ أنّه مطابق لما نعثر عليه في عامل التشغيل السابق
  • arguments: الوسيطات التي يتم إدخالها في ملف jar، كما تفعل في حال تنفيذ ملف .jar من سطر الأوامر

حذف المجموعة

عامل التشغيل الأخير الذي سننشئه هو dataproc_operator.DataprocClusterDeleteOperator.

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

كما يشير الاسم، سيحذف عامل التشغيل هذا مجموعة Cloud Dataproc معيّنة. نرى ثلاث وسيطات هنا:

  • task_id - كما هو الحال في BashOperator، هذا هو الاسم الذي نخصّصه لعامل التشغيل، والذي يمكن رؤيته من واجهة مستخدم Airflow
  • cluster_name: الاسم الذي نحدّده لمجموعة Cloud Dataproc. في ما يلي، أطلقنا عليها اسم composer-hadoop-tutorial-cluster-{{ ds_nodash }} (اطّلِع على مربّع المعلومات بعد "إنشاء مجموعة Dataproc" للحصول على معلومات إضافية اختيارية).
  • trigger_rule - لقد ذكرنا "قواعد التفعيل" بشكل موجز أثناء عمليات الاستيراد في بداية هذه الخطوة، ولكن إليك مثالاً على تطبيقها. لا يتم تنفيذ مشغل Airflow تلقائيًا ما لم تكتمل جميع مشغلاته المتقدّمة بنجاح. لا تتطلّب قاعدة عامل التفعيل ALL_DONE سوى اكتمال جميع عوامل التشغيل في مرحلة ما قبل المعالجة، بغض النظر عمّا إذا كانت ناجحة أم لا. يعني ذلك هنا أنّه حتى إذا تعذّر إكمال وظيفة Hadoop، ما زلنا نريد إزالة المجموعة.
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

أخيرًا، نريد تنفيذ هذه المشغّلات بترتيب معيّن، ويمكننا الإشارة إلى ذلك باستخدام مشغّلات تحويل البتات في Python. في هذه الحالة، سيتم تنفيذ create_dataproc_cluster دائمًا أولاً، ثم run_dataproc_hadoop وأخيرًا delete_dataproc_cluster.

عند تجميع كل هذه العناصر معًا، سيظهر الرمز البرمجي على النحو التالي:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5- تحميل ملفات Airflow إلى Cloud Storage

انسخ DAG إلى مجلد /dags

  1. أولاً، افتح Cloud Shell، الذي تم تثبيت حزمة Cloud SDK عليه بسهولة من أجلك.
  2. استنسِخ مستودع نماذج Python وانتقِل إلى دليل composer/workflows.
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. شغِّل الأمر التالي لضبط اسم مجلد DAGs على متغيّر بيئة.
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. شغِّل الأمر gsutil التالي لنسخ رمز البرنامج التعليمي إلى المكان الذي تم فيه إنشاء مجلد ‎/dags.
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

ستبدو النتيجة على النحو التالي:

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6- استخدام واجهة مستخدم Airflow

للوصول إلى واجهة ويب Airflow باستخدام وحدة تحكُّم Google Cloud Platform:

  1. افتح صفحة البيئات.
  2. في عمود خادم ويب Airflow للبيئة، انقر على رمز نافذة جديدة. يتم فتح واجهة مستخدم الويب في Airflow في نافذة متصفّح جديدة.

للحصول على معلومات عن واجهة مستخدم Airflow، يُرجى الاطّلاع على مقالة الوصول إلى واجهة الويب.

عرض المتغيرات

ويتم الاحتفاظ بالمتغيرات التي حددتها سابقًا في بيئتك. يمكنك عرض المتغيّرات من خلال اختيار المشرف > المتغيّرات من شريط قوائم واجهة مستخدم Airflow.

تم اختيار علامة التبويب &quot;قائمة&quot; وتعرض جدولاً يحتوي على مفتاح القيم والمفاتيح التالية: gcp_project, value: مفتاح المشروع: gcs_bucket, value: gs://bucket-name key: gce_zone, value: المنطقة

استكشاف عروض DAG

عند تحميل ملف DAG إلى مجلد dags في Cloud Storage، يحلّل تطبيق Cloud Composer الملف. وإذا لم يتم العثور على أي أخطاء، يظهر اسم سير العمل في قائمة DAG، ويتم وضع سير العمل في قائمة الانتظار لتشغيله على الفور. للاطّلاع على مخططات DAG، انقر على مخططات DAG في أعلى الصفحة.

84a29c71f20bff98.png

انقر على composer_hadoop_tutorial لفتح صفحة تفاصيل DAG. تتضمن هذه الصفحة تمثيلاً بيانيًا لمهام سير العمل والتبعيات.

f4f1663c7a37f47c.png

الآن، في شريط الأدوات، انقر على عرض الرسم البياني ثم مرِّر مؤشر الماوس فوق الرسم البياني لكل مهمة للاطّلاع على حالتها. لاحظ أن الحد حول كل مهمة يشير أيضًا إلى الحالة (الحد الأخضر = قيد التشغيل؛ اللون الأحمر = فشل، وما إلى ذلك).

4c5a0c6fa9f88513.png

لتنفيذ سير العمل مرة أخرى من عرض الرسم البياني:

  1. في عرض الرسم البياني لواجهة مستخدم Airflow، انقر على الرسم create_dataproc_cluster.
  2. انقر على محو لإعادة ضبط المهام الثلاث، ثم انقر على حسنًا للتأكيد.

fd1b23b462748f47.png

يمكنك أيضًا التحقّق من حالة سير عمل composer-hadoop-tutorial ونتائجه من خلال الانتقال إلى صفحات وحدة تحكُّم Google Cloud Platform التالية:

  • مجموعات Cloud Dataproc لمراقبة إنشاء المجموعات وحذفها. تجدر الإشارة إلى أنّ المجموعة التي أنشأها سير العمل هي مجموعة مؤقتة، فهي متاحة فقط طوال مدة سير العمل ويتم حذفها كجزء من آخر مهمة سير عمل.
  • Cloud Dataproc Jobs لعرض مهمة عدد الكلمات في Apache Hadoop. انقر على رقم تعريف المهمة للاطّلاع على إخراج سجلّ المهام.
  • متصفّح Cloud Storage للاطّلاع على نتائج عدد الكلمات في مجلد wordcount في حزمة Cloud Storage التي أنشأتها لهذا البرنامج التعليمي

7- تنظيف

لتجنُّب تحصيل رسوم من حسابك على Google Cloud Platform مقابل الموارد المستخدَمة في هذا الدليل التعليمي للترميز:

  1. (اختياري) لحفظ بياناتك، نزِّل البيانات من حزمة Cloud Storage الخاصة ببيئة Cloud Composer وحزمة التخزين التي أنشأتها لهذا الدليل التعليمي.
  2. احذِف حزمة Cloud Storage التي أنشأتها لهذا الدليل التعليمي.
  3. احذف حزمة Cloud Storage الخاصة بالبيئة.
  4. احذف بيئة Cloud Composer. يُرجى العِلم أنّ حذف بيئتك لا يؤدي إلى حذف حزمة مساحة التخزين للبيئة.

يمكنك أيضًا اختياريًا حذف المشروع:

  1. في "وحدة تحكّم Google Cloud Platform"، انتقِل إلى صفحة المشاريع.
  2. في قائمة المشاريع، اختَر المشروع الذي تريد حذفه وانقر على حذف.
  3. في المربّع، اكتب رقم تعريف المشروع، ثم انقر على إيقاف لحذف المشروع.