Xây dựng tính năng Thu thập dữ liệu thay đổi bằng Dataproc và Cloud Pub/Sub

1. Giới thiệu

df8070bd84336207.png

Lần cập nhật gần đây nhất: Ngày 19 tháng 6 năm 2025

Tính năng Ghi nhận dữ liệu thay đổi là gì?

Change Data Capture (CDC) là một tập hợp các mẫu thiết kế phần mềm được dùng để xác định và theo dõi dữ liệu đã thay đổi trong cơ sở dữ liệu. Nói một cách đơn giản, đây là một cách để ghi lại và lưu giữ những thay đổi đối với dữ liệu để những thay đổi đó có thể được sao chép sang các hệ thống khác.

Tính năng Change Data Capture (CDC) (Ghi nhận thay đổi dữ liệu) cực kỳ hữu ích trong nhiều trường hợp dựa trên dữ liệu như Di chuyển dữ liệu, Kho dữ liệu và Phân tích theo thời gian thực, Khôi phục sau thảm hoạ và Khả năng hoạt động cao, Kiểm tra và Tuân thủ, v.v.

Di chuyển dữ liệu

CDC giúp đơn giản hoá các dự án di chuyển dữ liệu bằng cách cho phép chuyển dữ liệu gia tăng, giảm thời gian ngừng hoạt động và giảm thiểu sự gián đoạn.

Lưu trữ dữ liệu và phân tích theo thời gian thực

CDC đảm bảo rằng các kho dữ liệu và hệ thống phân tích luôn được cập nhật những thay đổi mới nhất từ cơ sở dữ liệu hoạt động.

Nhờ đó, các doanh nghiệp có thể đưa ra quyết định dựa trên thông tin theo thời gian thực.

Phục hồi sau thảm hoạ và khả năng đáp ứng cao

CDC cho phép sao chép dữ liệu theo thời gian thực vào các cơ sở dữ liệu phụ nhằm mục đích phục hồi sau thảm hoạ. Trong trường hợp xảy ra lỗi, CDC cho phép chuyển đổi dự phòng nhanh chóng sang cơ sở dữ liệu phụ, giảm thiểu thời gian ngừng hoạt động và mất dữ liệu.

Kiểm tra và tuân thủ

CDC cung cấp nhật ký kiểm tra chi tiết về các thay đổi dữ liệu, điều này rất cần thiết để tuân thủ các yêu cầu theo quy định.

Sản phẩm bạn sẽ tạo ra

Trong lớp học lập trình này, bạn sẽ xây dựng một quy trình dữ liệu ghi nhận thay đổi dữ liệu (CDC) bằng cách sử dụng Cloud Pub/Sub, Dataproc, Python và Apache Spark. Quy trình của bạn sẽ:

  • Mô phỏng các thay đổi về cơ sở dữ liệu và xuất bản các thay đổi đó dưới dạng sự kiện lên Cloud Pub/Sub, một dịch vụ nhắn tin có khả năng mở rộng và đáng tin cậy.
  • Tận dụng sức mạnh của Dataproc (dịch vụ Spark và Hadoop được quản lý của Google Cloud) để xử lý các sự kiện này theo thời gian thực.

Bằng cách kết nối các dịch vụ này, bạn sẽ tạo ra một quy trình mạnh mẽ có khả năng ghi lại và xử lý các thay đổi về dữ liệu khi chúng xảy ra, cung cấp nền tảng cho hoạt động phân tích theo thời gian thực, kho dữ liệu và các ứng dụng quan trọng khác.

Kiến thức bạn sẽ học được

  • Cách tạo quy trình cơ bản để ghi lại dữ liệu thay đổi
  • Dataproc để xử lý theo luồng
  • Cloud Pub/Sub để nhắn tin theo thời gian thực
  • Kiến thức cơ bản về Apache Spark

Lớp học lập trình này tập trung vào Dataproc và Cloud Pub/Sub. Các khái niệm và khối mã không liên quan được tinh chỉnh và cung cấp cho bạn, chỉ cần sao chép và dán.

