使用 CSV 从 Snowflake 到 Spanner 的反向 ETL

1. 使用 Google Cloud Storage 和 Dataflow 构建从 Snowflake 到 Spanner 的反向 ETL 流水线

简介

在本实验中,我们将构建一条反向 ETL 流水线。传统上,ETL(提取、转换、加载)流水线会将数据从运营数据库移入 Snowflake 等数据仓库,以进行分析。 反向 ETL 流水线则相反:它会将经过整理和处理的数据从数据仓库移回运营系统,以便为应用提供支持、为面向用户的功能提供服务,或用于实时决策。

我们的目标是将示例数据集从 Snowflake 表移入 Spanner,Spanner 是一种全球分布的关系型数据库,非常适合高可用性应用。

为此,我们将使用 Google Cloud Storage (GCS) 和 Dataflow 作为中间步骤。下面详细介绍了此架构的流程和原因:

  1. Snowflake 到 Google Cloud Storage (GCS)(采用 CSV 格式)
  • 第一步是以开放的通用格式将数据从 Snowflake 中提取出来。导出为 CSV 是一种常见且简单的方法,用于创建可移植的数据文件。我们将在 GCS 中暂存这些文件,GCS 提供可扩缩且持久的对象存储解决方案。
  1. GCS 到 Spanner(通过 Dataflow)
  • 我们不会编写自定义脚本来从 GCS 读取数据并写入 Spanner,而是使用 Google Dataflow,这是一种全托管式数据处理服务。Dataflow 提供专门针对此类任务的预构建模板。使用“GCS Text to Cloud Spanner”模板可以实现高吞吐量、并行化的数据导入,而无需编写任何数据处理代码,从而节省大量开发时间。

学习内容

  • 如何将数据加载到 Snowflake 中
  • 如何创建 GCS 存储分区
  • 如何以 CSV 格式将 Snowflake 表导出到 GCS
  • 如何设置 Spanner 实例
  • 如何使用 Dataflow 将 CSV 表加载到 Spanner

2. 设置、要求和限制

前提条件

  • Snowflake 账号。
  • 已启用 Spanner、Cloud Storage 和 Dataflow API 的 Google Cloud 账号。
  • 通过 Web 浏览器访问 Google Cloud 控制台。
  • 安装了 Google Cloud CLI 的终端。
  • 如果您的 Google Cloud 组织启用了 iam.allowedPolicyMemberDomains 政策,管理员可能需要授予例外权限,以允许来自外部网域的服务账号。我们将在后面的步骤中介绍此内容(如果适用)。

Google Cloud Platform IAM 权限

Google 账号需要拥有以下权限,才能执行此 Codelab 中的所有步骤。

服务账号

iam.serviceAccountKeys.create

允许创建服务账号。

Spanner

spanner.instances.create

允许创建新的 Spanner 实例。

spanner.databases.create

允许运行 DDL 语句来创建

spanner.databases.updateDdl

允许运行 DDL 语句来在数据库中创建表。

Google Cloud Storage

storage.buckets.create

允许创建新的 GCS 存储分区来存储导出的 Parquet 文件。

storage.objects.create

允许将导出的 Parquet 文件写入 GCS 存储分区。

storage.objects.get

允许 BigQuery 从 GCS 存储分区读取 Parquet 文件。

storage.objects.list

允许 BigQuery 列出 GCS 存储分区中的 Parquet 文件。

Dataflow

Dataflow.workitems.lease

允许从 Dataflow 声明工作项。

Dataflow.workitems.sendMessage

允许 Dataflow 工作器将消息发回 Dataflow 服务。

Logging.logEntries.create

允许 Dataflow 工作器将日志条目写入 Google Cloud Logging。

为方便起见,您可以使用包含这些权限的预定义角色。

roles/resourcemanager.projectIamAdmin

roles/iam.serviceAccountKeyAdmin

roles/spanner.instanceAdmin

roles/spanner.databaseAdmin

roles/storage.admin

roles/dataflow.serviceAgent

roles/dataflow.worker

roles/dataflow.serviceAgent

限制

在系统之间移动数据时,请务必注意数据类型差异。

  • Snowflake 到 CSV: 导出时,Snowflake 数据类型会转换为标准文本表示形式。
  • CSV 到 Spanner: 导入时,必须确保目标 Spanner 数据类型与 CSV 文件中的字符串表示形式兼容。本实验将引导您完成一组常见的类型映射。

设置可重复使用的属性

在本实验中,您将需要重复使用一些值。为方便起见,我们将这些值设置为 shell 变量,以供日后使用。

  • GCP_REGION - GCP 资源将位于的特定区域。如需查看区域列表,请点击此处
  • GCP_PROJECT - 要使用的 GCP 项目 ID。
  • GCP_BUCKET_NAME - 要创建的 GCS 存储分区名称,以及数据文件的存储位置。
  • SPANNER_INSTANCE - 要分配给 Spanner 实例的名称
  • SPANNER_DB - 要分配给 Spanner 实例内数据库的名称
export GCP_REGION = <GCP REGION HERE> 
export GCP_PROJECT= <GCP PROJECT HERE>
export GCS_BUCKET_NAME = <GCS BUCKET NAME HERE>
export SPANNER_INSTANCE = <SPANNER INSTANCE ID HERE>
export SPANNER_DB = <SPANNER DATABASE ID HERE>

Google Cloud

本实验需要一个 Google Cloud 项目。

Google Cloud 项目

项目是 Google Cloud 中的基本组织单位。如果管理员已提供一个项目供您使用,则可以跳过此步骤。

您可以使用 CLI 创建项目,如下所示:

gcloud projects create $GCP_PROJECT
gcloud config set project $GCP_PROJECT

如需详细了解如何创建和管理项目,请点击此处

3. 设置 Spanner

如需开始使用 Spanner,您需要预配实例和数据库。如需详细了解如何配置和创建 Spanner 实例,请点击此处

创建实例

gcloud spanner instances create $SPANNER_INSTANCE \
--config=regional-$GCP_REGION \
--description="Codelabs Snowflake RETL" \
--processing-units=100 \
--edition=ENTERPRISE

创建数据库

gcloud spanner databases create $SPANNER_DB \
--instance=$SPANNER_INSTANCE

4. 创建 Google Cloud Storage 存储分区

Google Cloud Storage (GCS) 将用于在将 CSV 数据文件导入 Spanner 之前临时存储这些文件。这些文件由 Snowflake 生成。

创建存储分区

使用以下命令在特定区域(例如 us-central1)中创建存储分区。

gcloud storage buckets create gs://$GCS_BUCKET_NAME --location=$GCP_REGION

验证存储分区创建情况

该命令成功后,列出所有存储分区以检查结果。新存储分区应显示在结果列表中。存储分区引用通常会在存储分区名称前显示 gs:// 前缀。

gcloud storage ls | grep gs://$GCS_BUCKET_NAME

测试写入权限

此步骤可确保本地环境已正确通过身份验证,并且拥有将文件写入新创建的存储分区所需的权限。

echo "Hello, GCS" | gcloud storage cp - gs://$GCS_BUCKET_NAME/hello.txt

验证上传的文件

列出存储分区中的对象。系统应显示刚刚上传的文件的完整路径。

gcloud storage ls gs://$GCS_BUCKET_NAME

您应该会看到以下输出内容:

gs://$GCS_BUCKET_NAME/hello.txt

如需查看存储分区中对象的内容,可以使用 gcloud storage cat

gcloud storage cat gs://$GCS_BUCKET_NAME/hello.txt

文件内容应可见:

Hello, GCS

清理测试文件

Cloud Storage 存储分区现已设置完毕。现在可以删除临时测试文件。

gcloud storage rm gs://$GCS_BUCKET_NAME/hello.txt

输出应确认删除操作:

Removing gs://$GCS_BUCKET_NAME/hello.txt...
/ [1 objects]
Operation completed over 1 objects.

5. 从 Snowflake 导出到 GCS

在本实验中,我们将使用 TPC-H 数据集 ,这是一个决策支持系统的行业标准基准。默认情况下,所有 Snowflake 账号中都提供此数据集。

在 Snowflake 中准备数据

登录 Snowflake 账号并创建新的工作表。

由于权限问题,Snowflake 提供的示例 TPC-H 数据无法直接从其共享位置导出。首先,必须将 ORDERS 表复制到单独的数据库和架构中。

创建数据库

  1. 在左侧菜单的 Horizon Catalog 下,将鼠标悬停在 Catalog 上,然后点击 Database Explorer
  2. 进入 Databases 页面后,点击右上角的 + Database 按钮。
  3. 将新数据库命名为 codelabs_retl_db

创建工作表

如需针对数据库运行 SQL 命令,您需要使用工作表。

如需创建工作表,请执行以下操作:

  1. 在左侧边菜单的 Work with data 下,将鼠标悬停在 Projects 上,然后点击 Workspaces
  2. My Workspaces 边栏中,点击 + Add new 按钮,然后选择 SQL File
USE DATABASE codelabs_retl_db;

CREATE SCHEMA codelabs_retl_export;

CREATE TABLE codelabs_retl_export.regional_sales_csv AS
SELECT 
    n.n_name AS nation_name,
    c.c_mktsegment AS market_segment,
    YEAR(o.o_orderdate) AS order_year,
    o.o_orderpriority AS order_priority,
    COUNT(o.o_orderkey) AS total_order_count,
    ROUND(SUM(o.o_totalprice), 2) AS total_revenue,
    COUNT(DISTINCT c.c_custkey) AS unique_customer_count
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders AS o
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.customer AS c 
    ON o.o_custkey = c.c_custkey
INNER JOIN SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.nation AS n
    ON c.c_nationkey = n.n_nationkey
GROUP BY 
    n.n_name, 
    c.c_mktsegment, 
    YEAR(o.o_orderdate), 
    o.o_orderpriority;

SELECT COUNT(*) FROM regional_sales_csv;

输出应指明已复制 4375 行。

配置 Snowflake 以访问 GCS

如需允许 Snowflake 将数据写入 GCS 存储分区,您需要创建存储集成暂存区

  • 存储集成: 一个 Snowflake 对象,用于存储为外部 Cloud Storage 生成的服务账号和身份验证信息。
  • 暂存区: 一个已命名对象,用于引用特定存储分区和路径,并使用存储集成来处理身份验证。它为数据加载和卸载操作提供了一个方便的命名位置。

首先,创建存储集成。

CREATE OR REPLACE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  -- Grant Snowflake permission to write to a specific path in your bucket.
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<Your bucket name>/sample_orders');

接下来,描述集成以获取 Snowflake 为其创建的服务账号。

DESC STORAGE INTEGRATION gcs_int; 

在结果中,复制 STORAGE_GCP_SERVICE_ACCOUNT 的值。它看起来像一个电子邮件地址。

将此服务账号存储到 shell 实例中的环境变量中,以便日后重复使用

export GCP_SERVICE_ACCOUNT=<Your service account>

向 Snowflake 授予 GCS 权限

现在,必须向 Snowflake 服务账号授予写入 GCS 存储分区的权限。

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.objectAdmin"

gcloud storage buckets add-iam-policy-binding gs://$GCS_BUCKET_NAME \
    --member="serviceAccount:$GCP_SERVICE_ACCOUNT" \
    --role="roles/storage.legacyBucketReader"

创建暂存区并导出数据

现在权限已设置完毕,请返回到 Snowflake 工作表。创建一个使用集成的暂存区,然后使用 COPY INTO 命令将 SAMPLE_ORDERS 表数据导出到该暂存区。

CREATE OR REPLACE STAGE retl_gcs_stage
    URL = 'gcs://<Your bucket name>/regional_sales_csv'
    STORAGE_INTEGRATION = gcs_int
    -- Define the output file format
    FILE_FORMAT = (TYPE = 'CSV');

COPY INTO @retl_gcs_stage/regional_sales_csv
FROM (SELECT * FROM codelabs_retl_export.regional_sales_csv)
FILE_FORMAT = (TYPE = CSV, COMPRESSION = NONE);

在“结果”窗格中,rows_unloaded 应可见,其值为 1500000

验证 GCS 中的数据

检查 GCS 存储分区以查看 Snowflake 创建的文件。这确认导出操作已成功。

gcloud storage ls gs://$GCS_BUCKET_NAME/regional_sales_csv/

一个或多个编号的 CSV 文件应可见。

gs://your-bucket-name/regional_sales_csv/regional_sales_csv_0_0_0.csv
...

6. 使用 Dataflow 将数据加载到 Spanner

现在数据已位于 GCS 中,我们将使用 Dataflow 将数据导入 Spanner。Dataflow 是 Google Cloud 的全托管式服务,用于流处理和批处理数据。我们将使用预构建的 Google 模板,该模板专门用于将文本文件从 GCS 导入到 Spanner。

创建 Spanner 表

首先,在 Spanner 中创建目标表。架构需要与 CSV 文件中的数据兼容。

