并行运行 BigQuery 作业和 Workflows

1. 简介

1c05e3d0c2bd2b45.png 74be7b376d45258a.png

Workflows 是一项全托管式编排服务,可按您定义的顺序执行 Google Cloud 服务或外部服务。

BigQuery 是一种全托管式企业数据仓库,可帮助您使用机器学习、地理空间分析和商业智能等内置功能管理和分析 TB 级数据。

在此 Codelab 中,您将针对公开的 Wikipedia 数据集运行一些 BigQuery 查询。然后,您将了解如何在 Workflows 编排中以串行方式连续运行多个 BigQuery 查询。最后,您将使用 Workflows 的并行迭代功能并行化查询,从而将速度提高多达 5 倍。

学习内容

  • 如何针对维基百科数据集运行 BigQuery 查询。
  • 如何以串行方式运行多个查询作为工作流编排的一部分。
  • 如何使用 Workflows 并行迭代功能并行化查询,从而将速度提升高达 5 倍。

2. 设置和要求

自定进度的环境设置

  1. 登录 Google Cloud 控制台,然后创建一个新项目或重复使用现有项目。如果您还没有 Gmail 或 Google Workspace 账号,则必须创建一个

b35bf95b8bf3d5d8.png

a99b7ace416376c4.png

bd84a6d3004737c5.png

  • 项目名称是此项目参与者的显示名称。它是 Google API 尚未使用的字符串。您可以随时更新。
  • 项目 ID 在所有 Google Cloud 项目中必须是唯一的,并且不可变(一经设置便无法更改)。Cloud 控制台会自动生成一个唯一字符串;通常情况下,您无需关注该字符串。在大多数 Codelab 中,您都需要引用项目 ID(通常用 PROJECT_ID 标识)。如果您不喜欢生成的 ID,可以再随机生成一个 ID。或者,您也可以尝试自己的项目 ID,看看是否可用。完成此步骤后便无法更改该 ID,并且此 ID 在项目期间会一直保留。
  • 此外,还有第三个值,即部分 API 使用的项目编号,供您参考。如需详细了解所有这三个值,请参阅文档
  1. 接下来,您需要在 Cloud 控制台中启用结算功能,以便使用 Cloud 资源/API。运行此 Codelab 应该不会产生太多的费用(如果有费用的话)。若要关闭资源以避免产生超出本教程范围的结算费用,您可以删除自己创建的资源或删除整个项目。Google Cloud 的新用户符合参与 $300 USD 免费试用计划的条件。

启动 Cloud Shell

虽然可以通过笔记本电脑对 Google Cloud 进行远程操作,但在此 Codelab 中,您将使用 Google Cloud Shell,这是一个在云端运行的命令行环境。

Google Cloud 控制台 中,点击右上角工具栏中的 Cloud Shell 图标:

55efc1aaa7a4d3ad.png

预配和连接到环境应该只需要片刻时间。完成后,您应该会看到如下内容:

7ffe5cbb04455448.png

这个虚拟机已加载了您需要的所有开发工具。它提供了一个持久的 5 GB 主目录,并且在 Google Cloud 中运行,大大增强了网络性能和身份验证功能。您在此 Codelab 中的所有工作都可以在浏览器中完成。您无需安装任何程序。

3. 探索 Wikipedia 数据集

首先,探索 BigQuery 中的 Wikipedia 数据集。

前往 Google Cloud 控制台的 BigQuery 部分

ea75ab12a7c012a4.png

bigquery-samples 下,您应该会看到各种公共数据集,包括一些与 Wikipedia 相关的数据集:

c9484e305b8e1438.png

wikipedia_pageviews 数据集下,您可以看到不同年份的网页浏览量对应的各种表格:

c540a4162640cbb3.png

您可以选择其中一个表格(例如201207),然后预览数据:

b5b2a334cd6f63c0.png

您还可以针对该表运行查询。例如,以下查询会选择观看次数最多的前 100 个影视内容:

SELECT TITLE, SUM(views)
FROM bigquery-samples.wikipedia_pageviews.201207h
GROUP BY TITLE
ORDER BY SUM(VIEWS) DESC
LIMIT 100

运行查询后,大约需要 20 秒才能加载数据:

1df3877aed1653b4.png

4. 定义用于运行多个查询的工作流

针对单个表运行查询非常简单。不过,针对多个表运行多个查询并整理结果可能会非常繁琐。为了帮助您完成此任务,Workflows 提供了迭代语法!

在 Cloud Shell 中,创建一个 workflow-serial.yaml 文件,以构建一个针对多个表运行多个查询的工作流:

touch workflow-serial.yaml

然后,您可以使用 Cloud Shell 中的编辑器修改该文件:

33bf9325b078ad8.png

workflow-serial.yaml 文件中,在第一个 init 步骤中,创建一个 results 映射,用于跟踪以表名为键的每次迭代。此外,还要定义一个 tables 数组,其中包含您要针对其运行查询的表列表。在本例中,我们选择 5 个表格:

