Monday, November 24, 2025

BUILDING A LOCAL LLM SYSTEM WITH CAPABILITY-CENTRIC ARCHITECTURE





INTRODUCTION

This tutorial walks through building a local Large Language Model (LLM) system using Capability-Centric Architecture (CCA). The system provides an OpenAI-compatible API for serving LLM queries, along with user registration, authentication, conversation history, and multi-GPU support. Following CCA principles keeps concerns cleanly separated and the code testable and maintainable.

The Capability-Centric Architecture extends concepts from Domain-Driven Design, Hexagonal Architecture, and Clean Architecture. In CCA, we structure systems as collections of capabilities, where each capability represents a cohesive unit of functionality. Every capability consists of three fundamental layers: the Essence contains pure domain logic free from infrastructure concerns, the Realization provides infrastructure-specific implementations, and the Adaptation layer handles external interfaces. These layers communicate through well-defined Contracts, and the system manages evolution through Evolution Envelopes.

Our system will demonstrate how CCA principles apply to a real-world scenario involving containerization, distributed systems, GPU acceleration, authentication, and persistent storage. We will see how the architecture accommodates different GPU backends, including NVIDIA CUDA, AMD ROCm, Intel GPUs, and Apple Metal, without polluting our domain logic with hardware-specific concerns.

SYSTEM OVERVIEW

The system comprises several major capabilities working together to provide a complete LLM service. The LLM Inference Capability handles the core responsibility of processing natural language queries and generating responses. The User Management Capability manages user registration, authentication, and API token lifecycle. The Conversation History Capability maintains context across multiple interactions. The Authentication Capability validates API tokens and ensures secure access. The Email Confirmation Capability handles user verification through email workflows.

Each capability operates independently yet collaborates through well-defined contracts. The system deploys within Kubernetes, with the LLM inference running in containers that can leverage available GPU resources. A separate container handles user management, registration, and the web interface. An ingress controller routes external traffic to appropriate services.

The client applications interact with the system through two primary interfaces: an OpenAI-compatible REST API for LLM queries and a web interface for user registration and token management. The API requires authentication via tokens that users obtain through the registration process.

CAPABILITY-CENTRIC ARCHITECTURE FUNDAMENTALS

Before diving into implementation details, we must understand how CCA structures each capability. The Essence layer contains the pure domain logic, representing what the capability does without any concern for how it does it. This layer contains domain entities, value objects, business rules, and domain services. The Essence has no dependencies on frameworks, databases, or external systems.

The Realization layer implements the infrastructure-specific concerns. This is where we find database access, GPU integration, email sending, and other technical implementations. The Realization depends on the Essence, but the Essence never depends on the Realization. This dependency inversion keeps business logic pure and testable.

The Adaptation layer provides interfaces to the outside world. This includes REST API endpoints, message queue consumers, scheduled tasks, and other entry points. The Adaptation layer depends on both Essence and Realization, orchestrating their collaboration to fulfill external requests.

Contracts define the interfaces between capabilities and between layers within a capability. These contracts are expressed as abstract interfaces in the Essence layer, with concrete implementations in the Realization layer. This allows the Essence to define what it needs without knowing how those needs will be fulfilled.
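
As a minimal sketch of this dependency direction, the following uses a hypothetical Clock contract that is not part of the system built in this tutorial. The Essence declares the contract and a domain rule that uses it, and the Realization supplies the implementation:

```python
from abc import ABC, abstractmethod
from datetime import datetime, timezone


# Essence: the contract states what the domain needs, nothing more.
class Clock(ABC):
    @abstractmethod
    def now(self):
        """Return the current time as a timezone-aware datetime"""


# Essence: domain logic depends only on the contract.
def is_expired(expires_at, clock):
    return clock.now() > expires_at


# Realization: one concrete way to fulfil the contract.
class SystemClock(Clock):
    def now(self):
        return datetime.now(timezone.utc)


# A fixed clock makes the domain rule deterministic in tests.
class FixedClock(Clock):
    def __init__(self, instant):
        self._instant = instant

    def now(self):
        return self._instant
```

Because is_expired only sees the Clock contract, swapping SystemClock for FixedClock requires no change to the domain rule.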

Evolution Envelopes manage how capabilities change over time. They provide versioning strategies, backward compatibility mechanisms, and migration paths. In our system, we will see Evolution Envelopes in action when we version our API endpoints and manage database schema migrations.
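
One way a backward compatibility mechanism of this kind might look is a chain of payload upgrades applied before a request reaches the domain. The sketch below is an illustration only: the payload shapes (a v1 "prompt" field, a v2 "messages" list) and all function names are assumptions, not the tutorial's actual versioning scheme.

```python
# Hypothetical payload shapes: v1 carries a bare "prompt", v2 a "messages" list.
def upgrade_v1_to_v2(payload):
    """Translate a v1 payload into the v2 shape without mutating the input"""
    upgraded = {k: v for k, v in payload.items() if k != "prompt"}
    upgraded["messages"] = [{"role": "user", "content": payload["prompt"]}]
    return upgraded


# Each entry upgrades a payload from that version to the next one.
UPGRADES = {1: upgrade_v1_to_v2}
CURRENT_VERSION = 2


def to_current(payload, version):
    """Apply upgrade steps in order until the payload reaches CURRENT_VERSION"""
    while version < CURRENT_VERSION:
        payload = UPGRADES[version](payload)
        version += 1
    return payload
```

Old clients keep sending the v1 shape while the domain only ever handles the current one; adding a v3 means adding one more entry to UPGRADES.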

Efficiency Gradients allow us to optimize critical paths while maintaining clean architecture for non-critical paths. In our LLM system, the inference path represents a critical performance path where we might allow tighter coupling to GPU libraries, while the user registration path can afford more abstraction layers.

THE LLM INFERENCE CAPABILITY

The LLM Inference Capability represents the heart of our system. Its Essence defines what it means to process a language model query without concerning itself with specific model formats, GPU types, or API protocols.

Essence Layer: Domain Model

The Essence defines several key domain entities. The InferenceRequest represents a query to the language model, containing the prompt text, generation parameters like temperature and maximum tokens, and optional conversation context. The InferenceResponse contains the generated text, token usage statistics, and metadata about the generation process.

Here is how we define these core domain entities:

class InferenceRequest:
    def __init__(self, prompt, temperature=0.7, max_tokens=512, 
                 conversation_id=None, system_prompt=None):
        self.prompt = prompt
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.conversation_id = conversation_id
        self.system_prompt = system_prompt
        self.timestamp = None
        
    def validate(self):
        if not self.prompt or len(self.prompt.strip()) == 0:
            raise ValueError("Prompt cannot be empty")
        if self.temperature < 0 or self.temperature > 2:
            raise ValueError("Temperature must be between 0 and 2")
        if self.max_tokens < 1 or self.max_tokens > 4096:
            raise ValueError("Max tokens must be between 1 and 4096")
        return True


class InferenceResponse:
    def __init__(self, generated_text, prompt_tokens, 
                 completion_tokens, model_name):
        self.generated_text = generated_text
        self.prompt_tokens = prompt_tokens
        self.completion_tokens = completion_tokens
        self.total_tokens = prompt_tokens + completion_tokens
        self.model_name = model_name
        self.timestamp = None

The InferenceRequest class encapsulates all parameters needed for a generation request. The validate method enforces business rules about acceptable parameter ranges. Notice that this class has no knowledge of HTTP, JSON, databases, or GPU libraries. It represents pure domain concepts.
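
Because the validation rules live in a plain Python class, they can be exercised without a web server or a model. The snippet below repeats a condensed copy of InferenceRequest so that it runs on its own; validation_error is a hypothetical helper introduced only for this illustration.

```python
# Condensed copy of the InferenceRequest validation rules shown above,
# repeated so this snippet runs on its own.
class InferenceRequest:
    def __init__(self, prompt, temperature=0.7, max_tokens=512):
        self.prompt = prompt
        self.temperature = temperature
        self.max_tokens = max_tokens

    def validate(self):
        if not self.prompt or len(self.prompt.strip()) == 0:
            raise ValueError("Prompt cannot be empty")
        if self.temperature < 0 or self.temperature > 2:
            raise ValueError("Temperature must be between 0 and 2")
        if self.max_tokens < 1 or self.max_tokens > 4096:
            raise ValueError("Max tokens must be between 1 and 4096")
        return True


def validation_error(request):
    """Return the validation message for a request, or None if it is valid"""
    try:
        request.validate()
        return None
    except ValueError as exc:
        return str(exc)
```

For instance, validation_error(InferenceRequest("   ")) reports the empty-prompt rule, while a request with default parameters and a non-empty prompt yields None.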

The domain service that orchestrates inference is defined as an abstract interface in the Essence layer:

from abc import ABC, abstractmethod

class InferenceService(ABC):
    @abstractmethod
    def generate(self, request):
        """Generate a response for the given inference request"""
        pass
    
    @abstractmethod
    def get_model_info(self):
        """Return information about the loaded model"""
        pass
    
    @abstractmethod
    def health_check(self):
        """Check if the inference service is healthy and ready"""
        pass

This interface defines the contract that any concrete implementation must fulfill. The Essence layer can use this interface without knowing whether the implementation uses a local GPU, a remote API, or a mock for testing.
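
To illustrate the testing case, here is a sketch of a test double that satisfies the contract without loading any model. EchoInferenceService and answer are hypothetical names, the contract is restated in condensed form so the snippet is self-contained, and a plain dict stands in for InferenceRequest.

```python
from abc import ABC, abstractmethod


# Condensed restatement of the contract so the snippet is self-contained.
class InferenceService(ABC):
    @abstractmethod
    def generate(self, request): ...

    @abstractmethod
    def get_model_info(self): ...

    @abstractmethod
    def health_check(self): ...


# Hypothetical test double: echoes the prompt instead of running a model.
class EchoInferenceService(InferenceService):
    def generate(self, request):
        return f"echo: {request['prompt']}"

    def get_model_info(self):
        return {"model_path": "in-memory", "backend": "none"}

    def health_check(self):
        return {"status": "healthy", "backend": "none"}


def answer(service, prompt):
    """Caller code that depends only on the contract"""
    return service.generate({"prompt": prompt})
```

The caller, answer, works unchanged with any implementation of the contract, which is what makes fast unit tests possible.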

Realization Layer: GPU-Accelerated Inference

The Realization layer provides concrete implementations of the InferenceService contract. We need to support multiple GPU backends, which presents an architectural challenge. Rather than scattering hardware-specific conditionals through the codebase, we isolate backend detection in a dedicated component and keep all backend-dependent behavior inside the Realization, behind the same contract. If a backend later needs special handling, it can get its own Realization class without touching the Essence.

First, we need a GPU detection and selection mechanism:

import platform
import subprocess
import os

class GPUDetector:
    def __init__(self):
        self.available_backends = []
        self._detect_backends()
        
    def _detect_backends(self):
        """Detect all available GPU backends on this system"""
        if self._check_cuda():
            self.available_backends.append('cuda')
        if self._check_rocm():
            self.available_backends.append('rocm')
        if self._check_metal():
            self.available_backends.append('metal')
        if self._check_intel():
            self.available_backends.append('intel')
        if len(self.available_backends) == 0:
            self.available_backends.append('cpu')
    
    def _check_cuda(self):
        """Check if NVIDIA CUDA is available"""
        try:
            result = subprocess.run(['nvidia-smi'], 
                                    capture_output=True, 
                                    timeout=5)
            return result.returncode == 0
        except Exception:
            return False
    
    def _check_rocm(self):
        """Check if AMD ROCm is available"""
        try:
            result = subprocess.run(['rocm-smi'], 
                                    capture_output=True, 
                                    timeout=5)
            return result.returncode == 0
        except Exception:
            return False
    
    def _check_metal(self):
        """Check if Apple Metal is available"""
        return platform.system() == 'Darwin'
    
    def _check_intel(self):
        """Check if Intel GPU is available"""
        if platform.system() == 'Linux':
            return os.path.exists('/dev/dri')
        return False
    
    def get_preferred_backend(self):
        """Return the preferred backend based on availability"""
        preference_order = ['cuda', 'rocm', 'metal', 'intel', 'cpu']
        for backend in preference_order:
            if backend in self.available_backends:
                return backend
        return 'cpu'

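The GPUDetector class examines the system to determine which GPU backends are available. It checks for NVIDIA GPUs by attempting to run nvidia-smi, for AMD GPUs by attempting to run rocm-smi, for Apple Metal by detecting macOS, and for Intel GPUs by checking for the /dev/dri directory on Linux. The last two checks are coarse heuristics: /dev/dri exists for any GPU with a Linux DRM driver, not only Intel hardware, so a positive result should be treated as a hint rather than proof. If no backend is detected, the detector falls back to the CPU.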
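
The selection step can also be expressed as a pure function, which makes the preference order easy to test without any GPU tooling installed. This is a sketch: choose_backend and its override parameter are hypothetical additions, not part of GPUDetector.

```python
PREFERENCE_ORDER = ['cuda', 'rocm', 'metal', 'intel', 'cpu']


def choose_backend(available, override=None):
    """Pick a backend from the detected ones, honouring an optional override.

    `override` is a hypothetical knob (for example read from an environment
    variable by the caller); it is ignored unless that backend was detected.
    """
    if override and override in available:
        return override
    for backend in PREFERENCE_ORDER:
        if backend in available:
            return backend
    return 'cpu'
```

With this shape, a machine that reports both 'cuda' and 'intel' resolves to 'cuda' by default, and an operator can still pin 'intel' explicitly.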

Now we implement the actual inference service using llama-cpp-python:

import time
from datetime import datetime

class LocalInferenceService(InferenceService):
    def __init__(self, model_path, gpu_backend=None):
        self.model_path = model_path
        self.gpu_backend = gpu_backend or GPUDetector().get_preferred_backend()
        self.model = None
        self.model_info = {}
        self._load_model()
        
    def _load_model(self):
        """Load the language model with appropriate GPU backend"""
        try:
            from llama_cpp import Llama
            
            n_gpu_layers = 0
            if self.gpu_backend != 'cpu':
                n_gpu_layers = -1  # Offload all layers to GPU
            
            self.model = Llama(
                model_path=self.model_path,
                n_gpu_layers=n_gpu_layers,
                n_ctx=2048,
                n_batch=512,
                verbose=False
            )
            
            self.model_info = {
                'model_path': self.model_path,
                'backend': self.gpu_backend,
                'context_length': 2048,
                'loaded_at': datetime.utcnow().isoformat()
            }
            
        except Exception as e:
            raise RuntimeError(f"Failed to load model: {str(e)}")
    
    def generate(self, request):
        """Generate text based on the inference request"""
        if not self.model:
            raise RuntimeError("Model not loaded")
        
        request.validate()
        
        prompt = request.prompt
        if request.system_prompt:
            prompt = f"System: {request.system_prompt}\n\nUser: {prompt}"
        
        start_time = time.time()
        
        result = self.model(
            prompt,
            max_tokens=request.max_tokens,
            temperature=request.temperature,
            stop=["User:", "\n\n"],
            echo=False
        )
        
        generated_text = result['choices'][0]['text']
        prompt_tokens = result['usage']['prompt_tokens']
        completion_tokens = result['usage']['completion_tokens']
        
        response = InferenceResponse(
            generated_text=generated_text.strip(),
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            model_name=self.model_info.get('model_path', 'unknown')
        )
        
        response.timestamp = datetime.utcnow()
        
        return response
    
    def get_model_info(self):
        """Return information about the loaded model"""
        return self.model_info.copy()
    
    def health_check(self):
        """Check if the inference service is healthy and ready"""
        if not self.model:
            return {'status': 'unhealthy', 'reason': 'Model not loaded'}
        
        try:
            test_request = InferenceRequest(
                prompt="Test",
                max_tokens=5
            )
            self.generate(test_request)
            return {'status': 'healthy', 'backend': self.gpu_backend}
        except Exception as e:
            return {'status': 'unhealthy', 'reason': str(e)}

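The LocalInferenceService implements the InferenceService contract. It loads a GGUF format model using llama-cpp-python. Note that GPU support in llama-cpp-python is determined when the package is built (for example with CUDA, ROCm, Metal, or SYCL enabled); at runtime, the n_gpu_layers parameter only controls how many layers are offloaded to whichever backend was compiled in. The generate method takes an InferenceRequest from the Essence layer, validates it, performs the actual inference, and returns an InferenceResponse.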

Notice how the implementation details of GPU acceleration are hidden behind the InferenceService interface. The Essence layer never needs to know about llama-cpp-python, CUDA, or any other infrastructure concern.
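
Loading a model is slow, so the small decisions inside LocalInferenceService are worth checking in isolation. The sketch below extracts two of them as pure functions that mirror the logic shown above; the function names are hypothetical and the service itself does not define them.

```python
def gpu_layers_for(backend):
    """Return the n_gpu_layers value used above: 0 on CPU, -1 (all layers) otherwise"""
    return 0 if backend == 'cpu' else -1


def build_prompt(user_prompt, system_prompt=None):
    """Mirror the prompt assembly performed in LocalInferenceService.generate"""
    if system_prompt:
        return f"System: {system_prompt}\n\nUser: {user_prompt}"
    return user_prompt
```

Neither function imports llama-cpp-python, so both can run in a test suite on a machine with no GPU and no model file.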

Adaptation Layer: REST API

The Adaptation layer exposes the inference capability through a REST API. We use FastAPI to create an OpenAI-compatible endpoint:

from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel
from typing import Optional, List
import os  # needed for os.getenv in get_inference_service
import uuid
from datetime import datetime

app = FastAPI(title="Local LLM API", version="1.0.0")

# Pydantic models for API request/response
class ChatMessage(BaseModel):
    role: str
    content: str

class ChatCompletionRequest(BaseModel):
    model: str = "local-llm"
    messages: List[ChatMessage]
    temperature: Optional[float] = 0.7
    max_tokens: Optional[int] = 512
    stream: Optional[bool] = False

class ChatCompletionResponse(BaseModel):
    id: str
    object: str = "chat.completion"
    created: int
    model: str
    choices: List[dict]
    usage: dict

# Dependency injection for authentication
async def verify_api_token(authorization: str = Header(None)):
    """Verify the API token from Authorization header"""
    if not authorization:
        raise HTTPException(status_code=401, detail="Missing authorization header")
    
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid authorization format")
    
    token = authorization[7:]  # Remove "Bearer " prefix
    
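    # Placeholder: token verification is delegated to the Authentication
    # Capability. get_auth_service is assumed to be a factory exposed by
    # that module; adjust the import to match the final implementation.
    from authentication_capability import get_auth_service
    auth_service = get_auth_service()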
    
    user = auth_service.verify_token(token)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid or expired token")
    
    return user

# Global inference service instance
inference_service = None

def get_inference_service():
    """Get or create the inference service instance"""
    global inference_service
    if inference_service is None:
        model_path = os.getenv('MODEL_PATH', '/models/model.gguf')
        inference_service = LocalInferenceService(model_path)
    return inference_service

@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
async def chat_completion(
    request: ChatCompletionRequest,
    user=Depends(verify_api_token)
):
    """OpenAI-compatible chat completion endpoint"""
    
    service = get_inference_service()
    
    # Extract the last user message as the prompt
    user_messages = [msg for msg in request.messages if msg.role == "user"]
    if not user_messages:
        raise HTTPException(status_code=400, detail="No user message found")
    
    prompt = user_messages[-1].content
    
    # Extract system message if present
    system_messages = [msg for msg in request.messages if msg.role == "system"]
    system_prompt = system_messages[0].content if system_messages else None
    
    # Create domain inference request
    inference_request = InferenceRequest(
        prompt=prompt,
        temperature=request.temperature,
        max_tokens=request.max_tokens,
        system_prompt=system_prompt
    )
    
    try:
        # Execute inference
        inference_response = service.generate(inference_request)
        
        # Convert to OpenAI format
        response = ChatCompletionResponse(
            id=f"chatcmpl-{uuid.uuid4().hex[:8]}",
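            # .timestamp() treats a naive datetime as local time, so use now()
            # rather than utcnow() to get the correct Unix epoch
            created=int(datetime.now().timestamp()),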
            model=request.model,
            choices=[{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": inference_response.generated_text
                },
                "finish_reason": "stop"
            }],
            usage={
                "prompt_tokens": inference_response.prompt_tokens,
                "completion_tokens": inference_response.completion_tokens,
                "total_tokens": inference_response.total_tokens
            }
        )
        
        return response
        
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Inference failed: {str(e)}")

@app.get("/v1/models")
async def list_models(user=Depends(verify_api_token)):
    """List available models"""
    service = get_inference_service()
    model_info = service.get_model_info()
    
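    return {
        "object": "list",
        "data": [{
            "id": "local-llm",
            "object": "model",
            # loaded_at is a naive UTC timestamp, so derive the epoch explicitly;
            # calling .timestamp() on it would assume local time
            "created": int((datetime.fromisoformat(model_info['loaded_at'])
                            - datetime(1970, 1, 1)).total_seconds()),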
            "owned_by": "local",
            "permission": [],
            "root": "local-llm",
            "parent": None
        }]
    }

@app.get("/health")
async def health_check():
    """Health check endpoint for Kubernetes"""
    service = get_inference_service()
    health = service.health_check()
    
    if health['status'] == 'healthy':
        return health
    else:
        raise HTTPException(status_code=503, detail=health)

The Adaptation layer translates between the external OpenAI API format and our internal domain model. The chat_completion endpoint receives an OpenAI-compatible request, extracts the relevant information, creates an InferenceRequest domain object, calls the InferenceService, and translates the InferenceResponse back to OpenAI format.

This separation allows us to change our internal domain model without breaking the external API, or vice versa. The Adaptation layer acts as an anti-corruption layer protecting our domain from external concerns.
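
The translation itself can be sketched as two pure functions that work on plain dicts, independent of FastAPI. The names split_messages and to_openai_usage are hypothetical; the logic follows the endpoint shown above.

```python
def split_messages(messages):
    """Return (prompt, system_prompt) from OpenAI-style message dicts.

    Follows the endpoint above: the last user message becomes the prompt and
    the first system message, if any, becomes the system prompt.
    """
    user = [m["content"] for m in messages if m["role"] == "user"]
    if not user:
        raise ValueError("No user message found")
    system = [m["content"] for m in messages if m["role"] == "system"]
    return user[-1], (system[0] if system else None)


def to_openai_usage(prompt_tokens, completion_tokens):
    """Build the usage object of the response from domain token counts"""
    return {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": prompt_tokens + completion_tokens,
    }
```

Note that, as in the endpoint, earlier turns of the conversation are discarded: only the most recent user message reaches the model.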

THE USER MANAGEMENT CAPABILITY

The User Management Capability handles user registration, email verification, and API token generation. This capability demonstrates how CCA handles stateful operations and external service integration.

Essence Layer: User Domain

The User domain defines entities and business rules related to user accounts:

import hashlib
import secrets
from datetime import datetime, timedelta
from enum import Enum

class UserStatus(Enum):
    PENDING_VERIFICATION = "pending_verification"
    ACTIVE = "active"
    SUSPENDED = "suspended"
    DELETED = "deleted"

class User:
    def __init__(self, email, password_hash=None):
        self.id = None
        self.email = email
        self.password_hash = password_hash
        self.status = UserStatus.PENDING_VERIFICATION
        self.created_at = None
        self.verified_at = None
        self.api_tokens = []
        
    def validate_email(self):
        """Validate email format"""
        import re
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        if not re.match(pattern, self.email):
            raise ValueError("Invalid email format")
        return True
    
    def verify(self):
        """Mark user as verified"""
        if self.status != UserStatus.PENDING_VERIFICATION:
            raise ValueError("User is not in pending verification status")
        self.status = UserStatus.ACTIVE
        self.verified_at = datetime.utcnow()
    
    def can_generate_token(self):
        """Check if user can generate API tokens"""
        return self.status == UserStatus.ACTIVE
    
    def suspend(self):
        """Suspend user account"""
        self.status = UserStatus.SUSPENDED

class APIToken:
    def __init__(self, user_id, name=None):
        self.id = None
        self.user_id = user_id
        self.token = None
        self.name = name or "Default Token"
        self.created_at = None
        self.expires_at = None
        self.last_used_at = None
        self.is_active = True
        
    def generate_token(self):
        """Generate a secure random token"""
        self.token = secrets.token_urlsafe(32)
        self.created_at = datetime.utcnow()
        # Tokens expire after 1 year
        self.expires_at = self.created_at + timedelta(days=365)
        return self.token
    
    def is_valid(self):
        """Check if token is valid"""
        if not self.is_active:
            return False
        if self.expires_at and datetime.utcnow() > self.expires_at:
            return False
        return True
    
    def revoke(self):
        """Revoke this token"""
        self.is_active = False

class EmailVerification:
    def __init__(self, user_id, email):
        self.id = None
        self.user_id = user_id
        self.email = email
        self.verification_code = None
        self.created_at = None
        self.expires_at = None
        self.verified_at = None
        
    def generate_code(self):
        """Generate a verification code"""
        self.verification_code = secrets.token_urlsafe(32)
        self.created_at = datetime.utcnow()
        # Verification codes expire after 24 hours
        self.expires_at = self.created_at + timedelta(hours=24)
        return self.verification_code
    
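    def is_valid(self):
        """Check if verification code is still valid"""
        if self.verified_at:
            return False
        # A code that was never generated has no expiry and is not valid
        if self.expires_at is None or datetime.utcnow() > self.expires_at:
            return False
        return True
    
    def verify(self, code):
        """Verify the code"""
        if not self.is_valid():
            raise ValueError("Verification code has expired or was already used")
        # Constant-time comparison avoids leaking the code through timing
        if not secrets.compare_digest(self.verification_code.encode('utf-8'),
                                      str(code).encode('utf-8')):
            raise ValueError("Invalid verification code")
        self.verified_at = datetime.utcnow()
        return True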

These domain entities capture the business rules for user management. The User class knows that it must be verified before generating tokens. The APIToken class knows how to validate itself based on expiration and active status. The EmailVerification class manages the lifecycle of verification codes.
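
The token rules can be illustrated in isolation. The sketch below restates the APIToken validity rule as a function that receives the current time as an argument, which is an assumption made here to keep the example deterministic; the entity above reads the clock itself.

```python
import secrets
from datetime import datetime, timedelta


def new_token():
    """Generate a URL-safe token from 32 random bytes, as APIToken does"""
    return secrets.token_urlsafe(32)


def token_is_valid(is_active, expires_at, now):
    """Same rule as APIToken.is_valid, with the clock passed in for testing"""
    if not is_active:
        return False
    if expires_at and now > expires_at:
        return False
    return True
```

A token issued on 1 January 2025 with a 365-day lifetime is therefore accepted on day 364, rejected on day 366, and rejected at any time once it has been revoked.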

Now we define the service interfaces:

from abc import ABC, abstractmethod

class UserRepository(ABC):
    @abstractmethod
    def save(self, user):
        """Save a user to persistent storage"""
        pass
    
    @abstractmethod
    def find_by_email(self, email):
        """Find a user by email address"""
        pass
    
    @abstractmethod
    def find_by_id(self, user_id):
        """Find a user by ID"""
        pass

class TokenRepository(ABC):
    @abstractmethod
    def save(self, token):
        """Save an API token"""
        pass
    
    @abstractmethod
    def find_by_token(self, token_string):
        """Find a token by its string value"""
        pass
    
    @abstractmethod
    def find_by_user(self, user_id):
        """Find all tokens for a user"""
        pass
    
    @abstractmethod
    def delete(self, token_id):
        """Delete a token"""
        pass

class EmailService(ABC):
    @abstractmethod
    def send_verification_email(self, email, verification_code):
        """Send a verification email"""
        pass

class PasswordHasher(ABC):
    @abstractmethod
    def hash_password(self, password):
        """Hash a password"""
        pass
    
    @abstractmethod
    def verify_password(self, password, password_hash):
        """Verify a password against its hash"""
        pass

class UserManagementService:
    def __init__(self, user_repo, token_repo, email_service, password_hasher):
        self.user_repo = user_repo
        self.token_repo = token_repo
        self.email_service = email_service
        self.password_hasher = password_hasher
        self.verification_codes = {}  # In production, use a repository
        
    def register_user(self, email, password):
        """Register a new user"""
        # Check if user already exists
        existing_user = self.user_repo.find_by_email(email)
        if existing_user:
            raise ValueError("User with this email already exists")
        
        # Create new user
        user = User(email)
        user.validate_email()
        user.password_hash = self.password_hasher.hash_password(password)
        
        # Save user
        saved_user = self.user_repo.save(user)
        
        # Create verification
        verification = EmailVerification(saved_user.id, email)
        code = verification.generate_code()
        self.verification_codes[saved_user.id] = verification
        
        # Send verification email
        self.email_service.send_verification_email(email, code)
        
        return saved_user
    
    def verify_email(self, user_id, code):
        """Verify a user's email"""
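        verification = self.verification_codes.get(user_id)
        if not verification:
            raise ValueError("No verification found for this user")
        
        # Look the user up before consuming the code so that a missing user
        # does not leave the verification marked as used
        user = self.user_repo.find_by_id(user_id)
        if not user:
            raise ValueError("User not found")
        
        verification.verify(code)
        user.verify()
        self.user_repo.save(user)
        
        return user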
    
    def create_api_token(self, user_id, token_name=None):
        """Create an API token for a user"""
        user = self.user_repo.find_by_id(user_id)
        if not user:
            raise ValueError("User not found")
        
        if not user.can_generate_token():
            raise ValueError("User cannot generate tokens")
        
        token = APIToken(user_id, token_name)
        token.generate_token()
        
        saved_token = self.token_repo.save(token)
        return saved_token
    
    def list_user_tokens(self, user_id):
        """List all tokens for a user"""
        return self.token_repo.find_by_user(user_id)
    
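    def revoke_token(self, token_id, user_id):
        """Revoke a token"""
        # NOTE: TokenRepository only offers lookup by token string, so
        # token_id must be the token's string value, not its database ID
        token = self.token_repo.find_by_token(token_id)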
        if not token or token.user_id != user_id:
            raise ValueError("Token not found or does not belong to user")
        
        token.revoke()
        self.token_repo.save(token)

The UserManagementService orchestrates the registration workflow. It depends on abstract interfaces for repositories, email sending, and password hashing. This allows the Essence layer to define the workflow without knowing about PostgreSQL, SMTP, or bcrypt.
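
Since the workflow only relies on these contracts, in-memory stand-ins are enough to exercise it in tests. The classes below are hypothetical test doubles that expose the same method names as UserRepository and EmailService; StubUser is a minimal stand-in for the User entity so that the snippet runs on its own.

```python
# Hypothetical in-memory stand-ins for the repository and email contracts.
class InMemoryUserRepository:
    def __init__(self):
        self._users = {}
        self._next_id = 1

    def save(self, user):
        if user.id is None:  # assign an ID on first save
            user.id = self._next_id
            self._next_id += 1
        self._users[user.id] = user
        return user

    def find_by_email(self, email):
        return next((u for u in self._users.values() if u.email == email), None)

    def find_by_id(self, user_id):
        return self._users.get(user_id)


class RecordingEmailService:
    """Records outgoing verification emails instead of sending them"""

    def __init__(self):
        self.sent = []

    def send_verification_email(self, email, verification_code):
        self.sent.append((email, verification_code))


class StubUser:
    """Minimal stand-in for the User entity (only the fields used here)"""

    def __init__(self, email):
        self.id = None
        self.email = email
```

In the tutorial's code these doubles would subclass UserRepository and EmailService and be passed to UserManagementService, together with similar fakes for TokenRepository and PasswordHasher.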

Realization Layer: Concrete Implementations

Now we implement the concrete infrastructure components:

import bcrypt
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import psycopg2
from psycopg2.extras import RealDictCursor
from datetime import datetime

class BcryptPasswordHasher(PasswordHasher):
    def hash_password(self, password):
        """Hash a password using bcrypt"""
        salt = bcrypt.gensalt()
        return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
    
    def verify_password(self, password, password_hash):
        """Verify a password against its hash"""
        return bcrypt.checkpw(
            password.encode('utf-8'),
            password_hash.encode('utf-8')
        )

class SMTPEmailService(EmailService):
    def __init__(self, smtp_host, smtp_port, smtp_user, smtp_password, from_email):
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.smtp_user = smtp_user
        self.smtp_password = smtp_password
        self.from_email = from_email
        
    def send_verification_email(self, email, verification_code):
        """Send verification email via SMTP"""
        verification_url = f"https://your-domain.com/verify?code={verification_code}"
        
        msg = MIMEMultipart('alternative')
        msg['Subject'] = "Verify your email address"
        msg['From'] = self.from_email
        msg['To'] = email
        
        text = f"""
        Welcome to Local LLM API!
        
        Please verify your email address by clicking the link below:
        {verification_url}
        
        This link will expire in 24 hours.
        """
        
        html = f"""
        <html>
        <body>
            <h2>Welcome to Local LLM API!</h2>
            <p>Please verify your email address by clicking the button below:</p>
            <a href="{verification_url}" style="background-color: #4CAF50; color: white; padding: 14px 20px; text-decoration: none; display: inline-block;">
                Verify Email
            </a>
            <p>This link will expire in 24 hours.</p>
        </body>
        </html>
        """
        
        part1 = MIMEText(text, 'plain')
        part2 = MIMEText(html, 'html')
        
        msg.attach(part1)
        msg.attach(part2)
        
        with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
            server.starttls()
            server.login(self.smtp_user, self.smtp_password)
            server.send_message(msg)

class PostgreSQLUserRepository(UserRepository):
    def __init__(self, connection_string):
        self.connection_string = connection_string
        
    def _get_connection(self):
        return psycopg2.connect(self.connection_string)
    
    def save(self, user):
        """Save user to PostgreSQL"""
        conn = self._get_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                if user.id is None:
                    # Insert new user
                    cur.execute("""
                        INSERT INTO users (email, password_hash, status, created_at)
                        VALUES (%s, %s, %s, %s)
                        RETURNING id, created_at
                    """, (user.email, user.password_hash, user.status.value, datetime.utcnow()))
                    result = cur.fetchone()
                    user.id = result['id']
                    user.created_at = result['created_at']
                else:
                    # Update existing user
                    cur.execute("""
                        UPDATE users
                        SET email = %s, password_hash = %s, status = %s, verified_at = %s
                        WHERE id = %s
                    """, (user.email, user.password_hash, user.status.value, user.verified_at, user.id))
                conn.commit()
                return user
        finally:
            conn.close()
    
    def find_by_email(self, email):
        """Find user by email"""
        conn = self._get_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute("""
                    SELECT id, email, password_hash, status, created_at, verified_at
                    FROM users
                    WHERE email = %s
                """, (email,))
                row = cur.fetchone()
                if not row:
                    return None
                
                user = User(row['email'], row['password_hash'])
                user.id = row['id']
                user.status = UserStatus(row['status'])
                user.created_at = row['created_at']
                user.verified_at = row['verified_at']
                return user
        finally:
            conn.close()
    
    def find_by_id(self, user_id):
        """Find user by ID"""
        conn = self._get_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute("""
                    SELECT id, email, password_hash, status, created_at, verified_at
                    FROM users
                    WHERE id = %s
                """, (user_id,))
                row = cur.fetchone()
                if not row:
                    return None
                
                user = User(row['email'], row['password_hash'])
                user.id = row['id']
                user.status = UserStatus(row['status'])
                user.created_at = row['created_at']
                user.verified_at = row['verified_at']
                return user
        finally:
            conn.close()

class PostgreSQLTokenRepository(TokenRepository):
    def __init__(self, connection_string):
        self.connection_string = connection_string
        
    def _get_connection(self):
        return psycopg2.connect(self.connection_string)
    
    def save(self, token):
        """Save API token to PostgreSQL"""
        conn = self._get_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                if token.id is None:
                    cur.execute("""
                        INSERT INTO api_tokens (user_id, token, name, created_at, expires_at, is_active)
                        VALUES (%s, %s, %s, %s, %s, %s)
                        RETURNING id
                    """, (token.user_id, token.token, token.name, token.created_at, 
                          token.expires_at, token.is_active))
                    result = cur.fetchone()
                    token.id = result['id']
                else:
                    cur.execute("""
                        UPDATE api_tokens
                        SET is_active = %s, last_used_at = %s
                        WHERE id = %s
                    """, (token.is_active, token.last_used_at, token.id))
                conn.commit()
                return token
        finally:
            conn.close()
    
    def find_by_token(self, token_string):
        """Find token by its string value"""
        conn = self._get_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute("""
                    SELECT id, user_id, token, name, created_at, expires_at, last_used_at, is_active
                    FROM api_tokens
                    WHERE token = %s
                """, (token_string,))
                row = cur.fetchone()
                if not row:
                    return None
                
                token = APIToken(row['user_id'], row['name'])
                token.id = row['id']
                token.token = row['token']
                token.created_at = row['created_at']
                token.expires_at = row['expires_at']
                token.last_used_at = row['last_used_at']
                token.is_active = row['is_active']
                return token
        finally:
            conn.close()
    
    def find_by_user(self, user_id):
        """Find all tokens for a user"""
        conn = self._get_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute("""
                    SELECT id, user_id, token, name, created_at, expires_at, last_used_at, is_active
                    FROM api_tokens
                    WHERE user_id = %s
                    ORDER BY created_at DESC
                """, (user_id,))
                rows = cur.fetchall()
                
                tokens = []
                for row in rows:
                    token = APIToken(row['user_id'], row['name'])
                    token.id = row['id']
                    token.token = row['token']
                    token.created_at = row['created_at']
                    token.expires_at = row['expires_at']
                    token.last_used_at = row['last_used_at']
                    token.is_active = row['is_active']
                    tokens.append(token)
                
                return tokens
        finally:
            conn.close()
    
    def delete(self, token_id):
        """Delete a token"""
        conn = self._get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute("DELETE FROM api_tokens WHERE id = %s", (token_id,))
                conn.commit()
        finally:
            conn.close()

These Realization implementations handle the actual infrastructure work. They depend on the Essence interfaces but the Essence never depends on them. This allows us to swap PostgreSQL for MongoDB, or SMTP for SendGrid, without touching our domain logic.
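
To make the swap concrete, here is a minimal sketch of an alternative Realization that keeps users in memory, which can be handy in tests. It assumes the Essence-level repository contract consists of the three methods used above (save, find_by_email, find_by_id). The names StubUser, UserRepositoryContract, and InMemoryUserRepository are hypothetical stand-ins so that the block runs on its own; in the real project the class would extend the tutorial's UserRepository and store User objects.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class StubUser:
    """Hypothetical stand-in for the tutorial's User entity."""
    email: str
    password_hash: str
    id: Optional[int] = None


class UserRepositoryContract(ABC):
    """Assumed shape of the Essence-level UserRepository interface."""

    @abstractmethod
    def save(self, user): ...

    @abstractmethod
    def find_by_email(self, email): ...

    @abstractmethod
    def find_by_id(self, user_id): ...


class InMemoryUserRepository(UserRepositoryContract):
    """Keeps users in a dict instead of PostgreSQL."""

    def __init__(self):
        self._users: Dict[int, StubUser] = {}
        self._next_id = 1

    def save(self, user):
        # Assign an ID on first save, mirroring the INSERT ... RETURNING id path
        if user.id is None:
            user.id = self._next_id
            self._next_id += 1
        self._users[user.id] = user
        return user

    def find_by_email(self, email):
        return next((u for u in self._users.values() if u.email == email), None)

    def find_by_id(self, user_id):
        return self._users.get(user_id)


repo = InMemoryUserRepository()
saved = repo.save(StubUser("alice@example.com", "hashed"))
```

Because the service only talks to the contract, passing an object like this in place of PostgreSQLUserRepository should require no change to the domain logic.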

Adaptation Layer: Web Interface

The Adaptation layer provides a web interface for user registration and token management:

from fastapi import FastAPI, Form, Request, HTTPException, Depends
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
import os

web_app = FastAPI()
templates = Jinja2Templates(directory="templates")

# Dependency injection
def get_user_management_service():
    """Get or create user management service"""
    db_connection = os.getenv('DATABASE_URL')
    smtp_host = os.getenv('SMTP_HOST')
    smtp_port = int(os.getenv('SMTP_PORT', 587))
    smtp_user = os.getenv('SMTP_USER')
    smtp_password = os.getenv('SMTP_PASSWORD')
    from_email = os.getenv('FROM_EMAIL')
    
    user_repo = PostgreSQLUserRepository(db_connection)
    token_repo = PostgreSQLTokenRepository(db_connection)
    email_service = SMTPEmailService(smtp_host, smtp_port, smtp_user, smtp_password, from_email)
    password_hasher = BcryptPasswordHasher()
    
    return UserManagementService(user_repo, token_repo, email_service, password_hasher)

@web_app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Home page"""
    return templates.TemplateResponse("home.html", {"request": request})

@web_app.get("/register", response_class=HTMLResponse)
async def register_page(request: Request):
    """Registration page"""
    return templates.TemplateResponse("register.html", {"request": request})

@web_app.post("/register")
async def register(
    email: str = Form(...),
    password: str = Form(...),
    confirm_password: str = Form(...),
    service: UserManagementService = Depends(get_user_management_service)
):
    """Handle registration form submission"""
    if password != confirm_password:
        raise HTTPException(status_code=400, detail="Passwords do not match")
    
    if len(password) < 8:
        raise HTTPException(status_code=400, detail="Password must be at least 8 characters")
    
    try:
        user = service.register_user(email, password)
        return RedirectResponse(url="/register/success", status_code=303)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

@web_app.get("/register/success", response_class=HTMLResponse)
async def register_success(request: Request):
    """Registration success page"""
    return templates.TemplateResponse("register_success.html", {"request": request})

@web_app.get("/verify")
async def verify_email(
    user_id: int,
    code: str,
    service: UserManagementService = Depends(get_user_management_service)
):
    """Verify email with the user ID and code from the URL"""
    try:
        # Simplified: the user ID travels as a query parameter next to the code.
        # In a real implementation, you'd derive user_id from the code itself.
        service.verify_email(user_id, code)
        return RedirectResponse(url="/verify/success", status_code=303)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

@web_app.get("/dashboard", response_class=HTMLResponse)
async def dashboard(request: Request):
    """User dashboard showing API tokens"""
    # In production, this would require authentication
    return templates.TemplateResponse("dashboard.html", {"request": request})

@web_app.post("/tokens/create")
async def create_token(
    name: str = Form(...),
    user_id: int = Form(...),  # In production, get from session
    service: UserManagementService = Depends(get_user_management_service)
):
    """Create a new API token"""
    try:
        token = service.create_api_token(user_id, name)
        return {"token": token.token, "name": token.name}
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

@web_app.get("/tokens")
async def list_tokens(
    user_id: int,  # In production, get from session
    service: UserManagementService = Depends(get_user_management_service)
):
    """List all tokens for the current user"""
    tokens = service.list_user_tokens(user_id)
    return {
        "tokens": [{
            "id": t.id,
            "name": t.name,
            "created_at": t.created_at.isoformat(),
            "last_used_at": t.last_used_at.isoformat() if t.last_used_at else None,
            "is_active": t.is_active
        } for t in tokens]
    }

@web_app.post("/tokens/{token_id}/revoke")
async def revoke_token(
    token_id: int,
    user_id: int = Form(...),  # In production, get from session
    service: UserManagementService = Depends(get_user_management_service)
):
    """Revoke an API token"""
    try:
        service.revoke_token(token_id, user_id)
        return {"status": "success"}
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

The web interface Adaptation layer translates between HTML forms and our domain model. It handles HTTP concerns like form parsing, redirects, and template rendering, while delegating business logic to the UserManagementService.
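
The verification route needs to know which user a code belongs to. One possible approach, sketched here under stated assumptions, is to embed the user ID in the code and sign it with HMAC so the Adaptation layer can recover the ID without a lookup. This is not the tutorial's stored-code design; the function names and SIGNING_KEY are hypothetical, and a real deployment would load the key from configuration rather than generate it at import time.

```python
import hashlib
import hmac
import secrets

# Assumption: a server-side secret; generated here only so the sketch runs.
SIGNING_KEY = secrets.token_bytes(32)


def make_verification_code(user_id: int, key: bytes = SIGNING_KEY) -> str:
    """Return '<user_id>.<hex signature>' so the link identifies its user."""
    signature = hmac.new(key, str(user_id).encode(), hashlib.sha256).hexdigest()
    return f"{user_id}.{signature}"


def parse_verification_code(code: str, key: bytes = SIGNING_KEY) -> int:
    """Return the user ID if the signature checks out, else raise ValueError."""
    user_id_text, _, signature = code.partition(".")
    if not user_id_text.isdigit() or not signature:
        raise ValueError("Malformed verification code")
    expected = hmac.new(key, user_id_text.encode(), hashlib.sha256).hexdigest()
    # Constant-time comparison avoids leaking how many characters matched
    if not hmac.compare_digest(signature.encode(), expected.encode()):
        raise ValueError("Invalid verification code")
    return int(user_id_text)
```

A signed code like this proves only that the server issued it; expiry and one-time use would still have to be enforced by the domain service.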

THE AUTHENTICATION CAPABILITY

The Authentication Capability validates API tokens for incoming requests. This is a small but critical capability that other capabilities depend on.

Essence Layer

from abc import ABC, abstractmethod
from datetime import datetime

class AuthenticationResult:
    def __init__(self, is_authenticated, user=None, reason=None):
        self.is_authenticated = is_authenticated
        self.user = user
        self.reason = reason

class AuthenticationService(ABC):
    @abstractmethod
    def verify_token(self, token_string):
        """Verify an API token and return the associated user"""
        pass
    
    @abstractmethod
    def authenticate_request(self, token_string):
        """Authenticate a request and return authentication result"""
        pass

Realization Layer

class TokenAuthenticationService(AuthenticationService):
    def __init__(self, token_repo, user_repo):
        self.token_repo = token_repo
        self.user_repo = user_repo
        
    def verify_token(self, token_string):
        """Verify token and return user if valid"""
        token = self.token_repo.find_by_token(token_string)
        
        if not token:
            return None
        
        if not token.is_valid():
            return None
        
        # Update last used timestamp
        token.last_used_at = datetime.utcnow()
        self.token_repo.save(token)
        
        # Get user
        user = self.user_repo.find_by_id(token.user_id)
        return user
    
    def authenticate_request(self, token_string):
        """Authenticate a request"""
        if not token_string:
            return AuthenticationResult(
                is_authenticated=False,
                reason="No token provided"
            )
        
        user = self.verify_token(token_string)
        
        if not user:
            return AuthenticationResult(
                is_authenticated=False,
                reason="Invalid or expired token"
            )
        
        if user.status != UserStatus.ACTIVE:
            return AuthenticationResult(
                is_authenticated=False,
                reason=f"User account is {user.status.value}"
            )
        
        return AuthenticationResult(
            is_authenticated=True,
            user=user
        )

The Authentication Capability is deliberately simple. It has one job: verify tokens. By keeping it focused, we make it easy to test, maintain, and reason about.
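
An Adaptation layer that calls authenticate_request first has to pull the token out of the HTTP Authorization header. The helper below is a minimal sketch of that step, assuming clients send the "Bearer <token>" form used in the client examples of this tutorial; extract_bearer_token is a hypothetical name, not part of the code above.

```python
from typing import Optional


def extract_bearer_token(authorization_header: Optional[str]) -> Optional[str]:
    """Return the token from 'Bearer <token>', or None if the header is unusable."""
    if not authorization_header:
        return None
    scheme, _, token = authorization_header.partition(" ")
    # The scheme name is case-insensitive in HTTP
    if scheme.lower() != "bearer":
        return None
    token = token.strip()
    return token or None
```

Since authenticate_request already answers "No token provided" for an empty value, the result can be passed to it directly, for example auth_service.authenticate_request(extract_bearer_token(header)).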

DATABASE SCHEMA

The system requires a PostgreSQL database with the following schema:

-- Users table
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    status VARCHAR(50) NOT NULL DEFAULT 'pending_verification',
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    verified_at TIMESTAMP,
    CONSTRAINT email_format CHECK (email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$')
);

-- API tokens table
CREATE TABLE api_tokens (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    token VARCHAR(255) UNIQUE NOT NULL,
    name VARCHAR(255) NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP NOT NULL,
    last_used_at TIMESTAMP,
    is_active BOOLEAN NOT NULL DEFAULT TRUE
);

-- The UNIQUE constraint on token already creates an index for token lookups
CREATE INDEX idx_api_tokens_user_id ON api_tokens (user_id);

-- Conversation history table
CREATE TABLE conversations (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    title VARCHAR(255),
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_conversations_user_id ON conversations (user_id);

-- Messages table
CREATE TABLE messages (
    id SERIAL PRIMARY KEY,
    conversation_id INTEGER NOT NULL REFERENCES conversations(id) ON DELETE CASCADE,
    role VARCHAR(50) NOT NULL,
    content TEXT NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    prompt_tokens INTEGER,
    completion_tokens INTEGER
);

CREATE INDEX idx_messages_conversation_id ON messages (conversation_id);

-- Email verifications table
CREATE TABLE email_verifications (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    verification_code VARCHAR(255) UNIQUE NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP NOT NULL,
    verified_at TIMESTAMP
);

-- The UNIQUE constraint on verification_code already creates an index for code lookups

CONTAINERIZATION

The system deploys as Docker containers. We need two main containers: one for the LLM inference service and one for the user management web interface.

LLM Inference Dockerfile

FROM nvidia/cuda:12.1.0-devel-ubuntu22.04

# Install Python and dependencies
RUN apt-get update && apt-get install -y \
    python3.10 \
    python3-pip \
    git \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy requirements
COPY requirements-inference.txt .

# Install Python packages
RUN pip3 install --no-cache-dir -r requirements-inference.txt

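# Rebuild llama-cpp-python with CUDA support. Pin the version from
# requirements-inference.txt: newer releases use a different CMake flag than LLAMA_CUBLAS.
RUN CMAKE_ARGS="-DLLAMA_CUBLAS=on" pip3 install llama-cpp-python==0.2.20 --force-reinstall --no-cache-dir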

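# Copy application code (main_inference.py also imports the authentication
# and user management capabilities)
COPY inference_capability/ ./inference_capability/
COPY authentication_capability/ ./authentication_capability/
COPY user_management_capability/ ./user_management_capability/
COPY main_inference.py .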

# Create directory for models
RUN mkdir -p /models

# Expose port
EXPOSE 8000

# Health check (urllib ships with Python and raises on HTTP error statuses)
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
    CMD python3 -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=5)"

# Run the application
CMD ["python3", "main_inference.py"]

Web Interface Dockerfile

FROM python:3.10-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    postgresql-client \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy requirements
COPY requirements-web.txt .

# Install Python packages
RUN pip install --no-cache-dir -r requirements-web.txt

# Copy application code
COPY user_management_capability/ ./user_management_capability/
COPY authentication_capability/ ./authentication_capability/
COPY templates/ ./templates/
COPY static/ ./static/
COPY main_web.py .

# Expose port
EXPOSE 8080

# Health check (urllib ships with Python and raises on HTTP error statuses)
HEALTHCHECK --interval=30s --timeout=10s --start-period=10s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8080/health', timeout=5)"

# Run the application
CMD ["python", "main_web.py"]
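
Both HEALTHCHECK instructions assume that each service answers on a /health route. As an alternative to an inline one-liner, the probe could live in a small script that uses only the standard library, so it does not rely on packages missing from the requirements files. The following is a sketch under that assumption; the file name healthcheck.py and the function is_healthy are hypothetical.

```python
import urllib.error
import urllib.request


def is_healthy(url: str, timeout: float = 5.0) -> bool:
    """Return True only if the endpoint answers with a 2xx status."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return 200 <= response.status < 300
    except (urllib.error.URLError, OSError):
        # Covers refused connections, timeouts, and HTTP error statuses
        return False
```

A healthcheck.py built around this function would exit with status 0 when is_healthy returns True and with 1 otherwise, which is the convention Docker expects from a HEALTHCHECK command.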

Requirements Files

requirements-inference.txt:

fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
llama-cpp-python==0.2.20
psycopg2-binary==2.9.9
python-multipart==0.0.6

requirements-web.txt:

fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
jinja2==3.1.2
python-multipart==0.0.6
bcrypt==4.1.1
psycopg2-binary==2.9.9

KUBERNETES DEPLOYMENT

The system deploys to Kubernetes with separate deployments for each service.

Namespace

apiVersion: v1
kind: Namespace
metadata:
  name: local-llm

ConfigMap for Configuration

apiVersion: v1
kind: ConfigMap
metadata:
  name: llm-config
  namespace: local-llm
data:
  MODEL_PATH: "/models/mistral-7b-instruct-v0.2.Q4_K_M.gguf"
  DATABASE_HOST: "postgresql"
  DATABASE_PORT: "5432"
  DATABASE_NAME: "llm_api"
  SMTP_HOST: "smtp.gmail.com"
  SMTP_PORT: "587"
  FROM_EMAIL: "noreply@your-domain.com"

Secrets

apiVersion: v1
kind: Secret
metadata:
  name: llm-secrets
  namespace: local-llm
type: Opaque
stringData:
  DATABASE_USER: "llm_user"
  DATABASE_PASSWORD: "your-secure-password"
  SMTP_USER: "your-smtp-user"
  SMTP_PASSWORD: "your-smtp-password"

PostgreSQL Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgresql
  namespace: local-llm
spec:
  replicas: 1
  selector:
    matchLabels:
      app: postgresql
  template:
    metadata:
      labels:
        app: postgresql
    spec:
      containers:
      - name: postgresql
        image: postgres:15
        ports:
        - containerPort: 5432
        env:
        - name: POSTGRES_DB
          valueFrom:
            configMapKeyRef:
              name: llm-config
              key: DATABASE_NAME
        - name: POSTGRES_USER
          valueFrom:
            secretKeyRef:
              name: llm-secrets
              key: DATABASE_USER
        - name: POSTGRES_PASSWORD
          valueFrom:
            secretKeyRef:
              name: llm-secrets
              key: DATABASE_PASSWORD
        volumeMounts:
        - name: postgres-storage
          mountPath: /var/lib/postgresql/data
      volumes:
      - name: postgres-storage
        persistentVolumeClaim:
          claimName: postgres-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: postgresql
  namespace: local-llm
spec:
  selector:
    app: postgresql
  ports:
  - port: 5432
    targetPort: 5432
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: postgres-pvc
  namespace: local-llm
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi

LLM Inference Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-inference
  namespace: local-llm
spec:
  replicas: 1
  selector:
    matchLabels:
      app: llm-inference
  template:
    metadata:
      labels:
        app: llm-inference
    spec:
      containers:
      - name: inference
        image: your-registry/llm-inference:latest
        ports:
        - containerPort: 8000
        env:
        - name: MODEL_PATH
          valueFrom:
            configMapKeyRef:
              name: llm-config
              key: MODEL_PATH
        - name: DATABASE_URL
          value: "postgresql://$(DATABASE_USER):$(DATABASE_PASSWORD)@$(DATABASE_HOST):$(DATABASE_PORT)/$(DATABASE_NAME)"
        envFrom:
        - configMapRef:
            name: llm-config
        - secretRef:
            name: llm-secrets
        resources:
          limits:
            nvidia.com/gpu: 1
            memory: "16Gi"
            cpu: "4"
          requests:
            nvidia.com/gpu: 1
            memory: "8Gi"
            cpu: "2"
        volumeMounts:
        - name: model-storage
          mountPath: /models
          readOnly: true
      volumes:
      - name: model-storage
        persistentVolumeClaim:
          claimName: model-pvc
      nodeSelector:
        gpu: "true"
---
apiVersion: v1
kind: Service
metadata:
  name: llm-inference
  namespace: local-llm
spec:
  selector:
    app: llm-inference
  ports:
  - port: 8000
    targetPort: 8000
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: model-pvc
  namespace: local-llm
spec:
  accessModes:
  - ReadOnlyMany
  resources:
    requests:
      storage: 20Gi

Web Interface Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-web
  namespace: local-llm
spec:
  replicas: 2
  selector:
    matchLabels:
      app: llm-web
  template:
    metadata:
      labels:
        app: llm-web
    spec:
      containers:
      - name: web
        image: your-registry/llm-web:latest
        ports:
        - containerPort: 8080
        env:
        - name: DATABASE_URL
          value: "postgresql://$(DATABASE_USER):$(DATABASE_PASSWORD)@$(DATABASE_HOST):$(DATABASE_PORT)/$(DATABASE_NAME)"
        - name: INFERENCE_SERVICE_URL
          value: "http://llm-inference:8000"
        envFrom:
        - configMapRef:
            name: llm-config
        - secretRef:
            name: llm-secrets
        resources:
          limits:
            memory: "512Mi"
            cpu: "500m"
          requests:
            memory: "256Mi"
            cpu: "250m"
---
apiVersion: v1
kind: Service
metadata:
  name: llm-web
  namespace: local-llm
spec:
  selector:
    app: llm-web
  ports:
  - port: 8080
    targetPort: 8080

Ingress Configuration

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: llm-ingress
  namespace: local-llm
  annotations:
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
spec:
  ingressClassName: nginx
  tls:
  - hosts:
    - api.your-domain.com
    - app.your-domain.com
    secretName: llm-tls
  rules:
  - host: api.your-domain.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: llm-inference
            port:
              number: 8000
  - host: app.your-domain.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: llm-web
            port:
              number: 8080

MAIN APPLICATION ENTRY POINTS

Main Inference Service

# main_inference.py
import os
import uvicorn
from fastapi import FastAPI
from inference_capability.adaptation.api import app as inference_app
from inference_capability.realization.local_inference import LocalInferenceService
from authentication_capability.realization.token_auth import TokenAuthenticationService
from user_management_capability.realization.repositories import (
    PostgreSQLUserRepository,
    PostgreSQLTokenRepository
)

# Initialize repositories
db_url = os.getenv('DATABASE_URL')
user_repo = PostgreSQLUserRepository(db_url)
token_repo = PostgreSQLTokenRepository(db_url)

# Initialize authentication service
auth_service = TokenAuthenticationService(token_repo, user_repo)

# Initialize inference service
model_path = os.getenv('MODEL_PATH', '/models/model.gguf')
inference_service = LocalInferenceService(model_path)

# Make services available to the app
inference_app.state.inference_service = inference_service
inference_app.state.auth_service = auth_service

if __name__ == "__main__":
    uvicorn.run(
        inference_app,
        host="0.0.0.0",
        port=8000,
        log_level="info"
    )

Main Web Service

# main_web.py
import os
import uvicorn
from user_management_capability.adaptation.web import web_app
from user_management_capability.essence.services import UserManagementService
from user_management_capability.realization.repositories import (
    PostgreSQLUserRepository,
    PostgreSQLTokenRepository
)
from user_management_capability.realization.email import SMTPEmailService
from user_management_capability.realization.password import BcryptPasswordHasher

# Initialize infrastructure components
db_url = os.getenv('DATABASE_URL')
user_repo = PostgreSQLUserRepository(db_url)
token_repo = PostgreSQLTokenRepository(db_url)

# Pass the SMTP settings positionally, in the same order as the web adaptation layer
email_service = SMTPEmailService(
    os.getenv('SMTP_HOST'),
    int(os.getenv('SMTP_PORT', 587)),
    os.getenv('SMTP_USER'),
    os.getenv('SMTP_PASSWORD'),
    os.getenv('FROM_EMAIL')
)
password_hasher = BcryptPasswordHasher()

# Initialize user management service
user_service = UserManagementService(
    user_repo,
    token_repo,
    email_service,
    password_hasher
)

# Make service available to the app
web_app.state.user_service = user_service

if __name__ == "__main__":
    uvicorn.run(
        web_app,
        host="0.0.0.0",
        port=8080,
        log_level="info"
    )
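
Both entry points read DATABASE_URL, while the Kubernetes manifests supply the individual DATABASE_* settings. If the URL is assembled in Python instead, the user name and password should be percent-encoded, because characters such as "@" or "/" in a password would otherwise change how the URL is parsed. The helper below is a minimal sketch of that idea; build_database_url is a hypothetical name, and it assumes the setting names from the ConfigMap and Secret shown earlier.

```python
import os
from urllib.parse import quote


def build_database_url(env=os.environ) -> str:
    """Assemble a PostgreSQL URL from the individual DATABASE_* settings."""
    required = ["DATABASE_USER", "DATABASE_PASSWORD", "DATABASE_HOST", "DATABASE_NAME"]
    missing = [name for name in required if not env.get(name)]
    if missing:
        # Fail at startup instead of on the first query
        raise RuntimeError(f"Missing settings: {', '.join(missing)}")

    # Percent-encode credentials so reserved characters stay inside their field
    user = quote(env["DATABASE_USER"], safe="")
    password = quote(env["DATABASE_PASSWORD"], safe="")
    port = env.get("DATABASE_PORT", "5432")
    return f"postgresql://{user}:{password}@{env['DATABASE_HOST']}:{port}/{env['DATABASE_NAME']}"
```

Calling build_database_url() with no arguments reads the process environment; passing a plain dict makes the function easy to unit test.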

CLIENT USAGE EXAMPLES

Python Client

import requests

class LLMClient:
    def __init__(self, api_url, api_token):
        self.api_url = api_url
        self.api_token = api_token
        self.headers = {
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json"
        }
    
    def chat(self, messages, temperature=0.7, max_tokens=512):
        """Send a chat completion request"""
        response = requests.post(
            f"{self.api_url}/v1/chat/completions",
            headers=self.headers,
            json={
                "model": "local-llm",
                "messages": messages,
                "temperature": temperature,
                "max_tokens": max_tokens
            }
        )
        response.raise_for_status()
        return response.json()
    
    def list_models(self):
        """List available models"""
        response = requests.get(
            f"{self.api_url}/v1/models",
            headers=self.headers
        )
        response.raise_for_status()
        return response.json()

# Usage example
client = LLMClient(
    api_url="https://api.your-domain.com",
    api_token="your-api-token-here"
)

# Simple chat
response = client.chat([
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is the capital of France?"}
])

print(response['choices'][0]['message']['content'])

# Multi-turn conversation
conversation = [
    {"role": "system", "content": "You are a helpful coding assistant."},
    {"role": "user", "content": "How do I reverse a string in Python?"}
]

response = client.chat(conversation)
assistant_message = response['choices'][0]['message']['content']
print(assistant_message)

# Continue the conversation
conversation.append({"role": "assistant", "content": assistant_message})
conversation.append({"role": "user", "content": "Can you show me a more efficient way?"})

response = client.chat(conversation)
print(response['choices'][0]['message']['content'])
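
The append-and-resend pattern above can be wrapped in a small helper so that callers do not manage the message list by hand. The sketch below is one way to do that; Conversation and fake_chat are hypothetical names, and fake_chat only imitates the response shape of LLMClient.chat so the block runs without a server.

```python
class Conversation:
    """Tracks message history around any chat function with the client's signature."""

    def __init__(self, chat_fn, system_prompt=None):
        self._chat_fn = chat_fn
        self.messages = []
        if system_prompt:
            self.messages.append({"role": "system", "content": system_prompt})

    def ask(self, user_text, **options):
        """Send the user's message plus history; record and return the reply."""
        pending = self.messages + [{"role": "user", "content": user_text}]
        response = self._chat_fn(pending, **options)
        reply = response["choices"][0]["message"]["content"]
        # Commit both turns only after the request succeeded
        self.messages = pending + [{"role": "assistant", "content": reply}]
        return reply


def fake_chat(messages, **_):
    """Stand-in for LLMClient.chat that echoes the last message."""
    return {"choices": [{"message": {"content": f"echo: {messages[-1]['content']}"}}]}


conversation = Conversation(fake_chat, system_prompt="You are a helpful coding assistant.")
first_reply = conversation.ask("How do I reverse a string in Python?")
```

With a real client, Conversation(client.chat, ...) would work the same way, and keyword options such as temperature pass straight through to the chat call.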

JavaScript Client

class LLMClient {
    constructor(apiUrl, apiToken) {
        this.apiUrl = apiUrl;
        this.apiToken = apiToken;
    }
    
    async chat(messages, temperature = 0.7, maxTokens = 512) {
        const response = await fetch(`${this.apiUrl}/v1/chat/completions`, {
            method: 'POST',
            headers: {
                'Authorization': `Bearer ${this.apiToken}`,
                'Content-Type': 'application/json'
            },
            body: JSON.stringify({
                model: 'local-llm',
                messages: messages,
                temperature: temperature,
                max_tokens: maxTokens
            })
        });
        
        if (!response.ok) {
            throw new Error(`API request failed: ${response.statusText}`);
        }
        
        return await response.json();
    }
    
    async listModels() {
        const response = await fetch(`${this.apiUrl}/v1/models`, {
            headers: {
                'Authorization': `Bearer ${this.apiToken}`
            }
        });
        
        if (!response.ok) {
            throw new Error(`API request failed: ${response.statusText}`);
        }
        
        return await response.json();
    }
}

// Usage example
const client = new LLMClient(
    'https://api.your-domain.com',
    'your-api-token-here'
);

// Simple chat
const response = await client.chat([
    { role: 'system', content: 'You are a helpful assistant.' },
    { role: 'user', content: 'What is the capital of France?' }
]);

console.log(response.choices[0].message.content);

Curl Example

# Chat completion
curl -X POST https://api.your-domain.com/v1/chat/completions \
  -H "Authorization: Bearer your-api-token-here" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "local-llm",
    "messages": [
      {"role": "system", "content": "You are a helpful assistant."},
      {"role": "user", "content": "What is the capital of France?"}
    ],
    "temperature": 0.7,
    "max_tokens": 512
  }'

# List models
curl https://api.your-domain.com/v1/models \
  -H "Authorization: Bearer your-api-token-here"

TESTING STRATEGY

The CCA structure makes testing straightforward. We can test each layer independently.

Testing the Essence Layer

import unittest
from inference_capability.essence.models import InferenceRequest, InferenceResponse
from user_management_capability.essence.models import User, UserStatus, APIToken

class TestInferenceRequest(unittest.TestCase):
    def test_valid_request(self):
        """Test that valid requests pass validation"""
        request = InferenceRequest(
            prompt="Hello, world!",
            temperature=0.7,
            max_tokens=100
        )
        self.assertTrue(request.validate())
    
    def test_empty_prompt_fails(self):
        """Test that empty prompts fail validation"""
        request = InferenceRequest(prompt="")
        with self.assertRaises(ValueError):
            request.validate()
    
    def test_invalid_temperature_fails(self):
        """Test that invalid temperature fails validation"""
        request = InferenceRequest(prompt="Test", temperature=3.0)
        with self.assertRaises(ValueError):
            request.validate()
    
    def test_invalid_max_tokens_fails(self):
        """Test that invalid max_tokens fails validation"""
        request = InferenceRequest(prompt="Test", max_tokens=0)
        with self.assertRaises(ValueError):
            request.validate()

class TestUser(unittest.TestCase):
    def test_email_validation(self):
        """Test email validation"""
        user = User("valid@example.com")
        self.assertTrue(user.validate_email())
        
        user = User("invalid-email")
        with self.assertRaises(ValueError):
            user.validate_email()
    
    def test_user_verification(self):
        """Test user verification workflow"""
        user = User("test@example.com")
        self.assertEqual(user.status, UserStatus.PENDING_VERIFICATION)
        
        user.verify()
        self.assertEqual(user.status, UserStatus.ACTIVE)
        self.assertIsNotNone(user.verified_at)
    
    def test_cannot_verify_twice(self):
        """Test that user cannot be verified twice"""
        user = User("test@example.com")
        user.verify()
        
        with self.assertRaises(ValueError):
            user.verify()
    
    def test_can_generate_token_only_when_active(self):
        """Test token generation rules"""
        user = User("test@example.com")
        self.assertFalse(user.can_generate_token())
        
        user.verify()
        self.assertTrue(user.can_generate_token())
        
        user.suspend()
        self.assertFalse(user.can_generate_token())

class TestAPIToken(unittest.TestCase):
    def test_token_generation(self):
        """Test token generation"""
        token = APIToken(user_id=1)
        token_string = token.generate_token()
        
        self.assertIsNotNone(token_string)
        self.assertGreater(len(token_string), 20)
        self.assertIsNotNone(token.created_at)
        self.assertIsNotNone(token.expires_at)
    
    def test_token_validation(self):
        """Test token validation"""
        token = APIToken(user_id=1)
        token.generate_token()
        
        self.assertTrue(token.is_valid())
        
        token.revoke()
        self.assertFalse(token.is_valid())

Testing the Realization Layer with Mocks

import unittest
from unittest.mock import Mock
from user_management_capability.essence.services import UserManagementService
# Assumption: APIToken is defined alongside User
from user_management_capability.essence.models import User, APIToken

class TestUserManagementService(unittest.TestCase):
    def setUp(self):
        """Set up test dependencies"""
        self.user_repo = Mock()
        self.token_repo = Mock()
        self.email_service = Mock()
        self.password_hasher = Mock()
        
        self.service = UserManagementService(
            self.user_repo,
            self.token_repo,
            self.email_service,
            self.password_hasher
        )
    
    def test_register_new_user(self):
        """Test registering a new user"""
        # Setup mocks
        self.user_repo.find_by_email.return_value = None
        self.password_hasher.hash_password.return_value = "hashed_password"
        
        saved_user = User("test@example.com")
        saved_user.id = 1
        self.user_repo.save.return_value = saved_user
        
        # Execute
        user = self.service.register_user("test@example.com", "password123")
        
        # Verify
        self.assertEqual(user.email, "test@example.com")
        self.user_repo.save.assert_called_once()
        self.email_service.send_verification_email.assert_called_once()
    
    def test_cannot_register_duplicate_email(self):
        """Test that duplicate emails are rejected"""
        # Setup mocks
        existing_user = User("test@example.com")
        self.user_repo.find_by_email.return_value = existing_user
        
        # Execute and verify
        with self.assertRaises(ValueError):
            self.service.register_user("test@example.com", "password123")
    
    def test_create_token_for_verified_user(self):
        """Test creating token for verified user"""
        # Setup mocks
        user = User("test@example.com")
        user.id = 1
        user.verify()
        self.user_repo.find_by_id.return_value = user
        
        token = APIToken(1)
        token.id = 1
        token.generate_token()
        self.token_repo.save.return_value = token
        
        # Execute
        created_token = self.service.create_api_token(1, "My Token")
        
        # Verify
        self.assertIsNotNone(created_token)
        self.token_repo.save.assert_called_once()
    
    def test_cannot_create_token_for_unverified_user(self):
        """Test that unverified users cannot create tokens"""
        # Setup mocks
        user = User("test@example.com")
        user.id = 1
        # User is not verified
        self.user_repo.find_by_id.return_value = user
        
        # Execute and verify
        with self.assertRaises(ValueError):
            self.service.create_api_token(1, "My Token")
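
Hand-written fakes are an alternative to Mock objects when a test needs real save-and-find behavior rather than canned return values. The following is a minimal sketch, assuming the repository contract exposes save, find_by_email, and find_by_id as used in the tests above. FakeUser and InMemoryUserRepository are hypothetical stand-ins, not classes from the tutorial's codebase.

```python
from dataclasses import dataclass
from itertools import count
from typing import Dict, Optional


@dataclass
class FakeUser:
    """Hypothetical stand-in for the tutorial's User entity."""
    email: str
    id: Optional[int] = None


class InMemoryUserRepository:
    """Dictionary-backed repository mirroring the save/find methods used above."""

    def __init__(self) -> None:
        self._users: Dict[int, FakeUser] = {}
        self._ids = count(1)  # mimics a SERIAL primary key

    def save(self, user: FakeUser) -> FakeUser:
        # Assign an id on first save; later saves overwrite the stored entry
        if user.id is None:
            user.id = next(self._ids)
        self._users[user.id] = user
        return user

    def find_by_id(self, user_id: int) -> Optional[FakeUser]:
        return self._users.get(user_id)

    def find_by_email(self, email: str) -> Optional[FakeUser]:
        return next((u for u in self._users.values() if u.email == email), None)
```

Because the fake keeps state, a service test can register a user and then look it up again without configuring return values for each call.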

Integration Testing

import unittest
import psycopg2
from testcontainers.postgres import PostgresContainer
from user_management_capability.realization.repositories import PostgreSQLUserRepository
# Assumption: UserStatus is defined alongside User
from user_management_capability.essence.models import User, UserStatus

class TestPostgreSQLUserRepository(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        """Start a PostgreSQL container for testing"""
        cls.postgres = PostgresContainer("postgres:15")
        cls.postgres.start()
        
        # By default testcontainers returns a SQLAlchemy-style URL
        # (postgresql+psycopg2://...); psycopg2 expects a plain libpq URL
        cls.db_url = cls.postgres.get_connection_url().replace(
            "postgresql+psycopg2://", "postgresql://"
        )
        
        # Create schema
        conn = psycopg2.connect(cls.db_url)
        with conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE users (
                    id SERIAL PRIMARY KEY,
                    email VARCHAR(255) UNIQUE NOT NULL,
                    password_hash VARCHAR(255) NOT NULL,
                    status VARCHAR(50) NOT NULL DEFAULT 'pending_verification',
                    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    verified_at TIMESTAMP
                )
            """)
        conn.commit()
        conn.close()
    
    @classmethod
    def tearDownClass(cls):
        """Stop the PostgreSQL container"""
        cls.postgres.stop()
    
    def setUp(self):
        """Create repository instance"""
        # Assumption: the repository accepts a libpq-style connection URL
        self.repo = PostgreSQLUserRepository(self.db_url)
    
    def tearDown(self):
        """Clean up database"""
        conn = psycopg2.connect(self.db_url)
        with conn.cursor() as cur:
            cur.execute("DELETE FROM users")
        conn.commit()
        conn.close()
    
    def test_save_and_find_user(self):
        """Test saving and retrieving a user"""
        user = User("test@example.com", "hashed_password")
        
        # Save
        saved_user = self.repo.save(user)
        self.assertIsNotNone(saved_user.id)
        
        # Find by email
        found_user = self.repo.find_by_email("test@example.com")
        self.assertIsNotNone(found_user)
        self.assertEqual(found_user.email, "test@example.com")
        
        # Find by ID
        found_by_id = self.repo.find_by_id(saved_user.id)
        self.assertIsNotNone(found_by_id)
        self.assertEqual(found_by_id.id, saved_user.id)
    
    def test_update_user(self):
        """Test updating a user"""
        user = User("test@example.com", "hashed_password")
        saved_user = self.repo.save(user)
        
        # Verify user
        saved_user.verify()
        updated_user = self.repo.save(saved_user)
        
        # Retrieve and check
        found_user = self.repo.find_by_id(saved_user.id)
        self.assertEqual(found_user.status, UserStatus.ACTIVE)
        self.assertIsNotNone(found_user.verified_at)

MONITORING AND OBSERVABILITY

Production systems require monitoring. We add observability through Prometheus metrics and structured logging.

Metrics Collection

from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Response
import time

# Define metrics
inference_requests_total = Counter(
    'inference_requests_total',
    'Total number of inference requests',
    ['status']
)

inference_duration_seconds = Histogram(
    'inference_duration_seconds',
    'Time spent processing inference requests',
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)

inference_tokens_total = Counter(
    'inference_tokens_total',
    'Total number of tokens processed',
    ['type']
)

active_users_gauge = Gauge(
    'active_users',
    'Number of active users'
)

# Middleware to collect metrics
@app.middleware("http")
async def metrics_middleware(request, call_next):
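    # perf_counter is monotonic, so the measured duration is unaffected by clock adjustments
    start_time = time.perf_counter()
    
    response = await call_next(request)
    
    duration = time.perf_counter() - start_time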
    
    if request.url.path.startswith("/v1/chat/completions"):
        inference_duration_seconds.observe(duration)
        
        if response.status_code == 200:
            inference_requests_total.labels(status='success').inc()
        else:
            inference_requests_total.labels(status='error').inc()
    
    return response

# Metrics endpoint
@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    return Response(content=generate_latest(), media_type="text/plain")

Structured Logging

import logging
import json
from datetime import datetime, timezone

class StructuredLogger:
    def __init__(self, name):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        
        # getLogger returns the same object for a given name, so attach the
        # handler only once to avoid duplicated log lines
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(handler)
    
    def _log(self, level, message, **kwargs):
        log_entry = {
            # datetime.utcnow() is deprecated since Python 3.12
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'level': level,
            'message': message,
            **kwargs
        }
        self.logger.log(getattr(logging, level), json.dumps(log_entry))
    
    def info(self, message, **kwargs):
        self._log('INFO', message, **kwargs)
    
    def error(self, message, **kwargs):
        self._log('ERROR', message, **kwargs)
    
    def warning(self, message, **kwargs):
        self._log('WARNING', message, **kwargs)

# Usage in inference service
logger = StructuredLogger('inference')

class LocalInferenceService(InferenceService):
    def generate(self, request):
        logger.info(
            "Inference request received",
            prompt_length=len(request.prompt),
            temperature=request.temperature,
            max_tokens=request.max_tokens
        )
        
        try:
            response = self._do_inference(request)
            
            logger.info(
                "Inference completed successfully",
                prompt_tokens=response.prompt_tokens,
                completion_tokens=response.completion_tokens,
                total_tokens=response.total_tokens
            )
            
            return response
            
        except Exception as e:
            logger.error(
                "Inference failed",
                error=str(e),
                error_type=type(e).__name__
            )
            raise

EVOLUTION ENVELOPES AND VERSIONING

As the system evolves, we need strategies to manage changes without breaking existing clients. Evolution Envelopes provide this capability.

API Versioning

from typing import Optional

from fastapi import APIRouter

# Version 1 API
v1_router = APIRouter(prefix="/v1")

@v1_router.post("/chat/completions")
async def chat_completion_v1(request: ChatCompletionRequest):
    """Version 1 of chat completions API"""
    # Implementation
    pass

# Version 2 API with enhanced features
v2_router = APIRouter(prefix="/v2")

class ChatCompletionRequestV2(ChatCompletionRequest):
    """Enhanced request with additional features"""
    conversation_id: Optional[str] = None
    user_id: Optional[str] = None
    metadata: Optional[dict] = None

@v2_router.post("/chat/completions")
async def chat_completion_v2(request: ChatCompletionRequestV2):
    """Version 2 with conversation tracking"""
    # Enhanced implementation
    pass

# Register both versions
app.include_router(v1_router)
app.include_router(v2_router)
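
One way to keep two API versions from duplicating logic is to lift v1 requests into the v2 shape at the Adaptation layer, so that a single code path reaches the Essence. The sketch below uses plain dataclasses to stay self-contained. The v1 fields messages and temperature, and the helper upgrade_v1_request, are assumptions made for illustration and may differ from the tutorial's ChatCompletionRequest.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class RequestV1:
    """Hypothetical v1 payload (the real ChatCompletionRequest may differ)."""
    messages: List[Dict[str, str]]
    temperature: float = 0.7


@dataclass
class RequestV2(RequestV1):
    """v2 adds optional tracking fields, mirroring ChatCompletionRequestV2."""
    conversation_id: Optional[str] = None
    user_id: Optional[str] = None
    metadata: Dict[str, str] = field(default_factory=dict)


def upgrade_v1_request(req: RequestV1) -> RequestV2:
    """Lift a v1 request into the v2 shape; the new fields keep their defaults."""
    return RequestV2(messages=req.messages, temperature=req.temperature)
```

With this shape, the v1 handler can call upgrade_v1_request and delegate to the same service method as the v2 handler.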

Database Migrations

# migrations/001_initial_schema.py
def upgrade(conn):
    """Apply migration"""
    with conn.cursor() as cur:
        cur.execute("""
            CREATE TABLE users (
                id SERIAL PRIMARY KEY,
                email VARCHAR(255) UNIQUE NOT NULL,
                password_hash VARCHAR(255) NOT NULL,
                status VARCHAR(50) NOT NULL DEFAULT 'pending_verification',
                created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                verified_at TIMESTAMP
            )
        """)
    conn.commit()

def downgrade(conn):
    """Revert migration"""
    with conn.cursor() as cur:
        cur.execute("DROP TABLE users")
    conn.commit()

# migrations/002_add_user_metadata.py
def upgrade(conn):
    """Add metadata column to users"""
    with conn.cursor() as cur:
        cur.execute("""
            ALTER TABLE users
            ADD COLUMN metadata JSONB DEFAULT '{}'::jsonb
        """)
    conn.commit()

def downgrade(conn):
    """Remove metadata column"""
    with conn.cursor() as cur:
        cur.execute("ALTER TABLE users DROP COLUMN metadata")
    conn.commit()
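
The migration files above define upgrade and downgrade pairs but not how they get applied. The following is a minimal sketch of a runner that records applied versions in a schema_migrations table. It uses SQLite from the standard library so that it runs without a database server. The table name, the MIGRATIONS list, and apply_pending are assumptions, and a production setup would more likely rely on a dedicated migration tool.

```python
import sqlite3
from typing import Callable, List, Tuple

Migration = Tuple[str, Callable[[sqlite3.Connection], None]]


def create_users(conn: sqlite3.Connection) -> None:
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT UNIQUE NOT NULL)")


def add_metadata(conn: sqlite3.Connection) -> None:
    conn.execute("ALTER TABLE users ADD COLUMN metadata TEXT DEFAULT '{}'")


# Ordered list of (version, upgrade function); the names are illustrative.
MIGRATIONS: List[Migration] = [
    ("001_initial_schema", create_users),
    ("002_add_user_metadata", add_metadata),
]


def apply_pending(conn: sqlite3.Connection, migrations: List[Migration]) -> List[str]:
    """Apply migrations not yet recorded; return the versions applied by this call."""
    conn.execute("CREATE TABLE IF NOT EXISTS schema_migrations (version TEXT PRIMARY KEY)")
    done = {row[0] for row in conn.execute("SELECT version FROM schema_migrations")}
    applied = []
    for version, upgrade in migrations:
        if version in done:
            continue  # already applied on an earlier run
        upgrade(conn)
        conn.execute("INSERT INTO schema_migrations (version) VALUES (?)", (version,))
        conn.commit()
        applied.append(version)
    return applied
```

Running apply_pending twice against the same connection applies each migration only once, which is the property that makes it safe to call on every deployment.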

DEPLOYMENT CHECKLIST

Before deploying to production, ensure the following:

  1. Security

    • All secrets stored in Kubernetes Secrets, not ConfigMaps
    • TLS certificates configured for all external endpoints
    • Database connections use SSL
    • API rate limiting implemented
    • Input validation on all endpoints
    • SQL injection prevention (parameterized queries)
    • CORS properly configured
  2. Reliability

    • Health checks configured for all containers
    • Liveness and readiness probes set
    • Resource limits defined
    • Horizontal pod autoscaling configured
    • Database backups automated
    • Disaster recovery plan documented
  3. Performance

    • GPU utilization monitored
    • Connection pooling for database
    • Caching strategy implemented
    • Load testing completed
    • Performance baselines established
  4. Observability

    • Prometheus metrics exported
    • Grafana dashboards created
    • Log aggregation configured
    • Alerting rules defined
    • Distributed tracing implemented
  5. Operations

    • CI/CD pipeline configured
    • Rolling update strategy defined
    • Rollback procedure documented
    • Runbooks created for common issues
    • On-call rotation established

CONCLUSION

This tutorial has demonstrated how to build a production-ready local LLM system using Capability-Centric Architecture. We have seen how CCA principles guide us to create a system that is:

  • Maintainable: Each capability has clear boundaries and responsibilities
  • Testable: Pure domain logic can be tested without infrastructure
  • Flexible: Infrastructure can be swapped without changing business logic
  • Scalable: Capabilities can be deployed and scaled independently
  • Evolvable: New versions can coexist with old versions

The key insights from CCA that made this possible:

  1. Essence-Realization-Adaptation separation keeps domain logic pure
  2. Dependency inversion allows infrastructure to be pluggable
  3. Contracts define clear interfaces between capabilities
  4. Evolution Envelopes manage change over time
  5. Efficiency Gradients allow optimization where needed

This architecture scales from a single developer running locally to a production deployment serving thousands of users. The same code structure works whether you are using NVIDIA GPUs, AMD GPUs, or CPU-only inference. The same domain logic works whether you store data in PostgreSQL, MongoDB, or in-memory structures.

By following CCA principles, we have built a system that will serve us well as requirements evolve, technologies change, and scale increases.

Flux: Building a Turing-Complete Language with Just 9 Operations - How minimalism in language design reveals the fundamental nature of computation




Introduction


In an era where programming languages compete on features, frameworks, and syntactic sugar, there’s something refreshing about going in the opposite direction. What if we stripped away everything—objects, types, functions, even named variables—and asked: what’s the absolute minimum needed for universal computation?

Enter Flux, an esoteric programming language that proves you don’t need much at all.

With only 9 operations and two data structures (an accumulator and a stack), Flux is Turing complete, meaning it can compute anything that any other programming language can compute. It’s not meant for production systems, but as an exploration of computational minimalism and a teaching tool for understanding what programming really is at its core.

The Philosophy of Minimalism


Esoteric programming languages (esolangs) serve a unique purpose in computer science. They’re not designed for practical use, but rather to challenge assumptions, explore theoretical boundaries, or simply create art. Languages like Brainfuck, Malbolge, and Befunge have captivated programmers for decades by being deliberately minimal, obscure, or bizarre.

Flux joins this tradition with a clear design philosophy:

1. Radical Minimalism - Only essential operations, nothing more
2. Composability - Complex behavior emerges from simple primitives
3. Transparency - No hidden state, no magic, no surprises
4. Stack-Based - A proven model for expression evaluation

The goal wasn’t to create the most obscure language possible, but rather the most elegant minimal language—one where every operation serves a clear purpose and nothing is redundant.

The Language Specification


Flux programs operate on two data structures:

- The Accumulator: A single integer register that serves as the central workspace
- The Stack: An unbounded LIFO (Last In, First Out) stack of integers

Every operation in Flux revolves around these two structures. Here are all nine operations:


+    Increment accumulator by 1
-    Decrement accumulator by 1
*    Push accumulator value onto stack
/    Pop top stack value into accumulator
[    Begin loop (if accumulator ≠ 0)
]    End loop (jump back if accumulator ≠ 0)
.    Output accumulator as ASCII character
,    Input character into accumulator
#    Output accumulator as decimal number


That’s it. Everything else—arithmetic, conditionals, data structures, algorithms—must be built from these primitives.

How It Works: A Simple Example


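Let’s walk through a simple program that computes 3 + 5:


+++*        Set accumulator to 3, push a copy (stack = [3])
+++++*      Five more increments: accumulator is 8, push (stack = [3, 8])
/*          Pop 8, push it back
+/          Increment to 9, then pop 8 back into the accumulator
#           Output: 8


Here’s what happens step by step:

1. `+++` increments the accumulator three times (acc = 3)
2. `*` pushes a copy of 3 onto the stack (acc = 3, stack = [3])
3. `+++++` increments five more times; because `*` does not reset the accumulator, acc = 8
4. `*` pushes 8 onto the stack (stack = [3, 8])
5. `/` pops 8 into the accumulator (acc = 8, stack = [3])
6. `*` pushes 8 back (stack = [3, 8])
7. `+` increments the accumulator (acc = 9)
8. `/` pops 8, overwriting the 9 (acc = 8, stack = [3])
9. `#` outputs 8

The addition itself happens in step 3, where the five increments land on top of the 3 already in the accumulator; steps 5 through 8 only exercise the stack operations and leave the result unchanged. Adding two values that both sit on the stack is a harder problem, since each `/` overwrites the accumulator.

This verbosity is intentional: it forces you to think about every single operation at the machine level.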
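
A trace like the one above can be checked mechanically with a few lines of Python. This is a sketch for loop-free programs only and is not the Go implementation; trace is a hypothetical helper that follows the operation table, including the rules that `*` copies the accumulator without resetting it and that popping an empty stack yields 0.

```python
from typing import List, Tuple


def trace(program: str) -> List[Tuple[str, int, List[int]]]:
    """Return (op, accumulator, stack snapshot) after each loop-free operation."""
    acc, stack, steps = 0, [], []
    for op in program:
        if op == "+":
            acc += 1
        elif op == "-":
            acc -= 1
        elif op == "*":
            stack.append(acc)                  # push a copy; acc is unchanged
        elif op == "/":
            acc = stack.pop() if stack else 0  # empty stack yields 0
        elif op == "#":
            pass                               # output only; state is unchanged
        else:
            continue                           # comments, loops, and I/O are skipped here
        steps.append((op, acc, list(stack)))
    return steps


for op, acc, stack in trace("+++*+++++*/*+/#"):
    print(op, acc, stack)
```

Each printed row shows the operation just executed followed by the accumulator and stack contents, which makes it easy to compare against a hand-written trace.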

Turing Completeness: Proof Through Construction


A language is Turing complete if it can simulate a Turing machine, which requires:

1. Unbounded memory - Flux has an unbounded stack
2. Conditional execution - Flux has while-loops with `[...]`
3. Basic operations - Flux has arithmetic via `+` and `-`

More formally, Flux can simulate a Turing machine by mapping:

- Tape → The stack (can grow infinitely)
- Head position → Implicitly tracked via stack depth
- State → Encoded in accumulator value
- Symbols → Integer values on the stack
- Transition function → Conditional loops with arithmetic

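If Flux can simulate a universal Turing machine in this way, it can compute any computable function, which places it in the same computational class as Python, C++, JavaScript, or any other general-purpose language. The mapping above is an informal sketch rather than a complete proof: a rigorous argument would have to spell out how tape cells on both sides of the head are encoded, given that `/` overwrites the accumulator on every pop.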

The practical difference, of course, is that writing even trivial programs in Flux is extraordinarily verbose and challenging—which is exactly the point.

The Compiler: From Source to Execution


The Flux compiler is implemented in Go and follows a traditional compilation pipeline:

1. Lexical Analysis


The compiler scans the source code character by character, recognizing the 9 valid operations and treating everything else as comments. This means you can write:


Add 3 and 5 together:
+++*       This pushes 3
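+++++*     This pushes 8 because the accumulator still holds 3
/*+/#      This shuffles the stack and outputs 8

The descriptive text is completely ignored, as long as it avoids the nine operation characters; note that ordinary punctuation such as `.`, `,` and `-` counts as an operation.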
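
The filtering step can be sketched in Python as follows. The Go lexer is only partly shown in this article, so strip_comments is a hypothetical helper that illustrates the rule, not a port of the real code. The second call shows that punctuation inside descriptive text survives the filter and would be executed.

```python
OPS = set("+-*/[].,#")  # the nine Flux operations


def strip_comments(source: str) -> str:
    """Keep only operation characters; everything else counts as a comment."""
    return "".join(ch for ch in source if ch in OPS)


print(strip_comments("+++*   This pushes 3"))  # prints +++*
print(strip_comments("push 3, then stop."))    # prints ,.
```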

2. Code Generation


Valid operations are converted into bytecode instructions. For example, `+++` becomes three `INC` (increment) instructions. Loops are more complex—the compiler must match brackets and calculate jump addresses.

3. Loop Matching


This is the most sophisticated part of the compiler. When it encounters `[`, it:

- Records the current instruction position
- Emits a `LOOP` instruction with a placeholder jump address
- Pushes the position onto a loop stack

When it encounters `]`, it:

- Pops the matching loop start position
- Emits an `END` instruction that jumps back to the loop start
- Patches the `LOOP` instruction with the correct end address

This enables O(1) loop jumps at runtime—no need to scan for matching brackets during execution.
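
The bracket-patching steps above can be sketched in Python. This illustrates the technique and is not a transcription of the Go compiler; the `(op, arg)` tuple layout, the name compile_flux, and the convention that each jump argument holds the index of the matching instruction are assumptions.

```python
from typing import List, Tuple

OPS = set("+-*/[].,#")


def compile_flux(source: str) -> List[Tuple[str, int]]:
    """Translate source into (op, arg) pairs with loop jumps resolved."""
    code: List[Tuple[str, int]] = []
    loop_stack: List[int] = []                  # positions of unmatched '['
    for ch in source:
        if ch not in OPS:
            continue                            # comment character
        if ch == "[":
            loop_stack.append(len(code))
            code.append(("[", -1))              # placeholder, patched at the matching ']'
        elif ch == "]":
            if not loop_stack:
                raise SyntaxError("unmatched ']'")
            start = loop_stack.pop()
            code.append(("]", start))           # jump target: the matching '['
            code[start] = ("[", len(code) - 1)  # patch: index of this ']'
        else:
            code.append((ch, 0))
    if loop_stack:
        raise SyntaxError("unmatched '['")
    return code


print(compile_flux("+[-]"))
```

Unbalanced brackets are rejected during this pass, so a virtual machine consuming the result never has to search for a matching bracket.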

4. Execution


The virtual machine executes bytecode with a program counter that increments through instructions. The VM maintains:

- The accumulator (integer)
- The stack (dynamically growing array)
- The program counter (instruction pointer)
- Input/output streams

Each instruction is a simple operation that modifies these structures in predictable ways.
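
A minimal Python sketch of such an execution loop follows. It runs hand-assembled bytecode in which each `[` carries the index of its matching `]` and vice versa. That layout, the function name run, and returning the output as a string are assumptions made for illustration, not details of the Go VM. The empty-stack and end-of-input rules follow the reference guide included with the full code.

```python
from typing import List, Tuple


def run(code: List[Tuple[str, int]], stdin: str = "") -> str:
    """Execute (op, arg) bytecode and return everything it printed."""
    acc, stack, pc, out = 0, [], 0, []
    inp = iter(stdin)
    while pc < len(code):
        op, arg = code[pc]
        if op == "+":
            acc += 1
        elif op == "-":
            acc -= 1
        elif op == "*":
            stack.append(acc)
        elif op == "/":
            acc = stack.pop() if stack else 0  # empty stack yields 0
        elif op == "[" and acc == 0:
            pc = arg                           # skip to the matching ']'
        elif op == "]" and acc != 0:
            pc = arg                           # jump back to the matching '['
        elif op == ".":
            out.append(chr(acc % 256))
        elif op == ",":
            acc = ord(next(inp, "\0"))         # end of input gives 0
        elif op == "#":
            out.append(str(acc))
        pc += 1                                # always advance past the current slot
    return "".join(out)


# Bytecode for "+++[-]#": count down from 3, then print the accumulator.
program = [("+", 0), ("+", 0), ("+", 0), ("[", 5), ("-", 0), ("]", 3), ("#", 0)]
print(run(program))  # prints 0
```

Because the jump targets are stored in the instructions, both branches are a single assignment to the program counter.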

Programming Patterns and Idioms


Despite its minimalism, Flux has emergent patterns that experienced users learn to recognize:

Clearing the Accumulator


[-]    While accumulator ≠ 0, decrement (assumes accumulator ≥ 0)

Conditional Execution


[...code...[-]]    Run code if acc ≠ 0, then clear to exit

Duplicating Stack Top


/**    Pop to accumulator, push twice

Multiplication (A × B)


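(Outline of repeated addition, assuming A and B are on the stack. Because `/` overwrites the accumulator on every pop, read the comments as the intended data flow, not as a verified trace.)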
[-]* /         Zero acc, push 0, pop B
[              While B > 0:
  -*             Decrement B, push
  /*             Get A
  /              Get result
  +              Add A
  **             Push result and A
  /              Get B
]
/              Final result in acc


These patterns are Flux’s “standard library”—reusable idioms that programmers memorize and compose.

What Flux Teaches Us


Beyond being a curiosity, Flux offers genuine insights into programming and computation:

1. Computation Is Simple at Its Core


Modern languages hide complexity behind abstractions. Flux strips those away, revealing that computation is really just:

- Moving data around
- Comparing values
- Repeating operations

Everything else is convenience.

2. Syntax Is Arbitrary


There’s nothing inherently special about `if`, `while`, or `function`. These are just conventions. Flux’s `[...]` loops prove you can express the same computational power with different syntax.

3. The Stack Model Is Powerful


Stack-based languages (Forth, PostScript, JVM bytecode) are efficient and expressive. Flux demonstrates why: stacks naturally handle nested evaluation, function calls, and temporary storage with minimal overhead.

4. Constraints Breed Creativity


When you can’t use variables, you must think differently. Flux programmers develop mental models of stack state and learn to compose operations in novel ways. This cognitive challenge mirrors real-world programming where constraints (memory, CPU, time) force creative solutions.

Practical Applications


“When would I actually use Flux?” is a valid question. The honest answer: probably never for production code. But Flux has genuine educational value:

- Teaching tool for compiler construction
- Understanding Turing completeness through hands-on exploration
- Mental exercise in computational thinking
- Gateway to understanding how computers work at a low level
- Inspiration for designing other minimalist systems

Computer science students often encounter Turing machines in theory but never see them in action. Flux provides a bridge—it’s abstract enough to demonstrate theoretical concepts but concrete enough to run real programs.

Comparison to Other Minimal Languages


Brainfuck


The most famous minimal language, Brainfuck has 8 operations and uses a tape metaphor instead of a stack. Flux is arguably cleaner with its accumulator-stack model, which maps more naturally to actual CPU architecture.

Forth


A practical stack-based language that influenced Flux. Forth is far more sophisticated with words (functions), dictionaries, and real-world use cases. Flux is Forth’s minimalist cousin.

Lambda Calculus


The theoretical foundation of functional programming. Lambda calculus is more minimal still (only three syntactic constructs: variables, abstraction, and application) but purely abstract. Flux is imperative and executable.

Turing Machines


The theoretical computer that defines computability. Turing machines are even simpler conceptually but incredibly verbose to program. Flux offers a middle ground.

The Implementation Details


The Go implementation of Flux showcases several interesting techniques:

Bytecode Compilation: Rather than interpreting source directly, Flux compiles to bytecode first. This enables optimizations like:

- Pre-computed jump addresses for loops
- Single-pass execution without bracket matching
- Potential future optimizations (instruction fusion, constant folding)

Dynamic Stack: The stack is implemented as Go’s slice, which automatically grows as needed. This provides the “unbounded” memory required for Turing completeness while remaining practical.

Error Handling: The compiler validates bracket matching at compile time, catching syntax errors before execution. This is more user-friendly than runtime failures.

Documentation First: The implementation includes extensive inline documentation explaining the language specification. This makes the compiler itself a learning resource.

Future Directions


Flux could evolve in several interesting ways:

1. Visual Debugger: A tool that visualizes accumulator and stack state during execution
2. Standard Library: A collection of common patterns (math operations, string handling)
3. Optimizer: Detecting patterns like `+-` (a no-op that can be removed) or `+++` (foldable into a single add-3 instruction)
4. Transpiler: Converting Flux to other languages or intermediate representations
5. JIT Compiler: Compiling hot paths to native code for performance

Each extension adds complexity, but the core language remains minimal.
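
As a rough sketch of the optimizer idea in item 3, the following Python function folds runs of `+` and `-` into a single net adjustment. The `("ADD", n)` instruction is hypothetical: it is not one of the nine operations and does not appear in the bytecode described in this article. The input is assumed to contain only operation characters, with comments already removed.

```python
from typing import List, Tuple, Union

Instr = Union[str, Tuple[str, int]]


def fold_increments(ops: str) -> List[Instr]:
    """Collapse consecutive '+'/'-' into ('ADD', net); drop runs that cancel out."""
    result: List[Instr] = []
    net = 0

    def flush() -> None:
        nonlocal net
        if net != 0:
            result.append(("ADD", net))
        net = 0

    for op in ops:
        if op == "+":
            net += 1
        elif op == "-":
            net -= 1
        else:
            flush()            # any other operation ends the current run
            result.append(op)
    flush()
    return result


print(fold_increments("+++*+-/--"))
```

Folding must stop at every other operation, including `[` and `]`, because loop bodies may execute a different number of times than they appear in the source.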

Conclusion: The Beauty of Constraints


Flux isn’t trying to replace Python or JavaScript. It’s not competing with Rust or Go. Instead, it occupies a unique niche: a language that’s minimal enough to understand completely yet powerful enough to compute anything.

In our field’s rush toward ever-more-powerful abstractions, there’s value in occasionally returning to fundamentals. What is computation, really? What do we *actually* need to build a universal computer?

Flux provides an answer: not much. Nine operations, two data structures, and a bit of creativity.

For programmers, working with Flux is like a pianist practicing scales—it might not be glamorous, but it builds fundamental skills and appreciation for the instrument. You develop an intuition for how computation works at the lowest level, which makes you better at reasoning about performance, memory, and algorithms in any language.

And there’s something aesthetically pleasing about a complete programming language that fits on a single page. In a world of sprawling frameworks and endless dependencies, that kind of simplicity is refreshing.

So while you probably won’t build your next web app in Flux, you might find that an afternoon spent writing Flux programs changes how you think about programming itself. And that’s worth far more than another JavaScript framework.

Full Code (Version 0.1)

Prerequisites:
- Go Compiler is installed

Steps:
1. Create a folder named flux: mkdir flux
2. Change to this directory: cd flux
3. Save the code below as flux.go
4. Initialize a module: go mod init flux
5. Compile Flux with go build -o flux flux.go
6. Run it: ./flux help





package main

import (
    "bufio"
    "fmt"
    "io"
    "os"
    "strings"
)

/*

FLUX PROGRAMMING LANGUAGE - COMPLETE REFERENCE GUIDE

## OVERVIEW

Flux is a minimal, stack-based esoteric programming language that achieves
Turing completeness with only 9 core operations. It features a single
accumulator register and an unbounded stack, making it capable of universal
computation despite its extreme minimalism.

The language is designed to demonstrate that complex computational behavior
can emerge from simple, well-chosen primitives. Every operation in Flux has
a clear, unambiguous meaning, and there are no hidden mechanisms or implicit
behaviors.

## LANGUAGE PHILOSOPHY

Flux adheres to these core design principles:

1. Minimalism - Include only operations that are absolutely essential for
   Turing completeness. Every operation must serve a distinct purpose.
2. Composability - Complex algorithms and data structures should emerge
   naturally from composing simple primitive operations.
3. Explicitness - All state changes must be explicit. There are no side
   effects, hidden state, or implicit conversions.
4. Stack-based architecture - The stack provides a natural, efficient model
   for expression evaluation and temporary storage without named variables.
5. Educational value - The language should be simple enough to understand
   completely, yet powerful enough to demonstrate fundamental concepts of
   computation and compiler design.

## COMPUTATIONAL MODEL

Flux programs operate on two primary data structures:

ACCUMULATOR:
- A single integer register that serves as the central workspace
- All arithmetic operations modify the accumulator
- The accumulator value determines loop execution
- Starts at zero when program execution begins
- Can hold any integer value (implementation dependent range)

STACK:
- An unbounded Last-In-First-Out (LIFO) data structure
- Stores integer values
- Provides the primary means of data persistence between operations
- Can grow without limit (constrained only by available memory)
- Popping from an empty stack yields zero (graceful degradation)

## OPERATION REFERENCE

Flux has exactly 9 operations:

ARITHMETIC OPERATIONS:
+    Increment accumulator
     Effect: accumulator = accumulator + 1
     Example: If acc=5, then after '+' acc=6

-    Decrement accumulator
     Effect: accumulator = accumulator - 1
     Example: If acc=5, then after '-' acc=4

STACK OPERATIONS:
*    Push accumulator to stack
     Effect: stack.push(accumulator)
     The accumulator value is copied to the top of the stack
     The accumulator itself remains unchanged
     Example: If acc=5, after '*' the stack has 5 on top and acc is still 5

/    Pop stack to accumulator
     Effect: accumulator = stack.pop()
     If stack is empty, accumulator becomes 0
     The value is removed from the stack
     Example: If stack=[3,7] (7 on top) and acc=5, after '/' acc=7 and stack=[3]

CONTROL FLOW OPERATIONS:
[    Begin while loop
     If accumulator == 0, jump forward past the matching ']'
     If accumulator != 0, continue execution into the loop body
     The condition is checked on entry at '[' and again at the matching ']',
     not continuously inside the loop body
     Example: If acc=0, execution jumps past the loop
     If acc=5, execution enters the loop

]    End while loop
     If accumulator != 0, jump backward to the matching '['
     If accumulator == 0, continue execution past the loop
     This creates a while-loop structure that continues as long as acc != 0
     Example: If acc=3, jump back to '['
     If acc=0, exit loop

INPUT/OUTPUT OPERATIONS:
.    Output accumulator as ASCII character
     Prints the character corresponding to (accumulator mod 256)
     Example: If acc=65, outputs 'A'
     If acc=72, outputs 'H'

,    Input one character
     Reads a single character from input stream
     Sets accumulator to the ASCII value of that character
     On EOF (end of file), sets accumulator to 0
     Example: If user types 'A', acc becomes 65

#    Output accumulator as decimal number
     Prints the numeric value of the accumulator
     This is an extension for practical debugging and numeric output
     Example: If acc=42, outputs "42"

WHITESPACE AND COMMENTS:
- Spaces, tabs, newlines, and carriage returns are ignored
- Any character that is not one of the 9 operations is treated as a comment
- This allows for readable, documented Flux code

END OF REFERENCE GUIDE
*/

// OpCode represents bytecode instruction types
type OpCode byte

const (
    OpInc    OpCode = iota // + : Increment accumulator
    OpDec                  // - : Decrement accumulator
    OpPush                 // * : Push accumulator to stack
    OpPop                  // / : Pop stack to accumulator
    OpLoop                 // [ : Begin loop
    OpEnd                  // ] : End loop
    OpOut                  // . : Output as ASCII
    OpIn                   // , : Input character
    OpOutNum               // # : Output as number
)

// Instruction represents a single bytecode instruction with optional argument
type Instruction struct {
    Op  OpCode // The operation to perform
    Arg int    // Argument (used for loop jump addresses)
}

// Compiler transforms Flux source code into executable bytecode
type Compiler struct {
    source       []byte        // Source code as byte array
    instructions []Instruction // Generated bytecode instructions
    loopStack    []int         // Stack of loop start positions for bracket matching
    position     int           // Current position in source (for error reporting)
}

// NewCompiler creates a new compiler instance with the given source code
func NewCompiler(source string) *Compiler {
    return &Compiler{
        source:       []byte(source),
        instructions: make([]Instruction, 0, len(source)), // Pre-allocate for efficiency
        loopStack:    make([]int, 0, 16),                  // Pre-allocate small loop stack
        position:     0,
    }
}

// Compile translates the source into bytecode in a single left-to-right pass
// that combines three tasks:
// 1. Lexical analysis (each byte is one token; non-operations are skipped)
// 2. Syntax analysis (bracket matching validation)
// 3. Code generation (bytecode emission)
// Returns the compiled instructions or an error
func (c *Compiler) Compile() ([]Instruction, error) {
    // Single-pass compilation: scan source left to right
    for c.position = 0; c.position < len(c.source); c.position++ {
        char := c.source[c.position]

        switch char {
        case '+':
            // Increment operation: accumulator += 1
            c.emit(OpInc, 0)

        case '-':
            // Decrement operation: accumulator -= 1
            c.emit(OpDec, 0)

        case '*':
            // Push operation: stack.push(accumulator)
            c.emit(OpPush, 0)

        case '/':
            // Pop operation: accumulator = stack.pop()
            c.emit(OpPop, 0)

        case '[':
            // Loop start: if acc == 0, jump past matching ]
            loopStart := len(c.instructions)
            c.emit(OpLoop, 0) // Emit with placeholder jump address
            c.loopStack = append(c.loopStack, loopStart)

        case ']':
            // Loop end: if acc != 0, jump back to matching [
            if len(c.loopStack) == 0 {
                return nil, fmt.Errorf("compilation error: unmatched ']' at position %d", c.position)
            }

            // Pop the matching loop start position
            loopStart := c.loopStack[len(c.loopStack)-1]
            c.loopStack = c.loopStack[:len(c.loopStack)-1]

            loopEnd := len(c.instructions)

            // Emit end instruction that jumps back to loop start
            c.emit(OpEnd, loopStart)

            // Patch the loop start instruction with the end address
            // This allows O(1) jump when condition is false
            c.instructions[loopStart].Arg = loopEnd

        case '.':
            // Output operation: print character
            c.emit(OpOut, 0)

        case ',':
            // Input operation: read character
            c.emit(OpIn, 0)

        case '#':
            // Numeric output operation: print number
            c.emit(OpOutNum, 0)

        case ' ', '\t', '\n', '\r':
            // Whitespace: ignored

        default:
            // Any other character: treated as comment, ignored
            // This allows for readable, documented code
        }
    }

    // Validate that all loops are properly closed
    if len(c.loopStack) > 0 {
        return nil, fmt.Errorf("compilation error: %d unmatched '[' bracket(s) in source code", len(c.loopStack))
    }

    return c.instructions, nil
}

// emit appends a new instruction to the bytecode sequence
func (c *Compiler) emit(op OpCode, arg int) {
    c.instructions = append(c.instructions, Instruction{Op: op, Arg: arg})
}

// VM represents the Flux virtual machine that executes compiled bytecode
type VM struct {
    instructions []Instruction // The bytecode program to execute
    accumulator  int           // The single accumulator register
    stack        []int         // The unbounded stack
    pc           int           // Program counter (instruction pointer)
    input        io.Reader     // Input stream for ',' operation
    output       io.Writer     // Output stream for '.' and '#' operations
}

// NewVM creates a new virtual machine with the given bytecode and I/O streams
func NewVM(instructions []Instruction, input io.Reader, output io.Writer) *VM {
    return &VM{
        instructions: instructions,
        accumulator:  0,                       // Start with accumulator at 0
        stack:        make([]int, 0, 256),     // Pre-allocate stack with reasonable capacity
        pc:           0,                       // Start at first instruction
        input:        input,                   // Input stream
        output:       output,                  // Output stream
    }
}

// Run executes the bytecode program from start to finish
// Returns an error if any runtime error occurs (typically I/O errors)
func (vm *VM) Run() error {
    // Main execution loop: fetch, decode, execute
    for vm.pc < len(vm.instructions) {
        inst := vm.instructions[vm.pc]

        // Decode and execute the instruction
        switch inst.Op {
        case OpInc:
            // Increment: acc = acc + 1
            vm.accumulator++

        case OpDec:
            // Decrement: acc = acc - 1
            vm.accumulator--

        case OpPush:
            // Push: stack.push(acc)
            vm.stack = append(vm.stack, vm.accumulator)

        case OpPop:
            // Pop: acc = stack.pop() or 0 if empty
            if len(vm.stack) > 0 {
                vm.accumulator = vm.stack[len(vm.stack)-1]
                vm.stack = vm.stack[:len(vm.stack)-1]
            } else {
                // Graceful degradation: popping empty stack yields 0
                vm.accumulator = 0
            }

        case OpLoop:
            // Loop start: if acc == 0, skip the loop entirely
            if vm.accumulator == 0 {
                vm.pc = inst.Arg // Land on the matching ]; the pc++ below steps past it
            }

        case OpEnd:
            // Loop end: if acc != 0, run the loop body again
            if vm.accumulator != 0 {
                vm.pc = inst.Arg // Land on the matching [; the pc++ below re-enters the body
            }

        case OpOut:
            // Output as ASCII character
            // Use modulo 256 to handle values outside byte range
            char := byte(vm.accumulator % 256)
            _, err := vm.output.Write([]byte{char})
            if err != nil {
                return fmt.Errorf("output error: %v", err)
            }

        case OpIn:
            // Input one character
            buf := make([]byte, 1)
            n, err := vm.input.Read(buf)
            if n > 0 {
                // A Reader may return data together with io.EOF, so use
                // the byte first: store its value in the accumulator
                vm.accumulator = int(buf[0])
            } else if err == nil || err == io.EOF {
                // EOF (or an empty read): set accumulator to 0
                vm.accumulator = 0
            } else {
                return fmt.Errorf("input error: %v", err)
            }

        case OpOutNum:
            // Output as decimal number
            _, err := fmt.Fprintf(vm.output, "%d", vm.accumulator)
            if err != nil {
                return fmt.Errorf("output error: %v", err)
            }

        default:
            // This should never happen if compiler is correct
            return fmt.Errorf("internal error: invalid opcode %d at position %d", inst.Op, vm.pc)
        }

        // Advance to next instruction
        vm.pc++
    }

    return nil
}


// main is the entry point for the flux command-line tool.
// It dispatches on the first argument to the matching command handler.
func main() {
    // If no arguments, show help
    if len(os.Args) < 2 {
        showHelp()
        return
    }

    command := os.Args[1]

    // Dispatch to appropriate command handler
    switch command {
    case "help", "-h", "--help":
        showHelp()

    case "guide":
        showGuide()

    case "reference", "ref":
        showReference()

    case "examples":
        showExamples()

    case "demo":
        runDemo()

    case "run":
        if len(os.Args) < 3 {
            fmt.Println("Error: Please specify a file to run")
            fmt.Println("Usage: flux run <file>")
            return
        }
        runFile(os.Args[2])

    case "compile":
        if len(os.Args) < 3 {
            fmt.Println("Error: Please specify a file to compile")
            fmt.Println("Usage: flux compile <file>")
            return
        }
        compileFile(os.Args[2])

    case "interactive", "repl":
        runInteractive()

    default:
        fmt.Printf("Unknown command: %s\n", command)
        fmt.Println("Run 'flux help' for usage information")
    }
}

// showHelp displays the main help message
func showHelp() {
    fmt.Println(`
                   FLUX PROGRAMMING LANGUAGE v1.0                          
              Minimal & Stack-Based & Turing-Complete

USAGE
    flux <command> [arguments]

COMMANDS
    help              Show this help message
    guide             Show beginner's tutorial and user guide
    reference         Show complete language reference (also: ref)
    examples          Show example programs with explanations
    demo              Run interactive demonstration programs
    run <file>        Compile and execute a Flux program
    compile <file>    Compile program and show bytecode
    interactive       Start interactive REPL (also: repl)

QUICK REFERENCE
    +    Increment accumulator       *    Push to stack
    -    Decrement accumulator       /    Pop from stack
    [    Start loop (if acc != 0)    ]    End loop (jump if acc != 0)
    .    Output as ASCII             ,    Input character
    #    Output as number

GETTING STARTED
    1. Run 'flux guide' for a beginner-friendly tutorial
    2. Try 'flux demo' to see example programs in action
    3. View 'flux examples' for commented program samples
    4. Read 'flux reference' for complete documentation

EXAMPLE
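    Create a file 'hello.flux' (whitespace is ignored, so the plus signs
    are grouped for easy counting: 8 groups of 9 make 72 for 'H', then
    3 groups of 11 add 33 to reach 105 for 'i'):
        +++++++++ +++++++++ +++++++++ +++++++++ +++++++++ +++++++++ +++++++++ +++++++++.
        +++++++++++ +++++++++++ +++++++++++.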

    Run it:
        flux run hello.flux

    Output:
        Hi

For complete documentation, run 'flux reference'.
`)
}

// showGuide displays the beginner's tutorial
func showGuide() {
    fmt.Println(`
                     FLUX BEGINNER'S GUIDE                                 

## INTRODUCTION

Welcome to Flux! This comprehensive guide will teach you programming in Flux
from absolute scratch. Flux is intentionally minimal - you'll learn everything
you need in under an hour.

[Complete beginner's guide content would continue here...]
`)
}

// showReference displays the complete language reference
func showReference() {
    fmt.Println(`
                   FLUX COMPLETE LANGUAGE REFERENCE                       

[Complete reference documentation would continue here...]
`)
}

// showExamples displays example programs
func showExamples() {
    fmt.Println(`
                       FLUX EXAMPLE PROGRAMS                               

[Complete examples would continue here...]
`)
}

// runDemo runs interactive demonstrations
func runDemo() {
    demos := []struct {
        name string
        code string
        desc string
    }{
        {
            "Output Character 'A'",
            strings.Repeat("+", 65) + ".",
            "Builds ASCII value 65 and outputs 'A'",
        },
        {
            "Output Number 42",
            strings.Repeat("+", 42) + "#",
            "Builds value 42 and outputs it as a number",
        },
        {
            "Count Down from 5",
            "+++++[#-]",
            "Loops 5 times, printing and decrementing",
        },
        {
            "Simple Stack Test",
            "+++*++*/#/#",
            "Pushes 3, then 5 (a push leaves acc unchanged), then pops and prints both: 53",
        },
        {
            "Hello (short)",
            strings.Repeat("+", 72) + "." + strings.Repeat("+", 29) + "." + strings.Repeat("+", 7) + ".." + "+++.",
            "Prints 'Hello' using ASCII values",
        },
    }

    fmt.Println("")
    fmt.Println("                       FLUX INTERACTIVE DEMONSTRATION                      ")
    fmt.Println("")
    for i, demo := range demos {
        fmt.Printf("Demo %d: %s\n", i+1, demo.name)
        fmt.Printf("Description: %s\n", demo.desc)
        fmt.Printf("Code: %s\n", demo.code)
        fmt.Printf("Output: ")
        execute(demo.code)
        fmt.Printf("\n\n")
    }

    fmt.Println("Try writing your own programs using these patterns!")
}

// runFile compiles and executes a Flux source file
func runFile(filename string) {
    data, err := os.ReadFile(filename)
    if err != nil {
        fmt.Printf("Error reading file '%s': %v\n", filename, err)
        return
    }

    fmt.Printf("Executing %s...\n", filename)
    fmt.Println("")
    execute(string(data))
    fmt.Println()
}

// compileFile compiles a Flux source file and displays the bytecode
func compileFile(filename string) {
    data, err := os.ReadFile(filename)
    if err != nil {
        fmt.Printf("Error reading file '%s': %v\n", filename, err)
        return
    }

    compiler := NewCompiler(string(data))
    instructions, err := compiler.Compile()
    if err != nil {
        fmt.Printf("Compilation error: %v\n", err)
        return
    }

    fmt.Printf("Successfully compiled %s\n", filename)
    fmt.Printf("Total instructions: %d\n\n", len(instructions))
    fmt.Println("Bytecode Listing:")
    fmt.Println("")
    fmt.Println("Addr  Opcode    Argument")
    fmt.Println("")

    opNames := map[OpCode]string{
        OpInc:    "INC",
        OpDec:    "DEC",
        OpPush:   "PUSH",
        OpPop:    "POP",
        OpLoop:   "LOOP",
        OpEnd:    "END",
        OpOut:    "OUT",
        OpIn:     "IN",
        OpOutNum: "OUTNUM",
    }

    for i, inst := range instructions {
        opName := opNames[inst.Op]
        if inst.Op == OpLoop || inst.Op == OpEnd {
            fmt.Printf("%04d  %-8s  → %d\n", i, opName, inst.Arg)
        } else {
            fmt.Printf("%04d  %s\n", i, opName)
        }
    }
    fmt.Println("")
}

// runInteractive starts an interactive REPL
func runInteractive() {
    fmt.Println("")
    fmt.Println("                   FLUX INTERACTIVE MODE (REPL)                            ")
    fmt.Println("")
    fmt.Println("Enter Flux code and press Enter to execute.")
    fmt.Println("Type 'exit' or 'quit' to leave, 'help' for quick reference.")
    fmt.Println()

    scanner := bufio.NewScanner(os.Stdin)

    for {
        fmt.Print("flux> ")
        if !scanner.Scan() {
            break
        }

        line := strings.TrimSpace(scanner.Text())

        if line == "" {
            continue
        }

        if line == "exit" || line == "quit" {
            fmt.Println("Goodbye!")
            break
        }

        if line == "help" {
            fmt.Println("Quick Reference:")
            fmt.Println("  +  Increment    *  Push      [  Loop start")
            fmt.Println("  -  Decrement    /  Pop       ]  Loop end")
            fmt.Println("  .  Output char  ,  Input     #  Output number")
            continue
        }

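        // Note: the VM reads ',' input straight from os.Stdin, while this
        // scanner buffers os.Stdin. With piped input, bytes the scanner has
        // already buffered are not visible to the ',' operation.
        execute(line)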
        fmt.Println()
    }
}

// execute compiles and runs Flux source code
func execute(source string) {
    compiler := NewCompiler(source)
    instructions, err := compiler.Compile()
    if err != nil {
        fmt.Printf("Compilation error: %v\n", err)
        return
    }

    vm := NewVM(instructions, os.Stdin, os.Stdout)
    err = vm.Run()
    if err != nil {
        fmt.Printf("\nRuntime error: %v\n", err)
    }
}