Como executar o TorchServe e a difusão estável em GPUs do Cloud Run

1. Introdução

Visão geral

O Cloud Run adicionou recentemente suporte a GPU. Ele está disponível como uma prévia pública em lista de espera. Se você quiser testar o recurso, preencha este formulário para entrar na lista de espera. O Cloud Run é uma plataforma de contêineres no Google Cloud que facilita a execução do código em um contêiner, sem a necessidade de gerenciar um cluster.

Atualmente, as GPUs disponíveis são as Nvidia L4 com 24 GB de vRAM. Há uma GPU por instância do Cloud Run, e o escalonamento automático do Cloud Run ainda é aplicado. Isso inclui o escalonamento horizontal de até cinco instâncias (com aumento de cota disponível) e o escalonamento vertical para zero instâncias quando não há solicitações.

Neste codelab, você vai criar e implantar um app TorchServe que usa a difusão estável XL para gerar imagens a partir de um comando de texto. A imagem gerada é retornada ao autor da chamada como uma string codificada em base64.

Este exemplo é baseado em Como executar o modelo de difusão estável usando difusores do Hugging Face no Torchserve. Este codelab mostra como modificar esse exemplo para que ele funcione com o Cloud Run.

O que você vai aprender

  • Como executar um modelo de difusão estável XL no Cloud Run usando GPUs

2. Ativar APIs e definir variáveis de ambiente

Antes de começar a usar este codelab, você precisa ativar várias APIs. Este codelab exige o uso das seguintes APIs. Para ativar essas APIs, execute o seguinte comando:

gcloud services enable run.googleapis.com \
    storage.googleapis.com \
    cloudbuild.googleapis.com \

Em seguida, você pode definir as variáveis de ambiente que serão usadas neste codelab.

PROJECT_ID=<YOUR_PROJECT_ID>

REPOSITORY=repo
NETWORK_NAME=default
REGION=us-central1
IMAGE=us-central1-docker.pkg.dev/$PROJECT_ID/$REPOSITORY/gpu-torchserve

3. Criar o app Torchserve

Primeiro, crie um diretório para o código-fonte e entre nele.

mkdir stable-diffusion-codelab && cd $_

Crie um arquivo config.properties. Este é o arquivo de configuração do TorchServe.

inference_address=http://0.0.0.0:8080
enable_envvars_config=true
min_workers=1
max_workers=1
default_workers_per_model=1
default_response_timeout=1000
load_models=all
max_response_size=655350000
# to enable authorization, see https://github.com/pytorch/serve/blob/master/docs/token_authorization_api.md#how-to-set-and-disable-token-authorization
disable_token_authorization=true

Neste exemplo, o endereço de escuta http://0.0.0.0 é usado para funcionar no Cloud Run. A porta padrão do Cloud Run é 8080.

Crie um arquivo requirements.txt.

python-dotenv
accelerate
transformers
diffusers
numpy
google-cloud-storage
nvgpu

Crie um arquivo chamado stable_diffusion_handler.py.

from abc import ABC
import base64
import datetime
import io
import logging
import os

from diffusers import StableDiffusionXLImg2ImgPipeline
from diffusers import StableDiffusionXLPipeline
from google.cloud import storage
import numpy as np
from PIL import Image
import torch
from ts.torch_handler.base_handler import BaseHandler


logger = logging.getLogger(__name__)


def image_to_base64(image: Image.Image) -> str:
  """Convert a PIL image to a base64 string."""
  buffer = io.BytesIO()
  image.save(buffer, format="JPEG")
  image_str = base64.b64encode(buffer.getvalue()).decode("utf-8")
  return image_str


class DiffusersHandler(BaseHandler, ABC):
  """Diffusers handler class for text to image generation."""

  def __init__(self):
    self.initialized = False

  def initialize(self, ctx):
    """In this initialize function, the Stable Diffusion model is loaded and

       initialized here.
    Args:
        ctx (context): It is a JSON Object containing information pertaining to
          the model artifacts parameters.
    """
    logger.info("Initialize DiffusersHandler")
    self.manifest = ctx.manifest
    properties = ctx.system_properties
    model_dir = properties.get("model_dir")
    model_name = os.environ["MODEL_NAME"]
    model_refiner = os.environ["MODEL_REFINER"]

    self.bucket = None

    logger.info(
        "GPU device count: %s",
        torch.cuda.device_count(),
    )
    logger.info(
        "select the GPU device, cuda is available: %s",
        torch.cuda.is_available(),
    )
    self.device = torch.device(
        "cuda:" + str(properties.get("gpu_id"))
        if torch.cuda.is_available() and properties.get("gpu_id") is not None
        else "cpu"
    )
    logger.info("Device used: %s", self.device)

    # open the pipeline to the inferenece model 
    # this is generating the image
    logger.info("Donwloading model %s", model_name)
    self.pipeline = StableDiffusionXLPipeline.from_pretrained(
        model_name,
        variant="fp16",
        torch_dtype=torch.float16,
        use_safetensors=True,
    ).to(self.device)
    logger.info("done donwloading model %s", model_name)

    # open the pipeline to the refiner
    # refiner is used to remove artifacts from the image
    logger.info("Donwloading refiner %s", model_refiner)
    self.refiner = StableDiffusionXLImg2ImgPipeline.from_pretrained(
        model_refiner,
        variant="fp16",
        torch_dtype=torch.float16,
        use_safetensors=True,
    ).to(self.device)
    logger.info("done donwloading refiner %s", model_refiner)

    self.n_steps = 40
    self.high_noise_frac = 0.8
    self.initialized = True
    # Commonly used basic negative prompts.
    logger.info("using negative_prompt")
    self.negative_prompt = ("worst quality, normal quality, low quality, low res, blurry")

  # this handles the user request
  def preprocess(self, requests):
    """Basic text preprocessing, of the user's prompt.

    Args:
        requests (str): The Input data in the form of text is passed on to the
          preprocess function.

    Returns:
        list : The preprocess function returns a list of prompts.
    """
    logger.info("Process request started")
    inputs = []
    for _, data in enumerate(requests):
      input_text = data.get("data")
      if input_text is None:
        input_text = data.get("body")
      if isinstance(input_text, (bytes, bytearray)):
        input_text = input_text.decode("utf-8")
      logger.info("Received text: '%s'", input_text)
      inputs.append(input_text)
    return inputs

  def inference(self, inputs):
    """Generates the image relevant to the received text.

    Args:
        input_batch (list): List of Text from the pre-process function is passed
          here

    Returns:
        list : It returns a list of the generate images for the input text
    """
    logger.info("Inference request started")
    # Handling inference for sequence_classification.
    image = self.pipeline(
        prompt=inputs,
        negative_prompt=self.negative_prompt,
        num_inference_steps=self.n_steps,
        denoising_end=self.high_noise_frac,
        output_type="latent",
    ).images
    logger.info("Done model")

    image = self.refiner(
        prompt=inputs,
        negative_prompt=self.negative_prompt,
        num_inference_steps=self.n_steps,
        denoising_start=self.high_noise_frac,
        image=image,
    ).images
    logger.info("Done refiner")

    return image

  def postprocess(self, inference_output):
    """Post Process Function converts the generated image into Torchserve readable format.

    Args:
        inference_output (list): It contains the generated image of the input
          text.

    Returns:
        (list): Returns a list of the images.
    """
    logger.info("Post process request started")
    images = []
    response_size = 0
    for image in inference_output:
      # Save image to GCS
      if self.bucket:
        image.save("temp.jpg")

        # Create a blob object
        blob = self.bucket.blob(
            datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + ".jpg"
        )

        # Upload the file
        blob.upload_from_filename("temp.jpg")

      # to see the image, encode to base64
      encoded = image_to_base64(image)
      response_size += len(encoded)
      images.append(encoded)

    logger.info("Images %d, response size: %d", len(images), response_size)
    return images

Crie um arquivo chamado start.sh. Ele é usado como um ponto de entrada no contêiner para iniciar o TorchServe.

#!/bin/bash

echo "starting the server"
# start the server. By default torchserve runs in backaround, and start.sh will immediately terminate when done
# so use --foreground to keep torchserve running in foreground while start.sh is running in a container  
torchserve --start --ts-config config.properties --models "stable_diffusion=${MAR_FILE_NAME}.mar" --model-store ${MAR_STORE_PATH} --foreground

Em seguida, execute o comando a seguir para torná-lo um arquivo executável.

chmod 755 start.sh

Crie um dockerfile.

# pick a version of torchserve to avoid any future breaking changes
# docker pull pytorch/torchserve:0.11.1-cpp-dev-gpu
FROM pytorch/torchserve:0.11.1-cpp-dev-gpu AS base

USER root

WORKDIR /home/model-server

COPY requirements.txt ./
RUN pip install --upgrade -r ./requirements.txt

# Stage 1 build the serving container.
FROM base AS serve-gcs

ENV MODEL_NAME='stabilityai/stable-diffusion-xl-base-1.0'
ENV MODEL_REFINER='stabilityai/stable-diffusion-xl-refiner-1.0'

ENV MAR_STORE_PATH='/home/model-server/model-store'
ENV MAR_FILE_NAME='model'
RUN mkdir -p $MAR_STORE_PATH

COPY config.properties ./
COPY stable_diffusion_handler.py ./
COPY start.sh ./

# creates the mar file used by torchserve
RUN torch-model-archiver --force --model-name ${MAR_FILE_NAME} --version 1.0 --handler stable_diffusion_handler.py -r requirements.txt --export-path ${MAR_STORE_PATH}

# entrypoint
CMD ["./start.sh"]

4. Configurar o Cloud NAT

O Cloud NAT permite que você tenha uma largura de banda maior para acessar a Internet e fazer o download do modelo do HuggingFace, o que acelera significativamente os tempos de implantação.

Para usar o Cloud NAT, execute o seguinte comando para ativar uma instância do Cloud NAT:

gcloud compute routers create nat-router --network $NETWORK_NAME --region us-central1
gcloud compute routers nats create vm-nat --router=nat-router --region=us-central1 --auto-allocate-nat-external-ips --nat-all-subnet-ip-ranges

5. Criar e implantar o serviço do Cloud Run

Envie seu código para o Cloud Build.

gcloud builds submit --tag $IMAGE

Em seguida, implante no Cloud Run

gcloud beta run deploy gpu-torchserve \
 --image=$IMAGE \
 --cpu=8 --memory=32Gi \
 --gpu=1 --no-cpu-throttling --gpu-type=nvidia-l4 \
 --allow-unauthenticated \
 --region us-central1 \
 --project $PROJECT_ID \
 --execution-environment=gen2 \
 --max-instances 1 \
 --network $NETWORK_NAME \
 --vpc-egress all-traffic

6. Testar o serviço

Para testar o serviço, execute os seguintes comandos:

PROMPT_TEXT="a cat sitting in a magnolia tree"

SERVICE_URL=$(gcloud run services describe gpu-torchserve --region $REGION --format 'value(status.url)')

time curl $SERVICE_URL/predictions/stable_diffusion -d "data=$PROMPT_TEXT" | base64 --decode > image.jpg

O arquivo image.jpg vai aparecer no seu diretório atual. Você pode abrir a imagem no editor do Cloud Shell para conferir a imagem de um gato sentado em uma árvore.

7. Parabéns!

Parabéns por concluir o codelab.

Recomendamos a leitura da documentação sobre GPUs do Cloud Run.

O que vimos

  • Como executar um modelo de difusão estável XL no Cloud Run usando GPUs

8. Limpar

Para evitar cobranças acidentais (por exemplo, se esse job do Cloud Run for invocado acidentalmente mais vezes do que sua alocação mensal de invocação do Cloud Run no nível sem custo financeiro), exclua o job do Cloud Run ou o projeto criado na etapa 2.

Para excluir o job do Cloud Run, acesse o console do Cloud Run em https://console.cloud.google.com/run/ e exclua o serviço gpu-torchserve.

Você também precisa excluir a configuração do Cloud NAT.

Se você quiser excluir o projeto inteiro, acesse https://console.cloud.google.com/cloud-resource-manager, selecione o projeto criado na etapa 2 e escolha "Excluir". Se você excluir o projeto, vai precisar mudar os projetos no Cloud SDK. Para conferir a lista de todos os projetos disponíveis, execute gcloud projects list.