1. 简介
工作流是数据分析中的一种常见用例,工作流涉及提取、转换和分析数据,以在其中查找有意义的信息。在 Google Cloud Platform 中,用于编排工作流的工具是 Cloud Composer,它是热门开源工作流工具 Apache Airflow 的托管版本。在本实验中,您将使用 Cloud Composer 创建一个简单的工作流,该工作流会创建一个 Cloud Dataproc 集群,使用 Cloud Dataproc 和 Apache Hadoop 对其进行分析,然后删除 Cloud Dataproc 集群。
什么是 Cloud Composer?
Cloud Composer 是一项全托管式工作流编排服务,您可以利用它编写、安排和监管跨越多项云服务和本地数据中心的流水线。Cloud Composer 在备受欢迎的 Apache Airflow 开源项目基础上构建而成,通过 Python 编程语言操作,易于使用且没有供应商锁定。
通过使用 Cloud Composer(而不是 Apache Airflow 的本地实例),用户可以从 Airflow 的强大功能中获益,而不会产生安装或管理开销。
什么是 Apache Airflow?
Apache Airflow 是一种开源工具,用于以编程方式编写、安排和监控工作流。在整个实验中,您会看到与 Airflow 相关的几个关键术语:
- DAG - DAG(有向无环图)是您要安排和运行的有序任务的集合。DAG(也称为工作流)在标准 Python 文件中定义
- 运算符 - 运算符用于描述工作流中的单个任务
什么是 Cloud Dataproc?
Cloud Dataproc 是 Google Cloud Platform 的全托管式 Apache Spark 和 Apache Hadoop 服务。Cloud Dataproc 可轻松集成其他 GCP 服务,为您提供一个强大且完整的数据处理、分析和机器学习平台。
实践内容
本 Codelab 介绍了如何在 Cloud Composer 中创建并运行完成以下任务的 Apache Airflow 工作流:
- 创建 Cloud Dataproc 集群
- 在集群上运行 Apache Hadoop WordCount 作业,并将其结果输出到 Cloud Storage
- 删除集群
学习内容
- 如何在 Cloud Composer 中创建和运行 Apache Airflow 工作流
- 如何使用 Cloud Composer 和 Cloud Dataproc 对数据集运行分析
- 如何通过 Google Cloud Platform 控制台、Cloud SDK 和 Airflow 网页界面访问 Cloud Composer 环境
所需条件
- GCP 账号
- 具备基本的 CLI 知识
- 对 Python 有基本的了解
2. 设置 GCP
创建项目
选择或创建 Google Cloud Platform 项目。
记下您的项目 ID,您将在后续步骤中使用它。
在创建新项目时,项目 ID 位于创建页面上的“项目名称”正下方 | |
如果您已创建项目,可以在控制台首页的“项目信息”卡片中找到该 ID |
启用 API
启用 Cloud Composer、Cloud Dataproc 和 Cloud Storage API。启用这些 API 后,您可以忽略显示“前往凭据”的按钮,并继续执行本教程的下一步。 |
创建 Composer 环境
使用以下配置创建一个 Cloud Composer 环境:
所有其他配置都可以保留默认值。点击底部的“创建”。 |
创建 Cloud Storage 存储分区
在您的项目中,创建一个具有以下配置的 Cloud Storage 存储分区:
准备就绪后,按“创建” |
3. 设置 Apache Airflow
查看 Composer 环境信息
在 GCP 控制台中,打开环境页面
点击环境名称以查看其详细信息。
环境详情页面会提供 Airflow 网页界面网址、Google Kubernetes Engine 集群 ID、Cloud Storage 存储桶的名称以及 /dags 文件夹的路径等信息。
在 Airflow 中,DAG(有向无环图)是您要安排和运行的有序任务的集合。DAG(也称为工作流)在标准 Python 文件中定义。Cloud Composer 仅安排 /dags 文件夹中的 DAG。/dags 文件夹位于 Cloud Composer 在您创建环境时自动创建的 Cloud Storage 存储桶中。
设置 Apache Airflow 环境变量
Apache Airflow 变量是 Airflow 特有的概念,它与环境变量不同。在此步骤中,您将设置以下三个 Airflow 变量:gcp_project
、gcs_bucket
和 gce_zone
。
使用 gcloud
设置变量
首先,打开 Cloud Shell,它会为您方便地安装 Cloud SDK。
将环境变量 COMPOSER_INSTANCE
设置为 Composer 环境的名称
COMPOSER_INSTANCE=my-composer-environment
如需使用 gcloud 命令行工具设置 Airflow 变量,请使用 gcloud composer environments run
命令和 variables
子命令。此 gcloud composer
命令会执行 Airflow CLI 子命令 variables
。该子命令会将参数传递给 gcloud
命令行工具。
此命令将运行三次,用与您的项目相关的变量替换变量。
使用以下命令设置 gcp_project
,将 <your-project-id> 替换为您在第 2 步中记下的项目 ID。
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcp_project <your-project-id>
输出将如下所示:
kubeconfig entry generated for us-central1-my-composer-env-123abc-gke. Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc [2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449 [2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database). [2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor [2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied. [2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg [2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
使用以下命令设置 gcs_bucket
,将 <your-bucket-name>
替换为您在第 2 步中记下的存储桶 ID。如果您遵循了我们的建议,则您的存储分区名称将与项目 ID 相同。输出将与上一个命令类似。
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>
使用以下命令设置 gce_zone
。输出将与前面的命令类似。
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gce_zone us-central1-a
(可选)使用 gcloud
查看变量
如需查看变量的值,请使用 get
参数运行 Airflow CLI 子命令 variables
,或使用 Airflow 界面。
例如:
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --get gcs_bucket
您可以使用刚刚设置的三个变量中的任意一个来执行此操作:gcp_project
、gcs_bucket
和 gce_zone
。
4. 工作流示例
我们来看看将在第 5 步中使用的 DAG 的代码。不用担心文件下载问题,只需按此处说明操作即可。
这里有很多需要解压的东西,下面我们来详细介绍一下。
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
我们先导入一些 Airflow 代码:
airflow.models
- 允许我们访问和创建 Airflow 数据库中的数据。airflow.contrib.operators
- 社区操作员的居住地。在本例中,我们需要dataproc_operator
才能访问 Cloud Dataproc API。airflow.utils.trigger_rule
- 用于为运算符添加触发规则。触发器规则可让您精细控制操作符是否应根据其父级的状态执行。
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
这会指定输出文件的位置。其中值得注意的行是 models.Variable.get('gcs_bucket')
,它将从 Airflow 数据库获取 gcs_bucket
变量值。
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
WORDCOUNT_JAR
- 我们最终在 Cloud Dataproc 集群上运行的 .jar 文件的位置。它已为您托管在 GCP 上。input_file
- 文件的位置,该文件包含我们的 Hadoop 作业最终将用于计算的数据。我们会在第 5 步中将数据一起上传到该位置。wordcount_args
- 我们将传递到 jar 文件中的参数。
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
这将为我们提供一个相当于前一天午夜的 DateTime 对象。例如,如果此操作在 3 月 4 日 11:00 执行,则日期时间对象将表示 3 月 3 日 00:00。这与 Airflow 处理调度的方式有关。如需了解详情,请点击此处。
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
每当创建新 DAG 时,都应以字典的形式提供 default_dag_args
变量:
'email_on_failure'
- 指示是否应在任务失败时发送电子邮件提醒'email_on_retry'
- 指示在重试任务时是否应发送电子邮件提醒'retries'
- 表示当 DAG 失败时 Airflow 应重试多少次'retry_delay'
- 表示 Airflow 在尝试重试之前应等待多长时间'project_id'
- 告知 DAG 要与哪个 GCP 项目 ID 相关联,稍后需要在 Dataproc 操作符中使用该 ID
with models.DAG(
'composer_hadoop_tutorial',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
使用 with models.DAG
会指示脚本将其下方的所有内容包含在同一 DAG 中。我们还看到传入了三个参数:
- 第一个是字符串,它是为我们要创建的 DAG 指定的名称。在本例中,我们将使用
composer_hadoop_tutorial
。 schedule_interval
- 一个datetime.timedelta
对象,我们在此处将其设置为一天。这意味着,此 DAG 会在先前在'default_dag_args'
中设置的'start_date'
之后尝试每天执行一次default_args
- 我们之前创建的字典,其中包含 DAG 的默认参数
创建 Dataproc 集群
接下来,我们将创建一个 dataproc_operator.DataprocClusterCreateOperator
,用于创建 Cloud Dataproc 集群。
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
在此运算符中,我们会看到几个参数,除第一个参数外,所有参数都专用于此运算符:
task_id
- 与 BashOperator 中一样,这是我们为操作器分配的名称,可在 Airflow 界面中查看cluster_name
- 我们为 Cloud Dataproc 集群分配的名称。在这里,我们将其命名为composer-hadoop-tutorial-cluster-{{ ds_nodash }}
(如需了解可选的其他信息,请参阅信息框)num_workers
- 我们分配给 Cloud Dataproc 集群的工作器数量zone
- 我们希望集群所在的地理区域,保存在 Airflow 数据库中。这将读取我们在第 3 步中设置的'gce_zone'
变量master_machine_type
- 我们要分配给 Cloud Dataproc 主实例的机器类型worker_machine_type
- 我们要分配给 Cloud Dataproc 工作器的机器类型
提交 Apache Hadoop 作业
dataproc_operator.DataProcHadoopOperator
允许我们将作业提交到 Cloud Dataproc 集群。
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
我们提供了多个参数:
task_id
- 我们为该部分 DAG 指定的名称main_jar
- 我们希望针对集群运行的 .jar 文件的位置cluster_name
- 运行作业的集群的名称,您会注意到,它与我们在前一个运算符中找到的名称相同arguments
- 传入 jar 文件中的参数,就像从命令行执行 .jar 文件时一样
删除集群
我们要创建的最后一个运算符是 dataproc_operator.DataprocClusterDeleteOperator
。
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
顾名思义,此运算符将删除给定的 Cloud Dataproc 集群。我们在这里看到了三个参数:
task_id
- 与在 BashOperator 中一样,这是我们分配给操作器的名称,可在 Airflow 界面中查看cluster_name
- 我们为 Cloud Dataproc 集群分配的名称。在这里,我们将其命名为composer-hadoop-tutorial-cluster-{{ ds_nodash }}
(如需了解可选的其他信息,请参阅“创建 Dataproc 集群”后的信息框)trigger_rule
- 我们在本步骤开头的导入过程中曾简要提及触发器规则,但在这里,我们将实际演示一个触发器规则。默认情况下,除非其所有上游运算符都已成功完成,否则 Airflow 运算符不会执行。ALL_DONE
触发器规则仅要求所有上游运营商均已完成操作,无论其是否成功。这意味着即使 Hadoop 作业失败,我们仍然需要清理集群。
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
最后,我们希望这些运算符按特定顺序执行,我们可以使用 Python 位移运算符来表示这一点。在这种情况下,create_dataproc_cluster
始终会先执行,然后是 run_dataproc_hadoop
,最后是 delete_dataproc_cluster
。
将所有内容整合起来,代码如下所示:
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# Output file for Cloud Dataproc job.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
'email_on_failure': False,
'email_on_retry': False,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
# [START composer_hadoop_schedule]
with models.DAG(
'composer_hadoop_tutorial',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_hadoop_schedule]
# Create a Cloud Dataproc cluster.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
# Give the cluster a unique name by appending the date scheduled.
# See https://airflow.apache.org/code.html#default-variables
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
# Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
# master node.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
# Delete Cloud Dataproc cluster.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted
# even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]
# [END composer_hadoop]
5. 将 Airflow 文件上传到 Cloud Storage
将 DAG 复制到 /dags 文件夹
- 首先,打开 Cloud Shell,其中已为您方便地安装了 Cloud SDK。
- 克隆 Python 示例代码库并切换到 composer/workflows 目录
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
- 运行以下命令,将 DAG 文件夹的名称设置为环境变量
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \ --location us-central1 --format="value(config.dagGcsPrefix)")
- 运行以下
gsutil
命令,将教程代码复制到创建 /dags 文件夹的位置
gsutil cp hadoop_tutorial.py $DAGS_FOLDER
输出将如下所示:
Copying file://hadoop_tutorial.py [Content-Type=text/x-python]... / [1 files][ 4.1 KiB/ 4.1 KiB] Operation completed over 1 objects/4.1 KiB.
6. 使用 Airflow 界面
如需使用 GCP 控制台访问 Airflow 网页界面,请执行以下操作:
|
如需了解 Airflow 界面,请参阅访问网页界面。
查看变量
您之前设置的变量会保留在您的环境中。您可以在 Airflow 界面菜单栏中依次选择 Admin > Variables,以查看这些变量。
探索 DAG 运行
当您将 DAG 文件上传到 Cloud Storage 中的 dags
文件夹时,Cloud Composer 会解析该文件。如果未找到错误,工作流的名称会显示在 DAG 列表中,并且工作流会排入队列以立即运行。如需查看您的 DAG,请点击页面顶部的 DAGs。
点击 composer_hadoop_tutorial
以打开 DAG 详情页面。此页面包含工作流任务和依赖项的图形表示。
现在,在工具栏中点击 Graph View,然后将鼠标悬停在每项任务的图形上以查看其状态。请注意,每个任务的边框也指示状态(绿色边框表示任务正在运行中;红色边框表示任务失败,等等)。
如需从 Graph View 重新运行工作流,请执行以下操作:
- 在 Airflow 界面的“Graph View”中,点击
create_dataproc_cluster
图形。 - 点击 Clear 重置三个任务,然后点击 OK 进行确认。
您还可以转到以下 GCP 控制台页面来检查 composer-hadoop-tutorial
工作流的状态和结果:
- Cloud Dataproc 集群:用于监控集群的创建和删除。请注意,由工作流创建的集群是临时性的,也就是说,此类集群仅在工作流的持续期间内存在,并且将在最后一个工作流任务的执行过程中删除。
- Cloud Dataproc 作业:用于查看或监控 Apache Hadoop WordCount 作业。点击“作业 ID”即可查看作业日志输出。
- Cloud Storage 浏览器,可在您为本 Codelab 创建的 Cloud Storage 存储分区的
wordcount
文件夹中查看 WordCount 的结果。
7. 清理
为避免系统因此 Codelab 中使用的资源向您的 GCP 账号收取费用,请执行以下操作:
- (可选)如需保存数据,请从 Cloud Composer 环境的 Cloud Storage 存储桶以及您为本 Codelab 创建的存储桶中下载数据。
- 删除您为此 Codelab 创建的 Cloud Storage 存储分区。
- 删除环境的 Cloud Storage 存储分区。
- 删除 Cloud Composer 环境。请注意,删除环境并不会删除环境的存储分区。
您也可以选择删除项目:
- 在 GCP Console 中,转到项目页面。
- 在项目列表中,选择要删除的项目,然后点击删除。
- 在框中输入项目 ID,然后点击关停以删除项目。