1. Cele
Omówienie
W tym ćwiczeniu skupimy się na kompleksowym tworzeniu aplikacji Vertex AI Vision, która będzie monitorować wielkość kolejki na podstawie nagrań wideo z placówki handlu detalicznego. Aby rejestrować te informacje, będziemy korzystać z wbudowanych funkcji wytrenowanego modelu specjalistycznego Analytics dotyczących zajętości:
- Policz osoby stojące w kolejce.
- Policz osoby, które są obsługiwane przy ladzie.
Czego się nauczysz
- Jak utworzyć aplikację w Vertex AI Vision i ją wdrożyć
- Jak skonfigurować strumień RTSP za pomocą pliku wideo i przesłać go do Vertex AI Vision za pomocą narzędzia vaictl z notatnika Jupyter.
- Jak korzystać z modelu analityki zajętości i jego różnych funkcji.
- Jak wyszukiwać filmy w hurtowni multimediów Vertex AI Vision.
- Jak połączyć dane wyjściowe z BigQuery, napisać zapytanie SQL, aby wyodrębnić statystyki z danych wyjściowych modelu w formacie JSON, i użyć danych wyjściowych do etykietowania i dodawania adnotacji do oryginalnego filmu.
Koszt:
Całkowity koszt przeprowadzenia tego ćwiczenia w Google Cloud wynosi około 2 USD.
2. Zanim zaczniesz
Utwórz projekt i włącz interfejsy API:
- W konsoli Google Cloud na stronie selektora projektów wybierz lub utwórz projekt Google Cloud. Uwaga: Jeśli nie planujesz zachować zasobów utworzonych w ramach tej procedury, utwórz projekt zamiast wybierać istniejący projekt. Po jej zakończeniu możesz usunąć projekt wraz ze wszystkimi zasobami, które są z nim powiązane. Otwórz selektor projektów
- Sprawdź, czy w projekcie Cloud włączone są płatności. Dowiedz się, jak sprawdzić, czy w projekcie są włączone płatności.
- Włącz interfejsy Compute Engine API, Vertex API, Notebook API i Vision AI API. Włączanie interfejsów API
Utwórz konto usługi:
- W konsoli Google Cloud otwórz stronę Utwórz konto usługi. Otwórz stronę Utwórz konto usługi
- Wybierz projekt.
- W polu Nazwa konta usługi wpisz nazwę. Konsola Google Cloud wypełni pole Identyfikator konta usługi na podstawie tej nazwy. W polu Opis konta usługi wpisz opis. Na przykład Konto usługi na potrzeby szybkiego startu.
- Kliknij Utwórz i kontynuuj.
- Aby przyznać dostęp do projektu, przypisz do konta usługi te role:
- Vision AI > Edytujący Vision AI
- Compute Engine > Administrator instancji Compute (beta)
- BigQuery > Administrator BigQuery.
Na liście Wybierz rolę wybierz rolę. Aby dodać kolejne role, kliknij Dodaj kolejną rolę i dodaj każdą z nich.
- Kliknij Dalej.
- Aby zakończyć tworzenie konta usługi, kliknij Gotowe. Nie zamykaj okna przeglądarki. Będzie Ci potrzebna w następnym kroku.
3. Konfigurowanie notatnika Jupyter
Zanim utworzysz aplikację w Occupancy Analytics, musisz zarejestrować strumień, który będzie później używany przez aplikację.
W tym samouczku utworzysz instancję notatnika Jupyter, która będzie hostować film, a następnie wyślesz dane strumieniowego wideo z notatnika. Używamy notatnika Jupyter, ponieważ zapewnia on elastyczność w zakresie wykonywania poleceń powłoki oraz uruchamiania niestandardowego kodu przetwarzania wstępnego i postprocesowego w jednym miejscu, co jest bardzo przydatne w przypadku szybkich eksperymentów. Będziemy używać tego notatnika do:
- Uruchom serwer rtsp jako proces w tle.
- Uruchamianie polecenia vaictl jako procesu w tle
- Uruchamianie zapytań i kodu przetwarzania w celu analizowania danych wyjściowych analizy obłożenia
Tworzenie notatnika Jupyter
Pierwszym krokiem w wysyłaniu filmów z instancji notatnika Jupyter jest utworzenie notatnika za pomocą konta usługi utworzonego w poprzednim kroku.
- W konsoli otwórz stronę Vertex AI. Otwórz Vertex AI Workbench
- Kliknij Notatniki zarządzane przez użytkownika.

- Kliknij Nowy notatnik > TensorFlow Enterprise 2.6 (z LTS) > Bez GPU.

- Wpisz nazwę notatnika Jupyter. Więcej informacji znajdziesz w artykule Zalecane metody nazywania zasobów.

- Kliknij OPCJE ZAAWANSOWANE.
- Przewiń w dół do sekcji Sekcje uprawnień.
- Odznacz opcję Użyj domyślnego konta usługi Compute Engine.
- Dodaj adres e-mail konta usługi utworzonego w poprzednim kroku. Kliknij Utwórz.

- Po utworzeniu instancji kliknij OTWÓRZ JUPYTERLAB.
4. Konfigurowanie notebooka do strumieniowania wideo
Zanim utworzysz aplikację w Occupancy Analytics, musisz zarejestrować strumień, który będzie później używany przez aplikację.
W tym samouczku użyjemy instancji notatnika Jupyter do hostowania filmu, a dane strumieniowe wideo wyślemy z terminala notatnika.
Pobierz narzędzie wiersza poleceń vaictl
- W otwartej instancji JupyterLab otwórz notatnik z programu uruchamiającego.

- Pobierz narzędzie wiersza poleceń Vertex AI Vision (vaictl), narzędzie wiersza poleceń serwera RTSP i narzędzie open-cv, wpisując w komórce notatnika to polecenie:
!wget -q https://github.com/aler9/rtsp-simple-server/releases/download/v0.20.4/rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!wget -q https://github.com/google/visionai/releases/download/v0.0.4/visionai_0.0-4_amd64.deb
!tar -xf rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!pip install opencv-python --quiet
!sudo apt-get -qq remove -y visionai
!sudo apt-get -qq install -y ./visionai_0.0-4_amd64.deb
!sudo apt-get -qq install -y ffmpeg
5. Przetwarzanie pliku wideo do strumieniowania
Po skonfigurowaniu środowiska notatnika za pomocą wymaganych narzędzi wiersza poleceń możesz skopiować przykładowy film, a następnie użyć narzędzia vaictl do przesyłania strumieniowego danych wideo do aplikacji do analizy obłożenia.
Rejestrowanie nowego strumienia
- W panelu po lewej stronie Vertex AI Vision kliknij kartę strumieni kliknięć.
- U góry kliknij przycisk Zarejestruj się

