Data science avec Spark

1. Présentation

TheLook, un marchand de vêtements en ligne fictif, stocke des données sur les clients, les produits, les commandes, la logistique, les événements Web et les campagnes de marketing digital dans BigQuery. L'entreprise souhaite tirer parti de l'expertise existante de l'équipe en SQL et PySpark pour analyser ces données à l'aide d'Apache Spark.

Pour éviter le provisionnement ou l'ajustement manuel de l'infrastructure pour Spark, TheLook recherche une solution d'autoscaling qui lui permette de se concentrer sur les charges de travail plutôt que sur la gestion des clusters. De plus, ils souhaitent minimiser les efforts nécessaires pour intégrer Spark et BigQuery tout en restant dans l'environnement BigQuery Studio.

Dans cet atelier, vous allez prédire si un utilisateur effectuera un achat en créant un classificateur de régression logistique à l'aide de PySpark. Vous allez également exploiter l'intégration native de notebook et les fonctionnalités d'IA de BigQuery Studio pour explorer les données. Vous déployez ce modèle sur un serveur d'inférence et créez un agent pour interroger le modèle en langage naturel.

Prérequis

Avant de commencer cet atelier, vous devez :

  • avoir des connaissances de base en programmation SQL et Python ;
  • savoir exécuter du code Python dans un notebook Jupyter.
  • Connaissances de base sur l'informatique distribuée

Objectifs

  • Utilisez les notebooks BigQuery Studio pour exécuter un workflow de science des données.
  • Créez une connexion à Apache Spark à l'aide de Google Cloud Serverless pour Apache Spark et de Spark Connect.
  • Utilisez Lightning Engine pour accélérer les charges de travail Apache Spark jusqu'à 4,3 fois.
  • Chargez des données depuis BigQuery à l'aide de l'intégration intégrée entre Apache Spark et BigQuery.
  • Explorez les données à l'aide de la génération de code assistée par Gemini.
  • Effectuez l'ingénierie des caractéristiques à l'aide du framework de traitement des données Apache Spark.
  • Entraînez et évaluez un modèle de classification à l'aide de la bibliothèque de machine learning native d'Apache Spark, MLlib.
  • Déployer un serveur d'inférence pour le modèle de classification à l'aide de Flask et Cloud Run
  • Déployer un agent pour interroger le serveur d'inférence en langage naturel avec Agent Engine et l'Agent Development Kit (ADK)

2. Se connecter à un environnement d'exécution Colab

Identifier un projet Google Cloud

Créez un projet Google Cloud. Vous pouvez en utiliser un existant.

Cliquez ici pour activer les API suivantes :

  1. Dans la console Google Cloud, accédez au menu de navigation > BigQuery.

Une flèche pointe vers l'onglet "BigQuery" de la console Google Cloud.

  1. Dans le volet BigQuery Studio, cliquez sur la flèche du menu déroulant, pointez sur "Notebook", puis sélectionnez "Importer".

11fd85757040c058.png

  1. Sélectionnez la case d'option "URL", puis saisissez l'URL suivante :
https://github.com/GoogleCloudPlatform/devrel-demos/blob/main/data-analytics/dataproc-webinar/data-science/Spark_Data_Science.ipynb
  1. Définissez la région sur us-central11, puis cliquez sur Importer.

1f2743e9f0a37b3c.png

  1. Pour ouvrir le notebook, cliquez sur la flèche du menu déroulant dans le volet Explorateur avec le nom de votre project-id. Cliquez ensuite sur le menu déroulant Notebooks. Cliquez sur le notebook Spark_Data_Science.

aef016c292c8382.png

  1. Réduisez le menu de navigation BigQuery et la table des matières du notebook pour gagner de l'espace.

1c4b49de92ade1d9.png

3. Se connecter à un environnement d'exécution et exécuter du code de configuration supplémentaire

  1. Cliquez sur Se connecter. Dans le pop-up, autorisez Colab Enterprise avec votre compte de messagerie. Votre notebook se connectera automatiquement à un environnement d'exécution.

995465ba6dbfa550.png

  1. Une fois le temps d'exécution établi, le message suivant s'affiche :

7f917e7c54a84c91.png

  1. Dans le notebook, accédez à la section Setup (Configuration). Commencez ici.

4. Exécuter le code de configuration

