অ্যাপাচি কাফকার জন্য Dataproc এবং Google পরিচালিত পরিষেবা ব্যবহার করে রিয়েল-টাইম IoT ডেটা প্রসেসিং

১. ভূমিকা

2efacab8643a653b.png

সর্বশেষ হালনাগাদ: ২০২৪-০৬-১০

পটভূমি

স্মার্ট হোম সলিউশন থেকে শুরু করে ইন্ডাস্ট্রিয়াল সেন্সর পর্যন্ত ইন্টারনেট অফ থিংস (IoT) ডিভাইসগুলো নেটওয়ার্কের প্রান্তে বিপুল পরিমাণ ডেটা তৈরি করে। এই ডেটা বিভিন্ন ক্ষেত্রে ব্যবহারের জন্য অমূল্য, যেমন ডিভাইস মনিটরিং, ট্র্যাকিং, ডায়াগনস্টিকস, সার্ভেইল্যান্স, পার্সোনালাইজেশন, ফ্লিট অপটিমাইজেশন এবং আরও অনেক কিছু। গুগল ম্যানেজড সার্ভিস ফর অ্যাপাচি কাফকা এই ডেটার অবিরাম প্রবাহকে একটি OSS সামঞ্জস্যপূর্ণ, সহজে ব্যবহারযোগ্য এবং নিরাপদ উপায়ে গ্রহণ ও সংরক্ষণ করার জন্য একটি পরিমাপযোগ্য এবং টেকসই পদ্ধতি প্রদান করে, অন্যদিকে গুগল ক্লাউড ডেটাপ্রোক অ্যাপাচি স্পার্ক এবং হ্যাডুপ ক্লাস্টার ব্যবহার করে ডেটা অ্যানালিটিক্সের জন্য এই বিশাল ডেটাসেটগুলো প্রসেস করার সুযোগ দেয়।

আপনি যা তৈরি করবেন

এই কোডল্যাবে, আপনি গুগল ম্যানেজড সার্ভিস ফর অ্যাপাচি কাফকা, ডেটাপ্রক, পাইথন এবং অ্যাপাচি স্পার্ক ব্যবহার করে একটি আইওটি ডেটা প্রসেসিং পাইপলাইন তৈরি করবেন যা রিয়েল-টাইম অ্যানালিটিক্স করে। আপনার পাইপলাইনটি যা করবে:

  • GCE VM ব্যবহার করে IoT ডিভাইস থেকে প্রাপ্ত ডেটা একটি পরিচালিত কাফকা ক্লাস্টারে প্রকাশ করুন।
  • ম্যানেজ কাফকা ক্লাস্টার থেকে ডেটাপ্রোক ক্লাস্টারে ডেটা স্ট্রিম করুন
  • Dataproc Spark Streaming job ব্যবহার করে ডেটা প্রসেস করুন

আপনি যা শিখবেন

  • কীভাবে গুগল পরিচালিত কাফকা এবং ডেটাপ্রোক ক্লাস্টার তৈরি করবেন
  • Dataproc ব্যবহার করে কীভাবে স্ট্রিমিং জব চালানো যায়

আপনার যা যা লাগবে

  • প্রজেক্ট সেট আপ করা একটি সক্রিয় GCP অ্যাকাউন্ট থাকতে হবে। যদি আপনার এমন কোনো অ্যাকাউন্ট না থাকে, তবে আপনি একটি ফ্রি ট্রায়ালের জন্য সাইন আপ করতে পারেন।
  • gcloud CLI ইনস্টল এবং কনফিগার করা হয়েছে। আপনি আপনার OS-এ gcloud CLI ইনস্টল করার জন্য নির্দেশাবলী অনুসরণ করতে পারেন।
  • আপনার GCP প্রোজেক্টে Google Managed Kafka এবং Dataproc- এর জন্য API সক্রিয় করুন।

২. সংক্ষিপ্ত বিবরণ

