Migración de extremo a extremo: De MySQL local fragmentado a Cloud Spanner (GoogleSQL)

1. Antes de comenzar

En este codelab, se explica cómo migrar una base de datos de MySQL local fragmentada a una base de datos de Cloud Spanner con el dialecto de GoogleSQL. Usarás los servicios de Google Cloud, incluidas las herramientas Spanner Migration Tool (SMT), Dataflow, Datastream, Pub/Sub y Google Cloud Storage.

Qué aprenderás:

  • Qué es un entorno fragmentado y cómo configurarlo
  • Cómo usar la IU web de la herramienta de migración de Spanner (SMT) para convertir un esquema de MySQL en un esquema compatible con Spanner y realizar modificaciones avanzadas del esquema
  • Cómo realizar la migración masiva de datos desde una instancia de MySQL fragmentada a Cloud Spanner con Dataflow
  • Cómo configurar la replicación continua (CDC) desde una instancia de MySQL fragmentada a Cloud Spanner con Datastream y Dataflow
  • Cómo configurar la replicación inversa de Spanner a las instancias de MySQL fragmentadas
  • Cómo usar las transformaciones personalizadas para completar columnas adicionales durante las migraciones masivas, en vivo y reversibles
  • Cómo configurar transformaciones de fragmentación con claves primarias

Qué NO se aborda en este codelab:

  • Redes personalizadas avanzadas
  • Crear plantillas personalizadas de Dataflow desde cero
  • Ajuste del rendimiento de la migración
  • Migración de la aplicación: Este codelab se enfoca en la capa de la base de datos (esquema y datos). No abarca el proceso operativo de volver a implementar o migrar tus servicios de aplicaciones.

Requisitos

  • Un proyecto de Google Cloud con facturación habilitada.
  • Permisos de IAM suficientes para habilitar APIs y crear o administrar recursos de Spanner, Dataflow, Datastream y GCS Si bien el rol de Project Owner es el más simple para un codelab, en la sección "Configuración del entorno" se abordarán roles más específicos.
  • Durante la fase de configuración, aprovisionaremos una pequeña VM de Compute Engine para simular nuestro servidor local. Asegúrate de que la cuota de tu proyecto permita la creación de VMs.
  • Un navegador web, como Google Chrome
  • Conocimientos básicos de la consola de Google Cloud y herramientas de línea de comandos como gcloud
  • Acceso a un entorno de shell Se recomienda Cloud Shell, ya que incluye gcloud.

Encontrarás más detalles sobre la configuración anterior en la sección Configuración del entorno.

2. Información sobre el proceso de migración

La migración de una base de datos fragmentada implica consolidar varias instancias físicas y lógicas de MySQL en una sola base de datos de Spanner escalable horizontalmente. En esta sección, se describen la arquitectura y las herramientas clave que se usan en la migración.

Arquitectura del flujo de migración

El proceso de migración incluye las siguientes etapas:

1. Conversión de esquemas:

  • Propósito: Convertir el esquema de la base de datos de origen en un esquema compatible de Cloud Spanner
  • Herramienta: Herramienta de migración de Spanner (SMT)
  • Proceso: SMT analiza el esquema de la base de datos de origen y genera el lenguaje de definición de datos (DDL) de Spanner equivalente. En la instancia de Spanner de destino, se crea una base de datos y, luego, se aplica automáticamente el DDL.

2. Migración de datos masiva:

  • Propósito: Realizar una carga inicial completa de los datos existentes desde la base de datos de origen a las tablas de Spanner aprovisionadas.
  • Herramienta: Dataflow, con la plantilla Sourcedb to Spanner proporcionada por Google.
  • Proceso: Este trabajo de Dataflow lee todos los datos de las tablas de origen especificadas y los escribe en las tablas de Spanner correspondientes. Esto se realiza después de crear el esquema de Spanner.

3. Migración en vivo (CDC):

  • Propósito: Capturar y aplicar cambios continuos de la base de datos de origen a Cloud Spanner casi en tiempo real, lo que minimiza el tiempo de inactividad durante la migración.
  • Herramientas:
  • Datastream: Captura los cambios (inserciones, actualizaciones y eliminaciones) de la base de datos de origen y los escribe en Cloud Storage (GCS).
  • Dataflow: Usa la plantilla Datastream to Spanner para leer los eventos de cambio de GCS y aplicarlos a Cloud Spanner.

4. Replicación inversa:

  • Propósito: Replicar los cambios de datos de Cloud Spanner a la base de datos de origen. Esto puede ser útil para las estrategias de resguardo, las migraciones por fases o el mantenimiento de una réplica en la fuente para casos de uso específicos.
  • Herramienta: Dataflow, con la plantilla Spanner to SourceDb.
  • Proceso: Este trabajo utiliza flujos de cambios de Spanner para capturar modificaciones en Spanner y volver a escribirlas en la instancia de la base de datos de origen.

En el siguiente diagrama, se ilustran los componentes y el flujo de datos:

b9e12d4151bf3bb7.png

Terminología clave:

  • Fragmento físico: Es el servidor o la instancia de procesamiento subyacente real que aloja la base de datos (en nuestro caso, la VM de GCE local simulada).
  • Fragmento lógico: Es el esquema de base de datos individual dentro de un servidor físico.
  • VM de Compute Engine (GCE): Es una máquina virtual alojada en la infraestructura de Google Cloud. En este codelab, usaremos una VM de GCE para simular un servidor independiente "local" de metal desnudo que aloja nuestra base de datos MySQL de origen.
  • Herramienta de migración de Spanner (SMT): Es una herramienta que se usa para evaluar esquemas de MySQL, sugerir equivalentes de esquemas de Spanner y generar el lenguaje de definición de datos (DDL) de Spanner.
  • Lenguaje de definición de datos (DDL): Son instrucciones que se usan para definir y modificar la estructura de la base de datos, como las instrucciones CREATE TABLE. SMT genera DDL de Spanner en función del esquema de Cloud SQL.
  • Dataflow: Es un servicio de procesamiento de datos completamente administrado y sin servidores. En este codelab, se usa para ejecutar plantillas proporcionadas por Google para la transferencia de datos masiva, la aplicación de cambios de Datastream y la replicación inversa.
  • Datastream: Es un servicio de replicación y captura de datos modificados (CDC) sin servidores. Se usa para transmitir cambios desde la instancia de MySQL alojada de forma local a Cloud Storage en este codelab.
  • Flujos de cambios de Spanner: Es una función de Spanner que permite transmitir cambios en los datos (inserciones, actualizaciones y eliminaciones) en tiempo real, y se usa como fuente para la replicación inversa.
  • Pub/Sub: Es un servicio de mensajería que se usa para separar los servicios que producen eventos de los servicios que los procesan. En este codelab, se activa Dataflow para procesar actualizaciones cada vez que Datastream sube archivos de cambios nuevos a Cloud Storage.

3. Configuración del entorno

Antes de comenzar la migración, debes configurar tu proyecto de Google Cloud y habilitar los servicios necesarios.

1. Selecciona o crea un proyecto de Google Cloud

Para usar los servicios de este codelab, necesitas un proyecto de Google Cloud con la facturación habilitada.

  1. En la consola de Google Cloud, ve a la página del selector de proyectos: Ir al selector de proyectos
  2. Selecciona o crea un proyecto de Google Cloud.
  3. Asegúrate de tener habilitada la facturación para tu proyecto. Obtén información para confirmar que tienes habilitada la facturación para tu proyecto.

2. Abre Cloud Shell

Cloud Shell es un entorno de línea de comandos que se ejecuta en Google Cloud y que viene precargado con la CLI de gcloud y otras herramientas que necesitas.

  • Haz clic en el botón Activar Cloud Shell en la parte superior derecha de la consola de Google Cloud.
  • Se abrirá una sesión de Cloud Shell en un marco nuevo en la parte inferior de la consola, que mostrará una línea de comandos.

22d57633bc12106d.png

3. Configura las variables de entorno y del proyecto

En Cloud Shell, configura algunas variables de entorno para el ID de tu proyecto y la región que usarás.

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1" # Or your preferred region
export ZONE="us-central1-a" # Or a zone within your selected region

gcloud config set project $PROJECT_ID
gcloud config set compute/region $REGION
gcloud config set compute/zone $ZONE

echo "Project ID: $PROJECT_ID"
echo "Region: $REGION"
echo "Zone: $ZONE"

4. Habilita las APIs de Google Cloud requeridas

Habilita las APIs necesarias para Cloud Spanner, Dataflow, Datastream y otros servicios relacionados.

gcloud services enable \
  spanner.googleapis.com \
  dataflow.googleapis.com \
  datastream.googleapis.com \
  pubsub.googleapis.com \
  storage.googleapis.com \
  compute.googleapis.com \
  sqladmin.googleapis.com \
  servicenetworking.googleapis.com \
  cloudresourcemanager.googleapis.com

Este comando puede tardar unos minutos en completarse.

4. Configura la base de datos de MySQL de origen

En esta sección, simularemos una arquitectura de MySQL fragmentada local aprovisionando dos máquinas virtuales de Compute Engine (nuestros 2 "fragmentos físicos"). Luego, instalaremos MySQL en ambas y crearemos dos bases de datos (nuestros "fragmentos lógicos") en cada VM.

1. Crea las VMs de Compute Engine (fragmentos físicos)

Ejecuta los siguientes comandos en Cloud Shell para crear dos VMs con Ubuntu. Más adelante, les asignaremos etiquetas de red para permitir el tráfico entrante de MySQL.

# Create Physical Shard 1
gcloud compute instances create mysql-physical-1 \
    --zone=$ZONE \
    --machine-type=e2-small \
    --image-family=ubuntu-2204-lts \
    --image-project=ubuntu-os-cloud \
    --tags=mysql-server

# Create Physical Shard 2
gcloud compute instances create mysql-physical-2 \
    --zone=$ZONE \
    --machine-type=e2-small \
    --image-family=ubuntu-2204-lts \
    --image-project=ubuntu-os-cloud \
    --tags=mysql-server

2. Configura las reglas de firewall

Para permitir el acceso SSH seguro sin exposición pública y habilitar la conectividad de Datastream, haz lo siguiente:

Crea una regla de firewall para SSH a través de IAP:

Esta regla permite que Identity-Aware Proxy llegue a tus VMs en el puerto SSH (22).

gcloud compute firewall-rules create allow-ssh-iap \
    --direction=INGRESS \
    --priority=1000 \
    --network=default \
    --action=ALLOW \
    --rules=tcp:22 \
    --source-ranges=35.235.240.0/20 \
    --target-tags=mysql-server

Crea una regla de firewall para Datastream (puerto de MySQL):

Datastream debe poder acceder a estas VMs en el puerto estándar de MySQL (3306).

gcloud compute firewall-rules create allow-mysql-datastream \
    --direction=INGRESS \
    --priority=1000 \
    --network=default \
    --action=ALLOW \
    --rules=tcp:3306 \
    --source-ranges=0.0.0.0/0 \
    --target-tags=mysql-server

3. Instala y configura MySQL en la partición física 1

Establece una conexión SSH a tu primera VM para instalar MySQL y configurar el registro binario (que Datastream requiere para la replicación en vivo).

  1. Establece una conexión SSH a la primera VM:
gcloud compute ssh mysql-physical-1 --zone=$ZONE --tunnel-through-iap
  1. Instala MySQL:
sudo apt-get update
sudo apt-get install mysql-server-8.0 -y

# Verify the installation and version
sudo mysql --version
  1. Configura el archivo mysqld.cnf para habilitar el registro binario y permitir conexiones externas:
sudo sed -i 's/bind-address.*/bind-address = 0.0.0.0/' /etc/mysql/mysql.conf.d/mysqld.cnf
echo -e "[mysqld]\nserver-id=1\nlog_bin=/var/log/mysql/mysql-bin.log\nbinlog_format=ROW" | sudo tee -a /etc/mysql/mysql.conf.d/mysqld.cnf
  1. Reinicia MySQL para aplicar los cambios:
sudo systemctl restart mysql

4. Crea fragmentos lógicos, inserta datos y crea un usuario de Datastream (fragmento 1)

Mientras sigues conectado a mysql-physical-1 a través de SSH, accede al símbolo del sistema de MySQL:

sudo mysql

Ejecuta los siguientes comandos de SQL. Esta secuencia de comandos crea dos fragmentos lógicos distintos (shard0_db y shard1_db), configura el mismo esquema en ambos, inserta datos identificables de forma única en cada uno (para demostrar el fragmentado) y crea el usuario de replicación para Datastream.

Ejecuta los siguientes comandos de SQL para crear tus primeros dos fragmentos lógicos, una tabla y el usuario de replicación para Datastream:

CREATE DATABASE shard0_db;
CREATE DATABASE shard1_db;

USE shard0_db;
CREATE TABLE Customers (
    CustomerId INT NOT NULL,
    CustomerName VARCHAR(255),
    CreditLimit DECIMAL(10, 2) NOT NULL,
    LegacyRegion VARCHAR(50), -- Renamed to LoyaltyTier in Spanner

    PRIMARY KEY (CustomerId),
    CONSTRAINT CHK_CreditLimit CHECK (CreditLimit > 1000) -- Relaxed in Spanner to > 0 
);

CREATE TABLE Orders (
    CustomerId INT NOT NULL,  -- Logically references Customers.CustomerId in Spanner
    OrderId INT NOT NULL,
    OrderValue DECIMAL(10, 2),
    LegacyOrderSystem VARCHAR(50), -- Extra column in Source, to be dropped in Spanner

    PRIMARY KEY (CustomerId, OrderId) -- Spanner PK will have one additional column in PK
);

INSERT INTO Customers (CustomerId, CustomerName, CreditLimit, LegacyRegion) VALUES
(4, 'David E.', 2000.00, 'EAST'),
(8, 'Eleanor F.', 8100.00, 'WEST'),
(12, 'Frank G.', 12000.00, 'NORTH'),
(16, 'Grace H.', 6500.00, 'SOUTH');

INSERT INTO Orders (CustomerId, OrderId, OrderValue, LegacyOrderSystem) VALUES
(4, 101, 150.00, 'WebStore_v1'),
(4, 102, 25.50, 'InStore_POS'),
(8, 103, 75.00, 'MobileApp_Legacy'),
(12, 104, 3000.00, 'WebStore_v1'),
(16, 105, 120.00, 'Partner_API');

USE shard1_db;
CREATE TABLE Customers (
    CustomerId INT NOT NULL,
    CustomerName VARCHAR(255),
    CreditLimit DECIMAL(10, 2) NOT NULL,
    LegacyRegion VARCHAR(50), -- Renamed to LoyaltyTier in Spanner
    PRIMARY KEY (CustomerId),
    CONSTRAINT CHK_CreditLimit CHECK (CreditLimit > 1000) -- Relaxed in Spanner to > 0 
);

CREATE TABLE Orders (
    CustomerId INT NOT NULL,  -- Logically references Customers.CustomerId in Spanner
    OrderId INT NOT NULL,
    OrderValue DECIMAL(10, 2),
    LegacyOrderSystem VARCHAR(50), -- Extra column in Source, to be dropped in Spanner
    PRIMARY KEY (CustomerId, OrderId) -- Spanner PK will have one additional column in PK
);

INSERT INTO Customers (CustomerId, CustomerName, CreditLimit, LegacyRegion) VALUES
(1, 'Agnes N.', 5100.00, 'NORTHEAST'),(5, 'Alice I.', 15000.00, 'EAST'),
(9, 'Bob J.', 7500.00, 'WEST'),
(13, 'Charlie K.', 2200.00, 'CENTRAL');

INSERT INTO Orders (CustomerId, OrderId, OrderValue, LegacyOrderSystem) VALUES
(1, 201, 50.00, 'MobileApp_Legacy'),
(5, 202, 1250.00, 'WebStore_v1'),
(5, 203, 80.00, 'Partner_API'),
(9, 204, 600.00, 'InStore_POS'),
(13, 205, 199.99, 'WebStore_v1');


-- Create Datastream Replication User
CREATE USER 'datastream_user'@'%' IDENTIFIED BY 'complex_password_123';
GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT, INSERT, UPDATE, DELETE ON *.* TO 'datastream_user'@'%'; 
FLUSH PRIVILEGES;

El archivo de volcado del esquema anterior se puede encontrar aquí. Es importante crear el usuario de replicación del flujo de datos por separado, ya que no se incluye en el archivo de volcado.

5. Verifica datos

Verifica rápidamente que los datos estén presentes:

SELECT 'Customers shard0_db' AS tbl, COUNT(*) FROM shard0_db.Customers
UNION ALL
SELECT 'Orders shard0_db', COUNT(*) FROM shard0_db.Orders
UNION ALL
SELECT 'Customers shard1_db', COUNT(*) FROM shard1_db.Customers
UNION ALL
SELECT 'Orders shard1_db', COUNT(*) FROM shard1_db.Orders;
EXIT;

Resultado esperado:

+---------------------+----------+
| tbl                 | COUNT(*) |
+---------------------+----------+
| Customers shard0_db |        4 |
| Orders shard0_db    |        5 |
| Customers shard1_db |        4 |
| Orders shard1_db    |        5 |
+---------------------+----------+

Ingresa exit para salir de la conexión a la VM del fragmento físico 1.

6. Repite el proceso para el fragmento físico 2

Ahora repetirás el mismo proceso para la segunda VM, pero crearás shard2_db y shard3_db, y cambiarás server-id.

  1. Establece una conexión SSH a la segunda VM:
gcloud compute ssh mysql-physical-2 --zone=$ZONE --tunnel-through-iap
  1. Instala MySQL:
sudo apt-get update
sudo apt-get install mysql-server-8.0 -y
  1. Configura el archivo mysqld.cnf para habilitar el registro binario y permitir conexiones externas [ten en cuenta que el server-id debe ser diferente (p. ej., 2)].
sudo sed -i 's/bind-address.*/bind-address = 0.0.0.0/' /etc/mysql/mysql.conf.d/mysqld.cnf

echo -e "[mysqld]\nserver-id=2\nlog_bin=/var/log/mysql/mysql-bin.log\nbinlog_format=ROW" | sudo tee -a /etc/mysql/mysql.conf.d/mysqld.cnf
  1. Reinicia MySQL para aplicar los cambios:
sudo systemctl restart mysql
  1. Ingresa MySQL (sudo mysql) y ejecuta una versión ligeramente modificada del código SQL del paso 4:
CREATE DATABASE shard2_db;
CREATE DATABASE shard3_db;

USE shard2_db;
CREATE TABLE Customers (
    CustomerId INT NOT NULL,
    CustomerName VARCHAR(255),
    CreditLimit DECIMAL(10, 2) NOT NULL,
    LegacyRegion VARCHAR(50), -- Renamed to LoyaltyTier in Spanner
    PRIMARY KEY (CustomerId),
    CONSTRAINT CHK_CreditLimit CHECK (CreditLimit > 1000) -- Relaxed in Spanner to > 0 
);

CREATE TABLE Orders (
    CustomerId INT NOT NULL,  -- Logically references Customers.CustomerId in Spanner
    OrderId INT NOT NULL,
    OrderValue DECIMAL(10, 2),
    LegacyOrderSystem VARCHAR(50), -- Extra column in Source, to be dropped in Spanner
    PRIMARY KEY (CustomerId, OrderId) -- Spanner PK will have one additional column in PK
);

INSERT INTO Customers (CustomerId, CustomerName, CreditLimit, LegacyRegion) VALUES
(2, 'Brian K.', 2500.00, 'SOUTHWEST'),
(6, 'Diana L.', 1999.00, 'NORTH'),
(10, 'Edward M.', 11000.00, 'EAST'),
(14, 'Fiona N.', 3000.00, 'WEST');

INSERT INTO Orders (CustomerId, OrderId, OrderValue, LegacyOrderSystem) VALUES
(2, 301, 100.00, 'CallCenter_System'),
(6, 302, 99.00, 'MobileApp_Legacy'),
(10, 303, 1000.00, 'WebStore_v1'),
(10, 304, 2500.00, 'InStore_POS'),
(14, 305, 130.00, 'MobileApp_Legacy');

USE shard3_db;
CREATE TABLE Customers (
    CustomerId INT NOT NULL,
    CustomerName VARCHAR(255),
    CreditLimit DECIMAL(10, 2) NOT NULL,
    LegacyRegion VARCHAR(50), -- Renamed to LoyaltyTier in Spanner
    PRIMARY KEY (CustomerId),
    CONSTRAINT CHK_CreditLimit CHECK (CreditLimit > 1000) -- Relaxed in Spanner to > 0 
);

CREATE TABLE Orders (
    CustomerId INT NOT NULL,  -- Logically references Customers.CustomerId in Spanner
    OrderId INT NOT NULL,
    OrderValue DECIMAL(10, 2),
    LegacyOrderSystem VARCHAR(50), -- Extra column in Source, to be dropped in Spanner
    PRIMARY KEY (CustomerId, OrderId) -- Spanner PK will have one additional column in PK
);