Configurez votre environnement avec les bibliothèques Python nécessaires pour effectuer l'atelier. Configurez l'Accès privé à Google. créer un bucket Storage ; Créez un ensemble de données BigQuery. Copiez l'ID de votre projet dans le notebook. Sélectionnez une région. Pour cet atelier, utilisez la région us-central1.

Pour exécuter une cellule de code, pointez sur le bloc de cellule et cliquez sur la flèche.

9b8ccb7d6016ebb9.png

# Enable APIs
import subprocess

command = [
    "gcloud",
    "services",
    "enable",
    "aiplatform.googleapis.com",
    "bigquery.googleapis.com",
    "bigquerystorage.googleapis.com",
    "bigqueryunified.googleapis.com",
    "cloudaicompanion.googleapis.com",
    "dataproc.googleapis.com",
    "run.googleapis.com",
    "storage.googleapis.com"
]

result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)

# Configure a PROJECT_ID and REGION
PROJECT_ID = "<YOUR_PROJECT_ID>"
REGION = "<YOUR_REGION>"

# Enable Private Google Access
import subprocess

command = [
    "gcloud",
    "compute",
    "networks",
    "subnets",
    "update",
    "default",
    f"--region={REGION}",
    "--enable-private-ip-google-access"
]

result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)

# Create a Cloud Storage Bucket
from google.cloud import storage
from google.cloud.exceptions import NotFound

BUCKET_NAME = f"{PROJECT_ID}-demo"

storage_client = storage.Client(project=PROJECT_ID)
try:
    bucket = storage_client.get_bucket(BUCKET_NAME)
    print(f"Bucket {BUCKET_NAME} already exists.")
except NotFound:
    bucket = storage_client.create_bucket(BUCKET_NAME, location=REGION)
    print(f"Bucket {BUCKET_NAME} created.")


# Create a BigQuery dataset.
from google.cloud import bigquery

DATASET_ID = f"{PROJECT_ID}.demo"

client = bigquery.Client()

dataset = bigquery.Dataset(DATASET_ID)

dataset.location = REGION

dataset = client.create_dataset(dataset, exists_ok=True)

5. Créer une connexion à Google Cloud Serverless pour Apache Spark

Spark Connect vous permet de vous connecter à une session Spark sans serveur pour exécuter des jobs Spark interactifs. Vous configurez votre environnement d'exécution avec Lightning Engine pour des performances Spark avancées. Lightning Engine accélère les charges de travail à l'aide d'Apache Gluten et de Velox.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session = Session()

session.runtime_config.version = "3.0"

# You can optionally configure Spark properties as well. See https://cloud.google.com/dataproc-serverless/docs/concepts/properties.
session.runtime_config.properties = {
  "dataproc.runtime": "premium",
  "spark.dataproc.engine": "lightningEngine",
}

# To avoid going over quota in this demo, cap the max number of Spark workers.
session.runtime_config.properties = {
    "spark.dynamicAllocation.maxExecutors": "4"
}

spark = (
    DataprocSparkSession.builder
      .appName("CustomSparkSession")
      .dataprocSessionConfig(session)
      .getOrCreate()
)

6. Charger et explorer des données à l'aide de Gemini

Dans cette section, vous allez passer en revue la première étape importante de tout projet de science des données : la préparation de vos données. Vous commencez par charger des données dans un DataFrame Apache Spark à partir de BigQuery.

# Load the tables
order_items = spark.read.format("bigquery").option("table", "bigquery-public-data.thelook_ecommerce.order_items").load()

users = spark.read.format("bigquery").option("table", "bigquery-public-data.thelook_ecommerce.users").load()

# Register temp tables
users.createOrReplaceTempView("users")
order_items.createOrReplaceTempView("order_items")

# Verify temp tables
spark.sql("SELECT * FROM order_items LIMIT 10").show()

Vous utiliserez ensuite Gemini pour générer du code PySpark afin d'explorer les données et de mieux les comprendre.

200d3133ea7d410b.png

Requête 1 : À l'aide de PySpark, explore la table "users" et affiche les 10 premières lignes.

# prompt:  Using PySpark, explore the users table and show the first 10 rows.

users.show(10)

Requête 2 : À l'aide de PySpark, explorez la table "order_items" et affichez les 10 premières lignes.

# prompt: Using PySpark, explore the order_items table and show the first 10 rows.

order_items.show(10)

Requête 3 : à l'aide de PySpark, affichez les cinq pays les plus fréquents dans la table "users". Affichez le pays et le nombre d'utilisateurs de chaque pays.

