使用 Dataproc 和 Google Managed Service for Apache Kafka 處理即時物聯網資料

1. 簡介

2efacab8643a653b.png

上次更新時間:2024 年 6 月 10 日

背景

從智慧住宅解決方案到工業感應器,物聯網 (IoT) 裝置會在網路邊緣產生大量資料。這類資料非常適合用於各種用途,例如裝置監控、追蹤、診斷、監控、個人化、車隊最佳化等。Google Managed Service for Apache Kafka 提供可擴充且持久的解決方案,以相容於 OSS 的方式輕鬆安全地擷取及儲存這類持續串流的資料,而 Google Cloud Dataproc 則可使用 Apache Spark 和 Hadoop 叢集處理這些大型資料集,進行資料分析。

建構項目

在本程式碼實驗室中,您將使用 Google Managed Service for Apache Kafka、Dataproc、Python 和 Apache Spark 建構 IoT 資料處理管道,進行即時分析。管道會執行下列動作:

  • 使用 GCE VM 將 IoT 裝置的資料發布至 Managed Kafka 叢集
  • 將代管 Kafka 叢集的資料串流至 Dataproc 叢集
  • 使用 Dataproc Spark Streaming 工作處理資料

課程內容

  • 如何建立 Google 代管 Kafka 和 Dataproc 叢集
  • 如何使用 Dataproc 執行串流工作

軟硬體需求

2. 總覽

在本程式碼研究室中,我們將以虛構公司 DemoIOT Solutions 的故事為例。DemoIOT Solutions 提供感應器裝置,可測量及傳輸溫度、濕度、壓力、光照強度和位置資料。他們希望設定管道來處理這項資料,以便向顧客顯示即時統計資料。他們可以運用這類管道,為客戶提供各種服務,例如監控、自動建議、快訊,以及客戶安裝感應器地點的深入分析。

為此,我們將使用 GCE VM 模擬 IoT 裝置。裝置會將資料發布至 Google Managed Kafka 叢集中的 Kafka 主題,而 Dataproc 串流工作會讀取並處理這些資料。「先決條件設定」和後續頁面會引導您完成所有這些步驟。

前置設定

  1. 找出專案的名稱和編號。如需參考資料,請參閱「找出專案名稱、編號和 ID」。
  2. 虛擬私有雲子網路。這樣一來,GCE VM、Kafka 叢集和 Dataproc 叢集之間就能連線。請按照這個頁面的說明,使用 gcloud CLI 列出現有子網路。如有需要,請按照建立自動模式虛擬私有雲網路的步驟操作,在每個 Google Cloud 區域中建立具有子網路的虛擬私有雲網路。不過,在本程式碼研究室中,我們只會使用單一區域的子網路。
  • 在這個子網路中,請確保有防火牆規則允許來自 tcp:22 的所有輸入流量,這是 SSH 的必要條件。建立網路時,您可以在「防火牆規則」部分選取這項規則,請務必選取。
  1. GCS bucket。您需要存取 Google Cloud Storage bucket,才能儲存 Dataproc 工作資源並保存已處理的資料。如果沒有,您可以在 GCP 專案中建立一個。

填入環境變數

在執行 gcloud CLI 的終端機中,填入這些環境變數,以便稍後使用。

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

更改下列內容:

  • <project-id> 替換為您設定的 GCP 專案名稱。
  • <project-number> 替換為必要條件步驟 1 中的專案編號名稱。
  • <region> 改成您要使用的可用區域和可用區名稱。舉例來說,我們可以使用 us-central1
  • <zone>,其中 Available regions and zones 是您先前選取區域下方的區域名稱。舉例來說,如果您選取 us-central1 做為區域,則可以使用 us-central1-f 做為可用區。這個可用區將用於建立模擬 IoT 裝置的 GCE VM。確認可用區位於您選擇的地區
  • <subnet-path>,並提供必要條件步驟 2 中的子網路完整路徑。這個值的格式必須為 projects/<project-id>/regions/<region>/subnetworks/<subnet-name>
  • <bucket-name> 改成先決條件步驟 3 中的 GCS bucket 名稱。

