การสร้างเวิร์กโฟลว์ MLOps ด้วย Airflow 2 ใน GKE

1. ภาพรวม

852dc8844309ffb8.png

CodeLab นี้สาธิตวิธีผสานรวมแนวทางปฏิบัติของ DevOps เข้ากับแมชชีนเลิร์นนิง (MLOps) ด้วยการดาวน์โหลดชุดข้อมูล ปรับแต่งโมเดล และทำให้ LLM ใช้งานได้ใน Google Kubernetes Engine (GKE) โดยใช้ DAG ของ Airflow ที่มี Abstraction น้อยที่สุด เราจึงใช้คําสั่ง gcloud ไม่ใช่ terraform เพื่อให้คุณทําตามขั้นตอนในแล็บทีละขั้นตอนและเข้าใจกระบวนการแต่ละอย่างได้ง่ายจากมุมมองทั้งวิศวกรแพลตฟอร์มและวิศวกรแมชชีนเลิร์นนิง

คู่มือแบบปฏิบัติจริงนี้จะแนะนำวิธีใช้ประโยชน์จาก Airflow เพื่อปรับปรุงเวิร์กโฟลว์ AI โดยแสดงภาพวงจร MLOps ทั้งหมดอย่างชัดเจนและใช้งานได้จริงผ่านการกําหนดค่า DAG

สิ่งที่คุณจะได้เรียนรู้

  • ส่งเสริมการทำงานร่วมกันและความเข้าใจที่มากขึ้นระหว่างวิศวกรแพลตฟอร์มและวิศวกรแมชชีนเลิร์นนิงด้วยการลดการทำงานแบบแยกส่วนและปรับปรุงเวิร์กโฟลว์
  • ทําความเข้าใจวิธีทำให้ใช้งานได้ ใช้ และจัดการ Airflow 2 ใน GKE
  • กำหนดค่า DAG ของ Airflow ตั้งแต่ต้นจนจบ
  • สร้างพื้นฐานสําหรับระบบแมชชีนเลิร์นนิงระดับเวอร์ชันที่ใช้งานจริงด้วย GKE
  • เครื่องมือและใช้งานระบบแมชชีนเลิร์นนิง
  • ทําความเข้าใจว่าวิศวกรแพลตฟอร์มกลายเป็นเสาหลักสําคัญของการสนับสนุน MLOps ได้อย่างไร

สิ่งที่ CodeLab นี้ทำได้

  • คุณสามารถถามคำถามเกี่ยวกับภาพยนตร์จาก LLM ที่เราปรับแต่งให้ดีขึ้นโดยอิงตาม Gemma-2-9b-it ซึ่งให้บริการใน GKE ด้วย vLLM

กลุ่มเป้าหมาย

  • วิศวกรแมชชีนเลิร์นนิง
  • วิศวกรแพลตฟอร์ม
  • นักวิทยาศาสตร์ข้อมูล
  • วิศวกรข้อมูล
  • วิศวกร DevOps
  • สถาปนิกแพลตฟอร์ม
  • วิศวกรลูกค้า

CodeLab นี้ไม่ได้มีไว้เพื่อ

  • ข้อมูลเบื้องต้นเกี่ยวกับเวิร์กโฟลว์ GKE หรือ AI/ML
  • เป็นการทบทวนชุดฟีเจอร์ทั้งหมดของ Airflow

2. วิศวกรแพลตฟอร์มช่วยวิศวกร/นักวิทยาศาสตร์แมชชีนเลิร์นนิง

16635a8284b994c.png

แพลตฟอร์มวิศวกรรมและ MLOps เป็นสาขาวิชาที่เกื้อหนุนซึ่งกันและกัน ซึ่งทำงานร่วมกันเพื่อสร้างสภาพแวดล้อมที่มีประสิทธิภาพและมีประสิทธิภาพสําหรับการพัฒนาและการใช้งาน ML

ขอบเขต: วิศวกรแพลตฟอร์มมีขอบเขตที่กว้างกว่า MLOps โดยครอบคลุมวงจรการพัฒนาซอฟต์แวร์ทั้งหมด รวมถึงมีเครื่องมือและโครงสร้างพื้นฐานสําหรับการพัฒนาซอฟต์แวร์

MLOps จะช่วยลดช่องว่างระหว่างการพัฒนา การนำไปใช้ และการอนุมานของ ML

ความเชี่ยวชาญ: วิศวกรแพลตฟอร์มมักจะมีความเชี่ยวชาญด้านเทคโนโลยีโครงสร้างพื้นฐาน เช่น คลาวด์คอมพิวติ้ง คอนเทนเนอร์ และการจัดการข้อมูล

วิศวกร MLOps มีความเชี่ยวชาญด้านการพัฒนา การนำไปใช้ และการตรวจสอบโมเดล ML และมักมีทักษะด้านวิทยาศาสตร์ข้อมูลและวิศวกรรมซอฟต์แวร์

เครื่องมือ: วิศวกรแพลตฟอร์มจะสร้างเครื่องมือสำหรับการจัดสรรโครงสร้างพื้นฐาน การจัดการการกําหนดค่า การจัดการคอนเทนเนอร์ และการสร้างโครงสร้างแอปพลิเคชัน วิศวกร MLOps จะใช้เครื่องมือสำหรับการฝึกโมเดล ML, การทดลอง, การติดตั้งใช้งาน, การตรวจสอบ และการจัดการเวอร์ชัน

3. การตั้งค่าและข้อกําหนดของ Google Cloud

การตั้งค่าสภาพแวดล้อมด้วยตนเอง

  1. ลงชื่อเข้าใช้ Google Cloud Console และสร้างโปรเจ็กต์ใหม่หรือใช้โปรเจ็กต์ที่มีอยู่ซ้ำ หากยังไม่มีบัญชี Gmail หรือ Google Workspace คุณต้องสร้างบัญชี

fbef9caa1602edd0.png

a99b7ace416376c4.png

5e3ff691252acf41.png

  • ชื่อโปรเจ็กต์คือชื่อที่แสดงสำหรับผู้เข้าร่วมโปรเจ็กต์นี้ ซึ่งเป็นสตริงอักขระที่ Google APIs ไม่ได้ใช้ คุณจะอัปเดตได้ทุกเมื่อ
  • รหัสโปรเจ็กต์จะซ้ำกันไม่ได้ในโปรเจ็กต์ Google Cloud ทั้งหมดและจะเปลี่ยนแปลงไม่ได้ (เปลี่ยนแปลงไม่ได้หลังจากตั้งค่าแล้ว) คอนโซล Cloud จะสร้างสตริงที่ไม่ซ้ำกันโดยอัตโนมัติ ซึ่งปกติแล้วคุณไม่จำเป็นต้องสนใจว่าสตริงนั้นจะเป็นอะไร ในโค้ดแล็บส่วนใหญ่ คุณจะต้องอ้างอิงรหัสโปรเจ็กต์ (ปกติจะระบุเป็น PROJECT_ID) หากไม่ชอบรหัสที่สร้างขึ้น คุณอาจสร้างรหัสอื่นแบบสุ่มได้ หรือจะลองใช้รหัสของคุณเองเพื่อดูว่ารหัสนั้นใช้งานได้หรือไม่ก็ได้ คุณจะเปลี่ยนแปลงหลังจากขั้นตอนนี้ไม่ได้ และชื่อนี้จะคงอยู่ตลอดระยะเวลาของโปรเจ็กต์
  • โปรดทราบว่ามีค่าที่ 3 ซึ่งเป็นหมายเลขโปรเจ็กต์ที่ API บางรายการใช้ ดูข้อมูลเพิ่มเติมเกี่ยวกับค่าทั้ง 3 รายการนี้ได้ในเอกสารประกอบ
  1. ถัดไป คุณจะต้องเปิดใช้การเรียกเก็บเงินใน Cloud Console เพื่อใช้ทรัพยากร/API ของ Cloud การทำตามโค้ดแล็บนี้จะไม่เสียค่าใช้จ่ายมากนัก หากต้องการปิดทรัพยากรเพื่อหลีกเลี่ยงการเรียกเก็บเงินหลังจากบทแนะนำนี้ คุณก็ลบทรัพยากรที่สร้างไว้หรือลบโปรเจ็กต์ได้ ผู้ใช้ Google Cloud รายใหม่มีสิทธิ์เข้าร่วมโปรแกรมช่วงทดลองใช้ฟรีมูลค่า$300 USD

เริ่ม Cloud Shell

แม้ว่า Google Cloud จะทำงานจากระยะไกลจากแล็ปท็อปได้ แต่ในโค้ดแล็บนี้ คุณจะใช้ Cloud Shell ซึ่งเป็นสภาพแวดล้อมบรรทัดคำสั่งที่ทำงานในระบบคลาวด์

เปิดใช้งาน Cloud Shell

  1. จาก Cloud Console ให้คลิกเปิดใช้งาน Cloud Shell 853e55310c205094.png

3c1dabeca90e44e5.png

หากนี่เป็นครั้งแรกที่คุณเริ่มใช้ Cloud Shell คุณจะเห็นหน้าจอกลางที่อธิบายเกี่ยวกับ Cloud Shell หากเห็นหน้าจอกลาง ให้คลิกต่อไป

9c92662c6a846a5c.png

การจัดสรรและเชื่อมต่อกับ Cloud Shell ใช้เวลาเพียงไม่กี่นาที

9f0e51b578fecce5.png

เครื่องเสมือนนี้โหลดเครื่องมือการพัฒนาที่จำเป็นทั้งหมดไว้แล้ว ซึ่งจะมีไดเรกทอรีหลักขนาด 5 GB ถาวรและทำงานใน Google Cloud ซึ่งจะช่วยเพิ่มประสิทธิภาพเครือข่ายและการรับรองได้อย่างมีประสิทธิภาพ คุณทํางานส่วนใหญ่ในโค้ดแล็บนี้ได้โดยใช้เบราว์เซอร์

เมื่อเชื่อมต่อกับ Cloud Shell แล้ว คุณควรเห็นการรับรองและโปรเจ็กต์ที่ตั้งค่าเป็นรหัสโปรเจ็กต์ของคุณ

  1. เรียกใช้คําสั่งต่อไปนี้ใน Cloud Shell เพื่อยืนยันว่าคุณได้รับการตรวจสอบสิทธิ์
gcloud auth list

เอาต์พุตจากคำสั่ง

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. เรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell เพื่อยืนยันว่าคำสั่ง gcloud รู้จักโปรเจ็กต์ของคุณ
gcloud config list project

เอาต์พุตจากคำสั่ง

[core]
project = <PROJECT_ID>

หากไม่เป็นเช่นนั้น ให้ตั้งค่าด้วยคําสั่งนี้

gcloud config set project <PROJECT_ID>

เอาต์พุตจากคำสั่ง

Updated property [core/project].

4. ขั้นตอนที่ 1 - ลงชื่อสมัครใช้และตรวจสอบสิทธิ์ใน Kaggle

หากต้องการเริ่มใช้งาน CodeLab คุณต้องสร้างบัญชีใน Kaggle ซึ่งเป็นแพลตฟอร์มชุมชนออนไลน์สำหรับนักวิทยาศาสตร์ข้อมูลและผู้สนใจแมชชีนเลิร์นนิงที่ Google เป็นเจ้าของ และโฮสต์ที่เก็บชุดข้อมูลที่เผยแพร่ต่อสาธารณะจำนวนมากสำหรับโดเมนต่างๆ คุณจะต้องดาวน์โหลดชุดข้อมูล RottenTomatoes จากเว็บไซต์นี้เพื่อใช้ฝึกโมเดล

  • ลงชื่อสมัครใช้ Kaggle คุณสามารถใช้ SSO ของ Google เพื่อลงชื่อเข้าใช้ได้
  • ยอมรับข้อกำหนดในการให้บริการ
  • ไปที่การตั้งค่าและดูชื่อผู้ใช้ username
  • ในส่วน API ให้เลือก "สร้างโทเค็นใหม่จาก" Kaggle ซึ่งจะดาวน์โหลด kaggle.json
  • หากพบปัญหาใดๆ โปรดไปที่หน้าการสนับสนุนที่นี่

5. ขั้นตอนที่ 2 - ลงชื่อสมัครใช้และตรวจสอบสิทธิ์ใน HuggingFace

HuggingFace เป็นศูนย์กลางที่ทุกคนสามารถมีส่วนร่วมกับเทคโนโลยีแมชชีนเลิร์นนิง โดยโฮสต์โมเดล 900, 000 รายการ ชุดข้อมูล 200, 000 ชุด และแอปเดโม (พื้นที่ทำงาน) 300,000 แอป ซึ่งทั้งหมดเป็นแบบโอเพนซอร์สและเข้าถึงได้แบบสาธารณะ

  • ลงชื่อสมัครใช้ HuggingFace - สร้างบัญชีด้วยชื่อผู้ใช้ คุณใช้ SSO ของ Google ไม่ได้
  • ยืนยันที่อยู่อีเมลของคุณ
  • ไปที่ที่นี่และยอมรับใบอนุญาตสำหรับรุ่น Gemma-2-9b-it
  • สร้างโทเค็น HuggingFace ที่นี่
  • บันทึกข้อมูลเข้าสู่ระบบโทเค็นไว้ คุณต้องใช้ข้อมูลนี้ในภายหลัง

6. ขั้นตอนที่ 3 - สร้างทรัพยากรโครงสร้างพื้นฐาน Google Cloud ที่จําเป็น

คุณจะต้องตั้งค่า GKE, GCE, รีจิสทรีอาร์ติแฟกต์ และใช้บทบาท IAM โดยใช้การรวมข้อมูลประจำตัวของเวิร์กโหลด

เวิร์กโฟลว์ AI ใช้กลุ่มโหนด 2 กลุ่ม ได้แก่ 1 กลุ่มสําหรับการฝึกและอีก 1 กลุ่มสําหรับการอนุมาน กลุ่มโหนดการฝึกอบรมใช้ GCE VM ขนาด g2-standard-8 ที่มี GPU Nvidia L4 Tensor Core 1 ตัว กลุ่มโหนดที่ใช้ทําการอนุมานใช้ VM g2-standard-24 ที่มี GPU Nvidia L4 Tensor Core 2 ตัว ขณะระบุภูมิภาค ให้เลือกภูมิภาคที่รองรับ GPU ที่จำเป็น ( ลิงก์)

ใน Cloud Shell ให้เรียกใช้คําสั่งต่อไปนี้

# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")

SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1

# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto

# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount

# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com

# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
  --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-12 \
  --num-nodes=1

# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
  --accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-24 \
  --num-nodes=1

# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a

# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}

สร้างไฟล์ Manifest รูปแบบ YAML

mkdir manifests
cd manifests

mlops-sa.yaml

apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
  name: airflow-mlops-sa
  namespace: airflow
  labels:
    tier: airflow

pv-dags.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 5Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_DAGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pv-logs.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-logs
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 100Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_LOGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pvc-dags.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: airflow-dags
  storageClassName: standard

pvc-logs.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  volumeName: airflow-logs
  storageClassName: standard

namespace.yaml

kind: Namespace
apiVersion: v1
metadata:
  name: airflow
  labels:
    name: airflow

sa-role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-deployment-role
rules:
- apiGroups: ["apps"] 
  resources: ["deployments"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]

sa-rolebinding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-deployment-rolebinding
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: airflow-worker
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-deployment-role
  apiGroup: rbac.authorization.k8s.io

inference.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-deployment
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gemma-server
  template:
    metadata:
      labels:
        app: gemma-server
        ai.gke.io/model: gemma-2-9b-it
        ai.gke.io/inference-server: vllm
      annotations:
        gke-gcsfuse/volumes: "true"
    spec:
      serviceAccountName: airflow-mlops-sa
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: "NoSchedule"
      - key: "on-demand"
        value: "true"
        operator: "Equal"
        effect: "NoSchedule"
      containers:
      - name: inference-server
        image: vllm/vllm-openai:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: "2"
          limits:
            nvidia.com/gpu: "2"
        command: ["/bin/sh", "-c"]
        args:
        - |
          python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
        volumeMounts:
        - mountPath: /dev/shm
          name: dshm
        - name: gcs-fuse-csi-ephemeral
          mountPath: /modeldata
          readOnly: true
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      - name: gcs-fuse-csi-ephemeral
        csi:
          driver: gcsfuse.csi.storage.gke.io
          volumeAttributes:
            bucketName: BUCKET_DATA_NAME
            mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
            fileCacheCapacity: "20Gi"
            fileCacheForRangeRead: "true"
            metadataStatCacheCapacity: "-1"
            metadataTypeCacheCapacity: "-1"
            metadataCacheTTLSeconds: "-1"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4

