Dataproc और Cloud Pub/Sub का इस्तेमाल करके, बदलाव का डेटा कैप्चर करने की सुविधा बनाना

1. परिचय

df8070bd84336207.png

पिछले अपडेट की तारीख: 19-06-2025

बदले गए डेटा को कैप्चर करने की सुविधा क्या है?

बदले गए डेटा को कैप्चर करने की प्रोसेस (सीडीसी), सॉफ़्टवेयर डिज़ाइन पैटर्न का एक सेट है. इसका इस्तेमाल, डेटाबेस में बदले गए डेटा का पता लगाने और उसे ट्रैक करने के लिए किया जाता है. आसान शब्दों में कहें, तो यह डेटा में किए गए बदलावों को कैप्चर और रिकॉर्ड करने का एक तरीका है, ताकि उन बदलावों को अन्य सिस्टम में दोहराया जा सके.

डेटा माइग्रेशन, रीयल-टाइम डेटा वेयरहाउसिंग और Analytics, आपदा से उबरने और हाई अवेलेबिलिटी, ऑडिट और अनुपालन वगैरह जैसे डेटा पर आधारित कई तरह के कामों के लिए, बदलावों को कैप्चर करने की सुविधा (सीडीसी) बहुत काम की होती है.

डेटा माइग्रेशन

सीडीसी, डेटा माइग्रेशन प्रोजेक्ट को आसान बनाता है. यह इंक्रीमेंटल डेटा ट्रांसफ़र की अनुमति देता है, जिससे डाउनटाइम कम हो जाता है और रुकावटें कम हो जाती हैं.

रीयल-टाइम डेटा वेयरहाउसिंग और विश्लेषण

सीडीसी यह पक्का करता है कि डेटा वेयरहाउस और विश्लेषण करने वाले सिस्टम, ऑपरेशनल डेटाबेस में हुए नए बदलावों के साथ लगातार अपडेट होते रहें.

इससे कारोबारों को रीयल-टाइम की जानकारी के आधार पर फ़ैसले लेने में मदद मिलती है.

डेटा रिकवरी और हाई अवेलेबिलिटी

डेटा में हुए बदलावों को कैप्चर करने की सुविधा (सीडीसी) की मदद से, आपदा के बाद डेटा वापस पाने के लिए, डेटा को सेकंडरी डेटाबेस में रीयल-टाइम में कॉपी किया जा सकता है. अगर कोई गड़बड़ी होती है, तो सीडीसी की मदद से तुरंत सेकंडरी डेटाबेस पर फ़ेलओवर किया जा सकता है. इससे डाउनटाइम कम होता है और डेटा का नुकसान भी कम होता है.

ऑडिट और अनुपालन

सीडीसी, डेटा में हुए बदलावों का पूरा ऑडिट ट्रेल उपलब्ध कराता है. यह ऑडिट ट्रेल, नियमों और शर्तों का पालन करने के लिए ज़रूरी है.

आपको क्या बनाने को मिलेगा

इस कोडलैब में, Cloud Pub/Sub, Dataproc, Python, और Apache Spark का इस्तेमाल करके, चेंज-डेटा-कैप्चर (सीडीसी) डेटा पाइपलाइन बनाई जाएगी. आपकी पाइपलाइन में ये कार्रवाइयां होंगी:

  • डेटाबेस में होने वाले बदलावों को सिम्युलेट करें और उन्हें Cloud Pub/Sub पर इवेंट के तौर पर पब्लिश करें. Cloud Pub/Sub, बड़े पैमाने पर इस्तेमाल की जा सकने वाली और भरोसेमंद मैसेजिंग सेवा है.
  • इन इवेंट को रीयल-टाइम में प्रोसेस करने के लिए, Dataproc का इस्तेमाल करें. यह Google Cloud की मैनेज की गई Spark और Hadoop सेवा है.

