كيفية تشغيل TorchServe وStable Diffusion على وحدات معالجة الرسومات في Cloud Run

1. مقدمة

نظرة عامة

أضافت خدمة Cloud Run مؤخرًا ميزة استخدام وحدة معالجة الرسومات. وهي متاحة كميزة تجريبية علنية في قائمة الانتظار. إذا أردت تجربة الميزة، يُرجى ملء هذا النموذج للانضمام إلى قائمة الانتظار. ‫Cloud Run هي منصة حاويات على Google Cloud تسهّل تشغيل الرمز البرمجي في حاوية بدون الحاجة إلى إدارة مجموعة.

في الوقت الحالي، وحدات معالجة الرسومات التي نوفّرها هي وحدات معالجة الرسومات Nvidia L4 التي تتضمّن ذاكرة وصول عشوائي للفيديو (VRAM) بسعة 24 غيغابايت. تتوفّر وحدة معالجة رسومات واحدة لكلّ مثيل من Cloud Run، ولا يزال تطبيق ميزة "التحجيم التلقائي" في Cloud Run ساريًا. ويشمل ذلك زيادة عدد النُسخ إلى 5 نُسخ (مع توفّر زيادة الحصة)، بالإضافة إلى تقليل عدد النُسخ إلى 0 نُسخ في حال عدم توفّر أي طلبات.

في هذا الدليل التعليمي حول الرموز البرمجية، ستنشئ تطبيق TorchServe وتنشره باستخدام الانتشار الثابت XL لإنشاء صور من طلب نصي. يتم عرض الصورة التي تم إنشاؤها للمتصل كسلسلة مرمّزة بترميز base64.

يستند هذا المثال إلى تشغيل نموذج الانتشار الثابت باستخدام Huggingface Diffusers في Torchserve. يوضّح لك هذا الدليل التعليمي كيفية تعديل هذا المثال للعمل مع Cloud Run.

ما ستتعرّف عليه

  • كيفية تشغيل نموذج Stable Diffusion XL على Cloud Run باستخدام وحدات معالجة الرسومات

2. تفعيل واجهات برمجة التطبيقات وضبط متغيّرات البيئة

قبل أن تتمكّن من بدء استخدام هذا الدليل التعليمي للترميز، هناك عدة واجهات برمجة تطبيقات يجب تفعيلها. يتطلّب هذا الدرس التطبيقي استخدام واجهات برمجة التطبيقات التالية. يمكنك تفعيل واجهات برمجة التطبيقات هذه من خلال تنفيذ الأمر التالي:

gcloud services enable run.googleapis.com \
    storage.googleapis.com \
    cloudbuild.googleapis.com \

بعد ذلك، يمكنك ضبط متغيّرات البيئة التي سيتم استخدامها في هذا الدليل التعليمي.

PROJECT_ID=<YOUR_PROJECT_ID>

REPOSITORY=repo
NETWORK_NAME=default
REGION=us-central1
IMAGE=us-central1-docker.pkg.dev/$PROJECT_ID/$REPOSITORY/gpu-torchserve

3- إنشاء تطبيق Torchserve

أولاً، أنشئ دليلاً لرمز المصدر وانتقِل إليه باستخدام الأمر cd.

mkdir stable-diffusion-codelab && cd $_

أنشئ ملفًا بتنسيق config.properties. هذا هو ملف الإعدادات لتطبيق TorchServe.

inference_address=http://0.0.0.0:8080
enable_envvars_config=true
min_workers=1
max_workers=1
default_workers_per_model=1
default_response_timeout=1000
load_models=all
max_response_size=655350000
# to enable authorization, see https://github.com/pytorch/serve/blob/master/docs/token_authorization_api.md#how-to-set-and-disable-token-authorization
disable_token_authorization=true

يُرجى العلم أنّه في هذا المثال، يتم استخدام عنوان الاستماع http://0.0.0.0 للعمل على Cloud Run. المنفذ التلقائي لخدمة Cloud Run هو المنفذ 8080.

أنشئ ملفًا بتنسيق requirements.txt.

python-dotenv
accelerate
transformers
diffusers
numpy
google-cloud-storage
nvgpu

أنشئ ملفًا باسم stable_diffusion_handler.py.

from abc import ABC
import base64
import datetime
import io
import logging
import os

from diffusers import StableDiffusionXLImg2ImgPipeline
from diffusers import StableDiffusionXLPipeline
from google.cloud import storage
import numpy as np
from PIL import Image
import torch
from ts.torch_handler.base_handler import BaseHandler


logger = logging.getLogger(__name__)


def image_to_base64(image: Image.Image) -> str:
  """Convert a PIL image to a base64 string."""
  buffer = io.BytesIO()
  image.save(buffer, format="JPEG")
  image_str = base64.b64encode(buffer.getvalue()).decode("utf-8")
  return image_str


class DiffusersHandler(BaseHandler, ABC):
  """Diffusers handler class for text to image generation."""

  def __init__(self):
    self.initialized = False

  def initialize(self, ctx):
    """In this initialize function, the Stable Diffusion model is loaded and

       initialized here.
    Args:
        ctx (context): It is a JSON Object containing information pertaining to
          the model artifacts parameters.
    """
    logger.info("Initialize DiffusersHandler")
    self.manifest = ctx.manifest
    properties = ctx.system_properties
    model_dir = properties.get("model_dir")
    model_name = os.environ["MODEL_NAME"]
    model_refiner = os.environ["MODEL_REFINER"]

    self.bucket = None

    logger.info(
        "GPU device count: %s",
        torch.cuda.device_count(),
    )
    logger.info(
        "select the GPU device, cuda is available: %s",
        torch.cuda.is_available(),
    )
    self.device = torch.device(
        "cuda:" + str(properties.get("gpu_id"))
        if torch.cuda.is_available() and properties.get("gpu_id") is not None
        else "cpu"
    )
    logger.info("Device used: %s", self.device)

    # open the pipeline to the inferenece model 
    # this is generating the image
    logger.info("Donwloading model %s", model_name)
    self.pipeline = StableDiffusionXLPipeline.from_pretrained(
        model_name,
        variant="fp16",
        torch_dtype=torch.float16,
        use_safetensors=True,
    ).to(self.device)
    logger.info("done donwloading model %s", model_name)

    # open the pipeline to the refiner
    # refiner is used to remove artifacts from the image
    logger.info("Donwloading refiner %s", model_refiner)
    self.refiner = StableDiffusionXLImg2ImgPipeline.from_pretrained(
        model_refiner,
        variant="fp16",
        torch_dtype=torch.float16,
        use_safetensors=True,
    ).to(self.device)
    logger.info("done donwloading refiner %s", model_refiner)

    self.n_steps = 40
    self.high_noise_frac = 0.8
    self.initialized = True
    # Commonly used basic negative prompts.
    logger.info("using negative_prompt")
    self.negative_prompt = ("worst quality, normal quality, low quality, low res, blurry")

  # this handles the user request
  def preprocess(self, requests):
    """Basic text preprocessing, of the user's prompt.

    Args:
        requests (str): The Input data in the form of text is passed on to the
          preprocess function.

    Returns:
        list : The preprocess function returns a list of prompts.
    """
    logger.info("Process request started")
    inputs = []
    for _, data in enumerate(requests):
      input_text = data.get("data")
      if input_text is None:
        input_text = data.get("body")
      if isinstance(input_text, (bytes, bytearray)):
        input_text = input_text.decode("utf-8")
      logger.info("Received text: '%s'", input_text)
      inputs.append(input_text)
    return inputs

  def inference(self, inputs):
    """Generates the image relevant to the received text.

    Args:
        input_batch (list): List of Text from the pre-process function is passed
          here

    Returns:
        list : It returns a list of the generate images for the input text
    """
    logger.info("Inference request started")
    # Handling inference for sequence_classification.
    image = self.pipeline(
        prompt=inputs,
        negative_prompt=self.negative_prompt,
        num_inference_steps=self.n_steps,
        denoising_end=self.high_noise_frac,
        output_type="latent",
    ).images
    logger.info("Done model")

    image = self.refiner(
        prompt=inputs,
        negative_prompt=self.negative_prompt,
        num_inference_steps=self.n_steps,
        denoising_start=self.high_noise_frac,
        image=image,
    ).images
    logger.info("Done refiner")

    return image

  def postprocess(self, inference_output):
    """Post Process Function converts the generated image into Torchserve readable format.

    Args:
        inference_output (list): It contains the generated image of the input
          text.

    Returns:
        (list): Returns a list of the images.
    """
    logger.info("Post process request started")
    images = []
    response_size = 0
    for image in inference_output:
      # Save image to GCS
      if self.bucket:
        image.save("temp.jpg")

        # Create a blob object
        blob = self.bucket.blob(
            datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + ".jpg"
        )

        # Upload the file
        blob.upload_from_filename("temp.jpg")

      # to see the image, encode to base64
      encoded = image_to_base64(image)
      response_size += len(encoded)
      images.append(encoded)

    logger.info("Images %d, response size: %d", len(images), response_size)
    return images

أنشئ ملفًا باسم start.sh. يُستخدَم هذا الملف كنقطة دخول في الحاوية لبدء TorchServe.

#!/bin/bash

echo "starting the server"
# start the server. By default torchserve runs in backaround, and start.sh will immediately terminate when done
# so use --foreground to keep torchserve running in foreground while start.sh is running in a container  
torchserve --start --ts-config config.properties --models "stable_diffusion=${MAR_FILE_NAME}.mar" --model-store ${MAR_STORE_PATH} --foreground

بعد ذلك، نفِّذ الأمر التالي لجعله ملفًا قابلاً للتنفيذ.

chmod 755 start.sh

أنشئ dockerfile.

# pick a version of torchserve to avoid any future breaking changes
# docker pull pytorch/torchserve:0.11.1-cpp-dev-gpu
FROM pytorch/torchserve:0.11.1-cpp-dev-gpu AS base

USER root

WORKDIR /home/model-server

COPY requirements.txt ./
RUN pip install --upgrade -r ./requirements.txt

# Stage 1 build the serving container.
FROM base AS serve-gcs

ENV MODEL_NAME='stabilityai/stable-diffusion-xl-base-1.0'
ENV MODEL_REFINER='stabilityai/stable-diffusion-xl-refiner-1.0'

ENV MAR_STORE_PATH='/home/model-server/model-store'
ENV MAR_FILE_NAME='model'
RUN mkdir -p $MAR_STORE_PATH

COPY config.properties ./
COPY stable_diffusion_handler.py ./
COPY start.sh ./

# creates the mar file used by torchserve
RUN torch-model-archiver --force --model-name ${MAR_FILE_NAME} --version 1.0 --handler stable_diffusion_handler.py -r requirements.txt --export-path ${MAR_STORE_PATH}

# entrypoint
CMD ["./start.sh"]

4. إعداد Cloud NAT

تتيح لك تقنية Cloud NAT الحصول على معدل نقل بيانات أعلى للوصول إلى الإنترنت وتنزيل النموذج من HuggingFace، ما سيؤدي إلى تسريع أوقات النشر بشكل كبير.

لاستخدام Cloud NAT، شغِّل الأمر التالي لتفعيل إحدى مثيلات Cloud NAT:

gcloud compute routers create nat-router --network $NETWORK_NAME --region us-central1
gcloud compute routers nats create vm-nat --router=nat-router --region=us-central1 --auto-allocate-nat-external-ips --nat-all-subnet-ip-ranges

5- إنشاء خدمة Cloud Run ونشرها

أرسِل الرمز إلى Cloud Build.

gcloud builds submit --tag $IMAGE

بعد ذلك، عليك نشر التطبيق على Cloud Run.

gcloud beta run deploy gpu-torchserve \
 --image=$IMAGE \
 --cpu=8 --memory=32Gi \
 --gpu=1 --no-cpu-throttling --gpu-type=nvidia-l4 \
 --allow-unauthenticated \
 --region us-central1 \
 --project $PROJECT_ID \
 --execution-environment=gen2 \
 --max-instances 1 \
 --network $NETWORK_NAME \
 --vpc-egress all-traffic

6- اختبار الخدمة

يمكنك اختبار الخدمة من خلال تنفيذ الأوامر التالية:

PROMPT_TEXT="a cat sitting in a magnolia tree"

SERVICE_URL=$(gcloud run services describe gpu-torchserve --region $REGION --format 'value(status.url)')

time curl $SERVICE_URL/predictions/stable_diffusion -d "data=$PROMPT_TEXT" | base64 --decode > image.jpg

سيظهر لك ملف image.jpg في الدليل الحالي. يمكنك فتح الصورة في محرِّر Cloud Shell للاطّلاع على صورة قطة تجلس في شجرة.

7- تهانينا!

تهانينا على إكمال دورة codelab.

ننصحك بمراجعة المستندات حول وحدات معالجة الرسومات في Cloud Run.

المواضيع التي تناولناها

  • كيفية تشغيل نموذج Stable Diffusion XL على Cloud Run باستخدام وحدات معالجة الرسومات

8. تَنظيم

لتجنُّب الرسوم غير المقصودة (على سبيل المثال، إذا تمّ استدعاء مهمة Cloud Run هذه بدون قصد أكثر من عدد عمليات استدعاء Cloud Run الشهرية المخصّصة لك في الفئة المجانية)، يمكنك إما حذف مهمة Cloud Run أو حذف المشروع الذي أنشأته في الخطوة 2.

لحذف وظيفة Cloud Run، انتقِل إلى Cloud Console في Cloud Run على الرابط https://console.cloud.google.com/run/ وأزِل خدمة gpu-torchserve.

عليك أيضًا حذف إعدادات Cloud NAT.

إذا اخترت حذف المشروع بأكمله، يمكنك الانتقال إلى https://console.cloud.google.com/cloud-resource-manager واختيار المشروع الذي أنشأته في الخطوة 2 ثم اختيار "حذف". في حال حذف المشروع، عليك تغيير المشاريع في حزمة Cloud SDK. يمكنك عرض قائمة بجميع المشاريع المتاحة من خلال تشغيل gcloud projects list.