Dataproc 및 Apache Kafka용 Google 관리형 서비스를 사용한 실시간 IoT 데이터 처리

1. 소개

2efacab8643a653b.png

최종 업데이트: 2024년 6월 10일

배경

스마트 홈 솔루션부터 산업용 센서에 이르기까지 다양한 사물 인터넷 (IoT) 기기는 네트워크 에지에서 방대한 양의 데이터를 생성합니다. 이 데이터는 기기 모니터링, 추적, 진단, 감시, 맞춤설정, 차량 최적화 등 다양한 사용 사례에 매우 유용합니다. Apache Kafka용 Google 관리형 서비스는 OSS 호환 방식으로 사용하기 쉽고 안전한 방식으로 이러한 지속적인 데이터 스트림을 수집하고 저장할 수 있는 확장 가능하고 내구성이 뛰어난 방법을 제공하며, Google Cloud Dataproc을 사용하면 Apache Spark 및 Hadoop 클러스터를 사용하여 데이터 분석을 위해 이러한 대규모 데이터 세트를 처리할 수 있습니다.

빌드할 항목

이 Codelab에서는 Google Managed Service for Apache Kafka, Dataproc, Python, Apache Spark를 사용하여 실시간 분석을 실행하는 IoT 데이터 처리 파이프라인을 빌드합니다. 파이프라인은 다음을 수행합니다.

  • GCE VM을 사용하여 IoT 기기의 데이터를 관리 Kafka 클러스터에 게시
  • 관리형 Kafka 클러스터에서 Dataproc 클러스터로 데이터 스트리밍
  • Dataproc Spark Streaming 작업을 사용하여 데이터 처리

학습할 내용

  • Google 관리 Kafka 및 Dataproc 클러스터를 만드는 방법
  • Dataproc을 사용하여 스트리밍 작업을 실행하는 방법

필요한 항목

2. 개요

이 Codelab에서는 더미 회사인 DemoIOT Solutions의 이야기를 따라가 보겠습니다. DemoIOT Solutions는 온도, 습도, 압력, 조도, 위치 데이터를 측정하고 전송하는 센서 기기를 제공합니다. 이 데이터를 처리하여 고객에게 실시간 통계를 표시하는 파이프라인을 설정하려고 합니다. 이러한 파이프라인을 사용하여 고객이 센서를 설치한 장소에 관한 모니터링, 자동 추천, 알림, 통계와 같은 다양한 서비스를 고객에게 제공할 수 있습니다.

이를 위해 GCE VM을 사용하여 IoT 기기를 시뮬레이션합니다. 기기는 Google 관리 Kafka 클러스터의 Kafka 주제에 데이터를 게시하며, 이 데이터는 Dataproc 스트리밍 작업에 의해 읽히고 처리됩니다. 기본 요건 설정과 다음 페이지를 통해 이러한 모든 단계를 수행할 수 있습니다.

기본 요건 설정

  1. 프로젝트의 프로젝트 이름과 프로젝트 번호를 찾습니다. 프로젝트 이름, 번호, ID 찾기를 참고하세요.
  2. VPC 서브네트워크입니다. 이렇게 하면 GCE VM, Kafka 클러스터, Dataproc 클러스터 간에 연결이 가능해집니다. 이 페이지에 따라 gcloud CLI를 사용하여 기존 서브넷을 나열합니다. 필요한 경우 자동 모드 VPC 네트워크 만들기에 따라 각 Google Cloud 리전에 서브네트워크가 있는 VPC 네트워크를 만듭니다. 하지만 이 Codelab에서는 단일 리전의 서브넷만 사용합니다.
  • 이 서브넷에서 SSH에 필요한 tcp:22의 모든 인그레스를 허용하는 방화벽 규칙이 있는지 확인합니다. 이 규칙은 네트워크를 만들 때 방화벽 규칙 섹션에서 선택할 수 있으므로 선택해야 합니다.
  1. GCS 버킷입니다. Dataproc 작업 리소스를 저장하고 처리된 데이터를 유지하려면 Google Cloud 스토리지 버킷에 액세스해야 합니다. 계정이 없는 경우 GCP 프로젝트에서 계정을 만들 수 있습니다.

