Khoa học dữ liệu với Spark

1. Tổng quan

TheLook là một nhà bán lẻ quần áo thương mại điện tử giả định, lưu trữ dữ liệu về khách hàng, sản phẩm, đơn đặt hàng, hoạt động hậu cần, sự kiện trên web và chiến dịch tiếp thị kỹ thuật số trong BigQuery. Công ty muốn tận dụng kiến thức chuyên môn hiện có của nhóm về SQL và PySpark để phân tích dữ liệu này bằng Apache Spark.

Để tránh việc cung cấp hoặc điều chỉnh cơ sở hạ tầng theo cách thủ công cho Spark, TheLook đang tìm kiếm một giải pháp tự động cấp tài nguyên bổ sung để họ có thể tập trung vào khối lượng công việc thay vì quản lý cụm. Ngoài ra, họ muốn giảm thiểu công sức cần thiết để tích hợp Spark và BigQuery trong khi vẫn ở trong môi trường BigQuery Studio.

Trong phòng thí nghiệm này, bạn sẽ dự đoán liệu người dùng có mua hàng hay không bằng cách tạo một Trình phân loại hồi quy logistic bằng PySpark, đồng thời tận dụng tính năng tích hợp sổ tay gốc và các tính năng AI của BigQuery Studio để khám phá dữ liệu. Bạn triển khai mô hình này vào một máy chủ suy luận và tạo một tác nhân để truy vấn mô hình bằng ngôn ngữ tự nhiên.

Điều kiện tiên quyết

Trước khi bắt đầu lớp học này, bạn nên làm quen với:

  • Lập trình cơ bản bằng SQL và Python.
  • Chạy mã Python trong sổ tay Jupyter.
  • Hiểu biết cơ bản về điện toán phân tán

Mục tiêu

  • Sử dụng sổ tay BigQuery Studio để chạy quy trình khoa học dữ liệu.
  • Tạo kết nối đến Apache Spark bằng Google Cloud Serverless cho Apache Spark và được hỗ trợ bởi Spark Connect.
  • Sử dụng Lightning Engine để tăng tốc khối lượng công việc Apache Spark lên đến 4,3 lần.
  • Tải dữ liệu từ BigQuery bằng cách sử dụng chế độ tích hợp sẵn giữa Apache Spark và BigQuery.
  • Khám phá dữ liệu bằng tính năng tạo mã có sự hỗ trợ của Gemini.
  • Thực hiện kỹ thuật trích xuất tính chất bằng cách sử dụng khung xử lý dữ liệu của Apache Spark.
  • Huấn luyện và đánh giá một mô hình phân loại bằng cách sử dụng thư viện học máy gốc của Apache Spark, MLlib.
  • Triển khai một máy chủ suy luận cho mô hình phân loại bằng cách sử dụng FlaskCloud Run
  • Triển khai một tác nhân để truy vấn máy chủ suy luận bằng ngôn ngữ tự nhiên bằng Agent EngineBộ công cụ phát triển tác nhân (ADK),

2. Kết nối với môi trường thời gian chạy Colab

Xác định một dự án trên Google Cloud

Tạo một dự án trên Google Cloud. Bạn có thể sử dụng một tài khoản hiện có.

Nhấp vào đây để bật các API sau:

  1. Trong Google Cloud Console, hãy chuyển đến Trình đơn điều hướng > BigQuery.

Một mũi tên đang chỉ vào thẻ BigQuery trong bảng điều khiển Google Cloud.

  1. Trong ngăn BigQuery Studio, hãy nhấp vào nút mũi tên thả xuống, di chuột qua Notebook rồi chọn Tải lên.

11fd85757040c058.png

  1. Chọn nút chọn URL rồi nhập URL sau:
https://github.com/GoogleCloudPlatform/devrel-demos/blob/main/data-analytics/dataproc-webinar/data-science/Spark_Data_Science.ipynb
  1. Đặt vùng thành us-central11 rồi nhấp vào Tải lên.

1f2743e9f0a37b3c.png

  1. Để mở sổ tay, hãy nhấp vào mũi tên thả xuống trong ngăn Explorer (Trình khám phá) có tên project-id của bạn. Sau đó, nhấp vào trình đơn thả xuống Sổ tay. Nhấp vào sổ ghi chú Spark_Data_Science.