3. 設定 Google 代管 Kafka

本節將設定 Google Managed Kafka 叢集,部署 Kafka 伺服器,並在這個伺服器上建立主題,以便發布及讀取 IoT 資料 (訂閱後)。DemoIOT Solutions 可以設定這個叢集,讓所有裝置將資料發布至該叢集。

建立 Kafka 代管叢集

  • 建立 Managed Kafka 叢集。這裡的叢集名稱是 kafka-iot
gcloud managed-kafka clusters create kafka-iot \
    --project=$PROJECT_ID \
    --location=$REGION \
    --cpu=3 \
    --memory=12GiB \
    --subnets=$SUBNET_PATH \
    --auto-rebalance

您應該會收到類似以下的回覆:

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                         
Created cluster [kafka-iot].

叢集建立作業需要 20 到 30 分鐘。等待這項作業完成。

建立主題

  • 在叢集上建立 Managed Kafka 主題。這裡的主題名稱為 kafka-iot-topic
gcloud managed-kafka topics create kafka-iot-topic \
    --cluster=kafka-iot \
    --location=$REGION \
    --partitions=10 \
    --replication-factor=3

輸出結果應該會類似下列內容:

Created topic [kafka-iot-topic].

4. 設定發布者

如要發布至 Managed Kafka 叢集,請設定可存取虛擬私有雲的 Google Compute Engine VM 執行個體,該虛擬私有雲包含 Managed Kafka 叢集使用的子網路。這個 VM 會模擬 DemoIOT Solutions 提供的感應器裝置。

步驟

  1. 建立 Google Compute Engine VM 執行個體。這裡的 GCE VM 名稱為 publisher-instance
gcloud compute instances create publisher-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=$SUBNET_PATH \
    --zone=$ZONE
  1. 授予 Google Compute Engine 預設服務帳戶使用 Managed Service for Apache Kafka 的權限。
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. 使用 SSH 連線至 VM。或者,您也可以使用 Google Cloud 控制台透過 SSH 連線
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. 安裝 Java 來執行 Kafka 指令列工具,並使用這些指令下載 Kafka 二進位檔。
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. 下載代管 Kafka 驗證程式庫及其依附元件,並設定 Kafka 用戶端屬性。
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

如要進一步瞭解發布者機器的設定,請參閱「設定用戶端機器」。

5. 發布至代管 Kafka

發布者設定完成後,我們就可以在發布者上使用 Kafka 指令列,將一些虛擬資料從 GCE VM (由 DemoIOT Solutions 模擬 IoT 裝置) 發布至 Managed Kafka 叢集。

  1. 由於我們已透過 SSH 連線至 GCE VM 執行個體,因此需要重新填入 PROJECT_ID 變數:
export PROJECT_ID=<project-id>
export REGION=<region>

更改下列內容:

  • <project-id> 替換為您設定的 GCP 專案名稱。
  • <region>,其中 Kafka 叢集是在該區域建立
  1. 使用 managed-kafka clusters describe 指令取得 Kafka 啟動伺服器的 IP 位址。這個位址可用於連線至 Kafka 叢集。
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. 列出叢集中的主題:
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

您應該會看到下列輸出內容,其中包含先前建立的主題 kafka-iot-topic

__remote_log_metadata
kafka-iot-topic
  1. 複製這段指令碼並貼到新檔案 publish_iot_data.sh 中。如要在 GCE VM 上建立新檔案,可以使用 vimnano 等工具。
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

說明

  • 這項指令碼會建立 JSON 訊息,其中包含模擬感應器讀數、裝置 ID、時間戳記、感應器資料 (溫度、濕度、壓力、光線)、位置資訊 (緯度、經度)、裝置狀態 (電池、訊號、連線類型) 和一些中繼資料。
  • 這項工具會從一組不重複的裝置產生連續訊息流,每部裝置都會在指定時間間隔傳送資料,模擬真實的 IoT 裝置。在此,我們發布 10 部裝置的資料,每部裝置會以 10 秒的時間間隔產生 20 個讀數。
  • 此外,這項工具也會使用 Kafka 生產者指令列工具,將所有產生的資料發布至 Kafka 主題。
  1. 安裝指令碼使用的部分依附元件 - bc 套件用於數學計算,jq 套件用於處理 JSON。
sudo apt-get install bc jq
  1. 將指令碼修改為可執行檔,然後執行指令碼。執行過程大約需要 2 分鐘。
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

執行下列指令即可列印所有事件,確認事件是否已順利發布。按下 <control-c> 即可退出。

kafka-console-consumer.sh \
    --topic kafka-iot-topic \
    --from-beginning \
    --bootstrap-server $BOOTSTRAP \
    --consumer.config client.properties

6. 設定 Dataproc 叢集

本節會在 Managed Kafka 叢集所在的虛擬私有雲子網路中,建立 Dataproc 叢集。這個叢集將用於執行作業,產生 DemoIOT Solutions 所需的即時統計資料和洞察資料。

  1. 建立 Dataproc 叢集。這裡的叢集名稱為 dataproc-iot
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

您應該會收到類似以下的回覆:

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

建立叢集可能需要 10 到 15 分鐘。等待這項作業順利完成,然後說明叢集,確認叢集處於 RUNNING 狀態。

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. 使用 Dataproc 處理 Kafka 訊息

在最後一節中,您將提交 Dataproc 工作,使用 Spark Streaming 處理發布的訊息。這項工作實際上會產生一些即時統計資料和洞察資訊,供 DemoIOT 解決方案使用。

  1. 執行這項指令,在本機建立名為 process_iot.py 的串流 PySpark 工作檔案。
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
    StructField("device_id", StringType()),
    StructField("timestamp", StringType()),
    StructField(
        "location",
        StructType([
            StructField("latitude", FloatType()),
            StructField("longitude", FloatType()),
        ]),
    ),
    StructField(
        "sensor_data",
        StructType([
            StructField("temperature", FloatType()),
            StructField("humidity", FloatType()),
            StructField("pressure", FloatType()),
            StructField("light_level", IntegerType()),
        ]),
    ),
    StructField(
        "device_status",
        StructType([
            StructField("battery_level", IntegerType()),
            StructField("signal_strength", IntegerType()),
            StructField("connection_type", StringType()),
        ]),
    ),
    StructField(
        "metadata",
        StructType([
            StructField("sensor_type", StringType()),
            StructField("unit_temperature", StringType()),
            StructField("unit_humidity", StringType()),
            StructField("unit_pressure", StringType()),
            StructField("unit_light_level", StringType()),
            StructField("firmware_version", StringType()),
        ]),
    ),
])
CLIENT_PROPERTY_KEYS = [
    "security.protocol",
    "sasl.mechanism",
    "sasl.login.callback.handler.class",
    "sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
    # Parse client properties file
    parsed_path = urlparse(client_properties_path)
    if parsed_path.scheme != "gs":
        raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

    bucket_name = parsed_path.netloc
    blob_name = parsed_path.path.lstrip("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    file_content = "[DEFAULT]\n" + blob.download_as_text()

    config = configparser.ConfigParser()
    config.read_string(file_content)

    client_properties = dict()
    for key in CLIENT_PROPERTY_KEYS:
        client_properties[key] = config.get("DEFAULT", key)

    print(f"Client properties: {client_properties}")
    return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
    print("Starting initial data processing...")
    initial_rows = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", "kafka-iot-topic")
        .option("kafka.security.protocol", client_properties["security.protocol"])
        .option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
        .option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
        .option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
        .load()
    )

    initial_rows = (
        initial_rows.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), JSON_SCHEMA).alias("data"))
        .select("data.*")
    )
    
    # Print first 20 rows
    def print_top_rows(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
            batch_df.limit(20).show(truncate=False)

    initial_query_print = initial_rows.writeStream \
        .foreachBatch(print_top_rows) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_print)

    # Calculate and print average temperatures
    def process_initial_avg_temp(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
            current_averages = (
                batch_df.select("device_id", "sensor_data.temperature")
                .groupBy("device_id")
                .agg(avg("temperature").alias("average_temperature"))
                .orderBy("device_id")
            )
            current_averages.show(truncate=False)

    initial_query_avg_temp = initial_rows.writeStream \
        .foreachBatch(process_initial_avg_temp) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_avg_temp)
    
    # Write data to GCS
    initial_data_gcs_writer = (
        initial_rows.writeStream.outputMode("append")
        .format("avro")
        .option("path", store_data_gcs_path+"/tables/iot_avro/")
        .option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
        .trigger(once=True) \
        .start()
    )
    queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

    client_properties = get_client_properties(client_properties_path)

    # Create SparkSession
    spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
    
    queries_to_await = []

    process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
    
    # Wait for all queries to terminate
    for query in queries_to_await:
        try:
            query.awaitTermination()
        except Exception as e:
            print(f"Error awaiting query: {e}")
        finally:
            query.stop()

    spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Invalid number of arguments passed ", len(sys.argv))
        print(
            "Usage: ",
            sys.argv[0],
            " <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
        )
        raise ValueError("Invalid number of arguments passed.")

    parsed_data_path = urlparse(sys.argv[3])
    if parsed_data_path.scheme != "gs":
        raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


    main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

