1. מבוא

העדכון האחרון: 19 ביוני 2025
מה זה סימון נתונים שהשתנו (CDC)?
סימון נתונים שהשתנו (CDC) הוא קבוצה של דפוסי עיצוב תוכנה שמשמשים לקביעה ולמעקב של נתונים שהשתנו במסד נתונים. במילים פשוטות, זו דרך לתעד ולשמור שינויים שנעשו בנתונים, כך שאפשר יהיה לשכפל את השינויים האלה למערכות אחרות.
סימון נתונים שהשתנו (CDC) הוא שימושי מאוד במגוון רחב של תרחישים מבוססי-נתונים, כמו העברת נתונים, מחסני נתונים וניתוחים בזמן אמת, התאוששות מאסון וזמינות גבוהה, ביקורת ותאימות ועוד.
העברת נתונים
CDC מפשטת פרויקטים של העברת נתונים בכך שהיא מאפשרת העברת נתונים מצטברת, מקטינה את זמן ההשבתה וממזערת את השיבושים.
מחסני נתונים וניתוח בזמן אמת
CDC מבטיח שמחסני נתונים ומערכות ניתוח יתעדכנו כל הזמן בשינויים האחרונים ממסדי נתונים תפעוליים.
כך עסקים יכולים לקבל החלטות על סמך מידע בזמן אמת.
התאוששות מאסון וזמינות גבוהה
התכונה CDC מאפשרת שכפול נתונים בזמן אמת למסדי נתונים משניים לצורך התאוששות מאסון. במקרה של כשל, ה-CDC מאפשר מעבר מהיר ליתירות כשל למסד נתונים משני, וכך מצמצם את זמן ההשבתה ואת אובדן הנתונים.
ביקורת ותאימות
CDC מספק נתיב ביקורת מפורט של שינויים בנתונים, שחיוני לצורך עמידה בדרישות התאימות והרגולציה.
מה תפַתחו
ב-Codelab הזה תבנו פייפליין לסימון נתונים שהשתנו (CDC) באמצעות Cloud Pub/Sub, Dataproc, Python ו-Apache Spark. צינור עיבוד הנתונים:
- הסימולציה של שינויים במסד הנתונים ופרסום שלהם כאירועים ב-Cloud Pub/Sub, שירות העברת הודעות אמין וניתן להרחבה.
- כדי לעבד את האירועים האלה בזמן אמת, אפשר להשתמש ב-Dataproc, שהוא שירות מנוהל של Spark ו-Hadoop ב-Google Cloud.
כשמקשרים בין השירותים האלה, נוצר צינור חזק שיכול לתעד ולעבד שינויים בנתונים בזמן שהם מתרחשים, וכך מספק בסיס לניתוח בזמן אמת, למחסן נתונים ולאפליקציות קריטיות אחרות.
מה תלמדו
- איך יוצרים צינור בסיסי של סימון נתונים שהשתנו (CDC)
- Dataproc לעיבוד זרמי נתונים
- Cloud Pub/Sub להעברת הודעות בזמן אמת
- היסודות של Apache Spark
ה-Codelab הזה מתמקד ב-Dataproc וב-Cloud Pub/Sub. מושגים ובלוקים של קוד שלא רלוונטיים מוצגים בקצרה, ואתם יכולים פשוט להעתיק ולהדביק אותם.
מה תצטרכו
- חשבון GCP פעיל עם פרויקט מוגדר. אם אין לכם חשבון, אתם יכולים להירשם לתקופת ניסיון בחינם.
- ה-CLI של gcloud מותקן ומוגדר.
- Python 3.7 ואילך מותקן כדי לדמות שינויים במסד הנתונים ולקיים אינטראקציה עם Pub/Sub.
- ידע בסיסי ב-Dataproc, ב-Cloud Pub/Sub, ב-Apache Spark וב-Python.
לפני שמתחילים
מריצים את הפקודה הבאה בטרמינל כדי להפעיל את ממשקי ה-API הנדרשים:
gcloud services enable \
dataproc.googleapis.com \
pubsub.googleapis.com \
2. הגדרה של Cloud Pub/Sub
יצירת נושא
הנושא הזה ישמש לפרסום השינויים במסד הנתונים. עבודת Dataproc תהיה הצרכן של ההודעות האלה, והיא תעבד את ההודעות לצורך לכידת נתונים לשינוי. אם רוצים לקבל מידע נוסף על נושאים, אפשר לקרוא את המסמכים הרשמיים כאן.
gcloud pubsub topics create database-changes
יצירת מינוי
יוצרים מינוי שישמש לצריכת ההודעות ב-Pub/Sub. מידע נוסף על מינויים זמין במאמרי העזרה הרשמיים כאן.
gcloud pubsub subscriptions create --topic database-changes change-subscriber
3. סימולציה של שינויים במסד הנתונים
שלבים
- יוצרים סקריפט Python (למשל,
simulate_cdc.py) כדי לדמות שינויים במסד הנתונים ולפרסם אותם ב-Pub/Sub.
from google.cloud import pubsub_v1
import json
import time
import random
project_id = "YOUR_PROJECT_ID" # Replace with your GCP project ID
topic_id = "database-changes"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
def publish_message(data):
data_str = json.dumps(data).encode("utf-8")
future = publisher.publish(topic_path, data=data_str)
print(f"Published message ID: {future.result()}")
def simulate_change():
change_types = ["INSERT", "UPDATE", "DELETE"]
change_type = random.choice(change_types)
record_id = random.randint(1, 100)
timestamp = time.time()
change_data = {
"change_type": change_type,
"record_id": record_id,
"timestamp": timestamp,
"data": {"field1": "value1", "field2": "value2"}, #Place holder data.
}
publish_message(change_data)
if __name__ == "__main__":
while True:
simulate_change()
time.sleep(2) # Simulate changes every 2 seconds
מחליפים את YOUR_PROJECT_ID במזהה הפרויקט בפועל ב-GCP.
- מתקינים את ספריית הלקוח של Pub/Sub:
pip install google-cloud-pubsub
- מריצים את הסקריפט בטרמינל. הסקריפט הזה יפעל ברציפות ויפרסם הודעות כל 2 שניות בנושא Pub/Sub.
python simulate_cdc.py
- אחרי שמריצים את הסקריפט למשך דקה, למשל, יהיו מספיק הודעות ב-Pub/Sub לצריכה. כדי להפסיק את הרצת סקריפט Python, מקישים על Ctrl + C או על Cmd + C, בהתאם למערכת ההפעלה.
- הצגת ההודעות שפורסמו:
פותחים טרמינל נוסף ומריצים את הפקודה הבאה כדי לראות את ההודעות שפורסמו:
gcloud pubsub subscriptions pull --auto-ack change-subscriber
אמורה להופיע שורה בטבלה עם ההודעה ושדות אחרים:
{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}
הסבר
- סקריפט Python מדמה שינויים במסד הנתונים על ידי יצירה אקראית של אירועים מסוג
INSERT,UPDATEאוDELETE. - כל שינוי מיוצג כאובייקט JSON שמכיל את סוג השינוי, מזהה הרשומה, חותמת הזמן והנתונים.
- הסקריפט משתמש בספריית הלקוח של Cloud Pub/Sub כדי לפרסם את אירועי השינוי האלה בנושא
database-changes. - הפקודה subscriber מאפשרת לכם לראות את ההודעות שנשלחות לנושא ב-Pub/Sub.
4. יצירת חשבון שירות ל-Dataproc
בקטע הזה יוצרים חשבון שירות שאפשר להשתמש בו באשכול Dataproc. אתם גם מקצים את ההרשאות הנדרשות כדי לאפשר למופעי האשכול גישה ל-Cloud Pub/Sub ול-Dataproc.
- יוצרים חשבון שירות:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"
gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
- מוסיפים את תפקיד העובד של Dataproc כדי לאפשר לחשבון השירות ליצור אשכולות ולהריץ משימות. מוסיפים את מזהה חשבון השירות שנוצר בפקודה הקודמת כחבר בפקודה הבאה:
export PROJECT=$(gcloud info --format='value(config.project)')
gcloud projects add-iam-policy-binding $PROJECT \
--role roles/dataproc.worker \
--member="serviceAccount:<your-service-account-with-domain>"
- מוסיפים את התפקיד Pub/Sub Subscriber כדי לאפשר לחשבון השירות להירשם למינוי Pub/Sub change-subscriber:
gcloud beta pubsub subscriptions add-iam-policy-binding \
change-subscriber \
--role roles/pubsub.subscriber \
--member="serviceAccount:<your-service-account-with-domain"
5. יצירת אשכול Dataproc
אשכול Dataproc יריץ את אפליקציית Spark שתעבד את ההודעות ב-Pub/Sub. תצטרכו את חשבון השירות שיצרתם בקטע הקודם. Dataproc מקצה את חשבון השירות הזה לכל מופע באשכול, כדי שלכל המופעים יהיו ההרשאות הנכונות להרצת האפליקציה.
כדי ליצור אשכול Dataproc, משתמשים בפקודה הבאה:
gcloud dataproc clusters create cdc-dataproc-cluster \
--region=us-central1 \
--zone=us-central1-a \
--scopes=pubsub,datastore \
--image-version=1.3 \
--service-account="<your-service-account-with-domain-id>"
6. שליחת משימת Spark לאשכול Dataproc
אפליקציית הסטרימינג של Spark מעבדת את ההודעות על שינויים במסד הנתונים ב-Pub/Sub ומדפיסה אותן במסוף.
שלבים
- יוצרים ספרייה ומוסיפים את קוד המקור של הצרכן לקובץ PubsubConsumer.scala
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala
package demo
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object PubsubConsumer {
def createContext(projectID: String, checkpointDirectory: String)
: StreamingContext = {
// [START stream_setup]
val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Set the checkpoint directory
val yarnTags = sparkConf.get("spark.yarn.tags")
val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
ssc.checkpoint(checkpointDirectory + '/' + jobId)
// Create stream
val messagesStream: DStream[String] = PubsubUtils
.createStream(
ssc,
projectID,
None,
"change-subscriber", // Cloud Pub/Sub subscription for incoming database updates
SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
.map(message => new String(message.getData(), StandardCharsets.UTF_8))
// [END stream_setup]
processStringDStream(messagesStream)
ssc
}
def processStringDStream(stringDStream: DStream[String]): Unit = {
stringDStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val listOfStrings: List[String] = rdd.collect().toList
listOfStrings.foreach(str => println(s"message received: $str"))
} else {
println("looking for message...")
}
}
}
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("arguments are not passed correctly!")
System.exit(1)
}
val Seq(projectID, checkpointDirectory) = args.toSeq
// Create Spark context
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(projectID, checkpointDirectory))
// Start streaming until we receive an explicit termination
ssc.start()
ssc.awaitTermination()
}
}
- יוצרים את הקוד הבא ומוסיפים אותו לקובץ pom.xml
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<jvm.options.xms>-Xms512M</jvm.options.xms>
<jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
<jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
<jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
</properties>
<groupId>dataproc-spark-demos</groupId>
<artifactId>spark-streaming-pubsub-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastore</artifactId>
<version>1.34.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-pubsub_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_2.11</artifactId>
<version>1.14.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>demo.PubsubConsumer</mainClass>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>com</pattern>
<shadedPattern>repackaged.com</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
- עוברים לספריית ה-Spark של הפרויקט ושומרים את הנתיב במשתנה סביבה לשימוש מאוחר יותר:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
- משנים את הספרייה:
cd $REPO_ROOT/spark
- מורידים את Java 1.8 וממקמים את התיקייה ב- /usr/lib/jvm/. לאחר מכן משנים את JAVA_HOME כך שיפנה ל:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
- יצירת קובץ JAR של האפליקציה
mvn clean package
הארכיון spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar שמכיל את קוד האפליקציה ואת התלויות נוצר בספרייה spark/target
- שולחים את אפליקציית Spark:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"
gcloud dataproc jobs submit spark \
--cluster cdc-dataproc-cluster \
--region us-central1 \
--async \
--jar target/$JAR \
--max-failures-per-hour 10 \
--properties $SPARK_PROPERTIES \
-- $ARGUMENTS
- מציגים את רשימת המשימות הפעילות ורושמים את הערך
JOB_IDשל המשימה:
gcloud dataproc jobs list --region=us-central1 --state-filter=active
הפלט ייראה בערך כך:
JOB_ID TYPE STATUS
473ecb6d14e2483cb88a18988a5b2e56 spark RUNNING
- כדי לראות את הפלט של העבודה, פותחים את כתובת ה-URL הבאה בדפדפן. מחליפים את [JOB_ID] בערך שרשמתם בשלב הקודם.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
- הפלט אמור להיראות כך:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
משימת הסטרימינג של Spark שפועלת ב-Dataproc שולפת הודעות מ-Pub/Sub, מעבדת אותן ומציגה את הפלט במסוף.
- סיום העבודה: מריצים את הפקודה הבאה כדי לסיים את העבודה. מחליפים את JOB_ID במזהה שרשמנו קודם.
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet
מזל טוב! הרגע יצרתם צינור CDC יעיל שמתעד את השינויים במסד הנתונים ב-Pub/Sub ומעבד אותם באמצעות Spark Streaming שפועל ב-Cloud Dataproc.
7. הסרת המשאבים
חשוב לפנות את המשאבים שיצרתם כדי שלא תחויבו עליהם בעתיד. הדרך הקלה ביותר לבטל את החיוב היא למחוק את הפרויקט שיצרתם בשביל המדריך. אפשר גם למחוק משאבים ספציפיים.
מריצים את הפקודות הבאות כדי למחוק משאבים ספציפיים
gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet
8. מזל טוב
סיימתם בהצלחה Codelab מעשי שמדגים איך ליצור פייפליין נתונים חזק בזמן אמת באמצעות Google Cloud Platform. בואו נסכם את מה שהשגתם:
- סימול של סימון נתונים שהשתנו (CDC): למדתם את היסודות של CDC והטמעתם סקריפט Python כדי לדמות שינויים במסד נתונים, וכך ליצור אירועים שמייצגים שינויים בנתונים בזמן אמת.
- השתמשתם ב-Cloud Pub/Sub: הגדרתם נושאים ומינויים ב-Cloud Pub/Sub, וכך קיבלתם שירות העברת הודעות מהימן וניתן להרחבה להזרמת אירועי ה-CDC. פרסמתם את השינויים המדומים במסד הנתונים ב-Pub/Sub, ויצרתם מקור נתונים בזמן אמת.
- נתונים מעובדים באמצעות Dataproc ו-Spark: הקציתם אשכול Dataproc ופרסתם משימת Spark Streaming כדי לצרוך הודעות מהמינוי שלכם ל-Pub/Sub. עיבדתם והמרתם את אירועי ה-CDC הנכנסים בזמן אמת, והצגתם את התוצאות ביומני העבודות של Dataproc.
- בניתם פייפליין מקצה לקצה בזמן אמת: הצלחתם לשלב בין השירותים האלה כדי ליצור פייפליין מלא שמתעד, מעביר ומעבד שינויים בנתונים בזמן אמת. התנסות מעשית בבניית מערכת שיכולה לטפל במקורות נתונים רציפים.
- השתמשתם במחבר Spark Pub/Sub: הגדרתם בהצלחה אשכול Dataproc לשימוש במחבר Spark Pub/Sub, שהוא חיוני כדי ש-Spark Structured Streaming יוכל לקרוא נתונים מ-Pub/Sub.
עכשיו יש לכם בסיס מוצק לבניית צינורות נתונים מורכבים ומתקדמים יותר עבור אפליקציות שונות, כולל ניתוח נתונים בזמן אמת, מחסני נתונים וארכיטקטורות של מיקרו-שירותים. המשך לחקור ולבנות!