การเรียกใช้งานจำนวนคำของ Hadoop บนคลัสเตอร์ Dataproc

1. บทนำ

เวิร์กโฟลว์เป็นกรณีการใช้งานทั่วไปในการวิเคราะห์ข้อมูล โดยจะเกี่ยวข้องกับการส่งผ่านข้อมูล การเปลี่ยนรูปแบบ และการวิเคราะห์ข้อมูลเพื่อค้นหาข้อมูลที่มีความหมาย เครื่องมือสำหรับจัดการเวิร์กโฟลว์ใน Google Cloud Platform คือ Cloud Composer ซึ่งเป็นเครื่องมือเวิร์กโฟลว์แบบโอเพนซอร์สยอดนิยมอย่าง Apache Airflow ในห้องทดลองนี้ คุณจะใช้ Cloud Composer เพื่อสร้างเวิร์กโฟลว์แบบง่ายๆ ที่สร้างคลัสเตอร์ Cloud Dataproc, วิเคราะห์โดยใช้ Cloud Dataproc และ Apache Hadoop จากนั้นจึงลบคลัสเตอร์ Cloud Dataproc ในภายหลัง

Cloud Composer คืออะไร

Cloud Composer คือบริการเวิร์กโฟลว์แบบเป็นกลุ่มที่มีการจัดการครบวงจร ซึ่งช่วยให้คุณเขียน กำหนดเวลา และตรวจสอบไปป์ไลน์ที่ครอบคลุมในระบบคลาวด์และศูนย์ข้อมูลภายในองค์กรได้ Cloud Composer สร้างขึ้นจากโปรเจ็กต์โอเพนซอร์ส Apache Airflow ยอดนิยมและดำเนินการโดยใช้ภาษาการเขียนโปรแกรม Python ทำให้ปราศจากการผูกมัดและใช้งานง่าย

การใช้ Cloud Composer แทนอินสแตนซ์ในเครื่องของ Apache Airflow จะช่วยให้ผู้ใช้ได้รับประโยชน์จาก Airflow ที่ดีที่สุดโดยไม่ต้องติดตั้งหรือมีค่าใช้จ่ายในการจัดการ

Apache Airflow คืออะไร

Apache Airflow เป็นเครื่องมือโอเพนซอร์สที่ใช้สำหรับเขียน กำหนดเวลา และตรวจสอบเวิร์กโฟลว์โดยใช้โปรแกรม คำศัพท์สำคัญ 2-3 คำที่ควรจดจำเกี่ยวกับ Airflow ที่คุณจะเห็นในห้องทดลองมีดังนี้

  • DAG - DAG (กราฟแบบ Acyclic โดยตรง) คือคอลเล็กชันของงานที่มีการจัดระเบียบซึ่งคุณต้องการกำหนดเวลาและเรียกใช้ DAG หรือเรียกอีกอย่างว่าเวิร์กโฟลว์ จะมีการกำหนดไว้ในไฟล์ Python มาตรฐาน
  • โอเปอเรเตอร์ - โอเปอเรเตอร์อธิบายงานเดียวในเวิร์กโฟลว์

Cloud Dataproc คืออะไร

Cloud Dataproc คือบริการ Apache Spark และ Apache Hadoop ที่มีการจัดการครบวงจรของ Google Cloud Platform Cloud Dataproc ผสานรวมกับบริการ GCP อื่นๆ ได้อย่างง่ายดาย ทำให้คุณมีแพลตฟอร์มที่มีประสิทธิภาพและสมบูรณ์สำหรับการประมวลผลข้อมูล การวิเคราะห์ และแมชชีนเลิร์นนิง

สิ่งที่คุณจะทำ

Codelab นี้จะแสดงวิธีสร้างและเรียกใช้เวิร์กโฟลว์ Apache Airflow ใน Cloud Composer เพื่อทำงานต่อไปนี้ให้เสร็จสมบูรณ์

  • สร้างคลัสเตอร์ Cloud Dataproc
  • เรียกใช้งานจำนวนคำของ Apache Hadoop บนคลัสเตอร์ และส่งผลลัพธ์ไปยัง Cloud Storage
  • ลบคลัสเตอร์

สิ่งที่คุณจะได้เรียนรู้

  • วิธีสร้างและเรียกใช้เวิร์กโฟลว์ Apache Airflow ใน Cloud Composer
  • วิธีใช้ Cloud Composer และ Cloud Dataproc เพื่อเรียกใช้การวิเคราะห์บนชุดข้อมูล
  • วิธีเข้าถึงสภาพแวดล้อม Cloud Composer ผ่านคอนโซล Google Cloud Platform, Cloud SDK และอินเทอร์เฟซเว็บ Airflow

สิ่งที่คุณต้องมี

  • บัญชี GCP
  • ความรู้พื้นฐานเกี่ยวกับ CLI
  • ความเข้าใจเบื้องต้นเกี่ยวกับ Python

2. การตั้งค่า GCP

สร้างโปรเจ็กต์

เลือกหรือสร้างโปรเจ็กต์ Google Cloud Platform

จดรหัสโปรเจ็กต์ไว้ ซึ่งจะใช้ในขั้นตอนถัดไป

หากคุณกำลังสร้างโปรเจ็กต์ใหม่ รหัสโปรเจ็กต์จะอยู่ใต้ชื่อโปรเจ็กต์ในหน้าการสร้าง

หากสร้างโปรเจ็กต์ไว้แล้ว คุณจะดูรหัสได้ในหน้าแรกของคอนโซลในการ์ดข้อมูลโปรเจ็กต์

เปิดใช้ API

เปิดใช้ Cloud Composer, Cloud Dataproc และ Cloud Storage API เมื่อเปิดใช้แล้ว สามารถเพิกเฉยต่อปุ่ม "ไปที่ข้อมูลเข้าสู่ระบบ" ได้ และดำเนินการต่อไปยังขั้นตอนถัดไปของบทแนะนำ

สร้างสภาพแวดล้อมคอมโพสเซอร์

สร้างสภาพแวดล้อม Cloud Composer ด้วยการกำหนดค่าต่อไปนี้

  • ชื่อ: my-composer-environment
  • สถานที่ตั้ง: us-central1
  • โซน: us-central1-a

การกําหนดค่าอื่นๆ ทั้งหมดจะยังคงเป็นค่าเริ่มต้นได้ คลิก "สร้าง" ที่ด้านล่าง

สร้างที่เก็บข้อมูล Cloud Storage

สร้างที่เก็บข้อมูล Cloud Storage ในโปรเจ็กต์ด้วยการกำหนดค่าต่อไปนี้

  • ชื่อ: <รหัสโปรเจ็กต์ของคุณ>
  • คลาสพื้นที่เก็บข้อมูลเริ่มต้น: หลายภูมิภาค
  • สถานที่: สหรัฐอเมริกา
  • โมเดลการควบคุมการเข้าถึง: แบบละเอียด

กด "สร้าง" เมื่อคุณพร้อม

3. การตั้งค่า Apache Airflow

การดูข้อมูลสภาพแวดล้อมของคอมโพสเซอร์

ในคอนโซล GCP ให้เปิดหน้าสภาพแวดล้อม

คลิกชื่อของสภาพแวดล้อมเพื่อดูรายละเอียด

หน้ารายละเอียดสภาพแวดล้อมจะแสดงข้อมูล เช่น URL ของอินเทอร์เฟซเว็บ Airflow, รหัสคลัสเตอร์ของ Google Kubernetes Engine, ชื่อที่เก็บข้อมูล Cloud Storage และเส้นทางสำหรับโฟลเดอร์ /dags

ใน Airflow DAG (กราฟแบบ Acyclic โดยตรง) คือคอลเล็กชันของงานที่มีการจัดระเบียบซึ่งคุณต้องการกำหนดเวลาและเรียกใช้ DAG หรือเรียกอีกอย่างว่าเวิร์กโฟลว์ จะมีการกำหนดไว้ในไฟล์ Python มาตรฐาน Cloud Composer จะกำหนดเวลา DAG ในโฟลเดอร์ /dags เท่านั้น โฟลเดอร์ /dags อยู่ในที่เก็บข้อมูล Cloud Storage ที่ Cloud Composer สร้างขึ้นโดยอัตโนมัติเมื่อคุณสร้างสภาพแวดล้อม

