GKE에서 Airflow 2를 사용하여 MLOps 워크플로 빌드

1. 개요

852dc8844309ffb8.png

이 CodeLab에서는 최소한의 추상화로 Airflow DAG를 사용하여 데이터 세트를 다운로드하고 모델을 미세 조정하며 Google Kubernetes Engine (GKE)에 LLM을 배포하여 DevOps 관행을 머신러닝 (MLOps)에 통합하는 방법을 보여줍니다. 따라서 실험실의 단계를 따라 플랫폼 엔지니어와 머신러닝 엔지니어의 관점에서 각 프로세스를 쉽게 이해할 수 있도록 terraform이 아닌 gcloud 명령어를 사용합니다.

이 실습 가이드에서는 Airflow를 활용하여 AI 워크플로를 간소화하는 방법을 안내하고 DAG를 구성하여 전체 MLOps 수명 주기를 명확하고 실용적으로 보여줍니다.

학습할 내용

  • 지식 격차를 해소하고 워크플로를 개선하여 플랫폼 엔지니어와 머신러닝 엔지니어 간의 협업과 이해를 강화합니다.
  • GKE에서 Airflow 2를 배포, 사용, 관리하는 방법 이해
  • 엔드 투 엔드 Airflow DAG 구성
  • GKE를 사용하여 프로덕션급 머신러닝 시스템의 기반 빌드
  • 머신러닝 시스템 계측 및 운영
  • 플랫폼 엔지니어링이 MLOps의 핵심 지원 축이 된 이유 이해

이 Codelab에서 얻을 수 있는 이점

  • GKE에서 vLLM과 함께 제공되는 Gemma-2-9b-it를 기반으로 미세 조정된 LLM에 영화에 관한 질문을 할 수 있습니다.

타겟 잠재고객

  • 머신러닝 엔지니어
  • 플랫폼 엔지니어
  • 데이터 과학자
  • 데이터 엔지니어
  • DevOps 엔지니어
  • 플랫폼 설계자
  • 고객 엔지니어

이 Codelab은 다음과 같은 용도로 사용하기 위한 것이 아닙니다.

  • GKE 또는 AI/ML 워크플로 소개
  • 전체 Airflow 기능 세트의 개요

2. 플랫폼 엔지니어링이 머신러닝 엔지니어/과학자를 지원함

16635a8284b994c.png

플랫폼 엔지니어링과 MLOps는 ML 개발 및 배포를 위한 강력하고 효율적인 환경을 만들기 위해 협력하는 상호 종속적인 분야입니다.

범위: 플랫폼 엔지니어링은 MLOps보다 범위가 넓으며 전체 소프트웨어 개발 수명 주기를 포괄하고 이에 필요한 도구와 인프라를 제공합니다.

MLOps는 ML 개발, 배포, 추론 간의 격차를 메웁니다.

전문 지식: 플랫폼 엔지니어는 일반적으로 클라우드 컴퓨팅, 컨테이너화, 데이터 관리와 같은 인프라 기술에 대한 전문 지식이 뛰어납니다.

MLOps 엔지니어는 ML 모델 개발, 배포, 모니터링을 전문으로 하며 데이터 과학 및 소프트웨어 엔지니어링 기술을 보유하고 있는 경우가 많습니다.

도구: 플랫폼 엔지니어는 인프라 프로비저닝, 구성 관리, 컨테이너 조정, 애플리케이션 스캐폴딩을 위한 도구를 만듭니다. MLOps 엔지니어는 ML 모델 학습, 실험, 배포, 모니터링, 버전 관리를 위한 도구를 활용합니다.

3. Google Cloud 설정 및 요구사항

자습형 환경 설정

  1. Google Cloud Console에 로그인하여 새 프로젝트를 만들거나 기존 프로젝트를 재사용합니다. 아직 Gmail이나 Google Workspace 계정이 없는 경우 계정을 만들어야 합니다.

fbef9caa1602edd0.png

a99b7ace416376c4.png

5e3ff691252acf41.png

  • 프로젝트 이름은 이 프로젝트 참가자의 표시 이름입니다. 이는 Google API에서 사용하지 않는 문자열이며 언제든지 업데이트할 수 있습니다.
  • 프로젝트 ID는 모든 Google Cloud 프로젝트에서 고유하며, 변경할 수 없습니다(설정된 후에는 변경할 수 없음). Cloud 콘솔은 고유한 문자열을 자동으로 생성합니다. 일반적으로는 신경 쓰지 않아도 됩니다. 대부분의 Codelab에서는 프로젝트 ID (일반적으로 PROJECT_ID로 식별됨)를 참조해야 합니다. 생성된 ID가 마음에 들지 않으면 다른 임의 ID를 생성할 수 있습니다. 또는 직접 시도해 보고 사용 가능한지 확인할 수도 있습니다. 이 단계 이후에는 변경할 수 없으며 프로젝트 기간 동안 유지됩니다.
  • 참고로 세 번째 값은 일부 API에서 사용하는 프로젝트 번호입니다. 이 세 가지 값에 대한 자세한 내용은 문서를 참고하세요.
  1. 다음으로 Cloud 리소스/API를 사용하려면 Cloud 콘솔에서 결제를 사용 설정해야 합니다. 이 Codelab 실행에는 많은 비용이 들지 않습니다. 이 튜토리얼이 끝난 후에 요금이 청구되지 않도록 리소스를 종료하려면 만든 리소스 또는 프로젝트를 삭제하면 됩니다. Google Cloud 신규 사용자는 300달러(USD) 상당의 무료 체험판 프로그램에 참여할 수 있습니다.

Cloud Shell 시작

Google Cloud를 노트북에서 원격으로 실행할 수 있지만, 이 Codelab에서는 Cloud에서 실행되는 명령줄 환경인 Cloud Shell을 사용합니다.

Cloud Shell 활성화

  1. Cloud Console에서 Cloud Shell 활성화853e55310c205094.png를 클릭합니다.

3c1dabeca90e44e5.png

Cloud Shell을 처음 시작하는 경우 Cloud Shell에 대한 설명이 포함된 중간 화면이 표시됩니다. 중간 화면이 표시되면 계속을 클릭합니다.

9c92662c6a846a5c.png

Cloud Shell을 프로비저닝하고 연결하는 데 몇 분 정도만 걸립니다.

9f0e51b578fecce5.png

이 가상 머신에는 필요한 모든 개발 도구가 로드되어 있습니다. 영구적인 5GB 홈 디렉터리를 제공하고 Google Cloud에서 실행되므로 네트워크 성능과 인증이 크게 개선됩니다. 이 Codelab의 대부분의 작업은 브라우저에서 수행할 수 있습니다.

Cloud Shell에 연결되면 인증이 완료되었고 프로젝트가 해당 프로젝트 ID로 설정된 것을 확인할 수 있습니다.

  1. Cloud Shell에서 다음 명령어를 실행하여 인증되었는지 확인합니다.
gcloud auth list

명령어 결과

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. Cloud Shell에서 다음 명령어를 실행하여 gcloud 명령어가 프로젝트를 알고 있는지 확인합니다.
gcloud config list project

명령어 결과

[core]
project = <PROJECT_ID>

또는 다음 명령어로 설정할 수 있습니다.

gcloud config set project <PROJECT_ID>

명령어 결과

Updated property [core/project].

4. 1단계 - Kaggle에서 가입 및 인증

CodeLab을 시작하려면 Google에서 소유하고 있으며 다양한 도메인에 관한 공개 데이터 세트의 방대한 저장소를 호스팅하는 데이터 과학자 및 머신러닝 애호가를 위한 온라인 커뮤니티 플랫폼인 Kaggle에서 계정을 만들어야 합니다. 이 사이트에서 모델을 학습하는 데 사용할 RottenTomatoes 데이터 세트를 다운로드합니다.

  • Kaggle에 가입합니다. Google SSO를 사용하여 로그인할 수 있습니다.
  • 이용약관 동의
  • 설정으로 이동하여 사용자 이름 username을 확인합니다.
  • API 섹션에서 Kaggle의 '새 토큰 만들기'를 선택하면 kaggle.json이 다운로드됩니다.
  • 문제가 있는 경우 여기에서 지원 페이지로 이동하세요.

5. 2단계 - HuggingFace에서 가입 및 인증

HuggingFace는 누구나 머신러닝 기술을 활용할 수 있는 중심 공간입니다. 이 플랫폼은 90만 개의 모델, 20만 개의 데이터 세트, 30만 개의 데모 앱 (Spaces)을 호스팅하며, 모두 오픈소스이며 공개적으로 제공됩니다.

  • HuggingFace에 가입합니다. 사용자 이름으로 계정을 만듭니다. Google SSO는 사용할 수 없습니다.
  • 이메일 주소 확인
  • 여기로 이동하여 Gemma-2-9b-it 모델의 라이선스를 수락합니다.
  • 여기에서 HuggingFace 토큰을 만듭니다.
  • 나중에 필요하므로 토큰 사용자 인증 정보를 기록합니다.

6. 3단계 - 필요한 Google Cloud 인프라 리소스 만들기

워크로드 아이덴티티 제휴를 사용하여 GKE, GCE, Artifact Registry를 설정하고 IAM 역할을 적용합니다.

AI 워크플로는 학습용과 추론용의 두 가지 노드 풀을 사용합니다. 학습 노드 풀은 Nvidia L4 Tensor Core GPU가 1개 장착된 g2-standard-8 GCE VM을 사용합니다. 추론 노드 풀은 Nvidia L4 Tensor Core GPU 2개가 장착된 g2-standard-24 VM을 사용합니다. 리전을 지정할 때 필요한 GPU가 지원되는 리전을 선택합니다 ( 링크).

Cloud Shell에서 다음 명령어를 실행합니다.

# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")

SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1

# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto

# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount

# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com

# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
  --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-12 \
  --num-nodes=1

# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
  --accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-24 \
  --num-nodes=1

# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a

# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}

YAML 매니페스트 만들기

mkdir manifests
cd manifests

mlops-sa.yaml

apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
  name: airflow-mlops-sa
  namespace: airflow
  labels:
    tier: airflow

pv-dags.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 5Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_DAGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pv-logs.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-logs
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 100Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_LOGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pvc-dags.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: airflow-dags
  storageClassName: standard

pvc-logs.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  volumeName: airflow-logs
  storageClassName: standard

namespace.yaml

kind: Namespace
apiVersion: v1
metadata:
  name: airflow
  labels:
    name: airflow

sa-role.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-deployment-role
rules:
- apiGroups: ["apps"] 
  resources: ["deployments"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]

sa-rolebinding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-deployment-rolebinding
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: airflow-worker
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-deployment-role
  apiGroup: rbac.authorization.k8s.io

inference.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-deployment
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gemma-server
  template:
    metadata:
      labels:
        app: gemma-server
        ai.gke.io/model: gemma-2-9b-it
        ai.gke.io/inference-server: vllm
      annotations:
        gke-gcsfuse/volumes: "true"
    spec:
      serviceAccountName: airflow-mlops-sa
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: "NoSchedule"
      - key: "on-demand"
        value: "true"
        operator: "Equal"
        effect: "NoSchedule"
      containers:
      - name: inference-server
        image: vllm/vllm-openai:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: "2"
          limits:
            nvidia.com/gpu: "2"
        command: ["/bin/sh", "-c"]
        args:
        - |
          python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
        volumeMounts:
        - mountPath: /dev/shm
          name: dshm
        - name: gcs-fuse-csi-ephemeral
          mountPath: /modeldata
          readOnly: true
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      - name: gcs-fuse-csi-ephemeral
        csi:
          driver: gcsfuse.csi.storage.gke.io
          volumeAttributes:
            bucketName: BUCKET_DATA_NAME
            mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
            fileCacheCapacity: "20Gi"
            fileCacheForRangeRead: "true"
            metadataStatCacheCapacity: "-1"
            metadataTypeCacheCapacity: "-1"
            metadataCacheTTLSeconds: "-1"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4

inference-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: llm-service
  namespace: airflow
spec:
  selector:
    app: gemma-server
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

Google Cloud Storage (GCS) 버킷 3개 만들기

gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}

# Create the namespace in GKE
kubectl apply -f namespace.yaml

# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml

Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation

gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"

7. 4단계 - Helm 차트를 통해 GKE에 Airflow 설치

이제 Helm을 사용하여 Airflow 2를 배포합니다. Apache Airflow는 데이터 엔지니어링 파이프라인을 위한 오픈소스 워크플로 관리 플랫폼입니다. Airflow 2의 기능 세트는 나중에 살펴보겠습니다.

Airflow Helm 차트의 values.yaml

config:
  webserver:
    expose_config: true
webserver:
  service:
    type: LoadBalancer
  podAnnotations:
    gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "30"
logs:
  persistence:
    enabled: true
    existingClaim: "airflow-logs"
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
scheduler:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
triggerer:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
workers:
  podAnnotations:
    gke-gcsfuse/volumes: "true"

Airflow 2 배포

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml

8. 5단계 - 연결 및 변수로 Airflow 초기화

Airflow 2가 배포되면 구성을 시작할 수 있습니다. Python 스크립트에서 읽는 몇 가지 변수를 정의합니다.

  1. 브라우저를 사용하여 포트 8080의 Airflow UI에 액세스합니다.

외부 IP 가져오기

kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'

웹브라우저를 열고 http://<EXTERNAL-IP>:8080으로 이동합니다 . 로그인은 admin / admin입니다.

  1. Airflow UI 내에서 기본 GCP 연결을 만듭니다. Admin(관리) → Connections(연결) → + Add a new record(새 레코드 추가)로 이동합니다.
  • 연결 ID: google_cloud_default
  • 연결 유형: Google Cloud

저장을 클릭합니다.

  1. 필요한 변수를 만들려면 관리 → 변수 → + 새 레코드 추가로 이동합니다.
  • 키: BUCKET_DATA_NAME - 값: echo $BUCKET_DATA_NAME에서 복사
  • 키: GCP_PROJECT_ID - 값: echo $DEVSHELL_PROJECT_ID에서 복사
  • 키: HF_TOKEN - 값: HF 토큰 삽입
  • 키: KAGGLE_USERNAME - 값: Kaggle 사용자 이름 삽입
  • 키: KAGGLE_KEY - 값: kaggle.json에서 복사

각 키-값 쌍 후에 '저장'을 클릭합니다.

UI는 다음과 같이 표시됩니다.

771121470131b5ec.png

9. 애플리케이션 코드 컨테이너 1 - 데이터 다운로드

이 Python 스크립트에서는 Kaggle로 인증하여 데이터 세트를 GCS 버킷에 다운로드합니다.

스크립트 자체는 컨테이너화됩니다. 이는 DAG 1번이 되며 데이터 세트가 자주 업데이트될 것으로 예상되므로 자동화하려고 합니다.

디렉터리를 만들고 여기에 스크립트를 복사합니다.

cd .. ; mkdir 1-dataset-download
cd 1-dataset-download

dataset-download.py

import os
import kagglehub
from google.cloud import storage

KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")

print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"

upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]

requirements.txt

google-cloud-storage==2.19.0
kagglehub==0.3.4

이제 dataset-download용 컨테이너 이미지를 만들고 Artifact Registry에 푸시합니다.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest

10. 애플리케이션 코드 컨테이너 2 - 데이터 준비

데이터 준비 단계에서는 다음을 수행합니다.

  1. 기본 모델을 미세 조정하는 데 사용할 데이터 세트의 양을 지정합니다.
  2. 데이터 세트를 로드합니다.즉, CSV 파일을 행과 열의 2차원 데이터 구조인 Pandas dataframe으로 읽습니다.
  3. 데이터 변환 / 사전 처리 - 보관할 항목을 지정하여 데이터 세트에서 관련 없는 부분을 결정합니다. 즉, 나머지는 삭제됩니다.
  4. DataFrame의 각 행에 transform 함수를 적용합니다.
  5. 준비된 데이터를 GCS 버킷에 다시 저장합니다.

디렉터리를 만들고 여기에 스크립트를 복사합니다.

cd .. ; mkdir 2-data-preparation
cd 2-data-preparation

data-preparation.py

import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset

# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100"))  # Process a limited number of rows, used 100 during testing phase but can be increased

DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"

# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")

def transform(data):
    """
    Transforms a row of the DataFrame into the desired format for fine-tuning.

    Args:
      data: A pandas Series representing a row of the DataFrame.

    Returns:
      A dictionary containing the formatted text.
    """ 
    question = f"Review analysis for movie '{data['id']}'"
    context = data['reviewText']
    answer = data['scoreSentiment']
    template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
    return {'text': template.format(question=question, context=context, answer=answer)}

try:
    df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
    print(f"Dataset loaded successfully.")

    # Drop rows with NaN values in relevant columns
    df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])

    # Apply transformation to the DataFrame
    transformed_data = df.apply(transform, axis=1).tolist()

    # Convert transformed data to a DataFrame and then to a Hugging Face Dataset
    transformed_df = pd.DataFrame(transformed_data)
    dataset = Dataset.from_pandas(transformed_df)

    # Save the prepared dataset to JSON lines format
    with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
        for item in dataset:
            f.write(json.dumps(item) + "\n")

    print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
    
except Exception as e:
    print(f"Error during data loading or preprocessing: {e}")
    import traceback
    print(traceback.format_exc())

Dockerfile

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]

requirements.txt

datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3

# Now we create a container images for data-preparation and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest

11. 애플리케이션 코드 컨테이너 3 - 미세 조정

여기서는 Gemma-2-9b-it를 기본 모델로 사용한 다음 새 데이터 세트로 모델을 미세 조정합니다.

미세 조정 단계에서 발생하는 단계의 순서입니다.

1. 설정: 라이브러리를 가져오고, 모델, 데이터, 학습의 매개변수를 정의하고, Google Cloud Storage에서 데이터 세트를 로드합니다.

2. 모델 로드: 효율성을 위해 양자화로 사전 학습된 언어 모델을 로드하고 해당 토큰 생성기를 로드합니다.

3. LoRA 구성: 소규모 학습 가능한 행렬을 추가하여 모델을 효율적으로 미세 조정하도록 LoRA (Low-Rank Adaptation)를 설정합니다.

4. 학습: 학습 매개변수를 정의하고 SFTTrainer를 사용하여 FP16 양자화 유형을 사용하여 로드된 데이터 세트에서 모델을 미세 조정합니다.

5. 저장 및 업로드: 미세 조정된 모델과 토큰라이저를 로컬에 저장한 후 GCS 버킷에 업로드합니다.

그런 다음 Cloud Build를 사용하여 컨테이너 이미지를 만들고 Artifact Registry에 저장합니다.

디렉터리를 만들고 여기에 스크립트를 복사합니다.

cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning

finetuning.py

import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage

# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")

# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
    "json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)

################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))

################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50

################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False

# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
    attn_implementation="eager",
    pretrained_model_name_or_path=model_name,
    torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")

# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
    lora_alpha=lora_alpha,
    lora_dropout=lora_dropout,
    r=lora_r,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Set training parameters
training_arguments = SFTConfig(
        bf16=bf16,
        dataset_kwargs={
            "add_special_tokens": False,  
            "append_concat_token": False, 
        },
        dataset_text_field="text",
        disable_tqdm=True,
        fp16=fp16,
        gradient_accumulation_steps=gradient_accumulation_steps,
        gradient_checkpointing=gradient_checkpointing,
        gradient_checkpointing_kwargs={"use_reentrant": False},
        group_by_length=group_by_length,
        log_on_each_node=False,
        logging_steps=logging_steps,
        learning_rate=learning_rate,
        lr_scheduler_type=lr_scheduler_type,
        max_grad_norm=max_grad_norm,
        max_seq_length=max_seq_length,
        max_steps=max_steps,
        num_train_epochs=num_train_epochs,
        optim=optim,
        output_dir=save_model_path,
        packing=packing,
        per_device_train_batch_size=per_device_train_batch_size,
        save_strategy=save_strategy,
        save_steps=save_steps,
        save_total_limit=save_total_limit,
        warmup_ratio=warmup_ratio,
        weight_decay=weight_decay,
    )

print(f"Configuring fine tuning completed")

# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    peft_config=peft_config,
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    tokenizer=tokenizer,
    args=training_arguments,
    packing=packing,
)

print(f"Creating trainer completed")

# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")

# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
    low_cpu_mem_usage=True,
    pretrained_model_name_or_path=model_name,
    return_dict=True,
    torch_dtype=torch.float16,
)

model = PeftModel.from_pretrained(
    model=base_model,
    model_id=new_model,
)
model = model.merge_and_unload()

print(f"Merging the new model with base model completed")

accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")

print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
    is_main_process=accelerator.is_main_process,
    save_directory=save_model_path,
    save_function=accelerator.save,
)
print(f"Save unwrapped model completed")

print(f"Save new tokenizer started")
if accelerator.is_main_process:
    tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")

# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
    """Uploads a directory to GCS."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    for root, _, files in os.walk(model_dir):
        for file in files:
            local_file_path = os.path.join(root, file)
            gcs_file_path = os.path.relpath(local_file_path, model_dir)
            blob = bucket.blob(os.path.join(new_model, gcs_file_path))  # Use new_model_name
            blob.upload_from_filename(local_file_path)

# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")

Dockerfile

# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04

# Install necessary system packages
RUN apt-get update && \
    apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements.txt into the container
COPY requirements.txt .

# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy your finetune script into the container
COPY finetuning.py .

# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
 
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]

requirements.txt

accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4

이제 미세 조정을 위한 컨테이너 이미지를 만들고 Artifact Registry에 푸시합니다.

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest

12. DAG의 정의가 포함된 Airflow 2 개요

Airflow는 워크플로 및 데이터 파이프라인을 조정하는 플랫폼입니다. DAG (방향성 비순환 그래프)를 사용하여 Python 코드에서 이러한 워크플로를 정의하고 태스크와 종속 항목을 시각적으로 나타냅니다.

정적 DAG 및 Python 기반 정의를 갖춘 Airflow는 사전 정의된 워크플로를 예약하고 관리하는 데 적합합니다. 이 아키텍처에는 이러한 워크플로를 모니터링하고 관리하기 위한 사용자 친화적인 UI가 포함되어 있습니다.

기본적으로 Airflow를 사용하면 Python을 사용하여 데이터 파이프라인을 정의, 예약, 모니터링할 수 있으므로 워크플로 조정을 위한 유연하고 강력한 도구가 됩니다.

13. DAG 개요

ec49964ad7d61491.png

DAG는 Directed Acyclic Graph의 약자입니다. Airflow에서 DAG 자체는 전체 워크플로 또는 파이프라인을 나타냅니다. 태스크, 종속 항목, 실행 순서를 정의합니다.

DAG 내 워크플로 단위는 Airflow 구성에서 시작된 GKE 클러스터의 포드에서 실행됩니다.

요약:

Airflow: 데이터 다운로드 - 이 스크립트는 Kaggle에서 영화 리뷰 데이터 세트를 가져와 GCS 버킷에 저장하는 프로세스를 자동화하여 클라우드 환경에서 추가 처리 또는 분석에 쉽게 사용할 수 있도록 합니다.

Airflow: 데이터 준비 - 이 코드는 원시 영화 리뷰 데이터 세트를 가져와 사용 사례에 필요하지 않은 불필요한 데이터 열을 삭제하고 누락된 값이 있는 데이터 세트를 삭제합니다. 그런 다음 데이터 세트를 머신러닝에 적합한 질문-답변 형식으로 구성하고 나중에 사용할 수 있도록 GCS에 다시 저장합니다.

Airflow: 모델 미세 조정 - 이 코드는 LoRA (하위 순위 조정)라는 기법을 사용하여 대규모 언어 모델 (LLM)을 미세 조정하고 업데이트된 모델을 저장합니다. 먼저 Google Cloud Storage에서 사전 학습된 LLM과 데이터 세트를 로드합니다. 그런 다음 LoRA를 적용하여 이 데이터 세트에서 모델을 효율적으로 미세 조정합니다. 마지막으로 미세 조정된 모델을 Google Cloud Storage에 다시 저장하여 나중에 텍스트 생성이나 질문 응답과 같은 애플리케이션에서 사용합니다.

Airflow: 모델 제공 - 추론을 위해 vllm을 사용하여 GKE에서 미세 조정된 모델을 제공합니다.

Airflow: 피드백 루프 - xx마다 모델을 재학습합니다 (시간, 일, 주 단위).

이 다이어그램은 GKE에서 실행될 때 Airflow 2가 작동하는 방식을 설명합니다.

8691f41166209a5d.png

14. 모델 미세 조정과 RAG 사용 비교

이 CodeLab에서는 검색 증강 생성 (RAG)을 사용하는 대신 LLM을 미세 조정합니다.

다음 두 가지 접근 방식을 비교해 보겠습니다.

미세 조정: 특수화된 모델을 만듭니다. 미세 조정은 LLM을 특정 작업 또는 데이터 세트에 맞게 조정하여 외부 데이터 소스에 의존하지 않고도 독립적으로 작동할 수 있도록 합니다.

추론 간소화: 별도의 검색 시스템과 데이터베이스가 필요하지 않으므로 특히 빈번한 사용 사례의 경우 더 빠르고 저렴한 응답이 가능합니다.

RAG: 외부 지식 사용: RAG는 각 요청에 대해 기술 자료에서 관련 정보를 검색하여 최신 및 구체적인 데이터에 액세스할 수 있도록 합니다.

복잡성 증가: Kubernetes 클러스터와 같은 프로덕션 환경에서 RAG를 구현하려면 데이터 처리 및 검색을 위한 여러 마이크로서비스를 사용해야 하는 경우가 많으므로 지연 시간과 계산 비용이 늘어날 수 있습니다.

미세 조정이 선택된 이유:

RAG는 이 Codelab에서 사용된 소규모 데이터 세트에 적합하지만, Airflow의 일반적인 사용 사례를 보여주기 위해 미세 조정을 선택했습니다. 이렇게 하면 RAG를 위한 추가 인프라 및 마이크로서비스를 설정하는 미묘한 차이점을 파고들기보다는 워크플로 조정 측면에 집중할 수 있습니다.

결론:

미세 조정과 RAG는 모두 고유한 장단점이 있는 유용한 기법입니다. 최적의 선택은 데이터의 크기 및 복잡성, 성능 요구사항, 비용 고려사항과 같은 프로젝트의 구체적인 요구사항에 따라 달라집니다.

15. DAG 작업 1 - Airflow의 첫 번째 단계 만들기: 데이터 다운로드

이 DAG 단위의 개요로, 컨테이너 이미지에 호스팅된 Python 코드는 Kaggle에서 최신 RottenTomatoes 데이터 세트를 다운로드합니다.

이 코드를 GCS 버킷에 복사하지 마세요. 마지막 단계로 mlops-dag.py를 복사합니다. 이 파일에는 하나의 Python 스크립트 내에 모든 DAG 단위 단계가 포함되어 있습니다.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        dataset_download

16. DAG 작업 2 - Airflow의 두 번째 단계: 데이터 준비 만들기

이 DAG 단위의 개요로 GCS에서 CSV 파일 (rotten_tomatoes_movie_reviews.csv)을 Pandas DataFrame에 로드합니다.

그런 다음 테스트 및 리소스 효율성을 위해 DATASET_LIMIT을 사용하여 처리되는 행 수를 제한하고 마지막으로 변환된 데이터를 Hugging Face 데이터 세트로 변환합니다.

자세히 살펴보면 'DATASET_LIMIT': '1000'을 사용하여 모델에서 1,000개의 행을 학습하고 있는 것을 볼 수 있습니다. Nvidia L4 GPU에서 이렇게 하는 데 20분이 걸리기 때문입니다.

이 코드를 GCS 버킷에 복사하지 마세요. 마지막 단계에서 하나의 Python 스크립트 내에 모든 단계가 포함된 mlops-dag.py를 복사합니다.

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation

17. DAG 작업 3 - Airflow의 세 번째 단계 만들기: 모델 미세 조정

이 DAG 단위의 개요로, 여기서는 finetune.py를 실행하여 새 데이터 세트로 Gemma 모델을 미세 조정합니다.

이 코드를 GCS 버킷에 복사하지 마세요. 마지막 단계에서 하나의 Python 스크립트 내에 모든 단계가 포함된 mlops-dag.py를 복사합니다.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Task 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Task 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Task 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation >> fine_tuning

18. DAG 태스크 4 - Airflow의 마지막 단계 만들기: 추론 / 모델 서빙

vLLM은 LLM의 고성능 추론을 위해 특별히 설계된 강력한 오픈소스 라이브러리입니다. Google Kubernetes Engine (GKE)에 배포하면 Kubernetes의 확장성과 효율성을 활용하여 LLM을 효과적으로 제공할 수 있습니다.

단계 요약:

  • DAG 'mlops-dag.py'를 GCS 버킷에 업로드합니다.
  • 추론을 설정하기 위한 Kubernetes YAML 구성 파일 2개를 GCS 버킷에 복사합니다.

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

def model_serving():
    config.load_incluster_config()
    k8s_apps_v1 = client.AppsV1Api()
    k8s_core_v1 = client.CoreV1Api()

    while True:
        try:
            k8s_apps_v1.delete_namespaced_deployment(
                    namespace="airflow",
                    name="inference-deployment",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Deployment inference-deployment deleted")
    
    with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace="airflow")
        print(f"Deployment created. Status='{resp.metadata.name}'")
    
    while True:
        try:
            k8s_core_v1.delete_namespaced_service(
                    namespace="airflow",
                    name="llm-service",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Service llm-service deleted")

    with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_core_v1.create_namespaced_service(
            body=dep, namespace="airflow")
        print(f"Service created. Status='{resp.metadata.name}'")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 4: Run GKE Deployment for model serving
        model_serving = PythonOperator(
            task_id="model_serving",
            python_callable=model_serving
        )

        dataset_download >> data_preparation >> fine_tuning >> model_serving

Python 스크립트 (DAG 파일)와 Kubernetes 매니페스트를 DAGS GCS 버킷에 업로드합니다.

gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}

Airflow UI에 mlops-dag가 표시됩니다.

  1. 일시중지 해제를 선택합니다.
  2. 수동 MLOps 주기를 실행하려면 Trigger DAG를 선택합니다.

d537281b92d5e8bb.png

DAG가 완료되면 Airflow UI에 다음과 같은 출력이 표시됩니다.

3ed42abf8987384e.png

마지막 단계가 완료되면 모델 엔드포인트를 가져와 프롬프트를 전송하여 모델을 테스트할 수 있습니다.

모델 추론이 시작되고 부하 분산기가 외부 IP 주소를 할당할 수 있도록 curl 명령어를 실행하기 전에 약 5분 정도 기다립니다.

export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')

curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128
}
EOF

출력:

19. 축하합니다.

GKE에서 Airflow 2를 사용하여 DAG 파이프라인을 통해 첫 번째 AI 워크플로를 만들었습니다.

배포한 리소스의 프로비저닝을 해제하는 것을 잊지 마세요.

20. 프로덕션에서 실행

CodeLab에서는 GKE에서 Airflow 2를 설정하는 방법을 자세히 설명했지만, 실제 프로덕션 환경에서 설정할 때는 다음 주제를 고려해야 합니다.

Gradio 또는 유사한 도구를 사용하여 웹 프런트엔드를 구현합니다.

여기에서 GKE를 사용하여 워크로드의 자동 애플리케이션 모니터링을 구성하거나 여기에서 Airflow에서 측정항목을 내보냅니다.

특히 데이터 세트가 큰 경우 모델을 더 빠르게 미세 조정하려면 더 큰 GPU가 필요할 수 있습니다. 하지만 여러 GPU에서 모델을 학습하려면 데이터 세트를 분할하고 학습을 샤딩해야 합니다. 다음은 PyTorch를 사용한 FSDP (완전히 샤딩된 데이터 병렬, GPU 공유를 사용하여 이 목표를 달성함)에 관한 설명입니다. 자세한 내용은 Meta의 블로그 게시물과 Pytorch를 사용한 FSDP에 관한 이 튜토리얼을 참고하세요.

Google Cloud Composer는 관리형 Airflow 서비스이므로 Airflow 자체를 유지관리할 필요가 없습니다. DAG를 배포하기만 하면 됩니다.

자세히 알아보기

라이선스

이 작업물은 Creative Commons Attribution 2.0 일반 라이선스에 따라 사용이 허가되었습니다.