Cómo compilar flujos de trabajo de operaciones de AA con Airflow 2 en GKE

1. Descripción general

852dc8844309ffb8.png

En este CodeLab, se muestra cómo integrar prácticas de DevOps en el aprendizaje automático (MLOps) mediante la descarga de un conjunto de datos, la mejora de un modelo y la implementación del LLM en Google Kubernetes Engine (GKE) con un DAG de Airflow con la menor cantidad de abstracción. Como resultado, usamos los comandos de gcloud y no Terraform para que puedas seguir el lab paso a paso y comprender fácilmente cada proceso desde la perspectiva del ingeniero de plataformas y del ingeniero de aprendizaje automático.

En esta guía práctica, se explica cómo aprovechar Airflow para optimizar tus flujos de trabajo de IA y se proporciona una demostración clara y práctica de todo el ciclo de vida de las operaciones de AA mediante la configuración de un DAG.

Qué aprenderás

  • Fomentar una mayor colaboración y comprensión entre los ingenieros de la plataforma y de aprendizaje automático derribando los silos de conocimiento y mejorando los flujos de trabajo
  • Comprende cómo implementar, usar y administrar Airflow 2 en GKE
  • Configura un DAG de Airflow de extremo a extremo
  • Crea la base para sistemas de aprendizaje automático de nivel de producción con GKE
  • Instrumentar y poner en funcionamiento sistemas de aprendizaje automático
  • Comprende cómo la ingeniería de plataformas se ha convertido en un pilar de asistencia fundamental para las operaciones de AA

Qué se logra con este codelab

  • Puedes hacer preguntas sobre películas desde un LLM que optimizamos en función de Gemma-2-9b-it, que se entrega en GKE con vLLM.

Público objetivo

  • Ingenieros de aprendizaje automático
  • Ingenieros de plataforma
  • Científicos de datos
  • Ingenieros de datos
  • Ingenieros DevOps
  • Arquitecto de plataforma
  • Ingenieros de Atención al cliente

Este codelab no está diseñado para

  • Como introducción a GKE o a los flujos de trabajo de IA/AA
  • Como repaso de todo el conjunto de funciones de Airflow

2. La ingeniería de plataformas ayuda a los ingenieros o científicos de aprendizaje automático.

16635a8284b994c.png

La ingeniería de plataformas y MLOps son disciplinas interdependientes que colaboran para crear un entorno sólido y eficiente para el desarrollo y la implementación de AA.

Alcance: La ingeniería de plataformas tiene un alcance más amplio que las operaciones de AA, ya que abarca todo el ciclo de vida del desarrollo de software y proporciona las herramientas y la infraestructura para ello.

MLOps cierra la brecha entre el desarrollo, la implementación y la inferencia de AA.

Experiencia: Por lo general, los ingenieros de plataformas tienen una amplia experiencia en tecnologías de infraestructura, como la computación en la nube, la contenedorización y la administración de datos.

Los ingenieros de MLOps se especializan en el desarrollo, la implementación y la supervisión de modelos de AA, y suelen tener habilidades de ingeniería de software y ciencia de datos.

Herramientas: Los ingenieros de plataforma crean herramientas para el aprovisionamiento de infraestructura, la administración de configuraciones, la orquestación de contenedores y el andamiaje de aplicaciones. Los ingenieros de MLOps utilizan herramientas para el entrenamiento, la experimentación, la implementación, la supervisión y el control de versiones de los modelos de AA.

3. Configuración y requisitos de Google Cloud

Configuración del entorno de autoaprendizaje

  1. Accede a Google Cloud Console y crea un proyecto nuevo o reutiliza uno existente. Si aún no tienes una cuenta de Gmail o de Google Workspace, debes crear una.

fbef9caa1602edd0.png

a99b7ace416376c4.png

5e3ff691252acf41.png

  • El Nombre del proyecto es el nombre visible de los participantes de este proyecto. Es una cadena de caracteres que no se utiliza en las APIs de Google. Puedes actualizarla cuando quieras.
  • El ID del proyecto es único en todos los proyectos de Google Cloud y es inmutable (no se puede cambiar después de configurarlo). La consola de Cloud genera automáticamente una cadena única. Por lo general, no importa cuál sea. En la mayoría de los codelabs, deberás hacer referencia al ID de tu proyecto (suele identificarse como PROJECT_ID). Si no te gusta el ID que se generó, podrías generar otro aleatorio. También puedes probar uno propio y ver si está disponible. No se puede cambiar después de este paso y se usa el mismo durante todo el proyecto.
  • Recuerda que hay un tercer valor, un número de proyecto, que usan algunas APIs. Obtén más información sobre estos tres valores en la documentación.
  1. A continuación, deberás habilitar la facturación en la consola de Cloud para usar las APIs o los recursos de Cloud. Ejecutar este codelab no costará mucho, tal vez nada. Para cerrar recursos y evitar que se generen cobros más allá de este instructivo, puedes borrar los recursos que creaste o borrar el proyecto. Los usuarios nuevos de Google Cloud son aptos para participar en el programa Prueba gratuita de $300.

Inicia Cloud Shell

Si bien Google Cloud se puede operar de manera remota desde tu laptop, en este codelab usarás Cloud Shell, un entorno de línea de comandos que se ejecuta en la nube.

Activar Cloud Shell

  1. En la consola de Cloud, haz clic en Activar Cloud Shell853e55310c205094.png.

3c1dabeca90e44e5.png

Si es la primera vez que inicias Cloud Shell, aparecerá una pantalla intermedia en la que se describe qué es. Si aparece una pantalla intermedia, haz clic en Continuar.

9c92662c6a846a5c.png

El aprovisionamiento y la conexión a Cloud Shell solo tomará unos minutos.

9f0e51b578fecce5.png

Esta máquina virtual está cargada con todas las herramientas de desarrollo necesarias. Ofrece un directorio principal persistente de 5 GB y se ejecuta en Google Cloud, lo que permite mejorar considerablemente el rendimiento de la red y la autenticación. Gran parte de tu trabajo en este codelab, si no todo, se puede hacer con un navegador.

Una vez que te conectes a Cloud Shell, deberías ver que se te autenticó y que el proyecto se configuró con tu ID de proyecto.

  1. En Cloud Shell, ejecuta el siguiente comando para confirmar que tienes la autenticación:
gcloud auth list

Resultado del comando

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. En Cloud Shell, ejecuta el siguiente comando para confirmar que el comando gcloud conoce tu proyecto:
gcloud config list project

Resultado del comando

[core]
project = <PROJECT_ID>

De lo contrario, puedes configurarlo con el siguiente comando:

gcloud config set project <PROJECT_ID>

Resultado del comando

Updated property [core/project].

4. Paso 1: Regístrate y autenticate en Kaggle

Para comenzar el CodeLab, debes crear una cuenta en Kaggle, una plataforma comunitaria en línea para científicos de datos y entusiastas del aprendizaje automático propiedad de Google que aloja un amplio repositorio de conjuntos de datos disponibles públicamente para varios dominios. Desde este sitio, descargarás el conjunto de datos de RottenTomatoes, que se usa para entrenar tu modelo.

  • Regístrate en Kaggle. Puedes usar el SSO de Google para acceder.
  • Aceptar los Términos y condiciones
  • Ve a Configuración y obtén tu nombre de usuario nombre de usuario
  • En la sección de la API, selecciona "Crear token nuevo desde" Kaggle, que descargará kaggle.json.
  • Si tienes algún problema, visita la página de asistencia aquí.

5. Paso 2: Regístrate y autenticate en HuggingFace

HuggingFace es una ubicación central para que cualquier persona interactúe con la tecnología de aprendizaje automático. Alberga 900,000 modelos, 200,000 conjuntos de datos y 300,000 apps de demostración (espacios), todos de código abierto y disponibles públicamente.

  • Regístrate en HuggingFace: Crea una cuenta con un nombre de usuario. No puedes usar el SSO de Google.
  • Cómo confirmar tu dirección de correo electrónico
  • Ve aquí y acepta la licencia del modelo Gemma-2-9b-it.
  • Crea un token de HuggingFace aquí.
  • Registra las credenciales del token, ya que las necesitarás más adelante.

6. Paso 3: Crea los recursos de infraestructura de Google Cloud necesarios

Configurarás GKE, GCE y Artifact Registry, y aplicarás roles de IAM con la federación de identidades para cargas de trabajo.

Tu flujo de trabajo de IA emplea dos grupos de nodos, uno para el entrenamiento y otro para la inferencia. El grupo de nodos de entrenamiento usa una VM de GCE g2-standard-8 equipada con una GPU Nvidia L4 Tensor Core. El grupo de nodos de inferencia usa una VM g2-standard-24 equipada con dos GPU Nvidia L4 Tensor Core. Cuando especifiques la región, elige una en la que se admita la GPU requerida ( vínculo).

En Cloud Shell, ejecuta los siguientes comandos:

# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")

SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1

# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto

# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount

# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com

# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
  --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-12 \
  --num-nodes=1

# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
  --accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-24 \
  --num-nodes=1

# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a

# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}

Crea tus manifiestos de YAML

mkdir manifests
cd manifests

mlops-sa.yaml

apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
  name: airflow-mlops-sa
  namespace: airflow
  labels:
    tier: airflow

pv-dags.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 5Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_DAGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pv-logs.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-logs
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 100Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_LOGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pvc-dags.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: airflow-dags
  storageClassName: standard

pvc-logs.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  volumeName: airflow-logs
  storageClassName: standard

namespace.yaml

kind: Namespace
apiVersion: v1
metadata:
  name: airflow
  labels:
    name: airflow

sa-role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-deployment-role
rules:
- apiGroups: ["apps"] 
  resources: ["deployments"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]

sa-rolebinding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-deployment-rolebinding
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: airflow-worker
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-deployment-role
  apiGroup: rbac.authorization.k8s.io

inference.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-deployment
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gemma-server
  template:
    metadata:
      labels:
        app: gemma-server
        ai.gke.io/model: gemma-2-9b-it
        ai.gke.io/inference-server: vllm
      annotations:
        gke-gcsfuse/volumes: "true"
    spec:
      serviceAccountName: airflow-mlops-sa
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: "NoSchedule"
      - key: "on-demand"
        value: "true"
        operator: "Equal"
        effect: "NoSchedule"
      containers:
      - name: inference-server
        image: vllm/vllm-openai:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: "2"
          limits:
            nvidia.com/gpu: "2"
        command: ["/bin/sh", "-c"]
        args:
        - |
          python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
        volumeMounts:
        - mountPath: /dev/shm
          name: dshm
        - name: gcs-fuse-csi-ephemeral
          mountPath: /modeldata
          readOnly: true
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      - name: gcs-fuse-csi-ephemeral
        csi:
          driver: gcsfuse.csi.storage.gke.io
          volumeAttributes:
            bucketName: BUCKET_DATA_NAME
            mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
            fileCacheCapacity: "20Gi"
            fileCacheForRangeRead: "true"
            metadataStatCacheCapacity: "-1"
            metadataTypeCacheCapacity: "-1"
            metadataCacheTTLSeconds: "-1"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4

inference-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: llm-service
  namespace: airflow
spec:
  selector:
    app: gemma-server
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

Crea 3 buckets de Google Cloud Storage (GCS)

gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}

# Create the namespace in GKE
kubectl apply -f namespace.yaml

# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml

Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation

gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"

7. Paso 4: Instala Airflow en GKE a través del gráfico de Helm

Ahora, implementamos Airflow 2 con Helm. Apache Airflow es una plataforma de administración de flujos de trabajo de código abierto para canalizaciones de ingeniería de datos. Más adelante, analizaremos el conjunto de funciones de Airflow 2.

values.yaml para el gráfico de Helm de Airflow

config:
  webserver:
    expose_config: true
webserver:
  service:
    type: LoadBalancer
  podAnnotations:
    gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "30"
logs:
  persistence:
    enabled: true
    existingClaim: "airflow-logs"
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
scheduler:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
triggerer:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
workers:
  podAnnotations:
    gke-gcsfuse/volumes: "true"

Implementa Airflow 2

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml

8. Paso 5: Inicializa Airflow con conexiones y variables

Una vez que se haya implementado Airflow 2, podremos comenzar a configurarlo. Definimos algunas variables que leen nuestras secuencias de comandos de Python.

  1. Accede a la IU de Airflow en el puerto 8080 con tu navegador

Obtén la IP externa

kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'

Abre un navegador web y ve a http://<EXTERNAL-IP>:8080 . El acceso es admin / admin.

  1. Crea una conexión predeterminada de GCP en la IU de Airflow. Para ello, ve a Administrador → Conexiones → + Agregar un registro nuevo.
  • ID de conexión: google_cloud_default
  • Tipo de conexión: Google Cloud

Haz clic en Guardar.

  1. Crea las variables necesarias. Para ello, ve a Administrador → Variables → + Agregar un registro nuevo.
  • Clave: BUCKET_DATA_NAME - Valor: Copia de echo $BUCKET_DATA_NAME
  • Clave: GCP_PROJECT_ID - Valor: Copia de echo $DEVSHELL_PROJECT_ID
  • Clave: HF_TOKEN - Valor: Ingresa tu token de HF
  • Clave: KAGGLE_USERNAME - Valor: Ingresa tu nombre de usuario de Kaggle
  • Clave: KAGGLE_KEY - Valor: Cópialo de kaggle.json

Haz clic en Guardar después de cada par clave-valor.

Tu IU debería verse de la siguiente manera:

771121470131b5ec.png

9. Contenedor de código de la aplicación n.° 1: Descarga de datos

En esta secuencia de comandos de Python, nos autenticamos con Kaggle para descargar el conjunto de datos en nuestro bucket de GCS.

La secuencia de comandos en sí está en un contenedor porque se convierte en la unidad de DAG n° 1 y esperamos que el conjunto de datos se actualice con frecuencia, por lo que queremos automatizar este proceso.

Crea un directorio y copia nuestras secuencias de comandos aquí

cd .. ; mkdir 1-dataset-download
cd 1-dataset-download

dataset-download.py

import os
import kagglehub
from google.cloud import storage

KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")

print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"

upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]

requirements.txt

google-cloud-storage==2.19.0
kagglehub==0.3.4

Ahora, crearemos una imagen de contenedor para dataset-download y la enviaremos a Artifact Registry.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest

10. Contenedor de código de la aplicación n.° 2: Preparación de datos

Durante el paso de preparación de datos, logramos lo siguiente:

  1. Especifica qué parte del conjunto de datos queremos usar para ajustar nuestro modelo de base.
  2. Carga el conjunto de datos, es decir, lee el archivo CSV en un dataframe de Pandas, que es una estructura de datos de 2 dimensiones para filas y columnas.
  3. Transformación o procesamiento previo de datos: Determina qué partes del conjunto de datos son irrelevantes especificando lo que queremos conservar, lo que, en efecto, quita el resto.
  4. Aplica la función transform a cada fila del DataFrame.
  5. Guarda los datos preparados en el bucket de GCS

Crea un directorio y copia nuestras secuencias de comandos aquí

cd .. ; mkdir 2-data-preparation
cd 2-data-preparation

data-preparation.py