Bạn cần có

  • một tài khoản GCP đang hoạt động có dự án được thiết lập. Nếu chưa có tài khoản, bạn có thể đăng ký dùng thử miễn phí.
  • Đã cài đặt và định cấu hình gcloud CLI.
  • Đã cài đặt Python 3.7 trở lên để mô phỏng các thay đổi về cơ sở dữ liệu và tương tác với Pub/Sub.
  • Kiến thức cơ bản về Dataproc, Cloud Pub/Sub, Apache Spark và Python.

Trước khi bạn bắt đầu

Thực thi lệnh sau trong dòng lệnh để bật các API bắt buộc:

gcloud services enable \
    dataproc.googleapis.com \
    pubsub.googleapis.com \

2. Thiết lập Cloud Pub/Sub

Tạo một chủ đề

Chủ đề này sẽ được dùng để xuất bản các thay đổi về cơ sở dữ liệu. Tác vụ Dataproc sẽ là đối tượng sử dụng các thông báo này và sẽ xử lý các thông báo để ghi lại dữ liệu thay đổi. Nếu muốn biết thêm về các chủ đề, bạn có thể đọc tài liệu chính thức tại đây.

gcloud pubsub topics create database-changes

Tạo gói thuê bao

Tạo gói thuê bao sẽ được dùng để tiêu thụ các thông báo trong Pub/Sub. Để biết thêm về các gói thuê bao, bạn có thể đọc tài liệu chính thức tại đây.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. Mô phỏng các thay đổi về cơ sở dữ liệu

Các bước

  1. Tạo một tập lệnh Python (ví dụ: simulate_cdc.py) để mô phỏng các thay đổi đối với cơ sở dữ liệu và xuất bản các thay đổi đó lên Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
    data_str = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data=data_str)
    print(f"Published message ID: {future.result()}")

def simulate_change():
    change_types = ["INSERT", "UPDATE", "DELETE"]
    change_type = random.choice(change_types)
    record_id = random.randint(1, 100)
    timestamp = time.time()
    change_data = {
        "change_type": change_type,
        "record_id": record_id,
        "timestamp": timestamp,
        "data": {"field1": "value1", "field2": "value2"}, #Place holder data.
    }
    publish_message(change_data)

if __name__ == "__main__":
    while True:
        simulate_change()
        time.sleep(2)  # Simulate changes every 2 seconds

Thay YOUR_PROJECT_ID bằng mã dự án thực tế của bạn trên GCP

  1. Cài đặt thư viện ứng dụng Pub/Sub:
pip install google-cloud-pubsub
  1. Chạy tập lệnh trên cửa sổ dòng lệnh. Tập lệnh này sẽ chạy liên tục và xuất bản thông báo sau mỗi 2 giây vào chủ đề Pub/Sub.
python simulate_cdc.py
  1. Sau khi chạy tập lệnh trong khoảng 1 phút, bạn sẽ có đủ thông báo trong Pub/Sub để sử dụng. Bạn có thể kết thúc tập lệnh python đang chạy bằng cách nhấn tổ hợp phím ctrl + C hoặc Cmd + C, tuỳ thuộc vào hệ điều hành của bạn.
  2. Xem tin nhắn đã xuất bản:

Mở một cửa sổ dòng lệnh khác rồi chạy lệnh sau để xem các thông báo đã xuất bản:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

Bạn sẽ thấy một hàng trong bảng chứa thông báo và các trường khác:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

Giải thích

  • Tập lệnh Python mô phỏng các thay đổi đối với cơ sở dữ liệu bằng cách tạo ngẫu nhiên các sự kiện INSERT, UPDATE hoặc DELETE.
  • Mỗi thay đổi được biểu thị dưới dạng một đối tượng JSON chứa loại thay đổi, mã nhận dạng bản ghi, dấu thời gian và dữ liệu.
  • Tập lệnh này sử dụng thư viện ứng dụng Cloud Pub/Sub để xuất bản các sự kiện thay đổi này vào chủ đề database-changes.
  • Lệnh subscriber cho phép bạn xem những thông báo đang được gửi đến chủ đề pub/sub.

