INTRODUCTION
This tutorial presents a complete guide to building a sophisticated local Large Language Model (LLM) system using Capability-Centric Architecture (CCA). The system we will construct provides an OpenAI-compatible API for serving LLM queries, complete with user registration, authentication, conversation history, and multi-GPU support. The architecture follows CCA principles to ensure clean separation of concerns, testability, and maintainability.
The Capability-Centric Architecture extends concepts from Domain-Driven Design, Hexagonal Architecture, and Clean Architecture. In CCA, we structure systems as collections of capabilities, where each capability represents a cohesive unit of functionality. Every capability consists of three fundamental layers: the Essence contains pure domain logic free from infrastructure concerns, the Realization provides infrastructure-specific implementations, and the Adaptation layer handles external interfaces. These layers communicate through well-defined Contracts, and the system manages evolution through Evolution Envelopes.
Our system will demonstrate how CCA principles apply to a real-world scenario involving containerization, distributed systems, GPU acceleration, authentication, and persistent storage. We will see how the architecture naturally accommodates different GPU backends including NVIDIA CUDA, AMD ROCm, Intel GPUs, and Apple Metal Performance Shaders without polluting our domain logic with hardware-specific concerns.
SYSTEM OVERVIEW
The system comprises several major capabilities working together to provide a complete LLM service. The LLM Inference Capability handles the core responsibility of processing natural language queries and generating responses. The User Management Capability manages user registration, authentication, and API token lifecycle. The Conversation History Capability maintains context across multiple interactions. The Authentication Capability validates API tokens and ensures secure access. The Email Confirmation Capability handles user verification through email workflows.
Each capability operates independently yet collaborates through well-defined contracts. The system deploys within Kubernetes, with the LLM inference running in containers that can leverage available GPU resources. A separate container handles user management, registration, and the web interface. An ingress controller routes external traffic to appropriate services.
The client applications interact with the system through two primary interfaces: an OpenAI-compatible REST API for LLM queries and a web interface for user registration and token management. The API requires authentication via tokens that users obtain through the registration process.
CAPABILITY-CENTRIC ARCHITECTURE FUNDAMENTALS
Before diving into implementation details, we must understand how CCA structures each capability. The Essence layer contains the pure domain logic, representing what the capability does without any concern for how it does it. This layer contains domain entities, value objects, business rules, and domain services. The Essence has no dependencies on frameworks, databases, or external systems.
The Realization layer implements the infrastructure-specific concerns. This is where we find database access, GPU integration, email sending, and other technical implementations. The Realization depends on the Essence but the Essence never depends on the Realization. This dependency inversion ensures that business logic remains pure and testable.
The Adaptation layer provides interfaces to the outside world. This includes REST API endpoints, message queue consumers, scheduled tasks, and other entry points. The Adaptation layer depends on both Essence and Realization, orchestrating their collaboration to fulfill external requests.
Contracts define the interfaces between capabilities and between layers within a capability. These contracts are expressed as abstract interfaces in the Essence layer, with concrete implementations in the Realization layer. This allows the Essence to define what it needs without knowing how those needs will be fulfilled.
Evolution Envelopes manage how capabilities change over time. They provide versioning strategies, backward compatibility mechanisms, and migration paths. In our system, we will see Evolution Envelopes in action when we version our API endpoints and manage database schema migrations.
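To make this concrete, an Evolution Envelope for the API surface can be as small as mounting versioned routers side by side, keeping the published v1 contract frozen while a v2 evolves. The sketch below is illustrative only and assumes FastAPI, which we use later in this tutorial:

from fastapi import FastAPI, APIRouter

app = FastAPI()

# v1 stays frozen once published; clients pin themselves to /v1
v1 = APIRouter(prefix="/v1")

@v1.get("/models")
async def list_models_v1():
    return {"object": "list", "data": []}

# v2 may change response shapes without breaking v1 clients
v2 = APIRouter(prefix="/v2")

@v2.get("/models")
async def list_models_v2():
    return {"models": [], "schema_version": 2}

app.include_router(v1)
app.include_router(v2)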
Efficiency Gradients allow us to optimize critical paths while maintaining clean architecture for non-critical paths. In our LLM system, the inference path represents a critical performance path where we might allow tighter coupling to GPU libraries, while the user registration path can afford more abstraction layers.
THE LLM INFERENCE CAPABILITY
The LLM Inference Capability represents the heart of our system. Its Essence defines what it means to process a language model query without concerning itself with specific model formats, GPU types, or API protocols.
Essence Layer: Domain Model
The Essence defines several key domain entities. The InferenceRequest represents a query to the language model, containing the prompt text, generation parameters like temperature and maximum tokens, and optional conversation context. The InferenceResponse contains the generated text, token usage statistics, and metadata about the generation process.
Here is how we define these core domain entities:
class InferenceRequest:
def __init__(self, prompt, temperature=0.7, max_tokens=512,
conversation_id=None, system_prompt=None):
self.prompt = prompt
self.temperature = temperature
self.max_tokens = max_tokens
self.conversation_id = conversation_id
self.system_prompt = system_prompt
self.timestamp = None
def validate(self):
if not self.prompt or len(self.prompt.strip()) == 0:
raise ValueError("Prompt cannot be empty")
if self.temperature < 0 or self.temperature > 2:
raise ValueError("Temperature must be between 0 and 2")
if self.max_tokens < 1 or self.max_tokens > 4096:
raise ValueError("Max tokens must be between 1 and 4096")
return True
class InferenceResponse:
def __init__(self, generated_text, prompt_tokens,
completion_tokens, model_name):
self.generated_text = generated_text
self.prompt_tokens = prompt_tokens
self.completion_tokens = completion_tokens
self.total_tokens = prompt_tokens + completion_tokens
self.model_name = model_name
self.timestamp = None
The InferenceRequest class encapsulates all parameters needed for a generation request. The validate method enforces business rules about acceptable parameter ranges. Notice that this class has no knowledge of HTTP, JSON, databases, or GPU libraries. It represents pure domain concepts.
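A short usage sketch shows these validation rules in action (assuming the classes above are in scope):

request = InferenceRequest(
    prompt="Summarize CCA in one sentence.",
    temperature=0.3,
    max_tokens=64
)
request.validate()  # passes

bad = InferenceRequest(prompt="Hi", temperature=5.0)
try:
    bad.validate()
except ValueError as error:
    print(error)  # Temperature must be between 0 and 2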
The domain service that orchestrates inference is defined as an abstract interface in the Essence layer:
from abc import ABC, abstractmethod
class InferenceService(ABC):
@abstractmethod
def generate(self, request):
"""Generate a response for the given inference request"""
pass
@abstractmethod
def get_model_info(self):
"""Return information about the loaded model"""
pass
@abstractmethod
def health_check(self):
"""Check if the inference service is healthy and ready"""
pass
This interface defines the contract that any concrete implementation must fulfill. The Essence layer can use this interface without knowing whether the implementation uses a local GPU, a remote API, or a mock for testing.
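For example, a trivial echo implementation satisfies the same contract and is enough to exercise the Adaptation layer in tests. This mock is our own illustration, not part of the production system:

class EchoInferenceService(InferenceService):
    """Test double that echoes the prompt back."""
    def generate(self, request):
        request.validate()
        return InferenceResponse(
            generated_text=f"echo: {request.prompt}",
            prompt_tokens=len(request.prompt.split()),
            completion_tokens=2,
            model_name="echo-mock"
        )
    def get_model_info(self):
        return {"model_path": "echo-mock", "backend": "none"}
    def health_check(self):
        return {"status": "healthy", "backend": "none"}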
Realization Layer: GPU-Accelerated Inference
The Realization layer provides concrete implementations of the InferenceService contract. We need to support multiple GPU backends, which presents an interesting architectural challenge. Rather than creating a single monolithic implementation with conditional logic for each GPU type, we create separate Realization implementations for each backend, all conforming to the same contract.
First, we need a GPU detection and selection mechanism:
import platform
import subprocess
import os
class GPUDetector:
def __init__(self):
self.available_backends = []
self._detect_backends()
def _detect_backends(self):
"""Detect all available GPU backends on this system"""
if self._check_cuda():
self.available_backends.append('cuda')
if self._check_rocm():
self.available_backends.append('rocm')
if self._check_metal():
self.available_backends.append('metal')
if self._check_intel():
self.available_backends.append('intel')
if len(self.available_backends) == 0:
self.available_backends.append('cpu')
def _check_cuda(self):
"""Check if NVIDIA CUDA is available"""
try:
result = subprocess.run(['nvidia-smi'],
capture_output=True,
timeout=5)
return result.returncode == 0
except Exception:
return False
def _check_rocm(self):
"""Check if AMD ROCm is available"""
try:
result = subprocess.run(['rocm-smi'],
capture_output=True,
timeout=5)
return result.returncode == 0
except Exception:
return False
def _check_metal(self):
"""Check if Apple Metal is available"""
return platform.system() == 'Darwin'
def _check_intel(self):
"""Check if Intel GPU is available"""
if platform.system() == 'Linux':
return os.path.exists('/dev/dri')
return False
def get_preferred_backend(self):
"""Return the preferred backend based on availability"""
preference_order = ['cuda', 'rocm', 'metal', 'intel', 'cpu']
for backend in preference_order:
if backend in self.available_backends:
return backend
return 'cpu'
The GPUDetector class examines the system to determine which GPU backends are available. It checks for NVIDIA GPUs by attempting to run nvidia-smi, for AMD GPUs by checking for rocm-smi, for Apple Metal by detecting macOS, and for Intel GPUs by checking for the DRI device on Linux.
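Using the detector is a one-liner; on a machine without a discrete GPU it falls back to CPU:

detector = GPUDetector()
print(detector.available_backends)       # e.g. ['cuda'] or ['cpu']
print(detector.get_preferred_backend())  # e.g. 'cuda'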
Now we implement the actual inference service using llama-cpp-python:
import time
from datetime import datetime
# The InferenceService contract defined in the Essence layer; the module
# path assumes the capability layout used in the entry points below
from inference_capability.essence.services import InferenceService
class LocalInferenceService(InferenceService):
def __init__(self, model_path, gpu_backend=None):
self.model_path = model_path
self.gpu_backend = gpu_backend or GPUDetector().get_preferred_backend()
self.model = None
self.model_info = {}
self._load_model()
def _load_model(self):
"""Load the language model with appropriate GPU backend"""
try:
from llama_cpp import Llama
n_gpu_layers = 0
if self.gpu_backend != 'cpu':
n_gpu_layers = -1 # Offload all layers to GPU
self.model = Llama(
model_path=self.model_path,
n_gpu_layers=n_gpu_layers,
n_ctx=2048,
n_batch=512,
verbose=False
)
self.model_info = {
'model_path': self.model_path,
'backend': self.gpu_backend,
'context_length': 2048,
'loaded_at': datetime.utcnow().isoformat()
}
except Exception as e:
raise RuntimeError(f"Failed to load model: {str(e)}")
def generate(self, request):
"""Generate text based on the inference request"""
if not self.model:
raise RuntimeError("Model not loaded")
request.validate()
prompt = request.prompt
if request.system_prompt:
prompt = f"System: {request.system_prompt}\n\nUser: {prompt}"
start_time = time.time()
result = self.model(
prompt,
max_tokens=request.max_tokens,
temperature=request.temperature,
stop=["User:", "\n\n"],
echo=False
)
generated_text = result['choices'][0]['text']
prompt_tokens = result['usage']['prompt_tokens']
completion_tokens = result['usage']['completion_tokens']
response = InferenceResponse(
generated_text=generated_text.strip(),
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
model_name=self.model_info.get('model_path', 'unknown')
)
response.timestamp = datetime.utcnow()
return response
def get_model_info(self):
"""Return information about the loaded model"""
return self.model_info.copy()
def health_check(self):
"""Check if the inference service is healthy and ready"""
if not self.model:
return {'status': 'unhealthy', 'reason': 'Model not loaded'}
try:
test_request = InferenceRequest(
prompt="Test",
max_tokens=5
)
self.generate(test_request)
return {'status': 'healthy', 'backend': self.gpu_backend}
except Exception as e:
return {'status': 'unhealthy', 'reason': str(e)}
The LocalInferenceService implements the InferenceService contract. It loads a GGUF format model using llama-cpp-python, whose GPU backend (CUDA, ROCm, Metal, and so on) is selected when the library is built, as the Dockerfile later shows. The generate method takes an InferenceRequest from the Essence layer, validates it, performs the actual inference, and returns an InferenceResponse.
Notice how the implementation details of GPU acceleration are hidden behind the InferenceService interface. The Essence layer never needs to know about llama-cpp-python, CUDA, or any other infrastructure concern.
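Constructing the service directly makes the backend choice explicit when needed; the model path below is a placeholder:

# Auto-detect the best available backend
service = LocalInferenceService("/models/model.gguf")

# Or pin the backend explicitly, for example to force CPU while debugging
cpu_service = LocalInferenceService("/models/model.gguf", gpu_backend="cpu")
print(service.get_model_info()["backend"])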
Adaptation Layer: REST API
The Adaptation layer exposes the inference capability through a REST API. We use FastAPI to create an OpenAI-compatible endpoint:
from fastapi import FastAPI, HTTPException, Depends, Header, Request
from pydantic import BaseModel
from typing import Optional, List
import os
import uuid
from datetime import datetime
app = FastAPI(title="Local LLM API", version="1.0.0")
# Pydantic models for API request/response
class ChatMessage(BaseModel):
role: str
content: str
class ChatCompletionRequest(BaseModel):
model: str = "local-llm"
messages: List[ChatMessage]
temperature: Optional[float] = 0.7
max_tokens: Optional[int] = 512
stream: Optional[bool] = False
class ChatCompletionResponse(BaseModel):
id: str
object: str = "chat.completion"
created: int
model: str
choices: List[dict]
usage: dict
# Dependency injection for authentication
async def verify_api_token(request: Request, authorization: str = Header(None)):
    """Verify the API token from the Authorization header"""
    if not authorization:
        raise HTTPException(status_code=401, detail="Missing authorization header")
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid authorization format")
    token = authorization[7:]  # Remove "Bearer " prefix
    # The authentication service is attached to app.state by the main
    # entry point (see main_inference.py below)
    auth_service = request.app.state.auth_service
    user = auth_service.verify_token(token)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid or expired token")
    return user
# Global inference service instance
inference_service = None
def get_inference_service():
"""Get or create the inference service instance"""
global inference_service
if inference_service is None:
model_path = os.getenv('MODEL_PATH', '/models/model.gguf')
inference_service = LocalInferenceService(model_path)
return inference_service
@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
async def chat_completion(
request: ChatCompletionRequest,
user=Depends(verify_api_token)
):
"""OpenAI-compatible chat completion endpoint"""
service = get_inference_service()
# Extract the last user message as the prompt
user_messages = [msg for msg in request.messages if msg.role == "user"]
if not user_messages:
raise HTTPException(status_code=400, detail="No user message found")
prompt = user_messages[-1].content
# Extract system message if present
system_messages = [msg for msg in request.messages if msg.role == "system"]
system_prompt = system_messages[0].content if system_messages else None
# Create domain inference request
inference_request = InferenceRequest(
prompt=prompt,
temperature=request.temperature,
max_tokens=request.max_tokens,
system_prompt=system_prompt
)
try:
# Execute inference
inference_response = service.generate(inference_request)
# Convert to OpenAI format
response = ChatCompletionResponse(
id=f"chatcmpl-{uuid.uuid4().hex[:8]}",
created=int(datetime.utcnow().timestamp()),
model=request.model,
choices=[{
"index": 0,
"message": {
"role": "assistant",
"content": inference_response.generated_text
},
"finish_reason": "stop"
}],
usage={
"prompt_tokens": inference_response.prompt_tokens,
"completion_tokens": inference_response.completion_tokens,
"total_tokens": inference_response.total_tokens
}
)
return response
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"Inference failed: {str(e)}")
@app.get("/v1/models")
async def list_models(user=Depends(verify_api_token)):
"""List available models"""
service = get_inference_service()
model_info = service.get_model_info()
return {
"object": "list",
"data": [{
"id": "local-llm",
"object": "model",
"created": int(datetime.fromisoformat(model_info['loaded_at']).timestamp()),
"owned_by": "local",
"permission": [],
"root": "local-llm",
"parent": None
}]
}
@app.get("/health")
async def health_check():
"""Health check endpoint for Kubernetes"""
service = get_inference_service()
health = service.health_check()
if health['status'] == 'healthy':
return health
else:
raise HTTPException(status_code=503, detail=health)
The Adaptation layer translates between the external OpenAI API format and our internal domain model. The chat_completion endpoint receives an OpenAI-compatible request, extracts the relevant information, creates an InferenceRequest domain object, calls the InferenceService, and translates the InferenceResponse back to OpenAI format.
This separation allows us to change our internal domain model without breaking the external API, or vice versa. The Adaptation layer acts as an anti-corruption layer protecting our domain from external concerns.
THE USER MANAGEMENT CAPABILITY
The User Management Capability handles user registration, email verification, and API token generation. This capability demonstrates how CCA handles stateful operations and external service integration.
Essence Layer: User Domain
The User domain defines entities and business rules related to user accounts:
import secrets
from datetime import datetime, timedelta
from enum import Enum
class UserStatus(Enum):
PENDING_VERIFICATION = "pending_verification"
ACTIVE = "active"
SUSPENDED = "suspended"
DELETED = "deleted"
class User:
def __init__(self, email, password_hash=None):
self.id = None
self.email = email
self.password_hash = password_hash
self.status = UserStatus.PENDING_VERIFICATION
self.created_at = None
self.verified_at = None
self.api_tokens = []
def validate_email(self):
"""Validate email format"""
import re
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(pattern, self.email):
raise ValueError("Invalid email format")
return True
def verify(self):
"""Mark user as verified"""
if self.status != UserStatus.PENDING_VERIFICATION:
raise ValueError("User is not in pending verification status")
self.status = UserStatus.ACTIVE
self.verified_at = datetime.utcnow()
def can_generate_token(self):
"""Check if user can generate API tokens"""
return self.status == UserStatus.ACTIVE
def suspend(self):
"""Suspend user account"""
self.status = UserStatus.SUSPENDED
class APIToken:
def __init__(self, user_id, name=None):
self.id = None
self.user_id = user_id
self.token = None
self.name = name or "Default Token"
self.created_at = None
self.expires_at = None
self.last_used_at = None
self.is_active = True
def generate_token(self):
"""Generate a secure random token"""
self.token = secrets.token_urlsafe(32)
self.created_at = datetime.utcnow()
# Tokens expire after 1 year
self.expires_at = self.created_at + timedelta(days=365)
return self.token
def is_valid(self):
"""Check if token is valid"""
if not self.is_active:
return False
if self.expires_at and datetime.utcnow() > self.expires_at:
return False
return True
def revoke(self):
"""Revoke this token"""
self.is_active = False
class EmailVerification:
def __init__(self, user_id, email):
self.id = None
self.user_id = user_id
self.email = email
self.verification_code = None
self.created_at = None
self.expires_at = None
self.verified_at = None
def generate_code(self):
"""Generate a verification code"""
self.verification_code = secrets.token_urlsafe(32)
self.created_at = datetime.utcnow()
# Verification codes expire after 24 hours
self.expires_at = self.created_at + timedelta(hours=24)
return self.verification_code
def is_valid(self):
"""Check if verification code is still valid"""
if self.verified_at:
return False
if datetime.utcnow() > self.expires_at:
return False
return True
def verify(self, code):
"""Verify the code"""
if not self.is_valid():
raise ValueError("Verification code has expired")
if self.verification_code != code:
raise ValueError("Invalid verification code")
self.verified_at = datetime.utcnow()
return True
These domain entities capture the business rules for user management. The User class knows that it must be verified before generating tokens. The APIToken class knows how to validate itself based on expiration and active status. The EmailVerification class manages the lifecycle of verification codes.
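These rules compose into a small lifecycle that runs without any infrastructure at all, which is exactly what makes the Essence easy to test:

user = User("alice@example.com")
user.validate_email()
assert not user.can_generate_token()  # still pending verification

verification = EmailVerification(user_id=1, email=user.email)
code = verification.generate_code()
verification.verify(code)

user.verify()
assert user.can_generate_token()

token = APIToken(user_id=1, name="CLI token")
secret = token.generate_token()
assert token.is_valid()
token.revoke()
assert not token.is_valid()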
Now we define the service interfaces:
from abc import ABC, abstractmethod
class UserRepository(ABC):
@abstractmethod
def save(self, user):
"""Save a user to persistent storage"""
pass
@abstractmethod
def find_by_email(self, email):
"""Find a user by email address"""
pass
@abstractmethod
def find_by_id(self, user_id):
"""Find a user by ID"""
pass
class TokenRepository(ABC):
@abstractmethod
def save(self, token):
"""Save an API token"""
pass
@abstractmethod
def find_by_token(self, token_string):
"""Find a token by its string value"""
pass
@abstractmethod
def find_by_user(self, user_id):
"""Find all tokens for a user"""
pass
@abstractmethod
def delete(self, token_id):
"""Delete a token"""
pass
class EmailService(ABC):
@abstractmethod
def send_verification_email(self, email, verification_code):
"""Send a verification email"""
pass
class PasswordHasher(ABC):
@abstractmethod
def hash_password(self, password):
"""Hash a password"""
pass
@abstractmethod
def verify_password(self, password, password_hash):
"""Verify a password against its hash"""
pass
class UserManagementService:
def __init__(self, user_repo, token_repo, email_service, password_hasher):
self.user_repo = user_repo
self.token_repo = token_repo
self.email_service = email_service
self.password_hasher = password_hasher
self.verification_codes = {} # In production, use a repository
def register_user(self, email, password):
"""Register a new user"""
# Check if user already exists
existing_user = self.user_repo.find_by_email(email)
if existing_user:
raise ValueError("User with this email already exists")
# Create new user
user = User(email)
user.validate_email()
user.password_hash = self.password_hasher.hash_password(password)
# Save user
saved_user = self.user_repo.save(user)
# Create verification
verification = EmailVerification(saved_user.id, email)
code = verification.generate_code()
self.verification_codes[saved_user.id] = verification
# Send verification email
self.email_service.send_verification_email(email, code)
return saved_user
    def verify_email(self, code):
        """Verify a user's email using the verification code"""
        # Look up the pending verification by its code, so the emailed
        # link only needs to carry the code itself
        for user_id, verification in self.verification_codes.items():
            if verification.verification_code == code:
                verification.verify(code)
                user = self.user_repo.find_by_id(user_id)
                user.verify()
                self.user_repo.save(user)
                return user
        raise ValueError("No verification found for this code")
def create_api_token(self, user_id, token_name=None):
"""Create an API token for a user"""
user = self.user_repo.find_by_id(user_id)
if not user:
raise ValueError("User not found")
if not user.can_generate_token():
raise ValueError("User cannot generate tokens")
token = APIToken(user_id, token_name)
token.generate_token()
saved_token = self.token_repo.save(token)
return saved_token
def list_user_tokens(self, user_id):
"""List all tokens for a user"""
return self.token_repo.find_by_user(user_id)
    def revoke_token(self, token_id, user_id):
        """Revoke a token"""
        # find_by_token expects the token string, so look the numeric ID
        # up among the user's own tokens instead
        tokens = self.token_repo.find_by_user(user_id)
        token = next((t for t in tokens if t.id == token_id), None)
        if not token:
            raise ValueError("Token not found or does not belong to user")
        token.revoke()
        self.token_repo.save(token)
The UserManagementService orchestrates the registration workflow. It depends on abstract interfaces for repositories, email sending, and password hashing. This allows the Essence layer to define the workflow without knowing about PostgreSQL, SMTP, or bcrypt.
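Because every dependency is an abstract interface, the registration workflow can run against in-memory fakes. The repository below is our own test scaffolding, not part of the system; paired with a similar in-memory TokenRepository, a no-op EmailService, and a plain hasher, UserManagementService runs end to end in a unit test:

class InMemoryUserRepository(UserRepository):
    def __init__(self):
        self._users = {}
        self._next_id = 1
    def save(self, user):
        if user.id is None:
            user.id = self._next_id
            self._next_id += 1
        self._users[user.id] = user
        return user
    def find_by_email(self, email):
        return next((u for u in self._users.values() if u.email == email), None)
    def find_by_id(self, user_id):
        return self._users.get(user_id)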
Realization Layer: Concrete Implementations
Now we implement the concrete infrastructure components:
import bcrypt
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import psycopg2
from psycopg2.extras import RealDictCursor
from datetime import datetime
class BcryptPasswordHasher(PasswordHasher):
def hash_password(self, password):
"""Hash a password using bcrypt"""
salt = bcrypt.gensalt()
return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
def verify_password(self, password, password_hash):
"""Verify a password against its hash"""
return bcrypt.checkpw(
password.encode('utf-8'),
password_hash.encode('utf-8')
)
class SMTPEmailService(EmailService):
def __init__(self, smtp_host, smtp_port, smtp_user, smtp_password, from_email):
self.smtp_host = smtp_host
self.smtp_port = smtp_port
self.smtp_user = smtp_user
self.smtp_password = smtp_password
self.from_email = from_email
def send_verification_email(self, email, verification_code):
"""Send verification email via SMTP"""
verification_url = f"https://your-domain.com/verify?code={verification_code}"
msg = MIMEMultipart('alternative')
msg['Subject'] = "Verify your email address"
msg['From'] = self.from_email
msg['To'] = email
text = f"""
Welcome to Local LLM API!
Please verify your email address by clicking the link below:
{verification_url}
This link will expire in 24 hours.
"""
html = f"""
<html>
<body>
<h2>Welcome to Local LLM API!</h2>
<p>Please verify your email address by clicking the button below:</p>
<a href="{verification_url}" style="background-color: #4CAF50; color: white; padding: 14px 20px; text-decoration: none; display: inline-block;">
Verify Email
</a>
<p>This link will expire in 24 hours.</p>
</body>
</html>
"""
part1 = MIMEText(text, 'plain')
part2 = MIMEText(html, 'html')
msg.attach(part1)
msg.attach(part2)
with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
server.starttls()
server.login(self.smtp_user, self.smtp_password)
server.send_message(msg)
class PostgreSQLUserRepository(UserRepository):
def __init__(self, connection_string):
self.connection_string = connection_string
def _get_connection(self):
return psycopg2.connect(self.connection_string)
def save(self, user):
"""Save user to PostgreSQL"""
conn = self._get_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
if user.id is None:
# Insert new user
cur.execute("""
INSERT INTO users (email, password_hash, status, created_at)
VALUES (%s, %s, %s, %s)
RETURNING id, created_at
""", (user.email, user.password_hash, user.status.value, datetime.utcnow()))
result = cur.fetchone()
user.id = result['id']
user.created_at = result['created_at']
else:
# Update existing user
cur.execute("""
UPDATE users
SET email = %s, password_hash = %s, status = %s, verified_at = %s
WHERE id = %s
""", (user.email, user.password_hash, user.status.value, user.verified_at, user.id))
conn.commit()
return user
finally:
conn.close()
def find_by_email(self, email):
"""Find user by email"""
conn = self._get_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT id, email, password_hash, status, created_at, verified_at
FROM users
WHERE email = %s
""", (email,))
row = cur.fetchone()
if not row:
return None
user = User(row['email'], row['password_hash'])
user.id = row['id']
user.status = UserStatus(row['status'])
user.created_at = row['created_at']
user.verified_at = row['verified_at']
return user
finally:
conn.close()
def find_by_id(self, user_id):
"""Find user by ID"""
conn = self._get_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT id, email, password_hash, status, created_at, verified_at
FROM users
WHERE id = %s
""", (user_id,))
row = cur.fetchone()
if not row:
return None
user = User(row['email'], row['password_hash'])
user.id = row['id']
user.status = UserStatus(row['status'])
user.created_at = row['created_at']
user.verified_at = row['verified_at']
return user
finally:
conn.close()
class PostgreSQLTokenRepository(TokenRepository):
def __init__(self, connection_string):
self.connection_string = connection_string
def _get_connection(self):
return psycopg2.connect(self.connection_string)
def save(self, token):
"""Save API token to PostgreSQL"""
conn = self._get_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
if token.id is None:
cur.execute("""
INSERT INTO api_tokens (user_id, token, name, created_at, expires_at, is_active)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id
""", (token.user_id, token.token, token.name, token.created_at,
token.expires_at, token.is_active))
result = cur.fetchone()
token.id = result['id']
else:
cur.execute("""
UPDATE api_tokens
SET is_active = %s, last_used_at = %s
WHERE id = %s
""", (token.is_active, token.last_used_at, token.id))
conn.commit()
return token
finally:
conn.close()
def find_by_token(self, token_string):
"""Find token by its string value"""
conn = self._get_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT id, user_id, token, name, created_at, expires_at, last_used_at, is_active
FROM api_tokens
WHERE token = %s
""", (token_string,))
row = cur.fetchone()
if not row:
return None
token = APIToken(row['user_id'], row['name'])
token.id = row['id']
token.token = row['token']
token.created_at = row['created_at']
token.expires_at = row['expires_at']
token.last_used_at = row['last_used_at']
token.is_active = row['is_active']
return token
finally:
conn.close()
def find_by_user(self, user_id):
"""Find all tokens for a user"""
conn = self._get_connection()
try:
with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute("""
SELECT id, user_id, token, name, created_at, expires_at, last_used_at, is_active
FROM api_tokens
WHERE user_id = %s
ORDER BY created_at DESC
""", (user_id,))
rows = cur.fetchall()
tokens = []
for row in rows:
token = APIToken(row['user_id'], row['name'])
token.id = row['id']
token.token = row['token']
token.created_at = row['created_at']
token.expires_at = row['expires_at']
token.last_used_at = row['last_used_at']
token.is_active = row['is_active']
tokens.append(token)
return tokens
finally:
conn.close()
def delete(self, token_id):
"""Delete a token"""
conn = self._get_connection()
try:
with conn.cursor() as cur:
cur.execute("DELETE FROM api_tokens WHERE id = %s", (token_id,))
conn.commit()
finally:
conn.close()
These Realization implementations handle the actual infrastructure work. They depend on the Essence interfaces but the Essence never depends on them. This allows us to swap PostgreSQL for MongoDB, or SMTP for SendGrid, without touching our domain logic.
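For instance, swapping the email Realization for local development requires nothing more than another class conforming to EmailService. This console variant is our own convenience, not part of the deployment:

class ConsoleEmailService(EmailService):
    """Development stand-in: prints the verification link instead of sending it."""
    def send_verification_email(self, email, verification_code):
        url = f"http://localhost:8080/verify?code={verification_code}"
        print(f"[email to {email}] verify at: {url}")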
Adaptation Layer: Web Interface
The Adaptation layer provides a web interface for user registration and token management:
from fastapi import FastAPI, Form, Request, HTTPException, Depends
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
import os
web_app = FastAPI()
templates = Jinja2Templates(directory="templates")
# Dependency injection
def get_user_management_service():
"""Get or create user management service"""
db_connection = os.getenv('DATABASE_URL')
smtp_host = os.getenv('SMTP_HOST')
smtp_port = int(os.getenv('SMTP_PORT', 587))
smtp_user = os.getenv('SMTP_USER')
smtp_password = os.getenv('SMTP_PASSWORD')
from_email = os.getenv('FROM_EMAIL')
user_repo = PostgreSQLUserRepository(db_connection)
token_repo = PostgreSQLTokenRepository(db_connection)
email_service = SMTPEmailService(smtp_host, smtp_port, smtp_user, smtp_password, from_email)
password_hasher = BcryptPasswordHasher()
return UserManagementService(user_repo, token_repo, email_service, password_hasher)
@web_app.get("/", response_class=HTMLResponse)
async def home(request: Request):
"""Home page"""
return templates.TemplateResponse("home.html", {"request": request})
@web_app.get("/register", response_class=HTMLResponse)
async def register_page(request: Request):
"""Registration page"""
return templates.TemplateResponse("register.html", {"request": request})
@web_app.post("/register")
async def register(
email: str = Form(...),
password: str = Form(...),
confirm_password: str = Form(...),
service: UserManagementService = Depends(get_user_management_service)
):
"""Handle registration form submission"""
if password != confirm_password:
raise HTTPException(status_code=400, detail="Passwords do not match")
if len(password) < 8:
raise HTTPException(status_code=400, detail="Password must be at least 8 characters")
try:
user = service.register_user(email, password)
return RedirectResponse(url="/register/success", status_code=303)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@web_app.get("/register/success", response_class=HTMLResponse)
async def register_success(request: Request):
"""Registration success page"""
return templates.TemplateResponse("register_success.html", {"request": request})
@web_app.get("/verify")
async def verify_email(
code: str,
service: UserManagementService = Depends(get_user_management_service)
):
"""Verify email with code from URL"""
try:
# In a real implementation, you'd extract user_id from the code
# For now, this is simplified
user = service.verify_email(user_id, code)
return RedirectResponse(url="/verify/success", status_code=303)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@web_app.get("/dashboard", response_class=HTMLResponse)
async def dashboard(request: Request):
"""User dashboard showing API tokens"""
# In production, this would require authentication
return templates.TemplateResponse("dashboard.html", {"request": request})
@web_app.post("/tokens/create")
async def create_token(
name: str = Form(...),
user_id: int = Form(...), # In production, get from session
service: UserManagementService = Depends(get_user_management_service)
):
"""Create a new API token"""
try:
token = service.create_api_token(user_id, name)
return {"token": token.token, "name": token.name}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@web_app.get("/tokens")
async def list_tokens(
user_id: int, # In production, get from session
service: UserManagementService = Depends(get_user_management_service)
):
"""List all tokens for the current user"""
tokens = service.list_user_tokens(user_id)
return {
"tokens": [{
"id": t.id,
"name": t.name,
"created_at": t.created_at.isoformat(),
"last_used_at": t.last_used_at.isoformat() if t.last_used_at else None,
"is_active": t.is_active
} for t in tokens]
}
@web_app.post("/tokens/{token_id}/revoke")
async def revoke_token(
token_id: int,
user_id: int = Form(...), # In production, get from session
service: UserManagementService = Depends(get_user_management_service)
):
"""Revoke an API token"""
try:
service.revoke_token(token_id, user_id)
return {"status": "success"}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
The web interface Adaptation layer translates between HTML forms and our domain model. It handles HTTP concerns like form parsing, redirects, and template rendering, while delegating business logic to the UserManagementService.
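From the outside, the registration flow is plain form-encoded HTTP; a quick sketch with the requests library (domain placeholder assumed):

import requests

resp = requests.post(
    "https://app.your-domain.com/register",
    data={
        "email": "alice@example.com",
        "password": "correct-horse-battery",
        "confirm_password": "correct-horse-battery"
    },
    allow_redirects=False
)
print(resp.status_code)  # 303 redirect to /register/success on success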
THE AUTHENTICATION CAPABILITY
The Authentication Capability validates API tokens for incoming requests. This is a small but critical capability that other capabilities depend on.
Essence Layer
from abc import ABC, abstractmethod
from datetime import datetime
class AuthenticationResult:
def __init__(self, is_authenticated, user=None, reason=None):
self.is_authenticated = is_authenticated
self.user = user
self.reason = reason
class AuthenticationService(ABC):
@abstractmethod
def verify_token(self, token_string):
"""Verify an API token and return the associated user"""
pass
@abstractmethod
def authenticate_request(self, token_string):
"""Authenticate a request and return authentication result"""
pass
Realization Layer
from datetime import datetime
# UserStatus lives in the user management Essence; path matches the
# imports used in the entry points and tests below
from user_management_capability.essence.models import UserStatus

class TokenAuthenticationService(AuthenticationService):
def __init__(self, token_repo, user_repo):
self.token_repo = token_repo
self.user_repo = user_repo
def verify_token(self, token_string):
"""Verify token and return user if valid"""
token = self.token_repo.find_by_token(token_string)
if not token:
return None
if not token.is_valid():
return None
# Update last used timestamp
token.last_used_at = datetime.utcnow()
self.token_repo.save(token)
# Get user
user = self.user_repo.find_by_id(token.user_id)
return user
def authenticate_request(self, token_string):
"""Authenticate a request"""
if not token_string:
return AuthenticationResult(
is_authenticated=False,
reason="No token provided"
)
user = self.verify_token(token_string)
if not user:
return AuthenticationResult(
is_authenticated=False,
reason="Invalid or expired token"
)
if user.status != UserStatus.ACTIVE:
return AuthenticationResult(
is_authenticated=False,
reason=f"User account is {user.status.value}"
)
return AuthenticationResult(
is_authenticated=True,
user=user
)
The Authentication Capability is deliberately simple. It has one job: verify tokens. By keeping it focused, we make it easy to test, maintain, and reason about.
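That narrow scope shows in how little a test needs; a sketch using unittest.mock, in the same style as the test suite later in this tutorial:

from unittest.mock import Mock

token_repo, user_repo = Mock(), Mock()
auth = TokenAuthenticationService(token_repo, user_repo)

token_repo.find_by_token.return_value = None
result = auth.authenticate_request("bogus")
assert not result.is_authenticated
assert result.reason == "Invalid or expired token"

result = auth.authenticate_request(None)
assert result.reason == "No token provided"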
DATABASE SCHEMA
The system requires a PostgreSQL database with the following schema:
-- Users table
CREATE TABLE users (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
status VARCHAR(50) NOT NULL DEFAULT 'pending_verification',
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
verified_at TIMESTAMP,
    CONSTRAINT email_format CHECK (email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$')
);
-- API tokens table
CREATE TABLE api_tokens (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    token VARCHAR(255) UNIQUE NOT NULL,
    name VARCHAR(255) NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP NOT NULL,
    last_used_at TIMESTAMP,
    is_active BOOLEAN NOT NULL DEFAULT TRUE
);
-- PostgreSQL has no inline INDEX clause; indexes are created separately
CREATE INDEX idx_api_tokens_token ON api_tokens (token);
CREATE INDEX idx_api_tokens_user_id ON api_tokens (user_id);
-- Conversation history table
CREATE TABLE conversations (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    title VARCHAR(255),
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_conversations_user_id ON conversations (user_id);
-- Messages table
CREATE TABLE messages (
    id SERIAL PRIMARY KEY,
    conversation_id INTEGER NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
    role VARCHAR(50) NOT NULL,
    content TEXT NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    prompt_tokens INTEGER,
    completion_tokens INTEGER
);
CREATE INDEX idx_messages_conversation_id ON messages (conversation_id);
-- Email verifications table
CREATE TABLE email_verifications (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    verification_code VARCHAR(255) UNIQUE NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP NOT NULL,
    verified_at TIMESTAMP
);
CREATE INDEX idx_email_verifications_code ON email_verifications (verification_code);
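Applying the schema at deploy time can be a small bootstrap step; a sketch with psycopg2, assuming the DDL above is saved as schema.sql and DATABASE_URL is set:

import os
import psycopg2

def apply_schema(path="schema.sql"):
    """Run the DDL file against the configured database."""
    conn = psycopg2.connect(os.environ["DATABASE_URL"])
    try:
        with conn.cursor() as cur, open(path) as f:
            cur.execute(f.read())
        conn.commit()
    finally:
        conn.close()

if __name__ == "__main__":
    apply_schema()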
CONTAINERIZATION
The system deploys as Docker containers. We need two main containers: one for the LLM inference service and one for the user management web interface.
LLM Inference Dockerfile
FROM nvidia/cuda:12.1.0-devel-ubuntu22.04
# Install Python and dependencies
RUN apt-get update && apt-get install -y \
python3.10 \
python3-pip \
git \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /app
# Copy requirements
COPY requirements-inference.txt .
# Install Python packages
RUN pip3 install --no-cache-dir -r requirements-inference.txt
# Install llama-cpp-python with CUDA support
RUN CMAKE_ARGS="-DLLAMA_CUBLAS=on" pip3 install llama-cpp-python --force-reinstall --no-cache-dir
# Copy application code
COPY inference_capability/ ./inference_capability/
COPY main_inference.py .
# Create directory for models
RUN mkdir -p /models
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
    CMD python3 -c "import requests; requests.get('http://localhost:8000/health').raise_for_status()"
# Run the application
CMD ["python3", "main_inference.py"]
Web Interface Dockerfile
FROM python:3.10-slim
# Install system dependencies
RUN apt-get update && apt-get install -y \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /app
# Copy requirements
COPY requirements-web.txt .
# Install Python packages
RUN pip install --no-cache-dir -r requirements-web.txt
# Copy application code
COPY user_management_capability/ ./user_management_capability/
COPY authentication_capability/ ./authentication_capability/
COPY templates/ ./templates/
COPY static/ ./static/
COPY main_web.py .
# Expose port
EXPOSE 8080
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=10s --retries=3 \
    CMD python -c "import requests; requests.get('http://localhost:8080/health').raise_for_status()"
# Run the application
CMD ["python", "main_web.py"]
Requirements Files
requirements-inference.txt:
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
llama-cpp-python==0.2.20
psycopg2-binary==2.9.9
python-multipart==0.0.6
requests==2.31.0
requirements-web.txt:
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
jinja2==3.1.2
python-multipart==0.0.6
bcrypt==4.1.1
psycopg2-binary==2.9.9
requests==2.31.0
KUBERNETES DEPLOYMENT
The system deploys to Kubernetes with separate deployments for each service.
Namespace
apiVersion: v1
kind: Namespace
metadata:
name: local-llm
ConfigMap for Configuration
apiVersion: v1
kind: ConfigMap
metadata:
name: llm-config
namespace: local-llm
data:
MODEL_PATH: "/models/mistral-7b-instruct-v0.2.Q4_K_M.gguf"
DATABASE_HOST: "postgresql"
DATABASE_PORT: "5432"
DATABASE_NAME: "llm_api"
SMTP_HOST: "smtp.gmail.com"
SMTP_PORT: "587"
FROM_EMAIL: "noreply@your-domain.com"
Secrets
apiVersion: v1
kind: Secret
metadata:
name: llm-secrets
namespace: local-llm
type: Opaque
stringData:
DATABASE_USER: "llm_user"
DATABASE_PASSWORD: "your-secure-password"
SMTP_USER: "your-smtp-user"
SMTP_PASSWORD: "your-smtp-password"
PostgreSQL Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: postgresql
namespace: local-llm
spec:
replicas: 1
selector:
matchLabels:
app: postgresql
template:
metadata:
labels:
app: postgresql
spec:
containers:
- name: postgresql
image: postgres:15
ports:
- containerPort: 5432
env:
- name: POSTGRES_DB
valueFrom:
configMapKeyRef:
name: llm-config
key: DATABASE_NAME
- name: POSTGRES_USER
valueFrom:
secretKeyRef:
name: llm-secrets
key: DATABASE_USER
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: llm-secrets
key: DATABASE_PASSWORD
volumeMounts:
- name: postgres-storage
mountPath: /var/lib/postgresql/data
volumes:
- name: postgres-storage
persistentVolumeClaim:
claimName: postgres-pvc
---
apiVersion: v1
kind: Service
metadata:
name: postgresql
namespace: local-llm
spec:
selector:
app: postgresql
ports:
- port: 5432
targetPort: 5432
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: postgres-pvc
namespace: local-llm
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
LLM Inference Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: llm-inference
namespace: local-llm
spec:
replicas: 1
selector:
matchLabels:
app: llm-inference
template:
metadata:
labels:
app: llm-inference
spec:
containers:
- name: inference
image: your-registry/llm-inference:latest
ports:
- containerPort: 8000
env:
- name: MODEL_PATH
valueFrom:
configMapKeyRef:
name: llm-config
key: MODEL_PATH
- name: DATABASE_URL
value: "postgresql://$(DATABASE_USER):$(DATABASE_PASSWORD)@$(DATABASE_HOST):$(DATABASE_PORT)/$(DATABASE_NAME)"
envFrom:
- configMapRef:
name: llm-config
- secretRef:
name: llm-secrets
resources:
limits:
nvidia.com/gpu: 1
memory: "16Gi"
cpu: "4"
requests:
nvidia.com/gpu: 1
memory: "8Gi"
cpu: "2"
volumeMounts:
- name: model-storage
mountPath: /models
readOnly: true
volumes:
- name: model-storage
persistentVolumeClaim:
claimName: model-pvc
nodeSelector:
gpu: "true"
---
apiVersion: v1
kind: Service
metadata:
name: llm-inference
namespace: local-llm
spec:
selector:
app: llm-inference
ports:
- port: 8000
targetPort: 8000
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: model-pvc
namespace: local-llm
spec:
accessModes:
- ReadOnlyMany
resources:
requests:
storage: 20Gi
Web Interface Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: llm-web
namespace: local-llm
spec:
replicas: 2
selector:
matchLabels:
app: llm-web
template:
metadata:
labels:
app: llm-web
spec:
containers:
- name: web
image: your-registry/llm-web:latest
ports:
- containerPort: 8080
env:
- name: DATABASE_URL
value: "postgresql://$(DATABASE_USER):$(DATABASE_PASSWORD)@$(DATABASE_HOST):$(DATABASE_PORT)/$(DATABASE_NAME)"
- name: INFERENCE_SERVICE_URL
value: "http://llm-inference:8000"
envFrom:
- configMapRef:
name: llm-config
- secretRef:
name: llm-secrets
resources:
limits:
memory: "512Mi"
cpu: "500m"
requests:
memory: "256Mi"
cpu: "250m"
---
apiVersion: v1
kind: Service
metadata:
name: llm-web
namespace: local-llm
spec:
selector:
app: llm-web
ports:
- port: 8080
targetPort: 8080
Ingress Configuration
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: llm-ingress
namespace: local-llm
  annotations:
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
spec:
ingressClassName: nginx
tls:
- hosts:
- api.your-domain.com
- app.your-domain.com
secretName: llm-tls
rules:
- host: api.your-domain.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: llm-inference
port:
number: 8000
- host: app.your-domain.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: llm-web
port:
number: 8080
MAIN APPLICATION ENTRY POINTS
Main Inference Service
# main_inference.py
import os
import uvicorn
from fastapi import FastAPI
from inference_capability.adaptation.api import app as inference_app
from inference_capability.realization.local_inference import LocalInferenceService
from authentication_capability.realization.token_auth import TokenAuthenticationService
from user_management_capability.realization.repositories import (
PostgreSQLUserRepository,
PostgreSQLTokenRepository
)
# Initialize repositories
db_url = os.getenv('DATABASE_URL')
user_repo = PostgreSQLUserRepository(db_url)
token_repo = PostgreSQLTokenRepository(db_url)
# Initialize authentication service
auth_service = TokenAuthenticationService(token_repo, user_repo)
# Initialize inference service
model_path = os.getenv('MODEL_PATH', '/models/model.gguf')
inference_service = LocalInferenceService(model_path)
# Make services available to the app
inference_app.state.inference_service = inference_service
inference_app.state.auth_service = auth_service
if __name__ == "__main__":
uvicorn.run(
inference_app,
host="0.0.0.0",
port=8000,
log_level="info"
)
Main Web Service
# main_web.py
import os
import uvicorn
from user_management_capability.adaptation.web import web_app
from user_management_capability.essence.services import UserManagementService
from user_management_capability.realization.repositories import (
PostgreSQLUserRepository,
PostgreSQLTokenRepository
)
from user_management_capability.realization.email import SMTPEmailService
from user_management_capability.realization.password import BcryptPasswordHasher
# Initialize infrastructure components
db_url = os.getenv('DATABASE_URL')
user_repo = PostgreSQLUserRepository(db_url)
token_repo = PostgreSQLTokenRepository(db_url)
smtp_config = {
'host': os.getenv('SMTP_HOST'),
'port': int(os.getenv('SMTP_PORT', 587)),
'user': os.getenv('SMTP_USER'),
'password': os.getenv('SMTP_PASSWORD'),
'from_email': os.getenv('FROM_EMAIL')
}
email_service = SMTPEmailService(**smtp_config)
password_hasher = BcryptPasswordHasher()
# Initialize user management service
user_service = UserManagementService(
user_repo,
token_repo,
email_service,
password_hasher
)
# Make service available to the app
web_app.state.user_service = user_service
if __name__ == "__main__":
uvicorn.run(
web_app,
host="0.0.0.0",
port=8080,
log_level="info"
)
CLIENT USAGE EXAMPLES
Python Client
import requests
class LLMClient:
def __init__(self, api_url, api_token):
self.api_url = api_url
self.api_token = api_token
self.headers = {
"Authorization": f"Bearer {api_token}",
"Content-Type": "application/json"
}
def chat(self, messages, temperature=0.7, max_tokens=512):
"""Send a chat completion request"""
response = requests.post(
f"{self.api_url}/v1/chat/completions",
headers=self.headers,
json={
"model": "local-llm",
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens
}
)
response.raise_for_status()
return response.json()
def list_models(self):
"""List available models"""
response = requests.get(
f"{self.api_url}/v1/models",
headers=self.headers
)
response.raise_for_status()
return response.json()
# Usage example
client = LLMClient(
api_url="https://api.your-domain.com",
api_token="your-api-token-here"
)
# Simple chat
response = client.chat([
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "What is the capital of France?"}
])
print(response['choices'][0]['message']['content'])
# Multi-turn conversation
conversation = [
{"role": "system", "content": "You are a helpful coding assistant."},
{"role": "user", "content": "How do I reverse a string in Python?"}
]
response = client.chat(conversation)
assistant_message = response['choices'][0]['message']['content']
print(assistant_message)
# Continue the conversation
conversation.append({"role": "assistant", "content": assistant_message})
conversation.append({"role": "user", "content": "Can you show me a more efficient way?"})
response = client.chat(conversation)
print(response['choices'][0]['message']['content'])
JavaScript Client
class LLMClient {
constructor(apiUrl, apiToken) {
this.apiUrl = apiUrl;
this.apiToken = apiToken;
}
async chat(messages, temperature = 0.7, maxTokens = 512) {
const response = await fetch(`${this.apiUrl}/v1/chat/completions`, {
method: 'POST',
headers: {
'Authorization': `Bearer ${this.apiToken}`,
'Content-Type': 'application/json'
},
body: JSON.stringify({
model: 'local-llm',
messages: messages,
temperature: temperature,
max_tokens: maxTokens
})
});
if (!response.ok) {
throw new Error(`API request failed: ${response.statusText}`);
}
return await response.json();
}
async listModels() {
const response = await fetch(`${this.apiUrl}/v1/models`, {
headers: {
'Authorization': `Bearer ${this.apiToken}`
}
});
if (!response.ok) {
throw new Error(`API request failed: ${response.statusText}`);
}
return await response.json();
}
}
// Usage example
const client = new LLMClient(
'https://api.your-domain.com',
'your-api-token-here'
);
// Simple chat
const response = await client.chat([
{ role: 'system', content: 'You are a helpful assistant.' },
{ role: 'user', content: 'What is the capital of France?' }
]);
console.log(response.choices[0].message.content);
Curl Example
# Chat completion
curl -X POST https://api.your-domain.com/v1/chat/completions \
-H "Authorization: Bearer your-api-token-here" \
-H "Content-Type: application/json" \
-d '{
"model": "local-llm",
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "What is the capital of France?"}
],
"temperature": 0.7,
"max_tokens": 512
}'
# List models
curl https://api.your-domain.com/v1/models \
-H "Authorization: Bearer your-api-token-here"
TESTING STRATEGY
The CCA structure makes testing straightforward. We can test each layer independently.
Testing the Essence Layer
import unittest
from inference_capability.essence.models import InferenceRequest, InferenceResponse
from user_management_capability.essence.models import User, UserStatus, APIToken
class TestInferenceRequest(unittest.TestCase):
def test_valid_request(self):
"""Test that valid requests pass validation"""
request = InferenceRequest(
prompt="Hello, world!",
temperature=0.7,
max_tokens=100
)
self.assertTrue(request.validate())
def test_empty_prompt_fails(self):
"""Test that empty prompts fail validation"""
request = InferenceRequest(prompt="")
with self.assertRaises(ValueError):
request.validate()
def test_invalid_temperature_fails(self):
"""Test that invalid temperature fails validation"""
request = InferenceRequest(prompt="Test", temperature=3.0)
with self.assertRaises(ValueError):
request.validate()
def test_invalid_max_tokens_fails(self):
"""Test that invalid max_tokens fails validation"""
request = InferenceRequest(prompt="Test", max_tokens=0)
with self.assertRaises(ValueError):
request.validate()
class TestUser(unittest.TestCase):
def test_email_validation(self):
"""Test email validation"""
user = User("valid@example.com")
self.assertTrue(user.validate_email())
user = User("invalid-email")
with self.assertRaises(ValueError):
user.validate_email()
def test_user_verification(self):
"""Test user verification workflow"""
user = User("test@example.com")
self.assertEqual(user.status, UserStatus.PENDING_VERIFICATION)
user.verify()
self.assertEqual(user.status, UserStatus.ACTIVE)
self.assertIsNotNone(user.verified_at)
def test_cannot_verify_twice(self):
"""Test that user cannot be verified twice"""
user = User("test@example.com")
user.verify()
with self.assertRaises(ValueError):
user.verify()
def test_can_generate_token_only_when_active(self):
"""Test token generation rules"""
user = User("test@example.com")
self.assertFalse(user.can_generate_token())
user.verify()
self.assertTrue(user.can_generate_token())
user.suspend()
self.assertFalse(user.can_generate_token())
class TestAPIToken(unittest.TestCase):
def test_token_generation(self):
"""Test token generation"""
token = APIToken(user_id=1)
token_string = token.generate_token()
self.assertIsNotNone(token_string)
self.assertGreater(len(token_string), 20)
self.assertIsNotNone(token.created_at)
self.assertIsNotNone(token.expires_at)
def test_token_validation(self):
"""Test token validation"""
token = APIToken(user_id=1)
token.generate_token()
self.assertTrue(token.is_valid())
token.revoke()
self.assertFalse(token.is_valid())
Testing the Realization Layer with Mocks
import unittest
from unittest.mock import Mock, patch
from user_management_capability.essence.services import UserManagementService
from user_management_capability.essence.models import User, APIToken
class TestUserManagementService(unittest.TestCase):
def setUp(self):
"""Set up test dependencies"""
self.user_repo = Mock()
self.token_repo = Mock()
self.email_service = Mock()
self.password_hasher = Mock()
self.service = UserManagementService(
self.user_repo,
self.token_repo,
self.email_service,
self.password_hasher
)
def test_register_new_user(self):
"""Test registering a new user"""
# Setup mocks
self.user_repo.find_by_email.return_value = None
self.password_hasher.hash_password.return_value = "hashed_password"
saved_user = User("test@example.com")
saved_user.id = 1
self.user_repo.save.return_value = saved_user
# Execute
user = self.service.register_user("test@example.com", "password123")
# Verify
self.assertEqual(user.email, "test@example.com")
self.user_repo.save.assert_called_once()
self.email_service.send_verification_email.assert_called_once()
def test_cannot_register_duplicate_email(self):
"""Test that duplicate emails are rejected"""
# Setup mocks
existing_user = User("test@example.com")
self.user_repo.find_by_email.return_value = existing_user
# Execute and verify
with self.assertRaises(ValueError):
self.service.register_user("test@example.com", "password123")
def test_create_token_for_verified_user(self):
"""Test creating token for verified user"""
# Setup mocks
user = User("test@example.com")
user.id = 1
user.verify()
self.user_repo.find_by_id.return_value = user
token = APIToken(1)
token.id = 1
token.generate_token()
self.token_repo.save.return_value = token
# Execute
created_token = self.service.create_api_token(1, "My Token")
# Verify
self.assertIsNotNone(created_token)
self.token_repo.save.assert_called_once()
def test_cannot_create_token_for_unverified_user(self):
"""Test that unverified users cannot create tokens"""
# Setup mocks
user = User("test@example.com")
user.id = 1
# User is not verified
self.user_repo.find_by_id.return_value = user
# Execute and verify
with self.assertRaises(ValueError):
self.service.create_api_token(1, "My Token")
Integration Testing
import unittest
import psycopg2
from testcontainers.postgres import PostgresContainer
from user_management_capability.realization.repositories import PostgreSQLUserRepository
from user_management_capability.essence.models import User, UserStatus
class TestPostgreSQLUserRepository(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Start a PostgreSQL container for testing"""
cls.postgres = PostgresContainer("postgres:15")
cls.postgres.start()
# Create schema
conn = psycopg2.connect(cls.postgres.get_connection_url())
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE users (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
status VARCHAR(50) NOT NULL DEFAULT 'pending_verification',
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
verified_at TIMESTAMP
)
""")
conn.commit()
conn.close()
@classmethod
def tearDownClass(cls):
"""Stop the PostgreSQL container"""
cls.postgres.stop()
def setUp(self):
"""Create repository instance"""
self.repo = PostgreSQLUserRepository(self.postgres.get_connection_url())
def tearDown(self):
"""Clean up database"""
conn = psycopg2.connect(self.postgres.get_connection_url())
with conn.cursor() as cur:
cur.execute("DELETE FROM users")
conn.commit()
conn.close()
def test_save_and_find_user(self):
"""Test saving and retrieving a user"""
user = User("test@example.com", "hashed_password")
# Save
saved_user = self.repo.save(user)
self.assertIsNotNone(saved_user.id)
# Find by email
found_user = self.repo.find_by_email("test@example.com")
self.assertIsNotNone(found_user)
self.assertEqual(found_user.email, "test@example.com")
# Find by ID
found_by_id = self.repo.find_by_id(saved_user.id)
self.assertIsNotNone(found_by_id)
self.assertEqual(found_by_id.id, saved_user.id)
def test_update_user(self):
"""Test updating a user"""
user = User("test@example.com", "hashed_password")
saved_user = self.repo.save(user)
# Verify user
saved_user.verify()
updated_user = self.repo.save(saved_user)
# Retrieve and check
found_user = self.repo.find_by_id(saved_user.id)
self.assertEqual(found_user.status, UserStatus.ACTIVE)
self.assertIsNotNone(found_user.verified_at)
MONITORING AND OBSERVABILITY
Production systems require monitoring. We add observability through Prometheus metrics and structured logging.
Metrics Collection
import time
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from fastapi import Request, Response

# Define metrics
inference_requests_total = Counter(
    'inference_requests_total',
    'Total number of inference requests',
    ['status']
)
inference_duration_seconds = Histogram(
    'inference_duration_seconds',
    'Time spent processing inference requests',
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)
inference_tokens_total = Counter(
    'inference_tokens_total',
    'Total number of tokens processed',
    ['type']
)
active_users_gauge = Gauge(
    'active_users',
    'Number of active users'
)

# Middleware to collect metrics
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time
    if request.url.path.startswith("/v1/chat/completions"):
        inference_duration_seconds.observe(duration)
        if response.status_code == 200:
            inference_requests_total.labels(status='success').inc()
        else:
            inference_requests_total.labels(status='error').inc()
    return response

# Metrics endpoint
@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
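The middleware covers request counts and latency, but the token counters and the user gauge are better updated where the data lives: inside the services. A minimal sketch, assuming the inference response exposes the prompt_tokens and completion_tokens fields used in the structured-logging example below; count_active_users() is a hypothetical repository method, not defined elsewhere in this tutorial.
# Sketch: service-side metric updates
def record_inference_metrics(response):
    # Token counts come from the inference response itself
    inference_tokens_total.labels(type='prompt').inc(response.prompt_tokens)
    inference_tokens_total.labels(type='completion').inc(response.completion_tokens)

def refresh_active_users(user_repo):
    # count_active_users() is a hypothetical repository query,
    # refreshed periodically (e.g. from a scheduled task)
    active_users_gauge.set(user_repo.count_active_users())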
Structured Logging
import json
import logging
from datetime import datetime, timezone

class StructuredLogger:
    def __init__(self, name):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        # Guard against duplicate handlers if the logger is created twice
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(handler)

    def _log(self, level, message, **kwargs):
        log_entry = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'level': level,
            'message': message,
            **kwargs
        }
        self.logger.log(getattr(logging, level), json.dumps(log_entry))

    def info(self, message, **kwargs):
        self._log('INFO', message, **kwargs)

    def error(self, message, **kwargs):
        self._log('ERROR', message, **kwargs)

    def warning(self, message, **kwargs):
        self._log('WARNING', message, **kwargs)

# Usage in the inference service
logger = StructuredLogger('inference')

class LocalInferenceService(InferenceService):
    def generate(self, request):
        logger.info(
            "Inference request received",
            prompt_length=len(request.prompt),
            temperature=request.temperature,
            max_tokens=request.max_tokens
        )
        try:
            response = self._do_inference(request)
            logger.info(
                "Inference completed successfully",
                prompt_tokens=response.prompt_tokens,
                completion_tokens=response.completion_tokens,
                total_tokens=response.total_tokens
            )
            return response
        except Exception as e:
            logger.error(
                "Inference failed",
                error=str(e),
                error_type=type(e).__name__
            )
            raise
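Each call emits one JSON object per line, which log aggregators can ingest without custom parsing. A successful request produces output along these lines (values illustrative):
{"timestamp": "2025-01-15T10:23:45.123456+00:00", "level": "INFO", "message": "Inference completed successfully", "prompt_tokens": 42, "completion_tokens": 128, "total_tokens": 170}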
EVOLUTION ENVELOPES AND VERSIONING
As the system evolves, we need strategies to manage changes without breaking existing clients. Evolution Envelopes provide this capability.
API Versioning
from typing import Optional
from fastapi import APIRouter

# Version 1 API
v1_router = APIRouter(prefix="/v1")

@v1_router.post("/chat/completions")
async def chat_completion_v1(request: ChatCompletionRequest):
    """Version 1 of the chat completions API"""
    # Implementation
    pass

# Version 2 API with enhanced features
v2_router = APIRouter(prefix="/v2")

class ChatCompletionRequestV2(ChatCompletionRequest):
    """Enhanced request with additional features"""
    conversation_id: Optional[str] = None
    user_id: Optional[str] = None
    metadata: Optional[dict] = None

@v2_router.post("/chat/completions")
async def chat_completion_v2(request: ChatCompletionRequestV2):
    """Version 2 with conversation tracking"""
    # Enhanced implementation
    pass

# Register both versions
app.include_router(v1_router)
app.include_router(v2_router)
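Because every field added in V2 is optional, existing V1 clients continue to work unchanged while new clients opt into conversation tracking. A hedged client-side sketch using the requests library; the host, model name, and token are placeholders:
import requests

BASE_URL = "https://llm.example.com"  # placeholder host
HEADERS = {"Authorization": "Bearer <api-token>"}  # token from the registration flow

# V1 client: the original OpenAI-compatible payload
requests.post(f"{BASE_URL}/v1/chat/completions", headers=HEADERS, json={
    "model": "local-llm",
    "messages": [{"role": "user", "content": "Hello"}],
})

# V2 client: same payload plus the optional tracking fields
requests.post(f"{BASE_URL}/v2/chat/completions", headers=HEADERS, json={
    "model": "local-llm",
    "messages": [{"role": "user", "content": "Hello"}],
    "conversation_id": "conv-123",
    "metadata": {"client": "cli"},
})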
Database Migrations
# migrations/001_initial_schema.py
def upgrade(conn):
    """Apply migration"""
    with conn.cursor() as cur:
        cur.execute("""
            CREATE TABLE users (
                id SERIAL PRIMARY KEY,
                email VARCHAR(255) UNIQUE NOT NULL,
                password_hash VARCHAR(255) NOT NULL,
                status VARCHAR(50) NOT NULL DEFAULT 'pending_verification',
                created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                verified_at TIMESTAMP
            )
        """)
    conn.commit()

def downgrade(conn):
    """Revert migration"""
    with conn.cursor() as cur:
        cur.execute("DROP TABLE users")
    conn.commit()

# migrations/002_add_user_metadata.py
def upgrade(conn):
    """Add metadata column to users"""
    with conn.cursor() as cur:
        cur.execute("""
            ALTER TABLE users
            ADD COLUMN metadata JSONB DEFAULT '{}'::jsonb
        """)
    conn.commit()

def downgrade(conn):
    """Remove metadata column"""
    with conn.cursor() as cur:
        cur.execute("ALTER TABLE users DROP COLUMN metadata")
    conn.commit()
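These migration modules need a runner that applies them in order and records what has already run; in practice a tool such as Alembic fills this role. A minimal sketch, assuming a schema_migrations bookkeeping table and migration modules living in a migrations package (both assumptions, not shown earlier):
import importlib
import psycopg2

MIGRATIONS = ["001_initial_schema", "002_add_user_metadata"]

def run_migrations(dsn):
    """Apply each pending migration exactly once, in order"""
    conn = psycopg2.connect(dsn)
    with conn.cursor() as cur:
        # Bookkeeping table: one row per applied migration (assumed name)
        cur.execute("""
            CREATE TABLE IF NOT EXISTS schema_migrations (
                version VARCHAR(255) PRIMARY KEY,
                applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.commit()
        for version in MIGRATIONS:
            cur.execute("SELECT 1 FROM schema_migrations WHERE version = %s", (version,))
            if cur.fetchone():
                continue  # already applied
            # importlib can import modules whose names start with a digit
            module = importlib.import_module(f"migrations.{version}")
            module.upgrade(conn)
            cur.execute("INSERT INTO schema_migrations (version) VALUES (%s)", (version,))
            conn.commit()
    conn.close()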
DEPLOYMENT CHECKLIST
Before deploying to production, ensure the following:
Security
- All secrets stored in Kubernetes Secrets, not ConfigMaps
- TLS certificates configured for all external endpoints
- Database connections use SSL
- API rate limiting implemented (a minimal sketch follows this list)
- Input validation on all endpoints
- SQL injection prevention (parameterized queries)
- CORS properly configured
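For the rate-limiting item above, a minimal sketch of a per-token limiter as FastAPI middleware. This naive in-memory version only holds for a single replica; a production deployment would typically back it with a shared store such as Redis. The window and limit values are illustrative, and app is the FastAPI instance from earlier sections:
import time
from fastapi import Request
from fastapi.responses import JSONResponse

WINDOW_SECONDS = 60   # sliding window length
MAX_REQUESTS = 100    # allowed requests per window per token
_request_log: dict[str, list[float]] = {}

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    token = request.headers.get("Authorization", "anonymous")
    now = time.time()
    # Keep only timestamps still inside the window
    recent = [t for t in _request_log.get(token, []) if now - t < WINDOW_SECONDS]
    if len(recent) >= MAX_REQUESTS:
        return JSONResponse(status_code=429, content={"error": "rate limit exceeded"})
    recent.append(now)
    _request_log[token] = recent
    return await call_next(request)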
Reliability
- Health checks configured for all containers (example endpoints follow this list)
- Liveness and readiness probes set
- Resource limits defined
- Horizontal pod autoscaling configured
- Database backups automated
- Disaster recovery plan documented
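For the health-check items above, a minimal sketch of liveness and readiness endpoints that Kubernetes probes can target; check_database_connection() is a hypothetical helper standing in for whatever dependency check fits your deployment:
from fastapi import Response

@app.get("/healthz")
async def liveness():
    # Liveness: the process is up and able to serve requests
    return {"status": "ok"}

@app.get("/readyz")
async def readiness():
    # Readiness: dependencies are reachable; check_database_connection()
    # is a hypothetical helper, not defined elsewhere in this tutorial
    if not check_database_connection():
        return Response(status_code=503)
    return {"status": "ready"}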
Performance
- GPU utilization monitored
- Connection pooling for database
- Caching strategy implemented
- Load testing completed
- Performance baselines established
Observability
- Prometheus metrics exported
- Grafana dashboards created
- Log aggregation configured
- Alerting rules defined
- Distributed tracing implemented
Operations
- CI/CD pipeline configured
- Rolling update strategy defined
- Rollback procedure documented
- Runbooks created for common issues
- On-call rotation established
CONCLUSION
This tutorial has demonstrated how to build a production-ready local LLM system using Capability-Centric Architecture. We have seen how CCA principles guide us to create a system that is:
- Maintainable: Each capability has clear boundaries and responsibilities
- Testable: Pure domain logic can be tested without infrastructure
- Flexible: Infrastructure can be swapped without changing business logic
- Scalable: Capabilities can be deployed and scaled independently
- Evolvable: New versions can coexist with old versions
The key insights from CCA that made this possible:
- Essence-Realization-Adaptation separation keeps domain logic pure
- Dependency inversion allows infrastructure to be pluggable
- Contracts define clear interfaces between capabilities
- Evolution Envelopes manage change over time
- Efficiency Gradients allow optimization where needed
This architecture scales from a single developer running locally to a production deployment serving thousands of users. The same code structure works whether you are using NVIDIA GPUs, AMD GPUs, or CPU-only inference. The same domain logic works whether you store data in PostgreSQL, MongoDB, or in-memory structures.
By following CCA principles, we have built a system that will serve us well as requirements evolve, technologies change, and scale increases.