1. ภาพรวม
TheLook ซึ่งเป็นผู้ค้าปลีกเสื้อผ้าอีคอมเมิร์ซสมมติ จัดเก็บข้อมูลเกี่ยวกับลูกค้า ผลิตภัณฑ์ คำสั่งซื้อ โลจิสติกส์ เหตุการณ์บนเว็บ และแคมเปญการตลาดดิจิทัลใน BigQuery บริษัทต้องการใช้ประโยชน์จากความเชี่ยวชาญด้าน SQL และ PySpark ที่ทีมมีอยู่เพื่อวิเคราะห์ข้อมูลนี้โดยใช้ Apache Spark
TheLook ต้องการโซลูชันการปรับขนาดอัตโนมัติที่ช่วยให้มุ่งเน้นไปที่ภาระงานแทนการจัดการคลัสเตอร์ เพื่อหลีกเลี่ยงการจัดสรรหรือการปรับโครงสร้างพื้นฐานสำหรับ Spark ด้วยตนเอง นอกจากนี้ ยังต้องการลดความพยายามที่จำเป็นในการผสานรวม Spark และ BigQuery ในขณะที่ยังคงอยู่ในสภาพแวดล้อม BigQuery Studio
ใน Lab นี้ คุณจะคาดการณ์ว่าผู้ใช้จะทำการซื้อหรือไม่โดยการสร้างตัวแยกประเภทการถดถอยลอจิสติกโดยใช้ PySpark และใช้ประโยชน์จากการผสานรวม Notebook ดั้งเดิมและฟีเจอร์ AI ของ BigQuery Studio เพื่อสํารวจข้อมูล คุณทำให้ใช้งานได้โมเดลนี้ในเซิร์ฟเวอร์การอนุมานและสร้าง Agent เพื่อค้นหาโมเดลโดยใช้ภาษาธรรมชาติ
ข้อกำหนดเบื้องต้น
ก่อนเริ่มแล็บนี้ คุณควรมีความรู้เกี่ยวกับสิ่งต่อไปนี้
- การเขียนโปรแกรม SQL และ Python ขั้นพื้นฐาน
- การเรียกใช้โค้ด Python ในสมุดบันทึก Jupyter
- ความเข้าใจพื้นฐานเกี่ยวกับการประมวลผลแบบกระจาย
วัตถุประสงค์
- ใช้ Notebook ของ BigQuery Studio เพื่อเรียกใช้เวิร์กโฟลว์ Data Science
- สร้างการเชื่อมต่อกับ Apache Spark โดยใช้ Google Cloud Serverless สำหรับ Apache Spark และขับเคลื่อนโดย Spark Connect
- ใช้ Lightning Engine เพื่อเร่งความเร็วเวิร์กโหลด Apache Spark ได้สูงสุด 4.3 เท่า
- โหลดข้อมูลจาก BigQuery โดยใช้การผสานรวมในตัวระหว่าง Apache Spark กับ BigQuery
- สำรวจข้อมูลโดยใช้การสร้างโค้ดที่ Gemini ช่วย
- ทำการ Feature Engineering โดยใช้เฟรมเวิร์กการประมวลผลข้อมูลของ Apache Spark
- ฝึกและประเมินโมเดลการแยกประเภทโดยใช้ไลบรารีแมชชีนเลิร์นนิงดั้งเดิมของ Apache Spark ซึ่งก็คือ MLlib
- ติดตั้งใช้งานเซิร์ฟเวอร์การอนุมานสำหรับโมเดลการจัดประเภทโดยใช้ Flask และ Cloud Run
- ทำให้ Agent ใช้งานได้เพื่อค้นหาเซิร์ฟเวอร์การอนุมานโดยใช้ภาษาธรรมชาติด้วย Agent Engine และ Agent Development Kit (ADK)
2. เชื่อมต่อกับสภาพแวดล้อมรันไทม์ของ Colab
ระบุโปรเจ็กต์ Google Cloud
สร้างโปรเจ็กต์ Google Cloud คุณอาจใช้บัญชีที่มีอยู่แล้วก็ได้
เปิดใช้ API ที่แนะนำ
คลิกที่นี่เพื่อเปิดใช้ API ต่อไปนี้
- aiplatform.googleapis.com
- bigquery.googleapis.com
- bigquerystorage.googleapis.com
- bigqueryunified.googleapis.com
- cloudaicompanion.googleapis.com
- dataproc.googleapis.com
- storage.googleapis.com
- run.googleapis.com
การไปยังส่วนต่างๆ ของ UI:
- ในคอนโซล Google Cloud ให้ไปที่เมนูการนำทาง > BigQuery