এই কোডল্যাবের জন্য, চলুন ডেমোআইওটি সলিউশনস (DemoIOT Solutions) নামক একটি ডামি কোম্পানির গল্প অনুসরণ করি। ডেমোআইওটি সলিউশনস এমন সেন্সর ডিভাইস সরবরাহ করে যা তাপমাত্রা, আর্দ্রতা, চাপ, আলোর মাত্রা এবং অবস্থানের ডেটা পরিমাপ ও প্রেরণ করে। তারা এমন পাইপলাইন স্থাপন করতে চায় যা এই ডেটা প্রসেস করে তাদের গ্রাহকদের রিয়েল-টাইম পরিসংখ্যান দেখাবে। এই ধরনের পাইপলাইন ব্যবহার করে, তারা তাদের গ্রাহকদের বিভিন্ন ধরনের পরিষেবা প্রদান করতে পারে, যেমন—পর্যবেক্ষণ, স্বয়ংক্রিয় পরামর্শ, সতর্কতা এবং গ্রাহকরা যেখানে তাদের সেন্সর স্থাপন করেছে সেই স্থানগুলো সম্পর্কে অন্তর্দৃষ্টি।

এটি করার জন্য, আমরা IoT ডিভাইসটিকে সিমুলেট করতে একটি GCE VM ব্যবহার করব। ডিভাইসটি গুগল পরিচালিত কাফকা ক্লাস্টারের একটি কাফকা টপিকে ডেটা প্রকাশ করবে, যা একটি ডেটাপ্রোক স্ট্রিমিং জব দ্বারা পড়া ও প্রসেস করা হবে। পূর্বশর্ত সেটআপ এবং পরবর্তী পৃষ্ঠাগুলো আপনাকে এই সমস্ত ধাপগুলো সম্পন্ন করতে নির্দেশনা দেবে।

পূর্বশর্ত সেটআপ

  1. আপনার প্রকল্পের জন্য প্রকল্পের নাম এবং প্রকল্প নম্বর খুঁজুন। তথ্যের জন্য ‘প্রকল্পের নাম, নম্বর এবং আইডি খুঁজুন’ দেখুন।
  2. ভিপিসি সাবনেটওয়ার্ক। এটি জিসিই ভিএম, কাফকা ক্লাস্টার এবং ডেটাপ্রক ক্লাস্টারের মধ্যে সংযোগ স্থাপন করতে সাহায্য করবে। gcloud CLI ব্যবহার করে বিদ্যমান সাবনেটগুলির তালিকা দেখতে এটি অনুসরণ করুন। প্রয়োজন হলে, একটি অটো মোড ভিপিসি নেটওয়ার্ক তৈরি করার পদ্ধতি অনুসরণ করুন, যা প্রতিটি গুগল ক্লাউড রিজিয়নে সাবনেটওয়ার্ক সহ একটি ভিপিসি নেটওয়ার্ক তৈরি করবে। তবে, এই কোডল্যাবের জন্য আমরা শুধুমাত্র একটি রিজিয়নের সাবনেটওয়ার্ক ব্যবহার করব।
  • এই সাবনেটওয়ার্কে, নিশ্চিত করুন যে tcp:22 থেকে সমস্ত ইনগ্রেসের অনুমতি দেওয়ার জন্য একটি ফায়ারওয়াল নিয়ম রয়েছে, যা SSH-এর জন্য প্রয়োজন। একটি নেটওয়ার্ক তৈরি করার সময় ফায়ারওয়াল নিয়ম বিভাগে এই নিয়মটি নির্বাচনের জন্য উপলব্ধ থাকবে, তাই নিশ্চিত করুন যে আপনি এটি নির্বাচন করেছেন।
  1. জিসিএস বাকেট। ডেটাপ্রোক জব রিসোর্স সংরক্ষণ করতে এবং প্রক্রিয়াকৃত ডেটা স্থায়ীভাবে রাখতে আপনার একটি গুগল ক্লাউড স্টোরেজ বাকেটে অ্যাক্সেস প্রয়োজন হবে। যদি আপনার এমন কোনো বাকেট না থাকে, তবে আপনি আপনার জিসিপি (GCP) প্রজেক্টে একটি তৈরি করে নিতে পারেন।

পরিবেশগত ভেরিয়েবলগুলি পূরণ করুন

আপনার টার্মিনালে, যেখানে আপনি gcloud CLI চালান, এই এনভায়রনমেন্ট ভেরিয়েবলগুলো পূরণ করে রাখুন যাতে পরে সেগুলো ব্যবহার করা যায়।

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