gcloud spanner databases ddl update $SPANNER_DB \
  --instance=$SPANNER_INSTANCE \
  --ddl="$(cat <<EOF
CREATE TABLE regional_sales (
    nation_name STRING(MAX),
    market_segment STRING(MAX),
    order_year INT64,
    order_priority STRING(MAX),
    total_order_count INT64,
    total_revenue NUMERIC,
    unique_customer_count INT64
) PRIMARY KEY (nation_name, market_segment, order_year, order_priority);
EOF
)"

创建 Dataflow 清单

Dataflow 模板需要一个“清单”文件。这是一个 JSON 文件,用于告知模板在何处查找源数据文件以及要将这些文件加载到哪个 Spanner 表中。

定义新的 regional_sales_manifest.json 并将其上传到 GCS 存储分区:

cat <<EOF | gcloud storage cp - gs://$GCS_BUCKET_NAME/regional_sales_manifest.json 
{ 
  "tables": [
    {
       "table_name": "regional_sales", 
       "file_patterns": [ 
         "gs://$GCS_BUCKET_NAME/regional_sales_csv/*.csv"
       ] 
    } 
  ] 
} 
EOF

启用 Dataflow API

在使用 Dataflow 之前,需要先启用它。请使用以下命令执行此操作:

gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT

创建并运行 Dataflow 作业

导入作业现已准备就绪,可以开始运行。此命令使用 GCS_Text_to_Cloud_Spanner 模板启动 Dataflow 作业。

该命令很长,并且有多个参数。下面详细介绍了这些参数:

–gcs-location

GCS 上预构建模板的路径。

–region

Dataflow 作业将运行的区域。

–parameters

instanceIddatabaseId

目标 Spanner 实例和数据库。

importManifest

刚刚创建的清单文件的 GCS 路径。

gcloud dataflow jobs run spanner-import-from-gcs \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Cloud_Spanner \
  --region=$GCP_REGION \
  --staging-location=gs://$GCS_BUCKET_NAME/staging \
  --parameters \
instanceId=$SPANNER_INSTANCE,\
databaseId=$SPANNER_DB,\
importManifest=gs://$GCS_BUCKET_NAME/regional_sales_manifest.json,escape='\'

您可以使用以下命令检查 Dataflow 作业的状态

gcloud dataflow jobs list \
    --filter="name:spanner-import-from-gcs" \
    --region="$GCP_REGION" \
    --sort-by="~creationTime" \
    --limit=1

该作业大约需要 5 分钟才能完成。

验证 Spanner 中的数据

Dataflow 作业成功后,验证数据是否已加载到 Spanner 中。

首先,检查行数。行数应为 4375

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT COUNT(*) FROM regional_sales;'

接下来,查询几行以检查数据。

gcloud spanner databases execute-sql $SPANNER_DB \
--instance=$SPANNER_INSTANCE \
--sql='SELECT * FROM regional_sales LIMIT 5'

来自 Snowflake 表的导入数据应可见。

7. 清理

清理 Spanner

删除 Spanner 数据库和实例

gcloud spanner instances delete $SPANNER_INSTANCE

清理 GCS

删除为托管数据而创建的 GCS 存储分区

gcloud storage rm --recursive gs://$GCS_BUCKET_NAME

清理 Snowflake

删除数据库

  1. 在左侧菜单的 Horizon Catalog 下,将鼠标悬停在 Catalog 上,然后点击 Database Explorer
  2. 点击 CODELABS_RETL_DB 数据库右侧的 ... 以展开选项,然后选择 Drop
  3. 在弹出的确认对话框中,选择 Drop Database

删除工作簿

  1. 在左侧侧边菜单的 Work with data 下,将鼠标悬停在 Projects 上,然后点击 Workspaces
  2. My Workspace 边栏中,将鼠标悬停在本实验中使用的不同工作区文件上,以显示 ... 其他选项,然后点击该选项
  3. 选择 Delete ,然后在弹出的确认对话框中再次选择 Delete
  4. 针对您为此实验创建的所有 SQL 工作区文件执行此操作。

8. 恭喜

恭喜您完成此 Codelab。

所学内容

  • 如何将数据加载到 Snowflake 中
  • 如何创建 GCS 存储分区
  • 如何以 CSV 格式将 Snowflake 表导出到 GCS
  • 如何设置 Spanner 实例
  • 如何使用 Dataflow 将 CSV 表加载到 Spanner