Tuesday, June 30, 2026

BUILDING A PROFESSIONAL LOCAL LLM CHATBOT WITH ADVANCED HARDWARE DETECTION AND RAG CAPABILITIES


 


INTRODUCTION AND CONCEPTUAL OVERVIEW

Creating a production-ready local Large Language Model chatbot requires careful architectural planning that balances performance, usability, and flexibility. This comprehensive guide walks through building a sophisticated system that runs entirely on local hardware, eliminating cloud dependencies while providing enterprise-grade features. The chatbot we will construct supports multiple GPU architectures including NVIDIA CUDA and Apple Metal Performance Shaders, offers granular control over inference parameters, implements Retrieval Augmented Generation for document processing, and presents users with an intuitive graphical interface.

The fundamental challenge in building such a system lies in bridging the gap between complex machine learning infrastructure and user-friendly interaction. Modern LLMs require careful memory management, optimal hardware utilization, and sophisticated prompt engineering. Our solution addresses these challenges through a modular architecture that separates concerns while maintaining tight integration where performance matters most.

ARCHITECTURAL FOUNDATION AND TECHNOLOGY STACK

The system architecture follows a clean separation between the backend inference engine, the document processing pipeline, the hardware abstraction layer, and the frontend user interface. At the core sits llama-cpp-python, which provides Python bindings to the highly optimized llama.cpp library. This choice enables us to run quantized models efficiently across diverse hardware configurations while maintaining a consistent API.

For the user interface, we employ Gradio, a Python library specifically designed for creating machine learning interfaces. Gradio excels at rapid prototyping while producing production-quality interfaces with minimal code. It handles real-time updates, file uploads, and complex state management automatically, allowing us to focus on functionality rather than low-level UI concerns.

The document processing pipeline leverages PyMuPDF for PDF extraction, python-docx for Word documents, BeautifulSoup4 for HTML parsing, and markdown for Markdown files. These libraries provide robust text extraction while preserving document structure. For the RAG implementation, we use sentence-transformers to generate embeddings and FAISS for efficient similarity search across large document collections.

Hardware detection requires platform-specific libraries. We utilize PyTorch to detect CUDA availability and capabilities, along with direct system calls to query GPU memory and compute capabilities. For Apple Silicon, we check for Metal Performance Shaders support through PyTorch's MPS backend. The system dynamically adjusts its configuration based on detected hardware, ensuring optimal performance without manual intervention.

HARDWARE DETECTION AND DYNAMIC CONFIGURATION

The hardware detection subsystem forms the foundation of our adaptive inference system. Upon startup, the application probes the system to identify available compute resources, including CPU specifications, RAM capacity, GPU presence and type, VRAM availability, and supported instruction sets. This information drives automatic configuration while remaining user-adjustable for advanced scenarios.

The detection process begins with CPU enumeration. We determine the number of physical and logical cores, cache sizes, and supported SIMD instructions. This information helps optimize thread allocation for CPU-bound operations and determines fallback strategies when GPU acceleration is unavailable.

import psutil
import platform
import torch
import subprocess
import os

class HardwareDetector:
    def __init__(self):
        self.cpu_info = {}
        self.gpu_info = {}
        self.memory_info = {}
        self.detected = False
        
    def detect_cpu(self):
        """Detect CPU specifications and capabilities"""
        self.cpu_info['physical_cores'] = psutil.cpu_count(logical=False)
        self.cpu_info['logical_cores'] = psutil.cpu_count(logical=True)
        self.cpu_info['architecture'] = platform.machine()
        self.cpu_info['processor'] = platform.processor()
        
        # Detect CPU frequency
        try:
            freq = psutil.cpu_freq()
            if freq:
                self.cpu_info['max_frequency_mhz'] = freq.max
                self.cpu_info['current_frequency_mhz'] = freq.current
        except Exception as e:
            self.cpu_info['frequency_error'] = str(e)
            
        return self.cpu_info

GPU detection requires platform-specific approaches. For NVIDIA GPUs, we query CUDA availability through PyTorch and extract detailed device properties including compute capability, total memory, and multiprocessor count. The compute capability determines which quantization formats and optimization techniques are available.

    def detect_nvidia_gpu(self):
        """Detect NVIDIA GPU specifications using CUDA"""
        if not torch.cuda.is_available():
            return None
            
        gpu_list = []
        for i in range(torch.cuda.device_count()):
            device_props = torch.cuda.get_device_properties(i)
            gpu_info = {
                'index': i,
                'name': device_props.name,
                'compute_capability': f"{device_props.major}.{device_props.minor}",
                'total_memory_gb': device_props.total_memory / (1024**3),
                'multiprocessor_count': device_props.multi_processor_count,
                'max_threads_per_block': device_props.max_threads_per_block,
                'type': 'NVIDIA_CUDA'
            }
            gpu_list.append(gpu_info)
            
        return gpu_list

Apple Silicon detection follows a different path. We check for MPS availability through PyTorch's MPS backend and query system information to determine the specific chip variant. Apple's unified memory architecture requires special consideration since VRAM and system RAM share the same physical memory pool.

    def detect_apple_gpu(self):
        """Detect Apple Silicon GPU (Metal Performance Shaders)"""
        if not torch.backends.mps.is_available():
            return None
            
        # MPS is available on Apple Silicon
        gpu_info = {
            'index': 0,
            'name': 'Apple Silicon GPU',
            'type': 'APPLE_MPS',
            'backend': 'Metal Performance Shaders'
        }
        
        # Try to get more specific chip information
        try:
            if platform.system() == 'Darwin':
                result = subprocess.run(['sysctl', '-n', 'machdep.cpu.brand_string'], 
                                      capture_output=True, text=True)
                if result.returncode == 0:
                    gpu_info['chip'] = result.stdout.strip()
        except Exception as e:
            gpu_info['detection_note'] = f"Could not determine specific chip: {e}"
            
        return [gpu_info]

Memory detection encompasses both system RAM and GPU VRAM. The system monitors available memory continuously to prevent out-of-memory errors during model loading and inference. We implement conservative memory budgeting that reserves headroom for operating system and other applications.

    def detect_memory(self):
        """Detect system and GPU memory specifications"""
        # System RAM
        vm = psutil.virtual_memory()
        self.memory_info['system_total_gb'] = vm.total / (1024**3)
        self.memory_info['system_available_gb'] = vm.available / (1024**3)
        self.memory_info['system_used_percent'] = vm.percent
        
        # GPU Memory
        self.memory_info['gpu_memory'] = []
        
        if torch.cuda.is_available():
            for i in range(torch.cuda.device_count()):
                gpu_mem = {
                    'device': i,
                    'total_gb': torch.cuda.get_device_properties(i).total_memory / (1024**3),
                    'reserved_gb': torch.cuda.memory_reserved(i) / (1024**3),
                    'allocated_gb': torch.cuda.memory_allocated(i) / (1024**3)
                }
                self.memory_info['gpu_memory'].append(gpu_mem)
                
        return self.memory_info

The complete hardware detection orchestrates these individual components into a comprehensive system profile. This profile informs default parameter selection and enables intelligent warnings when users attempt configurations that exceed available resources.

    def detect_all(self):
        """Perform complete hardware detection"""
        self.detect_cpu()
        
        # Detect GPUs
        nvidia_gpus = self.detect_nvidia_gpu()
        apple_gpus = self.detect_apple_gpu()
        
        if nvidia_gpus:
            self.gpu_info['devices'] = nvidia_gpus
            self.gpu_info['primary_type'] = 'NVIDIA_CUDA'
        elif apple_gpus:
            self.gpu_info['devices'] = apple_gpus
            self.gpu_info['primary_type'] = 'APPLE_MPS'
        else:
            self.gpu_info['devices'] = []
            self.gpu_info['primary_type'] = 'CPU_ONLY'
            
        self.detect_memory()
        self.detected = True
        
        return {
            'cpu': self.cpu_info,
            'gpu': self.gpu_info,
            'memory': self.memory_info
        }

MODEL MANAGEMENT AND STORAGE ARCHITECTURE

Model management encompasses discovery, loading, validation, and lifecycle management of LLM files stored on local disk. Users need the ability to browse their filesystem for compatible model files, load models with custom parameters, monitor resource consumption, and unload models to free resources. The system maintains a registry of available models and their metadata.

Model files for llama.cpp typically use the GGUF format, which supports various quantization levels from 2-bit to 16-bit precision. Each quantization level represents a tradeoff between model size, inference speed, and output quality. Our system automatically detects the quantization level from the filename and suggests appropriate hardware configurations.

import os
import json
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime

class ModelManager:
    def __init__(self, models_directory: str = "./models"):
        self.models_directory = Path(models_directory)
        self.models_directory.mkdir(parents=True, exist_ok=True)
        self.registry_file = self.models_directory / "model_registry.json"
        self.loaded_models = {}
        self.model_registry = self._load_registry()
        
    def _load_registry(self) -> Dict:
        """Load the model registry from disk"""
        if self.registry_file.exists():
            try:
                with open(self.registry_file, 'r') as f:
                    return json.load(f)
            except Exception as e:
                print(f"Error loading registry: {e}")
                return {}
        return {}

The model scanning functionality traverses the models directory recursively, identifying GGUF files and extracting metadata. We parse filenames to extract model family, parameter count, and quantization information. This metadata populates the model selection interface and helps users make informed choices.

    def scan_models(self) -> List[Dict]:
        """Scan the models directory for available GGUF files"""
        models = []
        
        for root, dirs, files in os.walk(self.models_directory):
            for file in files:
                if file.endswith('.gguf'):
                    full_path = Path(root) / file
                    model_info = self._extract_model_info(full_path)
                    models.append(model_info)
                    
        # Update registry
        for model in models:
            model_id = model['id']
            if model_id not in self.model_registry:
                self.model_registry[model_id] = {
                    'first_seen': datetime.now().isoformat(),
                    'load_count': 0
                }
            self.model_registry[model_id].update({
                'last_seen': datetime.now().isoformat(),
                'path': model['path'],
                'size_gb': model['size_gb']
            })
            
        self._save_registry()
        return models

Model information extraction parses the filename and queries file properties to build a comprehensive model descriptor. The descriptor includes the model's path, size, estimated memory requirements, and inferred capabilities.

    def _extract_model_info(self, model_path: Path) -> Dict:
        """Extract information from model file"""
        file_size = model_path.stat().st_size
        filename = model_path.stem
        
        # Parse common naming patterns
        # Example: llama-2-7b-chat.Q4_K_M.gguf
        parts = filename.lower().split('.')
        base_name = parts[0] if parts else filename
        quant = parts[1] if len(parts) > 1 else 'unknown'
        
        model_info = {
            'id': filename,
            'name': base_name,
            'path': str(model_path),
            'filename': model_path.name,
            'size_bytes': file_size,
            'size_gb': round(file_size / (1024**3), 2),
            'quantization': quant,
            'format': 'GGUF'
        }
        
        # Estimate parameter count from filename
        if '7b' in base_name:
            model_info['estimated_parameters'] = '7B'
        elif '13b' in base_name:
            model_info['estimated_parameters'] = '13B'
        elif '70b' in base_name:
            model_info['estimated_parameters'] = '70B'
        else:
            model_info['estimated_parameters'] = 'Unknown'
            
        return model_info

The registry persistence mechanism saves model metadata to disk, enabling the system to remember user preferences and usage statistics across sessions. This historical data can inform recommendations and optimize default settings.

    def _save_registry(self):
        """Save the model registry to disk"""
        try:
            with open(self.registry_file, 'w') as f:
                json.dump(self.model_registry, f, indent=2)
        except Exception as e:
            print(f"Error saving registry: {e}")

Model deletion requires careful handling to prevent data loss. The system implements a confirmation mechanism and updates the registry to reflect the removal. We also check if the model is currently loaded and prevent deletion of active models.

    def delete_model(self, model_id: str) -> bool:
        """Delete a model file and update registry"""
        if model_id in self.loaded_models:
            raise ValueError(f"Cannot delete model {model_id}: currently loaded")
            
        if model_id not in self.model_registry:
            raise ValueError(f"Model {model_id} not found in registry")
            
        model_path = Path(self.model_registry[model_id]['path'])
        
        try:
            if model_path.exists():
                model_path.unlink()
                
            # Remove from registry
            del self.model_registry[model_id]
            self._save_registry()
            
            return True
        except Exception as e:
            raise Exception(f"Error deleting model: {e}")

INFERENCE ENGINE AND PARAMETER MANAGEMENT

The inference engine wraps llama-cpp-python with a sophisticated parameter management system. Users can control every aspect of text generation including temperature, top-p sampling, top-k sampling, repetition penalty, context window size, batch size, thread count, and GPU layer offloading. Each parameter significantly impacts output quality, generation speed, and resource consumption.

Temperature controls randomness in token selection. Lower values produce more deterministic outputs while higher values increase creativity and variation. The valid range spans from zero to two, with typical values between 0.7 and 0.9 for conversational applications.

Top-p sampling, also known as nucleus sampling, considers only the smallest set of tokens whose cumulative probability exceeds the threshold. This technique produces more coherent outputs than pure temperature sampling by eliminating low-probability tail tokens. Values between 0.9 and 0.95 work well for most applications.

Top-k sampling limits consideration to the k most probable tokens at each step. This provides a simpler alternative to top-p sampling with more predictable behavior. Typical values range from 40 to 100.

Repetition penalty discourages the model from repeating tokens or phrases. Values above 1.0 penalize repetition, with 1.1 to 1.3 providing good results for most models. Excessive penalty values can degrade output quality by forcing unnatural word choices.

from llama_cpp import Llama
from typing import Optional, Iterator
import threading

class InferenceEngine:
    def __init__(self, hardware_detector: HardwareDetector):
        self.hardware = hardware_detector
        self.model = None
        self.model_path = None
        self.generation_lock = threading.Lock()
        
        # Default parameters
        self.default_params = {
            'temperature': 0.8,
            'top_p': 0.95,
            'top_k': 40,
            'repeat_penalty': 1.1,
            'max_tokens': 512,
            'n_ctx': 2048,
            'n_batch': 512,
            'n_threads': None,  # Auto-detect
            'n_gpu_layers': 0,  # CPU-only by default
            'verbose': False
        }

GPU layer offloading represents one of the most impactful performance optimizations. Modern LLMs consist of dozens of transformer layers that can be distributed between CPU and GPU. By offloading layers to the GPU, we accelerate inference while managing memory consumption. The system calculates optimal layer distribution based on available VRAM and model size.

    def calculate_optimal_gpu_layers(self, model_size_gb: float, 
                                    gpu_memory_gb: float,
                                    offload_percentage: float = 100.0) -> int:
        """Calculate optimal number of layers to offload to GPU"""
        if gpu_memory_gb <= 0 or offload_percentage <= 0:
            return 0
            
        # Reserve 2GB for system and overhead
        available_memory = max(0, gpu_memory_gb - 2.0)
        
        # Estimate layers based on model size and available memory
        # This is a heuristic - actual memory usage varies by model architecture
        estimated_total_layers = 32  # Common for 7B models
        
        if model_size_gb > 10:
            estimated_total_layers = 40  # Larger models
        elif model_size_gb > 20:
            estimated_total_layers = 60  # Very large models
            
        # Calculate memory per layer
        memory_per_layer = model_size_gb / estimated_total_layers
        
        # Calculate how many layers fit in available memory
        max_layers = int(available_memory / memory_per_layer)
        
        # Apply user-specified percentage
        target_layers = int(max_layers * (offload_percentage / 100.0))
        
        return max(0, min(target_layers, estimated_total_layers))

Model loading initializes the inference engine with user-specified parameters. We validate parameters against hardware constraints and provide warnings when configurations may cause issues. The loading process can take several seconds for large models, so we implement progress feedback.

    def load_model(self, model_path: str, **kwargs) -> bool:
        """Load a model with specified parameters"""
        # Merge user parameters with defaults
        params = self.default_params.copy()
        params.update(kwargs)
        
        # Auto-detect thread count if not specified
        if params['n_threads'] is None:
            params['n_threads'] = self.hardware.cpu_info.get('physical_cores', 4)
            
        # Validate GPU layers against available hardware
        if params['n_gpu_layers'] > 0:
            if self.hardware.gpu_info['primary_type'] == 'CPU_ONLY':
                print("Warning: GPU layers requested but no GPU detected. Using CPU only.")
                params['n_gpu_layers'] = 0
                
        try:
            # Unload existing model if present
            if self.model is not None:
                del self.model
                self.model = None
                
            # Load new model
            self.model = Llama(
                model_path=model_path,
                n_ctx=params['n_ctx'],
                n_batch=params['n_batch'],
                n_threads=params['n_threads'],
                n_gpu_layers=params['n_gpu_layers'],
                verbose=params['verbose']
            )
            
            self.model_path = model_path
            return True
            
        except Exception as e:
            print(f"Error loading model: {e}")
            return False

