Créer des workflows MLOps avec Airflow 2 sur GKE

1. Présentation

852dc8844309ffb8.png

Ce codelab explique comment intégrer les pratiques DevOps au machine learning (MLOps) en téléchargeant un ensemble de données, en affinant un modèle et en déployant le LLM sur Google Kubernetes Engine (GKE) à l'aide d'un DAG Airflow avec le moins d'abstraction possible. Par conséquent, nous utilisons des commandes gcloud et non terraform afin que vous puissiez suivre l'atelier étape par étape et comprendre facilement chaque processus du point de vue de l'ingénieur en plate-forme et de l'ingénieur en machine learning.

Ce guide pratique vous explique comment utiliser Airflow pour simplifier vos workflows d'IA. Il fournit une démonstration claire et pratique de l'ensemble du cycle de vie MLOps en configurant un DAG.

Points abordés

  • Favoriser une meilleure collaboration et une meilleure compréhension entre les ingénieurs de plate-forme et de machine learning en brisant les silos de connaissances et en améliorant les workflows
  • Découvrez comment déployer, utiliser et gérer Airflow 2 sur GKE.
  • Configurer un DAG Airflow de bout en bout
  • Créer la base de systèmes de machine learning de production avec GKE
  • Instrumenter et opérationnaliser des systèmes de machine learning
  • Comprendre comment l'ingénierie de plate-forme est devenue un pilier essentiel de l'assistance pour le MLOps

Ce que vous allez apprendre dans cet atelier de programmation

  • Vous pouvez poser des questions sur les films à partir d'un LLM que nous avons affiné sur la base de Gemma-2-9b-it, diffusé dans GKE avec vLLM.

Audience cible

  • Ingénieurs en machine learning
  • Ingénieurs de plate-forme
  • Data scientists
  • Ingénieurs de données
  • Ingénieurs en DevOps
  • Architecte de plate-forme
  • Ingénieurs client

Cet atelier de programmation n'est pas destiné

  • En guise d'introduction aux workflows GKE ou d'IA/ML
  • En tant que présentation de l'ensemble des fonctionnalités d'Airflow

2. L'ingénierie de plate-forme aide les ingénieurs/scientifiques en machine learning

16635a8284b994c.png

L'ingénierie de plate-forme et le MLOps sont des disciplines interdépendantes qui collaborent pour créer un environnement robuste et efficace pour le développement et le déploiement du ML.

Portée:l'ingénierie de plate-forme a une portée plus large que le MLOps, car elle englobe l'ensemble du cycle de développement logiciel et fournit les outils et l'infrastructure nécessaires.

Le MLOps permet de combler le fossé entre le développement, le déploiement et l'inférence du ML.

Expertise:les ingénieurs de plate-forme possèdent généralement une solide expertise des technologies d'infrastructure telles que le cloud computing, la conteneurisation et la gestion des données.

Les ingénieurs MLOps se spécialisent dans le développement, le déploiement et la surveillance des modèles de ML. Ils possèdent souvent des compétences en science des données et en ingénierie logicielle.

Outils:les ingénieurs de plate-forme créent des outils de provisionnement d'infrastructure, de gestion de la configuration, d'orchestration de conteneurs et d'échafaudage d'applications. Les ingénieurs MLOps utilisent des outils pour l'entraînement, l'expérimentation, le déploiement, la surveillance et le contrôle des versions des modèles de ML.

3. Configuration et prérequis Google Cloud

Configuration de l'environnement au rythme de chacun

  1. Connectez-vous à la console Google Cloud, puis créez un projet ou réutilisez un projet existant. Si vous n'avez pas encore de compte Gmail ou Google Workspace, vous devez en créer un.

fbef9caa1602edd0.png

a99b7ace416376c4.png

5e3ff691252acf41.png

  • Le nom du projet est le nom à afficher pour les participants au projet. Il s'agit d'une chaîne de caractères non utilisée par les API Google. Vous pourrez toujours le modifier.
  • L'ID du projet est unique parmi tous les projets Google Cloud et non modifiable une fois défini. La console Cloud génère automatiquement une chaîne unique (en général, vous n'y accordez d'importance particulière). Dans la plupart des ateliers de programmation, vous devrez indiquer l'ID de votre projet (généralement identifié par PROJECT_ID). Si l'ID généré ne vous convient pas, vous pouvez en générer un autre de manière aléatoire. Vous pouvez également en spécifier un et voir s'il est disponible. Après cette étape, l'ID n'est plus modifiable et restera donc le même pour toute la durée du projet.
  • Pour information, il existe une troisième valeur (le numéro de projet) que certaines API utilisent. Pour en savoir plus sur ces trois valeurs, consultez la documentation.
  1. Vous devez ensuite activer la facturation dans la console Cloud pour utiliser les ressources/API Cloud. L'exécution de cet atelier de programmation est très peu coûteuse, voire sans frais. Pour désactiver les ressources et éviter ainsi que des frais ne vous soient facturés après ce tutoriel, vous pouvez supprimer le projet ou les ressources que vous avez créées. Les nouveaux utilisateurs de Google Cloud peuvent participer au programme d'essai sans frais pour bénéficier d'un crédit de 300 $.

Démarrer Cloud Shell

Bien que Google Cloud puisse être utilisé à distance depuis votre ordinateur portable, nous allons utiliser Cloud Shell pour cet atelier de programmation, un environnement de ligne de commande exécuté dans le cloud.

Activer Cloud Shell

  1. Dans Cloud Console, cliquez sur Activer Cloud Shell 853e55310c205094.png.

3c1dabeca90e44e5.png

Si vous démarrez Cloud Shell pour la première fois, un écran intermédiaire s'affiche pour décrire de quoi il s'agit. Si tel est le cas, cliquez sur Continuer.

9c92662c6a846a5c.png

Le provisionnement et la connexion à Cloud Shell ne devraient pas prendre plus de quelques minutes.

9f0e51b578fecce5.png

Cette machine virtuelle contient tous les outils de développement nécessaires. Elle comprend un répertoire d'accueil persistant de 5 Go et s'exécute sur Google Cloud, ce qui améliore nettement les performances du réseau et l'authentification. Vous pouvez réaliser une grande partie, voire la totalité, des activités de cet atelier de programmation dans un navigateur.

Une fois connecté à Cloud Shell, vous êtes en principe authentifié, et le projet est défini avec votre ID de projet.

  1. Exécutez la commande suivante dans Cloud Shell pour vérifier que vous êtes authentifié :
gcloud auth list

Résultat de la commande

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. Exécutez la commande suivante dans Cloud Shell pour vérifier que la commande gcloud connaît votre projet:
gcloud config list project

Résultat de la commande

[core]
project = <PROJECT_ID>

Si vous obtenez un résultat différent, exécutez cette commande :

gcloud config set project <PROJECT_ID>

Résultat de la commande

Updated property [core/project].

4. Étape 1 : Inscrivez-vous et authentifiez-vous sur Kaggle

Pour commencer le cours, vous devez créer un compte sur Kaggle, une plate-forme communautaire en ligne destinée aux data scientists et aux passionnés de machine learning, qui appartient à Google et héberge un vaste dépôt d'ensembles de données accessibles au public pour divers domaines. C'est sur ce site que vous téléchargerez l'ensemble de données RottenTomatoes, utilisé pour entraîner votre modèle.

  • Inscrivez-vous à Kaggle. Vous pouvez utiliser l'authentification unique Google pour vous connecter.
  • Accepter les conditions d'utilisation
  • Accédez à "Paramètres" et obtenez votre nom d'utilisateur nom d'utilisateur.
  • Dans la section "API", sélectionnez "Create new token from" (Créer un jeton à partir de) Kaggle, ce qui permet de télécharger kaggle.json.
  • Si vous rencontrez des problèmes, accédez à la page d'assistance ici.

5. Étape 2 : Inscrivez-vous et authentifiez-vous sur HuggingFace

HuggingFace est un point d'accès central pour tous ceux qui souhaitent interagir avec la technologie de machine learning. Il héberge 900 000 modèles, 200 000 ensembles de données et 300 000 applications de démonstration (Spaces), tous Open Source et accessibles au public.

  • Inscrivez-vous à HuggingFace : créez un compte avec un nom d'utilisateur. Vous ne pouvez pas utiliser l'authentification unique Google.
  • Confirmer votre adresse e-mail
  • Cliquez ici et acceptez la licence pour le modèle Gemma-2-9b-it.
  • Créez un jeton HuggingFace ici.
  • Enregistrez les identifiants du jeton, car vous en aurez besoin plus tard.

6. Étape 3 : Créez les ressources d'infrastructure Google Cloud requises

Vous allez configurer GKE, GCE, Artifact Registry et appliquer des rôles IAM à l'aide de la fédération d'identité de charge de travail.

Votre workflow d'IA utilise deux pools de nœuds, l'un pour l'entraînement et l'autre pour l'inférence. Le pool de nœuds d'entraînement utilise une VM GCE g2-standard-8 équipée d'un GPU Nvidia L4 Tensor Core. Le pool de nœuds d'inférence utilise une VM g2-standard-24 équipée de deux GPU Nvidia L4 Tensor Core. Lorsque vous spécifiez la région, choisissez-en une où le GPU requis est compatible ( Lien).

Dans Cloud Shell, exécutez les commandes suivantes:

# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")

SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1

# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto

# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount

# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com

# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
  --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-12 \
  --num-nodes=1

# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
  --accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-24 \
  --num-nodes=1

# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a

# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}

Créer vos fichiers manifestes YAML

mkdir manifests
cd manifests

mlops-sa.yaml

apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
  name: airflow-mlops-sa
  namespace: airflow
  labels:
    tier: airflow

pv-dags.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 5Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_DAGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pv-logs.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-logs
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 100Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_LOGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pvc-dags.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: airflow-dags
  storageClassName: standard

pvc-logs.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  volumeName: airflow-logs
  storageClassName: standard

namespace.yaml

kind: Namespace
apiVersion: v1
metadata:
  name: airflow
  labels:
    name: airflow

sa-role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-deployment-role
rules:
- apiGroups: ["apps"] 
  resources: ["deployments"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]

sa-rolebinding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-deployment-rolebinding
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: airflow-worker
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-deployment-role
  apiGroup: rbac.authorization.k8s.io

inference.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-deployment
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gemma-server
  template:
    metadata:
      labels:
        app: gemma-server
        ai.gke.io/model: gemma-2-9b-it
        ai.gke.io/inference-server: vllm
      annotations:
        gke-gcsfuse/volumes: "true"
    spec:
      serviceAccountName: airflow-mlops-sa
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: "NoSchedule"
      - key: "on-demand"
        value: "true"
        operator: "Equal"
        effect: "NoSchedule"
      containers:
      - name: inference-server
        image: vllm/vllm-openai:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: "2"
          limits:
            nvidia.com/gpu: "2"
        command: ["/bin/sh", "-c"]
        args:
        - |
          python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
        volumeMounts:
        - mountPath: /dev/shm
          name: dshm
        - name: gcs-fuse-csi-ephemeral
          mountPath: /modeldata
          readOnly: true
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      - name: gcs-fuse-csi-ephemeral
        csi:
          driver: gcsfuse.csi.storage.gke.io
          volumeAttributes:
            bucketName: BUCKET_DATA_NAME
            mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
            fileCacheCapacity: "20Gi"
            fileCacheForRangeRead: "true"
            metadataStatCacheCapacity: "-1"
            metadataTypeCacheCapacity: "-1"
            metadataCacheTTLSeconds: "-1"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4

inference-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: llm-service
  namespace: airflow
spec:
  selector:
    app: gemma-server
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

Créer trois buckets Google Cloud Storage (GCS)

gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}

# Create the namespace in GKE
kubectl apply -f namespace.yaml

# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml

Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation

gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"