การตั้งค่าตัวแปรสภาพแวดล้อม Apache Airflow

ตัวแปร Apache Airflow เป็นแนวคิดเฉพาะ Airflow ซึ่งแตกต่างจากตัวแปรสภาพแวดล้อม ในขั้นตอนนี้ คุณจะตั้งค่าตัวแปร Airflow 3 รายการ ได้แก่ gcp_project, gcs_bucket และ gce_zone

กำลังใช้ gcloud เพื่อตั้งค่าตัวแปร

ก่อนอื่น ให้เปิด Cloud Shell ซึ่งมี Cloud SDK ติดตั้งให้คุณได้อย่างสะดวก

ตั้งค่าตัวแปรสภาพแวดล้อม COMPOSER_INSTANCE เป็นชื่อของสภาพแวดล้อมคอมโพสเซอร์

COMPOSER_INSTANCE=my-composer-environment

หากต้องการตั้งค่าตัวแปร Airflow โดยใช้เครื่องมือบรรทัดคำสั่ง gcloud ให้ใช้คำสั่ง gcloud composer environments run กับคำสั่งย่อย variables คำสั่ง gcloud composer นี้จะเรียกใช้คำสั่งย่อย Airflow CLI variables คำสั่งย่อยจะส่งอาร์กิวเมนต์ไปยังเครื่องมือบรรทัดคำสั่ง gcloud

คุณจะต้องเรียกใช้คำสั่งนี้ 3 ครั้ง โดยแทนที่ตัวแปรด้วยตัวแปรที่เกี่ยวข้องกับโปรเจ็กต์ของคุณ

ตั้งค่า gcp_project โดยใช้คำสั่งต่อไปนี้ โดยแทนที่ <your-project-id> รหัสโปรเจ็กต์ที่คุณจดไว้ในขั้นตอนที่ 2

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

เอาต์พุตจะมีลักษณะดังนี้

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

ตั้งค่า gcs_bucket โดยใช้คำสั่งต่อไปนี้ โดยแทนที่ <your-bucket-name> ด้วยรหัสที่เก็บข้อมูลที่คุณจดไว้ในขั้นตอนที่ 2 หากคุณทำตามคำแนะนำของเรา ชื่อที่เก็บข้อมูลของคุณจะเหมือนกับรหัสโปรเจ็กต์ เอาต์พุตจะคล้ายกับคำสั่งก่อนหน้า

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

ตั้งค่า gce_zone โดยใช้คำสั่งต่อไปนี้ เอาต์พุตจะคล้ายกับคำสั่งก่อนหน้า

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(ไม่บังคับ) การใช้ gcloud เพื่อดูตัวแปร

หากต้องการดูค่าของตัวแปร ให้เรียกใช้คำสั่งย่อย Airflow CLI variables ด้วยอาร์กิวเมนต์ get หรือใช้ Airflow UI

เช่น

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

ซึ่งทำได้โดยใช้ตัวแปร 3 ตัวที่เพิ่งตั้งค่า ซึ่งได้แก่ gcp_project, gcs_bucket และ gce_zone

4. ตัวอย่างเวิร์กโฟลว์

มาดูโค้ดสำหรับ DAG ที่เราจะใช้ในขั้นตอนที่ 5 กัน ไม่ต้องกังวลเรื่องการดาวน์โหลดไฟล์ เพียงทำตามขั้นตอนต่อไปนี้

ที่นี่มีของมากมายให้คลายเครียด เรามาดูรายละเอียดกัน

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