說明

  • 這段程式碼會設定 PySpark 結構化串流工作,從指定的 Kafka 主題讀取資料。這項服務會使用提供的 Kafka 伺服器啟動位址,以及從 GCS 設定檔載入的 Kafka 設定,連線至 Kafka 代理程式並進行驗證。
  • 首先,它會從 Kafka 讀取原始資料 (以位元組陣列串流的形式),然後將這些位元組陣列轉換為字串,並使用 Spark 的 StructType 套用 json_schema,指定資料結構 (裝置 ID、時間戳記、位置、感應器資料等)。
  • 這會將前 10 列資料顯示到控制台以供預覽、計算每個感應器的平均溫度,並以 avro 格式將所有資料寫入 GCS bucket。Avro 是以列為基礎的資料序列化系統,可將結構化資料有效率地儲存在緊湊的結構定義二進位格式中,並提供結構定義演進、語言中立性,以及適用於大規模資料處理的高壓縮率。
  1. 建立 client.properties 檔案,並填入 Kafka 伺服器啟動位址的環境變數。
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. process_iot.pyclient.properties 檔案上傳至 Google Cloud Storage 值區,供 Dataproc 工作使用。
gsutil cp process_iot.py client.properties $BUCKET
  1. 將 Dataproc 作業的一些依附元件 JAR 複製到 GCS bucket。這個目錄包含使用 Kafka 執行 Spark Streaming 工作所需的 JAR,以及從「設定用戶端機器」取得的 Managed Kafka 驗證程式庫及其依附元件。
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. 將 Spark 工作提交至 Dataproc 叢集。
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

系統會列印 Spark 驅動程式記錄。您也應該能在控制台中看到這些資料表記錄,以及儲存在 GCS 值區中的資料。

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. 清理

完成 Codelab 後,請按照步驟清理資源。

  1. 刪除代管 Kafka 叢集、發布者 GCE VM 和 Dataproc 叢集。
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. 刪除虛擬私有雲子網路和網路。
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. 如果不再需要資料,請刪除 GCS 值區。
gcloud storage rm --recursive $BUCKET

9. 恭喜

恭喜!您已使用 Manage Kafka 和 Dataproc 成功建構 IoT 資料處理管道,協助 DemoIOT Solutions 即時掌握裝置發布的資料!

您已建立 Managed Kafka 叢集、將 IoT 事件發布至該叢集,並執行 Dataproc 工作,使用 Spark 串流即時處理這些事件。您現已瞭解,如要使用 Managed Kafka 和 Dataproc 建立資料管道,必須採取哪些重要步驟。

參考文件