在 Dataproc 集群上运行 Hadoop WordCount 作业

1. 简介

工作流是数据分析中的常见用例,它涉及提取、转换和分析数据,以从中找到有意义的信息。在 Google Cloud Platform 中,用于编排工作流的工具是 Cloud Composer,它是热门的开源工作流工具 Apache Airflow 的托管版本。在此实验中,您将使用 Cloud Composer 创建一个简单的工作流,该工作流会创建 Cloud Dataproc 集群,使用 Cloud Dataproc 和 Apache Hadoop 对其进行分析,然后在分析完成后删除 Cloud Dataproc 集群。

什么是 Cloud Composer?

Cloud Composer 是一项全托管式工作流编排服务,您可以利用它编写、安排和监管跨越多项云服务和本地数据中心的流水线。Cloud Composer 在备受欢迎的 Apache Airflow 开源项目基础上构建而成,使用 Python 编程语言进行操作,易于使用且没有束缚。

通过使用 Cloud Composer(而不是 Apache Airflow 的本地实例),用户可以从 Airflow 的强大功能中获益,而不会产生安装或管理开销。

什么是 Apache Airflow?

Apache Airflow 是一款开源工具,用于以程序化方式编写、调度和监控工作流。以下是与 Airflow 相关的一些关键术语,您将在整个实验中看到这些术语:

  • DAG - DAG(有向无环图)是您要安排和运行的有序任务的集合。DAG(也称为工作流)在标准 Python 文件中定义
  • 执行器 - 执行器描述了工作流中的单个任务

什么是 Cloud Dataproc?

Cloud Dataproc 是 Google Cloud Platform 的全托管式 Apache SparkApache Hadoop 服务。Cloud Dataproc 可轻松与其他 GCP 服务集成,为您提供一个强大且完整的数据处理、分析和机器学习平台。

操作内容

本 Codelab 演示了如何在 Cloud Composer 中创建并运行 Apache Airflow 工作流,该工作流可完成以下任务:

  • 创建 Cloud Dataproc 集群
  • 在集群上运行 Apache Hadoop 字数统计作业,并将其结果输出到 Cloud Storage
  • 删除集群

学习内容

  • 如何在 Cloud Composer 中创建和运行 Apache Airflow 工作流
  • 如何使用 Cloud Composer 和 Cloud Dataproc 对数据集运行分析
  • 如何通过 Google Cloud Platform Console、Cloud SDK 和 Airflow 网页界面访问 Cloud Composer 环境

所需条件

  • GCP 账号
  • 具备基本的 CLI 知识
  • 对 Python 有基本的了解

2. 设置 GCP

创建项目

选择或创建 Google Cloud Platform 项目。

记下您的项目 ID,您将在后续步骤中使用该 ID。

如果您要创建新项目,则可以在创建页面上的“项目名称”下方找到项目 ID

如果您已创建项目,可以在控制台首页上的“项目信息”卡片中找到项目 ID

启用 API

启用 Cloud Composer、Cloud Dataproc 和 Cloud Storage API。启用这些 API 后,您可以忽略“前往凭据”按钮,然后继续执行本教程的下一步。

创建 Composer 环境

创建 Cloud Composer 环境,并采用以下配置:

  • 名称:my-composer-environment
  • 位置:us-central1
  • 可用区:us-central1-a

所有其他配置都可以保留默认值。点击底部的“创建”。

创建 Cloud Storage 存储分区

在项目中,创建一个 Cloud Storage 存储分区,并采用以下配置:

  • 名称:<your-project-id>
  • 默认存储类别:Multi-Regional
  • 所在地:美国
  • 访问权限控制模型:精细

准备就绪后,按“创建”

3. 设置 Apache Airflow

查看 Composer 环境信息

在 GCP Console 中,打开环境页面

点击环境名称以查看其详细信息。

环境详情页面会提供 Airflow 网页界面网址、Google Kubernetes Engine 集群 ID、Cloud Storage 存储分区的名称以及 /dags 文件夹的路径等信息。

在 Airflow 中,DAG(有向无环图)是您要安排和运行的有序任务的集合。DAG(也称为工作流)在标准 Python 文件中定义。Cloud Composer 只会安排 /dags 文件夹中的 DAG。/dags 文件夹位于 Cloud Composer 在您创建环境时自动创建的 Cloud Storage 存储分区中。

设置 Apache Airflow 环境变量

Apache Airflow 变量是 Airflow 特有的一个概念,此类变量不同于环境变量。在此步骤中,您将设置以下三个 Airflow 变量gcp_projectgcs_bucketgce_zone

使用 gcloud 设置变量

首先,打开 Cloud Shell,其中已为您便捷地安装了 Cloud SDK。

将环境变量 COMPOSER_INSTANCE 设置为 Composer 环境的名称

COMPOSER_INSTANCE=my-composer-environment

如需使用 gcloud 命令行工具设置 Airflow 变量,请使用 gcloud composer environments run 命令和 variables 子命令。此 gcloud composer 命令会执行 Airflow CLI 子命令 variables。该子命令会将参数传递给 gcloud 命令行工具。