- ในแผง BigQuery Studio ให้คลิกปุ่มลูกศรเมนูแบบเลื่อนลง วางเมาส์เหนือ Notebook แล้วเลือกอัปโหลด

- เลือกปุ่มตัวเลือก URL แล้วป้อน URL ต่อไปนี้
https://github.com/GoogleCloudPlatform/devrel-demos/blob/main/data-analytics/dataproc-webinar/data-science/Spark_Data_Science.ipynb
- ตั้งค่าภูมิภาคเป็น
us-central11แล้วคลิกอัปโหลด

- หากต้องการเปิด Notebook ให้คลิกลูกศรเมนูแบบเลื่อนลงในแผง Explorer ที่มีชื่อโปรเจ็กต์-รหัส จากนั้นคลิกเมนูแบบเลื่อนลงสำหรับ Notebook คลิก Notebook
Spark_Data_Science

- ยุบเมนูการนำทาง BigQuery และสารบัญของ Notebook เพื่อให้มีพื้นที่มากขึ้น

3. เชื่อมต่อกับรันไทม์และเรียกใช้โค้ดการตั้งค่าเพิ่มเติม
- คลิกเชื่อมต่อ ในป๊อปอัป ให้ให้สิทธิ์ Colab Enterprise ด้วยบัญชีอีเมล สมุดบันทึกจะเชื่อมต่อกับรันไทม์โดยอัตโนมัติ

- เมื่อกำหนดระยะเวลาการทำงานแล้ว คุณจะเห็นข้อมูลต่อไปนี้

- เลื่อนไปที่ส่วนการตั้งค่าใน Notebook เริ่มที่นี่
4. เรียกใช้รหัสการตั้งค่า
กำหนดค่าสภาพแวดล้อมด้วยไลบรารี Python ที่จำเป็นเพื่อทำ Lab ให้เสร็จสมบูรณ์ กำหนดค่าการเข้าถึง Google แบบส่วนตัว สร้างที่เก็บข้อมูล สร้างชุดข้อมูล BigQuery คัดลอกรหัสโปรเจ็กต์ลงใน Notebook เลือกภูมิภาค สำหรับแล็บนี้ ให้ใช้ภูมิภาค us-central1
คุณรันโค้ดในเซลล์โค้ดได้โดยวางเคอร์เซอร์ไว้ในบล็อกเซลล์แล้วคลิกลูกศร

# Enable APIs
import subprocess
command = [
"gcloud",
"services",
"enable",
"aiplatform.googleapis.com",
"bigquery.googleapis.com",
"bigquerystorage.googleapis.com",
"bigqueryunified.googleapis.com",
"cloudaicompanion.googleapis.com",
"dataproc.googleapis.com",
"run.googleapis.com",
"storage.googleapis.com"
]
result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)
# Configure a PROJECT_ID and REGION
PROJECT_ID = "<YOUR_PROJECT_ID>"
REGION = "<YOUR_REGION>"
# Enable Private Google Access
import subprocess
command = [
"gcloud",
"compute",
"networks",
"subnets",
"update",
"default",
f"--region={REGION}",
"--enable-private-ip-google-access"
]
result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)
# Create a Cloud Storage Bucket
from google.cloud import storage
from google.cloud.exceptions import NotFound
BUCKET_NAME = f"{PROJECT_ID}-demo"
storage_client = storage.Client(project=PROJECT_ID)
try:
bucket = storage_client.get_bucket(BUCKET_NAME)
print(f"Bucket {BUCKET_NAME} already exists.")
except NotFound:
bucket = storage_client.create_bucket(BUCKET_NAME, location=REGION)
print(f"Bucket {BUCKET_NAME} created.")
# Create a BigQuery dataset.
from google.cloud import bigquery
DATASET_ID = f"{PROJECT_ID}.demo"
client = bigquery.Client()
dataset = bigquery.Dataset(DATASET_ID)
dataset.location = REGION
dataset = client.create_dataset(dataset, exists_ok=True)
5. สร้างการเชื่อมต่อกับ Google Cloud Serverless สำหรับ Apache Spark
เมื่อใช้ Spark Connect คุณจะเชื่อมต่อกับเซสชัน Spark แบบไร้เซิร์ฟเวอร์เพื่อเรียกใช้งาน Spark แบบอินเทอร์แอกทีฟได้ คุณกำหนดค่ารันไทม์ด้วย Lightning Engine เพื่อประสิทธิภาพ Spark ขั้นสูง Lightning Engine ทำงานโดยการเร่งความเร็วเวิร์กโหลดโดยใช้ Apache Gluten และ Velox
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.version = "3.0"
# You can optionally configure Spark properties as well. See https://cloud.google.com/dataproc-serverless/docs/concepts/properties.
session.runtime_config.properties = {
"dataproc.runtime": "premium",
"spark.dataproc.engine": "lightningEngine",
}
# To avoid going over quota in this demo, cap the max number of Spark workers.
session.runtime_config.properties = {
"spark.dynamicAllocation.maxExecutors": "4"
}
spark = (
DataprocSparkSession.builder
.appName("CustomSparkSession")
.dataprocSessionConfig(session)
.getOrCreate()
)
6. โหลดและสำรวจข้อมูลโดยใช้ Gemini
ในส่วนนี้ คุณจะได้ทำตามขั้นตอนแรกที่สำคัญในโปรเจ็กต์วิทยาศาสตร์ข้อมูล นั่นคือการเตรียมข้อมูล คุณเริ่มต้นด้วยการโหลดข้อมูลลงใน Apache Spark DataFrame จาก BigQuery
# Load the tables
order_items = spark.read.format("bigquery").option("table", "bigquery-public-data.thelook_ecommerce.order_items").load()
users = spark.read.format("bigquery").option("table", "bigquery-public-data.thelook_ecommerce.users").load()
# Register temp tables
users.createOrReplaceTempView("users")
order_items.createOrReplaceTempView("order_items")
# Verify temp tables
spark.sql("SELECT * FROM order_items LIMIT 10").show()
จากนั้นใช้ Gemini เพื่อสร้างโค้ด PySpark เพื่อสำรวจข้อมูลและทำความเข้าใจข้อมูลได้ดียิ่งขึ้น

