Monday, August 18, 2025

Comprehensive Programming Cheatsheets: MCP & A2A

  🔌 Anthropic's Model Context Protocol (MCP) - Complete Guide


1. Core Architecture & Installation

Installation & Setup

# bash


# Python SDK

pip install mcp


# TypeScript SDK

npm install @modelcontextprotocol/sdk


# Development tools

pip install mcp-server-stdio mcp-server-http


Basic Project Structure


my-mcp-project/

├── server/

│   ├── __init__.py

│   ├── main.py

│   └── sources/

│       ├── database.py

│       ├── filesystem.py

│       └── api.py

├── client/

│   └── test_client.py

└── config/

    └── server_config.json



2. MCP Server Implementation

Basic Server Setup (Python)


# server/main.py

import asyncio

import logging

from mcp.server import Server

from mcp.server.models import InitializationOptions

from mcp.server.stdio import stdio_server

from mcp.types import (

    Resource, Tool, TextContent, ImageContent, EmbeddedResource

)


# Configure logging

logging.basicConfig(level=logging.INFO)

logger = logging.getLogger(__name__)


# Create server instance

server = Server("my-mcp-server")


@server.list_resources()

async def handle_list_resources() -> list[Resource]:

    """List available resources"""

    return [

        Resource(

            uri="file://documents/readme.txt",

            name="README",

            description="Project documentation",

            mimeType="text/plain"

        ),

        Resource(

            uri="db://users",

            name="Users Database",

            description="User management database",

            mimeType="application/json"

        )

    ]


@server.read_resource()

async def handle_read_resource(uri: str) -> str:

    """Read resource content"""

    if uri == "file://documents/readme.txt":

        return "# Project Documentation\nThis is a sample MCP server."

    elif uri == "db://users":

        return '{"users": [{"id": 1, "name": "John"}, {"id": 2, "name": "Jane"}]}'

    else:

        raise ValueError(f"Unknown resource: {uri}")


@server.list_tools()

async def handle_list_tools() -> list[Tool]:

    """List available tools"""

    return [

        Tool(

            name="calculate",

            description="Perform mathematical calculations",

            inputSchema={

                "type": "object",

                "properties": {

                    "expression": {

                        "type": "string",

                        "description": "Mathematical expression to evaluate"

                    }

                },

                "required": ["expression"]

            }

        ),

        Tool(

            name="search_files",

            description="Search for files in the filesystem",

            inputSchema={

                "type": "object",

                "properties": {

                    "pattern": {"type": "string"},

                    "directory": {"type": "string", "default": "."}

                },

                "required": ["pattern"]

            }

        )

    ]


@server.call_tool()

async def handle_call_tool(name: str, arguments: dict) -> list[TextContent]:

    """Execute tool calls"""

    if name == "calculate":

        try:

            result = eval(arguments["expression"])

            return [TextContent(type="text", text=f"Result: {result}")]

        except Exception as e:

            return [TextContent(type="text", text=f"Error: {str(e)}")]

    

    elif name == "search_files":

        import glob

        import os

        

        pattern = arguments["pattern"]

        directory = arguments.get("directory", ".")

        

        search_path = os.path.join(directory, pattern)

        files = glob.glob(search_path, recursive=True)

        

        if files:

            file_list = "\n".join(files)

            return [TextContent(type="text", text=f"Found files:\n{file_list}")]

        else:

            return [TextContent(type="text", text="No files found matching pattern")]

    

    else:

        raise ValueError(f"Unknown tool: {name}")


async def main():

    # Run server with stdio transport

    async with stdio_server() as (read_stream, write_stream):

        await server.run(

            read_stream,

            write_stream,

            InitializationOptions(

                server_name="my-mcp-server",

                server_version="1.0.0",

                capabilities=server.get_capabilities(

                    notification_options=None,

                    experimental_capabilities={}

                )

            )

        )


if __name__ == "__main__":

    asyncio.run(main())



Advanced Server with Database Integration



# server/sources/database.py

import asyncio

import sqlite3

from typing import List, Dict, Any

from mcp.server import Server

from mcp.types import Resource, Tool, TextContent


class DatabaseMCPServer:

    def __init__(self, db_path: str):

        self.db_path = db_path

        self.server = Server("database-mcp-server")

        self._setup_handlers()

        self._init_database()

    

    def _init_database(self):

        """Initialize SQLite database with sample data"""

        conn = sqlite3.connect(self.db_path)

        cursor = conn.cursor()

        

        # Create tables

        cursor.execute('''

            CREATE TABLE IF NOT EXISTS users (

                id INTEGER PRIMARY KEY,

                name TEXT NOT NULL,

                email TEXT UNIQUE,

                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP

            )

        ''')

        

        cursor.execute('''

            CREATE TABLE IF NOT EXISTS products (

                id INTEGER PRIMARY KEY,

                name TEXT NOT NULL,

                price REAL,

                category TEXT,

                stock INTEGER DEFAULT 0

            )

        ''')

        

        # Insert sample data

        cursor.executemany('''

            INSERT OR IGNORE INTO users (name, email) VALUES (?, ?)

        ''', [

            ("Alice Johnson", "alice@example.com"),

            ("Bob Smith", "bob@example.com"),

            ("Carol Davis", "carol@example.com")

        ])

        

        cursor.executemany('''

            INSERT OR IGNORE INTO products (name, price, category, stock) VALUES (?, ?, ?, ?)

        ''', [

            ("Laptop", 999.99, "Electronics", 10),

            ("Mouse", 29.99, "Electronics", 50),

            ("Desk Chair", 199.99, "Furniture", 5)

        ])

        

        conn.commit()

        conn.close()

    

    def _setup_handlers(self):

        @self.server.list_resources()

        async def list_resources() -> List[Resource]:

            return [

                Resource(

                    uri="sqlite://users",

                    name="Users Table",

                    description="User management data",

                    mimeType="application/json"

                ),

                Resource(

                    uri="sqlite://products", 

                    name="Products Table",

                    description="Product catalog data",

                    mimeType="application/json"

                )

            ]

        

        @self.server.read_resource()

        async def read_resource(uri: str) -> str:

            conn = sqlite3.connect(self.db_path)

            conn.row_factory = sqlite3.Row

            cursor = conn.cursor()

            

            try:

                if uri == "sqlite://users":

                    cursor.execute("SELECT * FROM users")

                    rows = [dict(row) for row in cursor.fetchall()]

                    return json.dumps({"users": rows}, indent=2)

                

                elif uri == "sqlite://products":

                    cursor.execute("SELECT * FROM products")

                    rows = [dict(row) for row in cursor.fetchall()]

                    return json.dumps({"products": rows}, indent=2)

                

                else:

                    raise ValueError(f"Unknown resource: {uri}")

            finally:

                conn.close()

        

        @self.server.list_tools()

        async def list_tools() -> List[Tool]:

            return [

                Tool(

                    name="query_database",

                    description="Execute SQL queries on the database",

                    inputSchema={

                        "type": "object",

                        "properties": {

                            "query": {

                                "type": "string",

                                "description": "SQL query to execute"

                            },

                            "params": {

                                "type": "array",

                                "description": "Query parameters",

                                "items": {"type": "string"}

                            }

                        },

                        "required": ["query"]

                    }

                ),

                Tool(

                    name="add_user",

                    description="Add a new user to the database",

                    inputSchema={

                        "type": "object",

                        "properties": {

                            "name": {"type": "string"},

                            "email": {"type": "string"}

                        },

                        "required": ["name", "email"]

                    }

                )

            ]

        

        @self.server.call_tool()

        async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:

            conn = sqlite3.connect(self.db_path)

            conn.row_factory = sqlite3.Row

            cursor = conn.cursor()

            

            try:

                if name == "query_database":

                    query = arguments["query"]

                    params = arguments.get("params", [])

                    

                    cursor.execute(query, params)

                    

                    if query.strip().upper().startswith("SELECT"):

                        rows = [dict(row) for row in cursor.fetchall()]

                        result = json.dumps(rows, indent=2)

                    else:

                        conn.commit()

                        result = f"Query executed successfully. Rows affected: {cursor.rowcount}"

                    

                    return [TextContent(type="text", text=result)]

                

                elif name == "add_user":

                    name_val = arguments["name"]

                    email = arguments["email"]

                    

                    cursor.execute(

                        "INSERT INTO users (name, email) VALUES (?, ?)",

                        (name_val, email)

                    )

                    conn.commit()

                    

                    return [TextContent(

                        type="text", 

                        text=f"User '{name_val}' added successfully with ID {cursor.lastrowid}"

                    )]

                

                else:

                    raise ValueError(f"Unknown tool: {name}")

            

            except Exception as e:

                return [TextContent(type="text", text=f"Error: {str(e)}")]

            finally:

                conn.close()




HTTP Server Implementation



# server/http_server.py

from fastapi import FastAPI, HTTPException

from fastapi.middleware.cors import CORSMiddleware

from mcp.server.fastapi import MCPServer

from mcp.types import *

import json


app = FastAPI(title="MCP HTTP Server", version="1.0.0")


# Add CORS middleware

app.add_middleware(

    CORSMiddleware,

    allow_origins=["*"],

    allow_credentials=True,

    allow_methods=["*"],

    allow_headers=["*"],

)


# Create MCP server instance

mcp_server = MCPServer("http-mcp-server")


# File system resource handler

@mcp_server.list_resources()

async def list_resources() -> List[Resource]:

    import os

    resources = []

    

    for root, dirs, files in os.walk("./data"):

        for file in files:

            file_path = os.path.join(root, file)

            resources.append(Resource(

                uri=f"file://{file_path}",

                name=file,

                description=f"File: {file_path}",

                mimeType="text/plain" if file.endswith('.txt') else "application/octet-stream"

            ))

    

    return resources


@mcp_server.read_resource()

async def read_resource(uri: str) -> str:

    if uri.startswith("file://"):

        file_path = uri[7:]  # Remove "file://" prefix

        try:

            with open(file_path, 'r', encoding='utf-8') as f:

                return f.read()

        except FileNotFoundError:

            raise HTTPException(status_code=404, detail="File not found")

        except Exception as e:

            raise HTTPException(status_code=500, detail=str(e))

    else:

        raise HTTPException(status_code=400, detail="Invalid URI scheme")


# Web scraping tool

@mcp_server.list_tools()

async def list_tools() -> List[Tool]:

    return [

        Tool(

            name="web_scrape",

            description="Scrape content from a web page",

            inputSchema={

                "type": "object",

                "properties": {

                    "url": {

                        "type": "string",

                        "description": "URL to scrape"

                    },

                    "selector": {

                        "type": "string", 

                        "description": "CSS selector for content extraction",

                        "default": "body"

                    }

                },

                "required": ["url"]

            }

        ),

        Tool(

            name="json_query",

            description="Query JSON data using JSONPath",

            inputSchema={

                "type": "object",

                "properties": {

                    "data": {"type": "string", "description": "JSON data"},

                    "query": {"type": "string", "description": "JSONPath query"}

                },

                "required": ["data", "query"]

            }

        )

    ]


@mcp_server.call_tool()

async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:

    if name == "web_scrape":

        import requests

        from bs4 import BeautifulSoup

        

        url = arguments["url"]

        selector = arguments.get("selector", "body")

        

        try:

            response = requests.get(url, timeout=10)

            response.raise_for_status()

            

            soup = BeautifulSoup(response.content, 'html.parser')

            elements = soup.select(selector)

            

            content = "\n".join([elem.get_text(strip=True) for elem in elements])

            

            return [TextContent(

                type="text",

                text=f"Scraped content from {url}:\n\n{content[:2000]}..."

            )]

        

        except Exception as e:

            return [TextContent(type="text", text=f"Error scraping {url}: {str(e)}")]

    

    elif name == "json_query":

        import jsonpath_ng

        

        try:

            data = json.loads(arguments["data"])

            query = arguments["query"]

            

            jsonpath_expr = jsonpath_ng.parse(query)

            matches = [match.value for match in jsonpath_expr.find(data)]

            

            result = json.dumps(matches, indent=2)

            return [TextContent(type="text", text=f"Query results:\n{result}")]

        

        except Exception as e:

            return [TextContent(type="text", text=f"Error: {str(e)}")]

    

    else:

        raise HTTPException(status_code=400, detail=f"Unknown tool: {name}")


# Mount MCP server to FastAPI

app.mount("/mcp", mcp_server.create_app())


# Health check endpoint

@app.get("/health")

async def health_check():

    return {"status": "healthy", "server": "MCP HTTP Server"}


if __name__ == "__main__":

    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)

```


3. MCP Client Implementation


Basic Client Usage



# client/basic_client.py

import asyncio

from mcp import ClientSession, StdioServerParameters

from mcp.client.stdio import stdio_client


async def main():

    # Connect to MCP server via stdio

    server_params = StdioServerParameters(

        command="python",

        args=["server/main.py"]

    )

    

    async with stdio_client(server_params) as (read, write):

        async with ClientSession(read, write) as session:

            # Initialize the connection

            await session.initialize()

            

            # List available resources

            resources = await session.list_resources()

            print("Available resources:")

            for resource in resources.resources:

                print(f"  - {resource.name}: {resource.uri}")

            

            # Read a specific resource

            if resources.resources:

                content = await session.read_resource(resources.resources[0].uri)

                print(f"\nResource content:\n{content.contents[0].text}")

            

            # List available tools

            tools = await session.list_tools()

            print(f"\nAvailable tools:")

            for tool in tools.tools:

                print(f"  - {tool.name}: {tool.description}")

            

            # Call a tool

            if tools.tools:

                result = await session.call_tool(

                    tools.tools[0].name,

                    {"expression": "2 + 2 * 3"}

                )

                print(f"\nTool result: {result.content[0].text}")


if __name__ == "__main__":

    asyncio.run(main())



HTTP Client Implementation



# client/http_client.py

import asyncio

import aiohttp

from mcp.client.session import ClientSession

from mcp.client.sse import sse_client


class MCPHTTPClient:

    def __init__(self, base_url: str):

        self.base_url = base_url.rstrip('/')

        self.session = None

    

    async def __aenter__(self):

        self.http_session = aiohttp.ClientSession()

        

        # Connect via Server-Sent Events

        async with sse_client(f"{self.base_url}/mcp/sse") as (read, write):

            self.session = ClientSession(read, write)

            await self.session.initialize()

            return self

    

    async def __aexit__(self, exc_type, exc_val, exc_tb):

        if self.http_session:

            await self.http_session.close()

    

    async def list_resources(self):

        return await self.session.list_resources()

    

    async def read_resource(self, uri: str):

        return await self.session.read_resource(uri)

    

    async def list_tools(self):

        return await self.session.list_tools()

    

    async def call_tool(self, name: str, arguments: dict):

        return await self.session.call_tool(name, arguments)


# Usage example

async def test_http_client():

    async with MCPHTTPClient("http://localhost:8000") as client:

        # List and read resources

        resources = await client.list_resources()

        for resource in resources.resources:

            print(f"Resource: {resource.name}")

            content = await client.read_resource(resource.uri)

            print(f"Content preview: {content.contents[0].text[:100]}...")

        

        # Use tools

        tools = await client.list_tools()

        for tool in tools.tools:

            print(f"Tool: {tool.name} - {tool.description}")

        

        # Web scraping example

        result = await client.call_tool("web_scrape", {

            "url": "https://httpbin.org/json",

            "selector": "body"

        })

        print(f"Scrape result: {result.content[0].text}")


if __name__ == "__main__":

    asyncio.run(test_http_client())



4. Advanced MCP Features


Prompts and Templates



# server/prompts.py

from mcp.server import Server

from mcp.types import Prompt, PromptArgument, PromptMessage, TextContent


server = Server("prompt-server")


@server.list_prompts()

async def list_prompts() -> List[Prompt]:

    return [

        Prompt(

            name="code_review",

            description="Generate a code review for the given code",

            arguments=[

                PromptArgument(

                    name="code",

                    description="The code to review",

                    required=True

                ),

                PromptArgument(

                    name="language",

                    description="Programming language",

                    required=False

                )

            ]

        ),

        Prompt(

            name="summarize_data",

            description="Summarize structured data",

            arguments=[

                PromptArgument(

                    name="data",

                    description="JSON data to summarize",

                    required=True

                ),

                PromptArgument(

                    name="focus",

                    description="What to focus on in the summary",

                    required=False

                )

            ]

        )

    ]


@server.get_prompt()

async def get_prompt(name: str, arguments: Dict[str, str]) -> PromptMessage:

    if name == "code_review":

        code = arguments["code"]

        language = arguments.get("language", "unknown")

        

        prompt_text = f"""Please review the following {language} code:


```{language}

{code}

```


Focus on:

1. Code quality and best practices

2. Potential bugs or issues

3. Performance considerations

4. Readability and maintainability

5. Security concerns


Provide specific suggestions for improvement."""


        return PromptMessage(

            role="user",

            content=TextContent(type="text", text=prompt_text)

        )

    

    elif name == "summarize_data":

        data = arguments["data"]

        focus = arguments.get("focus", "key insights")

        

        prompt_text = f"""Analyze and summarize the following data, focusing on {focus}:


{data}


Please provide:

1. Key statistics and trends

2. Notable patterns or anomalies

3. Main insights and takeaways

4. Recommendations based on the data"""


        return PromptMessage(

            role="user",

            content=TextContent(type="text", text=prompt_text)

        )

    

    else:

        raise ValueError(f"Unknown prompt: {name}")



Notifications and Subscriptions



# server/notifications.py

import asyncio

from typing import Dict, Set

from mcp.server import Server, NotificationOptions

from mcp.types import *


class NotificationMCPServer:

    def __init__(self):

        self.server = Server("notification-server")

        self.subscribers: Dict[str, Set[str]] = {}

        self._setup_handlers()

    

    def _setup_handlers(self):

        @self.server.list_resources()

        async def list_resources() -> List[Resource]:

            return [

                Resource(

                    uri="notifications://system",

                    name="System Notifications",

                    description="Real-time system notifications"

                )

            ]

        

        @self.server.subscribe()

        async def handle_subscribe(uri: str) -> None:

            if uri not in self.subscribers:

                self.subscribers[uri] = set()

            # In a real implementation, you'd track the client session

            print(f"Client subscribed to {uri}")

        

        @self.server.unsubscribe()

        async def handle_unsubscribe(uri: str) -> None:

            if uri in self.subscribers:

                # Remove client from subscribers

                print(f"Client unsubscribed from {uri}")

    

    async def send_notification(self, uri: str, content: str):

        """Send notification to all subscribers of a URI"""

        if uri in self.subscribers and self.subscribers[uri]:

            notification = ResourceUpdatedNotification(

                uri=uri,

                content=TextContent(type="text", text=content)

            )

            # In a real implementation, send to all subscribed clients

            print(f"Notification sent for {uri}: {content}")


# Usage in a monitoring system

async def monitoring_example():

    server = NotificationMCPServer()

    

    # Simulate system events

    while True:

        await asyncio.sleep(5)

        await server.send_notification(

            "notifications://system",

            f"System check at {asyncio.get_event_loop().time()}: All systems operational"

        )



Error Handling and Logging



# server/error_handling.py

import logging

from typing import Any, Dict, List

from mcp.server import Server

from mcp.types import *


# Configure logging

logging.basicConfig(

    level=logging.INFO,

    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'

)

logger = logging.getLogger(__name__)


class RobustMCPServer:

    def __init__(self):

        self.server = Server("robust-server")

        self._setup_handlers()

    

    def _setup_handlers(self):

        @self.server.call_tool()

        async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:

            logger.info(f"Tool called: {name} with args: {arguments}")

            

            try:

                if name == "risky_operation":

                    # Validate input

                    if "data" not in arguments:

                        raise ValueError("Missing required parameter: data")

                    

                    data = arguments["data"]

                    if not isinstance(data, str) or len(data) == 0:

                        raise ValueError("Data must be a non-empty string")

                    

                    # Simulate risky operation

                    if "error" in data.lower():

                        raise RuntimeError("Simulated operation failure")

                    

                    result = f"Processed: {data.upper()}"

                    logger.info(f"Tool {name} completed successfully")

                    

                    return [TextContent(type="text", text=result)]

                

                else:

                    raise ValueError(f"Unknown tool: {name}")

            

            except ValueError as e:

                logger.warning(f"Validation error in tool {name}: {str(e)}")

                return [TextContent(

                    type="text", 

                    text=f"Validation Error: {str(e)}"

                )]

            

            except RuntimeError as e:

                logger.error(f"Runtime error in tool {name}: {str(e)}")

                return [TextContent(

                    type="text", 

                    text=f"Operation Failed: {str(e)}"

                )]

            

            except Exception as e:

                logger.exception(f"Unexpected error in tool {name}")

                return [TextContent(

                    type="text", 

                    text=f"Internal Error: An unexpected error occurred"

                )]

        

        @self.server.read_resource()

        async def read_resource(uri: str) -> str:

            logger.info(f"Resource requested: {uri}")

            

            try:

                # Validate URI format

                if not uri.startswith(("file://", "http://", "https://")):

                    raise ValueError(f"Unsupported URI scheme: {uri}")

                

                if uri.startswith("file://"):

                    file_path = uri[7:]

                    

                    # Security check - prevent directory traversal

                    if ".." in file_path or file_path.startswith("/"):

                        raise PermissionError("Access denied: Invalid file path")

                    

                    with open(file_path, 'r') as f:

                        content = f.read()

                    

                    logger.info(f"File read successfully: {file_path}")

                    return content

                

                else:

                    raise NotImplementedError(f"URI scheme not implemented: {uri}")

            

            except FileNotFoundError:

                logger.warning(f"File not found: {uri}")

                raise

            except PermissionError as e:

                logger.warning(f"Permission denied: {uri} - {str(e)}")

                raise

            except Exception as e:

                logger.exception(f"Error reading resource: {uri}")

                raise RuntimeError(f"Failed to read resource: {str(e)}")





🤖 Google's Agent2Agent (A2A) Protocol - Complete Guide


1. Core Architecture & Installation


Installation & Setup


#bash

# Clone the official repository

git clone https://github.com/google/A2A.git

cd A2A


# Install Python SDK

pip install --upgrade a2a-sdk


# Alternative: Install from source

git clone https://github.com/google/a2a-python.git

cd a2a-python

pip install -e .



Project Structure



my-a2a-project/

├── agents/

│   ├── currency_agent/

│   │   ├── __init__.py

│   │   ├── agent.py

│   │   ├── executor.py

│   │   └── tools.py

│   ├── weather_agent/

│   └── orchestrator/

├── shared/

│   ├── types.py

│   └── utils.py

├── tests/

└── config/

    └── agent_configs.json



2. Agent Card & Skills Definition


Comprehensive Agent Card



# agents/currency_agent/agent.py

from a2a.types import (

    AgentCard, AgentSkill, AgentCapabilities, AgentAuthentication,

    AgentContact, AgentLicense, InputMode, OutputMode

)


def create_currency_agent_card() -> AgentCard:

    # Define multiple skills

    skills = [

        AgentSkill(

            id='currency_conversion',

            name='Currency Conversion',

            description='Convert amounts between different currencies using real-time exchange rates',

            tags=['finance', 'currency', 'conversion', 'exchange-rates'],

            examples=[

                'Convert 100 USD to EUR',

                'How much is 50 GBP in Japanese Yen?',

                'What is the current USD to CAD exchange rate?'

            ],

            inputModes=[InputMode.TEXT],

            outputModes=[OutputMode.TEXT, OutputMode.JSON]

        ),

        AgentSkill(

            id='currency_history',

            name='Currency Rate History',

            description='Retrieve historical exchange rate data for currency pairs',

            tags=['finance', 'currency', 'history', 'analytics'],

            examples=[

                'Show EUR/USD rates for the last 30 days',

                'Historical GBP/JPY exchange rates for 2023',

                'Currency trend analysis for Bitcoin'

            ],

            inputModes=[InputMode.TEXT],

            outputModes=[OutputMode.TEXT, OutputMode.JSON, OutputMode.IMAGE]

        ),

        AgentSkill(

            id='currency_alerts',

            name='Currency Rate Alerts',

            description='Set up alerts for currency rate changes',

            tags=['finance', 'alerts', 'monitoring'],

            examples=[

                'Alert me when EUR/USD goes above 1.10',

                'Notify when Bitcoin drops below $30,000'

            ],

            inputModes=[InputMode.TEXT],

            outputModes=[OutputMode.TEXT]

        )

    ]

    

    return AgentCard(

        name='Advanced Currency Agent',

        description='Comprehensive currency conversion and analysis agent with real-time data',

        url='http://localhost:9999/',

        version='2.1.0',

        

        # Capabilities

        capabilities=AgentCapabilities(

            streaming=True,

            pushNotifications=True,

            multiTurn=True,

            contextRetention=True

        ),

        

        # I/O Modes

        defaultInputModes=[InputMode.TEXT],

        defaultOutputModes=[OutputMode.TEXT, OutputMode.JSON],

        supportedInputModes=[InputMode.TEXT, InputMode.JSON],

        supportedOutputModes=[InputMode.TEXT, OutputMode.JSON, OutputMode.IMAGE],

        

        # Skills

        skills=skills,

        

        # Authentication

        authentication=AgentAuthentication(

            schemes=['bearer', 'api-key'],

            required=False

        ),

        

        # Metadata

        contact=AgentContact(

            name='Currency Team',

            email='currency-team@example.com',

            url='https://example.com/currency-agent'

        ),

        

        license=AgentLicense(

            name='MIT',

            url='https://opensource.org/licenses/MIT'

        ),

        

        # Additional metadata

        tags=['finance', 'currency', 'real-time', 'analytics'],

        categories=['finance', 'data-analysis'],

        

        # Rate limiting

        rateLimits={

            'requests_per_minute': 100,

            'requests_per_hour': 1000

        }

    )



Multi-Agent System Card



# agents/orchestrator/multi_agent_card.py

from typing import List

from a2a.types import *


class MultiAgentOrchestrator:

    def __init__(self):

        self.registered_agents: List[AgentCard] = []

    

    def create_orchestrator_card(self) -> AgentCard:

        skills = [

            AgentSkill(

                id='agent_discovery',

                name='Agent Discovery',

                description='Discover and list available agents in the system',

                tags=['orchestration', 'discovery', 'system'],

                examples=['List all available agents', 'Find agents with finance capabilities']

            ),

            AgentSkill(

                id='task_routing',

                name='Task Routing',

                description='Route tasks to appropriate specialized agents',

                tags=['orchestration', 'routing', 'delegation'],

                examples=['Route this currency question to the finance agent']

            ),

            AgentSkill(

                id='multi_agent_workflow',

                name='Multi-Agent Workflow',

                description='Coordinate complex workflows across multiple agents',

                tags=['orchestration', 'workflow', 'coordination'],

                examples=['Plan a trip using weather, booking, and currency agents']

            )

        ]

        

        return AgentCard(

            name='Multi-Agent Orchestrator',

            description='Coordinates and routes tasks between specialized agents',

            url='http://localhost:9998/',

            version='1.0.0',

            capabilities=AgentCapabilities(

                streaming=True,

                multiTurn=True,

                contextRetention=True,

                pushNotifications=True

            ),

            skills=skills,

            defaultInputModes=[InputMode.TEXT],

            defaultOutputModes=[OutputMode.TEXT, OutputMode.JSON],

            authentication=AgentAuthentication(schemes=['public'])

        )

    

    def register_agent(self, agent_card: AgentCard):

        """Register an agent with the orchestrator"""

        self.registered_agents.append(agent_card)

    

    def find_agents_by_capability(self, capability: str) -> List[AgentCard]:

        """Find agents that have specific capabilities"""

        matching_agents = []

        for agent in self.registered_agents:

            for skill in agent.skills:

                if capability.lower() in [tag.lower() for tag in skill.tags]:

                    matching_agents.append(agent)

                    break

        return matching_agents



3. Advanced Agent Executor Implementation


Comprehensive Agent Executor



# agents/currency_agent/executor.py

import asyncio

import json

import logging

from typing import Dict, Any, Optional, List

from datetime import datetime, timedelta


from a2a.server.agent_execution import AgentExecutor, RequestContext

from a2a.server.events import EventQueue

from a2a.utils import new_agent_text_message, new_agent_json_message

from a2a.types import Message, MessagePart, TextPart, JSONPart


logger = logging.getLogger(__name__)


class CurrencyAgentExecutor(AgentExecutor):

    def __init__(self):

        self.exchange_api_key = "your-api-key-here"

        self.base_url = "https://api.exchangerate-api.com/v4/latest"

        self.active_tasks: Dict[str, bool] = {}

        self.user_sessions: Dict[str, Dict[str, Any]] = {}

    

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:

        """Main execution method for handling requests"""

        task_id = context.task_id

        self.active_tasks[task_id] = True

        

        try:

            # Extract user message

            message = context.request.params.message

            user_input = self._extract_text_from_message(message)

            user_id = message.messageId

            

            # Initialize user session if needed

            if user_id not in self.user_sessions:

                self.user_sessions[user_id] = {

                    'conversation_history': [],

                    'preferences': {},

                    'alerts': []

                }

            

            # Add to conversation history

            self.user_sessions[user_id]['conversation_history'].append({

                'timestamp': datetime.now().isoformat(),

                'user_input': user_input,

                'type': 'user'

            })

            

            # Send initial acknowledgment (streaming)

            await self._send_status_update(

                event_queue, 

                "Processing your currency request...", 

                "working"

            )

            

            # Determine intent and route to appropriate handler

            intent = await self._classify_intent(user_input)

            

            if intent == 'currency_conversion':

                await self._handle_currency_conversion(user_input, event_queue, context)

            elif intent == 'currency_history':

                await self._handle_currency_history(user_input, event_queue, context)

            elif intent == 'currency_alerts':

                await self._handle_currency_alerts(user_input, event_queue, context, user_id)

            elif intent == 'help':

                await self._handle_help_request(event_queue)

            else:

                await self._handle_unknown_request(user_input, event_queue)

            

        except Exception as e:

            logger.exception(f"Error in task {task_id}")

            await self._send_error_message(event_queue, str(e))

        finally:

            self.active_tasks.pop(task_id, None)

    

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:

        """Handle task cancellation"""

        task_id = context.task_id

        if task_id in self.active_tasks:

            self.active_tasks[task_id] = False

            await self._send_status_update(

                event_queue, 

                "Task cancelled by user", 

                "cancelled"

            )

    

    def _extract_text_from_message(self, message: Message) -> str:

        """Extract text content from message parts"""

        text_parts = []

        for part in message.parts:

            if isinstance(part, TextPart):

                text_parts.append(part.text)

        return " ".join(text_parts)

    

    async def _classify_intent(self, user_input: str) -> str:

        """Classify user intent using simple keyword matching"""

        user_input_lower = user_input.lower()

        

        conversion_keywords = ['convert', 'exchange', 'how much', 'rate', 'usd', 'eur', 'gbp']

        history_keywords = ['history', 'historical', 'trend', 'chart', 'past', 'last month']

        alert_keywords = ['alert', 'notify', 'notification', 'watch', 'monitor']

        help_keywords = ['help', 'what can you do', 'capabilities', 'commands']

        

        if any(keyword in user_input_lower for keyword in conversion_keywords):

            return 'currency_conversion'

        elif any(keyword in user_input_lower for keyword in history_keywords):

            return 'currency_history'

        elif any(keyword in user_input_lower for keyword in alert_keywords):

            return 'currency_alerts'

        elif any(keyword in user_input_lower for keyword in help_keywords):

            return 'help'

        else:

            return 'unknown'

    

    async def _handle_currency_conversion(self, user_input: str, event_queue: EventQueue, context: RequestContext):

        """Handle currency conversion requests"""

        # Parse conversion request

        conversion_data = await self._parse_conversion_request(user_input)

        

        if not conversion_data:

            await self._send_text_message(

                event_queue,

                "I couldn't understand the conversion request. Please specify amount, from currency, and to currency. Example: 'Convert 100 USD to EUR'"

            )

            return

        

        amount = conversion_data['amount']

        from_currency = conversion_data['from_currency']

        to_currency = conversion_data['to_currency']

        

        # Send status update

        await self._send_status_update(

            event_queue,

            f"Converting {amount} {from_currency} to {to_currency}...",

            "working"

        )

        

        # Fetch exchange rate

        try:

            rate_data = await self._fetch_exchange_rate(from_currency, to_currency)

            converted_amount = amount * rate_data['rate']

            

            # Prepare response

            response_text = f"{amount} {from_currency} = {converted_amount:.2f} {to_currency}\n"

            response_text += f"Exchange rate: 1 {from_currency} = {rate_data['rate']:.4f} {to_currency}\n"

            response_text += f"Last updated: {rate_data['timestamp']}"

            

            # Send both text and JSON responses

            await self._send_text_message(event_queue, response_text)

            

            json_response = {

                "conversion": {

                    "amount": amount,

                    "from_currency": from_currency,

                    "to_currency": to_currency,

                    "converted_amount": round(converted_amount, 2),

                    "exchange_rate": rate_data['rate'],

                    "timestamp": rate_data['timestamp']

                }

            }

            await self._send_json_message(event_queue, json_response)

            

        except Exception as e:

            await self._send_error_message(event_queue, f"Failed to fetch exchange rate: {str(e)}")

    

    async def _handle_currency_history(self, user_input: str, event_queue: EventQueue, context: RequestContext):

        """Handle historical currency data requests"""

        await self._send_status_update(

            event_queue,

            "Fetching historical currency data...",

            "working"

        )

        

        # Simulate historical data fetching

        await asyncio.sleep(2)  # Simulate API call delay

        

        # Mock historical data

        historical_data = {

            "currency_pair": "EUR/USD",

            "period": "30 days",

            "data": [

                {"date": "2024-01-01", "rate": 1.1045},

                {"date": "2024-01-02", "rate": 1.1052},

                {"date": "2024-01-03", "rate": 1.1038},

                # ... more data points

            ],

            "summary": {

                "average_rate": 1.1042,

                "min_rate": 1.0998,

                "max_rate": 1.1087,

                "volatility": 0.0089

            }

        }

        

        response_text = f"Historical data for {historical_data['currency_pair']} over {historical_data['period']}:\n"

        response_text += f"Average rate: {historical_data['summary']['average_rate']:.4f}\n"

        response_text += f"Range: {historical_data['summary']['min_rate']:.4f} - {historical_data['summary']['max_rate']:.4f}\n"

        response_text += f"Volatility: {historical_data['summary']['volatility']:.4f}"

        

        await self._send_text_message(event_queue, response_text)

        await self._send_json_message(event_queue, historical_data)

    

    async def _handle_currency_alerts(self, user_input: str, event_queue: EventQueue, context: RequestContext, user_id: str):

        """Handle currency alert setup"""

        await self._send_status_update(

            event_queue,

            "Setting up currency alert...",

            "working"

        )

        

        # Parse alert request (simplified)

        alert_config = {

            "currency_pair": "EUR/USD",

            "condition": "above",

            "threshold": 1.10,

            "created_at": datetime.now().isoformat()

        }

        

        # Store alert in user session

        self.user_sessions[user_id]['alerts'].append(alert_config)

        

        response_text = f"Alert set up successfully!\n"

        response_text += f"You'll be notified when {alert_config['currency_pair']} goes {alert_config['condition']} {alert_config['threshold']}"

        

        await self._send_text_message(event_queue, response_text)

    

    async def _handle_help_request(self, event_queue: EventQueue):

        """Handle help requests"""

        help_text = """Currency Agent Capabilities:


1. **Currency Conversion**

   - Convert 100 USD to EUR

   - How much is 50 GBP in JPY?

   - Current USD to CAD rate


2. **Historical Data**

   - Show EUR/USD rates for last 30 days

   - Historical GBP/JPY for 2023

   - Currency trend analysis


3. **Rate Alerts**

   - Alert me when EUR/USD goes above 1.10

   - Notify when Bitcoin drops below $30,000


4. **Supported Currencies**

   - Major currencies: USD, EUR, GBP, JPY, CAD, AUD, CHF

   - Cryptocurrencies: BTC, ETH, ADA, DOT

   - 150+ total currencies supported


Just ask me in natural language!"""

        

        await self._send_text_message(event_queue, help_text)

    

    async def _handle_unknown_request(self, user_input: str, event_queue: EventQueue):

        """Handle unknown or unclear requests"""

        response = f"I'm not sure how to help with: '{user_input}'\n\n"

        response += "I can help with currency conversions, historical rates, and rate alerts. "

        response += "Try asking something like 'Convert 100 USD to EUR' or type 'help' for more options."

        

        await self._send_text_message(event_queue, response)

    

    async def _parse_conversion_request(self, user_input: str) -> Optional[Dict[str, Any]]:

        """Parse currency conversion request from natural language"""

        import re

        

        # Simple regex patterns for parsing

        patterns = [

            r'convert\s+(\d+(?:\.\d+)?)\s+(\w{3})\s+to\s+(\w{3})',

            r'(\d+(?:\.\d+)?)\s+(\w{3})\s+to\s+(\w{3})',

            r'how\s+much\s+is\s+(\d+(?:\.\d+)?)\s+(\w{3})\s+in\s+(\w{3})'

        ]

        

        user_input_lower = user_input.lower()

        

        for pattern in patterns:

            match = re.search(pattern, user_input_lower)

            if match:

                return {

                    'amount': float(match.group(1)),

                    'from_currency': match.group(2).upper(),

                    'to_currency': match.group(3).upper()

                }

        

        return None

    

    async def _fetch_exchange_rate(self, from_currency: str, to_currency: str) -> Dict[str, Any]:

        """Fetch real-time exchange rate"""

        # Mock implementation - replace with actual API call

        import random

        

        # Simulate API delay

        await asyncio.sleep(1)

        

        # Mock exchange rates

        mock_rates = {

            ('USD', 'EUR'): 0.85,

            ('EUR', 'USD'): 1.18,

            ('USD', 'GBP'): 0.73,

            ('GBP', 'USD'): 1.37,

            ('USD', 'JPY'): 110.0,

            ('JPY', 'USD'): 0.009

        }

        

        rate = mock_rates.get((from_currency, to_currency))

        if not rate:

            # Calculate inverse if available

            inverse_rate = mock_rates.get((to_currency, from_currency))

            if inverse_rate:

                rate = 1 / inverse_rate

            else:

                rate = random.uniform(0.5, 2.0)  # Fallback random rate

        

        return {

            'rate': rate,

            'timestamp': datetime.now().isoformat()

        }

    

    async def _send_text_message(self, event_queue: EventQueue, text: str):

        """Send a text message to the client"""

        event_queue.enqueue_event(new_agent_text_message(text))

    

    async def _send_json_message(self, event_queue: EventQueue, data: Dict[str, Any]):

        """Send a JSON message to the client"""

        event_queue.enqueue_event(new_agent_json_message(data))

    

    async def _send_status_update(self, event_queue: EventQueue, message: str, status: str):

        """Send a status update for streaming responses"""

        from a2a.types import TaskStatusUpdateEvent

        

        status_event = TaskStatusUpdateEvent(

            status=status,

            message=message,

            timestamp=datetime.now().isoformat()

        )

        event_queue.enqueue_event(status_event)

    

    async def _send_error_message(self, event_queue: EventQueue, error: str):

        """Send an error message"""

        error_text = f"Error: {error}"

        await self._send_text_message(event_queue, error_text)

        await self._send_status_update(event_queue, error_text, "error")



4. Server Implementation & Configuration


Complete A2A Server Setup



# agents/currency_agent/server.py

import asyncio

import logging

import uvicorn

from typing import Dict, Any


from a2a.server.apps import A2AStarletteApplication

from a2a.server.request_handlers import DefaultRequestHandler

from a2a.server.tasks import InMemoryTaskStore

from a2a.server.middleware import LoggingMiddleware, CORSMiddleware


from .agent import create_currency_agent_card

from .executor import CurrencyAgentExecutor


# Configure logging

logging.basicConfig(

    level=logging.INFO,

    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'

)

logger = logging.getLogger(__name__)


class CurrencyAgentServer:

    def __init__(self, host: str = "0.0.0.0", port: int = 9999):

        self.host = host

        self.port = port

        self.agent_card = create_currency_agent_card()

        self.task_store = InMemoryTaskStore()

        self.executor = CurrencyAgentExecutor()

        

        # Create request handler

        self.request_handler = DefaultRequestHandler(

            agent_executor=self.executor,

            task_store=self.task_store

        )

        

        # Create A2A application

        self.app = A2AStarletteApplication(

            agent_card=self.agent_card,

            http_handler=self.request_handler

        )

        

        # Add middleware

        self._setup_middleware()

    

    def _setup_middleware(self):

        """Setup middleware for the server"""

        starlette_app = self.app.build()

        

        # Add CORS middleware

        starlette_app.add_middleware(

            CORSMiddleware,

            allow_origins=["*"],

            allow_credentials=True,

            allow_methods=["*"],

            allow_headers=["*"]

        )

        

        # Add logging middleware

        starlette_app.add_middleware(LoggingMiddleware)

    

    async def start(self):

        """Start the A2A server"""

        logger.info(f"Starting Currency Agent server on {self.host}:{self.port}")

        logger.info(f"Agent Card URL: http://{self.host}:{self.port}/.well-known/agent.json")

        

        config = uvicorn.Config(

            self.app.build(),

            host=self.host,

            port=self.port,

            log_level="info"

        )

        

        server = uvicorn.Server(config)

        await server.serve()

    

    def run(self):

        """Run the server (blocking)"""

        asyncio.run(self.start())


# Health check and metrics endpoints

from starlette.applications import Starlette

from starlette.responses import JSONResponse

from starlette.routing import Route


async def health_check(request):

    """Health check endpoint"""

    return JSONResponse({

        "status": "healthy",

        "agent": "Currency Agent",

        "version": "2.1.0",

        "timestamp": datetime.now().isoformat()

    })


async def metrics(request):

    """Metrics endpoint"""

    # In a real implementation, collect actual metrics

    return JSONResponse({

        "active_tasks": len(executor.active_tasks),

        "total_sessions": len(executor.user_sessions),

        "uptime": "24h 15m",

        "requests_processed": 1247

    })


# Add custom routes

def create_enhanced_server():

    """Create server with additional endpoints"""

    base_app = A2AStarletteApplication(

        agent_card=create_currency_agent_card(),

        http_handler=DefaultRequestHandler(

            agent_executor=CurrencyAgentExecutor(),

            task_store=InMemoryTaskStore()

        )

    ).build()

    

    # Add custom routes

    base_app.router.routes.extend([

        Route("/health", health_check),

        Route("/metrics", metrics)

    ])

    

    return base_app


if __name__ == "__main__":

    server = CurrencyAgentServer()

    server.run()



5. Advanced Client Implementation


Comprehensive A2A Client



# client/advanced_client.py

import asyncio

import json

import logging

from typing import Dict, Any, List, Optional, AsyncGenerator

from datetime import datetime


import httpx

from a2a.client import A2AClient

from a2a.types import (

    MessageSendParams, SendMessageRequest, SendStreamingMessageRequest,

    Message, TextPart, JSONPart, TaskStatusUpdateEvent

)


logger = logging.getLogger(__name__)


class AdvancedA2AClient:

    def __init__(self, agent_url: str):

        self.agent_url = agent_url

        self.client: Optional[A2AClient] = None

        self.session_id = None

        self.conversation_history: List[Dict[str, Any]] = []

    

    async def __aenter__(self):

        self.httpx_client = httpx.AsyncClient(timeout=30.0)

        

        # Initialize A2A client from agent card

        self.client = await A2AClient.get_client_from_agent_card_url(

            self.httpx_client, self.agent_url

        )

        

        logger.info(f"Connected to agent at {self.agent_url}")

        return self

    

    async def __aexit__(self, exc_type, exc_val, exc_tb):

        if self.httpx_client:

            await self.httpx_client.aclose()

    

    async def send_message(self, text: str, stream: bool = True) -> Dict[str, Any]:

        """Send a message to the agent"""

        if not self.client:

            raise RuntimeError("Client not initialized")

        

        # Create message

        message = Message(

            role='user',

            parts=[TextPart(type='text', text=text)],

            messageId=f"msg_{datetime.now().timestamp()}"

        )

        

        # Add to conversation history

        self.conversation_history.append({

            'timestamp': datetime.now().isoformat(),

            'role': 'user',

            'content': text

        })

        

        if stream:

            return await self._send_streaming_message(message)

        else:

            return await self._send_regular_message(message)

    

    async def _send_regular_message(self, message: Message) -> Dict[str, Any]:

        """Send a regular (non-streaming) message"""

        request = SendMessageRequest(

            params=MessageSendParams(message=message)

        )

        

        response = await self.client.send_message(request)

        

        # Process response

        result = {

            'type': 'regular',

            'response': response.result,

            'timestamp': datetime.now().isoformat()

        }

        

        # Add to conversation history

        if hasattr(response.result, 'parts') and response.result.parts:

            content = response.result.parts[0].text if hasattr(response.result.parts[0], 'text') else str(response.result.parts[0])

            self.conversation_history.append({

                'timestamp': datetime.now().isoformat(),

                'role': 'agent',

                'content': content

            })

        

        return result

    

    async def _send_streaming_message(self, message: Message) -> Dict[str, Any]:

        """Send a streaming message"""

        request = SendStreamingMessageRequest(

            params=MessageSendParams(message=message)

        )

        

        stream_response = self.client.send_message_streaming(request)

        

        chunks = []

        final_content = []

        

        async for chunk in stream_response:

            chunks.append(chunk.model_dump(mode='json', exclude_none=True))

            

            # Extract content from chunk

            if hasattr(chunk, 'result') and chunk.result:

                if hasattr(chunk.result, 'parts'):

                    for part in chunk.result.parts:

                        if hasattr(part, 'text'):

                            final_content.append(part.text)

                        elif hasattr(part, 'data'):

                            final_content.append(json.dumps(part.data))

        

        # Add to conversation history

        if final_content:

            self.conversation_history.append({

                'timestamp': datetime.now().isoformat(),

                'role': 'agent',

                'content': ' '.join(final_content)

            })

        

        return {

            'type': 'streaming',

            'chunks': chunks,

            'final_content': final_content,

            'timestamp': datetime.now().isoformat()

        }

    

    async def get_agent_info(self) -> Dict[str, Any]:

        """Get agent card information"""

        if not self.client:

            raise RuntimeError("Client not initialized")

        

        # Fetch agent card directly

        async with httpx.AsyncClient() as client:

            response = await client.get(f"{self.agent_url}/.well-known/agent.json")

            response.raise_for_status()

            return response.json()

    

    async def test_all_capabilities(self) -> Dict[str, Any]:

        """Test all agent capabilities"""

        results = {}

        

        # Get agent info

        agent_info = await self.get_agent_info()

        results['agent_info'] = agent_info

        

        # Test each skill

        skills = agent_info.get('skills', [])

        results['skill_tests'] = {}

        

        for skill in skills:

            skill_id = skill['id']

            examples = skill.get('examples', [])

            

            if examples:

                logger.info(f"Testing skill: {skill_id}")

                test_result = await self.send_message(examples[0], stream=True)

                results['skill_tests'][skill_id] = {

                    'example_used': examples[0],

                    'result': test_result

                }

        

        return results

    

    def get_conversation_history(self) -> List[Dict[str, Any]]:

        """Get the conversation history"""

        return self.conversation_history.copy()

    

    def clear_conversation_history(self):

        """Clear the conversation history"""

        self.conversation_history.clear()


# Interactive client for testing

class InteractiveA2AClient:

    def __init__(self, agent_url: str):

        self.agent_url = agent_url

    

    async def run_interactive_session(self):

        """Run an interactive session with the agent"""

        print(f"Connecting to agent at {self.agent_url}...")

        

        async with AdvancedA2AClient(self.agent_url) as client:

            # Get agent info

            agent_info = await client.get_agent_info()

            print(f"\nConnected to: {agent_info['name']}")

            print(f"Description: {agent_info['description']}")

            print(f"Version: {agent_info['version']}")

            

            # Show available skills

            print("\nAvailable skills:")

            for skill in agent_info.get('skills', []):

                print(f"  - {skill['name']}: {skill['description']}")

                if skill.get('examples'):

                    print(f"    Examples: {', '.join(skill['examples'][:2])}")

            

            print("\nType 'quit' to exit, 'help' for agent help, 'history' to see conversation")

            print("=" * 60)

            

            while True:

                try:

                    user_input = input("\nYou: ").strip()

                    

                    if user_input.lower() == 'quit':

                        break

                    elif user_input.lower() == 'history':

                        history = client.get_conversation_history()

                        print("\nConversation History:")

                        for entry in history[-10:]:  # Show last 10 entries

                            print(f"[{entry['timestamp']}] {entry['role']}: {entry['content'][:100]}...")

                        continue

                    elif not user_input:

                        continue

                    

                    # Send message to agent

                    print("Agent: ", end="", flush=True)

                    result = await client.send_message(user_input, stream=True)

                    

                    if result['type'] == 'streaming':

                        # Print streaming response in real-time

                        for content in result['final_content']:

                            print(content)

                    else:

                        # Print regular response

                        if hasattr(result['response'], 'parts'):

                            for part in result['response'].parts:

                                if hasattr(part, 'text'):

```python

                                    print(part.text)

                

                except KeyboardInterrupt:

                    print("\nExiting...")

                    break

                except Exception as e:

                    print(f"Error: {str(e)}")


# Multi-agent client for orchestration

class MultiAgentClient:

    def __init__(self):

        self.agents: Dict[str, AdvancedA2AClient] = {}

        self.orchestrator_url = None

    

    async def register_agent(self, name: str, url: str):

        """Register an agent with the client"""

        client = AdvancedA2AClient(url)

        await client.__aenter__()

        self.agents[name] = client

        

        # Get agent info for capabilities

        agent_info = await client.get_agent_info()

        print(f"Registered agent '{name}': {agent_info['name']}")

    

    async def route_message(self, message: str, preferred_agent: Optional[str] = None) -> Dict[str, Any]:

        """Route message to appropriate agent"""

        if preferred_agent and preferred_agent in self.agents:

            return await self.agents[preferred_agent].send_message(message)

        

        # Simple routing logic based on keywords

        message_lower = message.lower()

        

        if any(keyword in message_lower for keyword in ['currency', 'exchange', 'convert', 'rate']):

            if 'currency' in self.agents:

                return await self.agents['currency'].send_message(message)

        

        elif any(keyword in message_lower for keyword in ['weather', 'temperature', 'forecast']):

            if 'weather' in self.agents:

                return await self.agents['weather'].send_message(message)

        

        # Default to first available agent

        if self.agents:

            first_agent = next(iter(self.agents.values()))

            return await first_agent.send_message(message)

        

        raise RuntimeError("No agents available")

    

    async def cleanup(self):

        """Clean up all agent connections"""

        for client in self.agents.values():

            await client.__aexit__(None, None, None)


# Usage example

async def multi_agent_example():

    multi_client = MultiAgentClient()

    

    try:

        # Register multiple agents

        await multi_client.register_agent('currency', 'http://localhost:9999')

        await multi_client.register_agent('weather', 'http://localhost:9998')

        

        # Route messages to appropriate agents

        currency_result = await multi_client.route_message("Convert 100 USD to EUR")

        weather_result = await multi_client.route_message("What's the weather like?")

        

        print("Currency result:", currency_result)

        print("Weather result:", weather_result)

        

    finally:

        await multi_client.cleanup()


if __name__ == "__main__":

    # Run interactive session

    client = InteractiveA2AClient("http://localhost:9999")

    asyncio.run(client.run_interactive_session())



6. Agent-to-Agent Communication


Direct Agent Communication



# agents/orchestrator/agent_communication.py

import asyncio

import json

from typing import Dict, Any, List, Optional

from datetime import datetime


from a2a.client import A2AClient

from a2a.types import Message, TextPart, JSONPart


class AgentCommunicationManager:

    def __init__(self):

        self.registered_agents: Dict[str, Dict[str, Any]] = {}

        self.active_connections: Dict[str, A2AClient] = {}

    

    async def register_agent(self, agent_id: str, agent_url: str, capabilities: List[str]):

        """Register an agent for communication"""

        self.registered_agents[agent_id] = {

            'url': agent_url,

            'capabilities': capabilities,

            'last_seen': datetime.now().isoformat(),

            'status': 'active'

        }

        

        # Establish connection

        try:

            import httpx

            async with httpx.AsyncClient() as http_client:

                client = await A2AClient.get_client_from_agent_card_url(

                    http_client, agent_url

                )

                self.active_connections[agent_id] = client

                print(f"Agent {agent_id} registered and connected")

        except Exception as e:

            print(f"Failed to connect to agent {agent_id}: {str(e)}")

            self.registered_agents[agent_id]['status'] = 'unreachable'

    

    async def send_agent_message(self, target_agent_id: str, message: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:

        """Send a message from one agent to another"""

        if target_agent_id not in self.active_connections:

            raise ValueError(f"Agent {target_agent_id} not connected")

        

        client = self.active_connections[target_agent_id]

        

        # Create message with context

        message_parts = [TextPart(type='text', text=message)]

        if context:

            message_parts.append(JSONPart(type='json', data=context))

        

        agent_message = Message(

            role='agent',  # Message from another agent

            parts=message_parts,

            messageId=f"agent_msg_{datetime.now().timestamp()}"

        )

        

        # Send message

        from a2a.types import MessageSendParams, SendMessageRequest

        request = SendMessageRequest(

            params=MessageSendParams(message=agent_message)

        )

        

        response = await client.send_message(request)

        

        return {

            'target_agent': target_agent_id,

            'response': response.result,

            'timestamp': datetime.now().isoformat()

        }

    

    async def broadcast_message(self, message: str, exclude_agents: Optional[List[str]] = None) -> Dict[str, Any]:

        """Broadcast a message to all connected agents"""

        exclude_agents = exclude_agents or []

        results = {}

        

        for agent_id, client in self.active_connections.items():

            if agent_id not in exclude_agents:

                try:

                    result = await self.send_agent_message(agent_id, message)

                    results[agent_id] = result

                except Exception as e:

                    results[agent_id] = {'error': str(e)}

        

        return results

    

    def find_agents_by_capability(self, capability: str) -> List[str]:

        """Find agents that have a specific capability"""

        matching_agents = []

        for agent_id, info in self.registered_agents.items():

            if capability.lower() in [cap.lower() for cap in info['capabilities']]:

                matching_agents.append(agent_id)

        return matching_agents

    

    async def coordinate_workflow(self, workflow_steps: List[Dict[str, Any]]) -> Dict[str, Any]:

        """Coordinate a multi-step workflow across agents"""

        workflow_results = []

        context = {}

        

        for step in workflow_steps:

            step_id = step['id']

            required_capability = step['capability']

            message = step['message']

            

            # Find suitable agent

            suitable_agents = self.find_agents_by_capability(required_capability)

            if not suitable_agents:

                workflow_results.append({

                    'step_id': step_id,

                    'error': f"No agent found with capability: {required_capability}"

                })

                continue

            

            # Use first suitable agent

            target_agent = suitable_agents[0]

            

            try:

                # Include context from previous steps

                result = await self.send_agent_message(

                    target_agent, 

                    message, 

                    context={'workflow_context': context}

                )

                

                workflow_results.append({

                    'step_id': step_id,

                    'agent': target_agent,

                    'result': result,

                    'status': 'completed'

                })

                

                # Update context for next steps

                context[step_id] = result

                

            except Exception as e:

                workflow_results.append({

                    'step_id': step_id,

                    'agent': target_agent,

                    'error': str(e),

                    'status': 'failed'

                })

        

        return {

            'workflow_id': f"workflow_{datetime.now().timestamp()}",

            'steps': workflow_results,

            'completed_at': datetime.now().isoformat()

        }


# Example workflow coordination

async def travel_planning_workflow():

    """Example: Coordinate travel planning across multiple agents"""

    comm_manager = AgentCommunicationManager()

    

    # Register agents

    await comm_manager.register_agent('weather', 'http://localhost:9997', ['weather', 'forecast'])

    await comm_manager.register_agent('currency', 'http://localhost:9999', ['currency', 'exchange'])

    await comm_manager.register_agent('booking', 'http://localhost:9996', ['hotels', 'flights'])

    

    # Define workflow

    workflow = [

        {

            'id': 'weather_check',

            'capability': 'weather',

            'message': 'What will the weather be like in Paris next week?'

        },

        {

            'id': 'currency_conversion',

            'capability': 'currency',

            'message': 'Convert 1000 USD to EUR for travel budget'

        },

        {

            'id': 'hotel_search',

            'capability': 'hotels',

            'message': 'Find hotels in Paris for next week, budget around 800 EUR'

        }

    ]

    

    # Execute workflow

    results = await comm_manager.coordinate_workflow(workflow)

    

    print("Travel Planning Workflow Results:")

    for step in results['steps']:

        print(f"Step {step['step_id']}: {step['status']}")

        if 'result' in step:

            print(f"  Agent: {step['agent']}")

            print(f"  Result: {step['result']}")

        elif 'error' in step:

            print(f"  Error: {step['error']}")



7. Authentication & Security


Authentication Implementation


# shared/auth.py

import jwt

import hashlib

import secrets

from typing import Dict, Any, Optional

from datetime import datetime, timedelta


class A2AAuthManager:

    def __init__(self, secret_key: str):

        self.secret_key = secret_key

        self.api_keys: Dict[str, Dict[str, Any]] = {}

        self.jwt_tokens: Dict[str, Dict[str, Any]] = {}

    

    def generate_api_key(self, agent_id: str, permissions: List[str]) -> str:

        """Generate API key for agent authentication"""

        api_key = secrets.token_urlsafe(32)

        key_hash = hashlib.sha256(api_key.encode()).hexdigest()

        

        self.api_keys[key_hash] = {

            'agent_id': agent_id,

            'permissions': permissions,

            'created_at': datetime.now().isoformat(),

            'last_used': None,

            'active': True

        }

        

        return api_key

    

    def validate_api_key(self, api_key: str) -> Optional[Dict[str, Any]]:

        """Validate API key and return agent info"""

        key_hash = hashlib.sha256(api_key.encode()).hexdigest()

        

        if key_hash in self.api_keys:

            key_info = self.api_keys[key_hash]

            if key_info['active']:

                key_info['last_used'] = datetime.now().isoformat()

                return key_info

        

        return None

    

    def generate_jwt_token(self, agent_id: str, permissions: List[str], expires_in_hours: int = 24) -> str:

        """Generate JWT token for agent authentication"""

        payload = {

            'agent_id': agent_id,

            'permissions': permissions,

            'iat': datetime.utcnow(),

            'exp': datetime.utcnow() + timedelta(hours=expires_in_hours)

        }

        

        token = jwt.encode(payload, self.secret_key, algorithm='HS256')

        

        # Store token info

        self.jwt_tokens[token] = {

            'agent_id': agent_id,

            'created_at': datetime.now().isoformat(),

            'expires_at': (datetime.now() + timedelta(hours=expires_in_hours)).isoformat()

        }

        

        return token

    

    def validate_jwt_token(self, token: str) -> Optional[Dict[str, Any]]:

        """Validate JWT token and return payload"""

        try:

            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])

            return payload

        except jwt.ExpiredSignatureError:

            return None

        except jwt.InvalidTokenError:

            return None


# Authentication middleware

from starlette.middleware.base import BaseHTTPMiddleware

from starlette.requests import Request

from starlette.responses import JSONResponse


class A2AAuthMiddleware(BaseHTTPMiddleware):

    def __init__(self, app, auth_manager: A2AAuthManager):

        super().__init__(app)

        self.auth_manager = auth_manager

    

    async def dispatch(self, request: Request, call_next):

        # Skip auth for public endpoints

        if request.url.path in ['/health', '/.well-known/agent.json']:

            return await call_next(request)

        

        # Check for API key

        api_key = request.headers.get('X-API-Key')

        if api_key:

            agent_info = self.auth_manager.validate_api_key(api_key)

            if agent_info:

                request.state.agent_id = agent_info['agent_id']

                request.state.permissions = agent_info['permissions']

                return await call_next(request)

        

        # Check for JWT token

        auth_header = request.headers.get('Authorization')

        if auth_header and auth_header.startswith('Bearer '):

            token = auth_header[7:]

            payload = self.auth_manager.validate_jwt_token(token)

            if payload:

                request.state.agent_id = payload['agent_id']

                request.state.permissions = payload['permissions']

                return await call_next(request)

        

        # Authentication failed

        return JSONResponse(

            status_code=401,

            content={'error': 'Authentication required'}

        )


# Secure agent server

class SecureA2AServer:

    def __init__(self, agent_card, executor, secret_key: str):

        self.agent_card = agent_card

        self.executor = executor

        self.auth_manager = A2AAuthManager(secret_key)

        

        # Generate API keys for known agents

        self.setup_default_auth()

    

    def setup_default_auth(self):

        """Setup default authentication for known agents"""

        # Generate API key for admin

        admin_key = self.auth_manager.generate_api_key(

            'admin', 

            ['read', 'write', 'admin']

        )

        print(f"Admin API Key: {admin_key}")

        

        # Generate API key for client agents

        client_key = self.auth_manager.generate_api_key(

            'client_agent',

            ['read', 'write']

        )

        print(f"Client API Key: {client_key}")

    

    def create_app(self):

        """Create secure Starlette application"""

        from a2a.server.apps import A2AStarletteApplication

        from a2a.server.request_handlers import DefaultRequestHandler

        from a2a.server.tasks import InMemoryTaskStore

        

        app = A2AStarletteApplication(

            agent_card=self.agent_card,

            http_handler=DefaultRequestHandler(

                agent_executor=self.executor,

                task_store=InMemoryTaskStore()

            )

        ).build()

        

        # Add authentication middleware

        app.add_middleware(A2AAuthMiddleware, auth_manager=self.auth_manager)

        

        return app



8. Testing & Debugging


Comprehensive Testing Suite



# tests/test_a2a_integration.py

import asyncio

import pytest

import json

from typing import Dict, Any


from a2a.client import A2AClient

from agents.currency_agent.server import CurrencyAgentServer

from client.advanced_client import AdvancedA2AClient


class A2ATestSuite:

    def __init__(self, agent_url: str):

        self.agent_url = agent_url

        self.test_results: Dict[str, Any] = {}

    

    async def run_all_tests(self) -> Dict[str, Any]:

        """Run comprehensive test suite"""

        print("Starting A2A Integration Tests...")

        

        # Test agent discovery

        await self.test_agent_discovery()

        

        # Test basic communication

        await self.test_basic_communication()

        

        # Test streaming responses

        await self.test_streaming_responses()

        

        # Test error handling

        await self.test_error_handling()

        

        # Test concurrent requests

        await self.test_concurrent_requests()

        

        # Test skill-specific functionality

        await self.test_skill_functionality()

        

        return self.test_results

    

    async def test_agent_discovery(self):

        """Test agent card discovery"""

        print("Testing agent discovery...")

        

        try:

            import httpx

            async with httpx.AsyncClient() as client:

                response = await client.get(f"{self.agent_url}/.well-known/agent.json")

                response.raise_for_status()

                agent_card = response.json()

                

                # Validate agent card structure

                required_fields = ['name', 'description', 'version', 'skills']

                for field in required_fields:

                    assert field in agent_card, f"Missing required field: {field}"

                

                self.test_results['agent_discovery'] = {

                    'status': 'passed',

                    'agent_name': agent_card['name'],

                    'skills_count': len(agent_card.get('skills', []))

                }

                print("✓ Agent discovery test passed")

        

        except Exception as e:

            self.test_results['agent_discovery'] = {

                'status': 'failed',

                'error': str(e)

            }

            print(f"✗ Agent discovery test failed: {str(e)}")

    

    async def test_basic_communication(self):

        """Test basic message sending"""

        print("Testing basic communication...")

        

        try:

            async with AdvancedA2AClient(self.agent_url) as client:

                response = await client.send_message("help", stream=False)

                

                assert response['type'] == 'regular'

                assert 'response' in response

                

                self.test_results['basic_communication'] = {

                    'status': 'passed',

                    'response_type': response['type']

                }

                print("✓ Basic communication test passed")

        

        except Exception as e:

            self.test_results['basic_communication'] = {

                'status': 'failed',

                'error': str(e)

            }

            print(f"✗ Basic communication test failed: {str(e)}")

    

    async def test_streaming_responses(self):

        """Test streaming message responses"""

        print("Testing streaming responses...")

        

        try:

            async with AdvancedA2AClient(self.agent_url) as client:

                response = await client.send_message("Convert 100 USD to EUR", stream=True)

                

                assert response['type'] == 'streaming'

                assert 'chunks' in response

                assert 'final_content' in response

                

                self.test_results['streaming_responses'] = {

                    'status': 'passed',

                    'chunks_count': len(response['chunks']),

                    'has_final_content': len(response['final_content']) > 0

                }

                print("✓ Streaming responses test passed")

        

        except Exception as e:

            self.test_results['streaming_responses'] = {

                'status': 'failed',

                'error': str(e)

            }

            print(f"✗ Streaming responses test failed: {str(e)}")

    

    async def test_error_handling(self):

        """Test error handling for invalid requests"""

        print("Testing error handling...")

        

        try:

            async with AdvancedA2AClient(self.agent_url) as client:

                # Send invalid request

                response = await client.send_message("invalid_command_xyz_123", stream=False)

                

                # Should handle gracefully without crashing

                self.test_results['error_handling'] = {

                    'status': 'passed',

                    'handled_gracefully': True

                }

                print("✓ Error handling test passed")

        

        except Exception as e:

            self.test_results['error_handling'] = {

                'status': 'failed',

                'error': str(e)

            }

            print(f"✗ Error handling test failed: {str(e)}")

    

    async def test_concurrent_requests(self):

        """Test handling of concurrent requests"""

        print("Testing concurrent requests...")

        

        try:

            async with AdvancedA2AClient(self.agent_url) as client:

                # Send multiple concurrent requests

                tasks = []

                for i in range(5):

                    task = client.send_message(f"Convert {10 * (i+1)} USD to EUR", stream=True)

                    tasks.append(task)

                

                responses = await asyncio.gather(*tasks, return_exceptions=True)

                

                successful_responses = [r for r in responses if not isinstance(r, Exception)]

                

                self.test_results['concurrent_requests'] = {

                    'status': 'passed',

                    'total_requests': len(tasks),

                    'successful_responses': len(successful_responses),

                    'success_rate': len(successful_responses) / len(tasks)

                }

                print(f"✓ Concurrent requests test passed ({len(successful_responses)}/{len(tasks)} successful)")

        

        except Exception as e:

            self.test_results['concurrent_requests'] = {

                'status': 'failed',

                'error': str(e)

            }

            print(f"✗ Concurrent requests test failed: {str(e)}")

    

    async def test_skill_functionality(self):

        """Test specific skill functionality"""

        print("Testing skill functionality...")

        

        skill_tests = {

            'currency_conversion': 'Convert 50 USD to GBP',

            'currency_history': 'Show EUR/USD rates for last week',

            'currency_alerts': 'Alert me when EUR/USD goes above 1.15'

        }

        

        skill_results = {}

        

        try:

            async with AdvancedA2AClient(self.agent_url) as client:

                for skill_name, test_message in skill_tests.items():

                    try:

                        response = await client.send_message(test_message, stream=True)

                        skill_results[skill_name] = {

                            'status': 'passed',

                            'has_response': len(response.get('final_content', [])) > 0

                        }

                        print(f"✓ {skill_name} test passed")

                    

                    except Exception as e:

                        skill_results[skill_name] = {

                            'status': 'failed',

                            'error': str(e)

                        }

                        print(f"✗ {skill_name} test failed: {str(e)}")

                

                self.test_results['skill_functionality'] = skill_results

        

        except Exception as e:

            self.test_results['skill_functionality'] = {

                'status': 'failed',

                'error': str(e)

            }

            print(f"✗ Skill functionality tests failed: {str(e)}")


# Performance testing

class A2APerformanceTest:

    def __init__(self, agent_url: str):

        self.agent_url = agent_url

    

    async def run_performance_tests(self) -> Dict[str, Any]:

        """Run performance benchmarks"""

        results = {}

        

        # Test response time

        results['response_time'] = await self.test_response_time()

        

        # Test throughput

        results['throughput'] = await self.test_throughput()

        

        # Test memory usage

        results['memory_usage'] = await self.test_memory_usage()

        

        return results

    

    async def test_response_time(self) -> Dict[str, Any]:

        """Test average response time"""

        import time

        

        response_times = []

        

        async with AdvancedA2AClient(self.agent_url) as client:

            for _ in range(10):

                start_time = time.time()

                await client.send_message("Convert 100 USD to EUR", stream=False)

                end_time = time.time()

                

                response_times.append(end_time - start_time)

        

        return {

            'average_response_time': sum(response_times) / len(response_times),

            'min_response_time': min(response_times),

            'max_response_time': max(response_times),

            'total_requests': len(response_times)

        }

    

    async def test_throughput(self) -> Dict[str, Any]:

        """Test requests per second"""

        import time

        

        start_time = time.time()

        request_count = 50

        

        async with AdvancedA2AClient(self.agent_url) as client:

            tasks = []

            for i in range(request_count):

                task = client.send_message(f"Convert {i+1} USD to EUR", stream=False)

                tasks.append(task)

            

            await asyncio.gather(*tasks, return_exceptions=True)

        

        end_time = time.time()

        duration = end_time - start_time

        

        return {

            'requests_per_second': request_count / duration,

            'total_requests': request_count,

            'total_duration': duration

        }

    

    async def test_memory_usage(self) -> Dict[str, Any]:

        """Test memory usage during operations"""

        import psutil

        import os

        

        process = psutil.Process(os.getpid())

        initial_memory = process.memory_info().rss / 1024 / 1024  # MB

        

        async with AdvancedA2AClient(self.agent_url) as client:

            # Perform memory-intensive operations

            for _ in range(20):

                await client.send_message("Show historical data for all currencies", stream=True)

        

        final_memory = process.memory_info().rss / 1024 / 1024  # MB

        

        return {

            'initial_memory_mb': initial_memory,

            'final_memory_mb': final_memory,

            'memory_increase_mb': final_memory - initial_memory

        }


# Usage example

async def run_comprehensive_tests():

    """Run all tests"""

    agent_url = "http://localhost:9999"

    

    # Integration tests

    integration_test = A2ATestSuite(agent_url)

    integration_results = await integration_test.run_all_tests()

    

    # Performance tests

    performance_test = A2APerformanceTest(agent_url)

    performance_results = await performance_test.run_performance_tests()

    

    # Generate test report

    test_report = {

        'timestamp': datetime.now().isoformat(),

        'agent_url': agent_url,

        'integration_tests': integration_results,

        'performance_tests': performance_results

    }

    

    # Save report

    with open('test_report.json', 'w') as f:

        json.dump(test_report, f, indent=2)

    

    print("\nTest Report Generated: test_report.json")

    return test_report


if __name__ == "__main__":

    asyncio.run(run_comprehensive_tests())



9. Production Deployment


Docker Configuration


# Dockerfile

FROM python:3.11-slim


WORKDIR /app


# Install system dependencies

RUN apt-get update && apt-get install -y \

    gcc \

    && rm -rf /var/lib/apt/lists/*


# Copy requirements

COPY requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt


# Copy application code

COPY . .


# Create non-root user

RUN useradd --create-home --shell /bin/bash app

RUN chown -R app:app /app

USER app


# Expose port

EXPOSE 9999


# Health check

HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \

    CMD curl -f http://localhost:9999/health || exit 1


# Start command

CMD ["python", "-m", "agents.currency_agent.server"]




# docker-compose.yml

version: '3.8'


services:

  currency-agent:

    build: .

    ports:

      - "9999:9999"

    environment:

      - EXCHANGE_API_KEY=${EXCHANGE_API_KEY}

      - LOG_LEVEL=INFO

      - AUTH_SECRET_KEY=${AUTH_SECRET_KEY}

    volumes:

      - ./data:/app/data

      - ./logs:/app/logs

    restart: unless-stopped

    healthcheck:

      test: ["CMD", "curl", "-f", "http://localhost:9999/health"]

      interval: 30s

      timeout: 10s

      retries: 3


  weather-agent:

    build: 

      context: .

      dockerfile: Dockerfile.weather

    ports:

      - "9998:9998"

    environment:

      - WEATHER_API_KEY=${WEATHER_API_KEY}

    restart: unless-stopped


  orchestrator:

    build:

      context: .

      dockerfile: Dockerfile.orchestrator

    ports:

      - "9997:9997"

    depends_on:

      - currency-agent

      - weather-agent

    environment:

      - REGISTERED_AGENTS=currency-agent:9999,weather-agent:9998

    restart: unless-stopped


  nginx:

    image: nginx:alpine

    ports:

      - "80:80"

      - "443:443"

    volumes:

      - ./nginx.conf:/etc/nginx/nginx.conf

      - ./ssl:/etc/nginx/ssl

    depends_on:

      - currency-agent

      - weather-agent

      - orchestrator

    restart: unless-stopped



Production Configuration



# config/production.py

import os

from typing import Dict, Any


class ProductionConfig:

    # Server configuration

    HOST = os.getenv('HOST', '0.0.0.0')

    PORT = int(os.getenv('PORT', 9999))

    WORKERS = int(os.getenv('WORKERS', 4))

    

    # Security

    AUTH_SECRET_KEY = os.getenv('AUTH_SECRET_KEY', 'your-secret-key-here')

    ALLOWED_ORIGINS = os.getenv('ALLOWED_ORIGINS', '*').split(',')

    

    # API Keys

    EXCHANGE_API_KEY = os.getenv('EXCHANGE_API_KEY')

    

    # Database

    DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///./data/agent.db')

    

    # Logging

    LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')

    LOG_FILE = os.getenv('LOG_FILE', './logs/agent.log')

    

    # Rate limiting

    RATE_LIMIT_REQUESTS = int(os.getenv('RATE_LIMIT_REQUESTS', 100))

    RATE_LIMIT_WINDOW = int(os.getenv('RATE_LIMIT_WINDOW', 60))

    

    # Monitoring

    METRICS_ENABLED = os.getenv('METRICS_ENABLED', 'true').lower() == 'true'

    HEALTH_CHECK_INTERVAL = int(os.getenv('HEALTH_CHECK_INTERVAL', 30))

    

    @classmethod

    def validate(cls) -> Dict[str, Any]:

        """Validate configuration"""

        errors = []

        warnings = []

        

        # Required environment variables

        required_vars = ['AUTH_SECRET_KEY', 'EXCHANGE_API_KEY']

        for var in required_vars:

            if not getattr(cls, var):

                errors.append(f"Missing required environment variable: {var}")

        

        # Validate numeric values

        if cls.PORT < 1 or cls.PORT > 65535:

            errors.append(f"Invalid port number: {cls.PORT}")

        

        if cls.WORKERS < 1:

            warnings.append(f"Low worker count: {cls.WORKERS}")

        

        return {

            'errors': errors,

            'warnings': warnings,

            'valid': len(errors) == 0

        }


# Production server with monitoring

class ProductionA2AServer:

    def __init__(self, config: ProductionConfig):

        self.config = config

        self.setup_logging()

        self.setup_monitoring()

    

    def setup_logging(self):

        """Setup production logging"""

        import logging

        from logging.handlers import RotatingFileHandler

        

        # Create logs directory

        os.makedirs(os.path.dirname(self.config.LOG_FILE), exist_ok=True)

        

        # Configure logging

        logging.basicConfig(

            level=getattr(logging, self.config.LOG_LEVEL),

            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',

            handlers=[

                RotatingFileHandler(

                    self.config.LOG_FILE,

                    maxBytes=10*1024*1024,  # 10MB

                    backupCount=5

                ),

                logging.StreamHandler()

            ]

        )

    

    def setup_monitoring(self):

        """Setup monitoring and metrics"""

        if self.config.METRICS_ENABLED:

            # Setup Prometheus metrics

            from prometheus_client import Counter, Histogram, Gauge

            

            self.request_count = Counter('a2a_requests_total', 'Total requests')

            self.request_duration = Histogram('a2a_request_duration_seconds', 'Request duration')

            self.active_connections = Gauge('a2a_active_connections', 'Active connections')

    

    def create_app(self):

        """Create production-ready application"""

        from agents.currency_agent.agent import create_currency_agent_card

        from agents.currency_agent.executor import CurrencyAgentExecutor

        from shared.auth import SecureA2AServer

        

        # Validate configuration

        validation = self.config.validate()

        if not validation['valid']:

            raise RuntimeError(f"Configuration errors: {validation['errors']}")

        

        # Create secure server

        secure_server = SecureA2AServer(

            agent_card=create_currency_agent_card(),

            executor=CurrencyAgentExecutor(),

            secret_key=self.config.AUTH_SECRET_KEY

        )

        

        app = secure_server.create_app()

        

        # Add production middleware

        self.add_production_middleware(app)

        

        return app

    

    def add_production_middleware(self, app):

        """Add production middleware"""

        from starlette.middleware.gzip import GZipMiddleware

        from starlette.middleware.trustedhost import TrustedHostMiddleware

        

        # Add compression

        app.add_middleware(GZipMiddleware, minimum_size=1000)

        

        # Add trusted host middleware

        if self.config.ALLOWED_ORIGINS != ['*']:

            app.add_middleware(TrustedHostMiddleware, allowed_hosts=self.config.ALLOWED_ORIGINS)

        

        # Add rate limiting

        from shared.middleware import RateLimitMiddleware

        app.add_middleware(

            RateLimitMiddleware,

            requests_per_minute=self.config.RATE_LIMIT_REQUESTS,

            window_seconds=self.config.RATE_LIMIT_WINDOW

        )

    

    def run(self):

        """Run production server"""

        import uvicorn

        

        uvicorn.run(

            self.create_app(),

            host=self.config.HOST,

            port=self.config.PORT,

            workers=self.config.WORKERS,

            access_log=True,

            server_header=False,

            date_header=False

        )


if __name__ == "__main__":

    config = ProductionConfig()

    server = ProductionA2AServer(config)

    server.run()6

No comments: