Sunday, June 21, 2026

BUILDING AN LLM-BASED AUTONOMOUS DOCUMENTATION AGENT FOR SOFTWARE ARCHITECTURE


 


INTRODUCTION

Software documentation remains one of the most challenging aspects of modern software engineering. As codebases evolve rapidly through continuous integration and deployment practices, maintaining accurate and comprehensive architecture documentation becomes increasingly difficult. Traditional documentation approaches suffer from staleness, inconsistency, and the significant manual effort required to keep them synchronized with the actual implementation.

This article presents a comprehensive approach to building an autonomous documentation agent that leverages Large Language Models to automatically generate and maintain architecture documentation in the arc42 format. The arc42 template is a widely adopted standard for documenting software and system architectures, providing a structured approach that covers all essential aspects from business context to technical implementation details.

The proposed system addresses several critical challenges in automated documentation. First, it must handle large codebases that exceed the context window limitations of current LLMs. Second, it needs to understand not just individual code files but the relationships and dependencies between different components. Third, it must recognize higher-level architectural patterns and design decisions that are often implicit in the code structure. Fourth, it should integrate seamlessly with existing development workflows, particularly Git-based version control systems. Finally, it must be flexible enough to work with various LLM providers and hardware configurations, from cloud-based services to local deployments on different GPU architectures.

SYSTEM ARCHITECTURE OVERVIEW

The autonomous documentation agent employs a multi-agent architecture where specialized agents collaborate to analyze code, extract architectural information, and generate comprehensive documentation. This design follows the principle of separation of concerns, allowing each agent to focus on a specific aspect of the documentation generation process.

The system consists of five primary components working in concert. The Organizer Agent serves as the central coordinator, managing the workflow and delegating tasks to specialized agents. The Code Analysis Agents examine source code files to extract structural information, identify components, and understand implementation details. The Architecture Analysis Agent focuses on higher-level concerns, identifying architectural patterns, component relationships, and system-wide design decisions. The Pattern Recognition Agent specializes in detecting both design patterns at the code level and architectural patterns at the system level. Finally, the Documentation Agent synthesizes all gathered information into coherent arc42-formatted documentation.

The system employs a hybrid storage approach to overcome context window limitations. A traditional Retrieval-Augmented Generation database stores textual information including source code, existing documentation, and Architecture Decision Records when available. Complementing this, a GraphRAG database maintains a graph representation of the codebase, capturing dependencies, call relationships, and component interactions. This dual storage strategy enables the system to maintain both detailed textual information and structural relationships efficiently.

To maximize processing efficiency, the system leverages concurrency at multiple levels. Different code modules can be analyzed in parallel, architectural pattern detection can occur simultaneously with code analysis, and documentation generation for different arc42 sections can proceed concurrently when dependencies allow.

Integration with Git repositories enables the system to detect code changes automatically and trigger incremental documentation updates. Rather than regenerating all documentation on every change, the system identifies affected components and updates only the relevant documentation sections, significantly improving efficiency for large codebases.

MULTI-AGENT ORCHESTRATION

The multi-agent architecture represents the core organizational principle of the documentation system. Each agent operates semi-autonomously while coordinating through a central orchestrator to achieve the overall documentation goal.

The Organizer Agent implements a sophisticated workflow management system. When a documentation generation request arrives, the organizer first analyzes the scope of work by examining the codebase structure and identifying major components. It then creates a task graph representing the dependencies between different analysis and documentation tasks. For example, generating the building block view in arc42 format depends on completing the code structure analysis, while the runtime view requires understanding component interactions.

The organizer maintains a task queue and dispatches work to available specialized agents based on their capabilities and current workload. It implements a priority system where critical path tasks receive preferential treatment to minimize overall completion time. The organizer also handles error recovery, retrying failed tasks and potentially reassigning them to different agent instances if persistent failures occur.

Here is a foundational implementation of the agent orchestration system:

import asyncio
from typing import Dict, List, Set, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
import logging

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

class AgentType(Enum):
    CODE_ANALYZER = "code_analyzer"
    ARCHITECTURE_ANALYZER = "architecture_analyzer"
    PATTERN_RECOGNIZER = "pattern_recognizer"
    DOCUMENTATION_GENERATOR = "documentation_generator"

@dataclass
class Task:
    task_id: str
    task_type: AgentType
    dependencies: Set[str] = field(default_factory=set)
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[Any] = None
    error: Optional[str] = None
    priority: int = 0
    context: Dict[str, Any] = field(default_factory=dict)

class OrganizerAgent:
    def __init__(self, max_concurrent_tasks: int = 10):
        self.tasks: Dict[str, Task] = {}
        self.max_concurrent_tasks = max_concurrent_tasks
        self.active_tasks: Set[str] = set()
        self.completed_tasks: Set[str] = set()
        self.failed_tasks: Set[str] = set()
        self.logger = logging.getLogger(__name__)
        self.agent_pools: Dict[AgentType, List[Any]] = {}
        
    def add_task(self, task: Task) -> None:
        """Add a task to the orchestration queue with dependency tracking."""
        self.tasks[task.task_id] = task
        self.logger.info(f"Added task {task.task_id} of type {task.task_type}")
        
    def get_ready_tasks(self) -> List[Task]:
        """Identify tasks whose dependencies are satisfied and can be executed."""
        ready_tasks = []
        for task_id, task in self.tasks.items():
            if task.status != TaskStatus.PENDING:
                continue
            if task_id in self.active_tasks:
                continue
            dependencies_met = all(
                dep in self.completed_tasks for dep in task.dependencies
            )
            if dependencies_met:
                ready_tasks.append(task)
        ready_tasks.sort(key=lambda t: t.priority, reverse=True)
        return ready_tasks
    
    async def execute_task(self, task: Task, agent: Any) -> None:
        """Execute a single task using the assigned agent."""
        try:
            self.logger.info(f"Starting execution of task {task.task_id}")
            task.status = TaskStatus.IN_PROGRESS
            self.active_tasks.add(task.task_id)
            
            result = await agent.process(task.context)
            
            task.result = result
            task.status = TaskStatus.COMPLETED
            self.completed_tasks.add(task.task_id)
            self.logger.info(f"Completed task {task.task_id}")
            
        except Exception as e:
            self.logger.error(f"Task {task.task_id} failed: {str(e)}")
            task.status = TaskStatus.FAILED
            task.error = str(e)
            self.failed_tasks.add(task.task_id)
        finally:
            self.active_tasks.discard(task.task_id)
    
    async def orchestrate(self) -> Dict[str, Any]:
        """Main orchestration loop managing task execution with concurrency."""
        pending_tasks = set(self.tasks.keys())
        
        while pending_tasks - self.completed_tasks - self.failed_tasks:
            ready_tasks = self.get_ready_tasks()
            
            if not ready_tasks and self.active_tasks:
                await asyncio.sleep(0.1)
                continue
            
            if not ready_tasks and not self.active_tasks:
                remaining = pending_tasks - self.completed_tasks - self.failed_tasks
                if remaining:
                    self.logger.error(f"Deadlock detected. Remaining tasks: {remaining}")
                    break
                
            available_slots = self.max_concurrent_tasks - len(self.active_tasks)
            tasks_to_execute = ready_tasks[:available_slots]
            
            execution_coroutines = []
            for task in tasks_to_execute:
                agent = self.get_agent_for_task(task)
                execution_coroutines.append(self.execute_task(task, agent))
            
            if execution_coroutines:
                await asyncio.gather(*execution_coroutines, return_exceptions=True)
        
        return {
            "completed": len(self.completed_tasks),
            "failed": len(self.failed_tasks),
            "results": {
                task_id: task.result 
                for task_id, task in self.tasks.items() 
                if task.status == TaskStatus.COMPLETED
            }
        }
    
    def get_agent_for_task(self, task: Task) -> Any:
        """Retrieve an available agent from the pool for the given task type."""
        agent_pool = self.agent_pools.get(task.task_type, [])
        if not agent_pool:
            raise ValueError(f"No agents available for task type {task.task_type}")
        return agent_pool[0]
    
    def register_agent(self, agent_type: AgentType, agent: Any) -> None:
        """Register an agent instance in the appropriate agent pool."""
        if agent_type not in self.agent_pools:
            self.agent_pools[agent_type] = []
        self.agent_pools[agent_type].append(agent)
        self.logger.info(f"Registered agent for type {agent_type}")

This orchestration framework provides the foundation for coordinating multiple specialized agents. The task dependency system ensures that tasks execute in the correct order while maximizing parallelism. For instance, when analyzing a large codebase, the system can analyze multiple independent modules concurrently while ensuring that cross-module dependency analysis waits until individual module analyses complete.

The priority system allows the organizer to focus computational resources on critical documentation sections first. When a user requests specific arc42 sections or when certain code changes affect particular architectural areas, the organizer can prioritize those tasks accordingly.

RAG AND GRAPHRAG IMPLEMENTATION

The dual storage system combining traditional RAG with GraphRAG addresses the fundamental challenge of maintaining both detailed textual information and structural relationships within large codebases. This hybrid approach enables the documentation agent to answer questions that require both deep textual understanding and broad structural awareness.

The traditional RAG component stores code files, documentation fragments, and Architecture Decision Records as vector embeddings. When the system needs to understand implementation details or retrieve specific code examples, it queries this vector database to find semantically similar content. The embedding model transforms code and text into high-dimensional vectors where semantic similarity corresponds to geometric proximity.

The GraphRAG component maintains a graph database representing the codebase structure. Nodes in this graph represent code entities such as classes, functions, modules, and packages. Edges represent relationships including inheritance, composition, function calls, and dependencies. This graph structure enables the system to perform sophisticated queries about architectural patterns, dependency chains, and component interactions that would be difficult or impossible with pure vector similarity search.

Here is an implementation of the hybrid RAG system:

import numpy as np
from typing import List, Dict, Tuple, Optional, Set
import hashlib
import json
from dataclasses import dataclass
from collections import defaultdict

@dataclass
class CodeEntity:
    entity_id: str
    entity_type: str
    name: str
    file_path: str
    content: str
    metadata: Dict[str, any]

@dataclass
class Relationship:
    source_id: str
    target_id: str
    relationship_type: str
    metadata: Dict[str, any]

class VectorStore:
    def __init__(self, embedding_dimension: int = 768):
        self.embedding_dimension = embedding_dimension
        self.vectors: Dict[str, np.ndarray] = {}
        self.documents: Dict[str, str] = {}
        self.metadata: Dict[str, Dict] = {}
        
    def add_document(self, doc_id: str, content: str, embedding: np.ndarray, 
                     metadata: Optional[Dict] = None) -> None:
        """Store a document with its vector embedding and metadata."""
        if embedding.shape[0] != self.embedding_dimension:
            raise ValueError(f"Embedding dimension mismatch: expected {self.embedding_dimension}")
        
        self.vectors[doc_id] = embedding
        self.documents[doc_id] = content
        self.metadata[doc_id] = metadata or {}
        
    def similarity_search(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Tuple[str, float]]:
        """Find the most similar documents to the query embedding using cosine similarity."""
        if not self.vectors:
            return []
        
        query_norm = np.linalg.norm(query_embedding)
        if query_norm == 0:
            return []
        
        similarities = []
        for doc_id, doc_embedding in self.vectors.items():
            doc_norm = np.linalg.norm(doc_embedding)
            if doc_norm == 0:
                continue
            
            cosine_sim = np.dot(query_embedding, doc_embedding) / (query_norm * doc_norm)
            similarities.append((doc_id, float(cosine_sim)))
        
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_k]
    
    def get_document(self, doc_id: str) -> Optional[Tuple[str, Dict]]:
        """Retrieve a document and its metadata by ID."""
        if doc_id not in self.documents:
            return None
        return self.documents[doc_id], self.metadata[doc_id]

class GraphStore:
    def __init__(self):
        self.nodes: Dict[str, CodeEntity] = {}
        self.edges: List[Relationship] = []
        self.adjacency_list: Dict[str, List[Tuple[str, str]]] = defaultdict(list)
        self.reverse_adjacency_list: Dict[str, List[Tuple[str, str]]] = defaultdict(list)
        
    def add_node(self, entity: CodeEntity) -> None:
        """Add a code entity node to the graph."""
        self.nodes[entity.entity_id] = entity
        
    def add_edge(self, relationship: Relationship) -> None:
        """Add a relationship edge between two code entities."""
        if relationship.source_id not in self.nodes:
            raise ValueError(f"Source node {relationship.source_id} does not exist")
        if relationship.target_id not in self.nodes:
            raise ValueError(f"Target node {relationship.target_id} does not exist")
        
        self.edges.append(relationship)
        self.adjacency_list[relationship.source_id].append(
            (relationship.target_id, relationship.relationship_type)
        )
        self.reverse_adjacency_list[relationship.target_id].append(
            (relationship.source_id, relationship.relationship_type)
        )
    
    def get_dependencies(self, entity_id: str, max_depth: int = 3) -> Set[str]:
        """Retrieve all dependencies of an entity up to a specified depth."""
        if entity_id not in self.nodes:
            return set()
        
        dependencies = set()
        visited = set()
        queue = [(entity_id, 0)]
        
        while queue:
            current_id, depth = queue.pop(0)
            if current_id in visited or depth > max_depth:
                continue
            
            visited.add(current_id)
            if current_id != entity_id:
                dependencies.add(current_id)
            
            for neighbor_id, rel_type in self.adjacency_list[current_id]:
                if rel_type in ["depends_on", "imports", "calls"]:
                    queue.append((neighbor_id, depth + 1))
        
        return dependencies
    
    def get_dependents(self, entity_id: str) -> Set[str]:
        """Find all entities that depend on the given entity."""
        if entity_id not in self.nodes:
            return set()
        
        dependents = set()
        for source_id, rel_type in self.reverse_adjacency_list[entity_id]:
            if rel_type in ["depends_on", "imports", "calls"]:
                dependents.add(source_id)
        
        return dependents
    
    def find_path(self, source_id: str, target_id: str) -> Optional[List[str]]:
        """Find a path between two entities using breadth-first search."""
        if source_id not in self.nodes or target_id not in self.nodes:
            return None
        
        if source_id == target_id:
            return [source_id]
        
        visited = set()
        queue = [(source_id, [source_id])]
        
        while queue:
            current_id, path = queue.pop(0)
            if current_id in visited:
                continue
            
            visited.add(current_id)
            
            for neighbor_id, _ in self.adjacency_list[current_id]:
                if neighbor_id == target_id:
                    return path + [neighbor_id]
                if neighbor_id not in visited:
                    queue.append((neighbor_id, path + [neighbor_id]))
        
        return None
    
    def get_component_cluster(self, entity_id: str, similarity_threshold: float = 0.7) -> Set[str]:
        """Identify a cluster of closely related components around an entity."""
        if entity_id not in self.nodes:
            return set()
        
        cluster = {entity_id}
        candidates = set(self.adjacency_list[entity_id] + self.reverse_adjacency_list[entity_id])
        
        for candidate_id, _ in candidates:
            common_neighbors = (
                set(n for n, _ in self.adjacency_list[entity_id]) &
                set(n for n, _ in self.adjacency_list[candidate_id])
            )
            
            total_neighbors = (
                set(n for n, _ in self.adjacency_list[entity_id]) |
                set(n for n, _ in self.adjacency_list[candidate_id])
            )
            
            if total_neighbors:
                similarity = len(common_neighbors) / len(total_neighbors)
                if similarity >= similarity_threshold:
                    cluster.add(candidate_id)
        
        return cluster

class HybridRAG:
    def __init__(self, embedding_dimension: int = 768):
        self.vector_store = VectorStore(embedding_dimension)
        self.graph_store = GraphStore()
        
    def index_code_entity(self, entity: CodeEntity, embedding: np.ndarray) -> None:
        """Index a code entity in both vector and graph stores."""
        self.vector_store.add_document(
            entity.entity_id,
            entity.content,
            embedding,
            {
                "type": entity.entity_type,
                "name": entity.name,
                "file_path": entity.file_path,
                **entity.metadata
            }
        )
        self.graph_store.add_node(entity)
    
    def add_relationship(self, relationship: Relationship) -> None:
        """Add a relationship between code entities."""
        self.graph_store.add_edge(relationship)
    
    def semantic_search(self, query_embedding: np.ndarray, top_k: int = 5,
                       entity_type: Optional[str] = None) -> List[Dict]:
        """Perform semantic search with optional type filtering."""
        results = self.vector_store.similarity_search(query_embedding, top_k * 2)
        
        filtered_results = []
        for doc_id, similarity in results:
            content, metadata = self.vector_store.get_document(doc_id)
            if entity_type and metadata.get("type") != entity_type:
                continue
            
            filtered_results.append({
                "entity_id": doc_id,
                "content": content,
                "similarity": similarity,
                "metadata": metadata
            })
            
            if len(filtered_results) >= top_k:
                break
        
        return filtered_results
    
    def get_architectural_context(self, entity_id: str, context_depth: int = 2) -> Dict:
        """Retrieve comprehensive architectural context for an entity."""
        if entity_id not in self.graph_store.nodes:
            return {}
        
        entity = self.graph_store.nodes[entity_id]
        dependencies = self.graph_store.get_dependencies(entity_id, context_depth)
        dependents = self.graph_store.get_dependents(entity_id)
        cluster = self.graph_store.get_component_cluster(entity_id)
        
        context = {
            "entity": {
                "id": entity.entity_id,
                "type": entity.entity_type,
                "name": entity.name,
                "file_path": entity.file_path
            },
            "dependencies": [
                {
                    "id": dep_id,
                    "name": self.graph_store.nodes[dep_id].name,
                    "type": self.graph_store.nodes[dep_id].entity_type
                }
                for dep_id in dependencies
            ],
            "dependents": [
                {
                    "id": dep_id,
                    "name": self.graph_store.nodes[dep_id].name,
                    "type": self.graph_store.nodes[dep_id].entity_type
                }
                for dep_id in dependents
            ],
            "related_components": [
                {
                    "id": comp_id,
                    "name": self.graph_store.nodes[comp_id].name,
                    "type": self.graph_store.nodes[comp_id].entity_type
                }
                for comp_id in cluster if comp_id != entity_id
            ]
        }
        
        return context

This hybrid RAG implementation provides the foundation for intelligent code analysis and documentation generation. The vector store enables semantic search across code and documentation, allowing the system to find relevant examples and explanations even when exact keyword matches do not exist. The graph store captures the structural relationships that define the architecture, enabling queries about component dependencies, impact analysis for changes, and identification of architectural patterns.

The architectural context retrieval function demonstrates how the two stores work together. When documenting a particular component, the system can retrieve semantically similar components from the vector store while simultaneously analyzing its position in the dependency graph to understand its architectural role.

CODE ANALYSIS ENGINE

The code analysis engine forms the foundation of the documentation system, transforming raw source code into structured information that higher-level agents can process. This engine must handle multiple programming languages, extract meaningful structural information, and identify both explicit and implicit architectural elements.

The analysis process operates in multiple phases. The initial parsing phase uses language-specific parsers to generate Abstract Syntax Trees representing the code structure. The extraction phase traverses these ASTs to identify code entities such as classes, functions, modules, and their relationships. The enrichment phase adds semantic information including complexity metrics, coupling measurements, and cohesion indicators. Finally, the indexing phase stores all extracted information in both the vector and graph RAG stores.

Language-specific analyzers handle the peculiarities of different programming languages while presenting a uniform interface to the rest of the system. A Python analyzer understands decorators, metaclasses, and dynamic typing. A Java analyzer recognizes annotations, generics, and interface hierarchies. A JavaScript analyzer handles prototypal inheritance and closure patterns. Each analyzer extracts the same core information types but uses language-appropriate techniques.

Here is an implementation of the core code analysis engine:

import ast
import os
from typing import List, Dict, Set, Optional, Any
from pathlib import Path
import re
from dataclasses import dataclass, field

@dataclass
class FunctionInfo:
    name: str
    parameters: List[str]
    return_type: Optional[str]
    docstring: Optional[str]
    line_start: int
    line_end: int
    calls: Set[str] = field(default_factory=set)
    complexity: int = 0

@dataclass
class ClassInfo:
    name: str
    bases: List[str]
    methods: List[FunctionInfo]
    attributes: List[str]
    docstring: Optional[str]
    line_start: int
    line_end: int
    decorators: List[str] = field(default_factory=list)

@dataclass
class ModuleInfo:
    file_path: str
    imports: List[str]
    classes: List[ClassInfo]
    functions: List[FunctionInfo]
    module_docstring: Optional[str]
    global_variables: List[str]

class PythonCodeAnalyzer:
    def __init__(self):
        self.current_file = None
        
    def analyze_file(self, file_path: str) -> ModuleInfo:
        """Analyze a Python source file and extract structural information."""
        self.current_file = file_path
        
        with open(file_path, 'r', encoding='utf-8') as f:
            source_code = f.read()
        
        try:
            tree = ast.parse(source_code)
        except SyntaxError as e:
            raise ValueError(f"Syntax error in {file_path}: {str(e)}")
        
        module_info = ModuleInfo(
            file_path=file_path,
            imports=[],
            classes=[],
            functions=[],
            module_docstring=ast.get_docstring(tree),
            global_variables=[]
        )
        
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                for alias in node.names:
                    module_info.imports.append(alias.name)
            elif isinstance(node, ast.ImportFrom):
                if node.module:
                    module_info.imports.append(node.module)
        
        for node in tree.body:
            if isinstance(node, ast.ClassDef):
                class_info = self.analyze_class(node, source_code)
                module_info.classes.append(class_info)
            elif isinstance(node, ast.FunctionDef) or isinstance(node, ast.AsyncFunctionDef):
                function_info = self.analyze_function(node, source_code)
                module_info.functions.append(function_info)
            elif isinstance(node, ast.Assign):
                for target in node.targets:
                    if isinstance(target, ast.Name):
                        module_info.global_variables.append(target.id)
        
        return module_info
    
    def analyze_class(self, node: ast.ClassDef, source_code: str) -> ClassInfo:
        """Extract detailed information about a class definition."""
        bases = []
        for base in node.bases:
            if isinstance(base, ast.Name):
                bases.append(base.id)
            elif isinstance(base, ast.Attribute):
                bases.append(self.get_full_name(base))
        
        decorators = []
        for decorator in node.decorator_list:
            if isinstance(decorator, ast.Name):
                decorators.append(decorator.id)
            elif isinstance(decorator, ast.Call) and isinstance(decorator.func, ast.Name):
                decorators.append(decorator.func.id)
        
        methods = []
        attributes = []
        
        for item in node.body:
            if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)):
                method_info = self.analyze_function(item, source_code)
                methods.append(method_info)
            elif isinstance(item, ast.Assign):
                for target in item.targets:
                    if isinstance(target, ast.Name):
                        attributes.append(target.id)
        
        return ClassInfo(
            name=node.name,
            bases=bases,
            methods=methods,
            attributes=attributes,
            docstring=ast.get_docstring(node),
            line_start=node.lineno,
            line_end=node.end_lineno or node.lineno,
            decorators=decorators
        )
    
    def analyze_function(self, node: ast.FunctionDef, source_code: str) -> FunctionInfo:
        """Extract detailed information about a function or method."""
        parameters = []
        for arg in node.args.args:
            parameters.append(arg.arg)
        
        return_type = None
        if node.returns:
            if isinstance(node.returns, ast.Name):
                return_type = node.returns.id
            elif isinstance(node.returns, ast.Constant):
                return_type = str(node.returns.value)
        
        calls = set()
        for subnode in ast.walk(node):
            if isinstance(subnode, ast.Call):
                if isinstance(subnode.func, ast.Name):
                    calls.add(subnode.func.id)
                elif isinstance(subnode.func, ast.Attribute):
                    calls.add(self.get_full_name(subnode.func))
        
        complexity = self.calculate_cyclomatic_complexity(node)
        
        return FunctionInfo(
            name=node.name,
            parameters=parameters,
            return_type=return_type,
            docstring=ast.get_docstring(node),
            line_start=node.lineno,
            line_end=node.end_lineno or node.lineno,
            calls=calls,
            complexity=complexity
        )
    
    def get_full_name(self, node: ast.Attribute) -> str:
        """Reconstruct the full dotted name from an Attribute node."""
        parts = []
        current = node
        while isinstance(current, ast.Attribute):
            parts.append(current.attr)
            current = current.value
        if isinstance(current, ast.Name):
            parts.append(current.id)
        return '.'.join(reversed(parts))
    
    def calculate_cyclomatic_complexity(self, node: ast.FunctionDef) -> int:
        """Calculate the cyclomatic complexity of a function."""
        complexity = 1
        for subnode in ast.walk(node):
            if isinstance(subnode, (ast.If, ast.While, ast.For, ast.AsyncFor)):
                complexity += 1
            elif isinstance(subnode, ast.ExceptHandler):
                complexity += 1
            elif isinstance(subnode, ast.BoolOp):
                complexity += len(subnode.values) - 1
        return complexity

