GKE で Airflow 2 を使用して MLOps ワークフローを構築する

1. 概要

852dc8844309ffb8.png

この CodeLab では、抽象化を最小限に抑えた Airflow DAG を使用して、データセットのダウンロード、モデルの改良、Google Kubernetes Engine(GKE)への LLM のデプロイを行い、DevOps プラクティスを MLOps に統合する方法について説明します。そのため、Terraform ではなく gcloud コマンドを使用しています。これにより、ラボの手順に沿って、プラットフォーム エンジニアと ML エンジニアの両方の視点から各プロセスを簡単に理解できます。

このハンズオン ガイドでは、Airflow を活用して AI ワークフローを効率化する方法について説明します。DAG を構成して、MLOps ライフサイクル全体を明確かつ実践的にデモします。

学習内容

  • 知識のサイロを解消し、ワークフローを改善することで、プラットフォーム エンジニアと ML エンジニア間のコラボレーションと相互理解を促進する
  • GKE で Airflow 2 をデプロイ、使用、管理する方法について説明します。
  • Airflow DAG をエンドツーエンドで構成する
  • GKE を使用して本番環境グレードの ML システムの基盤を構築する
  • 機械学習システムを計測して運用する
  • プラットフォーム エンジニアリングが MLOps の重要なサポート基盤となった仕組みを理解する

この Codelab で達成できること

  • 映画に関する質問は、GKE で vLLM を使用して提供される Gemma-2-9b-it に基づいてファインチューニングされた LLM にすることができます。

対象者

  • ML エンジニア
  • プラットフォーム エンジニア
  • データ サイエンティスト
  • データ エンジニア
  • DevOps エンジニア
  • プラットフォーム アーキテクト
  • カスタマー エンジニア

この Codelab は、

  • GKE または AI/ML ワークフローの概要として
  • Airflow の機能セット全体の概要

2. プラットフォーム エンジニアリングは ML エンジニア/サイエンティストを支援します。

16635a8284b994c.png

プラットフォーム エンジニアリングと MLOps は相互に依存する分野であり、ML の開発とデプロイのための堅牢で効率的な環境を構築するために連携します。

範囲: プラットフォーム エンジニアリングは MLOps よりも範囲が広く、ソフトウェア開発ライフサイクル全体を網羅し、そのためのツールとインフラストラクチャを提供します。

MLOps は、ML の開発、デプロイ、推論のギャップを埋めます。

専門知識: プラットフォーム エンジニアは通常、クラウド コンピューティング、コンテナ化、データ管理などのインフラストラクチャ テクノロジーに精通しています。

MLOps エンジニアは ML モデルの開発、デプロイ、モニタリングを専門とし、多くの場合、データ サイエンスとソフトウェア エンジニアリングのスキルを備えています。

ツール: プラットフォーム エンジニアは、インフラストラクチャのプロビジョニング、構成管理、コンテナ オーケストレーション、アプリケーション スキャフォールディング用のツールを作成します。MLOps エンジニアは、ML モデルのトレーニング、テスト、デプロイ、モニタリング、バージョニング用のツールを使用します。

3. Google Cloud の設定と要件

セルフペース型の環境設定

  1. Google Cloud Console にログインして、プロジェクトを新規作成するか、既存のプロジェクトを再利用します。Gmail アカウントも Google Workspace アカウントもまだお持ちでない場合は、アカウントを作成してください。

fbef9caa1602edd0.png

a99b7ace416376c4.png

5e3ff691252acf41.png

  • プロジェクト名は、このプロジェクトの参加者に表示される名称です。Google API では使用されない文字列です。いつでも更新できます。
  • プロジェクト ID は、すべての Google Cloud プロジェクトにおいて一意でなければならず、不変です(設定後は変更できません)。Cloud コンソールでは一意の文字列が自動生成されます。通常は、この内容を意識する必要はありません。ほとんどの Codelab では、プロジェクト ID(通常は PROJECT_ID と識別されます)を参照する必要があります。生成された ID が好みではない場合は、ランダムに別の ID を生成できます。または、ご自身で試して、利用可能かどうかを確認することもできます。このステップ以降は変更できず、プロジェクトを通して同じ ID になります。
  • なお、3 つ目の値として、一部の API が使用するプロジェクト番号があります。これら 3 つの値について詳しくは、こちらのドキュメントをご覧ください。
  1. 次に、Cloud のリソースや API を使用するために、Cloud コンソールで課金を有効にする必要があります。この Codelab の操作をすべて行って、費用が生じたとしても、少額です。このチュートリアルの終了後に請求が発生しないようにリソースをシャットダウンするには、作成したリソースを削除するか、プロジェクトを削除します。Google Cloud の新規ユーザーは、300 米ドル分の無料トライアル プログラムをご利用いただけます。

Cloud Shell の起動

Google Cloud はノートパソコンからリモートで操作できますが、この Codelab では、Cloud Shell(Cloud 上で動作するコマンドライン環境)を使用します。

Cloud Shell をアクティブにする

  1. Cloud Console で、[Cloud Shell をアクティブにする] 853e55310c205094.png をクリックします。

3c1dabeca90e44e5.png

Cloud Shell を初めて起動する場合、その内容を説明する中間画面が表示されます。中間画面が表示された場合は、[続行] をクリックします。

9c92662c6a846a5c.png

Cloud Shell のプロビジョニングと接続に少し時間がかかる程度です。

9f0e51b578fecce5.png

この仮想マシンには、必要な開発ツールがすべて用意されています。仮想マシンは Google Cloud で稼働し、永続的なホーム ディレクトリが 5 GB 用意されているため、ネットワークのパフォーマンスと認証が大幅に向上しています。この Codelab での作業のほとんどは、ブラウザから実行できます。

Cloud Shell に接続すると、認証が完了し、プロジェクトが各自のプロジェクト ID に設定されていることがわかります。

  1. Cloud Shell で次のコマンドを実行して、認証されたことを確認します。
gcloud auth list

コマンド出力

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. Cloud Shell で次のコマンドを実行して、gcloud コマンドがプロジェクトを認識していることを確認します。
gcloud config list project

コマンド出力

[core]
project = <PROJECT_ID>

上記のようになっていない場合は、次のコマンドで設定できます。

gcloud config set project <PROJECT_ID>

コマンド出力

Updated property [core/project].

4. ステップ 1 - Kaggle に登録して認証する

CodeLab を開始するには、Kaggle でアカウントを作成する必要があります。Kaggle は、Google が所有するデータ サイエンティストと ML 愛好家向けのオンライン コミュニティ プラットフォームで、さまざまなドメインの一般公開データセットを大量にホストしています。このサイトから、モデルのトレーニングに使用する RottenTomatoes データセットをダウンロードします。

  • Kaggle に登録します。Google SSO を使用してログインできます。
  • 利用規約に同意する
  • [設定] に移動してユーザー名 username を取得します。
  • [API] セクションで、[Kaggle から新しいトークンを作成] を選択し、kaggle.json をダウンロードします。
  • 問題が発生した場合は、こちらのサポートページをご覧ください。

5. ステップ 2 - HuggingFace で登録して認証する

HuggingFace は、誰もが機械学習技術に触れることができる中心的な場所です。90 万個のモデル、20 万個のデータセット、30 万個のデモアプリ(Spaces)がホストされており、すべてオープンソースで一般公開されています。

  • HuggingFace に登録する - ユーザー名でアカウントを作成します。Google SSO は使用できません。
  • メールアドレスの確認
  • こちらにアクセスして、Gemma-2-9b-it モデルのライセンスを承認します。
  • HuggingFace トークンをこちらで作成します。
  • トークンの認証情報を記録します。これは後で必要になります。

6. ステップ 3 - 必要な Google Cloud インフラストラクチャ リソースを作成する

GKE、GCE、Artifact Registry を設定し、Workload Identity 連携を使用して IAM ロールを適用します。

AI ワークフローは、トレーニング用と推論用の 2 つのノードプールを使用します。トレーニング ノードプールは、1 つの Nvidia L4 Tensor Core GPU を搭載した g2-standard-8 GCE VM を使用しています。推論ノードプールは、2 つの Nvidia L4 Tensor Core GPU を搭載した g2-standard-24 VM を使用しています。リージョンを指定するときに、必要な GPU がサポートされているリージョンを選択します(リンク)。

Cloud Shell で次のコマンドを実行します。

# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")

SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1

# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto

# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount

# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com

# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
  --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-12 \
  --num-nodes=1

# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
  --accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-24 \
  --num-nodes=1

# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a

# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}

YAML マニフェストを作成する

mkdir manifests
cd manifests

mlops-sa.yaml

apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
  name: airflow-mlops-sa
  namespace: airflow
  labels:
    tier: airflow

pv-dags.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 5Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_DAGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pv-logs.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-logs
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 100Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_LOGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pvc-dags.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: airflow-dags
  storageClassName: standard

pvc-logs.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  volumeName: airflow-logs
  storageClassName: standard

namespace.yaml

kind: Namespace
apiVersion: v1
metadata:
  name: airflow
  labels:
    name: airflow

sa-role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-deployment-role
rules:
- apiGroups: ["apps"] 
  resources: ["deployments"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]

sa-rolebinding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-deployment-rolebinding
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: airflow-worker
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-deployment-role
  apiGroup: rbac.authorization.k8s.io

inference.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-deployment
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gemma-server
  template:
    metadata:
      labels:
        app: gemma-server
        ai.gke.io/model: gemma-2-9b-it
        ai.gke.io/inference-server: vllm
      annotations:
        gke-gcsfuse/volumes: "true"
    spec:
      serviceAccountName: airflow-mlops-sa
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: "NoSchedule"
      - key: "on-demand"
        value: "true"
        operator: "Equal"
        effect: "NoSchedule"
      containers:
      - name: inference-server
        image: vllm/vllm-openai:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: "2"
          limits:
            nvidia.com/gpu: "2"
        command: ["/bin/sh", "-c"]
        args:
        - |
          python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
        volumeMounts:
        - mountPath: /dev/shm
          name: dshm
        - name: gcs-fuse-csi-ephemeral
          mountPath: /modeldata
          readOnly: true
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      - name: gcs-fuse-csi-ephemeral
        csi:
          driver: gcsfuse.csi.storage.gke.io
          volumeAttributes:
            bucketName: BUCKET_DATA_NAME
            mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
            fileCacheCapacity: "20Gi"
            fileCacheForRangeRead: "true"
            metadataStatCacheCapacity: "-1"
            metadataTypeCacheCapacity: "-1"
            metadataCacheTTLSeconds: "-1"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4

inference-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: llm-service
  namespace: airflow
spec:
  selector:
    app: gemma-server
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

3 つの Google Cloud Storage(GCS)バケットを作成する

gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}

# Create the namespace in GKE
kubectl apply -f namespace.yaml

# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml

Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation

gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"

7. ステップ 4 - Helm チャートを使用して GKE に Airflow をインストールする

次に、Helm を使用して Airflow 2 をデプロイします。Apache Airflow は、データ エンジニアリング パイプライン用のオープンソースワークフロー管理プラットフォームです。Airflow 2 の機能セットについては、後ほど説明します。

Airflow Helm チャートの values.yaml

config:
  webserver:
    expose_config: true
webserver:
  service:
    type: LoadBalancer
  podAnnotations:
    gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "30"
logs:
  persistence:
    enabled: true
    existingClaim: "airflow-logs"
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
scheduler:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
triggerer:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
workers:
  podAnnotations:
    gke-gcsfuse/volumes: "true"

Airflow 2 をデプロイする

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml

8. ステップ 5 - 接続と変数を使用して Airflow を初期化する

Airflow 2 がデプロイされたら、構成を開始できます。Python スクリプトによって読み取られる変数を定義します。

  1. ブラウザでポート 8080 の Airflow UI にアクセスする

外部 IP を取得する

kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'

ウェブブラウザを開き、http://<EXTERNAL-IP>:8080 に移動します。ログイン名は admin / admin です。

  1. Airflow UI 内でデフォルトの GCP 接続を作成します。[管理] → [接続] → [+ 新しいレコードを追加] に移動します。
  • 接続 ID: google_cloud_default
  • 接続タイプ: Google Cloud

[保存] をクリックします。

  1. 必要な変数を作成します。[管理] > [変数] > [+ 新しいレコードを追加] に移動します。
  • キー: BUCKET_DATA_NAME - 値: echo $BUCKET_DATA_NAME からコピー
  • キー: GCP_PROJECT_ID - 値: echo $DEVSHELL_PROJECT_ID からコピー
  • キー: HF_TOKEN - 値: HF トークンを挿入します
  • Key: KAGGLE_USERNAME - Value: kaggle のユーザー名を挿入します。
  • キー: KAGGLE_KEY - 値: kaggle.json からコピーします。

各 Key-Value ペアの後に [保存] をクリックします。

UI は次のようになります。

771121470131b5ec.png

9. アプリケーション コード コンテナ 1 - データのダウンロード

この Python スクリプトでは、Kaggle で認証してデータセットを GCS バケットにダウンロードします。

スクリプト自体はコンテナ化されています。これは、DAG ユニット 1 になるためです。また、データセットは頻繁に更新されるため、自動化する必要があります。

ディレクトリを作成し、スクリプトをここにコピーします。

cd .. ; mkdir 1-dataset-download
cd 1-dataset-download

dataset-download.py

import os
import kagglehub
from google.cloud import storage

KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")

print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"

upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]

requirements.txt

google-cloud-storage==2.19.0
kagglehub==0.3.4

次に、dataset-download のコンテナ イメージを作成し、Artifact Registry に push します。

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest

10. アプリケーション コード コンテナ #2 - データの準備

データ準備ステップでは、次のことが実現されます。

  1. ベースモデルのファインチューニングに使用するデータセットの量を指定します。
  2. データセットを読み込みます。つまり、CSV ファイルを行と列の 2 次元データ構造である Pandas データフレームに読み込みます。
  3. データ変換 / 前処理 - 保持するデータを指定して、データセットのどの部分が関連性がないかを判断します。残りの部分は削除されます。
  4. DataFrame の各行に transform 関数を適用します。
  5. 準備したデータを GCS バケットに保存する

ディレクトリを作成し、スクリプトをここにコピーします。

cd .. ; mkdir 2-data-preparation
cd 2-data-preparation

data-preparation.py

import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset

# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100"))  # Process a limited number of rows, used 100 during testing phase but can be increased

DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"

# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")

def transform(data):
    """
    Transforms a row of the DataFrame into the desired format for fine-tuning.

    Args:
      data: A pandas Series representing a row of the DataFrame.

    Returns:
      A dictionary containing the formatted text.
    """ 
    question = f"Review analysis for movie '{data['id']}'"
    context = data['reviewText']
    answer = data['scoreSentiment']
    template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
    return {'text': template.format(question=question, context=context, answer=answer)}

try:
    df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
    print(f"Dataset loaded successfully.")

    # Drop rows with NaN values in relevant columns
    df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])

    # Apply transformation to the DataFrame
    transformed_data = df.apply(transform, axis=1).tolist()

    # Convert transformed data to a DataFrame and then to a Hugging Face Dataset
    transformed_df = pd.DataFrame(transformed_data)
    dataset = Dataset.from_pandas(transformed_df)

    # Save the prepared dataset to JSON lines format
    with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
        for item in dataset:
            f.write(json.dumps(item) + "\n")

    print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
    
except Exception as e:
    print(f"Error during data loading or preprocessing: {e}")
    import traceback
    print(traceback.format_exc())

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]

requirements.txt

datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3

# Now we create a container images for data-preparation and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest

11. アプリケーション コード コンテナ #3 - ファインチューニング

ここでは、Gemma-2-9b-it をベースモデルとして使用し、新しいデータセットでファインチューニングします。

以下は、ファインチューニング ステップで実行される一連のステップです。

1. 設定: ライブラリをインポートし、パラメータ(モデル、データ、トレーニング用)を定義して、Google Cloud Storage からデータセットを読み込みます。

2. モデルを読み込む: 効率性を高めるために量子化された事前トレーニング済み言語モデルを読み込み、対応するトークン化ツールを読み込みます。

3. LoRA を構成する: 小さなトレーニング可能な行列を追加してモデルを効率的にファインチューニングするように、Low-Rank Adaptation(LoRA)を設定します。

4. トレーニング: トレーニング パラメータを定義し、SFTTrainer を使用して、FP16 量子化タイプを使用して読み込まれたデータセットでモデルをファインチューニングします。

5. 保存とアップロード: ファインチューニングしたモデルとトークナイザーをローカルに保存してから、GCS バケットにアップロードします。

次に、Cloud Build を使用してコンテナ イメージを作成し、Artifact Registry に保存します。

ディレクトリを作成し、スクリプトをここにコピーします。

cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning

finetuning.py

import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage

# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")

# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
    "json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)

################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))

################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50

################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False

# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
    attn_implementation="eager",
    pretrained_model_name_or_path=model_name,
    torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")

# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
    lora_alpha=lora_alpha,
    lora_dropout=lora_dropout,
    r=lora_r,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Set training parameters
training_arguments = SFTConfig(
        bf16=bf16,
        dataset_kwargs={
            "add_special_tokens": False,  
            "append_concat_token": False, 
        },
        dataset_text_field="text",
        disable_tqdm=True,
        fp16=fp16,
        gradient_accumulation_steps=gradient_accumulation_steps,
        gradient_checkpointing=gradient_checkpointing,
        gradient_checkpointing_kwargs={"use_reentrant": False},
        group_by_length=group_by_length,
        log_on_each_node=False,
        logging_steps=logging_steps,
        learning_rate=learning_rate,
        lr_scheduler_type=lr_scheduler_type,
        max_grad_norm=max_grad_norm,
        max_seq_length=max_seq_length,
        max_steps=max_steps,
        num_train_epochs=num_train_epochs,
        optim=optim,
        output_dir=save_model_path,
        packing=packing,
        per_device_train_batch_size=per_device_train_batch_size,
        save_strategy=save_strategy,
        save_steps=save_steps,
        save_total_limit=save_total_limit,
        warmup_ratio=warmup_ratio,
        weight_decay=weight_decay,
    )

print(f"Configuring fine tuning completed")

# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    peft_config=peft_config,
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    tokenizer=tokenizer,
    args=training_arguments,
    packing=packing,
)

print(f"Creating trainer completed")

# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")

# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
    low_cpu_mem_usage=True,
    pretrained_model_name_or_path=model_name,
    return_dict=True,
    torch_dtype=torch.float16,
)

model = PeftModel.from_pretrained(
    model=base_model,
    model_id=new_model,
)
model = model.merge_and_unload()

print(f"Merging the new model with base model completed")

accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")

print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
    is_main_process=accelerator.is_main_process,
    save_directory=save_model_path,
    save_function=accelerator.save,
)
print(f"Save unwrapped model completed")

print(f"Save new tokenizer started")
if accelerator.is_main_process:
    tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")

# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
    """Uploads a directory to GCS."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    for root, _, files in os.walk(model_dir):
        for file in files:
            local_file_path = os.path.join(root, file)
            gcs_file_path = os.path.relpath(local_file_path, model_dir)
            blob = bucket.blob(os.path.join(new_model, gcs_file_path))  # Use new_model_name
            blob.upload_from_filename(local_file_path)

# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")

Dockerfile

# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04

# Install necessary system packages
RUN apt-get update && \
    apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements.txt into the container
COPY requirements.txt .

# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy your finetune script into the container
COPY finetuning.py .

# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
 
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]

requirements.txt

accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4

ファインチューニング用のコンテナ イメージを作成して、Artifact Registry に push します。

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest

12. Airflow 2 の概要(DAG の概要を含む)

Airflow は、ワークフローとデータ パイプラインをオーケストレートするためのプラットフォームです。DAG(有向非巡回グラフ)を使用して、これらのワークフローを Python コードで定義し、タスクとその依存関係を視覚的に表します。

静的 DAG と Python ベースの定義を備えた Airflow は、事前定義されたワークフローのスケジューリングと管理に適しています。このアーキテクチャには、これらのワークフローをモニタリングして管理するためのユーザー フレンドリーな UI が含まれています。

基本的に、Airflow では Python を使用してデータ パイプラインを定義、スケジュール設定、モニタリングできるため、ワークフローのオーケストレーションに柔軟で強力なツールとなります。

13. DAG の概要

ec49964ad7d61491.png

DAG は有向非巡回グラフの略です。Airflow では、DAG 自体がワークフローまたはパイプライン全体を表します。タスク、その依存関係、実行順序を定義します。

DAG 内のワークフローのユニットは、Airflow 構成から開始され、GKE クラスタの Pod から実行されます。

概要

Airflow: データのダウンロード - このスクリプトは、Kaggle から映画レビュー データセットを取得して GCS バケットに保存するプロセスを自動化します。これにより、クラウド環境でさらに処理や分析を行うことができます。

Airflow: データ準備 - このコードは、映画レビューの元のデータセットを取り、ユースケースに不要な余分なデータ列を削除し、欠損値を含むデータセットを削除します。次に、データセットを機械学習に適した質問応答形式に構造化し、後で使用できるように GCS に保存します。

Airflow: モデルのファインチューニング - このコードは、LoRA(Low-Rank Adaptation)と呼ばれる手法を使用して大規模言語モデル(LLM)をファインチューニングし、更新されたモデルを保存します。まず、Google Cloud Storage から事前トレーニング済みの LLM とデータセットを読み込みます。次に、LoRA を適用して、このデータセットでモデルを効率的にファインチューニングします。最後に、ファインチューニングされたモデルを Google Cloud Storage に保存し、テキスト生成や質問応答などのアプリケーションで後で使用できるようにします。

Airflow: モデル サービング - 推論用に vllm を使用して GKE でファインチューニング済みモデルをサービングします。

Airflow: フィードバック ループ - xx 回(時間単位、日単位、週単位)ごとにモデルを再トレーニングします。

この図は、GKE で実行された場合の Airflow 2 の動作を示しています。

8691f41166209a5d.png

14. モデルのファインチューニングと RAG の使用の比較

この CodeLab では、検索拡張生成(RAG)を使用するのではなく、LLM をファインチューニングします。

次の 2 つのアプローチを比較してみましょう。

ファインチューニング: 特殊なモデルを作成します。ファインチューニングでは、LLM を特定のタスクまたはデータセットに適応させ、外部データソースに依存せずに独立して動作できるようにします。

推論を簡素化: 個別の取得システムとデータベースが不要になるため、特に頻繁なユースケースで、より迅速で低コストなレスポンスが得られます。

RAG: 外部知識に依存: RAG はリクエストごとにナレッジベースから関連情報を取得し、最新の特定のデータにアクセスできるようにします。

複雑さが増す: Kubernetes クラスタなどの本番環境に RAG を実装する場合、多くの場合、データ処理と取得に複数のマイクロサービスが関与するため、レイテンシと計算コストが増加する可能性があります。

微調整が選択された理由:

RAG は、この Codelab で使用される小規模なデータセットには適していますが、Airflow の一般的なユースケースを示すために、ファインチューニングを選択しました。これにより、RAG に追加のインフラストラクチャとマイクロサービスを設定する際の細かい点に悩まずに、ワークフローのオーケストレーションに集中できます。

まとめ:

ファインチューニングと RAG はどちらも有用な手法であり、それぞれに長所と短所があります。最適な選択は、データのサイズと複雑さ、パフォーマンスのニーズ、コストに関する考慮事項など、プロジェクトの特定の要件によって異なります。

15. DAG タスク 1 - Airflow で最初のステップを作成する: データのダウンロード

この DAG ユニットの概要として、コンテナ イメージでホストされている Python コードが、Kaggle から最新の RottenTomatoes データセットをダウンロードします。

このコードを GCS バケットにコピーしないでください。最後のステップとして、1 つの Python スクリプト内にすべての DAG ユニット ステップを含む mlops-dag.py をコピーします。

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        dataset_download

16. DAG タスク 2 - Airflow で 2 番目のステップを作成する: データ準備

この DAG ユニットの概要として、CSV ファイル(rotten_tomatoes_movie_reviews.csv)を GCS から Pandas DataFrame に読み込みます。

次に、テストとリソース効率化のために DATASET_LIMIT を使用して処理される行数を制限し、最後に変換されたデータを Hugging Face Dataset に変換します。

よく見ると、「DATASET_LIMIT": "1000" でモデル内の 1,000 行をトレーニングしていることがわかります。これは、Nvidia L4 GPU でトレーニングに 20 分かかるためです。

このコードを GCS バケットにコピーしないでください。最後のステップで mlops-dag.py をコピーします。このファイルには、1 つの Python スクリプト内のすべてのステップが含まれています。

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation

17. DAG タスク 3 - Airflow で 3 つ目のステップを作成する: モデルのファインチューニング

この DAG ユニットの概要として、ここでは finetune.py を実行して、新しいデータセットで Gemma モデルを改良します。

このコードを GCS バケットにコピーしないでください。最後のステップで mlops-dag.py をコピーします。このファイルには、1 つの Python スクリプト内のすべてのステップが含まれています。

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Task 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Task 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Task 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation >> fine_tuning

18. DAG タスク 4 - Airflow の最後のステップを作成する: 推論 / モデルの提供

vLLM は、LLM の高パフォーマンス推論用に特別に設計された強力なオープンソース ライブラリです。Google Kubernetes Engine(GKE)にデプロイすると、Kubernetes のスケーラビリティと効率性を活用して LLM を効果的に提供できます。

手順の概要:

  • DAG「mlops-dag.py」を GCS バケットにアップロードします。
  • 推論を設定するための 2 つの Kubernetes YAML 構成ファイルを GCS バケットにコピーします。

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

def model_serving():
    config.load_incluster_config()
    k8s_apps_v1 = client.AppsV1Api()
    k8s_core_v1 = client.CoreV1Api()

    while True:
        try:
            k8s_apps_v1.delete_namespaced_deployment(
                    namespace="airflow",
                    name="inference-deployment",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Deployment inference-deployment deleted")
    
    with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace="airflow")
        print(f"Deployment created. Status='{resp.metadata.name}'")
    
    while True:
        try:
            k8s_core_v1.delete_namespaced_service(
                    namespace="airflow",
                    name="llm-service",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Service llm-service deleted")

    with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_core_v1.create_namespaced_service(
            body=dep, namespace="airflow")
        print(f"Service created. Status='{resp.metadata.name}'")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 4: Run GKE Deployment for model serving
        model_serving = PythonOperator(
            task_id="model_serving",
            python_callable=model_serving
        )

        dataset_download >> data_preparation >> fine_tuning >> model_serving

Python スクリプト(DAG ファイル)と Kubernetes マニフェストを DAGS GCS バケットにアップロードします。

gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}

Airflow UI に mlops-dag が表示されます。

  1. 一時停止を解除します。
  2. 手動 MLOps サイクルを実行するには、[Trigger DAG] を選択します。

d537281b92d5e8bb.png

DAG が完了すると、Airflow UI に次のような出力が表示されます。

3ed42abf8987384e.png

最後のステップの後、モデル エンドポイントを取得してプロンプトを送信し、モデルをテストできます。

モデルの推論を開始してロードバランサが外部 IP アドレスを割り当てることができるように、curl コマンドを実行する前に 5 分ほど待ちます。

export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')

curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128
}
EOF

出力:

19. 完了

GKE で Airflow 2 を使用して DAG パイプラインを使用して最初の AI ワークフローを作成しました。

デプロイしたリソースのプロビジョニングを解除してください。

20. 本番環境でこれを行う

Codelab では、GKE で Airflow 2 を設定する方法について詳しく説明しましたが、本番環境でこの作業を行う場合は、次のトピックのいくつかを検討する必要があります。

Gradio などのツールを使用してウェブ フロントエンドを実装します。

こちらで GKE を使用してワークロードの自動アプリケーション モニタリングを構成するか、こちらで Airflow から指標をエクスポートします。

特にデータセットが大きい場合は、モデルをより高速にファインチューニングするために、より大きな GPU が必要になる場合があります。ただし、複数の GPU でモデルをトレーニングする場合は、データセットを分割してトレーニングをシャーディングする必要があります。PyTorch を使用した FSDP(完全にシャーディングされたデータ並列処理。GPU 共有を使用してこの目的を達成します。詳しくは、Meta のブログ投稿と、Pytorch を使用した FSDP に関するチュートリアルをご覧ください。

Google Cloud Composer はマネージド Airflow サービスであるため、Airflow 自体を維持する必要はありません。DAG をデプロイするだけで使用できます。

詳細

ライセンス

この作業はクリエイティブ・コモンズの表示 2.0 汎用ライセンスにより使用許諾されています。