aef016c292c8382.png

  1. Thu gọn trình đơn điều hướng BigQueryMục lục của sổ tay để có thêm không gian.

1c4b49de92ade1d9.png

3. Kết nối với một thời gian chạy và chạy mã thiết lập bổ sung

  1. Nhấp vào Kết nối. Trong cửa sổ bật lên, hãy uỷ quyền cho Colab Enterprise bằng tài khoản email của bạn. Sổ tay của bạn sẽ tự động kết nối với một thời gian chạy.

995465ba6dbfa550.png

  1. Sau khi thiết lập thời gian chạy, bạn sẽ thấy những thông tin sau:

7f917e7c54a84c91.png

  1. Trong sổ tay, hãy di chuyển đến phần Thiết lập. Bắt đầu tại đây.

4. Chạy mã thiết lập

Định cấu hình môi trường của bạn bằng các thư viện Python cần thiết để hoàn thành bài tập thực hành. Định cấu hình Private Google Access. Tạo một bộ chứa Storage. Tạo một tập dữ liệu BigQuery. Sao chép mã dự án vào sổ tay. Chọn một vùng. Trong phòng thí nghiệm này, hãy sử dụng khu vực us-central1.

Bạn có thể thực thi một ô mã bằng cách di chuột vào bên trong khối ô và nhấp vào mũi tên.

9b8ccb7d6016ebb9.png

# Enable APIs
import subprocess

command = [
    "gcloud",
    "services",
    "enable",
    "aiplatform.googleapis.com",
    "bigquery.googleapis.com",
    "bigquerystorage.googleapis.com",
    "bigqueryunified.googleapis.com",
    "cloudaicompanion.googleapis.com",
    "dataproc.googleapis.com",
    "run.googleapis.com",
    "storage.googleapis.com"
]

result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)

# Configure a PROJECT_ID and REGION
PROJECT_ID = "<YOUR_PROJECT_ID>"
REGION = "<YOUR_REGION>"

# Enable Private Google Access
import subprocess

command = [
    "gcloud",
    "compute",
    "networks",
    "subnets",
    "update",
    "default",
    f"--region={REGION}",
    "--enable-private-ip-google-access"
]

result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)

# Create a Cloud Storage Bucket
from google.cloud import storage
from google.cloud.exceptions import NotFound

BUCKET_NAME = f"{PROJECT_ID}-demo"

storage_client = storage.Client(project=PROJECT_ID)
try:
    bucket = storage_client.get_bucket(BUCKET_NAME)
    print(f"Bucket {BUCKET_NAME} already exists.")
except NotFound:
    bucket = storage_client.create_bucket(BUCKET_NAME, location=REGION)
    print(f"Bucket {BUCKET_NAME} created.")


# Create a BigQuery dataset.
from google.cloud import bigquery

DATASET_ID = f"{PROJECT_ID}.demo"

client = bigquery.Client()

dataset = bigquery.Dataset(DATASET_ID)

dataset.location = REGION

dataset = client.create_dataset(dataset, exists_ok=True)

5. Tạo kết nối với Google Cloud Serverless cho Apache Spark

Khi sử dụng Spark Connect, bạn sẽ kết nối với một phiên Spark không máy chủ để chạy các công việc Spark tương tác. Bạn định cấu hình thời gian chạy bằng Lightning Engine để có hiệu suất Spark nâng cao. Lightning Engine hoạt động bằng cách tăng tốc khối lượng công việc bằng Apache GlutenVelox.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session = Session()

session.runtime_config.version = "3.0"

# You can optionally configure Spark properties as well. See https://cloud.google.com/dataproc-serverless/docs/concepts/properties.
session.runtime_config.properties = {
  "dataproc.runtime": "premium",
  "spark.dataproc.engine": "lightningEngine",
}

# To avoid going over quota in this demo, cap the max number of Spark workers.
session.runtime_config.properties = {
    "spark.dynamicAllocation.maxExecutors": "4"
}

spark = (
    DataprocSparkSession.builder
      .appName("CustomSparkSession")
      .dataprocSessionConfig(session)
      .getOrCreate()
)

