GKE-তে এয়ারফ্লো 2 সহ MLOps ওয়ার্কফ্লো তৈরি করা

1. ওভারভিউ

852dc8844309ffb8.png

এই CodeLab প্রদর্শন করে যে কীভাবে একটি ডেটাসেট ডাউনলোড করে, একটি মডেল পরিমার্জন করে এবং ন্যূনতম পরিমাণ বিমূর্ততা সহ একটি Airflow DAG ব্যবহার করে Google Kubernetes Engine (GKE) এ LLM স্থাপন করে মেশিন লার্নিং (MLOps) এর সাথে DevOps অনুশীলনগুলিকে একীভূত করতে হয়৷ ফলস্বরূপ আমরা gcloud কমান্ড ব্যবহার করছি এবং terraform নয় যাতে আপনি ধাপে ধাপে ল্যাব অনুসরণ করতে পারেন এবং প্ল্যাটফর্ম ইঞ্জিনিয়ার এবং মেশিন লার্নিং ইঞ্জিনিয়ার উভয়ের দৃষ্টিকোণ থেকে প্রতিটি প্রক্রিয়া সহজেই বুঝতে পারেন।

এই হ্যান্ডস-অন গাইডটি আপনার এআই ওয়ার্কফ্লোকে স্ট্রীমলাইন করার জন্য এয়ারফ্লোকে সুবিধার মাধ্যমে নিয়ে যাবে, একটি DAG কনফিগার করে সমগ্র MLOps জীবনচক্রের একটি পরিষ্কার এবং ব্যবহারিক প্রদর্শন প্রদান করবে।

আপনি কি শিখবেন

  • প্ল্যাটফর্ম এবং মেশিন লার্নিং ইঞ্জিনিয়ারদের মধ্যে বৃহত্তর সহযোগিতা এবং বোঝাপড়া গড়ে তুলুন জ্ঞানের সাইলো ভেঙ্গে এবং কর্মপ্রবাহ উন্নত করে
  • GKE-তে এয়ারফ্লো 2 কীভাবে স্থাপন, ব্যবহার এবং পরিচালনা করবেন তা বুঝুন
  • শেষ থেকে শেষ পর্যন্ত একটি এয়ারফ্লো DAG কনফিগার করুন
  • GKE এর সাথে প্রোডাকশন গ্রেড মেশিন লার্নিং সিস্টেমের ভিত্তি তৈরি করুন
  • মেশিন লার্নিং সিস্টেমকে ইন্সট্রুমেন্ট এবং চালু করা
  • প্ল্যাটফর্ম ইঞ্জিনিয়ারিং কীভাবে MLOps-এর জন্য একটি গুরুত্বপূর্ণ সমর্থন স্তম্ভ হয়ে উঠেছে তা বুঝুন

এই কোডল্যাব কি অর্জন করে

  • আপনি একটি LLM-এর সিনেমা সম্পর্কে প্রশ্ন করতে পারেন যা আমরা Gemma-2-9b-it-এর উপর ভিত্তি করে তৈরি করেছি, VLLM-এর সাথে GKE-তে পরিবেশিত।

লক্ষ্য শ্রোতা

  • মেশিন লার্নিং ইঞ্জিনিয়ার
  • প্ল্যাটফর্ম ইঞ্জিনিয়াররা
  • তথ্য বিজ্ঞানীরা
  • ডেটা ইঞ্জিনিয়ার
  • DevOps ইঞ্জিনিয়ার্স
  • প্ল্যাটফর্ম স্থপতি
  • গ্রাহক প্রকৌশলী

এই CodeLab উদ্দেশ্য নয়

  • GKE বা AI/ML কর্মপ্রবাহের ভূমিকা হিসাবে
  • পুরো এয়ারফ্লো ফিচার-সেটের রান-থ্রু হিসেবে

2. প্ল্যাটফর্ম ইঞ্জিনিয়ারিং মেশিন লার্নিং ইঞ্জিনিয়ার/বিজ্ঞানীদের সাহায্য করে

16635a8284b994c.png

প্ল্যাটফর্ম ইঞ্জিনিয়ারিং এবং MLOps হল পারস্পরিক নির্ভরশীল শৃঙ্খলা যা ML উন্নয়ন এবং স্থাপনার জন্য একটি শক্তিশালী এবং দক্ষ পরিবেশ তৈরি করতে সহযোগিতা করে।

সুযোগ: প্ল্যাটফর্ম ইঞ্জিনিয়ারিং-এর MLOps-এর চেয়ে বিস্তৃত সুযোগ রয়েছে, যা সমগ্র সফ্টওয়্যার বিকাশের জীবনচক্রকে অন্তর্ভুক্ত করে এবং এর জন্য সরঞ্জাম এবং অবকাঠামো প্রদান করে।

MLOps ML উন্নয়ন, স্থাপনা এবং অনুমানের মধ্যে ব্যবধান পূরণ করে।

দক্ষতা: প্ল্যাটফর্ম ইঞ্জিনিয়ারদের সাধারণত ক্লাউড কম্পিউটিং, কন্টেইনারাইজেশন এবং ডেটা ব্যবস্থাপনার মতো অবকাঠামো প্রযুক্তিতে শক্তিশালী দক্ষতা থাকে।

MLOps ইঞ্জিনিয়াররা এমএল মডেল ডেভেলপমেন্ট, স্থাপনা এবং পর্যবেক্ষণে বিশেষজ্ঞ, প্রায়ই ডেটা সায়েন্স এবং সফটওয়্যার ইঞ্জিনিয়ারিং দক্ষতার অধিকারী।

টুলস: প্ল্যাটফর্ম ইঞ্জিনিয়াররা অবকাঠামো ব্যবস্থা, কনফিগারেশন ম্যানেজমেন্ট এবং কন্টেইনার অর্কেস্ট্রেশন এবং অ্যাপ্লিকেশন স্ক্যাফোল্ডিংয়ের জন্য সরঞ্জাম তৈরি করে। MLOps ইঞ্জিনিয়াররা ML মডেল প্রশিক্ষণ, পরীক্ষা, স্থাপনা, পর্যবেক্ষণ, এবং সংস্করণের জন্য সরঞ্জামগুলি ব্যবহার করে।

3. Google ক্লাউড সেটআপ এবং প্রয়োজনীয়তা

স্ব-গতিসম্পন্ন পরিবেশ সেটআপ

  1. Google ক্লাউড কনসোলে সাইন-ইন করুন এবং একটি নতুন প্রকল্প তৈরি করুন বা বিদ্যমান একটি পুনরায় ব্যবহার করুন৷ আপনার যদি ইতিমধ্যেই একটি Gmail বা Google Workspace অ্যাকাউন্ট না থাকে, তাহলে আপনাকে অবশ্যই একটি তৈরি করতে হবে।

