1. Panoramica
TheLook, un rivenditore di abbigliamento e-commerce ipotetico, archivia i dati su clienti, prodotti, ordini, logistica, eventi web e campagne di marketing digitale in BigQuery. L'azienda vuole sfruttare le competenze esistenti del team in SQL e PySpark per analizzare questi dati utilizzando Apache Spark.
Per evitare il provisioning o l'ottimizzazione manuale dell'infrastruttura per Spark, TheLook cerca una soluzione di scalabilità automatica che consenta di concentrarsi sui carichi di lavoro anziché sulla gestione dei cluster. Inoltre, vuole ridurre al minimo lo sforzo necessario per integrare Spark e BigQuery rimanendo nell'ambiente BigQuery Studio.
In questo lab, prevedi se un utente effettuerà un acquisto creando un classificatore di regressione logistica utilizzando PySpark e sfruttando l'integrazione nativa dei notebook e le funzionalità AI di BigQuery Studio per esplorare i dati. Esegui il deployment di questo modello in un server di inferenza e crei un agente per eseguire query sul modello utilizzando il linguaggio naturale.
Prerequisiti
Prima di iniziare questo lab, dovresti acquisire familiarità con:
- Programmazione di base in SQL e Python.
- Esecuzione di codice Python in un notebook Jupyter.
- Una conoscenza di base del computing distribuito
Obiettivi
- Utilizzare i notebook di BigQuery Studio per eseguire un flusso di lavoro di data science.
- Creare una connessione ad Apache Spark utilizzando Google Cloud Serverless per Apache Spark e basato su Spark Connect.
- Utilizzare Lightning Engine per accelerare i carichi di lavoro Apache Spark fino a 4,3 volte.
- Caricare i dati da BigQuery utilizzando l'integrazione integrata tra Apache Spark e BigQuery.
- Esplorare i dati utilizzando la generazione di codice assistita da Gemini.
- Eseguire il feature engineering utilizzando il framework di elaborazione dei dati di Apache Spark.
- Addestrare e valutare un modello di classificazione utilizzando la libreria di machine learning nativa di Apache Spark, MLlib.
- Eseguire il deployment di un server di inferenza per il modello di classificazione utilizzando Flask e Cloud Run
- Eseguire il deployment di un agente per eseguire query sul server di inferenza utilizzando il linguaggio naturale con Agent Engine e Agent Development Kit (ADK),
2. Connettersi a un ambiente di runtime Colab
Identificare un progetto Google Cloud
Crea un progetto Google Cloud. Puoi utilizzarne uno esistente.
Abilitare le API consigliate :
Fai clic qui per abilitare le seguenti API:
- aiplatform.googleapis.com
- bigquery.googleapis.com
- bigquerystorage.googleapis.com
- bigqueryunified.googleapis.com
- cloudaicompanion.googleapis.com
- dataproc.googleapis.com
- storage.googleapis.com
- run.googleapis.com
Navigare nell'interfaccia utente :
- Nella console Google Cloud, vai al menu di navigazione > BigQuery.

- Nel riquadro BigQuery Studio, fai clic sul pulsante della freccia del menu a discesa, passa il mouse sopra Notebook e seleziona Carica.

- Seleziona il pulsante di opzione URL e inserisci il seguente URL:
https://github.com/GoogleCloudPlatform/devrel-demos/blob/main/data-analytics/dataproc-webinar/data-science/Spark_Data_Science.ipynb
- Imposta la regione su
us-central11e fai clic su Carica.

- Per aprire il notebook, fai clic sulla freccia del menu a discesa nel riquadro Explorer con il nome dell'ID progetto. Quindi fai clic sul menu a discesa per Notebook. Fai clic sul notebook
Spark_Data_Science.

- Comprimi il menu di navigazione di BigQuery e il sommario del notebook per avere più spazio.

3. Connettersi a un runtime ed eseguire codice di configurazione aggiuntivo
- Fai clic su Connetti. Nel popup, autorizza Colab Enterprise con il tuo account email. Il notebook si connetterà automaticamente a un runtime.

- Una volta stabilito il runtime, vedrai quanto segue:

- Nel notebook, scorri fino alla sezione Configurazione. Inizia da qui.
4. Eseguire il codice di configurazione
Configura l'ambiente con le librerie Python necessarie per completare il lab. Configura l'accesso privato Google. Crea un bucket di Cloud Storage. Crea un set di dati BigQuery. Copia l'ID progetto nel notebook. Seleziona una regione. Per questo lab, utilizza la regione us-central1.
Puoi eseguire una cella di codice passando il cursore all'interno del blocco di celle e facendo clic sulla freccia.

# Enable APIs
import subprocess
command = [
"gcloud",
"services",
"enable",
"aiplatform.googleapis.com",
"bigquery.googleapis.com",
"bigquerystorage.googleapis.com",
"bigqueryunified.googleapis.com",
"cloudaicompanion.googleapis.com",
"dataproc.googleapis.com",
"run.googleapis.com",
"storage.googleapis.com"
]
result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)
# Configure a PROJECT_ID and REGION
PROJECT_ID = "<YOUR_PROJECT_ID>"
REGION = "<YOUR_REGION>"
# Enable Private Google Access
import subprocess
command = [
"gcloud",
"compute",
"networks",
"subnets",
"update",
"default",
f"--region={REGION}",
"--enable-private-ip-google-access"
]
result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)
# Create a Cloud Storage Bucket
from google.cloud import storage
from google.cloud.exceptions import NotFound
BUCKET_NAME = f"{PROJECT_ID}-demo"
storage_client = storage.Client(project=PROJECT_ID)
try:
bucket = storage_client.get_bucket(BUCKET_NAME)
print(f"Bucket {BUCKET_NAME} already exists.")
except NotFound:
bucket = storage_client.create_bucket(BUCKET_NAME, location=REGION)
print(f"Bucket {BUCKET_NAME} created.")
# Create a BigQuery dataset.
from google.cloud import bigquery
DATASET_ID = f"{PROJECT_ID}.demo"
client = bigquery.Client()
dataset = bigquery.Dataset(DATASET_ID)
dataset.location = REGION
dataset = client.create_dataset(dataset, exists_ok=True)
5. Creare una connessione a Google Cloud Serverless per Apache Spark
Utilizzando Spark Connect, ti connetti a una sessione Spark serverless per eseguire job Spark interattivi. Configura il runtime con Lightning Engine per prestazioni Spark avanzate. Lightning Engine funziona accelerando i carichi di lavoro utilizzando Apache Gluten e Velox.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.version = "3.0"
# You can optionally configure Spark properties as well. See https://cloud.google.com/dataproc-serverless/docs/concepts/properties.
session.runtime_config.properties = {
"dataproc.runtime": "premium",
"spark.dataproc.engine": "lightningEngine",
}
# To avoid going over quota in this demo, cap the max number of Spark workers.
session.runtime_config.properties = {
"spark.dynamicAllocation.maxExecutors": "4"
}
spark = (
DataprocSparkSession.builder
.appName("CustomSparkSession")
.dataprocSessionConfig(session)
.getOrCreate()
)
6. Caricare ed esplorare i dati utilizzando Gemini
In questa sezione, esegui il primo passaggio importante di qualsiasi progetto di data science: la preparazione dei dati. Inizia caricando i dati in un dataframe Apache Spark da BigQuery.
# Load the tables
order_items = spark.read.format("bigquery").option("table", "bigquery-public-data.thelook_ecommerce.order_items").load()
users = spark.read.format("bigquery").option("table", "bigquery-public-data.thelook_ecommerce.users").load()
# Register temp tables
users.createOrReplaceTempView("users")
order_items.createOrReplaceTempView("order_items")
# Verify temp tables
spark.sql("SELECT * FROM order_items LIMIT 10").show()
Poi, utilizzi Gemini per generare codice PySpark per esplorare i dati e comprenderli meglio.

Prompt 1: utilizzando PySpark, esplora la tabella users e mostra le prime 10 righe.
# prompt: Using PySpark, explore the users table and show the first 10 rows.
users.show(10)
Prompt 2: utilizzando PySpark, esplora la tabella order_items e mostra le prime 10 righe.
# prompt: Using PySpark, explore the order_items table and show the first 10 rows.
order_items.show(10)
Prompt 3: utilizzando PySpark, mostra i 5 paesi più frequenti nella tabella users. Visualizza il paese e il numero di utenti di ogni paese.
# prompt: Using PySpark, show the top 5 most frequent countries in the users table. Display the country and the number of users from each country.
from pyspark.sql.functions import col, count
users.groupBy("country").agg(count("*").alias("user_count")).orderBy(col("user_count").desc()).limit(5).show()
Prompt 4: utilizzando PySpark, trova il prezzo scontato medio degli articoli nella tabella order_items.
# prompt: Using PySpark, find the average sale price of items in the order_items table.
from pyspark.sql import functions as F
average_sale_price = order_items.agg(F.avg("sale_price").alias("average_sale_price"))
average_sale_price.show()
Prompt 5: utilizzando la tabella "users", genera codice per tracciare il paese rispetto alla sorgente di traffico utilizzando una libreria di tracciamento adatta.
# prompt: Using the table "users", generate code to plot country vs traffic source using a suitable plotting library.
sql = """
SELECT
country,
traffic_source
FROM
`bigquery-public-data.thelook_ecommerce.users`
WHERE country IS NOT NULL AND traffic_source IS NOT NULL
"""
project_id = "iceberg-summit-2025"
df = pandas_gbq.read_gbq(sql, project_id=project_id, dialect="standard")
import matplotlib.pyplot as plt
import seaborn as sns
# Group by country and traffic_source and count occurrences
df_grouped = df.groupby(['country', 'traffic_source']).size().reset_index(name='count')
# Create a pivot table for easier plotting
pivot_table = df_grouped.pivot(index='country', columns='traffic_source', values='count').fillna(0)
# Plotting
plt.figure(figsize=(15, 8))
pivot_table.plot(kind='bar', stacked=True, figsize=(15, 8))
plt.title('Traffic Source Distribution by Country')
plt.xlabel('Country')
plt.ylabel('Number of Users')
plt.xticks(rotation=90)
plt.legend(title='Traffic Source')
plt.tight_layout()
plt.show()
Prompt 6:crea un istogramma che mostri la distribuzione di "age", "country", "gender", "traffic_source".
# prompt: Create a histogram showing the distribution of "age", "country", "gender", "traffic_source".
import matplotlib.pyplot as plt
# Convert Spark DataFrame to Pandas DataFrame for visualization
users_pd = users.toPandas()
# Create histograms for 'age', 'country', 'gender', 'traffic_source'
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('Distribution of User Attributes')
# Age distribution
axes[0, 0].hist(users_pd['age'].dropna(), bins=20, edgecolor='black')
axes[0, 0].set_title('Age Distribution')
axes[0, 0].set_xlabel('Age')
axes[0, 0].set_ylabel('Number of Users')
# Country distribution
users_pd['country'].value_counts().head(10).plot(kind='bar', ax=axes[0, 1])
axes[0, 1].set_title('Top 10 Countries Distribution')
axes[0, 1].set_xlabel('Country')
axes[0, 1].set_ylabel('Number of Users')
axes[0, 1].tick_params(axis='x', rotation=45)
# Gender distribution
users_pd['gender'].value_counts().plot(kind='bar', ax=axes[1, 0])
axes[1, 0].set_title('Gender Distribution')
axes[1, 0].set_xlabel('Gender')
axes[1, 0].set_ylabel('Number of Users')
axes[1, 0].tick_params(axis='x', rotation=0)
# Traffic Source distribution
users_pd['traffic_source'].value_counts().head(10).plot(kind='bar', ax=axes[1, 1])
axes[1, 1].set_title('Top 10 Traffic Source Distribution')
axes[1, 1].set_xlabel('Traffic Source')
axes[1, 1].set_ylabel('Number of Users')
axes[1, 1].tick_params(axis='x', rotation=45)
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()
7. Preparazione dei dati e feature engineering
Poi, esegui il feature engineering sui dati. Seleziona le colonne appropriate, trasforma i dati in tipi di dati più adatti e identifica una colonna di etichette.
features = spark.sql("""
SELECT
CAST(u.age AS DOUBLE) AS age,
CAST(hash(u.country) AS BIGINT) * 1.0 AS country_hash,
CAST(hash(u.gender) AS BIGINT) * 1.0 AS gender_hash,
CAST(hash(u.traffic_source) AS BIGINT) * 1.0 AS traffic_source_hash,
CASE WHEN COUNT(oi.id) > 0 THEN 1 ELSE 0 END AS label -- Changed label generation to count order items
FROM users AS u
LEFT JOIN order_items AS oi
ON u.id = oi.user_id
GROUP BY u.id, u.age, u.country, u.gender, u.traffic_source
""")
features.show()
8. Addestrare un modello di regressione logistica
Utilizzando MLlib, addestra un modello di regressione logistica. Innanzitutto, utilizzi un VectorAssembler per convertire i dati in un formato vettoriale. Poi, StandardScaler scala la colonna delle funzionalità per migliorare le prestazioni. Poi, crei un riferimento a un modello LogisticRegression e definisci gli iperparametri. Combina questi passaggi in un oggetto Pipeline, addestra il modello utilizzando la funzione fit() e trasforma i dati utilizzando la funzione transform().
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.functions import array_to_vector
#Split Train and Test Data (80:20)
train_data, test_data = features.randomSplit([0.8, 0.2], seed=42)
# Initialize VectorAssembler
assembler = VectorAssembler(
inputCols=["age", "country_hash", "gender_hash", "traffic_source_hash"],
outputCol="assembled_features"
)
# Initialize StandardScaler
scaler = StandardScaler(inputCol="assembled_features", outputCol="scaled_features")
# Initialize Logistic Regression model
lr = LogisticRegression(
maxIter=100,
regParam=0.2,
threshold=0.8,
featuresCol="scaled_features",
labelCol="label"
)
# Define pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])
# Fit the model
pipeline_model = pipeline.fit(train_data)
# Transform the dataset using the trained model
transformed_dataset = pipeline_model.transform(test_data)
transformed_dataset.show()
9. Valutazione del modello
Valuta il set di dati appena trasformato. Genera l'area della metrica di valutazione sotto la curva (AUC).
# Model evaluation
eva = BinaryClassificationEvaluator(metricName="areaUnderPR")
aucPR = eva.evaluate(transformed_dataset)
print(f"AUC PR: {aucPR}")
Poi, utilizza Gemini per generare codice PySpark per visualizzare l'output del modello.
Prompt 1:genera codice per tracciare la curva di precisione-richiamo (PR). Calcola la precisione e il richiamo dalle previsioni del modello e visualizza la curva PR utilizzando una libreria di tracciamento adatta.
# prompt: Generate code to plot the Precision-Recall (PR) curve. Calculate precision and recall from the model's predictions and display the PR curve using a suitable plotting library.
import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_curve, auc
# Extract predictions and labels
predictions = transformed_dataset.select("prediction", "label").toPandas()
# Calculate precision and recall
precision, recall, _ = precision_recall_curve(predictions["label"], predictions["prediction"])
# Calculate AUC-PR
pr_auc = auc(recall, precision)
# Plot the PR curve
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, color='blue', lw=2, label=f'PR curve (AUC = {pr_auc:.2f})')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc='lower left')
plt.grid(True)
plt.show()
Prompt 2:genera codice per creare una visualizzazione della matrice di confusione. Calcola la matrice di confusione dalle previsioni del modello e visualizzala come una mappa termica o una tabella con i conteggi di veri positivi (TP), veri negativi (TN), falsi positivi (FP) e falsi negativi (FN).
# prompt: Generate code to create a confusion matrix visualization. Calculate the confusion matrix from the model's predictions and display it as a heat map or a table with counts of true positives (TP), true negatives (TN), false positives (FP), and false negatives (FN).
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
# Extract predictions and labels
predictions_and_labels = transformed_dataset.select("prediction", "label").toPandas()
# Calculate the confusion matrix
cm = confusion_matrix(predictions_and_labels["label"], predictions_and_labels["prediction"])
# Create a DataFrame for better visualization
cm_df = pd.DataFrame(cm,
index=['Actual Negative', 'Actual Positive'],
columns=['Predicted Negative', 'Predicted Positive'])
# Display the confusion matrix as a table
print("Confusion Matrix:")
print(cm_df)
# Display the confusion matrix as a heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(cm_df, annot=True, fmt='d', cmap='Blues', cbar=False, linewidths=.5)
plt.title('Confusion Matrix')
plt.ylabel('Actual Label')
plt.xlabel('Predicted Label')
plt.show()
# Calculate and display TP, TN, FP, FN
TN, FP, FN, TP = cm.ravel()
print(f"
True Positives (TP): {TP}")
print(f"True Negatives (TN): {TN}")
print(f"False Positives (FP): {FP}")
print(f"False Negatives (FN): {FN}")
10. Scrivere le previsioni in BigQuery
Utilizza Gemini per generare codice per scrivere le previsioni in una nuova tabella nel set di dati BigQuery.
Prompt:utilizzando Spark, scrivi il set di dati trasformato in BigQuery.
# prompt: Using Spark, write the transformed dataset to BigQuery.
transformed_dataset.write.format("bigquery").option("table", f"{PROJECT_ID}.demo.predictions").mode("overwrite").save()
11. Salvare il modello in Cloud Storage
Utilizzando la funzionalità nativa di MLlib, salva il modello in Cloud Storage. Il server di inferenza carica il modello da qui.
MODEL_PATH = "models/prediction_model"
pipeline_model.write().overwrite().save(f"gs://{BUCKET_NAME}/{MODEL_PATH}")
12. Creare un server di inferenza
Cloud Run è uno strumento flessibile per eseguire app web serverless. Utilizza i container Docker per offrire agli utenti la massima personalizzazione. Per questo lab, un Dockerfile è configurato per eseguire un'app Flask che utilizza PySpark. Questo container viene eseguito su Cloud Run per eseguire l'inferenza sui dati di input. Il codice è disponibile qui.
Clona il repository con il codice del server di inferenza.
!git clone https://github.com/GoogleCloudPlatform/devrel-demos.git
Visualizza il Dockerfile.
FROM python:3.12-slim
# Install OpenJDK-21 (Required for Spark)
RUN apt-get update && \
apt-get install -y openjdk-21-jre-headless procps && \
rm -rf /var/lib/apt/lists/*
ENV JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
ENV PORT=8080
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .
CMD ["gunicorn", "--bind", "0.0.0.0:8080", "--workers", "1", "--threads", "8", "--timeout", "0", "main:app"]
Visualizza il codice Python per il server.
import os
import json
import logging
from flask import Flask, request, jsonify
from google.cloud import storage
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import hash, col
# Configure basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Initialization: Spark and Model Loading ---
GCS_BUCKET = os.environ.get("GCS_BUCKET")
GCS_MODEL_PATH = os.environ.get("GCS_MODEL_PATH")
LOCAL_MODEL_PATH = "/tmp/model"
try:
spark = SparkSession.builder \
.appName("CloudRunSparkService") \
.master("local[*]") \
.getOrCreate()
logging.info("Spark Session successfully initialized.")
except Exception as e:
logging.error(f"Failed to initialize Spark Session: {e}")
raise
def download_directory(bucket_name, prefix, local_path):
"""Downloads a directory from GCS to local filesystem."""
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blobs = list(bucket.list_blobs(prefix=prefix))
if len(blobs) == 0:
logging.error(f"No files found in GCS bucket {bucket_name} at prefix {prefix}")
return
for blob in blobs:
if blob.name.endswith("/"): continue # Skip directories
# Structure local paths
relative_path = os.path.relpath(blob.name, prefix)
local_file_path = os.path.join(local_path, relative_path)
os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
blob.download_to_filename(local_file_path)
print(f"Model downloaded to {local_path}")
# Load model
def load_model(LOCAL_MODEL_PATH, GCS_BUCKET, GCS_MODEL_PATH):
"""Download and load model on startup to avoid latency per request."""
global MODEL
if not os.path.exists(LOCAL_MODEL_PATH):
download_directory(GCS_BUCKET, GCS_MODEL_PATH, LOCAL_MODEL_PATH)
logging.info(f"Loading PySpark model from {GCS_MODEL_PATH}")
# Load the Spark ML model
try:
MODEL = PipelineModel.load(LOCAL_MODEL_PATH)
logging.info("Spark model loaded successfully.")
except Exception as e:
logging.error(f"Failed to load model: {e}")
raise
# Load Model on Startup
load_model(LOCAL_MODEL_PATH, GCS_BUCKET, GCS_MODEL_PATH)
# --- Flask Application Setup ---
app = Flask(__name__)
@app.route('/predict', methods=['POST'])
def predict():
"""
Handles incoming POST requests for inference.
Expects JSON data that can be converted into a Spark DataFrame.
"""
if MODEL is None:
return jsonify({"error": "Model failed to load at startup."}), 500
try:
# 1. Get data from the request
data = request.get_json()
# 2. Check length of list
data_len = len(data)
cap = 100
if data_len > cap:
return jsonify({"error": f"Too many records. Count: {data_len}, Max: {cap}"}), 400
# 2. Create Spark DataFrame
df = spark.createDataFrame(data)
# 3. Transform data
input_df = df.select(
col("age").cast("DOUBLE").alias("age"),
(hash(col("country")).cast("BIGINT") * 1.0).alias("country_hash"),
(hash(col("gender")).cast("BIGINT") * 1.0).alias("gender_hash"),
(hash(col("traffic_source")).cast("BIGINT") * 1.0).alias("traffic_source_hash")
)
# 3. Perform Inference
predictions_df = MODEL.transform(input_df)
# 4. Prepare results (collect and serialize)
results = [p.prediction for p in predictions_df.select("prediction").collect()]
# 5. Return JSON response
return jsonify({"predictions": results})
except Exception as e:
logging.error(f"An error occurred during prediction: {e}")
#return jsonify({"error": str(e)}), 500
raise e
# Gunicorn entry point uses 'app' from this file
if __name__ == '__main__':
# Local testing only: Cloud Run uses Gunicorn/CMD command
app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))
Esegui il deployment del server di inferenza.
import subprocess
command = [
"gcloud",
"run",
"deploy",
"inference-server",
"--source",
"/content/devrel-demos/data-analytics/dataproc-webinar/data-science/inference-server",
"--region",
f"{REGION}",
"--port",
"8080",
"--memory",
"2Gi",
"--allow-unauthenticated",
"--set-env-vars",
f"GCS_BUCKET={BUCKET_NAME},GCS_MODEL_PATH={MODEL_PATH}",
"--startup-probe",
"tcpSocket.port=8080,initialDelaySeconds=240,failureThreshold=3,timeoutSeconds=240,periodSeconds=240"
]
result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)
Copia l'URL del server di inferenza dall'output in una nuova variabile. Sarà simile a https://inference-server-123456789.us-central1.run.app.
INFERENCE_SERVER_URL = "<YOUR_SERVER_URL>"
Testa il server di inferenza.
import requests
age = "25.0"
country = "United States"
traffic_source = "Search"
gender = "F"
response = requests.post(
f"{INFERENCE_SERVER_URL}/predict",
json=[{"age": age, "country": country, "traffic_source": traffic_source, "gender": gender}],
headers={"Content-Type": "application/json"}
)
print(response.json())
L'output deve essere 1.0 o 0.0.
{'predictions': [1.0]}
13. Configurare un agente
Utilizza Agent Engine per creare un agente in grado di eseguire l'inferenza. Agent Engine fa parte di Vertex AI Platform, un insieme di servizi che consentono agli sviluppatori di eseguire il deployment, gestire e scalare gli agenti AI in produzione. Dispone di molti strumenti, tra cui la valutazione degli agenti, i contesti delle sessioni e l'esecuzione del codice. Supporta molti framework di agenti noti, tra cui il Agent Development Kit (ADK). ADK è un framework di agenti open source che, sebbene sia stato creato e ottimizzato per l'utilizzo con Gemini e l'ecosistema Google, è indipendente dal modello. È progettato per rendere lo sviluppo di agenti più simile allo sviluppo di software.
Inizializza il client Vertex AI.
import vertexai
from vertexai import agent_engines # For the prebuilt templates
client = vertexai.Client( # For service interactions via client.agent_engines
project=f"{PROJECT_ID}",
location=f"{REGION}",
)
Definisci una funzione per eseguire query sul modello di cui hai eseguito il deployment.
def predict_purchase(
age: str = "25.0",
country: str = "United States",
traffic_source: str = "Search",
gender: str = "M",
):
"""Predicts whether or not a user will purchase a product.
Args:
age: The age of the user.
country: The country of the user. One of: "China", "Poland", "Germany", "United States", "Spain", "United Kingdom", "España", "Japan", "Brasil", "Colombia", "Belgium", "South Korea", "Austria", "France", "Australia".
Traffic_source: The source of the user's traffic. One of: "Display", "Email", "Search", "Organic", "Facebook".
gender: The gender of the user. One of: "M" or "F".
Returns:
True if the model output is 1.0, False otherwise.
"""
import requests
response = requests.post(
f"{INFERENCE_SERVER_URL}/predict",
json=[{"age": age, "country": country, "traffic_source": traffic_source, "gender": gender}],
headers={"Content-Type": "application/json"}
)
return response.json()
Testa la funzione passando i parametri di esempio.
predict_purchase(age=25.0, country="United States", traffic_source="Search", gender="M")
Utilizzando l'ADK, definisci un agente di seguito e fornisci la funzione predict_purchase come strumento.
from google.adk.agents import Agent
from vertexai import agent_engines
agent = Agent(
model="gemini-2.5-flash",
name='purchase_prediction_agent',
tools=[predict_purchase]
)
Testa l'agente in locale passando una query.
app = agent_engines.AdkApp(agent=agent)
async for event in app.async_stream_query(
user_id="123",
message="Will a 25 yo male from the United States who came from Search make a purchase? Strictly output 'yes' or 'no'.",
):
try:
print(event['content']['parts'][0]['text'])
except:
continue
Esegui il deployment del modello in Agent Engine.
remote_agent = client.agent_engines.create(
agent=app,
config={
"requirements": ["google-cloud-aiplatform[agent_engines,adk]"],
"staging_bucket": f"gs://{BUCKET_NAME}",
"display_name": "purchase-predictor",
"description": "Agent that predicts whether or not a user will purchase a product.",
}
)
Al termine, visualizza il modello di cui hai eseguito il deployment in Cloud Console.
Esegui di nuovo query sul modello. Ora punta all'agente di cui hai eseguito il deployment anziché alla versione locale.
async for event in remote_agent.async_stream_query(
user_id="123",
message="Will a 25 yo male from the United States who came from Search make a purchase? Strictly output 'yes' or 'no'.",
):
try:
print(event['content']['parts'][0]['text'])
except:
continue
14. Libera spazio
Elimina tutte le risorse Google Cloud che hai creato. L'esecuzione di comandi di pulizia come questo è una best practice fondamentale per evitare di incorrere in addebiti futuri.
# Delete the deployed agent.
remote_agent.delete(force=True)
# Delete the inference server.
import subprocess
command = [
"gcloud",
"run",
"services",
"delete",
"inference-server",
"--region",
f"{REGION}",
"--quiet"
]
subprocess.run(command, capture_output=True, text=True)
# Delete the BigQuery dataset.
bigquery_client = bigquery.Client()
bigquery_client.delete_dataset(
f"{PROJECT_ID}.demo", delete_contents=True, not_found_ok=True
)
# Delete the Storage bucket.
storage_client = storage.Client()
bucket = storage_client.get_bucket(BUCKET_NAME)
bucket.delete_blobs(list(bucket.list_blobs()))
bucket.delete()
15. Complimenti!
Ce l'hai fatta. In questo codelab hai fatto quanto segue:
- Utilizzare i notebook di BigQuery Studio per eseguire un flusso di lavoro di data science.
- Creare una connessione ad Apache Spark utilizzando Google Cloud Serverless per Apache Spark e basato su Spark Connect.
- Utilizzare Lightning Engine per accelerare i carichi di lavoro Apache Spark fino a 4,3 volte.
- Caricare i dati da BigQuery utilizzando l'integrazione integrata tra Apache Spark e BigQuery.
- Esplorare i dati utilizzando la generazione di codice assistita da Gemini.
- Eseguire il feature engineering utilizzando il framework di elaborazione dei dati di Apache Spark.
- Addestrare e valutare un modello di classificazione utilizzando la libreria di machine learning nativa di Apache Spark, MLlib.
- Eseguire il deployment di un server di inferenza per il modello di classificazione utilizzando Flask e Cloud Run
- Eseguire il deployment di un agente per eseguire query sul server di inferenza utilizzando il linguaggio naturale con Agent Engine e Agent Development Kit (ADK),
Passaggi successivi
- Scopri di più su Google Cloud Serverless per Apache Spark.
- Scopri come configurare Cloud Run.
- Leggi di più su Agent Engine.