6. Tải và khám phá dữ liệu bằng Gemini

Trong phần này, bạn sẽ thực hiện bước quan trọng đầu tiên trong mọi dự án khoa học dữ liệu: chuẩn bị dữ liệu. Bạn bắt đầu bằng cách tải dữ liệu vào một khung dữ liệu Apache Spark từ BigQuery.

# Load the tables
order_items = spark.read.format("bigquery").option("table", "bigquery-public-data.thelook_ecommerce.order_items").load()

users = spark.read.format("bigquery").option("table", "bigquery-public-data.thelook_ecommerce.users").load()

# Register temp tables
users.createOrReplaceTempView("users")
order_items.createOrReplaceTempView("order_items")

# Verify temp tables
spark.sql("SELECT * FROM order_items LIMIT 10").show()

Sau đó, bạn sử dụng Gemini để tạo mã PySpark nhằm khám phá và hiểu rõ hơn về dữ liệu.

200d3133ea7d410b.png

Câu lệnh 1: Sử dụng PySpark, khám phá bảng người dùng và cho thấy 10 hàng đầu tiên.

# prompt:  Using PySpark, explore the users table and show the first 10 rows.

users.show(10)

Câu lệnh 2: Sử dụng PySpark, khám phá bảng order_items và hiển thị 10 hàng đầu tiên.

# prompt: Using PySpark, explore the order_items table and show the first 10 rows.

order_items.show(10)

Câu lệnh 3: Sử dụng PySpark, cho biết 5 quốc gia xuất hiện thường xuyên nhất trong bảng người dùng. Hiển thị quốc gia và số lượng người dùng ở mỗi quốc gia.

# prompt: Using PySpark, show the top 5 most frequent countries in the users table. Display the country and the number of users from each country.

from pyspark.sql.functions import col, count

users.groupBy("country").agg(count("*").alias("user_count")).orderBy(col("user_count").desc()).limit(5).show()

Câu lệnh 4: Sử dụng PySpark để tìm giá bán trung bình của các mặt hàng trong bảng order_items.

# prompt: Using PySpark, find the average sale price of items in the order_items table.

from pyspark.sql import functions as F

average_sale_price = order_items.agg(F.avg("sale_price").alias("average_sale_price"))
average_sale_price.show()

Câu lệnh 5: Sử dụng bảng "users", tạo mã để vẽ biểu đồ quốc gia so với nguồn lưu lượng truy cập bằng cách sử dụng một thư viện vẽ biểu đồ phù hợp.

# prompt: Using the table "users", generate code to plot country vs traffic source using a suitable plotting library.

sql = """
    SELECT
        country,
        traffic_source
    FROM
        `bigquery-public-data.thelook_ecommerce.users`
    WHERE country IS NOT NULL AND traffic_source IS NOT NULL
"""
project_id = "iceberg-summit-2025"
df = pandas_gbq.read_gbq(sql, project_id=project_id, dialect="standard")

import matplotlib.pyplot as plt
import seaborn as sns

# Group by country and traffic_source and count occurrences
df_grouped = df.groupby(['country', 'traffic_source']).size().reset_index(name='count')

# Create a pivot table for easier plotting
pivot_table = df_grouped.pivot(index='country', columns='traffic_source', values='count').fillna(0)

# Plotting
plt.figure(figsize=(15, 8))
pivot_table.plot(kind='bar', stacked=True, figsize=(15, 8))
plt.title('Traffic Source Distribution by Country')
plt.xlabel('Country')
plt.ylabel('Number of Users')
plt.xticks(rotation=90)
plt.legend(title='Traffic Source')
plt.tight_layout()
plt.show()

Câu lệnh 6: Tạo biểu đồ tần suất cho thấy sự phân bổ của "độ tuổi", "quốc gia", "giới tính", "traffic_source".

# prompt: Create a histogram showing the distribution of "age", "country", "gender", "traffic_source".

import matplotlib.pyplot as plt

# Convert Spark DataFrame to Pandas DataFrame for visualization
users_pd = users.toPandas()

# Create histograms for 'age', 'country', 'gender', 'traffic_source'
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('Distribution of User Attributes')

