1. Descripción general
En este CodeLab, se muestra cómo integrar prácticas de DevOps en el aprendizaje automático (MLOps) mediante la descarga de un conjunto de datos, la mejora de un modelo y la implementación del LLM en Google Kubernetes Engine (GKE) con un DAG de Airflow con la menor cantidad de abstracción. Como resultado, usamos los comandos de gcloud y no Terraform para que puedas seguir el lab paso a paso y comprender fácilmente cada proceso desde la perspectiva del ingeniero de plataformas y del ingeniero de aprendizaje automático.
En esta guía práctica, se explica cómo aprovechar Airflow para optimizar tus flujos de trabajo de IA y se proporciona una demostración clara y práctica de todo el ciclo de vida de las operaciones de AA mediante la configuración de un DAG.
Qué aprenderás
- Fomentar una mayor colaboración y comprensión entre los ingenieros de la plataforma y de aprendizaje automático derribando los silos de conocimiento y mejorando los flujos de trabajo
- Comprende cómo implementar, usar y administrar Airflow 2 en GKE
- Configura un DAG de Airflow de extremo a extremo
- Crea la base para sistemas de aprendizaje automático de nivel de producción con GKE
- Instrumentar y poner en funcionamiento sistemas de aprendizaje automático
- Comprende cómo la ingeniería de plataformas se ha convertido en un pilar de asistencia fundamental para las operaciones de AA
Qué se logra con este codelab
- Puedes hacer preguntas sobre películas desde un LLM que optimizamos en función de Gemma-2-9b-it, que se entrega en GKE con vLLM.
Público objetivo
- Ingenieros de aprendizaje automático
- Ingenieros de plataforma
- Científicos de datos
- Ingenieros de datos
- Ingenieros DevOps
- Arquitecto de plataforma
- Ingenieros de Atención al cliente
Este codelab no está diseñado para
- Como introducción a GKE o a los flujos de trabajo de IA/AA
- Como repaso de todo el conjunto de funciones de Airflow
2. La ingeniería de plataformas ayuda a los ingenieros o científicos de aprendizaje automático.
La ingeniería de plataformas y MLOps son disciplinas interdependientes que colaboran para crear un entorno sólido y eficiente para el desarrollo y la implementación de AA.
Alcance: La ingeniería de plataformas tiene un alcance más amplio que las operaciones de AA, ya que abarca todo el ciclo de vida del desarrollo de software y proporciona las herramientas y la infraestructura para ello.
MLOps cierra la brecha entre el desarrollo, la implementación y la inferencia de AA.
Experiencia: Por lo general, los ingenieros de plataformas tienen una amplia experiencia en tecnologías de infraestructura, como la computación en la nube, la contenedorización y la administración de datos.
Los ingenieros de MLOps se especializan en el desarrollo, la implementación y la supervisión de modelos de AA, y suelen tener habilidades de ingeniería de software y ciencia de datos.
Herramientas: Los ingenieros de plataforma crean herramientas para el aprovisionamiento de infraestructura, la administración de configuraciones, la orquestación de contenedores y el andamiaje de aplicaciones. Los ingenieros de MLOps utilizan herramientas para el entrenamiento, la experimentación, la implementación, la supervisión y el control de versiones de los modelos de AA.
3. Configuración y requisitos de Google Cloud
Configuración del entorno de autoaprendizaje
- Accede a Google Cloud Console y crea un proyecto nuevo o reutiliza uno existente. Si aún no tienes una cuenta de Gmail o de Google Workspace, debes crear una.
- El Nombre del proyecto es el nombre visible de los participantes de este proyecto. Es una cadena de caracteres que no se utiliza en las APIs de Google. Puedes actualizarla cuando quieras.
- El ID del proyecto es único en todos los proyectos de Google Cloud y es inmutable (no se puede cambiar después de configurarlo). La consola de Cloud genera automáticamente una cadena única. Por lo general, no importa cuál sea. En la mayoría de los codelabs, deberás hacer referencia al ID de tu proyecto (suele identificarse como
PROJECT_ID
). Si no te gusta el ID que se generó, podrías generar otro aleatorio. También puedes probar uno propio y ver si está disponible. No se puede cambiar después de este paso y se usa el mismo durante todo el proyecto. - Recuerda que hay un tercer valor, un número de proyecto, que usan algunas APIs. Obtén más información sobre estos tres valores en la documentación.
- A continuación, deberás habilitar la facturación en la consola de Cloud para usar las APIs o los recursos de Cloud. Ejecutar este codelab no costará mucho, tal vez nada. Para cerrar recursos y evitar que se generen cobros más allá de este instructivo, puedes borrar los recursos que creaste o borrar el proyecto. Los usuarios nuevos de Google Cloud son aptos para participar en el programa Prueba gratuita de $300.
Inicia Cloud Shell
Si bien Google Cloud se puede operar de manera remota desde tu laptop, en este codelab usarás Cloud Shell, un entorno de línea de comandos que se ejecuta en la nube.
Activar Cloud Shell
- En la consola de Cloud, haz clic en Activar Cloud Shell
.
Si es la primera vez que inicias Cloud Shell, aparecerá una pantalla intermedia en la que se describe qué es. Si aparece una pantalla intermedia, haz clic en Continuar.
El aprovisionamiento y la conexión a Cloud Shell solo tomará unos minutos.
Esta máquina virtual está cargada con todas las herramientas de desarrollo necesarias. Ofrece un directorio principal persistente de 5 GB y se ejecuta en Google Cloud, lo que permite mejorar considerablemente el rendimiento de la red y la autenticación. Gran parte de tu trabajo en este codelab, si no todo, se puede hacer con un navegador.
Una vez que te conectes a Cloud Shell, deberías ver que se te autenticó y que el proyecto se configuró con tu ID de proyecto.
- En Cloud Shell, ejecuta el siguiente comando para confirmar que tienes la autenticación:
gcloud auth list
Resultado del comando
Credentialed Accounts ACTIVE ACCOUNT * <my_account>@<my_domain.com> To set the active account, run: $ gcloud config set account `ACCOUNT`
- En Cloud Shell, ejecuta el siguiente comando para confirmar que el comando gcloud conoce tu proyecto:
gcloud config list project
Resultado del comando
[core] project = <PROJECT_ID>
De lo contrario, puedes configurarlo con el siguiente comando:
gcloud config set project <PROJECT_ID>
Resultado del comando
Updated property [core/project].
4. Paso 1: Regístrate y autenticate en Kaggle
Para comenzar el CodeLab, debes crear una cuenta en Kaggle, una plataforma comunitaria en línea para científicos de datos y entusiastas del aprendizaje automático propiedad de Google que aloja un amplio repositorio de conjuntos de datos disponibles públicamente para varios dominios. Desde este sitio, descargarás el conjunto de datos de RottenTomatoes, que se usa para entrenar tu modelo.
- Regístrate en Kaggle. Puedes usar el SSO de Google para acceder.
- Aceptar los Términos y condiciones
- Ve a Configuración y obtén tu nombre de usuario nombre de usuario
- En la sección de la API, selecciona "Crear token nuevo desde" Kaggle, que descargará kaggle.json.
- Si tienes algún problema, visita la página de asistencia aquí.
5. Paso 2: Regístrate y autenticate en HuggingFace
HuggingFace es una ubicación central para que cualquier persona interactúe con la tecnología de aprendizaje automático. Alberga 900,000 modelos, 200,000 conjuntos de datos y 300,000 apps de demostración (espacios), todos de código abierto y disponibles públicamente.
- Regístrate en HuggingFace: Crea una cuenta con un nombre de usuario. No puedes usar el SSO de Google.
- Cómo confirmar tu dirección de correo electrónico
- Ve aquí y acepta la licencia del modelo Gemma-2-9b-it.
- Crea un token de HuggingFace aquí.
- Registra las credenciales del token, ya que las necesitarás más adelante.
6. Paso 3: Crea los recursos de infraestructura de Google Cloud necesarios
Configurarás GKE, GCE y Artifact Registry, y aplicarás roles de IAM con la federación de identidades para cargas de trabajo.
Tu flujo de trabajo de IA emplea dos grupos de nodos, uno para el entrenamiento y otro para la inferencia. El grupo de nodos de entrenamiento usa una VM de GCE g2-standard-8 equipada con una GPU Nvidia L4 Tensor Core. El grupo de nodos de inferencia usa una VM g2-standard-24 equipada con dos GPU Nvidia L4 Tensor Core. Cuando especifiques la región, elige una en la que se admita la GPU requerida ( vínculo).
En Cloud Shell, ejecuta los siguientes comandos:
# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")
SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1
# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto
# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount
# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com
# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
--accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-12 \
--num-nodes=1
# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
--accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-24 \
--num-nodes=1
# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a
# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}
Crea tus manifiestos de YAML
mkdir manifests
cd manifests
mlops-sa.yaml
apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
name: airflow-mlops-sa
namespace: airflow
labels:
tier: airflow
pv-dags.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-dags
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 5Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_DAGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pv-logs.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-logs
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 100Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_LOGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pvc-dags.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-dags
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
volumeName: airflow-dags
storageClassName: standard
pvc-logs.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-logs
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 100Gi
volumeName: airflow-logs
storageClassName: standard
namespace.yaml
kind: Namespace
apiVersion: v1
metadata:
name: airflow
labels:
name: airflow
sa-role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: airflow
name: airflow-deployment-role
rules:
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
resources: ["services"]
verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]
sa-rolebinding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: airflow-deployment-rolebinding
namespace: airflow
subjects:
- kind: ServiceAccount
name: airflow-worker
namespace: airflow
roleRef:
kind: Role
name: airflow-deployment-role
apiGroup: rbac.authorization.k8s.io
inference.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: inference-deployment
namespace: airflow
spec:
replicas: 1
selector:
matchLabels:
app: gemma-server
template:
metadata:
labels:
app: gemma-server
ai.gke.io/model: gemma-2-9b-it
ai.gke.io/inference-server: vllm
annotations:
gke-gcsfuse/volumes: "true"
spec:
serviceAccountName: airflow-mlops-sa
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
- key: "on-demand"
value: "true"
operator: "Equal"
effect: "NoSchedule"
containers:
- name: inference-server
image: vllm/vllm-openai:latest
ports:
- containerPort: 8000
resources:
requests:
nvidia.com/gpu: "2"
limits:
nvidia.com/gpu: "2"
command: ["/bin/sh", "-c"]
args:
- |
python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
volumeMounts:
- mountPath: /dev/shm
name: dshm
- name: gcs-fuse-csi-ephemeral
mountPath: /modeldata
readOnly: true
volumes:
- name: dshm
emptyDir:
medium: Memory
- name: gcs-fuse-csi-ephemeral
csi:
driver: gcsfuse.csi.storage.gke.io
volumeAttributes:
bucketName: BUCKET_DATA_NAME
mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
fileCacheCapacity: "20Gi"
fileCacheForRangeRead: "true"
metadataStatCacheCapacity: "-1"
metadataTypeCacheCapacity: "-1"
metadataCacheTTLSeconds: "-1"
nodeSelector:
cloud.google.com/gke-accelerator: nvidia-l4
inference-service.yaml
apiVersion: v1
kind: Service
metadata:
name: llm-service
namespace: airflow
spec:
selector:
app: gemma-server
type: LoadBalancer
ports:
- protocol: TCP
port: 8000
targetPort: 8000
Crea 3 buckets de Google Cloud Storage (GCS)
gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}
# Create the namespace in GKE
kubectl apply -f namespace.yaml
# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml
Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"
7. Paso 4: Instala Airflow en GKE a través del gráfico de Helm
Ahora, implementamos Airflow 2 con Helm. Apache Airflow es una plataforma de administración de flujos de trabajo de código abierto para canalizaciones de ingeniería de datos. Más adelante, analizaremos el conjunto de funciones de Airflow 2.
values.yaml para el gráfico de Helm de Airflow
config:
webserver:
expose_config: true
webserver:
service:
type: LoadBalancer
podAnnotations:
gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
- name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
value: "30"
logs:
persistence:
enabled: true
existingClaim: "airflow-logs"
dags:
persistence:
enabled: true
existingClaim: "airflow-dags"
scheduler:
podAnnotations:
gke-gcsfuse/volumes: "true"
triggerer:
podAnnotations:
gke-gcsfuse/volumes: "true"
workers:
podAnnotations:
gke-gcsfuse/volumes: "true"
Implementa Airflow 2
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml
8. Paso 5: Inicializa Airflow con conexiones y variables
Una vez que se haya implementado Airflow 2, podremos comenzar a configurarlo. Definimos algunas variables que leen nuestras secuencias de comandos de Python.
- Accede a la IU de Airflow en el puerto 8080 con tu navegador
Obtén la IP externa
kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'
Abre un navegador web y ve a http://<EXTERNAL-IP>:8080 . El acceso es admin / admin.
- Crea una conexión predeterminada de GCP en la IU de Airflow. Para ello, ve a Administrador → Conexiones → + Agregar un registro nuevo.
- ID de conexión: google_cloud_default
- Tipo de conexión: Google Cloud
Haz clic en Guardar.
- Crea las variables necesarias. Para ello, ve a Administrador → Variables → + Agregar un registro nuevo.
- Clave: BUCKET_DATA_NAME - Valor: Copia de echo $BUCKET_DATA_NAME
- Clave: GCP_PROJECT_ID - Valor: Copia de echo $DEVSHELL_PROJECT_ID
- Clave: HF_TOKEN - Valor: Ingresa tu token de HF
- Clave: KAGGLE_USERNAME - Valor: Ingresa tu nombre de usuario de Kaggle
- Clave: KAGGLE_KEY - Valor: Cópialo de kaggle.json
Haz clic en Guardar después de cada par clave-valor.
Tu IU debería verse de la siguiente manera:
9. Contenedor de código de la aplicación n.° 1: Descarga de datos
En esta secuencia de comandos de Python, nos autenticamos con Kaggle para descargar el conjunto de datos en nuestro bucket de GCS.
La secuencia de comandos en sí está en un contenedor porque se convierte en la unidad de DAG n° 1 y esperamos que el conjunto de datos se actualice con frecuencia, por lo que queremos automatizar este proceso.
Crea un directorio y copia nuestras secuencias de comandos aquí
cd .. ; mkdir 1-dataset-download
cd 1-dataset-download
dataset-download.py
import os
import kagglehub
from google.cloud import storage
KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
def upload_blob(bucket_name, source_file_name, destination_blob_name):
"""Uploads a file to the bucket."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(f"File {source_file_name} uploaded to {destination_blob_name}.")
# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")
print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"
upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]
requirements.txt
google-cloud-storage==2.19.0
kagglehub==0.3.4
Ahora, crearemos una imagen de contenedor para dataset-download y la enviaremos a Artifact Registry.
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest
10. Contenedor de código de la aplicación n.° 2: Preparación de datos
Durante el paso de preparación de datos, logramos lo siguiente:
- Especifica qué parte del conjunto de datos queremos usar para ajustar nuestro modelo de base.
- Carga el conjunto de datos, es decir, lee el archivo CSV en un dataframe de Pandas, que es una estructura de datos de 2 dimensiones para filas y columnas.
- Transformación o procesamiento previo de datos: Determina qué partes del conjunto de datos son irrelevantes especificando lo que queremos conservar, lo que, en efecto, quita el resto.
- Aplica la función
transform
a cada fila del DataFrame. - Guarda los datos preparados en el bucket de GCS
Crea un directorio y copia nuestras secuencias de comandos aquí
cd .. ; mkdir 2-data-preparation
cd 2-data-preparation
data-preparation.py
import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset
# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100")) # Process a limited number of rows, used 100 during testing phase but can be increased
DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"
# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")
def transform(data):
"""
Transforms a row of the DataFrame into the desired format for fine-tuning.
Args:
data: A pandas Series representing a row of the DataFrame.
Returns:
A dictionary containing the formatted text.
"""
question = f"Review analysis for movie '{data['id']}'"
context = data['reviewText']
answer = data['scoreSentiment']
template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
return {'text': template.format(question=question, context=context, answer=answer)}
try:
df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
print(f"Dataset loaded successfully.")
# Drop rows with NaN values in relevant columns
df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])
# Apply transformation to the DataFrame
transformed_data = df.apply(transform, axis=1).tolist()
# Convert transformed data to a DataFrame and then to a Hugging Face Dataset
transformed_df = pd.DataFrame(transformed_data)
dataset = Dataset.from_pandas(transformed_df)
# Save the prepared dataset to JSON lines format
with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
for item in dataset:
f.write(json.dumps(item) + "\n")
print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
except Exception as e:
print(f"Error during data loading or preprocessing: {e}")
import traceback
print(traceback.format_exc())
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]
requirements.txt
datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3
# Now we create a container images for data-preparation and push it to the Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest
11. Contenedor de código de la aplicación n.° 3: Ajustes finos
Aquí, usamos Gemma-2-9b-it como modelo de base y, luego, lo ajustamos con nuestro nuevo conjunto de datos.
Esta es la secuencia de pasos que se realiza durante el paso de ajuste fino.
1. Configuración: Importa bibliotecas, define parámetros (para el modelo, los datos y el entrenamiento) y carga el conjunto de datos desde Google Cloud Storage.
2. Cargar modelo: Carga un modelo de lenguaje previamente entrenado con cuantización para mejorar la eficiencia y carga el analizador de texto correspondiente.
3. Configura LoRA: Configura la adaptación de bajo rango (LoRA) para ajustar el modelo de manera eficiente agregando matrices entrenables pequeñas.
4. Entrenar: Define los parámetros de entrenamiento y usa SFTTrainer
para ajustar el modelo en el conjunto de datos cargado con el tipo de cuantificación FP16.
5. Guardar y subir: Guarda el modelo y el analizador ajustados de forma local y, luego, súbelos a nuestro bucket de GCS.
Luego, creamos una imagen de contenedor con Cloud Build y la almacenamos en Artifact Registry.
Crea un directorio y copia nuestras secuencias de comandos aquí
cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning
finetuning.py
import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage
# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")
# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")
# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
"json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)
################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))
################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03
# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50
################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False
# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
attn_implementation="eager",
pretrained_model_name_or_path=model_name,
torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")
# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
lora_alpha=lora_alpha,
lora_dropout=lora_dropout,
r=lora_r,
bias="none",
task_type="CAUSAL_LM",
target_modules=[
"q_proj",
"k_proj",
"v_proj",
"o_proj",
"gate_proj",
"up_proj",
"down_proj",
],
)
# Set training parameters
training_arguments = SFTConfig(
bf16=bf16,
dataset_kwargs={
"add_special_tokens": False,
"append_concat_token": False,
},
dataset_text_field="text",
disable_tqdm=True,
fp16=fp16,
gradient_accumulation_steps=gradient_accumulation_steps,
gradient_checkpointing=gradient_checkpointing,
gradient_checkpointing_kwargs={"use_reentrant": False},
group_by_length=group_by_length,
log_on_each_node=False,
logging_steps=logging_steps,
learning_rate=learning_rate,
lr_scheduler_type=lr_scheduler_type,
max_grad_norm=max_grad_norm,
max_seq_length=max_seq_length,
max_steps=max_steps,
num_train_epochs=num_train_epochs,
optim=optim,
output_dir=save_model_path,
packing=packing,
per_device_train_batch_size=per_device_train_batch_size,
save_strategy=save_strategy,
save_steps=save_steps,
save_total_limit=save_total_limit,
warmup_ratio=warmup_ratio,
weight_decay=weight_decay,
)
print(f"Configuring fine tuning completed")
# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
model=model,
train_dataset=dataset,
peft_config=peft_config,
dataset_text_field="text",
max_seq_length=max_seq_length,
tokenizer=tokenizer,
args=training_arguments,
packing=packing,
)
print(f"Creating trainer completed")
# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")
# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")
# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
low_cpu_mem_usage=True,
pretrained_model_name_or_path=model_name,
return_dict=True,
torch_dtype=torch.float16,
)
model = PeftModel.from_pretrained(
model=base_model,
model_id=new_model,
)
model = model.merge_and_unload()
print(f"Merging the new model with base model completed")
accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")
print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
is_main_process=accelerator.is_main_process,
save_directory=save_model_path,
save_function=accelerator.save,
)
print(f"Save unwrapped model completed")
print(f"Save new tokenizer started")
if accelerator.is_main_process:
tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")
# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
"""Uploads a directory to GCS."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
for root, _, files in os.walk(model_dir):
for file in files:
local_file_path = os.path.join(root, file)
gcs_file_path = os.path.relpath(local_file_path, model_dir)
blob = bucket.blob(os.path.join(new_model, gcs_file_path)) # Use new_model_name
blob.upload_from_filename(local_file_path)
# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")
Dockerfile
# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04
# Install necessary system packages
RUN apt-get update && \
apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
rm -rf /var/lib/apt/lists/*
# Copy requirements.txt into the container
COPY requirements.txt .
# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
# Copy your finetune script into the container
COPY finetuning.py .
# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]
requirements.txt
accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4
Ahora, crearemos una imagen de contenedor para ajustarla y enviarla a Artifact Registry.
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest
12. Descripción general de Airflow 2, incluido qué es un DAG
Airflow es una plataforma para orquestar flujos de trabajo y canalizaciones de datos. Usa DAG (grafos acíclicos dirigidos) para definir estos flujos de trabajo en código Python y representar visualmente las tareas y sus dependencias.
Airflow, con sus DAG estáticos y definiciones basadas en Python, es adecuado para programar y administrar flujos de trabajo predefinidos. Su arquitectura incluye una IU fácil de usar para supervisar y administrar estos flujos de trabajo.
En esencia, Airflow te permite definir, programar y supervisar tus canalizaciones de datos con Python, lo que la convierte en una herramienta flexible y potente para la orquestación de flujos de trabajo.
13. Descripción general de nuestro DAG
DAG significa grafo acíclico dirigido. En Airflow, un DAG representa todo el flujo de trabajo o la canalización. Define las tareas, sus dependencias y el orden de ejecución.
Las unidades de flujo de trabajo dentro del DAG se ejecutan desde un pod en el clúster de GKE, que se inicia desde la configuración de Airflow.
Resumen:
Airflow: Descarga de datos: Esta secuencia de comandos automatiza el proceso de obtener un conjunto de datos de opiniones de películas de Kaggle y almacenarlo en tu bucket de GCS, lo que lo pone a disposición para un procesamiento o análisis adicional en tu entorno de nube.
Airflow: Preparación de datos: El código toma el conjunto de datos sin procesar de las opiniones sobre películas, quita las columnas de datos superfluos que no son necesarias para nuestro caso de uso y borra los conjuntos de datos con valores faltantes. A continuación, estructura el conjunto de datos en un formato de respuesta a preguntas adecuado para el aprendizaje automático y lo vuelve a almacenar en GCS para usarlo más adelante.
Airflow: Model Finetuning: Este código ajusta un modelo de lenguaje grande (LLM) con una técnica llamada LoRA (adaptación de clasificación baja) y, luego, guarda el modelo actualizado. Para comenzar, se carga un LLM previamente entrenado y un conjunto de datos desde Google Cloud Storage. Luego, aplica LoRA para ajustar el modelo de manera eficiente en este conjunto de datos. Por último, guarda el modelo ajustado en Google Cloud Storage para usarlo más adelante en aplicaciones como la generación de texto o la respuesta a preguntas.
Airflow: Publicación de modelos: Publica el modelo ajustado en GKE con vllm para la inferencia.
Airflow: Ciclo de retroalimentación: Se vuelve a entrenar el modelo cada xx veces (por hora, por día o por semana).
En este diagrama, se explica cómo funciona Airflow 2 cuando se ejecuta en GKE.
14. Ajuste fino de un modelo en comparación con el uso de RAG
En este codelab, se ajusta un LLM en lugar de usar la generación aumentada de recuperación (RAG).
Comparemos estos dos enfoques:
Ajuste: Crea un modelo especializado: el ajuste adapta el LLM a una tarea o un conjunto de datos específicos, lo que le permite funcionar de forma independiente sin depender de fuentes de datos externas.
Simplifica la inferencia: Esto elimina la necesidad de tener un sistema y una base de datos de recuperación independientes, lo que genera respuestas más rápidas y económicas, especialmente para casos de uso frecuentes.
RAG: Se basa en el conocimiento externo: la RAG recupera información relevante de una base de conocimiento para cada solicitud, lo que garantiza el acceso a datos específicos y actualizados.
Aumenta la complejidad: La implementación de RAG en un entorno de producción, como un clúster de Kubernetes, suele implicar varios microservicios para el procesamiento y la recuperación de datos, lo que podría aumentar la latencia y los costos computacionales.
Por qué se eligió el ajuste fino:
Si bien RAG sería adecuado para el pequeño conjunto de datos que se usa en este codelab, optamos por el ajuste fino para demostrar un caso de uso típico de Airflow. Esta elección nos permite enfocarnos en los aspectos de la orquestación de flujos de trabajo en lugar de profundizar en los matices de la configuración de infraestructura y microservicios adicionales para RAG.
Conclusión:
Tanto el ajuste fino como el RAG son técnicas valiosas con sus propias fortalezas y debilidades. La opción óptima depende de los requisitos específicos de tu proyecto, como el tamaño y la complejidad de tus datos, las necesidades de rendimiento y las consideraciones de costo.
15. Tarea de DAG n° 1: Crea tu primer paso en Airflow: Descarga de datos
Como descripción general de esta unidad de DAG, nuestro código de Python alojado en una imagen de contenedor descarga el conjunto de datos más reciente de RottenTomatoes de Kaggle.
No copies este código en el bucket de GCS. Como último paso, copiamos mlops-dag.py, que contiene todos los pasos de la unidad de DAG en una sola secuencia de comandos de Python.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
dataset_download
16. Tarea del DAG 2: Crea tu segundo paso en Airflow: Preparación de datos
Como descripción general de esta unidad de DAG, cargamos un archivo CSV (rotten_tomatoes_movie_reviews.csv) de GCS en un DataFrame de Pandas.
A continuación, limitamos la cantidad de filas procesadas con DATASET_LIMIT para pruebas y eficiencia de recursos y, por último, convertimos los datos transformados en un conjunto de datos de Hugging Face.
Si observas con atención, verás que entrenamos 1,000 filas en el modelo con "DATASET_LIMIT": "1,000", ya que esto demora 20 minutos en una GPU Nvidia L4.
No copies este código en el bucket de GCS. Copiamos mlops-dag.py en el último paso, que contiene todos los pasos dentro de una secuencia de comandos de Python.
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation
17. Tarea del DAG n° 3: Crea tu tercer paso en Airflow: Ajusta el modelo
Como descripción general de esta unidad de DAG, aquí ejecutamos finetune.py para definir mejor el modelo de Gemma con nuestro nuevo conjunto de datos.
No copies este código en el bucket de GCS. Copiamos mlops-dag.py en el último paso, que contiene todos los pasos dentro de una secuencia de comandos de Python.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Task 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Task 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Task 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation >> fine_tuning
18. Tarea del DAG n.° 4: Crea tu paso final en Airflow: Inferencia / entrega del modelo
vLLM es una potente biblioteca de código abierto diseñada específicamente para la inferencia de alto rendimiento de LLM. Cuando se implementa en Google Kubernetes Engine (GKE), aprovecha la escalabilidad y la eficiencia de Kubernetes para entregar LLM de manera eficaz.
Resumen de los pasos:
- Sube el DAG "mlops-dag.py" al bucket de GCS.
- Copia dos archivos de configuración de YAML de Kubernetes para configurar la inferencia en un bucket de GCS.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
def model_serving():
config.load_incluster_config()
k8s_apps_v1 = client.AppsV1Api()
k8s_core_v1 = client.CoreV1Api()
while True:
try:
k8s_apps_v1.delete_namespaced_deployment(
namespace="airflow",
name="inference-deployment",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Deployment inference-deployment deleted")
with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_apps_v1.create_namespaced_deployment(
body=dep, namespace="airflow")
print(f"Deployment created. Status='{resp.metadata.name}'")
while True:
try:
k8s_core_v1.delete_namespaced_service(
namespace="airflow",
name="llm-service",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Service llm-service deleted")
with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_core_v1.create_namespaced_service(
body=dep, namespace="airflow")
print(f"Service created. Status='{resp.metadata.name}'")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 4: Run GKE Deployment for model serving
model_serving = PythonOperator(
task_id="model_serving",
python_callable=model_serving
)
dataset_download >> data_preparation >> fine_tuning >> model_serving
Sube tu secuencia de comandos de Python (archivo DAG) y los manifiestos de Kubernetes al bucket de GCS de DAGS.
gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}
En la IU de Airflow, verás mlops-dag.
- Selecciona Despausar.
- Selecciona Activar DAG para realizar un ciclo de operaciones de maquetación de datos manuales.
Una vez que se complete tu DAG, verás un resultado como este en la IU de Airflow.
Después del paso final, puedes obtener el extremo del modelo y enviar una instrucción para probarlo.
Espera aproximadamente 5 minutos antes de emitir el comando curl para que pueda comenzar la inferencia del modelo y el balanceador de cargas pueda asignar una dirección IP externa.
export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
"prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
"temperature": 0.1,
"top_p": 1.0,
"max_tokens": 128
}
EOF
Resultado:
19. ¡Felicitaciones!
Creaste tu primer flujo de trabajo de IA con una canalización de DAG con Airflow 2 en GKE.
No olvides desaprovisionar los recursos que implementaste.
20. Cómo hacerlo en producción
Si bien el codelab te brindó una idea fantástica de cómo configurar Airflow 2 en GKE, en el mundo real, querrás tener en cuenta algunos de los siguientes temas cuando lo hagas en producción.
Implementa un frontend web con Gradio o herramientas similares.
Configura la supervisión automática de aplicaciones para cargas de trabajo con GKE aquí o exporta métricas de Airflow aquí.
Es posible que necesites GPUs más grandes para ajustar el modelo más rápido, en especial si tienes conjuntos de datos más grandes. Sin embargo, si queremos entrenar el modelo en varias GPUs, debemos dividir el conjunto de datos y dividir el entrenamiento. Aquí tienes una explicación de FSDP con PyTorch (paralelismo de datos completamente fragmentados, con uso compartido de GPU para lograr ese objetivo. Puedes encontrar más información en una entrada de blog de Meta y en este instructivo sobre FSDP con Pytorch.
Google Cloud Composer es un servicio administrado de Airflow, por lo que no necesitas mantener Airflow en sí, solo debes implementar tu DAG y listo.
Más información
- Documentación de Airflow: https://airflow.apache.org/
Licencia
Este trabajo cuenta con una licencia Atribución 2.0 Genérica de Creative Commons.