Thursday, December 04, 2025

BUILDING MODEL CONTEXT PROTOCOL SERVERS: A DEVELOPER GUIDE



UNDERSTANDING THE MODEL CONTEXT PROTOCOL

The Model Context Protocol, commonly abbreviated as MCP, represents a groundbreaking standardization effort in the realm of artificial intelligence and large language model integration. This protocol was invented and released by Anthropic in November 2024 as an open standard designed to solve a fundamental challenge that has plagued AI application development: the lack of a unified way for AI assistants to interact with external data sources and tools.

Before MCP existed, every AI application developer had to create custom integrations for each tool, database, or service they wanted their AI to access. If you wanted your language model to search the internet, query a database, perform calculations, or interact with business systems, you had to write bespoke code for each integration. This approach was not only time-consuming but also led to fragmented ecosystems where solutions could not be easily shared or reused across different AI platforms.

The purpose of MCP is elegantly simple yet profoundly impactful. It establishes a universal protocol that allows AI models to communicate with external resources in a standardized manner. Think of it as USB for AI applications. Just as USB created a universal standard for connecting peripherals to computers, MCP creates a universal standard for connecting AI models to tools and data sources. When a developer builds an MCP server that provides access to a particular service or dataset, any MCP-compatible AI client can immediately use that server without requiring custom integration work.

The architecture of MCP follows a client-server model built on top of JSON-RPC 2.0. This is crucial to understand because JSON-RPC 2.0 is the actual wire protocol that MCP uses for all communication. Every message exchanged between an MCP client and server is a valid JSON-RPC 2.0 message. An MCP server exposes capabilities called tools, resources, and prompts. Tools are functions that the AI can invoke to perform actions or retrieve information. Resources represent data that the AI can access, such as files or database records. Prompts are pre-defined templates that help structure interactions. On the other side, an MCP client is the component that connects to these servers and allows the language model to discover and use the available capabilities.

What makes MCP particularly powerful is its design philosophy. The protocol uses JSON-RPC 2.0 as its communication layer, which is a well-established, language-agnostic standard for remote procedure calls. This means MCP servers can be written in any programming language, and clients in another language can seamlessly communicate with them. The protocol also includes sophisticated features like progress notifications, cancellation support, and structured error handling. All messages are exchanged over a transport layer, with the most common being stdio (standard input/output), which makes it perfect for local development and subprocess-based architectures.

PREPARING YOUR DEVELOPMENT ENVIRONMENT

Before we dive into building our MCP server, we need to set up a proper development environment. For this tutorial, we will use Python as our primary programming language because of its excellent ecosystem for AI development and its readability, which makes it ideal for learning.

First, ensure you have Python 3.10 or later installed on your system. You can verify this by opening a terminal and running the command to check your Python version. The MCP SDK requires modern Python features, so older versions will not work properly.

Next, we need to install the official MCP SDK for Python. Anthropic provides this SDK to make server development straightforward. Create a new directory for your project and set up a virtual environment to keep your dependencies isolated. This is a best practice that prevents conflicts between different Python projects on your system.

Inside your project directory, create a virtual environment by running the appropriate command for your operating system. On Unix-based systems like macOS and Linux, you would use the venv module. On Windows, the process is similar but with slightly different activation commands. Once your virtual environment is created and activated, you will see your terminal prompt change to indicate that you are working within the virtual environment.

Now install the MCP SDK using pip, Python's package manager. The package you need is called mcp. Additionally, install some supporting libraries that we will use throughout our examples. These include the aiohttp library for making asynchronous HTTP calls, python-dateutil for date manipulation, and potentially openai or anthropic client libraries depending on which language model you plan to use.

Here are the exact commands you need to run:

# Create project directory
mkdir mcp-tutorial
cd mcp-tutorial

# Create virtual environment
python3 -m venv venv

# Activate virtual environment
# On macOS/Linux:
source venv/bin/activate
# On Windows:
# venv\Scripts\activate

# Install required packages
pip install mcp aiohttp python-dateutil transformers torch


CONCEPTUAL FOUNDATION OF MCP SERVERS

Before writing any code, let us establish a solid conceptual understanding of how MCP servers work at the protocol level. An MCP server is essentially a program that runs continuously, waiting for JSON-RPC 2.0 requests from MCP clients. The communication happens over a transport layer, most commonly stdio, where the server reads JSON-RPC messages from standard input and writes responses to standard output.

When a client first connects to a server, it must perform an initialization handshake. The client sends an initialize request as a JSON-RPC message with the method name "initialize". This request includes information about the client's capabilities and the protocol version it supports. The server responds with its own capabilities and version information. After the server responds, the client sends an initialized notification to confirm the handshake is complete. This three-way handshake ensures both sides understand each other's capabilities before any actual work begins.

Once initialized, the client can query the server to discover what capabilities are available. For tools, the client sends a tools/list request. The server responds with a list of all available tools, each described with a name, description, and an input schema that defines what parameters the tool accepts. This schema follows JSON Schema format, which is a standard way to describe the structure of JSON data.

When the language model determines that it needs to use one of these tools to answer a user's question, the client sends a tools/call request to the server. This request includes the tool name and the arguments as a JSON object. The server executes the requested operation and returns the results in a JSON-RPC response. The client then incorporates that information into the language model's context, allowing it to formulate a response to the user.

The beauty of this architecture is its flexibility and extensibility. You can create an MCP server that provides a single specialized tool, or you can build a comprehensive server that offers an entire suite of related capabilities. The server can be as simple as wrapping a few API calls or as complex as managing connections to multiple databases and external services. The key is that everything communicates through standardized JSON-RPC 2.0 messages.

BUILDING YOUR FIRST SIMPLE MCP SERVER

Let us start with the simplest possible MCP server to understand the basic structure. This server will provide a single tool: a calculator that can perform basic arithmetic operations. While simple, this example will demonstrate all the essential components of an MCP server using the proper MCP Python SDK.

We begin by importing the necessary modules from the MCP SDK. The SDK provides several key classes and decorators that form the foundation of every server. The Server class from mcp.server.fastmcp is the main server instance that we will use. This FastMCP class is a higher-level abstraction that handles all the JSON-RPC protocol details for us. We also need asyncio for asynchronous operations and logging for debugging.

from mcp.server.fastmcp import FastMCP
import logging
import asyncio

# Configure logging to help with debugging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

The first thing we do is create a server instance using FastMCP. This object will manage all the tools we register and handle the JSON-RPC communication with clients automatically. We give our server a descriptive name. The name helps identify the server when multiple servers are available.

# Create the MCP server instance
mcp = FastMCP("simple-calculator")

Now we define our calculator tool. In MCP with FastMCP, tools are simply Python functions that are decorated with the tool decorator. The decorator handles all the complexity of registering the function with the server, generating the JSON Schema for the tool's parameters, and managing the JSON-RPC communication. Our function just needs to focus on the actual logic.

@mcp.tool()
def calculate(operation: str, a: float, b: float) -> str:
    """
    Perform basic arithmetic operations.
    
    This tool accepts two numbers and an operation type, then returns
    the calculated result. It supports addition, subtraction, 
    multiplication, and division.
    
    Args:
        operation: The operation to perform (add, subtract, multiply, divide)
        a: The first number
        b: The second number
        
    Returns:
        A string containing the calculation result
    """
    logger.info(f"Calculating: {a} {operation} {b}")
    
    if operation == "add":
        result = a + b
    elif operation == "subtract":
        result = a - b
    elif operation == "multiply":
        result = a * b
    elif operation == "divide":
        if b == 0:
            return "Error: Division by zero is not allowed"
        result = a / b
    else:
        return f"Error: Unknown operation '{operation}'. Supported operations are: add, subtract, multiply, divide"
    
    return f"The result of {a} {operation} {b} is {result}"

Notice several important aspects of this function. First, the function uses type hints for all its parameters. These type hints are critical because the MCP SDK uses them to automatically generate the JSON Schema that describes the tool to clients. When a language model queries the server using the tools/list JSON-RPC method, it receives a detailed schema showing that this tool requires three parameters: an operation string and two floating-point numbers.

Second, the function includes a comprehensive docstring. This docstring becomes part of the tool's description that the language model sees in the tools/list response. A well-written description helps the model understand when and how to use the tool. The description should explain what the tool does, what parameters it accepts, and what kind of result it returns.

The function body itself is straightforward. It examines the operation parameter and performs the corresponding calculation. Notice how we handle potential errors, such as division by zero or invalid operation names. Robust error handling is crucial in MCP tools because the language model relies on clear error messages to understand what went wrong and potentially retry with corrected parameters.

Finally, we need to set up the server to actually run and accept connections. The FastMCP framework provides a simple run method that handles all the stdio transport and JSON-RPC protocol details.

if __name__ == "__main__":
    # Run the server using stdio transport
    mcp.run(transport="stdio")

This single line does a tremendous amount of work behind the scenes. It sets up the stdio transport, which means the server will read JSON-RPC messages from standard input and write responses to standard output. It handles the initialize handshake automatically. It processes tools/list requests by examining all the decorated functions and generating proper JSON Schema descriptions. It routes tools/call requests to the appropriate function and wraps the results in proper JSON-RPC response messages.

This simple calculator server demonstrates the core pattern you will use for all MCP servers built with FastMCP. You create a server instance, register tools using the tool decorator, and then run the server with an appropriate transport mechanism. The FastMCP framework handles all the JSON-RPC 2.0 protocol details, message parsing, error handling, and response formatting.

EXPANDING TO MULTIPLE TOOLS

Now that we understand the basic structure, let us build a more comprehensive MCP server that provides multiple related tools. We will create a utility server that offers several practical capabilities: internet search, grammar checking, date-time calculations, and text analysis. This server demonstrates how to organize multiple tools within a single server and how to handle more complex tool logic while maintaining proper MCP protocol compliance.

Let us start with the internet search tool. This tool will accept a search query and return relevant results from the web. To implement this, we will use a search API. For this example, we will use the DuckDuckGo search API because it does not require authentication, making it easy to set up.

from mcp.server.fastmcp import FastMCP
import aiohttp
import logging
from typing import Optional
from datetime import datetime, timedelta
from dateutil import parser as date_parser
from dateutil.relativedelta import relativedelta

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create the comprehensive utilities server
mcp = FastMCP("comprehensive-utilities")

@mcp.tool()
async def search_web(query: str, max_results: int = 5) -> str:
    """
    Search the web for information.
    
    Uses DuckDuckGo's API to search for information on the internet.
    Returns relevant results with summaries and URLs.
    
    Args:
        query: Search query string
        max_results: Maximum number of results to return (default: 5)
        
    Returns:
        Formatted search results with summaries and links
    """
    logger.info(f"Searching web for: {query}")
    
    try:
        url = "https://api.duckduckgo.com/"
        params = {
            "q": query,
            "format": "json",
            "no_html": 1,
            "skip_disambig": 1
        }
        
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params, timeout=10) as response:
                if response.status != 200:
                    return f"Error: Search failed with status {response.status}"
                
                data = await response.json()
                results = []
                
                if data.get("Abstract"):
                    results.append(f"Summary: {data['Abstract']}")
                    if data.get("AbstractURL"):
                        results.append(f"Source: {data['AbstractURL']}\n")
                
                if data.get("RelatedTopics"):
                    results.append("Related Information:")
                    count = 0
                    for topic in data["RelatedTopics"]:
                        if count >= max_results:
                            break
                        if isinstance(topic, dict) and "Text" in topic:
                            results.append(f"\n{count + 1}. {topic['Text']}")
                            if topic.get("FirstURL"):
                                results.append(f"   URL: {topic['FirstURL']}")
                            count += 1
                
                if not results:
                    return f"No results found for: {query}"
                
                return "\n".join(results)
                
    except Exception as e:
        logger.error(f"Search error: {str(e)}")
        return f"Error performing search: {str(e)}"

This search tool demonstrates several important patterns. First, it is defined as async because it performs network I/O operations. The FastMCP framework fully supports asynchronous tools and will await them properly when handling tools/call requests. Second, the tool includes comprehensive error handling. Network requests can fail for many reasons, and by catching exceptions and returning descriptive error messages, we ensure that the language model receives useful feedback. Third, the tool processes and formats the API response into a human-readable string that the language model can easily incorporate into its response.

Now let us add a grammar checking tool. This tool will analyze text and identify potential grammar, spelling, and style issues using the LanguageTool API.

@mcp.tool()
async def check_grammar(text: str, language: str = "en-US") -> str:
    """
    Check text for grammar and style issues.
    
    Analyzes text using LanguageTool API to identify grammar,
    spelling, and style problems with suggestions for improvement.
    
    Args:
        text: Text to analyze
        language: Language code (default: en-US)
        
    Returns:
        Formatted report of issues found with suggestions
    """
    logger.info(f"Checking grammar for {len(text)} characters")
    
    try:
        url = "https://api.languagetool.org/v2/check"
        data = {
            "text": text,
            "language": language
        }
        
        async with aiohttp.ClientSession() as session:
            async with session.post(url, data=data, timeout=15) as response:
                if response.status != 200:
                    return f"Error: Grammar check failed with status {response.status}"
                
                result = await response.json()
                matches = result.get("matches", [])
                
                if not matches:
                    return "No grammar or style issues detected!"
                
                issues = [f"Found {len(matches)} issue(s):\n"]
                
                for i, match in enumerate(matches[:10], 1):
                    category = match.get("rule", {}).get("category", {}).get("name", "Unknown")
                    message = match.get("message", "No description")
                    context = match.get("context", {})
                    
                    issues.append(f"\n{i}. {category}")
                    issues.append(f"   Issue: {message}")
                    
                    offset = context.get("offset", 0)
                    length = context.get("length", 0)
                    context_text = context.get("text", "")
                    
                    if context_text and offset >= 0 and length > 0:
                        problem = context_text[offset:offset+length]
                        issues.append(f"   Text: '{problem}'")
                    
                    replacements = match.get("replacements", [])
                    if replacements:
                        suggestions = [r.get("value") for r in replacements[:3]]
                        issues.append(f"   Suggestions: {', '.join(suggestions)}")
                
                if len(matches) > 10:
                    issues.append(f"\n... and {len(matches) - 10} more issues")
                
                return "\n".join(issues)
                
    except Exception as e:
        logger.error(f"Grammar check error: {str(e)}")
        return f"Error checking grammar: {str(e)}"

The grammar checking tool follows the same async pattern and error handling approach. It makes an HTTP POST request to the LanguageTool API, processes the response, and formats the results in a clear, structured way that helps the language model understand the issues and communicate them to the user.

Next, let us add a date and time calculation tool. This tool will handle various date-related queries, such as calculating the difference between dates, adding or subtracting time periods, or determining what day of the week a particular date falls on.

@mcp.tool()
async def calculate_date(
    operation: str,
    date1: str,
    date2: Optional[str] = None,
    amount: Optional[int] = None,
    unit: Optional[str] = None
) -> str:
    """
    Perform date and time calculations.
    
    Supports various date operations including calculating differences,
    adding or subtracting time periods, and determining day of week.
    
    Operations:
    - difference: Calculate time between two dates (requires date1 and date2)
    - add: Add a time period to a date (requires date1, amount, and unit)
    - subtract: Subtract a time period from a date (requires date1, amount, and unit)
    - day_of_week: Determine what day of the week a date is (requires date1)
    
    Args:
        operation: Operation type (difference, add, subtract, day_of_week)
        date1: First date (or only date for some operations)
        date2: Second date (for difference operation)
        amount: Number of units to add/subtract
        unit: Time unit (days, weeks, months, years)
        
    Returns:
        Result of the date calculation
    """
    logger.info(f"Date calculation: {operation}")
    
    try:
        dt1 = date_parser.parse(date1)
        
        if operation == "difference":
            if not date2:
                return "Error: 'difference' requires two dates"
            
            dt2 = date_parser.parse(date2)
            delta = abs((dt2 - dt1).days)
            
            years = delta // 365
            remaining = delta % 365
            months = remaining // 30
            days = remaining % 30
            
            parts = []
            if years > 0:
                parts.append(f"{years} year(s)")
            if months > 0:
                parts.append(f"{months} month(s)")
            if days > 0 or not parts:
                parts.append(f"{days} day(s)")
            
            return f"Difference between {date1} and {date2}: {', '.join(parts)} ({delta} total days)"
        
        elif operation in ["add", "subtract"]:
            if amount is None or unit is None:
                return f"Error: '{operation}' requires 'amount' and 'unit' parameters"
            
            multiplier = 1 if operation == "add" else -1
            
            if unit == "days":
                result = dt1 + timedelta(days=amount * multiplier)
            elif unit == "weeks":
                result = dt1 + timedelta(weeks=amount * multiplier)
            elif unit == "months":
                result = dt1 + relativedelta(months=amount * multiplier)
            elif unit == "years":
                result = dt1 + relativedelta(years=amount * multiplier)
            else:
                return f"Error: Unknown unit '{unit}'. Supported: days, weeks, months, years"
            
            action = "Adding" if operation == "add" else "Subtracting"
            return f"{action} {amount} {unit} to/from {date1} gives: {result.strftime('%Y-%m-%d %A')}"
        
        elif operation == "day_of_week":
            day_name = dt1.strftime('%A')
            date_formatted = dt1.strftime('%B %d, %Y')
            return f"{date_formatted} is a {day_name}"
        
        else:
            return f"Error: Unknown operation '{operation}'. Supported: difference, add, subtract, day_of_week"
            
    except Exception as e:
        logger.error(f"Date calculation error: {str(e)}")
        return f"Error in date calculation: {str(e)}"

The date-time tool showcases how to handle tools with complex parameter combinations. Different operations require different sets of parameters. The tool validates that the required parameters are present for each operation and returns clear error messages if they are missing. This validation is important because it helps the language model understand how to use the tool correctly.

Finally, let us add a text analysis tool that provides statistics about text.

@mcp.tool()
async def analyze_text(text: str) -> str:
    """
    Analyze text and provide detailed statistics.
    
    Provides comprehensive statistics about the text including word count,
    character count, sentence count, paragraph count, and readability metrics.
    
    Args:
        text: Text to analyze
        
    Returns:
        Formatted analysis report with various text metrics
    """
    logger.info(f"Analyzing text of length {len(text)}")
    
    try:
        # Basic counts
        char_count = len(text)
        char_no_spaces = len(text.replace(" ", ""))
        word_count = len(text.split())
        
        # Sentence count (simple approximation)
        sentence_endings = text.count('.') + text.count('!') + text.count('?')
        sentence_count = max(1, sentence_endings)
        
        # Paragraph count
        paragraph_count = len([p for p in text.split('\n\n') if p.strip()])
        
        # Average metrics
        avg_word_length = char_no_spaces / word_count if word_count > 0 else 0
        avg_sentence_length = word_count / sentence_count if sentence_count > 0 else 0
        
        # Build report
        report = [
            "Text Analysis Report:",
            f"\nBasic Statistics:",
            f"  Characters (with spaces): {char_count}",
            f"  Characters (no spaces): {char_no_spaces}",
            f"  Words: {word_count}",
            f"  Sentences: {sentence_count}",
            f"  Paragraphs: {paragraph_count}",
            f"\nAverages:",
            f"  Average word length: {avg_word_length:.1f} characters",
            f"  Average sentence length: {avg_sentence_length:.1f} words",
        ]
        
        # Reading time estimate (average reading speed: 200 words/minute)
        reading_time_minutes = word_count / 200
        if reading_time_minutes < 1:
            reading_time = f"{int(reading_time_minutes * 60)} seconds"
        else:
            reading_time = f"{reading_time_minutes:.1f} minutes"
        
        report.append(f"\nEstimated reading time: {reading_time}")
        
        return "\n".join(report)
        
    except Exception as e:
        logger.error(f"Text analysis error: {str(e)}")
        return f"Error analyzing text: {str(e)}"

if __name__ == "__main__":
    mcp.run(transport="stdio")

This comprehensive utilities server now provides five different tools, all accessible through the standard MCP protocol. When a client connects and sends a tools/list request, it will receive JSON Schema descriptions for all five tools. The FastMCP framework handles all the protocol details, allowing us to focus on implementing the actual tool logic.

IMPLEMENTING AN MCP CLIENT

Now that we have built MCP servers, we need a client that can connect to these servers and allow a language model to use their tools. The client is responsible for managing connections to one or more MCP servers, discovering available tools through the tools/list JSON-RPC method, and facilitating communication between the language model and the servers through tools/call requests.

We will build a client that uses a local language model through the Hugging Face transformers library. This client will support GPU acceleration on both NVIDIA CUDA and Apple Metal Performance Shaders. The client will connect to our MCP servers, allow the model to discover and invoke tools, and handle the complete request-response cycle while properly implementing the MCP protocol.

import asyncio
import json
import logging
from typing import List, Dict, Any, Optional
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from mcp import ClientSession
from mcp.client.stdio import stdio_client, StdioServerParameters
import sys

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class MCPClient:
    """
    A client for interacting with MCP servers using a local language model.
    
    This client properly implements the MCP protocol, managing connections
    to MCP servers, discovering available tools via JSON-RPC, and orchestrating
    interactions between the language model and the tools.
    """
    
    def __init__(self, model_name: str = "microsoft/Phi-3-mini-4k-instruct"):
        """
        Initialize the MCP client with a language model.
        
        Args:
            model_name: HuggingFace model identifier
        """
        self.model_name = model_name
        self.sessions: Dict[str, ClientSession] = {}
        self.tools: Dict[str, Dict] = {}
        
        # Determine the best available device
        if torch.cuda.is_available():
            self.device = "cuda"
            logger.info("Using NVIDIA CUDA for acceleration")
        elif torch.backends.mps.is_available():
            self.device = "mps"
            logger.info("Using Apple Metal Performance Shaders for acceleration")
        else:
            self.device = "cpu"
            logger.info("Using CPU (no GPU acceleration available)")
        
        # Load the model and tokenizer
        logger.info(f"Loading model {model_name}...")
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            trust_remote_code=True
        )
        
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if self.device != "cpu" else torch.float32,
            device_map="auto" if self.device == "cuda" else None,
            trust_remote_code=True
        )
        
        if self.device == "mps":
            self.model = self.model.to(self.device)
        
        # Create a text generation pipeline
        self.generator = pipeline(
            "text-generation",
            model=self.model,
            tokenizer=self.tokenizer,
            device=0 if self.device == "cuda" else -1,
            max_new_tokens=1024,
            temperature=0.7,
            do_sample=True,
            pad_token_id=self.tokenizer.eos_token_id
        )
        
        logger.info("Model loaded successfully")
    
    async def connect_to_server(
        self,
        server_name: str,
        command: str,
        args: List[str] = None
    ):
        """
        Connect to an MCP server using stdio transport.
        
        This method properly implements the MCP initialization handshake:
        1. Launches the server as a subprocess
        2. Sends an 'initialize' request
        3. Receives the server's capabilities
        4. Sends an 'initialized' notification
        
        Args:
            server_name: Identifier for this server connection
            command: Command to launch the server
            args: Command-line arguments for the server
        """
        if args is None:
            args = []
        
        logger.info(f"Connecting to MCP server: {server_name}")
        
        try:
            # Create server parameters for stdio transport
            server_params = StdioServerParameters(
                command=command,
                args=args,
                env=None
            )
            
            # Establish stdio connection
            stdio_transport = await stdio_client(server_params)
            read, write = stdio_transport
            
            # Create a client session
            # This handles the initialize/initialized handshake automatically
            session = ClientSession(read, write)
            await session.initialize()
            
            self.sessions[server_name] = session
            
            # Discover tools from this server using tools/list
            tools_result = await session.list_tools()
            
            for tool in tools_result.tools:
                tool_key = f"{server_name}::{tool.name}"
                self.tools[tool_key] = {
                    "server": server_name,
                    "name": tool.name,
                    "description": tool.description,
                    "schema": tool.inputSchema
                }
            
            logger.info(f"Connected to {server_name}, discovered {len(tools_result.tools)} tools")
            
        except Exception as e:
            logger.error(f"Error connecting to server: {str(e)}")
            raise
    
    def format_tools_for_prompt(self) -> str:
        """
        Format available tools as a string for inclusion in the LLM prompt.
        
        Returns:
            Formatted string describing all available tools with their schemas
        """
        if not self.tools:
            return "No tools available."
        
        tool_descriptions = ["Available tools:\n"]
        
        for tool_key, tool_info in self.tools.items():
            tool_descriptions.append(f"\nTool: {tool_info['name']}")
            tool_descriptions.append(f"Description: {tool_info['description']}")
            
            # Add parameter information from JSON Schema
            schema = tool_info['schema']
            if 'properties' in schema:
                tool_descriptions.append("Parameters:")
                for param_name, param_info in schema['properties'].items():
                    param_type = param_info.get('type', 'unknown')
                    param_desc = param_info.get('description', 'No description')
                    required = param_name in schema.get('required', [])
                    req_marker = " (REQUIRED)" if required else " (optional)"
                    tool_descriptions.append(f"  - {param_name} ({param_type}){req_marker}: {param_desc}")
        
        tool_descriptions.append("\nTo use a tool, respond with:")
        tool_descriptions.append("TOOL_CALL: tool_name")
        tool_descriptions.append("ARGUMENTS: {\"param1\": value1, \"param2\": value2}")
        
        return "\n".join(tool_descriptions)
    
    async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
        """
        Call a tool on the appropriate MCP server using tools/call.
        
        This sends a proper JSON-RPC tools/call request to the server.
        
        Args:
            tool_name: Name of the tool to call
            arguments: Dictionary of arguments for the tool
            
        Returns:
            The tool's response as a string
        """
        # Find the tool
        matching_tools = [k for k in self.tools.keys() if k.endswith(f"::{tool_name}")]
        
        if not matching_tools:
            return f"Error: Tool '{tool_name}' not found"
        
        tool_key = matching_tools[0]
        tool_info = self.tools[tool_key]
        server_name = tool_info['server']
        
        session = self.sessions.get(server_name)
        if not session:
            return f"Error: Not connected to server for tool '{tool_name}'"
        
        logger.info(f"Calling tool {tool_name} with arguments: {arguments}")
        
        try:
            # Send tools/call request via the session
            result = await session.call_tool(tool_name, arguments)
            
            # Extract the text content from the result
            if hasattr(result, 'content') and result.content:
                # MCP tool results contain content items
                content_items = []
                for item in result.content:
                    if hasattr(item, 'text'):
                        content_items.append(item.text)
                return "\n".join(content_items)
            else:
                return str(result)
                
        except Exception as e:
            logger.error(f"Error calling tool: {str(e)}")
            return f"Error calling tool: {str(e)}"
    
    def parse_tool_call(self, response: str) -> Optional[Dict[str, Any]]:
        """
        Parse a tool call from the model's response.
        
        This looks for a specific format in the response that indicates
        the model wants to call a tool.
        
        Args:
            response: The model's text response
            
        Returns:
            Dictionary with tool name and arguments, or None if no tool call found
        """
        lines = response.split('\n')
        tool_name = None
        arguments = None
        
        for line in lines:
            if line.startswith("TOOL_CALL:"):
                tool_name = line.replace("TOOL_CALL:", "").strip()
            elif line.startswith("ARGUMENTS:"):
                args_str = line.replace("ARGUMENTS:", "").strip()
                try:
                    arguments = json.loads(args_str)
                except json.JSONDecodeError:
                    logger.error(f"Failed to parse arguments: {args_str}")
                    return None
        
        if tool_name and arguments is not None:
            return {"tool": tool_name, "arguments": arguments}
        
        return None
    
    async def chat(self, user_message: str, max_iterations: int = 5) -> str:
        """
        Process a user message, potentially using tools to answer.
        
        This method implements an agentic loop where the model can decide
        to use tools, see their results, and continue reasoning.
        
        Args:
            user_message: The user's question or request
            max_iterations: Maximum number of tool-calling iterations
            
        Returns:
            The final response to the user
        """
        conversation_history = []
        tools_description = self.format_tools_for_prompt()
        
        # Initial system message
        system_message = f"""You are a helpful AI assistant with access to various tools.
When you need to use a tool, respond with:
TOOL_CALL: tool_name
ARGUMENTS: {{"param1": "value1", "param2": "value2"}}

After seeing tool results, continue your reasoning and provide a final answer.

{tools_description}

Remember to use tools when needed to provide accurate, up-to-date information."""
        
        conversation_history.append(f"System: {system_message}")
        conversation_history.append(f"User: {user_message}")
        
        for iteration in range(max_iterations):
            # Generate response from model
            prompt = "\n\n".join(conversation_history) + "\n\nAssistant:"
            
            try:
                response = self.generator(
                    prompt,
                    max_new_tokens=512,
                    num_return_sequences=1,
                    pad_token_id=self.tokenizer.eos_token_id
                )[0]['generated_text']
                
                # Extract just the new part of the response
                assistant_response = response[len(prompt):].strip()
                conversation_history.append(f"Assistant: {assistant_response}")
                
                # Check if the model wants to call a tool
                tool_call = self.parse_tool_call(assistant_response)
                
                if tool_call:
                    # Execute the tool via MCP tools/call
                    tool_result = await self.call_tool(
                        tool_call['tool'],
                        tool_call['arguments']
                    )
                    
                    conversation_history.append(f"Tool Result: {tool_result}")
                    
                    # Continue the loop to let the model process the result
                    continue
                else:
                    # No tool call, this is the final response
                    return assistant_response
                    
            except Exception as e:
                logger.error(f"Error during chat: {str(e)}")
                return f"I encountered an error: {str(e)}"
        
        # If we hit max iterations, return the last response
        return assistant_response
    
    async def close(self):
        """Close all MCP server connections properly."""
        for name, session in self.sessions.items():
            try:
                await session.close()
                logger.info(f"Closed connection to {name}")
            except Exception as e:
                logger.error(f"Error closing {name}: {str(e)}")

This MCP client implementation properly follows the MCP protocol. It uses the official MCP Python SDK's ClientSession class, which handles all the JSON-RPC 2.0 message formatting and protocol details. When connecting to a server, the session automatically performs the initialize handshake. When discovering tools, it uses the session's list_tools method, which sends a proper tools/list JSON-RPC request. When calling tools, it uses the session's call_tool method, which sends a proper tools/call JSON-RPC request and parses the response.

The client manages multiple server connections simultaneously, each providing different sets of tools. When the user asks a question, the client constructs a prompt that includes descriptions of all available tools, allowing the model to understand what capabilities it has access to. The chat method implements an agentic loop where the model can iteratively use tools to gather information before formulating its final answer.

COMPLETE PRODUCTION-READY IMPLEMENTATION

Now let us bring everything together into a complete, production-ready example that demonstrates the full MCP protocol implementation with actual running servers and clients. This example includes three separate Python files that work together to create a fully functional MCP system.

File 1: mcp_server.py - The MCP Server

#!/usr/bin/env python3
"""
MCP Server Implementation
A production-ready MCP server providing multiple utility tools
"""

import asyncio
import logging
from typing import Optional
from datetime import datetime, timedelta
from mcp.server.fastmcp import FastMCP
import aiohttp
from dateutil import parser as date_parser
from dateutil.relativedelta import relativedelta

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),  # Console output
        logging.FileHandler('mcp_server.log')  # File output
    ]
)
logger = logging.getLogger(__name__)

# Create the MCP server
mcp = FastMCP("comprehensive-utilities-server")


@mcp.tool()
def calculate(operation: str, a: float, b: float) -> str:
    """
    Perform basic arithmetic operations.
    
    Supports addition, subtraction, multiplication, and division
    of two numbers with proper error handling.
    
    Args:
        operation: The operation to perform (add, subtract, multiply, divide)
        a: First number
        b: Second number
        
    Returns:
        String containing the calculation result or error message
    """
    logger.info(f"Tool called: calculate({operation}, {a}, {b})")
    
    operations = {
        "add": lambda x, y: x + y,
        "subtract": lambda x, y: x - y,
        "multiply": lambda x, y: x * y,
        "divide": lambda x, y: x / y if y != 0 else None
    }
    
    if operation not in operations:
        error_msg = f"Error: Unknown operation '{operation}'. Supported: {', '.join(operations.keys())}"
        logger.warning(error_msg)
        return error_msg
    
    if operation == "divide" and b == 0:
        error_msg = "Error: Division by zero is not allowed"
        logger.warning(error_msg)
        return error_msg
    
    result = operations[operation](a, b)
    result_msg = f"The result of {a} {operation} {b} is {result}"
    logger.info(f"Calculation result: {result}")
    return result_msg


@mcp.tool()
async def search_web(query: str, max_results: int = 5) -> str:
    """
    Search the web for information using DuckDuckGo.
    
    Performs a web search and returns relevant results with summaries
    and URLs. Useful for finding current information or facts.
    
    Args:
        query: Search query string
        max_results: Maximum number of results to return (default: 5)
        
    Returns:
        Formatted search results with summaries and links
    """
    logger.info(f"Tool called: search_web(query='{query}', max_results={max_results})")
    
    try:
        url = "https://api.duckduckgo.com/"
        params = {
            "q": query,
            "format": "json",
            "no_html": 1,
            "skip_disambig": 1
        }
        
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params, timeout=10) as response:
                if response.status != 200:
                    error_msg = f"Error: Search failed with status {response.status}"
                    logger.error(error_msg)
                    return error_msg
                
                data = await response.json()
                results = []
                
                if data.get("Abstract"):
                    results.append(f"Summary: {data['Abstract']}")
                    if data.get("AbstractURL"):
                        results.append(f"Source: {data['AbstractURL']}\n")
                
                if data.get("RelatedTopics"):
                    results.append("Related Information:")
                    count = 0
                    for topic in data["RelatedTopics"]:
                        if count >= max_results:
                            break
                        if isinstance(topic, dict) and "Text" in topic:
                            results.append(f"\n{count + 1}. {topic['Text']}")
                            if topic.get("FirstURL"):
                                results.append(f"   URL: {topic['FirstURL']}")
                            count += 1
                
                if not results:
                    no_results_msg = f"No results found for: {query}"
                    logger.info(no_results_msg)
                    return no_results_msg
                
                result_text = "\n".join(results)
                logger.info(f"Search completed successfully, {len(results)} result items")
                return result_text
                
    except asyncio.TimeoutError:
        error_msg = "Error: Search request timed out"
        logger.error(error_msg)
        return error_msg
    except Exception as e:
        error_msg = f"Error performing search: {str(e)}"
        logger.error(error_msg)
        return error_msg


@mcp.tool()
async def check_grammar(text: str, language: str = "en-US") -> str:
    """
    Check text for grammar and style issues using LanguageTool.
    
    Analyzes text to identify grammar, spelling, and style problems
    with suggestions for improvement.
    
    Args:
        text: Text to analyze
        language: Language code (default: en-US)
        
    Returns:
        Formatted report of issues found with suggestions
    """
    logger.info(f"Tool called: check_grammar(text_length={len(text)}, language={language})")
    
    try:
        url = "https://api.languagetool.org/v2/check"
        data = {
            "text": text,
            "language": language
        }
        
        async with aiohttp.ClientSession() as session:
            async with session.post(url, data=data, timeout=15) as response:
                if response.status != 200:
                    error_msg = f"Error: Grammar check failed with status {response.status}"
                    logger.error(error_msg)
                    return error_msg
                
                result = await response.json()
                matches = result.get("matches", [])
                
                if not matches:
                    success_msg = "No grammar or style issues detected!"
                    logger.info(success_msg)
                    return success_msg
                
                issues = [f"Found {len(matches)} issue(s):\n"]
                
                for i, match in enumerate(matches[:10], 1):
                    category = match.get("rule", {}).get("category", {}).get("name", "Unknown")
                    message = match.get("message", "No description")
                    context = match.get("context", {})
                    
                    issues.append(f"\n{i}. {category}")
                    issues.append(f"   Issue: {message}")
                    
                    offset = context.get("offset", 0)
                    length = context.get("length", 0)
                    context_text = context.get("text", "")
                    
                    if context_text and offset >= 0 and length > 0:
                        problem = context_text[offset:offset+length]
                        issues.append(f"   Text: '{problem}'")
                    
                    replacements = match.get("replacements", [])
                    if replacements:
                        suggestions = [r.get("value") for r in replacements[:3]]
                        issues.append(f"   Suggestions: {', '.join(suggestions)}")
                
                if len(matches) > 10:
                    issues.append(f"\n... and {len(matches) - 10} more issues")
                
                result_text = "\n".join(issues)
                logger.info(f"Grammar check completed, found {len(matches)} issues")
                return result_text
                
    except asyncio.TimeoutError:
        error_msg = "Error: Grammar check timed out"
        logger.error(error_msg)
        return error_msg
    except Exception as e:
        error_msg = f"Error checking grammar: {str(e)}"
        logger.error(error_msg)
        return error_msg


@mcp.tool()
def calculate_date(
    operation: str,
    date1: str,
    date2: Optional[str] = None,
    amount: Optional[int] = None,
    unit: Optional[str] = None
) -> str:
    """
    Perform date and time calculations.
    
    Supports calculating differences between dates, adding or subtracting
    time periods, and determining day of week.
    
    Operations:
    - difference: Calculate time between two dates (requires date1 and date2)
    - add: Add a time period to a date (requires date1, amount, and unit)
    - subtract: Subtract a time period from a date (requires date1, amount, and unit)
    - day_of_week: Determine what day of the week a date is (requires date1)
    
    Args:
        operation: Operation type (difference, add, subtract, day_of_week)
        date1: First date (or only date for some operations)
        date2: Second date (for difference operation)
        amount: Number of units to add/subtract
        unit: Time unit (days, weeks, months, years)
        
    Returns:
        Result of the date calculation
    """
    logger.info(f"Tool called: calculate_date(operation={operation}, date1={date1})")
    
    try:
        dt1 = date_parser.parse(date1)
        
        if operation == "difference":
            if not date2:
                error_msg = "Error: 'difference' requires two dates"
                logger.warning(error_msg)
                return error_msg
            
            dt2 = date_parser.parse(date2)
            delta = abs((dt2 - dt1).days)
            
            years = delta // 365
            remaining = delta % 365
            months = remaining // 30
            days = remaining % 30
            
            parts = []
            if years > 0:
                parts.append(f"{years} year(s)")
            if months > 0:
                parts.append(f"{months} month(s)")
            if days > 0 or not parts:
                parts.append(f"{days} day(s)")
            
            result_msg = f"Difference between {date1} and {date2}: {', '.join(parts)} ({delta} total days)"
            logger.info(f"Date difference calculated: {delta} days")
            return result_msg
        
        elif operation in ["add", "subtract"]:
            if amount is None or unit is None:
                error_msg = f"Error: '{operation}' requires 'amount' and 'unit' parameters"
                logger.warning(error_msg)
                return error_msg
            
            multiplier = 1 if operation == "add" else -1
            
            if unit == "days":
                result = dt1 + timedelta(days=amount * multiplier)
            elif unit == "weeks":
                result = dt1 + timedelta(weeks=amount * multiplier)
            elif unit == "months":
                result = dt1 + relativedelta(months=amount * multiplier)
            elif unit == "years":
                result = dt1 + relativedelta(years=amount * multiplier)
            else:
                error_msg = f"Error: Unknown unit '{unit}'. Supported: days, weeks, months, years"
                logger.warning(error_msg)
                return error_msg
            
            action = "Adding" if operation == "add" else "Subtracting"
            result_msg = f"{action} {amount} {unit} to/from {date1} gives: {result.strftime('%Y-%m-%d %A')}"
            logger.info(f"Date calculation completed: {result_msg}")
            return result_msg
        
        elif operation == "day_of_week":
            day_name = dt1.strftime('%A')
            date_formatted = dt1.strftime('%B %d, %Y')
            result_msg = f"{date_formatted} is a {day_name}"
            logger.info(f"Day of week determined: {day_name}")
            return result_msg
        
        else:
            error_msg = f"Error: Unknown operation '{operation}'. Supported: difference, add, subtract, day_of_week"
            logger.warning(error_msg)
            return error_msg
            
    except Exception as e:
        error_msg = f"Error in date calculation: {str(e)}"
        logger.error(error_msg)
        return error_msg


@mcp.tool()
def analyze_text(text: str) -> str:
    """
    Analyze text and provide detailed statistics.
    
    Provides comprehensive statistics including word count, character count,
    sentence count, paragraph count, averages, and reading time estimate.
    
    Args:
        text: Text to analyze
        
    Returns:
        Formatted analysis report with various text metrics
    """
    logger.info(f"Tool called: analyze_text(text_length={len(text)})")
    
    try:
        # Basic counts
        char_count = len(text)
        char_no_spaces = len(text.replace(" ", ""))
        word_count = len(text.split())
        
        # Sentence count (simple approximation)
        sentence_endings = text.count('.') + text.count('!') + text.count('?')
        sentence_count = max(1, sentence_endings)
        
        # Paragraph count
        paragraph_count = len([p for p in text.split('\n\n') if p.strip()])
        
        # Average metrics
        avg_word_length = char_no_spaces / word_count if word_count > 0 else 0
        avg_sentence_length = word_count / sentence_count if sentence_count > 0 else 0
        
        # Build report
        report = [
            "Text Analysis Report:",
            f"\nBasic Statistics:",
            f"  Characters (with spaces): {char_count}",
            f"  Characters (no spaces): {char_no_spaces}",
            f"  Words: {word_count}",
            f"  Sentences: {sentence_count}",
            f"  Paragraphs: {paragraph_count}",
            f"\nAverages:",
            f"  Average word length: {avg_word_length:.1f} characters",
            f"  Average sentence length: {avg_sentence_length:.1f} words",
        ]
        
        # Reading time estimate (average reading speed: 200 words/minute)
        reading_time_minutes = word_count / 200
        if reading_time_minutes < 1:
            reading_time = f"{int(reading_time_minutes * 60)} seconds"
        else:
            reading_time = f"{reading_time_minutes:.1f} minutes"
        
        report.append(f"\nEstimated reading time: {reading_time}")
        
        result_text = "\n".join(report)
        logger.info(f"Text analysis completed: {word_count} words, {sentence_count} sentences")
        return result_text
        
    except Exception as e:
        error_msg = f"Error analyzing text: {str(e)}"
        logger.error(error_msg)
        return error_msg


def main():
    """Main entry point for the MCP server."""
    logger.info("=" * 70)
    logger.info("Starting MCP Server: comprehensive-utilities-server")
    logger.info("Transport: stdio (JSON-RPC 2.0)")
    logger.info("=" * 70)
    logger.info("Available tools:")
    logger.info("  - calculate: Basic arithmetic operations")
    logger.info("  - search_web: Web search using DuckDuckGo")
    logger.info("  - check_grammar: Grammar and style checking")
    logger.info("  - calculate_date: Date and time calculations")
    logger.info("  - analyze_text: Text statistics and analysis")
    logger.info("=" * 70)
    logger.info("Server is ready to accept connections...")
    
    # Run the server with stdio transport
    mcp.run(transport="stdio")


if __name__ == "__main__":
    main()

File 2: mcp_client.py - The MCP Client

#!/usr/bin/env python3
"""
MCP Client Implementation
A production-ready client that connects to MCP servers and uses LLM with tools
"""

import asyncio
import json
import logging
import sys
from typing import List, Dict, Any, Optional
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from mcp import ClientSession
from mcp.client.stdio import stdio_client, StdioServerParameters

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('mcp_client.log')
    ]
)
logger = logging.getLogger(__name__)


class MCPClient:
    """
    Production-ready MCP client with proper protocol implementation.
    
    Manages connections to MCP servers, discovers tools, and orchestrates
    interactions between a language model and the available tools.
    """
    
    def __init__(self, model_name: str = "microsoft/Phi-3-mini-4k-instruct", use_gpu: bool = True):
        """
        Initialize the MCP client with a language model.
        
        Args:
            model_name: HuggingFace model identifier
            use_gpu: Whether to attempt GPU acceleration
        """
        self.model_name = model_name
        self.sessions: Dict[str, ClientSession] = {}
        self.tools: Dict[str, Dict] = {}
        self.use_gpu = use_gpu
        
        # Determine device
        if use_gpu:
            if torch.cuda.is_available():
                self.device = "cuda"
                logger.info("GPU acceleration: NVIDIA CUDA")
            elif torch.backends.mps.is_available():
                self.device = "mps"
                logger.info("GPU acceleration: Apple Metal Performance Shaders")
            else:
                self.device = "cpu"
                logger.info("GPU not available, using CPU")
        else:
            self.device = "cpu"
            logger.info("GPU disabled, using CPU")
        
        # Load model
        self._load_model()
    
    def _load_model(self):
        """Load the language model and tokenizer."""
        logger.info(f"Loading model: {self.model_name}")
        
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_name,
                trust_remote_code=True
            )
            
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token
            
            dtype = torch.float16 if self.device != "cpu" else torch.float32
            
            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_name,
                torch_dtype=dtype,
                device_map="auto" if self.device == "cuda" else None,
                trust_remote_code=True,
                low_cpu_mem_usage=True
            )
            
            if self.device == "mps":
                self.model = self.model.to(self.device)
            
            self.generator = pipeline(
                "text-generation",
                model=self.model,
                tokenizer=self.tokenizer,
                device=0 if self.device == "cuda" else -1,
                max_new_tokens=512,
                temperature=0.7,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )
            
            logger.info("Model loaded successfully")
            
        except Exception as e:
            logger.error(f"Error loading model: {str(e)}")
            raise
    
    async def connect_to_server(
        self,
        server_name: str,
        command: str,
        args: List[str] = None
    ):
        """
        Connect to an MCP server using stdio transport.
        
        Implements the full MCP initialization handshake and tool discovery.
        
        Args:
            server_name: Identifier for this server
            command: Command to launch the server
            args: Command-line arguments for the server
        """
        if args is None:
            args = []
        
        logger.info(f"Connecting to MCP server: {server_name}")
        logger.info(f"  Command: {command} {' '.join(args)}")
        
        try:
            # Create server parameters
            server_params = StdioServerParameters(
                command=command,
                args=args,
                env=None
            )
            
            # Establish stdio connection
            logger.info("  Establishing stdio transport...")
            stdio_transport = await stdio_client(server_params)
            read, write = stdio_transport
            
            # Create session (handles initialize/initialized handshake)
            logger.info("  Performing MCP handshake...")
            session = ClientSession(read, write)
            await session.initialize()
            
            self.sessions[server_name] = session
            logger.info("  Handshake completed successfully")
            
            # Discover tools via tools/list
            logger.info("  Discovering available tools...")
            tools_result = await session.list_tools()
            
            tool_count = 0
            for tool in tools_result.tools:
                tool_key = f"{server_name}::{tool.name}"
                self.tools[tool_key] = {
                    "server": server_name,
                    "name": tool.name,
                    "description": tool.description,
                    "schema": tool.inputSchema
                }
                logger.info(f"    - {tool.name}: {tool.description[:60]}...")
                tool_count += 1
            
            logger.info(f"Connected to {server_name}: {tool_count} tools available")
            
        except Exception as e:
            logger.error(f"Error connecting to server {server_name}: {str(e)}")
            raise
    
    def format_tools_for_prompt(self) -> str:
        """Format available tools for the LLM prompt."""
        if not self.tools:
            return "No tools available."
        
        descriptions = ["Available tools:\n"]
        
        for tool_key, tool_info in self.tools.items():
            descriptions.append(f"\nTool: {tool_info['name']}")
            descriptions.append(f"Description: {tool_info['description']}")
            
            schema = tool_info.get('schema', {})
            properties = schema.get('properties', {})
            required = schema.get('required', [])
            
            if properties:
                descriptions.append("Parameters:")
                for param_name, param_info in properties.items():
                    param_type = param_info.get('type', 'unknown')
                    param_desc = param_info.get('description', '')
                    is_required = param_name in required
                    req_text = " (REQUIRED)" if is_required else " (optional)"
                    descriptions.append(f"  - {param_name} ({param_type}){req_text}: {param_desc}")
        
        descriptions.append("\nTo use a tool, respond with:")
        descriptions.append("TOOL_CALL: tool_name")
        descriptions.append("ARGUMENTS: {\"param1\": \"value1\", \"param2\": value2}")
        
        return "\n".join(descriptions)
    
    async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
        """
        Execute a tool via MCP tools/call.
        
        Args:
            tool_name: Name of the tool
            arguments: Tool parameters
            
        Returns:
            Tool execution result
        """
        # Find the tool
        matching = [k for k in self.tools.keys() if k.endswith(f"::{tool_name}")]
        
        if not matching:
            error_msg = f"Error: Tool '{tool_name}' not found"
            logger.warning(error_msg)
            return error_msg
        
        tool_key = matching[0]
        tool_info = self.tools[tool_key]
        server_name = tool_info['server']
        
        session = self.sessions.get(server_name)
        if not session:
            error_msg = f"Error: Server for tool '{tool_name}' not connected"
            logger.error(error_msg)
            return error_msg
        
        logger.info(f"Calling tool: {tool_name}")
        logger.info(f"  Arguments: {arguments}")
        
        try:
            # Execute via JSON-RPC tools/call
            result = await session.call_tool(tool_name, arguments)
            
            # Extract text content
            if hasattr(result, 'content') and result.content:
                content_items = []
                for item in result.content:
                    if hasattr(item, 'text'):
                        content_items.append(item.text)
                result_text = "\n".join(content_items)
                logger.info(f"  Tool result: {result_text[:100]}...")
                return result_text
            
            result_text = str(result)
            logger.info(f"  Tool result: {result_text[:100]}...")
            return result_text
            
        except Exception as e:
            error_msg = f"Error executing tool: {str(e)}"
            logger.error(error_msg)
            return error_msg
    
    def parse_tool_call(self, response: str) -> Optional[Dict[str, Any]]:
        """Parse tool call from model response."""
        lines = response.split('\n')
        tool_name = None
        arguments = None
        
        for line in lines:
            if line.startswith("TOOL_CALL:"):
                tool_name = line.replace("TOOL_CALL:", "").strip()
            elif line.startswith("ARGUMENTS:"):
                args_str = line.replace("ARGUMENTS:", "").strip()
                try:
                    arguments = json.loads(args_str)
                except json.JSONDecodeError:
                    logger.error(f"Failed to parse arguments: {args_str}")
                    return None
        
        if tool_name and arguments is not None:
            return {"tool": tool_name, "arguments": arguments}
        
        return None
    
    async def chat(self, user_message: str, max_iterations: int = 5) -> str:
        """
        Process a user query with agentic tool usage.
        
        Args:
            user_message: User's question
            max_iterations: Maximum tool-calling iterations
            
        Returns:
            Final response
        """
        logger.info(f"Processing user query: {user_message}")
        
        tools_desc = self.format_tools_for_prompt()
        
        system_prompt = f"""You are a helpful AI assistant with access to various tools.
When you need information or to perform an action, use the available tools.

{tools_desc}

Think step by step. If you use a tool, wait for its result before continuing.
Provide clear, helpful responses."""
        
        messages = [
            f"System: {system_prompt}",
            f"User: {user_message}"
        ]
        
        for iteration in range(max_iterations):
            logger.info(f"Iteration {iteration + 1}/{max_iterations}")
            
            # Generate response
            prompt = "\n\n".join(messages) + "\n\nAssistant:"
            
            try:
                generated = self.generator(
                    prompt,
                    max_new_tokens=512,
                    num_return_sequences=1,
                    pad_token_id=self.tokenizer.pad_token_id
                )[0]['generated_text']
                
                # Extract new content
                assistant_response = generated[len(prompt):].strip()
                messages.append(f"Assistant: {assistant_response}")
                logger.info(f"Model response: {assistant_response[:100]}...")
                
                # Check for tool usage
                tool_call = self.parse_tool_call(assistant_response)
                
                if tool_call:
                    logger.info(f"Model wants to use tool: {tool_call['tool']}")
                    
                    # Execute tool
                    result = await self.call_tool(
                        tool_call['tool'],
                        tool_call['arguments']
                    )
                    
                    messages.append(f"Tool Result: {result}")
                    logger.info("Tool result added to context, continuing...")
                    continue
                else:
                    # No tool call, return response
                    logger.info("No tool call detected, returning final response")
                    return assistant_response
                    
            except Exception as e:
                error_msg = f"Error during chat: {str(e)}"
                logger.error(error_msg)
                return f"I encountered an error: {str(e)}"
        
        # Max iterations reached
        logger.warning("Max iterations reached")
        return messages[-1].replace("Assistant: ", "")
    
    async def close(self):
        """Close all MCP server connections."""
        logger.info("Closing all server connections...")
        for name, session in self.sessions.items():
            try:
                await session.close()
                logger.info(f"  Closed: {name}")
            except Exception as e:
                logger.error(f"  Error closing {name}: {str(e)}")


async def main():
    """Main entry point for the client."""
    logger.info("=" * 70)
    logger.info("MCP Client - Production Example")
    logger.info("=" * 70)
    
    # Create client
    client = MCPClient(use_gpu=True)
    
    try:
        # Connect to the MCP server
        logger.info("\nConnecting to MCP servers...")
        await client.connect_to_server(
            server_name="utilities",
            command=sys.executable,  # Use the same Python interpreter
            args=["mcp_server.py"]
        )
        
        logger.info("\n" + "=" * 70)
        logger.info("Ready to process queries!")
        logger.info("=" * 70)
        
        # Example queries
        queries = [
            "What is 15 multiplied by 7?",
            "Calculate the difference between January 1, 2024 and December 31, 2024",
            "Analyze this text: The quick brown fox jumps over the lazy dog."
        ]
        
        for i, query in enumerate(queries, 1):
            logger.info(f"\n{'=' * 70}")
            logger.info(f"Query {i}: {query}")
            logger.info("=" * 70)
            
            response = await client.chat(query)
            
            print(f"\n{'=' * 70}")
            print(f"QUERY: {query}")
            print(f"{'=' * 70}")
            print(f"RESPONSE: {response}")
            print(f"{'=' * 70}\n")
        
    except KeyboardInterrupt:
        logger.info("\nInterrupted by user")
    except Exception as e:
        logger.error(f"Error in main: {str(e)}")
    finally:
        await client.close()
        logger.info("Client shutdown complete")


if __name__ == "__main__":
    asyncio.run(main())

File 3: run_demo.py - Complete Demo Runner

#!/usr/bin/env python3
"""
Complete MCP Demo Runner
Demonstrates the full MCP protocol with actual server and client processes
"""

import asyncio
import logging
import sys
from pathlib import Path

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


async def run_interactive_demo():
    """Run an interactive demo with the MCP client."""
    # Import here to avoid circular imports
    from mcp_client import MCPClient
    
    logger.info("=" * 70)
    logger.info("MCP INTERACTIVE DEMO")
    logger.info("=" * 70)
    logger.info("This demo connects to a real MCP server process")
    logger.info("and allows you to interact with it using a language model.")
    logger.info("=" * 70)
    
    # Create client
    client = MCPClient(use_gpu=True)
    
    try:
        # Connect to server
        logger.info("\nConnecting to MCP server...")
        await client.connect_to_server(
            server_name="utilities",
            command=sys.executable,
            args=["mcp_server.py"]
        )
        
        logger.info("\n" + "=" * 70)
        logger.info("Connected! You can now ask questions.")
        logger.info("Type 'quit' or 'exit' to stop.")
        logger.info("=" * 70 + "\n")
        
        # Interactive loop
        while True:
            try:
                user_input = input("You: ").strip()
                
                if user_input.lower() in ['quit', 'exit', 'q']:
                    logger.info("Exiting...")
                    break
                
                if not user_input:
                    continue
                
                print("\nProcessing...\n")
                response = await client.chat(user_input)
                print(f"Assistant: {response}\n")
                
            except KeyboardInterrupt:
                logger.info("\nInterrupted by user")
                break
            except Exception as e:
                logger.error(f"Error: {str(e)}")
                continue
        
    finally:
        await client.close()
        logger.info("Demo complete")


async def run_automated_demo():
    """Run an automated demo with predefined queries."""
    from mcp_client import MCPClient
    
    logger.info("=" * 70)
    logger.info("MCP AUTOMATED DEMO")
    logger.info("=" * 70)
    logger.info("Running predefined queries to demonstrate MCP capabilities")
    logger.info("=" * 70)
    
    client = MCPClient(use_gpu=True)
    
    try:
        # Connect to server
        logger.info("\nConnecting to MCP server...")
        await client.connect_to_server(
            server_name="utilities",
            command=sys.executable,
            args=["mcp_server.py"]
        )
        
        # Predefined queries that demonstrate different tools
        queries = [
            {
                "query": "What is 156 divided by 12?",
                "description": "Tests the calculator tool"
            },
            {
                "query": "What day of the week was January 1, 2000?",
                "description": "Tests the date calculation tool"
            },
            {
                "query": "Analyze this sentence: The quick brown fox jumps over the lazy dog.",
                "description": "Tests the text analysis tool"
            },
            {
                "query": "Check the grammar of this text: I has went to the store yesterday.",
                "description": "Tests the grammar checking tool"
            },
            {
                "query": "Calculate the difference between March 15, 2024 and June 20, 2024",
                "description": "Tests complex date calculations"
            }
        ]
        
        for i, item in enumerate(queries, 1):
            logger.info(f"\n{'=' * 70}")
            logger.info(f"Demo Query {i}/{len(queries)}")
            logger.info(f"Description: {item['description']}")
            logger.info(f"Query: {item['query']}")
            logger.info("=" * 70)
            
            response = await client.chat(item['query'])
            
            print(f"\n{'=' * 70}")
            print(f"QUERY {i}: {item['query']}")
            print(f"{'=' * 70}")
            print(f"RESPONSE:\n{response}")
            print(f"{'=' * 70}\n")
            
            # Small delay between queries
            await asyncio.sleep(1)
        
        logger.info("\n" + "=" * 70)
        logger.info("All demo queries completed successfully!")
        logger.info("=" * 70)
        
    finally:
        await client.close()


def main():
    """Main entry point."""
    print("\nMCP Demo Runner")
    print("=" * 70)
    print("This demonstrates a complete MCP implementation with:")
    print("  - Real MCP server process (mcp_server.py)")
    print("  - MCP client with language model (mcp_client.py)")
    print("  - JSON-RPC 2.0 protocol over stdio transport")
    print("  - Multiple tools: calculator, dates, text analysis, etc.")
    print("=" * 70)
    print("\nChoose demo mode:")
    print("  1. Interactive mode (ask your own questions)")
    print("  2. Automated mode (run predefined queries)")
    print("  3. Exit")
    print("=" * 70)
    
    choice = input("\nEnter choice (1-3): ").strip()
    
    if choice == "1":
        asyncio.run(run_interactive_demo())
    elif choice == "2":
        asyncio.run(run_automated_demo())
    elif choice == "3":
        print("Exiting...")
    else:
        print("Invalid choice. Exiting...")


if __name__ == "__main__":
    main()

Installation and Usage Instructions

To use this complete MCP implementation, follow these steps:

  1. Install Dependencies:
pip install mcp aiohttp python-dateutil transformers torch
  1. Save the Files:

    • Save the first code block as mcp_server.py
    • Save the second code block as mcp_client.py
    • Save the third code block as run_demo.py
  2. Run the Demo:

python run_demo.py

This will present you with options to run either an interactive demo where you can ask your own questions, or an automated demo that runs predefined queries to showcase all the tools.

Testing Individual Components:

You can also test the server independently:

# Test the server directly (it will wait for JSON-RPC messages on stdin)
python mcp_server.py

Or run the client with custom queries by modifying the queries list in mcp_client.py.

This complete implementation demonstrates a production-ready MCP system with:

  • Proper Protocol Implementation: All communication uses JSON-RPC 2.0 over stdio transport
  • Real Server Processes: The client actually launches the server as a subprocess and communicates via stdin/stdout
  • Full Tool Discovery: The client discovers tools via the tools/list method
  • Actual Tool Execution: Tools are executed via the tools/call method
  • Comprehensive Logging: Both server and client log all operations for debugging
  • Error Handling: Robust error handling throughout the stack
  • GPU Support: Automatic detection and use of CUDA or Metal Performance Shaders
  • Agentic Loop: The language model can iteratively use tools to solve complex problems


No comments: