🔌 Anthropic's Model Context Protocol (MCP) - Complete Guide
1. Core Architecture & Installation
Installation & Setup
# bash
# Python SDK
pip install mcp
# TypeScript SDK
npm install @modelcontextprotocol/sdk
# Development tools
pip install mcp-server-stdio mcp-server-http
Basic Project Structure
my-mcp-project/
├── server/
│ ├── __init__.py
│ ├── main.py
│ └── sources/
│ ├── database.py
│ ├── filesystem.py
│ └── api.py
├── client/
│ └── test_client.py
└── config/
└── server_config.json
2. MCP Server Implementation
Basic Server Setup (Python)
# server/main.py
import asyncio
import logging
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import (
Resource, Tool, TextContent, ImageContent, EmbeddedResource
)
# Configure logging
# The root logger is set to INFO; `logger` is this module's own named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Create server instance
# Every @server.* handler defined below registers against this one instance.
server = Server("my-mcp-server")
@server.list_resources()
async def handle_list_resources() -> list[Resource]:
    """Advertise the two static resources this example server exposes."""
    readme = Resource(
        uri="file://documents/readme.txt",
        name="README",
        description="Project documentation",
        mimeType="text/plain",
    )
    users = Resource(
        uri="db://users",
        name="Users Database",
        description="User management database",
        mimeType="application/json",
    )
    return [readme, users]
@server.read_resource()
async def handle_read_resource(uri: str) -> str:
    """Return the canned body for one of the advertised resource URIs.

    Raises:
        ValueError: if ``uri`` is not a known resource.
    """
    # Guard-clause form: each known URI returns immediately.
    if uri == "file://documents/readme.txt":
        return "# Project Documentation\nThis is a sample MCP server."
    if uri == "db://users":
        return '{"users": [{"id": 1, "name": "John"}, {"id": 2, "name": "Jane"}]}'
    raise ValueError(f"Unknown resource: {uri}")
@server.list_tools()
async def handle_list_tools() -> list[Tool]:
    """Describe the tools (name, description, JSON input schema) clients may call."""
    calculate_schema = {
        "type": "object",
        "properties": {
            "expression": {
                "type": "string",
                "description": "Mathematical expression to evaluate",
            }
        },
        "required": ["expression"],
    }
    search_schema = {
        "type": "object",
        "properties": {
            "pattern": {"type": "string"},
            "directory": {"type": "string", "default": "."},
        },
        "required": ["pattern"],
    }
    calculate = Tool(
        name="calculate",
        description="Perform mathematical calculations",
        inputSchema=calculate_schema,
    )
    search_files = Tool(
        name="search_files",
        description="Search for files in the filesystem",
        inputSchema=search_schema,
    )
    return [calculate, search_files]
def _evaluate_arithmetic(expression: str):
    """Evaluate a purely arithmetic expression without using ``eval``.

    Only numeric literals, parentheses, unary +/- and the binary operators
    ``+ - * / // % **`` are accepted. Anything else (names, calls, attribute
    access, ...) raises ``ValueError`` so tool input can never execute code.
    """
    import ast
    import operator

    binary_ops = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.Pow: operator.pow,
    }
    unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}

    def walk(node):
        if isinstance(node, ast.Expression):
            return walk(node.body)
        if isinstance(node, ast.Constant):
            # bool is a subclass of int; reject it so True/False are not numbers.
            if isinstance(node.value, (int, float)) and not isinstance(node.value, bool):
                return node.value
            raise ValueError("Only numeric literals are allowed")
        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left, right = walk(node.left), walk(node.right)
            # Cap exponents so inputs like 9**9**9 cannot exhaust CPU or memory.
            if isinstance(node.op, ast.Pow) and abs(right) > 1000:
                raise ValueError("Exponent too large")
            return binary_ops[type(node.op)](left, right)
        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](walk(node.operand))
        raise ValueError("Unsupported expression")

    return walk(ast.parse(expression, mode="eval"))


