Introduction
Building applications around Large Language Models means solving the same technical problems again and again. Most new projects end up reimplementing GPU detection, configuration management, tool calling, rate limiting, and other foundational components. This article presents a Python library designed to eliminate that redundant work by providing production-ready, reusable components for the most common requirements of LLM-based applications.
The library follows clean architecture principles with clear separation of concerns. Each component is designed to be independently usable while also working seamlessly with other components. The goal is to provide developers with a toolkit that accelerates development without imposing rigid constraints on application architecture.
This article explores each component in depth, explaining not just what it does but why it is designed in a particular way. We will examine the technical challenges each component addresses and provide concrete code examples. At the end, a complete running example demonstrates how all components integrate to create a functional LLM application.
GPU Detection and Optimization Component
Modern LLM inference requires significant computational resources, and utilizing available GPU acceleration is essential for acceptable performance. However, different hardware platforms use different GPU technologies. Apple Silicon uses Metal Performance Shaders (MPS), NVIDIA uses CUDA, AMD uses ROCm, and Intel GPUs are reached through the oneAPI stack (exposed in PyTorch as the xpu device). An application that runs on multiple platforms must detect the available hardware and configure the inference engine accordingly.
The GPU detection component solves this problem by automatically identifying available acceleration hardware and providing the appropriate configuration for the inference engine. This component abstracts away platform-specific details, allowing the rest of the application to remain hardware-agnostic.
The detection process follows a priority order. NVIDIA CUDA is checked first because it offers the most mature ecosystem for LLM inference, followed by ROCm on AMD hardware, Metal Performance Shaders on Apple Silicon, and finally Intel acceleration (the Intel check is omitted from the listing below). Note that ROCm builds of PyTorch report AMD GPUs through the same torch.cuda API as NVIDIA GPUs, so the two are told apart by torch.version.hip. If no GPU is available, the component falls back to CPU inference and logs a warning.
Here is the core implementation of the GPU detector:
import logging
import platform
from dataclasses import dataclass
from enum import Enum
from typing import Optional

import torch


class AcceleratorType(Enum):
    """Enumeration of supported hardware accelerators."""
    CUDA = "cuda"
    ROCM = "rocm"
    MPS = "mps"
    INTEL = "intel"
    CPU = "cpu"


@dataclass
class AcceleratorInfo:
    """Information about detected hardware accelerator."""
    accelerator_type: AcceleratorType
    device_name: str
    device_count: int
    memory_available: Optional[int]
    compute_capability: Optional[str]


class GPUDetector:
    """Detects and configures GPU acceleration for LLM inference."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self._accelerator_info: Optional[AcceleratorInfo] = None

    def detect(self) -> AcceleratorInfo:
        """
        Detect available GPU acceleration and return configuration.

        Returns:
            AcceleratorInfo object containing hardware details
        """
        # Return the cached result if detection has already run
        if self._accelerator_info is not None:
            return self._accelerator_info

        # ROCm builds of PyTorch expose AMD GPUs through the torch.cuda API,
        # so torch.version.hip is what distinguishes ROCm from NVIDIA CUDA.
        if torch.cuda.is_available():
            if getattr(torch.version, 'hip', None) is not None:
                self._accelerator_info = self._detect_rocm()
                self.logger.info(f"Detected ROCm GPU: {self._accelerator_info.device_name}")
            else:
                self._accelerator_info = self._detect_cuda()
                self.logger.info(f"Detected CUDA GPU: {self._accelerator_info.device_name}")
            return self._accelerator_info

        # Check for Apple Metal Performance Shaders
        if hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
            self._accelerator_info = self._detect_mps()
            self.logger.info(f"Detected Apple MPS: {self._accelerator_info.device_name}")
            return self._accelerator_info

        # Fallback to CPU
        self._accelerator_info = self._detect_cpu()
        self.logger.warning("No GPU acceleration available, using CPU")
        return self._accelerator_info
The detector implements lazy initialization through caching. Once hardware is detected, subsequent calls return the cached result rather than repeating the detection process. This is important because hardware detection can be expensive and the hardware configuration does not change during application runtime.
Each detection method gathers platform-specific information. For CUDA, this includes device count, memory, and compute capability. For MPS, it includes the chip architecture. This information helps the application make informed decisions about model loading and inference parameters.
    def _detect_cuda(self) -> AcceleratorInfo:
        """Detect NVIDIA CUDA configuration."""
        device_count = torch.cuda.device_count()
        if device_count > 0:
            # Query the first device's properties once and reuse them
            props = torch.cuda.get_device_properties(0)
            device_name = props.name
            memory = props.total_memory  # total device memory in bytes
            compute_cap = f"{props.major}.{props.minor}"
        else:
            device_name, memory, compute_cap = "Unknown", None, None
        return AcceleratorInfo(
            accelerator_type=AcceleratorType.CUDA,
            device_name=device_name,
            device_count=device_count,
            memory_available=memory,
            compute_capability=compute_cap
        )

    def _detect_rocm(self) -> AcceleratorInfo:
        """Detect AMD ROCm configuration (queried through the torch.cuda API)."""
        info = self._detect_cuda()
        info.accelerator_type = AcceleratorType.ROCM
        return info

    def _detect_mps(self) -> AcceleratorInfo:
        """Detect Apple Metal Performance Shaders configuration."""
        chip_info = platform.processor() or platform.machine()
        return AcceleratorInfo(
            accelerator_type=AcceleratorType.MPS,
            device_name=f"Apple Silicon {chip_info}",
            device_count=1,
            memory_available=None,
            compute_capability=None
        )

    def _detect_cpu(self) -> AcceleratorInfo:
        """Fallback CPU configuration."""
        import multiprocessing
        return AcceleratorInfo(
            accelerator_type=AcceleratorType.CPU,
            device_name=platform.processor() or "CPU",
            device_count=multiprocessing.cpu_count(),  # number of logical CPUs
            memory_available=None,
            compute_capability=None
        )

    def get_device_string(self) -> str:
        """
        Get PyTorch device string for model loading.

        Returns:
            Device string like 'cuda:0', 'mps', or 'cpu'
        """
        info = self.detect()
        # ROCm devices are also addressed as 'cuda' in PyTorch
        if info.accelerator_type in (AcceleratorType.CUDA, AcceleratorType.ROCM):
            return "cuda:0"
        elif info.accelerator_type == AcceleratorType.MPS:
            return "mps"
        else:
            return "cpu"
The get_device_string method provides the string that PyTorch and other frameworks expect when loading models. This abstraction means application code can simply call get_device_string without knowing anything about the underlying hardware.
The component also provides optimization hints based on detected hardware. For example, on systems with limited GPU memory, it might recommend using quantized models or smaller batch sizes. On high-end CUDA systems, it might enable features like flash attention or tensor cores.
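The article does not show the hint logic itself, so the following is only a minimal sketch of what such a helper could look like. The function name suggest_optimizations, the returned keys, and every memory threshold are assumptions chosen for illustration; the one hardware fact relied on is that FlashAttention-2 targets NVIDIA GPUs with compute capability 8.0 or newer.

```python
from typing import Dict, Optional

GIB = 1024 ** 3


def suggest_optimizations(
    accelerator: str,
    memory_bytes: Optional[int],
    compute_capability: Optional[str],
) -> Dict[str, object]:
    """Return illustrative tuning hints. All thresholds are assumptions."""
    hints: Dict[str, object] = {
        "quantization": None,
        "max_batch_size": 1,
        "flash_attention": False,
    }
    if accelerator == "cpu":
        # Without a GPU, a quantized model is usually the only practical option
        hints["quantization"] = "int8"
        return hints

    if memory_bytes is not None:
        if memory_bytes < 8 * GIB:
            hints["quantization"] = "4bit"
        elif memory_bytes < 16 * GIB:
            hints["quantization"] = "8bit"
            hints["max_batch_size"] = 4
        else:
            hints["max_batch_size"] = 8

    if accelerator == "cuda" and compute_capability:
        # compute_capability has the form "major.minor", e.g. "8.6"
        major = int(compute_capability.split(".")[0])
        hints["flash_attention"] = major >= 8
    return hints


high_end = suggest_optimizations("cuda", 24 * GIB, "8.6")
small_gpu = suggest_optimizations("cuda", 6 * GIB, "7.5")
```

The arguments correspond to the accelerator_type value, memory_available, and compute_capability fields of AcceleratorInfo, so the helper could be fed directly from the detector's result.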
Abstract LLM Interface Component
LLM applications often need to support multiple model providers. An application might use OpenAI's GPT models in production but switch to a local Llama model for development or privacy-sensitive deployments. Alternatively, different parts of an application might use different models optimized for specific tasks.
Hard-coding dependencies on specific LLM providers creates tight coupling that makes the application brittle and difficult to modify. The abstract LLM interface component solves this by defining a common contract that all LLM implementations must satisfy. Application code depends on this interface rather than concrete implementations, enabling seamless model swapping.
The interface defines the essential operations that any LLM must support: generating completions, streaming responses, and managing conversation context. It also standardizes how parameters like temperature and top-k are passed to the model.
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Iterator, Union


@dataclass
class Message:
    """Represents a single message in a conversation."""
    role: str  # 'system', 'user', or 'assistant'
    content: str
    name: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert message to dictionary format."""
        result = {"role": self.role, "content": self.content}
        if self.name:
            result["name"] = self.name
        return result


@dataclass
class CompletionResponse:
    """Response from LLM completion request."""
    content: str
    model: str
    finish_reason: str
    usage: Dict[str, int]
    raw_response: Optional[Any] = None


class BaseLLM(ABC):
    """
    Abstract base class for LLM implementations.

    This interface ensures all LLM providers expose consistent methods,
    allowing applications to swap implementations without code changes.
    """

    def __init__(self, model_name: str, **kwargs):
        """
        Initialize LLM with model name and optional parameters.

        Args:
            model_name: Identifier for the specific model to use
            **kwargs: Provider-specific configuration
        """
        self.model_name = model_name
        self.config = kwargs
        self.logger = logging.getLogger(self.__class__.__name__)

    @abstractmethod
    def complete(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        top_p: float = 1.0,
        top_k: Optional[int] = None,
        **kwargs
    ) -> CompletionResponse:
        """
        Generate completion for given messages.

        Args:
            messages: Conversation history
            temperature: Sampling temperature (0.0 to 2.0)
            max_tokens: Maximum tokens to generate
            top_p: Nucleus sampling parameter
            top_k: Top-k sampling parameter
            **kwargs: Additional provider-specific parameters

        Returns:
            CompletionResponse containing generated text and metadata
        """
        pass

    @abstractmethod
    def stream_complete(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        top_p: float = 1.0,
        top_k: Optional[int] = None,
        **kwargs
    ) -> Iterator[str]:
        """
        Stream completion tokens as they are generated.

        Args:
            Same as complete()

        Yields:
            Individual tokens or token chunks as strings
        """
        pass
The interface uses dataclasses for structured data. The Message class represents individual conversation turns with role, content, and optional name fields. This structure matches the format used by most LLM APIs, making it easy to adapt existing code.
The CompletionResponse class encapsulates everything the application needs from a completion: the generated text, metadata about token usage, and the reason generation stopped. Including the raw response allows applications to access provider-specific information when needed while maintaining portability.
Concrete implementations of this interface handle the specifics of communicating with different LLM providers. Here is an implementation for local models using the transformers library:
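from threading import Thread

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer


class LocalTransformerLLM(BaseLLM):
    """LLM implementation using HuggingFace transformers for local inference."""

    def __init__(self, model_name: str, device: Optional[str] = None, **kwargs):
        """
        Initialize local transformer model.

        Args:
            model_name: HuggingFace model identifier
            device: Device string ('cuda', 'mps', 'cpu') or None for auto-detect
            **kwargs: Additional model loading parameters
        """
        super().__init__(model_name, **kwargs)

        # Auto-detect device if not specified
        if device is None:
            detector = GPUDetector()
            device = detector.get_device_string()
        self.device = device
        self.logger.info(f"Loading model {model_name} on device {device}")

        # Half precision is often slow or unsupported on CPU, so default to float32 there
        default_dtype = torch.float32 if device == 'cpu' else torch.float16

        # Load tokenizer and model
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            trust_remote_code=kwargs.get('trust_remote_code', False)
        )
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=kwargs.get('torch_dtype', default_dtype),
            device_map=kwargs.get('device_map', 'auto'),
            trust_remote_code=kwargs.get('trust_remote_code', False)
        )

        # Set pad token if not present
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

    def _prepare(self, messages, temperature, max_tokens, top_p, top_k):
        """Build the tokenized prompt and generation arguments shared by both methods."""
        # Use the chat template only when the tokenizer actually defines one
        if getattr(self.tokenizer, 'chat_template', None):
            prompt = self.tokenizer.apply_chat_template(
                [msg.to_dict() for msg in messages],
                tokenize=False,
                add_generation_prompt=True
            )
        else:
            # Fallback: concatenate messages
            prompt = "\n".join([f"{msg.role}: {msg.content}" for msg in messages])

        # Place the inputs on the device the model was actually loaded onto
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)

        gen_kwargs = {
            "max_new_tokens": max_tokens or 512,
            "do_sample": temperature > 0,
            "pad_token_id": self.tokenizer.pad_token_id,
            "eos_token_id": self.tokenizer.eos_token_id,
        }
        if temperature > 0:
            # Sampling parameters only apply when sampling is enabled
            gen_kwargs.update(
                temperature=temperature,
                top_p=top_p,
                top_k=top_k if top_k else 50
            )
        return inputs, gen_kwargs

    def complete(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        top_p: float = 1.0,
        top_k: Optional[int] = None,
        **kwargs
    ) -> CompletionResponse:
        """Generate completion using local transformer model."""
        inputs, gen_kwargs = self._prepare(messages, temperature, max_tokens, top_p, top_k)
        params = {**gen_kwargs, **kwargs}  # caller-supplied kwargs take precedence
        input_length = inputs.input_ids.shape[1]

        # Generate
        with torch.no_grad():
            outputs = self.model.generate(**inputs, **params)

        # Decode only the generated tokens
        generated_tokens = outputs[0][input_length:]
        generated_text = self.tokenizer.decode(generated_tokens, skip_special_tokens=True)

        # Heuristic: hitting the token limit means the output was truncated
        hit_limit = len(generated_tokens) >= params.get("max_new_tokens", 512)
        return CompletionResponse(
            content=generated_text,
            model=self.model_name,
            finish_reason="length" if hit_limit else "stop",
            usage={
                "prompt_tokens": input_length,
                "completion_tokens": len(generated_tokens),
                "total_tokens": input_length + len(generated_tokens)
            }
        )

    def stream_complete(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        top_p: float = 1.0,
        top_k: Optional[int] = None,
        **kwargs
    ) -> Iterator[str]:
        """Stream decoded text chunks as the local model generates them."""
        inputs, gen_kwargs = self._prepare(messages, temperature, max_tokens, top_p, top_k)
        streamer = TextIteratorStreamer(
            self.tokenizer, skip_prompt=True, skip_special_tokens=True
        )
        # generate() blocks, so run it in a worker thread and read from the streamer
        thread = Thread(
            target=self.model.generate,
            kwargs={**inputs, **gen_kwargs, **kwargs, "streamer": streamer}
        )
        thread.start()
        for text in streamer:
            if text:
                yield text
        thread.join()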
This implementation demonstrates how the abstract interface accommodates different LLM backends. The local transformer implementation handles model loading, device placement, tokenization, and generation. It uses the GPU detector component to automatically select the appropriate device, showing how components work together.
The chat template support is particularly important. Modern instruction-tuned models expect specific formatting for conversation turns. The implementation checks if the tokenizer provides a chat template and uses it when available, falling back to simple concatenation for models without templates.
Another implementation might target OpenAI's API:
from typing import Iterator, List, Optional

import openai


class OpenAILLM(BaseLLM):
    """LLM implementation using OpenAI API."""

    def __init__(self, model_name: str, api_key: str, **kwargs):
        """
        Initialize OpenAI LLM client.

        Args:
            model_name: OpenAI model identifier (e.g., 'gpt-4', 'gpt-3.5-turbo')
            api_key: OpenAI API key
            **kwargs: Additional configuration
        """
        super().__init__(model_name, **kwargs)
        self.client = openai.OpenAI(api_key=api_key)

    def complete(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        top_p: float = 1.0,
        top_k: Optional[int] = None,
        **kwargs
    ) -> CompletionResponse:
        """Generate completion using OpenAI API."""
        # Convert messages to OpenAI format
        openai_messages = [msg.to_dict() for msg in messages]

        # Make API call. top_k is accepted for interface compatibility but is
        # not forwarded, because the Chat Completions API has no such parameter.
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=openai_messages,
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=top_p,
            **kwargs
        )

        # Extract response
        choice = response.choices[0]
        return CompletionResponse(
            content=choice.message.content or "",  # content can be None, e.g. for tool calls
            model=response.model,
            finish_reason=choice.finish_reason,
            usage={
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens
            },
            raw_response=response
        )

    def stream_complete(
        self,
        messages: List[Message],
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        top_p: float = 1.0,
        top_k: Optional[int] = None,
        **kwargs
    ) -> Iterator[str]:
        """Stream completion from OpenAI API."""
        openai_messages = [msg.to_dict() for msg in messages]
        stream = self.client.chat.completions.create(
            model=self.model_name,
            messages=openai_messages,
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=top_p,
            stream=True,
            **kwargs
        )
        for chunk in stream:
            # Some chunks carry no choices or no text (e.g. role or usage chunks)
            if chunk.choices and chunk.choices[0].delta.content is not None:
                yield chunk.choices[0].delta.content
The OpenAI implementation shows how the same interface adapts to a completely different backend. Instead of loading models and running inference locally, it makes HTTP requests to OpenAI's API. Yet from the application's perspective, both implementations are interchangeable because they satisfy the same contract.
The streaming implementation demonstrates another important capability. Many applications need to display partial results as they are generated rather than waiting for the complete response. The stream_complete method returns an iterator that yields tokens incrementally, enabling responsive user interfaces.
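To make the interchangeability and the streaming loop concrete without a model download or an API key, here is a small sketch built around a stand-in backend. EchoLLM and collect_stream are hypothetical names introduced for this illustration, and Message is a reduced copy of the dataclass above; the stand-in mirrors only the method names of the interface, not its full parameter list.

```python
from dataclasses import dataclass
from typing import Iterator, List


@dataclass
class Message:
    role: str
    content: str


class EchoLLM:
    """Hypothetical stand-in backend that echoes the last user message."""

    def complete(self, messages: List[Message]) -> str:
        last_user = next(
            (m.content for m in reversed(messages) if m.role == "user"), ""
        )
        return f"You said: {last_user}"

    def stream_complete(self, messages: List[Message]) -> Iterator[str]:
        # Yield the reply word by word to imitate incremental generation
        words = self.complete(messages).split(" ")
        for i, word in enumerate(words):
            yield word if i == 0 else " " + word


def collect_stream(llm, messages: List[Message]) -> str:
    """Print chunks as they arrive and return the assembled text."""
    parts = []
    for chunk in llm.stream_complete(messages):
        print(chunk, end="", flush=True)
        parts.append(chunk)
    print()
    return "".join(parts)


history = [Message("system", "Be brief."), Message("user", "hello world")]
reply = collect_stream(EchoLLM(), history)
```

Because collect_stream depends only on the stream_complete method, any backend that satisfies the interface could be passed in place of the stand-in. A stub like this is also convenient in unit tests.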
Configuration Management Component
LLM applications require extensive configuration. Model parameters like temperature and top_k affect generation quality. Context window sizes determine how much conversation history the model can consider. API keys and endpoints vary between environments. Hard-coding these values makes applications inflexible and difficult to maintain.
The configuration management component provides a clean way to externalize all configuration into files that can be modified without changing code. It supports both JSON and YAML formats, validates configuration values, and provides sensible defaults.
The component uses a hierarchical structure where general settings can be overridden by environment-specific values. For example, a base configuration might specify default model parameters, while a production configuration overrides the model endpoint and API key.
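The override behaviour described here can be sketched as a recursive dictionary merge applied before the configuration is parsed. The helper name merge_config and the sample values are assumptions for illustration; the library's actual override mechanism is not shown in this article.

```python
from copy import deepcopy
from typing import Any, Dict


def merge_config(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively merge override into a copy of base; override wins on conflicts."""
    merged = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Merge nested sections key by key instead of replacing them wholesale
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


base = {
    "llm": {"model_name": "gpt-3.5-turbo", "temperature": 0.7},
    "timeout_seconds": 30,
}
production = {"llm": {"model_name": "gpt-4"}, "timeout_seconds": 60}

effective = merge_config(base, production)
```

In this sketch the production file only needs to list the values that differ; temperature is inherited from the base configuration, and neither input dictionary is modified.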
import json
import logging
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Any, Dict, Optional, Union

import yaml


@dataclass
class LLMModelConfig:
    """Configuration for LLM model parameters."""
    model_name: str
    temperature: float = 0.7
    max_tokens: Optional[int] = None
    top_p: float = 1.0
    top_k: Optional[int] = None
    context_window: int = 4096
    system_message: Optional[str] = None

    def validate(self):
        """Validate configuration values."""
        if not 0.0 <= self.temperature <= 2.0:
            raise ValueError(f"Temperature must be between 0.0 and 2.0, got {self.temperature}")
        if not 0.0 <= self.top_p <= 1.0:
            raise ValueError(f"top_p must be between 0.0 and 1.0, got {self.top_p}")
        if self.top_k is not None and self.top_k < 1:
            raise ValueError(f"top_k must be positive, got {self.top_k}")
        if self.context_window < 1:
            raise ValueError(f"context_window must be positive, got {self.context_window}")


@dataclass
class ApplicationConfig:
    """Complete application configuration."""
    llm: LLMModelConfig
    api_keys: Dict[str, str] = field(default_factory=dict)
    logging_level: str = "INFO"
    enable_streaming: bool = True
    max_retries: int = 3
    timeout_seconds: int = 30

    def validate(self):
        """Validate entire configuration."""
        self.llm.validate()
        valid_log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
        if self.logging_level not in valid_log_levels:
            raise ValueError(f"Invalid logging level: {self.logging_level}")


class ConfigurationManager:
    """Manages application configuration from files."""

    def __init__(self, config_path: Optional[Union[str, Path]] = None):
        """
        Initialize configuration manager.

        Args:
            config_path: Path to configuration file (JSON or YAML)
        """
        self.config_path = Path(config_path) if config_path else None
        self._config: Optional[ApplicationConfig] = None
        self.logger = logging.getLogger(__name__)

    def load(self, config_path: Optional[Union[str, Path]] = None) -> ApplicationConfig:
        """
        Load configuration from file.

        Args:
            config_path: Path to configuration file, overrides constructor path

        Returns:
            Validated ApplicationConfig object
        """
        path = Path(config_path) if config_path else self.config_path
        if path is None:
            self.logger.warning("No configuration file specified, using defaults")
            self._config = self._create_default_config()
            return self._config
        if not path.exists():
            raise FileNotFoundError(f"Configuration file not found: {path}")

        # Load based on file extension
        suffix = path.suffix.lower()
        if suffix == '.json':
            config_dict = self._load_json(path)
        elif suffix in ['.yaml', '.yml']:
            config_dict = self._load_yaml(path)
        else:
            raise ValueError(f"Unsupported configuration format: {suffix}")

        # Parse into configuration objects
        self._config = self._parse_config(config_dict)
        self._config.validate()
        self.logger.info(f"Loaded configuration from {path}")
        return self._config
The configuration manager separates loading from parsing. The load method determines the file format and delegates to format-specific loaders. This separation makes it easy to add support for additional formats in the future.
Validation is a critical feature. The validate methods check that all configuration values are within acceptable ranges. Temperature must be between zero and two, top_p between zero and one, and so on. This catches configuration errors early rather than allowing them to cause cryptic failures during model inference.
    def _load_json(self, path: Path) -> Dict[str, Any]:
        """Load configuration from JSON file."""
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def _load_yaml(self, path: Path) -> Dict[str, Any]:
        """Load configuration from YAML file."""
        with open(path, 'r', encoding='utf-8') as f:
            # safe_load returns None for an empty file, so fall back to an empty dict
            return yaml.safe_load(f) or {}

    def _parse_config(self, config_dict: Dict[str, Any]) -> ApplicationConfig:
        """Parse dictionary into ApplicationConfig object."""
        # Parse LLM configuration
        llm_dict = config_dict.get('llm', {})
        llm_config = LLMModelConfig(
            model_name=llm_dict.get('model_name', 'gpt-3.5-turbo'),
            temperature=llm_dict.get('temperature', 0.7),
            max_tokens=llm_dict.get('max_tokens'),
            top_p=llm_dict.get('top_p', 1.0),
            top_k=llm_dict.get('top_k'),
            context_window=llm_dict.get('context_window', 4096),
            system_message=llm_dict.get('system_message')
        )

        # Parse application configuration
        app_config = ApplicationConfig(
            llm=llm_config,
            api_keys=config_dict.get('api_keys', {}),
            logging_level=config_dict.get('logging_level', 'INFO'),
            enable_streaming=config_dict.get('enable_streaming', True),
            max_retries=config_dict.get('max_retries', 3),
            timeout_seconds=config_dict.get('timeout_seconds', 30)
        )
        return app_config

    def _create_default_config(self) -> ApplicationConfig:
        """Create configuration with default values."""
        return ApplicationConfig(
            llm=LLMModelConfig(model_name='gpt-3.5-turbo')
        )

    def save(self, config: ApplicationConfig, output_path: Union[str, Path]):
        """
        Save configuration to file.

        Args:
            config: Configuration object to save
            output_path: Destination file path
        """
        path = Path(output_path)
        config_dict = self._config_to_dict(config)
        suffix = path.suffix.lower()
        if suffix == '.json':
            with open(path, 'w', encoding='utf-8') as f:
                json.dump(config_dict, f, indent=2)
        elif suffix in ['.yaml', '.yml']:
            with open(path, 'w', encoding='utf-8') as f:
                yaml.dump(config_dict, f, default_flow_style=False)
        else:
            raise ValueError(f"Unsupported output format: {suffix}")
        self.logger.info(f"Saved configuration to {path}")

    def _config_to_dict(self, config: ApplicationConfig) -> Dict[str, Any]:
        """Convert configuration object to dictionary."""
        return {
            'llm': asdict(config.llm),
            'api_keys': config.api_keys,
            'logging_level': config.logging_level,
            'enable_streaming': config.enable_streaming,
            'max_retries': config.max_retries,
            'timeout_seconds': config.timeout_seconds
        }
The save method enables round-tripping configuration. Applications can load configuration, modify it programmatically, and save the updated version. This is useful for tools that help users configure the application through a graphical interface.
A typical configuration file in YAML format might look like this:
llm:
  model_name: "meta-llama/Llama-2-7b-chat-hf"
  temperature: 0.8
  max_tokens: 2048
  top_p: 0.95
  top_k: 50
  context_window: 4096
  system_message: "You are a helpful AI assistant."
api_keys:
  openai: "sk-..."
  anthropic: "sk-ant-..."
logging_level: "INFO"
enable_streaming: true
max_retries: 3
timeout_seconds: 60
The hierarchical structure makes configuration files readable and maintainable. Related settings are grouped together, and the structure mirrors the code's data classes, making it easy to understand how configuration maps to application behavior.
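As a compact illustration of how a nested file section maps onto a dataclass and how validation surfaces a bad value early, consider the sketch below. ModelSettings and parse_settings are simplified, hypothetical stand-ins for LLMModelConfig and the manager's parsing step, and JSON is used instead of YAML so the example needs only the standard library.

```python
import json
from dataclasses import dataclass, fields
from typing import Any, Dict


@dataclass
class ModelSettings:
    """Hypothetical, reduced stand-in for LLMModelConfig."""
    model_name: str
    temperature: float = 0.7
    top_p: float = 1.0

    def validate(self) -> None:
        if not 0.0 <= self.temperature <= 2.0:
            raise ValueError(f"temperature out of range: {self.temperature}")
        if not 0.0 <= self.top_p <= 1.0:
            raise ValueError(f"top_p out of range: {self.top_p}")


def parse_settings(raw: Dict[str, Any]) -> ModelSettings:
    """Build settings from the 'llm' section, ignoring unknown keys."""
    known = {f.name for f in fields(ModelSettings)}
    section = raw.get("llm") or {}
    settings = ModelSettings(**{k: v for k, v in section.items() if k in known})
    settings.validate()
    return settings


# A valid section: missing keys fall back to the dataclass defaults
good = parse_settings(json.loads('{"llm": {"model_name": "demo", "temperature": 0.8}}'))

# An out-of-range value is rejected at load time, not during inference
try:
    parse_settings({"llm": {"model_name": "demo", "temperature": 3.5}})
    error = None
except ValueError as exc:
    error = str(exc)
```

Filtering on the dataclass field names keeps the parser tolerant of extra keys in the file. Whether unknown keys should instead be reported as errors is a design choice this sketch does not settle.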
Tool Calling Framework Component
Modern LLMs can use external tools to extend their capabilities beyond text generation. A model might call a web search API to find current information, execute Python code to perform calculations, or query a database to retrieve specific data. The tool calling framework component provides infrastructure for defining tools, invoking them safely, and integrating results back into the conversation.
The framework uses a plugin architecture where each tool is a self-contained unit with a clear interface. Tools declare their name, description, and parameters using a schema that the LLM can understand. When the model decides to use a tool, the framework validates the parameters, executes the tool, and formats the result.
import inspect
import json
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Callable

from pydantic import BaseModel, Field, ValidationError


class ToolParameter(BaseModel):
    """Schema for a single tool parameter."""
    name: str
    type: str  # JSON Schema type: 'string', 'number', 'integer', 'boolean', 'array', 'object'
    description: str
    required: bool = True
    enum: Optional[List[Any]] = None


class ToolSchema(BaseModel):
    """Schema describing a tool's interface."""
    name: str
    description: str
    parameters: List[ToolParameter]

    def to_openai_format(self) -> Dict[str, Any]:
        """Convert schema to OpenAI function calling format."""
        properties = {}
        required = []
        for param in self.parameters:
            properties[param.name] = {
                "type": param.type,
                "description": param.description
            }
            if param.enum:
                properties[param.name]["enum"] = param.enum
            if param.required:
                required.append(param.name)
        return {
            "name": self.name,
            "description": self.description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required
            }
        }


class BaseTool(ABC):
    """Abstract base class for tools that LLMs can invoke."""

    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)

    @abstractmethod
    def get_schema(self) -> ToolSchema:
        """Return schema describing this tool's interface."""
        pass

    @abstractmethod
    def execute(self, **kwargs) -> Dict[str, Any]:
        """
        Execute the tool with given parameters.

        Args:
            **kwargs: Tool-specific parameters

        Returns:
            Dictionary containing execution results
        """
        pass

    def validate_and_execute(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate parameters against schema and execute tool.

        Args:
            parameters: Dictionary of parameter values

        Returns:
            Execution results or error information
        """
        schema = self.get_schema()

        # Validate required parameters
        for param in schema.parameters:
            if param.required and param.name not in parameters:
                return {
                    "error": f"Missing required parameter: {param.name}",
                    "success": False
                }

        # Any failure, including unexpected parameter names, becomes an error result
        try:
            result = self.execute(**parameters)
            return {"success": True, "result": result}
        except Exception as e:
            self.logger.error(f"Tool execution failed: {e}", exc_info=True)
            return {
                "error": str(e),
                "success": False
            }
The BaseTool class defines the contract that all tools must implement. The get_schema method returns metadata that describes what the tool does and what parameters it accepts. The execute method performs the actual work. The validate_and_execute method adds a safety layer that checks parameters before execution and handles errors gracefully.
The schema itself is provider-neutral. The to_openai_format method converts it to the JSON Schema structure used by OpenAI's function calling, and equivalent converters can be written for other providers, so the same tool definitions work with different LLM backends.
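As a sketch of such adaptation, the two helpers below start from the dictionary shape that to_openai_format returns. The helper names are assumptions made for this example. The first wraps the specification in the envelope expected by the tools parameter of OpenAI's Chat Completions API; the second renames parameters to input_schema, the key Anthropic's Messages API uses for tool definitions.

```python
from typing import Any, Dict


def to_openai_tool(function_spec: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap a function spec in the envelope used by OpenAI's `tools` parameter."""
    return {"type": "function", "function": function_spec}


def to_anthropic_tool(function_spec: Dict[str, Any]) -> Dict[str, Any]:
    """Rename `parameters` to `input_schema`, as Anthropic tool definitions expect."""
    return {
        "name": function_spec["name"],
        "description": function_spec["description"],
        "input_schema": function_spec["parameters"],
    }


# Same shape as the dictionary returned by to_openai_format
spec = {
    "name": "web_search",
    "description": "Search the web for current information.",
    "parameters": {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "The search query"}
        },
        "required": ["query"],
    },
}

openai_tool = to_openai_tool(spec)
anthropic_tool = to_anthropic_tool(spec)
```

Both targets accept a JSON Schema object for the parameters, which is why the conversion amounts to moving the same schema under a different key.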
Here is a concrete implementation of a web search tool using DuckDuckGo:
from datetime import datetime

from duckduckgo_search import DDGS


class WebSearchTool(BaseTool):
    """Tool for searching the web using DuckDuckGo."""

    def __init__(self, max_results: int = 5):
        """
        Initialize web search tool.

        Args:
            max_results: Maximum number of search results to return
        """
        super().__init__()
        self.max_results = max_results
        self.ddgs = DDGS()

    def get_schema(self) -> ToolSchema:
        """Return schema for web search tool."""
        return ToolSchema(
            name="web_search",
            description="Search the web for current information using DuckDuckGo. Use this when you need up-to-date information or facts that you don't have in your training data.",
            parameters=[
                ToolParameter(
                    name="query",
                    type="string",
                    description="The search query to execute",
                    required=True
                ),
                ToolParameter(
                    name="max_results",
                    type="integer",
                    description="Maximum number of results to return (default: 5)",
                    required=False
                )
            ]
        )

    def execute(self, query: str, max_results: Optional[int] = None) -> Dict[str, Any]:
        """
        Execute web search.

        Args:
            query: Search query string
            max_results: Maximum results to return

        Returns:
            Dictionary containing search results
        """
        # Models sometimes send numbers as floats, so coerce to int
        num_results = int(max_results) if max_results is not None else self.max_results
        self.logger.info(f"Executing web search: {query}")
        try:
            results = list(self.ddgs.text(query, max_results=num_results))
            formatted_results = []
            for idx, result in enumerate(results, 1):
                formatted_results.append({
                    "position": idx,
                    "title": result.get("title", ""),
                    "url": result.get("href", ""),
                    "snippet": result.get("body", "")
                })
            return {
                "query": query,
                "timestamp": datetime.now().isoformat(),
                "results": formatted_results,
                "count": len(formatted_results)
            }
        except Exception as e:
            # Log here and re-raise; validate_and_execute turns this into an error result
            self.logger.error(f"Search failed: {e}")
            raise
The web search tool demonstrates several important patterns. It encapsulates the complexity of interacting with the DuckDuckGo API behind a simple interface. The execute method returns structured data that is easy for both the LLM and application code to process. Search failures are logged and re-raised, and validate_and_execute then converts the exception into a structured error result rather than letting it crash the application.
The tool framework also includes a registry that manages available tools and routes execution requests:
class ToolRegistry:
    """Registry for managing available tools."""

    def __init__(self):
        self.tools: Dict[str, BaseTool] = {}
        self.logger = logging.getLogger(__name__)

    def register(self, tool: BaseTool):
        """
        Register a tool.

        Args:
            tool: Tool instance to register
        """
        schema = tool.get_schema()
        self.tools[schema.name] = tool
        self.logger.info(f"Registered tool: {schema.name}")

    def get_tool(self, name: str) -> Optional[BaseTool]:
        """
        Retrieve a tool by name.

        Args:
            name: Tool name

        Returns:
            Tool instance or None if not found
        """
        return self.tools.get(name)

    def get_all_schemas(self) -> List[ToolSchema]:
        """Get schemas for all registered tools."""
        return [tool.get_schema() for tool in self.tools.values()]

    def execute_tool(self, name: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute a tool by name.

        Args:
            name: Tool name
            parameters: Tool parameters

        Returns:
            Execution results
        """
        tool = self.get_tool(name)
        if tool is None:
            return {
                "error": f"Tool not found: {name}",
                "success": False
            }
        return tool.validate_and_execute(parameters)
The registry provides a central point for managing tools. Applications register all available tools at startup, and the registry handles routing execution requests to the appropriate tool. This centralization makes it easy to add, remove, or modify tools without changing the core application logic.
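The routing step can be sketched end to end as follows: a tool call arrives from the model as a name plus a JSON string of arguments, is dispatched, and the outcome is wrapped as a message that can be appended to the conversation. Everything here is a simplified stand-in: TOOLS replaces the registry with a plain dictionary, add_numbers is a hypothetical tool, and the message layout follows the tool-role message used by OpenAI's Chat Completions API.

```python
import json
from typing import Any, Callable, Dict


def add_numbers(a: float, b: float) -> Dict[str, Any]:
    """Hypothetical tool used only for this illustration."""
    return {"sum": a + b}


# Stand-in for the registry: tool name mapped to a callable
TOOLS: Dict[str, Callable[..., Dict[str, Any]]] = {"add_numbers": add_numbers}


def handle_tool_call(call_id: str, name: str, arguments_json: str) -> Dict[str, str]:
    """Run one tool call and wrap the outcome as a tool-role message."""
    try:
        arguments = json.loads(arguments_json)
        tool = TOOLS.get(name)
        if tool is None:
            outcome = {"success": False, "error": f"Tool not found: {name}"}
        else:
            outcome = {"success": True, "result": tool(**arguments)}
    except Exception as exc:  # malformed JSON, bad arguments, or tool failure
        outcome = {"success": False, "error": str(exc)}
    return {"role": "tool", "tool_call_id": call_id, "content": json.dumps(outcome)}


ok = handle_tool_call("call_1", "add_numbers", '{"a": 2, "b": 3}')
missing = handle_tool_call("call_2", "unknown_tool", "{}")
malformed = handle_tool_call("call_3", "add_numbers", "{not json")
```

Returning failures as ordinary tool messages, rather than raising, gives the model a chance to correct its arguments on the next turn.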
For paid services such as SerpApi, the framework supports the same interface with a different implementation:
from datetime import datetime
from typing import Any, Dict, Optional

import requests


class SerpAPISearchTool(BaseTool):
    """Web search tool using SERP API (paid service)."""

    def __init__(self, api_key: str, max_results: int = 5):
        """
        Initialize SERP API search tool.

        Args:
            api_key: SERP API key
            max_results: Maximum number of results
        """
        super().__init__()
        self.api_key = api_key
        self.max_results = max_results
        self.base_url = "https://serpapi.com/search"

    def get_schema(self) -> ToolSchema:
        """Return schema for SERP API search."""
        return ToolSchema(
            name="web_search",
            description="Search the web using SERP API for high-quality, structured results.",
            parameters=[
                ToolParameter(
                    name="query",
                    type="string",
                    description="The search query",
                    required=True
                ),
                ToolParameter(
                    name="max_results",
                    type="integer",
                    description="Maximum results to return",
                    required=False
                )
            ]
        )

    def execute(self, query: str, max_results: Optional[int] = None) -> Dict[str, Any]:
        """Execute search using SERP API."""
        # Models sometimes send numbers as floats, so coerce to int
        num_results = int(max_results) if max_results is not None else self.max_results
        params = {
            "q": query,
            "api_key": self.api_key,
            "num": num_results
        }
        # A timeout prevents a stalled request from blocking the application
        response = requests.get(self.base_url, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
        organic_results = data.get("organic_results", [])
        formatted_results = []
        for idx, result in enumerate(organic_results[:num_results], 1):
            formatted_results.append({
                "position": idx,
                "title": result.get("title", ""),
                "url": result.get("link", ""),
                "snippet": result.get("snippet", "")
            })
        return {
            "query": query,
            "timestamp": datetime.now().isoformat(),
            "results": formatted_results,
            "count": len(formatted_results)
        }
Both search tools implement the same schema, making them interchangeable. An application can switch from the free DuckDuckGo service to the paid SERP API by simply registering a different tool instance, without changing any other code.
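The swap described above can be sketched with two stand-in tools that share the name web_search. FakeFreeSearch, FakePaidSearch, and MiniRegistry are hypothetical names used only for illustration; they are not the library's classes and return canned results instead of calling a network service.

```python
from typing import Any, Dict


class FakeFreeSearch:
    """Stand-in for a free search tool (hypothetical, no network access)."""
    name = "web_search"

    def execute(self, query: str) -> Dict[str, Any]:
        return {"query": query, "provider": "free", "results": []}


class FakePaidSearch:
    """Stand-in for a paid search tool with the same name and call shape."""
    name = "web_search"

    def execute(self, query: str) -> Dict[str, Any]:
        return {"query": query, "provider": "paid", "results": []}


class MiniRegistry:
    """Minimal registry: maps a tool name to the instance registered last."""

    def __init__(self) -> None:
        self.tools: Dict[str, Any] = {}

    def register(self, tool: Any) -> None:
        self.tools[tool.name] = tool

    def execute_tool(self, name: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        tool = self.tools.get(name)
        if tool is None:
            return {"success": False, "error": f"Tool not found: {name}"}
        return tool.execute(**parameters)


def answer(registry: MiniRegistry, question: str) -> str:
    """Application code: refers to the tool only by its schema name."""
    result = registry.execute_tool("web_search", {"query": question})
    return result["provider"]


registry = MiniRegistry()
registry.register(FakeFreeSearch())
before = answer(registry, "python asyncio")

registry.register(FakePaidSearch())  # the only line that changes
after = answer(registry, "python asyncio")
```

Because the application looks the tool up by name, the answer function is untouched by the swap; only the registration line differs.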
Model Context Protocol Integration Component
The Model Context Protocol, developed by Anthropic, provides a standardized way for LLM applications to access external context sources. MCP servers expose resources like files, databases, or APIs through a uniform interface. MCP clients consume these resources and make them available to LLMs.
The MCP integration component provides both client and server implementations, allowing applications to act as either consumers or providers of context. This enables sophisticated architectures where multiple applications share context through MCP.
import logging
from typing import Any, Callable, Dict, List, Optional

from pydantic import BaseModel


class MCPResource(BaseModel):
    """Represents a resource available through MCP."""
    uri: str
    name: str
    description: Optional[str] = None
    mime_type: Optional[str] = None


class MCPTool(BaseModel):
    """Represents a tool available through MCP."""
    name: str
    description: str
    input_schema: Dict[str, Any]


class MCPClient:
    """Client for connecting to MCP servers."""

    def __init__(self, server_url: str):
        """
        Initialize MCP client.

        Args:
            server_url: URL of the MCP server
        """
        self.server_url = server_url
        self.logger = logging.getLogger(__name__)
        self._session = None

    def _require_session(self):
        """Raise if connect() has not been called yet."""
        if not self._session:
            raise RuntimeError("Not connected to MCP server")

    async def connect(self):
        """Establish connection to MCP server."""
        self.logger.info(f"Connecting to MCP server: {self.server_url}")
        # Implementation would establish actual connection
        # This is a simplified version showing the interface
        self._session = {"connected": True}

    async def list_resources(self) -> List[MCPResource]:
        """
        List all resources available from the server.

        Returns:
            List of MCPResource objects
        """
        self._require_session()
        # In real implementation, this would make an RPC call
        # Here we show the interface structure
        self.logger.info("Listing MCP resources")
        return []

    async def read_resource(self, uri: str) -> Dict[str, Any]:
        """
        Read content of a specific resource.

        Args:
            uri: Resource URI

        Returns:
            Resource content and metadata
        """
        self._require_session()
        self.logger.info(f"Reading resource: {uri}")
        # Real implementation would fetch the resource
        return {
            "uri": uri,
            "content": "",
            "mime_type": "text/plain"
        }

    async def list_tools(self) -> List[MCPTool]:
        """
        List tools available from the server.

        Returns:
            List of MCPTool objects
        """
        self._require_session()
        self.logger.info("Listing MCP tools")
        return []

    async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """
        Invoke a tool on the server.

        Args:
            name: Tool name
            arguments: Tool arguments

        Returns:
            Tool execution results
        """
        self._require_session()
        self.logger.info(f"Calling MCP tool: {name}")
        # Real implementation would make RPC call
        return {"result": None}

    async def disconnect(self):
        """Close connection to MCP server."""
        if self._session:
            self.logger.info("Disconnecting from MCP server")
            self._session = None
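The MCP client exposes every operation as a coroutine because each one involves a network round trip. With async/await, the application can do other work while it waits for the server to respond. As the comments indicate, the methods above return placeholder values; a real client would issue the corresponding request to the server at each of those points.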
The client separates resource access from tool invocation. Resources are passive data sources that the client reads. Tools are active operations that the server executes on behalf of the client. This distinction is important because it affects caching, permissions, and error handling.
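One consequence of this distinction, caching, can be sketched as follows. CachingReader is a hypothetical wrapper, and fake_read and fake_tool stand in for a connected client's read_resource and call_tool methods. The sketch assumes resources do not change during the session; a real cache would need an invalidation policy.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class CachingReader:
    """Caches resource reads by URI; tool calls always reach the server."""

    def __init__(
        self,
        read_resource: Callable[[str], Awaitable[Dict[str, Any]]],
        call_tool: Callable[[str, Dict[str, Any]], Awaitable[Dict[str, Any]]],
    ) -> None:
        self._read_resource = read_resource
        self._call_tool = call_tool
        self._cache: Dict[str, Dict[str, Any]] = {}

    async def read(self, uri: str) -> Dict[str, Any]:
        # Reading is passive, so a repeated read can reuse the cached copy.
        if uri not in self._cache:
            self._cache[uri] = await self._read_resource(uri)
        return self._cache[uri]

    async def call(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        # A tool may have side effects, so every call is forwarded.
        return await self._call_tool(name, arguments)


calls = {"read": 0, "tool": 0}


async def fake_read(uri: str) -> Dict[str, Any]:
    calls["read"] += 1
    return {"uri": uri, "content": "hello", "mime_type": "text/plain"}


async def fake_tool(name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    calls["tool"] += 1
    return {"result": calls["tool"]}


async def demo() -> Dict[str, int]:
    reader = CachingReader(fake_read, fake_tool)
    await reader.read("file:///notes.txt")
    await reader.read("file:///notes.txt")  # served from the cache
    await reader.call("search_files", {"query": "hello"})
    await reader.call("search_files", {"query": "hello"})  # forwarded again
    return dict(calls)


counts = asyncio.run(demo())
```

Two reads of the same URI reach the server once, while two identical tool calls are both executed.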
The server implementation mirrors the client:
class MCPServer:
    """Server implementation for exposing resources and tools via MCP."""

    def __init__(self, name: str, version: str):
        """
        Initialize MCP server.

        Args:
            name: Server name
            version: Server version
        """
        self.name = name
        self.version = version
        # Each entry maps a URI or tool name to a dict holding the handler
        # and its descriptive fields, not to the handler itself
        self.resources: Dict[str, Dict[str, Any]] = {}
        self.tools: Dict[str, Dict[str, Any]] = {}
        self.logger = logging.getLogger(__name__)

    def register_resource(
        self,
        uri: str,
        name: str,
        handler: Callable,
        description: Optional[str] = None,
        mime_type: Optional[str] = None
    ):
        """
        Register a resource handler.

        Args:
            uri: Resource URI
            name: Resource name
            handler: Async function that returns resource content
            description: Resource description
            mime_type: MIME type of resource content
        """
        self.resources[uri] = {
            "name": name,
            "handler": handler,
            "description": description,
            "mime_type": mime_type
        }
        self.logger.info(f"Registered resource: {uri}")

    def register_tool(
        self,
        name: str,
        handler: Callable,
        description: str,
        input_schema: Dict[str, Any]
    ):
        """
        Register a tool handler.

        Args:
            name: Tool name
            handler: Async function that executes the tool
            description: Tool description
            input_schema: JSON schema for tool inputs
        """
        self.tools[name] = {
            "handler": handler,
            "description": description,
            "input_schema": input_schema
        }
        self.logger.info(f"Registered tool: {name}")

    async def handle_list_resources(self) -> List[MCPResource]:
        """Handle request to list resources."""
        return [
            MCPResource(
                uri=uri,
                name=info["name"],
                description=info["description"],
                mime_type=info["mime_type"]
            )
            for uri, info in self.resources.items()
        ]

    async def handle_read_resource(self, uri: str) -> Dict[str, Any]:
        """Handle request to read a resource."""
        if uri not in self.resources:
            raise ValueError(f"Resource not found: {uri}")
        resource_info = self.resources[uri]
        content = await resource_info["handler"]()
        return {
            "uri": uri,
            "content": content,
            "mime_type": resource_info["mime_type"]
        }

    async def handle_call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle request to call a tool."""
        if name not in self.tools:
            raise ValueError(f"Tool not found: {name}")
        tool_info = self.tools[name]
        result = await tool_info["handler"](**arguments)
        return {"result": result}
The server uses a registration pattern where handlers are registered for specific URIs and tool names. This makes it easy to add new resources and tools dynamically. The handlers are async functions, allowing them to perform I/O operations efficiently.
An example of using the MCP server to expose a file system:
import aiofiles
from pathlib import Path


async def create_file_server(base_path: str) -> MCPServer:
    """
    Create an MCP server that exposes a file system.

    Args:
        base_path: Root directory to expose

    Returns:
        Configured MCPServer instance
    """
    server = MCPServer(name="file_server", version="1.0.0")
    base = Path(base_path)

    # Register a resource for each file
    for file_path in base.rglob("*.txt"):
        relative_path = file_path.relative_to(base)
        # as_posix() keeps forward slashes in the URI on every platform
        uri = f"file:///{relative_path.as_posix()}"

        # The default argument binds the current file_path; a plain closure
        # would see only the last value of the loop variable
        async def read_file(path=file_path):
            async with aiofiles.open(path, 'r', encoding='utf-8') as f:
                return await f.read()

        server.register_resource(
            uri=uri,
            name=str(relative_path),
            handler=read_file,
            description=f"Text file: {relative_path}",
            mime_type="text/plain"
        )

    # Register a search tool
    async def search_files(query: str):
        results = []
        for file_path in base.rglob("*.txt"):
            async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
                content = await f.read()
            if query.lower() in content.lower():
                results.append(str(file_path.relative_to(base)))
        return results

    server.register_tool(
        name="search_files",
        handler=search_files,
        description="Search for files containing specific text",
        input_schema={
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Text to search for"
                }
            },
            "required": ["query"]
        }
    )
    return server
This example shows how MCP enables integrations with little code. The file server registers every .txt file under the base directory as an MCP resource, making those files readable by any MCP client. Resources are discovered once, when the server is created, so files added later are not exposed as resources until the server is rebuilt. The search tool finds files by content, demonstrating that MCP tools can perform operations beyond simple data retrieval.
Message Management and Chat History Component
Conversational LLM applications must manage the flow of messages between users and the model. Each interaction involves system messages that set behavior, user messages containing requests, and assistant messages with responses. Managing this conversation state correctly is essential for coherent multi-turn interactions.
The message management component provides structures for representing messages and utilities for managing conversation history. It handles concerns like context window limits, message formatting, and conversation persistence.
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union


@dataclass
class ConversationMessage:
    """Represents a single message in a conversation."""
    role: str
    content: str
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Convert message to dictionary format."""
        return {
            "role": self.role,
            "content": self.content,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ConversationMessage':
        """Create message from dictionary."""
        return cls(
            role=data["role"],
            content=data["content"],
            timestamp=datetime.fromisoformat(data["timestamp"]),
            metadata=data.get("metadata", {})
        )
class ChatHistory:
    """Manages conversation history with context window awareness."""

    def __init__(
        self,
        system_message: Optional[str] = None,
        max_context_tokens: int = 4096,
        token_counter: Optional[Callable[[str], int]] = None
    ):
        """
        Initialize chat history manager.

        Args:
            system_message: Initial system message
            max_context_tokens: Maximum tokens to keep in context
            token_counter: Function to count tokens in text
        """
        self.messages: List[ConversationMessage] = []
        self.max_context_tokens = max_context_tokens
        self.token_counter = token_counter or self._default_token_counter
        self.logger = logging.getLogger(__name__)
        if system_message:
            self.add_system_message(system_message)

    def _default_token_counter(self, text: str) -> int:
        """
        Default token counter using simple character-based estimation.

        Args:
            text: Text to count tokens for

        Returns:
            Estimated token count
        """
        # Rough estimation: 1 token per 4 characters
        return len(text) // 4

    def add_system_message(self, content: str, metadata: Optional[Dict[str, Any]] = None):
        """Add a system message."""
        message = ConversationMessage(
            role="system",
            content=content,
            metadata=metadata or {}
        )
        self.messages.append(message)
        self.logger.debug("Added system message")

    def add_user_message(self, content: str, metadata: Optional[Dict[str, Any]] = None):
        """Add a user message."""
        message = ConversationMessage(
            role="user",
            content=content,
            metadata=metadata or {}
        )
        self.messages.append(message)
        self.logger.debug("Added user message")

    def add_assistant_message(self, content: str, metadata: Optional[Dict[str, Any]] = None):
        """Add an assistant message."""
        message = ConversationMessage(
            role="assistant",
            content=content,
            metadata=metadata or {}
        )
        self.messages.append(message)
        self.logger.debug("Added assistant message")

    def get_messages_for_llm(self) -> List[Message]:
        """
        Get messages formatted for LLM, respecting context window.

        Returns:
            List of Message objects within token limit
        """
        # Always include system messages
        system_messages = [msg for msg in self.messages if msg.role == "system"]
        conversation_messages = [msg for msg in self.messages if msg.role != "system"]

        # Count tokens for system messages
        system_tokens = sum(
            self.token_counter(msg.content) for msg in system_messages
        )

        # Calculate available tokens for conversation
        available_tokens = self.max_context_tokens - system_tokens

        # Collect conversation messages from most recent, staying within limit
        selected_messages = []
        current_tokens = 0
        for msg in reversed(conversation_messages):
            msg_tokens = self.token_counter(msg.content)
            if current_tokens + msg_tokens > available_tokens:
                break
            selected_messages.append(msg)
            current_tokens += msg_tokens

        # Restore chronological order (appending and reversing once avoids
        # the repeated list shifting caused by insert(0, ...))
        selected_messages.reverse()

        # Combine system and selected conversation messages
        all_messages = system_messages + selected_messages

        # Convert to Message objects
        return [
            Message(role=msg.role, content=msg.content)
            for msg in all_messages
        ]
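The chat history component manages the context window by keeping the estimated token count of the messages sent to the LLM within max_context_tokens. System messages are always included because they define the assistant's behavior, so they are charged against the budget first. Conversation messages are then added starting from the most recent and working backward until the next message would exceed the remaining budget. The guarantee is only as good as the token counter: with the default character-based estimate, the real token count can differ from the estimate.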
This approach ensures that the model always has the most relevant context. Recent messages are more important for maintaining conversation coherence than older messages. If the conversation becomes very long, older messages are automatically dropped.
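The selection rule can be traced on a small history. The sketch below is a standalone function written for illustration, not the component itself; it represents messages as (role, content) tuples and uses the same one-token-per-four-characters estimate as the default counter.

```python
from typing import List, Tuple

Msg = Tuple[str, str]  # (role, content)


def estimate_tokens(text: str) -> int:
    """Rough estimate: one token per four characters."""
    return len(text) // 4


def select_for_context(messages: List[Msg], max_tokens: int) -> List[Msg]:
    """Keep all system messages, then the newest messages that still fit."""
    system = [m for m in messages if m[0] == "system"]
    others = [m for m in messages if m[0] != "system"]
    budget = max_tokens - sum(estimate_tokens(c) for _, c in system)

    kept: List[Msg] = []
    used = 0
    for role, content in reversed(others):
        cost = estimate_tokens(content)
        if used + cost > budget:
            break  # this message and everything older is dropped
        kept.append((role, content))
        used += cost
    kept.reverse()  # back to chronological order
    return system + kept


history = [
    ("system", "s" * 40),     # 10 tokens
    ("user", "a" * 80),       # 20 tokens
    ("assistant", "b" * 80),  # 20 tokens
    ("user", "c" * 40),       # 10 tokens
]
selected = select_for_context(history, max_tokens=45)
roles = [role for role, _ in selected]
```

With a 45-token limit, the system message leaves 35 tokens. The newest user message (10) and the assistant message (20) fit; the oldest user message would bring the total to 50, so it is dropped. Note that the retained window then starts with an assistant message, which some chat APIs may not accept without further adjustment.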
The token counter is pluggable. The default implementation uses a simple character-based estimation, but applications can provide more accurate counters using tokenizers specific to their model:
    def set_token_counter(self, counter: Callable[[str], int]):
        """
        Set a custom token counter.

        Args:
            counter: Function that takes text and returns token count
        """
        self.token_counter = counter
        self.logger.info("Updated token counter")

    def get_total_tokens(self) -> int:
        """Calculate total tokens in current history."""
        return sum(
            self.token_counter(msg.content) for msg in self.messages
        )

    def clear(self):
        """Clear all messages except system messages."""
        self.messages = [msg for msg in self.messages if msg.role == "system"]
        self.logger.info("Cleared conversation history")

    def save(self, file_path: Union[str, Path]):
        """
        Save conversation history to file.

        Args:
            file_path: Path to save conversation
        """
        path = Path(file_path)
        data = {
            "max_context_tokens": self.max_context_tokens,
            "messages": [msg.to_dict() for msg in self.messages]
        }
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)
        self.logger.info(f"Saved conversation to {path}")

    @classmethod
    def load(cls, file_path: Union[str, Path]) -> 'ChatHistory':
        """
        Load conversation history from file.

        Args:
            file_path: Path to conversation file

        Returns:
            ChatHistory instance with loaded conversation
        """
        path = Path(file_path)
        with open(path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        history = cls(max_context_tokens=data["max_context_tokens"])
        history.messages = [
            ConversationMessage.from_dict(msg_data)
            for msg_data in data["messages"]
        ]
        return history
The save and load methods enable conversation persistence. Applications can save conversations to disk and resume them later. This is essential for applications that need to maintain state across sessions or allow users to review past conversations.
The metadata field in ConversationMessage provides extensibility. Applications can attach arbitrary data to messages, such as user IDs, confidence scores, or references to external resources. This metadata is preserved when saving and loading conversations.
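The round trip can be checked in isolation. Note is a hypothetical stand-in with the same fields and conversion methods as ConversationMessage, and the metadata keys are made up for the example. The sketch assumes metadata values are JSON-serializable; a value such as a set or a datetime would make json.dumps raise TypeError.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict


@dataclass
class Note:
    """Hypothetical stand-in mirroring ConversationMessage's fields."""
    role: str
    content: str
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        return {
            "role": self.role,
            "content": self.content,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Note":
        return cls(
            role=data["role"],
            content=data["content"],
            timestamp=datetime.fromisoformat(data["timestamp"]),
            metadata=data.get("metadata", {}),
        )


original = Note(
    role="assistant",
    content="Here is the summary.",
    metadata={"user_id": "u-17", "confidence": 0.82, "sources": ["doc-3"]},
)

# Serialize to JSON text and back, as save() and load() do through a file.
text = json.dumps(original.to_dict())
restored = Note.from_dict(json.loads(text))
```

The restored message compares equal to the original, timestamp and metadata included.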
Circuit Breaker and Rate Limiting Component
Production LLM applications must handle failures gracefully and respect API rate limits. External LLM services can experience outages, network issues can cause timeouts, and exceeding rate limits can result in blocked requests. The circuit breaker and rate limiting component provides resilience mechanisms that prevent cascading failures and ensure compliance with service quotas.
A circuit breaker monitors requests to an external service and automatically stops sending requests when the service appears to be failing. This prevents wasting resources on requests that will likely fail and gives the service time to recover. After a cooldown period, the circuit breaker allows a test request through to check if the service has recovered.
import asyncio
import logging
import time
from collections import deque
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Optional, Type


class CircuitState(Enum):
    """States of a circuit breaker."""
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Blocking requests
    HALF_OPEN = "half_open"  # Testing recovery


class CircuitBreaker:
    """
    Circuit breaker for protecting against cascading failures.

    Monitors request failures and opens the circuit when failure
    threshold is exceeded, preventing further requests until recovery.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: Type[Exception] = Exception
    ):
        """
        Initialize circuit breaker.

        Args:
            failure_threshold: Number of failures before opening circuit
            recovery_timeout: Seconds to wait before attempting recovery
            expected_exception: Exception type to count as failure
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time: Optional[datetime] = None
        self.state = CircuitState.CLOSED
        self.logger = logging.getLogger(__name__)

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """
        Execute function with circuit breaker protection.

        Args:
            func: Function to execute
            *args: Positional arguments for function
            **kwargs: Keyword arguments for function

        Returns:
            Function result

        Raises:
            RuntimeError: If the circuit is open
            Exception: Any exception raised by func
        """
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.logger.info("Circuit breaker entering half-open state")
                self.state = CircuitState.HALF_OPEN
            else:
                raise RuntimeError("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception:
            self._on_failure()
            raise

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt recovery."""
        if self.last_failure_time is None:
            return False
        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return elapsed >= self.recovery_timeout

    def _on_success(self):
        """Handle successful request."""
        if self.state == CircuitState.HALF_OPEN:
            self.logger.info("Circuit breaker closing after successful test")
            self.state = CircuitState.CLOSED
        self.failure_count = 0

    def _on_failure(self):
        """Handle failed request."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        # In the half-open state the count is still at or above the
        # threshold, so a failed test request reopens the circuit here
        if self.failure_count >= self.failure_threshold:
            self.logger.warning(
                f"Circuit breaker opening after {self.failure_count} failures"
            )
            self.state = CircuitState.OPEN

    def reset(self):
        """Manually reset circuit breaker to closed state."""
        self.logger.info("Manually resetting circuit breaker")
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
The circuit breaker uses a state machine with three states. In the closed state, requests flow normally, and each success resets the failure count. When consecutive failures reach the threshold, the circuit opens and blocks all requests. After the recovery timeout, the next request moves the circuit to the half-open state and is allowed through as a test. If the test succeeds, the circuit closes. If it fails, the circuit reopens.
This mechanism prevents overwhelming a failing service with requests while still allowing automatic recovery. The recovery timeout gives the service time to stabilize before testing whether it is healthy again.
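The transitions can be followed step by step with a simulated clock. MiniBreaker is a compact, hypothetical re-statement of the state machine written for this walkthrough, not the component above; it takes the clock as a parameter so the recovery timeout can be crossed without sleeping.

```python
from typing import Callable, List


class MiniBreaker:
    """Three-state breaker with an injectable clock (illustrative only)."""

    def __init__(self, threshold: int, recovery_timeout: float,
                 clock: Callable[[], float]) -> None:
        self.threshold = threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = 0.0
        self.state = "closed"

    def call(self, func: Callable[[], str]) -> str:
        if self.state == "open":
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise RuntimeError("circuit open")  # blocked without calling func
            self.state = "half_open"  # let this one request through as a test
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.state == "half_open" or self.failures >= self.threshold:
                self.state = "open"
                self.opened_at = self.clock()
            raise
        self.failures = 0
        self.state = "closed"
        return result


now = [0.0]  # simulated time in seconds
breaker = MiniBreaker(threshold=2, recovery_timeout=30.0, clock=lambda: now[0])


def failing() -> str:
    raise ConnectionError("service down")


def healthy() -> str:
    return "ok"


trace: List[str] = []


def attempt(func: Callable[[], str]) -> None:
    try:
        breaker.call(func)
    except Exception:
        pass
    trace.append(breaker.state)


attempt(failing)   # first failure, still closed
attempt(failing)   # second failure reaches the threshold, circuit opens
attempt(healthy)   # blocked: the timeout has not elapsed
now[0] = 31.0      # advance the simulated clock past the timeout
attempt(healthy)   # half-open test succeeds, circuit closes
```

The third attempt never reaches the service even though it would have succeeded, which is the cost the breaker accepts in exchange for shielding a failing service from load.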
Rate limiting complements the circuit breaker by preventing the application from exceeding service quotas:
class RateLimiter:
    """
    Token bucket rate limiter.

    Limits the rate of operations to prevent exceeding API quotas.
    """

    def __init__(
        self,
        max_requests: int,
        time_window: int,
        burst_size: Optional[int] = None
    ):
        """
        Initialize rate limiter.

        Args:
            max_requests: Maximum requests allowed in time window
            time_window: Time window in seconds
            burst_size: Maximum burst size (defaults to max_requests)
        """
        self.max_requests = max_requests
        self.time_window = time_window
        self.burst_size = burst_size or max_requests
        self.tokens = float(self.burst_size)
        # A monotonic clock is unaffected by system clock adjustments
        self.last_update = time.monotonic()
        self.logger = logging.getLogger(__name__)

    def acquire(self, tokens: int = 1) -> bool:
        """
        Attempt to acquire tokens for a request.

        Args:
            tokens: Number of tokens to acquire

        Returns:
            True if tokens were acquired, False otherwise
        """
        self._refill()
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

    def wait_and_acquire(self, tokens: int = 1, timeout: Optional[float] = None):
        """
        Wait until tokens are available and acquire them.

        Args:
            tokens: Number of tokens to acquire
            timeout: Maximum time to wait in seconds

        Raises:
            ValueError: If tokens exceeds burst_size
            TimeoutError: If timeout is exceeded
        """
        if tokens > self.burst_size:
            # The bucket never holds more than burst_size tokens, so this
            # request could never succeed and would otherwise wait forever
            raise ValueError("Requested tokens exceed burst_size")
        start_time = time.monotonic()
        while not self.acquire(tokens):
            # Compare against None so that timeout=0 is honored
            if timeout is not None and (time.monotonic() - start_time) > timeout:
                raise TimeoutError("Rate limiter timeout exceeded")
            # Sleep until the tokens should be available, in short slices
            wait_time = self._time_until_next_token(tokens)
            time.sleep(min(wait_time, 0.1))

    def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self.last_update
        # Calculate tokens to add based on elapsed time
        tokens_to_add = (elapsed / self.time_window) * self.max_requests
        self.tokens = min(self.burst_size, self.tokens + tokens_to_add)
        self.last_update = now

    def _time_until_next_token(self, tokens: int = 1) -> float:
        """Calculate seconds until the requested tokens are available."""
        if self.tokens >= tokens:
            return 0.0
        tokens_needed = tokens - self.tokens
        return (tokens_needed / self.max_requests) * self.time_window
The rate limiter implements the token bucket algorithm. Tokens are added to the bucket at a steady rate determined by max_requests and time_window. Each request consumes one token. If no tokens are available, the request must wait.
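The burst_size parameter sets the bucket's capacity, which bounds how many requests can be sent back to back. The bucket starts full, and unused tokens accumulate up to burst_size while the application is idle, so a short spike can be served immediately while the long-run average stays at max_requests per time_window. By default burst_size equals max_requests, meaning a full window's quota can be spent at once.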
The wait_and_acquire method blocks until tokens are available. This is convenient for applications that can afford to wait rather than failing immediately. The timeout parameter prevents indefinite blocking.
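The refill arithmetic is easiest to see with concrete numbers. Bucket below is a minimal, hypothetical re-statement of the algorithm with an injectable clock, written only for this worked example: 60 requests per 60 seconds gives a refill rate of one token per second, and a burst size of 3 caps the bucket at three tokens.

```python
from typing import Callable, List


class Bucket:
    """Token bucket with an injectable clock (illustrative only)."""

    def __init__(self, max_requests: int, time_window: float,
                 burst_size: int, clock: Callable[[], float]) -> None:
        self.rate = max_requests / time_window  # tokens added per second
        self.capacity = burst_size
        self.tokens = float(burst_size)         # the bucket starts full
        self.clock = clock
        self.last = clock()

    def acquire(self, tokens: int = 1) -> bool:
        now = self.clock()
        # Add tokens for the elapsed time, never exceeding the capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False


now = [0.0]  # simulated time in seconds
bucket = Bucket(max_requests=60, time_window=60.0, burst_size=3,
                clock=lambda: now[0])

# Four requests at t=0: the three stored tokens are spent, the fourth fails.
burst: List[bool] = [bucket.acquire() for _ in range(4)]

# Two seconds later two tokens have been refilled.
now[0] = 2.0
after_wait: List[bool] = [bucket.acquire() for _ in range(3)]

# After a long idle period the bucket is capped at 3, not 98.
now[0] = 100.0
bucket.acquire()
remaining = bucket.tokens
```

The idle period from t=2 to t=100 would have earned 98 tokens, but the cap limits the next burst to three requests.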
Combining circuit breaker and rate limiter creates robust request handling:
class ResilientLLMClient:
    """LLM client with circuit breaker and rate limiting."""

    def __init__(
        self,
        llm: BaseLLM,
        max_requests_per_minute: int = 60,
        circuit_breaker_threshold: int = 5
    ):
        """
        Initialize resilient LLM client.

        Args:
            llm: Underlying LLM implementation
            max_requests_per_minute: Rate limit
            circuit_breaker_threshold: Failures before opening circuit
        """
        self.llm = llm
        self.rate_limiter = RateLimiter(
            max_requests=max_requests_per_minute,
            time_window=60
        )
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=circuit_breaker_threshold
        )
        self.logger = logging.getLogger(__name__)

    def complete(
        self,
        messages: List[Message],
        **kwargs
    ) -> CompletionResponse:
        """
        Generate completion with resilience mechanisms.

        Args:
            messages: Conversation messages
            **kwargs: Additional parameters for LLM

        Returns:
            CompletionResponse
        """
        # Wait for rate limit
        self.rate_limiter.wait_and_acquire()

        # Execute with circuit breaker
        def _complete():
            return self.llm.complete(messages, **kwargs)

        return self.circuit_breaker.call(_complete)
The resilient client wraps an LLM implementation with both rate limiting and circuit breaking. Every request first waits for rate limit tokens, then executes through the circuit breaker. This ensures that the application respects rate limits and handles failures gracefully.
Additional Useful Components
Beyond the core components already discussed, several additional utilities enhance LLM application development.
Prompt Template Component
Prompt engineering is critical for LLM applications, but hard-coding prompts makes them difficult to modify and test. A prompt template component provides a structured way to define, parameterize, and manage prompts.
import logging
import re
from string import Template
from typing import Any, Dict, List, Optional


class PromptTemplate:
    """Template for constructing prompts with variable substitution."""

    def __init__(self, template: str, description: Optional[str] = None):
        """
        Initialize prompt template.

        Args:
            template: Template string with ${variable} placeholders
            description: Description of template purpose
        """
        self.template = Template(template)
        self.raw_template = template
        self.description = description
        self.logger = logging.getLogger(__name__)

    def format(self, **kwargs) -> str:
        """
        Format template with provided variables.

        Args:
            **kwargs: Variable values

        Returns:
            Formatted prompt string
        """
        try:
            return self.template.substitute(**kwargs)
        except KeyError as e:
            missing_var = str(e).strip("'")
            # Chain the original error so the traceback keeps its cause
            raise ValueError(f"Missing required variable: {missing_var}") from e

    def get_variables(self) -> List[str]:
        """Extract variable names from ${variable} placeholders."""
        # Matches only the braced form; bare $variable placeholders,
        # which string.Template also accepts, are not reported
        pattern = r'\$\{([^}]+)\}'
        return re.findall(pattern, self.raw_template)
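Prompt templates use Python's string.Template for variable substitution. Its placeholders are plain identifiers: a template cannot access attributes, index into values, or evaluate expressions, which avoids the injection risks that come with more powerful templating systems when template text is not fully trusted.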
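The difference from str.format can be demonstrated directly. Account and its secret attribute are hypothetical, invented for this comparison; the point matters mainly when the template string itself comes from a source the application does not control.

```python
from string import Template


class Account:
    """Hypothetical object holding data a prompt should never expose."""

    def __init__(self) -> None:
        self.secret = "s3cr3t"


account = Account()

# str.format follows attribute access written inside the placeholder.
leaked = "User: {user.secret}".format(user=account)

# string.Template accepts plain identifiers only, so the same trick
# is rejected as an invalid placeholder.
try:
    Template("User: ${user.secret}").substitute(user=account)
    rejected = False
except ValueError:
    rejected = True

# Normal use: every placeholder must be supplied.
prompt = Template("Summarize ${topic} in ${count} bullet points.")
filled = prompt.substitute(topic="token buckets", count=3)

# A missing variable raises KeyError naming the placeholder.
try:
    prompt.substitute(topic="token buckets")
    missing = None
except KeyError as exc:
    missing = exc.args[0]
```

The KeyError in the last step is the error that the format method above converts into a ValueError with a clearer message.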
A template library manages collections of prompts:
class PromptLibrary:
    """Library for managing prompt templates."""

    def __init__(self):
        self.templates: Dict[str, PromptTemplate] = {}
        self.logger = logging.getLogger(__name__)

    def register(self, name: str, template: PromptTemplate):
        """Register a template."""
        self.templates[name] = template
        self.logger.info(f"Registered prompt template: {name}")

    def get(self, name: str) -> PromptTemplate:
        """Retrieve a template by name."""
        if name not in self.templates:
            raise KeyError(f"Template not found: {name}")
        return self.templates[name]

    def format(self, name: str, **kwargs) -> str:
        """Format a template by name."""
        template = self.get(name)
        return template.format(**kwargs)
Using a prompt library, applications can define all prompts in one place and reference them by name. This separation of prompts from code makes it easy to experiment with different phrasings and maintain consistency across the application.
Embedding and Vector Store Component
Many LLM applications use embeddings for semantic search and retrieval-augmented generation. An embedding component provides a consistent interface for generating embeddings from different models.
from abc import ABC, abstractmethod
from typing import List, Union

import numpy as np


class EmbeddingModel(ABC):
    """Abstract base for embedding models."""

    @abstractmethod
    def embed(self, texts: Union[str, List[str]]) -> np.ndarray:
        """
        Generate embeddings for text(s).

        Args:
            texts: Single text or list of texts

        Returns:
            Numpy array of embeddings
        """
        pass

    @abstractmethod
    def get_dimension(self) -> int:
        """Return embedding dimension."""
        pass
A simple vector store provides in-memory storage and similarity search:
import logging
from typing import Any, Dict, List, Optional, Tuple

import numpy as np


class VectorStore:
    """In-memory vector store with similarity search."""

    def __init__(self, embedding_model: EmbeddingModel):
        """
        Initialize vector store.

        Args:
            embedding_model: Model for generating embeddings
        """
        self.embedding_model = embedding_model
        self.vectors: List[np.ndarray] = []
        self.documents: List[str] = []
        self.metadata: List[Dict[str, Any]] = []
        self.logger = logging.getLogger(__name__)

    def add(self, text: str, metadata: Optional[Dict[str, Any]] = None):
        """
        Add document to store.

        Args:
            text: Document text
            metadata: Optional metadata
        """
        # Flatten so that a (1, dim) result for a single text is stored as (dim,)
        embedding = np.asarray(self.embedding_model.embed(text)).reshape(-1)
        self.vectors.append(embedding)
        self.documents.append(text)
        self.metadata.append(metadata or {})

    def search(
        self,
        query: str,
        top_k: int = 5
    ) -> List[Tuple[str, float, Dict[str, Any]]]:
        """
        Search for similar documents.

        Args:
            query: Search query
            top_k: Number of results to return

        Returns:
            List of (document, similarity_score, metadata) tuples
        """
        if not self.vectors:
            return []
        query_embedding = np.asarray(self.embedding_model.embed(query)).reshape(-1)

        # Calculate cosine similarities
        similarities = []
        for vec in self.vectors:
            similarity = self._cosine_similarity(query_embedding, vec)
            similarities.append(similarity)

        # Get top-k indices, highest similarity first
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        results = [
            (self.documents[i], similarities[i], self.metadata[i])
            for i in top_indices
        ]
        return results

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """Calculate cosine similarity between vectors."""
        denominator = np.linalg.norm(a) * np.linalg.norm(b)
        if denominator == 0:
            # A zero vector has no direction; treat it as unrelated
            return 0.0
        return float(np.dot(a, b) / denominator)
This vector store provides basic semantic search. Applications add documents as text, the store computes their embeddings with the configured model, and queries are ranked by cosine similarity. Every search scans all stored vectors, which is adequate for small collections. For production use, applications would typically move to a specialized vector database such as Pinecone, Weaviate, or Chroma; keeping the same add and search methods in front of such a backend lets the rest of the application stay unchanged.
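The ranking step can be checked by hand on two-dimensional vectors. The sketch below is a dependency-free stand-in that uses plain lists and the standard library instead of NumPy; cosine and top_k are names chosen for this example, not part of the component.

```python
import heapq
import math
from typing import List, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query: Sequence[float], vectors: List[Sequence[float]],
          k: int) -> List[Tuple[int, float]]:
    """Return (index, similarity) for the k most similar vectors."""
    scored = ((cosine(query, vec), idx) for idx, vec in enumerate(vectors))
    return [(idx, score) for score, idx in heapq.nlargest(k, scored)]


docs = [
    [1.0, 0.0],  # same direction as the query, different length
    [0.0, 1.0],  # orthogonal to the query
    [3.0, 4.0],  # partly aligned: 6 / (2 * 5) = 0.6
    [0.0, 0.0],  # zero vector, scored 0.0 instead of dividing by zero
]
query = [2.0, 0.0]
ranked = top_k(query, docs, k=2)
```

The first document scores 1.0 even though it is half as long as the query: cosine similarity compares direction only, which is why vector magnitude does not affect the ranking.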
Logging and Observability Component
Understanding LLM application behavior requires comprehensive logging. A logging component standardizes log formatting and provides utilities for tracking LLM interactions.
import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional


class LLMLogger:
    """Specialized logger for LLM interactions."""

    def __init__(self, name: str, log_file: Optional[str] = None):
        """
        Initialize LLM logger.

        Args:
            name: Logger name
            log_file: Optional file for structured logs
        """
        self.logger = logging.getLogger(name)
        self.log_file = log_file

    def log_completion(
        self,
        messages: List[Message],
        response: CompletionResponse,
        duration: float,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """
        Log an LLM completion request and response.

        Args:
            messages: Input messages
            response: LLM response
            duration: Request duration in seconds
            metadata: Additional metadata
        """
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "type": "completion",
            "model": response.model,
            "duration_seconds": duration,
            "token_usage": response.usage,
            "message_count": len(messages),
            "finish_reason": response.finish_reason,
            "metadata": metadata or {}
        }
        self.logger.info(f"Completion: {response.model} ({duration:.2f}s)")
        if self.log_file:
            self._write_structured_log(log_entry)

    def _write_structured_log(self, entry: Dict[str, Any]):
        """Write structured log entry to file, one JSON object per line."""
        with open(self.log_file, 'a', encoding='utf-8') as f:
            # default=str keeps logging from failing if usage or metadata
            # contain values that json cannot serialize directly
            f.write(json.dumps(entry, default=str) + '\n')
The LLM logger creates structured logs that can be analyzed to understand usage patterns, costs, and performance. Each completion is logged with timing information, token usage, and custom metadata.
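A log in this one-object-per-line format can be summarized with a few lines of code. The sketch below reads from an in-memory buffer rather than a file, and it assumes that token_usage is a dictionary with a total_tokens key; the actual shape of response.usage depends on the LLM implementation, and the model names are placeholders.

```python
import io
import json
from collections import defaultdict
from typing import Any, Dict, Iterable

# Sample entries in the shape written by log_completion (values are made up).
entries = [
    {"type": "completion", "model": "model-a", "duration_seconds": 1.5,
     "token_usage": {"total_tokens": 300}},
    {"type": "completion", "model": "model-a", "duration_seconds": 0.5,
     "token_usage": None},
    {"type": "completion", "model": "model-b", "duration_seconds": 2.0,
     "token_usage": {"total_tokens": 120}},
]
sample_log = io.StringIO("".join(json.dumps(e) + "\n" for e in entries) + "\n")


def summarize(lines: Iterable[str]) -> Dict[str, Dict[str, Any]]:
    """Aggregate request count, total seconds, and total tokens per model."""
    totals: Dict[str, Dict[str, Any]] = defaultdict(
        lambda: {"requests": 0, "seconds": 0.0, "tokens": 0}
    )
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        entry = json.loads(line)
        if entry.get("type") != "completion":
            continue
        stats = totals[entry["model"]]
        stats["requests"] += 1
        stats["seconds"] += entry["duration_seconds"]
        # Usage may be missing for some responses; count those as zero.
        stats["tokens"] += (entry.get("token_usage") or {}).get("total_tokens", 0)
    return dict(totals)


summary = summarize(sample_log)
```

To run the same summary over a real log, pass an open file handle in place of the buffer, since a file object also iterates line by line.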
Complete Running Example
The following complete example demonstrates how all components integrate to create a functional LLM application. This application provides a conversational interface with web search capabilities, configuration management, and resilience mechanisms.
import logging
import sys
import time
from pathlib import Path
from typing import Optional, List, Dict, Any
import json
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('llm_app.log')
]
)
class LLMApplication:
    """
    Complete LLM application integrating all components.

    This application provides a conversational interface with:
    - Automatic GPU detection and optimization
    - Configurable LLM backend (local or remote)
    - Web search tool integration
    - Chat history management
    - Rate limiting and circuit breaking
    - Comprehensive logging
    """

    def __init__(self, config_path: str):
        """
        Initialize application with configuration file.

        Args:
            config_path: Path to configuration file (JSON or YAML)
        """
        self.logger = logging.getLogger(__name__)
        self.logger.info("Initializing LLM Application")

        # Load configuration
        self.config_manager = ConfigurationManager(config_path)
        self.config = self.config_manager.load()

        # Setup logging level from config
        logging.getLogger().setLevel(self.config.logging_level)

        # Initialize GPU detector
        self.gpu_detector = GPUDetector()
        self.accelerator_info = self.gpu_detector.detect()
        self.logger.info(
            f"Using accelerator: {self.accelerator_info.accelerator_type.value}"
        )

        # Initialize LLM based on configuration
        self.llm = self._initialize_llm()

        # Initialize tool registry and register tools
        self.tool_registry = ToolRegistry()
        self._register_tools()

        # Initialize chat history
        self.chat_history = ChatHistory(
            system_message=self.config.llm.system_message,
            max_context_tokens=self.config.llm.context_window
        )

        # Initialize resilient client
        self.resilient_client = ResilientLLMClient(
            llm=self.llm,
            max_requests_per_minute=60,
            circuit_breaker_threshold=5
        )

        # Initialize specialized logger
        self.llm_logger = LLMLogger(
            name="llm_interactions",
            log_file="llm_interactions.jsonl"
        )
        self.logger.info("Application initialization complete")
    def _initialize_llm(self) -> BaseLLM:
        """
        Initialize LLM based on configuration.

        Returns:
            Configured LLM instance
        """
        model_name = self.config.llm.model_name

        # Determine if this is a local or remote model
        if model_name.startswith("gpt-"):
            # Remote OpenAI API model
            api_key = self.config.api_keys.get("openai")
            if not api_key:
                raise ValueError("OpenAI API key required for GPT models")
            self.logger.info(f"Initializing OpenAI LLM: {model_name}")
            return OpenAILLM(model_name=model_name, api_key=api_key)
        elif model_name.startswith("claude-"):
            # Claude models are not served by the OpenAI API, so they cannot be
            # routed through OpenAILLM; a separate BaseLLM implementation is needed
            raise ValueError(
                f"No backend configured for Claude model: {model_name}"
            )
        else:
            # Local transformer model
            device = self.gpu_detector.get_device_string()
            self.logger.info(f"Initializing local LLM: {model_name} on {device}")
            return LocalTransformerLLM(
                model_name=model_name,
                device=device,
                # Executes code shipped with the model repository;
                # enable only for models from sources you trust
                trust_remote_code=True
            )
    def _register_tools(self):
        """Register available tools."""
        # Register web search tool
        search_tool = WebSearchTool(max_results=5)
        self.tool_registry.register(search_tool)
        self.logger.info("Registered web search tool")
    def process_user_input(self, user_input: str) -> str:
        """
        Process user input and generate response.

        Args:
            user_input: User's message

        Returns:
            Assistant's response
        """
        self.logger.info(f"Processing user input: {user_input[:50]}...")

        # Add user message to history
        self.chat_history.add_user_message(user_input)

        # Check if tools should be used
        # In a real implementation, this would use the LLM's tool calling capability
        # For this example, we use a simple keyword check
        response_text = ""
        if "search" in user_input.lower() or "find" in user_input.lower():
            # Extract search query (simplified)
            search_query = user_input

            # Execute search tool
            search_result = self.tool_registry.execute_tool(
                "web_search",
                {"query": search_query, "max_results": 3}
            )
            if search_result["success"]:
                # Add search results to context
                results_text = self._format_search_results(search_result["result"])

                # Create augmented prompt; explicit concatenation keeps source
                # indentation out of the prompt text
                augmented_input = (
                    f"User asked: {user_input}\n"
                    f"Here are relevant search results: {results_text}\n"
                    "Please provide a helpful response based on this information."
                )
                self.chat_history.messages[-1].content = augmented_input
            else:
                # Fall back to answering without search results
                self.logger.warning("Web search failed; continuing without results")

        # Get messages for LLM
        messages = self.chat_history.get_messages_for_llm()

        # Generate completion
        start_time = time.time()
        try:
            response = self.resilient_client.complete(
                messages=messages,
                temperature=self.config.llm.temperature,
                max_tokens=self.config.llm.max_tokens,
                top_p=self.config.llm.top_p,
                top_k=self.config.llm.top_k
            )
            duration = time.time() - start_time
            response_text = response.content

            # Log the interaction
            self.llm_logger.log_completion(
                messages=messages,
                response=response,
                duration=duration
            )

            # Add assistant response to history
            self.chat_history.add_assistant_message(response_text)
            self.logger.info(f"Generated response in {duration:.2f}s")
        except Exception as e:
            self.logger.error(f"Error generating response: {e}", exc_info=True)
            response_text = "I apologize, but I encountered an error processing your request. Please try again."
        return response_text
    def _format_search_results(self, search_data: Dict[str, Any]) -> str:
        """
        Format search results for inclusion in prompt.

        Args:
            search_data: Search results from tool

        Returns:
            Formatted text
        """
        results = search_data.get("results", [])
        formatted = []
        for result in results:
            formatted.append(
                f"[{result['position']}] {result['title']}\n"
                f"URL: {result['url']}\n"
                f"{result['snippet']}\n"
            )
        return "\n".join(formatted)

    def save_conversation(self, file_path: str):
        """
        Save current conversation to file.

        Args:
            file_path: Destination file path
        """
        self.chat_history.save(file_path)
        self.logger.info(f"Saved conversation to {file_path}")

    def load_conversation(self, file_path: str):
        """
        Load conversation from file.

        Args:
            file_path: Source file path
        """
        self.chat_history = ChatHistory.load(file_path)
        self.logger.info(f"Loaded conversation from {file_path}")
    def run_interactive(self):
        """Run interactive conversation loop."""
        print("LLM Application Started")
        print(f"Using model: {self.config.llm.model_name}")
        print(f"Accelerator: {self.accelerator_info.accelerator_type.value}")
        print("Type 'quit' to exit, 'save' to save conversation, 'clear' to clear history\n")

        while True:
            try:
                user_input = input("You: ").strip()
                if not user_input:
                    continue
                if user_input.lower() == 'quit':
                    print("Goodbye!")
                    break
                if user_input.lower() == 'save':
                    filename = f"conversation_{int(time.time())}.json"
                    self.save_conversation(filename)
                    print(f"Conversation saved to {filename}")
                    continue
                if user_input.lower() == 'clear':
                    self.chat_history.clear()
                    print("Conversation history cleared")
                    continue
                response = self.process_user_input(user_input)
                print(f"\nAssistant: {response}\n")
            except (KeyboardInterrupt, EOFError):
                # EOFError is raised when stdin is closed (Ctrl-D or piped input);
                # without this branch the loop would retry input() forever
                print("\nGoodbye!")
                break
            except Exception as e:
                self.logger.error(f"Error in interactive loop: {e}", exc_info=True)
                print(f"Error: {e}")
def create_default_config(output_path: str):
    """
    Create a default configuration file.

    Args:
        output_path: Path for configuration file
    """
    config = ApplicationConfig(
        llm=LLMModelConfig(
            model_name="gpt-3.5-turbo",
            temperature=0.7,
            max_tokens=2048,
            top_p=0.95,
            top_k=50,
            context_window=4096,
            system_message="You are a helpful AI assistant with access to web search. Provide accurate, helpful responses."
        ),
        api_keys={},
        logging_level="INFO",
        enable_streaming=True,
        max_retries=3,
        timeout_seconds=60
    )
    manager = ConfigurationManager()
    manager.save(config, output_path)
    print(f"Created default configuration at {output_path}")


def main():
    """Main entry point for the application."""
    import argparse

    parser = argparse.ArgumentParser(description="LLM Application")
    parser.add_argument(
        "--config",
        type=str,
        default="config.yaml",
        help="Path to configuration file"
    )
    parser.add_argument(
        "--create-config",
        action="store_true",
        help="Create default configuration file"
    )
    args = parser.parse_args()

    if args.create_config:
        create_default_config(args.config)
        return

    # Check if config exists
    if not Path(args.config).exists():
        print(f"Configuration file not found: {args.config}")
        print("Create one with: python app.py --create-config")
        return

    # Initialize and run application
    app = LLMApplication(config_path=args.config)
    app.run_interactive()


if __name__ == "__main__":
    main()
This complete example demonstrates how all components work together. The application initializes by loading configuration, detecting GPU hardware, setting up the LLM, registering tools, and creating the chat history manager. The process_user_input method orchestrates the entire flow: adding messages to history, potentially invoking tools, generating completions through the resilient client, and logging interactions.
The interactive loop provides a simple command-line interface where users can hold a conversation, save it to a file, and clear the context. Loading a saved conversation is available through the load_conversation method but is not exposed as a command in the loop. Errors raised while handling a turn are caught and logged rather than ending the session.
To use this application, users first create a configuration file:
python app.py --create-config
Then edit the configuration to add API keys and adjust parameters. Finally, run the application:
python app.py --config config.yaml
The application demonstrates production-ready patterns including proper error handling, comprehensive logging, configuration management, and graceful degradation when services are unavailable.
Conclusion
This article has presented a comprehensive Python library for LLM application development. Each component addresses a specific recurring challenge: GPU detection eliminates platform-specific code, the abstract LLM interface enables model swapping, configuration management externalizes settings, tool calling extends LLM capabilities, MCP integration enables context sharing, message management handles conversation state, and circuit breakers with rate limiting provide resilience.
The components follow clean architecture principles with clear separation of concerns. Each component has a well-defined interface and can be used independently or in combination with others. The running example demonstrates how these components integrate to create a complete, production-ready application.
By providing these reusable components, the library eliminates the need for developers to repeatedly solve the same problems. Instead of spending time on infrastructure, developers can focus on the unique aspects of their applications: domain-specific logic, user experience, and business value.
The library is designed to be extensible. New LLM providers can be added by implementing the BaseLLM interface. New tools can be registered with the tool registry. Additional resilience mechanisms can wrap the existing components. This extensibility ensures that the library can evolve with the rapidly changing LLM ecosystem.
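To make the first extension point concrete, here is a minimal sketch of a new provider. The BaseLLM and LLMResponse classes below are stand-ins written for this illustration, with fields inferred from what the running example reads (content, model, finish_reason, usage); the library's actual interface, method signatures, and message type may differ. EchoLLM is a hypothetical provider that simply echoes the last user message, which can be handy for testing an application without a model.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class LLMResponse:
    """Stand-in response type; fields are assumptions, not the library's API."""
    content: str
    model: str
    finish_reason: str = "stop"
    usage: Dict[str, int] = field(default_factory=dict)


class BaseLLM(ABC):
    """Stand-in for the library's abstract interface; the real one may differ."""

    @abstractmethod
    def complete(self, messages: List[Dict[str, str]], **kwargs: Any) -> LLMResponse:
        """Return a completion for the given messages."""


class EchoLLM(BaseLLM):
    """Hypothetical provider that echoes the last user message."""

    def __init__(self, model_name: str = "echo-1"):
        self.model_name = model_name

    def complete(self, messages: List[Dict[str, str]], **kwargs: Any) -> LLMResponse:
        # Assumption: messages are dicts with "role" and "content" keys
        last_user = next(
            (m["content"] for m in reversed(messages) if m["role"] == "user"), ""
        )
        # Crude token estimate: whitespace-separated words, not a real tokenizer
        prompt_tokens = len(last_user.split())
        completion_tokens = prompt_tokens + 1  # the echoed words plus "Echo:"
        return LLMResponse(
            content=f"Echo: {last_user}",
            model=self.model_name,
            usage={
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens,
            },
        )
```

Because the application only depends on the abstract interface, a provider written this way could be passed to the resilient client in place of OpenAILLM or LocalTransformerLLM, provided it matches the real BaseLLM signature.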
Future enhancements could include streaming response support in more components, integration with vector databases for retrieval-augmented generation, support for multi-modal models, and enhanced observability with metrics and tracing. The foundation provided by these components makes such enhancements straightforward to implement while maintaining backward compatibility.
The goal of this library is to accelerate LLM application development by providing robust, well-tested components that handle common requirements. By building on this foundation, developers can create sophisticated LLM applications more quickly and with greater confidence in their reliability and maintainability.