端到端迁移:从分片的本地 MySQL 迁移到 Cloud Spanner (GoogleSQL)

1. 准备工作

本 Codelab 将指导您将分片的本地 MySQL 数据库迁移到采用 GoogleSQL 方言的 Cloud Spanner 数据库。您将使用 Google Cloud 服务,包括 Spanner 迁移工具 (SMT)、Dataflow、Datastream、PubSub 和 Google Cloud Storage。

您将了解的内容:

  • 什么是分片环境以及如何设置。
  • 如何使用 Spanner 迁移工具 (SMT) 网页界面将 MySQL 架构转换为与 Spanner 兼容的架构,并执行高级架构修改。
  • 如何使用 Dataflow 将分片的 MySQL 实例中的数据批量迁移到 Cloud Spanner。
  • 如何使用 Datastream 和 Dataflow 将分片的 MySQL 实例中的数据持续复制 (CDC) 到 Cloud Spanner。
  • 如何配置从 Spanner 到分片 MySQL 实例的反向复制。
  • 如何使用自定义转换在批量迁移、实时迁移和反向迁移期间填充额外的列。
  • 如何使用主键配置分片转换。

本 Codelab 不涵盖以下内容:

  • 高级自定义网络。
  • 从头开始构建自定义 Dataflow 模板。
  • 迁移性能调优。
  • 应用迁移:此 Codelab 侧重于数据库层(架构和数据)。它不涵盖重新部署或迁移应用服务的运营流程。

所需条件

  • 启用了结算功能的 Google Cloud 项目。
  • 具有足够的 IAM 权限,可用于启用 API 以及创建/管理 Spanner、Dataflow、Datastream 和 GCS 资源。虽然项目 Owner 角色对于 Codelab 来说最简单,但“环境设置”中将介绍更具体的角色。
  • 我们将在设置阶段预配一个小型 Compute Engine 虚拟机,以模拟本地服务器。确保您的项目配额允许创建虚拟机。
  • 网络浏览器,例如 Google Chrome。
  • 基本熟悉 Google Cloud 控制台和 gcloud 等命令行工具。
  • 对 shell 环境的访问权限。建议使用 Cloud Shell,因为它包含 gcloud

如需详细了解上述设置,请参阅环境设置部分。

2. 了解迁移过程

迁移分片数据库涉及将多个物理和逻辑 MySQL 实例整合到单个可横向扩缩的 Spanner 数据库中。本部分概述了迁移中使用的架构和关键工具。

迁移流程架构

迁移过程包括以下阶段:

1. 架构转换:

  • 用途:将源数据库架构转换为兼容的 Cloud Spanner 架构。
  • 工具:Spanner 迁移工具 (SMT)
  • 流程:SMT 会分析源数据库架构,并生成等效的 Spanner 数据定义语言 (DDL)。在目标 Spanner 实例中,系统会创建一个数据库,然后自动应用 DDL。

2. 批量数据迁移:

  • 用途:用于将现有数据从源数据库初始完全加载到已预配的 Spanner 表中。
  • 工具:Dataflow,使用 Google 提供的 Sourcedb to Spanner 模板。
  • 流程:此 Dataflow 作业会从指定的源表中读取所有数据,并将其写入相应的 Spanner 表中。此步骤在创建 Spanner 架构后完成。

3. 实时迁移 (CDC):

  • 用途:近乎实时地捕获源数据库中的持续更改并将其应用到 Cloud Spanner,从而最大限度地缩短迁移期间的停机时间。
  • 工具
  • Datastream:从源数据库捕获更改(插入、更新、删除),并将其写入 Cloud Storage (GCS)。
  • Dataflow:使用 Datastream to Spanner 模板从 GCS 读取更改事件,并将其应用到 Cloud Spanner。

4. 反向复制:

  • 用途:将数据更改从 Cloud Spanner 复制回源数据库。这对于回退策略、分阶段迁移或在源中维护副本以用于特定应用场景非常有用。
  • 工具:Dataflow,使用 Spanner to SourceDb 模板。
  • 处理:此作业利用 Spanner 变更数据流来捕获 Spanner 中的修改,并将其写回源数据库实例。

下图展示了组件和数据传输:

b9e12d4151bf3bb7.png

关键术语:

  • 物理分片:托管数据库的实际底层服务器或计算实例(在本例中为模拟的本地 GCE 虚拟机)。
  • 逻辑分片:物理服务器中的各个数据库架构。
  • Compute Engine (GCE) 虚拟机:托管在 Google Cloud 基础架构上的虚拟机。在此 Codelab 中,我们将使用 GCE 虚拟机来模拟托管源 MySQL 数据库的独立“本地”裸机服务器。
  • Spanner 迁移工具 (SMT):一种用于评估 MySQL 架构、建议 Spanner 架构等效项并生成 Spanner 数据定义语言 (DDL) 的工具。
  • 数据定义语言 (DDL):用于定义和修改数据库结构的语句,例如 CREATE TABLE 语句。SMT 会根据 Cloud SQL 架构生成 Spanner DDL。
  • Dataflow:一种全代管式无服务器数据处理服务。在此 Codelab 中,它用于运行 Google 提供的模板,以进行批量数据转移、应用 Datastream 更改和反向复制。
  • Datastream:一种无服务器的变更数据捕获 (CDC) 和复制服务。在本 Codelab 中,它用于将本地托管的 MySQL 实例中的更改流式传输到 Cloud Storage。
  • Spanner 变更数据流:一种 Spanner 功能,可用于实时流式传输数据更改(插入、更新、删除),用作反向复制的来源。
  • Pub/Sub:一种消息传递服务,用于将生成事件的服务与处理事件的服务分离开。在此 Codelab 中,每当 Datastream 将新的更改文件上传到 Cloud Storage 时,它都会触发 Dataflow 来处理更新。

3. 环境设置

在开始迁移之前,您需要设置 Google Cloud 项目并启用必要的服务。

1. 选择或创建 Google Cloud 项目

您需要一个启用了结算功能的 Google Cloud 项目才能使用本 Codelab 中的服务。

  1. 在 Google Cloud 控制台中,前往项目选择器页面:前往项目选择器
  2. 选择或创建 Google Cloud 项目。
  3. 确保您的项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

2. 打开 Cloud Shell

Cloud Shell 是在 Google Cloud 中运行的命令行环境,预加载了 gcloud CLI 和您需要的其他工具。

  • 点击 Google Cloud 控制台右上角的激活 Cloud Shell 按钮。
  • 一个 Cloud Shell 会话随即会在控制台底部的新框架内打开,并显示命令行提示符。

22d57633bc12106d.png

3. 设置项目变量和环境变量

在 Cloud Shell 中,为您的项目 ID 和要使用的区域设置一些环境变量。

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1" # Or your preferred region
export ZONE="us-central1-a" # Or a zone within your selected region

gcloud config set project $PROJECT_ID
gcloud config set compute/region $REGION
gcloud config set compute/zone $ZONE

echo "Project ID: $PROJECT_ID"
echo "Region: $REGION"
echo "Zone: $ZONE"

4. 启用必需的 Google Cloud API

启用 Cloud Spanner、Dataflow、Datastream 和其他相关服务所需的 API。

gcloud services enable \
  spanner.googleapis.com \
  dataflow.googleapis.com \
  datastream.googleapis.com \
  pubsub.googleapis.com \
  storage.googleapis.com \
  compute.googleapis.com \
  sqladmin.googleapis.com \
  servicenetworking.googleapis.com \
  cloudresourcemanager.googleapis.com

此命令可能需要几分钟时间才能完成。

4. 设置源 MySQL 数据库

在本部分中,我们将通过预配两个 Compute Engine 虚拟机(我们的 2 个“物理分片”)来模拟本地分片 MySQL 架构。然后,我们将在两个虚拟机上安装 MySQL,并在每个虚拟机上创建两个数据库(即“逻辑分片”)。

1. 创建 Compute Engine 虚拟机(物理分片)

在 Cloud Shell 中运行以下命令,以创建两个搭载 Ubuntu 的虚拟机。稍后,我们将为这些实例分配网络标记,以允许入站 MySQL 流量。

# Create Physical Shard 1
gcloud compute instances create mysql-physical-1 \
    --zone=$ZONE \
    --machine-type=e2-small \
    --image-family=ubuntu-2204-lts \
    --image-project=ubuntu-os-cloud \
    --tags=mysql-server

# Create Physical Shard 2
gcloud compute instances create mysql-physical-2 \
    --zone=$ZONE \
    --machine-type=e2-small \
    --image-family=ubuntu-2204-lts \
    --image-project=ubuntu-os-cloud \
    --tags=mysql-server

2. 配置防火墙规则

如需允许安全 SSH 访问(无需公开)并启用 Datastream 连接,请执行以下操作:

创建通过 IAP 进行 SSH 连接的防火墙规则:

此规则允许 Identity-Aware Proxy 通过 SSH 端口 (22) 访问您的虚拟机。

gcloud compute firewall-rules create allow-ssh-iap \
    --direction=INGRESS \
    --priority=1000 \
    --network=default \
    --action=ALLOW \
    --rules=tcp:22 \
    --source-ranges=35.235.240.0/20 \
    --target-tags=mysql-server

为 Datastream 创建防火墙规则(MySQL 端口):

Datastream 需要能够通过标准 MySQL 端口 (3306) 连接到这些虚拟机。

gcloud compute firewall-rules create allow-mysql-datastream \
    --direction=INGRESS \
    --priority=1000 \
    --network=default \
    --action=ALLOW \
    --rules=tcp:3306 \
    --source-ranges=0.0.0.0/0 \
    --target-tags=mysql-server

3. 在物理分片 1 上安装和配置 MySQL

通过 SSH 连接到第一个虚拟机,以安装 MySQL 并配置二进制日志记录(Datastream 需要此功能才能进行实时复制)。

  1. 通过 SSH 连接到第一个虚拟机:
