1. 簡介
代理程式對代理程式 (A2A) 通訊協定旨在標準化 AI 代理程式之間的通訊,特別是部署在外部系統中的代理程式。先前,這類協定是為工具建立,稱為模型上下文協定 (MCP),這是一種新興標準,可將 LLM 與資料和資源連結。A2A 會嘗試補足 MCP 的不足之處,因為 A2A 著重於解決不同的問題。MCP 著重於降低複雜度,讓服務專員能與工具和資料連結,而 A2A 則著重於讓服務專員以自然的模式進行協作。這可讓服務專員以服務專員 (或使用者) 的身分進行通訊,而非以工具的身分進行通訊。舉例來說,如果您想訂購商品,可以啟用雙向通訊功能。
A2A 可用於補足 MCP,在官方文件中,建議應用程式將 A2A 代理程式模擬為 MCP 資源,以 AgentCard 表示 ( 我們稍後會討論這項功能)。接著,架構就能使用 A2A 與使用者、遠端代理程式和其他代理程式通訊。
在這個示範中,我們會從頭開始實作 A2A。我們將從這些範例存放區 中衍生出一個用途,即當我們有個人購物專員時,他們可以協助我們與漢堡和披薩賣家的服務專員聯絡,處理我們的訂單。
A2A 採用用戶端-伺服器原則。以下是我們在本示範中預期的典型 A2A 流程
- A2A 用戶端會先探索所有可存取的 A2A 伺服器代理程式卡片,並利用這些資訊建立連線用戶端
- 必要時,A2A 用戶端會將工作傳送至 A2A 伺服器。如果在 A2A 用戶端上設定推播通知接收器網址,A2A 伺服器也能將工作進度的狀態發布至接收端
- 工作完成後,A2A 伺服器會將回應構件傳送至 A2A 用戶端
您將透過程式碼研究室,按照以下步驟操作:
- 準備 Google Cloud 專案,並啟用其中所有必要的 API
- 設定程式設計環境的工作區
- 為漢堡和披薩代理程式服務準備環境變數
- 將漢堡和披薩代理程式部署至 Cloud Run
- 檢查 A2A 伺服器建立方式的詳細資料
- 為購買服務人員準備環境變數
- 將購買服務專員部署至 Cloud Run
- 檢查 A2A 用戶端的建立方式和資料建模
- 檢查 A2A 用戶端與伺服器之間的酬載和互動
架構總覽
我們將部署下列服務架構
我們將部署 2 項服務,分別是做為 A2A 伺服器的 Burger 代理程式 ( 由 CrewAI 代理程式架構支援) 和 Pizza 代理程式 ( 由 Langgraph 代理程式架構支援)。使用者只會直接與購物服務櫃檯互動,該服務櫃檯會使用代理程式開發套件 (ADK) 架構執行,並充當 A2A 用戶端。
每個代理程式都會擁有自己的環境和部署作業。
必要條件
- 熟悉 Python 作業
- 瞭解使用 HTTP 服務的基本全堆疊架構
課程內容
- A2A Server 的核心結構
- A2A 用戶端的核心結構
- 將服務部署至 Cloud Run
- A2A 用戶端連線至 A2A 伺服器的方式
- 非串流連線的請求和回應結構
軟硬體需求
- Chrome 網路瀏覽器
- Gmail 帳戶
- 已啟用計費功能的 Cloud 專案
本程式碼研究室專為各級別 (包括初學者) 的開發人員設計,範例應用程式會使用 Python。不過,您不必具備 Python 知識,也能瞭解本文介紹的概念。
2. 事前準備
在 Cloud 控制台中選取有效專案
本程式碼研究室假設您已擁有已啟用計費功能的 Google Cloud 專案。如果您尚未安裝,可以按照下列說明操作。
- 在 Google Cloud 控制台的專案選取器頁面中,選取或建立 Google Cloud 專案。
- 確認 Cloud 專案已啟用計費功能。瞭解如何檢查專案是否已啟用計費功能。
在 Cloud Shell 終端機中設定 Cloud 專案
- 您將使用 Cloud Shell,這是在 Google Cloud 中運作的指令列環境,並已預先載入 bq。按一下 Google Cloud 主控台頂端的「啟用 Cloud Shell」按鈕。如果系統提示您授權,請點選「授權」
- 連線至 Cloud Shell 後,請使用下列指令確認您已通過驗證,且專案已設為您的專案 ID:
gcloud auth list
- 在 Cloud Shell 中執行下列指令,確認 gcloud 指令知道您的專案。
gcloud config list project
- 如果未設定專案,請使用下列指令進行設定:
gcloud config set project <YOUR_PROJECT_ID>
或者,您也可以在控制台中查看 PROJECT_ID
ID
按一下該按鈕,所有專案和專案 ID 就會顯示在右側
- 請透過下列指令啟用必要的 API。這可能需要幾分鐘的時間,請耐心等候。
gcloud services enable aiplatform.googleapis.com \
run.googleapis.com \
cloudbuild.googleapis.com \
cloudresourcemanager.googleapis.com
指令執行成功後,您應該會看到類似以下的訊息:
Operation "operations/..." finished successfully.
您可以透過主控台搜尋每項產品,或使用這個連結,來代替 gcloud 指令。
如果遺漏任何 API,您隨時可以在導入期間啟用。
如要瞭解 gcloud 指令和用法,請參閱說明文件。
前往 Cloud Shell 編輯器並設定應用程式工作目錄
我們現在可以設定程式碼編輯器,執行一些程式設計作業。我們將使用 Cloud Shell 編輯器進行這項操作
- 按一下「開啟編輯器」按鈕,即可開啟 Cloud Shell 編輯器,並在其中編寫程式碼
- 請確認 Cloud Code 專案已設在 Cloud Shell 編輯器的左下角 (狀態列),如下圖所示,並設為已啟用計費功能的有效 Google Cloud 專案。如果出現提示訊息,請點按「授權」。如果您已按照先前的指令操作,按鈕可能會直接指向已啟用的專案,而非登入按鈕
- 接下來,我們將從 GitHub 複製本程式碼研究室的範本工作目錄,並執行下列指令。系統會在 purchasing-concierge-a2a 目錄中建立工作目錄
git clone https://github.com/alphinside/purchasing-concierge-intro-a2a-codelab-starter.git purchasing-concierge-a2a
- 接著,前往 Cloud Shell 編輯器的頂端部分,依序點選「File」>「Open Folder」,找出您的 使用者名稱目錄,然後找出「purchasing-concierge-a2a」目錄,然後按一下「OK」按鈕。這會將所選目錄設為主要工作目錄。在本範例中,使用者名稱為 alvinprayuda,因此目錄路徑如下所示
您的 Cloud Shell 編輯器現在應如下所示:
環境設定
下一個步驟是準備開發環境。目前的有效終端機應位於 purchasing-concierge-a2a 工作目錄中。我們會在本程式碼研究室中使用 Python 3.12,並使用 uv Python 專案管理工具,簡化 Python 版本和虛擬環境的建立和管理作業
- 如果尚未開啟終端機,請依序點選「Terminal」->「New Terminal」,或使用「Ctrl + Shift + C」鍵盤快速鍵,在瀏覽器底部開啟終端機視窗
- 下載
uv
,然後使用下列指令安裝 Python 3.12
curl -LsSf https://astral.sh/uv/0.7.2/install.sh | sh && \
source $HOME/.local/bin/env && \
uv python install 3.12
- 接下來,我們將使用
uv
初始化購物專員的虛擬環境,請執行以下指令
uv sync --frozen
這會建立 .venv 目錄並安裝依附元件。快速查看 pyproject.toml 可提供依附元件相關資訊,如下所示:
dependencies = [ "google-adk>=0.3.0", "gradio>=5.28.0", "httpx>=0.28.1", "jwcrypto>=1.5.6", "pydantic>=2.10.6", "pyjwt>=2.10.1", "sse-starlette>=2.3.3", "starlette>=0.46.2", "typing-extensions>=4.13.2", "uvicorn>=0.34.0", ]
- 如要測試虛擬環境,請建立新檔案 main.py,並複製下列程式碼
def main():
print("Hello from purchasing-concierge-a2a!")
if __name__ == "__main__":
main()
- 然後執行下列指令
uv run main.py
您會看到如下所示的輸出內容
Using CPython 3.12 Creating virtual environment at: .venv Hello from purchasing-concierge-a2a!
這表示 Python 專案已正確設定。
接下來,我們可以繼續進行下一個步驟,設定及部署遠端賣家服務專員
3. 將遠端賣家代理程式 - A2A 伺服器部署至 Cloud Run
在這個步驟中,我們會部署這兩個以紅色方塊標示的遠端賣家服務代理人。漢堡代理程式將採用 CrewAI 代理程式架構,而披薩代理程式則採用 Langgraph 代理程式,兩者皆採用 Gemini Flash 2.0 模型
部署遠端 Burger 代理程式
漢堡代理程式原始碼位於 remote_seller_agents/burger_agent 目錄下。您可以在 agent.py 指令碼中檢查代理程式初始化程序。以下是已初始化的代理程式程式碼片段
from crewai import Agent, Crew, LLM, Task, Process
from crewai.tools import tool
...
model = LLM(
model="vertex_ai/gemini-2.0-flash", # Use base model name without provider prefix
)
burger_agent = Agent(
role="Burger Seller Agent",
goal=(
"Help user to understand what is available on burger menu and price also handle order creation."
),
backstory=("You are an expert and helpful burger seller agent."),
verbose=False,
allow_delegation=False,
tools=[create_burger_order],
llm=model,
)
agent_task = Task(
description=self.TaskInstruction,
output_pydantic=ResponseFormat,
agent=burger_agent,
expected_output=(
"A JSON object with 'status' and 'message' fields."
"Set response status to input_required if asking for user order confirmation."
"Set response status to error if there is an error while processing the request."
"Set response status to completed if the request is complete."
),
)
crew = Crew(
tasks=[agent_task],
agents=[burger_agent],
verbose=False,
process=Process.sequential,
)
inputs = {"user_prompt": query, "session_id": sessionId}
response = crew.kickoff(inputs)
...
接下來,我們需要先準備 .env 變數,請將 .env.example 複製到 .env 檔案中
cp remote_seller_agents/burger_agent/.env.example remote_seller_agents/burger_agent/.env
接著,開啟 remote_seller_agents/burger_agent/.env 檔案,您會看到下列內容
AUTH_USERNAME=burgeruser123 AUTH_PASSWORD=burgerpass123 GCLOUD_LOCATION=us-central1 GCLOUD_PROJECT_ID={your-project-id}
漢堡代理程式 A2A 伺服器會使用 Basic HTTP 驗證 ( 使用 Base64 編碼的使用者名稱和密碼) 執行,為了方便示範,我們會在 .env 檔案中設定允許的使用者名稱和密碼。請將 GCLOUD_PROJECT_ID 變數更新為目前的有效專案 ID
別忘了儲存變更,接下來我們可以直接部署服務。我們稍後會檢查程式碼內容。執行下列指令來部署
gcloud run deploy burger-agent \
--source remote_seller_agents/burger_agent \
--port=8080 \
--allow-unauthenticated \
--min 1 \
--region us-central1
當系統提示您將建立容器存放區,以便從原始碼進行部署時,請回答「Y」。部署成功後,系統會顯示類似以下的記錄。
Service [burger-agent] revision [burger-agent-xxxxx-xxx] has been deployed and is serving 100 percent of traffic. Service URL: https://burger-agent-xxxxxxxxx.us-central1.run.app
這裡的 xxxx
部分會成為部署服務時的專屬 ID。
接著,我們透過瀏覽器嘗試使用這些已部署的漢堡代理程式服務的 /.well-known/agent.json
路徑,您應該會看到類似以下的輸出內容
這是漢堡代理程式資訊卡,應可供探索使用。我們稍後會討論這項功能。目前請記住漢堡代理程式服務的網址,我們稍後會使用這個網址
部署遠端 Pizza 代理程式
同樣地,pizza 代理程式原始碼位於 remote_seller_agents/pizza_agent 目錄下。您可以在 agent.py 指令碼中檢查代理程式初始化程序。以下是已初始化的代理程式程式碼片段
from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent
...
self.model = ChatVertexAI(
model="gemini-2.0-flash",
location=os.getenv("GCLOUD_LOCATION"),
project=os.getenv("GCLOUD_PROJECT_ID"),
)
self.tools = [create_pizza_order]
self.graph = create_react_agent(
self.model,
tools=self.tools,
checkpointer=memory,
prompt=self.SYSTEM_INSTRUCTION,
response_format=ResponseFormat,
)
...
接下來,我們需要先準備 .env 變數,請將 .env.example 複製到 .env 檔案中
cp remote_seller_agents/pizza_agent/.env.example remote_seller_agents/pizza_agent/.env
接著,開啟 remote_seller_agents/pizza_agent/.env 檔案,您會看到以下內容
API_KEY=pizza123 GCLOUD_LOCATION=us-central1 GCLOUD_PROJECT_ID={your-project-id}
披薩代理程式 A2A Server 會使用 Bearer HTTP 驗證 (使用 API 金鑰) 執行,為了方便本示範,我們會在 .env 檔案中設定允許的 API 金鑰。請將 GCLOUD_PROJECT_ID 變數更新為目前的有效專案 ID
別忘了儲存變更,接下來我們可以直接部署服務。我們稍後會檢查程式碼內容。執行下列指令來部署
gcloud run deploy pizza-agent \
--source remote_seller_agents/pizza_agent \
--port=8080 \
--allow-unauthenticated \
--min 1 \
--region us-central1
部署成功後,系統會顯示類似以下的記錄。
Service [pizza-agent] revision [pizza-agent-xxxxx-xxx] has been deployed and is serving 100 percent of traffic. Service URL: https://pizza-agent-xxxxxxxxx.us-central1.run.app
這裡的 xxxx
部分會成為部署服務時的專屬 ID。
接下來,我們透過瀏覽器嘗試使用這些已部署的 Pizza 代理程式服務的 /.well-known/agent.json
路徑,您應該會看到類似以下的輸出內容
這是 pizza 代理商資訊卡資訊,應可供探索使用。我們稍後會討論這項功能。目前只要記住 Pizza 代理程式服務的網址即可。
到目前為止,我們已成功將漢堡和披薩服務部署至 Cloud Run。接下來,我們來討論 A2A Server 的核心元件
4. A2A 伺服器的核心元件
接下來,我們將討論 A2A 伺服器的核心概念和元件
Agent Card
每個 A2A 伺服器都必須有可在 /.well-known/agent.json
資源上存取的代理程式卡片。這項功能可支援 A2A 用戶端的探索階段,應提供完整資訊和背景資訊,說明如何存取服務代理並瞭解其所有功能。這與使用 Swagger 或 Postman 建立的 API 說明文件類似。
這是我們部署的漢堡服務專員資訊卡內容
{
"name": "burger_seller_agent",
"description": "Helps with creating burger orders",
"url": "http://0.0.0.0:8080/",
"version": "1.0.0",
"capabilities": {
"streaming": false,
"pushNotifications": true,
"stateTransitionHistory": false
},
"authentication": {
"schemes": [
"Basic"
]
},
"defaultInputModes": [
"text",
"text/plain"
],
"defaultOutputModes": [
"text",
"text/plain"
],
"skills": [
{
"id": "create_burger_order",
"name": "Burger Order Creation Tool",
"description": "Helps with creating burger orders",
"tags": [
"burger order creation"
],
"examples": [
"I want to order 2 classic cheeseburgers"
]
}
]
}
這些代理人資訊卡會強調許多重要元件,例如代理人技能、串流功能、支援的模式和驗證。
您可以利用所有這些資訊來開發適當的通訊機制,讓 A2A 用戶端能夠正常通訊。支援的模式和驗證機制可確保建立正確的通訊,且可將服務機器人 skills
資訊嵌入 A2A 用戶端系統提示,提供用戶端服務機器人有關要叫用的遠端服務機器人功能和技能的資訊。如要進一步瞭解這個代理人資訊卡的欄位,請參閱這份說明文件。
在程式碼中,我們使用 Pydantic 在 a2a_types.py ( 在 remote_seller_agents/burger_agent
或 remote_seller_agents/pizza_agent
上) 中建立代理卡實作項目
...
class AgentProvider(BaseModel):
organization: str
url: str | None = None
class AgentCapabilities(BaseModel):
streaming: bool = False
pushNotifications: bool = False
stateTransitionHistory: bool = False
class AgentAuthentication(BaseModel):
schemes: List[str]
credentials: str | None = None
class AgentSkill(BaseModel):
id: str
name: str
description: str | None = None
tags: List[str] | None = None
examples: List[str] | None = None
inputModes: List[str] | None = None
outputModes: List[str] | None = None
class AgentCard(BaseModel):
name: str
description: str | None = None
url: str
provider: AgentProvider | None = None
version: str
documentationUrl: str | None = None
capabilities: AgentCapabilities
authentication: AgentAuthentication | None = None
defaultInputModes: List[str] = ["text"]
defaultOutputModes: List[str] = ["text"]
skills: List[AgentSkill]
...
物件建構作業則在 remote_seller_agents/burger_agent/__main__.py
上進行,如下所示:
...
def main(host, port):
"""Starts the Burger Seller Agent server."""
try:
capabilities = AgentCapabilities(pushNotifications=True)
skill = AgentSkill(
id="create_burger_order",
name="Burger Order Creation Tool",
description="Helps with creating burger orders",
tags=["burger order creation"],
examples=["I want to order 2 classic cheeseburgers"],
)
agent_card = AgentCard(
name="burger_seller_agent",
description="Helps with creating burger orders",
# The URL provided here is for the sake of demo,
# in production you should use a proper domain name
url=f"http://{host}:{port}/",
version="1.0.0",
authentication=AgentAuthentication(schemes=["Basic"]),
defaultInputModes=BurgerSellerAgent.SUPPORTED_CONTENT_TYPES,
defaultOutputModes=BurgerSellerAgent.SUPPORTED_CONTENT_TYPES,
capabilities=capabilities,
skills=[skill],
)
notification_sender_auth = PushNotificationSenderAuth()
notification_sender_auth.generate_jwk()
server = A2AServer(
agent_card=agent_card,
task_manager=AgentTaskManager(
agent=BurgerSellerAgent(),
notification_sender_auth=notification_sender_auth,
),
host=host,
port=port,
auth_username=os.environ.get("AUTH_USERNAME"),
auth_password=os.environ.get("AUTH_PASSWORD"),
)
...
工作定義和工作管理員
A2A 的核心元件之一是任務定義。這是在 JSON-RPC 標準上調整酬載格式。在本示範中,我們會在本節的 a2a_types.py ( 在 remote_seller_agents/burger_agent
或 remote_seller_agents/pizza_agent
上) 中,使用 Pydantic 實作這項功能
...
## RPC Messages
class JSONRPCMessage(BaseModel):
jsonrpc: Literal["2.0"] = "2.0"
id: int | str | None = Field(default_factory=lambda: uuid4().hex)
class JSONRPCRequest(JSONRPCMessage):
method: str
params: dict[str, Any] | None = None
...
class SendTaskRequest(JSONRPCRequest):
method: Literal["tasks/send"] = "tasks/send"
params: TaskSendParams
class SendTaskStreamingRequest(JSONRPCRequest):
method: Literal["tasks/sendSubscribe"] = "tasks/sendSubscribe"
params: TaskSendParams
class GetTaskRequest(JSONRPCRequest):
method: Literal["tasks/get"] = "tasks/get"
params: TaskQueryParams
class CancelTaskRequest(JSONRPCRequest):
method: Literal["tasks/cancel",] = "tasks/cancel"
params: TaskIdParams
class SetTaskPushNotificationRequest(JSONRPCRequest):
method: Literal["tasks/pushNotification/set",] = "tasks/pushNotification/set"
params: TaskPushNotificationConfig
class GetTaskPushNotificationRequest(JSONRPCRequest):
method: Literal["tasks/pushNotification/get",] = "tasks/pushNotification/get"
params: TaskIdParams
class TaskResubscriptionRequest(JSONRPCRequest):
method: Literal["tasks/resubscribe",] = "tasks/resubscribe"
params: TaskIdParams
...
您可以使用各種工作方法來支援不同類型的通訊 (例如同步、串流、非同步),並設定工作狀態的通知。A2A 伺服器可靈活設定,以便處理這些任務定義標準。
A2A 伺服器可能會處理不同服務專員或使用者的要求,並能完美隔離各項工作。如要更清楚地瞭解這些內容的背景,請查看下圖
因此,每個 A2A 伺服器都應能夠追蹤傳入的工作,並儲存相關的正確資訊,通常每個傳入的要求都會有 工作 ID 和 工作階段 ID。在程式碼中,這個工作管理員的實作項目位於 remote_seller_agents/burger_agent/task_manager.py
(Pizza 代理程式也共用類似的工作管理員)
...
class AgentTaskManager(InMemoryTaskManager):
def __init__(
self,
agent: BurgerSellerAgent,
notification_sender_auth: PushNotificationSenderAuth,
):
super().__init__()
self.agent = agent
self.notification_sender_auth = notification_sender_auth
...
async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
"""Handles the 'send task' request."""
validation_error = self._validate_request(request)
if validation_error:
return SendTaskResponse(id=request.id, error=validation_error.error)
await self.upsert_task(request.params)
if request.params.pushNotification:
if not await self.set_push_notification_info(
request.params.id, request.params.pushNotification
):
return SendTaskResponse(
id=request.id,
error=InvalidParamsError(
message="Push notification URL is invalid"
),
)
task = await self.update_store(
request.params.id, TaskStatus(state=TaskState.WORKING), None
)
await self.send_task_notification(task)
task_send_params: TaskSendParams = request.params
query = self._get_user_query(task_send_params)
try:
agent_response = self.agent.invoke(query, task_send_params.sessionId)
except Exception as e:
logger.error(f"Error invoking agent: {e}")
raise ValueError(f"Error invoking agent: {e}")
return await self._process_agent_response(request, agent_response)
...
async def _process_agent_response(
self, request: SendTaskRequest, agent_response: dict
) -> SendTaskResponse:
"""Processes the agent's response and updates the task store."""
task_send_params: TaskSendParams = request.params
task_id = task_send_params.id
history_length = task_send_params.historyLength
task_status = None
parts = [{"type": "text", "text": agent_response["content"]}]
artifact = None
if agent_response["require_user_input"]:
task_status = TaskStatus(
state=TaskState.INPUT_REQUIRED,
message=Message(role="agent", parts=parts),
)
else:
task_status = TaskStatus(state=TaskState.COMPLETED)
artifact = Artifact(parts=parts)
task = await self.update_store(
task_id, task_status, None if artifact is None else [artifact]
)
task_result = self.append_task_history(task, history_length)
await self.send_task_notification(task)
return SendTaskResponse(id=request.id, result=task_result)
...
從上述程式碼,我們可以檢查在處理傳入的工作時 ( 當傳入的要求方法為 tasks/send
時,系統會執行 on_send_task
方法),系統會執行多項作業,包括更新工作儲存庫 ( self.update_store
方法呼叫) 和傳送通知 ( self.send_task_notification
方法呼叫)。這是 A2A 伺服器如何在同步傳送工作要求期間管理工作狀態更新和通知的其中一個範例。
摘要
簡單來說,目前部署的 A2A 伺服器可支援下列 2 項功能:
- 在
/.well-known/agent.json
路徑上發布代理方資訊卡 - 使用方法
tasks/send
處理 JSON-RPC 要求
您可以在 main.py 指令碼 ( 位於 remote_seller_agents/burger_agent
或 remote_seller_agents/pizza_agent
上) 中檢查啟動這些功能的進入點。我們可以看到,我們需要先設定 Agent Card,然後在下方程式碼片段中啟動伺服器
...
capabilities = AgentCapabilities(pushNotifications=True)
skill = AgentSkill(
id="create_pizza_order",
name="Pizza Order Creation Tool",
description="Helps with creating pizza orders",
tags=["pizza order creation"],
examples=["I want to order 2 pepperoni pizzas"],
)
agent_card = AgentCard(
name="pizza_seller_agent",
description="Helps with creating pizza orders",
# The URL provided here is for the sake of demo,
# in production you should use a proper domain name
url=f"http://{host}:{port}/",
version="1.0.0",
authentication=AgentAuthentication(schemes=["Bearer"]),
defaultInputModes=PizzaSellerAgent.SUPPORTED_CONTENT_TYPES,
defaultOutputModes=PizzaSellerAgent.SUPPORTED_CONTENT_TYPES,
capabilities=capabilities,
skills=[skill],
)
...
server = A2AServer(
agent_card=agent_card,
task_manager=AgentTaskManager(
agent=PizzaSellerAgent(),
notification_sender_auth=notification_sender_auth,
),
host=host,
port=port,
api_key=os.environ.get("API_KEY"),
)
...
logger.info(f"Starting server on {host}:{port}")
server.start()
...
5. 將購買服務專員 - A2A 用戶端部署至 Cloud Run
在這個步驟中,我們會部署購買服務專員代理程式。這個代理程式就是我們要與之互動的代理程式。
購買服務專員的程式碼位於 purchasing_concierge 目錄下。您可以在 purchasing_agent.py 指令碼中檢查代理程式初始化程序。以下是已初始化的代理程式程式碼片段。
from google.adk import Agent
...
def create_agent(self) -> Agent:
return Agent(
model="gemini-2.0-flash-001",
name="purchasing_agent",
instruction=self.root_instruction,
before_model_callback=self.before_model_callback,
description=(
"This purchasing agent orchestrates the decomposition of the user purchase request into"
" tasks that can be performed by the seller agents."
),
tools=[
self.list_remote_agents,
self.send_task,
],
)
...
接下來,我們需要先準備 .env 變數,請將 .env.example 複製到 .env 檔案中
cp purchasing_concierge/.env.example purchasing_concierge/.env
接著開啟 purchasing_concierge/.env 檔案,您會看到以下內容
PIZZA_SELLER_AGENT_AUTH=pizza123 PIZZA_SELLER_AGENT_URL=http://localhost:10000 BURGER_SELLER_AGENT_AUTH=burgeruser123:burgerpass123 BURGER_SELLER_AGENT_URL=http://localhost:10001 GOOGLE_GENAI_USE_VERTEXAI=TRUE GOOGLE_CLOUD_PROJECT={your-project-id} GOOGLE_CLOUD_LOCATION=us-central1
這個代理程式會與漢堡和披薩代理程式進行通訊,因此我們需要為這兩個代理程式提供適當的憑證。請將 GCLOUD_PROJECT_ID 變數更新為目前的有效專案 ID。
接下來,我們也需要使用上一個步驟中的 Cloud Run 網址,更新 PIZZA_SELLER_AGENT_URL 和 BURGER_SELLER_AGENT_URL。如果忘記這點,請前往 Cloud Run 控制台。在控制台頂端的搜尋列中輸入「Cloud Run」,然後按一下 Cloud Run 圖示,在新分頁中開啟
你應該會看到先前部署的遠端賣家服務代理人,如下所示
如要查看這些服務的公開網址,請按一下其中一個服務,系統就會將您重新導向至「Service details」(服務詳細資料) 頁面。你可以在頂端區域的「區域」資訊旁邊看到網址
複製這個網址的值,分別貼到 PIZZA_SELLER_AGENT_URL 和 BURGER_SELLER_AGENT_URL。
最終環境變數應如下所示
PIZZA_SELLER_AGENT_AUTH=pizza123 PIZZA_SELLER_AGENT_URL=https://pizza-agent-xxxxx.us-central1.run.app BURGER_SELLER_AGENT_AUTH=burgeruser123:burgerpass123 BURGER_SELLER_AGENT_URL=https://burger-agent-xxxxx.us-central1.run.app GOOGLE_GENAI_USE_VERTEXAI=TRUE GOOGLE_CLOUD_PROJECT={your-project-id} GOOGLE_CLOUD_LOCATION=us-central1
我們現在可以部署購買服務專員代理程式了。如要部署這個代理程式,請執行下列指令
gcloud run deploy purchasing-concierge \
--source . \
--port=8080 \
--allow-unauthenticated \
--min 1 \
--region us-central1 \
--memory 1024Mi
部署成功後,系統會顯示類似以下的記錄。
Service [purchasing-concierge] revision [purchasing-concierge-xxxxx-xxx] has been deployed and is serving 100 percent of traffic. Service URL: https://purchasing-concierge-xxxxxx.us-central1.run.app
這裡的 xxxx
部分會成為部署服務時的專屬 ID。
我們現在可以嘗試透過 UI 與購買服務專員互動。您存取服務網址時,應該會看到如下所示的 Gradio 網頁介面
接下來,我們來討論 A2A 用戶端的核心元件和一般流程。
6. A2A 用戶端的核心元件
上圖顯示 A2A 互動活動的一般流程:
- 用戶端會嘗試在路徑中提供的遠端代理程式網址中,尋找任何已發布的代理程式資訊卡
/.well-known/agent.json
- 接著,在必要時,會向該服務項目傳送工作,其中包含訊息和必要的中繼資料參數 ( 例如工作階段 ID、歷史記錄背景等)
- A2A 伺服器會驗證並處理要求,如果伺服器支援推播通知,也會嘗試在工作處理期間發布一些通知
- 完成後,A2A 伺服器會將回應構件傳回用戶端
上述互動動作的核心物件為以下項目 (詳情請參閱這篇文章):
- Task:具有狀態實體,可讓用戶端和遠端代理程式達成特定結果並產生結果
- 構件:Task 的最終結果
- 訊息:任何非構件的內容。例如:服務專員的想法、使用者背景資訊、指示、錯誤、狀態或中繼資料
- 部分:客戶與遠端代理程式之間交換的完整內容,做為訊息或構件的一部分。部分內容可以是文字、圖片、影片、檔案等。
- 推播通知 (選用):安全的通知機制,可讓服務專員在連線工作階段外通知用戶端有更新
資訊卡
當 A2A 用戶端服務啟動時,一般程序是嘗試取得代理卡資訊並儲存,以便在需要時輕鬆存取。我們可以在這裡的 purchasing_concierge/purchasing_agent.py 指令碼中查看
...
class PurchasingAgent:
"""The purchasing agent.
This is the agent responsible for choosing which remote seller agents to send
tasks to and coordinate their work.
"""
def __init__(
self,
remote_agent_addresses: List[str],
task_callback: TaskUpdateCallback | None = None,
):
self.task_callback = task_callback
self.remote_agent_connections: dict[str, RemoteAgentConnections] = {}
self.cards: dict[str, AgentCard] = {}
for address in remote_agent_addresses:
card_resolver = A2ACardResolver(address)
try:
card = card_resolver.get_agent_card()
# The URL accessed here should be the same as the one provided in the agent card
# However, in this demo we are using the URL provided in the key arguments
remote_connection = RemoteAgentConnections(
agent_card=card, agent_url=address
)
self.remote_agent_connections[card.name] = remote_connection
self.cards[card.name] = card
except httpx.ConnectError:
print(f"ERROR: Failed to get agent card from : {address}")
agent_info = []
for ra in self.list_remote_agents():
agent_info.append(json.dumps(ra))
self.agents = "\n".join(agent_info)
...
提示及傳送工作工具
接著,我們會在購買服務專員系統提示中提供遠端服務專員的內容,並提供工具將工作傳送給服務專員。這是我們提供給 ADK 服務專員的提示和工具
...
def root_instruction(self, context: ReadonlyContext) -> str:
current_agent = self.check_active_agent(context)
return f"""You are an expert purchasing delegator that can delegate the user product inquiry and purchase request to the
appropriate seller remote agents.
Execution:
- For actionable tasks, you can use `send_task` to assign tasks to remote agents to perform.
- When the remote agent is repeatedly asking for user confirmation, assume that the remote agent doesn't have access to user's conversation context.
So improve the task description to include all the necessary information related to that agent
- Never ask user permission when you want to connect with remote agents. If you need to make connection with multiple remote agents, directly
connect with them without asking user permission or asking user preference
- Always show the detailed response information from the seller agent and propagate it properly to the user.
- If the remote seller is asking for confirmation, rely the confirmation question to the user if the user haven't do so.
- If the user already confirmed the related order in the past conversation history, you can confirm on behalf of the user
- Do not give irrelevant context to remote seller agent. For example, ordered pizza item is not relevant for the burger seller agent
- Never ask order confirmation to the remote seller agent
Please rely on tools to address the request, and don't make up the response. If you are not sure, please ask the user for more details.
Focus on the most recent parts of the conversation primarily.
If there is an active agent, send the request to that agent with the update task tool.
Agents:
{self.agents}
Current active seller agent: {current_agent["active_agent"]}
"""
...
async def send_task(self, agent_name: str, task: str, tool_context: ToolContext):
"""Sends a task to remote seller agent
This will send a message to the remote agent named agent_name.
Args:
agent_name: The name of the agent to send the task to.
task: The comprehensive conversation context summary
and goal to be achieved regarding user inquiry and purchase request.
tool_context: The tool context this method runs in.
Yields:
A dictionary of JSON data.
"""
if agent_name not in self.remote_agent_connections:
raise ValueError(f"Agent {agent_name} not found")
state = tool_context.state
state["active_agent"] = agent_name
client = self.remote_agent_connections[agent_name]
if not client:
raise ValueError(f"Client not available for {agent_name}")
if "task_id" in state:
taskId = state["task_id"]
else:
taskId = str(uuid.uuid4())
sessionId = state["session_id"]
task: Task
messageId = ""
metadata = {}
if "input_message_metadata" in state:
metadata.update(**state["input_message_metadata"])
if "message_id" in state["input_message_metadata"]:
messageId = state["input_message_metadata"]["message_id"]
if not messageId:
messageId = str(uuid.uuid4())
metadata.update(**{"conversation_id": sessionId, "message_id": messageId})
request: TaskSendParams = TaskSendParams(
id=taskId,
sessionId=sessionId,
message=Message(
role="user",
parts=[TextPart(text=task)],
metadata=metadata,
),
acceptedOutputModes=["text", "text/plain"],
# pushNotification=None,
metadata={"conversation_id": sessionId},
)
task = await client.send_task(request, self.task_callback)
# Assume completion unless a state returns that isn't complete
state["session_active"] = task.status.state not in [
TaskState.COMPLETED,
TaskState.CANCELED,
TaskState.FAILED,
TaskState.UNKNOWN,
]
if task.status.state == TaskState.INPUT_REQUIRED:
# Force user input back
tool_context.actions.escalate = True
elif task.status.state == TaskState.COMPLETED:
# Reset active agent is task is completed
state["active_agent"] = "None"
response = []
if task.status.message:
# Assume the information is in the task message.
response.extend(convert_parts(task.status.message.parts, tool_context))
if task.artifacts:
for artifact in task.artifacts:
response.extend(convert_parts(artifact.parts, tool_context))
return response
...
在提示中,我們會將所有可用的遠端服務代理人名稱和說明提供給購物服務專員,並在工具 self.send_task
中提供機制,用於擷取適當的用戶端來連線至服務代理人,並使用 TaskSendParams
物件傳送必要的中繼資料。
在這個工具中,我們也可以指定在工作無法完成時,服務機器人應採取的動作。最後,我們需要處理在工作完成時傳回的回應構件
7. 整合測試和酬載檢查
接下來,我們來試試以下對話,並檢查購物專員服務的使用者介面和服務記錄。請嘗試進行以下對話:
- 請顯示漢堡和披薩的菜單
- 我想訂購 1 份燒烤雞披薩和 1 份肯瓊風味漢堡
並繼續對話,直到完成訂單為止。檢查互動情形,以及工具呼叫和回應為何?下圖為互動結果的範例。
我們可以看到,與 2 個不同的代理程式通訊會產生 2 種不同的行為,而 A2A 可以妥善處理這項問題。漢堡賣家服務專員會直接接受購買服務專員的要求,但比薩服務專員需要先取得我們的確認,才能繼續處理要求,且在我們確認後,才能將確認資訊傳達給比薩服務專員
接著,我們來看看 purchasing-agent 服務記錄中的交換資料。首先,請前往 Cloud Run 控制台,在控制台頂端的搜尋列中輸入「Cloud Run」,然後在 Cloud Run 圖示上按一下滑鼠右鍵,在新瀏覽器分頁中開啟
現在,您應該會看到先前部署的服務,如下所示。按一下「購買服務專員」
您現在會看到「Service details」(服務詳細資料) 頁面,請按一下「Logs」(記錄檔) 分頁標籤
我們現在會看到已部署的 purchasing-concierge 服務記錄。請向下捲動,找出最近的互動記錄
您會發現,A2A 用戶端和伺服器之間的要求和回應會採用 JSON-RPC 格式,並符合 A2A 標準。
我們現在已完成 A2A 的基本概念,並瞭解如何將其做為用戶端和伺服器架構實作
8. 清理
如要避免系統向您的 Google Cloud 帳戶收取本程式碼研究室所用資源的費用,請按照下列步驟操作:
- 在 Google Cloud 控制台中前往「管理資源」頁面。
- 在專案清單中選取要刪除的專案,然後點按「刪除」。
- 在對話方塊中輸入專案 ID,然後按一下「Shut down」(關閉) 即可刪除專案。
- 或者,您也可以前往控制台的 Cloud Run,選取剛部署的服務並刪除。