エージェント間(A2A)プロトコルのスタートガイド: Cloud Run で Gemini を使用する購入コンシェルジュとリモート販売エージェントのやり取り

1. はじめに

3356bc1e3134fab5.png

エージェント間(A2A)プロトコルは、AI エージェント間の通信を標準化するために設計されています。特に、外部システムにデプロイされているエージェントが対象です。以前は、LLM をデータやリソースに接続するための新しい標準である Model Context Protocol(MCP)というツール用にこのようなプロトコルが確立されていました。A2A は MCP を補完しようとしています。A2A は別の問題に焦点を当てています。MCP はエージェントとツールやデータを接続するための複雑さを軽減することに焦点を当てていますが、A2A はエージェントが自然な方法でコラボレーションできるようにすることに焦点を当てています。これにより、エージェントはツールとしてではなく、エージェント(またはユーザー)としてやり取りできるようになります。たとえば、何かを注文する際にやり取りを可能にできます。

A2A は MCP を補完するものです。公式ドキュメントでは、アプリケーションで A2A エージェントを MCP リソース(AgentCard で表されます)としてモデル化することをおすすめしています(後で説明します)。これにより、フレームワークは A2A を使用してユーザー、リモート エージェント、他のエージェントと通信できます。

83b1a03588b90b68.png

このデモでは、A2A のみを最初から実装します。これらのサンプル リポジトリ から派生したユースケースとして、ハンバーガーとピザの販売エージェントとやり取りして注文を処理できるパーソナル ショッピング コンシェルジュを使用するユースケースについて説明します。

A2A はクライアント サーバー モデルを使用します。このデモで想定される一般的な A2A フロー

73dae827aa9bddc3.png

  1. A2A クライアントは、まずアクセス可能なすべての A2A サーバー エージェント カードを検出し、その情報を使用して接続クライアントを構築します。
  2. 必要に応じて、A2A クライアントは A2A サーバーにタスクを送信します。A2A クライアントでプッシュ通知レシーバーの URL が構成されている場合、A2A サーバーはタスクの進行状況のステータスを受信エンドポイントに公開することもできます。
  3. タスクが完了すると、A2A サーバーは A2A クライアントにレスポンス アーティファクトを送信します。

この Codelab では、次の手順で進めていきます。

  1. Google Cloud プロジェクトを準備し、必要なすべての API を有効にする
  2. コーディング環境のワークスペースを設定する
  3. ハンバーガー エージェント サービスとピザ エージェント サービスの環境変数を準備する
  4. ハンバーガーとピザのエージェントを Cloud Run にデプロイする
  5. A2A サーバーの確立方法の詳細を確認する
  6. コンシェルジュの購入用の環境変数を準備する
  7. 購入コンシェルジュを Cloud Run にデプロイする
  8. A2A クライアントの設定方法とデータモデリングの詳細を確認する
  9. A2A クライアントとサーバー間のペイロードとインタラクションを検査する

アーキテクチャの概要

次のサービス アーキテクチャをデプロイします。

bc8f96e071ae53ff.png

A2A サーバーとして機能する 2 つのサービス(Burger エージェント(CrewAI エージェント フレームワークをベースとする)と Pizza エージェント(Langgraph エージェント フレームワークをベースとする))をデプロイします。ユーザーが直接操作するのは、A2A クライアントとして機能するエージェント開発キット(ADK)フレームワークを使用して実行される購入コンシェルジュのみです。

これらのエージェントにはそれぞれ独自の環境とデプロイメントがあります。

前提条件

  • Python を使い慣れている
  • HTTP サービスを使用する基本的なフルスタック アーキテクチャの理解

学習内容

  • A2A サーバーのコア構造
  • A2A クライアントのコア構造
  • Cloud Run へのサービスのデプロイ
  • A2A クライアントが A2A サーバーに接続する方法
  • 非ストリーミング接続のリクエストとレスポンスの構造

必要なもの

  • Chrome ウェブブラウザ
  • Gmail アカウント
  • 課金が有効になっている Cloud プロジェクト

この Codelab は、初心者を含むあらゆるレベルのデベロッパーを対象としており、サンプル アプリケーションで Python を使用します。ただし、ここで説明するコンセプトを理解するために Python の知識は必要ありません。

2. 始める前に

Cloud コンソールでアクティブなプロジェクトを選択する

この Codelab は、課金が有効になっている Google Cloud プロジェクトがすでにあることを前提としています。まだアカウントをお持ちでない場合は、以下の手順に沿って作成してください。

  1. Google Cloud コンソールのプロジェクト選択ページで、Google Cloud プロジェクトを選択または作成します。
  2. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

bc8d176ea42fbb7.png

Cloud Shell ターミナルで Cloud プロジェクトを設定する

  1. Cloud Shell(Google Cloud で動作するコマンドライン環境)を使用します。この環境には bq がプリロードされています。Google Cloud コンソールの上部にある [Cloud Shell をアクティブにする] をクリックします。承認を求めるメッセージが表示されたら、[承認] をクリックします。

1829c3759227c19b.png

  1. Cloud Shell に接続したら、次のコマンドを使用して、認証が完了していることと、プロジェクトがプロジェクト ID に設定されていることを確認します。
gcloud auth list
  1. Cloud Shell で次のコマンドを実行して、gcloud コマンドがプロジェクトを認識していることを確認します。
