Chạy công việc đếm từ Hadoop trên cụm Dataproc

1. Giới thiệu

Quy trình công việc là một trường hợp sử dụng phổ biến trong hoạt động phân tích dữ liệu. Quy trình này bao gồm việc nhập, chuyển đổi và phân tích dữ liệu để tìm thông tin có ý nghĩa trong đó. Trong Google Cloud Platform, công cụ điều phối quy trình công việc là Cloud Composer. Đây là phiên bản được lưu trữ của công cụ quy trình công việc nguồn mở phổ biến Apache Airflow. Trong lớp học này, bạn sẽ sử dụng Cloud Composer để tạo một quy trình công việc đơn giản giúp tạo cụm Cloud Dataproc, phân tích cụm đó bằng Cloud Dataproc và Apache Hadoop, sau đó xoá cụm Cloud Dataproc.

Cloud Composer là gì?

Cloud Composer là một dịch vụ điều phối quy trình công việc được quản lý toàn diện, giúp bạn tạo, lên lịch và giám sát các quy trình trải dài trên đám mây và trung tâm dữ liệu tại chỗ. Được xây dựng dựa trên dự án nguồn mở Apache Airflow phổ biến và hoạt động bằng ngôn ngữ lập trình Python, Cloud Composer không bị phụ thuộc và dễ sử dụng.

Khi sử dụng Cloud Composer thay vì phiên bản cục bộ của Apache Airflow, người dùng có thể hưởng lợi từ những tính năng tốt nhất của Airflow mà không phải tốn chi phí cài đặt hay quản lý.

Apache Airflow là gì?

Apache Airflow là một công cụ nguồn mở dùng để tạo, lên lịch và giám sát quy trình công việc theo phương thức lập trình. Có một số thuật ngữ quan trọng cần nhớ liên quan đến Luồng khí mà bạn sẽ thấy trong suốt phòng thí nghiệm:

  • DAG – DAG (Đồ thị không chu trình có hướng) là một tập hợp các tác vụ được sắp xếp mà bạn muốn lên lịch và chạy. DAG (còn gọi là quy trình công việc) được xác định trong các tệp Python chuẩn
  • Toán tử – toán tử mô tả một tác vụ duy nhất trong quy trình làm việc

Cloud Dataproc là gì?

Cloud Dataproc là dịch vụ Apache SparkApache Hadoop của Google Cloud Platform được quản lý hoàn toàn. Cloud Dataproc dễ dàng tích hợp với các dịch vụ khác của GCP, mang đến cho bạn một nền tảng toàn diện và mạnh mẽ cho việc xử lý dữ liệu, phân tích và học máy.

Việc bạn sẽ làm

Lớp học lập trình này hướng dẫn bạn cách tạo và chạy quy trình công việc Apache Airflow trong Cloud Composer để hoàn thành các nhiệm vụ sau:

  • Tạo cụm Cloud Dataproc
  • Chạy một công việc đếm từ Apache Hadoop trên cụm và xuất kết quả của công việc đó vào Cloud Storage
  • Xoá cụm

Kiến thức bạn sẽ học được

  • Cách tạo và chạy quy trình công việc Apache Airflow trong Cloud Composer
  • Cách sử dụng Cloud Composer và Cloud Dataproc để chạy bản phân tích trên một tập dữ liệu
  • Cách truy cập vào môi trường Cloud Composer thông qua Bảng điều khiển Google Cloud Platform, Cloud SDK và giao diện web Airflow

Bạn cần có

  • Tài khoản GCP
  • Kiến thức cơ bản về CLI
  • Hiểu biết cơ bản về Python

2. Thiết lập GCP

Tạo dự án

Chọn hoặc tạo một Dự án Google Cloud Platform.

Ghi lại mã dự án mà bạn sẽ sử dụng trong các bước sau.

Nếu đang tạo một dự án mới, bạn sẽ thấy mã dự án ngay bên dưới Tên dự án trên trang tạo

Nếu đã tạo dự án, bạn có thể tìm mã nhận dạng trên trang chủ của bảng điều khiển trong thẻ Thông tin dự án

Bật API

Bật các API Cloud Composer, Cloud Dataproc và Cloud Storage.Sau khi bật các API này, bạn có thể bỏ qua nút có nội dung "Go to Credentials" (Truy cập vào thông tin xác thực) và chuyển sang bước tiếp theo của hướng dẫn.

