1. Genel Bakış
Bu CodeLab'de, en az düzeyde soyutlama içeren bir Airflow DAG'si kullanılarak bir veri kümesi indirme, bir modeli hassaslaştırma ve LLM'yi Google Kubernetes Engine'e (GKE) dağıtma işlemleriyle DevOps uygulamalarının makine öğrenimine (MLOps) nasıl entegre edileceği gösterilmektedir. Bu nedenle, laboratuvardaki adımları adım adım uygulayabilmeniz ve her süreci hem Platform Mühendisi hem de Makine Öğrenimi Mühendisi açısından kolayca anlayabilmeniz için terraform yerine gcloud komutlarını kullanıyoruz.
Bu uygulamalı kılavuzda, yapay zeka iş akışlarınızı kolaylaştırmak için Airflow'dan yararlanma konusunda size yol gösterilir. Ayrıca, bir DAG yapılandırarak MLOps yaşam döngüsünün tamamını net ve pratik bir şekilde gösterir.
Neler öğreneceksiniz?
- Bilgi silosunu ortadan kaldırarak ve iş akışlarını iyileştirerek platform ve makine öğrenimi mühendisleri arasında daha fazla işbirliği ve anlayış geliştirme
- GKE'de Airflow 2'yi dağıtmayı, kullanmayı ve yönetmeyi öğrenin
- Airflow DAG'sini uçtan uca yapılandırma
- GKE ile üretim sınıfı makine öğrenimi sistemlerinin temelini oluşturun
- Makine öğrenimi sistemlerini donatma ve çalıştırma
- Platform Mühendisliği'nin MLOps için nasıl kritik bir destek sütunu haline geldiğini anlama
Bu CodeLab'in amacı
- Gemma-2-9b-it'e göre hassas ayar yaptığımız ve vLLM ile GKE'de sunulan bir LLM'den filmlerle ilgili soru sorabilirsiniz.
Hedef kitle
- Makine Öğrenimi Mühendisleri
- Platform Mühendisleri
- Veri bilimciler
- Veri Mühendisleri
- DevOps Mühendisleri
- Platform Mimarı
- Müşteri Mühendisleri
Bu CodeLab,
- GKE veya AI/ML iş akışlarına giriş olarak
- Airflow özellik grubunun tamamına genel bakış
2. Platform Mühendisliği, makine öğrenimi mühendislerine/bilimcilerine yardımcı olur
Platform mühendisliği ve MLOps, makine öğrenimi geliştirme ve dağıtımı için güçlü ve verimli bir ortam oluşturmak üzere birlikte çalışan birbirine bağlı disiplinlerdir.
Kapsam: Platform mühendisliği, MLOps'ten daha geniş bir kapsama sahiptir. Yazılım geliştirme yaşam döngüsünün tamamını kapsar ve bunun için gerekli araçları ve altyapıyı sağlar.
MLOps, makine öğrenimi geliştirme, dağıtım ve çıkarım arasında köprü görevi görür.
Uzmanlık: Platform mühendisleri genellikle bulut bilişim, kapsayıcı oluşturma ve veri yönetimi gibi altyapı teknolojileri konusunda güçlü bir uzmanlığa sahiptir.
MLOps mühendisleri, makine öğrenimi modeli geliştirme, dağıtma ve izleme konusunda uzmandır ve genellikle veri bilimi ve yazılım mühendisliği becerilerine sahiptir.
Araçlar: Platform mühendisleri, altyapı sağlama, yapılandırma yönetimi, kapsayıcı orkestrasyonu ve uygulama iskeleti için araçlar oluşturur. MLOps mühendisleri, ML modeli eğitimi, deneme, dağıtım, izleme ve sürüm oluşturma için araçlardan yararlanır.
3. Google Cloud Kurulumu ve şartları
Kendine ait tempoda ortam oluşturma
- Google Cloud Console'da oturum açın ve yeni bir proje oluşturun veya mevcut bir projeyi yeniden kullanın. Gmail veya Google Workspace hesabınız yoksa hesap oluşturmanız gerekir.
- Proje adı, bu projenin katılımcılarının görünen adıdır. Google API'leri tarafından kullanılmayan bir karakter dizesidir. Dilediğiniz zaman güncelleyebilirsiniz.
- Proje kimliği, tüm Google Cloud projelerinde benzersizdir ve değiştirilemez (ayarlandıktan sonra değiştirilemez). Cloud Console, benzersiz bir dize otomatik olarak oluşturur. Bu dizenin ne olduğu genellikle önemli değildir. Çoğu kod laboratuvarında proje kimliğinize (genellikle
PROJECT_ID
olarak tanımlanır) referans vermeniz gerekir. Oluşturulan kimliği beğenmezseniz rastgele başka bir kimlik oluşturabilirsiniz. Alternatif olarak, kendi anahtarınızı deneyerek kullanılabilir olup olmadığını görebilirsiniz. Bu adımdan sonra değiştirilemez ve proje boyunca geçerli kalır. - Bazı API'lerin kullandığı üçüncü bir değer (Proje Numarası) olduğunu belirtmek isteriz. Bu üç değer hakkında daha fazla bilgiyi dokümanlar bölümünde bulabilirsiniz.
- Ardından, Cloud kaynaklarını/API'lerini kullanmak için Cloud Console'da faturalandırmayı etkinleştirmeniz gerekir. Bu codelab'i çalıştırmak çok pahalı değildir. Bu eğitimden sonra faturalandırılmamak için kaynakları kapatmak istiyorsanız oluşturduğunuz kaynakları veya projeyi silebilirsiniz. Yeni Google Cloud kullanıcıları 300 ABD doları değerindeki ücretsiz deneme programına uygundur.
Cloud Shell'i başlatma
Google Cloud, dizüstü bilgisayarınızdan uzaktan çalıştırılabilir olsa da bu kod laboratuvarında bulutta çalışan bir komut satırı ortamı olan Cloud Shell'i kullanacaksınız.
Cloud Shell'i etkinleştirme
- Cloud Console'da Cloud Shell'i etkinleştir 'i
tıklayın.
Cloud Shell'i ilk kez başlatıyorsanız Cloud Shell'in ne olduğunu açıklayan bir ara ekran gösterilir. Ara ekran gösterildiyse Devam'ı tıklayın.
Cloud Shell'e bağlanmak ve ortam oluşturmak yalnızca birkaç dakikanızı alır.
Bu sanal makinede, ihtiyaç duyulan tüm geliştirme araçları yüklüdür. 5 GB boyutunda kalıcı bir ana dizin sunar ve Google Cloud'da çalışır. Bu sayede ağ performansını ve kimlik doğrulamayı büyük ölçüde iyileştirir. Bu kod laboratuvarındaki çalışmanızın tamamı olmasa da büyük bir kısmı tarayıcıda yapılabilir.
Cloud Shell'e bağlandıktan sonra kimliğinizin doğrulandığını ve projenin proje kimliğinize ayarlandığını görürsünüz.
- Kimliğinizi doğrulamak için Cloud Shell'de aşağıdaki komutu çalıştırın:
gcloud auth list
Komut çıkışı
Credentialed Accounts ACTIVE ACCOUNT * <my_account>@<my_domain.com> To set the active account, run: $ gcloud config set account `ACCOUNT`
- gcloud komutunun projeniz hakkında bilgi sahibi olduğunu onaylamak için Cloud Shell'de aşağıdaki komutu çalıştırın:
gcloud config list project
Komut çıkışı
[core] project = <PROJECT_ID>
Aksi takdirde aşağıdaki komutla ayarlayabilirsiniz:
gcloud config set project <PROJECT_ID>
Komut çıkışı
Updated property [core/project].
4. 1. Adım: Kaggle'a kaydolun ve kimlik doğrulaması yapın
CodeLab'e başlamak için Google'ın sahip olduğu, veri bilimcileri ve makine öğrenimi meraklıları için bir online topluluk platformu olan Kaggle'da hesap oluşturmanız gerekir. Kaggle, çeşitli alanlara ait herkese açık veri kümelerinin bulunduğu geniş bir depoya sahiptir. Modelinizi eğitmek için kullanacağınız RottenTomatoes veri kümesini bu siteden indirirsiniz.
- Kaggle'a kaydolun. Oturum açmak için Google TOA'yı kullanabilirsiniz.
- Şartlar ve koşulları kabul et
- Ayarlar'a gidin ve kullanıcı adınızı kullanıcı adı alın.
- API bölümünde, Kaggle'den "Yeni jeton oluştur"u seçin. Bu işlem, kaggle.json dosyasını indirir.
- Sorun yaşarsanız buradaki destek sayfasına gidin.
5. 2. Adım: HuggingFace'ta kaydolup kimlik doğrulama yapın
HuggingFace, herkesin makine öğrenimi teknolojisiyle etkileşime geçebileceği merkezi bir yerdir. Bu platformda 900.000 model, 200.000 veri kümesi ve 300.000 demo uygulaması (Spaces) yer alır. Bunların tümü açık kaynaktır ve herkese açıktır.
- HuggingFace'a kaydolun - Kullanıcı adı içeren bir hesap oluşturun. Google TOA'yı kullanamazsınız.
- E-posta adresinizi onaylama
- Buraya gidip Gemma-2-9b-it modeli için lisansı kabul edin.
- Burada HuggingFace jetonu oluşturun.
- Jeton kimlik bilgilerini kaydedin. Daha sonra bu bilgilere ihtiyacınız olacaktır.
6. 3. Adım: Gerekli Google Cloud altyapı kaynaklarını oluşturun
İş yükü kimliği federasyonunu kullanarak GKE, GCE, Artifact kayıt otoritesini ayarlayacak ve IAM rollerini uygulayacaksınız.
Yapay zeka iş akışınızda biri eğitim, diğeri çıkarım için olmak üzere iki nodepool kullanılır. Eğitim düğüm havuzu, bir Nvidia L4 Tensor Core GPU ile donatılmış bir g2-standard-8 GCE sanal makinesi kullanıyor. Çıkarma düğüm havuzu, iki Nvidia L4 Tensor Core GPU ile donatılmış bir g2-standard-24 sanal makinesi kullanıyor. Bölgeyi belirtirken gerekli GPU'nun desteklendiğini seçin ( Bağlantı).
Cloud Shell'inizde aşağıdaki komutları çalıştırın:
# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")
SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1
# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto
# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount
# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com
# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
--accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-12 \
--num-nodes=1
# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
--accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-24 \
--num-nodes=1
# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a
# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}
YAML manifest dosyalarınızı oluşturma
mkdir manifests
cd manifests
mlops-sa.yaml
apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
name: airflow-mlops-sa
namespace: airflow
labels:
tier: airflow
pv-dags.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-dags
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 5Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_DAGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pv-logs.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-logs
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 100Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_LOGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pvc-dags.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-dags
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
volumeName: airflow-dags
storageClassName: standard
pvc-logs.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-logs
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 100Gi
volumeName: airflow-logs
storageClassName: standard
namespace.yaml
kind: Namespace
apiVersion: v1
metadata:
name: airflow
labels:
name: airflow
sa-role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: airflow
name: airflow-deployment-role
rules:
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
resources: ["services"]
verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]
sa-rolebinding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: airflow-deployment-rolebinding
namespace: airflow
subjects:
- kind: ServiceAccount
name: airflow-worker
namespace: airflow
roleRef:
kind: Role
name: airflow-deployment-role
apiGroup: rbac.authorization.k8s.io
inference.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: inference-deployment
namespace: airflow
spec:
replicas: 1
selector:
matchLabels:
app: gemma-server
template:
metadata:
labels:
app: gemma-server
ai.gke.io/model: gemma-2-9b-it
ai.gke.io/inference-server: vllm
annotations:
gke-gcsfuse/volumes: "true"
spec:
serviceAccountName: airflow-mlops-sa
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
- key: "on-demand"
value: "true"
operator: "Equal"
effect: "NoSchedule"
containers:
- name: inference-server
image: vllm/vllm-openai:latest
ports:
- containerPort: 8000
resources:
requests:
nvidia.com/gpu: "2"
limits:
nvidia.com/gpu: "2"
command: ["/bin/sh", "-c"]
args:
- |
python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
volumeMounts:
- mountPath: /dev/shm
name: dshm
- name: gcs-fuse-csi-ephemeral
mountPath: /modeldata
readOnly: true
volumes:
- name: dshm
emptyDir:
medium: Memory
- name: gcs-fuse-csi-ephemeral
csi:
driver: gcsfuse.csi.storage.gke.io
volumeAttributes:
bucketName: BUCKET_DATA_NAME
mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
fileCacheCapacity: "20Gi"
fileCacheForRangeRead: "true"
metadataStatCacheCapacity: "-1"
metadataTypeCacheCapacity: "-1"
metadataCacheTTLSeconds: "-1"
nodeSelector:
cloud.google.com/gke-accelerator: nvidia-l4
inference-service.yaml
apiVersion: v1
kind: Service
metadata:
name: llm-service
namespace: airflow
spec:
selector:
app: gemma-server
type: LoadBalancer
ports:
- protocol: TCP
port: 8000
targetPort: 8000
3 Google Cloud Storage (GCS) paketi oluşturun
gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}
# Create the namespace in GKE
kubectl apply -f namespace.yaml
# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml
Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"
7. 4. Adım: Airflow'u, helm grafiği aracılığıyla GKE'ye yükleyin
Artık Airflow 2'yi Helm kullanarak dağıtıyoruz. Apache Airflow, veri mühendisliği ardışık düzenleri için açık kaynaklı iş akışı yönetim platformudur. Airflow 2'nin özellik grubunu daha sonra ele alacağız.
Airflow helm grafiği için values.yaml
config:
webserver:
expose_config: true
webserver:
service:
type: LoadBalancer
podAnnotations:
gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
- name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
value: "30"
logs:
persistence:
enabled: true
existingClaim: "airflow-logs"
dags:
persistence:
enabled: true
existingClaim: "airflow-dags"
scheduler:
podAnnotations:
gke-gcsfuse/volumes: "true"
triggerer:
podAnnotations:
gke-gcsfuse/volumes: "true"
workers:
podAnnotations:
gke-gcsfuse/volumes: "true"
Airflow 2'yi dağıtma
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml
8. 5. Adım: Airflow'u Bağlantılar ve Değişkenlerle Başlatma
Airflow 2 dağıtıldıktan sonra yapılandırmaya başlayabiliriz. Python komut dosyalarımız tarafından okunan bazı değişkenler tanımlarız.
- Tarayıcınızla 8080 bağlantı noktasındaki Airflow kullanıcı arayüzüne erişme
Harici IP'yi alma
kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'
Bir web tarayıcısı açıp http://<EXTERNAL-IP>:8080 adresine gidin . Giriş: admin / admin
- Airflow kullanıcı arayüzünde varsayılan bir GCP bağlantısı oluşturun. Bunun için Yönetici → Bağlantılar → + Yeni kayıt ekle'ye gidin.
- Bağlantı Kimliği: google_cloud_default
- Bağlantı Türü: Google Cloud
Kaydet'i tıklayın.
- Gerekli değişkenleri oluşturun. Bunun için Yönetici → Değişkenler → + Yeni kayıt ekle'ye gidin.
- Anahtar: BUCKET_DATA_NAME - Değer: echo $BUCKET_DATA_NAME'dan kopyalayın
- Anahtar: GCP_PROJECT_ID - Değer: echo $DEVSHELL_PROJECT_ID komutundan kopyalayın
- Anahtar: HF_TOKEN - Değer: HF jetonunuzu girin
- Anahtar: KAGGLE_USERNAME - Değer: Kaggle kullanıcı adınızı girin
- Anahtar: KAGGLE_KEY - Değer: Bunu kaggle.json dosyasından kopyalayın
Her bir anahtar/değer çiftinden sonra Kaydet'i tıklayın.
Kullanıcı arayüzünüz aşağıdaki gibi görünmelidir:
9. Uygulama kodu kapsayıcısı #1 - Veri indirme
Bu Python komut dosyasında, veri kümesini GCS paketimize indirmek için Kaggle ile kimlik doğrulaması yapıyoruz.
Bu, 1. DAG birimi haline geldiğinden ve veri kümesinin sık sık güncellenmesini beklediğimizden komut dosyasının kendisi kapsayıcıya yerleştirilir. Bu nedenle, bu işlemi otomatikleştirmek isteriz.
Dizin oluşturun ve komut dosyalarımızı buraya kopyalayın
cd .. ; mkdir 1-dataset-download
cd 1-dataset-download
dataset-download.py
import os
import kagglehub
from google.cloud import storage
KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
def upload_blob(bucket_name, source_file_name, destination_blob_name):
"""Uploads a file to the bucket."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(f"File {source_file_name} uploaded to {destination_blob_name}.")
# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")
print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"
upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]
requirements.txt
google-cloud-storage==2.19.0
kagglehub==0.3.4
Şimdi, veri kümesi indirme işlemi için bir kapsayıcı görüntüsü oluşturup Artifact Registry'ye gönderiyoruz.
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest
10. Uygulama kodu kapsayıcısı #2 - Veri hazırlama
Veri hazırlama adımımız sırasında şunları yaparız:
- Temel modelimizde hassas ayar yapmak için veri kümesinin ne kadarını kullanmak istediğimizi belirtin
- Veri kümesini yükler, yani CSV dosyasını satır ve sütunlar için 2 boyutlu bir veri yapısı olan Pandas veri çerçevesine okur.
- Veri dönüştürme / ön işleme: Saklamak istediğimizi belirterek veri kümesinin hangi bölümlerinin alakasız olduğunu belirleyin. Bu işlem, geri kalanı kaldırır.
transform
işlevini DataFrame'in her satırına uygular- Hazırlanan verileri GCS paketine geri kaydedin.
Dizin oluşturun ve komut dosyalarımızı buraya kopyalayın
cd .. ; mkdir 2-data-preparation
cd 2-data-preparation
data-preparation.py
import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset
# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100")) # Process a limited number of rows, used 100 during testing phase but can be increased
DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"
# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")
def transform(data):
"""
Transforms a row of the DataFrame into the desired format for fine-tuning.
Args:
data: A pandas Series representing a row of the DataFrame.
Returns:
A dictionary containing the formatted text.
"""
question = f"Review analysis for movie '{data['id']}'"
context = data['reviewText']
answer = data['scoreSentiment']
template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
return {'text': template.format(question=question, context=context, answer=answer)}
try:
df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
print(f"Dataset loaded successfully.")
# Drop rows with NaN values in relevant columns
df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])
# Apply transformation to the DataFrame
transformed_data = df.apply(transform, axis=1).tolist()
# Convert transformed data to a DataFrame and then to a Hugging Face Dataset
transformed_df = pd.DataFrame(transformed_data)
dataset = Dataset.from_pandas(transformed_df)
# Save the prepared dataset to JSON lines format
with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
for item in dataset:
f.write(json.dumps(item) + "\n")
print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
except Exception as e:
print(f"Error during data loading or preprocessing: {e}")
import traceback
print(traceback.format_exc())
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]
requirements.txt
datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3
# Now we create a container images for data-preparation and push it to the Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest
11. Uygulama kodu kapsayıcısı 3: İnce ayar
Burada, temel model olarak Gemma-2-9b-it'i kullanıp ardından yeni veri kümemizle hassas ayar yapıyoruz.
Bunlar, hassas ayar adımı sırasında gerçekleşen adımların sırasıdır.
1. Kurulum: Kitaplıkları içe aktarın, parametreleri (model, veri ve eğitim için) tanımlayın ve veri kümesini Google Cloud Storage'dan yükleyin.
2. Modeli Yükleme: Verimlilik için önceden eğitilmiş bir dil modelini kesme işlemiyle yükleyin ve ilgili dize ayırıcıyı yükleyin.
3. LoRA'yı yapılandırın: Küçük eğitilebilir matrisler ekleyerek modelde etkili bir şekilde ince ayar yapmak için Düşük Sıralı Uyum (LoRA) ayarlayın.
4. Eğit: Eğitim parametrelerini tanımlayın ve FP16 kesme türünü kullanarak yüklü veri kümesinde model üzerinde ince ayar yapmak için SFTTrainer
öğesini kullanın.
5. Kaydetme ve Yükleme: İnce ayarlanmış modeli ve parçalayıcıyı yerel olarak kaydedin, ardından GCS paketimize yükleyin.
Ardından Cloud Build'i kullanarak bir kapsayıcı görüntüsü oluşturup Artifact Registry'de depolarız.
Dizin oluşturun ve komut dosyalarımızı buraya kopyalayın
cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning
finetuning.py
import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage
# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")
# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")
# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
"json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)
################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))
################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03
# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50
################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False
# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
attn_implementation="eager",
pretrained_model_name_or_path=model_name,
torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")
# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
lora_alpha=lora_alpha,
lora_dropout=lora_dropout,
r=lora_r,
bias="none",
task_type="CAUSAL_LM",
target_modules=[
"q_proj",
"k_proj",
"v_proj",
"o_proj",
"gate_proj",
"up_proj",
"down_proj",
],
)
# Set training parameters
training_arguments = SFTConfig(
bf16=bf16,
dataset_kwargs={
"add_special_tokens": False,
"append_concat_token": False,
},
dataset_text_field="text",
disable_tqdm=True,
fp16=fp16,
gradient_accumulation_steps=gradient_accumulation_steps,
gradient_checkpointing=gradient_checkpointing,
gradient_checkpointing_kwargs={"use_reentrant": False},
group_by_length=group_by_length,
log_on_each_node=False,
logging_steps=logging_steps,
learning_rate=learning_rate,
lr_scheduler_type=lr_scheduler_type,
max_grad_norm=max_grad_norm,
max_seq_length=max_seq_length,
max_steps=max_steps,
num_train_epochs=num_train_epochs,
optim=optim,
output_dir=save_model_path,
packing=packing,
per_device_train_batch_size=per_device_train_batch_size,
save_strategy=save_strategy,
save_steps=save_steps,
save_total_limit=save_total_limit,
warmup_ratio=warmup_ratio,
weight_decay=weight_decay,
)
print(f"Configuring fine tuning completed")
# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
model=model,
train_dataset=dataset,
peft_config=peft_config,
dataset_text_field="text",
max_seq_length=max_seq_length,
tokenizer=tokenizer,
args=training_arguments,
packing=packing,
)
print(f"Creating trainer completed")
# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")
# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")
# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
low_cpu_mem_usage=True,
pretrained_model_name_or_path=model_name,
return_dict=True,
torch_dtype=torch.float16,
)
model = PeftModel.from_pretrained(
model=base_model,
model_id=new_model,
)
model = model.merge_and_unload()
print(f"Merging the new model with base model completed")
accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")
print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
is_main_process=accelerator.is_main_process,
save_directory=save_model_path,
save_function=accelerator.save,
)
print(f"Save unwrapped model completed")
print(f"Save new tokenizer started")
if accelerator.is_main_process:
tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")
# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
"""Uploads a directory to GCS."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
for root, _, files in os.walk(model_dir):
for file in files:
local_file_path = os.path.join(root, file)
gcs_file_path = os.path.relpath(local_file_path, model_dir)
blob = bucket.blob(os.path.join(new_model, gcs_file_path)) # Use new_model_name
blob.upload_from_filename(local_file_path)
# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")
Dockerfile
# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04
# Install necessary system packages
RUN apt-get update && \
apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
rm -rf /var/lib/apt/lists/*
# Copy requirements.txt into the container
COPY requirements.txt .
# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
# Copy your finetune script into the container
COPY finetuning.py .
# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]
requirements.txt
accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4
Şimdi hassas ayar yapmak için bir kapsayıcı görüntüsü oluşturup Artifact Registry'ye aktarıyoruz.
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest
12. DAG nedir? dahil olmak üzere Airflow 2'ye genel bakış
Airflow, iş akışlarını ve veri ardışık düzenlerini koordine etmek için kullanılan bir platformdur. Bu iş akışlarını Python kodunda tanımlamak için DAG'leri (yönlü düz ağaçlar) kullanır ve görevleri ile bunların bağımlılıklarını görsel olarak temsil eder.
Statik DAG'leri ve Python tabanlı tanımlarıyla Airflow, önceden tanımlanmış iş akışlarını planlamak ve yönetmek için idealdir. Mimarisinde, bu iş akışlarını izlemek ve yönetmek için kullanıcı dostu bir kullanıcı arayüzü bulunur.
Airflow, temel olarak Python'u kullanarak veri ardışık düzenlerinizi tanımlamanıza, planlamanıza ve izlemenize olanak tanır. Bu sayede iş akışı düzenleme için esnek ve güçlü bir araç haline gelir.
13. DAG'ımıza genel bakış
DAG, Yönlü Düz Ağaç anlamına gelir. Airflow'da DAG, iş akışının veya ardışık düzenin tamamını temsil eder. Görevleri, bunların bağımlılıkları ve yürütme sırasını tanımlar.
DAG'daki iş akışı birimleri, Airflow yapılandırmasından başlatılarak GKE kümesindeki bir kapsülden yürütülür.
Özet:
Airflow: Veri indirme: Bu komut dosyası, Kaggle'dan film yorumu veri kümesini alma ve GCS paketinizde depolama işlemini otomatikleştirerek bulut ortamınızda daha fazla işleme veya analiz için hazır hale getirir.
Airflow: Veri Hazırlama: Kod, ham film yorumu veri kümesini alır, kullanım alanımız için gerekli olmayan ek veri sütunlarını kaldırır ve eksik değerler içeren veri kümelerini siler. Ardından, veri kümesini makine öğrenimine uygun bir soru-cevap biçiminde yapılandırır ve daha sonra kullanmak üzere GCS'de tekrar depolar.
Airflow: Model İnce Ayarlama: Bu kod, LoRA (Düşük Sıralı Uyum) adlı bir teknik kullanarak büyük bir dil modelinde (LLM) ince ayar yapar ve ardından güncellenmiş modeli kaydeder. Önceden eğitilmiş bir LLM ve Google Cloud Storage'dan bir veri kümesi yükleyerek başlar. Ardından, bu veri kümesinde modelin ince ayarlarını verimli bir şekilde yapmak için LoRA'yı uygular. Son olarak, hassas ayarlanmış modeli daha sonra metin oluşturma veya soru yanıtlama gibi uygulamalarda kullanmak üzere Google Cloud Storage'a geri kaydeder.
Airflow: Model Yayınlama: İnference için vllm ile hassas ayarlanmış modeli GKE'de yayınlama.
Hava akışı: Geri bildirim döngüsü: Modelin xx zaman aralığında (saatlik, günlük, haftalık) yeniden eğitilmesi.
Bu şemada, Airflow 2'nin GKE'de çalıştırıldığında nasıl çalıştığı açıklanmaktadır.
14. Modeli hassaslaştırma ve RAG kullanma
Bu CodeLab'de, almayla artırılmış üretim (RAG) yerine LLM'de ince ayar yapılır.
Bu iki yaklaşımı karşılaştıralım:
İnce ayar: Özel bir model oluşturur: İnce ayar, LLM'yi belirli bir göreve veya veri kümesine uyarlayarak harici veri kaynaklarına ihtiyaç duymadan bağımsız olarak çalışmasını sağlar.
Çıkarımı basitleştirir: Bu sayede ayrı bir alma sistemi ve veritabanı gerekmez. Bu da özellikle sık kullanılan kullanım alanları için daha hızlı ve daha ucuz yanıtlar sağlar.
RAG: Harici bilgilerden yararlanır: RAG, her istek için bilgi tabanından alakalı bilgileri alır ve güncel ve belirli verilere erişilmesini sağlar.
Karmaşıklığı artırır: RAG'yi Kubernetes kümesi gibi bir üretim ortamında uygulamak genellikle veri işleme ve alma için birden fazla mikro hizmet gerektirir. Bu da gecikme süresini ve hesaplama maliyetlerini artırabilir.
İnce ayar neden tercih edildi?
RAG, bu CodeLab'de kullanılan küçük veri kümesi için uygun olsa da Airflow'un tipik bir kullanım alanını göstermek için hassas ayarlama yapmayı tercih ettik. Bu seçim, RAG için ek altyapı ve mikro hizmetler oluşturmanın inceliklerini incelemek yerine iş akışı koordinasyonu özelliklerine odaklanmamıza olanak tanır.
Sonuç:
Hem hassas ayar hem de RAG, kendine özgü güçlü ve zayıf yönleri olan değerli tekniklerdir. En uygun seçim, projenizin belirli koşullarına (ör. verilerinizin boyutu ve karmaşıklığı, performans ihtiyaçları ve maliyet hususları) bağlıdır.
15. DAG Görevi #1 - Airflow'da ilk adımınızı oluşturun: Veri indirme
Bu DAG birimine genel bakış olarak, bir kapsayıcı görüntüsünde barındırılan Python kodumuz Kaggle'dan en son RottenTomatoes veri kümesini indirir.
Bu kodu GCS paketine kopyalamayın. Son adım olarak, tüm DAG birimi adımlarını tek bir Python komut dosyasında içeren mlops-dag.py dosyasını kopyalıyoruz.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
dataset_download
16. DAG Görevi #2 - Airflow'da ikinci adımı oluşturun: Veri Hazırlama
Bu DAG birimine genel bakış olarak GCS'den bir CSV dosyasını (rotten_tomatoes_movie_reviews.csv) Pandas DataFrame'e yüklüyoruz.
Ardından, test ve kaynak verimliliği için DATASET_LIMIT kullanılarak işlenen satır sayısını sınırlandırıyoruz ve son olarak dönüştürülmüş verileri Hugging Face veri kümesine dönüştürüyoruz.
Dikkatlice bakarsanız modelde "DATASET_LIMIT": "1000" ile 1000 satır eğittiğimizi görürsünüz. Bunun nedeni, Nvidia L4 GPU'da bunu yapmanın 20 dakika sürmesidir.
Bu kodu GCS paketine kopyalamayın. Son adımda, tüm adımları tek bir Python komut dosyasında içeren mlops-dag.py dosyasını kopyalıyoruz.
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation
17. DAG Task #3 - Create your third step on Airflow: Model Finetuning
Bu DAG birimine genel bakış olarak, Gemma modelini yeni veri kümemizle hassaslaştırmak için finetune.py dosyasını burada çalıştırıyoruz.
Bu kodu GCS paketine kopyalamayın. Son adımda, tüm adımları tek bir Python komut dosyasında içeren mlops-dag.py dosyasını kopyalıyoruz.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Task 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Task 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Task 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation >> fine_tuning
18. DAG Görevi #4: Airflow'da son adımınızı oluşturun: Tahmin / Modeli sunma
vLLM, LLM'lerin yüksek performanslı çıkarımı için özel olarak tasarlanmış güçlü bir açık kaynak kitaplığıdır. Google Kubernetes Engine (GKE) üzerinde dağıtıldığında, LLM'leri etkili bir şekilde sunmak için Kubernetes'in ölçeklenebilirliğinden ve verimliliğinden yararlanır.
Adımların özeti:
- "mlops-dag.py" DAG'sini GCS paketine yükleyin.
- Tahmini ayarlamak için iki Kubernetes YAML yapılandırma dosyasını bir GCS paketine kopyalayın.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
def model_serving():
config.load_incluster_config()
k8s_apps_v1 = client.AppsV1Api()
k8s_core_v1 = client.CoreV1Api()
while True:
try:
k8s_apps_v1.delete_namespaced_deployment(
namespace="airflow",
name="inference-deployment",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Deployment inference-deployment deleted")
with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_apps_v1.create_namespaced_deployment(
body=dep, namespace="airflow")
print(f"Deployment created. Status='{resp.metadata.name}'")
while True:
try:
k8s_core_v1.delete_namespaced_service(
namespace="airflow",
name="llm-service",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Service llm-service deleted")
with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_core_v1.create_namespaced_service(
body=dep, namespace="airflow")
print(f"Service created. Status='{resp.metadata.name}'")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 4: Run GKE Deployment for model serving
model_serving = PythonOperator(
task_id="model_serving",
python_callable=model_serving
)
dataset_download >> data_preparation >> fine_tuning >> model_serving
Python komut dosyanızı (DAG dosyası) ve Kubernetes manifestlerini DAGS GCS paketine yükleyin.
gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}
Airflow kullanıcı arayüzünde mlops-dag'ı görürsünüz.
- Duraklatmayı kaldır'ı seçin.
- Manuel bir MLOps döngüsü gerçekleştirmek için DAG'yi tetikle'yi seçin.
DAG'niz tamamlandığında Airflow kullanıcı arayüzünde aşağıdaki gibi bir çıkış görürsünüz.
Son adımdan sonra model uç noktasını alabilir ve modeli test etmek için bir istem gönderebilirsiniz.
Modelin çıkarımı başlayabilsin ve yük dengeleyici harici bir IP adresi atayabilsin diye curl komutunu vermeden önce yaklaşık 5 dakika bekleyin.
export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
"prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
"temperature": 0.1,
"top_p": 1.0,
"max_tokens": 128
}
EOF
Çıkış:
19. Tebrikler!
GKE'de Airflow 2 ile DAG ardışık düzeni kullanarak ilk yapay zeka iş akışınızı oluşturdunuz.
Dağıttığınız kaynakların temel hazırlığını kaldırmayı unutmayın.
20. Bu işlemi üretimde yapma
CodeLab, Airflow 2'yi GKE'de nasıl ayarlayacağınız konusunda size mükemmel bir fikir vermiştir. Ancak gerçek dünyada bunu üretimde yaparken aşağıdaki konulardan bazılarını göz önünde bulundurmanız gerekir.
Gradio veya benzer araçları kullanarak bir web ön ucu uygulayın.
GKE'deki iş yükleri için otomatik uygulama izlemeyi buradan yapılandırın veya metrikleri Airflow'dan buradan dışa aktarın.
Özellikle daha büyük veri kümeleriniz varsa modelde daha hızlı ince ayar yapmak için daha büyük GPU'lara ihtiyacınız olabilir. Ancak modeli birden fazla GPU'da eğitmek istiyorsak veri kümesini bölmemiz ve eğitimi parçalara ayırmamız gerekir. Aşağıda, bu amaca ulaşmak için GPU paylaşımını kullanarak tamamen parçalara ayrılmış veri paralelliği sağlayan PyTorch ile FSDP'nin açıklaması verilmiştir. Daha fazla bilgiyi Meta'nın blog yayınında ve Pytorch kullanan FSDP konulu bu eğitimde bulabilirsiniz.
Google Cloud Composer, yönetilen bir Airflow hizmetidir. Bu nedenle, Airflow'u kendiniz yönetmeniz gerekmez. DAG'ınızı dağıtmanız yeterlidir.
Daha fazla bilgi
- Airflow Dokümanları: https://airflow.apache.org/
Lisans
Bu çalışma, Creative Commons Attribution 2.0 Genel Amaçlı Lisans ile lisans altına alınmıştır.