ایجاد گردش کار MLOps با Airflow 2 در GKE

1. بررسی اجمالی

852dc8844309ffb8.png

این CodeLab نشان می‌دهد که چگونه می‌توان تمرین‌های DevOps را با یادگیری ماشین (MLOps) با دانلود مجموعه داده، اصلاح یک مدل و استقرار LLM در Google Kubernetes Engine (GKE) با استفاده از DAG جریان هوا با کمترین مقدار انتزاع، ادغام کرد. در نتیجه ما از دستورات gcloud استفاده می‌کنیم و نه terraform تا بتوانید گام به گام آزمایشگاه را دنبال کنید و به راحتی هر فرآیند را از دیدگاه مهندس پلتفرم و مهندس یادگیری ماشین درک کنید.

این راهنمای عملی شما را در استفاده از جریان هوا برای ساده‌سازی گردش‌های کاری هوش مصنوعی راهنمایی می‌کند و با پیکربندی یک DAG نمایشی واضح و عملی از کل چرخه عمر MLOps ارائه می‌کند.

چیزی که یاد خواهید گرفت

  • با تجزیه سیلوهای دانش و بهبود گردش کار، همکاری و تفاهم بیشتر بین پلتفرم و مهندسان یادگیری ماشین را تقویت کنید.
  • نحوه استقرار، استفاده و مدیریت Airflow 2 در GKE را بدانید
  • یک DAG جریان هوا را از انتها به انتها پیکربندی کنید
  • با GKE پایه ای برای سیستم های یادگیری ماشینی درجه تولید بسازید
  • سیستم های یادگیری ماشین را ابزار دقیق و عملیاتی کنید
  • درک کنید که چگونه مهندسی پلتفرم به یک ستون پشتیبانی حیاتی برای MLOها تبدیل شده است

آنچه این CodeLab به آن دست می یابد

  • می‌توانید درباره فیلم‌های یک LLM که ما بر اساس Gemma-2-9b-it به‌خوبی تنظیم کرده‌ایم، که در GKE با vLLM ارائه می‌شود، سؤال بپرسید.

مخاطب هدف

  • مهندسین یادگیری ماشین
  • مهندسین پلتفرم
  • دانشمندان داده
  • مهندسان داده
  • مهندسان DevOps
  • معمار پلتفرم
  • مهندسین مشتری

این CodeLab در نظر گرفته نشده است

  • به عنوان مقدمه ای بر گردش کار GKE یا AI/ML
  • به عنوان یک اجرا از کل مجموعه ویژگی جریان هوا

2. مهندسی پلت فرم به مهندسان/دانشمندان یادگیری ماشین کمک می کند

16635a8284b994c.png

مهندسی پلتفرم و MLOs رشته های وابسته به هم هستند که برای ایجاد یک محیط قوی و کارآمد برای توسعه و استقرار ML با یکدیگر همکاری می کنند.

دامنه: مهندسی پلتفرم دامنه وسیع تری نسبت به MLO ها دارد و کل چرخه عمر توسعه نرم افزار را در بر می گیرد و ابزارها و زیرساخت های آن را فراهم می کند.

MLOps شکاف بین توسعه ML، استقرار و استنتاج را پر می کند.

تخصص: مهندسان پلتفرم معمولاً در فناوری‌های زیرساختی مانند محاسبات ابری، کانتینری‌سازی و مدیریت داده‌ها تخصص بالایی دارند.

مهندسان MLOps در توسعه، استقرار و نظارت بر مدل ML تخصص دارند و اغلب دارای مهارت های علم داده و مهندسی نرم افزار هستند.

ابزارها: مهندسان پلتفرم ابزارهایی را برای تهیه زیرساخت، مدیریت پیکربندی و ارکستراسیون کانتینر و داربست برنامه ایجاد می کنند. مهندسان MLOps از ابزارهایی برای آموزش مدل ML، آزمایش، استقرار، نظارت و نسخه‌سازی استفاده می‌کنند.

3. تنظیمات و الزامات Google Cloud

تنظیم محیط خود به خود

  1. به Google Cloud Console وارد شوید و یک پروژه جدید ایجاد کنید یا از یک موجود استفاده مجدد کنید. اگر قبلاً یک حساب Gmail یا Google Workspace ندارید، باید یک حساب ایجاد کنید .

fbef9caa1602edd0.png

a99b7ace416376c4.png

