1. نظرة عامة
يوضّح هذا الدليل التعليمي كيفية دمج ممارسات DevOps في تعلُّم الآلة (MLOps) من خلال تنزيل مجموعة بيانات وتحسين نموذج ونشر نموذج تعلُّم الآلة الكبير (LLM) على Google Kubernetes Engine (GKE) باستخدام مخطط DAG في Airflow مع أقل قدر من التجريد. نتيجةً لذلك، سنستخدم أوامر gcloud وليس terraform حتى تتمكّن من اتّباع الخطوات في المختبر وفهم كل عملية بسهولة من منظور كلّ من مهندس المنصة ومهندس تعلُّم الآلة.
سيرشدك هذا الدليل العملي إلى كيفية الاستفادة من Airflow لتبسيط سير عمل الذكاء الاصطناعي، مع تقديم عرض توضيحي واضح وعملي لدورة حياة MLOps بالكامل من خلال ضبط مخطط DAG.
المُعطيات
- تعزيز التعاون والتفاهم بشكل أكبر بين مهندسي المنصة وتكنولوجيات تعلُّم الآلة من خلال إزالة الحواجز التي تقف في طريق المعرفة وتحسين سير العمل
- التعرّف على كيفية نشر Airflow 2 واستخدامه وإدارته على GKE
- ضبط DAG في Airflow من البداية إلى النهاية
- إنشاء الأساس لأنظمة تعلُّم الآلة المناسبة للاستخدام في مرحلة الإنتاج باستخدام "Google Kubernetes Engine"
- استخدام أنظمة تعلُّم الآلة وتشغيلها
- فهم كيفية تطوّر هندسة المنصات لتصبح ركيزة دعم مهمة لعملية MLOps
النتائج التي يحقّقها هذا الدرس التطبيقي حول الترميز
- يمكنك طرح أسئلة عن الأفلام من نموذج لغوي كبير جدًا (LLM) عدّلناه استنادًا إلى Gemma-2-9b-it، والذي يتم عرضه في GKE باستخدام نموذج لغوي كبير جدًا متغيّر (vLLM).
Target Audience
- مهندسو تعلُّم الآلة
- مهندسو المنصات
- علماء البيانات
- مهندسو البيانات
- مهندسو DevOps
- مهندس المنصة
- مهندسو إرشاد العملاء المتعاقدين
لا يهدف هذا الدرس التطبيقي حول الترميز إلى
- كمقدمة عن سير عمل GKE أو الذكاء الاصطناعي/تعلُّم الآلة
- كنظرة عامة على مجموعة ميزات Airflow بالكامل
2. مساعدة مهندسي/علماء تعلُّم الآلة من خلال هندسة المنصّات
إنّ هندسة المنصات وعمليات MLOps هما مجالان مترابطان يتعاونان لإنشاء بيئة فعّالة وقوية لتطوير تعلُّم الآلة ونشره.
النطاق: يشمل مجال هندسة المنصات نطاقًا أوسع من مجال MLOps، إذ يشمل دورة تطوير البرامج بالكامل ويوفّر الأدوات والبنية الأساسية له.
تُسدِّد منصّة MLOps الفجوة بين تطوير تعلُّم الآلة ونشره واستنتاجه.
الخبرة: عادةً ما يمتلك مهندسو المنصات خبرة قوية في تكنولوجيات البنية الأساسية، مثل الحوسبة السحابية واستخدام الحاويات وإدارة البيانات.
يتخصّص مهندسو MLOps في تطوير نماذج تعلُّم الآلة ونشرها ومراقبتها، وغالبًا ما يكونون على دراية بعلوم البيانات وهندسة البرامج.
الأدوات: ينشئ مهندسو المنصات أدوات لتوفير البنية الأساسية وإدارة الإعدادات وتنسيق الحِزم وإنشاء إطار عمل للتطبيقات. يستخدم مهندسو MLOps أدوات لتدريب نماذج تعلُّم الآلة والتجربة والنشر والمراقبة وإدارة الإصدارات.
3- متطلبات إعداد Google Cloud
إعداد البيئة حسب السرعة التي تناسبك
- سجِّل الدخول إلى Google Cloud Console وأنشئ مشروعًا جديدًا أو أعِد استخدام مشروع حالي. إذا لم يكن لديك حساب على Gmail أو Google Workspace، عليك إنشاء حساب.
- اسم المشروع هو الاسم المعروض للمشاركين في هذا المشروع. وهي سلسلة أحرف لا تستخدمها واجهات برمجة تطبيقات Google. ويمكنك تعديلها في أي وقت.
- يكون معرّف المشروع فريدًا في جميع مشاريع Google Cloud وغير قابل للتغيير (لا يمكن تغييره بعد ضبطه). تنشئ وحدة تحكّم Cloud Console سلسلة فريدة تلقائيًا، ولا يهمّك عادةً معرفة محتواها. في معظم مختبرات رموز البرامج، ستحتاج إلى الإشارة إلى معرّف المشروع (يُعرَف عادةً باسم
PROJECT_ID
). إذا لم يعجبك المعرّف الذي تم إنشاؤه، يمكنك إنشاء معرّف آخر عشوائي. يمكنك بدلاً من ذلك تجربة عنوانك الخاص لمعرفة ما إذا كان متاحًا. ولا يمكن تغييره بعد هذه الخطوة ويبقى ساريًا طوال مدة المشروع. - يُرجى العِلم أنّ هناك قيمة ثالثة، وهي رقم المشروع، تستخدمها بعض واجهات برمجة التطبيقات. اطّلِع على مزيد من المعلومات عن كلّ من هذه القيم الثلاث في المستندات.
- بعد ذلك، عليك تفعيل الفوترة في Cloud Console لاستخدام موارد/واجهات برمجة تطبيقات Cloud. لن تتطلّب المشاركة في هذا الدليل التعليمي البرمجي أي تكلفة، أو قد تتطلّب تكلفة بسيطة. لإيقاف الموارد لتجنُّب تحصيل رسوم بعد انتهاء هذا الدليل التعليمي، يمكنك حذف الموارد التي أنشأتها أو حذف المشروع. يكون مستخدمو Google Cloud الجدد مؤهّلين للاستفادة من برنامج الفترة التجريبية المجانية التي تبلغ قيمتها 300 دولار أمريكي.
بدء Cloud Shell
على الرغم من أنّه يمكن تشغيل Google Cloud عن بُعد من الكمبيوتر المحمول، ستستخدم في هذا الإصدار التجريبي من "مختبر رموز Google" Cloud Shell، وهي بيئة سطر أوامر تعمل في السحابة الإلكترونية.
تفعيل Cloud Shell
- من Cloud Console، انقر على تفعيل Cloud Shell
.
إذا كانت هذه هي المرة الأولى التي تبدأ فيها Cloud Shell، ستظهر لك شاشة وسيطة توضّح ماهيتها. إذا ظهرت لك شاشة وسيطة، انقر على متابعة.
من المفترض ألا يستغرق توفير Cloud Shell والاتصال بها سوى بضع لحظات.
تم تحميل هذه الآلة الافتراضية بجميع أدوات التطوير اللازمة. ويقدّم هذا الدليل دليلاً منزليًا دائمًا بسعة 5 غيغابايت ويتم تشغيله في Google Cloud، ما يُحسِّن بشكل كبير أداء الشبكة والمصادقة. يمكن تنفيذ الكثير من عملك في هذا الدليل التعليمي للترميز، إن لم يكن كلّه، باستخدام متصفّح.
بعد الاتصال بخدمة Cloud Shell، من المفترض أن تظهر لك رسالة تفيد بأنّه تم مصادقة حسابك وأنّه تم ضبط المشروع على معرّف مشروعك.
- نفِّذ الأمر التالي في Cloud Shell لتأكيد مصادقة حسابك:
gcloud auth list
ناتج الأمر
Credentialed Accounts ACTIVE ACCOUNT * <my_account>@<my_domain.com> To set the active account, run: $ gcloud config set account `ACCOUNT`
- شغِّل الأمر التالي في Cloud Shell للتأكّد من أنّ الأمر gcloud يعرف مشروعك:
gcloud config list project
ناتج الأمر
[core] project = <PROJECT_ID>
إذا لم يكن كذلك، يمكنك تعيينه من خلال هذا الأمر:
gcloud config set project <PROJECT_ID>
ناتج الأمر
Updated property [core/project].
4. الخطوة 1: الاشتراك والمصادقة على Kaggle
لبدء استخدام CodeLab، عليك إنشاء حساب على Kaggle، وهي منصة مجتمعية على الإنترنت لعلماء البيانات وعشاق تعلُّم الآلة وتملكها Google، وتستضيف مستودعًا ضخمًا من مجموعات البيانات المتاحة للجميع في مجالات مختلفة. يمكنك تنزيل مجموعة بيانات RottenTomatoes من هذا الموقع الإلكتروني، وهي مجموعة البيانات المستخدَمة لتدريب النموذج.
- اشترِك في Kaggle، ويمكنك استخدام ميزة "الدخول المُوحَّد من Google" لتسجيل الدخول.
- قبول الأحكام والشروط
- انتقِل إلى "الإعدادات" واحصل على اسم المستخدم username.
- ضمن قسم "واجهة برمجة التطبيقات"، اختَر "إنشاء رمز مميّز جديد من" Kaggle، ما سيؤدي إلى تنزيل kaggle.json.
- إذا واجهت أي مشاكل، يُرجى الانتقال إلى صفحة الدعم هنا.
5- الخطوة 2: الاشتراك في HuggingFace والمصادقة عليه
توفّر HuggingFace موقعًا مركزيًا لأي شخص للتفاعل مع تكنولوجيا تعلُّم الآلة. تستضيف المنصة 900 ألف نموذج و200 ألف مجموعة بيانات و300 ألف تطبيق تجريبي (مساحات)، وجميعها مفتوحة المصدر ومتاحة للجميع.
- اشترِك في HuggingFace - أنشئ حسابًا باستخدام اسم مستخدم، ولا يمكنك استخدام الدخول المُوحَّد من Google.
- تأكيد عنوان بريدك الإلكتروني
- يُرجى الانتقال إلى هذا الرابط وقبول ترخيص نموذج Gemma-2-9b-it.
- يمكنك إنشاء رمز مميّز في HuggingFace هنا.
- سجِّل بيانات اعتماد الرمز المميّز، لأنّك ستحتاج إليها لاحقًا.
6- الخطوة 3: إنشاء موارد البنية الأساسية المطلوبة في Google Cloud
ستُعدّ GKE وGCE وArtifact Registry وتطبّق أدوار IAM باستخدام توحيد هوية حمولة العمل.
يستخدم سير عمل الذكاء الاصطناعي مجموعتَي عقدتَين، إحداهما للتدريب والأخرى للاستنتاج. يستخدم تجمع العقد المخصّص للتدريب جهازًا افتراضيًا من Google Compute Engine من فئة g2-standard-8 مزوّدًا بوحدة معالجة رسومات Nvidia L4 Tensor Core. يستخدم تجمع العقد لاستنتاج البيانات جهازًا افتراضيًا من نوع g2-standard-24 مزوّدًا بوحدتَي معالجة رسومات Nvidia L4 Tensor Core. أثناء تحديد المنطقة، اختَر منطقة تتوفّر فيها وحدة معالجة الرسومات المطلوبة ( الرابط).
في Cloud Shell، نفِّذ الأوامر التالية:
# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")
SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1
# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto
# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount
# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com
# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
--accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-12 \
--num-nodes=1
# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
--accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-24 \
--num-nodes=1
# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a
# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}
إنشاء ملفات بيان YAML
mkdir manifests
cd manifests
mlops-sa.yaml
apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
name: airflow-mlops-sa
namespace: airflow
labels:
tier: airflow
pv-dags.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-dags
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 5Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_DAGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pv-logs.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-logs
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 100Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_LOGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pvc-dags.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-dags
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
volumeName: airflow-dags
storageClassName: standard
pvc-logs.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-logs
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 100Gi
volumeName: airflow-logs
storageClassName: standard
namespace.yaml
kind: Namespace
apiVersion: v1
metadata:
name: airflow
labels:
name: airflow
sa-role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: airflow
name: airflow-deployment-role
rules:
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
resources: ["services"]
verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]
sa-rolebinding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: airflow-deployment-rolebinding
namespace: airflow
subjects:
- kind: ServiceAccount
name: airflow-worker
namespace: airflow
roleRef:
kind: Role
name: airflow-deployment-role
apiGroup: rbac.authorization.k8s.io
inference.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: inference-deployment
namespace: airflow
spec:
replicas: 1
selector:
matchLabels:
app: gemma-server
template:
metadata:
labels:
app: gemma-server
ai.gke.io/model: gemma-2-9b-it
ai.gke.io/inference-server: vllm
annotations:
gke-gcsfuse/volumes: "true"
spec:
serviceAccountName: airflow-mlops-sa
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
- key: "on-demand"
value: "true"
operator: "Equal"
effect: "NoSchedule"
containers:
- name: inference-server
image: vllm/vllm-openai:latest
ports:
- containerPort: 8000
resources:
requests:
nvidia.com/gpu: "2"
limits:
nvidia.com/gpu: "2"
command: ["/bin/sh", "-c"]
args:
- |
python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
volumeMounts:
- mountPath: /dev/shm
name: dshm
- name: gcs-fuse-csi-ephemeral
mountPath: /modeldata
readOnly: true
volumes:
- name: dshm
emptyDir:
medium: Memory
- name: gcs-fuse-csi-ephemeral
csi:
driver: gcsfuse.csi.storage.gke.io
volumeAttributes:
bucketName: BUCKET_DATA_NAME
mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
fileCacheCapacity: "20Gi"
fileCacheForRangeRead: "true"
metadataStatCacheCapacity: "-1"
metadataTypeCacheCapacity: "-1"
metadataCacheTTLSeconds: "-1"
nodeSelector:
cloud.google.com/gke-accelerator: nvidia-l4
inference-service.yaml
apiVersion: v1
kind: Service
metadata:
name: llm-service
namespace: airflow
spec:
selector:
app: gemma-server
type: LoadBalancer
ports:
- protocol: TCP
port: 8000
targetPort: 8000
إنشاء 3 حِزم في Google Cloud Storage
gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}
# Create the namespace in GKE
kubectl apply -f namespace.yaml
# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml
Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"
7- الخطوة 4: تثبيت Airflow على GKE من خلال مخطّط helm
ننشر الآن Airflow 2 باستخدام Helm. Apache Airflow هي منصّة إدارة سير العمل مفتوحة المصدر لعمليات هندسة البيانات. سنتناول مجموعة ميزات Airflow 2 لاحقًا.
ملف values.yaml لرسم بياني helm في Airflow
config:
webserver:
expose_config: true
webserver:
service:
type: LoadBalancer
podAnnotations:
gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
- name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
value: "30"
logs:
persistence:
enabled: true
existingClaim: "airflow-logs"
dags:
persistence:
enabled: true
existingClaim: "airflow-dags"
scheduler:
podAnnotations:
gke-gcsfuse/volumes: "true"
triggerer:
podAnnotations:
gke-gcsfuse/volumes: "true"
workers:
podAnnotations:
gke-gcsfuse/volumes: "true"
نشر Airflow 2
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml
8. الخطوة 5: بدء Airflow باستخدام الاتصالات والمتغيّرات
بعد نشر Airflow 2، يمكننا البدء في ضبط إعداداته. نحدّد بعض المتغيّرات التي تقرأها نصوص Python البرمجية.
- الوصول إلى واجهة مستخدم Airflow على المنفذ 8080 باستخدام المتصفّح
الحصول على عنوان IP الخارجي
kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'
افتح متصفّح ويب وانتقِل إلى http://<EXTERNAL-IP>:8080 . كلمة المرور هي admin / admin
- أنشئ اتصالاً تلقائيًا بخدمة Google Cloud Platform ضمن واجهة مستخدم Airflow، لذلك انتقِل إلى "المشرف" ← "عمليات الربط" ← + "إضافة سجلّ جديد".
- معرّف الاتصال: google_cloud_default
- نوع الاتصال: Google Cloud
النقر على "حفظ"
- أنشئ المتغيّرات المطلوبة، لذلك انتقِل إلى "المشرف" ثم "المتغيّرات" ثم "+ إضافة سجلّ جديد".
- المفتاح: BUCKET_DATA_NAME - القيمة: نسخ من echo $BUCKET_DATA_NAME
- المفتاح: GCP_PROJECT_ID - القيمة: نسخ من echo $DEVSHELL_PROJECT_ID
- المفتاح: HF_TOKEN - القيمة: إدراج رمز HF المميّز
- المفتاح: KAGGLE_USERNAME - القيمة: أدخِل اسم المستخدم في Kaggle
- المفتاح: KAGGLE_KEY - القيمة: انسخ هذا من kaggle.json
انقر على "حفظ" بعد كل زوج مفتاح/قيمة.
من المفترض أن يظهر واجهة المستخدم على النحو التالي:
9. حاوية رمز التطبيق رقم 1: تنزيل البيانات
في هذا النص البرمجي بتنسيق Python، نُجري مصادقة مع Kaggle لتنزيل مجموعة البيانات إلى حزمة GCS.
تم وضع النص البرمجي نفسه في حاوية لأنّه يصبح الوحدة رقم 1 من DAG، ونتوقع أن يتم تعديل مجموعة البيانات بشكل متكرّر، لذا نريد إجراء ذلك بشكل آلي.
أنشئ دليلاً وانسخ النصوص البرمجية هنا.
cd .. ; mkdir 1-dataset-download
cd 1-dataset-download
dataset-download.py
import os
import kagglehub
from google.cloud import storage
KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
def upload_blob(bucket_name, source_file_name, destination_blob_name):
"""Uploads a file to the bucket."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(f"File {source_file_name} uploaded to {destination_blob_name}.")
# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")
print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"
upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)
ملف شامل
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]
requirements.txt
google-cloud-storage==2.19.0
kagglehub==0.3.4
الآن سننشئ صورة حاوية لتنزيل مجموعة البيانات وندفعها إلى "مستودع العناصر".
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest
10. حاوية رمز التطبيق رقم 2: تحضير البيانات
خلال خطوة إعداد البيانات، نحقّق ما يلي:
- تحديد مقدار مجموعة البيانات التي نريد استخدامها لتحسين النموذج الأساسي
- تحميل مجموعة البيانات، أي قراءة ملف CSV في إطار بيانات Pandas الذي يمثّل بنية بيانات ثنائية الأبعاد للصفوف والأعمدة
- تحويل البيانات / المعالجة المُسبَقة: تحديد أجزاء مجموعة البيانات غير ذات الصلة من خلال تحديد ما نريد الاحتفاظ به، ما يؤدي في الواقع إلى إزالة الباقي
- تطبِّق الدالة
transform
على كل صف من إطار البيانات. - حفظ البيانات التي تم إعدادها مرة أخرى في حزمة GCS
أنشئ دليلاً وانسخ النصوص البرمجية هنا.
cd .. ; mkdir 2-data-preparation
cd 2-data-preparation
data-preparation.py
import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset
# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100")) # Process a limited number of rows, used 100 during testing phase but can be increased
DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"
# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")
def transform(data):
"""
Transforms a row of the DataFrame into the desired format for fine-tuning.
Args:
data: A pandas Series representing a row of the DataFrame.
Returns:
A dictionary containing the formatted text.
"""
question = f"Review analysis for movie '{data['id']}'"
context = data['reviewText']
answer = data['scoreSentiment']
template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
return {'text': template.format(question=question, context=context, answer=answer)}
try:
df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
print(f"Dataset loaded successfully.")
# Drop rows with NaN values in relevant columns
df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])
# Apply transformation to the DataFrame
transformed_data = df.apply(transform, axis=1).tolist()
# Convert transformed data to a DataFrame and then to a Hugging Face Dataset
transformed_df = pd.DataFrame(transformed_data)
dataset = Dataset.from_pandas(transformed_df)
# Save the prepared dataset to JSON lines format
with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
for item in dataset:
f.write(json.dumps(item) + "\n")
print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
except Exception as e:
print(f"Error during data loading or preprocessing: {e}")
import traceback
print(traceback.format_exc())
ملف شامل
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]
requirements.txt
datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3
# Now we create a container images for data-preparation and push it to the Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest
11. حاوية رمز التطبيق رقم 3: التحسين
في ما يلي، نستخدم Gemma-2-9b-it كنموذج أساسي، ثمّ نُجري تحسينات عليه باستخدام مجموعة البيانات الجديدة.
في ما يلي تسلسل الخطوات التي تحدث أثناء خطوة التحسين.
1. الإعداد: استيراد المكتبات وتحديد المَعلمات (للنموذج والبيانات والتدريب) وتحميل مجموعة البيانات من Google Cloud Storage
2- تحميل النموذج: يمكنك تحميل نموذج لغوي مدرَّب مسبقًا مع التقطيع لزيادة الكفاءة، وتحميل أداة تقسيم الكلمات المقابلة.
3. ضبط LoRA: يمكنك ضبط تقنية Low-Rank Adaptation (LoRA) لتحسين النموذج بكفاءة من خلال إضافة مصفوفات صغيرة قابلة للتدريب.
4. التدريب: حدِّد مَعلمات التدريب واستخدِم SFTTrainer
لتحسين النموذج على مجموعة البيانات المحمَّلة باستخدام نوع التكثيف FP16.
5- الحفظ والتحميل: يمكنك حفظ النموذج المحسَّن وبرنامج تقسيم النصوص محليًا، ثم تحميلهما إلى حزمة GCS.
بعد ذلك، ننشئ صورة حاوية باستخدام Cloud Build ونخزّنها في Artifact Registry.
أنشئ دليلاً وانسخ النصوص البرمجية هنا.
cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning
finetuning.py
import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage
# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")
# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")
# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
"json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)
################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))
################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03
# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50
################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False
# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
attn_implementation="eager",
pretrained_model_name_or_path=model_name,
torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")
# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
lora_alpha=lora_alpha,
lora_dropout=lora_dropout,
r=lora_r,
bias="none",
task_type="CAUSAL_LM",
target_modules=[
"q_proj",
"k_proj",
"v_proj",
"o_proj",
"gate_proj",
"up_proj",
"down_proj",
],
)
# Set training parameters
training_arguments = SFTConfig(
bf16=bf16,
dataset_kwargs={
"add_special_tokens": False,
"append_concat_token": False,
},
dataset_text_field="text",
disable_tqdm=True,
fp16=fp16,
gradient_accumulation_steps=gradient_accumulation_steps,
gradient_checkpointing=gradient_checkpointing,
gradient_checkpointing_kwargs={"use_reentrant": False},
group_by_length=group_by_length,
log_on_each_node=False,
logging_steps=logging_steps,
learning_rate=learning_rate,
lr_scheduler_type=lr_scheduler_type,
max_grad_norm=max_grad_norm,
max_seq_length=max_seq_length,
max_steps=max_steps,
num_train_epochs=num_train_epochs,
optim=optim,
output_dir=save_model_path,
packing=packing,
per_device_train_batch_size=per_device_train_batch_size,
save_strategy=save_strategy,
save_steps=save_steps,
save_total_limit=save_total_limit,
warmup_ratio=warmup_ratio,
weight_decay=weight_decay,
)
print(f"Configuring fine tuning completed")
# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
model=model,
train_dataset=dataset,
peft_config=peft_config,
dataset_text_field="text",
max_seq_length=max_seq_length,
tokenizer=tokenizer,
args=training_arguments,
packing=packing,
)
print(f"Creating trainer completed")
# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")
# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")
# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
low_cpu_mem_usage=True,
pretrained_model_name_or_path=model_name,
return_dict=True,
torch_dtype=torch.float16,
)
model = PeftModel.from_pretrained(
model=base_model,
model_id=new_model,
)
model = model.merge_and_unload()
print(f"Merging the new model with base model completed")
accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")
print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
is_main_process=accelerator.is_main_process,
save_directory=save_model_path,
save_function=accelerator.save,
)
print(f"Save unwrapped model completed")
print(f"Save new tokenizer started")
if accelerator.is_main_process:
tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")
# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
"""Uploads a directory to GCS."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
for root, _, files in os.walk(model_dir):
for file in files:
local_file_path = os.path.join(root, file)
gcs_file_path = os.path.relpath(local_file_path, model_dir)
blob = bucket.blob(os.path.join(new_model, gcs_file_path)) # Use new_model_name
blob.upload_from_filename(local_file_path)
# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")
ملف شامل
# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04
# Install necessary system packages
RUN apt-get update && \
apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
rm -rf /var/lib/apt/lists/*
# Copy requirements.txt into the container
COPY requirements.txt .
# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
# Copy your finetune script into the container
COPY finetuning.py .
# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]
requirements.txt
accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4
الآن سننشئ صور حاويات لتحسينها ودفعها إلى "مستودع العناصر".
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest
12. نظرة عامة على Airflow 2، بما في ذلك ما هو DAG
Airflow هي منصّة لتنسيق سير العمل ومسارات البيانات. ويستخدم هذا الأسلوب الرسوم البيانية التوجيهية غير الحلقية لتحديد هذه سير العمل في رمز Python، ما يمثّل بصريًا المهام وتبعياتها.
إنّ أداة Airflow، مع مخططات DAG الثابتة والتعريفات المستندة إلى Python، مناسبة تمامًا لجدولة سير العمل المحدّد مسبقًا وإدارته. تتضمّن بنيته واجهة مستخدِم سهلة الاستخدام لمراقبة عمليات سير العمل هذه وإدارتها.
في الأساس، يتيح لك Airflow تحديد مسارات البيانات وجداول أعمالها ومراقبتها باستخدام لغة Python، ما يجعله أداة مرنة وفعّالة لتنسيق سير العمل.
13. نظرة عامة على "الدليل الإداري للإعلانات"
يشير DAG إلى الرسم البياني الحلقي المُوجَّه، وفي Airflow يمثّل DAG نفسه سير العمل أو مسار الإحالة الناجحة بالكامل. ويحدِّد المهام وتبعياتها وترتيب التنفيذ.
يتم تنفيذ وحدات سير العمل ضمن DAG من مجموعة في مجموعة GKE، ويتم تشغيلها من إعدادات Airflow.
الملخص:
Airflow: تنزيل البيانات: يعمل هذا النص البرمجي على التشغيل الآلي لعملية الحصول على مجموعة بيانات مراجعات الأفلام من Kaggle وتخزينها في حزمة GCS، ما يجعلها متاحة بسهولة لإجراء المزيد من المعالجة أو التحليل في بيئة السحابة الإلكترونية.
Airflow: تحضير البيانات: تأخذ الرموز البرمجية مجموعة بيانات مراجعات الأفلام الأوّلية وتزيل أعمدة البيانات غير الضرورية لحالة الاستخدام التي نستخدمها وتحذف مجموعات البيانات التي تحتوي على قيم غير متوفّرة. بعد ذلك، يتم تنظيم مجموعة البيانات في تنسيق يناسب تعلُّم الآلة للإجابة عن الأسئلة، ويتم تخزينها مرة أخرى في GCS لاستخدامها لاحقًا.
Airflow: تحسين النموذج: يعمل هذا الرمز على تحسين نموذج لغوي كبير (LLM) باستخدام تقنية تُعرف باسم LoRA (Low-Rank Adaptation)، ثم يحفظ النموذج المعدَّل. يبدأ الإجراء بتحميل نموذج لغوي كبير مُدرَّب مسبقًا ومجموعة بيانات من Google Cloud Storage. بعد ذلك، تطبِّق LoRA لتحسين النموذج بكفاءة في مجموعة البيانات هذه. أخيرًا، يتم حفظ النموذج المحسَّن مرة أخرى في Google Cloud Storage لاستخدامه لاحقًا في تطبيقات مثل إنشاء النصوص أو الإجابة عن الأسئلة.
Airflow: عرض النموذج: عرض النموذج المحسَّن على GKE باستخدام vllm للاستنتاج
Airflow: حلقة التغذية المرتدة: إعادة تدريب النموذج كل xx مرة (ساعة أو يوم أو أسبوع).
يوضّح هذا المخطّط البياني آلية عمل Airflow 2 عند تشغيله على GKE.
14. تحسين نموذج مقابل استخدام الحالة العامة (RAG)
يعمل هذا الدرس التطبيقي على تحسين نموذج اللغة الكبير بدلاً من استخدام ميزة "الإنشاء المعزّز لاسترداد المعلومات" (RAG).
لنقارن بين هذين الأسلوبَين:
التحسين الدقيق: يُنشئ نموذجًا متخصّصًا: يُعدّل التحسين الدقيق النموذج اللغوي الكبير ليتلاءم مع مهمة أو مجموعة بيانات معيّنة، ما يتيح له العمل بشكل مستقل بدون الاعتماد على مصادر بيانات خارجية.
تبسيط الاستنتاج: يزيل ذلك الحاجة إلى نظام استرجاع وقاعدة بيانات منفصلَين، ما يؤدي إلى تقديم ردود أسرع وأرخص، خاصةً في حالات الاستخدام المتكرّرة.
RAG: يعتمد على المعرفة الخارجية: يسترجع RAG المعلومات ذات الصلة من قاعدة معلومات لكل طلب، ما يضمن الوصول إلى بيانات محدّدة وحديثة.
زيادة التعقيد: غالبًا ما يتضمّن تنفيذ ميزة RAG في بيئة الإنتاج، مثل مجموعة Kubernetes، عدة خدمات صغيرة لمعالجة البيانات واستردادها، ما قد يؤدي إلى زيادة وقت الاستجابة وتكاليف الحوسبة.
سبب اختيار التحسين الدقيق:
على الرغم من أنّ RAG قد يكون مناسبًا لمجموعة البيانات الصغيرة المستخدَمة في هذا الدليل التعليمي، اخترنا إجراء تحسينات لإظهار حالة استخدام نموذجية لخدمة Airflow. يتيح لنا هذا الخيار التركيز على جوانب تنسيق سير العمل بدلاً من الخوض في التفاصيل الدقيقة لإعداد بنية أساسية وخدمات صغيرة إضافية لخدمة RAG.
الخاتمة:
إنّ أسلوبَي التحسين الدقيق وRAG هما أسلوبان قيّمان ولكل منهما نقاط قوته ونقاط ضعفه. يعتمد الخيار الأمثل على المتطلبات المحدّدة لمشروعك، مثل حجم بياناتك وتعقيدها واحتياجات الأداء والاعتبارات المتعلقة بالتكلفة.
15. مهمة DAG رقم 1: إنشاء خطوتك الأولى في Airflow: تنزيل البيانات
في ما يلي نظرة عامة على وحدة DAG هذه: يُنزِّل رمز Python المستضاف في صورة حاوية أحدث مجموعة بيانات RottenTomatoes من Kaggle.
لا تنسخ هذا الرمز إلى حزمة GCS. ننسخ mlops-dag.py كخطوة أخيرة، وهو ملف يحتوي على جميع خطوات وحدة DAG ضمن نص برمجي واحد بلغة Python.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
dataset_download
16. مهمة DAG رقم 2: إنشاء خطوتك الثانية في Airflow: تحضير البيانات
كنظرة عامة على وحدة DAG هذه، نحمِّل ملف CSV (rotten_tomatoes_movie_reviews.csv) من GCS إلى إطار بيانات Pandas.
بعد ذلك، نحدّ من عدد الصفوف التي تتم معالجتها باستخدام DATASET_LIMIT للاختبار وكفاءة الموارد، وفي النهاية نحوّل البيانات المحوَّلة إلى مجموعة بيانات Hugging Face.
إذا اطّلعت على الأمر بعناية، ستلاحظ أنّنا ندرب 1000 صف في النموذج باستخدام "DATASET_LIMIT": "1000"، وذلك لأنّ إجراء ذلك يستغرق 20 دقيقة على وحدة معالجة الرسومات Nvidia L4.
لا تنسخ هذا الرمز إلى حزمة GCS. ننسخ mlops-dag.py في الخطوة الأخيرة، وهو ملف نصي واحد بلغة Python يتضمّن جميع الخطوات.
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation
17. DAG Task #3 - Create your third step on Airflow: Model Finetuning
كنظرة عامة على وحدة مخطط DAG هذه، ننفّذ هنا finetune.py لتحسين نموذج Gemma باستخدام مجموعة البيانات الجديدة.
لا تنسخ هذا الرمز إلى حزمة GCS. ننسخ mlops-dag.py في الخطوة الأخيرة، وهو ملف نصي واحد بلغة Python يتضمّن جميع الخطوات.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Task 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Task 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Task 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation >> fine_tuning
18 مهمة DAG رقم 4: إنشاء الخطوة الأخيرة في Airflow: الاستنتاج / عرض النموذج
vLLM هي مكتبة قوية مفتوحة المصدر مصمّمة خصيصًا للاستنتاج عالي الأداء للنماذج اللغوية الكبيرة. عند نشرها على Google Kubernetes Engine (GKE)، تستفيد من قابلية Kubernetes للتكيّف وكفاءتها لتقديم النماذج اللغوية الكبيرة بفعالية.
ملخّص الخطوات:
- حمِّل مخطّط DAG "mlops-dag.py" إلى حزمة GCS.
- انسخ ملفَّي إعدادات بتنسيق YAML في Kubernetes لإعداد الاستنتاج إلى حزمة GCS.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
def model_serving():
config.load_incluster_config()
k8s_apps_v1 = client.AppsV1Api()
k8s_core_v1 = client.CoreV1Api()
while True:
try:
k8s_apps_v1.delete_namespaced_deployment(
namespace="airflow",
name="inference-deployment",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Deployment inference-deployment deleted")
with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_apps_v1.create_namespaced_deployment(
body=dep, namespace="airflow")
print(f"Deployment created. Status='{resp.metadata.name}'")
while True:
try:
k8s_core_v1.delete_namespaced_service(
namespace="airflow",
name="llm-service",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Service llm-service deleted")
with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_core_v1.create_namespaced_service(
body=dep, namespace="airflow")
print(f"Service created. Status='{resp.metadata.name}'")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 4: Run GKE Deployment for model serving
model_serving = PythonOperator(
task_id="model_serving",
python_callable=model_serving
)
dataset_download >> data_preparation >> fine_tuning >> model_serving
حمِّل نص Python البرمجي (ملف DAG) بالإضافة إلى ملفات Kubernetes في حزمة GCS الخاصة بـ DAGS.
gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}
في واجهة مستخدم Airflow، سترى mlops-dag.
- انقر على "إلغاء الإيقاف المؤقت".
- اختَر Trigger DAG لتنفيذ دورة MLOps يدوية.
بعد اكتمال مخطّط DAG، ستظهر لك مخرجات على هذا النحو في واجهة مستخدم Airflow.
بعد الخطوة الأخيرة، يمكنك الحصول على نقطة نهاية النموذج وإرسال طلب لاختبار النموذج.
انتظِر 5 دقائق تقريبًا قبل إصدار الأمر curl، حتى يبدأ استنتاج النموذج ويمكن لموازن الحمولة منح عنوان IP خارجي.
export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
"prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
"temperature": 0.1,
"top_p": 1.0,
"max_tokens": 128
}
EOF
إخراج:
19. تهانينا!
لقد أنشأت سير العمل الأول للذكاء الاصطناعي باستخدام مسار بيانات مخطط DAG مع Airflow 2 على GKE.
لا تنسَ إلغاء توفير الموارد التي تم نشرها.
20. تنفيذ ذلك في مرحلة الإنتاج
على الرغم من أنّ "مختبر رموز Google" قدّم لك إحصاءات رائعة حول كيفية إعداد Airflow 2 على GKE، عليك مراعاة بعض المواضيع التالية في الحياة الواقعية عند إجراء ذلك في مرحلة الإنتاج.
نفِّذ واجهة مستخدم ويب باستخدام Gradio أو أدوات مشابهة.
يمكنك ضبط ميزة "مراقبة التطبيقات تلقائيًا" لأحمال العمل باستخدام GKE هنا أو تصدير المقاييس من Airflow هنا.
قد تحتاج إلى وحدات معالجة رسومات أكبر لتحسين النموذج بشكلٍ أسرع، خاصةً إذا كانت لديك مجموعات بيانات أكبر. ومع ذلك، إذا أردنا تدريب النموذج على وحدات معالجة رسومات متعددة، علينا تقسيم مجموعة البيانات وتقسيم عملية التدريب. في ما يلي شرح لنموذج FSDP مع PyTorch (البيانات المجزّأة بالكامل بشكل موازٍ باستخدام مشاركة وحدة معالجة الرسومات لتحقيق هذا الهدف). يمكنك الاطّلاع على مزيد من المعلومات في منشور مدونة من Meta ومنشور آخر في هذا الدليل التعليمي حول FSDP باستخدام Pytorch.
Google Cloud Composer هي خدمة مُدارة من Airflow، لذا لا تحتاج إلى صيانة Airflow نفسها، ما عليك سوى نشر DAG والبدء في العمل.
مزيد من المعلومات
- مستندات Airflow: https://airflow.apache.org/
الترخيص
يخضع هذا العمل للترخيص العام Creative Commons Attribution 2.0.