1. はじめに

最終更新日: 2024-06-10
背景
スマートホーム ソリューションから産業用センサーまで、モノのインターネット(IoT)デバイスはネットワーク エッジで大量のデータを生成します。このデータは、デバイスのモニタリング、追跡、診断、監視、パーソナライズ、フリートの最適化など、さまざまなユースケースで非常に役立ちます。Google Managed Service for Apache Kafka は、この連続的なデータ ストリームを OSS 互換の使いやすく安全な方法で取り込んで保存できる、スケーラブルで耐久性のある方法を提供します。一方、Google Cloud Dataproc を使用すると、Apache Spark クラスタと Hadoop クラスタを使用して、これらの大規模なデータセットをデータ分析用に処理できます。
作成するアプリの概要
この Codelab では、Google Managed Service for Apache Kafka、Dataproc、Python、Apache Spark を使用して、リアルタイム分析を行う IoT データ処理パイプラインを構築します。パイプラインは次のようになります。
- GCE VM を使用して、IoT デバイスから Managed Kafka クラスタにデータを公開する
- Managed Kafka クラスタから Dataproc クラスタにデータをストリーミングする
- Dataproc Spark Streaming ジョブを使用してデータを処理する
学習内容
- Google Managed Kafka クラスタと Dataproc クラスタを作成する方法
- Dataproc を使用してストリーミング ジョブを実行する方法
必要なもの
- プロジェクトが設定された有効な GCP アカウント。お持ちでない場合は、無料トライアルにお申し込みください。
- gcloud CLI がインストールされ、構成されている。gcloud CLI をオペレーティング システムにインストールするの手順に沿って操作してください。
- GCP プロジェクトで Google Managed Kafka と Dataproc の API が有効になっている。
2. 概要
この Codelab では、架空の企業である DemoIOT Solutions のストーリーに沿って説明します。DemoIOT Solutions は、温度、湿度、圧力、光量、位置のデータを測定して送信するセンサー デバイスを提供しています。このデータを処理して、顧客にリアルタイムの統計情報を表示するパイプラインを設定したいと考えています。このようなパイプラインを使用すると、顧客がセンサーを設置した場所に関するモニタリング、自動提案、アラート、分析情報など、さまざまなサービスを顧客に提供できます。
これを行うには、GCE VM を使用して IoT デバイスをシミュレートします。デバイスは、Google Managed Kafka クラスタの Kafka トピックにデータを公開します。このデータは、Dataproc ストリーミング ジョブによって読み取られて処理されます。前提条件の設定と次のページでは、これらの手順をすべて実行します。
前提条件の設定
- プロジェクトのプロジェクト名とプロジェクト番号を確認します。リファレンスについては、プロジェクト名、番号、ID を確認するをご覧ください。
- VPC サブネットワーク。これにより、GCE VM、Kafka クラスタ、Dataproc クラスタ間の接続が可能になります。gcloud CLI を使用して既存のサブネットを一覧表示するには、こちらの手順に沿って操作してください。必要に応じて、自動モードの VPC ネットワークを作成するの手順に沿って、Google Cloud リージョンごとにサブネットワークを持つ VPC ネットワークを作成します。ただし、この Codelab では、1 つのリージョンのサブネットワークのみを使用します。
- このサブネットワークで、tcp:22 からのすべての上り(内向き)を許可するファイアウォール ルールがあることを確認します。これは SSH に必要です。このルールは、ネットワークの作成時に [ファイアウォール ルール] セクションで選択できます。必ず選択してください。
- GCS バケット。Dataproc ジョブリソースを保存し、処理済みデータを永続化するには、Google Cloud Storage バケットへのアクセス権が必要です。お持ちでない場合は、GCP プロジェクトで 作成 できます。
環境変数を設定する
gcloud CLI を実行するターミナルで、これらの環境変数を設定して、後で使用できるようにします。
export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>
次のように置き換えます。
<project-id>は、設定した GCP プロジェクトの名前に置き換えます。<project-number>は、前提条件のステップ 1 のプロジェクト番号の名前に置き換えます。<region>は、使用する 利用可能なリージョンとゾーン のリージョンの名前に置き換えます。たとえば、us-central1を使用できます。<zone>は、以前に選択したリージョンの 利用可能なリージョンとゾーン のゾーンの名前に置き換えます。たとえば、リージョンとしてus-central1を選択した場合は、ゾーンとしてus-central1-fを使用できます。このゾーンは、IoT デバイスをシミュレートする GCE VM の作成に使用されます。ゾーンが選択したリージョンにあることを確認します。<subnet-path>は、前提条件のステップ 2 のサブネットのフルパスに置き換えます。この値は、projects/<project-id>/regions/<region>/subnetworks/<subnet-name>の形式にする必要があります。<bucket-name>は、前提条件のステップ 3 の GCS バケットの名前に置き換えます。
3. Google Managed Kafka を設定する
このセクションでは、Google Managed Kafka クラスタを設定します。これにより、Kafka サーバーがデプロイされ、このサーバーにトピックが作成されます。このトピックに IoT データを公開し、サブスクライブ後に読み取ることができます。DemoIOT Solutions は、すべてのデバイスがデータを公開するようにこのクラスタを設定できます。
Managed Kafka クラスタを作成する
- Managed Kafka クラスタを作成します。ここでは、クラスタの名前は
kafka-iotです。
gcloud managed-kafka clusters create kafka-iot \
--project=$PROJECT_ID \
--location=$REGION \
--cpu=3 \
--memory=12GiB \
--subnets=$SUBNET_PATH \
--auto-rebalance
次のようなレスポンスが返されます。
Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.
Created cluster [kafka-iot].
クラスタの作成には 20 ~ 30 分かかります。このオペレーションが完了するまで待ちます。
トピックを作成する
- クラスタに Managed Kafka トピックを作成します。ここでは、トピックの名前は
kafka-iot-topicです。
gcloud managed-kafka topics create kafka-iot-topic \
--cluster=kafka-iot \
--location=$REGION \
--partitions=10 \
--replication-factor=3
次のような出力が表示されます。
Created topic [kafka-iot-topic].
4. パブリッシャーを設定する
Managed Kafka クラスタに公開するには、Managed Kafka クラスタで使用されるサブネットを含む VPC にアクセスできる Google Compute Engine VM インスタンスを設定します。この VM は、DemoIOT Solutions が提供するセンサー デバイスをシミュレートします。
手順
- Google Compute Engine VM インスタンスを作成します。ここでは、GCE VM の名前は
publisher-instanceです。
gcloud compute instances create publisher-instance \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--subnet=$SUBNET_PATH \
--zone=$ZONE
- Google Compute Engine のデフォルトのサービス アカウントに、Managed Service for Apache Kafka を使用する権限を付与します。
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
--role=roles/managedkafka.client
- SSH で VM に接続します。または、Google Cloud コンソールを使用して SSH を使用します。
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
- Kafka コマンドライン ツールを実行するために Java をインストールし、次のコマンドを使用して Kafka バイナリをダウンロードします。
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
- Managed Kafka 認証ライブラリとその依存関係をダウンロードし、Kafka クライアントのプロパティを構成します。
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
パブリッシャー マシンの設定の詳細については、クライアント マシンを設定するをご覧ください。
5. Managed Kafka に公開する
パブリッシャーが設定されたので、Kafka コマンドラインを使用して、GCE VM(DemoIOT Solutions による IoT デバイスのシミュレーション)から Managed Kafka クラスタにダミーデータを公開できます。
- GCE VM インスタンスに SSH で接続したので、
PROJECT_ID変数を再設定する必要があります。
export PROJECT_ID=<project-id>
export REGION=<region>
次のように置き換えます。
<project-id>は、設定した GCP プロジェクトの名前に置き換えます。<region>は、Kafka クラスタが作成されたリージョンに置き換えます。
managed-kafka clusters describeコマンドを使用して、Kafka ブートストラップ サーバーの IP アドレスを取得します。このアドレスを使用して Kafka クラスタに接続できます。
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
- クラスタ内のトピックを一覧表示します。
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties
以前に作成したトピック kafka-iot-topic を含む次の出力が表示されます。
__remote_log_metadata
kafka-iot-topic
- このスクリプトをコピーして、新しいファイル
publish_iot_data.shに貼り付けます。GCE VM に新しいファイルを作成するには、vimやnanoなどのツールを使用します。
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash
NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10
generate_sensor_data() {
local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
local light_level=$((RANDOM % 1000))
echo "\"temperature\": $temperature,"
echo "\"humidity\": $humidity,"
echo "\"pressure\": $pressure,"
echo "\"light_level\": $light_level"
}
generate_location_data() {
local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))
echo "\"latitude\": $latitude,"
echo "\"longitude\": $longitude"
}
generate_device_status() {
local battery_level=$((RANDOM % 101))
local signal_strength=$((RANDOM % 80 - 100))
local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"
echo "\"battery_level\": $battery_level,"
echo "\"signal_strength\": $signal_strength,"
echo "\"connection_type\": \"$connection_type\""
}
publish_to_kafka() {
local device_index=$1
local message_index=$2
local device_id="sensor-$((device_index % NUM_IDS))"
local timestamp=$((start_time + (message_index * message_interval)))
local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")
local json_data=$(cat <<EOF
{
"device_id": "$device_id",
"timestamp": "$date",
"location": {
$(generate_location_data)
},
"sensor_data": {
$(generate_sensor_data)
},
"device_status": {
$(generate_device_status)
},
"metadata": {
"sensor_type": "environmental",
"unit_temperature": "Celsius",
"unit_humidity": "%" ,
"unit_pressure": "hPa",
"unit_light_level": "lux",
"firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
}
}
EOF
)
echo $json_data | jq -rc
}
for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
for device_index in $(seq 0 $((NUM_IDS - 1))); do
publish_to_kafka "$device_index" "$message_index"
done
done | kafka-console-producer.sh \
--topic kafka-iot-topic \
--bootstrap-server $1 \
--producer.config $2
説明
- このスクリプトは、デバイス ID、タイムスタンプ、センサーデータ(温度、湿度、圧力、光)、位置情報(緯度、経度)、デバイスのステータス(バッテリー、信号、接続タイプ)、メタデータを含む、シミュレートされたセンサー読み取り値を含む JSON メッセージを作成します。
- 指定された数の固有のデバイスからメッセージの連続フローを生成します。各デバイスは指定された時間間隔でデータを送信し、実際の IoT デバイスを模倣します。ここでは、10 台のデバイスからデータを公開します。各デバイスは 10 秒間隔で 20 個の読み取り値を生成します。
- また、Kafka プロデューサー コマンドライン ツールを使用して、生成されたすべてのデータを Kafka トピックに公開します。
- スクリプトで使用される依存関係をインストールします。数学計算用の
bcパッケージと JSON 処理用のjqパッケージです。
sudo apt-get install bc jq
- スクリプトを実行可能に変更して、スクリプトを実行します。実行には約 2 分かかります。
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties
このコマンドを実行すると、すべてのイベントが出力され、イベントが正常に公開されたことを確認できます。<control-c> を押して終了します。
kafka-console-consumer.sh \
--topic kafka-iot-topic \
--from-beginning \
--bootstrap-server $BOOTSTRAP \
--consumer.config client.properties
6. Dataproc クラスタを設定する
このセクションでは、Managed Kafka クラスタが存在する VPC サブネットワークに Dataproc クラスタを作成します。このクラスタは、DemoIOT Solutions に必要なリアルタイムの統計情報と分析情報を生成するジョブを実行するために使用されます。
- Dataproc クラスタを作成します。ここでは、クラスタの名前は
dataproc-iotです。
gcloud dataproc clusters create dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION \
--image-version=2.2-debian12 \
--enable-component-gateway \
--subnet=$SUBNET_PATH \
--master-machine-type=n1-standard-4 \
--worker-machine-type=n1-standard-4 \
--num-workers=2 \
--properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G
次のようなレスポンスが返されます。
Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.
Waiting for cluster creation operation...done.
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].
クラスタの作成には 10 ~ 15 分かかることがあります。このオペレーションが正常に完了するまで待ち、クラスタを記述してクラスタが RUNNING 状態であることを確認します。
gcloud dataproc clusters describe dataproc-iot \
--project=$PROJECT_ID \
--region=$REGION
7. Dataproc を使用して Kafka メッセージを処理する
最後のセクションでは、Spark Streaming を使用して公開されたメッセージを処理する Dataproc ジョブを送信します。このジョブは、DemoIOT Solutions で使用できるリアルタイムの統計情報と分析情報を生成します。
- このコマンドを実行して、
process_iot.pyという名前のストリーミング PySpark ジョブファイルをローカルに作成します。
cat > process_iot.py <<EOF
#!/bin/python
import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException
JSON_SCHEMA = StructType([
StructField("device_id", StringType()),
StructField("timestamp", StringType()),
StructField(
"location",
StructType([
StructField("latitude", FloatType()),
StructField("longitude", FloatType()),
]),
),
StructField(
"sensor_data",
StructType([
StructField("temperature", FloatType()),
StructField("humidity", FloatType()),
StructField("pressure", FloatType()),
StructField("light_level", IntegerType()),
]),
),
StructField(
"device_status",
StructType([
StructField("battery_level", IntegerType()),
StructField("signal_strength", IntegerType()),
StructField("connection_type", StringType()),
]),
),
StructField(
"metadata",
StructType([
StructField("sensor_type", StringType()),
StructField("unit_temperature", StringType()),
StructField("unit_humidity", StringType()),
StructField("unit_pressure", StringType()),
StructField("unit_light_level", StringType()),
StructField("firmware_version", StringType()),
]),
),
])
CLIENT_PROPERTY_KEYS = [
"security.protocol",
"sasl.mechanism",
"sasl.login.callback.handler.class",
"sasl.jaas.config",
]
def get_client_properties(client_properties_path: str):
# Parse client properties file
parsed_path = urlparse(client_properties_path)
if parsed_path.scheme != "gs":
raise ValueError("Invalid GCS path for client properties. Must start with gs://.")
bucket_name = parsed_path.netloc
blob_name = parsed_path.path.lstrip("/")
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
file_content = "[DEFAULT]\n" + blob.download_as_text()
config = configparser.ConfigParser()
config.read_string(file_content)
client_properties = dict()
for key in CLIENT_PROPERTY_KEYS:
client_properties[key] = config.get("DEFAULT", key)
print(f"Client properties: {client_properties}")
return client_properties
def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
print("Starting initial data processing...")
initial_rows = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_server_address)
.option("startingOffsets", "earliest")
.option("subscribe", "kafka-iot-topic")
.option("kafka.security.protocol", client_properties["security.protocol"])
.option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
.option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
.option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
.load()
)
initial_rows = (
initial_rows.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), JSON_SCHEMA).alias("data"))
.select("data.*")
)
# Print first 20 rows
def print_top_rows(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
batch_df.limit(20).show(truncate=False)
initial_query_print = initial_rows.writeStream \
.foreachBatch(print_top_rows) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_print)
# Calculate and print average temperatures
def process_initial_avg_temp(batch_df, batch_id):
if batch_df.count() > 0:
print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
current_averages = (
batch_df.select("device_id", "sensor_data.temperature")
.groupBy("device_id")
.agg(avg("temperature").alias("average_temperature"))
.orderBy("device_id")
)
current_averages.show(truncate=False)
initial_query_avg_temp = initial_rows.writeStream \
.foreachBatch(process_initial_avg_temp) \
.trigger(once=True) \
.start()
queries_to_await.append(initial_query_avg_temp)
# Write data to GCS
initial_data_gcs_writer = (
initial_rows.writeStream.outputMode("append")
.format("avro")
.option("path", store_data_gcs_path+"/tables/iot_avro/")
.option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
.trigger(once=True) \
.start()
)
queries_to_await.append(initial_data_gcs_writer)
def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):
client_properties = get_client_properties(client_properties_path)
# Create SparkSession
spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
queries_to_await = []
process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
# Wait for all queries to terminate
for query in queries_to_await:
try:
query.awaitTermination()
except Exception as e:
print(f"Error awaiting query: {e}")
finally:
query.stop()
spark.stop()
if __name__ == "__main__":
if len(sys.argv) < 4:
print("Invalid number of arguments passed ", len(sys.argv))
print(
"Usage: ",
sys.argv[0],
" <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
)
raise ValueError("Invalid number of arguments passed.")
parsed_data_path = urlparse(sys.argv[3])
if parsed_data_path.scheme != "gs":
raise ValueError("Invalid GCS path for storing data. Must start with gs://.")
main(sys.argv[1], sys.argv[2], sys.argv[3])
EOF
説明
- このコードは、指定された Kafka トピックからデータを読み取る PySpark Structured Streaming ジョブを設定します。指定された Kafka サーバーのブートストラップ アドレスと、GCS 構成ファイルから読み込まれた Kafka 構成を使用して、Kafka ブローカーに接続して認証します。
- まず、Kafka から未加工データをバイト配列のストリームとして読み取り、それらのバイト配列を文字列にキャストし、Spark の StructType を使用して
json_schemaを適用して、データの構造(デバイス ID、タイムスタンプ、位置情報、センサーデータなど)を指定します。 - 最初の 10 行をコンソールに出力してプレビューし、センサーごとの平均温度を計算して、すべてのデータを
avro形式で GCS バケットに書き込みます。Avro は行ベースのデータ シリアル化システムで、構造化データをコンパクトなスキーマ定義のバイナリ形式で効率的に保存し、スキーマの進化、言語の中立性、大規模なデータ処理の高圧縮を実現します。
client.propertiesファイルを作成し、Kafka サーバーのブートストラップ アドレスの環境変数を設定します。
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
process_iot.pyファイルとclient.propertiesファイルを Google Cloud Storage バケットにアップロードして、Dataproc ジョブで使用できるようにします。
gsutil cp process_iot.py client.properties $BUCKET
- Dataproc ジョブの依存関係 JAR を GCS バケットにコピーします。このディレクトリには、Kafka で Spark Streaming ジョブを実行するために必要な JAR、Managed Kafka 認証ライブラリとその依存関係が含まれています。これらは、クライアント マシンを設定するから取得されます。
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
- Spark ジョブを Dataproc クラスタに送信します。
gcloud dataproc jobs submit pyspark \
$BUCKET/process_iot.py \
--project=$PROJECT_ID \
--region=$REGION \
--cluster=dataproc-iot \
--properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
-- $BOOTSTRAP $BUCKET/client.properties $BUCKET
Spark ドライバのログが出力されます。これらのテーブルがコンソールに記録され、データが GCS バケットに保存されていることも確認できます。
25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp |location |sensor_data |device_status |metadata |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi} |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN} |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954 |
|sensor-1 |20.475 |
|sensor-2 |20.475 |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138 |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422 |
|sensor-9 |20.405000019073487 |
+---------+-------------------+
8. クリーンアップ
Codelab の完了後にリソースをクリーンアップする手順は次のとおりです。
- Managed Kafka クラスタ、Publisher GCE VM、Dataproc クラスタを削除します。
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
- VPC サブネットワークとネットワークを削除します。
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
- データを不要になった場合は、GCS バケットを削除します。
gcloud storage rm --recursive $BUCKET
9. 完了
お疲れさまでした。Managed Kafka と Dataproc を使用して IoT データ処理パイプラインを構築し、DemoIOT Solutions がデバイスから公開されたデータに関するリアルタイムの分析情報を取得できるようにしました。
Managed Kafka クラスタを作成し、IoT イベントを公開して、Spark Streaming を使用してこれらのイベントをリアルタイムで処理する Dataproc ジョブを実行しました。Managed Kafka と Dataproc を使用してデータ パイプラインを作成するために必要な主な手順を理解できました。