Cloud Run GPU で TorchServe と Stable Diffusion を実行する方法

1. はじめに

概要

Cloud Run には最近、GPU のサポートが追加されました。順番待ちのパブリック プレビューとしてご利用いただけます。この機能をお試しになりたい場合は、こちらのフォームにご記入のうえ、順番待ちリストにご登録ください。Cloud Run は Google Cloud 上のコンテナ プラットフォームで、クラスタを管理することなく、コンテナ内でコードを簡単に実行できます。

現在、提供されている GPU は、24 GB の vRAM を備えた Nvidia L4 GPU です。Cloud Run インスタンスごとに 1 つの GPU があり、Cloud Run の自動スケーリングは引き続き適用されます。これには、最大 5 インスタンスのスケールアウト(割り当て増加を利用可能)や、リクエストがない場合はインスタンスを 0 にスケールダウンすることが含まれます。

この Codelab では、Stable Diffusion XL を使用してテキスト プロンプトから画像を生成する TorchServe アプリを作成してデプロイします。生成された画像は、base64 エンコードの文字列として呼び出し元に返されます。

この例は、Torchserve で Huggingface Diffuser を使用して Stable diffusion モデルを実行するに基づいています。この Codelab では、この例を変更して Cloud Run で動作するようにする方法について説明します。

学習内容

  • GPU を使用して Cloud Run で Stable Diffusion XL モデルを実行する方法

2. API を有効にして環境変数を設定する

この Codelab を使用する前に、いくつかの API を有効にする必要があります。この Codelab では、次の API を使用する必要があります。これらの API を有効にするには、次のコマンドを実行します。

gcloud services enable run.googleapis.com \
    storage.googleapis.com \
    cloudbuild.googleapis.com \

次に、この Codelab 全体で使用する環境変数を設定します。

PROJECT_ID=<YOUR_PROJECT_ID>

REPOSITORY=repo
NETWORK_NAME=default
REGION=us-central1
IMAGE=us-central1-docker.pkg.dev/$PROJECT_ID/$REPOSITORY/gpu-torchserve

3. Torchserve アプリを作成する

まず、ソースコードのディレクトリを作成し、そのディレクトリに移動します。

mkdir stable-diffusion-codelab && cd $_

config.properties ファイルを作成します。これは TorchServe の構成ファイルです。

inference_address=http://0.0.0.0:8080
enable_envvars_config=true
min_workers=1
max_workers=1
default_workers_per_model=1
default_response_timeout=1000
load_models=all
max_response_size=655350000
# to enable authorization, see https://github.com/pytorch/serve/blob/master/docs/token_authorization_api.md#how-to-set-and-disable-token-authorization
disable_token_authorization=true

この例では、Cloud Run で動作するためにリッスン アドレス http://0.0.0.0 が使用されています。Cloud Run のデフォルト ポートはポート 8080 です。

requirements.txt ファイルを作成します。

python-dotenv
accelerate
transformers
diffusers
numpy
google-cloud-storage
nvgpu

stable_diffusion_handler.py というファイルを作成します。

from abc import ABC
import base64
import datetime
import io
import logging
import os

from diffusers import StableDiffusionXLImg2ImgPipeline
from diffusers import StableDiffusionXLPipeline
from google.cloud import storage
import numpy as np
from PIL import Image
import torch
from ts.torch_handler.base_handler import BaseHandler


logger = logging.getLogger(__name__)


def image_to_base64(image: Image.Image) -> str:
  """Convert a PIL image to a base64 string."""
  buffer = io.BytesIO()
  image.save(buffer, format="JPEG")
  image_str = base64.b64encode(buffer.getvalue()).decode("utf-8")
  return image_str


class DiffusersHandler(BaseHandler, ABC):
  """Diffusers handler class for text to image generation."""

  def __init__(self):
    self.initialized = False

  def initialize(self, ctx):
    """In this initialize function, the Stable Diffusion model is loaded and

       initialized here.
    Args:
        ctx (context): It is a JSON Object containing information pertaining to
          the model artifacts parameters.
    """
    logger.info("Initialize DiffusersHandler")
    self.manifest = ctx.manifest
    properties = ctx.system_properties
    model_dir = properties.get("model_dir")
    model_name = os.environ["MODEL_NAME"]
    model_refiner = os.environ["MODEL_REFINER"]

    self.bucket = None

    logger.info(
        "GPU device count: %s",
        torch.cuda.device_count(),
    )
    logger.info(
        "select the GPU device, cuda is available: %s",
        torch.cuda.is_available(),
    )
    self.device = torch.device(
        "cuda:" + str(properties.get("gpu_id"))
        if torch.cuda.is_available() and properties.get("gpu_id") is not None
        else "cpu"
    )
    logger.info("Device used: %s", self.device)

    # open the pipeline to the inferenece model 
    # this is generating the image
    logger.info("Donwloading model %s", model_name)
    self.pipeline = StableDiffusionXLPipeline.from_pretrained(
        model_name,
        variant="fp16",
        torch_dtype=torch.float16,
        use_safetensors=True,
    ).to(self.device)
    logger.info("done donwloading model %s", model_name)

    # open the pipeline to the refiner
    # refiner is used to remove artifacts from the image
    logger.info("Donwloading refiner %s", model_refiner)
    self.refiner = StableDiffusionXLImg2ImgPipeline.from_pretrained(
        model_refiner,
        variant="fp16",
        torch_dtype=torch.float16,
        use_safetensors=True,
    ).to(self.device)
    logger.info("done donwloading refiner %s", model_refiner)

    self.n_steps = 40
    self.high_noise_frac = 0.8
    self.initialized = True
    # Commonly used basic negative prompts.
    logger.info("using negative_prompt")
    self.negative_prompt = ("worst quality, normal quality, low quality, low res, blurry")

  # this handles the user request
  def preprocess(self, requests):
    """Basic text preprocessing, of the user's prompt.

    Args:
        requests (str): The Input data in the form of text is passed on to the
          preprocess function.

    Returns:
        list : The preprocess function returns a list of prompts.
    """
    logger.info("Process request started")
    inputs = []
    for _, data in enumerate(requests):
      input_text = data.get("data")
      if input_text is None:
        input_text = data.get("body")
      if isinstance(input_text, (bytes, bytearray)):
        input_text = input_text.decode("utf-8")
      logger.info("Received text: '%s'", input_text)
      inputs.append(input_text)
    return inputs

  def inference(self, inputs):
    """Generates the image relevant to the received text.

    Args:
        input_batch (list): List of Text from the pre-process function is passed
          here

    Returns:
        list : It returns a list of the generate images for the input text
    """
    logger.info("Inference request started")
    # Handling inference for sequence_classification.
    image = self.pipeline(
        prompt=inputs,
        negative_prompt=self.negative_prompt,
        num_inference_steps=self.n_steps,
        denoising_end=self.high_noise_frac,
        output_type="latent",
    ).images
    logger.info("Done model")

    image = self.refiner(
        prompt=inputs,
        negative_prompt=self.negative_prompt,
        num_inference_steps=self.n_steps,
        denoising_start=self.high_noise_frac,
        image=image,
    ).images
    logger.info("Done refiner")

    return image

  def postprocess(self, inference_output):
    """Post Process Function converts the generated image into Torchserve readable format.

    Args:
        inference_output (list): It contains the generated image of the input
          text.

    Returns:
        (list): Returns a list of the images.
    """
    logger.info("Post process request started")
    images = []
    response_size = 0
    for image in inference_output:
      # Save image to GCS
      if self.bucket:
        image.save("temp.jpg")

        # Create a blob object
        blob = self.bucket.blob(
            datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + ".jpg"
        )

        # Upload the file
        blob.upload_from_filename("temp.jpg")

      # to see the image, encode to base64
      encoded = image_to_base64(image)
      response_size += len(encoded)
      images.append(encoded)

    logger.info("Images %d, response size: %d", len(images), response_size)
    return images

start.sh というファイルを作成します。このファイルは、TorchServe を起動するためのコンテナ内のエントリポイントとして使用されます。

#!/bin/bash

echo "starting the server"
# start the server. By default torchserve runs in backaround, and start.sh will immediately terminate when done
# so use --foreground to keep torchserve running in foreground while start.sh is running in a container  
torchserve --start --ts-config config.properties --models "stable_diffusion=${MAR_FILE_NAME}.mar" --model-store ${MAR_STORE_PATH} --foreground

次に、次のコマンドを実行して実行可能ファイルにします。

chmod 755 start.sh

dockerfile を作成します。

# pick a version of torchserve to avoid any future breaking changes
# docker pull pytorch/torchserve:0.11.1-cpp-dev-gpu
FROM pytorch/torchserve:0.11.1-cpp-dev-gpu AS base

USER root

WORKDIR /home/model-server

COPY requirements.txt ./
RUN pip install --upgrade -r ./requirements.txt

# Stage 1 build the serving container.
FROM base AS serve-gcs

ENV MODEL_NAME='stabilityai/stable-diffusion-xl-base-1.0'
ENV MODEL_REFINER='stabilityai/stable-diffusion-xl-refiner-1.0'

ENV MAR_STORE_PATH='/home/model-server/model-store'
ENV MAR_FILE_NAME='model'
RUN mkdir -p $MAR_STORE_PATH

COPY config.properties ./
COPY stable_diffusion_handler.py ./
COPY start.sh ./

# creates the mar file used by torchserve
RUN torch-model-archiver --force --model-name ${MAR_FILE_NAME} --version 1.0 --handler stable_diffusion_handler.py -r requirements.txt --export-path ${MAR_STORE_PATH}

# entrypoint
CMD ["./start.sh"]

4. Cloud NAT を設定する

Cloud NAT を使用すると、インターネットにアクセスして HuggingFace からモデルをダウンロードするための帯域幅を増やすことができます。これにより、デプロイ時間が大幅に短縮されます。

Cloud NAT を使用するには、次のコマンドを実行して Cloud NAT インスタンスを有効にします。

gcloud compute routers create nat-router --network $NETWORK_NAME --region us-central1
gcloud compute routers nats create vm-nat --router=nat-router --region=us-central1 --auto-allocate-nat-external-ips --nat-all-subnet-ip-ranges

5. Cloud Run サービスをビルドしてデプロイする

コードを Cloud Build に送信します。

gcloud builds submit --tag $IMAGE

次に、Cloud Run にデプロイします。

gcloud beta run deploy gpu-torchserve \
 --image=$IMAGE \
 --cpu=8 --memory=32Gi \
 --gpu=1 --no-cpu-throttling --gpu-type=nvidia-l4 \
 --allow-unauthenticated \
 --region us-central1 \
 --project $PROJECT_ID \
 --execution-environment=gen2 \
 --max-instances 1 \
 --network $NETWORK_NAME \
 --vpc-egress all-traffic

6. サービスをテストする

次のコマンドを実行してサービスをテストできます。

PROMPT_TEXT="a cat sitting in a magnolia tree"

SERVICE_URL=$(gcloud run services describe gpu-torchserve --region $REGION --format 'value(status.url)')

time curl $SERVICE_URL/predictions/stable_diffusion -d "data=$PROMPT_TEXT" | base64 --decode > image.jpg

現在のディレクトリに image.jpg ファイルが表示されます。Cloud Shell エディタで画像を開くと、木に座っている猫の画像が表示されます。

7. 完了

以上で、この Codelab は完了です。

Cloud Run GPU のドキュメントを確認することをおすすめします。

学習した内容

  • GPU を使用して Cloud Run で Stable Diffusion XL モデルを実行する方法

8. クリーンアップ

誤って課金されないようにするには(たとえば、この Cloud Run ジョブが無料 tier の Cloud Run の月間呼び出し割り当てを超える回数誤って呼び出された場合など)、Cloud Run ジョブを削除するか、手順 2 で作成したプロジェクトを削除します。

Cloud Run ジョブを削除するには、Cloud Run Cloud コンソール(https://console.cloud.google.com/run/)に移動し、gpu-torchserve サービスを削除します。

また、Cloud NAT 構成を削除することも必要です。

プロジェクト全体を削除する場合は、https://console.cloud.google.com/cloud-resource-manager に移動し、ステップ 2 で作成したプロジェクトを選択して、[削除] を選択します。プロジェクトを削除する場合は、Cloud SDK でプロジェクトを変更する必要があります。gcloud projects list を実行すると、使用可能なすべてのプロジェクトのリストが表示されます。