การสร้างเครื่องมือบันทึกการเปลี่ยนแปลงโดยใช้ Dataproc และ Cloud Pub/Sub

1. บทนำ

df8070bd84336207.png

อัปเดตล่าสุด: 19-06-2025

การจับการเปลี่ยนแปลงข้อมูลคืออะไร

การจับการเปลี่ยนแปลงข้อมูล (CDC) คือชุดรูปแบบการออกแบบซอฟต์แวร์ที่ใช้ในการระบุและติดตามข้อมูลที่มีการเปลี่ยนแปลงในฐานข้อมูล กล่าวอย่างง่ายๆ คือ เป็นวิธีจับภาพและบันทึกการเปลี่ยนแปลงที่เกิดขึ้นกับข้อมูลเพื่อให้สามารถจำลองการเปลี่ยนแปลงเหล่านั้นไปยังระบบอื่นๆ ได้

การจับการเปลี่ยนแปลงข้อมูล (CDC) มีประโยชน์อย่างยิ่งในสถานการณ์ที่ขับเคลื่อนด้วยข้อมูลที่หลากหลาย เช่น การย้ายข้อมูล คลังข้อมูลและการวิเคราะห์แบบเรียลไทม์ การกู้คืนจากภัยพิบัติและความพร้อมใช้งานสูง การตรวจสอบและการปฏิบัติตามข้อกำหนด ฯลฯ

การย้ายข้อมูล

CDC ช่วยลดความซับซ้อนของโปรเจ็กต์การย้ายข้อมูลด้วยการอนุญาตให้โอนข้อมูลแบบเพิ่มทีละรายการ ซึ่งจะช่วยลดช่วงหยุดทำงานและลดการหยุดชะงัก

คลังข้อมูลและการวิเคราะห์แบบเรียลไทม์

CDC ช่วยให้มั่นใจได้ว่าคลังข้อมูลและระบบวิเคราะห์จะได้รับการอัปเดตการเปลี่ยนแปลงล่าสุดจากฐานข้อมูลการดำเนินงานอยู่เสมอ

ซึ่งช่วยให้ธุรกิจตัดสินใจโดยอิงตามข้อมูลแบบเรียลไทม์ได้

การกู้ข้อมูลคืนหลังจากภัยพิบัติและความพร้อมใช้งานสูง

CDC ช่วยให้จำลองข้อมูลแบบเรียลไทม์ไปยังฐานข้อมูลสำรองเพื่อวัตถุประสงค์ในการกู้คืนข้อมูลหลังเกิดภัยพิบัติได้ ในกรณีที่เกิดข้อผิดพลาด CDC ช่วยให้สามารถเฟลโอเวอร์ไปยังฐานข้อมูลรองได้อย่างรวดเร็ว ซึ่งจะช่วยลดเวลาหยุดทำงานและการสูญเสียข้อมูล

การตรวจสอบและการปฏิบัติตามข้อกำหนด

CDC จะแสดงบันทึกการตรวจสอบโดยละเอียดของการเปลี่ยนแปลงข้อมูล ซึ่งจำเป็นต่อการปฏิบัติตามข้อกำหนดด้านกฎระเบียบ

สิ่งที่คุณจะสร้าง

ในโค้ดแล็บนี้ คุณจะได้สร้างไปป์ไลน์ข้อมูลการจับการเปลี่ยนแปลงข้อมูล (CDC) โดยใช้ Cloud Pub/Sub, Dataproc, Python และ Apache Spark ไปป์ไลน์จะทำสิ่งต่อไปนี้

  • จำลองการเปลี่ยนแปลงฐานข้อมูลและเผยแพร่เป็นเหตุการณ์ไปยัง Cloud Pub/Sub ซึ่งเป็นบริการรับส่งข้อความที่เชื่อถือได้และปรับขนาดได้
  • ใช้ประโยชน์จากความสามารถของ Dataproc ซึ่งเป็นบริการ Spark และ Hadoop ที่มีการจัดการของ Google Cloud เพื่อประมวลผลเหตุการณ์เหล่านี้แบบเรียลไทม์

การเชื่อมต่อบริการเหล่านี้จะช่วยสร้างไปป์ไลน์ที่มีประสิทธิภาพซึ่งสามารถบันทึกและประมวลผลการเปลี่ยนแปลงข้อมูลได้ทันทีที่เกิดขึ้น ซึ่งเป็นรากฐานสำหรับการวิเคราะห์แบบเรียลไทม์ คลังข้อมูล และแอปพลิเคชันที่สำคัญอื่นๆ

สิ่งที่คุณจะได้เรียนรู้

  • วิธีสร้างไปป์ไลน์การจับการเปลี่ยนแปลงข้อมูลพื้นฐาน
  • Dataproc สำหรับการประมวลผลสตรีม
  • Cloud Pub/Sub สำหรับการรับส่งข้อความแบบเรียลไทม์
  • พื้นฐานของ Apache Spark

Codelab นี้มุ่งเน้นที่ Dataproc และ Cloud Pub/Sub เราจะข้ามแนวคิดและบล็อกโค้ดที่ไม่เกี่ยวข้องไป และจะให้คุณคัดลอกและวางได้ง่ายๆ

สิ่งที่คุณต้องมี

  • บัญชี GCP ที่ใช้งานอยู่ซึ่งมีการตั้งค่าโปรเจ็กต์ หากยังไม่มีบัญชี คุณสามารถลงชื่อสมัครใช้เพื่อทดลองใช้ฟรีได้
  • ติดตั้งและกำหนดค่า gcloud CLI แล้ว
  • ติดตั้ง Python 3.7 ขึ้นไปเพื่อจำลองการเปลี่ยนแปลงฐานข้อมูลและโต้ตอบกับ Pub/Sub
  • ความรู้พื้นฐานเกี่ยวกับ Dataproc, Cloud Pub/Sub, Apache Spark และ Python

ก่อนจะเริ่มต้น

เรียกใช้คำสั่งต่อไปนี้ในเทอร์มินัลเพื่อเปิดใช้ API ที่จำเป็น

gcloud services enable \
    dataproc.googleapis.com \
    pubsub.googleapis.com \

2. ตั้งค่า Cloud Pub/Sub

สร้างหัวข้อ

ระบบจะใช้หัวข้อนี้เพื่อเผยแพร่การเปลี่ยนแปลงฐานข้อมูล งาน Dataproc จะเป็นผู้ใช้ข้อความเหล่านี้และจะประมวลผลข้อความสำหรับการจับภาพการเปลี่ยนแปลงข้อมูล หากต้องการทราบข้อมูลเพิ่มเติมเกี่ยวกับ Topics โปรดอ่านเอกสารประกอบอย่างเป็นทางการที่นี่

gcloud pubsub topics create database-changes

สร้างการสมัครใช้บริการ

สร้างการสมัครใช้บริการที่จะใช้เพื่อใช้ข้อความใน Pub/Sub หากต้องการทราบข้อมูลเพิ่มเติมเกี่ยวกับการสมัครใช้บริการ โปรดอ่านเอกสารอย่างเป็นทางการที่นี่

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. จำลองการเปลี่ยนแปลงฐานข้อมูล

ขั้นตอน

  1. สร้างสคริปต์ Python (เช่น simulate_cdc.py) เพื่อจำลองการเปลี่ยนแปลงฐานข้อมูลและเผยแพร่ไปยัง Pub/Sub
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
    data_str = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data=data_str)
    print(f"Published message ID: {future.result()}")

def simulate_change():
    change_types = ["INSERT", "UPDATE", "DELETE"]
    change_type = random.choice(change_types)
    record_id = random.randint(1, 100)
    timestamp = time.time()
    change_data = {
        "change_type": change_type,
        "record_id": record_id,
        "timestamp": timestamp,
        "data": {"field1": "value1", "field2": "value2"}, #Place holder data.
    }
    publish_message(change_data)