# Age distribution
axes[0, 0].hist(users_pd['age'].dropna(), bins=20, edgecolor='black')
axes[0, 0].set_title('Age Distribution')
axes[0, 0].set_xlabel('Age')
axes[0, 0].set_ylabel('Number of Users')

# Country distribution
users_pd['country'].value_counts().head(10).plot(kind='bar', ax=axes[0, 1])
axes[0, 1].set_title('Top 10 Countries Distribution')
axes[0, 1].set_xlabel('Country')
axes[0, 1].set_ylabel('Number of Users')
axes[0, 1].tick_params(axis='x', rotation=45)

# Gender distribution
users_pd['gender'].value_counts().plot(kind='bar', ax=axes[1, 0])
axes[1, 0].set_title('Gender Distribution')
axes[1, 0].set_xlabel('Gender')
axes[1, 0].set_ylabel('Number of Users')
axes[1, 0].tick_params(axis='x', rotation=0)

# Traffic Source distribution
users_pd['traffic_source'].value_counts().head(10).plot(kind='bar', ax=axes[1, 1])
axes[1, 1].set_title('Top 10 Traffic Source Distribution')
axes[1, 1].set_xlabel('Traffic Source')
axes[1, 1].set_ylabel('Number of Users')
axes[1, 1].tick_params(axis='x', rotation=45)

plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()

7. Chuẩn bị dữ liệu và kỹ thuật trích xuất tính chất

Tiếp theo, bạn thực hiện kỹ thuật trích xuất tính chất trên dữ liệu. Chọn các cột thích hợp, chuyển đổi dữ liệu thành các kiểu dữ liệu phù hợp hơn và xác định một cột nhãn.

features = spark.sql("""
SELECT
  CAST(u.age AS DOUBLE) AS age,
  CAST(hash(u.country) AS BIGINT) * 1.0 AS country_hash,
  CAST(hash(u.gender) AS BIGINT) * 1.0 AS gender_hash,
  CAST(hash(u.traffic_source) AS BIGINT) * 1.0 AS traffic_source_hash,
  CASE WHEN COUNT(oi.id) > 0 THEN 1 ELSE 0 END AS label -- Changed label generation to count order items
FROM users AS u
LEFT JOIN order_items AS oi
ON u.id = oi.user_id
GROUP BY u.id, u.age, u.country, u.gender, u.traffic_source
""")
features.show()

8. Huấn luyện mô hình hồi quy logistic

Bằng cách sử dụng MLlib, bạn sẽ huấn luyện một mô hình hồi quy logistic. Trước tiên, bạn dùng VectorAssembler để chuyển đổi dữ liệu sang định dạng vectơ. Sau đó, StandardScaler sẽ điều chỉnh tỷ lệ cột tính năng để có hiệu suất tốt hơn. Sau đó, bạn tạo một tham chiếu đến mô hình LogisticRegression và xác định các siêu tham số. Bạn kết hợp các bước này thành một đối tượng Pipeline, huấn luyện mô hình bằng hàm fit() và biến đổi dữ liệu bằng hàm transform().

from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.functions import array_to_vector

#Split Train and Test Data (80:20)
train_data, test_data = features.randomSplit([0.8, 0.2], seed=42)

# Initialize VectorAssembler
assembler = VectorAssembler(
    inputCols=["age", "country_hash", "gender_hash", "traffic_source_hash"],
    outputCol="assembled_features"
)

# Initialize StandardScaler
scaler = StandardScaler(inputCol="assembled_features", outputCol="scaled_features")

# Initialize Logistic Regression model
lr = LogisticRegression(
    maxIter=100,
    regParam=0.2,
    threshold=0.8,
    featuresCol="scaled_features",
    labelCol="label"
)

# Define pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])

# Fit the model
pipeline_model = pipeline.fit(train_data)

# Transform the dataset using the trained model
transformed_dataset = pipeline_model.transform(test_data)
transformed_dataset.show()

9. Đánh giá mô hình

Đánh giá tập dữ liệu mới được chuyển đổi. Tạo chỉ số đánh giá diện tích dưới đường cong (AUC).

