Ciencia de datos con Spark

1. Descripción general

TheLook, una tienda de ropa minorista de comercio electrónico hipotética, almacena datos sobre clientes, productos, pedidos, logística, eventos web y campañas de marketing digital en BigQuery. La empresa quiere aprovechar la experiencia existente del equipo en SQL y PySpark para analizar estos datos con Apache Spark.

Para evitar el aprovisionamiento o el ajuste manual de la infraestructura para Spark, TheLook busca una solución de ajuste de escala automático que le permita enfocarse en las cargas de trabajo en lugar de la administración de clústeres. Además, quieren minimizar el esfuerzo necesario para integrar Spark y BigQuery sin salir del entorno de BigQuery Studio.

En este lab, predecirás si un usuario realizará una compra creando un clasificador de regresión logística con PySpark y aprovechando la integración nativa de notebooks y las funciones basadas en IA de BigQuery Studio para explorar los datos. Implementarás este modelo en un servidor de inferencia y crearás un agente para consultarlo con lenguaje natural.

Requisitos previos

Antes de comenzar este lab, debes tener los siguientes conocimientos:

  • Conocimientos básicos de programación en SQL y Python
  • Ejecución de código de Python en un notebook de Jupyter
  • Conocimientos básicos sobre computación distribuida

Objetivos

  • Usa notebooks de BigQuery Studio para ejecutar un flujo de trabajo de ciencia de datos.
  • Crea una conexión a Apache Spark con Google Cloud Serverless para Apache Spark y Spark Connect.
  • Usa Lightning Engine para acelerar las cargas de trabajo de Apache Spark hasta 4.3 veces.
  • Carga datos desde BigQuery con la integración integrada entre Apache Spark y BigQuery.
  • Explorar los datos con la generación de código asistida por Gemini
  • Realiza ingeniería de atributos con el framework de procesamiento de datos de Apache Spark.
  • Entrenar y evaluar un modelo de clasificación con la biblioteca de aprendizaje automático nativa de Apache Spark, MLlib
  • Implementa un servidor de inferencia para el modelo de clasificación con Flask y Cloud Run
  • Implementar un agente para consultar el servidor de inferencia con lenguaje natural con Agent Engine y el Kit de desarrollo de agentes (ADK)

2. Conéctate a un entorno de ejecución de Colab

Identifica un proyecto de Google Cloud

Crea un proyecto de Google Cloud. Puedes usar uno existente.

Haz clic aquí para habilitar las siguientes APIs:

  1. En la consola de Google Cloud, ve al menú de navegación > BigQuery.

Una flecha apunta a la pestaña BigQuery en la consola de Google Cloud.

  1. En el panel de BigQuery Studio, haz clic en el botón de flecha desplegable, coloca el cursor sobre Notebook y, luego, selecciona Subir.

11fd85757040c058.png

  1. Selecciona el botón de selección URL y, luego, ingresa la siguiente URL:
https://github.com/GoogleCloudPlatform/devrel-demos/blob/main/data-analytics/dataproc-webinar/data-science/Spark_Data_Science.ipynb
  1. Establece la región en us-central11 y haz clic en Subir.

1f2743e9f0a37b3c.png

  1. Para abrir el notebook, haz clic en la flecha del menú desplegable en el panel Explorer con el nombre de tu ID de proyecto. Luego, haz clic en el menú desplegable Notebooks. Haz clic en el cuaderno Spark_Data_Science.

aef016c292c8382.png

  1. Contrae el menú de navegación de BigQuery y el índice del notebook para tener más espacio.

1c4b49de92ade1d9.png

3. Conéctate a un entorno de ejecución y ejecuta código de configuración adicional

  1. Haz clic en Conectar. En la ventana emergente, autoriza Colab Enterprise con tu cuenta de correo electrónico. Tu notebook se conectará automáticamente a un entorno de ejecución.

995465ba6dbfa550.png

  1. Una vez que se establezca el tiempo de ejecución, verás lo siguiente:

7f917e7c54a84c91.png

  1. En el notebook, desplázate hasta la sección Configuración. Comienza aquí.

4. Ejecuta el código de configuración

Configura tu entorno con las bibliotecas de Python necesarias para completar el lab. Configura el acceso privado a Google. Crear un bucket de Storage Crear un conjunto de datos de BigQuery Copia el ID de tu proyecto en el notebook. Selecciona una región. Para este lab, usa la región us-central1.

