1. 目标
概览
此 Codelab 将重点介绍如何端到端创建 Vertex AI Vision 应用,以使用零售视频素材监控排队人数 。我们将使用预训练的专用模型 Occupancy analytics 的内置功能来捕获以下内容:
- 统计排队人数。
- 统计柜台接受服务的人数。
学习内容
- 如何在 Vertex AI Vision 中创建应用并进行部署
- 如何使用视频文件设置 RTSP 流,以及如何使用 Jupyter 笔记本中的 vaictl 将该流注入到 Vertex AI Vision 中。
- 如何使用 Occupancy Analytics 模型及其不同功能。
- 如何在存储 Vertex AI Vision 的媒体仓库中搜索视频。
- 如何将输出连接到 BigQuery,编写 SQL 查询以从模型的 JSON 输出中提取分析洞见,并使用该输出为原始视频添加标签和注解。
费用:
在 Google Cloud 上运行此实验的总费用约为 2 美元。
2. 准备工作
创建项目并启用 API:
- 在 Google Cloud 控制台的“项目选择器”页面上,选择或 创建 Google Cloud 项目。注意:如果您不打算保留在此过程中创建的资源,请创建新的项目,而不要选择现有的项目。完成本教程介绍的步骤后,您可以删除所创建的项目,这会移除与该项目关联的所有资源。前往项目选择器
- 确保您的云项目已启用结算功能。了解如何 检查项目是否已启用结算功能。
- 启用 Compute Engine、Vertex API、Notebook API 和 Vision AI API。启用 API
创建服务账号:
- 在 Google Cloud 控制台中,转到创建服务账号 页面。前往“创建服务账号”
- 选择您的项目。
- 在服务账号名称 字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务账号 ID 字段。在服务账号说明 字段中,输入说明。例如,快速入门的服务账号。
- 点击创建并继续 。
- 如需提供对项目的访问权限,请向服务账号授予以下角色:
- Vision AI > Vision AI Editor
- Compute Engine > Compute Instance Admin(Beta 版)
- BigQuery > BigQuery Admin 。
在选择角色 列表中,选择一个角色。如需添加其他角色,请点击添加其他角色 ,然后添加其他各个角色。
- 点击继续 。
- 点击完成 以完成服务账号的创建过程。不要关闭浏览器窗口。您将在下一步骤中用到它。
3. 设置 Jupyter 笔记本
在 Occupancy Analytics 中创建应用之前,您必须注册一个视频流,该视频流稍后可供应用使用。
在本教程中,您将创建一个托管视频的 Jupyter 笔记本实例,并从该笔记本发送流式视频数据。我们之所以使用 Jupyter 笔记本,是因为它让我们能够灵活地执行 shell 命令,以及在同一个位置运行自定义预处理/后处理代码,这非常适合快速实验。我们将使用此笔记本来:
- 将 rtsp 服务器作为后台进程运行
- 将 vaictl 命令作为后台进程运行
- 运行查询和处理代码以分析 Occupancy Analytics 输出
创建 Jupyter 笔记本
从 Jupyter 笔记本实例发送视频的第一步是使用我们在上一步中创建的服务账号创建笔记本。
- 在控制台中,前往 Vertex AI 页面。前往 Vertex AI Workbench
- 点击“用户管理的笔记本”

- 依次点击新建笔记本 > TensorFlow 企业版 2.6(带 LTS)> 不带 GPU

- 输入 Jupyter 笔记本的名称。如需了解详情,请参阅 资源命名惯例。

- 点击高级选项
- 向下滚动到权限部分
- 取消选中使用 Compute Engine 默认服务账号 选项
- 添加在上一步中创建的服务账号电子邮件地址。然后点击创建 。

- 创建实例后,点击打开 JUPYTERLAB 。
4. 设置笔记本以流式传输视频
在 Occupancy Analytics 中创建应用之前,您必须注册一个视频流,该视频流稍后可供应用使用。
在本教程中,我们将使用 Jupyter 笔记本实例来托管视频,并从笔记本终端发送流式视频数据。
下载 vaictl 命令行工具
- 在打开的 JupyterLab 实例中,从启动器打开笔记本 。

- 使用笔记本单元中的以下命令下载 Vertex AI Vision (vaictl) 命令行工具、rtsp 服务器命令行工具、open-cv 工具:
!wget -q https://github.com/aler9/rtsp-simple-server/releases/download/v0.20.4/rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!wget -q https://github.com/google/visionai/releases/download/v0.0.4/visionai_0.0-4_amd64.deb
!tar -xf rtsp-simple-server_v0.20.4_linux_amd64.tar.gz
!pip install opencv-python --quiet
!sudo apt-get -qq remove -y visionai
!sudo apt-get -qq install -y ./visionai_0.0-4_amd64.deb
!sudo apt-get -qq install -y ffmpeg
5. 注入视频文件以进行流式传输
使用所需的命令行工具设置笔记本环境后,您可以复制示例视频文件,然后使用 vaictl 将视频数据流式传输到 Occupancy Analytics 应用。
注册新的视频流
- 点击 Vertex AI Vision 左侧面板中的“视频流”标签页。
- 点击顶部的“注册”按钮

- 在“视频流名称”中,输入 ‘queue-stream'
- 在“区域”中,选择在上一步创建笔记本时选择的同一区域。
- 点击注册
将示例视频复制到虚拟机
- 在笔记本中,使用以下 wget 命令复制示例视频。
!wget -q https://github.com/vagrantism/interesting-datasets/raw/main/video/collective_activity/seq25_h264.mp4
从虚拟机流式传输视频并将数据注入到视频流中
- 如需将此本地视频文件发送到应用输入流,请在笔记本单元中使用以下命令。您必须进行以下变量替换:
- PROJECT_ID:您的 Google Cloud 项目 ID。
- LOCATION:您的位置 ID。例如 us-central1。如需了解详情,请参阅 Cloud 位置。
- LOCAL_FILE:本地视频文件的文件名。例如,
seq25_h264.mp4。
PROJECT_ID='<Your Google Cloud project ID>'
LOCATION='<Your stream location>'
LOCAL_FILE='seq25_h264.mp4'
STREAM_NAME='queue-stream'
- 启动 rtsp-simple-server,我们将在其中使用 rtsp 协议流式传输视频文件
import os
import time
import subprocess
subprocess.Popen(["nohup", "./rtsp-simple-server"], stdout=open('rtsp_out.log', 'a'), stderr=open('rtsp_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
- 使用 ffmpeg 命令行工具在 rtsp 流中循环播放视频
subprocess.Popen(["nohup", "ffmpeg", "-re", "-stream_loop", "-1", "-i", LOCAL_FILE, "-c", "copy", "-f", "rtsp", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('ffmpeg_out.log', 'a'), stderr=open('ffmpeg_err.log', 'a'), preexec_fn=os.setpgrp)
time.sleep(5)
- 使用 vaictl 命令行工具将视频从 rtsp 服务器 URI 流式传输到我们在上一步中创建的 Vertex AI Vision 视频流“queue-stream”。
subprocess.Popen(["nohup", "vaictl", "-p", PROJECT_ID, "-l", LOCATION, "-c", "application-cluster-0", "--service-endpoint", "visionai.googleapis.com", "send", "rtsp", "to", "streams", "queue-stream", "--rtsp-uri", f"rtsp://localhost:8554/{LOCAL_FILE.split('.')[0]}"], stdout=open('vaictl_out.log', 'a'), stderr=open('vaictl_err.log', 'a'), preexec_fn=os.setpgrp)
从启动 vaictl 注入操作到视频显示在信息中心内,可能需要大约 100 秒。
视频流注入可用后,您可以通过选择 queue-stream 视频流,在 Vertex AI Vision 信息中心的视频流 标签页中查看视频 feed。

6. 创建应用
第一步是创建一个处理数据的应用。应用可以视为连接以下内容的自动化流水线:
- 数据注入:将视频 feed 注入到视频流中。
- 数据分析:注入后,可以添加 AI(计算机视觉)模型。
- 数据存储:视频 feed 的两个版本(原始视频流和 AI 模型处理的视频流)可以存储在媒体仓库中。
在 Google Cloud 控制台中,应用以图表的形式表示。
创建空应用
在填充应用图表之前,您必须先创建一个空应用。
在 Google Cloud 控制台中创建应用。
- 前往 Google Cloud 控制台。
- 打开 Vertex AI Vision 信息中心的应用 标签页。 前往“应用”标签页
- 点击 Create (创建)按钮。

- 输入“queue-app'”作为应用名称,然后选择您的区域。
- 点击创建 。
添加应用组件节点
创建空应用后,您可以向应用图表添加三个节点:
- 注入节点:注入从您在笔记本中创建的 rtsp 视频服务器发送的数据的视频流资源。
- 处理节点:对注入的数据执行操作的 Occupancy Analytics 模型。
- 存储节点:存储已处理视频并充当元数据存储区的媒体仓库。元数据存储区包括有关注入的视频数据的数据分析信息,以及 AI 模型推断的信息。
在控制台中向应用添加组件节点。
- 打开 Vertex AI Vision 信息中心的应用 标签页。前往“应用”标签页
系统会将您转至处理流水线的图表可视化界面。
添加数据注入节点
- 如需添加输入流节点,请选择侧边菜单的连接器 部分中的视频流 选项。
- 在打开的信息流 菜单的来源 部分中,选择添加信息流 。
- 在添加视频流 菜单中,选择 queue-stream 。
- 如需将视频流添加到应用图表,请点击添加视频流 。
添加数据处理节点
- 如需添加人数统计模型节点,请选择侧边菜单的专用模型 部分中的 Occupancy Analytics 选项。
- 保留默认选择的人员 。如果已选择车辆 ,请取消选中。

- 在“高级选项”部分中,点击创建活跃区域/线条

- 使用多边形工具绘制活跃区域,以统计该区域内的人数。相应地为该区域添加标签

- 点击顶部的后退箭头。

- 点击复选框,添加停留时间设置以检测拥塞情况。

添加数据存储节点
- 如需添加输出目标位置(存储)节点,请选择侧边菜单的连接器 部分中的 Vision AI 仓库 选项。
- 点击 Vertex AI 仓库 连接器以打开其菜单,然后点击连接仓库 。
- 在连接仓库 菜单中,选择创建新仓库 。将仓库命名为 queue-warehouse ,并将 TTL 时长保留 14 天。
- 点击创建 按钮以添加仓库。
7. 将输出连接到 BigQuery 表
当您向 Vertex AI Vision 应用添加 BigQuery 连接器时,所有关联的应用模型输出都将注入到目标表中。
您可以创建自己的 BigQuery 表,并在向应用添加 BigQuery 连接器时指定该表,也可以让 Vertex AI Vision 应用平台自动创建该表。
自动创建表
如果您让 Vertex AI Vision 应用平台自动创建表,则可以在添加 BigQuery 连接器节点时指定此选项。
如果您想使用自动创建表,则适用以下数据集和表条件:
- 数据集:自动创建的数据集名称为 visionai_dataset。
- 表:自动创建的表名称为 visionai_dataset.APPLICATION_ID。
- 错误处理:
- 如果存在同一数据集下具有相同名称的表,则不会自动创建。
- 打开 Vertex AI Vision 信息中心的应用 标签页。前往“应用”标签页
- 从列表中选择应用名称旁边的查看应用 。
- 在应用构建器页面上,从连接器 部分中选择 BigQuery 。
- 将 BigQuery 路径 字段留空。

- 在存储元数据来源: 中,仅选择 ‘Occupancy Analytics' 并取消选中视频流。
最终的应用图表应如下所示:

8. 部署应用以供使用
使用所有必要组件构建端到端应用后,使用该应用的最后一步是部署该应用。
- 打开 Vertex AI Vision 信息中心的应用 标签页。前往“应用”标签页
- 在列表中选择 queue-app 应用旁边的查看应用 。
- 在 Studio 页面中,点击 Deploy (部署)按钮。
- 在以下确认对话框中,点击 Deploy (部署)。部署操作可能需要几分钟才能完成。部署完成后,节点旁边会显示绿色对勾标记。

9. 在存储仓库中搜索视频内容
将视频数据注入到处理应用后,您可以查看分析的视频数据,并根据 Occupancy Analytics 信息搜索数据。
- 打开 Vertex AI Vision 信息中心的仓库 标签页。前往“仓库”标签页
- 在列表中找到 queue-warehouse 仓库,然后点击查看资产 。
- 在人数统计 部分中,将最小值 设置为 1,并将最大值 设置为 5。
- 如需过滤存储在 Vertex AI Vision 的媒体仓库中的已处理视频数据,请点击搜索 。

在 Google Cloud 控制台中查看符合搜索条件的存储视频数据。
10. 使用 BigQuery 表为输出添加注解并进行分析
- 在笔记本中,在单元中初始化以下变量。
DATASET_ID='vision_ai_dataset'
bq_table=f'{PROJECT_ID}.{DATASET_ID}.queue-app'
frame_buffer_size=10000
frame_buffer_error_milliseconds=5
dashboard_update_delay_seconds=3
rtsp_url='rtsp://localhost:8554/seq25_h264'
- 现在,我们将使用以下代码从 rtsp 流中捕获帧:
import cv2
import threading
from collections import OrderedDict
from datetime import datetime, timezone
frame_buffer = OrderedDict()
frame_buffer_lock = threading.Lock()
stream = cv2.VideoCapture(rtsp_url)
def read_frames(stream):
global frames
while True:
ret, frame = stream.read()
frame_ts = datetime.now(timezone.utc).timestamp() * 1000
if ret:
with frame_buffer_lock:
while len(frame_buffer) >= frame_buffer_size:
_ = frame_buffer.popitem(last=False)
frame_buffer[frame_ts] = frame
frame_buffer_thread = threading.Thread(target=read_frames, args=(stream,))
frame_buffer_thread.start()
print('Waiting for stream initialization')
while not list(frame_buffer.keys()): pass
print('Stream Initialized')
- 从 BigQuery 表中提取数据时间戳和注解信息,并创建一个目录来存储捕获的帧图片:
from google.cloud import bigquery
import pandas as pd
client = bigquery.Client(project=PROJECT_ID)
query = f"""
SELECT MAX(ingestion_time) AS ts
FROM `{bq_table}`
"""
bq_max_ingest_ts_df = client.query(query).to_dataframe()
bq_max_ingest_epoch = str(int(bq_max_ingest_ts_df['ts'][0].timestamp()*1000000))
bq_max_ingest_ts = bq_max_ingest_ts_df['ts'][0]
print('Preparing to pull records with ingestion time >', bq_max_ingest_ts)
if not os.path.exists(bq_max_ingest_epoch):
os.makedirs(bq_max_ingest_epoch)
print('Saving output frames to', bq_max_ingest_epoch)
- 使用以下代码为帧添加注解:
import json
import base64
import numpy as np
from IPython.display import Image, display, HTML, clear_output
im_width = stream.get(cv2.CAP_PROP_FRAME_WIDTH)
im_height = stream.get(cv2.CAP_PROP_FRAME_HEIGHT)
dashdelta = datetime.now()
framedata = {}
cntext = lambda x: {y['entity']['labelString']: y['count'] for y in x}
try:
while True:
try:
annotations_df = client.query(f'''
SELECT ingestion_time, annotation
FROM `{bq_table}`
WHERE ingestion_time > TIMESTAMP("{bq_max_ingest_ts}")
''').to_dataframe()
except ValueError as e:
continue
bq_max_ingest_ts = annotations_df['ingestion_time'].max()
for _, row in annotations_df.iterrows():
with frame_buffer_lock:
frame_ts = np.asarray(list(frame_buffer.keys()))
delta_ts = np.abs(frame_ts - (row['ingestion_time'].timestamp() * 1000))
delta_tx_idx = delta_ts.argmin()
closest_ts_delta = delta_ts[delta_tx_idx]
closest_ts = frame_ts[delta_tx_idx]
if closest_ts_delta > frame_buffer_error_milliseconds: continue
image = frame_buffer[closest_ts]
annotations = json.loads(row['annotation'])
for box in annotations['identifiedBoxes']:
image = cv2.rectangle(
image,
(
int(box['normalizedBoundingBox']['xmin']*im_width),
int(box['normalizedBoundingBox']['ymin']*im_height)
),
(
int((box['normalizedBoundingBox']['xmin'] + box['normalizedBoundingBox']['width'])*im_width),
int((box['normalizedBoundingBox']['ymin'] + box['normalizedBoundingBox']['height'])*im_height)
),
(255, 0, 0), 2
)
img_filename = f"{bq_max_ingest_epoch}/{row['ingestion_time'].timestamp() * 1000}.png"
cv2.imwrite(img_filename, image)
binimg = base64.b64encode(cv2.imencode('.jpg', image)[1]).decode()
curr_framedata = {
'path': img_filename,
'timestamp_error': closest_ts_delta,
'counts': {
**{
k['annotation']['displayName'] : cntext(k['counts'])
for k in annotations['stats']["activeZoneCounts"]
},
'full-frame': cntext(annotations['stats']["fullFrameCount"])
}
}
framedata[img_filename] = curr_framedata
if (datetime.now() - dashdelta).total_seconds() > dashboard_update_delay_seconds:
dashdelta = datetime.now()
clear_output()
display(HTML(f'''
<h1>Queue Monitoring Application</h1>
<p>Live Feed of the queue camera:</p>
<p><img alt="" src="{img_filename}" style="float: left;"/></a></p>
<table border="1" cellpadding="1" cellspacing="1" style="width: 500px;">
<caption>Current Model Outputs</caption>
<thead>
<tr><th scope="row">Metric</th><th scope="col">Value</th></tr>
</thead>
<tbody>
<tr><th scope="row">Serving Area People Count</th><td>{curr_framedata['counts']['serving-zone']['Person']}</td></tr>
<tr><th scope="row">Queueing Area People Count</th><td>{curr_framedata['counts']['queue-zone']['Person']}</td></tr>
<tr><th scope="row">Total Area People Count</th><td>{curr_framedata['counts']['full-frame']['Person']}</td></tr>
<tr><th scope="row">Timestamp Error</th><td>{curr_framedata['timestamp_error']}</td></tr>
</tbody>
</table>
<p> </p>
'''))
except KeyboardInterrupt:
print('Stopping Live Monitoring')

- 使用笔记本菜单栏中的 Stop (停止)按钮停止注解任务

- 您可以使用以下代码重新访问各个帧:
from IPython.html.widgets import Layout, interact, IntSlider
imgs = sorted(list(framedata.keys()))
def loadimg(frame):
display(framedata[imgs[frame]])
display(Image(open(framedata[imgs[frame]]['path'],'rb').read()))
interact(loadimg, frame=IntSlider(
description='Frame #:',
value=0,
min=0, max=len(imgs)-1, step=1,
layout=Layout(width='100%')))

11. 恭喜
恭喜!您已完成此实验!
清理
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。
删除项目
删除各个资源
资源
https://cloud.google.com/vision-ai/docs/overview
https://cloud.google.com/vision-ai/docs/occupancy-count-tutorial