如何在 Cloud Run GPU 上运行 TorchServe 和 Stable Diffusion

1. 简介

概览

Cloud Run 最近添加了 GPU 支持。此功能目前处于公开预览版阶段,需要加入等候名单才能使用。如果您有兴趣试用此功能,请填写此表单加入等候名单。Cloud Run 是 Google Cloud 上的容器平台,可让您轻松在容器中运行代码,而无需管理集群。

目前,我们提供的 GPU 是配备 24 GB vRAM 的 Nvidia L4 GPU。每个 Cloud Run 实例都有一个 GPU,并且 Cloud Run 自动扩缩功能仍然适用。这包括最多扩容到 5 个实例(可增加配额),以及在没有请求时缩减到 0 个实例。

在本 Codelab 中,您将创建并部署一个 TorchServe 应用,该应用使用 stable diffusion XL 从文本提示生成图片。生成的图片会以 base64 编码字符串的形式返回给调用方。

此示例基于在 Torchserve 中使用 Huggingface Diffuser 运行 Stable Diffusion 模型。此 Codelab 将介绍如何修改此示例以便与 Cloud Run 搭配使用。

学习内容

  • 如何使用 GPU 在 Cloud Run 上运行 Stable Diffusion XL 模型

2. 启用 API 并设置环境变量

在开始使用此 Codelab 之前,您需要先启用几个 API。本 Codelab 需要使用以下 API。您可以通过运行以下命令来启用这些 API:

gcloud services enable run.googleapis.com \
    storage.googleapis.com \
    cloudbuild.googleapis.com \

然后,您可以设置将在本 Codelab 中全程使用的环境变量。

PROJECT_ID=<YOUR_PROJECT_ID>

REPOSITORY=repo
NETWORK_NAME=default
REGION=us-central1
IMAGE=us-central1-docker.pkg.dev/$PROJECT_ID/$REPOSITORY/gpu-torchserve

3. 创建 Torchserve 应用

首先,为源代码创建一个目录,然后通过 cd 命令进入该目录。

mkdir stable-diffusion-codelab && cd $_

创建 config.properties 文件,这是 TorchServe 的配置文件。

inference_address=http://0.0.0.0:8080
enable_envvars_config=true
min_workers=1
max_workers=1
default_workers_per_model=1
default_response_timeout=1000
load_models=all
max_response_size=655350000
# to enable authorization, see https://github.com/pytorch/serve/blob/master/docs/token_authorization_api.md#how-to-set-and-disable-token-authorization
disable_token_authorization=true

请注意,在此示例中,监听地址 http://0.0.0.0 用于在 Cloud Run 上运行。Cloud Run 的默认端口为 8080。

创建 requirements.txt 文件。

python-dotenv
accelerate
transformers
diffusers
numpy
google-cloud-storage
nvgpu

创建一个名为 stable_diffusion_handler.py 的文件

from abc import ABC
import base64
import datetime
import io
import logging
import os

from diffusers import StableDiffusionXLImg2ImgPipeline
from diffusers import StableDiffusionXLPipeline
from google.cloud import storage
import numpy as np
from PIL import Image
import torch
from ts.torch_handler.base_handler import BaseHandler


logger = logging.getLogger(__name__)


def image_to_base64(image: Image.Image) -> str:
  """Convert a PIL image to a base64 string."""
  buffer = io.BytesIO()
  image.save(buffer, format="JPEG")
  image_str = base64.b64encode(buffer.getvalue()).decode("utf-8")
  return image_str


class DiffusersHandler(BaseHandler, ABC):
  """Diffusers handler class for text to image generation."""

  def __init__(self):
    self.initialized = False

  def initialize(self, ctx):
    """In this initialize function, the Stable Diffusion model is loaded and

       initialized here.
    Args:
        ctx (context): It is a JSON Object containing information pertaining to
          the model artifacts parameters.
    """
    logger.info("Initialize DiffusersHandler")
    self.manifest = ctx.manifest
    properties = ctx.system_properties
    model_dir = properties.get("model_dir")
    model_name = os.environ["MODEL_NAME"]
    model_refiner = os.environ["MODEL_REFINER"]

    self.bucket = None

    logger.info(
        "GPU device count: %s",
        torch.cuda.device_count(),
    )
    logger.info(
        "select the GPU device, cuda is available: %s",
        torch.cuda.is_available(),
    )
    self.device = torch.device(
        "cuda:" + str(properties.get("gpu_id"))
        if torch.cuda.is_available() and properties.get("gpu_id") is not None
        else "cpu"
    )
    logger.info("Device used: %s", self.device)

    # open the pipeline to the inferenece model 
    # this is generating the image
    logger.info("Donwloading model %s", model_name)
    self.pipeline = StableDiffusionXLPipeline.from_pretrained(
        model_name,
        variant="fp16",
        torch_dtype=torch.float16,
        use_safetensors=True,
    ).to(self.device)
    logger.info("done donwloading model %s", model_name)

    # open the pipeline to the refiner
    # refiner is used to remove artifacts from the image
    logger.info("Donwloading refiner %s", model_refiner)
    self.refiner = StableDiffusionXLImg2ImgPipeline.from_pretrained(
        model_refiner,
        variant="fp16",
        torch_dtype=torch.float16,
        use_safetensors=True,
    ).to(self.device)
    logger.info("done donwloading refiner %s", model_refiner)

    self.n_steps = 40
    self.high_noise_frac = 0.8
    self.initialized = True
    # Commonly used basic negative prompts.
    logger.info("using negative_prompt")
    self.negative_prompt = ("worst quality, normal quality, low quality, low res, blurry")

  # this handles the user request
  def preprocess(self, requests):
    """Basic text preprocessing, of the user's prompt.

    Args:
        requests (str): The Input data in the form of text is passed on to the
          preprocess function.

    Returns:
        list : The preprocess function returns a list of prompts.
    """
    logger.info("Process request started")
    inputs = []
    for _, data in enumerate(requests):
      input_text = data.get("data")
      if input_text is None:
        input_text = data.get("body")
      if isinstance(input_text, (bytes, bytearray)):
        input_text = input_text.decode("utf-8")
      logger.info("Received text: '%s'", input_text)
      inputs.append(input_text)
    return inputs

  def inference(self, inputs):
    """Generates the image relevant to the received text.

    Args:
        input_batch (list): List of Text from the pre-process function is passed
          here

    Returns:
        list : It returns a list of the generate images for the input text
    """
    logger.info("Inference request started")
    # Handling inference for sequence_classification.
    image = self.pipeline(
        prompt=inputs,
        negative_prompt=self.negative_prompt,
        num_inference_steps=self.n_steps,
        denoising_end=self.high_noise_frac,
        output_type="latent",
    ).images
    logger.info("Done model")

    image = self.refiner(
        prompt=inputs,
        negative_prompt=self.negative_prompt,
        num_inference_steps=self.n_steps,
        denoising_start=self.high_noise_frac,
        image=image,
    ).images
    logger.info("Done refiner")

    return image

  def postprocess(self, inference_output):
    """Post Process Function converts the generated image into Torchserve readable format.

    Args:
        inference_output (list): It contains the generated image of the input
          text.

    Returns:
        (list): Returns a list of the images.
    """
    logger.info("Post process request started")
    images = []
    response_size = 0
    for image in inference_output:
      # Save image to GCS
      if self.bucket:
        image.save("temp.jpg")

        # Create a blob object
        blob = self.bucket.blob(
            datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + ".jpg"
        )

        # Upload the file
        blob.upload_from_filename("temp.jpg")

      # to see the image, encode to base64
      encoded = image_to_base64(image)
      response_size += len(encoded)
      images.append(encoded)

    logger.info("Images %d, response size: %d", len(images), response_size)
    return images

创建一个名为 start.sh 的文件。此文件将用作容器中的入口点来启动 TorchServe。

#!/bin/bash

echo "starting the server"
# start the server. By default torchserve runs in backaround, and start.sh will immediately terminate when done
# so use --foreground to keep torchserve running in foreground while start.sh is running in a container  
torchserve --start --ts-config config.properties --models "stable_diffusion=${MAR_FILE_NAME}.mar" --model-store ${MAR_STORE_PATH} --foreground

然后,运行以下命令将其转换为可执行文件。

chmod 755 start.sh

创建 dockerfile

# pick a version of torchserve to avoid any future breaking changes
# docker pull pytorch/torchserve:0.11.1-cpp-dev-gpu
FROM pytorch/torchserve:0.11.1-cpp-dev-gpu AS base

USER root

WORKDIR /home/model-server

COPY requirements.txt ./
RUN pip install --upgrade -r ./requirements.txt

# Stage 1 build the serving container.
FROM base AS serve-gcs

ENV MODEL_NAME='stabilityai/stable-diffusion-xl-base-1.0'
ENV MODEL_REFINER='stabilityai/stable-diffusion-xl-refiner-1.0'

ENV MAR_STORE_PATH='/home/model-server/model-store'
ENV MAR_FILE_NAME='model'
RUN mkdir -p $MAR_STORE_PATH

COPY config.properties ./
COPY stable_diffusion_handler.py ./
COPY start.sh ./

# creates the mar file used by torchserve
RUN torch-model-archiver --force --model-name ${MAR_FILE_NAME} --version 1.0 --handler stable_diffusion_handler.py -r requirements.txt --export-path ${MAR_STORE_PATH}

# entrypoint
CMD ["./start.sh"]

4. 设置 Cloud NAT

借助 Cloud NAT,您可以获得更高的带宽来访问互联网并从 HuggingFace 下载模型,从而显著缩短部署时间。

如需使用 Cloud NAT,请运行以下命令以启用 Cloud NAT 实例:

gcloud compute routers create nat-router --network $NETWORK_NAME --region us-central1
gcloud compute routers nats create vm-nat --router=nat-router --region=us-central1 --auto-allocate-nat-external-ips --nat-all-subnet-ip-ranges

5. 构建并部署 Cloud Run 服务

将代码提交到 Cloud Build。

gcloud builds submit --tag $IMAGE

接下来,部署到 Cloud Run

gcloud beta run deploy gpu-torchserve \
 --image=$IMAGE \
 --cpu=8 --memory=32Gi \
 --gpu=1 --no-cpu-throttling --gpu-type=nvidia-l4 \
 --allow-unauthenticated \
 --region us-central1 \
 --project $PROJECT_ID \
 --execution-environment=gen2 \
 --max-instances 1 \
 --network $NETWORK_NAME \
 --vpc-egress all-traffic

6. 测试服务

您可以通过运行以下命令来测试该服务:

PROMPT_TEXT="a cat sitting in a magnolia tree"

SERVICE_URL=$(gcloud run services describe gpu-torchserve --region $REGION --format 'value(status.url)')

time curl $SERVICE_URL/predictions/stable_diffusion -d "data=$PROMPT_TEXT" | base64 --decode > image.jpg

您会在当前目录中看到 image.jpg 文件。您可以在 Cloud Shell 编辑器中打开该图片,查看一只猫坐在树上的图片。

7. 恭喜!

恭喜您完成此 Codelab!

建议您查看 Cloud Run GPU 文档。

所学内容

  • 如何使用 GPU 在 Cloud Run 上运行 Stable Diffusion XL 模型

8. 清理

为避免意外产生费用(例如,如果此 Cloud Run 作业的意外调用次数超过了免费层级的 Cloud Run 调用月度配额),您可以删除 Cloud Run 作业,也可以删除您在第 2 步中创建的项目。

如需删除 Cloud Run 作业,请前往 https://console.cloud.google.com/run/ 访问 Cloud Run Cloud 控制台,然后删除 gpu-torchserve 服务。

您还需要删除 Cloud NAT 配置

如果您选择删除整个项目,可以前往 https://console.cloud.google.com/cloud-resource-manager,选择您在第 2 步中创建的项目,然后选择“删除”。如果您删除该项目,则需要在 Cloud SDK 中更改项目。您可以通过运行 gcloud projects list 来查看所有可用项目的列表。