معالجة بيانات إنترنت الأشياء في الوقت الفعلي باستخدام Dataproc وGoogle Managed Service for Apache Kafka

1. مقدمة

2efacab8643a653b.png

تاريخ آخر تعديل: 2024-06-10

الخلفية

تُنشئ أجهزة إنترنت الأشياء (IoT)، بدءًا من حلول المنازل المزوّدة بأجهزة ذكية ووصولاً إلى أدوات الاستشعار الصناعية، كميات هائلة من البيانات على أطراف الشبكة. تُعدّ هذه البيانات قيّمة للغاية في مجموعة متنوعة من حالات الاستخدام، مثل مراقبة الأجهزة وتتبُّعها وتشخيصها وتخصيصها وتحسين أداء مجموعة الأجهزة وغير ذلك الكثير. توفّر خدمة Google Managed Service for Apache Kafka طريقة قابلة للتوسّع ومتينة لاستيعاب وتخزين هذا التدفق المستمر من البيانات بطريقة آمنة وسهلة الاستخدام ومتوافقة مع البرامج المفتوحة المصدر، بينما تتيح خدمة Google Cloud Dataproc معالجة مجموعات البيانات الكبيرة هذه لتحليل البيانات باستخدام مجموعات Apache Spark وHadoop.

ما ستنشئه

في هذا الدرس التطبيقي حول الترميز، ستنشئ مسارًا لمعالجة بيانات إنترنت الأشياء باستخدام Google Managed Service for Apache Kafka وDataproc وPython وApache Spark لإجراء تحليلات في الوقت الفعلي. سيتضمّن مسار التعلّم ما يلي:

  • نشر البيانات من أجهزة إنترنت الأشياء إلى مجموعة Kafka مُدارة باستخدام الأجهزة الافتراضية على Google Compute Engine
  • نقل البيانات من مجموعة Manage Kafka إلى مجموعة Dataproc
  • معالجة البيانات باستخدام مهمة Spark Streaming في Dataproc

ما ستتعلمه

  • كيفية إنشاء مجموعات Google Managed Kafka وDataproc
  • كيفية تنفيذ مهام البث باستخدام Dataproc

المتطلبات

2. نظرة عامة

في هذا الدرس التطبيقي حول الترميز، سنتابع قصة شركة وهمية، وهي DemoIOT Solutions. توفّر DemoIOT Solutions أجهزة استشعار تقيس البيانات المتعلقة بدرجة الحرارة والرطوبة والضغط ومستوى الضوء والموقع الجغرافي وترسلها. ويريدون إعداد قنوات تعالج هذه البيانات لعرض إحصاءات في الوقت الفعلي لعملائهم. باستخدام مسارات التعلّم هذه، يمكنهم تقديم مجموعة متنوعة من الخدمات لعملائهم، مثل الرصد والاقتراحات المبرمَجة والتنبيهات والإحصاءات حول الأماكن التي ثبّت فيها العملاء أدوات الاستشعار.

لإجراء ذلك، سنستخدم جهازًا افتراضيًا على Google Compute Engine لمحاكاة جهاز إنترنت الأشياء. سينشر الجهاز البيانات في موضوع Kafka في مجموعة Google المُدارة من Kafka، وسيتم قراءة البيانات ومعالجتها من خلال مهمة بث في Dataproc. سيساعدك الإعداد المسبق والصفحات التالية في تنفيذ كل هذه الخطوات.

إعداد المتطلبات الأساسية

  1. ابحث عن اسم المشروع ورقمه. يمكنك الاطّلاع على العثور على اسم المشروع ورقمه ومعرّفه كمرجع.
  2. شبكة VPC الفرعية سيسمح ذلك بالربط بين الجهاز الافتراضي على GCE ومجموعة Kafka ومجموعة Dataproc. اتّبِع الخطوات الواردة في هذا الرابط لإدراج الشبكات الفرعية الحالية باستخدام gcloud CLI. إذا لزم الأمر، اتّبِع الخطوات الواردة في إنشاء شبكة سحابة افتراضية خاصة (VPC) في الوضع التلقائي، ما سيؤدي إلى إنشاء شبكة سحابة افتراضية خاصة (VPC) مع شبكة فرعية في كل منطقة من مناطق Google Cloud. ومع ذلك، لغرض هذا الدرس التطبيقي، سنستخدم شبكة فرعية من منطقة واحدة فقط.
  • في هذه الشبكة الفرعية، تأكَّد من توفُّر قاعدة جدار حماية تسمح بجميع عمليات الدخول من tcp:22، وهو بروتوكول النقل الآمن (SSH) المطلوب. ستكون هذه القاعدة متاحة للاختيار ضمن قسم "قواعد جدار الحماية" عند إنشاء شبكة، لذا احرص على اختيارها.
  1. حزمة GCS يجب أن يكون لديك إذن الوصول إلى حزمة Google Cloud Storage لتخزين موارد مهام Dataproc والاحتفاظ بالبيانات المعالَجة. إذا لم يكن لديك حساب، يمكنك إنشاء حساب في مشروعك على Google Cloud Platform.