gcloud compute ssh mysql-physical-1 --zone=$ZONE --tunnel-through-iap
  1. 安装 MySQL:
sudo apt-get update
sudo apt-get install mysql-server-8.0 -y

# Verify the installation and version
sudo mysql --version
  1. 配置 mysqld.cnf 文件以启用二进制日志记录并允许外部连接:
sudo sed -i 's/bind-address.*/bind-address = 0.0.0.0/' /etc/mysql/mysql.conf.d/mysqld.cnf
echo -e "[mysqld]\nserver-id=1\nlog_bin=/var/log/mysql/mysql-bin.log\nbinlog_format=ROW" | sudo tee -a /etc/mysql/mysql.conf.d/mysqld.cnf
  1. 重启 MySQL 以应用更改:
sudo systemctl restart mysql

4. 创建逻辑分片、插入数据和创建 Datastream 用户(分片 1)

在仍通过 SSH 连接到 mysql-physical-1 的情况下,登录 MySQL 提示符:

sudo mysql

运行以下 SQL 命令。此脚本会创建两个不同的逻辑分片(shard0_dbshard1_db),在两者中设置相同的架构,在每个分片中插入可唯一标识的数据(以演示分片),并为 Datastream 创建复制用户。

运行以下 SQL 命令,以创建前两个逻辑分片、一个表以及 Datastream 的复制用户:

CREATE DATABASE shard0_db;
CREATE DATABASE shard1_db;

USE shard0_db;
CREATE TABLE Customers (
    CustomerId INT NOT NULL,
    CustomerName VARCHAR(255),
    CreditLimit DECIMAL(10, 2) NOT NULL,
    LegacyRegion VARCHAR(50), -- Renamed to LoyaltyTier in Spanner

    PRIMARY KEY (CustomerId),
    CONSTRAINT CHK_CreditLimit CHECK (CreditLimit > 1000) -- Relaxed in Spanner to > 0 
);

CREATE TABLE Orders (
    CustomerId INT NOT NULL,  -- Logically references Customers.CustomerId in Spanner
    OrderId INT NOT NULL,
    OrderValue DECIMAL(10, 2),
    LegacyOrderSystem VARCHAR(50), -- Extra column in Source, to be dropped in Spanner

    PRIMARY KEY (CustomerId, OrderId) -- Spanner PK will have one additional column in PK
);

INSERT INTO Customers (CustomerId, CustomerName, CreditLimit, LegacyRegion) VALUES
(4, 'David E.', 2000.00, 'EAST'),
(8, 'Eleanor F.', 8100.00, 'WEST'),
(12, 'Frank G.', 12000.00, 'NORTH'),
(16, 'Grace H.', 6500.00, 'SOUTH');

INSERT INTO Orders (CustomerId, OrderId, OrderValue, LegacyOrderSystem) VALUES
(4, 101, 150.00, 'WebStore_v1'),
(4, 102, 25.50, 'InStore_POS'),
(8, 103, 75.00, 'MobileApp_Legacy'),
(12, 104, 3000.00, 'WebStore_v1'),
(16, 105, 120.00, 'Partner_API');

USE shard1_db;
CREATE TABLE Customers (
    CustomerId INT NOT NULL,
    CustomerName VARCHAR(255),
    CreditLimit DECIMAL(10, 2) NOT NULL,
    LegacyRegion VARCHAR(50), -- Renamed to LoyaltyTier in Spanner
    PRIMARY KEY (CustomerId),
    CONSTRAINT CHK_CreditLimit CHECK (CreditLimit > 1000) -- Relaxed in Spanner to > 0 
);

CREATE TABLE Orders (
    CustomerId INT NOT NULL,  -- Logically references Customers.CustomerId in Spanner
    OrderId INT NOT NULL,
    OrderValue DECIMAL(10, 2),
    LegacyOrderSystem VARCHAR(50), -- Extra column in Source, to be dropped in Spanner
    PRIMARY KEY (CustomerId, OrderId) -- Spanner PK will have one additional column in PK
);

INSERT INTO Customers (CustomerId, CustomerName, CreditLimit, LegacyRegion) VALUES
(1, 'Agnes N.', 5100.00, 'NORTHEAST'),(5, 'Alice I.', 15000.00, 'EAST'),
(9, 'Bob J.', 7500.00, 'WEST'),
(13, 'Charlie K.', 2200.00, 'CENTRAL');

INSERT INTO Orders (CustomerId, OrderId, OrderValue, LegacyOrderSystem) VALUES
(1, 201, 50.00, 'MobileApp_Legacy'),
(5, 202, 1250.00, 'WebStore_v1'),
(5, 203, 80.00, 'Partner_API'),
(9, 204, 600.00, 'InStore_POS'),
(13, 205, 199.99, 'WebStore_v1');


-- Create Datastream Replication User
CREATE USER 'datastream_user'@'%' IDENTIFIED BY 'complex_password_123';
GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT, INSERT, UPDATE, DELETE ON *.* TO 'datastream_user'@'%'; 
FLUSH PRIVILEGES;

您可以在此处找到上述架构的转储文件。请务必单独创建数据流复制用户,因为该用户不包含在转储文件中。

5. 验证数据

快速检查数据是否存在:

SELECT 'Customers shard0_db' AS tbl, COUNT(*) FROM shard0_db.Customers
UNION ALL
SELECT 'Orders shard0_db', COUNT(*) FROM shard0_db.Orders
UNION ALL
SELECT 'Customers shard1_db', COUNT(*) FROM shard1_db.Customers
UNION ALL
SELECT 'Orders shard1_db', COUNT(*) FROM shard1_db.Orders;
EXIT;

预期输出:

+---------------------+----------+
| tbl                 | COUNT(*) |
+---------------------+----------+
| Customers shard0_db |        4 |
| Orders shard0_db    |        5 |
| Customers shard1_db |        4 |
| Orders shard1_db    |        5 |
+---------------------+----------+

输入 exit 以退出与物理分片 1 虚拟机的连接。

6. 针对实体分片 2 重复上述步骤

现在,您将为第二个虚拟机重复完全相同的流程,但您将创建 shard2_dbshard3_db,并更改 server-id

  1. 通过 SSH 连接到第二个虚拟机:
gcloud compute ssh mysql-physical-2 --zone=$ZONE --tunnel-through-iap
  1. 安装 MySQL:
sudo apt-get update
sudo apt-get install mysql-server-8.0 -y
  1. 配置 mysqld.cnf 文件以启用二进制日志记录并允许外部连接 [请注意,server-id 必须不同(例如,2)]
sudo sed -i 's/bind-address.*/bind-address = 0.0.0.0/' /etc/mysql/mysql.conf.d/mysqld.cnf

echo -e "[mysqld]\nserver-id=2\nlog_bin=/var/log/mysql/mysql-bin.log\nbinlog_format=ROW" | sudo tee -a /etc/mysql/mysql.conf.d/mysqld.cnf
  1. 重启 MySQL 以应用更改:
sudo systemctl restart mysql
  1. 进入 MySQL (sudo mysql),然后运行第 4 步中的 SQL 的略微修改版本:
CREATE DATABASE shard2_db;
CREATE DATABASE shard3_db;

USE shard2_db;
CREATE TABLE Customers (
    CustomerId INT NOT NULL,
    CustomerName VARCHAR(255),
    CreditLimit DECIMAL(10, 2) NOT NULL,
    LegacyRegion VARCHAR(50), -- Renamed to LoyaltyTier in Spanner
    PRIMARY KEY (CustomerId),
    CONSTRAINT CHK_CreditLimit CHECK (CreditLimit > 1000) -- Relaxed in Spanner to > 0 
);

CREATE TABLE Orders (
    CustomerId INT NOT NULL,  -- Logically references Customers.CustomerId in Spanner
    OrderId INT NOT NULL,
    OrderValue DECIMAL(10, 2),
    LegacyOrderSystem VARCHAR(50), -- Extra column in Source, to be dropped in Spanner
    PRIMARY KEY (CustomerId, OrderId) -- Spanner PK will have one additional column in PK
);

INSERT INTO Customers (CustomerId, CustomerName, CreditLimit, LegacyRegion) VALUES
(2, 'Brian K.', 2500.00, 'SOUTHWEST'),
(6, 'Diana L.', 1999.00, 'NORTH'),
(10, 'Edward M.', 11000.00, 'EAST'),
(14, 'Fiona N.', 3000.00, 'WEST');

INSERT INTO Orders (CustomerId, OrderId, OrderValue, LegacyOrderSystem) VALUES
(2, 301, 100.00, 'CallCenter_System'),
(6, 302, 99.00, 'MobileApp_Legacy'),
(10, 303, 1000.00, 'WebStore_v1'),
(10, 304, 2500.00, 'InStore_POS'),
(14, 305, 130.00, 'MobileApp_Legacy');

USE shard3_db;
CREATE TABLE Customers (
    CustomerId INT NOT NULL,
    CustomerName VARCHAR(255),
    CreditLimit DECIMAL(10, 2) NOT NULL,
    LegacyRegion VARCHAR(50), -- Renamed to LoyaltyTier in Spanner
    PRIMARY KEY (CustomerId),
    CONSTRAINT CHK_CreditLimit CHECK (CreditLimit > 1000) -- Relaxed in Spanner to > 0 
);

CREATE TABLE Orders (
    CustomerId INT NOT NULL,  -- Logically references Customers.CustomerId in Spanner
    OrderId INT NOT NULL,
    OrderValue DECIMAL(10, 2),
    LegacyOrderSystem VARCHAR(50), -- Extra column in Source, to be dropped in Spanner
    PRIMARY KEY (CustomerId, OrderId) -- Spanner PK will have one additional column in PK
);

INSERT INTO Customers (CustomerId, CustomerName, CreditLimit, LegacyRegion) VALUES
(3, 'Cathy Z.', 6000.00, 'CENTRAL'),
(7, 'George O.', 18000.00, 'SOUTH'),
(11, 'Helen P.', 4000.00, 'NORTHEAST'),
(15, 'Ivy Q.', 9500.00, 'SOUTHWEST');

INSERT INTO Orders (CustomerId, OrderId, OrderValue, LegacyOrderSystem) VALUES
(3, 401, 600.00, 'InStore_POS'),
(7, 402, 1200.00, 'CallCenter_System'),
(11, 403, 350.00, 'MobileApp_Legacy'),
(15, 404, 800.00, 'WebStore_v1'),
(99, 999, 25.00, 'CallCenter_System'); -- Failure row during Bulk Migration due to violation of interleaving

-- Create Datastream Replication User
CREATE USER 'datastream_user'@'%' IDENTIFIED BY 'complex_password_123';
GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT, INSERT, UPDATE, DELETE ON *.* TO 'datastream_user'@'%'; 
FLUSH PRIVILEGES;

-- Verify Data
SELECT 'Customers shard2_db' AS tbl, COUNT(*) FROM shard2_db.Customers
UNION ALL
SELECT 'Orders shard2_db', COUNT(*) FROM shard2_db.Orders
UNION ALL
SELECT 'Customers shard3_db', COUNT(*) FROM shard3_db.Customers
UNION ALL
SELECT 'Orders shard3_db', COUNT(*) FROM shard3_db.Orders;

EXIT;

预期输出:

+---------------------+----------+
| tbl                 | COUNT(*) |
+---------------------+----------+
| Customers shard2_db |        4 |
| Orders shard2_db    |        5 |
| Customers shard3_db |        4 |
| Orders shard3_db    |        5 |
+---------------------+----------+

您可以在此处找到上述架构的转储文件。请务必单独创建数据流复制用户,因为该用户不包含在转储文件中。

输入 exit 以退出与虚拟机的连接。

5. 设置 Cloud Spanner

现在,您将设置要将数据迁移到的目标 Cloud Spanner 实例。

1. 创建 Cloud Spanner 实例

Compute Engine 虚拟机所在的区域中创建一个 Cloud Spanner 实例,以尽可能缩短延迟时间。此命令使用 100 个处理单元创建一个适合此 Codelab 的小型实例。

export SPANNER_INSTANCE_NAME="target-spanner-instance"
export SPANNER_DATABASE_NAME="sharded-target-db"
export SPANNER_CONFIG="regional-${REGION}"

gcloud spanner instances create $SPANNER_INSTANCE_NAME \
  --config=$SPANNER_CONFIG \
  --description="Target Spanner Instance" \
  --processing-units=100

创建实例可能需要一到两分钟。

6. 使用 Spanner 迁移工具 (SMT) 转换架构

使用 Spanner 迁移工具 (SMT) Web 界面连接到我们的一个逻辑分片 (shard0_db),分析其架构,并在将其转换为 Cloud Spanner 之前应用多项高级修改。

1. 安装 SMT

我们将直接从 Cloud Shell 运行 SMT Web 界面。在 Cloud Shell 终端中,下载并解压缩最新的 SMT 版本:

sudo apt-get update && sudo apt-get install google-cloud-cli-spanner-migration-tool

# Verify installation 
gcloud alpha spanner migrate web --help

2. 连接到源数据库

  1. 对会话进行身份验证
# Authenticate your Google Cloud account
gcloud auth login

# Set up Application Default Credentials (ADC) for SMT
gcloud auth application-default login

# Ensure your current project is set correctly
gcloud config set project $PROJECT_ID

(注意:系统提示时,请按照提供的网址授权您的账号,然后将验证码粘贴回终端。)

  1. 首先,在 Cloud Shell 标签页中运行以下命令,找到第一个物理分片的外部 IP:
gcloud compute instances describe mysql-physical-1 --zone=$ZONE --format='get(networkInterfaces[0].accessConfigs[0].natIP)'
  1. 打印在配置 SMT 时要使用的目标 Spanner 实例详细信息。
echo "Project ID: $PROJECT_ID"
echo "Instance ID: $SPANNER_INSTANCE_NAME"
echo "Database Name: $SPANNER_DATABASE_NAME"
  1. 启动网页界面:
gcloud alpha spanner migrate web --port=8080
  1. 在 Cloud Shell 窗口的右上角,点击网页预览图标(看起来像一只眼睛),然后选择在端口 8080 上预览。系统随即会在新的浏览器标签页中打开 SMT 界面。

69ff1c4de3072798.png

  1. 在 SMT 网页界面中,选择连接到数据库
  2. 填写连接详细信息:
  • 数据库类型:MySQL
  • 主机: (粘贴第 2 步中的 IP 地址)
  • 端口:3306
  • 用户datastream_user
  • 密码: complex_password_123
  • 数据库名称: shard0_db
  1. 点击右上角的“修改”按钮以配置 Spanner 数据库。
  2. 输入目标 Spanner 详细信息:
  • 项目 ID: (粘贴第 3 步中的项目 ID)
  • Spanner 实例: (粘贴第 3 步中的实例 ID)
  1. 点击测试连接
  2. 通过此检查后,点击关联。SMT 将分析源数据库并提供基准 Spanner 架构。

50a0a11c84f8cd7.png

3. 应用架构修改

现在,我们将重塑架构,以涵盖复杂的迁移场景。

在 SMT 界面中的架构编辑器中,执行以下操作:

A. 重命名 LegacyRegion 列:

  • 点击左侧导航窗格中的 Customers 表。系统会默认打开标签页。
  • 点击“扳手”部分中的“修改”按钮。
  • 在 Spanner 架构视图中找到 LegacyRegion 列。
  • 在列名称对话框中输入 LoyaltyTier,将 Spanner 列名称更改为 LoyaltyTier
  • 点击保存并转换

7eab05df38da8e36.png

2eedd3168cf161a4.png

B. 放宽检查限制条件:

  • Customers 表中,前往检查约束条件标签页。
  • 找到 CHK_CreditLimit 限制条件。点击修改(铅笔)图标。
  • 将条件从 CreditLimit > 1000 更改为 CreditLimit > 0(这会故意导致信用额度较低的行无法进行反向迁移并进入 DLQ)。

2adcfda3b42b428f.png

C. 删除 LegacyOrderSystem 列:

  • 点击 Orders 表格,系统默认会打开标签页。
  • 点击“扳手”部分中的“修改”按钮。
  • 在 Spanner 架构视图中找到 LegacyOrderSystem 列。
  • 点击旁边的三点状菜单图标,然后选择舍弃列
  • 点击保存并转换

53d3bf8695c43d95.png

D. 添加 OrderSource 列并将其设为主键:

  • Orders 表中,点击添加列。将其命名为 OrderSource,并将类型设置为长度为 50STRING,不进行自动生成,并将 IsNullable 设置为 No
  • 前往主密钥标签页。
  • 点击修改,然后从“列名称”下拉菜单中选择 OrderSource
  • 点击添加列,然后点击保存并转换

6fcf3f35352bdbdd.png

b85a72b2d2c521d5.png

E. 交织订单表:

  • Orders 表中,在主表格视图中找到交织标签页。
  • 将父表设置为 Customers
  • 选择 IN PARENT 交错类型和 NO ACTION 删除时操作。
  • 点击保存

c88dbe943652683a.png

4. 下载替换文件并应用架构

  1. 在 SMT 界面右上角,找到下载制品按钮。选择下载替换文件选项。将此文件保存到本地机器。此文件包含我们刚刚进行的所有架构映射更改,将由我们的 Dataflow 流水线使用。
  1. 点击准备迁移

d3ba4884743e077.png

  1. 从下拉菜单中选择 Migration ModeSchema
  2. 输入目标 Spanner 数据库:sharded-target-db

1f80f8636d317920.png

  1. 点击迁移
  2. SMT 将应用 DDL 并创建 Spanner 数据库。SMT 进程完成后,您可以安全地在 Cloud Shell (Ctrl+C) 中停止该进程。

5. 验证 Cloud Spanner 中的架构

检查 Spanner 数据库中是否已创建表。

gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
  --instance=$SPANNER_INSTANCE_NAME \
  --sql="SELECT table_name FROM information_schema.tables WHERE table_schema = '' ORDER BY table_name"

您应该会看到以下输出内容:

table_name: Customers
table_name: Orders

可选:如果您想检查实际的 Spanner DDL 以验证检查约束、交织和额外列是否已应用,请运行以下命令:

gcloud spanner databases ddl describe $SPANNER_DATABASE_NAME \
  --instance=$SPANNER_INSTANCE_NAME

预期输出:

CREATE TABLE Customers (
  CustomerId INT64 NOT NULL,
  CustomerName STRING(255),
  CreditLimit NUMERIC NOT NULL,
  LoyaltyTier STRING(50),
  CONSTRAINT CHK_CreditLimit CHECK(`CreditLimit` > 0),
) PRIMARY KEY(CustomerId);

CREATE TABLE Orders (
  CustomerId INT64 NOT NULL,
  OrderId INT64 NOT NULL,
  OrderValue NUMERIC,
  OrderSource STRING(50) NOT NULL,
) PRIMARY KEY(CustomerId, OrderId, OrderSource),
  INTERLEAVE IN PARENT Customers ON DELETE NO ACTION;

7. 初始化变更数据捕获 (CDC)

在本部分中,您将为迁移设置“记录器”。通过在批量数据加载开始之前配置 Datastream 和 Pub/Sub,您可以确保捕获并排队源数据库中的每一项更改,从而防止在过渡期间丢失任何数据。实时迁移需要此设置。

由于我们的架构涉及两个物理服务器,因此我们必须创建两个单独的 Datastream 源配置文件和两个 Datastream 数据流。这两个数据流都将写入单个 Google Cloud Storage (GCS) 存储分区,该存储分区将作为 Dataflow 流水线的统一来源。

1. 创建 Cloud Storage 存储桶

Datastream 需要一个目标来存储捕获的更改事件。我们来创建一个 GCS 存储分区。

export BUCKET_NAME="migration-${PROJECT_ID}-bucket"
gcloud storage buckets create gs://${BUCKET_NAME} --location=$REGION

2. 创建 Datastream 连接配置文件

我们需要两个不同的 MySQL 来源连接配置文件(每个物理分片一个)和一个 Cloud Storage 目标连接配置文件。

获取来源 IP 地址

首先,获取两个 Compute Engine 虚拟机的外部 IP 地址,并将其存储为环境变量:

export MYSQL_IP_1=$(gcloud compute instances describe mysql-physical-1 --zone=$ZONE --format='get(networkInterfaces[0].accessConfigs[0].natIP)')

export MYSQL_IP_2=$(gcloud compute instances describe mysql-physical-2 --zone=$ZONE --format='get(networkInterfaces[0].accessConfigs[0].natIP)')

创建源连接配置文件(Compute Engine 上的 MySQL)

使用之前创建的 datastream_user 创建 Datastream 连接配置文件。

# Create Source Profile for Physical Shard 1
export SQL_CP_NAME_1="mysql-src-cp-1"
gcloud datastream connection-profiles create $SQL_CP_NAME_1 \
    --location=$REGION \
    --type=mysql \
    --mysql-hostname=$MYSQL_IP_1 \
    --mysql-port=3306 \
    --mysql-username=datastream_user \
    --mysql-password=complex_password_123 \
    --display-name="MySQL Source 1 (Physical Shard 1)"

# Create Source Profile for Physical Shard 2
export SQL_CP_NAME_2="mysql-src-cp-2"
gcloud datastream connection-profiles create $SQL_CP_NAME_2 \
    --location=$REGION \
    --type=mysql \
    --mysql-hostname=$MYSQL_IP_2 \
    --mysql-port=3306 \
    --mysql-username=datastream_user \
    --mysql-password=complex_password_123 \
    --display-name="MySQL Source 2 (Physical Shard 2)"

注意:Datastream 通过这些虚拟机的公共 IP 地址连接到它们,这是允许的,因为我们之前已将 0.0.0.0/0 添加到防火墙规则中。在生产环境中,您需要严格将 Datastream 的特定公共 IP 范围列入许可名单。

创建目标连接配置文件 (Cloud Storage):

此引用指向新创建的存储分区的根目录。

export GCS_CP_NAME="gcs-dest-cp"
gcloud datastream connection-profiles create $GCS_CP_NAME \
    --location=$REGION \
    --type=google-cloud-storage \
    --bucket=$BUCKET_NAME \
    --root-path=/ \
    --display-name="GCS Destination" --force

3. 创建 Datastream 数据流

现在,我们将创建两个 CDC 流。数据流 1 将捕获 shard0_dbshard1_db。Stream 2 将拍摄 shard2_dbshard3_db。这两个流都以 Avro 格式写入同一 GCS 存储分区。

# Stream for Physical Shard 1
export STREAM_NAME_1="mysql-to-spanner-stream-1"
export GCS_STREAM_PATH_1="data/${STREAM_NAME_1}"