Tạo môi trường Composer

Tạo môi trường Cloud Composer bằng cấu hình sau:

  • Tên: my-composer-environment
  • Vị trí: us-central1
  • Vùng: us-central1-a

Tất cả cấu hình khác có thể giữ nguyên giá trị mặc định. Nhấp vào "Tạo" ở dưới cùng.

Tạo bộ chứa Cloud Storage

Trong dự án của bạn, hãy tạo một bộ chứa Cloud Storage với cấu hình sau:

  • Tên: <your-project-id>
  • Lớp bộ nhớ mặc định: Đa khu vực
  • Vị trí: Hoa Kỳ
  • Mô hình kiểm soát quyền truy cập: chi tiết

Nhấn "Tạo" khi bạn đã sẵn sàng

3. Thiết lập Apache Airflow

Xem thông tin môi trường Compose

Trong Bảng điều khiển GCP, hãy mở trang Môi trường

Nhấp vào tên của môi trường để xem thông tin chi tiết.

Trang Thông tin chi tiết về môi trường cung cấp thông tin, chẳng hạn như URL giao diện web Airflow, mã nhận dạng cụm Google Kubernetes Engine, tên của bộ chứa Cloud Storage và đường dẫn đến thư mục /dags.

Trong Airflow, DAG (Đồ thị không chu trình có hướng) là một tập hợp các tác vụ được sắp xếp mà bạn muốn lên lịch và chạy. DAG (còn gọi là quy trình công việc) được xác định trong các tệp Python chuẩn. Cloud Composer chỉ lên lịch cho các DAG trong thư mục /dags. Thư mục /dags nằm trong bộ chứa Cloud Storage mà Cloud Composer tạo tự động khi bạn tạo môi trường.

Cài đặt biến môi trường luồng khí Apache

Biến Apache Airflow là một khái niệm dành riêng cho luồng Airflow, khác với các biến môi trường. Trong bước này, bạn sẽ thiết lập 3 biến Luồng không khí sau: gcp_project, gcs_bucketgce_zone.

Sử dụng gcloud để Đặt biến

Trước tiên, hãy mở Cloud Shell. Cloud Shell đã cài đặt sẵn Cloud SDK cho bạn.

Đặt biến môi trường COMPOSER_INSTANCE thành tên của môi trường trong Composer

COMPOSER_INSTANCE=my-composer-environment

Để đặt biến Airflow bằng công cụ dòng lệnh gcloud, hãy sử dụng lệnh gcloud composer environments run với lệnh con variables. Lệnh gcloud composer này thực thi lệnh con CLI của Airflow variables. Lệnh con sẽ truyền các đối số đến công cụ dòng lệnh gcloud.

Bạn sẽ chạy lệnh này ba lần, thay thế các biến bằng các biến liên quan đến dự án của mình.

Đặt gcp_project bằng lệnh sau, thay thế <your-project-id> bằng mã dự án mà bạn đã ghi chú ở Bước 2.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

Kết quả của bạn sẽ có dạng như sau

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

Đặt gcs_bucket bằng lệnh sau đây, thay thế <your-bucket-name> bằng mã nhóm mà bạn đã ghi chú trong Bước 2. Nếu bạn làm theo đề xuất của chúng tôi, tên bộ chứa của bạn sẽ giống với mã dự án. Kết quả sẽ tương tự như lệnh trước.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

Đặt gce_zone bằng lệnh sau. Kết quả của bạn sẽ tương tự như các lệnh trước đó.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(Không bắt buộc) Sử dụng gcloud để xem biến

Để xem giá trị của một biến, hãy chạy lệnh con CLI của Airflow variables với đối số get hoặc sử dụng Giao diện người dùng của Airflow.

Ví dụ:

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

Bạn có thể thực hiện việc này bằng một trong 3 biến vừa đặt: gcp_project, gcs_bucketgce_zone.

4. Quy trình làm việc mẫu

Hãy xem mã cho DAG mà chúng ta sẽ sử dụng ở bước 5. Đừng lo về việc tải tệp xuống, bạn chỉ cần làm theo hướng dẫn tại đây.

Có rất nhiều điều cần tìm hiểu ở đây, vì vậy, hãy cùng phân tích một chút.

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