gcloud config list project
  1. プロジェクトが設定されていない場合は、次のコマンドを使用して設定します。
gcloud config set project <YOUR_PROJECT_ID>

コンソールで PROJECT_ID ID を確認することもできます。

4032c45803813f30.jpeg

これをクリックすると、右側にすべてのプロジェクトとプロジェクト ID が表示されます。

8dc17eb4271de6b5.jpeg

  1. 次のコマンドを使用して、必要な API を有効にします。この処理には数分かかることがありますので、少々お待ちください。
gcloud services enable aiplatform.googleapis.com \
                       run.googleapis.com \
                       cloudbuild.googleapis.com \
                       cloudresourcemanager.googleapis.com

コマンドが正常に実行されると、次のようなメッセージが表示されます。

Operation "operations/..." finished successfully.

gcloud コマンドの代わりに、コンソールで各プロダクトを検索するか、このリンクを使用します。

必要な API が不足している場合は、実装中にいつでも有効にできます。

gcloud コマンドとその使用方法については、ドキュメントをご覧ください。

Cloud Shell エディタに移動してアプリケーションの作業ディレクトリを設定する

これで、コードエディタを設定してコーディングを開始できます。ここでは、Cloud Shell エディタを使用します。

  1. [エディタを開く] ボタンをクリックすると、Cloud Shell エディタが開きます。ここにコードを記述します。b16d56e4979ec951.png
  2. 下の図に示すように、Cloud Shell エディタの左下(ステータスバー)に Cloud Code プロジェクトが設定され、請求が有効になっているアクティブな Google Cloud プロジェクトに設定されていることを確認します。プロンプトが表示されたら [承認] をクリックします。前のコマンドに沿って操作している場合は、ボタンがログインボタンではなく、有効化されたプロジェクトに直接リンクしていることもあります。

f5003b9c38b43262.png

  1. 次に、この Codelab のテンプレート作業ディレクトリのクローンを GitHub から作成します。次のコマンドを実行します。作業ディレクトリが purchasing-concierge-a2a ディレクトリに作成されます。
git clone https://github.com/alphinside/purchasing-concierge-intro-a2a-codelab-starter.git purchasing-concierge-a2a
  1. 次に、Cloud Shell エディタの上部セクションに移動し、[ファイル] -> [フォルダを開く] をクリックして、ユーザー名 ディレクトリを見つけ、purchasing-concierge-a2a ディレクトリを見つけて、[OK] ボタンをクリックします。これにより、選択したディレクトリがメインの作業ディレクトリになります。この例では、ユーザー名は alvinprayuda であるため、ディレクトリ パスは次のようになります。

2c53696f81d805cc.png

253b472fa1bd752e.png

Cloud Shell エディタは次のようになります。

6a2aa5fc278f5456.png

環境の設定

次のステップは、開発環境を準備することです。現在のアクティブなターミナルは、purchasing-concierge-a2a 作業ディレクトリ内にある必要があります。この Codelab では Python 3.12 を使用し、uv python プロジェクト マネージャーを使用して、Python バージョンと仮想環境の作成と管理を簡素化します。

  1. ターミナルを開いていない場合は、[Terminal] -> [New Terminal] をクリックするか、Ctrl+Shift+C を使用して開きます。ターミナル ウィンドウがブラウザの下部に開きます。

f8457daf0bed059e.jpeg

  1. uv をダウンロードし、次のコマンドを使用して Python 3.12 をインストールします。
curl -LsSf https://astral.sh/uv/0.7.2/install.sh | sh && \
source $HOME/.local/bin/env && \
uv python install 3.12
  1. 次に、uv を使用して購入コンシェルジュの仮想環境を初期化します。次のコマンドを実行します。
uv sync --frozen

これにより、.venv ディレクトリが作成され、依存関係がインストールされます。pyproject.toml をざっと見てみると、次のように依存関係に関する情報が表示されます。

dependencies = [
    "google-adk>=0.3.0",
    "gradio>=5.28.0",
    "httpx>=0.28.1",
    "jwcrypto>=1.5.6",
    "pydantic>=2.10.6",
    "pyjwt>=2.10.1",
    "sse-starlette>=2.3.3",
    "starlette>=0.46.2",
    "typing-extensions>=4.13.2",
    "uvicorn>=0.34.0",
]
  1. 仮想環境をテストするには、新しいファイル main.py を作成し、次のコードをコピーします。
def main():
   print("Hello from purchasing-concierge-a2a!")

if __name__ == "__main__":
   main()
  1. 次に、次のコマンドを実行します。
uv run main.py

次のような出力が表示されます。

Using CPython 3.12
Creating virtual environment at: .venv
Hello from purchasing-concierge-a2a!

これは、Python プロジェクトが正しく設定されていることを示しています。

これで、次のステップであるリモート販売者エージェントの構成とデプロイに進むことができます。

3. リモート販売者エージェント - A2A サーバーを Cloud Run にデプロイする

このステップでは、赤いボックスで囲まれている 2 つのリモート販売者エージェントをデプロイします。ハンバーガー エージェントは CrewAI エージェント フレームワークを、ピザ エージェントは Langgraph エージェントを使用します。どちらも Gemini Flash 2.0 モデルをベースにしています。

ba7eefb0c30f0c46.png

リモート Burger エージェントのデプロイ

ハンバーガー エージェントのソースコードは、remote_seller_agents/burger_agent ディレクトリにあります。エージェントの初期化は agent.py スクリプトで確認できます。初期化されたエージェントのコード スニペットは次のとおりです。

from crewai import Agent, Crew, LLM, Task, Process
from crewai.tools import tool

...

model = LLM(
    model="vertex_ai/gemini-2.0-flash",  # Use base model name without provider prefix
)
burger_agent = Agent(
    role="Burger Seller Agent",
    goal=(
        "Help user to understand what is available on burger menu and price also handle order creation."
    ),
    backstory=("You are an expert and helpful burger seller agent."),
    verbose=False,
    allow_delegation=False,
    tools=[create_burger_order],
    llm=model,
)

agent_task = Task(
    description=self.TaskInstruction,
    output_pydantic=ResponseFormat,
    agent=burger_agent,
    expected_output=(
        "A JSON object with 'status' and 'message' fields."
        "Set response status to input_required if asking for user order confirmation."
        "Set response status to error if there is an error while processing the request."
        "Set response status to completed if the request is complete."
    ),
)

crew = Crew(
    tasks=[agent_task],
    agents=[burger_agent],
    verbose=False,
    process=Process.sequential,
)

inputs = {"user_prompt": query, "session_id": sessionId}
response = crew.kickoff(inputs)

...

次に、.env 変数を準備する必要があります。.env.example.env ファイルにコピーします。

cp remote_seller_agents/burger_agent/.env.example remote_seller_agents/burger_agent/.env

remote_seller_agents/burger_agent/.env ファイルを開くと、次の内容が表示されます。

AUTH_USERNAME=burgeruser123
AUTH_PASSWORD=burgerpass123
GCLOUD_LOCATION=us-central1
GCLOUD_PROJECT_ID={your-project-id}

ハンバーガー エージェントの A2A サーバーは、基本 HTTP 認証(Base64 でエンコードされたユーザー名とパスワードを使用)を使用して実行されます。このデモでは、.env ファイルで許可されたユーザー名とパスワードを構成します。次に、GCLOUD_PROJECT_ID 変数を、現在アクティブなプロジェクト ID に更新します。

変更を保存してください。次に、サービスを直接デプロイします。コードの内容については後で確認します。次のコマンドを実行してデプロイします。

gcloud run deploy burger-agent \
           --source remote_seller_agents/burger_agent \
           --port=8080 \
           --allow-unauthenticated \
           --min 1 \
           --region us-central1

ソースからデプロイするためにコンテナ リポジトリが作成されることを示すメッセージが表示されたら、[Y] と回答します。デプロイが正常に完了すると、次のようなログが表示されます。

Service [burger-agent] revision [burger-agent-xxxxx-xxx] has been deployed and is serving 100 percent of traffic.
Service URL: https://burger-agent-xxxxxxxxx.us-central1.run.app

ここでの xxxx 部分は、サービスをデプロイする際の一意の識別子になります。

次に、ブラウザからデプロイされたバーガー エージェント サービスの /.well-known/agent.json ルートを試してみます。次のような出力が表示されます。

7b4e9ffc00131552.png

これは、検索目的でアクセスできるバーガー エージェント カードの情報です。これについては後ほど説明します。バーガー エージェント サービスの URL を覚えておいてください。後で使用します。

リモート ピザ エージェントのデプロイ

同様に、ピザ エージェントのソースコードは remote_seller_agents/pizza_agent ディレクトリにあります。エージェントの初期化は agent.py スクリプトで確認できます。初期化されたエージェントのコード スニペットは次のとおりです。

from langchain_google_vertexai import ChatVertexAI
from langgraph.prebuilt import create_react_agent

...

self.model = ChatVertexAI(
    model="gemini-2.0-flash",
    location=os.getenv("GCLOUD_LOCATION"),
    project=os.getenv("GCLOUD_PROJECT_ID"),
)
self.tools = [create_pizza_order]
self.graph = create_react_agent(
    self.model,
    tools=self.tools,
    checkpointer=memory,
    prompt=self.SYSTEM_INSTRUCTION,
    response_format=ResponseFormat,
)

...

次に、.env 変数を準備する必要があります。.env.example.env ファイルにコピーします。

cp remote_seller_agents/pizza_agent/.env.example remote_seller_agents/pizza_agent/.env

remote_seller_agents/pizza_agent/.env ファイルを開くと、次の内容が表示されます。

API_KEY=pizza123
GCLOUD_LOCATION=us-central1
GCLOUD_PROJECT_ID={your-project-id}

Pizza エージェントの A2A サーバーは、ベアラー HTTP 認証(API キーを使用)を使用して実行されます。このデモでは、.env ファイルで許可された API キーを構成します。次に、GCLOUD_PROJECT_ID 変数を、現在アクティブなプロジェクト ID に更新します。

変更を保存してください。次に、サービスを直接デプロイします。コードの内容については後で確認します。次のコマンドを実行してデプロイします。

gcloud run deploy pizza-agent \
           --source remote_seller_agents/pizza_agent \
           --port=8080 \
           --allow-unauthenticated \
           --min 1 \
           --region us-central1

デプロイが正常に完了すると、次のようなログが表示されます。