इन सेवाओं को कनेक्ट करके, एक मज़बूत पाइपलाइन बनाई जा सकती है. यह पाइपलाइन, डेटा में होने वाले बदलावों को कैप्चर और प्रोसेस कर सकती है. इससे रीयल-टाइम में आंकड़ों का विश्लेषण, डेटा वेयरहाउसिंग, और अन्य ज़रूरी ऐप्लिकेशन के लिए एक आधार तैयार किया जा सकता है.

आपको क्या सीखने को मिलेगा

  • डेटा कैप्चर करने वाली बुनियादी पाइपलाइन बनाने का तरीका
  • स्ट्रीम प्रोसेसिंग के लिए Dataproc
  • रीयल-टाइम मैसेजिंग के लिए Cloud Pub/Sub
  • Apache Spark के बारे में बुनियादी जानकारी

यह कोडलैब, Dataproc और Cloud Pub/Sub पर आधारित है. काम के न होने वाले कॉन्सेप्ट और कोड ब्लॉक को हटा दिया जाता है. साथ ही, आपको सिर्फ़ कॉपी और पेस्ट करने के लिए कॉन्सेप्ट और कोड ब्लॉक दिए जाते हैं.

आपको इन चीज़ों की ज़रूरत होगी

  • प्रोजेक्ट सेट अप किया गया हो और GCP खाता चालू हो. अगर आपके पास खाता नहीं है, तो बिना किसी शुल्क के इसे आज़माने के लिए साइन अप करें.
  • gcloud सीएलआई इंस्टॉल और कॉन्फ़िगर किया गया हो.
  • डेटाबेस में हुए बदलावों को सिम्युलेट करने और Pub/Sub के साथ इंटरैक्ट करने के लिए, Python 3.7+ इंस्टॉल किया गया हो.
  • Dataproc, Cloud Pub/Sub, Apache Spark, और Python की बुनियादी जानकारी.

शुरू करने से पहले

ज़रूरी एपीआई चालू करने के लिए, टर्मिनल में यह कमांड चलाएं:

gcloud services enable \
    dataproc.googleapis.com \
    pubsub.googleapis.com \

2. Cloud Pub/Sub को सेट अप करना

कोई विषय बनाएं

इस विषय का इस्तेमाल, डेटाबेस में हुए बदलावों को पब्लिश करने के लिए किया जाएगा. Dataproc जॉब, इन मैसेज का इस्तेमाल करेगा. साथ ही, बदलाव किए गए डेटा को कैप्चर करने के लिए, मैसेज को प्रोसेस करेगा. अगर आपको विषयों के बारे में ज़्यादा जानना है, तो आधिकारिक दस्तावेज़ यहां पढ़ें.

gcloud pubsub topics create database-changes

सदस्यता बनाना

Pub/Sub में मौजूद मैसेज इस्तेमाल करने के लिए, सदस्यता बनाएं. सदस्यताओं के बारे में ज़्यादा जानने के लिए, आधिकारिक दस्तावेज़ यहां पढ़ें.

gcloud pubsub subscriptions create --topic database-changes change-subscriber

3. डेटाबेस में किए गए बदलावों को सिम्युलेट करना

तरीका

  1. डेटाबेस में होने वाले बदलावों को सिम्युलेट करने और उन्हें Pub/Sub पर पब्लिश करने के लिए, एक Python स्क्रिप्ट (जैसे, simulate_cdc.py) बनाएं.
from google.cloud import pubsub_v1
import json
import time
import random

project_id = "YOUR_PROJECT_ID"  # Replace with your GCP project ID
topic_id = "database-changes"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

def publish_message(data):
    data_str = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, data=data_str)
    print(f"Published message ID: {future.result()}")

def simulate_change():
    change_types = ["INSERT", "UPDATE", "DELETE"]
    change_type = random.choice(change_types)
    record_id = random.randint(1, 100)
    timestamp = time.time()
    change_data = {
        "change_type": change_type,
        "record_id": record_id,
        "timestamp": timestamp,
        "data": {"field1": "value1", "field2": "value2"}, #Place holder data.
    }
    publish_message(change_data)

if __name__ == "__main__":
    while True:
        simulate_change()
        time.sleep(2)  # Simulate changes every 2 seconds

YOUR_PROJECT_ID की जगह अपना असल GCP प्रोजेक्ट आईडी डालें

  1. Pub/Sub क्लाइंट लाइब्रेरी इंस्टॉल करें:
pip install google-cloud-pubsub
  1. अपने टर्मिनल पर स्क्रिप्ट चलाएं. यह स्क्रिप्ट लगातार चलती रहेगी और हर दो सेकंड में Pub/Sub टॉपिक पर मैसेज पब्लिश करेगी.
python simulate_cdc.py
  1. स्क्रिप्ट को एक मिनट तक चलाने के बाद, Pub/Sub में इस्तेमाल करने के लिए काफ़ी मैसेज उपलब्ध होंगे. अपने ओएस के हिसाब से, ctrl + C या Cmd + C दबाकर, चल रही Python स्क्रिप्ट को बंद किया जा सकता है.
  2. पब्लिश किए गए मैसेज देखने के लिए:

पब्लिश किए गए मैसेज देखने के लिए, दूसरा टर्मिनल खोलें और यह कमांड चलाएं:

gcloud pubsub subscriptions pull --auto-ack change-subscriber

आपको टेबल की एक लाइन दिखेगी. इसमें मैसेज और अन्य फ़ील्ड शामिल होंगे:

{"change_type": "UPDATE", "record_id": 10, "timestamp": 1742466264.888465, "data": {"field1": "value1", "field2": "value2"}}

जानकारी

  • Python स्क्रिप्ट, डेटाबेस में बदलावों का सिम्युलेट करती है. इसके लिए, यह INSERT, UPDATE या DELETE इवेंट को रैंडम तरीके से जनरेट करती है.
  • हर बदलाव को एक JSON ऑब्जेक्ट के तौर पर दिखाया जाता है. इसमें बदलाव का टाइप, रिकॉर्ड आईडी, टाइमस्टैंप, और डेटा शामिल होता है.
  • यह स्क्रिप्ट, Cloud Pub/Sub क्लाइंट लाइब्रेरी का इस्तेमाल करके, बदलाव से जुड़े इन इवेंट को database-changes विषय पर पब्लिश करती है.
  • सदस्यता लेने वाले लोगों के लिए उपलब्ध कमांड की मदद से, Pub/Sub विषय पर भेजे जा रहे मैसेज देखे जा सकते हैं.

4. Dataproc के लिए सेवा खाता बनाना

इस सेक्शन में, आपको एक सेवा खाता बनाना होगा, जिसका इस्तेमाल Dataproc क्लस्टर कर सकता है. इसके अलावा, क्लस्टर इंस्टेंस को Cloud Pub/Sub और Dataproc को ऐक्सेस करने की अनुमति देने के लिए, ज़रूरी अनुमतियां असाइन करें.

  1. सेवा खाता बनाएं:
export SERVICE_ACCOUNT_NAME="dataproc-service-account"

gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME
  1. Dataproc वर्कर की भूमिका जोड़ें, ताकि सेवा खाता क्लस्टर बना सके और जॉब चला सके. यहां दिए गए निर्देश में, पिछले निर्देश में जनरेट किए गए सेवा खाते के आईडी को सदस्य के तौर पर जोड़ें:
export PROJECT=$(gcloud info --format='value(config.project)')

gcloud projects add-iam-policy-binding $PROJECT \
        --role roles/dataproc.worker \
        --member="serviceAccount:<your-service-account-with-domain>"
  1. Pub/Sub सदस्य की भूमिका जोड़ें, ताकि सेवा खाता "change-subscriber" Pub/Sub सदस्यता के लिए साइन अप कर सके:
gcloud beta pubsub subscriptions add-iam-policy-binding \
        change-subscriber \
        --role roles/pubsub.subscriber \
        --member="serviceAccount:<your-service-account-with-domain"

5. Dataproc क्लस्टर बनाना

Dataproc क्लस्टर, Spark ऐप्लिकेशन को चलाएगा. यह ऐप्लिकेशन, Pub/Sub में मौजूद मैसेज को प्रोसेस करेगा. आपको पिछले सेक्शन में बनाया गया सेवा खाता चाहिए होगा. Dataproc, इस सेवा खाते को क्लस्टर के हर इंस्टेंस को असाइन करता है, ताकि सभी इंस्टेंस को ऐप्लिकेशन चलाने के लिए सही अनुमतियां मिल सकें.

Dataproc क्लस्टर बनाने के लिए, इस निर्देश का इस्तेमाल करें:

gcloud dataproc clusters create cdc-dataproc-cluster \
    --region=us-central1 \
    --zone=us-central1-a \
    --scopes=pubsub,datastore \
    --image-version=1.3 \
    --service-account="<your-service-account-with-domain-id>"

6. Dataproc क्लस्टर में Spark जॉब सबमिट करना

स्पार्क स्ट्रीमिंग ऐप्लिकेशन, Pub/sub में डेटाबेस में हुए बदलावों के मैसेज को प्रोसेस करता है और उन्हें कंसोल पर प्रिंट करता है.

तरीका

  1. एक डायरेक्ट्री बनाएं और PubsubConsumer.scala फ़ाइल में उपभोक्ता का सोर्स कोड जोड़ें
mkdir -p dataproc-pubsub-spark-streaming/spark/src/main/scala/demo && \
touch dataproc-pubsub-spark-streaming/spark/src/main/scala/demo/PubsubConsumer.scala 
package demo

import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PubsubConsumer {

  def createContext(projectID: String, checkpointDirectory: String)
    : StreamingContext = {

    // [START stream_setup]
    val sparkConf = new SparkConf().setAppName("DatabaseUpdates")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Set the checkpoint directory
    val yarnTags = sparkConf.get("spark.yarn.tags")
    val jobId = yarnTags.split(",").filter(_.startsWith("dataproc_job")).head
    ssc.checkpoint(checkpointDirectory + '/' + jobId)
    
    // Create stream
    val messagesStream: DStream[String] = PubsubUtils
      .createStream(
        ssc,
        projectID,
        None,
        "change-subscriber",  // Cloud Pub/Sub subscription for incoming database updates
        SparkGCPCredentials.builder.build(), StorageLevel.MEMORY_AND_DISK_SER_2)
      .map(message => new String(message.getData(), StandardCharsets.UTF_8))
    // [END stream_setup]

    processStringDStream(messagesStream)
    
        ssc
  }

  def processStringDStream(stringDStream: DStream[String]): Unit = {
    stringDStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val listOfStrings: List[String] = rdd.collect().toList
        listOfStrings.foreach(str => println(s"message received: $str"))
      } else {
        println("looking for message...")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("arguments are not passed correctly!")
      System.exit(1)
    }

    val Seq(projectID, checkpointDirectory) = args.toSeq

    // Create Spark context
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(projectID, checkpointDirectory))

    // Start streaming until we receive an explicit termination
    ssc.start()
    ssc.awaitTermination()
  }

}
  1. pom.xml में यह कोड बनाएं और जोड़ें
touch dataproc-pubsub-spark-streaming/spark/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <jvm.options.xms>-Xms512M</jvm.options.xms>
    <jvm.options.xmx>-Xmx2048M</jvm.options.xmx>
    <jvm.options.maxpermsize>-XX:MaxPermSize=2048M</jvm.options.maxpermsize>
    <jvm.options.xx>-XX:+CMSClassUnloadingEnabled</jvm.options.xx>
  </properties>

  <groupId>dataproc-spark-demos</groupId>
  <artifactId>spark-streaming-pubsub-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.2.0</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-datastore</artifactId>
      <version>1.34.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>spark-streaming-pubsub_2.11</artifactId>
      <version>2.2.0</version>
    </dependency>

    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_2.11</artifactId>
      <version>1.14.0</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <plugins>

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>demo.PubsubConsumer</mainClass>
                </transformer>
              </transformers>
              <relocations>
                <relocation>
                  <pattern>com</pattern>
                  <shadedPattern>repackaged.com</shadedPattern>
                  <includes>
                    <include>com.google.protobuf.**</include>
                    <include>com.google.common.**</include>
                  </includes>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>

    </plugins>
  </build>

</project>
  1. प्रोजेक्ट की स्पार्क डायरेक्ट्री में बदलाव करें और पाथ को एनवायरमेंट वैरिएबल में सेव करें, ताकि बाद में इसका इस्तेमाल किया जा सके:
cd dataproc-pubsub-spark-streaming
export REPO_ROOT=$PWD
  1. डायरेक्ट्री बदलें:
cd $REPO_ROOT/spark
  1. Java 1.8 डाउनलोड करें और फ़ोल्डर को /usr/lib/jvm/ में रखें. इसके बाद, JAVA_HOME को बदलकर यह करें:
export JAVA_HOME=/usr/lib/jvm/<your-java-1.8>/jre
  1. ऐप्लिकेशन जार बनाना
mvn clean package

ऐप्लिकेशन कोड और डिपेंडेंसी वाला spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar संग्रह, spark/target डायरेक्ट्री में बनाया जाता है

  1. स्पार्क ऐप्लिकेशन सबमिट करें:
export PROJECT=$(gcloud info --format='value(config.project)')
export JAR="spark-streaming-pubsub-demo-1.0-SNAPSHOT.jar"
export SPARK_PROPERTIES="spark.dynamicAllocation.enabled=false,spark.streaming.receiver.writeAheadLog.enabled=true"
export ARGUMENTS="$PROJECT hdfs:///user/spark/checkpoint"

gcloud dataproc jobs submit spark \
    --cluster cdc-dataproc-cluster \
    --region us-central1 \
    --async \
    --jar target/$JAR \
    --max-failures-per-hour 10 \
    --properties $SPARK_PROPERTIES \
    -- $ARGUMENTS
  1. चालू जॉब की सूची दिखाएं और जॉब के लिए JOB_ID वैल्यू नोट करें:
gcloud dataproc jobs list --region=us-central1 --state-filter=active

आउटपुट कुछ ऐसा दिखेगा

JOB_ID                            TYPE   STATUS
473ecb6d14e2483cb88a18988a5b2e56  spark  RUNNING
  1. अपने ब्राउज़र में यह यूआरएल खोलकर, नौकरी का आउटपुट देखें. [JOB_ID] को पिछले चरण में नोट की गई वैल्यू से बदलें.
https://console.cloud.google.com/dataproc/jobs/[JOB_ID]?region=us-central1
  1. आउटपुट इस तरह का होता है:
looking for message...
looking for message...
message received: {"change_type": "INSERT", "record_id": 72, "timestamp": 1745409434.969086, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 55, "timestamp": 1745409439.269171, "data": {"field1": "value1", "field2": "value2"}}
looking for message...
message received: {"change_type": "DELETE", "record_id": 71, "timestamp": 1745409430.673305, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 15, "timestamp": 1745409432.819154, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "DELETE", "record_id": 18, "timestamp": 1745409426.3570209, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "INSERT", "record_id": 85, "timestamp": 1745409428.5078359, "data": {"field1": "value1", "field2": "value2"}}
message received: {"change_type": "UPDATE", "record_id": 18, "timestamp": 1745409441.436026, "data": {"field1": "value1", "field2": "value2"}}
looking for message...

Dataproc में चल रहा स्पार्क स्ट्रीमिंग जॉब, Pub/sub से मैसेज खींचता है, उन्हें प्रोसेस करता है, और आउटपुट को कंसोल पर दिखाता है.

  1. जॉब बंद करना: जॉब बंद करने के लिए, यह कमांड चलाएं. JOB_ID को उसी आईडी से बदलें जिसे हमने पहले नोट किया था
gcloud dataproc jobs kill [JOB_ID] --region us-central1 --quiet

बधाई हो! आपने अभी-अभी एक शक्तिशाली सीडीसी पाइपलाइन बनाई है. यह Pub/Sub में डेटाबेस में हुए बदलावों को कैप्चर करती है और Cloud Dataproc में चल रही Spark Streaming का इस्तेमाल करके उन्हें प्रोसेस करती है.

7. व्यवस्थित करें

आपने जो भी संसाधन बनाए हैं उन्हें हटा दें, ताकि आने वाले समय में आपसे उनका शुल्क न लिया जाए. बिलिंग से बचने का सबसे आसान तरीका यह है कि ट्यूटोरियल के लिए बनाया गया प्रोजेक्ट मिटा दें. इसके अलावा, अलग-अलग संसाधनों को भी मिटाया जा सकता है.

अलग-अलग संसाधनों को मिटाने के लिए, ये कमांड चलाएं

gcloud dataproc clusters delete cdc-dataproc-cluster --quiet
gcloud pubsub topics delete database-changes --quiet
gcloud pubsub subscriptions delete change-subscriber --quiet
gcloud iam service-accounts delete <your-service-account-with-domain> --quiet

8. बधाई हो

बधाई हो! आपने अभी-अभी एक कोडलैब पूरा किया है. इसमें Google Cloud Platform का इस्तेमाल करके, रीयल-टाइम डेटा पाइपलाइन बनाने का तरीका बताया गया है. आइए, एक बार फिर से जान लेते हैं कि आपने क्या-क्या हासिल किया है:

  • सिम्युलेटेड चेंज डेटा कैप्चर (सीडीसी): आपने सीडीसी के बुनियादी सिद्धांतों के बारे में जाना. साथ ही, डेटाबेस में बदलावों को सिम्युलेट करने के लिए, Python स्क्रिप्ट लागू की. इससे ऐसे इवेंट जनरेट हुए जो रीयल-टाइम डेटा में हुए बदलावों को दिखाते हैं.
  • Cloud Pub/Sub का इस्तेमाल किया गया: आपने Cloud Pub/Sub के विषयों और सदस्यताओं को सेट अप किया है. इससे आपको सीडीसी इवेंट को स्ट्रीम करने के लिए, एक भरोसेमंद और बड़े पैमाने पर इस्तेमाल की जा सकने वाली मैसेजिंग सेवा मिलती है. आपने Pub/Sub में, डेटाबेस में किए गए बदलावों को पब्लिश किया है. इससे रीयल-टाइम डेटा स्ट्रीम बन गई है.
  • Dataproc और Spark की मदद से प्रोसेस किया गया डेटा: आपने Dataproc क्लस्टर तैयार किया है और Pub/Sub सदस्यता से मैसेज पाने के लिए, Spark Streaming जॉब को डिप्लॉय किया है. आपने सीडीसी इवेंट को रीयल-टाइम में प्रोसेस और ट्रांसफ़ॉर्म किया. साथ ही, नतीजों को Dataproc जॉब लॉग में दिखाया.
  • एंड-टू-एंड रीयल-टाइम पाइपलाइन बनाई गई है: आपने इन सेवाओं को इंटिग्रेट करके एक पूरी डेटा पाइपलाइन बनाई है. यह पाइपलाइन, डेटा में होने वाले बदलावों को रीयल-टाइम में कैप्चर, स्ट्रीम, और प्रोसेस करती है. आपको एक ऐसा सिस्टम बनाने का व्यावहारिक अनुभव मिला जो लगातार डेटा स्ट्रीम को मैनेज कर सकता है.
  • Spark Pub/Sub कनेक्टर का इस्तेमाल किया गया हो: आपने Spark Pub/Sub कनेक्टर का इस्तेमाल करने के लिए, Dataproc क्लस्टर को कॉन्फ़िगर किया हो. यह Spark Structured Streaming के लिए ज़रूरी है, ताकि वह Pub/Sub से डेटा पढ़ सके.

अब आपके पास अलग-अलग ऐप्लिकेशन के लिए, ज़्यादा जटिल और बेहतर डेटा पाइपलाइन बनाने का एक मज़बूत आधार है. इनमें रीयल-टाइम में डेटा का विश्लेषण, डेटा वेयरहाउसिंग, और माइक्रोसेवाओं के आर्किटेक्चर शामिल हैं. एक्सप्लोर करते रहें और कम्यूनिटी को बढ़ाते रहें!

रेफ़रंस दस्तावेज़