class CodebaseAnalyzer:
    def __init__(self, hybrid_rag: HybridRAG):
        self.hybrid_rag = hybrid_rag
        self.python_analyzer = PythonCodeAnalyzer()
        self.file_analyzers = {
            '.py': self.python_analyzer
        }
        
    def analyze_codebase(self, root_path: str, exclude_patterns: Optional[List[str]] = None) -> Dict[str, ModuleInfo]:
        """Recursively analyze all code files in a directory tree."""
        if exclude_patterns is None:
            exclude_patterns = ['__pycache__', '.git', 'venv', 'node_modules', '.pytest_cache']
        
        module_infos = {}
        root = Path(root_path)
        
        for file_path in root.rglob('*'):
            if not file_path.is_file():
                continue
            
            if any(pattern in str(file_path) for pattern in exclude_patterns):
                continue
            
            suffix = file_path.suffix
            if suffix in self.file_analyzers:
                try:
                    analyzer = self.file_analyzers[suffix]
                    module_info = analyzer.analyze_file(str(file_path))
                    module_infos[str(file_path)] = module_info
                except Exception as e:
                    print(f"Error analyzing {file_path}: {str(e)}")
        
        return module_infos
    
    def build_dependency_graph(self, module_infos: Dict[str, ModuleInfo]) -> None:
        """Construct the dependency graph from analyzed modules."""
        file_to_module = {}
        for file_path, module_info in module_infos.items():
            module_name = self.get_module_name(file_path)
            file_to_module[module_name] = file_path
        
        for file_path, module_info in module_infos.items():
            module_entity_id = self.create_entity_id(file_path, "module")
            module_entity = CodeEntity(
                entity_id=module_entity_id,
                entity_type="module",
                name=self.get_module_name(file_path),
                file_path=file_path,
                content=module_info.module_docstring or "",
                metadata={"imports": module_info.imports}
            )
            
            embedding = self.generate_embedding(module_entity.content)
            self.hybrid_rag.index_code_entity(module_entity, embedding)
            
            for class_info in module_info.classes:
                class_entity_id = self.create_entity_id(file_path, f"class.{class_info.name}")
                class_content = f"{class_info.docstring or ''}\nBases: {', '.join(class_info.bases)}"
                
                class_entity = CodeEntity(
                    entity_id=class_entity_id,
                    entity_type="class",
                    name=class_info.name,
                    file_path=file_path,
                    content=class_content,
                    metadata={
                        "bases": class_info.bases,
                        "methods": [m.name for m in class_info.methods],
                        "decorators": class_info.decorators
                    }
                )
                
                embedding = self.generate_embedding(class_content)
                self.hybrid_rag.index_code_entity(class_entity, embedding)
                
                self.hybrid_rag.add_relationship(Relationship(
                    source_id=module_entity_id,
                    target_id=class_entity_id,
                    relationship_type="contains",
                    metadata={}
                ))
                
                for base in class_info.bases:
                    base_entity_id = self.find_entity_by_name(base, "class", module_infos)
                    if base_entity_id:
                        self.hybrid_rag.add_relationship(Relationship(
                            source_id=class_entity_id,
                            target_id=base_entity_id,
                            relationship_type="inherits",
                            metadata={}
                        ))
            
            for import_name in module_info.imports:
                if import_name in file_to_module:
                    imported_file = file_to_module[import_name]
                    imported_entity_id = self.create_entity_id(imported_file, "module")
                    
                    self.hybrid_rag.add_relationship(Relationship(
                        source_id=module_entity_id,
                        target_id=imported_entity_id,
                        relationship_type="imports",
                        metadata={}
                    ))
    
    def create_entity_id(self, file_path: str, entity_name: str) -> str:
        """Generate a unique identifier for a code entity."""
        combined = f"{file_path}::{entity_name}"
        return hashlib.sha256(combined.encode()).hexdigest()[:16]
    
    def get_module_name(self, file_path: str) -> str:
        """Extract the module name from a file path."""
        path = Path(file_path)
        return path.stem
    
    def find_entity_by_name(self, name: str, entity_type: str, 
                           module_infos: Dict[str, ModuleInfo]) -> Optional[str]:
        """Locate an entity ID by searching through all analyzed modules."""
        for file_path, module_info in module_infos.items():
            if entity_type == "class":
                for class_info in module_info.classes:
                    if class_info.name == name:
                        return self.create_entity_id(file_path, f"class.{name}")
        return None
    
    def generate_embedding(self, text: str) -> np.ndarray:
        """Generate a vector embedding for text content."""
        np.random.seed(hash(text) % (2**32))
        return np.random.randn(768)

This code analysis engine provides comprehensive extraction of structural information from Python codebases. The analyzer identifies not just the presence of classes and functions but also their relationships, complexity metrics, and documentation. The integration with the hybrid RAG system ensures that all extracted information becomes immediately queryable both semantically and structurally.

The dependency graph construction is particularly important for architectural documentation. By tracking imports, inheritance relationships, and function calls, the system builds a complete picture of how components interact. This information becomes essential when generating arc42 sections such as the building block view and the runtime view.

PATTERN RECOGNITION SYSTEM

Recognizing design patterns and architectural patterns represents one of the most valuable capabilities of the documentation agent. While individual code elements are relatively straightforward to identify through AST analysis, patterns emerge from the relationships and interactions between multiple elements. The pattern recognition system must identify both well-known catalog patterns and project-specific architectural patterns.

The pattern recognition process operates at multiple levels of abstraction. At the code level, the system identifies design patterns such as Singleton, Factory, Observer, Strategy, and Decorator. At the architectural level, it recognizes patterns such as Layered Architecture, Microservices, Event-Driven Architecture, and Hexagonal Architecture. The system also identifies cross-cutting concerns such as logging, authentication, and caching that may not follow traditional pattern structures but represent important architectural decisions.

Pattern recognition combines rule-based detection with machine learning approaches. Rule-based detectors encode the structural characteristics of known patterns. For example, the Singleton pattern detector looks for classes with private constructors, static instance variables, and static accessor methods. The Observer pattern detector identifies subject-observer relationships through registration methods and notification mechanisms. Machine learning models trained on labeled code examples can identify pattern variations and project-specific pattern implementations that may not match textbook definitions exactly.

Here is an implementation of the pattern recognition system:

from typing import List, Dict, Set, Optional, Tuple
from dataclasses import dataclass
from enum import Enum

class PatternType(Enum):
    SINGLETON = "Singleton"
    FACTORY = "Factory"
    OBSERVER = "Observer"
    STRATEGY = "Strategy"
    DECORATOR = "Decorator"
    ADAPTER = "Adapter"
    REPOSITORY = "Repository"
    DEPENDENCY_INJECTION = "Dependency Injection"
    LAYERED_ARCHITECTURE = "Layered Architecture"
    MVC = "Model-View-Controller"
    MICROSERVICES = "Microservices"

@dataclass
class PatternInstance:
    pattern_type: PatternType
    confidence: float
    entities: List[str]
    description: str
    evidence: Dict[str, any]

class PatternRecognizer:
    def __init__(self, hybrid_rag: HybridRAG):
        self.hybrid_rag = hybrid_rag
        self.pattern_detectors = {
            PatternType.SINGLETON: self.detect_singleton,
            PatternType.FACTORY: self.detect_factory,
            PatternType.OBSERVER: self.detect_observer,
            PatternType.STRATEGY: self.detect_strategy,
            PatternType.DECORATOR: self.detect_decorator,
            PatternType.REPOSITORY: self.detect_repository,
            PatternType.LAYERED_ARCHITECTURE: self.detect_layered_architecture
        }
    
    def recognize_patterns(self) -> List[PatternInstance]:
        """Execute all pattern detectors and collect identified patterns."""
        all_patterns = []
        
        for pattern_type, detector in self.pattern_detectors.items():
            patterns = detector()
            all_patterns.extend(patterns)
        
        all_patterns.sort(key=lambda p: p.confidence, reverse=True)
        return all_patterns
    
    def detect_singleton(self) -> List[PatternInstance]:
        """Detect Singleton pattern implementations."""
        patterns = []
        
        for entity_id, entity in self.hybrid_rag.graph_store.nodes.items():
            if entity.entity_type != "class":
                continue
            
            metadata = entity.metadata
            methods = metadata.get("methods", [])
            
            has_instance_method = any(
                method.lower() in ["get_instance", "getinstance", "instance"]
                for method in methods
            )
            
            has_init_control = "__new__" in methods or "__init__" in methods
            
            class_name_suggests_singleton = "singleton" in entity.name.lower()
            
            if has_instance_method or class_name_suggests_singleton:
                confidence = 0.0
                evidence = {}
                
                if has_instance_method:
                    confidence += 0.6
                    evidence["instance_method"] = True
                
                if has_init_control:
                    confidence += 0.2
                    evidence["init_control"] = True
                
                if class_name_suggests_singleton:
                    confidence += 0.2
                    evidence["naming_convention"] = True
                
                if confidence >= 0.5:
                    patterns.append(PatternInstance(
                        pattern_type=PatternType.SINGLETON,
                        confidence=min(confidence, 1.0),
                        entities=[entity_id],
                        description=f"Class {entity.name} implements the Singleton pattern",
                        evidence=evidence
                    ))
        
        return patterns
    
    def detect_factory(self) -> List[PatternInstance]:
        """Detect Factory pattern implementations."""
        patterns = []
        
        for entity_id, entity in self.hybrid_rag.graph_store.nodes.items():
            if entity.entity_type != "class":
                continue
            
            metadata = entity.metadata
            methods = metadata.get("methods", [])
            
            factory_method_names = ["create", "make", "build", "get", "new"]
            has_factory_method = any(
                any(factory_name in method.lower() for factory_name in factory_method_names)
                for method in methods
            )
            
            class_name_suggests_factory = any(
                keyword in entity.name.lower()
                for keyword in ["factory", "builder", "creator"]
            )
            
            if has_factory_method or class_name_suggests_factory:
                confidence = 0.0
                evidence = {}
                
                if has_factory_method:
                    confidence += 0.5
                    evidence["factory_methods"] = [
                        m for m in methods
                        if any(fn in m.lower() for fn in factory_method_names)
                    ]
                
                if class_name_suggests_factory:
                    confidence += 0.5
                    evidence["naming_convention"] = True
                
                if confidence >= 0.5:
                    patterns.append(PatternInstance(
                        pattern_type=PatternType.FACTORY,
                        confidence=min(confidence, 1.0),
                        entities=[entity_id],
                        description=f"Class {entity.name} implements the Factory pattern",
                        evidence=evidence
                    ))
        
        return patterns
    
    def detect_observer(self) -> List[PatternInstance]:
        """Detect Observer pattern implementations."""
        patterns = []
        
        for entity_id, entity in self.hybrid_rag.graph_store.nodes.items():
            if entity.entity_type != "class":
                continue
            
            metadata = entity.metadata
            methods = metadata.get("methods", [])
            
            has_subscribe = any(
                keyword in method.lower()
                for method in methods
                for keyword in ["subscribe", "attach", "register", "add_observer", "add_listener"]
            )
            
            has_unsubscribe = any(
                keyword in method.lower()
                for method in methods
                for keyword in ["unsubscribe", "detach", "unregister", "remove_observer", "remove_listener"]
            )
            
            has_notify = any(
                keyword in method.lower()
                for method in methods
                for keyword in ["notify", "update", "fire", "trigger", "emit"]
            )
            
            observer_score = sum([has_subscribe, has_unsubscribe, has_notify])
            
            if observer_score >= 2:
                confidence = observer_score / 3.0
                evidence = {
                    "has_subscribe": has_subscribe,
                    "has_unsubscribe": has_unsubscribe,
                    "has_notify": has_notify
                }
                
                patterns.append(PatternInstance(
                    pattern_type=PatternType.OBSERVER,
                    confidence=confidence,
                    entities=[entity_id],
                    description=f"Class {entity.name} implements the Observer pattern",
                    evidence=evidence
                ))
        
        return patterns
    
    def detect_strategy(self) -> List[PatternInstance]:
        """Detect Strategy pattern implementations."""
        patterns = []
        
        for entity_id, entity in self.hybrid_rag.graph_store.nodes.items():
            if entity.entity_type != "class":
                continue
            
            metadata = entity.metadata
            bases = metadata.get("bases", [])
            
            if not bases:
                continue
            
            siblings = []
            for other_id, other_entity in self.hybrid_rag.graph_store.nodes.items():
                if other_entity.entity_type != "class" or other_id == entity_id:
                    continue
                other_bases = other_entity.metadata.get("bases", [])
                if set(bases) & set(other_bases):
                    siblings.append(other_id)
            
            if len(siblings) >= 2:
                base_name = bases[0] if bases else "unknown"
                strategy_keywords = ["strategy", "algorithm", "policy"]
                
                name_suggests_strategy = any(
                    keyword in base_name.lower() or keyword in entity.name.lower()
                    for keyword in strategy_keywords
                )
                
                confidence = 0.4 + (0.1 * min(len(siblings), 5))
                if name_suggests_strategy:
                    confidence += 0.3
                
                evidence = {
                    "base_class": base_name,
                    "sibling_count": len(siblings),
                    "siblings": siblings[:5]
                }
                
                patterns.append(PatternInstance(
                    pattern_type=PatternType.STRATEGY,
                    confidence=min(confidence, 1.0),
                    entities=[entity_id] + siblings,
                    description=f"Classes implementing {base_name} follow the Strategy pattern",
                    evidence=evidence
                ))
        
        return patterns
    
    def detect_decorator(self) -> List[PatternInstance]:
        """Detect Decorator pattern implementations."""
        patterns = []
        
        for entity_id, entity in self.hybrid_rag.graph_store.nodes.items():
            if entity.entity_type != "class":
                continue
            
            metadata = entity.metadata
            decorators = metadata.get("decorators", [])
            
            if decorators:
                patterns.append(PatternInstance(
                    pattern_type=PatternType.DECORATOR,
                    confidence=0.7,
                    entities=[entity_id],
                    description=f"Class {entity.name} uses decorators: {', '.join(decorators)}",
                    evidence={"decorators": decorators}
                ))
        
        return patterns
    
    def detect_repository(self) -> List[PatternInstance]:
        """Detect Repository pattern implementations."""
        patterns = []
        
        for entity_id, entity in self.hybrid_rag.graph_store.nodes.items():
            if entity.entity_type != "class":
                continue
            
            metadata = entity.metadata
            methods = metadata.get("methods", [])
            
            crud_methods = {
                "create": ["create", "add", "insert", "save"],
                "read": ["get", "find", "query", "select", "fetch"],
                "update": ["update", "modify", "edit"],
                "delete": ["delete", "remove"]
            }
            
            crud_score = 0
            found_operations = {}
            
            for operation, keywords in crud_methods.items():
                if any(any(kw in method.lower() for kw in keywords) for method in methods):
                    crud_score += 1
                    found_operations[operation] = True
            
            name_suggests_repository = any(
                keyword in entity.name.lower()
                for keyword in ["repository", "dao", "store"]
            )
            
            if crud_score >= 3 or (crud_score >= 2 and name_suggests_repository):
                confidence = (crud_score / 4.0) * 0.7
                if name_suggests_repository:
                    confidence += 0.3
                
                patterns.append(PatternInstance(
                    pattern_type=PatternType.REPOSITORY,
                    confidence=min(confidence, 1.0),
                    entities=[entity_id],
                    description=f"Class {entity.name} implements the Repository pattern",
                    evidence={
                        "crud_operations": found_operations,
                        "naming_convention": name_suggests_repository
                    }
                ))
        
        return patterns
    
    def detect_layered_architecture(self) -> List[PatternInstance]:
        """Detect layered architecture patterns based on module organization."""
        patterns = []
        
        layer_keywords = {
            "presentation": ["view", "ui", "controller", "api", "endpoint"],
            "business": ["service", "business", "domain", "logic"],
            "data": ["repository", "dao", "model", "entity", "database"]
        }
        
        detected_layers = {}
        
        for entity_id, entity in self.hybrid_rag.graph_store.nodes.items():
            if entity.entity_type != "module":
                continue
            
            module_path = entity.file_path.lower()
            
            for layer_name, keywords in layer_keywords.items():
                if any(keyword in module_path for keyword in keywords):
                    if layer_name not in detected_layers:
                        detected_layers[layer_name] = []
                    detected_layers[layer_name].append(entity_id)
        
        if len(detected_layers) >= 2:
            confidence = len(detected_layers) / 3.0
            
            patterns.append(PatternInstance(
                pattern_type=PatternType.LAYERED_ARCHITECTURE,
                confidence=confidence,
                entities=[eid for entities in detected_layers.values() for eid in entities],
                description="The codebase follows a layered architecture pattern",
                evidence={
                    "layers": {
                        layer: len(entities)
                        for layer, entities in detected_layers.items()
                    }
                }
            ))
        
        return patterns

This pattern recognition system provides automated identification of common design and architectural patterns. The detectors use heuristics based on naming conventions, method signatures, class relationships, and structural characteristics. Each detector calculates a confidence score reflecting the strength of the evidence for the pattern.

The pattern recognition results feed directly into the documentation generation process. When the system identifies a Singleton pattern, it can include this information in the design decisions section of the arc42 documentation. When it detects a layered architecture, it can structure the building block view accordingly. This automated pattern recognition saves significant manual effort and ensures that implicit design decisions become explicit in the documentation.

LLM ABSTRACTION LAYER FOR MULTI-PLATFORM SUPPORT

Supporting multiple LLM providers and hardware configurations requires a flexible abstraction layer that isolates the core documentation logic from platform-specific details. This layer must handle differences in API interfaces, model formats, inference engines, and hardware acceleration while presenting a uniform interface to the rest of the system.

The abstraction layer addresses several key challenges. Different LLM providers use different API formats, authentication mechanisms, and response structures. Local models require different inference engines depending on the hardware platform. GPU acceleration varies significantly between Nvidia CUDA, AMD ROCm, Intel extensions, and Apple Metal Performance Shaders. The abstraction layer must detect available hardware, select appropriate inference engines, and handle model loading and inference consistently across all configurations.

The implementation uses a provider pattern where concrete provider classes implement a common interface. Each provider handles the specifics of communicating with a particular LLM backend while exposing standard methods for text generation, embedding creation, and model management. A factory class detects available hardware and instantiates the appropriate provider based on configuration and availability.

Here is an implementation of the LLM abstraction layer:

from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Any
import platform
import subprocess
from dataclasses import dataclass
from enum import Enum

class HardwareType(Enum):
    CUDA = "cuda"
    ROCM = "rocm"
    MPS = "mps"
    INTEL = "intel"
    CPU = "cpu"

@dataclass
class LLMConfig:
    model_name: str
    temperature: float = 0.7
    max_tokens: int = 2048
    top_p: float = 0.9
    frequency_penalty: float = 0.0
    presence_penalty: float = 0.0

@dataclass
class GenerationResult:
    text: str
    tokens_used: int
    finish_reason: str
    metadata: Dict[str, Any]

class LLMProvider(ABC):
    def __init__(self, config: LLMConfig):
        self.config = config
        
    @abstractmethod
    async def generate(self, prompt: str, system_prompt: Optional[str] = None) -> GenerationResult:
        """Generate text completion from a prompt."""
        pass
    
    @abstractmethod
    async def generate_embedding(self, text: str) -> np.ndarray:
        """Generate vector embedding for text."""
        pass
    
    @abstractmethod
    def is_available(self) -> bool:
        """Check if this provider can run on the current system."""
        pass

class OpenAIProvider(LLMProvider):
    def __init__(self, config: LLMConfig, api_key: str):
        super().__init__(config)
        self.api_key = api_key
        
    async def generate(self, prompt: str, system_prompt: Optional[str] = None) -> GenerationResult:
        """Generate text using OpenAI API."""
        import openai
        
        openai.api_key = self.api_key
        
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})
        
        response = await openai.ChatCompletion.acreate(
            model=self.config.model_name,
            messages=messages,
            temperature=self.config.temperature,
            max_tokens=self.config.max_tokens,
            top_p=self.config.top_p,
            frequency_penalty=self.config.frequency_penalty,
            presence_penalty=self.config.presence_penalty
        )
        
        return GenerationResult(
            text=response.choices[0].message.content,
            tokens_used=response.usage.total_tokens,
            finish_reason=response.choices[0].finish_reason,
            metadata={"model": response.model}
        )
    
    async def generate_embedding(self, text: str) -> np.ndarray:
        """Generate embedding using OpenAI API."""
        import openai
        
        openai.api_key = self.api_key
        
        response = await openai.Embedding.acreate(
            model="text-embedding-ada-002",
            input=text
        )
        
        return np.array(response.data[0].embedding)
    
    def is_available(self) -> bool:
        """OpenAI provider is available if API key is set."""
        return bool(self.api_key)

class LocalLLMProvider(LLMProvider):
    def __init__(self, config: LLMConfig, model_path: str, hardware_type: HardwareType):
        super().__init__(config)
        self.model_path = model_path
        self.hardware_type = hardware_type
        self.model = None
        self.tokenizer = None
        
    def load_model(self) -> None:
        """Load the model with appropriate hardware acceleration."""
        if self.hardware_type == HardwareType.CUDA:
            self.load_cuda_model()
        elif self.hardware_type == HardwareType.ROCM:
            self.load_rocm_model()
        elif self.hardware_type == HardwareType.MPS:
            self.load_mps_model()
        elif self.hardware_type == HardwareType.INTEL:
            self.load_intel_model()
        else:
            self.load_cpu_model()
    
    def load_cuda_model(self) -> None:
        """Load model with NVIDIA CUDA acceleration."""
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer
        
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_path,
            torch_dtype=torch.float16,
            device_map="auto"
        )
    
    def load_rocm_model(self) -> None:
        """Load model with AMD ROCm acceleration."""
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer
        
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_path,
            torch_dtype=torch.float16,
            device_map="auto"
        )
        
        if torch.cuda.is_available():
            self.model = self.model.to("cuda")
    
    def load_mps_model(self) -> None:
        """Load model with Apple Metal Performance Shaders."""
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer
        
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_path,
            torch_dtype=torch.float16
        )
        
        if torch.backends.mps.is_available():
            self.model = self.model.to("mps")
    
    def load_intel_model(self) -> None:
        """Load model with Intel GPU acceleration."""
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer
        import intel_extension_for_pytorch as ipex
        
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_path,
            torch_dtype=torch.bfloat16
        )
        
        self.model = ipex.optimize(self.model)
    
    def load_cpu_model(self) -> None:
        """Load model for CPU inference."""
        from transformers import AutoModelForCausalLM, AutoTokenizer
        
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)
        self.model = AutoModelForCausalLM.from_pretrained(self.model_path)
    
    async def generate(self, prompt: str, system_prompt: Optional[str] = None) -> GenerationResult:
        """Generate text using local model."""
        import torch
        
        if self.model is None:
            self.load_model()
        
        full_prompt = prompt
        if system_prompt:
            full_prompt = f"{system_prompt}\n\n{prompt}"
        
        inputs = self.tokenizer(full_prompt, return_tensors="pt")
        
        if self.hardware_type == HardwareType.CUDA or self.hardware_type == HardwareType.ROCM:
            inputs = {k: v.to("cuda") for k, v in inputs.items()}
        elif self.hardware_type == HardwareType.MPS:
            inputs = {k: v.to("mps") for k, v in inputs.items()}
        
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=self.config.max_tokens,
                temperature=self.config.temperature,
                top_p=self.config.top_p,
                do_sample=True
            )
        
        generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        generated_text = generated_text[len(full_prompt):].strip()
        
        return GenerationResult(
            text=generated_text,
            tokens_used=len(outputs[0]),
            finish_reason="length",
            metadata={"model_path": self.model_path, "hardware": self.hardware_type.value}
        )
    
    async def generate_embedding(self, text: str) -> np.ndarray:
        """Generate embedding using local model."""
        import torch
        from transformers import AutoModel
        
        embedding_model = AutoModel.from_pretrained(self.model_path)
        
        if self.hardware_type == HardwareType.CUDA or self.hardware_type == HardwareType.ROCM:
            embedding_model = embedding_model.to("cuda")
        elif self.hardware_type == HardwareType.MPS:
            embedding_model = embedding_model.to("mps")
        
        inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
        
        if self.hardware_type == HardwareType.CUDA or self.hardware_type == HardwareType.ROCM:
            inputs = {k: v.to("cuda") for k, v in inputs.items()}
        elif self.hardware_type == HardwareType.MPS:
            inputs = {k: v.to("mps") for k, v in inputs.items()}
        
        with torch.no_grad():
            outputs = embedding_model(**inputs)
        
        embeddings = outputs.last_hidden_state.mean(dim=1).cpu().numpy()
        return embeddings[0]
    
    def is_available(self) -> bool:
        """Check if local model files exist."""
        import os
        return os.path.exists(self.model_path)

class HardwareDetector:
    @staticmethod
    def detect_hardware() -> HardwareType:
        """Detect available hardware acceleration."""
        try:
            import torch
            
            if torch.cuda.is_available():
                device_name = torch.cuda.get_device_name(0).lower()
                if "nvidia" in device_name:
                    return HardwareType.CUDA
                elif "amd" in device_name:
                    return HardwareType.ROCM
            
            if torch.backends.mps.is_available():
                return HardwareType.MPS
            
            try:
                import intel_extension_for_pytorch
                return HardwareType.INTEL
            except ImportError:
                pass
            
        except ImportError:
            pass
        
        return HardwareType.CPU
    
    @staticmethod
    def get_hardware_info() -> Dict[str, Any]:
        """Gather detailed hardware information."""
        info = {
            "platform": platform.system(),
            "processor": platform.processor(),
            "hardware_type": HardwareDetector.detect_hardware().value
        }
        
        try:
            import torch
            info["torch_version"] = torch.__version__
            info["cuda_available"] = torch.cuda.is_available()
            if torch.cuda.is_available():
                info["cuda_version"] = torch.version.cuda
                info["gpu_count"] = torch.cuda.device_count()
                info["gpu_name"] = torch.cuda.get_device_name(0)
        except ImportError:
            info["torch_available"] = False
        
        return info