if __name__ == "__main__":
    while True:
        simulate_change()
        time.sleep(2)  # Simulate changes every 2 seconds

แทนที่ YOUR_PROJECT_ID ด้วยรหัสโปรเจ็กต์ GCP จริง

  1. ติดตั้งไลบรารีของไคลเอ็นต์ Pub/Sub โดยทำดังนี้
pip install google-cloud-pubsub
  1. เรียกใช้สคริปต์ในเทอร์มินัล สคริปต์นี้จะทำงานอย่างต่อเนื่องและเผยแพร่ข้อความทุกๆ 2 วินาทีไปยังหัวข้อ Pub/Sub
python simulate_cdc.py
  1. หลังจากเรียกใช้สคริปต์เป็นเวลา 1 นาที คุณจะมีข้อความเพียงพอใน Pub/Sub ที่จะใช้ คุณสามารถสิ้นสุดสคริปต์ Python ที่กำลังทำงานได้โดยกด Ctrl + C หรือ Cmd + C ทั้งนี้ขึ้นอยู่กับระบบปฏิบัติการ
  2. ดูข้อความที่เผยแพร่แล้ว

เปิดเทอร์มินัลอีกหน้าต่าง แล้วเรียกใช้คำสั่งต่อไปนี้เพื่อดูข้อความที่เผยแพร่

gcloud pubsub subscriptions pull --auto-ack change-subscriber

คุณควรเห็นแถวตารางที่มีข้อความและช่องอื่นๆ ดังนี้

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

คำอธิบาย

  • สคริปต์ Python จะจำลองการเปลี่ยนแปลงฐานข้อมูลโดยสร้างเหตุการณ์ INSERT, UPDATE หรือ DELETE แบบสุ่ม
  • การเปลี่ยนแปลงแต่ละรายการจะแสดงเป็นออบเจ็กต์ JSON ที่มีประเภทการเปลี่ยนแปลง รหัสระเบียน การประทับเวลา และข้อมูล
  • สคริปต์ใช้ไลบรารีไคลเอ็นต์ Cloud Pub/Sub เพื่อเผยแพร่เหตุการณ์การเปลี่ยนแปลงเหล่านี้ไปยังหัวข้อ database-changes
  • คำสั่งผู้สมัครรับข้อมูลช่วยให้คุณดูข้อความที่ส่งไปยังหัวข้อ Pub/Sub ได้

4. สร้างบัญชีบริการสำหรับ Dataproc

ในส่วนนี้ คุณจะได้สร้างบัญชีบริการที่คลัสเตอร์ Dataproc ใช้ได้ นอกจากนี้ คุณยังต้องกำหนดสิทธิ์ที่จำเป็นเพื่ออนุญาตให้อินสแตนซ์คลัสเตอร์เข้าถึง Cloud Pub/Sub และ Dataproc ด้วย

  1. สร้างบัญชีบริการ
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. เพิ่มบทบาทผู้ปฏิบัติงาน Dataproc เพื่ออนุญาตให้บัญชีบริการสร้างคลัสเตอร์และเรียกใช้งาน เพิ่มรหัสบัญชีบริการที่สร้างขึ้นในคำสั่งก่อนหน้าเป็นสมาชิกในคำสั่งด้านล่าง
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. เพิ่มบทบาทผู้ใช้บริการ Pub/Sub เพื่ออนุญาตให้บัญชีบริการสมัครใช้บริการ Pub/Sub "change-subscriber" โดยทำดังนี้
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change-subscriber \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:<your-service-account-with-domain"

5. สร้างคลัสเตอร์ Dataproc

คลัสเตอร์ Dataproc จะเรียกใช้แอป Spark ซึ่งจะประมวลผลข้อความใน Pub/Sub คุณจะต้องมีบัญชีบริการที่สร้างขึ้นในส่วนก่อนหน้า Dataproc จะกำหนดบัญชีบริการนี้ให้กับทุกอินสแตนซ์ในคลัสเตอร์เพื่อให้ทุกอินสแตนซ์ได้รับสิทธิ์ที่ถูกต้องในการเรียกใช้แอป

