Tworzenie rozwiązania do rejestrowania zmian za pomocą Dataproc i Cloud Pub/Sub

1. Wprowadzenie

df8070bd84336207.png

Ostatnia aktualizacja: 19 czerwca 2025 r.

Co to jest przechwytywanie zmian danych?

Przechwytywanie zmian danych (CDC) to zestaw wzorców projektowania oprogramowania służących do określania i śledzenia danych, które uległy zmianie w bazie danych. Mówiąc prościej, jest to sposób rejestrowania zmian wprowadzonych w danych, aby można było je odtworzyć w innych systemach.

Przechwytywanie zmian danych (CDC) jest niezwykle przydatne w wielu scenariuszach opartych na danych, takich jak migracja danych, magazynowanie i analiza danych w czasie rzeczywistym, odtwarzanie po awarii i wysoka dostępność, audyt i zgodność itp.

Migracja danych

CDC upraszcza projekty migracji danych, ponieważ umożliwia przyrostowe przesyłanie danych, co ogranicza przestoje i minimalizuje zakłócenia.

Hurtownie danych i analityka w czasie rzeczywistym

CDC zapewnia, że hurtownie danych i systemy analityczne są stale aktualizowane o najnowsze zmiany z operacyjnych baz danych.

Dzięki temu firmy mogą podejmować decyzje na podstawie informacji w czasie rzeczywistym.

Odtwarzanie awaryjne i wysoka dostępność

CDC umożliwia replikację danych w czasie rzeczywistym do dodatkowych baz danych na potrzeby odtwarzania awaryjnego. W przypadku awarii CDC umożliwia szybkie przełączenie awaryjne na dodatkową bazę danych, co minimalizuje przestoje i utratę danych.

Kontrola i zgodność z przepisami

CDC zapewnia szczegółową ścieżkę audytu zmian danych, co jest niezbędne do zachowania zgodności z wymaganiami prawnymi i regulacyjnymi.

Co utworzysz

W tym module dowiesz się, jak utworzyć potok danych do przechwytywania zmian danych (CDC) za pomocą Cloud Pub/Sub, Dataproc, Pythona i Apache Spark. Potok:

  • Symuluj zmiany w bazie danych i publikuj je jako zdarzenia w Cloud Pub/Sub, skalowalnej i niezawodnej usłudze przesyłania wiadomości.
  • Wykorzystaj moc Dataproc, zarządzanej usługi Spark i Hadoop w Google Cloud, aby przetwarzać te zdarzenia w czasie rzeczywistym.

Połączenie tych usług utworzy solidny potok, który będzie w stanie rejestrować i przetwarzać zmiany danych w miarę ich występowania, co zapewni podstawę do analizy w czasie rzeczywistym, hurtowni danych i innych kluczowych aplikacji.

Czego się nauczysz

  • Jak utworzyć podstawowy potok przechwytywania zmian danych
  • Dataproc do przetwarzania strumieniowego
  • Cloud Pub/Sub do przesyłania wiadomości w czasie rzeczywistym
  • Podstawy Apache Spark

Ten moduł dotyczy głównie Dataproc i Cloud Pub/Sub. Nieistotne koncepcje i bloki kodu zostały pominięte. Można je po prostu skopiować i wkleić.

Czego potrzebujesz

  • aktywne konto GCP z konfiguracją projektu; Jeśli nie masz konta, możesz zarejestrować się na bezpłatną wersję próbną.
  • Zainstalowany i skonfigurowany interfejs wiersza poleceń gcloud.
  • Zainstalowana wersja Pythona 3.7 lub nowsza do symulowania zmian w bazie danych i interakcji z Pub/Sub.
  • Podstawowa znajomość usług Dataproc, Cloud Pub/Sub, Apache Spark i Pythona.

Zanim rozpoczniesz

Aby włączyć wymagane interfejsy API, uruchom w terminalu to polecenie:

gcloud services enable \
    dataproc.googleapis.com \
    pubsub.googleapis.com \

2. Konfigurowanie Cloud Pub/Sub

Tworzenie tematu

Ten temat będzie używany do publikowania zmian w bazie danych. Zadanie Dataproc będzie odbiorcą tych wiadomości i będzie je przetwarzać w celu przechwytywania zmian danych. Więcej informacji o tematach znajdziesz w oficjalnej dokumentacji tutaj.

gcloud pubsub topics create database-changes

Tworzenie subskrypcji

Utwórz subskrypcję, która będzie używana do pobierania wiadomości z Pub/Sub. Więcej informacji o subskrypcjach znajdziesz w oficjalnej dokumentacji tutaj.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. Symulowanie zmian w bazie danych

Kroki

  1. Utwórz skrypt w języku Python (np. simulate_cdc.py), aby symulować zmiany w bazie danych i publikować je w Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
    data_str = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data=data_str)
    print(f"Published message ID: {future.result()}")

def simulate_change():
    change_types = ["INSERT", "UPDATE", "DELETE"]
    change_type = random.choice(change_types)
    record_id = random.randint(1, 100)
    timestamp = time.time()
    change_data = {
        "change_type": change_type,
        "record_id": record_id,
        "timestamp": timestamp,
        "data": {"field1": "value1", "field2": "value2"}, #Place holder data.
    }
    publish_message(change_data)

if __name__ == "__main__":
    while True:
        simulate_change()
        time.sleep(2)  # Simulate changes every 2 seconds

Zastąp ID_TWOJEGO_PROJEKTU identyfikatorem projektu GCP.

  1. Zainstaluj bibliotekę klienta Pub/Sub:
pip install google-cloud-pubsub
  1. Uruchom skrypt w terminalu. Ten skrypt będzie działać w sposób ciągły i publikować wiadomości w temacie Pub/Sub co 2 sekundy.
python simulate_cdc.py
  1. Po uruchomieniu skryptu na przykład na 1 minutę będziesz mieć w Pub/Sub wystarczającą liczbę wiadomości do przetworzenia. Aby zakończyć działanie skryptu w języku Python, naciśnij Ctrl + C lub Cmd + C (w zależności od systemu operacyjnego).
  2. Wyświetlanie opublikowanych wiadomości:

Otwórz inny terminal i uruchom to polecenie, aby wyświetlić opublikowane wiadomości:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

Powinien wyświetlić się wiersz tabeli zawierający wiadomość i inne pola:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

Wyjaśnienie

  • Skrypt w Pythonie symuluje zmiany w bazie danych, losowo generując zdarzenia INSERT, UPDATE lub DELETE.
  • Każda zmiana jest reprezentowana jako obiekt JSON zawierający typ zmiany, identyfikator rekordu, sygnaturę czasową i dane.
  • Skrypt używa biblioteki klienta Cloud Pub/Sub do publikowania tych zdarzeń zmiany w temacie database-changes.
  • Polecenie subskrybenta umożliwia wyświetlanie wiadomości wysyłanych do tematu Pub/Sub.

4. Tworzenie konta usługi Dataproc

W tej sekcji utworzysz konto usługi, z którego może korzystać klaster Dataproc. Musisz też przypisać niezbędne uprawnienia, aby umożliwić instancjom klastra dostęp do Cloud Pub/Sub i Dataproc.

  1. Utwórz konto usługi:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. Dodaj rolę pracownika Dataproc, aby umożliwić kontu usługi tworzenie klastrów i uruchamianie zadań. Dodaj identyfikator konta usługi wygenerowany w poprzednim poleceniu jako element w poleceniu poniżej:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. Dodaj rolę subskrybującego Pub/Sub, aby umożliwić kontu usługi subskrybowanie subskrypcji Pub/Sub „change-subscriber”:
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change-subscriber \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:<your-service-account-with-domain"

5. Tworzenie klastra Dataproc

Klaster Dataproc uruchomi aplikację Spark, która będzie przetwarzać wiadomości w Pub/Sub. Będziesz potrzebować konta usługi utworzonego w poprzedniej sekcji. Dataproc przypisuje to konto usługi do każdej instancji w klastrze, aby wszystkie instancje miały odpowiednie uprawnienia do uruchamiania aplikacji.

Aby utworzyć klaster Dataproc, użyj tego polecenia:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. Przesyłanie zadania Spark do klastra Dataproc

Aplikacja do przetwarzania strumieniowego Spark przetwarza wiadomości o zmianach w bazie danych w Pub/Sub i wyświetla je w konsoli.

Kroki

  1. Utwórz katalog i dodaj kod źródłowy konsumenta do pliku PubsubConsumer.scala.
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala 
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

  def createContext(projectID: String, checkpointDirectory: String)
    : StreamingContext = {

    // [START stream_setup]
    val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Set the checkpoint directory
    val yarnTags = sparkConf.get("spark.yarn.tags")
    val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
    ssc.checkpoint(checkpointDirectory + '/' + jobId)
    
    // Create stream
    val messagesStream: DStream[String] = PubsubUtils
      .createStream(
        ssc,
        projectID,
        None,
        "change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
        SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(message => new String(message.getData(), StandardCharsets.UTF_8))
    // [END stream_setup]

    processStringDStream(messagesStream)
    
        ssc
  }

  def processStringDStream(stringDStream: DStream[String]): Unit = {
    stringDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val listOfStrings: List[String] = rdd.collect().toList
        listOfStrings.foreach(str => println(s"message received: $str"))
      } else {
        println("looking for message...")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("arguments are not passed correctly!")
      System.exit(1)
    }

    val Seq(projectID, checkpointDirectory) = args.toSeq

    // Create Spark context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(projectID, checkpointDirectory))

    // Start streaming until we receive an explicit termination
    ssc.start()
    ssc.awaitTermination()
  }

}
  1. Utwórz i dodaj do pliku pom.xml:
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. Przejdź do katalogu spark projektu i zapisz ścieżkę w zmiennej środowiskowej, aby użyć jej później:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. Zmień katalog:
cd $REPO_ROOT/spark
  1. Pobierz Javę 1.8 i umieść folder w /usr/lib/jvm/. Następnie zmień zmienną JAVA_HOME, aby wskazywała ten katalog:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. Tworzenie pliku JAR aplikacji