# prompt: Using PySpark, show the top 5 most frequent countries in the users table. Display the country and the number of users from each country.

from pyspark.sql.functions import col, count

users.groupBy("country").agg(count("*").alias("user_count")).orderBy(col("user_count").desc()).limit(5).show()

Requête 4 : À l'aide de PySpark, trouvez le prix de vente moyen des articles dans la table "order_items".

# prompt: Using PySpark, find the average sale price of items in the order_items table.

from pyspark.sql import functions as F

average_sale_price = order_items.agg(F.avg("sale_price").alias("average_sale_price"))
average_sale_price.show()

Requête 5 : À l'aide du tableau "users", générez du code pour représenter le pays par rapport à la source de trafic à l'aide d'une bibliothèque de tracé appropriée.

# prompt: Using the table "users", generate code to plot country vs traffic source using a suitable plotting library.

sql = """
    SELECT
        country,
        traffic_source
    FROM
        `bigquery-public-data.thelook_ecommerce.users`
    WHERE country IS NOT NULL AND traffic_source IS NOT NULL
"""
project_id = "iceberg-summit-2025"
df = pandas_gbq.read_gbq(sql, project_id=project_id, dialect="standard")

import matplotlib.pyplot as plt
import seaborn as sns

# Group by country and traffic_source and count occurrences
df_grouped = df.groupby(['country', 'traffic_source']).size().reset_index(name='count')

# Create a pivot table for easier plotting
pivot_table = df_grouped.pivot(index='country', columns='traffic_source', values='count').fillna(0)

# Plotting
plt.figure(figsize=(15, 8))
pivot_table.plot(kind='bar', stacked=True, figsize=(15, 8))
plt.title('Traffic Source Distribution by Country')
plt.xlabel('Country')
plt.ylabel('Number of Users')
plt.xticks(rotation=90)
plt.legend(title='Traffic Source')
plt.tight_layout()
plt.show()

Requête 6 : Crée un histogramme montrant la répartition de l'âge, du pays, du genre et de la source de trafic.

# prompt: Create a histogram showing the distribution of "age", "country", "gender", "traffic_source".

import matplotlib.pyplot as plt

# Convert Spark DataFrame to Pandas DataFrame for visualization
users_pd = users.toPandas()

# Create histograms for 'age', 'country', 'gender', 'traffic_source'
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('Distribution of User Attributes')

# Age distribution
axes[0, 0].hist(users_pd['age'].dropna(), bins=20, edgecolor='black')
axes[0, 0].set_title('Age Distribution')
axes[0, 0].set_xlabel('Age')
axes[0, 0].set_ylabel('Number of Users')

# Country distribution
users_pd['country'].value_counts().head(10).plot(kind='bar', ax=axes[0, 1])
axes[0, 1].set_title('Top 10 Countries Distribution')
axes[0, 1].set_xlabel('Country')
axes[0, 1].set_ylabel('Number of Users')
axes[0, 1].tick_params(axis='x', rotation=45)

# Gender distribution
users_pd['gender'].value_counts().plot(kind='bar', ax=axes[1, 0])
axes[1, 0].set_title('Gender Distribution')
axes[1, 0].set_xlabel('Gender')
axes[1, 0].set_ylabel('Number of Users')
axes[1, 0].tick_params(axis='x', rotation=0)

# Traffic Source distribution
users_pd['traffic_source'].value_counts().head(10).plot(kind='bar', ax=axes[1, 1])
axes[1, 1].set_title('Top 10 Traffic Source Distribution')
axes[1, 1].set_xlabel('Traffic Source')
axes[1, 1].set_ylabel('Number of Users')
axes[1, 1].tick_params(axis='x', rotation=45)

plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()

7. Préparation des données et ingénierie des caractéristiques

Vous allez ensuite effectuer une ingénierie des caractéristiques sur les données. Sélectionnez les colonnes appropriées, transformez les données en types de données plus adaptés et identifiez une colonne de libellé.

features = spark.sql("""
SELECT
  CAST(u.age AS DOUBLE) AS age,
  CAST(hash(u.country) AS BIGINT) * 1.0 AS country_hash,
  CAST(hash(u.gender) AS BIGINT) * 1.0 AS gender_hash,
  CAST(hash(u.traffic_source) AS BIGINT) * 1.0 AS traffic_source_hash,
  CASE WHEN COUNT(oi.id) > 0 THEN 1 ELSE 0 END AS label -- Changed label generation to count order items
FROM users AS u
LEFT JOIN order_items AS oi
ON u.id = oi.user_id
GROUP BY u.id, u.age, u.country, u.gender, u.traffic_source
""")
features.show()