fbef9caa1602edd0.png

a99b7ace416376c4.png

5e3ff691252acf41.png

  • প্রকল্পের নাম এই প্রকল্পের অংশগ্রহণকারীদের জন্য প্রদর্শনের নাম। এটি একটি অক্ষর স্ট্রিং যা Google API দ্বারা ব্যবহৃত হয় না। আপনি সবসময় এটি আপডেট করতে পারেন.
  • প্রোজেক্ট আইডি সমস্ত Google ক্লাউড প্রোজেক্ট জুড়ে অনন্য এবং অপরিবর্তনীয় (সেট করার পরে পরিবর্তন করা যাবে না)। ক্লাউড কনসোল স্বয়ংক্রিয়ভাবে একটি অনন্য স্ট্রিং তৈরি করে; সাধারণত আপনি এটা কি যত্ন না. বেশিরভাগ কোডল্যাবে, আপনাকে আপনার প্রকল্প আইডি উল্লেখ করতে হবে (সাধারণত PROJECT_ID হিসাবে চিহ্নিত)। আপনি যদি জেনারেট করা আইডি পছন্দ না করেন, তাহলে আপনি অন্য একটি এলোমেলো আইডি তৈরি করতে পারেন। বিকল্পভাবে, আপনি নিজের চেষ্টা করতে পারেন, এবং এটি উপলব্ধ কিনা দেখতে পারেন। এই ধাপের পরে এটি পরিবর্তন করা যাবে না এবং প্রকল্পের সময়কালের জন্য থাকে।
  • আপনার তথ্যের জন্য, একটি তৃতীয় মান আছে, একটি প্রকল্প নম্বর , যা কিছু API ব্যবহার করে। ডকুমেন্টেশনে এই তিনটি মান সম্পর্কে আরও জানুন।
  1. এরপরে, ক্লাউড রিসোর্স/এপিআই ব্যবহার করতে আপনাকে ক্লাউড কনসোলে বিলিং সক্ষম করতে হবে। এই কোডল্যাবের মাধ্যমে চালানোর জন্য খুব বেশি খরচ হবে না, যদি কিছু হয়। এই টিউটোরিয়ালের বাইরে বিলিং এড়াতে সংস্থানগুলি বন্ধ করতে, আপনি আপনার তৈরি করা সংস্থানগুলি মুছতে বা প্রকল্প মুছতে পারেন। নতুন Google ক্লাউড ব্যবহারকারীরা $300 USD বিনামূল্যের ট্রায়াল প্রোগ্রামের জন্য যোগ্য৷

ক্লাউড শেল শুরু করুন

যদিও Google ক্লাউড আপনার ল্যাপটপ থেকে দূরবর্তীভাবে পরিচালিত হতে পারে, এই কোডল্যাবে আপনি ক্লাউড শেল ব্যবহার করবেন, ক্লাউডে চলমান একটি কমান্ড লাইন পরিবেশ।

ক্লাউড শেল সক্রিয় করুন

  1. ক্লাউড কনসোল থেকে, ক্লাউড শেল সক্রিয় করুন ক্লিক করুন 853e55310c205094.png .

3c1dabeca90e44e5.png

যদি এটি আপনার প্রথমবার ক্লাউড শেল শুরু হয়, তাহলে এটি কী তা বর্ণনা করে আপনাকে একটি মধ্যবর্তী স্ক্রীন উপস্থাপন করা হবে। যদি আপনি একটি মধ্যবর্তী স্ক্রীনের সাথে উপস্থাপিত হন, তবে চালিয়ে যান ক্লিক করুন।

9c92662c6a846a5c.png

ক্লাউড শেলের সাথে সংযোগ করতে এবং সংযোগ করতে এটির মাত্র কয়েক মুহূর্ত লাগবে৷

9f0e51b578fecce5.png

এই ভার্চুয়াল মেশিনটি প্রয়োজনীয় সমস্ত বিকাশের সরঞ্জাম দিয়ে লোড করা হয়েছে। এটি একটি ক্রমাগত 5 GB হোম ডিরেক্টরি অফার করে এবং Google ক্লাউডে চলে, যা নেটওয়ার্ক কর্মক্ষমতা এবং প্রমাণীকরণকে ব্যাপকভাবে উন্নত করে। এই কোডল্যাবে আপনার অনেক কাজ, যদি সব না হয়, ব্রাউজার দিয়ে করা যেতে পারে।

একবার ক্লাউড শেলের সাথে সংযুক্ত হয়ে গেলে, আপনি দেখতে পাবেন যে আপনি প্রমাণীকৃত হয়েছেন এবং প্রকল্পটি আপনার প্রকল্প আইডিতে সেট করা আছে।

  1. আপনি প্রমাণীকৃত কিনা তা নিশ্চিত করতে ক্লাউড শেলে নিম্নলিখিত কমান্ডটি চালান:
gcloud auth list

কমান্ড আউটপুট

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  1. gcloud কমান্ড আপনার প্রকল্প সম্পর্কে জানে তা নিশ্চিত করতে ক্লাউড শেলে নিম্নলিখিত কমান্ডটি চালান:
gcloud config list project

কমান্ড আউটপুট

[core]
project = <PROJECT_ID>

যদি এটি না হয়, আপনি এই কমান্ড দিয়ে এটি সেট করতে পারেন:

gcloud config set project <PROJECT_ID>

কমান্ড আউটপুট

Updated property [core/project].

4. ধাপ #1 - সাইন আপ করুন এবং Kaggle এ প্রমাণীকরণ করুন

CodeLab শুরু করার জন্য, আপনাকে Kaggle-এ একটি অ্যাকাউন্ট তৈরি করতে হবে যা Google-এর মালিকানাধীন ডেটা বিজ্ঞানী এবং মেশিন লার্নিং উত্সাহীদের জন্য একটি অনলাইন কমিউনিটি প্ল্যাটফর্ম এবং বিভিন্ন ডোমেনের জন্য সর্বজনীনভাবে উপলব্ধ ডেটাসেটের একটি বিশাল ভাণ্ডার হোস্ট করে৷ এই সাইট থেকে আপনি RottenTomatoes ডেটাসেট ডাউনলোড করবেন, যা আপনার মডেলকে প্রশিক্ষণ দিতে ব্যবহৃত হয়।

  • Kaggle এ সাইন আপ করুন, আপনি সাইন-ইন করতে Google SSO ব্যবহার করতে পারেন
  • শর্তাবলী গ্রহণ করুন
  • সেটিংসে যান এবং আপনার ব্যবহারকারীর নাম ব্যবহারকারীর নাম পান
  • API বিভাগের অধীনে, "Create new token from Kaggle" নির্বাচন করুন যা kaggle.json ডাউনলোড করবে
  • আপনার যদি কোনো সমস্যা থাকে, তাহলে এখানে সহায়তা পৃষ্ঠায় যান

5. ধাপ #2 - সাইন আপ করুন এবং HuggingFace-এ প্রমাণীকরণ করুন

HuggingFace হল মেশিন লার্নিং টেকনোলজির সাথে যুক্ত হওয়ার জন্য একটি কেন্দ্রীয় অবস্থান। এটি 900k মডেল, 200k ডেটাসেট এবং 300k ডেমো অ্যাপ (স্পেস) হোস্ট করে, সমস্ত ওপেন সোর্স এবং সর্বজনীনভাবে উপলব্ধ।

  • HuggingFace- এ সাইন আপ করুন - ব্যবহারকারীর নাম দিয়ে একটি অ্যাকাউন্ট তৈরি করুন, আপনি Google SSO ব্যবহার করতে পারবেন না
  • আপনার ইমেল ঠিকানা নিশ্চিত করুন
  • এখানে যান এবং Gemma-2-9b-it মডেলের লাইসেন্স গ্রহণ করুন
  • এখানে একটি HuggingFace টোকেন তৈরি করুন
  • টোকেন শংসাপত্রগুলি রেকর্ড করুন, আপনার এটি পরে প্রয়োজন হবে৷

6. ধাপ #3 - প্রয়োজনীয় Google ক্লাউড অবকাঠামো সংস্থান তৈরি করুন

আপনি GKE, GCE, আর্টিফ্যাক্ট রেজিস্ট্রি সেট আপ করবেন এবং ওয়ার্কলোড আইডেন্টিটি ফেডারেশন ব্যবহার করে IAM ভূমিকা প্রয়োগ করবেন।

আপনার এআই ওয়ার্কফ্লো দুটি নোডপুল নিয়োগ করে, একটি প্রশিক্ষণের জন্য এবং একটি অনুমানের জন্য। প্রশিক্ষণ নোডপুল একটি এনভিডিয়া এল৪ টেনসর কোর জিপিইউ দিয়ে সজ্জিত একটি g2-standard-8 GCE VM ব্যবহার করছে। ইনফারেন্স নোডপুল দুটি Nvidia L4 Tensor Core GPU দিয়ে সজ্জিত একটি g2-standard-24 VM ব্যবহার করছে। অঞ্চল নির্দিষ্ট করার সময়, প্রয়োজনীয় GPU সমর্থিত একটি বেছে নিন ( লিঙ্ক )।

আপনার ক্লাউড শেলে নিম্নলিখিত কমান্ডগুলি চালান:

# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")

SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1

# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto

# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount

# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com

# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
  --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-12 \
  --num-nodes=1

# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
  --accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
  --project=${PROJECT_ID} \
  --location=${REGION}-a \
  --node-locations=${REGION}-a \
  --cluster=${CLUSTER_NAME} \
  --machine-type=g2-standard-24 \
  --num-nodes=1

# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a

# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}

আপনার YAML ম্যানিফেস্ট তৈরি করুন

mkdir manifests
cd manifests

mlops-sa.yaml

apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
  name: airflow-mlops-sa
  namespace: airflow
  labels:
    tier: airflow

pv-dags.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-dags
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 5Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_DAGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pv-logs.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-logs
spec:
  accessModes:
  - ReadWriteMany
  capacity:
    storage: 100Gi
  storageClassName: standard
  mountOptions:
    - implicit-dirs
  csi:
    driver: gcsfuse.csi.storage.gke.io
    volumeHandle: BUCKET_LOGS_NAME
    volumeAttributes:
      gcsfuseLoggingSeverity: warning

pvc-dags.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
  volumeName: airflow-dags
  storageClassName: standard

pvc-logs.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-logs
  namespace: airflow
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  volumeName: airflow-logs
  storageClassName: standard

namespace.yaml

kind: Namespace
apiVersion: v1
metadata:
  name: airflow
  labels:
    name: airflow

sa-ভূমিকা.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow-deployment-role
rules:
- apiGroups: ["apps"] 
  resources: ["deployments"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]

sa-rolebinding.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-deployment-rolebinding
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: airflow-worker
  namespace: airflow
roleRef:
  kind: Role
  name: airflow-deployment-role
  apiGroup: rbac.authorization.k8s.io

inference.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: inference-deployment
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: gemma-server
  template:
    metadata:
      labels:
        app: gemma-server
        ai.gke.io/model: gemma-2-9b-it
        ai.gke.io/inference-server: vllm
      annotations:
        gke-gcsfuse/volumes: "true"
    spec:
      serviceAccountName: airflow-mlops-sa
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: "NoSchedule"
      - key: "on-demand"
        value: "true"
        operator: "Equal"
        effect: "NoSchedule"
      containers:
      - name: inference-server
        image: vllm/vllm-openai:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            nvidia.com/gpu: "2"
          limits:
            nvidia.com/gpu: "2"
        command: ["/bin/sh", "-c"]
        args:
        - |
          python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
        volumeMounts:
        - mountPath: /dev/shm
          name: dshm
        - name: gcs-fuse-csi-ephemeral
          mountPath: /modeldata
          readOnly: true
      volumes:
      - name: dshm
        emptyDir:
          medium: Memory
      - name: gcs-fuse-csi-ephemeral
        csi:
          driver: gcsfuse.csi.storage.gke.io
          volumeAttributes:
            bucketName: BUCKET_DATA_NAME
            mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
            fileCacheCapacity: "20Gi"
            fileCacheForRangeRead: "true"
            metadataStatCacheCapacity: "-1"
            metadataTypeCacheCapacity: "-1"
            metadataCacheTTLSeconds: "-1"
      nodeSelector:
        cloud.google.com/gke-accelerator: nvidia-l4

inference-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: llm-service
  namespace: airflow
spec:
  selector:
    app: gemma-server
  type: LoadBalancer
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000

3টি Google ক্লাউড স্টোরেজ (GCS) বালতি তৈরি করুন

gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}

# Create the namespace in GKE
kubectl apply -f namespace.yaml

# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml

Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation

gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"

7. ধাপ #4 - হেলম চার্টের মাধ্যমে GKE-তে এয়ারফ্লো ইনস্টল করুন

এখন আমরা Helm ব্যবহার করে Airflow 2 স্থাপন করি। Apache Airflow হল ডেটা ইঞ্জিনিয়ারিং পাইপলাইনের জন্য একটি ওপেন সোর্স ওয়ার্কফ্লো ম্যানেজমেন্ট প্ল্যাটফর্ম । আমরা পরে এয়ারফ্লো 2 এর বৈশিষ্ট্য সেটে যাব।

Values.yaml এয়ারফ্লো হেলম চার্টের জন্য

config:
  webserver:
    expose_config: true
webserver:
  service:
    type: LoadBalancer
  podAnnotations:
    gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
  - name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
    value: "30"
logs:
  persistence:
    enabled: true
    existingClaim: "airflow-logs"
dags:
  persistence:
    enabled: true
    existingClaim: "airflow-dags"
scheduler:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
triggerer:
  podAnnotations:
    gke-gcsfuse/volumes: "true"
workers:
  podAnnotations:
    gke-gcsfuse/volumes: "true"

বায়ুপ্রবাহ স্থাপন করুন 2

helm repo add apache-airflow https://airflow.apache.org
helm repo update

helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml

8. ধাপ #5 - সংযোগ এবং ভেরিয়েবল সহ বায়ুপ্রবাহ শুরু করুন

একবার এয়ারফ্লো 2 স্থাপন করা হলে, আমরা এটি কনফিগার করা শুরু করতে পারি। আমরা কিছু ভেরিয়েবল সংজ্ঞায়িত করি, যা আমাদের পাইথন স্ক্রিপ্ট দ্বারা পড়া হয়।

  1. আপনার ব্রাউজার দিয়ে পোর্ট 8080 এ Airflow UI অ্যাক্সেস করুন

বাহ্যিক আইপি পান

kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'

একটি ওয়েব ব্রাউজার খুলুন এবং http:// <EXTERNAL-IP> :8080 এ যান। লগইন হল অ্যাডমিন/প্রশাসন

  1. Airflow UI এর মধ্যে একটি ডিফল্ট GCP সংযোগ তৈরি করুন, তাই অ্যাডমিন → সংযোগ → + একটি নতুন রেকর্ড যোগ করুন-এ যান
  • সংযোগ আইডি: google_cloud_default
  • সংযোগের ধরন: গুগল ক্লাউড

Save এ ক্লিক করুন।

  1. প্রয়োজনীয় ভেরিয়েবল তৈরি করুন, তাই অ্যাডমিন → ভেরিয়েবল → + একটি নতুন রেকর্ড যোগ করুন এ যান
  • কী: BUCKET_DATA_NAME - মান: echo $BUCKET_DATA_NAME থেকে কপি
  • কী: GCP_PROJECT_ID - মান: echo $DEVSHELL_PROJECT_ID থেকে অনুলিপি
  • কী: HF_TOKEN - মান: আপনার HF টোকেন ঢোকান
  • কী: KAGGLE_USERNAME - মান: আপনার kaggle ব্যবহারকারীর নাম সন্নিবেশ করুন
  • কী: KAGGLE_KEY - মান: kaggle.json থেকে এটি কপি করুন

প্রতিটি কী-মান জোড়ার পরে সংরক্ষণ করুন ক্লিক করুন।

আপনার UI দেখতে এইরকম হওয়া উচিত:

771121470131b5ec.png

9. অ্যাপ্লিকেশন কোড কন্টেইনার #1 - ডেটা ডাউনলোড হচ্ছে

এই পাইথন স্ক্রিপ্টে, আমরা আমাদের GCS বালতিতে ডেটাসেট ডাউনলোড করতে Kaggle দিয়ে প্রমাণীকরণ করি।

স্ক্রিপ্টটি নিজেই কনটেইনারাইজড কারণ এটি DAG ইউনিট #1 হয়ে যায় এবং আমরা আশা করি যে ডেটাসেট ঘন ঘন আপডেট হবে, তাই আমরা এটি স্বয়ংক্রিয় করতে চাই।

ডিরেক্টরি তৈরি করুন এবং এখানে আমাদের স্ক্রিপ্ট অনুলিপি করুন

cd .. ; mkdir 1-dataset-download
cd 1-dataset-download

dataset-download.py

import os
import kagglehub
from google.cloud import storage

KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")

# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")

print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"

upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)

ডকারফাইল

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]

requirements.txt

google-cloud-storage==2.19.0
kagglehub==0.3.4

এখন আমরা ডেটাসেট-ডাউনলোডের জন্য একটি কন্টেইনার ইমেজ তৈরি করি এবং এটিকে আর্টিফ্যাক্ট রেজিস্ট্রিতে পুশ করি

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest

10. অ্যাপ্লিকেশন কোড কন্টেইনার #2 - ডেটা প্রস্তুতি

আমাদের ডেটা প্রস্তুতির ধাপে, আমরা যা অর্জন করি তা হল:

  1. আমাদের বেস মডেল ফাইনটিউন করার জন্য আমরা কত ডেটাসেট ব্যবহার করতে চাই তা নির্দিষ্ট করুন
  2. ডেটাসেট লোড করে, যেমন একটি পান্ডাস ডেটাফ্রেমে CSV ফাইলটি পড়ে যা সারি এবং কলামগুলির জন্য একটি 2-মাত্রিক ডেটা কাঠামো
  3. ডেটা ট্রান্সফর্মেশন/প্রিপ্রসেসিং - আমরা কী রাখতে চাই তা নির্দিষ্ট করে ডেটাসেটের কোন অংশগুলি অপ্রাসঙ্গিক তা নির্ধারণ করুন, যা কার্যত বাকিগুলিকে সরিয়ে দিচ্ছে
  4. ডেটাফ্রেমের প্রতিটি সারিতে transform ফাংশন প্রয়োগ করে
  5. GCS বালতিতে প্রস্তুত ডেটা আবার সংরক্ষণ করুন

ডিরেক্টরি তৈরি করুন এবং এখানে আমাদের স্ক্রিপ্ট অনুলিপি করুন

cd .. ; mkdir 2-data-preparation
cd 2-data-preparation

data-preparation.py

import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset

# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")

DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100"))  # Process a limited number of rows, used 100 during testing phase but can be increased

DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"

# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")

def transform(data):
    """
    Transforms a row of the DataFrame into the desired format for fine-tuning.

    Args:
      data: A pandas Series representing a row of the DataFrame.

    Returns:
      A dictionary containing the formatted text.
    """ 
    question = f"Review analysis for movie '{data['id']}'"
    context = data['reviewText']
    answer = data['scoreSentiment']
    template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
    return {'text': template.format(question=question, context=context, answer=answer)}

try:
    df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
    print(f"Dataset loaded successfully.")

    # Drop rows with NaN values in relevant columns
    df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])

    # Apply transformation to the DataFrame
    transformed_data = df.apply(transform, axis=1).tolist()

    # Convert transformed data to a DataFrame and then to a Hugging Face Dataset
    transformed_df = pd.DataFrame(transformed_data)
    dataset = Dataset.from_pandas(transformed_df)

    # Save the prepared dataset to JSON lines format
    with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
        for item in dataset:
            f.write(json.dumps(item) + "\n")

    print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
    
except Exception as e:
    print(f"Error during data loading or preprocessing: {e}")
    import traceback
    print(traceback.format_exc())

ডকারফাইল

FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]

requirements.txt

datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3

# Now we create a container images for data-preparation and push it to the Artifact Registry

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest

11. অ্যাপ্লিকেশন কোড কন্টেইনার #3 - ফাইনটিউনিং

এখানে আমরা একটি বেস মডেল হিসাবে Gemma-2-9b-it ব্যবহার করি এবং তারপর আমাদের নতুন ডেটাসেটের সাথে এটিকে সুন্দর করে তুলব।

ফাইনটিউনিং ধাপের সময় ঘটতে থাকা ধাপগুলির এই ক্রম।

1. সেটআপ: লাইব্রেরি আমদানি করুন, প্যারামিটারগুলি (মডেল, ডেটা এবং প্রশিক্ষণের জন্য) সংজ্ঞায়িত করুন এবং Google ক্লাউড স্টোরেজ থেকে ডেটাসেট লোড করুন৷

2. লোড মডেল: দক্ষতার জন্য কোয়ান্টাইজেশন সহ একটি প্রাক-প্রশিক্ষিত ভাষা মডেল লোড করুন এবং সংশ্লিষ্ট টোকেনাইজার লোড করুন।

3. LoRA কনফিগার করুন: ছোট প্রশিক্ষনযোগ্য ম্যাট্রিক্স যোগ করে মডেলটিকে দক্ষতার সাথে ফাইনটিউন করতে নিম্ন-র্যাঙ্ক অ্যাডাপ্টেশন (LoRA) সেট আপ করুন।

4. ট্রেন: প্রশিক্ষণের পরামিতিগুলি সংজ্ঞায়িত করুন এবং FP16 কোয়ান্টাইজেশন টাইপ ব্যবহার করে লোড করা ডেটাসেটে মডেলটিকে সূক্ষ্ম-টিউন করতে SFTTrainer ব্যবহার করুন।

5. সংরক্ষণ করুন এবং আপলোড করুন: ফাইনটিউন করা মডেল এবং টোকেনাইজার স্থানীয়ভাবে সংরক্ষণ করুন, তারপর সেগুলি আমাদের GCS বালতিতে আপলোড করুন।

তারপরে আমরা ক্লাউড বিল্ড ব্যবহার করে একটি কন্টেইনার ইমেজ তৈরি করি এবং এটি আর্টিফ্যাক্ট রেজিস্ট্রিতে সংরক্ষণ করি।

ডিরেক্টরি তৈরি করুন এবং এখানে আমাদের স্ক্রিপ্ট অনুলিপি করুন

cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning

finetuning.py

import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage

# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")

# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")

# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
    "json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)

################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))

################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03

# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50

################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False

# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
    attn_implementation="eager",
    pretrained_model_name_or_path=model_name,
    torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")

# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
    lora_alpha=lora_alpha,
    lora_dropout=lora_dropout,
    r=lora_r,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=[
        "q_proj",
        "k_proj",
        "v_proj",
        "o_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

# Set training parameters
training_arguments = SFTConfig(
        bf16=bf16,
        dataset_kwargs={
            "add_special_tokens": False,  
            "append_concat_token": False, 
        },
        dataset_text_field="text",
        disable_tqdm=True,
        fp16=fp16,
        gradient_accumulation_steps=gradient_accumulation_steps,
        gradient_checkpointing=gradient_checkpointing,
        gradient_checkpointing_kwargs={"use_reentrant": False},
        group_by_length=group_by_length,
        log_on_each_node=False,
        logging_steps=logging_steps,
        learning_rate=learning_rate,
        lr_scheduler_type=lr_scheduler_type,
        max_grad_norm=max_grad_norm,
        max_seq_length=max_seq_length,
        max_steps=max_steps,
        num_train_epochs=num_train_epochs,
        optim=optim,
        output_dir=save_model_path,
        packing=packing,
        per_device_train_batch_size=per_device_train_batch_size,
        save_strategy=save_strategy,
        save_steps=save_steps,
        save_total_limit=save_total_limit,
        warmup_ratio=warmup_ratio,
        weight_decay=weight_decay,
    )

print(f"Configuring fine tuning completed")

# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
    model=model,
    train_dataset=dataset,
    peft_config=peft_config,
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    tokenizer=tokenizer,
    args=training_arguments,
    packing=packing,
)

print(f"Creating trainer completed")

# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")

# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")

# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
    low_cpu_mem_usage=True,
    pretrained_model_name_or_path=model_name,
    return_dict=True,
    torch_dtype=torch.float16,
)

model = PeftModel.from_pretrained(
    model=base_model,
    model_id=new_model,
)
model = model.merge_and_unload()

print(f"Merging the new model with base model completed")

accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")

print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
    is_main_process=accelerator.is_main_process,
    save_directory=save_model_path,
    save_function=accelerator.save,
)
print(f"Save unwrapped model completed")

print(f"Save new tokenizer started")
if accelerator.is_main_process:
    tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")

# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
    """Uploads a directory to GCS."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    for root, _, files in os.walk(model_dir):
        for file in files:
            local_file_path = os.path.join(root, file)
            gcs_file_path = os.path.relpath(local_file_path, model_dir)
            blob = bucket.blob(os.path.join(new_model, gcs_file_path))  # Use new_model_name
            blob.upload_from_filename(local_file_path)

# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")

ডকারফাইল

# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04

# Install necessary system packages
RUN apt-get update && \
    apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
    rm -rf /var/lib/apt/lists/*

# Copy requirements.txt into the container
COPY requirements.txt .

# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt

# Copy your finetune script into the container
COPY finetuning.py .

# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
 
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]

requirements.txt

accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4

এখন আমরা ফাইনটিউনিংয়ের জন্য একটি কন্টেইনার ইমেজ তৈরি করি এবং এটি আর্টিফ্যাক্ট রেজিস্ট্রিতে পুশ করি

gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest

12. এয়ারফ্লো 2 ওভারভিউ ইনক একটি DAG কি

এয়ারফ্লো হল ওয়ার্কফ্লো এবং ডেটা পাইপলাইন সাজানোর জন্য একটি প্ল্যাটফর্ম। এটি পাইথন কোডে এই ওয়ার্কফ্লোগুলিকে সংজ্ঞায়িত করতে DAGs (নির্দেশিত অ্যাসাইক্লিক গ্রাফ) ব্যবহার করে, দৃশ্যত কার্য এবং তাদের নির্ভরতাগুলিকে প্রতিনিধিত্ব করে।

এয়ারফ্লো, এর স্ট্যাটিক DAG এবং পাইথন-ভিত্তিক সংজ্ঞা সহ, পূর্বনির্ধারিত ওয়ার্কফ্লোগুলি নির্ধারণ এবং পরিচালনার জন্য উপযুক্ত। এর আর্কিটেকচারে এই ওয়ার্কফ্লোগুলি পর্যবেক্ষণ এবং পরিচালনার জন্য একটি ব্যবহারকারী-বান্ধব UI অন্তর্ভুক্ত রয়েছে।

মূলত, এয়ারফ্লো আপনাকে পাইথন ব্যবহার করে আপনার ডেটা পাইপলাইনগুলিকে সংজ্ঞায়িত, সময়সূচী এবং নিরীক্ষণ করতে দেয়, এটিকে ওয়ার্কফ্লো অর্কেস্ট্রেশনের জন্য একটি নমনীয় এবং শক্তিশালী হাতিয়ার করে তোলে।

13. আমাদের DAG এর ওভারভিউ

ec49964ad7d61491.png

DAG এর পূর্ণরূপ নির্দেশিত অ্যাসাইক্লিক গ্রাফ, এয়ারফ্লোতে একটি DAG নিজেই পুরো ওয়ার্কফ্লো বা পাইপলাইনকে প্রতিনিধিত্ব করে। এটি কাজ, তাদের নির্ভরতা এবং কার্যকর করার ক্রম সংজ্ঞায়িত করে।

এয়ারফ্লো কনফিগারেশন থেকে শুরু করা GKE ক্লাস্টারের একটি পড থেকে DAG-এর মধ্যে কর্মপ্রবাহের ইউনিটগুলি কার্যকর করা হয়।

সারাংশ:

এয়ারফ্লো: ডেটা ডাউনলোড - এই স্ক্রিপ্টটি Kaggle থেকে একটি মুভি রিভিউ ডেটাসেট পাওয়ার প্রক্রিয়াটিকে স্বয়ংক্রিয় করে এবং এটিকে আপনার GCS বালতিতে সংরক্ষণ করে, এটিকে আপনার ক্লাউড পরিবেশে আরও প্রক্রিয়াকরণ বা বিশ্লেষণের জন্য সহজেই উপলব্ধ করে।

এয়ারফ্লো: ডেটা প্রস্তুতি - কোডটি কাঁচা মুভি পর্যালোচনা ডেটাসেট নেয়, আমাদের ব্যবহারের ক্ষেত্রে প্রয়োজনীয় নয় এমন বহিরাগত ডেটা কলামগুলি সরিয়ে দেয় এবং অনুপস্থিত মান সহ ডেটাসেটগুলি মুছে দেয়৷ এর পরে, এটি ডেটাসেটটিকে মেশিন লার্নিংয়ের জন্য উপযুক্ত একটি প্রশ্ন-উত্তর বিন্যাসে গঠন করে এবং পরবর্তীতে ব্যবহারের জন্য GCS-এ আবার সংরক্ষণ করে।

এয়ারফ্লো: মডেল ফাইনটিউনিং - এই কোডটি LoRA (নিম্ন-র্যাঙ্ক অ্যাডাপ্টেশন) নামক একটি কৌশল ব্যবহার করে একটি বড় ভাষা মডেল (LLM) সূক্ষ্ম সুর করে এবং তারপর আপডেট করা মডেলটিকে সংরক্ষণ করে। এটি Google ক্লাউড স্টোরেজ থেকে একটি প্রাক-প্রশিক্ষিত LLM এবং একটি ডেটাসেট লোড করার মাধ্যমে শুরু হয়। তারপরে, এটি এই ডেটাসেটে মডেলটিকে দক্ষতার সাথে সূক্ষ্ম-টিউন করতে LoRA প্রয়োগ করে। অবশেষে, এটি টেক্সট জেনারেশন বা প্রশ্নের উত্তর দেওয়ার মতো অ্যাপ্লিকেশনগুলিতে পরবর্তী ব্যবহারের জন্য সূক্ষ্ম-টিউন করা মডেলটিকে Google ক্লাউড স্টোরেজে ফিরিয়ে দেয়।

বায়ুপ্রবাহ: মডেল পরিবেশন - অনুমানের জন্য vllm সহ GKE-তে সূক্ষ্ম সুর করা মডেল পরিবেশন করা।

এয়ারফ্লো: ফিডব্যাক লুপ - প্রতি xx সময়ে মডেলের পুনরায় প্রশিক্ষণ (ঘণ্টা, দৈনিক, সাপ্তাহিক)।

এই চিত্রটি ব্যাখ্যা করে যে কিভাবে এয়ারফ্লো 2 কাজ করে, যখন GKE-তে চালিত হয়।

8691f41166209a5d.png

14. RAG ব্যবহার করে একটি মডেলের ফিনিটিউনিং

এই কোডল্যাব রিট্রিভাল অগমেন্টেড জেনারেশন (RAG) ব্যবহার করার পরিবর্তে একটি LLM ফাইনটিউন করে।

আসুন এই দুটি পদ্ধতির তুলনা করা যাক:

ফাইনটিউনিং: একটি বিশেষ মডেল তৈরি করে: ফাইনটিউনিং এলএলএমকে একটি নির্দিষ্ট কাজ বা ডেটাসেটের সাথে খাপ খাইয়ে নেয়, এটি বহিরাগত ডেটা উত্সের উপর নির্ভর না করে স্বাধীনভাবে কাজ করার অনুমতি দেয়।

অনুমানকে সহজ করে: এটি একটি পৃথক পুনরুদ্ধার ব্যবস্থা এবং ডাটাবেসের প্রয়োজনীয়তা দূর করে, যার ফলে দ্রুত এবং সস্তা প্রতিক্রিয়া পাওয়া যায়, বিশেষ করে ঘন ঘন ব্যবহারের ক্ষেত্রে।

RAG: বাহ্যিক জ্ঞানের উপর নির্ভর করে: RAG প্রতিটি অনুরোধের জন্য জ্ঞানের ভিত্তি থেকে প্রাসঙ্গিক তথ্য পুনরুদ্ধার করে, আপ-টু-ডেট এবং নির্দিষ্ট ডেটা অ্যাক্সেস নিশ্চিত করে।

জটিলতা বাড়ায়: কুবারনেটস ক্লাস্টারের মতো একটি উত্পাদন পরিবেশে RAG প্রয়োগ করা প্রায়শই ডেটা প্রক্রিয়াকরণ এবং পুনরুদ্ধারের জন্য একাধিক মাইক্রোসার্ভিস জড়িত থাকে, সম্ভাব্য বিলম্বিতা এবং গণনামূলক খরচ বাড়ায়।

কেন ফাইনটিউনিং বেছে নেওয়া হয়েছিল:

যদিও RAG এই CodeLab-এ ব্যবহৃত ছোট ডেটাসেটের জন্য উপযুক্ত হবে, আমরা Airflow-এর জন্য একটি সাধারণ ব্যবহারের ক্ষেত্রে প্রদর্শন করতে ফাইনটিউনিং বেছে নিয়েছি। এই পছন্দটি আমাদের RAG-এর জন্য অতিরিক্ত অবকাঠামো এবং মাইক্রোসার্ভিসেস সেট আপ করার সূক্ষ্ম বিষয়গুলিকে গভীরভাবে দেখার পরিবর্তে ওয়ার্কফ্লো অর্কেস্ট্রেশন দিকগুলিতে ফোকাস করার অনুমতি দেয়৷

উপসংহার:

ফাইনটিউনিং এবং আরএজি উভয়ই তাদের নিজস্ব শক্তি এবং দুর্বলতা সহ মূল্যবান কৌশল। সর্বোত্তম পছন্দ আপনার প্রকল্পের নির্দিষ্ট প্রয়োজনীয়তার উপর নির্ভর করে, যেমন আপনার ডেটার আকার এবং জটিলতা, কর্মক্ষমতা চাহিদা এবং খরচ বিবেচনা।

15. DAG টাস্ক #1 - এয়ারফ্লোতে আপনার প্রথম ধাপ তৈরি করুন: ডেটা ডাউনলোড

এই DAG ইউনিটের একটি ওভারভিউ হিসাবে, একটি কন্টেইনার ছবিতে হোস্ট করা আমাদের পাইথন কোডটি Kaggle থেকে সর্বশেষ RottenTomatoes ডেটাসেট ডাউনলোড করে।

এই কোডটি GCS বালতিতে কপি করবেন না। আমরা শেষ ধাপ হিসেবে mlops-dag.py কপি করি, যেটিতে একটি পাইথন স্ক্রিপ্টের মধ্যে সমস্ত DAG ইউনিট ধাপ রয়েছে।

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        dataset_download

16. DAG টাস্ক #2 - এয়ারফ্লোতে আপনার দ্বিতীয় ধাপ তৈরি করুন: ডেটা প্রস্তুতি

এই DAG ইউনিটের একটি ওভারভিউ হিসাবে, আমরা একটি পান্ডাস ডেটাফ্রেমে GCS থেকে একটি CSV ফাইল (rotten_tomatoes_movie_reviews.csv) লোড করি।

এর পরে, আমরা পরীক্ষা এবং সম্পদের দক্ষতার জন্য DATASET_LIMIT ব্যবহার করে প্রক্রিয়া করা সারির সংখ্যা সীমিত করি এবং অবশেষে রূপান্তরিত ডেটাকে একটি আলিঙ্গন মুখ ডেটাসেটে রূপান্তর করি।

আপনি যদি মনোযোগ সহকারে লক্ষ্য করেন, আপনি দেখতে পাবেন যে আমরা মডেলটিতে "DATASET_LIMIT": "1000" সহ 1000টি সারি প্রশিক্ষণ দিচ্ছি, কারণ এটি একটি Nvidia L4 GPU-এ এটি করতে 20 মিনিট সময় নেয়৷

এই কোডটি GCS বালতিতে কপি করবেন না। আমরা শেষ ধাপে mlops-dag.py কপি করি, যেটিতে একটি পাইথন স্ক্রিপ্টের মধ্যে সমস্ত ধাপ রয়েছে।

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation

17. DAG টাস্ক #3 - এয়ারফ্লোতে আপনার তৃতীয় ধাপ তৈরি করুন: মডেল ফাইনটিউনিং

এই DAG ইউনিটের একটি সংক্ষিপ্ত বিবরণ হিসাবে, এখানে আমরা আমাদের নতুন ডেটাসেটের সাথে Gemma মডেলকে পরিমার্জিত করতে finetune.py চালাই।

এই কোডটি GCS বালতিতে কপি করবেন না। আমরা শেষ ধাপে mlops-dag.py কপি করি, যেটিতে একটি পাইথন স্ক্রিপ্টের মধ্যে সমস্ত ধাপ রয়েছে।

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Task 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Task 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Task 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        dataset_download >> data_preparation >> fine_tuning

18. DAG টাস্ক #4 - এয়ারফ্লোতে আপনার চূড়ান্ত ধাপ তৈরি করুন: অনুমান / মডেল পরিবেশন করা

ভিএলএলএম একটি শক্তিশালী ওপেন সোর্স লাইব্রেরি যা বিশেষভাবে এলএলএম-এর উচ্চ-পারফরম্যান্স অনুমানের জন্য ডিজাইন করা হয়েছে। Google Kubernetes Engine (GKE) তে মোতায়েন করা হলে, এটি কার্যকরভাবে LLM পরিবেশন করতে Kubernetes-এর স্কেলেবিলিটি এবং দক্ষতার ব্যবহার করে।

পদক্ষেপের সারসংক্ষেপ:

  • GCS বালতিতে DAG "mlops-dag.py" আপলোড করুন৷
  • দুটি Kubernetes YAML কনফিগারেশন ফাইল একটি GCS বালতিতে সেটআপ করার জন্য অনুলিপি করুন।

mlops-dag.py

import yaml

from os import path
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException

GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")

def model_serving():
    config.load_incluster_config()
    k8s_apps_v1 = client.AppsV1Api()
    k8s_core_v1 = client.CoreV1Api()

    while True:
        try:
            k8s_apps_v1.delete_namespaced_deployment(
                    namespace="airflow",
                    name="inference-deployment",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Deployment inference-deployment deleted")
    
    with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace="airflow")
        print(f"Deployment created. Status='{resp.metadata.name}'")
    
    while True:
        try:
            k8s_core_v1.delete_namespaced_service(
                    namespace="airflow",
                    name="llm-service",
                    body=client.V1DeleteOptions(
                    propagation_policy="Foreground", grace_period_seconds=5
                    )
            )
        except ApiException:
            break
    print("Service llm-service deleted")

    with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
        dep = yaml.safe_load(f)
        resp = k8s_core_v1.create_namespaced_service(
            body=dep, namespace="airflow")
        print(f"Service created. Status='{resp.metadata.name}'")

with DAG(dag_id="mlops-dag",
            start_date=datetime(2024,11,1),
            schedule_interval="@daily",
            catchup=False) as dag:

        # DAG Step 1: Fetch raw data to GCS Bucket
        dataset_download = KubernetesPodOperator(
            task_id="dataset_download_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
            name="dataset-download",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "KAGGLE_USERNAME":KAGGLE_USERNAME,
                    "KAGGLE_KEY":KAGGLE_KEY,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME
            }
        )

        # DAG Step 2: Run GKEJob for data preparation
        data_preparation = KubernetesPodOperator(
            task_id="data_pipeline_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
            name="data-preparation",
            service_account_name="airflow-mlops-sa",
            env_vars={
                    "GCP_PROJECT_ID":GCP_PROJECT_ID,
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "DATASET_LIMIT": "1000",
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 3: Run GKEJob for fine tuning
        fine_tuning = KubernetesPodOperator(
            task_id="fine_tuning_task",
            namespace=JOB_NAMESPACE,
            image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
            name="fine-tuning",
            service_account_name="airflow-mlops-sa",
            startup_timeout_seconds=600,
            container_resources=models.V1ResourceRequirements(
                    requests={"nvidia.com/gpu": "1"},
                    limits={"nvidia.com/gpu": "1"}
            ),
            env_vars={
                    "BUCKET_DATA_NAME":BUCKET_DATA_NAME,
                    "HF_TOKEN":HF_TOKEN
            }
        )

        # DAG Step 4: Run GKE Deployment for model serving
        model_serving = PythonOperator(
            task_id="model_serving",
            python_callable=model_serving
        )

        dataset_download >> data_preparation >> fine_tuning >> model_serving

আপনার পাইথন স্ক্রিপ্ট (ডিএজি ফাইল) আপলোড করুন, সেইসাথে কুবারনেটস ডিএজিএস জিসিএস বালতিতে প্রকাশ করে।

gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}

আপনার এয়ারফ্লো UI এ, আপনি mlops-dag দেখতে পাবেন।

  1. আন-পজ নির্বাচন করুন।
  2. একটি ম্যানুয়াল MLOps চক্র সম্পাদন করতে ট্রিগার DAG নির্বাচন করুন।

d537281b92d5e8bb.png

একবার আপনার DAG সম্পূর্ণ হয়ে গেলে, আপনি Airflow UI-তে এইরকম আউটপুট দেখতে পাবেন।

3ed42abf8987384e.png

চূড়ান্ত ধাপের পরে, আপনি মডেলের শেষ পয়েন্টটি ধরতে পারেন এবং মডেলটি পরীক্ষা করার জন্য একটি প্রম্পট পাঠাতে পারেন।

কার্ল কমান্ড জারি করার আগে প্রায় 5 মিনিট অপেক্ষা করুন, যাতে মডেলের অনুমান শুরু হতে পারে এবং লোড ব্যালেন্সার একটি বাহ্যিক IP ঠিকানা বরাদ্দ করতে পারে।

export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')

curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
    "prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
    "temperature": 0.1,
    "top_p": 1.0,
    "max_tokens": 128
}
EOF

আউটপুট:

19. অভিনন্দন!

আপনি GKE-তে Airflow 2 সহ একটি DAG পাইপলাইন ব্যবহার করে আপনার প্রথম AI ওয়ার্কফ্লো তৈরি করেছেন।

আপনি যে সংস্থানগুলি স্থাপন করেছেন সেগুলি আন-প্রভিশন করতে ভুলবেন না৷

20. উৎপাদনে এটি করা

যদিও কোডল্যাব আপনাকে GKE-তে এয়ারফ্লো 2 সেট আপ করার জন্য একটি চমত্কার অন্তর্দৃষ্টি প্রদান করেছে, বাস্তব জগতে আপনি উৎপাদনে এটি করার সময় নিম্নলিখিত বিষয়গুলির মধ্যে কয়েকটি বিবেচনা করতে চাইবেন।

Gradio বা অনুরূপ টুলিং ব্যবহার করে একটি ওয়েব ফ্রন্টএন্ড প্রয়োগ করুন।

হয় এখানে GKE এর সাথে কাজের চাপের জন্য স্বয়ংক্রিয় অ্যাপ্লিকেশন মনিটরিং কনফিগার করুন বা এখানে Airflow থেকে মেট্রিক্স রপ্তানি করুন।

মডেলটি দ্রুত সূক্ষ্ম সুর করার জন্য আপনার আরও বড় GPU-র প্রয়োজন হতে পারে, বিশেষ করে যদি আপনার কাছে বড় ডেটাসেট থাকে। যাইহোক, যদি আমরা একাধিক জিপিইউ জুড়ে মডেলটিকে প্রশিক্ষণ দিতে চাই তবে আমাদের ডেটাসেটটি বিভক্ত করতে হবে এবং প্রশিক্ষণটি ভাগ করতে হবে। এখানে PyTorch-এর সাথে FSDP- এর একটি ব্যাখ্যা (সম্পূর্ণভাবে শ্যার্ড করা ডেটা সমান্তরাল, GPU শেয়ারিং ব্যবহার করে সেই লক্ষ্য অর্জন করা হয়েছে। আরও পড়া এখানে Meta থেকে একটি ব্লগ পোস্টে এবং Pytorch ব্যবহার করে FSDP- এর এই টিউটোরিয়ালে পাওয়া যাবে।

Google ক্লাউড কম্পোজার হল একটি পরিচালিত এয়ারফ্লো পরিষেবা, তাই আপনাকে নিজেই এয়ারফ্লো বজায় রাখতে হবে না, শুধু আপনার DAG মোতায়েন করুন এবং আপনি চলে যান৷

আরও জানুন

লাইসেন্স

এই কাজটি ক্রিয়েটিভ কমন্স অ্যাট্রিবিউশন 2.0 জেনেরিক লাইসেন্সের অধীনে লাইসেন্সপ্রাপ্ত।