พรอมต์ 1: ใช้ PySpark เพื่อสำรวจตารางผู้ใช้และแสดง 10 แถวแรก
# prompt: Using PySpark, explore the users table and show the first 10 rows.
users.show(10)
พรอมต์ 2: ใช้ PySpark เพื่อสำรวจตาราง order_items และแสดง 10 แถวแรก
# prompt: Using PySpark, explore the order_items table and show the first 10 rows.
order_items.show(10)
พรอมต์ 3: ใช้ PySpark เพื่อแสดง 5 ประเทศที่ผู้ใช้เข้าชมบ่อยที่สุดในตารางผู้ใช้ แสดงประเทศและจํานวนผู้ใช้จากแต่ละประเทศ
# prompt: Using PySpark, show the top 5 most frequent countries in the users table. Display the country and the number of users from each country.
from pyspark.sql.functions import col, count
users.groupBy("country").agg(count("*").alias("user_count")).orderBy(col("user_count").desc()).limit(5).show()
พรอมต์ 4: ใช้ PySpark เพื่อหาราคาลดเฉลี่ยของสินค้าในตาราง order_items
# prompt: Using PySpark, find the average sale price of items in the order_items table.
from pyspark.sql import functions as F
average_sale_price = order_items.agg(F.avg("sale_price").alias("average_sale_price"))
average_sale_price.show()
พรอมต์ 5: ใช้ตาราง "users" สร้างโค้ดเพื่อพล็อตประเทศเทียบกับแหล่งที่มาของการเข้าชมโดยใช้ไลบรารีการพล็อตที่เหมาะสม
# prompt: Using the table "users", generate code to plot country vs traffic source using a suitable plotting library.
sql = """
SELECT
country,
traffic_source
FROM
`bigquery-public-data.thelook_ecommerce.users`
WHERE country IS NOT NULL AND traffic_source IS NOT NULL
"""
project_id = "iceberg-summit-2025"
df = pandas_gbq.read_gbq(sql, project_id=project_id, dialect="standard")
import matplotlib.pyplot as plt
import seaborn as sns
# Group by country and traffic_source and count occurrences
df_grouped = df.groupby(['country', 'traffic_source']).size().reset_index(name='count')
# Create a pivot table for easier plotting
pivot_table = df_grouped.pivot(index='country', columns='traffic_source', values='count').fillna(0)
# Plotting
plt.figure(figsize=(15, 8))
pivot_table.plot(kind='bar', stacked=True, figsize=(15, 8))
plt.title('Traffic Source Distribution by Country')
plt.xlabel('Country')
plt.ylabel('Number of Users')
plt.xticks(rotation=90)
plt.legend(title='Traffic Source')
plt.tight_layout()
plt.show()
พรอมต์ 6: สร้างฮิสโตแกรมที่แสดงการกระจายของ "อายุ" "ประเทศ" "เพศ" "traffic_source"
# prompt: Create a histogram showing the distribution of "age", "country", "gender", "traffic_source".
import matplotlib.pyplot as plt
# Convert Spark DataFrame to Pandas DataFrame for visualization
users_pd = users.toPandas()
# Create histograms for 'age', 'country', 'gender', 'traffic_source'
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('Distribution of User Attributes')
# Age distribution
axes[0, 0].hist(users_pd['age'].dropna(), bins=20, edgecolor='black')
axes[0, 0].set_title('Age Distribution')
axes[0, 0].set_xlabel('Age')
axes[0, 0].set_ylabel('Number of Users')
# Country distribution
users_pd['country'].value_counts().head(10).plot(kind='bar', ax=axes[0, 1])
axes[0, 1].set_title('Top 10 Countries Distribution')
axes[0, 1].set_xlabel('Country')
axes[0, 1].set_ylabel('Number of Users')
axes[0, 1].tick_params(axis='x', rotation=45)
# Gender distribution
users_pd['gender'].value_counts().plot(kind='bar', ax=axes[1, 0])
axes[1, 0].set_title('Gender Distribution')
axes[1, 0].set_xlabel('Gender')
axes[1, 0].set_ylabel('Number of Users')
axes[1, 0].tick_params(axis='x', rotation=0)
# Traffic Source distribution
users_pd['traffic_source'].value_counts().head(10).plot(kind='bar', ax=axes[1, 1])
axes[1, 1].set_title('Top 10 Traffic Source Distribution')
axes[1, 1].set_xlabel('Traffic Source')
axes[1, 1].set_ylabel('Number of Users')
axes[1, 1].tick_params(axis='x', rotation=45)
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()
7. การเตรียมข้อมูลและ Feature Engineering
จากนั้น คุณจะทำการปรับแต่งฟีเจอร์ในข้อมูล เลือกคอลัมน์ที่เหมาะสม แปลงข้อมูลเป็นประเภทข้อมูลที่เหมาะสมยิ่งขึ้น และระบุคอลัมน์ป้ายกำกับ
features = spark.sql("""
SELECT
CAST(u.age AS DOUBLE) AS age,
CAST(hash(u.country) AS BIGINT) * 1.0 AS country_hash,
CAST(hash(u.gender) AS BIGINT) * 1.0 AS gender_hash,
CAST(hash(u.traffic_source) AS BIGINT) * 1.0 AS traffic_source_hash,
CASE WHEN COUNT(oi.id) > 0 THEN 1 ELSE 0 END AS label -- Changed label generation to count order items
FROM users AS u
LEFT JOIN order_items AS oi
ON u.id = oi.user_id
GROUP BY u.id, u.age, u.country, u.gender, u.traffic_source
""")
features.show()
8. ฝึกโมเดลการถดถอยแบบโลจิสติก
คุณฝึกโมเดลการถดถอยแบบโลจิสติกโดยใช้ MLlib ก่อนอื่น คุณต้องใช้ VectorAssembler เพื่อแปลงข้อมูลเป็นรูปแบบเวกเตอร์ จากนั้น StandardScaler จะปรับขนาดคอลัมน์ฟีเจอร์เพื่อให้ประสิทธิภาพดียิ่งขึ้น จากนั้นสร้างการอ้างอิงถึงLogisticRegressionโมเดลและกำหนดไฮเปอร์พารามิเตอร์ คุณจะรวมขั้นตอนเหล่านี้ไว้ในออบเจ็กต์ Pipeline ฝึกโมเดลโดยใช้ฟังก์ชัน fit() และแปลงข้อมูลโดยใช้ฟังก์ชัน transform()
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.functions import array_to_vector
#Split Train and Test Data (80:20)
train_data, test_data = features.randomSplit([0.8, 0.2], seed=42)
# Initialize VectorAssembler
assembler = VectorAssembler(
inputCols=["age", "country_hash", "gender_hash", "traffic_source_hash"],
outputCol="assembled_features"
)
# Initialize StandardScaler
scaler = StandardScaler(inputCol="assembled_features", outputCol="scaled_features")
# Initialize Logistic Regression model
lr = LogisticRegression(
maxIter=100,
regParam=0.2,
threshold=0.8,
featuresCol="scaled_features",
labelCol="label"
)
# Define pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])
# Fit the model
pipeline_model = pipeline.fit(train_data)
# Transform the dataset using the trained model
transformed_dataset = pipeline_model.transform(test_data)
transformed_dataset.show()
9. การประเมินโมเดล
ประเมินชุดข้อมูลที่เพิ่งแปลง สร้างเมตริกการประเมินพื้นที่ใต้กราฟ (AUC)
# Model evaluation
eva = BinaryClassificationEvaluator(metricName="areaUnderPR")
aucPR = eva.evaluate(transformed_dataset)
print(f"AUC PR: {aucPR}")
จากนั้นใช้ Gemini เพื่อสร้างโค้ด PySpark เพื่อแสดงเอาต์พุตโมเดลเป็นภาพ
พรอมต์ 1: สร้างโค้ดเพื่อพล็อตเส้นโค้ง Precision-Recall (PR) คำนวณความแม่นยำและความอ่อนไหวจากการคาดการณ์ของโมเดล และแสดงกราฟ PR โดยใช้ไลบรารีการพล็อตที่เหมาะสม
# prompt: Generate code to plot the Precision-Recall (PR) curve. Calculate precision and recall from the model's predictions and display the PR curve using a suitable plotting library.
import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_curve, auc
# Extract predictions and labels
predictions = transformed_dataset.select("prediction", "label").toPandas()
# Calculate precision and recall
precision, recall, _ = precision_recall_curve(predictions["label"], predictions["prediction"])
# Calculate AUC-PR
pr_auc = auc(recall, precision)
# Plot the PR curve
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, color='blue', lw=2, label=f'PR curve (AUC = {pr_auc:.2f})')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc='lower left')
plt.grid(True)
plt.show()
พรอมต์ 2: สร้างโค้ดเพื่อสร้างภาพเมทริกซ์ความสับสน คำนวณเมทริกซ์ความสับสนจากการคาดการณ์ของโมเดลและแสดงเป็นแผนที่ความร้อนหรือตารางที่มีจำนวนผลบวกจริง (TP), ผลลบจริง (TN), ผลบวกลวง (FP) และผลลบลวง (FN)
# prompt: Generate code to create a confusion matrix visualization. Calculate the confusion matrix from the model's predictions and display it as a heat map or a table with counts of true positives (TP), true negatives (TN), false positives (FP), and false negatives (FN).
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
# Extract predictions and labels
predictions_and_labels = transformed_dataset.select("prediction", "label").toPandas()
# Calculate the confusion matrix
cm = confusion_matrix(predictions_and_labels["label"], predictions_and_labels["prediction"])
# Create a DataFrame for better visualization
cm_df = pd.DataFrame(cm,
index=['Actual Negative', 'Actual Positive'],
columns=['Predicted Negative', 'Predicted Positive'])
# Display the confusion matrix as a table
print("Confusion Matrix:")
print(cm_df)
# Display the confusion matrix as a heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(cm_df, annot=True, fmt='d', cmap='Blues', cbar=False, linewidths=.5)
plt.title('Confusion Matrix')
plt.ylabel('Actual Label')
plt.xlabel('Predicted Label')
plt.show()
# Calculate and display TP, TN, FP, FN
TN, FP, FN, TP = cm.ravel()
print(f"
True Positives (TP): {TP}")
print(f"True Negatives (TN): {TN}")
print(f"False Positives (FP): {FP}")
print(f"False Negatives (FN): {FN}")
10. เขียนการคาดการณ์ไปยัง BigQuery
ใช้ Gemini เพื่อสร้างโค้ดเพื่อเขียนการคาดการณ์ลงในตารางใหม่ในชุดข้อมูล BigQuery
พรอมต์: ใช้ Spark เขียนชุดข้อมูลที่แปลงแล้วไปยัง BigQuery
# prompt: Using Spark, write the transformed dataset to BigQuery.
transformed_dataset.write.format("bigquery").option("table", f"{PROJECT_ID}.demo.predictions").mode("overwrite").save()
11. บันทึกโมเดลไปยัง Cloud Storage
บันทึกโมเดลไปยัง Cloud Storage โดยใช้ฟังก์ชันการทำงานดั้งเดิมของ MLlib เซิร์ฟเวอร์การอนุมานจะโหลดโมเดลจากที่นี่
MODEL_PATH = "models/prediction_model"
pipeline_model.write().overwrite().save(f"gs://{BUCKET_NAME}/{MODEL_PATH}")
12. สร้างเซิร์ฟเวอร์การอนุมาน
Cloud Run เป็นเครื่องมือที่ยืดหยุ่นในการเรียกใช้เว็บแอปแบบ Serverless โดยใช้คอนเทนเนอร์ Docker เพื่อให้ผู้ใช้ปรับแต่งได้สูงสุด สำหรับแล็บนี้ เราได้กำหนดค่า Dockerfile ให้เรียกใช้แอป Flask ที่ขับเคลื่อน PySpark คอนเทนเนอร์นี้ทำงานบน Cloud Run เพื่อทำการอนุมานในข้อมูลนำเข้า ดูโค้ดสำหรับฟีเจอร์นี้ได้ที่นี่
โคลนที่เก็บด้วยโค้ดเซิร์ฟเวอร์การอนุมาน
!git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
ดู Dockerfile
FROM python:3.12-slim
# Install OpenJDK-21 (Required for Spark)
RUN apt-get update && \
apt-get install -y openjdk-21-jre-headless procps && \
rm -rf /var/lib/apt/lists/*
ENV JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
ENV PORT=8080
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .
CMD ["gunicorn", "--bind", "0.0.0.0:8080", "--workers", "1", "--threads", "8", "--timeout", "0", "main:app"]
ดูโค้ด Python สำหรับเซิร์ฟเวอร์
import os
import json
import logging
from flask import Flask, request, jsonify
from google.cloud import storage
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import hash, col
# Configure basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Initialization: Spark and Model Loading ---
GCS_BUCKET = os.environ.get("GCS_BUCKET")
GCS_MODEL_PATH = os.environ.get("GCS_MODEL_PATH")
LOCAL_MODEL_PATH = "/tmp/model"
try:
spark = SparkSession.builder \
.appName("CloudRunSparkService") \
.master("local[*]") \
.getOrCreate()
logging.info("Spark Session successfully initialized.")
except Exception as e:
logging.error(f"Failed to initialize Spark Session: {e}")
raise
def download_directory(bucket_name, prefix, local_path):
"""Downloads a directory from GCS to local filesystem."""
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blobs = list(bucket.list_blobs(prefix=prefix))
if len(blobs) == 0:
logging.error(f"No files found in GCS bucket {bucket_name} at prefix {prefix}")
return
for blob in blobs:
if blob.name.endswith("/"): continue # Skip directories
# Structure local paths
relative_path = os.path.relpath(blob.name, prefix)
local_file_path = os.path.join(local_path, relative_path)
os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
blob.download_to_filename(local_file_path)
print(f"Model downloaded to {local_path}")
# Load model
def load_model(LOCAL_MODEL_PATH, GCS_BUCKET, GCS_MODEL_PATH):
"""Download and load model on startup to avoid latency per request."""
global MODEL
if not os.path.exists(LOCAL_MODEL_PATH):
download_directory(GCS_BUCKET, GCS_MODEL_PATH, LOCAL_MODEL_PATH)
logging.info(f"Loading PySpark model from {GCS_MODEL_PATH}")
# Load the Spark ML model
try:
MODEL = PipelineModel.load(LOCAL_MODEL_PATH)
logging.info("Spark model loaded successfully.")
except Exception as e:
logging.error(f"Failed to load model: {e}")
raise
# Load Model on Startup
load_model(LOCAL_MODEL_PATH, GCS_BUCKET, GCS_MODEL_PATH)
# --- Flask Application Setup ---
app = Flask(__name__)
@app.route('/predict', methods=['POST'])
def predict():
"""
Handles incoming POST requests for inference.
Expects JSON data that can be converted into a Spark DataFrame.
"""
if MODEL is None:
return jsonify({"error": "Model failed to load at startup."}), 500
try:
# 1. Get data from the request
data = request.get_json()
# 2. Check length of list
data_len = len(data)
cap = 100
if data_len > cap:
return jsonify({"error": f"Too many records. Count: {data_len}, Max: {cap}"}), 400
# 2. Create Spark DataFrame
df = spark.createDataFrame(data)
# 3. Transform data
input_df = df.select(
col("age").cast("DOUBLE").alias("age"),
(hash(col("country")).cast("BIGINT") * 1.0).alias("country_hash"),
(hash(col("gender")).cast("BIGINT") * 1.0).alias("gender_hash"),
(hash(col("traffic_source")).cast("BIGINT") * 1.0).alias("traffic_source_hash")
)
# 3. Perform Inference
predictions_df = MODEL.transform(input_df)
# 4. Prepare results (collect and serialize)
results = [p.prediction for p in predictions_df.select("prediction").collect()]
# 5. Return JSON response
return jsonify({"predictions": results})
except Exception as e:
logging.error(f"An error occurred during prediction: {e}")
#return jsonify({"error": str(e)}), 500
raise e
# Gunicorn entry point uses 'app' from this file
if __name__ == '__main__':
# Local testing only: Cloud Run uses Gunicorn/CMD command
app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))
ทำให้ใช้งานได้เซิร์ฟเวอร์การอนุมาน
import subprocess
command = [
"gcloud",
"run",
"deploy",
"inference-server",
"--source",
"/content/devrel-demos/data-analytics/dataproc-webinar/data-science/inference-server",
"--region",
f"{REGION}",
"--port",
"8080",
"--memory",
"2Gi",
"--allow-unauthenticated",
"--set-env-vars",
f"GCS_BUCKET={BUCKET_NAME},GCS_MODEL_PATH={MODEL_PATH}",
"--startup-probe",
"tcpSocket.port=8080,initialDelaySeconds=240,failureThreshold=3,timeoutSeconds=240,periodSeconds=240"
]
result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)
คัดลอก URL ของเซิร์ฟเวอร์การอนุมานจากเอาต์พุตไปยังตัวแปรใหม่ ซึ่งจะคล้ายกับ https://inference-server-123456789.us-central1.run.app.
INFERENCE_SERVER_URL = "<YOUR_SERVER_URL>"
ทดสอบเซิร์ฟเวอร์การอนุมาน
import requests
age = "25.0"
country = "United States"
traffic_source = "Search"
gender = "F"
response = requests.post(
f"{INFERENCE_SERVER_URL}/predict",
json=[{"age": age, "country": country, "traffic_source": traffic_source, "gender": gender}],
headers={"Content-Type": "application/json"}
)
print(response.json())
เอาต์พุตควรเป็น 1.0 หรือ 0.0
{'predictions': [1.0]}
13. กำหนดค่า Agent
ใช้ Agent Engine เพื่อสร้าง Agent ที่ทำการอนุมานได้ Agent Engine เป็นส่วนหนึ่งของแพลตฟอร์ม Vertex AI ซึ่งเป็นชุดบริการที่ช่วยให้นักพัฒนาแอปสามารถทำให้ใช้งานได้ จัดการ และปรับขนาด AI Agent ในเวอร์ชันที่ใช้งานจริง โดยมีเครื่องมือมากมาย เช่น การประเมิน Agent, บริบทเซสชัน และการดำเนินการโค้ด โดยรองรับเฟรมเวิร์กแบบ Agent ที่ได้รับความนิยมหลายรายการ ซึ่งรวมถึง Agent Development Kit (ADK) ADK เป็นเฟรมเวิร์กแบบ Agent โอเพนซอร์สที่สร้างและเพิ่มประสิทธิภาพเพื่อใช้กับ Gemini และระบบนิเวศของ Google แต่ก็เป็นโมเดลที่ไม่ขึ้นกับแพลตฟอร์ม โดยออกแบบมาเพื่อให้การพัฒนา Agent มีลักษณะคล้ายกับการพัฒนาซอฟต์แวร์มากขึ้น
เริ่มต้นไคลเอ็นต์ Vertex AI
import vertexai
from vertexai import agent_engines # For the prebuilt templates
client = vertexai.Client( # For service interactions via client.agent_engines
project=f"{PROJECT_ID}",
location=f"{REGION}",
)
กำหนดฟังก์ชันสำหรับการค้นหาโมเดลที่ใช้งานจริง
def predict_purchase(
age: str = "25.0",
country: str = "United States",
traffic_source: str = "Search",
gender: str = "M",
):
"""Predicts whether or not a user will purchase a product.
Args:
age: The age of the user.
country: The country of the user. One of: "China", "Poland", "Germany", "United States", "Spain", "United Kingdom", "España", "Japan", "Brasil", "Colombia", "Belgium", "South Korea", "Austria", "France", "Australia".
Traffic_source: The source of the user's traffic. One of: "Display", "Email", "Search", "Organic", "Facebook".
gender: The gender of the user. One of: "M" or "F".
Returns:
True if the model output is 1.0, False otherwise.
"""
import requests
response = requests.post(
f"{INFERENCE_SERVER_URL}/predict",
json=[{"age": age, "country": country, "traffic_source": traffic_source, "gender": gender}],
headers={"Content-Type": "application/json"}
)
return response.json()
ทดสอบฟังก์ชันโดยส่งพารามิเตอร์ตัวอย่าง
predict_purchase(age=25.0, country="United States", traffic_source="Search", gender="M")
ใช้ ADK เพื่อกำหนด Agent ด้านล่างและระบุฟังก์ชัน predict_purchase เป็นเครื่องมือ
from google.adk.agents import Agent
from vertexai import agent_engines
agent = Agent(
model="gemini-2.5-flash",
name='purchase_prediction_agent',
tools=[predict_purchase]
)
ทดสอบ Agent ในเครื่องโดยส่งคำค้นหา
app = agent_engines.AdkApp(agent=agent)
async for event in app.async_stream_query(
user_id="123",
message="Will a 25 yo male from the United States who came from Search make a purchase? Strictly output 'yes' or 'no'.",
):
try:
print(event['content']['parts'][0]['text'])
except:
continue
ทำให้โมเดลใช้งานได้กับ Agent Engine
remote_agent = client.agent_engines.create(
agent=app,
config={
"requirements": ["google-cloud-aiplatform[agent_engines,adk]"],
"staging_bucket": f"gs://{BUCKET_NAME}",
"display_name": "purchase-predictor",
"description": "Agent that predicts whether or not a user will purchase a product.",
}
)
เมื่อเสร็จแล้ว ให้ดูโมเดลที่ใช้งานจริงใน Cloud Console
ค้นหาโมเดลอีกครั้ง ตอนนี้จะชี้ไปยังเอเจนต์ที่ใช้งานจริงแทนที่จะเป็นเวอร์ชันในเครื่อง
async for event in remote_agent.async_stream_query(
user_id="123",
message="Will a 25 yo male from the United States who came from Search make a purchase? Strictly output 'yes' or 'no'.",
):
try:
print(event['content']['parts'][0]['text'])
except:
continue
14. ล้างข้อมูล
ลบทรัพยากร Google Cloud ทั้งหมดที่คุณสร้างขึ้น การเรียกใช้คำสั่งล้างข้อมูลเช่นนี้เป็นแนวทางปฏิบัติแนะนำที่สำคัญเพื่อหลีกเลี่ยงการเรียกเก็บเงินในอนาคต
# Delete the deployed agent.
remote_agent.delete(force=True)
# Delete the inference server.
import subprocess
command = [
"gcloud",
"run",
"services",
"delete",
"inference-server",
"--region",
f"{REGION}",
"--quiet"
]
subprocess.run(command, capture_output=True, text=True)
# Delete the BigQuery dataset.
bigquery_client = bigquery.Client()
bigquery_client.delete_dataset(
f"{PROJECT_ID}.demo", delete_contents=True, not_found_ok=True
)
# Delete the Storage bucket.
storage_client = storage.Client()
bucket = storage_client.get_bucket(BUCKET_NAME)
bucket.delete_blobs(list(bucket.list_blobs()))
bucket.delete()
15. ยินดีด้วย
สำเร็จแล้ว! ใน Codelab นี้ คุณได้ทำสิ่งต่อไปนี้
- ใช้สมุดบันทึก BigQuery Studio เพื่อเรียกใช้เวิร์กโฟลว์ Data Science
- สร้างการเชื่อมต่อกับ Apache Spark โดยใช้ Google Cloud Serverless สำหรับ Apache Spark และขับเคลื่อนโดย Spark Connect
- ใช้ Lightning Engine เพื่อเร่งความเร็วเวิร์กโหลด Apache Spark ได้สูงสุด 4.3 เท่า
- โหลดข้อมูลจาก BigQuery โดยใช้การผสานรวมในตัวระหว่าง Apache Spark กับ BigQuery
- สำรวจข้อมูลโดยใช้การสร้างโค้ดที่ Gemini ช่วย
- ดำเนินการ Feature Engineering โดยใช้เฟรมเวิร์กการประมวลผลข้อมูลของ Apache Spark
- ฝึกและประเมินโมเดลการจัดประเภทโดยใช้ไลบรารีแมชชีนเลิร์นนิงดั้งเดิมของ Apache Spark ซึ่งก็คือ MLlib
- ติดตั้งใช้งานเซิร์ฟเวอร์การอนุมานสำหรับโมเดลการจัดประเภทโดยใช้ Flask และ Cloud Run
- ติดตั้งใช้งาน Agent เพื่อค้นหาเซิร์ฟเวอร์การอนุมานโดยใช้ภาษาธรรมชาติด้วย Agent Engine และ Agent Development Kit (ADK)
สิ่งต่อไปที่ควรทำ
- ดูข้อมูลเพิ่มเติมเกี่ยวกับ Google Cloud Serverless สำหรับ Apache Spark
- ดูวิธีกำหนดค่า Cloud Run
- อ่านข้อมูลเกี่ยวกับ Agent Engine