เราเริ่มต้นด้วยการนำเข้า Airflow บางส่วน:

  • airflow.models - ช่วยให้เราเข้าถึงและสร้างข้อมูลในฐานข้อมูล Airflow ได้
  • airflow.contrib.operators - ตำแหน่งที่โอเปอเรเตอร์จากชุมชนตั้งอยู่ ในกรณีนี้ เราต้องใช้ dataproc_operator เพื่อเข้าถึง Cloud Dataproc API
  • airflow.utils.trigger_rule - สำหรับเพิ่มกฎทริกเกอร์ในโอเปอเรเตอร์ กฎการเรียกใช้ช่วยให้ควบคุมได้อย่างละเอียดว่าโอเปอเรเตอร์ควรดำเนินการหรือไม่เมื่อเกี่ยวข้องกับสถานะของระดับบนสุด
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

แอตทริบิวต์นี้ระบุตำแหน่งไฟล์เอาต์พุตของเรา บรรทัดเด่นตรงนี้คือ models.Variable.get('gcs_bucket') ซึ่งจะจับค่าตัวแปร gcs_bucket จากฐานข้อมูล Airflow

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR - ตำแหน่งของไฟล์ .jar ที่เราจะเรียกใช้บนคลัสเตอร์ Cloud Dataproc ในที่สุด ซึ่งโฮสต์อยู่บน GCP สำหรับคุณอยู่แล้ว
  • input_file - ตำแหน่งของไฟล์ที่มีข้อมูลซึ่งงาน Hadoop ของเราจะประมวลผลในที่สุด เราจะอัปโหลดข้อมูลไปยังตำแหน่งนั้นร่วมกันในขั้นตอนที่ 5
  • wordcount_args - อาร์กิวเมนต์ที่เราจะส่งไปในไฟล์ jar
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

ซึ่งจะทำให้ออบเจ็กต์วันที่และเวลาเทียบเท่ากับเที่ยงคืนของวันก่อนหน้า ตัวอย่างเช่น หากเรียกใช้ในวันที่ 4 มีนาคม เวลา 11:00 น. ออบเจ็กต์วันที่และเวลาจะแสดงเป็น 00:00 ของวันที่ 3 มีนาคม เรื่องนี้เกี่ยวข้องกับวิธีที่ Airflow จัดการการจัดตารางเวลา ดูข้อมูลเพิ่มเติมเกี่ยวกับเรื่องนี้ได้ที่นี่

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

ควรระบุตัวแปร default_dag_args ในรูปแบบพจนานุกรมเมื่อมีการสร้าง DAG ใหม่:

  • 'email_on_failure' - ระบุว่าควรส่งการแจ้งเตือนทางอีเมลเมื่องานล้มเหลวหรือไม่
  • 'email_on_retry' - ระบุว่าควรส่งการแจ้งเตือนทางอีเมลเมื่อมีการพยายามทำงานอีกครั้งหรือไม่
  • 'retries' - แสดงจำนวนครั้งที่ Airflow ควรดำเนินการซ้ำในกรณีที่ DAG ล้มเหลว
  • 'retry_delay' - แสดงระยะเวลาที่ Airflow ควรรอก่อนที่จะลองลองอีกครั้ง
  • 'project_id' - บอก DAG ว่าจะใช้รหัสโปรเจ็กต์ GCP ใดเพื่อเชื่อมโยง ซึ่งจำเป็นต้องใช้กับผู้ให้บริการ Dataproc ในภายหลัง
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

การใช้ with models.DAG จะบอกให้สคริปต์รวมทุกอย่างที่อยู่ด้านล่างไว้ใน DAG เดียวกัน นอกจากนี้ เรายังเห็นอาร์กิวเมนต์ 3 แบบที่ผ่านการตรวจสอบใน

  • ชื่อแรกคือ สตริง คือชื่อที่จะกำหนด DAG ที่เรากำลังสร้าง ในกรณีนี้เราใช้ composer_hadoop_tutorial
  • schedule_interval - ออบเจ็กต์ datetime.timedelta ซึ่งเราตั้งไว้ที่ 1 วัน ซึ่งหมายความว่า DAG นี้จะพยายามดำเนินการวันละครั้งหลังจาก 'start_date' ซึ่งตั้งค่าไว้ก่อนหน้าใน 'default_dag_args'
  • default_args - พจนานุกรมที่เราสร้างไว้ก่อนหน้านี้ ซึ่งมีอาร์กิวเมนต์เริ่มต้นสำหรับ DAG