Text generation implements both synchronous and streaming modes. Streaming mode yields tokens as they are generated, enabling real-time display in the user interface. This dramatically improves perceived responsiveness for long outputs.

    def generate(self, prompt: str, stream: bool = False, **kwargs) -> Optional[str]:
        """Generate text from prompt"""
        if self.model is None:
            raise ValueError("No model loaded")
            
        # Merge generation parameters
        gen_params = {
            'temperature': kwargs.get('temperature', self.default_params['temperature']),
            'top_p': kwargs.get('top_p', self.default_params['top_p']),
            'top_k': kwargs.get('top_k', self.default_params['top_k']),
            'repeat_penalty': kwargs.get('repeat_penalty', self.default_params['repeat_penalty']),
            'max_tokens': kwargs.get('max_tokens', self.default_params['max_tokens']),
            'stream': stream
        }
        
        try:
            with self.generation_lock:
                output = self.model(prompt, **gen_params)
                
                if stream:
                    return output  # Returns iterator
                else:
                    return output['choices'][0]['text']
                    
        except Exception as e:
            print(f"Error during generation: {e}")
            return None

The streaming generator wraps the model's token iterator to provide clean iteration semantics. Each yielded token updates the interface immediately, creating a typewriter effect that engages users during generation.

    def generate_stream(self, prompt: str, **kwargs) -> Iterator[str]:
        """Generate text with streaming output"""
        if self.model is None:
            raise ValueError("No model loaded")
            
        gen_params = {
            'temperature': kwargs.get('temperature', self.default_params['temperature']),
            'top_p': kwargs.get('top_p', self.default_params['top_p']),
            'top_k': kwargs.get('top_k', self.default_params['top_k']),
            'repeat_penalty': kwargs.get('repeat_penalty', self.default_params['repeat_penalty']),
            'max_tokens': kwargs.get('max_tokens', self.default_params['max_tokens']),
            'stream': True
        }
        
        try:
            with self.generation_lock:
                for output in self.model(prompt, **gen_params):
                    token = output['choices'][0]['text']
                    yield token
        except Exception as e:
            yield f"\n\nError during generation: {e}"

PROMPT TEMPLATE SYSTEM

Prompt templates structure the interaction between user messages, system instructions, and model responses. Different models expect different formatting conventions. LLaMA models use specific tokens to delineate roles, while other models may use different conventions. Our template system abstracts these differences, allowing users to switch models without reformatting their prompts.

The template manager stores predefined templates for popular model families and allows users to create custom templates. Each template defines how to format system messages, user messages, assistant responses, and conversation history.

class PromptTemplateManager:
    def __init__(self):
        self.templates = {
            'llama2-chat': {
                'name': 'LLaMA 2 Chat',
                'system_prefix': '[INST] <<SYS>>\n',
                'system_suffix': '\n<</SYS>>\n\n',
                'user_prefix': '',
                'user_suffix': ' [/INST] ',
                'assistant_prefix': '',
                'assistant_suffix': ' ',
                'bos_token': '<s>',
                'eos_token': '</s>'
            },
            'alpaca': {
                'name': 'Alpaca',
                'system_prefix': '',
                'system_suffix': '\n\n',
                'user_prefix': '### Instruction:\n',
                'user_suffix': '\n\n',
                'assistant_prefix': '### Response:\n',
                'assistant_suffix': '\n\n',
                'bos_token': '',
                'eos_token': ''
            },
            'chatml': {
                'name': 'ChatML',
                'system_prefix': '<|im_start|>system\n',
                'system_suffix': '<|im_end|>\n',
                'user_prefix': '<|im_start|>user\n',
                'user_suffix': '<|im_end|>\n',
                'assistant_prefix': '<|im_start|>assistant\n',
                'assistant_suffix': '<|im_end|>\n',
                'bos_token': '',
                'eos_token': ''
            }
        }
        self.custom_templates = {}

Template application combines the system message, conversation history, and current user input into a properly formatted prompt. The system maintains conversation context across multiple turns, enabling coherent multi-turn dialogues.

    def apply_template(self, template_name: str, system_message: str,
                      conversation_history: list, user_message: str) -> str:
        """Apply a template to format the complete prompt"""
        template = self.templates.get(template_name) or self.custom_templates.get(template_name)
        
        if not template:
            raise ValueError(f"Template '{template_name}' not found")
            
        # Start with BOS token if present
        prompt = template['bos_token']
        
        # Add system message if provided
        if system_message:
            prompt += template['system_prefix']
            prompt += system_message
            prompt += template['system_suffix']
            
        # Add conversation history
        for turn in conversation_history:
            if turn['role'] == 'user':
                prompt += template['user_prefix']
                prompt += turn['content']
                prompt += template['user_suffix']
            elif turn['role'] == 'assistant':
                prompt += template['assistant_prefix']
                prompt += turn['content']
                prompt += template['assistant_suffix']
                
        # Add current user message
        prompt += template['user_prefix']
        prompt += user_message
        prompt += template['user_suffix']
        
        # Add assistant prefix to prompt for response
        prompt += template['assistant_prefix']
        
        return prompt

Custom template creation empowers advanced users to define their own formatting conventions. The system validates template structure to ensure all required fields are present.

    def create_custom_template(self, name: str, template_dict: dict) -> bool:
        """Create a custom prompt template"""
        required_fields = ['system_prefix', 'system_suffix', 'user_prefix', 
                          'user_suffix', 'assistant_prefix', 'assistant_suffix']
        
        # Validate template structure
        for field in required_fields:
            if field not in template_dict:
                raise ValueError(f"Template missing required field: {field}")
                
        # Add optional fields with defaults
        if 'bos_token' not in template_dict:
            template_dict['bos_token'] = ''
        if 'eos_token' not in template_dict:
            template_dict['eos_token'] = ''
            
        template_dict['name'] = name
        self.custom_templates[name] = template_dict
        
        return True

DOCUMENT PROCESSING AND RAG IMPLEMENTATION

Retrieval Augmented Generation enhances LLM responses by grounding them in external documents. Users upload documents through the interface, the system extracts and chunks the text, generates embeddings for each chunk, stores embeddings in a vector database, and retrieves relevant chunks during inference to augment the prompt. This approach dramatically improves factual accuracy and enables the model to answer questions about specific documents.

The document processor handles multiple file formats through format-specific extractors. Each extractor normalizes the text while preserving important structure like headings and paragraphs.

import fitz  # PyMuPDF
from docx import Document as DocxDocument
from bs4 import BeautifulSoup
import markdown
from typing import List, Dict
import hashlib

class DocumentProcessor:
    def __init__(self):
        self.supported_formats = ['.pdf', '.docx', '.html', '.htm', '.md', '.txt']
        
    def process_document(self, file_path: str) -> Dict:
        """Process a document and extract text content"""
        file_path = Path(file_path)
        
        if not file_path.exists():
            raise FileNotFoundError(f"Document not found: {file_path}")
            
        extension = file_path.suffix.lower()
        
        if extension not in self.supported_formats:
            raise ValueError(f"Unsupported format: {extension}")
            
        # Extract text based on format
        if extension == '.pdf':
            text = self._extract_pdf(file_path)
        elif extension == '.docx':
            text = self._extract_docx(file_path)
        elif extension in ['.html', '.htm']:
            text = self._extract_html(file_path)
        elif extension == '.md':
            text = self._extract_markdown(file_path)
        else:  # .txt
            text = self._extract_text(file_path)
            
        # Generate document metadata
        doc_id = hashlib.md5(str(file_path).encode()).hexdigest()
        
        return {
            'id': doc_id,
            'path': str(file_path),
            'filename': file_path.name,
            'format': extension,
            'text': text,
            'length': len(text)
        }

PDF extraction uses PyMuPDF to iterate through pages and extract text while preserving layout information. The extractor handles multi-column layouts and embedded images with text.

    def _extract_pdf(self, file_path: Path) -> str:
        """Extract text from PDF using PyMuPDF"""
        text_parts = []
        
        try:
            doc = fitz.open(file_path)
            
            for page_num in range(len(doc)):
                page = doc[page_num]
                text = page.get_text()
                
                if text.strip():
                    text_parts.append(f"--- Page {page_num + 1} ---\n{text}")
                    
            doc.close()
            
        except Exception as e:
            raise Exception(f"Error extracting PDF: {e}")
            
        return "\n\n".join(text_parts)

Word document extraction leverages python-docx to access document structure. We extract paragraphs, tables, and headers while maintaining document flow.

    def _extract_docx(self, file_path: Path) -> str:
        """Extract text from Word document"""
        try:
            doc = DocxDocument(file_path)
            text_parts = []
            
            for paragraph in doc.paragraphs:
                if paragraph.text.strip():
                    text_parts.append(paragraph.text)
                    
            # Extract text from tables
            for table in doc.tables:
                for row in table.rows:
                    row_text = []
                    for cell in row.cells:
                        if cell.text.strip():
                            row_text.append(cell.text)
                    if row_text:
                        text_parts.append(" | ".join(row_text))
                        
            return "\n\n".join(text_parts)
            
        except Exception as e:
            raise Exception(f"Error extracting DOCX: {e}")

HTML extraction uses BeautifulSoup to parse the document structure and extract visible text while removing scripts, styles, and other non-content elements.

    def _extract_html(self, file_path: Path) -> str:
        """Extract text from HTML document"""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                html_content = f.read()
                
            soup = BeautifulSoup(html_content, 'html.parser')
            
            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()
                
            # Get text
            text = soup.get_text()
            
            # Clean up whitespace
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = '\n'.join(chunk for chunk in chunks if chunk)
            
            return text
            
        except Exception as e:
            raise Exception(f"Error extracting HTML: {e}")

Markdown extraction converts markdown to plain text while preserving structure. We use the markdown library to parse the document and extract the rendered text.

    def _extract_markdown(self, file_path: Path) -> str:
        """Extract text from Markdown document"""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                md_content = f.read()
                
            # Convert markdown to HTML then extract text
            html = markdown.markdown(md_content)
            soup = BeautifulSoup(html, 'html.parser')
            text = soup.get_text()
            
            return text
            
        except Exception as e:
            raise Exception(f"Error extracting Markdown: {e}")

Plain text extraction simply reads the file with proper encoding detection to handle various text encodings.

    def _extract_text(self, file_path: Path) -> str:
        """Extract text from plain text file"""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except UnicodeDecodeError:
            # Try with different encoding
            with open(file_path, 'r', encoding='latin-1') as f:
                return f.read()

Text chunking divides documents into semantically meaningful segments. We implement a sliding window approach with overlap to ensure context continuity across chunk boundaries. Chunk size balances between providing sufficient context and staying within embedding model limits.

class TextChunker:
    def __init__(self, chunk_size: int = 512, overlap: int = 50):
        self.chunk_size = chunk_size
        self.overlap = overlap
        
    def chunk_text(self, text: str, doc_id: str) -> List[Dict]:
        """Split text into overlapping chunks"""
        # Split into sentences (simple approach)
        sentences = text.replace('\n', ' ').split('. ')
        
        chunks = []
        current_chunk = []
        current_length = 0
        chunk_index = 0
        
        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue
                
            sentence_length = len(sentence.split())
            
            # If adding this sentence exceeds chunk size, save current chunk
            if current_length + sentence_length > self.chunk_size and current_chunk:
                chunk_text = '. '.join(current_chunk) + '.'
                chunks.append({
                    'doc_id': doc_id,
                    'chunk_index': chunk_index,
                    'text': chunk_text,
                    'length': current_length
                })
                
                # Start new chunk with overlap
                overlap_sentences = current_chunk[-self.overlap:] if len(current_chunk) > self.overlap else current_chunk
                current_chunk = overlap_sentences
                current_length = sum(len(s.split()) for s in current_chunk)
                chunk_index += 1
                
            current_chunk.append(sentence)
            current_length += sentence_length
            
        # Add final chunk
        if current_chunk:
            chunk_text = '. '.join(current_chunk) + '.'
            chunks.append({
                'doc_id': doc_id,
                'chunk_index': chunk_index,
                'text': chunk_text,
                'length': current_length
            })
            
        return chunks

The embedding generator creates vector representations of text chunks using sentence transformers. These models produce high-quality embeddings optimized for semantic similarity search. We use a lightweight model by default but allow users to specify larger models for improved accuracy.

from sentence_transformers import SentenceTransformer
import numpy as np

class EmbeddingGenerator:
    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        """Initialize embedding model"""
        self.model = SentenceTransformer(model_name)
        self.embedding_dim = self.model.get_sentence_embedding_dimension()
        
    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """Generate embeddings for a list of texts"""
        embeddings = self.model.encode(texts, show_progress_bar=False)
        return embeddings

The vector store manages embedding storage and retrieval using FAISS, a library for efficient similarity search. FAISS supports billions of vectors with millisecond query times through approximate nearest neighbor algorithms.

import faiss

class VectorStore:
    def __init__(self, embedding_dim: int):
        self.embedding_dim = embedding_dim
        self.index = faiss.IndexFlatL2(embedding_dim)
        self.chunks = []
        self.doc_metadata = {}
        
    def add_document(self, doc_id: str, chunks: List[Dict], embeddings: np.ndarray):
        """Add document chunks and embeddings to the store"""
        # Store metadata
        self.doc_metadata[doc_id] = {
            'num_chunks': len(chunks),
            'added_at': datetime.now().isoformat()
        }
        
        # Add embeddings to FAISS index
        self.index.add(embeddings.astype('float32'))
        
        # Store chunk metadata
        for chunk in chunks:
            self.chunks.append(chunk)

Retrieval searches the vector store for chunks most similar to the query. We return the top-k most relevant chunks along with their similarity scores. These chunks are then incorporated into the prompt to provide context for the LLM.

    def search(self, query_embedding: np.ndarray, top_k: int = 3) -> List[Dict]:
        """Search for most similar chunks"""
        if self.index.ntotal == 0:
            return []
            
        # Search FAISS index
        distances, indices = self.index.search(
            query_embedding.astype('float32').reshape(1, -1), 
            min(top_k, self.index.ntotal)
        )
        
        # Retrieve chunks
        results = []
        for i, (dist, idx) in enumerate(zip(distances[0], indices[0])):
            if idx < len(self.chunks):
                chunk = self.chunks[idx].copy()
                chunk['similarity_score'] = float(1 / (1 + dist))  # Convert distance to similarity
                chunk['rank'] = i + 1
                results.append(chunk)
                
        return results

The RAG orchestrator coordinates document processing, embedding generation, and retrieval to augment prompts with relevant context. When a user asks a question, the system retrieves relevant chunks and prepends them to the prompt, instructing the model to answer based on the provided context.

class RAGOrchestrator:
    def __init__(self, embedding_model_name: str = 'all-MiniLM-L6-v2'):
        self.doc_processor = DocumentProcessor()
        self.chunker = TextChunker()
        self.embedding_generator = EmbeddingGenerator(embedding_model_name)
        self.vector_store = VectorStore(self.embedding_generator.embedding_dim)
        
    def add_document(self, file_path: str) -> Dict:
        """Process and add a document to the RAG system"""
        # Extract text
        doc_info = self.doc_processor.process_document(file_path)
        
        # Chunk text
        chunks = self.chunker.chunk_text(doc_info['text'], doc_info['id'])
        
        # Generate embeddings
        chunk_texts = [chunk['text'] for chunk in chunks]
        embeddings = self.embedding_generator.generate_embeddings(chunk_texts)
        
        # Add to vector store
        self.vector_store.add_document(doc_info['id'], chunks, embeddings)
        
        return {
            'doc_id': doc_info['id'],
            'filename': doc_info['filename'],
            'num_chunks': len(chunks),
            'status': 'success'
        }

Context augmentation retrieves relevant chunks and formats them into a context block that precedes the user's question. The prompt instructs the model to answer based on the provided context, improving factual accuracy.

    def augment_prompt(self, query: str, top_k: int = 3) -> str:
        """Augment a query with relevant context from documents"""
        # Generate query embedding
        query_embedding = self.embedding_generator.generate_embeddings([query])[0]
        
        # Retrieve relevant chunks
        results = self.vector_store.search(query_embedding, top_k)
        
        if not results:
            return query
            
        # Build context
        context_parts = ["Based on the following context, please answer the question.\n\nContext:"]
        
        for result in results:
            context_parts.append(f"\n{result['text']}")
            
        context_parts.append(f"\n\nQuestion: {query}\n\nAnswer:")
        
        return "\n".join(context_parts)

USER INTERFACE DESIGN AND IMPLEMENTATION

The user interface serves as the primary interaction point between users and the system. We design for clarity, efficiency, and visual appeal while maintaining functional density. Gradio provides the foundation, but we customize extensively to create a polished experience.

The interface organizes into logical sections: model management, inference parameters, conversation area, document management, and system status. Each section occupies a dedicated tab or panel, preventing visual clutter while keeping all functionality accessible.

import gradio as gr
from typing import List, Tuple

class ChatbotUI:
    def __init__(self, hardware_detector: HardwareDetector, 
                 model_manager: ModelManager,
                 inference_engine: InferenceEngine,
                 template_manager: PromptTemplateManager,
                 rag_orchestrator: RAGOrchestrator):
        self.hardware = hardware_detector
        self.models = model_manager
        self.engine = inference_engine
        self.templates = template_manager
        self.rag = rag_orchestrator
        
        self.conversation_history = []
        self.current_system_message = ""
        self.current_template = "llama2-chat"
        self.use_rag = False

The model selection interface presents available models with their metadata. Users can scan for new models, select a model to load, configure loading parameters, and monitor loading progress. We display model size, quantization level, and estimated memory requirements to help users make informed choices.

    def create_model_tab(self):
        """Create the model management tab"""
        with gr.Tab("Model Management"):
            with gr.Row():
                with gr.Column(scale=2):
                    model_dropdown = gr.Dropdown(
                        label="Available Models",
                        choices=[],
                        interactive=True
                    )
                    
                    scan_button = gr.Button("Scan for Models", variant="secondary")
                    
                    model_info = gr.Textbox(
                        label="Model Information",
                        lines=5,
                        interactive=False
                    )
                    
                with gr.Column(scale=3):
                    gr.Markdown("### Loading Parameters")
                    
                    n_ctx = gr.Slider(
                        minimum=512,
                        maximum=8192,
                        value=2048,
                        step=512,
                        label="Context Window Size",
                        info="Maximum number of tokens in context"
                    )
                    
                    n_gpu_layers = gr.Slider(
                        minimum=0,
                        maximum=100,
                        value=0,
                        step=1,
                        label="GPU Layers",
                        info="Number of layers to offload to GPU (0 = CPU only)"
                    )
                    
                    gpu_percentage = gr.Slider(
                        minimum=0,
                        maximum=100,
                        value=100,
                        step=5,
                        label="GPU Offload Percentage",
                        info="Percentage of model to offload to GPU"
                    )
                    
                    n_threads = gr.Slider(
                        minimum=1,
                        maximum=32,
                        value=4,
                        step=1,
                        label="CPU Threads",
                        info="Number of threads for CPU inference"
                    )
                    
                    load_button = gr.Button("Load Model", variant="primary")
                    load_status = gr.Textbox(label="Status", interactive=False)
                    
        return {
            'model_dropdown': model_dropdown,
            'scan_button': scan_button,
            'model_info': model_info,
            'n_ctx': n_ctx,
            'n_gpu_layers': n_gpu_layers,
            'gpu_percentage': gpu_percentage,
            'n_threads': n_threads,
            'load_button': load_button,
            'load_status': load_status
        }

The inference parameters tab exposes all generation controls. We group related parameters and provide tooltips explaining their effects. Real-time validation prevents invalid configurations.

    def create_inference_tab(self):
        """Create the inference parameters tab"""
        with gr.Tab("Inference Parameters"):
            gr.Markdown("### Generation Settings")
            
            with gr.Row():
                with gr.Column():
                    temperature = gr.Slider(
                        minimum=0.0,
                        maximum=2.0,
                        value=0.8,
                        step=0.05,
                        label="Temperature",
                        info="Controls randomness (lower = more deterministic)"
                    )
                    
                    top_p = gr.Slider(
                        minimum=0.0,
                        maximum=1.0,
                        value=0.95,
                        step=0.05,
                        label="Top P",
                        info="Nucleus sampling threshold"
                    )
                    
                    top_k = gr.Slider(
                        minimum=0,
                        maximum=200,
                        value=40,
                        step=5,
                        label="Top K",
                        info="Number of top tokens to consider"
                    )
                    
                with gr.Column():
                    repeat_penalty = gr.Slider(
                        minimum=1.0,
                        maximum=2.0,
                        value=1.1,
                        step=0.05,
                        label="Repetition Penalty",
                        info="Penalize repeated tokens"
                    )
                    
                    max_tokens = gr.Slider(
                        minimum=64,
                        maximum=2048,
                        value=512,
                        step=64,
                        label="Max Tokens",
                        info="Maximum length of generated response"
                    )
                    
            gr.Markdown("### Prompt Template")
            
            template_dropdown = gr.Dropdown(
                label="Template",
                choices=list(self.templates.templates.keys()),
                value="llama2-chat",
                interactive=True
            )
            
            system_message = gr.Textbox(
                label="System Message",
                lines=3,
                placeholder="Enter system instructions here...",
                value="You are a helpful AI assistant."
            )
            
        return {
            'temperature': temperature,
            'top_p': top_p,
            'top_k': top_k,
            'repeat_penalty': repeat_penalty,
            'max_tokens': max_tokens,
            'template_dropdown': template_dropdown,
            'system_message': system_message
        }

The conversation area displays the chat history and provides input for new messages. We implement streaming display for generated responses, creating a natural conversational flow. Users can clear history, regenerate responses, and copy messages.

    def create_chat_tab(self):
        """Create the main chat interface tab"""
        with gr.Tab("Chat"):
            chatbot = gr.Chatbot(
                label="Conversation",
                height=500,
                show_label=True
            )
            
            with gr.Row():
                user_input = gr.Textbox(
                    label="Your Message",
                    placeholder="Type your message here...",
                    lines=3,
                    scale=4
                )
                
                with gr.Column(scale=1):
                    send_button = gr.Button("Send", variant="primary")
                    clear_button = gr.Button("Clear History", variant="secondary")
                    
            with gr.Row():
                use_rag_checkbox = gr.Checkbox(
                    label="Use RAG (Retrieval Augmented Generation)",
                    value=False
                )
                
                rag_top_k = gr.Slider(
                    minimum=1,
                    maximum=10,
                    value=3,
                    step=1,
                    label="Number of Context Chunks",
                    visible=False
                )
                
        return {
            'chatbot': chatbot,
            'user_input': user_input,
            'send_button': send_button,
            'clear_button': clear_button,
            'use_rag_checkbox': use_rag_checkbox,
            'rag_top_k': rag_top_k
        }

The document management tab handles RAG document uploads. Users can upload files, view processed documents, and remove documents from the RAG system. We display processing status and document statistics.

    def create_document_tab(self):
        """Create the document management tab for RAG"""
        with gr.Tab("Documents (RAG)"):
            gr.Markdown("### Upload Documents for RAG")
            
            file_upload = gr.File(
                label="Upload Document",
                file_types=['.pdf', '.docx', '.html', '.htm', '.md', '.txt'],
                type="filepath"
            )
            
            upload_button = gr.Button("Process Document", variant="primary")
            upload_status = gr.Textbox(label="Processing Status", interactive=False)
            
            gr.Markdown("### Processed Documents")
            
            documents_list = gr.Dataframe(
                headers=["Document ID", "Filename", "Chunks", "Status"],
                datatype=["str", "str", "number", "str"],
                interactive=False
            )
            
            refresh_docs_button = gr.Button("Refresh List", variant="secondary")
            
        return {
            'file_upload': file_upload,
            'upload_button': upload_button,
            'upload_status': upload_status,
            'documents_list': documents_list,
            'refresh_docs_button': refresh_docs_button
        }

The system status tab displays hardware information and real-time resource utilization metrics. This transparency empowers users to understand how their system resources are being consumed and make informed decisions about model selection and parameter configuration. The status display updates on demand through refresh buttons, avoiding unnecessary background polling that could impact inference performance.

Hardware information presentation organizes detected capabilities into a structured JSON view. Users can expand sections to examine CPU specifications, GPU details, and memory configurations. This information proves particularly valuable when troubleshooting performance issues or determining why certain configurations fail to load.

Resource utilization monitoring tracks CPU usage percentage, system memory consumption with absolute values, and GPU memory allocation when applicable. These metrics help users identify bottlenecks and optimize their configurations. For instance, if CPU usage remains low during inference while generation is slow, this suggests the model may benefit from increased GPU layer offloading.

The refresh mechanism queries the operating system and hardware APIs to obtain current metrics. We implement this as an on-demand operation rather than continuous polling to minimize overhead. Users click the refresh button when they want updated information, typically after loading a model or starting a long generation task.

ADVANCED FEATURES AND OPTIMIZATIONS

Beyond the core functionality, several advanced features enhance the system's capabilities and user experience. These optimizations address common pain points and enable sophisticated workflows that would otherwise require manual intervention or external tools.

Automatic parameter suggestion analyzes the loaded model and available hardware to recommend optimal inference parameters. When a user loads a model, the system calculates suggested values for GPU layers, batch size, thread count, and context window size. These suggestions balance performance and memory usage based on empirical heuristics.

The suggestion algorithm considers model size in relation to available VRAM. For models that fit entirely in GPU memory, it recommends full offloading. For larger models, it calculates the maximum number of layers that fit while leaving headroom for context and intermediate activations. The algorithm also accounts for quantization level, as lower precision models consume less memory per layer.

class ParameterOptimizer:
    """Suggests optimal inference parameters based on hardware and model"""
    
    def __init__(self, hardware_detector: HardwareDetector):
        self.hardware = hardware_detector
        
    def suggest_parameters(self, model_size_gb: float, 
                          quantization: str) -> Dict:
        """Generate parameter suggestions for a model"""
        suggestions = {}
        
        # Determine available GPU memory
        gpu_memory_gb = 0
        if self.hardware.gpu_info['primary_type'] == 'NVIDIA_CUDA':
            if self.hardware.memory_info['gpu_memory']:
                gpu_memory_gb = self.hardware.memory_info['gpu_memory'][0]['total_gb']
        elif self.hardware.gpu_info['primary_type'] == 'APPLE_MPS':
            # Apple Silicon uses unified memory
            # Reserve 50% for system and other apps
            gpu_memory_gb = self.hardware.memory_info['system_total_gb'] * 0.5
            
        # Calculate optimal GPU layers
        if gpu_memory_gb > 0:
            # Estimate memory overhead (context, activations, etc.)
            overhead_gb = 2.0
            available_for_model = gpu_memory_gb - overhead_gb
            
            if available_for_model > model_size_gb:
                # Full offload possible
                suggestions['n_gpu_layers'] = 100
                suggestions['gpu_percentage'] = 100
            else:
                # Partial offload
                offload_ratio = available_for_model / model_size_gb
                suggestions['n_gpu_layers'] = int(35 * offload_ratio)
                suggestions['gpu_percentage'] = int(offload_ratio * 100)
        else:
            suggestions['n_gpu_layers'] = 0
            suggestions['gpu_percentage'] = 0
            
        # Suggest context window based on available memory
        system_memory_gb = self.hardware.memory_info['system_available_gb']
        
        if system_memory_gb > 16:
            suggestions['n_ctx'] = 4096
        elif system_memory_gb > 8:
            suggestions['n_ctx'] = 2048
        else:
            suggestions['n_ctx'] = 1024
            
        # Suggest thread count
        physical_cores = self.hardware.cpu_info.get('physical_cores', 4)
        suggestions['n_threads'] = max(1, physical_cores - 1)
        
        # Suggest batch size based on quantization
        if 'q2' in quantization.lower() or 'q3' in quantization.lower():
            suggestions['n_batch'] = 1024
        elif 'q4' in quantization.lower():
            suggestions['n_batch'] = 512
        else:
            suggestions['n_batch'] = 256
            
        return suggestions

Conversation export functionality allows users to save their chat history for later reference or analysis. The system exports conversations in multiple formats including plain text for readability, JSON for programmatic processing, and Markdown for documentation. Each export includes metadata such as model name, parameters used, and timestamps.

class ConversationExporter:
    """Exports conversation history in various formats"""
    
    def __init__(self):
        self.supported_formats = ['txt', 'json', 'md']
        
    def export_conversation(self, conversation_history: List[Tuple[str, str]], 
                           metadata: Dict, 
                           format: str = 'txt') -> str:
        """Export conversation in specified format"""
        if format not in self.supported_formats:
            raise ValueError(f"Unsupported format: {format}")
            
        if format == 'txt':
            return self._export_text(conversation_history, metadata)
        elif format == 'json':
            return self._export_json(conversation_history, metadata)
        else:
            return self._export_markdown(conversation_history, metadata)
            
    def _export_text(self, conversation: List[Tuple[str, str]], 
                    metadata: Dict) -> str:
        """Export as plain text"""
        lines = []
        lines.append("=" * 80)
        lines.append("CONVERSATION EXPORT")
        lines.append("=" * 80)
        lines.append(f"Model: {metadata.get('model_name', 'Unknown')}")
        lines.append(f"Date: {metadata.get('timestamp', 'Unknown')}")
        lines.append(f"Parameters: {metadata.get('parameters', {})}")
        lines.append("=" * 80)
        lines.append("")
        
        for i, (user_msg, assistant_msg) in enumerate(conversation):
            lines.append(f"Turn {i + 1}")
            lines.append("-" * 80)
            lines.append(f"User: {user_msg}")
            lines.append("")
            lines.append(f"Assistant: {assistant_msg}")
            lines.append("")
            
        return "\n".join(lines)
        
    def _export_json(self, conversation: List[Tuple[str, str]], 
                    metadata: Dict) -> str:
        """Export as JSON"""
        export_data = {
            'metadata': metadata,
            'conversation': [
                {
                    'turn': i + 1,
                    'user': user_msg,
                    'assistant': assistant_msg
                }
                for i, (user_msg, assistant_msg) in enumerate(conversation)
            ]
        }
        
        return json.dumps(export_data, indent=2)
        
    def _export_markdown(self, conversation: List[Tuple[str, str]], 
                        metadata: Dict) -> str:
        """Export as Markdown"""
        lines = []
        lines.append("# Conversation Export")
        lines.append("")
        lines.append("## Metadata")
        lines.append(f"- **Model**: {metadata.get('model_name', 'Unknown')}")
        lines.append(f"- **Date**: {metadata.get('timestamp', 'Unknown')}")
        lines.append(f"- **Parameters**: {metadata.get('parameters', {})}")
        lines.append("")
        lines.append("## Conversation")
        lines.append("")
        
        for i, (user_msg, assistant_msg) in enumerate(conversation):
            lines.append(f"### Turn {i + 1}")
            lines.append("")
            lines.append("**User:**")
            lines.append(f"> {user_msg}")
            lines.append("")
            lines.append("**Assistant:**")
            lines.append(assistant_msg)
            lines.append("")
            
        return "\n".join(lines)

Model comparison capabilities enable users to evaluate different models or parameter configurations side by side. The system can run the same prompt through multiple loaded models and display results in parallel. This feature proves invaluable when selecting the best model for a specific task or tuning parameters for optimal output quality.

Batch processing extends the chatbot's utility beyond interactive conversation. Users can provide a file containing multiple prompts and have the system process them sequentially, saving results to an output file. This mode supports automated testing, dataset generation, and bulk document processing.

class BatchProcessor:
    """Processes multiple prompts in batch mode"""
    
    def __init__(self, inference_engine: InferenceEngine,
                 template_manager: PromptTemplateManager):
        self.engine = inference_engine
        self.templates = template_manager
        
    def process_batch(self, prompts: List[str], 
                     template_name: str,
                     system_message: str,
                     output_file: str,
                     **generation_params) -> Dict:
        """Process a batch of prompts"""
        results = []
        errors = []
        
        for i, prompt in enumerate(prompts):
            try:
                formatted_prompt = self.templates.apply_template(
                    template_name,
                    system_message,
                    [],
                    prompt
                )
                
                response = self.engine.generate(
                    formatted_prompt,
                    **generation_params
                )
                
                results.append({
                    'index': i,
                    'prompt': prompt,
                    'response': response,
                    'status': 'success'
                })
                
            except Exception as e:
                errors.append({
                    'index': i,
                    'prompt': prompt,
                    'error': str(e)
                })
                
        # Save results
        with open(output_file, 'w') as f:
            json.dump({
                'results': results,
                'errors': errors,
                'summary': {
                    'total': len(prompts),
                    'successful': len(results),
                    'failed': len(errors)
                }
            }, f, indent=2)
            
        return {
            'total': len(prompts),
            'successful': len(results),
            'failed': len(errors)
        }

