Tworzenie przepływów pracy MLOps za pomocą Airflow 2 w GKE

1. Omówienie

852dc8844309ffb8.png

Ten moduł CodeLab pokazuje, jak zintegrować praktyki DevOps z uczenia maszynowego (MLOps) przez pobranie zbioru danych, dopracowanie modelu i wdrażanie LLM w Google Kubernetes Engine (GKE) za pomocą DAG Airflow z najmniejszą ilością abstrakcji. Z tego powodu używamy poleceń gcloud, a nie terraform, aby umożliwić Ci krok po kroku wykonywanie ćwiczeń i łatwe zrozumienie każdego procesu z perspektywy zarówno inżyniera platformy, jak i inżyniera systemów uczących się.

W tym praktycznym przewodniku dowiesz się, jak korzystać z Airflow, aby usprawnić przepływy pracy związane z AI. Otrzymasz też jasne i praktyczne omówienie całego cyklu MLOps z konfiguracją DAG.

Czego się nauczysz

  • Wzmacnianie współpracy i wzajemnego zrozumienia między inżynierami platform i inżynierami systemów uczących się poprzez rozbicie silosów wiedzy i ulepszanie przepływów pracy
  • Dowiedz się, jak wdrażać i używać Airflow 2 w GKE oraz zarządzać tymi usługami
  • Konfigurowanie DAG Airflow od początku do końca
  • Tworzenie podstawy systemów uczenia maszynowego na potrzeby produkcji za pomocą GKE
  • Instrumentowanie i wdrażanie systemów uczących się
  • Dowiedz się, jak zespół Platform Engineering stał się kluczowym elementem wsparcia dla MLOps

Osiągnięcia w ramach tego ćwiczenia z programowania

  • Możesz zadawać pytania na temat filmów modelowi LLM, który został dostrojony na podstawie modelu Gemma-2-9b-it i obsługiwany w GCE z modelem vLLM.

Odbiorcy docelowi

  • Inżynierowie systemów uczących się
  • Inżynierowie platformy
  • badacze danych,
  • Inżynierowie danych
  • Inżynierowie DevOps
  • Platform Architect
  • Inżynierowie ds. obsługi klienta

To ćwiczenie z programowania nie jest przeznaczone

  • jako wprowadzenie do GKE lub przepływów pracy AI/ML.
  • jako przegląd całego zestawu funkcji Airflow.

2. Platform Engineering ułatwia pracę inżynierom i naukom zajmującym się systemami uczącymi się

16635a8284b994c.png

Platform Engineering i MLOps to powiązane ze sobą dyscypliny, które współpracują ze sobą, aby tworzyć niezawodne i wydajne środowisko do tworzenia i wdrażania systemów uczących się.

Zakres: inżynieria platform ma szerszy zakres niż MLOps, obejmując cały cykl życia tworzenia oprogramowania i zapewniając narzędzia oraz infrastrukturę do jego realizacji.

MLOps wypełnia lukę między tworzeniem, wdrażaniem i wykorzystywaniem modeli uczenia maszynowego.

Kompetencje: inżynierowie platformy mają zwykle duże doświadczenie w zakresie technologii infrastrukturalnych, takich jak przetwarzanie w chmurze, konteneryzacja i zarządzanie danymi.

Inżynierowie MLOps specjalizują się w rozwijaniu, wdrażaniu i monitorowaniu modeli ML. Często mają oni umiejętności z zakresu analizy danych i inżynierii oprogramowania.

Narzędzia: inżynierowie platform opracowują narzędzia do obsługi infrastruktury, zarządzania konfiguracją oraz tworzenia i aranżacji aplikacji. Inżynierowie MLOps korzystają z narzędzi do trenowania modeli ML, eksperymentowania, wdrażania, monitorowania i tworzenia wersji.

3. Konfiguracja i wymagania Google Cloud

Konfiguracja środowiska w samodzielnym tempie

  1. Zaloguj się w konsoli Google Cloud i utwórz nowy projekt lub użyj istniejącego. Jeśli nie masz jeszcze konta Gmail ani Google Workspace, musisz je utworzyć.

fbef9caa1602edd0.png

a99b7ace416376c4.png

5e3ff691252acf41.png

  • Nazwa projektu to wyświetlana nazwa uczestników tego projektu. Jest to ciąg znaków, którego nie używają interfejsy API Google. Zawsze możesz ją zaktualizować.
  • Identyfikator projektu jest niepowtarzalny w ramach wszystkich projektów Google Cloud i nie można go zmienić (po ustawieniu). Konsola Cloud automatycznie generuje unikalny ciąg znaków. Zwykle nie ma znaczenia, jaki to ciąg. W większości laboratoriów z kodem musisz podać identyfikator projektu (zwykle oznaczony jako PROJECT_ID). Jeśli nie podoba Ci się wygenerowany identyfikator, możesz wygenerować inny losowy. Możesz też spróbować użyć własnego adresu e-mail, aby sprawdzić, czy jest on dostępny. Nie można go zmienić po wykonaniu tego kroku. Pozostanie on do końca projektu.
  • Informacyjnie: istnieje jeszcze 3 wartość, numer projektu, której używają niektóre interfejsy API. Więcej informacji o wszystkich 3 wartościach znajdziesz w dokumentacji.
  1. Następnie musisz włączyć rozliczenia w konsoli Cloud, aby korzystać z zasobów i interfejsów API Cloud. Przejście przez ten samouczek nie będzie kosztowne, a być może nawet bezpłatne. Aby wyłączyć zasoby i uniknąć obciążenia opłatami po zakończeniu samouczka, możesz usunąć utworzone zasoby lub usunąć projekt. Nowi użytkownicy Google Cloud mogą skorzystać z bezpłatnego okresu próbnego, w którym mają do dyspozycji środki w wysokości 300 USD.

Uruchom Cloud Shell

Google Cloud można obsługiwać zdalnie z laptopa, ale w tym samouczku będziesz używać Cloud Shell, czyli środowiska wiersza poleceń działającego w chmurze.

Aktywowanie Cloud Shell

  1. W konsoli Google Cloud kliknij Aktywuj Cloud Shell 853e55310c205094.png.

3c1dabeca90e44e5.png

Jeśli uruchamiasz Cloud Shell po raz pierwszy, zobaczysz ekran pośredni, na którym opisano, czym jest to środowisko. Jeśli taki ekran się wyświetlił, kliknij Dalej.

9c92662c6a846a5c.png

Uproszczenie i połączenie z Cloud Shell powinno zająć tylko kilka chwil.

9f0e51b578fecce5.png

Ta maszyna wirtualna zawiera wszystkie niezbędne narzędzia programistyczne. Zawiera stały katalog domowy o pojemności 5 GB i działa w Google Cloud, co znacznie poprawia wydajność sieci i uwierzytelnianie. Większość, jeśli nie wszystkie, zadań w tym ćwiczeniu można wykonać w przeglądarce.

Po połączeniu z Cloud Shell powinieneś zobaczyć, że jesteś uwierzytelniony i że projekt jest ustawiony na identyfikator Twojego projektu.

  1. Aby potwierdzić uwierzytelnianie, uruchom w Cloud Shell to polecenie:
gcloud auth list

Wynik polecenia

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. Aby sprawdzić, czy polecenie gcloud zna Twój projekt, uruchom w Cloud Shell to polecenie:
gcloud config list project

Wynik polecenia

[core]
project = <PROJECT_ID>

Jeśli nie, możesz ustawić je za pomocą tego polecenia:

gcloud config set project <PROJECT_ID>

Wynik polecenia

Updated property [core/project].

4. Krok 1. Zarejestruj się i uwierzytelnij na Kaggle

Aby rozpocząć korzystanie z CodeLab, musisz utworzyć konto w Kaggle, czyli należącej do Google platformie społecznościowej dla naukowców zajmujących się danymi i entuzjastów systemów uczących się. Kaggle to ogromne repozytorium publicznie dostępnych zbiorów danych z różnych dziedzin. Z tej strony pobierzesz zbiór danych RottenTomatoes, który posłuży do trenowania modelu.

  • Zarejestruj się w Kaggle. Do logowania możesz użyć logowania jednokrotnego Google.
  • Zaakceptuj warunki usługi
  • Otwórz Ustawienia i uzyskaj nazwę użytkownika nazwa_użytkownika.
  • W sekcji API wybierz „Utwórz nowy token z Kaggle”, aby pobrać plik kaggle.json.
  • Jeśli masz jakieś problemy, otwórz stronę pomocy tutaj

5. Krok 2. Zarejestruj się i uwierzytelnij na HuggingFace

HuggingFace to centralne miejsce, w którym każdy może korzystać z technologii uczenia maszynowego. Zawiera 900 tys. modeli, 200 tys. zbiorów danych i 300 tys. aplikacji demonstracyjnych (Spaces), które są dostępne publicznie i są typu open source.

  • Rejestracja w HuggingFace – utwórz konto z nazwą użytkownika. Nie możesz używać logowania jednokrotnego Google.
  • Potwierdzanie adresu e-mail
  • Kliknij tutaj i zaakceptuj licencję na model Gemma-2-9b-it.
  • Utwórz token Hugging Face tutaj
  • Zapisz dane uwierzytelniające tokena, ponieważ będą Ci potrzebne później

6. Krok 3. Utwórz wymagane zasoby infrastruktury Google Cloud

Skonfigurujesz GKE, GCE, rejestr Artifact i zastosujesz role IAM za pomocą federacji tożsamości zadań.

Twój przepływ pracy AI korzysta z 2 poolów węzłów: jeden do trenowania, a drugi do wnioskowania. Węzły w puli do treningu używają maszyny wirtualnej GCE g2-standard-8 z 1 procesorem graficznym Nvidia L4 Tensor Core. Pula węzłów do wnioskowania używa maszyny wirtualnej g2-standard-24 z 2 procesorami graficznymi Nvidia L4 Tensor Core. Podczas określania regionu wybierz ten, w którym wymagany procesor graficzny jest obsługiwany ( link).

W Cloud Shell uruchom te polecenia:

# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")

SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1

# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto

# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount

# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com

# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
  --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-12 \
  --num-nodes=1

# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
  --accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-24 \
  --num-nodes=1

# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a

# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}

Tworzenie plików manifestu w formacie YAML

mkdir manifests
cd manifests

mlops-sa.yaml

apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
  name: airflow-mlops-sa
  namespace: airflow
  labels:
    tier: airflow

pv-dags.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 5Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_DAGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pv-logs.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-logs
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 100Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_LOGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pvc-dags.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: airflow-dags
  storageClassName: standard

pvc-logs.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  volumeName: airflow-logs
  storageClassName: standard

namespace.yaml

kind: Namespace
apiVersion: v1
metadata:
  name: airflow
  labels:
    name: airflow

sa-role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-deployment-role
rules:
- apiGroups: ["apps"] 
  resources: ["deployments"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]

sa-rolebinding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-deployment-rolebinding
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: airflow-worker
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-deployment-role
  apiGroup: rbac.authorization.k8s.io

inference.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-deployment
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gemma-server
  template:
    metadata:
      labels:
        app: gemma-server
        ai.gke.io/model: gemma-2-9b-it
        ai.gke.io/inference-server: vllm
      annotations:
        gke-gcsfuse/volumes: "true"
    spec:
      serviceAccountName: airflow-mlops-sa
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: "NoSchedule"
      - key: "on-demand"
        value: "true"
        operator: "Equal"
        effect: "NoSchedule"
      containers:
      - name: inference-server
        image: vllm/vllm-openai:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: "2"
          limits:
            nvidia.com/gpu: "2"
        command: ["/bin/sh", "-c"]
        args:
        - |
          python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
        volumeMounts:
        - mountPath: /dev/shm
          name: dshm
        - name: gcs-fuse-csi-ephemeral
          mountPath: /modeldata
          readOnly: true
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      - name: gcs-fuse-csi-ephemeral
        csi:
          driver: gcsfuse.csi.storage.gke.io
          volumeAttributes:
            bucketName: BUCKET_DATA_NAME
            mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
            fileCacheCapacity: "20Gi"
            fileCacheForRangeRead: "true"
            metadataStatCacheCapacity: "-1"
            metadataTypeCacheCapacity: "-1"
            metadataCacheTTLSeconds: "-1"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4

inference-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: llm-service
  namespace: airflow
spec:
  selector:
    app: gemma-server
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

Utwórz 3 zasoby Google Cloud Storage (GCS)

gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}

# Create the namespace in GKE
kubectl apply -f namespace.yaml

# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml

Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation

gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"

7. Krok 4. Zainstaluj Airflow w GKE za pomocą karty Helm

Obecnie wdrażamy Airflow 2 za pomocą Helm. Apache Airflow to platforma typu open source do zarządzania przepływem pracy w przypadku potoków inżynierii danych. Zestaw funkcji Airflow 2 omówimy później.

plik values.yaml dla wykresu Helm w Airflow

config:
  webserver:
    expose_config: true
webserver:
  service:
    type: LoadBalancer
  podAnnotations:
    gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "30"
logs:
  persistence:
    enabled: true
    existingClaim: "airflow-logs"
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
scheduler:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
triggerer:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
workers:
  podAnnotations:
    gke-gcsfuse/volumes: "true"

Wdrażanie Airflow 2

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml

8. Krok 5. Inicjuj Airflow za pomocą połączeń i zmiennych

Po wdrożeniu Airflow 2 możemy rozpocząć jego konfigurowanie. Definiujemy zmienne, które są odczytywane przez nasze skrypty Pythona.

  1. Otwieranie interfejsu Airflow na porcie 8080 w przeglądarce

Pobieranie zewnętrznego adresu IP

kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'

Otwórz przeglądarkę i wejdź na stronę http://<EXTERNAL-IP>:8080 . Login to admin / admin

  1. Aby utworzyć domyślne połączenie z GCP w interfejsie Airflow, kliknij kolejno Administracja → Połączenia → + Dodaj nowy rekord.
  • Identyfikator połączenia: google_cloud_default
  • Typ połączenia: Google Cloud

Kliknij Zapisz.

  1. Aby utworzyć potrzebne zmienne, kliknij kolejno Administracja → Zmienne → + Dodaj nowy rekord.
  • Klucz: BUCKET_DATA_NAME – wartość: skopiuj z echo $BUCKET_DATA_NAME
  • Klucz: GCP_PROJECT_ID – wartość: skopiuj z echo $DEVSHELL_PROJECT_ID
  • Klucz: HF_TOKEN – wartość: wstaw token HF
  • Klucz: KAGGLE_USERNAME – wartość: wpisz swoją nazwę użytkownika w Kaggle
  • Klucz: KAGGLE_KEY – wartość: skopiuj z pliku kaggle.json

Po każdej parze klucz-wartość kliknij Zapisz.

Interfejs użytkownika powinien wyglądać tak:

771121470131b5ec.png

9. Kontenery kodu aplikacji 1 – pobieranie danych

W tym skrypcie Pythona uwierzytelniamy się w Kaggle, aby pobrać zbiór danych do naszego zasobnika GCS.

Skrypt jest spakowany, ponieważ staje się jednostką DAG 1. Spodziewamy się, że zbiór danych będzie często aktualizowany, dlatego chcemy zautomatyzować ten proces.

Tworzenie katalogu i kopiowanie do niego naszych skryptów

cd .. ; mkdir 1-dataset-download
cd 1-dataset-download

dataset-download.py

import os
import kagglehub
from google.cloud import storage

KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")

print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"

upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]

requirements.txt

google-cloud-storage==2.19.0
kagglehub==0.3.4

Teraz tworzymy obraz kontenera dla zadania dataset-download i przesyłamy go do Artifact Registry.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest

10. Kontenery kodu aplikacji 2 – przygotowanie danych

Na etapie przygotowania danych:

  1. Określ, jak dużej części zbioru danych chcesz użyć do dostosowania modelu podstawowego
  2. Ładuje zbiór danych, czyli odczytuje plik CSV do ramki danych Pandas, która jest dwuwymiarową strukturą danych dla wierszy i kolumn.
  3. Przekształcanie danych / wstępne przetwarzanie – określ, które części zbioru danych są nieistotne, określając, co chcemy zachować, co w efekcie spowoduje usunięcie reszty.
  4. Funkcja transform jest stosowana do każdego wiersza DataFrame.
  5. Zapisz przygotowane dane z powrotem w zasobniku GCS

Tworzenie katalogu i kopiowanie do niego naszych skryptów

cd .. ; mkdir 2-data-preparation
cd 2-data-preparation

data-preparation.py

import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset

# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100"))  # Process a limited number of rows, used 100 during testing phase but can be increased

DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"

# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")

def transform(data):
    """
    Transforms a row of the DataFrame into the desired format for fine-tuning.

    Args:
      data: A pandas Series representing a row of the DataFrame.

    Returns:
      A dictionary containing the formatted text.
    """ 
    question = f"Review analysis for movie '{data['id']}'"
    context = data['reviewText']
    answer = data['scoreSentiment']
    template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
    return {'text': template.format(question=question, context=context, answer=answer)}

try:
    df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
    print(f"Dataset loaded successfully.")

    # Drop rows with NaN values in relevant columns
    df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])

    # Apply transformation to the DataFrame
    transformed_data = df.apply(transform, axis=1).tolist()

    # Convert transformed data to a DataFrame and then to a Hugging Face Dataset
    transformed_df = pd.DataFrame(transformed_data)
    dataset = Dataset.from_pandas(transformed_df)

    # Save the prepared dataset to JSON lines format
    with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
        for item in dataset:
            f.write(json.dumps(item) + "\n")

    print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
    
except Exception as e:
    print(f"Error during data loading or preprocessing: {e}")
    import traceback
    print(traceback.format_exc())

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]

requirements.txt

datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3

# Now we create a container images for data-preparation and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest

11. Kontener kodu aplikacji 3 – dostrojenie

Tutaj jako modelu podstawowego używamy modelu Gemma-2-9b-it, a potem dostosowujemy go do naszego nowego zbioru danych.

To jest sekwencja kroków, które są wykonywane podczas dopracowywania.

1. Konfiguracja: zaimportuj biblioteki, zdefiniuj parametry (dla modelu, danych i treningu) oraz wczytaj zbiór danych z Google Cloud Storage.

2. Załaduj model: załaduj wytrenowany model językowy z kwantyzacją w celu zwiększenia wydajności i załaduj odpowiedni tokenizer.

3. Konfigurowanie LoRA: skonfiguruj adaptację niskiego rzędu (LoRA), aby efektywnie dostosować model przez dodanie małych mnóżeń do trenowania.

4. Trenowanie: określ parametry trenowania i użyj opcji SFTTrainer, aby dostosować model do załadowanego zbioru danych, stosując typ kwantyzacji FP16.

5. Zapisz i prześlij: zapisz dostrojony model i tokenizer lokalnie, a potem prześlij je do naszego zasobnika GCS.

Następnie tworzymy obraz kontenera za pomocą Cloud Build i przechowujemy go w Artifact Registry.

Tworzenie katalogu i kopiowanie do niego naszych skryptów

cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning

finetuning.py

import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage

# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")

# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
    "json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)

################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))

################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50

################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False

# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
    attn_implementation="eager",
    pretrained_model_name_or_path=model_name,
    torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")

# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
    lora_alpha=lora_alpha,
    lora_dropout=lora_dropout,
    r=lora_r,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Set training parameters
training_arguments = SFTConfig(
        bf16=bf16,
        dataset_kwargs={
            "add_special_tokens": False,  
            "append_concat_token": False, 
        },
        dataset_text_field="text",
        disable_tqdm=True,
        fp16=fp16,
        gradient_accumulation_steps=gradient_accumulation_steps,
        gradient_checkpointing=gradient_checkpointing,
        gradient_checkpointing_kwargs={"use_reentrant": False},
        group_by_length=group_by_length,
        log_on_each_node=False,
        logging_steps=logging_steps,
        learning_rate=learning_rate,
        lr_scheduler_type=lr_scheduler_type,
        max_grad_norm=max_grad_norm,
        max_seq_length=max_seq_length,
        max_steps=max_steps,
        num_train_epochs=num_train_epochs,
        optim=optim,
        output_dir=save_model_path,
        packing=packing,
        per_device_train_batch_size=per_device_train_batch_size,
        save_strategy=save_strategy,
        save_steps=save_steps,
        save_total_limit=save_total_limit,
        warmup_ratio=warmup_ratio,
        weight_decay=weight_decay,
    )

print(f"Configuring fine tuning completed")

# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    peft_config=peft_config,
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    tokenizer=tokenizer,
    args=training_arguments,
    packing=packing,
)

print(f"Creating trainer completed")

# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")

# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
    low_cpu_mem_usage=True,
    pretrained_model_name_or_path=model_name,
    return_dict=True,
    torch_dtype=torch.float16,
)

model = PeftModel.from_pretrained(
    model=base_model,
    model_id=new_model,
)
model = model.merge_and_unload()

print(f"Merging the new model with base model completed")

accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")

print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
    is_main_process=accelerator.is_main_process,
    save_directory=save_model_path,
    save_function=accelerator.save,
)
print(f"Save unwrapped model completed")

print(f"Save new tokenizer started")
if accelerator.is_main_process:
    tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")

# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
    """Uploads a directory to GCS."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    for root, _, files in os.walk(model_dir):
        for file in files:
            local_file_path = os.path.join(root, file)
            gcs_file_path = os.path.relpath(local_file_path, model_dir)
            blob = bucket.blob(os.path.join(new_model, gcs_file_path))  # Use new_model_name
            blob.upload_from_filename(local_file_path)

# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")

Dockerfile

# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04

# Install necessary system packages
RUN apt-get update && \
    apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements.txt into the container
COPY requirements.txt .

# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy your finetune script into the container
COPY finetuning.py .

# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
 
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]

requirements.txt

accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4

Teraz tworzymy obrazy kontenera do dostosowania i przesyłamy je do rejestru artefaktów.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest

12. Airflow 2 Omówienie, czym jest DAG

Airflow to platforma do sterowania przepływami danych i przepływami danych. Do definiowania tych przepływów pracy w kodzie Pythona używa skierowanych grafów acyklicznych (DAG), które wizualnie przedstawiają zadania i ich zależności.

Airflow, ze swoimi statycznymi DAG-ami i definicjami opartymi na Pythonie, doskonale nadaje się do planowania i zarządzania zdefiniowanymi wstępnie przepływami pracy. Jego architektura obejmuje przyjazny dla użytkownika interfejs do monitorowania i zarządzania tymi przepływami pracy.

Airflow umożliwia definiowanie, planowanie i monitorowanie potoków danych za pomocą Pythona, co czyni go elastycznym i zaawansowanym narzędziem do zarządzania przepływem pracy.

13. Omówienie DAG

ec49964ad7d61491.png

DAG to skrót od Directed Acyclic Graph (kierowany graf acykliczny). W Airflow DAG reprezentuje cały przepływ pracy lub cały potok. Określa ona zadania, ich zależności i kolejność wykonywania.

Jednostki przepływu pracy w DAG są wykonywane z kontenera w klastrze GKE, który został zainicjowany z konfiguracji Airflow.

Podsumowanie:

Airflow: pobieranie danych – ten skrypt automatyzuje proces pobierania zbioru danych z recenzjami filmów z Kaggle i przechowywania go w pojemniku GCS, dzięki czemu dane są łatwo dostępne do dalszego przetwarzania lub analizy w środowisku chmury.

Airflow: przygotowanie danych – kod pobiera surowy zbiór danych z recenzjami filmów, usuwa zbędne kolumny danych, które nie są wymagane w naszym przypadku użycia, oraz usuwa zbiory danych z brakującymi wartościami. Następnie porządkuje zbiór danych w taki sposób, aby odpowiadał formatowi pytań i odpowiedzi odpowiednim do uczenia maszynowego, i zapisują go z powrotem w GCS na potrzeby późniejszego użycia.

Airflow: dostrajanie modelu – ten kod dostosowuje duży model językowy (LLM) za pomocą techniki o nazwie LoRA (Low-Rank Adaptation), a następnie zapisuje zaktualizowany model. Najpierw wczytuje wstępnie wytrenowane LLM i zbiór danych z Google Cloud Storage. Następnie stosuje on LoRA do efektywnego dopracowania modelu na tym zbiorze danych. Na koniec zapisuje dostosowany model z powrotem w Google Cloud Storage, aby można go było później używać w aplikacji, np. do generowania tekstu lub odpowiadania na pytania.

Airflow: udostępnianie modelu – udostępnianie w GKE docelowego modelu z vllm do wnioskowania.

Airflow: sprzęgło zwrotne – ponowne trenowanie modelu co xx razy (co godzinę, co tydzień, co miesiąc).

Ten diagram pokazuje, jak działa Airflow 2 w GKE.

8691f41166209a5d.png

14. Dostrojenie modelu a używanie RAG

W tym CodeLab dostrajamy model LLM, a nie korzystamy z generowania rozszerzonego przez wyszukiwanie w zapisanych informacjach (RAG).

Porównajmy te 2 metody:

Dostosowywanie: tworzy model wyspecjalizowany: dostosowywanie dostosowuje model LLM do określonego zadania lub zbioru danych, umożliwiając mu działanie niezależnie od zewnętrznych źródeł danych.

Upraszcza wnioskowanie: eliminuje potrzebę korzystania z osobnego systemu wyszukiwania i bazy danych, co pozwala uzyskać szybsze i tańsze odpowiedzi, zwłaszcza w przypadku częstego używania.

RAG: korzysta z zewnętrznej wiedzy: RAG pobiera odpowiednie informacje z bazy wiedzy dla każdego zapytania, zapewniając dostęp do aktualnych i szczegółowych danych.

Zwiększanie złożoności: wdrożenie RAG w środowisku produkcyjnym, takim jak klaster Kubernetes, często wymaga korzystania z wielu mikrousług do przetwarzania i pobierania danych, co może zwiększyć opóźnienia i koszty obliczeniowe.

Dlaczego wybrano dostosowanie:

Chociaż algorytm RAG nadaje się do małego zbioru danych użytego w tym samouczku, zdecydowaliśmy się na dostosowanie, aby zademonstrować typowe zastosowanie Airflow. Dzięki temu możemy skupić się na aspektach aranżowania przepływu pracy, zamiast zagłębiać się w subtelności konfigurowania dodatkowej infrastruktury i mikrousług dla RAG.

Podsumowanie:

Zarówno dostosowanie, jak i RAG to przydatne techniki, które mają swoje zalety i wady. Optymalny wybór zależy od konkretnych wymagań projektu, takich jak rozmiar i złożoność danych, wymagania dotyczące wydajności i kwestie związane z kosztami.

15. Zadanie DAG 1. Utwórz pierwszy krok w Airflow: pobieranie danych

W ramach omówienia tego węzła DAG nasz kod Pythona hostowany w obrazie kontenera pobiera najnowszy zbiór danych Rotten Tomatoes z Kaggle.

Nie kopiuj tego kodu do zasobnika GCS. Jako ostatni krok kopiujemy plik mlops-dag.py, który zawiera wszystkie kroki jednostki DAG w ramach jednego skryptu Pythona.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        dataset_download

16. Zadanie DAG 2. Utwórz drugi krok w Airflow: przygotowanie danych

W ramach omówienia tego węzła DAG wczytujemy plik CSV (rotten_tomatoes_movie_reviews.csv) z GCS do DataFrame w Pandas.

Następnie ograniczamy liczbę przetwarzanych wierszy za pomocą parametru DATASET_LIMIT na potrzeby testowania i zwiększenia efektywności wykorzystania zasobów, a na koniec konwertujemy przekształcone dane na zbiór danych Hugging Face.

Jeśli przyjrzysz się uważnie, zauważysz, że model z wartością „DATASET_LIMIT”: „1000” trenuje 1000 wierszy. Wynika to z tego, że na karcie graficznej Nvidia L4 zajmuje to 20 minut.

Nie kopiuj tego kodu do zasobnika GCS. W ostatnim kroku kopiujemy plik mlops-dag.py, który zawiera wszystkie kroki w ramach jednego skryptu Pythona.

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation

17. Zadanie DAG 3. Utwórz trzeci krok w Airflow: Dostrojenie modelu

W ramach omówienia tego węzła DAG uruchamiamy polecenie finetune.py, aby dostosować model Gemma do nowego zbioru danych.

Nie kopiuj tego kodu do zasobnika GCS. W ostatnim kroku kopiujemy plik mlops-dag.py, który zawiera wszystkie kroki w ramach jednego skryptu Pythona.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Task 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Task 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Task 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation >> fine_tuning

18. DAG Task #4 - Create your final step on Airflow: Inference / Serving the model

vLLM to zaawansowana biblioteka open source zaprojektowana specjalnie do wydajnego wnioskowania na podstawie modeli LLM. Wdrożony w Google Kubernetes Engine (GKE) wykorzystuje skalowalność i wydajność Kubernetes do skutecznego obsługiwania modeli LLM.

Podsumowanie kroków:

  • Prześlij plik DAG „mlops-dag.py” do zasobnika GCS.
  • Skopiuj do zasobnika GCS 2 pliki konfiguracji YAML Kubernetes, aby skonfigurować wnioskowanie.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

def model_serving():
    config.load_incluster_config()
    k8s_apps_v1 = client.AppsV1Api()
    k8s_core_v1 = client.CoreV1Api()

    while True:
        try:
            k8s_apps_v1.delete_namespaced_deployment(
                    namespace="airflow",
                    name="inference-deployment",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Deployment inference-deployment deleted")
    
    with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace="airflow")
        print(f"Deployment created. Status='{resp.metadata.name}'")
    
    while True:
        try:
            k8s_core_v1.delete_namespaced_service(
                    namespace="airflow",
                    name="llm-service",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Service llm-service deleted")

    with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_core_v1.create_namespaced_service(
            body=dep, namespace="airflow")
        print(f"Service created. Status='{resp.metadata.name}'")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 4: Run GKE Deployment for model serving
        model_serving = PythonOperator(
            task_id="model_serving",
            python_callable=model_serving
        )

        dataset_download >> data_preparation >> fine_tuning >> model_serving

Prześlij skrypt Pythona (plik DAG) oraz pliki manifestu Kubernetes do zasobnika GCS DAGS.

gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}

W interfejsie Airflow zobaczysz DAG mlops.

  1. Wybierz anuluj wstrzymanie.
  2. Wybierz DAG reguły, aby wykonać ręczny cykl MLOps.

d537281b92d5e8bb.png

Gdy DAG zostanie ukończony, w interfejsie Airflow zobaczysz dane wyjściowe podobne do tych.

3ed42abf8987384e.png

W ostatnim kroku możesz pobrać punkt końcowy modelu i wysłać prompt do jego przetestowania.

Zanim wydasz polecenie curl, odczekaj około 5 minut, aby umożliwić rozpoczęcie wnioskowania przez model i przypisanie zewnętrznego adresu IP przez system równoważenia obciążenia.

export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')

curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128
}
EOF

Dane wyjściowe:

19. Gratulacje!

Udało Ci się utworzyć pierwszy przepływ pracy AI za pomocą potoku DAG w Airflow 2 w GKE.

Nie zapomnij cofnąć obsługi zaimplementowanych zasobów.

20. W wersji produkcyjnej

W tym CodeLab znajdziesz świetne informacje o konfigurowaniu Airflow 2 w GKE, ale w praktyce, gdy będziesz robić to w produkcji, musisz wziąć pod uwagę kilka z tych tematów.

Wdrożyć interfejs internetowy za pomocą narzędzia Gradio lub podobnego.

Skonfiguruj automatyczne monitorowanie aplikacji w przypadku zadań z GKE tutaj lub wyeksportuj dane z Airflow tutaj.

Aby szybciej dostosować model, możesz potrzebować większych procesorów graficznych, zwłaszcza jeśli masz większe zbiory danych. Jeśli jednak chcemy trenować model na wielu procesorach GPU, musimy podzielić zbiór danych i przeprowadzić trening na częściach. Oto wyjaśnienie FSDP z PyTorch (pełna równoległość danych w ramach partycji przy użyciu udostępniania GPU). Więcej informacji znajdziesz w poście na blogu Meta oraz w tym samouczku na temat FSDP w Pytorch.

Google Cloud Composer to zarządzana usługa Airflow, dzięki której nie musisz utrzymywać samej usługi Airflow. Wystarczy, że wdrożesz DAG.

Więcej informacji

Licencja

To zadanie jest licencjonowane na podstawie ogólnej licencji Creative Commons Attribution 2.0.