mvn clean package

W katalogu spark/target zostanie utworzone archiwum spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar zawierające kod aplikacji i zależności.

  1. Prześlij aplikację Spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. Wyświetl listę aktywnych zadań i zanotuj wartość JOB_ID dla każdego z nich:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

Dane wyjściowe będą podobne do

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. Aby wyświetlić dane wyjściowe zadania, otwórz w przeglądarce ten adres URL. Zastąp [JOB_ID] wartością zapisaną w poprzednim kroku.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. Dane wyjściowe są podobne do tych:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

Zadanie przesyłania strumieniowego Spark działające w Dataproc pobiera wiadomości z Pub/Sub, przetwarza je i wyświetla dane wyjściowe w konsoli.

  1. Zakończ zadanie: aby zakończyć zadanie, uruchom to polecenie. Zastąp JOB_ID tym samym identyfikatorem, który został zapisany wcześniej.
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

Gratulacje! Właśnie utworzyliśmy zaawansowany potok CDC, który przechwytuje zmiany w bazie danych w Pub/Sub i przetwarza je za pomocą strumieniowego przetwarzania Spark działającego w Cloud Dataproc.

7. Czyszczenie danych

Usuń wszystkie utworzone zasoby, aby w przyszłości nie obciążać Cię za nie opłatami. Najprostszym sposobem na uniknięcie płatności jest usunięcie projektu utworzonego w tym samouczku. Możesz też usunąć poszczególne zasoby.

Aby usunąć poszczególne zasoby, uruchom te polecenia:

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet

8. Gratulacje

Gratulacje! Właśnie udało Ci się ukończyć praktyczne ćwiczenie, które pokazuje, jak utworzyć niezawodny potok danych w czasie rzeczywistym za pomocą Google Cloud Platform. Podsumujmy, co udało Ci się osiągnąć:

  • Symulowane przechwytywanie zmian danych (CDC): poznaliśmy podstawy CDC i wdrożyliśmy skrypt w Pythonie, aby symulować zmiany w bazie danych, generując zdarzenia reprezentujące modyfikacje danych w czasie rzeczywistym.
  • Wykorzystanie Cloud Pub/Sub: konfigurujesz tematy i subskrypcje Cloud Pub/Sub, zapewniając skalowalną i niezawodną usługę przesyłania wiadomości do strumieniowego przesyłania zdarzeń CDC. Opublikowano symulowane zmiany w bazie danych w Pub/Sub, tworząc strumień danych w czasie rzeczywistym.
  • Przetwarzanie danych za pomocą Dataproc i Spark: utworzono klaster Dataproc i wdrożono zadanie Spark Streaming, aby wykorzystywać wiadomości z subskrypcji Pub/Sub. Przetworzono i przekształcono przychodzące zdarzenia CDC w czasie rzeczywistym, a wyniki wyświetlono w dziennikach zadań Dataproc.
  • Utworzenie kompleksowego potoku w czasie rzeczywistym: udało Ci się zintegrować te usługi, aby utworzyć kompletny potok danych, który rejestruje, przesyła strumieniowo i przetwarza zmiany danych w czasie rzeczywistym. Zdobędziesz praktyczne doświadczenie w budowaniu systemu, który może obsługiwać ciągłe strumienie danych.
  • Użyto oprogramowania sprzęgającego Spark Pub/Sub: udało Ci się skonfigurować klaster Dataproc do używania oprogramowania sprzęgającego Spark Pub/Sub, które jest niezbędne do odczytywania danych z Pub/Sub przez Spark Structured Streaming.

Masz teraz solidne podstawy do tworzenia bardziej złożonych i zaawansowanych potoków danych na potrzeby różnych aplikacji, w tym analizy w czasie rzeczywistym, hurtowni danych i architektur mikroserwisów. Odkrywaj i twórz dalej.

Dokumentacja