สร้างคลัสเตอร์ Dataproc

ต่อไป เราจะสร้าง dataproc_operator.DataprocClusterCreateOperator ซึ่งจะสร้างคลัสเตอร์ Cloud Dataproc

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

ภายในโอเปอเรเตอร์นี้ เราพบอาร์กิวเมนต์สองสามรายการ ยกเว้นอาร์กิวเมนต์แรกที่เฉพาะเจาะจงสำหรับโอเปอเรเตอร์นี้

  • task_id - เช่นเดียวกับใน BashOperator นี่คือชื่อที่เรากำหนดให้กับโอเปอเรเตอร์ ซึ่งสามารถดูได้จาก Airflow UI
  • cluster_name - ชื่อที่เรากำหนดคลัสเตอร์ Cloud Dataproc โดยตั้งชื่อว่า composer-hadoop-tutorial-cluster-{{ ds_nodash }} (ดูกล่องข้อมูลสำหรับข้อมูลเพิ่มเติมที่ไม่บังคับ)
  • num_workers - จำนวนผู้ปฏิบัติงานที่เราจัดสรรให้กับคลัสเตอร์ Cloud Dataproc
  • zone - ภูมิภาคทางภูมิศาสตร์ที่เราต้องการให้คลัสเตอร์อยู่ ตามที่บันทึกไว้ในฐานข้อมูล Airflow ซึ่งจะอ่านตัวแปร 'gce_zone' ที่เราตั้งค่าไว้ในขั้นตอนที่ 3
  • master_machine_type - ประเภทเครื่องที่เราต้องการจัดสรรให้กับต้นแบบ Cloud Dataproc
  • worker_machine_type - ประเภทของเครื่องที่เราต้องการจัดสรรให้กับผู้ปฏิบัติงาน Cloud Dataproc

ส่งงาน Apache Hadoop

dataproc_operator.DataProcHadoopOperator ช่วยให้เราส่งงานไปยังคลัสเตอร์ Cloud Dataproc ได้

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

เรามีพารามิเตอร์หลายอย่าง ดังนี้

  • task_id - ชื่อที่เรากำหนดให้กับ DAG ส่วนนี้
  • main_jar - ตำแหน่งของไฟล์ .jar ที่ต้องการให้ทำงานกับคลัสเตอร์
  • cluster_name - ชื่อของคลัสเตอร์ที่จะเรียกใช้งาน ซึ่งคุณจะเห็นว่าเหมือนกับที่เราพบในโอเปอเรเตอร์ก่อนหน้า
  • arguments - อาร์กิวเมนต์ที่ส่งผ่านเข้าไปในไฟล์ jar ในลักษณะเดียวกับที่คุณจะดำเนินการกับไฟล์ .jar จากบรรทัดคำสั่ง

ลบคลัสเตอร์

โอเปอเรเตอร์สุดท้ายที่เราจะสร้างคือ dataproc_operator.DataprocClusterDeleteOperator

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

โอเปอเรเตอร์นี้จะลบคลัสเตอร์ Cloud Dataproc ที่ระบุ เราเห็นอาร์กิวเมนต์ 3 อย่างที่นี่

  • task_id - เช่นเดียวกับใน BashOperator นี่คือชื่อที่เรากำหนดให้กับโอเปอเรเตอร์ ซึ่งสามารถดูได้จาก Airflow UI
  • cluster_name - ชื่อที่เรากำหนดคลัสเตอร์ Cloud Dataproc ซึ่งเราได้ตั้งชื่อว่า composer-hadoop-tutorial-cluster-{{ ds_nodash }} (ดูข้อมูลเพิ่มเติมในส่วน "สร้างคลัสเตอร์ Dataproc" ได้ที่ช่องข้อมูล)
  • trigger_rule - เราพูดถึงกฎทริกเกอร์เป็นระยะเวลาสั้นๆ ระหว่างการนำเข้าเมื่อเริ่มต้นขั้นตอนนี้ แต่เรายังมีการดำเนินการอยู่ 1 รายการ โดยค่าเริ่มต้น โอเปอเรเตอร์ Airflow จะไม่ทำงานเว้นแต่โอเปอเรเตอร์อัปสตรีมทั้งหมดจะเสร็จสิ้น กฎทริกเกอร์ ALL_DONE กำหนดให้โอเปอเรเตอร์อัปสตรีมทั้งหมดต้องทำงานให้เสร็จสมบูรณ์เท่านั้น ไม่ว่าโอเปอเรเตอร์จะสำเร็จหรือไม่ก็ตาม ซึ่งหมายความว่าแม้ว่างาน Hadoop จะล้มเหลว แต่เราก็ยังต้องการคลัสเตอร์ดังกล่าวอยู่
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