Chúng ta bắt đầu bằng một số lệnh nhập Airflow:

  • airflow.models – Cho phép chúng ta truy cập và tạo dữ liệu trong cơ sở dữ liệu Airflow.
  • airflow.contrib.operators – Nơi các nhà điều hành trong cộng đồng sinh sống. Trong trường hợp này, chúng ta cần có dataproc_operator để truy cập vào API Cloud Dataproc.
  • airflow.utils.trigger_rule – Để thêm quy tắc kích hoạt vào toán tử của chúng tôi. Quy tắc kích hoạt cho phép kiểm soát chi tiết việc một toán tử có nên thực thi hay không liên quan đến trạng thái của toán tử mẹ.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

Tham số này chỉ định vị trí của tệp đầu ra. Dòng đáng chú ý ở đây là models.Variable.get('gcs_bucket') sẽ lấy giá trị biến gcs_bucket từ cơ sở dữ liệu Airflow.

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR – Vị trí của tệp .jar mà sau này chúng tôi sẽ chạy trên cụm Cloud Dataproc. Ứng dụng này đã được lưu trữ trên GCP cho bạn.
  • input_file – Vị trí của tệp chứa dữ liệu mà công việc Hadoop của chúng ta sẽ dùng để tính toán sau này. Chúng ta sẽ tải dữ liệu lên vị trí đó cùng nhau trong Bước 5.
  • wordcount_args – Các đối số mà chúng tôi sẽ chuyển vào tệp jar.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

Điều này sẽ cung cấp cho chúng ta một đối tượng ngày giờ tương đương đại diện cho nửa đêm của ngày trước đó. Chẳng hạn, nếu điều này được thực thi vào lúc 11:00 ngày 4 tháng 3, thì đối tượng ngày giờ sẽ đại diện cho 00:00 ngày 3 tháng 3. Điều này có liên quan đến cách Airflow xử lý việc lên lịch. Bạn có thể xem thêm thông tin về vấn đề này tại đây.

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

Biến default_dag_args ở dạng từ điển phải được cung cấp mỗi khi tạo DAG mới:

  • 'email_on_failure' – Cho biết có gửi cảnh báo qua email khi không thực hiện được một tác vụ hay không
  • 'email_on_retry' – Cho biết liệu có gửi cảnh báo qua email khi thử lại một tác vụ hay không
  • 'retries' – Cho biết số lần luồng khí cần thử lại trong trường hợp không thành công DAG
  • 'retry_delay' – Cho biết thời gian Luồng khí phải đợi trước khi thử lại
  • 'project_id' – Cho DAG biết mã dự án GCP cần liên kết, mã này sẽ cần thiết sau này với Toán tử Dataproc
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Việc sử dụng with models.DAG yêu cầu tập lệnh phải đưa mọi nội dung bên dưới vào cùng một DAG. Chúng ta cũng thấy 3 đối số được truyền vào:

  • Chuỗi đầu tiên là tên đặt cho DAG mà chúng ta đang tạo. Trong trường hợp này, chúng ta sẽ sử dụng composer_hadoop_tutorial.
  • schedule_interval – Đối tượng datetime.timedelta mà chúng ta đã đặt thành một ngày. Tức là DAG này sẽ cố thực thi mỗi ngày một lần sau 'start_date' đã đặt trước đó trong 'default_dag_args'
  • default_args – Từ điển chúng ta tạo trước đó có chứa các đối số mặc định cho DAG

Tạo cụm Dataproc

Tiếp theo, chúng ta sẽ tạo một dataproc_operator.DataprocClusterCreateOperator để tạo cụm Cloud Dataproc.

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

Trong toán tử này, chúng ta thấy một vài đối số, tất cả ngoại trừ đối số đầu tiên là dành riêng cho toán tử này:

  • task_id – Giống như trong BashOperator, đây là tên mà chúng ta chỉ định cho toán tử. Bạn có thể xem tên này từ giao diện người dùng Airflow
  • cluster_name – Tên mà chúng ta chỉ định cho cụm Cloud Dataproc. Ở đây, chúng tôi đặt tên là composer-hadoop-tutorial-cluster-{{ ds_nodash }} (xem hộp thông tin để biết thông tin bổ sung không bắt buộc)
  • num_workers – Số lượng worker mà chúng tôi phân bổ cho cụm Cloud Dataproc
  • zone – Khu vực địa lý mà chúng ta muốn đặt cụm, được lưu trong cơ sở dữ liệu Airflow. Thao tác này sẽ đọc biến 'gce_zone' mà chúng ta đã đặt ở Bước 3
  • master_machine_type – Loại máy mà chúng tôi muốn phân bổ cho máy chủ Cloud Dataproc
  • worker_machine_type – Loại máy mà chúng ta muốn phân bổ cho worker Cloud Dataproc

Gửi công việc Apache Hadoop

dataproc_operator.DataProcHadoopOperator cho phép chúng ta gửi một công việc đến cụm Cloud Dataproc.

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

Chúng tôi cung cấp một vài thông số:

  • task_id – Tên mà chúng tôi chỉ định cho phần này của DAG
  • main_jar – Vị trí của tệp .jar mà chúng ta muốn chạy trên cụm
  • cluster_name – Tên của cụm để chạy công việc dựa trên đó mà bạn sẽ thấy giống với tên chúng ta tìm thấy trong toán tử trước đó
  • arguments – Các đối số được truyền vào tệp jar, như khi bạn thực thi tệp .jar từ dòng lệnh

Xoá cụm

Toán tử cuối cùng chúng ta sẽ tạo là dataproc_operator.DataprocClusterDeleteOperator.

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

Đúng như tên gọi, toán tử này sẽ xoá một cụm Cloud Dataproc cụ thể. Chúng ta thấy 3 đối số ở đây:

  • task_id – Giống như trong BashOperator, đây là tên mà chúng ta chỉ định cho toán tử. Bạn có thể xem tên này từ giao diện người dùng Airflow
  • cluster_name – Tên mà chúng ta chỉ định cho cụm Cloud Dataproc. Ở đây, chúng tôi đặt tên là composer-hadoop-tutorial-cluster-{{ ds_nodash }} (xem hộp thông tin sau phần "Tạo cụm Dataproc" để biết thêm thông tin không bắt buộc)
  • trigger_rule – Chúng ta đã đề cập ngắn gọn về Quy tắc kích hoạt trong quá trình nhập ở đầu bước này, nhưng ở đây chúng ta có một quy tắc đang hoạt động. Theo mặc định, toán tử Airflow sẽ không thực thi trừ phi tất cả toán tử ngược dòng của toán tử đó đã hoàn tất thành công. Quy tắc điều kiện kích hoạt ALL_DONE chỉ yêu cầu tất cả các toán tử ngược dòng phải hoàn tất, bất kể các toán tử đó có thành công hay không. Ở đây, điều này có nghĩa là ngay cả khi công việc Hadoop không thành công, chúng ta vẫn muốn chia nhỏ cụm đó.
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Cuối cùng, chúng ta muốn các toán tử này thực thi theo một thứ tự cụ thể và chúng ta có thể biểu thị điều này bằng cách sử dụng toán tử dịch bit Python. Trong trường hợp này, create_dataproc_cluster sẽ luôn thực thi trước, sau đó là run_dataproc_hadoop và cuối cùng là delete_dataproc_cluster.

Khi kết hợp tất cả lại với nhau, mã sẽ có dạng như sau:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5. Tải tệp Airflow lên Cloud Storage

Sao chép DAG vào thư mục /dags của bạn

  1. Trước tiên, hãy mở Cloud Shell, trong đó có Cloud SDK được cài đặt một cách thuận tiện cho bạn.
  2. Sao chép kho lưu trữ mẫu python và thay đổi thành thư mục Compose/workflows
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. Chạy lệnh sau để đặt tên thư mục DAG thành một biến môi trường
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. Chạy lệnh gsutil sau để sao chép mã hướng dẫn vào nơi tạo thư mục /dags
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

Kết quả sẽ có dạng như sau:

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6. Sử dụng giao diện người dùng Airflow

Cách truy cập vào giao diện web của Airflow bằng bảng điều khiển GCP:

  1. Mở trang Môi trường.
  2. Trong cột Máy chủ web của Airflow cho môi trường, hãy nhấp vào biểu tượng cửa sổ mới. Giao diện người dùng web của Airflow sẽ mở ra trong một cửa sổ trình duyệt mới.

Để biết thông tin về giao diện người dùng Airflow, hãy xem bài viết Truy cập vào giao diện web.

Xem biến

Các biến bạn đã đặt trước đó sẽ được lưu giữ trong môi trường của bạn. Bạn có thể xem các biến bằng cách chọn Quản trị > Biến trên thanh trình đơn giao diện người dùng Airflow.

Thẻ Danh sách được chọn và hiện một bảng có các khoá và khoá giá trị sau: gcp_project, value: project-id key: gcs_bucket, value: gs://bucket-name key: gce_zone, value: vùng

Khám phá các lần chạy DAG

Khi bạn tải tệp DAG lên thư mục dags trong Cloud Storage, Cloud Composer sẽ phân tích cú pháp tệp đó. Nếu không tìm thấy lỗi, tên của quy trình công việc sẽ xuất hiện trong danh sách DAG và quy trình công việc sẽ được đưa vào hàng đợi để chạy ngay lập tức. Để xem DAG, hãy nhấp vào DAG ở đầu trang.

84a29c71f20bff98.png

Nhấp vào composer_hadoop_tutorial để mở trang chi tiết về DAG. Trang này bao gồm bản trình bày dạng hình ảnh về các nhiệm vụ trong quy trình công việc và phần phụ thuộc.

f4f1663c7a37f47c.png

Bây giờ, trên thanh công cụ, hãy nhấp vào Graph View (Chế độ xem biểu đồ) rồi di chuột qua hình ảnh đồ hoạ cho từng tác vụ để xem trạng thái của tác vụ đó. Xin lưu ý rằng đường viền xung quanh mỗi tác vụ cũng cho biết trạng thái (đường viền màu xanh lục = đang chạy; màu đỏ = không thành công, v.v.).

4c5a0c6fa9f88513.pngS

Để chạy lại quy trình công việc từ Chế độ xem biểu đồ:

  1. Trong chế độ xem Biểu đồ trên giao diện người dùng Airflow, hãy nhấp vào hình ảnh đồ hoạ create_dataproc_cluster.
  2. Nhấp vào Xoá để đặt lại 3 việc cần làm, rồi nhấp vào OK để xác nhận.

fd1b23b462748f47.png

Bạn cũng có thể kiểm tra trạng thái và kết quả của quy trình công việc composer-hadoop-tutorial bằng cách truy cập vào các trang sau trong Bảng điều khiển GCP:

  • Các cụm Cloud Dataproc để giám sát việc tạo và xoá cụm. Xin lưu ý rằng cụm do quy trình tạo ra là tạm thời: cụm này chỉ tồn tại trong suốt thời gian của quy trình công việc và sẽ bị xoá trong nhiệm vụ mới nhất của quy trình công việc.
  • Công việc trên Cloud Dataproc để xem hoặc theo dõi công việc đếm từ trên Apache Hadoop. Nhấp vào ID công việc để xem kết quả của nhật ký công việc.
  • Trình duyệt Cloud Storage để xem kết quả của số từ trong thư mục wordcount trong bộ chứa Cloud Storage mà bạn đã tạo cho lớp học lập trình này.

7. Dọn dẹp

Cách tránh tính phí vào tài khoản GCP đối với những tài nguyên bạn dùng trong lớp học lập trình này:

  1. (Không bắt buộc) Để lưu dữ liệu, hãy tải dữ liệu xuống từ bộ chứa Cloud Storage cho môi trường Cloud Composer và bộ chứa bộ nhớ mà bạn đã tạo cho lớp học lập trình này.
  2. Xoá bộ chứa Cloud Storage mà bạn đã tạo cho lớp học lập trình này.
  3. Xoá bộ chứa trong Cloud Storage cho môi trường đó.
  4. Xoá môi trường Cloud Composer. Xin lưu ý rằng việc xoá môi trường sẽ không xoá bộ chứa lưu trữ của môi trường đó.

Bạn cũng có thể xoá dự án nếu muốn:

  1. Trong Bảng điều khiển GCP, hãy chuyển đến trang Dự án.
  2. Trong danh sách dự án, hãy chọn dự án mà bạn muốn xoá rồi nhấp vào Xoá.
  3. Trong hộp này, hãy nhập mã dự án rồi nhấp vào Tắt để xoá dự án.