使用 Spanner 和 Vertex AI 进行相似性搜索

1. 简介

近年来,随着深度学习的飞速发展,我们能够以能够捕获语义含义的方式表示文本和其他数据。这催生了一种称为“向量搜索”的新搜索方法,该方法使用文本的向量表示(称为嵌入)来查找与用户查询最相关的文档。对于服装搜索等应用,矢量搜索优于传统搜索,在传统搜索中,用户通常会按商品的说明、款式或上下文来搜索商品,而不是按确切的商品或品牌名称。我们可以将 Cloud Spanner 数据库与矢量搜索集成,以执行向量相似度匹配。通过将 Spanner 和 Vector Search 结合使用,客户可以打造出强大的集成,将 Spanner 的可用性、可靠性和规模与 Vertex AI Vector Search 的高级相似度搜索功能融为一体。通过比较向量搜索索引中项的嵌入并返回最相似的匹配项来执行此搜索。

使用场景

假设您是一家时装零售商的数据科学家,希望跟上快速变化的趋势、商品搜索和推荐情况。挑战在于,您的资源和数据孤岛非常有限。本博文演示了如何针对服装数据使用相似度搜索方法实现服装推荐用例。涵盖以下步骤:

  1. 数据来源于 Spanner
  2. 使用 ML.PREDICT 为服装数据生成并存储在 Spanner 中的向量
  3. 使用 Dataflow 和工作流作业与 Vector Search 集成的 Spanner 矢量数据
  4. 执行向量搜索,以查找用户输入的相似度匹配项

我们将构建一个演示 Web 应用,以根据用户输入文本执行服装搜索。该应用允许用户通过输入文字说明来搜索服装。

Spanner 到矢量搜索索引:

服装搜索的数据存储在 Spanner 中。我们将直接从 Spanner 数据调用 ML.PREDICT 结构中的 Vertex AI Embeddings API。然后,我们将利用 Dataflow 和工作流作业,将这些数据(目录和嵌入)批量上传到 Vertex AI 的矢量搜索中并刷新索引。

对索引运行用户查询:

当用户输入服饰说明时,应用会使用 Text Embeddings API 实时生成嵌入。然后,系统会将这些信息作为输入发送到 Vector Search API,以便从索引中查找 10 条相关商品说明,并显示相应的图片。

架构概览

以下包含 2 个部分的示意图显示了 Spanner 矢量搜索应用的架构:

Spanner 到矢量搜索索引: a79932a25bee23a4.png

客户端应用对索引运行用户查询

b2b4d5a5715bd4c4.png要构建的内容

Spanner 到矢量索引:

  • Spanner 数据库,用于存储和管理源数据及相应嵌入
  • 将数据(ID 和嵌入)批量上传到 Vertex AI Vector Search 数据库的工作流作业。
  • 用于从索引中查找相关商品说明的 Vector Search API。

对索引运行用户查询:

  • 允许用户输入服装文本说明、使用部署的索引端点执行相似度搜索并将距离最近的服装返回到输入的 Web 应用。

运作原理

当用户输入服装的文字说明时,Web 应用会将该说明发送到 Vector Search API。然后,Vector Search API 使用嵌入的服饰说明从索引中查找最相关的商品说明。产品说明和相应的图片随后将显示给用户。一般工作流程如下:

  1. 为存储在 Spanner 中的数据生成嵌入
  2. 将嵌入导出并上传到矢量搜索索引中。
  3. 通过执行最近邻搜索来查询相似项的矢量搜索索引。

2. 要求

  • 一个浏览器,例如 ChromeFirefox
  • 启用了结算功能的 Google Cloud 项目

准备工作

  1. Google Cloud 控制台的项目选择器页面上,选择或创建一个 Google Cloud 项目
  2. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能
  3. 确保已启用所有必要的 API(Cloud Spanner、Vertex AI、Google Cloud Storage)
  4. 您将使用 Cloud Shell,这是一个在 Google Cloud 中运行的命令行环境,它预先加载了 gcloud。如需了解 gcloud 命令和用法,请参阅文档。如果项目未设置,请使用以下命令进行设置:
gcloud config set project <YOUR_PROJECT_ID>
  1. 导航到包含您的活跃 Google Cloud 项目的 Cloud Spanner 页面以开始

3. 后端:创建 Spanner 数据源和嵌入

在这个使用场景中,Spanner 数据库会存储包含相应图片和说明的服装商品目录。请务必为文本说明生成嵌入,并将其作为 ARRAY<float64> 存储在 Spanner 数据库中。

  1. 创建 Spanner 数据

创建一个名为“spanner-vertex”的实例和一个名为“spanner-vertex-embeddings”的数据库。使用 DDL 创建表:

CREATE TABLE
  apparels ( id NUMERIC,
    category STRING(100),
    sub_category STRING(50),
    uri STRING(200),
    content STRING(2000),
    embedding ARRAY<FLOAT64>
    )
PRIMARY KEY
  (id);
  1. 使用 INSERT SQL 将数据插入表中

此处提供了用于示例数据的插入脚本。

  1. 创建文本嵌入模型

这是必需的,这样我们才能为输入中的内容生成嵌入。以下是相同的 DDL:

CREATE MODEL text_embeddings INPUT(content STRING(MAX))
OUTPUT(
  embeddings
    STRUCT<
      statistics STRUCT<truncated BOOL, token_count FLOAT64>,
      values ARRAY<FLOAT64>>
)
REMOTE OPTIONS (
  endpoint = '//aiplatform.googleapis.com/projects/abis-345004/locations/us-central1/publishers/google/models/textembedding-gecko');
  1. 为源数据生成文本嵌入

创建一个表来存储嵌入并插入生成的嵌入。在真实的数据库应用中,直到第 2 步的数据加载到 Spanner 都是事务性的。为了保持设计最佳实践的完整性,我倾向于使事务表保持规范化,因此为嵌入创建一个单独的表。

CREATE TABLE apparels_embeddings (id string(100), embedding ARRAY<FLOAT64>)
PRIMARY KEY (id);

INSERT INTO apparels_embeddings(id, embeddings) 
SELECT CAST(id as string), embeddings.values
FROM ML.PREDICT(
  MODEL text_embeddings,
  (SELECT id, content from apparels)
) ;

现在批量内容和嵌入已准备就绪,让我们创建一个向量搜索索引和端点来存储有助于执行向量搜索的嵌入。

4. 工作流作业:将 Spanner 数据导出到矢量搜索

  1. 创建 Cloud Storage 存储分区

必须执行此操作,才能以 Vector Search 预计会作为输入的 JSON 格式将来自 Spanner 的嵌入存储在 GCS 存储分区中。在 Spanner 中的数据所在的区域中创建存储分区。根据需要在内部创建一个文件夹,但主要在其中创建一个名为 empty.json 的空文件。

  1. 设置 Cloud Workflow

如需设置从 Spanner 到 Vertex AI Vector Search 索引的批量导出,请执行以下操作:

创建空索引

确保矢量搜索索引与 Cloud Storage 存储分区和数据位于同一区域。按照“管理索引”页面中为批量更新创建索引部分,按照控制台标签页下的说明完成 11 个步骤。在传递给 contentDeltaUri 的文件夹中,创建一个名为 empty.json 的空文件,因为如果没有此文件,您将无法创建索引。这将创建一个空索引。

如果您已有索引,则可以跳过此步骤。工作流将覆盖您的索引。

注意:您无法将空索引部署到端点。因此,在将矢量数据导出到 Cloud Storage 后,我们将将其部署到端点的步骤推迟到后续步骤。

克隆此 Git 代码库:克隆 Git 代码库有多种方法,一种方法是使用 GitHub CLI 运行以下命令。从 Cloud Shell 终端运行以下 2 条命令:

gh repo clone cloudspannerecosystem/spanner-ai

cd spanner-ai/vertex-vector-search/workflows

此文件夹包含两个文件

  • batch-export.yaml:这是工作流定义。
  • sample-batch-input.json:这是工作流输入参数的示例。

通过示例文件设置 input.json:首先,复制示例 json。

cp sample-batch-input.json input.json

然后,使用项目详细信息修改 input.json。在本例中,您的 JSON 应如下所示:

{
  "project_id": "<<YOUR_PROJECT>>",
  "location": "<<us-central1>>",
  "dataflow": {
    "temp_location": "gs://<<YOUR_BUCKET>>/<<FOLDER_IF_ANY>>/workflow_temp"
  },
  "gcs": {
    "output_folder": "gs://<<YOUR_BUCKET>>/<<FOLDER_IF_ANY>>/workflow_output"
  },
  "spanner": {
    "instance_id": "spanner-vertex",
    "database_id": "spanner-vertex-embeddings",
    "table_name": "apparels_embeddings",
    "columns_to_export": "embedding,id"
  },
  "vertex": {
    "vector_search_index_id": "<<YOUR_INDEX_ID>>"
  }
}

设置权限

对于生产环境,我们强烈建议您创建新的服务账号,并向其授予一个或多个 IAM 角色(这些角色包含管理服务所需的最低权限)。设置工作流以将数据从 Spanner(嵌入)导出到矢量搜索索引需要以下角色:

Cloud Workflow 服务账号

默认使用 Compute Engine 默认服务账号

如果您使用手动配置的服务账号,则必须添加以下角色:

如需触发 Dataflow 作业,请执行以下操作:Dataflow 管理员、Dataflow 工作器。

如需模拟 Dataflow 工作器服务账号,请执行以下操作:Service Account User

如需写入日志,请执行以下操作:Logs Writer

如需触发 Vertex AI Vector Search 重新构建,请执行以下操作:Vertex AI User

Dataflow 工作器服务账号

如果您使用手动配置的服务账号,则必须添加以下角色:

如需管理 Dataflow,请执行以下操作:Dataflow 管理员Dataflow 工作器。如需从 Spanner 读取数据:Cloud Spanner Database Reader。拥有对所选 GCS Container Registry 的写入权限:GCS Storage Bucket Owner

  1. 部署 Cloud Workflow

将工作流 yaml 文件部署到您的 Google Cloud 项目。您可以配置在执行工作流时将在其中运行工作流的区域或位置。

gcloud workflows deploy vector-export-workflow --source=batch-export.yaml --location="us-central1" [--service account=<service_account>]

or 

gcloud workflows deploy vector-export-workflow --source=batch-export.yaml --location="us-central1"

该工作流现在应该会显示在 Google Cloud 控制台的“工作流”页面上。

注意:您还可以通过 Google Cloud 控制台创建和部署工作流。按照 Cloud 控制台中的提示操作。对于工作流定义,请复制并粘贴 batch-export.yaml 的内容。

此操作完成后,请执行工作流,以便开始导出数据。

  1. 执行 Cloud 工作流

运行以下命令以执行工作流:

gcloud workflows execute vector-export-workflow --data="$(cat input.json)"

该执行作业应显示在 Workflows 的“执行”标签页中。这应该会将您的数据加载到矢量搜索数据库中,并将其编入索引。

注意:您也可以使用“执行”按钮从控制台执行。按照提示操作,对于输入内容,请复制并粘贴您自定义的 input.json 的内容。

5. 部署矢量搜索索引

将索引部署到端点

您可以按照以下步骤部署索引:

  1. Vector Search indexes 页面上,您应该会在上一节的第 2 步中创建的索引旁边看到一个“部署”按钮。或者,您也可以前往索引信息页,然后点击“部署至端点”按钮。
  2. 提供必要信息并将索引部署到端点。

或者,您可以查看此笔记本,以将其部署到端点(跳至此笔记本的部署部分)。部署后,记下已部署的索引 ID 和端点网址。

6. 前端:从用户数据到矢量搜索

我们来构建一个简单的 Python 应用,并提供由 gradio 提供支持的用户体验,以便快速测试我们的实现:您可以参阅此处的实现,以在自己的 Colab 笔记本中实现此演示版应用。

  1. 我们将使用 aiplatform python SDK 调用 Embeddings API,以及调用矢量搜索索引端点。
# [START aiplatform_sdk_embedding]
!pip install google-cloud-aiplatform==1.35.0 --upgrade --quiet --user


import vertexai
vertexai.init(project=PROJECT_ID, location="us-central1")


from vertexai.language_models import TextEmbeddingModel


import sys
if "google.colab" in sys.modules:
    # Define project information
    PROJECT_ID = " "  # Your project id
    LOCATION = " "  # Your location 


    # Authenticate user to Google Cloud
    from google.colab import auth
    auth.authenticate_user()
  1. 我们将使用 gradio 演示我们通过界面快速而轻松地构建的 AI 应用。请先重启运行时,然后再实现此步骤。
!pip install gradio
import gradio as gr
  1. 在用户输入内容时从 Web 应用中调用 Embeddings API,我们将使用文本嵌入模型:textembedding-gecko@latest

以下方法会调用文本嵌入模型,并返回用户输入的文本的向量嵌入:

def text_embedding(content) -> list:
    """Text embedding with a Large Language Model."""
    model = TextEmbeddingModel.from_pretrained("textembedding-gecko@latest")
    embeddings = model.get_embeddings(content)
    for embedding in embeddings:
        vector = embedding.values
        #print(f"Length of Embedding Vector: {len(vector)}")
    return vector

测试应用

text_embedding("red shorts for girls")

您应该会看到如下所示的输出(请注意,图片的高度会被剪裁,因此您无法看到完整的矢量响应):

5d8355ec04dac1f9

  1. 声明已部署的索引 ID 和端点 ID
from google.cloud import aiplatform
DEPLOYED_INDEX_ID = "spanner_vector1_1702366982123"
#Vector Search Endpoint
index_endpoint = aiplatform.MatchingEngineIndexEndpoint('projects/273845608377/locations/us-central1/indexEndpoints/2021628049526620160')
  1. 定义向量搜索方法,以调用索引端点,并显示对应于用户输入文本的嵌入响应的 10 个最接近的匹配项。

在下面的矢量搜索方法定义中,请注意,系统会调用 find_neighbors 方法来识别 10 个最近的矢量。

def vector_search(content) -> list:
  result = text_embedding(content)
  #call_vector_search_api(content)
  index_endpoint = aiplatform.MatchingEngineIndexEndpoint('projects/273845608377/locations/us-central1/indexEndpoints/2021628049526620160')
  # run query
  response = index_endpoint.find_neighbors(
      deployed_index_id = DEPLOYED_INDEX_ID,
      queries = [result],
      num_neighbors = 10
  )
  out = []
  # show the results
  for idx, neighbor in enumerate(response[0]):
      print(f"{neighbor.distance:.2f} {spanner_read_data(neighbor.id)}")
      out.append(f"{spanner_read_data(neighbor.id)}")
  return out

您还会看到对 spanner_read_data 方法的调用。我们将在下一步中对此进行探讨。

  1. 定义 Spanner 读取数据方法实现,该实现将调用 execute_sql 方法,以提取与上一步返回的最近邻向量的 ID 对应的图片。
!pip install google-cloud-spanner==3.36.0


from google.cloud import spanner


instance_id = "spanner-vertex"
database_id = "spanner-vertex-embeddings"
projectId = PROJECT_ID
client = spanner.Client()
client.project = projectId
instance = client.instance(instance_id)
database = instance.database(database_id)
def spanner_read_data(id):
    query = "SELECT uri FROM apparels where id = " + id
    outputs = []
    with database.snapshot() as snapshot:
        results = snapshot.execute_sql(query)


        for row in results:
            #print(row)
            #output = "ID: {}, CONTENT: {}, URI: {}".format(*row)
            output = "{}".format(*row)
            outputs.append(output)


    return "\n".join(outputs)

它应该返回与所选矢量对应的图片的网址。

  1. 最后,我们将这几部分放在界面中并触发矢量搜索流程
from PIL import Image


def call_search(query):
  response = vector_search(query)
  return response


input_text = gr.Textbox(label="Enter your query. Examples: Girls Tops White Casual, Green t-shirt girls, jeans shorts, denim skirt etc.")
output_texts = [gr.Image(label="") for i in range(10)]
demo = gr.Interface(fn=call_search, inputs=input_text, outputs=output_texts, live=True)
resp = demo.launch(share = True)

您应该会看到如下所示的结果:

8093b39fbab1a9cc

图片: 链接

请点击此处观看结果视频。

7. 清理

为避免系统因本博文中使用的资源向您的 Google Cloud 账号收取费用,请按以下步骤操作:

  1. 在 Google Cloud 控制台中,前往管理资源页面。
  2. 在项目列表中,选择要删除的项目,然后点击“删除”。
  3. 在对话框中输入项目 ID,然后点击“关停”以删除项目。
  4. 如果您不想删除项目,请删除 Spanner 实例,方法是导航到您刚刚为此项目创建的实例,然后点击实例概览页面右上角的“删除实例”按钮。
  5. 您还可以导航到 Vector Search 索引,取消部署端点和索引,并删除索引。

8. 总结

恭喜!您已成功完成 Spanner - Vertex Vector Search 实现,

  1. 为来自 Spanner 数据库的应用创建 Spanner 数据源和嵌入。
  2. 正在创建 Vector Search 数据库索引。
  3. 使用 Dataflow 和工作流作业将 Spanner 中的矢量数据集成到矢量搜索。
  4. 将索引部署到端点。
  5. 最后,在由 Python 提供支持的 Vertex AI SDK 实现中,对用户输入调用矢量搜索。

您可以随意将实现扩展到您自己的用例,也可以使用新功能即刻改进当前用例。如需详细了解 Spanner 的机器学习功能,请点击此处