8. Entraîner un modèle de régression logistique

Vous entraînez un modèle de régression logistique à l'aide de MLlib. Tout d'abord, vous utilisez un VectorAssembler pour convertir les données au format vectoriel. StandardScaler met ensuite à l'échelle la colonne des caractéristiques pour améliorer les performances. Vous créez ensuite une référence à un modèle LogisticRegression et définissez les hyperparamètres. Vous combinez ces étapes dans un objet Pipeline, entraînez le modèle à l'aide de la fonction fit() et transformez les données à l'aide de la fonction transform().

from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.functions import array_to_vector

#Split Train and Test Data (80:20)
train_data, test_data = features.randomSplit([0.8, 0.2], seed=42)

# Initialize VectorAssembler
assembler = VectorAssembler(
    inputCols=["age", "country_hash", "gender_hash", "traffic_source_hash"],
    outputCol="assembled_features"
)

# Initialize StandardScaler
scaler = StandardScaler(inputCol="assembled_features", outputCol="scaled_features")

# Initialize Logistic Regression model
lr = LogisticRegression(
    maxIter=100,
    regParam=0.2,
    threshold=0.8,
    featuresCol="scaled_features",
    labelCol="label"
)

# Define pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])

# Fit the model
pipeline_model = pipeline.fit(train_data)

# Transform the dataset using the trained model
transformed_dataset = pipeline_model.transform(test_data)
transformed_dataset.show()

9. Évaluation du modèle

Évaluez l'ensemble de données que vous venez de transformer. Générez la métrique d'évaluation aire sous la courbe (AUC).

# Model evaluation
eva = BinaryClassificationEvaluator(metricName="areaUnderPR")
aucPR = eva.evaluate(transformed_dataset)
print(f"AUC PR: {aucPR}")

Ensuite, utilisez Gemini pour générer du code PySpark afin de visualiser la sortie de votre modèle.

Requête 1 : génère du code pour tracer la courbe de précision/rappel. Calculez la précision et le rappel à partir des prédictions du modèle, puis affichez la courbe PR à l'aide d'une bibliothèque de tracé appropriée.

# prompt: Generate code to plot the Precision-Recall (PR) curve. Calculate precision and recall from the model's predictions and display the PR curve using a suitable plotting library.

import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_curve, auc

# Extract predictions and labels
predictions = transformed_dataset.select("prediction", "label").toPandas()

# Calculate precision and recall
precision, recall, _ = precision_recall_curve(predictions["label"], predictions["prediction"])

# Calculate AUC-PR
pr_auc = auc(recall, precision)

# Plot the PR curve
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, color='blue', lw=2, label=f'PR curve (AUC = {pr_auc:.2f})')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc='lower left')
plt.grid(True)
plt.show()

Requête 2 : générer du code pour créer une visualisation de la matrice de confusion. Calculez la matrice de confusion à partir des prédictions du modèle et affichez-la sous forme de carte de densité ou de tableau avec le nombre de vrais positifs (VP), de vrais négatifs (VN), de faux positifs (FP) et de faux négatifs (FN).

# prompt: Generate code to create a confusion matrix visualization. Calculate the confusion matrix from the model's predictions and display it as a heat map or a table with counts of true positives (TP), true negatives (TN), false positives (FP), and false negatives (FN).

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

# Extract predictions and labels
predictions_and_labels = transformed_dataset.select("prediction", "label").toPandas()

# Calculate the confusion matrix
cm = confusion_matrix(predictions_and_labels["label"], predictions_and_labels["prediction"])

# Create a DataFrame for better visualization
cm_df = pd.DataFrame(cm,
                     index=['Actual Negative', 'Actual Positive'],
                     columns=['Predicted Negative', 'Predicted Positive'])

# Display the confusion matrix as a table
print("Confusion Matrix:")
print(cm_df)

# Display the confusion matrix as a heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(cm_df, annot=True, fmt='d', cmap='Blues', cbar=False, linewidths=.5)
plt.title('Confusion Matrix')
plt.ylabel('Actual Label')
plt.xlabel('Predicted Label')
plt.show()

# Calculate and display TP, TN, FP, FN
TN, FP, FN, TP = cm.ravel()
print(f"
True Positives (TP): {TP}")
print(f"True Negatives (TN): {TN}")
print(f"False Positives (FP): {FP}")
print(f"False Negatives (FN): {FN}")

10. Écrire des prédictions dans BigQuery

Utilisez Gemini pour générer du code permettant d'écrire vos prédictions dans une nouvelle table de votre ensemble de données BigQuery.

Requête : À l'aide de Spark, écrivez l'ensemble de données transformé dans BigQuery.

# prompt: Using Spark, write the transformed dataset to BigQuery.

transformed_dataset.write.format("bigquery").option("table", f"{PROJECT_ID}.demo.predictions").mode("overwrite").save()

11. Enregistrer le modèle dans Cloud Storage

Enregistrez votre modèle dans Cloud Storage à l'aide de la fonctionnalité native de MLlib. Le serveur d'inférence charge le modèle à partir de cet emplacement.

MODEL_PATH = "models/prediction_model"
pipeline_model.write().overwrite().save(f"gs://{BUCKET_NAME}/{MODEL_PATH}")

12. Créer un serveur d'inférence

Cloud Run est un outil flexible permettant d'exécuter des applications Web sans serveur. Il utilise des conteneurs Docker pour offrir aux utilisateurs une personnalisation maximale. Pour cet atelier, un Dockerfile est configuré pour exécuter une application Flask qui alimente PySpark. Ce conteneur s'exécute sur Cloud Run pour effectuer l'inférence sur les données d'entrée. Le code est disponible sur cette page.

Clonez le dépôt avec le code du serveur d'inférence.

!git clone https://github.com/GoogleCloudPlatform/devrel-demos.git

Affichez le fichier Dockerfile.

FROM python:3.12-slim

# Install OpenJDK-21 (Required for Spark)
RUN apt-get update && \
    apt-get install -y openjdk-21-jre-headless procps && \
    rm -rf /var/lib/apt/lists/*

ENV JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
ENV PORT=8080

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY main.py .

CMD ["gunicorn", "--bind", "0.0.0.0:8080", "--workers", "1", "--threads", "8", "--timeout", "0", "main:app"]

Affichez le code Python du serveur.

import os
import json
import logging

from flask import Flask, request, jsonify
from google.cloud import storage
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import hash, col

# Configure basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Initialization: Spark and Model Loading ---
GCS_BUCKET = os.environ.get("GCS_BUCKET")
GCS_MODEL_PATH = os.environ.get("GCS_MODEL_PATH")
LOCAL_MODEL_PATH = "/tmp/model"

try:
    spark = SparkSession.builder \
        .appName("CloudRunSparkService") \
        .master("local[*]") \
        .getOrCreate()
    logging.info("Spark Session successfully initialized.")
except Exception as e:
    logging.error(f"Failed to initialize Spark Session: {e}")
    raise

def download_directory(bucket_name, prefix, local_path):
    """Downloads a directory from GCS to local filesystem."""
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blobs = list(bucket.list_blobs(prefix=prefix))
    
    if len(blobs) == 0:
        logging.error(f"No files found in GCS bucket {bucket_name} at prefix {prefix}")
        return
    
    for blob in blobs:
        if blob.name.endswith("/"): continue # Skip directories
        
        # Structure local paths
        relative_path = os.path.relpath(blob.name, prefix)
        local_file_path = os.path.join(local_path, relative_path)
        os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
        
        blob.download_to_filename(local_file_path)
    print(f"Model downloaded to {local_path}")

# Load model
def load_model(LOCAL_MODEL_PATH, GCS_BUCKET, GCS_MODEL_PATH):
    """Download and load model on startup to avoid latency per request."""
    global MODEL
    if not os.path.exists(LOCAL_MODEL_PATH):
        download_directory(GCS_BUCKET, GCS_MODEL_PATH, LOCAL_MODEL_PATH)
    
    logging.info(f"Loading PySpark model from {GCS_MODEL_PATH}")
    # Load the Spark ML model
    try:
        MODEL = PipelineModel.load(LOCAL_MODEL_PATH)
        logging.info("Spark model loaded successfully.")
    except Exception as e:
        logging.error(f"Failed to load model: {e}")
        raise
    
# Load Model on Startup
load_model(LOCAL_MODEL_PATH, GCS_BUCKET, GCS_MODEL_PATH)

# --- Flask Application Setup ---
app = Flask(__name__)

@app.route('/predict', methods=['POST'])
def predict():
    """
    Handles incoming POST requests for inference.
    Expects JSON data that can be converted into a Spark DataFrame.
    """
    if MODEL is None:
        return jsonify({"error": "Model failed to load at startup."}), 500

    try:
        # 1. Get data from the request
        data = request.get_json()
        
        # 2. Check length of list
        data_len = len(data)
        cap = 100
        if data_len > cap:
            return jsonify({"error": f"Too many records. Count: {data_len}, Max: {cap}"}), 400

        # 2. Create Spark DataFrame
        df = spark.createDataFrame(data)
        
        # 3. Transform data
        input_df = df.select(
            col("age").cast("DOUBLE").alias("age"), 
            (hash(col("country")).cast("BIGINT") * 1.0).alias("country_hash"),
            (hash(col("gender")).cast("BIGINT") * 1.0).alias("gender_hash"),
            (hash(col("traffic_source")).cast("BIGINT") * 1.0).alias("traffic_source_hash")
        )

        # 3. Perform Inference
        predictions_df = MODEL.transform(input_df)

        # 4. Prepare results (collect and serialize)
        results = [p.prediction for p in predictions_df.select("prediction").collect()]

        # 5. Return JSON response
        return jsonify({"predictions": results})

    except Exception as e:
        logging.error(f"An error occurred during prediction: {e}")
        #return jsonify({"error": str(e)}), 500
        raise e  
    
# Gunicorn entry point uses 'app' from this file
if __name__ == '__main__':
    # Local testing only: Cloud Run uses Gunicorn/CMD command
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))

Déployez le serveur d'inférence.

import subprocess

command = [
    "gcloud",
    "run",
    "deploy",
    "inference-server",
    "--source",
    "/content/devrel-demos/data-analytics/dataproc-webinar/data-science/inference-server",
    "--region",
    f"{REGION}",
    "--port",
    "8080",
    "--memory",
    "2Gi",
    "--allow-unauthenticated",
    "--set-env-vars",
    f"GCS_BUCKET={BUCKET_NAME},GCS_MODEL_PATH={MODEL_PATH}",
    "--startup-probe",
    "tcpSocket.port=8080,initialDelaySeconds=240,failureThreshold=3,timeoutSeconds=240,periodSeconds=240"
]

result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)

Copiez l'URL du serveur d'inférence à partir de la sortie dans une nouvelle variable. Il sera semblable à https://inference-server-123456789.us-central1.run.app..

INFERENCE_SERVER_URL = "<YOUR_SERVER_URL>"

Testez le serveur d'inférence.

import requests

age = "25.0"
country = "United States"
traffic_source = "Search"
gender = "F"

response = requests.post(
    f"{INFERENCE_SERVER_URL}/predict",
    json=[{"age": age, "country": country, "traffic_source": traffic_source, "gender": gender}],
    headers={"Content-Type": "application/json"}
)

print(response.json())

Le résultat doit être 1.0 ou 0.0.

{'predictions': [1.0]}

13. Configurer un agent

Utilisez Agent Engine pour créer un agent capable d'effectuer des inférences. Agent Engine fait partie de Vertex AI Platform, un ensemble de services qui permet aux développeurs de déployer, de gérer et de faire évoluer des agents IA en production. Il dispose de nombreux outils, y compris pour évaluer les agents, les contextes de session et l'exécution de code. Il est compatible avec de nombreux frameworks agentiques populaires, y compris l'Agent Development Kit (ADK). ADK est un framework agentique Open Source qui, bien qu'il ait été conçu et optimisé pour être utilisé avec Gemini et l'écosystème Google, est agnostique aux modèles. Il est conçu pour que le développement d'agents ressemble davantage au développement logiciel.

Initialisez le client Vertex AI.

import vertexai
from vertexai import agent_engines # For the prebuilt templates

client = vertexai.Client(  # For service interactions via client.agent_engines
    project=f"{PROJECT_ID}",
    location=f"{REGION}",
)

Définissez une fonction pour interroger le modèle déployé.

def predict_purchase(
    age: str = "25.0",
    country: str = "United States",
    traffic_source: str = "Search",
    gender: str = "M",
):
    """Predicts whether or not a user will purchase a product.

    Args:
        age: The age of the user.
        country: The country of the user. One of: "China", "Poland", "Germany", "United States", "Spain", "United Kingdom", "España", "Japan", "Brasil", "Colombia", "Belgium", "South Korea", "Austria", "France", "Australia".
        Traffic_source: The source of the user's traffic. One of: "Display", "Email", "Search", "Organic", "Facebook".
        gender: The gender of the user. One of: "M" or "F".

    Returns:
        True if the model output is 1.0, False otherwise.
    """
    import requests
    response = requests.post(
        f"{INFERENCE_SERVER_URL}/predict",
        json=[{"age": age, "country": country, "traffic_source": traffic_source, "gender": gender}],
        headers={"Content-Type": "application/json"}
    )
    return response.json()

Testez la fonction en transmettant des exemples de paramètres.

predict_purchase(age=25.0, country="United States", traffic_source="Search", gender="M")

À l'aide de l'ADK, définissez un agent ci-dessous et fournissez la fonction predict_purchase comme outil.

from google.adk.agents import Agent
from vertexai import agent_engines

agent = Agent(
   model="gemini-2.5-flash",
   name='purchase_prediction_agent',
   tools=[predict_purchase]
)

Testez l'agent localement en transmettant une requête.

app = agent_engines.AdkApp(agent=agent)
async for event in app.async_stream_query(
    user_id="123",
    message="Will a 25 yo male from the United States who came from Search make a purchase? Strictly output 'yes' or 'no'.",
):
    try:
        print(event['content']['parts'][0]['text'])
    except:
      continue

Déployez le modèle sur Agent Engine.

remote_agent = client.agent_engines.create(
    agent=app,
    config={
        "requirements": ["google-cloud-aiplatform[agent_engines,adk]"],
        "staging_bucket": f"gs://{BUCKET_NAME}",
        "display_name": "purchase-predictor",
        "description": "Agent that predicts whether or not a user will purchase a product.",
    }
)

Une fois le modèle déployé, affichez-le dans la console Cloud.

Interrogez à nouveau le modèle. Il pointe désormais vers l'agent déployé au lieu de la version locale.

async for event in remote_agent.async_stream_query(
    user_id="123",
    message="Will a 25 yo male from the United States who came from Search make a purchase? Strictly output 'yes' or 'no'.",
):
    try:
        print(event['content']['parts'][0]['text'])
    except:
      continue

14. Effectuer un nettoyage

Supprimez toutes les ressources Google Cloud que vous avez créées. L'exécution de commandes de nettoyage comme celle-ci est une bonne pratique essentielle pour éviter d'encourir des frais à l'avenir.

# Delete the deployed agent.
remote_agent.delete(force=True)

# Delete the inference server.
import subprocess

command = [
    "gcloud",
    "run",
    "services",
    "delete",
    "inference-server",
    "--region",
    f"{REGION}",
    "--quiet"
]

subprocess.run(command, capture_output=True, text=True)

# Delete the BigQuery dataset.
bigquery_client = bigquery.Client()

bigquery_client.delete_dataset(
    f"{PROJECT_ID}.demo", delete_contents=True, not_found_ok=True
)

# Delete the Storage bucket.
storage_client = storage.Client()

bucket = storage_client.get_bucket(BUCKET_NAME)
bucket.delete_blobs(list(bucket.list_blobs()))
bucket.delete()

15. Félicitations !

Félicitations, Dans cet atelier de programmation, vous avez :

  • Utilisé des notebooks BigQuery Studio pour exécuter un workflow de science des données.
  • Vous avez créé une connexion à Apache Spark à l'aide de Google Cloud Serverless pour Apache Spark et de Spark Connect.
  • Utilisé Lightning Engine pour accélérer les charges de travail Apache Spark jusqu'à 4,3 fois.
  • Données chargées depuis BigQuery à l'aide de l'intégration intégrée entre Apache Spark et BigQuery.
  • Exploration des données à l'aide de la génération de code assistée par Gemini
  • J'ai effectué l'ingénierie des caractéristiques à l'aide du framework de traitement des données Apache Spark.
  • Entraîné et évalué un modèle de classification à l'aide de la bibliothèque de machine learning native d'Apache Spark, MLlib.
  • Déployer un serveur d'inférence pour le modèle de classification à l'aide de Flask et Cloud Run
  • Déployé un agent pour interroger le serveur d'inférence en langage naturel avec Agent Engine et l'Agent Development Kit (ADK)

Et ensuite ?