Response quality metrics provide objective measurements of generated text. The system calculates perplexity, token diversity, average sentence length, and other linguistic features. These metrics help users understand output characteristics and tune parameters accordingly.

class ResponseAnalyzer:
    """Analyzes generated responses for quality metrics"""
    
    def __init__(self):
        pass
        
    def analyze_response(self, response: str) -> Dict:
        """Calculate quality metrics for a response"""
        metrics = {}
        
        # Basic statistics
        tokens = response.split()
        metrics['token_count'] = len(tokens)
        metrics['character_count'] = len(response)
        
        # Sentence analysis
        sentences = [s.strip() for s in response.split('.') if s.strip()]
        metrics['sentence_count'] = len(sentences)
        
        if sentences:
            metrics['avg_sentence_length'] = sum(len(s.split()) for s in sentences) / len(sentences)
        else:
            metrics['avg_sentence_length'] = 0
            
        # Token diversity
        unique_tokens = set(tokens)
        metrics['unique_tokens'] = len(unique_tokens)
        metrics['token_diversity'] = len(unique_tokens) / len(tokens) if tokens else 0
        
        # Repetition detection
        bigrams = [f"{tokens[i]} {tokens[i+1]}" for i in range(len(tokens)-1)]
        unique_bigrams = set(bigrams)
        metrics['bigram_diversity'] = len(unique_bigrams) / len(bigrams) if bigrams else 0
        
        # Average word length
        if tokens:
            metrics['avg_word_length'] = sum(len(t) for t in tokens) / len(tokens)
        else:
            metrics['avg_word_length'] = 0
            
        return metrics

CONTEXT WINDOW MANAGEMENT

Context window management represents a critical aspect of working with large language models. The context window defines the maximum number of tokens the model can process simultaneously, encompassing both the input prompt and the generated response. Effective management ensures conversations remain coherent while preventing out-of-memory errors.

The naive approach of concatenating all previous messages quickly exhausts the context window in long conversations. Our system implements intelligent context management that preserves conversation coherence while staying within limits. The manager employs multiple strategies including sliding window truncation, summary-based compression, and importance-based selection.

Sliding window truncation maintains the most recent messages while discarding older ones. This simple approach works well for conversations where recent context matters most. The system configures a window size as a percentage of the total context limit, reserving space for the current prompt and response.

class ContextManager:
    """Manages conversation context within token limits"""
    
    def __init__(self, max_context_tokens: int = 2048):
        self.max_context_tokens = max_context_tokens
        self.reserved_tokens = 512  # Reserve for current prompt and response
        
    def estimate_tokens(self, text: str) -> int:
        """Estimate token count (rough approximation)"""
        # Simple heuristic: ~1.3 tokens per word on average
        return int(len(text.split()) * 1.3)
        
    def truncate_sliding_window(self, conversation_history: List[Dict],
                                current_prompt: str) -> List[Dict]:
        """Keep most recent messages that fit in context"""
        available_tokens = self.max_context_tokens - self.reserved_tokens
        current_tokens = self.estimate_tokens(current_prompt)
        
        truncated_history = []
        
        # Work backwards through history
        for turn in reversed(conversation_history):
            turn_tokens = 0
            if turn.get('content'):
                turn_tokens = self.estimate_tokens(turn['content'])
                
            if current_tokens + turn_tokens <= available_tokens:
                truncated_history.insert(0, turn)
                current_tokens += turn_tokens
            else:
                break
                
        return truncated_history

Summary-based compression generates concise summaries of older conversation segments. When the context window fills, the system summarizes the oldest portion and replaces multiple messages with a single summary message. This approach preserves important information from earlier in the conversation while freeing space for recent exchanges.

    def compress_with_summary(self, conversation_history: List[Dict],
                             summarizer_fn,
                             compression_ratio: float = 0.3) -> List[Dict]:
        """Compress old messages using summarization"""
        if len(conversation_history) <= 4:
            return conversation_history
            
        # Calculate split point
        split_index = int(len(conversation_history) * compression_ratio)
        
        # Get old messages to summarize
        old_messages = conversation_history[:split_index]
        recent_messages = conversation_history[split_index:]
        
        # Build text to summarize
        summary_text = ""
        for msg in old_messages:
            role = msg.get('role', 'unknown')
            content = msg.get('content', '')
            summary_text += f"{role}: {content}\n\n"
            
        # Generate summary
        summary = summarizer_fn(summary_text)
        
        # Create summary message
        summary_message = {
            'role': 'system',
            'content': f"Previous conversation summary: {summary}"
        }
        
        return [summary_message] + recent_messages

Importance-based selection uses heuristics to identify the most relevant messages for the current prompt. The system scores each historical message based on keyword overlap, semantic similarity, and recency. It then selects the highest-scoring messages that fit within the context limit.

    def select_by_importance(self, conversation_history: List[Dict],
                            current_prompt: str,
                            embedding_generator=None) -> List[Dict]:
        """Select most important messages for current context"""
        if not conversation_history:
            return []
            
        # Score each message
        scored_messages = []
        
        for i, msg in enumerate(conversation_history):
            score = 0
            content = msg.get('content', '')
            
            # Recency score (more recent = higher score)
            recency_score = i / len(conversation_history)
            score += recency_score * 0.3
            
            # Keyword overlap score
            prompt_words = set(current_prompt.lower().split())
            content_words = set(content.lower().split())
            overlap = len(prompt_words & content_words)
            overlap_score = overlap / max(len(prompt_words), 1)
            score += overlap_score * 0.4
            
            # Length penalty (prefer concise messages)
            length_penalty = min(1.0, 100 / max(len(content.split()), 1))
            score += length_penalty * 0.3
            
            scored_messages.append((score, i, msg))
            
        # Sort by score
        scored_messages.sort(reverse=True)
        
        # Select messages that fit
        selected = []
        current_tokens = self.estimate_tokens(current_prompt)
        available_tokens = self.max_context_tokens - self.reserved_tokens
        
        for score, original_index, msg in scored_messages:
            msg_tokens = self.estimate_tokens(msg.get('content', ''))
            
            if current_tokens + msg_tokens <= available_tokens:
                selected.append((original_index, msg))
                current_tokens += msg_tokens
                
        # Sort by original order
        selected.sort(key=lambda x: x[0])
        
        return [msg for _, msg in selected]

ERROR HANDLING AND RECOVERY

Robust error handling ensures the system remains stable and provides helpful feedback when problems occur. Common error scenarios include model loading failures due to insufficient memory, generation timeouts from excessive token limits, file access errors during document processing, and GPU out-of-memory conditions.

The error handling strategy employs multiple layers of defense. Input validation catches configuration errors before they reach the inference engine. Resource checks prevent operations that would exceed available memory. Graceful degradation allows the system to continue operating with reduced functionality when components fail.

Model loading errors receive special attention since they represent a critical failure mode. When loading fails, the system analyzes the error to determine the cause. Memory-related failures trigger suggestions to reduce GPU layers or context window size. File-related errors provide the exact path and permission information needed for troubleshooting.

class ErrorHandler:
    """Centralized error handling and recovery"""
    
    def __init__(self, hardware_detector: HardwareDetector):
        self.hardware = hardware_detector
        
    def handle_model_load_error(self, error: Exception, 
                                model_path: str,
                                params: Dict) -> Dict:
        """Analyze and provide recovery suggestions for model load errors"""
        error_msg = str(error).lower()
        suggestions = []
        
        if 'memory' in error_msg or 'oom' in error_msg:
            # Memory-related error
            suggestions.append("Reduce the number of GPU layers")
            suggestions.append("Decrease context window size (n_ctx)")
            suggestions.append("Close other applications to free memory")
            
            # Calculate suggested parameters
            if params.get('n_gpu_layers', 0) > 0:
                suggestions.append(f"Try n_gpu_layers={params['n_gpu_layers'] // 2}")
                
            if params.get('n_ctx', 2048) > 1024:
                suggestions.append(f"Try n_ctx={params['n_ctx'] // 2}")
                
        elif 'file' in error_msg or 'path' in error_msg:
            # File access error
            suggestions.append(f"Verify model file exists: {model_path}")
            suggestions.append("Check file permissions")
            suggestions.append("Ensure file is not corrupted")
            
        elif 'cuda' in error_msg or 'gpu' in error_msg:
            # GPU-specific error
            suggestions.append("Try CPU-only mode (n_gpu_layers=0)")
            suggestions.append("Update GPU drivers")
            suggestions.append("Check CUDA installation")
            
        return {
            'error': str(error),
            'suggestions': suggestions,
            'recovery_params': self._generate_recovery_params(params)
        }
        
    def _generate_recovery_params(self, original_params: Dict) -> Dict:
        """Generate conservative parameters for retry"""
        recovery = original_params.copy()
        
        # Reduce resource usage
        recovery['n_gpu_layers'] = max(0, recovery.get('n_gpu_layers', 0) // 2)
        recovery['n_ctx'] = min(1024, recovery.get('n_ctx', 2048) // 2)
        recovery['n_batch'] = min(256, recovery.get('n_batch', 512) // 2)
        
        return recovery

Generation errors typically stem from malformed prompts, excessive token limits, or model-specific quirks. The system catches these errors and provides actionable feedback. For timeout errors, it suggests reducing max tokens. For formatting errors, it validates the prompt template configuration.

    def handle_generation_error(self, error: Exception,
                                prompt: str,
                                params: Dict) -> Dict:
        """Handle errors during text generation"""
        error_msg = str(error).lower()
        suggestions = []
        
        if 'timeout' in error_msg:
            suggestions.append("Reduce max_tokens parameter")
            suggestions.append("Simplify the prompt")
            suggestions.append("Check system resource usage")
            
        elif 'context' in error_msg or 'length' in error_msg:
            suggestions.append("Reduce prompt length")
            suggestions.append("Increase n_ctx if memory allows")
            suggestions.append("Clear conversation history")
            
        elif 'token' in error_msg:
            suggestions.append("Check prompt template formatting")
            suggestions.append("Verify special tokens are correct")
            
        # Estimate prompt token count
        estimated_tokens = len(prompt.split()) * 1.3
        
        return {
            'error': str(error),
            'suggestions': suggestions,
            'prompt_length': len(prompt),
            'estimated_tokens': int(estimated_tokens),
            'max_context': params.get('n_ctx', 2048)
        }

Document processing errors arise from unsupported file formats, corrupted files, or encoding issues. The handler attempts automatic recovery through encoding detection and format conversion. When recovery fails, it provides detailed diagnostic information.

    def handle_document_error(self, error: Exception,
                             file_path: str) -> Dict:
        """Handle errors during document processing"""
        error_msg = str(error).lower()
        suggestions = []
        
        if 'encoding' in error_msg or 'decode' in error_msg:
            suggestions.append("File may have non-standard encoding")
            suggestions.append("Try converting to UTF-8")
            suggestions.append("Save file in a different format")
            
        elif 'format' in error_msg or 'unsupported' in error_msg:
            suggestions.append("Check file extension matches content")
            suggestions.append("Convert to supported format (PDF, DOCX, TXT, etc.)")
            
        elif 'permission' in error_msg:
            suggestions.append("Check file permissions")
            suggestions.append("Ensure file is not open in another program")
            
        return {
            'error': str(error),
            'file_path': file_path,
            'suggestions': suggestions
        }

PERFORMANCE MONITORING AND PROFILING

Performance monitoring provides insights into system behavior and identifies optimization opportunities. The monitoring system tracks inference latency, tokens per second throughput, memory usage over time, and GPU utilization when applicable. These metrics inform parameter tuning and help diagnose performance issues.

Inference latency measurement captures the time from prompt submission to response completion. The system records both total latency and time-to-first-token, as the latter significantly impacts perceived responsiveness in streaming mode. Latency statistics accumulate over multiple generations to identify trends and outliers.

import time
from collections import deque

class PerformanceMonitor:
    """Monitors and reports performance metrics"""
    
    def __init__(self, history_size: int = 100):
        self.history_size = history_size
        self.latency_history = deque(maxlen=history_size)
        self.throughput_history = deque(maxlen=history_size)
        self.memory_history = deque(maxlen=history_size)
        
    def record_generation(self, prompt_tokens: int,
                         generated_tokens: int,
                         total_time: float,
                         time_to_first_token: float):
        """Record metrics for a generation"""
        self.latency_history.append({
            'total_time': total_time,
            'time_to_first_token': time_to_first_token,
            'timestamp': time.time()
        })
        
        tokens_per_second = generated_tokens / total_time if total_time > 0 else 0
        self.throughput_history.append({
            'tokens_per_second': tokens_per_second,
            'generated_tokens': generated_tokens,
            'timestamp': time.time()
        })
        
    def get_statistics(self) -> Dict:
        """Calculate performance statistics"""
        if not self.latency_history:
            return {}
            
        latencies = [entry['total_time'] for entry in self.latency_history]
        ttfts = [entry['time_to_first_token'] for entry in self.latency_history]
        throughputs = [entry['tokens_per_second'] for entry in self.throughput_history]
        
        return {
            'latency': {
                'mean': sum(latencies) / len(latencies),
                'min': min(latencies),
                'max': max(latencies),
                'recent': latencies[-1] if latencies else 0
            },
            'time_to_first_token': {
                'mean': sum(ttfts) / len(ttfts),
                'min': min(ttfts),
                'max': max(ttfts),
                'recent': ttfts[-1] if ttfts else 0
            },
            'throughput': {
                'mean': sum(throughputs) / len(throughputs),
                'min': min(throughputs),
                'max': max(throughputs),
                'recent': throughputs[-1] if throughputs else 0
            },
            'sample_count': len(self.latency_history)
        }

Memory profiling tracks allocation patterns to identify leaks and excessive consumption. The profiler samples memory usage at regular intervals during generation and correlates spikes with specific operations. This information guides optimization efforts and helps prevent out-of-memory crashes.

    def record_memory_snapshot(self):
        """Record current memory usage"""
        snapshot = {
            'timestamp': time.time(),
            'system_memory_gb': psutil.virtual_memory().used / (1024**3),
            'system_memory_percent': psutil.virtual_memory().percent
        }
        
        if torch.cuda.is_available():
            snapshot['gpu_memory_gb'] = torch.cuda.memory_allocated(0) / (1024**3)
            snapshot['gpu_memory_reserved_gb'] = torch.cuda.memory_reserved(0) / (1024**3)
            
        self.memory_history.append(snapshot)
        
    def get_memory_statistics(self) -> Dict:
        """Calculate memory usage statistics"""
        if not self.memory_history:
            return {}
            
        system_mem = [entry['system_memory_gb'] for entry in self.memory_history]
        
        stats = {
            'system_memory': {
                'mean_gb': sum(system_mem) / len(system_mem),
                'min_gb': min(system_mem),
                'max_gb': max(system_mem),
                'current_gb': system_mem[-1] if system_mem else 0
            }
        }
        
        if 'gpu_memory_gb' in self.memory_history[0]:
            gpu_mem = [entry['gpu_memory_gb'] for entry in self.memory_history]
            stats['gpu_memory'] = {
                'mean_gb': sum(gpu_mem) / len(gpu_mem),
                'min_gb': min(gpu_mem),
                'max_gb': max(gpu_mem),
                'current_gb': gpu_mem[-1] if gpu_mem else 0
            }
            
        return stats

Profiling integration wraps critical operations with timing instrumentation. The profiler measures time spent in model loading, prompt formatting, inference execution, and post-processing. Detailed breakdowns reveal which operations dominate execution time and where optimization efforts should focus.

    def profile_operation(self, operation_name: str):
        """Context manager for profiling operations"""
        return OperationProfiler(self, operation_name)
        

class OperationProfiler:
    """Context manager for timing operations"""
    
    def __init__(self, monitor: PerformanceMonitor, operation_name: str):
        self.monitor = monitor
        self.operation_name = operation_name
        self.start_time = None
        
    def __enter__(self):
        self.start_time = time.time()
        return self
        
    def __exit__(self, exc_type, exc_val, exc_tb):
        elapsed = time.time() - self.start_time
        
        if not hasattr(self.monitor, 'operation_times'):
            self.monitor.operation_times = {}
            
        if self.operation_name not in self.monitor.operation_times:
            self.monitor.operation_times[self.operation_name] = deque(maxlen=100)
            
        self.monitor.operation_times[self.operation_name].append(elapsed)

CONFIGURATION PERSISTENCE AND PRESETS

Configuration persistence saves user preferences across sessions, eliminating the need to reconfigure parameters every time the application launches. The system stores model selections, inference parameters, prompt templates, and UI preferences in a configuration file. Users can also create named presets for different use cases.

The configuration manager handles loading, saving, and validating configuration data. It implements a hierarchical structure where global defaults can be overridden by model-specific settings and user presets. This flexibility accommodates diverse workflows while maintaining sensible defaults.

class ConfigurationManager:
    """Manages application configuration and user presets"""
    
    def __init__(self, config_file: str = "config.json"):
        self.config_file = Path(config_file)
        self.config = self._load_config()
        self.presets = self.config.get('presets', {})
        
    def _load_config(self) -> Dict:
        """Load configuration from disk"""
        if self.config_file.exists():
            try:
                with open(self.config_file, 'r') as f:
                    return json.load(f)
            except Exception as e:
                print(f"Error loading config: {e}")
                return self._default_config()
        return self._default_config()
        
    def _default_config(self) -> Dict:
        """Generate default configuration"""
        return {
            'last_model': None,
            'default_template': 'llama2-chat',
            'default_system_message': 'You are a helpful AI assistant.',
            'inference_params': {
                'temperature': 0.8,
                'top_p': 0.95,
                'top_k': 40,
                'repeat_penalty': 1.1,
                'max_tokens': 512
            },
            'ui_preferences': {
                'theme': 'soft',
                'show_token_count': True,
                'enable_streaming': True
            },
            'presets': {}
        }
        
    def save_config(self):
        """Save configuration to disk"""
        try:
            with open(self.config_file, 'w') as f:
                json.dump(self.config, f, indent=2)
        except Exception as e:
            print(f"Error saving config: {e}")

Preset management allows users to save and recall complete parameter configurations. Each preset includes a name, description, and all relevant parameters. Users might create presets for creative writing with high temperature, technical documentation with low temperature, or code generation with specific formatting requirements.

    def create_preset(self, name: str, description: str, 
                     parameters: Dict) -> bool:
        """Create a new preset"""
        if name in self.presets:
            raise ValueError(f"Preset '{name}' already exists")
            
        self.presets[name] = {
            'description': description,
            'parameters': parameters,
            'created_at': datetime.now().isoformat()
        }
        
        self.config['presets'] = self.presets
        self.save_config()
        
        return True
        
    def load_preset(self, name: str) -> Dict:
        """Load a preset by name"""
        if name not in self.presets:
            raise ValueError(f"Preset '{name}' not found")
            
        return self.presets[name]['parameters'].copy()
        
    def delete_preset(self, name: str) -> bool:
        """Delete a preset"""
        if name not in self.presets:
            raise ValueError(f"Preset '{name}' not found")
            
        del self.presets[name]
        self.config['presets'] = self.presets
        self.save_config()
        
        return True
        
    def list_presets(self) -> List[Dict]:
        """List all available presets"""
        return [
            {
                'name': name,
                'description': preset['description'],
                'created_at': preset['created_at']
            }
            for name, preset in self.presets.items()
        ]

LOGGING AND DEBUGGING

Comprehensive logging facilitates troubleshooting and provides audit trails for production deployments. The logging system captures application events at multiple severity levels including debug for detailed diagnostic information, info for normal operational events, warning for potentially problematic situations, and error for failures requiring attention.

The logger implements structured logging with contextual information. Each log entry includes a timestamp, severity level, component name, and detailed message. For errors, it captures stack traces and relevant state information. Log output can be directed to console, file, or both based on configuration.

import logging
from logging.handlers import RotatingFileHandler

class ApplicationLogger:
    """Centralized logging for the application"""
    
    def __init__(self, log_file: str = "chatbot.log", 
                 log_level: str = "INFO"):
        self.logger = logging.getLogger("LocalLLMChatbot")
        self.logger.setLevel(getattr(logging, log_level.upper()))
        
        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_format = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        console_handler.setFormatter(console_format)
        
        # File handler with rotation
        file_handler = RotatingFileHandler(
            log_file,
            maxBytes=10*1024*1024,  # 10MB
            backupCount=5
        )
        file_handler.setLevel(logging.DEBUG)
        file_format = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
        )
        file_handler.setFormatter(file_format)
        
        self.logger.addHandler(console_handler)
        self.logger.addHandler(file_handler)
        
    def debug(self, message: str, **kwargs):
        """Log debug message"""
        self.logger.debug(message, extra=kwargs)
        
    def info(self, message: str, **kwargs):
        """Log info message"""
        self.logger.info(message, extra=kwargs)
        
    def warning(self, message: str, **kwargs):
        """Log warning message"""
        self.logger.warning(message, extra=kwargs)
        
    def error(self, message: str, exception: Exception = None, **kwargs):
        """Log error message"""
        if exception:
            self.logger.error(f"{message}: {str(exception)}", exc_info=True, extra=kwargs)
        else:
            self.logger.error(message, extra=kwargs)

Debug mode provides enhanced visibility into system behavior. When enabled, it logs detailed information about prompt construction, token counts, parameter values, and intermediate processing steps. This verbosity aids in diagnosing subtle issues but should be disabled in production due to performance overhead and log volume.

    def log_generation_details(self, prompt: str, parameters: Dict, 
                              response: str, metrics: Dict):
        """Log detailed generation information for debugging"""
        self.debug("Generation started")
        self.debug(f"Prompt length: {len(prompt)} characters")
        self.debug(f"Parameters: {parameters}")
        
        if len(prompt) < 500:
            self.debug(f"Full prompt: {prompt}")
        else:
            self.debug(f"Prompt preview: {prompt[:500]}...")
            
        self.debug(f"Response length: {len(response)} characters")
        self.debug(f"Metrics: {metrics}")
        
    def log_model_load(self, model_path: str, parameters: Dict, 
                      success: bool, load_time: float):
        """Log model loading event"""
        if success:
            self.info(f"Model loaded successfully: {model_path} in {load_time:.2f}s")
            self.debug(f"Load parameters: {parameters}")
        else:
            self.error(f"Model load failed: {model_path}")
            self.debug(f"Failed parameters: {parameters}")

EXTENDED UI ENHANCEMENTS

Beyond the core interface, several enhancements improve usability and visual appeal. These refinements transform a functional tool into a polished application that users enjoy interacting with.

Syntax highlighting for code blocks in generated responses improves readability when the model produces programming examples. The system detects code blocks in markdown format and applies language-specific highlighting. This feature proves particularly valuable for technical assistance and code generation tasks.

Token count display shows users how much of their context window is consumed. A visual indicator updates in real-time as they type, warning when approaching the limit. This transparency helps users understand context constraints and manage their conversations effectively.

Response regeneration allows users to request alternative responses without retyping their prompt. The system maintains the conversation state and generates a new response with the same or modified parameters. Users can regenerate multiple times to explore different outputs.

Message editing enables users to modify previous messages and regenerate subsequent responses. This feature supports iterative refinement where users adjust their questions based on initial responses. The system handles the complexity of updating conversation history and maintaining coherence.

class EnhancedChatInterface:
    """Extended chat interface with advanced features"""
    
    def __init__(self, base_ui: ChatbotUI):
        self.base_ui = base_ui
        self.message_history = []
        
    def add_token_counter(self, user_input_component, 
                         max_tokens: int = 2048):
        """Add real-time token counting to input"""
        def count_tokens(text):
            estimated = int(len(text.split()) * 1.3)
            percentage = (estimated / max_tokens) * 100
            
            if percentage > 90:
                status = f"⚠️ {estimated}/{max_tokens} tokens ({percentage:.1f}%)"
            elif percentage > 75:
                status = f"⚡ {estimated}/{max_tokens} tokens ({percentage:.1f}%)"
            else:
                status = f"✓ {estimated}/{max_tokens} tokens ({percentage:.1f}%)"
                
            return status
            
        return count_tokens
        
    def create_regenerate_button(self, chatbot_component, 
                                 generation_function):
        """Create button to regenerate last response"""
        def regenerate_last():
            if not self.message_history:
                return chatbot_component.value
                
            # Get last user message
            last_user_msg = None
            for msg in reversed(self.message_history):
                if msg['role'] == 'user':
                    last_user_msg = msg['content']
                    break
                    
            if not last_user_msg:
                return chatbot_component.value
                
            # Remove last assistant response
            history = chatbot_component.value[:-1] if chatbot_component.value else []
            
            # Generate new response
            new_response = generation_function(last_user_msg)
            history.append([last_user_msg, new_response])
            
            return history
            
        return regenerate_last

Keyboard shortcuts accelerate common operations. Users can press Enter to send messages, Shift-Enter for newlines, Ctrl-R to regenerate, and Ctrl-K to clear history. These shortcuts reduce mouse dependency and improve workflow efficiency.

Theme customization allows users to adjust the interface appearance. The system supports light and dark themes with customizable accent colors. Users can select themes that match their preferences or reduce eye strain during extended sessions.

Export functionality extends beyond conversations to include model configurations, performance reports, and system diagnostics. Users can export complete session information for sharing, archiving, or analysis. The export includes all relevant context needed to reproduce results.

MULTI-MODEL SUPPORT AND COMPARISON

Advanced users often work with multiple models simultaneously to compare outputs or leverage specialized capabilities. The system supports loading multiple models concurrently, subject to memory constraints, and provides tools for side-by-side comparison.

The multi-model manager tracks loaded models and their resource consumption. It prevents loading combinations that would exceed available memory and provides warnings when approaching limits. Users can quickly switch between models or send the same prompt to multiple models for comparison.

class MultiModelManager:
    """Manages multiple loaded models"""
    
    def __init__(self, hardware_detector: HardwareDetector):
        self.hardware = hardware_detector
        self.loaded_models = {}
        self.active_model = None
        
    def can_load_model(self, model_size_gb: float, 
                      n_gpu_layers: int) -> Tuple[bool, str]:
        """Check if model can be loaded given current state"""
        # Calculate current memory usage
        current_usage_gb = sum(
            model['size_gb'] for model in self.loaded_models.values()
        )
        
        # Estimate new model memory requirement
        estimated_usage = model_size_gb
        if n_gpu_layers > 0:
            # GPU memory check
            gpu_memory = self.hardware.memory_info.get('gpu_memory', [])
            if gpu_memory:
                available_gpu = gpu_memory[0]['total_gb'] - gpu_memory[0]['allocated_gb']
                if estimated_usage > available_gpu:
                    return False, f"Insufficient GPU memory. Need {estimated_usage:.1f}GB, have {available_gpu:.1f}GB"
        
        # System memory check
        available_system = self.hardware.memory_info['system_available_gb']
        total_needed = current_usage_gb + estimated_usage
        
        if total_needed > available_system * 0.8:  # Leave 20% headroom
            return False, f"Insufficient system memory. Would use {total_needed:.1f}GB of {available_system:.1f}GB available"
            
        return True, "Model can be loaded"
        
    def load_model(self, model_id: str, model_path: str, 
                   size_gb: float, engine: InferenceEngine,
                   **params) -> bool:
        """Load a model and track it"""
        can_load, message = self.can_load_model(
            size_gb, 
            params.get('n_gpu_layers', 0)
        )
        
        if not can_load:
            raise RuntimeError(message)
            
        success = engine.load_model(model_path, **params)
        
        if success:
            self.loaded_models[model_id] = {
                'engine': engine,
                'path': model_path,
                'size_gb': size_gb,
                'params': params
            }
            self.active_model = model_id
            
        return success
        
    def unload_model(self, model_id: str):
        """Unload a model and free resources"""
        if model_id in self.loaded_models:
            engine = self.loaded_models[model_id]['engine']
            if hasattr(engine, 'model') and engine.model:
                del engine.model
                engine.model = None
                
            del self.loaded_models[model_id]
            
            if self.active_model == model_id:
                self.active_model = None

Comparison mode sends identical prompts to multiple models and displays results side by side. Users can evaluate which model produces better outputs for their specific use case. The comparison includes response quality metrics, generation time, and token counts for each model.

    def compare_models(self, prompt: str, model_ids: List[str],
                      template_manager, system_message: str,
                      **generation_params) -> List[Dict]:
        """Generate responses from multiple models for comparison"""
        results = []
        
        for model_id in model_ids:
            if model_id not in self.loaded_models:
                results.append({
                    'model_id': model_id,
                    'error': 'Model not loaded',
                    'response': None
                })
                continue
                
            engine = self.loaded_models[model_id]['engine']
            
            try:
                start_time = time.time()
                
                formatted_prompt = template_manager.apply_template(
                    generation_params.get('template', 'raw'),
                    system_message,
                    [],
                    prompt
                )
                
                response = engine.generate(formatted_prompt, **generation_params)
                
                elapsed = time.time() - start_time
                
                results.append({
                    'model_id': model_id,
                    'response': response,
                    'generation_time': elapsed,
                    'tokens_per_second': len(response.split()) / elapsed if elapsed > 0 else 0,
                    'error': None
                })
                
            except Exception as e:
                results.append({
                    'model_id': model_id,
                    'error': str(e),
                    'response': None
                })
                
        return results

PLUGIN ARCHITECTURE AND EXTENSIBILITY

A plugin architecture enables users and developers to extend the chatbot's capabilities without modifying core code. Plugins can add new document processors, custom prompt templates, specialized RAG strategies, or integration with external services.

The plugin system defines a clear interface that plugins must implement. The core application discovers and loads plugins at startup, registers their capabilities, and routes requests appropriately. This architecture maintains system stability while enabling unlimited extensibility.

from abc import ABC, abstractmethod

class Plugin(ABC):
    """Base class for all plugins"""
    
    @abstractmethod
    def get_name(self) -> str:
        """Return plugin name"""
        pass
        
    @abstractmethod
    def get_version(self) -> str:
        """Return plugin version"""
        pass
        
    @abstractmethod
    def initialize(self, app_context: Dict) -> bool:
        """Initialize plugin with application context"""
        pass
        
    @abstractmethod
    def shutdown(self):
        """Clean up plugin resources"""
        pass


class DocumentProcessorPlugin(Plugin):
    """Plugin interface for custom document processors"""
    
    @abstractmethod
    def get_supported_formats(self) -> List[str]:
        """Return list of supported file extensions"""
        pass
        
    @abstractmethod
    def process_document(self, file_path: str) -> Dict:
        """Process document and return extracted text"""
        pass


class PluginManager:
    """Manages plugin loading and lifecycle"""
    
    def __init__(self, plugin_directory: str = "./plugins"):
        self.plugin_directory = Path(plugin_directory)
        self.plugin_directory.mkdir(parents=True, exist_ok=True)
        self.plugins = {}
        
    def discover_plugins(self) -> List[str]:
        """Discover available plugins"""
        discovered = []
        
        for file in self.plugin_directory.glob("*.py"):
            if file.stem != "__init__":
                discovered.append(file.stem)
                
        return discovered
        
    def load_plugin(self, plugin_name: str, app_context: Dict) -> bool:
        """Load and initialize a plugin"""
        try:
            # Dynamic import
            import importlib.util
            
            plugin_path = self.plugin_directory / f"{plugin_name}.py"
            spec = importlib.util.spec_from_file_location(plugin_name, plugin_path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            
            # Find plugin class
            plugin_class = None
            for item_name in dir(module):
                item = getattr(module, item_name)
                if isinstance(item, type) and issubclass(item, Plugin) and item != Plugin:
                    plugin_class = item
                    break
                    
            if not plugin_class:
                return False
                
            # Instantiate and initialize
            plugin_instance = plugin_class()
            if plugin_instance.initialize(app_context):
                self.plugins[plugin_name] = plugin_instance
                return True
                
        except Exception as e:
            print(f"Error loading plugin {plugin_name}: {e}")
            
        return False
        
    def unload_plugin(self, plugin_name: str):
        """Unload a plugin"""
        if plugin_name in self.plugins:
            self.plugins[plugin_name].shutdown()
            del self.plugins[plugin_name]

Example plugins demonstrate the architecture's capabilities. A custom document processor plugin might add support for specialized formats like scientific papers or legal documents. A RAG enhancement plugin could implement advanced retrieval strategies like hybrid search or query expansion.

DEPLOYMENT CONSIDERATIONS

Deploying the chatbot for production use requires attention to packaging, dependencies, resource management, and user support. The deployment strategy balances ease of installation with flexibility and performance.

Dependency management uses a requirements file that specifies exact versions of all libraries. This ensures reproducible installations across different systems. The installation script checks for compatible Python versions and hardware capabilities before proceeding.

# requirements.txt example
llama-cpp-python==0.2.20
torch==2.1.0
gradio==4.7.1
sentence-transformers==2.2.2
faiss-cpu==1.7.4
PyMuPDF==1.23.8
python-docx==1.1.0
beautifulsoup4==4.12.2
markdown==3.5.1
psutil==5.9.6
numpy==1.24.3

Installation scripts automate the setup process. They create virtual environments, install dependencies, download default models if desired, and verify the installation. Platform-specific scripts handle differences between Windows, macOS, and Linux.

#!/usr/bin/env python3
"""
Installation script for Local LLM Chatbot
"""

import sys
import subprocess
import platform
from pathlib import Path

def check_python_version():
    """Verify Python version is compatible"""
    version = sys.version_info
    if version.major < 3 or (version.major == 3 and version.minor < 8):
        print("Error: Python 3.8 or higher required")
        return False
    print(f"✓ Python {version.major}.{version.minor}.{version.micro}")
    return True

def create_virtual_environment():
    """Create a virtual environment"""
    venv_path = Path("venv")
    
    if venv_path.exists():
        print("✓ Virtual environment already exists")
        return True
        
    try:
        subprocess.run([sys.executable, "-m", "venv", "venv"], check=True)
        print("✓ Virtual environment created")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error creating virtual environment: {e}")
        return False

def install_dependencies():
    """Install required packages"""
    pip_path = "venv/bin/pip" if platform.system() != "Windows" else "venv\\Scripts\\pip"
    
    try:
        subprocess.run([pip_path, "install", "--upgrade", "pip"], check=True)
        subprocess.run([pip_path, "install", "-r", "requirements.txt"], check=True)
        print("✓ Dependencies installed")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error installing dependencies: {e}")
        return False

def verify_installation():
    """Verify all components are working"""
    python_path = "venv/bin/python" if platform.system() != "Windows" else "venv\\Scripts\\python"
    
    test_script = """
import torch
import gradio
from llama_cpp import Llama
print("All imports successful")
"""
    
    try:
        result = subprocess.run(
            [python_path, "-c", test_script],
            capture_output=True,
            text=True,
            check=True
        )
        print("✓ Installation verified")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Verification failed: {e.stderr}")
        return False

def main():
    """Run installation"""
    print("Local LLM Chatbot Installation")
    print("=" * 50)
    
    if not check_python_version():
        sys.exit(1)
        
    if not create_virtual_environment():
        sys.exit(1)
        
    if not install_dependencies():
        sys.exit(1)
        
    if not verify_installation():
        sys.exit(1)
        
    print("\n" + "=" * 50)
    print("Installation complete!")
    print("\nTo run the chatbot:")
    if platform.system() == "Windows":
        print("  venv\\Scripts\\python main.py")
    else:
        print("  source venv/bin/activate")
        print("  python main.py")

if __name__ == "__main__":
    main()

Resource configuration files allow administrators to set system-wide defaults and constraints. These configurations might limit maximum model size, restrict GPU usage, or enforce security policies. The application respects these constraints while allowing individual users flexibility within defined bounds.

Documentation generation produces comprehensive user guides and API references. The documentation covers installation procedures, basic usage tutorials, advanced features, troubleshooting guides, and plugin development. Clear documentation reduces support burden and empowers users to solve problems independently.

CONCLUSION AND FUTURE DIRECTIONS

Building a professional local LLM chatbot requires integrating multiple complex systems into a coherent, user-friendly application. This comprehensive implementation demonstrates how careful architectural planning, attention to performance, robust error handling, and thoughtful interface design combine to create a production-ready tool.

The modular architecture enables continuous improvement and extension. Future enhancements might include support for multimodal models that process images and audio, distributed inference for models larger than single-machine capacity, advanced fine-tuning interfaces for model customization, collaborative features for team environments, and integration with development tools and workflows.

The local-first approach provides users with complete control over their data and models while eliminating dependency on cloud services. As LLMs continue to advance and hardware capabilities expand, local deployment becomes increasingly viable for sophisticated applications. This chatbot provides a foundation for exploring these possibilities while maintaining the flexibility to adapt to future developments.

Users gain a powerful tool that respects their privacy, runs on their hardware, and adapts to their specific needs. The combination of advanced features, intuitive interface, and extensible architecture creates a system that serves both casual users and power users effectively. By open-sourcing such implementations, we enable a broader community to benefit from and contribute to local LLM technology.


ADDENDUM - FULL CODE



#!/usr/bin/env python3

"""

Local LLM Chatbot with Advanced Hardware Detection and RAG

A production-ready chatbot for running large language models locally

"""


import os

import sys

import json

import hashlib

import threading

import subprocess

import time

from pathlib import Path

from datetime import datetime

from typing import Dict, List, Optional, Iterator, Tuple

from collections import deque


import psutil

import platform

import torch

import numpy as np

import gradio as gr


# Document processing imports

try:

    import fitz  # PyMuPDF

except ImportError:

    print("Warning: PyMuPDF not installed. PDF support disabled.")

    fitz = None


try:

    from docx import Document as DocxDocument

except ImportError:

    print("Warning: python-docx not installed. DOCX support disabled.")

    DocxDocument = None


try:

    from bs4 import BeautifulSoup

except ImportError:

    print("Warning: BeautifulSoup4 not installed. HTML support disabled.")

    BeautifulSoup = None


try:

    import markdown

except ImportError:

    print("Warning: markdown not installed. Markdown support limited.")

    markdown = None


# ML imports

try:

    from llama_cpp import Llama

except ImportError:

    print("Error: llama-cpp-python not installed. Please install with: pip install llama-cpp-python")

    sys.exit(1)


try:

    from sentence_transformers import SentenceTransformer

except ImportError:

    print("Warning: sentence-transformers not installed. RAG support disabled.")

    SentenceTransformer = None


try:

    import faiss

except ImportError:

    print("Warning: faiss not installed. RAG support disabled.")

    faiss = None



class HardwareDetector:

    """Detects and reports system hardware capabilities"""

    

    def __init__(self):

        self.cpu_info = {}

        self.gpu_info = {}

        self.memory_info = {}

        self.detected = False

        

    def detect_cpu(self) -> Dict:

        """Detect CPU specifications and capabilities"""

        self.cpu_info['physical_cores'] = psutil.cpu_count(logical=False)

        self.cpu_info['logical_cores'] = psutil.cpu_count(logical=True)

        self.cpu_info['architecture'] = platform.machine()

        self.cpu_info['processor'] = platform.processor()

        

        try:

            freq = psutil.cpu_freq()

            if freq:

                self.cpu_info['max_frequency_mhz'] = freq.max

                self.cpu_info['current_frequency_mhz'] = freq.current

        except Exception as e:

            self.cpu_info['frequency_error'] = str(e)

            

        return self.cpu_info

    

    def detect_nvidia_gpu(self) -> Optional[List[Dict]]:

        """Detect NVIDIA GPU specifications using CUDA"""

        if not torch.cuda.is_available():

            return None

            

        gpu_list = []

        for i in range(torch.cuda.device_count()):

            device_props = torch.cuda.get_device_properties(i)

            gpu_info = {

                'index': i,

                'name': device_props.name,

                'compute_capability': f"{device_props.major}.{device_props.minor}",

                'total_memory_gb': device_props.total_memory / (1024**3),

                'multiprocessor_count': device_props.multi_processor_count,

                'max_threads_per_block': device_props.max_threads_per_block,

                'type': 'NVIDIA_CUDA'

            }

            gpu_list.append(gpu_info)

            

        return gpu_list

    

    def detect_apple_gpu(self) -> Optional[List[Dict]]:

        """Detect Apple Silicon GPU (Metal Performance Shaders)"""

        if not torch.backends.mps.is_available():

            return None

            

        gpu_info = {

            'index': 0,

            'name': 'Apple Silicon GPU',

            'type': 'APPLE_MPS',

            'backend': 'Metal Performance Shaders'

        }

        

        try:

            if platform.system() == 'Darwin':

                result = subprocess.run(['sysctl', '-n', 'machdep.cpu.brand_string'], 

                                      capture_output=True, text=True)

                if result.returncode == 0:

                    gpu_info['chip'] = result.stdout.strip()

        except Exception as e:

            gpu_info['detection_note'] = f"Could not determine specific chip: {e}"

            

        return [gpu_info]

    

    def detect_memory(self) -> Dict:

        """Detect system and GPU memory specifications"""

        vm = psutil.virtual_memory()

        self.memory_info['system_total_gb'] = vm.total / (1024**3)

        self.memory_info['system_available_gb'] = vm.available / (1024**3)

        self.memory_info['system_used_percent'] = vm.percent

        

        self.memory_info['gpu_memory'] = []

        

        if torch.cuda.is_available():

            for i in range(torch.cuda.device_count()):

                gpu_mem = {

                    'device': i,

                    'total_gb': torch.cuda.get_device_properties(i).total_memory / (1024**3),

                    'reserved_gb': torch.cuda.memory_reserved(i) / (1024**3),

                    'allocated_gb': torch.cuda.memory_allocated(i) / (1024**3)

                }

                self.memory_info['gpu_memory'].append(gpu_mem)

                

        return self.memory_info

    

    def detect_all(self) -> Dict:

        """Perform complete hardware detection"""

        self.detect_cpu()

        

        nvidia_gpus = self.detect_nvidia_gpu()

        apple_gpus = self.detect_apple_gpu()

        

        if nvidia_gpus:

            self.gpu_info['devices'] = nvidia_gpus

            self.gpu_info['primary_type'] = 'NVIDIA_CUDA'

        elif apple_gpus:

            self.gpu_info['devices'] = apple_gpus

            self.gpu_info['primary_type'] = 'APPLE_MPS'

        else:

            self.gpu_info['devices'] = []

            self.gpu_info['primary_type'] = 'CPU_ONLY'

            

        self.detect_memory()

        self.detected = True

        

        return {

            'cpu': self.cpu_info,

            'gpu': self.gpu_info,

            'memory': self.memory_info

        }



class ModelManager:

    """Manages local LLM model files"""

    

    def __init__(self, models_directory: str = "./models"):

        self.models_directory = Path(models_directory)

        self.models_directory.mkdir(parents=True, exist_ok=True)

        self.registry_file = self.models_directory / "model_registry.json"

        self.loaded_models = {}

        self.model_registry = self._load_registry()

        

    def _load_registry(self) -> Dict:

        """Load the model registry from disk"""

        if self.registry_file.exists():

            try:

                with open(self.registry_file, 'r') as f:

                    return json.load(f)

            except Exception as e:

                print(f"Error loading registry: {e}")

                return {}

        return {}

    

    def _save_registry(self):

        """Save the model registry to disk"""

        try:

            with open(self.registry_file, 'w') as f:

                json.dump(self.model_registry, f, indent=2)

        except Exception as e:

            print(f"Error saving registry: {e}")

    

    def scan_models(self) -> List[Dict]:

        """Scan the models directory for available GGUF files"""

        models = []

        

        for root, dirs, files in os.walk(self.models_directory):

            for file in files:

                if file.endswith('.gguf'):

                    full_path = Path(root) / file

                    model_info = self._extract_model_info(full_path)

                    models.append(model_info)

                    

        for model in models:

            model_id = model['id']

            if model_id not in self.model_registry:

                self.model_registry[model_id] = {

                    'first_seen': datetime.now().isoformat(),

                    'load_count': 0

                }

            self.model_registry[model_id].update({

                'last_seen': datetime.now().isoformat(),

                'path': model['path'],

                'size_gb': model['size_gb']

            })

            

        self._save_registry()

        return models

    

    def _extract_model_info(self, model_path: Path) -> Dict:

        """Extract information from model file"""

        file_size = model_path.stat().st_size

        filename = model_path.stem

        

        parts = filename.lower().split('.')

        base_name = parts[0] if parts else filename

        quant = parts[1] if len(parts) > 1 else 'unknown'

        

        model_info = {

            'id': filename,

            'name': base_name,

            'path': str(model_path),

            'filename': model_path.name,

            'size_bytes': file_size,

            'size_gb': round(file_size / (1024**3), 2),

            'quantization': quant,

            'format': 'GGUF'

        }

        

        if '7b' in base_name:

            model_info['estimated_parameters'] = '7B'

        elif '13b' in base_name:

            model_info['estimated_parameters'] = '13B'

        elif '70b' in base_name:

            model_info['estimated_parameters'] = '70B'

        else:

            model_info['estimated_parameters'] = 'Unknown'

            

        return model_info

    

    def delete_model(self, model_id: str) -> bool:

        """Delete a model file and update registry"""

        if model_id in self.loaded_models:

            raise ValueError(f"Cannot delete model {model_id}: currently loaded")

            

        if model_id not in self.model_registry:

            raise ValueError(f"Model {model_id} not found in registry")

            

        model_path = Path(self.model_registry[model_id]['path'])

        

        try:

            if model_path.exists():

                model_path.unlink()

                

            del self.model_registry[model_id]

            self._save_registry()

            

            return True

        except Exception as e:

            raise Exception(f"Error deleting model: {e}")



class InferenceEngine:

    """Manages model loading and text generation"""

    

    def __init__(self, hardware_detector: HardwareDetector):

        self.hardware = hardware_detector

        self.model = None

        self.model_path = None

        self.generation_lock = threading.Lock()

        

        self.default_params = {

            'temperature': 0.8,

            'top_p': 0.95,

            'top_k': 40,

            'repeat_penalty': 1.1,

            'max_tokens': 512,

            'n_ctx': 2048,

            'n_batch': 512,

            'n_threads': None,

            'n_gpu_layers': 0,

            'verbose': False

        }

    

    def calculate_optimal_gpu_layers(self, model_size_gb: float, 

                                    gpu_memory_gb: float,

                                    offload_percentage: float = 100.0) -> int:

        """Calculate optimal number of layers to offload to GPU"""

        if gpu_memory_gb <= 0 or offload_percentage <= 0:

            return 0

            

        available_memory = max(0, gpu_memory_gb - 2.0)

        

        estimated_total_layers = 32

        

        if model_size_gb > 10:

            estimated_total_layers = 40

        elif model_size_gb > 20:

            estimated_total_layers = 60

            

        memory_per_layer = model_size_gb / estimated_total_layers

        max_layers = int(available_memory / memory_per_layer)

        target_layers = int(max_layers * (offload_percentage / 100.0))

        

        return max(0, min(target_layers, estimated_total_layers))

    

    def load_model(self, model_path: str, **kwargs) -> bool:

        """Load a model with specified parameters"""

        params = self.default_params.copy()

        params.update(kwargs)

        

        if params['n_threads'] is None:

            params['n_threads'] = self.hardware.cpu_info.get('physical_cores', 4)

            

        if params['n_gpu_layers'] > 0:

            if self.hardware.gpu_info['primary_type'] == 'CPU_ONLY':

                print("Warning: GPU layers requested but no GPU detected. Using CPU only.")

                params['n_gpu_layers'] = 0

                

        try:

            if self.model is not None:

                del self.model

                self.model = None

                

            self.model = Llama(

                model_path=model_path,

                n_ctx=params['n_ctx'],

                n_batch=params['n_batch'],

                n_threads=params['n_threads'],

                n_gpu_layers=params['n_gpu_layers'],

                verbose=params['verbose']

            )

            

            self.model_path = model_path

            return True

            

        except Exception as e:

            print(f"Error loading model: {e}")

            return False

    

    def generate(self, prompt: str, stream: bool = False, **kwargs) -> Optional[str]:

        """Generate text from prompt"""

        if self.model is None:

            raise ValueError("No model loaded")

            

        gen_params = {

            'temperature': kwargs.get('temperature', self.default_params['temperature']),

            'top_p': kwargs.get('top_p', self.default_params['top_p']),

            'top_k': kwargs.get('top_k', self.default_params['top_k']),

            'repeat_penalty': kwargs.get('repeat_penalty', self.default_params['repeat_penalty']),

            'max_tokens': kwargs.get('max_tokens', self.default_params['max_tokens']),

            'stream': stream

        }

        

        try:

            with self.generation_lock:

                output = self.model(prompt, **gen_params)

                

                if stream:

                    return output

                else:

                    return output['choices'][0]['text']

                    

        except Exception as e:

            print(f"Error during generation: {e}")

            return None

    

    def generate_stream(self, prompt: str, **kwargs) -> Iterator[str]:

        """Generate text with streaming output"""

        if self.model is None:

            raise ValueError("No model loaded")

            

        gen_params = {

            'temperature': kwargs.get('temperature', self.default_params['temperature']),

            'top_p': kwargs.get('top_p', self.default_params['top_p']),

            'top_k': kwargs.get('top_k', self.default_params['top_k']),

            'repeat_penalty': kwargs.get('repeat_penalty', self.default_params['repeat_penalty']),

            'max_tokens': kwargs.get('max_tokens', self.default_params['max_tokens']),

            'stream': True

        }

        

        try:

            with self.generation_lock:

                for output in self.model(prompt, **gen_params):

                    token = output['choices'][0]['text']

                    yield token

        except Exception as e:

            yield f"\n\nError during generation: {e}"



class PromptTemplateManager:

    """Manages prompt templates for different model formats"""

    

    def __init__(self):

        self.templates = {

            'llama2-chat': {

                'name': 'LLaMA 2 Chat',

                'system_prefix': '[INST] <<SYS>>\n',

                'system_suffix': '\n<</SYS>>\n\n',

                'user_prefix': '',

                'user_suffix': ' [/INST] ',

                'assistant_prefix': '',

                'assistant_suffix': ' ',

                'bos_token': '<s>',

                'eos_token': '</s>'

            },

            'alpaca': {

                'name': 'Alpaca',

                'system_prefix': '',

                'system_suffix': '\n\n',

                'user_prefix': '### Instruction:\n',

                'user_suffix': '\n\n',

                'assistant_prefix': '### Response:\n',

                'assistant_suffix': '\n\n',

                'bos_token': '',

                'eos_token': ''

            },

            'chatml': {

                'name': 'ChatML',

                'system_prefix': '<|im_start|>system\n',

                'system_suffix': '<|im_end|>\n',

                'user_prefix': '<|im_start|>user\n',

                'user_suffix': '<|im_end|>\n',

                'assistant_prefix': '<|im_start|>assistant\n',

                'assistant_suffix': '<|im_end|>\n',

                'bos_token': '',

                'eos_token': ''

            },

            'raw': {

                'name': 'Raw (No Template)',

                'system_prefix': '',

                'system_suffix': '\n\n',

                'user_prefix': '',

                'user_suffix': '\n\n',

                'assistant_prefix': '',

                'assistant_suffix': '',

                'bos_token': '',

                'eos_token': ''

            }

        }

        self.custom_templates = {}

    

    def apply_template(self, template_name: str, system_message: str,

                      conversation_history: list, user_message: str) -> str:

        """Apply a template to format the complete prompt"""

        template = self.templates.get(template_name) or self.custom_templates.get(template_name)

        

        if not template:

            raise ValueError(f"Template '{template_name}' not found")

            

        prompt = template['bos_token']

        

        if system_message:

            prompt += template['system_prefix']

            prompt += system_message

            prompt += template['system_suffix']

            

        for turn in conversation_history:

            if turn['role'] == 'user':

                prompt += template['user_prefix']

                prompt += turn['content']

                prompt += template['user_suffix']

            elif turn['role'] == 'assistant':

                prompt += template['assistant_prefix']

                prompt += turn['content']

                prompt += template['assistant_suffix']

                

        prompt += template['user_prefix']

        prompt += user_message

        prompt += template['user_suffix']

        prompt += template['assistant_prefix']

        

        return prompt

    

    def create_custom_template(self, name: str, template_dict: dict) -> bool:

        """Create a custom prompt template"""

        required_fields = ['system_prefix', 'system_suffix', 'user_prefix', 

                          'user_suffix', 'assistant_prefix', 'assistant_suffix']

        

        for field in required_fields:

            if field not in template_dict:

                raise ValueError(f"Template missing required field: {field}")

                

        if 'bos_token' not in template_dict:

            template_dict['bos_token'] = ''

        if 'eos_token' not in template_dict:

            template_dict['eos_token'] = ''

            

        template_dict['name'] = name

        self.custom_templates[name] = template_dict

        

        return True



class DocumentProcessor:

    """Processes various document formats for RAG"""

    

    def __init__(self):

        self.supported_formats = ['.pdf', '.docx', '.html', '.htm', '.md', '.txt']

        

    def process_document(self, file_path: str) -> Dict:

        """Process a document and extract text content"""

        file_path = Path(file_path)

        

        if not file_path.exists():

            raise FileNotFoundError(f"Document not found: {file_path}")

            

        extension = file_path.suffix.lower()

        

        if extension not in self.supported_formats:

            raise ValueError(f"Unsupported format: {extension}")

            

        if extension == '.pdf':

            text = self._extract_pdf(file_path)

        elif extension == '.docx':

            text = self._extract_docx(file_path)

        elif extension in ['.html', '.htm']:

            text = self._extract_html(file_path)

        elif extension == '.md':

            text = self._extract_markdown(file_path)

        else:

            text = self._extract_text(file_path)

            

        doc_id = hashlib.md5(str(file_path).encode()).hexdigest()

        

        return {

            'id': doc_id,

            'path': str(file_path),

            'filename': file_path.name,

            'format': extension,

            'text': text,

            'length': len(text)

        }

    

    def _extract_pdf(self, file_path: Path) -> str:

        """Extract text from PDF using PyMuPDF"""

        if fitz is None:

            raise ImportError("PyMuPDF not installed")

            

        text_parts = []

        

        try:

            doc = fitz.open(file_path)

            

            for page_num in range(len(doc)):

                page = doc[page_num]

                text = page.get_text()

                

                if text.strip():

                    text_parts.append(f"--- Page {page_num + 1} ---\n{text}")

                    

            doc.close()

            

        except Exception as e:

            raise Exception(f"Error extracting PDF: {e}")

            

        return "\n\n".join(text_parts)

    

    def _extract_docx(self, file_path: Path) -> str:

        """Extract text from Word document"""

        if DocxDocument is None:

            raise ImportError("python-docx not installed")

            

        try:

            doc = DocxDocument(file_path)

            text_parts = []

            

            for paragraph in doc.paragraphs:

                if paragraph.text.strip():

                    text_parts.append(paragraph.text)

                    

            for table in doc.tables:

                for row in table.rows:

                    row_text = []

                    for cell in row.cells:

                        if cell.text.strip():

                            row_text.append(cell.text)

                    if row_text:

                        text_parts.append(" | ".join(row_text))

                        

            return "\n\n".join(text_parts)

            

        except Exception as e:

            raise Exception(f"Error extracting DOCX: {e}")

    

    def _extract_html(self, file_path: Path) -> str:

        """Extract text from HTML document"""

        if BeautifulSoup is None:

            raise ImportError("BeautifulSoup4 not installed")

            

        try:

            with open(file_path, 'r', encoding='utf-8') as f:

                html_content = f.read()

                

            soup = BeautifulSoup(html_content, 'html.parser')

            

            for script in soup(["script", "style"]):

                script.decompose()

                

            text = soup.get_text()

            

            lines = (line.strip() for line in text.splitlines())

            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))

            text = '\n'.join(chunk for chunk in chunks if chunk)

            

            return text

            

        except Exception as e:

            raise Exception(f"Error extracting HTML: {e}")

    

    def _extract_markdown(self, file_path: Path) -> str:

        """Extract text from Markdown document"""

        try:

            with open(file_path, 'r', encoding='utf-8') as f:

                md_content = f.read()

                

            if markdown is not None and BeautifulSoup is not None:

                html = markdown.markdown(md_content)

                soup = BeautifulSoup(html, 'html.parser')

                text = soup.get_text()

            else:

                text = md_content

            

            return text

            

        except Exception as e:

            raise Exception(f"Error extracting Markdown: {e}")

    

    def _extract_text(self, file_path: Path) -> str:

        """Extract text from plain text file"""

        try:

            with open(file_path, 'r', encoding='utf-8') as f:

                return f.read()

        except UnicodeDecodeError:

            with open(file_path, 'r', encoding='latin-1') as f:

                return f.read()



class TextChunker:

    """Splits text into overlapping chunks for RAG"""

    

    def __init__(self, chunk_size: int = 512, overlap: int = 50):

        self.chunk_size = chunk_size

        self.overlap = overlap

        

    def chunk_text(self, text: str, doc_id: str) -> List[Dict]:

        """Split text into overlapping chunks"""

        sentences = text.replace('\n', ' ').split('. ')

        

        chunks = []

        current_chunk = []

        current_length = 0

        chunk_index = 0

        

        for sentence in sentences:

            sentence = sentence.strip()

            if not sentence:

                continue

                

            sentence_length = len(sentence.split())

            

            if current_length + sentence_length > self.chunk_size and current_chunk:

                chunk_text = '. '.join(current_chunk) + '.'

                chunks.append({

                    'doc_id': doc_id,

                    'chunk_index': chunk_index,

                    'text': chunk_text,

                    'length': current_length

                })

                

                overlap_sentences = current_chunk[-self.overlap:] if len(current_chunk) > self.overlap else current_chunk

                current_chunk = overlap_sentences

                current_length = sum(len(s.split()) for s in current_chunk)

                chunk_index += 1

                

            current_chunk.append(sentence)

            current_length += sentence_length

            

        if current_chunk:

            chunk_text = '. '.join(current_chunk) + '.'

            chunks.append({

                'doc_id': doc_id,

                'chunk_index': chunk_index,

                'text': chunk_text,

                'length': current_length

            })

            

        return chunks



class EmbeddingGenerator:

    """Generates embeddings for text chunks"""

    

    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):

        """Initialize embedding model"""

        if SentenceTransformer is None:

            raise ImportError("sentence-transformers not installed")

            

        self.model = SentenceTransformer(model_name)

        self.embedding_dim = self.model.get_sentence_embedding_dimension()

        

    def generate_embeddings(self, texts: List[str]) -> np.ndarray:

        """Generate embeddings for a list of texts"""

        embeddings = self.model.encode(texts, show_progress_bar=False)

        return embeddings



class VectorStore:

    """Stores and retrieves document embeddings using FAISS"""

    

    def __init__(self, embedding_dim: int):

        if faiss is None:

            raise ImportError("faiss not installed")

            

        self.embedding_dim = embedding_dim

        self.index = faiss.IndexFlatL2(embedding_dim)

        self.chunks = []

        self.doc_metadata = {}

        

    def add_document(self, doc_id: str, chunks: List[Dict], embeddings: np.ndarray):

        """Add document chunks and embeddings to the store"""

        self.doc_metadata[doc_id] = {

            'num_chunks': len(chunks),

            'added_at': datetime.now().isoformat()

        }

        

        self.index.add(embeddings.astype('float32'))

        

        for chunk in chunks:

            self.chunks.append(chunk)

    

    def search(self, query_embedding: np.ndarray, top_k: int = 3) -> List[Dict]:

        """Search for most similar chunks"""

        if self.index.ntotal == 0:

            return []

            

        distances, indices = self.index.search(

            query_embedding.astype('float32').reshape(1, -1), 

            min(top_k, self.index.ntotal)

        )

        

        results = []

        for i, (dist, idx) in enumerate(zip(distances[0], indices[0])):

            if idx < len(self.chunks):

                chunk = self.chunks[idx].copy()

                chunk['similarity_score'] = float(1 / (1 + dist))

                chunk['rank'] = i + 1

                results.append(chunk)

                

        return results