สุดท้าย เราต้องการให้โอเปอเรเตอร์เหล่านี้ทำงานตามลำดับที่กำหนด และสามารถระบุได้โดยใช้โอเปอเรเตอร์บิตชิฟท์ของ Python ในกรณีนี้ create_dataproc_cluster จะดำเนินการก่อนเสมอ ตามด้วย run_dataproc_hadoop และสุดท้ายคือ delete_dataproc_cluster

เมื่อนำทุกอย่างมารวมกัน โค้ดจะมีลักษณะเช่นนี้

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5. อัปโหลดไฟล์ Airflow ไปยัง Cloud Storage

คัดลอก DAG ไปยังโฟลเดอร์ /dags ของคุณ

  1. ก่อนอื่น ให้เปิด Cloud Shell ซึ่งมี Cloud SDK ติดตั้งให้คุณได้อย่างสะดวก
  2. โคลนที่เก็บตัวอย่าง Python และเปลี่ยนเป็นไดเรกทอรี Composer/workflows
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. เรียกใช้คำสั่งต่อไปนี้เพื่อตั้งชื่อโฟลเดอร์ DAG เป็นตัวแปรสภาพแวดล้อม
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. เรียกใช้คำสั่ง gsutil ต่อไปนี้เพื่อคัดลอกโค้ดบทแนะนำไปวางที่โฟลเดอร์ /dags ของคุณ
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

เอาต์พุตจะมีลักษณะดังนี้

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6. การใช้ Airflow UI

วิธีเข้าถึงอินเทอร์เฟซเว็บ Airflow โดยใช้คอนโซล GCP

  1. เปิดหน้าสภาพแวดล้อม
  2. ในคอลัมน์เว็บเซิร์ฟเวอร์ Airflow สำหรับสภาพแวดล้อม ให้คลิกไอคอนหน้าต่างใหม่ UI เว็บ Airflow จะเปิดในหน้าต่างเบราว์เซอร์ใหม่

ดูข้อมูลเกี่ยวกับ UI ของ Airflow ได้ที่การเข้าถึงอินเทอร์เฟซเว็บ

ดูตัวแปร

ตัวแปรที่คุณตั้งค่าไว้ก่อนหน้านี้จะยังคงอยู่ในสภาพแวดล้อม คุณสามารถดูตัวแปรได้โดยเลือกผู้ดูแลระบบ > ตัวแปรจากแถบเมนูของ UI ใน Airflow

มีการเลือกแท็บรายการและแสดงตารางที่มีคีย์และคีย์ค่าต่อไปนี้: gcp_project, ค่า: คีย์รหัสโปรเจ็กต์: gcs_bucket, ค่า: gs://bucket-name key: gce_zone, value: zone

การสำรวจการเรียกใช้ DAG

เมื่อคุณอัปโหลดไฟล์ DAG ไปยังโฟลเดอร์ dags ใน Cloud Storage แล้ว Cloud Composer จะแยกวิเคราะห์ไฟล์นั้น หากไม่พบข้อผิดพลาด ชื่อของเวิร์กโฟลว์จะปรากฏในรายการ DAG และเวิร์กโฟลว์จะอยู่ในคิวเพื่อเรียกใช้ทันที หากต้องการดู DAG ให้คลิก DAG ที่ด้านบนของหน้า