您将运行此命令三次,并将变量替换为与您的项目相关的变量。

使用以下命令设置 gcp_project,并将 <your-project-id> 替换为您在第 2 步中记下的项目 ID。

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

输出将如下所示

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

使用以下命令设置 gcs_bucket,并将 <your-bucket-name> 替换为您在第 2 步中记下的存储分区 ID。如果您按照我们的建议操作,则存储分区名称与项目 ID 相同。输出将与上一个命令类似。

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

使用以下命令设置 gce_zone。输出将与之前的命令类似。

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(可选)使用 gcloud 查看变量

如需查看变量的值,请运行带 get 实参的 Airflow CLI 子命令 variables,或使用 Airflow 界面

例如:

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

您可以使用刚刚设置的三个变量中的任意一个来执行此操作:gcp_projectgcs_bucketgce_zone

4. 工作流示例

我们来看看将在第 5 步中使用的 DAG 的代码。暂时不用担心下载文件,只需按照此处的说明操作即可。

这里有很多内容需要解读,我们来简单说明一下。

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

我们先从一些 Airflow 导入开始:

  • airflow.models - 允许我们在 Airflow 数据库中访问和创建数据。
  • airflow.contrib.operators - 社区中运营商的所在地。在这种情况下,我们需要 dataproc_operator 来访问 Cloud Dataproc API。
  • airflow.utils.trigger_rule - 用于向我们的运算符添加触发器规则。借助触发规则,您可以精细地控制运算符是否应根据其父级的状态执行。
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

这指定了输出文件的位置。这里值得注意的一行是 models.Variable.get('gcs_bucket'),它将从 Airflow 数据库中获取 gcs_bucket 变量值。

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR - 我们最终将在 Cloud Dataproc 集群上运行的 .jar 文件的位置。我们已为您将其托管在 GCP 上。
  • input_file - 包含 Hadoop 作业最终将计算的数据的文件位置。我们将在第 5 步中将数据一起上传到该位置。
  • wordcount_args - 将传递到 jar 文件中的实参。
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

这将为我们提供一个表示前一天午夜的等效日期时间对象。例如,如果在 3 月 4 日 11:00 执行此函数,则 datetime 对象将表示 3 月 3 日 00:00。这与 Airflow 处理调度的方式有关。如需了解相关详情,请点击此处

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

每当创建新的 DAG 时,都应提供字典形式的 default_dag_args 变量:

  • 'email_on_failure' - 指示任务失败时是否应发送电子邮件提醒
  • 'email_on_retry' - 指示在重试任务时是否应发送电子邮件提醒
  • 'retries' - 表示在 DAG 失败的情况下,Airflow 应尝试重试多少次
  • 'retry_delay' - 表示 Airflow 在尝试重试之前应等待多长时间
  • 'project_id' - 告知 DAG 要将其与哪个 GCP 项目 ID 相关联,这将在稍后使用 Dataproc 运算符时用到
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

使用 with models.DAG 可告知脚本将该命令下方的所有内容都包含在同一 DAG 中。我们还看到传入了三个实参:

  • 第一个参数是字符串,表示要创建的 DAG 的名称。在本例中,我们使用的是 composer_hadoop_tutorial
  • schedule_interval - 一个 datetime.timedelta 对象,此处我们将其设置为一天。这意味着,此 DAG 将尝试在 'default_dag_args' 中之前设置的 'start_date' 之后每天执行一次
  • default_args - 我们之前创建的字典,其中包含 DAG 的默认实参

创建 Dataproc 集群

接下来,我们将创建一个 dataproc_operator.DataprocClusterCreateOperator,用于创建 Cloud Dataproc 集群。

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

在此运算符中,我们看到了一些实参,除了第一个实参之外,所有实参都特定于此运算符:

  • task_id - 与在 BashOperator 中一样,这是我们为操作器分配的名称,可从 Airflow 界面中查看
  • cluster_name - 我们为 Cloud Dataproc 集群分配的名称。在此示例中,我们将其命名为 composer-hadoop-tutorial-cluster-{{ ds_nodash }}(如需了解可选的其他信息,请参阅信息框)
  • num_workers - 为 Cloud Dataproc 集群分配的工作节点数
  • zone - 我们希望集群所在的地理区域,如 Airflow 数据库中所保存的那样。这将读取我们在第 3 步中设置的 'gce_zone' 变量
  • master_machine_type - 我们要分配给 Cloud Dataproc 主实例的机器类型
  • worker_machine_type - 我们要分配给 Cloud Dataproc 工作器的机器类型

提交 Apache Hadoop 作业

借助 dataproc_operator.DataProcHadoopOperator,我们可以将作业提交到 Cloud Dataproc 集群。

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

我们提供了多个参数:

  • task_id - 我们为此 DAG 片段分配的名称
  • main_jar - 我们希望针对集群运行的 .jar 文件的位置
  • cluster_name - 要运行作业的集群的名称,您会发现该名称与我们在上一个运算符中找到的名称相同
  • arguments - 传递到 JAR 文件中的实参,与从命令行执行 .jar 文件时传递的实参相同

