Dataproc और Apache Kafka के लिए Google की मैनेज की जाने वाली सेवा का इस्तेमाल करके, रीयल-टाइम IoT डेटा प्रोसेसिंग

1. परिचय

2efacab8643a653b.png

पिछले अपडेट की तारीख: 10-06-2024

बैकग्राउंड

इंटरनेट ऑफ़ थिंग्स (IoT) डिवाइस, नेटवर्क के किनारे पर बड़ी मात्रा में डेटा जनरेट करते हैं. इनमें स्मार्ट होम समाधान से लेकर इंडस्ट्रियल सेंसर तक शामिल हैं. यह डेटा कई तरह के कामों के लिए बहुत ज़रूरी है. जैसे, डिवाइस की निगरानी करना, उसे ट्रैक करना, उसकी जांच करना, निगरानी करना, उसे मनमुताबिक बनाना, फ्लीट ऑप्टिमाइज़ेशन वगैरह. Google Managed Service for Apache Kafka, इस डेटा स्ट्रीम को ओएसएस के साथ काम करने वाले, इस्तेमाल में आसान, और सुरक्षित तरीके से इनजेस्ट और सेव करने का एक ऐसा तरीका है जिसे ज़रूरत के हिसाब से बढ़ाया जा सकता है. वहीं, Google Cloud Dataproc की मदद से, Apache Spark और Hadoop क्लस्टर का इस्तेमाल करके, डेटा के विश्लेषण के लिए इन बड़े डेटासेट को प्रोसेस किया जा सकता है.

आपको क्या बनाने को मिलेगा

इस कोडलैब में, Google Managed Service for Apache Kafka, Dataproc, Python, और Apache Spark का इस्तेमाल करके, IoT डेटा प्रोसेसिंग पाइपलाइन बनाई जाएगी. यह पाइपलाइन, रीयल-टाइम में डेटा का विश्लेषण करती है. आपकी पाइपलाइन में ये कार्रवाइयां होंगी:

  • GCE VM का इस्तेमाल करके, IoT डिवाइसों से Managed Kafka क्लस्टर में डेटा पब्लिश करना
  • Manage Kafka क्लस्टर से Dataproc क्लस्टर में डेटा स्ट्रीम करना
  • Dataproc Spark Streaming जॉब का इस्तेमाल करके डेटा प्रोसेस करना

आपको क्या सीखने को मिलेगा

  • Google के मैनेज किए गए Kafka और Dataproc क्लस्टर बनाने का तरीका
  • Dataproc का इस्तेमाल करके स्ट्रीमिंग जॉब चलाने का तरीका

आपको इन चीज़ों की ज़रूरत होगी

2. खास जानकारी

इस कोडलैब के लिए, हम एक डमी कंपनी, DemoIOT Solutions की कहानी को फ़ॉलो करेंगे. DemoIOT Solutions, सेंसर वाले ऐसे डिवाइस उपलब्ध कराता है जो तापमान, नमी, दबाव, रोशनी के लेवल, और जगह की जानकारी का डेटा मेज़र करते हैं और उसे ट्रांसमिट करते हैं. वे ऐसी पाइपलाइन सेट अप करना चाहते हैं जो इस डेटा को प्रोसेस करे, ताकि वे अपने ग्राहकों को रीयल टाइम के आंकड़े दिखा सकें. इन पाइपलाइन का इस्तेमाल करके, वे अपने ग्राहकों को कई तरह की सेवाएं दे सकते हैं. जैसे, निगरानी करना, अपने-आप सुझाव मिलना, सूचनाएं पाना, और उन जगहों के बारे में अहम जानकारी पाना जहां ग्राहकों ने अपने सेंसर इंस्टॉल किए हैं.

इसके लिए, हम IoT डिवाइस का सिम्युलेट करने के लिए GCE VM का इस्तेमाल करेंगे. डिवाइस, Google Managed Kafka क्लस्टर में मौजूद Kafka विषय पर डेटा पब्लिश करेगा. इसे Dataproc स्ट्रीमिंग जॉब पढ़ेगा और प्रोसेस करेगा. ज़रूरी शर्तें पूरी करने के लिए सेटअप और यहां दिए गए पेजों पर जाकर, इन सभी चरणों को पूरा करें.