inference-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: llm-service
  namespace: airflow
spec:
  selector:
    app: gemma-server
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

สร้างที่เก็บข้อมูล Google Cloud Storage (GCS) 3 รายการ

gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}

# Create the namespace in GKE
kubectl apply -f namespace.yaml

# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml

Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation

gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"

7. ขั้นตอนที่ 4 - ติดตั้ง Airflow ใน GKE ผ่านแผนภูมิ Helm

ตอนนี้เราทำให้ Airflow 2 ใช้งานได้โดยใช้ Helm Apache Airflow เป็นแพลตฟอร์มการจัดการเวิร์กโฟลว์แบบโอเพนซอร์สสำหรับไปป์ไลน์วิศวกรข้อมูล เราจะพูดถึงชุดฟีเจอร์ของ Airflow 2 ในภายหลัง

values.yaml สำหรับแผนภูมิ Helm ของ Airflow

config:
  webserver:
    expose_config: true
webserver:
  service:
    type: LoadBalancer
  podAnnotations:
    gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "30"
logs:
  persistence:
    enabled: true
    existingClaim: "airflow-logs"
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
scheduler:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
triggerer:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
workers:
  podAnnotations:
    gke-gcsfuse/volumes: "true"

ติดตั้งใช้งาน Airflow 2

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml

8. ขั้นตอนที่ 5 - เริ่มต้น Airflow ด้วยการเชื่อมต่อและตัวแปร

เมื่อติดตั้งใช้งาน Airflow 2 แล้ว เราจะเริ่มกําหนดค่าได้ เรากําหนดตัวแปรบางอย่างซึ่งสคริปต์ Python จะอ่าน

  1. เข้าถึง UI ของ Airflow ในพอร์ต 8080 ด้วยเบราว์เซอร์

รับ IP ภายนอก

kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'

เปิดเว็บเบราว์เซอร์แล้วไปที่ http://<EXTERNAL-IP>:8080 เข้าสู่ระบบด้วย admin / admin

  1. สร้างการเชื่อมต่อ GCP เริ่มต้นภายใน UI ของ Airflow โดยไปที่ผู้ดูแลระบบ → การเชื่อมต่อ → + เพิ่มระเบียนใหม่
  • รหัสการเชื่อมต่อ: google_cloud_default
  • ประเภทการเชื่อมต่อ: Google Cloud

คลิกบันทึก

  1. สร้างตัวแปรที่จําเป็น โดยไปที่ผู้ดูแลระบบ → ตัวแปร → + เพิ่มระเบียนใหม่
  • คีย์: BUCKET_DATA_NAME - ค่า: คัดลอกจาก echo $BUCKET_DATA_NAME
  • คีย์: GCP_PROJECT_ID - ค่า: คัดลอกจาก echo $DEVSHELL_PROJECT_ID
  • คีย์: HF_TOKEN - ค่า: แทรกโทเค็น HF
  • คีย์: KAGGLE_USERNAME - ค่า: ป้อนชื่อผู้ใช้ Kaggle
  • คีย์: KAGGLE_KEY - ค่า: คัดลอกจาก kaggle.json

คลิก "บันทึก" หลังจากคู่คีย์-ค่าแต่ละคู่

UI ควรมีลักษณะดังนี้

771121470131b5ec.png

9. คอนเทนเนอร์โค้ดแอปพลิเคชัน #1 - การดาวน์โหลดข้อมูล

ในสคริปต์ Python นี้ เราจะตรวจสอบสิทธิ์กับ Kaggle เพื่อดาวน์โหลดชุดข้อมูลไปยังที่เก็บข้อมูล GCS

สคริปต์เองเป็นแบบคอนเทนเนอร์เนื่องจากจะกลายเป็น DAG Unit #1 และเราคาดว่าชุดข้อมูลจะได้รับการอัปเดตบ่อยครั้ง เราจึงต้องการทำให้กระบวนการนี้เป็นแบบอัตโนมัติ

สร้างไดเรกทอรีและคัดลอกสคริปต์ของเราที่นี่

cd .. ; mkdir 1-dataset-download
cd 1-dataset-download

dataset-download.py

import os
import kagglehub
from google.cloud import storage

KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")

print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"

upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]

requirements.txt

google-cloud-storage==2.19.0
kagglehub==0.3.4

ตอนนี้เราจะสร้างอิมเมจคอนเทนเนอร์สําหรับการดาวน์โหลดชุดข้อมูลและพุชไปยัง Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest

10. คอนเทนเนอร์โค้ดแอปพลิเคชัน #2 - การเตรียมข้อมูล

สิ่งที่เราทำได้ในขั้นตอนการเตรียมข้อมูลมีดังนี้

  1. ระบุจำนวนชุดข้อมูลที่ต้องการใช้เพื่อปรับแต่งโมเดลพื้นฐาน
  2. โหลดชุดข้อมูล เช่น อ่านไฟล์ CSV ลงใน DataFrame ของ Pandas ซึ่งเป็นโครงสร้างข้อมูล 2 มิติสําหรับแถวและคอลัมน์
  3. การเปลี่ยนรูปแบบข้อมูล / การประมวลผลข้อมูลเบื้องต้น - ระบุส่วนใดของชุดข้อมูลที่ไม่เกี่ยวข้องโดยระบุสิ่งที่เราต้องการเก็บไว้ ซึ่งจะนําส่วนที่เหลือออก
  4. ใช้ฟังก์ชัน transform กับแต่ละแถวของ DataFrame
  5. บันทึกข้อมูลที่เตรียมไว้กลับไปยังที่เก็บข้อมูล GCS

สร้างไดเรกทอรีและคัดลอกสคริปต์ของเราที่นี่

cd .. ; mkdir 2-data-preparation
cd 2-data-preparation

data-preparation.py

import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset

# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100"))  # Process a limited number of rows, used 100 during testing phase but can be increased

DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"

# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")

def transform(data):
    """
    Transforms a row of the DataFrame into the desired format for fine-tuning.

    Args:
      data: A pandas Series representing a row of the DataFrame.

    Returns:
      A dictionary containing the formatted text.
    """ 
    question = f"Review analysis for movie '{data['id']}'"
    context = data['reviewText']
    answer = data['scoreSentiment']
    template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
    return {'text': template.format(question=question, context=context, answer=answer)}

try:
    df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
    print(f"Dataset loaded successfully.")

    # Drop rows with NaN values in relevant columns
    df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])

    # Apply transformation to the DataFrame
    transformed_data = df.apply(transform, axis=1).tolist()

    # Convert transformed data to a DataFrame and then to a Hugging Face Dataset
    transformed_df = pd.DataFrame(transformed_data)
    dataset = Dataset.from_pandas(transformed_df)

    # Save the prepared dataset to JSON lines format
    with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
        for item in dataset:
            f.write(json.dumps(item) + "\n")

    print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
    
except Exception as e:
    print(f"Error during data loading or preprocessing: {e}")
    import traceback
    print(traceback.format_exc())

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]

requirements.txt

datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3

# Now we create a container images for data-preparation and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest

11. คอนเทนเนอร์โค้ดแอปพลิเคชัน #3 - การปรับแต่ง

ในที่นี้เราใช้ Gemma-2-9b-it เป็นโมเดลพื้นฐาน จากนั้นปรับแต่งด้วยชุดข้อมูลใหม่

ลำดับขั้นตอนที่เกิดขึ้นระหว่างขั้นตอนการปรับแต่งมีดังนี้

1. การตั้งค่า: นําเข้าไลบรารี กําหนดพารามิเตอร์ (สําหรับโมเดล ข้อมูล และการฝึก) และโหลดชุดข้อมูลจาก Google Cloud Storage

2. โหลดโมเดล: โหลดโมเดลภาษาที่ผ่านการฝึกอบรมล่วงหน้าด้วยการแปลงค่าเพื่อประสิทธิภาพ และโหลดตัวแยกวิเคราะห์ที่เกี่ยวข้อง

3. กำหนดค่า LoRA: ตั้งค่าการปรับ Low-Rank (LoRA) เพื่อปรับแต่งโมเดลอย่างมีประสิทธิภาพด้วยการเพิ่มเมทริกซ์ขนาดเล็กที่ฝึกได้

4. ฝึก: กําหนดพารามิเตอร์การฝึกและใช้ SFTTrainer เพื่อปรับแต่งโมเดลในชุดข้อมูลที่โหลดโดยใช้ประเภทการหาค่าประมาณ FP16

5. บันทึกและอัปโหลด: บันทึกโมเดลและตัวแยกวิเคราะห์ที่ปรับแต่งแล้วในเครื่อง จากนั้นอัปโหลดไปยังที่เก็บข้อมูล GCS

จากนั้นเราจะสร้างอิมเมจคอนเทนเนอร์โดยใช้ Cloud Build และจัดเก็บไว้ใน Artifact Registry

สร้างไดเรกทอรีและคัดลอกสคริปต์ของเราที่นี่

cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning

finetuning.py

import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage

# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")

# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
    "json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)

################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))

################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50

################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False

# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
    attn_implementation="eager",
    pretrained_model_name_or_path=model_name,
    torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")

# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
    lora_alpha=lora_alpha,
    lora_dropout=lora_dropout,
    r=lora_r,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Set training parameters
training_arguments = SFTConfig(
        bf16=bf16,
        dataset_kwargs={
            "add_special_tokens": False,  
            "append_concat_token": False, 
        },
        dataset_text_field="text",
        disable_tqdm=True,
        fp16=fp16,
        gradient_accumulation_steps=gradient_accumulation_steps,
        gradient_checkpointing=gradient_checkpointing,
        gradient_checkpointing_kwargs={"use_reentrant": False},
        group_by_length=group_by_length,
        log_on_each_node=False,
        logging_steps=logging_steps,
        learning_rate=learning_rate,
        lr_scheduler_type=lr_scheduler_type,
        max_grad_norm=max_grad_norm,
        max_seq_length=max_seq_length,
        max_steps=max_steps,
        num_train_epochs=num_train_epochs,
        optim=optim,
        output_dir=save_model_path,
        packing=packing,
        per_device_train_batch_size=per_device_train_batch_size,
        save_strategy=save_strategy,
        save_steps=save_steps,
        save_total_limit=save_total_limit,
        warmup_ratio=warmup_ratio,
        weight_decay=weight_decay,
    )

print(f"Configuring fine tuning completed")

# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    peft_config=peft_config,
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    tokenizer=tokenizer,
    args=training_arguments,
    packing=packing,
)

print(f"Creating trainer completed")

# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")

# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
    low_cpu_mem_usage=True,
    pretrained_model_name_or_path=model_name,
    return_dict=True,
    torch_dtype=torch.float16,
)

model = PeftModel.from_pretrained(
    model=base_model,
    model_id=new_model,
)
model = model.merge_and_unload()

print(f"Merging the new model with base model completed")

accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")

print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
    is_main_process=accelerator.is_main_process,
    save_directory=save_model_path,
    save_function=accelerator.save,
)
print(f"Save unwrapped model completed")

print(f"Save new tokenizer started")
if accelerator.is_main_process:
    tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")

# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
    """Uploads a directory to GCS."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    for root, _, files in os.walk(model_dir):
        for file in files:
            local_file_path = os.path.join(root, file)
            gcs_file_path = os.path.relpath(local_file_path, model_dir)
            blob = bucket.blob(os.path.join(new_model, gcs_file_path))  # Use new_model_name
            blob.upload_from_filename(local_file_path)

# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")

Dockerfile

# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04

# Install necessary system packages
RUN apt-get update && \
    apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements.txt into the container
COPY requirements.txt .

# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy your finetune script into the container
COPY finetuning.py .

# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
 
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]

requirements.txt

accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4

ตอนนี้เราจะสร้างอิมเมจคอนเทนเนอร์เพื่อปรับแต่งและพุชไปยัง Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest

12. ภาพรวมของ Airflow 2 รวมถึง DAG คืออะไร

Airflow เป็นแพลตฟอร์มสำหรับควบคุมเวิร์กโฟลว์และไปป์ไลน์ข้อมูล โดยจะใช้ DAG (Directed Acyclic Graph) เพื่อกำหนดเวิร์กโฟลว์เหล่านี้ในโค้ด Python ซึ่งจะแสดงงานและความสัมพันธ์ของงานเป็นภาพ

Airflow ที่มี DAG แบบคงที่และคําจํากัดความที่อิงตาม Python เหมาะสําหรับกําหนดเวลาและจัดการเวิร์กโฟลว์ที่กําหนดไว้ล่วงหน้า สถาปัตยกรรมประกอบด้วย UI ที่ใช้งานง่ายสำหรับการตรวจสอบและจัดการเวิร์กโฟลว์เหล่านี้

โดยพื้นฐานแล้ว Airflow ช่วยให้คุณกําหนด กําหนดเวลา และตรวจสอบไปป์ไลน์ข้อมูลได้โดยใช้ Python ซึ่งทําให้ Airflow เป็นเครื่องมือที่ยืดหยุ่นและมีประสิทธิภาพในการประสานงานเวิร์กโฟลว์

13. ภาพรวมของ DAG

ec49964ad7d61491.png

DAG ย่อมาจาก Directed Acyclic Graph ใน Airflow DAG จะแสดงถึงเวิร์กโฟลว์หรือไปป์ไลน์ทั้งหมด โดยจะกําหนดงาน Dependency ของงาน และลําดับการดําเนินการ

หน่วยของเวิร์กโฟลว์ภายใน DAG จะทำงานจากพ็อดในคลัสเตอร์ GKE ซึ่งเริ่มต้นจากการกําหนดค่า Airflow

สรุป:

Airflow: การดาวน์โหลดข้อมูล - สคริปต์นี้จะทําให้กระบวนการรับชุดข้อมูลรีวิวภาพยนตร์จาก Kaggle และจัดเก็บไว้ในที่เก็บข้อมูล GCS เป็นไปโดยอัตโนมัติ เพื่อให้พร้อมสําหรับการประมวลผลหรือการวิเคราะห์เพิ่มเติมในสภาพแวดล้อมระบบคลาวด์

Airflow: การเตรียมข้อมูล - โค้ดจะนําชุดข้อมูลรีวิวภาพยนตร์ดิบ ลบคอลัมน์ข้อมูลที่เกินมาซึ่งไม่จําเป็นสําหรับ Use Case ของเรา และลบชุดข้อมูลที่ไม่มีค่า จากนั้นจะจัดโครงสร้างชุดข้อมูลเป็นรูปแบบการตอบคำถามที่เหมาะกับแมชชีนเลิร์นนิง และจัดเก็บกลับไปไว้ใน GCS เพื่อใช้ภายหลัง

Airflow: Model Finetuning - โค้ดนี้จะปรับแต่งโมเดลภาษาขนาดใหญ่ (LLM) โดยใช้เทคนิคที่เรียกว่า LoRA (Low-Rank Adaptation) จากนั้นบันทึกโมเดลที่อัปเดต โดยเริ่มต้นด้วยการโหลด LLM ที่ฝึกไว้ล่วงหน้าและชุดข้อมูลจาก Google Cloud Storage จากนั้นจะใช้ LoRA เพื่อปรับแต่งโมเดลในชุดข้อมูลนี้อย่างมีประสิทธิภาพ สุดท้าย ระบบจะบันทึกโมเดลที่ปรับแต่งแล้วกลับไปยัง Google Cloud Storage เพื่อใช้ในแอปพลิเคชันต่างๆ เช่น การสร้างข้อความหรือการตอบคำถามในภายหลัง

Airflow: การแสดงโมเดล - แสดงโมเดลที่ปรับแต่งใน GKE ด้วย vllm เพื่อการอนุมาน

Airflow: ลูปความคิดเห็น - การฝึกโมเดลใหม่ทุก xx ครั้ง (รายชั่วโมง รายวัน รายสัปดาห์)

แผนภาพนี้อธิบายวิธีการทำงานของ Airflow 2 เมื่อทำงานใน GKE

8691f41166209a5d.png

14. การปรับแต่งโมเดลกับการใช้ RAG

CodeLab นี้จะปรับแต่ง LLM แทนที่จะใช้ Retrieval Augmented Generation (RAG)

มาดูการเปรียบเทียบแนวทางทั้ง 2 นี้กัน

การปรับแต่ง: สร้างโมเดลเฉพาะ: การปรับแต่งจะปรับ LLM ให้เหมาะกับงานหรือชุดข้อมูลหนึ่งๆ ซึ่งช่วยให้โมเดลทำงานได้อย่างอิสระโดยไม่ต้องอาศัยแหล่งข้อมูลภายนอก

ลดความซับซ้อนในการอนุมาน: ข้อมูลนี้ทำให้ไม่ต้องใช้ระบบการดึงข้อมูลและฐานข้อมูลที่แยกต่างหาก จึงทําให้ระบบตอบสนองได้เร็วและประหยัดค่าใช้จ่ายมากขึ้น โดยเฉพาะสําหรับ Use Case ที่พบบ่อย

RAG: อาศัยความรู้ภายนอก: RAG จะดึงข้อมูลที่เกี่ยวข้องจากฐานความรู้สําหรับคําขอแต่ละรายการ เพื่อให้มั่นใจว่าเข้าถึงข้อมูลที่เป็นปัจจุบันและเฉพาะเจาะจงได้

เพิ่มความซับซ้อน: การใช้ RAG ในสภาพแวดล้อมที่ใช้งานจริง เช่น คลัสเตอร์ Kubernetes มักเกี่ยวข้องกับ Microservice หลายรายการสำหรับการประมวลผลและการเรียกข้อมูล ซึ่งอาจทำให้เวลาในการตอบสนองและค่าใช้จ่ายในการประมวลผลเพิ่มขึ้น

เหตุผลที่เลือกการปรับแต่ง:

แม้ว่า RAG จะเหมาะกับชุดข้อมูลขนาดเล็กที่ใช้ใน CodeLab นี้ แต่เราเลือกที่จะปรับแต่งเพื่อสาธิตกรณีการใช้งานทั่วไปของ Airflow ตัวเลือกนี้ช่วยให้เรามุ่งเน้นที่ด้านการจัดระเบียบเวิร์กโฟลว์ได้โดยไม่ต้องเจาะลึกรายละเอียดการตั้งค่าโครงสร้างพื้นฐานและไมโครเซอร์วิสเพิ่มเติมสําหรับ RAG

สรุป:

ทั้งการปรับแต่งและ RAG เป็นเทคนิคที่มีประโยชน์ซึ่งมีจุดแข็งและจุดอ่อนเป็นของตัวเอง ตัวเลือกที่ดีที่สุดขึ้นอยู่กับข้อกำหนดเฉพาะของโปรเจ็กต์ เช่น ขนาดและความซับซ้อนของข้อมูล ความต้องการด้านประสิทธิภาพ และการพิจารณาด้านต้นทุน

15. งาน DAG #1 - สร้างขั้นตอนแรกใน Airflow: การดาวน์โหลดข้อมูล

ภาพรวมของหน่วย DAG นี้คือโค้ด Python ที่โฮสต์ในอิมเมจคอนเทนเนอร์จะดาวน์โหลดชุดข้อมูล RottenTomatoes ล่าสุดจาก Kaggle

อย่าคัดลอกโค้ดนี้ลงในที่เก็บข้อมูล GCS เราคัดลอก mlops-dag.py เป็นขั้นตอนสุดท้าย ซึ่งมีขั้นตอนของ DAG Unit ทั้งหมดภายในสคริปต์ Python 1 รายการ

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        dataset_download

16. งาน DAG #2 - สร้างขั้นตอนที่ 2 ใน Airflow: การเตรียมข้อมูล

ภาพรวมของหน่วย DAG นี้คือเราจะโหลดไฟล์ CSV (rotten_tomatoes_movie_reviews.csv) จาก GCS ไปยัง Pandas DataFrame

ถัดไป เราจะจํากัดจํานวนแถวที่ประมวลผลโดยใช้ DATASET_LIMIT เพื่อการทดสอบและประสิทธิภาพของทรัพยากร และสุดท้ายจะแปลงข้อมูลที่แปลงแล้วเป็นชุดข้อมูล Hugging Face

หากสังเกตดีๆ คุณจะเห็นว่าเรากําลังฝึกโมเดล 1,000 แถวด้วย "DATASET_LIMIT": "1000" เนื่องจากใช้เวลา 20 นาทีใน GPU Nvidia L4

อย่าคัดลอกโค้ดนี้ลงในที่เก็บข้อมูล GCS เราจะคัดลอก mlops-dag.py ในขั้นตอนสุดท้าย ซึ่งมีขั้นตอนทั้งหมดภายในสคริปต์ Python 1 รายการ

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation

17. งาน DAG #3 - สร้างขั้นตอนที่ 3 ใน Airflow: การปรับแต่งโมเดล

ภาพรวมของหน่วย DAG นี้คือ เราจะเรียกใช้ finetune.py เพื่อปรับแต่งโมเดล Gemma ด้วยชุดข้อมูลใหม่

อย่าคัดลอกโค้ดนี้ลงในที่เก็บข้อมูล GCS เราจะคัดลอก mlops-dag.py ในขั้นตอนสุดท้าย ซึ่งมีขั้นตอนทั้งหมดภายในสคริปต์ Python 1 รายการ

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Task 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Task 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Task 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation >> fine_tuning

18. งาน DAG #4 - สร้างขั้นตอนสุดท้ายใน Airflow: การอนุมาน / การแสดงโมเดล

vLLM เป็นไลบรารีโอเพนซอร์สที่มีประสิทธิภาพซึ่งออกแบบมาเพื่อการอนุมาน LLM ที่มีประสิทธิภาพสูงโดยเฉพาะ เมื่อติดตั้งใช้งานใน Google Kubernetes Engine (GKE) LLM จะใช้ประโยชน์จากความสามารถในการปรับขนาดและความมีประสิทธิภาพของ Kubernetes เพื่อให้บริการ LLM อย่างมีประสิทธิภาพ

สรุปขั้นตอน

  • อัปโหลด DAG "mlops-dag.py" ไปยังที่เก็บข้อมูล GCS
  • คัดลอกไฟล์การกําหนดค่า YAML ของ Kubernetes 2 ไฟล์เพื่อตั้งค่าการอนุมานลงในที่เก็บข้อมูล GCS

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

def model_serving():
    config.load_incluster_config()
    k8s_apps_v1 = client.AppsV1Api()
    k8s_core_v1 = client.CoreV1Api()

    while True:
        try:
            k8s_apps_v1.delete_namespaced_deployment(
                    namespace="airflow",
                    name="inference-deployment",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Deployment inference-deployment deleted")
    
    with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace="airflow")
        print(f"Deployment created. Status='{resp.metadata.name}'")
    
    while True:
        try:
            k8s_core_v1.delete_namespaced_service(
                    namespace="airflow",
                    name="llm-service",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Service llm-service deleted")

    with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_core_v1.create_namespaced_service(
            body=dep, namespace="airflow")
        print(f"Service created. Status='{resp.metadata.name}'")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 4: Run GKE Deployment for model serving
        model_serving = PythonOperator(
            task_id="model_serving",
            python_callable=model_serving
        )

        dataset_download >> data_preparation >> fine_tuning >> model_serving

อัปโหลดสคริปต์ Python (ไฟล์ DAG) รวมถึงไฟล์ Manifest ของ Kubernetes ไปยังที่เก็บข้อมูล GCS ของ DAG

gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}

คุณจะเห็น mlops-dag ใน UI ของ Airflow

  1. เลือก "ยกเลิกการหยุดชั่วคราว"
  2. เลือก DAG ทริกเกอร์เพื่อเรียกใช้วงจร MLOps ด้วยตนเอง

d537281b92d5e8bb.png

เมื่อ DAG เสร็จสมบูรณ์แล้ว คุณจะเห็นเอาต์พุตเช่นนี้ใน UI ของ Airflow

3ed42abf8987384e.png

หลังจากขั้นตอนสุดท้าย คุณสามารถรับข้อมูลปลายทางของโมเดลและส่งพรอมต์เพื่อทดสอบโมเดลได้

รอประมาณ 5 นาทีก่อนออกคําสั่ง curl เพื่อให้การอนุมานโมเดลเริ่มต้นขึ้นได้และตัวจัดสรรภาระงานสามารถกําหนดที่อยู่ IP ภายนอก

export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')

curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128
}
EOF

เอาต์พุต:

19. ยินดีด้วย

คุณได้สร้างเวิร์กโฟลว์ AI รายการแรกโดยใช้ไปป์ไลน์ DAG ด้วย Airflow 2 ใน GKE แล้ว

อย่าลืมยกเลิกการจัดสรรทรัพยากรที่คุณได้ติดตั้งใช้งาน

20. การดำเนินการนี้ในเวอร์ชันที่ใช้งานจริง

แม้ว่า CodeLab จะให้ข้อมูลเชิงลึกที่ยอดเยี่ยมเกี่ยวกับวิธีตั้งค่า Airflow 2 ใน GKE แต่ในชีวิตจริงคุณอาจต้องพิจารณาหัวข้อต่อไปนี้เมื่อทําเช่นนี้ในเวอร์ชันที่ใช้งานจริง

ใช้เว็บฟรอนต์เอนด์โดยใช้ Gradio หรือเครื่องมือที่คล้ายกัน

กำหนดค่าการตรวจสอบแอปพลิเคชันอัตโนมัติสำหรับภาระงานด้วย GKE ที่นี่ หรือส่งออกเมตริกจาก Airflow ที่นี่

คุณอาจต้องใช้ GPU ขนาดใหญ่ขึ้นเพื่อปรับแต่งโมเดลให้ละเอียดยิ่งขึ้นได้เร็วขึ้น โดยเฉพาะในกรณีที่มีชุดข้อมูลขนาดใหญ่ อย่างไรก็ตาม หากต้องการฝึกโมเดลใน GPU หลายตัว เราต้องแยกชุดข้อมูลและแบ่งการฝึก ต่อไปนี้คือคําอธิบายของ FSDP ด้วย PyTorch (การแยกข้อมูลแบบขนานทั้งหมดโดยใช้การแชร์ GPU เพื่อให้บรรลุเป้าหมาย อ่านเพิ่มเติมได้ในบล็อกโพสต์จาก Meta และอีกโพสต์ในบทแนะนำเกี่ยวกับ FSDP โดยใช้ Pytorch

Google Cloud Composer เป็นบริการ Airflow ที่มีการจัดการ คุณจึงไม่ต้องดูแลรักษา Airflow เอง เพียงติดตั้งใช้งาน DAG ก็พร้อมใช้งาน

ดูข้อมูลเพิ่มเติม

ใบอนุญาต

ผลงานนี้ได้รับอนุญาตภายใต้สัญญาอนุญาตครีเอทีฟคอมมอนส์สำหรับยอมรับสิทธิของผู้สร้าง (Creative Commons Attribution License) 2.0 ทั่วไป