gcloud datastream streams create $STREAM_NAME_1 \
    --location=$REGION \
    --display-name="MySQL Source 1 CDC Stream" \
    --source=$SQL_CP_NAME_1 \
    --destination=$GCS_CP_NAME \
    --mysql-source-config=<(echo "includeObjects:
  mysqlDatabases:
  - database: 'shard0_db'
  - database: 'shard1_db'") \
    --gcs-destination-config=<(echo "path: ${GCS_STREAM_PATH_1}/
fileRotationMb: 5
fileRotationInterval: 15s
avroFileFormat: {}") \
    --backfill-none

# Stream for Physical Shard 2
export STREAM_NAME_2="mysql-to-spanner-stream-2"
export GCS_STREAM_PATH_2="data/${STREAM_NAME_2}"

gcloud datastream streams create $STREAM_NAME_2 \
    --location=$REGION \
    --display-name="MySQL Source 2 CDC Stream" \
    --source=$SQL_CP_NAME_2 \
    --destination=$GCS_CP_NAME \
    --mysql-source-config=<(echo "includeObjects:
  mysqlDatabases:
  - database: 'shard2_db'
  - database: 'shard3_db'") \
    --gcs-destination-config=<(echo "path: ${GCS_STREAM_PATH_2}/
fileRotationMb: 5
fileRotationInterval: 15s
avroFileFormat: {}") \
    --backfill-none

使用较小的文件轮替设置(5 MB 或 15 秒)有助于我们在 Codelab 期间更快地看到复制的更改。

此命令可能需要一些时间才能完成。查看状态:gcloud datastream streams describe $STREAM_NAME_1 --location=$REGION

4. 启动 Datastream 流

同时激活这两个数据流,以便它们开始记录更改。

gcloud datastream streams update $STREAM_NAME_1 \
    --location=$REGION \
    --state=RUNNING

gcloud datastream streams update $STREAM_NAME_2 \
    --location=$REGION \
    --state=RUNNING

检查状态:您可以运行 gcloud datastream streams describe $STREAM_NAME_1 --location=$REGION。状态最初为 STARTING,几分钟后会变为 RUNNING。等待两者都完全运行后再开始实时迁移。

5. 为 GCS 通知设置 Pub/Sub

当任一 Datastream 数据流将新文件写入 GCS 存储分区时,Dataflow 需要立即收到通知。我们将配置 GCS 以向单个 Pub/Sub 主题发送通知。

创建 Pub/Sub 主题:

export PUBSUB_TOPIC="datastream-gcs-updates"
gcloud pubsub topics create $PUBSUB_TOPIC

创建 GCS 通知

在创建任何以 data/ 为前缀的对象(涵盖我们的两个数据流)时通知相应主题。

gcloud storage buckets notifications create gs://${BUCKET_NAME} --topic=projects/$PROJECT_ID/topics/$PUBSUB_TOPIC --payload-format=json --object-prefix=data/

创建 Pub/Sub 订阅

创建具有建议的 Dataflow 确认截止时间的订阅。

export PUBSUB_SUBSCRIPTION="datastream-gcs-sub"
gcloud pubsub subscriptions create $PUBSUB_SUBSCRIPTION \
  --topic=$PUBSUB_TOPIC \
  --ack-deadline=600

8. 自定义转换

由于我们的 Spanner 架构与 MySQL 架构不同(因为我们通过 SMT Web 界面添加和舍弃了列),因此开箱即用的 Dataflow 迁移会失败。Dataflow 需要有关如何在正向(从 MySQL 到 Spanner)和反向(从 Spanner 到 MySQL)流水线中映射这些差异的说明。

此外,由于我们正在进行分片反向迁移,因此 Dataflow 需要一种路由机制,以便在反向复制期间知道更新后的 Spanner 行属于哪个逻辑分片(shard0_dbshard1_db 等)。

我们将使用 Google 提供的 Spanner 自定义分片模板编写自定义转换 JAR 来实现此目的。

1. 下载自定义分片模板

在 Cloud Shell 中,下载 Google Cloud Dataflow 模板代码库,然后前往自定义分片文件夹:

git clone https://github.com/GoogleCloudPlatform/DataflowTemplates.git
cd DataflowTemplates/v2/spanner-custom-shard

2. 配置数据转换逻辑

我们需要修改 CustomTransformationFetcher.java 文件。

  • 正向迁移 (toSpannerRow):使用 MySQL 中的 LegacyOrderSystem 列填充新添加的 OrderSource 列。
  • 反向迁移 (toSourceRow):重新填充 MySQL 所需的已舍弃的 LegacyOrderSystem 列,并从 Spanner 的 OrderSource 中派生该列。

修改 CustomTransformationFetcher.java 文件。您可以运行以下命令,自动使用自定义逻辑覆盖模板文件,而无需手动打开文本编辑器:

cat << 'EOF' > src/main/java/com/custom/CustomTransformationFetcher.java 
package com.custom;

import com.google.cloud.teleport.v2.spanner.exceptions.InvalidTransformationException;
import com.google.cloud.teleport.v2.spanner.utils.ISpannerMigrationTransformer;
import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationRequest;
import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationResponse;
import java.util.HashMap;
import java.util.Map;

public class CustomTransformationFetcher implements ISpannerMigrationTransformer {

 @Override
 public void init(String customParameters) {}

 @Override
 public MigrationTransformationResponse toSpannerRow(MigrationTransformationRequest request)
     throws InvalidTransformationException {
   if (request.getTableName().equals("Orders")) {
     Map<String, Object> requestRow = request.getRequestRow();
     Map<String, Object> responseRow = new HashMap<>();

     Object legacySysObj = requestRow.get("LegacyOrderSystem");
     String legacySys = (legacySysObj != null) ? (String) legacySysObj : "UNKNOWN_SYSTEM";

     // Transform: Trim the string to remove everything after the first underscore
     String orderSource = legacySys;
     if (legacySys.contains("_")) {
       orderSource = legacySys.substring(0, legacySys.indexOf('_'));
     }

     // Populate the new Spanner column (e.g., "WebStore_v1" becomes "WebStore")
     responseRow.put("OrderSource", orderSource);

     return new MigrationTransformationResponse(responseRow, false);
   }

   return new MigrationTransformationResponse(new HashMap<>(), false);
 }

 @Override
 public MigrationTransformationResponse toSourceRow(MigrationTransformationRequest request)
     throws InvalidTransformationException {
   if (request.getTableName().equals("Orders")) {
     Map<String, Object> requestRow = request.getRequestRow();
     Map<String, Object> responseRow = new HashMap<>();

     // Safely fetch the Spanner OrderSource
     Object sourceObj = requestRow.get("OrderSource");
     String source = (sourceObj != null) ? (String) sourceObj : "UNKNOWN_SYSTEM";
     String legacySys = "'" + source + "_v1'";

     // Transform: Append a suffix to visibly prove the reverse transformation worked
     // e.g., "WebStore" becomes "WebStore_v1"
     responseRow.put("LegacyOrderSystem", legacySys);

     return new MigrationTransformationResponse(responseRow, false);
   }

   return new MigrationTransformationResponse(new HashMap<>(), false);
 }

 @Override
 public MigrationTransformationResponse transformFailedSpannerMutation(
     MigrationTransformationRequest request) throws InvalidTransformationException {
   return new MigrationTransformationResponse(new HashMap<>(), false);
 }
}
EOF

3. 配置反向分片逻辑

Dataflow 在反向复制期间使用 CustomShardIdFetcher.java 来确定 Spanner 变更应路由到何处。我们将使用 CustomerId 主键和取模 (%4) 逻辑,以动态方式将记录路由回其正确的逻辑分片。

使用 cat 编辑 CustomShardIdFetcher.java 文件,并将其内容完全替换为以下代码:

cat << 'EOF' > src/main/java/com/custom/CustomShardIdFetcher.java 
package com.custom;

import com.google.cloud.teleport.v2.spanner.utils.IShardIdFetcher;
import com.google.cloud.teleport.v2.spanner.utils.ShardIdRequest;
import com.google.cloud.teleport.v2.spanner.utils.ShardIdResponse;
import java.util.Map;

public class CustomShardIdFetcher implements IShardIdFetcher {
    
    @Override
    public void init(String parameters) {}

    @Override
    public ShardIdResponse getShardId(ShardIdRequest shardIdRequest) {
        Map<String, Object> keys = shardIdRequest.getSpannerRecord
();
        
        // Use the Primary Key to identify the correct logical shard
        if (keys != null && keys.containsKey("CustomerId")) {
            long customerId = Long.parseLong(keys.get("CustomerId").toString());
            long shardIdx = customerId % 4;
            
            ShardIdResponse response = new ShardIdResponse();
            response.setLogicalShardId("shard" + shardIdx + "_db");
            return response;
        }
        
        return new ShardIdResponse();
    }
}
EOF

4. 构建并上传 JAR

现在,我们已编写自定义 Java 逻辑,接下来需要将其编译为 JAR 文件并上传到我们之前创建的 Google Cloud Storage 存储分区,以便 Dataflow 可以访问该文件。

在 Cloud Shell 中运行以下命令:

# Return to DataflowTemplates directory 
cd ../..

# Build the JAR using Maven
mvn clean install -DskipTests -Dcheckstyle.skip -Dspotless.check.skip=true -Djib.skip -pl v2/spanner-custom-shard -am

# Upload the JAR to GCS
export CUSTOM_JAR_PATH="gs://${BUCKET_NAME}/custom-logic/spanner-custom-shard-1.0.jar"

gcloud storage cp v2/spanner-custom-shard/target/spanner-custom-shard-1.0-SNAPSHOT.jar $CUSTOM_JAR_PATH

# Return to home directory
cd ~

9. 将数据从 MySQL 批量迁移到 Spanner

在设置好 Spanner 架构并构建自定义转换 JAR 后,我们现在可以将现有数据从 MySQL 数据库复制到 Cloud Spanner。您将使用 Sourcedb to Spanner Dataflow Flex 模板,该模板旨在将数据从可通过 JDBC 访问的数据库批量复制到 Spanner。

1. 上传架构替换文件

在第 6 部分中,您使用 SMT Web 界面下载了 Spanner 替换 JSON 文件。我们需要将此文件上传到 GCS 存储分区,以便 Dataflow 可以使用它来映射架构差异(例如重命名的列)。

  1. 在 Cloud Shell 中,点击三点状菜单(更多),然后选择上传

4b17d17ab13e90df.png

  1. 选择您之前下载的替换 JSON 文件(例如 spanner_overrides.json)。
  2. 将其移至您的 GCS 存储分区:
export OVERRIDES_FILE="spanner_overrides.json" # Change this if your downloaded file has a different name

export GCS_OVERRIDES_PATH="gs://${BUCKET_NAME}/config/${OVERRIDES_FILE}"

gcloud storage cp ~/${OVERRIDES_FILE} $GCS_OVERRIDES_PATH

2. 创建并上传分片配置文件

Dataflow 需要知道如何连接到两个物理虚拟机中的所有四个逻辑分片。我们将为此创建一个 sharding.json 文件。

在 Cloud Shell 中运行以下命令,以生成并上传配置:

cat <<EOF > sharding.json
{
  "configType": "dataflow",
  "shardConfigurationBulk": {
    "schemaSource": {
      "dataShardId": "mysql-physical-1",
      "host": "${MYSQL_IP_1}",
      "user": "datastream_user",
      "password": "complex_password_123",
      "port": "3306",
      "dbName": "shard0_db"
    },
    "dataShards": [
      {
        "dataShardId": "mysql-physical-1",
        "host": "${MYSQL_IP_1}",
        "user": "datastream_user",
        "password": "complex_password_123",
        "port": "3306",
        "dbName": "",
        "namespace": "namespace-mysql-1",
        "databases": [
          {
            "dbName": "shard0_db",
            "databaseId": "shard0_db",
            "refDataShardId": "mysql-physical-1"
          },
          {
            "dbName": "shard1_db",
            "databaseId": "shard1_db",
            "refDataShardId": "mysql-physical-1"
          }
        ]
      },
      {
        "dataShardId": "mysql-physical-2",
        "host": "${MYSQL_IP_2}",
        "user": "datastream_user",
        "password": "complex_password_123",
        "port": "3306",
        "dbName": "",
        "namespace": "namespace-mysql-2",
        "databases": [
          {
            "dbName": "shard2_db",
            "databaseId": "shard2_db",
            "refDataShardId": "mysql-physical-2"
          },
          {
            "dbName": "shard3_db",
            "databaseId": "shard3_db",
            "refDataShardId": "mysql-physical-2"
          }
        ]
      }
    ]
  }
}
EOF


export GCS_SHARDING_PATH="gs://${BUCKET_NAME}/config/sharding.json"
gcloud storage cp sharding.json $GCS_SHARDING_PATH

3. 运行批量迁移 Dataflow 作业

我们将使用 Sourcedb to Spanner Flex 模板。由于这是具有自定义转换的分片迁移,因此我们传递了替换文件、分片配置和自定义 Java JAR。

export JOB_NAME="mysql-sharded-bulk-to-spanner-$(date +%Y%m%d-%H%M%S)"
export OUTPUT_DIR="gs://${BUCKET_NAME}/bulk-migration"

gcloud dataflow flex-template run $JOB_NAME \
  --project=$PROJECT_ID \
  --region=$REGION \
--template-file-gcs-location="gs://dataflow-templates-${REGION}/latest/flex/Sourcedb_to_Spanner_Flex" \
--max-workers=2 \
--num-workers=1 \
--worker-machine-type=n2-highmem-8 \
  --parameters \
sourceConfigURL=$GCS_SHARDING_PATH,\
instanceId=$SPANNER_INSTANCE_NAME,\
databaseId=$SPANNER_DATABASE_NAME,\
projectId=$PROJECT_ID,\
outputDirectory=$OUTPUT_DIR,\
username=datastream_user,\
password=complex_password_123,\
schemaOverridesFilePath=$GCS_OVERRIDES_PATH,\
transformationJarPath=$CUSTOM_JAR_PATH,\
transformationClassName=com.custom.CustomTransformationFetcher

关键参数说明:

  • sourceConfigURL:我们创建的 sharding.json 文件的路径。这会告知 Dataflow 如何连接到两个物理虚拟机中的所有四个逻辑 MySQL 分片。
  • schemaOverridesFilePath:从 SMT Web 界面下载的 JSON 文件的路径。这会指示 Dataflow 如何处理我们所做的架构修改(例如舍弃的 LegacyRegion 列和收紧的检查约束)。
  • transformationJarPath:我们在上一部分中构建的已编译 Java JAR 文件的 GCS 路径。此文件包含用于执行自定义转换的实际代码。
  • transformationClassName:JAR 中实现正向迁移逻辑 (com.custom.CustomTransformationFetcher) 的 Java 类的完全限定名称。
  • outputDirectory:Dataflow 将在其中写入临时文件(最重要的是死信队列 [DLQ] 文件)的 GCS 位置。
  • maxWorkersnumWorkers:控制 Dataflow 作业的扩缩。对于此小型数据集,保持较低值。
  • instanceIddatabaseIdprojectId:指定目标 Cloud Spanner 实例和数据库。

网络注意事项:此作业通过 Cloud SQL 实例的公共 IP 连接到该实例。之所以能够这么做,是因为您之前已将 0.0.0.0/0 添加到实例的授权网络。这样,具有外部 IP 的 Dataflow 工作器虚拟机便可访问数据库。

4. 监控 Dataflow 作业

您可以在 Google Cloud 控制台中跟踪作业的进度:

  1. 前往 Dataflow 作业页面:前往 Dataflow 作业
  2. 找到名为 mysql-sharded-bulk-to-spanner-... 的作业,然后点击它。
  3. 查看作业图和指标。等待作业状态变为成功。此过程大约需要 5-15 分钟。

f3ffd88c35fa8042.png

  • 如果作业遇到问题,请查看 Dataflow 作业详情页面中的日志标签页,了解错误消息。
  • 作业指标可提供有关作业进度和资源消耗(例如吞吐量和 CPU 利用率)的更多信息。

5. 验证 Cloud Spanner 中的数据并检查死信队列 (DLQ)

Dataflow 作业成功完成后,我们需要验证数据是否已安全到达,并检查我们有意设计为失败的记录。

A. 验证迁移数据的总体健康状况:

使用 gcloud CLI 对合并后的 Spanner 数据库运行一些快速健康检查,以确保有效记录已正确迁移,并且我们的自定义 JAR 已填充额外的列。

# 1. Verify total Customer count
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT COUNT(*) as TotalCustomers FROM Customers"

# 2. Verify total Orders count (Total minus the orphan record)
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT COUNT(*) as TotalOrders FROM Orders"

# 3. Verify the Custom Transformation on OrderSource worked
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT CustomerId, OrderId, OrderSource FROM Orders LIMIT 3"

# 4. Verify that renamed column LoyaltyTier has the correct data
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT CustomerId, CustomerName, LoyaltyTier FROM Customers LIMIT 3"

预期输出:

TotalCustomers: 16
TotalOrders: 19

CustomerId: 1
OrderId: 201
OrderSource: MobileApp

CustomerId: 2
OrderId: 301
OrderSource: CallCenter

CustomerId: 3
OrderId: 401
OrderSource: InStore

CustomerId: 1
CustomerName: Agnes N.
LoyaltyTier: NORTHEAST

CustomerId: 2
CustomerName: Brian K.
LoyaltyTier: SOUTHWEST

CustomerId: 3
CustomerName: Cathy Z.
LoyaltyTier: CENTRAL
  • Customers 表中的所有行均已成功迁移。
  • 我们看到,由于 Spanner 上的 INTERLEAVE IN PARENTOrders 表中出现了 1 行失败 - CustomerId 99 是孤立的子项,因为 Customers 表中没有对应的行。

B. 检查 DLQ 中的有意失败:

上述失败记录在批量迁移流水线创建的死信队列 (DLQ) 文件夹中。

  1. 在 Google Cloud 控制台中,前往 Cloud Storage
  2. 前往您的存储分区,然后打开 bulk-migration/dlq/severe 文件夹。
  3. 检查其中的 JSON 文件。您会发现 Orders 行中包含孤立的 CustomerId
  4. 您可以按照此处所述的步骤重试批量迁移 DLQ 错误。

从 Cloud SQL 到 Cloud Spanner 的初始批量数据加载现已完成。下一步是设置实时复制,以捕获持续发生的变化。

10. 开始实时迁移 (CDC)

现在,批量数据加载已完成,接下来您将启动持续的 Dataflow 流式作业。此作业将读取 Datastream 正在写入 GCS 存储分区的变更数据捕获 (CDC) 事件,并近乎实时地将这些更改应用到 Cloud Spanner。

我们还将通过注入有效数据和故意注入的无效数据来测试此流水线,以观察 Dataflow 如何处理实时复制并将故障路由到死信队列 (DLQ)。

1. 创建实时迁移分片配置文件

与批量迁移(使用 JDBC 连接字符串)不同,实时迁移流水线从 GCS 读取 Datastream 事件。它需要完全不同的 JSON 配置,将 Datastream 流名称和数据库映射到逻辑 Spanner 分片。

在 Cloud Shell 中运行以下命令,以创建并上传实时分片配置:

cat <<EOF > live-sharding.json
{
  "StreamToDbAndShardMap": {
    "${STREAM_NAME_1}": {
      "shard0_db": "shard0_db",
      "shard1_db": "shard1_db"
    },
    "${STREAM_NAME_2}": {
      "shard2_db": "shard2_db",
      "shard3_db": "shard3_db"
    }
  }
}
EOF

export GCS_LIVE_SHARDING_PATH="gs://${BUCKET_NAME}/config/live-sharding.json"
gcloud storage cp live-sharding.json $GCS_LIVE_SHARDING_PATH

2. 运行实时迁移 Dataflow 作业

启动流式 Dataflow 作业,以从 GCS 读取数据并写入 Spanner。此模板将使用 GCS Pub/Sub 通知来立即处理新文件。

export JOB_NAME_CDC="mysql-sharded-cdc-to-spanner-$(date +%Y%m%d-%H%M%S)"
export DLQ_DIR_CDC="gs://${BUCKET_NAME}/live-migration"

gcloud dataflow flex-template run $JOB_NAME_CDC \
  --project=$PROJECT_ID \
  --region=$REGION \
--worker-machine-type=n2-highmem-8 \
--template-file-gcs-location="gs://dataflow-templates-${REGION}/latest/flex/Cloud_Datastream_to_Spanner" \
  --parameters \
instanceId="$SPANNER_INSTANCE_NAME",\
databaseId="$SPANNER_DATABASE_NAME",\
projectId="$PROJECT_ID",\
inputFileFormat="avro",\
gcsPubSubSubscription="projects/${PROJECT_ID}/subscriptions/${PUBSUB_SUBSCRIPTION}",\
shardingContextFilePath=$GCS_LIVE_SHARDING_PATH,\
deadLetterQueueDirectory="$DLQ_DIR_CDC",\
transformationJarPath=$CUSTOM_JAR_PATH,\
transformationClassName="com.custom.CustomTransformationFetcher",\
schemaOverridesFilePath=$GCS_OVERRIDES_PATH,\
datastreamSourceType="mysql",\
dlqRetryMinutes=1,\
dlqMaxRetryCount=2

关键形参

  • gcsPubSubSubscription:用于监听来自 GCS 的新文件通知的 Pub/Sub 订阅。这样,作业就可以在 Datastream 写入更改时立即处理这些更改。
  • inputFileFormat="avro":告知 Dataflow 预期会收到来自 Datastream 的 Avro 文件。此值必须与 Datastream“目标”配置(例如,avroFileFormatjsonFileFormat)相匹配。
  • shardingContextFilePath:一个 JSON 文件,用于将 Datastream 流映射到逻辑分片。
  • dlqRetryMinutes:死信队列重试之间的分钟数。默认设置为 10
  • dlqMaxRetryCount:可通过 DLQ 重试临时错误的次数上限。默认设置为 500

Dataflow 作业控制台中监控作业启动情况。

3. 注入实时数据并触发有意为之的故障

在 Dataflow 流式作业启动期间(可能需要 3-5 分钟),我们通过 SSH 连接到第一个物理 MySQL 虚拟机,并插入一些新记录。我们将插入一条有效记录和一条无效记录。

通过 SSH 连接到第一个物理分片:

gcloud compute ssh mysql-physical-1 --zone=$ZONE

登录 MySQL:

sudo mysql

shard1_db 上运行以下插入操作:

USE shard1_db;

-- 1. Valid Insert: 'MobileApp_v2' will be trimmed to 'MobileApp'
INSERT INTO Orders (CustomerId, OrderId, OrderValue, LegacyOrderSystem) 
VALUES (4, 501, 99.99, 'MobileApp_v2');

-- 2. Invalid Insert (DLQ Test): This violates Interleave constraint as CustomerId 99999 doesn't exist in Customers table.
INSERT INTO Orders (CustomerId, OrderId, OrderValue, LegacyOrderSystem) 
VALUES (99999, 502, 50.00, 'WebStore_v1');

-- 3. Valid Update
UPDATE Orders SET OrderValue = '1500' WHERE CustomerId = 5 AND OrderId = 202; 

-- 4. Valid Delete
DELETE FROM Orders WHERE CustomerId = 5 AND OrderId = 203; 

EXIT;

再次输入 exit 以返回到 Cloud Shell 提示符。

4. 验证实时迁移数据并检查 CDC DLQ

现在,我们已注入数据,Datastream 将捕获 CDC 事件,而 Dataflow 将尝试将这些事件应用于 Spanner。

A. 验证 Spanner 中的有效 DML 更改

运行以下查询,验证 INSERTUPDATEDELETE 事件是否已成功到达 Spanner,以及自定义转换是否已在插入和更新时触发。

# 1. Verify INSERT: Should return the new row with transformed OrderSource
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT CustomerId, OrderId, OrderValue, OrderSource FROM Orders WHERE CustomerId = 4 AND OrderId = 501"

# 2. Verify UPDATE: Should show OrderValue changed to 1500
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT CustomerId, OrderId, OrderValue, OrderSource FROM Orders WHERE CustomerId = 5 AND OrderId = 202"

# 3. Verify DELETE: Should return 0, confirming the order was deleted
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT COUNT(*) FROM Orders WHERE CustomerId = 5 AND OrderId = 203"

# 4. Verify DLQ Failure: Should return 0, confirming the row migration failed
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT COUNT(*) FROM Orders WHERE CustomerId = 99999 AND OrderId = 502"

预期输出:

CustomerId: 4
OrderId: 501
OrderValue: 99.99
OrderSource: MobileApp

CustomerId: 5
OrderId: 202
OrderValue: 1500
OrderSource: WebStore

0
0

注意:如果任何查询未显示预期结果,请稍等片刻,然后重试,因为流式处理工作器可能仍在处理队列。

B. 检查 DLQ 中的有意失败:

由于 CustomerId = 99999Customers 表中没有父级,因此它应该被 Spanner 拒绝,并由 Dataflow 安全地路由到 DLQ。

  1. 在 Google Cloud 控制台中,前往 Cloud Storage
  2. 前往您的存储分区,然后打开 live-migration/dlq/severe/ 文件夹。
  3. 您应该会看到新生成的 JSON 文件。点击这些文件即可检查内容。您将看到 CustomerId = 99999 的详细信息以及具体的 Spanner 错误消息:NOT_FOUND: Parent row for row [99999,502,WebStore] in table Orders is missing. Row cannot be written."
  4. 通过运行设置了 runMode=retryDLQ 的 Dataflow 模板,可以重试实时迁移 DLQ 错误。

5. 处理 DLQ 错误

severe/ 目录中的错误需要人工干预。让我们来修正数据问题并重新处理失败的事件。

A. 修复来源中的数据

发生此错误的原因是缺少父客户记录 CustomerId = 99999。我们将其插入到源 MySQL 数据库中。

再次通过 SSH 连接到 MySQL 实例:

gcloud compute ssh mysql-physical-1 --zone=$ZONE

使用 sudo mysql 登录 MySQL,并将缺少的父行插入 shard1_db

USE shard1_db;

INSERT INTO Customers (CustomerId, CustomerName, CreditLimit, LegacyRegion) VALUES
(99999, 'DLQ Parent Holder', 5000.00, 'NORTH_AMERICA');

EXIT;

输入 exit 以返回到 Cloud Shell。

B. 运行 retryDLQ Dataflow 作业

如需重新处理 severe/ DLQ 中的事件,您需要启动相同的 Dataflow 模板,但要以 retryDLQ 模式启动。此模式专门从 deadLetterQueueDirectory/severe 路径读取数据,通过自定义转换重新运行这些数据,然后将它们应用到 Spanner。

retryDLQ 模式下启动作业:

export JOB_NAME_RETRY="mysql-sharded-cdc-retry-$(date +%Y%m%d-%H%M%S)"

gcloud dataflow flex-template run $JOB_NAME_RETRY \
  --project=$PROJECT_ID \
  --region=$REGION \
  --worker-machine-type=n2-highmem-8 \
--template-file-gcs-location="gs://dataflow-templates-${REGION}/latest/flex/Cloud_Datastream_to_Spanner" \
  --parameters \
instanceId="$SPANNER_INSTANCE_NAME",\
databaseId="$SPANNER_DATABASE_NAME",\
projectId="$PROJECT_ID",\
runMode="retryDLQ",\
deadLetterQueueDirectory="$DLQ_DIR_CDC",\
datastreamSourceType="mysql",\
transformationJarPath=$CUSTOM_JAR_PATH,\
transformationClassName="com.custom.CustomTransformationFetcher",\
schemaOverridesFilePath=$GCS_OVERRIDES_PATH,\
shardingContextFilePath=$GCS_LIVE_SHARDING_PATH

重试的关键形参变更

  • runMode="retryDLQ":告知模板从 severe DLQ 目录读取数据。
  • 移除了 gcsPubSubSubscription:由于我们不从实时 Datastream GCS 存储分区读取数据,因此不需要此参数。

监控重试流程:

与主 CDC 流水线类似,retryDLQ 也是一个流式处理流水线,在手动取消之前会一直处于 RUNNING 状态。

  1. 前往 $JOB_NAME_RETRY 的 Dataflow 作业页面。
  2. 指标窗格下,查找以下两个计数器:
  3. elementsReconsumedFromDeadLetterQueue:在提取错误文件时进行评估。
  4. Successful events:在将记录写入 Spanner 时递增。
  5. 检查 severe/ 目录是否存在反复出现的故障。
  6. 当“成功”事件的计数器递增了您要重试的商品数量(在我们的测试用例中为 1)后,请前往下一个验证步骤。

C. 验证重试数据

在重试失败的记录后(可能需要一段时间才能成功),检查 Spanner 以查看子行是否已成功迁移:

gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT CustomerId, OrderId, OrderValue, OrderSource FROM Orders WHERE CustomerId = 99999 AND OrderId = 502"

您现在应该会看到以下行:

CustomerId: 99999
OrderId: 502
OrderValue: 50
OrderSource: WebStore

另请检查 GCS 中的 $DLQ_DIR_CDC/severe/ 文件夹。处理后的文件应已移动或删除,这表示重新处理成功。

11. 设置反向复制(从 Spanner 到 MySQL)

为了应对可能需要回滚或在过渡期内使原始 MySQL 数据库与 Spanner 保持同步的场景,您可以设置反向复制。

此流水线使用 Spanner 变更数据流来捕获 Spanner 中的实时修改。然后,它会使用我们的自定义转换 JAR 来反向映射架构差异,并使用我们的自定义分片 JAR 来准确计算更新应写回哪个物理 MySQL 虚拟机和逻辑分片。

1. 创建 Spanner 变更数据流

首先,您需要在 Spanner 数据库中创建一个变更数据流,以跟踪 CustomersOrders 表中的变更。

export CHANGE_STREAM_NAME="CustomersOrdersChangeStream"

gcloud spanner databases ddl update $SPANNER_DATABASE_NAME \
  --instance=$SPANNER_INSTANCE_NAME \
  --ddl="CREATE CHANGE STREAM $CHANGE_STREAM_NAME FOR Customers, Orders"

此变更数据流现在将记录对指定表的所有数据修改。

2. 为 Dataflow 元数据创建 Spanner 数据库

Spanner to SourceDB Dataflow 模板需要单独的 Spanner 数据库来存储元数据,以便管理变更数据流使用情况。

export SPANNER_METADATA_DB_NAME="migration-metadata-db"

gcloud spanner databases create $SPANNER_METADATA_DB_NAME \
  --instance=$SPANNER_INSTANCE_NAME

3. 为 Dataflow 准备 Cloud SQL 连接配置

Dataflow 模板需要 Cloud Storage 中的一个 JSON 文件,其中包含目标 Cloud SQL 数据库的连接详细信息。

创建名为 shard_config.json 的本地文件:

cat <<EOF > reverse-sharding.json
[
  {
    "logicalShardId": "shard0_db",
    "host": "${MYSQL_IP_1}",
    "port": "3306",
    "user": "datastream_user",
    "password": "complex_password_123",
    "dbName": "shard0_db"
  },
  {
    "logicalShardId": "shard1_db",
    "host": "${MYSQL_IP_1}",
    "port": "3306",
    "user": "datastream_user",
    "password": "complex_password_123",
    "dbName": "shard1_db"
  },
  {
    "logicalShardId": "shard2_db",
    "host": "${MYSQL_IP_2}",
    "port": "3306",
    "user": "datastream_user",
    "password": "complex_password_123",
    "dbName": "shard2_db"
  },
  {
    "logicalShardId": "shard3_db",
    "host": "${MYSQL_IP_2}",
    "port": "3306",
    "user": "datastream_user",
    "password": "complex_password_123",
    "dbName": "shard3_db"
  }
]
EOF

将此文件上传到您的 GCS 存储分区:

export GCS_REVERSE_SHARDING_PATH="gs://${BUCKET_NAME}/config/reverse-sharding.json"
gcloud storage cp reverse-sharding.json $GCS_REVERSE_SHARDING_PATH

4. 运行反向复制 Dataflow 作业

使用 Spanner_to_SourceDb Flex 模板启动 Dataflow 作业。

export JOB_NAME_REVERSE="spanner-sharded-reverse-to-mysql-$(date +%Y%m%d-%H%M%S)"
export DLQ_DIR_REVERSE="gs://${BUCKET_NAME}/reverse-replication"

gcloud dataflow flex-template run $JOB_NAME_REVERSE \
  --project=$PROJECT_ID \
  --region=$REGION \
--worker-machine-type=n2-highmem-8 \
--max-workers=2 \
--num-workers=1 \
--additional-experiments=use_runner_v2 \
--template-file-gcs-location="gs://dataflow-templates-${REGION}/latest/flex/Spanner_to_SourceDb" \
  --parameters \
changeStreamName="$CHANGE_STREAM_NAME",\
instanceId="$SPANNER_INSTANCE_NAME",\
databaseId="$SPANNER_DATABASE_NAME",\
spannerProjectId="$PROJECT_ID",\
metadataInstance="$SPANNER_INSTANCE_NAME",\
metadataDatabase="$SPANNER_METADATA_DB_NAME",\
sourceShardsFilePath="$GCS_REVERSE_SHARDING_PATH",\
transformationJarPath=$CUSTOM_JAR_PATH,\
transformationClassName="com.custom.CustomTransformationFetcher",\
shardingCustomJarPath=$CUSTOM_JAR_PATH,\
shardingCustomClassName="com.custom.CustomShardIdFetcher",\
schemaOverridesFilePath=$GCS_OVERRIDES_PATH,\
deadLetterQueueDirectory=$DLQ_DIR_REVERSE

关键形参

  • changeStreamName:要从中读取数据的 Spanner 变更数据流的名称。
  • metadataInstance, metadataDatabase:用于存储元数据的 Spanner 实例/数据库,连接器使用这些元数据来控制变更数据流 API 数据的使用。
  • sourceShardsFilePathshard_config.json 的 GCS 路径。
  • filtrationMode:指定如何根据条件舍弃某些记录。默认值为 forward_migration(过滤使用正向迁移流水线写入的记录)
  • shardingCustomJarPath:我们之前构建的已编译 Java JAR 文件的 GCS 路径。
  • shardingCustomClassName:执行自定义 %4 取模数学运算以动态确定哪个逻辑分片应接收记录的完全限定类名称 (com.custom.CustomShardIdFetcher)。

网络注意事项:Dataflow 工作器将使用 shard_config.json 中指定的公共 IP 连接到 Cloud SQL 实例。由于 Cloud SQL 实例的“已获授权的网络”中存在 0.0.0.0/0 条目,因此允许此连接。

Dataflow 作业控制台中监控作业启动情况。

5. 注入 Spanner 数据并触发有意为之的故障

等待 Dataflow 作业进入 Running 状态(这可能需要大约 5 分钟)。然后,我们直接在 Spanner 中执行全套查询(INSERTUPDATEDELETE),并故意让其中一个查询失败,以测试反向 DLQ。

在 Cloud Shell 中运行下面的命令:

# All these operations are done on rows mapping to shard0_db for convenience

# Valid INSERT: Insert parent row in Customers
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="INSERT INTO Customers (CustomerId, CustomerName, CreditLimit, LoyaltyTier) VALUES (88, 'Reverse Tester', 5000, 'GOLD_TIER')"

# 1. Valid INSERT (Orders): 'WebStore' transformed to 'WebStore_v1'
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="INSERT INTO Orders (CustomerId, OrderId, OrderValue, OrderSource) VALUES (88, 9001, 150.00, 'WebStore')"

# 2. Valid UPDATE (Orders)
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="UPDATE Orders SET OrderValue = 200.00 WHERE CustomerId = 16 AND OrderId = 105 AND OrderSource = 'Partner'"

# 3. Valid DELETE (Orders)
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="DELETE FROM Orders WHERE CustomerId = 12 AND OrderId = 104 AND OrderSource = 'WebStore'"

# 4. INVALID Insert- DLQ Test: CreditLimit=500 will fail check constraint of >1000 at source 
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="INSERT INTO Customers (CustomerId, CustomerName, CreditLimit, LoyaltyTier) VALUES (44, 'DLQ Test Customer', 500, 'GOLD_TIER')"

6. 验证反向复制数据并检查 DLQ

我们来确认一下,自定义分片 JAR 是否已成功将 CustomerId 88 路由到第一个物理虚拟机上的 shard0_db,以及自定义转换 JAR 是否已成功从区域中剥离 "_TIER"

A. 验证 MySQL 中的有效记录:

通过 SSH 连接到第一个物理分片:

gcloud compute ssh mysql-physical-1 --zone=$ZONE

登录 MySQL 并查询 shard0_db

sudo mysql
USE shard0_db;

-- 1. Verify INSERT: Row migrated with transformed LegacyOrderSystem
SELECT CustomerId, OrderId, OrderValue, LegacyOrderSystem 
FROM Orders 
WHERE CustomerId = 88 AND OrderId = 9001;

-- 2. Verify UPDATE: The OrderValue should now be updated to 200.00.
SELECT CustomerId, OrderId, OrderValue, LegacyOrderSystem 
FROM Orders 
WHERE CustomerId = 16 AND OrderId = 105;

-- 3. Verify DELETE: Returns 0 rows, confirming the order was successfully deleted from MySQL.
SELECT CustomerId, OrderId 
FROM Orders 
WHERE CustomerId = 12 AND OrderId = 104;

-- 4. Verify failed replication - this should be in DLQ as CreditLimit < 1000 and will fail stricter check constraint at source 
SELECT CustomerId, CustomerName, CreditLimit, LegacyRegion
FROM Customers
WHERE CustomerId = 44;

EXIT;

Cloud SQL 中的预期输出应反映在 Spanner 中所做的更改。

+------------+---------+------------+-------------------+
| CustomerId | OrderId | OrderValue | LegacyOrderSystem |
+------------+---------+------------+-------------------+
|         88 |    9001 |     150.00 | Webstore_v1       |
+------------+---------+------------+-------------------+

+------------+---------+------------+-------------------+
| CustomerId | OrderId | OrderValue | LegacyOrderSystem |
+------------+---------+------------+-------------------+
|         16 |     105 |     200.00 | Partner_v1        |
+------------+---------+------------+-------------------+

Empty set (0.00 sec)
Empty set (0.00 sec)

类型

exit

返回到 Cloud Shell。

这会确认反向复制流水线是否正常运行,是否能将 Spanner 中的更改同步回 Cloud SQL。

B. 检查 DLQ 中的有意失败

由于我们的新 Customers 记录的 CreditLimit 为 500(违反了我们在源 MySQL 数据库中定义的严格 > 1000 检查约束),Dataflow 安全地捕获了该错误。

  1. 在 Google Cloud 控制台中,前往 Cloud Storage
  2. 前往您的存储分区,然后打开 dlq/severe/ 文件夹。
  3. 打开 JSON 文件,查看被拒绝的 Customers 记录和确切的检查约束违规错误。
  4. 可以通过运行设置了 runMode=retryDLQ 的 Dataflow 模板来重试反向复制 DLQ 错误。

12. 清理资源

为避免您的 Google Cloud 账号产生进一步费用,请删除在此 Codelab 中创建的资源。

设置环境变量(如果需要)

如果您的 Cloud Shell 会话超时或您打开了新终端,则需要在运行清理命令之前重新导出环境变量。

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1" # Or your preferred region
export ZONE="us-central1-a" # Or a zone within your selected region
export SPANNER_INSTANCE_NAME="target-spanner-instance"
export SPANNER_DATABASE_NAME="sharded-target-db"
export SPANNER_CONFIG="regional-${REGION}"
export BUCKET_NAME="migration-${PROJECT_ID}-bucket"
export MYSQL_IP_1=$(gcloud compute instances describe mysql-physical-1 --zone=$ZONE --format='get(networkInterfaces[0].accessConfigs[0].natIP)')
export MYSQL_IP_2=$(gcloud compute instances describe mysql-physical-2 --zone=$ZONE --format='get(networkInterfaces[0].accessConfigs[0].natIP)')
export SQL_CP_NAME_1="mysql-src-cp-1"
export SQL_CP_NAME_2="mysql-src-cp-2"
export GCS_CP_NAME="gcs-dest-cp"
export STREAM_NAME_1="mysql-to-spanner-stream-1"
export GCS_STREAM_PATH_1="data/${STREAM_NAME_1}"
export STREAM_NAME_2="mysql-to-spanner-stream-2"
export GCS_STREAM_PATH_2="data/${STREAM_NAME_2}"
export PUBSUB_TOPIC="datastream-gcs-updates"
export PUBSUB_SUBSCRIPTION="datastream-gcs-sub"
export CUSTOM_JAR_PATH="gs://${BUCKET_NAME}/custom-logic/spanner-custom-shard-1.0.jar"
export OVERRIDES_FILE="spanner_overrides.json" 
export GCS_OVERRIDES_PATH="gs://${BUCKET_NAME}/config/${OVERRIDES_FILE}"
export GCS_SHARDING_PATH="gs://${BUCKET_NAME}/config/sharding.json"
export OUTPUT_DIR="gs://${BUCKET_NAME}/bulk-migration"
export GCS_LIVE_SHARDING_PATH="gs://${BUCKET_NAME}/config/live-sharding.json"
export DLQ_DIR_CDC="gs://${BUCKET_NAME}/live-migration"
export CHANGE_STREAM_NAME="CustomersOrdersChangeStream"
export SPANNER_METADATA_DB_NAME="migration-metadata-db"
export GCS_REVERSE_SHARDING_PATH="gs://${BUCKET_NAME}/config/reverse-sharding.json"
export DLQ_DIR_REVERSE="gs://${BUCKET_NAME}/reverse-replication"

停止 Dataflow 流式处理作业

列出您的作业,以查找正在运行的 Dataflow 作业的作业 ID。相应地导出 JOB_ID_CDCJOB_ID_REVERSE

gcloud dataflow jobs list --region=$REGION --filter="state=Running"
export JOB_ID_CDC=<PASTE_JOB_ID_HERE>
export JOB_ID_CDC_RETRY=<PASTE_JOB_ID_HERE>
export JOB_ID_REVERSE=<PASTE_JOB_ID_HERE>

取消 Datastream to Spanner(实时迁移)作业及其重试作业:

gcloud dataflow jobs cancel $JOB_ID_CDC --region=$REGION --project=$PROJECT_ID

gcloud dataflow jobs cancel $JOB_ID_CDC_RETRY --region=$REGION --project=$PROJECT_ID

取消 Spanner to Cloud SQL(反向复制)作业:

gcloud dataflow jobs cancel $JOB_ID_REVERSE --region=$REGION --project=$PROJECT_ID

删除 Datastream 资源

停止并删除数据流:

gcloud datastream streams update $STREAM_NAME_1 \
  --location=$REGION --state=PAUSED --project=$PROJECT_ID
# Wait a moment for the stream to pause
gcloud datastream streams delete $STREAM_NAME_1 \
  --location=$REGION --project=$PROJECT_ID --quiet

gcloud datastream streams update $STREAM_NAME_2 \
  --location=$REGION --state=PAUSED --project=$PROJECT_ID
# Wait a moment for the stream to pause
gcloud datastream streams delete $STREAM_NAME_2 \
  --location=$REGION --project=$PROJECT_ID --quiet

# Delete Connection Profiles
gcloud datastream connection-profiles delete $SQL_CP_NAME_1 \
  --location=$REGION --project=$PROJECT_ID --quiet
gcloud datastream connection-profiles delete $SQL_CP_NAME_2 \
  --location=$REGION --project=$PROJECT_ID --quiet
gcloud datastream connection-profiles delete $GCS_CP_NAME \
  --location=$REGION --project=$PROJECT_ID --quiet

删除源 MySQL 虚拟机 (Compute Engine)

删除模拟本地 MySQL 物理分片的两个 Compute Engine 实例。

gcloud compute instances delete mysql-physical-1 mysql-physical-2 --zone=$ZONE --quiet

删除防火墙规则

移除为允许 SSH 访问和 Datastream 连接到虚拟机而创建的网络防火墙规则。(注意:如果您在 Codelab 前面的步骤中为防火墙规则使用了其他名称,请在此处进行调整)。

gcloud compute firewall-rules delete allow-ssh-iap --quiet
gcloud compute firewall-rules delete allow-mysql-datastream --quiet

删除 Pub/Sub 资源

删除订阅:

gcloud pubsub subscriptions delete $PUBSUB_SUBSCRIPTION \
  --project=$PROJECT_ID --quiet

删除主题:

gcloud pubsub topics delete $PUBSUB_TOPIC \
  --project=$PROJECT_ID --quiet

删除 Cloud Spanner 实例

删除 Cloud Spanner 实例(这会自动删除其中的 sharded-target-dbmigration-metadata-db 数据库)。

gcloud spanner instances delete $SPANNER_INSTANCE_NAME \
  --project=$PROJECT_ID --quiet

删除 GCS 存储分区和内容

最后,删除包含 Datastream 文件、Dataflow 配置和 Dead Letter Queue 的 Cloud Storage 存储分区。rm -r 命令会以递归方式删除存储分区及其所有内容。

gcloud storage rm --recursive gs://${BUCKET_NAME}

删除本地 Cloud Shell 文件

如需清理在此 Codelab 期间在 Cloud Shell 中生成的本地文件和目录,请运行以下命令:

# Remove the JSON configuration files
rm -f sharding.json live-sharding.json reverse-sharding.json spanner_overrides.json

# Remove the cloned Google Cloud DataflowTemplates repository
rm -rf DataflowTemplates