Para ejecutar una celda de código, coloca el cursor dentro del bloque de la celda y haz clic en la flecha.

9b8ccb7d6016ebb9.png

# Enable APIs
import subprocess

command = [
    "gcloud",
    "services",
    "enable",
    "aiplatform.googleapis.com",
    "bigquery.googleapis.com",
    "bigquerystorage.googleapis.com",
    "bigqueryunified.googleapis.com",
    "cloudaicompanion.googleapis.com",
    "dataproc.googleapis.com",
    "run.googleapis.com",
    "storage.googleapis.com"
]

result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)

# Configure a PROJECT_ID and REGION
PROJECT_ID = "<YOUR_PROJECT_ID>"
REGION = "<YOUR_REGION>"

# Enable Private Google Access
import subprocess

command = [
    "gcloud",
    "compute",
    "networks",
    "subnets",
    "update",
    "default",
    f"--region={REGION}",
    "--enable-private-ip-google-access"
]

result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)

# Create a Cloud Storage Bucket
from google.cloud import storage
from google.cloud.exceptions import NotFound

BUCKET_NAME = f"{PROJECT_ID}-demo"

storage_client = storage.Client(project=PROJECT_ID)
try:
    bucket = storage_client.get_bucket(BUCKET_NAME)
    print(f"Bucket {BUCKET_NAME} already exists.")
except NotFound:
    bucket = storage_client.create_bucket(BUCKET_NAME, location=REGION)
    print(f"Bucket {BUCKET_NAME} created.")


# Create a BigQuery dataset.
from google.cloud import bigquery

DATASET_ID = f"{PROJECT_ID}.demo"

client = bigquery.Client()

dataset = bigquery.Dataset(DATASET_ID)

dataset.location = REGION

dataset = client.create_dataset(dataset, exists_ok=True)

5. Crea una conexión a Google Cloud Serverless for Apache Spark

Con Spark Connect, te conectas a una sesión de Spark sin servidores para ejecutar trabajos interactivos de Spark. Configuras tu entorno de ejecución con Lightning Engine para obtener un rendimiento avanzado de Spark. Lightning Engine acelera las cargas de trabajo con Apache Gluten y Velox.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session = Session()

session.runtime_config.version = "3.0"

# You can optionally configure Spark properties as well. See https://cloud.google.com/dataproc-serverless/docs/concepts/properties.
session.runtime_config.properties = {
  "dataproc.runtime": "premium",
  "spark.dataproc.engine": "lightningEngine",
}

# To avoid going over quota in this demo, cap the max number of Spark workers.
session.runtime_config.properties = {
    "spark.dynamicAllocation.maxExecutors": "4"
}

spark = (
    DataprocSparkSession.builder
      .appName("CustomSparkSession")
      .dataprocSessionConfig(session)
      .getOrCreate()
)

6. Carga y explora datos con Gemini

En esta sección, repasarás el primer paso importante de cualquier proyecto de ciencia de datos: la preparación de los datos. Comenzarás por cargar datos en un DataFrame de Apache Spark desde BigQuery.

# Load the tables
order_items = spark.read.format("bigquery").option("table", "bigquery-public-data.thelook_ecommerce.order_items").load()

users = spark.read.format("bigquery").option("table", "bigquery-public-data.thelook_ecommerce.users").load()

# Register temp tables
users.createOrReplaceTempView("users")
order_items.createOrReplaceTempView("order_items")

# Verify temp tables
spark.sql("SELECT * FROM order_items LIMIT 10").show()

Luego, usarás Gemini para generar código de PySpark y explorar los datos para comprenderlos mejor.

200d3133ea7d410b.png

Instrucción 1: Con PySpark, explora la tabla de usuarios y muestra las primeras 10 filas.

# prompt:  Using PySpark, explore the users table and show the first 10 rows.

users.show(10)

Instrucción 2: Con PySpark, explora la tabla order_items y muestra las primeras 10 filas.

# prompt: Using PySpark, explore the order_items table and show the first 10 rows.

order_items.show(10)

Instrucción 3: Con PySpark, muestra los 5 países más frecuentes en la tabla de usuarios. Muestra el país y la cantidad de usuarios de cada país.

