Dataproc Serverless

1. 概览 - Google Dataproc

Dataproc 是一项具有高度可扩容性的全托管式服务,用于运行 Apache Spark、Apache Flink、Presto 和众多其他开源工具和框架。使用 Dataproc 可以大规模实现数据湖现代化改造、ETL / ELT 和安全数据科学。Dataproc 还与多种 Google Cloud 服务完全集成,包括 BigQueryCloud StorageVertex AIDataplex

Dataproc 有三种版本:

  • Dataproc Serverless:可让您运行 PySpark 作业,而无需配置基础架构和自动扩缩。Dataproc Serverless 支持 PySpark 批量工作负载和会话 / 笔记本。
  • Dataproc on Google Compute Engine:可让您为基于 YARN 的 Spark 工作负载以及 Flink 和 Presto 等开源工具管理 Hadoop YARN 集群。您可以根据需要纵向或横向扩缩云端集群,包括自动扩缩。
  • Dataproc on Google Kubernetes Engine:可让您在 GKE 基础架构中配置 Dataproc 虚拟集群,以方便提交 Spark、PySpark、SparkR 或 Spark SQL 作业。

在此 Codelab 中,您将了解使用 Dataproc Serverless 的几种不同方式。

Apache Spark 最初是设计为在 Hadoop 集群上运行,并将 YARN 用作其资源管理器。维护 Hadoop 集群需要具备特定的专业知识,并确保集群上的许多不同旋钮都已正确配置。此外,Spark 还需要用户设置另一组旋钮。这会导致许多开发者花费更多时间来配置基础架构,而不是处理 Spark 代码本身。

Dataproc Serverless 无需手动配置 Hadoop 集群或 Spark。Dataproc Serverless 不是在 Hadoop 上运行,而是通过自己的 动态资源分配 来确定其资源要求,包括自动扩缩。虽然有一小部分 Spark 属性 仍支持通过 Dataproc Serverless 进行自定义,但在大多数情况下,您无需调整这些属性。

2. 设置

首先,您将配置在此 Codelab 中使用的环境和资源。

创建 Google Cloud 项目。您可以使用现有项目。

点击 Cloud 控制台 工具栏中的 Cloud Shell,打开 Cloud Shell。

ba0bb17945a73543.png

Cloud Shell 提供了一个可直接使用的 Shell 环境,您可以在此 Codelab 中使用该环境。

68c4ebd2a8539764.png

Cloud Shell 会默认设置您的项目名称。请运行 echo $GOOGLE_CLOUD_PROJECT 进行仔细检查。如果您在输出中没有看到项目 ID,请设置该 ID。

export GOOGLE_CLOUD_PROJECT=<your-project-id>

为您的资源设置 Compute Engine 区域,例如 us-central1europe-west2

export REGION=<your-region>

启用 API

此 Codelab 使用以下 API:

  • BigQuery
  • Dataproc

启用必要的 API。这大约需要一分钟时间,完成后会显示一条成功消息。

gcloud services enable bigquery.googleapis.com
gcloud services enable dataproc.googleapis.com

配置网络访问权限

Dataproc Serverless 要求在您运行 Spark 作业的区域中启用 Google 专用访问通道,因为 Spark 驱动程序和执行程序只有专用 IP。运行以下命令可在 default 子网中启用该功能。

gcloud compute networks subnets update default \
  --region=${REGION} \
  --enable-private-ip-google-access

您可以通过以下命令验证是否已启用 Google 专用访问通道,该命令会输出 TrueFalse

gcloud compute networks subnets describe default \
  --region=${REGION} \
  --format="get(privateIpGoogleAccess)"

创建存储分区

创建一个存储分区,用于存储在此 Codelab 中创建的资源。

为存储分区选择名称。存储分区名称必须在所有用户中保持 全局唯一

export BUCKET=<your-bucket-name>

在您打算运行 Spark 作业的区域中创建存储分区。

gsutil mb -l ${REGION} gs://${BUCKET}

您可以在 Cloud Storage 控制台中看到您的存储分区。您还可以运行 gsutil ls 来查看您的存储分区。

创建永久性历史记录服务器

Spark 界面提供了一组丰富的调试工具,并可深入了解 Spark 作业。如需查看已完成 Dataproc Serverless 作业的 Spark 界面,您必须创建一个单节点 Dataproc 集群,以用作 永久性历史记录服务器

为您的永久性历史记录服务器设置一个名称。

PHS_CLUSTER_NAME=my-phs

运行以下命令。

gcloud dataproc clusters create ${PHS_CLUSTER_NAME} \
    --region=${REGION} \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://${BUCKET}/phs/*/spark-job-history

此 Codelab 后续部分会更详细地介绍 Spark 界面和永久性历史记录服务器。

3. 使用 Dataproc 批处理运行无服务器 Spark 作业

在此示例中,您将使用 纽约市 (NYC) 花旗单车行程公共数据集 中的一组数据。纽约市花旗单车是纽约市内的一款付费共享单车系统。您将执行一些简单的转换,并输出最热门的十个花旗单车站点 ID。值得注意的是,此示例还使用了开源 spark-bigquery-connector,以便在 Spark 和 BigQuery 之间无缝读取和写入数据。

克隆以下 GitHub 代码库,然后通过 cd 命令进入包含 citibike.py 文件的目录。

git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
cd devrel-demos/data-analytics/next-2022-workshop/dataproc-serverless

citibike.py

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType

if len(sys.argv) == 1:
    print("Please provide a GCS bucket name.")

bucket = sys.argv[1]
table = "bigquery-public-data:new_york_citibike.citibike_trips"

spark = SparkSession.builder \
          .appName("pyspark-example") \
          .config("spark.jars","gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar") \
          .getOrCreate()

df = spark.read.format("bigquery").load(table)

top_ten = df.filter(col("start_station_id") \
            .isNotNull()) \
            .groupBy("start_station_id") \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(10) \
            .cache()

top_ten.show()

top_ten.write.option("header", True).csv(f"gs://{bucket}/citibikes_top_ten_start_station_ids")

使用 Cloud SDK 将作业提交到 Serverless Spark,Cloud Shell 中默认提供 Cloud SDK。在 Shell 中运行以下命令,该命令会使用 Cloud SDK 和 Dataproc Batches API 提交 Serverless Spark 作业。

gcloud dataproc batches submit pyspark citibike.py \
  --batch=citibike-job \
  --region=${REGION} \
  --deps-bucket=gs://${BUCKET} \
  --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar \
--history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER_NAME} \
  -- ${BUCKET}

具体说明如下:

  • gcloud dataproc batches submit 引用 Dataproc Batches API
  • pyspark 表示您要提交 PySpark 作业。
  • --batch 是作业的名称。如果未提供,系统会使用随机生成的 UUID。
  • --region=${REGION} 是作业将在其中处理的地理区域。
  • --deps-bucket=${BUCKET} 是在 Serverless 环境中运行之前,本地 Python 文件上传到的位置。
  • --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar 在 Spark 运行时环境中包含 spark-bigquery-connector 的 jar。
  • --history-server-cluster=projects/${GOOGLE_CLOUD_PROJECT}/regions/${REGION}/clusters/${PHS_CLUSTER} 是永久性历史记录服务器的完全限定名称。Spark 事件数据(与控制台输出分开存放)存储在此处,并且可以通过 Spark 界面查看。
  • 结尾的 -- 表示此项之后的所有内容都将是程序的运行时实参。在本例中,您将提交作业所需的存储分区名称。

提交批量工作负载后,您会看到以下输出内容。

Batch [citibike-job] submitted.

几分钟后,您会看到以下输出内容以及作业的元数据。

+----------------+------+
|start_station_id| count|
+----------------+------+
|             519|551078|
|             497|423334|
|             435|403795|
|             426|384116|
|             293|372255|
|             402|367194|
|             285|344546|
|             490|330378|
|             151|318700|
|             477|311403|
+----------------+------+

Batch [citibike-job] finished.

在下一部分中,您将了解如何找到此作业的日志。

其他功能

借助 Spark Serverless,您可以使用其他选项来运行作业。

  • 您可以创建一个 自定义 Docker 映像,以便作业在该映像上运行。这是添加额外依赖项(包括 Python 和 R 库)的好方法。
  • 您可以将 Dataproc Metastore 实例连接到作业,以访问 Hive 元数据。
  • 为提供更多控制,Dataproc Serverless 还支持对一小部分 Spark 属性进行配置。

4. Dataproc 指标和可观测性

Dataproc 批处理控制台列出了您的所有 Dataproc Serverless 作业。在控制台中,您会看到每个作业的批次 ID、位置、状态创建时间、所用时间类型。点击作业的批次 ID 即可查看有关该作业的更多信息。

在此页面上,您会看到监控 等信息,其中显示了作业在一段时间内使用的批量 Spark 执行器 的数量(表示自动扩缩的程度)。

详情 标签页中,您会看到有关作业的更多元数据,包括随作业提交的任何实参和形参。

您还可以从此页面访问所有日志。运行 Dataproc Serverless 作业时,系统会生成三组不同的日志:

  • 服务级
  • 控制台输出
  • Spark 事件日志记录

服务级 日志包括 Dataproc Serverless 服务生成的日志。其中包括 Dataproc Serverless 请求额外 CPU 以进行自动扩缩等内容。您可以点击查看日志来查看这些日志,系统会打开Cloud Logging

您可以在输出 下查看控制台输出 。这是作业生成的输出,包括 Spark 在开始作业时输出的元数据或作业中包含的任何输出语句。

您可以通过 Spark 界面访问 Spark 事件日志记录 。由于您为 Spark 作业提供了永久性历史记录服务器,因此您可以点击查看 Spark 历史记录服务器来访问 Spark 界面,其中包含之前运行的 Spark 作业的信息。您可以从官方 Spark 文档中详细了解 Spark 界面。

5. Dataproc 模板:BQ -> GCS

Dataproc 模板是开源工具,有助于进一步简化云端数据处理任务。这些模板充当 Dataproc 无服务器的封装容器,并包含许多数据导入和导出任务的模板,包括:

  • BigQuerytoGCSGCStoBigQuery
  • GCStoBigTable
  • GCStoJDBCJDBCtoGCS
  • HivetoBigQuery
  • MongotoGCSGCStoMongo

如需查看完整列表,请参阅 README

在本部分中,您将使用 Dataproc 模板将数据从 BigQuery 导出到 GCS

克隆代码库

克隆代码库并切换到 python 文件夹。

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python

配置环境

现在,您将设置环境变量。Dataproc 模板会使用您的项目 ID 的 GCP_PROJECT 环境变量,因此请将此变量设置为 GOOGLE_CLOUD_PROJECT.

export GCP_PROJECT=${GOOGLE_CLOUD_PROJECT}

您的 区域 应已在之前的环境中设置。如果未设置,请在此处进行设置。

export REGION=<region>

Dataproc 模板会使用 spark-bigquery-conector 来处理 BigQuery 作业,并且需要将 URI 包含在环境变量 JARS 中。设置 JARS 变量。

export JARS="gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.26.0.jar"

配置模板参数

设置供服务使用的暂存存储分区的名称。

export GCS_STAGING_LOCATION=gs://${BUCKET}

接下来,您将设置一些特定于作业的变量。对于输入表,您将再次引用 BigQuery NYC Citibike 数据集。

BIGQUERY_GCS_INPUT_TABLE=bigquery-public-data.new_york_citibike.citibike_trips

您可以选择 csvparquetavrojson。对于此 Codelab,请选择 CSV - 在下一部分中,您将了解如何使用 Dataproc 模板转换文件类型。

BIGQUERY_GCS_OUTPUT_FORMAT=csv

将输出模式设置为 overwrite。您可以选择 overwriteappendignoreerrorifexists.

BIGQUERY_GCS_OUTPUT_MODE=overwrite

将 GCS 输出位置设置为存储分区中的路径。

BIGQUERY_GCS_OUTPUT_LOCATION=gs://${BUCKET}/BQtoGCS

运行模板

运行 BIGQUERYTOGCS 模板,方法是指定以下模板并提供您设置的输入参数。

./bin/start.sh \
-- --template=BIGQUERYTOGCS \
        --bigquery.gcs.input.table=${BIGQUERY_GCS_INPUT_TABLE} \
        --bigquery.gcs.output.format=${BIGQUERY_GCS_OUTPUT_FORMAT} \
        --bigquery.gcs.output.mode=${BIGQUERY_GCS_OUTPUT_MODE} \
        --bigquery.gcs.output.location=${BIGQUERY_GCS_OUTPUT_LOCATION}

输出内容会比较杂乱,但大约一分钟后,您会看到以下内容。

Batch [5766411d6c78444cb5e80f305308d8f8] submitted.
...
Batch [5766411d6c78444cb5e80f305308d8f8] finished.

您可以运行以下命令来验证是否已生成文件。

gsutil ls ${BIGQUERY_GCS_OUTPUT_LOCATION}

Spark 默认会写入多个文件,具体取决于数据量。在本例中,您会看到大约生成了 30 个文件。Spark 输出文件名带有 part- 前缀,后跟五位数字(表示部件号)以及一个哈希字符串。对于大量数据,Spark 通常会写入多个文件。示例文件名为 part-00000-cbf69737-867d-41cc-8a33-6521a725f7a0-c000.csv

6. Dataproc 模板:CSV 到 Parquet

现在,您将使用 Dataproc 模板,通过 GCSTOGCS 将 GCS 中的数据从一种文件类型转换为另一种文件类型。此模板使用 SparkSQL,并提供了一个选项,用于提交 SparkSQL 查询以在转换期间进行处理,从而进行额外的处理。

确认环境变量

确认 GCP_PROJECTREGIONGCS_STAGING_BUCKET 已在上一部分中设置。

echo ${GCP_PROJECT}
echo ${REGION}
echo ${GCS_STAGING_LOCATION}

设置模板参数

现在,您将为 GCStoGCS 设置配置参数。首先,设置输入文件的位置。请注意,这是一个目录,而不是特定文件,因为目录中的所有文件都将进行处理。将其设置为 BIGQUERY_GCS_OUTPUT_LOCATION

GCS_TO_GCS_INPUT_LOCATION=${BIGQUERY_GCS_OUTPUT_LOCATION}

设置输入文件的格式。

GCS_TO_GCS_INPUT_FORMAT=csv

设置所需的输出格式。您可以选择 Parquet、JSON、Avro 或 CSV。

GCS_TO_GCS_OUTPUT_FORMAT=parquet

将输出模式设置为 overwrite。您可以选择 overwriteappendignoreerrorifexists.

GCS_TO_GCS_OUTPUT_MODE=overwrite

设置输出位置。

GCS_TO_GCS_OUTPUT_LOCATION=gs://${BUCKET}/GCStoGCS

运行模板

运行 GCStoGCS 模板。

./bin/start.sh \
-- --template=GCSTOGCS \
        --gcs.to.gcs.input.location=${GCS_TO_GCS_INPUT_LOCATION} \
        --gcs.to.gcs.input.format=${GCS_TO_GCS_INPUT_FORMAT} \
        --gcs.to.gcs.output.format=${GCS_TO_GCS_OUTPUT_FORMAT} \
        --gcs.to.gcs.output.mode=${GCS_TO_GCS_OUTPUT_MODE} \
        --gcs.to.gcs.output.location=${GCS_TO_GCS_OUTPUT_LOCATION}

输出内容会比较杂乱,但大约一分钟后,您应该会看到类似如下的成功消息。

Batch [c198787ba8e94abc87e2a0778c05ec8a] submitted.
...
Batch [c198787ba8e94abc87e2a0778c05ec8a] finished.

您可以运行以下命令来验证是否已生成文件。

gsutil ls ${GCS_TO_GCS_OUTPUT_LOCATION}

借助此模板,您还可以通过将 gcs.to.gcs.temp.view.namegcs.to.gcs.sql.query 传递给模板来提供 SparkSQL 查询,从而在写入 GCS 之前对数据运行 SparkSQL 查询。

7. 清理资源

为避免在此 Codelab 完成后向您的 GCP 帐号收取不必要的费用,请执行以下操作:

  1. 删除您创建的环境的 Cloud Storage 存储分区
gsutil rm -r gs://${BUCKET}
  1. 删除用于永久性历史记录服务器的 Dataproc 集群
gcloud dataproc clusters delete ${PHS_CLUSTER_NAME} \
  --region=${REGION}
  1. 删除 Dataproc Serverless 作业。前往 批处理控制台,点击要删除的每个作业旁边的复选框,然后点击 删除

如果您专门为此 Codelab 创建了一个项目,您还可以选择删除该项目:

  1. 在 GCP 控制台中,转到 项目 页面。
  2. 在项目列表中,选择要删除的项目,然后点击“删除”。
  3. 在框中输入项目 ID,然后点击“关停”以删除项目。

8. 后续步骤

以下资源提供了更多利用 Serverless Spark 的方法: