1. 使用 Google Cloud Storage 和 Dataflow 构建从 Snowflake 到 Spanner 的反向 ETL 流水线
简介
在本实验中,我们将构建一条反向 ETL 流水线。传统上,ETL(提取、转换、加载)流水线会将数据从运营数据库移入 Snowflake 等数据仓库,以进行分析。 反向 ETL 流水线则相反:它会将经过整理和处理的数据从数据仓库移回运营系统,以便为应用提供支持、为面向用户的功能提供服务,或用于实时决策。
我们的目标是将示例数据集从 Snowflake 表移入 Spanner,Spanner 是一种全球分布的关系型数据库,非常适合高可用性应用。
为此,我们将使用 Google Cloud Storage (GCS) 和 Dataflow 作为中间步骤。下面详细介绍了此架构的流程和原因:
- Snowflake 到 Google Cloud Storage (GCS)(采用 CSV 格式):
- 第一步是以开放的通用格式将数据从 Snowflake 中提取出来。导出为 CSV 是一种常见且简单的方法,用于创建可移植的数据文件。我们将在 GCS 中暂存这些文件,GCS 提供可扩缩且持久的对象存储解决方案。
- GCS 到 Spanner(通过 Dataflow):
- 我们不会编写自定义脚本来从 GCS 读取数据并写入 Spanner,而是使用 Google Dataflow,这是一种全托管式数据处理服务。Dataflow 提供专门针对此类任务的预构建模板。使用“GCS Text to Cloud Spanner”模板可以实现高吞吐量、并行化的数据导入,而无需编写任何数据处理代码,从而节省大量开发时间。
学习内容
- 如何将数据加载到 Snowflake 中
- 如何创建 GCS 存储分区
- 如何以 CSV 格式将 Snowflake 表导出到 GCS
- 如何设置 Spanner 实例
- 如何使用 Dataflow 将 CSV 表加载到 Spanner
2. 设置、要求和限制
前提条件
- Snowflake 账号。
- 已启用 Spanner、Cloud Storage 和 Dataflow API 的 Google Cloud 账号。
- 通过 Web 浏览器访问 Google Cloud 控制台。
- 安装了 Google Cloud CLI 的终端。
- 如果您的 Google Cloud 组织启用了
iam.allowedPolicyMemberDomains政策,管理员可能需要授予例外权限,以允许来自外部网域的服务账号。我们将在后面的步骤中介绍此内容(如果适用)。
Google Cloud Platform IAM 权限
Google 账号需要拥有以下权限,才能执行此 Codelab 中的所有步骤。
服务账号 | ||
| 允许创建服务账号。 | |
Spanner | ||
| 允许创建新的 Spanner 实例。 | |
| 允许运行 DDL 语句来创建 | |
| 允许运行 DDL 语句来在数据库中创建表。 | |
Google Cloud Storage | ||
| 允许创建新的 GCS 存储分区来存储导出的 Parquet 文件。 | |
| 允许将导出的 Parquet 文件写入 GCS 存储分区。 | |
| 允许 BigQuery 从 GCS 存储分区读取 Parquet 文件。 | |
| 允许 BigQuery 列出 GCS 存储分区中的 Parquet 文件。 | |
Dataflow | ||
| 允许从 Dataflow 声明工作项。 | |
| 允许 Dataflow 工作器将消息发回 Dataflow 服务。 | |
| 允许 Dataflow 工作器将日志条目写入 Google Cloud Logging。 | |
为方便起见,您可以使用包含这些权限的预定义角色。
|
|
|
|
|
|
|
|
限制
在系统之间移动数据时,请务必注意数据类型差异。
- Snowflake 到 CSV: 导出时,Snowflake 数据类型会转换为标准文本表示形式。
- CSV 到 Spanner: 导入时,必须确保目标 Spanner 数据类型与 CSV 文件中的字符串表示形式兼容。本实验将引导您完成一组常见的类型映射。
设置可重复使用的属性
在本实验中,您将需要重复使用一些值。为方便起见,我们将这些值设置为 shell 变量,以供日后使用。
- GCP_REGION - GCP 资源将位于的特定区域。如需查看区域列表,请点击此处。
- GCP_PROJECT - 要使用的 GCP 项目 ID。
- GCP_BUCKET_NAME - 要创建的 GCS 存储分区名称,以及数据文件的存储位置。
- SPANNER_INSTANCE - 要分配给 Spanner 实例的名称
- SPANNER_DB - 要分配给 Spanner 实例内数据库的名称
export GCP_REGION = <GCP REGION HERE>
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>
Google Cloud
本实验需要一个 Google Cloud 项目。
Google Cloud 项目
项目是 Google Cloud 中的基本组织单位。如果管理员已提供一个项目供您使用,则可以跳过此步骤。
您可以使用 CLI 创建项目,如下所示:
gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT
如需详细了解如何创建和管理项目,请点击此处。
3. 设置 Spanner
如需开始使用 Spanner,您需要预配实例和数据库。如需详细了解如何配置和创建 Spanner 实例,请点击此处。
创建实例
gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE
创建数据库
gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE
4. 创建 Google Cloud Storage 存储分区
Google Cloud Storage (GCS) 将用于在将 CSV 数据文件导入 Spanner 之前临时存储这些文件。这些文件由 Snowflake 生成。
创建存储分区
使用以下命令在特定区域(例如 us-central1)中创建存储分区。
gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION
验证存储分区创建情况
该命令成功后,列出所有存储分区以检查结果。新存储分区应显示在结果列表中。存储分区引用通常会在存储分区名称前显示 gs:// 前缀。
gcloud storage ls | grep gs://$GCS_BUCKET_NAME
测试写入权限
此步骤可确保本地环境已正确通过身份验证,并且拥有将文件写入新创建的存储分区所需的权限。
echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt
验证上传的文件
列出存储分区中的对象。系统应显示刚刚上传的文件的完整路径。
gcloud storage ls gs://$GCS_BUCKET_NAME
您应该会看到以下输出内容:
gs://$GCS_BUCKET_NAME/hello.txt
如需查看存储分区中对象的内容,可以使用 gcloud storage cat。
gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt
文件内容应可见:
Hello, GCS
清理测试文件
Cloud Storage 存储分区现已设置完毕。现在可以删除临时测试文件。
gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt
输出应确认删除操作:
Removing gs://$GCS_BUCKET_NAME/hello.txt... / [1 objects] Operation completed over 1 objects.
5. 从 Snowflake 导出到 GCS
在本实验中,我们将使用 TPC-H 数据集 ,这是一个决策支持系统的行业标准基准。默认情况下,所有 Snowflake 账号中都提供此数据集。
在 Snowflake 中准备数据
登录 Snowflake 账号并创建新的工作表。
由于权限问题,Snowflake 提供的示例 TPC-H 数据无法直接从其共享位置导出。首先,必须将 ORDERS 表复制到单独的数据库和架构中。
创建数据库
- 在左侧菜单的 Horizon Catalog 下,将鼠标悬停在 Catalog 上,然后点击 Database Explorer
- 进入 Databases 页面后,点击右上角的 + Database 按钮。
- 将新数据库命名为
codelabs_retl_db
创建工作表
如需针对数据库运行 SQL 命令,您需要使用工作表。
如需创建工作表,请执行以下操作:
- 在左侧边菜单的 Work with data 下,将鼠标悬停在 Projects 上,然后点击 Workspaces
- 在 My Workspaces 边栏中,点击 + Add new 按钮,然后选择 SQL File
USE DATABASE codelabs_retl_db;
CREATE SCHEMA codelabs_retl_export;
CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT
n.n_name AS nation_name,
c.c_mktsegment AS market_segment,
YEAR(o.o_orderdate) AS order_year,
o.o_orderpriority AS order_priority,
COUNT(o.o_orderkey) AS total_order_count,
ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c
ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
ON c.c_nationkey = n.n_nationkey
GROUP BY
n.n_name,
c.c_mktsegment,
YEAR(o.o_orderdate),
o.o_orderpriority;
SELECT COUNT(*) FROM regional_sales_csv;
输出应指明已复制 4375 行。
配置 Snowflake 以访问 GCS
如需允许 Snowflake 将数据写入 GCS 存储分区,您需要创建存储集成 和暂存区 。
- 存储集成: 一个 Snowflake 对象,用于存储为外部 Cloud Storage 生成的服务账号和身份验证信息。
- 暂存区: 一个已命名对象,用于引用特定存储分区和路径,并使用存储集成来处理身份验证。它为数据加载和卸载操作提供了一个方便的命名位置。
首先,创建存储集成。
CREATE OR REPLACE STORAGE INTEGRATION gcs_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
-- Grant Snowflake permission to write to a specific path in your bucket.
STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');
接下来,描述集成以获取 Snowflake 为其创建的服务账号。
DESC STORAGE INTEGRATION gcs_int;
在结果中,复制 STORAGE_GCP_SERVICE_ACCOUNT 的值。它看起来像一个电子邮件地址。
将此服务账号存储到 shell 实例中的环境变量中,以便日后重复使用
export GCP_SERVICE_ACCOUNT=<Your service account>
向 Snowflake 授予 GCS 权限
现在,必须向 Snowflake 服务账号授予写入 GCS 存储分区的权限。
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.objectAdmin"
gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
--member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
--role="roles/storage.legacyBucketReader"
创建暂存区并导出数据
现在权限已设置完毕,请返回到 Snowflake 工作表。创建一个使用集成的暂存区,然后使用 COPY INTO 命令将 SAMPLE_ORDERS 表数据导出到该暂存区。
CREATE OR REPLACE STAGE retl_gcs_stage
URL = 'gcs://<Your bucket name>/regional_sales_csv'
STORAGE_INTEGRATION = gcs_int
-- Define the output file format
FILE_FORMAT = (TYPE = 'CSV');
COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);
在“结果”窗格中,rows_unloaded 应可见,其值为 1500000。
验证 GCS 中的数据
检查 GCS 存储分区以查看 Snowflake 创建的文件。这确认导出操作已成功。
gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/
一个或多个编号的 CSV 文件应可见。
gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv ...
6. 使用 Dataflow 将数据加载到 Spanner
现在数据已位于 GCS 中,我们将使用 Dataflow 将数据导入 Spanner。Dataflow 是 Google Cloud 的全托管式服务,用于流处理和批处理数据。我们将使用预构建的 Google 模板,该模板专门用于将文本文件从 GCS 导入到 Spanner。
创建 Spanner 表
首先,在 Spanner 中创建目标表。架构需要与 CSV 文件中的数据兼容。
gcloud spanner databases ddl update $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--ddl="$(cat <<EOF
CREATE TABLE regional_sales (
nation_name STRING(MAX),
market_segment STRING(MAX),
order_year INT64,
order_priority STRING(MAX),
total_order_count INT64,
total_revenue NUMERIC,
unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"
创建 Dataflow 清单
Dataflow 模板需要一个“清单”文件。这是一个 JSON 文件,用于告知模板在何处查找源数据文件以及要将这些文件加载到哪个 Spanner 表中。
定义新的 regional_sales_manifest.json 并将其上传到 GCS 存储分区:
cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json
{
"tables": [
{
"table_name": "regional_sales",
"file_patterns": [
"gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
]
}
]
}
EOF
启用 Dataflow API
在使用 Dataflow 之前,需要先启用它。请使用以下命令执行此操作:
gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT
创建并运行 Dataflow 作业
导入作业现已准备就绪,可以开始运行。此命令使用 GCS_Text_to_Cloud_Spanner 模板启动
Dataflow 作业。
该命令很长,并且有多个参数。下面详细介绍了这些参数:
| GCS 上预构建模板的路径。 | |
| Dataflow 作业将运行的区域。 | |
| ||
| 目标 Spanner 实例和数据库。 | |
| 刚刚创建的清单文件的 GCS 路径。 | |
gcloud dataflow jobs run spanner-import-from-gcs \
--gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
--region=$GCP_REGION \
--staging-location=gs://$GCS_BUCKET_NAME/staging \
--parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'
您可以使用以下命令检查 Dataflow 作业的状态
gcloud dataflow jobs list \
--filter="name:spanner-import-from-gcs" \
--region="$GCP_REGION" \
--sort-by="~creationTime" \
--limit=1
该作业大约需要 5 分钟才能完成。
验证 Spanner 中的数据
Dataflow 作业成功后,验证数据是否已加载到 Spanner 中。
首先,检查行数。行数应为 4375
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'
接下来,查询几行以检查数据。
gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'
来自 Snowflake 表的导入数据应可见。
7. 清理
清理 Spanner
删除 Spanner 数据库和实例
gcloud spanner instances delete $SPANNER_INSTANCE
清理 GCS
删除为托管数据而创建的 GCS 存储分区
gcloud storage rm --recursive gs://$GCS_BUCKET_NAME
清理 Snowflake
删除数据库
- 在左侧菜单的 Horizon Catalog 下,将鼠标悬停在 Catalog 上,然后点击 Database Explorer
- 点击
CODELABS_RETL_DB数据库右侧的 ... 以展开选项,然后选择 Drop - 在弹出的确认对话框中,选择 Drop Database
删除工作簿
- 在左侧侧边菜单的 Work with data 下,将鼠标悬停在 Projects 上,然后点击 Workspaces
- 在 My Workspace 边栏中,将鼠标悬停在本实验中使用的不同工作区文件上,以显示 ... 其他选项,然后点击该选项
- 选择 Delete ,然后在弹出的确认对话框中再次选择 Delete 。
- 针对您为此实验创建的所有 SQL 工作区文件执行此操作。
8. 恭喜
恭喜您完成此 Codelab。
所学内容
- 如何将数据加载到 Snowflake 中
- 如何创建 GCS 存储分区
- 如何以 CSV 格式将 Snowflake 表导出到 GCS
- 如何设置 Spanner 实例
- 如何使用 Dataflow 将 CSV 表加载到 Spanner