1. Обзор
TheLook, гипотетический интернет-магазин одежды, хранит данные о клиентах, товарах, заказах, логистике, веб-событиях и цифровых маркетинговых кампаниях в BigQuery. Компания хочет использовать имеющиеся у команды знания SQL и PySpark для анализа этих данных с помощью Apache Spark.
Чтобы избежать ручного выделения или настройки инфраструктуры для Spark, компания TheLook ищет решение для автоматического масштабирования, которое позволит ей сосредоточиться на рабочих нагрузках, а не на управлении кластером. Кроме того, они хотят свести к минимуму усилия, необходимые для интеграции Spark и BigQuery, оставаясь при этом в среде BigQuery Studio.
В этой лабораторной работе вы предскажете, совершит ли пользователь покупку, создав классификатор логистической регрессии с использованием PySpark и задействовав встроенную интеграцию блокнотов BigQuery Studio и функции ИИ для анализа данных. Вы развернете эту модель на сервере вывода и создадите агента для выполнения запросов к модели на естественном языке.
Предварительные требования
Перед началом лабораторной работы вам необходимо ознакомиться со следующим:
- Основы программирования на SQL и Python.
- Запуск кода Python в блокноте Jupyter.
- Базовое понимание распределенных вычислений
Цели
- Используйте блокноты BigQuery Studio для запуска рабочих процессов анализа данных.
- Создайте подключение к Apache Spark, используя Google Cloud Serverless для Apache Spark на базе Spark Connect .
- Используйте Lightning Engine для ускорения рабочих нагрузок Apache Spark до 4,3 раз.
- Загружайте данные из BigQuery, используя встроенную интеграцию между Apache Spark и BigQuery.
- Изучите данные с помощью генерации кода, осуществляемой с использованием Gemini.
- Выполните проектирование признаков с использованием фреймворка обработки данных Apache Spark.
- Обучите и оцените модель классификации, используя встроенную библиотеку машинного обучения Apache Spark, MLlib .
- Разверните сервер вывода для модели классификации, используя Flask и Cloud Run.
- Разверните агента для выполнения запросов к серверу вывода с использованием естественного языка с помощью Agent Engine и Agent Development Kit (ADK) .
2. Подключитесь к среде выполнения Colab.
Определите проект Google Cloud.
Создайте проект в Google Cloud. Вы можете использовать уже существующий.
Включите рекомендуемые API:
Нажмите здесь , чтобы включить следующие API :
- aiplatform.googleapis.com
- bigquery.googleapis.com
- bigquerystorage.googleapis.com
- bigqueryunified.googleapis.com
- cloudaicompanion.googleapis.com
- dataproc.googleapis.com
- storage.googleapis.com
- run.googleapis.com
Навигация по пользовательскому интерфейсу:
- В консоли Google Cloud перейдите в меню «Навигация» > «BigQuery».

- В панели BigQuery Studio нажмите кнопку со стрелкой раскрывающегося списка, наведите курсор на «Блокнот» и выберите «Загрузить».

- Выберите переключатель «URL» и введите следующий URL-адрес:
https://github.com/GoogleCloudPlatform/devrel-demos/blob/main/data-analytics/dataproc-webinar/data-science/Spark_Data_Science.ipynb
- Установите регион на
us-central11и нажмите «Загрузить».

- Чтобы открыть блокнот, щелкните стрелку раскрывающегося списка в панели Проводника с именем вашего проекта. Затем щелкните раскрывающийся список «Блокноты» . Щелкните блокнот
Spark_Data_Science.

- Сверните меню навигации BigQuery и оглавление блокнота, чтобы освободить больше места.

3. Подключитесь к среде выполнения и выполните дополнительный код настройки.
- Нажмите «Подключиться». Во всплывающем окне авторизуйте Colab Enterprise, используя свою учетную запись электронной почты. Ваш блокнот автоматически подключится к среде выполнения.

- После установки среды выполнения вы увидите следующее:

- Внутри блокнота прокрутите до раздела «Настройка» . Начните отсюда.
4. Запустите код настройки.
Настройте среду с необходимыми библиотеками Python для выполнения лабораторной работы. Настройте частный доступ к Google . Создайте хранилище данных (storage bucket). Создайте набор данных BigQuery. Скопируйте идентификатор вашего проекта в блокнот. Выберите регион . Для этой лабораторной работы используйте регион us-central1 .
Для выполнения кода в ячейке необходимо навести курсор на соответствующий блок ячейки и щелкнуть стрелку.

# Enable APIs
import subprocess
command = [
"gcloud",
"services",
"enable",
"aiplatform.googleapis.com",
"bigquery.googleapis.com",
"bigquerystorage.googleapis.com",
"bigqueryunified.googleapis.com",
"cloudaicompanion.googleapis.com",
"dataproc.googleapis.com",
"run.googleapis.com",
"storage.googleapis.com"
]
result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)
# Configure a PROJECT_ID and REGION
PROJECT_ID = "<YOUR_PROJECT_ID>"
REGION = "<YOUR_REGION>"
# Enable Private Google Access
import subprocess
command = [
"gcloud",
"compute",
"networks",
"subnets",
"update",
"default",
f"--region={REGION}",
"--enable-private-ip-google-access"
]
result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)
# Create a Cloud Storage Bucket
from google.cloud import storage
from google.cloud.exceptions import NotFound
BUCKET_NAME = f"{PROJECT_ID}-demo"
storage_client = storage.Client(project=PROJECT_ID)
try:
bucket = storage_client.get_bucket(BUCKET_NAME)
print(f"Bucket {BUCKET_NAME} already exists.")
except NotFound:
bucket = storage_client.create_bucket(BUCKET_NAME, location=REGION)
print(f"Bucket {BUCKET_NAME} created.")
# Create a BigQuery dataset.
from google.cloud import bigquery
DATASET_ID = f"{PROJECT_ID}.demo"
client = bigquery.Client()
dataset = bigquery.Dataset(DATASET_ID)
dataset.location = REGION
dataset = client.create_dataset(dataset, exists_ok=True)
5. Создайте подключение к Google Cloud Serverless для Apache Spark.
С помощью Spark Connect вы подключаетесь к бессерверной сессии Spark для запуска интерактивных заданий Spark. Вы настраиваете среду выполнения с помощью Lightning Engine для повышения производительности Spark. Lightning Engine работает за счет ускорения рабочих нагрузок с использованием Apache Gluten и Velox .
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.version = "3.0"
# You can optionally configure Spark properties as well. See https://cloud.google.com/dataproc-serverless/docs/concepts/properties.
session.runtime_config.properties = {
"dataproc.runtime": "premium",
"spark.dataproc.engine": "lightningEngine",
}
# To avoid going over quota in this demo, cap the max number of Spark workers.
session.runtime_config.properties = {
"spark.dynamicAllocation.maxExecutors": "4"
}
spark = (
DataprocSparkSession.builder
.appName("CustomSparkSession")
.dataprocSessionConfig(session)
.getOrCreate()
)
6. Загрузка и анализ данных с помощью Gemini.
В этом разделе вы пройдете первый важный шаг в любом проекте по анализу данных: подготовку данных. Вы начнете с загрузки данных в DataFrame Apache Spark из BigQuery.
# Load the tables
order_items = spark.read.format("bigquery").option("table", "bigquery-public-data.thelook_ecommerce.order_items").load()
users = spark.read.format("bigquery").option("table", "bigquery-public-data.thelook_ecommerce.users").load()
# Register temp tables
users.createOrReplaceTempView("users")
order_items.createOrReplaceTempView("order_items")
# Verify temp tables
spark.sql("SELECT * FROM order_items LIMIT 10").show()
Затем вы используете Gemini для генерации кода PySpark, чтобы исследовать данные и лучше их понять.