ใช้คำสั่งต่อไปนี้เพื่อสร้างคลัสเตอร์ Dataproc

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. ส่งงาน Spark ไปยังคลัสเตอร์ Dataproc

แอป Spark Streaming จะประมวลผลข้อความการเปลี่ยนแปลงฐานข้อมูลใน Pub/Sub และพิมพ์ข้อความเหล่านั้นไปยังคอนโซล

ขั้นตอน

  1. สร้างไดเรกทอรีและเพิ่มซอร์สโค้ดของผู้ใช้ไปยังไฟล์ PubsubConsumer.scala
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala 
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

  def createContext(projectID: String, checkpointDirectory: String)
    : StreamingContext = {

    // [START stream_setup]
    val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Set the checkpoint directory
    val yarnTags = sparkConf.get("spark.yarn.tags")
    val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
    ssc.checkpoint(checkpointDirectory + '/' + jobId)
    
    // Create stream
    val messagesStream: DStream[String] = PubsubUtils
      .createStream(
        ssc,
        projectID,
        None,
        "change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
        SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(message => new String(message.getData(), StandardCharsets.UTF_8))
    // [END stream_setup]

    processStringDStream(messagesStream)
    
        ssc
  }

  def processStringDStream(stringDStream: DStream[String]): Unit = {
    stringDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val listOfStrings: List[String] = rdd.collect().toList
        listOfStrings.foreach(str => println(s"message received: $str"))
      } else {
        println("looking for message...")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("arguments are not passed correctly!")
      System.exit(1)
    }

    val Seq(projectID, checkpointDirectory) = args.toSeq

    // Create Spark context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(projectID, checkpointDirectory))

    // Start streaming until we receive an explicit termination
    ssc.start()
    ssc.awaitTermination()
  }

}
  1. สร้างและเพิ่มโค้ดต่อไปนี้ลงใน pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. เปลี่ยนเป็นไดเรกทอรี Spark ของโปรเจ็กต์และบันทึกเส้นทางในตัวแปรสภาพแวดล้อมเพื่อใช้ในภายหลัง
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. เปลี่ยนไดเรกทอรี
cd $REPO_ROOT/spark
  1. ดาวน์โหลด Java 1.8 แล้ววางโฟลเดอร์ไว้ใน /usr/lib/jvm/ จากนั้นเปลี่ยน JAVA_HOME ให้ชี้ไปที่
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. สร้าง JAR ของแอปพลิเคชัน
mvn clean package

ระบบจะสร้างที่เก็บถาวร spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar ที่มีโค้ดแอปพลิเคชันและการอ้างอิงในไดเรกทอรี spark/target

  1. ส่งแอปพลิเคชัน Spark
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. แสดงรายการงานที่ใช้งานอยู่และจดค่า JOB_ID สำหรับงาน
gcloud dataproc jobs list --region=us-central1 --state-filter=active

เอาต์พุตจะมีลักษณะคล้ายกับ

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. ดูเอาต์พุตของงานโดยเปิด URL ต่อไปนี้ในเบราว์เซอร์ แทนที่ [JOB_ID] ด้วยค่าที่จดไว้ในขั้นตอนก่อนหน้า
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. เอาต์พุตจะคล้ายกับตัวอย่างต่อไปนี้
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

งาน Spark Streaming ที่ทำงานใน Dataproc จะดึงข้อความจาก Pub/Sub ประมวลผลข้อความ และแสดงเอาต์พุตในคอนโซล

  1. การสิ้นสุดงาน: เรียกใช้คำสั่งต่อไปนี้เพื่อสิ้นสุดงาน แทนที่ JOB_ID ด้วยรหัสเดียวกันกับที่เราจดไว้ก่อนหน้านี้
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

ยินดีด้วย คุณเพิ่งสร้างไปป์ไลน์ CDC ที่มีประสิทธิภาพซึ่งบันทึกการเปลี่ยนแปลงฐานข้อมูลใน Pub/Sub และประมวลผลโดยใช้ Spark Streaming ที่ทำงานใน Cloud Dataproc

7. ล้างข้อมูล

ล้างข้อมูลทรัพยากรที่คุณสร้างขึ้นเพื่อไม่ให้ระบบเรียกเก็บเงินจากคุณในอนาคต วิธีที่ง่ายที่สุดในการยกเลิกการเรียกเก็บเงินคือการลบโปรเจ็กต์ที่คุณสร้างขึ้นสำหรับบทแนะนำ หรือจะลบทรัพยากรแต่ละรายการก็ได้

เรียกใช้คำสั่งต่อไปนี้เพื่อลบทรัพยากรแต่ละรายการ

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet

8. ขอแสดงความยินดี

ขอแสดงความยินดี คุณเพิ่งทำ Codelab แบบลงมือปฏิบัติที่แสดงวิธีสร้างไปป์ไลน์ข้อมูลแบบเรียลไทม์ที่แข็งแกร่งโดยใช้ Google Cloud Platform เสร็จสมบูรณ์ มาสรุปสิ่งที่คุณทำสำเร็จกัน

  • การดักจับการเปลี่ยนแปลงข้อมูล (CDC) แบบจำลอง: คุณได้เรียนรู้พื้นฐานของ CDC และใช้สคริปต์ Python เพื่อจำลองการเปลี่ยนแปลงฐานข้อมูล ซึ่งสร้างเหตุการณ์ที่แสดงถึงการแก้ไขข้อมูลแบบเรียลไทม์
  • ใช้ประโยชน์จาก Cloud Pub/Sub: คุณตั้งค่าหัวข้อและการสมัครใช้บริการ Cloud Pub/Sub เพื่อให้บริการรับส่งข้อความที่เชื่อถือได้และปรับขนาดได้สำหรับการสตรีมเหตุการณ์ CDC คุณได้เผยแพร่การเปลี่ยนแปลงฐานข้อมูลจำลองไปยัง Pub/Sub ซึ่งเป็นการสร้างสตรีมข้อมูลแบบเรียลไทม์
  • ประมวลผลข้อมูลด้วย Dataproc และ Spark: คุณจัดสรรคลัสเตอร์ Dataproc และทำให้ใช้งานได้กับงาน Spark Streaming เพื่อใช้ข้อความจากการสมัครรับข้อมูล Pub/Sub คุณประมวลผลและแปลงเหตุการณ์ CDC ที่เข้ามาแบบเรียลไทม์ โดยแสดงผลลัพธ์ในบันทึกของงาน Dataproc
  • สร้างไปป์ไลน์แบบเรียลไทม์ตั้งแต่ต้นจนจบ: คุณผสานรวมบริการเหล่านี้เรียบร้อยแล้วเพื่อสร้างไปป์ไลน์ข้อมูลที่สมบูรณ์ซึ่งจะบันทึก สตรีม และประมวลผลการเปลี่ยนแปลงข้อมูลแบบเรียลไทม์ คุณได้รับประสบการณ์จริงในการสร้างระบบที่จัดการสตรีมข้อมูลต่อเนื่องได้
  • ใช้เครื่องมือเชื่อมต่อ Spark Pub/Sub: คุณกำหนดค่าคลัสเตอร์ Dataproc ให้ใช้เครื่องมือเชื่อมต่อ Spark Pub/Sub ได้สำเร็จ ซึ่งเป็นสิ่งสำคัญสำหรับ Spark Structured Streaming ในการอ่านข้อมูลจาก Pub/Sub

ตอนนี้คุณมีรากฐานที่มั่นคงสำหรับการสร้างไปป์ไลน์ข้อมูลที่ซับซ้อนและซับซ้อนยิ่งขึ้นสําหรับแอปพลิเคชันต่างๆ ซึ่งรวมถึงการวิเคราะห์แบบเรียลไทม์ คลังข้อมูล และสถาปัตยกรรม Microservice ขอให้สนุกกับการสำรวจและสร้างสรรค์

เอกสารอ้างอิง