1. Introduction

Dernière mise à jour : 10/06/2024
Contexte
Les appareils IoT (Internet des objets), qui vont des solutions pour la maison connectée aux capteurs industriels, génèrent d'énormes quantités de données en périphérie du réseau. Ces données sont précieuses pour de nombreux cas d'utilisation, comme la surveillance, le suivi, les diagnostics, la surveillance, la personnalisation, l'optimisation de la flotte et bien d'autres. Google Managed Service pour Apache Kafka offre un moyen évolutif et durable d'ingérer et de stocker ce flux continu de données de manière compatible avec l'OSS, facile à utiliser et sécurisée. Google Cloud Dataproc permet de traiter ces grands ensembles de données pour l'analyse des données à l'aide de clusters Apache Spark et Hadoop.
Objectifs de l'atelier
Dans cet atelier de programmation, vous allez créer un pipeline de traitement des données IoT à l'aide de Google Managed Service pour Apache Kafka, Dataproc, Python et Apache Spark, qui effectue des analyses en temps réel. Votre pipeline :
- Publier des données provenant d'appareils IoT dans un cluster Kafka géré à l'aide de VM GCE
- Transférer des données du cluster Managed Kafka vers un cluster Dataproc
- Traiter des données à l'aide d'un job Dataproc Spark Streaming
Points abordés
- Créer des clusters Google Managed Kafka et Dataproc
- Exécuter des tâches de streaming à l'aide de Dataproc
Prérequis
- Un compte GCP actif avec un projet configuré. Si vous n'en avez pas, vous pouvez vous inscrire à un essai sans frais.
- gcloud CLI installée et configurée. Vous pouvez suivre les instructions pour installer gcloud CLI sur votre OS.
- Les API Google Managed Kafka et Dataproc sont activées dans votre projet GCP.
2. Présentation
Pour cet atelier de programmation, nous allons suivre l'histoire d'une entreprise fictive, DemoIOT Solutions. DemoIOT Solutions propose des capteurs qui mesurent et transmettent des données sur la température, l'humidité, la pression, la luminosité et la localisation. Ils souhaitent configurer des pipelines qui traitent ces données pour afficher des statistiques en temps réel à leurs clients. Grâce à ces pipelines, ils peuvent fournir une grande variété de services à leurs clients, comme la surveillance, des suggestions automatisées, des alertes et des insights sur les lieux où les clients ont installé leurs capteurs.
Pour ce faire, nous utiliserons une VM GCE pour simuler l'appareil IoT. L'appareil publiera des données dans un sujet Kafka du cluster Google Managed Kafka, qui seront lues et traitées par un job de flux de données Dataproc. La configuration des prérequis et les pages suivantes vous guideront pour effectuer toutes ces étapes.
Configuration requise
- Recherchez le nom et le numéro de votre projet. Pour en savoir plus, consultez Trouver le nom, le numéro et l'ID du projet.
- Sous-réseau VPC. Cela permettra la connectivité entre la VM GCE, le cluster Kafka et le cluster Dataproc. Suivez cette procédure pour lister les sous-réseaux existants à l'aide de la gcloud CLI. Si nécessaire, suivez la procédure Créer un réseau VPC en mode automatique. Vous créerez ainsi un réseau VPC avec un sous-réseau dans chaque région Google Cloud. Toutefois, pour les besoins de cet atelier de programmation, nous n'utiliserons qu'un sous-réseau d'une seule région.
- Dans ce sous-réseau, assurez-vous qu'il existe une règle de pare-feu autorisant toutes les entrées à partir de tcp:22, qui est requis pour SSH. Cette règle sera disponible dans la section "Règles de pare-feu" lorsque vous créerez un réseau. Assurez-vous de la sélectionner.
- Bucket GCS. Vous aurez besoin d'accéder à un bucket de stockage Google Cloud pour stocker les ressources des jobs Dataproc et conserver les données traitées. Si vous n'en avez pas, vous pouvez en créer un dans votre projet GCP.
Remplir les variables d'environnement
Dans le terminal où vous exécutez la gcloud CLI, renseignez ces variables d'environnement pour pouvoir les utiliser ultérieurement.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
Remplacez les éléments suivants :
<project-id>par le nom du projet GCP que vous avez configuré.<project-number>par le nom du numéro de projet de l'étape 1 des prérequis.<region>par le nom d'une région de la page Régions et zones disponibles que vous souhaitez utiliser. Par exemple, nous pouvons utiliserus-central1.<zone>par le nom de la zone de la section Régions et zones disponibles, sous la région que vous avez sélectionnée précédemment. Par exemple, si vous avez sélectionnéus-central1comme région, vous pouvez utiliserus-central1-fcomme zone. Cette zone sera utilisée pour créer la VM GCE qui simule les appareils IoT. Assurez-vous que votre zone se trouve dans la région que vous avez choisie.<subnet-path>par le chemin d'accès complet du sous-réseau de l'étape préalable 2. La valeur doit être au formatprojects/<project-id>/regions/<region>/subnetworks/<subnet-name>.<bucket-name>par le nom du bucket GCS de l'étape 3 des prérequis.
3. Configurer Google Managed Kafka
Cette section configure un cluster Google Managed Kafka, qui déploie le serveur Kafka et crée un sujet sur ce serveur où les données IoT peuvent être publiées et lues après l'abonnement. DemoIOT Solutions peut configurer ce cluster pour que tous ses appareils y publient des données.
Créer un cluster Managed Kafka
- Créez le cluster Kafka géré. Ici, le nom du cluster est
kafka-iot.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
Vous devriez obtenir un résultat semblable à celui-ci :
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
La création du cluster prend entre 20 et 30 minutes. Attendez la fin de cette opération.
Créer un sujet
- Créez un sujet Kafka géré sur le cluster. Ici, le nom du sujet est
kafka-iot-topic.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
Vous devriez obtenir un résultat semblable à celui-ci :
Created topic [kafka-iot-topic].
4. Configurer un éditeur
Pour publier sur le cluster Kafka géré, nous configurons une instance de VM Google Compute Engine pouvant accéder au VPC contenant le sous-réseau utilisé par le cluster Kafka géré. Cette VM simule les capteurs fournis par DemoIOT Solutions.
Étapes
- Créez l'instance de VM Google Compute Engine. Ici, le nom de la VM GCE est
publisher-instance.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Accordez au compte de service Compute Engine par défaut les autorisations nécessaires pour utiliser Managed Service pour Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- Utilisez SSH pour vous connecter à la VM. Vous pouvez également utiliser la console Google Cloud pour SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Installez Java pour exécuter les outils de ligne de commande Kafka, puis téléchargez le binaire Kafka à l'aide de ces commandes.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- Téléchargez la bibliothèque d'authentification Managed Kafka et ses dépendances, puis configurez les propriétés du client Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
Pour en savoir plus sur la configuration de la machine de l'éditeur, consultez Configurer une machine cliente.
5. Publier sur Managed Kafka
Maintenant que l'éditeur est configuré, nous pouvons utiliser la ligne de commande Kafka pour publier des données fictives à partir de la VM GCE (simulant des appareils IoT par DemoIOT Solutions) sur le cluster Managed Kafka.
- Étant donné que nous nous sommes connectés en SSH à l'instance de VM GCE, nous devons remplir à nouveau la variable
PROJECT_ID:
export PROJECT_ID=<project-id>
export REGION=<region>
Remplacez les éléments suivants :
<project-id>par le nom du projet GCP que vous avez configuré.<region>par la région dans laquelle le cluster Kafka a été créé.
- Utilisez la commande
managed-kafka clusters describepour obtenir l'adresse IP du serveur d'amorçage Kafka. Cette adresse peut être utilisée pour se connecter au cluster Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Affichez la liste des thèmes du cluster :
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
Vous devriez voir le résultat suivant, qui contient le sujet kafka-iot-topic que nous avons créé précédemment.
__remote_log_metadata
kafka-iot-topic
- Copiez et collez ce script dans un nouveau fichier
publish_iot_data.sh. Pour créer un fichier sur la VM GCE, vous pouvez utiliser un outil tel quevimounano.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
Explication
- Ce script crée des messages JSON avec des relevés de capteurs simulés qui contiennent l'ID de l'appareil, le code temporel, les données des capteurs (température, humidité, pression, luminosité), les informations de localisation (latitude, longitude), l'état de l'appareil (batterie, signal, type de connexion) et certaines métadonnées.
- Il génère un flux continu de messages à partir d'un nombre défini d'appareils uniques, chacun envoyant des données à un intervalle de temps spécifié, imitant les appareils IoT réels. Ici, nous publions les données de 10 appareils qui produisent chacun 20 lectures, à un intervalle de temps de 10 secondes.
- Il publie également toutes les données générées dans le sujet Kafka à l'aide de l'outil de ligne de commande Kafka Producer.
- Installez certaines dépendances utilisées par le script : le package
bcpour les calculs mathématiques et le packagejqpour le traitement JSON.
sudo apt-get install bc jq
- Modifiez le script pour le rendre exécutable, puis exécutez-le. L'exécution devrait prendre environ deux minutes.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
Vous pouvez vérifier que les événements ont bien été publiés en exécutant cette commande, qui affichera tous les événements. Appuyez sur <control-c> pour quitter.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Configurer le cluster Dataproc
Cette section crée un cluster Dataproc dans le sous-réseau VPC où se trouve le cluster Kafka géré. Ce cluster sera utilisé pour exécuter des jobs qui génèrent les statistiques et les insights en temps réel dont DemoIOT Solutions a besoin.
- Créer un cluster Dataproc Ici, le nom du cluster est
dataproc-iot.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
Vous devriez obtenir un résultat semblable à celui-ci :
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
La création du cluster peut prendre entre 10 et 15 minutes. Attendez que cette opération soit terminée et vérifiez que le cluster est à l'état RUNNING en décrivant le cluster.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Traiter des messages Kafka à l'aide de Dataproc
Dans cette dernière section, vous allez envoyer un job Dataproc qui traite les messages publiés à l'aide de Spark Streaming. Ce job génère des statistiques et des insights en temps réel qui peuvent être utilisés par DemoIOT Solutions.
- Exécutez cette commande pour créer localement le fichier de job PySpark de streaming appelé
process_iot.py.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
Explication
- Ce code configure un job de streaming structuré PySpark pour lire les données d'un sujet Kafka spécifié. Il utilise l'adresse d'amorçage du serveur Kafka fournie et les configurations Kafka chargées à partir d'un fichier de configuration GCS pour se connecter à l'agent Kafka et s'authentifier auprès de celui-ci.
- Il lit d'abord les données brutes de Kafka sous forme de flux de tableaux d'octets, puis convertit ces tableaux d'octets en chaînes et applique
json_schemaà l'aide de StructType de Spark pour spécifier la structure des données (ID de l'appareil, code temporel, emplacement, données de capteur, etc.). - Il affiche les 10 premières lignes dans la console pour l'aperçu, calcule la température moyenne par capteur et écrit toutes les données dans le bucket GCS au format
avro. Avro est un système de sérialisation de données basé sur des lignes qui stocke efficacement les données structurées dans un format binaire compact défini par un schéma. Il offre une évolution du schéma, une neutralité du langage et une compression élevée pour le traitement des données à grande échelle.
- Créez le fichier
client.propertieset renseignez la variable d'environnement pour l'adresse d'amorçage du serveur Kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Importez les fichiers
process_iot.pyetclient.propertiesdans votre bucket Google Cloud Storage pour qu'ils puissent être utilisés par le job Dataproc.
gsutil cp process_iot.py client.properties $BUCKET
- Copiez des fichiers JAR de dépendance pour le job Dataproc dans votre bucket GCS. Ce répertoire contient les fichiers JAR nécessaires à l'exécution des jobs Spark Streaming avec Kafka, ainsi que la bibliothèque d'authentification Managed Kafka et ses dépendances, extraites de Configurer une machine cliente.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Envoyez la tâche Spark au cluster Dataproc.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
Les journaux du pilote Spark s'affichent. Vous devriez également voir ces tables enregistrées dans la console et les données stockées dans votre bucket GCS.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. Effectuer un nettoyage
Suivez les étapes pour nettoyer les ressources une fois l'atelier de programmation terminé.
- Supprimez le cluster Kafka géré, la VM GCE de l'éditeur et le cluster Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- Supprimez votre sous-réseau et votre réseau VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- Supprimez votre bucket GCS si vous ne souhaitez plus utiliser les données.
gcloud storage rm --recursive $BUCKET
9. Félicitations
Félicitations ! Vous avez réussi à créer un pipeline de traitement des données IoT avec Managed Service pour Apache Kafka et Dataproc. Il permet à DemoIOT Solutions d'obtenir des insights en temps réel sur les données publiées par ses appareils.
Vous avez créé un cluster Kafka géré, y avez publié des événements IoT et avez exécuté un job Dataproc qui utilisait le streaming Spark pour traiter ces événements en temps réel. Vous connaissez désormais les principales étapes nécessaires pour créer des pipelines de données à l'aide de Managed Kafka et Dataproc.