删除集群

我们将创建的最后一个运算符是 dataproc_operator.DataprocClusterDeleteOperator

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

顾名思义,此运算符将删除指定的 Cloud Dataproc 集群。这里有三个实参:

  • task_id - 与在 BashOperator 中一样,这是我们为操作器分配的名称,可从 Airflow 界面中查看
  • cluster_name - 我们为 Cloud Dataproc 集群分配的名称。在此示例中,我们将其命名为 composer-hadoop-tutorial-cluster-{{ ds_nodash }}(如需了解可选的其他信息,请参阅“创建 Dataproc 集群”后的信息框)
  • trigger_rule - 我们在本步骤开始时的导入过程中简要介绍了触发规则,但这里有一个实际应用的触发规则。默认情况下,除非某个 Airflow 运算符的所有上游运算符都已成功完成,否则该运算符不会执行。ALL_DONE 触发规则仅要求所有上游运算符都已完成,无论它们是否成功完成。在此处,这意味着即使 Hadoop 作业失败,我们仍希望拆除集群。
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

最后,我们希望这些运算符按特定顺序执行,可以使用 Python 位移运算符来表示这一点。在这种情况下,create_dataproc_cluster 将始终先执行,然后是 run_dataproc_hadoop,最后是 delete_dataproc_cluster

综上,代码如下所示:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5. 将 Airflow 文件上传到 Cloud Storage

将 DAG 复制到您的 /dags 文件夹

  1. 首先,打开 Cloud Shell,其中已为您便捷地安装了 Cloud SDK。
  2. 克隆 Python 示例代码库并切换到 composer/workflows 目录
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. 运行以下命令,将 DAG 文件夹的名称设置为环境变量
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. 运行以下 gsutil 命令,将教程代码复制到创建 /dags 文件夹的位置
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

输出将如下所示:

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6. 使用 Airflow 界面

如需使用 GCP 控制台访问 Airflow 网页界面,请执行以下操作:

  1. 打开环境页面。
  2. 在环境对应的 Airflow Web 服务器列中,点击新窗口图标。Airflow 网页界面会在新的浏览器窗口中打开。

如需了解 Airflow 界面,请参阅访问网页界面

查看变量

您之前设置的变量会保留在您的环境中。您可以从 Airflow 界面菜单栏中选择 Admin > Variables,以查看变量。

列表标签页处于选中状态,并显示一个包含以下键和值的表格:键:gcp_project,值:project-id;键:gcs_bucket,值:gs://bucket-name;键:gce_zone,值:zone

探索 DAG 运行

当您将 DAG 文件上传到 Cloud Storage 中的 dags 文件夹时,Cloud Composer 会解析该文件。如果未发现任何错误,工作流的名称会显示在 DAG 列表中,并且工作流会排入队列以立即运行。如需查看您的 DAG,请点击页面顶部的 DAG

84a29c71f20bff98.png

点击 composer_hadoop_tutorial 以打开 DAG 详情页面。此页面包含工作流任务和依赖项的图形表示。

f4f1663c7a37f47c.png

现在,在工具栏中点击图表视图,然后将鼠标悬停在每个任务对应的图形上,查看其状态。请注意,每个任务的边框也指示状态(绿色边框表示任务正在运行中;红色边框表示任务失败,等等)。

4c5a0c6fa9f88513.png

如需从 Graph View 重新运行工作流,请执行以下操作:

  1. 在 Airflow 界面的“Graph View”中,点击 create_dataproc_cluster 图形。
  2. 点击清除可重置这三项任务,然后点击确定进行确认。

fd1b23b462748f47.png

您还可以转到以下 GCP 控制台页面来检查 composer-hadoop-tutorial 工作流的状态和结果:

  • Cloud Dataproc 集群:可以监控集群的创建和删除。请注意,由工作流创建的集群是临时性的,也就是说,此类集群仅在工作流的持续期间内存在,并且将在最后一个工作流任务的执行过程中删除。
  • Cloud Dataproc 作业:可以查看或监控 Apache Hadoop Wordcount 作业。点击“任务 ID”即可查看作业日志输出。
  • Cloud Storage 浏览器:可以在您为本 Codelab 创建的 Cloud Storage 存储分区所含的 wordcount 文件夹中查看 WordCount 的结果。

7. 清理

为避免系统因本 Codelab 中使用的资源向您的 GCP 账号收取费用,请执行以下操作:

  1. (可选)如需保存数据,请从 Cloud Composer 环境的 Cloud Storage 存储分区以及您为此 Codelab 创建的存储分区中下载数据
  2. 删除您为此 Codelab 创建的 Cloud Storage 存储分区。
  3. 删除环境的 Cloud Storage 存储分区
  4. 删除 Cloud Composer 环境。请注意,删除环境并不会删除其存储分区。

您还可以选择删除项目:

  1. 在 GCP Console 中,前往项目页面。
  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在框中输入项目 ID,然后点击关停以删除项目。