import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset

# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100"))  # Process a limited number of rows, used 100 during testing phase but can be increased

DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"

# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")

def transform(data):
    """
    Transforms a row of the DataFrame into the desired format for fine-tuning.

    Args:
      data: A pandas Series representing a row of the DataFrame.

    Returns:
      A dictionary containing the formatted text.
    """ 
    question = f"Review analysis for movie '{data['id']}'"
    context = data['reviewText']
    answer = data['scoreSentiment']
    template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
    return {'text': template.format(question=question, context=context, answer=answer)}

try:
    df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
    print(f"Dataset loaded successfully.")

    # Drop rows with NaN values in relevant columns
    df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])

    # Apply transformation to the DataFrame
    transformed_data = df.apply(transform, axis=1).tolist()

    # Convert transformed data to a DataFrame and then to a Hugging Face Dataset
    transformed_df = pd.DataFrame(transformed_data)
    dataset = Dataset.from_pandas(transformed_df)

    # Save the prepared dataset to JSON lines format
    with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
        for item in dataset:
            f.write(json.dumps(item) + "\n")

    print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
    
except Exception as e:
    print(f"Error during data loading or preprocessing: {e}")
    import traceback
    print(traceback.format_exc())

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]

requirements.txt

datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3

# Now we create a container images for data-preparation and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest

11. Contenedor de código de la aplicación n.° 3: Ajustes finos

Aquí, usamos Gemma-2-9b-it como modelo de base y, luego, lo ajustamos con nuestro nuevo conjunto de datos.

Esta es la secuencia de pasos que se realiza durante el paso de ajuste fino.

1. Configuración: Importa bibliotecas, define parámetros (para el modelo, los datos y el entrenamiento) y carga el conjunto de datos desde Google Cloud Storage.

2. Cargar modelo: Carga un modelo de lenguaje previamente entrenado con cuantización para mejorar la eficiencia y carga el analizador de texto correspondiente.

3. Configura LoRA: Configura la adaptación de bajo rango (LoRA) para ajustar el modelo de manera eficiente agregando matrices entrenables pequeñas.

4. Entrenar: Define los parámetros de entrenamiento y usa SFTTrainer para ajustar el modelo en el conjunto de datos cargado con el tipo de cuantificación FP16.

5. Guardar y subir: Guarda el modelo y el analizador ajustados de forma local y, luego, súbelos a nuestro bucket de GCS.

Luego, creamos una imagen de contenedor con Cloud Build y la almacenamos en Artifact Registry.

Crea un directorio y copia nuestras secuencias de comandos aquí

cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning

finetuning.py

import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage

# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")

# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
    "json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)

################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))

################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50

################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False

# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
    attn_implementation="eager",
    pretrained_model_name_or_path=model_name,
    torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")

# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
    lora_alpha=lora_alpha,
    lora_dropout=lora_dropout,
    r=lora_r,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Set training parameters
training_arguments = SFTConfig(
        bf16=bf16,
        dataset_kwargs={
            "add_special_tokens": False,  
            "append_concat_token": False, 
        },
        dataset_text_field="text",
        disable_tqdm=True,
        fp16=fp16,
        gradient_accumulation_steps=gradient_accumulation_steps,
        gradient_checkpointing=gradient_checkpointing,
        gradient_checkpointing_kwargs={"use_reentrant": False},
        group_by_length=group_by_length,
        log_on_each_node=False,
        logging_steps=logging_steps,
        learning_rate=learning_rate,
        lr_scheduler_type=lr_scheduler_type,
        max_grad_norm=max_grad_norm,
        max_seq_length=max_seq_length,
        max_steps=max_steps,
        num_train_epochs=num_train_epochs,
        optim=optim,
        output_dir=save_model_path,
        packing=packing,
        per_device_train_batch_size=per_device_train_batch_size,
        save_strategy=save_strategy,
        save_steps=save_steps,
        save_total_limit=save_total_limit,
        warmup_ratio=warmup_ratio,
        weight_decay=weight_decay,
    )

print(f"Configuring fine tuning completed")

# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    peft_config=peft_config,
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    tokenizer=tokenizer,
    args=training_arguments,
    packing=packing,
)

print(f"Creating trainer completed")

# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")

# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
    low_cpu_mem_usage=True,
    pretrained_model_name_or_path=model_name,
    return_dict=True,
    torch_dtype=torch.float16,
)

model = PeftModel.from_pretrained(
    model=base_model,
    model_id=new_model,
)
model = model.merge_and_unload()

print(f"Merging the new model with base model completed")

accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")

print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
    is_main_process=accelerator.is_main_process,
    save_directory=save_model_path,
    save_function=accelerator.save,
)
print(f"Save unwrapped model completed")

print(f"Save new tokenizer started")
if accelerator.is_main_process:
    tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")

# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
    """Uploads a directory to GCS."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    for root, _, files in os.walk(model_dir):
        for file in files:
            local_file_path = os.path.join(root, file)
            gcs_file_path = os.path.relpath(local_file_path, model_dir)
            blob = bucket.blob(os.path.join(new_model, gcs_file_path))  # Use new_model_name
            blob.upload_from_filename(local_file_path)

# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")

Dockerfile

# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04

# Install necessary system packages
RUN apt-get update && \
    apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements.txt into the container
COPY requirements.txt .

# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy your finetune script into the container
COPY finetuning.py .

# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
 
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]

requirements.txt

accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4

Ahora, crearemos una imagen de contenedor para ajustarla y enviarla a Artifact Registry.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest

12. Descripción general de Airflow 2, incluido qué es un DAG

Airflow es una plataforma para orquestar flujos de trabajo y canalizaciones de datos. Usa DAG (grafos acíclicos dirigidos) para definir estos flujos de trabajo en código Python y representar visualmente las tareas y sus dependencias.

Airflow, con sus DAG estáticos y definiciones basadas en Python, es adecuado para programar y administrar flujos de trabajo predefinidos. Su arquitectura incluye una IU fácil de usar para supervisar y administrar estos flujos de trabajo.

En esencia, Airflow te permite definir, programar y supervisar tus canalizaciones de datos con Python, lo que la convierte en una herramienta flexible y potente para la orquestación de flujos de trabajo.

13. Descripción general de nuestro DAG

ec49964ad7d61491.png

DAG significa grafo acíclico dirigido. En Airflow, un DAG representa todo el flujo de trabajo o la canalización. Define las tareas, sus dependencias y el orden de ejecución.

Las unidades de flujo de trabajo dentro del DAG se ejecutan desde un pod en el clúster de GKE, que se inicia desde la configuración de Airflow.

Resumen:

Airflow: Descarga de datos: Esta secuencia de comandos automatiza el proceso de obtener un conjunto de datos de opiniones de películas de Kaggle y almacenarlo en tu bucket de GCS, lo que lo pone a disposición para un procesamiento o análisis adicional en tu entorno de nube.

Airflow: Preparación de datos: El código toma el conjunto de datos sin procesar de las opiniones sobre películas, quita las columnas de datos superfluos que no son necesarias para nuestro caso de uso y borra los conjuntos de datos con valores faltantes. A continuación, estructura el conjunto de datos en un formato de respuesta a preguntas adecuado para el aprendizaje automático y lo vuelve a almacenar en GCS para usarlo más adelante.

Airflow: Model Finetuning: Este código ajusta un modelo de lenguaje grande (LLM) con una técnica llamada LoRA (adaptación de clasificación baja) y, luego, guarda el modelo actualizado. Para comenzar, se carga un LLM previamente entrenado y un conjunto de datos desde Google Cloud Storage. Luego, aplica LoRA para ajustar el modelo de manera eficiente en este conjunto de datos. Por último, guarda el modelo ajustado en Google Cloud Storage para usarlo más adelante en aplicaciones como la generación de texto o la respuesta a preguntas.

Airflow: Publicación de modelos: Publica el modelo ajustado en GKE con vllm para la inferencia.

Airflow: Ciclo de retroalimentación: Se vuelve a entrenar el modelo cada xx veces (por hora, por día o por semana).

En este diagrama, se explica cómo funciona Airflow 2 cuando se ejecuta en GKE.

8691f41166209a5d.png

14. Ajuste fino de un modelo en comparación con el uso de RAG

En este codelab, se ajusta un LLM en lugar de usar la generación aumentada de recuperación (RAG).

Comparemos estos dos enfoques:

Ajuste: Crea un modelo especializado: el ajuste adapta el LLM a una tarea o un conjunto de datos específicos, lo que le permite funcionar de forma independiente sin depender de fuentes de datos externas.

Simplifica la inferencia: Esto elimina la necesidad de tener un sistema y una base de datos de recuperación independientes, lo que genera respuestas más rápidas y económicas, especialmente para casos de uso frecuentes.

RAG: Se basa en el conocimiento externo: la RAG recupera información relevante de una base de conocimiento para cada solicitud, lo que garantiza el acceso a datos específicos y actualizados.

Aumenta la complejidad: La implementación de RAG en un entorno de producción, como un clúster de Kubernetes, suele implicar varios microservicios para el procesamiento y la recuperación de datos, lo que podría aumentar la latencia y los costos computacionales.

Por qué se eligió el ajuste fino:

Si bien RAG sería adecuado para el pequeño conjunto de datos que se usa en este codelab, optamos por el ajuste fino para demostrar un caso de uso típico de Airflow. Esta elección nos permite enfocarnos en los aspectos de la orquestación de flujos de trabajo en lugar de profundizar en los matices de la configuración de infraestructura y microservicios adicionales para RAG.

Conclusión:

Tanto el ajuste fino como el RAG son técnicas valiosas con sus propias fortalezas y debilidades. La opción óptima depende de los requisitos específicos de tu proyecto, como el tamaño y la complejidad de tus datos, las necesidades de rendimiento y las consideraciones de costo.

15. Tarea de DAG n° 1: Crea tu primer paso en Airflow: Descarga de datos

Como descripción general de esta unidad de DAG, nuestro código de Python alojado en una imagen de contenedor descarga el conjunto de datos más reciente de RottenTomatoes de Kaggle.

No copies este código en el bucket de GCS. Como último paso, copiamos mlops-dag.py, que contiene todos los pasos de la unidad de DAG en una sola secuencia de comandos de Python.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        dataset_download

16. Tarea del DAG 2: Crea tu segundo paso en Airflow: Preparación de datos

Como descripción general de esta unidad de DAG, cargamos un archivo CSV (rotten_tomatoes_movie_reviews.csv) de GCS en un DataFrame de Pandas.

A continuación, limitamos la cantidad de filas procesadas con DATASET_LIMIT para pruebas y eficiencia de recursos y, por último, convertimos los datos transformados en un conjunto de datos de Hugging Face.

Si observas con atención, verás que entrenamos 1,000 filas en el modelo con "DATASET_LIMIT": "1,000", ya que esto demora 20 minutos en una GPU Nvidia L4.

No copies este código en el bucket de GCS. Copiamos mlops-dag.py en el último paso, que contiene todos los pasos dentro de una secuencia de comandos de Python.

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation

17. Tarea del DAG n° 3: Crea tu tercer paso en Airflow: Ajusta el modelo

Como descripción general de esta unidad de DAG, aquí ejecutamos finetune.py para definir mejor el modelo de Gemma con nuestro nuevo conjunto de datos.

No copies este código en el bucket de GCS. Copiamos mlops-dag.py en el último paso, que contiene todos los pasos dentro de una secuencia de comandos de Python.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Task 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Task 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Task 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation >> fine_tuning

18. Tarea del DAG n.° 4: Crea tu paso final en Airflow: Inferencia / entrega del modelo

vLLM es una potente biblioteca de código abierto diseñada específicamente para la inferencia de alto rendimiento de LLM. Cuando se implementa en Google Kubernetes Engine (GKE), aprovecha la escalabilidad y la eficiencia de Kubernetes para entregar LLM de manera eficaz.

Resumen de los pasos:

  • Sube el DAG "mlops-dag.py" al bucket de GCS.
  • Copia dos archivos de configuración de YAML de Kubernetes para configurar la inferencia en un bucket de GCS.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

def model_serving():
    config.load_incluster_config()
    k8s_apps_v1 = client.AppsV1Api()
    k8s_core_v1 = client.CoreV1Api()

    while True:
        try:
            k8s_apps_v1.delete_namespaced_deployment(
                    namespace="airflow",
                    name="inference-deployment",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Deployment inference-deployment deleted")
    
    with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace="airflow")
        print(f"Deployment created. Status='{resp.metadata.name}'")
    
    while True:
        try:
            k8s_core_v1.delete_namespaced_service(
                    namespace="airflow",
                    name="llm-service",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Service llm-service deleted")

    with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_core_v1.create_namespaced_service(
            body=dep, namespace="airflow")
        print(f"Service created. Status='{resp.metadata.name}'")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 4: Run GKE Deployment for model serving
        model_serving = PythonOperator(
            task_id="model_serving",
            python_callable=model_serving
        )

        dataset_download >> data_preparation >> fine_tuning >> model_serving

Sube tu secuencia de comandos de Python (archivo DAG) y los manifiestos de Kubernetes al bucket de GCS de DAGS.

gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}

En la IU de Airflow, verás mlops-dag.

  1. Selecciona Despausar.
  2. Selecciona Activar DAG para realizar un ciclo de operaciones de maquetación de datos manuales.

d537281b92d5e8bb.png

Una vez que se complete tu DAG, verás un resultado como este en la IU de Airflow.

3ed42abf8987384e.png

Después del paso final, puedes obtener el extremo del modelo y enviar una instrucción para probarlo.

Espera aproximadamente 5 minutos antes de emitir el comando curl para que pueda comenzar la inferencia del modelo y el balanceador de cargas pueda asignar una dirección IP externa.

export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')

curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128
}
EOF

Resultado:

19. ¡Felicitaciones!

Creaste tu primer flujo de trabajo de IA con una canalización de DAG con Airflow 2 en GKE.

No olvides desaprovisionar los recursos que implementaste.

20. Cómo hacerlo en producción

Si bien el codelab te brindó una idea fantástica de cómo configurar Airflow 2 en GKE, en el mundo real, querrás tener en cuenta algunos de los siguientes temas cuando lo hagas en producción.

Implementa un frontend web con Gradio o herramientas similares.

Configura la supervisión automática de aplicaciones para cargas de trabajo con GKE aquí o exporta métricas de Airflow aquí.

Es posible que necesites GPUs más grandes para ajustar el modelo más rápido, en especial si tienes conjuntos de datos más grandes. Sin embargo, si queremos entrenar el modelo en varias GPUs, debemos dividir el conjunto de datos y dividir el entrenamiento. Aquí tienes una explicación de FSDP con PyTorch (paralelismo de datos completamente fragmentados, con uso compartido de GPU para lograr ese objetivo. Puedes encontrar más información en una entrada de blog de Meta y en este instructivo sobre FSDP con Pytorch.

Google Cloud Composer es un servicio administrado de Airflow, por lo que no necesitas mantener Airflow en sí, solo debes implementar tu DAG y listo.

Más información

Licencia

Este trabajo cuenta con una licencia Atribución 2.0 Genérica de Creative Commons.