5e3ff691252acf41.png

  • نام پروژه نام نمایشی برای شرکت کنندگان این پروژه است. این یک رشته کاراکتری است که توسط API های Google استفاده نمی شود. همیشه می توانید آن را به روز کنید.
  • شناسه پروژه در تمام پروژه‌های Google Cloud منحصربه‌فرد است و تغییرناپذیر است (پس از تنظیم نمی‌توان آن را تغییر داد). Cloud Console به طور خودکار یک رشته منحصر به فرد تولید می کند. معمولاً برای شما مهم نیست که چیست. در اکثر کدها، باید شناسه پروژه خود را ارجاع دهید (معمولاً با نام PROJECT_ID شناخته می شود). اگر شناسه تولید شده را دوست ندارید، ممکن است یک شناسه تصادفی دیگر ایجاد کنید. از طرف دیگر، می‌توانید خودتان را امتحان کنید، و ببینید آیا در دسترس است یا خیر. پس از این مرحله نمی توان آن را تغییر داد و در طول مدت پروژه باقی می ماند.
  • برای اطلاع شما، یک مقدار سوم وجود دارد، یک شماره پروژه ، که برخی از API ها از آن استفاده می کنند. در مورد هر سه این مقادیر در مستندات بیشتر بیاموزید.
  1. در مرحله بعد، برای استفاده از منابع Cloud/APIها باید صورتحساب را در کنسول Cloud فعال کنید . اجرا کردن از طریق این کد لبه هزینه زیادی نخواهد داشت. برای خاموش کردن منابع برای جلوگیری از تحمیل صورت‌حساب فراتر از این آموزش، می‌توانید منابعی را که ایجاد کرده‌اید حذف کنید یا پروژه را حذف کنید. کاربران جدید Google Cloud واجد شرایط برنامه آزمایشی رایگان 300 دلاری هستند.

Cloud Shell را راه اندازی کنید

در حالی که Google Cloud را می توان از راه دور از لپ تاپ شما کار کرد، در این کد لبه از Cloud Shell استفاده خواهید کرد، یک محیط خط فرمان که در Cloud اجرا می شود.

Cloud Shell را فعال کنید

  1. از Cloud Console، روی Activate Cloud Shell کلیک کنید 853e55310c205094.png .

3c1dabeca90e44e5.png

اگر این اولین باری است که Cloud Shell را راه اندازی می کنید، با یک صفحه میانی روبرو می شوید که آن را توصیف می کند. اگر با یک صفحه میانی مواجه شدید، روی Continue کلیک کنید.

9c92662c6a846a5c.png

تهیه و اتصال به Cloud Shell فقط باید چند لحظه طول بکشد.

9f0e51b578fecce5.png

این ماشین مجازی با تمام ابزارهای توسعه مورد نیاز بارگذاری شده است. این یک فهرست اصلی 5 گیگابایتی دائمی ارائه می‌کند و در Google Cloud اجرا می‌شود، که عملکرد و احراز هویت شبکه را بسیار افزایش می‌دهد. بسیاری از کارهای شما، اگر نه همه، در این کد لبه با مرورگر قابل انجام است.

پس از اتصال به Cloud Shell، باید ببینید که احراز هویت شده اید و پروژه به ID پروژه شما تنظیم شده است.

  1. برای تایید احراز هویت، دستور زیر را در Cloud Shell اجرا کنید:
gcloud auth list

خروجی فرمان

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. دستور زیر را در Cloud Shell اجرا کنید تا تأیید کنید که دستور gcloud از پروژه شما اطلاع دارد:
gcloud config list project

خروجی فرمان

[core]
project = <PROJECT_ID>

اگر اینطور نیست، می توانید آن را با این دستور تنظیم کنید:

gcloud config set project <PROJECT_ID>

خروجی فرمان

Updated property [core/project].

4. مرحله شماره 1 - ثبت نام و احراز هویت در Kaggle

برای شروع CodeLab، باید یک حساب کاربری در Kaggle ایجاد کنید که یک پلت فرم انجمن آنلاین برای دانشمندان داده و علاقه مندان به یادگیری ماشینی است که متعلق به Google است و میزبان یک مخزن وسیع از مجموعه داده های عمومی در دسترس برای دامنه های مختلف است. از این سایت است که مجموعه داده RottenTomatoes را که برای آموزش مدل خود استفاده می شود، دانلود خواهید کرد.

  • در Kaggle ثبت نام کنید، می توانید از Google SSO برای ورود به سیستم استفاده کنید
  • شرایط و ضوابط را بپذیرید
  • به تنظیمات بروید و نام کاربری خود را دریافت کنید
  • در بخش API، "Create new token from" Kaggle را انتخاب کنید که kaggle.json را دانلود می کند.
  • اگر مشکلی دارید، به صفحه پشتیبانی اینجا بروید

5. مرحله شماره 2 - ثبت نام و احراز هویت در HuggingFace

HuggingFace مکانی مرکزی برای هر کسی است که با فناوری یادگیری ماشین درگیر شود. این میزبان 900 هزار مدل، 200 هزار مجموعه داده و 300 هزار برنامه نمایشی (فضاها) است که همگی منبع باز و در دسترس عموم هستند.

  • ثبت نام در HuggingFace - یک حساب کاربری با نام کاربری ایجاد کنید، نمی توانید از Google SSO استفاده کنید
  • آدرس ایمیل خود را تایید کنید
  • به اینجا بروید و مجوز مدل Gemma-2-9b-it را بپذیرید
  • در اینجا یک نشانه HuggingFace ایجاد کنید
  • اعتبار رمز را ضبط کنید، بعداً به آن نیاز خواهید داشت

6. مرحله شماره 3 - منابع زیرساخت Google Cloud مورد نیاز را ایجاد کنید

GKE، GCE، رجیستری مصنوع را راه‌اندازی می‌کنید و نقش‌های IAM را با استفاده از فدراسیون هویت حجم کاری اعمال می‌کنید.

گردش کار هوش مصنوعی شما از دو گره‌پول استفاده می‌کند، یکی برای آموزش و دیگری برای استنتاج. گره‌پول آموزشی از یک GCE VM با استاندارد g2 استفاده می‌کند که مجهز به یک پردازنده گرافیکی Nvidia L4 Tensor Core است. گره استنتاج از یک VM با استاندارد g2 استفاده می کند که مجهز به دو پردازنده گرافیکی Nvidia L4 Tensor Core است. هنگام مشخص کردن منطقه، یکی را انتخاب کنید که در آن GPU مورد نیاز پشتیبانی می‌شود ( پیوند ).

در Cloud Shell خود دستورات زیر را اجرا کنید:

# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")

SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1

# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto

# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount

# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com

# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
  --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-12 \
  --num-nodes=1

# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
  --accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-24 \
  --num-nodes=1

# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a

# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}

مانیفست های YAML خود را ایجاد کنید

mkdir manifests
cd manifests

mlops-sa.yaml

apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
  name: airflow-mlops-sa
  namespace: airflow
  labels:
    tier: airflow

pv-dags.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 5Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_DAGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pv-logs.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-logs
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 100Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_LOGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pvc-dags.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: airflow-dags
  storageClassName: standard

pvc-logs.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  volumeName: airflow-logs
  storageClassName: standard

namespace.yaml

kind: Namespace
apiVersion: v1
metadata:
  name: airflow
  labels:
    name: airflow

sa-role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-deployment-role
rules:
- apiGroups: ["apps"] 
  resources: ["deployments"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]

sa-rolebinding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-deployment-rolebinding
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: airflow-worker
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-deployment-role
  apiGroup: rbac.authorization.k8s.io

استنباط.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-deployment
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gemma-server
  template:
    metadata:
      labels:
        app: gemma-server
        ai.gke.io/model: gemma-2-9b-it
        ai.gke.io/inference-server: vllm
      annotations:
        gke-gcsfuse/volumes: "true"
    spec:
      serviceAccountName: airflow-mlops-sa
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: "NoSchedule"
      - key: "on-demand"
        value: "true"
        operator: "Equal"
        effect: "NoSchedule"
      containers:
      - name: inference-server
        image: vllm/vllm-openai:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: "2"
          limits:
            nvidia.com/gpu: "2"
        command: ["/bin/sh", "-c"]
        args:
        - |
          python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
        volumeMounts:
        - mountPath: /dev/shm
          name: dshm
        - name: gcs-fuse-csi-ephemeral
          mountPath: /modeldata
          readOnly: true
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      - name: gcs-fuse-csi-ephemeral
        csi:
          driver: gcsfuse.csi.storage.gke.io
          volumeAttributes:
            bucketName: BUCKET_DATA_NAME
            mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
            fileCacheCapacity: "20Gi"
            fileCacheForRangeRead: "true"
            metadataStatCacheCapacity: "-1"
            metadataTypeCacheCapacity: "-1"
            metadataCacheTTLSeconds: "-1"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4

inference-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: llm-service
  namespace: airflow
spec:
  selector:
    app: gemma-server
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

3 سطل Google Cloud Storage (GCS) ایجاد کنید

gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}

# Create the namespace in GKE
kubectl apply -f namespace.yaml

# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml

Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation

gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"

7. مرحله 4 - Airflow را از طریق نمودار فرمان روی GKE نصب کنید

اکنون Airflow 2 را با استفاده از Helm مستقر می کنیم. Apache Airflow یک پلت فرم مدیریت گردش کار منبع باز برای خطوط لوله مهندسی داده است. بعداً به مجموعه ویژگی های Airflow 2 خواهیم پرداخت.

values.yaml برای نمودار فرمان جریان هوا

config:
  webserver:
    expose_config: true
webserver:
  service:
    type: LoadBalancer
  podAnnotations:
    gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "30"
logs:
  persistence:
    enabled: true
    existingClaim: "airflow-logs"
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
scheduler:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
triggerer:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
workers:
  podAnnotations:
    gke-gcsfuse/volumes: "true"

راه اندازی جریان هوا 2

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml

8. مرحله 5 - جریان هوا را با اتصالات و متغیرها راه اندازی کنید

هنگامی که Airflow 2 مستقر شد، می توانیم پیکربندی آن را شروع کنیم. ما متغیرهایی را تعریف می کنیم که توسط اسکریپت های پایتون خوانده می شوند.

  1. با مرورگر خود به رابط کاربری Airflow در پورت 8080 دسترسی پیدا کنید

آی پی خارجی را دریافت کنید

kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'

یک مرورگر وب باز کنید و به http:// <EXTERNAL-IP> :8080 بروید. ورود به سیستم مدیر / مدیر است

  1. یک اتصال GCP پیش‌فرض در رابط کاربری Airflow ایجاد کنید، بنابراین به Admin → Connections → + Add a record new بروید.
  • شناسه اتصال: google_cloud_default
  • نوع اتصال: Google Cloud

روی ذخیره کلیک کنید.

  1. متغیرهای مورد نیاز را ایجاد کنید، بنابراین به Admin → Variables → + Add a new record بروید
  • کلید: BUCKET_DATA_NAME - مقدار: کپی از echo $BUCKET_DATA_NAME
  • کلید: GCP_PROJECT_ID - مقدار: کپی از echo $DEVSHELL_PROJECT_ID
  • کلید: HF_TOKEN - مقدار: رمز HF خود را وارد کنید
  • کلید: KAGGLE_USERNAME - مقدار: نام کاربری kaggle خود را وارد کنید
  • کلید: KAGGLE_KEY - مقدار: این را از kaggle.json کپی کنید

بعد از هر جفت کلید-مقدار روی Save کلیک کنید.

رابط کاربری شما باید به شکل زیر باشد:

771121470131b5ec.png

9. محفظه کد برنامه شماره 1 - دانلود داده ها

در این اسکریپت پایتون، ما با Kaggle احراز هویت می کنیم تا مجموعه داده را در سطل GCS خود بارگیری کنیم.

خود اسکریپت محفظه است زیرا به واحد شماره 1 DAG تبدیل می‌شود و ما انتظار داریم مجموعه داده‌ها مرتباً به‌روزرسانی شوند، بنابراین می‌خواهیم این را خودکار کنیم.

دایرکتوری ایجاد کنید و اسکریپت های ما را در اینجا کپی کنید

cd .. ; mkdir 1-dataset-download
cd 1-dataset-download

مجموعه داده-download.py

import os
import kagglehub
from google.cloud import storage

KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")

print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"

upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]

الزامات. txt

google-cloud-storage==2.19.0
kagglehub==0.3.4

اکنون یک تصویر ظرف برای دانلود مجموعه ایجاد می کنیم و آن را به رجیستری Artifact فشار می دهیم

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest

10. ظرف کد برنامه شماره 2 - آماده سازی داده ها

در مرحله آماده سازی داده ها، این چیزی است که ما به آن دست پیدا می کنیم:

  1. مشخص کنید چه مقدار از مجموعه داده ای را که می خواهیم برای تنظیم دقیق مدل پایه خود استفاده کنیم
  2. مجموعه داده را بارگذاری می کند، یعنی فایل CSV را در یک دیتافریم Pandas می خواند که یک ساختار داده دو بعدی برای سطرها و ستون ها است.
  3. تبدیل داده / پیش پردازش - تعیین کنید که کدام بخش از مجموعه داده بی ربط هستند با مشخص کردن آنچه می خواهیم نگه داریم، که در واقع باعث حذف بقیه می شود.
  4. تابع transform را برای هر ردیف از DataFrame اعمال می کند
  5. داده های آماده شده را دوباره در سطل GCS ذخیره کنید

دایرکتوری ایجاد کنید و اسکریپت های ما را در اینجا کپی کنید

cd .. ; mkdir 2-data-preparation
cd 2-data-preparation

data-preparation.py

import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset

# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100"))  # Process a limited number of rows, used 100 during testing phase but can be increased

DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"

# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")

def transform(data):
    """
    Transforms a row of the DataFrame into the desired format for fine-tuning.

    Args:
      data: A pandas Series representing a row of the DataFrame.

    Returns:
      A dictionary containing the formatted text.
    """ 
    question = f"Review analysis for movie '{data['id']}'"
    context = data['reviewText']
    answer = data['scoreSentiment']
    template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
    return {'text': template.format(question=question, context=context, answer=answer)}

try:
    df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
    print(f"Dataset loaded successfully.")

    # Drop rows with NaN values in relevant columns
    df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])

    # Apply transformation to the DataFrame
    transformed_data = df.apply(transform, axis=1).tolist()

    # Convert transformed data to a DataFrame and then to a Hugging Face Dataset
    transformed_df = pd.DataFrame(transformed_data)
    dataset = Dataset.from_pandas(transformed_df)

    # Save the prepared dataset to JSON lines format
    with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
        for item in dataset:
            f.write(json.dumps(item) + "\n")

    print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
    
except Exception as e:
    print(f"Error during data loading or preprocessing: {e}")
    import traceback
    print(traceback.format_exc())

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]

الزامات. txt

datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3

# Now we create a container images for data-preparation and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest

11. محفظه کد برنامه شماره 3 - تنظیم دقیق

در اینجا ما از Gemma-2-9b-it به عنوان یک مدل پایه استفاده می کنیم و سپس آن را با مجموعه داده جدید خود تنظیم می کنیم.

اینها توالی مراحلی هستند که در مرحله تنظیم دقیق اتفاق می‌افتند.

1. راه‌اندازی: کتابخانه‌ها را وارد کنید، پارامترها را (برای مدل، داده و آموزش) تعریف کنید و مجموعه داده را از Google Cloud Storage بارگیری کنید.

2. مدل بارگذاری: یک مدل زبان از پیش آموزش دیده را با کوانتیزه کردن برای کارایی بارگذاری کنید و توکنایزر مربوطه را بارگذاری کنید.

3. LoRA را پیکربندی کنید: برای تنظیم دقیق مدل با افزودن ماتریس های کوچک قابل آموزش، انطباق با رتبه پایین (LoRA) را تنظیم کنید.

4. Train: پارامترهای آموزشی را تعریف کنید و از SFTTrainer برای تنظیم دقیق مدل بر روی مجموعه داده بارگذاری شده با استفاده از نوع کوانتیزاسیون FP16 استفاده کنید.

5. ذخیره و آپلود: مدل و توکنایزر تنظیم شده را به صورت محلی ذخیره کنید، سپس آنها را در سطل GCS ما آپلود کنید.

سپس با استفاده از Cloud Build یک Container Image ایجاد می کنیم و آن را در Artifact Registry ذخیره می کنیم.

دایرکتوری ایجاد کنید و اسکریپت های ما را در اینجا کپی کنید

cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning

finetuning.py

import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage

# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")

# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
    "json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)

################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))

################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50

################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False

# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
    attn_implementation="eager",
    pretrained_model_name_or_path=model_name,
    torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")

# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
    lora_alpha=lora_alpha,
    lora_dropout=lora_dropout,
    r=lora_r,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Set training parameters
training_arguments = SFTConfig(
        bf16=bf16,
        dataset_kwargs={
            "add_special_tokens": False,  
            "append_concat_token": False, 
        },
        dataset_text_field="text",
        disable_tqdm=True,
        fp16=fp16,
        gradient_accumulation_steps=gradient_accumulation_steps,
        gradient_checkpointing=gradient_checkpointing,
        gradient_checkpointing_kwargs={"use_reentrant": False},
        group_by_length=group_by_length,
        log_on_each_node=False,
        logging_steps=logging_steps,
        learning_rate=learning_rate,
        lr_scheduler_type=lr_scheduler_type,
        max_grad_norm=max_grad_norm,
        max_seq_length=max_seq_length,
        max_steps=max_steps,
        num_train_epochs=num_train_epochs,
        optim=optim,
        output_dir=save_model_path,
        packing=packing,
        per_device_train_batch_size=per_device_train_batch_size,
        save_strategy=save_strategy,
        save_steps=save_steps,
        save_total_limit=save_total_limit,
        warmup_ratio=warmup_ratio,
        weight_decay=weight_decay,
    )

print(f"Configuring fine tuning completed")

# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    peft_config=peft_config,
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    tokenizer=tokenizer,
    args=training_arguments,
    packing=packing,
)

print(f"Creating trainer completed")

# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")

# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
    low_cpu_mem_usage=True,
    pretrained_model_name_or_path=model_name,
    return_dict=True,
    torch_dtype=torch.float16,
)

model = PeftModel.from_pretrained(
    model=base_model,
    model_id=new_model,
)
model = model.merge_and_unload()

print(f"Merging the new model with base model completed")

accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")

print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
    is_main_process=accelerator.is_main_process,
    save_directory=save_model_path,
    save_function=accelerator.save,
)
print(f"Save unwrapped model completed")

print(f"Save new tokenizer started")
if accelerator.is_main_process:
    tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")

# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
    """Uploads a directory to GCS."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    for root, _, files in os.walk(model_dir):
        for file in files:
            local_file_path = os.path.join(root, file)
            gcs_file_path = os.path.relpath(local_file_path, model_dir)
            blob = bucket.blob(os.path.join(new_model, gcs_file_path))  # Use new_model_name
            blob.upload_from_filename(local_file_path)

# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")

Dockerfile

# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04

# Install necessary system packages
RUN apt-get update && \
    apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements.txt into the container
COPY requirements.txt .

# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy your finetune script into the container
COPY finetuning.py .

# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
 
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]

الزامات. txt

accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4

اکنون یک تصاویر ظرف برای تنظیم دقیق ایجاد می کنیم و آن را به رجیستری Artifact فشار می دهیم

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest

12. Airflow 2 Overview inc چیست DAG

جریان هوا یک پلتفرم برای هماهنگی گردش کار و خطوط لوله داده است. از DAG ها (گراف های غیر چرخه ای جهت دار) برای تعریف این گردش های کاری در کد پایتون استفاده می کند و وظایف و وابستگی های آنها را به صورت بصری نشان می دهد.

جریان هوا، با DAG های ثابت و تعاریف مبتنی بر پایتون، برای برنامه ریزی و مدیریت گردش های کاری از پیش تعریف شده مناسب است. معماری آن شامل یک UI کاربر پسند برای نظارت و مدیریت این گردش کار است.

اساساً، Airflow به شما امکان می دهد خطوط لوله داده خود را با استفاده از پایتون تعریف، زمان بندی و نظارت کنید و آن را به ابزاری انعطاف پذیر و قدرتمند برای هماهنگ سازی جریان کار تبدیل می کند.

13. مروری بر DAG ما

ec49964ad7d61491.png

DAG مخفف Directed Acyclic Graph است، در جریان هوا یک DAG خود کل گردش کار یا خط لوله را نشان می دهد. وظایف، وابستگی های آنها و ترتیب اجرا را مشخص می کند.

واحدهای گردش کار در DAG از یک pod در خوشه GKE اجرا می شوند که از پیکربندی جریان هوا آغاز شده است.

خلاصه:

جریان هوا: دانلود داده - این اسکریپت فرآیند دریافت مجموعه داده بررسی فیلم از Kaggle و ذخیره آن در سطل GCS شما را خودکار می کند و آن را برای پردازش یا تجزیه و تحلیل بیشتر در محیط ابری شما به راحتی در دسترس قرار می دهد.

جریان هوا: آماده‌سازی داده - کد مجموعه داده‌های بازبینی فیلم خام را می‌گیرد، ستون‌های داده‌های اضافی را که برای مورد استفاده ما لازم نیستند حذف می‌کند و مجموعه‌های داده با مقادیر گمشده را حذف می‌کند. در مرحله بعد، مجموعه داده را در قالب پاسخگویی به پرسش مناسب برای یادگیری ماشینی ساختار می دهد و آن را برای استفاده بعدی در GCS ذخیره می کند.

جریان هوا: تنظیم دقیق مدل - این کد یک مدل زبان بزرگ (LLM) را با استفاده از تکنیکی به نام LoRA (تطبیق با رتبه پایین) تنظیم می کند و سپس مدل به روز شده را ذخیره می کند. با بارگیری یک LLM از پیش آموزش دیده و مجموعه داده از Google Cloud Storage شروع می شود. سپس، LoRA را برای تنظیم دقیق مدل در این مجموعه داده اعمال می کند. در نهایت، مدل دقیق تنظیم شده را برای استفاده بعدی در برنامه‌هایی مانند تولید متن یا پاسخ به سؤال در Google Cloud Storage ذخیره می‌کند.

جریان هوا: سرویس مدل - ارائه مدل دقیق در GKE با vllm برای استنتاج.

جریان هوا: حلقه بازخورد - بازآموزی مدل هر xx بار (ساعتی، روزانه، هفتگی).

این نمودار نحوه عملکرد Airflow 2 را هنگام اجرا در GKE توضیح می دهد.

8691f41166209a5d.png

14. تنظیم دقیق یک مدل در مقابل استفاده از RAG

این CodeLab یک LLM را به جای استفاده از Retrieval Augmented Generation (RAG) تنظیم می کند.

بیایید این دو رویکرد را با هم مقایسه کنیم:

Finetuning: یک مدل تخصصی ایجاد می کند: Finetuning LLM را با یک کار یا مجموعه داده خاص تطبیق می دهد و به آن اجازه می دهد بدون اتکا به منابع داده خارجی به طور مستقل عمل کند.

استنتاج را ساده می کند: این امر نیاز به سیستم بازیابی جداگانه و پایگاه داده را از بین می برد و در نتیجه پاسخ های سریع تر و ارزان تر را به خصوص برای موارد استفاده مکرر می دهد.

RAG: متکی بر دانش خارجی: RAG اطلاعات مربوطه را از پایگاه دانش برای هر درخواست بازیابی می کند و دسترسی به داده های به روز و خاص را تضمین می کند.

پیچیدگی را افزایش می‌دهد: پیاده‌سازی RAG در یک محیط تولیدی مانند خوشه Kubernetes اغلب شامل چندین ریزسرویس برای پردازش و بازیابی داده‌ها می‌شود که به طور بالقوه باعث افزایش تاخیر و هزینه‌های محاسباتی می‌شود.

چرا تنظیم دقیق انتخاب شد:

در حالی که RAG برای مجموعه داده کوچک مورد استفاده در این CodeLab مناسب است، ما تنظیمات دقیق را برای نشان دادن یک مورد استفاده معمولی برای جریان هوا انتخاب کردیم. این انتخاب به ما اجازه می‌دهد تا بر جنبه‌های ارکستراسیون گردش کار تمرکز کنیم تا اینکه به جزئیات راه‌اندازی زیرساخت‌ها و میکروسرویس‌های اضافی برای RAG بپردازیم.

نتیجه گیری:

Finetuning و RAG هر دو تکنیک های ارزشمندی هستند که دارای نقاط قوت و ضعف خاص خود هستند. انتخاب بهینه به نیازهای خاص پروژه شما مانند اندازه و پیچیدگی داده ها، نیازهای عملکرد و ملاحظات هزینه بستگی دارد.

15. وظیفه شماره 1 DAG - اولین مرحله خود را در جریان هوا ایجاد کنید: دانلود داده

به عنوان یک نمای کلی از این واحد DAG، کد پایتون ما میزبانی شده در یک تصویر ظرف، آخرین مجموعه داده RottenTomatoes را از Kaggle دانلود می کند.

این کد را در سطل GCS کپی نکنید. ما mlops-dag.py را به عنوان آخرین مرحله کپی می کنیم، که شامل تمام مراحل واحد DAG در یک اسکریپت پایتون است.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        dataset_download

16. وظیفه شماره 2 DAG - مرحله دوم خود را در جریان هوا ایجاد کنید: آماده سازی داده

به عنوان یک نمای کلی از این واحد DAG، ما یک فایل CSV (rotten_tomatoes_movie_reviews.csv) را از GCS در یک Pandas DataFrame بارگذاری می کنیم.

در مرحله بعد، تعداد ردیف های پردازش شده با استفاده از DATASET_LIMIT را برای آزمایش و بهره وری منابع محدود می کنیم و در نهایت داده های تبدیل شده را به مجموعه داده Hugging Face تبدیل می کنیم.

اگر با دقت نگاه کنید، می بینید که ما در حال آموزش 1000 ردیف در مدل با "DATASET_LIMIT": "1000" هستیم، این به این دلیل است که 20 دقیقه در پردازنده گرافیکی Nvidia L4 برای انجام این کار زمان می برد.

این کد را در سطل GCS کپی نکنید. ما در آخرین مرحله، که شامل تمام مراحل در یک اسکریپت پایتون است، mlops-dag.py را کپی می کنیم.

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation

17. وظیفه شماره 3 DAG - مرحله سوم خود را در جریان هوا ایجاد کنید: مدل Finetuning

به عنوان یک نمای کلی از این واحد DAG، در اینجا ما finetune.py را برای اصلاح مدل Gemma با مجموعه داده جدید خود اجرا می کنیم.

این کد را در سطل GCS کپی نکنید. ما در آخرین مرحله، که شامل تمام مراحل در یک اسکریپت پایتون است، mlops-dag.py را کپی می کنیم.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Task 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Task 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Task 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation >> fine_tuning

18. وظیفه شماره 4 DAG - مرحله نهایی خود را در جریان هوا ایجاد کنید: استنتاج / ارائه مدل

vLLM یک کتابخانه منبع باز قدرتمند است که به طور خاص برای استنتاج با کارایی بالا از LLM طراحی شده است. هنگامی که در Google Kubernetes Engine (GKE) مستقر می شود، از مقیاس پذیری و کارایی Kubernetes برای ارائه خدمات موثر به LLM ها استفاده می کند.

خلاصه مراحل:

  • DAG "mlops-dag.py" را در سطل GCS آپلود کنید.
  • برای تنظیم استنتاج، دو فایل پیکربندی Kubernetes YAML را در یک سطل GCS کپی کنید.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

def model_serving():
    config.load_incluster_config()
    k8s_apps_v1 = client.AppsV1Api()
    k8s_core_v1 = client.CoreV1Api()

    while True:
        try:
            k8s_apps_v1.delete_namespaced_deployment(
                    namespace="airflow",
                    name="inference-deployment",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Deployment inference-deployment deleted")
    
    with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace="airflow")
        print(f"Deployment created. Status='{resp.metadata.name}'")
    
    while True:
        try:
            k8s_core_v1.delete_namespaced_service(
                    namespace="airflow",
                    name="llm-service",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Service llm-service deleted")

    with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_core_v1.create_namespaced_service(
            body=dep, namespace="airflow")
        print(f"Service created. Status='{resp.metadata.name}'")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 4: Run GKE Deployment for model serving
        model_serving = PythonOperator(
            task_id="model_serving",
            python_callable=model_serving
        )

        dataset_download >> data_preparation >> fine_tuning >> model_serving

اسکریپت پایتون (فایل DAG) و همچنین نمایشگرهای Kubernetes را در سطل DAGS GCS آپلود کنید.

gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}

در رابط کاربری Airflow خود، mlops-dag را خواهید دید.

  1. لغو مکث را انتخاب کنید.
  2. Trigger DAG را برای انجام یک چرخه MLOps دستی انتخاب کنید.

d537281b92d5e8bb.png

هنگامی که DAG شما تکمیل شد، خروجی مانند این را در رابط کاربری Airflow مشاهده خواهید کرد.

3ed42abf8987384e.png

پس از مرحله نهایی، می توانید نقطه پایانی مدل را بگیرید و یک اعلان برای آزمایش مدل ارسال کنید.

تقریباً 5 دقیقه قبل از صدور دستور curl صبر کنید تا استنتاج مدل شروع شود و متعادل کننده بار بتواند یک آدرس IP خارجی را اختصاص دهد.

export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')

curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128
}
EOF

خروجی:

19. تبریک می گویم!

شما اولین گردش کار هوش مصنوعی خود را با استفاده از خط لوله DAG با Airflow 2 در GKE ایجاد کرده اید.

فراموش نکنید که منابعی را که مستقر کرده اید لغو کنید.

20. انجام این کار در تولید

در حالی که CodeLab بینش خارق‌العاده‌ای در مورد نحوه راه‌اندازی Airflow 2 در GKE به شما ارائه کرده است، در دنیای واقعی، هنگام انجام این کار در تولید، باید برخی از موضوعات زیر را در نظر بگیرید.

یک صفحه وب با استفاده از Gradio یا ابزارهای مشابه پیاده سازی کنید.

یا نظارت خودکار برنامه را برای بارهای کاری با GKE در اینجا پیکربندی کنید یا معیارهای سنجش را از Airflow در اینجا صادر کنید.

ممکن است برای تنظیم سریعتر مدل به GPUهای بزرگتری نیاز داشته باشید، به خصوص اگر مجموعه داده های بزرگتری دارید. با این حال، اگر بخواهیم مدل را در چندین GPU آموزش دهیم، باید مجموعه داده را تقسیم کنیم و آموزش را تقسیم کنیم. در اینجا توضیحی در مورد FSDP با PyTorch ارائه شده است (داده های کاملاً خرد شده موازی، با استفاده از اشتراک گذاری GPU برای دستیابی به آن هدف. مطالعه بیشتر را می توانید در اینجا در یک پست وبلاگ از Meta و دیگری در این آموزش در مورد FSDP با استفاده از Pytorch پیدا کنید.

Google Cloud Composer یک سرویس Airflow مدیریت شده است، بنابراین نیازی به حفظ خود Airflow ندارید، فقط DAG خود را مستقر کنید و از آنجا خارج شوید.

بیشتر بدانید

مجوز

این اثر تحت مجوز Creative Commons Attribution 2.0 Generic مجوز دارد.