main:
    steps:
    - init:
        assign:
            - results : {}
            - tables:
                - 201201h
                - 201202h
                - 201203h
                - 201204h
                - 201205h

接下来,定义 runQueries 步。此步骤会遍历每个表,并使用工作流的 BigQuery 连接器运行查询,以查找每个表中网页浏览次数最多的前 100 个标题。然后,它会将结果映射中每个表中的热门影视内容和观看次数保存下来:

    - runQueries:
        for:
            value: table
            in: ${tables}
            steps:
            - runQuery:
                call: googleapis.bigquery.v2.jobs.query
                args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                        useLegacySql: false
                        useQueryCache: false
                        timeoutMs: 30000
                        # Find the top 100 titles with most views on Wikipedia
                        query: ${
                            "SELECT TITLE, SUM(views)
                            FROM `bigquery-samples.wikipedia_pageviews." + table + "`
                            WHERE LENGTH(TITLE) > 10
                            GROUP BY TITLE
                            ORDER BY SUM(VIEWS) DESC
                            LIMIT 100"
                            }
                result: queryResult
            - returnResult:
                assign:
                    # Return the top title from each table
                    - results[table]: {}
                    - results[table].title: ${queryResult.rows[0].f[0].v}
                    - results[table].views: ${queryResult.rows[0].f[1].v}

在最后一步中,返回 results 映射:

    - returnResults:
        return: ${results}

5. 使用工作流运行多个查询

在部署和运行工作流之前,您需要确保已启用 Workflows API。您可以在 Google Cloud 控制台中启用它,也可以在 Cloud Shell 中使用 gcloud 启用它:

gcloud services enable workflows.googleapis.com

为 Workflows 创建服务账号:

SERVICE_ACCOUNT=workflows-bigquery-sa
gcloud iam service-accounts create $SERVICE_ACCOUNT \
  --display-name="Workflows BigQuery service account"

确保服务账号具有记录和运行 BigQuery 作业的角色:

PROJECT_ID=your-project-id
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --role roles/logging.logWriter \
  --role roles/bigquery.jobUser \
  --member serviceAccount:$SERVICE_ACCOUNT@$PROJECT_ID.iam.gserviceaccount.com

使用服务账号部署工作流:

gcloud workflows deploy bigquery-serial \
    --source=workflow-serial.yaml \
    --service-account=$SERVICE_ACCOUNT@$PROJECT_ID.iam.gserviceaccount.com

最后,您就可以运行工作流了。

在 Cloud 控制台的“Workflows”部分下找到 bigquery-serial 工作流,然后点击 Execute 按钮:

b6afa4747680334f.png

或者,您也可以在 Cloud Shell 中使用 gcloud 运行工作流:

gcloud workflows run bigquery-serial

您应该会看到工作流执行大约 1 分钟(每个表 20 秒)。

最后,您将看到每个表格的输出,其中包含热门影视内容和观看次数:

304d11a5bffdada4.png

baf31533d3671c9e.png

6. 通过并行步骤并行处理多个查询

上一步中的工作流大约耗时 1 分钟,因为它运行了 5 个查询,每个查询耗时 20 秒。由于这些查询是独立的,因此您实际上可以使用工作流的并行迭代功能并行运行它们。

workflow-serial.yaml 文件复制到新的 workflow-parallel.yaml 文件中。在新文件中,您将进行一些更改,以将串行步骤转换为并行步骤。

workflow-parallel.yaml 文件中,更改 runQueries 步骤。首先,添加 parallel 个关键字。这样,for 循环的每次迭代都可以并行运行。其次,将 results 变量声明为 shared 变量。这样一来,分支就可以写入该变量。我们将每个结果附加到此变量。

- runQueries:
    parallel:
        shared: [results]
        for:
            value: table
            in: ${tables}

部署并行工作流:

gcloud workflows deploy bigquery-parallel \
    --source=workflow-parallel.yaml \
    --service-account=$SERVICE_ACCOUNT@$PROJECT_ID.iam.gserviceaccount.com

运行工作流:

gcloud workflows run bigquery-parallel

您应该会看到工作流执行持续约 20 秒。这是因为所有 5 个查询都在并行运行。只需更改几行代码,即可将速度提升高达 5 倍!

最终,您会看到每个表都输出了热门影视内容和观看次数,但执行时间要短得多:

1825d49ef225c828.png

7. 恭喜

恭喜,您已完成此 Codelab!如需了解详情,请参阅有关并行步骤的工作流文档

所学内容

  • 如何针对维基百科数据集运行 BigQuery 查询。
  • 如何以串行方式运行多个查询作为工作流编排的一部分。
  • 如何使用 Workflows 并行迭代功能并行化查询,从而将速度提升高达 5 倍。