使用 CSV 从 Databricks 到 Spanner 的反向 ETL

1. 使用 GCS 和 Dataflow 构建从 Databricks 到 Spanner 的反向 ETL 流水线

简介

在此 Codelab 中,您将使用存储在 Google Cloud Storage 中的 CSV 文件,构建从 Databricks 到 Spanner 的反向 ETL 流水线。传统上,ETL(提取、转换、加载)流水线会将数据从运营数据库移到 Databricks 等数据仓库中进行分析。 反向 ETL 流水线则相反:它会将经过整理和处理的数据从数据仓库移回运营系统,以便为应用提供支持、为面向用户的功能提供服务,或用于实时决策。

目标是将示例数据集从 Databricks 表移到 Spanner(一种全球分布的关系型数据库,非常适合高可用性应用)。

为此,我们将使用 Google Cloud Storage (GCS) 和 Dataflow 作为中间步骤。以下是数据传输的细分以及此架构背后的原因:

  1. Databricks 到 Google Cloud Storage (GCS)(采用 CSV 格式)
  • 第一步是以开放的通用格式将数据从 Databricks 中提取出来。导出为 CSV 是一种常见且简单的方法,用于创建可移植的数据文件。这些文件将暂存在 GCS 中,GCS 提供可扩缩且持久的对象存储解决方案。
  1. GCS 到 Spanner(通过 Dataflow)
  • 我们没有编写自定义脚本来从 GCS 读取数据并写入 Spanner,而是使用了 Google Dataflow(一种全托管式数据处理服务)。Dataflow 提供了专门针对此类任务的预构建模板。使用“GCS Text to Cloud Spanner”模板可以实现高吞吐量、并行化的数据导入,而无需编写任何数据处理代码,从而节省大量开发时间。

学习内容

  • 如何将数据加载到 Databricks 中
  • 如何创建 GCS 存储分区
  • 如何以 CSV 格式将 Databricks 表导出到 GCS
  • 如何设置 Spanner 实例
  • 如何使用 Dataflow 将 CSV 表加载到 Spanner

2. 设置、要求和限制

前提条件

  • 一个 Databricks 账号,具有创建集群和安装库的权限。免费试用账号不适用于此实验。
  • 一个 Google Cloud 账号,启用了 Spanner、Cloud Storage 和 Dataflow API。
  • 通过 Web 浏览器访问 Google Cloud 控制台。
  • 一个安装了 Google Cloud CLI 的终端。
  • 如果您的 Google Cloud 组织启用了 iam.allowedPolicyMemberDomains 政策,管理员可能需要授予例外权限,以允许来自外部网域的服务账号。如果适用,我们将在后续步骤中介绍这一点。

Google Cloud Platform IAM 权限

Google 账号需要拥有以下权限,才能执行此 Codelab 中的所有步骤。

服务账号

iam.serviceAccountKeys.create

允许创建服务账号。

Spanner

spanner.instances.create

允许创建新的 Spanner 实例。

spanner.databases.create

允许运行 DDL 语句来创建

spanner.databases.updateDdl

允许运行 DDL 语句来在数据库中创建表。

Google Cloud Storage

storage.buckets.create

允许创建新的 GCS 存储分区来存储导出的 Parquet 文件。

storage.objects.create

允许将导出的 Parquet 文件写入 GCS 存储分区。

storage.objects.get

允许 BigQuery 从 GCS 存储分区读取 Parquet 文件。

storage.objects.list

允许 BigQuery 列出 GCS 存储分区中的 Parquet 文件。

Dataflow

Dataflow.workitems.lease

允许从 Dataflow 声明工作项。

Dataflow.workitems.sendMessage

允许 Dataflow 工作器将消息发回 Dataflow 服务。

Logging.logEntries.create

允许 Dataflow 工作器将日志条目写入 Google Cloud Logging。

为方便起见,您可以使用包含这些权限的预定义角色。

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

限制

在系统之间移动数据时,请务必注意数据类型差异。

  • Databricks 到 CSV: 导出时,Databricks 数据类型会转换为标准文本表示形式。
  • CSV 到 Spanner: 导入时,必须确保目标 Spanner 数据类型与 CSV 文件中的字符串表示形式兼容。此实验将引导您完成一组常见的类型映射。

设置可重复使用的属性