INSERT INTO Customers (CustomerId, CustomerName, CreditLimit, LegacyRegion) VALUES
(3, 'Cathy Z.', 6000.00, 'CENTRAL'),
(7, 'George O.', 18000.00, 'SOUTH'),
(11, 'Helen P.', 4000.00, 'NORTHEAST'),
(15, 'Ivy Q.', 9500.00, 'SOUTHWEST');

INSERT INTO Orders (CustomerId, OrderId, OrderValue, LegacyOrderSystem) VALUES
(3, 401, 600.00, 'InStore_POS'),
(7, 402, 1200.00, 'CallCenter_System'),
(11, 403, 350.00, 'MobileApp_Legacy'),
(15, 404, 800.00, 'WebStore_v1'),
(99, 999, 25.00, 'CallCenter_System'); -- Failure row during Bulk Migration due to violation of interleaving

-- Create Datastream Replication User
CREATE USER 'datastream_user'@'%' IDENTIFIED BY 'complex_password_123';
GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT, INSERT, UPDATE, DELETE ON *.* TO 'datastream_user'@'%'; 
FLUSH PRIVILEGES;

-- Verify Data
SELECT 'Customers shard2_db' AS tbl, COUNT(*) FROM shard2_db.Customers
UNION ALL
SELECT 'Orders shard2_db', COUNT(*) FROM shard2_db.Orders
UNION ALL
SELECT 'Customers shard3_db', COUNT(*) FROM shard3_db.Customers
UNION ALL
SELECT 'Orders shard3_db', COUNT(*) FROM shard3_db.Orders;

EXIT;

Resultado esperado:

+---------------------+----------+
| tbl                 | COUNT(*) |
+---------------------+----------+
| Customers shard2_db |        4 |
| Orders shard2_db    |        5 |
| Customers shard3_db |        4 |
| Orders shard3_db    |        5 |
+---------------------+----------+

El archivo de volcado del esquema anterior se puede encontrar aquí. Es importante crear el usuario de replicación del flujo de datos por separado, ya que no se incluye en el archivo de volcado.

Ingresa exit para salir de la conexión a la VM.

5. Configura Cloud Spanner

Ahora, configurarás la instancia de Cloud Spanner de destino a la que se migrarán los datos.

1. Crea una instancia de Cloud Spanner

Crea una instancia de Cloud Spanner en la misma región que tus VMs de Compute Engine para minimizar la latencia. Con este comando, se crea una instancia pequeña adecuada para este codelab, con 100 unidades de procesamiento.

export SPANNER_INSTANCE_NAME="target-spanner-instance"
export SPANNER_DATABASE_NAME="sharded-target-db"
export SPANNER_CONFIG="regional-${REGION}"

gcloud spanner instances create $SPANNER_INSTANCE_NAME \
  --config=$SPANNER_CONFIG \
  --description="Target Spanner Instance" \
  --processing-units=100

La creación de la instancia puede tardar un minuto o dos.

6. Convierte el esquema con la herramienta de migración de Spanner (SMT)

Usa la IU web de Spanner Migration Tool (SMT) para conectarte a uno de nuestros fragmentos lógicos (shard0_db), analizar su esquema y aplicar varias modificaciones avanzadas antes de convertirlo a Cloud Spanner.

1. Instala SMT

Ejecutaremos la IU web de SMT directamente desde Cloud Shell. En la terminal de Cloud Shell, descarga y extrae la versión más reciente de SMT:

sudo apt-get update && sudo apt-get install google-cloud-cli-spanner-migration-tool

# Verify installation 
gcloud alpha spanner migrate web --help

2. Conéctate a la base de datos de origen

  1. Autentica tu sesión
# Authenticate your Google Cloud account
gcloud auth login

# Set up Application Default Credentials (ADC) for SMT
gcloud auth application-default login

# Ensure your current project is set correctly
gcloud config set project $PROJECT_ID

(Nota: Cuando se te solicite, sigue la URL proporcionada para autorizar tu cuenta y pega el código de verificación en la terminal).

  1. Primero, busca la IP externa de tu primer fragmento físico ejecutando este comando en una nueva pestaña de Cloud Shell:
gcloud compute instances describe mysql-physical-1 --zone=$ZONE --format='get(networkInterfaces[0].accessConfigs[0].natIP)'
  1. Imprime los detalles de la instancia de Spanner de destino que se usarán durante la configuración de SMT.
echo "Project ID: $PROJECT_ID"
echo "Instance ID: $SPANNER_INSTANCE_NAME"
echo "Database Name: $SPANNER_DATABASE_NAME"
  1. Inicia la IU web:
gcloud alpha spanner migrate web --port=8080
  1. En la parte superior derecha de la ventana de Cloud Shell, haz clic en el ícono de Vista previa en la Web (parece un ojo) y selecciona Vista previa en el puerto 8080. Se abrirá la IU de SMT en una nueva pestaña del navegador.

69ff1c4de3072798.png

  1. En la IU web de SMT, selecciona Connect to database.
  2. Completa los detalles de la conexión:
  • Tipo de base de datos: MySQL
  • Host: (pega la dirección IP del paso 2)
  • Puerto: 3306
  • Usuario: datastream_user
  • Contraseña: complex_password_123
  • Nombre de la base de datos: shard0_db
  1. Haz clic en el botón de edición que se encuentra en la esquina superior derecha para configurar la base de datos de Spanner.
  2. Ingresa los detalles de tu destino de Spanner:
  • ID del proyecto: (pega el ID del proyecto del paso 3)
  • Instancia de Spanner: (pega el ID de instancia del paso 3)
  1. Haz clic en Test Connection.
  2. Una vez que se complete la verificación, haz clic en Conectar. SMT analizará la base de datos de origen y presentará un esquema de Spanner de referencia.

50a0a11c84f8cd7.png

3. Aplica modificaciones del esquema

Ahora, cambiaremos la forma del esquema para abarcar nuestras situaciones de migración complejas.

En el editor de esquemas de la IU de SMT, realiza las siguientes acciones:

A. Cambia el nombre de la columna LegacyRegion:

  • Haz clic en la tabla Customers en el panel de navegación izquierdo. Se abrirá la pestaña Columnas de forma predeterminada.
  • Haz clic en el botón Editar de la sección Spanner.
  • Localiza la columna LegacyRegion en la vista del esquema de Spanner.
  • Escribe LoyaltyTier en el diálogo de nombre de columna para cambiar el nombre de la columna de Spanner.
  • Haz clic en Guardar y convertir.

7eab05df38da8e36.png

2eedd3168cf161a4.png

B. Disminuye la rigurosidad de la restricción de verificación:

  • En la tabla Customers, navega a la pestaña Check Constraints.
  • Busca la restricción CHK_CreditLimit. Haz clic en el ícono Editar (lápiz).
  • Cambia la condición de CreditLimit > 1000 a CreditLimit > 0. (Esto provocará intencionalmente que las filas con límites de crédito más bajos no puedan realizar la migración inversa y se envíen a la DLQ).

2adcfda3b42b428f.png

C. Descarta la columna LegacyOrderSystem:

  • Haz clic en la tabla Orders. Se abrirá la pestaña Columnas de forma predeterminada.
  • Haz clic en el botón Editar de la sección Spanner.
  • Localiza la columna LegacyOrderSystem en la vista del esquema de Spanner.
  • Haz clic en el ícono de menú de 3 puntos que aparece junto a él y selecciona Descartar columna.
  • Haz clic en Guardar y convertir.

53d3bf8695c43d95.png

D. Agrega la columna OrderSource y conviértela en clave primaria:

  • En la tabla Orders, haz clic en Agregar columna. Asigna el nombre OrderSource y establece el tipo en STRING con una longitud de 50, sin generación automática y establece IsNullable en No.
  • Ve a la pestaña Clave principal.
  • Haz clic en Editar y elige OrderSource en el menú desplegable Nombre de la columna.
  • Haz clic en Agregar columna y, luego, en Guardar y convertir.

6fcf3f35352bdbdd.png

b85a72b2d2c521d5.png

E. Intercala la tabla Orders:

  • En la tabla Orders, en la vista de tabla principal, busca la pestaña Interleave.
  • Establece la tabla principal en Customers.
  • Elige IN PARENT Interleave type y NO ACTION On Delete Action.
  • Haz clic en Guardar.

c88dbe943652683a.png

4. Descarga el archivo de anulaciones y aplica el esquema

  1. En la esquina superior derecha de la IU de SMT, busca el botón Download Artifacts. Selecciona la opción Download Overrides File. Guarda este archivo en tu máquina local. Este archivo contiene todos los cambios de asignación de esquemas que acabamos de realizar y será utilizado por nuestras canalizaciones de Dataflow.
  1. Haz clic en Preparar migración.

d3ba4884743e077.png

  1. En el menú desplegable, elige Migration Mode como Schema.
  2. Ingresa tu base de datos de Spanner de destino: sharded-target-db

1f80f8636d317920.png

  1. Haz clic en Migrar.
  2. SMT aplicará el DDL y creará la base de datos de Spanner. Puedes detener el proceso de SMT de forma segura en Cloud Shell (Ctrl+C) una vez que se complete.

5. Verifica el esquema en Cloud Spanner

Verifica que se hayan creado las tablas en la base de datos de Spanner.

gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
  --instance=$SPANNER_INSTANCE_NAME \
  --sql="SELECT table_name FROM information_schema.tables WHERE table_schema = '' ORDER BY table_name"

Deberías ver el siguiente resultado:

table_name: Customers
table_name: Orders

Opcional: Si deseas verificar el DDL de Spanner real para confirmar que se aplicaron las restricciones de verificación, la intercalación y las columnas adicionales, ejecuta el siguiente comando:

gcloud spanner databases ddl describe $SPANNER_DATABASE_NAME \
  --instance=$SPANNER_INSTANCE_NAME

Resultado esperado:

CREATE TABLE Customers (
  CustomerId INT64 NOT NULL,
  CustomerName STRING(255),
  CreditLimit NUMERIC NOT NULL,
  LoyaltyTier STRING(50),
  CONSTRAINT CHK_CreditLimit CHECK(`CreditLimit` > 0),
) PRIMARY KEY(CustomerId);

CREATE TABLE Orders (
  CustomerId INT64 NOT NULL,
  OrderId INT64 NOT NULL,
  OrderValue NUMERIC,
  OrderSource STRING(50) NOT NULL,
) PRIMARY KEY(CustomerId, OrderId, OrderSource),
  INTERLEAVE IN PARENT Customers ON DELETE NO ACTION;

7. Inicializa la captura de datos modificados (CDC)

En esta sección, configurarás el "grabador" para tu migración. Si configuras Datastream y Pub/Sub antes de que comience la carga de datos masiva, te aseguras de que se capture y ponga en cola cada cambio realizado en las bases de datos de origen, lo que evita la pérdida de datos durante la transición. Esta configuración es obligatoria para la migración en vivo.

Como nuestra arquitectura involucra dos servidores físicos, debemos crear dos perfiles de fuente de Datastream y dos transmisiones de Datastream independientes. Ambos flujos escribirán en un solo bucket de Google Cloud Storage (GCS), que actuará como la fuente unificada para nuestra canalización de Dataflow.

1. Crea un bucket de Cloud Storage

Datastream requiere un destino para almacenar los eventos de cambio capturados. Creemos un bucket de GCS.

export BUCKET_NAME="migration-${PROJECT_ID}-bucket"
gcloud storage buckets create gs://${BUCKET_NAME} --location=$REGION

2. Crea perfiles de conexión de Datastream

Necesitamos dos perfiles de conexión de origen de MySQL distintos (uno para cada fragmento físico) y un perfil de conexión de destino para Cloud Storage.

Obtén las direcciones IP de origen

Primero, recupera las direcciones IP externas de nuestras dos VMs de Compute Engine y almacénalas como variables de entorno:

export MYSQL_IP_1=$(gcloud compute instances describe mysql-physical-1 --zone=$ZONE --format='get(networkInterfaces[0].accessConfigs[0].natIP)')

export MYSQL_IP_2=$(gcloud compute instances describe mysql-physical-2 --zone=$ZONE --format='get(networkInterfaces[0].accessConfigs[0].natIP)')

Crea perfiles de conexión de origen (MySQL en Compute Engine)

Crea los perfiles de conexión de Datastream con el objeto datastream_user que creaste antes.

# Create Source Profile for Physical Shard 1
export SQL_CP_NAME_1="mysql-src-cp-1"
gcloud datastream connection-profiles create $SQL_CP_NAME_1 \
    --location=$REGION \
    --type=mysql \
    --mysql-hostname=$MYSQL_IP_1 \
    --mysql-port=3306 \
    --mysql-username=datastream_user \
    --mysql-password=complex_password_123 \
    --display-name="MySQL Source 1 (Physical Shard 1)"

# Create Source Profile for Physical Shard 2
export SQL_CP_NAME_2="mysql-src-cp-2"
gcloud datastream connection-profiles create $SQL_CP_NAME_2 \
    --location=$REGION \
    --type=mysql \
    --mysql-hostname=$MYSQL_IP_2 \
    --mysql-port=3306 \
    --mysql-username=datastream_user \
    --mysql-password=complex_password_123 \
    --display-name="MySQL Source 2 (Physical Shard 2)"

Nota: Datastream se conecta a estas VMs a través de sus IPs públicas, lo que se permite porque agregamos 0.0.0.0/0 a nuestras reglas de firewall anteriormente. En un entorno de producción, deberías incluir en la lista de entidades permitidas de forma estricta los rangos de IP públicas específicos de Datastream.

Crea un perfil de conexión de destino (Cloud Storage):

Esto apunta a la raíz del bucket que acabas de crear.

export GCS_CP_NAME="gcs-dest-cp"
gcloud datastream connection-profiles create $GCS_CP_NAME \
    --location=$REGION \
    --type=google-cloud-storage \
    --bucket=$BUCKET_NAME \
    --root-path=/ \
    --display-name="GCS Destination" --force

3. Crea transmisiones de Datastream

Ahora crearemos dos transmisiones de CDC. El flujo 1 capturará shard0_db y shard1_db. El stream 2 capturará shard2_db y shard3_db. Ambos flujos escriben en el mismo bucket de GCS en formato Avro.

# Stream for Physical Shard 1
export STREAM_NAME_1="mysql-to-spanner-stream-1"
export GCS_STREAM_PATH_1="data/${STREAM_NAME_1}"

gcloud datastream streams create $STREAM_NAME_1 \
    --location=$REGION \
    --display-name="MySQL Source 1 CDC Stream" \
    --source=$SQL_CP_NAME_1 \
    --destination=$GCS_CP_NAME \
    --mysql-source-config=<(echo "includeObjects:
  mysqlDatabases:
  - database: 'shard0_db'
  - database: 'shard1_db'") \
    --gcs-destination-config=<(echo "path: ${GCS_STREAM_PATH_1}/
fileRotationMb: 5
fileRotationInterval: 15s
avroFileFormat: {}") \
    --backfill-none

# Stream for Physical Shard 2
export STREAM_NAME_2="mysql-to-spanner-stream-2"
export GCS_STREAM_PATH_2="data/${STREAM_NAME_2}"

gcloud datastream streams create $STREAM_NAME_2 \
    --location=$REGION \
    --display-name="MySQL Source 2 CDC Stream" \
    --source=$SQL_CP_NAME_2 \
    --destination=$GCS_CP_NAME \
    --mysql-source-config=<(echo "includeObjects:
  mysqlDatabases:
  - database: 'shard2_db'
  - database: 'shard3_db'") \
    --gcs-destination-config=<(echo "path: ${GCS_STREAM_PATH_2}/
fileRotationMb: 5
fileRotationInterval: 15s
avroFileFormat: {}") \
    --backfill-none

Usar parámetros de configuración de rotación de archivos más pequeños (5 MB o 15 segundos) nos ayuda a ver los cambios replicados más rápido durante el codelab.

Este comando puede tardar un tiempo en completarse. Verifica el estado: gcloud datastream streams describe $STREAM_NAME_1 --location=$REGION.

4. Inicia las transmisiones de Datastream

Activa ambas transmisiones para que comiencen a registrar los cambios.

gcloud datastream streams update $STREAM_NAME_1 \
    --location=$REGION \
    --state=RUNNING

gcloud datastream streams update $STREAM_NAME_2 \
    --location=$REGION \
    --state=RUNNING

Verificar el estado: Puedes ejecutar gcloud datastream streams describe $STREAM_NAME_1 --location=$REGION. Inicialmente, el estado será STARTING y cambiará a RUNNING después de unos momentos. Espera a que ambos se ejecuten por completo antes de iniciar la migración en vivo.

5. Configura Pub/Sub para las notificaciones de GCS

Se debe notificar a Dataflow de inmediato cuando cualquiera de las transmisiones de Datastream escriba un archivo nuevo en el bucket de GCS. Configuraremos GCS para que envíe notificaciones a un solo tema de Pub/Sub.

Crea un tema de Pub/Sub:

export PUBSUB_TOPIC="datastream-gcs-updates"
gcloud pubsub topics create $PUBSUB_TOPIC

Crea una notificación de GCS

Notifica el tema cuando se cree cualquier objeto con el prefijo data/ (que abarca ambos flujos).

gcloud storage buckets notifications create gs://${BUCKET_NAME} --topic=projects/$PROJECT_ID/topics/$PUBSUB_TOPIC --payload-format=json --object-prefix=data/

Cree una suscripción a Pub/Sub

Crea la suscripción con una fecha límite de confirmación recomendada para Dataflow.

export PUBSUB_SUBSCRIPTION="datastream-gcs-sub"
gcloud pubsub subscriptions create $PUBSUB_SUBSCRIPTION \
  --topic=$PUBSUB_TOPIC \
  --ack-deadline=600

8. Transformación personalizada

Dado que nuestro esquema de Spanner difiere de nuestro esquema de MySQL (debido a las columnas que agregamos y quitamos a través de la IU web de SMT), la migración de Dataflow lista para usar fallará. Dataflow necesita instrucciones sobre cómo asignar estas diferencias durante las canalizaciones de avance (de MySQL a Spanner) y retroceso (de Spanner a MySQL).

Además, como realizamos una migración inversa fragmentada, Dataflow necesita un mecanismo de enrutamiento para saber a qué fragmento lógico (shard0_db, shard1_db, etc.) pertenece una fila de Spanner actualizada durante la replicación inversa.

Para ello, escribiremos un JAR de transformación personalizado con la plantilla de fragmento personalizado de Spanner proporcionada por Google.

1. Descarga la plantilla de fragmento personalizada

En Cloud Shell, descarga el repositorio de plantillas de Google Cloud Dataflow y navega a la carpeta de fragmentos personalizados:

git clone https://github.com/GoogleCloudPlatform/DataflowTemplates.git
cd DataflowTemplates/v2/spanner-custom-shard

2. Configura la lógica de transformación de datos

Tenemos que editar el archivo CustomTransformationFetcher.java.

  • Migración hacia adelante (toSpannerRow): Propaga la columna OrderSource recién agregada con la columna LegacyOrderSystem de MySQL.
  • Migración inversa (toSourceRow): Vuelve a completar la columna LegacyOrderSystem que se descartó y que requiere MySQL, y la deriva del OrderSource de Spanner.

Edita el archivo CustomTransformationFetcher.java. En lugar de abrir manualmente un editor de texto, ejecuta el siguiente comando para anular automáticamente el archivo de plantilla con nuestra lógica personalizada:

cat << 'EOF' > src/main/java/com/custom/CustomTransformationFetcher.java 
package com.custom;

import com.google.cloud.teleport.v2.spanner.exceptions.InvalidTransformationException;
import com.google.cloud.teleport.v2.spanner.utils.ISpannerMigrationTransformer;
import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationRequest;
import com.google.cloud.teleport.v2.spanner.utils.MigrationTransformationResponse;
import java.util.HashMap;
import java.util.Map;

public class CustomTransformationFetcher implements ISpannerMigrationTransformer {

 @Override
 public void init(String customParameters) {}

 @Override
 public MigrationTransformationResponse toSpannerRow(MigrationTransformationRequest request)
     throws InvalidTransformationException {
   if (request.getTableName().equals("Orders")) {
     Map<String, Object> requestRow = request.getRequestRow();
     Map<String, Object> responseRow = new HashMap<>();

     Object legacySysObj = requestRow.get("LegacyOrderSystem");
     String legacySys = (legacySysObj != null) ? (String) legacySysObj : "UNKNOWN_SYSTEM";

     // Transform: Trim the string to remove everything after the first underscore
     String orderSource = legacySys;
     if (legacySys.contains("_")) {
       orderSource = legacySys.substring(0, legacySys.indexOf('_'));
     }

     // Populate the new Spanner column (e.g., "WebStore_v1" becomes "WebStore")
     responseRow.put("OrderSource", orderSource);

     return new MigrationTransformationResponse(responseRow, false);
   }

   return new MigrationTransformationResponse(new HashMap<>(), false);
 }

 @Override
 public MigrationTransformationResponse toSourceRow(MigrationTransformationRequest request)
     throws InvalidTransformationException {
   if (request.getTableName().equals("Orders")) {
     Map<String, Object> requestRow = request.getRequestRow();
     Map<String, Object> responseRow = new HashMap<>();

     // Safely fetch the Spanner OrderSource
     Object sourceObj = requestRow.get("OrderSource");
     String source = (sourceObj != null) ? (String) sourceObj : "UNKNOWN_SYSTEM";
     String legacySys = "'" + source + "_v1'";

     // Transform: Append a suffix to visibly prove the reverse transformation worked
     // e.g., "WebStore" becomes "WebStore_v1"
     responseRow.put("LegacyOrderSystem", legacySys);

     return new MigrationTransformationResponse(responseRow, false);
   }

   return new MigrationTransformationResponse(new HashMap<>(), false);
 }

 @Override
 public MigrationTransformationResponse transformFailedSpannerMutation(
     MigrationTransformationRequest request) throws InvalidTransformationException {
   return new MigrationTransformationResponse(new HashMap<>(), false);
 }
}
EOF

3. Configura la lógica de sharding inverso

Dataflow usa CustomShardIdFetcher.java durante la replicación inversa para determinar a dónde se debe enrutar una mutación de Spanner. Usaremos la clave principal CustomerId y la lógica de módulo (%4) para enrutar de forma dinámica los registros a su fragmento lógico correcto.

Edita el archivo CustomShardIdFetcher.java con cat y reemplaza todo su contenido por el siguiente código:

cat << 'EOF' > src/main/java/com/custom/CustomShardIdFetcher.java 
package com.custom;

import com.google.cloud.teleport.v2.spanner.utils.IShardIdFetcher;
import com.google.cloud.teleport.v2.spanner.utils.ShardIdRequest;
import com.google.cloud.teleport.v2.spanner.utils.ShardIdResponse;
import java.util.Map;

public class CustomShardIdFetcher implements IShardIdFetcher {
    
    @Override
    public void init(String parameters) {}

    @Override
    public ShardIdResponse getShardId(ShardIdRequest shardIdRequest) {
        Map<String, Object> keys = shardIdRequest.getSpannerRecord
();
        
        // Use the Primary Key to identify the correct logical shard
        if (keys != null && keys.containsKey("CustomerId")) {
            long customerId = Long.parseLong(keys.get("CustomerId").toString());
            long shardIdx = customerId % 4;
            
            ShardIdResponse response = new ShardIdResponse();
            response.setLogicalShardId("shard" + shardIdx + "_db");
            return response;
        }
        
        return new ShardIdResponse();
    }
}
EOF

4. Compila y sube el archivo JAR

Ahora que escribimos nuestra lógica personalizada de Java, debemos compilarla en un archivo JAR y subirla al bucket de Google Cloud Storage que creamos anteriormente para que Dataflow pueda acceder a ella.

Ejecute los siguientes comandos en Cloud Shell:

# Return to DataflowTemplates directory 
cd ../..

# Build the JAR using Maven
mvn clean install -DskipTests -Dcheckstyle.skip -Dspotless.check.skip=true -Djib.skip -pl v2/spanner-custom-shard -am

# Upload the JAR to GCS
export CUSTOM_JAR_PATH="gs://${BUCKET_NAME}/custom-logic/spanner-custom-shard-1.0.jar"

gcloud storage cp v2/spanner-custom-shard/target/spanner-custom-shard-1.0-SNAPSHOT.jar $CUSTOM_JAR_PATH

# Return to home directory
cd ~

9. Migra datos de MySQL a Spanner de forma masiva

Con el esquema de Spanner implementado y el JAR de transformación personalizado compilado, ahora podemos copiar los datos existentes de tu base de datos de MySQL a Cloud Spanner. Usarás la plantilla de Flex de Sourcedb to Spanner Dataflow, que está diseñada para copiar de forma masiva datos de bases de datos accesibles a través de JDBC a Spanner.

1. Sube el archivo de anulaciones del esquema

En la sección 6, descargaste el archivo JSON de anulaciones de Spanner con la IU web de SMT. Debemos subirlo a nuestro bucket de GCS para que Dataflow pueda usarlo para asignar las diferencias de esquema (como las columnas renombradas).

  1. En Cloud Shell, haz clic en el menú de tres puntos (Más) y selecciona Subir.

4b17d17ab13e90df.png

  1. Selecciona el archivo JSON de anulaciones que descargaste antes (p.ej., spanner_overrides.json).
  2. Mueve el archivo a tu bucket de GCS:
export OVERRIDES_FILE="spanner_overrides.json" # Change this if your downloaded file has a different name

export GCS_OVERRIDES_PATH="gs://${BUCKET_NAME}/config/${OVERRIDES_FILE}"

gcloud storage cp ~/${OVERRIDES_FILE} $GCS_OVERRIDES_PATH

2. Crea y sube el archivo de configuración de fragmentación

Dataflow necesita saber cómo conectarse a los cuatro fragmentos lógicos en tus dos VMs físicas. Para ello, crearemos un archivo sharding.json.

Ejecuta el siguiente comando en Cloud Shell para generar y subir la configuración:

cat <<EOF > sharding.json
{
  "configType": "dataflow",
  "shardConfigurationBulk": {
    "schemaSource": {
      "dataShardId": "mysql-physical-1",
      "host": "${MYSQL_IP_1}",
      "user": "datastream_user",
      "password": "complex_password_123",
      "port": "3306",
      "dbName": "shard0_db"
    },
    "dataShards": [
      {
        "dataShardId": "mysql-physical-1",
        "host": "${MYSQL_IP_1}",
        "user": "datastream_user",
        "password": "complex_password_123",
        "port": "3306",
        "dbName": "",
        "namespace": "namespace-mysql-1",
        "databases": [
          {
            "dbName": "shard0_db",
            "databaseId": "shard0_db",
            "refDataShardId": "mysql-physical-1"
          },
          {
            "dbName": "shard1_db",
            "databaseId": "shard1_db",
            "refDataShardId": "mysql-physical-1"
          }
        ]
      },
      {
        "dataShardId": "mysql-physical-2",
        "host": "${MYSQL_IP_2}",
        "user": "datastream_user",
        "password": "complex_password_123",
        "port": "3306",
        "dbName": "",
        "namespace": "namespace-mysql-2",
        "databases": [
          {
            "dbName": "shard2_db",
            "databaseId": "shard2_db",
            "refDataShardId": "mysql-physical-2"
          },
          {
            "dbName": "shard3_db",
            "databaseId": "shard3_db",
            "refDataShardId": "mysql-physical-2"
          }
        ]
      }
    ]
  }
}
EOF


export GCS_SHARDING_PATH="gs://${BUCKET_NAME}/config/sharding.json"
gcloud storage cp sharding.json $GCS_SHARDING_PATH

3. Ejecuta el trabajo de Dataflow de migración masiva

Usaremos la plantilla de Flex Sourcedb to Spanner. Como se trata de una migración fragmentada con transformaciones personalizadas, pasamos el archivo de anulaciones, la configuración de fragmentación y nuestro JAR de Java personalizado.

export JOB_NAME="mysql-sharded-bulk-to-spanner-$(date +%Y%m%d-%H%M%S)"
export OUTPUT_DIR="gs://${BUCKET_NAME}/bulk-migration"

gcloud dataflow flex-template run $JOB_NAME \
  --project=$PROJECT_ID \
  --region=$REGION \
--template-file-gcs-location="gs://dataflow-templates-${REGION}/latest/flex/Sourcedb_to_Spanner_Flex" \
--max-workers=2 \
--num-workers=1 \
--worker-machine-type=n2-highmem-8 \
  --parameters \
sourceConfigURL=$GCS_SHARDING_PATH,\
instanceId=$SPANNER_INSTANCE_NAME,\
databaseId=$SPANNER_DATABASE_NAME,\
projectId=$PROJECT_ID,\
outputDirectory=$OUTPUT_DIR,\
username=datastream_user,\
password=complex_password_123,\
schemaOverridesFilePath=$GCS_OVERRIDES_PATH,\
transformationJarPath=$CUSTOM_JAR_PATH,\
transformationClassName=com.custom.CustomTransformationFetcher

Explicación de los parámetros clave:

  • sourceConfigURL: Es la ruta de acceso al archivo sharding.json que creamos. Esto le indica a Dataflow cómo conectarse a las cuatro particiones lógicas de MySQL en las dos VMs físicas.
  • schemaOverridesFilePath: Es la ruta de acceso al archivo JSON que descargamos de la IU web de SMT. Esto le indica a Dataflow cómo controlar las modificaciones del esquema que realizamos (como la columna LegacyRegion descartada y la restricción de verificación más estricta).
  • transformationJarPath: Es la ruta de GCS al archivo JAR de Java compilado que creamos en la sección anterior. Contiene el código real para ejecutar nuestras transformaciones personalizadas.
  • transformationClassName: Es el nombre completamente calificado de la clase de Java dentro de nuestro archivo JAR que implementa la lógica de migración hacia adelante (com.custom.CustomTransformationFetcher).
  • outputDirectory: Es la ubicación de GCS en la que Dataflow escribirá sus archivos temporales y, lo que es más importante, los archivos de la cola de mensajes no entregados (DLQ).
  • maxWorkers, numWorkers: Controla el ajuste de escala del trabajo de Dataflow. Se mantiene bajo para este conjunto de datos pequeño.
  • instanceId, databaseId, projectId: Especifica la instancia y la base de datos de Cloud Spanner de destino.

Nota sobre la red: Este trabajo se conecta a la instancia de Cloud SQL a través de su IP pública. Esto es posible porque anteriormente agregaste 0.0.0.0/0 a las redes autorizadas de la instancia. Esto permite que las VMs de trabajador de Dataflow, que tienen IPs externas, lleguen a la base de datos.

4. Supervisa el trabajo de Dataflow

Puedes hacer un seguimiento del progreso del trabajo en la consola de Google Cloud:

  1. Navega a la página Trabajos de Dataflow: Ir a Trabajos de Dataflow
  2. Ubica el trabajo llamado mysql-sharded-bulk-to-spanner-... y haz clic en él.
  3. Observa el gráfico y las métricas del trabajo. Espera a que el estado del trabajo cambie a Completado. Este proceso debería tardar entre 5 y 15 minutos aproximadamente.

f3ffd88c35fa8042.png

  • Si el trabajo tiene problemas, revisa la pestaña Registros en la página de detalles del trabajo de Dataflow para ver los mensajes de error.
  • Métricas del trabajo proporciona más información sobre el progreso del trabajo y el consumo de recursos, como el rendimiento y el uso de CPU.

5. Verifica los datos en Cloud Spanner y revisa la cola de mensajes no entregados (DLQ)

Una vez que el trabajo de Dataflow se complete correctamente, debemos verificar que nuestros datos hayan llegado de forma segura y, luego, inspeccionar los registros que diseñamos intencionalmente para que fallaran.

A. Verifica el estado general de los datos migrados:

Usa la CLI de gcloud para ejecutar algunas verificaciones rápidas del estado de tu base de datos consolidada de Spanner y asegurarte de que los registros válidos se hayan migrado correctamente y de que nuestro archivo JAR personalizado haya completado la columna adicional.

# 1. Verify total Customer count
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT COUNT(*) as TotalCustomers FROM Customers"

# 2. Verify total Orders count (Total minus the orphan record)
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT COUNT(*) as TotalOrders FROM Orders"

# 3. Verify the Custom Transformation on OrderSource worked
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT CustomerId, OrderId, OrderSource FROM Orders LIMIT 3"

# 4. Verify that renamed column LoyaltyTier has the correct data
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT CustomerId, CustomerName, LoyaltyTier FROM Customers LIMIT 3"

Resultado esperado:

TotalCustomers: 16
TotalOrders: 19

CustomerId: 1
OrderId: 201
OrderSource: MobileApp

CustomerId: 2
OrderId: 301
OrderSource: CallCenter

CustomerId: 3
OrderId: 401
OrderSource: InStore

CustomerId: 1
CustomerName: Agnes N.
LoyaltyTier: NORTHEAST

CustomerId: 2
CustomerName: Brian K.
LoyaltyTier: SOUTHWEST

CustomerId: 3
CustomerName: Cathy Z.
LoyaltyTier: CENTRAL
  • Se migraron correctamente todas las filas de la tabla Customers.
  • Vemos 1 fila con errores en la tabla Orders debido a INTERLEAVE IN PARENT en Spanner: CustomerId 99 es un elemento secundario huérfano porque no hay una fila correspondiente en la tabla Customers.

B. Verifica las fallas intencionales en la DLQ:

El error anterior se documenta en la carpeta de la cola de mensajes no entregados (DLQ) que creó la canalización de migración masiva.

  1. Navega a Cloud Storage en la consola de Google Cloud.
  2. Ve a tu bucket y abre la carpeta bulk-migration/dlq/severe.
  3. Inspecciona los archivos JSON que contiene. Encontrarás la fila Orders con el CustomerId huérfano.
  4. Para volver a intentar los errores de la DLQ de la migración masiva, sigue los pasos que se mencionan aquí.

Se completó la carga masiva inicial de datos de Cloud SQL a Cloud Spanner. El siguiente paso es configurar la replicación en vivo para capturar los cambios en curso.

10. Iniciar la migración en vivo (CDC)

Ahora que se completó la carga masiva de datos, iniciarás un trabajo de transmisión continuo de Dataflow. Este trabajo leerá los eventos de captura de datos modificados (CDC) que Datastream escribe en tu bucket de GCS y aplicará esos cambios a Cloud Spanner casi en tiempo real.

También probaremos esta canalización insertando datos válidos y datos no válidos de forma intencional para observar cómo Dataflow controla la replicación en vivo y enruta los errores a la cola de mensajes no entregados (DLQ).

1. Crea el archivo de configuración de fragmentación de la migración en vivo

A diferencia de la migración masiva (que usa cadenas de conexión JDBC), la canalización de migración en vivo lee eventos de Datastream desde GCS. Necesita una configuración de JSON completamente diferente que asigne los nombres de las transmisiones y las bases de datos de Datastream a tus fragmentos lógicos de Spanner.

Ejecuta el siguiente comando en Cloud Shell para crear y subir la configuración de fragmentación en vivo:

cat <<EOF > live-sharding.json
{
  "StreamToDbAndShardMap": {
    "${STREAM_NAME_1}": {
      "shard0_db": "shard0_db",
      "shard1_db": "shard1_db"
    },
    "${STREAM_NAME_2}": {
      "shard2_db": "shard2_db",
      "shard3_db": "shard3_db"
    }
  }
}
EOF

export GCS_LIVE_SHARDING_PATH="gs://${BUCKET_NAME}/config/live-sharding.json"
gcloud storage cp live-sharding.json $GCS_LIVE_SHARDING_PATH

2. Ejecuta el trabajo de Dataflow de migración en vivo

Inicia el trabajo de transmisión de Dataflow para leer desde GCS y escribir en Spanner. Esta plantilla usará las notificaciones de Pub/Sub de GCS para procesar archivos nuevos de forma instantánea.

export JOB_NAME_CDC="mysql-sharded-cdc-to-spanner-$(date +%Y%m%d-%H%M%S)"
export DLQ_DIR_CDC="gs://${BUCKET_NAME}/live-migration"

gcloud dataflow flex-template run $JOB_NAME_CDC \
  --project=$PROJECT_ID \
  --region=$REGION \
--worker-machine-type=n2-highmem-8 \
--template-file-gcs-location="gs://dataflow-templates-${REGION}/latest/flex/Cloud_Datastream_to_Spanner" \
  --parameters \
instanceId="$SPANNER_INSTANCE_NAME",\
databaseId="$SPANNER_DATABASE_NAME",\
projectId="$PROJECT_ID",\
inputFileFormat="avro",\
gcsPubSubSubscription="projects/${PROJECT_ID}/subscriptions/${PUBSUB_SUBSCRIPTION}",\
shardingContextFilePath=$GCS_LIVE_SHARDING_PATH,\
deadLetterQueueDirectory="$DLQ_DIR_CDC",\
transformationJarPath=$CUSTOM_JAR_PATH,\
transformationClassName="com.custom.CustomTransformationFetcher",\
schemaOverridesFilePath=$GCS_OVERRIDES_PATH,\
datastreamSourceType="mysql",\
dlqRetryMinutes=1,\
dlqMaxRetryCount=2

Parámetros clave

  • gcsPubSubSubscription: Es la suscripción de Pub/Sub que escucha las notificaciones de archivos nuevos de GCS. Esto permite que el trabajo procese los cambios de inmediato a medida que Datastream los escribe.
  • inputFileFormat="avro": Indica a Dataflow que espere archivos Avro de Datastream. Debe coincidir con la configuración del "Destino" de tu flujo de datos (p. ej., avroFileFormat frente a jsonFileFormat).
  • shardingContextFilePath: Es un archivo JSON que asigna transmisiones de Datastream a fragmentos lógicos.
  • dlqRetryMinutes: Es la cantidad de minutos entre reintentos de la cola de mensajes no entregados. La configuración predeterminada es 10.
  • dlqMaxRetryCount: Es la cantidad máxima de veces que se pueden reintentar los errores temporales a través de DLQ. La configuración predeterminada es 500.

Supervisa el inicio del trabajo en la consola de trabajos de Dataflow.

3. Cómo insertar datos en tiempo real y activar fallas intencionales

Mientras se inicia el trabajo de transmisión de Dataflow (esto puede tardar de 3 a 5 minutos), conectémonos a nuestra primera VM física de MySQL a través de SSH y, luego, insertemos algunos registros nuevos. Insertaremos un registro válido y uno no válido.

Establece una conexión SSH al primer fragmento físico:

gcloud compute ssh mysql-physical-1 --zone=$ZONE

Accede a MySQL:

sudo mysql

Ejecuta las siguientes inserciones en shard1_db:

USE shard1_db;

-- 1. Valid Insert: 'MobileApp_v2' will be trimmed to 'MobileApp'
INSERT INTO Orders (CustomerId, OrderId, OrderValue, LegacyOrderSystem) 
VALUES (4, 501, 99.99, 'MobileApp_v2');

-- 2. Invalid Insert (DLQ Test): This violates Interleave constraint as CustomerId 99999 doesn't exist in Customers table.
INSERT INTO Orders (CustomerId, OrderId, OrderValue, LegacyOrderSystem) 
VALUES (99999, 502, 50.00, 'WebStore_v1');

-- 3. Valid Update
UPDATE Orders SET OrderValue = '1500' WHERE CustomerId = 5 AND OrderId = 202; 

-- 4. Valid Delete
DELETE FROM Orders WHERE CustomerId = 5 AND OrderId = 203; 

EXIT;

Vuelve a escribir exit para regresar al símbolo del sistema de Cloud Shell.

4. Verifica los datos de la migración en vivo y, luego, inspecciona la DLQ de CDC

Ahora que insertamos los datos, Datastream capturará los eventos de CDC y Dataflow intentará aplicarlos a Spanner.

A. Verifica los cambios de DML válidos en Spanner

Ejecuta las siguientes consultas para verificar que los eventos INSERT, UPDATE y DELETE hayan llegado correctamente a Spanner y que la transformación personalizada se haya activado tanto en la inserción como en la actualización.

# 1. Verify INSERT: Should return the new row with transformed OrderSource
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT CustomerId, OrderId, OrderValue, OrderSource FROM Orders WHERE CustomerId = 4 AND OrderId = 501"

# 2. Verify UPDATE: Should show OrderValue changed to 1500
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT CustomerId, OrderId, OrderValue, OrderSource FROM Orders WHERE CustomerId = 5 AND OrderId = 202"

# 3. Verify DELETE: Should return 0, confirming the order was deleted
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT COUNT(*) FROM Orders WHERE CustomerId = 5 AND OrderId = 203"

# 4. Verify DLQ Failure: Should return 0, confirming the row migration failed
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT COUNT(*) FROM Orders WHERE CustomerId = 99999 AND OrderId = 502"

Resultado esperado:

CustomerId: 4
OrderId: 501
OrderValue: 99.99
OrderSource: MobileApp

CustomerId: 5
OrderId: 202
OrderValue: 1500
OrderSource: WebStore

0
0

Nota: Si alguna búsqueda no muestra el resultado esperado, espera un minuto y vuelve a intentarlo, ya que es posible que los trabajadores de transmisión aún estén procesando la cola.

B. Verifica la falla intencional en la DLQ:

Como CustomerId = 99999 no tiene un elemento superior en la tabla Customers, Spanner debería haberlo rechazado y Dataflow debería haberlo enviado de forma segura a la DLQ.

  1. Navega a Cloud Storage en la consola de Google Cloud.
  2. Ve a tu bucket y abre la carpeta live-migration/dlq/severe/.
  3. Deberías ver los archivos JSON recién generados. Haz clic en ellos para inspeccionar el contenido. Verás los detalles de CustomerId = 99999 y el mensaje de error específico de Spanner: NOT_FOUND: Parent row for row [99999,502,WebStore] in table Orders is missing. Row cannot be written."
  4. Los errores de la DLQ de la migración en vivo se pueden reintentar ejecutando la plantilla de Dataflow con runMode=retryDLQ configurado.

5. Cómo controlar los errores de la DLQ

Los errores en el directorio severe/ requieren intervención manual. Corrijamos el problema de datos y volvamos a procesar el evento fallido.

A. Cómo corregir los datos en la fuente

El error se produjo porque falta el registro del cliente principal CustomerId = 99999. Insertémoslo en la base de datos de MySQL de origen.

Vuelve a establecer una conexión SSH a la instancia de MySQL:

gcloud compute ssh mysql-physical-1 --zone=$ZONE

Accede a MySQL con sudo mysql y, luego, inserta la fila principal faltante en shard1_db:

USE shard1_db;

INSERT INTO Customers (CustomerId, CustomerName, CreditLimit, LegacyRegion) VALUES
(99999, 'DLQ Parent Holder', 5000.00, 'NORTH_AMERICA');

EXIT;

Escribe exit para volver a Cloud Shell.

B. Ejecuta el trabajo de Dataflow de retryDLQ

Para volver a procesar eventos de la DLQ de severe/, inicia la misma plantilla de Dataflow, pero en modo retryDLQ. Este modo lee específicamente la ruta de acceso deadLetterQueueDirectory/severe, vuelve a ejecutar las transformaciones personalizadas y las aplica a Spanner.

Inicia el trabajo en el modo retryDLQ:

export JOB_NAME_RETRY="mysql-sharded-cdc-retry-$(date +%Y%m%d-%H%M%S)"

gcloud dataflow flex-template run $JOB_NAME_RETRY \
  --project=$PROJECT_ID \
  --region=$REGION \
  --worker-machine-type=n2-highmem-8 \
--template-file-gcs-location="gs://dataflow-templates-${REGION}/latest/flex/Cloud_Datastream_to_Spanner" \
  --parameters \
instanceId="$SPANNER_INSTANCE_NAME",\
databaseId="$SPANNER_DATABASE_NAME",\
projectId="$PROJECT_ID",\
runMode="retryDLQ",\
deadLetterQueueDirectory="$DLQ_DIR_CDC",\
datastreamSourceType="mysql",\
transformationJarPath=$CUSTOM_JAR_PATH,\
transformationClassName="com.custom.CustomTransformationFetcher",\
schemaOverridesFilePath=$GCS_OVERRIDES_PATH,\
shardingContextFilePath=$GCS_LIVE_SHARDING_PATH

Cambios en los parámetros clave para los reintentos

  • runMode="retryDLQ": Indica a la plantilla que lea desde el directorio de la DLQ severe.
  • Se quitó gcsPubSubSubscription: No es necesario, ya que no leemos desde el bucket de GCS de Datastream en vivo.

Supervisa el proceso de reintento:

Al igual que la canalización principal de CDC, retryDLQ es una canalización de transmisión que permanecerá RUNNING hasta que se cancele de forma manual.

  1. Ve a la página Trabajo de Dataflow de $JOB_NAME_RETRY.
  2. En el panel Métricas, busca estos dos contadores:
  3. elementsReconsumedFromDeadLetterQueue: Se evalúa cuando se recuperan los archivos de error.
  4. Successful events: Se incrementa cuando el registro se escribe en Spanner.
  5. Verifica si hay fallas recurrentes en el directorio severe/.
  6. Una vez que los eventos de tipo Successful aumenten en la cantidad de elementos que querías volver a intentar (1 en nuestro caso de prueba), ve al siguiente paso de verificación.

C. Verifica los datos reintentados

Después de que se reintente el registro fallido (puede tardar un tiempo en completarse), verifica en Spanner si la fila secundaria se migró correctamente:

gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="SELECT CustomerId, OrderId, OrderValue, OrderSource FROM Orders WHERE CustomerId = 99999 AND OrderId = 502"

Ahora deberías ver la fila:

CustomerId: 99999
OrderId: 502
OrderValue: 50
OrderSource: WebStore

También revisa la carpeta $DLQ_DIR_CDC/severe/ en GCS. Los archivos procesados deberían haberse movido o borrado, lo que indica que el reprocesamiento se realizó correctamente.

11. Configura la replicación inversa (de Spanner a MySQL)

Para controlar situaciones en las que es posible que debas revertir o mantener la base de datos de MySQL original sincronizada con Spanner durante un período de transición, puedes configurar la replicación inversa.

Esta canalización usa flujos de cambios de Spanner para capturar modificaciones en vivo en Spanner. Luego, usa nuestro JAR de transformación personalizada para realizar la asignación inversa de las diferencias de esquema y nuestro JAR de particionamiento personalizado para calcular exactamente en qué VM física de MySQL y en qué partición lógica se debe volver a escribir la actualización.

1. Crea un flujo de cambios de Spanner

Primero, debes crear un flujo de cambios en tu base de datos de Spanner para hacer un seguimiento de los cambios en las tablas Customers y Orders.

export CHANGE_STREAM_NAME="CustomersOrdersChangeStream"

gcloud spanner databases ddl update $SPANNER_DATABASE_NAME \
  --instance=$SPANNER_INSTANCE_NAME \
  --ddl="CREATE CHANGE STREAM $CHANGE_STREAM_NAME FOR Customers, Orders"

Este flujo de cambios ahora registrará todas las modificaciones de datos en las tablas especificadas.

2. Crea una base de datos de Spanner para los metadatos de Dataflow

La plantilla de Spanner to SourceDB Dataflow requiere una base de datos de Spanner independiente para almacenar metadatos y administrar el consumo del flujo de cambios.

export SPANNER_METADATA_DB_NAME="migration-metadata-db"

gcloud spanner databases create $SPANNER_METADATA_DB_NAME \
  --instance=$SPANNER_INSTANCE_NAME

3. Prepara la configuración de la conexión de Cloud SQL para Dataflow

La plantilla de Dataflow necesita un archivo JSON en Cloud Storage que contenga los detalles de conexión de la base de datos de Cloud SQL de destino.

Crea un archivo local llamado shard_config.json:

cat <<EOF > reverse-sharding.json
[
  {
    "logicalShardId": "shard0_db",
    "host": "${MYSQL_IP_1}",
    "port": "3306",
    "user": "datastream_user",
    "password": "complex_password_123",
    "dbName": "shard0_db"
  },
  {
    "logicalShardId": "shard1_db",
    "host": "${MYSQL_IP_1}",
    "port": "3306",
    "user": "datastream_user",
    "password": "complex_password_123",
    "dbName": "shard1_db"
  },
  {
    "logicalShardId": "shard2_db",
    "host": "${MYSQL_IP_2}",
    "port": "3306",
    "user": "datastream_user",
    "password": "complex_password_123",
    "dbName": "shard2_db"
  },
  {
    "logicalShardId": "shard3_db",
    "host": "${MYSQL_IP_2}",
    "port": "3306",
    "user": "datastream_user",
    "password": "complex_password_123",
    "dbName": "shard3_db"
  }
]
EOF

Sube este archivo a tu bucket de GCS:

export GCS_REVERSE_SHARDING_PATH="gs://${BUCKET_NAME}/config/reverse-sharding.json"
gcloud storage cp reverse-sharding.json $GCS_REVERSE_SHARDING_PATH

4. Ejecuta el trabajo de Dataflow de replicación inversa

Inicia el trabajo de Dataflow con la plantilla de Flex Spanner_to_SourceDb.

export JOB_NAME_REVERSE="spanner-sharded-reverse-to-mysql-$(date +%Y%m%d-%H%M%S)"
export DLQ_DIR_REVERSE="gs://${BUCKET_NAME}/reverse-replication"

gcloud dataflow flex-template run $JOB_NAME_REVERSE \
  --project=$PROJECT_ID \
  --region=$REGION \
--worker-machine-type=n2-highmem-8 \
--max-workers=2 \
--num-workers=1 \
--additional-experiments=use_runner_v2 \
--template-file-gcs-location="gs://dataflow-templates-${REGION}/latest/flex/Spanner_to_SourceDb" \
  --parameters \
changeStreamName="$CHANGE_STREAM_NAME",\
instanceId="$SPANNER_INSTANCE_NAME",\
databaseId="$SPANNER_DATABASE_NAME",\
spannerProjectId="$PROJECT_ID",\
metadataInstance="$SPANNER_INSTANCE_NAME",\
metadataDatabase="$SPANNER_METADATA_DB_NAME",\
sourceShardsFilePath="$GCS_REVERSE_SHARDING_PATH",\
transformationJarPath=$CUSTOM_JAR_PATH,\
transformationClassName="com.custom.CustomTransformationFetcher",\
shardingCustomJarPath=$CUSTOM_JAR_PATH,\
shardingCustomClassName="com.custom.CustomShardIdFetcher",\
schemaOverridesFilePath=$GCS_OVERRIDES_PATH,\
deadLetterQueueDirectory=$DLQ_DIR_REVERSE

Parámetros clave

  • changeStreamName: Es el nombre del flujo de cambios de Spanner desde el que se leerá.
  • metadataInstance, metadataDatabase: Instancia o base de datos de Spanner para almacenar los metadatos que usa el conector para controlar el consumo de los datos de la API de flujo de cambios.
  • sourceShardsFilePath: Es la ruta de acceso a tu shard_config.json en GCS.
  • filtrationMode: Especifica cómo descartar ciertos registros según un criterio. El valor predeterminado es forward_migration (filtra los registros escritos con la canalización de migración hacia adelante).
  • shardingCustomJarPath: Es la ruta de acceso de GCS al archivo JAR de Java compilado que creamos antes.
  • shardingCustomClassName: Es el nombre de clase completamente calificado (com.custom.CustomShardIdFetcher) que ejecuta nuestras matemáticas de módulo %4 personalizadas para determinar de forma dinámica qué fragmento lógico debe recibir el registro.

Nota sobre la red: Los trabajadores de Dataflow se conectarán a la instancia de Cloud SQL con la IP pública especificada en shard_config.json. Esta conexión se permite debido a la entrada 0.0.0.0/0 en las redes autorizadas de la instancia de Cloud SQL.

Supervisa el inicio del trabajo en la consola de trabajos de Dataflow.

5. Inyecta datos de Spanner y activa fallas intencionales

Espera a que el trabajo de Dataflow entre en el estado Running (esto puede tardar alrededor de 5 minutos). Luego, ejecutemos un conjunto completo de consultas (INSERT, UPDATE, DELETE) directamente en Spanner, junto con una falla intencional para probar la DLQ inversa.

En Cloud Shell, ejecute lo siguiente:

# All these operations are done on rows mapping to shard0_db for convenience

# Valid INSERT: Insert parent row in Customers
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="INSERT INTO Customers (CustomerId, CustomerName, CreditLimit, LoyaltyTier) VALUES (88, 'Reverse Tester', 5000, 'GOLD_TIER')"

# 1. Valid INSERT (Orders): 'WebStore' transformed to 'WebStore_v1'
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="INSERT INTO Orders (CustomerId, OrderId, OrderValue, OrderSource) VALUES (88, 9001, 150.00, 'WebStore')"

# 2. Valid UPDATE (Orders)
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="UPDATE Orders SET OrderValue = 200.00 WHERE CustomerId = 16 AND OrderId = 105 AND OrderSource = 'Partner'"

# 3. Valid DELETE (Orders)
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="DELETE FROM Orders WHERE CustomerId = 12 AND OrderId = 104 AND OrderSource = 'WebStore'"

# 4. INVALID Insert- DLQ Test: CreditLimit=500 will fail check constraint of >1000 at source 
gcloud spanner databases execute-sql $SPANNER_DATABASE_NAME \
    --instance=$SPANNER_INSTANCE_NAME \
    --sql="INSERT INTO Customers (CustomerId, CustomerName, CreditLimit, LoyaltyTier) VALUES (44, 'DLQ Test Customer', 500, 'GOLD_TIER')"

6. Verifica los datos de replicación inversa y revisa la DLQ

Confirmemos que nuestro JAR de Custom Sharding enrutó correctamente CustomerId 88 a shard0_db en nuestra primera VM física y que el JAR de Custom Transformation quitó correctamente "_TIER" de la región.

A. Verifica el registro válido en MySQL:

Establece una conexión SSH al primer fragmento físico:

gcloud compute ssh mysql-physical-1 --zone=$ZONE

Accede a MySQL y consulta shard0_db:

sudo mysql
USE shard0_db;

-- 1. Verify INSERT: Row migrated with transformed LegacyOrderSystem
SELECT CustomerId, OrderId, OrderValue, LegacyOrderSystem 
FROM Orders 
WHERE CustomerId = 88 AND OrderId = 9001;

-- 2. Verify UPDATE: The OrderValue should now be updated to 200.00.
SELECT CustomerId, OrderId, OrderValue, LegacyOrderSystem 
FROM Orders 
WHERE CustomerId = 16 AND OrderId = 105;

-- 3. Verify DELETE: Returns 0 rows, confirming the order was successfully deleted from MySQL.
SELECT CustomerId, OrderId 
FROM Orders 
WHERE CustomerId = 12 AND OrderId = 104;

-- 4. Verify failed replication - this should be in DLQ as CreditLimit < 1000 and will fail stricter check constraint at source 
SELECT CustomerId, CustomerName, CreditLimit, LegacyRegion
FROM Customers
WHERE CustomerId = 44;

EXIT;

El resultado esperado en Cloud SQL debe reflejar los cambios realizados en Spanner.

+------------+---------+------------+-------------------+
| CustomerId | OrderId | OrderValue | LegacyOrderSystem |
+------------+---------+------------+-------------------+
|         88 |    9001 |     150.00 | Webstore_v1       |
+------------+---------+------------+-------------------+

+------------+---------+------------+-------------------+
| CustomerId | OrderId | OrderValue | LegacyOrderSystem |
+------------+---------+------------+-------------------+
|         16 |     105 |     200.00 | Partner_v1        |
+------------+---------+------------+-------------------+

Empty set (0.00 sec)
Empty set (0.00 sec)

Tipo

exit

para volver a Cloud Shell

Esto confirma que la canalización de replicación inversa funciona y sincroniza los cambios de Spanner a Cloud SQL.

B. Verifica la falla intencional en la DLQ

Dado que nuestro nuevo registro Customers tiene un CreditLimit de 500 (lo que incumple la restricción de verificación estricta > 1000 que definimos en nuestra base de datos de MySQL de origen), Dataflow detectó el error de forma segura.

  1. Navega a Cloud Storage en la consola de Google Cloud.
  2. Ve a tu bucket y abre la carpeta dlq/severe/.
  3. Abre el archivo JSON para ver el registro Customers rechazado y el error exacto de incumplimiento de la restricción de verificación.
  4. Los errores de la DLQ de replicación inversa se pueden volver a intentar ejecutando la plantilla de Dataflow con runMode=retryDLQ configurado.

12. Limpia los recursos

Para evitar que se apliquen cargos adicionales a tu cuenta de Google Cloud, borra los recursos que creaste durante este codelab.

Configura las variables de entorno (si es necesario)

Si se agotó el tiempo de espera de tu sesión de Cloud Shell o abriste una terminal nueva, deberás volver a exportar tus variables de entorno antes de ejecutar los comandos de limpieza.

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1" # Or your preferred region
export ZONE="us-central1-a" # Or a zone within your selected region
export SPANNER_INSTANCE_NAME="target-spanner-instance"
export SPANNER_DATABASE_NAME="sharded-target-db"
export SPANNER_CONFIG="regional-${REGION}"
export BUCKET_NAME="migration-${PROJECT_ID}-bucket"
export MYSQL_IP_1=$(gcloud compute instances describe mysql-physical-1 --zone=$ZONE --format='get(networkInterfaces[0].accessConfigs[0].natIP)')
export MYSQL_IP_2=$(gcloud compute instances describe mysql-physical-2 --zone=$ZONE --format='get(networkInterfaces[0].accessConfigs[0].natIP)')
export SQL_CP_NAME_1="mysql-src-cp-1"
export SQL_CP_NAME_2="mysql-src-cp-2"
export GCS_CP_NAME="gcs-dest-cp"
export STREAM_NAME_1="mysql-to-spanner-stream-1"
export GCS_STREAM_PATH_1="data/${STREAM_NAME_1}"
export STREAM_NAME_2="mysql-to-spanner-stream-2"
export GCS_STREAM_PATH_2="data/${STREAM_NAME_2}"
export PUBSUB_TOPIC="datastream-gcs-updates"
export PUBSUB_SUBSCRIPTION="datastream-gcs-sub"
export CUSTOM_JAR_PATH="gs://${BUCKET_NAME}/custom-logic/spanner-custom-shard-1.0.jar"
export OVERRIDES_FILE="spanner_overrides.json" 
export GCS_OVERRIDES_PATH="gs://${BUCKET_NAME}/config/${OVERRIDES_FILE}"
export GCS_SHARDING_PATH="gs://${BUCKET_NAME}/config/sharding.json"
export OUTPUT_DIR="gs://${BUCKET_NAME}/bulk-migration"
export GCS_LIVE_SHARDING_PATH="gs://${BUCKET_NAME}/config/live-sharding.json"
export DLQ_DIR_CDC="gs://${BUCKET_NAME}/live-migration"
export CHANGE_STREAM_NAME="CustomersOrdersChangeStream"
export SPANNER_METADATA_DB_NAME="migration-metadata-db"
export GCS_REVERSE_SHARDING_PATH="gs://${BUCKET_NAME}/config/reverse-sharding.json"
export DLQ_DIR_REVERSE="gs://${BUCKET_NAME}/reverse-replication"

Detén los trabajos de transmisión de Dataflow

Enumera tus trabajos para encontrar los IDs de los trabajos de Dataflow en ejecución. Exporta JOB_ID_CDC y JOB_ID_REVERSE según corresponda.

gcloud dataflow jobs list --region=$REGION --filter="state=Running"
export JOB_ID_CDC=<PASTE_JOB_ID_HERE>
export JOB_ID_CDC_RETRY=<PASTE_JOB_ID_HERE>
export JOB_ID_REVERSE=<PASTE_JOB_ID_HERE>

Cancela el trabajo de Datastream to Spanner (migración en vivo) y su trabajo de reintento:

gcloud dataflow jobs cancel $JOB_ID_CDC --region=$REGION --project=$PROJECT_ID

gcloud dataflow jobs cancel $JOB_ID_CDC_RETRY --region=$REGION --project=$PROJECT_ID

Cancela el trabajo de Spanner to Cloud SQL (replicación inversa):

gcloud dataflow jobs cancel $JOB_ID_REVERSE --region=$REGION --project=$PROJECT_ID

Borra los recursos de Datastream

Detén y borra la transmisión:

gcloud datastream streams update $STREAM_NAME_1 \
  --location=$REGION --state=PAUSED --project=$PROJECT_ID
# Wait a moment for the stream to pause
gcloud datastream streams delete $STREAM_NAME_1 \
  --location=$REGION --project=$PROJECT_ID --quiet

gcloud datastream streams update $STREAM_NAME_2 \
  --location=$REGION --state=PAUSED --project=$PROJECT_ID
# Wait a moment for the stream to pause
gcloud datastream streams delete $STREAM_NAME_2 \
  --location=$REGION --project=$PROJECT_ID --quiet

# Delete Connection Profiles
gcloud datastream connection-profiles delete $SQL_CP_NAME_1 \
  --location=$REGION --project=$PROJECT_ID --quiet
gcloud datastream connection-profiles delete $SQL_CP_NAME_2 \
  --location=$REGION --project=$PROJECT_ID --quiet
gcloud datastream connection-profiles delete $GCS_CP_NAME \
  --location=$REGION --project=$PROJECT_ID --quiet

Borra las VMs de MySQL de origen (Compute Engine)

Borra las dos instancias de Compute Engine que simularon los fragmentos físicos de MySQL locales.

gcloud compute instances delete mysql-physical-1 mysql-physical-2 --zone=$ZONE --quiet

Cómo borrar reglas de firewall

Quita las reglas de firewall de red creadas para permitir el acceso SSH y la conectividad de Datastream a tus VMs. (Nota: Si usaste nombres diferentes para tus reglas de firewall anteriormente en el codelab, ajústalos aquí).

gcloud compute firewall-rules delete allow-ssh-iap --quiet
gcloud compute firewall-rules delete allow-mysql-datastream --quiet

Borra recursos de Pub/Sub

Borra la suscripción:

gcloud pubsub subscriptions delete $PUBSUB_SUBSCRIPTION \
  --project=$PROJECT_ID --quiet

Borrar tema:

gcloud pubsub topics delete $PUBSUB_TOPIC \
  --project=$PROJECT_ID --quiet

Borra la instancia de Cloud Spanner

Borra la instancia de Cloud Spanner (esto borra automáticamente las bases de datos sharded-target-db y migration-metadata-db que contiene).

gcloud spanner instances delete $SPANNER_INSTANCE_NAME \
  --project=$PROJECT_ID --quiet

Borra el bucket y el contenido de GCS

Por último, borra el bucket de Cloud Storage que contiene los archivos de Datastream, la configuración de Dataflow y las colas de mensajes no entregados. El comando rm -r borra de forma recursiva el bucket y todo su contenido.

gcloud storage rm --recursive gs://${BUCKET_NAME}

Borra archivos locales de Cloud Shell

Para limpiar los archivos y directorios locales que se generaron en Cloud Shell durante este codelab, ejecuta los siguientes comandos:

# Remove the JSON configuration files
rm -f sharding.json live-sharding.json reverse-sharding.json spanner_overrides.json

# Remove the cloned Google Cloud DataflowTemplates repository
rm -rf DataflowTemplates