환경 변수 채우기

gcloud CLI를 실행하는 터미널에서 나중에 사용할 수 있도록 이러한 환경 변수를 채웁니다.

export PROJECT_ID=<project-id>
export PROJECT_NUMBER=<project-number>
export REGION=<region>
export ZONE=<zone>
export SUBNET_PATH=<subnet-path>
export BUCKET=gs://<bucket-name>

다음을 바꿉니다.

  • <project-id>를 설정한 GCP 프로젝트의 이름으로 바꿉니다.
  • <project-number>를 필수 단계 1의 프로젝트 번호 이름으로 바꿉니다.
  • <region>을 사용하려는 사용 가능한 리전 및 영역의 리전 이름으로 바꿉니다. 예를 들어 us-central1를 사용할 수 있습니다.
  • <zone>을 이전에 선택한 리전의 사용 가능한 리전 및 영역에 있는 영역의 이름으로 바꿉니다. 예를 들어 us-central1를 리전으로 선택한 경우 us-central1-f을 영역으로 사용할 수 있습니다. 이 영역은 IoT 기기를 시뮬레이션하는 GCE VM을 만드는 데 사용됩니다. 영역이 선택한 리전에 있는지 확인합니다.
  • <subnet-path>를 기본 요건 2단계의 서브넷 전체 경로로 바꿉니다. 이 값은 projects/<project-id>/regions/<region>/subnetworks/<subnet-name> 형식이어야 합니다.
  • <bucket-name>을 필수 요건 3단계의 GCS 버킷 이름으로 바꿉니다.

3. Google 관리형 Kafka 설정

이 섹션에서는 Kafka 서버를 배포하고 이 서버에 주제를 만드는 Google 관리 Kafka 클러스터를 설정합니다. 이 주제는 구독한 후 IoT 데이터를 게시하고 읽을 수 있습니다. DemoIOT Solutions는 모든 기기가 데이터를 게시하도록 이 클러스터를 설정할 수 있습니다.

관리형 Kafka 클러스터 만들기

  • 관리형 Kafka 클러스터를 만듭니다. 여기에서 클러스터의 이름은 kafka-iot입니다.
gcloud managed-kafka clusters create kafka-iot \
    --project=$PROJECT_ID \
    --location=$REGION \
    --cpu=3 \
    --memory=12GiB \
    --subnets=$SUBNET_PATH \
    --auto-rebalance

다음과 비슷한 응답이 표시됩니다.

Create request issued for: [kafka-iot]
Waiting for operation [projects/<project-id>/locations/<region>/operations/<operation-id>] to complete...done.                                         
Created cluster [kafka-iot].

클러스터를 만드는 데 20~30분 정도 걸립니다. 이 작업이 완료될 때까지 기다립니다.

주제 만들기

  • 클러스터에 관리형 Kafka 주제를 만듭니다. 여기에서 주제 이름은 kafka-iot-topic입니다.
gcloud managed-kafka topics create kafka-iot-topic \
    --cluster=kafka-iot \
    --location=$REGION \
    --partitions=10 \
    --replication-factor=3

다음과 비슷한 출력이 표시됩니다.

Created topic [kafka-iot-topic].

4. 게시자 설정

관리형 Kafka 클러스터에 게시하기 위해 관리형 Kafka 클러스터에서 사용하는 서브넷을 액세스할 수 있는 Google Compute Engine VM 인스턴스를 설정합니다. 이 VM은 DemoIOT Solutions에서 제공하는 센서 기기를 시뮬레이션합니다.

단계

  1. Google Compute Engine VM 인스턴스를 만듭니다. 여기서 GCE VM의 이름은 publisher-instance입니다.
gcloud compute instances create publisher-instance \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --subnet=$SUBNET_PATH \
    --zone=$ZONE
  1. Google Compute Engine 기본 서비스 계정에 Apache Kafka용 관리형 서비스를 사용할 권한을 부여합니다.
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role=roles/managedkafka.client
  1. SSH를 사용하여 VM에 연결합니다. 또는 Google Cloud 콘솔을 사용하여 SSH합니다.
