1. Introducción

Última actualización: 10/06/2024
Información general
Los dispositivos de Internet de las cosas (IoT), que abarcan desde soluciones para casas inteligentes hasta sensores industriales, generan grandes cantidades de datos en el borde de la red. Estos datos son valiosos para una variedad de casos de uso, como la supervisión, el seguimiento, el diagnóstico, la vigilancia, la personalización, la optimización de flotas y mucho más. Google Managed Service para Apache Kafka ofrece una forma escalable y duradera de transferir y almacenar este flujo continuo de datos de una manera compatible con OSS, fácil de usar y segura, mientras que Google Cloud Dataproc permite procesar estos grandes conjuntos de datos para el análisis de datos con clústeres de Apache Spark y Hadoop.
Qué compilarás
En este codelab, crearás una canalización de procesamiento de datos de IoT con Google Managed Service para Apache Kafka, Dataproc, Python y Apache Spark que realice análisis en tiempo real. Tu canalización hará lo siguiente:
- Publica datos de dispositivos de IoT en un clúster de Kafka administrado con VMs de GCE
- Transmite datos del clúster de Kafka administrado a un clúster de Dataproc
- Procesa datos con un trabajo de Spark Streaming de Dataproc
Qué aprenderás
- Cómo crear clústeres de Google Managed Kafka y Dataproc
- Cómo ejecutar trabajos de transmisión con Dataproc
Requisitos
- Una cuenta de GCP activa con un proyecto configurado Si no tienes una, puedes registrarte para obtener una prueba gratuita.
- La CLI de gcloud está instalada y configurada. Puedes seguir las instrucciones para instalar gcloud CLI en tu SO.
- Las APIs habilitadas para Google Managed Kafka y Dataproc en tu proyecto de GCP
2. Descripción general
Para este codelab, sigamos la historia de una empresa ficticia, DemoIOT Solutions. DemoIOT Solutions proporciona dispositivos sensores que miden y transmiten datos de temperatura, humedad, presión, nivel de luz y ubicación. Quieren configurar canalizaciones que procesen estos datos para mostrar estadísticas en tiempo real a sus clientes. Con estas canalizaciones, pueden brindar una amplia variedad de servicios a sus clientes, como supervisión, sugerencias automatizadas, alertas y estadísticas sobre los lugares en los que los clientes instalaron sus sensores.
Para ello, usaremos una VM de GCE para simular el dispositivo IoT. El dispositivo publicará datos en un tema de Kafka en el clúster de Kafka administrado de Google, que un trabajo de transmisión de Dataproc leerá y procesará. La configuración de requisitos previos y las siguientes páginas te guiarán para realizar todos estos pasos.
Configuración de requisitos previos
- Busca el nombre y el número de tu proyecto. Consulta Cómo encontrar el nombre, el número y el ID del proyecto como referencia.
- Es la subred de VPC. Esto permitirá la conectividad entre la VM de GCE, el clúster de Kafka y el clúster de Dataproc. Sigue estos pasos para enumerar las subredes existentes con la CLI de gcloud. Si es necesario, sigue los pasos para crear una red de VPC en modo automático, lo que creará una red de VPC con una subred en cada región de Google Cloud. Sin embargo, para los fines de este codelab, solo usaremos una subred de una sola región.
- En esta subred, asegúrate de que haya una regla de firewall que permita todo el tráfico de entrada desde tcp:22, que es el SSH requerido. Esta regla estará disponible para seleccionarla en la sección Reglas de firewall cuando crees una red, así que asegúrate de seleccionarla.
- Es el bucket de GCS. Necesitarás acceso a un bucket de almacenamiento de Google Cloud para almacenar los recursos de los trabajos de Dataproc y conservar los datos procesados. Si no tienes uno, puedes crearlo en tu proyecto de GCP.
Propaga las variables de entorno
En la terminal en la que ejecutas la CLI de gcloud, completa estas variables de entorno para que se puedan usar más adelante.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
Reemplaza lo siguiente:
<project-id>con el nombre del proyecto de GCP que configuraste.<project-number>por el nombre del número de proyecto del paso 1 de los requisitos previos.<region>con el nombre de una región de Regiones y zonas disponibles que desees usar. Por ejemplo, podemos usarus-central1.<zone>con el nombre de la zona de Regiones y zonas disponibles en la región que seleccionaste anteriormente Por ejemplo, si seleccionasteus-central1como región, puedes usarus-central1-fcomo zona. Esta zona se usará para crear la VM de GCE que simula dispositivos IoT. Asegúrate de que tu zona esté en la región que elegiste.<subnet-path>por la ruta completa de la subred del paso 2 de los requisitos previos. El valor debe tener el formatoprojects/<project-id>/regions/<region>/subnetworks/<subnet-name>.<bucket-name>por el nombre del bucket de GCS del paso 3 de los requisitos previos.
3. Configura Google Managed Kafka
En esta sección, se configura un clúster de Kafka administrado por Google, que implementa el servidor de Kafka y crea un tema en este servidor en el que se pueden publicar y leer los datos de IoT después de suscribirse a él. DemoIOT Solutions puede configurar este clúster para que todos sus dispositivos publiquen datos en él.
Crea un clúster de Kafka administrado
- Crea el clúster de Kafka administrado. Aquí, el nombre del clúster es
kafka-iot.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
Deberías recibir una respuesta similar a la que figura a continuación:
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
La creación del clúster tarda entre 20 y 30 minutos. Espera a que se complete esta operación.
Crea un tema
- Crea un tema de Kafka administrado en el clúster. Aquí, el nombre del tema es
kafka-iot-topic.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
Deberías obtener un resultado similar al siguiente:
Created topic [kafka-iot-topic].
4. Cómo configurar un publicador
Para publicar en el clúster de Kafka administrado, configuramos una instancia de VM de Google Compute Engine que puede acceder a la VPC que contiene la subred que usa el clúster de Kafka administrado. Esta VM simula los dispositivos sensores que proporciona DemoIOT Solutions.
Pasos
- Crea la instancia de VM de Google Compute Engine. Aquí, el nombre de la VM de GCE es
publisher-instance.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Otorga a la cuenta de servicio predeterminada de Google Compute Engine los permisos para usar Managed Service for Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- Usa SSH para conectarte a la VM. Como alternativa, usa la consola de Google Cloud para acceder a SSH.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Instala Java para ejecutar las herramientas de línea de comandos de Kafka y descarga el archivo binario de Kafka con estos comandos.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- Descarga la biblioteca de autenticación de Kafka administrado y sus dependencias, y configura las propiedades del cliente de Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
Para obtener más detalles sobre la configuración de la máquina del publicador, consulta Cómo configurar una máquina cliente.
5. Publica en Kafka administrado
Ahora que el publicador está configurado, podemos usar la línea de comandos de Kafka en él para publicar algunos datos de prueba desde la VM de GCE (simulando dispositivos de IoT de DemoIOT Solutions) en el clúster de Kafka administrado.
- Como establecimos una conexión SSH a la instancia de VM de GCE, debemos volver a completar la variable
PROJECT_ID:
export PROJECT_ID=<project-id>
export REGION=<region>
Reemplaza lo siguiente:
<project-id>con el nombre del proyecto de GCP que configuraste.<region>con la región en la que se creó el clúster de Kafka
- Usa el comando
managed-kafka clusters describepara obtener la dirección IP del servidor de arranque de Kafka. Esta dirección se puede usar para conectarse al clúster de Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Enumera los temas del clúster:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
Deberías ver el siguiente resultado, que contiene el tema kafka-iot-topic que creamos antes.
__remote_log_metadata
kafka-iot-topic
- Copia y pega este código en un archivo
publish_iot_data.shnuevo. Para crear un archivo nuevo en la VM de GCE, puedes usar una herramienta comovimonano.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
Explicación
- Esta secuencia de comandos crea mensajes JSON con lecturas de sensores simuladas que tienen ID de dispositivo, marca de tiempo, datos del sensor (temperatura, humedad, presión, luz), información de ubicación (latitud, longitud), estado del dispositivo (batería, señal, tipo de conexión) y algunos metadatos.
- Genera un flujo continuo de mensajes desde una cantidad determinada de dispositivos únicos, cada uno de los cuales envía datos en un intervalo de tiempo especificado, lo que simula dispositivos de IoT del mundo real. Aquí, publicamos datos de 10 dispositivos que producen 20 lecturas cada uno, con un intervalo de tiempo de 10 segundos.
- También publica todos los datos generados en el tema de Kafka con la herramienta de línea de comandos del productor de Kafka.
- Instala algunas dependencias que usa la secuencia de comandos: el paquete
bcpara cálculos matemáticos y el paquetejqpara el procesamiento de JSON.
sudo apt-get install bc jq
- Modifica la secuencia de comandos para que sea ejecutable y ejecútala. Debería tardar unos 2 minutos en ejecutarse.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
Para verificar que los eventos se publicaron correctamente, ejecuta este comando, que imprimirá todos los eventos. Presiona <control-c> para salir.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Configura el clúster de Dataproc
En esta sección, se crea un clúster de Dataproc en la subred de VPC en la que se encuentra el clúster de Kafka administrado. Este clúster se usará para ejecutar trabajos que generen las estadísticas y las estadísticas en tiempo real que necesita DemoIOT Solutions.
- Crearás un clúster de Dataproc. Aquí, el nombre del clúster es
dataproc-iot.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
Deberías recibir una respuesta similar a la que figura a continuación:
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
La creación del clúster puede tardar entre 10 y 15 minutos. Espera a que se complete correctamente esta operación y verifica que el clúster esté en estado RUNNING describiéndolo.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Procesa mensajes de Kafka con Dataproc
En esta última sección, enviarás un trabajo de Dataproc que procese los mensajes publicados con Spark Streaming. Este trabajo genera estadísticas y análisis en tiempo real que DemoIOT Solutions puede usar.
- Ejecuta este comando para crear el archivo de trabajo de PySpark de transmisión llamado
process_iot.pyde forma local.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
Explicación
- Este código configura un trabajo de transmisión estructurada de PySpark para leer datos de un tema de Kafka especificado. Utiliza la dirección de arranque del servidor de Kafka proporcionada y los parámetros de configuración de Kafka cargados desde un archivo de configuración de GCS para conectarse y autenticarse con el agente de Kafka.
- Primero, lee los datos sin procesar de Kafka como un flujo de arrays de bytes, convierte esos arrays de bytes en cadenas y aplica
json_schemacon StructType de Spark para especificar la estructura de los datos (ID del dispositivo, marca de tiempo, ubicación, datos del sensor, etcétera). - Imprime las primeras 10 filas en la consola para obtener una vista previa, calcula la temperatura promedio por sensor y escribe todos los datos en el bucket de GCS en formato
avro. Avro es un sistema de serialización de datos basado en filas que almacena de forma eficiente datos estructurados en un formato binario compacto definido por el esquema, y ofrece evolución del esquema, neutralidad del lenguaje y alta compresión para el procesamiento de datos a gran escala.
- Crea el archivo
client.propertiesy completa la variable de entorno para la dirección de arranque del servidor de Kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Sube los archivos
process_iot.pyyclient.propertiesa tu bucket de Google Cloud Storage para que el trabajo de Dataproc pueda usarlos.
gsutil cp process_iot.py client.properties $BUCKET
- Copia algunos archivos .jar de dependencia para el trabajo de Dataproc en tu bucket de GCS. Este directorio contiene los archivos .jar necesarios para ejecutar trabajos de Spark Streaming con Kafka, y la biblioteca de autenticación de Kafka administrado y sus dependencias, tomados de Cómo configurar una máquina cliente.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Envía el trabajo de Spark al clúster de Dataproc.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
Se imprimirán los registros del controlador de Spark. También deberías poder ver estas tablas registradas en la consola y los datos almacenados en tu bucket de GCS.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. Limpia
Sigue los pasos para limpiar los recursos después de completar el codelab.
- Borra el clúster de Kafka administrado, la VM de GCE del publicador y el clúster de Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- Borra tu subred y red de VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- Borra tu bucket de GCS si ya no quieres usar los datos.
gcloud storage rm --recursive $BUCKET
9. Felicitaciones
¡Felicitaciones! Creaste correctamente una canalización de procesamiento de datos de IoT con Managed Service for Apache Kafka y Dataproc que ayuda a DemoIOT Solutions a obtener estadísticas en tiempo real sobre los datos que publican sus dispositivos.
Creaste un clúster de Kafka administrado, publicaste eventos de IoT en él y ejecutaste un trabajo de Dataproc que usó la transmisión de Spark para procesar estos eventos en tiempo real. Ahora conoces los pasos clave necesarios para crear canalizaciones de datos con Managed Kafka y Dataproc.