# prompt: Using PySpark, show the top 5 most frequent countries in the users table. Display the country and the number of users from each country.

from pyspark.sql.functions import col, count

users.groupBy("country").agg(count("*").alias("user_count")).orderBy(col("user_count").desc()).limit(5).show()

Instrucción 4: Con PySpark, busca el precio de venta promedio de los artículos en la tabla order_items.

# prompt: Using PySpark, find the average sale price of items in the order_items table.

from pyspark.sql import functions as F

average_sale_price = order_items.agg(F.avg("sale_price").alias("average_sale_price"))
average_sale_price.show()

Instrucción 5: Con la tabla "users", genera código para crear un gráfico comparativo entre el país y la fuente de tráfico con una biblioteca de gráficos adecuada.

# prompt: Using the table "users", generate code to plot country vs traffic source using a suitable plotting library.

sql = """
    SELECT
        country,
        traffic_source
    FROM
        `bigquery-public-data.thelook_ecommerce.users`
    WHERE country IS NOT NULL AND traffic_source IS NOT NULL
"""
project_id = "iceberg-summit-2025"
df = pandas_gbq.read_gbq(sql, project_id=project_id, dialect="standard")

import matplotlib.pyplot as plt
import seaborn as sns

# Group by country and traffic_source and count occurrences
df_grouped = df.groupby(['country', 'traffic_source']).size().reset_index(name='count')

# Create a pivot table for easier plotting
pivot_table = df_grouped.pivot(index='country', columns='traffic_source', values='count').fillna(0)

# Plotting
plt.figure(figsize=(15, 8))
pivot_table.plot(kind='bar', stacked=True, figsize=(15, 8))
plt.title('Traffic Source Distribution by Country')
plt.xlabel('Country')
plt.ylabel('Number of Users')
plt.xticks(rotation=90)
plt.legend(title='Traffic Source')
plt.tight_layout()
plt.show()

Instrucción 6: Crea un histograma que muestre la distribución de "age", "country", "gender" y "traffic_source".

# prompt: Create a histogram showing the distribution of "age", "country", "gender", "traffic_source".

import matplotlib.pyplot as plt

# Convert Spark DataFrame to Pandas DataFrame for visualization
users_pd = users.toPandas()

# Create histograms for 'age', 'country', 'gender', 'traffic_source'
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('Distribution of User Attributes')

# Age distribution
axes[0, 0].hist(users_pd['age'].dropna(), bins=20, edgecolor='black')
axes[0, 0].set_title('Age Distribution')
axes[0, 0].set_xlabel('Age')
axes[0, 0].set_ylabel('Number of Users')

# Country distribution
users_pd['country'].value_counts().head(10).plot(kind='bar', ax=axes[0, 1])
axes[0, 1].set_title('Top 10 Countries Distribution')
axes[0, 1].set_xlabel('Country')
axes[0, 1].set_ylabel('Number of Users')
axes[0, 1].tick_params(axis='x', rotation=45)

# Gender distribution
users_pd['gender'].value_counts().plot(kind='bar', ax=axes[1, 0])
axes[1, 0].set_title('Gender Distribution')
axes[1, 0].set_xlabel('Gender')
axes[1, 0].set_ylabel('Number of Users')
axes[1, 0].tick_params(axis='x', rotation=0)

# Traffic Source distribution
users_pd['traffic_source'].value_counts().head(10).plot(kind='bar', ax=axes[1, 1])
axes[1, 1].set_title('Top 10 Traffic Source Distribution')
axes[1, 1].set_xlabel('Traffic Source')
axes[1, 1].set_ylabel('Number of Users')
axes[1, 1].tick_params(axis='x', rotation=45)

plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()

7. Preparación de datos y diseño de atributos

A continuación, realizarás ingeniería de atributos en los datos. Selecciona las columnas adecuadas, transforma los datos en tipos de datos más adecuados y, luego, identifica una columna de etiquetas.

features = spark.sql("""
SELECT
  CAST(u.age AS DOUBLE) AS age,
  CAST(hash(u.country) AS BIGINT) * 1.0 AS country_hash,
  CAST(hash(u.gender) AS BIGINT) * 1.0 AS gender_hash,
  CAST(hash(u.traffic_source) AS BIGINT) * 1.0 AS traffic_source_hash,
  CASE WHEN COUNT(oi.id) > 0 THEN 1 ELSE 0 END AS label -- Changed label generation to count order items
FROM users AS u
LEFT JOIN order_items AS oi
ON u.id = oi.user_id
GROUP BY u.id, u.age, u.country, u.gender, u.traffic_source
""")
features.show()

8. Entrena un modelo de regresión logística

Con MLlib, entrenas un modelo de regresión logística. Primero, usas un VectorAssembler para convertir los datos en un formato de vector. Luego, StandardScaler ajusta la columna de características para mejorar el rendimiento. Luego, creas una referencia a un modelo LogisticRegression y defines hiperparámetros. Combina estos pasos en un objeto Pipeline, entrena el modelo con la función fit() y transforma los datos con la función transform().

from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.functions import array_to_vector

#Split Train and Test Data (80:20)
train_data, test_data = features.randomSplit([0.8, 0.2], seed=42)

# Initialize VectorAssembler
assembler = VectorAssembler(
    inputCols=["age", "country_hash", "gender_hash", "traffic_source_hash"],
    outputCol="assembled_features"
)

# Initialize StandardScaler
scaler = StandardScaler(inputCol="assembled_features", outputCol="scaled_features")

# Initialize Logistic Regression model
lr = LogisticRegression(
    maxIter=100,
    regParam=0.2,
    threshold=0.8,
    featuresCol="scaled_features",
    labelCol="label"
)

# Define pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])

# Fit the model
pipeline_model = pipeline.fit(train_data)

# Transform the dataset using the trained model
transformed_dataset = pipeline_model.transform(test_data)
transformed_dataset.show()

9. Evaluación del modelo

Evalúa el conjunto de datos que acabas de transformar. Genera la métrica de evaluación área bajo la curva (AUC).

# Model evaluation
eva = BinaryClassificationEvaluator(metricName="areaUnderPR")
aucPR = eva.evaluate(transformed_dataset)
print(f"AUC PR: {aucPR}")

Luego, usa Gemini para generar código de PySpark que visualice el resultado del modelo.

Instrucción 1: Genera código para trazar la curva de precisión y recuperación (PR). Calcula la precisión y la recuperación a partir de las predicciones del modelo y muestra la curva PR con una biblioteca de trazado adecuada.

# prompt: Generate code to plot the Precision-Recall (PR) curve. Calculate precision and recall from the model's predictions and display the PR curve using a suitable plotting library.

import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_curve, auc

# Extract predictions and labels
predictions = transformed_dataset.select("prediction", "label").toPandas()

# Calculate precision and recall
precision, recall, _ = precision_recall_curve(predictions["label"], predictions["prediction"])

# Calculate AUC-PR
pr_auc = auc(recall, precision)

# Plot the PR curve
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, color='blue', lw=2, label=f'PR curve (AUC = {pr_auc:.2f})')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc='lower left')
plt.grid(True)
plt.show()

Instrucción 2: Genera código para crear una visualización de la matriz de confusión. Calcula la matriz de confusión a partir de las predicciones del modelo y la muestra como un mapa de calor o una tabla con los recuentos de verdaderos positivos (VP), verdaderos negativos (VN), falsos positivos (FP) y falsos negativos (FN).

# prompt: Generate code to create a confusion matrix visualization. Calculate the confusion matrix from the model's predictions and display it as a heat map or a table with counts of true positives (TP), true negatives (TN), false positives (FP), and false negatives (FN).

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

# Extract predictions and labels
predictions_and_labels = transformed_dataset.select("prediction", "label").toPandas()

# Calculate the confusion matrix
cm = confusion_matrix(predictions_and_labels["label"], predictions_and_labels["prediction"])

# Create a DataFrame for better visualization
cm_df = pd.DataFrame(cm,
                     index=['Actual Negative', 'Actual Positive'],
                     columns=['Predicted Negative', 'Predicted Positive'])

# Display the confusion matrix as a table
print("Confusion Matrix:")
print(cm_df)

# Display the confusion matrix as a heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(cm_df, annot=True, fmt='d', cmap='Blues', cbar=False, linewidths=.5)
plt.title('Confusion Matrix')
plt.ylabel('Actual Label')
plt.xlabel('Predicted Label')
plt.show()

# Calculate and display TP, TN, FP, FN
TN, FP, FN, TP = cm.ravel()
print(f"
True Positives (TP): {TP}")
print(f"True Negatives (TN): {TN}")
print(f"False Positives (FP): {FP}")
print(f"False Negatives (FN): {FN}")

10. Escribe predicciones en BigQuery

Usa Gemini para generar código y escribir tus predicciones en una tabla nueva de tu conjunto de datos de BigQuery.

Instrucción: Con Spark, escribe el conjunto de datos transformado en BigQuery.

# prompt: Using Spark, write the transformed dataset to BigQuery.

transformed_dataset.write.format("bigquery").option("table", f"{PROJECT_ID}.demo.predictions").mode("overwrite").save()

11. Guarda el modelo en Cloud Storage

Con la funcionalidad nativa de MLlib, guarda tu modelo en Cloud Storage. El servidor de inferencia carga el modelo desde aquí.

MODEL_PATH = "models/prediction_model"
pipeline_model.write().overwrite().save(f"gs://{BUCKET_NAME}/{MODEL_PATH}")

12. Crea un servidor de inferencia

Cloud Run es una herramienta flexible para ejecutar apps web sin servidores. Utiliza contenedores de Docker para brindar a los usuarios la máxima capacidad de personalización. En este lab, se configura un Dockerfile para ejecutar una app de Flask que potencia PySpark. Este contenedor se ejecuta en Cloud Run para realizar inferencias sobre los datos de entrada. Puedes encontrar el código aquí.

Clona el repo con el código del servidor de inferencia.

!git clone https://github.com/GoogleCloudPlatform/devrel-demos.git

Visualiza el Dockerfile.

FROM python:3.12-slim

# Install OpenJDK-21 (Required for Spark)
RUN apt-get update && \
    apt-get install -y openjdk-21-jre-headless procps && \
    rm -rf /var/lib/apt/lists/*

ENV JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
ENV PORT=8080

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY main.py .

CMD ["gunicorn", "--bind", "0.0.0.0:8080", "--workers", "1", "--threads", "8", "--timeout", "0", "main:app"]

Visualiza el código de Python para el servidor.

import os
import json
import logging

from flask import Flask, request, jsonify
from google.cloud import storage
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import hash, col

# Configure basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Initialization: Spark and Model Loading ---
GCS_BUCKET = os.environ.get("GCS_BUCKET")
GCS_MODEL_PATH = os.environ.get("GCS_MODEL_PATH")
LOCAL_MODEL_PATH = "/tmp/model"

try:
    spark = SparkSession.builder \
        .appName("CloudRunSparkService") \
        .master("local[*]") \
        .getOrCreate()
    logging.info("Spark Session successfully initialized.")
except Exception as e:
    logging.error(f"Failed to initialize Spark Session: {e}")
    raise

def download_directory(bucket_name, prefix, local_path):
    """Downloads a directory from GCS to local filesystem."""
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blobs = list(bucket.list_blobs(prefix=prefix))
    
    if len(blobs) == 0:
        logging.error(f"No files found in GCS bucket {bucket_name} at prefix {prefix}")
        return
    
    for blob in blobs:
        if blob.name.endswith("/"): continue # Skip directories
        
        # Structure local paths
        relative_path = os.path.relpath(blob.name, prefix)
        local_file_path = os.path.join(local_path, relative_path)
        os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
        
        blob.download_to_filename(local_file_path)
    print(f"Model downloaded to {local_path}")

# Load model
def load_model(LOCAL_MODEL_PATH, GCS_BUCKET, GCS_MODEL_PATH):
    """Download and load model on startup to avoid latency per request."""
    global MODEL
    if not os.path.exists(LOCAL_MODEL_PATH):
        download_directory(GCS_BUCKET, GCS_MODEL_PATH, LOCAL_MODEL_PATH)
    
    logging.info(f"Loading PySpark model from {GCS_MODEL_PATH}")
    # Load the Spark ML model
    try:
        MODEL = PipelineModel.load(LOCAL_MODEL_PATH)
        logging.info("Spark model loaded successfully.")
    except Exception as e:
        logging.error(f"Failed to load model: {e}")
        raise
    
# Load Model on Startup
load_model(LOCAL_MODEL_PATH, GCS_BUCKET, GCS_MODEL_PATH)

# --- Flask Application Setup ---
app = Flask(__name__)

@app.route('/predict', methods=['POST'])
def predict():
    """
    Handles incoming POST requests for inference.
    Expects JSON data that can be converted into a Spark DataFrame.
    """
    if MODEL is None:
        return jsonify({"error": "Model failed to load at startup."}), 500

    try:
        # 1. Get data from the request
        data = request.get_json()
        
        # 2. Check length of list
        data_len = len(data)
        cap = 100
        if data_len > cap:
            return jsonify({"error": f"Too many records. Count: {data_len}, Max: {cap}"}), 400

        # 2. Create Spark DataFrame
        df = spark.createDataFrame(data)
        
        # 3. Transform data
        input_df = df.select(
            col("age").cast("DOUBLE").alias("age"), 
            (hash(col("country")).cast("BIGINT") * 1.0).alias("country_hash"),
            (hash(col("gender")).cast("BIGINT") * 1.0).alias("gender_hash"),
            (hash(col("traffic_source")).cast("BIGINT") * 1.0).alias("traffic_source_hash")
        )

        # 3. Perform Inference
        predictions_df = MODEL.transform(input_df)

        # 4. Prepare results (collect and serialize)
        results = [p.prediction for p in predictions_df.select("prediction").collect()]

        # 5. Return JSON response
        return jsonify({"predictions": results})

    except Exception as e:
        logging.error(f"An error occurred during prediction: {e}")
        #return jsonify({"error": str(e)}), 500
        raise e  
    
# Gunicorn entry point uses 'app' from this file
if __name__ == '__main__':
    # Local testing only: Cloud Run uses Gunicorn/CMD command
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))

Implementa el servidor de inferencia.

import subprocess

command = [
    "gcloud",
    "run",
    "deploy",
    "inference-server",
    "--source",
    "/content/devrel-demos/data-analytics/dataproc-webinar/data-science/inference-server",
    "--region",
    f"{REGION}",
    "--port",
    "8080",
    "--memory",
    "2Gi",
    "--allow-unauthenticated",
    "--set-env-vars",
    f"GCS_BUCKET={BUCKET_NAME},GCS_MODEL_PATH={MODEL_PATH}",
    "--startup-probe",
    "tcpSocket.port=8080,initialDelaySeconds=240,failureThreshold=3,timeoutSeconds=240,periodSeconds=240"
]

result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)

Copia la URL del servidor de inferencia del resultado en una variable nueva. Será similar a https://inference-server-123456789.us-central1.run.app..

INFERENCE_SERVER_URL = "<YOUR_SERVER_URL>"

Prueba el servidor de inferencia.

import requests

age = "25.0"
country = "United States"
traffic_source = "Search"
gender = "F"

response = requests.post(
    f"{INFERENCE_SERVER_URL}/predict",
    json=[{"age": age, "country": country, "traffic_source": traffic_source, "gender": gender}],
    headers={"Content-Type": "application/json"}
)

print(response.json())

El resultado debería ser 1.0 o 0.0.

{'predictions': [1.0]}

13. Configura un agente

Usa Agent Engine para crear un agente que pueda realizar inferencias. Agent Engine es parte de la plataforma de Vertex AI, un conjunto de servicios que permite a los desarrolladores implementar, administrar y escalar agentes de IA en producción. Tiene muchas herramientas, como la evaluación de agentes, los contextos de sesión y la ejecución de código. Es compatible con muchos frameworks autónomos populares, incluido el Kit de desarrollo de agentes (ADK). El ADK es un framework de código abierto basado en agentes que, si bien se creó y optimizó para su uso con Gemini y el ecosistema de Google, es independiente del modelo. Está diseñado para que el desarrollo de agentes se parezca más al desarrollo de software.

Inicializa el cliente de Vertex AI.

import vertexai
from vertexai import agent_engines # For the prebuilt templates

client = vertexai.Client(  # For service interactions via client.agent_engines
    project=f"{PROJECT_ID}",
    location=f"{REGION}",
)

Define una función para consultar el modelo implementado.

def predict_purchase(
    age: str = "25.0",
    country: str = "United States",
    traffic_source: str = "Search",
    gender: str = "M",
):
    """Predicts whether or not a user will purchase a product.

    Args:
        age: The age of the user.
        country: The country of the user. One of: "China", "Poland", "Germany", "United States", "Spain", "United Kingdom", "España", "Japan", "Brasil", "Colombia", "Belgium", "South Korea", "Austria", "France", "Australia".
        Traffic_source: The source of the user's traffic. One of: "Display", "Email", "Search", "Organic", "Facebook".
        gender: The gender of the user. One of: "M" or "F".

    Returns:
        True if the model output is 1.0, False otherwise.
    """
    import requests
    response = requests.post(
        f"{INFERENCE_SERVER_URL}/predict",
        json=[{"age": age, "country": country, "traffic_source": traffic_source, "gender": gender}],
        headers={"Content-Type": "application/json"}
    )
    return response.json()

Prueba la función pasando parámetros de muestra.

predict_purchase(age=25.0, country="United States", traffic_source="Search", gender="M")

Con el ADK, define un agente a continuación y proporciona la función predict_purchase como herramienta.

from google.adk.agents import Agent
from vertexai import agent_engines

agent = Agent(
   model="gemini-2.5-flash",
   name='purchase_prediction_agent',
   tools=[predict_purchase]
)

Pasa una consulta para probar el agente de forma local.

app = agent_engines.AdkApp(agent=agent)
async for event in app.async_stream_query(
    user_id="123",
    message="Will a 25 yo male from the United States who came from Search make a purchase? Strictly output 'yes' or 'no'.",
):
    try:
        print(event['content']['parts'][0]['text'])
    except:
      continue

Implementa el modelo en Agent Engine.

remote_agent = client.agent_engines.create(
    agent=app,
    config={
        "requirements": ["google-cloud-aiplatform[agent_engines,adk]"],
        "staging_bucket": f"gs://{BUCKET_NAME}",
        "display_name": "purchase-predictor",
        "description": "Agent that predicts whether or not a user will purchase a product.",
    }
)

Una vez que termines, visualiza el modelo implementado en la consola de Cloud.

Vuelve a consultar el modelo. Ahora, apunta al agente implementado en lugar de a la versión local.

async for event in remote_agent.async_stream_query(
    user_id="123",
    message="Will a 25 yo male from the United States who came from Search make a purchase? Strictly output 'yes' or 'no'.",
):
    try:
        print(event['content']['parts'][0]['text'])
    except:
      continue

14. Limpia

Borra todos los recursos de Google Cloud que creaste. Ejecutar comandos de limpieza como este es una práctica recomendada fundamental para evitar cargos futuros.

# Delete the deployed agent.
remote_agent.delete(force=True)

# Delete the inference server.
import subprocess

command = [
    "gcloud",
    "run",
    "services",
    "delete",
    "inference-server",
    "--region",
    f"{REGION}",
    "--quiet"
]

subprocess.run(command, capture_output=True, text=True)

# Delete the BigQuery dataset.
bigquery_client = bigquery.Client()

bigquery_client.delete_dataset(
    f"{PROJECT_ID}.demo", delete_contents=True, not_found_ok=True
)

# Delete the Storage bucket.
storage_client = storage.Client()

bucket = storage_client.get_bucket(BUCKET_NAME)
bucket.delete_blobs(list(bucket.list_blobs()))
bucket.delete()

15. ¡Felicitaciones!

¡Lo lograste! En este codelab, realizaste las siguientes tareas:

  • Usó notebooks de BigQuery Studio para ejecutar un flujo de trabajo de ciencia de datos.
  • Se creó una conexión a Apache Spark con Google Cloud Serverless for Apache Spark y con la tecnología de Spark Connect.
  • Se usó Lightning Engine para acelerar las cargas de trabajo de Apache Spark hasta 4.3 veces.
  • Se cargaron datos de BigQuery con la integración integrada entre Apache Spark y BigQuery.
  • Exploramos los datos con la generación de código asistida por Gemini.
  • Realizó ingeniería de atributos con el framework de procesamiento de datos de Apache Spark.
  • Entrenaste y evaluaste un modelo de clasificación con la biblioteca de aprendizaje automático nativa de Apache Spark, MLlib.
  • Implementaste un servidor de inferencia para el modelo de clasificación con Flask y Cloud Run.
  • Implementaste un agente para consultar el servidor de inferencia con lenguaje natural con Agent Engine y el Kit de desarrollo de agentes (ADK).

¿Qué sigue?