class LLMFactory:
    @staticmethod
    def create_provider(provider_type: str, config: LLMConfig, **kwargs) -> LLMProvider:
        """Create an LLM provider based on type and configuration."""
        if provider_type == "openai":
            api_key = kwargs.get("api_key")
            if not api_key:
                raise ValueError("OpenAI provider requires api_key")
            return OpenAIProvider(config, api_key)
        
        elif provider_type == "local":
            model_path = kwargs.get("model_path")
            if not model_path:
                raise ValueError("Local provider requires model_path")
            
            hardware_type = kwargs.get("hardware_type")
            if not hardware_type:
                hardware_type = HardwareDetector.detect_hardware()
            
            return LocalLLMProvider(config, model_path, hardware_type)
        
        else:
            raise ValueError(f"Unknown provider type: {provider_type}")

This LLM abstraction layer provides a unified interface for working with different LLM providers and hardware configurations. The hardware detection automatically identifies available acceleration options, allowing the system to use the best available hardware without manual configuration. The provider pattern ensures that adding support for new LLM providers or hardware platforms requires only implementing a new provider class without modifying the core documentation logic.

The abstraction layer handles the complexity of different inference engines and hardware acceleration libraries. For CUDA-based systems, it uses PyTorch with CUDA support. For AMD ROCm, it leverages ROCm-enabled PyTorch. For Apple Silicon, it uses Metal Performance Shaders through PyTorch's MPS backend. For Intel GPUs, it integrates Intel Extension for PyTorch. This flexibility ensures the documentation agent can run efficiently on diverse hardware configurations.

DOCUMENTATION GENERATION

The documentation generation component synthesizes all analyzed information into comprehensive arc42-formatted documentation. This component must transform technical code analysis results, identified patterns, and architectural insights into human-readable documentation that follows the arc42 template structure.

The arc42 template organizes architecture documentation into twelve sections covering different aspects of the system. The introduction and goals section describes the business context and quality goals. The constraints section documents technical and organizational limitations. The context and scope section defines system boundaries and external interfaces. The solution strategy section outlines fundamental architectural decisions. The building block view section describes the static structure. The runtime view section illustrates dynamic behavior. The deployment view section shows the physical infrastructure. The cross-cutting concepts section addresses recurring themes. The architectural decisions section documents significant choices. The quality requirements section details non-functional requirements. The risks and technical debt section identifies potential problems. The glossary section defines important terms.

The documentation generator uses the LLM to transform structured analysis results into natural language text for each section. It provides the LLM with section-specific prompts that include the analysis results, identified patterns, and any user-provided context such as business goals or design guidelines. The LLM generates coherent narrative text that explains the architecture in terms appropriate for the target audience.

Here is an implementation of the documentation generation system:

from typing import List, Dict, Optional, Any
from dataclasses import dataclass
from enum import Enum
import json

class Arc42Section(Enum):
    INTRODUCTION_GOALS = "1_introduction_goals"
    CONSTRAINTS = "2_constraints"
    CONTEXT_SCOPE = "3_context_scope"
    SOLUTION_STRATEGY = "4_solution_strategy"
    BUILDING_BLOCKS = "5_building_blocks"
    RUNTIME_VIEW = "6_runtime_view"
    DEPLOYMENT_VIEW = "7_deployment_view"
    CROSSCUTTING_CONCEPTS = "8_crosscutting_concepts"
    DECISIONS = "9_decisions"
    QUALITY_REQUIREMENTS = "10_quality_requirements"
    RISKS_DEBT = "11_risks_debt"
    GLOSSARY = "12_glossary"

@dataclass
class DocumentationContext:
    business_goals: Optional[str] = None
    design_guidelines: Optional[str] = None
    requirements: Optional[str] = None
    target_audience: str = "technical team"
    detail_level: str = "comprehensive"

@dataclass
class Arc42Document:
    sections: Dict[Arc42Section, str]
    metadata: Dict[str, Any]
    generation_timestamp: str

class DocumentationGenerator:
    def __init__(self, llm_provider: LLMProvider, hybrid_rag: HybridRAG):
        self.llm_provider = llm_provider
        self.hybrid_rag = hybrid_rag
        self.section_generators = {
            Arc42Section.INTRODUCTION_GOALS: self.generate_introduction_goals,
            Arc42Section.CONSTRAINTS: self.generate_constraints,
            Arc42Section.CONTEXT_SCOPE: self.generate_context_scope,
            Arc42Section.SOLUTION_STRATEGY: self.generate_solution_strategy,
            Arc42Section.BUILDING_BLOCKS: self.generate_building_blocks,
            Arc42Section.RUNTIME_VIEW: self.generate_runtime_view,
            Arc42Section.DEPLOYMENT_VIEW: self.generate_deployment_view,
            Arc42Section.CROSSCUTTING_CONCEPTS: self.generate_crosscutting_concepts,
            Arc42Section.DECISIONS: self.generate_decisions,
            Arc42Section.QUALITY_REQUIREMENTS: self.generate_quality_requirements,
            Arc42Section.RISKS_DEBT: self.generate_risks_debt,
            Arc42Section.GLOSSARY: self.generate_glossary
        }
    
    async def generate_documentation(self, context: DocumentationContext,
                                     patterns: List[PatternInstance]) -> Arc42Document:
        """Generate complete arc42 documentation."""
        from datetime import datetime
        
        sections = {}
        
        for section_type in Arc42Section:
            generator = self.section_generators[section_type]
            section_content = await generator(context, patterns)
            sections[section_type] = section_content
        
        return Arc42Document(
            sections=sections,
            metadata={
                "context": context,
                "patterns_found": len(patterns)
            },
            generation_timestamp=datetime.now().isoformat()
        )
    
    async def generate_introduction_goals(self, context: DocumentationContext,
                                          patterns: List[PatternInstance]) -> str:
        """Generate the introduction and goals section."""
        system_prompt = """You are an expert technical writer creating architecture documentation.
Generate the Introduction and Goals section of arc42 documentation.
Focus on business context, stakeholders, and quality goals."""
        
        user_prompt = f"""Create the Introduction and Goals section for a software system.

Business Goals:
{context.business_goals or 'Not specified'}

Requirements:
{context.requirements or 'Not specified'}

Target Audience: {context.target_audience}

Include:
1. Business context and motivation
2. Key stakeholders and their concerns
3. Top quality goals (performance, security, maintainability, etc.)
4. Essential features and capabilities

Write in a clear, professional style appropriate for {context.target_audience}."""
        
        result = await self.llm_provider.generate(user_prompt, system_prompt)
        return result.text
    
    async def generate_constraints(self, context: DocumentationContext,
                                   patterns: List[PatternInstance]) -> str:
        """Generate the constraints section."""
        system_prompt = """You are an expert technical writer creating architecture documentation.
Generate the Constraints section of arc42 documentation.
Focus on technical, organizational, and legal constraints."""
        
        all_entities = list(self.hybrid_rag.graph_store.nodes.values())
        
        technologies = set()
        for entity in all_entities:
            if entity.entity_type == "module":
                imports = entity.metadata.get("imports", [])
                technologies.update(imports)
        
        user_prompt = f"""Create the Constraints section for a software system.

Identified Technologies:
{', '.join(list(technologies)[:20])}

Design Guidelines:
{context.design_guidelines or 'Not specified'}

Include:
1. Technical constraints (programming languages, frameworks, platforms)
2. Organizational constraints (team structure, development process, deadlines)
3. Legal constraints (licenses, compliance requirements)
4. Conventions (coding standards, architectural patterns)

Write in a clear, professional style."""
        
        result = await self.llm_provider.generate(user_prompt, system_prompt)
        return result.text
    
    async def generate_context_scope(self, context: DocumentationContext,
                                     patterns: List[PatternInstance]) -> str:
        """Generate the context and scope section."""
        system_prompt = """You are an expert technical writer creating architecture documentation.
Generate the Context and Scope section of arc42 documentation.
Focus on system boundaries and external interfaces."""
        
        modules = [
            entity for entity in self.hybrid_rag.graph_store.nodes.values()
            if entity.entity_type == "module"
        ]
        
        external_dependencies = set()
        for module in modules:
            imports = module.metadata.get("imports", [])
            for imp in imports:
                if not any(imp.startswith(m.name) for m in modules):
                    external_dependencies.add(imp)
        
        user_prompt = f"""Create the Context and Scope section for a software system.

External Dependencies Identified:
{', '.join(list(external_dependencies)[:15])}

Requirements:
{context.requirements or 'Not specified'}

Include:
1. Business context (external entities the system interacts with)
2. Technical context (external systems, databases, APIs)
3. System boundaries (what is inside vs outside the system)
4. External interfaces

Write in a clear, professional style."""
        
        result = await self.llm_provider.generate(user_prompt, system_prompt)
        return result.text
    
    async def generate_solution_strategy(self, context: DocumentationContext,
                                         patterns: List[PatternInstance]) -> str:
        """Generate the solution strategy section."""
        system_prompt = """You are an expert technical writer creating architecture documentation.
Generate the Solution Strategy section of arc42 documentation.
Focus on fundamental architectural decisions and approaches."""
        
        pattern_summary = []
        for pattern in patterns[:10]:
            pattern_summary.append(f"- {pattern.pattern_type.value}: {pattern.description}")
        
        user_prompt = f"""Create the Solution Strategy section for a software system.

Identified Architectural Patterns:
{chr(10).join(pattern_summary) if pattern_summary else 'No specific patterns identified'}

Design Guidelines:
{context.design_guidelines or 'Not specified'}

Quality Goals:
{context.business_goals or 'Not specified'}

Include:
1. Technology decisions (frameworks, libraries, platforms)
2. Architectural patterns and styles
3. Approaches to achieve quality goals
4. Fundamental design decisions

Write in a clear, professional style."""
        
        result = await self.llm_provider.generate(user_prompt, system_prompt)
        return result.text
    
    async def generate_building_blocks(self, context: DocumentationContext,
                                       patterns: List[PatternInstance]) -> str:
        """Generate the building blocks view section."""
        system_prompt = """You are an expert technical writer creating architecture documentation.
Generate the Building Blocks View section of arc42 documentation.
Focus on static structure and component organization."""
        
        modules = [
            entity for entity in self.hybrid_rag.graph_store.nodes.values()
            if entity.entity_type == "module"
        ]
        
        classes = [
            entity for entity in self.hybrid_rag.graph_store.nodes.values()
            if entity.entity_type == "class"
        ]
        
        module_summary = []
        for module in modules[:15]:
            module_summary.append(f"- {module.name} ({module.file_path})")
        
        class_summary = []
        for cls in classes[:20]:
            class_summary.append(f"- {cls.name}")
        
        user_prompt = f"""Create the Building Blocks View section for a software system.

Modules ({len(modules)} total):
{chr(10).join(module_summary)}

Key Classes ({len(classes)} total):
{chr(10).join(class_summary)}

Include:
1. High-level component structure
2. Major modules and their responsibilities
3. Key classes and their roles
4. Component relationships and dependencies
5. Hierarchical decomposition

Write in a clear, professional style with appropriate detail."""
        
        result = await self.llm_provider.generate(user_prompt, system_prompt)
        return result.text
    
    async def generate_runtime_view(self, context: DocumentationContext,
                                    patterns: List[PatternInstance]) -> str:
        """Generate the runtime view section."""
        system_prompt = """You are an expert technical writer creating architecture documentation.
Generate the Runtime View section of arc42 documentation.
Focus on dynamic behavior and component interactions."""
        
        observer_patterns = [p for p in patterns if p.pattern_type == PatternType.OBSERVER]
        
        user_prompt = f"""Create the Runtime View section for a software system.

Identified Interaction Patterns:
{chr(10).join([f"- {p.description}" for p in observer_patterns[:5]]) if observer_patterns else 'Standard interaction patterns'}

Include:
1. Important runtime scenarios
2. Component interactions and message flows
3. Concurrency and threading considerations
4. State management approaches

Write in a clear, professional style."""
        
        result = await self.llm_provider.generate(user_prompt, system_prompt)
        return result.text
    
    async def generate_deployment_view(self, context: DocumentationContext,
                                       patterns: List[PatternInstance]) -> str:
        """Generate the deployment view section."""
        system_prompt = """You are an expert technical writer creating architecture documentation.
Generate the Deployment View section of arc42 documentation.
Focus on physical infrastructure and deployment topology."""
        
        user_prompt = f"""Create the Deployment View section for a software system.

Requirements:
{context.requirements or 'Not specified'}

Include:
1. Infrastructure requirements
2. Deployment topology
3. Hardware and network considerations
4. Scaling and redundancy approaches

Write in a clear, professional style."""
        
        result = await self.llm_provider.generate(user_prompt, system_prompt)
        return result.text
    
    async def generate_crosscutting_concepts(self, context: DocumentationContext,
                                            patterns: List[PatternInstance]) -> str:
        """Generate the crosscutting concepts section."""
        system_prompt = """You are an expert technical writer creating architecture documentation.
Generate the Crosscutting Concepts section of arc42 documentation.
Focus on recurring themes and patterns across the system."""
        
        pattern_types = set(p.pattern_type for p in patterns)
        
        user_prompt = f"""Create the Crosscutting Concepts section for a software system.

Identified Patterns:
{', '.join([pt.value for pt in pattern_types])}

Design Guidelines:
{context.design_guidelines or 'Not specified'}

Include:
1. Domain models and business rules
2. Error handling and logging
3. Security and authentication
4. Transaction management
5. Configuration management

Write in a clear, professional style."""
        
        result = await self.llm_provider.generate(user_prompt, system_prompt)
        return result.text
    
    async def generate_decisions(self, context: DocumentationContext,
                                patterns: List[PatternInstance]) -> str:
        """Generate the architectural decisions section."""
        system_prompt = """You are an expert technical writer creating architecture documentation.
Generate the Architectural Decisions section of arc42 documentation.
Focus on significant design choices and their rationale."""
        
        high_confidence_patterns = [p for p in patterns if p.confidence > 0.7]
        
        user_prompt = f"""Create the Architectural Decisions section for a software system.

Key Architectural Patterns Identified:
{chr(10).join([f"- {p.pattern_type.value} (confidence: {p.confidence:.2f})" for p in high_confidence_patterns[:10]])}

Design Guidelines:
{context.design_guidelines or 'Not specified'}

Include:
1. Major architectural decisions
2. Rationale for each decision
3. Alternatives considered
4. Consequences and trade-offs

Write in a clear, professional style."""
        
        result = await self.llm_provider.generate(user_prompt, system_prompt)
        return result.text
    
    async def generate_quality_requirements(self, context: DocumentationContext,
                                           patterns: List[PatternInstance]) -> str:
        """Generate the quality requirements section."""
        system_prompt = """You are an expert technical writer creating architecture documentation.
Generate the Quality Requirements section of arc42 documentation.
Focus on non-functional requirements and quality attributes."""
        
        user_prompt = f"""Create the Quality Requirements section for a software system.

Business Goals:
{context.business_goals or 'Not specified'}

Requirements:
{context.requirements or 'Not specified'}

Include:
1. Quality tree (hierarchy of quality goals)
2. Quality scenarios (concrete examples)
3. Performance requirements
4. Security requirements
5. Maintainability requirements

Write in a clear, professional style."""
        
        result = await self.llm_provider.generate(user_prompt, system_prompt)
        return result.text
    
    async def generate_risks_debt(self, context: DocumentationContext,
                                 patterns: List[PatternInstance]) -> str:
        """Generate the risks and technical debt section."""
        system_prompt = """You are an expert technical writer creating architecture documentation.
Generate the Risks and Technical Debt section of arc42 documentation.
Focus on potential problems and areas for improvement."""
        
        all_entities = list(self.hybrid_rag.graph_store.nodes.values())
        
        user_prompt = f"""Create the Risks and Technical Debt section for a software system.

System Size: {len(all_entities)} code entities analyzed

Include:
1. Known technical risks
2. Technical debt items
3. Potential scalability issues
4. Security concerns
5. Maintenance challenges

Write in a clear, professional style."""
        
        result = await self.llm_provider.generate(user_prompt, system_prompt)
        return result.text
    
    async def generate_glossary(self, context: DocumentationContext,
                               patterns: List[PatternInstance]) -> str:
        """Generate the glossary section."""
        system_prompt = """You are an expert technical writer creating architecture documentation.
Generate the Glossary section of arc42 documentation.
Focus on defining important terms and concepts."""
        
        classes = [
            entity for entity in self.hybrid_rag.graph_store.nodes.values()
            if entity.entity_type == "class"
        ]
        
        key_classes = [cls.name for cls in classes[:20]]
        
        user_prompt = f"""Create the Glossary section for a software system.

Key Classes/Components:
{', '.join(key_classes)}

Include:
1. Domain-specific terms
2. Technical terminology
3. Abbreviations and acronyms
4. Component names and their meanings

Write in a clear, professional style with concise definitions."""
        
        result = await self.llm_provider.generate(user_prompt, system_prompt)
        return result.text

class DocumentationFormatter:
    @staticmethod
    def format_as_markdown(document: Arc42Document) -> str:
        """Format the arc42 document as Markdown."""
        markdown_parts = []
        
        markdown_parts.append("# Architecture Documentation")
        markdown_parts.append(f"\nGenerated: {document.generation_timestamp}\n")
        
        section_titles = {
            Arc42Section.INTRODUCTION_GOALS: "1. Introduction and Goals",
            Arc42Section.CONSTRAINTS: "2. Constraints",
            Arc42Section.CONTEXT_SCOPE: "3. Context and Scope",
            Arc42Section.SOLUTION_STRATEGY: "4. Solution Strategy",
            Arc42Section.BUILDING_BLOCKS: "5. Building Blocks View",
            Arc42Section.RUNTIME_VIEW: "6. Runtime View",
            Arc42Section.DEPLOYMENT_VIEW: "7. Deployment View",
            Arc42Section.CROSSCUTTING_CONCEPTS: "8. Crosscutting Concepts",
            Arc42Section.DECISIONS: "9. Architectural Decisions",
            Arc42Section.QUALITY_REQUIREMENTS: "10. Quality Requirements",
            Arc42Section.RISKS_DEBT: "11. Risks and Technical Debt",
            Arc42Section.GLOSSARY: "12. Glossary"
        }
        
        for section_type in Arc42Section:
            title = section_titles[section_type]
            content = document.sections.get(section_type, "")
            
            markdown_parts.append(f"\n## {title}\n")
            markdown_parts.append(content)
            markdown_parts.append("\n")
        
        return "\n".join(markdown_parts)
    
    @staticmethod
    def format_as_asciidoc(document: Arc42Document) -> str:
        """Format the arc42 document as AsciiDoc."""
        asciidoc_parts = []
        
        asciidoc_parts.append("= Architecture Documentation")
        asciidoc_parts.append(f":generated: {document.generation_timestamp}")
        asciidoc_parts.append(":toc: left")
        asciidoc_parts.append(":toclevels: 3\n")
        
        section_titles = {
            Arc42Section.INTRODUCTION_GOALS: "Introduction and Goals",
            Arc42Section.CONSTRAINTS: "Constraints",
            Arc42Section.CONTEXT_SCOPE: "Context and Scope",
            Arc42Section.SOLUTION_STRATEGY: "Solution Strategy",
            Arc42Section.BUILDING_BLOCKS: "Building Blocks View",
            Arc42Section.RUNTIME_VIEW: "Runtime View",
            Arc42Section.DEPLOYMENT_VIEW: "Deployment View",
            Arc42Section.CROSSCUTTING_CONCEPTS: "Crosscutting Concepts",
            Arc42Section.DECISIONS: "Architectural Decisions",
            Arc42Section.QUALITY_REQUIREMENTS: "Quality Requirements",
            Arc42Section.RISKS_DEBT: "Risks and Technical Debt",
            Arc42Section.GLOSSARY: "Glossary"
        }
        
        for section_type in Arc42Section:
            title = section_titles[section_type]
            content = document.sections.get(section_type, "")
            
            asciidoc_parts.append(f"\n== {title}\n")
            asciidoc_parts.append(content)
            asciidoc_parts.append("\n")
        
        return "\n".join(asciidoc_parts)

This documentation generation system transforms technical analysis results into comprehensive arc42-formatted documentation. Each section generator creates appropriate prompts for the LLM that include relevant analysis results, identified patterns, and user-provided context. The LLM synthesizes this information into coherent narrative text that explains the architecture clearly and comprehensively.

The system supports multiple output formats including Markdown and AsciiDoc, allowing the generated documentation to integrate with various documentation toolchains. The structured approach ensures consistency across all documentation sections while allowing flexibility in the level of detail and writing style based on the target audience.

GIT INTEGRATION AND CHANGE DETECTION

Integrating with Git repositories enables the documentation agent to operate automatically as part of the development workflow. The system monitors repository changes, identifies affected code components, and triggers incremental documentation updates. This integration transforms documentation from a periodic manual task into a continuous automated process.

The Git integration operates through webhook handlers that receive notifications when developers push commits to the repository. The change detection system analyzes the commit diff to identify modified files and affected code entities. Rather than reanalyzing the entire codebase, the system performs targeted analysis of changed components and their dependencies. The incremental update process regenerates only the documentation sections affected by the changes, significantly improving efficiency for large codebases.

The system maintains a mapping between code entities and documentation sections, enabling precise impact analysis. When a class changes, the system knows which building block view subsections reference that class. When a module's dependencies change, the system knows which context diagrams need updating. This fine-grained tracking ensures that documentation updates remain synchronized with code changes while minimizing unnecessary regeneration.

Here is an implementation of the Git integration and change detection system:

import git
from typing import List, Dict, Set, Optional, Any
from pathlib import Path
from dataclasses import dataclass
import hashlib
from datetime import datetime

@dataclass
class FileChange:
    file_path: str
    change_type: str
    old_content: Optional[str]
    new_content: Optional[str]

@dataclass
class CodeChange:
    commit_hash: str
    author: str
    timestamp: datetime
    message: str
    file_changes: List[FileChange]

class GitMonitor:
    def __init__(self, repo_path: str):
        self.repo_path = repo_path
        self.repo = git.Repo(repo_path)
        self.last_processed_commit = None
        
    def get_recent_commits(self, since_commit: Optional[str] = None) -> List[CodeChange]:
        """Retrieve commits since the last processed commit."""
        commits = []
        
        if since_commit:
            commit_range = f"{since_commit}..HEAD"
        else:
            commit_range = "HEAD~10..HEAD"
        
        for commit in self.repo.iter_commits(commit_range):
            file_changes = []
            
            if commit.parents:
                parent = commit.parents[0]
                diffs = parent.diff(commit)
                
                for diff in diffs:
                    change_type = "modified"
                    if diff.new_file:
                        change_type = "added"
                    elif diff.deleted_file:
                        change_type = "deleted"
                    elif diff.renamed_file:
                        change_type = "renamed"
                    
                    old_content = None
                    new_content = None
                    
                    if diff.a_blob:
                        try:
                            old_content = diff.a_blob.data_stream.read().decode('utf-8')
                        except:
                            old_content = None
                    
                    if diff.b_blob:
                        try:
                            new_content = diff.b_blob.data_stream.read().decode('utf-8')
                        except:
                            new_content = None
                    
                    file_changes.append(FileChange(
                        file_path=diff.b_path or diff.a_path,
                        change_type=change_type,
                        old_content=old_content,
                        new_content=new_content
                    ))
            
            commits.append(CodeChange(
                commit_hash=commit.hexsha,
                author=str(commit.author),
                timestamp=datetime.fromtimestamp(commit.committed_date),
                message=commit.message,
                file_changes=file_changes
            ))
        
        return commits
    
    def get_current_commit(self) -> str:
        """Get the current HEAD commit hash."""
        return self.repo.head.commit.hexsha

