Создание рабочих процессов MLOps с помощью Airflow 2 на GKE

1. Обзор

852dc8844309ffb8.png

В этой CodeLab показано, как интегрировать методы DevOps в машинное обучение (MLOps) путем загрузки набора данных, уточнения модели и развертывания LLM в Google Kubernetes Engine (GKE) с помощью Airflow DAG с наименьшим количеством абстракции. В результате мы используем команды gcloud, а не terraform, чтобы вы могли шаг за шагом следовать лабораторной работе и легко понимать каждый процесс с точки зрения как инженера платформы, так и инженера по машинному обучению.

Это практическое руководство расскажет вам, как использовать Airflow для оптимизации рабочих процессов ИИ, обеспечивая четкую и практическую демонстрацию всего жизненного цикла MLOps путем настройки DAG.

Что вы узнаете

  • Содействуйте более тесному сотрудничеству и взаимопониманию между инженерами платформ и машинного обучения, разрушая разрозненность знаний и улучшая рабочие процессы.
  • Поймите, как развертывать, использовать и управлять Airflow 2 на GKE.
  • Настройте Airflow DAG от начала до конца
  • Создайте основу для систем машинного обучения промышленного уровня с помощью GKE
  • Инструментирование и внедрение систем машинного обучения
  • Поймите, как разработка платформ стала важнейшим компонентом поддержки MLOps.

Чего достигает эта CodeLab

  • Вы можете задавать вопросы о фильмах из LLM, который мы настроили на основе Gemma-2-9b-it и который подается в GKE с vLLM.

Целевая аудитория

  • Инженеры по машинному обучению
  • Инженеры платформ
  • Специалисты по данным
  • Инженеры данных
  • DevOps-инженеры
  • Архитектор платформ
  • Инженеры по работе с клиентами

Эта CodeLab не предназначена

  • В качестве введения в рабочие процессы GKE или AI/ML.
  • В качестве прогона всего набора функций Airflow.

2. Разработка платформ помогает инженерам/ученым по машинному обучению

16635a8284b994c.png

Проектирование платформ и MLOps — это взаимозависимые дисциплины, которые совместно создают надежную и эффективную среду для разработки и развертывания машинного обучения.

Область применения: проектирование платформ имеет более широкую сферу применения, чем MLOps, оно охватывает весь жизненный цикл разработки программного обеспечения и предоставляет для него инструменты и инфраструктуру.

MLOps устраняет разрыв между разработкой, развертыванием и логическими выводами машинного обучения.

Экспертиза: инженеры платформ обычно имеют большой опыт в инфраструктурных технологиях, таких как облачные вычисления, контейнеризация и управление данными.

Инженеры MLOps специализируются на разработке, развертывании и мониторинге моделей машинного обучения, часто обладая навыками анализа данных и разработки программного обеспечения.

Инструменты. Инженеры платформ создают инструменты для предоставления инфраструктуры, управления конфигурацией, оркестровки контейнеров и создания каркасов приложений. Инженеры MLOps используют инструменты для обучения, экспериментирования, развертывания, мониторинга и управления версиями моделей машинного обучения.

3. Настройка Google Cloud и требования

Самостоятельная настройка среды

  1. Войдите в Google Cloud Console и создайте новый проект или повторно используйте существующий. Если у вас еще нет учетной записи Gmail или Google Workspace, вам необходимо ее создать .

fbef9caa1602edd0.png

a99b7ace416376c4.png

5e3ff691252acf41.png

  • Имя проекта — это отображаемое имя для участников этого проекта. Это строка символов, не используемая API Google. Вы всегда можете обновить его.
  • Идентификатор проекта уникален для всех проектов Google Cloud и является неизменяемым (невозможно изменить после его установки). Cloud Console автоматически генерирует уникальную строку; обычно тебя не волнует, что это такое. В большинстве лабораторий кода вам потребуется указать идентификатор проекта (обычно идентифицируемый как PROJECT_ID ). Если вам не нравится сгенерированный идентификатор, вы можете создать другой случайный идентификатор. Альтернативно, вы можете попробовать свой собственный и посмотреть, доступен ли он. Его нельзя изменить после этого шага и он сохраняется на протяжении всего проекта.
  • К вашему сведению, есть третье значение — номер проекта , которое используют некоторые API. Подробнее обо всех трех этих значениях читайте в документации .
  1. Затем вам необходимо включить выставление счетов в Cloud Console, чтобы использовать облачные ресурсы/API. Прохождение этой кодовой лаборатории не будет стоить много, если вообще что-то стоить. Чтобы отключить ресурсы и избежать выставления счетов за пределами этого руководства, вы можете удалить созданные вами ресурсы или удалить проект. Новые пользователи Google Cloud имеют право на участие в программе бесплатной пробной версии стоимостью 300 долларов США .

Запустить Cloud Shell

Хотя Google Cloud можно управлять удаленно с вашего ноутбука, в этой лаборатории вы будете использовать Cloud Shell , среду командной строки, работающую в облаке.

Активировать Cloud Shell

  1. В Cloud Console нажмите «Активировать Cloud Shell». 853e55310c205094.png .

3c1dabeca90e44e5.png

Если вы запускаете Cloud Shell впервые, вы увидите промежуточный экран с описанием того, что это такое. Если вам был представлен промежуточный экран, нажмите «Продолжить» .

9c92662c6a846a5c.png

Подготовка и подключение к Cloud Shell займет всего несколько минут.

9f0e51b578fecce5.png

Эта виртуальная машина загружена всеми необходимыми инструментами разработки. Он предлагает постоянный домашний каталог объемом 5 ГБ и работает в Google Cloud, что значительно повышает производительность сети и аутентификацию. Большую часть, если не всю, работу в этой лаборатории кода можно выполнить с помощью браузера.

После подключения к Cloud Shell вы увидите, что вы прошли аутентификацию и что для проекта установлен идентификатор вашего проекта.

  1. Выполните следующую команду в Cloud Shell, чтобы подтвердить, что вы прошли аутентификацию:
gcloud auth list

Вывод команды

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. Выполните следующую команду в Cloud Shell, чтобы убедиться, что команда gcloud знает о вашем проекте:
gcloud config list project

Вывод команды

[core]
project = <PROJECT_ID>

Если это не так, вы можете установить это с помощью этой команды:

gcloud config set project <PROJECT_ID>

Вывод команды

Updated property [core/project].

4. Шаг 1. Зарегистрируйтесь и выполните аутентификацию на Kaggle.

Чтобы начать CodeLab, вам необходимо создать учетную запись на Kaggle, которая представляет собой онлайн-платформу сообщества для специалистов по данным и энтузиастов машинного обучения, принадлежащую Google и размещающую обширное хранилище общедоступных наборов данных для различных доменов. Именно с этого сайта вы загрузите набор данных RottenTomatoes, используемый для обучения вашей модели.

  • Зарегистрируйтесь в Kaggle , вы можете использовать Google SSO для входа.
  • Принять условия использования
  • Перейдите в настройки и получите свое имя пользователя.
  • В разделе API выберите «Создать новый токен из» Kaggle, который загрузит kaggle.json.
  • Если у вас возникли проблемы, перейдите на страницу поддержки здесь.

5. Шаг 2. Зарегистрируйтесь и авторизуйтесь на HuggingFace.

HuggingFace — это центральное место, где каждый может приобщиться к технологиям машинного обучения. На нем размещено 900 тысяч моделей, 200 тысяч наборов данных и 300 тысяч демонстрационных приложений (Spaces), все с открытым исходным кодом и общедоступны.

  • Зарегистрируйтесь в HuggingFace — создайте учетную запись с именем пользователя, вы не сможете использовать Google SSO.
  • Подтвердите свой адрес электронной почты
  • Зайдите сюда и примите лицензию на модель Gemma-2-9b-it.
  • Создайте токен HuggingFace здесь
  • Запишите учетные данные токена, они потребуются вам позже.

6. Шаг 3. Создайте необходимые ресурсы инфраструктуры Google Cloud.

Вы настроите реестр GKE, GCE, Artifact и примените роли IAM, используя федерацию удостоверений рабочей нагрузки .

Ваш рабочий процесс ИИ использует два пула узлов: один для обучения, а другой для вывода. В пуле обучающих узлов используется виртуальная машина GCE g2-standard-8, оснащенная одним графическим процессором Nvidia L4 Tensor Core. В пуле узлов вывода используется виртуальная машина g2-standard-24, оснащенная двумя графическими процессорами Nvidia L4 Tensor Core. Указывая регион, выберите тот, в котором поддерживается нужный графический процессор ( Ссылка ).

В Cloud Shell выполните следующие команды:

# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")

SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1

# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto

# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount

# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com

# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
  --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-12 \
  --num-nodes=1

# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
  --accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-24 \
  --num-nodes=1

# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a

# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}

Создайте свои YAML-манифесты

mkdir manifests
cd manifests

mlops-sa.yaml

apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
  name: airflow-mlops-sa
  namespace: airflow
  labels:
    tier: airflow

pv-dags.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 5Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_DAGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pv-logs.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-logs
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 100Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_LOGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pvc-dags.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: airflow-dags
  storageClassName: standard

пвк-logs.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  volumeName: airflow-logs
  storageClassName: standard

пространство имен.yaml

kind: Namespace
apiVersion: v1
metadata:
  name: airflow
  labels:
    name: airflow

sa-role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-deployment-role
rules:
- apiGroups: ["apps"] 
  resources: ["deployments"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]

sa-rolebinding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-deployment-rolebinding
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: airflow-worker
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-deployment-role
  apiGroup: rbac.authorization.k8s.io

вывод.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-deployment
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gemma-server
  template:
    metadata:
      labels:
        app: gemma-server
        ai.gke.io/model: gemma-2-9b-it
        ai.gke.io/inference-server: vllm
      annotations:
        gke-gcsfuse/volumes: "true"
    spec:
      serviceAccountName: airflow-mlops-sa
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: "NoSchedule"
      - key: "on-demand"
        value: "true"
        operator: "Equal"
        effect: "NoSchedule"
      containers:
      - name: inference-server
        image: vllm/vllm-openai:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: "2"
          limits:
            nvidia.com/gpu: "2"
        command: ["/bin/sh", "-c"]
        args:
        - |
          python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
        volumeMounts:
        - mountPath: /dev/shm
          name: dshm
        - name: gcs-fuse-csi-ephemeral
          mountPath: /modeldata
          readOnly: true
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      - name: gcs-fuse-csi-ephemeral
        csi:
          driver: gcsfuse.csi.storage.gke.io
          volumeAttributes:
            bucketName: BUCKET_DATA_NAME
            mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
            fileCacheCapacity: "20Gi"
            fileCacheForRangeRead: "true"
            metadataStatCacheCapacity: "-1"
            metadataTypeCacheCapacity: "-1"
            metadataCacheTTLSeconds: "-1"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4

вывод-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: llm-service
  namespace: airflow
spec:
  selector:
    app: gemma-server
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

Создайте 3 сегмента Google Cloud Storage (GCS).

gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}

# Create the namespace in GKE
kubectl apply -f namespace.yaml

# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml

Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation

gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"

7. Шаг №4 — Установите Airflow на GKE через схему управления.

Теперь мы развертываем Airflow 2 с помощью Helm. Apache Airflow — это платформа управления рабочими процессами с открытым исходным кодом для конвейеров обработки данных. Позже мы рассмотрим набор функций Airflow 2.

values.yaml для диаграммы управления воздушным потоком

config:
  webserver:
    expose_config: true
webserver:
  service:
    type: LoadBalancer
  podAnnotations:
    gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "30"
logs:
  persistence:
    enabled: true
    existingClaim: "airflow-logs"
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
scheduler:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
triggerer:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
workers:
  podAnnotations:
    gke-gcsfuse/volumes: "true"

Развертывание Airflow 2

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml

8. Шаг №5 — Инициализация воздушного потока с помощью соединений и переменных

После развертывания Airflow 2 мы можем начать его настройку. Мы определяем некоторые переменные, которые считываются нашими скриптами Python.

  1. Получите доступ к пользовательскому интерфейсу Airflow через порт 8080 с помощью браузера.

Получить внешний IP

kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'

Откройте веб-браузер и перейдите по адресу http:// <EXTERNAL-IP> :8080. Логин admin / admin

  1. Создайте соединение GCP по умолчанию в пользовательском интерфейсе Airflow, поэтому перейдите в «Администратор» → «Соединения» → + «Добавить новую запись».
  • Идентификатор подключения: google_cloud_default
  • Тип подключения: Google Cloud

Нажмите «Сохранить».

  1. Создайте необходимые переменные, поэтому перейдите в Администратор → Переменные → + Добавить новую запись.
  • Ключ: BUCKET_DATA_NAME – Значение: копия из echo $BUCKET_DATA_NAME.
  • Ключ: GCP_PROJECT_ID — Значение: копия из echo $DEVSHELL_PROJECT_ID.
  • Ключ: HF_TOKEN — Значение: вставьте свой токен HF.
  • Ключ: KAGGLE_USERNAME — Значение: вставьте свое имя пользователя Kaggle.
  • Ключ: KAGGLE_KEY — Значение: скопируйте это из kaggle.json.

Нажмите «Сохранить» после каждой пары «ключ-значение».

Ваш пользовательский интерфейс должен выглядеть так:

771121470131b5ec.png

9. Контейнер кода приложения №1 — загрузка данных

В этом скрипте Python мы выполняем аутентификацию с помощью Kaggle, чтобы загрузить набор данных в нашу корзину GCS.

Сам сценарий является контейнерным, поскольку он становится модулем DAG №1, и мы ожидаем, что набор данных будет часто обновляться, поэтому мы хотим автоматизировать это.

Создайте каталог и скопируйте сюда наши скрипты.

cd .. ; mkdir 1-dataset-download
cd 1-dataset-download

набор данных-download.py

import os
import kagglehub
from google.cloud import storage

KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")

print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"

upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)

Докерфайл

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]

требования.txt

google-cloud-storage==2.19.0
kagglehub==0.3.4

Теперь мы создаем образ контейнера для загрузки набора данных и помещаем его в реестр артефактов.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest

10. Контейнер кода приложения №2 — подготовка данных

На этапе подготовки данных мы достигаем следующих результатов:

  1. Укажите, какую часть набора данных мы хотим использовать для точной настройки нашей базовой модели.
  2. Загружает набор данных, т. е. считывает файл CSV в фрейм данных Pandas, который представляет собой двумерную структуру данных для строк и столбцов.
  3. Преобразование/предварительная обработка данных. Определите, какие части набора данных не имеют значения, указав, что мы хотим сохранить, что по сути означает удаление остального.
  4. Применяет функцию transform к каждой строке DataFrame.
  5. Сохраните подготовленные данные обратно в корзину GCS.

Создайте каталог и скопируйте сюда наши скрипты.

cd .. ; mkdir 2-data-preparation
cd 2-data-preparation

данные-preparation.py

import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset

# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100"))  # Process a limited number of rows, used 100 during testing phase but can be increased

DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"

# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")

def transform(data):
    """
    Transforms a row of the DataFrame into the desired format for fine-tuning.

    Args:
      data: A pandas Series representing a row of the DataFrame.

    Returns:
      A dictionary containing the formatted text.
    """ 
    question = f"Review analysis for movie '{data['id']}'"
    context = data['reviewText']
    answer = data['scoreSentiment']
    template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
    return {'text': template.format(question=question, context=context, answer=answer)}

try:
    df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
    print(f"Dataset loaded successfully.")

    # Drop rows with NaN values in relevant columns
    df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])

    # Apply transformation to the DataFrame
    transformed_data = df.apply(transform, axis=1).tolist()

    # Convert transformed data to a DataFrame and then to a Hugging Face Dataset
    transformed_df = pd.DataFrame(transformed_data)
    dataset = Dataset.from_pandas(transformed_df)

    # Save the prepared dataset to JSON lines format
    with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
        for item in dataset:
            f.write(json.dumps(item) + "\n")

    print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
    
except Exception as e:
    print(f"Error during data loading or preprocessing: {e}")
    import traceback
    print(traceback.format_exc())

Докерфайл

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]

требования.txt

datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3

# Now we create a container images for data-preparation and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest

11. Контейнер кода приложения №3 — точная настройка

Здесь мы используем Gemma-2-9b-it в качестве базовой модели, а затем настраиваем ее с помощью нашего нового набора данных.

Это последовательность шагов, которые происходят на этапе точной настройки.

1. Настройка. Импортируйте библиотеки, определите параметры (для модели, данных и обучения) и загрузите набор данных из Google Cloud Storage.

2. Загрузить модель. Загрузите предварительно обученную языковую модель с квантованием для повышения эффективности и соответствующий токенизатор.

3. Настройте LoRA. Настройте адаптацию низкого ранга (LoRA) для эффективной настройки модели путем добавления небольших обучаемых матриц.

4. Обучение. Определите параметры обучения и используйте SFTTrainer для точной настройки модели на загруженном наборе данных с использованием типа квантования FP16 .

5. Сохранить и загрузить. Сохраните настроенную модель и токенизатор локально, а затем загрузите их в нашу корзину GCS.

Затем мы создаем образ контейнера с помощью Cloud Build и сохраняем его в реестре артефактов.

Создайте каталог и скопируйте сюда наши скрипты.

cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning

Finetuning.py

import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage

# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")

# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
    "json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)

################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))

################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50

################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False

# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
    attn_implementation="eager",
    pretrained_model_name_or_path=model_name,
    torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")

# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
    lora_alpha=lora_alpha,
    lora_dropout=lora_dropout,
    r=lora_r,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Set training parameters
training_arguments = SFTConfig(
        bf16=bf16,
        dataset_kwargs={
            "add_special_tokens": False,  
            "append_concat_token": False, 
        },
        dataset_text_field="text",
        disable_tqdm=True,
        fp16=fp16,
        gradient_accumulation_steps=gradient_accumulation_steps,
        gradient_checkpointing=gradient_checkpointing,
        gradient_checkpointing_kwargs={"use_reentrant": False},
        group_by_length=group_by_length,
        log_on_each_node=False,
        logging_steps=logging_steps,
        learning_rate=learning_rate,
        lr_scheduler_type=lr_scheduler_type,
        max_grad_norm=max_grad_norm,
        max_seq_length=max_seq_length,
        max_steps=max_steps,
        num_train_epochs=num_train_epochs,
        optim=optim,
        output_dir=save_model_path,
        packing=packing,
        per_device_train_batch_size=per_device_train_batch_size,
        save_strategy=save_strategy,
        save_steps=save_steps,
        save_total_limit=save_total_limit,
        warmup_ratio=warmup_ratio,
        weight_decay=weight_decay,
    )

print(f"Configuring fine tuning completed")

# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    peft_config=peft_config,
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    tokenizer=tokenizer,
    args=training_arguments,
    packing=packing,
)

print(f"Creating trainer completed")

# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")

# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
    low_cpu_mem_usage=True,
    pretrained_model_name_or_path=model_name,
    return_dict=True,
    torch_dtype=torch.float16,
)

model = PeftModel.from_pretrained(
    model=base_model,
    model_id=new_model,
)
model = model.merge_and_unload()

print(f"Merging the new model with base model completed")

accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")

print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
    is_main_process=accelerator.is_main_process,
    save_directory=save_model_path,
    save_function=accelerator.save,
)
print(f"Save unwrapped model completed")

print(f"Save new tokenizer started")
if accelerator.is_main_process:
    tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")

# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
    """Uploads a directory to GCS."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    for root, _, files in os.walk(model_dir):
        for file in files:
            local_file_path = os.path.join(root, file)
            gcs_file_path = os.path.relpath(local_file_path, model_dir)
            blob = bucket.blob(os.path.join(new_model, gcs_file_path))  # Use new_model_name
            blob.upload_from_filename(local_file_path)

# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")

Докерфайл

# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04

# Install necessary system packages
RUN apt-get update && \
    apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements.txt into the container
COPY requirements.txt .

# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy your finetune script into the container
COPY finetuning.py .

# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
 
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]

требования.txt

accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4

Теперь мы создаем образы контейнеров для тонкой настройки и помещаем их в реестр артефактов.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest

12. Обзор Airflow 2, включая что такое DAG

Airflow — это платформа для организации рабочих процессов и конвейеров данных. Он использует DAG (направленные ациклические графики) для определения этих рабочих процессов в коде Python, визуально представляя задачи и их зависимости.

Airflow со своими статическими группами DAG и определениями на основе Python хорошо подходит для планирования и управления предопределенными рабочими процессами. Его архитектура включает удобный пользовательский интерфейс для мониторинга и управления этими рабочими процессами.

По сути, Airflow позволяет вам определять, планировать и контролировать конвейеры данных с помощью Python, что делает его гибким и мощным инструментом для оркестрации рабочих процессов.

13. Обзор нашей группы обеспечения доступности баз данных

ec49964ad7d61491.png

DAG означает «Направленный ациклический граф», в Airflow DAG сам по себе представляет весь рабочий процесс или конвейер. Он определяет задачи, их зависимости и порядок выполнения.

Единицы рабочего процесса в группе обеспечения доступности баз данных выполняются из модуля в кластере GKE, инициированного из конфигурации Airflow.

Краткое содержание:

Airflow: загрузка данных . Этот скрипт автоматизирует процесс получения набора данных обзора фильмов от Kaggle и сохранения его в корзине GCS, что делает его доступным для дальнейшей обработки или анализа в вашей облачной среде.

Воздушный поток: подготовка данных . Код берет необработанный набор данных обзора фильмов, удаляет посторонние столбцы данных, не необходимые для нашего варианта использования, и удаляет наборы данных с пропущенными значениями. Затем он структурирует набор данных в формате вопросов и ответов, подходящем для машинного обучения, и сохраняет его обратно в GCS для дальнейшего использования.

Airflow: точная настройка модели . Этот код выполняет точную настройку большой языковой модели (LLM) с использованием метода LoRA (адаптация низкого ранга), а затем сохраняет обновленную модель. Все начинается с загрузки предварительно обученного LLM и набора данных из Google Cloud Storage. Затем он применяет LoRA для эффективной точной настройки модели в этом наборе данных. Наконец, он сохраняет точно настроенную модель обратно в облачное хранилище Google для последующего использования в таких приложениях, как генерация текста или ответы на вопросы.

Воздушный поток: обслуживание модели — предоставление точно настроенной модели в GKE с помощью vllm для вывода.

Воздушный поток: цикл обратной связи — переобучение модели каждые xx раз (ежечасно, ежедневно, еженедельно).

Эта диаграмма объясняет, как работает Airflow 2 при запуске на GKE.

8691f41166209a5d.png

14. Точная настройка модели вместо использования RAG

Эта CodeLab настраивает LLM, а не использует поисковую дополненную генерацию (RAG).

Давайте сравним эти два подхода:

Точная настройка: создание специализированной модели. Точная настройка адаптирует LLM к конкретной задаче или набору данных, позволяя ему работать независимо, не полагаясь на внешние источники данных.

Упрощает вывод: это устраняет необходимость в отдельной системе поиска и базе данных, что приводит к более быстрому и дешевому реагированию, особенно для частых случаев использования.

RAG: полагается на внешние знания: RAG извлекает соответствующую информацию из базы знаний для каждого запроса, обеспечивая доступ к актуальным и конкретным данным.

Увеличение сложности: реализация RAG в производственной среде, такой как кластер Kubernetes, часто включает в себя несколько микросервисов для обработки и извлечения данных, что потенциально увеличивает задержку и вычислительные затраты.

Почему была выбрана тонкая настройка:

Хотя RAG подходит для небольшого набора данных, используемого в этой CodeLab, мы выбрали тонкую настройку, чтобы продемонстрировать типичный вариант использования Airflow. Такой выбор позволяет нам сосредоточиться на аспектах оркестрации рабочих процессов, а не углубляться в нюансы настройки дополнительной инфраструктуры и микросервисов для RAG.

Заключение:

И точная настройка, и RAG — ценные методы, имеющие свои сильные и слабые стороны. Оптимальный выбор зависит от конкретных требований вашего проекта, таких как размер и сложность ваших данных, требования к производительности и соображения стоимости.

15. Задача DAG № 1. Сделайте свой первый шаг в Airflow: загрузка данных.

В качестве обзора этого модуля DAG наш код Python, размещенный в образе контейнера, загружает последний набор данных RottenTomatoes из Kaggle.

Не копируйте этот код в корзину GCS. На последнем шаге мы копируем файл mlops-dag.py , который содержит все шаги модуля DAG в одном скрипте Python.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        dataset_download

16. Задача DAG № 2. Создайте второй этап Airflow: подготовка данных.

В качестве обзора этого модуля DAG мы загружаем файл CSV (rotten_tomatoes_movie_reviews.csv) из GCS в DataFrame Pandas.

Затем мы ограничиваем количество обрабатываемых строк с помощью DATASET_LIMIT для тестирования и повышения эффективности использования ресурсов и, наконец, преобразуем преобразованные данные в набор данных Hugging Face.

Если вы посмотрите внимательно, вы увидите, что мы обучаем 1000 строк в модели с «DATASET_LIMIT»: «1000», потому что на графическом процессоре Nvidia L4 это занимает 20 минут.

Не копируйте этот код в корзину GCS. На последнем шаге мы копируем mlops-dag.py , который содержит все шаги одного скрипта Python.

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation

17. Задача DAG № 3. Создайте третий этап Airflow: точная настройка модели.

В качестве обзора этого модуля DAG мы запускаем Finetune.py, чтобы уточнить модель Gemma с помощью нашего нового набора данных.

Не копируйте этот код в корзину GCS. На последнем шаге мы копируем mlops-dag.py , который содержит все шаги одного скрипта Python.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Task 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Task 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Task 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation >> fine_tuning

18. Задача DAG № 4. Сделайте последний шаг в Airflow: вывод/обслуживание модели.

vLLM — это мощная библиотека с открытым исходным кодом, специально разработанная для высокопроизводительного вывода LLM. При развертывании на Google Kubernetes Engine (GKE) он использует масштабируемость и эффективность Kubernetes для эффективного обслуживания LLM.

Краткое описание шагов:

  • Загрузите группу обеспечения доступности баз данных «mlops-dag.py» в корзину GCS.
  • Скопируйте два файла конфигурации Kubernetes YAML для настройки вывода в корзину GCS.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

def model_serving():
    config.load_incluster_config()
    k8s_apps_v1 = client.AppsV1Api()
    k8s_core_v1 = client.CoreV1Api()

    while True:
        try:
            k8s_apps_v1.delete_namespaced_deployment(
                    namespace="airflow",
                    name="inference-deployment",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Deployment inference-deployment deleted")
    
    with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace="airflow")
        print(f"Deployment created. Status='{resp.metadata.name}'")
    
    while True:
        try:
            k8s_core_v1.delete_namespaced_service(
                    namespace="airflow",
                    name="llm-service",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Service llm-service deleted")

    with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_core_v1.create_namespaced_service(
            body=dep, namespace="airflow")
        print(f"Service created. Status='{resp.metadata.name}'")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 4: Run GKE Deployment for model serving
        model_serving = PythonOperator(
            task_id="model_serving",
            python_callable=model_serving
        )

        dataset_download >> data_preparation >> fine_tuning >> model_serving

Загрузите свой скрипт Python (файл DAG), а также манифесты Kubernetes в корзину DAGS GCS.

gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}

В пользовательском интерфейсе Airflow вы увидите mlops-dag.

  1. Выберите снять паузу.
  2. Выберите Trigger DAG, чтобы выполнить цикл MLOps вручную.

d537281b92d5e8bb.png

После завершения работы вашей группы обеспечения доступности баз данных вы увидите такой вывод в пользовательском интерфейсе Airflow.

3ed42abf8987384e.png

После последнего шага вы можете получить конечную точку модели и отправить запрос на тестирование модели.

Подождите примерно 5 минут, прежде чем вводить команду Curl, чтобы можно было начать вывод модели и балансировщик нагрузки мог назначить внешний IP-адрес.

export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')

curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128
}
EOF

Выход:

19. Поздравляем!

Вы создали свой первый рабочий процесс ИИ, используя конвейер DAG с Airflow 2 на GKE.

Не забудьте отменить подготовку развернутых ресурсов.

20. Делаем это в производстве

Несмотря на то, что CodeLab предоставил вам фантастическую информацию о том, как настроить Airflow 2 на GKE, в реальном мире вам захочется рассмотреть некоторые из следующих тем, делая это в рабочей среде.

Реализуйте веб-интерфейс с помощью Gradio или аналогичного инструмента.

Либо настройте автоматический мониторинг приложений для рабочих нагрузок с помощью GKE здесь , либо экспортируйте метрики из Airflow здесь .

Для более быстрой точной настройки модели вам могут потребоваться более крупные графические процессоры, особенно если у вас большие наборы данных. Однако, если мы хотим обучить модель на нескольких графических процессорах, нам придется разделить набор данных и сегментировать обучение. Вот объяснение FSDP с PyTorch (полное параллельное разделение данных с использованием совместного использования графического процессора для достижения этой цели. Дополнительную информацию можно найти здесь, в сообщении в блоге Meta , а также в этом руководстве по FSDP с использованием Pytorch .

Google Cloud Composer — это управляемая служба Airflow, поэтому вам не нужно поддерживать сам Airflow, просто разверните DAG и все готово.

Узнать больше

Лицензия

Эта работа распространяется под лицензией Creative Commons Attribution 2.0 Generic License.