4. Tạo tài khoản dịch vụ cho Dataproc

Trong phần này, bạn sẽ tạo một Tài khoản dịch vụ mà cụm Dataproc có thể sử dụng. Bạn cũng chỉ định các quyền cần thiết để cho phép các phiên bản cụm truy cập vào Cloud Pub/Sub và Dataproc.

  1. Tạo tài khoản dịch vụ:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. Thêm vai trò worker Dataproc để cho phép tài khoản dịch vụ tạo các cụm và chạy các công việc. Thêm mã nhận dạng tài khoản dịch vụ được tạo trong lệnh trước đó làm thành viên trong lệnh bên dưới:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. Thêm vai trò người đăng ký Pub/Sub để cho phép tài khoản dịch vụ đăng ký gói thuê bao Pub/Sub "change-subscriber":
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change-subscriber \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:<your-service-account-with-domain"

5. Tạo một Cụm Dataproc

Cụm Dataproc sẽ chạy ứng dụng spark để xử lý các thông báo trong Pub/Sub. Bạn sẽ cần tài khoản dịch vụ đã tạo trong phần trước. Dataproc chỉ định tài khoản dịch vụ này cho mọi phiên bản trong cụm để tất cả các phiên bản đều có quyền phù hợp để chạy ứng dụng.

Sử dụng lệnh sau để tạo một cụm Dataproc:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. Gửi Spark Job đến Cụm Dataproc

Ứng dụng truyền phát trực tuyến Spark xử lý các thông báo thay đổi cơ sở dữ liệu trong Pub/Sub và in các thông báo đó ra bảng điều khiển.

Các bước

  1. Tạo một thư mục và thêm mã nguồn của người dùng vào tệp PubsubConsumer.scala
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala 
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

  def createContext(projectID: String, checkpointDirectory: String)
    : StreamingContext = {

    // [START stream_setup]
    val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Set the checkpoint directory
    val yarnTags = sparkConf.get("spark.yarn.tags")
    val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
    ssc.checkpoint(checkpointDirectory + '/' + jobId)
    
    // Create stream
    val messagesStream: DStream[String] = PubsubUtils
      .createStream(
        ssc,
        projectID,
        None,
        "change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
        SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(message => new String(message.getData(), StandardCharsets.UTF_8))
    // [END stream_setup]

    processStringDStream(messagesStream)
    
        ssc
  }

  def processStringDStream(stringDStream: DStream[String]): Unit = {
    stringDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val listOfStrings: List[String] = rdd.collect().toList
        listOfStrings.foreach(str => println(s"message received: $str"))
      } else {
        println("looking for message...")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("arguments are not passed correctly!")
      System.exit(1)
    }

    val Seq(projectID, checkpointDirectory) = args.toSeq

    // Create Spark context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(projectID, checkpointDirectory))

    // Start streaming until we receive an explicit termination
    ssc.start()
    ssc.awaitTermination()
  }

}
  1. Tạo và thêm nội dung sau vào pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. Thay đổi thành thư mục spark của dự án và lưu đường dẫn trong một biến môi trường để sử dụng sau này:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. Thay đổi thư mục:
cd $REPO_ROOT/spark
  1. Tải Java 1.8 xuống và đặt thư mục này vào /usr/lib/jvm/. Sau đó, hãy thay đổi JAVA_HOME để trỏ đến thư mục này:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. Tạo tệp jar ứng dụng
mvn clean package

Tệp lưu trữ spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar chứa mã xử lý ứng dụng và các phần phụ thuộc được tạo trong thư mục spark/target

  1. Gửi ứng dụng spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. Hiển thị danh sách các công việc đang hoạt động và lưu ý giá trị JOB_ID cho công việc:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

Kết quả sẽ có dạng tương tự như

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. Xem đầu ra của công việc bằng cách mở URL sau trong trình duyệt. Thay thế [JOB_ID] bằng giá trị bạn đã ghi lại ở bước trước.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. Kết quả sẽ tương tự như sau:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

Tác vụ truyền trực tuyến Spark chạy trong Dataproc sẽ kéo các thông báo từ Pub/Sub, xử lý các thông báo đó và hiển thị đầu ra trên bảng điều khiển.

  1. Kết thúc công việc: Chạy lệnh sau để kết thúc công việc. Thay thế JOB_ID bằng mã nhận dạng mà chúng ta đã ghi nhận trước đó
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

Xin chúc mừng! Bạn vừa tạo một quy trình CDC mạnh mẽ, có thể ghi lại các thay đổi về cơ sở dữ liệu trong Pub/Sub và xử lý các thay đổi đó bằng cách sử dụng tính năng truyền trực tuyến Spark chạy trong Cloud Dataproc.

7. Dọn dẹp

Dọn dẹp mọi tài nguyên bạn đã tạo để bạn không bị tính phí cho những tài nguyên đó trong tương lai. Cách dễ nhất để loại bỏ phí thanh toán là xoá dự án mà bạn đã tạo cho hướng dẫn này. Ngoài ra, bạn có thể xoá từng tài nguyên.

Chạy các lệnh sau để xoá từng tài nguyên

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet

8. Xin chúc mừng

Chúc mừng bạn đã hoàn thành lớp học lập trình thực hành này. Lớp học này minh hoạ cách tạo một quy trình dữ liệu mạnh mẽ theo thời gian thực bằng Google Cloud Platform. Hãy cùng tóm tắt những thành tích bạn đã đạt được:

  • Mô phỏng tính năng ghi nhận dữ liệu thay đổi (CDC): Bạn đã tìm hiểu những kiến thức cơ bản về CDC và triển khai một tập lệnh Python để mô phỏng các thay đổi trong cơ sở dữ liệu, tạo ra các sự kiện thể hiện những điểm sửa đổi dữ liệu theo thời gian thực.
  • Tận dụng Cloud Pub/Sub: Bạn thiết lập các chủ đề và gói thuê bao Cloud Pub/Sub, cung cấp dịch vụ nhắn tin có thể mở rộng và đáng tin cậy để truyền phát các sự kiện CDC. Bạn đã xuất bản các thay đổi đối với cơ sở dữ liệu mô phỏng lên Pub/Sub, tạo ra một luồng dữ liệu theo thời gian thực.
  • Dữ liệu đã xử lý bằng Dataproc và Spark: Bạn đã cung cấp một cụm Dataproc và triển khai một công việc Truyền phát trực tiếp Spark để sử dụng các thông báo từ gói thuê bao Pub/Sub. Bạn đã xử lý và chuyển đổi các sự kiện CDC đến theo thời gian thực, hiển thị kết quả trong nhật ký công việc Dataproc.
  • Xây dựng một quy trình theo thời gian thực từ đầu đến cuối: Bạn đã tích hợp thành công các dịch vụ này để tạo một quy trình dữ liệu hoàn chỉnh, có thể ghi nhận, truyền trực tuyến và xử lý các thay đổi về dữ liệu theo thời gian thực. Bạn đã có được kinh nghiệm thực tế trong việc xây dựng một hệ thống có thể xử lý các luồng dữ liệu liên tục.
  • Đã sử dụng Trình kết nối Spark Pub/Sub: Bạn đã định cấu hình thành công một cụm Dataproc để sử dụng trình kết nối Spark Pub/Sub. Đây là một bước quan trọng để Spark Structured Streaming đọc dữ liệu từ Pub/Sub.

Giờ đây, bạn đã có một nền tảng vững chắc để xây dựng các quy trình dữ liệu phức tạp và tinh vi hơn cho nhiều ứng dụng, bao gồm cả phân tích theo thời gian thực, kho dữ liệu và cấu trúc vi dịch vụ. Hãy tiếp tục khám phá và xây dựng!

Tài liệu tham khảo