在此实验中,您将需要重复使用一些值。为方便起见,我们将这些值设置为 shell 变量,以供日后使用。

  • GCP_REGION - GCP 资源所在的特定区域。如需查看区域列表,请点击此处
  • GCP_PROJECT - 要使用的 GCP 项目 ID。
  • GCP_BUCKET_NAME - 要创建的 GCS 存储分区名称,以及数据文件的存储位置。
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Databricks

对于此实验,我们将使用托管在 GCP 上的 Databricks 账号,以便在 GCS 中定义外部数据位置。

Google Cloud

此实验需要一个 Google Cloud 项目。

Google Cloud 项目

项目是 Google Cloud 中的基本组织单位。如果管理员已提供一个项目供您使用,则可以跳过此步骤。

您可以使用 CLI 创建项目,如下所示:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

如需详细了解如何创建和管理项目,请点击此处

设置 Spanner

如需开始使用 Spanner,您需要预配实例和数据库。如需了解有关配置和创建 Spanner 实例的详细信息,请点击此处

创建实例

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

创建数据库

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

3. 创建 Google Cloud Storage 存储分区

Google Cloud Storage (GCS) 将用于在将 CSV 数据文件导入 Spanner 之前暂时存储这些文件。

创建存储分区

使用以下命令在特定区域中创建存储分区。

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

验证存储分区创建

该命令成功后,列出所有存储分区以检查结果。新存储分区应显示在结果列表中。存储分区引用通常会在存储分区名称前显示 gs:// 前缀。

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

测试写入权限

此步骤可确保本地环境已正确通过身份验证,并且拥有将文件写入新创建的存储分区所需的权限。

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

验证上传的文件

列出存储分区中的对象。刚刚上传的文件的完整路径应显示出来。

gcloud storage ls gs://$GCS_BUCKET_NAME

您应该会看到以下输出内容:

gs://$GCS_BUCKET_NAME/hello.txt

如需查看存储分区中对象的内容,可以使用 gcloud storage cat

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

文件内容应可见:

Hello, GCS

清理测试文件

Cloud Storage 存储分区现已设置完毕。现在可以删除临时测试文件。

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

输出应确认删除:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

4. 从 Databricks 导出到 GCS

现在,我们将配置 Databricks 环境以安全地连接到 GCS 并导出数据。

创建凭据

  1. 在左侧菜单中,点击目录
  2. 如果目录页面顶部提供了外部数据 ,请点击该选项。否则,请点击连接 下拉菜单,然后点击凭据
  3. 如果您尚未进入凭据 标签页,请切换到该标签页。
  4. 点击创建凭据
  5. 凭据类型 选择 GCP Service Account
  6. 凭据名称 输入 codelabs-retl-credentials
  7. 点击创建
  8. 从对话框中复制服务账号电子邮件地址,然后点击完成

将此服务账号设置为 shell 实例中的环境变量,以便重复使用:

export GCP_SERVICE_ACCOUNT=<Your service account>

向 Databricks 授予 GCS 权限

现在,必须向 Snowflake 服务账号授予写入 GCS 存储分区的权限。

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

创建外部位置

  1. 使用页面顶部的面包屑导航返回凭据 页面
  2. 切换到外部位置 标签页
  3. 点击创建外部位置
  4. 外部位置名称 设置为 codelabs-retl-gcs
  5. 存储类型 保留为 GCP
  6. 将存储分区路径设置为 网址
  7. 存储凭据 设置为 codelabs-retl-credentials
  8. 点击创建
  9. 在确认对话框中。点击创建

创建目录和架构

  1. 在左侧菜单中,点击目录
  2. 依次点击创建创建目录
  3. 目录名称 设置为 retl_tpch_project
  4. 类型 设置为 Standard
  5. 选择 codelabs-retl-gcs 作为外部位置
  6. 点击创建
  7. 目录 列表中点击 retl_tpch_project
  8. 点击创建架构
  9. 架构名称 设置为 tpch_data
  10. 选择存储位置codelabs-retl-gcs
  11. 点击创建

以 CSV 格式导出数据

现在,数据已准备好导出。我们将使用示例 TPC-H 数据集来定义新表,该表将以 CSV 格式外部存储。

首先,将示例数据复制到工作区中的新表中。为此,需要从查询运行 SQL 代码。

  1. 在左侧边菜单中的 SQL 下,点击 查询
  2. 点击创建查询 按钮
  3. 运行 按钮旁边,将工作区 设置为 retl_tpch_project
CREATE TABLE retl_tpch_project.tpch_data.regional_sales_csv
USING CSV
LOCATION 'gs://<Your bucket name>/regional_sales_csv'
OPTIONS (
  header "false",
  delimiter ","
)
AS
SELECT
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM samples.tpch.orders AS o
INNER JOIN samples.tpch.customer AS c
    ON o.o_custkey = c.c_custkey
INNER JOIN samples.tpch.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 1, 2, 3, 4;

验证 GCS 中的数据

检查 GCS 存储分区,查看 Databricks 创建的文件。

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

您应该会看到一个或多个 .csv 文件,以及 _SUCCESS 和日志文件。

5. 使用 Dataflow 将数据加载到 Spanner

我们将使用 Google 提供的 Dataflow 模板将 CSV 数据从 GCS 导入到 Spanner。

创建 Spanner 表

首先,在 Spanner 中创建目标表。架构需要与 CSV 文件中的数据兼容。

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

创建 Dataflow 清单

Dataflow 模板需要一个“清单”文件。这是一个 JSON 文件,用于告知模板在哪里查找源数据文件以及将这些文件加载到哪个 Spanner 表中。

定义新的 regional_sales_manifest.json 并将其上传到 GCS 存储分区:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

启用 Dataflow API

在使用 Dataflow 之前,需要先启用它。请使用以下命令执行此操作:

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

创建并运行 Dataflow 作业

导入作业现已准备好运行。此命令使用 GCS_Text_to_Cloud_Spanner 模板启动 Dataflow 作业。

该命令很长,并且有多个参数。以下是细分维度:

  • --gcs-location:GCS 上预构建模板的路径。
  • --region:Dataflow 作业将在其中运行的区域。
  • --parameters:特定于模板的键值对列表:
  • instanceIddatabaseId:目标 Spanner 实例和数据库。
  • importManifest:刚刚创建的清单文件的 GCS 路径。
gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

您可以使用以下命令检查 Dataflow 作业的状态:

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

该作业大约需要 5 分钟才能完成。

验证 Spanner 中的数据

Dataflow 作业成功后,验证数据是否已加载到 Spanner 中。

首先,检查行数,应为 4375

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

接下来,查询几行以检查数据。

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

来自 Databricks 表的导入数据应可见。

6. 清理

清理 Spanner

删除 Spanner 数据库和实例

gcloud spanner instances delete $SPANNER_INSTANCE

清理 GCS

删除为托管数据而创建的 GCS 存储分区

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

清理 Databricks

删除目录/架构/表

  1. 登录 Databricks 实例
  2. 点击左侧菜单中的 20bae9c2c9097306.png
  3. 从目录列表中选择之前创建的 retl_tpch_project

fc566eb3fddd7477.png

  1. 在架构列表中,选择已创建的 tpch_data
  2. 从表列表中选择之前创建的 regional_sales_csv
  3. 点击 df6dbe6356f141c6.png 展开表选项,然后选择 删除
  4. 在确认对话框中点击删除 以删除表
  5. 删除表后,您将返回到架构页面
  6. 点击 df6dbe6356f141c6.png 展开架构选项,然后选择 删除
  7. 在确认对话框中点击删除 以删除架构
  8. 删除架构后,您将返回到目录页面
  9. 再次按照第 4-11 步删除 default 架构(如果存在)。
  10. 在目录页面中,点击 df6dbe6356f141c6.png 展开目录选项,然后选择 删除
  11. 在确认对话框中点击删除 以删除目录

删除外部数据位置 / 凭据

  1. 在“目录”屏幕中,点击 32d5a94ae444cd8e.png
  2. 如果您没有看到 External Data 选项,则可能会在 Connect 下拉菜单中找到 External Location
  3. 点击之前创建的 retl-gcs-location 外部数据位置
  4. 在外部位置页面中,点击 df6dbe6356f141c6.png 展开位置选项,然后选择 Delete
  5. 在确认对话框中点击删除 以删除外部位置
  6. 点击 e03562324c0ba85e.png
  7. 点击之前创建的 retl-gcs-credential
  8. 在凭据页面中,点击 df6dbe6356f141c6.png 展开凭据选项,然后选择 Delete
  9. 在确认对话框中点击删除 以删除凭据。

7. 恭喜

恭喜您完成此 Codelab。

所学内容

  • 如何将数据加载到 Databricks 中
  • 如何创建 GCS 存储分区
  • 如何以 CSV 格式将 Databricks 表导出到 GCS
  • 如何设置 Spanner 实例
  • 如何使用 Dataflow 将 CSV 表加载到 Spanner