ملء المتغيرات البيئية

في نافذة الوحدة الطرفية التي تشغّل فيها gcloud CLI، املأ متغيرات البيئة هذه حتى يمكن استخدامها لاحقًا.

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

غيِّر القيم في السلسلة على الشكل التالي:

  • استبدِل <project-id> باسم مشروع Google Cloud الذي أعددته.
  • <project-number> باسم رقم المشروع من الخطوة 1 في المتطلبات الأساسية.
  • <region> مع اسم منطقة من المناطق والمناطق المتاحة التي تريد استخدامها. على سبيل المثال، يمكننا استخدام us-central1.
  • <zone> مع اسم المنطقة من المناطق والأقسام المتاحة ضمن المنطقة التي اخترتها سابقًا. على سبيل المثال، إذا اختَرت us-central1 كمنطقة، يمكنك استخدام us-central1-f كمنطقة. سيتم استخدام هذه المنطقة لإنشاء جهاز Google Compute Engine الافتراضي الذي يحاكي أجهزة إنترنت الأشياء. تأكَّد من أنّ المنطقة تقع في المنطقة الجغرافية التي اخترتها.
  • استبدِل <subnet-path> بالمسار الكامل للشبكة الفرعية من الخطوة 2 في قسم "المتطلبات الأساسية". يجب أن تكون القيمة بالتنسيق: projects/<project-id>/regions/<region>/subnetworks/<subnet-name>.
  • <bucket-name> مع اسم حزمة GCS من الخطوة 3 من المتطلبات الأساسية.

3- إعداد Google Managed Kafka

يُعدِّد هذا القسم خطوات إعداد مجموعة Google Managed Kafka، التي تنشر خادم Kafka، وتنشئ موضوعًا على هذا الخادم يمكن نشر بيانات إنترنت الأشياء فيه وقراءتها منه بعد الاشتراك فيه. يمكن لشركة DemoIOT Solutions إعداد هذه المجموعة لكي تنشر جميع أجهزتها البيانات إليها.

إنشاء مجموعة Managed Kafka

  • أنشئ مجموعة Managed Kafka. في هذا المثال، اسم المجموعة هو kafka-iot.
gcloud managed-kafka clusters create kafka-iot \
    --project=$PROJECT_ID \
    --location=$REGION \
    --cpu=3 \
    --memory=12GiB \
    --subnets=$SUBNET_PATH \
    --auto-rebalance

من المفترض أن تتلقّى ردًا مشابهًا لما يلي:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                         
Created cluster [kafka-iot].

تستغرق عملية إنشاء المجموعة من 20 إلى 30 دقيقة. يُرجى الانتظار إلى أن تكتمل هذه العملية.

إنشاء موضوع

  • أنشئ موضوع Managed Kafka على المجموعة. في هذا المثال، اسم الموضوع هو kafka-iot-topic.
gcloud managed-kafka topics create kafka-iot-topic \
    --cluster=kafka-iot \
    --location=$REGION \
    --partitions=10 \
    --replication-factor=3

من المفترض أن تحصل على نتيجة مشابهة لما يلي:

Created topic [kafka-iot-topic].

4. إعداد حساب ناشر

للنشر في مجموعة Managed Kafka، علينا إعداد مثيل جهاز افتراضي على Google Compute Engine يمكنه الوصول إلى شبكة VPC التي تحتوي على الشبكة الفرعية المستخدَمة من قِبل مجموعة Managed Kafka. تحاكي هذه الآلة الافتراضية أجهزة الاستشعار التي توفّرها شركة DemoIOT Solutions.

الخطوات

  1. أنشئ مثيل الجهاز الافتراضي (VM) على Google Compute Engine. في هذا المثال، اسم جهاز GCE الافتراضي هو publisher-instance.
gcloud compute instances create publisher-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=$SUBNET_PATH \
    --zone=$ZONE
  1. امنح حساب الخدمة التلقائي في Google Compute Engine الأذونات اللازمة لاستخدام Managed Service for Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. استخدِم بروتوكول النقل الآمن (SSH) للاتصال بالجهاز الافتراضي. بدلاً من ذلك، يمكنك استخدام Google Cloud Console لتنفيذ بروتوكول النقل الآمن (SSH).
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. ثبِّت Java لتشغيل أدوات سطر أوامر Kafka، ونزِّل ملف Kafka الثنائي باستخدام هذه الأوامر.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. نزِّل مكتبة مصادقة Managed Kafka والموارد التابعة لها واضبط خصائص برنامج Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

لمزيد من التفاصيل حول إعداد جهاز الناشر، يُرجى الاطّلاع على مقالة إعداد جهاز عميل.

5- النشر على Managed Kafka

بعد إعداد الناشر، يمكننا استخدام سطر أوامر Kafka عليه لنشر بعض البيانات الوهمية من الجهاز الافتراضي على Google Compute Engine (محاكاة أجهزة إنترنت الأشياء من خلال DemoIOT Solutions) إلى مجموعة Kafka المُدارة.

  1. بما أنّنا سجّلنا الدخول إلى مثيل الجهاز الافتراضي في GCE باستخدام SSH، علينا إعادة ملء المتغيّر PROJECT_ID:
export PROJECT_ID=<project-id>
export REGION=<region>

غيِّر القيم في السلسلة على الشكل التالي:

  • استبدِل <project-id> باسم مشروع Google Cloud الذي أعددته.
  • <region> مع المنطقة التي تم فيها إنشاء مجموعة Kafka
  1. استخدِم الأمر managed-kafka clusters describe للحصول على عنوان IP لخادم Kafka التمهيدي. يمكن استخدام هذا العنوان للربط بمجموعة Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. أدرِج المواضيع في المجموعة:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

من المفترض أن يظهر لك الناتج التالي الذي يتضمّن الموضوع kafka-iot-topic الذي أنشأناه سابقًا.

__remote_log_metadata
kafka-iot-topic
  1. انسخ هذا النص البرمجي والصقه في ملف جديد publish_iot_data.sh. لإنشاء ملف جديد على الجهاز الافتراضي في GCE، يمكنك استخدام أداة مثل vim أو nano.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

الشرح

  • ينشئ هذا النص البرمجي رسائل JSON تتضمّن قراءات محاكاة من أجهزة الاستشعار تتضمّن رقم تعريف الجهاز والطابع الزمني وبيانات أجهزة الاستشعار (درجة الحرارة والرطوبة والضغط والضوء) ومعلومات عن الموقع الجغرافي للجهاز (خطوط الطول والعرض) وحالة الجهاز (البطارية والإشارة ونوع الاتصال) وبعض البيانات الوصفية.
  • تُنشئ هذه الأداة تدفقًا مستمرًا من الرسائل من عدد محدّد من الأجهزة الفريدة، ويرسل كل جهاز البيانات في فاصل زمني محدّد، ما يحاكي أجهزة إنترنت الأشياء (IoT) في العالم الحقيقي. في ما يلي، ننشر بيانات من 10 أجهزة تنتج 20 قراءة لكل جهاز، وذلك بفارق زمني يبلغ 10 ثوانٍ.
  • تنشر أيضًا جميع البيانات التي تم إنشاؤها في موضوع Kafka باستخدام أداة سطر الأوامر الخاصة بمنتج Kafka.
  1. ثبِّت بعض التبعيات التي يستخدمها النص البرمجي، مثل حزمة bc لإجراء العمليات الحسابية وحزمة jq لمعالجة JSON.
sudo apt-get install bc jq
  1. عدِّل النص البرمجي ليكون قابلاً للتنفيذ وشغِّله. من المفترض أن يستغرق تشغيله حوالي دقيقتين.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

يمكنك التأكّد من نشر الأحداث بنجاح من خلال تنفيذ هذا الأمر الذي سيؤدي إلى طباعة جميع الأحداث. اضغط على <control-c> للخروج.

kafka-console-consumer.sh \
    --topic kafka-iot-topic \
    --from-beginning \
    --bootstrap-server $BOOTSTRAP \
    --consumer.config client.properties

6. إعداد مجموعة Dataproc

ينشئ هذا القسم مجموعة Dataproc في الشبكة الفرعية لشبكة VPC التي تتوفّر فيها مجموعة Kafka المُدارة. سيتم استخدام هذه المجموعة لتشغيل المهام التي تنشئ الإحصاءات والإحصاءات في الوقت الفعلي التي تحتاج إليها DemoIOT Solutions.

  1. أنشئ مجموعة Dataproc. في هذا المثال، اسم المجموعة هو dataproc-iot.
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

من المفترض أن تتلقّى ردًا مشابهًا لما يلي:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

قد يستغرق إنشاء المجموعة من 10 إلى 15 دقيقة. انتظِر إلى أن تكتمل هذه العملية بنجاح وتأكَّد من أنّ المجموعة في حالة RUNNING من خلال وصف المجموعة.

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. معالجة رسائل Kafka باستخدام Dataproc

في هذا القسم الأخير، سترسل مهمة Dataproc تعالج الرسائل المنشورة باستخدام Spark Streaming. تنشئ هذه المهمة في الواقع بعض الإحصاءات والرؤى في الوقت الفعلي التي يمكن أن تستخدمها حلول DemoIOT.

  1. نفِّذ هذا الأمر لإنشاء ملف مهمة PySpark للبث باسم process_iot.py محليًا.
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
    StructField("device_id", StringType()),
    StructField("timestamp", StringType()),
    StructField(
        "location",
        StructType([
            StructField("latitude", FloatType()),
            StructField("longitude", FloatType()),
        ]),
    ),
    StructField(
        "sensor_data",
        StructType([
            StructField("temperature", FloatType()),
            StructField("humidity", FloatType()),
            StructField("pressure", FloatType()),
            StructField("light_level", IntegerType()),
        ]),
    ),
    StructField(
        "device_status",
        StructType([
            StructField("battery_level", IntegerType()),
            StructField("signal_strength", IntegerType()),
            StructField("connection_type", StringType()),
        ]),
    ),
    StructField(
        "metadata",
        StructType([
            StructField("sensor_type", StringType()),
            StructField("unit_temperature", StringType()),
            StructField("unit_humidity", StringType()),
            StructField("unit_pressure", StringType()),
            StructField("unit_light_level", StringType()),
            StructField("firmware_version", StringType()),
        ]),
    ),
])
CLIENT_PROPERTY_KEYS = [
    "security.protocol",
    "sasl.mechanism",
    "sasl.login.callback.handler.class",
    "sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
    # Parse client properties file
    parsed_path = urlparse(client_properties_path)
    if parsed_path.scheme != "gs":
        raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

    bucket_name = parsed_path.netloc
    blob_name = parsed_path.path.lstrip("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    file_content = "[DEFAULT]\n" + blob.download_as_text()

    config = configparser.ConfigParser()
    config.read_string(file_content)

    client_properties = dict()
    for key in CLIENT_PROPERTY_KEYS:
        client_properties[key] = config.get("DEFAULT", key)

    print(f"Client properties: {client_properties}")
    return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
    print("Starting initial data processing...")
    initial_rows = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", "kafka-iot-topic")
        .option("kafka.security.protocol", client_properties["security.protocol"])
        .option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
        .option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
        .option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
        .load()
    )

    initial_rows = (
        initial_rows.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), JSON_SCHEMA).alias("data"))
        .select("data.*")
    )
    
    # Print first 20 rows
    def print_top_rows(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
            batch_df.limit(20).show(truncate=False)

    initial_query_print = initial_rows.writeStream \
        .foreachBatch(print_top_rows) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_print)

    # Calculate and print average temperatures
    def process_initial_avg_temp(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
            current_averages = (
                batch_df.select("device_id", "sensor_data.temperature")
                .groupBy("device_id")
                .agg(avg("temperature").alias("average_temperature"))
                .orderBy("device_id")
            )
            current_averages.show(truncate=False)

    initial_query_avg_temp = initial_rows.writeStream \
        .foreachBatch(process_initial_avg_temp) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_avg_temp)
    
    # Write data to GCS
    initial_data_gcs_writer = (
        initial_rows.writeStream.outputMode("append")
        .format("avro")
        .option("path", store_data_gcs_path+"/tables/iot_avro/")
        .option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
        .trigger(once=True) \
        .start()
    )
    queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

    client_properties = get_client_properties(client_properties_path)

    # Create SparkSession
    spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
    
    queries_to_await = []

    process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
    
    # Wait for all queries to terminate
    for query in queries_to_await:
        try:
            query.awaitTermination()
        except Exception as e:
            print(f"Error awaiting query: {e}")
        finally:
            query.stop()

    spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Invalid number of arguments passed ", len(sys.argv))
        print(
            "Usage: ",
            sys.argv[0],
            " <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
        )
        raise ValueError("Invalid number of arguments passed.")

    parsed_data_path = urlparse(sys.argv[3])
    if parsed_data_path.scheme != "gs":
        raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


    main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

الشرح

  • يُعدّ هذا الرمز مهمة PySpark Structured Streaming لقراءة البيانات من موضوع Kafka محدّد. يستخدم عنوان بدء تشغيل خادم Kafka الذي تم توفيره وإعدادات Kafka التي تم تحميلها من ملف إعداد GCS للاتصال والتحقّق من الهوية باستخدام وسيط Kafka.
  • يقرأ أولاً البيانات الأولية من Kafka كتدفق لمصفوفات البايت، ويحوّل مصفوفات البايت هذه إلى سلاسل، ثم يطبّق json_schema باستخدام StructType في Spark لتحديد بنية البيانات (رقم تعريف الجهاز والطابع الزمني والموقع الجغرافي وبيانات جهاز الاستشعار وما إلى ذلك).
  • يطبع هذا الرمز أول 10 صفوف في وحدة التحكّم للمعاينة، ويحسب متوسط درجة الحرارة لكل أداة استشعار، ويكتب جميع البيانات في حزمة GCS بتنسيق avro. ‫Avro هو نظام تسلسل بيانات مستند إلى الصفوف يخزِّن البيانات المنظَّمة بكفاءة بتنسيق ثنائي مضغوط ومحدّد بواسطة المخطط، ويوفر إمكانية تطوير المخطط، والتوافق مع لغات متعددة، ومعدل ضغط مرتفع لمعالجة البيانات على نطاق واسع.
  1. أنشئ ملف client.properties واملأ متغيّر البيئة بعنوان bootstrap لخادم Kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. حمِّل ملفات process_iot.py وclient.properties إلى حزمة Google Cloud Storage، كي يتمكّن مهمة Dataproc من استخدامها.
gsutil cp process_iot.py client.properties $BUCKET
  1. انسخ بعض ملفات JAR التابعة لمهمة Dataproc إلى حزمة GCS. يحتوي هذا الدليل على ملفات JAR المطلوبة لتشغيل مهام Spark Streaming باستخدام Kafka، ومكتبة مصادقة Managed Kafka وتبعياتها، مأخوذة من إعداد جهاز عميل.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. أرسِل مهمة Spark إلى مجموعة Dataproc.
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

ستتم طباعة سجلات برنامج تشغيل Spark. من المفترض أن تتمكّن أيضًا من رؤية هذه الجداول المسجّلة في وحدة التحكّم والبيانات المخزّنة في حزمة GCS.

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. تَنظيم

اتّبِع الخطوات لإخلاء مساحة الموارد بعد إكمال الدرس البرمجي.

  1. احذف مجموعة Managed Kafka وجهاز Publisher GCE الافتراضي ومجموعة Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. احذف الشبكة الفرعية وشبكة VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. احذف حزمة GCS إذا لم تعُد تريد استخدام البيانات.
gcloud storage rm --recursive $BUCKET

9- تهانينا

تهانينا، لقد أنشأت بنجاح مسار معالجة بيانات إنترنت الأشياء باستخدام Manage Kafka وDataproc، ما يساعد DemoIOT Solutions في الحصول على إحصاءات في الوقت الفعلي حول البيانات التي تنشرها أجهزتها.

لقد أنشأت مجموعة Kafka مُدارة، ونشرت أحداث إنترنت الأشياء فيها، وشغّلت مهمة Dataproc استخدمت بث Spark لمعالجة هذه الأحداث في الوقت الفعلي. أصبحت الآن على دراية بالخطوات الرئيسية المطلوبة لإنشاء مسارات نقل البيانات باستخدام Managed Kafka وDataproc.

المستندات المرجعية