1. Panoramica
Questo CodeLab mostra come integrare le pratiche DevOps nel machine learning (MLOps) scaricando un set di dati, perfezionando un modello ed eseguendo il deployment del modello LLM su Google Kubernetes Engine (GKE) utilizzando un DAG Airflow con il minor livello di astrazione. Di conseguenza, utilizziamo i comandi gcloud e non terraform per consentirti di seguire il laboratorio passo passo e comprendere facilmente ogni processo dal punto di vista sia del Platform Engineer sia del Machine Learning Engineer.
Questa guida pratica ti illustrerà come utilizzare Airflow per semplificare i flussi di lavoro di IA, fornendo una dimostrazione chiara e pratica dell'intero ciclo di vita di MLOps mediante la configurazione di un DAG.
Obiettivi didattici
- Favorire una maggiore collaborazione e comprensione tra gli ingegneri di piattaforma e di machine learning abbattendo i silos di conoscenza e migliorando i flussi di lavoro
- Scopri come eseguire il deployment, utilizzare e gestire Airflow 2 su GKE
- Configurare un DAG Airflow end-to-end
- Creare le basi per sistemi di machine learning di livello di produzione con GKE
- Strumentare e mettere in funzione i sistemi di machine learning
- Scopri in che modo l'ingegneria di piattaforma è diventata un pilastro di supporto fondamentale per MLOps
Scopo di questo Codelab
- Puoi porre domande sui film a un modello LLM ottimizzato in base a Gemma-2-9b-it, pubblicato in GKE con vLLM.
Pubblico di destinazione
- Machine Learning Engineer
- Platform Engineer
- Data scientist
- Data Engineer
- DevOps Engineer
- Platform Architect
- Customer Engineer
Questo Codelab non è destinato
- Come introduzione ai flussi di lavoro GKE o AI/ML
- Come panoramica dell'intero set di funzionalità di Airflow
2. L'ingegneria di piattaforma aiuta gli ingegneri/scienziati di machine learning
L'ingegneria di piattaforma e MLOps sono discipline interdipendenti che collaborano per creare un ambiente solido ed efficiente per lo sviluppo e il deployment di soluzioni ML.
Ambito:l'ingegneria di piattaforma ha un ambito più ampio di MLOps, in quanto comprende l'intero ciclo di vita dello sviluppo software e fornisce gli strumenti e l'infrastruttura necessari.
MLOps colma il divario tra lo sviluppo, il deployment e l'inferenza di ML.
Competenze:in genere, gli ingegneri delle piattaforme dispongono di una solida esperienza in tecnologie di infrastruttura come cloud computing, containerizzazione e gestione dei dati.
Gli ingegneri MLOps sono specializzati nello sviluppo, nel deployment e nel monitoraggio dei modelli di ML e spesso possiedono competenze di data science e ingegneria del software.
Strumenti:gli ingegneri della piattaforma creano strumenti per il provisioning dell'infrastruttura, la gestione della configurazione, l'orchestrazione dei container e lo scaffolding delle applicazioni. Gli ingegneri MLOps utilizzano strumenti per l'addestramento, la sperimentazione, il deployment, il monitoraggio e il controllo delle versioni dei modelli ML.
3. Configurazione e requisiti di Google Cloud
Configurazione dell'ambiente a tuo ritmo
- Accedi alla console Google Cloud e crea un nuovo progetto o riutilizzane uno esistente. Se non hai ancora un account Gmail o Google Workspace, devi crearne uno.
- Il nome del progetto è il nome visualizzato per i partecipanti al progetto. Si tratta di una stringa di caratteri non utilizzata dalle API di Google. Puoi sempre aggiornarlo.
- L'ID progetto è univoco per tutti i progetti Google Cloud ed è immutabile (non può essere modificato dopo essere stato impostato). La console Cloud genera automaticamente una stringa univoca; di solito non ti interessa quale sia. Nella maggior parte dei codelab, dovrai fare riferimento al tuo ID progetto (in genere identificato come
PROJECT_ID
). Se l'ID generato non ti piace, puoi generarne un altro casuale. In alternativa, puoi provare il tuo e vedere se è disponibile. Non può essere modificato dopo questo passaggio e rimane invariato per tutta la durata del progetto. - Per tua informazione, esiste un terzo valore, un Numero progetto, utilizzato da alcune API. Scopri di più su tutti e tre questi valori nella documentazione.
- Successivamente, dovrai abilitare la fatturazione nella console Cloud per utilizzare le API/risorse Cloud. La partecipazione a questo codelab non ha costi, o quasi. Per arrestare le risorse ed evitare di incorrere in fatturazione al termine di questo tutorial, puoi eliminare le risorse che hai creato o il progetto. I nuovi utenti di Google Cloud sono idonei al programma Prova senza costi di 300$.
Avvia Cloud Shell
Sebbene Google Cloud possa essere utilizzato da remoto dal tuo laptop, in questo codelab utilizzerai Cloud Shell, un ambiente a riga di comando in esecuzione nel cloud.
Attiva Cloud Shell
- Nella console Cloud, fai clic su Attiva Cloud Shell
.
Se è la prima volta che avvii Cloud Shell, viene visualizzata una schermata intermedia che descrive di cosa si tratta. Se viene visualizzata una schermata intermedia, fai clic su Continua.
Dovrebbero bastare pochi istanti per eseguire il provisioning e connettersi a Cloud Shell.
Questa macchina virtuale è caricata con tutti gli strumenti di sviluppo necessari. Offre una home directory permanente da 5 GB e viene eseguita in Google Cloud, migliorando notevolmente le prestazioni e l'autenticazione della rete. Gran parte, se non tutto, del lavoro in questo codelab può essere svolto con un browser.
Una volta eseguita la connessione a Cloud Shell, dovresti vedere che il tuo account è già autenticato e il progetto è già impostato sul tuo ID progetto.
- Esegui questo comando in Cloud Shell per verificare che l'account sia autenticato:
gcloud auth list
Output comando
Credentialed Accounts ACTIVE ACCOUNT * <my_account>@<my_domain.com> To set the active account, run: $ gcloud config set account `ACCOUNT`
- Esegui il seguente comando in Cloud Shell per verificare che il comando gcloud conosca il tuo progetto:
gcloud config list project
Output comando
[core] project = <PROJECT_ID>
In caso contrario, puoi impostarlo con questo comando:
gcloud config set project <PROJECT_ID>
Output comando
Updated property [core/project].
4. Passaggio 1: registrati e autenticati su Kaggle
Per iniziare il Codelab, devi creare un account su Kaggle, una piattaforma comunitaria online per data scientist e appassionati di machine learning di proprietà di Google che ospita un vasto repository di set di dati disponibili pubblicamente per vari domini. Da questo sito scaricherai il set di dati RottenTomatoes, utilizzato per addestrare il modello.
- Registrati a Kaggle, puoi utilizzare il Single Sign-On di Google per accedere
- Accetta i Termini e condizioni
- Vai a Impostazioni e recupera il tuo nome utente username
- Nella sezione API, seleziona "Crea nuovo token da" Kaggle per scaricare kaggle.json
- In caso di problemi, vai alla pagina di assistenza qui
5. Passaggio 2: registrati e autenticati su HuggingFace
HuggingFace è un punto di riferimento centralizzato per chiunque voglia interagire con la tecnologia di machine learning. Ospita 900.000 modelli, 200.000 set di dati e 300.000 app di dimostrazione (Spaces), tutti open source e disponibili pubblicamente.
- Registrati a HuggingFace: crea un account con nome utente, non puoi utilizzare SSO di Google
- Confermare il proprio indirizzo email
- Vai qui e accetta la licenza per il modello Gemma-2-9b-it
- Crea un token HuggingFace qui
- Registra le credenziali del token, ti serviranno in seguito
6. Passaggio 3: crea le risorse di infrastruttura Google Cloud necessarie
Configura GKE, GCE, Artifact Registry e applica i ruoli IAM utilizzando la federazione delle identità per i carichi di lavoro.
Il flusso di lavoro di IA utilizza due pool di nodi, uno per l'addestramento e uno per l'inferenza. Il node pool di addestramento utilizza una VM GCE g2-standard-8 dotata di una GPU Nvidia L4 Tensor Core. Il node pool di inferenza utilizza una VM g2-standard-24 dotata di due GPU Nvidia L4 Tensor Core. Quando specifichi la regione, scegline una in cui è supportata la GPU richiesta ( Link).
In Cloud Shell, esegui i seguenti comandi:
# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")
SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1
# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto
# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount
# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com
# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
--accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-12 \
--num-nodes=1
# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
--accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-24 \
--num-nodes=1
# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a
# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}
Crea i manifest YAML
mkdir manifests
cd manifests
mlops-sa.yaml
apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
name: airflow-mlops-sa
namespace: airflow
labels:
tier: airflow
pv-dags.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-dags
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 5Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_DAGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pv-logs.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-logs
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 100Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_LOGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pvc-dags.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-dags
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
volumeName: airflow-dags
storageClassName: standard
pvc-logs.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-logs
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 100Gi
volumeName: airflow-logs
storageClassName: standard
namespace.yaml
kind: Namespace
apiVersion: v1
metadata:
name: airflow
labels:
name: airflow
sa-role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: airflow
name: airflow-deployment-role
rules:
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
resources: ["services"]
verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]
sa-rolebinding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: airflow-deployment-rolebinding
namespace: airflow
subjects:
- kind: ServiceAccount
name: airflow-worker
namespace: airflow
roleRef:
kind: Role
name: airflow-deployment-role
apiGroup: rbac.authorization.k8s.io
inference.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: inference-deployment
namespace: airflow
spec:
replicas: 1
selector:
matchLabels:
app: gemma-server
template:
metadata:
labels:
app: gemma-server
ai.gke.io/model: gemma-2-9b-it
ai.gke.io/inference-server: vllm
annotations:
gke-gcsfuse/volumes: "true"
spec:
serviceAccountName: airflow-mlops-sa
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
- key: "on-demand"
value: "true"
operator: "Equal"
effect: "NoSchedule"
containers:
- name: inference-server
image: vllm/vllm-openai:latest
ports:
- containerPort: 8000
resources:
requests:
nvidia.com/gpu: "2"
limits:
nvidia.com/gpu: "2"
command: ["/bin/sh", "-c"]
args:
- |
python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
volumeMounts:
- mountPath: /dev/shm
name: dshm
- name: gcs-fuse-csi-ephemeral
mountPath: /modeldata
readOnly: true
volumes:
- name: dshm
emptyDir:
medium: Memory
- name: gcs-fuse-csi-ephemeral
csi:
driver: gcsfuse.csi.storage.gke.io
volumeAttributes:
bucketName: BUCKET_DATA_NAME
mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
fileCacheCapacity: "20Gi"
fileCacheForRangeRead: "true"
metadataStatCacheCapacity: "-1"
metadataTypeCacheCapacity: "-1"
metadataCacheTTLSeconds: "-1"
nodeSelector:
cloud.google.com/gke-accelerator: nvidia-l4
inference-service.yaml
apiVersion: v1
kind: Service
metadata:
name: llm-service
namespace: airflow
spec:
selector:
app: gemma-server
type: LoadBalancer
ports:
- protocol: TCP
port: 8000
targetPort: 8000
Crea tre bucket Google Cloud Storage (GCS)
gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}
# Create the namespace in GKE
kubectl apply -f namespace.yaml
# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml
Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"
7. Passaggio 4: installa Airflow su GKE tramite il grafico Helm
Ora eseguiamo il deployment di Airflow 2 utilizzando Helm. Apache Airflow è una piattaforma di gestione del flusso di lavoro open source per le pipeline di data engineering. Più avanti parleremo dell'insieme di funzionalità di Airflow 2.
values.yaml per il grafico Helm di Airflow
config:
webserver:
expose_config: true
webserver:
service:
type: LoadBalancer
podAnnotations:
gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
- name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
value: "30"
logs:
persistence:
enabled: true
existingClaim: "airflow-logs"
dags:
persistence:
enabled: true
existingClaim: "airflow-dags"
scheduler:
podAnnotations:
gke-gcsfuse/volumes: "true"
triggerer:
podAnnotations:
gke-gcsfuse/volumes: "true"
workers:
podAnnotations:
gke-gcsfuse/volumes: "true"
Esegui il deployment di Airflow 2
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml
8. Passaggio 5: inizializza Airflow con connessioni e variabili
Una volta disegnato Airflow 2, possiamo iniziare a configurarlo. Definiamo alcune variabili, che vengono lette dai nostri script Python.
- Accedi all'interfaccia utente di Airflow sulla porta 8080 con il browser
Ottieni l'IP esterno
kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'
Apri un browser web e vai all'indirizzo http://<EXTERNAL-IP>:8080 . L'accesso è admin / admin
- Crea una connessione Google Cloud predefinita all'interno dell'interfaccia utente di Airflow, quindi vai ad Amministrazione → Connessioni → + Aggiungi un nuovo record
- ID connessione: google_cloud_default
- Tipo di connessione: Google Cloud
Fai clic su Salva.
- Crea le variabili necessarie, quindi vai ad Amministrazione → Variabili → + Aggiungi un nuovo record
- Chiave: BUCKET_DATA_NAME - Valore: copia da echo $BUCKET_DATA_NAME
- Chiave: GCP_PROJECT_ID - Valore: copia da echo $DEVSHELL_PROJECT_ID
- Chiave: HF_TOKEN - Valore: inserisci il token HF
- Chiave: KAGGLE_USERNAME - Valore: inserisci il tuo nome utente Kaggle
- Chiave: KAGGLE_KEY - Valore: copia questo da kaggle.json
Fai clic su Salva dopo ogni coppia chiave-valore.
L'interfaccia utente dovrebbe avere il seguente aspetto:
9. Contenitore del codice dell'applicazione 1 - Download dei dati
In questo script Python, eseguiamo l'autenticazione con Kaggle per scaricare il set di dati nel nostro bucket GCS.
Lo script stesso è in container perché diventa l'unità DAG 1 e prevediamo che il set di dati venga aggiornato di frequente, quindi vogliamo automatizzare questa operazione.
Crea la directory e copia qui i nostri script
cd .. ; mkdir 1-dataset-download
cd 1-dataset-download
dataset-download.py
import os
import kagglehub
from google.cloud import storage
KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
def upload_blob(bucket_name, source_file_name, destination_blob_name):
"""Uploads a file to the bucket."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(f"File {source_file_name} uploaded to {destination_blob_name}.")
# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")
print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"
upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]
requirements.txt
google-cloud-storage==2.19.0
kagglehub==0.3.4
Ora creiamo un'immagine contenitore per il download del set di dati ed eseguiamo il push in Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest
10. Contenitore del codice dell'applicazione 2: preparazione dei dati
Durante il passaggio di preparazione dei dati, otteniamo quanto segue:
- Specifica la quantità di set di dati che vogliamo utilizzare per l'ottimizzazione del nostro modello di base
- Carica il set di dati, ovvero legge il file CSV in un dataframe Pandas, che è una struttura di dati 2D per righe e colonne
- Trasformazione / preelaborazione dei dati: determina quali parti del set di dati sono irrilevanti specificando cosa vogliamo conservare, il che in pratica significa rimuovere il resto
- Applica la funzione
transform
a ogni riga del DataFrame - Salvare nuovamente i dati preparati nel bucket GCS
Crea la directory e copia qui i nostri script
cd .. ; mkdir 2-data-preparation
cd 2-data-preparation
data-preparation.py
import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset
# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100")) # Process a limited number of rows, used 100 during testing phase but can be increased
DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"
# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")
def transform(data):
"""
Transforms a row of the DataFrame into the desired format for fine-tuning.
Args:
data: A pandas Series representing a row of the DataFrame.
Returns:
A dictionary containing the formatted text.
"""
question = f"Review analysis for movie '{data['id']}'"
context = data['reviewText']
answer = data['scoreSentiment']
template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
return {'text': template.format(question=question, context=context, answer=answer)}
try:
df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
print(f"Dataset loaded successfully.")
# Drop rows with NaN values in relevant columns
df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])
# Apply transformation to the DataFrame
transformed_data = df.apply(transform, axis=1).tolist()
# Convert transformed data to a DataFrame and then to a Hugging Face Dataset
transformed_df = pd.DataFrame(transformed_data)
dataset = Dataset.from_pandas(transformed_df)
# Save the prepared dataset to JSON lines format
with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
for item in dataset:
f.write(json.dumps(item) + "\n")
print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
except Exception as e:
print(f"Error during data loading or preprocessing: {e}")
import traceback
print(traceback.format_exc())
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]
requirements.txt
datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3
# Now we create a container images for data-preparation and push it to the Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest
11. Contenitore del codice dell'applicazione 3 - Ottimizzazione fine
Qui utilizziamo Gemma-2-9b-it come modello di base e poi lo perfezioniamo con il nostro nuovo set di dati.
Questa è la sequenza di passaggi che si verificano durante il passaggio di ottimizzazione.
1. Configurazione: importa le librerie, definisci i parametri (per il modello, i dati e l'addestramento) e carica il set di dati da Google Cloud Storage.
2. Carica modello: carica un modello linguistico preaddestrato con quantizzazione per l'efficienza e carica il tokenizzatore corrispondente.
3. Configura LoRA: configura l'adattamento a basso ranking (LoRA) per ottimizzare il modello in modo efficiente aggiungendo piccole matrici addestrabili.
4. Addestra: definisci i parametri di addestramento e utilizza SFTTrainer
per perfezionare il modello sul set di dati caricato utilizzando il tipo di quantizzazione FP16.
5. Salva e carica: salva il modello e il tokenizzatore ottimizzati localmente, quindi caricali nel nostro bucket GCS.
Poi creiamo un'immagine contenitore utilizzando Cloud Build e la archiviamo in Artifact Registry.
Crea la directory e copia qui i nostri script
cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning
finetuning.py
import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage
# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")
# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")
# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
"json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)
################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))
################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03
# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50
################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False
# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
attn_implementation="eager",
pretrained_model_name_or_path=model_name,
torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")
# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
lora_alpha=lora_alpha,
lora_dropout=lora_dropout,
r=lora_r,
bias="none",
task_type="CAUSAL_LM",
target_modules=[
"q_proj",
"k_proj",
"v_proj",
"o_proj",
"gate_proj",
"up_proj",
"down_proj",
],
)
# Set training parameters
training_arguments = SFTConfig(
bf16=bf16,
dataset_kwargs={
"add_special_tokens": False,
"append_concat_token": False,
},
dataset_text_field="text",
disable_tqdm=True,
fp16=fp16,
gradient_accumulation_steps=gradient_accumulation_steps,
gradient_checkpointing=gradient_checkpointing,
gradient_checkpointing_kwargs={"use_reentrant": False},
group_by_length=group_by_length,
log_on_each_node=False,
logging_steps=logging_steps,
learning_rate=learning_rate,
lr_scheduler_type=lr_scheduler_type,
max_grad_norm=max_grad_norm,
max_seq_length=max_seq_length,
max_steps=max_steps,
num_train_epochs=num_train_epochs,
optim=optim,
output_dir=save_model_path,
packing=packing,
per_device_train_batch_size=per_device_train_batch_size,
save_strategy=save_strategy,
save_steps=save_steps,
save_total_limit=save_total_limit,
warmup_ratio=warmup_ratio,
weight_decay=weight_decay,
)
print(f"Configuring fine tuning completed")
# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
model=model,
train_dataset=dataset,
peft_config=peft_config,
dataset_text_field="text",
max_seq_length=max_seq_length,
tokenizer=tokenizer,
args=training_arguments,
packing=packing,
)
print(f"Creating trainer completed")
# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")
# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")
# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
low_cpu_mem_usage=True,
pretrained_model_name_or_path=model_name,
return_dict=True,
torch_dtype=torch.float16,
)
model = PeftModel.from_pretrained(
model=base_model,
model_id=new_model,
)
model = model.merge_and_unload()
print(f"Merging the new model with base model completed")
accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")
print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
is_main_process=accelerator.is_main_process,
save_directory=save_model_path,
save_function=accelerator.save,
)
print(f"Save unwrapped model completed")
print(f"Save new tokenizer started")
if accelerator.is_main_process:
tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")
# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
"""Uploads a directory to GCS."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
for root, _, files in os.walk(model_dir):
for file in files:
local_file_path = os.path.join(root, file)
gcs_file_path = os.path.relpath(local_file_path, model_dir)
blob = bucket.blob(os.path.join(new_model, gcs_file_path)) # Use new_model_name
blob.upload_from_filename(local_file_path)
# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")
Dockerfile
# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04
# Install necessary system packages
RUN apt-get update && \
apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
rm -rf /var/lib/apt/lists/*
# Copy requirements.txt into the container
COPY requirements.txt .
# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
# Copy your finetune script into the container
COPY finetuning.py .
# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]
requirements.txt
accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4
Ora creiamo un'immagine container per la messa a punto ed eseguiamo il push in Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest
12. Panoramica di Airflow 2, incluso che cos'è un DAG
Airflow è una piattaforma per l'orchestrazione di flussi di lavoro e pipeline di dati. Utilizza i DAG (Directed Acyclic Graph) per definire questi workflow nel codice Python, rappresentando visivamente le attività e le relative dipendenze.
Airflow, con i suoi DAG statici e le definizioni basate su Python, è adatto per pianificare e gestire i flussi di lavoro predefiniti. La sua architettura include un'interfaccia utente intuitiva per il monitoraggio e la gestione di questi flussi di lavoro.
Essenzialmente, Airflow ti consente di definire, pianificare e monitorare le pipeline di dati utilizzando Python, il che lo rende uno strumento flessibile e potente per l'orchestrazione dei flussi di lavoro.
13. Panoramica del nostro DAG
DAG sta per Directed Acyclic Graph (grafo diretto aciclico). In Airflow, un DAG rappresenta l'intero flusso di lavoro o la pipeline. Definisce le attività, le relative dipendenze e l'ordine di esecuzione.
Le unità di workflow all'interno del DAG vengono eseguite da un pod sul cluster GKE, avviato dalla configurazione di Airflow.
Riepilogo:
Airflow: download dei dati: questo script automatizza il processo di acquisizione di un set di dati di recensioni di film da Kaggle e di archiviazione nel bucket GCS, rendendolo subito disponibile per ulteriori elaborazioni o analisi nel tuo ambiente cloud.
Airflow: preparazione dei dati: il codice prende il set di dati non elaborato delle recensioni dei film, rimuove le colonne di dati estranee non necessarie per il nostro caso d'uso ed elimina i set di dati con valori mancanti. Successivamente, struttura il set di dati in un formato di risposta alle domande adatto al machine learning e lo archivia di nuovo in GCS per un uso successivo.
Airflow: Model Finetuning: questo codice ottimizza un modello linguistico di grandi dimensioni (LLM) utilizzando una tecnica chiamata LoRA (Low-Rank Adaptation) e poi salva il modello aggiornato. Inizia caricando un modello LLM preaddestrato e un set di dati da Google Cloud Storage. Poi applica LoRA per perfezionare in modo efficiente il modello su questo set di dati. Infine, salva il modello perfezionato in Google Cloud Storage per utilizzarlo in un secondo momento in applicazioni come la generazione di testo o la risposta alle domande.
Airflow: pubblicazione del modello: pubblicazione del modello ottimizzato su GKE con vllm per l'inferenza.
Flusso di aria: ciclo di feedback: ricollocazione del modello ogni xx volte (ogni ora, ogni giorno, ogni settimana).
Questo diagramma spiega come funziona Airflow 2 quando viene eseguito su GKE.
14. Ottimizzazione di un modello rispetto all'utilizzo di RAG
Questo codelab perfeziona un LLM anziché utilizzare la Retrieval Augmented Generation (RAG).
Confrontiamo questi due approcci:
Messa a punto:crea un modello specializzato: la messa a punto adatta l'LLM a un'attività o a un set di dati specifico, consentendogli di operare in modo indipendente senza fare affidamento su origini dati esterne.
Semplifica l'inferenza: in questo modo non è necessario un sistema di recupero e un database separati, il che si traduce in risposte più rapide ed economiche, in particolare per i casi d'uso frequenti.
RAG: si basa su conoscenze esterne: la RAG recupera le informazioni pertinenti da una knowledge base per ogni richiesta, garantendo l'accesso a dati specifici e aggiornati.
Aumenta la complessità: l'implementazione di RAG in un ambiente di produzione come un cluster Kubernetes spesso prevede più microservizi per l'elaborazione e il recupero dei dati, con un potenziale aumento della latenza e dei costi di calcolo.
Perché è stata scelta la messa a punto fine:
Sebbene RAG sia adatto per il piccolo set di dati utilizzato in questo CodeLab, abbiamo optato per la messa a punto per dimostrare un caso d'uso tipico di Airflow. Questa scelta ci consente di concentrarci sugli aspetti di orchestrazione del flusso di lavoro anziché approfondire le sfumature della configurazione di infrastruttura e microservizi aggiuntivi per RAG.
Conclusione:
Sia la messa a punto fine che la RAG sono tecniche preziose con i propri punti di forza e di debolezza. La scelta ottimale dipende dai requisiti specifici del progetto, come le dimensioni e la complessità dei dati, le esigenze di rendimento e i fattori di costo.
15. Attività DAG 1: crea il tuo primo passaggio in Airflow: scaricamento dei dati
Come panoramica di questa unità DAG, il nostro codice Python ospitato in un'immagine contenitore scarica l'ultimo set di dati RottenTomatoes da Kaggle.
Non copiare questo codice nel bucket GCS. Come ultimo passaggio, copiamo mlops-dag.py, che contiene tutti i passaggi dell'unità DAG in uno script Python.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
dataset_download
16. Attività DAG 2: crea il secondo passaggio in Airflow: preparazione dei dati
Come panoramica di questa unità DAG, carichiamo un file CSV (rotten_tomatoes_movie_reviews.csv) da GCS in un DataFrame Pandas.
Successivamente, limitiamo il numero di righe elaborate utilizzando DATASET_LIMIT per i test e l'efficienza delle risorse e infine convertiamo i dati trasformati in un set di dati Hugging Face.
Se guardi attentamente, noterai che stiamo addestrando 1000 righe nel modello con "DATASET_LIMIT": "1000", perché ci vogliono 20 minuti su una GPU Nvidia L4 per farlo.
Non copiare questo codice nel bucket GCS. Nell'ultimo passaggio, copiamo mlops-dag.py, che contiene tutti i passaggi all'interno di uno script Python.
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation
17. Attività DAG 3: crea il terzo passaggio in Airflow: ottimizzazione del modello
Come panoramica di questa unità DAG, qui eseguiamo finetune.py per perfezionare il modello Gemma con il nostro nuovo set di dati.
Non copiare questo codice nel bucket GCS. Nell'ultimo passaggio, copiamo mlops-dag.py, che contiene tutti i passaggi all'interno di uno script Python.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Task 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Task 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Task 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation >> fine_tuning
18. Attività DAG 4: crea l'ultimo passaggio in Airflow: inferenza / pubblicazione del modello
vLLM è una potente libreria open source progettata appositamente per l'inferenza ad alte prestazioni degli LLM. Se viene eseguito il deployment su Google Kubernetes Engine (GKE), sfrutta la scalabilità e l'efficienza di Kubernetes per pubblicare efficacemente i modelli LLM.
Riepilogo dei passaggi:
- Carica il DAG "mlops-dag.py" nel bucket GCS.
- Copia due file di configurazione YAML di Kubernetes per configurare l'inferenza in un bucket GCS.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
def model_serving():
config.load_incluster_config()
k8s_apps_v1 = client.AppsV1Api()
k8s_core_v1 = client.CoreV1Api()
while True:
try:
k8s_apps_v1.delete_namespaced_deployment(
namespace="airflow",
name="inference-deployment",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Deployment inference-deployment deleted")
with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_apps_v1.create_namespaced_deployment(
body=dep, namespace="airflow")
print(f"Deployment created. Status='{resp.metadata.name}'")
while True:
try:
k8s_core_v1.delete_namespaced_service(
namespace="airflow",
name="llm-service",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Service llm-service deleted")
with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_core_v1.create_namespaced_service(
body=dep, namespace="airflow")
print(f"Service created. Status='{resp.metadata.name}'")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 4: Run GKE Deployment for model serving
model_serving = PythonOperator(
task_id="model_serving",
python_callable=model_serving
)
dataset_download >> data_preparation >> fine_tuning >> model_serving
Carica lo script Python (file DAG) e i manifest di Kubernetes nel bucket GCS DAGS.
gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}
Nell'interfaccia utente di Airflow vedrai mlops-dag.
- Seleziona Riproduci.
- Seleziona Attiva DAG per eseguire un ciclo MLOps manuale.
Al termine del DAG, nell'interfaccia utente di Airflow vedrai un output simile a questo.
Dopo il passaggio finale, puoi recuperare l'endpoint del modello e inviare un prompt per testarlo.
Attendi circa 5 minuti prima di emettere il comando curl, in modo che l'inferenza del modello possa iniziare e il bilanciatore del carico possa assegnare un indirizzo IP esterno.
export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
"prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
"temperature": 0.1,
"top_p": 1.0,
"max_tokens": 128
}
EOF
Output:
19. Complimenti!
Hai creato il tuo primo flusso di lavoro di IA utilizzando una pipeline DAG con Airflow 2 su GKE.
Non dimenticare di annullare il provisioning delle risorse di cui hai eseguito il deployment.
20. Eseguire questa operazione in produzione
Sebbene il CodeLab ti abbia fornito un'ottima panoramica su come configurare Airflow 2 su GKE, nella realtà dovrai prendere in considerazione alcuni dei seguenti argomenti quando esegui questa operazione in produzione.
Implementa un frontend web utilizzando Gradio o strumenti simili.
Configura il monitoraggio automatico delle applicazioni per i carichi di lavoro con GKE qui o esporta le metriche da Airflow qui.
Potresti aver bisogno di GPU più grandi per ottimizzare il modello più rapidamente, soprattutto se hai set di dati più grandi. Tuttavia, se vogliamo addestrare il modello su più GPU, dobbiamo suddividere il set di dati e suddividere l'addestramento. Ecco una spiegazione di FSDP con PyTorch (elaborazione parallela dei dati completamente suddivisi in parti, utilizzando la condivisione della GPU per raggiungere questo obiettivo. Puoi trovare ulteriori informazioni qui in un post del blog di Meta e un altro in questo tutorial su FSDP con Pytorch.
Google Cloud Composer è un servizio Airflow gestito, quindi non devi occuparti della manutenzione di Airflow stesso, ma devi solo eseguire il deployment del DAG e iniziare a utilizzarlo.
Scopri di più
- Documentazione di Airflow: https://airflow.apache.org/
Licenza
Questo lavoro è concesso in licenza ai sensi di una licenza Creative Commons Attribution 2.0 Generic.