নিম্নলিখিতগুলি প্রতিস্থাপন করুন:

  • <project-id> হলো আপনার তৈরি করা GCP প্রজেক্টের নাম।
  • পূর্বশর্ত ধাপ ১ থেকে প্রাপ্ত প্রজেক্ট নম্বরের নাম সহ <project-number>
  • <region> এর মধ্যে উপলব্ধ অঞ্চল এবং জোনগুলো থেকে আপনার পছন্দের অঞ্চলের নামটি দিন। উদাহরণস্বরূপ, আমরা us-central1 ব্যবহার করতে পারি।
  • আপনার পূর্বে নির্বাচিত অঞ্চলের অধীনে উপলব্ধ অঞ্চল এবং জোনগুলো থেকে জোনের নামটি <zone> হিসেবে ব্যবহার করুন। উদাহরণস্বরূপ, যদি আপনি অঞ্চল হিসেবে us-central1 নির্বাচন করে থাকেন, তাহলে আপনি জোন হিসেবে us-central1-f ব্যবহার করতে পারেন। এই জোনটি IoT ডিভাইস সিমুলেটকারী GCE VM তৈরি করতে ব্যবহৃত হবে। নিশ্চিত করুন যে আপনার জোনটি আপনার নির্বাচিত অঞ্চলের মধ্যেই রয়েছে
  • <subnet-path> এ পূর্বশর্ত ধাপ ২ থেকে প্রাপ্ত সাবনেটের সম্পূর্ণ পাথটি দিতে হবে। এর মান অবশ্যই projects/<project-id>/regions/<region>/subnetworks/<subnet-name> ফরম্যাটের হতে হবে।
  • <bucket-name> হলো পূর্বশর্ত ধাপ ৩ থেকে প্রাপ্ত GCS বাকেটের নাম।

৩. গুগল ম্যানেজড কাফকা সেট আপ করুন

এই বিভাগে একটি গুগল পরিচালিত কাফকা ক্লাস্টার সেট আপ করা হয়, যা কাফকা সার্ভার স্থাপন করে এবং এই সার্ভারে একটি টপিক তৈরি করে যেখানে সাবস্ক্রাইব করার পর আইওটি ডেটা প্রকাশ ও পড়া যায়। ডেমোআইওটি সলিউশনস এই ক্লাস্টারটি এমনভাবে সেট আপ করতে পারে যাতে তাদের সমস্ত ডিভাইস এতে ডেটা প্রকাশ করে।

একটি পরিচালিত কাফকা ক্লাস্টার তৈরি করুন

  • ম্যানেজড কাফকা ক্লাস্টারটি তৈরি করুন। এখানে, ক্লাস্টারটির নাম হলো kafka-iot
gcloud managed-kafka clusters create kafka-iot \
    --project=$PROJECT_ID \
    --location=$REGION \
    --cpu=3 \
    --memory=12GiB \
    --subnets=$SUBNET_PATH \
    --auto-rebalance

আপনি নিম্নলিখিতের অনুরূপ একটি প্রতিক্রিয়া পাবেন:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                         
Created cluster [kafka-iot].

ক্লাস্টার তৈরি হতে ২০-৩০ মিনিট সময় লাগে। এই প্রক্রিয়াটি সম্পন্ন হওয়া পর্যন্ত অপেক্ষা করুন।

একটি বিষয় তৈরি করুন

  • ক্লাস্টারে একটি ম্যানেজড কাফকা টপিক তৈরি করুন। এখানে, টপিকটির নাম হলো kafka-iot-topic
gcloud managed-kafka topics create kafka-iot-topic \
    --cluster=kafka-iot \
    --location=$REGION \
    --partitions=10 \
    --replication-factor=3

আপনি নিম্নলিখিতের অনুরূপ একটি আউটপুট পাবেন:

Created topic [kafka-iot-topic].

৪. একটি পাবলিশার সেট আপ করুন।

ম্যানেজড কাফকা ক্লাস্টারে পাবলিশ করার জন্য, আমরা একটি গুগল কম্পিউট ইঞ্জিন ভিএম ইনস্ট্যান্স সেট আপ করি যা ম্যানেজড কাফকা ক্লাস্টার দ্বারা ব্যবহৃত সাবনেট ধারণকারী ভিপিসি-টি অ্যাক্সেস করতে পারে। এই ভিএমটি ডেমোআইওটি সলিউশনস দ্বারা সরবরাহকৃত সেন্সর ডিভাইসগুলোকে সিমুলেট করে।

পদক্ষেপ

  1. Google Compute Engine VM ইনস্ট্যান্সটি তৈরি করুন। এখানে, GCE VM-টির নাম হলো publisher-instance
gcloud compute instances create publisher-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=$SUBNET_PATH \
    --zone=$ZONE
  1. গুগল কম্পিউট ইঞ্জিন ডিফল্ট সার্ভিস অ্যাকাউন্টকে অ্যাপাচি কাফকার ম্যানেজড সার্ভিস ব্যবহার করার অনুমতি দিন।
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. ভিএম-এ সংযোগ করতে SSH ব্যবহার করুন। বিকল্পভাবে, SSH করার জন্য গুগল ক্লাউড কনসোল ব্যবহার করুন
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. কাফকা কমান্ড লাইন টুলগুলো চালানোর জন্য জাভা ইনস্টল করুন এবং এই কমান্ডগুলো ব্যবহার করে কাফকা বাইনারি ডাউনলোড করুন।
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. ম্যানেজড কাফকা অথেন্টিকেশন লাইব্রেরি ও এর নির্ভরতাগুলো ডাউনলোড করুন এবং কাফকা ক্লায়েন্টের বৈশিষ্ট্যগুলো কনফিগার করুন।
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

পাবলিশার মেশিন সেটআপ সম্পর্কে আরও বিস্তারিত জানতে, “ক্লায়েন্ট মেশিন সেট আপ করুন” অংশটি দেখুন।

৫. ম্যানেজড কাফকাতে প্রকাশ করুন

এখন যেহেতু পাবলিশারটি সেট আপ করা হয়ে গেছে, আমরা এর কাফকা কমান্ড লাইন ব্যবহার করে GCE VM (যা DemoIOT Solutions দ্বারা IoT ডিভাইস অনুকরণ করছে) থেকে কিছু ডামি ডেটা ম্যানেজড কাফকা ক্লাস্টারে পাবলিশ করতে পারি।

  1. যেহেতু আমরা GCE VM ইনস্ট্যান্সে SSH করেছি, তাই আমাদের PROJECT_ID ভেরিয়েবলটি পুনরায় পূরণ করতে হবে:
export PROJECT_ID=<project-id>
export REGION=<region>

নিম্নলিখিতগুলি প্রতিস্থাপন করুন:

  • <project-id> হলো আপনার তৈরি করা GCP প্রজেক্টের নাম।
  • <region> সেই অঞ্চল সহ যেখানে কাফকা ক্লাস্টারটি তৈরি করা হয়েছিল
  1. কাফকা বুটস্ট্র্যাপ সার্ভারের আইপি অ্যাড্রেস পেতে managed-kafka clusters describe কমান্ডটি ব্যবহার করুন। এই অ্যাড্রেসটি কাফকা ক্লাস্টারে সংযোগ করার জন্য ব্যবহার করা যেতে পারে।
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. ক্লাস্টারের বিষয়গুলো তালিকাভুক্ত করুন:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

আপনি নিম্নলিখিত আউটপুটটি দেখতে পাবেন, যেখানে আমাদের পূর্বে তৈরি করা kafka-iot-topic টপিকটি থাকবে।

__remote_log_metadata
kafka-iot-topic
  1. এই স্ক্রিপ্টটি কপি করে publish_iot_data.sh নামের একটি নতুন ফাইলে পেস্ট করুন। GCE VM-এ নতুন ফাইল তৈরি করার জন্য আপনি vim বা nano মতো টুল ব্যবহার করতে পারেন।
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

ব্যাখ্যা

  • এই স্ক্রিপ্টটি সিমুলেটেড সেন্সর রিডিং সহ JSON মেসেজ তৈরি করে, যেগুলিতে ডিভাইস আইডি, টাইমস্ট্যাম্প, সেন্সর ডেটা (তাপমাত্রা, আর্দ্রতা, চাপ, আলো), অবস্থানের তথ্য (অক্ষাংশ, দ্রাঘিমাংশ), ডিভাইসের স্ট্যাটাস (ব্যাটারি, সিগন্যাল, সংযোগের ধরণ) এবং কিছু মেটাডেটা থাকে।
  • এটি একটি নির্দিষ্ট সংখ্যক স্বতন্ত্র ডিভাইস থেকে বার্তার একটি অবিচ্ছিন্ন প্রবাহ তৈরি করে, যেখানে প্রতিটি ডিভাইস একটি নির্দিষ্ট সময় ব্যবধানে ডেটা পাঠায়, যা বাস্তব জগতের IoT ডিভাইসগুলোর অনুকরণ করে। এখানে, আমরা ১০টি ডিভাইস থেকে ডেটা প্রকাশ করি, যেগুলোর প্রতিটি ১০ সেকেন্ডের সময় ব্যবধানে ২০টি করে রিডিং প্রদান করে।
  • এটি কাফকা প্রোডিউসার কমান্ড লাইন টুল ব্যবহার করে তৈরি হওয়া সমস্ত ডেটা কাফকা টপিকে প্রকাশ করে।
  1. স্ক্রিপ্টটিতে ব্যবহৃত কিছু ডিপেন্ডেন্সি ইনস্টল করুন - গাণিতিক গণনার জন্য bc প্যাকেজ এবং JSON প্রক্রিয়াকরণের জন্য jq প্যাকেজ।
sudo apt-get install bc jq
  1. স্ক্রিপ্টটিকে এক্সিকিউটেবল করার জন্য পরিবর্তন করুন এবং স্ক্রিপ্টটি চালান। এটি চলতে প্রায় ২ মিনিট সময় লাগবে।
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

এই কমান্ডটি চালিয়ে আপনি যাচাই করতে পারেন যে ইভেন্টগুলো সফলভাবে প্রকাশিত হয়েছে কিনা, যা সমস্ত ইভেন্ট প্রিন্ট করবে। বের হওয়ার জন্য <control-c> চাপুন।

kafka-console-consumer.sh \
    --topic kafka-iot-topic \
    --from-beginning \
    --bootstrap-server $BOOTSTRAP \
    --consumer.config client.properties

৬. ডেটাপ্রোক ক্লাস্টার সেট আপ করুন

এই অংশটি সেই VPC সাবনেটওয়ার্কে একটি Dataproc ক্লাস্টার তৈরি করে যেখানে Managed Kafka ক্লাস্টারটি অবস্থিত। এই ক্লাস্টারটি DemoIOT Solutions-এর জন্য প্রয়োজনীয় রিয়েল-টাইম পরিসংখ্যান এবং ইনসাইট তৈরি করে এমন জবগুলো চালানোর জন্য ব্যবহৃত হবে।

  1. একটি ডেটাপ্রোক ক্লাস্টার তৈরি করুন। এখানে ক্লাস্টারটির নাম হলো dataproc-iot
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

আপনি নিম্নলিখিতের অনুরূপ একটি প্রতিক্রিয়া পাবেন:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

ক্লাস্টার তৈরি হতে ১০-১৫ মিনিট সময় লাগতে পারে। এই প্রক্রিয়াটি সফলভাবে সম্পন্ন হওয়া পর্যন্ত অপেক্ষা করুন এবং ক্লাস্টারটির বর্ণনা দিয়ে যাচাই করুন যে এটি RUNNING অবস্থায় আছে।

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

৭. Dataproc ব্যবহার করে কাফকা বার্তাগুলি প্রক্রিয়া করুন

