1. 简介
代理到代理 (A2A) 协议旨在规范 AI 代理之间的通信,尤其是部署在外部系统中的代理。以前,为工具建立了名为 Model Context Protocol (MCP) 的此类协议,这是一种新兴标准,用于将 LLM 与数据和资源相关联。A2A 旨在补充 MCP,因为 A2A 侧重于解决其他问题,而 MCP 侧重于降低复杂性,以便将代理与工具和数据相关联,A2A 则侧重于如何让代理能够以自然的方式协作。这样,代理就可以作为代理(或用户)进行通信,而不是作为工具进行通信;例如,当您想订购商品时,支持来回沟通。
A2A 旨在对 MCP 进行补充。在官方文档中,建议应用将 A2A 代理建模为 MCP 资源(由 AgentCard 表示,我们稍后会对此进行讨论)。然后,框架可以使用 A2A 与其用户、远程代理和其他代理进行通信。
在本演示中,我们将从头开始实现仅限 A2A 的功能。基于这些示例代码库 ,我们将探索一个用例:我们有一个个人购买礼宾,可以帮助我们与汉堡和比萨卖家代理沟通,以处理我们的订单。
A2A 利用客户端-服务器原则。以下是本演示中预期的典型 A2A 流程
- A2A 客户端将首先发现所有可访问的 A2A 服务器代理卡片,并利用其信息构建连接客户端
- 必要时,A2A 客户端会将任务发送到 A2A 服务器。如果在 A2A 客户端上配置了推送通知接收器网址,A2A 服务器还可以将任务进度状态发布到接收端点
- 任务完成后,A2A 服务器将响应工件发送给 A2A 客户端
在本 Codelab 中,您将采用分步方法,具体步骤如下:
- 准备您的 Google Cloud 项目并在其中启用所有所需的 API
- 为您的编码环境设置工作区
- 为汉堡和披萨代理服务准备环境变量
- 将汉堡和披萨代理部署到 Cloud Run
- 检查有关如何建立 A2A 服务器的详细信息
- 为购买礼宾服务准备环境变量
- 将购买礼宾服务部署到 Cloud Run
- 详细了解如何建立 A2A 客户端及其数据建模
- 检查载荷以及 A2A 客户端和服务器之间的互动
架构概览
我们将部署以下服务架构
我们将部署 2 项服务来充当 A2A 服务器,即 Burger 代理(由 CrewAI 代理框架支持)和 Pizza 代理(由 Langgraph 代理框架支持)。用户将仅与购物 concierge 直接互动,该 concierge 将使用 Agent Development Kit (ADK) 框架运行,该框架将充当 A2A 客户端。
这些代理各有自己的环境和部署。
前提条件
- 熟练使用 Python
- 了解使用 HTTP 服务的基本全栈架构
学习内容
- A2A 服务器的核心结构
- A2A 客户端的核心结构
- 将服务部署到 Cloud Run
- A2A 客户端如何连接到 A2A 服务器
- 非流式连接的请求和响应结构
所需条件
- Chrome 网络浏览器
- Gmail 账号
- 启用了结算功能的 Cloud 项目
此 Codelab 面向各种级别(包括新手)的开发者,其示例应用中使用了 Python。不过,您无需了解 Python 即可理解所介绍的概念。
2. 准备工作
在 Cloud 控制台中选择“有效项目”
本 Codelab 假定您已有一个启用了结算功能的 Google Cloud 项目。如果您尚未创建,可以按照以下说明开始创建。
- 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。
- 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能。
在 Cloud Shell 终端中设置 Cloud 项目
- 您将使用 Cloud Shell,这是一个在 Google Cloud 中运行的命令行环境,它预加载了 bq。点击 Google Cloud 控制台顶部的“激活 Cloud Shell”。如果系统提示您授权,请点击授权
- 连接到 Cloud Shell 后,您可以使用以下命令检查自己是否已通过身份验证,以及项目是否已设置为您的项目 ID:
gcloud auth list
- 在 Cloud Shell 中运行以下命令,以确认 gcloud 命令了解您的项目。
gcloud config list project
- 如果项目未设置,请使用以下命令进行设置:
gcloud config set project <YOUR_PROJECT_ID>
或者,您也可以在控制台中查看 PROJECT_ID
ID
点击它,您会在右侧看到您的所有项目和项目 ID
- 通过以下命令启用所需的 API。此过程可能需要几分钟的时间,请耐心等待。
gcloud services enable aiplatform.googleapis.com \
run.googleapis.com \
cloudbuild.googleapis.com \
cloudresourcemanager.googleapis.com
成功执行该命令后,您应该会看到如下所示的消息:
Operation "operations/..." finished successfully.
您可以通过控制台搜索各个产品或使用此链接,以替代 gcloud 命令。
如果缺少任何 API,您随时可以在实现过程中启用它。
如需了解 gcloud 命令和用法,请参阅文档。
前往 Cloud Shell 编辑器并设置应用工作目录
现在,我们可以设置代码编辑器来进行一些编码操作了。我们将使用 Cloud Shell Editor 来完成此操作
- 点击“打开编辑器”按钮,这会打开 Cloud Shell 编辑器,我们可以在此处编写代码
- 确保在 Cloud Shell 编辑器的左下角(状态栏)中设置 Cloud Code 项目(如下图所示),并将其设置为已启用结算功能的有效 Google Cloud 项目。在系统提示时授权。如果您已按照上一条命令操作,该按钮可能也会直接指向已启用的项目,而不是登录按钮
- 接下来,我们从 GitHub 克隆此 Codelab 的模板工作目录,运行以下命令。它将在 purchasing-concierge-a2a 目录中创建工作目录
git clone https://github.com/alphinside/purchasing-concierge-intro-a2a-codelab-starter.git purchasing-concierge-a2a
- 然后,前往 Cloud Shell 编辑器的顶部,依次点击文件->打开文件夹,找到您的用户名目录,找到 purchasing-concierge-a2a 目录,然后点击“确定”按钮。这会将所选目录设为主工作目录。在此示例中,用户名为 alvinprayuda,因此目录路径如下所示
现在,您的 Cloud Shell 编辑器应如下所示
环境设置
下一步是准备开发环境。您当前使用的终端应位于 purchasing-concierge-a2a 工作目录中。在本 Codelab 中,我们将使用 Python 3.12,并使用 uv Python 项目管理器来简化创建和管理 Python 版本和虚拟环境的流程
- 如果您尚未打开终端,请依次点击 Terminal(终端)-> New Terminal(新建终端),或使用 Ctrl + Shift + C 打开终端,它会在浏览器底部打开一个终端窗口
- 使用以下命令下载
uv
并安装 Python 3.12
curl -LsSf https://astral.sh/uv/0.7.2/install.sh | sh && \
source $HOME/.local/bin/env && \
uv python install 3.12
- 现在,我们使用
uv
初始化购买礼宾的虚拟环境,运行以下命令
uv sync --frozen
这将创建 .venv 目录并安装依赖项。快速浏览 pyproject.toml 即可了解依赖项的相关信息,如下所示
dependencies = [ "google-adk>=0.3.0", "gradio>=5.28.0", "httpx>=0.28.1", "jwcrypto>=1.5.6", "pydantic>=2.10.6", "pyjwt>=2.10.1", "sse-starlette>=2.3.3", "starlette>=0.46.2", "typing-extensions>=4.13.2", "uvicorn>=0.34.0", ]
- 如需测试虚拟环境,请创建新文件 main.py 并复制以下代码
def main():
print("Hello from purchasing-concierge-a2a!")
if __name__ == "__main__":
main()
- 然后,运行以下命令
uv run main.py
您将会看到如下所示的输出
Using CPython 3.12 Creating virtual environment at: .venv Hello from purchasing-concierge-a2a!
这表明 Python 项目已正确设置。
现在,我们可以进入下一步,配置和部署远程卖方代理
3. 将 Remote Seller Agent - A2A Server 部署到 Cloud Run
在此步骤中,我们将部署这两个红框标记的远程卖方代理。汉堡智能体将由 CrewAI 智能体框架提供支持,比萨智能体将由 Langgraph 智能体提供支持,这两者都由 Gemini Flash 2.0 模型提供支持
部署远程 Burger 代理
Burger 代理源代码位于 remote_seller_agents/burger_agent 目录下。您可以在 agent.py 脚本中检查代理初始化。以下是已初始化的代理的代码段
from crewai import Agent, Crew, LLM, Task, Process
from crewai.tools import tool
...
model = LLM(
model="vertex_ai/gemini-2.0-flash", # Use base model name without provider prefix
)
burger_agent = Agent(
role="Burger Seller Agent",
goal=(
"Help user to understand what is available on burger menu and price also handle order creation."
),
backstory=("You are an expert and helpful burger seller agent."),
verbose=False,
allow_delegation=False,
tools=[create_burger_order],
llm=model,
)
agent_task = Task(
description=self.TaskInstruction,
output_pydantic=ResponseFormat,
agent=burger_agent,
expected_output=(
"A JSON object with 'status' and 'message' fields."
"Set response status to input_required if asking for user order confirmation."
"Set response status to error if there is an error while processing the request."
"Set response status to completed if the request is complete."
),
)
crew = Crew(
tasks=[agent_task],
agents=[burger_agent],
verbose=False,
process=Process.sequential,
)
inputs = {"user_prompt": query, "session_id": sessionId}
response = crew.kickoff(inputs)
...
现在,我们需要先准备 .env 变量,将 .env.example 复制到 .env 文件中
cp remote_seller_agents/burger_agent/.env.example remote_seller_agents/burger_agent/.env
现在,打开 remote_seller_agents/burger_agent/.env 文件,您会看到以下内容
AUTH_USERNAME=burgeruser123 AUTH_PASSWORD=burgerpass123 GCLOUD_LOCATION=us-central1 GCLOUD_PROJECT_ID={your-project-id}
Burger 代理 A2A 服务器使用基本 HTTP 身份验证(使用 base64 编码的用户名和密码)运行,为此演示,我们在此处的 .env 文件中配置了允许的用户名和密码。现在,请将 GCLOUD_PROJECT_ID 变量更新为您当前的有效项目 ID
别忘了保存更改,接下来我们可以直接部署服务了。我们稍后会检查代码内容。运行以下命令进行部署
gcloud run deploy burger-agent \
--source remote_seller_agents/burger_agent \
--port=8080 \
--allow-unauthenticated \
--min 1 \
--region us-central1
当系统提示您将创建一个容器仓库以便从源代码进行部署时,请回答 Y。成功部署后,它会显示如下日志。
Service [burger-agent] revision [burger-agent-xxxxx-xxx] has been deployed and is serving 100 percent of traffic. Service URL: https://burger-agent-xxxxxxxxx.us-central1.run.app
在部署服务时,此处的 xxxx
部分将成为唯一标识符。
现在,我们尝试通过浏览器访问这些已部署的 Burger 代理服务的 /.well-known/agent.json
路线,您应该会看到如下所示的输出
这是应可供发现的汉堡型客服人员卡片信息。我们稍后再讨论此功能。现在,只需记住汉堡代理服务的网址,我们稍后会用到
部署远程 Pizza 代理
同样,披萨代理源代码位于 remote_seller_agents/pizza_agent 目录下。您可以在 agent.py 脚本中检查代理初始化。以下是已初始化的代理的代码段
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
...
self.model = ChatVertexAI(
model="gemini-2.0-flash",
location=os.getenv("GCLOUD_LOCATION"),
project=os.getenv("GCLOUD_PROJECT_ID"),
)
self.tools = [create_pizza_order]
self.graph = create_react_agent(
self.model,
tools=self.tools,
checkpointer=memory,
prompt=self.SYSTEM_INSTRUCTION,
response_format=ResponseFormat,
)
...
现在,我们需要先准备 .env 变量,将 .env.example 复制到 .env 文件中
cp remote_seller_agents/pizza_agent/.env.example remote_seller_agents/pizza_agent/.env
现在,打开 remote_seller_agents/pizza_agent/.env 文件,您会看到以下内容
API_KEY=pizza123 GCLOUD_LOCATION=us-central1 GCLOUD_PROJECT_ID={your-project-id}
披萨代理 A2A 服务器使用 Bearer HTTP 身份验证(使用 API 密钥)运行,为此演示,我们在此处的 .env 文件中配置了允许的 API 密钥。现在,请将 GCLOUD_PROJECT_ID 变量更新为您当前的有效项目 ID
别忘了保存更改,接下来我们可以直接部署服务了。我们稍后会检查代码内容。运行以下命令进行部署
gcloud run deploy pizza-agent \
--source remote_seller_agents/pizza_agent \
--port=8080 \
--allow-unauthenticated \
--min 1 \
--region us-central1
成功部署后,它会显示如下日志。
Service [pizza-agent] revision [pizza-agent-xxxxx-xxx] has been deployed and is serving 100 percent of traffic. Service URL: https://pizza-agent-xxxxxxxxx.us-central1.run.app
在部署服务时,此处的 xxxx
部分将成为唯一标识符。
现在,我们尝试通过浏览器访问这些已部署的披萨代理服务的 /.well-known/agent.json
路线,您应该会看到如下所示的输出
这是应可供发现的披萨代理卡片信息。我们稍后再讨论此功能。现在,只需记住披萨代理服务的网址即可。
至此,我们已成功将汉堡和比萨服务部署到 Cloud Run。现在,我们来讨论一下 A2A 服务器的核心组件
4. A2A 服务器的核心组件
现在,我们来讨论一下 A2A 服务器的核心概念和组件
客服人员卡片
每个 A2A 服务器都必须有一个可在 /.well-known/agent.json
资源上访问的代理卡片。这是为了支持 A2A 客户端的发现阶段,该阶段应提供有关如何访问代理以及了解其所有功能的完整信息和背景信息。这与使用 Swagger 或 Postman 编写的详实 API 文档有点类似。
这是我们部署的汉堡代理客服卡片的内容
{
"name": "burger_seller_agent",
"description": "Helps with creating burger orders",
"url": "http://0.0.0.0:8080/",
"version": "1.0.0",
"capabilities": {
"streaming": false,
"pushNotifications": true,
"stateTransitionHistory": false
},
"authentication": {
"schemes": [
"Basic"
]
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "create_burger_order",
"name": "Burger Order Creation Tool",
"description": "Helps with creating burger orders",
"tags": [
"burger order creation"
],
"examples": [
"I want to order 2 classic cheeseburgers"
]
}
]
}
这些客服人员卡片突出显示了许多重要组成部分,例如客服人员技能、流式传输功能、支持的模态和身份验证。
所有这些信息都可以用于开发适当的通信机制,以便 A2A 客户端能够正常通信。支持的模态和身份验证机制可确保能够正确建立通信,并且代理 skills
信息可嵌入到 A2A 客户端系统提示中,以便向客户端的代理提供有关要调用的远程代理功能和技能的上下文。如需了解此客服人员卡片的更多详细字段,请参阅此文档。
在我们的代码中,我们使用 Pydantic 在 a2a_types.py(在 remote_seller_agents/burger_agent
或 remote_seller_agents/pizza_agent
上)上建立了客服人员卡片的实现
...
class AgentProvider(BaseModel):
organization: str
url: str | None = None
class AgentCapabilities(BaseModel):
streaming: bool = False
pushNotifications: bool = False
stateTransitionHistory: bool = False
class AgentAuthentication(BaseModel):
schemes: List[str]
credentials: str | None = None
class AgentSkill(BaseModel):
id: str
name: str
description: str | None = None
tags: List[str] | None = None
examples: List[str] | None = None
inputModes: List[str] | None = None
outputModes: List[str] | None = None
class AgentCard(BaseModel):
name: str
description: str | None = None
url: str
provider: AgentProvider | None = None
version: str
documentationUrl: str | None = None
capabilities: AgentCapabilities
authentication: AgentAuthentication | None = None
defaultInputModes: List[str] = ["text"]
defaultOutputModes: List[str] = ["text"]
skills: List[AgentSkill]
...
对象构建在 remote_seller_agents/burger_agent/__main__.py
上,如下所示
...
def main(host, port):
"""Starts the Burger Seller Agent server."""
try:
capabilities = AgentCapabilities(pushNotifications=True)
skill = AgentSkill(
id="create_burger_order",
name="Burger Order Creation Tool",
description="Helps with creating burger orders",
tags=["burger order creation"],
examples=["I want to order 2 classic cheeseburgers"],
)
agent_card = AgentCard(
name="burger_seller_agent",
description="Helps with creating burger orders",
# The URL provided here is for the sake of demo,
# in production you should use a proper domain name
url=f"http://{host}:{port}/",
version="1.0.0",
authentication=AgentAuthentication(schemes=["Basic"]),
defaultInputModes=BurgerSellerAgent.SUPPORTED_CONTENT_TYPES,
defaultOutputModes=BurgerSellerAgent.SUPPORTED_CONTENT_TYPES,
capabilities=capabilities,
skills=[skill],
)
notification_sender_auth = PushNotificationSenderAuth()
notification_sender_auth.generate_jwk()
server = A2AServer(
agent_card=agent_card,
task_manager=AgentTaskManager(
agent=BurgerSellerAgent(),
notification_sender_auth=notification_sender_auth,
),
host=host,
port=port,
auth_username=os.environ.get("AUTH_USERNAME"),
auth_password=os.environ.get("AUTH_PASSWORD"),
)
...
任务定义和任务管理器
A2A 中的核心组件之一是任务定义。它在 JSON-RPC 标准的基础上调整了载荷格式。在此演示中,我们在本部分的 a2a_types.py(remote_seller_agents/burger_agent
或 remote_seller_agents/pizza_agent
上)使用 Pydantic 实现了此功能
...
## RPC Messages
class JSONRPCMessage(BaseModel):
jsonrpc: Literal["2.0"] = "2.0"
id: int | str | None = Field(default_factory=lambda: uuid4().hex)
class JSONRPCRequest(JSONRPCMessage):
method: str
params: dict[str, Any] | None = None
...
class SendTaskRequest(JSONRPCRequest):
method: Literal["tasks/send"] = "tasks/send"
params: TaskSendParams
class SendTaskStreamingRequest(JSONRPCRequest):
method: Literal["tasks/sendSubscribe"] = "tasks/sendSubscribe"
params: TaskSendParams
class GetTaskRequest(JSONRPCRequest):
method: Literal["tasks/get"] = "tasks/get"
params: TaskQueryParams
class CancelTaskRequest(JSONRPCRequest):
method: Literal["tasks/cancel",] = "tasks/cancel"
params: TaskIdParams
class SetTaskPushNotificationRequest(JSONRPCRequest):
method: Literal["tasks/pushNotification/set",] = "tasks/pushNotification/set"
params: TaskPushNotificationConfig
class GetTaskPushNotificationRequest(JSONRPCRequest):
method: Literal["tasks/pushNotification/get",] = "tasks/pushNotification/get"
params: TaskIdParams
class TaskResubscriptionRequest(JSONRPCRequest):
method: Literal["tasks/resubscribe",] = "tasks/resubscribe"
params: TaskIdParams
...
有多种任务方法可用于支持不同类型的通信(例如同步、流式传输、异步),还可以为任务状态配置通知。A2A 服务器可以灵活配置,以处理这些任务定义标准。
A2A 服务器可能会处理来自不同代理或用户的请求,并且能够完美地隔离每个任务。如需更直观地了解这些内容的上下文,您可以查看下图
因此,每个 A2A 服务器都应能够跟踪传入任务并存储与其相关的适当信息,通常每个传入请求都将包含任务 ID 和会话 ID。在我们的代码中,此任务管理器的实现位于 remote_seller_agents/burger_agent/task_manager.py
(披萨代理也共享类似的任务管理器)
...
class AgentTaskManager(InMemoryTaskManager):
def __init__(
self,
agent: BurgerSellerAgent,
notification_sender_auth: PushNotificationSenderAuth,
):
super().__init__()
self.agent = agent
self.notification_sender_auth = notification_sender_auth
...
async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
"""Handles the 'send task' request."""
validation_error = self._validate_request(request)
if validation_error:
return SendTaskResponse(id=request.id, error=validation_error.error)
await self.upsert_task(request.params)
if request.params.pushNotification:
if not await self.set_push_notification_info(
request.params.id, request.params.pushNotification
):
return SendTaskResponse(
id=request.id,
error=InvalidParamsError(
message="Push notification URL is invalid"
),
)
task = await self.update_store(
request.params.id, TaskStatus(state=TaskState.WORKING), None
)
await self.send_task_notification(task)
task_send_params: TaskSendParams = request.params
query = self._get_user_query(task_send_params)
try:
agent_response = self.agent.invoke(query, task_send_params.sessionId)
except Exception as e:
logger.error(f"Error invoking agent: {e}")
raise ValueError(f"Error invoking agent: {e}")
return await self._process_agent_response(request, agent_response)
...
async def _process_agent_response(
self, request: SendTaskRequest, agent_response: dict
) -> SendTaskResponse:
"""Processes the agent's response and updates the task store."""
task_send_params: TaskSendParams = request.params
task_id = task_send_params.id
history_length = task_send_params.historyLength
task_status = None
parts = [{"type": "text", "text": agent_response["content"]}]
artifact = None
if agent_response["require_user_input"]:
task_status = TaskStatus(
state=TaskState.INPUT_REQUIRED,
message=Message(role="agent", parts=parts),
)
else:
task_status = TaskStatus(state=TaskState.COMPLETED)
artifact = Artifact(parts=parts)
task = await self.update_store(
task_id, task_status, None if artifact is None else [artifact]
)
task_result = self.append_task_history(task, history_length)
await self.send_task_notification(task)
return SendTaskResponse(id=request.id, result=task_result)
...
从上面的代码中,我们可以检查到,在处理传入任务时(当传入请求方法为 tasks/send
时,系统会执行方法 on_send_task
),它会执行更新任务存储区(self.update_store
方法调用)和发送通知(self.send_task_notification
方法调用)的多个操作。这是 A2A 服务器如何在整个同步发送任务请求期间管理任务状态更新和通知的示例之一。
摘要
简而言之,到目前为止,我们部署的 A2A 服务器可以支持以下 2 项功能:
- 在
/.well-known/agent.json
路线上发布代理卡片 - 使用方法
tasks/send
处理 JSON-RPC 请求
您可以在 main.py 脚本(remote_seller_agents/burger_agent
或 remote_seller_agents/pizza_agent
上)检查启动这些功能的入口点。我们可以看到,我们需要先配置 Agent 卡片,然后在以下代码段中启动服务器
...
capabilities = AgentCapabilities(pushNotifications=True)
skill = AgentSkill(
id="create_pizza_order",
name="Pizza Order Creation Tool",
description="Helps with creating pizza orders",
tags=["pizza order creation"],
examples=["I want to order 2 pepperoni pizzas"],
)
agent_card = AgentCard(
name="pizza_seller_agent",
description="Helps with creating pizza orders",
# The URL provided here is for the sake of demo,
# in production you should use a proper domain name
url=f"http://{host}:{port}/",
version="1.0.0",
authentication=AgentAuthentication(schemes=["Bearer"]),
defaultInputModes=PizzaSellerAgent.SUPPORTED_CONTENT_TYPES,
defaultOutputModes=PizzaSellerAgent.SUPPORTED_CONTENT_TYPES,
capabilities=capabilities,
skills=[skill],
)
...
server = A2AServer(
agent_card=agent_card,
task_manager=AgentTaskManager(
agent=PizzaSellerAgent(),
notification_sender_auth=notification_sender_auth,
),
host=host,
port=port,
api_key=os.environ.get("API_KEY"),
)
...
logger.info(f"Starting server on {host}:{port}")
server.start()
...
5. 将 Purchasing Concierge - A2A 客户端部署到 Cloud Run
在此步骤中,我们将部署购买礼宾客服人员。我们将与此客服人员互动。
购买礼宾客服人员的源代码位于 purchasing_concierge 目录下。您可以在 purchasing_agent.py 脚本中检查代理初始化。以下是已初始化的代理的代码段。
from google.adk import Agent
...
def create_agent(self) -> Agent:
return Agent(
model="gemini-2.0-flash-001",
name="purchasing_agent",
instruction=self.root_instruction,
before_model_callback=self.before_model_callback,
description=(
"This purchasing agent orchestrates the decomposition of the user purchase request into"
" tasks that can be performed by the seller agents."
),
tools=[
self.list_remote_agents,
self.send_task,
],
)
...
现在,我们需要先准备 .env 变量,将 .env.example 复制到 .env 文件中
cp purchasing_concierge/.env.example purchasing_concierge/.env
现在,打开 purchasing_concierge/.env 文件,您会看到以下内容
PIZZA_SELLER_AGENT_AUTH=pizza123 PIZZA_SELLER_AGENT_URL=http://localhost:10000 BURGER_SELLER_AGENT_AUTH=burgeruser123:burgerpass123 BURGER_SELLER_AGENT_URL=http://localhost:10001 GOOGLE_GENAI_USE_VERTEXAI=TRUE GOOGLE_CLOUD_PROJECT={your-project-id} GOOGLE_CLOUD_LOCATION=us-central1
此代理将与汉堡代理和披萨代理进行通信,因此我们需要为这两个代理提供适当的凭据。现在,请将 GCLOUD_PROJECT_ID 变量更新为您当前的有效项目 ID。
现在,我们还需要将 PIZZA_SELLER_AGENT_网址 和 BURGER_SELLER_AGENT_网址 更新为上一步中的 Cloud Run 网址。如果您忘记了,我们可以访问 Cloud Run 控制台。在控制台顶部的搜索栏中输入“Cloud Run”,然后右键点击 Cloud Run 图标,在新标签页中打开该服务
您应该会看到我们之前部署的远程卖方代理服务,如下所示
现在,点击其中一个服务即可查看其公开网址,您会被重定向到“服务详情”页面。您可以在顶部区域的“地区”信息旁边看到该网址
将此网址的值复制并粘贴到 PIZZA_SELLER_AGENT_网址 和 BURGER_SELLER_AGENT_网址 中。
最终的环境变量应如下所示
PIZZA_SELLER_AGENT_AUTH=pizza123 PIZZA_SELLER_AGENT_URL=https://pizza-agent-xxxxx.us-central1.run.app BURGER_SELLER_AGENT_AUTH=burgeruser123:burgerpass123 BURGER_SELLER_AGENT_URL=https://burger-agent-xxxxx.us-central1.run.app GOOGLE_GENAI_USE_VERTEXAI=TRUE GOOGLE_CLOUD_PROJECT={your-project-id} GOOGLE_CLOUD_LOCATION=us-central1
现在,我们可以部署购买礼宾客服人员了。如需部署此代理,请运行以下命令
gcloud run deploy purchasing-concierge \
--source . \
--port=8080 \
--allow-unauthenticated \
--min 1 \
--region us-central1 \
--memory 1024Mi
成功部署后,它会显示如下日志。
Service [purchasing-concierge] revision [purchasing-concierge-xxxxx-xxx] has been deployed and is serving 100 percent of traffic. Service URL: https://purchasing-concierge-xxxxxx.us-central1.run.app
在部署服务时,此处的 xxxx
部分将成为唯一标识符。
现在,我们可以尝试通过界面与购买礼宾客服人员互动。访问服务网址后,您应该会看到 Gradio 网络界面,如下所示
现在,我们来讨论一下 A2A 客户端的核心组件和典型流程。
6. A2A 客户端的核心组件
上图显示了 A2A 互动典型流程:
- 客户端会尝试在路由
/.well-known/agent.json
中提供的远程代理网址中查找任何已发布的代理卡片 - 然后,在必要时,它会向该代理发送任务,其中包含消息和必要的元数据参数(例如会话 ID、历史背景等)
- A2A 服务器将对请求进行身份验证并进行处理;如果服务器支持推送通知,则还会在任务处理期间尝试发布一些通知
- 完成后,A2A 服务器将响应工件发送回客户端
上述互动的主要对象是以下各项(如需了解详情,请点击此处):
- 任务:有状态的实体,可让客户端和远程代理实现特定结果并生成结果
- 工件:任务的最终结果
- 消息:任何不是工件的内容。例如,客服人员的想法、用户情境、说明、错误、状态或元数据
- 部分:客户端与远程代理之间作为消息或工件的一部分交换的完整内容。部分可以是文本、图片、视频、文件等。
- 推送通知(可选):一种安全的通知机制,代理可以通过该机制在未连接的会话之外通知客户端有更新
卡片发现
启动 A2A 客户端服务时,典型流程是尝试获取代理卡片信息并将其存储起来,以便在需要时轻松访问。我们可以在 purchasing_concierge/purchasing_agent.py 脚本中查看
...
class PurchasingAgent:
"""The purchasing agent.
This is the agent responsible for choosing which remote seller agents to send
tasks to and coordinate their work.
"""
def __init__(
self,
remote_agent_addresses: List[str],
task_callback: TaskUpdateCallback | None = None,
):
self.task_callback = task_callback
self.remote_agent_connections: dict[str, RemoteAgentConnections] = {}
self.cards: dict[str, AgentCard] = {}
for address in remote_agent_addresses:
card_resolver = A2ACardResolver(address)
try:
card = card_resolver.get_agent_card()
# The URL accessed here should be the same as the one provided in the agent card
# However, in this demo we are using the URL provided in the key arguments
remote_connection = RemoteAgentConnections(
agent_card=card, agent_url=address
)
self.remote_agent_connections[card.name] = remote_connection
self.cards[card.name] = card
except httpx.ConnectError:
print(f"ERROR: Failed to get agent card from : {address}")
agent_info = []
for ra in self.list_remote_agents():
agent_info.append(json.dumps(ra))
self.agents = "\n".join(agent_info)
...
提示和发送任务工具
然后,我们会在购买礼宾系统提示中提供远程客服人员上下文,还会提供用于将任务发送给客服人员的工具。这是我们向 ADK 客服人员提供的提示和工具
...
def root_instruction(self, context: ReadonlyContext) -> str:
current_agent = self.check_active_agent(context)
return f"""You are an expert purchasing delegator that can delegate the user product inquiry and purchase request to the
appropriate seller remote agents.
Execution:
- For actionable tasks, you can use `send_task` to assign tasks to remote agents to perform.
- When the remote agent is repeatedly asking for user confirmation, assume that the remote agent doesn't have access to user's conversation context.
So improve the task description to include all the necessary information related to that agent
- Never ask user permission when you want to connect with remote agents. If you need to make connection with multiple remote agents, directly
connect with them without asking user permission or asking user preference
- Always show the detailed response information from the seller agent and propagate it properly to the user.
- If the remote seller is asking for confirmation, rely the confirmation question to the user if the user haven't do so.
- If the user already confirmed the related order in the past conversation history, you can confirm on behalf of the user
- Do not give irrelevant context to remote seller agent. For example, ordered pizza item is not relevant for the burger seller agent
- Never ask order confirmation to the remote seller agent
Please rely on tools to address the request, and don't make up the response. If you are not sure, please ask the user for more details.
Focus on the most recent parts of the conversation primarily.
If there is an active agent, send the request to that agent with the update task tool.
Agents:
{self.agents}
Current active seller agent: {current_agent["active_agent"]}
"""
...
async def send_task(self, agent_name: str, task: str, tool_context: ToolContext):
"""Sends a task to remote seller agent
This will send a message to the remote agent named agent_name.
Args:
agent_name: The name of the agent to send the task to.
task: The comprehensive conversation context summary
and goal to be achieved regarding user inquiry and purchase request.
tool_context: The tool context this method runs in.
Yields:
A dictionary of JSON data.
"""
if agent_name not in self.remote_agent_connections:
raise ValueError(f"Agent {agent_name} not found")
state = tool_context.state
state["active_agent"] = agent_name
client = self.remote_agent_connections[agent_name]
if not client:
raise ValueError(f"Client not available for {agent_name}")
if "task_id" in state:
taskId = state["task_id"]
else:
taskId = str(uuid.uuid4())
sessionId = state["session_id"]
task: Task
messageId = ""
metadata = {}
if "input_message_metadata" in state:
metadata.update(**state["input_message_metadata"])
if "message_id" in state["input_message_metadata"]:
messageId = state["input_message_metadata"]["message_id"]
if not messageId:
messageId = str(uuid.uuid4())
metadata.update(**{"conversation_id": sessionId, "message_id": messageId})
request: TaskSendParams = TaskSendParams(
id=taskId,
sessionId=sessionId,
message=Message(
role="user",
parts=[TextPart(text=task)],
metadata=metadata,
),
acceptedOutputModes=["text", "text/plain"],
# pushNotification=None,
metadata={"conversation_id": sessionId},
)
task = await client.send_task(request, self.task_callback)
# Assume completion unless a state returns that isn't complete
state["session_active"] = task.status.state not in [
TaskState.COMPLETED,
TaskState.CANCELED,
TaskState.FAILED,
TaskState.UNKNOWN,
]
if task.status.state == TaskState.INPUT_REQUIRED:
# Force user input back
tool_context.actions.escalate = True
elif task.status.state == TaskState.COMPLETED:
# Reset active agent is task is completed
state["active_agent"] = "None"
response = []
if task.status.message:
# Assume the information is in the task message.
response.extend(convert_parts(task.status.message.parts, tool_context))
if task.artifacts:
for artifact in task.artifacts:
response.extend(convert_parts(artifact.parts, tool_context))
return response
...
在提示中,我们会向购买礼宾客服人员提供所有可用的远程客服人员的名称和说明,并在工具 self.send_task
中提供一种机制来检索适当的客户端以连接到客服人员,并使用 TaskSendParams
对象发送所需的元数据。
在该工具中,我们还可以指定在任务因某种原因而无法完成时,代理的行为方式。最后,我们需要处理任务完成时返回的响应工件
7. 集成测试和载荷检查
现在,我们来试试以下对话,并检查我们的购买礼宾界面和服务日志。尝试进行如下对话:
- 给我看汉堡和披萨菜单
- 我想订购 1 个烧烤鸡披萨和 1 个辣味卡真汉堡
然后继续对话,直到完成订单。检查互动情况,以及工具调用和响应情况。下图显示了互动结果示例。
我们可以看到,与 2 个不同的代理进行通信会产生 2 种不同的行为,而 A2A 可以很好地处理这种情况。汉堡卖方代理直接接受我们的采购代理请求,而比萨代理需要先获得我们的确认,然后才能处理我们的请求;在我们确认后,代理可以将确认信息提供给比萨代理
现在,我们来看看 purchasing-agent 服务日志中交换的数据。首先,我们来访问 Cloud Run 控制台,在控制台顶部的搜索栏中输入“Cloud Run”,然后右键点击 Cloud Run 图标,在新的浏览器标签页中打开
现在,您应该会看到之前部署的服务,如下所示。点击 purchasing-concierge
现在,您将进入“服务详情”页面,然后点击日志标签页
现在,我们将查看部署的 purchasing-concierge 服务的日志。请尝试向下滚动,找到与我们互动时的近期日志
您会发现,A2A 客户端和服务器之间的请求和响应将采用 JSON-RPC 格式,并符合 A2A 标准。
现在,我们已经介绍完了 A2A 的基本概念,接下来我们来看看它是如何作为客户端和服务器架构实现的
8. 清理
为避免系统因本 Codelab 中使用的资源向您的 Google Cloud 账号收取费用,请按照以下步骤操作: