1. Einführung

Zuletzt aktualisiert: 10.06.2024
Hintergrund
Geräte aus dem Internet der Dinge (Internet of Things, IoT), von Smart-Home-Lösungen bis hin zu industriellen Sensoren, generieren riesige Datenmengen am Netzwerkrand. Diese Daten sind für eine Vielzahl von Anwendungsfällen von unschätzbarem Wert, z. B. für die Geräteüberwachung, das Tracking, die Diagnose, die Überwachung, die Personalisierung und die Flottenoptimierung. Google Managed Service for Apache Kafka bietet eine skalierbare und zuverlässige Möglichkeit, diesen kontinuierlichen Datenstrom auf OSS-kompatible, benutzerfreundliche und sichere Weise aufzunehmen und zu speichern. Mit Google Cloud Dataproc können diese großen Datasets für die Datenanalyse mit Apache Spark- und Hadoop-Clustern verarbeitet werden.
Umfang
In diesem Codelab erstellen Sie eine IoT-Datenverarbeitungspipeline mit Google Managed Service for Apache Kafka, Dataproc, Python und Apache Spark, die Echtzeitanalysen durchführt. Ihre Pipeline:
- Daten von IoT-Geräten in einem verwalteten Kafka-Cluster mit GCE-VMs veröffentlichen
- Daten aus dem Managed Kafka-Cluster in einen Dataproc-Cluster streamen
- Daten mit einem Dataproc Spark Streaming-Job verarbeiten
Lerninhalte
- Google Managed Kafka- und Dataproc-Cluster erstellen
- Streamingjobs mit Dataproc ausführen
Voraussetzungen
- Ein aktives GCP-Konto mit einem eingerichteten Projekt. Wenn Sie noch kein Konto haben, können Sie sich für einen kostenlosen Testzeitraum registrieren.
- Die gcloud CLI ist installiert und konfiguriert. Folgen Sie der Anleitung zum Installieren der gcloud CLI auf Ihrem Betriebssystem.
- Aktivierte APIs für Google Managed Kafka und Dataproc in Ihrem GCP-Projekt.
2. Übersicht
In diesem Codelab folgen wir der Geschichte eines fiktiven Unternehmens namens DemoIOT Solutions. DemoIOT Solutions bietet Sensoren an, die Daten zu Temperatur, Luftfeuchtigkeit, Druck, Lichtstärke und Standort messen und übertragen. Sie möchten Pipelines einrichten, mit denen diese Daten verarbeitet werden, um ihren Kunden Statistiken in Echtzeit zu präsentieren. Mithilfe solcher Pipelines können sie ihren Kunden eine Vielzahl von Diensten anbieten, z. B. Monitoring, automatisierte Vorschläge, Benachrichtigungen und Statistiken zu Orten, an denen die Kunden ihre Sensoren installiert haben.
Dazu verwenden wir eine GCE-VM, um das IoT-Gerät zu simulieren. Das Gerät veröffentlicht Daten in einem Kafka-Thema im Google Managed Kafka-Cluster, die von einem Dataproc-Streamingjob gelesen und verarbeitet werden. Auf den folgenden Seiten werden Sie durch alle diese Schritte geführt.
Vorbereitende Einrichtung
- Suchen Sie den Projektnamen und die Projektnummer für Ihr Projekt. Weitere Informationen finden Sie unter Projektname, ‑nummer und ‑ID ermitteln.
- VPC-Subnetzwerk. Dadurch wird die Verbindung zwischen der GCE-VM, dem Kafka-Cluster und dem Dataproc-Cluster ermöglicht. Hier finden Sie Informationen zum Auflisten vorhandener Subnetze mit der gcloud CLI. Falls erforderlich, erstellen Sie ein VPC-Netzwerk im automatischen Modus. Dadurch wird ein VPC-Netzwerk mit einem Subnetz in jeder Google Cloud-Region erstellt. Für dieses Codelab verwenden wir jedoch nur ein Subnetzwerk aus einer einzelnen Region.
- Prüfen Sie, ob in diesem Subnetzwerk eine Firewallregel vorhanden ist, die den gesamten eingehenden Traffic von tcp:22 zulässt. Dies ist für SSH erforderlich. Diese Regel kann beim Erstellen eines Netzwerks im Bereich „Firewallregeln“ ausgewählt werden. Achten Sie darauf, dass Sie sie auswählen.
- GCS-Bucket. Sie benötigen Zugriff auf einen Google Cloud Storage-Bucket, um Dataproc-Jobressourcen zu speichern und verarbeitete Daten beizubehalten. Wenn Sie keine haben, können Sie eine in Ihrem GCP-Projekt erstellen.
Umgebungsvariablen einfügen
Füllen Sie diese Umgebungsvariablen in Ihrem Terminal, in dem Sie die gcloud CLI ausführen, so aus, dass sie später verwendet werden können.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
Ersetzen Sie Folgendes:
- Ersetzen Sie
<project-id>durch den Namen des GCP-Projekts, das Sie eingerichtet haben. <project-number>durch den Namen der Projektnummer aus Schritt 1 der Voraussetzungen.<region>durch den Namen einer Region aus Verfügbare Regionen und Zonen, die Sie verwenden möchten. Wir können beispielsweiseus-central1verwenden.- Ersetzen Sie
<zone>durch den Namen der Zone aus Verfügbare Regionen und Zonen unter der zuvor ausgewählten Region. Wenn Sie beispielsweiseus-central1als Region ausgewählt haben, können Sieus-central1-fals Zone verwenden. In dieser Zone wird die GCE-VM erstellt, die IoT-Geräte simuliert. Achten Sie darauf, dass sich Ihre Zone in der von Ihnen ausgewählten Region befindet. <subnet-path>durch den vollständigen Pfad des Subnetzes aus Voraussetzung 2. Der Wert muss das Formatprojects/<project-id>/regions/<region>/subnetworks/<subnet-name>haben.<bucket-name>durch den Namen des GCS-Buckets aus Schritt 3 der Voraussetzungen.
3. Von Google verwaltetes Kafka einrichten
In diesem Abschnitt wird ein von Google verwalteter Kafka-Cluster eingerichtet, in dem der Kafka-Server bereitgestellt wird. Außerdem wird auf diesem Server ein Thema erstellt, in dem die IoT-Daten veröffentlicht und nach dem Abonnieren gelesen werden können. DemoIOT Solutions kann diesen Cluster so einrichten, dass alle Geräte Daten darin veröffentlichen.
Managed Kafka-Cluster erstellen
- Erstellen Sie den Managed Kafka-Cluster. Hier lautet der Name des Clusters
kafka-iot.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
Sie sollten eine Antwort ähnlich der folgenden erhalten:
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
Die Clustererstellung dauert 20 bis 30 Minuten. Warten Sie, bis dieser Vorgang abgeschlossen ist.
Thema erstellen
- Erstellen Sie ein verwaltetes Kafka-Thema im Cluster. Hier lautet der Name des Themas
kafka-iot-topic.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
Die Ausgabe sollte in etwa so aussehen:
Created topic [kafka-iot-topic].
4. Publisher einrichten
Um im verwalteten Kafka-Cluster zu veröffentlichen, richten wir eine Google Compute Engine-VM-Instanz ein, die auf die VPC zugreifen kann, die das vom verwalteten Kafka-Cluster verwendete Subnetz enthält. Diese VM simuliert die von DemoIOT Solutions bereitgestellten Sensoren.
Schritte
- Erstellen Sie die Google Compute Engine-VM-Instanz. Hier ist
publisher-instanceder Name der GCE-VM.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Weisen Sie dem Google Compute Engine-Standarddienstkonto die Berechtigungen zur Verwendung von Managed Service for Apache Kafka zu.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- Stellen Sie über SSH eine Verbindung zur VM her. Alternativ können Sie die Google Cloud Console für SSH verwenden.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Installieren Sie Java, um die Kafka-Befehlszeilentools auszuführen, und laden Sie die Kafka-Binärdatei mit diesen Befehlen herunter.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- Laden Sie die Authentifizierungsbibliothek für Managed Kafka und ihre Abhängigkeiten herunter und konfigurieren Sie die Kafka-Clientattribute.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
Weitere Informationen zur Einrichtung des Verlagscomputers finden Sie unter Clientcomputer einrichten.
5. In Managed Kafka veröffentlichen
Nachdem der Publisher eingerichtet ist, können wir die Kafka-Befehlszeile verwenden, um einige Testdaten von der GCE-VM (die IoT-Geräte von DemoIOT Solutions simuliert) im Managed Kafka-Cluster zu veröffentlichen.
- Da wir eine SSH-Verbindung zur GCE-VM-Instanz hergestellt haben, müssen wir die Variable
PROJECT_IDneu belegen:
export PROJECT_ID=<project-id>
export REGION=<region>
Ersetzen Sie Folgendes:
- Ersetzen Sie
<project-id>durch den Namen des GCP-Projekts, das Sie eingerichtet haben. <region>durch die Region, in der der Kafka-Cluster erstellt wurde.
- Verwenden Sie den Befehl
managed-kafka clusters describe, um die IP-Adresse des Kafka-Bootstrapservers abzurufen. Diese Adresse kann verwendet werden, um eine Verbindung zum Kafka-Cluster herzustellen.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Lassen Sie die Themen im Cluster auflisten:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
Sie sollten die folgende Ausgabe sehen, die das zuvor erstellte Thema kafka-iot-topic enthält.
__remote_log_metadata
kafka-iot-topic
- Kopieren Sie dieses Skript und fügen Sie es in eine neue Datei
publish_iot_data.shein. Wenn Sie eine neue Datei auf der GCE-VM erstellen möchten, können Sie ein Tool wievimodernanoverwenden.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
Erklärung
- Dieses Skript erstellt JSON-Nachrichten mit simulierten Sensorwerten, die Geräte-ID, Zeitstempel, Sensordaten (Temperatur, Luftfeuchtigkeit, Druck, Licht), Standortinformationen (Breitengrad, Längengrad), Gerätestatus (Akku, Signal, Verbindungstyp) und einige Metadaten enthalten.
- Es wird ein kontinuierlicher Nachrichtenfluss von einer bestimmten Anzahl eindeutiger Geräte generiert, die jeweils Daten in einem bestimmten Zeitintervall senden. So werden IoT-Geräte in der realen Welt simuliert. Hier veröffentlichen wir Daten von 10 Geräten, die jeweils 20 Messwerte in einem Zeitintervall von 10 Sekunden liefern.
- Außerdem werden alle generierten Daten mit dem Kafka-Befehlszeilentool für Producer im Kafka-Thema veröffentlicht.
- Installieren Sie einige Abhängigkeiten, die vom Script verwendet werden: das
bc-Paket für mathematische Berechnungen und dasjq-Paket für die JSON-Verarbeitung.
sudo apt-get install bc jq
- Ändern Sie das Skript so, dass es ausführbar ist, und führen Sie es aus. Die Ausführung dauert etwa 2 Minuten.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
Sie können prüfen, ob die Ereignisse erfolgreich veröffentlicht wurden, indem Sie diesen Befehl ausführen, der alle Ereignisse ausgibt. Drücke zum Beenden <control-c>.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Dataproc-Cluster einrichten
In diesem Abschnitt wird ein Dataproc-Cluster im VPC-Subnetzwerk erstellt, in dem sich der Managed Kafka-Cluster befindet. In diesem Cluster werden Jobs ausgeführt, mit denen die Echtzeitstatistiken und ‑informationen generiert werden, die für DemoIOT Solutions erforderlich sind.
- Einen Dataproc-Cluster erstellen Hier lautet der Name des Clusters
dataproc-iot.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
Sie sollten eine Antwort ähnlich der folgenden erhalten:
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
Die Erstellung eines Clusters kann 10 bis 15 Minuten dauern. Warten Sie, bis dieser Vorgang abgeschlossen ist, und prüfen Sie, ob der Cluster den Status RUNNING hat.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Kafka-Nachrichten mit Dataproc verarbeiten
In diesem letzten Abschnitt senden Sie einen Dataproc-Job, der die veröffentlichten Nachrichten mit Spark Streaming verarbeitet. Mit diesem Job werden Echtzeitstatistiken und ‑informationen generiert, die von DemoIOT Solutions verwendet werden können.
- Führen Sie diesen Befehl aus, um die lokale Streaming-PySpark-Jobdatei mit dem Namen
process_iot.pyzu erstellen.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
Erklärung
- Mit diesem Code wird ein PySpark Structured Streaming-Job eingerichtet, um Daten aus einem angegebenen Kafka-Thema zu lesen. Dazu werden die angegebene Bootstrap-Adresse des Kafka-Servers und die Kafka-Konfigurationen verwendet, die aus einer GCS-Konfigurationsdatei geladen werden, um eine Verbindung zum Kafka-Broker herzustellen und sich dort zu authentifizieren.
- Zuerst werden die Rohdaten aus Kafka als Stream von Byte-Arrays gelesen. Diese Byte-Arrays werden in Strings umgewandelt und
json_schemawird mit dem StructType von Spark angewendet, um die Struktur der Daten anzugeben (Geräte-ID, Zeitstempel, Standort, Sensordaten usw.). - Die ersten 10 Zeilen werden zur Vorschau in der Konsole ausgegeben, die durchschnittliche Temperatur pro Sensor wird berechnet und alle Daten werden im
avro-Format in den GCS-Bucket geschrieben. Avro ist ein zeilenbasiertes Daten-Serialisierungssystem, mit dem strukturierte Daten effizient in einem kompakten, schemadefinierten Binärformat gespeichert werden. Es bietet Schemaentwicklung, Sprachneutralität und hohe Komprimierung für die Verarbeitung großer Datenmengen.
- Erstellen Sie die Datei
client.propertiesund füllen Sie die Umgebungsvariable für die Bootstrap-Adresse des Kafka-Servers aus.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Laden Sie die Dateien
process_iot.pyundclient.propertiesin Ihren Google Cloud Storage-Bucket hoch, damit sie vom Dataproc-Job verwendet werden können.
gsutil cp process_iot.py client.properties $BUCKET
- Kopieren Sie einige JAR-Dateien für Abhängigkeiten für den Dataproc-Job in Ihren GCS-Bucket. Dieses Verzeichnis enthält JAR-Dateien, die zum Ausführen von Spark Streaming-Jobs mit Kafka erforderlich sind, sowie die Managed Kafka-Authentifizierungsbibliothek und ihre Abhängigkeiten aus Clientcomputer einrichten.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Senden Sie den Spark-Job an den Dataproc-Cluster.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
Die Spark-Treiberlogs werden ausgegeben. Sie sollten diese Tabellen auch in der Konsole protokolliert und Daten in Ihrem GCS-Bucket gespeichert sehen.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. Bereinigen
Folgen Sie der Anleitung, um Ressourcen nach Abschluss des Codelabs zu bereinigen.
- Löschen Sie den Managed Kafka-Cluster, die Publisher-GCE-VM und den Dataproc-Cluster.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- Löschen Sie Ihr VPC-Subnetz und ‑Netzwerk.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- Löschen Sie Ihren GCS-Bucket, wenn Sie die Daten nicht mehr verwenden möchten.
gcloud storage rm --recursive $BUCKET
9. Glückwunsch
Herzlichen Glückwunsch! Sie haben eine IoT-Datenverarbeitungs-Pipeline mit Managed Kafka und Dataproc erstellt, mit der DemoIOT Solutions Echtzeitinformationen zu den von ihren Geräten veröffentlichten Daten erhalten.
Sie haben einen Managed Kafka-Cluster erstellt, IoT-Ereignisse darin veröffentlicht und einen Dataproc-Job ausgeführt, der Spark-Streaming verwendet hat, um diese Ereignisse in Echtzeit zu verarbeiten. Sie kennen jetzt die wichtigsten Schritte, die zum Erstellen von Datenpipelines mit Managed Kafka und Dataproc erforderlich sind.