1. Введение

Последнее обновление: 10.06.2024
Фон
Устройства Интернета вещей (IoT), от решений для умного дома до промышленных датчиков, генерируют огромные объемы данных на периферии сети. Эти данные бесценны для множества задач, таких как мониторинг устройств, отслеживание, диагностика, наблюдение, персонализация, оптимизация автопарка и многое другое. Google Managed Service for Apache Kafka предлагает масштабируемый и надежный способ приема и хранения этого непрерывного потока данных в совместимом с открытым исходным кодом, простом в использовании и безопасном формате, а Google Cloud Dataproc позволяет обрабатывать эти большие наборы данных для анализа данных с использованием кластеров Apache Spark и Hadoop.
Что вы построите
В этом практическом занятии вы создадите конвейер обработки данных для Интернета вещей, используя управляемые сервисы Google для Apache Kafka, Dataproc, Python и Apache Spark, который будет выполнять аналитику в реальном времени. Ваш конвейер будет:
- Публикация данных с IoT-устройств в управляемый кластер Kafka с использованием виртуальных машин GCE.
- Передача данных из кластера Manage Kafka в кластер Dataproc.
- Обработка данных с помощью задания Dataproc Spark Streaming.
Что вы узнаете
- Как создать кластеры Kafka и Dataproc, управляемые Google.
- Как запускать потоковые задания с помощью Dataproc
Что вам понадобится
- Для работы вам потребуется активный аккаунт GCP с настроенным проектом. Если у вас его нет, вы можете зарегистрироваться для бесплатной пробной версии .
- Интерфейс командной строки gcloud установлен и настроен. Вы можете следовать инструкциям по установке интерфейса командной строки gcloud в вашей операционной системе .
- Включите API для Google Managed Kafka и Dataproc в вашем проекте GCP.
2. Обзор
В рамках этого практического занятия рассмотрим историю вымышленной компании DemoIOT Solutions. DemoIOT Solutions предоставляет сенсорные устройства, которые измеряют и передают данные о температуре, влажности, давлении, уровне освещенности и местоположении. Компания хочет создать конвейеры обработки этих данных для отображения статистики в реальном времени своим клиентам. Используя такие конвейеры, она сможет предоставлять своим клиентам широкий спектр услуг, таких как мониторинг, автоматические рекомендации, оповещения и информацию о местах установки датчиков.
Для этого мы будем использовать виртуальную машину GCE для имитации IoT-устройства. Устройство будет публиковать данные в топик Kafka в кластере Google Managed Kafka, которые будут считываться и обрабатываться потоковым заданием Dataproc. Предварительная настройка и следующие страницы помогут вам выполнить все эти шаги.
Предварительная настройка
- Найдите название и номер вашего проекта. См. раздел «Как найти название, номер и идентификатор проекта» .
- Подсеть VPC. Это обеспечит связь между виртуальной машиной GCE, кластером Kafka и кластером Dataproc. Следуйте этим инструкциям, чтобы получить список существующих подсетей с помощью CLI gcloud. При необходимости создайте сеть VPC в автоматическом режиме , что создаст сеть VPC с подсетями в каждом регионе Google Cloud. Однако для целей этого практического занятия мы будем использовать подсеть только из одного региона.
- В этой подсети убедитесь, что существует правило брандмауэра, разрешающее весь входящий трафик с TCP:22, необходимый для SSH. Это правило будет доступно для выбора в разделе правил брандмауэра при создании сети, поэтому обязательно выберите его.
- Вам потребуется доступ к хранилищу Google Cloud Storage (GCS) для хранения ресурсов заданий Dataproc и сохранения обработанных данных. Если у вас его нет, вы можете создать его в своем проекте GCP.
Заполните переменные среды
В терминале, где вы запускаете CLI gcloud, заполните эти переменные среды, чтобы их можно было использовать позже.
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
Замените следующее:
-
<project-id>имя проекта GCP, который вы создали. -
<project-number>с названием номера проекта из шага 1 предварительного условия. -
<region>— название региона из списка доступных регионов и зон , которые вы хотите использовать. Например, можно использоватьus-central1. -
<zone>с именем зоны из списка доступных регионов и зон в регионе, который вы выбрали ранее. Например, если вы выбрали регионus-central1, вы можете использоватьus-central1-fв качестве зоны. Эта зона будет использоваться для создания виртуальной машины GCE, которая имитирует устройства IoT. Убедитесь, что ваша зона находится в выбранном вами регионе . -
<subnet-path>— полный путь к подсети из шага 2 предварительного условия. Значение должно быть в формате:projects/<project-id>/regions/<region>/subnetworks/<subnet-name>. -
<bucket-name>с именем корзины GCS из шага 3 предварительного условия.
3. Настройка Google Managed Kafka
В этом разделе настраивается кластер Google Managed Kafka, который развертывает сервер Kafka и создает на этом сервере тему, куда можно публиковать и считывать данные IoT после подписки. Компания DemoIOT Solutions может настроить этот кластер таким образом, чтобы все их устройства публиковали в него данные.
Создайте управляемый кластер Kafka.
- Создайте управляемый кластер Kafka. В данном случае имя кластера —
kafka-iot.
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
Вы должны получить ответ, похожий на следующий:
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
Создание кластера занимает 20-30 минут. Дождитесь завершения этой операции.
Создать тему
- Создайте управляемую тему Kafka в кластере. В данном случае имя темы —
kafka-iot-topic.
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
В результате вы должны получить примерно следующий вывод:
Created topic [kafka-iot-topic].
4. Настройте издателя.
Для публикации данных в кластере Managed Kafka мы настроили экземпляр виртуальной машины Google Compute Engine, который может получить доступ к VPC, содержащей подсеть, используемую кластером Managed Kafka. Эта виртуальная машина имитирует сенсорные устройства, предоставляемые компанией DemoIOT Solutions.
Шаги
- Создайте экземпляр виртуальной машины Google Compute Engine. В данном случае имя виртуальной машины GCE —
publisher-instance.
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Предоставьте учетной записи службы Google Compute Engine по умолчанию разрешения на использование управляемой службы для Apache Kafka.
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- Для подключения к виртуальной машине используйте SSH. В качестве альтернативы, используйте консоль Google Cloud для подключения по SSH .
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Установите Java для запуска инструментов командной строки Kafka и загрузите исполняемый файл Kafka, используя следующие команды.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- Загрузите библиотеку управляемой аутентификации Kafka и ее зависимости, а также настройте свойства клиента Kafka.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
Для получения более подробной информации о настройке машины издателя см. раздел «Настройка клиентской машины» .
5. Опубликовать в управляемом Kafka.
Теперь, когда издатель настроен, мы можем использовать командную строку Kafka для публикации фиктивных данных с виртуальной машины GCE (имитирующей устройства IoT от DemoIOT Solutions) в управляемый кластер Kafka.
- Поскольку мы подключились к виртуальной машине GCE по SSH, нам необходимо повторно заполнить переменную
PROJECT_ID:
export PROJECT_ID=<project-id>
export REGION=<region>
Замените следующее:
-
<project-id>имя проекта GCP, который вы создали. -
<region>с регионом, в котором был создан кластер Kafka
- Используйте команду
managed-kafka clusters describe, чтобы получить IP-адрес загрузочного сервера Kafka. Этот адрес можно использовать для подключения к кластеру Kafka.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Перечислите темы в кластере:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
Вы должны увидеть следующий вывод, содержащий созданную нами ранее тему kafka-iot-topic .
__remote_log_metadata
kafka-iot-topic
- Скопируйте и вставьте этот скрипт в новый файл
publish_iot_data.sh. Для создания нового файла на виртуальной машине GCE можно использовать такие инструменты, какvimилиnano.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
Объяснение
- Этот скрипт создает JSON-сообщения с имитированными показаниями датчиков, которые содержат идентификатор устройства, метку времени, данные датчиков (температура, влажность, давление, освещенность), информацию о местоположении (широта, долгота), состояние устройства (батарея, сигнал, тип подключения) и некоторые метаданные.
- Система генерирует непрерывный поток сообщений от заданного количества уникальных устройств, каждое из которых отправляет данные с определенным интервалом времени, имитируя реальные устройства Интернета вещей. Здесь мы публикуем данные с 10 устройств, каждое из которых выдает 20 показаний с интервалом в 10 секунд.
- Кроме того, все сгенерированные данные публикуются в топик Kafka с помощью инструмента командной строки Kafka Producer.
- Установите необходимые для скрипта зависимости: пакет
bcдля математических вычислений и пакетjqдля обработки JSON.
sudo apt-get install bc jq
- Измените скрипт, чтобы он стал исполняемым файлом, и запустите его. Выполнение займет около 2 минут.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
Вы можете убедиться в успешной публикации событий, выполнив эту команду, которая выведет все события. Нажмите <control-c> для выхода.
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Настройка кластера Dataproc
В этом разделе создается кластер Dataproc в подсети VPC, где находится управляемый кластер Kafka. Этот кластер будет использоваться для запуска заданий, генерирующих статистику и аналитические данные в реальном времени, необходимые для решений DemoIOT.
- Создайте кластер Dataproc. В данном случае имя кластера —
dataproc-iot.
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
Вы должны получить ответ, похожий на следующий:
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
Создание кластера может занять 10-15 минут. Дождитесь успешного завершения этой операции и убедитесь, что кластер находится в состоянии RUNNING , описав его.
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Обработка сообщений Kafka с помощью Dataproc.
В этом последнем разделе вы запустите задание Dataproc, которое будет обрабатывать опубликованные сообщения с помощью Spark Streaming . Это задание фактически генерирует статистику и аналитические данные в реальном времени, которые могут быть использованы компанией DemoIOT Solutions.
- Выполните эту команду, чтобы создать локально файл задания потоковой обработки PySpark с именем
process_iot.py.
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
Объяснение
- Этот код настраивает задание PySpark Structured Streaming для чтения данных из указанной темы Kafka. Он использует предоставленный адрес начальной загрузки сервера Kafka и конфигурации Kafka, загруженные из файла конфигурации GCS, для подключения и аутентификации с брокером Kafka.
- Сначала система считывает необработанные данные из Kafka в виде потока массивов байтов, преобразует эти массивы байтов в строки и применяет
json_schemaиспользуя StructType из Spark для указания структуры данных (идентификатор устройства, метка времени, местоположение, данные датчика и т. д.). - Программа выводит первые 10 строк в консоль для предварительного просмотра, вычисляет среднюю температуру для каждого датчика и записывает все данные в хранилище GCS в формате
avro. Avro — это система сериализации данных на основе строк, которая эффективно хранит структурированные данные в компактном, определенном схемой бинарном формате, обеспечивая эволюцию схемы, языковую нейтральность и высокую степень сжатия для обработки больших объемов данных.
- Создайте файл
client.propertiesи заполните переменную среды адресом начальной загрузки сервера Kafka.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- Загрузите файлы
process_iot.pyиclient.propertiesв свой сегмент Google Cloud Storage, чтобы их могла использовать задача Dataproc.
gsutil cp process_iot.py client.properties $BUCKET
- Скопируйте несколько необходимых JAR-файлов для задания Dataproc в ваш GCS-хранилище. В этом каталоге находятся JAR-файлы, необходимые для запуска заданий Spark Streaming с Kafka, а также библиотека управляемой аутентификации Kafka и её зависимости, взятые из раздела «Настройка клиентской машины ».
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Отправьте задание Spark в кластер Dataproc.
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
Журналы драйвера Spark будут выведены на экран. Вы также сможете увидеть эти таблицы в консоли и данные, хранящиеся в вашем хранилище GCS.
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. Уборка
Выполните следующие действия для очистки ресурсов после завершения практического задания.
- Удалите управляемый кластер Kafka, виртуальную машину Publisher GCE и кластер Dataproc.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- Удалите подсеть и основную сеть вашей VPC.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- Удалите свой сегмент GCS, если вы больше не хотите использовать данные.
gcloud storage rm --recursive $BUCKET
9. Поздравляем!
Поздравляем, вы успешно создали конвейер обработки данных IoT с помощью Manage Kafka и Dataproc, который помогает DemoIOT Solutions получать информацию в режиме реального времени о данных, публикуемых их устройствами!
Вы создали кластер Managed Kafka, опубликовали в него события IoT и запустили задание Dataproc, которое использовало потоковую обработку Spark для обработки этих событий в реальном времени. Теперь вы знаете ключевые шаги, необходимые для создания конвейеров данных с использованием Managed Kafka и Dataproc.