Xây dựng quy trình MLOps bằng Airflow 2 trên GKE

1. Tổng quan

852dc8844309ffb8.png

Lớp học lập trình này minh hoạ cách tích hợp các phương pháp DevOps vào công nghệ học máy (MLOps) bằng cách tải một tập dữ liệu xuống, tinh chỉnh một mô hình và triển khai LLM trên Google Kubernetes Engine (GKE) bằng cách sử dụng DAG Airflow với mức độ trừu tượng thấp nhất. Do đó, chúng tôi sẽ sử dụng các lệnh gcloud thay vì terraform để bạn có thể làm theo từng bước trong lớp học này và dễ dàng hiểu được từng quy trình từ góc độ của cả Kỹ sư nền tảng và Kỹ sư học máy.

Hướng dẫn thực hành này sẽ hướng dẫn bạn cách tận dụng Airflow để đơn giản hoá quy trình làm việc về AI, đồng thời cung cấp minh hoạ rõ ràng và thực tế về toàn bộ vòng đời MLOps bằng cách định cấu hình DAG.

Kiến thức bạn sẽ học được

  • Thúc đẩy khả năng cộng tác và sự hiểu biết giữa Kỹ sư nền tảng và Kỹ sư học máy bằng cách phá vỡ các silo kiến thức và cải thiện quy trình làm việc
  • Tìm hiểu cách triển khai, sử dụng và quản lý Airflow 2 trên GKE
  • Định cấu hình DAG Airflow từ đầu đến cuối
  • Xây dựng nền tảng cho các hệ thống học máy cấp độ sản xuất bằng GKE
  • Đo lường và vận hành hệ thống học máy
  • Tìm hiểu cách Kỹ thuật nền tảng trở thành trụ cột hỗ trợ quan trọng cho MLOps

Mục tiêu của lớp học lập trình này

  • Bạn có thể đặt câu hỏi về phim thông qua một LLM mà chúng tôi đã tinh chỉnh dựa trên Gemma-2-9b-it, được phân phát trong GKE bằng vLLM.

Đối tượng mục tiêu

  • Kỹ sư học máy
  • Kỹ sư nền tảng
  • Nhà khoa học dữ liệu
  • Kỹ sư dữ liệu
  • Kỹ sư DevOps
  • Kiến trúc sư nền tảng
  • Kỹ sư hỗ trợ khách hàng

Lớp học lập trình này không dành cho

  • Giới thiệu về quy trình làm việc của GKE hoặc AI/ML
  • Dưới dạng một bản chạy qua toàn bộ bộ tính năng của Airflow

2. Nhóm Kỹ thuật nền tảng hỗ trợ Kỹ sư/Nhà khoa học học máy

16635a8284b994c.png

Kỹ thuật nền tảng và MLOps là những ngành học phụ thuộc lẫn nhau, cộng tác để tạo ra một môi trường mạnh mẽ và hiệu quả cho việc phát triển và triển khai công nghệ học máy.

Phạm vi: Kỹ thuật nền tảng có phạm vi rộng hơn MLOps, bao gồm toàn bộ vòng đời phát triển phần mềm và cung cấp các công cụ và cơ sở hạ tầng cho vòng đời đó.

MLOps giúp thu hẹp khoảng cách giữa việc phát triển, triển khai và suy luận về công nghệ học máy.

Chuyên môn: Kỹ sư nền tảng thường có chuyên môn sâu về các công nghệ cơ sở hạ tầng như điện toán đám mây, đóng gói và quản lý dữ liệu.

Kỹ sư MLOps chuyên phát triển, triển khai và giám sát mô hình học máy, thường có kỹ năng khoa học dữ liệu và kỹ thuật phần mềm.

Công cụ: Các kỹ sư nền tảng tạo ra các công cụ để cung cấp cơ sở hạ tầng, quản lý cấu hình, điều phối vùng chứa và tạo ứng dụng. Kỹ sư MLOps sử dụng các công cụ để huấn luyện, thử nghiệm, triển khai, giám sát và tạo phiên bản mô hình học máy.

3. Yêu cầu và cách thiết lập Google Cloud

Thiết lập môi trường theo tốc độ của riêng bạn

  1. Đăng nhập vào Google Cloud Console rồi tạo một dự án mới hoặc sử dụng lại một dự án hiện có. Nếu chưa có tài khoản Gmail hoặc Google Workspace, bạn phải tạo một tài khoản.

fbef9caa1602edd0.png

a99b7ace416376c4.png

5e3ff691252acf41.png

  • Tên dự án là tên hiển thị cho người tham gia dự án này. Đây là một chuỗi ký tự không được API của Google sử dụng. Bạn luôn có thể cập nhật thông tin này.
  • Mã dự án là duy nhất trên tất cả các dự án Google Cloud và không thể thay đổi (không thể thay đổi sau khi đặt). Cloud Console sẽ tự động tạo một chuỗi duy nhất; thường thì bạn không cần quan tâm đến chuỗi này. Trong hầu hết các lớp học lập trình, bạn sẽ cần tham chiếu đến Mã dự án (thường được xác định là PROJECT_ID). Nếu không thích mã được tạo, bạn có thể tạo một mã ngẫu nhiên khác. Ngoài ra, bạn có thể thử dùng email của riêng mình để xem có thể sử dụng hay không. Bạn không thể thay đổi thông tin này sau bước này và thông tin này sẽ được giữ nguyên trong suốt thời gian diễn ra dự án.
  • Xin lưu ý rằng có một giá trị thứ ba là Mã dự án mà một số API sử dụng. Tìm hiểu thêm về cả ba giá trị này trong tài liệu.
  1. Tiếp theo, bạn cần bật tính năng thanh toán trong Cloud Console để sử dụng các tài nguyên/API trên Cloud. Việc tham gia lớp học lập trình này sẽ không tốn kém nhiều chi phí, nếu có. Để tắt các tài nguyên nhằm tránh bị tính phí sau khi hoàn tất hướng dẫn này, bạn có thể xoá các tài nguyên đã tạo hoặc xoá dự án. Người dùng mới của Google Cloud đủ điều kiện tham gia chương trình Dùng thử miễn phí 300 USD.

Khởi động Cloud Shell

Mặc dù có thể điều khiển Google Cloud từ xa trên máy tính xách tay, nhưng trong lớp học lập trình này, bạn sẽ sử dụng Cloud Shell, một môi trường dòng lệnh chạy trên đám mây.

Kích hoạt Cloud Shell

  1. Trên Cloud Console, hãy nhấp vào Kích hoạt Cloud Shell 853e55310c205094.png.

3c1dabeca90e44e5.png

Nếu đây là lần đầu tiên bạn khởi động Cloud Shell, bạn sẽ thấy một màn hình trung gian mô tả về Cloud Shell. Nếu bạn thấy một màn hình trung gian, hãy nhấp vào Tiếp tục.

9c92662c6a846a5c.png

Quá trình cấp phép và kết nối với Cloud Shell chỉ mất vài phút.

9f0e51b578fecce5.png

Máy ảo này được tải sẵn tất cả các công cụ phát triển cần thiết. Ứng dụng này cung cấp một thư mục gốc 5 GB ổn định và chạy trong Google Cloud, giúp cải thiện đáng kể hiệu suất mạng và xác thực. Bạn có thể thực hiện hầu hết (nếu không phải tất cả) công việc trong lớp học lập trình này bằng trình duyệt.

Sau khi kết nối với Cloud Shell, bạn sẽ thấy mình đã được xác thực và dự án được đặt thành mã dự án của bạn.

  1. Chạy lệnh sau trong Cloud Shell để xác nhận rằng bạn đã được xác thực:
gcloud auth list

Kết quả của lệnh

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. Chạy lệnh sau trong Cloud Shell để xác nhận rằng lệnh gcloud biết về dự án của bạn:
gcloud config list project

Kết quả của lệnh

[core]
project = <PROJECT_ID>

Nếu không, bạn có thể đặt giá trị này bằng lệnh sau:

gcloud config set project <PROJECT_ID>

Kết quả của lệnh

Updated property [core/project].

4. Bước 1 – Đăng ký và xác thực trên Kaggle

Để bắt đầu lớp học lập trình này, bạn cần tạo một tài khoản trên Kaggle. Đây là một nền tảng cộng đồng trực tuyến dành cho các nhà khoa học dữ liệu và những người đam mê học máy. Nền tảng này thuộc sở hữu của Google và lưu trữ một kho lưu trữ khổng lồ gồm các tập dữ liệu có sẵn công khai cho nhiều lĩnh vực. Bạn sẽ tải tập dữ liệu RottenTomatoes xuống từ trang web này để huấn luyện mô hình.

  • Đăng ký Kaggle, bạn có thể sử dụng tính năng Đăng nhập một lần của Google để đăng nhập
  • Chấp nhận (các) điều khoản và điều kiện
  • Chuyển đến phần Cài đặt và lấy tên người dùng tên người dùng
  • Trong phần API, hãy chọn "Tạo mã thông báo mới từ" Kaggle để tải kaggle.json xuống
  • Nếu bạn gặp vấn đề, hãy truy cập vào trang hỗ trợ tại đây

5. Bước 2 – Đăng ký và xác thực trên HuggingFace

HuggingFace là một trung tâm để mọi người có thể tham gia vào công nghệ Học máy. Kho lưu trữ này lưu trữ 900.000 mô hình, 200.000 tập dữ liệu và 300.000 ứng dụng minh hoạ (Spaces), tất cả đều là nguồn mở và được cung cấp công khai.

  • Đăng ký HuggingFace – Tạo tài khoản có tên người dùng, bạn không thể sử dụng tính năng Đăng nhập một lần của Google
  • Xác nhận địa chỉ email của bạn
  • Truy cập tại đây và chấp nhận giấy phép cho mô hình Gemma-2-9b-it
  • Tạo mã thông báo HuggingFace tại đây
  • Ghi lại thông tin xác thực mã thông báo, bạn sẽ cần thông tin này sau

6. Bước 3 – Tạo các tài nguyên cơ sở hạ tầng cần thiết trên Google Cloud

Bạn sẽ thiết lập GKE, GCE, Cấu phần phần mềm đăng ký và áp dụng các vai trò IAM bằng cách sử dụng kết hợp danh tính khối lượng công việc.

Quy trình công việc AI của bạn sử dụng hai nhóm nút, một nhóm để huấn luyện và một nhóm để suy luận. Vùng chứa nút đào tạo đang sử dụng một máy ảo GCE g2-standard-8 được trang bị một GPU Nvidia L4 Tensor Core. Nhóm nút suy luận đang sử dụng máy ảo g2-standard-24 được trang bị hai GPU Nvidia L4 Tensor Core. Khi chỉ định khu vực, hãy chọn một khu vực hỗ trợ GPU bắt buộc ( Đường liên kết).

Trong Cloud Shell, hãy chạy các lệnh sau:

# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")

SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1

# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto

# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount

# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com

# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
  --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-12 \
  --num-nodes=1

# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
  --accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-24 \
  --num-nodes=1

# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a

# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}

Tạo tệp kê khai YAML

mkdir manifests
cd manifests

mlops-sa.yaml

apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
  name: airflow-mlops-sa
  namespace: airflow
  labels:
    tier: airflow

pv-dags.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 5Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_DAGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pv-logs.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-logs
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 100Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_LOGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pvc-dags.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: airflow-dags
  storageClassName: standard

pvc-logs.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  volumeName: airflow-logs
  storageClassName: standard

namespace.yaml

kind: Namespace
apiVersion: v1
metadata:
  name: airflow
  labels:
    name: airflow

sa-role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-deployment-role
rules:
- apiGroups: ["apps"] 
  resources: ["deployments"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]

sa-rolebinding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-deployment-rolebinding
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: airflow-worker
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-deployment-role
  apiGroup: rbac.authorization.k8s.io

inference.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-deployment
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gemma-server
  template:
    metadata:
      labels:
        app: gemma-server
        ai.gke.io/model: gemma-2-9b-it
        ai.gke.io/inference-server: vllm
      annotations:
        gke-gcsfuse/volumes: "true"
    spec:
      serviceAccountName: airflow-mlops-sa
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: "NoSchedule"
      - key: "on-demand"
        value: "true"
        operator: "Equal"
        effect: "NoSchedule"
      containers:
      - name: inference-server
        image: vllm/vllm-openai:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: "2"
          limits:
            nvidia.com/gpu: "2"
        command: ["/bin/sh", "-c"]
        args:
        - |
          python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
        volumeMounts:
        - mountPath: /dev/shm
          name: dshm
        - name: gcs-fuse-csi-ephemeral
          mountPath: /modeldata
          readOnly: true
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      - name: gcs-fuse-csi-ephemeral
        csi:
          driver: gcsfuse.csi.storage.gke.io
          volumeAttributes:
            bucketName: BUCKET_DATA_NAME
            mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
            fileCacheCapacity: "20Gi"
            fileCacheForRangeRead: "true"
            metadataStatCacheCapacity: "-1"
            metadataTypeCacheCapacity: "-1"
            metadataCacheTTLSeconds: "-1"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4

inference-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: llm-service
  namespace: airflow
spec:
  selector:
    app: gemma-server
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

Tạo 3 bộ chứa Google Cloud Storage (GCS)

gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}

# Create the namespace in GKE
kubectl apply -f namespace.yaml

# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml

Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation

gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"

7. Bước 4 – Cài đặt Airflow trên GKE thông qua biểu đồ Helm

Bây giờ, chúng ta sẽ triển khai Airflow 2 bằng Helm. Apache Airflow là một nền tảng quản lý quy trình công việc nguồn mở cho quy trình kỹ thuật dữ liệu. Chúng ta sẽ tìm hiểu về bộ tính năng của Airflow 2 sau.

values.yaml cho biểu đồ Helm của Airflow

config:
  webserver:
    expose_config: true
webserver:
  service:
    type: LoadBalancer
  podAnnotations:
    gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "30"
logs:
  persistence:
    enabled: true
    existingClaim: "airflow-logs"
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
scheduler:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
triggerer:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
workers:
  podAnnotations:
    gke-gcsfuse/volumes: "true"

Triển khai Airflow 2

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml

8. Bước 5 – Khởi chạy Airflow bằng các kết nối và biến

Sau khi triển khai Airflow 2, chúng ta có thể bắt đầu định cấu hình. Chúng ta xác định một số biến mà tập lệnh Python sẽ đọc.

  1. Truy cập vào giao diện người dùng Airflow trên cổng 8080 bằng trình duyệt

Lấy địa chỉ IP bên ngoài

kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'

Mở trình duyệt web rồi truy cập vào http://<EXTERNAL-IP>:8080 . Tên đăng nhập là admin / admin

  1. Tạo một kết nối GCP mặc định trong giao diện người dùng Airflow, vì vậy, hãy chuyển đến phần Quản trị → Kết nối → + Thêm bản ghi mới
  • Mã kết nối: google_cloud_default
  • Loại kết nối: Google Cloud

Nhấp vào Lưu.

  1. Tạo các biến cần thiết, vì vậy, hãy chuyển đến phần Quản trị → Biến → + Thêm bản ghi mới
  • Khoá: BUCKET_DATA_NAME – Giá trị: Sao chép từ echo $BUCKET_DATA_NAME
  • Khoá: GCP_PROJECT_ID – Giá trị: Sao chép từ echo $DEVSHELL_PROJECT_ID
  • Khoá: HF_TOKEN – Giá trị: Chèn mã thông báo HF
  • Khoá: KAGGLE_USERNAME – Giá trị: Nhập tên người dùng Kaggle của bạn
  • Khoá: KAGGLE_KEY – Giá trị: Sao chép khoá này từ kaggle.json

Nhấp vào Lưu sau mỗi cặp khoá-giá trị.

Giao diện người dùng của bạn sẽ có dạng như sau:

771121470131b5ec.png

9. Vùng chứa mã xử lý ứng dụng #1 – Tải dữ liệu xuống

Trong tập lệnh Python này, chúng ta xác thực bằng Kaggle để tải tập dữ liệu xuống bộ chứa GCS.

Bản thân tập lệnh này được đóng gói vì đây sẽ là Đơn vị DAG #1 và chúng tôi dự kiến tập dữ liệu sẽ được cập nhật thường xuyên, vì vậy, chúng tôi muốn tự động hoá việc này.

Tạo thư mục và sao chép tập lệnh của chúng ta vào đây

cd .. ; mkdir 1-dataset-download
cd 1-dataset-download

dataset-download.py

import os
import kagglehub
from google.cloud import storage

KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")

print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"

upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)

tệp Docker

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]

requirements.txt

google-cloud-storage==2.19.0
kagglehub==0.3.4

Bây giờ, chúng ta sẽ tạo một hình ảnh vùng chứa để tải tập dữ liệu xuống và đẩy hình ảnh đó vào Cấu phần phần mềm lưu trữ

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest

10. Vùng chứa mã ứng dụng #2 – Chuẩn bị dữ liệu

Trong bước chuẩn bị dữ liệu, chúng ta đã đạt được những điều sau:

  1. Chỉ định lượng dữ liệu mà chúng ta muốn sử dụng để tinh chỉnh mô hình cơ sở
  2. Tải tập dữ liệu, tức là đọc tệp CSV vào một khung dữ liệu Pandas, đây là cấu trúc dữ liệu 2 chiều cho các hàng và cột
  3. Biến đổi / xử lý trước dữ liệu – Xác định những phần của tập dữ liệu không liên quan bằng cách chỉ định những phần mà chúng ta muốn giữ lại, từ đó xoá phần còn lại
  4. Áp dụng hàm transform cho từng hàng của DataFrame
  5. Lưu dữ liệu đã chuẩn bị trở lại bộ chứa GCS

Tạo thư mục và sao chép tập lệnh của chúng ta vào đây

cd .. ; mkdir 2-data-preparation
cd 2-data-preparation

data-preparation.py

import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset

# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100"))  # Process a limited number of rows, used 100 during testing phase but can be increased

DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"

# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")

def transform(data):
    """
    Transforms a row of the DataFrame into the desired format for fine-tuning.

    Args:
      data: A pandas Series representing a row of the DataFrame.

    Returns:
      A dictionary containing the formatted text.
    """ 
    question = f"Review analysis for movie '{data['id']}'"
    context = data['reviewText']
    answer = data['scoreSentiment']
    template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
    return {'text': template.format(question=question, context=context, answer=answer)}

try:
    df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
    print(f"Dataset loaded successfully.")

    # Drop rows with NaN values in relevant columns
    df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])

    # Apply transformation to the DataFrame
    transformed_data = df.apply(transform, axis=1).tolist()

    # Convert transformed data to a DataFrame and then to a Hugging Face Dataset
    transformed_df = pd.DataFrame(transformed_data)
    dataset = Dataset.from_pandas(transformed_df)

    # Save the prepared dataset to JSON lines format
    with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
        for item in dataset:
            f.write(json.dumps(item) + "\n")

    print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
    
except Exception as e:
    print(f"Error during data loading or preprocessing: {e}")
    import traceback
    print(traceback.format_exc())

tệp Docker

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]

requirements.txt

datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3

# Now we create a container images for data-preparation and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest

11. Vùng chứa mã xử lý ứng dụng #3 – Điều chỉnh

Ở đây, chúng ta sử dụng Gemma-2-9b-it làm mô hình cơ sở, sau đó tinh chỉnh mô hình đó bằng tập dữ liệu mới.

Đây là trình tự các bước diễn ra trong bước tinh chỉnh.

1. Thiết lập: Nhập thư viện, xác định các tham số (cho mô hình, dữ liệu và hoạt động huấn luyện) rồi tải tập dữ liệu từ Google Cloud Storage.

2. Tải mô hình: Tải mô hình ngôn ngữ được huấn luyện trước bằng tính năng lượng tử hoá để tăng hiệu quả và tải trình tạo mã thông báo tương ứng.

3. Định cấu hình LoRA: Thiết lập tính năng Điều chỉnh theo thứ hạng thấp (LoRA) để tinh chỉnh mô hình một cách hiệu quả bằng cách thêm các ma trận nhỏ có thể huấn luyện.

4. Huấn luyện: Xác định các tham số huấn luyện và sử dụng SFTTrainer để tinh chỉnh mô hình trên tập dữ liệu đã tải bằng cách sử dụng loại lượng tử hoá FP16.

5. Lưu và tải lên: Lưu mô hình và trình phân tích cú pháp được tinh chỉnh trên máy, sau đó tải các mô hình và trình phân tích cú pháp đó lên bộ chứa GCS.

Sau đó, chúng ta tạo một hình ảnh vùng chứa bằng Cloud Build và lưu trữ hình ảnh đó trong Cấu phần phần mềm.

Tạo thư mục và sao chép tập lệnh của chúng ta vào đây

cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning

finetuning.py

import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage

# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")

# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
    "json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)

################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))

################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50

################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False

# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
    attn_implementation="eager",
    pretrained_model_name_or_path=model_name,
    torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")

# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
    lora_alpha=lora_alpha,
    lora_dropout=lora_dropout,
    r=lora_r,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Set training parameters
training_arguments = SFTConfig(
        bf16=bf16,
        dataset_kwargs={
            "add_special_tokens": False,  
            "append_concat_token": False, 
        },
        dataset_text_field="text",
        disable_tqdm=True,
        fp16=fp16,
        gradient_accumulation_steps=gradient_accumulation_steps,
        gradient_checkpointing=gradient_checkpointing,
        gradient_checkpointing_kwargs={"use_reentrant": False},
        group_by_length=group_by_length,
        log_on_each_node=False,
        logging_steps=logging_steps,
        learning_rate=learning_rate,
        lr_scheduler_type=lr_scheduler_type,
        max_grad_norm=max_grad_norm,
        max_seq_length=max_seq_length,
        max_steps=max_steps,
        num_train_epochs=num_train_epochs,
        optim=optim,
        output_dir=save_model_path,
        packing=packing,
        per_device_train_batch_size=per_device_train_batch_size,
        save_strategy=save_strategy,
        save_steps=save_steps,
        save_total_limit=save_total_limit,
        warmup_ratio=warmup_ratio,
        weight_decay=weight_decay,
    )

print(f"Configuring fine tuning completed")

# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    peft_config=peft_config,
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    tokenizer=tokenizer,
    args=training_arguments,
    packing=packing,
)

print(f"Creating trainer completed")

# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")

# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
    low_cpu_mem_usage=True,
    pretrained_model_name_or_path=model_name,
    return_dict=True,
    torch_dtype=torch.float16,
)

model = PeftModel.from_pretrained(
    model=base_model,
    model_id=new_model,
)
model = model.merge_and_unload()

print(f"Merging the new model with base model completed")

accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")

print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
    is_main_process=accelerator.is_main_process,
    save_directory=save_model_path,
    save_function=accelerator.save,
)
print(f"Save unwrapped model completed")

print(f"Save new tokenizer started")
if accelerator.is_main_process:
    tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")

# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
    """Uploads a directory to GCS."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    for root, _, files in os.walk(model_dir):
        for file in files:
            local_file_path = os.path.join(root, file)
            gcs_file_path = os.path.relpath(local_file_path, model_dir)
            blob = bucket.blob(os.path.join(new_model, gcs_file_path))  # Use new_model_name
            blob.upload_from_filename(local_file_path)

# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")

tệp Docker

# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04

# Install necessary system packages
RUN apt-get update && \
    apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements.txt into the container
COPY requirements.txt .

# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy your finetune script into the container
COPY finetuning.py .

# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
 
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]

requirements.txt

accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4

Bây giờ, chúng ta sẽ tạo hình ảnh vùng chứa để tinh chỉnh và đẩy hình ảnh đó vào Cấu phần phần mềm lưu trữ

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest

12. Tổng quan về Airflow 2, bao gồm cả DAG là gì

Airflow là một nền tảng để điều phối quy trình công việc và quy trình dữ liệu. Công cụ này sử dụng DAG (Đồ thị không chu trình có hướng) để xác định các quy trình công việc này trong mã Python, biểu thị trực quan các tác vụ và phần phụ thuộc của chúng.

Airflow, với các DAG tĩnh và định nghĩa dựa trên Python, rất phù hợp để lên lịch và quản lý các quy trình công việc được xác định trước. Cấu trúc của công cụ này bao gồm một giao diện người dùng thân thiện để theo dõi và quản lý các quy trình công việc này.

Về cơ bản, Airflow cho phép bạn xác định, lên lịch và giám sát quy trình dữ liệu bằng Python, giúp công cụ này trở thành một công cụ linh hoạt và mạnh mẽ để điều phối quy trình công việc.

13. Tổng quan về DAG

ec49964ad7d61491.png

DAG là viết tắt của Directed Acyclic Graph (Đồ thị không chu trình có hướng). Trong Airflow, DAG chính là toàn bộ quy trình công việc hoặc quy trình. Tệp này xác định các tác vụ, phần phụ thuộc của các tác vụ đó và thứ tự thực thi.

Các đơn vị quy trình công việc trong DAG được thực thi từ một vùng chứa trên cụm GKE, được khởi tạo từ cấu hình Airflow.

Tóm tắt:

Airflow: Tải dữ liệu xuống – Tập lệnh này tự động hoá quy trình lấy tập dữ liệu bài đánh giá phim từ Kaggle và lưu trữ tập dữ liệu đó trong bộ chứa GCS, giúp bạn có thể dễ dàng xử lý hoặc phân tích thêm trong môi trường đám mây.

Airflow: Chuẩn bị dữ liệu – Mã này lấy tập dữ liệu bài đánh giá phim thô, xoá các cột dữ liệu không cần thiết cho trường hợp sử dụng của chúng ta và xoá các tập dữ liệu bị thiếu giá trị. Tiếp theo, công cụ này sẽ sắp xếp tập dữ liệu thành định dạng trả lời câu hỏi phù hợp với việc học máy và lưu trữ lại tập dữ liệu đó trong GCS để sử dụng sau này.

Airflow: Model Finetuning (Airflow: Điều chỉnh mô hình) – Mã này điều chỉnh mô hình ngôn ngữ lớn (LLM) bằng một kỹ thuật có tên là LoRA (Thích ứng thứ hạng thấp) rồi lưu mô hình đã cập nhật. Mô-đun này bắt đầu bằng cách tải một LLM được huấn luyện trước và một tập dữ liệu từ Google Cloud Storage. Sau đó, mô hình này sẽ áp dụng LoRA để tinh chỉnh hiệu quả mô hình trên tập dữ liệu này. Cuối cùng, mô hình được tinh chỉnh sẽ được lưu trở lại Google Cloud Storage để sử dụng sau này trong các ứng dụng như tạo văn bản hoặc trả lời câu hỏi.

Airflow: Phân phát mô hình – Phân phát mô hình được tinh chỉnh trên GKE bằng vllm để suy luận.

Airflow: Vòng phản hồi – Huấn luyện lại mô hình mỗi xx lần (theo giờ, hằng ngày, hằng tuần).

Sơ đồ này giải thích cách hoạt động của Airflow 2 khi chạy trên GKE.

8691f41166209a5d.png

14. Điều chỉnh mô hình so với sử dụng RAG

Lớp học lập trình này tinh chỉnh LLM thay vì sử dụng tính năng Tạo tăng cường truy xuất (RAG).

Hãy so sánh hai phương pháp này:

Điều chỉnh chi tiết: Tạo một mô hình chuyên biệt: Điều chỉnh chi tiết giúp LLM thích ứng với một tác vụ hoặc tập dữ liệu cụ thể, cho phép mô hình này hoạt động độc lập mà không cần dựa vào các nguồn dữ liệu bên ngoài.

Đơn giản hoá quá trình suy luận: Nhờ đó, bạn không cần có hệ thống truy xuất và cơ sở dữ liệu riêng biệt, giúp phản hồi nhanh hơn và tiết kiệm hơn, đặc biệt là đối với các trường hợp sử dụng thường xuyên.

RAG: Dựa vào kiến thức bên ngoài: RAG truy xuất thông tin liên quan từ cơ sở kiến thức cho mỗi yêu cầu, đảm bảo quyền truy cập vào dữ liệu cụ thể và mới nhất.

Tăng độ phức tạp: Việc triển khai RAG trong môi trường sản xuất như cụm Kubernetes thường liên quan đến nhiều dịch vụ vi mô để xử lý và truy xuất dữ liệu, có thể làm tăng độ trễ và chi phí tính toán.

Lý do chọn tinh chỉnh:

Mặc dù RAG phù hợp với tập dữ liệu nhỏ được sử dụng trong Lớp học lập trình này, nhưng chúng tôi đã chọn tinh chỉnh để minh hoạ một trường hợp sử dụng thông thường cho Airflow. Lựa chọn này cho phép chúng ta tập trung vào các khía cạnh điều phối quy trình làm việc thay vì đi sâu vào các sắc thái của việc thiết lập cơ sở hạ tầng và dịch vụ vi mô bổ sung cho RAG.

Kết luận:

Cả việc tinh chỉnh và RAG đều là những kỹ thuật có giá trị với những điểm mạnh và điểm yếu riêng. Lựa chọn tối ưu phụ thuộc vào các yêu cầu cụ thể của dự án, chẳng hạn như kích thước và độ phức tạp của dữ liệu, nhu cầu về hiệu suất và các yếu tố cần cân nhắc về chi phí.

15. Tác vụ DAG #1 – Tạo bước đầu tiên trên Airflow: Tải dữ liệu xuống

Dưới đây là thông tin tổng quan về đơn vị DAG này. Mã Python của chúng tôi được lưu trữ trong hình ảnh vùng chứa sẽ tải tập dữ liệu RottenTomatoes mới nhất xuống từ Kaggle.

Đừng sao chép mã này vào bộ chứa GCS. Bước cuối cùng là sao chép mlops-dag.py. Tệp này chứa tất cả các bước của Đơn vị DAG trong một tập lệnh Python.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        dataset_download

16. Tác vụ DAG #2 – Tạo bước thứ hai trên Airflow: Chuẩn bị dữ liệu

Để tổng quan về đơn vị DAG này, chúng ta sẽ tải tệp CSV (rotten_tomatoes_movie_reviews.csv) từ GCS vào Pandas DataFrame.

Tiếp theo, chúng ta giới hạn số lượng hàng được xử lý bằng DATASET_LIMIT để kiểm thử và sử dụng hiệu quả tài nguyên, cuối cùng chuyển đổi dữ liệu đã chuyển đổi thành Tập dữ liệu Hugging Face.

Nếu xem xét kỹ, bạn sẽ thấy chúng ta đang huấn luyện 1000 hàng trong mô hình với "DATASET_LIMIT": "1000", điều này là do GPU Nvidia L4 mất 20 phút để thực hiện việc này.

Đừng sao chép mã này vào bộ chứa GCS. Chúng ta sao chép mlops-dag.py ở bước cuối cùng. Tập lệnh này chứa tất cả các bước trong một tập lệnh Python.

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation

17. Tác vụ DAG #3 – Tạo bước thứ ba trên Airflow: Điều chỉnh mô hình

Để tổng quan về đơn vị DAG này, chúng ta sẽ thực thi finetune.py để tinh chỉnh mô hình Gemma bằng tập dữ liệu mới.

Đừng sao chép mã này vào bộ chứa GCS. Chúng ta sao chép mlops-dag.py ở bước cuối cùng. Tập lệnh này chứa tất cả các bước trong một tập lệnh Python.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Task 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Task 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Task 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation >> fine_tuning

18. Tác vụ DAG #4 – Tạo bước cuối cùng trên Airflow: Dự đoán / Phân phát mô hình

vLLM là một thư viện nguồn mở mạnh mẽ, được thiết kế riêng cho hoạt động suy luận hiệu suất cao của LLM. Khi được triển khai trên Google Kubernetes Engine (GKE), công cụ này sẽ tận dụng khả năng mở rộng và hiệu quả của Kubernetes để phân phát LLM một cách hiệu quả.

Tóm tắt các bước:

  • Tải DAG "mlops-dag.py" lên bộ chứa GCS.
  • Sao chép hai tệp cấu hình YAML của Kubernetes để thiết lập quy trình suy luận vào một bộ chứa GCS.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

def model_serving():
    config.load_incluster_config()
    k8s_apps_v1 = client.AppsV1Api()
    k8s_core_v1 = client.CoreV1Api()

    while True:
        try:
            k8s_apps_v1.delete_namespaced_deployment(
                    namespace="airflow",
                    name="inference-deployment",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Deployment inference-deployment deleted")
    
    with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace="airflow")
        print(f"Deployment created. Status='{resp.metadata.name}'")
    
    while True:
        try:
            k8s_core_v1.delete_namespaced_service(
                    namespace="airflow",
                    name="llm-service",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Service llm-service deleted")

    with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_core_v1.create_namespaced_service(
            body=dep, namespace="airflow")
        print(f"Service created. Status='{resp.metadata.name}'")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 4: Run GKE Deployment for model serving
        model_serving = PythonOperator(
            task_id="model_serving",
            python_callable=model_serving
        )

        dataset_download >> data_preparation >> fine_tuning >> model_serving

Tải tập lệnh Python (tệp DAG) cũng như tệp kê khai Kubernetes lên bộ chứa GCS DAGS.

gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}

Trong giao diện người dùng Airflow, bạn sẽ thấy mlops-dag.

  1. Chọn huỷ tạm dừng.
  2. Chọn Trigger DAG (Kích hoạt DAG) để thực hiện chu kỳ MLOps theo cách thủ công.

d537281b92d5e8bb.png

Sau khi DAG hoàn tất, bạn sẽ thấy kết quả như sau trong giao diện người dùng Airflow.

3ed42abf8987384e.png

Sau bước cuối cùng, bạn có thể lấy điểm cuối của mô hình và gửi lời nhắc để kiểm thử mô hình.

Hãy đợi khoảng 5 phút trước khi đưa ra lệnh curl để quá trình suy luận của mô hình có thể bắt đầu và bộ cân bằng tải có thể chỉ định Địa chỉ IP bên ngoài.

export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')

curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128
}
EOF

Kết quả:

19. Xin chúc mừng!

Bạn đã tạo quy trình làm việc AI đầu tiên bằng quy trình DAG với Airflow 2 trên GKE.

Đừng quên huỷ cấp phép các tài nguyên mà bạn đã triển khai.

20. Thực hiện việc này trong phiên bản chính thức

Mặc dù Lớp học lập trình đã cung cấp cho bạn thông tin chi tiết tuyệt vời về cách thiết lập Airflow 2 trên GKE, nhưng trong thực tế, bạn nên cân nhắc một số chủ đề sau đây khi thực hiện việc này trong môi trường sản xuất.

Triển khai giao diện người dùng web bằng Gradio hoặc công cụ tương tự.

Hãy định cấu hình tính năng giám sát ứng dụng tự động cho các khối lượng công việc bằng GKE tại đây hoặc xuất các chỉ số từ Airflow tại đây.

Bạn có thể cần GPU lớn hơn để tinh chỉnh mô hình nhanh hơn, đặc biệt là nếu có tập dữ liệu lớn hơn. Tuy nhiên, nếu muốn huấn luyện mô hình trên nhiều GPU, chúng ta phải chia tập dữ liệu và phân đoạn quá trình huấn luyện. Dưới đây là nội dung giải thích về FSDP với PyTorch (song song dữ liệu được phân đoạn đầy đủ, sử dụng chế độ chia sẻ GPU để đạt được mục tiêu đó. Bạn có thể đọc thêm tại đây trong một bài đăng trên blog của Meta và một bài đăng khác trong hướng dẫn này về FSDP bằng Pytorch.

Google Cloud Composer là một dịch vụ Airflow được quản lý, vì vậy, bạn không cần phải tự duy trì Airflow, chỉ cần triển khai DAG là xong.

Tìm hiểu thêm

Giấy phép

Tác phẩm này được cấp phép theo Giấy phép chung Ghi công theo Creative Commons 2.0.