1. Übersicht
In diesem CodeLab wird gezeigt, wie Sie DevOps-Praktiken in Machine Learning (MLOps) einbinden, indem Sie einen Datensatz herunterladen, ein Modell optimieren und das LLM mit einem Airflow-DAG mit der geringsten Abstraktion in der Google Kubernetes Engine (GKE) bereitstellen. Daher verwenden wir gcloud-Befehle und nicht Terraform, damit Sie das Lab Schritt für Schritt durcharbeiten und die einzelnen Prozesse sowohl aus der Perspektive des Plattformingenieurs als auch des Machine-Learning-Ingenieurs leicht nachvollziehen können.
In diesem praktischen Leitfaden erfahren Sie, wie Sie mit Airflow Ihre KI-Workflows optimieren. Außerdem wird der gesamte MLOps-Lebenszyklus anhand der Konfiguration eines DAG veranschaulicht.
Lerninhalte
- Bessere Zusammenarbeit und Verständigung zwischen Plattform- und Machine-Learning-Entwicklern durch Aufbrechen von Wissenssilos und Verbesserung von Workflows
- Airflow 2 in GKE bereitstellen, verwenden und verwalten
- Airflow-DAG end-to-end konfigurieren
- Mit GKE die Grundlage für Produktions-ML-Systeme schaffen
- Systeme für maschinelles Lernen instrumentieren und operationalisieren
- Informationen dazu, wie sich Plattformentwicklung zu einer wichtigen Säule für MLOps entwickelt hat
Was Sie in diesem Codelab lernen
- Sie können Fragen zu Filmen an einen LLM stellen, den wir anhand von Gemma-2-9b-it optimiert haben und der in GKE mit vLLM bereitgestellt wird.
Zielgruppe
- Machine Learning Engineers
- Plattformentwickler
- Data Scientists
- Data Engineers
- DevOps-Entwickler
- Plattformarchitekt
- Customer Engineers
Dieses Codelab richtet sich nicht an
- Als Einführung in GKE- oder KI-/ML-Workflows
- Als Übersicht über die gesamte Airflow-Funktionspalette
2. Plattformentwicklung unterstützt Machine Learning Engineers/Scientists
Plattformentwicklung und MLOps sind voneinander abhängige Disziplinen, die zusammen eine robuste und effiziente Umgebung für die Entwicklung und Bereitstellung von ML schaffen.
Umfang: Plattformentwicklung hat einen breiteren Umfang als MLOps. Sie umfasst den gesamten Softwareentwicklungslebenszyklus und stellt die dafür erforderlichen Tools und die Infrastruktur bereit.
MLOps schließt die Lücke zwischen ML-Entwicklung, -Bereitstellung und -Inferenz.
Kompetenz: Plattformingenieure haben in der Regel fundiertes Fachwissen in Infrastrukturtechnologien wie Cloud-Computing, Containerisierung und Datenmanagement.
MLOps-Entwickler sind auf die Entwicklung, Bereitstellung und Überwachung von ML-Modellen spezialisiert und haben oft Kenntnisse in Data Science und Softwareentwicklung.
Tools: Plattformingenieure erstellen Tools für die Infrastrukturbereitstellung, Konfigurationsverwaltung, Container-Orchestrierung und Anwendungs-Scaffolding. MLOps-Entwickler verwenden Tools für das Training, Experimentieren, Bereitstellen, Überwachen und Versionieren von ML-Modellen.
3. Google Cloud-Einrichtung und -Anforderungen
Einrichtung der Umgebung im eigenen Tempo
- Melden Sie sich in der Google Cloud Console an und erstellen Sie ein neues Projekt oder verwenden Sie ein vorhandenes. Wenn Sie noch kein Gmail- oder Google Workspace-Konto haben, müssen Sie ein Konto erstellen.
- Der Projektname ist der Anzeigename für die Teilnehmer dieses Projekts. Es ist ein Zeichenstring, der von Google APIs nicht verwendet wird. Sie können ihn jederzeit aktualisieren.
- Die Projekt-ID ist für alle Google Cloud-Projekte eindeutig und kann nach der Festlegung nicht mehr geändert werden. In der Cloud Console wird automatisch ein eindeutiger String generiert. In der Regel spielt es keine Rolle, wie er lautet. In den meisten Codelabs müssen Sie auf Ihre Projekt-ID verweisen (normalerweise als
PROJECT_ID
gekennzeichnet). Wenn Ihnen die generierte ID nicht gefällt, können Sie eine andere zufällige generieren. Alternativ können Sie Ihr eigenes Konto ausprobieren und prüfen, ob es verfügbar ist. Sie kann nach diesem Schritt nicht mehr geändert werden und bleibt für die Dauer des Projekts bestehen. - Zur Information: Es gibt einen dritten Wert, die Projektnummer, die von einigen APIs verwendet wird. Weitere Informationen zu diesen drei Werten finden Sie in der Dokumentation.
- Als Nächstes müssen Sie die Abrechnung in der Cloud Console aktivieren, um Cloud-Ressourcen/-APIs verwenden zu können. Die Durchführung dieses Codelabs ist kostenlos oder kostet nur sehr wenig. Wenn Sie die Ressourcen herunterfahren möchten, um Kosten nach Abschluss dieser Anleitung zu vermeiden, können Sie die von Ihnen erstellten Ressourcen oder das Projekt löschen. Neuen Google Cloud-Nutzern steht das kostenlose Testprogramm mit einem Guthaben von 300$ zur Verfügung.
Cloud Shell starten
Sie können Google Cloud zwar per Fernzugriff von Ihrem Laptop aus nutzen, in diesem Codelab verwenden Sie jedoch Cloud Shell, eine Befehlszeilenumgebung, die in der Cloud ausgeführt wird.
Cloud Shell aktivieren
- Klicken Sie in der Cloud Console auf Cloud Shell aktivieren
.
Wenn Sie Cloud Shell zum ersten Mal starten, wird ein Zwischenbildschirm mit einer Beschreibung angezeigt. Klicken Sie in diesem Fall auf Weiter.
Die Bereitstellung und Verbindung mit Cloud Shell sollte nur wenige Minuten dauern.
Auf dieser virtuellen Maschine sind alle erforderlichen Entwicklungstools installiert. Sie bietet ein Basisverzeichnis mit 5 GB nichtflüchtigem Speicher und läuft in Google Cloud, was die Netzwerkleistung und Authentifizierung erheblich verbessert. Die meisten, wenn nicht alle Aufgaben in diesem Codelab können mit einem Browser erledigt werden.
Sobald Sie mit Cloud Shell verbunden sind, sollten Sie sehen, dass Sie authentifiziert sind und das Projekt auf Ihre Projekt-ID festgelegt ist.
- Führen Sie in Cloud Shell den folgenden Befehl aus, um zu prüfen, ob Sie authentifiziert sind:
gcloud auth list
Befehlsausgabe
Credentialed Accounts ACTIVE ACCOUNT * <my_account>@<my_domain.com> To set the active account, run: $ gcloud config set account `ACCOUNT`
- Führen Sie in Cloud Shell den folgenden Befehl aus, um zu prüfen, ob der gcloud-Befehl Ihr Projekt kennt:
gcloud config list project
Befehlsausgabe
[core] project = <PROJECT_ID>
Ist dies nicht der Fall, können Sie die Einstellung mit diesem Befehl vornehmen:
gcloud config set project <PROJECT_ID>
Befehlsausgabe
Updated property [core/project].
4. Schritt 1: Bei Kaggle registrieren und authentifizieren
Bevor Sie mit dem CodeLab beginnen können, müssen Sie ein Konto bei Kaggle erstellen. Kaggle ist eine Online-Community-Plattform für Data Scientists und Interessierte am Thema maschinelles Lernen, die zu Google gehört. Dort finden Sie ein umfangreiches Repository mit öffentlich verfügbaren Datasets für verschiedene Bereiche. Auf dieser Website laden Sie das RottenTomatoes-Dataset herunter, das zum Trainieren Ihres Modells verwendet wird.
- Registrieren Sie sich bei Kaggle. Sie können sich mit Google SSO anmelden.
- Nutzungsbedingungen akzeptieren
- Rufen Sie die Einstellungen auf und sehen Sie sich Ihren Nutzernamen an username.
- Wählen Sie im Abschnitt „API“ die Option „Neues Token von“ > Kaggle aus. Dadurch wird kaggle.json heruntergeladen.
- Wenn Probleme auftreten, besuche die Supportseite.
5. Schritt 2: Bei HuggingFace registrieren und authentifizieren
HuggingFace ist eine zentrale Anlaufstelle für alle, die sich mit der Technologie des maschinellen Lernens beschäftigen möchten. Es beherbergt 900.000 Modelle, 200.000 Datasets und 300.000 Demo-Apps (Spaces), die alle Open Source und öffentlich verfügbar sind.
- HuggingFace registrieren – Konto mit Nutzernamen erstellen, Google SSO kann nicht verwendet werden
- E-Mail-Adresse bestätigen
- Klicken Sie hier und akzeptieren Sie die Lizenz für das Gemma-2-9b-it-Modell.
- Hier können Sie ein HuggingFace-Token erstellen.
- Notieren Sie sich die Token-Anmeldedaten, da Sie sie später benötigen.
6. Schritt 3: Erforderliche Google Cloud-Infrastrukturressourcen erstellen
Sie richten GKE, GCE und Artifact Registry ein und wenden IAM-Rollen mithilfe der Identitätsföderation von Arbeitslasten an.
Ihr KI-Workflow verwendet zwei Knotenpools, einen für das Training und einen für die Inferenz. Der Trainingsknotenpool verwendet eine GCE-VM vom Typ „g2-standard-8“ mit einer Nvidia L4 Tensor Core-GPU. Der Inferenzknotenpool verwendet eine g2-standard-24-VM mit zwei Nvidia L4 Tensor Core-GPUs. Wählen Sie bei der Angabe der Region eine aus, in der die erforderliche GPU unterstützt wird ( Link).
Führen Sie in Cloud Shell die folgenden Befehle aus:
# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")
SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1
# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto
# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount
# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com
# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
--accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-12 \
--num-nodes=1
# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
--accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-24 \
--num-nodes=1
# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a
# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}
YAML-Manifeste erstellen
mkdir manifests
cd manifests
mlops-sa.yaml
apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
name: airflow-mlops-sa
namespace: airflow
labels:
tier: airflow
pv-dags.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-dags
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 5Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_DAGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pv-logs.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-logs
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 100Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_LOGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pvc-dags.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-dags
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
volumeName: airflow-dags
storageClassName: standard
pvc-logs.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-logs
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 100Gi
volumeName: airflow-logs
storageClassName: standard
namespace.yaml
kind: Namespace
apiVersion: v1
metadata:
name: airflow
labels:
name: airflow
sa-role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: airflow
name: airflow-deployment-role
rules:
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
resources: ["services"]
verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]
sa-rolebinding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: airflow-deployment-rolebinding
namespace: airflow
subjects:
- kind: ServiceAccount
name: airflow-worker
namespace: airflow
roleRef:
kind: Role
name: airflow-deployment-role
apiGroup: rbac.authorization.k8s.io
inference.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: inference-deployment
namespace: airflow
spec:
replicas: 1
selector:
matchLabels:
app: gemma-server
template:
metadata:
labels:
app: gemma-server
ai.gke.io/model: gemma-2-9b-it
ai.gke.io/inference-server: vllm
annotations:
gke-gcsfuse/volumes: "true"
spec:
serviceAccountName: airflow-mlops-sa
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
- key: "on-demand"
value: "true"
operator: "Equal"
effect: "NoSchedule"
containers:
- name: inference-server
image: vllm/vllm-openai:latest
ports:
- containerPort: 8000
resources:
requests:
nvidia.com/gpu: "2"
limits:
nvidia.com/gpu: "2"
command: ["/bin/sh", "-c"]
args:
- |
python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
volumeMounts:
- mountPath: /dev/shm
name: dshm
- name: gcs-fuse-csi-ephemeral
mountPath: /modeldata
readOnly: true
volumes:
- name: dshm
emptyDir:
medium: Memory
- name: gcs-fuse-csi-ephemeral
csi:
driver: gcsfuse.csi.storage.gke.io
volumeAttributes:
bucketName: BUCKET_DATA_NAME
mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
fileCacheCapacity: "20Gi"
fileCacheForRangeRead: "true"
metadataStatCacheCapacity: "-1"
metadataTypeCacheCapacity: "-1"
metadataCacheTTLSeconds: "-1"
nodeSelector:
cloud.google.com/gke-accelerator: nvidia-l4
inference-service.yaml
apiVersion: v1
kind: Service
metadata:
name: llm-service
namespace: airflow
spec:
selector:
app: gemma-server
type: LoadBalancer
ports:
- protocol: TCP
port: 8000
targetPort: 8000
Drei Google Cloud Storage-Buckets (GCS) erstellen
gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}
# Create the namespace in GKE
kubectl apply -f namespace.yaml
# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml
Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"
7. Schritt 4: Airflow über das Helm-Chart in GKE installieren
Jetzt stellen wir Airflow 2 mit Helm bereit. Apache Airflow ist eine Open-Source- Workflow-Management-Plattform für Data-Engineering-Pipelines. Wir werden uns später die Funktionen von Airflow 2 genauer ansehen.
values.yaml für das Airflow-Helm-Diagramm
config:
webserver:
expose_config: true
webserver:
service:
type: LoadBalancer
podAnnotations:
gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
- name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
value: "30"
logs:
persistence:
enabled: true
existingClaim: "airflow-logs"
dags:
persistence:
enabled: true
existingClaim: "airflow-dags"
scheduler:
podAnnotations:
gke-gcsfuse/volumes: "true"
triggerer:
podAnnotations:
gke-gcsfuse/volumes: "true"
workers:
podAnnotations:
gke-gcsfuse/volumes: "true"
Airflow 2 bereitstellen
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml
8. Schritt 5: Airflow mit Verbindungen und Variablen initialisieren
Sobald Airflow 2 bereitgestellt wurde, können wir mit der Konfiguration beginnen. Wir definieren einige Variablen, die von unseren Python-Scripts gelesen werden.
- Über einen Browser auf die Airflow-Benutzeroberfläche auf Port 8080 zugreifen
Externe IP-Adresse abrufen
kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'
Öffnen Sie einen Webbrowser und rufen Sie http://<EXTERNAL-IP>:8080 auf . Die Anmeldedaten sind „admin“/„admin“.
- Erstellen Sie eine Standard-GCP-Verbindung in der Airflow-Benutzeroberfläche. Gehen Sie dazu zu „Verwaltung“ → „Verbindungen“ → „+ Neuen Eintrag hinzufügen“.
- Verbindungs-ID: google_cloud_default
- Verbindungstyp: Google Cloud
Klicken Sie auf „Speichern“.
- Erstellen Sie die erforderlichen Variablen. Gehen Sie dazu zu „Verwaltung“ → „Variablen“ → „+ Neuen Eintrag hinzufügen“.
- Schlüssel: BUCKET_DATA_NAME – Wert: Aus „echo $BUCKET_DATA_NAME“ kopieren
- Schlüssel: GCP_PROJECT_ID – Wert: Aus echo $DEVSHELL_PROJECT_ID kopieren
- Schlüssel: HF_TOKEN – Wert: HF-Token einfügen
- Schlüssel: KAGGLE_USERNAME – Wert: Kaggle-Nutzername einfügen
- Schlüssel: KAGGLE_KEY – Wert: Kopieren Sie diesen aus kaggle.json.
Klicken Sie nach jedem Schlüssel/Wert-Paar auf „Speichern“.
Ihre Benutzeroberfläche sollte so aussehen:
9. Anwendungscode-Container 1 – Datendownload
In diesem Python-Script authentifizieren wir uns bei Kaggle, um das Dataset in unseren GCS-Bucket herunterzuladen.
Das Script selbst ist containerisiert, da es sich um DAG-Einheit 1 handelt. Wir gehen davon aus, dass der Datensatz häufig aktualisiert wird, und möchten dies automatisieren.
Verzeichnis erstellen und Scripts hierhin kopieren
cd .. ; mkdir 1-dataset-download
cd 1-dataset-download
dataset-download.py
import os
import kagglehub
from google.cloud import storage
KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
def upload_blob(bucket_name, source_file_name, destination_blob_name):
"""Uploads a file to the bucket."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(f"File {source_file_name} uploaded to {destination_blob_name}.")
# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")
print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"
upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]
requirements.txt
google-cloud-storage==2.19.0
kagglehub==0.3.4
Jetzt erstellen wir ein Container-Image für den Datensatzdownload und übertragen es per Push an die Artifact Registry.
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest
10. Anwendungscodecontainer 2 – Datenvorbereitung
Im Schritt „Datenvorbereitung“ erreichen wir Folgendes:
- Geben Sie an, welchen Teil des Datasets Sie für die Feinabstimmung des Basismodells verwenden möchten.
- Hier wird der Datensatz geladen, d. h. die CSV-Datei wird in einen Pandas-DataFrame gelesen, eine zweidimensionale Datenstruktur für Zeilen und Spalten.
- Datentransformation/-vorbereitung: Festlegen, welche Teile des Datensatzes irrelevant sind, indem angegeben wird, was beibehalten werden soll. Der Rest wird entfernt.
- Wendet die Funktion
transform
auf jede Zeile des DataFrames an. - Die vorbereiteten Daten wieder im GCS-Bucket speichern
Verzeichnis erstellen und Scripts hierhin kopieren
cd .. ; mkdir 2-data-preparation
cd 2-data-preparation
data-preparation.py
import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset
# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100")) # Process a limited number of rows, used 100 during testing phase but can be increased
DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"
# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")
def transform(data):
"""
Transforms a row of the DataFrame into the desired format for fine-tuning.
Args:
data: A pandas Series representing a row of the DataFrame.
Returns:
A dictionary containing the formatted text.
"""
question = f"Review analysis for movie '{data['id']}'"
context = data['reviewText']
answer = data['scoreSentiment']
template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
return {'text': template.format(question=question, context=context, answer=answer)}
try:
df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
print(f"Dataset loaded successfully.")
# Drop rows with NaN values in relevant columns
df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])
# Apply transformation to the DataFrame
transformed_data = df.apply(transform, axis=1).tolist()
# Convert transformed data to a DataFrame and then to a Hugging Face Dataset
transformed_df = pd.DataFrame(transformed_data)
dataset = Dataset.from_pandas(transformed_df)
# Save the prepared dataset to JSON lines format
with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
for item in dataset:
f.write(json.dumps(item) + "\n")
print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
except Exception as e:
print(f"Error during data loading or preprocessing: {e}")
import traceback
print(traceback.format_exc())
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]
requirements.txt
datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3
# Now we create a container images for data-preparation and push it to the Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest
11. Anwendungscode-Container 3 – Feinabstimmung
Hier verwenden wir Gemma-2-9b-it als Basismodell und optimieren es dann mit unserem neuen Datensatz.
Das sind die Schritte, die während der Feinabstimmung ausgeführt werden.
1. Einrichtung: Bibliotheken importieren, Parameter für Modell, Daten und Training definieren und den Datensatz aus Google Cloud Storage laden.
2. Modell laden:Laden Sie ein vortrainiertes Sprachmodell mit Quantisierung für mehr Effizienz und den entsprechenden Tokenizer.
3. LoRA konfigurieren:Richten Sie die Low-Rank Adaptation (LoRA) ein, um das Modell durch Hinzufügen kleiner trainierbarer Matrizen effizient zu optimieren.
4. Trainieren:Definieren Sie die Trainingsparameter und verwenden Sie SFTTrainer
, um das Modell mit dem Quantisierungstyp FP16 auf dem geladenen Dataset zu optimieren.
5. Speichern und hochladen:Speichern Sie das optimierte Modell und den Tokenizer lokal und laden Sie sie dann in unseren GCS-Bucket hoch.
Anschließend erstellen wir mit Cloud Build ein Container-Image und speichern es in der Artifact Registry.
Verzeichnis erstellen und Scripts hierhin kopieren
cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning
finetuning.py
import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage
# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")
# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")
# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
"json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)
################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))
################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03
# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50
################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False
# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
attn_implementation="eager",
pretrained_model_name_or_path=model_name,
torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")
# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
lora_alpha=lora_alpha,
lora_dropout=lora_dropout,
r=lora_r,
bias="none",
task_type="CAUSAL_LM",
target_modules=[
"q_proj",
"k_proj",
"v_proj",
"o_proj",
"gate_proj",
"up_proj",
"down_proj",
],
)
# Set training parameters
training_arguments = SFTConfig(
bf16=bf16,
dataset_kwargs={
"add_special_tokens": False,
"append_concat_token": False,
},
dataset_text_field="text",
disable_tqdm=True,
fp16=fp16,
gradient_accumulation_steps=gradient_accumulation_steps,
gradient_checkpointing=gradient_checkpointing,
gradient_checkpointing_kwargs={"use_reentrant": False},
group_by_length=group_by_length,
log_on_each_node=False,
logging_steps=logging_steps,
learning_rate=learning_rate,
lr_scheduler_type=lr_scheduler_type,
max_grad_norm=max_grad_norm,
max_seq_length=max_seq_length,
max_steps=max_steps,
num_train_epochs=num_train_epochs,
optim=optim,
output_dir=save_model_path,
packing=packing,
per_device_train_batch_size=per_device_train_batch_size,
save_strategy=save_strategy,
save_steps=save_steps,
save_total_limit=save_total_limit,
warmup_ratio=warmup_ratio,
weight_decay=weight_decay,
)
print(f"Configuring fine tuning completed")
# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
model=model,
train_dataset=dataset,
peft_config=peft_config,
dataset_text_field="text",
max_seq_length=max_seq_length,
tokenizer=tokenizer,
args=training_arguments,
packing=packing,
)
print(f"Creating trainer completed")
# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")
# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")
# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
low_cpu_mem_usage=True,
pretrained_model_name_or_path=model_name,
return_dict=True,
torch_dtype=torch.float16,
)
model = PeftModel.from_pretrained(
model=base_model,
model_id=new_model,
)
model = model.merge_and_unload()
print(f"Merging the new model with base model completed")
accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")
print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
is_main_process=accelerator.is_main_process,
save_directory=save_model_path,
save_function=accelerator.save,
)
print(f"Save unwrapped model completed")
print(f"Save new tokenizer started")
if accelerator.is_main_process:
tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")
# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
"""Uploads a directory to GCS."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
for root, _, files in os.walk(model_dir):
for file in files:
local_file_path = os.path.join(root, file)
gcs_file_path = os.path.relpath(local_file_path, model_dir)
blob = bucket.blob(os.path.join(new_model, gcs_file_path)) # Use new_model_name
blob.upload_from_filename(local_file_path)
# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")
Dockerfile
# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04
# Install necessary system packages
RUN apt-get update && \
apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
rm -rf /var/lib/apt/lists/*
# Copy requirements.txt into the container
COPY requirements.txt .
# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
# Copy your finetune script into the container
COPY finetuning.py .
# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]
requirements.txt
accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4
Jetzt erstellen wir ein Container-Image für die Feinabstimmung und übertragen es an die Artifact Registry.
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest
12. Airflow 2 – Übersicht, einschließlich Definition eines DAG
Airflow ist eine Plattform für die Orchestrierung von Workflows und Datenpipelines. Dabei werden DAGs (Directed Acyclic Graphs) verwendet, um diese Workflows in Python-Code zu definieren und Aufgaben und ihre Abhängigkeiten visuell darzustellen.
Airflow eignet sich mit seinen statischen DAGs und Python-basierten Definitionen gut für die Planung und Verwaltung vordefinierter Workflows. Die Architektur umfasst eine nutzerfreundliche Benutzeroberfläche zum Überwachen und Verwalten dieser Workflows.
Mit Airflow können Sie Ihre Datenpipelines mit Python definieren, planen und überwachen. Das Tool ist daher ein flexibles und leistungsstarkes Tool für die Workflow-Orchestrierung.
13. Übersicht über unsere DAG
DAG steht für „Directed Acyclic Graph“ (gerichteter azyklischer Graph). In Airflow stellt ein DAG den gesamten Workflow oder die gesamte Pipeline dar. Er definiert die Aufgaben, ihre Abhängigkeiten und die Ausführungsreihenfolge.
Die Workflow-Einheiten innerhalb des DAG werden von einem Pod im GKE-Cluster ausgeführt, der über die Airflow-Konfiguration gestartet wird.
Zusammenfassung:
Airflow: Datendownload: Dieses Script automatisiert den Abruf eines Datensatzes mit Filmrezensionen von Kaggle und speichert ihn in Ihrem GCS-Bucket, sodass er für die weitere Verarbeitung oder Analyse in Ihrer Cloud-Umgebung sofort verfügbar ist.
Airflow: Datenvorbereitung: Der Code nimmt den Rohdatensatz der Filmrezensionen, entfernt überflüssige Datenspalten, die für unseren Anwendungsfall nicht erforderlich sind, und löscht Datensätze mit fehlenden Werten. Als Nächstes wird der Datensatz in ein Frage-Antwort-Format strukturiert, das für maschinelles Lernen geeignet ist, und zur späteren Verwendung wieder in GCS gespeichert.
Airflow: Model Finetuning (Airflow: Modell-Feinabstimmung): Mit diesem Code wird ein Large Language Model (LLM) mithilfe der LoRA-Methode (Low-Rank Adaptation) optimiert und dann gespeichert. Zuerst wird ein vorab trainierter LLM und ein Datensatz aus Google Cloud Storage geladen. Anschließend wird LoRA angewendet, um das Modell für diesen Datensatz effizient zu optimieren. Schließlich wird das optimierte Modell wieder in Google Cloud Storage gespeichert, um es später in Anwendungen wie der Textgenerierung oder der Beantwortung von Fragen zu verwenden.
Airflow: Model Serving: Bereitstellung des optimierten Modells in GKE mit vLLM für die Inferenz.
Airflow: Feedbackschleife: Das Modell wird alle xx Mal (stündlich, täglich, wöchentlich) neu trainiert.
In diesem Diagramm wird die Funktionsweise von Airflow 2 in GKE veranschaulicht.
14. Modell optimieren oder RAG verwenden
In diesem CodeLab wird ein LLM optimiert, anstatt Retrieval Augmented Generation (RAG) zu verwenden.
Vergleichen wir diese beiden Ansätze:
Feinabstimmung: Erstellen eines spezialisierten Modells: Bei der Feinabstimmung wird das LLM an eine bestimmte Aufgabe oder einen bestimmten Datensatz angepasst, sodass es unabhängig arbeiten kann, ohne auf externe Datenquellen angewiesen zu sein.
Vereinfacht die Inferenz: Dadurch ist kein separates Abrufsystem und keine separate Datenbank erforderlich. Dies führt zu schnelleren und kostengünstigeren Antworten, insbesondere bei häufigen Anwendungsfällen.
RAG: Verwendet externes Wissen: RAG ruft für jede Anfrage relevante Informationen aus einer Wissensdatenbank ab und sorgt so für Zugriff auf aktuelle und spezifische Daten.
Erhöhte Komplexität: Die Implementierung von RAG in einer Produktionsumgebung wie einem Kubernetes-Cluster umfasst häufig mehrere Mikrodienste für die Datenverarbeitung und ‑abfrage, was zu einer potenziellen Erhöhung der Latenz und der Rechenkosten führen kann.
Gründe für die Feinabstimmung:
RAG wäre für das kleine Dataset in diesem CodeLab geeignet, wir haben uns jedoch für eine Feinabstimmung entschieden, um einen typischen Anwendungsfall für Airflow zu demonstrieren. So können wir uns auf die Aspekte der Workflow-Orchestrierung konzentrieren, anstatt uns mit den Feinheiten der Einrichtung zusätzlicher Infrastruktur und Mikrodienste für RAG zu befassen.
Fazit:
Sowohl die Feinabstimmung als auch die RAG-Methode sind wertvolle Techniken mit eigenen Stärken und Schwächen. Die optimale Auswahl hängt von den spezifischen Anforderungen Ihres Projekts ab, z. B. von der Größe und Komplexität Ihrer Daten, den Leistungsanforderungen und den Kosten.
15. DAG-Aufgabe 1 – Ersten Schritt in Airflow erstellen: Datendownload
Als Übersicht über diese DAG-Einheit lädt unser in einem Container-Image gehosteter Python-Code den neuesten RottenTomatoes-Datensatz von Kaggle herunter.
Kopieren Sie diesen Code nicht in den GCS-Bucket. Als letzten Schritt kopieren wir mlops-dag.py, das alle DAG-Einheitsschritte in einem Python-Script enthält.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
dataset_download
16. DAG-Aufgabe 2 – Zweiten Schritt in Airflow erstellen: Datenvorbereitung
Als Übersicht über diese DAG-Einheit laden wir eine CSV-Datei (rotten_tomatoes_movie_reviews.csv) aus GCS in einen Pandas-DataFrame.
Als Nächstes begrenzen wir die Anzahl der verarbeiteten Zeilen mit DATASET_LIMIT für Tests und Ressourceneffizienz und konvertieren die transformierten Daten schließlich in ein Hugging Face-Dataset.
Bei genauerem Hinsehen sehen Sie, dass wir 1.000 Zeilen im Modell mit „DATASET_LIMIT“: „1.000“ trainieren. Das liegt daran, dass dies auf einer Nvidia-L4-GPU 20 Minuten dauert.
Kopieren Sie diesen Code nicht in den GCS-Bucket. Im letzten Schritt kopieren wir mlops-dag.py, das alle Schritte in einem Python-Script enthält.
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation
17. DAG-Aufgabe 3 – Dritten Schritt in Airflow erstellen: Modell optimieren
Als Übersicht über diese DAG-Einheit führen wir hier „finetune.py“ aus, um das Gemma-Modell mit unserem neuen Datensatz zu optimieren.
Kopieren Sie diesen Code nicht in den GCS-Bucket. Im letzten Schritt kopieren wir mlops-dag.py, das alle Schritte in einem Python-Script enthält.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Task 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Task 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Task 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation >> fine_tuning
18. DAG-Aufgabe 4 – Letzten Schritt in Airflow erstellen: Inferenz / Bereitstellung des Modells
vLLM ist eine leistungsstarke Open-Source-Bibliothek, die speziell für die Hochleistungsinferenz von LLMs entwickelt wurde. Bei der Bereitstellung in der Google Kubernetes Engine (GKE) werden die Skalierbarkeit und Effizienz von Kubernetes genutzt, um LLMs effektiv bereitzustellen.
Zusammenfassung der Schritte:
- Laden Sie den DAG „mlops-dag.py“ in den GCS-Bucket hoch.
- Kopieren Sie zwei Kubernetes-YAML-Konfigurationsdateien zum Einrichten der Inferenz in einen GCS-Bucket.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
def model_serving():
config.load_incluster_config()
k8s_apps_v1 = client.AppsV1Api()
k8s_core_v1 = client.CoreV1Api()
while True:
try:
k8s_apps_v1.delete_namespaced_deployment(
namespace="airflow",
name="inference-deployment",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Deployment inference-deployment deleted")
with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_apps_v1.create_namespaced_deployment(
body=dep, namespace="airflow")
print(f"Deployment created. Status='{resp.metadata.name}'")
while True:
try:
k8s_core_v1.delete_namespaced_service(
namespace="airflow",
name="llm-service",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Service llm-service deleted")
with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_core_v1.create_namespaced_service(
body=dep, namespace="airflow")
print(f"Service created. Status='{resp.metadata.name}'")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 4: Run GKE Deployment for model serving
model_serving = PythonOperator(
task_id="model_serving",
python_callable=model_serving
)
dataset_download >> data_preparation >> fine_tuning >> model_serving
Laden Sie Ihr Python-Script (DAG-Datei) und die Kubernetes-Manifeste in den GCS-Bucket „DAGS“ hoch.
gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}
In der Airflow-Benutzeroberfläche sehen Sie „mlops-dag“.
- Wählen Sie „Pausieren aufheben“ aus.
- Wählen Sie „Trigger DAG“ aus, um einen manuellen MLOps-Zyklus auszuführen.
Sobald der DAG abgeschlossen ist, sehen Sie in der Airflow-Benutzeroberfläche eine Ausgabe wie diese:
Nach dem letzten Schritt können Sie den Modellendpunkt abrufen und einen Prompt senden, um das Modell zu testen.
Warten Sie etwa fünf Minuten, bevor Sie den Befehl „curl“ ausführen, damit die Inferenz des Modells beginnen und der Load Balancer eine externe IP-Adresse zuweisen kann.
export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
"prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
"temperature": 0.1,
"top_p": 1.0,
"max_tokens": 128
}
EOF
Ausgabe:
19. Glückwunsch!
Sie haben Ihren ersten KI-Workflow mit einer DAG-Pipeline mit Airflow 2 in GKE erstellt.
Denken Sie daran, die Bereitstellung der bereitgestellten Ressourcen aufzuheben.
20. In der Produktionsversion
Das CodeLab hat Ihnen einen hervorragenden Einblick in die Einrichtung von Airflow 2 in GKE gegeben. In der Praxis sollten Sie jedoch einige der folgenden Themen berücksichtigen, wenn Sie dies in der Produktion tun.
Implementieren Sie ein Web-Frontend mit Gradio oder ähnlichen Tools.
Konfigurieren Sie entweder hier das automatische Anwendungsmonitoring für Arbeitslasten mit GKE oder exportieren Sie hier Messwerte aus Airflow.
Möglicherweise benötigen Sie größere GPUs, um das Modell schneller zu optimieren, insbesondere bei größeren Datensätzen. Wenn wir das Modell jedoch auf mehreren GPUs trainieren möchten, müssen wir den Datensatz aufteilen und das Training in Shards aufteilen. Hier finden Sie eine Erklärung von FSDP mit PyTorch (vollständig shardet, parallele Datenberechnung mit GPU-Freigabe). Weitere Informationen finden Sie in diesem Blogpost von Meta und in dieser Anleitung zur FSDP mit PyTorch.
Google Cloud Composer ist ein verwalteter Airflow-Dienst. Sie müssen Airflow also nicht selbst verwalten, sondern können einfach Ihren DAG bereitstellen und loslegen.
Weitere Informationen
- Airflow-Dokumentation: https://airflow.apache.org/
Lizenz
Dieser Text ist mit einer Creative Commons Attribution 2.0 Generic License lizenziert.