高级 RAG 技术

1. 简介

概览

检索增强生成 (RAG) 通过将大语言模型 (LLM) 的回答建立在外部知识的基础之上,增强了 LLM 的回答。但是,构建可用于生产用途的 RAG 系统不仅需要简单的向量搜索。 您还必须优化数据的提取方式、相关结果的排名方式以及用户查询的处理方式。

在这个全面的实验中,您将使用 Cloud SQL for PostgreSQL(通过 pgvector 扩展)和 Vertex AI 构建强大的 RAG 应用。您将学习三种高级技术:

  1. 分块策略: 您将观察不同的文本拆分方法(字符、递归、令牌)如何影响检索质量。
  2. 重新排名: 您将实现 Vertex AI Reranker 来优化搜索结果并解决“中间丢失”问题。
  3. 查询转换: 您将使用 Gemini 通过 HyDE (假设文档嵌入)和回溯提示 等技术来优化用户查询。

您将执行的操作

  • 设置具有 pgvector 的 Cloud SQL for PostgreSQL 实例。
  • 构建数据注入流水线,该流水线使用多种策略对文本进行分块,并将嵌入存储在 Cloud SQL 中。
  • 执行语义搜索,并比较不同分块方法的结果质量。
  • 集成 Reranker 以根据相关性对检索到的文档重新排序。
  • 实现由 LLM 提供支持的查询转换,以改进对模糊或复杂问题的检索。

学习内容

  • 如何将 LangChainVertex AICloud SQL 搭配使用。
  • 字符递归令牌文本拆分器的影响。
  • 如何在 PostgreSQL 中实现向量搜索
  • 如何使用 ContextualCompressionRetriever 进行重新排名。
  • 如何实现 HyDE回溯提示

2. 项目设置

Google 账号

如果您还没有个人 Google 账号,则必须创建一个 Google 账号

请使用个人账号 ,而不是工作账号或学校账号。

登录 Google Cloud 控制台

使用个人 Google 账号登录 Google Cloud 控制台

启用结算功能

兑换 Google Cloud 赠金(可选)

如需运行此讲习班,您需要一个具有一定赠金的结算账号。使用此 Codelab 顶部横幅中的赠金开始。如果您已关联到结算账号,则可以跳过此步骤。

设置个人结算账号

如果您使用 Google Cloud 赠金设置结算功能,则可以跳过此步骤。

如需设置个人结算账号,请前往此处在 Cloud 控制台中启用结算功能

一些注意事项:

  • 完成此实验的 Cloud 资源费用应低于 1 美元。
  • 您可以按照本实验末尾的步骤删除资源,以避免产生进一步的费用。
  • 新用户有资格参与 300 美元免费试用计划

创建项目(可选)

如果您没有要用于此实验的当前项目,请在此处创建一个新项目

3. 打开 Cloud Shell Editor

  1. 点击此链接可直接前往 Cloud Shell Editor
  2. 如果系统在今天的任何时间提示您授权,请点击授权 以继续。点击以授权 Cloud Shell
  3. 如果终端未显示在屏幕底部,请打开它:
    • 点击查看
    • 点击终端在 Cloud Shell 编辑器中打开新终端
  4. 在终端中,使用以下命令设置项目:
    gcloud config set project [PROJECT_ID]
    
    • 示例:
      gcloud config set project lab-project-id-example
      
    • 如果您不记得项目 ID,可以使用以下命令列出所有项目 ID:
      gcloud projects list
      
      在 Cloud Shell 编辑器终端中设置项目 ID
  5. 您应会看到以下消息:
    Updated property [core/project].
    

4. 启用 API

如需构建此解决方案,您需要为 Vertex AI、Cloud SQL 和重新排名服务启用多个 Google Cloud API。

  1. 在终端中,启用以下 API:
    gcloud services enable \
      aiplatform.googleapis.com \
      sqladmin.googleapis.com \
      cloudresourcemanager.googleapis.com \
      serviceusage.googleapis.com \
      discoveryengine.googleapis.com
    
    
    

API 简介

  • Vertex AI API (aiplatform.googleapis.com):支持使用 Gemini 进行生成,并使用 Vertex AI Embeddings 对文本进行向量化。
  • Cloud SQL Admin API (sqladmin.googleapis.com):允许您以编程方式管理 Cloud SQL 实例。
  • Discovery Engine API (discoveryengine.googleapis.com):为 Vertex AI Reranker 功能提供支持。
  • Service Usage API (serviceusage.googleapis.com):用于检查和管理服务配额。

