INTRODUCTION: THE COMPLEXITY CHALLENGE IN LLM APPLICATION DEVELOPMENT
Building applications that leverage Large Language Models presents unique architectural challenges that traditional software design patterns were not originally conceived to address. The landscape of LLM development is characterized by rapid evolution, diverse hardware requirements, multiple model providers, and varying deployment scenarios. A developer today might start with a local Llama model running on an NVIDIA GPU, then need to switch to an AMD ROCm setup, integrate cloud-based GPT models, add multimodal capabilities with vision models, and deploy across different environments from edge devices to cloud clusters.
The core challenge lies in managing this complexity while keeping the code maintainable, testable, and flexible. Without proper architectural patterns, LLM applications quickly become tangled masses of conditional logic, hard-coded dependencies, and brittle configurations. A change in one component cascades through the system, breaking seemingly unrelated functionality. This article explores design patterns specifically tailored for LLM application development, providing concrete solutions to common architectural challenges.
We will examine how classical design patterns like Strategy, Factory, Adapter, and Observer can be adapted and combined to create robust LLM applications. We will also introduce patterns specific to the LLM domain, such as the Model Registry pattern for managing multiple model backends, the Document Pipeline pattern for RAG systems, and the Context Window Manager pattern for handling token limitations. Each pattern will be illustrated with practical code examples, culminating in a complete running example that demonstrates how these patterns work together in a production-ready system.
PART ONE: FOUNDATIONAL ARCHITECTURAL PATTERNS
The Strategy Pattern for GPU Backend Abstraction
One of the first challenges in building portable LLM applications is supporting different GPU architectures. NVIDIA CUDA dominates the landscape, but AMD ROCm, Intel extensions, and Apple Metal Performance Shaders each have their own ecosystems. Hard-coding GPU-specific logic throughout an application creates maintenance nightmares and prevents portability.
The Strategy pattern provides an elegant solution by encapsulating GPU-specific operations behind a common interface. Each GPU backend becomes a concrete strategy implementing the same interface, allowing the application to switch between backends without changing client code.
Consider the fundamental operations needed for GPU computation: device detection, memory allocation, tensor operations, and model offloading. Here is how we define a common interface:
class GPUBackend:
    """Abstract interface for GPU operations across different hardware."""

    def detect_devices(self):
        """Return list of available GPU devices with their properties."""
        raise NotImplementedError

    def allocate_memory(self, size_bytes, device_id=0):
        """Allocate GPU memory and return a handle."""
        raise NotImplementedError

    def transfer_to_device(self, data, device_id=0):
        """Transfer data from CPU to GPU memory."""
        raise NotImplementedError

    def get_device_properties(self, device_id=0):
        """Return dictionary of device capabilities and properties."""
        raise NotImplementedError

    def optimize_for_inference(self, model, device_id=0):
        """Apply backend-specific optimizations for inference."""
        raise NotImplementedError
Now we implement concrete strategies for each GPU architecture:
class CUDABackend(GPUBackend):
    """NVIDIA CUDA implementation of GPU operations."""

    def __init__(self):
        import torch
        self.torch = torch
        if not torch.cuda.is_available():
            raise RuntimeError("CUDA is not available on this system")

    def detect_devices(self):
        """Detect NVIDIA GPUs using CUDA."""
        devices = []
        for i in range(self.torch.cuda.device_count()):
            props = self.torch.cuda.get_device_properties(i)
            devices.append({
                'id': i,
                'name': props.name,
                'compute_capability': f"{props.major}.{props.minor}",
                'total_memory': props.total_memory,
                'backend': 'cuda'
            })
        return devices

    def allocate_memory(self, size_bytes, device_id=0):
        """Allocate CUDA memory as a raw byte buffer and return it."""
        # torch.cuda.memory exposes no malloc(); an uninitialized uint8
        # tensor of the requested size acts as the memory handle instead.
        return self.torch.empty(
            size_bytes, dtype=self.torch.uint8, device=f'cuda:{device_id}'
        )

    def transfer_to_device(self, data, device_id=0):
        """Transfer tensor to CUDA device."""
        return data.to(f'cuda:{device_id}')

    def get_device_properties(self, device_id=0):
        """Get CUDA device properties."""
        props = self.torch.cuda.get_device_properties(device_id)
        return {
            'name': props.name,
            'compute_capability': f"{props.major}.{props.minor}",
            'total_memory': props.total_memory,
            'multi_processor_count': props.multi_processor_count
        }

    def optimize_for_inference(self, model, device_id=0):
        """Apply CUDA-specific optimizations."""
        model = model.to(f'cuda:{device_id}')
        model.eval()
        # Let cuDNN benchmark and pick the fastest kernels for the input shapes
        self.torch.backends.cudnn.benchmark = True
        return model
class ROCmBackend(GPUBackend):
    """AMD ROCm implementation of GPU operations."""

    def __init__(self):
        import torch
        self.torch = torch
        # ROCm builds of PyTorch reuse the torch.cuda API, so is_available()
        # alone cannot tell them apart from CUDA builds; torch.version.hip
        # is set only on ROCm builds.
        is_rocm_build = getattr(torch.version, 'hip', None) is not None
        if not (is_rocm_build and torch.cuda.is_available()):
            raise RuntimeError("ROCm is not available on this system")

    def detect_devices(self):
        """Detect AMD GPUs using ROCm."""
        devices = []
        for i in range(self.torch.cuda.device_count()):
            props = self.torch.cuda.get_device_properties(i)
            devices.append({
                'id': i,
                'name': props.name,
                'gcn_arch': getattr(props, 'gcnArchName', 'unknown'),
                'total_memory': props.total_memory,
                'backend': 'rocm'
            })
        return devices

    def transfer_to_device(self, data, device_id=0):
        """Transfer tensor to ROCm device."""
        # ROCm devices are addressed through the 'cuda' device string
        return data.to(f'cuda:{device_id}')

    def optimize_for_inference(self, model, device_id=0):
        """Apply ROCm-specific optimizations."""
        model = model.to(f'cuda:{device_id}')
        model.eval()
        return model
class MPSBackend(GPUBackend):
    """Apple Metal Performance Shaders implementation."""

    def __init__(self):
        import torch
        self.torch = torch
        if not torch.backends.mps.is_available():
            raise RuntimeError("MPS is not available on this system")

    def detect_devices(self):
        """Detect Apple Silicon GPU."""
        return [{
            'id': 0,
            'name': 'Apple Silicon GPU',
            'total_memory': 0,  # MPS shares unified memory
            'backend': 'mps'
        }]

    def transfer_to_device(self, data, device_id=0):
        """Transfer tensor to MPS device."""
        return data.to('mps')

    def optimize_for_inference(self, model, device_id=0):
        """Apply MPS-specific optimizations."""
        model = model.to('mps')
        model.eval()
        return model
The beauty of this pattern is that client code remains completely agnostic to the underlying GPU architecture. A model loader can work with any backend:
class ModelLoader:
    """Loads and manages LLM models with GPU backend abstraction."""

    def __init__(self, gpu_backend):
        """Initialize with a specific GPU backend strategy."""
        self.backend = gpu_backend
        self.devices = self.backend.detect_devices()

    def load_model(self, model_path, device_id=0):
        """Load model and optimize for the configured backend."""
        # Load model using transformers or other framework
        from transformers import AutoModelForCausalLM
        model = AutoModelForCausalLM.from_pretrained(model_path)
        # Apply backend-specific optimizations
        model = self.backend.optimize_for_inference(model, device_id)
        return model
This separation of concerns means that adding support for a new GPU architecture requires only implementing a new strategy class, without touching any existing code. The application can even switch backends at runtime based on available hardware.
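One way to act on that last point is to probe the backends in order of preference and keep the first one whose constructor succeeds. The sketch below is a minimal illustration rather than part of the classes above: `select_backend` and the two stub classes are hypothetical names, and the stubs stand in for `CUDABackend`, `ROCmBackend`, or `MPSBackend` so that the example runs without PyTorch installed.

```python
class UnavailableBackend:
    """Stub that mimics a backend whose hardware is missing."""

    def __init__(self):
        raise RuntimeError("device not available on this system")


class CPUFallbackBackend:
    """Stub that always initializes, standing in for a CPU-only path."""

    def detect_devices(self):
        return [{'id': 0, 'name': 'cpu', 'backend': 'cpu'}]


def select_backend(candidates):
    """Return (name, backend) for the first candidate that initializes.

    `candidates` is an ordered list of (name, zero-argument constructor) pairs.
    """
    errors = {}
    for name, constructor in candidates:
        try:
            return name, constructor()
        except (RuntimeError, ImportError) as exc:
            # Record why this backend was skipped, then try the next one.
            errors[name] = str(exc)
    raise RuntimeError(f"No usable backend found: {errors}")


name, backend = select_backend([
    ('cuda', UnavailableBackend),   # in a real application: CUDABackend
    ('cpu', CPUFallbackBackend),
])
print(name, backend.detect_devices())
```

In a real application the candidate list would hold the concrete backend classes, and the selected instance could be passed straight to `ModelLoader`.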
The Factory Pattern for Model Provider Abstraction
LLM applications often need to work with models from different providers: local models loaded through HuggingFace Transformers, remote APIs like OpenAI or Anthropic, quantized models through llama.cpp, or optimized runtimes like ONNX. Each provider has different initialization procedures, parameter formats, and inference interfaces.
The Factory pattern solves this by centralizing object creation logic. Instead of scattering model instantiation code throughout the application, we define factory classes that know how to create and configure models from specific providers.
class ModelFactory:
    """Abstract factory for creating LLM model instances."""

    def create_model(self, config):
        """Create and return a model instance based on configuration."""
        raise NotImplementedError

    def validate_config(self, config):
        """Validate that configuration contains required parameters."""
        raise NotImplementedError


class HuggingFaceModelFactory(ModelFactory):
    """Factory for creating HuggingFace Transformers models."""

    def validate_config(self, config):
        """Ensure HuggingFace-specific config is valid."""
        required = ['model_name', 'model_type']
        for field in required:
            if field not in config:
                raise ValueError(f"Missing required config field: {field}")

    def create_model(self, config):
        """Create a HuggingFace model instance."""
        self.validate_config(config)
        from transformers import AutoModelForCausalLM, AutoTokenizer

        model_name = config['model_name']
        device = config.get('device', 'cpu')
        load_in_8bit = config.get('quantization', {}).get('load_in_8bit', False)

        # Load tokenizer
        tokenizer = AutoTokenizer.from_pretrained(model_name)

        # Load model with optional quantization
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            load_in_8bit=load_in_8bit,
            device_map=device if device != 'cpu' else None
        )
        return {
            'model': model,
            'tokenizer': tokenizer,
            'provider': 'huggingface',
            'config': config
        }


class OpenAIModelFactory(ModelFactory):
    """Factory for creating OpenAI API model clients."""

    def validate_config(self, config):
        """Ensure OpenAI-specific config is valid."""
        if 'api_key' not in config:
            raise ValueError("OpenAI models require 'api_key' in config")
        if 'model_name' not in config:
            raise ValueError("Must specify 'model_name' for OpenAI")

    def create_model(self, config):
        """Create an OpenAI API client wrapper."""
        self.validate_config(config)
        import openai

        client = openai.OpenAI(api_key=config['api_key'])
        return {
            'client': client,
            'model_name': config['model_name'],
            'provider': 'openai',
            'config': config
        }


class LlamaCppModelFactory(ModelFactory):
    """Factory for creating llama.cpp quantized models."""

    def validate_config(self, config):
        """Ensure llama.cpp-specific config is valid."""
        if 'model_path' not in config:
            raise ValueError("llama.cpp models require 'model_path'")

    def create_model(self, config):
        """Create a llama.cpp model instance."""
        self.validate_config(config)
        from llama_cpp import Llama

        model = Llama(
            model_path=config['model_path'],
            n_ctx=config.get('context_length', 2048),
            n_gpu_layers=config.get('gpu_layers', 0),
            n_threads=config.get('threads', 4)
        )
        return {
            'model': model,
            'provider': 'llama_cpp',
            'config': config
        }
Now we create a registry that maps provider names to their factories:
class ModelRegistry:
    """Registry for managing model factories."""

    def __init__(self):
        self.factories = {}

    def register_factory(self, provider_name, factory):
        """Register a factory for a specific provider."""
        self.factories[provider_name] = factory

    def create_model(self, provider_name, config):
        """Create a model using the appropriate factory."""
        if provider_name not in self.factories:
            raise ValueError(f"Unknown provider: {provider_name}")
        factory = self.factories[provider_name]
        return factory.create_model(config)
Client code can now create models from any provider using a unified interface:
# Initialize registry and register factories
registry = ModelRegistry()
registry.register_factory('huggingface', HuggingFaceModelFactory())
registry.register_factory('openai', OpenAIModelFactory())
registry.register_factory('llama_cpp', LlamaCppModelFactory())

# Create models from different providers using the same interface
hf_config = {
    'model_name': 'meta-llama/Llama-2-7b-hf',
    'model_type': 'causal_lm',
    'device': 'cuda:0'
}
local_model = registry.create_model('huggingface', hf_config)

openai_config = {
    'api_key': 'sk-...',
    'model_name': 'gpt-4'
}
remote_model = registry.create_model('openai', openai_config)
This pattern makes it trivial to add new model providers. Simply implement a new factory class and register it with the registry. The rest of the application remains unchanged.
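To make that concrete, the sketch below registers a made-up provider. `EchoModelFactory` and the `'echo'` provider name are hypothetical and exist only for illustration; a compact `ModelRegistry` is repeated so that the snippet runs on its own.

```python
class ModelRegistry:
    """Compact copy of the registry above, repeated to keep the example self-contained."""

    def __init__(self):
        self.factories = {}

    def register_factory(self, provider_name, factory):
        self.factories[provider_name] = factory

    def create_model(self, provider_name, config):
        if provider_name not in self.factories:
            raise ValueError(f"Unknown provider: {provider_name}")
        return self.factories[provider_name].create_model(config)


class EchoModelFactory:
    """Hypothetical provider whose 'model' just prefixes the prompt."""

    def validate_config(self, config):
        if 'prefix' not in config:
            raise ValueError("echo models require 'prefix' in config")

    def create_model(self, config):
        self.validate_config(config)
        prefix = config['prefix']
        return {
            'model': lambda prompt: f"{prefix}{prompt}",
            'provider': 'echo',
            'config': config,
        }


registry = ModelRegistry()
registry.register_factory('echo', EchoModelFactory())

# No existing factory or client code had to change to support 'echo'.
echo = registry.create_model('echo', {'prefix': 'You said: '})
print(echo['model']('hello'))
```

A stub provider like this can also double as a lightweight test fixture for code that depends on the registry.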
The Adapter Pattern for Unified Model Interfaces
Different model providers expose different interfaces for inference. HuggingFace models use the generate method, OpenAI uses chat completions, and llama.cpp has its own calling convention. To build higher-level components that work with any model, we need a unified interface.
The Adapter pattern wraps each provider-specific interface in a common abstraction. This allows the rest of the application to treat all models uniformly, regardless of their underlying implementation.
class ModelAdapter:
    """Unified interface for LLM inference across providers."""

    def generate(self, prompt, **kwargs):
        """Generate text from a prompt with optional parameters."""
        raise NotImplementedError

    def generate_stream(self, prompt, **kwargs):
        """Generate text with streaming, yielding tokens as they arrive."""
        raise NotImplementedError

    def get_embedding(self, text):
        """Get embedding vector for input text."""
        raise NotImplementedError

    def count_tokens(self, text):
        """Count tokens in text using model's tokenizer."""
        raise NotImplementedError
class HuggingFaceAdapter(ModelAdapter):
    """Adapter for HuggingFace Transformers models."""

    def __init__(self, model_dict):
        """Initialize with model and tokenizer from factory."""
        self.model = model_dict['model']
        self.tokenizer = model_dict['tokenizer']
        self.config = model_dict['config']

    def _sampling_kwargs(self, kwargs):
        """Translate unified parameters into generate() keyword arguments."""
        temperature = kwargs.get('temperature', 1.0)
        gen_kwargs = {'max_new_tokens': kwargs.get('max_tokens', 100)}
        if temperature > 0:
            # Sampling parameters are only valid when sampling is enabled
            gen_kwargs.update(
                do_sample=True,
                temperature=temperature,
                top_p=kwargs.get('top_p', 1.0)
            )
        else:
            gen_kwargs['do_sample'] = False  # greedy decoding
        return gen_kwargs

    def generate(self, prompt, **kwargs):
        """Generate text using HuggingFace generate method."""
        # Tokenize input
        inputs = self.tokenizer(prompt, return_tensors='pt')
        inputs = {k: v.to(self.model.device) for k, v in inputs.items()}

        # Generate
        outputs = self.model.generate(**inputs, **self._sampling_kwargs(kwargs))

        # Decode only the newly generated tokens, not the echoed prompt,
        # so the result matches what the other adapters return
        prompt_length = inputs['input_ids'].shape[1]
        return self.tokenizer.decode(
            outputs[0][prompt_length:], skip_special_tokens=True
        )

    def generate_stream(self, prompt, **kwargs):
        """Generate text with streaming using TextIteratorStreamer."""
        from transformers import TextIteratorStreamer
        from threading import Thread

        inputs = self.tokenizer(prompt, return_tensors='pt')
        inputs = {k: v.to(self.model.device) for k, v in inputs.items()}

        streamer = TextIteratorStreamer(
            self.tokenizer, skip_prompt=True, skip_special_tokens=True
        )
        generation_kwargs = {
            **inputs,
            **self._sampling_kwargs(kwargs),
            'streamer': streamer
        }

        # Run generation in separate thread
        thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
        thread.start()

        # Yield tokens as they arrive
        for text in streamer:
            yield text
        thread.join()

    def count_tokens(self, text):
        """Count tokens using HuggingFace tokenizer."""
        tokens = self.tokenizer.encode(text)
        return len(tokens)
class OpenAIAdapter(ModelAdapter):
    """Adapter for OpenAI API models."""

    def __init__(self, model_dict):
        """Initialize with OpenAI client from factory."""
        self.client = model_dict['client']
        self.model_name = model_dict['model_name']
        self.config = model_dict['config']

    def generate(self, prompt, **kwargs):
        """Generate text using OpenAI chat completions."""
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=[{'role': 'user', 'content': prompt}],
            max_tokens=kwargs.get('max_tokens', 100),
            temperature=kwargs.get('temperature', 1.0),
            top_p=kwargs.get('top_p', 1.0)
        )
        return response.choices[0].message.content

    def generate_stream(self, prompt, **kwargs):
        """Generate text with streaming using OpenAI streaming API."""
        stream = self.client.chat.completions.create(
            model=self.model_name,
            messages=[{'role': 'user', 'content': prompt}],
            max_tokens=kwargs.get('max_tokens', 100),
            temperature=kwargs.get('temperature', 1.0),
            stream=True
        )
        for chunk in stream:
            # Skip chunks that carry no choices or no text content
            if chunk.choices and chunk.choices[0].delta.content is not None:
                yield chunk.choices[0].delta.content

    def count_tokens(self, text):
        """Count tokens using tiktoken."""
        import tiktoken
        try:
            encoding = tiktoken.encoding_for_model(self.model_name)
        except KeyError:
            # Model name unknown to tiktoken: fall back to a general
            # encoding, so the count is then only an approximation
            encoding = tiktoken.get_encoding('cl100k_base')
        return len(encoding.encode(text))
Now we can write code that works with any model through the unified adapter interface:
def chat_with_model(adapter, user_message):
    """Chat with any model through unified adapter interface."""
    print("Generating response...")
    response = adapter.generate(
        user_message,
        max_tokens=200,
        temperature=0.7
    )
    print(f"Response: {response}")
    token_count = adapter.count_tokens(response)
    print(f"Response length: {token_count} tokens")
This function works identically whether the adapter wraps a local HuggingFace model, a remote OpenAI model, or any other provider. The adapter pattern provides the abstraction layer that makes this possible.
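Because each factory tags its result with a `provider` key, choosing the right adapter can itself be table-driven. The following is a sketch under assumptions: `make_adapter`, `ADAPTER_CLASSES`, and `EchoAdapter` are hypothetical names, and `EchoAdapter` is a stand-in that needs no model so that the example runs anywhere. In the setup described above, the table would map `'huggingface'` to `HuggingFaceAdapter` and `'openai'` to `OpenAIAdapter`.

```python
class EchoAdapter:
    """Stand-in adapter exposing the same methods as ModelAdapter."""

    def __init__(self, model_dict):
        self.config = model_dict['config']

    def generate(self, prompt, **kwargs):
        # Treat whitespace-separated words as a crude stand-in for tokens.
        words = prompt.split()
        return ' '.join(words[:kwargs.get('max_tokens', 100)])

    def generate_stream(self, prompt, **kwargs):
        for word in self.generate(prompt, **kwargs).split():
            yield word

    def count_tokens(self, text):
        return len(text.split())


# Maps the 'provider' value set by a factory to an adapter class.
ADAPTER_CLASSES = {'echo': EchoAdapter}


def make_adapter(model_dict):
    """Wrap a factory result in the adapter registered for its provider."""
    provider = model_dict['provider']
    try:
        adapter_cls = ADAPTER_CLASSES[provider]
    except KeyError:
        raise ValueError(f"No adapter registered for provider: {provider}") from None
    return adapter_cls(model_dict)


adapter = make_adapter({'provider': 'echo', 'config': {}})
reply = adapter.generate("one two three four", max_tokens=3)
print(reply, adapter.count_tokens(reply))
```

A stand-in like this is also convenient for unit-testing functions such as `chat_with_model` without loading a real model.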
PART TWO: CONFIGURATION AND PARAMETER MANAGEMENT
The Builder Pattern for Complex Model Configuration
LLMs expose numerous configuration parameters: temperature, top-p, top-k, repetition penalty, context length, stopping sequences, and many more. Different use cases require different parameter combinations. Hard-coding these parameters or passing long argument lists quickly becomes unwieldy.
The Builder pattern provides a fluent interface for constructing complex configurations step by step. It separates the construction of a configuration object from its representation, allowing the same construction process to create different configurations.
from copy import deepcopy


class ModelConfigBuilder:
    """Builder for constructing LLM model configurations."""

    def __init__(self):
        """Initialize with default configuration."""
        self.config = {
            'generation': {},
            'model': {},
            'runtime': {}
        }

    def set_model_path(self, path):
        """Set the model path or identifier."""
        self.config['model']['path'] = path
        return self

    def set_provider(self, provider):
        """Set the model provider (huggingface, openai, etc.)."""
        self.config['model']['provider'] = provider
        return self

    def set_device(self, device):
        """Set the device (cpu, cuda, mps, etc.)."""
        self.config['runtime']['device'] = device
        return self

    def set_temperature(self, temperature):
        """Set generation temperature (0.0 to 2.0)."""
        if not 0.0 <= temperature <= 2.0:
            raise ValueError("Temperature must be between 0.0 and 2.0")
        self.config['generation']['temperature'] = temperature
        return self

    def set_max_tokens(self, max_tokens):
        """Set maximum tokens to generate."""
        if max_tokens <= 0:
            raise ValueError("max_tokens must be positive")
        self.config['generation']['max_tokens'] = max_tokens
        return self

    def set_top_p(self, top_p):
        """Set nucleus sampling parameter."""
        if not 0.0 <= top_p <= 1.0:
            raise ValueError("top_p must be between 0.0 and 1.0")
        self.config['generation']['top_p'] = top_p
        return self

    def set_top_k(self, top_k):
        """Set top-k sampling parameter."""
        if top_k <= 0:
            raise ValueError("top_k must be positive")
        self.config['generation']['top_k'] = top_k
        return self

    def set_repetition_penalty(self, penalty):
        """Set repetition penalty (1.0 = no penalty)."""
        if penalty < 1.0:
            raise ValueError("Repetition penalty must be >= 1.0")
        self.config['generation']['repetition_penalty'] = penalty
        return self

    def set_stop_sequences(self, sequences):
        """Set list of sequences that stop generation."""
        self.config['generation']['stop_sequences'] = sequences
        return self

    def set_context_length(self, length):
        """Set maximum context length in tokens."""
        if length <= 0:
            raise ValueError("Context length must be positive")
        self.config['runtime']['context_length'] = length
        return self

    def enable_quantization(self, bits=8):
        """Enable model quantization (4 or 8 bit)."""
        if bits not in [4, 8]:
            raise ValueError("Quantization must be 4 or 8 bit")
        self.config['runtime']['quantization'] = {'bits': bits, 'enabled': True}
        return self

    def enable_flash_attention(self):
        """Enable Flash Attention optimization."""
        self.config['runtime']['flash_attention'] = True
        return self

    def build(self):
        """Build and return the final configuration."""
        # Deep copy: a shallow copy would share the nested dictionaries, so
        # later builder calls could mutate a configuration already handed out
        return deepcopy(self.config)
The builder provides a clean, readable way to construct configurations:
# Build a configuration for creative writing
creative_config = (ModelConfigBuilder()
    .set_model_path('meta-llama/Llama-2-7b-hf')
    .set_provider('huggingface')
    .set_device('cuda:0')
    .set_temperature(0.9)
    .set_top_p(0.95)
    .set_max_tokens(500)
    .set_repetition_penalty(1.1)
    .enable_quantization(8)
    .build())

# Build a configuration for precise factual responses
factual_config = (ModelConfigBuilder()
    .set_model_path('meta-llama/Llama-2-7b-hf')
    .set_provider('huggingface')
    .set_device('cuda:0')
    .set_temperature(0.1)
    .set_top_p(0.9)
    .set_max_tokens(200)
    .set_stop_sequences(['\n\n', 'Question:'])
    .build())
The builder pattern makes configuration construction self-documenting and prevents errors through validation at each step.
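Note that the nested dictionary produced by `build()` does not have the flat shape that the factories in Part One read (`model_name`, `device`, and so on). One possible bridge is a small translation function. The sketch below is an assumption about how the two could be connected, not something the classes above define: `to_factory_config` is a hypothetical helper, and mapping `model.path` to `model_name` as well as defaulting `model_type` to `'causal_lm'` are choices made here for illustration.

```python
def to_factory_config(built):
    """Flatten a builder-style config into (provider, factory_config)."""
    model = built.get('model', {})
    runtime = built.get('runtime', {})

    flat = {
        'model_name': model.get('path'),
        'model_type': 'causal_lm',  # assumed default; the builder has no setter for it
        'device': runtime.get('device', 'cpu'),
    }

    quant = runtime.get('quantization')
    if quant and quant.get('enabled'):
        # HuggingFaceModelFactory reads config['quantization']['load_in_8bit'].
        flat['quantization'] = {'load_in_8bit': quant.get('bits') == 8}

    return model.get('provider'), flat


# Same shape as the output of ModelConfigBuilder.build()
built = {
    'model': {'path': 'meta-llama/Llama-2-7b-hf', 'provider': 'huggingface'},
    'generation': {'temperature': 0.9},
    'runtime': {'device': 'cuda:0', 'quantization': {'bits': 8, 'enabled': True}},
}

provider, factory_config = to_factory_config(built)
print(provider, factory_config)
```

The generation settings are deliberately left out of the flat dictionary, since they are consumed per request by the adapters rather than at model creation time.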
Configuration File Management with the Prototype Pattern
For production applications, configurations should be stored in files rather than hard-coded. JSON, YAML, and TOML are popular formats. The Prototype pattern allows us to create new configuration objects by cloning and modifying existing templates.
import json
import yaml
from copy import deepcopy


class ConfigurationManager:
    """Manages loading, saving, and cloning configurations."""

    def __init__(self):
        """Initialize with empty configuration store."""
        self.configs = {}

    def load_from_json(self, filepath):
        """Load configuration from JSON file."""
        with open(filepath, 'r') as f:
            config = json.load(f)
        return config

    def load_from_yaml(self, filepath):
        """Load configuration from YAML file."""
        with open(filepath, 'r') as f:
            config = yaml.safe_load(f)
        return config

    def save_to_json(self, config, filepath):
        """Save configuration to JSON file."""
        with open(filepath, 'w') as f:
            json.dump(config, f, indent=2)

    def save_to_yaml(self, config, filepath):
        """Save configuration to YAML file."""
        with open(filepath, 'w') as f:
            yaml.dump(config, f, default_flow_style=False)

    def register_template(self, name, config):
        """Register a configuration as a reusable template."""
        self.configs[name] = deepcopy(config)

    def clone_template(self, name):
        """Create a new configuration by cloning a template."""
        if name not in self.configs:
            raise ValueError(f"Template '{name}' not found")
        return deepcopy(self.configs[name])

    def merge_configs(self, base_config, override_config):
        """Merge two configurations, with override taking precedence."""
        merged = deepcopy(base_config)

        def deep_merge(base, override):
            for key, value in override.items():
                if key in base and isinstance(base[key], dict) and isinstance(value, dict):
                    deep_merge(base[key], value)
                else:
                    base[key] = value

        deep_merge(merged, override_config)
        return merged
Here is how we use the configuration manager in practice:
config_manager = ConfigurationManager()
# Load base configuration from file
base_config = config_manager.load_from_yaml('configs/base_model.yaml')
# Register it as a template
config_manager.register_template('base', base_config)
# Clone and modify for specific use case
chatbot_config = config_manager.clone_template('base')
chatbot_config['generation']['temperature'] = 0.7
chatbot_config['generation']['max_tokens'] = 300
# Save the specialized configuration
config_manager.save_to_yaml(chatbot_config, 'configs/chatbot.yaml')
This approach allows teams to maintain a library of configuration templates that can be easily customized for different use cases.
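The usage above edits a cloned template key by key. When the changes themselves live in a dictionary or a second file, a recursive merge applies them in one step, which is what `merge_configs` is for. The standalone function below mirrors that behaviour so the example runs without the class or a YAML file; the `base` and `production_overrides` dictionaries are made-up values for illustration.

```python
from copy import deepcopy


def merge_configs(base_config, override_config):
    """Return a new dict: base_config with override_config merged in recursively."""
    merged = deepcopy(base_config)
    for key, value in override_config.items():
        if isinstance(merged.get(key), dict) and isinstance(value, dict):
            # Both sides are dicts: merge them instead of replacing wholesale.
            merged[key] = merge_configs(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


base = {
    'generation': {'temperature': 0.7, 'max_tokens': 300},
    'runtime': {'device': 'cpu'},
}
production_overrides = {
    'generation': {'temperature': 0.2},
    'runtime': {'device': 'cuda:0'},
}

merged = merge_configs(base, production_overrides)
print(merged)
```

Only the overridden keys change: `max_tokens` survives from the base, and the base dictionary itself is left untouched.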
PART THREE: RETRIEVAL AUGMENTED GENERATION ARCHITECTURE
The Pipeline Pattern for Document Processing
RAG systems require processing documents through multiple stages: loading, parsing, chunking, embedding, and indexing. Each stage transforms the data in some way. The Pipeline pattern chains these transformations together in a flexible, composable way.
class DocumentProcessor:
    """Abstract base for document processing stages."""

    def process(self, document):
        """Process a document and return transformed result."""
        raise NotImplementedError


class PDFLoader(DocumentProcessor):
    """Load and extract text from PDF documents."""

    def process(self, filepath):
        """Extract text from PDF file."""
        import PyPDF2

        text_content = []
        with open(filepath, 'rb') as f:
            pdf_reader = PyPDF2.PdfReader(f)
            for page_num, page in enumerate(pdf_reader.pages):
                text = page.extract_text()
                text_content.append({
                    'page': page_num + 1,
                    'text': text,
                    'source': filepath
                })
        return {
            'type': 'pdf',
            'source': filepath,
            'pages': text_content,
            'total_pages': len(text_content)
        }


class WordDocumentLoader(DocumentProcessor):
    """Load and extract text from Word documents."""

    def process(self, filepath):
        """Extract text from DOCX file."""
        from docx import Document

        doc = Document(filepath)
        paragraphs = []
        for para_num, paragraph in enumerate(doc.paragraphs):
            if paragraph.text.strip():
                paragraphs.append({
                    'paragraph': para_num + 1,
                    'text': paragraph.text,
                    'source': filepath
                })
        return {
            'type': 'docx',
            'source': filepath,
            'paragraphs': paragraphs,
            'total_paragraphs': len(paragraphs)
        }
class MarkdownLoader(DocumentProcessor):
    """Load and parse Markdown documents."""

    def process(self, filepath):
        """Load and parse Markdown file."""
        import markdown
        from bs4 import BeautifulSoup

        with open(filepath, 'r', encoding='utf-8') as f:
            md_content = f.read()

        # Convert to HTML for structured parsing
        html = markdown.markdown(md_content, extensions=['extra', 'codehilite'])
        soup = BeautifulSoup(html, 'html.parser')

        sections = []
        # Content before the first heading goes into a section with no
        # heading; 'level' is set to None so every section has the same keys
        current_section = {'heading': None, 'level': None, 'content': []}
        for element in soup.find_all(['h1', 'h2', 'h3', 'p', 'pre']):
            if element.name in ['h1', 'h2', 'h3']:
                if current_section['content']:
                    sections.append(current_section)
                current_section = {
                    'heading': element.get_text(),
                    'level': int(element.name[1]),
                    'content': []
                }
            else:
                current_section['content'].append(element.get_text())
        if current_section['content']:
            sections.append(current_section)

        return {
            'type': 'markdown',
            'source': filepath,
            'sections': sections,
            'raw_content': md_content
        }


class HTMLLoader(DocumentProcessor):
    """Load and extract text from HTML documents."""

    def process(self, filepath):
        """Extract text from HTML file."""
        from bs4 import BeautifulSoup

        with open(filepath, 'r', encoding='utf-8') as f:
            html_content = f.read()
        soup = BeautifulSoup(html_content, 'html.parser')

        # Remove script and style elements
        for script in soup(['script', 'style']):
            script.decompose()

        # Extract text
        text = soup.get_text()

        # Clean up whitespace
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
        text = ' '.join(chunk for chunk in chunks if chunk)

        return {
            'type': 'html',
            'source': filepath,
            'text': text,
            'title': soup.title.string if soup.title else None
        }
Now we implement chunking strategies as processing stages:
class ChunkingStrategy(DocumentProcessor):
    """Abstract base for text chunking strategies."""

    def __init__(self, chunk_size, chunk_overlap):
        """Initialize with chunk parameters."""
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap


class FixedSizeChunker(ChunkingStrategy):
    """Chunk text into fixed-size segments with overlap."""

    def process(self, document):
        """Chunk document text into fixed-size pieces."""
        # The window advances by this many characters per chunk; a value of
        # zero or less would make the loop below run forever
        step = self.chunk_size - self.chunk_overlap
        if step <= 0:
            raise ValueError("chunk_overlap must be smaller than chunk_size")

        chunks = []

        # Extract all text from document
        if document['type'] == 'pdf':
            text = ' '.join(page['text'] for page in document['pages'])
        elif document['type'] == 'docx':
            text = ' '.join(para['text'] for para in document['paragraphs'])
        elif document['type'] == 'markdown':
            text = document['raw_content']
        elif document['type'] == 'html':
            text = document['text']
        else:
            text = str(document)

        # Split into chunks with overlap
        start = 0
        chunk_id = 0
        while start < len(text):
            # Clamp so end_pos never points past the end of the text
            end = min(start + self.chunk_size, len(text))
            chunk_text = text[start:end]
            chunks.append({
                'chunk_id': chunk_id,
                'text': chunk_text,
                'start_pos': start,
                'end_pos': end,
                'source': document.get('source', 'unknown')
            })
            chunk_id += 1
            start += step

        return {
            **document,
            'chunks': chunks,
            'chunking_strategy': 'fixed_size',
            'chunk_size': self.chunk_size,
            'chunk_overlap': self.chunk_overlap
        }
class SemanticChunker(ChunkingStrategy):
    """Chunk text based on semantic boundaries (sentences, paragraphs)."""

    def process(self, document):
        """Chunk document at semantic boundaries."""
        import nltk

        # Ensure NLTK punkt tokenizer is available
        try:
            nltk.data.find('tokenizers/punkt')
        except LookupError:
            nltk.download('punkt')

        chunks = []

        # Extract text
        if document['type'] == 'pdf':
            text = ' '.join(page['text'] for page in document['pages'])
        elif document['type'] == 'docx':
            text = ' '.join(para['text'] for para in document['paragraphs'])
        elif document['type'] == 'markdown':
            text = document['raw_content']
        elif document['type'] == 'html':
            text = document['text']
        else:
            text = str(document)

        # Split into sentences
        sentences = nltk.sent_tokenize(text)

        # Group sentences into chunks
        # (chunk_size is in characters, chunk_overlap in sentences)
        current_chunk = []
        current_length = 0
        chunk_id = 0
        for sentence in sentences:
            sentence_length = len(sentence)
            if current_length + sentence_length > self.chunk_size and current_chunk:
                # Save current chunk
                chunk_text = ' '.join(current_chunk)
                chunks.append({
                    'chunk_id': chunk_id,
                    'text': chunk_text,
                    'sentence_count': len(current_chunk),
                    'source': document.get('source', 'unknown')
                })
                # Start new chunk with overlap
                overlap_sentences = current_chunk[-self.chunk_overlap:] if self.chunk_overlap > 0 else []
                current_chunk = overlap_sentences + [sentence]
                current_length = sum(len(s) for s in current_chunk)
                chunk_id += 1
            else:
                current_chunk.append(sentence)
                current_length += sentence_length

        # Add final chunk
        if current_chunk:
            chunk_text = ' '.join(current_chunk)
            chunks.append({
                'chunk_id': chunk_id,
                'text': chunk_text,
                'sentence_count': len(current_chunk),
                'source': document.get('source', 'unknown')
            })

        return {
            **document,
            'chunks': chunks,
            'chunking_strategy': 'semantic',
            'chunk_size': self.chunk_size
        }
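The overlap arithmetic in `FixedSizeChunker` is easy to get wrong, so it helps to look at the character spans in isolation. The helper below is a hypothetical, standalone sketch (`chunk_spans` is not part of the classes above) that follows the same stepping rule: each chunk starts `chunk_size - chunk_overlap` characters after the previous one.

```python
def chunk_spans(text_length, chunk_size, chunk_overlap):
    """Return the (start, end) character spans a fixed-size chunker would produce."""
    step = chunk_size - chunk_overlap
    if step <= 0:
        # A non-positive step would never advance, so reject it up front.
        raise ValueError("chunk_overlap must be smaller than chunk_size")

    spans = []
    start = 0
    while start < text_length:
        end = min(start + chunk_size, text_length)
        spans.append((start, end))
        start += step
    return spans


print(chunk_spans(10, 4, 1))  # [(0, 4), (3, 7), (6, 10), (9, 10)]
```

Each span shares one character with its predecessor. The last span is a one-character tail made up entirely of overlap; whether to keep or drop such tails is a design decision that depends on the retrieval setup.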
Now we create a pipeline that chains these processors together:
class DocumentPipeline:
    """Pipeline for chaining document processors."""

    def __init__(self):
        """Initialize empty pipeline."""
        self.stages = []

    def add_stage(self, processor):
        """Add a processing stage to the pipeline."""
        self.stages.append(processor)
        return self

    def process(self, input_data):
        """Process input through all pipeline stages."""
        result = input_data
        for stage in self.stages:
            result = stage.process(result)
        return result
Using the pipeline is straightforward and highly composable:
# Create a pipeline for PDF processing
pdf_pipeline = (DocumentPipeline()
    .add_stage(PDFLoader())
    .add_stage(SemanticChunker(chunk_size=1000, chunk_overlap=2)))

# Process a PDF document
processed_doc = pdf_pipeline.process('documents/research_paper.pdf')

# Create a different pipeline for Markdown
markdown_pipeline = (DocumentPipeline()
    .add_stage(MarkdownLoader())
    .add_stage(FixedSizeChunker(chunk_size=500, chunk_overlap=50)))
The pipeline pattern makes it easy to experiment with different processing strategies by simply swapping out stages.
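Adding a new stage follows the same recipe: any object with a `process` method can be slotted in. The sketch below illustrates this with three hypothetical stages (`InMemoryLoader`, `ParagraphChunker`, and `MinLengthFilter` are names invented here), chosen so that the example runs without files or third-party packages. A compact copy of `DocumentPipeline` is included to keep the snippet self-contained.

```python
class DocumentPipeline:
    """Compact copy of the pipeline above, repeated to keep the example self-contained."""

    def __init__(self):
        self.stages = []

    def add_stage(self, processor):
        self.stages.append(processor)
        return self

    def process(self, input_data):
        result = input_data
        for stage in self.stages:
            result = stage.process(result)
        return result


class InMemoryLoader:
    """Hypothetical loader that wraps a string instead of reading a file."""

    def process(self, text):
        return {'type': 'text', 'source': 'memory', 'text': text}


class ParagraphChunker:
    """Hypothetical chunker that splits on blank lines."""

    def process(self, document):
        parts = [p.strip() for p in document['text'].split('\n\n') if p.strip()]
        chunks = [{'chunk_id': i, 'text': p, 'source': document['source']}
                  for i, p in enumerate(parts)]
        return {**document, 'chunks': chunks}


class MinLengthFilter:
    """Hypothetical stage that drops chunks shorter than min_chars."""

    def __init__(self, min_chars):
        self.min_chars = min_chars

    def process(self, document):
        kept = [c for c in document['chunks'] if len(c['text']) >= self.min_chars]
        return {**document, 'chunks': kept}


pipeline = (DocumentPipeline()
            .add_stage(InMemoryLoader())
            .add_stage(ParagraphChunker())
            .add_stage(MinLengthFilter(min_chars=10)))

result = pipeline.process("Short.\n\nThis paragraph is long enough to keep.")
print([c['text'] for c in result['chunks']])
```

The only contract between stages is the shape of the dictionary they pass along, so a filter like this can be inserted after any chunker that emits a `chunks` list.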
The Strategy Pattern for Embedding and Ranking
Once documents are chunked, we need to embed them for semantic search and rank retrieved chunks by relevance. Different embedding models and ranking algorithms have different trade-offs between speed, accuracy, and resource usage.
class EmbeddingStrategy:
    """Abstract interface for text embedding."""

    def embed_text(self, text):
        """Generate embedding vector for text."""
        raise NotImplementedError

    def embed_batch(self, texts):
        """Generate embeddings for multiple texts efficiently."""
        raise NotImplementedError


class HuggingFaceEmbedding(EmbeddingStrategy):
    """Embedding using HuggingFace sentence transformers."""

    def __init__(self, model_name='sentence-transformers/all-MiniLM-L6-v2'):
        """Initialize with specific embedding model."""
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer(model_name)

    def embed_text(self, text):
        """Generate embedding for single text."""
        return self.model.encode(text, convert_to_numpy=True)

    def embed_batch(self, texts):
        """Generate embeddings for batch of texts."""
        return self.model.encode(texts, convert_to_numpy=True, show_progress_bar=True)


class OpenAIEmbedding(EmbeddingStrategy):
    """Embedding using OpenAI's embedding API."""

    def __init__(self, api_key, model='text-embedding-ada-002'):
        """Initialize with API key and model."""
        import openai
        self.client = openai.OpenAI(api_key=api_key)
        self.model = model

    def embed_text(self, text):
        """Generate embedding for single text."""
        response = self.client.embeddings.create(
            input=text,
            model=self.model
        )
        return response.data[0].embedding

    def embed_batch(self, texts):
        """Generate embeddings for batch of texts."""
        response = self.client.embeddings.create(
            input=texts,
            model=self.model
        )
        return [item.embedding for item in response.data]
Now we define ranking strategies:
class RankingStrategy:
    """Abstract interface for ranking retrieved chunks."""
    def rank(self, query_embedding, chunk_embeddings, chunks):
        """Rank chunks by relevance to query."""
        raise NotImplementedError
class CosineSimilarityRanker(RankingStrategy):
    """Rank chunks by cosine similarity to query."""
    def rank(self, query_embedding, chunk_embeddings, chunks):
        """Compute cosine similarity and rank chunks."""
        import numpy as np
        # Accept plain lists (as returned by OpenAIEmbedding) as well as arrays
        query_embedding = np.asarray(query_embedding, dtype=float)
        chunk_embeddings = np.asarray(chunk_embeddings, dtype=float)
        # Normalize embeddings
        query_norm = query_embedding / np.linalg.norm(query_embedding)
        chunk_norms = chunk_embeddings / np.linalg.norm(chunk_embeddings, axis=1, keepdims=True)
        # Compute cosine similarities
        similarities = np.dot(chunk_norms, query_norm)
        # Create ranked list
        ranked_indices = np.argsort(similarities)[::-1]
        ranked_chunks = []
        for idx in ranked_indices:
            ranked_chunks.append({
                **chunks[idx],
                'similarity_score': float(similarities[idx])
            })
        return ranked_chunks
class CrossEncoderRanker(RankingStrategy):
    """Rank chunks using cross-encoder reranking."""
    def __init__(self, model_name='cross-encoder/ms-marco-MiniLM-L-6-v2'):
        """Initialize cross-encoder model."""
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)
    def rank(self, query_embedding, chunk_embeddings, chunks):
        """Rerank chunks using cross-encoder."""
        import numpy as np
        if not chunks:
            return []
        # A cross-encoder scores raw text pairs, so it needs the original
        # query text rather than the query embedding. The interface does not
        # pass it yet; this is a limitation we'll address in the architecture.
        query_text = chunks[0].get('query_text', '')  # Placeholder
        # Create query-chunk pairs
        pairs = [[query_text, chunk['text']] for chunk in chunks]
        # Score pairs
        scores = self.model.predict(pairs)
        # Rank by scores
        ranked_indices = np.argsort(scores)[::-1]
        ranked_chunks = []
        for idx in ranked_indices:
            ranked_chunks.append({
                **chunks[idx],
                'rerank_score': float(scores[idx])
            })
        return ranked_chunks
These strategies can be combined in a retrieval system:
class RetrievalSystem:
    """System for retrieving and ranking relevant chunks."""
    def __init__(self, embedding_strategy, ranking_strategy):
        """Initialize with embedding and ranking strategies."""
        self.embedding = embedding_strategy
        self.ranking = ranking_strategy
        self.chunk_embeddings = []
        self.chunks = []
    def index_chunks(self, chunks):
        """Index chunks by computing and storing their embeddings."""
        texts = [chunk['text'] for chunk in chunks]
        self.chunk_embeddings = self.embedding.embed_batch(texts)
        self.chunks = chunks
    def retrieve(self, query, top_k=5):
        """Retrieve and rank top-k most relevant chunks."""
        # Embed query
        query_embedding = self.embedding.embed_text(query)
        # Rank chunks
        ranked_chunks = self.ranking.rank(
            query_embedding,
            self.chunk_embeddings,
            self.chunks
        )
        # Return top-k
        return ranked_chunks[:top_k]
The strategy pattern allows us to easily swap between different embedding models and ranking algorithms based on requirements.
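One practical consequence is that the retrieval logic can be exercised without downloading a model or calling an API. The sketch below assumes nothing beyond the standard library: LetterCountEmbedding and PureCosineRanker are hypothetical toy strategies invented for this illustration, and RetrievalSystem is restated in compact form so the snippet runs on its own.

```python
import math


class LetterCountEmbedding:
    """Hypothetical toy embedding: one dimension per letter a-z."""
    def embed_text(self, text):
        counts = [0.0] * 26
        for ch in text.lower():
            if 'a' <= ch <= 'z':
                counts[ord(ch) - ord('a')] += 1.0
        return counts

    def embed_batch(self, texts):
        return [self.embed_text(t) for t in texts]


def cosine(a, b):
    """Cosine similarity of two equal-length vectors (0.0 if either is zero)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class PureCosineRanker:
    """Hypothetical ranker with the same rank() signature as above."""
    def rank(self, query_embedding, chunk_embeddings, chunks):
        scored = [
            {**chunk, 'similarity_score': cosine(query_embedding, emb)}
            for chunk, emb in zip(chunks, chunk_embeddings)
        ]
        return sorted(scored, key=lambda c: c['similarity_score'], reverse=True)


class RetrievalSystem:
    """Restated from the article so the example is self-contained."""
    def __init__(self, embedding_strategy, ranking_strategy):
        self.embedding = embedding_strategy
        self.ranking = ranking_strategy
        self.chunk_embeddings = []
        self.chunks = []

    def index_chunks(self, chunks):
        self.chunk_embeddings = self.embedding.embed_batch([c['text'] for c in chunks])
        self.chunks = chunks

    def retrieve(self, query, top_k=5):
        query_embedding = self.embedding.embed_text(query)
        ranked = self.ranking.rank(query_embedding, self.chunk_embeddings, self.chunks)
        return ranked[:top_k]


system = RetrievalSystem(LetterCountEmbedding(), PureCosineRanker())
system.index_chunks([{'text': 'zebra zoo'}, {'text': 'gpu memory'}])
results = system.retrieve('gpu', top_k=2)
print([r['text'] for r in results])
```

Swapping in a real embedding model later only changes the constructor arguments, not the retrieval code.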
PART FOUR: VECTOR DATABASE ABSTRACTION
The Repository Pattern for Vector Storage
Vector databases like Pinecone, Weaviate, Qdrant, and ChromaDB each have different APIs. The Repository pattern provides a unified interface for storing and retrieving vectors, making it easy to switch between databases.
class VectorRepository:
    """Abstract interface for vector database operations."""
    def create_collection(self, name, dimension):
        """Create a new collection for vectors."""
        raise NotImplementedError
    def insert_vectors(self, collection_name, vectors, metadata):
        """Insert vectors with associated metadata."""
        raise NotImplementedError
    def search_vectors(self, collection_name, query_vector, top_k=5):
        """Search for similar vectors."""
        raise NotImplementedError
    def delete_collection(self, name):
        """Delete a collection."""
        raise NotImplementedError
class ChromaDBRepository(VectorRepository):
    """ChromaDB implementation of vector repository."""
    def __init__(self, persist_directory='./chroma_db'):
        """Initialize ChromaDB client."""
        import chromadb
        self.client = chromadb.PersistentClient(path=persist_directory)
    def create_collection(self, name, dimension):
        """Create ChromaDB collection."""
        return self.client.create_collection(
            name=name,
            metadata={'dimension': dimension}
        )
    def insert_vectors(self, collection_name, vectors, metadata):
        """Insert vectors into ChromaDB collection."""
        import uuid
        collection = self.client.get_collection(collection_name)
        # Generate unique IDs so that repeated inserts do not collide
        ids = [uuid.uuid4().hex for _ in range(len(vectors))]
        # Insert
        collection.add(
            embeddings=vectors.tolist() if hasattr(vectors, 'tolist') else vectors,
            metadatas=metadata,
            ids=ids
        )
    def search_vectors(self, collection_name, query_vector, top_k=5):
        """Search ChromaDB for similar vectors."""
        collection = self.client.get_collection(collection_name)
        results = collection.query(
            query_embeddings=[query_vector.tolist() if hasattr(query_vector, 'tolist') else query_vector],
            n_results=top_k
        )
        return results
    def delete_collection(self, name):
        """Delete ChromaDB collection."""
        self.client.delete_collection(name)
class QdrantRepository(VectorRepository):
    """Qdrant implementation of vector repository."""
    def __init__(self, host='localhost', port=6333):
        """Initialize Qdrant client."""
        from qdrant_client import QdrantClient
        from qdrant_client.models import Distance, VectorParams
        self.client = QdrantClient(host=host, port=port)
        self.Distance = Distance
        self.VectorParams = VectorParams
    def create_collection(self, name, dimension):
        """Create Qdrant collection."""
        self.client.create_collection(
            collection_name=name,
            vectors_config=self.VectorParams(
                size=dimension,
                distance=self.Distance.COSINE
            )
        )
    def insert_vectors(self, collection_name, vectors, metadata):
        """Insert vectors into Qdrant collection."""
        import uuid
        from qdrant_client.models import PointStruct
        points = []
        for vector, meta in zip(vectors, metadata):
            points.append(
                PointStruct(
                    # Unique ID: reusing 0..n-1 would overwrite points from earlier inserts
                    id=str(uuid.uuid4()),
                    vector=vector.tolist() if hasattr(vector, 'tolist') else vector,
                    payload=meta
                )
            )
        self.client.upsert(
            collection_name=collection_name,
            points=points
        )
    def search_vectors(self, collection_name, query_vector, top_k=5):
        """Search Qdrant for similar vectors."""
        # Note: newer qdrant-client releases deprecate search() in favor of query_points()
        results = self.client.search(
            collection_name=collection_name,
            query_vector=query_vector.tolist() if hasattr(query_vector, 'tolist') else query_vector,
            limit=top_k
        )
        return results
    def delete_collection(self, name):
        """Delete Qdrant collection."""
        self.client.delete_collection(collection_name=name)
Now applications can work with any vector database through the same interface:
def build_rag_index(vector_repo, chunks, embedding_strategy):
    """Build RAG index using any vector database."""
    # Embed chunks first so the collection dimension matches the model
    texts = [chunk['text'] for chunk in chunks]
    embeddings = embedding_strategy.embed_batch(texts)
    # Create collection sized to the embedding dimension
    # (384 for all-MiniLM-L6-v2, 1536 for text-embedding-ada-002)
    vector_repo.create_collection('documents', dimension=len(embeddings[0]))
    # Prepare metadata
    metadata = [{'text': chunk['text'], 'source': chunk['source']} for chunk in chunks]
    # Insert into vector database
    vector_repo.insert_vectors('documents', embeddings, metadata)
This function works identically whether using ChromaDB, Qdrant, or any other vector database.
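The same interface also makes it cheap to add a backend for unit tests. The following is a minimal sketch, assuming the four-method interface shown above; InMemoryVectorRepository is a hypothetical class invented for this illustration, and its search result shape (a list of score and metadata dictionaries) is an assumption, not the format returned by ChromaDB or Qdrant.

```python
import math


def _cosine(a, b):
    """Cosine similarity of two equal-length vectors (0.0 if either is zero)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class InMemoryVectorRepository:
    """Hypothetical in-memory backend following the VectorRepository interface."""
    def __init__(self):
        self.collections = {}

    def create_collection(self, name, dimension):
        self.collections[name] = {'dimension': dimension, 'items': []}

    def insert_vectors(self, collection_name, vectors, metadata):
        collection = self.collections[collection_name]
        for vector, meta in zip(vectors, metadata):
            if len(vector) != collection['dimension']:
                raise ValueError('vector dimension does not match collection')
            collection['items'].append((list(vector), meta))

    def search_vectors(self, collection_name, query_vector, top_k=5):
        items = self.collections[collection_name]['items']
        scored = [(_cosine(query_vector, vec), meta) for vec, meta in items]
        # Sort on the score only, so ties never compare metadata dictionaries
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [{'score': score, 'metadata': meta} for score, meta in scored[:top_k]]

    def delete_collection(self, name):
        self.collections.pop(name, None)


repo = InMemoryVectorRepository()
repo.create_collection('documents', dimension=2)
repo.insert_vectors(
    'documents',
    [[1.0, 0.0], [0.0, 1.0]],
    [{'text': 'about GPUs'}, {'text': 'about caching'}],
)
hits = repo.search_vectors('documents', [0.9, 0.1], top_k=1)
print(hits[0]['metadata']['text'])
```

A brute-force scan like this is only suitable for small test fixtures; it stands in for a real database, it does not replace one.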
PART FIVE: ADVANCED RAG PATTERNS
The Observer Pattern for Document Monitoring
Production RAG systems need to automatically detect and process new documents as they arrive. The Observer pattern allows components to subscribe to document events and react accordingly.
class DocumentEvent:
    """Event representing a document change."""
    def __init__(self, event_type, filepath, timestamp):
        """Initialize document event."""
        self.event_type = event_type  # 'created', 'modified', 'deleted'
        self.filepath = filepath
        self.timestamp = timestamp
class DocumentObserver:
    """Abstract observer for document events."""
    def on_document_added(self, event):
        """Handle document addition event."""
        pass
    def on_document_modified(self, event):
        """Handle document modification event."""
        pass
    def on_document_deleted(self, event):
        """Handle document deletion event."""
        pass
class DocumentWatcher:
    """Watches directory for document changes and notifies observers."""
    def __init__(self, watch_directory):
        """Initialize watcher for specific directory."""
        self.watch_directory = watch_directory
        self.observers = []
    def attach_observer(self, observer):
        """Attach an observer to receive notifications."""
        self.observers.append(observer)
    def detach_observer(self, observer):
        """Detach an observer."""
        self.observers.remove(observer)
    def notify_observers(self, event):
        """Notify all observers of an event."""
        for observer in self.observers:
            if event.event_type == 'created':
                observer.on_document_added(event)
            elif event.event_type == 'modified':
                observer.on_document_modified(event)
            elif event.event_type == 'deleted':
                observer.on_document_deleted(event)
    def start_watching(self):
        """Start watching directory for changes."""
        from watchdog.observers import Observer
        from watchdog.events import FileSystemEventHandler
        import time
        class Handler(FileSystemEventHandler):
            def __init__(self, watcher):
                self.watcher = watcher
            def on_created(self, event):
                if not event.is_directory:
                    doc_event = DocumentEvent('created', event.src_path, time.time())
                    self.watcher.notify_observers(doc_event)
            def on_modified(self, event):
                if not event.is_directory:
                    doc_event = DocumentEvent('modified', event.src_path, time.time())
                    self.watcher.notify_observers(doc_event)
            def on_deleted(self, event):
                if not event.is_directory:
                    doc_event = DocumentEvent('deleted', event.src_path, time.time())
                    self.watcher.notify_observers(doc_event)
        event_handler = Handler(self)
        observer = Observer()
        observer.schedule(event_handler, self.watch_directory, recursive=True)
        observer.start()
        return observer
Now we create an observer that automatically processes new documents:
class AutoIndexingObserver(DocumentObserver):
    """Observer that automatically indexes new documents."""
    def __init__(self, pipeline, vector_repo, embedding_strategy):
        """Initialize with processing pipeline and storage."""
        self.pipeline = pipeline
        self.vector_repo = vector_repo
        self.embedding = embedding_strategy
    def on_document_added(self, event):
        """Process and index newly added document."""
        print(f"Processing new document: {event.filepath}")
        try:
            # Process document through pipeline
            processed = self.pipeline.process(event.filepath)
            # Extract chunks
            chunks = processed.get('chunks', [])
            if chunks:
                # Embed chunks
                texts = [chunk['text'] for chunk in chunks]
                embeddings = self.embedding.embed_batch(texts)
                # Prepare metadata
                metadata = [
                    {
                        'text': chunk['text'],
                        'source': event.filepath,
                        'chunk_id': chunk['chunk_id']
                    }
                    for chunk in chunks
                ]
                # Insert into vector database
                self.vector_repo.insert_vectors('documents', embeddings, metadata)
                print(f"Indexed {len(chunks)} chunks from {event.filepath}")
        except Exception as e:
            print(f"Error processing {event.filepath}: {e}")
    def on_document_modified(self, event):
        """Reprocess modified document."""
        print(f"Reprocessing modified document: {event.filepath}")
        # In production, we would delete old chunks first
        self.on_document_added(event)
Using the observer pattern for automatic indexing:
# Create document watcher
watcher = DocumentWatcher('./documents')
# Create auto-indexing observer
auto_indexer = AutoIndexingObserver(
pipeline=pdf_pipeline,
vector_repo=vector_repo,
embedding_strategy=embedding_strategy
)
# Attach observer to watcher
watcher.attach_observer(auto_indexer)
# Start watching
observer = watcher.start_watching()
Now any document added to the watched directory is automatically processed and indexed.
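File system watchers often report several modification events for a single save, which would trigger redundant reindexing. One way to soften this, sketched below under stated assumptions, is a decorator-style observer that forwards at most one modification per file within a time window. DebouncedObserver, RecordingObserver, and FakeEvent are hypothetical names used only for this illustration, and the injectable clock parameter is there purely to make the behavior testable.

```python
import time


class DebouncedObserver:
    """Hypothetical wrapper: forward at most one 'modified' event per file
    within `window` seconds to the wrapped observer."""
    def __init__(self, inner, window=1.0, clock=time.monotonic):
        self.inner = inner
        self.window = window
        self.clock = clock
        self._last_forwarded = {}

    def on_document_added(self, event):
        self.inner.on_document_added(event)

    def on_document_modified(self, event):
        now = self.clock()
        last = self._last_forwarded.get(event.filepath)
        if last is not None and now - last < self.window:
            return  # Part of the same burst, skip it
        self._last_forwarded[event.filepath] = now
        self.inner.on_document_modified(event)

    def on_document_deleted(self, event):
        self._last_forwarded.pop(event.filepath, None)
        self.inner.on_document_deleted(event)


class RecordingObserver:
    """Hypothetical observer that records which handlers were called."""
    def __init__(self):
        self.calls = []

    def on_document_added(self, event):
        self.calls.append(('added', event.filepath))

    def on_document_modified(self, event):
        self.calls.append(('modified', event.filepath))

    def on_document_deleted(self, event):
        self.calls.append(('deleted', event.filepath))


class FakeEvent:
    """Minimal stand-in for DocumentEvent; only filepath is needed here."""
    def __init__(self, filepath):
        self.filepath = filepath


# Simulated timestamps: two events 0.2 s apart, then one much later
ticks = iter([0.0, 0.2, 5.0])
recorder = RecordingObserver()
debounced = DebouncedObserver(recorder, window=1.0, clock=lambda: next(ticks))
for _ in range(3):
    debounced.on_document_modified(FakeEvent('notes.md'))
print(recorder.calls)
```

In the article's setup, the wrapper would be attached to the watcher in place of the indexing observer it wraps.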
GraphRAG Integration with the Composite Pattern
GraphRAG extends traditional RAG by representing documents as knowledge graphs, capturing relationships between entities. The Composite pattern allows us to treat individual nodes and subgraphs uniformly.
class GraphNode:
    """Represents a node in the knowledge graph."""
    def __init__(self, node_id, node_type, content, metadata=None):
        """Initialize graph node."""
        self.node_id = node_id
        self.node_type = node_type
        self.content = content
        self.metadata = metadata or {}
        self.edges = []
    def add_edge(self, target_node, edge_type, weight=1.0):
        """Add edge to another node."""
        self.edges.append({
            'target': target_node,
            'type': edge_type,
            'weight': weight
        })
    def get_neighbors(self, edge_type=None):
        """Get neighboring nodes, optionally filtered by edge type."""
        if edge_type:
            return [edge['target'] for edge in self.edges if edge['type'] == edge_type]
        return [edge['target'] for edge in self.edges]
class KnowledgeGraph:
    """Knowledge graph for GraphRAG."""
    def __init__(self):
        """Initialize empty knowledge graph."""
        self.nodes = {}
        self.node_embeddings = {}
    def add_node(self, node):
        """Add node to graph."""
        self.nodes[node.node_id] = node
    def add_edge(self, source_id, target_id, edge_type, weight=1.0):
        """Add edge between nodes."""
        if source_id in self.nodes and target_id in self.nodes:
            self.nodes[source_id].add_edge(
                self.nodes[target_id],
                edge_type,
                weight
            )
    def embed_nodes(self, embedding_strategy):
        """Compute embeddings for all nodes."""
        for node_id, node in self.nodes.items():
            embedding = embedding_strategy.embed_text(node.content)
            self.node_embeddings[node_id] = embedding
    def find_relevant_subgraph(self, query, embedding_strategy, max_nodes=10):
        """Find subgraph relevant to query."""
        import numpy as np
        # Embed and normalize the query once, outside the loop
        query_embedding = np.asarray(embedding_strategy.embed_text(query), dtype=float)
        query_norm = query_embedding / np.linalg.norm(query_embedding)
        # Compute similarities to all nodes
        similarities = {}
        for node_id, node_embedding in self.node_embeddings.items():
            node_embedding = np.asarray(node_embedding, dtype=float)
            node_norm = node_embedding / np.linalg.norm(node_embedding)
            similarities[node_id] = float(np.dot(query_norm, node_norm))
        # Get top nodes
        top_node_ids = sorted(similarities.keys(), key=lambda x: similarities[x], reverse=True)[:max_nodes]
        # Build subgraph including direct (one-hop) neighbors
        subgraph_nodes = set(top_node_ids)
        for node_id in top_node_ids:
            neighbors = self.nodes[node_id].get_neighbors()
            subgraph_nodes.update(n.node_id for n in neighbors)
        return {
            'nodes': [self.nodes[nid] for nid in subgraph_nodes],
            'relevance_scores': {nid: similarities.get(nid, 0.0) for nid in subgraph_nodes}
        }
class GraphRAGBuilder:
    """Builds knowledge graph from documents."""
    def __init__(self, entity_extractor):
        """Initialize with entity extraction strategy."""
        self.entity_extractor = entity_extractor
    def build_graph(self, documents):
        """Build knowledge graph from documents."""
        graph = KnowledgeGraph()
        for doc in documents:
            # Extract entities and relationships
            entities = self.entity_extractor.extract_entities(doc['text'])
            relationships = self.entity_extractor.extract_relationships(doc['text'])
            # Add entity nodes
            for entity in entities:
                node = GraphNode(
                    node_id=entity['id'],
                    node_type=entity['type'],
                    content=entity['text'],
                    metadata={'source': doc.get('source')}
                )
                graph.add_node(node)
            # Add relationship edges
            for rel in relationships:
                graph.add_edge(
                    source_id=rel['source'],
                    target_id=rel['target'],
                    edge_type=rel['type'],
                    weight=rel.get('confidence', 1.0)
                )
        return graph
GraphRAG provides richer context by traversing the knowledge graph to find related information.
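The subgraph search above stops at direct neighbors. If deeper traversal is wanted, a bounded breadth-first expansion is one option. The sketch below is an illustration under simplifying assumptions: expand_neighbors is a hypothetical helper, and the graph is a plain adjacency dictionary rather than the GraphNode objects used in the article.

```python
from collections import deque


def expand_neighbors(adjacency, seed_ids, max_hops=2):
    """Breadth-first expansion from the seed nodes.

    adjacency: dict mapping node_id -> list of neighbor ids
               (hypothetical stand-in for GraphNode.edges).
    Returns a dict mapping each reached node_id to its hop distance
    from the nearest seed, limited to max_hops.
    """
    distances = {node_id: 0 for node_id in seed_ids}
    queue = deque(seed_ids)
    while queue:
        current = queue.popleft()
        if distances[current] >= max_hops:
            continue  # Do not expand beyond the hop limit
        for neighbor in adjacency.get(current, []):
            if neighbor not in distances:
                distances[neighbor] = distances[current] + 1
                queue.append(neighbor)
    return distances


adjacency = {
    'llama': ['meta'],
    'meta': ['pytorch'],
    'pytorch': ['cuda'],
}
reached = expand_neighbors(adjacency, ['llama'], max_hops=2)
print(reached)
```

The hop distance can then be used to discount the relevance of nodes that were reached only indirectly.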
PART SIX: PERFORMANCE OPTIMIZATION PATTERNS
The Proxy Pattern for Caching
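LLM inference is expensive. Caching responses to repeated queries can dramatically improve performance. The Proxy pattern intercepts requests and serves cached responses when possible. The proxy below matches exact repeats only (same prompt and same parameters); serving merely similar queries would require a semantic cache keyed on embeddings.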
import hashlib
import json
import time
class CacheProxy:
    """Proxy that caches LLM responses."""
    def __init__(self, model_adapter, cache_backend):
        """Initialize with model adapter and cache backend."""
        self.model = model_adapter
        self.cache = cache_backend
    def _compute_cache_key(self, prompt, **kwargs):
        """Compute cache key from prompt and parameters."""
        # Create deterministic representation
        cache_input = {
            'prompt': prompt,
            'params': kwargs
        }
        # default=str keeps non-JSON parameter values from raising TypeError
        cache_str = json.dumps(cache_input, sort_keys=True, default=str)
        return hashlib.sha256(cache_str.encode()).hexdigest()
    def generate(self, prompt, **kwargs):
        """Generate with caching."""
        # Compute cache key
        cache_key = self._compute_cache_key(prompt, **kwargs)
        # Check cache
        cached_response = self.cache.get(cache_key)
        if cached_response is not None:
            print("Cache hit!")
            return cached_response
        # Generate response
        print("Cache miss, generating...")
        response = self.model.generate(prompt, **kwargs)
        # Store in cache
        self.cache.set(cache_key, response)
        return response
    def generate_stream(self, prompt, **kwargs):
        """Streaming generation (not cached)."""
        # Streaming responses are not cached
        return self.model.generate_stream(prompt, **kwargs)
class MemoryCache:
    """Simple in-memory cache backend."""
    def __init__(self, max_size=1000, ttl=3600):
        """Initialize with max size and time-to-live."""
        self.cache = {}
        self.timestamps = {}
        self.max_size = max_size
        self.ttl = ttl
    def get(self, key):
        """Get value from cache if not expired."""
        if key in self.cache:
            # Check if expired
            if time.time() - self.timestamps[key] < self.ttl:
                return self.cache[key]
            # Remove expired entry
            del self.cache[key]
            del self.timestamps[key]
        return None
    def set(self, key, value):
        """Set value in cache."""
        # Evict the oldest entry only when adding a new key at capacity;
        # overwriting an existing key must not push out another entry
        if key not in self.cache and len(self.cache) >= self.max_size:
            oldest_key = min(self.timestamps, key=self.timestamps.get)
            del self.cache[oldest_key]
            del self.timestamps[oldest_key]
        self.cache[key] = value
        self.timestamps[key] = time.time()
class RedisCache:
    """Redis-backed cache for distributed systems."""
    def __init__(self, host='localhost', port=6379, ttl=3600):
        """Initialize Redis connection."""
        import redis
        self.redis = redis.Redis(host=host, port=port, decode_responses=True)
        self.ttl = ttl
    def get(self, key):
        """Get value from Redis."""
        value = self.redis.get(key)
        # Redis returns None for a missing key
        if value is not None:
            return json.loads(value)
        return None
    def set(self, key, value):
        """Set value in Redis with TTL."""
        self.redis.setex(key, self.ttl, json.dumps(value))
Using the cache proxy:
# Wrap model adapter with caching proxy
cache = MemoryCache(max_size=500, ttl=1800)
cached_model = CacheProxy(model_adapter, cache)
# First call generates response
response1 = cached_model.generate("What is the capital of France?", temperature=0.7)
# Second identical call returns cached response instantly
response2 = cached_model.generate("What is the capital of France?", temperature=0.7)
The proxy pattern transparently adds caching without modifying the model adapter or client code.
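The cache only helps if the key is stable across equivalent calls. As a small check of that property, the sketch below restates the key computation as a standalone function (compute_cache_key is a name chosen for this illustration) and shows that keyword order does not matter while parameter values do.

```python
import hashlib
import json


def compute_cache_key(prompt, **kwargs):
    """Standalone restatement of the proxy's key computation."""
    # sort_keys=True makes the JSON text independent of keyword order
    payload = json.dumps({'prompt': prompt, 'params': kwargs}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


question = "What is the capital of France?"
key_a = compute_cache_key(question, temperature=0.7, max_tokens=50)
key_b = compute_cache_key(question, max_tokens=50, temperature=0.7)
key_c = compute_cache_key(question, temperature=0.2, max_tokens=50)

print(key_a == key_b)  # Same parameters in a different order
print(key_a == key_c)  # Different temperature
```

Note that with a nonzero temperature the model would normally sample a different answer on each call, so a cache hit trades that variety for speed.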
The Lazy Loading Pattern for Model Management
Loading large LLM models into memory is expensive. The Lazy Loading pattern defers model loading until a model is actually needed, and a pool can unload the least recently used model once a configured limit on simultaneously loaded models is reached.
import time
class LazyModelLoader:
    """Lazy loads models on first use."""
    def __init__(self, model_config, factory):
        """Initialize with config but don't load model yet."""
        self.config = model_config
        self.factory = factory
        self.model = None
        self.last_used = None
    def _ensure_loaded(self):
        """Load model if not already loaded."""
        if self.model is None:
            print(f"Loading model: {self.config.get('model_name', 'unknown')}")
            self.model = self.factory.create_model(self.config)
        # Refresh on every use so the pool can pick the least recently used model
        self.last_used = time.time()
    def generate(self, prompt, **kwargs):
        """Generate, loading model if necessary."""
        self._ensure_loaded()
        # Assumes a HuggingFace model; other backends need their own adapter
        adapter = HuggingFaceAdapter(self.model)
        return adapter.generate(prompt, **kwargs)
    def unload(self):
        """Unload model from memory."""
        if self.model is not None:
            print(f"Unloading model: {self.config.get('model_name', 'unknown')}")
            self.model = None
            import gc
            gc.collect()
            # Clear CUDA cache if using GPU
            try:
                import torch
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
            except ImportError:
                pass
class ModelPool:
    """Manages pool of lazy-loaded models with memory limits."""
    def __init__(self, max_loaded_models=2):
        """Initialize with maximum number of simultaneously loaded models."""
        self.models = {}
        self.max_loaded = max_loaded_models
    def register_model(self, name, config, factory):
        """Register a model configuration."""
        self.models[name] = LazyModelLoader(config, factory)
    def get_model(self, name):
        """Get model, managing memory limits."""
        if name not in self.models:
            raise ValueError(f"Model '{name}' not registered")
        requested = self.models[name]
        # Evict only when the requested model still has to be loaded;
        # otherwise a model that is already in memory could evict itself
        if requested.model is None:
            # Count currently loaded models
            loaded_models = [m for m in self.models.values() if m.model is not None]
            # Unload least recently used if at capacity
            if len(loaded_models) >= self.max_loaded:
                lru_model = min(loaded_models, key=lambda m: m.last_used)
                lru_model.unload()
        return requested
Using the model pool:
# Create model pool
pool = ModelPool(max_loaded_models=2)
# Register multiple models
pool.register_model('llama-7b', llama_config, HuggingFaceModelFactory())
pool.register_model('llama-13b', llama_13b_config, HuggingFaceModelFactory())
pool.register_model('gpt-3.5', gpt_config, OpenAIModelFactory())
# Use models - they load on demand
small_model = pool.get_model('llama-7b')
response = small_model.generate("Hello")
# Switching models manages memory automatically
large_model = pool.get_model('llama-13b') # May unload llama-7b if at capacity
This pattern is crucial for applications that need to work with multiple large models on resource-constrained hardware.
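To see the eviction order without loading real weights, the sketch below uses a fake loader that records load and unload events. FakeLazyModel is a hypothetical stand-in for LazyModelLoader, the counter-based clock replaces wall-clock time to keep the run deterministic, and this pool variant evicts only when the requested model is not yet in memory.

```python
import itertools


class FakeLazyModel:
    """Hypothetical stand-in for LazyModelLoader; 'loading' just sets a flag."""
    _clock = itertools.count()  # Shared logical clock for last_used

    def __init__(self, name, log):
        self.name = name
        self.log = log
        self.model = None
        self.last_used = None

    def generate(self, prompt):
        if self.model is None:
            self.log.append(f'load {self.name}')
            self.model = object()
        self.last_used = next(self._clock)
        return f'{self.name}: {prompt}'

    def unload(self):
        if self.model is not None:
            self.log.append(f'unload {self.name}')
            self.model = None


class ModelPool:
    """Pool that keeps at most max_loaded_models in memory."""
    def __init__(self, max_loaded_models=2):
        self.models = {}
        self.max_loaded = max_loaded_models

    def register_model(self, name, model):
        self.models[name] = model

    def get_model(self, name):
        requested = self.models[name]
        if requested.model is None:
            loaded = [m for m in self.models.values() if m.model is not None]
            if len(loaded) >= self.max_loaded:
                # Evict the least recently used loaded model
                min(loaded, key=lambda m: m.last_used).unload()
        return requested


log = []
pool = ModelPool(max_loaded_models=2)
for name in ('small', 'medium', 'large'):
    pool.register_model(name, FakeLazyModel(name, log))

pool.get_model('small').generate('hi')
pool.get_model('medium').generate('hi')
pool.get_model('small').generate('again')  # Refreshes 'small'
pool.get_model('large').generate('hi')     # Evicts 'medium', the LRU model
print(log)
```

Because 'small' was used again before 'large' was requested, 'medium' becomes the least recently used model and is the one unloaded.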
PART SEVEN: CONTEXT WINDOW MANAGEMENT
The Sliding Window Pattern for Long Conversations
LLMs have limited context windows. Long conversations exceed these limits. The Sliding Window pattern maintains a fixed-size window of recent conversation history.
class ConversationHistory:
    """Manages conversation history with context window limits."""
    def __init__(self, max_tokens, tokenizer):
        """Initialize with maximum token limit."""
        self.max_tokens = max_tokens
        self.tokenizer = tokenizer
        self.messages = []
    def add_message(self, role, content):
        """Add message to history."""
        self.messages.append({
            'role': role,
            'content': content,
            'tokens': len(self.tokenizer.encode(content))
        })
    def get_context_window(self):
        """Get messages that fit in context window."""
        total_tokens = 0
        window = []
        # Start from most recent and work backwards
        for message in reversed(self.messages):
            if total_tokens + message['tokens'] <= self.max_tokens:
                window.insert(0, message)
                total_tokens += message['tokens']
            else:
                break
        return window
    def get_formatted_history(self):
        """Get formatted history for model input."""
        window = self.get_context_window()
        return [{'role': m['role'], 'content': m['content']} for m in window]
class SummarizingHistory:
    """Manages history with summarization for very long conversations."""
    def __init__(self, max_tokens, tokenizer, summarizer):
        """Initialize with token limit and summarization model."""
        self.max_tokens = max_tokens
        self.tokenizer = tokenizer
        self.summarizer = summarizer
        self.messages = []
        self.summary = None
    def add_message(self, role, content):
        """Add message and summarize if needed."""
        self.messages.append({
            'role': role,
            'content': content,
            'tokens': len(self.tokenizer.encode(content))
        })
        # Check if we need to summarize
        total_tokens = sum(m['tokens'] for m in self.messages)
        if total_tokens > self.max_tokens:
            self._summarize_old_messages()
    def _summarize_old_messages(self):
        """Summarize older messages to save space."""
        # Keep recent messages, summarize older ones
        recent_tokens = 0
        split_point = len(self.messages)
        for i in range(len(self.messages) - 1, -1, -1):
            recent_tokens += self.messages[i]['tokens']
            if recent_tokens >= self.max_tokens // 2:
                split_point = i
                break
        # Summarize messages before split point
        if split_point > 0:
            old_messages = self.messages[:split_point]
            old_text = '\n'.join(f"{m['role']}: {m['content']}" for m in old_messages)
            # Fold the previous summary in so earlier context is not overwritten
            if self.summary:
                old_text = f"Summary so far: {self.summary}\n{old_text}"
            summary_prompt = f"Summarize this conversation concisely:\n\n{old_text}"
            self.summary = self.summarizer.generate(summary_prompt, max_tokens=200)
            # Keep only recent messages
            self.messages = self.messages[split_point:]
    def get_formatted_history(self):
        """Get formatted history including summary."""
        history = []
        if self.summary:
            history.append({
                'role': 'system',
                'content': f"Previous conversation summary: {self.summary}"
            })
        history.extend([
            {'role': m['role'], 'content': m['content']}
            for m in self.messages
        ])
        return history
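These patterns keep the message history within the configured token budget while preserving the most important information. The budget covers message content only: the summary and any system prompt need their own headroom, and a single message longer than the budget does not fit in the window at all.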
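A short worked example makes the window arithmetic concrete. It assumes a hypothetical WhitespaceTokenizer that counts one token per word, which is far cruder than a real tokenizer but keeps the numbers easy to follow; ConversationHistory is restated so the snippet runs on its own.

```python
class WhitespaceTokenizer:
    """Hypothetical tokenizer: one token per whitespace-separated word."""
    def encode(self, text):
        return text.split()


class ConversationHistory:
    """Restated from the article so the example is self-contained."""
    def __init__(self, max_tokens, tokenizer):
        self.max_tokens = max_tokens
        self.tokenizer = tokenizer
        self.messages = []

    def add_message(self, role, content):
        self.messages.append({
            'role': role,
            'content': content,
            'tokens': len(self.tokenizer.encode(content)),
        })

    def get_context_window(self):
        total_tokens = 0
        window = []
        # Walk backwards from the newest message until the budget is used up
        for message in reversed(self.messages):
            if total_tokens + message['tokens'] > self.max_tokens:
                break
            window.insert(0, message)
            total_tokens += message['tokens']
        return window

    def get_formatted_history(self):
        return [{'role': m['role'], 'content': m['content']}
                for m in self.get_context_window()]


history = ConversationHistory(max_tokens=8, tokenizer=WhitespaceTokenizer())
history.add_message('user', 'hello there')             # 2 tokens
history.add_message('assistant', 'hi how can I help')  # 5 tokens
history.add_message('user', 'explain caching')         # 2 tokens

window = history.get_formatted_history()
print([m['content'] for m in window])
```

The two newest messages use 7 of the 8 available tokens, so the oldest message (2 more tokens) no longer fits and is left out.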
PART EIGHT: TEMPLATE MANAGEMENT
The Template Method Pattern for Prompt Engineering
Different tasks require different prompt structures. The Template Method pattern defines the skeleton of prompt construction while allowing customization of specific steps.
class PromptTemplate:
    """Abstract template for prompt construction."""
    def build_prompt(self, user_input, context=None):
        """Build complete prompt from components."""
        parts = []
        # Add system message if present
        system_msg = self.get_system_message()
        if system_msg:
            parts.append(system_msg)
        # Add context if provided
        if context:
            context_msg = self.format_context(context)
            parts.append(context_msg)
        # Add user input
        user_msg = self.format_user_input(user_input)
        parts.append(user_msg)
        # Add any task-specific instructions
        instructions = self.get_task_instructions()
        if instructions:
            parts.append(instructions)
        return '\n\n'.join(parts)
    def get_system_message(self):
        """Override to provide system message."""
        return None
    def format_context(self, context):
        """Override to customize context formatting."""
        return f"Context:\n{context}"
    def format_user_input(self, user_input):
        """Override to customize user input formatting."""
        return f"User: {user_input}"
    def get_task_instructions(self):
        """Override to add task-specific instructions."""
        return None
class RAGPromptTemplate(PromptTemplate):
    """Template for RAG-based question answering."""
    def get_system_message(self):
        """System message for RAG."""
        return ("You are a helpful assistant. Answer questions based on the provided context. "
                "If the context doesn't contain relevant information, say so.")
    def format_context(self, context):
        """Format retrieved context."""
        if isinstance(context, list):
            formatted_chunks = []
            for i, chunk in enumerate(context, 1):
                formatted_chunks.append(f"[{i}] {chunk['text']}")
            return "Retrieved Information:\n" + '\n\n'.join(formatted_chunks)
        return f"Context:\n{context}"
    def get_task_instructions(self):
        """Instructions for RAG task."""
        return "Please provide a comprehensive answer based on the retrieved information above."
class CodeGenerationTemplate(PromptTemplate):
    """Template for code generation tasks."""
    def __init__(self, language='python'):
        """Initialize with target programming language."""
        self.language = language
    def get_system_message(self):
        """System message for code generation."""
        return f"You are an expert {self.language} programmer. Generate clean, well-documented code."
    def format_user_input(self, user_input):
        """Format code generation request."""
        return f"Task: {user_input}\n\nGenerate {self.language} code for this task."
    def get_task_instructions(self):
        """Instructions for code generation."""
        return ("Include comments explaining the code. "
                "Follow best practices and coding standards.")
class SummarizationTemplate(PromptTemplate):
    """Template for text summarization."""
    def __init__(self, max_length=None):
        """Initialize with optional length constraint."""
        self.max_length = max_length
    def get_system_message(self):
        """System message for summarization."""
        return "You are a skilled summarizer. Create concise, accurate summaries."
    def format_context(self, context):
        """Format text to summarize."""
        return f"Text to summarize:\n\n{context}"
    def get_task_instructions(self):
        """Instructions for summarization."""
        if self.max_length:
            return f"Provide a summary in no more than {self.max_length} words."
        return "Provide a concise summary of the main points."
Using templates:
# RAG question answering
rag_template = RAGPromptTemplate()
prompt = rag_template.build_prompt(
user_input="What are the main benefits of exercise?",
context=retrieved_chunks
)
# Code generation
code_template = CodeGenerationTemplate(language='python')
prompt = code_template.build_prompt(
user_input="Create a function to calculate fibonacci numbers"
)
# Summarization
summary_template = SummarizationTemplate(max_length=100)
prompt = summary_template.build_prompt(
user_input="Summarize this article",
context=article_text
)
Templates ensure consistent prompt structure while allowing customization for different tasks.
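Adding a new task then means overriding only the hooks that differ. The sketch below is illustrative: TranslationTemplate is a hypothetical subclass that does not appear in the article, and the base class is restated in compact form so the snippet runs on its own.

```python
class PromptTemplate:
    """Restated from the article so the example is self-contained."""
    def build_prompt(self, user_input, context=None):
        parts = []
        system_msg = self.get_system_message()
        if system_msg:
            parts.append(system_msg)
        if context:
            parts.append(self.format_context(context))
        parts.append(self.format_user_input(user_input))
        instructions = self.get_task_instructions()
        if instructions:
            parts.append(instructions)
        return '\n\n'.join(parts)

    def get_system_message(self):
        return None

    def format_context(self, context):
        return f"Context:\n{context}"

    def format_user_input(self, user_input):
        return f"User: {user_input}"

    def get_task_instructions(self):
        return None


class TranslationTemplate(PromptTemplate):
    """Hypothetical template: only the task-specific hooks are overridden."""
    def __init__(self, target_language):
        self.target_language = target_language

    def get_system_message(self):
        return f"You are a professional translator. Translate into {self.target_language}."

    def format_user_input(self, user_input):
        return f"Text: {user_input}"

    def get_task_instructions(self):
        return "Return only the translation."


prompt = TranslationTemplate('German').build_prompt('Good morning')
print(prompt)
```

The order of the parts (system message, context, user input, instructions) stays fixed in build_prompt, which is the point of the Template Method pattern.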
PART NINE: STREAMING AND REAL-TIME RESPONSES
The Iterator Pattern for Response Streaming
Streaming responses improves user experience by showing results as they are generated. The Iterator pattern provides a uniform interface for consuming streamed tokens.
class ResponseIterator:
    """Iterator for streaming LLM responses."""
    def __init__(self, stream_generator):
        """Initialize with underlying stream generator."""
        self.generator = stream_generator
        self.accumulated = ""
    def __iter__(self):
        """Return iterator object."""
        return self
    def __next__(self):
        """Get next token from stream."""
        # StopIteration from the underlying generator propagates unchanged
        token = next(self.generator)
        self.accumulated += token
        return token
    def get_accumulated(self):
        """Get all accumulated tokens so far."""
        return self.accumulated
import json
class StreamingResponseHandler:
    """Handles streaming responses with callbacks."""
    def __init__(self):
        """Initialize handler."""
        self.callbacks = []
    def add_callback(self, callback):
        """Add callback to be called for each token."""
        self.callbacks.append(callback)
    def handle_stream(self, response_iterator):
        """Process stream and invoke callbacks."""
        for token in response_iterator:
            for callback in self.callbacks:
                callback(token)
        # Call completion callbacks
        final_response = response_iterator.get_accumulated()
        for callback in self.callbacks:
            if hasattr(callback, 'on_complete'):
                callback.on_complete(final_response)
class ConsoleStreamCallback:
    """Callback that prints tokens to console."""
    def __call__(self, token):
        """Print token without newline."""
        print(token, end='', flush=True)
    def on_complete(self, final_response):
        """Print newline on completion."""
        print()
class WebSocketStreamCallback:
    """Callback that sends tokens over WebSocket."""
    def __init__(self, websocket):
        """Initialize with WebSocket connection."""
        self.websocket = websocket
    def __call__(self, token):
        """Send token over WebSocket."""
        self.websocket.send(json.dumps({'type': 'token', 'content': token}))
    def on_complete(self, final_response):
        """Send completion message."""
        self.websocket.send(json.dumps({'type': 'complete', 'content': final_response}))
Using streaming with callbacks:
# Create streaming handler
handler = StreamingResponseHandler()
# Add console output callback
handler.add_callback(ConsoleStreamCallback())
# Generate streaming response
stream = model_adapter.generate_stream("Tell me a story", max_tokens=500)
iterator = ResponseIterator(stream)
# Process stream
handler.handle_stream(iterator)
This pattern makes it easy to handle streaming responses in different contexts like console applications, web servers, or real-time chat interfaces.
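The same machinery can be tried without a model by feeding it a plain generator. In the sketch below, fake_token_stream is a hypothetical stand-in for model_adapter.generate_stream and CollectingCallback is a hypothetical callback that stores tokens instead of printing them; the iterator and handler are restated in compact form so the snippet runs on its own.

```python
def fake_token_stream(text):
    """Hypothetical stand-in for a model stream: yields one word per token."""
    for word in text.split(' '):
        yield word + ' '


class ResponseIterator:
    """Wraps a token generator and accumulates the full response."""
    def __init__(self, stream_generator):
        self.generator = stream_generator
        self.accumulated = ""

    def __iter__(self):
        return self

    def __next__(self):
        token = next(self.generator)  # StopIteration ends the loop
        self.accumulated += token
        return token

    def get_accumulated(self):
        return self.accumulated


class StreamingResponseHandler:
    """Invokes every callback per token, then on_complete where defined."""
    def __init__(self):
        self.callbacks = []

    def add_callback(self, callback):
        self.callbacks.append(callback)

    def handle_stream(self, response_iterator):
        for token in response_iterator:
            for callback in self.callbacks:
                callback(token)
        final_response = response_iterator.get_accumulated()
        for callback in self.callbacks:
            if hasattr(callback, 'on_complete'):
                callback.on_complete(final_response)


class CollectingCallback:
    """Hypothetical callback that records tokens for later inspection."""
    def __init__(self):
        self.tokens = []
        self.final = None

    def __call__(self, token):
        self.tokens.append(token)

    def on_complete(self, final_response):
        self.final = final_response


collector = CollectingCallback()
handler = StreamingResponseHandler()
handler.add_callback(collector)
handler.handle_stream(ResponseIterator(fake_token_stream('Once upon a time')))
print(collector.tokens)
print(repr(collector.final))
```

A collecting callback like this is also a convenient way to unit test streaming code paths.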
PART TEN: MULTIMODAL INTEGRATION
The Composite Pattern for Multimodal Inputs
Modern applications need to handle text, images, audio, and video. The Composite pattern allows treating individual modalities and combinations uniformly.
class ModalityInput:
"""Abstract base for different input modalities."""
def get_type(self):
"""Return modality type."""
raise NotImplementedError
def get_content(self):
"""Return content in appropriate format."""
raise NotImplementedError
def to_model_format(self, model_type):
"""Convert to format expected by specific model."""
raise NotImplementedError
class TextInput(ModalityInput):
    """Text input modality."""

    def __init__(self, text):
        """Initialize with text content."""
        self.text = text

    def get_type(self):
        """Return text type."""
        return 'text'

    def get_content(self):
        """Return text content."""
        return self.text

    def to_model_format(self, model_type):
        """Convert to model-specific format."""
        if model_type == 'openai':
            return {'type': 'text', 'text': self.text}
        # HuggingFace and any other provider receive the raw string
        return self.text

class ImageInput(ModalityInput):
    """Image input modality."""

    def __init__(self, image_path=None, image_data=None):
        """Initialize with image path or data."""
        if image_path is None and image_data is None:
            raise ValueError("Provide either image_path or image_data")
        self.image_path = image_path
        self.image_data = image_data

    def get_type(self):
        """Return image type."""
        return 'image'

    def get_content(self):
        """Return image data, loading it from disk if only a path was given."""
        if self.image_data is not None:
            return self.image_data
        from PIL import Image
        return Image.open(self.image_path)

    def to_model_format(self, model_type):
        """Convert to model-specific format."""
        if model_type == 'openai':
            import base64
            from io import BytesIO
            # Assumes the content is a PIL image; encode it as a base64 PNG data URL
            img = self.get_content()
            buffered = BytesIO()
            img.save(buffered, format='PNG')
            img_str = base64.b64encode(buffered.getvalue()).decode()
            return {
                'type': 'image_url',
                'image_url': {'url': f'data:image/png;base64,{img_str}'}
            }
        # HuggingFace and any other provider receive the image object directly
        return self.get_content()

class MultimodalInput:
    """Composite input containing multiple modalities."""

    def __init__(self):
        """Initialize empty multimodal input."""
        self.inputs = []

    def add_input(self, modality_input):
        """Add input of any modality."""
        self.inputs.append(modality_input)

    def get_type(self):
        """Return composite type."""
        return 'multimodal'

    def get_content(self):
        """Return all inputs."""
        return self.inputs

    def to_model_format(self, model_type):
        """Convert all inputs to model format."""
        return [inp.to_model_format(model_type) for inp in self.inputs]

class MultimodalAdapter:
    """Adapter for vision-language models."""

    def __init__(self, model_dict):
        """Initialize with VLM model."""
        self.model = model_dict['model']
        self.processor = model_dict.get('processor')
        self.provider = model_dict.get('provider', 'unknown')
        # 'model_name' is an assumed optional key. The default is the preview
        # model named in the original example, which OpenAI has since
        # deprecated; pass a current vision-capable model name instead.
        self.model_name = model_dict.get('model_name', 'gpt-4-vision-preview')

    def generate(self, multimodal_input, **kwargs):
        """Generate response from multimodal input."""
        if self.provider == 'openai':
            return self._generate_openai(multimodal_input, **kwargs)
        elif self.provider == 'huggingface':
            return self._generate_huggingface(multimodal_input, **kwargs)
        # Fail loudly instead of silently returning None
        raise ValueError(f"Unsupported provider: {self.provider}")

    def _generate_openai(self, multimodal_input, **kwargs):
        """Generate using OpenAI vision API."""
        messages = [{
            'role': 'user',
            'content': multimodal_input.to_model_format('openai')
        }]
        response = self.model.chat.completions.create(
            model=self.model_name,
            messages=messages,
            max_tokens=kwargs.get('max_tokens', 300)
        )
        return response.choices[0].message.content

    def _generate_huggingface(self, multimodal_input, **kwargs):
        """Generate using HuggingFace VLM."""
        if not self.processor:
            # Raise rather than return an error string that looks like model output
            raise RuntimeError("VLM processing not fully configured: no processor")
        inputs = multimodal_input.get_content()
        # Separate text and images
        text_parts = [inp.get_content() for inp in inputs if inp.get_type() == 'text']
        image_parts = [inp.get_content() for inp in inputs if inp.get_type() == 'image']
        # Process inputs
        text = ' '.join(text_parts)
        processed = self.processor(
            text=text,
            images=image_parts if image_parts else None,
            return_tensors='pt'
        )
        outputs = self.model.generate(**processed, max_new_tokens=kwargs.get('max_tokens', 300))
        return self.processor.decode(outputs[0], skip_special_tokens=True)

Using multimodal inputs:
# Create multimodal input
mm_input = MultimodalInput()
mm_input.add_input(TextInput("What is in this image?"))
mm_input.add_input(ImageInput(image_path="photo.jpg"))
# Generate response
response = vlm_adapter.generate(mm_input, max_tokens=200)
The composite pattern makes it easy to work with any combination of modalities through a uniform interface.
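The MultimodalInput shown above holds a flat list of leaf inputs. If groups need to be nested, for example a reusable "caption" group placed inside a larger prompt, one option is to give the composite the same base class as the leaves and have every node return a flat list. The following is a minimal sketch under that assumption; Part, TextPart, and Group are hypothetical names introduced for illustration, and the dictionary shape simply mirrors the 'openai' text branch used earlier.

```python
class Part:
    """Hypothetical common base for leaves and composites."""

    def to_model_format(self, model_type):
        """Return a flat list of model-specific content parts."""
        raise NotImplementedError


class TextPart(Part):
    """Leaf node holding plain text."""

    def __init__(self, text):
        self.text = text

    def to_model_format(self, model_type):
        if model_type == 'openai':
            return [{'type': 'text', 'text': self.text}]
        return [self.text]


class Group(Part):
    """Composite node: may contain leaves or other groups."""

    def __init__(self, *children):
        self.children = list(children)

    def add(self, child):
        self.children.append(child)
        return self  # allow chaining

    def to_model_format(self, model_type):
        # Every child returns a list, so nesting collapses into one flat list.
        flat = []
        for child in self.children:
            flat.extend(child.to_model_format(model_type))
        return flat


caption = Group(TextPart("Figure 1:"), TextPart("a cat"))
prompt = Group(TextPart("Describe the figure."), caption)
parts = prompt.to_model_format('openai')
```

Because leaves and groups share one method with one return type, the calling code never needs to check whether it holds a single input or a tree of them.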
PART ELEVEN: DEPLOYMENT PATTERNS
The Facade Pattern for Deployment Abstraction
Deploying LLM applications to different environments requires different configurations and setups. The Facade pattern provides a simple interface that hides deployment complexity.
class DeploymentFacade:
    """Simplified interface for deploying LLM applications."""

    def __init__(self, config):
        """Initialize deployment with configuration."""
        self.config = config

    def deploy(self):
        """Deploy application to configured environment."""
        raise NotImplementedError

    def health_check(self):
        """Check deployment health."""
        raise NotImplementedError

    def scale(self, instances):
        """Scale deployment to specified instances."""
        raise NotImplementedError

class LocalDeployment(DeploymentFacade):
    """Deploy LLM application locally."""

    def deploy(self):
        """Start local server. Blocks until the server is stopped."""
        from flask import Flask, request, jsonify
        app = Flask(__name__)
        # Initialize model
        model_config = self.config.get('model', {})
        registry = ModelRegistry()
        registry.register_factory('huggingface', HuggingFaceModelFactory())
        model_dict = registry.create_model(
            model_config.get('provider', 'huggingface'),
            model_config
        )
        adapter = HuggingFaceAdapter(model_dict)

        @app.route('/generate', methods=['POST'])
        def generate():
            # Fall back to an empty dict if the body is missing or not valid JSON
            data = request.get_json(silent=True) or {}
            prompt = data.get('prompt', '')
            response = adapter.generate(prompt, **data.get('params', {}))
            return jsonify({'response': response})

        @app.route('/health', methods=['GET'])
        def health():
            return jsonify({'status': 'healthy'})

        port = self.config.get('port', 5000)
        app.run(host='0.0.0.0', port=port)

    def health_check(self):
        """Check if local server is running."""
        import requests
        port = self.config.get('port', 5000)
        try:
            # A timeout keeps the check from hanging if the server is unresponsive
            response = requests.get(f"http://localhost:{port}/health", timeout=5)
            return response.status_code == 200
        except requests.RequestException:
            return False

class DockerDeployment(DeploymentFacade):
    """Deploy LLM application using Docker."""

    def deploy(self):
        """Build and run Docker container."""
        import subprocess
        # Generate Dockerfile
        dockerfile_content = self._generate_dockerfile()
        with open('Dockerfile', 'w') as f:
            f.write(dockerfile_content)
        # Build image
        image_name = self.config.get('image_name', 'llm-app')
        subprocess.run(['docker', 'build', '-t', image_name, '.'], check=True)
        # Run container
        port = self.config.get('port', 5000)
        subprocess.run([
            'docker', 'run', '-d',
            '-p', f'{port}:{port}',
            '--name', 'llm-app-container',
            image_name
        ], check=True)

    def _generate_dockerfile(self):
        """Generate Dockerfile content."""
        # Expose the configured port so it matches the -p mapping used in deploy()
        port = self.config.get('port', 5000)
        return (
            "FROM python:3.10-slim\n"
            "WORKDIR /app\n"
            "COPY requirements.txt .\n"
            "RUN pip install --no-cache-dir -r requirements.txt\n"
            "COPY . .\n"
            f"EXPOSE {port}\n"
            'CMD ["python", "app.py"]\n'
        )

    def health_check(self):
        """Check Docker container health."""
        import subprocess
        result = subprocess.run(
            ['docker', 'ps', '--filter', 'name=llm-app-container', '--format', '{{.Status}}'],
            capture_output=True,
            text=True
        )
        return 'Up' in result.stdout

class CloudDeployment(DeploymentFacade):
    """Deploy LLM application to cloud platform."""

    def __init__(self, config):
        """Initialize cloud deployment."""
        super().__init__(config)
        self.platform = config.get('platform', 'aws')

    def deploy(self):
        """Deploy to cloud platform."""
        if self.platform == 'aws':
            self._deploy_aws()
        elif self.platform == 'gcp':
            self._deploy_gcp()
        elif self.platform == 'azure':
            self._deploy_azure()
        else:
            # Fail loudly instead of silently doing nothing
            raise ValueError(f"Unsupported platform: {self.platform}")

    def _deploy_aws(self):
        """Deploy to AWS using ECS or Lambda."""
        # This would use boto3 to deploy to AWS
        print("Deploying to AWS...")
        # Implementation would create ECR repository, push image, create ECS service, etc.

    def _deploy_gcp(self):
        """Deploy to Google Cloud Platform."""
        print("Deploying to GCP...")
        # Implementation would use GCP APIs

    def _deploy_azure(self):
        """Deploy to Microsoft Azure."""
        print("Deploying to Azure...")
        # Implementation would use Azure SDKs

Using the deployment facade:
# Local deployment
local_config = {
    'model': {
        'provider': 'huggingface',
        'model_name': 'meta-llama/Llama-2-7b-hf',
        'device': 'cuda:0'
    },
    'port': 5000
}
local_deploy = LocalDeployment(local_config)
local_deploy.deploy()  # blocks while the Flask server is running

# Docker deployment (run separately; the call above does not return until the server stops)
docker_config = {
    'image_name': 'my-llm-app',
    'port': 8080
}
docker_deploy = DockerDeployment(docker_config)
docker_deploy.deploy()
The facade pattern hides the complexity of different deployment targets behind a simple, consistent interface.
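Since every deployment class exposes the same interface, the choice of target can itself be driven by configuration. The sketch below shows one way this might look, using a small registry keyed by a 'target' config entry. The 'target' key, DEPLOYMENT_TARGETS, create_deployment, and the Fake* classes are all hypothetical names introduced for this example; the stand-in classes only return strings so the snippet runs without Flask or Docker.

```python
class Deployment:
    """Stand-in for the facade interface (assumed for this sketch)."""

    def __init__(self, config):
        self.config = config

    def deploy(self):
        raise NotImplementedError


class FakeLocal(Deployment):
    def deploy(self):
        return f"local server on port {self.config.get('port', 5000)}"


class FakeDocker(Deployment):
    def deploy(self):
        return f"container from image {self.config.get('image_name', 'llm-app')}"


# Hypothetical registry mapping a 'target' config key to a facade class.
DEPLOYMENT_TARGETS = {
    'local': FakeLocal,
    'docker': FakeDocker,
}


def create_deployment(config):
    """Pick the facade subclass named by config['target']."""
    target = config.get('target', 'local')
    try:
        deployment_cls = DEPLOYMENT_TARGETS[target]
    except KeyError:
        raise ValueError(f"Unknown deployment target: {target!r}") from None
    return deployment_cls(config)


result = create_deployment({'target': 'docker', 'image_name': 'my-llm-app'}).deploy()
```

In a real application the registry would map to the LocalDeployment, DockerDeployment, and CloudDeployment classes, so adding a new target means registering one more class rather than editing call sites.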
CONCLUSION
Building robust LLM applications requires careful architectural design. The patterns presented in this article provide a foundation for creating flexible, maintainable systems that can adapt to the rapidly evolving LLM landscape. The Strategy pattern enables swapping GPU backends and model providers. The Factory and Registry patterns centralize object creation. The Adapter pattern provides unified interfaces across heterogeneous systems. The Pipeline pattern chains document processing stages. The Repository pattern abstracts vector database operations. The Observer pattern enables reactive document processing. The Proxy pattern adds caching transparently. The Template Method pattern standardizes prompt construction. The Composite pattern gives single and combined modalities one interface. The Facade pattern hides deployment details behind a small set of operations.
These patterns reinforce one another. The complete running example demonstrates how they integrate into a production-ready framework supporting multiple GPU architectures, model providers, document types, vector databases, and deployment options. The framework handles real-world concerns like context window management, response streaming, caching, and conversation history.
The key insight is that classical design patterns, properly adapted, solve the unique challenges of LLM application development. By applying these patterns systematically, developers can build applications that are not only functional today but adaptable to tomorrow's innovations. As new models, frameworks, and techniques emerge, well-architected systems can incorporate them by implementing new strategies, factories, or adapters without disrupting existing functionality.
The future of LLM applications lies not just in more powerful models, but in better software architecture that makes these models accessible, reliable, and maintainable. The patterns and framework presented here provide a starting point for that journey.