Dataproc ve Cloud Pub/Sub'u kullanarak Değişiklik Verileri Yakalama oluşturma

1. Giriş

df8070bd84336207.png

Son güncelleme: 2025-06-19

Değişiklik verilerini yakalama nedir?

Değişiklik Veri Yakalama (CDC), bir veritabanında değişen verileri belirlemek ve izlemek için kullanılan bir dizi yazılım tasarım kalıbıdır. Daha basit bir ifadeyle, verilerde yapılan değişiklikleri yakalayıp kaydederek bu değişikliklerin diğer sistemlere kopyalanmasını sağlayan bir yöntemdir.

Değişiklik verisi yakalama (CDC), veri taşıma, gerçek zamanlı veri ambarı ve analiz, olağanüstü durum kurtarma ve yüksek kullanılabilirlik, denetim ve uygunluk gibi çok çeşitli veri odaklı senaryolarda inanılmaz derecede faydalıdır.

Veri taşıma

CDC, artımlı veri aktarımına olanak tanıyarak veri taşıma projelerini basitleştirir, kapalı kalma süresini azaltır ve kesintiyi en aza indirir.

Gerçek Zamanlı Veri Ambarlama ve Analiz

CDC, veri ambarlarının ve analitik sistemlerin operasyonel veritabanlarındaki en son değişikliklerle sürekli olarak güncellenmesini sağlar.

Bu sayede işletmeler, anlık bilgilere dayalı kararlar alabilir.

Olağanüstü Durum Kurtarma ve Yüksek Kullanılabilirlik

CDC, olağanüstü durum kurtarma amacıyla verilerin ikincil veritabanlarına gerçek zamanlı olarak kopyalanmasını sağlar. Bir hata durumunda CDC, ikincil bir veritabanına hızlı yük devretmeye olanak tanıyarak kapalı kalma süresini ve veri kaybını en aza indirir.

Denetim ve Uygunluk

CDC, veri değişiklikleriyle ilgili ayrıntılı bir denetim izi sağlar. Bu, uyumluluk ve yasal gereklilikler için önemlidir.

Ne oluşturacaksınız?

Bu codelab'de Cloud Pub/Sub, Dataproc, Python ve Apache Spark'ı kullanarak bir değişiklik yakalama (CDC) veri ardışık düzeni oluşturacaksınız. Ardışık düzeniniz:

  • Veritabanı değişikliklerini simüle edin ve bunları ölçeklenebilir ve güvenilir bir mesajlaşma hizmeti olan Cloud Pub/Sub'a etkinlik olarak yayınlayın.
  • Bu etkinlikleri gerçek zamanlı olarak işlemek için Google Cloud'un yönetilen Spark ve Hadoop hizmeti olan Dataproc'un gücünden yararlanın.

Bu hizmetleri bağlayarak, veri değişikliklerini gerçekleştiği anda yakalayıp işleyebilen, gerçek zamanlı analiz, veri ambarı ve diğer kritik uygulamalar için temel oluşturan güçlü bir ardışık düzen oluşturursunuz.

Neler öğreneceksiniz?

  • Temel bir değişiklik verisi yakalama ardışık düzeni oluşturma
  • Akış işleme için Dataproc
  • Anlık mesajlaşma için Cloud Pub/Sub
  • Apache Spark'ın temelleri

Bu codelab, Dataproc ve Cloud Pub/Sub'a odaklanmaktadır. Alakalı olmayan kavramlar ve kod blokları işaretlenmiştir ve yalnızca kopyalayıp yapıştırmanız için paylaşılmıştır.

Gerekenler

  • Proje ayarlanmış etkin bir GCP hesabı Hesabınız yoksa ücretsiz deneme sürümüne kaydolabilirsiniz.
  • gcloud CLI yüklü ve yapılandırılmış olmalıdır.
  • Veritabanı değişikliklerini simüle etmek ve Pub/Sub ile etkileşim kurmak için Python 3.7 veya sonraki bir sürümün yüklü olması gerekir.
  • Dataproc, Cloud Pub/Sub, Apache Spark ve Python hakkında temel bilgiler.

Başlamadan önce

Gerekli API'leri etkinleştirmek için terminalde aşağıdaki komutu çalıştırın:

gcloud services enable \
    dataproc.googleapis.com \
    pubsub.googleapis.com \

2. Cloud Pub/Sub'ı ayarlama

Konu oluşturma

Bu konu, veritabanı değişikliklerini yayınlamak için kullanılır. Dataproc işi, bu iletilerin tüketicisi olacak ve değişiklikle ilgili veri yakalama için iletileri işleyecek. Topics hakkında daha fazla bilgi edinmek istiyorsanız resmi dokümanları buradan okuyabilirsiniz.

gcloud pubsub topics create database-changes

Abonelik oluşturma

Pub/Sub'daki mesajları kullanmak için bir abonelik oluşturun. Abonelikler hakkında daha fazla bilgi edinmek için resmi dokümanları burada bulabilirsiniz.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. Veritabanı Değişikliklerini Simüle Etme

Adımlar

  1. Veritabanı değişikliklerini simüle etmek ve bunları Pub/Sub'da yayınlamak için bir Python komut dosyası (ör. simulate_cdc.py) oluşturun.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
    data_str = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data=data_str)
    print(f"Published message ID: {future.result()}")

def simulate_change():
    change_types = ["INSERT", "UPDATE", "DELETE"]
    change_type = random.choice(change_types)
    record_id = random.randint(1, 100)
    timestamp = time.time()
    change_data = {
        "change_type": change_type,
        "record_id": record_id,
        "timestamp": timestamp,
        "data": {"field1": "value1", "field2": "value2"}, #Place holder data.
    }
    publish_message(change_data)

if __name__ == "__main__":
    while True:
        simulate_change()
        time.sleep(2)  # Simulate changes every 2 seconds

YOUR_PROJECT_ID kısmını gerçek GCP proje kimliğinizle değiştirin.

  1. Pub/Sub istemci kitaplığını yükleyin:
pip install google-cloud-pubsub
  1. Komut dosyasını terminalinizde çalıştırın. Bu komut dosyası sürekli olarak çalışır ve Pub/Sub konusuna her 2 saniyede bir mesaj yayınlar.
python simulate_cdc.py
  1. Komut dosyasını örneğin 1 dakika çalıştırdıktan sonra Pub/Sub'ta tüketebileceğiniz yeterli sayıda mesaj olur. Çalışan Python komut dosyasını, işletim sisteminize bağlı olarak Ctrl + C veya Cmd + C tuşlarına basarak sonlandırabilirsiniz.
  2. Yayınlanan Mesajları Görüntüleme:

Başka bir terminal açın ve yayınlanan mesajları görüntülemek için aşağıdaki komutu çalıştırın:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

Mesajı ve diğer alanları içeren bir tablo satırı görürsünüz:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

Açıklama

  • Python komut dosyası, INSERT, UPDATE veya DELETE etkinliklerini rastgele oluşturarak veritabanı değişikliklerini simüle eder.
  • Her değişiklik, değişiklik türünü, kayıt kimliğini, zaman damgasını ve verileri içeren bir JSON nesnesi olarak gösterilir.
  • Komut dosyası, bu değişiklik etkinliklerini database-changes konusuna yayınlamak için Cloud Pub/Sub istemci kitaplığını kullanır.
  • Abone komutu, pub/sub konusuna gönderilen mesajları görüntülemenize olanak tanır.

4. Dataproc için hizmet hesabı oluşturma

Bu bölümde, Dataproc kümesinin kullanabileceği bir hizmet hesabı oluşturursunuz. Ayrıca, küme örneklerinin Cloud Pub/Sub ve Dataproc'a erişmesine izin vermek için gerekli izinleri de atarsınız.

  1. Hizmet hesabı oluşturun:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. Hizmet hesabının küme oluşturmasına ve iş çalıştırmasına izin vermek için Dataproc çalışanı rolünü ekleyin. Önceki komutta oluşturulan hizmet hesabı kimliğini aşağıdaki komuta üye olarak ekleyin:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. Hizmet hesabının "change-subscriber" Pub/Sub aboneliğine abone olmasına izin vermek için Pub/Sub abone rolünü ekleyin:
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change-subscriber \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:<your-service-account-with-domain"

5. Dataproc kümesi oluşturma

Dataproc kümesi, Pub/Sub'daki mesajları işleyecek Spark uygulamasını çalıştırır. Önceki bölümde oluşturulan hizmet hesabına ihtiyacınız vardır. Dataproc, bu hizmet hesabını kümedeki her örneğe atar. Böylece tüm örnekler, uygulamayı çalıştırmak için doğru izinleri alır.

Dataproc kümesi oluşturmak için aşağıdaki komutu kullanın:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. Spark işini Dataproc kümesine gönderme

Spark akış uygulaması, Pub/Sub'daki veritabanı değişikliği mesajlarını işler ve bunları konsola yazdırır.

Adımlar

  1. Bir dizin oluşturun ve tüketicinin kaynak kodunu PubsubConsumer.scala dosyasına ekleyin.
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala 
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

  def createContext(projectID: String, checkpointDirectory: String)
    : StreamingContext = {

    // [START stream_setup]
    val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Set the checkpoint directory
    val yarnTags = sparkConf.get("spark.yarn.tags")
    val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
    ssc.checkpoint(checkpointDirectory + '/' + jobId)
    
    // Create stream
    val messagesStream: DStream[String] = PubsubUtils
      .createStream(
        ssc,
        projectID,
        None,
        "change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
        SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(message => new String(message.getData(), StandardCharsets.UTF_8))
    // [END stream_setup]

    processStringDStream(messagesStream)
    
        ssc
  }

  def processStringDStream(stringDStream: DStream[String]): Unit = {
    stringDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val listOfStrings: List[String] = rdd.collect().toList
        listOfStrings.foreach(str => println(s"message received: $str"))
      } else {
        println("looking for message...")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("arguments are not passed correctly!")
      System.exit(1)
    }

    val Seq(projectID, checkpointDirectory) = args.toSeq

    // Create Spark context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(projectID, checkpointDirectory))

    // Start streaming until we receive an explicit termination
    ssc.start()
    ssc.awaitTermination()
  }

}
  1. Aşağıdakileri oluşturup pom.xml dosyasına ekleyin
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. Projenin Spark dizinine geçin ve yolu daha sonra kullanılacak bir ortam değişkenine kaydedin:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. Dizini değiştirin:
cd $REPO_ROOT/spark
  1. Java 1.8'i indirip klasörü /usr/lib/jvm/ konumuna yerleştirin. Ardından JAVA_HOME'u şu şekilde değiştirin:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. Uygulama JAR'ını oluşturma
mvn clean package

Uygulama kodunu ve bağımlılıkları içeren spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar arşivi, spark/target dizininde oluşturulur.

  1. Spark uygulamasını gönderin:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. Etkin işlerin listesini görüntüleyin ve iş için JOB_ID değerini not edin:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

Çıkış, aşağıdakine benzer şekilde görünür:

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. Tarayıcınızda aşağıdaki URL'yi açarak iş çıkışını görüntüleyin. [JOB_ID] kısmını önceki adımda not edilen değerle değiştirin.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. Çıkış şuna benzer:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

Dataproc'ta çalışan Spark Streaming işi, Pub/Sub'dan mesajları çeker, işler ve çıkışı konsolda gösterir.

  1. İşi sonlandırma: İşi sonlandırmak için aşağıdaki komutu çalıştırın. JOB_ID değerini, daha önce not ettiğimiz değerle değiştirin.
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

Tebrikler! Pub/Sub'daki veritabanı değişikliklerini yakalayan ve Cloud Dataproc'ta çalışan Spark Streaming'i kullanarak bu değişiklikleri işleyen güçlü bir CDC ardışık düzeni oluşturdunuz.

7. Temizleme

Gelecekte bu kaynaklar için faturalandırılmamak üzere oluşturduğunuz tüm kaynakları temizleyin. Faturalandırılmanın önüne geçmenin en kolay yolu, eğitim için oluşturduğunuz projeyi silmektir. Alternatif olarak, kaynakları tek tek silebilirsiniz.

Tek tek kaynakları silmek için aşağıdaki komutları çalıştırın.

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet

8. Tebrikler

Tebrikler. Google Cloud Platform'u kullanarak nasıl sağlam bir gerçek zamanlı veri ardışık düzeni oluşturacağınızı gösteren uygulamalı bir codelab'i tamamladınız. Başarılarınızı özetleyelim:

  • Simüle Edilmiş Değişiklik Verisi Yakalama (CDC): CDC'nin temellerini öğrendiniz ve veritabanı değişikliklerini simüle etmek için bir Python komut dosyası uygulayarak gerçek zamanlı veri değişikliklerini temsil eden etkinlikler oluşturdunuz.
  • Cloud Pub/Sub'dan yararlanma: Cloud Pub/Sub konuları ve abonelikleri oluşturarak CDC etkinliklerinizi yayınlamak için ölçeklenebilir ve güvenilir bir mesajlaşma hizmeti sağlarsınız. Simüle edilmiş veritabanı değişikliklerinizi Pub/Sub'da yayınlayarak gerçek zamanlı bir veri akışı oluşturdunuz.
  • Dataproc ve Spark ile İşlenen Veriler: Bir Dataproc kümesi sağladınız ve Pub/Sub aboneliğinizdeki mesajları kullanmak için bir Spark Streaming işi dağıttınız. Gelen CDC etkinliklerini gerçek zamanlı olarak işleyip dönüştürdünüz ve sonuçları Dataproc iş günlüklerinizde gösterdiniz.
  • Uçtan Uca Gerçek Zamanlı Bir Ardışık Düzen Oluşturma: Veri değişikliklerini gerçek zamanlı olarak yakalayan, aktaran ve işleyen eksiksiz bir veri ardışık düzeni oluşturmak için bu hizmetleri başarıyla entegre ettiniz. Sürekli veri akışlarını işleyebilen bir sistem oluşturma konusunda pratik deneyim kazandınız.
  • Spark Pub/Sub bağlayıcısını kullandıysanız: Spark Structured Streaming'in Pub/Sub'dan veri okuması için kritik öneme sahip olan Spark Pub/Sub bağlayıcısını kullanmak üzere bir Dataproc kümesini başarıyla yapılandırdınız.

Artık gerçek zamanlı analiz, veri ambarı ve mikro hizmet mimarileri gibi çeşitli uygulamalar için daha karmaşık ve gelişmiş veri ardışık düzenleri oluşturmak üzere sağlam bir temeliniz var. Keşfetmeye ve geliştirmeye devam edin.

Referans belgeler