Vertex AI Vision Queue Detection アプリ

1. 目標

概要

この Codelab では、小売店の動画映像を使用してキューサイズをモニタリングするための Vertex AI Vision アプリケーションをエンドツーエンドで作成することに焦点を当てます。事前トレーニング済みの Specialized モデルの組み込み機能である占有率分析を使用して、次の情報を取得します。

  • キューに並んでいる人の数を数えます。
  • カウンターでサービスを受けている人の数を数えます。

学習内容

  • Vertex AI Vision でアプリケーションを作成してデプロイする方法
  • 動画ファイルを使用して RTSP ストリームを設定し、Jupyter ノートブックから vaictl を使用してストリームを Vertex AI Vision に取り込む方法。
  • Occupancy Analytics モデルとそのさまざまな機能の使用方法。
  • ストレージの Vertex AI Vision の Media Warehouse で動画を検索する方法。
  • 出力を BigQuery に接続する方法、SQL クエリを作成してモデルの JSON 出力から分析情報を抽出する方法、出力を利用して元の動画にラベルを付け、注釈を付ける方法。

費用:

このラボを Google Cloud で実行するための総費用は約 $2 です。

2. 始める前に

プロジェクトを作成して API を有効にします。

  1. Google Cloud コンソールのプロジェクト セレクタ ページで、Google Cloud プロジェクトを選択または作成します。: この手順で作成するリソースをそのまま保持する予定でない場合、既存のプロジェクトを選択するのではなく、新しいプロジェクトを作成してください。チュートリアルの終了後にそのプロジェクトを削除すれば、プロジェクトに関連するすべてのリソースを削除できます。プロジェクト セレクタに移動
  2. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
  3. Compute Engine、Vertex API、Notebook API、Vision AI API を有効にします。API を有効にする

サービス アカウントを作成します。

  1. Google Cloud コンソールで [サービス アカウントの作成] ページに移動します。[サービス アカウントの作成] に移動
  2. プロジェクトを選択します。
  3. [サービス アカウント名] フィールドに名前を入力します。Google Cloud コンソールでは、この名前に基づいて [サービス アカウント ID] フィールドに値が設定されます。[サービス アカウントの説明] フィールドに説明を入力します。たとえば、クイックスタート用のサービス アカウントなどです。
  4. [作成して続行] をクリックします。
  5. プロジェクトへのアクセス権を付与するには、サービス アカウントに次のロールを付与します。
  • [Vision AI] > [Vision AI 編集者]
  • [Compute Engine] > [Compute インスタンス管理者(ベータ版)]
  • BigQuery > BigQuery 管理者

[ロールを選択] リストでロールを選択します。ロールを追加するには、[別のロールを追加] をクリックして各ロールを追加します。

  1. [続行] をクリックします。
  2. [完了] をクリックして、サービス アカウントの作成を完了します。ブラウザ ウィンドウは閉じないでください。次のステップでこれを使用します。

3. Jupyter Notebook を設定する

Occupancy Analytics でアプリを作成する前に、アプリで後で使用できるストリームを登録する必要があります。

このチュートリアルでは、動画をホストする Jupyter ノートブック インスタンスを作成し、そのストリーミング動画データをノートブックから送信します。Jupyter Notebook は、シェル コマンドの実行とカスタムの事前処理/事後処理コードの実行を 1 か所で行える柔軟性があり、迅速なテストに非常に適しているため、これを使用しています。このノートブックは次の目的で使用します。

  1. rtsp サーバーをバックグラウンド プロセスとして実行
  2. vaictl コマンドをバックグラウンド プロセスとして実行する
  3. クエリと処理コードを実行して、占有率分析の出力を分析する

Jupyter ノートブックを作成する

Jupyter ノートブック インスタンスから動画を送信する最初のステップは、前の手順で作成したサービス アカウントを使用してノートブックを作成することです。

  1. コンソールで、Vertex AI ページに移動します。Vertex AI Workbench に移動
  2. [ユーザー管理のノートブック] をクリックします。

65b7112822858dce.png

  1. [新しいノートブック] > [Tensorflow Enterprise 2.6 (with LTS)] > [Without GPUs] をクリックします。

dc156f20b14651d7.png

  1. Jupyter Notebook の名前を入力します。詳細については、リソースの命名規則をご覧ください。

b4dbc5fddc37e8d9.png

  1. [詳細オプション] をクリックします。
  2. [権限セクション] まで下にスクロールします。
  3. [Compute Engine のデフォルトのサービス アカウントを使用する] オプションをオフにする
  4. 前の手順で作成したサービス アカウントのメールアドレスを追加します。[作成] をクリックします。

ec0b9ef00f0ef470.png

  1. インスタンスが作成されたら、[JUPYTERLAB を開く] をクリックします。

4. 動画をストリーミングするようにノートブックを設定する

Occupancy Analytics でアプリを作成する前に、アプリで後で使用できるストリームを登録する必要があります。

このチュートリアルでは、Jupyter ノートブック インスタンスを使用して動画をホストし、ノートブック ターミナルからそのストリーミング動画データを送信します。

vaictl コマンドライン ツールをダウンロードする

  1. 開いた JupyterLab インスタンスで、ランチャーから [ノートブック] を開きます。

a6d182923ae4ada3.png

  1. ノートブック セルで次のコマンドを使用して、Vertex AI Vision(vaictl)コマンドライン ツール、rtsp サーバー コマンドライン ツール、open-cv ツールをダウンロードします。
!wget -q https://github.com/aler9/rtsp-simple-server/releases/download/v0.20.4/rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!wget -q https://github.com/google/visionai/releases/download/v0.0.4/visionai_0.0-4_amd64.deb
!tar -xf rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!pip install opencv-python --quiet
!sudo apt-get -qq remove -y visionai
!sudo apt-get -qq install -y ./visionai_0.0-4_amd64.deb
!sudo apt-get -qq install -y ffmpeg

5. ストリーミング用の動画ファイルを取り込む

必要なコマンドライン ツールを使用してノートブック環境を設定したら、サンプル動画ファイルをコピーし、vaictl を使用して占有率分析アプリに動画データをストリーミングできます。

新しいストリームを登録する

  1. Vertex AI Vision の左側のパネルで [ストリーム] タブをクリックします。
  2. 上部の [登録] ボタンをクリックします。eba418e723916514.png
  3. [ストリーム名] に「queue-stream」と入力します。
  4. リージョンで、前の手順でノートブックの作成時に選択したリージョンと同じリージョンを選択します。
  5. [登録] をクリックします。

サンプル動画を VM にコピーする

  1. ノートブックで、次の wget コマンドを使用してサンプル動画をコピーします。
!wget -q https://github.com/vagrantism/interesting-datasets/raw/main/video/collective_activity/seq25_h264.mp4

VM から動画をストリーミングし、ストリームにデータを取り込む

  1. このローカル動画ファイルをアプリの入力ストリームに送信するには、ノートブック セルで次のコマンドを使用します。次の変数を置き換える必要があります。
  • PROJECT_ID: Google Cloud プロジェクト ID。
  • LOCATION: 地域 ID。たとえば、us-central1 などです。詳細については、クラウドのロケーションをご覧ください。
  • LOCAL_FILE: ローカル動画ファイルのファイル名。例: seq25_h264.mp4。
PROJECT_ID='<Your Google Cloud project ID>'
LOCATION='<Your stream location>'
LOCAL_FILE='seq25_h264.mp4'
STREAM_NAME='queue-stream'
  1. rtsp プロトコルで動画ファイルをストリーミングする rtsp-simple-server を起動します。
import os
import time
import subprocess

subprocess.Popen(["nohup", "./rtsp-simple-server"], stdout=open('rtsp_out.log', 'a'), stderr=open('rtsp_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
  1. ffmpeg コマンドライン ツールを使用して rtsp ストリームで動画をループする
subprocess.Popen(["nohup", "ffmpeg", "-re", "-stream_loop", "-1", "-i", LOCAL_FILE, "-c", "copy", "-f", "rtsp", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('ffmpeg_out.log', 'a'), stderr=open('ffmpeg_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
  1. vaictl コマンドライン ツールを使用して、rtsp サーバー URI から前のステップで作成した Vertex AI Vision ストリーム「queue-stream」に動画をストリーミングします。
subprocess.Popen(["nohup", "vaictl", "-p", PROJECT_ID, "-l", LOCATION, "-c", "application-cluster-0", "--service-endpoint", "visionai.googleapis.com", "send", "rtsp", "to", "streams", "queue-stream", "--rtsp-uri", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('vaictl_out.log', 'a'), stderr=open('vaictl_err.log', 'a'), preexec_fn=os.setpgrp)

vaictl 取り込みオペレーションの開始から動画がダッシュボードに表示されるまで、約 100 秒かかることがあります。

ストリームの取り込みが完了すると、Vertex AI Vision ダッシュボードの [ストリーム] タブで、queue-stream ストリームを選択して動画フィードを確認できます。

[ストリーム] タブに移動

1b7aac7d36552f29.png

6. アプリケーションを作成する

最初のステップは、データを処理するアプリを作成することです。アプリは、次のものを接続する自動化されたパイプラインと考えることができます。

  • データの取り込み: 動画フィードがストリームに取り込まれます。
  • データ分析: 取り込み後に AI(コンピュータ ビジョン)モデルを追加できます。
  • データ ストレージ: 動画フィードの 2 つのバージョン(元のストリームと AI モデルで処理されたストリーム)をメディア ウェアハウスに保存できます。

Google Cloud コンソールでは、アプリはグラフとして表されます。

空のアプリを作成する

アプリグラフにデータを入力する前に、まず空のアプリを作成する必要があります。

Google Cloud コンソールでアプリを作成します。

  1. Google Cloud のコンソールに移動します。
  2. Vertex AI Vision ダッシュボードの [アプリケーション] タブを開きます。[アプリケーション] タブに移動
  3. [作成] ボタンをクリックします。21ecba7a23e9979e.png
  4. アプリ名として「queue-app'」と入力し、リージョンを選択します。
  5. [作成] をクリックします。

アプリ コンポーネント ノードを追加する

空のアプリケーションを作成したら、次の 3 つのノードをアプリグラフに追加できます。

  1. 取り込みノード: ノートブックで作成した rtsp 動画サーバーから送信されたデータを取り込むストリーム リソース。
  2. 処理ノード: 取り込まれたデータに対して動作する占有率分析モデル。
  3. ストレージ ノード: 処理済みの動画を保存し、メタデータ ストアとして機能するメディア ウェアハウス。メタデータ ストアには、取り込まれた動画データに関する分析情報と、AI モデルによって推論された情報が含まれます。

コンソールでアプリにコンポーネント ノードを追加します。

  1. Vertex AI Vision ダッシュボードの [アプリケーション] タブを開きます。[アプリケーション] タブに移動

処理パイプラインのグラフ表示が表示されます。

データの取り込みノードを追加する

  1. 入力ストリーム ノードを追加するには、サイドメニューの [コネクタ] セクションで [ストリーム] オプションを選択します。
  2. 開いた [ストリーム] メニューの [ソース] セクションで、[ストリームを追加] を選択します。
  3. [ストリームを追加] メニューで、[queue-stream] を選択します。
  4. ストリームをアプリグラフに追加するには、[ストリームを追加] をクリックします。

データ処理ノードを追加する

  1. 占有率カウント モデルノードを追加するには、サイドメニューの [Specialized models] セクションで [occupancy analytics] オプションを選択します。
  2. デフォルトの選択肢である [ユーザー] をそのままにします。[車両] が選択されている場合は、選択を解除します。

618b0c9dc671bae3.png

  1. [詳細オプション] セクションで、[アクティブ ゾーン/ラインを作成 ] 5b2f31235603e05d.png をクリックします。
  2. ポリゴン ツールを使用してアクティブ ゾーンを描画し、そのゾーン内の人数をカウントします。ゾーンに適切なラベルを付ける

50281a723650491f.png

  1. 上部の戻る矢印をクリックします。

2bf0ff4d029d29eb.png

  1. チェックボックスをクリックして、滞留時間を設定し、混雑状況を検出します。

c067fa256ca5bb96.png

データ ストレージ ノードを追加する

  1. 出力先(ストレージ)ノードを追加するには、サイドメニューの [コネクタ] セクションで [Vision AI Warehouse] オプションを選択します。
  2. [Vertex AI Warehouse] コネクタをクリックしてメニューを開き、[ウェアハウスを接続] をクリックします。
  3. [ウェアハウスを接続] メニューで、[新しいウェアハウスを作成] を選択します。ウェアハウスに queue-warehouse という名前を付け、TTL の期間を 14 日のままにします。
  4. [作成] ボタンをクリックしてウェアハウスを追加します。

7. 出力を BigQuery テーブルに接続する

BigQuery コネクタを Vertex AI Vision アプリに追加すると、接続されているすべてのアプリモデルの出力がターゲット テーブルに取り込まれます。

独自の BigQuery テーブルを作成し、アプリに BigQuery コネクタを追加するときにそのテーブルを指定するか、Vertex AI Vision アプリ プラットフォームにテーブルを自動的に作成させることができます。

テーブルの自動作成

Vertex AI Vision アプリ プラットフォームでテーブルを自動的に作成する場合は、BigQuery コネクタノードを追加するときにこのオプションを指定できます。

テーブルの自動作成を使用する場合は、次のデータセットとテーブルの条件が適用されます。

  • データセット: 自動的に作成されるデータセット名は visionai_dataset です。
  • テーブル: 自動的に作成されるテーブル名は visionai_dataset.APPLICATION_ID です。
  • エラー処理:
  • 同じデータセットに同じ名前のテーブルが存在する場合、自動作成は行われません。
  1. Vertex AI Vision ダッシュボードの [アプリケーション] タブを開きます。[アプリケーション] タブに移動
  2. リストからアプリケーションの名前の横にある [アプリを表示] を選択します。
  3. アプリケーション ビルダーのページで、[コネクタ] セクションから [BigQuery] を選択します。
  4. [BigQuery パス] フィールドは空欄のままにします。

ee0b67d4ab2263d.png

  1. [store metadata from:] で、[occupancy Analytics] のみを選択し、ストリームのチェックを外します。

最終的なアプリグラフは次のようになります。

da0a1a049843572f.png

8. アプリをデプロイして使用できるようにする

必要なコンポーネントをすべて使用してエンドツーエンドのアプリを構築したら、アプリを使用するための最後のステップとして、アプリをデプロイします。

  1. Vertex AI Vision ダッシュボードの [アプリケーション] タブを開きます。[アプリケーション] タブに移動
  2. リストで queue-app アプリの横にある [アプリを表示] を選択します。
  3. [Studio] ページで、[デプロイ] ボタンをクリックします。
  4. 次の確認ダイアログで、[デプロイ] をクリックします。デプロイ オペレーションが完了するまでに数分かかることがあります。デプロイが完了すると、ノードの横に緑色のチェックマークが表示されます。dc514d9b9f35099d.png

9. ストレージ ウェアハウスで動画コンテンツを検索する

動画データを処理アプリに取り込んだ後、分析された動画データを表示し、占有率分析情報に基づいてデータを検索できます。

  1. Vertex AI Vision ダッシュボードの [ウェアハウス] タブを開きます。[ウェアハウス] タブに移動
  2. リストで queue-warehouse ウェアハウスを見つけて、[アセットを表示] をクリックします。
  3. [人数] セクションで、[最小] の値を 1 に、[最大] の値を 5 に設定します。
  4. Vertex AI Vision の Media Warehouse に保存されている処理済みの動画データをフィルタするには、[検索] をクリックします。

a0e5766262443d6c.png

Google Cloud コンソールで検索条件に一致する保存済み動画データのビュー。

10. BigQuery テーブルを使用して出力をアノテーションして分析する

  1. ノートブックで、セル内の次の変数を初期化します。
DATASET_ID='vision_ai_dataset'
bq_table=f'{PROJECT_ID}.{DATASET_ID}.queue-app'
frame_buffer_size=10000
frame_buffer_error_milliseconds=5
dashboard_update_delay_seconds=3
rtsp_url='rtsp://localhost:8554/seq25_h264'
  1. 次のコードを使用して、rtsp ストリームからフレームをキャプチャします。
import cv2
import threading
from collections import OrderedDict
from datetime import datetime, timezone

frame_buffer = OrderedDict()
frame_buffer_lock = threading.Lock()

stream = cv2.VideoCapture(rtsp_url)
def read_frames(stream):
  global frames
  while True:
    ret, frame = stream.read()
    frame_ts = datetime.now(timezone.utc).timestamp() * 1000
    if ret:
      with frame_buffer_lock:
        while len(frame_buffer) >= frame_buffer_size:
          _ = frame_buffer.popitem(last=False)
        frame_buffer[frame_ts] = frame

frame_buffer_thread = threading.Thread(target=read_frames, args=(stream,))
frame_buffer_thread.start()
print('Waiting for stream initialization')
while not list(frame_buffer.keys()): pass
print('Stream Initialized')
  1. BigQuery テーブルからデータ タイムスタンプとアノテーション情報を取得し、キャプチャしたフレーム画像を保存するディレクトリを作成します。
from google.cloud import bigquery
import pandas as pd

client = bigquery.Client(project=PROJECT_ID)

query = f"""
SELECT MAX(ingestion_time) AS ts
FROM `{bq_table}`
"""

bq_max_ingest_ts_df = client.query(query).to_dataframe()
bq_max_ingest_epoch = str(int(bq_max_ingest_ts_df['ts'][0].timestamp()*1000000))
bq_max_ingest_ts = bq_max_ingest_ts_df['ts'][0]
print('Preparing to pull records with ingestion time >', bq_max_ingest_ts)
if not os.path.exists(bq_max_ingest_epoch):
   os.makedirs(bq_max_ingest_epoch)
print('Saving output frames to', bq_max_ingest_epoch)
  1. 次のコードを使用してフレームにアノテーションを付けます。
import json
import base64
import numpy as np
from IPython.display import Image, display, HTML, clear_output

im_width = stream.get(cv2.CAP_PROP_FRAME_WIDTH)
im_height = stream.get(cv2.CAP_PROP_FRAME_HEIGHT)

dashdelta = datetime.now()
framedata = {}
cntext = lambda x: {y['entity']['labelString']: y['count'] for y in x}
try:
  while True:
    try:
        annotations_df = client.query(f'''
          SELECT ingestion_time, annotation
          FROM `{bq_table}`
          WHERE ingestion_time > TIMESTAMP("{bq_max_ingest_ts}")
         ''').to_dataframe()
    except ValueError as e: 
        continue
    bq_max_ingest_ts = annotations_df['ingestion_time'].max()
    for _, row in annotations_df.iterrows():
      with frame_buffer_lock:
        frame_ts = np.asarray(list(frame_buffer.keys()))
        delta_ts = np.abs(frame_ts - (row['ingestion_time'].timestamp() * 1000))
        delta_tx_idx = delta_ts.argmin()
        closest_ts_delta = delta_ts[delta_tx_idx]
        closest_ts = frame_ts[delta_tx_idx]
        if closest_ts_delta > frame_buffer_error_milliseconds: continue
        image = frame_buffer[closest_ts]
      annotations = json.loads(row['annotation'])
      for box in annotations['identifiedBoxes']:
        image = cv2.rectangle(
          image,
          (
            int(box['normalizedBoundingBox']['xmin']*im_width),
            int(box['normalizedBoundingBox']['ymin']*im_height)
          ),
          (
            int((box['normalizedBoundingBox']['xmin'] + box['normalizedBoundingBox']['width'])*im_width),
            int((box['normalizedBoundingBox']['ymin'] + box['normalizedBoundingBox']['height'])*im_height)
          ),
          (255, 0, 0), 2
        )
      img_filename = f"{bq_max_ingest_epoch}/{row['ingestion_time'].timestamp() * 1000}.png"
      cv2.imwrite(img_filename, image)
      binimg = base64.b64encode(cv2.imencode('.jpg', image)[1]).decode()
      curr_framedata = {
        'path': img_filename,
        'timestamp_error': closest_ts_delta,
        'counts': {
          **{
            k['annotation']['displayName'] : cntext(k['counts'])
            for k in annotations['stats']["activeZoneCounts"]
          },
          'full-frame': cntext(annotations['stats']["fullFrameCount"])
        }
      }
      framedata[img_filename] = curr_framedata
      if (datetime.now() - dashdelta).total_seconds() > dashboard_update_delay_seconds:
        dashdelta = datetime.now()
        clear_output()
        display(HTML(f'''
          <h1>Queue Monitoring Application</h1>
          <p>Live Feed of the queue camera:</p>
          <p><img alt="" src="{img_filename}" style="float: left;"/></a></p>
          <table border="1" cellpadding="1" cellspacing="1" style="width: 500px;">
            <caption>Current Model Outputs</caption>
            <thead>
              <tr><th scope="row">Metric</th><th scope="col">Value</th></tr>
            </thead>
            <tbody>
              <tr><th scope="row">Serving Area People Count</th><td>{curr_framedata['counts']['serving-zone']['Person']}</td></tr>
              <tr><th scope="row">Queueing Area People Count</th><td>{curr_framedata['counts']['queue-zone']['Person']}</td></tr>
              <tr><th scope="row">Total Area People Count</th><td>{curr_framedata['counts']['full-frame']['Person']}</td></tr>
              <tr><th scope="row">Timestamp Error</th><td>{curr_framedata['timestamp_error']}</td></tr>
            </tbody>
          </table>
          <p>&nbsp;</p>
        '''))
except KeyboardInterrupt:
  print('Stopping Live Monitoring')

9426ffe2376f0a7d.png

  1. ノートブックのメニューバーにある [停止] ボタンを使用して、アノテーション タスクを停止します。

6c19cb00dcb28894.png

  1. 次のコードを使用して、個々のフレームを再確認できます。
from IPython.html.widgets import Layout, interact, IntSlider
imgs = sorted(list(framedata.keys()))
def loadimg(frame):
    display(framedata[imgs[frame]])
    display(Image(open(framedata[imgs[frame]]['path'],'rb').read()))
interact(loadimg, frame=IntSlider(
    description='Frame #:',
    value=0,
    min=0, max=len(imgs)-1, step=1,
    layout=Layout(width='100%')))

78b63b546a4c883b.png

11. 完了

お疲れさまでした。これでラボは完了です。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトの削除

リソースを個別に削除する

リソース

https://cloud.google.com/vision-ai/docs/overview

https://cloud.google.com/vision-ai/docs/occupancy-count-tutorial

ライセンス

アンケート

このチュートリアルをどのように使用しましたか?

通読のみ 通読して演習を行う

この Codelab はお役に立ちましたか?

非常に役に立った 役に立った 役に立たなかった

この Codelab はどのくらい簡単に理解できましたか?

簡単 普通 難しい