Service [pizza-agent] revision [pizza-agent-xxxxx-xxx] has been deployed and is serving 100 percent of traffic.
Service URL: https://pizza-agent-xxxxxxxxx.us-central1.run.app

ここでの xxxx 部分は、サービスをデプロイする際の一意の識別子になります。

次に、ブラウザからデプロイされたピザ エージェント サービスの /.well-known/agent.json ルートを試してみます。次のような出力が表示されます。

88bd25001af5dcbc.png

これは、検索目的でアクセスできるピザ エージェント カードの情報です。これについては後ほど説明します。とりあえず、ピザ エージェント サービスの URL を覚えておきます。

この時点で、ハンバーガー サービスとピザ サービスの両方が Cloud Run に正常にデプロイされています。次に、A2A サーバーのコア コンポーネントについて説明します。

4. A2A サーバーのコア コンポーネント

次に、A2A サーバーの基本コンセプトとコンポーネントについて説明します。

エージェントカード

各 A2A サーバーに、/.well-known/agent.json リソースでアクセス可能なエージェントカードが必要です。これは、A2A クライアントの検出フェーズをサポートするためのものです。このフェーズでは、エージェントにアクセスしてすべての機能を把握するための完全な情報とコンテキストを提供する必要があります。Swagger や Postman を使用してよくドキュメント化された API ドキュメントに似ています。

デプロイされたハンバーガー エージェントのエージェントカードの内容は次のとおりです。

{
  "name": "burger_seller_agent",
  "description": "Helps with creating burger orders",
  "url": "http://0.0.0.0:8080/",
  "version": "1.0.0",
  "capabilities": {
    "streaming": false,
    "pushNotifications": true,
    "stateTransitionHistory": false
  },
  "authentication": {
    "schemes": [
      "Basic"
    ]
  },
  "defaultInputModes": [
    "text",
    "text/plain"
  ],
  "defaultOutputModes": [
    "text",
    "text/plain"
  ],
  "skills": [
    {
      "id": "create_burger_order",
      "name": "Burger Order Creation Tool",
      "description": "Helps with creating burger orders",
      "tags": [
        "burger order creation"
      ],
      "examples": [
        "I want to order 2 classic cheeseburgers"
      ]
    }
  ]
}

これらのエージェントカードには、エージェントのスキル、ストリーミング機能、サポートされているモダリティ、認証など、多くの重要なコンポーネントがハイライト表示されます。

これらの情報をすべて利用して、A2A クライアントが適切に通信できるように適切な通信メカニズムを開発できます。サポートされているモダリティと認証メカニズムにより、通信を適切に確立できます。エージェントの skills 情報を A2A クライアント システム プロンプトに埋め込むことで、呼び出されるリモート エージェントの機能とスキルに関するコンテキストをクライアントのエージェントに提供できます。このエージェントカードの詳細なフィールドについては、こちらのドキュメントをご覧ください。

コードでは、a2a_types.pyremote_seller_agents/burger_agent または remote_seller_agents/pizza_agent)で Pydantic を使用してエージェントカードの実装が確立されています。

...

class AgentProvider(BaseModel):
    organization: str
    url: str | None = None


class AgentCapabilities(BaseModel):
    streaming: bool = False
    pushNotifications: bool = False
    stateTransitionHistory: bool = False


class AgentAuthentication(BaseModel):
    schemes: List[str]
    credentials: str | None = None


class AgentSkill(BaseModel):
    id: str
    name: str
    description: str | None = None
    tags: List[str] | None = None
    examples: List[str] | None = None
    inputModes: List[str] | None = None
    outputModes: List[str] | None = None


class AgentCard(BaseModel):
    name: str
    description: str | None = None
    url: str
    provider: AgentProvider | None = None
    version: str
    documentationUrl: str | None = None
    capabilities: AgentCapabilities
    authentication: AgentAuthentication | None = None
    defaultInputModes: List[str] = ["text"]
    defaultOutputModes: List[str] = ["text"]
    skills: List[AgentSkill]

...

オブジェクトの作成は、以下に反映されている remote_seller_agents/burger_agent/__main__.py にあります。

...

def main(host, port):
    """Starts the Burger Seller Agent server."""
    try:
        capabilities = AgentCapabilities(pushNotifications=True)
        skill = AgentSkill(
            id="create_burger_order",
            name="Burger Order Creation Tool",
            description="Helps with creating burger orders",
            tags=["burger order creation"],
            examples=["I want to order 2 classic cheeseburgers"],
        )
        agent_card = AgentCard(
            name="burger_seller_agent",
            description="Helps with creating burger orders",
            # The URL provided here is for the sake of demo,
            # in production you should use a proper domain name
            url=f"http://{host}:{port}/",
            version="1.0.0",
            authentication=AgentAuthentication(schemes=["Basic"]),
            defaultInputModes=BurgerSellerAgent.SUPPORTED_CONTENT_TYPES,
            defaultOutputModes=BurgerSellerAgent.SUPPORTED_CONTENT_TYPES,
            capabilities=capabilities,
            skills=[skill],
        )

        notification_sender_auth = PushNotificationSenderAuth()
        notification_sender_auth.generate_jwk()
        server = A2AServer(
            agent_card=agent_card,
            task_manager=AgentTaskManager(
                agent=BurgerSellerAgent(),
                notification_sender_auth=notification_sender_auth,
            ),
            host=host,
            port=port,
            auth_username=os.environ.get("AUTH_USERNAME"),
            auth_password=os.environ.get("AUTH_PASSWORD"),
        )

...

タスクの定義とタスク マネージャー

A2A のコア コンポーネントの 1 つが、タスクの定義です。JSON-RPC 標準に基づいてペイロード形式を適応させています。このデモでは、このセクションの a2a_types.pyremote_seller_agents/burger_agent または remote_seller_agents/pizza_agent)で Pydantic を使用して実装しています。

...

## RPC Messages


class JSONRPCMessage(BaseModel):
    jsonrpc: Literal["2.0"] = "2.0"
    id: int | str | None = Field(default_factory=lambda: uuid4().hex)


class JSONRPCRequest(JSONRPCMessage):
    method: str
    params: dict[str, Any] | None = None

...

class SendTaskRequest(JSONRPCRequest):
    method: Literal["tasks/send"] = "tasks/send"
    params: TaskSendParams

class SendTaskStreamingRequest(JSONRPCRequest):
    method: Literal["tasks/sendSubscribe"] = "tasks/sendSubscribe"
    params: TaskSendParams

class GetTaskRequest(JSONRPCRequest):
    method: Literal["tasks/get"] = "tasks/get"
    params: TaskQueryParams

class CancelTaskRequest(JSONRPCRequest):
    method: Literal["tasks/cancel",] = "tasks/cancel"
    params: TaskIdParams

class SetTaskPushNotificationRequest(JSONRPCRequest):
    method: Literal["tasks/pushNotification/set",] = "tasks/pushNotification/set"
    params: TaskPushNotificationConfig

class GetTaskPushNotificationRequest(JSONRPCRequest):
    method: Literal["tasks/pushNotification/get",] = "tasks/pushNotification/get"
    params: TaskIdParams

class TaskResubscriptionRequest(JSONRPCRequest):
    method: Literal["tasks/resubscribe",] = "tasks/resubscribe"
    params: TaskIdParams

...

さまざまなタイプの通信(同期、ストリーミング、非同期など)をサポートし、タスクのステータスの通知を構成するために使用できるタスク メソッドがいくつかあります。A2A サーバーは、これらのタスク定義標準を処理するように柔軟に構成できます。

A2A サーバーは、異なるエージェントまたはユーザーからのリクエストを処理し、各タスクを完全に分離できる場合があります。これらのコンテキストをより明確に可視化するには、下の画像を検査します。

bf84e3517789fb9d.png

したがって、各 A2A サーバーは、受信したタスクを追跡し、タスクに関する適切な情報を保存できる必要があります。通常、受信する各リクエストには、タスク IDセッション ID が含まれます。コードでは、このタスク マネージャーの実装は remote_seller_agents/burger_agent/task_manager.py にあります(ピザ エージェントも同様のタスク マネージャーを共有しています)。

...

class AgentTaskManager(InMemoryTaskManager):
    def __init__(
        self,
        agent: BurgerSellerAgent,
        notification_sender_auth: PushNotificationSenderAuth,
    ):
        super().__init__()
        self.agent = agent
        self.notification_sender_auth = notification_sender_auth

    ...

    async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
        """Handles the 'send task' request."""
        validation_error = self._validate_request(request)
        if validation_error:
            return SendTaskResponse(id=request.id, error=validation_error.error)

        await self.upsert_task(request.params)

        if request.params.pushNotification:
            if not await self.set_push_notification_info(
                request.params.id, request.params.pushNotification
            ):
                return SendTaskResponse(
                    id=request.id,
                    error=InvalidParamsError(
                        message="Push notification URL is invalid"
                    ),
                )

        task = await self.update_store(
            request.params.id, TaskStatus(state=TaskState.WORKING), None
        )
        await self.send_task_notification(task)

        task_send_params: TaskSendParams = request.params
        query = self._get_user_query(task_send_params)
        try:
            agent_response = self.agent.invoke(query, task_send_params.sessionId)
        except Exception as e:
            logger.error(f"Error invoking agent: {e}")
            raise ValueError(f"Error invoking agent: {e}")
        return await self._process_agent_response(request, agent_response)

    ...

    async def _process_agent_response(
        self, request: SendTaskRequest, agent_response: dict
    ) -> SendTaskResponse:
        """Processes the agent's response and updates the task store."""
        task_send_params: TaskSendParams = request.params
        task_id = task_send_params.id
        history_length = task_send_params.historyLength
        task_status = None

        parts = [{"type": "text", "text": agent_response["content"]}]
        artifact = None
        if agent_response["require_user_input"]:
            task_status = TaskStatus(
                state=TaskState.INPUT_REQUIRED,
                message=Message(role="agent", parts=parts),
            )
        else:
            task_status = TaskStatus(state=TaskState.COMPLETED)
            artifact = Artifact(parts=parts)
        task = await self.update_store(
            task_id, task_status, None if artifact is None else [artifact]
        )
        task_result = self.append_task_history(task, history_length)
        await self.send_task_notification(task)
        return SendTaskResponse(id=request.id, result=task_result)

   ...

上記のコードから、受信タスクの処理(受信リクエスト メソッドが tasks/send の場合にメソッド on_send_task が実行される)で、タスクストアの更新(self.update_store メソッド呼び出し)と通知の送信(self.send_task_notification メソッド呼び出し)に関するいくつかのオペレーションが行われることがわかります。これは、A2A サーバーが同期送信タスク リクエスト全体でタスクのステータス更新と通知を管理する方法の例の 1 つです。

概要

要約すると、現在デプロイされている A2A サーバーは、次の 2 つの機能をサポートできます。

  1. /.well-known/agent.json ルートでエージェント カードを公開する
  2. メソッド tasks/send で JSON-RPC リクエストを処理する

これらの機能の開始エントリ ポイントは、main.py スクリプト(remote_seller_agents/burger_agent または remote_seller_agents/pizza_agent)で確認できます。次のコード スニペットでは、まずエージェントカードを構成してから、サーバーを起動する必要があります。

...

capabilities = AgentCapabilities(pushNotifications=True)
skill = AgentSkill(
    id="create_pizza_order",
    name="Pizza Order Creation Tool",
    description="Helps with creating pizza orders",
    tags=["pizza order creation"],
    examples=["I want to order 2 pepperoni pizzas"],
)
agent_card = AgentCard(
    name="pizza_seller_agent",
    description="Helps with creating pizza orders",
    # The URL provided here is for the sake of demo,
    # in production you should use a proper domain name
    url=f"http://{host}:{port}/",
    version="1.0.0",
    authentication=AgentAuthentication(schemes=["Bearer"]),
    defaultInputModes=PizzaSellerAgent.SUPPORTED_CONTENT_TYPES,
    defaultOutputModes=PizzaSellerAgent.SUPPORTED_CONTENT_TYPES,
    capabilities=capabilities,
    skills=[skill],
)

...

server = A2AServer(
    agent_card=agent_card,
    task_manager=AgentTaskManager(
        agent=PizzaSellerAgent(),
        notification_sender_auth=notification_sender_auth,
    ),
    host=host,
    port=port,
    api_key=os.environ.get("API_KEY"),
)

...

logger.info(f"Starting server on {host}:{port}")
server.start()

...

5. Purchasing Concierge - A2A クライアントを Cloud Run にデプロイする

このステップでは、購入コンシェルジュ エージェントをデプロイします。このエージェントが、Google がやり取りするエージェントです。

857aa91382185439.png

購入コンシェルジュ エージェントのソースコードは、purchasing_concierge ディレクトリにあります。エージェントの初期化は、purchasing_agent.py スクリプトで確認できます。初期化されたエージェントのコード スニペットは次のとおりです。

from google.adk import Agent

...

def create_agent(self) -> Agent:
    return Agent(
        model="gemini-2.0-flash-001",
        name="purchasing_agent",
        instruction=self.root_instruction,
        before_model_callback=self.before_model_callback,
        description=(
            "This purchasing agent orchestrates the decomposition of the user purchase request into"
            " tasks that can be performed by the seller agents."
        ),
        tools=[
            self.list_remote_agents,
            self.send_task,
        ],
    )

...

次に、.env 変数を準備する必要があります。.env.example.env ファイルにコピーします。

cp purchasing_concierge/.env.example purchasing_concierge/.env

purchasing_concierge/.env ファイルを開くと、次の内容が表示されます。

PIZZA_SELLER_AGENT_AUTH=pizza123
PIZZA_SELLER_AGENT_URL=http://localhost:10000
BURGER_SELLER_AGENT_AUTH=burgeruser123:burgerpass123
BURGER_SELLER_AGENT_URL=http://localhost:10001
GOOGLE_GENAI_USE_VERTEXAI=TRUE
GOOGLE_CLOUD_PROJECT={your-project-id}
GOOGLE_CLOUD_LOCATION=us-central1

このエージェントはハンバーガー エージェントとピザ エージェントとやり取りするため、両方のエージェントに対して適切な認証情報を指定する必要があります。次に、GCLOUD_PROJECT_ID 変数を、現在アクティブなプロジェクト ID に更新します。

また、PIZZA_SELLER_AGENT_URLBURGER_SELLER_AGENT_URL を、前の手順で取得した Cloud Run URL に更新する必要があります。忘れてしまった場合は、Cloud Run コンソールにアクセスします。コンソールの上部にある検索バーに「Cloud Run」と入力し、Cloud Run アイコンを右クリックして新しいタブで開きます。

1adde569bb345b48.png

以前にデプロイしたリモート販売者エージェント サービスが次のように表示されます。

179e55cc095723a8.png

これらのサービスの公開 URL を表示するには、いずれかのサービスをクリックします。サービス詳細ページにリダイレクトされます。URL は、上部領域の [地域] 情報のすぐ横に表示されます。

64c01403a92b1107.png

この URL の値をコピーして、PIZZA_SELLER_AGENT_URLBURGER_SELLER_AGENT_URL にそれぞれ貼り付けます。

最終的な環境変数は次のようになります。

PIZZA_SELLER_AGENT_AUTH=pizza123
PIZZA_SELLER_AGENT_URL=https://pizza-agent-xxxxx.us-central1.run.app
BURGER_SELLER_AGENT_AUTH=burgeruser123:burgerpass123
BURGER_SELLER_AGENT_URL=https://burger-agent-xxxxx.us-central1.run.app
GOOGLE_GENAI_USE_VERTEXAI=TRUE
GOOGLE_CLOUD_PROJECT={your-project-id}
GOOGLE_CLOUD_LOCATION=us-central1

これで、購入コンシェルジュ エージェントをデプロイする準備が整いました。このエージェントをデプロイするには、次のコマンドを実行します。

gcloud run deploy purchasing-concierge \
           --source . \
           --port=8080 \
           --allow-unauthenticated \
           --min 1 \
           --region us-central1 \
           --memory 1024Mi

デプロイが正常に完了すると、次のようなログが表示されます。

Service [purchasing-concierge] revision [purchasing-concierge-xxxxx-xxx] has been deployed and is serving 100 percent of traffic.
Service URL: https://purchasing-concierge-xxxxxx.us-central1.run.app

ここでの xxxx 部分は、サービスをデプロイする際の一意の識別子になります。

これで、UI から購入コンシェルジュ エージェントとやり取りできるようになりました。サービスの URL にアクセスすると、次のような Gradio ウェブ インターフェースが表示されます。

3d705381f9219bf3.png

次に、A2A クライアントのコア コンポーネントと一般的なフローについて説明します。

6. A2A クライアントのコア コンポーネント

73dae827aa9bddc3.png

上の画像は、A2A インタラクションの一般的なフローを示しています。

  1. クライアントは、ルート /.well-known/agent.json で指定されたリモート エージェントの URL で、公開されているエージェント カードを探します。
  2. 必要に応じて、メッセージと必要なメタデータ パラメータ(セッション ID、過去のコンテキストなど)を含むタスクをそのエージェントに送信します。
  3. A2A サーバーはリクエストを認証して処理します。サーバーがプッシュ通知をサポートしている場合は、タスク処理中に一部の通知を公開しようとします。
  4. 完了すると、A2A サーバーはレスポンス アーティファクトをクライアントに返します。

上記のインタラクションのコア オブジェクトは次の項目です(詳細はこちらをご覧ください)。

  • タスク: クライアントとリモート エージェントが特定の結果を達成して結果を生成するステートフル エンティティ
  • アーティファクト: タスクの最終結果
  • メッセージ: アーティファクトではないコンテンツ。例: エージェントの考え、ユーザーのコンテキスト、手順、エラー、ステータス、メタデータ
  • パート: メッセージまたはアーティファクトの一部として、クライアントとリモート エージェント間でやり取りされる完全なコンテンツ。パートは、テキスト、画像、動画、ファイルなどです。
  • プッシュ通知(省略可): エージェントが接続されたセッション外でクライアントに更新を通知できる安全な通知メカニズム

カードの検出

A2A クライアント サービスが起動されると、通常はエージェント カード情報を取得して保存し、必要に応じて簡単にアクセスできるようにします。purchasing_concierge/purchasing_agent.py スクリプトで確認できます。

...

class PurchasingAgent:
    """The purchasing agent.

    This is the agent responsible for choosing which remote seller agents to send
    tasks to and coordinate their work.
    """

    def __init__(
        self,
        remote_agent_addresses: List[str],
        task_callback: TaskUpdateCallback | None = None,
    ):
        self.task_callback = task_callback
        self.remote_agent_connections: dict[str, RemoteAgentConnections] = {}
        self.cards: dict[str, AgentCard] = {}
        for address in remote_agent_addresses:
            card_resolver = A2ACardResolver(address)
            try:
                card = card_resolver.get_agent_card()
                # The URL accessed here should be the same as the one provided in the agent card
                # However, in this demo we are using the URL provided in the key arguments
                remote_connection = RemoteAgentConnections(
                    agent_card=card, agent_url=address
                )
                self.remote_agent_connections[card.name] = remote_connection
                self.cards[card.name] = card
            except httpx.ConnectError:
                print(f"ERROR: Failed to get agent card from : {address}")
        agent_info = []
        for ra in self.list_remote_agents():
            agent_info.append(json.dumps(ra))
        self.agents = "\n".join(agent_info)

...

プロンプトと送信タスク ツール

購入コンシェルジュ システムのプロンプトでリモート エージェントにコンテキストを提供し、エージェントにタスクを送信するためのツールも提供します。以下は、ADK エージェント向けに提供しているプロンプトとツールです。

...

def root_instruction(self, context: ReadonlyContext) -> str:
    current_agent = self.check_active_agent(context)
    return f"""You are an expert purchasing delegator that can delegate the user product inquiry and purchase request to the
appropriate seller remote agents.

Execution:
- For actionable tasks, you can use `send_task` to assign tasks to remote agents to perform.
- When the remote agent is repeatedly asking for user confirmation, assume that the remote agent doesn't have access to user's conversation context. 
So improve the task description to include all the necessary information related to that agent
- Never ask user permission when you want to connect with remote agents. If you need to make connection with multiple remote agents, directly
connect with them without asking user permission or asking user preference
- Always show the detailed response information from the seller agent and propagate it properly to the user. 
- If the remote seller is asking for confirmation, rely the confirmation question to the user if the user haven't do so. 
- If the user already confirmed the related order in the past conversation history, you can confirm on behalf of the user
- Do not give irrelevant context to remote seller agent. For example, ordered pizza item is not relevant for the burger seller agent
- Never ask order confirmation to the remote seller agent 

Please rely on tools to address the request, and don't make up the response. If you are not sure, please ask the user for more details.
Focus on the most recent parts of the conversation primarily.

If there is an active agent, send the request to that agent with the update task tool.

Agents:
{self.agents}

Current active seller agent: {current_agent["active_agent"]}
"""

...

async def send_task(self, agent_name: str, task: str, tool_context: ToolContext):
    """Sends a task to remote seller agent

    This will send a message to the remote agent named agent_name.

    Args:
        agent_name: The name of the agent to send the task to.
        task: The comprehensive conversation context summary
            and goal to be achieved regarding user inquiry and purchase request.
        tool_context: The tool context this method runs in.

    Yields:
        A dictionary of JSON data.
    """
    if agent_name not in self.remote_agent_connections:
        raise ValueError(f"Agent {agent_name} not found")
    state = tool_context.state
    state["active_agent"] = agent_name
    client = self.remote_agent_connections[agent_name]
    if not client:
        raise ValueError(f"Client not available for {agent_name}")
    if "task_id" in state:
        taskId = state["task_id"]
    else:
        taskId = str(uuid.uuid4())
    sessionId = state["session_id"]
    task: Task
    messageId = ""
    metadata = {}
    if "input_message_metadata" in state:
        metadata.update(**state["input_message_metadata"])
        if "message_id" in state["input_message_metadata"]:
            messageId = state["input_message_metadata"]["message_id"]
    if not messageId:
        messageId = str(uuid.uuid4())
    metadata.update(**{"conversation_id": sessionId, "message_id": messageId})
    request: TaskSendParams = TaskSendParams(
        id=taskId,
        sessionId=sessionId,
        message=Message(
            role="user",
            parts=[TextPart(text=task)],
            metadata=metadata,
        ),
        acceptedOutputModes=["text", "text/plain"],
        # pushNotification=None,
        metadata={"conversation_id": sessionId},
    )
    task = await client.send_task(request, self.task_callback)
    # Assume completion unless a state returns that isn't complete
    state["session_active"] = task.status.state not in [
        TaskState.COMPLETED,
        TaskState.CANCELED,
        TaskState.FAILED,
        TaskState.UNKNOWN,
    ]
    if task.status.state == TaskState.INPUT_REQUIRED:
        # Force user input back
        tool_context.actions.escalate = True
    elif task.status.state == TaskState.COMPLETED:
        # Reset active agent is task is completed
        state["active_agent"] = "None"

    response = []
    if task.status.message:
        # Assume the information is in the task message.
        response.extend(convert_parts(task.status.message.parts, tool_context))
    if task.artifacts:
        for artifact in task.artifacts:
            response.extend(convert_parts(artifact.parts, tool_context))
    return response

...

プロンプトでは、利用可能なリモート エージェントの名前と説明をすべて購入コンシェルジュ エージェントに提供します。ツール self.send_task では、エージェントに接続する適切なクライアントを取得し、TaskSendParams オブジェクトを使用して必要なメタデータを送信するメカニズムを提供します。

このツールでは、タスクが完了しなかった場合にエージェントがどのように動作するかを指定することもできます。最後に、タスクの完了時に返されるレスポンス アーティファクトを処理する必要があります。

7. 統合テストとペイロード検査

次に、次の会話を試して、購入コンシェルジュの UI とサービスログを調べてみましょう。次のような会話を試してみましょう。

  • ハンバーガーとピザのメニューを表示
  • バーベキュー チキン ピザ 1 つとスパイシー ケイジャン バーガー 1 つを注文したいのですが

注文が完了するまで会話を続けます。インタラクションの進行状況、ツールの呼び出しとレスポンスを確認します。次の画像は、インタラクションの結果の例です。

b06863bd746b4d1c.png

f550a0e65ac17fca.png

5dea2fba956548b1.png

2da5d77fefc37cb9.png

2 人のエージェントとやり取りすると、2 つの異なる動作が発生しますが、A2A はこれを適切に処理できます。ハンバーガー販売者のエージェントは、Google の購入エージェントのリクエストを直接承認しますが、ピザ販売者のエージェントは、リクエストを進めるために Google の確認が必要であり、Google がエージェントが確認を信頼できることを確認した後に、その確認をピザ販売者のエージェントに伝えることができます。

次に、purchasing-agent サービスログで交換されたデータを確認します。まず、Cloud Run コンソールに移動し、コンソールの上部にある検索バーに「Cloud Run」と入力して、Cloud Run アイコンを右クリックし、新しいブラウザタブで開きます

1adde569bb345b48.png

次のように、以前にデプロイしたサービスが表示されます。[purchasing-concierge] をクリックします。

7d8a0b9a227e4372.png

サービスの詳細ページが表示されたら、[ログ] タブをクリックします。

7a45204e086ea264.png

デプロイされた purchasing-concierge サービスのログが表示されます。下にスクロールして、Google とのやり取りに関する最新のログを探します。

2f223615795c19e5.png

6226653668a0f83b.png

A2A クライアントとサーバー間のリクエストとレスポンスは JSON-RPC 形式で、A2A 標準に準拠しています。

これで A2A の基本コンセプトが終わりました。次は、クライアントとサーバーのアーキテクチャとしてどのように実装されるかを見てみましょう。

8. クリーンアップ

この Codelab で使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の操作を行います。

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。
  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
  4. または、コンソールの [Cloud Run] に移動し、デプロイしたサービスを選択して削除することもできます。