1. Giới thiệu
Quy trình công việc là một trường hợp sử dụng phổ biến trong hoạt động phân tích dữ liệu. Quy trình này bao gồm việc nhập, chuyển đổi và phân tích dữ liệu để tìm thông tin có ý nghĩa trong đó. Trong Google Cloud Platform, công cụ điều phối quy trình công việc là Cloud Composer. Đây là phiên bản được lưu trữ của công cụ quy trình công việc nguồn mở phổ biến Apache Airflow. Trong lớp học này, bạn sẽ sử dụng Cloud Composer để tạo một quy trình công việc đơn giản giúp tạo cụm Cloud Dataproc, phân tích cụm đó bằng Cloud Dataproc và Apache Hadoop, sau đó xoá cụm Cloud Dataproc.
Cloud Composer là gì?
Cloud Composer là một dịch vụ điều phối quy trình công việc được quản lý toàn diện, giúp bạn tạo, lên lịch và giám sát các quy trình trải dài trên đám mây và trung tâm dữ liệu tại chỗ. Được xây dựng dựa trên dự án nguồn mở Apache Airflow phổ biến và hoạt động bằng ngôn ngữ lập trình Python, Cloud Composer không bị phụ thuộc và dễ sử dụng.
Khi sử dụng Cloud Composer thay vì phiên bản cục bộ của Apache Airflow, người dùng có thể hưởng lợi từ những tính năng tốt nhất của Airflow mà không phải tốn chi phí cài đặt hay quản lý.
Apache Airflow là gì?
Apache Airflow là một công cụ nguồn mở dùng để tạo, lên lịch và giám sát quy trình công việc theo phương thức lập trình. Có một số thuật ngữ quan trọng cần nhớ liên quan đến Luồng khí mà bạn sẽ thấy trong suốt phòng thí nghiệm:
- DAG – DAG (Đồ thị không chu trình có hướng) là một tập hợp các tác vụ được sắp xếp mà bạn muốn lên lịch và chạy. DAG (còn gọi là quy trình công việc) được xác định trong các tệp Python chuẩn
- Toán tử – toán tử mô tả một tác vụ duy nhất trong quy trình làm việc
Cloud Dataproc là gì?
Cloud Dataproc là dịch vụ Apache Spark và Apache Hadoop của Google Cloud Platform được quản lý hoàn toàn. Cloud Dataproc dễ dàng tích hợp với các dịch vụ khác của GCP, mang đến cho bạn một nền tảng toàn diện và mạnh mẽ cho việc xử lý dữ liệu, phân tích và học máy.
Việc bạn sẽ làm
Lớp học lập trình này hướng dẫn bạn cách tạo và chạy quy trình công việc Apache Airflow trong Cloud Composer để hoàn thành các nhiệm vụ sau:
- Tạo cụm Cloud Dataproc
- Chạy một công việc đếm từ Apache Hadoop trên cụm và xuất kết quả của công việc đó vào Cloud Storage
- Xoá cụm
Kiến thức bạn sẽ học được
- Cách tạo và chạy quy trình công việc Apache Airflow trong Cloud Composer
- Cách sử dụng Cloud Composer và Cloud Dataproc để chạy bản phân tích trên một tập dữ liệu
- Cách truy cập vào môi trường Cloud Composer thông qua Bảng điều khiển Google Cloud Platform, Cloud SDK và giao diện web Airflow
Bạn cần có
- Tài khoản GCP
- Kiến thức cơ bản về CLI
- Hiểu biết cơ bản về Python
2. Thiết lập GCP
Tạo dự án
Chọn hoặc tạo một Dự án Google Cloud Platform.
Ghi lại mã dự án mà bạn sẽ sử dụng trong các bước sau.
Nếu đang tạo một dự án mới, bạn sẽ thấy mã dự án ngay bên dưới Tên dự án trên trang tạo | |
Nếu đã tạo dự án, bạn có thể tìm mã nhận dạng trên trang chủ của bảng điều khiển trong thẻ Thông tin dự án |
Bật API
Bật các API Cloud Composer, Cloud Dataproc và Cloud Storage.Sau khi bật các API này, bạn có thể bỏ qua nút có nội dung "Go to Credentials" (Truy cập vào thông tin xác thực) và chuyển sang bước tiếp theo của hướng dẫn. |
Tạo môi trường Composer
Tạo môi trường Cloud Composer bằng cấu hình sau:
Tất cả cấu hình khác có thể giữ nguyên giá trị mặc định. Nhấp vào "Tạo" ở dưới cùng. |
Tạo bộ chứa Cloud Storage
Trong dự án của bạn, hãy tạo một bộ chứa Cloud Storage với cấu hình sau:
Nhấn "Tạo" khi bạn đã sẵn sàng |
3. Thiết lập Apache Airflow
Xem thông tin môi trường Compose
Trong Bảng điều khiển GCP, hãy mở trang Môi trường
Nhấp vào tên của môi trường để xem thông tin chi tiết.
Trang Thông tin chi tiết về môi trường cung cấp thông tin, chẳng hạn như URL giao diện web Airflow, mã nhận dạng cụm Google Kubernetes Engine, tên của bộ chứa Cloud Storage và đường dẫn đến thư mục /dags.
Trong Airflow, DAG (Đồ thị không chu trình có hướng) là một tập hợp các tác vụ được sắp xếp mà bạn muốn lên lịch và chạy. DAG (còn gọi là quy trình công việc) được xác định trong các tệp Python chuẩn. Cloud Composer chỉ lên lịch cho các DAG trong thư mục /dags. Thư mục /dags nằm trong bộ chứa Cloud Storage mà Cloud Composer tạo tự động khi bạn tạo môi trường.
Cài đặt biến môi trường luồng khí Apache
Biến Apache Airflow là một khái niệm dành riêng cho luồng Airflow, khác với các biến môi trường. Trong bước này, bạn sẽ thiết lập 3 biến Luồng không khí sau: gcp_project
, gcs_bucket
và gce_zone
.
Sử dụng gcloud
để Đặt biến
Trước tiên, hãy mở Cloud Shell. Cloud Shell đã cài đặt sẵn Cloud SDK cho bạn.
Đặt biến môi trường COMPOSER_INSTANCE
thành tên của môi trường trong Composer
COMPOSER_INSTANCE=my-composer-environment
Để đặt biến Airflow bằng công cụ dòng lệnh gcloud, hãy sử dụng lệnh gcloud composer environments run
với lệnh con variables
. Lệnh gcloud composer
này thực thi lệnh con CLI của Airflow variables
. Lệnh con sẽ truyền các đối số đến công cụ dòng lệnh gcloud
.
Bạn sẽ chạy lệnh này ba lần, thay thế các biến bằng các biến liên quan đến dự án của mình.
Đặt gcp_project
bằng lệnh sau, thay thế <your-project-id> bằng mã dự án mà bạn đã ghi chú ở Bước 2.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcp_project <your-project-id>
Kết quả của bạn sẽ có dạng như sau
kubeconfig entry generated for us-central1-my-composer-env-123abc-gke. Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc [2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449 [2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database). [2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor [2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied. [2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg [2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
Đặt gcs_bucket
bằng lệnh sau đây, thay thế <your-bucket-name>
bằng mã nhóm mà bạn đã ghi chú trong Bước 2. Nếu bạn làm theo đề xuất của chúng tôi, tên bộ chứa của bạn sẽ giống với mã dự án. Kết quả sẽ tương tự như lệnh trước.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>
Đặt gce_zone
bằng lệnh sau. Kết quả của bạn sẽ tương tự như các lệnh trước đó.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gce_zone us-central1-a
(Không bắt buộc) Sử dụng gcloud
để xem biến
Để xem giá trị của một biến, hãy chạy lệnh con CLI của Airflow variables
với đối số get
hoặc sử dụng Giao diện người dùng của Airflow.
Ví dụ:
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --get gcs_bucket
Bạn có thể thực hiện việc này bằng một trong 3 biến vừa đặt: gcp_project
, gcs_bucket
và gce_zone
.
4. Quy trình làm việc mẫu
Hãy xem mã cho DAG mà chúng ta sẽ sử dụng ở bước 5. Đừng lo về việc tải tệp xuống, bạn chỉ cần làm theo hướng dẫn tại đây.
Có rất nhiều điều cần tìm hiểu ở đây, vì vậy, hãy cùng phân tích một chút.
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
Chúng ta bắt đầu bằng một số lệnh nhập Airflow:
airflow.models
– Cho phép chúng ta truy cập và tạo dữ liệu trong cơ sở dữ liệu Airflow.airflow.contrib.operators
– Nơi các nhà điều hành trong cộng đồng sinh sống. Trong trường hợp này, chúng ta cần códataproc_operator
để truy cập vào API Cloud Dataproc.airflow.utils.trigger_rule
– Để thêm quy tắc kích hoạt vào toán tử của chúng tôi. Quy tắc kích hoạt cho phép kiểm soát chi tiết việc một toán tử có nên thực thi hay không liên quan đến trạng thái của toán tử mẹ.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
Tham số này chỉ định vị trí của tệp đầu ra. Dòng đáng chú ý ở đây là models.Variable.get('gcs_bucket')
sẽ lấy giá trị biến gcs_bucket
từ cơ sở dữ liệu Airflow.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
WORDCOUNT_JAR
– Vị trí của tệp .jar mà sau này chúng tôi sẽ chạy trên cụm Cloud Dataproc. Ứng dụng này đã được lưu trữ trên GCP cho bạn.input_file
– Vị trí của tệp chứa dữ liệu mà công việc Hadoop của chúng ta sẽ dùng để tính toán sau này. Chúng ta sẽ tải dữ liệu lên vị trí đó cùng nhau trong Bước 5.wordcount_args
– Các đối số mà chúng tôi sẽ chuyển vào tệp jar.
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
Điều này sẽ cung cấp cho chúng ta một đối tượng ngày giờ tương đương đại diện cho nửa đêm của ngày trước đó. Chẳng hạn, nếu điều này được thực thi vào lúc 11:00 ngày 4 tháng 3, thì đối tượng ngày giờ sẽ đại diện cho 00:00 ngày 3 tháng 3. Điều này có liên quan đến cách Airflow xử lý việc lên lịch. Bạn có thể xem thêm thông tin về vấn đề này tại đây.
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
Biến default_dag_args
ở dạng từ điển phải được cung cấp mỗi khi tạo DAG mới:
'email_on_failure'
– Cho biết có gửi cảnh báo qua email khi không thực hiện được một tác vụ hay không'email_on_retry'
– Cho biết liệu có gửi cảnh báo qua email khi thử lại một tác vụ hay không'retries'
– Cho biết số lần luồng khí cần thử lại trong trường hợp không thành công DAG'retry_delay'
– Cho biết thời gian Luồng khí phải đợi trước khi thử lại'project_id'
– Cho DAG biết mã dự án GCP cần liên kết, mã này sẽ cần thiết sau này với Toán tử Dataproc
with models.DAG(
'composer_hadoop_tutorial',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
Việc sử dụng with models.DAG
yêu cầu tập lệnh phải đưa mọi nội dung bên dưới vào cùng một DAG. Chúng ta cũng thấy 3 đối số được truyền vào:
- Chuỗi đầu tiên là tên đặt cho DAG mà chúng ta đang tạo. Trong trường hợp này, chúng ta sẽ sử dụng
composer_hadoop_tutorial
. schedule_interval
– Đối tượngdatetime.timedelta
mà chúng ta đã đặt thành một ngày. Tức là DAG này sẽ cố thực thi mỗi ngày một lần sau'start_date'
đã đặt trước đó trong'default_dag_args'
default_args
– Từ điển chúng ta tạo trước đó có chứa các đối số mặc định cho DAG
Tạo cụm Dataproc
Tiếp theo, chúng ta sẽ tạo một dataproc_operator.DataprocClusterCreateOperator
để tạo cụm Cloud Dataproc.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
Trong toán tử này, chúng ta thấy một vài đối số, tất cả ngoại trừ đối số đầu tiên là dành riêng cho toán tử này:
task_id
– Giống như trong BashOperator, đây là tên mà chúng ta chỉ định cho toán tử. Bạn có thể xem tên này từ giao diện người dùng Airflowcluster_name
– Tên mà chúng ta chỉ định cho cụm Cloud Dataproc. Ở đây, chúng tôi đặt tên làcomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
(xem hộp thông tin để biết thông tin bổ sung không bắt buộc)num_workers
– Số lượng worker mà chúng tôi phân bổ cho cụm Cloud Dataproczone
– Khu vực địa lý mà chúng ta muốn đặt cụm, được lưu trong cơ sở dữ liệu Airflow. Thao tác này sẽ đọc biến'gce_zone'
mà chúng ta đã đặt ở Bước 3master_machine_type
– Loại máy mà chúng tôi muốn phân bổ cho máy chủ Cloud Dataprocworker_machine_type
– Loại máy mà chúng ta muốn phân bổ cho worker Cloud Dataproc
Gửi công việc Apache Hadoop
dataproc_operator.DataProcHadoopOperator
cho phép chúng ta gửi một công việc đến cụm Cloud Dataproc.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
Chúng tôi cung cấp một vài thông số:
task_id
– Tên mà chúng tôi chỉ định cho phần này của DAGmain_jar
– Vị trí của tệp .jar mà chúng ta muốn chạy trên cụmcluster_name
– Tên của cụm để chạy công việc dựa trên đó mà bạn sẽ thấy giống với tên chúng ta tìm thấy trong toán tử trước đóarguments
– Các đối số được truyền vào tệp jar, như khi bạn thực thi tệp .jar từ dòng lệnh
Xoá cụm
Toán tử cuối cùng chúng ta sẽ tạo là dataproc_operator.DataprocClusterDeleteOperator
.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
Đúng như tên gọi, toán tử này sẽ xoá một cụm Cloud Dataproc cụ thể. Chúng ta thấy 3 đối số ở đây:
task_id
– Giống như trong BashOperator, đây là tên mà chúng ta chỉ định cho toán tử. Bạn có thể xem tên này từ giao diện người dùng Airflowcluster_name
– Tên mà chúng ta chỉ định cho cụm Cloud Dataproc. Ở đây, chúng tôi đặt tên làcomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
(xem hộp thông tin sau phần "Tạo cụm Dataproc" để biết thêm thông tin không bắt buộc)trigger_rule
– Chúng ta đã đề cập ngắn gọn về Quy tắc kích hoạt trong quá trình nhập ở đầu bước này, nhưng ở đây chúng ta có một quy tắc đang hoạt động. Theo mặc định, toán tử Airflow sẽ không thực thi trừ phi tất cả toán tử ngược dòng của toán tử đó đã hoàn tất thành công. Quy tắc điều kiện kích hoạtALL_DONE
chỉ yêu cầu tất cả các toán tử ngược dòng phải hoàn tất, bất kể các toán tử đó có thành công hay không. Ở đây, điều này có nghĩa là ngay cả khi công việc Hadoop không thành công, chúng ta vẫn muốn chia nhỏ cụm đó.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
Cuối cùng, chúng ta muốn các toán tử này thực thi theo một thứ tự cụ thể và chúng ta có thể biểu thị điều này bằng cách sử dụng toán tử dịch bit Python. Trong trường hợp này, create_dataproc_cluster
sẽ luôn thực thi trước, sau đó là run_dataproc_hadoop
và cuối cùng là delete_dataproc_cluster
.
Khi kết hợp tất cả lại với nhau, mã sẽ có dạng như sau:
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# Output file for Cloud Dataproc job.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
'email_on_failure': False,
'email_on_retry': False,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
# [START composer_hadoop_schedule]
with models.DAG(
'composer_hadoop_tutorial',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_hadoop_schedule]
# Create a Cloud Dataproc cluster.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
# Give the cluster a unique name by appending the date scheduled.
# See https://airflow.apache.org/code.html#default-variables
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
# Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
# master node.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
# Delete Cloud Dataproc cluster.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted
# even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]
# [END composer_hadoop]
5. Tải tệp Airflow lên Cloud Storage
Sao chép DAG vào thư mục /dags của bạn
- Trước tiên, hãy mở Cloud Shell, trong đó có Cloud SDK được cài đặt một cách thuận tiện cho bạn.
- Sao chép kho lưu trữ mẫu python và thay đổi thành thư mục Compose/workflows
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
- Chạy lệnh sau để đặt tên thư mục DAG thành một biến môi trường
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \ --location us-central1 --format="value(config.dagGcsPrefix)")
- Chạy lệnh
gsutil
sau để sao chép mã hướng dẫn vào nơi tạo thư mục /dags
gsutil cp hadoop_tutorial.py $DAGS_FOLDER
Kết quả sẽ có dạng như sau:
Copying file://hadoop_tutorial.py [Content-Type=text/x-python]... / [1 files][ 4.1 KiB/ 4.1 KiB] Operation completed over 1 objects/4.1 KiB.
6. Sử dụng giao diện người dùng Airflow
Cách truy cập vào giao diện web của Airflow bằng bảng điều khiển GCP:
|
Để biết thông tin về giao diện người dùng Airflow, hãy xem bài viết Truy cập vào giao diện web.
Xem biến
Các biến bạn đã đặt trước đó sẽ được lưu giữ trong môi trường của bạn. Bạn có thể xem các biến bằng cách chọn Quản trị > Biến trên thanh trình đơn giao diện người dùng Airflow.
Khám phá các lần chạy DAG
Khi bạn tải tệp DAG lên thư mục dags
trong Cloud Storage, Cloud Composer sẽ phân tích cú pháp tệp đó. Nếu không tìm thấy lỗi, tên của quy trình công việc sẽ xuất hiện trong danh sách DAG và quy trình công việc sẽ được đưa vào hàng đợi để chạy ngay lập tức. Để xem DAG, hãy nhấp vào DAG ở đầu trang.
Nhấp vào composer_hadoop_tutorial
để mở trang chi tiết về DAG. Trang này bao gồm bản trình bày dạng hình ảnh về các nhiệm vụ trong quy trình công việc và phần phụ thuộc.
Bây giờ, trên thanh công cụ, hãy nhấp vào Graph View (Chế độ xem biểu đồ) rồi di chuột qua hình ảnh đồ hoạ cho từng tác vụ để xem trạng thái của tác vụ đó. Xin lưu ý rằng đường viền xung quanh mỗi tác vụ cũng cho biết trạng thái (đường viền màu xanh lục = đang chạy; màu đỏ = không thành công, v.v.).
Để chạy lại quy trình công việc từ Chế độ xem biểu đồ:
- Trong chế độ xem Biểu đồ trên giao diện người dùng Airflow, hãy nhấp vào hình ảnh đồ hoạ
create_dataproc_cluster
. - Nhấp vào Xoá để đặt lại 3 việc cần làm, rồi nhấp vào OK để xác nhận.
Bạn cũng có thể kiểm tra trạng thái và kết quả của quy trình công việc composer-hadoop-tutorial
bằng cách truy cập vào các trang sau trong Bảng điều khiển GCP:
- Các cụm Cloud Dataproc để giám sát việc tạo và xoá cụm. Xin lưu ý rằng cụm do quy trình tạo ra là tạm thời: cụm này chỉ tồn tại trong suốt thời gian của quy trình công việc và sẽ bị xoá trong nhiệm vụ mới nhất của quy trình công việc.
- Công việc trên Cloud Dataproc để xem hoặc theo dõi công việc đếm từ trên Apache Hadoop. Nhấp vào ID công việc để xem kết quả của nhật ký công việc.
- Trình duyệt Cloud Storage để xem kết quả của số từ trong thư mục
wordcount
trong bộ chứa Cloud Storage mà bạn đã tạo cho lớp học lập trình này.
7. Dọn dẹp
Cách tránh tính phí vào tài khoản GCP đối với những tài nguyên bạn dùng trong lớp học lập trình này:
- (Không bắt buộc) Để lưu dữ liệu, hãy tải dữ liệu xuống từ bộ chứa Cloud Storage cho môi trường Cloud Composer và bộ chứa bộ nhớ mà bạn đã tạo cho lớp học lập trình này.
- Xoá bộ chứa Cloud Storage mà bạn đã tạo cho lớp học lập trình này.
- Xoá bộ chứa trong Cloud Storage cho môi trường đó.
- Xoá môi trường Cloud Composer. Xin lưu ý rằng việc xoá môi trường sẽ không xoá bộ chứa lưu trữ của môi trường đó.
Bạn cũng có thể xoá dự án nếu muốn:
- Trong Bảng điều khiển GCP, hãy chuyển đến trang Dự án.
- Trong danh sách dự án, hãy chọn dự án mà bạn muốn xoá rồi nhấp vào Xoá.
- Trong hộp này, hãy nhập mã dự án rồi nhấp vào Tắt để xoá dự án.