Thursday, June 04, 2026

Building a Python Library for LLM Application Development



Introduction

Developing applications that integrate Large Language Models requires solving many recurring technical challenges. Every new project typically requires developers to reimplement GPU detection, configuration management, tool calling mechanisms, rate limiting, and other foundational components. This article presents a comprehensive Python library designed to eliminate this redundant work by providing production-ready, reusable components that handle the most common requirements of LLM-based applications.

The library follows clean architecture principles with clear separation of concerns. Each component is designed to be independently usable while also working seamlessly with other components. The goal is to provide developers with a toolkit that accelerates development without imposing rigid constraints on application architecture.

This article explores each component in depth, explaining not just what it does but why it is designed in a particular way. We will examine the technical challenges each component addresses and provide concrete code examples. At the end, a complete running example demonstrates how all components integrate to create a functional LLM application.

GPU Detection and Optimization Component

Modern LLM inference requires significant computational resources, and utilizing available GPU acceleration is essential for acceptable performance. However, different hardware platforms use different GPU technologies. Apple Silicon uses Metal Performance Shaders, NVIDIA uses CUDA, AMD uses ROCm, and Intel has its own acceleration framework. An application that runs on multiple platforms must detect the available hardware and configure the inference engine accordingly.

The GPU detection component solves this problem by automatically identifying available acceleration hardware and providing the appropriate configuration for the inference engine. This component abstracts away platform-specific details, allowing the rest of the application to remain hardware-agnostic.

The detection process follows a priority order: CUDA first, because it offers the most mature ecosystem for LLM inference, then ROCm on AMD hardware, Metal Performance Shaders on Apple Silicon, and Intel acceleration. The excerpt below covers the CUDA, ROCm, and MPS checks and omits the Intel one. If no GPU is found, the component falls back to CPU inference and logs a warning.

Here is the core implementation of the GPU detector:

import torch
import platform
import logging
from enum import Enum
from dataclasses import dataclass
from typing import Optional

class AcceleratorType(Enum):
    """Enumeration of supported hardware accelerators."""
    CUDA = "cuda"
    ROCM = "rocm"
    MPS = "mps"
    INTEL = "intel"
    CPU = "cpu"

@dataclass
class AcceleratorInfo:
    """Information about detected hardware accelerator."""
    accelerator_type: AcceleratorType
    device_name: str
    device_count: int
    memory_available: Optional[int]  # Total device memory in bytes, if known
    compute_capability: Optional[str]

class GPUDetector:
    """Detects and configures GPU acceleration for LLM inference."""
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self._accelerator_info = None
    
    def detect(self) -> AcceleratorInfo:
        """
        Detect available GPU acceleration and return configuration.
        
        Returns:
            AcceleratorInfo object containing hardware details
        """
        if self._accelerator_info is not None:
            return self._accelerator_info
        
        # Check for CUDA-style devices. ROCm builds of PyTorch also report
        # through torch.cuda, so torch.version.hip tells AMD apart from NVIDIA.
        if torch.cuda.is_available():
            self._accelerator_info = self._detect_cuda()
            if getattr(torch.version, 'hip', None) is not None:
                self._accelerator_info.accelerator_type = AcceleratorType.ROCM
                self.logger.info(f"Detected ROCm GPU: {self._accelerator_info.device_name}")
            else:
                self.logger.info(f"Detected CUDA GPU: {self._accelerator_info.device_name}")
            return self._accelerator_info
        
        # Check for Apple Metal Performance Shaders
        if hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
            self._accelerator_info = self._detect_mps()
            self.logger.info(f"Detected Apple MPS: {self._accelerator_info.device_name}")
            return self._accelerator_info
        
        # Fallback to CPU
        self._accelerator_info = self._detect_cpu()
        self.logger.warning("No GPU acceleration available, using CPU")
        return self._accelerator_info

The detector implements lazy initialization through caching. Once hardware is detected, subsequent calls return the cached result rather than repeating the detection process. This is important because hardware detection can be expensive and the hardware configuration does not change during application runtime.
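
The caching behaviour can be illustrated in isolation. The following is a minimal sketch, not the library's code: `CachedProbe` and its `calls` counter are hypothetical names introduced here only to show that the expensive probe runs once.

```python
class CachedProbe:
    """Runs an expensive probe once and reuses the result afterwards."""

    def __init__(self, probe):
        self._probe = probe      # callable that performs the real detection
        self._result = None      # cache; None means "not probed yet"
        self.calls = 0           # number of times the real probe ran

    def detect(self):
        # Only invoke the probe on the first call
        if self._result is None:
            self.calls += 1
            self._result = self._probe()
        return self._result


# Hypothetical probe that pretends only a CPU is present
probe = CachedProbe(lambda: {"accelerator": "cpu"})
first = probe.detect()
second = probe.detect()
```

Both calls return the same object, and the probe function itself is invoked a single time.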

Each detection method gathers platform-specific information. For CUDA, this includes device count, memory, and compute capability. For MPS, it includes the chip architecture. This information helps the application make informed decisions about model loading and inference parameters.

    def _detect_cuda(self) -> AcceleratorInfo:
        """Detect NVIDIA CUDA configuration."""
        device_count = torch.cuda.device_count()
        if device_count > 0:
            # Query the first device once and reuse its properties
            props = torch.cuda.get_device_properties(0)
            device_name = torch.cuda.get_device_name(0)
            memory = props.total_memory
            compute_cap = f"{props.major}.{props.minor}"
        else:
            device_name, memory, compute_cap = "Unknown", None, None
        
        return AcceleratorInfo(
            accelerator_type=AcceleratorType.CUDA,
            device_name=device_name,
            device_count=device_count,
            memory_available=memory,
            compute_capability=compute_cap
        )
    
    def _detect_mps(self) -> AcceleratorInfo:
        """Detect Apple Metal Performance Shaders configuration."""
        chip_info = platform.processor() or platform.machine()
        
        return AcceleratorInfo(
            accelerator_type=AcceleratorType.MPS,
            device_name=f"Apple Silicon {chip_info}",
            device_count=1,
            memory_available=None,
            compute_capability=None
        )
    
    def _detect_cpu(self) -> AcceleratorInfo:
        """Fallback CPU configuration."""
        import multiprocessing
        
        return AcceleratorInfo(
            accelerator_type=AcceleratorType.CPU,
            device_name=platform.processor() or "CPU",
            device_count=multiprocessing.cpu_count(),
            memory_available=None,
            compute_capability=None
        )
    
    def get_device_string(self) -> str:
        """
        Get PyTorch device string for model loading.
        
        Returns:
            Device string like 'cuda:0', 'mps', or 'cpu'
        """
        info = self.detect()
        if info.accelerator_type in (AcceleratorType.CUDA, AcceleratorType.ROCM):
            # ROCm builds of PyTorch address AMD GPUs through the "cuda" device type
            return "cuda:0"
        elif info.accelerator_type == AcceleratorType.MPS:
            return "mps"
        else:
            return "cpu"

The get_device_string method provides the string that PyTorch and other frameworks expect when loading models. This abstraction means application code can simply call get_device_string without knowing anything about the underlying hardware.

The component also provides optimization hints based on detected hardware. For example, on systems with limited GPU memory, it might recommend using quantized models or smaller batch sizes. On high-end CUDA systems, it might enable features like flash attention or tensor cores.
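
The article does not show how these hints are computed, so the following is only a sketch of one possible approach. The function name `suggest_settings`, the hint keys, and the memory thresholds (12 GiB and 24 GiB) are all assumptions chosen for illustration, not measured recommendations.

```python
from typing import Dict, Optional

GIB = 1024 ** 3


def suggest_settings(accelerator: str, memory_bytes: Optional[int]) -> Dict[str, object]:
    """Return illustrative loading hints; thresholds are assumptions."""
    hints: Dict[str, object] = {
        "quantize": False,
        "batch_size": 8,
        "flash_attention": False,
    }
    if accelerator == "cpu":
        # CPU inference: prefer quantized weights and single-item batches
        hints.update(quantize=True, batch_size=1)
        return hints
    if memory_bytes is not None and memory_bytes < 12 * GIB:
        # Limited GPU memory: quantize and shrink the batch
        hints.update(quantize=True, batch_size=2)
    elif accelerator == "cuda" and memory_bytes is not None and memory_bytes >= 24 * GIB:
        # High-end CUDA device: enable the more demanding optimization
        hints["flash_attention"] = True
    return hints


small_gpu = suggest_settings("cuda", 8 * GIB)
large_gpu = suggest_settings("cuda", 40 * GIB)
```

In the library itself, the inputs would presumably come from the AcceleratorInfo fields rather than from plain arguments.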

Abstract LLM Interface Component

LLM applications often need to support multiple model providers. An application might use OpenAI's GPT models in production but switch to a local Llama model for development or privacy-sensitive deployments. Alternatively, different parts of an application might use different models optimized for specific tasks.

Hard-coding dependencies on specific LLM providers creates tight coupling that makes the application brittle and difficult to modify. The abstract LLM interface component solves this by defining a common contract that all LLM implementations must satisfy. Application code depends on this interface rather than concrete implementations, enabling seamless model swapping.

The interface defines the essential operations that any LLM must support: generating completions, streaming responses, and managing conversation context. It also standardizes how parameters like temperature and top-k are passed to the model.

import logging
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Iterator, Union
from dataclasses import dataclass

@dataclass
class Message:
    """Represents a single message in a conversation."""
    role: str  # 'system', 'user', or 'assistant'
    content: str
    name: Optional[str] = None
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert message to dictionary format."""
        result = {"role": self.role, "content": self.content}
        if self.name:
            result["name"] = self.name
        return result

@dataclass
class CompletionResponse:
    """Response from LLM completion request."""
    content: str
    model: str
    finish_reason: str
    usage: Dict[str, int]
    raw_response: Optional[Any] = None

class BaseLLM(ABC):
    """
    Abstract base class for LLM implementations.
    
    This interface ensures all LLM providers expose consistent methods,
    allowing applications to swap implementations without code changes.
    """
    
    def __init__(self, model_name: str, **kwargs):
        """
        Initialize LLM with model name and optional parameters.
        
        Args:
            model_name: Identifier for the specific model to use
            **kwargs: Provider-specific configuration
        """
        self.model_name = model_name
        self.config = kwargs
        self.logger = logging.getLogger(self.__class__.__name__)
    
    @abstractmethod
    def complete(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        top_p: float = 1.0,
        top_k: Optional[int] = None,
        **kwargs
    ) -> CompletionResponse:
        """
        Generate completion for given messages.
        
        Args:
            messages: Conversation history
            temperature: Sampling temperature (0.0 to 2.0)
            max_tokens: Maximum tokens to generate
            top_p: Nucleus sampling parameter
            top_k: Top-k sampling parameter
            **kwargs: Additional provider-specific parameters
        
        Returns:
            CompletionResponse containing generated text and metadata
        """
        pass
    
    @abstractmethod
    def stream_complete(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        top_p: float = 1.0,
        top_k: Optional[int] = None,
        **kwargs
    ) -> Iterator[str]:
        """
        Stream completion tokens as they are generated.
        
        Args:
            Same as complete()
        
        Yields:
            Individual tokens or token chunks as strings
        """
        pass

The interface uses dataclasses for structured data. The Message class represents individual conversation turns with role, content, and optional name fields. This structure matches the format used by most LLM APIs, making it easy to adapt existing code.

The CompletionResponse class encapsulates everything the application needs from a completion: the generated text, metadata about token usage, and the reason generation stopped. Including the raw response allows applications to access provider-specific information when needed while maintaining portability.

Concrete implementations of this interface handle the specifics of communicating with different LLM providers. Here is an implementation for local models using the transformers library:

from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

class LocalTransformerLLM(BaseLLM):
    """LLM implementation using HuggingFace transformers for local inference."""
    
    def __init__(self, model_name: str, device: Optional[str] = None, **kwargs):
        """
        Initialize local transformer model.
        
        Args:
            model_name: HuggingFace model identifier
            device: Device string ('cuda', 'mps', 'cpu') or None for auto-detect
            **kwargs: Additional model loading parameters
        """
        super().__init__(model_name, **kwargs)
        
        # Auto-detect device if not specified
        if device is None:
            detector = GPUDetector()
            device = detector.get_device_string()
        
        self.device = device
        self.logger.info(f"Loading model {model_name} on device {device}")
        
        # Load tokenizer and model
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            trust_remote_code=kwargs.get('trust_remote_code', False)
        )
        
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=kwargs.get('torch_dtype', torch.float16),
            device_map=kwargs.get('device_map', 'auto'),
            trust_remote_code=kwargs.get('trust_remote_code', False)
        )
        
        # Set pad token if not present
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
    
    def complete(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        top_p: float = 1.0,
        top_k: Optional[int] = None,
        **kwargs
    ) -> CompletionResponse:
        """Generate completion using local transformer model."""
        
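        # Format messages using the chat template if the tokenizer defines one.
        # apply_chat_template exists on every tokenizer, so check the template itself.
        if getattr(self.tokenizer, 'chat_template', None):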
            prompt = self.tokenizer.apply_chat_template(
                [msg.to_dict() for msg in messages],
                tokenize=False,
                add_generation_prompt=True
            )
        else:
            # Fallback: concatenate messages
            prompt = "\n".join([f"{msg.role}: {msg.content}" for msg in messages])
        
        # Tokenize input and move it to the device that holds the model
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
        input_length = inputs.input_ids.shape[1]
        
        # Pass sampling parameters only when sampling; greedy decoding ignores them
        do_sample = temperature > 0
        sampling_args = (
            {"temperature": temperature, "top_p": top_p, "top_k": top_k or 50}
            if do_sample else {}
        )
        
        # Generate
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=max_tokens or 512,
                do_sample=do_sample,
                pad_token_id=self.tokenizer.pad_token_id,
                eos_token_id=self.tokenizer.eos_token_id,
                **sampling_args,
                **kwargs
            )
        
        # Decode only the generated tokens
        generated_tokens = outputs[0][input_length:]
        generated_text = self.tokenizer.decode(generated_tokens, skip_special_tokens=True)
        
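        # Heuristic: report "length" when generation used up the token limit
        hit_limit = len(generated_tokens) >= (max_tokens or 512)
        
        return CompletionResponse(
            content=generated_text,
            model=self.model_name,
            finish_reason="length" if hit_limit else "stop",
            usage={
                "prompt_tokens": input_length,
                "completion_tokens": len(generated_tokens),
                "total_tokens": input_length + len(generated_tokens)
            }
        )
    
    def stream_complete(self, messages: List[Message], **kwargs) -> Iterator[str]:
        """Minimal fallback so the abstract method is satisfied: yields the full completion as one chunk."""
        yield self.complete(messages, **kwargs).content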

This implementation demonstrates how the abstract interface accommodates different LLM backends. The local transformer implementation handles model loading, device placement, tokenization, and generation. It uses the GPU detector component to automatically select the appropriate device, showing how components work together.

The chat template support is particularly important. Modern instruction-tuned models expect specific formatting for conversation turns. The implementation checks if the tokenizer provides a chat template and uses it when available, falling back to simple concatenation for models without templates.

Another implementation might target OpenAI's API:

import openai
from typing import List, Iterator, Optional

class OpenAILLM(BaseLLM):
    """LLM implementation using OpenAI API."""
    
    def __init__(self, model_name: str, api_key: str, **kwargs):
        """
        Initialize OpenAI LLM client.
        
        Args:
            model_name: OpenAI model identifier (e.g., 'gpt-4', 'gpt-3.5-turbo')
            api_key: OpenAI API key
            **kwargs: Additional configuration
        """
        super().__init__(model_name, **kwargs)
        self.client = openai.OpenAI(api_key=api_key)
    
    def complete(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        top_p: float = 1.0,
        top_k: Optional[int] = None,
        **kwargs
    ) -> CompletionResponse:
        """Generate completion using OpenAI API."""
        
        # Convert messages to OpenAI format
        openai_messages = [msg.to_dict() for msg in messages]
        
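        # Make API call. top_k is accepted for interface compatibility but not
        # forwarded, because the Chat Completions API has no top_k parameter.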
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=openai_messages,
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=top_p,
            **kwargs
        )
        
        # Extract response
        choice = response.choices[0]
        
        return CompletionResponse(
            # content can be None (for example when the model returns tool calls)
            content=choice.message.content or "",
            model=response.model,
            finish_reason=choice.finish_reason,
            usage={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens
            },
            raw_response=response
        )
    
    def stream_complete(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        top_p: float = 1.0,
        top_k: Optional[int] = None,
        **kwargs
    ) -> Iterator[str]:
        """Stream completion from OpenAI API."""
        
        openai_messages = [msg.to_dict() for msg in messages]
        
        stream = self.client.chat.completions.create(
            model=self.model_name,
            messages=openai_messages,
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=top_p,
            stream=True,
            **kwargs
        )
        
        for chunk in stream:
            # Skip chunks that carry no choices or no text content
            if chunk.choices and chunk.choices[0].delta.content is not None:
                yield chunk.choices[0].delta.content

The OpenAI implementation shows how the same interface adapts to a completely different backend. Instead of loading models and running inference locally, it makes HTTP requests to OpenAI's API. Yet from the application's perspective, both implementations are interchangeable because they satisfy the same contract.
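
The interchangeability can be sketched without any real model. The example below uses simplified stand-ins rather than the BaseLLM and Message classes above: `Reply`, `ChatBackend`, `EchoBackend`, `UpperBackend`, and `answer` are hypothetical names, and the two backends only manipulate strings.

```python
from dataclasses import dataclass
from typing import Dict, List, Protocol


@dataclass
class Reply:
    """Simplified stand-in for CompletionResponse."""
    content: str
    model: str


class ChatBackend(Protocol):
    """The contract the application code relies on."""
    def complete(self, messages: List[Dict[str, str]]) -> Reply: ...


class EchoBackend:
    def complete(self, messages: List[Dict[str, str]]) -> Reply:
        return Reply(content=f"echo: {messages[-1]['content']}", model="echo")


class UpperBackend:
    def complete(self, messages: List[Dict[str, str]]) -> Reply:
        return Reply(content=messages[-1]["content"].upper(), model="upper")


def answer(backend: ChatBackend, question: str) -> str:
    # Application code sees only the contract, never the concrete class
    reply = backend.complete([{"role": "user", "content": question}])
    return f"[{reply.model}] {reply.content}"


echo_answer = answer(EchoBackend(), "hello")
upper_answer = answer(UpperBackend(), "hello")
```

The `answer` function is unchanged between the two calls; only the object passed in differs, which is the property the abstract interface is meant to guarantee.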

The streaming implementation demonstrates another important capability. Many applications need to display partial results as they are generated rather than waiting for the complete response. The stream_complete method returns an iterator that yields tokens incrementally, enabling responsive user interfaces.
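
On the consuming side, an application typically renders each chunk as it arrives while also accumulating the full text. The sketch below assumes a hypothetical `fake_stream` generator in place of a real stream_complete call, so it runs without a model or API key.

```python
from typing import Iterator, List


def fake_stream(text: str, chunk_size: int = 4) -> Iterator[str]:
    """Stand-in for stream_complete: yields the text in small chunks."""
    for start in range(0, len(text), chunk_size):
        yield text[start:start + chunk_size]


def render_stream(chunks: Iterator[str]) -> str:
    """Print chunks as they arrive and return the assembled response."""
    parts: List[str] = []
    for chunk in chunks:
        print(chunk, end="", flush=True)  # show partial output immediately
        parts.append(chunk)
    print()
    return "".join(parts)


full_text = render_stream(fake_stream("Streaming keeps the UI responsive."))
```

Because `render_stream` accepts any iterator of strings, the same function would work with the iterator returned by a real backend.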

Configuration Management Component

LLM applications require extensive configuration. Model parameters like temperature and top_k affect generation quality. Context window sizes determine how much conversation history the model can consider. API keys and endpoints vary between environments. Hard-coding these values makes applications inflexible and difficult to maintain.

The configuration management component provides a clean way to externalize all configuration into files that can be modified without changing code. It supports both JSON and YAML formats, validates configuration values, and provides sensible defaults.

The component uses a hierarchical structure where general settings can be overridden by environment-specific values. For example, a base configuration might specify default model parameters, while a production configuration overrides the model endpoint and API key.
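
The override mechanism itself is not part of the listing in this section, so here is one way it could work: a recursive dictionary merge applied before the configuration is parsed. The function name `merge_config` and the sample values are assumptions for illustration.

```python
from typing import Any, Dict


def merge_config(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict where values in override replace those in base."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Merge nested sections key by key instead of replacing them wholesale
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = value
    return merged


base = {
    "llm": {"model_name": "gpt-3.5-turbo", "temperature": 0.7},
    "timeout_seconds": 30,
}
production = {"llm": {"model_name": "gpt-4"}, "timeout_seconds": 60}

effective = merge_config(base, production)
```

Here the production file only needs to list the values that differ; the temperature is inherited from the base configuration, and the base dictionary is left unmodified.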

import json
import logging
import yaml
from pathlib import Path
from typing import Any, Dict, Optional, Union
from dataclasses import dataclass, field, asdict

@dataclass
class LLMModelConfig:
    """Configuration for LLM model parameters."""
    model_name: str
    temperature: float = 0.7
    max_tokens: Optional[int] = None
    top_p: float = 1.0
    top_k: Optional[int] = None
    context_window: int = 4096
    system_message: Optional[str] = None
    
    def validate(self):
        """Validate configuration values."""
        if not 0.0 <= self.temperature <= 2.0:
            raise ValueError(f"Temperature must be between 0.0 and 2.0, got {self.temperature}")
        if not 0.0 <= self.top_p <= 1.0:
            raise ValueError(f"top_p must be between 0.0 and 1.0, got {self.top_p}")
        if self.top_k is not None and self.top_k < 1:
            raise ValueError(f"top_k must be positive, got {self.top_k}")
        if self.context_window < 1:
            raise ValueError(f"context_window must be positive, got {self.context_window}")

@dataclass
class ApplicationConfig:
    """Complete application configuration."""
    llm: LLMModelConfig
    api_keys: Dict[str, str] = field(default_factory=dict)
    logging_level: str = "INFO"
    enable_streaming: bool = True
    max_retries: int = 3
    timeout_seconds: int = 30
    
    def validate(self):
        """Validate entire configuration."""
        self.llm.validate()
        valid_log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
        if self.logging_level not in valid_log_levels:
            raise ValueError(f"Invalid logging level: {self.logging_level}")

class ConfigurationManager:
    """Manages application configuration from files."""
    
    def __init__(self, config_path: Optional[Union[str, Path]] = None):
        """
        Initialize configuration manager.
        
        Args:
            config_path: Path to configuration file (JSON or YAML)
        """
        self.config_path = Path(config_path) if config_path else None
        self._config: Optional[ApplicationConfig] = None
        self.logger = logging.getLogger(__name__)
    
    def load(self, config_path: Optional[Union[str, Path]] = None) -> ApplicationConfig:
        """
        Load configuration from file.
        
        Args:
            config_path: Path to configuration file, overrides constructor path
        
        Returns:
            Validated ApplicationConfig object
        """
        path = Path(config_path) if config_path else self.config_path
        
        if path is None:
            self.logger.warning("No configuration file specified, using defaults")
            # Cache the defaults so later access sees the same configuration
            self._config = self._create_default_config()
            return self._config
        
        if not path.exists():
            raise FileNotFoundError(f"Configuration file not found: {path}")
        
        # Load based on file extension
        suffix = path.suffix.lower()
        if suffix == '.json':
            config_dict = self._load_json(path)
        elif suffix in ['.yaml', '.yml']:
            config_dict = self._load_yaml(path)
        else:
            raise ValueError(f"Unsupported configuration format: {suffix}")
        
        # Parse into configuration objects
        self._config = self._parse_config(config_dict)
        self._config.validate()
        
        self.logger.info(f"Loaded configuration from {path}")
        return self._config

The configuration manager separates loading from parsing. The load method determines the file format and delegates to format-specific loaders. This separation makes it easy to add support for additional formats in the future.

Validation is a critical feature. The validate methods check that all configuration values are within acceptable ranges. Temperature must be between zero and two, top_p between zero and one, and so on. This catches configuration errors early rather than allowing them to cause cryptic failures during model inference.

    def _load_json(self, path: Path) -> Dict[str, Any]:
        """Load configuration from JSON file."""
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    
    def _load_yaml(self, path: Path) -> Dict[str, Any]:
        """Load configuration from YAML file."""
        with open(path, 'r', encoding='utf-8') as f:
            # safe_load returns None for an empty file; treat that as no settings
            return yaml.safe_load(f) or {}
    
    def _parse_config(self, config_dict: Dict[str, Any]) -> ApplicationConfig:
        """Parse dictionary into ApplicationConfig object."""
        
        # Parse LLM configuration
        llm_dict = config_dict.get('llm', {})
        llm_config = LLMModelConfig(
            model_name=llm_dict.get('model_name', 'gpt-3.5-turbo'),
            temperature=llm_dict.get('temperature', 0.7),
            max_tokens=llm_dict.get('max_tokens'),
            top_p=llm_dict.get('top_p', 1.0),
            top_k=llm_dict.get('top_k'),
            context_window=llm_dict.get('context_window', 4096),
            system_message=llm_dict.get('system_message')
        )
        
        # Parse application configuration
        app_config = ApplicationConfig(
            llm=llm_config,
            api_keys=config_dict.get('api_keys', {}),
            logging_level=config_dict.get('logging_level', 'INFO'),
            enable_streaming=config_dict.get('enable_streaming', True),
            max_retries=config_dict.get('max_retries', 3),
            timeout_seconds=config_dict.get('timeout_seconds', 30)
        )
        
        return app_config
    
    def _create_default_config(self) -> ApplicationConfig:
        """Create configuration with default values."""
        return ApplicationConfig(
            llm=LLMModelConfig(model_name='gpt-3.5-turbo')
        )
    
    def save(self, config: ApplicationConfig, output_path: Union[str, Path]):
        """
        Save configuration to file.
        
        Args:
            config: Configuration object to save
            output_path: Destination file path
        """
        path = Path(output_path)
        config_dict = self._config_to_dict(config)
        
        suffix = path.suffix.lower()
        if suffix == '.json':
            with open(path, 'w', encoding='utf-8') as f:
                json.dump(config_dict, f, indent=2)
        elif suffix in ['.yaml', '.yml']:
            with open(path, 'w', encoding='utf-8') as f:
                yaml.dump(config_dict, f, default_flow_style=False)
        else:
            raise ValueError(f"Unsupported output format: {suffix}")
        
        self.logger.info(f"Saved configuration to {path}")
    
    def _config_to_dict(self, config: ApplicationConfig) -> Dict[str, Any]:
        """Convert configuration object to dictionary."""
        return {
            'llm': asdict(config.llm),
            'api_keys': config.api_keys,
            'logging_level': config.logging_level,
            'enable_streaming': config.enable_streaming,
            'max_retries': config.max_retries,
            'timeout_seconds': config.timeout_seconds
        }

The save method enables round-tripping configuration. Applications can load configuration, modify it programmatically, and save the updated version. This is useful for tools that help users configure the application through a graphical interface.
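
The load, modify, and save cycle can be sketched with the standard library alone. This example deliberately uses a reduced stand-in (`Settings`, `save_settings`, `load_settings` are hypothetical names) instead of ConfigurationManager, and writes to a temporary directory so it has no side effects.

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path


@dataclass
class Settings:
    """Reduced stand-in for the configuration dataclasses."""
    model_name: str
    temperature: float = 0.7


def save_settings(settings: Settings, path: Path) -> None:
    path.write_text(json.dumps(asdict(settings), indent=2), encoding="utf-8")


def load_settings(path: Path) -> Settings:
    return Settings(**json.loads(path.read_text(encoding="utf-8")))


with tempfile.TemporaryDirectory() as tmp:
    config_file = Path(tmp) / "settings.json"
    save_settings(Settings(model_name="gpt-3.5-turbo"), config_file)

    # Load, change one value programmatically, and write it back
    loaded = load_settings(config_file)
    loaded.temperature = 0.2
    save_settings(loaded, config_file)

    reloaded = load_settings(config_file)
```

After the second save, reloading the file yields the modified temperature alongside the unchanged model name.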

A typical configuration file in YAML format might look like this:

llm:
  model_name: "meta-llama/Llama-2-7b-chat-hf"
  temperature: 0.8
  max_tokens: 2048
  top_p: 0.95
  top_k: 50
  context_window: 4096
  system_message: "You are a helpful AI assistant."

api_keys:
  openai: "sk-..."
  anthropic: "sk-ant-..."

logging_level: "INFO"
enable_streaming: true
max_retries: 3
timeout_seconds: 60

The hierarchical structure makes configuration files readable and maintainable. Related settings are grouped together, and the structure mirrors the code's data classes, making it easy to understand how configuration maps to application behavior.

Tool Calling Framework Component

Modern LLMs can use external tools to extend their capabilities beyond text generation. A model might call a web search API to find current information, execute Python code to perform calculations, or query a database to retrieve specific data. The tool calling framework component provides infrastructure for defining tools, invoking them safely, and integrating results back into the conversation.

The framework uses a plugin architecture where each tool is a self-contained unit with a clear interface. Tools declare their name, description, and parameters using a schema that the LLM can understand. When the model decides to use a tool, the framework validates the parameters, executes the tool, and formats the result.

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Callable
import json
import inspect
import logging
from pydantic import BaseModel, Field, ValidationError

class ToolParameter(BaseModel):
    """Schema for a single tool parameter."""
    name: str
    type: str  # 'string', 'number', 'boolean', 'array', 'object'
    description: str
    required: bool = True
    enum: Optional[List[Any]] = None

class ToolSchema(BaseModel):
    """Schema describing a tool's interface."""
    name: str
    description: str
    parameters: List[ToolParameter]
    
    def to_openai_format(self) -> Dict[str, Any]:
        """Convert schema to OpenAI function calling format."""
        properties = {}
        required = []
        
        for param in self.parameters:
            properties[param.name] = {
                "type": param.type,
                "description": param.description
            }
            if param.enum:
                properties[param.name]["enum"] = param.enum
            
            if param.required:
                required.append(param.name)
        
        return {
            "name": self.name,
            "description": self.description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required
            }
        }

class BaseTool(ABC):
    """Abstract base class for tools that LLMs can invoke."""
    
    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)
    
    @abstractmethod
    def get_schema(self) -> ToolSchema:
        """Return schema describing this tool's interface."""
        pass
    
    @abstractmethod
    def execute(self, **kwargs) -> Dict[str, Any]:
        """
        Execute the tool with given parameters.
        
        Args:
            **kwargs: Tool-specific parameters
        
        Returns:
            Dictionary containing execution results
        """
        pass
    
    def validate_and_execute(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate parameters against schema and execute tool.
        
        Args:
            parameters: Dictionary of parameter values
        
        Returns:
            Execution results or error information
        """
        schema = self.get_schema()
        
        # Validate required parameters
        for param in schema.parameters:
            if param.required and param.name not in parameters:
                return {
                    "error": f"Missing required parameter: {param.name}",
                    "success": False
                }
        
        try:
            result = self.execute(**parameters)
            return {"success": True, "result": result}
        except Exception as e:
            self.logger.error(f"Tool execution failed: {e}", exc_info=True)
            return {
                "error": str(e),
                "success": False
            }

The BaseTool class defines the contract that all tools must implement. The get_schema method returns metadata that describes what the tool does and what parameters it accepts. The execute method performs the actual work. The validate_and_execute method adds a safety layer that checks parameters before execution and handles errors gracefully.

The schema format is modeled on OpenAI's function calling format, which the to_openai_format method emits. Because the schema is held as structured data rather than as a provider-specific payload, equivalent conversion methods can adapt the same tool definitions to other LLM providers.
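As a rough illustration of that adaptation, the sketch below takes a dictionary with the name, description, and parameters keys shown above and reshapes it two ways: wrapped as an entry for OpenAI's tools list, and renamed to the input_schema layout used by Anthropic's tool-use API. The helper names to_openai_tools_entry and to_anthropic_format are hypothetical and not part of the library.

```python
from typing import Any, Dict


def to_openai_tools_entry(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap a function schema as one entry of an OpenAI-style `tools` list."""
    return {"type": "function", "function": schema}


def to_anthropic_format(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Rename `parameters` to `input_schema`, the key Anthropic's format uses."""
    return {
        "name": schema["name"],
        "description": schema["description"],
        "input_schema": schema["parameters"],
    }


# Example input in the shape produced by the schema class above.
example_schema = {
    "name": "web_search",
    "description": "Search the web for current information.",
    "parameters": {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "The search query"}
        },
        "required": ["query"],
    },
}

openai_entry = to_openai_tools_entry(example_schema)
anthropic_tool = to_anthropic_format(example_schema)
```

Both helpers leave the JSON Schema body untouched, so parameter definitions only need to be written once.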

Here is a concrete implementation of a web search tool using DuckDuckGo:

from duckduckgo_search import DDGS
from datetime import datetime

class WebSearchTool(BaseTool):
    """Tool for searching the web using DuckDuckGo."""
    
    def __init__(self, max_results: int = 5):
        """
        Initialize web search tool.
        
        Args:
            max_results: Maximum number of search results to return
        """
        super().__init__()
        self.max_results = max_results
        self.ddgs = DDGS()
    
    def get_schema(self) -> ToolSchema:
        """Return schema for web search tool."""
        return ToolSchema(
            name="web_search",
            description="Search the web for current information using DuckDuckGo. Use this when you need up-to-date information or facts that you don't have in your training data.",
            parameters=[
                ToolParameter(
                    name="query",
                    type="string",
                    description="The search query to execute",
                    required=True
                ),
                ToolParameter(
                    name="max_results",
                    type="integer",
                    description="Maximum number of results to return (default: 5)",
                    required=False
                )
            ]
        )
    
    def execute(self, query: str, max_results: Optional[int] = None) -> Dict[str, Any]:
        """
        Execute web search.
        
        Args:
            query: Search query string
            max_results: Maximum results to return
        
        Returns:
            Dictionary containing search results
        """
        num_results = max_results if max_results is not None else self.max_results
        
        self.logger.info(f"Executing web search: {query}")
        
        try:
            results = list(self.ddgs.text(query, max_results=num_results))
            
            formatted_results = []
            for idx, result in enumerate(results, 1):
                formatted_results.append({
                    "position": idx,
                    "title": result.get("title", ""),
                    "url": result.get("href", ""),
                    "snippet": result.get("body", "")
                })
            
            return {
                "query": query,
                "timestamp": datetime.now().isoformat(),
                "results": formatted_results,
                "count": len(formatted_results)
            }
        
        except Exception as e:
            self.logger.error(f"Search failed: {e}")
            raise

The web search tool demonstrates several important patterns. It encapsulates the complexity of interacting with the DuckDuckGo API behind a simple interface. The execute method returns structured data that is easy for both the LLM and application code to process. On failure, execute logs the error and re-raises it; validate_and_execute in the base class then converts the exception into a structured error result, so a failed search is reported to the caller rather than crashing the application.

The tool framework also includes a registry that manages available tools and routes execution requests:

class ToolRegistry:
    """Registry for managing available tools."""
    
    def __init__(self):
        self.tools: Dict[str, BaseTool] = {}
        self.logger = logging.getLogger(__name__)
    
    def register(self, tool: BaseTool):
        """
        Register a tool.
        
        Args:
            tool: Tool instance to register
        """
        schema = tool.get_schema()
        self.tools[schema.name] = tool
        self.logger.info(f"Registered tool: {schema.name}")
    
    def get_tool(self, name: str) -> Optional[BaseTool]:
        """
        Retrieve a tool by name.
        
        Args:
            name: Tool name
        
        Returns:
            Tool instance or None if not found
        """
        return self.tools.get(name)
    
    def get_all_schemas(self) -> List[ToolSchema]:
        """Get schemas for all registered tools."""
        return [tool.get_schema() for tool in self.tools.values()]
    
    def execute_tool(self, name: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute a tool by name.
        
        Args:
            name: Tool name
            parameters: Tool parameters
        
        Returns:
            Execution results
        """
        tool = self.get_tool(name)
        if tool is None:
            return {
                "error": f"Tool not found: {name}",
                "success": False
            }
        
        return tool.validate_and_execute(parameters)

The registry provides a central point for managing tools. Applications register all available tools at startup, and the registry handles routing execution requests to the appropriate tool. This centralization makes it easy to add, remove, or modify tools without changing the core application logic.
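To make the routing step concrete, here is a minimal sketch of how an application might pass a model-issued tool call to the registry. It assumes the call arrives as a dictionary with a name and a JSON-encoded arguments string, which is how OpenAI-style responses present it. The dispatch_tool_call helper and the fake_execute_tool stand-in are hypothetical; in a real application the first argument would be ToolRegistry.execute_tool.

```python
import json
from typing import Any, Callable, Dict


def dispatch_tool_call(
    execute_tool: Callable[[str, Dict[str, Any]], Dict[str, Any]],
    tool_call: Dict[str, Any],
) -> str:
    """Run one tool call and return a JSON string to send back to the model."""
    try:
        arguments = json.loads(tool_call.get("arguments") or "{}")
    except json.JSONDecodeError as exc:
        return json.dumps({"success": False, "error": f"Invalid JSON arguments: {exc}"})

    if not isinstance(arguments, dict):
        return json.dumps({"success": False, "error": "Arguments must be a JSON object"})

    return json.dumps(execute_tool(tool_call["name"], arguments))


def fake_execute_tool(name: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in for ToolRegistry.execute_tool so the sketch runs on its own."""
    if name != "echo":
        return {"success": False, "error": f"Tool not found: {name}"}
    return {"success": True, "result": parameters}


reply = dispatch_tool_call(fake_execute_tool, {"name": "echo", "arguments": '{"text": "hi"}'})
bad_reply = dispatch_tool_call(fake_execute_tool, {"name": "echo", "arguments": "{not json"})
```

Malformed arguments are turned into the same success/error shape the registry returns, so the model receives a consistent result format either way.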

For paid services like SERP API, the framework supports the same interface with different implementations:

from datetime import datetime

import requests

class SerpAPISearchTool(BaseTool):
    """Web search tool using SERP API (paid service)."""
    
    def __init__(self, api_key: str, max_results: int = 5):
        """
        Initialize SERP API search tool.
        
        Args:
            api_key: SERP API key
            max_results: Maximum number of results
        """
        super().__init__()
        self.api_key = api_key
        self.max_results = max_results
        self.base_url = "https://serpapi.com/search"
    
    def get_schema(self) -> ToolSchema:
        """Return schema for SERP API search."""
        return ToolSchema(
            name="web_search",
            description="Search the web using SERP API for high-quality, structured results.",
            parameters=[
                ToolParameter(
                    name="query",
                    type="string",
                    description="The search query",
                    required=True
                ),
                ToolParameter(
                    name="max_results",
                    type="integer",
                    description="Maximum results to return",
                    required=False
                )
            ]
        )
    
    def execute(self, query: str, max_results: Optional[int] = None) -> Dict[str, Any]:
        """Execute search using SERP API."""
        num_results = max_results if max_results is not None else self.max_results
        
        params = {
            "q": query,
            "api_key": self.api_key,
            "num": num_results
        }
        
        # Set a timeout so a stalled connection cannot block the caller indefinitely
        response = requests.get(self.base_url, params=params, timeout=10)
        response.raise_for_status()
        
        data = response.json()
        organic_results = data.get("organic_results", [])
        
        formatted_results = []
        for idx, result in enumerate(organic_results[:num_results], 1):
            formatted_results.append({
                "position": idx,
                "title": result.get("title", ""),
                "url": result.get("link", ""),
                "snippet": result.get("snippet", "")
            })
        
        return {
            "query": query,
            "timestamp": datetime.now().isoformat(),
            "results": formatted_results,
            "count": len(formatted_results)
        }

Both search tools implement the same schema, making them interchangeable. An application can switch from the free DuckDuckGo service to the paid SERP API by simply registering a different tool instance, without changing any other code.

Model Context Protocol Integration Component

The Model Context Protocol, developed by Anthropic, provides a standardized way for LLM applications to access external context sources. MCP servers expose resources like files, databases, or APIs through a uniform interface. MCP clients consume these resources and make them available to LLMs.

The MCP integration component provides both client and server implementations, allowing applications to act as either consumers or providers of context. This enables sophisticated architectures where multiple applications share context through MCP.

from typing import Any, Callable, Dict, List, Optional
import asyncio
import json
import logging

from pydantic import BaseModel  # assumed: BaseModel is pydantic's base class

class MCPResource(BaseModel):
    """Represents a resource available through MCP."""
    uri: str
    name: str
    description: Optional[str] = None
    mime_type: Optional[str] = None
    
class MCPTool(BaseModel):
    """Represents a tool available through MCP."""
    name: str
    description: str
    input_schema: Dict[str, Any]

class MCPClient:
    """Client for connecting to MCP servers."""
    
    def __init__(self, server_url: str):
        """
        Initialize MCP client.
        
        Args:
            server_url: URL of the MCP server
        """
        self.server_url = server_url
        self.logger = logging.getLogger(__name__)
        self._session = None
    
    async def connect(self):
        """Establish connection to MCP server."""
        self.logger.info(f"Connecting to MCP server: {self.server_url}")
        # Implementation would establish actual connection
        # This is a simplified version showing the interface
        self._session = {"connected": True}
    
    async def list_resources(self) -> List[MCPResource]:
        """
        List all resources available from the server.
        
        Returns:
            List of MCPResource objects
        """
        if not self._session:
            raise RuntimeError("Not connected to MCP server")
        
        # In real implementation, this would make an RPC call
        # Here we show the interface structure
        self.logger.info("Listing MCP resources")
        return []
    
    async def read_resource(self, uri: str) -> Dict[str, Any]:
        """
        Read content of a specific resource.
        
        Args:
            uri: Resource URI
        
        Returns:
            Resource content and metadata
        """
        if not self._session:
            raise RuntimeError("Not connected to MCP server")
        
        self.logger.info(f"Reading resource: {uri}")
        # Real implementation would fetch the resource
        return {
            "uri": uri,
            "content": "",
            "mime_type": "text/plain"
        }
    
    async def list_tools(self) -> List[MCPTool]:
        """
        List tools available from the server.
        
        Returns:
            List of MCPTool objects
        """
        if not self._session:
            raise RuntimeError("Not connected to MCP server")
        
        self.logger.info("Listing MCP tools")
        return []
    
    async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Invoke a tool on the server.
        
        Args:
            name: Tool name
            arguments: Tool arguments
        
        Returns:
            Tool execution results
        """
        if not self._session:
            raise RuntimeError("Not connected to MCP server")
        
        self.logger.info(f"Calling MCP tool: {name}")
        # Real implementation would make RPC call
        return {"result": None}
    
    async def disconnect(self):
        """Close connection to MCP server."""
        if self._session:
            self.logger.info("Disconnecting from MCP server")
            self._session = None

The MCP client exposes every operation as an asynchronous method because each one waits on a network round trip. With async/await, the application can continue other work, such as serving additional requests, while a server response is pending.

The client separates resource access from tool invocation. Resources are passive data sources that the client reads. Tools are active operations that the server executes on behalf of the client. This distinction is important because it affects caching, permissions, and error handling.
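The client stubs above leave out the wire format. MCP messages are JSON-RPC 2.0, and the specification defines method names such as resources/list, resources/read, tools/list, and tools/call. The sketch below only builds the request payloads a real client would send; the transport and the initialization handshake are omitted, and build_request is a hypothetical helper rather than part of the library.

```python
import itertools
import json
from typing import Any, Dict, Optional

# Monotonically increasing ids let responses be matched to their requests.
_request_ids = itertools.count(1)


def build_request(method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 request object for an MCP method."""
    request: Dict[str, Any] = {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": method,
    }
    if params is not None:
        request["params"] = params
    return request


# Resource access: a passive read identified by URI.
list_resources = build_request("resources/list")
read_resource = build_request("resources/read", {"uri": "file:///notes.txt"})

# Tool invocation: an active operation with named arguments.
call_tool = build_request(
    "tools/call",
    {"name": "search_files", "arguments": {"query": "budget"}},
)

print(json.dumps(call_tool, indent=2))
```

The two request shapes reflect the distinction drawn above: reading a resource needs only a URI, while calling a tool carries arguments that the server acts on.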

The server implementation mirrors the client:

class MCPServer:
    """Server implementation for exposing resources and tools via MCP."""
    
    def __init__(self, name: str, version: str):
        """
        Initialize MCP server.
        
        Args:
            name: Server name
            version: Server version
        """
        self.name = name
        self.version = version
        # Each entry maps a URI or tool name to its handler plus metadata
        self.resources: Dict[str, Dict[str, Any]] = {}
        self.tools: Dict[str, Dict[str, Any]] = {}
        self.logger = logging.getLogger(__name__)
    
    def register_resource(
        self,
        uri: str,
        name: str,
        handler: Callable,
        description: Optional[str] = None,
        mime_type: Optional[str] = None
    ):
        """
        Register a resource handler.
        
        Args:
            uri: Resource URI
            name: Resource name
            handler: Async function that returns resource content
            description: Resource description
            mime_type: MIME type of resource content
        """
        self.resources[uri] = {
            "name": name,
            "handler": handler,
            "description": description,
            "mime_type": mime_type
        }
        self.logger.info(f"Registered resource: {uri}")
    
    def register_tool(
        self,
        name: str,
        handler: Callable,
        description: str,
        input_schema: Dict[str, Any]
    ):
        """
        Register a tool handler.
        
        Args:
            name: Tool name
            handler: Async function that executes the tool
            description: Tool description
            input_schema: JSON schema for tool inputs
        """
        self.tools[name] = {
            "handler": handler,
            "description": description,
            "input_schema": input_schema
        }
        self.logger.info(f"Registered tool: {name}")
    
    async def handle_list_resources(self) -> List[MCPResource]:
        """Handle request to list resources."""
        return [
            MCPResource(
                uri=uri,
                name=info["name"],
                description=info["description"],
                mime_type=info["mime_type"]
            )
            for uri, info in self.resources.items()
        ]
    
    async def handle_read_resource(self, uri: str) -> Dict[str, Any]:
        """Handle request to read a resource."""
        if uri not in self.resources:
            raise ValueError(f"Resource not found: {uri}")
        
        resource_info = self.resources[uri]
        content = await resource_info["handler"]()
        
        return {
            "uri": uri,
            "content": content,
            "mime_type": resource_info["mime_type"]
        }
    
    async def handle_call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle request to call a tool."""
        if name not in self.tools:
            raise ValueError(f"Tool not found: {name}")
        
        tool_info = self.tools[name]
        result = await tool_info["handler"](**arguments)
        
        return {"result": result}

The server uses a registration pattern where handlers are registered for specific URIs and tool names. This makes it easy to add new resources and tools dynamically. The handlers are async functions, allowing them to perform I/O operations efficiently.
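The same registration pattern can also be expressed with a decorator, which keeps the tool name next to the handler definition. The following is a minimal sketch under that assumption; MiniRegistry is a hypothetical stand-in reduced to tool registration and dispatch, not the MCPServer class itself.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[..., Awaitable[Any]]


class MiniRegistry:
    """Hypothetical stand-in for MCPServer: tool registration and dispatch only."""

    def __init__(self) -> None:
        self.tools: Dict[str, Handler] = {}

    def tool(self, name: str) -> Callable[[Handler], Handler]:
        """Return a decorator that registers an async handler under `name`."""
        def decorator(handler: Handler) -> Handler:
            self.tools[name] = handler
            return handler
        return decorator

    async def call(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Look up the handler by name and await it with the given arguments."""
        if name not in self.tools:
            raise ValueError(f"Tool not found: {name}")
        return {"result": await self.tools[name](**arguments)}


registry = MiniRegistry()


@registry.tool("add")
async def add(a: int, b: int) -> int:
    return a + b


result = asyncio.run(registry.call("add", {"a": 2, "b": 3}))
print(result)  # {'result': 5}
```

Because the decorator returns the handler unchanged, the function remains directly callable in tests without going through the registry.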

The following example uses the MCP server to expose the text files in a directory:

import aiofiles
from pathlib import Path

async def create_file_server(base_path: str) -> MCPServer:
    """
    Create an MCP server that exposes a file system.
    
    Args:
        base_path: Root directory to expose
    
    Returns:
        Configured MCPServer instance
    """
    server = MCPServer(name="file_server", version="1.0.0")
    base = Path(base_path)
    
    # Register a resource for each file
    for file_path in base.rglob("*.txt"):
        relative_path = file_path.relative_to(base)
        # as_posix() keeps forward slashes in the URI on Windows as well
        uri = f"file:///{relative_path.as_posix()}"
        
        async def read_file(path=file_path):
            async with aiofiles.open(path, 'r') as f:
                return await f.read()
        
        server.register_resource(
            uri=uri,
            name=str(relative_path),
            handler=read_file,
            description=f"Text file: {relative_path}",
            mime_type="text/plain"
        )
    
    # Register a search tool
    async def search_files(query: str):
        results = []
        for file_path in base.rglob("*.txt"):
            async with aiofiles.open(file_path, 'r') as f:
                content = await f.read()
                if query.lower() in content.lower():
                    results.append(str(file_path.relative_to(base)))
        return results
    
    server.register_tool(
        name="search_files",
        handler=search_files,
        description="Search for files containing specific text",
        input_schema={
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Text to search for"
                }
            },
            "required": ["query"]
        }
    )
    
    return server

This example shows how MCP enables powerful integrations. The file server exposes every .txt file under a directory tree as an MCP resource, making those files accessible to any MCP client. The search tool provides a way to find files by content, demonstrating how MCP tools can perform operations beyond simple data retrieval.

Message Management and Chat History Component

Conversational LLM applications must manage the flow of messages between users and the model. Each interaction involves system messages that set behavior, user messages containing requests, and assistant messages with responses. Managing this conversation state correctly is essential for coherent multi-turn interactions.

The message management component provides structures for representing messages and utilities for managing conversation history. It handles concerns like context window limits, message formatting, and conversation persistence.

from typing import Any, Callable, Dict, List, Optional, Union
from dataclasses import dataclass, field
from datetime import datetime
import json
import logging
from pathlib import Path

@dataclass
class ConversationMessage:
    """Represents a single message in a conversation."""
    role: str
    content: str
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert message to dictionary format."""
        return {
            "role": self.role,
            "content": self.content,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata
        }
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ConversationMessage':
        """Create message from dictionary."""
        return cls(
            role=data["role"],
            content=data["content"],
            timestamp=datetime.fromisoformat(data["timestamp"]),
            metadata=data.get("metadata", {})
        )

class ChatHistory:
    """Manages conversation history with context window awareness."""
    
    def __init__(
        self,
        system_message: Optional[str] = None,
        max_context_tokens: int = 4096,
        token_counter: Optional[Callable[[str], int]] = None
    ):
        """
        Initialize chat history manager.
        
        Args:
            system_message: Initial system message
            max_context_tokens: Maximum tokens to keep in context
            token_counter: Function to count tokens in text
        """
        self.messages: List[ConversationMessage] = []
        self.max_context_tokens = max_context_tokens
        self.token_counter = token_counter or self._default_token_counter
        self.logger = logging.getLogger(__name__)
        
        if system_message:
            self.add_system_message(system_message)
    
    def _default_token_counter(self, text: str) -> int:
        """
        Default token counter using simple character-based estimation.
        
        Args:
            text: Text to count tokens for
        
        Returns:
            Estimated token count
        """
        # Rough estimation: 1 token per 4 characters
        return len(text) // 4
    
    def add_system_message(self, content: str, metadata: Optional[Dict[str, Any]] = None):
        """Add a system message."""
        message = ConversationMessage(
            role="system",
            content=content,
            metadata=metadata or {}
        )
        self.messages.append(message)
        self.logger.debug("Added system message")
    
    def add_user_message(self, content: str, metadata: Optional[Dict[str, Any]] = None):
        """Add a user message."""
        message = ConversationMessage(
            role="user",
            content=content,
            metadata=metadata or {}
        )
        self.messages.append(message)
        self.logger.debug("Added user message")
    
    def add_assistant_message(self, content: str, metadata: Optional[Dict[str, Any]] = None):
        """Add an assistant message."""
        message = ConversationMessage(
            role="assistant",
            content=content,
            metadata=metadata or {}
        )
        self.messages.append(message)
        self.logger.debug("Added assistant message")
    
    def get_messages_for_llm(self) -> List[Message]:
        """
        Get messages formatted for LLM, respecting context window.
        
        Returns:
            List of Message objects within token limit
        """
        # Always include system messages
        system_messages = [msg for msg in self.messages if msg.role == "system"]
        conversation_messages = [msg for msg in self.messages if msg.role != "system"]
        
        # Count tokens for system messages
        system_tokens = sum(
            self.token_counter(msg.content) for msg in system_messages
        )
        
        # Calculate available tokens for conversation
        available_tokens = self.max_context_tokens - system_tokens
        
        # Add conversation messages from most recent, staying within limit
        selected_messages = []
        current_tokens = 0
        
        for msg in reversed(conversation_messages):
            msg_tokens = self.token_counter(msg.content)
            if current_tokens + msg_tokens > available_tokens:
                break
            selected_messages.insert(0, msg)
            current_tokens += msg_tokens
        
        # Combine system and selected conversation messages
        all_messages = system_messages + selected_messages
        
        # Convert to Message objects
        return [
            Message(role=msg.role, content=msg.content)
            for msg in all_messages
        ]

The chat history component manages the context window by keeping the estimated token count of the messages it returns within max_context_tokens. System messages are always included because they define the assistant's behavior. Conversation messages are then added starting from the most recent and working backward until the next message would exceed the remaining budget. The budget covers only the prompt, so applications should set max_context_tokens below the model's actual limit to leave room for the response.

This approach ensures that the model always has the most relevant context. Recent messages are more important for maintaining conversation coherence than older messages. If the conversation becomes very long, older messages are automatically dropped.
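A small worked example may help make the trimming behavior concrete. The sketch below reimplements only the selection loop as a standalone function, using the same four-characters-per-token estimate as the default counter. The names select_recent and estimate are hypothetical and exist only for this illustration.

```python
from typing import Callable, List, Tuple

Msg = Tuple[str, str]  # (role, content)


def estimate(text: str) -> int:
    """Rough estimate: one token per four characters."""
    return len(text) // 4


def select_recent(
    messages: List[Msg],
    budget: int,
    count_tokens: Callable[[str], int],
) -> List[Msg]:
    """Keep the newest messages whose combined token estimate fits the budget."""
    selected: List[Msg] = []
    used = 0
    for role, content in reversed(messages):
        cost = count_tokens(content)
        if used + cost > budget:
            break  # Everything older than this message is dropped as well
        selected.append((role, content))
        used += cost
    selected.reverse()  # Restore chronological order
    return selected


history = [
    ("user", "a" * 40),       # 10 estimated tokens
    ("assistant", "b" * 40),  # 10 estimated tokens
    ("user", "c" * 20),       # 5 estimated tokens
]

kept = select_recent(history, budget=16, count_tokens=estimate)
print([role for role, _ in kept])  # ['assistant', 'user']
```

With a budget of 16, the two newest messages use 15 tokens and the oldest message no longer fits, so it is dropped.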

The token counter is pluggable. The default implementation uses a simple character-based estimation, but applications can provide more accurate counters using tokenizers specific to their model:

    def set_token_counter(self, counter: Callable[[str], int]):
        """
        Set a custom token counter.
        
        Args:
            counter: Function that takes text and returns token count
        """
        self.token_counter = counter
        self.logger.info("Updated token counter")
    
    def get_total_tokens(self) -> int:
        """Calculate total tokens in current history."""
        return sum(
            self.token_counter(msg.content) for msg in self.messages
        )
    
    def clear(self):
        """Clear all messages except system messages."""
        self.messages = [msg for msg in self.messages if msg.role == "system"]
        self.logger.info("Cleared conversation history")
    
    def save(self, file_path: Union[str, Path]):
        """
        Save conversation history to file.
        
        Args:
            file_path: Path to save conversation
        """
        path = Path(file_path)
        data = {
            "max_context_tokens": self.max_context_tokens,
            "messages": [msg.to_dict() for msg in self.messages]
        }
        
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)
        
        self.logger.info(f"Saved conversation to {path}")
    
    @classmethod
    def load(cls, file_path: Union[str, Path]) -> 'ChatHistory':
        """
        Load conversation history from file.
        
        Args:
            file_path: Path to conversation file
        
        Returns:
            ChatHistory instance with loaded conversation
        """
        path = Path(file_path)
        
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        
        history = cls(max_context_tokens=data["max_context_tokens"])
        history.messages = [
            ConversationMessage.from_dict(msg_data)
            for msg_data in data["messages"]
        ]
        
        return history

The save and load methods enable conversation persistence. Applications can save conversations to disk and resume them later. This is essential for applications that need to maintain state across sessions or allow users to review past conversations.

The metadata field in ConversationMessage provides extensibility. Applications can attach arbitrary data to messages, such as user IDs, confidence scores, or references to external resources. This metadata is preserved when saving and loading conversations, provided its values are JSON-serializable.

Circuit Breaker and Rate Limiting Component

Production LLM applications must handle failures gracefully and respect API rate limits. External LLM services can experience outages, network issues can cause timeouts, and exceeding rate limits can result in blocked requests. The circuit breaker and rate limiting component provides resilience mechanisms that prevent cascading failures and ensure compliance with service quotas.

A circuit breaker monitors requests to an external service and automatically stops sending requests when the service appears to be failing. This prevents wasting resources on requests that will likely fail and gives the service time to recover. After a cooldown period, the circuit breaker allows a test request through to check if the service has recovered.

from enum import Enum
from datetime import datetime, timedelta
from typing import Callable, Any, Optional
import time
from collections import deque
import asyncio
import logging

class CircuitState(Enum):
    """States of a circuit breaker."""
    CLOSED = "closed"  # Normal operation
    OPEN = "open"      # Blocking requests
    HALF_OPEN = "half_open"  # Testing recovery

class CircuitBreaker:
    """
    Circuit breaker for protecting against cascading failures.
    
    Monitors request failures and opens the circuit when failure
    threshold is exceeded, preventing further requests until recovery.
    """
    
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception
    ):
        """
        Initialize circuit breaker.
        
        Args:
            failure_threshold: Number of failures before opening circuit
            recovery_timeout: Seconds to wait before attempting recovery
            expected_exception: Exception type to count as failure
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time: Optional[datetime] = None
        self.state = CircuitState.CLOSED
        self.logger = logging.getLogger(__name__)
    
    def call(self, func: Callable, *args, **kwargs) -> Any:
        """
        Execute function with circuit breaker protection.
        
        Args:
            func: Function to execute
            *args: Positional arguments for function
            **kwargs: Keyword arguments for function
        
        Returns:
            Function result
        
        Raises:
            Exception: If circuit is open or function fails
        """
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.logger.info("Circuit breaker entering half-open state")
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        
        except self.expected_exception:
            self._on_failure()
            raise
    
    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt recovery."""
        if self.last_failure_time is None:
            return False
        
        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return elapsed >= self.recovery_timeout
    
    def _on_success(self):
        """Handle successful request."""
        if self.state == CircuitState.HALF_OPEN:
            self.logger.info("Circuit breaker closing after successful test")
            self.state = CircuitState.CLOSED
        
        self.failure_count = 0
    
    def _on_failure(self):
        """Handle failed request."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        
        if self.failure_count >= self.failure_threshold:
            self.logger.warning(
                f"Circuit breaker opening after {self.failure_count} failures"
            )
            self.state = CircuitState.OPEN
    
    def reset(self):
        """Manually reset circuit breaker to closed state."""
        self.logger.info("Manually resetting circuit breaker")
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

The circuit breaker is a state machine with three states. In the closed state, requests flow normally. Once the number of consecutive failures reaches the threshold, the circuit opens and rejects all requests. After the recovery timeout elapses, the next request moves the circuit to the half-open state and is allowed through as a test. If the test succeeds, the circuit closes and the failure count resets. If it fails, the circuit reopens.

This mechanism prevents overwhelming a failing service with requests while still allowing automatic recovery. The recovery timeout gives the service time to stabilize before testing whether it is healthy again.
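The state transitions are easier to follow when time can be controlled. The sketch below is a trimmed-down breaker with an injectable clock so the open, half-open, and closed transitions can be shown deterministically. MiniBreaker is a hypothetical illustration, not the CircuitBreaker class above, and it tracks the time the circuit opened rather than the time of the last failure.

```python
from typing import Any, Callable, Optional


class MiniBreaker:
    """Hypothetical, simplified breaker with an injectable clock."""

    def __init__(self, threshold: int, recovery_timeout: float, clock: Callable[[], float]):
        self.threshold = threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None
        self.state = "closed"

    def call(self, func: Callable[[], Any]) -> Any:
        if self.state == "open":
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise RuntimeError("circuit open")
            self.state = "half_open"  # Let this one request through as a test
        try:
            result = func()
        except Exception:
            self.failures += 1
            # A failed test request, or reaching the threshold, opens the circuit
            if self.state == "half_open" or self.failures >= self.threshold:
                self.state = "open"
                self.opened_at = self.clock()
            raise
        self.failures = 0
        self.state = "closed"
        return result


now = [0.0]  # Mutable fake clock, advanced manually below
breaker = MiniBreaker(threshold=2, recovery_timeout=30, clock=lambda: now[0])


def failing() -> None:
    raise ConnectionError("service down")


states = []
for _ in range(2):
    try:
        breaker.call(failing)
    except ConnectionError:
        pass
states.append(breaker.state)  # Two failures reached the threshold

now[0] = 31.0  # Advance past the recovery timeout
breaker.call(lambda: "ok")  # Test request succeeds
states.append(breaker.state)

print(states)  # ['open', 'closed']
```

Injecting the clock is a testing convenience: production code can pass time.monotonic, while tests advance a fake clock instead of sleeping.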

Rate limiting complements the circuit breaker by preventing the application from exceeding service quotas:

class RateLimiter:
    """
    Token bucket rate limiter.
    
    Limits the rate of operations to prevent exceeding API quotas.
    """
    
    def __init__(
        self,
        max_requests: int,
        time_window: int,
        burst_size: Optional[int] = None
    ):
        """
        Initialize rate limiter.
        
        Args:
            max_requests: Maximum requests allowed in time window
            time_window: Time window in seconds
            burst_size: Maximum burst size (defaults to max_requests)
        """
        self.max_requests = max_requests
        self.time_window = time_window
        self.burst_size = burst_size or max_requests
        
        self.tokens = self.burst_size
        self.last_update = time.time()
        self.logger = logging.getLogger(__name__)
    
    def acquire(self, tokens: int = 1) -> bool:
        """
        Attempt to acquire tokens for a request.
        
        Args:
            tokens: Number of tokens to acquire
        
        Returns:
            True if tokens were acquired, False otherwise
        """
        self._refill()
        
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        
        return False
    
    def wait_and_acquire(self, tokens: int = 1, timeout: Optional[float] = None):
        """
        Wait until tokens are available and acquire them.
        
        Args:
            tokens: Number of tokens to acquire
            timeout: Maximum time to wait in seconds
        
        Raises:
            TimeoutError: If timeout is exceeded
        """
        start_time = time.time()
        
        while not self.acquire(tokens):
            if timeout is not None and (time.time() - start_time) > timeout:
                raise TimeoutError("Rate limiter timeout exceeded")
            
            # Calculate wait time until next token
            wait_time = self._time_until_next_token()
            time.sleep(min(wait_time, 0.1))
    
    def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.time()
        elapsed = now - self.last_update
        
        # Calculate tokens to add based on elapsed time
        tokens_to_add = (elapsed / self.time_window) * self.max_requests
        
        self.tokens = min(self.burst_size, self.tokens + tokens_to_add)
        self.last_update = now
    
    def _time_until_next_token(self) -> float:
        """Calculate seconds until next token is available."""
        if self.tokens >= 1:
            return 0.0
        
        tokens_needed = 1 - self.tokens
        return (tokens_needed / self.max_requests) * self.time_window

The rate limiter implements the token bucket algorithm. Tokens are added to the bucket at a steady rate determined by max_requests and time_window. Each request consumes one token by default. If the bucket does not hold enough tokens, acquire returns False immediately and the caller decides whether to wait or give up.

The burst_size parameter allows short bursts of requests above the average rate. This is useful for handling legitimate traffic spikes without triggering rate limits. The bucket can hold more tokens than the steady-state rate, allowing accumulated tokens to be spent quickly.

The wait_and_acquire method blocks until tokens are available. This is convenient for applications that can afford to wait rather than failing immediately. The timeout parameter prevents indefinite blocking.
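
The refill arithmetic is easier to follow with concrete numbers. The sketch below restates it as two pure functions. The names refill and seconds_until are hypothetical, and seconds_until generalizes the single-token wait calculation to an arbitrary number of tokens, which is an assumption beyond the class shown above.

```python
def refill(tokens: float, elapsed: float, max_requests: int,
           time_window: float, burst_size: int) -> float:
    """Return the bucket level after `elapsed` seconds, capped at burst_size."""
    added = (elapsed / time_window) * max_requests
    return min(burst_size, tokens + added)


def seconds_until(tokens_needed: float, tokens: float, max_requests: int,
                  time_window: float) -> float:
    """Return how long to wait until `tokens_needed` tokens are available."""
    shortfall = max(0.0, tokens_needed - tokens)
    return (shortfall / max_requests) * time_window


# 30 requests per 60 seconds means one new token every 2 seconds.
level = refill(tokens=0.0, elapsed=5.0, max_requests=30,
               time_window=60, burst_size=10)       # about 2.5 tokens
capped = refill(tokens=9.0, elapsed=60.0, max_requests=30,
                time_window=60, burst_size=10)      # capped at burst_size
wait = seconds_until(tokens_needed=3, tokens=level, max_requests=30,
                     time_window=60)                # about 1 second
```

The cap in refill is what bounds a burst: however long the limiter sits idle, the bucket never holds more than burst_size tokens.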

Combining circuit breaker and rate limiter creates robust request handling:

class ResilientLLMClient:
    """LLM client with circuit breaker and rate limiting."""
    
    def __init__(
        self,
        llm: BaseLLM,
        max_requests_per_minute: int = 60,
        circuit_breaker_threshold: int = 5
    ):
        """
        Initialize resilient LLM client.
        
        Args:
            llm: Underlying LLM implementation
            max_requests_per_minute: Rate limit
            circuit_breaker_threshold: Failures before opening circuit
        """
        self.llm = llm
        self.rate_limiter = RateLimiter(
            max_requests=max_requests_per_minute,
            time_window=60
        )
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=circuit_breaker_threshold
        )
        self.logger = logging.getLogger(__name__)
    
    def complete(
        self,
        messages: List[Message],
        **kwargs
    ) -> CompletionResponse:
        """
        Generate completion with resilience mechanisms.
        
        Args:
            messages: Conversation messages
            **kwargs: Additional parameters for LLM
        
        Returns:
            CompletionResponse
        """
        # Wait for rate limit
        self.rate_limiter.wait_and_acquire()
        
        # Execute with circuit breaker
        def _complete():
            return self.llm.complete(messages, **kwargs)
        
        return self.circuit_breaker.call(_complete)

The resilient client wraps an LLM implementation with both rate limiting and circuit breaking. Every request first waits for rate limit tokens, then executes through the circuit breaker. This ensures that the application respects rate limits and handles failures gracefully.

Additional Useful Components

Beyond the core components already discussed, several additional utilities enhance LLM application development.

Prompt Template Component

Prompt engineering is critical for LLM applications, but hard-coding prompts makes them difficult to modify and test. A prompt template component provides a structured way to define, parameterize, and manage prompts.

import logging
from string import Template
from typing import Any, Dict, List, Optional

class PromptTemplate:
    """Template for constructing prompts with variable substitution."""
    
    def __init__(self, template: str, description: Optional[str] = None):
        """
        Initialize prompt template.
        
        Args:
            template: Template string with ${variable} placeholders
            description: Description of template purpose
        """
        self.template = Template(template)
        self.raw_template = template
        self.description = description
        self.logger = logging.getLogger(__name__)
    
    def format(self, **kwargs) -> str:
        """
        Format template with provided variables.
        
        Args:
            **kwargs: Variable values
        
        Returns:
            Formatted prompt string
        """
        try:
            return self.template.substitute(**kwargs)
        except KeyError as e:
            missing_var = str(e).strip("'")
            raise ValueError(f"Missing required variable: {missing_var}")
    
    def get_variables(self) -> List[str]:
        """Extract variable names from template."""
        import re
        pattern = r'\$\{([^}]+)\}'
        return re.findall(pattern, self.raw_template)

Prompt templates use Python's string.Template for variable substitution. Unlike str.format, it substitutes plain names only and never evaluates attribute lookups or expressions, which keeps the syntax simple and avoids the injection risks of more powerful templating systems.
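
The following sketch shows how string.Template behaves with the placeholder forms it accepts. string.Template recognizes both ${name} and $name, and treats $$ as an escaped dollar sign. The helper template_variables is a hypothetical name; it relies on the compiled Template.pattern regex from the standard library to find both placeholder forms.

```python
from string import Template
from typing import List


def template_variables(template: str) -> List[str]:
    """List placeholder names in order of first appearance, no duplicates."""
    names: List[str] = []
    for match in Template.pattern.finditer(template):
        # "named" matches $name, "braced" matches ${name}; "$$" matches neither.
        name = match.group("named") or match.group("braced")
        if name and name not in names:
            names.append(name)
    return names


text = "Summarize ${document} for $audience. Budget: $$5. Repeat: ${document}"
variables = template_variables(text)

template = Template(text)
rendered = template.substitute(document="the report", audience="executives")

try:
    template.substitute(document="the report")
    missing = None
except KeyError as exc:
    missing = exc.args[0]  # name of the placeholder that had no value

# safe_substitute leaves unknown placeholders in place instead of raising.
partial = template.safe_substitute(document="the report")
```

A regular expression that only looks for the braced form would miss $audience here, so a template could pass a variable check and still fail at substitution time.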

A template library manages collections of prompts:

class PromptLibrary:
    """Library for managing prompt templates."""
    
    def __init__(self):
        self.templates: Dict[str, PromptTemplate] = {}
        self.logger = logging.getLogger(__name__)
    
    def register(self, name: str, template: PromptTemplate):
        """Register a template."""
        self.templates[name] = template
        self.logger.info(f"Registered prompt template: {name}")
    
    def get(self, name: str) -> PromptTemplate:
        """Retrieve a template by name."""
        if name not in self.templates:
            raise KeyError(f"Template not found: {name}")
        return self.templates[name]
    
    def format(self, name: str, **kwargs) -> str:
        """Format a template by name."""
        template = self.get(name)
        return template.format(**kwargs)

Using a prompt library, applications can define all prompts in one place and reference them by name. This separation of prompts from code makes it easy to experiment with different phrasings and maintain consistency across the application.

Embedding and Vector Store Component

Many LLM applications use embeddings for semantic search and retrieval-augmented generation. An embedding component provides a consistent interface for generating embeddings from different models.

from abc import ABC, abstractmethod
from typing import List, Union

import numpy as np

class EmbeddingModel(ABC):
    """Abstract base for embedding models."""
    
    @abstractmethod
    def embed(self, texts: Union[str, List[str]]) -> np.ndarray:
        """
        Generate embeddings for text(s).
        
        Args:
            texts: Single text or list of texts
        
        Returns:
            Numpy array of embeddings
        """
        pass
    
    @abstractmethod
    def get_dimension(self) -> int:
        """Return embedding dimension."""
        pass

A simple vector store provides in-memory storage and similarity search:

import logging
from typing import Any, Dict, List, Optional, Tuple

import numpy as np

class VectorStore:
    """In-memory vector store with similarity search."""
    
    def __init__(self, embedding_model: EmbeddingModel):
        """
        Initialize vector store.
        
        Args:
            embedding_model: Model for generating embeddings
        """
        self.embedding_model = embedding_model
        self.vectors: List[np.ndarray] = []
        self.documents: List[str] = []
        self.metadata: List[Dict[str, Any]] = []
        self.logger = logging.getLogger(__name__)
    
    def add(self, text: str, metadata: Optional[Dict[str, Any]] = None):
        """
        Add document to store.
        
        Args:
            text: Document text
            metadata: Optional metadata
        """
        embedding = self.embedding_model.embed(text)
        self.vectors.append(embedding)
        self.documents.append(text)
        self.metadata.append(metadata or {})
    
    def search(
        self,
        query: str,
        top_k: int = 5
    ) -> List[Tuple[str, float, Dict[str, Any]]]:
        """
        Search for similar documents.
        
        Args:
            query: Search query
            top_k: Number of results to return
        
        Returns:
            List of (document, similarity_score, metadata) tuples
        """
        if not self.vectors:
            return []
        
        query_embedding = self.embedding_model.embed(query)
        
        # Calculate cosine similarities
        similarities = []
        for vec in self.vectors:
            similarity = self._cosine_similarity(query_embedding, vec)
            similarities.append(similarity)
        
        # Get top-k indices
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        
        results = [
            (self.documents[i], similarities[i], self.metadata[i])
            for i in top_indices
        ]
        
        return results
    
    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """Calculate cosine similarity between vectors."""
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

This vector store provides basic semantic search capabilities. Applications can add documents, which are embedded on insertion, and search for similar documents using cosine similarity. The search is a linear scan over every stored vector, so it suits small collections. For production use, applications would typically use specialized vector databases like Pinecone, Weaviate, or Chroma; keeping the same add and search methods would let such a backend replace the in-memory lists without changing calling code.
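
The ranking step can be illustrated without any dependencies. The sketch below is a plain-Python restatement of cosine-similarity ranking, not the VectorStore implementation. The function names are hypothetical, and the zero-norm guard is an addition that the cosine formula needs whenever an all-zero vector can appear.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity that returns 0.0 instead of dividing by zero."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm > 0 else 0.0


def rank(query: Sequence[float], vectors: List[Sequence[float]],
         top_k: int) -> List[Tuple[int, float]]:
    """Return (index, similarity) pairs for the top_k most similar vectors."""
    scored = [(i, cosine_similarity(query, v)) for i, v in enumerate(vectors)]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_k]


vectors = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0], [0.0, 0.0]]
results = rank([1.0, 0.0], vectors, top_k=2)
# The identical vector ranks first, the diagonal vector second.
```

Without the guard, the all-zero vector would produce a division by zero; treating it as similarity 0.0 simply pushes it to the bottom of the ranking.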

Logging and Observability Component

Understanding LLM application behavior requires comprehensive logging. A logging component standardizes log formatting and provides utilities for tracking LLM interactions.

import logging
import json
from datetime import datetime
from typing import Any, Dict, List, Optional

class LLMLogger:
    """Specialized logger for LLM interactions."""
    
    def __init__(self, name: str, log_file: Optional[str] = None):
        """
        Initialize LLM logger.
        
        Args:
            name: Logger name
            log_file: Optional file for structured logs
        """
        self.logger = logging.getLogger(name)
        self.log_file = log_file
    
    def log_completion(
        self,
        messages: List[Message],
        response: CompletionResponse,
        duration: float,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """
        Log an LLM completion request and response.
        
        Args:
            messages: Input messages
            response: LLM response
            duration: Request duration in seconds
            metadata: Additional metadata
        """
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "type": "completion",
            "model": response.model,
            "duration_seconds": duration,
            "token_usage": response.usage,
            "message_count": len(messages),
            "finish_reason": response.finish_reason,
            "metadata": metadata or {}
        }
        
        self.logger.info(f"Completion: {response.model} ({duration:.2f}s)")
        
        if self.log_file:
            self._write_structured_log(log_entry)
    
    def _write_structured_log(self, entry: Dict[str, Any]):
        """Write structured log entry to file."""
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(entry) + '\n')

The LLM logger creates structured logs that can be analyzed to understand usage patterns, costs, and performance. Each completion is logged with timing information, token usage, and custom metadata.
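
As a sketch of the kind of analysis these logs allow, the following reads a JSON Lines file and totals request count, duration, and tokens. The function summarize_log is a hypothetical helper, and the "total_tokens" key inside token_usage is an assumption about the usage format that may differ per provider.

```python
import json
import os
import tempfile
from typing import Any, Dict


def summarize_log(path: str) -> Dict[str, Any]:
    """Aggregate request count, total duration, and tokens from a JSONL log."""
    summary = {"requests": 0, "total_seconds": 0.0, "total_tokens": 0}
    with open(path, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # skip blank lines
            entry = json.loads(line)
            summary["requests"] += 1
            summary["total_seconds"] += entry.get("duration_seconds", 0.0)
            usage = entry.get("token_usage") or {}
            # "total_tokens" is an assumed key; adjust to the real usage format.
            summary["total_tokens"] += usage.get("total_tokens", 0)
    return summary


# Write two sample entries shaped like the log_entry dictionary above.
sample = [
    {"type": "completion", "duration_seconds": 1.5,
     "token_usage": {"total_tokens": 120}},
    {"type": "completion", "duration_seconds": 0.5,
     "token_usage": {"total_tokens": 80}},
]
with tempfile.TemporaryDirectory() as tmp:
    log_path = os.path.join(tmp, "llm_interactions.jsonl")
    with open(log_path, "w", encoding="utf-8") as f:
        for entry in sample:
            f.write(json.dumps(entry) + "\n")
    stats = summarize_log(log_path)
```

Because each line is an independent JSON object, the file can be processed one line at a time, even while the application is still appending to it.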

Complete Running Example

The following complete example demonstrates how the core components integrate to create a functional LLM application. This application provides a conversational interface with web search capabilities, configuration management, chat history, logging, and resilience mechanisms.

import logging
import sys
import time
from pathlib import Path
from typing import Optional, List, Dict, Any
import json

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('llm_app.log')
    ]
)

class LLMApplication:
    """
    Complete LLM application integrating all components.
    
    This application provides a conversational interface with:
    - Automatic GPU detection and optimization
    - Configurable LLM backend (local or remote)
    - Web search tool integration
    - Chat history management
    - Rate limiting and circuit breaking
    - Comprehensive logging
    """
    
    def __init__(self, config_path: str):
        """
        Initialize application with configuration file.
        
        Args:
            config_path: Path to configuration file (JSON or YAML)
        """
        self.logger = logging.getLogger(__name__)
        self.logger.info("Initializing LLM Application")
        
        # Load configuration
        self.config_manager = ConfigurationManager(config_path)
        self.config = self.config_manager.load()
        
        # Setup logging level from config
        logging.getLogger().setLevel(self.config.logging_level)
        
        # Initialize GPU detector
        self.gpu_detector = GPUDetector()
        self.accelerator_info = self.gpu_detector.detect()
        self.logger.info(
            f"Using accelerator: {self.accelerator_info.accelerator_type.value}"
        )
        
        # Initialize LLM based on configuration
        self.llm = self._initialize_llm()
        
        # Initialize tool registry and register tools
        self.tool_registry = ToolRegistry()
        self._register_tools()
        
        # Initialize chat history
        self.chat_history = ChatHistory(
            system_message=self.config.llm.system_message,
            max_context_tokens=self.config.llm.context_window
        )
        
        # Initialize resilient client
        self.resilient_client = ResilientLLMClient(
            llm=self.llm,
            max_requests_per_minute=60,
            circuit_breaker_threshold=5
        )
        
        # Initialize specialized logger
        self.llm_logger = LLMLogger(
            name="llm_interactions",
            log_file="llm_interactions.jsonl"
        )
        
        self.logger.info("Application initialization complete")
    
    def _initialize_llm(self) -> BaseLLM:
        """
        Initialize LLM based on configuration.
        
        Returns:
            Configured LLM instance
        """
        model_name = self.config.llm.model_name
        
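        # Determine if this is a local or remote model
        if model_name.startswith("claude-"):
            # Claude models are not served by the OpenAI API; they need their
            # own BaseLLM implementation and API key.
            raise ValueError(
                f"No LLM backend configured for model: {model_name}"
            )
        
        if model_name.startswith("gpt-"):
            # Remote API model
            api_key = self.config.api_keys.get("openai")
            if not api_key:
                raise ValueError("OpenAI API key required for GPT models")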
            
            self.logger.info(f"Initializing OpenAI LLM: {model_name}")
            return OpenAILLM(model_name=model_name, api_key=api_key)
        else:
            # Local transformer model
            device = self.gpu_detector.get_device_string()
            self.logger.info(f"Initializing local LLM: {model_name} on {device}")
            
            return LocalTransformerLLM(
                model_name=model_name,
                device=device,
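                # trust_remote_code=True allows a model repository to run its
                # own Python code at load time; enable it only for trusted models.
                trust_remote_code=True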
            )
    
    def _register_tools(self):
        """Register available tools."""
        # Register web search tool
        search_tool = WebSearchTool(max_results=5)
        self.tool_registry.register(search_tool)
        self.logger.info("Registered web search tool")
    
    def process_user_input(self, user_input: str) -> str:
        """
        Process user input and generate response.
        
        Args:
            user_input: User's message
        
        Returns:
            Assistant's response
        """
        self.logger.info(f"Processing user input: {user_input[:50]}...")
        
        # Add user message to history
        self.chat_history.add_user_message(user_input)
        
        # Check if tools should be used
        # In a real implementation, this would use the LLM's tool calling capability
        # For this example, we use a simple keyword check
        response_text = ""
        
        if "search" in user_input.lower() or "find" in user_input.lower():
            # Extract search query (simplified)
            search_query = user_input
            
            # Execute search tool
            search_result = self.tool_registry.execute_tool(
                "web_search",
                {"query": search_query, "max_results": 3}
            )
            
            if search_result["success"]:
                # Add search results to context
                results_text = self._format_search_results(search_result["result"])
                
                # Create augmented prompt
                augmented_input = f"""User asked: {user_input}

Here are relevant search results: {results_text}

Please provide a helpful response based on this information."""

                self.chat_history.messages[-1].content = augmented_input
        
        # Get messages for LLM
        messages = self.chat_history.get_messages_for_llm()
        
        # Generate completion
        start_time = time.time()
        
        try:
            response = self.resilient_client.complete(
                messages=messages,
                temperature=self.config.llm.temperature,
                max_tokens=self.config.llm.max_tokens,
                top_p=self.config.llm.top_p,
                top_k=self.config.llm.top_k
            )
            
            duration = time.time() - start_time
            response_text = response.content
            
            # Log the interaction
            self.llm_logger.log_completion(
                messages=messages,
                response=response,
                duration=duration
            )
            
            # Add assistant response to history
            self.chat_history.add_assistant_message(response_text)
            
            self.logger.info(f"Generated response in {duration:.2f}s")
            
        except Exception as e:
            self.logger.error(f"Error generating response: {e}", exc_info=True)
            response_text = "I apologize, but I encountered an error processing your request. Please try again."
        
        return response_text
    
    def _format_search_results(self, search_data: Dict[str, Any]) -> str:
        """
        Format search results for inclusion in prompt.
        
        Args:
            search_data: Search results from tool
        
        Returns:
            Formatted text
        """
        results = search_data.get("results", [])
        formatted = []
        
        for result in results:
            formatted.append(
                f"[{result['position']}] {result['title']}\n"
                f"URL: {result['url']}\n"
                f"{result['snippet']}\n"
            )
        
        return "\n".join(formatted)
    
    def save_conversation(self, file_path: str):
        """
        Save current conversation to file.
        
        Args:
            file_path: Destination file path
        """
        self.chat_history.save(file_path)
        self.logger.info(f"Saved conversation to {file_path}")
    
    def load_conversation(self, file_path: str):
        """
        Load conversation from file.
        
        Args:
            file_path: Source file path
        """
        self.chat_history = ChatHistory.load(file_path)
        self.logger.info(f"Loaded conversation from {file_path}")
    
    def run_interactive(self):
        """Run interactive conversation loop."""
        print("LLM Application Started")
        print(f"Using model: {self.config.llm.model_name}")
        print(f"Accelerator: {self.accelerator_info.accelerator_type.value}")
        print("Type 'quit' to exit, 'save' to save conversation, 'clear' to clear history\n")
        
        while True:
            try:
                user_input = input("You: ").strip()
                
                if not user_input:
                    continue
                
                if user_input.lower() == 'quit':
                    print("Goodbye!")
                    break
                
                if user_input.lower() == 'save':
                    filename = f"conversation_{int(time.time())}.json"
                    self.save_conversation(filename)
                    print(f"Conversation saved to {filename}")
                    continue
                
                if user_input.lower() == 'clear':
                    self.chat_history.clear()
                    print("Conversation history cleared")
                    continue
                
                response = self.process_user_input(user_input)
                print(f"\nAssistant: {response}\n")
            
            except KeyboardInterrupt:
                print("\nGoodbye!")
                break
            except Exception as e:
                self.logger.error(f"Error in interactive loop: {e}", exc_info=True)
                print(f"Error: {e}")


def create_default_config(output_path: str):
    """
    Create a default configuration file.
    
    Args:
        output_path: Path for configuration file
    """
    config = ApplicationConfig(
        llm=LLMModelConfig(
            model_name="gpt-3.5-turbo",
            temperature=0.7,
            max_tokens=2048,
            top_p=0.95,
            top_k=50,
            context_window=4096,
            system_message="You are a helpful AI assistant with access to web search. Provide accurate, helpful responses."
        ),
        api_keys={},
        logging_level="INFO",
        enable_streaming=True,
        max_retries=3,
        timeout_seconds=60
    )
    
    manager = ConfigurationManager()
    manager.save(config, output_path)
    print(f"Created default configuration at {output_path}")


def main():
    """Main entry point for the application."""
    import argparse
    
    parser = argparse.ArgumentParser(description="LLM Application")
    parser.add_argument(
        "--config",
        type=str,
        default="config.yaml",
        help="Path to configuration file"
    )
    parser.add_argument(
        "--create-config",
        action="store_true",
        help="Create default configuration file"
    )
    
    args = parser.parse_args()
    
    if args.create_config:
        create_default_config(args.config)
        return
    
    # Check if config exists
    if not Path(args.config).exists():
        print(f"Configuration file not found: {args.config}")
        print("Create one with: python app.py --create-config")
        return
    
    # Initialize and run application
    app = LLMApplication(config_path=args.config)
    app.run_interactive()


if __name__ == "__main__":
    main()

This complete example demonstrates how the core components work together. The application initializes by loading configuration, detecting GPU hardware, setting up the LLM, registering tools, and creating the chat history manager. The process_user_input method orchestrates the entire flow: adding messages to history, invoking the search tool when a simple keyword check matches, generating completions through the resilient client, and logging interactions.

The interactive loop provides a simple command-line interface where users can have conversations, save the conversation, and clear the history. The load_conversation method is available programmatically but is not wired to a command in this loop. The application catches errors in the loop, reports them to the user, and logs them with stack traces.

To use this application, users first create a configuration file:

python app.py --create-config

Then edit the configuration to add API keys and adjust parameters. Finally, run the application:

python app.py --config config.yaml

The application demonstrates production-ready patterns including proper error handling, comprehensive logging, configuration management, and graceful degradation when services are unavailable.

Conclusion

This article has presented a comprehensive Python library for LLM application development. Each component addresses a specific recurring challenge: GPU detection eliminates platform-specific code, the abstract LLM interface enables model swapping, configuration management externalizes settings, tool calling extends LLM capabilities, MCP integration enables context sharing, message management handles conversation state, and circuit breakers with rate limiting provide resilience.

The components follow clean architecture principles with clear separation of concerns. Each component has a well-defined interface and can be used independently or in combination with others. The running example demonstrates how these components integrate to create a complete, production-ready application.

By providing these reusable components, the library eliminates the need for developers to repeatedly solve the same problems. Instead of spending time on infrastructure, developers can focus on the unique aspects of their applications: domain-specific logic, user experience, and business value.

The library is designed to be extensible. New LLM providers can be added by implementing the BaseLLM interface. New tools can be registered with the tool registry. Additional resilience mechanisms can wrap the existing components. This extensibility ensures that the library can evolve with the rapidly changing LLM ecosystem.

Future enhancements could include streaming response support in more components, integration with vector databases for retrieval-augmented generation, support for multi-modal models, and enhanced observability with metrics and tracing. The foundation provided by these components makes such enhancements straightforward to implement while maintaining backward compatibility.

The goal of this library is to accelerate LLM application development by providing robust, well-tested components that handle common requirements. By building on this foundation, developers can create sophisticated LLM applications more quickly and with greater confidence in their reliability and maintainability.