# Model evaluation
eva = BinaryClassificationEvaluator(metricName="areaUnderPR")
aucPR = eva.evaluate(transformed_dataset)
print(f"AUC PR: {aucPR}")

Sau đó, hãy dùng Gemini để tạo mã PySpark nhằm trực quan hoá đầu ra của mô hình.

Câu lệnh 1: Tạo mã để vẽ đường cong Precision-Recall (PR). Tính độ chính xác và khả năng thu hồi từ các dự đoán của mô hình, đồng thời hiển thị đường cong PR bằng cách sử dụng một thư viện vẽ biểu đồ phù hợp.

# prompt: Generate code to plot the Precision-Recall (PR) curve. Calculate precision and recall from the model's predictions and display the PR curve using a suitable plotting library.

import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_curve, auc

# Extract predictions and labels
predictions = transformed_dataset.select("prediction", "label").toPandas()

# Calculate precision and recall
precision, recall, _ = precision_recall_curve(predictions["label"], predictions["prediction"])

# Calculate AUC-PR
pr_auc = auc(recall, precision)

# Plot the PR curve
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, color='blue', lw=2, label=f'PR curve (AUC = {pr_auc:.2f})')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc='lower left')
plt.grid(True)
plt.show()

Lời nhắc 2: Tạo mã để tạo một bản trực quan hoá ma trận nhầm lẫn. Tính toán ma trận nhầm lẫn từ các dự đoán của mô hình và hiển thị ma trận này dưới dạng bản đồ nhiệt hoặc bảng có số lượng dương tính thực (TP), âm tính thực (TN), dương tính giả (FP) và âm tính giả (FN).

# prompt: Generate code to create a confusion matrix visualization. Calculate the confusion matrix from the model's predictions and display it as a heat map or a table with counts of true positives (TP), true negatives (TN), false positives (FP), and false negatives (FN).

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

# Extract predictions and labels
predictions_and_labels = transformed_dataset.select("prediction", "label").toPandas()

# Calculate the confusion matrix
cm = confusion_matrix(predictions_and_labels["label"], predictions_and_labels["prediction"])

# Create a DataFrame for better visualization
cm_df = pd.DataFrame(cm,
                     index=['Actual Negative', 'Actual Positive'],
                     columns=['Predicted Negative', 'Predicted Positive'])

# Display the confusion matrix as a table
print("Confusion Matrix:")
print(cm_df)

# Display the confusion matrix as a heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(cm_df, annot=True, fmt='d', cmap='Blues', cbar=False, linewidths=.5)
plt.title('Confusion Matrix')
plt.ylabel('Actual Label')
plt.xlabel('Predicted Label')
plt.show()

# Calculate and display TP, TN, FP, FN
TN, FP, FN, TP = cm.ravel()
print(f"
True Positives (TP): {TP}")
print(f"True Negatives (TN): {TN}")
print(f"False Positives (FP): {FP}")
print(f"False Negatives (FN): {FN}")

10. Ghi các dự đoán vào BigQuery

Dùng Gemini để tạo mã nhằm ghi các dự đoán vào một bảng mới trong tập dữ liệu BigQuery.

Câu lệnh: Sử dụng Spark để ghi tập dữ liệu đã chuyển đổi vào BigQuery.

# prompt: Using Spark, write the transformed dataset to BigQuery.

transformed_dataset.write.format("bigquery").option("table", f"{PROJECT_ID}.demo.predictions").mode("overwrite").save()

11. Lưu mô hình vào Cloud Storage

Sử dụng chức năng gốc của MLlib, hãy lưu mô hình của bạn vào Cloud Storage. Máy chủ suy luận tải mô hình từ đây.

MODEL_PATH = "models/prediction_model"
pipeline_model.write().overwrite().save(f"gs://{BUCKET_NAME}/{MODEL_PATH}")

12. Tạo một máy chủ suy luận

Cloud Run là một công cụ linh hoạt để chạy các ứng dụng web không máy chủ. Nó sử dụng các vùng chứa Docker để mang đến cho người dùng khả năng tuỳ chỉnh tối đa. Đối với phòng thí nghiệm này, Dockerfile được định cấu hình để chạy một ứng dụng Flask hỗ trợ PySpark. Vùng chứa này chạy trên Cloud Run để thực hiện suy luận trên dữ liệu đầu vào. Bạn có thể tìm thấy mã của tính năng này tại đây.

Sao chép kho lưu trữ bằng mã máy chủ suy luận.

!git clone https://github.com/GoogleCloudPlatform/devrel-demos.git

Xem tệp Docker.

FROM python:3.12-slim

# Install OpenJDK-21 (Required for Spark)
RUN apt-get update && \
    apt-get install -y openjdk-21-jre-headless procps && \
    rm -rf /var/lib/apt/lists/*

ENV JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
ENV PORT=8080

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY main.py .

CMD ["gunicorn", "--bind", "0.0.0.0:8080", "--workers", "1", "--threads", "8", "--timeout", "0", "main:app"]

Xem mã Python cho máy chủ.

import os
import json
import logging

from flask import Flask, request, jsonify
from google.cloud import storage
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import hash, col

# Configure basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Initialization: Spark and Model Loading ---
GCS_BUCKET = os.environ.get("GCS_BUCKET")
GCS_MODEL_PATH = os.environ.get("GCS_MODEL_PATH")
LOCAL_MODEL_PATH = "/tmp/model"

try:
    spark = SparkSession.builder \
        .appName("CloudRunSparkService") \
        .master("local[*]") \
        .getOrCreate()
    logging.info("Spark Session successfully initialized.")
except Exception as e:
    logging.error(f"Failed to initialize Spark Session: {e}")
    raise

def download_directory(bucket_name, prefix, local_path):
    """Downloads a directory from GCS to local filesystem."""
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blobs = list(bucket.list_blobs(prefix=prefix))
    
    if len(blobs) == 0:
        logging.error(f"No files found in GCS bucket {bucket_name} at prefix {prefix}")
        return
    
    for blob in blobs:
        if blob.name.endswith("/"): continue # Skip directories
        
        # Structure local paths
        relative_path = os.path.relpath(blob.name, prefix)
        local_file_path = os.path.join(local_path, relative_path)
        os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
        
        blob.download_to_filename(local_file_path)
    print(f"Model downloaded to {local_path}")

# Load model
def load_model(LOCAL_MODEL_PATH, GCS_BUCKET, GCS_MODEL_PATH):
    """Download and load model on startup to avoid latency per request."""
    global MODEL
    if not os.path.exists(LOCAL_MODEL_PATH):
        download_directory(GCS_BUCKET, GCS_MODEL_PATH, LOCAL_MODEL_PATH)
    
    logging.info(f"Loading PySpark model from {GCS_MODEL_PATH}")
    # Load the Spark ML model
    try:
        MODEL = PipelineModel.load(LOCAL_MODEL_PATH)
        logging.info("Spark model loaded successfully.")
    except Exception as e:
        logging.error(f"Failed to load model: {e}")
        raise
    
# Load Model on Startup
load_model(LOCAL_MODEL_PATH, GCS_BUCKET, GCS_MODEL_PATH)

# --- Flask Application Setup ---
app = Flask(__name__)

@app.route('/predict', methods=['POST'])
def predict():
    """
    Handles incoming POST requests for inference.
    Expects JSON data that can be converted into a Spark DataFrame.
    """
    if MODEL is None:
        return jsonify({"error": "Model failed to load at startup."}), 500

    try:
        # 1. Get data from the request
        data = request.get_json()
        
        # 2. Check length of list
        data_len = len(data)
        cap = 100
        if data_len > cap:
            return jsonify({"error": f"Too many records. Count: {data_len}, Max: {cap}"}), 400

        # 2. Create Spark DataFrame
        df = spark.createDataFrame(data)
        
        # 3. Transform data
        input_df = df.select(
            col("age").cast("DOUBLE").alias("age"), 
            (hash(col("country")).cast("BIGINT") * 1.0).alias("country_hash"),
            (hash(col("gender")).cast("BIGINT") * 1.0).alias("gender_hash"),
            (hash(col("traffic_source")).cast("BIGINT") * 1.0).alias("traffic_source_hash")
        )

        # 3. Perform Inference
        predictions_df = MODEL.transform(input_df)

        # 4. Prepare results (collect and serialize)
        results = [p.prediction for p in predictions_df.select("prediction").collect()]

        # 5. Return JSON response
        return jsonify({"predictions": results})

    except Exception as e:
        logging.error(f"An error occurred during prediction: {e}")
        #return jsonify({"error": str(e)}), 500
        raise e  
    
# Gunicorn entry point uses 'app' from this file
if __name__ == '__main__':
    # Local testing only: Cloud Run uses Gunicorn/CMD command
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))

Triển khai máy chủ suy luận.

import subprocess

command = [
    "gcloud",
    "run",
    "deploy",
    "inference-server",
    "--source",
    "/content/devrel-demos/data-analytics/dataproc-webinar/data-science/inference-server",
    "--region",
    f"{REGION}",
    "--port",
    "8080",
    "--memory",
    "2Gi",
    "--allow-unauthenticated",
    "--set-env-vars",
    f"GCS_BUCKET={BUCKET_NAME},GCS_MODEL_PATH={MODEL_PATH}",
    "--startup-probe",
    "tcpSocket.port=8080,initialDelaySeconds=240,failureThreshold=3,timeoutSeconds=240,periodSeconds=240"
]

result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)

Sao chép URL của máy chủ suy luận từ đầu ra vào một biến mới. Chế độ này sẽ tương tự như https://inference-server-123456789.us-central1.run.app.

INFERENCE_SERVER_URL = "<YOUR_SERVER_URL>"

Kiểm thử máy chủ suy luận.

import requests

age = "25.0"
country = "United States"
traffic_source = "Search"
gender = "F"

response = requests.post(
    f"{INFERENCE_SERVER_URL}/predict",
    json=[{"age": age, "country": country, "traffic_source": traffic_source, "gender": gender}],
    headers={"Content-Type": "application/json"}
)

print(response.json())

Kết quả đầu ra phải là 1.0 hoặc 0.0.

{'predictions': [1.0]}

13. Định cấu hình một tác nhân

Sử dụng Agent Engine để tạo một tác nhân có thể thực hiện suy luận. Agent Engine là một phần của Nền tảng Trí tuệ nhân tạo Vertex AI, một bộ dịch vụ cho phép nhà phát triển triển khai, quản lý và mở rộng quy mô các tác nhân AI trong quá trình sản xuất. Nền tảng này có nhiều công cụ, bao gồm cả công cụ đánh giá tác nhân, bối cảnh phiên và thực thi mã. Thư viện này hỗ trợ nhiều khung tác nhân phổ biến, bao gồm cả Bộ công cụ phát triển tác nhân (ADK). ADK là một khung tác nhân nguồn mở, được xây dựng và tối ưu hoá để sử dụng với Gemini và hệ sinh thái Google, nhưng không phụ thuộc vào mô hình. Khung này được thiết kế để giúp quá trình phát triển tác nhân giống với quá trình phát triển phần mềm hơn.

Khởi chạy ứng dụng Vertex AI.

import vertexai
from vertexai import agent_engines # For the prebuilt templates

client = vertexai.Client(  # For service interactions via client.agent_engines
    project=f"{PROJECT_ID}",
    location=f"{REGION}",
)

Xác định một hàm để truy vấn mô hình đã triển khai.

def predict_purchase(
    age: str = "25.0",
    country: str = "United States",
    traffic_source: str = "Search",
    gender: str = "M",
):
    """Predicts whether or not a user will purchase a product.

    Args:
        age: The age of the user.
        country: The country of the user. One of: "China", "Poland", "Germany", "United States", "Spain", "United Kingdom", "España", "Japan", "Brasil", "Colombia", "Belgium", "South Korea", "Austria", "France", "Australia".
        Traffic_source: The source of the user's traffic. One of: "Display", "Email", "Search", "Organic", "Facebook".
        gender: The gender of the user. One of: "M" or "F".

    Returns:
        True if the model output is 1.0, False otherwise.
    """
    import requests
    response = requests.post(
        f"{INFERENCE_SERVER_URL}/predict",
        json=[{"age": age, "country": country, "traffic_source": traffic_source, "gender": gender}],
        headers={"Content-Type": "application/json"}
    )
    return response.json()

Kiểm thử hàm bằng cách truyền các tham số mẫu.

predict_purchase(age=25.0, country="United States", traffic_source="Search", gender="M")

Sử dụng ADK, hãy xác định một tác nhân bên dưới và cung cấp hàm predict_purchase làm công cụ.

from google.adk.agents import Agent
from vertexai import agent_engines

agent = Agent(
   model="gemini-2.5-flash",
   name='purchase_prediction_agent',
   tools=[predict_purchase]
)

Kiểm thử tác nhân cục bộ bằng cách truyền vào một truy vấn.

app = agent_engines.AdkApp(agent=agent)
async for event in app.async_stream_query(
    user_id="123",
    message="Will a 25 yo male from the United States who came from Search make a purchase? Strictly output 'yes' or 'no'.",
):
    try:
        print(event['content']['parts'][0]['text'])
    except:
      continue

Triển khai mô hình vào Agent Engine.

remote_agent = client.agent_engines.create(
    agent=app,
    config={
        "requirements": ["google-cloud-aiplatform[agent_engines,adk]"],
        "staging_bucket": f"gs://{BUCKET_NAME}",
        "display_name": "purchase-predictor",
        "description": "Agent that predicts whether or not a user will purchase a product.",
    }
)

Sau khi hoàn tất, hãy xem mô hình đã triển khai trong Cloud Console.

Truy vấn lại mô hình. Thao tác này hiện trỏ đến tác nhân đã triển khai thay vì phiên bản cục bộ.

async for event in remote_agent.async_stream_query(
    user_id="123",
    message="Will a 25 yo male from the United States who came from Search make a purchase? Strictly output 'yes' or 'no'.",
):
    try:
        print(event['content']['parts'][0]['text'])
    except:
      continue

14. Dọn dẹp

Xoá tất cả tài nguyên trên Google Cloud mà bạn đã tạo. Việc chạy các lệnh dọn dẹp như thế này là một phương pháp hay quan trọng để tránh phát sinh các khoản phí trong tương lai.

# Delete the deployed agent.
remote_agent.delete(force=True)

# Delete the inference server.
import subprocess

command = [
    "gcloud",
    "run",
    "services",
    "delete",
    "inference-server",
    "--region",
    f"{REGION}",
    "--quiet"
]

subprocess.run(command, capture_output=True, text=True)

# Delete the BigQuery dataset.
bigquery_client = bigquery.Client()

bigquery_client.delete_dataset(
    f"{PROJECT_ID}.demo", delete_contents=True, not_found_ok=True
)

# Delete the Storage bucket.
storage_client = storage.Client()

bucket = storage_client.get_bucket(BUCKET_NAME)
bucket.delete_blobs(list(bucket.list_blobs()))
bucket.delete()

15. Xin chúc mừng!

Thật tuyệt vời! Trong lớp học lập trình này, bạn đã thực hiện những việc sau:

  • Sử dụng sổ tay BigQuery Studio để chạy quy trình khoa học dữ liệu.
  • Tạo kết nối với Apache Spark bằng Google Cloud Serverless cho Apache Spark và được hỗ trợ bởi Spark Connect.
  • Sử dụng Lightning Engine để tăng tốc khối lượng công việc Apache Spark lên đến 4,3 lần.
  • Tải dữ liệu từ BigQuery bằng cách sử dụng chế độ tích hợp sẵn giữa Apache Spark và BigQuery.
  • Khám phá dữ liệu bằng tính năng tạo mã có sự hỗ trợ của Gemini.
  • Thực hiện kỹ thuật trích xuất tính chất bằng cách sử dụng khung xử lý dữ liệu của Apache Spark.
  • Huấn luyện và đánh giá một mô hình phân loại bằng cách sử dụng thư viện học máy gốc của Apache Spark, MLlib.
  • Triển khai một máy chủ suy luận cho mô hình phân loại bằng cách sử dụng FlaskCloud Run
  • Triển khai một tác nhân để truy vấn máy chủ suy luận bằng ngôn ngữ tự nhiên với Agent EngineBộ công cụ phát triển tác nhân (ADK),

Tiếp theo là gì?