এই শেষ অংশে, আপনি একটি ডেটাপ্রক জব জমা দেবেন যা স্পার্ক স্ট্রিমিং ব্যবহার করে প্রকাশিত বার্তাগুলি প্রক্রিয়া করে। এই জবটি মূলত কিছু রিয়েল-টাইম পরিসংখ্যান এবং অন্তর্দৃষ্টি তৈরি করে যা ডেমোআইওটি সলিউশনস ব্যবহার করতে পারে।

  1. স্থানীয়ভাবে process_iot.py নামের স্ট্রিমিং পাইস্পার্ক জব ফাইলটি তৈরি করতে এই কমান্ডটি চালান।
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
    StructField("device_id", StringType()),
    StructField("timestamp", StringType()),
    StructField(
        "location",
        StructType([
            StructField("latitude", FloatType()),
            StructField("longitude", FloatType()),
        ]),
    ),
    StructField(
        "sensor_data",
        StructType([
            StructField("temperature", FloatType()),
            StructField("humidity", FloatType()),
            StructField("pressure", FloatType()),
            StructField("light_level", IntegerType()),
        ]),
    ),
    StructField(
        "device_status",
        StructType([
            StructField("battery_level", IntegerType()),
            StructField("signal_strength", IntegerType()),
            StructField("connection_type", StringType()),
        ]),
    ),
    StructField(
        "metadata",
        StructType([
            StructField("sensor_type", StringType()),
            StructField("unit_temperature", StringType()),
            StructField("unit_humidity", StringType()),
            StructField("unit_pressure", StringType()),
            StructField("unit_light_level", StringType()),
            StructField("firmware_version", StringType()),
        ]),
    ),
])
CLIENT_PROPERTY_KEYS = [
    "security.protocol",
    "sasl.mechanism",
    "sasl.login.callback.handler.class",
    "sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
    # Parse client properties file
    parsed_path = urlparse(client_properties_path)
    if parsed_path.scheme != "gs":
        raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

    bucket_name = parsed_path.netloc
    blob_name = parsed_path.path.lstrip("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    file_content = "[DEFAULT]\n" + blob.download_as_text()

    config = configparser.ConfigParser()
    config.read_string(file_content)

    client_properties = dict()
    for key in CLIENT_PROPERTY_KEYS:
        client_properties[key] = config.get("DEFAULT", key)

    print(f"Client properties: {client_properties}")
    return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
    print("Starting initial data processing...")
    initial_rows = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", "kafka-iot-topic")
        .option("kafka.security.protocol", client_properties["security.protocol"])
        .option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
        .option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
        .option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
        .load()
    )

    initial_rows = (
        initial_rows.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), JSON_SCHEMA).alias("data"))
        .select("data.*")
    )
    
    # Print first 20 rows
    def print_top_rows(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
            batch_df.limit(20).show(truncate=False)

    initial_query_print = initial_rows.writeStream \
        .foreachBatch(print_top_rows) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_print)

    # Calculate and print average temperatures
    def process_initial_avg_temp(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
            current_averages = (
                batch_df.select("device_id", "sensor_data.temperature")
                .groupBy("device_id")
                .agg(avg("temperature").alias("average_temperature"))
                .orderBy("device_id")
            )
            current_averages.show(truncate=False)

    initial_query_avg_temp = initial_rows.writeStream \
        .foreachBatch(process_initial_avg_temp) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_avg_temp)
    
    # Write data to GCS
    initial_data_gcs_writer = (
        initial_rows.writeStream.outputMode("append")
        .format("avro")
        .option("path", store_data_gcs_path+"/tables/iot_avro/")
        .option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
        .trigger(once=True) \
        .start()
    )
    queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

    client_properties = get_client_properties(client_properties_path)

    # Create SparkSession
    spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
    
    queries_to_await = []

    process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
    
    # Wait for all queries to terminate
    for query in queries_to_await:
        try:
            query.awaitTermination()
        except Exception as e:
            print(f"Error awaiting query: {e}")
        finally:
            query.stop()

    spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Invalid number of arguments passed ", len(sys.argv))
        print(
            "Usage: ",
            sys.argv[0],
            " <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
        )
        raise ValueError("Invalid number of arguments passed.")

    parsed_data_path = urlparse(sys.argv[3])
    if parsed_data_path.scheme != "gs":
        raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


    main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

ব্যাখ্যা

  • এই কোডটি একটি নির্দিষ্ট কাফকা টপিক থেকে ডেটা পড়ার জন্য একটি পাইস্পার্ক স্ট্রাকচার্ড স্ট্রিমিং জব সেট আপ করে। এটি কাফকা ব্রোকারের সাথে সংযোগ স্থাপন এবং প্রমাণীকরণের জন্য প্রদত্ত কাফকা সার্ভার বুটস্ট্র্যাপ অ্যাড্রেস এবং একটি GCS কনফিগ ফাইল থেকে লোড করা কাফকা কনফিগারেশন ব্যবহার করে।
  • এটি প্রথমে কাফকা থেকে বাইট অ্যারের একটি স্ট্রিম হিসাবে কাঁচা ডেটা পড়ে, সেই বাইট অ্যারেগুলিকে স্ট্রিং-এ রূপান্তর করে এবং ডেটার কাঠামো (ডিভাইস আইডি, টাইমস্ট্যাম্প, অবস্থান, সেন্সর ডেটা, ইত্যাদি) নির্দিষ্ট করার জন্য স্পার্কের StructType ব্যবহার করে json_schema প্রয়োগ করে।
  • এটি প্রিভিউ করার জন্য প্রথম ১০টি সারি কনসোলে প্রিন্ট করে, প্রতিটি সেন্সরের গড় তাপমাত্রা গণনা করে এবং সমস্ত ডেটা avro ফরম্যাটে GCS বাকেটে লিখে দেয়। অ্যাভ্রো হলো একটি সারি-ভিত্তিক ডেটা সিরিয়ালাইজেশন সিস্টেম যা কাঠামোগত ডেটাকে একটি কম্প্যাক্ট, স্কিমা-সংজ্ঞায়িত বাইনারি ফরম্যাটে দক্ষতার সাথে সংরক্ষণ করে এবং বৃহৎ পরিসরের ডেটা প্রক্রিয়াকরণের জন্য স্কিমা বিবর্তন, ভাষা নিরপেক্ষতা ও উচ্চ কম্প্রেশন সুবিধা প্রদান করে।
  1. client.properties ফাইলটি তৈরি করুন এবং কাফকা সার্ভারের বুটস্ট্র্যাপ অ্যাড্রেসের জন্য এনভায়রনমেন্ট ভেরিয়েবলটি পূরণ করুন।
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. process_iot.py এবং client.properties ফাইল দুটি আপনার গুগল ক্লাউড স্টোরেজ বাকেটে আপলোড করুন, যাতে Dataproc জবটি সেগুলো ব্যবহার করতে পারে।
gsutil cp process_iot.py client.properties $BUCKET
  1. Dataproc জবের জন্য কিছু ডিপেন্ডেন্সি জার আপনার GCS বাকেটে কপি করুন। এই ডিরেক্টরিতে কাফকা সহ Spark Streaming জব চালানোর জন্য প্রয়োজনীয় জার এবং "একটি ক্লায়েন্ট মেশিন সেট আপ করুন" থেকে নেওয়া Managed Kafka অথেনটিকেশন লাইব্রেরি ও এর ডিপেন্ডেন্সিগুলো রয়েছে।
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. স্পার্ক জবটি ডেটাপ্রোক ক্লাস্টারে জমা দিন।
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

স্পার্ক ড্রাইভারের লগগুলো প্রিন্ট করা হবে। এছাড়াও, আপনি কনসোলে এই টেবিলগুলো লগ হতে এবং আপনার GCS বাকেটে ডেটা সংরক্ষিত হতে দেখতে পাবেন।

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

৮. পরিষ্কার করুন

কোডল্যাবটি সম্পন্ন করার পর রিসোর্সগুলো পরিষ্কার করতে ধাপগুলো অনুসরণ করুন।

  1. ম্যানেজড কাফকা ক্লাস্টার, পাবলিশার জিসিই ভিএম এবং ডেটাপ্রক ক্লাস্টারটি মুছে ফেলুন।
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. আপনার VPC সাবনেটওয়ার্ক এবং নেটওয়ার্ক মুছে ফেলুন।
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. আপনি যদি ডেটা আর ব্যবহার করতে না চান, তাহলে আপনার GCS বাকেটটি মুছে ফেলুন।
gcloud storage rm --recursive $BUCKET

৯. অভিনন্দন

অভিনন্দন, আপনি Manage Kafka এবং Dataproc ব্যবহার করে সফলভাবে একটি IoT ডেটা প্রসেসিং পাইপলাইন তৈরি করেছেন, যা DemoIOT Solutions-কে তাদের ডিভাইসগুলো থেকে প্রকাশিত ডেটার ওপর রিয়েল-টাইম তথ্য পেতে সাহায্য করে!

আপনি একটি ম্যানেজড কাফকা ক্লাস্টার তৈরি করেছেন, এতে IoT ইভেন্ট প্রকাশ করেছেন এবং একটি ডেটাপ্রোক জব চালিয়েছেন যা স্পার্ক স্ট্রিমিং ব্যবহার করে রিয়েল টাইমে এই ইভেন্টগুলো প্রসেস করে। ম্যানেজড কাফকা এবং ডেটাপ্রোক ব্যবহার করে ডেটা পাইপলাইন তৈরির জন্য প্রয়োজনীয় মূল ধাপগুলো এখন আপনি জানেন।

রেফারেন্স নথি