1. 總覽
本 CodeLab 示範如何使用 Airflow DAG (最少抽象化) 下載資料集、精進模型,並在 Google Kubernetes Engine (GKE) 上部署 LLM,藉此將 DevOps 做法整合至機器學習 (MLOps)。因此,我們會使用 gcloud 指令,而非 terraform,這樣您就能逐步完成實驗室課程,並從平台工程師和機器學習工程師的角度,輕鬆瞭解每個程序。
這份實作指南將逐步說明如何運用 Airflow 簡化 AI 工作流程,並透過設定 DAG 清楚且實際地演示整個 MLOps 生命週期。
課程內容
- 打破知識孤島並改善工作流程,促進平台工程師和機器學習工程師之間的合作與理解
- 瞭解如何在 GKE 上部署、使用及管理 Airflow 2
- 從頭到尾設定 Airflow DAG
- 使用 GKE 建構實際工作環境等級機器學習系統的基礎
- 檢測並運用機器學習系統
- 瞭解平台工程如何成為 MLOps 的重要支援支柱
本程式碼研究室的目標
- 你可以向我們根據 Gemma-2-9b-it 微調的 LLM 提問電影相關問題,這項服務會透過 GKE 搭配 vLLM 提供。
目標對象
- 機器學習工程師
- 平台工程師
- 數據資料學家
- 資料工程師
- 開發運作工程師
- Platform Architect
- 客戶工程師
本 CodeLab 不適用於
- 介紹 GKE 或 AI/機器學習工作流程
- 完整 Airflow 功能集的簡介
2. 平台工程師協助機器學習工程師/科學家
平台工程和 MLOps 是相互依存的領域,兩者合作可為機器學習開發和部署作業打造可靠且高效的環境。
範圍:平台工程的範圍比機器學習運作更廣泛,涵蓋整個軟體開發生命週期,並提供相關工具和基礎架構。
MLOps 可填補機器學習開發、部署和推論之間的鴻溝。
專業知識:平台工程師通常在雲端運算、容器化和資料管理等基礎架構技術方面具備深厚專業知識。
機器學習運作工程師專精於機器學習模型開發、部署和監控,通常具備數據科學和軟體工程技能。
工具:平台工程師會建立基礎架構佈建、設定管理、容器協調和應用程式架構的工具。機器學習運作工程師會使用工具來訓練機器學習模型、進行實驗、部署、監控及版本管理。
3. Google Cloud 設定和需求
自助式環境設定
- 登入 Google Cloud 控制台,然後建立新專案或重複使用現有專案。如果您還沒有 Gmail 或 Google Workspace 帳戶,請務必建立帳戶。
- 「Project name」是這個專案參與者的顯示名稱。這是 Google API 不會使用的字元字串。您隨時可以更新。
- 專案 ID 在所有 Google Cloud 專案中都是不重複的值,且無法變更 (設定後即無法變更)。Cloud 控制台會自動產生專屬字串,您通常不需要理會這個字串。在大多數程式碼研究室中,您都需要參照專案 ID (通常會以
PROJECT_ID
表示)。如果您不喜歡系統產生的 ID,可以產生另一個隨機 ID。或者,您也可以自行嘗試,看看是否可用。在這個步驟完成後就無法變更,且會在整個專案期間維持不變。 - 提醒您,有些 API 會使用第三個值「專案編號」。如要進一步瞭解這三個值,請參閱說明文件。
- 接下來,您需要在 Cloud 控制台中啟用帳單功能,才能使用 Cloud 資源/API。執行這個程式碼研究室不會產生太多費用,甚至可能完全不會產生費用。如要關閉資源,避免在本教學課程結束後繼續產生費用,您可以刪除建立的資源或專案。Google Cloud 新使用者可享有價值 $300 美元的免費試用期。
啟動 Cloud Shell
雖然 Google Cloud 可透過筆記型電腦遠端操作,但在本程式碼研究室中,您將使用 Cloud Shell,這是在雲端運作的指令列環境。
啟動 Cloud Shell
- 在 Cloud 控制台中,按一下「啟用 Cloud Shell」 圖示
。
如果這是您首次啟動 Cloud Shell,系統會顯示中介畫面,說明 Cloud Shell 的功能。如果您看到中介畫面,請按一下「繼續」。
佈建並連線至 Cloud Shell 的作業只需幾分鐘的時間。
這個虛擬機器會載入所有必要的開發工具。提供永久的 5 GB 主目錄,而且在 Google Cloud 中運作,大幅提升網路效能和驗證功能。您可以在瀏覽器中完成本程式碼研究室的大部分工作,甚至是全部工作。
連線至 Cloud Shell 後,您應會發現自己通過驗證,且專案已設為您的專案 ID。
- 在 Cloud Shell 中執行下列指令,確認您已通過驗證:
gcloud auth list
指令輸出
Credentialed Accounts ACTIVE ACCOUNT * <my_account>@<my_domain.com> To set the active account, run: $ gcloud config set account `ACCOUNT`
- 在 Cloud Shell 中執行下列指令,確認 gcloud 指令知道您的專案:
gcloud config list project
指令輸出
[core] project = <PROJECT_ID>
如未設定,請輸入下列指令設定專案:
gcloud config set project <PROJECT_ID>
指令輸出
Updated property [core/project].
4. 步驟 1 - 在 Kaggle 上註冊及驗證
如要開始使用本程式講義,您必須在 Kaggle 上建立帳戶。Kaggle 是 Google 旗下的線上社群平台,專為資料科學家和機器學習愛好者打造,並提供各個領域的公開資料集。您將從這個網站下載 RottenTomatoes 資料集,用於訓練模型。
5. 步驟 2:在 HuggingFace 上註冊及驗證
HuggingFace 是任何人都能接觸機器學習技術的集中地。其中代管 90 萬個模型、20 萬個資料集和 30 萬個示範應用程式 (Spaces),全部都是開放原始碼且公開提供。
- 註冊 HuggingFace - 使用使用者名稱建立帳戶,您無法使用 Google 單一登入 (SSO)
- 確認您的電子郵件地址
- 前往這個頁面,接受 Gemma-2-9b-it 模型的授權
- 請按這裡建立 HuggingFace 權杖
- 記下權杖憑證,後續步驟將會用到
6. 步驟 3:建立必要的 Google Cloud 基礎架構資源
您將設定 GKE、GCE、Artifact Registry,並使用工作負載身分聯盟套用 IAM 角色。
AI 工作流程會使用兩個節點集區,一個用於訓練,另一個用於推論。訓練節點池使用 g2-standard-8 GCE VM,其中配備一個 Nvidia L4 Tensor 核心 GPU。推論節點池使用配備兩個 Nvidia L4 Tensor Core GPU 的 g2-standard-24 VM。指定地區時,請選擇支援所需 GPU 的地區 ( 連結)。
在 Cloud Shell 中執行下列指令:
# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")
SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1
# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto
# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount
# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com
# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
--accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-12 \
--num-nodes=1
# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
--accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-24 \
--num-nodes=1
# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a
# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}
建立 YAML 資訊清單
mkdir manifests
cd manifests
mlops-sa.yaml
apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
name: airflow-mlops-sa
namespace: airflow
labels:
tier: airflow
pv-dags.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-dags
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 5Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_DAGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pv-logs.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-logs
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 100Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_LOGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pvc-dags.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-dags
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
volumeName: airflow-dags
storageClassName: standard
pvc-logs.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-logs
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 100Gi
volumeName: airflow-logs
storageClassName: standard
namespace.yaml
kind: Namespace
apiVersion: v1
metadata:
name: airflow
labels:
name: airflow
sa-role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: airflow
name: airflow-deployment-role
rules:
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
resources: ["services"]
verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]
sa-rolebinding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: airflow-deployment-rolebinding
namespace: airflow
subjects:
- kind: ServiceAccount
name: airflow-worker
namespace: airflow
roleRef:
kind: Role
name: airflow-deployment-role
apiGroup: rbac.authorization.k8s.io
inference.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: inference-deployment
namespace: airflow
spec:
replicas: 1
selector:
matchLabels:
app: gemma-server
template:
metadata:
labels:
app: gemma-server
ai.gke.io/model: gemma-2-9b-it
ai.gke.io/inference-server: vllm
annotations:
gke-gcsfuse/volumes: "true"
spec:
serviceAccountName: airflow-mlops-sa
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
- key: "on-demand"
value: "true"
operator: "Equal"
effect: "NoSchedule"
containers:
- name: inference-server
image: vllm/vllm-openai:latest
ports:
- containerPort: 8000
resources:
requests:
nvidia.com/gpu: "2"
limits:
nvidia.com/gpu: "2"
command: ["/bin/sh", "-c"]
args:
- |
python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
volumeMounts:
- mountPath: /dev/shm
name: dshm
- name: gcs-fuse-csi-ephemeral
mountPath: /modeldata
readOnly: true
volumes:
- name: dshm
emptyDir:
medium: Memory
- name: gcs-fuse-csi-ephemeral
csi:
driver: gcsfuse.csi.storage.gke.io
volumeAttributes:
bucketName: BUCKET_DATA_NAME
mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
fileCacheCapacity: "20Gi"
fileCacheForRangeRead: "true"
metadataStatCacheCapacity: "-1"
metadataTypeCacheCapacity: "-1"
metadataCacheTTLSeconds: "-1"
nodeSelector:
cloud.google.com/gke-accelerator: nvidia-l4
inference-service.yaml
apiVersion: v1
kind: Service
metadata:
name: llm-service
namespace: airflow
spec:
selector:
app: gemma-server
type: LoadBalancer
ports:
- protocol: TCP
port: 8000
targetPort: 8000
建立 3 個 Google Cloud Storage (GCS) 值區
gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}
# Create the namespace in GKE
kubectl apply -f namespace.yaml
# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml
Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"
7. 步驟 4:透過 Helm 資訊套件在 GKE 上安裝 Airflow
接下來,我們將使用 Helm 部署 Airflow 2。Apache Airflow 是資料工程管道專用的開放原始碼 工作流程管理平台。我們稍後會介紹 Airflow 2 的功能。
Airflow Helm 圖表的 values.yaml
config:
webserver:
expose_config: true
webserver:
service:
type: LoadBalancer
podAnnotations:
gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
- name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
value: "30"
logs:
persistence:
enabled: true
existingClaim: "airflow-logs"
dags:
persistence:
enabled: true
existingClaim: "airflow-dags"
scheduler:
podAnnotations:
gke-gcsfuse/volumes: "true"
triggerer:
podAnnotations:
gke-gcsfuse/volumes: "true"
workers:
podAnnotations:
gke-gcsfuse/volumes: "true"
部署 Airflow 2
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml
8. 步驟 5:使用連線和變數初始化 Airflow
部署 Airflow 2 後,我們就可以開始設定。我們定義了一些變數,並由 Python 指令碼讀取。
- 使用瀏覽器存取通訊埠 8080 上的 Airflow UI
取得外部 IP
kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'
開啟網路瀏覽器,然後前往 http://<EXTERNAL-IP>:8080。登入名稱為 admin / admin
- 在 Airflow UI 中建立預設 GCP 連線,方法是依序前往「管理」→「連線」→「+ 新增記錄」
- 連線 ID:google_cloud_default
- 連線類型:Google Cloud
按一下「儲存」。
- 建立所需變數,方法是前往「管理」→「變數」→「+ 新增記錄」
- 鍵:BUCKET_DATA_NAME - 值:從 echo $BUCKET_DATA_NAME 複製
- 鍵:GCP_PROJECT_ID - 值:從 echo $DEVSHELL_PROJECT_ID 複製
- 鍵:HF_TOKEN - 值:插入 HF 權杖
- 鍵:KAGGLE_USERNAME - 值:插入 Kaggle 使用者名稱
- 鍵:KAGGLE_KEY - 值:從 kaggle.json 複製
在每個鍵/值組合後點選「儲存」。
您的 UI 應如下所示:
9. 應用程式程式碼容器 #1 - 資料下載
在這個 Python 指令碼中,我們會透過 Kaggle 進行驗證,將資料集下載至 GCS 儲存桶。
由於這個指令碼會成為 DAG 單元 #1,且我們預期資料集會經常更新,因此我們想自動執行這個指令碼。
建立目錄並將指令碼複製到此
cd .. ; mkdir 1-dataset-download
cd 1-dataset-download
dataset-download.py
import os
import kagglehub
from google.cloud import storage
KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
def upload_blob(bucket_name, source_file_name, destination_blob_name):
"""Uploads a file to the bucket."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(f"File {source_file_name} uploaded to {destination_blob_name}.")
# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")
print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"
upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]
requirements.txt
google-cloud-storage==2.19.0
kagglehub==0.3.4
接下來,我們將為資料集下載作業建立容器映像檔,並推送至 Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest
10. 應用程式程式碼容器 #2 - 資料準備作業
在資料準備步驟中,我們會完成以下工作:
- 指定要用於微調基礎模型的資料集量
- 載入資料集,也就是將 CSV 檔案讀取至 Pandas 資料框架,這是用於列和欄的 2D 資料結構
- 資料轉換 / 前置處理 - 指定要保留的資料,藉此判斷資料集的哪些部分不相關,並移除其餘部分
- 將
transform
函式套用至 DataFrame 的每個資料列 - 將準備好的資料儲存回 GCS 值區
建立目錄並將指令碼複製到此
cd .. ; mkdir 2-data-preparation
cd 2-data-preparation
data-preparation.py
import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset
# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100")) # Process a limited number of rows, used 100 during testing phase but can be increased
DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"
# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")
def transform(data):
"""
Transforms a row of the DataFrame into the desired format for fine-tuning.
Args:
data: A pandas Series representing a row of the DataFrame.
Returns:
A dictionary containing the formatted text.
"""
question = f"Review analysis for movie '{data['id']}'"
context = data['reviewText']
answer = data['scoreSentiment']
template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
return {'text': template.format(question=question, context=context, answer=answer)}
try:
df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
print(f"Dataset loaded successfully.")
# Drop rows with NaN values in relevant columns
df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])
# Apply transformation to the DataFrame
transformed_data = df.apply(transform, axis=1).tolist()
# Convert transformed data to a DataFrame and then to a Hugging Face Dataset
transformed_df = pd.DataFrame(transformed_data)
dataset = Dataset.from_pandas(transformed_df)
# Save the prepared dataset to JSON lines format
with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
for item in dataset:
f.write(json.dumps(item) + "\n")
print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
except Exception as e:
print(f"Error during data loading or preprocessing: {e}")
import traceback
print(traceback.format_exc())
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]
requirements.txt
datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3
# Now we create a container images for data-preparation and push it to the Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest
11. 應用程式程式碼容器 #3 - 微調
我們會使用 Gemma-2-9b-it 做為基礎模型,然後使用新的資料集進行微調。
這是在精細調整步驟中發生的步驟序列。
1. 設定:匯入程式庫、定義參數 (模型、資料和訓練),並從 Google Cloud Storage 載入資料集。
2. 載入模型:載入經過量化處理的預先訓練語言模型,以提高效率,並載入對應的剖析器。
3. 設定 LoRA:設定低秩調整 (LoRA),藉由新增可訓練的小型矩陣,有效地微調模型。
4. 訓練:定義訓練參數,並使用 SFTTrainer
在已載入的資料集上,使用 FP16 量化類型微調模型。
5. 儲存及上傳:在本機儲存經過微調的模型和剖析器,然後上傳至 GCS 值區。
接著,我們會使用 Cloud Build 建立容器映像檔,並儲存至 Artifact Registry。
建立目錄並將指令碼複製到此
cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning
finetuning.py
import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage
# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")
# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")
# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
"json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)
################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))
################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03
# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50
################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False
# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
attn_implementation="eager",
pretrained_model_name_or_path=model_name,
torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")
# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
lora_alpha=lora_alpha,
lora_dropout=lora_dropout,
r=lora_r,
bias="none",
task_type="CAUSAL_LM",
target_modules=[
"q_proj",
"k_proj",
"v_proj",
"o_proj",
"gate_proj",
"up_proj",
"down_proj",
],
)
# Set training parameters
training_arguments = SFTConfig(
bf16=bf16,
dataset_kwargs={
"add_special_tokens": False,
"append_concat_token": False,
},
dataset_text_field="text",
disable_tqdm=True,
fp16=fp16,
gradient_accumulation_steps=gradient_accumulation_steps,
gradient_checkpointing=gradient_checkpointing,
gradient_checkpointing_kwargs={"use_reentrant": False},
group_by_length=group_by_length,
log_on_each_node=False,
logging_steps=logging_steps,
learning_rate=learning_rate,
lr_scheduler_type=lr_scheduler_type,
max_grad_norm=max_grad_norm,
max_seq_length=max_seq_length,
max_steps=max_steps,
num_train_epochs=num_train_epochs,
optim=optim,
output_dir=save_model_path,
packing=packing,
per_device_train_batch_size=per_device_train_batch_size,
save_strategy=save_strategy,
save_steps=save_steps,
save_total_limit=save_total_limit,
warmup_ratio=warmup_ratio,
weight_decay=weight_decay,
)
print(f"Configuring fine tuning completed")
# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
model=model,
train_dataset=dataset,
peft_config=peft_config,
dataset_text_field="text",
max_seq_length=max_seq_length,
tokenizer=tokenizer,
args=training_arguments,
packing=packing,
)
print(f"Creating trainer completed")
# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")
# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")
# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
low_cpu_mem_usage=True,
pretrained_model_name_or_path=model_name,
return_dict=True,
torch_dtype=torch.float16,
)
model = PeftModel.from_pretrained(
model=base_model,
model_id=new_model,
)
model = model.merge_and_unload()
print(f"Merging the new model with base model completed")
accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")
print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
is_main_process=accelerator.is_main_process,
save_directory=save_model_path,
save_function=accelerator.save,
)
print(f"Save unwrapped model completed")
print(f"Save new tokenizer started")
if accelerator.is_main_process:
tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")
# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
"""Uploads a directory to GCS."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
for root, _, files in os.walk(model_dir):
for file in files:
local_file_path = os.path.join(root, file)
gcs_file_path = os.path.relpath(local_file_path, model_dir)
blob = bucket.blob(os.path.join(new_model, gcs_file_path)) # Use new_model_name
blob.upload_from_filename(local_file_path)
# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")
Dockerfile
# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04
# Install necessary system packages
RUN apt-get update && \
apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
rm -rf /var/lib/apt/lists/*
# Copy requirements.txt into the container
COPY requirements.txt .
# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
# Copy your finetune script into the container
COPY finetuning.py .
# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]
requirements.txt
accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4
接下來,我們將建立容器映像檔進行微調,並將其推送至 Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest
12. Airflow 2 總覽,包括什麼是 DAG
Airflow 是用來自動調度工作流程和資料管道的平台。它會使用 DAG (有向非循環圖) 在 Python 程式碼中定義這些工作流程,以視覺化方式呈現工作及其依附元件。
Airflow 提供靜態 DAG 和以 Python 為基礎的定義,非常適合用於排程及管理預先定義的工作流程。其架構包含方便使用者監控及管理這些工作流程的使用者介面。
基本上,Airflow 可讓您使用 Python 定義、排定時程及監控資料管道,是靈活且功能強大的工作流程自動化調度管理工具。
13. DAG 總覽
DAG 代表有向非循環圖,在 Airflow 中,DAG 本身代表整個工作流程或管道。定義工作、依附元件和執行順序。
DAG 中的工作流程單位會從 Airflow 設定啟動的 GKE 叢集 Pod 執行。
摘要:
Airflow:資料下載:這個指令碼會自動執行從 Kaggle 取得電影評論資料集並儲存在 GCS 儲存桶的程序,讓您在雲端環境中進一步處理或分析資料。
Airflow:資料準備 - 程式碼會擷取原始電影評論資料集,移除不符合用途的其他資料欄,並刪除缺少值的資料集。接著,系統會將資料集結構化為適合機器學習的問答格式,並將其存回 GCS,以供日後使用。
Airflow:模型微調 - 這個程式碼會使用 LoRA (低秩適應) 技術微調大型語言模型 (LLM),然後儲存更新後的模型。首先,它會從 Google Cloud Storage 載入預先訓練的 LLM 和資料集。接著,系統會套用 LoRA,針對這個資料集有效率地微調模型。最後,系統會將精修過的模型儲存回 Google Cloud Storage,以便日後用於文字產生或問答等應用程式。
Airflow:模型提供服務 - 使用 vllm 在 GKE 上提供經過微調的模型,以便進行推論。
Airflow:回饋迴路 - 每 xx 時間 (每小時、每天、每週) 重新訓練模型。
這張圖表說明在 GKE 上執行 Airflow 2 的運作方式。
14. 微調模型與使用 RAG
本程式碼研究室會微調 LLM,而非使用檢索增強生成 (RAG) 技術。
讓我們比較這兩種方法:
微調:建立專用模型:微調會將 LLM 調整為特定任務或資料集,讓模型能獨立運作,不必仰賴外部資料來源。
簡化推論:不必使用獨立的擷取系統和資料庫,因此回應速度更快且成本更低,特別適用於常見的用途。
RAG:依賴外部知識:RAG 會針對每個要求從知識庫中擷取相關資訊,確保能存取最新且具體的資料。
增加複雜度:在 Kubernetes 叢集等實際環境中實作 RAG 時,通常會涉及多個用於資料處理和擷取的微服務,可能會增加延遲和運算成本。
選擇微調的原因:
雖然 RAG 適合用於本程式碼研究室中使用的小型資料集,但我們選擇採用精細調整,以示範 Airflow 的典型用途。這個選擇可讓我們專注於工作流程調度方面,而非深入探討為 RAG 設定額外基礎架構和微服務的細微差異。
結論:
精細調整和 RAG 都是實用的技術,各有優缺點。最佳選擇取決於專案的具體需求,例如資料大小和複雜度、效能需求和成本考量。
15. DAG 工作 #1 - 在 Airflow 中建立第一個步驟:資料下載
這個 DAG 單元的概略說明如下:在容器映像檔中代管的 Python 程式碼會從 Kaggle 下載最新的 Rotten Tomatoes 資料集。
請勿將這段程式碼複製到 GCS 值區。我們會在最後一個步驟複製 mlops-dag.py,其中包含單一 Python 指令碼中的所有 DAG 單元步驟。
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
dataset_download
16. DAG 工作 #2 - 在 Airflow 中建立第二個步驟:資料準備
這個 DAG 單元的概略說明是,我們會將 GCS 中的 CSV 檔案 (rotten_tomatoes_movie_reviews.csv) 載入至 Pandas DataFrame。
接著,我們會使用 DATASET_LIMIT 限制要處理的資料列數量,以便進行測試及提高資源效率,最後將轉換後的資料轉換為 Hugging Face 資料集。
仔細觀察後,您會發現我們使用「DATASET_LIMIT」:"1000",在模型中訓練 1000 列,這是因為使用 Nvidia L4 GPU 訓練需要 20 分鐘。
請勿將這段程式碼複製到 GCS 值區。我們會在最後一個步驟複製 mlops-dag.py,其中包含一個 Python 指令碼中的所有步驟。
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation
17. DAG 工作 #3 - 在 Airflow 中建立第三個步驟:模型微調
這個 DAG 單元的概略說明是,我們會執行 finetune.py,使用新的資料集改善 Gemma 模型。
請勿將這段程式碼複製到 GCS 值區。我們會在最後一個步驟複製 mlops-dag.py,其中包含一個 Python 指令碼中的所有步驟。
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Task 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Task 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Task 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation >> fine_tuning
18. DAG 工作 #4:在 Airflow 中建立最後一個步驟:推論 / 提供模型
vLLM 是功能強大的開放原始碼程式庫,專門用於大型語言模型 (LLM) 的高效推論作業。在 Google Kubernetes Engine (GKE) 上部署時,可利用 Kubernetes 的擴充性和效率,有效提供 LLM。
步驟摘要:
- 將 DAG「mlops-dag.py」上傳至 GCS 值區。
- 將兩個 Kubernetes YAML 設定檔複製到 GCS 值區,以便設定推論。
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
def model_serving():
config.load_incluster_config()
k8s_apps_v1 = client.AppsV1Api()
k8s_core_v1 = client.CoreV1Api()
while True:
try:
k8s_apps_v1.delete_namespaced_deployment(
namespace="airflow",
name="inference-deployment",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Deployment inference-deployment deleted")
with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_apps_v1.create_namespaced_deployment(
body=dep, namespace="airflow")
print(f"Deployment created. Status='{resp.metadata.name}'")
while True:
try:
k8s_core_v1.delete_namespaced_service(
namespace="airflow",
name="llm-service",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Service llm-service deleted")
with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_core_v1.create_namespaced_service(
body=dep, namespace="airflow")
print(f"Service created. Status='{resp.metadata.name}'")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 4: Run GKE Deployment for model serving
model_serving = PythonOperator(
task_id="model_serving",
python_callable=model_serving
)
dataset_download >> data_preparation >> fine_tuning >> model_serving
將 Python 指令碼 (DAG 檔案) 和 Kubernetes 資訊清單上傳至 DAGS GCS bucket。
gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}
在 Airflow 使用者介面中,您會看到 mlops-dag。
- 選取「取消暫停」。
- 選取「觸發 DAG」即可執行手動 MLOps 週期。
DAG 完成後,您會在 Airflow UI 中看到類似下方的輸出內容。
完成最後一個步驟後,您可以取得模型端點並傳送提示,以便測試模型。
請等候約 5 分鐘再發出 curl 指令,以便模型開始推論,並讓負載平衡器指派外部 IP 位址。
export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
"prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
"temperature": 0.1,
"top_p": 1.0,
"max_tokens": 128
}
EOF
輸出:
19. 恭喜!
您已在 GKE 上使用 Airflow 2 的 DAG 管道建立第一個 AI 工作流程。
別忘了取消部署的資源。
20. 在實際工作環境中執行此操作
雖然本程式碼研究室已提供有關在 GKE 上設定 Airflow 2 的絕佳洞察資料,但在實際工作環境中執行這項操作時,您仍應考慮以下幾個主題。
使用 Gradio 或類似工具實作網路前端。
請參閱這篇文章,為 GKE 工作負載設定自動應用程式監控功能,或是參閱這篇文章,從 Airflow 匯出指標。
您可能需要更大的 GPU 才能更快地微調模型,尤其是在您擁有較大資料集的情況下。不過,如果要跨多個 GPU 訓練模型,就必須分割資料集並分割訓練作業。以下是 FSDP 搭配 PyTorch 的說明 (完全分割的資料並行處理,使用 GPU 共用功能達成此目標)。如需進一步閱讀,請參閱 Meta 的這篇網誌文章,以及這篇教學課程,瞭解如何使用 Pytorch 進行 FSDP。
Google Cloud Composer 是代管式 Airflow 服務,因此您不必自行維護 Airflow,只要部署 DAG 即可。
瞭解詳情
- Airflow 說明文件:https://airflow.apache.org/
授權
這項內容採用的授權為 Creative Commons 姓名標示 2.0 通用授權。