1. Цели
Обзор
В этом практическом занятии мы сосредоточимся на создании комплексного приложения Vertex AI Vision для мониторинга размера очереди с использованием видеозаписей из магазинов розничной торговли . Мы будем использовать встроенные функции анализа занятости предварительно обученной специализированной модели для сбора следующей информации:
- Подсчитайте количество людей, стоящих в очереди.
- Подсчитайте количество людей, обслуживаемых у стойки.
Что вы узнаете
- Как создать и развернуть приложение в Vertex AI Vision
- Как настроить RTSP-поток с использованием видеофайла и загрузить этот поток в Vertex AI Vision с помощью vaictl из Jupyter Notebook.
- Как использовать модель анализа заполняемости и её различные функции.
- Как искать видео в хранилище данных Vertex AI Vision Media Warehouse.
- Как подключить выходные данные к BigQuery, написать SQL-запрос для извлечения информации из JSON-вывода модели и использовать эти данные для разметки и аннотирования исходного видео.
Расходы:
Общая стоимость запуска этой лабораторной работы в Google Cloud составляет около 2 долларов.
2. Перед началом работы
Создайте проект и включите API:
- В консоли Google Cloud на странице выбора проекта выберите или создайте проект Google Cloud . Примечание : если вы не планируете сохранять ресурсы, созданные в ходе этой процедуры, создайте проект вместо выбора существующего. После выполнения этих шагов вы можете удалить проект, удалив все связанные с ним ресурсы. Перейдите к выбору проекта.
- Убедитесь, что для вашего облачного проекта включена функция выставления счетов. Узнайте, как проверить, включена ли функция выставления счетов для проекта .
- Включите Compute Engine, Vertex API, Notebook API и Vision AI API. Активируйте API.
Создайте учетную запись службы:
- В консоли Google Cloud перейдите на страницу «Создать учетную запись службы» . Перейдите по ссылке «Создать учетную запись службы» .
- Выберите свой проект.
- В поле « Имя учетной записи службы» введите имя. Консоль Google Cloud заполнит поле «Идентификатор учетной записи службы» на основе этого имени. В поле «Описание учетной записи службы» введите описание. Например, «Учетная запись службы для быстрого запуска».
- Нажмите «Создать и продолжить» .
- Для предоставления доступа к вашему проекту назначьте вашей учетной записи службы следующие роли:
- Vision AI > Редактор Vision AI
- Compute Engine > Администрирование вычислительных экземпляров (бета-версия)
- BigQuery > Администрирование BigQuery.
В списке «Выберите роль» выберите нужную роль. Для добавления дополнительных ролей нажмите «Добавить еще одну роль» и добавьте каждую дополнительную роль.
- Нажмите «Продолжить» .
- Нажмите «Готово» , чтобы завершить создание учетной записи службы. Не закрывайте окно браузера. Вы будете использовать ее на следующем шаге.
3. Настройка Jupyter Notebook
Перед созданием приложения в Occupancy Analytics необходимо зарегистрировать поток данных, который впоследствии сможет использовать приложение.
В этом руководстве вы создадите экземпляр Jupyter Notebook, который будет содержать видео, и будете отправлять потоковые видеоданные из этого блокнота. Мы используем Jupyter Notebook, поскольку он предоставляет нам гибкость для выполнения команд оболочки, а также запуска пользовательского кода предварительной/постобработки в одном месте, что очень удобно для быстрых экспериментов. Мы будем использовать этот блокнот для:
- Запустите RTSP- сервер в фоновом режиме.
- Запустите команду vaictl в фоновом режиме.
- Выполните запросы и запустите код обработки для анализа результатов аналитики заполняемости.
Создать блокнот Jupyter
Первым шагом при отправке видео из экземпляра Jupyter Notebook является создание блокнота с использованием учетной записи службы, созданной на предыдущем шаге.
- В консоли перейдите на страницу Vertex AI. Перейдите в Vertex AI Workbench.
- Нажмите «Управляемые пользователем блокноты»

- Нажмите «Новый ноутбук» > Tensorflow Enterprise 2.6 (с LTS) > «Без графических процессоров».

- Введите имя для блокнота Jupyter. Дополнительную информацию см. в разделе «Соглашение об именовании ресурсов» .

- Нажмите на РАСШИРЕННЫЕ ПАРАМЕТРЫ
- Прокрутите вниз до раздела «Разрешения» .
- Снимите флажок с параметра «Использовать учетную запись службы Compute Engine по умолчанию».
- Добавьте адрес электронной почты учетной записи службы, созданный на предыдущем шаге. И нажмите «Создать».

- После создания экземпляра нажмите кнопку «ОТКРЫТЬ JUPYTERLAB».
4. Настройте ноутбук для потоковой передачи видео.
Перед созданием приложения в Occupancy Analytics необходимо зарегистрировать поток данных, который впоследствии сможет использовать приложение.
В этом уроке мы будем использовать наш экземпляр Jupyter Notebook для размещения видео, а вы будете отправлять данные потокового видео из терминала Notebook.
Загрузите инструмент командной строки vaictl.
- В открытом экземпляре Jupyterlab откройте блокнот с помощью панели запуска.

- Загрузите инструмент командной строки Vertex AI Vision (vaictl), инструмент командной строки rtsp-сервера и инструмент OpenCV, используя следующую команду в ячейке блокнота:
!wget -q https://github.com/aler9/rtsp-simple-server/releases/download/v0.20.4/rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!wget -q https://github.com/google/visionai/releases/download/v0.0.4/visionai_0.0-4_amd64.deb
!tar -xf rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!pip install opencv-python --quiet
!sudo apt-get -qq remove -y visionai
!sudo apt-get -qq install -y ./visionai_0.0-4_amd64.deb
!sudo apt-get -qq install -y ffmpeg
5. Загрузите видеофайл для потоковой передачи.
После настройки среды ноутбука с необходимыми инструментами командной строки вы можете скопировать образец видеофайла, а затем использовать vaictl для потоковой передачи видеоданных в ваше приложение для анализа заполняемости помещений.
Зарегистрировать новый поток
- В программе Vertex AI Vision нажмите вкладку «Потоки» на левой панели.
- Нажмите кнопку «Регистрация» вверху страницы.

- В поле "Имя потока" введите 'queue-stream'.
- В поле «Регион» выберите тот же регион, который был выбран при создании блокнота на предыдущем шаге.
- Нажмите «Зарегистрироваться»
Скопируйте образец видео на свою виртуальную машину.
- В своей записной книжке скопируйте пример видео с помощью следующей команды wget.
!wget -q https://github.com/vagrantism/interesting-datasets/raw/main/video/collective_activity/seq25_h264.mp4
Транслируйте видео с виртуальной машины и загружайте данные в свой поток.
- Чтобы отправить этот локальный видеофайл во входной поток приложения, используйте следующую команду в ячейке блокнота. Необходимо выполнить следующие подстановки переменных:
- PROJECT_ID: Идентификатор вашего проекта в Google Cloud.
- МЕСТОПОЛОЖЕНИЕ: Идентификатор вашего местоположения. Например, us-central1. Для получения дополнительной информации см. раздел «Облачные местоположения» .
- LOCAL_FILE: Имя файла локального видеофайла. Например,
seq25_h264.mp4.
PROJECT_ID='<Your Google Cloud project ID>'
LOCATION='<Your stream location>'
LOCAL_FILE='seq25_h264.mp4'
STREAM_NAME='queue-stream'
- Запустите rtsp-simple-server, на котором мы будем передавать видеофайл по протоколу RTSP.
import os
import time
import subprocess
subprocess.Popen(["nohup", "./rtsp-simple-server"], stdout=open('rtsp_out.log', 'a'), stderr=open('rtsp_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
- Используйте инструмент командной строки ffmpeg для зацикливания видео в потоке RTSP.
subprocess.Popen(["nohup", "ffmpeg", "-re", "-stream_loop", "-1", "-i", LOCAL_FILE, "-c", "copy", "-f", "rtsp", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('ffmpeg_out.log', 'a'), stderr=open('ffmpeg_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
- Используйте инструмент командной строки vaictl для потоковой передачи видео с URI RTSP-сервера в созданную на предыдущем шаге очередь потоков Vertex AI Vision 'queue-stream'.
subprocess.Popen(["nohup", "vaictl", "-p", PROJECT_ID, "-l", LOCATION, "-c", "application-cluster-0", "--service-endpoint", "visionai.googleapis.com", "send", "rtsp", "to", "streams", "queue-stream", "--rtsp-uri", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('vaictl_out.log', 'a'), stderr=open('vaictl_err.log', 'a'), preexec_fn=os.setpgrp)
Между началом операции загрузки vaictl и появлением видео на панели управления может пройти около 100 секунд.
После того, как поток станет доступен, вы сможете просмотреть видеопоток на вкладке «Потоки» панели управления Vertex AI Vision, выбрав поток очереди.
Перейдите во вкладку «Трансляции».

6. Создайте приложение
Первый шаг — создание приложения, которое будет обрабатывать ваши данные. Приложение можно рассматривать как автоматизированный конвейер, соединяющий следующие элементы:
- Ввод данных : Видеопоток вводится в поток.
- Анализ данных : После обработки данных можно добавить модель искусственного интеллекта (компьютерного зрения).
- Хранение данных : Две версии видеопотока (исходный поток и поток, обработанный моделью ИИ) могут храниться в медиатеке.
В консоли Google Cloud приложение отображается в виде графа.
Создайте пустое приложение
Прежде чем заполнять график приложения, необходимо сначала создать пустое приложение.
Создайте приложение в консоли Google Cloud.
- Перейдите в консоль Google Cloud.
- Откройте вкладку «Приложения» на панели управления Vertex AI Vision. Перейдите на вкладку «Приложения».
- Нажмите кнопку «Создать» .

- Введите « queue-app» в качестве названия приложения и выберите свой регион.
- Нажмите «Создать» .
Добавить узлы компонентов приложения
После создания пустого приложения вы можете добавить три узла в граф приложения:
- Узел приема данных : ресурс потока, который принимает данные, отправленные с RTSP-видеосервера, созданного вами в ноутбуке.
- Узел обработки : Модель анализа занятости, которая обрабатывает полученные данные.
- Узел хранения : хранилище медиафайлов, в котором хранятся обработанные видеоматериалы и которое служит хранилищем метаданных. Хранилища метаданных содержат аналитическую информацию о принятых видеоданных и информацию, полученную моделями искусственного интеллекта.
Добавьте узлы компонентов в ваше приложение через консоль.
- Откройте вкладку «Приложения» на панели управления Vertex AI Vision. Перейдите на вкладку «Приложения».
Это переведет вас к графической визуализации конвейера обработки.
Добавить узел для приема данных
- Чтобы добавить узел входного потока, выберите параметр « Потоки» в разделе «Соединители » бокового меню.
- В разделе «Источник» открывшегося меню «Поток» выберите «Добавить потоки» .
- В меню «Добавить потоки» выберите «queue-stream» .
- Чтобы добавить поток в граф приложения, нажмите «Добавить потоки» .
Добавьте узел обработки данных.
- Чтобы добавить узел модели подсчета заполняемости, выберите опцию «Аналитика заполняемости» в разделе «Специализированные модели» бокового меню.
- Оставьте значения по умолчанию для параметра «Люди» . Снимите флажок с параметра «Транспортные средства», если он уже выбран.

- В разделе «Дополнительные параметры» нажмите «Создать активные зоны/линии» .

- Нарисуйте активные зоны с помощью инструмента «Многоугольник», чтобы подсчитать количество людей в каждой зоне. Дайте зонам соответствующее обозначение.

- Нажмите на стрелку «Назад» вверху.

- Добавьте параметры времени задержки для обнаружения перегрузки, установив флажок.

Добавьте узел хранения данных.
- Чтобы добавить узел назначения (хранилища) выходных данных, выберите опцию VIsion AI Warehouse в разделе «Коннекторы» бокового меню.
- Чтобы открыть меню Vertex AI Warehouse Connector, нажмите «Подключить склад» .
- В меню Connect warehouse выберите «Создать новый склад» . Назовите склад queue-warehouse и оставьте значение TTL равным 14 дням.
- Нажмите кнопку «Создать» , чтобы добавить склад.
7. Подключите выходные данные к таблице BigQuery.
При добавлении коннектора BigQuery к вашему приложению Vertex AI Vision все выходные данные модели подключенного приложения будут загружаться в целевую таблицу.
Вы можете либо создать собственную таблицу BigQuery и указать её при добавлении коннектора BigQuery в приложение, либо позволить платформе приложений Vertex AI Vision автоматически создать таблицу.
Автоматическое создание таблиц
Если вы позволите платформе приложений Vertex AI Vision автоматически создавать таблицу, вы можете указать этот параметр при добавлении узла коннектора BigQuery.
Для автоматического создания таблиц необходимо соблюдать следующие условия для набора данных и таблиц:
- Набор данных: Автоматически созданный набор данных называется visionai_dataset.
- Таблица: Имя автоматически созданной таблицы — visionai_dataset.APPLICATION_ID.
- Обработка ошибок:
- Если таблица с тем же именем в том же наборе данных уже существует, автоматическое создание не происходит.
- Откройте вкладку «Приложения» на панели управления Vertex AI Vision. Перейдите на вкладку «Приложения».
- Выберите в списке пункт «Просмотреть приложение» рядом с названием вашего приложения.
- На странице конструктора приложений выберите BigQuery в разделе «Коннекторы» .
- Оставьте поле "Путь в BigQuery" пустым.

- В разделе «Метаданные хранилища» выберите только «Аналитика заполняемости» и снимите флажок «Потоки».
Итоговый график приложения должен выглядеть примерно так:

8. Разверните приложение для использования.
После того, как вы создали полноценное приложение со всеми необходимыми компонентами, последним шагом для его использования является развертывание.
- Откройте вкладку «Приложения» на панели управления Vertex AI Vision. Перейдите на вкладку «Приложения».
- Выберите приложение «Просмотр» рядом с приложением «Очередь» в списке.
- На странице Studio нажмите кнопку «Развернуть» .
- В появившемся диалоговом окне подтверждения нажмите кнопку «Развернуть» . Операция развертывания может занять несколько минут. После завершения развертывания рядом с узлами появятся зеленые галочки.

9. Выполните поиск видеоконтента в хранилище данных.
После загрузки видеоданных в приложение для обработки вы можете просмотреть проанализированные видеоданные и выполнить поиск по данным на основе аналитики занятости помещений.
- Откройте вкладку «Склады» на панели управления Vertex AI Vision. Перейдите на вкладку «Склады».
- Найдите в списке склад-очередь и нажмите «Просмотреть активы» .
- В разделе «Количество людей » установите минимальное значение равным 1, а максимальное — 5.
- Для фильтрации обработанных видеоданных, хранящихся в хранилище медиафайлов Vertex AI Vision, нажмите «Поиск» .

Отображение сохраненных видеоданных, соответствующих критериям поиска в консоли Google Cloud.
10. Аннотирование и анализ выходных данных с помощью таблицы BigQuery.
- В блокноте инициализируйте следующие переменные в ячейке.
DATASET_ID='vision_ai_dataset'
bq_table=f'{PROJECT_ID}.{DATASET_ID}.queue-app'
frame_buffer_size=10000
frame_buffer_error_milliseconds=5
dashboard_update_delay_seconds=3
rtsp_url='rtsp://localhost:8554/seq25_h264'
- Теперь мы будем захватывать кадры из RTSP-потока, используя следующий код:
import cv2
import threading
from collections import OrderedDict
from datetime import datetime, timezone
frame_buffer = OrderedDict()
frame_buffer_lock = threading.Lock()
stream = cv2.VideoCapture(rtsp_url)
def read_frames(stream):
global frames
while True:
ret, frame = stream.read()
frame_ts = datetime.now(timezone.utc).timestamp() * 1000
if ret:
with frame_buffer_lock:
while len(frame_buffer) >= frame_buffer_size:
_ = frame_buffer.popitem(last=False)
frame_buffer[frame_ts] = frame
frame_buffer_thread = threading.Thread(target=read_frames, args=(stream,))
frame_buffer_thread.start()
print('Waiting for stream initialization')
while not list(frame_buffer.keys()): pass
print('Stream Initialized')
- Извлеките данные из таблицы BigQuery, включая метки времени и аннотации, и создайте каталог для хранения захваченных изображений кадров:
from google.cloud import bigquery
import pandas as pd
client = bigquery.Client(project=PROJECT_ID)
query = f"""
SELECT MAX(ingestion_time) AS ts
FROM `{bq_table}`
"""
bq_max_ingest_ts_df = client.query(query).to_dataframe()
bq_max_ingest_epoch = str(int(bq_max_ingest_ts_df['ts'][0].timestamp()*1000000))
bq_max_ingest_ts = bq_max_ingest_ts_df['ts'][0]
print('Preparing to pull records with ingestion time >', bq_max_ingest_ts)
if not os.path.exists(bq_max_ingest_epoch):
os.makedirs(bq_max_ingest_epoch)
print('Saving output frames to', bq_max_ingest_epoch)
- Добавьте аннотации к кадрам, используя следующий код:
import json
import base64
import numpy as np
from IPython.display import Image, display, HTML, clear_output
im_width = stream.get(cv2.CAP_PROP_FRAME_WIDTH)
im_height = stream.get(cv2.CAP_PROP_FRAME_HEIGHT)
dashdelta = datetime.now()
framedata = {}
cntext = lambda x: {y['entity']['labelString']: y['count'] for y in x}
try:
while True:
try:
annotations_df = client.query(f'''
SELECT ingestion_time, annotation
FROM `{bq_table}`
WHERE ingestion_time > TIMESTAMP("{bq_max_ingest_ts}")
''').to_dataframe()
except ValueError as e:
continue
bq_max_ingest_ts = annotations_df['ingestion_time'].max()
for _, row in annotations_df.iterrows():
with frame_buffer_lock:
frame_ts = np.asarray(list(frame_buffer.keys()))
delta_ts = np.abs(frame_ts - (row['ingestion_time'].timestamp() * 1000))
delta_tx_idx = delta_ts.argmin()
closest_ts_delta = delta_ts[delta_tx_idx]
closest_ts = frame_ts[delta_tx_idx]
if closest_ts_delta > frame_buffer_error_milliseconds: continue
image = frame_buffer[closest_ts]
annotations = json.loads(row['annotation'])
for box in annotations['identifiedBoxes']:
image = cv2.rectangle(
image,
(
int(box['normalizedBoundingBox']['xmin']*im_width),
int(box['normalizedBoundingBox']['ymin']*im_height)
),
(
int((box['normalizedBoundingBox']['xmin'] + box['normalizedBoundingBox']['width'])*im_width),
int((box['normalizedBoundingBox']['ymin'] + box['normalizedBoundingBox']['height'])*im_height)
),
(255, 0, 0), 2
)
img_filename = f"{bq_max_ingest_epoch}/{row['ingestion_time'].timestamp() * 1000}.png"
cv2.imwrite(img_filename, image)
binimg = base64.b64encode(cv2.imencode('.jpg', image)[1]).decode()
curr_framedata = {
'path': img_filename,
'timestamp_error': closest_ts_delta,
'counts': {
**{
k['annotation']['displayName'] : cntext(k['counts'])
for k in annotations['stats']["activeZoneCounts"]
},
'full-frame': cntext(annotations['stats']["fullFrameCount"])
}
}
framedata[img_filename] = curr_framedata
if (datetime.now() - dashdelta).total_seconds() > dashboard_update_delay_seconds:
dashdelta = datetime.now()
clear_output()
display(HTML(f'''
<h1>Queue Monitoring Application</h1>
<p>Live Feed of the queue camera:</p>
<p><img alt="" src="{img_filename}" style="float: left;"/></a></p>
<table border="1" cellpadding="1" cellspacing="1" style="width: 500px;">
<caption>Current Model Outputs</caption>
<thead>
<tr><th scope="row">Metric</th><th scope="col">Value</th></tr>
</thead>
<tbody>
<tr><th scope="row">Serving Area People Count</th><td>{curr_framedata['counts']['serving-zone']['Person']}</td></tr>
<tr><th scope="row">Queueing Area People Count</th><td>{curr_framedata['counts']['queue-zone']['Person']}</td></tr>
<tr><th scope="row">Total Area People Count</th><td>{curr_framedata['counts']['full-frame']['Person']}</td></tr>
<tr><th scope="row">Timestamp Error</th><td>{curr_framedata['timestamp_error']}</td></tr>
</tbody>
</table>
<p> </p>
'''))
except KeyboardInterrupt:
print('Stopping Live Monitoring')

- Остановите задачу аннотирования с помощью кнопки «Стоп» в строке меню блокнота.

- Вы можете повторно просмотреть отдельные кадры, используя следующий код:
from IPython.html.widgets import Layout, interact, IntSlider
imgs = sorted(list(framedata.keys()))
def loadimg(frame):
display(framedata[imgs[frame]])
display(Image(open(framedata[imgs[frame]]['path'],'rb').read()))
interact(loadimg, frame=IntSlider(
description='Frame #:',
value=0,
min=0, max=len(imgs)-1, step=1,
layout=Layout(width='100%')))

11. Поздравляем!
Поздравляем, вы завершили лабораторную работу!
Уборка
Чтобы избежать списания средств с вашего аккаунта Google Cloud за ресурсы, используемые в этом руководстве, либо удалите проект, содержащий эти ресурсы, либо сохраните проект и удалите отдельные ресурсы.
Удалить проект
Удалить отдельные ресурсы
Ресурсы
https://cloud.google.com/vision-ai/docs/overview
https://cloud.google.com/vision-ai/docs/occupancy-count-tutorial