1. Visão geral
Este CodeLab demonstra como integrar práticas de DevOps ao aprendizado de máquina (MLOps) fazendo o download de um conjunto de dados, refinando um modelo e implantando o LLM no Google Kubernetes Engine (GKE) usando um DAG do Airflow com o menor nível de abstração. Por isso, estamos usando comandos gcloud e não terraform para que você possa seguir o laboratório passo a passo e entender facilmente cada processo da perspectiva do engenheiro de plataforma e do engenheiro de machine learning.
Neste guia prático, você vai aprender a usar o Airflow para simplificar seus fluxos de trabalho de IA, com uma demonstração clara e prática de todo o ciclo de vida do MLOps ao configurar um DAG.
O que você vai aprender
- Promover maior colaboração e entendimento entre engenheiros de plataforma e de aprendizado de máquina, quebrando silos de conhecimento e melhorando os fluxos de trabalho.
- Entenda como implantar, usar e gerenciar o Airflow 2 no GKE
- Configurar um DAG do Airflow de ponta a ponta
- Crie a base para sistemas de machine learning de produção com o GKE
- Instrumentar e operacionalizar sistemas de machine learning
- Entenda como a engenharia de plataforma se tornou um pilar de suporte essencial para MLOps
O que este codelab alcança
- Você pode fazer perguntas sobre filmes usando um LLM que ajustamos com base no Gemma-2-9b-it, veiculado no GKE com o vLLM.
Público-alvo
- Engenheiros de machine learning
- Engenheiros de plataforma
- Cientistas de dados
- Engenheiros de dados
- Engenheiros de DevOps
- Arquiteto de plataforma
- Engenheiros de clientes
Este CodeLab não é destinado a
- Como introdução aos fluxos de trabalho de IA/ML ou do GKE
- Como uma revisão de todo o conjunto de recursos do Airflow
2. A engenharia de plataforma ajuda engenheiros/cientistas de machine learning
A engenharia de plataforma e o MLOps são disciplinas interdependentes que colaboram para criar um ambiente robusto e eficiente para o desenvolvimento e a implantação de ML.
Escopo:a engenharia de plataforma tem um escopo mais amplo do que o MLOps, abrangendo todo o ciclo de vida do desenvolvimento de software e fornecendo as ferramentas e a infraestrutura para isso.
O MLOps preenche a lacuna entre o desenvolvimento, a implantação e a inferência de ML.
Especialização:os engenheiros de plataforma geralmente têm grande experiência em tecnologias de infraestrutura, como computação em nuvem, contêinerização e gerenciamento de dados.
Os engenheiros MLOps são especialistas em desenvolvimento, implantação e monitoramento de modelos de ML, muitas vezes com habilidades de ciência de dados e engenharia de software.
Ferramentas:os engenheiros de plataforma criam ferramentas para provisionamento de infraestrutura, gerenciamento de configuração, orquestração de contêineres e scaffolding de aplicativos. Os engenheiros de MLOps usam ferramentas para treinamento, experimentação, implantação, monitoramento e versionamento de modelos de ML.
3. Configuração e requisitos do Google Cloud
Configuração de ambiente autoguiada
- Faça login no Console do Google Cloud e crie um novo projeto ou reutilize um existente. Crie uma conta do Gmail ou do Google Workspace, se ainda não tiver uma.
- O Nome do projeto é o nome de exibição para os participantes do projeto. É uma string de caracteres não usada pelas APIs do Google e pode ser atualizada quando você quiser.
- O ID do projeto precisa ser exclusivo em todos os projetos do Google Cloud e não pode ser mudado após a definição. O console do Cloud gera automaticamente uma string exclusiva. Em geral, não importa o que seja. Na maioria dos codelabs, é necessário fazer referência ao ID do projeto, normalmente identificado como
PROJECT_ID
. Se você não gostar do ID gerado, crie outro aleatório. Se preferir, teste o seu e confira se ele está disponível. Ele não pode ser mudado após essa etapa e permanece durante o projeto. - Para sua informação, há um terceiro valor, um Número do projeto, que algumas APIs usam. Saiba mais sobre esses três valores na documentação.
- Em seguida, ative o faturamento no console do Cloud para usar os recursos/APIs do Cloud. A execução deste codelab não vai ser muito cara, se tiver algum custo. Para encerrar os recursos e evitar cobranças além deste tutorial, exclua os recursos criados ou exclua o projeto. Novos usuários do Google Cloud estão qualificados para o programa de US$ 300 de avaliação sem custos.
Inicie o Cloud Shell
Embora o Google Cloud possa ser operado remotamente em seu laptop, neste codelab usaremos o Cloud Shell, um ambiente de linha de comando executado no Cloud.
Ativar o Cloud Shell
- No Console do Cloud, clique em Ativar o Cloud Shell
.
Se esta for a primeira vez que você iniciar o Cloud Shell, vai aparecer uma tela intermediária com a descrição dele. Se esse for o caso, clique em Continuar.
Leva apenas alguns instantes para provisionar e se conectar ao Cloud Shell.
Essa máquina virtual contém todas as ferramentas de desenvolvimento necessárias. Ela oferece um diretório principal persistente de 5 GB e é executada no Google Cloud. Isso aprimora o desempenho e a autenticação da rede. Praticamente todo o trabalho neste codelab pode ser feito em um navegador.
Depois de se conectar ao Cloud Shell, você vai notar que sua conta já está autenticada e que o projeto está configurado com seu ID do projeto.
- Execute o seguinte comando no Cloud Shell para confirmar se a conta está autenticada:
gcloud auth list
Resposta ao comando
Credentialed Accounts ACTIVE ACCOUNT * <my_account>@<my_domain.com> To set the active account, run: $ gcloud config set account `ACCOUNT`
- Execute o comando a seguir no Cloud Shell para confirmar se o comando gcloud sabe sobre seu projeto:
gcloud config list project
Resposta ao comando
[core] project = <PROJECT_ID>
Se o projeto não estiver configurado, configure-o usando este comando:
gcloud config set project <PROJECT_ID>
Resposta ao comando
Updated property [core/project].
4. Etapa 1: inscrever-se e autenticar no Kaggle
Para começar o CodeLab, você precisa criar uma conta no Kaggle, uma plataforma de comunidade on-line para cientistas de dados e entusiastas de aprendizado de máquina, que é propriedade do Google e hospeda um vasto repositório de conjuntos de dados disponíveis publicamente para vários domínios. É neste site que você vai fazer o download do conjunto de dados do RottenTomatoes, usado para treinar seu modelo.
- Inscreva-se no Kaggle. Você pode usar o SSO do Google para fazer login.
- Aceitar Termos e Condições
- Acesse "Configurações" e confira seu nome de usuário nome de usuário.
- Na seção "API", selecione "Criar novo token do" Kaggle, que vai fazer o download de kaggle.json.
- Se você tiver algum problema, acesse a página de suporte aqui.
5. Etapa 2: inscrever-se e autenticar no HuggingFace
O HuggingFace é um local central para qualquer pessoa interagir com a tecnologia de aprendizado de máquina. Ele hospeda 900 mil modelos, 200 mil conjuntos de dados e 300 mil apps de demonstração (Spaces), todos de código aberto e disponíveis publicamente.
- Inscreva-se no HuggingFace: crie uma conta com um nome de usuário. Não é possível usar o SSO do Google.
- Confirmar seu endereço de e-mail
- Acesse este link e aceite a licença do modelo Gemma-2-9b-it.
- Crie um token do HuggingFace aqui
- Anote as credenciais do token, você vai precisar delas mais tarde
6. Etapa 3: criar os recursos de infraestrutura do Google Cloud necessários
Você vai configurar o GKE, o GCE, o Artifact Registry e aplicar funções do IAM usando a federação de identidade da carga de trabalho.
Seu fluxo de trabalho de IA emprega dois node pools, um para treinamento e outro para inferência. O nodepool de treinamento está usando uma VM GCE g2-standard-8 equipada com uma GPU Nvidia L4 Tensor Core. O nodepool de inferência está usando uma VM g2-standard-24 equipada com duas GPUs Nvidia L4 Tensor Core. Ao especificar a região, escolha uma em que a GPU necessária seja compatível ( link).
No Cloud Shell, execute estes comandos:
# Set environment variables
export CODELAB_PREFIX=mlops-airflow
export PROJECT_NUMBER=$(gcloud projects list --filter="${DEVSHELL_PROJECT_ID}" --format="value(PROJECT_NUMBER)")
SUFFIX=$(echo $RANDOM | md5sum | head -c 4; echo;)
export CLUSTER_NAME=${CODELAB_PREFIX}
export CLUSTER_SA=sa-${CODELAB_PREFIX}
export BUCKET_LOGS_NAME=${CODELAB_PREFIX}-logs-${SUFFIX}
export BUCKET_DAGS_NAME=${CODELAB_PREFIX}-dags-${SUFFIX}
export BUCKET_DATA_NAME=${CODELAB_PREFIX}-data-${SUFFIX}
export REPO_NAME=${CODELAB_PREFIX}-repo
export REGION=us-central1
# Enable Google API's
export PROJECT_ID=${DEVSHELL_PROJECT_ID}
gcloud config set project ${PROJECT_ID}
gcloud services enable \
container.googleapis.com \
cloudbuild.googleapis.com \
artifactregistry.googleapis.com \
storage.googleapis.com
# Create a VPC for the GKE cluster
gcloud compute networks create mlops --subnet-mode=auto
# Create IAM and the needed infrastructure (GKE, Bucket, Artifact Registry)
# Create an IAM Service Account
gcloud iam service-accounts create ${CLUSTER_SA} --display-name="SA for ${CLUSTER_NAME}"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com" --role roles/container.defaultNodeServiceAccount
# Create a GKE cluster
gcloud container clusters create ${CLUSTER_NAME} --zone ${REGION}-a --num-nodes=4 --network=mlops --create-subnetwork name=mlops-subnet --enable-ip-alias --addons GcsFuseCsiDriver --workload-pool=${DEVSHELL_PROJECT_ID}.svc.id.goog --no-enable-insecure-kubelet-readonly-port --service-account=${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com
# Create 1 x node pool for our cluster 1 x node with 1 x L4 GPU for model finetuning
gcloud container node-pools create training \
--accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-12 \
--num-nodes=1
# Create 1 x node pool for our cluster 1 x node with 2 x L4 GPUs for inference
gcloud container node-pools create inference\
--accelerator type=nvidia-l4,count=2,gpu-driver-version=latest \
--project=${PROJECT_ID} \
--location=${REGION}-a \
--node-locations=${REGION}-a \
--cluster=${CLUSTER_NAME} \
--machine-type=g2-standard-24 \
--num-nodes=1
# Download K8s credentials
gcloud container clusters get-credentials ${CLUSTER_NAME} --location ${REGION}-a
# Create Artifact Registry
gcloud artifacts repositories create ${REPO_NAME} --repository-format=docker --location=${REGION}
gcloud artifacts repositories add-iam-policy-binding ${REPO_NAME} --member=serviceAccount:${CLUSTER_SA}@${DEVSHELL_PROJECT_ID}.iam.gserviceaccount.com --role=roles/artifactregistry.reader --location=${REGION}
Criar manifestos YAML
mkdir manifests
cd manifests
mlops-sa.yaml
apiVersion: v1
kind: ServiceAccount
automountServiceAccountToken: true
metadata:
name: airflow-mlops-sa
namespace: airflow
labels:
tier: airflow
pv-dags.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-dags
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 5Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_DAGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pv-logs.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: airflow-logs
spec:
accessModes:
- ReadWriteMany
capacity:
storage: 100Gi
storageClassName: standard
mountOptions:
- implicit-dirs
csi:
driver: gcsfuse.csi.storage.gke.io
volumeHandle: BUCKET_LOGS_NAME
volumeAttributes:
gcsfuseLoggingSeverity: warning
pvc-dags.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-dags
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
volumeName: airflow-dags
storageClassName: standard
pvc-logs.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: airflow-logs
namespace: airflow
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 100Gi
volumeName: airflow-logs
storageClassName: standard
namespace.yaml
kind: Namespace
apiVersion: v1
metadata:
name: airflow
labels:
name: airflow
sa-role.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: airflow
name: airflow-deployment-role
rules:
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
resources: ["services"]
verbs: ["create", "get", "list", "watch", "patch", "update", "delete"]
sa-rolebinding.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: airflow-deployment-rolebinding
namespace: airflow
subjects:
- kind: ServiceAccount
name: airflow-worker
namespace: airflow
roleRef:
kind: Role
name: airflow-deployment-role
apiGroup: rbac.authorization.k8s.io
inference.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: inference-deployment
namespace: airflow
spec:
replicas: 1
selector:
matchLabels:
app: gemma-server
template:
metadata:
labels:
app: gemma-server
ai.gke.io/model: gemma-2-9b-it
ai.gke.io/inference-server: vllm
annotations:
gke-gcsfuse/volumes: "true"
spec:
serviceAccountName: airflow-mlops-sa
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
- key: "on-demand"
value: "true"
operator: "Equal"
effect: "NoSchedule"
containers:
- name: inference-server
image: vllm/vllm-openai:latest
ports:
- containerPort: 8000
resources:
requests:
nvidia.com/gpu: "2"
limits:
nvidia.com/gpu: "2"
command: ["/bin/sh", "-c"]
args:
- |
python3 -m vllm.entrypoints.api_server --model=/modeldata/fine_tuned_model --tokenizer=/modeldata/fine_tuned_model --tensor-parallel-size=2
volumeMounts:
- mountPath: /dev/shm
name: dshm
- name: gcs-fuse-csi-ephemeral
mountPath: /modeldata
readOnly: true
volumes:
- name: dshm
emptyDir:
medium: Memory
- name: gcs-fuse-csi-ephemeral
csi:
driver: gcsfuse.csi.storage.gke.io
volumeAttributes:
bucketName: BUCKET_DATA_NAME
mountOptions: "implicit-dirs,file-cache:enable-parallel-downloads:true,file-cache:max-parallel-downloads:-1"
fileCacheCapacity: "20Gi"
fileCacheForRangeRead: "true"
metadataStatCacheCapacity: "-1"
metadataTypeCacheCapacity: "-1"
metadataCacheTTLSeconds: "-1"
nodeSelector:
cloud.google.com/gke-accelerator: nvidia-l4
inference-service.yaml
apiVersion: v1
kind: Service
metadata:
name: llm-service
namespace: airflow
spec:
selector:
app: gemma-server
type: LoadBalancer
ports:
- protocol: TCP
port: 8000
targetPort: 8000
Crie três buckets do Google Cloud Storage (GCS)
gcloud storage buckets create gs://${BUCKET_LOGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DAGS_NAME} --location=${REGION}
gcloud storage buckets create gs://${BUCKET_DATA_NAME} --location=${REGION}
# Create the namespace in GKE
kubectl apply -f namespace.yaml
# Create the PV and PVC in GKE for Airflow DAGs storage
sed -i "s/BUCKET_DAGS_NAME/${BUCKET_DAGS_NAME}/g" pv-dags.yaml
sed -i "s/BUCKET_LOGS_NAME/${BUCKET_LOGS_NAME}/g" pv-logs.yaml
sed -i "s/BUCKET_DATA_NAME/${BUCKET_DATA_NAME}/g" inference.yaml
kubectl apply -f pv-dags.yaml
kubectl apply -f pv-logs.yaml
kubectl apply -f pvc-dags.yaml
kubectl apply -f pvc-logs.yaml
kubectl apply -f mlops-sa.yaml
kubectl apply -f sa-role.yaml
kubectl apply -f sa-rolebinding.yaml
Add the necessary IAM roles to access buckets from Airflow using Workload Identity Federation
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-scheduler" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-triggerer" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-worker" --role "roles/container.developer"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/artifactregistry.reader"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-webserver" --role "roles/storage.objectUser"
gcloud projects add-iam-policy-binding ${DEVSHELL_PROJECT_ID} --member "principal://iam.googleapis.com/projects/${PROJECT_NUMBER}/locations/global/workloadIdentityPools/${DEVSHELL_PROJECT_ID}.svc.id.goog/subject/ns/airflow/sa/airflow-mlops-sa" --role "roles/storage.objectUser"
7. Etapa 4: instalar o Airflow no GKE usando o gráfico do Helm
Agora, vamos implantar o Airflow 2 usando o Helm. O Apache Airflow é uma plataforma de gerenciamento de fluxos de trabalho de código aberto para pipelines de engenharia de dados. Vamos abordar o conjunto de recursos do Airflow 2 mais tarde.
values.yaml para o gráfico do helm do Airflow
config:
webserver:
expose_config: true
webserver:
service:
type: LoadBalancer
podAnnotations:
gke-gcsfuse/volumes: "true"
executor: KubernetesExecutor
extraEnv: |-
- name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL
value: "30"
logs:
persistence:
enabled: true
existingClaim: "airflow-logs"
dags:
persistence:
enabled: true
existingClaim: "airflow-dags"
scheduler:
podAnnotations:
gke-gcsfuse/volumes: "true"
triggerer:
podAnnotations:
gke-gcsfuse/volumes: "true"
workers:
podAnnotations:
gke-gcsfuse/volumes: "true"
Implantar o Airflow 2
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm upgrade --install airflow apache-airflow/airflow --namespace airflow -f values.yaml
8. Etapa 5: inicializar o Airflow com conexões e variáveis
Depois que o Airflow 2 for implantado, vamos começar a configurá-lo. Definimos algumas variáveis, que são lidas pelos nossos scripts Python.
- Acessar a interface do Airflow na porta 8080 com seu navegador
Conferir o IP externo
kubectl -n airflow get svc/airflow-webserver --output jsonpath='{.status.loadBalancer.ingress[0].ip}'
Abra um navegador da Web e acesse http://<EXTERNAL-IP>:8080 . O login é admin / admin
- Crie uma conexão padrão do GCP na interface do Airflow. Para isso, acesse "Administrador" → "Conexões" → "+ Adicionar um novo registro".
- ID da conexão: google_cloud_default
- Tipo de conexão: Google Cloud
Clique em "Salvar".
- Crie as variáveis necessárias. Para isso, acesse "Administrador" > "Variáveis" > "+ Adicionar um novo registro".
- Chave: BUCKET_DATA_NAME - Valor: copiar de echo $BUCKET_DATA_NAME
- Chave: GCP_PROJECT_ID - Valor: copie de echo $DEVSHELL_PROJECT_ID
- Chave: HF_TOKEN - Valor: insira seu token HF
- Chave: KAGGLE_USERNAME - Valor: insira seu nome de usuário do Kaggle
- Chave: KAGGLE_KEY - Valor: copie isso de kaggle.json
Clique em "Salvar" depois de cada par de chave-valor.
A interface vai ficar assim:
9. Contêiner do código do aplicativo 1: download de dados
Neste script Python, fazemos a autenticação com o Kaggle para fazer o download do conjunto de dados no nosso bucket do GCS.
O script em si é contêinerizado porque se torna a unidade DAG 1. Esperamos que o conjunto de dados seja atualizado com frequência, então queremos automatizar isso.
Criar um diretório e copiar nossos scripts
cd .. ; mkdir 1-dataset-download
cd 1-dataset-download
dataset-download.py
import os
import kagglehub
from google.cloud import storage
KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
def upload_blob(bucket_name, source_file_name, destination_blob_name):
"""Uploads a file to the bucket."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(f"File {source_file_name} uploaded to {destination_blob_name}.")
# Download latest version
path = kagglehub.dataset_download("priyamchoksi/rotten-tomato-movie-reviews-1-44m-rows")
print("Path to dataset files:", path)
destination_blob_name = "rotten_tomatoes_movie_reviews.csv"
source_file_name = f"{path}/{destination_blob_name}"
upload_blob(BUCKET_DATA_NAME, source_file_name, destination_blob_name)
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY dataset-download.py .
CMD ["python", "dataset-download.py"]
requirements.txt
google-cloud-storage==2.19.0
kagglehub==0.3.4
Agora vamos criar uma imagem de contêiner para o download do conjunto de dados e enviá-la para o Artifact Registry.
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/dataset-download:latest
10. Contêiner do código do aplicativo 2: preparação de dados
Durante a etapa de preparação de dados, conseguimos:
- Especifique quanto do conjunto de dados você quer usar para ajustar o modelo básico
- Carrega o conjunto de dados, ou seja, lê o arquivo CSV em um DataFrame do Pandas, que é uma estrutura de dados bidimensional para linhas e colunas.
- Transformação / pré-processamento de dados: determine quais partes do conjunto de dados são irrelevantes especificando o que queremos manter, o que significa remover o restante.
- Aplica a função
transform
a cada linha do DataFrame - Salvar os dados preparados no bucket do GCS
Criar um diretório e copiar nossos scripts
cd .. ; mkdir 2-data-preparation
cd 2-data-preparation
data-preparation.py
import os
import pandas as pd
import gcsfs
import json
from datasets import Dataset
# Environment variables
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_DATA_NAME = os.getenv("BUCKET_DATA_NAME")
DATASET_NAME = os.getenv("DATASET_NAME", "rotten_tomatoes_movie_reviews.csv")
PREPARED_DATASET_NAME = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
DATASET_LIMIT = int(os.getenv("DATASET_LIMIT", "100")) # Process a limited number of rows, used 100 during testing phase but can be increased
DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{DATASET_NAME}"
PREPARED_DATASET_URL = f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATASET_NAME}"
# Load the dataset
print(f"Loading dataset from {DATASET_URL}...")
def transform(data):
"""
Transforms a row of the DataFrame into the desired format for fine-tuning.
Args:
data: A pandas Series representing a row of the DataFrame.
Returns:
A dictionary containing the formatted text.
"""
question = f"Review analysis for movie '{data['id']}'"
context = data['reviewText']
answer = data['scoreSentiment']
template = "Question: {question}\nContext: {context}\nAnswer: {answer}"
return {'text': template.format(question=question, context=context, answer=answer)}
try:
df = pd.read_csv(DATASET_URL, nrows=DATASET_LIMIT)
print(f"Dataset loaded successfully.")
# Drop rows with NaN values in relevant columns
df = df.dropna(subset=['id', 'reviewText', 'scoreSentiment'])
# Apply transformation to the DataFrame
transformed_data = df.apply(transform, axis=1).tolist()
# Convert transformed data to a DataFrame and then to a Hugging Face Dataset
transformed_df = pd.DataFrame(transformed_data)
dataset = Dataset.from_pandas(transformed_df)
# Save the prepared dataset to JSON lines format
with gcsfs.GCSFileSystem(project=GCP_PROJECT_ID).open(PREPARED_DATASET_URL, 'w') as f:
for item in dataset:
f.write(json.dumps(item) + "\n")
print(f"Prepared dataset saved to {PREPARED_DATASET_URL}")
except Exception as e:
print(f"Error during data loading or preprocessing: {e}")
import traceback
print(traceback.format_exc())
Dockerfile
FROM python:3.13.0-slim-bookworm
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
COPY data-preparation.py .
CMD ["python", "data-preparation.py"]
requirements.txt
datasets==3.1.0
gcsfs==2024.9.0
pandas==2.2.3
# Now we create a container images for data-preparation and push it to the Artifact Registry
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/data-preparation:latest
11. Contêiner do código do aplicativo 3: ajuste fino
Aqui, usamos o Gemma-2-9b-it como um modelo de base e, em seguida, ajustamos com nosso novo conjunto de dados.
Esta é a sequência de etapas que acontecem durante a etapa de ajuste fino.
1. Configuração: importe bibliotecas, defina parâmetros (para modelo, dados e treinamento) e carregue o conjunto de dados do Google Cloud Storage.
2. Carregar modelo:carregue um modelo de linguagem pré-treinado com quantização para eficiência e o tokenizador correspondente.
3. Configurar a LoRA:configure a adaptação de baixa classificação (LoRA) para ajustar o modelo de maneira eficiente adicionando pequenas matrizes treináveis.
4. Train:defina os parâmetros de treinamento e use o SFTTrainer
para ajustar o modelo no conjunto de dados carregado usando o tipo de quantização FP16.
5. Salvar e fazer upload:salve o modelo e o tokenizer ajustados localmente e faça upload deles para o bucket do GCS.
Em seguida, criamos uma imagem de contêiner usando o Cloud Build e a armazenamos no Artifact Registry.
Criar um diretório e copiar nossos scripts
cd .. ; mkdir 3-fine-tuning
cd 3-fine-tuning
finetuning.py
import os
import torch
import bitsandbytes
from accelerate import Accelerator
from datasets import Dataset, load_dataset, load_from_disk
from peft import LoraConfig, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM, SFTConfig, SFTTrainer
from google.cloud import storage
# Environment variables
BUCKET_DATA_NAME = os.environ["BUCKET_DATA_NAME"]
PREPARED_DATA_URL = os.getenv("PREPARED_DATA_URL", "prepared_data.jsonl")
# Finetuned model name
new_model = os.getenv("NEW_MODEL_NAME", "fine_tuned_model")
# Base model from the Hugging Face hub
model_name = os.getenv("MODEL_ID", "google/gemma-2-9b-it")
# Root path for saving the finetuned model
save_model_path = os.getenv("MODEL_PATH", "./output")
# Load tokenizer
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right" # Fix weird overflow issue with fp16 training
print("Tokenizer loaded successfully!")
# Load dataset
EOS_TOKEN = tokenizer.eos_token
dataset = load_dataset(
"json", data_files=f"gs://{BUCKET_DATA_NAME}/{PREPARED_DATA_URL}", split="train")
print(dataset)
################################################################################
# LoRA parameters
################################################################################
# LoRA attention dimension
lora_r = int(os.getenv("LORA_R", "8"))
# Alpha parameter for LoRA scaling
lora_alpha = int(os.getenv("LORA_ALPHA", "16"))
# Dropout probability for LoRA layers
lora_dropout = float(os.getenv("LORA_DROPOUT", "0.1"))
################################################################################
# TrainingArguments parameters
################################################################################
# Number of training epochs
num_train_epochs = int(os.getenv("EPOCHS", 1))
# Set fp16/bf16 training (set bf16 to True with an A100)
fp16 = False
bf16 = False
# Batch size per GPU for training
per_device_train_batch_size = int(os.getenv("TRAIN_BATCH_SIZE", "1"))
# Batch size per GPU for evaluation
per_device_eval_batch_size = 1
# Number of update steps to accumulate the gradients for
gradient_accumulation_steps = int(os.getenv("GRADIENT_ACCUMULATION_STEPS", "1"))
# Enable gradient checkpointing
gradient_checkpointing = True
# Maximum gradient normal (gradient clipping)
max_grad_norm = 0.3
# Initial learning rate (AdamW optimizer)
learning_rate = 2e-4
# Weight decay to apply to all layers except bias/LayerNorm weights
weight_decay = 0.001
# Optimizer to use
optim = "paged_adamw_32bit"
# Learning rate schedule
lr_scheduler_type = "cosine"
# Number of training steps (overrides num_train_epochs)
max_steps = -1
# Ratio of steps for a linear warmup (from 0 to learning rate)
warmup_ratio = 0.03
# Group sequences into batches with same length
# Saves memory and speeds up training considerably
group_by_length = True
# Save strategy: steps, epoch, no
save_strategy = os.getenv("CHECKPOINT_SAVE_STRATEGY", "steps")
# Save total limit of checkpoints
save_total_limit = int(os.getenv("CHECKPOINT_SAVE_TOTAL_LIMIT", "5"))
# Save checkpoint every X updates steps
save_steps = int(os.getenv("CHECKPOINT_SAVE_STEPS", "1000"))
# Log every X updates steps
logging_steps = 50
################################################################################
# SFT parameters
################################################################################
# Maximum sequence length to use
max_seq_length = int(os.getenv("MAX_SEQ_LENGTH", "512"))
# Pack multiple short examples in the same input sequence to increase efficiency
packing = False
# Load base model
print(f"Loading base model started")
model = AutoModelForCausalLM.from_pretrained(
attn_implementation="eager",
pretrained_model_name_or_path=model_name,
torch_dtype=torch.float16,
)
model.config.use_cache = False
model.config.pretraining_tp = 1
print("Loading base model completed")
# Configure fine-tuning with LoRA
print(f"Configuring fine tuning started")
peft_config = LoraConfig(
lora_alpha=lora_alpha,
lora_dropout=lora_dropout,
r=lora_r,
bias="none",
task_type="CAUSAL_LM",
target_modules=[
"q_proj",
"k_proj",
"v_proj",
"o_proj",
"gate_proj",
"up_proj",
"down_proj",
],
)
# Set training parameters
training_arguments = SFTConfig(
bf16=bf16,
dataset_kwargs={
"add_special_tokens": False,
"append_concat_token": False,
},
dataset_text_field="text",
disable_tqdm=True,
fp16=fp16,
gradient_accumulation_steps=gradient_accumulation_steps,
gradient_checkpointing=gradient_checkpointing,
gradient_checkpointing_kwargs={"use_reentrant": False},
group_by_length=group_by_length,
log_on_each_node=False,
logging_steps=logging_steps,
learning_rate=learning_rate,
lr_scheduler_type=lr_scheduler_type,
max_grad_norm=max_grad_norm,
max_seq_length=max_seq_length,
max_steps=max_steps,
num_train_epochs=num_train_epochs,
optim=optim,
output_dir=save_model_path,
packing=packing,
per_device_train_batch_size=per_device_train_batch_size,
save_strategy=save_strategy,
save_steps=save_steps,
save_total_limit=save_total_limit,
warmup_ratio=warmup_ratio,
weight_decay=weight_decay,
)
print(f"Configuring fine tuning completed")
# Initialize the SFTTrainer
print(f"Creating trainer started")
trainer = SFTTrainer(
model=model,
train_dataset=dataset,
peft_config=peft_config,
dataset_text_field="text",
max_seq_length=max_seq_length,
tokenizer=tokenizer,
args=training_arguments,
packing=packing,
)
print(f"Creating trainer completed")
# Finetune the model
print("Starting fine-tuning...")
trainer.train()
print("Fine-tuning completed.")
# Save the fine-tuned model
print("Saving new model started")
trainer.model.save_pretrained(new_model)
print("Saving new model completed")
# Merge LoRA weights with the base model
print(f"Merging the new model with base model started")
base_model = AutoModelForCausalLM.from_pretrained(
low_cpu_mem_usage=True,
pretrained_model_name_or_path=model_name,
return_dict=True,
torch_dtype=torch.float16,
)
model = PeftModel.from_pretrained(
model=base_model,
model_id=new_model,
)
model = model.merge_and_unload()
print(f"Merging the new model with base model completed")
accelerator = Accelerator()
print(f"Accelerate unwrap model started")
unwrapped_model = accelerator.unwrap_model(model)
print(f"Accelerate unwrap model completed")
print(f"Save unwrapped model started")
unwrapped_model.save_pretrained(
is_main_process=accelerator.is_main_process,
save_directory=save_model_path,
save_function=accelerator.save,
)
print(f"Save unwrapped model completed")
print(f"Save new tokenizer started")
if accelerator.is_main_process:
tokenizer.save_pretrained(save_model_path)
print(f"Save new tokenizer completed")
# Upload the model to GCS
def upload_to_gcs(bucket_name, model_dir):
"""Uploads a directory to GCS."""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
for root, _, files in os.walk(model_dir):
for file in files:
local_file_path = os.path.join(root, file)
gcs_file_path = os.path.relpath(local_file_path, model_dir)
blob = bucket.blob(os.path.join(new_model, gcs_file_path)) # Use new_model_name
blob.upload_from_filename(local_file_path)
# Upload the fine-tuned model and tokenizer to GCS
upload_to_gcs(BUCKET_DATA_NAME, save_model_path)
print(f"Fine-tuned model {new_model} successfully uploaded to GCS.")
Dockerfile
# Using the NVIDIA CUDA base image
FROM nvidia/cuda:12.6.2-runtime-ubuntu22.04
# Install necessary system packages
RUN apt-get update && \
apt-get -y --no-install-recommends install python3-dev gcc python3-pip git && \
rm -rf /var/lib/apt/lists/*
# Copy requirements.txt into the container
COPY requirements.txt .
# Install Python packages from requirements.txt
RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir -r requirements.txt
# Copy your finetune script into the container
COPY finetuning.py .
# Set the environment variable to ensure output is flushed
ENV PYTHONUNBUFFERED 1
ENV MODEL_ID "google/gemma-2-9b-it"
ENV GCS_BUCKET "finetuning-data-bucket"
# Set the command to run the finetuning script with CUDA device
CMD ["python3", "finetuning.py"]
requirements.txt
accelerate==1.1.1
bitsandbytes==0.45.0
datasets==3.1.0
gcsfs==2024.9.0
peft==v0.13.2
torch==2.5.1
transformers==4.47.0
trl==v0.11.4
Agora, vamos criar uma imagem de contêiner para ajustes e enviá-la para o Artifact Registry.
gcloud builds submit --tag ${REGION}-docker.pkg.dev/${DEVSHELL_PROJECT_ID}/${REPO_NAME}/finetuning:latest
12. Visão geral do Airflow 2, incluindo o que é um DAG
O Airflow é uma plataforma para orquestrar fluxos de trabalho e pipelines de dados. Ele usa DAGs (gráficos acíclicos dirigidos) para definir esses fluxos de trabalho em código Python, representando visualmente as tarefas e as dependências delas.
O Airflow, com seus DAGs estáticos e definições baseadas em Python, é adequado para programar e gerenciar fluxos de trabalho predefinidos. A arquitetura inclui uma interface fácil de usar para monitorar e gerenciar esses fluxos de trabalho.
O Airflow permite definir, programar e monitorar pipelines de dados usando Python, o que o torna uma ferramenta flexível e poderosa para orquestração de fluxos de trabalho.
13. Visão geral do nosso DAG
DAG significa "Gráfico acíclico dirigido". No Airflow, um DAG representa todo o fluxo de trabalho ou pipeline. Ele define as tarefas, as dependências delas e a ordem de execução.
As unidades de fluxo de trabalho no DAG são executadas em um pod no cluster do GKE, iniciado na configuração do Airflow.
Resumo:
Airflow: download de dados: esse script automatiza o processo de extração de um conjunto de dados de análise de filmes do Kaggle e o armazena no seu bucket do GCS, disponibilizando-o para processamento ou análise no seu ambiente de nuvem.
Airflow: preparação de dados: o código usa o conjunto de dados brutos de avaliações de filmes, remove colunas de dados externas que não são necessárias para nosso caso de uso e exclui conjuntos de dados com valores ausentes. Em seguida, ele estrutura o conjunto de dados em um formato de resposta a perguntas adequado para aprendizado de máquina e o armazena de volta no GCS para uso posterior.
Airflow: ajuste fino do modelo: esse código ajusta um modelo de linguagem grande (LLM) usando uma técnica chamada LoRA (adaptação de classificação baixa) e salva o modelo atualizado. Ele começa carregando um LLM pré-treinado e um conjunto de dados do Google Cloud Storage. Em seguida, ele aplica a LoRA para ajustar o modelo de forma eficiente nesse conjunto de dados. Por fim, o modelo ajustado é salvo no Google Cloud Storage para uso posterior em aplicativos como geração de texto ou resposta a perguntas.
Airflow: disponibilização de modelos: disponibilização do modelo ajustado no GKE com vllm para inferência.
Airflow: ciclo de feedback: reensine o modelo a cada xx vezes (por hora, dia ou semana).
Este diagrama explica como o Airflow 2 funciona quando executado no GKE.
14. Ajuste fino de um modelo versus uso da RAG
Este CodeLab ajusta um LLM em vez de usar a geração aumentada de recuperação (RAG).
Vamos comparar essas duas abordagens:
Ajuste fino: cria um modelo especializado. O ajuste fino adapta o LLM a uma tarefa ou conjunto de dados específico, permitindo que ele opere de forma independente sem depender de fontes de dados externas.
Simplifica a inferência: isso elimina a necessidade de um sistema de recuperação e um banco de dados separados, resultando em respostas mais rápidas e baratas, especialmente para casos de uso frequentes.
RAG:depende de conhecimento externo: a RAG recupera informações relevantes de uma base de conhecimento para cada solicitação, garantindo o acesso a dados específicos e atualizados.
Aumento da complexidade: a implementação da RAG em um ambiente de produção, como um cluster do Kubernetes, geralmente envolve vários microsserviços para processamento e recuperação de dados, o que pode aumentar a latência e os custos computacionais.
Por que a opção "Ajuste fino" foi escolhida:
Embora a RAG seja adequada para o pequeno conjunto de dados usado neste CodeLab, optamos por fazer ajustes para demonstrar um caso de uso típico do Airflow. Essa escolha nos permite focar nos aspectos de orquestração do fluxo de trabalho em vez de se aprofundar nas nuances de configuração de infraestrutura e microsserviços adicionais para o RAG.
Conclusão:
Tanto o ajuste fino quanto o RAG são técnicas valiosas com pontos fortes e fracos. A escolha ideal depende dos requisitos específicos do seu projeto, como o tamanho e a complexidade dos dados, as necessidades de desempenho e as considerações de custo.
15. Tarefa de DAG 1: criar a primeira etapa no Airflow: download de dados
Como visão geral desta unidade de DAG, nosso código Python hospedado em uma imagem de contêiner faz o download do conjunto de dados mais recente do RottenTomatoes do Kaggle.
Não copie esse código para o bucket do GCS. Copiamos mlops-dag.py como a última etapa, que contém todas as etapas da unidade DAG em um script Python.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
dataset_download
16. Tarefa 2 do DAG: criar a segunda etapa no Airflow: preparação de dados
Como visão geral desta unidade de DAG, carregamos um arquivo CSV (rotten_tomatoes_movie_reviews.csv) do GCS em um DataFrame do Pandas.
Em seguida, limitamos o número de linhas processadas usando DATASET_LIMIT para testes e eficiência de recursos e, por fim, convertemos os dados transformados em um conjunto de dados de rostos abraços.
Se você olhar com atenção, vai notar que estamos treinando 1.000 linhas no modelo com "DATASET_LIMIT": "1000". Isso ocorre porque leva 20 minutos em uma GPU Nvidia L4 para fazer isso.
Não copie esse código para o bucket do GCS. Copiamos mlops-dag.py na última etapa, que contém todas as etapas em um script Python.
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation
17. Tarefa 3 do DAG: crie a terceira etapa no Airflow: ajuste fino do modelo
Como visão geral desta unidade de DAG, aqui executamos finetune.py para refinar o modelo Gemma com nosso novo conjunto de dados.
Não copie esse código para o bucket do GCS. Copiamos mlops-dag.py na última etapa, que contém todas as etapas em um script Python.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Task 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Task 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Task 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
dataset_download >> data_preparation >> fine_tuning
18. Tarefa de DAG 4: criar a etapa final no Airflow: inferência / disponibilização do modelo
O vLLM (link em inglês) é uma biblioteca de código aberto poderosa, desenvolvida especificamente para inferência de LLMs de alto desempenho. Quando implantado no Google Kubernetes Engine (GKE), ele aproveita a escalabilidade e a eficiência do Kubernetes para oferecer LLMs de maneira eficaz.
Resumo das etapas:
- Faça upload do DAG "mlops-dag.py" no bucket do GCS.
- Copie dois arquivos de configuração YAML do Kubernetes para configurar a inferência em um bucket do GCS.
mlops-dag.py
import yaml
from os import path
from datetime import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models
from kubernetes.client.rest import ApiException
GCP_PROJECT_ID = Variable.get("GCP_PROJECT_ID")
BUCKET_DATA_NAME = Variable.get("BUCKET_DATA_NAME")
HF_TOKEN = Variable.get("HF_TOKEN")
KAGGLE_USERNAME = Variable.get("KAGGLE_USERNAME")
KAGGLE_KEY = Variable.get("KAGGLE_KEY")
JOB_NAMESPACE = Variable.get("JOB_NAMESPACE", default_var="airflow")
def model_serving():
config.load_incluster_config()
k8s_apps_v1 = client.AppsV1Api()
k8s_core_v1 = client.CoreV1Api()
while True:
try:
k8s_apps_v1.delete_namespaced_deployment(
namespace="airflow",
name="inference-deployment",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Deployment inference-deployment deleted")
with open(path.join(path.dirname(__file__), "inference.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_apps_v1.create_namespaced_deployment(
body=dep, namespace="airflow")
print(f"Deployment created. Status='{resp.metadata.name}'")
while True:
try:
k8s_core_v1.delete_namespaced_service(
namespace="airflow",
name="llm-service",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
)
)
except ApiException:
break
print("Service llm-service deleted")
with open(path.join(path.dirname(__file__), "inference-service.yaml")) as f:
dep = yaml.safe_load(f)
resp = k8s_core_v1.create_namespaced_service(
body=dep, namespace="airflow")
print(f"Service created. Status='{resp.metadata.name}'")
with DAG(dag_id="mlops-dag",
start_date=datetime(2024,11,1),
schedule_interval="@daily",
catchup=False) as dag:
# DAG Step 1: Fetch raw data to GCS Bucket
dataset_download = KubernetesPodOperator(
task_id="dataset_download_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/dataset-download:latest",
name="dataset-download",
service_account_name="airflow-mlops-sa",
env_vars={
"KAGGLE_USERNAME":KAGGLE_USERNAME,
"KAGGLE_KEY":KAGGLE_KEY,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME
}
)
# DAG Step 2: Run GKEJob for data preparation
data_preparation = KubernetesPodOperator(
task_id="data_pipeline_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/data-preparation:latest",
name="data-preparation",
service_account_name="airflow-mlops-sa",
env_vars={
"GCP_PROJECT_ID":GCP_PROJECT_ID,
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"DATASET_LIMIT": "1000",
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 3: Run GKEJob for fine tuning
fine_tuning = KubernetesPodOperator(
task_id="fine_tuning_task",
namespace=JOB_NAMESPACE,
image="us-central1-docker.pkg.dev/{{ var.value.GCP_PROJECT_ID }}/mlops-airflow-repo/finetuning:latest",
name="fine-tuning",
service_account_name="airflow-mlops-sa",
startup_timeout_seconds=600,
container_resources=models.V1ResourceRequirements(
requests={"nvidia.com/gpu": "1"},
limits={"nvidia.com/gpu": "1"}
),
env_vars={
"BUCKET_DATA_NAME":BUCKET_DATA_NAME,
"HF_TOKEN":HF_TOKEN
}
)
# DAG Step 4: Run GKE Deployment for model serving
model_serving = PythonOperator(
task_id="model_serving",
python_callable=model_serving
)
dataset_download >> data_preparation >> fine_tuning >> model_serving
Faça upload do script Python (arquivo DAG) e dos manifestos do Kubernetes no bucket do GCS para DAGS.
gcloud storage cp mlops-dag.py gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference.yaml gs://${BUCKET_DAGS_NAME}
gcloud storage cp manifests/inference-service.yaml gs://${BUCKET_DAGS_NAME}
Na interface do Airflow, você vai encontrar mlops-dag.
- Selecione "Reativar".
- Selecione "Acionar DAG" para realizar um ciclo manual de MLOps.
Quando o DAG for concluído, você vai ver uma saída como esta na interface do Airflow.
Após a etapa final, você pode acessar o endpoint do modelo e enviar uma solicitação para testar o modelo.
Aguarde aproximadamente cinco minutos antes de emitir o comando curl para que a inferência do modelo possa começar e o balanceador de carga possa atribuir um endereço IP externo.
export MODEL_ENDPOINT=$(kubectl -n airflow get svc/llm-service --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
curl -X POST http://${MODEL_ENDPOINT}:8000/generate -H "Content-Type: application/json" -d @- <<EOF
{
"prompt": "Question: Review analysis for movie 'dangerous_men_2015'",
"temperature": 0.1,
"top_p": 1.0,
"max_tokens": 128
}
EOF
Saída:
19. Parabéns!
Você criou seu primeiro fluxo de trabalho de IA usando um pipeline DAG com o Airflow 2 no GKE.
Não se esqueça de cancelar o provisionamento dos recursos que você implantou.
20. Como fazer isso na produção
O CodeLab forneceu insights sobre como configurar o Airflow 2 no GKE, mas, na produção, você precisa considerar alguns dos seguintes tópicos.
Implemente um front-end da Web usando o Gradio ou ferramentas semelhantes.
Configure o monitoramento automático de aplicativos para cargas de trabalho com o GKE aqui ou exporte métricas do Airflow aqui.
Talvez você precise de GPUs maiores para ajustar o modelo mais rapidamente, especialmente se tiver conjuntos de dados maiores. No entanto, se quisermos treinar o modelo em várias GPUs, precisamos dividir o conjunto de dados e fragmentar o treinamento. Confira uma explicação sobre o FSDP com PyTorch (dados totalmente particionados em paralelo, usando compartilhamento de GPU para atingir esse objetivo. Leia mais sobre o assunto nesta postagem do blog da Meta e em outro tutorial sobre FSDP usando Pytorch.
O Google Cloud Composer é um serviço gerenciado do Airflow. Portanto, você não precisa manter o Airflow, basta implantar seu DAG e pronto.
Saiba mais
- Documentação do Airflow: https://airflow.apache.org/
Licença
Este conteúdo está sob a licença Atribuição 2.0 Genérica da Creative Commons.