1. 事前準備
本程式碼研究室將逐步說明如何將 Cloud SQL 上的單一 MySQL 資料庫,遷移至採用 GoogleSQL 方言的 Cloud Spanner 資料庫。重點在於基本的端對端遷移流程,示範核心步驟。您將使用 Google Cloud 服務,包括 Spanner 移轉工具 (SMT)、Dataflow、Datastream、PubSub 和 Google Cloud Storage。
報告內容:
- 如何設定 Cloud SQL 和 Cloud Spanner 範例執行個體。
- 說明如何使用 Spanner 遷移工具 (SMT),將 Cloud SQL MySQL 結構定義轉換為與 Spanner 相容的結構定義。
- 說明如何使用 Dataflow,從 Cloud SQL 大量遷移資料至 Cloud Spanner。
- 瞭解如何使用 Datastream 和 Dataflow,從 Cloud SQL 設定持續性複寫 (CDC) 至 Cloud Spanner。
- 如何設定從 Cloud Spanner 到 Cloud SQL 的反向複製。
本程式碼研究室不會涵蓋以下主題:
- 從分片執行個體遷移。
- 遷移期間的複雜資料轉換。
- 進階錯誤處理或無效信件佇列 (DLQ)。
- 調整遷移作業效能。
- 應用程式遷移:本程式碼研究室著重於資料庫層 (結構定義和資料)。不包含重新部署或遷移應用程式服務的作業程序。
軟硬體需求
- 已啟用計費功能的 Google Cloud 專案。
- 具備足夠的 IAM 權限,可啟用 API,以及建立/管理 Cloud SQL、Spanner、Dataflow、Datastream 和 GCS 資源。雖然專案
Owner角色最適合程式碼研究室,但「環境設定」會涵蓋更具體的角色。 - 網路瀏覽器,例如 Google Chrome。
- 基本熟悉 Google Cloud 控制台和
gcloud等指令列工具。 - 存取 Shell 環境。建議使用 Cloud Shell,因為當中已包含
gcloud。
如要進一步瞭解上述設定,請參閱「環境設定」一節。
2. 瞭解遷移程序
遷移資料庫時,需要將來源 Cloud SQL 資料庫執行個體的資料遷移至 Spanner 執行個體。本節將說明遷移作業所用的架構和重要工具。
遷移流程架構
遷移程序包括以下階段:
1. 結構定義轉換:
- 用途:將來源資料庫結構定義轉換為相容的 Cloud Spanner 結構定義。
- 工具:Spanner 遷移工具 (SMT)
- 程序:SMT 會分析來源資料庫結構定義,並產生對等的 Spanner 資料定義語言 (DDL)。系統會在目標 Spanner 執行個體中建立資料庫,然後自動套用 DDL。
2. 大量資料遷移:
- 用途:從來源資料庫將現有資料完整載入至已佈建的 Spanner 資料表。
- 工具:Dataflow,使用 Google 提供的
Sourcedb to Spanner範本。 - 程序:這項 Dataflow 工作會讀取指定來源資料表的所有資料,並寫入對應的 Spanner 資料表。這項操作會在建立 Spanner 結構定義後進行。
3. 即時遷移 (CDC):
- 用途:擷取來源資料庫的持續變更,並以近乎即時的方式套用至 Cloud Spanner,盡量減少遷移期間的停機時間。
- 工具:
- Datastream:從來源資料庫擷取變更 (插入、更新、刪除),然後寫入 Cloud Storage (GCS)。
- Dataflow:使用
Datastream to Spanner範本從 GCS 讀取變更事件,並套用至 Cloud Spanner。
4. 反向複製:
- 用途:將 Cloud Spanner 的資料變更複製回來源資料庫。這項功能可用於備援策略、分階段遷移,或在來源中維護副本,以因應特定用途。
- 工具:Dataflow,使用
Spanner to SourceDb範本。 - 處理程序:這項工作會使用 Spanner 變更串流擷取 Spanner 中的修改內容,並將這些內容寫回來源資料庫執行個體。
下圖說明元件和資料流程:

重要術語:
- Spanner 遷移工具 (SMT):這項工具可用於評估 MySQL 結構定義、建議對應的 Spanner 結構定義,以及產生 Spanner 資料定義語言 (DDL)。
- 資料定義語言 (DDL):用於定義及修改資料庫結構的陳述式,例如
CREATE TABLE陳述式。SMT 會根據 Cloud SQL 結構定義產生 Spanner DDL。 - Dataflow:全代管的無伺服器資料處理服務。在本程式碼研究室中,這項工具用於執行 Google 提供的範本,以進行大量資料移轉、套用 Datastream 變更,以及反向複製。
- Datastream:無伺服器變更資料擷取 (CDC) 和複製服務。在本程式碼研究室中,這項服務用於將 Cloud SQL 的變更串流至 Cloud Storage。
- Spanner 變更串流:Spanner 功能,可即時串流輸出資料變更 (插入、更新、刪除),做為反向複製的來源。
- Pub/Sub:訊息服務,用於分離產生事件的服務與處理事件的服務。在本程式碼研究室中,每當 Datastream 將新的變更檔案上傳至 Cloud Storage 時,就會觸發 Dataflow 處理更新。
3. 環境設定
開始遷移前,您需要設定 Google Cloud 專案並啟用必要服務。
1. 選取或建立 Google Cloud 專案
您必須擁有已啟用計費功能的 Google Cloud 專案,才能使用本程式碼研究室中的服務。
- 在 Google Cloud 控制台中,前往專案選取器頁面:前往專案選取器
- 選取或建立 Google Cloud 專案。
- 請確認您已為專案啟用計費功能。瞭解如何確認專案已啟用計費功能。
2. 開啟 Cloud Shell
Cloud Shell 是在 Google Cloud 中運作的指令列環境,已預先載入 gcloud CLI 和其他必要工具。
- 按一下 Google Cloud 控制台右上角的「啟用 Cloud Shell」按鈕。
- 系統會在主控台底部的新頁框中開啟 Cloud Shell 工作階段,並顯示指令列提示。