@server.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Execute tool calls.

    Args:
        name: Tool identifier, ``"calculate"`` or ``"search_files"``.
        arguments: Tool arguments as described by the tool's input schema.

    Returns:
        A single-element list holding the textual result. ``calculate``
        reports failures as an ``Error: ...`` text instead of raising.

    Raises:
        ValueError: if ``name`` is not a known tool.
    """
    if name == "calculate":
        try:
            # SECURITY: the expression comes from the client, so it is parsed
            # and evaluated as arithmetic only; eval() here would allow
            # arbitrary code execution.
            result = _evaluate_arithmetic(arguments["expression"])
            return [TextContent(type="text", text=f"Result: {result}")]
        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]

    if name == "search_files":
        import glob
        import os

        pattern = arguments["pattern"]
        directory = arguments.get("directory", ".")
        search_path = os.path.join(directory, pattern)
        files = glob.glob(search_path, recursive=True)
        if not files:
            return [TextContent(type="text", text="No files found matching pattern")]
        file_list = "\n".join(files)
        return [TextContent(type="text", text=f"Found files:\n{file_list}")]

    raise ValueError(f"Unknown tool: {name}")
async def main():
    """Serve MCP requests over the stdio transport until the streams close."""
    # get_capabilities() reads change-notification flags off this object, so it
    # must be a NotificationOptions instance rather than None.
    from mcp.server import NotificationOptions

    # Run server with stdio transport
    async with stdio_server() as (read_stream, write_stream):
        init_options = InitializationOptions(
            server_name="my-mcp-server",
            server_version="1.0.0",
            capabilities=server.get_capabilities(
                notification_options=NotificationOptions(),
                experimental_capabilities={},
            ),
        )
        await server.run(read_stream, write_stream, init_options)


if __name__ == "__main__":
    asyncio.run(main())
Advanced Server with Database Integration
# server/sources/database.py
import asyncio
import json
import sqlite3
from contextlib import closing
from typing import List, Dict, Any

from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
class DatabaseMCPServer:
    """MCP server exposing a small SQLite database as resources and tools.

    Resources ``sqlite://users`` and ``sqlite://products`` return whole tables
    as JSON. The ``query_database`` tool runs arbitrary SQL supplied by the
    client; the ``add_user`` tool inserts one user row.
    """

    # Resource URI -> (table name, top-level JSON key). The table name is only
    # ever taken from this fixed mapping, never from client input.
    _TABLE_RESOURCES = {
        "sqlite://users": ("users", "users"),
        "sqlite://products": ("products", "products"),
    }

    def __init__(self, db_path: str):
        """Create the server, register its handlers and seed the database.

        Args:
            db_path: Filesystem path of the SQLite database file.
        """
        self.db_path = db_path
        self.server = Server("database-mcp-server")
        self._setup_handlers()
        self._init_database()

    def _connect(self):
        """Open a connection whose rows can be converted with ``dict(row)``."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def _init_database(self):
        """Initialize SQLite database with sample data (safe to call repeatedly)."""
        # closing() guarantees the connection is released even if setup fails.
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            # Create tables
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY,
                    name TEXT NOT NULL,
                    email TEXT UNIQUE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS products (
                    id INTEGER PRIMARY KEY,
                    name TEXT NOT NULL,
                    price REAL,
                    category TEXT,
                    stock INTEGER DEFAULT 0
                )
            ''')
            # Insert sample data. The UNIQUE email makes OR IGNORE idempotent here.
            cursor.executemany('''
                INSERT OR IGNORE INTO users (name, email) VALUES (?, ?)
            ''', [
                ("Alice Johnson", "alice@example.com"),
                ("Bob Smith", "bob@example.com"),
                ("Carol Davis", "carol@example.com"),
            ])
            # products has no unique column besides the auto-assigned id, so
            # OR IGNORE would never skip a row; seed only when the table is empty
            # to avoid duplicating the catalog on every start.
            cursor.execute("SELECT COUNT(*) FROM products")
            if cursor.fetchone()[0] == 0:
                cursor.executemany('''
                    INSERT INTO products (name, price, category, stock) VALUES (?, ?, ?, ?)
                ''', [
                    ("Laptop", 999.99, "Electronics", 10),
                    ("Mouse", 29.99, "Electronics", 50),
                    ("Desk Chair", 199.99, "Furniture", 5),
                ])
            conn.commit()

    def _setup_handlers(self):
        """Register the resource and tool handlers on ``self.server``."""

        @self.server.list_resources()
        async def list_resources() -> List[Resource]:
            return [
                Resource(
                    uri="sqlite://users",
                    name="Users Table",
                    description="User management data",
                    mimeType="application/json",
                ),
                Resource(
                    uri="sqlite://products",
                    name="Products Table",
                    description="Product catalog data",
                    mimeType="application/json",
                ),
            ]

        @self.server.read_resource()
        async def read_resource(uri: str) -> str:
            # Raises ValueError for any URI outside the fixed table mapping.
            if uri not in self._TABLE_RESOURCES:
                raise ValueError(f"Unknown resource: {uri}")
            table, key = self._TABLE_RESOURCES[uri]
            with closing(self._connect()) as conn:
                rows = [dict(row) for row in conn.execute(f"SELECT * FROM {table}")]
            return json.dumps({key: rows}, indent=2)

        @self.server.list_tools()
        async def list_tools() -> List[Tool]:
            return [
                Tool(
                    name="query_database",
                    description="Execute SQL queries on the database",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "SQL query to execute",
                            },
                            "params": {
                                "type": "array",
                                "description": "Query parameters",
                                "items": {"type": "string"},
                            },
                        },
                        "required": ["query"],
                    },
                ),
                Tool(
                    name="add_user",
                    description="Add a new user to the database",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "name": {"type": "string"},
                            "email": {"type": "string"},
                        },
                        "required": ["name", "email"],
                    },
                ),
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
            # Every failure, including an unknown tool name, is reported back
            # to the client as an "Error: ..." text rather than raised.
            try:
                with closing(self._connect()) as conn:
                    cursor = conn.cursor()
                    if name == "query_database":
                        # SECURITY: this tool runs client-supplied SQL by design;
                        # only expose it to trusted clients.
                        query = arguments["query"]
                        params = arguments.get("params") or []
                        cursor.execute(query, params)
                        # cursor.description is set exactly when the statement
                        # yields rows (SELECT, WITH, PRAGMA, ... RETURNING),
                        # which a "starts with SELECT" check would miss.
                        if cursor.description is not None:
                            rows = [dict(row) for row in cursor.fetchall()]
                            conn.commit()
                            result = json.dumps(rows, indent=2)
                        else:
                            conn.commit()
                            result = f"Query executed successfully. Rows affected: {cursor.rowcount}"
                        return [TextContent(type="text", text=result)]
                    elif name == "add_user":
                        name_val = arguments["name"]
                        email = arguments["email"]
                        cursor.execute(
                            "INSERT INTO users (name, email) VALUES (?, ?)",
                            (name_val, email),
                        )
                        conn.commit()
                        return [TextContent(
                            type="text",
                            text=f"User '{name_val}' added successfully with ID {cursor.lastrowid}",
                        )]
                    else:
                        raise ValueError(f"Unknown tool: {name}")
            except Exception as e:
                return [TextContent(type="text", text=f"Error: {str(e)}")]
HTTP Server Implementation
# server/http_server.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from mcp.server.fastapi import MCPServer
from mcp.types import *
import json
app = FastAPI(title="MCP HTTP Server", version="1.0.0")
# Add CORS middleware
# Wildcard origins must not be combined with credentialed requests: that would
# let any website call this API with the user's cookies. Credentials are
# therefore disabled; to enable them, list explicit origins instead of "*".
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Create MCP server instance
mcp_server = MCPServer("http-mcp-server")
# File system resource handler
@mcp_server.list_resources()
async def list_resources() -> List[Resource]:
    """List every file found under ./data as a ``file://`` resource."""
    import os

    def describe(folder, filename):
        # .txt files are advertised as plain text; everything else as raw bytes.
        full_path = os.path.join(folder, filename)
        mime = "text/plain" if filename.endswith('.txt') else "application/octet-stream"
        return Resource(
            uri=f"file://{full_path}",
            name=filename,
            description=f"File: {full_path}",
            mimeType=mime,
        )

    return [
        describe(folder, filename)
        for folder, _subdirs, filenames in os.walk("./data")
        for filename in filenames
    ]
@mcp_server.read_resource()
async def read_resource(uri: str) -> str:
    """Return the UTF-8 text of a ``file://`` resource located under ./data.

    Raises:
        HTTPException: 400 for a non-``file://`` URI, 403 when the path
            resolves outside ./data, 404 when the file does not exist and
            500 for any other read failure.
    """
    import os

    if not uri.startswith("file://"):
        raise HTTPException(status_code=400, detail="Invalid URI scheme")

    # SECURITY: the URI comes from the client. Resolve symlinks and ".."
    # segments, then refuse anything outside the directory that
    # list_resources() advertises, so arbitrary host files cannot be read.
    data_root = os.path.realpath("./data")
    file_path = os.path.realpath(uri[7:])  # Remove "file://" prefix
    try:
        inside_root = os.path.commonpath([data_root, file_path]) == data_root
    except ValueError:
        # commonpath() raises when the paths cannot be compared (e.g. different drives).
        inside_root = False
    if not inside_root:
        raise HTTPException(status_code=403, detail="Access denied")

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="File not found")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# Web scraping tool
@mcp_server.list_tools()
async def list_tools() -> List[Tool]:
    """Describe the ``web_scrape`` and ``json_query`` tools and their input schemas."""
    scrape_schema = {
        "type": "object",
        "properties": {
            "url": {
                "type": "string",
                "description": "URL to scrape",
            },
            "selector": {
                "type": "string",
                "description": "CSS selector for content extraction",
                "default": "body",
            },
        },
        "required": ["url"],
    }
    query_schema = {
        "type": "object",
        "properties": {
            "data": {"type": "string", "description": "JSON data"},
            "query": {"type": "string", "description": "JSONPath query"},
        },
        "required": ["data", "query"],
    }
    web_scrape = Tool(
        name="web_scrape",
        description="Scrape content from a web page",
        inputSchema=scrape_schema,
    )
    json_query = Tool(
        name="json_query",
        description="Query JSON data using JSONPath",
        inputSchema=query_schema,
    )
    return [web_scrape, json_query]
@mcp_server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
    """Run the ``web_scrape`` or ``json_query`` tool.

    Tool failures are returned as an error text; only an unknown tool name
    raises.

    Raises:
        HTTPException: 400 when ``name`` is not a known tool.
    """
    if name == "web_scrape":
        import asyncio
        import requests
        from bs4 import BeautifulSoup

        url = arguments["url"]
        selector = arguments.get("selector", "body")

        def fetch_and_extract() -> str:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            return "\n".join(elem.get_text(strip=True) for elem in soup.select(selector))

        try:
            # requests is blocking; run it in a worker thread so the event loop
            # keeps serving other requests during the download.
            content = await asyncio.to_thread(fetch_and_extract)
        except Exception as e:
            return [TextContent(type="text", text=f"Error scraping {url}: {str(e)}")]

        # Only mark the output as truncated when something was actually cut.
        preview = content[:2000] + ("..." if len(content) > 2000 else "")
        return [TextContent(
            type="text",
            text=f"Scraped content from {url}:\n\n{preview}",
        )]

    if name == "json_query":
        import jsonpath_ng

        try:
            data = json.loads(arguments["data"])
            jsonpath_expr = jsonpath_ng.parse(arguments["query"])
            matches = [match.value for match in jsonpath_expr.find(data)]
            result = json.dumps(matches, indent=2)
            return [TextContent(type="text", text=f"Query results:\n{result}")]
        except Exception as e:
            return [TextContent(type="text", text=f"Error: {str(e)}")]

    raise HTTPException(status_code=400, detail=f"Unknown tool: {name}")
# Mount MCP server to FastAPI
app.mount("/mcp", mcp_server.create_app())


# Health check endpoint
@app.get("/health")
async def health_check():
    """Report a fixed "healthy" status payload for GET /health."""
    payload = {"status": "healthy", "server": "MCP HTTP Server"}
    return payload


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
3. MCP Client Implementation
Basic Client Usage
# client/basic_client.py
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def main():
    """Spawn server/main.py over stdio and walk through resources and tools."""
    # Connect to MCP server via stdio
    params = StdioServerParameters(command="python", args=["server/main.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize the connection
            await session.initialize()

            # List available resources
            listing = await session.list_resources()
            print("Available resources:")
            for item in listing.resources:
                print(f" - {item.name}: {item.uri}")

            # Read a specific resource
            if listing.resources:
                first_uri = listing.resources[0].uri
                content = await session.read_resource(first_uri)
                print(f"\nResource content:\n{content.contents[0].text}")

            # List available tools
            catalog = await session.list_tools()
            print(f"\nAvailable tools:")
            for entry in catalog.tools:
                print(f" - {entry.name}: {entry.description}")

            # Call a tool (the arguments assume the first tool takes "expression")
            if catalog.tools:
                first_tool = catalog.tools[0].name
                result = await session.call_tool(first_tool, {"expression": "2 + 2 * 3"})
                print(f"\nTool result: {result.content[0].text}")


if __name__ == "__main__":
    asyncio.run(main())
HTTP Client Implementation
# client/http_client.py
import asyncio
import aiohttp
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client
class MCPHTTPClient:
    """Async context manager wrapping an MCP client session over SSE.

    Usage: ``async with MCPHTTPClient(base_url) as client: ...``. The SSE
    streams and the MCP session stay open for the whole ``async with`` body
    and are closed, in reverse order, on exit.
    """

    def __init__(self, base_url: str):
        """Store the server base URL (trailing slashes removed); no I/O happens here."""
        self.base_url = base_url.rstrip('/')
        self.session = None
        self.http_session = None
        self._exit_stack = None

    async def __aenter__(self):
        """Open the HTTP session, SSE streams and MCP session, then initialize."""
        from contextlib import AsyncExitStack

        # The SSE context must outlive this method: leaving an
        # `async with sse_client(...)` block here would close the streams
        # before the caller could use the session. The exit stack keeps every
        # context open until __aexit__.
        stack = AsyncExitStack()
        try:
            self.http_session = await stack.enter_async_context(aiohttp.ClientSession())
            # Connect via Server-Sent Events
            read, write = await stack.enter_async_context(
                sse_client(f"{self.base_url}/mcp/sse")
            )
            self.session = await stack.enter_async_context(ClientSession(read, write))
            await self.session.initialize()
        except BaseException:
            # Release whatever was opened before the failure.
            await stack.aclose()
            self.session = None
            self.http_session = None
            raise
        self._exit_stack = stack
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the MCP session, SSE streams and HTTP session."""
        stack, self._exit_stack = self._exit_stack, None
        self.session = None
        self.http_session = None
        if stack is not None:
            return await stack.__aexit__(exc_type, exc_val, exc_tb)
        return None

    async def list_resources(self):
        """Return the server's resource listing."""
        return await self.session.list_resources()

    async def read_resource(self, uri: str):
        """Read the resource identified by ``uri``."""
        return await self.session.read_resource(uri)

    async def list_tools(self):
        """Return the server's tool listing."""
        return await self.session.list_tools()

    async def call_tool(self, name: str, arguments: dict):
        """Invoke tool ``name`` with ``arguments``."""
        return await self.session.call_tool(name, arguments)
# Usage example
async def test_http_client():
    """Exercise the HTTP client against a server on localhost:8000."""
    async with MCPHTTPClient("http://localhost:8000") as client:
        # List and read resources
        listing = await client.list_resources()
        for item in listing.resources:
            print(f"Resource: {item.name}")
            body = await client.read_resource(item.uri)
            print(f"Content preview: {body.contents[0].text[:100]}...")

        # Use tools
        catalog = await client.list_tools()
        for entry in catalog.tools:
            print(f"Tool: {entry.name} - {entry.description}")

        # Web scraping example
        scrape_args = {
            "url": "https://httpbin.org/json",
            "selector": "body",
        }
        result = await client.call_tool("web_scrape", scrape_args)
        print(f"Scrape result: {result.content[0].text}")


if __name__ == "__main__":
    asyncio.run(test_http_client())
4. Advanced MCP Features
Prompts and Templates
# server/prompts.py
from mcp.server import Server
from mcp.types import Prompt, PromptArgument, PromptMessage, TextContent
# Server instance that the prompt handlers below register against.
server = Server("prompt-server")
@server.list_prompts()
async def list_prompts() -> List[Prompt]:
    """Describe the available prompt templates and the arguments each accepts."""
    code_review = Prompt(
        name="code_review",
        description="Generate a code review for the given code",
        arguments=[
            PromptArgument(name="code", description="The code to review", required=True),
            PromptArgument(name="language", description="Programming language", required=False),
        ],
    )
    summarize_data = Prompt(
        name="summarize_data",
        description="Summarize structured data",
        arguments=[
            PromptArgument(name="data", description="JSON data to summarize", required=True),
            PromptArgument(
                name="focus",
                description="What to focus on in the summary",
                required=False,
            ),
        ],
    )
    return [code_review, summarize_data]
@server.get_prompt()
async def get_prompt(name: str, arguments: Dict[str, str]) -> PromptMessage:
    """Render the named prompt template into a single user message.

    Args:
        name: ``"code_review"`` or ``"summarize_data"``.
        arguments: Template values; ``code`` / ``data`` are required,
            ``language`` / ``focus`` fall back to defaults.

    Raises:
        ValueError: if ``name`` is not a known prompt.
        KeyError: if a required argument is missing.
    """
    if name not in ("code_review", "summarize_data"):
        raise ValueError(f"Unknown prompt: {name}")

    if name == "code_review":
        code = arguments["code"]
        language = arguments.get("language", "unknown")
        prompt_text = f"""Please review the following {language} code:
```{language}
{code}
```
Focus on:
1. Code quality and best practices
2. Potential bugs or issues
3. Performance considerations
4. Readability and maintainability
5. Security concerns
Provide specific suggestions for improvement."""
    else:
        data = arguments["data"]
        focus = arguments.get("focus", "key insights")
        prompt_text = f"""Analyze and summarize the following data, focusing on {focus}:
{data}
Please provide:
1. Key statistics and trends
2. Notable patterns or anomalies
3. Main insights and takeaways
4. Recommendations based on the data"""

    # Both templates are delivered the same way: one user-role text message.
    return PromptMessage(
        role="user",
        content=TextContent(type="text", text=prompt_text),
    )
Notifications and Subscriptions
# server/notifications.py
import asyncio
from typing import Dict, Set
from mcp.server import Server, NotificationOptions
from mcp.types import *
class NotificationMCPServer:
    """Example server that tracks resource subscriptions and announces updates.

    ``subscribers`` maps a resource URI to the set of client identifiers
    subscribed to it. Delivery itself is only simulated with ``print``.
    """

    # Placeholder identity used until real per-client session tracking exists;
    # without recording *something*, the subscriber sets would stay empty and
    # send_notification() could never fire.
    _ANONYMOUS_CLIENT = "anonymous"

    def __init__(self):
        self.server = Server("notification-server")
        self.subscribers: Dict[str, Set[str]] = {}
        self._setup_handlers()

    def _add_subscriber(self, uri: str, client_id: str) -> None:
        """Record ``client_id`` as subscribed to ``uri``."""
        self.subscribers.setdefault(uri, set()).add(client_id)

    def _remove_subscriber(self, uri: str, client_id: str) -> None:
        """Drop ``client_id`` from ``uri`` and forget the URI once nobody is left."""
        remaining = self.subscribers.get(uri)
        if remaining is None:
            return
        remaining.discard(client_id)
        if not remaining:
            del self.subscribers[uri]

    def _setup_handlers(self):
        """Register the resource listing and (un)subscribe handlers."""

        @self.server.list_resources()
        async def list_resources() -> List[Resource]:
            return [
                Resource(
                    uri="notifications://system",
                    name="System Notifications",
                    description="Real-time system notifications",
                )
            ]

        @self.server.subscribe()
        async def handle_subscribe(uri: str) -> None:
            # In a real implementation, you'd track the client session
            self._add_subscriber(uri, self._ANONYMOUS_CLIENT)
            print(f"Client subscribed to {uri}")

        @self.server.unsubscribe()
        async def handle_unsubscribe(uri: str) -> None:
            if uri in self.subscribers:
                # Remove client from subscribers
                self._remove_subscriber(uri, self._ANONYMOUS_CLIENT)
                print(f"Client unsubscribed from {uri}")

    async def send_notification(self, uri: str, content: str):
        """Send notification to all subscribers of a URI.

        Does nothing when ``uri`` has no subscribers.
        """
        if not self.subscribers.get(uri):
            return
        # In a real implementation, send a resource-updated notification to
        # every subscribed client session here.
        print(f"Notification sent for {uri}: {content}")
# Usage in a monitoring system
async def monitoring_example():
    """Emit a status notification every five seconds, forever."""
    server = NotificationMCPServer()
    channel = "notifications://system"
    # Simulate system events
    while True:
        await asyncio.sleep(5)
        loop_time = asyncio.get_event_loop().time()
        message = f"System check at {loop_time}: All systems operational"
        await server.send_notification(channel, message)
Error Handling and Logging
# server/error_handling.py
import logging
from typing import Any, Dict, List
from mcp.server import Server
from mcp.types import *
# Configure logging
# Root logger at INFO; each record shows timestamp, logger name, level, message.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used by the handlers of RobustMCPServer below.
logger = logging.getLogger(__name__)
class RobustMCPServer:
    """Example server demonstrating input validation, logging and error mapping."""

    def __init__(self):
        self.server = Server("robust-server")
        self._setup_handlers()

    @staticmethod
    def _resolve_file_path(uri: str) -> str:
        """Return the relative path encoded in a ``file://`` URI.

        Raises:
            PermissionError: if the path is absolute or contains ``..``.
        """
        import os

        file_path = uri[7:]  # strip the "file://" prefix
        # Security check - prevent directory traversal. os.path.isabs also
        # rejects platform-specific absolute forms such as drive-letter paths.
        if ".." in file_path or file_path.startswith("/") or os.path.isabs(file_path):
            raise PermissionError("Access denied: Invalid file path")
        return file_path

    def _setup_handlers(self):
        """Register the tool and resource handlers on ``self.server``."""

        @self.server.call_tool()
        async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
            # Every failure is turned into a text result; nothing propagates.
            logger.info(f"Tool called: {name} with args: {arguments}")
            try:
                if name != "risky_operation":
                    raise ValueError(f"Unknown tool: {name}")
                # Validate input
                if "data" not in arguments:
                    raise ValueError("Missing required parameter: data")
                data = arguments["data"]
                if not isinstance(data, str) or len(data) == 0:
                    raise ValueError("Data must be a non-empty string")
                # Simulate risky operation
                if "error" in data.lower():
                    raise RuntimeError("Simulated operation failure")
                result = f"Processed: {data.upper()}"
                logger.info(f"Tool {name} completed successfully")
                return [TextContent(type="text", text=result)]
            except ValueError as e:
                logger.warning(f"Validation error in tool {name}: {str(e)}")
                return [TextContent(type="text", text=f"Validation Error: {str(e)}")]
            except RuntimeError as e:
                logger.error(f"Runtime error in tool {name}: {str(e)}")
                return [TextContent(type="text", text=f"Operation Failed: {str(e)}")]
            except Exception:
                # Details go to the log only; the client gets a generic message.
                logger.exception(f"Unexpected error in tool {name}")
                return [TextContent(
                    type="text",
                    text="Internal Error: An unexpected error occurred",
                )]

        @self.server.read_resource()
        async def read_resource(uri: str) -> str:
            # FileNotFoundError and PermissionError propagate unchanged; every
            # other failure (including an unsupported scheme) is re-raised as
            # RuntimeError with the original exception chained as its cause.
            logger.info(f"Resource requested: {uri}")
            try:
                # Validate URI format
                if not uri.startswith(("file://", "http://", "https://")):
                    raise ValueError(f"Unsupported URI scheme: {uri}")
                if not uri.startswith("file://"):
                    raise NotImplementedError(f"URI scheme not implemented: {uri}")
                file_path = self._resolve_file_path(uri)
                # Explicit encoding so results do not depend on the host locale.
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                logger.info(f"File read successfully: {file_path}")
                return content
            except FileNotFoundError:
                logger.warning(f"File not found: {uri}")
                raise
            except PermissionError as e:
                logger.warning(f"Permission denied: {uri} - {str(e)}")
                raise
            except Exception as e:
                logger.exception(f"Error reading resource: {uri}")
                raise RuntimeError(f"Failed to read resource: {str(e)}") from e
🤖 Google's Agent2Agent (A2A) Protocol - Complete Guide
1. Core Architecture & Installation
Installation & Setup
# bash
# Clone the official repository
git clone https://github.com/google/A2A.git
cd A2A
# Install Python SDK
pip install --upgrade a2a-sdk
# Alternative: Install from source
git clone https://github.com/google/a2a-python.git
cd a2a-python
pip install -e .
Project Structure
my-a2a-project/
├── agents/
│ ├── currency_agent/
│ │ ├── __init__.py
│ │ ├── agent.py
│ │ ├── executor.py
│ │ └── tools.py
│ ├── weather_agent/
│ └── orchestrator/
├── shared/
│ ├── types.py
│ └── utils.py
├── tests/
└── config/
└── agent_configs.json
2. Agent Card & Skills Definition
Comprehensive Agent Card
# agents/currency_agent/agent.py
from a2a.types import (
AgentCard, AgentSkill, AgentCapabilities, AgentAuthentication,
AgentContact, AgentLicense, InputMode, OutputMode
)
def create_currency_agent_card() -> AgentCard:
    """Build the AgentCard advertising the currency agent.

    The card declares three skills (conversion, rate history, rate alerts)
    together with the agent's capabilities, I/O modes, authentication schemes,
    contact and license metadata, and rate limits.
    """
    # Define multiple skills
    skills = [
        AgentSkill(
            id='currency_conversion',
            name='Currency Conversion',
            description='Convert amounts between different currencies using real-time exchange rates',
            tags=['finance', 'currency', 'conversion', 'exchange-rates'],
            examples=[
                'Convert 100 USD to EUR',
                'How much is 50 GBP in Japanese Yen?',
                'What is the current USD to CAD exchange rate?',
            ],
            inputModes=[InputMode.TEXT],
            outputModes=[OutputMode.TEXT, OutputMode.JSON],
        ),
        AgentSkill(
            id='currency_history',
            name='Currency Rate History',
            description='Retrieve historical exchange rate data for currency pairs',
            tags=['finance', 'currency', 'history', 'analytics'],
            examples=[
                'Show EUR/USD rates for the last 30 days',
                'Historical GBP/JPY exchange rates for 2023',
                'Currency trend analysis for Bitcoin',
            ],
            inputModes=[InputMode.TEXT],
            outputModes=[OutputMode.TEXT, OutputMode.JSON, OutputMode.IMAGE],
        ),
        AgentSkill(
            id='currency_alerts',
            name='Currency Rate Alerts',
            description='Set up alerts for currency rate changes',
            tags=['finance', 'alerts', 'monitoring'],
            examples=[
                'Alert me when EUR/USD goes above 1.10',
                'Notify when Bitcoin drops below $30,000',
            ],
            inputModes=[InputMode.TEXT],
            outputModes=[OutputMode.TEXT],
        ),
    ]
    return AgentCard(
        name='Advanced Currency Agent',
        description='Comprehensive currency conversion and analysis agent with real-time data',
        url='http://localhost:9999/',
        version='2.1.0',
        # Capabilities
        capabilities=AgentCapabilities(
            streaming=True,
            pushNotifications=True,
            multiTurn=True,
            contextRetention=True,
        ),
        # I/O Modes
        defaultInputModes=[InputMode.TEXT],
        defaultOutputModes=[OutputMode.TEXT, OutputMode.JSON],
        supportedInputModes=[InputMode.TEXT, InputMode.JSON],
        # Output modes must come from OutputMode, matching every other
        # output-mode list in this card.
        supportedOutputModes=[OutputMode.TEXT, OutputMode.JSON, OutputMode.IMAGE],
        # Skills
        skills=skills,
        # Authentication
        authentication=AgentAuthentication(
            schemes=['bearer', 'api-key'],
            required=False,
        ),
        # Metadata
        contact=AgentContact(
            name='Currency Team',
            email='currency-team@example.com',
            url='https://example.com/currency-agent',
        ),
        license=AgentLicense(
            name='MIT',
            url='https://opensource.org/licenses/MIT',
        ),
        # Additional metadata
        tags=['finance', 'currency', 'real-time', 'analytics'],
        categories=['finance', 'data-analysis'],
        # Rate limiting
        rateLimits={
            'requests_per_minute': 100,
            'requests_per_hour': 1000,
        },
    )
Multi-Agent System Card
# agents/orchestrator/multi_agent_card.py
from typing import List
from a2a.types import *
class MultiAgentOrchestrator:
    """Registry of agent cards plus the orchestrator's own card."""

    def __init__(self):
        self.registered_agents: List[AgentCard] = []

    def create_orchestrator_card(self) -> AgentCard:
        """Build the AgentCard describing the orchestrator's three skills."""
        discovery = AgentSkill(
            id='agent_discovery',
            name='Agent Discovery',
            description='Discover and list available agents in the system',
            tags=['orchestration', 'discovery', 'system'],
            examples=['List all available agents', 'Find agents with finance capabilities'],
        )
        routing = AgentSkill(
            id='task_routing',
            name='Task Routing',
            description='Route tasks to appropriate specialized agents',
            tags=['orchestration', 'routing', 'delegation'],
            examples=['Route this currency question to the finance agent'],
        )
        workflow = AgentSkill(
            id='multi_agent_workflow',
            name='Multi-Agent Workflow',
            description='Coordinate complex workflows across multiple agents',
            tags=['orchestration', 'workflow', 'coordination'],
            examples=['Plan a trip using weather, booking, and currency agents'],
        )
        capabilities = AgentCapabilities(
            streaming=True,
            multiTurn=True,
            contextRetention=True,
            pushNotifications=True,
        )
        return AgentCard(
            name='Multi-Agent Orchestrator',
            description='Coordinates and routes tasks between specialized agents',
            url='http://localhost:9998/',
            version='1.0.0',
            capabilities=capabilities,
            skills=[discovery, routing, workflow],
            defaultInputModes=[InputMode.TEXT],
            defaultOutputModes=[OutputMode.TEXT, OutputMode.JSON],
            authentication=AgentAuthentication(schemes=['public']),
        )

    def register_agent(self, agent_card: AgentCard):
        """Add ``agent_card`` to the registry (duplicates are not filtered)."""
        self.registered_agents.append(agent_card)

    def find_agents_by_capability(self, capability: str) -> List[AgentCard]:
        """Return agents having a skill tagged ``capability`` (case-insensitive).

        Each agent appears at most once, in registration order.
        """
        wanted = capability.lower()
        return [
            agent
            for agent in self.registered_agents
            if any(
                wanted in [tag.lower() for tag in skill.tags]
                for skill in agent.skills
            )
        ]
3. Advanced Agent Executor Implementation
Comprehensive Agent Executor
# agents/currency_agent/executor.py
import asyncio
import json
import logging
from typing import Dict, Any, Optional, List
from datetime import datetime, timedelta
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.utils import new_agent_text_message, new_agent_json_message
from a2a.types import Message, MessagePart, TextPart, JSONPart
# Module-level logger, named after this module, used by the executor below.
logger = logging.getLogger(__name__)
class CurrencyAgentExecutor(AgentExecutor):
def __init__(self):
self.exchange_api_key = "your-api-key-here"
self.base_url = "https://api.exchangerate-api.com/v4/latest"
self.active_tasks: Dict[str, bool] = {}
self.user_sessions: Dict[str, Dict[str, Any]] = {}
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
    """Main execution method for handling requests.

    Marks the task active, records the user's text in a session, emits a
    "working" status, then routes to the handler for the classified intent.
    Any exception is logged and reported to the client as an error message;
    the task's active flag is always cleared on exit.
    """
    task_id = context.task_id
    self.active_tasks[task_id] = True
    try:
        # Extract user message
        message = context.request.params.message
        user_input = self._extract_text_from_message(message)
        # NOTE(review): sessions are keyed by the message's ID; if that ID is
        # unique per message, history and alerts never accumulate across
        # turns. Confirm which identifier should key the session.
        user_id = message.messageId

        # Initialize user session if needed
        session = self.user_sessions.setdefault(user_id, {
            'conversation_history': [],
            'preferences': {},
            'alerts': []
        })
        # Add to conversation history
        session['conversation_history'].append({
            'timestamp': datetime.now().isoformat(),
            'user_input': user_input,
            'type': 'user'
        })

        # Send initial acknowledgment (streaming)
        await self._send_status_update(
            event_queue, "Processing your currency request...", "working"
        )

        # Determine intent and route to appropriate handler
        intent = await self._classify_intent(user_input)
        routes = {
            'currency_conversion': lambda: self._handle_currency_conversion(
                user_input, event_queue, context),
            'currency_history': lambda: self._handle_currency_history(
                user_input, event_queue, context),
            'currency_alerts': lambda: self._handle_currency_alerts(
                user_input, event_queue, context, user_id),
            'help': lambda: self._handle_help_request(event_queue),
        }
        start_handler = routes.get(intent)
        if start_handler is None:
            await self._handle_unknown_request(user_input, event_queue)
        else:
            await start_handler()
    except Exception as e:
        logger.exception(f"Error in task {task_id}")
        await self._send_error_message(event_queue, str(e))
    finally:
        self.active_tasks.pop(task_id, None)
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
    """Handle task cancellation.

    Flags a known task as inactive and emits a "cancelled" status update;
    tasks that are not currently tracked are ignored.
    """
    task_id = context.task_id
    if task_id not in self.active_tasks:
        return
    self.active_tasks[task_id] = False
    await self._send_status_update(event_queue, "Task cancelled by user", "cancelled")
def _extract_text_from_message(self, message: Message) -> str:
    """Join the text of every TextPart in ``message`` with single spaces.

    Parts of any other type are skipped; a message without text parts
    yields an empty string.
    """
    return " ".join(
        part.text for part in message.parts if isinstance(part, TextPart)
    )
async def _classify_intent(self, user_input: str) -> str:
"""Classify user intent using simple keyword matching"""
user_input_lower = user_input.lower()
conversion_keywords = ['convert', 'exchange', 'how much', 'rate', 'usd', 'eur', 'gbp']
history_keywords = ['history', 'historical', 'trend', 'chart', 'past', 'last month']
alert_keywords = ['alert', 'notify', 'notification', 'watch', 'monitor']
help_keywords = ['help', 'what can you do', 'capabilities', 'commands']
if any(keyword in user_input_lower for keyword in conversion_keywords):
return 'currency_conversion'
elif any(keyword in user_input_lower for keyword in history_keywords):
return 'currency_history'
elif any(keyword in user_input_lower for keyword in alert_keywords):
return 'currency_alerts'
elif any(keyword in user_input_lower for keyword in help_keywords):
return 'help'
else:
return 'unknown'
async def _handle_currency_conversion(self, user_input: str, event_queue: EventQueue, context: RequestContext):
    """Handle currency conversion requests.

    Parses the request, fetches the rate, then sends the result as a text
    message followed by a JSON message. An unparseable request gets a usage
    hint; a failure while fetching or sending is reported as an error message.
    """
    # Parse conversion request
    parsed = await self._parse_conversion_request(user_input)
    if not parsed:
        await self._send_text_message(
            event_queue,
            "I couldn't understand the conversion request. Please specify amount, from currency, and to currency. Example: 'Convert 100 USD to EUR'"
        )
        return

    amount = parsed['amount']
    from_currency = parsed['from_currency']
    to_currency = parsed['to_currency']

    # Send status update
    await self._send_status_update(
        event_queue,
        f"Converting {amount} {from_currency} to {to_currency}...",
        "working"
    )

    # Fetch exchange rate
    try:
        rate_data = await self._fetch_exchange_rate(from_currency, to_currency)
        rate = rate_data['rate']
        converted_amount = amount * rate

        # Prepare response
        response_text = (
            f"{amount} {from_currency} = {converted_amount:.2f} {to_currency}\n"
            f"Exchange rate: 1 {from_currency} = {rate:.4f} {to_currency}\n"
            f"Last updated: {rate_data['timestamp']}"
        )

        # Send both text and JSON responses
        await self._send_text_message(event_queue, response_text)
        conversion = {
            "amount": amount,
            "from_currency": from_currency,
            "to_currency": to_currency,
            "converted_amount": round(converted_amount, 2),
            "exchange_rate": rate_data['rate'],
            "timestamp": rate_data['timestamp']
        }
        await self._send_json_message(event_queue, {"conversion": conversion})
    except Exception as e:
        await self._send_error_message(event_queue, f"Failed to fetch exchange rate: {str(e)}")
async def _handle_currency_history(self, user_input: str, event_queue: EventQueue, context: RequestContext):
    """Handle historical currency data requests.

    Replies with a fixed EUR/USD sample dataset, first as a text summary and
    then as JSON; ``user_input`` is not inspected.
    """
    await self._send_status_update(
        event_queue, "Fetching historical currency data...", "working"
    )

    # Simulate historical data fetching
    await asyncio.sleep(2)  # Simulate API call delay

    # Mock historical data
    summary = {
        "average_rate": 1.1042,
        "min_rate": 1.0998,
        "max_rate": 1.1087,
        "volatility": 0.0089
    }
    historical_data = {
        "currency_pair": "EUR/USD",
        "period": "30 days",
        "data": [
            {"date": "2024-01-01", "rate": 1.1045},
            {"date": "2024-01-02", "rate": 1.1052},
            {"date": "2024-01-03", "rate": 1.1038},
            # ... more data points
        ],
        "summary": summary
    }

    report_lines = [
        f"Historical data for {historical_data['currency_pair']} over {historical_data['period']}:",
        f"Average rate: {summary['average_rate']:.4f}",
        f"Range: {summary['min_rate']:.4f} - {summary['max_rate']:.4f}",
        f"Volatility: {summary['volatility']:.4f}",
    ]
    await self._send_text_message(event_queue, "\n".join(report_lines))
    await self._send_json_message(event_queue, historical_data)
async def _handle_currency_alerts(self, user_input: str, event_queue: EventQueue, context: RequestContext, user_id: str):
    """Handle currency alert setup.

    Stores a fixed EUR/USD "above 1.10" alert in the session for ``user_id``
    and confirms it by text; ``user_input`` is not parsed yet.
    """
    await self._send_status_update(
        event_queue, "Setting up currency alert...", "working"
    )

    # Parse alert request (simplified)
    alert_config = {
        "currency_pair": "EUR/USD",
        "condition": "above",
        "threshold": 1.10,
        "created_at": datetime.now().isoformat()
    }

    # Store alert in user session
    self.user_sessions[user_id]['alerts'].append(alert_config)

    pair = alert_config['currency_pair']
    condition = alert_config['condition']
    threshold = alert_config['threshold']
    confirmation = (
        "Alert set up successfully!\n"
        f"You'll be notified when {pair} goes {condition} {threshold}"
    )
    await self._send_text_message(event_queue, confirmation)
async def _handle_help_request(self, event_queue: EventQueue):
    """Send the static capability overview to the client as one text message."""
    help_lines = (
        "Currency Agent Capabilities:",
        "1. **Currency Conversion**",
        "- Convert 100 USD to EUR",
        "- How much is 50 GBP in JPY?",
        "- Current USD to CAD rate",
        "2. **Historical Data**",
        "- Show EUR/USD rates for last 30 days",
        "- Historical GBP/JPY for 2023",
        "- Currency trend analysis",
        "3. **Rate Alerts**",
        "- Alert me when EUR/USD goes above 1.10",
        "- Notify when Bitcoin drops below $30,000",
        "4. **Supported Currencies**",
        "- Major currencies: USD, EUR, GBP, JPY, CAD, AUD, CHF",
        "- Cryptocurrencies: BTC, ETH, ADA, DOT",
        "- 150+ total currencies supported",
        "Just ask me in natural language!",
    )
    await self._send_text_message(event_queue, "\n".join(help_lines))
async def _handle_unknown_request(self, user_input: str, event_queue: EventQueue):
    """Tell the user the request was not understood and suggest what to try."""
    fallback_reply = "".join((
        f"I'm not sure how to help with: '{user_input}'\n\n",
        "I can help with currency conversions, historical rates, and rate alerts. ",
        "Try asking something like 'Convert 100 USD to EUR' or type 'help' for more options.",
    ))
    await self._send_text_message(event_queue, fallback_reply)
async def _parse_conversion_request(self, user_input: str) -> Optional[Dict[str, Any]]:
"""Parse currency conversion request from natural language"""
import re
# Simple regex patterns for parsing
patterns = [
r'convert\s+(\d+(?:\.\d+)?)\s+(\w{3})\s+to\s+(\w{3})',
r'(\d+(?:\.\d+)?)\s+(\w{3})\s+to\s+(\w{3})',
r'how\s+much\s+is\s+(\d+(?:\.\d+)?)\s+(\w{3})\s+in\s+(\w{3})'
]
user_input_lower = user_input.lower()
for pattern in patterns:
match = re.search(pattern, user_input_lower)
if match:
return {
'amount': float(match.group(1)),
'from_currency': match.group(2).upper(),
'to_currency': match.group(3).upper()
}
return None
async def _fetch_exchange_rate(self, from_currency: str, to_currency: str) -> Dict[str, Any]:
"""Fetch real-time exchange rate"""
# Mock implementation - replace with actual API call
import random
# Simulate API delay
await asyncio.sleep(1)
# Mock exchange rates
mock_rates = {
('USD', 'EUR'): 0.85,
('EUR', 'USD'): 1.18,
('USD', 'GBP'): 0.73,
('GBP', 'USD'): 1.37,
('USD', 'JPY'): 110.0,
('JPY', 'USD'): 0.009
}
rate = mock_rates.get((from_currency, to_currency))
if not rate:
# Calculate inverse if available
inverse_rate = mock_rates.get((to_currency, from_currency))
if inverse_rate:
rate = 1 / inverse_rate
else:
rate = random.uniform(0.5, 2.0) # Fallback random rate
return {
'rate': rate,
'timestamp': datetime.now().isoformat()
}
async def _send_text_message(self, event_queue: EventQueue, text: str):
    """Send a text message to the client.

    ``enqueue_event`` is a coroutine in some SDK versions and a plain
    method in others; the result is awaited when it is awaitable so the
    event is not silently dropped.
    """
    import inspect

    outcome = event_queue.enqueue_event(new_agent_text_message(text))
    if inspect.isawaitable(outcome):
        await outcome
async def _send_json_message(self, event_queue: EventQueue, data: Dict[str, Any]):
    """Send a JSON message to the client.

    ``enqueue_event`` is a coroutine in some SDK versions and a plain
    method in others; the result is awaited when it is awaitable so the
    event is not silently dropped.
    """
    import inspect

    outcome = event_queue.enqueue_event(new_agent_json_message(data))
    if inspect.isawaitable(outcome):
        await outcome
async def _send_status_update(self, event_queue: EventQueue, message: str, status: str):
    """Send a status update for streaming responses.

    Builds a ``TaskStatusUpdateEvent`` stamped with the current local time
    and enqueues it. The enqueue result is awaited when it is awaitable,
    because ``enqueue_event`` is a coroutine in some SDK versions.
    """
    import inspect

    from a2a.types import TaskStatusUpdateEvent

    status_event = TaskStatusUpdateEvent(
        status=status,
        message=message,
        timestamp=datetime.now().isoformat()
    )
    outcome = event_queue.enqueue_event(status_event)
    if inspect.isawaitable(outcome):
        await outcome
async def _send_error_message(self, event_queue: EventQueue, error: str):
    """Report a failure: first as a text message, then as an "error" status update."""
    formatted = "Error: " + f"{error}"
    await self._send_text_message(event_queue, formatted)
    await self._send_status_update(event_queue, formatted, "error")
4. Server Implementation & Configuration
Complete A2A Server Setup
# agents/currency_agent/server.py
import asyncio
import logging
import uvicorn
from typing import Dict, Any
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.server.middleware import LoggingMiddleware, CORSMiddleware
from .agent import create_currency_agent_card
from .executor import CurrencyAgentExecutor
# Root logging setup: INFO level, timestamped "name - level - message" records.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-scoped logger used by the server code below.
logger = logging.getLogger(__name__)
class CurrencyAgentServer:
    """Bundle the currency agent's card, executor and task store into a runnable A2A server.

    The ASGI application is built exactly once (in ``_setup_middleware``)
    and that same instance is what uvicorn serves. Previously middleware was
    attached to one ``build()`` result while ``start()`` served a second,
    fresh build, so the middleware never ran.
    """

    def __init__(self, host: str = "0.0.0.0", port: int = 9999):
        self.host = host
        self.port = port
        self.agent_card = create_currency_agent_card()
        self.task_store = InMemoryTaskStore()
        self.executor = CurrencyAgentExecutor()

        # Create request handler
        self.request_handler = DefaultRequestHandler(
            agent_executor=self.executor,
            task_store=self.task_store
        )

        # Create A2A application
        self.app = A2AStarletteApplication(
            agent_card=self.agent_card,
            http_handler=self.request_handler
        )

        # The built ASGI app (with middleware); populated by _setup_middleware.
        self.starlette_app = None
        self._setup_middleware()

    def _setup_middleware(self):
        """Build the ASGI app once, attach middleware, and keep it for serving."""
        starlette_app = self.app.build()

        # NOTE(review): wildcard origins combined with allow_credentials=True
        # is very permissive - restrict allow_origins before production use.
        starlette_app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"]
        )

        # Add logging middleware
        starlette_app.add_middleware(LoggingMiddleware)

        self.starlette_app = starlette_app

    async def start(self):
        """Start the A2A server and serve until shutdown."""
        logger.info("Starting Currency Agent server on %s:%s", self.host, self.port)
        logger.info("Agent Card URL: http://%s:%s/.well-known/agent.json", self.host, self.port)

        config = uvicorn.Config(
            self.starlette_app,  # the instance that carries the middleware
            host=self.host,
            port=self.port,
            log_level="info"
        )
        server = uvicorn.Server(config)
        await server.serve()

    def run(self):
        """Run the server (blocking)."""
        asyncio.run(self.start())
# Health check and metrics endpoints
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route
async def health_check(request):
    """Health check endpoint: returns a static status payload with the current time."""
    # Imported locally: this module's import block does not bring in
    # ``datetime``, so the bare name raised NameError on the first request.
    from datetime import datetime

    return JSONResponse({
        "status": "healthy",
        "agent": "Currency Agent",
        "version": "2.1.0",
        "timestamp": datetime.now().isoformat()
    })
async def metrics(request):
    """Metrics endpoint.

    Reads the executor from ``request.app.state.executor`` (set by
    ``create_enhanced_server``). Counts fall back to 0 when the executor, or
    one of its ``active_tasks`` / ``user_sessions`` attributes, is missing.
    ``uptime`` and ``requests_processed`` are placeholder values.
    """
    # The original read an undefined module-level ``executor`` and raised
    # NameError on every call.
    executor = getattr(request.app.state, "executor", None)
    # In a real implementation, collect actual metrics
    return JSONResponse({
        "active_tasks": len(getattr(executor, "active_tasks", ())),
        "total_sessions": len(getattr(executor, "user_sessions", ())),
        "uptime": "24h 15m",
        "requests_processed": 1247
    })


def create_enhanced_server():
    """Create the A2A app with additional ``/health`` and ``/metrics`` endpoints.

    Returns:
        The built Starlette application. The executor instance is exposed on
        ``app.state.executor`` so the metrics endpoint can inspect it.
    """
    executor = CurrencyAgentExecutor()
    base_app = A2AStarletteApplication(
        agent_card=create_currency_agent_card(),
        http_handler=DefaultRequestHandler(
            agent_executor=executor,
            task_store=InMemoryTaskStore()
        )
    ).build()

    # Share the executor with request handlers via application state.
    base_app.state.executor = executor

    # Add custom routes
    base_app.router.routes.extend([
        Route("/health", health_check),
        Route("/metrics", metrics)
    ])
    return base_app
if __name__ == "__main__":
    # Script entry point: start the currency agent with default host/port.
    CurrencyAgentServer().run()
5. Advanced Client Implementation
Comprehensive A2A Client
# client/advanced_client.py
import asyncio
import json
import logging
from typing import Dict, Any, List, Optional, AsyncGenerator
from datetime import datetime
import httpx
from a2a.client import A2AClient
from a2a.types import (
MessageSendParams, SendMessageRequest, SendStreamingMessageRequest,
Message, TextPart, JSONPart, TaskStatusUpdateEvent
)
logger = logging.getLogger(__name__)
class AdvancedA2AClient:
    """Async-context-managed A2A client that also records a conversation log.

    Use as ``async with AdvancedA2AClient(url) as client:``. Entering opens
    an ``httpx.AsyncClient`` and resolves the agent card; exiting closes it.
    """

    def __init__(self, agent_url: str):
        self.agent_url = agent_url
        self.client: Optional[A2AClient] = None
        # Defined up front so __aexit__ is safe even if __aenter__ never ran.
        self.httpx_client: Optional[httpx.AsyncClient] = None
        self.session_id = None
        self.conversation_history: List[Dict[str, Any]] = []

    async def __aenter__(self):
        self.httpx_client = httpx.AsyncClient(timeout=30.0)
        try:
            # Initialize A2A client from agent card
            self.client = await A2AClient.get_client_from_agent_card_url(
                self.httpx_client, self.agent_url
            )
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so close the
            # HTTP client here to avoid leaking the connection pool.
            await self.httpx_client.aclose()
            self.httpx_client = None
            raise
        logger.info(f"Connected to agent at {self.agent_url}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        http_client, self.httpx_client = self.httpx_client, None
        self.client = None
        if http_client is not None:
            await http_client.aclose()

    async def send_message(self, text: str, stream: bool = True) -> Dict[str, Any]:
        """Send ``text`` to the agent and return the processed result.

        Args:
            text: User message.
            stream: Use the streaming endpoint when True (default).

        Raises:
            RuntimeError: If the client has not been initialised via ``async with``.
        """
        import uuid

        if not self.client:
            raise RuntimeError("Client not initialized")

        message = Message(
            role='user',
            parts=[TextPart(type='text', text=text)],
            # Random ids: timestamp-based ids can collide when several
            # messages are created concurrently.
            messageId=f"msg_{uuid.uuid4().hex}"
        )

        # Add to conversation history
        self.conversation_history.append({
            'timestamp': datetime.now().isoformat(),
            'role': 'user',
            'content': text
        })

        if stream:
            return await self._send_streaming_message(message)
        return await self._send_regular_message(message)

    async def _send_regular_message(self, message: Message) -> Dict[str, Any]:
        """Send a regular (non-streaming) message and log the first reply part."""
        request = SendMessageRequest(
            params=MessageSendParams(message=message)
        )
        response = await self.client.send_message(request)

        result = {
            'type': 'regular',
            'response': response.result,
            'timestamp': datetime.now().isoformat()
        }

        # Add to conversation history (first part only).
        parts = getattr(response.result, 'parts', None)
        if parts:
            first = parts[0]
            content = first.text if hasattr(first, 'text') else str(first)
            self.conversation_history.append({
                'timestamp': datetime.now().isoformat(),
                'role': 'agent',
                'content': content
            })
        return result

    async def _send_streaming_message(self, message: Message) -> Dict[str, Any]:
        """Send a streaming message, collecting raw chunks and extracted content."""
        request = SendStreamingMessageRequest(
            params=MessageSendParams(message=message)
        )
        stream_response = self.client.send_message_streaming(request)

        chunks = []
        final_content = []
        async for chunk in stream_response:
            chunks.append(chunk.model_dump(mode='json', exclude_none=True))

            # Extract content from chunk; chunks without parts are kept raw only.
            chunk_result = getattr(chunk, 'result', None)
            if not chunk_result or not hasattr(chunk_result, 'parts'):
                continue
            for part in chunk_result.parts:
                if hasattr(part, 'text'):
                    final_content.append(part.text)
                elif hasattr(part, 'data'):
                    final_content.append(json.dumps(part.data))

        # Add to conversation history
        if final_content:
            self.conversation_history.append({
                'timestamp': datetime.now().isoformat(),
                'role': 'agent',
                'content': ' '.join(final_content)
            })

        return {
            'type': 'streaming',
            'chunks': chunks,
            'final_content': final_content,
            'timestamp': datetime.now().isoformat()
        }

    async def get_agent_info(self) -> Dict[str, Any]:
        """Fetch and return the agent card JSON.

        Raises:
            RuntimeError: If the client has not been initialised.
            httpx.HTTPStatusError: If the agent card request fails.
        """
        if not self.client:
            raise RuntimeError("Client not initialized")

        # Tolerate a trailing slash on the configured base URL.
        card_url = f"{self.agent_url.rstrip('/')}/.well-known/agent.json"

        if self.httpx_client is not None:
            # Reuse the pooled client (and its timeout) instead of opening a
            # new connection pool per call.
            response = await self.httpx_client.get(card_url)
        else:
            async with httpx.AsyncClient() as client:
                response = await client.get(card_url)
        response.raise_for_status()
        return response.json()

    async def test_all_capabilities(self) -> Dict[str, Any]:
        """Send the first example of every advertised skill and collect the results."""
        results = {}

        # Get agent info
        agent_info = await self.get_agent_info()
        results['agent_info'] = agent_info

        # Test each skill that ships at least one example prompt.
        results['skill_tests'] = {}
        for skill in agent_info.get('skills', []):
            skill_id = skill['id']
            examples = skill.get('examples', [])
            if examples:
                logger.info(f"Testing skill: {skill_id}")
                test_result = await self.send_message(examples[0], stream=True)
                results['skill_tests'][skill_id] = {
                    'example_used': examples[0],
                    'result': test_result
                }
        return results

    def get_conversation_history(self) -> List[Dict[str, Any]]:
        """Return a shallow copy of the conversation history."""
        return self.conversation_history.copy()

    def clear_conversation_history(self):
        """Clear the conversation history."""
        self.conversation_history.clear()
# Interactive client for testing
class InteractiveA2AClient:
    """Console REPL for chatting with a single A2A agent."""

    def __init__(self, agent_url: str):
        self.agent_url = agent_url

    @staticmethod
    def _print_agent_summary(agent_info):
        """Print the agent's identity, its skills and the REPL usage banner."""
        print(f"\nConnected to: {agent_info['name']}")
        print(f"Description: {agent_info['description']}")
        print(f"Version: {agent_info['version']}")

        # Show available skills
        print("\nAvailable skills:")
        for skill in agent_info.get('skills', []):
            print(f" - {skill['name']}: {skill['description']}")
            if skill.get('examples'):
                print(f" Examples: {', '.join(skill['examples'][:2])}")

        print("\nType 'quit' to exit, 'help' for agent help, 'history' to see conversation")
        print("=" * 60)

    @staticmethod
    def _print_history(history):
        """Print the last ten conversation entries, truncated to 100 characters."""
        print("\nConversation History:")
        for entry in history[-10:]:
            print(f"[{entry['timestamp']}] {entry['role']}: {entry['content'][:100]}...")

    @staticmethod
    def _print_result(result):
        """Print the text content of a ``send_message`` result."""
        if result['type'] == 'streaming':
            # Content is printed after the whole stream has been collected.
            for content in result['final_content']:
                print(content)
        elif hasattr(result['response'], 'parts'):
            for part in result['response'].parts:
                if hasattr(part, 'text'):
                    print(part.text)

    async def run_interactive_session(self):
        """Run an interactive session with the agent until 'quit', Ctrl-C or end of input."""
        print(f"Connecting to agent at {self.agent_url}...")

        async with AdvancedA2AClient(self.agent_url) as client:
            agent_info = await client.get_agent_info()
            self._print_agent_summary(agent_info)

            while True:
                try:
                    user_input = input("\nYou: ").strip()
                    command = user_input.lower()

                    if command == 'quit':
                        break
                    if command == 'history':
                        self._print_history(client.get_conversation_history())
                        continue
                    if not user_input:
                        continue

                    # Send message to agent
                    print("Agent: ", end="", flush=True)
                    result = await client.send_message(user_input, stream=True)
                    self._print_result(result)
                except (KeyboardInterrupt, EOFError):
                    # EOFError: stdin was closed. Without this the generic
                    # handler below would catch it on every iteration and
                    # the loop would spin forever.
                    print("\nExiting...")
                    break
                except Exception as e:
                    print(f"Error: {str(e)}")
# Multi-agent client for orchestration
class MultiAgentClient:
    """Hold several ``AdvancedA2AClient`` connections and route messages between them."""

    # (agent name, trigger keywords) in priority order. A keyword matches at
    # the START of a word, so "rates" triggers "rate" but "temperature"
    # (which merely contains "rate") does not.
    _ROUTES = (
        ('currency', ('currency', 'exchange', 'convert', 'rate')),
        ('weather', ('weather', 'temperature', 'forecast')),
    )

    def __init__(self):
        self.agents: Dict[str, AdvancedA2AClient] = {}
        self.orchestrator_url = None

    async def register_agent(self, name: str, url: str):
        """Connect to the agent at ``url`` and register it under ``name``."""
        client = AdvancedA2AClient(url)
        await client.__aenter__()
        self.agents[name] = client

        # Get agent info for capabilities
        agent_info = await client.get_agent_info()
        print(f"Registered agent '{name}': {agent_info['name']}")

    async def route_message(self, message: str, preferred_agent: Optional[str] = None) -> Dict[str, Any]:
        """Route ``message`` to an appropriate agent and return its reply.

        Order of preference: ``preferred_agent`` if registered; the agent of
        the first keyword group that matches (if registered); otherwise the
        first registered agent.

        Raises:
            RuntimeError: If no agents are registered.
        """
        import re

        if preferred_agent and preferred_agent in self.agents:
            return await self.agents[preferred_agent].send_message(message)

        # Keyword routing on word prefixes rather than raw substrings.
        message_lower = message.lower()
        for agent_name, keywords in self._ROUTES:
            pattern = r'\b(?:' + '|'.join(re.escape(word) for word in keywords) + ')'
            if re.search(pattern, message_lower):
                if agent_name in self.agents:
                    return await self.agents[agent_name].send_message(message)
                # First matching group wins even if its agent is missing;
                # fall through to the default agent.
                break

        # Default to first available agent
        if self.agents:
            first_agent = next(iter(self.agents.values()))
            return await first_agent.send_message(message)

        raise RuntimeError("No agents available")

    async def cleanup(self):
        """Close every agent connection; one failing close does not block the rest."""
        agents, self.agents = self.agents, {}
        for name, client in agents.items():
            try:
                await client.__aexit__(None, None, None)
            except Exception as exc:
                print(f"Failed to close agent '{name}': {exc}")
# Usage example
async def multi_agent_example():
    """Register a currency and a weather agent, route one message to each, print the replies."""
    multi_client = MultiAgentClient()
    endpoints = (
        ('currency', 'http://localhost:9999'),
        ('weather', 'http://localhost:9998'),
    )
    try:
        # Register multiple agents
        for agent_name, agent_url in endpoints:
            await multi_client.register_agent(agent_name, agent_url)

        # Route messages to appropriate agents
        currency_result = await multi_client.route_message("Convert 100 USD to EUR")
        weather_result = await multi_client.route_message("What's the weather like?")

        for label, outcome in (("Currency result:", currency_result), ("Weather result:", weather_result)):
            print(label, outcome)
    finally:
        # Always release the agent connections, even on failure.
        await multi_client.cleanup()
if __name__ == "__main__":
    # Script entry point: open an interactive console against the local agent.
    asyncio.run(InteractiveA2AClient("http://localhost:9999").run_interactive_session())
6. Agent-to-Agent Communication
Direct Agent Communication
# agents/orchestrator/agent_communication.py
import asyncio
import json
from typing import Dict, Any, List, Optional
from datetime import datetime
from a2a.client import A2AClient
from a2a.types import Message, TextPart, JSONPart
class AgentCommunicationManager:
    """Registry of peer agents plus helpers for direct, broadcast and workflow messaging.

    Each connected agent keeps its own long-lived ``httpx.AsyncClient``;
    call ``close()`` when the manager is no longer needed.
    """

    def __init__(self):
        self.registered_agents: Dict[str, Dict[str, Any]] = {}
        self.active_connections: Dict[str, A2AClient] = {}
        # HTTP clients backing the A2A clients above, keyed by agent id.
        self._http_clients: Dict[str, Any] = {}

    async def register_agent(self, agent_id: str, agent_url: str, capabilities: List[str]):
        """Register an agent and try to connect to it.

        On connection failure the agent stays registered with status
        ``'unreachable'`` and no entry is added to ``active_connections``.
        """
        self.registered_agents[agent_id] = {
            'url': agent_url,
            'capabilities': capabilities,
            'last_seen': datetime.now().isoformat(),
            'status': 'active'
        }

        http_client = None
        try:
            import httpx

            # Deliberately NOT an ``async with``: the A2A client keeps using
            # this HTTP client after the method returns. Closing it here (as
            # before) left every stored connection unusable.
            http_client = httpx.AsyncClient()
            client = await A2AClient.get_client_from_agent_card_url(
                http_client, agent_url
            )
        except Exception as e:
            if http_client is not None:
                await http_client.aclose()
            print(f"Failed to connect to agent {agent_id}: {str(e)}")
            self.registered_agents[agent_id]['status'] = 'unreachable'
            return

        # Re-registration: release the HTTP client of the old connection.
        previous = self._http_clients.pop(agent_id, None)
        if previous is not None:
            await previous.aclose()

        self._http_clients[agent_id] = http_client
        self.active_connections[agent_id] = client
        print(f"Agent {agent_id} registered and connected")

    async def close(self):
        """Drop all connections and close their underlying HTTP clients."""
        http_clients, self._http_clients = self._http_clients, {}
        self.active_connections.clear()
        for http_client in http_clients.values():
            await http_client.aclose()

    async def send_agent_message(self, target_agent_id: str, message: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Send a message from one agent to another.

        Args:
            target_agent_id: Id of a connected agent.
            message: Text to send.
            context: Optional data attached as an extra JSON part.

        Raises:
            ValueError: If the target agent has no active connection.
        """
        import uuid

        if target_agent_id not in self.active_connections:
            raise ValueError(f"Agent {target_agent_id} not connected")

        client = self.active_connections[target_agent_id]

        # Create message with context
        message_parts = [TextPart(type='text', text=message)]
        if context:
            message_parts.append(JSONPart(type='json', data=context))

        agent_message = Message(
            role='agent',  # Message from another agent
            parts=message_parts,
            # Random id: timestamp-based ids can collide under concurrency.
            messageId=f"agent_msg_{uuid.uuid4().hex}"
        )

        # Send message
        from a2a.types import MessageSendParams, SendMessageRequest
        request = SendMessageRequest(
            params=MessageSendParams(message=agent_message)
        )
        response = await client.send_message(request)

        return {
            'target_agent': target_agent_id,
            'response': response.result,
            'timestamp': datetime.now().isoformat()
        }

    async def broadcast_message(self, message: str, exclude_agents: Optional[List[str]] = None) -> Dict[str, Any]:
        """Send ``message`` to every connected agent not listed in ``exclude_agents``.

        Returns:
            Mapping of agent id to its result dict, or ``{'error': ...}`` on failure.
        """
        excluded = set(exclude_agents or [])
        results = {}
        # Snapshot the ids so the mapping may change while we await.
        for agent_id in list(self.active_connections):
            if agent_id in excluded:
                continue
            try:
                results[agent_id] = await self.send_agent_message(agent_id, message)
            except Exception as e:
                results[agent_id] = {'error': str(e)}
        return results

    def find_agents_by_capability(self, capability: str) -> List[str]:
        """Return ids of registered agents advertising ``capability`` (case-insensitive)."""
        wanted = capability.lower()
        return [
            agent_id
            for agent_id, info in self.registered_agents.items()
            if wanted in (cap.lower() for cap in info['capabilities'])
        ]

    async def coordinate_workflow(self, workflow_steps: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Run workflow steps in order, sending each to the first capable agent.

        Each step needs ``id``, ``capability`` and ``message``. Results of
        completed steps are passed to later steps as ``workflow_context``.
        Every entry in the returned ``steps`` list carries a ``status`` key
        (``'completed'`` or ``'failed'``).
        """
        workflow_results = []
        context = {}

        for step in workflow_steps:
            step_id = step['id']
            required_capability = step['capability']
            message = step['message']

            # Find suitable agent
            suitable_agents = self.find_agents_by_capability(required_capability)
            if not suitable_agents:
                workflow_results.append({
                    'step_id': step_id,
                    'error': f"No agent found with capability: {required_capability}",
                    # Consumers index ``step['status']`` unconditionally.
                    'status': 'failed'
                })
                continue

            # Use first suitable agent
            target_agent = suitable_agents[0]
            try:
                # Include context from previous steps
                result = await self.send_agent_message(
                    target_agent,
                    message,
                    context={'workflow_context': context}
                )
            except Exception as e:
                workflow_results.append({
                    'step_id': step_id,
                    'agent': target_agent,
                    'error': str(e),
                    'status': 'failed'
                })
            else:
                workflow_results.append({
                    'step_id': step_id,
                    'agent': target_agent,
                    'result': result,
                    'status': 'completed'
                })
                # Update context for next steps
                context[step_id] = result

        return {
            'workflow_id': f"workflow_{datetime.now().timestamp()}",
            'steps': workflow_results,
            'completed_at': datetime.now().isoformat()
        }
# Example workflow coordination
async def travel_planning_workflow():
    """Example: coordinate travel planning across weather, currency and booking agents."""
    comm_manager = AgentCommunicationManager()

    # Register agents
    await comm_manager.register_agent('weather', 'http://localhost:9997', ['weather', 'forecast'])
    await comm_manager.register_agent('currency', 'http://localhost:9999', ['currency', 'exchange'])
    await comm_manager.register_agent('booking', 'http://localhost:9996', ['hotels', 'flights'])

    # Define workflow
    workflow = [
        {
            'id': 'weather_check',
            'capability': 'weather',
            'message': 'What will the weather be like in Paris next week?'
        },
        {
            'id': 'currency_conversion',
            'capability': 'currency',
            'message': 'Convert 1000 USD to EUR for travel budget'
        },
        {
            'id': 'hotel_search',
            'capability': 'hotels',
            'message': 'Find hotels in Paris for next week, budget around 800 EUR'
        }
    ]

    # Execute workflow
    results = await comm_manager.coordinate_workflow(workflow)

    print("Travel Planning Workflow Results:")
    for step in results['steps']:
        # Steps for which no capable agent was found may lack 'status';
        # indexing it directly raised KeyError.
        print(f"Step {step['step_id']}: {step.get('status', 'skipped')}")
        if 'result' in step:
            print(f" Agent: {step['agent']}")
            print(f" Result: {step['result']}")
        elif 'error' in step:
            print(f" Error: {step['error']}")
7. Authentication & Security
Authentication Implementation
# shared/auth.py
import jwt
import hashlib
import secrets
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
class A2AAuthManager:
    """In-memory issuer and validator for agent API keys and HS256 JWTs.

    API keys are stored only as SHA-256 hashes. JWT bookkeeping in
    ``jwt_tokens`` is informational and is not consulted during validation.
    """

    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.api_keys: Dict[str, Dict[str, Any]] = {}
        self.jwt_tokens: Dict[str, Dict[str, Any]] = {}

    # ``list[str]`` (builtin generic) replaces ``List[str]``: ``List`` was
    # never imported here, so defining the class raised NameError.
    def generate_api_key(self, agent_id: str, permissions: list[str]) -> str:
        """Generate an API key for ``agent_id``.

        Returns:
            The plaintext key. It is not stored, so this is the only chance
            to capture it.
        """
        api_key = secrets.token_urlsafe(32)
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        self.api_keys[key_hash] = {
            'agent_id': agent_id,
            'permissions': permissions,
            'created_at': datetime.now().isoformat(),
            'last_used': None,
            'active': True
        }
        return api_key

    def validate_api_key(self, api_key: str) -> Optional[Dict[str, Any]]:
        """Return the key's record (updating ``last_used``) or ``None`` if unknown/inactive."""
        if not api_key:
            return None
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        key_info = self.api_keys.get(key_hash)
        if key_info is None or not key_info['active']:
            return None
        key_info['last_used'] = datetime.now().isoformat()
        return key_info

    def generate_jwt_token(self, agent_id: str, permissions: list[str], expires_in_hours: int = 24) -> str:
        """Generate an HS256 JWT carrying ``agent_id`` and ``permissions``.

        ``iat``/``exp`` and the stored bookkeeping timestamps all derive from
        one timezone-aware UTC instant.
        """
        from datetime import timezone

        issued_at = datetime.now(timezone.utc)
        expires_at = issued_at + timedelta(hours=expires_in_hours)
        payload = {
            'agent_id': agent_id,
            'permissions': permissions,
            'iat': issued_at,
            'exp': expires_at
        }
        token = jwt.encode(payload, self.secret_key, algorithm='HS256')

        # Store token info
        # NOTE(review): keyed by the raw token and never pruned - consider
        # storing a hash and expiring old entries.
        self.jwt_tokens[token] = {
            'agent_id': agent_id,
            'created_at': issued_at.isoformat(),
            'expires_at': expires_at.isoformat()
        }
        return token

    def validate_jwt_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Return the decoded payload, or ``None`` if the token is expired or invalid."""
        try:
            return jwt.decode(token, self.secret_key, algorithms=['HS256'])
        except jwt.InvalidTokenError:
            # ExpiredSignatureError is a subclass of InvalidTokenError.
            return None
# Authentication middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse
class A2AAuthMiddleware(BaseHTTPMiddleware):
    """Gate requests behind an ``X-API-Key`` header or a ``Bearer`` JWT.

    ``/health`` and ``/.well-known/agent.json`` bypass authentication. On
    success the caller's ``agent_id`` and ``permissions`` are placed on
    ``request.state``; otherwise a 401 JSON response is returned.
    """

    def __init__(self, app, auth_manager: A2AAuthManager):
        super().__init__(app)
        self.auth_manager = auth_manager

    def _resolve_identity(self, request: Request):
        """Return the validated credential record for ``request`` or ``None``.

        The API key is tried first; an invalid key falls through to the
        bearer-token check.
        """
        api_key = request.headers.get('X-API-Key')
        if api_key:
            agent_info = self.auth_manager.validate_api_key(api_key)
            if agent_info:
                return agent_info

        auth_header = request.headers.get('Authorization')
        if auth_header and auth_header.startswith('Bearer '):
            # Strip the 7-character "Bearer " prefix.
            payload = self.auth_manager.validate_jwt_token(auth_header[7:])
            if payload:
                return payload

        return None

    async def dispatch(self, request: Request, call_next):
        # Public endpoints need no credentials.
        if request.url.path in ('/health', '/.well-known/agent.json'):
            return await call_next(request)

        identity = self._resolve_identity(request)
        if identity is None:
            # Authentication failed
            return JSONResponse(
                status_code=401,
                content={'error': 'Authentication required'}
            )

        request.state.agent_id = identity['agent_id']
        request.state.permissions = identity['permissions']
        return await call_next(request)
# Secure agent server
class SecureA2AServer:
    """Wrap an agent card and executor in an A2A app protected by ``A2AAuthMiddleware``."""

    def __init__(self, agent_card, executor, secret_key: str):
        self.agent_card = agent_card
        self.executor = executor
        self.auth_manager = A2AAuthManager(secret_key)
        # Issue the default API keys right away.
        self.setup_default_auth()

    def setup_default_auth(self):
        """Create API keys for the 'admin' and 'client_agent' identities and print them.

        NOTE(review): the plaintext keys go to stdout, which may end up in
        logs - confirm this is acceptable for the deployment.
        """
        default_identities = (
            ("Admin", 'admin', ['read', 'write', 'admin']),
            ("Client", 'client_agent', ['read', 'write']),
        )
        for label, agent_id, permissions in default_identities:
            issued_key = self.auth_manager.generate_api_key(agent_id, permissions)
            print(f"{label} API Key: {issued_key}")

    def create_app(self):
        """Build the Starlette application and attach the authentication middleware."""
        from a2a.server.apps import A2AStarletteApplication
        from a2a.server.request_handlers import DefaultRequestHandler
        from a2a.server.tasks import InMemoryTaskStore

        handler = DefaultRequestHandler(
            agent_executor=self.executor,
            task_store=InMemoryTaskStore()
        )
        app = A2AStarletteApplication(
            agent_card=self.agent_card,
            http_handler=handler
        ).build()

        # Every request now passes through A2AAuthMiddleware first.
        app.add_middleware(A2AAuthMiddleware, auth_manager=self.auth_manager)
        return app
8. Testing & Debugging
Comprehensive Testing Suite
# tests/test_a2a_integration.py
import asyncio
import pytest
import json
from typing import Dict, Any
from a2a.client import A2AClient
from agents.currency_agent.server import CurrencyAgentServer
from client.advanced_client import AdvancedA2AClient
class A2ATestSuite:
    """Integration checks against a running A2A agent.

    Each ``test_*`` coroutine stores its outcome in ``self.test_results``
    under a fixed key and prints a one-line verdict; none of them raise.
    """

    def __init__(self, agent_url: str):
        self.agent_url = agent_url
        self.test_results: Dict[str, Any] = {}

    async def run_all_tests(self) -> Dict[str, Any]:
        """Run every check in sequence and return the collected results."""
        print("Starting A2A Integration Tests...")
        await self.test_agent_discovery()
        await self.test_basic_communication()
        await self.test_streaming_responses()
        await self.test_error_handling()
        await self.test_concurrent_requests()
        await self.test_skill_functionality()
        return self.test_results

    def _record_failure(self, key: str, label: str, error: Exception) -> None:
        """Store a failed outcome under ``key`` and print the matching verdict."""
        self.test_results[key] = {
            'status': 'failed',
            'error': str(error)
        }
        print(f"✗ {label} failed: {str(error)}")

    @staticmethod
    def _check(condition: bool, message: str) -> None:
        """Raise AssertionError when ``condition`` is false (unlike ``assert``, survives ``-O``)."""
        if not condition:
            raise AssertionError(message)

    async def test_agent_discovery(self):
        """Test agent card discovery."""
        print("Testing agent discovery...")
        try:
            import httpx
            async with httpx.AsyncClient() as client:
                response = await client.get(f"{self.agent_url}/.well-known/agent.json")
                response.raise_for_status()
                agent_card = response.json()

            # Validate agent card structure
            for field in ('name', 'description', 'version', 'skills'):
                self._check(field in agent_card, f"Missing required field: {field}")

            self.test_results['agent_discovery'] = {
                'status': 'passed',
                'agent_name': agent_card['name'],
                'skills_count': len(agent_card.get('skills', []))
            }
            print("✓ Agent discovery test passed")
        except Exception as e:
            self._record_failure('agent_discovery', "Agent discovery test", e)

    async def test_basic_communication(self):
        """Test basic (non-streaming) message sending."""
        print("Testing basic communication...")
        try:
            async with AdvancedA2AClient(self.agent_url) as client:
                response = await client.send_message("help", stream=False)
                self._check(response['type'] == 'regular', "expected a 'regular' response")
                self._check('response' in response, "result has no 'response' entry")

            self.test_results['basic_communication'] = {
                'status': 'passed',
                'response_type': response['type']
            }
            print("✓ Basic communication test passed")
        except Exception as e:
            self._record_failure('basic_communication', "Basic communication test", e)

    async def test_streaming_responses(self):
        """Test streaming message responses."""
        print("Testing streaming responses...")
        try:
            async with AdvancedA2AClient(self.agent_url) as client:
                response = await client.send_message("Convert 100 USD to EUR", stream=True)
                self._check(response['type'] == 'streaming', "expected a 'streaming' response")
                self._check('chunks' in response, "result has no 'chunks' entry")
                self._check('final_content' in response, "result has no 'final_content' entry")

            self.test_results['streaming_responses'] = {
                'status': 'passed',
                'chunks_count': len(response['chunks']),
                'has_final_content': len(response['final_content']) > 0
            }
            print("✓ Streaming responses test passed")
        except Exception as e:
            self._record_failure('streaming_responses', "Streaming responses test", e)

    async def test_error_handling(self):
        """Test that an unrecognised request is handled without raising."""
        print("Testing error handling...")
        try:
            async with AdvancedA2AClient(self.agent_url) as client:
                # Only "did not raise" is checked; the reply is not inspected.
                await client.send_message("invalid_command_xyz_123", stream=False)

            self.test_results['error_handling'] = {
                'status': 'passed',
                'handled_gracefully': True
            }
            print("✓ Error handling test passed")
        except Exception as e:
            self._record_failure('error_handling', "Error handling test", e)

    async def test_concurrent_requests(self):
        """Test handling of five concurrent requests; passes only if all succeed."""
        print("Testing concurrent requests...")
        try:
            async with AdvancedA2AClient(self.agent_url) as client:
                tasks = [
                    client.send_message(f"Convert {10 * (i+1)} USD to EUR", stream=True)
                    for i in range(5)
                ]
                responses = await asyncio.gather(*tasks, return_exceptions=True)

            failures = [r for r in responses if isinstance(r, Exception)]
            successful = len(responses) - len(failures)
            # Previously reported "passed" even when every request raised,
            # because gather(return_exceptions=True) never raises itself.
            outcome = {
                'status': 'failed' if failures else 'passed',
                'total_requests': len(tasks),
                'successful_responses': successful,
                'success_rate': successful / len(tasks)
            }
            if failures:
                outcome['error'] = str(failures[0])
                print(f"✗ Concurrent requests test failed: {successful}/{len(tasks)} successful")
            else:
                print(f"✓ Concurrent requests test passed ({successful}/{len(tasks)} successful)")
            self.test_results['concurrent_requests'] = outcome
        except Exception as e:
            self._record_failure('concurrent_requests', "Concurrent requests test", e)

    async def test_skill_functionality(self):
        """Send one representative prompt per skill and record each outcome."""
        print("Testing skill functionality...")
        skill_tests = {
            'currency_conversion': 'Convert 50 USD to GBP',
            'currency_history': 'Show EUR/USD rates for last week',
            'currency_alerts': 'Alert me when EUR/USD goes above 1.15'
        }
        skill_results = {}
        try:
            async with AdvancedA2AClient(self.agent_url) as client:
                for skill_name, test_message in skill_tests.items():
                    try:
                        response = await client.send_message(test_message, stream=True)
                    except Exception as e:
                        skill_results[skill_name] = {
                            'status': 'failed',
                            'error': str(e)
                        }
                        print(f"✗ {skill_name} test failed: {str(e)}")
                    else:
                        skill_results[skill_name] = {
                            'status': 'passed',
                            'has_response': len(response.get('final_content', [])) > 0
                        }
                        print(f"✓ {skill_name} test passed")
            self.test_results['skill_functionality'] = skill_results
        except Exception as e:
            # Connection-level failure: no per-skill results are available.
            self._record_failure('skill_functionality', "Skill functionality tests", e)
# Performance testing
class A2APerformanceTest:
    """Simple latency, throughput and memory benchmarks against a running agent."""

    def __init__(self, agent_url: str):
        self.agent_url = agent_url

    async def run_performance_tests(self) -> Dict[str, Any]:
        """Run all benchmarks and return their results keyed by benchmark name."""
        results = {}
        results['response_time'] = await self.test_response_time()
        results['throughput'] = await self.test_throughput()
        results['memory_usage'] = await self.test_memory_usage()
        return results

    async def test_response_time(self) -> Dict[str, Any]:
        """Measure latency of ten sequential non-streaming requests (seconds)."""
        import time

        response_times = []
        async with AdvancedA2AClient(self.agent_url) as client:
            for _ in range(10):
                # perf_counter is monotonic; wall-clock time can jump.
                started = time.perf_counter()
                await client.send_message("Convert 100 USD to EUR", stream=False)
                response_times.append(time.perf_counter() - started)

        return {
            'average_response_time': sum(response_times) / len(response_times),
            'min_response_time': min(response_times),
            'max_response_time': max(response_times),
            'total_requests': len(response_times)
        }

    async def test_throughput(self) -> Dict[str, Any]:
        """Fire 50 concurrent requests and report successful requests per second.

        Only the request burst is timed (not connection setup), and failed
        requests do not count towards ``requests_per_second`` - otherwise
        fast failures would inflate the figure.
        """
        import time

        request_count = 50
        async with AdvancedA2AClient(self.agent_url) as client:
            tasks = [
                client.send_message(f"Convert {i+1} USD to EUR", stream=False)
                for i in range(request_count)
            ]
            started = time.perf_counter()
            outcomes = await asyncio.gather(*tasks, return_exceptions=True)
            duration = time.perf_counter() - started

        succeeded = sum(1 for outcome in outcomes if not isinstance(outcome, Exception))
        return {
            'requests_per_second': succeeded / duration if duration > 0 else 0.0,
            'total_requests': request_count,
            'successful_requests': succeeded,
            'total_duration': duration
        }

    async def test_memory_usage(self) -> Dict[str, Any]:
        """Report this process's RSS (MB) before and after 20 streaming requests.

        Measures the client/test process, not the agent server.
        """
        import os

        import psutil

        process = psutil.Process(os.getpid())
        bytes_per_mb = 1024 * 1024
        initial_memory = process.memory_info().rss / bytes_per_mb

        async with AdvancedA2AClient(self.agent_url) as client:
            for _ in range(20):
                await client.send_message("Show historical data for all currencies", stream=True)

        final_memory = process.memory_info().rss / bytes_per_mb
        return {
            'initial_memory_mb': initial_memory,
            'final_memory_mb': final_memory,
            'memory_increase_mb': final_memory - initial_memory
        }
# Usage example
async def run_comprehensive_tests(
    agent_url: str = "http://localhost:9999",
    report_path: str = "test_report.json",
) -> Dict[str, Any]:
    """Run the integration and performance suites and write a JSON report.

    Args:
        agent_url: Base URL of the agent under test.
        report_path: File the JSON report is written to (overwritten).

    Returns:
        The report dict that was written: timestamp, agent URL, and the
        results of both suites.
    """
    # Integration tests
    integration_test = A2ATestSuite(agent_url)
    integration_results = await integration_test.run_all_tests()
    # Performance tests
    performance_test = A2APerformanceTest(agent_url)
    performance_results = await performance_test.run_performance_tests()
    # Generate test report
    test_report = {
        'timestamp': datetime.now().isoformat(),
        'agent_url': agent_url,
        'integration_tests': integration_results,
        'performance_tests': performance_results
    }
    # Save report; explicit encoding keeps the file identical across platforms
    with open(report_path, 'w', encoding='utf-8') as f:
        json.dump(test_report, f, indent=2)
    print(f"\nTest Report Generated: {report_path}")
    return test_report


if __name__ == "__main__":
    asyncio.run(run_comprehensive_tests())
9. Production Deployment
Docker Configuration
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies.
# curl is needed by the HEALTHCHECK below (and by the docker-compose
# healthcheck); the slim base image does not include it, so without it the
# container would always be reported unhealthy.
RUN apt-get update && apt-get install -y \
    curl \
    gcc \
    && rm -rf /var/lib/apt/lists/*
# Copy requirements first so the dependency layer is cached across code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user and hand it ownership of the application directory
RUN useradd --create-home --shell /bin/bash app
RUN chown -R app:app /app
USER app
# Expose port
EXPOSE 9999
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:9999/health || exit 1
# Start command
CMD ["python", "-m", "agents.currency_agent.server"]
# docker-compose.yml
version: '3.8'

services:
  # Currency agent: built from the Dockerfile in the project root.
  currency-agent:
    build:
      context: .
    restart: unless-stopped
    ports:
      - "9999:9999"
    environment:
      # ${...} values are taken from the environment running docker compose.
      EXCHANGE_API_KEY: ${EXCHANGE_API_KEY}
      LOG_LEVEL: INFO
      AUTH_SECRET_KEY: ${AUTH_SECRET_KEY}
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9999/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Weather agent: built from Dockerfile.weather.
  weather-agent:
    build:
      context: .
      dockerfile: Dockerfile.weather
    restart: unless-stopped
    ports:
      - "9998:9998"
    environment:
      WEATHER_API_KEY: ${WEATHER_API_KEY}

  # Orchestrator: started after both agents and told where to reach them.
  orchestrator:
    build:
      context: .
      dockerfile: Dockerfile.orchestrator
    restart: unless-stopped
    depends_on:
      - currency-agent
      - weather-agent
    ports:
      - "9997:9997"
    environment:
      REGISTERED_AGENTS: "currency-agent:9999,weather-agent:9998"

  # nginx: publishes ports 80 and 443, configured from ./nginx.conf and ./ssl.
  nginx:
    image: nginx:alpine
    restart: unless-stopped
    depends_on:
      - currency-agent
      - weather-agent
      - orchestrator
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
Production Configuration
# config/production.py
import os
from typing import Dict, Any
class ProductionConfig:
    """Deployment settings read from environment variables.

    All values are resolved once, when the class body executes. Numeric
    settings are converted with ``int()``, so a non-numeric value raises
    ``ValueError`` at that point. Call ``validate()`` before using the
    settings to start a server.
    """

    # Fallback used when AUTH_SECRET_KEY is unset. It is publicly known, so
    # validate() reports it as an error instead of accepting it as a secret.
    _PLACEHOLDER_SECRET = 'your-secret-key-here'

    # Server configuration
    HOST = os.getenv('HOST', '0.0.0.0')
    PORT = int(os.getenv('PORT', '9999'))
    WORKERS = int(os.getenv('WORKERS', '4'))
    # Security
    AUTH_SECRET_KEY = os.getenv('AUTH_SECRET_KEY', _PLACEHOLDER_SECRET)
    # Comma-separated list; whitespace around entries is dropped so that
    # "a.example, b.example" does not yield " b.example".
    ALLOWED_ORIGINS = [
        origin.strip()
        for origin in os.getenv('ALLOWED_ORIGINS', '*').split(',')
    ]
    # API Keys
    EXCHANGE_API_KEY = os.getenv('EXCHANGE_API_KEY')
    # Database
    DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///./data/agent.db')
    # Logging
    # Upper-cased because the level is later looked up by name on the
    # logging module, where level constants are upper case.
    LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper()
    LOG_FILE = os.getenv('LOG_FILE', './logs/agent.log')
    # Rate limiting
    RATE_LIMIT_REQUESTS = int(os.getenv('RATE_LIMIT_REQUESTS', '100'))
    RATE_LIMIT_WINDOW = int(os.getenv('RATE_LIMIT_WINDOW', '60'))
    # Monitoring
    METRICS_ENABLED = os.getenv('METRICS_ENABLED', 'true').lower() == 'true'
    HEALTH_CHECK_INTERVAL = int(os.getenv('HEALTH_CHECK_INTERVAL', '30'))

    @classmethod
    def validate(cls) -> Dict[str, Any]:
        """Check the settings for problems.

        Returns:
            Dict with ``errors`` (list of messages), ``warnings`` (list of
            messages) and ``valid`` (True when there are no errors).
        """
        errors = []
        warnings = []
        # Required environment variables
        required_vars = ['AUTH_SECRET_KEY', 'EXCHANGE_API_KEY']
        for var in required_vars:
            if not getattr(cls, var):
                errors.append(f"Missing required environment variable: {var}")
        # The placeholder is truthy, so the loop above cannot catch it.
        if cls.AUTH_SECRET_KEY == cls._PLACEHOLDER_SECRET:
            errors.append(
                "AUTH_SECRET_KEY is still the built-in placeholder; "
                "set a real secret"
            )
        # Validate numeric values
        if cls.PORT < 1 or cls.PORT > 65535:
            errors.append(f"Invalid port number: {cls.PORT}")
        if cls.WORKERS < 1:
            warnings.append(f"Low worker count: {cls.WORKERS}")
        return {
            'errors': errors,
            'warnings': warnings,
            'valid': len(errors) == 0
        }
# Production server with monitoring
class ProductionA2AServer:
    """Builds and runs the currency agent with production settings.

    Constructing an instance configures root logging and, when enabled,
    registers Prometheus metrics. ``create_app()`` builds the application;
    ``run()`` serves it with uvicorn.
    """

    def __init__(self, config: "ProductionConfig"):
        self.config = config
        self.setup_logging()
        self.setup_monitoring()

    def setup_logging(self):
        """Configure root logging to a rotating file plus the console."""
        import logging
        from logging.handlers import RotatingFileHandler

        # Create logs directory. dirname() is '' for a bare file name and
        # os.makedirs('') raises FileNotFoundError, so skip it in that case.
        log_dir = os.path.dirname(self.config.LOG_FILE)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)
        # Level constants on the logging module are upper case; accept any
        # casing and fall back to INFO for names that are not levels.
        level = getattr(logging, str(self.config.LOG_LEVEL).upper(), None)
        if not isinstance(level, int):
            level = logging.INFO
        # Configure logging
        logging.basicConfig(
            level=level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                RotatingFileHandler(
                    self.config.LOG_FILE,
                    maxBytes=10*1024*1024,  # 10MB
                    backupCount=5
                ),
                logging.StreamHandler()
            ]
        )

    def setup_monitoring(self):
        """Create the Prometheus metrics when metrics are enabled.

        The three metric attributes always exist afterwards; they are
        ``None`` when ``METRICS_ENABLED`` is false.
        """
        self.request_count = None
        self.request_duration = None
        self.active_connections = None
        if self.config.METRICS_ENABLED:
            # Setup Prometheus metrics
            from prometheus_client import Counter, Histogram, Gauge
            self.request_count = Counter('a2a_requests_total', 'Total requests')
            self.request_duration = Histogram('a2a_request_duration_seconds', 'Request duration')
            self.active_connections = Gauge('a2a_active_connections', 'Active connections')

    def create_app(self):
        """Create production-ready application.

        Returns:
            The application produced by ``SecureA2AServer.create_app()``
            with the production middleware added.

        Raises:
            RuntimeError: If ``config.validate()`` reports errors.
        """
        from agents.currency_agent.agent import create_currency_agent_card
        from agents.currency_agent.executor import CurrencyAgentExecutor
        from shared.auth import SecureA2AServer

        # Validate configuration
        validation = self.config.validate()
        if not validation['valid']:
            raise RuntimeError(f"Configuration errors: {validation['errors']}")
        # Create secure server
        secure_server = SecureA2AServer(
            agent_card=create_currency_agent_card(),
            executor=CurrencyAgentExecutor(),
            secret_key=self.config.AUTH_SECRET_KEY
        )
        app = secure_server.create_app()
        # Add production middleware
        self.add_production_middleware(app)
        return app

    def add_production_middleware(self, app):
        """Add compression, host filtering and rate limiting to ``app``."""
        from starlette.middleware.gzip import GZipMiddleware
        from starlette.middleware.trustedhost import TrustedHostMiddleware

        # Add compression
        app.add_middleware(GZipMiddleware, minimum_size=1000)
        # Add trusted host middleware
        # NOTE(review): ALLOWED_ORIGINS is passed as allowed_hosts; confirm
        # its entries are host names rather than origin URLs.
        if self.config.ALLOWED_ORIGINS != ['*']:
            app.add_middleware(TrustedHostMiddleware, allowed_hosts=self.config.ALLOWED_ORIGINS)
        # Add rate limiting
        from shared.middleware import RateLimitMiddleware
        app.add_middleware(
            RateLimitMiddleware,
            requests_per_minute=self.config.RATE_LIMIT_REQUESTS,
            window_seconds=self.config.RATE_LIMIT_WINDOW
        )

    def run(self):
        """Run production server (blocks until the server stops)."""
        import logging
        import uvicorn

        workers = self.config.WORKERS
        if workers > 1:
            # uvicorn only spawns multiple workers when the application is
            # given as an import string; handed an app object together with
            # workers > 1 it logs a warning and exits instead of serving.
            logging.getLogger(__name__).warning(
                "WORKERS=%s ignored: uvicorn needs an import string to start "
                "multiple workers; running a single worker instead",
                workers,
            )
            workers = 1
        uvicorn.run(
            self.create_app(),
            host=self.config.HOST,
            port=self.config.PORT,
            workers=workers,
            access_log=True,
            server_header=False,
            date_header=False
        )
if __name__ == "__main__":
    # Script entry point: build the server from the environment-derived
    # settings and serve until stopped.
    config = ProductionConfig()
    server = ProductionA2AServer(config)
    server.run()
No comments:
Post a Comment