Задание 1 : Используя PySpark, изучите таблицу пользователей и отобразите первые 10 строк.
# prompt: Using PySpark, explore the users table and show the first 10 rows.
users.show(10)
Задание 2 : Используя PySpark, изучите таблицу order_items и покажите первые 10 строк.
# prompt: Using PySpark, explore the order_items table and show the first 10 rows.
order_items.show(10)
Задание 3 : Используя PySpark, отобразите 5 стран с наибольшей частотой посещений в таблице пользователей. Покажите страну и количество пользователей из каждой страны.
# prompt: Using PySpark, show the top 5 most frequent countries in the users table. Display the country and the number of users from each country.
from pyspark.sql.functions import col, count
users.groupBy("country").agg(count("*").alias("user_count")).orderBy(col("user_count").desc()).limit(5).show()
Задание 4 : Используя PySpark, найдите среднюю цену продажи товаров в таблице order_items.
# prompt: Using PySpark, find the average sale price of items in the order_items table.
from pyspark.sql import functions as F
average_sale_price = order_items.agg(F.avg("sale_price").alias("average_sale_price"))
average_sale_price.show()
Задание 5 : Используя таблицу "users", сгенерируйте код для построения графика зависимости страны от источника трафика, используя подходящую библиотеку для построения графиков.
# prompt: Using the table "users", generate code to plot country vs traffic source using a suitable plotting library.
sql = """
SELECT
country,
traffic_source
FROM
`bigquery-public-data.thelook_ecommerce.users`
WHERE country IS NOT NULL AND traffic_source IS NOT NULL
"""
project_id = "iceberg-summit-2025"
df = pandas_gbq.read_gbq(sql, project_id=project_id, dialect="standard")
import matplotlib.pyplot as plt
import seaborn as sns
# Group by country and traffic_source and count occurrences
df_grouped = df.groupby(['country', 'traffic_source']).size().reset_index(name='count')
# Create a pivot table for easier plotting
pivot_table = df_grouped.pivot(index='country', columns='traffic_source', values='count').fillna(0)
# Plotting
plt.figure(figsize=(15, 8))
pivot_table.plot(kind='bar', stacked=True, figsize=(15, 8))
plt.title('Traffic Source Distribution by Country')
plt.xlabel('Country')
plt.ylabel('Number of Users')
plt.xticks(rotation=90)
plt.legend(title='Traffic Source')
plt.tight_layout()
plt.show()
Задание 6: Создайте гистограмму, показывающую распределение значений "возраст", "страна", "пол", "источник трафика".
# prompt: Create a histogram showing the distribution of "age", "country", "gender", "traffic_source".
import matplotlib.pyplot as plt
# Convert Spark DataFrame to Pandas DataFrame for visualization
users_pd = users.toPandas()
# Create histograms for 'age', 'country', 'gender', 'traffic_source'
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('Distribution of User Attributes')
# Age distribution
axes[0, 0].hist(users_pd['age'].dropna(), bins=20, edgecolor='black')
axes[0, 0].set_title('Age Distribution')
axes[0, 0].set_xlabel('Age')
axes[0, 0].set_ylabel('Number of Users')
# Country distribution
users_pd['country'].value_counts().head(10).plot(kind='bar', ax=axes[0, 1])
axes[0, 1].set_title('Top 10 Countries Distribution')
axes[0, 1].set_xlabel('Country')
axes[0, 1].set_ylabel('Number of Users')
axes[0, 1].tick_params(axis='x', rotation=45)
# Gender distribution
users_pd['gender'].value_counts().plot(kind='bar', ax=axes[1, 0])
axes[1, 0].set_title('Gender Distribution')
axes[1, 0].set_xlabel('Gender')
axes[1, 0].set_ylabel('Number of Users')
axes[1, 0].tick_params(axis='x', rotation=0)
# Traffic Source distribution
users_pd['traffic_source'].value_counts().head(10).plot(kind='bar', ax=axes[1, 1])
axes[1, 1].set_title('Top 10 Traffic Source Distribution')
axes[1, 1].set_xlabel('Traffic Source')
axes[1, 1].set_ylabel('Number of Users')
axes[1, 1].tick_params(axis='x', rotation=45)
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()
7. Подготовка данных и разработка признаков.
Далее вы выполняете инженерию признаков данных. Выбираете соответствующие столбцы, преобразуете данные в более подходящие типы данных и определяете столбец с метками.
features = spark.sql("""
SELECT
CAST(u.age AS DOUBLE) AS age,
CAST(hash(u.country) AS BIGINT) * 1.0 AS country_hash,
CAST(hash(u.gender) AS BIGINT) * 1.0 AS gender_hash,
CAST(hash(u.traffic_source) AS BIGINT) * 1.0 AS traffic_source_hash,
CASE WHEN COUNT(oi.id) > 0 THEN 1 ELSE 0 END AS label -- Changed label generation to count order items
FROM users AS u
LEFT JOIN order_items AS oi
ON u.id = oi.user_id
GROUP BY u.id, u.age, u.country, u.gender, u.traffic_source
""")
features.show()
8. Обучите модель логистической регрессии.
С помощью MLlib вы обучаете модель логистической регрессии. Сначала вы используете VectorAssembler для преобразования данных в векторный формат. Затем StandardScaler масштабирует столбец признаков для повышения производительности. После этого вы создаете ссылку на модель LogisticRegression и определяете гиперпараметры. Вы объединяете эти шаги в объект Pipeline , обучаете модель с помощью функции fit() и преобразуете данные с помощью функции transform() .
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.functions import array_to_vector
#Split Train and Test Data (80:20)
train_data, test_data = features.randomSplit([0.8, 0.2], seed=42)
# Initialize VectorAssembler
assembler = VectorAssembler(
inputCols=["age", "country_hash", "gender_hash", "traffic_source_hash"],
outputCol="assembled_features"
)
# Initialize StandardScaler
scaler = StandardScaler(inputCol="assembled_features", outputCol="scaled_features")
# Initialize Logistic Regression model
lr = LogisticRegression(
maxIter=100,
regParam=0.2,
threshold=0.8,
featuresCol="scaled_features",
labelCol="label"
)
# Define pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])
# Fit the model
pipeline_model = pipeline.fit(train_data)
# Transform the dataset using the trained model
transformed_dataset = pipeline_model.transform(test_data)
transformed_dataset.show()
9. Оценка модели
Оцените преобразованный набор данных. Сгенерируйте метрику оценки — площадь под кривой (AUC) .
# Model evaluation
eva = BinaryClassificationEvaluator(metricName="areaUnderPR")
aucPR = eva.evaluate(transformed_dataset)
print(f"AUC PR: {aucPR}")
Затем используйте Gemini для генерации кода PySpark, чтобы визуализировать результаты работы вашей модели.
Задание 1: Сгенерируйте код для построения кривой точности-полноты (PR). Вычислите точность и полноту на основе прогнозов модели и отобразите кривую PR, используя подходящую библиотеку для построения графиков.
# prompt: Generate code to plot the Precision-Recall (PR) curve. Calculate precision and recall from the model's predictions and display the PR curve using a suitable plotting library.
import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_curve, auc
# Extract predictions and labels
predictions = transformed_dataset.select("prediction", "label").toPandas()
# Calculate precision and recall
precision, recall, _ = precision_recall_curve(predictions["label"], predictions["prediction"])
# Calculate AUC-PR
pr_auc = auc(recall, precision)
# Plot the PR curve
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, color='blue', lw=2, label=f'PR curve (AUC = {pr_auc:.2f})')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc='lower left')
plt.grid(True)
plt.show()
Задание 2: Сгенерируйте код для визуализации матрицы ошибок. Вычислите матрицу ошибок на основе прогнозов модели и отобразите ее в виде тепловой карты или таблицы с количеством истинно положительных результатов (TP), истинно отрицательных результатов (TN), ложноположительных результатов (FP) и ложноотрицательных результатов (FN).
# prompt: Generate code to create a confusion matrix visualization. Calculate the confusion matrix from the model's predictions and display it as a heat map or a table with counts of true positives (TP), true negatives (TN), false positives (FP), and false negatives (FN).
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
# Extract predictions and labels
predictions_and_labels = transformed_dataset.select("prediction", "label").toPandas()
# Calculate the confusion matrix
cm = confusion_matrix(predictions_and_labels["label"], predictions_and_labels["prediction"])
# Create a DataFrame for better visualization
cm_df = pd.DataFrame(cm,
index=['Actual Negative', 'Actual Positive'],
columns=['Predicted Negative', 'Predicted Positive'])
# Display the confusion matrix as a table
print("Confusion Matrix:")
print(cm_df)
# Display the confusion matrix as a heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(cm_df, annot=True, fmt='d', cmap='Blues', cbar=False, linewidths=.5)
plt.title('Confusion Matrix')
plt.ylabel('Actual Label')
plt.xlabel('Predicted Label')
plt.show()
# Calculate and display TP, TN, FP, FN
TN, FP, FN, TP = cm.ravel()
print(f"
True Positives (TP): {TP}")
print(f"True Negatives (TN): {TN}")
print(f"False Positives (FP): {FP}")
print(f"False Negatives (FN): {FN}")
10. Запись прогнозов в BigQuery
Используйте Gemini для генерации кода, который запишет ваши прогнозы в новую таблицу в вашем наборе данных BigQuery.
Задание: Используя Spark, запишите преобразованный набор данных в BigQuery.
# prompt: Using Spark, write the transformed dataset to BigQuery.
transformed_dataset.write.format("bigquery").option("table", f"{PROJECT_ID}.demo.predictions").mode("overwrite").save()
11. Сохраните модель в облачное хранилище.
Используя встроенные функции MLlib, сохраните свою модель в Cloud Storage. Сервер вывода загрузит модель оттуда.
MODEL_PATH = "models/prediction_model"
pipeline_model.write().overwrite().save(f"gs://{BUCKET_NAME}/{MODEL_PATH}")
12. Создайте сервер вывода
Cloud Run — это гибкий инструмент для запуска бессерверных веб-приложений. Он использует контейнеры Docker, предоставляя пользователям максимальные возможности настройки. Для этой лабораторной работы настроен Dockerfile для запуска приложения Flask, работающего на базе PySpark. Этот контейнер работает в Cloud Run и выполняет вывод данных на основе входных данных. Код можно найти здесь .
Клонируйте репозиторий с кодом сервера вывода.
!git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
Просмотрите Dockerfile.
FROM python:3.12-slim
# Install OpenJDK-21 (Required for Spark)
RUN apt-get update && \
apt-get install -y openjdk-21-jre-headless procps && \
rm -rf /var/lib/apt/lists/*
ENV JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
ENV PORT=8080
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .
CMD ["gunicorn", "--bind", "0.0.0.0:8080", "--workers", "1", "--threads", "8", "--timeout", "0", "main:app"]
Посмотрите код Python для сервера.
import os
import json
import logging
from flask import Flask, request, jsonify
from google.cloud import storage
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import hash, col
# Configure basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Initialization: Spark and Model Loading ---
GCS_BUCKET = os.environ.get("GCS_BUCKET")
GCS_MODEL_PATH = os.environ.get("GCS_MODEL_PATH")
LOCAL_MODEL_PATH = "/tmp/model"
try:
spark = SparkSession.builder \
.appName("CloudRunSparkService") \
.master("local[*]") \
.getOrCreate()
logging.info("Spark Session successfully initialized.")
except Exception as e:
logging.error(f"Failed to initialize Spark Session: {e}")
raise
def download_directory(bucket_name, prefix, local_path):
"""Downloads a directory from GCS to local filesystem."""
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blobs = list(bucket.list_blobs(prefix=prefix))
if len(blobs) == 0:
logging.error(f"No files found in GCS bucket {bucket_name} at prefix {prefix}")
return
for blob in blobs:
if blob.name.endswith("/"): continue # Skip directories
# Structure local paths
relative_path = os.path.relpath(blob.name, prefix)
local_file_path = os.path.join(local_path, relative_path)
os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
blob.download_to_filename(local_file_path)
print(f"Model downloaded to {local_path}")
# Load model
def load_model(LOCAL_MODEL_PATH, GCS_BUCKET, GCS_MODEL_PATH):
"""Download and load model on startup to avoid latency per request."""
global MODEL
if not os.path.exists(LOCAL_MODEL_PATH):
download_directory(GCS_BUCKET, GCS_MODEL_PATH, LOCAL_MODEL_PATH)
logging.info(f"Loading PySpark model from {GCS_MODEL_PATH}")
# Load the Spark ML model
try:
MODEL = PipelineModel.load(LOCAL_MODEL_PATH)
logging.info("Spark model loaded successfully.")
except Exception as e:
logging.error(f"Failed to load model: {e}")
raise
# Load Model on Startup
load_model(LOCAL_MODEL_PATH, GCS_BUCKET, GCS_MODEL_PATH)
# --- Flask Application Setup ---
app = Flask(__name__)
@app.route('/predict', methods=['POST'])
def predict():
"""
Handles incoming POST requests for inference.
Expects JSON data that can be converted into a Spark DataFrame.
"""
if MODEL is None:
return jsonify({"error": "Model failed to load at startup."}), 500
try:
# 1. Get data from the request
data = request.get_json()
# 2. Check length of list
data_len = len(data)
cap = 100
if data_len > cap:
return jsonify({"error": f"Too many records. Count: {data_len}, Max: {cap}"}), 400
# 2. Create Spark DataFrame
df = spark.createDataFrame(data)
# 3. Transform data
input_df = df.select(
col("age").cast("DOUBLE").alias("age"),
(hash(col("country")).cast("BIGINT") * 1.0).alias("country_hash"),
(hash(col("gender")).cast("BIGINT") * 1.0).alias("gender_hash"),
(hash(col("traffic_source")).cast("BIGINT") * 1.0).alias("traffic_source_hash")
)
# 3. Perform Inference
predictions_df = MODEL.transform(input_df)
# 4. Prepare results (collect and serialize)
results = [p.prediction for p in predictions_df.select("prediction").collect()]
# 5. Return JSON response
return jsonify({"predictions": results})
except Exception as e:
logging.error(f"An error occurred during prediction: {e}")
#return jsonify({"error": str(e)}), 500
raise e
# Gunicorn entry point uses 'app' from this file
if __name__ == '__main__':
# Local testing only: Cloud Run uses Gunicorn/CMD command
app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))
Разверните сервер вывода.
import subprocess
command = [
"gcloud",
"run",
"deploy",
"inference-server",
"--source",
"/content/devrel-demos/data-analytics/dataproc-webinar/data-science/inference-server",
"--region",
f"{REGION}",
"--port",
"8080",
"--memory",
"2Gi",
"--allow-unauthenticated",
"--set-env-vars",
f"GCS_BUCKET={BUCKET_NAME},GCS_MODEL_PATH={MODEL_PATH}",
"--startup-probe",
"tcpSocket.port=8080,initialDelaySeconds=240,failureThreshold=3,timeoutSeconds=240,periodSeconds=240"
]
result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)
Скопируйте URL-адрес сервера вывода из выходных данных в новую переменную. Он будет выглядеть примерно так: https://inference-server-123456789.us-central1.run.app.
INFERENCE_SERVER_URL = "<YOUR_SERVER_URL>"
Протестируйте сервер вывода.
import requests
age = "25.0"
country = "United States"
traffic_source = "Search"
gender = "F"
response = requests.post(
f"{INFERENCE_SERVER_URL}/predict",
json=[{"age": age, "country": country, "traffic_source": traffic_source, "gender": gender}],
headers={"Content-Type": "application/json"}
)
print(response.json())
Результатом должно быть 1.0 или 0.0 .
{'predictions': [1.0]}
13. Настройка агента
Используйте Agent Engine для создания агента, способного выполнять инференцию. Agent Engine является частью платформы Vertex AI Platform, набора сервисов, позволяющих разработчикам развертывать, управлять и масштабировать ИИ-агентов в производственной среде. Он включает в себя множество инструментов, в том числе инструменты для оценки агентов, контекстов сессий и выполнения кода. Он поддерживает множество популярных агентных фреймворков, включая Agent Development Kit (ADK) . ADK — это агентный фреймворк с открытым исходным кодом, который, хотя и создан и оптимизирован для использования с Gemini и экосистемой Google, является модельно-независимым. Он разработан таким образом, чтобы разработка агентов больше походила на разработку программного обеспечения.
Инициализируйте клиент Vertex AI.
import vertexai
from vertexai import agent_engines # For the prebuilt templates
client = vertexai.Client( # For service interactions via client.agent_engines
project=f"{PROJECT_ID}",
location=f"{REGION}",
)
Определите функцию для запроса к развернутой модели.
def predict_purchase(
age: str = "25.0",
country: str = "United States",
traffic_source: str = "Search",
gender: str = "M",
):
"""Predicts whether or not a user will purchase a product.
Args:
age: The age of the user.
country: The country of the user. One of: "China", "Poland", "Germany", "United States", "Spain", "United Kingdom", "España", "Japan", "Brasil", "Colombia", "Belgium", "South Korea", "Austria", "France", "Australia".
Traffic_source: The source of the user's traffic. One of: "Display", "Email", "Search", "Organic", "Facebook".
gender: The gender of the user. One of: "M" or "F".
Returns:
True if the model output is 1.0, False otherwise.
"""
import requests
response = requests.post(
f"{INFERENCE_SERVER_URL}/predict",
json=[{"age": age, "country": country, "traffic_source": traffic_source, "gender": gender}],
headers={"Content-Type": "application/json"}
)
return response.json()
Протестируйте функцию, передав ей примеры параметров.
predict_purchase(age=25.0, country="United States", traffic_source="Search", gender="M")
Используя ADK, определите агента, как показано ниже, и предоставьте функцию predict_purchase в качестве инструмента.
from google.adk.agents import Agent
from vertexai import agent_engines
agent = Agent(
model="gemini-2.5-flash",
name='purchase_prediction_agent',
tools=[predict_purchase]
)
Протестируйте агента локально, передав ему запрос.
app = agent_engines.AdkApp(agent=agent)
async for event in app.async_stream_query(
user_id="123",
message="Will a 25 yo male from the United States who came from Search make a purchase? Strictly output 'yes' or 'no'.",
):
try:
print(event['content']['parts'][0]['text'])
except:
continue
Разверните модель в Agent Engine.
remote_agent = client.agent_engines.create(
agent=app,
config={
"requirements": ["google-cloud-aiplatform[agent_engines,adk]"],
"staging_bucket": f"gs://{BUCKET_NAME}",
"display_name": "purchase-predictor",
"description": "Agent that predicts whether or not a user will purchase a product.",
}
)
После завершения просмотрите развернутую модель в консоли Cloud Console .
Повторите запрос к модели. Теперь он указывает на развернутый агент, а не на локальную версию.
async for event in remote_agent.async_stream_query(
user_id="123",
message="Will a 25 yo male from the United States who came from Search make a purchase? Strictly output 'yes' or 'no'.",
):
try:
print(event['content']['parts'][0]['text'])
except:
continue
14. Уборка
Удалите все созданные вами ресурсы Google Cloud. Выполнение подобных команд очистки — важная рекомендация, позволяющая избежать дополнительных расходов в будущем.
# Delete the deployed agent.
remote_agent.delete(force=True)
# Delete the inference server.
import subprocess
command = [
"gcloud",
"run",
"services",
"delete",
"inference-server",
"--region",
f"{REGION}",
"--quiet"
]
subprocess.run(command, capture_output=True, text=True)
# Delete the BigQuery dataset.
bigquery_client = bigquery.Client()
bigquery_client.delete_dataset(
f"{PROJECT_ID}.demo", delete_contents=True, not_found_ok=True
)
# Delete the Storage bucket.
storage_client = storage.Client()
bucket = storage_client.get_bucket(BUCKET_NAME)
bucket.delete_blobs(list(bucket.list_blobs()))
bucket.delete()
15. Поздравляем!
У вас получилось! В этом практическом задании вы выполнили следующие действия:
- Использовал блокноты BigQuery Studio для запуска рабочего процесса анализа данных.
- Установлено соединение с Apache Spark с использованием Google Cloud Serverless for Apache Spark и платформы Spark Connect .
- Использование Lightning Engine позволило ускорить выполнение задач Apache Spark до 4,3 раз.
- Данные загружены из BigQuery с использованием встроенной интеграции между Apache Spark и BigQuery.
- Проведен анализ данных с использованием генерации кода с помощью Gemini.
- Выполнил инженерию признаков с использованием фреймворка обработки данных Apache Spark.
- Обучили и оценили модель классификации, используя встроенную библиотеку машинного обучения Apache Spark, MLlib .
- Развернул сервер вывода для модели классификации с использованием Flask и Cloud Run.
- Развернул агента для выполнения запросов к серверу вывода с использованием естественного языка с помощью Agent Engine и Agent Development Kit (ADK) .
Что дальше?
- Узнайте больше о бессерверных вычислениях Google Cloud для Apache Spark .
- Узнайте, как настроить Cloud Run .
- Ознакомьтесь с информацией об Agent Engine .