gcloud compute ssh publisher-instance --project=$PROJECT_ID --zone=$ZONE
  1. Kafka 명령줄 도구를 실행하기 위해 Java를 설치하고 다음 명령어를 사용하여 Kafka 바이너리를 다운로드합니다.
sudo apt-get install default-jre wget
wget -O kafka_2.13-3.7.2.tgz  https://downloads.apache.org/kafka/3.7.2/kafka_2.13-3.7.2.tgz
tar xfz kafka_2.13-3.7.2.tgz
export KAFKA_HOME=$(pwd)/kafka_2.13-3.7.2
export PATH=$PATH:$KAFKA_HOME/bin
  1. 관리형 Kafka 인증 라이브러리와 종속 항목을 다운로드하고 Kafka 클라이언트 속성을 구성합니다.
wget https://github.com/googleapis/managedkafka/releases/download/v1.0.5/release-and-dependencies.zip
sudo apt-get install unzip
unzip -n release-and-dependencies.zip -d $KAFKA_HOME/libs/
find "$KAFKA_HOME/libs/release-and-dependencies" -type f -name "*.jar" -exec cp -n {} "$KAFKA_HOME/libs/" \;
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF

게시자 머신 설정에 관한 자세한 내용은 클라이언트 머신 설정을 참고하세요.

5. 관리형 Kafka에 게시

이제 게시자가 설정되었으므로 게시자에서 Kafka 명령줄을 사용하여 GCE VM (DemoIOT Solutions에서 IoT 기기를 시뮬레이션)의 더미 데이터를 관리 Kafka 클러스터에 게시할 수 있습니다.

  1. GCE VM 인스턴스에 SSH로 연결했으므로 PROJECT_ID 변수를 다시 채워야 합니다.
export PROJECT_ID=<project-id>
export REGION=<region>

다음을 바꿉니다.

  • <project-id>를 설정한 GCP 프로젝트의 이름으로 바꿉니다.
  • <region>을 Kafka 클러스터가 생성된 리전으로 바꿉니다.
  1. managed-kafka clusters describe 명령어를 사용하여 Kafka 부트스트랩 서버의 IP 주소를 가져옵니다. 이 주소는 Kafka 클러스터에 연결하는 데 사용할 수 있습니다.
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. 클러스터의 주제를 나열합니다.
kafka-topics.sh --list \
--bootstrap-server $BOOTSTRAP \
--command-config client.properties

이전에 만든 kafka-iot-topic 주제가 포함된 다음 출력이 표시됩니다.

__remote_log_metadata
kafka-iot-topic
  1. 이 스크립트를 복사하여 새 파일 publish_iot_data.sh에 붙여넣습니다. GCE VM에서 새 파일을 만들려면 vim 또는 nano과 같은 도구를 사용하면 됩니다.
vi publish_iot_data.sh
# OR (use any one command)
nano publish_iot_data.sh
#!/bin/bash

NUM_MESSAGES_PER_DEVICE=20
NUM_IDS=10
start_time=$(date -d "2025-03-01T00:00:00Z" +%s)
message_interval=10

generate_sensor_data() {
  local temperature=$(printf "%.1f" $(echo "scale=1; 20 + $RANDOM % 100 / 10.0" | bc -l))
  local humidity=$(printf "%.1f" $(echo "scale=1; 50 + $RANDOM % 500 / 10.0" | bc -l))
  local pressure=$(printf "%.1f" $(echo "scale=1; 1000 + $RANDOM % 500 / 10.0" | bc -l))
  local light_level=$((RANDOM % 1000))

  echo "\"temperature\": $temperature,"
  echo "\"humidity\": $humidity,"
  echo "\"pressure\": $pressure,"
  echo "\"light_level\": $light_level"
}

generate_location_data() {
  local latitude=$(printf "%.4f" $(echo "scale=2; 33.0 + $RANDOM % 66" | bc -l))
  local longitude=$(printf "%.4f" $(echo "scale=2; -120.0 + $RANDOM % 66" | bc -l))

  echo "\"latitude\": $latitude,"
  echo "\"longitude\": $longitude"
}

generate_device_status() {
  local battery_level=$((RANDOM % 101))
  local signal_strength=$((RANDOM % 80 - 100))
  local connection_types=("Wi-Fi" "Cellular" "LoRaWAN")
  local connection_type="${connection_types[$((RANDOM % ${#connection_types[@]}))]}"

  echo "\"battery_level\": $battery_level,"
  echo "\"signal_strength\": $signal_strength,"
  echo "\"connection_type\": \"$connection_type\""
}

publish_to_kafka() {
  local device_index=$1
  local message_index=$2
  local device_id="sensor-$((device_index % NUM_IDS))"
  local timestamp=$((start_time + (message_index * message_interval)))
  local date=$(date -u -d "@$timestamp" +"%Y-%m-%dT%H:%M:%SZ")

  local json_data=$(cat <<EOF
{
  "device_id": "$device_id",
  "timestamp": "$date",
  "location": {
$(generate_location_data)
  },
  "sensor_data": {
$(generate_sensor_data)
  },
  "device_status": {
$(generate_device_status)
  },
  "metadata": {
    "sensor_type": "environmental",
    "unit_temperature": "Celsius",
    "unit_humidity": "%" ,
    "unit_pressure": "hPa",
    "unit_light_level": "lux",
    "firmware_version": "$((RANDOM % 3 +1)).$((RANDOM % 10)).$((RANDOM % 10))"
  }
}
EOF
)

  echo $json_data | jq -rc
}

for message_index in $(seq 0 $((NUM_MESSAGES_PER_DEVICE - 1))); do
  for device_index in $(seq 0 $((NUM_IDS - 1))); do
    publish_to_kafka "$device_index" "$message_index"
  done
done | kafka-console-producer.sh \
    --topic kafka-iot-topic \
    --bootstrap-server $1 \
    --producer.config $2

설명

  • 이 스크립트는 기기 ID, 타임스탬프, 센서 데이터 (온도, 습도, 압력, 조명), 위치 정보 (위도, 경도), 기기 상태 (배터리, 신호, 연결 유형), 일부 메타데이터가 포함된 시뮬레이션된 센서 판독값이 있는 JSON 메시지를 만듭니다.
  • 지정된 시간 간격으로 데이터를 전송하는 고유 기기에서 연속적인 메시지 흐름을 생성하여 실제 IoT 기기를 모방합니다. 여기서는 각각 20개의 판독값을 생성하는 10개 기기의 데이터를 10초 시간 간격으로 게시합니다.
  • 또한 Kafka 생산자 명령줄 도구를 사용하여 생성된 모든 데이터를 Kafka 주제에 게시합니다.
  1. 스크립트에서 사용하는 종속 항목(수학적 계산을 위한 bc 패키지, JSON 처리를 위한 jq 패키지)을 설치합니다.
sudo apt-get install bc jq
  1. 스크립트를 실행 파일로 수정하고 스크립트를 실행합니다. 실행하는 데 약 2분이 소요됩니다.
chmod +x publish_iot_data.sh
./publish_iot_data.sh $BOOTSTRAP client.properties

모든 이벤트를 출력하는 이 명령어를 실행하여 이벤트가 게시되었는지 확인할 수 있습니다. 종료하려면 <control-c>을 누르세요.

kafka-console-consumer.sh \
    --topic kafka-iot-topic \
    --from-beginning \
    --bootstrap-server $BOOTSTRAP \
    --consumer.config client.properties

6. Dataproc 클러스터 설정

이 섹션에서는 관리형 Kafka 클러스터가 있는 VPC 서브넷에 Dataproc 클러스터를 만듭니다. 이 클러스터는 DemoIOT Solutions에 필요한 실시간 통계와 인사이트를 생성하는 작업을 실행하는 데 사용됩니다.

  1. Dataproc 클러스터를 만듭니다. 여기서 클러스터의 이름은 dataproc-iot입니다.
gcloud dataproc clusters create dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION \
    --image-version=2.2-debian12 \
    --enable-component-gateway \
    --subnet=$SUBNET_PATH \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2 \
    --properties=spark:spark.driver.memory=6G,spark:spark.driver.executor=4G

다음과 비슷한 응답이 표시됩니다.

Waiting on operation [projects/<project-id>/regions/<region>/operations/<operation-id>.                                                                                                                                                                      
Waiting for cluster creation operation...done.                                                                                                                                                                    
Created [https://dataproc.googleapis.com/v1/projects/<project-id>/regions/<region>/clusters/dataproc-iot] Cluster placed in zone [<zone>].

클러스터를 만드는 데 10~15분 정도 걸릴 수 있습니다. 이 작업이 완료될 때까지 기다린 후 클러스터를 설명하여 클러스터가 RUNNING 상태인지 확인합니다.

gcloud dataproc clusters describe dataproc-iot \
    --project=$PROJECT_ID \
    --region=$REGION

7. Dataproc을 사용하여 Kafka 메시지 처리

이 마지막 섹션에서는 Spark Streaming을 사용하여 게시된 메시지를 처리하는 Dataproc 작업을 제출합니다. 이 작업은 DemoIOT Solutions에서 사용할 수 있는 실시간 통계와 유용한 정보를 실제로 생성합니다.

  1. 다음 명령어를 실행하여 process_iot.py라는 스트리밍 PySpark 작업 파일을 로컬로 만듭니다.
cat > process_iot.py <<EOF

#!/bin/python

import sys
import configparser
from google.cloud import storage
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType
from urllib.parse import urlparse
from pyspark.sql.utils import AnalysisException

JSON_SCHEMA = StructType([
    StructField("device_id", StringType()),
    StructField("timestamp", StringType()),
    StructField(
        "location",
        StructType([
            StructField("latitude", FloatType()),
            StructField("longitude", FloatType()),
        ]),
    ),
    StructField(
        "sensor_data",
        StructType([
            StructField("temperature", FloatType()),
            StructField("humidity", FloatType()),
            StructField("pressure", FloatType()),
            StructField("light_level", IntegerType()),
        ]),
    ),
    StructField(
        "device_status",
        StructType([
            StructField("battery_level", IntegerType()),
            StructField("signal_strength", IntegerType()),
            StructField("connection_type", StringType()),
        ]),
    ),
    StructField(
        "metadata",
        StructType([
            StructField("sensor_type", StringType()),
            StructField("unit_temperature", StringType()),
            StructField("unit_humidity", StringType()),
            StructField("unit_pressure", StringType()),
            StructField("unit_light_level", StringType()),
            StructField("firmware_version", StringType()),
        ]),
    ),
])
CLIENT_PROPERTY_KEYS = [
    "security.protocol",
    "sasl.mechanism",
    "sasl.login.callback.handler.class",
    "sasl.jaas.config",
]


def get_client_properties(client_properties_path: str):
    # Parse client properties file
    parsed_path = urlparse(client_properties_path)
    if parsed_path.scheme != "gs":
        raise ValueError("Invalid GCS path for client properties. Must start with gs://.")

    bucket_name = parsed_path.netloc
    blob_name = parsed_path.path.lstrip("/")

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    file_content = "[DEFAULT]\n" + blob.download_as_text()

    config = configparser.ConfigParser()
    config.read_string(file_content)

    client_properties = dict()
    for key in CLIENT_PROPERTY_KEYS:
        client_properties[key] = config.get("DEFAULT", key)

    print(f"Client properties: {client_properties}")
    return client_properties

def process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path):
    print("Starting initial data processing...")
    initial_rows = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_server_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", "kafka-iot-topic")
        .option("kafka.security.protocol", client_properties["security.protocol"])
        .option("kafka.sasl.mechanism", client_properties["sasl.mechanism"])
        .option("kafka.sasl.login.callback.handler.class", client_properties["sasl.login.callback.handler.class"])
        .option("kafka.sasl.jaas.config", client_properties["sasl.jaas.config"])
        .load()
    )

    initial_rows = (
        initial_rows.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), JSON_SCHEMA).alias("data"))
        .select("data.*")
    )
    
    # Print first 20 rows
    def print_top_rows(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for printing first 20 rows: batch {batch_id}, size: {batch_df.count()}")
            batch_df.limit(20).show(truncate=False)

    initial_query_print = initial_rows.writeStream \
        .foreachBatch(print_top_rows) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_print)

    # Calculate and print average temperatures
    def process_initial_avg_temp(batch_df, batch_id):
        if batch_df.count() > 0:
            print(f"Processing initial batch for avg temp: batch {batch_id}, size: {batch_df.count()}")
            current_averages = (
                batch_df.select("device_id", "sensor_data.temperature")
                .groupBy("device_id")
                .agg(avg("temperature").alias("average_temperature"))
                .orderBy("device_id")
            )
            current_averages.show(truncate=False)

    initial_query_avg_temp = initial_rows.writeStream \
        .foreachBatch(process_initial_avg_temp) \
        .trigger(once=True) \
        .start()
    queries_to_await.append(initial_query_avg_temp)
    
    # Write data to GCS
    initial_data_gcs_writer = (
        initial_rows.writeStream.outputMode("append")
        .format("avro")
        .option("path", store_data_gcs_path+"/tables/iot_avro/")
        .option("checkpointLocation", store_data_gcs_path+"/chkpt/avro/")
        .trigger(once=True) \
        .start()
    )
    queries_to_await.append(initial_data_gcs_writer)


def main(bootstrap_server_address, client_properties_path, store_data_gcs_path):

    client_properties = get_client_properties(client_properties_path)

    # Create SparkSession
    spark = SparkSession.builder.appName("IotStreamingParser").getOrCreate()
    
    queries_to_await = []

    process_initial_data(spark, bootstrap_server_address, client_properties, queries_to_await, store_data_gcs_path)
    
    # Wait for all queries to terminate
    for query in queries_to_await:
        try:
            query.awaitTermination()
        except Exception as e:
            print(f"Error awaiting query: {e}")
        finally:
            query.stop()

    spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Invalid number of arguments passed ", len(sys.argv))
        print(
            "Usage: ",
            sys.argv[0],
            " <bootstrap_server_address> <client_properties_path> <gcs_path_to_store_data>",
        )
        raise ValueError("Invalid number of arguments passed.")

    parsed_data_path = urlparse(sys.argv[3])
    if parsed_data_path.scheme != "gs":
        raise ValueError("Invalid GCS path for storing data. Must start with gs://.")


    main(sys.argv[1], sys.argv[2], sys.argv[3])


EOF

설명

  • 이 코드는 지정된 Kafka 주제에서 데이터를 읽는 PySpark 구조적 스트리밍 작업을 설정합니다. 제공된 Kafka 서버 부트스트랩 주소와 GCS 구성 파일에서 로드된 Kafka 구성을 사용하여 Kafka 브로커에 연결하고 인증합니다.
  • 먼저 Kafka에서 원시 데이터를 바이트 배열 스트림으로 읽고 이러한 바이트 배열을 문자열로 변환한 다음 Spark의 StructType을 사용하여 데이터 구조 (기기 ID, 타임스탬프, 위치, 센서 데이터 등)를 지정하여 json_schema를 적용합니다.
  • 처음 10개 행을 콘솔에 출력하여 미리 보고 센서별 평균 온도를 계산하고 모든 데이터를 avro 형식으로 GCS 버킷에 씁니다. Avro는 구조화된 데이터를 간결한 스키마 정의 바이너리 형식으로 효율적으로 저장하는 행 기반 데이터 직렬화 시스템으로, 대규모 데이터 처리를 위한 스키마 진화, 언어 중립성, 높은 압축률을 제공합니다.
  1. client.properties 파일을 만들고 Kafka 서버의 부트스트랩 주소에 대한 환경 변수를 채웁니다.
cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
EOF
export BOOTSTRAP=$(gcloud managed-kafka clusters describe kafka-iot --project=$PROJECT_ID --location=$REGION --format='value(bootstrapAddress)')
  1. Dataproc 작업에서 사용할 수 있도록 process_iot.pyclient.properties 파일을 Google Cloud Storage 버킷에 업로드합니다.
gsutil cp process_iot.py client.properties $BUCKET
  1. Dataproc 작업의 종속성 jar를 GCS 버킷에 복사합니다. 이 디렉터리에는 Kafka로 Spark Streaming 작업을 실행하는 데 필요한 JAR와 클라이언트 머신 설정에서 가져온 관리형 Kafka 인증 라이브러리 및 종속 항목이 포함되어 있습니다.
gsutil -m cp -R gs://dataproc-codelabs/managed-kafka-dependencies/ $BUCKET
  1. Dataproc 클러스터에 Spark 작업을 제출합니다.
gcloud dataproc jobs submit pyspark \
    $BUCKET/process_iot.py \
    --project=$PROJECT_ID \
    --region=$REGION \
    --cluster=dataproc-iot \
    --properties=spark.jars=$BUCKET/managed-kafka-dependencies/* \
    -- $BOOTSTRAP $BUCKET/client.properties $BUCKET

Spark 드라이버 로그가 출력됩니다. 또한 이러한 테이블이 콘솔에 로깅되고 데이터가 GCS 버킷에 저장된 것을 확인할 수 있습니다.

25/06/11 05:16:51 INFO AppInfoParser: Kafka version: 3.7.1
25/06/11 05:16:51 INFO AppInfoParser: Kafka commitId: e2494e6ffb89f828
25/06/11 05:16:51 INFO AppInfoParser: Kafka startTimeMs: 1749619011472
Processing initial batch for avg temp: batch 0, size: 200
Processing initial batch for printing first 20 rows: batch 0, size: 200
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|device_id|timestamp           |location        |sensor_data              |device_status       |metadata                                    |
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+
|sensor-6 |2025-03-01T00:03:00Z|{33.42, -119.66}|{20.9, 50.7, 1003.2, 525}|{70, -41, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.6.0}|
|sensor-7 |2025-03-01T00:03:00Z|{33.32, -119.4} |{20.1, 51.0, 1000.9, 611}|{47, -98, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.4.5}|
|sensor-8 |2025-03-01T00:03:00Z|{33.46, -119.74}|{20.8, 54.3, 1001.3, 734}|{70, -45, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.6}|
|sensor-9 |2025-03-01T00:03:00Z|{33.04, -119.38}|{20.9, 54.5, 1002.9, 956}|{91, -99, Cellular} |{environmental, Celsius, %, hPa, lux, 2.7.4}|
|sensor-0 |2025-03-01T00:03:10Z|{33.22, -119.56}|{20.4, 53.0, 1000.2, 239}|{41, -95, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 2.1.3}|
|sensor-1 |2025-03-01T00:03:10Z|{33.62, -119.4} |{20.0, 53.8, 1000.2, 907}|{97, -84, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.3.1}|
|sensor-2 |2025-03-01T00:03:10Z|{33.56, -119.66}|{20.1, 51.6, 1004.8, 824}|{37, -36, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.5.1}|
|sensor-3 |2025-03-01T00:03:10Z|{33.32, -120.0} |{20.6, 52.5, 1004.1, 557}|{96, -70, Cellular} |{environmental, Celsius, %, hPa, lux, 2.9.4}|
|sensor-4 |2025-03-01T00:03:10Z|{33.48, -119.36}|{20.2, 53.7, 1002.8, 818}|{22, -76, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.2}|
|sensor-5 |2025-03-01T00:03:10Z|{33.48, -119.42}|{20.7, 51.5, 1002.7, 310}|{92, -44, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.5.9}|
|sensor-6 |2025-03-01T00:03:10Z|{33.02, -119.8} |{20.4, 50.4, 1001.0, 190}|{36, -90, Cellular} |{environmental, Celsius, %, hPa, lux, 1.4.5}|
|sensor-7 |2025-03-01T00:03:10Z|{33.16, -119.72}|{20.8, 54.2, 1004.9, 509}|{19, -38, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 2.9.8}|
|sensor-8 |2025-03-01T00:03:10Z|{33.52, -119.9} |{20.0, 54.5, 1003.8, 10} |{29, -31, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.0.5}|
|sensor-9 |2025-03-01T00:03:10Z|{33.64, -119.64}|{20.4, 52.4, 1003.7, 246}|{53, -64, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 1.7.3}|
|sensor-8 |2025-03-01T00:00:40Z|{33.46, -119.42}|{20.0, 52.8, 1003.1, 311}|{85, -67, Cellular} |{environmental, Celsius, %, hPa, lux, 2.2.9}|
|sensor-9 |2025-03-01T00:00:40Z|{33.62, -119.98}|{20.0, 53.5, 1004.1, 502}|{22, -26, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.1.3}|
|sensor-0 |2025-03-01T00:00:50Z|{33.0, -119.38} |{20.1, 53.1, 1003.2, 500}|{49, -84, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 3.0.2}|
|sensor-1 |2025-03-01T00:00:50Z|{33.46, -119.48}|{20.1, 51.6, 1001.3, 982}|{52, -86, Wi-Fi}    |{environmental, Celsius, %, hPa, lux, 3.0.7}|
|sensor-2 |2025-03-01T00:00:50Z|{33.56, -119.74}|{20.3, 52.9, 1004.2, 367}|{29, -100, Cellular}|{environmental, Celsius, %, hPa, lux, 3.2.6}|
|sensor-3 |2025-03-01T00:00:50Z|{33.54, -119.98}|{20.5, 51.2, 1004.2, 657}|{79, -50, LoRaWAN}  |{environmental, Celsius, %, hPa, lux, 1.0.0}|
+---------+--------------------+----------------+-------------------------+--------------------+--------------------------------------------+

25/06/11 05:17:10 INFO AppInfoParser: App info kafka.admin.client for adminclient-2 unregistered
25/06/11 05:17:10 INFO Metrics: Metrics scheduler closed
25/06/11 05:17:10 INFO Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
25/06/11 05:17:10 INFO Metrics: Metrics reporters closed
+---------+-------------------+
|device_id|average_temperature|
+---------+-------------------+
|sensor-0 |20.45999994277954  |
|sensor-1 |20.475             |
|sensor-2 |20.475             |
|sensor-3 |20.405000305175783 |
|sensor-4 |20.42000017166138  |
|sensor-5 |20.464999961853028 |
|sensor-6 |20.579999923706055 |
|sensor-7 |20.544999885559083 |
|sensor-8 |20.41999969482422  |
|sensor-9 |20.405000019073487 |
+---------+-------------------+

8. 삭제

단계에 따라 Codelab을 완료한 후 리소스를 정리합니다.

  1. 관리형 Kafka 클러스터, 게시자 GCE VM, Dataproc 클러스터를 삭제합니다.
gcloud managed-kafka clusters delete kafka-iot --project=$PROJECT_ID --location=$REGION
gcloud compute instances delete publisher-instance --project=$PROJECT_ID --zone=$ZONE
gcloud dataproc clusters delete dataproc-iot --project=$PROJECT_ID --region=$REGION
  1. VPC 서브네트워크 및 네트워크를 삭제합니다.
gcloud compute networks subnets delete $SUBNET --region=$REGION
gcloud compute networks delete <network-name>
  1. 더 이상 데이터를 사용하지 않으려면 GCS 버킷을 삭제합니다.
gcloud storage rm --recursive $BUCKET

9. 축하합니다

축하합니다. DemoIOT Solutions가 기기에서 게시한 데이터에 대한 실시간 유용한 정보를 얻을 수 있도록 Manage Kafka 및 Dataproc으로 IoT 데이터 처리 파이프라인을 성공적으로 빌드했습니다.

관리형 Kafka 클러스터를 만들고, 여기에 IoT 이벤트를 게시하고, Spark 스트리밍을 사용하여 이러한 이벤트를 실시간으로 처리하는 Dataproc 작업을 실행했습니다. 이제 관리형 Kafka와 Dataproc을 사용하여 데이터 파이프라인을 만드는 데 필요한 주요 단계를 알게 되었습니다.

참조 문서