ज़रूरी शर्तें

  1. अपने प्रोजेक्ट का नाम और प्रोजेक्ट नंबर ढूंढें. रेफ़रंस के लिए, प्रोजेक्ट का नाम, नंबर, और आईडी ढूंढना लेख पढ़ें.
  2. वीपीसी सबनेटवर्क. इससे GCE वीएम, Kafka क्लस्टर, और Dataproc क्लस्टर के बीच कनेक्टिविटी चालू हो जाएगी. gcloud सीएलआई का इस्तेमाल करके मौजूदा सबनेट की सूची बनाने के लिए, यह तरीका अपनाएं. अगर ज़रूरी हो, तो ऑटो मोड वाला वीपीसी नेटवर्क बनाएं. इससे हर Google Cloud क्षेत्र में सबनेटवर्क वाला वीपीसी नेटवर्क बन जाएगा. हालांकि, इस कोडलैब के लिए, हम सिर्फ़ एक क्षेत्र के सबनेटवर्क का इस्तेमाल करेंगे.
  • इस सबनेटवर्क में, पक्का करें कि फ़ायरवॉल का ऐसा नियम मौजूद हो जो tcp:22 से आने वाले सभी इन्ग्रेस डेटा ट्रैफ़िक को अनुमति देता हो. यह एसएसएच के लिए ज़रूरी है. नेटवर्क बनाते समय, यह नियम फ़ायरवॉल के नियमों वाले सेक्शन में चुनने के लिए उपलब्ध होगा. इसलिए, इसे चुनना न भूलें.
  1. GCS बकेट. Dataproc जॉब के संसाधनों को सेव करने और प्रोसेस किए गए डेटा को बनाए रखने के लिए, आपको Google Cloud Storage बकेट का ऐक्सेस चाहिए होगा. अगर आपके पास ऐसा कोई खाता नहीं है, तो अपने GCP प्रोजेक्ट में एक खाता बनाएं.

एनवायरमेंट वैरिएबल की वैल्यू सेट करना

gcloud CLI चलाने वाले टर्मिनल में, इन एनवायरमेंट वैरिएबल को भरें, ताकि इनका इस्तेमाल बाद में किया जा सके.

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

इनकी जगह ये डालें:

  • <project-id> में, सेट अप किए गए GCP प्रोजेक्ट का नाम दिखेगा.
  • <project-number> की जगह, ज़रूरी शर्तें पूरी करने के पहले चरण में दिए गए प्रोजेक्ट नंबर का नाम डालें.
  • <region> को उपलब्ध क्षेत्र और ज़ोन में से किसी ऐसे क्षेत्र के नाम से बदलें जिसका आपको इस्तेमाल करना है. उदाहरण के लिए, हम us-central1 का इस्तेमाल कर सकते हैं.
  • <zone> में, पहले चुने गए क्षेत्र के नीचे मौजूद उपलब्ध क्षेत्र और ज़ोन में से किसी ज़ोन का नाम डालें. उदाहरण के लिए, अगर आपने us-central1 को क्षेत्र के तौर पर चुना है, तो us-central1-f को ज़ोन के तौर पर इस्तेमाल किया जा सकता है. इस ज़ोन का इस्तेमाल, GCE वर्चुअल मशीन बनाने के लिए किया जाएगा. यह मशीन, IoT डिवाइसों की तरह काम करेगी. पक्का करें कि आपका ज़ोन, उस इलाके में हो जिसे आपने चुना है.
  • <subnet-path> में, ज़रूरी शर्तें पूरी करने के दूसरे चरण में बताया गया सबनेट का पूरा पाथ शामिल होना चाहिए. इसकी वैल्यू, projects/<project-id>/regions/<region>/subnetworks/<subnet-name> फ़ॉर्मैट में होनी चाहिए.
  • <bucket-name> को बदलें. इसकी जगह, ज़रूरी शर्तें पूरी करने के तीसरे चरण में बताए गए GCS बकेट का नाम डालें.

3. Google Managed Kafka को सेट अप करना

इस सेक्शन में, Google मैनेज किया गया Kafka क्लस्टर सेट अप किया जाता है. यह Kafka सर्वर को डिप्लॉय करता है और इस सर्वर पर एक विषय बनाता है. इस विषय पर IoT डेटा पब्लिश किया जा सकता है. साथ ही, इसकी सदस्यता लेने के बाद इसे पढ़ा जा सकता है. DemoIOT Solutions इस क्लस्टर को सेट अप कर सकता है, ताकि उसके सभी डिवाइस इस पर डेटा पब्लिश कर सकें.

मैनेज किया गया Kafka क्लस्टर बनाना

  • मैनेज किया गया Kafka क्लस्टर बनाएं. यहां क्लस्टर का नाम kafka-iot है.
gcloud managed-kafka clusters create kafka-iot \
    --project=$PROJECT_ID \
    --location=$REGION \
    --cpu=3 \
    --memory=12GiB \
    --subnets=$SUBNET_PATH \
    --auto-rebalance

आपको इस तरह का जवाब मिलेगा:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                         
Created cluster [kafka-iot].

क्लस्टर बनने में 20 से 30 मिनट लगते हैं. इस कार्रवाई के पूरा होने का इंतज़ार करें.

कोई विषय बनाएं

  • क्लस्टर पर मैनेज किया गया Kafka विषय बनाएं. यहां विषय का नाम kafka-iot-topic है.
gcloud managed-kafka topics create kafka-iot-topic \
    --cluster=kafka-iot \
    --location=$REGION \
    --partitions=10 \
    --replication-factor=3

आपको इससे मिलता-जुलता आउटपुट मिलेगा:

Created topic [kafka-iot-topic].

4. कोई पब्लिशर सेट अप करना

Managed Kafka क्लस्टर में पब्लिश करने के लिए, हम Google Compute Engine VM इंस्टेंस सेट अप करते हैं. यह Managed Kafka क्लस्टर के इस्तेमाल किए गए सबनेट वाले वीपीसी को ऐक्सेस कर सकता है. यह वीएम, DemoIOT Solutions के सेंसर डिवाइसों को सिम्युलेट करता है.

चरण

  1. Google Compute Engine VM इंस्टेंस बनाएं. यहां GCE वीएम का नाम publisher-instance है.
gcloud compute instances create publisher-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=$SUBNET_PATH \
    --zone=$ZONE
  1. Google Compute Engine के डिफ़ॉल्ट सेवा खाते को Managed Service for Apache Kafka इस्तेमाल करने की अनुमतियां दें.
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. वीएम से कनेक्ट करने के लिए, एसएसएच का इस्तेमाल करें. इसके अलावा, एसएसएच के लिए Google Cloud Console का इस्तेमाल करें.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. Kafka के कमांड लाइन टूल चलाने के लिए, Java इंस्टॉल करें. साथ ही, इन कमांड का इस्तेमाल करके Kafka बाइनरी डाउनलोड करें.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. Managed Kafka की पुष्टि करने वाली लाइब्रेरी और उसकी डिपेंडेंसी डाउनलोड करें. साथ ही, Kafka क्लाइंट प्रॉपर्टी कॉन्फ़िगर करें.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

पब्लिशर मशीन सेटअप करने के बारे में ज़्यादा जानकारी के लिए, क्लाइंट मशीन सेट अप करना लेख पढ़ें.

5. मैनेज किए गए Kafka में पब्लिश करें

पब्लिशर सेट अप हो जाने के बाद, हम इस पर Kafka कमांड लाइन का इस्तेमाल कर सकते हैं. इससे GCE वीएम (DemoIOT Solutions के IoT डिवाइसों का सिम्युलेट करके) से मैनेज किए गए Kafka क्लस्टर में कुछ डमी डेटा पब्लिश किया जा सकता है.

  1. हमने GCE वीएम इंस्टेंस में SSH किया है. इसलिए, हमें PROJECT_ID वैरिएबल को फिर से पॉप्युलेट करना होगा:
export PROJECT_ID=<project-id>
export REGION=<region>

इनकी जगह ये डालें:

  • <project-id> में, सेट अप किए गए GCP प्रोजेक्ट का नाम दिखेगा.
  • <region> में वह इलाका होता है जहां Kafka क्लस्टर बनाया गया था
  1. Kafka बूटस्ट्रैप सर्वर का आईपी पता पाने के लिए, managed-kafka clusters describe कमांड का इस्तेमाल करें. इस पते का इस्तेमाल, Kafka क्लस्टर से कनेक्ट करने के लिए किया जा सकता है.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. क्लस्टर में मौजूद विषयों की सूची बनाएं:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

आपको यह आउटपुट दिखना चाहिए. इसमें kafka-iot-topic शामिल है, जिसे हमने पहले बनाया था.

__remote_log_metadata
kafka-iot-topic
  1. इस स्क्रिप्ट को कॉपी करके नई फ़ाइल publish_iot_data.sh में चिपकाएं. GCE वीएम पर नई फ़ाइल बनाने के लिए, vim या nano जैसे टूल का इस्तेमाल किया जा सकता है.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

ज़्यादा जानकारी

  • यह स्क्रिप्ट, सेंसर की नकली रीडिंग वाले JSON मैसेज बनाती है. इनमें डिवाइस आईडी, टाइमस्टैंप, सेंसर डेटा (तापमान, नमी, दबाव, रोशनी), जगह की जानकारी (अक्षांश, देशांतर), डिवाइस की स्थिति (बैटरी, सिग्नल, कनेक्शन टाइप) और कुछ मेटाडेटा होता है.
  • यह कुछ खास डिवाइसों से लगातार मैसेज जनरेट करता है. हर डिवाइस, तय किए गए समय अंतराल पर डेटा भेजता है. इससे असल दुनिया के IoT डिवाइसों की तरह काम करने में मदद मिलती है. यहां, हम 10 डिवाइसों का डेटा पब्लिश करते हैं. हर डिवाइस से 20 रीडिंग मिलती हैं और हर रीडिंग के बीच 10 सेकंड का समय होता है.
  • यह जनरेट किए गए सभी डेटा को Kafka के विषय पर भी पब्लिश करता है. इसके लिए, Kafka प्रोड्यूसर कमांड लाइन टूल का इस्तेमाल किया जाता है.
  1. स्क्रिप्ट में इस्तेमाल की गई कुछ डिपेंडेंसी इंस्टॉल करें - गणितीय कैलकुलेशन के लिए bc पैकेज और JSON प्रोसेसिंग के लिए jq पैकेज.
sudo apt-get install bc jq
  1. स्क्रिप्ट में बदलाव करके उसे एक्ज़ीक्यूटेबल बनाएं और स्क्रिप्ट चलाएं. इसे चलने में करीब दो मिनट लगेंगे.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

इस कमांड को चलाकर यह देखा जा सकता है कि इवेंट पब्लिश हुए हैं या नहीं. इससे सभी इवेंट प्रिंट हो जाएंगे. बाहर निकलने के लिए, <control-c> दबाएं.

kafka-console-consumer.sh \
    --topic kafka-iot-topic \
    --from-beginning \
    --bootstrap-server $BOOTSTRAP \
    --consumer.config client.properties

6. Dataproc क्लस्टर सेट अप करना

यह सेक्शन, वीपीसी सबनेटवर्क में एक Dataproc क्लस्टर बनाता है. इसी सबनेटवर्क में मैनेज किया गया Kafka क्लस्टर मौजूद होता है. इस क्लस्टर का इस्तेमाल उन जॉब को चलाने के लिए किया जाएगा जो DemoIOT Solutions के लिए ज़रूरी रीयल टाइम के आंकड़े और अहम जानकारी जनरेट करती हैं.

  1. Dataproc क्लस्टर बनाएं. यहां क्लस्टर का नाम dataproc-iot है.
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

आपको इस तरह का जवाब मिलेगा:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

क्लस्टर बनने में 10 से 15 मिनट लग सकते हैं. इस कार्रवाई के पूरा होने का इंतज़ार करें. साथ ही, क्लस्टर की जानकारी देकर देखें कि क्लस्टर RUNNING स्थिति में है या नहीं.

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. Dataproc का इस्तेमाल करके Kafka मैसेज प्रोसेस करना

इस आखिरी सेक्शन में, आपको एक Dataproc जॉब सबमिट करनी होगी. यह जॉब, पब्लिश किए गए मैसेज को प्रोसेस करेगी. इसके लिए, Spark Streaming का इस्तेमाल किया जाएगा. इस जॉब से, रीयल टाइम के कुछ आंकड़े और अहम जानकारी जनरेट होती है. इसका इस्तेमाल DemoIOT Solutions कर सकता है.

  1. process_iot.py नाम की स्ट्रीमिंग PySpark जॉब फ़ाइल को स्थानीय तौर पर बनाने के लिए, यह कमांड चलाएं.
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
    StructField("device_id", StringType()),
    StructField("timestamp", StringType()),
    StructField(
        "location",
        StructType([
            StructField("latitude", FloatType()),
            StructField("longitude", FloatType()),
        ]),
    ),
    StructField(
        "sensor_data",
        StructType([
            StructField("temperature", FloatType()),
            StructField("humidity", FloatType()),
            StructField("pressure", FloatType()),
            StructField("light_level", IntegerType()),
        ]),
    ),
    StructField(
        "device_status",
        StructType([
            StructField("battery_level", IntegerType()),
            StructField("signal_strength", IntegerType()),
            StructField("connection_type", StringType()),
        ]),
    ),
    StructField(
        "metadata",
        StructType([
            StructField("sensor_type", StringType()),
            StructField("unit_temperature", StringType()),
            StructField("unit_humidity", StringType()),
            StructField("unit_pressure", StringType()),
            StructField("unit_light_level", StringType()),
            StructField("firmware_version", StringType()),
        ]),
    ),
])
CLIENT_PROPERTY_KEYS = [
    "security.protocol",
    "sasl.mechanism",
    "sasl.login.callback.handler.class",
    "sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
    # Parse client properties file
    parsed_path = urlparse(client_properties_path)
    if parsed_path.scheme != "gs":
        raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

    bucket_name = parsed_path.netloc
    blob_name = parsed_path.path.lstrip("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    file_content = "[DEFAULT]\n" + blob.download_as_text()

    config = configparser.ConfigParser()
    config.read_string(file_content)

    client_properties = dict()
    for key in CLIENT_PROPERTY_KEYS:
        client_properties[key] = config.get("DEFAULT", key)

    print(f"Client properties: {client_properties}")
    return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
    print("Starting initial data processing...")
    initial_rows = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", "kafka-iot-topic")
        .option("kafka.security.protocol", client_properties["security.protocol"])
        .option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
        .option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
        .option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
        .load()
    )

    initial_rows = (
        initial_rows.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), JSON_SCHEMA).alias("data"))
        .select("data.*")
    )
    
    # Print first 20 rows
    def print_top_rows(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
            batch_df.limit(20).show(truncate=False)

    initial_query_print = initial_rows.writeStream \
        .foreachBatch(print_top_rows) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_print)

    # Calculate and print average temperatures
    def process_initial_avg_temp(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
            current_averages = (
                batch_df.select("device_id", "sensor_data.temperature")
                .groupBy("device_id")
                .agg(avg("temperature").alias("average_temperature"))
                .orderBy("device_id")
            )
            current_averages.show(truncate=False)

    initial_query_avg_temp = initial_rows.writeStream \
        .foreachBatch(process_initial_avg_temp) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_avg_temp)
    
    # Write data to GCS
    initial_data_gcs_writer = (
        initial_rows.writeStream.outputMode("append")
        .format("avro")
        .option("path", store_data_gcs_path+"/tables/iot_avro/")
        .option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
        .trigger(once=True) \
        .start()
    )
    queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

    client_properties = get_client_properties(client_properties_path)

    # Create SparkSession
    spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
    
    queries_to_await = []

    process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
    
    # Wait for all queries to terminate
    for query in queries_to_await:
        try:
            query.awaitTermination()
        except Exception as e:
            print(f"Error awaiting query: {e}")
        finally:
            query.stop()

    spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Invalid number of arguments passed ", len(sys.argv))
        print(
            "Usage: ",
            sys.argv[0],
            " <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
        )
        raise ValueError("Invalid number of arguments passed.")

    parsed_data_path = urlparse(sys.argv[3])
    if parsed_data_path.scheme != "gs":
        raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


    main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

ज़्यादा जानकारी

  • यह कोड, PySpark Structured Streaming जॉब को सेट अप करता है, ताकि किसी तय किए गए Kafka विषय से डेटा पढ़ा जा सके. यह Kafka ब्रोकर से कनेक्ट करने और उसकी पुष्टि करने के लिए, दिए गए Kafka सर्वर बूटस्ट्रैप पते और GCS कॉन्फ़िगरेशन फ़ाइल से लोड किए गए Kafka कॉन्फ़िगरेशन का इस्तेमाल करता है.
  • यह सबसे पहले Kafka से रॉ डेटा को बाइट ऐरे की स्ट्रीम के तौर पर पढ़ता है. इसके बाद, उन बाइट ऐरे को स्ट्रिंग में बदलता है. साथ ही, डेटा के स्ट्रक्चर (डिवाइस आईडी, टाइमस्टैंप, जगह, सेंसर डेटा वगैरह) के बारे में बताने के लिए, Spark के StructType का इस्तेमाल करके json_schema लागू करता है.
  • यह कंसोल पर पहली 10 पंक्तियों को प्रिंट करता है, ताकि उन्हें देखा जा सके. साथ ही, हर सेंसर के हिसाब से औसत तापमान का हिसाब लगाता है और सभी डेटा को avro फ़ॉर्मैट में GCS बकेट में लिखता है. Avro, लाइन पर आधारित डेटा सीरियलाइज़ेशन सिस्टम है. यह स्ट्रक्चर्ड डेटा को छोटे, स्कीमा के हिसाब से तय किए गए बाइनरी फ़ॉर्मैट में सेव करता है. साथ ही, स्कीमा में बदलाव करने, भाषा के हिसाब से काम करने, और बड़े पैमाने पर डेटा प्रोसेसिंग के लिए ज़्यादा कंप्रेस करने की सुविधा देता है.
  1. client.properties फ़ाइल बनाएं और Kafka सर्वर के बूटस्ट्रैप पते के लिए एनवायरमेंट वैरिएबल भरें.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. process_iot.py और client.properties फ़ाइलों को अपने Google Cloud Storage बकेट में अपलोड करें, ताकि Dataproc जॉब में उनका इस्तेमाल किया जा सके.
gsutil cp process_iot.py client.properties $BUCKET
  1. Dataproc जॉब के लिए, कुछ डिपेंडेंसी जार को अपने GCS बकेट में कॉपी करें. इस डायरेक्ट्री में ऐसे जार होते हैं जिनकी ज़रूरत Kafka के साथ Spark Streaming के जॉब चलाने के लिए होती है. साथ ही, इसमें Managed Kafka की पुष्टि करने वाली लाइब्रेरी और उसकी डिपेंडेंसी भी होती हैं. ये सभी जार, क्लाइंट मशीन सेट अप करें से लिए जाते हैं.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. Dataproc क्लस्टर में Spark जॉब सबमिट करें.
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

Spark ड्राइवर के लॉग प्रिंट किए जाएंगे. आपको कंसोल में लॉग की गई ये टेबल और आपके GCS बकेट में सेव किया गया डेटा भी दिखना चाहिए.

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. व्यवस्थित करें

कोडलैब पूरा करने के बाद, संसाधनों को हटाने के लिए यह तरीका अपनाएं.

  1. Managed Kafka क्लस्टर, Publisher GCE VM, और Dataproc क्लस्टर मिटाएं.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. अपने वीपीसी सबनेटवर्क और नेटवर्क को मिटाएं.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. अगर आपको डेटा का इस्तेमाल नहीं करना है, तो अपना GCS बकेट मिटाएं.
gcloud storage rm --recursive $BUCKET

9. बधाई हो

बधाई हो, आपने Manage Kafka और Dataproc की मदद से, IoT डेटा प्रोसेसिंग पाइपलाइन को सफलतापूर्वक बनाया है. इससे DemoIOT Solutions को, उनके डिवाइसों से पब्लिश किए गए डेटा के बारे में रीयल टाइम में अहम जानकारी मिलती है!

आपने मैनेज किया गया Kafka क्लस्टर बनाया है. साथ ही, आपने उस पर IoT इवेंट पब्लिश किए हैं. इसके अलावा, आपने एक Dataproc जॉब चलाया है. यह जॉब, Spark स्ट्रीमिंग का इस्तेमाल करके इन इवेंट को रीयल टाइम में प्रोसेस करता है. अब आपको मैनेज किए गए Kafka और Dataproc का इस्तेमाल करके, डेटा पाइपलाइन बनाने के लिए ज़रूरी मुख्य चरणों के बारे में पता चल गया है.

रेफ़रंस दस्तावेज़