การเรียกใช้งานจำนวนคำของ Hadoop บนคลัสเตอร์ Dataproc

1. บทนำ

เวิร์กโฟลว์เป็นกรณีการใช้งานที่พบบ่อยในการวิเคราะห์ข้อมูล ซึ่งเกี่ยวข้องกับการนำเข้า การเปลี่ยนรูปแบบ และการวิเคราะห์ข้อมูลเพื่อค้นหาข้อมูลที่สําคัญ ใน Google Cloud Platform เครื่องมือสำหรับจัดระเบียบเวิร์กโฟลว์คือ Cloud Composer ซึ่งเป็นเครื่องมือเวิร์กโฟลว์โอเพนซอร์สยอดนิยม Apache Airflow เวอร์ชันที่โฮสต์ ในห้องทดลองนี้ คุณจะใช้ Cloud Composer เพื่อสร้างเวิร์กโฟลว์แบบง่ายๆ ที่สร้างคลัสเตอร์ Cloud Dataproc, วิเคราะห์โดยใช้ Cloud Dataproc และ Apache Hadoop จากนั้นจึงลบคลัสเตอร์ Cloud Dataproc ในภายหลัง

Cloud Composer คืออะไร

Cloud Composer เป็นบริการเวิร์กโฟลว์แบบจัดการเป็นกลุ่มอย่างครบวงจรที่ช่วยให้คุณเขียน กำหนดเวลา และตรวจสอบไปป์ไลน์ที่ครอบคลุมทั้งระบบคลาวด์และศูนย์ข้อมูลในองค์กร Cloud Composer สร้างขึ้นจากโปรเจ็กต์โอเพนซอร์ส Apache Airflow ยอดนิยมและดำเนินการโดยใช้ภาษาการเขียนโปรแกรม Python ทำให้ปราศจากการผูกมัดและใช้งานง่าย

การใช้ Cloud Composer แทนอินสแตนซ์ในเครื่องของ Apache Airflow จะช่วยให้ผู้ใช้ได้รับประโยชน์จาก Airflow ที่ดีที่สุดโดยไม่ต้องติดตั้งหรือมีค่าใช้จ่ายในการจัดการ

Apache Airflow คืออะไร

Apache Airflow เป็นเครื่องมือโอเพนซอร์สที่ใช้สำหรับเขียน กำหนดเวลา และตรวจสอบเวิร์กโฟลว์โดยใช้โปรแกรม คําศัพท์สําคัญบางคําเกี่ยวกับ Airflow ที่คุณจะเห็นตลอดทั้งห้องทดลองมีดังนี้

  • DAG - DAG (กราฟแบบ Acyclic โดยตรง) คือคอลเล็กชันของงานที่มีการจัดระเบียบซึ่งคุณต้องการกำหนดเวลาและเรียกใช้ DAG หรือที่เรียกว่าเวิร์กโฟลว์ จะกำหนดในไฟล์ Python มาตรฐาน
  • โอเปอเรเตอร์ - โอเปอเรเตอร์อธิบายงานเดียวในเวิร์กโฟลว์

Cloud Dataproc คืออะไร

Cloud Dataproc เป็นบริการ Apache Spark และ Apache Hadoop ที่มีการจัดการโดยสมบูรณ์ของ Google Cloud Platform Cloud Dataproc ผสานรวมกับบริการอื่นๆ ของ GCP ได้อย่างง่ายดาย จึงเป็นแพลตฟอร์มที่มีประสิทธิภาพและสมบูรณ์แบบสําหรับการประมวลผลข้อมูล การวิเคราะห์ และแมชชีนเลิร์นนิง

สิ่งที่คุณต้องทำ

Codelab นี้แสดงวิธีสร้างและเรียกใช้เวิร์กโฟลว์ Apache Airflow ใน Cloud Composer ซึ่งทํางานต่อไปนี้ให้เสร็จสมบูรณ์

  • สร้างคลัสเตอร์ Cloud Dataproc
  • เรียกใช้งานนับคำของ Apache Hadoop ในคลัสเตอร์ และแสดงผลลัพธ์ไปยัง Cloud Storage
  • ลบคลัสเตอร์

สิ่งที่คุณจะได้เรียนรู้

  • วิธีสร้างและเรียกใช้เวิร์กโฟลว์ Apache Airflow ใน Cloud Composer
  • วิธีใช้ Cloud Composer และ Cloud Dataproc เพื่อเรียกใช้การวิเคราะห์บนชุดข้อมูล
  • วิธีเข้าถึงสภาพแวดล้อม Cloud Composer ผ่านคอนโซล Google Cloud Platform, Cloud SDK และเว็บอินเทอร์เฟซของ Airflow

สิ่งที่ต้องมี

  • บัญชี GCP
  • ความรู้พื้นฐานเกี่ยวกับ CLI
  • ความเข้าใจเบื้องต้นเกี่ยวกับ Python

2. การตั้งค่า GCP

สร้างโปรเจ็กต์

เลือกหรือสร้างโปรเจ็กต์ Google Cloud Platform

จดรหัสโปรเจ็กต์ไว้ ซึ่งจะใช้ในขั้นตอนต่อๆ ไป

หากคุณกำลังสร้างโปรเจ็กต์ใหม่ รหัสโปรเจ็กต์จะอยู่ใต้ชื่อโปรเจ็กต์ในหน้าการสร้าง

หากสร้างโปรเจ็กต์ไว้แล้ว คุณจะดูรหัสได้ในหน้าแรกของคอนโซลในการ์ดข้อมูลโปรเจ็กต์

เปิดใช้ API

เปิดใช้ Cloud Composer, Cloud Dataproc และ Cloud Storage API เมื่อเปิดใช้แล้ว คุณไม่ต้องสนใจปุ่ม "ไปที่ข้อมูลเข้าสู่ระบบ" และไปยังขั้นตอนถัดไปของบทแนะนำ

สร้างสภาพแวดล้อมคอมโพสเซอร์

สร้างสภาพแวดล้อม Cloud Composer ด้วยการกำหนดค่าต่อไปนี้

  • ชื่อ: my-composer-environment
  • สถานที่ตั้ง: us-central1
  • โซน: us-central1-a

การกําหนดค่าอื่นๆ ทั้งหมดจะยังคงเป็นค่าเริ่มต้นได้ คลิก "สร้าง" ที่ด้านล่าง

สร้างที่เก็บข้อมูล Cloud Storage

สร้างที่เก็บข้อมูล Cloud Storage ในโปรเจ็กต์โดยกำหนดค่าดังนี้

  • ชื่อ: <รหัสโปรเจ็กต์ของคุณ>
  • คลาสพื้นที่เก็บข้อมูลเริ่มต้น: หลายภูมิภาค
  • สถานที่: สหรัฐอเมริกา
  • โมเดลการควบคุมการเข้าถึง: แบบละเอียด

กด "สร้าง" เมื่อพร้อมแล้ว

3. การตั้งค่า Apache Airflow

การดูข้อมูลสภาพแวดล้อมของคอมโพสเซอร์

ในคอนโซล GCP ให้เปิดหน้าสภาพแวดล้อม

คลิกชื่อของสภาพแวดล้อมเพื่อดูรายละเอียด

หน้ารายละเอียดสภาพแวดล้อมจะแสดงข้อมูล เช่น URL ของอินเทอร์เฟซเว็บ Airflow, รหัสคลัสเตอร์ของ Google Kubernetes Engine, ชื่อที่เก็บข้อมูล Cloud Storage และเส้นทางสำหรับโฟลเดอร์ /dags

ใน Airflow DAG (กราฟแบบ Acyclic โดยตรง) คือคอลเล็กชันของงานที่มีการจัดระเบียบซึ่งคุณต้องการกำหนดเวลาและเรียกใช้ DAG หรือที่เรียกว่าเวิร์กโฟลว์จะกำหนดไว้ในไฟล์ Python มาตรฐาน Cloud Composer จะกำหนดเวลา DAG ในโฟลเดอร์ /dags เท่านั้น โฟลเดอร์ /dags อยู่ในที่เก็บข้อมูล Cloud Storage ที่ Cloud Composer สร้างขึ้นโดยอัตโนมัติเมื่อคุณสร้างสภาพแวดล้อม

การตั้งค่าตัวแปรสภาพแวดล้อม Apache Airflow

ตัวแปร Apache Airflow เป็นแนวคิดเฉพาะ Airflow ซึ่งแตกต่างจากตัวแปรสภาพแวดล้อม ในขั้นตอนนี้ คุณจะตั้งค่าตัวแปร Airflow 3 รายการ ได้แก่ gcp_project, gcs_bucket และ gce_zone

กำลังใช้ gcloud เพื่อตั้งค่าตัวแปร

ก่อนอื่น ให้เปิด Cloud Shell ซึ่งมี Cloud SDK ติดตั้งให้คุณได้อย่างสะดวก

ตั้งค่าตัวแปรสภาพแวดล้อม COMPOSER_INSTANCE เป็นชื่อสภาพแวดล้อมคอมโพสเซอร์

COMPOSER_INSTANCE=my-composer-environment

หากต้องการตั้งค่าตัวแปร Airflow โดยใช้เครื่องมือบรรทัดคำสั่ง gcloud ให้ใช้คำสั่ง gcloud composer environments run ที่มีคำสั่งย่อย variables คำสั่ง gcloud composer นี้จะเรียกใช้คำสั่งย่อย variables ของ Airflow CLI คำสั่งย่อยจะส่งอาร์กิวเมนต์ไปยังเครื่องมือบรรทัดคำสั่ง gcloud

คุณจะต้องเรียกใช้คำสั่งนี้ 3 ครั้ง โดยแทนที่ตัวแปรด้วยตัวแปรที่เกี่ยวข้องกับโปรเจ็กต์ของคุณ

ตั้งค่า gcp_project โดยใช้คําสั่งต่อไปนี้ โดยแทนที่ <your-project-id> ด้วยรหัสโปรเจ็กต์ที่คุณจดไว้ในระยะที่ 2

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

เอาต์พุตจะมีลักษณะดังนี้

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

ตั้งค่า gcs_bucket โดยใช้คําสั่งต่อไปนี้ โดยแทนที่ <your-bucket-name> ด้วยรหัสที่เก็บข้อมูลที่คุณจดไว้ในระยะที่ 2 หากคุณทำตามคำแนะนำของเรา ชื่อที่เก็บข้อมูลของคุณจะเหมือนกับรหัสโปรเจ็กต์ เอาต์พุตจะคล้ายกับคำสั่งก่อนหน้า

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

ตั้งค่า gce_zone โดยใช้คำสั่งต่อไปนี้ เอาต์พุตจะคล้ายกับคำสั่งก่อนหน้า

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(ไม่บังคับ) การใช้ gcloud เพื่อดูตัวแปร

หากต้องการดูค่าของตัวแปร ให้เรียกใช้คำสั่งย่อย Airflow CLI variables ด้วยอาร์กิวเมนต์ get หรือใช้ Airflow UI

เช่น

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

ซึ่งทำได้โดยใช้ตัวแปร 3 ตัวที่เพิ่งตั้งค่า ซึ่งได้แก่ gcp_project, gcs_bucket และ gce_zone

4. เวิร์กโฟลว์ตัวอย่าง

มาดูโค้ดสำหรับ DAG ที่เราจะใช้ในขั้นตอนที่ 5 กัน ไม่ต้องกังวลเกี่ยวกับการดาวน์โหลดไฟล์ในตอนนี้ เพียงทำตามขั้นตอนที่นี่

ที่นี่มีของมากมายให้คลายเครียด เรามาดูรายละเอียดกัน

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

เรามาเริ่มด้วยการนําเข้าข้อมูลใน Airflow กัน

  • airflow.models - อนุญาตให้เราเข้าถึงและสร้างข้อมูลในฐานข้อมูล Airflow
  • airflow.contrib.operators - ตำแหน่งที่โอเปอเรเตอร์จากชุมชนตั้งอยู่ ในกรณีนี้ เราต้องใช้ dataproc_operator เพื่อเข้าถึง Cloud Dataproc API
  • airflow.utils.trigger_rule - สำหรับเพิ่มกฎทริกเกอร์ในโอเปอเรเตอร์ กฎการเรียกใช้ช่วยให้ควบคุมได้อย่างละเอียดว่าโอเปอเรเตอร์ควรดำเนินการหรือไม่เมื่อเกี่ยวข้องกับสถานะของระดับบนสุด
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

ซึ่งจะระบุตำแหน่งของไฟล์เอาต์พุต บรรทัดที่น่าสนใจคือ models.Variable.get('gcs_bucket') ซึ่งจะดึงค่าตัวแปร gcs_bucket จากฐานข้อมูล Airflow

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR - ตำแหน่งของไฟล์ .jar ที่เราจะเรียกใช้บนคลัสเตอร์ Cloud Dataproc ในที่สุด ซึ่งโฮสต์อยู่บน GCP สำหรับคุณอยู่แล้ว
  • input_file - ตำแหน่งของไฟล์ที่มีข้อมูลที่งาน Hadoop จะประมวลผลในท้ายที่สุด เราจะอัปโหลดข้อมูลไปยังตำแหน่งนั้นพร้อมกันในขั้นตอนที่ 5
  • wordcount_args - อาร์กิวเมนต์ที่เราจะส่งไปยังไฟล์ jar
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

ซึ่งจะทำให้ออบเจ็กต์วันที่และเวลาเทียบเท่ากับเที่ยงคืนของวันก่อนหน้า เช่น หากการดําเนินการนี้ทําเมื่อเวลา 11:00 น. ของวันที่ 4 มีนาคม ออบเจ็กต์วันที่และเวลาจะแสดงค่า 00:00 น. ของวันที่ 3 มีนาคม เรื่องนี้เกี่ยวข้องกับวิธีที่ Airflow จัดการการจัดตารางเวลา ดูข้อมูลเพิ่มเติมได้ที่นี่

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

ควรระบุตัวแปร default_dag_args ในรูปแบบพจนานุกรมเมื่อมีการสร้าง DAG ใหม่:

  • 'email_on_failure' - ระบุว่าควรส่งการแจ้งเตือนทางอีเมลเมื่องานล้มเหลวหรือไม่
  • 'email_on_retry' - ระบุว่าควรส่งการแจ้งเตือนทางอีเมลเมื่อมีการพยายามทำงานอีกครั้งหรือไม่
  • 'retries' - แสดงจำนวนครั้งที่ Airflow ควรลองใหม่ในกรณีที่ DAG ล้มเหลว
  • 'retry_delay' - แสดงระยะเวลาที่ Airflow ควรรอก่อนที่จะลองลองอีกครั้ง
  • 'project_id' - บอก DAG ว่าจะใช้รหัสโปรเจ็กต์ GCP ใดเพื่อเชื่อมโยง ซึ่งจำเป็นต้องใช้กับผู้ให้บริการ Dataproc ในภายหลัง
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

การใช้ with models.DAG จะบอกให้สคริปต์รวมทุกอย่างที่อยู่ด้านล่างไว้ใน DAG เดียวกัน และเห็นอาร์กิวเมนต์ 3 รายการที่ส่งเข้ามาด้วย ดังนี้

  • รายการแรกซึ่งเป็นสตริงคือชื่อที่จะตั้งให้กับ DAG ที่เรากําลังสร้าง ในกรณีนี้ เราใช้ composer_hadoop_tutorial
  • schedule_interval - ออบเจ็กต์ datetime.timedelta ซึ่งเราตั้งไว้ที่ 1 วัน ซึ่งหมายความว่า DAG นี้จะพยายามเรียกใช้วันละครั้งหลังจาก 'start_date' ซึ่งตั้งค่าไว้ก่อนหน้าใน 'default_dag_args'
  • default_args - พจนานุกรมที่เราสร้างขึ้นก่อนหน้านี้ซึ่งมีอาร์กิวเมนต์เริ่มต้นสําหรับ DAG

สร้างคลัสเตอร์ Dataproc

ต่อไป เราจะสร้าง dataproc_operator.DataprocClusterCreateOperator ซึ่งจะสร้างคลัสเตอร์ Cloud Dataproc

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

ภายในโอเปอเรเตอร์นี้ เราพบอาร์กิวเมนต์สองสามรายการ ยกเว้นอาร์กิวเมนต์แรกที่เฉพาะเจาะจงสำหรับโอเปอเรเตอร์นี้

  • task_id - เช่นเดียวกับใน BashOperator นี่คือชื่อที่เรากำหนดให้กับโอเปอเรเตอร์ ซึ่งสามารถดูได้จาก Airflow UI
  • cluster_name - ชื่อที่เรากำหนดให้กับคลัสเตอร์ Cloud Dataproc ในที่นี้ เราตั้งชื่อเป็น composer-hadoop-tutorial-cluster-{{ ds_nodash }} (ดูข้อมูลเพิ่มเติมในช่องข้อมูล)
  • num_workers - จํานวนผู้ปฏิบัติงานที่เราจัดสรรให้กับคลัสเตอร์ Cloud Dataproc
  • zone - ภูมิภาคทางภูมิศาสตร์ที่เราต้องการให้คลัสเตอร์ทำงานอยู่ตามที่บันทึกไว้ในฐานข้อมูล Airflow ซึ่งจะอ่านตัวแปร 'gce_zone' ที่เราตั้งค่าไว้ในขั้นตอนที่ 3
  • master_machine_type - ประเภทเครื่องที่เราต้องการจัดสรรให้กับต้นแบบ Cloud Dataproc
  • worker_machine_type - ประเภทของเครื่องที่เราต้องการจัดสรรให้กับผู้ปฏิบัติงาน Cloud Dataproc

ส่งงาน Apache Hadoop

dataproc_operator.DataProcHadoopOperator ช่วยให้เราส่งงานไปยังคลัสเตอร์ Cloud Dataproc ได้

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

เรามีพารามิเตอร์หลายรายการดังนี้

  • task_id - ชื่อที่เรากำหนดให้กับ DAG ส่วนนี้
  • main_jar - ตำแหน่งของไฟล์ .jar ที่ต้องการเรียกใช้กับคลัสเตอร์
  • cluster_name - ชื่อของคลัสเตอร์ที่จะเรียกใช้งาน ซึ่งคุณจะเห็นว่าเหมือนกับที่เราพบในโอเปอเรเตอร์ก่อนหน้า
  • arguments - อาร์กิวเมนต์ที่ส่งผ่านเข้าไปในไฟล์ jar ในลักษณะเดียวกับที่คุณจะดำเนินการกับไฟล์ .jar จากบรรทัดคำสั่ง

ลบคลัสเตอร์

ตัวดำเนินการสุดท้ายที่เราจะสร้างคือ dataproc_operator.DataprocClusterDeleteOperator

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

ตามที่ชื่อบอกไว้ โอเปอเรเตอร์นี้จะลบคลัสเตอร์ Cloud Dataproc ที่ระบุ เราเห็นอาร์กิวเมนต์ 3 รายการที่นี่

  • task_id - เช่นเดียวกับใน BashOperator นี่คือชื่อที่เรากำหนดให้กับโอเปอเรเตอร์ ซึ่งสามารถดูได้จาก Airflow UI
  • cluster_name - ชื่อที่เรากำหนดให้กับคลัสเตอร์ Cloud Dataproc ซึ่งเราได้ตั้งชื่อว่า composer-hadoop-tutorial-cluster-{{ ds_nodash }} (ดูข้อมูลเพิ่มเติมในส่วน "สร้างคลัสเตอร์ Dataproc" ได้ที่ช่องข้อมูล)
  • trigger_rule - เราพูดถึงกฎทริกเกอร์เป็นระยะเวลาสั้นๆ ระหว่างการนำเข้าเมื่อเริ่มต้นขั้นตอนนี้ แต่เรายังมีการดำเนินการอยู่ 1 รายการ โดยค่าเริ่มต้น โอเปอเรเตอร์ Airflow จะไม่ทำงานเว้นแต่โอเปอเรเตอร์อัปสตรีมทั้งหมดจะเสร็จสิ้น กฎทริกเกอร์ ALL_DONE กำหนดให้โอเปอเรเตอร์อัปสตรีมทั้งหมดต้องทำงานให้เสร็จสมบูรณ์เท่านั้น ไม่ว่าโอเปอเรเตอร์จะสำเร็จหรือไม่ก็ตาม ซึ่งหมายความว่าแม้ว่างาน Hadoop จะล้มเหลว แต่เราก็ยังต้องการคลัสเตอร์ดังกล่าวอยู่
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

สุดท้าย เราต้องการให้โอเปอเรเตอร์เหล่านี้ทำงานตามลำดับที่กำหนด และสามารถระบุได้โดยใช้โอเปอเรเตอร์บิตชิฟท์ของ Python ในกรณีนี้ create_dataproc_cluster จะทำงานก่อนเสมอ ตามด้วย run_dataproc_hadoop และสุดท้าย delete_dataproc_cluster

เมื่อรวมทั้งหมดเข้าด้วยกัน โค้ดจะมีลักษณะดังนี้

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5. อัปโหลดไฟล์ Airflow ไปยัง Cloud Storage

คัดลอก DAG ไปยังโฟลเดอร์ /dags ของคุณ

  1. ก่อนอื่น ให้เปิด Cloud Shell ซึ่งมี Cloud SDK ติดตั้งให้คุณได้อย่างสะดวก
  2. โคลนที่เก็บตัวอย่าง Python และเปลี่ยนเป็นไดเรกทอรี Composer/workflows
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. เรียกใช้คำสั่งต่อไปนี้เพื่อตั้งชื่อโฟลเดอร์ DAG เป็นตัวแปรสภาพแวดล้อม
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. เรียกใช้คำสั่ง gsutil ต่อไปนี้เพื่อคัดลอกโค้ดบทแนะนำไปวางที่โฟลเดอร์ /dags ของคุณ
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

เอาต์พุตจะมีลักษณะดังนี้

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6. การใช้ Airflow UI

วิธีเข้าถึงอินเทอร์เฟซเว็บ Airflow โดยใช้คอนโซล GCP

  1. เปิดหน้าสภาพแวดล้อม
  2. ในคอลัมน์ Airflow webserver สำหรับสภาพแวดล้อม ให้คลิกไอคอนหน้าต่างใหม่ UI เว็บ Airflow จะเปิดในหน้าต่างเบราว์เซอร์ใหม่

ดูข้อมูลเกี่ยวกับ UI ของ Airflow ได้ที่การเข้าถึงอินเทอร์เฟซเว็บ

ดูตัวแปร

ตัวแปรที่คุณตั้งไว้ก่อนหน้านี้จะยังคงอยู่ในสภาพแวดล้อม คุณดูตัวแปรได้โดยเลือกผู้ดูแลระบบ > ตัวแปรจากแถบเมนู UI ของ Airflow

เลือกแท็บรายการและแสดงตารางที่มีคีย์และค่าต่อไปนี้ คีย์: gcp_project, ค่า: project-id คีย์: gcs_bucket, ค่า: gs://bucket-name คีย์: gce_zone, ค่า: zone

การสำรวจการเรียกใช้ DAG

เมื่อคุณอัปโหลดไฟล์ DAG ไปยังโฟลเดอร์ dags ใน Cloud Storage แล้ว Cloud Composer จะแยกวิเคราะห์ไฟล์นั้น หากไม่พบข้อผิดพลาด ชื่อของเวิร์กโฟลว์จะปรากฏในรายการ DAG และเวิร์กโฟลว์จะอยู่ในคิวเพื่อเรียกใช้ทันที หากต้องการดู DAG ให้คลิก DAG ที่ด้านบนของหน้า

84a29c71f20bff98.png

คลิก composer_hadoop_tutorial เพื่อเปิดหน้ารายละเอียด DAG หน้านี้จะมีการนำเสนองานเวิร์กโฟลว์และทรัพยากร Dependency แบบกราฟิก

f4f1663c7a37f47c.png

ตอนนี้ให้คลิกมุมมองกราฟในแถบเครื่องมือ แล้ววางเมาส์เหนือกราฟของงานแต่ละรายการเพื่อดูสถานะ โปรดทราบว่าเส้นขอบรอบแต่ละงานจะระบุสถานะด้วย (เส้นขอบสีเขียว = ทำงานอยู่ สีแดง = ไม่ผ่าน เป็นต้น)

4c5a0c6fa9f88513.png

หากต้องการเรียกใช้เวิร์กโฟลว์อีกครั้งจากมุมมองกราฟ ให้ทำดังนี้

  1. ในมุมมองกราฟ UI ของ Airflow ให้คลิกกราฟิก create_dataproc_cluster
  2. คลิกล้างเพื่อรีเซ็ตงาน 3 รายการ แล้วคลิกตกลงเพื่อยืนยัน

fd1b23b462748f47.png

นอกจากนี้ คุณยังตรวจสอบสถานะและผลลัพธ์ของเวิร์กโฟลว์ composer-hadoop-tutorial ได้โดยไปที่หน้าคอนโซล GCP ต่อไปนี้

  • คลัสเตอร์ Dataproc ของ Cloud เพื่อตรวจสอบการสร้างและการลบคลัสเตอร์ โปรดทราบว่าคลัสเตอร์ที่สร้างโดยเวิร์กโฟลว์เป็นแบบชั่วคราว ซึ่งมีอยู่ในช่วงเวลาของเวิร์กโฟลว์เท่านั้นและจะถูกลบเป็นส่วนหนึ่งของงานเวิร์กโฟลว์ล่าสุด
  • งาน Cloud Dataproc เพื่อดูหรือตรวจสอบงานจำนวนคำของ Apache Hadoop คลิกรหัสงานเพื่อดูเอาต์พุตบันทึกงาน
  • Cloud Storage Browser เพื่อดูผลลัพธ์ของจํานวนคำในโฟลเดอร์ wordcount ในที่เก็บข้อมูล Cloud Storage ที่คุณสร้างสําหรับโค้ดแล็บนี้

7. ล้างข้อมูล

โปรดทำดังนี้เพื่อเลี่ยงไม่ให้เกิดการเรียกเก็บเงินกับบัญชี GCP สำหรับทรัพยากรที่ใช้ใน Codelab นี้

  1. (ไม่บังคับ) หากต้องการบันทึกข้อมูล ให้ดาวน์โหลดข้อมูลจากที่เก็บข้อมูล Cloud Storage สําหรับสภาพแวดล้อม Cloud Composer และที่เก็บข้อมูลที่คุณสร้างสําหรับโค้ดแล็บนี้
  2. ลบที่เก็บข้อมูล Cloud Storage ที่คุณสร้างสำหรับโค้ดแล็บนี้
  3. ลบที่เก็บข้อมูล Cloud Storage สำหรับสภาพแวดล้อม
  4. ลบสภาพแวดล้อม Cloud Composer โปรดทราบว่าการลบสภาพแวดล้อมจะไม่ลบที่เก็บข้อมูลของพื้นที่เก็บข้อมูลสำหรับสภาพแวดล้อมนั้น

นอกจากนี้ คุณยังลบโปรเจ็กต์ได้ด้วย โดยทำดังนี้

  1. ในคอนโซล GCP ให้ไปที่หน้าโปรเจ็กต์
  2. ในรายการโปรเจ็กต์ ให้เลือกโปรเจ็กต์ที่ต้องการลบ แล้วคลิกลบ
  3. ในช่อง ให้พิมพ์รหัสโปรเจ็กต์ แล้วคลิกปิดเพื่อลบโปรเจ็กต์