5. 创建虚拟环境并安装依赖项

在开始任何 Python 项目之前,最好创建一个虚拟环境。这样可以隔离项目的依赖项,防止与其他项目或系统的全局 Python 软件包发生冲突。

  1. 创建一个名为 rag-labs 的文件夹,然后进入该文件夹。在终端 中运行以下代码:
    mkdir rag-labs && cd rag-labs
    
  2. 创建并激活虚拟环境:
    uv venv --python 3.12
    source .venv/bin/activate
    
  3. 创建一个包含必要依赖项的 requirements.txt 文件。在终端 中运行以下代码:
    cloudshell edit requirements.txt
    
  4. 将以下优化 后的依赖项粘贴到 requirements.txt 中。这些版本已固定,以避免冲突并加快安装速度。
    # Core LangChain & AI
    langchain-community==0.3.31
    langchain-google-vertexai==2.1.2
    langchain-google-community[vertexaisearch]==2.0.10
    
    # Google Cloud
    google-cloud-storage==2.19.0
    google-cloud-aiplatform[langchain]==1.130.0
    
    # Database
    cloud-sql-python-connector[pg8000]==1.19.0
    sqlalchemy==2.0.45
    pgvector==0.4.2
    
    # Utilities
    tiktoken==0.12.0
    python-dotenv==1.2.1
    requests==2.32.5
    
  5. 安装依赖项:
    uv pip install -r requirements.txt
    

6. 设置 Cloud SQL for PostgreSQL

在此任务中,您将预配 Cloud SQL for PostgreSQL 实例,创建数据库,并为向量搜索做好准备。

定义 Cloud SQL 配置

  1. 创建一个 .env 文件来存储配置。在终端 中运行以下代码:
    cloudshell edit .env
    
  2. 将以下配置粘贴到 .env 中。
    # Project Config
    PROJECT_ID="[YOUR_PROJECT_ID]"
    REGION="us-central1"
    
    # Database Config
    SQL_INSTANCE_NAME="rag-pg-instance-1"
    SQL_DATABASE_NAME="rag_harry_potter_db"
    SQL_USER="rag_user"
    SQL_PASSWORD="StrongPassword123!" 
    
    # RAG Config
    PGVECTOR_COLLECTION_NAME="rag_harry_potter"
    RANKING_LOCATION_ID="global"
    
    # Connection Name (Auto-generated in scripts usually, but useful to have)
    DB_INSTANCE_CONNECTION_NAME="${PROJECT_ID}:${REGION}:${SQL_INSTANCE_NAME}"
    
  3. [YOUR_PROJECT_ID] 替换为您的实际 Google Cloud 项目 ID。(例如 PROJECT_ID = "google-cloud-labs"
    如果您不记得项目 ID,请在终端 中运行以下命令。它会显示所有项目及其 ID 的列表。
    gcloud projects list
    
  4. 将变量加载到 shell 会话中:
    source .env
    

创建实例和数据库

  1. 创建一个 Cloud SQL for PostgreSQL 实例。此命令会创建一个适合本实验的小型实例。
    gcloud sql instances create ${SQL_INSTANCE_NAME} \
      --database-version=POSTGRES_15 \
      --tier=db-g1-small \
      --region=${REGION} \
      --project=${PROJECT_ID}
    
  2. 实例准备就绪后,创建数据库:
    gcloud sql databases create ${SQL_DATABASE_NAME} \
      --instance=${SQL_INSTANCE_NAME} \
      --project=${PROJECT_ID}
    
  3. 创建数据库用户:
    gcloud sql users create ${SQL_USER} \
      --instance=${SQL_INSTANCE_NAME} \
      --password=${SQL_PASSWORD} \
      --project=${PROJECT_ID}
    

启用 pgvector 扩展程序

pgvector 扩展程序允许 PostgreSQL 存储和搜索向量嵌入。您必须在数据库中明确启用它。

  1. 创建一个名为 enable_pgvector.py 的脚本。在终端 中运行以下代码:
    cloudshell edit enable_pgvector.py
    
  2. 将以下代码粘贴到 enable_pgvector.py 中。此脚本会连接到您的数据库并运行 CREATE EXTENSION IF NOT EXISTS vector;
    import os
    import sqlalchemy
    from google.cloud.sql.connector import Connector, IPTypes
    import logging
    from dotenv import load_dotenv
    
    load_dotenv()
    logging.basicConfig(level=logging.INFO)
    
    # Config
    project_id = os.getenv("PROJECT_ID")
    region = os.getenv("REGION")
    instance_name = os.getenv("SQL_INSTANCE_NAME")
    db_user = os.getenv("SQL_USER")
    db_pass = os.getenv("SQL_PASSWORD")
    db_name = os.getenv("SQL_DATABASE_NAME")
    instance_connection_name = f"{project_id}:{region}:{instance_name}"
    
    def getconn():
        with Connector() as connector:
            conn = connector.connect(
                instance_connection_name,
                "pg8000",
                user=db_user,
                password=db_pass,
                db=db_name,
                ip_type=IPTypes.PUBLIC,
            )
            return conn
    
    def enable_pgvector():
        pool = sqlalchemy.create_engine(
            "postgresql+pg8000://",
            creator=getconn,
        )
        with pool.connect() as db_conn:
            # Check if extension exists
            result = db_conn.execute(sqlalchemy.text("SELECT extname FROM pg_extension WHERE extname = 'vector';")).fetchone()
            if result:
                logging.info("pgvector extension is already enabled.")
            else:
                logging.info("Enabling pgvector extension...")
                db_conn.execute(sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS vector;"))
                db_conn.commit()
                logging.info("pgvector extension enabled successfully.")
    
    if __name__ == "__main__":
        enable_pgvector()
    
  3. 运行脚本:
    python enable_pgvector.py
    

7. 第 1 部分:分块策略

任何 RAG 流水线的第一步都是将文档转换为 LLM 可以理解的格式:

LLM 有上下文窗口限制(它们一次可以处理的文本量)。此外,检索 50 页的文档来回答特定问题会稀释信息。我们将文档拆分为较小的“块”,以隔离相关信息。

但是,您拆分文本的 方式非常重要:

  • 字符拆分器: 严格按字符数拆分。这种方法速度很快,但风险很高;它可能会将字词或句子拆成两半,从而破坏语义。
  • 递归拆分器: 尝试先按段落拆分,然后按句子拆分,最后按字词拆分。它会尝试将语义单元保留在一起。
  • 令牌拆分器: 根据 LLM 自己的词汇表(令牌)拆分。这样可以确保块完美地适应上下文窗口,但生成时计算成本可能更高。

在本部分中,您将使用所有三种策略提取相同的数据,以便进行比较。

创建提取脚本

您将使用一个脚本下载哈利·波特数据集,使用字符递归令牌 策略对其进行拆分,并将嵌入上传到 Cloud SQL 中的三个单独的表中。

  1. 创建文件 ingest_data.py
    cloudshell edit ingest_data.py
    
  2. 将以下固定 代码粘贴到 ingest_data.py 中。此版本可以正确解析数据集的 JSON 结构。
    import os
    import json
    import logging
    import requests
    from typing import List, Dict, Any
    from dotenv import load_dotenv
    
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter, TokenTextSplitter
    from langchain.docstore.document import Document
    
    load_dotenv()
    logging.basicConfig(level=logging.INFO)
    
    # Configuration
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME")
    BOOKS_JSON_URL = "https://storage.googleapis.com/github-repo/generative-ai/gemini/reasoning-engine/sample_data/harry_potter_books.json"
    
    CHUNK_SIZE = 500
    CHUNK_OVERLAP = 50
    MAX_DOCS_TO_PROCESS = 10 
    
    # Database Connector
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def download_data():
        logging.info(f"Downloading data from {BOOKS_JSON_URL}...")
        response = requests.get(BOOKS_JSON_URL)
        return response.json()
    
    def prepare_chunks(json_data, strategy):
        documents = []
    
        # Iterate through the downloaded data
        for entry in json_data[:MAX_DOCS_TO_PROCESS]:
    
            # --- JSON PARSING LOGIC ---
            # The data structure nests content inside 'kwargs' -> 'page_content'
            if "kwargs" in entry and "page_content" in entry["kwargs"]:
                content = entry["kwargs"]["page_content"]
    
                # Extract metadata if available, ensuring it's a dict
                metadata = entry["kwargs"].get("metadata", {})
                if not isinstance(metadata, dict):
                    metadata = {"source": "unknown"}
    
                # Add the strategy to metadata for tracking
                metadata["strategy"] = strategy
            else:
                continue
    
            if not content:
                continue
    
            # Choose the splitter based on the strategy
            if strategy == "character":
                splitter = CharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, separator="\n")
            elif strategy == "token":
                splitter = TokenTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
            else: # default to recursive
                splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
    
            # Split the content into chunks
            chunks = splitter.split_text(content)
    
            # Create Document objects for each chunk
            for chunk in chunks:
                documents.append(Document(page_content=chunk, metadata=metadata))
    
        return documents
    
    def main():
        logging.info("Initializing Embeddings...")
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
    
        data = download_data()
        strategies = ["character", "recursive", "token"]
    
        # Connection string for PGVector (uses the getconn helper)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        for strategy in strategies:
            collection_name = f"{BASE_COLLECTION_NAME}_{strategy}"
            logging.info(f"--- Processing strategy: {strategy.upper()} ---")
            logging.info(f"Target Collection: {collection_name}")
    
            # Prepare documents with the specific strategy
            docs = prepare_chunks(data, strategy)
    
            if not docs:
                logging.warning(f"No documents generated for strategy {strategy}. Check data source.")
                continue
    
            logging.info(f"Generated {len(docs)} chunks. Uploading to Cloud SQL...")
    
            # Initialize the Vector Store
            store = PGVector(
                collection_name=collection_name,
                embedding_function=embeddings,
                connection_string=pg_conn_str,
                engine_args={"creator": getconn},
                pre_delete_collection=True # Clears old data for this collection before adding new
            )
    
            # Batch add documents
            store.add_documents(docs)
            logging.info(f"Successfully finished {strategy}.\n")
    
    if __name__ == "__main__":
        main()
    
  3. 运行提取脚本。这会将您的数据库填充到三个不同的表(集合)中。
    python ingest_data.py
    

比较分块结果

现在数据已加载完毕,让我们针对所有三个集合运行查询,看看分块策略如何影响结果。

  1. 创建 query_chunking.py
    cloudshell edit query_chunking.py
    
  2. 将以下代码粘贴到 query_chunking.py 中:
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR) # Only show errors to keep output clean
    
    # Config
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    BASE_COLLECTION_NAME = os.getenv("PGVECTOR_COLLECTION_NAME")
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        query = "Tell me about the Dursleys and their relationship with Harry Potter"
        print(f"\nQUERY: {query}\n" + "="*50)
    
        strategies = ["character", "recursive", "token"]
    
        for strategy in strategies:
            collection = f"{BASE_COLLECTION_NAME}_{strategy}"
            print(f"\nSTRATEGY: {strategy.upper()}")
    
            store = PGVector(
                collection_name=collection,
                embedding_function=embeddings,
                connection_string=pg_conn_str,
                engine_args={"creator": getconn}
            )
    
            results = store.similarity_search_with_score(query, k=2)
            for i, (doc, score) in enumerate(results):
                print(f"  Result {i+1} (Score: {score:.4f}): {doc.page_content[:150].replace(chr(10), ' ')}...")
    
    if __name__ == "__main__":
        main()
    
  3. 运行查询脚本:
    python query_chunking.py
    

观察输出。

请注意,字符 拆分可能会在句子中间截断句子,而递归 会尝试遵循段落边界。令牌 拆分可确保块完美地适应 LLM 上下文窗口,但可能会忽略语义结构。

8. 第 2 部分:重新排名

向量搜索(检索)速度非常快,因为它依赖于压缩的数学表示法(嵌入)。它会广泛撒网以确保召回率(找到所有可能相关的项),但通常会受到精确率 低的影响(这些项的排名可能不完美)。

通常,相关文档会“丢失”在结果列表的中间。如果 LLM 只关注前 5 个结果,可能会错过位于第 7 位的关键答案。

重新排名 通过添加第二阶段来解决此问题。

  1. 检索器: 使用快速向量搜索提取更大的集合(例如前 25 个)。
  2. Reranker: 使用专用模型(如 Cross-Encoder)检查查询和文档对的全文。它速度较慢,但准确率要高得多。它会重新对前 25 个结果进行评分,并返回绝对最佳的 3 个结果。

在此任务中,您将搜索在第 1 部分中创建的 recursive 集合,但这次您将应用 Vertex AI Reranker 来优化结果。

  1. 创建 query_reranking.py
    cloudshell edit query_reranking.py
    
  2. 粘贴以下代码。请注意,它如何明确以 _recursive 集合为目标,并使用 ContextualCompressionRetriever
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings
    from langchain_community.vectorstores import PGVector
    
    # Reranking Imports
    from langchain.retrievers import ContextualCompressionRetriever
    from langchain_google_community.vertex_rank import VertexAIRank
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR)
    
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    
    # IMPORTANT: Target the recursive collection created in ingest_data.py
    COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"
    RANKING_LOCATION = os.getenv("RANKING_LOCATION_ID")
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
    
        print(f"Connecting to collection: {COLLECTION_NAME}")
        store = PGVector(
            collection_name=COLLECTION_NAME,
            embedding_function=embeddings,
            connection_string=pg_conn_str,
            engine_args={"creator": getconn}
        )
    
        query = "What are the Horcruxes?"
        print(f"QUERY: {query}\n")
    
        # 1. Base Retriever (Vector Search) - Fetch top 10
        base_retriever = store.as_retriever(search_kwargs={"k": 10})
    
        # 2. Reranker - Select top 3 from the 10
        reranker = VertexAIRank(
            project_id=PROJECT_ID,
            location_id=RANKING_LOCATION,
            ranking_config="default_ranking_config",
            title_field="source",
            top_n=3
        )
    
        compression_retriever = ContextualCompressionRetriever(
            base_compressor=reranker,
            base_retriever=base_retriever
        )
    
        # Execute
        try:
            reranked_docs = compression_retriever.invoke(query)
    
            if not reranked_docs:
                print("No documents returned. Check if the collection exists and is populated.")
    
            print(f"--- Top 3 Reranked Results ---")
            for i, doc in enumerate(reranked_docs):
                print(f"Result {i+1} (Score: {doc.metadata.get('relevance_score', 'N/A')}):")
                print(f"  {doc.page_content[:200]}...\n")
        except Exception as e:
            print(f"Error during reranking: {e}")
    
    if __name__ == "__main__":
        main()
    
  3. 运行重新排名查询:
    python query_reranking.py
    

观察

您可能会注意到,与原始向量搜索相比,相关性得分更高或排序不同。这样可以确保 LLM 收到尽可能精确的上下文。

9. 第 3 部分:查询转换

通常,RAG 的最大瓶颈是用户。用户查询通常是模糊不清、不完整或措辞不当的。如果查询嵌入在数学上与文档嵌入不一致,则检索会失败。

查询转换 使用 LLM 在查询到达数据库之前重写或扩展查询。 您将实现两种技术:

  • HyDE(假设文档嵌入): 问题与答案之间的向量相似性通常低于答案与假设答案之间的相似性。 HyDE 要求 LLM 虚构一个完美的答案,嵌入该答案,并搜索看起来像虚构答案的文档。
  • 回溯提示: 如果用户提出具体的详细问题,系统可能会错过更广泛的上下文。回溯提示要求 LLM 生成更高级别的抽象问题(“这个家族的历史是什么?”),以便检索基础信息以及具体详细信息。
  1. 创建 query_transformation.py
    cloudshell edit query_transformation.py
    
  2. 粘贴以下代码:
    import os
    import logging
    from dotenv import load_dotenv
    from google.cloud.sql.connector import Connector, IPTypes
    from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
    from langchain_community.vectorstores import PGVector
    from langchain_core.prompts import PromptTemplate
    
    load_dotenv()
    logging.basicConfig(level=logging.ERROR)
    
    PROJECT_ID = os.getenv("PROJECT_ID")
    REGION = os.getenv("REGION")
    DB_USER = os.getenv("SQL_USER")
    DB_PASS = os.getenv("SQL_PASSWORD")
    DB_NAME = os.getenv("SQL_DATABASE_NAME")
    INSTANCE_CONNECTION_NAME = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"
    
    def getconn():
        with Connector() as connector:
            return connector.connect(
                INSTANCE_CONNECTION_NAME,
                "pg8000",
                user=DB_USER,
                password=DB_PASS,
                db=DB_NAME,
                ip_type=IPTypes.PUBLIC,
            )
    
    def generate_hyde_doc(query, llm):
        prompt = PromptTemplate(
            input_variables=["question"],
            template="Write a concise, hypothetical answer to the question. Question: {question} Answer:"
        )
        chain = prompt | llm
        return chain.invoke({"question": query})
    
    def generate_step_back(query, llm):
        prompt = PromptTemplate(
            input_variables=["question"],
            template="Write a more general, abstract question that concepts in this question. Original: {question} Step-back:"
        )
        chain = prompt | llm
        return chain.invoke({"question": query})
    
    def main():
        embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)
        llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.5)
    
        pg_conn_str = f"postgresql+pg8000://{DB_USER}:{DB_PASS}@placeholder/{DB_NAME}"
        store = PGVector(
            collection_name=COLLECTION_NAME,
            embedding_function=embeddings,
            connection_string=pg_conn_str,
            engine_args={"creator": getconn}
        )
        retriever = store.as_retriever(search_kwargs={"k": 2})
    
        original_query = "Tell me about the Dursleys."
        print(f"ORIGINAL QUERY: {original_query}\n" + "-"*30)
    
        # 1. HyDE
        hyde_doc = generate_hyde_doc(original_query, llm)
        print(f"HyDE Generated Doc: {hyde_doc.strip()[:100]}...")
        hyde_results = retriever.invoke(hyde_doc)
        print(f"HyDE Retrieval: {hyde_results[0].page_content[:100]}...\n")
    
        # 2. Step-back
        step_back_q = generate_step_back(original_query, llm)
        print(f"Step-back Query: {step_back_q.strip()}")
        step_results = retriever.invoke(step_back_q)
        print(f"Step-back Retrieval: {step_results[0].page_content[:100]}...")
    
    if __name__ == "__main__":
        main()
    
  3. 运行转换脚本:
    python query_transformation.py
    

观察输出。

请注意,回溯 查询可能会检索有关德思礼家族历史的更广泛的上下文,而 HyDE 则侧重于假设答案中生成的具体详细信息。

10. 第 4 部分:端到端生成

我们已经对数据进行了分块、优化了搜索并完善了用户查询。现在,我们终于在 RAG 中加入了“G”:生成

到目前为止,我们只是在查找信息。 如需构建真正的 AI 助理,我们需要将这些高质量的重新排名文档馈送到 LLM (Gemini) 中,以合成自然语言答案。

在生产流水线中,这涉及一个特定的流程:

  1. 检索: 使用快速向量搜索获取广泛的候选集(例如前 10 个)。
  2. 重新排名: 使用 Vertex AI Reranker 过滤到绝对最佳结果(例如前 3 个)。
  3. 上下文构建: 将前 3 个文档的内容拼接成一个字符串。
  4. 接地提示: 将该上下文字符串插入到严格的提示模板中,该模板会强制 LLM 仅使用该信息。

创建生成脚本

我们将使用 gemini-2.5-flash 进行生成步骤。此模型非常适合 RAG,因为它具有较长的上下文窗口和较低的延迟时间,因此可以快速处理多个检索到的文档。

  1. 创建 end_to_end_rag.py
cloudshell edit end_to_end_rag.py
  1. 粘贴以下代码。请注意 template 变量,我们在此处严格指示模型通过将其绑定到提供的上下文来避免“幻觉”(编造内容)。
import os
import logging
from dotenv import load_dotenv
from google.cloud.sql.connector import Connector, IPTypes
from langchain_google_vertexai import VertexAIEmbeddings, VertexAI
from langchain_community.vectorstores import PGVector
from langchain.retrievers import ContextualCompressionRetriever
from langchain_google_community.vertex_rank import VertexAIRank
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

load_dotenv()
logging.basicConfig(level=logging.ERROR)

PROJECT_ID = os.getenv("PROJECT_ID")
REGION = os.getenv("REGION")
# We use the recursive collection as it generally provides the best context boundaries
COLLECTION_NAME = f"{os.getenv('PGVECTOR_COLLECTION_NAME')}_recursive"

def getconn():
    instance_conn = f"{PROJECT_ID}:{REGION}:{os.getenv('SQL_INSTANCE_NAME')}"
    with Connector() as connector:
        return connector.connect(
            instance_conn, "pg8000",
            user=os.getenv("SQL_USER"), password=os.getenv("SQL_PASSWORD"),
            db=os.getenv("SQL_DATABASE_NAME"), ip_type=IPTypes.PUBLIC
        )

def main():
    print("--- Initializing Production RAG Pipeline ---")

    # 1. Setup Embeddings (Gemini Embedding 001)
    # We use this to vectorize the user's query to match our database.
    embeddings = VertexAIEmbeddings(model_name="gemini-embedding-001", project=PROJECT_ID, location=REGION)

    # 2. Connect to Vector Store
    pg_conn_str = f"postgresql+pg8000://{os.getenv('SQL_USER')}:{os.getenv('SQL_PASSWORD')}@placeholder/{os.getenv('SQL_DATABASE_NAME')}"
    store = PGVector(
        collection_name=COLLECTION_NAME,
        embedding_function=embeddings,
        connection_string=pg_conn_str,
        engine_args={"creator": getconn}
    )

    # 3. Setup The 'Filter Funnel' (Retriever + Reranker)
    # Step A: Fast retrieval of top 10 similar documents
    base_retriever = store.as_retriever(search_kwargs={"k": 10})

    # Step B: Precise reranking to find the top 3 most relevant
    reranker = VertexAIRank(
        project_id=PROJECT_ID,
        location_id="global", 
        ranking_config="default_ranking_config",
        title_field="source",
        top_n=3
    )

    # Combine A and B into a single retrieval object
    compression_retriever = ContextualCompressionRetriever(
        base_compressor=reranker,
        base_retriever=base_retriever
    )

    # 4. Setup LLM (Gemini 2.5 Flash)
    # We use a low temperature (0.1) to reduce creativity and increase factual adherence.
    llm = VertexAI(model_name="gemini-2.5-flash", project=PROJECT_ID, location=REGION, temperature=0.1)

    # --- Execution Loop ---
    user_query = "Who is Harry Potter?"
    print(f"\nUser Query: {user_query}")
    print("Retrieving and Reranking documents...")

    # Retrieve the most relevant documents
    top_docs = compression_retriever.invoke(user_query)

    if not top_docs:
        print("No relevant documents found.")
        return

    # Build the Context String
    # We stitch the documents together, labeling them as Source 1, Source 2, etc.
    context_str = "\n\n".join([f"Source {i+1}: {d.page_content}" for i, d in enumerate(top_docs)])

    print(f"Found {len(top_docs)} relevant context chunks.")

    # 5. The Grounded Prompt
    template = """You are a helpful assistant. Answer the question strictly based on the provided context.
    If the answer is not in the context, say "I don't know."

    Context:
    {context}

    Question:
    {question}

    Answer:
    """

    prompt = PromptTemplate(template=template, input_variables=["context", "question"])

    # Create the chain: Prompt -> LLM
    chain = prompt | llm

    print("Generating Answer via Gemini 2.5 Flash...")
    final_answer = chain.invoke({"context": context_str, "question": user_query})

    print(f"\nFINAL ANSWER:\n{final_answer}")

if __name__ == "__main__":
    main()
  1. 运行最终应用:
python end_to_end_rag.py

了解输出

运行此脚本时,请观察原始检索到的块(您在之前的步骤中看到过)与最终答案之间的差异。LLM 充当合成器,它会读取 Reranker 提供的文本片段“块”,并将它们平滑地转换为连贯的、人类可读的句子。

通过链接这些组件,您可以从随机“猜测”转变为确定性的接地工作流。检索器 撒网,Reranker 选择最佳捕获,生成器 烹制美食。

11. 总结

恭喜!您已成功构建了一个高级 RAG 流水线,该流水线远远超出了基本的向量搜索范围。

回顾

  • 您配置了 Cloud SQL with pgvector 以实现可扩缩的向量存储。
  • 您比较了分块策略 ,以了解数据准备如何影响检索。
  • 您使用 Vertex AI 实现了重新排名 ,以提高结果的精确率。
  • 您利用了查询转换 (HyDE、回溯)来使用户意图与您的数据保持一致。

了解详情

从原型设计到投入生产

本实验是利用 Google Cloud 构建可用于生产用途的 AI 学习路线 的一部分。

  • **探索完整课程** ,弥合从原型设计到投入生产的差距。
  • 使用 #ProductionReadyAI 主题标签分享您的进度