3. 設定專案和環境變數
在 Cloud Shell 中,為專案 ID 和您要使用的區域設定一些環境變數。
export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1" # Or your preferred region
export ZONE="us-central1-a" # Or a zone within your selected region
gcloud config set project $PROJECT_ID
gcloud config set compute/region $REGION
gcloud config set compute/zone $ZONE
echo "Project ID: $PROJECT_ID"
echo "Region: $REGION"
echo "Zone: $ZONE"
4. 啟用必要的 Google Cloud API
啟用 Cloud Spanner、Dataflow、Datastream 和其他相關服務所需的 API。
gcloud services enable \
spanner.googleapis.com \
dataflow.googleapis.com \
datastream.googleapis.com \
pubsub.googleapis.com \
storage.googleapis.com \
compute.googleapis.com \
sqladmin.googleapis.com \
servicenetworking.googleapis.com \
cloudresourcemanager.googleapis.com
這個指令可能需要幾分鐘才能完成。
5. 設定服務帳戶權限
Dataflow 工作和 Datastream 需要特定權限,才能與其他 Google Cloud 服務互動。本程式碼研究室中的 Dataflow 工作會使用預設的 Compute Engine 服務帳戶。
首先,請取得專案編號:
export PROJECT_NUMBER=$(gcloud projects describe $PROJECT_ID --format="value(projectNumber)")
export SA_EMAIL="${PROJECT_NUMBER}-compute@developer.gserviceaccount.com"
現在,請將必要的 IAM 角色授予 Compute Engine 預設服務帳戶:
# Role for Dataflow to run jobs
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SA_EMAIL}" \
--role="roles/dataflow.admin" \
--condition=None
# Roles for Dataflow workers
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SA_EMAIL}" \
--role="roles/dataflow.worker" \
--condition=None
# Role to connect to Cloud SQL instance
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SA_EMAIL}" \
--role="roles/cloudsql.client" \
--condition=None
# Role to read/write from Cloud Spanner
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SA_EMAIL}" \
--role="roles/spanner.databaseUser" \
--condition=None
# Role to access GCS buckets (Datastream output, Dataflow temp, JDBC driver)
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SA_EMAIL}" \
--role="roles/storage.objectAdmin" \
--condition=None
# Roles for Datastream and Pub/Sub (for CDC)
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SA_EMAIL}" \
--role="roles/datastream.viewer"
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SA_EMAIL}" \
--role="roles/pubsub.subscriber"
6. 建立 Cloud Storage bucket
在與其他資源相同的地區中建立 GCS 值區。這個值區會儲存 JDBC 驅動程式和 Datastream 輸出內容,並供 Dataflow 儲存暫存檔案。
export BUCKET_NAME="migration-${PROJECT_ID}-bucket"
gcloud storage buckets create gs://$BUCKET_NAME --location=$REGION
echo "Created bucket: gs://$BUCKET_NAME"
7. 安裝 Spanner 遷移工具 (SMT)
確認 Spanner 遷移工具 (SMT) 已安裝在 Cloud Shell 環境中。
sudo apt-get update && sudo apt-get install google-cloud-cli-spanner-migration-tool
# Verify installation
gcloud alpha spanner migrate web --help
這項指令應會顯示 SMT 網頁介面的說明資訊,確認 gcloud 元件已安裝。本程式碼研究室將使用 SMT 的 CLI 功能,這些功能與 SMT 屬於同一元件。
4. 設定來源 Cloud SQL 資料庫
在本節中,您將建立及設定具有公開 IP 的 MySQL 適用的 Cloud SQL 執行個體,做為來源資料庫。
1. 建立 MySQL 適用的 Cloud SQL 執行個體
在 Cloud Shell 中執行下列 gcloud 指令,建立 MySQL 8.0 執行個體。已啟用二進位記錄 (Datastream 必須啟用),且執行個體已設定公開 IP。
export SQL_INSTANCE_NAME="source-mysql-instance"
export DB_ROOT_PASSWORD="Welcome@1" # Replace with a strong password if you prefer
gcloud sql instances create $SQL_INSTANCE_NAME \
--database-version=MYSQL_8_0 \
--tier=db-n1-standard-2 \
--region=$REGION \
--root-password=$DB_ROOT_PASSWORD \
--enable-bin-log \
--assign-ip
--enable-bin-log:Datastream 擷取變更時必須使用這項設定。--assign-ip:確保執行個體取得公開 IP 位址。
執行個體建立作業需要幾分鐘才能完成。您可以在 Cloud SQL 執行個體頁面檢查執行個體是否已建立。
2. 設定授權網路
如要透過公開 IP 連線至執行個體,請將 IP 位址新增至「授權網路」清單。
取得 Cloud Shell IP:
export CLOUD_SHELL_IP=$(curl -s ipinfo.io/ip)
echo "Your Cloud Shell IP: $CLOUD_SHELL_IP"
授權 Cloud Shell IP 和開放存取權
下列指令會新增 Cloud Shell IP。並新增 0.0.0.0/0,允許透過任何 IP 位址存取。這樣做是為了簡化 Dataflow worker 的連線,不必進行複雜的網路設定。
gcloud sql instances patch $SQL_INSTANCE_NAME \
--authorized-networks="${CLOUD_SHELL_IP}/32,0.0.0.0/0"
3. 從 Cloud Shell 連線至 Cloud SQL 執行個體
擷取指派的公開 IP 位址
export SQL_INSTANCE_IP=$(gcloud sql instances list --filter="name=$SQL_INSTANCE_NAME" --format="value(PRIMARY_ADDRESS)")
echo "Cloud SQL Public IP: $SQL_INSTANCE_IP"
這個 IP 位址將用於連線。
從 Cloud Shell 連線至 Cloud SQL 執行個體
使用標準 mysql 用戶端連線,並使用取得的公開 IP 位址:
mysql -h $SQL_INSTANCE_IP -u root -p
出現提示時,請輸入您設定的根密碼 (Welcome@1)。現在您會看到 mysql> 提示。
4. 建立資料庫和範例資料
在 mysql> 提示字元中執行下列 SQL 指令:
CREATE DATABASE music_db;
USE music_db;
CREATE TABLE Singers (
SingerId BIGINT NOT NULL,
FirstName VARCHAR(1024),
LastName VARCHAR(1024),
BirthDate DATE,
AlbumCount BIGINT,
PRIMARY KEY (SingerId)
);
CREATE TABLE Albums (
SingerId BIGINT NOT NULL,
AlbumId BIGINT NOT NULL,
AlbumTitle VARCHAR(1024),
ReleaseDate DATE,
PRIMARY KEY (SingerId, AlbumId),
CONSTRAINT FK_Albums_Singers FOREIGN KEY (SingerId) REFERENCES Singers (SingerId)
);
INSERT INTO Singers (SingerId, FirstName, LastName, BirthDate, AlbumCount) VALUES
(1, 'Marc', 'Richards', '1970-09-03', 2),
(2, 'Catalina', 'Smith', '1990-08-17', 1),
(3, 'Alice', 'Trentor', '1991-10-02', 3);
INSERT INTO Albums (SingerId, AlbumId, AlbumTitle, ReleaseDate) VALUES
(1, 1, 'Total Junk', '2014-03-15'),
(1, 2, 'Go Go Go', '2016-11-01'),
(2, 1, 'Green', '2018-02-28'),
(3, 1, 'Blue', '2019-01-10'),
(3, 2, 'Red', '2020-05-22'),
(3, 3, 'Purple', '2022-11-11');
如要查看上述架構的傾印檔案,請按這裡。
5. 驗證資料
快速檢查資料是否已存在:
SELECT 'Singers music_db' as tbl, COUNT(*) FROM music_db.Singers
UNION ALL
SELECT 'Albums music_db', COUNT(*) FROM music_db.Albums;
EXIT;
畫面上應該會顯示每個表格的計數。
+------------------+----------+ | tbl | COUNT(*) | +------------------+----------+ | Singers music_db | 3 | | Albums music_db | 6 | +------------------+----------+
5. 設定 Cloud Spanner
接下來,請設定要遷移資料的目標 Cloud Spanner 執行個體。
1. 建立 Cloud Spanner 執行個體
在與 Cloud SQL 執行個體相同的區域中建立 Cloud Spanner 執行個體。這個指令會建立適合本程式碼研究室的小型執行個體,並使用 100 個處理單元。
export SPANNER_INSTANCE_NAME="target-spanner-instance"
export SPANNER_DATABASE_NAME="music-db-migrated"
export SPANNER_CONFIG="regional-${REGION}"
gcloud spanner instances create $SPANNER_INSTANCE_NAME \
--config=$SPANNER_CONFIG \
--description="Target Spanner Instance" \
--processing-units=100
建立執行個體可能需要一到兩分鐘。
6. 使用 Spanner 遷移工具 (SMT) 轉換結構定義
使用 SMT CLI 分析 MySQL 資料庫 (music_db),並產生 Spanner 結構定義語言 (DDL)。由於 Cloud SQL 執行個體已設定公開 IP 和適當的授權網路,因此 SMT 可以直接連線。
1. 準備 SMT 環境
確認先前步驟中設定的必要環境變數:
echo "Cloud SQL Instance Public IP: $SQL_INSTANCE_IP"
echo "Cloud SQL Root Password: $DB_ROOT_PASSWORD"
echo "Spanner Instance: $SPANNER_INSTANCE_NAME"
echo "Spanner Database: $SPANNER_DATABASE_NAME"
echo "Project ID: $PROJECT_ID"
2. 為 music_db 執行結構定義轉換
執行 SMT schema 指令,直接連線至 Cloud SQL 公開 IP 位址:
gcloud alpha spanner migrate schema \
--source=mysql \
--source-profile="host=${SQL_INSTANCE_IP},port=3306,user=root,password=${DB_ROOT_PASSWORD},dbName=music_db" \
--target-profile="project=${PROJECT_ID},instance=${SPANNER_INSTANCE_NAME},dbName=${SPANNER_DATABASE_NAME}" \
--prefix="music-db"
這項指令會透過 Proxy 連線至 Cloud SQL 執行個體,並產生以 music-db 為前置字元的結構定義檔案。
3. 查看產生的檔案
SMT 會在目前的目錄中建立幾個檔案。主要項目如下:
music-db.schema.ddl.txt:產生的 Spanner DDL 陳述式。music-db-.overrides.json:包含手動對應變更的結構定義覆寫檔案。music-db.session.json:結構定義遷移作業的階段檔案。music-db.report.txt:結構定義轉換的評估報告。
你可以使用 ls music-db-* 列出這些項目
4. 在 Cloud Spanner 中驗證結構定義
確認資料表已在 Spanner 資料庫中建立。
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
--instance=$SPANNER_INSTANCE_NAME \
--sql="SELECT table_name FROM information_schema.tables WHERE table_schema = '' ORDER BY table_name"
您應該會看到以下的輸出內容:
table_name: Albums table_name: Singers
選用:如要檢查 Spanner DDL,請執行下列指令:
gcloud spanner databases ddl describe $SPANNER_DATABASE_NAME \
--instance=$SPANNER_INSTANCE_NAME
7. 初始化變更資料擷取 (CDC)
在本節中,您將為遷移作業設定「記錄器」。在大量載入資料前設定 Datastream 和 Pub/Sub,可確保系統擷取並排隊處理來源資料庫的所有變更,避免遷移期間發生資料遺失。這是即時遷移的必要設定。
1. 建立 Datastream 連線設定檔
來源設定檔 (Cloud SQL)
這個設定檔會連線至 Cloud SQL 執行個體的公開 IP。Datastream 會使用 IP 許可清單進行連線。
export SQL_CP_NAME="mysql-src-cp"
gcloud datastream connection-profiles create $SQL_CP_NAME \
--location=$REGION \
--type=mysql \
--mysql-hostname=$SQL_INSTANCE_IP \
--mysql-port=3306 \
--mysql-username=root \
--mysql-password=$DB_ROOT_PASSWORD \
--display-name="Cloud SQL Source - Public IP"
注意:這項連線取決於 Cloud SQL 執行個體的授權網路是否允許存取。如先前使用 0.0.0.0/0 設定,Datastream 的公開 IP 即可連線。在實際工作環境中,您會將 0.0.0.0/0 換成 Datastream IP 許可清單和區域中列出的區域專屬 IP 範圍。
目的地設定檔 (Cloud Storage)
指向 bucket 的根目錄。
export GCS_CP_NAME="gcs-dest-cp"
gcloud datastream connection-profiles create $GCS_CP_NAME \
--location=$REGION \
--type=google-cloud-storage \
--bucket=$BUCKET_NAME \
--root-path=/ \
--display-name="GCS Destination" --force
2. 建立 Datastream 串流
建立要從 music_db 複製的串流。
export STREAM_NAME="mysql-to-spanner-stream"
export GCS_STREAM_PATH="data/${STREAM_NAME}"
gcloud datastream streams create $STREAM_NAME \
--location=$REGION \
--display-name="MySQL to Spanner CDC Stream" \
--source=$SQL_CP_NAME \
--destination=$GCS_CP_NAME \
--mysql-source-config=<(echo "
includeObjects:
mysqlDatabases:
- database: 'music_db'
") \
--gcs-destination-config=<(echo "
path: ${GCS_STREAM_PATH}
fileRotationMb: 5
fileRotationInterval: 15s
avroFileFormat: {}
") \
--backfill-none
- 資料串流會將檔案寫入
gs://${BUCKET_NAME}/${GCS_STREAM_PREFIX}/底下 - Datastream 會以 Avro 格式寫入檔案。執行即時遷移指令時,我們會將 inputFileFormat 指定為 avro,讓管道正確處理檔案。
- 使用較小的檔案輪替設定,有助於在程式碼研究室中更快看到變化。
這個指令可能需要一段時間才能完成。查看狀態:gcloud datastream streams describe $STREAM_NAME --location=$REGION。
3. 啟動 Datastream 串流
gcloud datastream streams update $STREAM_NAME \
--location=$REGION \
--state=RUNNING
檢查狀態:gcloud datastream streams describe $STREAM_NAME --location=$REGION.狀態一開始會是 STARTING,過一段時間後會變成 RUNNING。確認處於 RUNNING 狀態後,再繼續執行下一步。
4. 為 GCS 通知設定 Pub/Sub
建立 Pub/Sub 主題:
export PUBSUB_TOPIC="datastream-gcs-updates"
gcloud pubsub topics create $PUBSUB_TOPIC
建立 GCS 通知
在 data/ 前置字串下建立物件時傳送通知。
gcloud storage buckets notifications create gs://${BUCKET_NAME} --topic=projects/$PROJECT_ID/topics/$PUBSUB_TOPIC --payload-format=json --object-prefix=data/
建立 Pub/Sub 訂閱項目
加入建議的確認期限。
export PUBSUB_SUBSCRIPTION="datastream-gcs-sub"
gcloud pubsub subscriptions create $PUBSUB_SUBSCRIPTION \
--topic=$PUBSUB_TOPIC \
--ack-deadline=600
8. 從 Cloud SQL 大量遷移資料至 Spanner
Spanner 結構定義就位後,現在要將現有資料從 Cloud SQL music_db 資料庫複製到 Cloud Spanner。您將使用 Sourcedb to Spanner Dataflow 彈性範本,這個範本專為從 JDBC 可存取的資料庫大量複製資料到 Spanner 而設計。
1. 為 music_db 執行大量遷移 Dataflow 工作
在 Cloud Shell 中執行下列指令,啟動 Dataflow 工作。這項指令會使用 gcloud dataflow flex-template run 指令,參照 Google 提供的範本,大量將 JDBC 遷移至 Spanner。
export JOB_NAME_MUSIC="mysql-music-db-to-spanner-bulk-$(date +%Y%m%d-%H%M%S)"
export MUSIC_DB_JDBC_URL="jdbc:mysql://${SQL_INSTANCE_IP}:3306/music_db"
export OUTPUT_DIR="gs://${BUCKET_NAME}/bulk-migration-output"
gcloud dataflow flex-template run $JOB_NAME_MUSIC \
--project=$PROJECT_ID \
--region=$REGION \
--template-file-gcs-location="gs://dataflow-templates-${REGION}/latest/flex/Sourcedb_to_Spanner_Flex" \
--max-workers=2 \
--num-workers=1 \
--worker-machine-type=n2-highmem-8 \
--parameters \
sourceConfigURL="$MUSIC_DB_JDBC_URL",\
instanceId="$SPANNER_INSTANCE_NAME",\
databaseId="$SPANNER_DATABASE_NAME",\
projectId="$PROJECT_ID",\
outputDirectory="$OUTPUT_DIR/music_db",\
username="root",\
password="$DB_ROOT_PASSWORD",\
jdbcDriverClassName="com.mysql.cj.jdbc.Driver",\
jdbcDriverJars="gs://${BUCKET_NAME}/lib/mysql-connector-j-8.0.33.jar",\
spannerHost="https://batch-spanner.googleapis.com"
重要參數說明:
sourceConfigURL:來源music_db的 JDBC 連線字串。instanceId、databaseId、projectId:指定目標 Cloud Spanner 執行個體和資料庫。outputDirectory:Dataflow 會將無法遷移的記錄相關資訊寫入這個 Cloud Storage 路徑。jdbcDriverClassName:指定 MySQL JDBC 驅動程式。jdbcDriverJars:暫存 JDBC 驅動程式 JAR 的 GCS 路徑。spannerHost:使用 Spanner 寫入作業的批次最佳化端點。maxWorkers、numWorkers:控制 Dataflow 工作的縮放。這個小型資料集會維持低值。
網路注意事項:這項作業會透過公開 IP 連線至 Cloud SQL 執行個體。這是因為您先前已將 0.0.0.0/0 新增至執行個體的已授權網路。這樣一來,具有外部 IP 的 Dataflow 工作人員 VM 就能連線至資料庫。
2. 監控 Dataflow 工作
您可以在 Google Cloud 控制台中追蹤工作進度:
- 前往 Dataflow 的「Jobs」(工作) 頁面:前往 Dataflow 的「Jobs」(工作) 頁面
- 找出名為
mysql-music-db-to-spanner-bulk-...的工作,然後按一下。 - 觀察工作圖表和指標。等待工作狀態變更為「Succeeded」。這項作業約需 5 至 15 分鐘。

- 如果工作發生問題,請查看 Dataflow 工作詳細資料頁面中的「記錄」分頁,瞭解錯誤訊息。
- 「工作指標」會提供工作進度和資源耗用量 (例如輸送量和 CPU 使用率) 的相關資訊。
3. 驗證 Cloud Spanner 中的資料
Dataflow 工作順利完成後,請確認資料已複製到 Spanner 資料表。使用 gcloud 查詢 Spanner 資料庫:
# Verify row counts
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME --instance=$SPANNER_INSTANCE_NAME --sql="SELECT COUNT(*) as row_count FROM Singers"
# Expected output: 3
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME --instance=$SPANNER_INSTANCE_NAME --sql="SELECT COUNT(*) as row_count FROM Albums"
# Expected output: 6
# Inspect some data
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME --instance=$SPANNER_INSTANCE_NAME --sql="SELECT SingerId, FirstName, LastName FROM Singers ORDER BY SingerId"
預期輸出內容:
row_count: 3 row_count: 6 SingerId: 1 FirstName: Marc LastName: Richards SingerId: 2 FirstName: Catalina LastName: Smith SingerId: 3 FirstName: Alice LastName: Trentor
從 Cloud SQL 大量載入資料至 Cloud Spanner 的作業已完成。下一步是設定即時複製功能,擷取持續變更。
9. 開始即時遷移 (CDC)
大量資料載入作業完成後,您將使用 Datastream 設定持續複製串流,從 Cloud SQL 擷取變更資料擷取 (CDC) 事件,並使用 Dataflow 串流工作,近乎即時地將這些變更套用至 Cloud Spanner。
1. 執行 Live Migration Dataflow 工作
啟動串流 Dataflow 工作,從 GCS 讀取資料並寫入 Spanner。這個範本會使用 GCS Pub/Sub 通知,即時處理新檔案。
export JOB_NAME_CDC="datastream-to-spanner-cdc-$(date +%Y%m%d-%H%M%S)"
export DLQ_DIR="gs://${BUCKET_NAME}/dlq"
gcloud dataflow flex-template run $JOB_NAME_CDC \
--project=$PROJECT_ID \
--region=$REGION \
--worker-machine-type=n2-highmem-8 \
--template-file-gcs-location="gs://dataflow-templates-${REGION}/latest/flex/Cloud_Datastream_to_Spanner" \
--parameters \
gcsPubSubSubscription="projects/${PROJECT_ID}/subscriptions/${PUBSUB_SUBSCRIPTION}",\
instanceId="$SPANNER_INSTANCE_NAME",\
databaseId="$SPANNER_DATABASE_NAME",\
projectId="$PROJECT_ID",\
inputFileFormat="avro",\
deadLetterQueueDirectory="$DLQ_DIR",\
streamName="projects/${PROJECT_ID}/locations/${REGION}/streams/${STREAM_NAME}"
重要參數
gcsPubSubSubscription:Pub/Sub 訂閱項目,用於監聽 GCS 的新檔案通知。這樣一來,工作就能在 Datastream 寫入變更時立即處理。inputFileFormat="avro":告知 Dataflow 預期會收到 Datastream 傳送的 Avro 檔案。這必須與 Datastream 的「目的地」設定相符 (例如avroFileFormat與jsonFileFormat)。deadLetterQueueDirectory:GCS 路徑,工作會將處理失敗的記錄 (例如因結構定義不符) 儲存在這個路徑,以供日後手動檢查。streamName:Datastream 串流的完整資源路徑,可讓 Dataflow 工作追蹤複製狀態和中繼資料。
在 Dataflow Jobs 控制台中監控工作啟動情形。
2. 測試即時遷移
對來源 Cloud SQL music_db 進行變更,測試 CDC 管道。
連線至 Cloud SQL:
mysql -h $SQL_INSTANCE_IP -u root -p
輸入密碼 (Welcome@1) 並選取資料庫:
USE music_db;
-- INSERT
INSERT INTO Singers (SingerId, FirstName, LastName, BirthDate, AlbumCount) VALUES (4, 'Elena', 'Nadal', '1985-05-30', 0);
SELECT * FROM Singers WHERE SingerId = 4;
-- UPDATE
UPDATE Singers SET LastName = 'Richards-Smith' WHERE SingerId = 1;
SELECT * FROM Singers WHERE SingerId = 1;
-- DELETE
DELETE FROM Albums WHERE SingerId = 2;
DELETE FROM Singers WHERE SingerId = 2;
SELECT * FROM Singers WHERE SingerId = 2;
EXIT;
在 Spanner 中驗證 (稍待片刻):
# Verify INSERT: This should return the new row for Elena Nadal.
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
--instance=$SPANNER_INSTANCE_NAME \
--sql="SELECT * FROM Singers WHERE SingerId = 4"
# Verify UPDATE: This should show LastName as Richards-Smith.
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
--instance=$SPANNER_INSTANCE_NAME \
--sql="SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId = 1"
# Verify DELETE: This should now return 0 rows.
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
--instance=$SPANNER_INSTANCE_NAME \
--sql="SELECT * FROM Albums WHERE SingerId = 2"
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
--instance=$SPANNER_INSTANCE_NAME \
--sql="SELECT * FROM Singers WHERE SingerId = 2"
預期輸出內容:
SingerId: 4 FirstName: Elena LastName: Nadal BirthDate: 1985-05-30 AlbumCount: 0 SingerId: 1 FirstName: Marc LastName: Richards-Smith
3. Spanner 中的最終驗證
檢查 Spanner 中 Singers 資料表的整體狀態:
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
--instance=$SPANNER_INSTANCE_NAME \
--sql="SELECT SingerId, FirstName, LastName, AlbumCount FROM Singers ORDER BY SingerId"
預期輸出內容:
SingerId: 1 FirstName: Marc LastName: Richards-Smith AlbumCount: 2 SingerId: 3 FirstName: Alice LastName: Trentor AlbumCount: 3 SingerId: 4 FirstName: Elena LastName: Nadal AlbumCount: 0
10. 設定反向複製 (Spanner 到 Cloud SQL)
如要處理可能需要回溯或讓 Cloud SQL 資料庫在一段時間內與 Spanner 同步的情況,可以設定反向複製。這個管道會使用 Spanner 變更串流擷取 Spanner 中的變更,並將變更寫回 Cloud SQL music_db。
1. 建立 Spanner 變更串流
首先,您需要在 Spanner 資料庫中建立變更串流,追蹤 Singers 和 Albums 資料表的變更。
export CHANGE_STREAM_NAME="MusicDBChangeStream"
gcloud spanner databases ddl update $SPANNER_DATABASE_NAME \
--instance=$SPANNER_INSTANCE_NAME \
--ddl="CREATE CHANGE STREAM $CHANGE_STREAM_NAME FOR Singers, Albums"
這個變更串流現在會記錄指定資料表的所有資料修改。
2. 建立 Dataflow 中繼資料的 Spanner 資料庫
Spanner to SourceDB Dataflow 範本需要另外一個 Spanner 資料庫,才能儲存用於管理變更串流用量的中繼資料。
export SPANNER_METADATA_DB_NAME="reverse-replication-metadata"
gcloud spanner databases create $SPANNER_METADATA_DB_NAME \
--instance=$SPANNER_INSTANCE_NAME
3. 準備 Dataflow 適用的 Cloud SQL 連線設定
Dataflow 範本需要 Cloud Storage 中的 JSON 檔案,內含目標 Cloud SQL 資料庫的連線詳細資料。
建立名為 shard_config.json 的本機檔案:
cat << EOF > shard_config.json
[
{
"logicalShardId": "mysql_shard",
"host": "${SQL_INSTANCE_IP}",
"port": "3306",
"user": "root",
"password": "${DB_ROOT_PASSWORD}",
"dbName": "music_db"
}
]
EOF
將這個檔案上傳至 GCS bucket:
export SHARD_CONFIG_FILE="gs://${BUCKET_NAME}/shard_config.json"
gcloud storage cp shard_config.json $SHARD_CONFIG_FILE
4. 執行反向複製 Dataflow 工作
使用 Spanner_to_SourceDb Flex 範本啟動 Dataflow 工作。
export JOB_NAME_REVERSE="spanner-to-mysql-reverse-$(date +%Y%m%d-%H%M%S)"
export REVERSE_DLQ_DIR="gs://${BUCKET_NAME}/reverse-dlq"
gcloud dataflow flex-template run $JOB_NAME_REVERSE \
--project=$PROJECT_ID \
--region=$REGION \
--worker-machine-type=n2-highmem-8 \
--max-workers=2 \
--num-workers=1 \
--additional-experiments=use_runner_v2 \
--template-file-gcs-location="gs://dataflow-templates-${REGION}/latest/flex/Spanner_to_SourceDb" \
--parameters \
changeStreamName="$CHANGE_STREAM_NAME",\
instanceId="$SPANNER_INSTANCE_NAME",\
databaseId="$SPANNER_DATABASE_NAME",\
spannerProjectId="$PROJECT_ID",\
metadataInstance="$SPANNER_INSTANCE_NAME",\
metadataDatabase="$SPANNER_METADATA_DB_NAME",\
sourceShardsFilePath="$SHARD_CONFIG_FILE",\
deadLetterQueueDirectory="$REVERSE_DLQ_DIR"
重要參數
changeStreamName:要讀取的 Spanner 變更串流名稱。metadataInstance, metadataDatabase:Spanner 執行個體/資料庫,用於儲存連接器使用的中繼資料,以控管變更串流 API 資料的用量。sourceShardsFilePath:shard_config.json的 GCS 路徑。filtrationMode:指定如何根據條件捨棄特定記錄。預設為forward_migration(篩選使用轉送遷移管道寫入的記錄)
網路注意事項:Dataflow 工作站會使用 shard_config.json 中指定的公開 IP 位址連線至 Cloud SQL 執行個體。Cloud SQL 執行個體的「授權網路」中含有 0.0.0.0/0 項目,因此允許這項連線。
在 Dataflow Jobs 控制台中監控工作啟動情形。
5. 測試反向複製
現在,請直接在 Cloud Spanner 中進行變更,並確認變更反映在 Cloud SQL 中。只有在 Dataflow 工作啟動並處於處理狀態時,才執行這項操作。
測試「INSERT」、「UPDATE」和「DELETE」
# INSERT: Insert a new singer (SingerId 5) into Spanner
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
--instance=$SPANNER_INSTANCE_NAME \
--sql="INSERT INTO Singers (SingerId, FirstName, LastName, BirthDate, AlbumCount) VALUES (5, 'David', 'Chen', '1995-02-18', 0)"
# UPDATE: Update SingerId 3's AlbumCount in Spanner
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
--instance=$SPANNER_INSTANCE_NAME \
--sql="UPDATE Singers SET AlbumCount = 5 WHERE SingerId = 3"
# DELETE: Delete SingerId 1 from Spanner
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
--instance=$SPANNER_INSTANCE_NAME \
--sql="DELETE FROM Albums WHERE SingerId = 1"
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
--instance=$SPANNER_INSTANCE_NAME \
--sql="DELETE FROM Singers WHERE SingerId = 1"
在 Cloud SQL 中驗證 (過一會兒):
連線至 Cloud SQL:
mysql -h $SQL_INSTANCE_IP -u root -p
在系統提示時輸入密碼 (Welcome@1),然後在 mysql> 提示中執行下列 SQL 指令。
USE music_db;
-- Verify INSERT: This should show the new row for David Chen
SELECT * FROM Singers WHERE SingerId = 5;
-- Verify UPDATE: This should show AlbumCount as 5.
SELECT SingerId, FirstName, AlbumCount FROM Singers WHERE SingerId = 3;
-- Verify DELETE: This should return an empty set.
SELECT * FROM Albums WHERE SingerId = 1;
SELECT * FROM Singers WHERE SingerId = 1;
-- Final Verification
SELECT SingerId, FirstName, LastName, AlbumCount FROM Singers ORDER BY SingerId;
EXIT;
Cloud SQL 中的預期輸出內容應反映 Spanner 中所做的變更。
+----------+-----------+----------------+------------+ | SingerId | FirstName | LastName | AlbumCount | +----------+-----------+----------------+------------+ | 3 | Alice | Trentor | 5 | | 4 | Elena | Nadal | 0 | | 5 | David | Chen | 0 | +----------+-----------+----------------+------------+
這會確認反向複製管道是否正常運作,並將 Spanner 中的變更同步回 Cloud SQL。
11. 清除資源
如要避免系統向您的 Google Cloud 帳戶收取額外費用,請刪除本程式碼研究室建立的資源。
設定環境變數 (如有需要)
檢查環境變數是否已正確設定:
echo "PROJECT_ID: $PROJECT_ID"
echo "REGION: $REGION"
echo "SQL_INSTANCE_NAME: $SQL_INSTANCE_NAME"
echo "SPANNER_INSTANCE_NAME: $SPANNER_INSTANCE_NAME"
echo "BUCKET_NAME: $BUCKET_NAME"
echo "STREAM_NAME: $STREAM_NAME"
echo "SQL_CP_NAME: $SQL_CP_NAME"
echo "GCS_CP_NAME: $GCS_CP_NAME"
echo "PUBSUB_SUBSCRIPTION: $PUBSUB_SUBSCRIPTION"
echo "PUBSUB_TOPIC: $PUBSUB_TOPIC"
echo "CHANGE_STREAM_NAME: $CHANGE_STREAM_NAME"
列出工作,找出執行中 Dataflow 工作的 Job ID。據此匯出 JOB_ID_CDC 和 JOB_ID_REVERSE。
gcloud dataflow jobs list --region=$REGION --filter="state=Running"
export JOB_ID_CDC=<PASTE_JOB_ID_HERE>
export JOB_ID_REVERSE=<PASTE_JOB_ID_HERE>
如果您位於新的 Cloud Shell 工作階段,請重新匯出金鑰環境變數:
export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1" # Or the region you used
export SQL_INSTANCE_NAME="source-mysql-instance"
export SPANNER_INSTANCE_NAME="target-spanner-instance"
export BUCKET_NAME="migration-${PROJECT_ID}-bucket"
export STREAM_NAME="mysql-to-spanner-stream"
export SQL_CP_NAME="mysql-src-cp"
export GCS_CP_NAME="gcs-dest-cp"
export PUBSUB_TOPIC="datastream-gcs-updates"
export PUBSUB_SUBSCRIPTION="datastream-gcs-sub"
export CHANGE_STREAM_NAME="MusicDBChangeStream"
停止 Dataflow 串流工作
取消 Datastream to Spanner (即時遷移) 工作:
gcloud dataflow jobs cancel $JOB_ID_CDC --region=$REGION --project=$PROJECT_ID
取消 Spanner to Cloud SQL (反向複製) 工作:
gcloud dataflow jobs cancel $JOB_ID_REVERSE --region=$REGION --project=$PROJECT_ID
刪除 Datastream 來源
停止並刪除串流:
gcloud datastream streams update $STREAM_NAME \
--location=$REGION --state=PAUSED --project=$PROJECT_ID
# Wait a moment for the stream to pause
gcloud datastream streams delete $STREAM_NAME \
--location=$REGION --project=$PROJECT_ID --quiet
刪除連線設定檔
gcloud datastream connection-profiles delete $SQL_CP_NAME \
--location=$REGION --project=$PROJECT_ID --quiet
gcloud datastream connection-profiles delete $GCS_CP_NAME \
--location=$REGION --project=$PROJECT_ID --quiet
刪除 Pub/Sub 資源
刪除訂閱項目:
gcloud pubsub subscriptions delete $PUBSUB_SUBSCRIPTION \
--project=$PROJECT_ID --quiet
刪除主題:
gcloud pubsub topics delete $PUBSUB_TOPIC \
--project=$PROJECT_ID --quiet
刪除 Cloud SQL 執行個體
系統會自動刪除其中的資料庫 (music_db)。
gcloud sql instances delete $SQL_INSTANCE_NAME \
--project=$PROJECT_ID --quiet
刪除 Cloud Spanner 執行個體
這項操作也會刪除其中的資料庫 (music-db-migrated 和 reverse-replication-metadata)。
gcloud spanner instances delete $SPANNER_INSTANCE_NAME \
--project=$PROJECT_ID --quiet
刪除 GCS Bucket 和內容
gcloud storage rm --recursive gs://${BUCKET_NAME}
刪除本機檔案
移除 Cloud Shell 主目錄中產生的所有檔案:
rm -f music-db* shard_config.json
您已清除為本程式碼研究室建立的資源。