1. סקירה כללית
ב-CodeLab הזה תלמדו איך לשלב שיטות DevOps בלמידת מכונה (MLOps) על ידי הורדת מערך נתונים, שיפור מודל ופריסה של LLM ב-Google Kubernetes Engine (GKE) באמצעות DAG של Airflow עם מינימום הפשטה. לכן אנחנו משתמשים בפקודות gcloud ולא ב-Terraform, כדי שתוכלו לבצע את השלבים במעבדה בקלות ולהבין בקלות כל תהליך מנקודת המבט של מהנדס הפלטפורמה ומנקודת המבט של מהנדס למידת המכונה.
במדריך המעשי הזה נסביר איך להשתמש ב-Airflow כדי לייעל את תהליכי העבודה של AI, ונציג הדגמה ברורה ומעשית של מחזור החיים המלא של MLOps באמצעות הגדרת DAG.
מה תלמדו
- שיתוף פעולה טוב יותר והבנה עמוקה יותר בין מהנדסי פלטפורמות לבין מהנדסי למידת מכונה, על ידי פירוק של מחסומי ידע ושיפור תהליכי העבודה
- איך לפרוס, להשתמש ולנהל את Airflow 2 ב-GKE
- הגדרת DAG של Airflow מקצה לקצה
- איך יוצרים את הבסיס למערכות למידת מכונה ברמת הייצור באמצעות GKE
- יצירת כלים למערכות למידת מכונה והפעלתן
- איך הנדסת פלטפורמות הפכה לעמוד תמיכה קריטי ב-MLOps
מה תלמדו ב-CodeLab הזה
- אתם יכולים לשאול שאלות על סרטים באמצעות LLM ששיפרנו על סמך Gemma-2-9b-it, שמוצג ב-GKE עם vLLM.
קהל היעד
- מהנדסי למידת מכונה
- מהנדסי פלטפורמה
- מדעני נתונים
- מהנדסי מערכות מידע
- מהנדסי DevOps
- אדריכלית פלטפורמה
- מהנדסי לקוחות
CodeLab הזה לא מיועד
- כמבוא לתהליכי עבודה ב-GKE או ב-AI/ML
- כסקירה כללית של כל קבוצת התכונות של Airflow
2. מהנדסי פלטפורמה עוזרים למהנדסי למידת מכונה או למדעני נתונים
הנדסת פלטפורמות ו-MLOps הן תחומים תלויים זה בזה, שמשתפים פעולה כדי ליצור סביבה חזקה ויעילה לפיתוח ולפריסה של למידת מכונה.
היקף: להנדסת פלטפורמות יש היקף רחב יותר מ-MLOps, והיא כוללת את כל מחזור החיים של פיתוח התוכנה ומספקת את הכלים והתשתית לכך.
MLOps מגשר על הפער בין פיתוח, פריסה והסקת מסקנות של למידת מכונה.
מומחיות: מהנדסי פלטפורמות בדרך כלל מומחים בטכנולוגיות תשתית כמו מחשוב ענן, ארגון בקונטיינרים וניהול נתונים.
מהנדסי MLOps מתמחים בפיתוח, בפריסה ובמעקב אחרי מודלים של למידת מכונה, ולרוב הם בעלי כישורים בתחומי מדעי הנתונים והנדסת תוכנה.
כלים: מהנדסי פלטפורמות יוצרים כלים לניהול התשתית, לניהול ההגדרות, לתזמור קונטיינרים וליצירת תבניות לאפליקציות. מהנדסי MLOps משתמשים בכלים לאימון, לניסוי, לפריסה, למעקב ולניהול גרסאות של מודלים של למידת מכונה.
3. הגדרה ודרישות של Google Cloud
הגדרת סביבה בקצב אישי
- נכנסים למסוף Google Cloud ויוצרים פרויקט חדש או משתמשים מחדש בפרויקט קיים. אם עדיין אין לכם חשבון Gmail או חשבון Google Workspace, עליכם ליצור חשבון.
- שם הפרויקט הוא השם המוצג של המשתתפים בפרויקט. זוהי מחרוזת תווים שלא משמשת את Google APIs. תמיד תוכלו לעדכן אותו.
- מזהה הפרויקט הוא ייחודי לכל הפרויקטים ב-Google Cloud ואי אפשר לשנות אותו אחרי שמגדירים אותו. מסוף Cloud יוצר מחרוזת ייחודית באופן אוטומטי. בדרך כלל לא משנה מה המחרוזת הזו. ברוב ה-codelabs תצטרכו להפנות למזהה הפרויקט (בדרך כלל מזהים אותו בתור
PROJECT_ID
). אם המזהה שנוצר לא מוצא חן בעיניכם, תוכלו ליצור מזהה אקראי אחר. לחלופין, אפשר לנסות כתובת משלכם ולבדוק אם היא זמינה. לא ניתן לשנות את השם אחרי השלב הזה, והוא יישאר למשך כל תקופת הפרויקט. - לידיעתכם, יש ערך שלישי, מספר פרויקט, שחלק מממשקי ה-API משתמשים בו. מידע נוסף על כל שלושת הערכים האלה זמין במסמכי העזרה.
- בשלב הבא, כדי להשתמש במשאבים או ב-API של Cloud, תצטרכו להפעיל את החיוב במסוף Cloud. השלמת הקודלאב הזה לא תעלה הרבה, אם בכלל. כדי להשבית את המשאבים ולמנוע חיובים אחרי סיום המדריך, אפשר למחוק את המשאבים שיצרתם או למחוק את הפרויקט. משתמשים חדשים ב-Google Cloud זכאים להשתתף בתוכנית תקופת ניסיון בחינם בסך 300$.
הפעלת Cloud Shell
אפשר להפעיל את Google Cloud מרחוק מהמחשב הנייד, אבל בסדנת הקוד הזו נשתמש ב-Cloud Shell, סביבת שורת פקודה שפועלת ב-Cloud.
הפעלת Cloud Shell
- במסוף Cloud, לוחצים על Activate Cloud Shell
.
אם זו הפעם הראשונה שאתם מפעילים את Cloud Shell, יוצג מסך ביניים עם תיאור של השירות. אם יוצג מסך ביניים, לוחצים על Continue (המשך).
תהליך ההקצאה והחיבור ל-Cloud Shell אמור להימשך רק כמה רגעים.
המכונה הווירטואלית הזו כוללת את כל הכלים הנדרשים לפיתוח. יש בה ספריית בית בנפח מתמיד של 5GB והיא פועלת ב-Google Cloud, משפרת מאוד את הביצועים והאימות של הרשת. אפשר לבצע את רוב העבודה ב-codelab הזה, אם לא את כולה, באמצעות דפדפן.
אחרי שתתחברו ל-Cloud Shell, אמורה להופיע הודעה על כך שהאימות בוצע והפרויקט מוגדר לפי מזהה הפרויקט שלכם.
- מריצים את הפקודה הבאה ב-Cloud Shell כדי לוודא שהאימות בוצע:
gcloud auth list
פלט הפקודה
Credentialed Accounts ACTIVE ACCOUNT * <my_account>@<my_domain.com> To set the active account, run: $ gcloud config set account `ACCOUNT`
- מריצים את הפקודה הבאה ב-Cloud Shell כדי לוודא שפקודת gcloud מכירה את הפרויקט:
gcloud config list project
פלט הפקודה
[core] project = <PROJECT_ID>
אם לא, אפשר להגדיר אותו באמצעות הפקודה הבאה:
gcloud config set project <PROJECT_ID>
פלט הפקודה
Updated property [core/project].
4. שלב 1 – הרשמה ואימות ב-Kaggle
כדי להתחיל את CodeLab, צריך ליצור חשבון ב-Kaggle. זוהי פלטפורמה של קהילה אונליין לחוקרי נתונים ולחובבי למידת מכונה בבעלות Google, שמארחת מאגר עצום של מערכי נתונים שזמינים לכולם בתחומים שונים. מהאתר הזה מורידים את מערך הנתונים של RottenTomatoes, שמשמש לאימון המודל.
5. שלב 2 – הרשמה ואימות ב-HuggingFace
HuggingFace הוא מיקום מרכזי לכל מי שרוצה להשתמש בטכנולוגיית למידת מכונה. ב-CoDS יש 900,000 מודלים, 200,000 מערכי נתונים ו-300,000 אפליקציות הדגמה (מרחבים), והכול בקוד פתוח וזמין לכולם.
- נרשמים ל-HuggingFace – יוצרים חשבון עם שם משתמש, אי אפשר להשתמש ב-Google SSO
- אישור כתובת האימייל
- עוברים כאן ומאשרים את הרישיון של המודל Gemma-2-9b-it
- כאן אפשר ליצור טוקן של HuggingFace
- מתעדים את פרטי הכניסה לאסימון, כי תצטרכו אותם בהמשך
6. שלב 3 – יוצרים את משאבי התשתית הנדרשים ב-Google Cloud
תלמדו להגדיר את GKE, GCE ו-Artifact Registry ולהחיל תפקידים ב-IAM באמצעות איחוד שירותי אימות הזהות של עומסי עבודה.
תהליך העבודה של ה-AI כולל שני מאגרי צמתים, אחד לאימון ואחד להסקה. ב-nodepool של האימון נעשה שימוש במכונה וירטואלית מסוג g2-standard-8 של GCE עם GPU אחד של Nvidia L4 Tensor Core. ב-nodepool של ההסקה נעשה שימוש במכונה וירטואלית מסוג g2-standard-24 עם שני מעבדי GPU מסוג Nvidia L4 Tensor Core. כשמציינים את האזור, בוחרים אזור שבו יש תמיכה ב-GPU הנדרש ( קישור).
ב-Cloud Shell, מריצים את הפקודות הבאות:
# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")
SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1
# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto
# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount
# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com
# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
--accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-12 \
--num-nodes=1
# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
--accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-24 \
--num-nodes=1
# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a
# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}
יצירת מניפסט YAML
mkdir manifests
cd manifests
mlops-sa.yaml
apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
name: airflow-mlops-sa
namespace: airflow
labels:
tier: airflow
pv-dags.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-dags
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 5Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_DAGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pv-logs.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-logs
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 100Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_LOGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pvc-dags.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-dags
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
volumeName: airflow-dags
storageClassName: standard
pvc-logs.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-logs
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 100Gi
volumeName: airflow-logs
storageClassName: standard
namespace.yaml
kind: Namespace
apiVersion: v1
metadata:
name: airflow
labels:
name: airflow
sa-role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: airflow
name: airflow-deployment-role
rules:
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
resources: ["services"]
verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]
sa-rolebinding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: airflow-deployment-rolebinding
namespace: airflow
subjects:
- kind: ServiceAccount
name: airflow-worker
namespace: airflow
roleRef:
kind: Role
name: airflow-deployment-role
apiGroup: rbac.authorization.k8s.io
inference.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: inference-deployment
namespace: airflow
spec:
replicas: 1
selector:
matchLabels:
app: gemma-server
template:
metadata:
labels:
app: gemma-server
ai.gke.io/model: gemma-2-9b-it
ai.gke.io/inference-server: vllm
annotations:
gke-gcsfuse/volumes: "true"
spec:
serviceAccountName: airflow-mlops-sa
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
- key: "on-demand"
value: "true"
operator: "Equal"
effect: "NoSchedule"
containers:
- name: inference-server
image: vllm/vllm-openai:latest
ports:
- containerPort: 8000
resources:
requests:
nvidia.com/gpu: "2"
limits:
nvidia.com/gpu: "2"
command: ["/bin/sh", "-c"]
args:
- |
python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
volumeMounts:
- mountPath: /dev/shm
name: dshm
- name: gcs-fuse-csi-ephemeral
mountPath: /modeldata
readOnly: true
volumes:
- name: dshm
emptyDir:
medium: Memory
- name: gcs-fuse-csi-ephemeral
csi:
driver: gcsfuse.csi.storage.gke.io
volumeAttributes:
bucketName: BUCKET_DATA_NAME
mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
fileCacheCapacity: "20Gi"
fileCacheForRangeRead: "true"
metadataStatCacheCapacity: "-1"
metadataTypeCacheCapacity: "-1"
metadataCacheTTLSeconds: "-1"
nodeSelector:
cloud.google.com/gke-accelerator: nvidia-l4
inference-service.yaml
apiVersion: v1
kind: Service
metadata:
name: llm-service
namespace: airflow
spec:
selector:
app: gemma-server
type: LoadBalancer
ports:
- protocol: TCP
port: 8000
targetPort: 8000
יצירת 3 קטגוריות של Google Cloud Storage (GCS)
gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}
# Create the namespace in GKE
kubectl apply -f namespace.yaml
# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml
Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"
7. שלב 4 – התקנת Airflow ב-GKE באמצעות תרשים helm
עכשיו אנחנו פורסים את Airflow 2 באמצעות Helm. Apache Airflow היא פלטפורמה לניהול תהליכי עבודה בקוד פתוח לצינורות עיבוד נתונים. בהמשך נרחיב על חבילות התכונות של Airflow 2.
values.yaml לתרשים helm של Airflow
config:
webserver:
expose_config: true
webserver:
service:
type: LoadBalancer
podAnnotations:
gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
- name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
value: "30"
logs:
persistence:
enabled: true
existingClaim: "airflow-logs"
dags:
persistence:
enabled: true
existingClaim: "airflow-dags"
scheduler:
podAnnotations:
gke-gcsfuse/volumes: "true"
triggerer:
podAnnotations:
gke-gcsfuse/volumes: "true"
workers:
podAnnotations:
gke-gcsfuse/volumes: "true"
פריסת Airflow 2
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml
8. שלב 5 – איך מפעילים את Airflow באמצעות חיבורים ומשתנים
אחרי הפריסה של Airflow 2, נוכל להתחיל להגדיר אותו. אנחנו מגדירים כמה משתנים, שסקריפטים של Python קוראים.
- גישה לממשק המשתמש של Airflow ביציאה 8080 באמצעות הדפדפן
אחזור כתובת ה-IP החיצונית
kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'
פותחים דפדפן אינטרנט ועוברים לכתובת http://<EXTERNAL-IP>:8080 . שם הכניסה הוא admin / admin
- יוצרים חיבור ברירת מחדל ל-GCP בממשק המשתמש של Airflow. לשם כך, עוברים אל Admin (אדמין) → Connections (חיבורים) → + Add a new record (הוספת רשומה חדשה).
- מזהה החיבור: google_cloud_default
- סוג החיבור: Google Cloud
לוחצים על 'שמירה'.
- יוצרים את המשתנים הנדרשים: עוברים אל 'אדמין' → 'משתנים' → + 'הוספת רשומה חדשה'.
- מפתח: BUCKET_DATA_NAME – ערך: מעתיקים מ-echo $BUCKET_DATA_NAME
- מפתח: GCP_PROJECT_ID – ערך: העתקה מ-echo $DEVSHELL_PROJECT_ID
- מפתח: HF_TOKEN – ערך: מזינים את אסימון ה-HF
- מפתח: KAGGLE_USERNAME – ערך: מזינים את שם המשתמש ב-Kaggle
- מפתח: KAGGLE_KEY – ערך: מעתיקים את זה מקובץ kaggle.json
לוחצים על 'שמירה' אחרי כל צמד מפתח/ערך.
ממשק המשתמש אמור להיראות כך:
9. מאגר קוד האפליקציה מס' 1 – הורדת נתונים
בסקריפט הזה ל-Python, אנחנו מבצעים אימות באמצעות Kaggle כדי להוריד את מערך הנתונים לקטגוריה (bucket) שלנו ב-GCS.
הסקריפט עצמו נמצא בקונטיינר כי הוא הופך ליחידה מס' 1 ב-DAG, ואנחנו מצפים שקבוצת הנתונים תתעדכן לעיתים קרובות, ולכן אנחנו רוצים להפוך את התהליך הזה לאוטומטי.
יוצרים ספרייה ומעתיקים אליה את הסקריפטים שלנו
cd .. ; mkdir 1-dataset-download
cd 1-dataset-download
dataset-download.py
import os
import kagglehub
from google.cloud import storage
KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
def upload_blob(bucket_name, source_file_name, destination_blob_name):
"""Uploads a file to the bucket."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(f"File {source_file_name} uploaded to {destination_blob_name}.")
# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")
print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"
upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)
קובץ Docker
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]
requirements.txt
google-cloud-storage==2.19.0
kagglehub==0.3.4
עכשיו יוצרים קובץ אימג' בקונטיינר להורדת מערך נתונים ומעבירים אותו ל-Artifact Registry.
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest
10. מאגר קוד האפליקציה מס' 2 – הכנת נתונים
בשלב הכנת הנתונים, אנחנו מבצעים את הפעולות הבאות:
- מציינים את החלק של מערך הנתונים שבו רוצים להשתמש לצורך שיפור המודל הבסיסי
- טעינת מערך הנתונים, כלומר קריאת קובץ ה-CSV ל-DataFrame של Pandas, שהוא מבנה נתונים דו-מימדי של שורות ועמודות.
- טרנספורמציה של נתונים / עיבוד מקדים – קובעים אילו חלקים ממערך הנתונים לא רלוונטיים על ידי ציון מה אנחנו רוצים לשמור, ובפועל מסירים את שאר החלקים.
- החלת הפונקציה
transform
על כל שורה של DataFrame - שומרים את הנתונים שהוכן בקטגוריה של GCS.
יוצרים ספרייה ומעתיקים אליה את הסקריפטים שלנו
cd .. ; mkdir 2-data-preparation
cd 2-data-preparation
data-preparation.py
import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset
# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100")) # Process a limited number of rows, used 100 during testing phase but can be increased
DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"
# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")
def transform(data):
"""
Transforms a row of the DataFrame into the desired format for fine-tuning.
Args:
data: A pandas Series representing a row of the DataFrame.
Returns:
A dictionary containing the formatted text.
"""
question = f"Review analysis for movie '{data['id']}'"
context = data['reviewText']
answer = data['scoreSentiment']
template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
return {'text': template.format(question=question, context=context, answer=answer)}
try:
df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
print(f"Dataset loaded successfully.")
# Drop rows with NaN values in relevant columns
df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])
# Apply transformation to the DataFrame
transformed_data = df.apply(transform, axis=1).tolist()
# Convert transformed data to a DataFrame and then to a Hugging Face Dataset
transformed_df = pd.DataFrame(transformed_data)
dataset = Dataset.from_pandas(transformed_df)
# Save the prepared dataset to JSON lines format
with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
for item in dataset:
f.write(json.dumps(item) + "\n")
print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
except Exception as e:
print(f"Error during data loading or preprocessing: {e}")
import traceback
print(traceback.format_exc())
קובץ Docker
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]
requirements.txt
datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3
# Now we create a container images for data-preparation and push it to the Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest
11. קונטיינר של קוד אפליקציה מס' 3 – שיפור נוסף
כאן אנחנו משתמשים ב-Gemma-2-9b-it כמודל בסיס, ולאחר מכן מבצעים שיפורים באמצעות מערך הנתונים החדש.
זוהי רצף השלבים שמתרחשים במהלך השלב של השיפור.
1. הגדרה: ייבוא ספריות, הגדרת פרמטרים (למודל, לנתונים ולאימון) וטעינת מערך הנתונים מ-Google Cloud Storage.
2. טעינת מודל: טעינת מודל שפה שעבר אימון מקדים עם קצירה לצורך יעילות, וטעינת ה-tokenizer התואם.
3. מגדירים את LoRA: מגדירים התאמה ברמה נמוכה (LoRA) כדי לשפר את המודל ביעילות על ידי הוספת מטריצות קטנות שניתן לאמן.
4. אימון: מגדירים את פרמטרים האימון ומשתמשים ב-SFTTrainer
כדי לבצע שיפור מדויק של המודל במערך הנתונים שנטען באמצעות סוג הקידוד FP16.
5. שמירה והעלאה: שומרים את המודל המשופר ואת ה-tokenizer באופן מקומי, ולאחר מכן מעלים אותם לקטגוריה (bucket) שלנו ב-GCS.
לאחר מכן יוצרים קובץ אימג' בקונטיינר באמצעות Cloud Build ומאחסנים אותו ב-Artifact Registry.
יוצרים ספרייה ומעתיקים אליה את הסקריפטים שלנו
cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning
finetuning.py
import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage
# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")
# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")
# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
"json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)
################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))
################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03
# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50
################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False
# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
attn_implementation="eager",
pretrained_model_name_or_path=model_name,
torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")
# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
lora_alpha=lora_alpha,
lora_dropout=lora_dropout,
r=lora_r,
bias="none",
task_type="CAUSAL_LM",
target_modules=[
"q_proj",
"k_proj",
"v_proj",
"o_proj",
"gate_proj",
"up_proj",
"down_proj",
],
)
# Set training parameters
training_arguments = SFTConfig(
bf16=bf16,
dataset_kwargs={
"add_special_tokens": False,
"append_concat_token": False,
},
dataset_text_field="text",
disable_tqdm=True,
fp16=fp16,
gradient_accumulation_steps=gradient_accumulation_steps,
gradient_checkpointing=gradient_checkpointing,
gradient_checkpointing_kwargs={"use_reentrant": False},
group_by_length=group_by_length,
log_on_each_node=False,
logging_steps=logging_steps,
learning_rate=learning_rate,
lr_scheduler_type=lr_scheduler_type,
max_grad_norm=max_grad_norm,
max_seq_length=max_seq_length,
max_steps=max_steps,
num_train_epochs=num_train_epochs,
optim=optim,
output_dir=save_model_path,
packing=packing,
per_device_train_batch_size=per_device_train_batch_size,
save_strategy=save_strategy,
save_steps=save_steps,
save_total_limit=save_total_limit,
warmup_ratio=warmup_ratio,
weight_decay=weight_decay,
)
print(f"Configuring fine tuning completed")
# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
model=model,
train_dataset=dataset,
peft_config=peft_config,
dataset_text_field="text",
max_seq_length=max_seq_length,
tokenizer=tokenizer,
args=training_arguments,
packing=packing,
)
print(f"Creating trainer completed")
# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")
# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")
# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
low_cpu_mem_usage=True,
pretrained_model_name_or_path=model_name,
return_dict=True,
torch_dtype=torch.float16,
)
model = PeftModel.from_pretrained(
model=base_model,
model_id=new_model,
)
model = model.merge_and_unload()
print(f"Merging the new model with base model completed")
accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")
print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
is_main_process=accelerator.is_main_process,
save_directory=save_model_path,
save_function=accelerator.save,
)
print(f"Save unwrapped model completed")
print(f"Save new tokenizer started")
if accelerator.is_main_process:
tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")
# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
"""Uploads a directory to GCS."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
for root, _, files in os.walk(model_dir):
for file in files:
local_file_path = os.path.join(root, file)
gcs_file_path = os.path.relpath(local_file_path, model_dir)
blob = bucket.blob(os.path.join(new_model, gcs_file_path)) # Use new_model_name
blob.upload_from_filename(local_file_path)
# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")
קובץ Docker
# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04
# Install necessary system packages
RUN apt-get update && \
apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
rm -rf /var/lib/apt/lists/*
# Copy requirements.txt into the container
COPY requirements.txt .
# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
# Copy your finetune script into the container
COPY finetuning.py .
# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]
requirements.txt
accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4
עכשיו יוצרים קובץ אימג' בקונטיינר לצורך כוונון מדויק ומעבירים אותו ל-Artifact Registry.
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest
12. סקירה כללית על Airflow 2, כולל הסבר על DAG
Airflow היא פלטפורמה לתזמור תהליכי עבודה וצינורות עיבוד נתונים. המערכת משתמשת ב-DAG (גרף אציקלי מכוון) כדי להגדיר את תהליכי העבודה האלה בקוד Python, ומציגה באופן חזותי את המשימות ואת יחסי התלות ביניהן.
Airflow, עם ה-DAGs הסטטיים וההגדרות שמבוססות על Python, מתאים מאוד לתזמון ולניהול של תהליכי עבודה מוגדרים מראש. הארכיטקטורה שלו כוללת ממשק משתמש ידידותי למעקב אחרי תהליכי העבודה האלה ולניהול שלהם.
בעיקרון, Airflow מאפשר לכם להגדיר, לתזמן ולנטר את צינורות עיבוד הנתונים באמצעות Python, מה שהופך אותו לכלי גמיש ויעיל לתזמור תהליכי עבודה.
13. סקירה כללית על ה-DAG שלנו
ראשי התיבות DAG הן Directed Acyclic Graph (גרף אציקלי מנוהל). ב-Airflow, DAG מייצג את תהליך העבודה או צינור עיבוד הנתונים כולו. הוא מגדיר את המשימות, יחסי התלות ביניהן ואת סדר הביצוע.
יחידות תהליך העבודה ב-DAG מופעלות מ-pod באשכול GKE, שמופעלת מההגדרה של Airflow.
סיכום:
Airflow: הורדת נתונים – הסקריפט הזה מאפשר להפוך לאוטומטי את התהליך של אחזור מערך נתונים של ביקורות על סרטים מ-Kaggle ואחסון שלו בקטגוריה של GCS, כך שהוא יהיה זמין לעיבוד או לניתוח נוסף בסביבת הענן.
Airflow: Data Preparation – הקוד לוקח את מערך הנתונים הגולמי של ביקורות הסרטים, מסיר עמודות נתונים מיותרות שלא נדרשות לתרחיש לדוגמה שלנו ומחק מערכי נתונים עם ערכים חסרים. לאחר מכן, המערכת מגדירה את מערך הנתונים בפורמט של שאלות ותשובות שמתאים ללמידת מכונה, ושומרת אותו ב-GCS לשימוש עתידי.
Airflow: Model Finetuning – הקוד הזה מבצע שיפורים ותיקונים במודל שפה גדול (LLM) באמצעות טכניקה שנקראת LoRA (התאמה ברמה נמוכה), ולאחר מכן שומר את המודל המעודכן. התהליך מתחיל בטעינת LLM שהודרכה מראש ומערך נתונים מ-Google Cloud Storage. לאחר מכן, המערכת מחילה את LoRA כדי לבצע התאמה מדויקת של המודל במערך הנתונים הזה. לבסוף, המודל המשופר נשמר ב-Google Cloud Storage לשימוש מאוחר יותר באפליקציות כמו יצירת טקסט או מענה על שאלות.
Airflow: Model Serving – הצגת המודל שעבר שיפורים ב-GKE באמצעות vllm לצורך הסקת מסקנות.
Airflow: Feedback loop – אימון מחדש של המודל בכל xx זמן (שעתי, יומי, שבועי).
בתרשים הזה מוסבר איך Airflow 2 פועל כשמריצים אותו ב-GKE.
14. שינוי מדויק של מודל לעומת שימוש ב-RAG
ב-CodeLab הזה נערך כוונון של LLM במקום שימוש ביצירה משופרת של אחזור (RAG).
נבחן את שתי הגישות האלה:
התאמה אישית: יצירת מודל מותאם אישית: התאמה אישית מאפשרת להתאים את ה-LLM למשימה או למערך נתונים ספציפיים, וכך הוא יכול לפעול באופן עצמאי בלי להסתמך על מקורות נתונים חיצוניים.
הפשטת ההסקה: כך אין צורך במערכת אחזור ובמסד נתונים נפרדים, וכתוצאה מכך התשובות הן מהירות וזולות יותר, במיוחד בתרחישי שימוש נפוצים.
RAG: מסתמך על ידע חיצוני: RAG מאחזר מידע רלוונטי ממאגר ידע לכל בקשה, ומבטיח גישה לנתונים עדכניים וספציפיים.
מורכבות מוגברת: הטמעת RAG בסביבת ייצור כמו אשכול Kubernetes לרוב כוללת כמה מיקרו-שירותים לעיבוד ולאחזור נתונים, וכתוצאה מכך עלולה להגדיל את זמן האחזור ואת עלויות המחשוב.
למה נבחר כוונון מדויק:
ה-RAG מתאים למערך הנתונים הקטן שנעשה בו שימוש ב-CodeLab הזה, אבל בחרנו לבצע שינוי קל כדי להדגים תרחיש לדוגמה של Airflow. הבחירה הזו מאפשרת לנו להתמקד בהיבטים של תזמור תהליכי העבודה, במקום להיכנס לפרטים הקטנים של הגדרת תשתית ומיקרו-שירותים נוספים ל-RAG.
סיכום:
גם כוונון מדויק וגם RAG הן שיטות חשובות שיש להן יתרונות וחסרונות משלהם. הבחירה האופטימלית תלויה בדרישות הספציפיות של הפרויקט, כמו הגודל והמורכבות של הנתונים, צורכי הביצועים ושיקולי העלות.
15. משימה מס' 1 ב-DAG – יצירת השלב הראשון ב-Airflow: הורדת נתונים
כסקירה כללית של יחידת ה-DAG הזו, קוד Python שלנו שמתארח בקובץ אימג' של קונטיינר מוריד את מערך הנתונים העדכני ביותר של RottenTomatoes מ-Kaggle.
אין להעתיק את הקוד הזה לקטגוריה של GCS. בשלב האחרון אנחנו מעתיקים את הקובץ mlops-dag.py, שמכיל את כל השלבים של יחידת ה-DAG בתוך סקריפט Python אחד.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
dataset_download
16. משימה מס' 2 ב-DAG – יצירת השלב השני ב-Airflow: הכנת נתונים
כסקירה כללית של יחידת ה-DAG הזו, אנחנו מעלים קובץ CSV (rotten_tomatoes_movie_reviews.csv) מ-GCS אל DataFrame של Pandas.
בשלב הבא, אנחנו מגבילים את מספר השורות שמעובדות באמצעות DATASET_LIMIT לצורך בדיקה ויעילות משאבים, ובסוף ממירים את הנתונים שעברו טרנספורמציה למערך נתונים של Hugging Face.
אם תבחינו היטב, תראו שאנחנו מאומנים 1,000 שורות במודל עם "DATASET_LIMIT": "1000", כי זה לוקח 20 דקות ב-GPU של Nvidia L4.
אין להעתיק את הקוד הזה לקטגוריה של GCS. בשלב האחרון מעתיקים את הקובץ mlops-dag.py, שמכיל את כל השלבים בסקריפט Python אחד.
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation
17. משימה 3 ב-DAG – יצירת השלב השלישי ב-Airflow: שיפור המודל
כסקירה כללית של יחידת ה-DAG הזו, כאן אנחנו מריצים את finetune.py כדי לשפר את מודל Gemma באמצעות מערך הנתונים החדש שלנו.
אין להעתיק את הקוד הזה לקטגוריה של GCS. בשלב האחרון מעתיקים את הקובץ mlops-dag.py, שמכיל את כל השלבים בסקריפט Python אחד.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Task 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Task 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Task 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation >> fine_tuning
18. משימה מס' 4 ב-DAG – יצירת השלב האחרון ב-Airflow: יצירת אינטרפולציה / הצגת המודל
vLLM היא ספריית קוד פתוח חזקה שמיועדת במיוחד להסקה של מודלים גדולים של שפה (LLMs) עם ביצועים גבוהים. כשפורסים אותו ב-Google Kubernetes Engine (GKE), הוא מנצל את יכולת ההתאמה לעומס ואת היעילות של Kubernetes כדי לספק LLMs ביעילות.
סיכום השלבים:
- מעלים את ה-DAG 'mlops-dag.py' לקטגוריה ב-GCS.
- מעתיקים שני קובצי תצורה של YAML ב-Kubernetes כדי להגדיר את ההסקה, לקטגוריה ב-GCS.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
def model_serving():
config.load_incluster_config()
k8s_apps_v1 = client.AppsV1Api()
k8s_core_v1 = client.CoreV1Api()
while True:
try:
k8s_apps_v1.delete_namespaced_deployment(
namespace="airflow",
name="inference-deployment",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Deployment inference-deployment deleted")
with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_apps_v1.create_namespaced_deployment(
body=dep, namespace="airflow")
print(f"Deployment created. Status='{resp.metadata.name}'")
while True:
try:
k8s_core_v1.delete_namespaced_service(
namespace="airflow",
name="llm-service",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Service llm-service deleted")
with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_core_v1.create_namespaced_service(
body=dep, namespace="airflow")
print(f"Service created. Status='{resp.metadata.name}'")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 4: Run GKE Deployment for model serving
model_serving = PythonOperator(
task_id="model_serving",
python_callable=model_serving
)
dataset_download >> data_preparation >> fine_tuning >> model_serving
מעלים את סקריפט Python (קובץ DAG) ואת המניפסטים של Kubernetes לקטגוריה DAGS ב-GCS.
gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}
בממשק המשתמש של Airflow יופיע mlops-dag.
- בוחרים באפשרות 'ביטול ההשהיה'.
- בוחרים באפשרות Trigger DAG כדי לבצע מחזור ידני של MLOps.
בסיום ה-DAG, יוצג פלט כזה בממשק המשתמש של Airflow.
אחרי השלב האחרון, תוכלו לקבל את נקודת הקצה של המודל ולשלוח הנחיה לבדיקה שלו.
צריך להמתין כ-5 דקות לפני שמפעילים את הפקודה curl, כדי שההסקה של המודל תתחיל ומאזן העומסים יוכל להקצות כתובת IP חיצונית.
export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
"prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
"temperature": 0.1,
"top_p": 1.0,
"max_tokens": 128
}
EOF
פלט:
19. מעולה!
יצרתם את תהליך העבודה הראשון שלכם ב-AI באמצעות צינור עיבוד נתונים מסוג DAG עם Airflow 2 ב-GKE.
אל תשכחו לבטל את הקצאת המשאבים שפרסמתם.
20. ביצוע הפעולה הזו בסביבת הייצור
ב-CodeLab קיבלתם תובנות נהדרות לגבי הגדרת Airflow 2 ב-GKE, אבל בעולם האמיתי כדאי להביא בחשבון כמה מהנושאים הבאים כשמבצעים את ההגדרה בסביבת הייצור.
הטמעת ממשק אינטרנט קדמי באמצעות Gradio או כלים דומים.
אפשר להגדיר מעקב אוטומטי אחר אפליקציות לעומסי עבודה באמצעות GKE כאן, או לייצא מדדים מ-Airflow כאן.
יכול להיות שתצטרכו כרטיסי GPU גדולים יותר כדי לשפר את המודל מהר יותר, במיוחד אם יש לכם מערכי נתונים גדולים יותר. עם זאת, אם רוצים לאמן את המודל בכמה מערכי GPU, צריך לפצל את מערך הנתונים ולחלק את האימון. בהמשך מוסבר על FSDP עם PyTorch (עיבוד מקבילי של נתונים שמחולקים לחלקים, באמצעות שיתוף GPU כדי להשיג את המטרה הזו. מידע נוסף זמין כאן בפוסט בבלוג של Meta ובמדריך הזה בנושא FSDP באמצעות Pytorch.
Google Cloud Composer הוא שירות מנוהל של Airflow, כך שאין צורך לתחזק את Airflow עצמו. פשוט פורסים את ה-DAG וממשיכים הלאה.
מידע נוסף
- מסמכי התיעוד של Airflow: https://airflow.apache.org/
רישיון
העבודה הזו בשימוש במסגרת רישיון Creative Commons Attribution 2.0 Generic.