84a29c71f20bff98.png

คลิก composer_hadoop_tutorial เพื่อเปิดหน้ารายละเอียด DAG หน้านี้จะมีการนำเสนองานเวิร์กโฟลว์และทรัพยากร Dependency แบบกราฟิก

f4f1663c7a37f47c.png

จากนั้นคลิกมุมมองกราฟในแถบเครื่องมือ แล้ววางเมาส์เหนือกราฟิกของแต่ละงานเพื่อดูสถานะ โปรดทราบว่าเส้นขอบรอบแต่ละงานจะระบุสถานะด้วย (เส้นขอบสีเขียว = กำลังทำงาน; สีแดง = ล้มเหลว ฯลฯ)

4c5a0c6fa9f88513.png

หากต้องการเรียกใช้เวิร์กโฟลว์อีกครั้งจากมุมมองกราฟ ให้ทำดังนี้

  1. ในมุมมองกราฟ UI ของ Airflow ให้คลิกกราฟิก create_dataproc_cluster
  2. คลิกล้างเพื่อรีเซ็ตงานทั้ง 3 รายการ แล้วคลิกตกลงเพื่อยืนยัน

fd1b23b462748f47.png

นอกจากนี้ คุณยังตรวจสอบสถานะและผลลัพธ์ของเวิร์กโฟลว์ composer-hadoop-tutorial ได้โดยไปที่หน้าคอนโซล GCP ต่อไปนี้

  • คลัสเตอร์ Cloud Dataproc เพื่อตรวจสอบการสร้างและการลบคลัสเตอร์ โปรดทราบว่าคลัสเตอร์ที่สร้างโดยเวิร์กโฟลว์เป็นแบบชั่วคราว ซึ่งมีอยู่ในช่วงเวลาของเวิร์กโฟลว์เท่านั้นและจะถูกลบเป็นส่วนหนึ่งของงานเวิร์กโฟลว์ล่าสุด
  • งาน Cloud Dataproc เพื่อดูหรือตรวจสอบงานจำนวนคำของ Apache Hadoop คลิกรหัสงานเพื่อดูเอาต์พุตบันทึกงาน
  • เบราว์เซอร์ Cloud Storage เพื่อดูผลลัพธ์ของจำนวนคำในโฟลเดอร์ wordcount ในที่เก็บข้อมูล Cloud Storage ที่คุณสร้างขึ้นสำหรับ Codelab นี้

7. ล้างข้อมูล

โปรดทำดังนี้เพื่อเลี่ยงไม่ให้เกิดการเรียกเก็บเงินกับบัญชี GCP สำหรับทรัพยากรที่ใช้ใน Codelab นี้

  1. (ไม่บังคับ) หากต้องการบันทึกข้อมูล ให้ดาวน์โหลดข้อมูลจากที่เก็บข้อมูล Cloud Storage สำหรับสภาพแวดล้อม Cloud Composer และที่เก็บข้อมูลของพื้นที่เก็บข้อมูลที่คุณสร้างขึ้นสำหรับ Codelab นี้
  2. ลบที่เก็บข้อมูล Cloud Storage ที่คุณสร้างไว้สำหรับ Codelab นี้
  3. ลบที่เก็บข้อมูล Cloud Storage สำหรับสภาพแวดล้อม
  4. ลบสภาพแวดล้อม Cloud Composer โปรดทราบว่าการลบสภาพแวดล้อมจะไม่ลบที่เก็บข้อมูลของพื้นที่เก็บข้อมูลสำหรับสภาพแวดล้อมนั้น

นอกจากนี้ คุณยังเลือกลบโปรเจ็กต์ได้ด้วย โดยทำดังนี้

  1. ในคอนโซล GCP ให้ไปที่หน้าโปรเจ็กต์
  2. ในรายการโปรเจ็กต์ ให้เลือกโปรเจ็กต์ที่ต้องการลบ แล้วคลิกลบ
  3. ในช่อง ให้พิมพ์รหัสโปรเจ็กต์ แล้วคลิกปิดเครื่องเพื่อลบโปรเจ็กต์