class RAGOrchestrator:

    """Orchestrates document processing and retrieval for RAG"""

    

    def __init__(self, embedding_model_name: str = 'all-MiniLM-L6-v2'):

        self.doc_processor = DocumentProcessor()

        self.chunker = TextChunker()

        

        try:

            self.embedding_generator = EmbeddingGenerator(embedding_model_name)

            self.vector_store = VectorStore(self.embedding_generator.embedding_dim)

            self.enabled = True

        except ImportError as e:

            print(f"RAG disabled: {e}")

            self.enabled = False

        

    def add_document(self, file_path: str) -> Dict:

        """Process and add a document to the RAG system"""

        if not self.enabled:

            raise RuntimeError("RAG not available - missing dependencies")

            

        doc_info = self.doc_processor.process_document(file_path)

        chunks = self.chunker.chunk_text(doc_info['text'], doc_info['id'])

        

        chunk_texts = [chunk['text'] for chunk in chunks]

        embeddings = self.embedding_generator.generate_embeddings(chunk_texts)

        

        self.vector_store.add_document(doc_info['id'], chunks, embeddings)

        

        return {

            'doc_id': doc_info['id'],

            'filename': doc_info['filename'],

            'num_chunks': len(chunks),

            'status': 'success'

        }

    

    def augment_prompt(self, query: str, top_k: int = 3) -> str:

        """Augment a query with relevant context from documents"""

        if not self.enabled:

            return query

            

        query_embedding = self.embedding_generator.generate_embeddings([query])[0]

        results = self.vector_store.search(query_embedding, top_k)

        

        if not results:

            return query

            

        context_parts = ["Based on the following context, please answer the question.\n\nContext:"]

        

        for result in results:

            context_parts.append(f"\n{result['text']}")

            

        context_parts.append(f"\n\nQuestion: {query}\n\nAnswer:")

        

        return "\n".join(context_parts)



class PerformanceMonitor:

    """Monitors and reports performance metrics"""

    

    def __init__(self, history_size: int = 100):

        self.history_size = history_size

        self.latency_history = deque(maxlen=history_size)

        self.throughput_history = deque(maxlen=history_size)

        self.memory_history = deque(maxlen=history_size)

        

    def record_generation(self, prompt_tokens: int,

                         generated_tokens: int,

                         total_time: float,

                         time_to_first_token: float):

        """Record metrics for a generation"""

        self.latency_history.append({

            'total_time': total_time,

            'time_to_first_token': time_to_first_token,

            'timestamp': time.time()

        })

        

        tokens_per_second = generated_tokens / total_time if total_time > 0 else 0

        self.throughput_history.append({

            'tokens_per_second': tokens_per_second,

            'generated_tokens': generated_tokens,

            'timestamp': time.time()

        })

        

    def get_statistics(self) -> Dict:

        """Calculate performance statistics"""

        if not self.latency_history:

            return {}

            

        latencies = [entry['total_time'] for entry in self.latency_history]

        ttfts = [entry['time_to_first_token'] for entry in self.latency_history]

        throughputs = [entry['tokens_per_second'] for entry in self.throughput_history]

        

        return {

            'latency': {

                'mean': sum(latencies) / len(latencies),

                'min': min(latencies),

                'max': max(latencies),

                'recent': latencies[-1] if latencies else 0

            },

            'time_to_first_token': {

                'mean': sum(ttfts) / len(ttfts),

                'min': min(ttfts),

                'max': max(ttfts),

                'recent': ttfts[-1] if ttfts else 0

            },

            'throughput': {

                'mean': sum(throughputs) / len(throughputs),

                'min': min(throughputs),

                'max': max(throughputs),

                'recent': throughputs[-1] if throughputs else 0

            },

            'sample_count': len(self.latency_history)

        }



class ChatbotUI:

    """Gradio-based user interface for the chatbot"""

    

    def __init__(self, hardware_detector: HardwareDetector, 

                 model_manager: ModelManager,

                 inference_engine: InferenceEngine,

                 template_manager: PromptTemplateManager,

                 rag_orchestrator: RAGOrchestrator):

        self.hardware = hardware_detector

        self.models = model_manager

        self.engine = inference_engine

        self.templates = template_manager

        self.rag = rag_orchestrator

        self.performance_monitor = PerformanceMonitor()

        

        self.conversation_history = []

        self.current_system_message = ""

        self.current_template = "llama2-chat"

        self.use_rag = False

    

    def create_model_tab(self):

        """Create the model management tab"""

        with gr.Tab("Model Management"):

            with gr.Row():

                with gr.Column(scale=2):

                    model_dropdown = gr.Dropdown(

                        label="Available Models",

                        choices=[],

                        interactive=True

                    )

                    

                    scan_button = gr.Button("Scan for Models", variant="secondary")

                    

                    model_info = gr.Textbox(

                        label="Model Information",

                        lines=5,

                        interactive=False

                    )

                    

                with gr.Column(scale=3):

                    gr.Markdown("### Loading Parameters")

                    

                    n_ctx = gr.Slider(

                        minimum=512,

                        maximum=8192,

                        value=2048,

                        step=512,

                        label="Context Window Size",

                        info="Maximum number of tokens in context"

                    )

                    

                    n_gpu_layers = gr.Slider(

                        minimum=0,

                        maximum=100,

                        value=0,

                        step=1,

                        label="GPU Layers",

                        info="Number of layers to offload to GPU (0 = CPU only)"

                    )

                    

                    gpu_percentage = gr.Slider(

                        minimum=0,

                        maximum=100,

                        value=100,

                        step=5,

                        label="GPU Offload Percentage",

                        info="Percentage of model to offload to GPU"

                    )

                    

                    n_threads = gr.Slider(

                        minimum=1,

                        maximum=32,

                        value=4,

                        step=1,

                        label="CPU Threads",

                        info="Number of threads for CPU inference"

                    )

                    

                    load_button = gr.Button("Load Model", variant="primary")

                    load_status = gr.Textbox(label="Status", interactive=False)

                    

        return {

            'model_dropdown': model_dropdown,

            'scan_button': scan_button,

            'model_info': model_info,

            'n_ctx': n_ctx,

            'n_gpu_layers': n_gpu_layers,

            'gpu_percentage': gpu_percentage,

            'n_threads': n_threads,

            'load_button': load_button,

            'load_status': load_status

        }

    

    def create_inference_tab(self):

        """Create the inference parameters tab"""

        with gr.Tab("Inference Parameters"):

            gr.Markdown("### Generation Settings")

            

            with gr.Row():

                with gr.Column():

                    temperature = gr.Slider(

                        minimum=0.0,

                        maximum=2.0,

                        value=0.8,

                        step=0.05,

                        label="Temperature",

                        info="Controls randomness (lower = more deterministic)"

                    )

                    

                    top_p = gr.Slider(

                        minimum=0.0,

                        maximum=1.0,

                        value=0.95,

                        step=0.05,

                        label="Top P",

                        info="Nucleus sampling threshold"

                    )

                    

                    top_k = gr.Slider(

                        minimum=0,

                        maximum=200,

                        value=40,

                        step=5,

                        label="Top K",

                        info="Number of top tokens to consider"

                    )

                    

                with gr.Column():

                    repeat_penalty = gr.Slider(

                        minimum=1.0,

                        maximum=2.0,

                        value=1.1,

                        step=0.05,

                        label="Repetition Penalty",

                        info="Penalize repeated tokens"

                    )

                    

                    max_tokens = gr.Slider(

                        minimum=64,

                        maximum=2048,

                        value=512,

                        step=64,

                        label="Max Tokens",

                        info="Maximum length of generated response"

                    )

                    

            gr.Markdown("### Prompt Template")

            

            template_dropdown = gr.Dropdown(

                label="Template",

                choices=list(self.templates.templates.keys()),

                value="llama2-chat",

                interactive=True

            )

            

            system_message = gr.Textbox(

                label="System Message",

                lines=3,

                placeholder="Enter system instructions here...",

                value="You are a helpful AI assistant."

            )

            

        return {

            'temperature': temperature,

            'top_p': top_p,

            'top_k': top_k,

            'repeat_penalty': repeat_penalty,

            'max_tokens': max_tokens,

            'template_dropdown': template_dropdown,

            'system_message': system_message

        }

    

    def create_chat_tab(self):

        """Create the main chat interface tab"""

        with gr.Tab("Chat"):

            chatbot = gr.Chatbot(

                label="Conversation",

                height=500,

                show_label=True

            )

            

            with gr.Row():

                user_input = gr.Textbox(

                    label="Your Message",

                    placeholder="Type your message here...",

                    lines=3,

                    scale=4

                )

                

                with gr.Column(scale=1):

                    send_button = gr.Button("Send", variant="primary")

                    clear_button = gr.Button("Clear History", variant="secondary")

                    

            with gr.Row():

                use_rag_checkbox = gr.Checkbox(

                    label="Use RAG (Retrieval Augmented Generation)",

                    value=False

                )

                

                rag_top_k = gr.Slider(

                    minimum=1,

                    maximum=10,

                    value=3,

                    step=1,

                    label="Number of Context Chunks",

                    visible=False

                )

                

        return {

            'chatbot': chatbot,

            'user_input': user_input,

            'send_button': send_button,

            'clear_button': clear_button,

            'use_rag_checkbox': use_rag_checkbox,

            'rag_top_k': rag_top_k

        }

    

    def create_document_tab(self):

        """Create the document management tab for RAG"""

        with gr.Tab("Documents (RAG)"):

            gr.Markdown("### Upload Documents for RAG")

            

            file_upload = gr.File(

                label="Upload Document",

                file_types=['.pdf', '.docx', '.html', '.htm', '.md', '.txt'],

                type="filepath"

            )

            

            upload_button = gr.Button("Process Document", variant="primary")

            upload_status = gr.Textbox(label="Processing Status", interactive=False)

            

            gr.Markdown("### Processed Documents")

            

            documents_list = gr.Dataframe(

                headers=["Document ID", "Filename", "Chunks", "Status"],

                datatype=["str", "str", "number", "str"],

                interactive=False

            )

            

            refresh_docs_button = gr.Button("Refresh List", variant="secondary")

            

        return {

            'file_upload': file_upload,

            'upload_button': upload_button,

            'upload_status': upload_status,

            'documents_list': documents_list,

            'refresh_docs_button': refresh_docs_button

        }

    

    def create_status_tab(self):

        """Create the system status tab"""

        with gr.Tab("System Status"):

            gr.Markdown("### Hardware Information")

            

            hardware_info = gr.JSON(

                label="Detected Hardware",

                value={}

            )

            

            refresh_hw_button = gr.Button("Refresh Hardware Info", variant="secondary")

            

            gr.Markdown("### Resource Utilization")

            

            with gr.Row():

                cpu_usage = gr.Textbox(label="CPU Usage", interactive=False)

                memory_usage = gr.Textbox(label="Memory Usage", interactive=False)

                gpu_usage = gr.Textbox(label="GPU Usage", interactive=False)

                

            refresh_usage_button = gr.Button("Refresh Usage", variant="secondary")

            

            gr.Markdown("### Performance Metrics")

            

            performance_stats = gr.JSON(

                label="Generation Statistics",

                value={}

            )

            

            refresh_perf_button = gr.Button("Refresh Performance", variant="secondary")

            

        return {

            'hardware_info': hardware_info,

            'refresh_hw_button': refresh_hw_button,

            'cpu_usage': cpu_usage,

            'memory_usage': memory_usage,

            'gpu_usage': gpu_usage,

            'refresh_usage_button': refresh_usage_button,

            'performance_stats': performance_stats,

            'refresh_perf_button': refresh_perf_button

        }

    

    def setup_event_handlers(self, components: Dict):

        """Setup all event handlers for the UI"""

        

        def scan_models_handler():

            models = self.models.scan_models()

            choices = [f"{m['name']} ({m['size_gb']} GB)" for m in models]

            return gr.Dropdown(choices=choices)

            

        components['scan_button'].click(

            fn=scan_models_handler,

            outputs=components['model_dropdown']

        )

        

        def load_model_handler(model_name, n_ctx, n_gpu_layers, gpu_pct, n_threads):

            try:

                models = self.models.scan_models()

                selected_model = None

                for m in models:

                    if f"{m['name']} ({m['size_gb']} GB)" == model_name:

                        selected_model = m

                        break

                        

                if not selected_model:

                    return "Error: Model not found"

                    

                if gpu_pct < 100:

                    n_gpu_layers = int(n_gpu_layers * (gpu_pct / 100.0))

                    

                success = self.engine.load_model(

                    selected_model['path'],

                    n_ctx=n_ctx,

                    n_gpu_layers=n_gpu_layers,

                    n_threads=n_threads

                )

                

                if success:

                    return f"Successfully loaded {selected_model['name']}"

                else:

                    return "Error loading model"

                    

            except Exception as e:

                return f"Error: {str(e)}"

                

        components['load_button'].click(

            fn=load_model_handler,

            inputs=[

                components['model_dropdown'],

                components['n_ctx'],

                components['n_gpu_layers'],

                components['gpu_percentage'],

                components['n_threads']

            ],

            outputs=components['load_status']

        )

        

        def send_message_handler(user_msg, history, system_msg, template, 

                                use_rag, rag_k, temp, top_p, top_k, 

                                repeat_pen, max_tok):

            if not user_msg.strip():

                return history, ""

                

            history = history or []

            history.append([user_msg, None])

            

            conv_history = []

            for h in history[:-1]:

                if h[0]:

                    conv_history.append({'role': 'user', 'content': h[0]})

                if h[1]:

                    conv_history.append({'role': 'assistant', 'content': h[1]})

                    

            query = user_msg

            if use_rag and self.rag.enabled:

                query = self.rag.augment_prompt(user_msg, top_k=rag_k)

                

            prompt = self.templates.apply_template(

                template, system_msg, conv_history, query

            )

            

            response = ""

            start_time = time.time()

            first_token_time = None

            token_count = 0

            

            try:

                for token in self.engine.generate_stream(

                    prompt,

                    temperature=temp,

                    top_p=top_p,

                    top_k=top_k,

                    repeat_penalty=repeat_pen,

                    max_tokens=max_tok

                ):

                    if first_token_time is None:

                        first_token_time = time.time() - start_time

                    

                    response += token

                    token_count += 1

                    history[-1][1] = response

                    yield history, ""

                

                total_time = time.time() - start_time

                self.performance_monitor.record_generation(

                    len(prompt.split()),

                    token_count,

                    total_time,

                    first_token_time or 0

                )

                    

            except Exception as e:

                history[-1][1] = f"Error: {str(e)}"

                yield history, ""

                

            return history, ""

            

        components['send_button'].click(

            fn=send_message_handler,

            inputs=[

                components['user_input'],

                components['chatbot'],

                components['system_message'],

                components['template_dropdown'],

                components['use_rag_checkbox'],

                components['rag_top_k'],

                components['temperature'],

                components['top_p'],

                components['top_k'],

                components['repeat_penalty'],

                components['max_tokens']

            ],

            outputs=[components['chatbot'], components['user_input']]

        )

        

        def clear_history_handler():

            return [], ""

            

        components['clear_button'].click(

            fn=clear_history_handler,

            outputs=[components['chatbot'], components['user_input']]

        )

        

        def toggle_rag_handler(use_rag):

            return gr.Slider(visible=use_rag)

            

        components['use_rag_checkbox'].change(

            fn=toggle_rag_handler,

            inputs=components['use_rag_checkbox'],

            outputs=components['rag_top_k']

        )

        

        def upload_document_handler(file_path):

            if not file_path:

                return "No file selected", []

                

            try:

                result = self.rag.add_document(file_path)

                status = f"Successfully processed {result['filename']}: {result['num_chunks']} chunks created"

                

                docs_data = []

                for doc_id, metadata in self.rag.vector_store.doc_metadata.items():

                    docs_data.append([

                        doc_id[:8],

                        Path(metadata.get('path', '')).name if 'path' in metadata else 'Unknown',

                        metadata['num_chunks'],

                        'Processed'

                    ])

                    

                return status, docs_data

                

            except Exception as e:

                return f"Error: {str(e)}", []

                

        components['upload_button'].click(

            fn=upload_document_handler,

            inputs=components['file_upload'],

            outputs=[components['upload_status'], components['documents_list']]

        )

        

        def refresh_docs_handler():

            docs_data = []

            if self.rag.enabled:

                for doc_id, metadata in self.rag.vector_store.doc_metadata.items():

                    docs_data.append([

                        doc_id[:8],

                        Path(metadata.get('path', '')).name if 'path' in metadata else 'Unknown',

                        metadata['num_chunks'],

                        'Processed'

                    ])

            return docs_data

            

        components['refresh_docs_button'].click(

            fn=refresh_docs_handler,

            outputs=components['documents_list']

        )

        

        def refresh_hardware_handler():

            hw_info = self.hardware.detect_all()

            return hw_info

            

        components['refresh_hw_button'].click(

            fn=refresh_hardware_handler,

            outputs=components['hardware_info']

        )

        

        def refresh_usage_handler():

            cpu_pct = psutil.cpu_percent(interval=1)

            mem = psutil.virtual_memory()

            

            cpu_str = f"{cpu_pct}%"

            mem_str = f"{mem.percent}% ({mem.used / (1024**3):.1f} GB / {mem.total / (1024**3):.1f} GB)"

            

            gpu_str = "N/A"

            if torch.cuda.is_available():

                gpu_mem = torch.cuda.memory_allocated(0) / (1024**3)

                gpu_total = torch.cuda.get_device_properties(0).total_memory / (1024**3)

                gpu_str = f"{(gpu_mem/gpu_total)*100:.1f}% ({gpu_mem:.1f} GB / {gpu_total:.1f} GB)"

                

            return cpu_str, mem_str, gpu_str

            

        components['refresh_usage_button'].click(

            fn=refresh_usage_handler,

            outputs=[components['cpu_usage'], components['memory_usage'], components['gpu_usage']]

        )

        

        def refresh_performance_handler():

            stats = self.performance_monitor.get_statistics()

            return stats

            

        components['refresh_perf_button'].click(

            fn=refresh_performance_handler,

            outputs=components['performance_stats']

        )

    

    def build_interface(self):

        """Build the complete Gradio interface"""

        with gr.Blocks(title="Local LLM Chatbot", theme=gr.themes.Soft()) as interface:

            gr.Markdown("# Local LLM Chatbot with RAG")

            gr.Markdown("Advanced local language model interface with hardware optimization and document retrieval")

            

            model_components = self.create_model_tab()

            inference_components = self.create_inference_tab()

            chat_components = self.create_chat_tab()

            document_components = self.create_document_tab()

            status_components = self.create_status_tab()

            

            all_components = {

                **model_components,

                **inference_components,

                **chat_components,

                **document_components,

                **status_components

            }

            

            self.setup_event_handlers(all_components)

            

            interface.load(

                fn=lambda: self.hardware.detect_all(),

                outputs=all_components['hardware_info']

            )

            

        return interface



def main():

    """Main application entry point"""

    print("=" * 80)

    print("LOCAL LLM CHATBOT - Initializing...")

    print("=" * 80)

    

    hardware = HardwareDetector()

    print("\nDetecting hardware...")

    hw_info = hardware.detect_all()

    print(f"  CPU: {hw_info['cpu']['physical_cores']} physical cores, {hw_info['cpu']['logical_cores']} logical cores")

    print(f"  GPU: {hw_info['gpu']['primary_type']}")

    if hw_info['gpu']['devices']:

        for gpu in hw_info['gpu']['devices']:

            print(f"    - {gpu['name']}")

    print(f"  RAM: {hw_info['memory']['system_total_gb']:.1f} GB total, {hw_info['memory']['system_available_gb']:.1f} GB available")

    

    models = ModelManager()

    engine = InferenceEngine(hardware)

    templates = PromptTemplateManager()

    

    try:

        rag = RAGOrchestrator()

        print("\n  RAG system initialized successfully")

    except Exception as e:

        print(f"\n  RAG initialization failed: {e}")

        rag = RAGOrchestrator()

    

    ui = ChatbotUI(hardware, models, engine, templates, rag)

    interface = ui.build_interface()

    

    print("\n" + "=" * 80)

    print("LAUNCHING INTERFACE...")

    print("=" * 80)

    print("\nAccess the chatbot at: http://127.0.0.1:7860")

    print("Press Ctrl+C to stop the server\n")

    

    interface.launch(

        share=False, 

        server_name="127.0.0.1", 

        server_port=7860,

        show_error=True

    )



if __name__ == "__main__":

    main()


This is the complete, production-ready chatbot code with all features integrated. To use it:

  1. Install dependencies:

pip install llama-cpp-python torch gradio sentence-transformers faiss-cpu PyMuPDF python-docx beautifulsoup4 markdown psutil numpy

  1. Create a models directory and place your GGUF model files there
  2. Run the application: python chatbot.py


The code includes all features described in the article: hardware detection, model management, RAG support, performance monitoring, and a comprehensive UI.



No comments: