1. บทนำ
เวิร์กโฟลว์เป็นกรณีการใช้งานที่พบบ่อยในการวิเคราะห์ข้อมูล ซึ่งเกี่ยวข้องกับการนำเข้า การเปลี่ยนรูปแบบ และการวิเคราะห์ข้อมูลเพื่อค้นหาข้อมูลที่สําคัญ ใน Google Cloud Platform เครื่องมือสำหรับจัดระเบียบเวิร์กโฟลว์คือ Cloud Composer ซึ่งเป็นเครื่องมือเวิร์กโฟลว์โอเพนซอร์สยอดนิยม Apache Airflow เวอร์ชันที่โฮสต์ ในห้องทดลองนี้ คุณจะใช้ Cloud Composer เพื่อสร้างเวิร์กโฟลว์แบบง่ายๆ ที่สร้างคลัสเตอร์ Cloud Dataproc, วิเคราะห์โดยใช้ Cloud Dataproc และ Apache Hadoop จากนั้นจึงลบคลัสเตอร์ Cloud Dataproc ในภายหลัง
Cloud Composer คืออะไร
Cloud Composer เป็นบริการเวิร์กโฟลว์แบบจัดการเป็นกลุ่มอย่างครบวงจรที่ช่วยให้คุณเขียน กำหนดเวลา และตรวจสอบไปป์ไลน์ที่ครอบคลุมทั้งระบบคลาวด์และศูนย์ข้อมูลในองค์กร Cloud Composer สร้างขึ้นจากโปรเจ็กต์โอเพนซอร์ส Apache Airflow ยอดนิยมและดำเนินการโดยใช้ภาษาการเขียนโปรแกรม Python ทำให้ปราศจากการผูกมัดและใช้งานง่าย
การใช้ Cloud Composer แทนอินสแตนซ์ในเครื่องของ Apache Airflow จะช่วยให้ผู้ใช้ได้รับประโยชน์จาก Airflow ที่ดีที่สุดโดยไม่ต้องติดตั้งหรือมีค่าใช้จ่ายในการจัดการ
Apache Airflow คืออะไร
Apache Airflow เป็นเครื่องมือโอเพนซอร์สที่ใช้สำหรับเขียน กำหนดเวลา และตรวจสอบเวิร์กโฟลว์โดยใช้โปรแกรม คําศัพท์สําคัญบางคําเกี่ยวกับ Airflow ที่คุณจะเห็นตลอดทั้งห้องทดลองมีดังนี้
- DAG - DAG (กราฟแบบ Acyclic โดยตรง) คือคอลเล็กชันของงานที่มีการจัดระเบียบซึ่งคุณต้องการกำหนดเวลาและเรียกใช้ DAG หรือที่เรียกว่าเวิร์กโฟลว์ จะกำหนดในไฟล์ Python มาตรฐาน
- โอเปอเรเตอร์ - โอเปอเรเตอร์อธิบายงานเดียวในเวิร์กโฟลว์
Cloud Dataproc คืออะไร
Cloud Dataproc เป็นบริการ Apache Spark และ Apache Hadoop ที่มีการจัดการโดยสมบูรณ์ของ Google Cloud Platform Cloud Dataproc ผสานรวมกับบริการอื่นๆ ของ GCP ได้อย่างง่ายดาย จึงเป็นแพลตฟอร์มที่มีประสิทธิภาพและสมบูรณ์แบบสําหรับการประมวลผลข้อมูล การวิเคราะห์ และแมชชีนเลิร์นนิง
สิ่งที่คุณต้องทำ
Codelab นี้แสดงวิธีสร้างและเรียกใช้เวิร์กโฟลว์ Apache Airflow ใน Cloud Composer ซึ่งทํางานต่อไปนี้ให้เสร็จสมบูรณ์
- สร้างคลัสเตอร์ Cloud Dataproc
- เรียกใช้งานนับคำของ Apache Hadoop ในคลัสเตอร์ และแสดงผลลัพธ์ไปยัง Cloud Storage
- ลบคลัสเตอร์
สิ่งที่คุณจะได้เรียนรู้
- วิธีสร้างและเรียกใช้เวิร์กโฟลว์ Apache Airflow ใน Cloud Composer
- วิธีใช้ Cloud Composer และ Cloud Dataproc เพื่อเรียกใช้การวิเคราะห์บนชุดข้อมูล
- วิธีเข้าถึงสภาพแวดล้อม Cloud Composer ผ่านคอนโซล Google Cloud Platform, Cloud SDK และเว็บอินเทอร์เฟซของ Airflow
สิ่งที่ต้องมี
- บัญชี GCP
- ความรู้พื้นฐานเกี่ยวกับ CLI
- ความเข้าใจเบื้องต้นเกี่ยวกับ Python
2. การตั้งค่า GCP
สร้างโปรเจ็กต์
เลือกหรือสร้างโปรเจ็กต์ Google Cloud Platform
จดรหัสโปรเจ็กต์ไว้ ซึ่งจะใช้ในขั้นตอนต่อๆ ไป
หากคุณกำลังสร้างโปรเจ็กต์ใหม่ รหัสโปรเจ็กต์จะอยู่ใต้ชื่อโปรเจ็กต์ในหน้าการสร้าง | |
หากสร้างโปรเจ็กต์ไว้แล้ว คุณจะดูรหัสได้ในหน้าแรกของคอนโซลในการ์ดข้อมูลโปรเจ็กต์ |
เปิดใช้ API
เปิดใช้ Cloud Composer, Cloud Dataproc และ Cloud Storage API เมื่อเปิดใช้แล้ว คุณไม่ต้องสนใจปุ่ม "ไปที่ข้อมูลเข้าสู่ระบบ" และไปยังขั้นตอนถัดไปของบทแนะนำ |
สร้างสภาพแวดล้อมคอมโพสเซอร์
สร้างสภาพแวดล้อม Cloud Composer ด้วยการกำหนดค่าต่อไปนี้
การกําหนดค่าอื่นๆ ทั้งหมดจะยังคงเป็นค่าเริ่มต้นได้ คลิก "สร้าง" ที่ด้านล่าง |
สร้างที่เก็บข้อมูล Cloud Storage
สร้างที่เก็บข้อมูล Cloud Storage ในโปรเจ็กต์โดยกำหนดค่าดังนี้
กด "สร้าง" เมื่อพร้อมแล้ว |
3. การตั้งค่า Apache Airflow
การดูข้อมูลสภาพแวดล้อมของคอมโพสเซอร์
ในคอนโซล GCP ให้เปิดหน้าสภาพแวดล้อม
คลิกชื่อของสภาพแวดล้อมเพื่อดูรายละเอียด
หน้ารายละเอียดสภาพแวดล้อมจะแสดงข้อมูล เช่น URL ของอินเทอร์เฟซเว็บ Airflow, รหัสคลัสเตอร์ของ Google Kubernetes Engine, ชื่อที่เก็บข้อมูล Cloud Storage และเส้นทางสำหรับโฟลเดอร์ /dags
ใน Airflow DAG (กราฟแบบ Acyclic โดยตรง) คือคอลเล็กชันของงานที่มีการจัดระเบียบซึ่งคุณต้องการกำหนดเวลาและเรียกใช้ DAG หรือที่เรียกว่าเวิร์กโฟลว์จะกำหนดไว้ในไฟล์ Python มาตรฐาน Cloud Composer จะกำหนดเวลา DAG ในโฟลเดอร์ /dags เท่านั้น โฟลเดอร์ /dags อยู่ในที่เก็บข้อมูล Cloud Storage ที่ Cloud Composer สร้างขึ้นโดยอัตโนมัติเมื่อคุณสร้างสภาพแวดล้อม
การตั้งค่าตัวแปรสภาพแวดล้อม Apache Airflow
ตัวแปร Apache Airflow เป็นแนวคิดเฉพาะ Airflow ซึ่งแตกต่างจากตัวแปรสภาพแวดล้อม ในขั้นตอนนี้ คุณจะตั้งค่าตัวแปร Airflow 3 รายการ ได้แก่ gcp_project
, gcs_bucket
และ gce_zone
กำลังใช้ gcloud
เพื่อตั้งค่าตัวแปร
ก่อนอื่น ให้เปิด Cloud Shell ซึ่งมี Cloud SDK ติดตั้งให้คุณได้อย่างสะดวก
ตั้งค่าตัวแปรสภาพแวดล้อม COMPOSER_INSTANCE
เป็นชื่อสภาพแวดล้อมคอมโพสเซอร์
COMPOSER_INSTANCE=my-composer-environment
หากต้องการตั้งค่าตัวแปร Airflow โดยใช้เครื่องมือบรรทัดคำสั่ง gcloud ให้ใช้คำสั่ง gcloud composer environments run
ที่มีคำสั่งย่อย variables
คำสั่ง gcloud composer
นี้จะเรียกใช้คำสั่งย่อย variables
ของ Airflow CLI คำสั่งย่อยจะส่งอาร์กิวเมนต์ไปยังเครื่องมือบรรทัดคำสั่ง gcloud
คุณจะต้องเรียกใช้คำสั่งนี้ 3 ครั้ง โดยแทนที่ตัวแปรด้วยตัวแปรที่เกี่ยวข้องกับโปรเจ็กต์ของคุณ
ตั้งค่า gcp_project
โดยใช้คําสั่งต่อไปนี้ โดยแทนที่ <your-project-id> ด้วยรหัสโปรเจ็กต์ที่คุณจดไว้ในระยะที่ 2
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcp_project <your-project-id>
เอาต์พุตจะมีลักษณะดังนี้
kubeconfig entry generated for us-central1-my-composer-env-123abc-gke. Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc [2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449 [2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database). [2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor [2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied. [2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg [2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
ตั้งค่า gcs_bucket
โดยใช้คําสั่งต่อไปนี้ โดยแทนที่ <your-bucket-name>
ด้วยรหัสที่เก็บข้อมูลที่คุณจดไว้ในระยะที่ 2 หากคุณทำตามคำแนะนำของเรา ชื่อที่เก็บข้อมูลของคุณจะเหมือนกับรหัสโปรเจ็กต์ เอาต์พุตจะคล้ายกับคำสั่งก่อนหน้า
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>
ตั้งค่า gce_zone
โดยใช้คำสั่งต่อไปนี้ เอาต์พุตจะคล้ายกับคำสั่งก่อนหน้า
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gce_zone us-central1-a
(ไม่บังคับ) การใช้ gcloud
เพื่อดูตัวแปร
หากต้องการดูค่าของตัวแปร ให้เรียกใช้คำสั่งย่อย Airflow CLI variables
ด้วยอาร์กิวเมนต์ get
หรือใช้ Airflow UI
เช่น
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --get gcs_bucket
ซึ่งทำได้โดยใช้ตัวแปร 3 ตัวที่เพิ่งตั้งค่า ซึ่งได้แก่ gcp_project
, gcs_bucket
และ gce_zone
4. เวิร์กโฟลว์ตัวอย่าง
มาดูโค้ดสำหรับ DAG ที่เราจะใช้ในขั้นตอนที่ 5 กัน ไม่ต้องกังวลเกี่ยวกับการดาวน์โหลดไฟล์ในตอนนี้ เพียงทำตามขั้นตอนที่นี่
ที่นี่มีของมากมายให้คลายเครียด เรามาดูรายละเอียดกัน
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
เรามาเริ่มด้วยการนําเข้าข้อมูลใน Airflow กัน
airflow.models
- อนุญาตให้เราเข้าถึงและสร้างข้อมูลในฐานข้อมูล Airflowairflow.contrib.operators
- ตำแหน่งที่โอเปอเรเตอร์จากชุมชนตั้งอยู่ ในกรณีนี้ เราต้องใช้dataproc_operator
เพื่อเข้าถึง Cloud Dataproc APIairflow.utils.trigger_rule
- สำหรับเพิ่มกฎทริกเกอร์ในโอเปอเรเตอร์ กฎการเรียกใช้ช่วยให้ควบคุมได้อย่างละเอียดว่าโอเปอเรเตอร์ควรดำเนินการหรือไม่เมื่อเกี่ยวข้องกับสถานะของระดับบนสุด
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
ซึ่งจะระบุตำแหน่งของไฟล์เอาต์พุต บรรทัดที่น่าสนใจคือ models.Variable.get('gcs_bucket')
ซึ่งจะดึงค่าตัวแปร gcs_bucket
จากฐานข้อมูล Airflow
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
WORDCOUNT_JAR
- ตำแหน่งของไฟล์ .jar ที่เราจะเรียกใช้บนคลัสเตอร์ Cloud Dataproc ในที่สุด ซึ่งโฮสต์อยู่บน GCP สำหรับคุณอยู่แล้วinput_file
- ตำแหน่งของไฟล์ที่มีข้อมูลที่งาน Hadoop จะประมวลผลในท้ายที่สุด เราจะอัปโหลดข้อมูลไปยังตำแหน่งนั้นพร้อมกันในขั้นตอนที่ 5wordcount_args
- อาร์กิวเมนต์ที่เราจะส่งไปยังไฟล์ jar
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
ซึ่งจะทำให้ออบเจ็กต์วันที่และเวลาเทียบเท่ากับเที่ยงคืนของวันก่อนหน้า เช่น หากการดําเนินการนี้ทําเมื่อเวลา 11:00 น. ของวันที่ 4 มีนาคม ออบเจ็กต์วันที่และเวลาจะแสดงค่า 00:00 น. ของวันที่ 3 มีนาคม เรื่องนี้เกี่ยวข้องกับวิธีที่ Airflow จัดการการจัดตารางเวลา ดูข้อมูลเพิ่มเติมได้ที่นี่
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
ควรระบุตัวแปร default_dag_args
ในรูปแบบพจนานุกรมเมื่อมีการสร้าง DAG ใหม่:
'email_on_failure'
- ระบุว่าควรส่งการแจ้งเตือนทางอีเมลเมื่องานล้มเหลวหรือไม่'email_on_retry'
- ระบุว่าควรส่งการแจ้งเตือนทางอีเมลเมื่อมีการพยายามทำงานอีกครั้งหรือไม่'retries'
- แสดงจำนวนครั้งที่ Airflow ควรลองใหม่ในกรณีที่ DAG ล้มเหลว'retry_delay'
- แสดงระยะเวลาที่ Airflow ควรรอก่อนที่จะลองลองอีกครั้ง'project_id'
- บอก DAG ว่าจะใช้รหัสโปรเจ็กต์ GCP ใดเพื่อเชื่อมโยง ซึ่งจำเป็นต้องใช้กับผู้ให้บริการ Dataproc ในภายหลัง
with models.DAG(
'composer_hadoop_tutorial',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
การใช้ with models.DAG
จะบอกให้สคริปต์รวมทุกอย่างที่อยู่ด้านล่างไว้ใน DAG เดียวกัน และเห็นอาร์กิวเมนต์ 3 รายการที่ส่งเข้ามาด้วย ดังนี้
- รายการแรกซึ่งเป็นสตริงคือชื่อที่จะตั้งให้กับ DAG ที่เรากําลังสร้าง ในกรณีนี้ เราใช้
composer_hadoop_tutorial
schedule_interval
- ออบเจ็กต์datetime.timedelta
ซึ่งเราตั้งไว้ที่ 1 วัน ซึ่งหมายความว่า DAG นี้จะพยายามเรียกใช้วันละครั้งหลังจาก'start_date'
ซึ่งตั้งค่าไว้ก่อนหน้าใน'default_dag_args'
default_args
- พจนานุกรมที่เราสร้างขึ้นก่อนหน้านี้ซึ่งมีอาร์กิวเมนต์เริ่มต้นสําหรับ DAG
สร้างคลัสเตอร์ Dataproc
ต่อไป เราจะสร้าง dataproc_operator.DataprocClusterCreateOperator
ซึ่งจะสร้างคลัสเตอร์ Cloud Dataproc
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
ภายในโอเปอเรเตอร์นี้ เราพบอาร์กิวเมนต์สองสามรายการ ยกเว้นอาร์กิวเมนต์แรกที่เฉพาะเจาะจงสำหรับโอเปอเรเตอร์นี้
task_id
- เช่นเดียวกับใน BashOperator นี่คือชื่อที่เรากำหนดให้กับโอเปอเรเตอร์ ซึ่งสามารถดูได้จาก Airflow UIcluster_name
- ชื่อที่เรากำหนดให้กับคลัสเตอร์ Cloud Dataproc ในที่นี้ เราตั้งชื่อเป็นcomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
(ดูข้อมูลเพิ่มเติมในช่องข้อมูล)num_workers
- จํานวนผู้ปฏิบัติงานที่เราจัดสรรให้กับคลัสเตอร์ Cloud Dataproczone
- ภูมิภาคทางภูมิศาสตร์ที่เราต้องการให้คลัสเตอร์ทำงานอยู่ตามที่บันทึกไว้ในฐานข้อมูล Airflow ซึ่งจะอ่านตัวแปร'gce_zone'
ที่เราตั้งค่าไว้ในขั้นตอนที่ 3master_machine_type
- ประเภทเครื่องที่เราต้องการจัดสรรให้กับต้นแบบ Cloud Dataprocworker_machine_type
- ประเภทของเครื่องที่เราต้องการจัดสรรให้กับผู้ปฏิบัติงาน Cloud Dataproc
ส่งงาน Apache Hadoop
dataproc_operator.DataProcHadoopOperator
ช่วยให้เราส่งงานไปยังคลัสเตอร์ Cloud Dataproc ได้
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
เรามีพารามิเตอร์หลายรายการดังนี้
task_id
- ชื่อที่เรากำหนดให้กับ DAG ส่วนนี้main_jar
- ตำแหน่งของไฟล์ .jar ที่ต้องการเรียกใช้กับคลัสเตอร์cluster_name
- ชื่อของคลัสเตอร์ที่จะเรียกใช้งาน ซึ่งคุณจะเห็นว่าเหมือนกับที่เราพบในโอเปอเรเตอร์ก่อนหน้าarguments
- อาร์กิวเมนต์ที่ส่งผ่านเข้าไปในไฟล์ jar ในลักษณะเดียวกับที่คุณจะดำเนินการกับไฟล์ .jar จากบรรทัดคำสั่ง
ลบคลัสเตอร์
ตัวดำเนินการสุดท้ายที่เราจะสร้างคือ dataproc_operator.DataprocClusterDeleteOperator
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
ตามที่ชื่อบอกไว้ โอเปอเรเตอร์นี้จะลบคลัสเตอร์ Cloud Dataproc ที่ระบุ เราเห็นอาร์กิวเมนต์ 3 รายการที่นี่
task_id
- เช่นเดียวกับใน BashOperator นี่คือชื่อที่เรากำหนดให้กับโอเปอเรเตอร์ ซึ่งสามารถดูได้จาก Airflow UIcluster_name
- ชื่อที่เรากำหนดให้กับคลัสเตอร์ Cloud Dataproc ซึ่งเราได้ตั้งชื่อว่าcomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
(ดูข้อมูลเพิ่มเติมในส่วน "สร้างคลัสเตอร์ Dataproc" ได้ที่ช่องข้อมูล)trigger_rule
- เราพูดถึงกฎทริกเกอร์เป็นระยะเวลาสั้นๆ ระหว่างการนำเข้าเมื่อเริ่มต้นขั้นตอนนี้ แต่เรายังมีการดำเนินการอยู่ 1 รายการ โดยค่าเริ่มต้น โอเปอเรเตอร์ Airflow จะไม่ทำงานเว้นแต่โอเปอเรเตอร์อัปสตรีมทั้งหมดจะเสร็จสิ้น กฎทริกเกอร์ALL_DONE
กำหนดให้โอเปอเรเตอร์อัปสตรีมทั้งหมดต้องทำงานให้เสร็จสมบูรณ์เท่านั้น ไม่ว่าโอเปอเรเตอร์จะสำเร็จหรือไม่ก็ตาม ซึ่งหมายความว่าแม้ว่างาน Hadoop จะล้มเหลว แต่เราก็ยังต้องการคลัสเตอร์ดังกล่าวอยู่
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
สุดท้าย เราต้องการให้โอเปอเรเตอร์เหล่านี้ทำงานตามลำดับที่กำหนด และสามารถระบุได้โดยใช้โอเปอเรเตอร์บิตชิฟท์ของ Python ในกรณีนี้ create_dataproc_cluster
จะทำงานก่อนเสมอ ตามด้วย run_dataproc_hadoop
และสุดท้าย delete_dataproc_cluster
เมื่อรวมทั้งหมดเข้าด้วยกัน โค้ดจะมีลักษณะดังนี้
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# Output file for Cloud Dataproc job.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
'email_on_failure': False,
'email_on_retry': False,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
# [START composer_hadoop_schedule]
with models.DAG(
'composer_hadoop_tutorial',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_hadoop_schedule]
# Create a Cloud Dataproc cluster.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
# Give the cluster a unique name by appending the date scheduled.
# See https://airflow.apache.org/code.html#default-variables
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
# Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
# master node.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
# Delete Cloud Dataproc cluster.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted
# even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]
# [END composer_hadoop]
5. อัปโหลดไฟล์ Airflow ไปยัง Cloud Storage
คัดลอก DAG ไปยังโฟลเดอร์ /dags ของคุณ
- ก่อนอื่น ให้เปิด Cloud Shell ซึ่งมี Cloud SDK ติดตั้งให้คุณได้อย่างสะดวก
- โคลนที่เก็บตัวอย่าง Python และเปลี่ยนเป็นไดเรกทอรี Composer/workflows
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
- เรียกใช้คำสั่งต่อไปนี้เพื่อตั้งชื่อโฟลเดอร์ DAG เป็นตัวแปรสภาพแวดล้อม
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \ --location us-central1 --format="value(config.dagGcsPrefix)")
- เรียกใช้คำสั่ง
gsutil
ต่อไปนี้เพื่อคัดลอกโค้ดบทแนะนำไปวางที่โฟลเดอร์ /dags ของคุณ
gsutil cp hadoop_tutorial.py $DAGS_FOLDER
เอาต์พุตจะมีลักษณะดังนี้
Copying file://hadoop_tutorial.py [Content-Type=text/x-python]... / [1 files][ 4.1 KiB/ 4.1 KiB] Operation completed over 1 objects/4.1 KiB.
6. การใช้ Airflow UI
วิธีเข้าถึงอินเทอร์เฟซเว็บ Airflow โดยใช้คอนโซล GCP
|
ดูข้อมูลเกี่ยวกับ UI ของ Airflow ได้ที่การเข้าถึงอินเทอร์เฟซเว็บ
ดูตัวแปร
ตัวแปรที่คุณตั้งไว้ก่อนหน้านี้จะยังคงอยู่ในสภาพแวดล้อม คุณดูตัวแปรได้โดยเลือกผู้ดูแลระบบ > ตัวแปรจากแถบเมนู UI ของ Airflow
การสำรวจการเรียกใช้ DAG
เมื่อคุณอัปโหลดไฟล์ DAG ไปยังโฟลเดอร์ dags
ใน Cloud Storage แล้ว Cloud Composer จะแยกวิเคราะห์ไฟล์นั้น หากไม่พบข้อผิดพลาด ชื่อของเวิร์กโฟลว์จะปรากฏในรายการ DAG และเวิร์กโฟลว์จะอยู่ในคิวเพื่อเรียกใช้ทันที หากต้องการดู DAG ให้คลิก DAG ที่ด้านบนของหน้า
คลิก composer_hadoop_tutorial
เพื่อเปิดหน้ารายละเอียด DAG หน้านี้จะมีการนำเสนองานเวิร์กโฟลว์และทรัพยากร Dependency แบบกราฟิก
ตอนนี้ให้คลิกมุมมองกราฟในแถบเครื่องมือ แล้ววางเมาส์เหนือกราฟของงานแต่ละรายการเพื่อดูสถานะ โปรดทราบว่าเส้นขอบรอบแต่ละงานจะระบุสถานะด้วย (เส้นขอบสีเขียว = ทำงานอยู่ สีแดง = ไม่ผ่าน เป็นต้น)
หากต้องการเรียกใช้เวิร์กโฟลว์อีกครั้งจากมุมมองกราฟ ให้ทำดังนี้
- ในมุมมองกราฟ UI ของ Airflow ให้คลิกกราฟิก
create_dataproc_cluster
- คลิกล้างเพื่อรีเซ็ตงาน 3 รายการ แล้วคลิกตกลงเพื่อยืนยัน
นอกจากนี้ คุณยังตรวจสอบสถานะและผลลัพธ์ของเวิร์กโฟลว์ composer-hadoop-tutorial
ได้โดยไปที่หน้าคอนโซล GCP ต่อไปนี้
- คลัสเตอร์ Dataproc ของ Cloud เพื่อตรวจสอบการสร้างและการลบคลัสเตอร์ โปรดทราบว่าคลัสเตอร์ที่สร้างโดยเวิร์กโฟลว์เป็นแบบชั่วคราว ซึ่งมีอยู่ในช่วงเวลาของเวิร์กโฟลว์เท่านั้นและจะถูกลบเป็นส่วนหนึ่งของงานเวิร์กโฟลว์ล่าสุด
- งาน Cloud Dataproc เพื่อดูหรือตรวจสอบงานจำนวนคำของ Apache Hadoop คลิกรหัสงานเพื่อดูเอาต์พุตบันทึกงาน
- Cloud Storage Browser เพื่อดูผลลัพธ์ของจํานวนคำในโฟลเดอร์
wordcount
ในที่เก็บข้อมูล Cloud Storage ที่คุณสร้างสําหรับโค้ดแล็บนี้
7. ล้างข้อมูล
โปรดทำดังนี้เพื่อเลี่ยงไม่ให้เกิดการเรียกเก็บเงินกับบัญชี GCP สำหรับทรัพยากรที่ใช้ใน Codelab นี้
- (ไม่บังคับ) หากต้องการบันทึกข้อมูล ให้ดาวน์โหลดข้อมูลจากที่เก็บข้อมูล Cloud Storage สําหรับสภาพแวดล้อม Cloud Composer และที่เก็บข้อมูลที่คุณสร้างสําหรับโค้ดแล็บนี้
- ลบที่เก็บข้อมูล Cloud Storage ที่คุณสร้างสำหรับโค้ดแล็บนี้
- ลบที่เก็บข้อมูล Cloud Storage สำหรับสภาพแวดล้อม
- ลบสภาพแวดล้อม Cloud Composer โปรดทราบว่าการลบสภาพแวดล้อมจะไม่ลบที่เก็บข้อมูลของพื้นที่เก็บข้อมูลสำหรับสภาพแวดล้อมนั้น
นอกจากนี้ คุณยังลบโปรเจ็กต์ได้ด้วย โดยทำดังนี้
- ในคอนโซล GCP ให้ไปที่หน้าโปรเจ็กต์
- ในรายการโปรเจ็กต์ ให้เลือกโปรเจ็กต์ที่ต้องการลบ แล้วคลิกลบ
- ในช่อง ให้พิมพ์รหัสโปรเจ็กต์ แล้วคลิกปิดเพื่อลบโปรเจ็กต์