- W polu Nazwa strumienia wpisz „queue-stream”.
- W sekcji Region wybierz ten sam region, który został wybrany podczas tworzenia notatnika w poprzednim kroku.
- Kliknij Zarejestruj.
Kopiowanie przykładowego filmu na maszynę wirtualną
- W notatniku skopiuj przykładowy film za pomocą tego polecenia wget.
!wget -q https://github.com/vagrantism/interesting-datasets/raw/main/video/collective_activity/seq25_h264.mp4
Strumieniowanie wideo z maszyny wirtualnej i pozyskiwanie danych do strumienia
- Aby wysłać ten lokalny plik wideo do strumienia wejściowego aplikacji, użyj tego polecenia w komórce notatnika. Musisz zastąpić te zmienne:
- PROJECT_ID: identyfikator Twojego projektu Google Cloud.
- LOCATION: identyfikator lokalizacji. Na przykład us-central1. Więcej informacji znajdziesz w artykule Lokalizacje Google Cloud.
- LOCAL_FILE: nazwa lokalnego pliku wideo. Na przykład
seq25_h264.mp4.
PROJECT_ID='<Your Google Cloud project ID>'
LOCATION='<Your stream location>'
LOCAL_FILE='seq25_h264.mp4'
STREAM_NAME='queue-stream'
- Uruchom serwer rtsp-simple-server, na którym będziemy przesyłać strumieniowo plik wideo za pomocą protokołu RTSP.
import os
import time
import subprocess
subprocess.Popen(["nohup", "./rtsp-simple-server"], stdout=open('rtsp_out.log', 'a'), stderr=open('rtsp_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
- Używanie narzędzia wiersza poleceń ffmpeg do zapętlania filmu w strumieniu rtsp
subprocess.Popen(["nohup", "ffmpeg", "-re", "-stream_loop", "-1", "-i", LOCAL_FILE, "-c", "copy", "-f", "rtsp", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('ffmpeg_out.log', 'a'), stderr=open('ffmpeg_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
- Użyj narzędzia wiersza poleceń vaictl, aby przesyłać strumieniowo wideo z identyfikatora URI serwera rtsp do naszego strumienia Vertex AI Vision „queue-stream” utworzonego w poprzednim kroku.
subprocess.Popen(["nohup", "vaictl", "-p", PROJECT_ID, "-l", LOCATION, "-c", "application-cluster-0", "--service-endpoint", "visionai.googleapis.com", "send", "rtsp", "to", "streams", "queue-stream", "--rtsp-uri", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('vaictl_out.log', 'a'), stderr=open('vaictl_err.log', 'a'), preexec_fn=os.setpgrp)
Od rozpoczęcia operacji vaictl ingest do pojawienia się filmu na panelu może minąć około 100 sekund.
Gdy przesyłanie strumieniowe będzie dostępne, możesz wyświetlić strumień wideo na karcie Strumienie w panelu Vertex AI Vision, wybierając strumień kolejki.

6. Tworzenie aplikacji
Pierwszym krokiem jest utworzenie aplikacji, która będzie przetwarzać Twoje dane. Aplikację można traktować jako zautomatyzowany potok, który łączy:
- Pozyskiwanie danych: kanał wideo jest pozyskiwany do strumienia.
- Analiza danych: po pozyskaniu danych można dodać model AI(rozpoznawanie obrazów).
- Przechowywanie danych: 2 wersje strumienia wideo (oryginalny strumień i strumień przetworzony przez model AI) mogą być przechowywane w hurtowni danych multimedialnych.
W konsoli Google Cloud aplikacja jest przedstawiana w postaci wykresu.
Tworzenie pustej aplikacji
Zanim wypełnisz wykres aplikacji, musisz najpierw utworzyć pustą aplikację.
Utwórz aplikację w konsoli Google Cloud.
- Otwórz konsolę Google Cloud.
- Otwórz kartę Aplikacje w panelu Vertex AI Vision. Otwórz kartę Aplikacje
- Kliknij przycisk Utwórz.

- Wpisz „queue-app” jako nazwę aplikacji i wybierz region.
- Kliknij Utwórz.
Dodawanie węzłów komponentów aplikacji
Po utworzeniu pustej aplikacji możesz dodać do wykresu aplikacji 3 węzły:
- Węzeł pozyskiwania: zasób strumienia, który pozyskuje dane wysyłane z serwera wideo RTSP utworzonego w notatniku.
- Węzeł przetwarzania: model analityki zajętości, który przetwarza pozyskane dane.
- Węzeł pamięci masowej: hurtownia danych multimedialnych, w której przechowywane są przetworzone filmy i która służy jako magazyn metadanych. Bazy danych metadanych zawierają informacje analityczne o przetworzonych danych wideo oraz informacje wywnioskowane przez modele AI.
Dodaj węzły komponentów do aplikacji w konsoli.
- Otwórz kartę Aplikacje w panelu Vertex AI Vision. Otwórz kartę Aplikacje
Zobaczysz wizualizację grafu potoku przetwarzania.
Dodawanie węzła pozyskiwania danych
- Aby dodać węzeł strumienia wejściowego, w sekcji Oprogramowanie sprzęgające w bocznym menu wybierz opcję Strumienie.
- W sekcji Źródło w wyświetlonym menu Strumień kliknij Dodaj strumienie.
- W menu Dodaj strumienie wybierz queue-stream.
- Aby dodać strumień do wykresu aplikacji, kliknij Dodaj strumienie.
Dodawanie węzła przetwarzania danych
- Aby dodać węzeł modelu zliczania osób, w sekcji Modele specjalistyczne w bocznym menu wybierz opcję analiza liczby osób.
- Pozostaw domyślny wybór Osoby. Odznacz Pojazdy, jeśli jest już zaznaczone.

- W sekcji Opcje zaawansowane kliknij Utwórz aktywne strefy/linie
. - Narysuj aktywne strefy za pomocą narzędzia Wielokąt, aby zliczać osoby w danej strefie. Odpowiednio oznacz strefę.

- U góry kliknij strzałkę wstecz.

- Dodaj ustawienia czasu przebywania, aby wykrywać zatory, klikając pole wyboru.

Dodawanie węzła pamięci danych
- Aby dodać węzeł docelowy (lokalizację w pamięci masowej) dla danych wyjściowych, w sekcji Oprogramowanie sprzęgające w bocznym menu wybierz opcję Hurtownia Vision AI.
- Kliknij złącze Vertex AI Warehouse, aby otworzyć jego menu, a następnie kliknij Połącz hurtownię.
- W menu Połącz hurtownię wybierz Utwórz nową hurtownię. Nazwij magazyn queue-warehouse i pozostaw czas TTL ustawiony na 14 dni.
- Kliknij przycisk Utwórz, aby dodać hurtownię.
7. Łączenie danych wyjściowych z tabelą BigQuery
Gdy dodasz łącznik BigQuery do aplikacji Vertex AI Vision, wszystkie dane wyjściowe modelu połączonej aplikacji zostaną pozyskane do tabeli docelowej.
Możesz utworzyć własną tabelę BigQuery i określić ją podczas dodawania do aplikacji konektora BigQuery lub pozwolić platformie aplikacji Vertex AI Vision na automatyczne utworzenie tabeli.
Automatyczne tworzenie tabel
Jeśli pozwolisz platformie aplikacji Vertex AI Vision automatycznie utworzyć tabelę, możesz określić tę opcję podczas dodawania węzła oprogramowania sprzęgającego BigQuery.
Jeśli chcesz korzystać z automatycznego tworzenia tabel, musisz spełnić te warunki dotyczące zbioru danych i tabeli:
- Zbiór danych: automatycznie utworzona nazwa zbioru danych to visionai_dataset.
- Tabela: automatycznie utworzona nazwa tabeli to visionai_dataset.APPLICATION_ID.
- Obsługa błędów:
- Jeśli w tym samym zbiorze danych istnieje tabela o tej samej nazwie, nie zostanie ona utworzona automatycznie.
- Otwórz kartę Aplikacje w panelu Vertex AI Vision. Otwórz kartę Aplikacje
- Na liście obok nazwy aplikacji kliknij Wyświetl aplikację.
- Na stronie kreatora aplikacji w sekcji Oprogramowanie sprzęgające wybierz BigQuery.
- Pole Ścieżka do BigQuery pozostaw puste.

- W sekcji Przechowuj metadane z: wybierz tylko „Analiza zajętości” i odznacz strumienie.
Ostateczny wykres aplikacji powinien wyglądać tak:

8. Wdrażanie aplikacji do użytkowania
Po utworzeniu aplikacji kompleksowej ze wszystkimi niezbędnymi komponentami ostatnim krokiem jest jej wdrożenie.
- Otwórz kartę Aplikacje w panelu Vertex AI Vision. Otwórz kartę Aplikacje
- Na liście obok aplikacji queue-app kliknij Wyświetl aplikację.
- Na stronie Studio kliknij przycisk Wdróż.
- W oknie potwierdzenia kliknij Wdróż. Wdrożenie może potrwać kilka minut. Po zakończeniu wdrażania obok węzłów pojawią się zielone ikony potwierdzenia.

9. Wyszukiwanie treści wideo w magazynie danych
Po pozyskaniu danych wideo do aplikacji do przetwarzania możesz wyświetlać przeanalizowane dane wideo i wyszukiwać dane na podstawie informacji analitycznych dotyczących zajętości.
- Otwórz kartę Magazyny w panelu Vertex AI Vision. Otwórz kartę Magazyny
- Na liście znajdź magazyn kolejkowy i kliknij Wyświetl komponenty.
- W sekcji Liczba osób ustaw wartość Min. na 1, a wartość Maks. na 5.
- Aby przefiltrować przetworzone dane wideo przechowywane w Media Warehouse w Vertex AI Vision, kliknij Szukaj.

Widok przechowywanych danych wideo, które spełniają kryteria wyszukiwania w konsoli Google Cloud.
10. Adnotowanie i analiza danych wyjściowych za pomocą tabeli BigQuery
- W notatniku zainicjuj w komórce te zmienne.
DATASET_ID='vision_ai_dataset'
bq_table=f'{PROJECT_ID}.{DATASET_ID}.queue-app'
frame_buffer_size=10000
frame_buffer_error_milliseconds=5
dashboard_update_delay_seconds=3
rtsp_url='rtsp://localhost:8554/seq25_h264'
- Teraz przechwycimy klatki ze strumienia RTSP za pomocą tego kodu:
import cv2
import threading
from collections import OrderedDict
from datetime import datetime, timezone
frame_buffer = OrderedDict()
frame_buffer_lock = threading.Lock()
stream = cv2.VideoCapture(rtsp_url)
def read_frames(stream):
global frames
while True:
ret, frame = stream.read()
frame_ts = datetime.now(timezone.utc).timestamp() * 1000
if ret:
with frame_buffer_lock:
while len(frame_buffer) >= frame_buffer_size:
_ = frame_buffer.popitem(last=False)
frame_buffer[frame_ts] = frame
frame_buffer_thread = threading.Thread(target=read_frames, args=(stream,))
frame_buffer_thread.start()
print('Waiting for stream initialization')
while not list(frame_buffer.keys()): pass
print('Stream Initialized')
- Pobierz sygnaturę czasową danych i informacje o adnotacjach z tabeli BigQuery i utwórz katalog do przechowywania przechwyconych obrazów klatek:
from google.cloud import bigquery
import pandas as pd
client = bigquery.Client(project=PROJECT_ID)
query = f"""
SELECT MAX(ingestion_time) AS ts
FROM `{bq_table}`
"""
bq_max_ingest_ts_df = client.query(query).to_dataframe()
bq_max_ingest_epoch = str(int(bq_max_ingest_ts_df['ts'][0].timestamp()*1000000))
bq_max_ingest_ts = bq_max_ingest_ts_df['ts'][0]
print('Preparing to pull records with ingestion time >', bq_max_ingest_ts)
if not os.path.exists(bq_max_ingest_epoch):
os.makedirs(bq_max_ingest_epoch)
print('Saving output frames to', bq_max_ingest_epoch)
- Dodaj adnotacje do ramek, używając następującego kodu:
import json
import base64
import numpy as np
from IPython.display import Image, display, HTML, clear_output
im_width = stream.get(cv2.CAP_PROP_FRAME_WIDTH)
im_height = stream.get(cv2.CAP_PROP_FRAME_HEIGHT)
dashdelta = datetime.now()
framedata = {}
cntext = lambda x: {y['entity']['labelString']: y['count'] for y in x}
try:
while True:
try:
annotations_df = client.query(f'''
SELECT ingestion_time, annotation
FROM `{bq_table}`
WHERE ingestion_time > TIMESTAMP("{bq_max_ingest_ts}")
''').to_dataframe()
except ValueError as e:
continue
bq_max_ingest_ts = annotations_df['ingestion_time'].max()
for _, row in annotations_df.iterrows():
with frame_buffer_lock:
frame_ts = np.asarray(list(frame_buffer.keys()))
delta_ts = np.abs(frame_ts - (row['ingestion_time'].timestamp() * 1000))
delta_tx_idx = delta_ts.argmin()
closest_ts_delta = delta_ts[delta_tx_idx]
closest_ts = frame_ts[delta_tx_idx]
if closest_ts_delta > frame_buffer_error_milliseconds: continue
image = frame_buffer[closest_ts]
annotations = json.loads(row['annotation'])
for box in annotations['identifiedBoxes']:
image = cv2.rectangle(
image,
(
int(box['normalizedBoundingBox']['xmin']*im_width),
int(box['normalizedBoundingBox']['ymin']*im_height)
),
(
int((box['normalizedBoundingBox']['xmin'] + box['normalizedBoundingBox']['width'])*im_width),
int((box['normalizedBoundingBox']['ymin'] + box['normalizedBoundingBox']['height'])*im_height)
),
(255, 0, 0), 2
)
img_filename = f"{bq_max_ingest_epoch}/{row['ingestion_time'].timestamp() * 1000}.png"
cv2.imwrite(img_filename, image)
binimg = base64.b64encode(cv2.imencode('.jpg', image)[1]).decode()
curr_framedata = {
'path': img_filename,
'timestamp_error': closest_ts_delta,
'counts': {
**{
k['annotation']['displayName'] : cntext(k['counts'])
for k in annotations['stats']["activeZoneCounts"]
},
'full-frame': cntext(annotations['stats']["fullFrameCount"])
}
}
framedata[img_filename] = curr_framedata
if (datetime.now() - dashdelta).total_seconds() > dashboard_update_delay_seconds:
dashdelta = datetime.now()
clear_output()
display(HTML(f'''
<h1>Queue Monitoring Application</h1>
<p>Live Feed of the queue camera:</p>
<p><img alt="" src="{img_filename}" style="float: left;"/></a></p>
<table border="1" cellpadding="1" cellspacing="1" style="width: 500px;">
<caption>Current Model Outputs</caption>
<thead>
<tr><th scope="row">Metric</th><th scope="col">Value</th></tr>
</thead>
<tbody>
<tr><th scope="row">Serving Area People Count</th><td>{curr_framedata['counts']['serving-zone']['Person']}</td></tr>
<tr><th scope="row">Queueing Area People Count</th><td>{curr_framedata['counts']['queue-zone']['Person']}</td></tr>
<tr><th scope="row">Total Area People Count</th><td>{curr_framedata['counts']['full-frame']['Person']}</td></tr>
<tr><th scope="row">Timestamp Error</th><td>{curr_framedata['timestamp_error']}</td></tr>
</tbody>
</table>
<p> </p>
'''))
except KeyboardInterrupt:
print('Stopping Live Monitoring')

- Zatrzymaj zadanie adnotacji, klikając przycisk Zatrzymaj na pasku menu notatnika.

- Możesz ponownie wyświetlić poszczególne klatki, używając tego kodu:
from IPython.html.widgets import Layout, interact, IntSlider
imgs = sorted(list(framedata.keys()))
def loadimg(frame):
display(framedata[imgs[frame]])
display(Image(open(framedata[imgs[frame]]['path'],'rb').read()))
interact(loadimg, frame=IntSlider(
description='Frame #:',
value=0,
min=0, max=len(imgs)-1, step=1,
layout=Layout(width='100%')))

11. Gratulacje
Gratulacje! Moduł został ukończony.
Czyszczenie danych
Aby uniknąć obciążenia konta Google Cloud opłatami za zasoby zużyte w tym samouczku, możesz usunąć projekt zawierający te zasoby lub zachować projekt i usunąć poszczególne zasoby.
Usuwanie projektu
Usuwanie poszczególnych zasobów
Materiały
https://cloud.google.com/vision-ai/docs/overview
https://cloud.google.com/vision-ai/docs/occupancy-count-tutorial