class ChangeImpactAnalyzer:
    def __init__(self, hybrid_rag: HybridRAG, codebase_analyzer: CodebaseAnalyzer):
        self.hybrid_rag = hybrid_rag
        self.codebase_analyzer = codebase_analyzer
        self.entity_to_sections: Dict[str, Set[Arc42Section]] = {}
        
    def analyze_change_impact(self, file_changes: List[FileChange]) -> Set[Arc42Section]:
        """Determine which documentation sections are affected by code changes."""
        affected_sections = set()
        affected_entities = set()
        
        for file_change in file_changes:
            if not file_change.file_path.endswith('.py'):
                continue
            
            entity_id = self.codebase_analyzer.create_entity_id(
                file_change.file_path, "module"
            )
            
            if entity_id in self.hybrid_rag.graph_store.nodes:
                affected_entities.add(entity_id)
                
                dependencies = self.hybrid_rag.graph_store.get_dependencies(entity_id)
                affected_entities.update(dependencies)
                
                dependents = self.hybrid_rag.graph_store.get_dependents(entity_id)
                affected_entities.update(dependents)
        
        for entity_id in affected_entities:
            sections = self.entity_to_sections.get(entity_id, set())
            affected_sections.update(sections)
        
        if not affected_sections:
            affected_sections = {
                Arc42Section.BUILDING_BLOCKS,
                Arc42Section.RUNTIME_VIEW,
                Arc42Section.SOLUTION_STRATEGY
            }
        
        return affected_sections
    
    def update_entity_section_mapping(self, entity_id: str, sections: Set[Arc42Section]) -> None:
        """Record which documentation sections reference a code entity."""
        if entity_id not in self.entity_to_sections:
            self.entity_to_sections[entity_id] = set()
        self.entity_to_sections[entity_id].update(sections)
    
    def get_affected_entities(self, file_changes: List[FileChange]) -> Set[str]:
        """Identify code entities affected by file changes."""
        affected_entities = set()
        
        for file_change in file_changes:
            if not file_change.file_path.endswith('.py'):
                continue
            
            entity_id = self.codebase_analyzer.create_entity_id(
                file_change.file_path, "module"
            )
            affected_entities.add(entity_id)
        
        return affected_entities

class IncrementalDocumentationUpdater:
    def __init__(self, documentation_generator: DocumentationGenerator,
                 change_impact_analyzer: ChangeImpactAnalyzer,
                 codebase_analyzer: CodebaseAnalyzer):
        self.documentation_generator = documentation_generator
        self.change_impact_analyzer = change_impact_analyzer
        self.codebase_analyzer = codebase_analyzer
        self.current_document: Optional[Arc42Document] = None
        
    async def update_documentation(self, code_changes: List[CodeChange],
                                   context: DocumentationContext,
                                   patterns: List[PatternInstance]) -> Arc42Document:
        """Update documentation based on code changes."""
        all_file_changes = []
        for code_change in code_changes:
            all_file_changes.extend(code_change.file_changes)
        
        affected_sections = self.change_impact_analyzer.analyze_change_impact(all_file_changes)
        
        affected_entities = self.change_impact_analyzer.get_affected_entities(all_file_changes)
        for entity_id in affected_entities:
            if entity_id in self.codebase_analyzer.hybrid_rag.graph_store.nodes:
                entity = self.codebase_analyzer.hybrid_rag.graph_store.nodes[entity_id]
                
                for file_change in all_file_changes:
                    if file_change.file_path == entity.file_path and file_change.new_content:
                        try:
                            module_info = self.codebase_analyzer.python_analyzer.analyze_file(
                                file_change.file_path
                            )
                            
                            self.codebase_analyzer.build_dependency_graph({
                                file_change.file_path: module_info
                            })
                        except Exception as e:
                            print(f"Error reanalyzing {file_change.file_path}: {str(e)}")
        
        if self.current_document is None:
            return await self.documentation_generator.generate_documentation(context, patterns)
        
        updated_sections = dict(self.current_document.sections)
        
        for section in affected_sections:
            generator = self.documentation_generator.section_generators[section]
            updated_content = await generator(context, patterns)
            updated_sections[section] = updated_content
        
        return Arc42Document(
            sections=updated_sections,
            metadata={
                "context": context,
                "patterns_found": len(patterns),
                "updated_sections": [s.value for s in affected_sections],
                "change_count": len(code_changes)
            },
            generation_timestamp=datetime.now().isoformat()
        )
    
    def set_current_document(self, document: Arc42Document) -> None:
        """Store the current documentation state."""
        self.current_document = document

class GitWebhookHandler:
    def __init__(self, incremental_updater: IncrementalDocumentationUpdater,
                 git_monitor: GitMonitor,
                 pattern_recognizer: PatternRecognizer):
        self.incremental_updater = incremental_updater
        self.git_monitor = git_monitor
        self.pattern_recognizer = pattern_recognizer
        
    async def handle_push_event(self, payload: Dict[str, Any],
                                context: DocumentationContext) -> Arc42Document:
        """Handle a Git push webhook event."""
        commits = self.git_monitor.get_recent_commits()
        
        if not commits:
            return self.incremental_updater.current_document
        
        patterns = self.pattern_recognizer.recognize_patterns()
        
        updated_document = await self.incremental_updater.update_documentation(
            commits, context, patterns
        )
        
        self.incremental_updater.set_current_document(updated_document)
        
        return updated_document

This Git integration system enables continuous documentation updates synchronized with code changes. The change impact analysis ensures that only affected documentation sections are regenerated, making the system efficient even for large codebases with frequent changes. The webhook handler provides a standard interface for integrating with Git hosting platforms such as GitHub, GitLab, or Bitbucket.

The incremental update approach represents a significant efficiency improvement over full regeneration. For a codebase with hundreds of modules, a change to a single module might only require updating two or three arc42 sections rather than all twelve. This targeted approach reduces both computation time and LLM token usage, making continuous documentation practical for active development projects.

CONCURRENCY AND PARALLELIZATION STRATEGIES

Efficient processing of large codebases requires extensive use of concurrency and parallelization. The documentation agent employs multiple strategies to maximize throughput while managing resource constraints such as LLM rate limits and memory usage.

The system implements concurrency at several levels. At the highest level, independent code modules can be analyzed in parallel. The codebase analyzer partitions the repository into independent subtrees and analyzes each subtree concurrently. At the pattern recognition level, different pattern detectors run in parallel since they operate independently on the same graph structure. At the documentation generation level, independent arc42 sections can be generated concurrently when they do not share dependencies.

The orchestrator manages concurrency limits to prevent resource exhaustion. It maintains separate limits for CPU-intensive tasks such as code parsing, I/O-intensive tasks such as file reading, and API-limited tasks such as LLM calls. The system uses semaphores to enforce these limits while maximizing utilization of available resources.

Here is an implementation of the concurrency and parallelization system:

import asyncio
from typing import List, Dict, Set, Optional, Callable, Any
from dataclasses import dataclass
import time

@dataclass
class ResourceLimits:
    max_concurrent_cpu_tasks: int = 4
    max_concurrent_io_tasks: int = 10
    max_concurrent_llm_calls: int = 3
    llm_calls_per_minute: int = 60

class ConcurrencyManager:
    def __init__(self, limits: ResourceLimits):
        self.limits = limits
        self.cpu_semaphore = asyncio.Semaphore(limits.max_concurrent_cpu_tasks)
        self.io_semaphore = asyncio.Semaphore(limits.max_concurrent_io_tasks)
        self.llm_semaphore = asyncio.Semaphore(limits.max_concurrent_llm_calls)
        self.llm_call_times: List[float] = []
        
    async def run_cpu_task(self, task: Callable) -> Any:
        """Execute a CPU-intensive task with concurrency control."""
        async with self.cpu_semaphore:
            return await task()
    
    async def run_io_task(self, task: Callable) -> Any:
        """Execute an I/O-intensive task with concurrency control."""
        async with self.io_semaphore:
            return await task()
    
    async def run_llm_call(self, task: Callable) -> Any:
        """Execute an LLM API call with rate limiting."""
        await self.enforce_rate_limit()
        
        async with self.llm_semaphore:
            self.llm_call_times.append(time.time())
            return await task()
    
    async def enforce_rate_limit(self) -> None:
        """Ensure LLM calls do not exceed rate limits."""
        current_time = time.time()
        
        self.llm_call_times = [
            t for t in self.llm_call_times
            if current_time - t < 60
        ]
        
        if len(self.llm_call_times) >= self.limits.llm_calls_per_minute:
            oldest_call = min(self.llm_call_times)
            wait_time = 60 - (current_time - oldest_call)
            if wait_time > 0:
                await asyncio.sleep(wait_time)

class ParallelCodebaseAnalyzer:
    def __init__(self, codebase_analyzer: CodebaseAnalyzer,
                 concurrency_manager: ConcurrencyManager):
        self.codebase_analyzer = codebase_analyzer
        self.concurrency_manager = concurrency_manager
        
    async def analyze_codebase_parallel(self, root_path: str) -> Dict[str, ModuleInfo]:
        """Analyze codebase with parallel processing of independent modules."""
        all_files = self.discover_source_files(root_path)
        
        analysis_tasks = []
        for file_path in all_files:
            task = self.analyze_file_async(file_path)
            analysis_tasks.append(task)
        
        results = await asyncio.gather(*analysis_tasks, return_exceptions=True)
        
        module_infos = {}
        for file_path, result in zip(all_files, results):
            if isinstance(result, Exception):
                print(f"Error analyzing {file_path}: {str(result)}")
            else:
                module_infos[file_path] = result
        
        return module_infos
    
    def discover_source_files(self, root_path: str) -> List[str]:
        """Discover all source files in the codebase."""
        from pathlib import Path
        
        root = Path(root_path)
        source_files = []
        
        exclude_patterns = ['__pycache__', '.git', 'venv', 'node_modules']
        
        for file_path in root.rglob('*.py'):
            if any(pattern in str(file_path) for pattern in exclude_patterns):
                continue
            source_files.append(str(file_path))
        
        return source_files
    
    async def analyze_file_async(self, file_path: str) -> ModuleInfo:
        """Analyze a single file asynchronously."""
        async def analyze():
            return self.codebase_analyzer.python_analyzer.analyze_file(file_path)
        
        return await self.concurrency_manager.run_cpu_task(analyze)

class ParallelPatternRecognizer:
    def __init__(self, pattern_recognizer: PatternRecognizer,
                 concurrency_manager: ConcurrencyManager):
        self.pattern_recognizer = pattern_recognizer
        self.concurrency_manager = concurrency_manager
        
    async def recognize_patterns_parallel(self) -> List[PatternInstance]:
        """Run all pattern detectors in parallel."""
        detection_tasks = []
        
        for pattern_type, detector in self.pattern_recognizer.pattern_detectors.items():
            task = self.run_detector_async(detector)
            detection_tasks.append(task)
        
        results = await asyncio.gather(*detection_tasks, return_exceptions=True)
        
        all_patterns = []
        for result in results:
            if isinstance(result, Exception):
                print(f"Pattern detection error: {str(result)}")
            else:
                all_patterns.extend(result)
        
        all_patterns.sort(key=lambda p: p.confidence, reverse=True)
        return all_patterns
    
    async def run_detector_async(self, detector: Callable) -> List[PatternInstance]:
        """Run a pattern detector asynchronously."""
        async def detect():
            return detector()
        
        return await self.concurrency_manager.run_cpu_task(detect)

class ParallelDocumentationGenerator:
    def __init__(self, documentation_generator: DocumentationGenerator,
                 concurrency_manager: ConcurrencyManager):
        self.documentation_generator = documentation_generator
        self.concurrency_manager = concurrency_manager
        
    async def generate_documentation_parallel(self, context: DocumentationContext,
                                             patterns: List[PatternInstance]) -> Arc42Document:
        """Generate documentation sections in parallel where possible."""
        from datetime import datetime
        
        independent_sections = [
            Arc42Section.INTRODUCTION_GOALS,
            Arc42Section.CONSTRAINTS,
            Arc42Section.GLOSSARY
        ]
        
        dependent_sections = [
            Arc42Section.CONTEXT_SCOPE,
            Arc42Section.SOLUTION_STRATEGY,
            Arc42Section.BUILDING_BLOCKS,
            Arc42Section.RUNTIME_VIEW,
            Arc42Section.DEPLOYMENT_VIEW,
            Arc42Section.CROSSCUTTING_CONCEPTS,
            Arc42Section.DECISIONS,
            Arc42Section.QUALITY_REQUIREMENTS,
            Arc42Section.RISKS_DEBT
        ]
        
        independent_tasks = []
        for section in independent_sections:
            task = self.generate_section_async(section, context, patterns)
            independent_tasks.append((section, task))
        
        independent_results = await asyncio.gather(
            *[task for _, task in independent_tasks],
            return_exceptions=True
        )
        
        sections = {}
        for (section, _), result in zip(independent_tasks, independent_results):
            if isinstance(result, Exception):
                print(f"Error generating {section.value}: {str(result)}")
                sections[section] = f"Error generating section: {str(result)}"
            else:
                sections[section] = result
        
        for section in dependent_sections:
            try:
                content = await self.generate_section_async(section, context, patterns)
                sections[section] = content
            except Exception as e:
                print(f"Error generating {section.value}: {str(e)}")
                sections[section] = f"Error generating section: {str(e)}"
        
        return Arc42Document(
            sections=sections,
            metadata={
                "context": context,
                "patterns_found": len(patterns)
            },
            generation_timestamp=datetime.now().isoformat()
        )
    
    async def generate_section_async(self, section: Arc42Section,
                                     context: DocumentationContext,
                                     patterns: List[PatternInstance]) -> str:
        """Generate a documentation section asynchronously with rate limiting."""
        generator = self.documentation_generator.section_generators[section]
        
        async def generate():
            return await generator(context, patterns)
        
        return await self.concurrency_manager.run_llm_call(generate)

This concurrency system maximizes throughput while respecting resource constraints. The parallel codebase analyzer can process hundreds of source files concurrently, limited only by CPU cores and I/O bandwidth. The parallel pattern recognizer runs all pattern detectors simultaneously since they operate independently on shared data. The parallel documentation generator executes independent sections concurrently while serializing dependent sections to maintain consistency.

The rate limiting for LLM calls prevents exceeding API quotas while maintaining high utilization. The system tracks recent API calls and automatically delays new calls when approaching rate limits. This approach ensures reliable operation even when processing large codebases that require hundreds of LLM calls for complete documentation generation.

COMPLETE RUNNING EXAMPLE

The following section presents a complete, production-ready implementation of the autonomous documentation agent system. This implementation integrates all components discussed in previous sections into a cohesive application that can analyze real codebases and generate arc42 documentation.

import asyncio
import logging
import sys
import os
from pathlib import Path
from typing import Optional, Dict, Any
import json
import argparse

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

class DocumentationAgentSystem:
    """
    Complete autonomous documentation agent system integrating all components.
    This system analyzes codebases, recognizes patterns, and generates arc42 documentation.
    """
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = logging.getLogger(__name__)
        
        self.hybrid_rag = HybridRAG(embedding_dimension=768)
        
        self.codebase_analyzer = CodebaseAnalyzer(self.hybrid_rag)
        
        self.pattern_recognizer = PatternRecognizer(self.hybrid_rag)
        
        llm_config = LLMConfig(
            model_name=config.get('model_name', 'gpt-3.5-turbo'),
            temperature=config.get('temperature', 0.7),
            max_tokens=config.get('max_tokens', 2048)
        )
        
        provider_type = config.get('provider_type', 'openai')
        if provider_type == 'openai':
            api_key = config.get('api_key', os.environ.get('OPENAI_API_KEY'))
            self.llm_provider = OpenAIProvider(llm_config, api_key)
        elif provider_type == 'local':
            model_path = config.get('model_path')
            hardware_type = HardwareDetector.detect_hardware()
            self.llm_provider = LocalLLMProvider(llm_config, model_path, hardware_type)
        else:
            raise ValueError(f"Unknown provider type: {provider_type}")
        
        self.documentation_generator = DocumentationGenerator(
            self.llm_provider,
            self.hybrid_rag
        )
        
        resource_limits = ResourceLimits(
            max_concurrent_cpu_tasks=config.get('max_cpu_tasks', 4),
            max_concurrent_io_tasks=config.get('max_io_tasks', 10),
            max_concurrent_llm_calls=config.get('max_llm_calls', 3),
            llm_calls_per_minute=config.get('llm_rate_limit', 60)
        )
        self.concurrency_manager = ConcurrencyManager(resource_limits)
        
        self.parallel_analyzer = ParallelCodebaseAnalyzer(
            self.codebase_analyzer,
            self.concurrency_manager
        )
        
        self.parallel_pattern_recognizer = ParallelPatternRecognizer(
            self.pattern_recognizer,
            self.concurrency_manager
        )
        
        self.parallel_doc_generator = ParallelDocumentationGenerator(
            self.documentation_generator,
            self.concurrency_manager
        )
        
        self.organizer = OrganizerAgent(max_concurrent_tasks=10)
        
        if 'repo_path' in config:
            self.git_monitor = GitMonitor(config['repo_path'])
            self.change_impact_analyzer = ChangeImpactAnalyzer(
                self.hybrid_rag,
                self.codebase_analyzer
            )
            self.incremental_updater = IncrementalDocumentationUpdater(
                self.documentation_generator,
                self.change_impact_analyzer,
                self.codebase_analyzer
            )
            self.webhook_handler = GitWebhookHandler(
                self.incremental_updater,
                self.git_monitor,
                self.pattern_recognizer
            )
        else:
            self.git_monitor = None
            self.webhook_handler = None
    
    async def analyze_and_document(self, repo_path: str,
                                   context: Optional[DocumentationContext] = None) -> Arc42Document:
        """
        Main workflow: analyze codebase and generate documentation.
        
        This method orchestrates the complete documentation generation process:
        1. Analyzes the codebase structure
        2. Builds dependency graphs
        3. Recognizes architectural patterns
        4. Generates arc42 documentation
        """
        if context is None:
            context = DocumentationContext()
        
        self.logger.info(f"Starting codebase analysis for {repo_path}")
        
        module_infos = await self.parallel_analyzer.analyze_codebase_parallel(repo_path)
        self.logger.info(f"Analyzed {len(module_infos)} modules")
        
        self.logger.info("Building dependency graph")
        self.codebase_analyzer.build_dependency_graph(module_infos)
        
        self.logger.info("Recognizing architectural patterns")
        patterns = await self.parallel_pattern_recognizer.recognize_patterns_parallel()
        self.logger.info(f"Identified {len(patterns)} pattern instances")
        
        for pattern in patterns[:10]:
            self.logger.info(
                f"  - {pattern.pattern_type.value} "
                f"(confidence: {pattern.confidence:.2f}): {pattern.description}"
            )
        
        self.logger.info("Generating arc42 documentation")
        document = await self.parallel_doc_generator.generate_documentation_parallel(
            context,
            patterns
        )
        
        self.logger.info("Documentation generation complete")
        return document
    
    async def incremental_update(self, context: Optional[DocumentationContext] = None) -> Arc42Document:
        """
        Perform incremental documentation update based on Git changes.
        
        This method is more efficient than full regeneration for active repositories
        with frequent changes.
        """
        if not self.webhook_handler:
            raise ValueError("Git integration not configured")
        
        if context is None:
            context = DocumentationContext()
        
        self.logger.info("Checking for code changes")
        commits = self.git_monitor.get_recent_commits()
        
        if not commits:
            self.logger.info("No changes detected")
            return self.incremental_updater.current_document
        
        self.logger.info(f"Processing {len(commits)} commits")
        
        patterns = await self.parallel_pattern_recognizer.recognize_patterns_parallel()
        
        document = await self.incremental_updater.update_documentation(
            commits,
            context,
            patterns
        )
        
        self.logger.info("Incremental update complete")
        return document
    
    def save_documentation(self, document: Arc42Document, output_path: str,
                          format: str = 'markdown') -> None:
        """
        Save generated documentation to a file.
        
        Supports multiple output formats for integration with various documentation systems.
        """
        if format == 'markdown':
            content = DocumentationFormatter.format_as_markdown(document)
            extension = '.md'
        elif format == 'asciidoc':
            content = DocumentationFormatter.format_as_asciidoc(document)
            extension = '.adoc'
        else:
            raise ValueError(f"Unknown format: {format}")
        
        output_file = Path(output_path)
        if output_file.is_dir():
            output_file = output_file / f"architecture{extension}"
        
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(content)
        
        self.logger.info(f"Documentation saved to {output_file}")
    
    def get_system_info(self) -> Dict[str, Any]:
        """
        Retrieve information about the system configuration and capabilities.
        """
        hardware_info = HardwareDetector.get_hardware_info()
        
        return {
            "hardware": hardware_info,
            "config": {
                "provider_type": self.config.get('provider_type'),
                "model_name": self.config.get('model_name'),
                "max_cpu_tasks": self.config.get('max_cpu_tasks', 4),
                "max_llm_calls": self.config.get('max_llm_calls', 3)
            },
            "capabilities": {
                "git_integration": self.git_monitor is not None,
                "incremental_updates": self.webhook_handler is not None,
                "parallel_processing": True,
                "pattern_recognition": True
            }
        }

async def main():
    """
    Main entry point for the documentation agent application.
    
    This function handles command-line arguments and orchestrates the documentation
    generation workflow.
    """
    parser = argparse.ArgumentParser(
        description='Autonomous LLM-based Documentation Agent'
    )
    parser.add_argument(
        'repo_path',
        help='Path to the code repository to document'
    )
    parser.add_argument(
        '--output',
        default='./docs',
        help='Output directory for generated documentation'
    )
    parser.add_argument(
        '--format',
        choices=['markdown', 'asciidoc'],
        default='markdown',
        help='Output format for documentation'
    )
    parser.add_argument(
        '--provider',
        choices=['openai', 'local'],
        default='openai',
        help='LLM provider to use'
    )
    parser.add_argument(
        '--model',
        default='gpt-3.5-turbo',
        help='Model name or path'
    )
    parser.add_argument(
        '--api-key',
        help='API key for cloud LLM provider'
    )
    parser.add_argument(
        '--business-goals',
        help='Business goals and objectives for the system'
    )
    parser.add_argument(
        '--design-guidelines',
        help='Design and coding guidelines to follow'
    )
    parser.add_argument(
        '--requirements',
        help='System requirements and constraints'
    )
    parser.add_argument(
        '--incremental',
        action='store_true',
        help='Perform incremental update based on Git changes'
    )
    parser.add_argument(
        '--info',
        action='store_true',
        help='Display system information and exit'
    )
    
    args = parser.parse_args()
    
    config = {
        'repo_path': args.repo_path,
        'provider_type': args.provider,
        'model_name': args.model,
        'api_key': args.api_key,
        'max_cpu_tasks': 4,
        'max_io_tasks': 10,
        'max_llm_calls': 3,
        'llm_rate_limit': 60
    }
    
    if args.provider == 'local':
        config['model_path'] = args.model
    
    system = DocumentationAgentSystem(config)
    
    if args.info:
        info = system.get_system_info()
        print(json.dumps(info, indent=2))
        return
    
    context = DocumentationContext(
        business_goals=args.business_goals,
        design_guidelines=args.design_guidelines,
        requirements=args.requirements,
        target_audience="technical team",
        detail_level="comprehensive"
    )
    
    if args.incremental and system.git_monitor:
        document = await system.incremental_update(context)
    else:
        document = await system.analyze_and_document(args.repo_path, context)
    
    system.save_documentation(document, args.output, args.format)
    
    print(f"\nDocumentation generated successfully!")
    print(f"Output: {args.output}")
    print(f"Format: {args.format}")
    print(f"Sections: {len(document.sections)}")
    print(f"Timestamp: {document.generation_timestamp}")

if __name__ == '__main__':
    asyncio.run(main())

This complete implementation provides a production-ready autonomous documentation agent. The system supports both full documentation generation and incremental updates, works with multiple LLM providers and hardware configurations, and generates comprehensive arc42-formatted documentation.

The application can be used from the command line to analyze any Python codebase and generate documentation. It supports customization through command-line arguments for business goals, design guidelines, and requirements. The parallel processing capabilities ensure efficient operation even on large codebases with thousands of files.

The system demonstrates all key concepts discussed throughout this article including multi-agent orchestration, hybrid RAG storage, pattern recognition, LLM abstraction, Git integration, and concurrent processing. It represents a complete solution for automated architecture documentation that can integrate into existing development workflows.

CONCLUSION

Building an autonomous LLM-based documentation agent requires integrating multiple sophisticated components into a cohesive system. The multi-agent architecture enables specialization and parallel processing. The hybrid RAG approach combining vector and graph storage overcomes context window limitations while maintaining both semantic and structural information. The pattern recognition system identifies implicit architectural decisions and makes them explicit in documentation. The LLM abstraction layer provides flexibility across different providers and hardware platforms. The Git integration enables continuous documentation synchronized with code changes. The concurrency system maximizes throughput while respecting resource constraints.

The resulting system transforms documentation from a manual, periodic task into an automated, continuous process. As developers commit code changes, the documentation agent automatically updates the architecture documentation to reflect the current state of the system. This automation ensures that documentation remains accurate and current, addressing one of the most persistent challenges in software engineering.

The arc42 format provides a comprehensive structure for architecture documentation that covers all essential aspects from business context to technical implementation. The LLM-based generation produces natural language text that explains the architecture clearly for human readers while maintaining consistency and completeness across all sections.

Future enhancements could include support for additional programming languages beyond Python, integration with architecture modeling tools, automated diagram generation, and more sophisticated pattern recognition using machine learning models trained on large corpora of open source code. The modular architecture of the system makes such extensions straightforward to implement without disrupting existing functionality.

The autonomous documentation agent represents a significant step toward making architecture documentation a natural byproduct of the development process rather than a separate, burdensome activity. By leveraging the capabilities of large language models combined with sophisticated code analysis and graph-based knowledge representation, the system can generate documentation that rivals or exceeds what human architects produce manually while requiring minimal human intervention.

No comments: