Como criar a captura de dados de alteração usando o Dataproc e o Cloud Pub/Sub

1. Introdução

df8070bd84336207.png

Última atualização:19/06/2025

O que é captura de dados alterados?

A captura de dados alterados (CDC) é um conjunto de padrões de design de software usados para determinar e rastrear dados que foram alterados em um banco de dados. Em termos mais simples, é uma maneira de capturar e registrar mudanças feitas nos dados para que elas possam ser replicadas em outros sistemas.

A captura de dados alterados (CDC) é muito útil em uma ampla variedade de cenários orientados a dados, como migração de dados, data warehousing e análises em tempo real, recuperação de desastres e alta disponibilidade, auditoria e compliance etc.

Migração de dados

A CDC simplifica os projetos de migração de dados ao permitir a transferência incremental de dados, reduzindo o tempo de inatividade e minimizando a interrupção.

Armazenamento de dados e análises em tempo real

A CDC garante que os data warehouses e os sistemas analíticos sejam atualizados constantemente com as mudanças mais recentes dos bancos de dados operacionais.

Assim, as empresas tomam decisões com base em informações em tempo real.

Recuperação de desastres e alta disponibilidade

A CDC permite a replicação de dados em tempo real para bancos de dados secundários com fins de recuperação de desastres. Em caso de falha, o CDC permite um failover rápido para um banco de dados secundário, minimizando o tempo de inatividade e a perda de dados.

Auditoria e compliance

A CDC fornece uma trilha de auditoria detalhada das mudanças de dados, o que é essencial para a conformidade e os requisitos regulatórios.

O que você vai criar

Neste codelab, você vai criar um pipeline de dados de captura de dados de mudança (CDC, na sigla em inglês) usando o Cloud Pub/Sub, o Dataproc, o Python e o Apache Spark. O pipeline vai:

  • Simule mudanças no banco de dados e publique-as como eventos no Cloud Pub/Sub, um serviço de mensagens escalonável e confiável.
  • Aproveite o poder do Dataproc, o serviço gerenciado do Spark e do Hadoop do Google Cloud, para processar esses eventos em tempo real.

Ao conectar esses serviços, você cria um pipeline robusto capaz de capturar e processar mudanças de dados à medida que elas ocorrem, fornecendo uma base para análises em tempo real, data warehousing e outros aplicativos essenciais.

O que você vai aprender

  • Como criar um pipeline básico de captura de dados alterados
  • Dataproc para processamento de stream
  • Cloud Pub/Sub para mensagens em tempo real
  • Conceitos básicos do Apache Spark

Este codelab se concentra no Dataproc e no Cloud Pub/Sub. Conceitos e blocos de códigos sem relevância não serão abordados. Eles são incluídos somente para você copiar e colar.

O que é necessário

  • uma conta ativa do GCP com um projeto configurado. Se você não tiver uma, inscreva-se para um teste sem custo financeiro.
  • A CLI gcloud instalada e configurada.
  • Python 3.7 ou mais recente instalado para simular mudanças no banco de dados e interagir com o Pub/Sub.
  • Conhecimento básico do Dataproc, Cloud Pub/Sub, Apache Spark e Python.

Antes de começar

Execute o seguinte comando no terminal para ativar as APIs necessárias:

gcloud services enable \
    dataproc.googleapis.com \
    pubsub.googleapis.com \

2. configurar o Cloud Pub/Sub

Criar um tópico

Esse tópico será usado para publicar as mudanças no banco de dados. O job do Dataproc será o consumidor dessas mensagens e as processará para captura de dados alterados. Para saber mais sobre tópicos, leia a documentação oficial aqui.

gcloud pubsub topics create database-changes

Criar uma assinatura

Crie uma assinatura que será usada para consumir as mensagens no Pub/Sub. Para saber mais sobre assinaturas, leia a documentação oficial aqui.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. Simular mudanças no banco de dados

Etapas

  1. Crie um script em Python (por exemplo, simulate_cdc.py) para simular mudanças no banco de dados e publicá-las no Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
    data_str = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data=data_str)
    print(f"Published message ID: {future.result()}")

def simulate_change():
    change_types = ["INSERT", "UPDATE", "DELETE"]
    change_type = random.choice(change_types)
    record_id = random.randint(1, 100)
    timestamp = time.time()
    change_data = {
        "change_type": change_type,
        "record_id": record_id,
        "timestamp": timestamp,
        "data": {"field1": "value1", "field2": "value2"}, #Place holder data.
    }
    publish_message(change_data)

if __name__ == "__main__":
    while True:
        simulate_change()
        time.sleep(2)  # Simulate changes every 2 seconds

Substitua YOUR_PROJECT_ID pelo ID do seu projeto do GCP.

  1. Instale a biblioteca de cliente do Pub/Sub:
pip install google-cloud-pubsub
  1. Execute o script no terminal. Esse script será executado continuamente e publicará mensagens a cada dois segundos no tópico do Pub/Sub.
python simulate_cdc.py
  1. Depois de executar o script por um minuto, por exemplo, você terá mensagens suficientes no Pub/Sub para consumir. Para encerrar o script Python em execução, pressione Ctrl + C ou Cmd + C, dependendo do seu SO.
  2. Ver mensagens publicadas:

Abra outro terminal e execute o seguinte comando para ver as mensagens publicadas:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

Você vai encontrar uma linha da tabela com a mensagem e outros campos:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

Explicação

  • O script Python simula mudanças no banco de dados gerando aleatoriamente eventos INSERT, UPDATE ou DELETE.
  • Cada mudança é representada como um objeto JSON que contém o tipo de mudança, o ID do registro, o carimbo de data/hora e os dados.
  • O script usa a biblioteca de cliente do Cloud Pub/Sub para publicar esses eventos de mudança no tópico database-changes.
  • O comando do assinante permite visualizar as mensagens que estão sendo enviadas ao tópico do Pub/Sub.

4. Criar uma conta de serviço para o Dataproc

Nesta seção, você vai criar uma conta de serviço que o cluster do Dataproc pode usar. Você também atribui as permissões necessárias para permitir que as instâncias do cluster acessem o Cloud Pub/Sub e o Dataproc.

  1. Crie uma conta de serviço:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. Adicione o papel de worker do Dataproc para permitir que a conta de serviço crie clusters e execute jobs. Adicione o ID da conta de serviço gerado no comando anterior como membro no comando abaixo:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. Adicione a função de assinante do Pub/Sub para permitir que a conta de serviço assine a assinatura do Pub/Sub "change-subscriber":
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change-subscriber \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:<your-service-account-with-domain"

5. crie um cluster do Dataproc

O cluster do Dataproc vai executar o app Spark, que vai processar as mensagens no Pub/Sub. Você vai precisar da conta de serviço criada na seção anterior. O Dataproc atribui essa conta de serviço a todas as instâncias do cluster para que todas as instâncias recebam as permissões corretas para executar o app.

Use o comando a seguir para criar um cluster do Dataproc:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. Enviar o job do Spark para o cluster do Dataproc

O app de streaming do Spark processa as mensagens de mudança do banco de dados no Pub/Sub e as imprime no console.

Etapas

  1. Crie um diretório e adicione o código-fonte do consumidor ao arquivo PubsubConsumer.scala.
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala 
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

  def createContext(projectID: String, checkpointDirectory: String)
    : StreamingContext = {

    // [START stream_setup]
    val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Set the checkpoint directory
    val yarnTags = sparkConf.get("spark.yarn.tags")
    val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
    ssc.checkpoint(checkpointDirectory + '/' + jobId)
    
    // Create stream
    val messagesStream: DStream[String] = PubsubUtils
      .createStream(
        ssc,
        projectID,
        None,
        "change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
        SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(message => new String(message.getData(), StandardCharsets.UTF_8))
    // [END stream_setup]

    processStringDStream(messagesStream)
    
        ssc
  }

  def processStringDStream(stringDStream: DStream[String]): Unit = {
    stringDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val listOfStrings: List[String] = rdd.collect().toList
        listOfStrings.foreach(str => println(s"message received: $str"))
      } else {
        println("looking for message...")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("arguments are not passed correctly!")
      System.exit(1)
    }

    val Seq(projectID, checkpointDirectory) = args.toSeq

    // Create Spark context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(projectID, checkpointDirectory))

    // Start streaming until we receive an explicit termination
    ssc.start()
    ssc.awaitTermination()
  }

}
  1. Crie e adicione o seguinte ao pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. Mude para o diretório do Spark do projeto e salve o caminho em uma variável de ambiente para usar mais tarde:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. Mude o diretório:
cd $REPO_ROOT/spark
  1. Faça o download do Java 1.8 e coloque a pasta em /usr/lib/jvm/. Em seguida, mude o JAVA_HOME para apontar para:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. Criar o jar do aplicativo
mvn clean package

O arquivo spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar que contém o código do aplicativo e as dependências é criado no diretório spark/target.

  1. Envie o aplicativo Spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. Exiba a lista de jobs ativos e observe o valor de JOB_ID para o trabalho:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

A saída será semelhante a

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. Veja o resultado do job abrindo o URL a seguir no seu navegador. Substitua [JOB_ID] pelo valor anotado na etapa anterior.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. O resultado será assim:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

O job de streaming do Spark em execução no Dataproc extrai mensagens do Pub/Sub, as processa e mostra a saída no console.

  1. Encerre o job: execute o seguinte comando para encerrar o job. Substitua JOB_ID pelo mesmo que anotamos anteriormente.
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

Parabéns! Você acabou de criar um pipeline de CDC eficiente que captura as mudanças do banco de dados no Pub/Sub e as processa usando o streaming do Spark no Cloud Dataproc.

7. Limpar

Limpe todos os recursos que você criou para não ser cobrado por eles no futuro. O jeito mais fácil de evitar o faturamento é excluindo o projeto criado para este tutorial. Como alternativa, exclua os recursos individuais.

Execute os comandos a seguir para excluir recursos individuais

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet

8. Parabéns

Parabéns! Você acabou de concluir um codelab prático que demonstra como criar um pipeline de dados robusto em tempo real usando o Google Cloud Platform. Vamos recapitular o que você fez:

  • Captura de dados de alterações (CDC) simulada: você aprendeu os fundamentos da CDC e implementou um script Python para simular mudanças no banco de dados, gerando eventos que representam modificações de dados em tempo real.
  • Uso do Cloud Pub/Sub:você configura tópicos e assinaturas do Cloud Pub/Sub, fornecendo um serviço de mensagens escalonável e confiável para transmitir seus eventos de CDC. Você publicou as mudanças simuladas do banco de dados no Pub/Sub, criando um fluxo de dados em tempo real.
  • Dados processados com o Dataproc e o Spark:você provisionou um cluster do Dataproc e implantou um job do Spark Streaming para consumir mensagens da sua assinatura do Pub/Sub. Você processou e transformou os eventos de CDC recebidos em tempo real, mostrando os resultados nos registros de jobs do Dataproc.
  • Criou um pipeline completo em tempo real:você integrou esses serviços para criar um pipeline de dados completo que captura, transmite e processa mudanças de dados em tempo real. Você ganhou experiência prática na criação de um sistema que pode processar fluxos de dados contínuos.
  • Usou o conector do Pub/Sub do Spark:você configurou um cluster do Dataproc para usar o conector do Pub/Sub do Spark, que é essencial para o Streaming estruturado do Spark ler dados do Pub/Sub.

Agora você tem uma base sólida para criar pipelines de dados mais complexos e sofisticados para vários aplicativos, incluindo análises em tempo real, armazenamento em data warehouse e arquiteturas de microsserviços. Continue aprendendo e criando!

Documentos de referência