7. Étape 4 : Installer Airflow sur GKE à l'aide du chart Helm

Nous allons maintenant déployer Airflow 2 à l'aide de Helm. Apache Airflow est une plate-forme de gestion des workflows Open Source pour les pipelines d'ingénierie des données. Nous verrons plus tard l'ensemble des fonctionnalités d'Airflow 2.

values.yaml pour le graphique Helm Airflow

config:
  webserver:
    expose_config: true
webserver:
  service:
    type: LoadBalancer
  podAnnotations:
    gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "30"
logs:
  persistence:
    enabled: true
    existingClaim: "airflow-logs"
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
scheduler:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
triggerer:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
workers:
  podAnnotations:
    gke-gcsfuse/volumes: "true"

Déployer Airflow 2

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml

8. Étape 5 : Initialiser Airflow avec des connexions et des variables

Une fois Airflow 2 déployé, nous pouvons commencer à le configurer. Nous définissons des variables qui sont lues par nos scripts Python.

  1. Accéder à l'interface utilisateur d'Airflow sur le port 8080 avec votre navigateur

Obtenir l'adresse IP externe

kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'

Ouvrez un navigateur Web et accédez à http://<EXTERNAL-IP>:8080 . La connexion est admin / admin.

  1. Créez une connexion GCP par défaut dans l'interface utilisateur d'Airflow. Pour ce faire, accédez à "Admin" → "Connexions" → "+ Ajouter un enregistrement".
  • ID de connexion: google_cloud_default
  • Type de connexion: Google Cloud

Cliquez sur "Enregistrer".

  1. Créez les variables nécessaires en accédant à "Admin" → "Variables" → "+ Ajouter un enregistrement".
  • Clé: BUCKET_DATA_NAME - Valeur: Copier à partir de echo $BUCKET_DATA_NAME
  • Clé: GCP_PROJECT_ID – Valeur: copier à partir de echo $DEVSHELL_PROJECT_ID
  • Clé: HF_TOKEN - Valeur: insérez votre jeton HF
  • Clé: KAGGLE_USERNAME – Valeur: insérez votre nom d'utilisateur Kaggle
  • Clé: KAGGLE_KEY - Valeur: copiez-la à partir de kaggle.json

Cliquez sur "Enregistrer" après chaque paire clé-valeur.

Votre UI devrait se présenter comme suit:

771121470131b5ec.png

9. Conteneur de code d'application 1 : téléchargement de données

Dans ce script Python, nous nous authentifions auprès de Kaggle pour télécharger l'ensemble de données dans notre bucket GCS.

Le script lui-même est conteneurisé, car il devient l'unité DAG 1. Nous nous attendons à ce que l'ensemble de données soit fréquemment mis à jour. Nous souhaitons donc l'automatiser.

Créer un répertoire et y copier nos scripts

cd .. ; mkdir 1-dataset-download
cd 1-dataset-download

dataset-download.py

import os
import kagglehub
from google.cloud import storage

KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")

print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"

upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]

requirements.txt

google-cloud-storage==2.19.0
kagglehub==0.3.4

Nous allons maintenant créer une image de conteneur pour le téléchargement de l'ensemble de données et la transférer vers Artifact Registry.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest

10. Conteneur de code d'application 2 : préparation des données

Voici ce que nous faisons lors de la préparation des données:

  1. Spécifiez la partie de l'ensemble de données que vous souhaitez utiliser pour affiner notre modèle de base.
  2. Charge l'ensemble de données, c'est-à-dire lit le fichier CSV dans un DataFrame Pandas, qui est une structure de données à deux dimensions pour les lignes et les colonnes.
  3. Transformation / prétraitement des données : déterminez les parties de l'ensemble de données qui ne sont pas pertinentes en spécifiant ce que vous souhaitez conserver, ce qui revient à supprimer le reste.
  4. Applique la fonction transform à chaque ligne du DataFrame.
  5. Enregistrer les données préparées dans le bucket GCS

Créer un répertoire et y copier nos scripts

cd .. ; mkdir 2-data-preparation
cd 2-data-preparation

data-preparation.py

import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset

# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100"))  # Process a limited number of rows, used 100 during testing phase but can be increased

DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"

# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")

def transform(data):
    """
    Transforms a row of the DataFrame into the desired format for fine-tuning.

    Args:
      data: A pandas Series representing a row of the DataFrame.

    Returns:
      A dictionary containing the formatted text.
    """ 
    question = f"Review analysis for movie '{data['id']}'"
    context = data['reviewText']
    answer = data['scoreSentiment']
    template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
    return {'text': template.format(question=question, context=context, answer=answer)}

try:
    df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
    print(f"Dataset loaded successfully.")

    # Drop rows with NaN values in relevant columns
    df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])

    # Apply transformation to the DataFrame
    transformed_data = df.apply(transform, axis=1).tolist()

    # Convert transformed data to a DataFrame and then to a Hugging Face Dataset
    transformed_df = pd.DataFrame(transformed_data)
    dataset = Dataset.from_pandas(transformed_df)

    # Save the prepared dataset to JSON lines format
    with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
        for item in dataset:
            f.write(json.dumps(item) + "\n")

    print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
    
except Exception as e:
    print(f"Error during data loading or preprocessing: {e}")
    import traceback
    print(traceback.format_exc())

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]

requirements.txt

datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3

# Now we create a container images for data-preparation and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest

11. Conteneur de code d'application 3 : ajustement

Ici, nous utilisons Gemma-2-9b-it comme modèle de base, puis nous l'affinons avec notre nouvel ensemble de données.

Voici la séquence d'étapes qui se déroulent lors de l'étape de réglage fin.

1. Configuration:importez les bibliothèques, définissez les paramètres (pour le modèle, les données et l'entraînement) et chargez l'ensemble de données à partir de Google Cloud Storage.

2. Charger le modèle:chargez un modèle de langage pré-entraîné avec quantification pour plus d'efficacité, puis chargez le tokenizer correspondant.

3. Configurez LoRA:configurez l'adaptation faible (LoRA) pour affiner efficacement le modèle en ajoutant de petites matrices enregistrables.

4. Entraînement:définissez les paramètres d'entraînement et utilisez SFTTrainer pour affiner le modèle sur l'ensemble de données chargé à l'aide du type de quantification FP16.

5. Enregistrer et importer:enregistrez le modèle et le tokenizer affinés localement, puis importez-les dans notre bucket GCS.

Nous créons ensuite une image de conteneur à l'aide de Cloud Build et la stockons dans Artifact Registry.

Créer un répertoire et y copier nos scripts

cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning

finetuning.py

import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage

# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")

# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
    "json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)

################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))

################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50

################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False

# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
    attn_implementation="eager",
    pretrained_model_name_or_path=model_name,
    torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")

# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
    lora_alpha=lora_alpha,
    lora_dropout=lora_dropout,
    r=lora_r,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Set training parameters
training_arguments = SFTConfig(
        bf16=bf16,
        dataset_kwargs={
            "add_special_tokens": False,  
            "append_concat_token": False, 
        },
        dataset_text_field="text",
        disable_tqdm=True,
        fp16=fp16,
        gradient_accumulation_steps=gradient_accumulation_steps,
        gradient_checkpointing=gradient_checkpointing,
        gradient_checkpointing_kwargs={"use_reentrant": False},
        group_by_length=group_by_length,
        log_on_each_node=False,
        logging_steps=logging_steps,
        learning_rate=learning_rate,
        lr_scheduler_type=lr_scheduler_type,
        max_grad_norm=max_grad_norm,
        max_seq_length=max_seq_length,
        max_steps=max_steps,
        num_train_epochs=num_train_epochs,
        optim=optim,
        output_dir=save_model_path,
        packing=packing,
        per_device_train_batch_size=per_device_train_batch_size,
        save_strategy=save_strategy,
        save_steps=save_steps,
        save_total_limit=save_total_limit,
        warmup_ratio=warmup_ratio,
        weight_decay=weight_decay,
    )

print(f"Configuring fine tuning completed")

# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    peft_config=peft_config,
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    tokenizer=tokenizer,
    args=training_arguments,
    packing=packing,
)

print(f"Creating trainer completed")

# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")

# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
    low_cpu_mem_usage=True,
    pretrained_model_name_or_path=model_name,
    return_dict=True,
    torch_dtype=torch.float16,
)

model = PeftModel.from_pretrained(
    model=base_model,
    model_id=new_model,
)
model = model.merge_and_unload()

print(f"Merging the new model with base model completed")

accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")

print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
    is_main_process=accelerator.is_main_process,
    save_directory=save_model_path,
    save_function=accelerator.save,
)
print(f"Save unwrapped model completed")

print(f"Save new tokenizer started")
if accelerator.is_main_process:
    tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")

# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
    """Uploads a directory to GCS."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    for root, _, files in os.walk(model_dir):
        for file in files:
            local_file_path = os.path.join(root, file)
            gcs_file_path = os.path.relpath(local_file_path, model_dir)
            blob = bucket.blob(os.path.join(new_model, gcs_file_path))  # Use new_model_name
            blob.upload_from_filename(local_file_path)

# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")

Dockerfile

# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04

# Install necessary system packages
RUN apt-get update && \
    apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements.txt into the container
COPY requirements.txt .

# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy your finetune script into the container
COPY finetuning.py .

# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
 
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]

requirements.txt

accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4

Nous allons maintenant créer une image de conteneur pour l'ajuster et la transférer vers Artifact Registry.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest

12. Présentation d'Airflow 2, y compris de ce qu'est un DAG

Airflow est une plate-forme d'orchestration des workflows et des pipelines de données. Il utilise des DAG (graphes orientés acycliques) pour définir ces workflows en code Python, en représentant visuellement les tâches et leurs dépendances.

Airflow, avec ses DAG statiques et ses définitions basées sur Python, est particulièrement adapté à la planification et à la gestion des workflows prédéfinis. Son architecture comprend une interface utilisateur conviviale pour surveiller et gérer ces workflows.

Essentiellement, Airflow vous permet de définir, de planifier et de surveiller vos pipelines de données à l'aide de Python, ce qui en fait un outil flexible et puissant pour l'orchestration de workflows.

13. Présentation de notre DAG

ec49964ad7d61491.png

DAG signifie "graphe orienté acyclique". Dans Airflow, un DAG représente l'ensemble du workflow ou du pipeline. Il définit les tâches, leurs dépendances et l'ordre d'exécution.

Les unités de workflow du DAG sont exécutées à partir d'un pod sur le cluster GKE, lancé à partir de la configuration Airflow.

Récapitulatif :

Airflow : Téléchargement de données : ce script automatise le processus d'obtention d'un ensemble de données de critiques de films à partir de Kaggle et de son stockage dans votre bucket GCS, ce qui le rend facilement disponible pour un traitement ou une analyse ultérieurs dans votre environnement cloud.

Airflow : préparation des données : le code récupère l'ensemble de données brut sur les critiques de films, supprime les colonnes de données superflues qui ne sont pas nécessaires pour notre cas d'utilisation et supprime les ensembles de données contenant des valeurs manquantes. Ensuite, il structure l'ensemble de données dans un format de questions/réponses adapté au machine learning, puis le stocke à nouveau dans GCS pour une utilisation ultérieure.

Airflow : affinage du modèle : ce code effectue l'affinage d'un grand modèle de langage (LLM) à l'aide d'une technique appelée LoRA (Low-Rank Adaptation), puis enregistre le modèle mis à jour. Il commence par charger un LLM pré-entraîné et un ensemble de données à partir de Google Cloud Storage. Il applique ensuite LoRA pour affiner efficacement le modèle sur cet ensemble de données. Enfin, il enregistre le modèle affiné dans Google Cloud Storage pour pouvoir l'utiliser ultérieurement dans des applications telles que la génération de texte ou la réponse aux questions.

Airflow : diffusion du modèle : diffusion du modèle affiné sur GKE avec vllm pour l'inférence.

Airflow : boucle de rétroaction : réentraînement du modèle toutes les xx fois (par heure, par jour ou par semaine).

Ce diagramme explique le fonctionnement d'Airflow 2 lorsqu'il est exécuté sur GKE.

8691f41166209a5d.png

14. Affiner un modèle ou utiliser le RAG

Cet atelier de programmation affine un LLM plutôt que d'utiliser la génération augmentée de récupération (RAG).

Comparons ces deux approches:

Ajustement : crée un modèle spécialisé. L'ajustement adapte le LLM à une tâche ou un ensemble de données spécifiques, ce qui lui permet de fonctionner indépendamment sans s'appuyer sur des sources de données externes.

Simplifie l'inférence: cela élimine le besoin d'un système et d'une base de données de récupération distincts, ce qui permet d'obtenir des réponses plus rapides et moins coûteuses, en particulier pour les cas d'utilisation fréquents.

RAG:s'appuie sur des connaissances externes: le RAG récupère les informations pertinentes à partir d'une base de connaissances pour chaque requête, ce qui garantit l'accès à des données spécifiques et à jour.

Augmente la complexité: l'implémentation de la génération augmentée de récupération dans un environnement de production tel qu'un cluster Kubernetes implique souvent plusieurs microservices pour le traitement et la récupération des données, ce qui peut augmenter la latence et les coûts de calcul.

Pourquoi le réglage fin a-t-il été choisi ?

Bien que l'approche RAG soit adaptée au petit ensemble de données utilisé dans cet atelier de programmation, nous avons opté pour un ajustement précis afin de présenter un cas d'utilisation typique d'Airflow. Ce choix nous permet de nous concentrer sur les aspects d'orchestration des workflows plutôt que sur les subtilités de la configuration d'une infrastructure et de microservices supplémentaires pour RAG.

Conclusion :

Le réglage fin et l'analyse RAG sont deux techniques utiles, chacune avec ses propres avantages et inconvénients. Le choix optimal dépend des exigences spécifiques de votre projet, telles que la taille et la complexité de vos données, vos besoins en termes de performances et les considérations liées aux coûts.

15. Tâche 1 du DAG : Créer votre première étape dans Airflow : le téléchargement de données

Pour présenter cette unité DAG, notre code Python hébergé dans une image de conteneur télécharge le dernier ensemble de données RottenTomatoes de Kaggle.

Ne copiez pas ce code dans le bucket GCS. Nous copions mlops-dag.py comme dernière étape, qui contient toutes les étapes de l'unité DAG dans un script Python.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        dataset_download

16. Tâche DAG 2 : Créez votre deuxième étape dans Airflow : préparation des données

Pour présenter cette unité de DAG, nous chargeons un fichier CSV (rotten_tomatoes_movie_reviews.csv) à partir de GCS dans un DataFrame Pandas.

Ensuite, nous limitons le nombre de lignes traitées à l'aide de DATASET_LIMIT pour les tests et l'efficacité des ressources, et nous convertissons finalement les données transformées en ensemble de données Hugging Face.

Si vous regardez attentivement, vous verrez que nous entraînons 1 000 lignes dans le modèle avec "DATASET_LIMIT": "1000", car cela prend 20 minutes sur un GPU Nvidia L4.

Ne copiez pas ce code dans le bucket GCS. Nous copions mlops-dag.py à la dernière étape, qui contient toutes les étapes dans un script Python.

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation

17. Tâche 3 du DAG : Créez votre troisième étape dans Airflow : ajustement du modèle

Pour obtenir un aperçu de cette unité DAG, nous exécutons finetune.py pour affiner le modèle Gemma avec notre nouvel ensemble de données.

Ne copiez pas ce code dans le bucket GCS. Nous copions mlops-dag.py à la dernière étape, qui contient toutes les étapes dans un script Python.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Task 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Task 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Task 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation >> fine_tuning

18. DAG Task #4 - Create your final step on Airflow: Inference / Serving the model

vLLM est une puissante bibliothèque Open Source conçue spécifiquement pour l'inférence hautes performances des LLM. Lorsqu'il est déployé sur Google Kubernetes Engine (GKE), il exploite l'évolutivité et l'efficacité de Kubernetes pour diffuser efficacement les LLM.

Résumé des étapes:

  • Importez le DAG "mlops-dag.py" dans le bucket GCS.
  • Copiez deux fichiers de configuration YAML Kubernetes dans un bucket GCS pour configurer l'inférence.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

def model_serving():
    config.load_incluster_config()
    k8s_apps_v1 = client.AppsV1Api()
    k8s_core_v1 = client.CoreV1Api()

    while True:
        try:
            k8s_apps_v1.delete_namespaced_deployment(
                    namespace="airflow",
                    name="inference-deployment",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Deployment inference-deployment deleted")
    
    with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace="airflow")
        print(f"Deployment created. Status='{resp.metadata.name}'")
    
    while True:
        try:
            k8s_core_v1.delete_namespaced_service(
                    namespace="airflow",
                    name="llm-service",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Service llm-service deleted")

    with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_core_v1.create_namespaced_service(
            body=dep, namespace="airflow")
        print(f"Service created. Status='{resp.metadata.name}'")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 4: Run GKE Deployment for model serving
        model_serving = PythonOperator(
            task_id="model_serving",
            python_callable=model_serving
        )

        dataset_download >> data_preparation >> fine_tuning >> model_serving

Importez votre script Python (fichier DAG), ainsi que les fichiers manifestes Kubernetes dans le bucket GCS DAGS.

gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}

Dans l'interface utilisateur d'Airflow, vous verrez mlops-dag.

  1. Sélectionnez "Réactiver".
  2. Sélectionnez "Déclencher le DAG" pour effectuer un cycle MLOps manuel.

d537281b92d5e8bb.png

Une fois votre DAG terminé, un résultat semblable à celui-ci s'affiche dans l'interface utilisateur d'Airflow.

3ed42abf8987384e.png

Après la dernière étape, vous pouvez récupérer le point de terminaison du modèle et envoyer une requête pour le tester.

Attendez environ cinq minutes avant d'exécuter la commande curl afin que l'inférence du modèle puisse commencer et que l'équilibreur de charge puisse attribuer une adresse IP externe.

export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')

curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128
}
EOF

Sortie :

19. Félicitations !

Vous avez créé votre premier workflow d'IA à l'aide d'un pipeline DAG avec Airflow 2 sur GKE.

N'oubliez pas de désprovisionner les ressources que vous avez déployées.

20. Effectuer cette opération en production

Cet atelier de programmation vous a fourni un aperçu fantastique de la configuration d'Airflow 2 sur GKE. Toutefois, dans la pratique, vous devrez tenir compte de certains des sujets suivants lorsque vous effectuerez cette opération en production.

Implémentez une interface Web à l'aide de Gradio ou d'un outil similaire.

Cliquez ici pour configurer la surveillance automatique des applications pour les charges de travail avec GKE ou cliquez ici pour exporter des métriques depuis Airflow.

Vous devrez peut-être utiliser des GPU plus volumineux pour affiner le modèle plus rapidement, en particulier si vous disposez d'ensembles de données plus volumineux. Toutefois, si nous voulons entraîner le modèle sur plusieurs GPU, nous devons diviser l'ensemble de données et fractionner l'entraînement. Voici une explication de la FSDP avec PyTorch (parallélisme de données entièrement segmenté, utilisant le partage de GPU pour atteindre cet objectif. Pour en savoir plus, consultez cet article de blog de Meta et ce tutoriel sur la FSDP avec Pytorch.

Google Cloud Composer est un service Airflow géré. Vous n'avez donc pas besoin de gérer Airflow lui-même. Il vous suffit de déployer votre DAG et de vous lancer.

En savoir plus

Licence

Ce document est publié sous une licence Creative Commons Attribution 2.0 Generic.