Wednesday, June 10, 2026

CREATING COMPREHENSIVE CODE ANALYSIS WITH LARGE LANGUAGE MODELS


 


INTRODUCTION AND PROBLEM STATEMENT

Modern software development increasingly relies on large language models to assist with code understanding, generation, and analysis. Tools like GitHub Copilot, Windsurf, and Claude Code have demonstrated impressive capabilities in understanding code context, detecting issues, and proposing solutions. However, analyzing entire code repositories presents unique challenges that require sophisticated approaches beyond simple prompt engineering.

The core problem we address in this tutorial is how to build a production-ready code analysis system that can examine single files or entire Git repositories, identify issues with accurate criticality assessment, and propose actionable solutions. This system must handle multiple programming languages, understand configuration files, and report only verifiable findings, with no hallucinated or fabricated issues.

The fundamental challenge stems from the inherent limitations of large language models when applied to large-scale code analysis. LLMs possess extensive knowledge from training on vast code corpora and can recognize patterns across multiple programming languages, but they face significant constraints. Their context window is limited, so they cannot hold an entire large codebase in context at once. Their knowledge is frozen at training time, so they are unaware of recent framework updates and project-specific conventions. They struggle with temporal dependencies and the evolution of architectural decisions over time. Furthermore, they lack access to the social context that human developers acquire through team interactions, meetings, and institutional knowledge.

To address these limitations, we will construct a multi-faceted solution that combines retrieval-augmented generation, graph-based knowledge representation, specialized agents, abstract syntax tree analysis, and meta-cognitive reflection. This tutorial will guide you through each component with detailed explanations and working code examples.

ARCHITECTURAL OVERVIEW AND PLANNING

Before diving into implementation, we must understand the overall architecture of our code analysis system. The system consists of several interconnected components, each addressing specific limitations of LLMs.

The first component is the User Interaction Agent, which serves as the primary interface. This agent gathers essential context including requirements documents, business goals, architectural documentation, coding conventions, and repository locations. It then creates a comprehensive analysis plan based on available information.

The second component is the RAG Storage System, which maintains multiple knowledge bases. One stores Architectural Decision Records to preserve the reasoning behind design choices. Another indexes source code files with their metadata. A third maintains project context including domain knowledge, meeting notes, and architectural principles. This distributed knowledge allows the system to retrieve relevant context without overwhelming the LLM's context window.

The third component is the GraphRAG System, which represents code relationships as a graph structure. Nodes represent code entities like classes, functions, and modules. Edges represent relationships such as function calls, inheritance, interface implementation, and dependencies. This graph structure enables sophisticated queries about code structure and impact analysis.
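
To make the graph idea concrete, here is a minimal sketch, assuming a plain in-memory adjacency structure rather than a graph database. The names `CodeGraph`, `add_edge`, and `impacted_by`, as well as the sample entities, are hypothetical and only illustrate how an impact query can walk dependency edges in reverse.

```python
from collections import defaultdict, deque
from typing import Dict, List, Set, Tuple


class CodeGraph:
    """Hypothetical in-memory graph: nodes are code entities, edges are typed relationships."""

    def __init__(self) -> None:
        # Maps a target entity to the (source, relationship) pairs that depend on it.
        self._dependents: Dict[str, List[Tuple[str, str]]] = defaultdict(list)

    def add_edge(self, source: str, relationship: str, target: str) -> None:
        """Record that `source` depends on `target` via `relationship`."""
        self._dependents[target].append((source, relationship))

    def impacted_by(self, changed: str) -> Set[str]:
        """Return every entity that directly or transitively depends on `changed`."""
        impacted: Set[str] = set()
        queue = deque([changed])
        while queue:
            current = queue.popleft()
            for source, _relationship in self._dependents.get(current, []):
                if source not in impacted:
                    impacted.add(source)
                    queue.append(source)
        return impacted


graph = CodeGraph()
graph.add_edge("OrderService.place_order", "calls", "PaymentGateway.charge")
graph.add_edge("CheckoutController.submit", "calls", "OrderService.place_order")
graph.add_edge("StripeGateway", "inherits", "PaymentGateway")

# Entities that may need re-analysis if PaymentGateway.charge changes.
print(sorted(graph.impacted_by("PaymentGateway.charge")))
```

Storing edges keyed by their target keeps the impact query a simple breadth-first traversal; a production system would likely add node metadata and edge filtering by relationship type.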

The fourth component is the Abstract Syntax Tree Parser System, which creates structured representations of code in different programming languages. This allows precise analysis of code structure without relying solely on text-based pattern matching.
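
As a small illustration of structural analysis, the following sketch uses Python's standard `ast` module to list the classes and functions in a source string. The helper name `list_definitions` and the sample source are assumptions made for this example; other languages would need their own parsers.

```python
import ast
from typing import List, Tuple

SAMPLE_SOURCE = '''
class Account:
    def deposit(self, amount):
        self.balance += amount

def transfer(source, target, amount):
    source.withdraw(amount)
    target.deposit(amount)
'''


def list_definitions(source: str) -> List[Tuple[str, str, int]]:
    """Return (kind, name, line number) for each class and function in `source`."""
    tree = ast.parse(source)
    definitions = []
    for node in ast.walk(tree):
        if isinstance(node, ast.ClassDef):
            definitions.append(("class", node.name, node.lineno))
        elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            definitions.append(("function", node.name, node.lineno))
    # Sort by line number so the result follows source order.
    return sorted(definitions, key=lambda item: item[2])


for kind, name, line in list_definitions(SAMPLE_SOURCE):
    print(f"{kind:8} {name:10} line {line}")
```

Because the parser works on syntax rather than text, a function name that appears inside a string or comment is not reported as a definition.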

The fifth component consists of Specialized Analysis Agents, each focused on a specific aspect. Language-specific agents understand the nuances of Python, JavaScript, TypeScript, HTML, and other languages. A metrics agent calculates code quality metrics. A smell detection agent identifies anti-patterns. An architecture agent evaluates high-level design decisions.

The sixth component is the Change Agent, which monitors repository modifications through Git operations. It identifies what changed, assesses the impact, and triggers relevant re-analysis.
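
One way to identify what changed is to parse the output of `git diff --name-status`, which prints one tab-separated line per changed file. The sketch below parses a hard-coded sample so that it runs without a repository; in practice the text would come from invoking Git, for example through `subprocess`. The names `FileChange` and `parse_name_status` are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FileChange:
    status: str                     # "added", "modified", "deleted", "renamed", ...
    path: str                       # current path of the file
    old_path: Optional[str] = None  # previous path for renames and copies


STATUS_NAMES = {"A": "added", "M": "modified", "D": "deleted", "R": "renamed", "C": "copied"}


def parse_name_status(output: str) -> List[FileChange]:
    """Parse the text produced by `git diff --name-status`."""
    changes = []
    for line in output.splitlines():
        if not line.strip():
            continue
        fields = line.split("\t")
        code = fields[0][:1]  # renames carry a similarity score, e.g. "R100" -> "R"
        status = STATUS_NAMES.get(code, "other")
        if code in ("R", "C") and len(fields) >= 3:
            changes.append(FileChange(status, fields[2], old_path=fields[1]))
        elif len(fields) >= 2:
            changes.append(FileChange(status, fields[1]))
    return changes


# Sample output, invented for illustration.
SAMPLE_OUTPUT = "M\tsrc/orders.py\nA\ttests/test_orders.py\nR100\tsrc/pay.py\tsrc/payment.py\n"
changes = parse_name_status(SAMPLE_OUTPUT)
for change in changes:
    print(change)
```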

The seventh component is the Meta-Reflection Layer, which evaluates the quality and confidence of the analysis itself. It identifies areas of uncertainty and suggests where human review is most needed.
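
A very reduced sketch of this idea follows, assuming each finding carries a self-reported confidence score and a count of cited source lines. The `Finding` fields, the `needs_human_review` helper, and the 0.7 threshold are all assumptions for illustration, not values prescribed by this tutorial.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Finding:
    description: str
    confidence: float    # 0.0 to 1.0, assumed to be reported by the analysis step
    evidence_lines: int  # number of source lines cited as evidence (assumed field)


def needs_human_review(finding: Finding, min_confidence: float = 0.7) -> bool:
    """Flag findings that are low-confidence or cite no concrete evidence."""
    return finding.confidence < min_confidence or finding.evidence_lines == 0


# Sample findings, invented for illustration.
findings: List[Finding] = [
    Finding("SQL built via string concatenation", 0.92, 3),
    Finding("Possible race condition in cache refresh", 0.55, 4),
    Finding("Module violates layering rules", 0.80, 0),
]

review_queue = [f.description for f in findings if needs_human_review(f)]
print(review_queue)
```

Treating a finding without cited evidence as uncertain, regardless of its score, is one simple guard against fabricated issues.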

The eighth component implements Chain of Thought and Tree of Thought reasoning to enable deeper analysis of complex code patterns and architectural decisions.

The ninth component is the Summarization System, which condenses findings across multiple files and creates hierarchical summaries at different levels of abstraction.

The tenth component is the Multithreading Coordinator, which parallelizes analysis across multiple files and modules to improve performance.
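
The sketch below shows one possible shape for such a coordinator, using `ThreadPoolExecutor` from the standard library. The `analyze_file` function is a placeholder standing in for real parsing and LLM calls, and `analyze_in_parallel` is a hypothetical name. Threads fit here mainly because LLM requests are I/O-bound; CPU-bound parsing in CPython gains little from threads.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List


def analyze_file(path: str) -> Dict[str, object]:
    """Placeholder analysis; a real implementation would parse the file and query an LLM."""
    return {"path": path, "is_test": path.startswith("tests/")}


def analyze_in_parallel(paths: List[str], max_workers: int = 4) -> Dict[str, Dict[str, object]]:
    """Run analyze_file over all paths concurrently and collect results by path."""
    results: Dict[str, Dict[str, object]] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(analyze_file, path): path for path in paths}
        for future in as_completed(futures):
            path = futures[future]
            try:
                results[path] = future.result()
            except Exception as exc:
                # Record the failure so one bad file does not abort the whole run.
                results[path] = {"path": path, "error": str(exc)}
    return results


results = analyze_in_parallel(["src/app.py", "src/db.py", "tests/test_app.py"])
for path in sorted(results):
    print(path, results[path])
```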

Finally, the Orchestration Layer coordinates all components, manages parallel processing of multiple files, aggregates results, and generates the final comprehensive report.

Now let us proceed with detailed implementation of each component.

COMPONENT ONE: USER INTERACTION AGENT AND CONTEXT GATHERING

The User Interaction Agent serves as the entry point for our code analysis system. Its primary responsibility is to gather all available context that will inform the analysis process. This agent must be conversational yet thorough, ensuring no critical information is overlooked.

The agent begins by asking about requirements documentation. Requirements provide the "why" behind the code, allowing the analysis to evaluate whether the implementation actually fulfills its intended purpose. Without requirements, the analysis can only evaluate code quality in isolation, missing potential misalignments between intent and implementation.

Next, the agent inquires about business goals. Business goals provide strategic context that helps prioritize findings. A performance issue might be critical for a high-frequency trading system but acceptable for an internal reporting tool. Understanding business goals allows the analysis to weight findings appropriately.
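
The weighting can be sketched as a simple multiplier per finding category. The profile names, categories, and weights below are invented for illustration; a real system would derive them from the business goals the user provides.

```python
from typing import Dict

# Hypothetical weights per finding category for two business profiles.
GOAL_WEIGHTS: Dict[str, Dict[str, float]] = {
    "high_frequency_trading": {"performance": 3.0, "security": 2.0, "maintainability": 1.0},
    "internal_reporting": {"performance": 0.5, "security": 1.5, "maintainability": 2.0},
}


def prioritize(base_severity: float, category: str, profile: str) -> float:
    """Scale a finding's severity by how much the business profile cares about its category."""
    # Unknown profiles or categories fall back to a neutral weight of 1.0.
    weight = GOAL_WEIGHTS.get(profile, {}).get(category, 1.0)
    return base_severity * weight


# The same performance finding ranks very differently under the two profiles.
print(prioritize(5.0, "performance", "high_frequency_trading"))
print(prioritize(5.0, "performance", "internal_reporting"))
```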

The agent then asks about architectural documentation. This includes high-level system design, component diagrams, deployment architecture, and most importantly, Architectural Decision Records. ADRs document why certain architectural choices were made, what alternatives were considered, and what trade-offs were accepted. This historical context prevents the analysis from flagging intentional design decisions as issues.

Following architecture, the agent requests information about architectural principles and guidelines. These might include adherence to SOLID principles, specific design patterns mandated by the organization, microservices guidelines, or domain-driven design principles. The analysis must evaluate code against these specific standards rather than generic best practices.

The agent also asks about coding conventions. These are language-specific and project-specific rules about naming, formatting, documentation, error handling, and testing. While some conventions can be checked by linters, others require semantic understanding that LLMs can provide.

Finally, the agent requests the location of the code to analyze. This could be a single file path, a directory, or a Git repository URL with optional branch specification.

Here is the implementation of the User Interaction Agent:

import os
import json
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from enum import Enum


class ContextType(Enum):
    REQUIREMENTS = "requirements"
    BUSINESS_GOALS = "business_goals"
    ARCHITECTURE = "architecture"
    PRINCIPLES = "principles"
    CONVENTIONS = "conventions"
    CODE_LOCATION = "code_location"


@dataclass
class AnalysisContext:
    requirements: Optional[str] = None
    business_goals: Optional[str] = None
    architecture_docs: Optional[str] = None
    architectural_principles: Optional[List[str]] = None
    coding_conventions: Optional[str] = None
    code_location: Optional[str] = None
    repository_url: Optional[str] = None
    branch: Optional[str] = None
    is_git_repo: bool = False
    has_requirements: bool = False
    has_business_goals: bool = False
    has_architecture: bool = False
    has_principles: bool = False
    has_conventions: bool = False
    
    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)
    
    def get_missing_context(self) -> List[str]:
        missing = []
        if not self.has_requirements:
            missing.append("Requirements Documentation")
        if not self.has_business_goals:
            missing.append("Business Goals")
        if not self.has_architecture:
            missing.append("Architecture Documentation")
        if not self.has_principles:
            missing.append("Architectural Principles and Guidelines")
        if not self.has_conventions:
            missing.append("Coding Conventions")
        return missing


class UserInteractionAgent:
    def __init__(self):
        self.context = AnalysisContext()
        self.conversation_history = []
        
    def start_interaction(self) -> AnalysisContext:
        print("=" * 80)
        print("CODE ANALYSIS SYSTEM - CONTEXT GATHERING")
        print("=" * 80)
        print()
        print("Welcome to the Code Analysis System. To provide the most accurate and")
        print("relevant analysis, I need to gather information about your project context.")
        print("This will help me understand not just the code, but also its purpose,")
        print("constraints, and quality expectations.")
        print()
        
        self._gather_requirements()
        self._gather_business_goals()
        self._gather_architecture()
        self._gather_principles()
        self._gather_conventions()
        self._gather_code_location()
        
        self._display_context_summary()
        self._warn_about_missing_context()
        
        return self.context
    
    def _gather_requirements(self):
        print("-" * 80)
        print("REQUIREMENTS DOCUMENTATION")
        print("-" * 80)
        print("Requirements documentation describes what the software should do and why.")
        print("This includes functional requirements, non-functional requirements,")
        print("constraints, and acceptance criteria. Having this information allows")
        print("the analysis to verify that the code actually implements what was intended.")
        print()
        
        response = input("Do you have requirements documentation available? (yes/no): ").strip().lower()
        
        if response in ['yes', 'y']:
            self.context.has_requirements = True
            print()
            print("Please provide the requirements documentation:")
            print("  Option 1: Enter a file path to requirements document")
            print("  Option 2: Enter a URL to requirements")
            print("  Option 3: Type 'paste' to paste requirements text directly")
            print()
            
            req_input = input("Your choice: ").strip()
            
            if req_input.lower() == 'paste':
                print("Paste your requirements (press Ctrl+D on Unix, or Ctrl+Z then Enter on Windows, when done):")
                lines = []
                try:
                    while True:
                        line = input()
                        lines.append(line)
                except EOFError:
                    pass
                self.context.requirements = "\n".join(lines)
            elif os.path.isfile(req_input):
                with open(req_input, 'r', encoding='utf-8') as f:
                    self.context.requirements = f.read()
                print(f"Requirements loaded from {req_input}")
            else:
                self.context.requirements = req_input
                print("Requirements location recorded")
        else:
            self.context.has_requirements = False
            print()
            print("NOTE: Without requirements documentation, the analysis will evaluate")
            print("code quality, structure, and best practices, but cannot verify whether")
            print("the implementation fulfills its intended purpose.")
        
        print()
    
    def _gather_business_goals(self):
        print("-" * 80)
        print("BUSINESS GOALS")
        print("-" * 80)
        print("Business goals provide strategic context for the software. They help")
        print("prioritize findings based on what matters most to your organization.")
        print("For example, if time-to-market is critical, certain architectural")
        print("trade-offs might be acceptable that wouldn't be in a long-term product.")
        print()
        
        response = input("Do you have documented business goals? (yes/no): ").strip().lower()
        
        if response in ['yes', 'y']:
            self.context.has_business_goals = True
            print()
            print("Please describe the key business goals (one per line, press Enter twice when done):")
            goals = []
            while True:
                goal = input().strip()
                if not goal:
                    break
                goals.append(goal)
            self.context.business_goals = "\n".join(goals)
        else:
            self.context.has_business_goals = False
            print()
            print("NOTE: Without business goals, all findings will be weighted equally")
            print("based on general software engineering best practices.")
        
        print()
    
    def _gather_architecture(self):
        print("-" * 80)
        print("ARCHITECTURE DOCUMENTATION")
        print("-" * 80)
        print("Architecture documentation describes the high-level structure of your")
        print("system, including components, their responsibilities, and interactions.")
        print("Most importantly, Architectural Decision Records (ADRs) document why")
        print("certain design choices were made, preventing the analysis from flagging")
        print("intentional decisions as problems.")
        print()
        
        response = input("Do you have architecture documentation or ADRs? (yes/no): ").strip().lower()
        
        if response in ['yes', 'y']:
            self.context.has_architecture = True
            print()
            print("Please provide architecture documentation:")
            print("  Option 1: Enter a directory path containing ADRs")
            print("  Option 2: Enter a file path to architecture document")
            print("  Option 3: Type 'describe' to describe architecture briefly")
            print()
            
            arch_input = input("Your choice: ").strip()
            
            if arch_input.lower() == 'describe':
                print("Please describe your architecture (press Enter twice when done):")
                lines = []
                while True:
                    line = input().strip()
                    if not line:
                        break
                    lines.append(line)
                self.context.architecture_docs = "\n".join(lines)
            elif os.path.isdir(arch_input):
                adr_files = []
                for root, dirs, files in os.walk(arch_input):
                    for file in files:
                        if file.endswith(('.md', '.txt', '.adoc')):
                            adr_files.append(os.path.join(root, file))
                self.context.architecture_docs = f"ADR directory: {arch_input} ({len(adr_files)} documents)"
                print(f"Found {len(adr_files)} architecture documents")
            elif os.path.exists(arch_input):
                with open(arch_input, 'r', encoding='utf-8') as f:
                    self.context.architecture_docs = f.read()
                print(f"Architecture documentation loaded from {arch_input}")
            else:
                self.context.architecture_docs = arch_input
        else:
            self.context.has_architecture = False
            print()
            print("NOTE: Without architecture documentation, the analysis cannot")
            print("distinguish between intentional design decisions and potential issues.")
        
        print()
    
    def _gather_principles(self):
        print("-" * 80)
        print("ARCHITECTURAL PRINCIPLES AND GUIDELINES")
        print("-" * 80)
        print("Architectural principles are the rules and guidelines your team follows.")
        print("These might include SOLID principles, specific design patterns,")
        print("microservices guidelines, domain-driven design principles, or")
        print("organization-specific standards.")
        print()
        
        response = input("Do you follow specific architectural principles? (yes/no): ").strip().lower()
        
        if response in ['yes', 'y']:
            self.context.has_principles = True
            print()
            print("Please list your architectural principles (one per line, press Enter twice when done):")
            print("Examples: SOLID, DRY, KISS, Microservices patterns, DDD, Clean Architecture")
            principles = []
            while True:
                principle = input().strip()
                if not principle:
                    break
                principles.append(principle)
            self.context.architectural_principles = principles
            print(f"Recorded {len(principles)} architectural principles")
        else:
            self.context.has_principles = False
            print()
            print("NOTE: The analysis will use general software engineering best practices")
            print("rather than project-specific guidelines.")
        
        print()
    
    def _gather_conventions(self):
        print("-" * 80)
        print("CODING CONVENTIONS")
        print("-" * 80)
        print("Coding conventions define how code should be written in your project.")
        print("This includes naming conventions, formatting rules, documentation")
        print("standards, error handling patterns, and testing requirements.")
        print()
        
        response = input("Do you have documented coding conventions? (yes/no): ").strip().lower()
        
        if response in ['yes', 'y']:
            self.context.has_conventions = True
            print()
            print("Please provide coding conventions:")
            print("  Option 1: Enter a file path to conventions document")
            print("  Option 2: Type 'describe' to describe key conventions")
            print()
            
            conv_input = input("Your choice: ").strip()
            
            if conv_input.lower() == 'describe':
                print("Please describe key coding conventions (press Enter twice when done):")
                lines = []
                while True:
                    line = input().strip()
                    if not line:
                        break
                    lines.append(line)
                self.context.coding_conventions = "\n".join(lines)
            elif os.path.isfile(conv_input):
                with open(conv_input, 'r', encoding='utf-8') as f:
                    self.context.coding_conventions = f.read()
                print(f"Coding conventions loaded from {conv_input}")
            else:
                self.context.coding_conventions = conv_input
        else:
            self.context.has_conventions = False
            print()
            print("NOTE: The analysis will use language-specific standard conventions")
            print("rather than your project-specific rules.")
        
        print()
    
    def _gather_code_location(self):
        print("-" * 80)
        print("CODE LOCATION")
        print("-" * 80)
        print("Please specify what code should be analyzed.")
        print()
        
        print("What would you like to analyze?")
        print("  1. A single file")
        print("  2. A directory")
        print("  3. A Git repository (local)")
        print("  4. A Git repository (remote URL)")
        print()
        
        choice = input("Enter choice (1-4): ").strip()
        
        if choice == '1':
            file_path = input("Enter file path: ").strip()
            if os.path.isfile(file_path):
                self.context.code_location = file_path
                self.context.is_git_repo = False
                print(f"Will analyze file: {file_path}")
            else:
                print(f"ERROR: File not found: {file_path}")
                self._gather_code_location()
        
        elif choice == '2':
            dir_path = input("Enter directory path: ").strip()
            if os.path.isdir(dir_path):
                self.context.code_location = dir_path
                if os.path.isdir(os.path.join(dir_path, '.git')):
                    self.context.is_git_repo = True
                    print(f"Will analyze Git repository at: {dir_path}")
                else:
                    self.context.is_git_repo = False
                    print(f"Will analyze directory: {dir_path}")
            else:
                print(f"ERROR: Directory not found: {dir_path}")
                self._gather_code_location()
        
        elif choice == '3':
            repo_path = input("Enter local Git repository path: ").strip()
            if os.path.isdir(os.path.join(repo_path, '.git')):
                self.context.code_location = repo_path
                self.context.is_git_repo = True
                branch = input("Enter branch name (or press Enter for current branch): ").strip()
                if branch:
                    self.context.branch = branch
                print(f"Will analyze Git repository at: {repo_path}")
            else:
                print(f"ERROR: Not a Git repository: {repo_path}")
                self._gather_code_location()
        
        elif choice == '4':
            repo_url = input("Enter Git repository URL: ").strip()
            self.context.repository_url = repo_url
            self.context.is_git_repo = True
            branch = input("Enter branch name (or press Enter for default branch): ").strip()
            if branch:
                self.context.branch = branch
            print(f"Will clone and analyze repository: {repo_url}")
        
        else:
            print("Invalid choice. Please try again.")
            self._gather_code_location()
        
        print()
    
    def _display_context_summary(self):
        print("=" * 80)
        print("CONTEXT SUMMARY")
        print("=" * 80)
        print()
        print("The following context has been gathered:")
        print()
        
        if self.context.has_requirements:
            print("[X] Requirements Documentation - Available")
        else:
            print("[ ] Requirements Documentation - Not Available")
        
        if self.context.has_business_goals:
            print("[X] Business Goals - Available")
        else:
            print("[ ] Business Goals - Not Available")
        
        if self.context.has_architecture:
            print("[X] Architecture Documentation - Available")
        else:
            print("[ ] Architecture Documentation - Not Available")
        
        if self.context.has_principles:
            print(f"[X] Architectural Principles - {len(self.context.architectural_principles)} principles")
        else:
            print("[ ] Architectural Principles - Not Available")
        
        if self.context.has_conventions:
            print("[X] Coding Conventions - Available")
        else:
            print("[ ] Coding Conventions - Not Available")
        
        print()
        print(f"Code Location: {self.context.code_location or self.context.repository_url}")
        print(f"Repository Type: {'Git Repository' if self.context.is_git_repo else 'File/Directory'}")
        if self.context.branch:
            print(f"Branch: {self.context.branch}")
        
        print()
    
    def _warn_about_missing_context(self):
        missing = self.context.get_missing_context()
        
        if missing:
            print("=" * 80)
            print("IMPORTANT NOTICE")
            print("=" * 80)
            print()
            print("The following context is not available:")
            for item in missing:
                print(f"  - {item}")
            print()
            print("The analysis will proceed, but findings will be limited to:")
            print("  - Code quality and structure assessment")
            print("  - Detection of common anti-patterns and code smells")
            print("  - Adherence to general best practices")
            print()
            print("Without complete context, the analysis CANNOT verify:")
            print("  - Whether code fulfills intended requirements")
            print("  - Whether architectural decisions align with business goals")
            print("  - Whether code follows project-specific guidelines")
            print()
            
            proceed = input("Do you want to proceed with limited context? (yes/no): ").strip().lower()
            if proceed not in ['yes', 'y']:
                print("Analysis cancelled. Please gather additional context and try again.")
                raise SystemExit(0)
        
        print()

This User Interaction Agent implementation demonstrates several design principles. First, it separates concerns by using a dedicated data class for storing context information. The AnalysisContext class is a Python dataclass, which gives clean, type-annotated data storage (the annotations document intent and support static checkers, but are not enforced at runtime). Second, it provides clear feedback to the user about what information is available and what is missing. Third, it offers multiple input methods to accommodate different workflows, whether users have files, URLs, or want to paste content directly. Fourth, it explicitly warns users about limitations when context is incomplete, setting appropriate expectations for the analysis results.

The agent's conversational approach is intentional. Rather than presenting a form to fill out, it explains why each piece of information matters. This educational aspect helps users understand that code analysis is not just about finding syntax errors but about evaluating code in its full business and technical context.

COMPONENT TWO: RAG STORAGE FOR ARCHITECTURAL DECISION RECORDS

Retrieval-Augmented Generation is essential for overcoming the context window limitation of large language models. When analyzing large codebases, we cannot fit all relevant information into a single prompt. Instead, we must store information in a searchable knowledge base and retrieve only the relevant portions when needed.
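
Before content can be stored and retrieved in portions, it has to be split into pieces small enough to fit in a prompt. The sketch below shows one simple approach, grouping whole lines under a character budget. The function name `chunk_lines` and the budget values are assumptions; real systems often chunk by tokens or by syntactic units such as functions.

```python
from typing import List


def chunk_lines(text: str, max_chars: int = 2000) -> List[str]:
    """Group whole lines into chunks of at most `max_chars` characters.

    A single line longer than `max_chars` becomes its own oversized chunk.
    """
    chunks: List[str] = []
    current: List[str] = []
    size = 0
    for line in text.splitlines(keepends=True):
        # Start a new chunk when adding this line would exceed the budget.
        if current and size + len(line) > max_chars:
            chunks.append("".join(current))
            current, size = [], 0
        current.append(line)
        size += len(line)
    if current:
        chunks.append("".join(current))
    return chunks


# Ten 7-character lines with a 21-character budget: three lines per chunk.
document = "".join(f"line {i}\n" for i in range(10))
chunks = chunk_lines(document, max_chars=21)
print(len(chunks))
```

Splitting on line boundaries keeps each chunk readable on its own and lets the chunks be concatenated back into the original text without loss.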

The first RAG component focuses on Architectural Decision Records. ADRs are crucial because they document the reasoning behind design choices. Without this historical context, an LLM might flag an intentional architectural decision as a problem. For example, if a team deliberately chose to use a monolithic architecture instead of microservices due to team size constraints, the analysis should not recommend splitting into microservices without understanding this context.

Our ADR storage system must support several operations. First, it must ingest ADR documents in various formats including Markdown, plain text, and AsciiDoc. Second, it must create embeddings of ADR content to enable semantic search. Third, it must retrieve relevant ADRs based on the code being analyzed. Fourth, it must maintain metadata about each ADR including decision date, status, and related decisions.
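
For orientation, the sketch below shows a hypothetical ADR in a common Markdown layout (title, then Status, Context, Decision, and Consequences sections) and a small helper that splits it by level-2 headings. The sample record and the name `split_sections` are invented for this example and are independent of the parser used in the implementation.

```python
from typing import Dict, List

# Hypothetical ADR, invented for illustration.
SAMPLE_ADR = """# ADR-007: Keep the monolith

Date: 2024-03-15

## Status
Accepted

## Context
The team has four developers and a single deployment target.

## Decision
We keep a modular monolith instead of splitting into microservices.

## Consequences
Deployment stays simple; module boundaries must be enforced by review.
"""


def split_sections(markdown: str) -> Dict[str, str]:
    """Map each level-2 heading (lowercased) to the text beneath it."""
    sections: Dict[str, List[str]] = {}
    current = None
    for line in markdown.splitlines():
        if line.startswith("## "):
            current = line[3:].strip().lower()
            sections[current] = []
        elif current is not None:
            sections[current].append(line)
    return {name: "\n".join(body).strip() for name, body in sections.items()}


sections = split_sections(SAMPLE_ADR)
print(sections["status"])
print(sections["decision"])
```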

Here is the implementation of the ADR RAG system:

import os
import re
import hashlib
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime
import numpy as np


@dataclass
class ADRRecord:
    id: str
    title: str
    status: str
    date: Optional[datetime]
    context: str
    decision: str
    consequences: str
    full_text: str
    file_path: str
    embedding: Optional[np.ndarray] = None
    related_adrs: Optional[List[str]] = None
    
    def __post_init__(self):
        if self.related_adrs is None:
            self.related_adrs = []


class ADRParser:
    def __init__(self):
        # The first level-1 Markdown heading ("# Title") is treated as the ADR title.
        self.title_pattern = re.compile(r'^#(?!#)[ \t]*(.+?)[ \t]*$', re.MULTILINE)
        # A level-2 section runs from its "## Heading" line to the next
        # level-2 heading or the end of the document.
        self.section_pattern = re.compile(
            r'^##(?!#)[ \t]*([^\n]+?)[ \t]*\n(.*?)(?=^##(?!#)|\Z)',
            re.DOTALL | re.MULTILINE
        )
    
    def parse_adr_file(self, file_path: str) -> Optional[ADRRecord]:
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            
            return self.parse_adr_content(content, file_path)
        except Exception as e:
            print(f"Error parsing ADR file {file_path}: {e}")
            return None
    
    def parse_adr_content(self, content: str, file_path: str = "") -> Optional[ADRRecord]:
        # Index sections by lowercased heading so lookups are case-insensitive
        # and do not depend on the order in which sections appear.
        sections = {
            m.group(1).strip().lower(): m.group(2).strip()
            for m in self.section_pattern.finditer(content)
        }
        
        title_match = self.title_pattern.search(content)
        if title_match:
            title = title_match.group(1).strip()
        else:
            # No Markdown heading: fall back to the first non-empty line.
            stripped = content.strip()
            title = stripped.split('\n')[0].strip() if stripped else ""
            title = title or "Untitled ADR"
        
        if sections:
            status_text = sections.get("status", "")
            status = status_text.split('\n')[0].strip() if status_text else "Unknown"
            context = sections.get("context", "")
            decision = sections.get("decision", "")
            consequences = sections.get("consequences", "")
        else:
            # Unstructured document: keep the whole text as context.
            status = "Unknown"
            context = content
            decision = ""
            consequences = ""
        
        adr_id = self._generate_adr_id(title, file_path)
        
        date_match = re.search(r'(\d{4}-\d{2}-\d{2})', content)
        date = None
        if date_match:
            try:
                date = datetime.strptime(date_match.group(1), '%Y-%m-%d')
            except ValueError:
                pass
        
        related_adrs = self._extract_related_adrs(content)
        
        return ADRRecord(
            id=adr_id,
            title=title,
            status=status,
            date=date,
            context=context,
            decision=decision,
            consequences=consequences,
            full_text=content,
            file_path=file_path,
            related_adrs=related_adrs
        )
    
    def _generate_adr_id(self, title: str, file_path: str) -> str:
        content = f"{title}:{file_path}"
        return hashlib.md5(content.encode()).hexdigest()[:12]
    
    def _extract_related_adrs(self, content: str) -> List[str]:
        # "ADR-12" and "ADR 12" cover every phrasing, including
        # "relates to", "supersedes", and "superseded by".
        pattern = re.compile(r'ADR[- ](\d+)', re.IGNORECASE)
        related = {f"ADR-{match.group(1)}" for match in pattern.finditer(content)}
        
        # Sort for a deterministic result; set iteration order is arbitrary.
        return sorted(related)


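class SimpleEmbedding:
    """Toy bag-of-words hashing embedding; a placeholder for a real embedding model."""
    
    def __init__(self, dimension: int = 384):
        self.dimension = dimension
    
    def _bucket(self, word: str) -> int:
        # The built-in hash() is salted per process for strings, so a stable
        # digest is used to keep embeddings reproducible across runs.
        digest = hashlib.md5(word.encode('utf-8')).hexdigest()
        return int(digest, 16) % self.dimension
    
    def embed_text(self, text: str) -> np.ndarray:
        embedding = np.zeros(self.dimension)
        for word in text.lower().split():
            embedding[self._bucket(word)] += 1.0
        
        norm = np.linalg.norm(embedding)
        if norm > 0:
            embedding = embedding / norm
        
        return embedding
    
    def cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        # Vectors from embed_text are already unit length, so the dot
        # product equals the cosine similarity.
        return float(np.dot(vec1, vec2))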


class ADRStorage:
    def __init__(self, storage_dir: str = "./adr_storage"):
        self.storage_dir = storage_dir
        self.adrs: Dict[str, ADRRecord] = {}
        self.parser = ADRParser()
        self.embedder = SimpleEmbedding()
        
        os.makedirs(storage_dir, exist_ok=True)
    
    def ingest_adr_directory(self, directory: str) -> int:
        count = 0
        
        for root, dirs, files in os.walk(directory):
            for file in files:
                if file.endswith(('.md', '.txt', '.adoc')):
                    file_path = os.path.join(root, file)
                    if self.ingest_adr_file(file_path):
                        count += 1
        
        print(f"Ingested {count} ADR documents from {directory}")
        return count
    
    def ingest_adr_file(self, file_path: str) -> bool:
        adr = self.parser.parse_adr_file(file_path)
        
        if adr:
            adr.embedding = self._create_adr_embedding(adr)
            self.adrs[adr.id] = adr
            return True
        
        return False
    
    def ingest_adr_content(self, content: str, source: str = "inline") -> bool:
        adr = self.parser.parse_adr_content(content, source)
        
        if adr:
            adr.embedding = self._create_adr_embedding(adr)
            self.adrs[adr.id] = adr
            return True
        
        return False
    
    def _create_adr_embedding(self, adr: ADRRecord) -> np.ndarray:
        combined_text = f"{adr.title} {adr.context} {adr.decision} {adr.consequences}"
        return self.embedder.embed_text(combined_text)
    
    def retrieve_relevant_adrs(self, query: str, top_k: int = 5) -> List[Tuple[ADRRecord, float]]:
        if not self.adrs:
            return []
        
        query_embedding = self.embedder.embed_text(query)
        
        similarities = []
        for adr_id, adr in self.adrs.items():
            if adr.embedding is not None:
                similarity = self.embedder.cosine_similarity(query_embedding, adr.embedding)
                similarities.append((adr, similarity))
        
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        return similarities[:top_k]
    
    def get_adr_by_id(self, adr_id: str) -> Optional[ADRRecord]:
        return self.adrs.get(adr_id)
    
    def get_all_adrs(self) -> List[ADRRecord]:
        return list(self.adrs.values())
    
    def get_related_adrs(self, adr_id: str) -> List[ADRRecord]:
        adr = self.get_adr_by_id(adr_id)
        if not adr:
            return []
        
        related = []
        for related_id in adr.related_adrs:
            related_adr = self.get_adr_by_id(related_id)
            if related_adr:
                related.append(related_adr)
        
        return related
    
    def save_storage(self):
        import pickle
        storage_file = os.path.join(self.storage_dir, "adr_storage.pkl")
        with open(storage_file, 'wb') as f:
            pickle.dump(self.adrs, f)
        print(f"ADR storage saved to {storage_file}")
    
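    def load_storage(self):
        # Only load pickle files this tool wrote itself: unpickling untrusted
        # data can execute arbitrary code.
        import pickle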
        storage_file = os.path.join(self.storage_dir, "adr_storage.pkl")
        if os.path.exists(storage_file):
            with open(storage_file, 'rb') as f:
                self.adrs = pickle.load(f)
            print(f"Loaded {len(self.adrs)} ADRs from storage")
            return True
        return False

This ADR storage implementation provides several key capabilities. The ADRParser class handles the complexity of parsing ADR documents that may follow different formats. It uses regular expressions to extract structured sections like Status, Context, Decision, and Consequences, but also handles unstructured ADRs gracefully.

The SimpleEmbedding class provides a basic embedding mechanism: a hashed bag-of-words vector, normalized to unit length. It captures word overlap between texts rather than meaning, so it demonstrates only the mechanics of similarity-based retrieval. In a production system, you would replace it with a proper embedding model such as sentence-transformers or OpenAI embeddings, which map semantically related texts to nearby vectors even when they share no words.
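To make the behavior of such an embedding concrete, here is a minimal, self-contained sketch of a hashed bag-of-words vector. It is an illustration under stated assumptions, not the tutorial's implementation: the names `stable_bucket` and `embed` are hypothetical, and the sketch uses an MD5 digest rather than Python's built-in `hash()` so that bucket indices are identical in every interpreter run.

```python
import hashlib
import numpy as np


def stable_bucket(word: str, dimension: int) -> int:
    # Hypothetical helper: map a word to a bucket index using a digest that
    # is the same in every Python process (unlike the built-in hash()).
    digest = hashlib.md5(word.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % dimension


def embed(text: str, dimension: int = 384) -> np.ndarray:
    # Count words per bucket, then scale to unit length so that a plain
    # dot product equals cosine similarity.
    vector = np.zeros(dimension)
    for word in text.lower().split():
        vector[stable_bucket(word, dimension)] += 1.0
    norm = np.linalg.norm(vector)
    return vector / norm if norm > 0 else vector


query = embed("database transaction management")
overlapping = embed("we chose optimistic transaction management for the database")
paraphrase = embed("persistence layer commit strategy")

# The first score is driven by the three shared words; the second is zero
# unless two unrelated words happen to land in the same bucket.
print(float(np.dot(query, overlapping)))
print(float(np.dot(query, paraphrase)))
```

The paraphrase is close in meaning to the query but shares no words with it, which is exactly the case a learned embedding model handles and this scheme does not.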

The ADRStorage class manages the collection of ADRs. It can ingest ADRs from directories, individual files, or direct content. It creates embeddings for each ADR and stores them for later retrieval. The retrieve_relevant_adrs method finds ADRs semantically similar to a query, which is crucial when analyzing code. For example, when analyzing a database access layer, the system can retrieve ADRs about database technology choices, transaction management decisions, or data modeling approaches.

The system also tracks relationships between ADRs. When an ADR's text mentions another ADR, for example because one decision supersedes or relates to another, the reference is recorded in related_adrs. Only the link is stored, not the kind of relationship, but this is enough for the analysis to follow how architectural thinking evolved over time.
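The relationship tracking can be sketched in isolation as follows. This is a hedged illustration: the function names and sample documents are hypothetical, and it assumes ADRs are keyed by their number in the form `ADR-<n>`, which is the form the reference extractor produces. If records are keyed differently, a mapping from ADR number to record is needed before the lookups can resolve.

```python
import re
from typing import Dict, List, Set

# Matches "ADR-12" and "ADR 12", case-insensitively.
ADR_REF = re.compile(r"ADR[- ](\d+)", re.IGNORECASE)


def extract_references(text: str) -> Set[str]:
    # Normalize every mention ("ADR 7", "adr-7") to the form "ADR-7".
    return {f"ADR-{m.group(1)}" for m in ADR_REF.finditer(text)}


def build_reference_graph(adrs: Dict[str, str]) -> Dict[str, List[str]]:
    # Map each ADR key to the other ADRs its text mentions,
    # skipping references an ADR makes to itself.
    graph = {}
    for key, text in adrs.items():
        graph[key] = sorted(extract_references(text) - {key})
    return graph


# Hypothetical sample documents keyed by ADR number.
sample = {
    "ADR-1": "Use MySQL for persistence.",
    "ADR-2": "Use PostgreSQL instead. Supersedes ADR 1.",
    "ADR-3": "Add read replicas. Relates to ADR-2.",
}
graph = build_reference_graph(sample)
print(graph)
```

Following the edges from ADR-3 leads to ADR-2 and then to ADR-1, which reconstructs the chain of decisions behind the current database setup.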

COMPONENT THREE: RAG STORAGE FOR SOURCE CODE FILES

The second RAG component stores information about source code files themselves. While we cannot store entire large files in the LLM context, we can store metadata, summaries, and structural information that helps the LLM understand the codebase organization.

This storage system must capture several types of information for each file. First, basic metadata including file path, size, language, and last modification date. Second, a high-level summary of what the file does. Third, key entities defined in the file such as classes, functions, and constants. Fourth, dependencies on other files or external libraries. Fifth, complexity metrics like lines of code, cyclomatic complexity, and nesting depth.

The system must also support incremental updates. When a file changes, we should update only that file's information rather than reprocessing the entire codebase. This is essential for integration with the Change Agent that monitors Git operations.
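One common way to support incremental updates is to compare a content digest against the one recorded at the last ingestion and skip files whose digest is unchanged. The sketch below shows that idea on its own; `HashTracker` and its methods are hypothetical names introduced for illustration, not part of the storage class.

```python
import hashlib
from typing import Dict


class HashTracker:
    """Hypothetical helper that remembers the last seen digest per file path."""

    def __init__(self) -> None:
        self._hashes: Dict[str, str] = {}

    def needs_update(self, file_path: str, content: str) -> bool:
        # Re-process only when the content digest differs from the stored one.
        digest = hashlib.sha256(content.encode("utf-8")).hexdigest()
        if self._hashes.get(file_path) == digest:
            return False
        self._hashes[file_path] = digest
        return True

    def forget(self, file_path: str) -> None:
        # Drop the entry when a file is deleted or renamed.
        self._hashes.pop(file_path, None)


tracker = HashTracker()
print(tracker.needs_update("app/models.py", "x = 1\n"))  # True: first sighting
print(tracker.needs_update("app/models.py", "x = 1\n"))  # False: unchanged
print(tracker.needs_update("app/models.py", "x = 2\n"))  # True: content changed
```

A change-monitoring component could call `needs_update` for each path touched by a commit and re-ingest only the files for which it returns True.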

Here is the implementation of the source code file storage:

import os
import re
import hashlib
from typing import List, Dict, Optional, Set, Tuple, Any
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
import mimetypes

import numpy as np

# SimpleEmbedding is defined in the ADR storage listing above


@dataclass
class FileMetadata:
    file_path: str
    relative_path: str
    file_size: int
    language: str
    last_modified: datetime
    content_hash: str
    lines_of_code: int
    blank_lines: int
    comment_lines: int
    
    
@dataclass
class CodeEntity:
    name: str
    entity_type: str
    line_number: int
    end_line: int
    signature: str
    docstring: Optional[str] = None
    complexity: int = 0
    parameters: List[str] = field(default_factory=list)
    return_type: Optional[str] = None


@dataclass
class FileDependency:
    dependency_type: str
    target: str
    line_number: int
    is_external: bool


@dataclass
class CodeFileRecord:
    metadata: FileMetadata
    summary: str
    entities: List[CodeEntity]
    dependencies: List[FileDependency]
    imports: List[str]
    exports: List[str]
    complexity_score: float
    maintainability_index: float
    embedding: Optional[np.ndarray] = None
    issues: List[str] = field(default_factory=list)
    
    def to_summary_text(self) -> str:
        text_parts = [
            f"File: {self.metadata.relative_path}",
            f"Language: {self.metadata.language}",
            f"Summary: {self.summary}",
            f"Lines of Code: {self.metadata.lines_of_code}",
            f"Entities: {len(self.entities)} ({', '.join(set(e.entity_type for e in self.entities))})",
            f"Dependencies: {len(self.dependencies)}",
            f"Complexity: {self.complexity_score:.2f}",
            f"Maintainability: {self.maintainability_index:.2f}"
        ]
        return "\n".join(text_parts)


class LanguageDetector:
    EXTENSION_MAP = {
        '.py': 'Python',
        '.js': 'JavaScript',
        '.ts': 'TypeScript',
        '.jsx': 'JavaScript',
        '.tsx': 'TypeScript',
        '.html': 'HTML',
        '.htm': 'HTML',
        '.css': 'CSS',
        '.java': 'Java',
        '.cpp': 'C++',
        '.c': 'C',
        '.h': 'C/C++ Header',
        '.cs': 'C#',
        '.go': 'Go',
        '.rs': 'Rust',
        '.rb': 'Ruby',
        '.php': 'PHP',
        '.swift': 'Swift',
        '.kt': 'Kotlin',
        '.scala': 'Scala',
        '.sql': 'SQL',
        '.sh': 'Shell',
        '.bash': 'Bash',
        '.xml': 'XML',
        '.json': 'JSON',
        '.yaml': 'YAML',
        '.yml': 'YAML',
        '.toml': 'TOML',
        '.ini': 'INI',
        '.conf': 'Config',
        '.md': 'Markdown',
        '.rst': 'reStructuredText'
    }
    
    @classmethod
    def detect_language(cls, file_path: str) -> str:
        ext = os.path.splitext(file_path)[1].lower()
        return cls.EXTENSION_MAP.get(ext, 'Unknown')


class CodeMetricsCalculator:
    @staticmethod
    def calculate_loc_metrics(content: str, language: str) -> Dict[str, int]:
        lines = content.split('\n')
        total_lines = len(lines)
        blank_lines = 0
        comment_lines = 0
        code_lines = 0
        
        comment_patterns = {
            'Python': ('#',),
            'JavaScript': ('//', '/*', '*'),
            'TypeScript': ('//', '/*', '*'),
            'Java': ('//', '/*', '*'),
            'C++': ('//', '/*', '*'),
            'C': ('//', '/*', '*'),
            'C#': ('//', '/*', '*'),
            'HTML': ('<!--',),
            'CSS': ('/*',),
            'SQL': ('--', '/*'),
            'Shell': ('#',),
            'Bash': ('#',),
        }
        
        patterns = comment_patterns.get(language, ('#', '//', '/*'))
        
        in_multiline_comment = False
        
        for line in lines:
            stripped = line.strip()
            
            if not stripped:
                blank_lines += 1
                continue
            
            if language in ['JavaScript', 'TypeScript', 'Java', 'C++', 'C', 'C#', 'CSS']:
                if '/*' in stripped:
                    in_multiline_comment = True
                if in_multiline_comment:
                    comment_lines += 1
                    if '*/' in stripped:
                        in_multiline_comment = False
                    continue
            
            is_comment = False
            for pattern in patterns:
                if stripped.startswith(pattern):
                    is_comment = True
                    break
            
            if is_comment:
                comment_lines += 1
            else:
                code_lines += 1
        
        return {
            'total': total_lines,
            'code': code_lines,
            'blank': blank_lines,
            'comment': comment_lines
        }
    
    @staticmethod
    def calculate_complexity_score(entities: List[CodeEntity]) -> float:
        if not entities:
            return 0.0
        
        total_complexity = sum(e.complexity for e in entities)
        avg_complexity = total_complexity / len(entities)
        
        return avg_complexity
    
    @staticmethod
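    def calculate_maintainability_index(loc: int, complexity: float, comment_ratio: float) -> float:
        if loc == 0:
            return 100.0
        
        # Rough stand-in for Halstead volume, which this simple pass does not compute
        volume = loc * 4.2
        
        mi = 171 - 5.2 * np.log(volume) - 0.23 * complexity - 16.2 * np.log(loc)
        # Comment bonus; comment_ratio is a fraction between 0 and 1
        mi = mi + (50 * np.sin(np.sqrt(2.4 * comment_ratio)))
        
        # Clamp to the 0-100 range and return a plain Python float
        return float(max(0.0, min(100.0, mi)))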


class SourceCodeStorage:
    def __init__(self, storage_dir: str = "./code_storage"):
        self.storage_dir = storage_dir
        self.files: Dict[str, CodeFileRecord] = {}
        self.embedder = SimpleEmbedding()
        self.root_path: Optional[str] = None
        
        os.makedirs(storage_dir, exist_ok=True)
    
    def set_root_path(self, root_path: str):
        self.root_path = os.path.abspath(root_path)
    
    def ingest_directory(self, directory: str, extensions: Optional[Set[str]] = None) -> int:
        self.set_root_path(directory)
        count = 0
        
        if extensions is None:
            extensions = {'.py', '.js', '.ts', '.jsx', '.tsx', '.html', '.css', 
                         '.java', '.cpp', '.c', '.cs', '.go', '.rs', '.rb', '.php'}
        
        for root, dirs, files in os.walk(directory):
            dirs[:] = [d for d in dirs if not d.startswith('.') and d not in ['node_modules', 'venv', '__pycache__']]
            
            for file in files:
                ext = os.path.splitext(file)[1].lower()
                if ext in extensions:
                    file_path = os.path.join(root, file)
                    if self.ingest_file(file_path):
                        count += 1
        
        print(f"Ingested {count} source code files from {directory}")
        return count
    
    def ingest_file(self, file_path: str) -> bool:
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            
            return self.ingest_file_content(file_path, content)
        except Exception as e:
            print(f"Error ingesting file {file_path}: {e}")
            return False
    
    def ingest_file_content(self, file_path: str, content: str) -> bool:
        try:
            language = LanguageDetector.detect_language(file_path)
            
            if self.root_path:
                relative_path = os.path.relpath(file_path, self.root_path)
            else:
                relative_path = file_path
            
            content_hash = hashlib.md5(content.encode()).hexdigest()
            
            loc_metrics = CodeMetricsCalculator.calculate_loc_metrics(content, language)
            
            stat = os.stat(file_path) if os.path.exists(file_path) else None
            
            metadata = FileMetadata(
                file_path=file_path,
                relative_path=relative_path,
                file_size=stat.st_size if stat else len(content.encode('utf-8')),
                language=language,
                last_modified=datetime.fromtimestamp(stat.st_mtime) if stat else datetime.now(),
                content_hash=content_hash,
                lines_of_code=loc_metrics['code'],
                blank_lines=loc_metrics['blank'],
                comment_lines=loc_metrics['comment']
            )
            
            entities = self._extract_entities_simple(content, language)
            dependencies = self._extract_dependencies_simple(content, language)
            imports = self._extract_imports(content, language)
            
            complexity = CodeMetricsCalculator.calculate_complexity_score(entities)
            
            comment_ratio = loc_metrics['comment'] / max(1, loc_metrics['total'])
            maintainability = CodeMetricsCalculator.calculate_maintainability_index(
                loc_metrics['code'], complexity, comment_ratio
            )
            
            summary = self._generate_summary(metadata, entities, dependencies)
            
            record = CodeFileRecord(
                metadata=metadata,
                summary=summary,
                entities=entities,
                dependencies=dependencies,
                imports=imports,
                exports=[],
                complexity_score=complexity,
                maintainability_index=maintainability
            )
            
            record.embedding = self.embedder.embed_text(record.to_summary_text())
            
            self.files[file_path] = record
            
            return True
        except Exception as e:
            print(f"Error processing file content for {file_path}: {e}")
            return False
    
    def _extract_entities_simple(self, content: str, language: str) -> List[CodeEntity]:
        entities = []
        
        if language == 'Python':
            entities.extend(self._extract_python_entities(content))
        elif language in ['JavaScript', 'TypeScript']:
            entities.extend(self._extract_js_entities(content))
        
        return entities
    
    def _extract_python_entities(self, content: str) -> List[CodeEntity]:
        entities = []
        lines = content.split('\n')
        
        class_pattern = re.compile(r'^\s*class\s+(\w+).*:')
        # Also match coroutine definitions ("async def")
        func_pattern = re.compile(r'^\s*(?:async\s+)?def\s+(\w+)\s*\((.*?)\)')
        
        for i, line in enumerate(lines):
            class_match = class_pattern.match(line)
            if class_match:
                entities.append(CodeEntity(
                    name=class_match.group(1),
                    entity_type='class',
                    line_number=i + 1,
                    end_line=i + 1,
                    signature=line.strip(),
                    complexity=1
                ))
            
            func_match = func_pattern.match(line)
            if func_match:
                params = [p.strip() for p in func_match.group(2).split(',') if p.strip()]
                entities.append(CodeEntity(
                    name=func_match.group(1),
                    entity_type='function',
                    line_number=i + 1,
                    end_line=i + 1,
                    signature=line.strip(),
                    parameters=params,
                    complexity=1
                ))
        
        return entities
    
    def _extract_js_entities(self, content: str) -> List[CodeEntity]:
        entities = []
        lines = content.split('\n')
        
        class_pattern = re.compile(r'^\s*class\s+(\w+)')
        func_pattern = re.compile(r'^\s*(?:function\s+(\w+)|const\s+(\w+)\s*=\s*(?:async\s*)?\()')
        
        for i, line in enumerate(lines):
            class_match = class_pattern.match(line)
            if class_match:
                entities.append(CodeEntity(
                    name=class_match.group(1),
                    entity_type='class',
                    line_number=i + 1,
                    end_line=i + 1,
                    signature=line.strip(),
                    complexity=1
                ))
            
            func_match = func_pattern.match(line)
            if func_match:
                name = func_match.group(1) or func_match.group(2)
                if name:
                    entities.append(CodeEntity(
                        name=name,
                        entity_type='function',
                        line_number=i + 1,
                        end_line=i + 1,
                        signature=line.strip(),
                        complexity=1
                    ))
        
        return entities
    
    def _extract_dependencies_simple(self, content: str, language: str) -> List[FileDependency]:
        dependencies = []
        lines = content.split('\n')
        
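        if language == 'Python':
            import_pattern = re.compile(r'^\s*(?:from\s+(\S+)\s+)?import\s+(.+)')
            for i, line in enumerate(lines):
                match = import_pattern.match(line)
                if not match:
                    continue
                if match.group(1):
                    # "from pkg import a, b" is one dependency on pkg
                    modules = [match.group(1)]
                else:
                    # "import a as x, b" is one dependency per module; drop
                    # aliases and any trailing comment
                    parts = match.group(2).split('#')[0].split(',')
                    modules = [p.split()[0] for p in parts if p.strip()]
                for module in modules:
                    # Heuristic: only relative imports count as internal
                    is_external = not module.startswith('.')
                    dependencies.append(FileDependency(
                        dependency_type='import',
                        target=module,
                        line_number=i + 1,
                        is_external=is_external
                    ))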
        
        elif language in ['JavaScript', 'TypeScript']:
            import_pattern = re.compile(r'^\s*import\s+.*?from\s+[\'"](.+?)[\'"]')
            require_pattern = re.compile(r'require\([\'"](.+?)[\'"]\)')
            
            for i, line in enumerate(lines):
                import_match = import_pattern.match(line)
                if import_match:
                    module = import_match.group(1)
                    is_external = not module.startswith('.')
                    dependencies.append(FileDependency(
                        dependency_type='import',
                        target=module,
                        line_number=i + 1,
                        is_external=is_external
                    ))
                
                require_matches = require_pattern.finditer(line)
                for match in require_matches:
                    module = match.group(1)
                    is_external = not module.startswith('.')
                    dependencies.append(FileDependency(
                        dependency_type='require',
                        target=module,
                        line_number=i + 1,
                        is_external=is_external
                    ))
        
        return dependencies
    
    def _extract_imports(self, content: str, language: str) -> List[str]:
        imports = []
        
        for dep in self._extract_dependencies_simple(content, language):
            imports.append(dep.target)
        
        return list(set(imports))
    
    def _generate_summary(self, metadata: FileMetadata, entities: List[CodeEntity], 
                         dependencies: List[FileDependency]) -> str:
        entity_types = {}
        for entity in entities:
            entity_types[entity.entity_type] = entity_types.get(entity.entity_type, 0) + 1
        
        entity_desc = ', '.join([f"{count} {etype}(s)" for etype, count in entity_types.items()])
        
        dep_count = len(dependencies)
        
        summary = f"A {metadata.language} file with {metadata.lines_of_code} lines of code. "
        if entity_desc:
            summary += f"Contains {entity_desc}. "
        if dep_count > 0:
            summary += f"Has {dep_count} dependencies."
        
        return summary
    
    def retrieve_relevant_files(self, query: str, top_k: int = 10) -> List[Tuple[CodeFileRecord, float]]:
        if not self.files:
            return []
        
        query_embedding = self.embedder.embed_text(query)
        
        similarities = []
        for file_path, record in self.files.items():
            if record.embedding is not None:
                similarity = self.embedder.cosine_similarity(query_embedding, record.embedding)
                similarities.append((record, similarity))
        
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        return similarities[:top_k]
    
    def get_file_by_path(self, file_path: str) -> Optional[CodeFileRecord]:
        return self.files.get(file_path)
    
    def get_files_by_language(self, language: str) -> List[CodeFileRecord]:
        return [record for record in self.files.values() 
                if record.metadata.language == language]
    
    def get_all_files(self) -> List[CodeFileRecord]:
        return list(self.files.values())
    
    def update_file(self, file_path: str, new_content: str) -> bool:
        return self.ingest_file_content(file_path, new_content)
    
    def save_storage(self):
        import pickle
        storage_file = os.path.join(self.storage_dir, "code_storage.pkl")
        with open(storage_file, 'wb') as f:
            pickle.dump(self.files, f)
        print(f"Code storage saved to {storage_file}")
    
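    def load_storage(self):
        # Only load pickle files this tool wrote itself: unpickling untrusted
        # data can execute arbitrary code.
        import pickle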
        storage_file = os.path.join(self.storage_dir, "code_storage.pkl")
        if os.path.exists(storage_file):
            with open(storage_file, 'rb') as f:
                self.files = pickle.load(f)
            print(f"Loaded {len(self.files)} code files from storage")
            return True
        return False

This source code storage implementation provides comprehensive file-level analysis capabilities. The LanguageDetector identifies programming languages based on file extensions, supporting a wide range of languages. The CodeMetricsCalculator computes various metrics including lines of code broken down by type, complexity scores, and maintainability indices.

The entity extraction methods use regular expressions to identify classes and functions. In a production system, these would be replaced with proper Abstract Syntax Tree parsing, which we will implement in a later component. However, this regex-based approach demonstrates the concept and works reasonably well for simple cases.

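The system distinguishes internal from external dependencies. Internal dependencies are imports from other files in the same project, while external dependencies are third-party libraries. The simple extractor approximates this by treating relative imports (module paths starting with a dot) as internal and everything else as external, so absolute imports of project packages and standard-library modules are both labeled external. This distinction is important for understanding the project's coupling and external risk surface.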
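A somewhat more accurate classification for Python checks the first component of an absolute import against the names that are importable from the project root. The following is a minimal sketch under simplifying assumptions: `find_local_packages` and `is_internal` are hypothetical helpers, and the approach ignores `src/` layouts, namespace packages, and the standard library.

```python
import os
import tempfile
from typing import Set


def find_local_packages(project_root: str) -> Set[str]:
    # Top-level names importable from the project root: packages
    # (directories containing __init__.py) and single-file modules.
    names = set()
    for entry in os.listdir(project_root):
        full = os.path.join(project_root, entry)
        if os.path.isdir(full) and os.path.exists(os.path.join(full, "__init__.py")):
            names.add(entry)
        elif entry.endswith(".py"):
            names.add(entry[:-3])
    return names


def is_internal(module: str, local_names: Set[str]) -> bool:
    # Relative imports are always internal; absolute imports are internal
    # when their first component is a name found in the project root.
    if module.startswith("."):
        return True
    return module.split(".")[0] in local_names


# Build a throwaway project layout to demonstrate the classification.
with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, "billing"))
    open(os.path.join(root, "billing", "__init__.py"), "w").close()
    open(os.path.join(root, "utils.py"), "w").close()
    local = find_local_packages(root)

for module in ["billing.invoices", ".helpers", "utils", "requests"]:
    print(module, "internal" if is_internal(module, local) else "external")
```

Here `billing.invoices`, `.helpers`, and `utils` are classified as internal and `requests` as external, whereas a dot-prefix check alone would mark everything except `.helpers` as external.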

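The maintainability index calculation uses a simplified version of the classic maintainability index formula, in the variant that includes a comment term. Because this pass does not compute Halstead volume, it approximates volume as a fixed multiple of the lines of code. The metric combines code volume, complexity, and comment ratio, and the result is clamped to a score from zero to one hundred, where higher scores indicate more maintainable code.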
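To get a feel for how the score behaves, the short sketch below re-implements the same formula with the standard `math` module and evaluates it for three made-up files. The input values are illustrative assumptions, not measurements from a real project.

```python
import math


def maintainability_index(loc: int, complexity: float, comment_ratio: float) -> float:
    # Same structure as the listing above: volume is approximated as
    # 4.2 * loc (a stand-in for Halstead volume) and the result is clamped.
    if loc == 0:
        return 100.0
    volume = loc * 4.2
    mi = 171 - 5.2 * math.log(volume) - 0.23 * complexity - 16.2 * math.log(loc)
    mi += 50 * math.sin(math.sqrt(2.4 * comment_ratio))
    return max(0.0, min(100.0, mi))


# Hypothetical files: (lines of code, average complexity, comment ratio)
small = maintainability_index(loc=50, complexity=2.0, comment_ratio=0.2)
medium = maintainability_index(loc=300, complexity=5.0, comment_ratio=0.1)
large = maintainability_index(loc=2000, complexity=15.0, comment_ratio=0.02)

print(f"small, well-commented file: {small:.1f}")
print(f"medium file:                {medium:.1f}")
print(f"large, complex file:        {large:.1f}")
```

Two properties are worth noting. The logarithmic terms are dominated by file length, so long files score low almost regardless of their complexity. The comment term can also push short files past one hundred before clamping, so the score stops discriminating between small files.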

COMPONENT FOUR: RAG STORAGE FOR PROJECT CONTEXT

The third RAG component stores broader project context including domain knowledge, meeting notes, architectural principles, and coding guidelines. This information provides the semantic context that helps the LLM understand not just what the code does, but why it exists and how it should evolve.

Domain knowledge is particularly important. A function named calculate_premium might be perfectly clear to insurance domain experts but opaque to the LLM without context. Storing domain glossaries, business process descriptions, and domain models allows the analysis to use correct terminology and understand business logic.

Meeting notes capture decisions made outside of formal documentation. Often, important architectural choices are discussed in meetings but never written down formally. By indexing meeting notes, we can retrieve relevant discussions when analyzing related code.

Architectural principles define the rules and guidelines that code should follow. These might be general principles like SOLID or project-specific rules like "all database access must go through the repository layer" or "services must be stateless."

Here is the implementation of project context storage:

import os
import re
from typing import List, Dict, Optional, Tuple, Set, Any
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

import numpy as np

# SimpleEmbedding is defined in the ADR storage listing above


class ContextType(Enum):
    DOMAIN_KNOWLEDGE = "domain_knowledge"
    MEETING_NOTES = "meeting_notes"
    ARCHITECTURAL_PRINCIPLE = "architectural_principle"
    CODING_GUIDELINE = "coding_guideline"
    BUSINESS_PROCESS = "business_process"
    GLOSSARY_TERM = "glossary_term"
    REQUIREMENT = "requirement"
    CONSTRAINT = "constraint"


@dataclass
class ContextEntry:
    id: str
    context_type: ContextType
    title: str
    content: str
    created_date: datetime
    tags: List[str]
    related_entries: List[str] = field(default_factory=list)
    source: Optional[str] = None
    embedding: Optional[np.ndarray] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    def to_text(self) -> str:
        return f"{self.title}\n{self.content}"


@dataclass
class ArchitecturalPrinciple:
    name: str
    description: str
    rationale: str
    examples: List[str]
    anti_patterns: List[str]
    applicability: str
    priority: str
    
    def to_context_entry(self) -> ContextEntry:
        content = f"""
Description: {self.description}

Rationale: {self.rationale}

Applicability: {self.applicability}

Priority: {self.priority}

Examples:
{chr(10).join('- ' + ex for ex in self.examples)}

Anti-patterns to avoid:
{chr(10).join('- ' + ap for ap in self.anti_patterns)}
"""
        
        return ContextEntry(
            id=f"principle_{self.name.lower().replace(' ', '_')}",
            context_type=ContextType.ARCHITECTURAL_PRINCIPLE,
            title=self.name,
            content=content.strip(),
            created_date=datetime.now(),
            tags=['architecture', 'principle'],
            metadata={'priority': self.priority}
        )


@dataclass
class CodingGuideline:
    name: str
    description: str
    applies_to: List[str]
    good_examples: List[str]
    bad_examples: List[str]
    severity: str
    
    def to_context_entry(self) -> ContextEntry:
        content = f"""
Description: {self.description}

Applies to: {', '.join(self.applies_to)}

Severity: {self.severity}

Good Examples:
{chr(10).join('```' + chr(10) + ex + chr(10) + '```' for ex in self.good_examples)}

Bad Examples:
{chr(10).join('```' + chr(10) + ex + chr(10) + '```' for ex in self.bad_examples)}
"""
        
        return ContextEntry(
            id=f"guideline_{self.name.lower().replace(' ', '_')}",
            context_type=ContextType.CODING_GUIDELINE,
            title=self.name,
            content=content.strip(),
            created_date=datetime.now(),
            tags=['coding', 'guideline'] + self.applies_to,
            metadata={'severity': self.severity}
        )


@dataclass
class GlossaryTerm:
    term: str
    definition: str
    synonyms: List[str]
    related_terms: List[str]
    domain: str
    
    def to_context_entry(self) -> ContextEntry:
        content = f"""
Definition: {self.definition}

Domain: {self.domain}

Synonyms: {', '.join(self.synonyms) if self.synonyms else 'None'}

Related Terms: {', '.join(self.related_terms) if self.related_terms else 'None'}
"""
        
        return ContextEntry(
            id=f"term_{self.term.lower().replace(' ', '_')}",
            context_type=ContextType.GLOSSARY_TERM,
            title=self.term,
            content=content.strip(),
            created_date=datetime.now(),
            tags=['glossary', 'domain', self.domain],
            metadata={'domain': self.domain}
        )


class ProjectContextStorage:
    def __init__(self, storage_dir: str = "./context_storage"):
        self.storage_dir = storage_dir
        self.entries: Dict[str, ContextEntry] = {}
        self.embedder = SimpleEmbedding()
        self.tag_index: Dict[str, Set[str]] = {}
        self.type_index: Dict[ContextType, Set[str]] = {}
        
        os.makedirs(storage_dir, exist_ok=True)
    
    def add_entry(self, entry: ContextEntry) -> bool:
        try:
            entry.embedding = self.embedder.embed_text(entry.to_text())
            
            self.entries[entry.id] = entry
            
            for tag in entry.tags:
                if tag not in self.tag_index:
                    self.tag_index[tag] = set()
                self.tag_index[tag].add(entry.id)
            
            if entry.context_type not in self.type_index:
                self.type_index[entry.context_type] = set()
            self.type_index[entry.context_type].add(entry.id)
            
            return True
        except Exception as e:
            print(f"Error adding context entry {entry.id}: {e}")
            return False
    
    def add_principle(self, principle: ArchitecturalPrinciple) -> bool:
        entry = principle.to_context_entry()
        return self.add_entry(entry)
    
    def add_guideline(self, guideline: CodingGuideline) -> bool:
        entry = guideline.to_context_entry()
        return self.add_entry(entry)
    
    def add_glossary_term(self, term: GlossaryTerm) -> bool:
        entry = term.to_context_entry()
        return self.add_entry(entry)
    
    def add_meeting_notes(self, title: str, content: str, date: datetime, 
                         attendees: List[str], tags: List[str]) -> bool:
        entry = ContextEntry(
            id=f"meeting_{date.strftime('%Y%m%d')}_{title.lower().replace(' ', '_')}",
            context_type=ContextType.MEETING_NOTES,
            title=title,
            content=content,
            created_date=date,
            tags=tags + ['meeting'],
            metadata={'attendees': attendees}
        )
        return self.add_entry(entry)
    
    def add_domain_knowledge(self, title: str, content: str, domain: str, tags: List[str]) -> bool:
        entry = ContextEntry(
            id=f"domain_{title.lower().replace(' ', '_')}",
            context_type=ContextType.DOMAIN_KNOWLEDGE,
            title=title,
            content=content,
            created_date=datetime.now(),
            tags=tags + ['domain', domain],
            metadata={'domain': domain}
        )
        return self.add_entry(entry)
    
    def retrieve_relevant_context(self, query: str, top_k: int = 5, 
                                  context_types: Optional[List[ContextType]] = None,
                                  tags: Optional[List[str]] = None) -> List[Tuple[ContextEntry, float]]:
        candidate_ids = set(self.entries.keys())
        
        if context_types:
            type_ids = set()
            for ctx_type in context_types:
                type_ids.update(self.type_index.get(ctx_type, set()))
            candidate_ids = candidate_ids.intersection(type_ids)
        
        if tags:
            tag_ids = set()
            for tag in tags:
                tag_ids.update(self.tag_index.get(tag, set()))
            candidate_ids = candidate_ids.intersection(tag_ids)
        
        if not candidate_ids:
            return []
        
        query_embedding = self.embedder.embed_text(query)
        
        similarities = []
        for entry_id in candidate_ids:
            entry = self.entries[entry_id]
            if entry.embedding is not None:
                similarity = self.embedder.cosine_similarity(query_embedding, entry.embedding)
                similarities.append((entry, similarity))
        
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        return similarities[:top_k]
    
    def get_all_principles(self) -> List[ContextEntry]:
        principle_ids = self.type_index.get(ContextType.ARCHITECTURAL_PRINCIPLE, set())
        return [self.entries[pid] for pid in principle_ids]
    
    def get_all_guidelines(self) -> List[ContextEntry]:
        guideline_ids = self.type_index.get(ContextType.CODING_GUIDELINE, set())
        return [self.entries[gid] for gid in guideline_ids]
    
    def get_glossary(self) -> List[ContextEntry]:
        term_ids = self.type_index.get(ContextType.GLOSSARY_TERM, set())
        return [self.entries[tid] for tid in term_ids]
    
    def get_entries_by_tag(self, tag: str) -> List[ContextEntry]:
        entry_ids = self.tag_index.get(tag, set())
        return [self.entries[eid] for eid in entry_ids]
    
    def load_solid_principles(self):
        principles = [
            ArchitecturalPrinciple(
                name="Single Responsibility Principle",
                description="A class should have only one reason to change",
                rationale="When a class has multiple responsibilities, changes to one responsibility may affect the others, leading to fragile code",
                examples=[
                    "A User class should only handle user data, not user persistence or validation",
                    "Separate UserRepository for database operations, UserValidator for validation"
                ],
                anti_patterns=[
                    "God classes that do everything",
                    "Classes mixing business logic with infrastructure concerns"
                ],
                applicability="All object-oriented code",
                priority="High"
            ),
            ArchitecturalPrinciple(
                name="Open-Closed Principle",
                description="Software entities should be open for extension but closed for modification",
                rationale="Modifying existing code risks breaking existing functionality; extension through inheritance or composition is safer",
                examples=[
                    "Use strategy pattern to add new behaviors without modifying existing code",
                    "Use plugin architectures for extensibility"
                ],
                anti_patterns=[
                    "Long if-else chains that require modification for new cases",
                    "Switch statements on type codes"
                ],
                applicability="All object-oriented code",
                priority="High"
            ),
            ArchitecturalPrinciple(
                name="Liskov Substitution Principle",
                description="Subtypes must be substitutable for their base types",
                rationale="Violating this principle breaks polymorphism and leads to unexpected behavior",
                examples=[
                    "A Square should not inherit from Rectangle if it violates rectangle invariants",
                    "Derived classes should not throw new exceptions not thrown by base class"
                ],
                anti_patterns=[
                    "Derived classes that strengthen preconditions or weaken postconditions",
                    "Derived classes that remove functionality from base class"
                ],
                applicability="All inheritance hierarchies",
                priority="Medium"
            ),
            ArchitecturalPrinciple(
                name="Interface Segregation Principle",
                description="Clients should not be forced to depend on interfaces they do not use",
                rationale="Large interfaces create unnecessary coupling and make code harder to test and maintain",
                examples=[
                    "Split large interfaces into smaller, focused ones",
                    "Use role interfaces instead of header interfaces"
                ],
                anti_patterns=[
                    "Fat interfaces with many unrelated methods",
                    "Interfaces that force implementations to throw NotImplementedException"
                ],
                applicability="All interface design",
                priority="Medium"
            ),
            ArchitecturalPrinciple(
                name="Dependency Inversion Principle",
                description="High-level modules should not depend on low-level modules; both should depend on abstractions",
                rationale="Depending on concrete implementations creates tight coupling and makes code hard to test and change",
                examples=[
                    "Depend on interfaces rather than concrete classes",
                    "Use dependency injection to provide implementations"
                ],
                anti_patterns=[
                    "Direct instantiation of dependencies with 'new'",
                    "Hard-coded dependencies on specific implementations"
                ],
                applicability="All layered architectures",
                priority="High"
            )
        ]
        
        for principle in principles:
            self.add_principle(principle)
        
        print(f"Loaded {len(principles)} SOLID principles")
    
    def save_storage(self):
        import pickle
        storage_file = os.path.join(self.storage_dir, "context_storage.pkl")
        with open(storage_file, 'wb') as f:
            pickle.dump({
                'entries': self.entries,
                'tag_index': self.tag_index,
                'type_index': self.type_index
            }, f)
        print(f"Context storage saved to {storage_file}")
    
    def load_storage(self):
        import pickle
        storage_file = os.path.join(self.storage_dir, "context_storage.pkl")
        if os.path.exists(storage_file):
            with open(storage_file, 'rb') as f:
                data = pickle.load(f)
                self.entries = data['entries']
                self.tag_index = data['tag_index']
                self.type_index = data['type_index']
            print(f"Loaded {len(self.entries)} context entries from storage")
            return True
        return False

This project context storage implementation provides structured storage for different types of contextual information. The ContextEntry class serves as a universal container, while specialized classes like ArchitecturalPrinciple, CodingGuideline, and GlossaryTerm provide domain-specific structure.

The system maintains multiple indices for efficient retrieval. The tag index allows finding all entries with specific tags. The type index groups entries by their context type. This multi-index approach enables both broad queries like "find all architectural principles" and narrow queries like "find domain knowledge about payment processing."

The load_solid_principles method demonstrates how to populate the system with standard architectural principles. In a real project, you would extend this to load project-specific principles, guidelines, and domain knowledge from configuration files or databases.
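As a minimal sketch of loading principles from a configuration file, the snippet below parses a JSON array into dataclass instances. PrincipleSpec is a hypothetical stand-in that mirrors the fields passed to ArchitecturalPrinciple above, and the JSON layout is an assumption, not a format the tutorial defines.

```python
import json
from dataclasses import dataclass, field
from typing import List


@dataclass
class PrincipleSpec:
    # Hypothetical stand-in mirroring the ArchitecturalPrinciple fields used above
    name: str
    description: str
    rationale: str
    examples: List[str] = field(default_factory=list)
    anti_patterns: List[str] = field(default_factory=list)
    applicability: str = "All code"
    priority: str = "Medium"


def parse_principles(json_text: str) -> List[PrincipleSpec]:
    """Parse a JSON array of principle objects; unknown keys raise TypeError."""
    raw = json.loads(json_text)
    return [PrincipleSpec(**item) for item in raw]


# Assumed file content; in practice this would be read from a project config file
config_text = """
[
  {
    "name": "No direct SQL in controllers",
    "description": "Controllers must go through repository classes",
    "rationale": "Keeps persistence concerns in one layer",
    "anti_patterns": ["cursor.execute inside request handlers"],
    "priority": "High"
  }
]
"""

specs = parse_principles(config_text)
for spec in specs:
    print(f"{spec.priority}: {spec.name}")
```

Each parsed spec could then be converted into an ArchitecturalPrinciple and passed to add_principle, so project-specific rules are stored and retrieved the same way as the SOLID defaults.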

The retrieval method supports filtering by context type and tags before performing semantic search. This allows queries like "find coding guidelines related to error handling" or "find meeting notes about database architecture."
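The filter-then-rank idea can be illustrated in isolation. The sketch below uses hand-made two-dimensional vectors in place of real embeddings and a plain dictionary in place of the type index; the entry ids and vectors are invented for illustration only.

```python
import math
from typing import Dict, List, Set, Tuple


def cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


# Toy data: hand-made vectors stand in for real embeddings
embeddings: Dict[str, List[float]] = {
    "guideline_errors": [1.0, 0.0],
    "guideline_naming": [0.0, 1.0],
    "meeting_db": [0.9, 0.1],
}
type_index: Dict[str, Set[str]] = {
    "coding_guideline": {"guideline_errors", "guideline_naming"},
    "meeting_notes": {"meeting_db"},
}


def retrieve(query_vec: List[float], context_type: str, top_k: int = 5) -> List[Tuple[str, float]]:
    # Step 1: narrow the candidates with the cheap index lookup
    candidates = type_index.get(context_type, set())
    # Step 2: rank only the surviving candidates by similarity
    scored = [(eid, cosine(query_vec, embeddings[eid])) for eid in candidates]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_k]


results = retrieve([1.0, 0.0], "coding_guideline", top_k=1)
print(results)
```

Note that meeting_db is never scored even though its vector is close to the query: the type filter removes it before any similarity is computed, which is what keeps the semantic search focused.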

COMPONENT FIVE: GRAPHRAG FOR CODE RELATIONSHIPS

Graph-based Retrieval-Augmented Generation represents code relationships as a graph structure, enabling sophisticated queries about code dependencies, call chains, inheritance hierarchies, and impact analysis. While traditional RAG stores documents independently, GraphRAG captures the connections between code elements.

The graph consists of nodes representing code entities such as files, classes, functions, methods, variables, and modules. Edges represent relationships such as imports, calls, inherits-from, implements, uses, defines, and contains. Each node and edge can have attributes storing additional information like source location, parameters, return types, and access modifiers.

This graph structure enables powerful queries. We can find all callers of a function to understand the impact of a change, trace inheritance chains to understand polymorphic behavior, and identify circular dependencies. We can calculate coupling metrics by counting each node's incoming and outgoing edges. We can find orphaned code by identifying nodes that have no incoming or outgoing edges at all.

Here is the implementation of the GraphRAG system:

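import hashlib
import json
import os
from typing import List, Dict, Optional, Set, Tuple, Any
from dataclasses import dataclass, field
from enum import Enum

# GraphBuilder below also expects SourceCodeStorage and CodeFileRecord
# from the source code storage component to be importable.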


class NodeType(Enum):
    FILE = "file"
    MODULE = "module"
    CLASS = "class"
    FUNCTION = "function"
    METHOD = "method"
    VARIABLE = "variable"
    CONSTANT = "constant"
    INTERFACE = "interface"
    ENUM = "enum"
    NAMESPACE = "namespace"


class EdgeType(Enum):
    IMPORTS = "imports"
    CALLS = "calls"
    INHERITS = "inherits"
    IMPLEMENTS = "implements"
    USES = "uses"
    DEFINES = "defines"
    CONTAINS = "contains"
    REFERENCES = "references"
    OVERRIDES = "overrides"
    DEPENDS_ON = "depends_on"


@dataclass
class GraphNode:
    id: str
    node_type: NodeType
    name: str
    qualified_name: str
    file_path: str
    line_number: int
    end_line: int
    attributes: Dict[str, Any] = field(default_factory=dict)
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'id': self.id,
            'type': self.node_type.value,
            'name': self.name,
            'qualified_name': self.qualified_name,
            'file_path': self.file_path,
            'line_number': self.line_number,
            'end_line': self.end_line,
            'attributes': self.attributes
        }


@dataclass
class GraphEdge:
    source_id: str
    target_id: str
    edge_type: EdgeType
    attributes: Dict[str, Any] = field(default_factory=dict)
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'source': self.source_id,
            'target': self.target_id,
            'type': self.edge_type.value,
            'attributes': self.attributes
        }


class CodeGraph:
    def __init__(self):
        self.nodes: Dict[str, GraphNode] = {}
        self.edges: List[GraphEdge] = []
        self.adjacency_list: Dict[str, List[Tuple[str, EdgeType]]] = {}
        self.reverse_adjacency_list: Dict[str, List[Tuple[str, EdgeType]]] = {}
        self.node_type_index: Dict[NodeType, Set[str]] = {}
        self.file_index: Dict[str, Set[str]] = {}
    
    def add_node(self, node: GraphNode) -> bool:
        if node.id in self.nodes:
            return False
        
        self.nodes[node.id] = node
        
        if node.node_type not in self.node_type_index:
            self.node_type_index[node.node_type] = set()
        self.node_type_index[node.node_type].add(node.id)
        
        if node.file_path not in self.file_index:
            self.file_index[node.file_path] = set()
        self.file_index[node.file_path].add(node.id)
        
        return True
    
    def add_edge(self, edge: GraphEdge) -> bool:
        if edge.source_id not in self.nodes or edge.target_id not in self.nodes:
            return False
        
        self.edges.append(edge)
        
        if edge.source_id not in self.adjacency_list:
            self.adjacency_list[edge.source_id] = []
        self.adjacency_list[edge.source_id].append((edge.target_id, edge.edge_type))
        
        if edge.target_id not in self.reverse_adjacency_list:
            self.reverse_adjacency_list[edge.target_id] = []
        self.reverse_adjacency_list[edge.target_id].append((edge.source_id, edge.edge_type))
        
        return True
    
    def get_node(self, node_id: str) -> Optional[GraphNode]:
        return self.nodes.get(node_id)
    
    def get_nodes_by_type(self, node_type: NodeType) -> List[GraphNode]:
        node_ids = self.node_type_index.get(node_type, set())
        return [self.nodes[nid] for nid in node_ids]
    
    def get_nodes_in_file(self, file_path: str) -> List[GraphNode]:
        node_ids = self.file_index.get(file_path, set())
        return [self.nodes[nid] for nid in node_ids]
    
    def get_outgoing_edges(self, node_id: str, edge_type: Optional[EdgeType] = None) -> List[Tuple[GraphNode, EdgeType]]:
        if node_id not in self.adjacency_list:
            return []
        
        edges = self.adjacency_list[node_id]
        
        if edge_type:
            edges = [(target, etype) for target, etype in edges if etype == edge_type]
        
        return [(self.nodes[target], etype) for target, etype in edges]
    
    def get_incoming_edges(self, node_id: str, edge_type: Optional[EdgeType] = None) -> List[Tuple[GraphNode, EdgeType]]:
        if node_id not in self.reverse_adjacency_list:
            return []
        
        edges = self.reverse_adjacency_list[node_id]
        
        if edge_type:
            edges = [(source, etype) for source, etype in edges if etype == edge_type]
        
        return [(self.nodes[source], etype) for source, etype in edges]
    
    def find_callers(self, function_id: str) -> List[GraphNode]:
        incoming = self.get_incoming_edges(function_id, EdgeType.CALLS)
        return [node for node, _ in incoming]
    
    def find_callees(self, function_id: str) -> List[GraphNode]:
        outgoing = self.get_outgoing_edges(function_id, EdgeType.CALLS)
        return [node for node, _ in outgoing]
    
    def find_inheritance_chain(self, class_id: str) -> List[GraphNode]:
        chain = []
        current = class_id
        
        visited = set()
        
        while current and current not in visited:
            visited.add(current)
            node = self.get_node(current)
            if node:
                chain.append(node)
            
            parents = self.get_outgoing_edges(current, EdgeType.INHERITS)
            if parents:
                current = parents[0][0].id
            else:
                current = None
        
        return chain
    
    def find_descendants(self, class_id: str) -> List[GraphNode]:
        descendants = []
        
        def traverse(node_id: str, visited: Set[str]):
            if node_id in visited:
                return
            visited.add(node_id)
            
            children = self.get_incoming_edges(node_id, EdgeType.INHERITS)
            for child_node, _ in children:
                descendants.append(child_node)
                traverse(child_node.id, visited)
        
        traverse(class_id, set())
        return descendants
    
    def find_dependencies(self, node_id: str, max_depth: int = 3) -> List[Tuple[GraphNode, int]]:
        dependencies = []
        visited = set()
        reported = {node_id}  # never report the start node, or any node twice
        
        def traverse(current_id: str, depth: int):
            if depth > max_depth or current_id in visited:
                return
            
            visited.add(current_id)
            
            deps = self.get_outgoing_edges(current_id, EdgeType.DEPENDS_ON)
            for dep_node, _ in deps:
                if dep_node.id not in reported:
                    reported.add(dep_node.id)
                    dependencies.append((dep_node, depth))
                traverse(dep_node.id, depth + 1)
        
        traverse(node_id, 1)
        return dependencies
    
    def find_circular_dependencies(self) -> List[List[str]]:
        cycles = []
        visited = set()
        rec_stack = set()
        path: List[str] = []
        
        def dfs(node_id: str) -> None:
            visited.add(node_id)
            rec_stack.add(node_id)
            path.append(node_id)
            
            for neighbor_id, _ in self.adjacency_list.get(node_id, []):
                if neighbor_id not in visited:
                    dfs(neighbor_id)
                elif neighbor_id in rec_stack:
                    # Back edge: the cycle is the slice of the current path
                    # from the neighbor up to this node
                    cycle_start = path.index(neighbor_id)
                    cycles.append(path[cycle_start:] + [neighbor_id])
            
            # Always unwind, even if a cycle was found deeper down; otherwise
            # stale rec_stack entries cause false positives on later roots
            path.pop()
            rec_stack.remove(node_id)
        
        for node_id in self.nodes:
            if node_id not in visited:
                dfs(node_id)
        
        return cycles
    
    def calculate_coupling(self, node_id: str) -> Dict[str, float]:
        efferent = len(self.get_outgoing_edges(node_id))
        afferent = len(self.get_incoming_edges(node_id))
        
        # Instability = efferent / (efferent + afferent); 0.0 for isolated nodes
        instability = efferent / (efferent + afferent) if (efferent + afferent) > 0 else 0.0
        
        return {
            'efferent_coupling': efferent,
            'afferent_coupling': afferent,
            'instability': instability
        }
    
    def find_orphaned_nodes(self) -> List[GraphNode]:
        orphaned = []
        
        for node_id, node in self.nodes.items():
            has_incoming = node_id in self.reverse_adjacency_list and len(self.reverse_adjacency_list[node_id]) > 0
            has_outgoing = node_id in self.adjacency_list and len(self.adjacency_list[node_id]) > 0
            
            if not has_incoming and not has_outgoing:
                orphaned.append(node)
        
        return orphaned
    
    def export_to_json(self, file_path: str):
        data = {
            'nodes': [node.to_dict() for node in self.nodes.values()],
            'edges': [edge.to_dict() for edge in self.edges]
        }
        
        with open(file_path, 'w') as f:
            json.dump(data, f, indent=2)
        
        print(f"Graph exported to {file_path}")
    
    def get_statistics(self) -> Dict[str, Any]:
        node_counts = {}
        for node_type in NodeType:
            node_counts[node_type.value] = len(self.node_type_index.get(node_type, set()))
        
        edge_counts = {}
        for edge in self.edges:
            edge_counts[edge.edge_type.value] = edge_counts.get(edge.edge_type.value, 0) + 1
        
        return {
            'total_nodes': len(self.nodes),
            'total_edges': len(self.edges),
            'nodes_by_type': node_counts,
            'edges_by_type': edge_counts,
            'files': len(self.file_index)
        }


class GraphBuilder:
    def __init__(self, code_storage: SourceCodeStorage):
        self.code_storage = code_storage
        self.graph = CodeGraph()
        self.node_id_map: Dict[str, str] = {}
    
    def build_graph(self) -> CodeGraph:
        for file_record in self.code_storage.get_all_files():
            self._process_file(file_record)
        
        for file_record in self.code_storage.get_all_files():
            self._process_dependencies(file_record)
        
        return self.graph
    
    def _process_file(self, file_record: CodeFileRecord):
        file_node_id = self._create_node_id(file_record.metadata.file_path, 'file')
        
        file_node = GraphNode(
            id=file_node_id,
            node_type=NodeType.FILE,
            name=os.path.basename(file_record.metadata.file_path),
            qualified_name=file_record.metadata.relative_path,
            file_path=file_record.metadata.file_path,
            line_number=1,
            end_line=file_record.metadata.lines_of_code,
            attributes={
                'language': file_record.metadata.language,
                'size': file_record.metadata.file_size,
                'complexity': file_record.complexity_score,
                'maintainability': file_record.maintainability_index
            }
        )
        
        self.graph.add_node(file_node)
        
        for entity in file_record.entities:
            entity_node_id = self._create_node_id(
                file_record.metadata.file_path,
                entity.entity_type,
                entity.name,
                entity.line_number
            )
            
            node_type = self._map_entity_type(entity.entity_type)
            
            entity_node = GraphNode(
                id=entity_node_id,
                node_type=node_type,
                name=entity.name,
                qualified_name=f"{file_record.metadata.relative_path}::{entity.name}",
                file_path=file_record.metadata.file_path,
                line_number=entity.line_number,
                end_line=entity.end_line,
                attributes={
                    'signature': entity.signature,
                    'complexity': entity.complexity,
                    'parameters': entity.parameters,
                    'return_type': entity.return_type
                }
            )
            
            self.graph.add_node(entity_node)
            
            contains_edge = GraphEdge(
                source_id=file_node_id,
                target_id=entity_node_id,
                edge_type=EdgeType.CONTAINS
            )
            self.graph.add_edge(contains_edge)
    
    def _process_dependencies(self, file_record: CodeFileRecord):
        file_node_id = self._create_node_id(file_record.metadata.file_path, 'file')
        
        for dependency in file_record.dependencies:
            if not dependency.is_external:
                target_file = self._resolve_import(dependency.target, file_record.metadata.file_path)
                
                if target_file:
                    target_node_id = self._create_node_id(target_file, 'file')
                    
                    if target_node_id in self.graph.nodes:
                        import_edge = GraphEdge(
                            source_id=file_node_id,
                            target_id=target_node_id,
                            edge_type=EdgeType.IMPORTS,
                            attributes={
                                'line_number': dependency.line_number,
                                'import_name': dependency.target
                            }
                        )
                        self.graph.add_edge(import_edge)
    
    def _create_node_id(self, file_path: str, *parts) -> str:
        components = [file_path] + [str(p) for p in parts]
        id_string = "::".join(components)
        return hashlib.md5(id_string.encode()).hexdigest()[:16]
    
    def _map_entity_type(self, entity_type: str) -> NodeType:
        mapping = {
            'class': NodeType.CLASS,
            'function': NodeType.FUNCTION,
            'method': NodeType.METHOD,
            'variable': NodeType.VARIABLE,
            'constant': NodeType.CONSTANT,
            'interface': NodeType.INTERFACE,
            'enum': NodeType.ENUM
        }
        return mapping.get(entity_type.lower(), NodeType.FUNCTION)
    
    def _resolve_import(self, import_path: str, source_file: str) -> Optional[str]:
        if import_path.startswith('.'):
            source_dir = os.path.dirname(source_file)
            
            parts = import_path.split('.')
            level = len([p for p in parts if p == ''])
            module_parts = [p for p in parts if p]
            
            target_dir = source_dir
            for _ in range(level - 1):
                target_dir = os.path.dirname(target_dir)
            
            if module_parts:
                target_file = os.path.join(target_dir, *module_parts) + '.py'
                if os.path.exists(target_file):
                    return target_file
                
                target_file = os.path.join(target_dir, *module_parts, '__init__.py')
                if os.path.exists(target_file):
                    return target_file
        
        return None

This GraphRAG implementation provides comprehensive graph-based code analysis capabilities. The CodeGraph class maintains the graph structure with efficient indices for different query patterns. The node type index enables quick retrieval of all classes or all functions. The file index enables quick retrieval of all entities in a specific file. The adjacency lists enable efficient traversal of relationships.

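The graph supports sophisticated queries. The find_callers and find_callees methods enable call graph analysis by returning direct callers and callees. The find_inheritance_chain method traces the inheritance hierarchy from a class up to its root ancestor, following only the first recorded base class at each step, so additional bases in a multiple-inheritance hierarchy are not visited. The find_descendants method finds all classes that inherit from a given class, directly or indirectly, enabling impact analysis when modifying a base class.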
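For impact analysis it is often useful to go beyond direct callers. The following sketch walks reverse call edges breadth-first to collect every function that could be affected by a change; the function names and the plain dictionary representation are illustrative assumptions rather than part of the CodeGraph class.

```python
from collections import deque
from typing import Dict, List, Set

# Reverse call edges: callee -> direct callers (hypothetical function names)
callers_of: Dict[str, List[str]] = {
    "parse_amount": ["validate_order"],
    "validate_order": ["submit_order", "import_orders"],
    "submit_order": ["checkout_handler"],
}


def transitive_callers(function: str) -> Set[str]:
    """Return every function that reaches `function` through one or more calls."""
    seen: Set[str] = set()
    queue = deque([function])
    while queue:
        current = queue.popleft()
        for caller in callers_of.get(current, []):
            if caller not in seen:  # also guards against recursive call cycles
                seen.add(caller)
                queue.append(caller)
    return seen


impacted = transitive_callers("parse_amount")
print(sorted(impacted))
```

The same traversal could be built on top of find_callers by repeatedly expanding the returned nodes until no new ones appear.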

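The find_circular_dependencies method detects dependency cycles, which often indicate architectural problems. It traverses every edge type stored in the adjacency list, not only imports. The calculate_coupling method computes efferent coupling (outgoing edges) and afferent coupling (incoming edges), along with the instability metric, defined as efferent / (efferent + afferent). Values near 1 mark a component that depends on many others while little depends on it, so it is easy to change; values near 0 mark a component that many others rely on, so changes to it are costly.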
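A small worked example may make both ideas concrete. The sketch below runs a depth-first search with a recursion stack over a toy module graph and computes instability for individual modules; the module names are invented, and the graph is a plain dictionary rather than a CodeGraph.

```python
from typing import Dict, List

# Directed dependency edges between hypothetical modules
graph: Dict[str, List[str]] = {
    "api": ["services"],
    "services": ["models", "utils"],
    "models": ["services"],  # closes the cycle services -> models -> services
    "utils": [],
}


def find_cycles(graph: Dict[str, List[str]]) -> List[List[str]]:
    cycles, visited, on_stack, path = [], set(), set(), []

    def dfs(node: str) -> None:
        visited.add(node)
        on_stack.add(node)
        path.append(node)
        for neighbor in graph.get(node, []):
            if neighbor not in visited:
                dfs(neighbor)
            elif neighbor in on_stack:
                # Back edge to a node on the current path: record the cycle
                cycles.append(path[path.index(neighbor):] + [neighbor])
        # Unwind unconditionally so later roots start from a clean stack
        path.pop()
        on_stack.discard(node)

    for node in graph:
        if node not in visited:
            dfs(node)
    return cycles


def instability(node: str) -> float:
    efferent = len(graph.get(node, []))                          # outgoing edges
    afferent = sum(node in targets for targets in graph.values())  # incoming edges
    total = efferent + afferent
    return efferent / total if total else 0.0


print(find_cycles(graph))
print({name: instability(name) for name in graph})
```

Here api has instability 1.0 (it depends on services and nothing depends on it), utils has 0.0, and services sits at 0.5 with two edges in each direction.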

The GraphBuilder class constructs the graph from the source code storage. It creates nodes for files and entities, then creates edges for containment relationships and dependencies. The import resolution logic handles only relative imports in Python, converting them to file paths and checking for either a module file or a package __init__.py. Absolute imports of project modules are not resolved, so they produce no edge.
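The path arithmetic behind relative import resolution can be shown without touching the filesystem. This sketch returns the two candidate paths that would be checked for a given import; the function name is hypothetical, and posixpath is used so the result is the same on every platform.

```python
import posixpath
from typing import Optional, Tuple


def relative_import_candidates(import_path: str, source_file: str) -> Optional[Tuple[str, str]]:
    """Return (module_file, package_init) candidates for a relative import."""
    if not import_path.startswith("."):
        return None  # absolute imports are out of scope for this sketch

    # The number of leading dots is the relative import level
    level = len(import_path) - len(import_path.lstrip("."))
    module_parts = [part for part in import_path[level:].split(".") if part]
    if not module_parts:
        return None  # "from . import x" names no module in the path itself

    # One dot means the current package; each extra dot climbs one directory
    target_dir = posixpath.dirname(source_file)
    for _ in range(level - 1):
        target_dir = posixpath.dirname(target_dir)

    base = posixpath.join(target_dir, *module_parts)
    return base + ".py", posixpath.join(base, "__init__.py")


print(relative_import_candidates("..core.models", "/repo/app/api/views.py"))
print(relative_import_candidates(".helpers", "/repo/app/api/views.py"))
```

A resolver would then test each candidate for existence and return the first one found, which is the order used in _resolve_import.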

In a production system, this graph would be populated by the Abstract Syntax Tree parsers, which we will implement next. The AST parsers provide more accurate relationship information than simple regex-based parsing.

COMPONENT SIX: ABSTRACT SYNTAX TREE PARSING

Abstract Syntax Tree parsing is essential for accurate code analysis. While regular expressions can identify basic patterns, they cannot understand code structure deeply. AST parsing converts source code into a tree structure that represents the syntactic structure of the code, enabling precise analysis of control flow, data flow, and semantic relationships.
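To see the difference in practice, consider a snippet in which the text "def" also appears inside a docstring and a string literal. The example below compares a naive regular expression with Python's ast module; the sample source and the regex are illustrative only.

```python
import ast
import re

source = '''
def real_function(x):
    """Docs mention def fake_one(): but this is only text."""
    note = "def fake_two(): pass"
    return x
'''

# The regex matches text and cannot tell code from string contents
regex_hits = re.findall(r"def\s+(\w+)\s*\(", source)

# The AST contains only real function definitions
tree = ast.parse(source)
ast_hits = [node.name for node in ast.walk(tree) if isinstance(node, ast.FunctionDef)]

print("regex:", regex_hits)
print("ast:  ", ast_hits)
```

The regex reports three functions, two of which do not exist, while the AST walk reports only real_function.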

Different programming languages require different AST parsers. Python has the built-in ast module. JavaScript and TypeScript can use parsers like esprima, acorn, or the TypeScript compiler API. Java can use JavaParser or Eclipse JDT. For our implementation, we will focus on Python, JavaScript, and TypeScript, demonstrating how to integrate AST parsing for each language.

The AST parser must extract several types of information. First, it records all defined entities (classes, functions, methods, variables, and constants) with their exact locations. Second, it captures relationships, including function calls, class inheritance, interface implementation, and variable usage. Third, it notes control flow constructs such as conditionals, loops, and exception handling. Fourth, it computes complexity metrics, including cyclomatic complexity, nesting depth, and cognitive complexity.
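As an illustration of the fourth point, cyclomatic complexity can be approximated by counting decision points in a function's AST. The sketch below is a simplified approximation under stated assumptions (it counts if, for, while, except handlers, conditional expressions, and boolean operators) and is not necessarily how the parser in this tutorial computes its metric.

```python
import ast

# Node types that each add one independent path through the code
DECISION_NODES = (ast.If, ast.For, ast.While, ast.ExceptHandler, ast.IfExp)


def cyclomatic_complexity(func: ast.AST) -> int:
    complexity = 1  # a function with no branches has exactly one path
    for node in ast.walk(func):
        if isinstance(node, DECISION_NODES):
            complexity += 1
        elif isinstance(node, ast.BoolOp):
            # "a and b and c" adds two short-circuit branches
            complexity += len(node.values) - 1
    return complexity


source = '''
def classify(n, strict):
    if n < 0 and strict:
        raise ValueError("negative")
    for i in range(n):
        if i % 2 == 0:
            continue
    return n
'''

func = ast.parse(source).body[0]
score = cyclomatic_complexity(func)
print(score)
```

For classify the count is 5: the base path, two if statements, one for loop, and one boolean operator.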

Here is the implementation of AST parsing for Python:

import ast
from typing import List, Dict, Optional, Set, Tuple, Any
from dataclasses import dataclass, field


@dataclass
class ASTEntity:
    name: str
    entity_type: str
    line_number: int
    end_line: int
    column: int
    end_column: int
    signature: str
    docstring: Optional[str] = None
    decorators: List[str] = field(default_factory=list)
    parameters: List[Dict[str, Any]] = field(default_factory=list)
    return_annotation: Optional[str] = None
    is_async: bool = False
    is_abstract: bool = False
    access_modifier: str = "public"
    complexity: int = 1


@dataclass
class ASTRelationship:
    source: str
    target: str
    relationship_type: str
    line_number: int
    context: Optional[str] = None


class PythonASTParser:
    def __init__(self):
        self.entities: List[ASTEntity] = []
        self.relationships: List[ASTRelationship] = []
        self.imports: List[Dict[str, Any]] = []
        self.current_class: Optional[str] = None
        self.current_function: Optional[str] = None
    
    def parse_file(self, file_path: str) -> Tuple[List[ASTEntity], List[ASTRelationship], List[Dict[str, Any]]]:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        
        return self.parse_code(content, file_path)
    
    def parse_code(self, code: str, source: str = "<string>") -> Tuple[List[ASTEntity], List[ASTRelationship], List[Dict[str, Any]]]:
        self.entities = []
        self.relationships = []
        self.imports = []
        self.current_class = None
        self.current_function = None
        
        try:
            tree = ast.parse(code, filename=source)
            self.visit_node(tree)
        except SyntaxError as e:
            print(f"Syntax error parsing {source}: {e}")
        
        return self.entities, self.relationships, self.imports
    
    def visit_node(self, node: ast.AST, parent: Optional[ast.AST] = None):
        method_name = f'visit_{node.__class__.__name__}'
        visitor = getattr(self, method_name, self.generic_visit)
        visitor(node, parent)
    
    def generic_visit(self, node: ast.AST, parent: Optional[ast.AST] = None):
        for child in ast.iter_child_nodes(node):
            self.visit_node(child, node)
    
    def visit_ClassDef(self, node: ast.ClassDef, parent: Optional[ast.AST] = None):
        decorators = [self._get_decorator_name(dec) for dec in node.decorator_list]
        
        docstring = ast.get_docstring(node)
        
        bases = [self._get_name(base) for base in node.bases]
        
        # ABC is a base class rather than a decorator, so inspect the bases too
        is_abstract = (
            any(base.split('.')[-1] == 'ABC' for base in bases)
            or any('abstract' in dec.lower() for dec in decorators)
        )
        
        entity = ASTEntity(
            name=node.name,
            entity_type='class',
            line_number=node.lineno,
            end_line=node.end_lineno or node.lineno,
            column=node.col_offset,
            end_column=node.end_col_offset or 0,
            signature=f"class {node.name}({', '.join(bases)})",
            docstring=docstring,
            decorators=decorators,
            is_abstract=is_abstract,
            complexity=self._calculate_class_complexity(node)
        )
        
        self.entities.append(entity)
        
        for base in bases:
            self.relationships.append(ASTRelationship(
                source=node.name,
                target=base,
                relationship_type='inherits',
                line_number=node.lineno
            ))
        
        prev_class = self.current_class
        self.current_class = node.name
        
        for child in node.body:
            self.visit_node(child, node)
        
        self.current_class = prev_class
    
    def visit_FunctionDef(self, node: ast.FunctionDef, parent: Optional[ast.AST] = None):
        self._process_function(node, is_async=False)
    
    def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef, parent: Optional[ast.AST] = None):
        self._process_function(node, is_async=True)
    
    def _process_function(self, node, is_async: bool):
        decorators = [self._get_decorator_name(dec) for dec in node.decorator_list]
        
        docstring = ast.get_docstring(node)
        
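        parameters = []
        # Positional-only parameters share node.args.defaults with regular ones,
        # so both are collected to keep the defaults aligned. Keyword-only
        # parameters, *args and **kwargs are not recorded here.
        positional = node.args.posonlyargs + node.args.args
        for arg in positional:
            param_info = {
                'name': arg.arg,
                'annotation': self._get_annotation(arg.annotation),
                'default': None
            }
            parameters.append(param_info)
        
        # node.args.defaults lines up with the tail of the positional parameters.
        defaults_offset = len(positional) - len(node.args.defaults)
        for i, default in enumerate(node.args.defaults):
            parameters[defaults_offset + i]['default'] = ast.unparse(default)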
        
        return_annotation = self._get_annotation(node.returns)
        
        entity_type = 'method' if self.current_class else 'function'
        
        access_modifier = 'private' if node.name.startswith('_') else 'public'
        
        param_str = ', '.join([p['name'] for p in parameters])
        signature = f"{'async ' if is_async else ''}def {node.name}({param_str})"
        if return_annotation:
            signature += f" -> {return_annotation}"
        
        complexity = self._calculate_function_complexity(node)
        
        entity = ASTEntity(
            name=node.name,
            entity_type=entity_type,
            line_number=node.lineno,
            end_line=node.end_lineno or node.lineno,
            column=node.col_offset,
            end_column=node.end_col_offset or 0,
            signature=signature,
            docstring=docstring,
            decorators=decorators,
            parameters=parameters,
            return_annotation=return_annotation,
            is_async=is_async,
            access_modifier=access_modifier,
            complexity=complexity
        )
        
        self.entities.append(entity)
        
        prev_function = self.current_function
        self.current_function = node.name
        
        for child in node.body:
            self.visit_node(child, node)
        
        self.current_function = prev_function
    
    def visit_Import(self, node: ast.Import, parent: Optional[ast.AST] = None):
        for alias in node.names:
            self.imports.append({
                'type': 'import',
                'module': alias.name,
                'alias': alias.asname,
                'line_number': node.lineno
            })
    
    def visit_ImportFrom(self, node: ast.ImportFrom, parent: Optional[ast.AST] = None):
        module = node.module or ''
        for alias in node.names:
            self.imports.append({
                'type': 'from_import',
                'module': module,
                'name': alias.name,
                'alias': alias.asname,
                'level': node.level,
                'line_number': node.lineno
            })
    
    def visit_Call(self, node: ast.Call, parent: Optional[ast.AST] = None):
        func_name = self._get_name(node.func)
        
        if self.current_function and func_name:
            self.relationships.append(ASTRelationship(
                source=self.current_function,
                target=func_name,
                relationship_type='calls',
                line_number=node.lineno,
                context=self.current_class
            ))
        
        self.generic_visit(node, parent)
    
    def _get_decorator_name(self, decorator: ast.expr) -> str:
        if isinstance(decorator, ast.Name):
            return decorator.id
        elif isinstance(decorator, ast.Call):
            return self._get_name(decorator.func)
        else:
            return ast.unparse(decorator)
    
    def _get_annotation(self, annotation: Optional[ast.expr]) -> Optional[str]:
        if annotation is None:
            return None
        return ast.unparse(annotation)
    
    def _get_name(self, node: ast.expr) -> str:
        if isinstance(node, ast.Name):
            return node.id
        elif isinstance(node, ast.Attribute):
            return f"{self._get_name(node.value)}.{node.attr}"
        elif isinstance(node, ast.Call):
            return self._get_name(node.func)
        else:
            return ast.unparse(node)
    
    def _calculate_function_complexity(self, node: ast.FunctionDef) -> int:
        complexity = 1
        
        for child in ast.walk(node):
            if isinstance(child, (ast.If, ast.While, ast.For, ast.ExceptHandler)):
                complexity += 1
            elif isinstance(child, ast.BoolOp):
                complexity += len(child.values) - 1
        
        return complexity
    
    def _calculate_class_complexity(self, node: ast.ClassDef) -> int:
        total_complexity = 0
        method_count = 0
        
        for child in node.body:
            if isinstance(child, (ast.FunctionDef, ast.AsyncFunctionDef)):
                total_complexity += self._calculate_function_complexity(child)
                method_count += 1
        
        return total_complexity if method_count == 0 else total_complexity // method_count


class JavaScriptASTParser:
    def __init__(self):
        self.entities: List[ASTEntity] = []
        self.relationships: List[ASTRelationship] = []
        self.imports: List[Dict[str, Any]] = []
    
    def parse_file(self, file_path: str) -> Tuple[List[ASTEntity], List[ASTRelationship], List[Dict[str, Any]]]:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        
        return self.parse_code(content, file_path)
    
    def parse_code(self, code: str, source: str = "<string>") -> Tuple[List[ASTEntity], List[ASTRelationship], List[Dict[str, Any]]]:
        self.entities = []
        self.relationships = []
        self.imports = []
        
        lines = code.split('\n')
        
        class_pattern = re.compile(r'^\s*class\s+(\w+)(?:\s+extends\s+(\w+))?')
        func_pattern = re.compile(r'^\s*(?:async\s+)?(?:function\s+(\w+)|const\s+(\w+)\s*=\s*(?:async\s*)?\(|(\w+)\s*:\s*(?:async\s*)?\()')
        import_pattern = re.compile(r'^\s*import\s+(.+?)\s+from\s+[\'"](.+?)[\'"]')
        require_pattern = re.compile(r'(?:const|let|var)\s+(\w+)\s*=\s*require\([\'"](.+?)[\'"]\)')
        
        for i, line in enumerate(lines):
            class_match = class_pattern.match(line)
            if class_match:
                class_name = class_match.group(1)
                parent_class = class_match.group(2)
                
                entity = ASTEntity(
                    name=class_name,
                    entity_type='class',
                    line_number=i + 1,
                    end_line=i + 1,
                    column=0,
                    end_column=len(line),
                    signature=line.strip(),
                    complexity=1
                )
                self.entities.append(entity)
                
                if parent_class:
                    self.relationships.append(ASTRelationship(
                        source=class_name,
                        target=parent_class,
                        relationship_type='inherits',
                        line_number=i + 1
                    ))
            
            func_match = func_pattern.match(line)
            if func_match:
                func_name = func_match.group(1) or func_match.group(2) or func_match.group(3)
                if func_name:
                    # Match "async" as a whole word so names like "asyncData" do not count.
                    is_async = re.search(r'\basync\b', line) is not None
                    
                    entity = ASTEntity(
                        name=func_name,
                        entity_type='function',
                        line_number=i + 1,
                        end_line=i + 1,
                        column=0,
                        end_column=len(line),
                        signature=line.strip(),
                        is_async=is_async,
                        complexity=1
                    )
                    self.entities.append(entity)
            
            import_match = import_pattern.match(line)
            if import_match:
                imports_part = import_match.group(1)
                module = import_match.group(2)
                
                self.imports.append({
                    'type': 'import',
                    'module': module,
                    'imports': imports_part,
                    'line_number': i + 1
                })
            
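            # search() rather than match(): the pattern has no leading \s*, so
            # match() would miss indented require calls.
            require_match = require_pattern.search(line)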
            if require_match:
                var_name = require_match.group(1)
                module = require_match.group(2)
                
                self.imports.append({
                    'type': 'require',
                    'module': module,
                    'alias': var_name,
                    'line_number': i + 1
                })
        
        return self.entities, self.relationships, self.imports


class ASTParserFactory:
    @staticmethod
    def get_parser(language: str):
        parsers = {
            'Python': PythonASTParser,
            'JavaScript': JavaScriptASTParser,
            'TypeScript': JavaScriptASTParser
        }
        
        parser_class = parsers.get(language)
        if parser_class:
            return parser_class()
        
        return None


class EnhancedCodeStorage(SourceCodeStorage):
    def __init__(self, storage_dir: str = "./code_storage"):
        super().__init__(storage_dir)
        self.ast_cache: Dict[str, Tuple[List[ASTEntity], List[ASTRelationship], List[Dict[str, Any]]]] = {}
    
    def ingest_file_with_ast(self, file_path: str) -> bool:
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            
            language = LanguageDetector.detect_language(file_path)
            
            parser = ASTParserFactory.get_parser(language)
            
            if parser:
                entities, relationships, imports = parser.parse_code(content, file_path)
                
                self.ast_cache[file_path] = (entities, relationships, imports)
                
                code_entities = []
                for ast_entity in entities:
                    code_entity = CodeEntity(
                        name=ast_entity.name,
                        entity_type=ast_entity.entity_type,
                        line_number=ast_entity.line_number,
                        end_line=ast_entity.end_line,
                        signature=ast_entity.signature,
                        docstring=ast_entity.docstring,
                        complexity=ast_entity.complexity,
                        parameters=[p['name'] for p in ast_entity.parameters],
                        return_type=ast_entity.return_annotation
                    )
                    code_entities.append(code_entity)
                
                if self.root_path:
                    relative_path = os.path.relpath(file_path, self.root_path)
                else:
                    relative_path = file_path
                
                content_hash = hashlib.md5(content.encode()).hexdigest()
                
                loc_metrics = CodeMetricsCalculator.calculate_loc_metrics(content, language)
                
                stat = os.stat(file_path)
                
                metadata = FileMetadata(
                    file_path=file_path,
                    relative_path=relative_path,
                    file_size=stat.st_size,  # size on disk in bytes
                    language=language,
                    last_modified=datetime.fromtimestamp(stat.st_mtime),
                    content_hash=content_hash,
                    lines_of_code=loc_metrics['code'],
                    blank_lines=loc_metrics['blank'],
                    comment_lines=loc_metrics['comment']
                )
                
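                dependencies = []
                for imp in imports:
                    module = imp.get('module', '')
                    # Python relative imports carry their dots in 'level', not in
                    # the module name; JavaScript relative paths start with '.'.
                    is_relative = (imp.get('level') or 0) > 0 or module.startswith('.')
                    dep = FileDependency(
                        dependency_type=imp['type'],
                        target=module,
                        line_number=imp['line_number'],
                        is_external=not is_relative
                    )
                    dependencies.append(dep)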
                
                complexity = CodeMetricsCalculator.calculate_complexity_score(code_entities)
                
                comment_ratio = loc_metrics['comment'] / max(1, loc_metrics['total'])
                maintainability = CodeMetricsCalculator.calculate_maintainability_index(
                    loc_metrics['code'], complexity, comment_ratio
                )
                
                summary = self._generate_summary(metadata, code_entities, dependencies)
                
                record = CodeFileRecord(
                    metadata=metadata,
                    summary=summary,
                    entities=code_entities,
                    dependencies=dependencies,
                    imports=[imp.get('module', '') for imp in imports],
                    exports=[],
                    complexity_score=complexity,
                    maintainability_index=maintainability
                )
                
                record.embedding = self.embedder.embed_text(record.to_summary_text())
                
                self.files[file_path] = record
                
                return True
            else:
                return self.ingest_file_content(file_path, content)
        
        except Exception as e:
            print(f"Error ingesting file with AST {file_path}: {e}")
            return False
    
    def get_ast_data(self, file_path: str) -> Optional[Tuple[List[ASTEntity], List[ASTRelationship], List[Dict[str, Any]]]]:
        return self.ast_cache.get(file_path)

This AST parsing implementation provides deep code understanding capabilities. The PythonASTParser uses Python's built-in ast module to parse Python code into an abstract syntax tree, then traverses the tree to extract entities and relationships. It handles classes, functions, async functions, decorators, type annotations, and docstrings.

The parser calculates cyclomatic complexity by counting decision points in the code. Each if statement, while loop, for loop, and exception handler adds one to the complexity. Boolean operators also increase complexity because they represent additional decision points.
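As a rough, self-contained illustration of this counting rule, the sketch below applies it to a small function. The helper name cyclomatic_complexity and the sample function classify are hypothetical and exist only for this example; the counting follows the rule described above.

```python
import ast


def cyclomatic_complexity(source: str) -> int:
    """Count decision points in a snippet, starting from a base of 1."""
    tree = ast.parse(source)
    complexity = 1
    for node in ast.walk(tree):
        if isinstance(node, (ast.If, ast.While, ast.For, ast.ExceptHandler)):
            complexity += 1
        elif isinstance(node, ast.BoolOp):
            # "a and b and c" has three operands, i.e. two extra branches
            complexity += len(node.values) - 1
    return complexity


# Hypothetical sample: two if statements, one for loop, one "and" operator.
SAMPLE = '''
def classify(x, y):
    if x > 0 and y > 0:
        return "both positive"
    for item in range(3):
        if item == x:
            return "match"
    return "other"
'''

print(cyclomatic_complexity(SAMPLE))  # prints 5
```

The result is 5: the base value of 1, plus two if statements, one for loop, and one extra branch contributed by the "and" operator.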

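The JavaScriptASTParser uses a simpler regex-based approach because we are not importing a full JavaScript parser library in this example. It inspects one line at a time, so it records only the declaration line of each entity (end_line equals line_number), assigns every entity a complexity of 1, and misses constructs such as class methods and arrow functions written without parentheses. TypeScript files are routed to the same parser. In a production system, you would use a proper JavaScript parser such as esprima or @babel/parser to get accurate AST parsing.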
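To make the trade-off of the regex approach concrete, the following sketch runs the same function pattern used by JavaScriptASTParser against a few JavaScript lines. The helper find_function_name and the sample lines are hypothetical and only serve this illustration.

```python
import re
from typing import Optional

# Same expression as func_pattern in JavaScriptASTParser.
FUNC_PATTERN = re.compile(
    r'^\s*(?:async\s+)?(?:function\s+(\w+)|const\s+(\w+)\s*=\s*(?:async\s*)?\(|(\w+)\s*:\s*(?:async\s*)?\()'
)


def find_function_name(line: str) -> Optional[str]:
    """Return the function name the pattern extracts from a line, if any."""
    match = FUNC_PATTERN.match(line)
    if not match:
        return None
    return match.group(1) or match.group(2) or match.group(3)


# Hypothetical sample lines covering common declaration styles.
SAMPLE_LINES = [
    "function add(a, b) {",
    "const load = async (url) => {",
    "export function save() {}",
    "const double = x => x * 2;",
    "  render() {",
]

for sample in SAMPLE_LINES:
    print(f"{sample!r:40} -> {find_function_name(sample)}")
```

The first two lines yield add and load. The exported function, the arrow function without parentheses, and the class method all return None, which is the kind of gap a real parser closes.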

The ASTParserFactory provides a unified interface for getting the appropriate parser for each language. This makes it easy to add support for additional languages by implementing new parser classes and registering them in the factory.
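One possible way to make registration explicit is a factory that keeps a mutable registry instead of a dictionary built inside get_parser. The sketch below is an assumption about how that could look, not the tutorial's class: RegistryParserFactory, its register method, and GoParserStub are hypothetical names introduced for illustration.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Shape of a parser result: entities, relationships, imports.
ParseResult = Tuple[List[Any], List[Any], List[Dict[str, Any]]]


class GoParserStub:
    """Hypothetical placeholder; a real parser would extract entities."""

    def parse_code(self, code: str, source: str = "<string>") -> ParseResult:
        return [], [], []


class RegistryParserFactory:
    """Variant of ASTParserFactory with a registry that can grow at runtime."""

    _parsers: Dict[str, Callable[[], Any]] = {}

    @classmethod
    def register(cls, language: str, parser_class: Callable[[], Any]) -> None:
        # Later registrations for the same language replace earlier ones.
        cls._parsers[language] = parser_class

    @classmethod
    def get_parser(cls, language: str) -> Optional[Any]:
        parser_class = cls._parsers.get(language)
        return parser_class() if parser_class else None


RegistryParserFactory.register("Go", GoParserStub)

print(type(RegistryParserFactory.get_parser("Go")).__name__)
print(RegistryParserFactory.get_parser("Rust"))
```

With this layout, adding a language is a single register call next to the new parser class, and callers still receive None for languages without a parser.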

The EnhancedCodeStorage class extends the basic SourceCodeStorage with AST parsing capabilities. When ingesting files, it uses the AST parser if available, falling back to the simpler regex-based parsing for unsupported languages.

The AST data is cached separately from the code file records, allowing us to retrieve detailed AST information when needed for deep analysis while keeping the main storage compact.

COMPONENT SEVEN: SPECIALIZED ANALYSIS AGENTS

Specialized analysis agents focus on specific aspects of code quality, each using domain-specific knowledge and techniques. Rather than having a single monolithic analyzer, we create multiple focused agents that can work in parallel and combine their findings.

The first specialized agent is the Code Smell Detector, which identifies anti-patterns and design issues. Code smells are indicators of deeper problems in the code. Examples include long methods, large classes, duplicate code, long parameter lists, feature envy, inappropriate intimacy, and data clumps.

The second specialized agent is the Metrics Analyzer, which calculates quantitative measures of code quality. Metrics include cyclomatic complexity, cognitive complexity, lines of code, comment density, coupling metrics, cohesion metrics, and inheritance depth.

The third specialized agent is the Security Analyzer, which identifies potential security vulnerabilities. This includes SQL injection risks, cross-site scripting vulnerabilities, insecure deserialization, hardcoded credentials, and insufficient input validation.

The fourth specialized agent is the Performance Analyzer, which identifies performance issues. This includes inefficient algorithms, unnecessary computations, resource leaks, blocking operations in async code, and N+1 query problems.

The fifth specialized agent is the Architecture Analyzer, which evaluates high-level design decisions. This includes layer violations, circular dependencies, violation of architectural principles, and inconsistent patterns.
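Of the architecture checks listed, circular dependencies are the most mechanical to detect. A minimal sketch, assuming the import data has already been reduced to a mapping from module name to imported module names, is a depth-first search that reports the first cycle it meets. The function find_import_cycle and the sample module names are hypothetical.

```python
from typing import Dict, List, Optional


def find_import_cycle(graph: Dict[str, List[str]]) -> Optional[List[str]]:
    """Return one import cycle as a list of modules, or None if acyclic."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on current path, finished
    color = {node: WHITE for node in graph}
    path: List[str] = []

    def visit(node: str) -> Optional[List[str]]:
        color[node] = GRAY
        path.append(node)
        for dep in graph.get(node, []):
            state = color.get(dep, WHITE)
            if state == GRAY:
                # dep is already on the current path: the cycle closes here
                return path[path.index(dep):] + [dep]
            if state == WHITE:
                cycle = visit(dep)
                if cycle:
                    return cycle
        path.pop()
        color[node] = BLACK
        return None

    for node in list(graph):
        if color[node] == WHITE:
            cycle = visit(node)
            if cycle:
                return cycle
    return None


# Hypothetical module graph with one cycle.
SAMPLE_GRAPH = {
    "orders": ["billing"],
    "billing": ["customers"],
    "customers": ["orders"],
    "utils": [],
}

print(find_import_cycle(SAMPLE_GRAPH))
```

For the sample graph the function returns the path orders, billing, customers, orders. Modules that appear only as import targets are treated as leaves, so external packages do not need their own entries.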

Here is the implementation of specialized analysis agents:

import re
from typing import List, Dict, Optional, Set, Tuple, Any
from dataclasses import dataclass, field
from enum import Enum


class IssueSeverity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"


class IssueCategory(Enum):
    CODE_SMELL = "code_smell"
    SECURITY = "security"
    PERFORMANCE = "performance"
    MAINTAINABILITY = "maintainability"
    ARCHITECTURE = "architecture"
    BEST_PRACTICE = "best_practice"


@dataclass
class CodeIssue:
    id: str
    category: IssueCategory
    severity: IssueSeverity
    title: str
    description: str
    file_path: str
    line_number: int
    end_line: Optional[int] = None
    code_snippet: Optional[str] = None
    recommendation: Optional[str] = None
    references: List[str] = field(default_factory=list)
    confidence: float = 1.0
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'id': self.id,
            'category': self.category.value,
            'severity': self.severity.value,
            'title': self.title,
            'description': self.description,
            'file': self.file_path,
            'line': self.line_number,
            'end_line': self.end_line,
            'snippet': self.code_snippet,
            'recommendation': self.recommendation,
            'references': self.references,
            'confidence': self.confidence
        }


class CodeSmellDetector:
    def __init__(self):
        self.issues: List[CodeIssue] = []
        self.smell_thresholds = {
            'long_method_lines': 50,
            'long_method_complexity': 10,
            'large_class_methods': 20,
            'large_class_lines': 500,
            'long_parameter_list': 5,
            'deep_nesting': 4,
            'duplicate_code_lines': 6
        }
    
    def analyze_file(self, file_record: CodeFileRecord, ast_data: Optional[Tuple] = None) -> List[CodeIssue]:
        self.issues = []
        
        self._detect_long_methods(file_record, ast_data)
        self._detect_large_classes(file_record, ast_data)
        self._detect_long_parameter_lists(file_record, ast_data)
        self._detect_god_classes(file_record)
        self._detect_feature_envy(file_record, ast_data)
        self._detect_duplicate_code(file_record)
        
        return self.issues
    
    def _detect_long_methods(self, file_record: CodeFileRecord, ast_data: Optional[Tuple]):
        for entity in file_record.entities:
            if entity.entity_type in ['function', 'method']:
                # Inclusive line count: a one-line function spans 1 line, not 0.
                lines = entity.end_line - entity.line_number + 1
                
                if lines > self.smell_thresholds['long_method_lines']:
                    severity = IssueSeverity.HIGH if lines > 100 else IssueSeverity.MEDIUM
                    
                    issue = CodeIssue(
                        id=f"long_method_{file_record.metadata.file_path}_{entity.line_number}",
                        category=IssueCategory.CODE_SMELL,
                        severity=severity,
                        title="Long Method",
                        description=f"Method '{entity.name}' is {lines} lines long, exceeding the recommended maximum of {self.smell_thresholds['long_method_lines']} lines.",
                        file_path=file_record.metadata.file_path,
                        line_number=entity.line_number,
                        end_line=entity.end_line,
                        recommendation="Consider breaking this method into smaller, more focused methods. Each method should do one thing well.",
                        references=["Clean Code by Robert Martin", "Refactoring by Martin Fowler"]
                    )
                    self.issues.append(issue)
                
                if entity.complexity > self.smell_thresholds['long_method_complexity']:
                    severity = IssueSeverity.HIGH if entity.complexity > 20 else IssueSeverity.MEDIUM
                    
                    issue = CodeIssue(
                        id=f"complex_method_{file_record.metadata.file_path}_{entity.line_number}",
                        category=IssueCategory.CODE_SMELL,
                        severity=severity,
                        title="High Complexity Method",
                        description=f"Method '{entity.name}' has cyclomatic complexity of {entity.complexity}, exceeding the recommended maximum of {self.smell_thresholds['long_method_complexity']}.",
                        file_path=file_record.metadata.file_path,
                        line_number=entity.line_number,
                        end_line=entity.end_line,
                        recommendation="Reduce complexity by extracting conditional logic into separate methods or using polymorphism instead of conditionals.",
                        references=["Code Complete by Steve McConnell"]
                    )
                    self.issues.append(issue)
    
    def _detect_large_classes(self, file_record: CodeFileRecord, ast_data: Optional[Tuple]):
        classes = [e for e in file_record.entities if e.entity_type == 'class']
        
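        for cls in classes:
            # Count only the methods defined inside this class's line range,
            # not every method in the file.
            methods = [
                e for e in file_record.entities
                if e.entity_type == 'method'
                and cls.line_number < e.line_number <= cls.end_line
            ]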
            
            if len(methods) > self.smell_thresholds['large_class_methods']:
                issue = CodeIssue(
                    id=f"large_class_{file_record.metadata.file_path}_{cls.line_number}",
                    category=IssueCategory.CODE_SMELL,
                    severity=IssueSeverity.MEDIUM,
                    title="Large Class",
                    description=f"Class '{cls.name}' has {len(methods)} methods, exceeding the recommended maximum of {self.smell_thresholds['large_class_methods']}.",
                    file_path=file_record.metadata.file_path,
                    line_number=cls.line_number,
                    recommendation="Consider splitting this class into multiple smaller classes, each with a single responsibility.",
                    references=["Single Responsibility Principle"]
                )
                self.issues.append(issue)
    
    def _detect_long_parameter_lists(self, file_record: CodeFileRecord, ast_data: Optional[Tuple]):
        for entity in file_record.entities:
            if entity.entity_type in ['function', 'method']:
                param_count = len(entity.parameters)
                
                if param_count > self.smell_thresholds['long_parameter_list']:
                    issue = CodeIssue(
                        id=f"long_params_{file_record.metadata.file_path}_{entity.line_number}",
                        category=IssueCategory.CODE_SMELL,
                        severity=IssueSeverity.MEDIUM,
                        title="Long Parameter List",
                        description=f"Function '{entity.name}' has {param_count} parameters, exceeding the recommended maximum of {self.smell_thresholds['long_parameter_list']}.",
                        file_path=file_record.metadata.file_path,
                        line_number=entity.line_number,
                        recommendation="Consider introducing a parameter object or builder pattern to group related parameters.",
                        references=["Refactoring by Martin Fowler"]
                    )
                    self.issues.append(issue)
    
    def _detect_god_classes(self, file_record: CodeFileRecord):
        if file_record.metadata.lines_of_code > self.smell_thresholds['large_class_lines']:
            classes = [e for e in file_record.entities if e.entity_type == 'class']
            
            if classes:
                issue = CodeIssue(
                    id=f"god_class_{file_record.metadata.file_path}",
                    category=IssueCategory.CODE_SMELL,
                    severity=IssueSeverity.HIGH,
                    title="God Class",
                    description=f"File contains {file_record.metadata.lines_of_code} lines, suggesting a God Class that knows or does too much.",
                    file_path=file_record.metadata.file_path,
                    line_number=1,
                    recommendation="Decompose this class into multiple smaller classes, each with a focused responsibility.",
                    references=["Single Responsibility Principle", "Clean Code"]
                )
                self.issues.append(issue)
    
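    def _detect_feature_envy(self, file_record: CodeFileRecord, ast_data: Optional[Tuple]):
        # Placeholder: feature envy detection is not implemented in this example.
        pass
    
    def _detect_duplicate_code(self, file_record: CodeFileRecord):
        # Placeholder: duplicate code detection is not implemented in this example.
        pass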


class SecurityAnalyzer:
    def __init__(self):
        self.issues: List[CodeIssue] = []
        self.security_patterns = {
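            'sql_injection': [
                # Flag %-formatting applied to the query string. A bare %s
                # placeholder followed by a parameter tuple is the safe,
                # parameterized form and should not match.
                r'execute\s*\(\s*["\'].*%s.*["\']\s*%',
                r'execute\s*\(\s*f["\']',
                r'\.raw\s*\(',
            ],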
            'xss': [
                r'innerHTML\s*=',
                r'document\.write\s*\(',
                r'eval\s*\(',
            ],
            'hardcoded_secrets': [
                r'password\s*=\s*["\'][^"\']+["\']',
                r'api[_-]?key\s*=\s*["\'][^"\']+["\']',
                r'secret\s*=\s*["\'][^"\']+["\']',
                r'token\s*=\s*["\'][^"\']+["\']',
            ],
            'insecure_random': [
                r'random\.random\s*\(',
                r'Math\.random\s*\(',
            ],
        }
    
    def analyze_file(self, file_record: CodeFileRecord, content: str) -> List[CodeIssue]:
        self.issues = []
        
        lines = content.split('\n')
        
        self._detect_sql_injection(lines, file_record)
        self._detect_xss(lines, file_record)
        self._detect_hardcoded_secrets(lines, file_record)
        self._detect_insecure_random(lines, file_record)
        
        return self.issues
    
    def _detect_sql_injection(self, lines: List[str], file_record: CodeFileRecord):
        for i, line in enumerate(lines):
            for pattern in self.security_patterns['sql_injection']:
                if re.search(pattern, line, re.IGNORECASE):
                    issue = CodeIssue(
                        id=f"sql_injection_{file_record.metadata.file_path}_{i+1}",
                        category=IssueCategory.SECURITY,
                        severity=IssueSeverity.CRITICAL,
                        title="Potential SQL Injection",
                        description="SQL query uses string formatting or concatenation, which may be vulnerable to SQL injection attacks.",
                        file_path=file_record.metadata.file_path,
                        line_number=i + 1,
                        code_snippet=line.strip(),
                        recommendation="Use parameterized queries or an ORM to prevent SQL injection. Never concatenate user input into SQL queries.",
                        references=["OWASP Top 10", "CWE-89"]
                    )
                    self.issues.append(issue)
    
    def _detect_xss(self, lines: List[str], file_record: CodeFileRecord):
        for i, line in enumerate(lines):
            for pattern in self.security_patterns['xss']:
                if re.search(pattern, line, re.IGNORECASE):
                    issue = CodeIssue(
                        id=f"xss_{file_record.metadata.file_path}_{i+1}",
                        category=IssueCategory.SECURITY,
                        severity=IssueSeverity.HIGH,
                        title="Potential Cross-Site Scripting (XSS)",
                        description="Code uses potentially unsafe DOM manipulation that could lead to XSS vulnerabilities.",
                        file_path=file_record.metadata.file_path,
                        line_number=i + 1,
                        code_snippet=line.strip(),
                        recommendation="Use safe DOM manipulation methods like textContent instead of innerHTML. Always sanitize user input.",
                        references=["OWASP Top 10", "CWE-79"]
                    )
                    self.issues.append(issue)
    
    def _detect_hardcoded_secrets(self, lines: List[str], file_record: CodeFileRecord):
        for i, line in enumerate(lines):
            for pattern in self.security_patterns['hardcoded_secrets']:
                if re.search(pattern, line, re.IGNORECASE):
                    if 'example' not in line.lower() and 'test' not in line.lower():
                        issue = CodeIssue(
                            id=f"hardcoded_secret_{file_record.metadata.file_path}_{i+1}",
                            category=IssueCategory.SECURITY,
                            severity=IssueSeverity.CRITICAL,
                            title="Hardcoded Secret",
                            description="Code contains what appears to be a hardcoded password, API key, or secret.",
                            file_path=file_record.metadata.file_path,
                            line_number=i + 1,
                            code_snippet="[REDACTED]",
                            recommendation="Never hardcode secrets in source code. Use environment variables or a secrets management system.",
                            references=["OWASP Top 10", "CWE-798"]
                        )
                        self.issues.append(issue)
    
    def _detect_insecure_random(self, lines: List[str], file_record: CodeFileRecord):
        for i, line in enumerate(lines):
            for pattern in self.security_patterns['insecure_random']:
                if re.search(pattern, line):
                    if 'crypto' not in line.lower() and 'security' not in line.lower():
                        issue = CodeIssue(
                            id=f"insecure_random_{file_record.metadata.file_path}_{i+1}",
                            category=IssueCategory.SECURITY,
                            severity=IssueSeverity.MEDIUM,
                            title="Insecure Random Number Generation",
                            description="Code uses non-cryptographic random number generation which may be predictable.",
                            file_path=file_record.metadata.file_path,
                            line_number=i + 1,
                            code_snippet=line.strip(),
                            recommendation="For security-sensitive operations, use cryptographically secure random number generators like secrets module in Python or crypto.getRandomValues in JavaScript.",
                            references=["CWE-330"]
                        )
                        self.issues.append(issue)


class PerformanceAnalyzer:
    def __init__(self):
        self.issues: List[CodeIssue] = []
    
    def analyze_file(self, file_record: CodeFileRecord, content: str, ast_data: Optional[Tuple] = None) -> List[CodeIssue]:
        self.issues = []
        
        lines = content.split('\n')
        
        self._detect_nested_loops(lines, file_record)
        self._detect_inefficient_string_concat(lines, file_record)
        self._detect_blocking_in_async(lines, file_record)
        
        return self.issues
    
    def _detect_nested_loops(self, lines: List[str], file_record: CodeFileRecord):
        # Indentation-based heuristic: the stack holds loops whose body is still open.
        loop_stack = []
        
        for i, line in enumerate(lines):
            if not line.strip():
                continue  # blank lines say nothing about block structure
            
            indent = len(line) - len(line.lstrip())
            
            # A line at the same or shallower indentation closes those loops.
            loop_stack = [(line_num, ind) for line_num, ind in loop_stack if ind < indent]
            
            # Match loop statements only at the start of a line, so comprehensions
            # and most mentions of "for"/"while" in comments or strings are skipped.
            if re.match(r'\s*(?:async\s+)?(?:for|while)\b', line):
                loop_stack.append((i, indent))
                
                if len(loop_stack) >= 3:
                    issue = CodeIssue(
                        id=f"nested_loops_{file_record.metadata.file_path}_{i+1}",
                        category=IssueCategory.PERFORMANCE,
                        severity=IssueSeverity.MEDIUM,
                        title="Deeply Nested Loops",
                        description=f"Found {len(loop_stack)} levels of nested loops, which may indicate O(n^{len(loop_stack)}) complexity.",
                        file_path=file_record.metadata.file_path,
                        line_number=i + 1,
                        recommendation="Consider refactoring to reduce nesting depth. Use data structures like hash maps to improve lookup performance.",
                        references=["Algorithm Design Manual"]
                    )
                    self.issues.append(issue)
    
    def _detect_inefficient_string_concat(self, lines: List[str], file_record: CodeFileRecord):
        for i, line in enumerate(lines):
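            # Look for a loop header on this line or the three lines before it.
            recent_lines = lines[max(0, i-3):i+1]
            if re.search(r'\+=\s*["\']', line) and any(re.search(r'\bfor\b', recent) for recent in recent_lines):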
                issue = CodeIssue(
                    id=f"string_concat_{file_record.metadata.file_path}_{i+1}",
                    category=IssueCategory.PERFORMANCE,
                    severity=IssueSeverity.LOW,
                    title="Inefficient String Concatenation in Loop",
                    description="String concatenation in a loop creates many intermediate string objects.",
                    file_path=file_record.metadata.file_path,
                    line_number=i + 1,
                    code_snippet=line.strip(),
                    recommendation="Use a list and join() in Python, or StringBuilder in Java, or array.join() in JavaScript.",
                    references=["Python Performance Tips"]
                )
                self.issues.append(issue)
    
    def _detect_blocking_in_async(self, lines: List[str], file_record: CodeFileRecord):
        in_async_function = False
        async_start_line = 0
        
        for i, line in enumerate(lines):
            if re.search(r'\basync\s+(def|function)', line):
                in_async_function = True
                async_start_line = i
            
            if in_async_function:
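                # Flag time.sleep() and any sleep() call that is not awaited on the same line.
                if re.search(r'\btime\.sleep\(', line) or (
                        re.search(r'\bsleep\(', line) and not re.search(r'\bawait\b', line)):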
                    issue = CodeIssue(
                        id=f"blocking_async_{file_record.metadata.file_path}_{i+1}",
                        category=IssueCategory.PERFORMANCE,
                        severity=IssueSeverity.HIGH,
                        title="Blocking Call in Async Function",
                        description="Async function contains blocking sleep call that will block the entire event loop.",
                        file_path=file_record.metadata.file_path,
                        line_number=i + 1,
                        code_snippet=line.strip(),
                        recommendation="Use await asyncio.sleep() instead of time.sleep() in async functions.",
                        references=["Python asyncio documentation"]
                    )
                    self.issues.append(issue)
                
                if re.search(r'\bdef\s+\w+\(', line) and 'async' not in line:
                    in_async_function = False


class ArchitectureAnalyzer:
    def __init__(self, graph: CodeGraph, context_storage: ProjectContextStorage):
        self.graph = graph
        self.context_storage = context_storage
        self.issues: List[CodeIssue] = []
    
    def analyze_architecture(self) -> List[CodeIssue]:
        self.issues = []
        
        self._detect_circular_dependencies()
        self._detect_layer_violations()
        self._check_solid_principles()
        
        return self.issues
    
    def _detect_circular_dependencies(self):
        cycles = self.graph.find_circular_dependencies()
        
        for cycle in cycles:
            cycle_str = " -> ".join([self.graph.get_node(nid).name for nid in cycle])
            
            issue = CodeIssue(
                id=f"circular_dep_{hashlib.md5(cycle_str.encode()).hexdigest()[:12]}",
                category=IssueCategory.ARCHITECTURE,
                severity=IssueSeverity.HIGH,
                title="Circular Dependency",
                description=f"Circular dependency detected: {cycle_str}",
                file_path="<architecture>",
                line_number=0,
                recommendation="Break the circular dependency by introducing an interface or reorganizing the code structure.",
                references=["Dependency Inversion Principle"]
            )
            self.issues.append(issue)
    
    def _detect_layer_violations(self):
        pass
    
    def _check_solid_principles(self):
        principles = self.context_storage.get_all_principles()
        
        for principle_entry in principles:
            if "Single Responsibility" in principle_entry.title:
                classes = self.graph.get_nodes_by_type(NodeType.CLASS)
                
                for cls in classes:
                    methods = self.graph.get_outgoing_edges(cls.id, EdgeType.CONTAINS)
                    
                    if len(methods) > 15:
                        issue = CodeIssue(
                            id=f"srp_violation_{cls.id}",
                            category=IssueCategory.ARCHITECTURE,
                            severity=IssueSeverity.MEDIUM,
                            title="Possible Single Responsibility Principle Violation",
                            description=f"Class '{cls.name}' has {len(methods)} methods, suggesting it may have multiple responsibilities.",
                            file_path=cls.file_path,
                            line_number=cls.line_number,
                            recommendation="Review the class responsibilities and consider splitting into multiple classes.",
                            references=[principle_entry.title]
                        )
                        self.issues.append(issue)


class AnalysisOrchestrator:
    def __init__(self, code_storage: EnhancedCodeStorage, graph: CodeGraph, 
                 context_storage: ProjectContextStorage):
        self.code_storage = code_storage
        self.graph = graph
        self.context_storage = context_storage
        
        self.smell_detector = CodeSmellDetector()
        self.security_analyzer = SecurityAnalyzer()
        self.performance_analyzer = PerformanceAnalyzer()
        self.architecture_analyzer = ArchitectureAnalyzer(graph, context_storage)
        
        self.all_issues: List[CodeIssue] = []
    
    def analyze_all(self) -> List[CodeIssue]:
        self.all_issues = []
        
        for file_path, file_record in self.code_storage.files.items():
            try:
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
            except OSError:
                content = ""
            
            ast_data = self.code_storage.get_ast_data(file_path)
            
            smell_issues = self.smell_detector.analyze_file(file_record, ast_data)
            self.all_issues.extend(smell_issues)
            
            security_issues = self.security_analyzer.analyze_file(file_record, content)
            self.all_issues.extend(security_issues)
            
            performance_issues = self.performance_analyzer.analyze_file(file_record, content, ast_data)
            self.all_issues.extend(performance_issues)
        
        arch_issues = self.architecture_analyzer.analyze_architecture()
        self.all_issues.extend(arch_issues)
        
        return self.all_issues
    
    def get_issues_by_severity(self, severity: IssueSeverity) -> List[CodeIssue]:
        return [issue for issue in self.all_issues if issue.severity == severity]
    
    def get_issues_by_category(self, category: IssueCategory) -> List[CodeIssue]:
        return [issue for issue in self.all_issues if issue.category == category]
    
    def get_issues_by_file(self, file_path: str) -> List[CodeIssue]:
        return [issue for issue in self.all_issues if issue.file_path == file_path]
    
    def generate_report(self) -> Dict[str, Any]:
        total_issues = len(self.all_issues)
        
        by_severity = {}
        for severity in IssueSeverity:
            by_severity[severity.value] = len(self.get_issues_by_severity(severity))
        
        by_category = {}
        for category in IssueCategory:
            by_category[category.value] = len(self.get_issues_by_category(category))
        
        critical_issues = self.get_issues_by_severity(IssueSeverity.CRITICAL)
        high_issues = self.get_issues_by_severity(IssueSeverity.HIGH)
        
        return {
            'total_issues': total_issues,
            'by_severity': by_severity,
            'by_category': by_category,
            'critical_issues': [issue.to_dict() for issue in critical_issues],
            'high_issues': [issue.to_dict() for issue in high_issues],
            'files_analyzed': len(self.code_storage.files),
            'timestamp': datetime.now().isoformat()
        }

This implementation of the specialized analysis agents demonstrates how to create focused analyzers for different quality aspects. Each analyzer has specific detection logic and generates structured issue reports with severity levels, recommendations, and references.

The CodeSmellDetector identifies common anti-patterns using configurable thresholds. It detects long methods both by line count and cyclomatic complexity, recognizing that a short but complex method can be just as problematic as a long simple one. It detects large classes, long parameter lists, and God classes.

The SecurityAnalyzer uses pattern matching to identify common security vulnerabilities. It looks for SQL injection risks by detecting string formatting in SQL queries. It identifies XSS vulnerabilities by finding unsafe DOM manipulation. It detects hardcoded secrets by looking for variable assignments that match secret patterns. Each finding includes references to security standards like OWASP Top Ten and CWE identifiers.
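
As a minimal sketch of the pattern-matching approach described above, the snippet below scans source text for assignments that look like hardcoded secrets. The regular expression and the find_hardcoded_secrets helper are illustrative assumptions, not the patterns the SecurityAnalyzer itself uses, and every match is only a candidate that still needs review.

```python
import re
from typing import List, Tuple

# Hypothetical pattern: a secret-like variable name assigned a quoted literal
# of at least eight characters.
SECRET_PATTERN = re.compile(
    r"""(?i)\b(password|passwd|secret|api_key|token)\b\s*=\s*["'][^"']{8,}["']"""
)


def find_hardcoded_secrets(source: str) -> List[Tuple[int, str]]:
    """Return (line_number, stripped_line) for each line that matches the pattern."""
    findings = []
    for line_number, line in enumerate(source.split("\n"), start=1):
        if SECRET_PATTERN.search(line):
            findings.append((line_number, line.strip()))
    return findings


sample = 'user = "alice"\napi_key = "sk-1234567890abcdef"\npassword = os.environ["PW"]\n'
secret_findings = find_hardcoded_secrets(sample)
```

Only the second line of the sample is reported. The third line reads the value from the environment, so it has no quoted literal directly after the equals sign and is left alone.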

The PerformanceAnalyzer identifies performance anti-patterns. It detects deeply nested loops that indicate high algorithmic complexity. It finds inefficient string concatenation in loops. It identifies blocking calls in async functions that would block the event loop.
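
The nested-loop check can be illustrated in isolation. The sketch below estimates loop depth from indentation alone, assuming consistently indented, Python-style source; max_loop_depth is a hypothetical helper written for this example, and it shares the limits of any line-based heuristic (it does not parse the code).

```python
import re
from typing import List, Tuple


def max_loop_depth(source: str) -> int:
    """Estimate the deepest loop nesting from indentation alone."""
    open_loops: List[Tuple[int, int]] = []  # (line_index, indent) of enclosing loops
    deepest = 0
    for index, line in enumerate(source.split("\n")):
        if not line.strip():
            continue  # blank lines carry no indentation information
        indent = len(line) - len(line.lstrip())
        # A line at or left of a loop header's indentation closes that loop.
        open_loops = [(n, ind) for n, ind in open_loops if ind < indent]
        if re.match(r"\s*(for|while)\b", line):
            open_loops.append((index, indent))
            deepest = max(deepest, len(open_loops))
    return deepest


sample = """
for a in rows:
    for b in cols:
        while b > 0:
            b -= 1
    total = 0
for c in rows:
    pass
"""
depth = max_loop_depth(sample)
```

The important detail is the order of operations: finished loops are dropped before the current line is examined, so a loop header is never removed by its own indentation level.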

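The ArchitectureAnalyzer evaluates high-level design using the code graph and project context. It detects circular dependencies by analyzing the dependency graph. It checks adherence to SOLID principles by retrieving principle definitions from the context storage and evaluating code against them. In this version, that check covers only the Single Responsibility Principle, flagging classes with more than 15 methods, and layer-violation detection is left as a stub.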
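
To make the circular dependency check concrete, here is one common way to find cycles: a depth-first traversal that records a cycle whenever it meets a node that is still on the current path. This is a sketch over a plain dictionary of module names. It is not the CodeGraph implementation of find_circular_dependencies, and the find_cycles name and sample modules are assumptions made for illustration.

```python
from typing import Dict, List


def find_cycles(dependencies: Dict[str, List[str]]) -> List[List[str]]:
    """Return one cycle per back edge found during a depth-first traversal."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    color = {node: WHITE for node in dependencies}
    path: List[str] = []
    cycles: List[List[str]] = []

    def visit(node: str) -> None:
        color[node] = GRAY
        path.append(node)
        for target in dependencies.get(node, []):
            state = color.get(target, WHITE)
            if state == GRAY:
                # Back edge: the cycle is the path segment starting at target.
                cycles.append(path[path.index(target):] + [target])
            elif state == WHITE:
                visit(target)
        path.pop()
        color[node] = BLACK

    for node in list(dependencies):
        if color[node] == WHITE:
            visit(node)
    return cycles


modules = {
    "orders": ["billing"],
    "billing": ["customers"],
    "customers": ["orders"],
    "reports": ["orders"],
}
cycles = find_cycles(modules)
```

Joining each cycle with " -> " gives the kind of string the analyzer puts in the issue description. The recursion is fine for small graphs; a very deep dependency chain would call for an iterative traversal.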

The AnalysisOrchestrator coordinates all analyzers, collecting issues from each and providing aggregation and reporting capabilities. It can filter issues by severity, category, or file, and generates comprehensive reports.

COMPONENT EIGHT: CHANGE AGENT AND GIT INTEGRATION

The Change Agent monitors repository modifications and triggers appropriate re-analysis. When code changes, we do not want to re-analyze the entire codebase. Instead, we identify what changed and analyze only the affected files and their dependencies.

Git integration provides several capabilities. First, it tracks which files changed in each commit. Second, it identifies the type of change such as added, modified, deleted, or renamed. Third, it extracts commit metadata including author, timestamp, and commit message. Fourth, it analyzes the diff to understand what specifically changed within files.
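
The second capability, identifying the type of change, can be sketched as a small parser for the tab-separated text that git prints with the --name-status option. The parse_name_status helper and the sample text are assumptions for illustration; the sample is hand-written rather than captured from a real repository.

```python
from typing import List, Optional, Tuple

# Letters git uses in --name-status output, mapped to readable labels.
STATUS_LABELS = {"A": "added", "M": "modified", "D": "deleted", "R": "renamed"}


def parse_name_status(output: str) -> List[Tuple[str, str, Optional[str]]]:
    """Turn name-status text into (change, path, old_path) tuples."""
    changes = []
    for line in output.splitlines():
        parts = line.split("\t")
        if len(parts) < 2:
            continue  # skip blank or malformed lines
        label = STATUS_LABELS.get(parts[0][:1], "modified")
        # Renames carry a similarity score and two paths: old first, then new.
        old_path = parts[1] if label == "renamed" and len(parts) >= 3 else None
        changes.append((label, parts[-1], old_path))
    return changes


sample_output = "M\tsrc/app.py\nA\ttests/test_app.py\nR100\tsrc/util.py\tsrc/helpers.py\nD\told/legacy.py"
parsed = parse_name_status(sample_output)
```

Keeping the parser separate from the subprocess call makes it easy to test against recorded output without needing a repository on disk.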

The Change Agent uses this Git information to make intelligent decisions about what to re-analyze. If a file is modified, we re-analyze that file and check if any files that depend on it need re-analysis. If a file is deleted, we remove it from storage and update the dependency graph. If a file is added, we perform full analysis on it.

The Change Agent also maintains a history of analysis results, allowing us to track how code quality evolves over time. We can identify trends such as increasing complexity, growing technical debt, or improving test coverage.

Here is the implementation of the Change Agent with Git integration:

import os
import subprocess
from typing import List, Dict, Optional, Set, Tuple, Any
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum


class ChangeType(Enum):
    ADDED = "added"
    MODIFIED = "modified"
    DELETED = "deleted"
    RENAMED = "renamed"


@dataclass
class FileChange:
    file_path: str
    change_type: ChangeType
    old_path: Optional[str] = None
    additions: int = 0
    deletions: int = 0
    
@dataclass
class CommitInfo:
    commit_hash: str
    author: str
    author_email: str
    timestamp: datetime
    message: str
    files_changed: List[FileChange]
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'hash': self.commit_hash,
            'author': self.author,
            'email': self.author_email,
            'timestamp': self.timestamp.isoformat(),
            'message': self.message,
            'files_changed': len(self.files_changed)
        }


class GitRepository:
    def __init__(self, repo_path: str):
        self.repo_path = repo_path
        self._verify_git_repo()
    
    def _verify_git_repo(self):
        try:
            subprocess.run(
                ['git', 'rev-parse', '--git-dir'],
                cwd=self.repo_path,
                capture_output=True,
                text=True,
                check=True
            )
        except (subprocess.CalledProcessError, OSError) as exc:
            # OSError covers a missing git executable or a missing directory.
            raise ValueError(f"Not a git repository (or git is unavailable): {self.repo_path}") from exc
    
    def _run_git_command(self, args: List[str]) -> str:
        result = subprocess.run(
            ['git'] + args,
            cwd=self.repo_path,
            capture_output=True,
            text=True,
            check=True
        )
        return result.stdout.strip()
    
    def get_current_branch(self) -> str:
        return self._run_git_command(['branch', '--show-current'])
    
    def get_latest_commit(self) -> CommitInfo:
        commit_hash = self._run_git_command(['rev-parse', 'HEAD'])
        return self.get_commit_info(commit_hash)
    
    def get_commit_info(self, commit_hash: str) -> CommitInfo:
        info_format = '%H%n%an%n%ae%n%at%n%s%n%b'
        info = self._run_git_command(['show', '-s', f'--format={info_format}', commit_hash])
        
        lines = info.split('\n')
        commit_hash = lines[0]
        author = lines[1]
        author_email = lines[2]
        timestamp = datetime.fromtimestamp(int(lines[3]))
        message = '\n'.join(lines[4:])
        
        files_changed = self.get_changed_files(commit_hash)
        
        return CommitInfo(
            commit_hash=commit_hash,
            author=author,
            author_email=author_email,
            timestamp=timestamp,
            message=message,
            files_changed=files_changed
        )
    
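    def get_changed_files(self, commit_hash: str) -> List[FileChange]:
        # Query status and line counts separately. When both formats are requested
        # in one call, the combined output is not paired per file, so parsing it
        # line by line is unreliable. --root makes the initial commit report its
        # files, and -M enables rename detection for the status query.
        try:
            status_output = self._run_git_command([
                'diff-tree', '--no-commit-id', '--root', '-r', '-M', '--name-status', commit_hash
            ])
            numstat_output = self._run_git_command([
                'diff-tree', '--no-commit-id', '--root', '-r', '--numstat', commit_hash
            ])
        except subprocess.CalledProcessError:
            return []
        
        # Map each path to (additions, deletions); binary files report '-'.
        line_counts: Dict[str, Tuple[int, int]] = {}
        for line in numstat_output.split('\n'):
            parts = line.split('\t')
            if len(parts) == 3:
                additions = int(parts[0]) if parts[0].isdigit() else 0
                deletions = int(parts[1]) if parts[1].isdigit() else 0
                line_counts[parts[2]] = (additions, deletions)
        
        status_map = {
            'A': ChangeType.ADDED,
            'M': ChangeType.MODIFIED,
            'D': ChangeType.DELETED,
            'R': ChangeType.RENAMED,
        }
        
        changes = []
        for line in status_output.split('\n'):
            parts = line.split('\t')
            if len(parts) < 2 or not parts[0]:
                continue
            
            # Unknown status letters are treated as modifications.
            change_type = status_map.get(parts[0][0], ChangeType.MODIFIED)
            # Renames are reported with a similarity score and two paths: old, then new.
            old_path = parts[1] if change_type == ChangeType.RENAMED and len(parts) >= 3 else None
            file_path = parts[-1]
            additions, deletions = line_counts.get(file_path, (0, 0))
            
            changes.append(FileChange(
                file_path=file_path,
                change_type=change_type,
                old_path=old_path,
                additions=additions,
                deletions=deletions
            ))
        
        return changes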
    
    def get_file_history(self, file_path: str, max_commits: int = 10) -> List[CommitInfo]:
        try:
            log_output = self._run_git_command([
                'log', f'-{max_commits}', '--format=%H', '--', file_path
            ])
        except subprocess.CalledProcessError:
            return []
        
        commit_hashes = log_output.split('\n')
        
        history = []
        for commit_hash in commit_hashes:
            if commit_hash:
                history.append(self.get_commit_info(commit_hash))
        
        return history
    
    def get_commits_since(self, since_date: datetime) -> List[CommitInfo]:
        since_str = since_date.strftime('%Y-%m-%d')
        
        try:
            log_output = self._run_git_command([
                'log', f'--since={since_str}', '--format=%H'
            ])
        except subprocess.CalledProcessError:
            return []
        
        commit_hashes = log_output.split('\n')
        
        commits = []
        for commit_hash in commit_hashes:
            if commit_hash:
                commits.append(self.get_commit_info(commit_hash))
        
        return commits
    
    def get_file_content_at_commit(self, file_path: str, commit_hash: str) -> Optional[str]:
        try:
            content = self._run_git_command(['show', f'{commit_hash}:{file_path}'])
            return content
        except subprocess.CalledProcessError:
            return None


class ChangeAgent:
    def __init__(self, git_repo: GitRepository, code_storage: EnhancedCodeStorage,
                 graph: CodeGraph, orchestrator: AnalysisOrchestrator):
        self.git_repo = git_repo
        self.code_storage = code_storage
        self.graph = graph
        self.orchestrator = orchestrator
        
        self.last_analyzed_commit: Optional[str] = None
        self.analysis_history: List[Dict[str, Any]] = []
    
    def analyze_latest_changes(self) -> Dict[str, Any]:
        latest_commit = self.git_repo.get_latest_commit()
        
        if self.last_analyzed_commit == latest_commit.commit_hash:
            return {
                'status': 'no_changes',
                'message': 'No new commits since last analysis'
            }
        
        result = self.analyze_commit(latest_commit)
        
        self.last_analyzed_commit = latest_commit.commit_hash
        
        return result
    
    def analyze_commit(self, commit_info: CommitInfo) -> Dict[str, Any]:
        files_to_analyze = set()
        files_to_remove = set()
        
        for file_change in commit_info.files_changed:
            if file_change.change_type == ChangeType.DELETED:
                files_to_remove.add(file_change.file_path)
            elif file_change.change_type in [ChangeType.ADDED, ChangeType.MODIFIED]:
                files_to_analyze.add(file_change.file_path)
            elif file_change.change_type == ChangeType.RENAMED:
                if file_change.old_path:
                    files_to_remove.add(file_change.old_path)
                files_to_analyze.add(file_change.file_path)
        
        for file_path in files_to_remove:
            self._remove_file_from_analysis(file_path)
        
        for file_path in files_to_analyze:
            full_path = os.path.join(self.git_repo.repo_path, file_path)
            if os.path.exists(full_path):
                self._analyze_file(full_path)
        
        affected_files = self._find_affected_files(files_to_analyze)
        
        for file_path in affected_files:
            full_path = os.path.join(self.git_repo.repo_path, file_path)
            if os.path.exists(full_path):
                self._analyze_file(full_path)
        
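        # Ingestion above is incremental, but this call recomputes issues
        # for every file currently held in code storage.
        issues = self.orchestrator.analyze_all()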
        
        analysis_result = {
            'commit': commit_info.to_dict(),
            'files_analyzed': len(files_to_analyze) + len(affected_files),
            'files_removed': len(files_to_remove),
            'issues_found': len(issues),
            'critical_issues': len([i for i in issues if i.severity == IssueSeverity.CRITICAL]),
            'high_issues': len([i for i in issues if i.severity == IssueSeverity.HIGH]),
            'timestamp': datetime.now().isoformat()
        }
        
        self.analysis_history.append(analysis_result)
        
        return analysis_result
    
    def _remove_file_from_analysis(self, file_path: str):
        full_path = os.path.join(self.git_repo.repo_path, file_path)
        
        if full_path in self.code_storage.files:
            del self.code_storage.files[full_path]
        
        nodes_to_remove = self.graph.get_nodes_in_file(full_path)
        for node in nodes_to_remove:
            if node.id in self.graph.nodes:
                del self.graph.nodes[node.id]
    
    def _analyze_file(self, file_path: str):
        self.code_storage.ingest_file_with_ast(file_path)
    
    def _find_affected_files(self, changed_files: Set[str]) -> Set[str]:
        affected = set()
        
        # Graph nodes are looked up by full path, so compare full paths throughout.
        # Comparing against the repository-relative paths would never match.
        changed_full_paths = {
            os.path.join(self.git_repo.repo_path, file_path) for file_path in changed_files
        }
        
        for full_path in changed_full_paths:
            file_nodes = self.graph.get_nodes_in_file(full_path)
            
            for node in file_nodes:
                dependents = self.graph.get_incoming_edges(node.id, EdgeType.DEPENDS_ON)
                
                for dependent_node, _ in dependents:
                    if dependent_node.file_path not in changed_full_paths:
                        affected.add(dependent_node.file_path)
        
        return affected
    
    def get_quality_trend(self, metric: str = 'issues_found', window: int = 10) -> List[Tuple[datetime, int]]:
        trend = []
        
        for analysis in self.analysis_history[-window:]:
            timestamp = datetime.fromisoformat(analysis['timestamp'])
            value = analysis.get(metric, 0)
            trend.append((timestamp, value))
        
        return trend
    
    def compare_with_previous(self) -> Optional[Dict[str, Any]]:
        if len(self.analysis_history) < 2:
            return None
        
        current = self.analysis_history[-1]
        previous = self.analysis_history[-2]
        
        return {
            'issues_change': current['issues_found'] - previous['issues_found'],
            'critical_change': current['critical_issues'] - previous['critical_issues'],
            'high_change': current['high_issues'] - previous['high_issues'],
            'files_change': current['files_analyzed'] - previous['files_analyzed']
        }

This Change Agent implementation provides comprehensive Git integration and intelligent change tracking. The GitRepository class wraps Git commands to extract commit information, file changes, and file history. It uses subprocess to execute Git commands and parses the output into structured data.

The ChangeAgent class orchestrates the analysis of changes. When analyzing a commit, it categorizes file changes into added, modified, deleted, and renamed. For deleted files, it removes them from storage and the graph. For added and modified files, it performs full analysis. For renamed files, it removes the old file and analyzes the new one.

The agent also identifies affected files by traversing the dependency graph. If file A changes and file B depends on A, then B might be affected by the change and should be re-analyzed. This ensures that ripple effects of changes are detected.
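
The ripple-effect idea can be sketched with a plain dictionary of file dependencies. Note one deliberate difference: the sketch below follows dependents transitively using a breadth-first search, whereas the implementation above collects only direct dependents. The find_affected helper and the sample file names are assumptions for illustration.

```python
from collections import deque
from typing import Dict, Set


def find_affected(changed: Set[str], depends_on: Dict[str, Set[str]]) -> Set[str]:
    """Return every file that depends, directly or transitively, on a changed file."""
    # Invert the edges so we can walk from a file to the files that import it.
    dependents: Dict[str, Set[str]] = {}
    for source, targets in depends_on.items():
        for target in targets:
            dependents.setdefault(target, set()).add(source)

    affected: Set[str] = set()
    queue = deque(changed)
    while queue:
        current = queue.popleft()
        for dependent in dependents.get(current, set()):
            if dependent not in changed and dependent not in affected:
                affected.add(dependent)
                queue.append(dependent)
    return affected


depends_on = {
    "api.py": {"service.py"},
    "service.py": {"models.py"},
    "cli.py": {"api.py"},
    "models.py": set(),
}
affected = find_affected({"models.py"}, depends_on)
```

Whether to follow dependents transitively is a trade-off: it catches more ripple effects, but in a tightly coupled codebase it can pull in most of the repository, which defeats the purpose of incremental analysis.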

The analysis history tracking enables trend analysis. We can see whether code quality is improving or degrading over time. The compare_with_previous method shows the delta between consecutive analyses, highlighting whether the latest commit introduced new issues or fixed existing ones.
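
As a small worked example of the delta calculation, the sketch below compares the two most recent entries of a history list. The compare_latest helper and the numbers are made up for illustration; only the key names follow the analysis result produced above.

```python
from typing import Dict, List, Optional


def compare_latest(history: List[Dict[str, int]], keys: List[str]) -> Optional[Dict[str, int]]:
    """Return current-minus-previous for each key, or None with fewer than two runs."""
    if len(history) < 2:
        return None
    previous, current = history[-2], history[-1]
    return {key: current[key] - previous[key] for key in keys}


history = [
    {"issues_found": 42, "critical_issues": 3, "high_issues": 9},
    {"issues_found": 39, "critical_issues": 4, "high_issues": 7},
]
delta = compare_latest(history, ["issues_found", "critical_issues", "high_issues"])
```

A negative value means the latest commit reduced that count. In this sample the total fell by three while critical issues rose by one, which is the kind of mixed result that a single total would hide.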

COMPONENT NINE: META-REFLECTION LAYER

The Meta-Reflection Layer evaluates the quality and confidence of the analysis itself. Large language models can produce hallucinations or make incorrect assessments. The meta-reflection layer adds a self-checking mechanism where the system evaluates its own findings.

This layer implements several reflection strategies. First, confidence scoring assigns each finding a confidence level that reflects how certain the analysis is. Second, consistency checking cross-references findings against multiple sources of evidence. Third, uncertainty identification explicitly flags areas where the system lacks sufficient information. Fourth, recommendation validation checks proposed solutions for feasibility and correctness.
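
Confidence scoring is easiest to follow with numbers. The standalone sketch below uses the same weights as the implementation later in this section: five pieces of evidence saturate the base score, each contradiction costs 0.2, and each uncertainty costs 0.1. The function names are assumptions for illustration, and the weights themselves are heuristic rather than calibrated.

```python
def confidence_score(evidence_count: int, contradictions: int, uncertainties: int) -> float:
    """Combine evidence and penalties into a score between 0.0 and 1.0."""
    base = min(1.0, evidence_count / 5.0)  # five pieces of evidence saturate the score
    penalty = contradictions * 0.2 + uncertainties * 0.1
    return max(0.0, base - penalty)


def confidence_label(score: float) -> str:
    """Map a numeric score onto the five confidence bands."""
    for threshold, label in [(0.9, "very_high"), (0.7, "high"), (0.5, "medium"), (0.3, "low")]:
        if score >= threshold:
            return label
    return "very_low"


# Three pieces of evidence and no penalties: 3 / 5 = 0.6, which falls in the medium band.
score = confidence_score(3, 0, 0)
label = confidence_label(score)

# One piece of evidence and two contradictions: 0.2 - 0.4 is clamped to 0.0.
contested_label = confidence_label(confidence_score(1, 2, 0))
```

Because the penalties are subtracted from a capped base, a finding with two contradictions can never rate above medium, no matter how much evidence supports it.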

The meta-reflection layer also identifies when human review is most needed. Not all findings require human attention, but critical issues with low confidence or findings that contradict architectural decisions should be flagged for human review.

Here is the implementation of the Meta-Reflection Layer:

from typing import List, Dict, Optional, Set, Tuple, Any
from dataclasses import dataclass, field
from enum import Enum


class ConfidenceLevel(Enum):
    VERY_HIGH = "very_high"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    VERY_LOW = "very_low"


@dataclass
class ReflectionResult:
    issue_id: str
    confidence: ConfidenceLevel
    evidence_count: int
    contradictions: List[str]
    uncertainties: List[str]
    requires_human_review: bool
    review_reason: Optional[str] = None
    alternative_interpretations: List[str] = field(default_factory=list)
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'issue_id': self.issue_id,
            'confidence': self.confidence.value,
            'evidence_count': self.evidence_count,
            'contradictions': self.contradictions,
            'uncertainties': self.uncertainties,
            'requires_human_review': self.requires_human_review,
            'review_reason': self.review_reason,
            'alternative_interpretations': self.alternative_interpretations
        }


class MetaReflectionLayer:
    def __init__(self, code_storage: EnhancedCodeStorage, graph: CodeGraph,
                 context_storage: ProjectContextStorage, adr_storage: ADRStorage):
        self.code_storage = code_storage
        self.graph = graph
        self.context_storage = context_storage
        self.adr_storage = adr_storage
    
    def reflect_on_issues(self, issues: List[CodeIssue]) -> List[ReflectionResult]:
        reflections = []
        
        for issue in issues:
            reflection = self._reflect_on_issue(issue)
            reflections.append(reflection)
        
        return reflections
    
    def _reflect_on_issue(self, issue: CodeIssue) -> ReflectionResult:
        evidence_count = self._count_evidence(issue)
        
        contradictions = self._find_contradictions(issue)
        
        uncertainties = self._identify_uncertainties(issue)
        
        confidence = self._calculate_confidence(evidence_count, contradictions, uncertainties)
        
        requires_review, review_reason = self._determine_human_review_need(
            issue, confidence, contradictions, uncertainties
        )
        
        alternatives = self._generate_alternative_interpretations(issue)
        
        return ReflectionResult(
            issue_id=issue.id,
            confidence=confidence,
            evidence_count=evidence_count,
            contradictions=contradictions,
            uncertainties=uncertainties,
            requires_human_review=requires_review,
            review_reason=review_reason,
            alternative_interpretations=alternatives
        )
    
    def _count_evidence(self, issue: CodeIssue) -> int:
        evidence = 0
        
        if issue.code_snippet:
            evidence += 1
        
        if issue.line_number > 0:
            evidence += 1
        
        if issue.category == IssueCategory.CODE_SMELL:
            file_record = self.code_storage.get_file_by_path(issue.file_path)
            if file_record:
                if issue.title == "Long Method":
                    for entity in file_record.entities:
                        if entity.line_number == issue.line_number:
                            if entity.end_line - entity.line_number > 50:
                                evidence += 1
                            if entity.complexity > 10:
                                evidence += 1
        
        if issue.references:
            evidence += len(issue.references)
        
        return evidence
    
    def _find_contradictions(self, issue: CodeIssue) -> List[str]:
        contradictions = []
        
        if issue.category == IssueCategory.ARCHITECTURE:
            relevant_adrs = self.adr_storage.retrieve_relevant_adrs(issue.description, top_k=3)
            
            for adr, similarity in relevant_adrs:
                if similarity > 0.7:
                    if "intentional" in adr.decision.lower() or "deliberate" in adr.decision.lower():
                        contradictions.append(
                            f"ADR '{adr.title}' suggests this may be an intentional design decision"
                        )
        
        if issue.category == IssueCategory.CODE_SMELL and "Large Class" in issue.title:
            relevant_context = self.context_storage.retrieve_relevant_context(
                issue.description,
                context_types=[ContextType.ARCHITECTURAL_PRINCIPLE]
            )
            
            for context, similarity in relevant_context:
                if "god class" in context.content.lower() and "acceptable" in context.content.lower():
                    contradictions.append(
                        "Project guidelines may allow this pattern in certain contexts"
                    )
        
        return contradictions
    
    def _identify_uncertainties(self, issue: CodeIssue) -> List[str]:
        uncertainties = []
        
        if not issue.code_snippet:
            uncertainties.append("No code snippet available to verify the issue")
        
        file_record = self.code_storage.get_file_by_path(issue.file_path)
        if not file_record:
            uncertainties.append("File not found in code storage")
        
        if issue.category == IssueCategory.SECURITY:
            if "potential" in issue.description.lower() or "may" in issue.description.lower():
                uncertainties.append("Security issue requires manual verification of data flow")
        
        if issue.category == IssueCategory.PERFORMANCE:
            uncertainties.append("Performance impact depends on runtime data and usage patterns")
        
        if issue.confidence < 0.7:
            uncertainties.append("Low initial confidence in automated detection")
        
        return uncertainties
    
    def _calculate_confidence(self, evidence_count: int, contradictions: List[str],
                             uncertainties: List[str]) -> ConfidenceLevel:
        base_confidence = min(1.0, evidence_count / 5.0)
        
        contradiction_penalty = len(contradictions) * 0.2
        uncertainty_penalty = len(uncertainties) * 0.1
        
        final_confidence = max(0.0, base_confidence - contradiction_penalty - uncertainty_penalty)
        
        if final_confidence >= 0.9:
            return ConfidenceLevel.VERY_HIGH
        elif final_confidence >= 0.7:
            return ConfidenceLevel.HIGH
        elif final_confidence >= 0.5:
            return ConfidenceLevel.MEDIUM
        elif final_confidence >= 0.3:
            return ConfidenceLevel.LOW
        else:
            return ConfidenceLevel.VERY_LOW
    
    def _determine_human_review_need(self, issue: CodeIssue, confidence: ConfidenceLevel,
                                    contradictions: List[str], uncertainties: List[str]) -> Tuple[bool, Optional[str]]:
        if issue.severity == IssueSeverity.CRITICAL:
            if confidence in [ConfidenceLevel.LOW, ConfidenceLevel.VERY_LOW]:
                return True, "Critical issue with low confidence requires verification"
            
            if contradictions:
                return True, "Critical issue contradicts documented architectural decisions"
        
        if len(contradictions) > 0:
            return True, "Finding contradicts existing documentation"
        
        if confidence == ConfidenceLevel.VERY_LOW:
            return True, "Very low confidence in finding"
        
        if issue.category == IssueCategory.SECURITY and len(uncertainties) > 2:
            return True, "Security issue with multiple uncertainties"
        
        if issue.category == IssueCategory.ARCHITECTURE:
            return True, "Architectural issues should be reviewed by architects"
        
        return False, None
    
    def _generate_alternative_interpretations(self, issue: CodeIssue) -> List[str]:
        alternatives = []
        
        if issue.category == IssueCategory.CODE_SMELL and "Long Method" in issue.title:
            alternatives.append("Method may be long due to necessary sequential steps that cannot be easily extracted")
            alternatives.append("Method may be a template method that coordinates multiple steps")
        
        if issue.category == IssueCategory.PERFORMANCE and "Nested Loops" in issue.title:
            alternatives.append("Nested loops may be necessary for the algorithm and data size may be small")
            alternatives.append("Performance may be acceptable for the use case frequency")
        
        if issue.category == IssueCategory.SECURITY:
            alternatives.append("Code may be in a test or example file where security is not critical")
            alternatives.append("Security control may be implemented at a different layer")
        
        return alternatives
    
    def generate_review_report(self, issues: List[CodeIssue], reflections: List[ReflectionResult]) -> Dict[str, Any]:
        total_issues = len(issues)
        
        confidence_distribution = {}
        for conf_level in ConfidenceLevel:
            confidence_distribution[conf_level.value] = len([
                r for r in reflections if r.confidence == conf_level
            ])
        
        issues_needing_review = [r for r in reflections if r.requires_human_review]
        
        high_confidence_critical = []
        # zip() pairs each issue with its reflection and stops at the shorter list,
        # so a length mismatch cannot raise IndexError.
        for issue, reflection in zip(issues, reflections):
            if (issue.severity == IssueSeverity.CRITICAL
                    and reflection.confidence in (ConfidenceLevel.HIGH, ConfidenceLevel.VERY_HIGH)):
                high_confidence_critical.append({
                    'issue': issue.to_dict(),
                    'reflection': reflection.to_dict()
                })
        
        return {
            'total_issues': total_issues,
            'confidence_distribution': confidence_distribution,
            'issues_needing_review': len(issues_needing_review),
            'review_percentage': (len(issues_needing_review) / max(1, total_issues)) * 100,
            'high_confidence_critical': high_confidence_critical,
            'summary': {
                'automated_actionable': total_issues - len(issues_needing_review),
                'requires_human_judgment': len(issues_needing_review),
                'confidence_score': sum([
                    1.0 if r.confidence == ConfidenceLevel.VERY_HIGH else
                    0.8 if r.confidence == ConfidenceLevel.HIGH else
                    0.6 if r.confidence == ConfidenceLevel.MEDIUM else
                    0.4 if r.confidence == ConfidenceLevel.LOW else 0.2
                    for r in reflections
                ]) / max(1, len(reflections))
            }
        }

This Meta-Reflection Layer implementation lets the system evaluate its own findings. The reflection process examines each issue from several angles (supporting evidence, contradicting documentation, and known blind spots) to assess confidence and flag areas of uncertainty.

The evidence counting mechanism looks for multiple sources of confirmation. An issue supported by code snippets, metrics, and references to best practices receives higher confidence than one based on a single heuristic.

The contradiction detection cross-references findings with ADRs and project context. If an issue flags something as a problem but an ADR documents it as an intentional decision, this contradiction is surfaced. This prevents the system from recommending changes that would violate documented architectural choices.

The uncertainty identification explicitly acknowledges limitations. Security issues often require understanding data flow across multiple files, which is difficult for automated analysis. Performance issues depend on runtime behavior and usage patterns. By flagging these uncertainties, the system sets appropriate expectations.

The confidence calculation combines evidence, contradictions, and uncertainties into a single confidence level. This allows users to prioritize high-confidence findings while treating low-confidence findings with appropriate skepticism.
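
To make the arithmetic concrete, here is a minimal standalone sketch of the same scoring rule. The function name `score_confidence` and the string labels are illustrative stand-ins for the `ConfidenceLevel` enum; the divisor, penalties, and thresholds are copied from `_calculate_confidence` above.

```python
from typing import Tuple


def score_confidence(evidence_count: int, contradictions: int, uncertainties: int) -> Tuple[float, str]:
    """Return (numeric score, label) using the rule from _calculate_confidence."""
    # Five or more pieces of evidence saturate the base score at 1.0.
    base = min(1.0, evidence_count / 5.0)
    # Each contradiction costs 0.2 and each uncertainty costs 0.1; the floor is 0.0.
    score = max(0.0, base - contradictions * 0.2 - uncertainties * 0.1)

    # Thresholds are checked from highest to lowest, as in the original method.
    for threshold, label in [(0.9, "very_high"), (0.7, "high"), (0.5, "medium"), (0.3, "low")]:
        if score >= threshold:
            return score, label
    return score, "very_low"


# Illustrative inputs: plenty of evidence, then the same evidence with one contradiction,
# then a single piece of evidence that is cancelled out by one contradiction.
for args in [(5, 0, 0), (5, 1, 0), (1, 1, 0)]:
    print(args, score_confidence(*args)[1])
```

Under this rule a single contradiction is enough to pull a fully evidenced finding down from the top level, and a finding with little evidence drops to the lowest level as soon as anything contradicts it.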

The human review determination applies an ordered set of rules and returns the first reason that matches. Critical issues need review when confidence is low or very low. Any finding that contradicts documented decisions needs review, whatever its severity. Findings with very low confidence need review. Security issues with more than two uncertainties need review because missing a real vulnerability is costly. Architectural issues always need review because they involve strategic decisions.

The alternative interpretations provide context that prevents tunnel vision. A long method might be long for good reasons. Nested loops might be necessary for the algorithm. By presenting alternatives, the system encourages critical thinking rather than blind acceptance of findings.

COMPONENT TEN: CHAIN OF THOUGHT AND TREE OF THOUGHT REASONING

Chain of Thought and Tree of Thought reasoning enable deeper analysis of complex code patterns and architectural decisions. Rather than making snap judgments, these techniques encourage step-by-step reasoning and exploration of multiple solution paths.

Chain of Thought reasoning breaks down complex analysis into sequential steps. For example, when evaluating whether a class violates the Single Responsibility Principle, the system first identifies all responsibilities, then evaluates whether they are cohesive, then considers whether splitting would improve or harm the design.

Tree of Thought reasoning explores multiple analysis paths in parallel. When identifying the root cause of a performance issue, the system might explore algorithmic complexity, data structure choice, and I/O patterns simultaneously, then synthesize findings from all paths.

These reasoning techniques are particularly valuable for architectural analysis where there are often multiple valid perspectives and trade-offs to consider.

Here is the implementation of CoT and ToT reasoning:

from typing import List, Dict, Optional, Set, Tuple, Any, Callable
from dataclasses import dataclass, field
from enum import Enum


@dataclass
class ReasoningStep:
    step_number: int
    description: str
    observation: str
    reasoning: str
    conclusion: str
    confidence: float
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'step': self.step_number,
            'description': self.description,
            'observation': self.observation,
            'reasoning': self.reasoning,
            'conclusion': self.conclusion,
            'confidence': self.confidence
        }


@dataclass
class ThoughtPath:
    path_id: str
    description: str
    steps: List[ReasoningStep]
    final_conclusion: str
    overall_confidence: float
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'path_id': self.path_id,
            'description': self.description,
            'steps': [step.to_dict() for step in self.steps],
            'conclusion': self.final_conclusion,
            'confidence': self.overall_confidence
        }


class ChainOfThoughtAnalyzer:
    def __init__(self, code_storage: EnhancedCodeStorage, graph: CodeGraph,
                 context_storage: ProjectContextStorage):
        self.code_storage = code_storage
        self.graph = graph
        self.context_storage = context_storage
    
    def analyze_srp_violation(self, class_node: GraphNode) -> List[ReasoningStep]:
        steps = []
        
        step1 = ReasoningStep(
            step_number=1,
            description="Identify class responsibilities",
            observation=f"Class '{class_node.name}' is defined in {class_node.file_path}",
            reasoning="To evaluate SRP, we must first identify what responsibilities the class has",
            conclusion="Proceeding to enumerate methods and their purposes",
            confidence=1.0
        )
        steps.append(step1)
        
        methods = self.graph.get_outgoing_edges(class_node.id, EdgeType.CONTAINS)
        method_nodes = [node for node, _ in methods if node.node_type == NodeType.METHOD]
        
        step2 = ReasoningStep(
            step_number=2,
            description="Enumerate methods",
            observation=f"Found {len(method_nodes)} methods in the class",
            reasoning="The number and nature of methods indicates potential responsibilities",
            conclusion=f"Class has {len(method_nodes)} methods to analyze",
            confidence=1.0
        )
        steps.append(step2)
        
        responsibility_groups = self._group_methods_by_responsibility(method_nodes)
        
        step3 = ReasoningStep(
            step_number=3,
            description="Group methods by responsibility",
            observation=f"Methods can be grouped into {len(responsibility_groups)} categories",
            reasoning="Methods that work with similar data or serve similar purposes likely represent distinct responsibilities",
            conclusion=f"Identified {len(responsibility_groups)} potential responsibilities: {', '.join(responsibility_groups.keys())}",
            confidence=0.7
        )
        steps.append(step3)
        
        if len(responsibility_groups) > 1:
            step4 = ReasoningStep(
                step_number=4,
                description="Evaluate cohesion",
                observation="Multiple responsibility groups detected",
                reasoning="A class with multiple distinct responsibilities violates SRP",
                conclusion=f"Class likely violates SRP with {len(responsibility_groups)} responsibilities",
                confidence=0.8
            )
        else:
            step4 = ReasoningStep(
                step_number=4,
                description="Evaluate cohesion",
                observation="All methods appear to serve a single responsibility",
                reasoning="Methods working toward a common goal suggests good cohesion",
                conclusion="Class appears to follow SRP",
                confidence=0.7
            )
        steps.append(step4)
        
        return steps
    
    def _group_methods_by_responsibility(self, method_nodes: List[GraphNode]) -> Dict[str, List[str]]:
        groups = {}
        
        keywords = {
            'data_access': ['get', 'set', 'fetch', 'load', 'save', 'update', 'delete', 'query'],
            'validation': ['validate', 'check', 'verify', 'ensure'],
            'calculation': ['calculate', 'compute', 'process', 'transform'],
            'formatting': ['format', 'render', 'display', 'print'],
            'coordination': ['handle', 'manage', 'coordinate', 'orchestrate']
        }
        
        for method in method_nodes:
            method_name_lower = method.name.lower()
            
            assigned = False
            for group, group_keywords in keywords.items():
                if any(keyword in method_name_lower for keyword in group_keywords):
                    if group not in groups:
                        groups[group] = []
                    groups[group].append(method.name)
                    assigned = True
                    break
            
            if not assigned:
                if 'other' not in groups:
                    groups['other'] = []
                groups['other'].append(method.name)
        
        return groups
    
    def analyze_performance_issue(self, issue: CodeIssue, file_record: CodeFileRecord) -> List[ReasoningStep]:
        steps = []
        
        step1 = ReasoningStep(
            step_number=1,
            description="Identify the performance concern",
            observation=f"Issue: {issue.title} at line {issue.line_number}",
            reasoning="Understanding the specific performance concern is the first step",
            conclusion=f"Analyzing {issue.title}",
            confidence=1.0
        )
        steps.append(step1)
        
        if "Nested Loops" in issue.title:
            step2 = ReasoningStep(
                step_number=2,
                description="Analyze algorithmic complexity",
                observation="Multiple nested loops detected",
                reasoning="Nested loops often indicate polynomial time complexity",
                conclusion="Potential O(n^k) complexity where k is nesting depth",
                confidence=0.9
            )
            steps.append(step2)
            
            step3 = ReasoningStep(
                step_number=3,
                description="Consider data size",
                observation="Data size is unknown from static analysis",
                reasoning="Performance impact depends on input size",
                conclusion="Need runtime profiling to confirm impact",
                confidence=0.5
            )
            steps.append(step3)
            
            step4 = ReasoningStep(
                step_number=4,
                description="Evaluate alternatives",
                observation="Could potentially use hash maps or other data structures",
                reasoning="Better data structures can reduce complexity",
                conclusion="Recommend considering alternative approaches",
                confidence=0.7
            )
            steps.append(step4)
        
        return steps


class TreeOfThoughtAnalyzer:
    def __init__(self, code_storage: EnhancedCodeStorage, graph: CodeGraph,
                 context_storage: ProjectContextStorage):
        self.code_storage = code_storage
        self.graph = graph
        self.context_storage = context_storage
    
    def analyze_architectural_issue(self, issue: CodeIssue) -> List[ThoughtPath]:
        paths = []
        
        path1 = self._explore_technical_perspective(issue)
        paths.append(path1)
        
        path2 = self._explore_business_perspective(issue)
        paths.append(path2)
        
        path3 = self._explore_maintenance_perspective(issue)
        paths.append(path3)
        
        return paths
    
    def _explore_technical_perspective(self, issue: CodeIssue) -> ThoughtPath:
        steps = []
        
        step1 = ReasoningStep(
            step_number=1,
            description="Evaluate technical correctness",
            observation=f"Issue: {issue.title}",
            reasoning="From a purely technical standpoint, does this violate best practices?",
            conclusion="Analyzing against technical standards",
            confidence=0.9
        )
        steps.append(step1)
        
        step2 = ReasoningStep(
            step_number=2,
            description="Consider technical debt",
            observation="Issue may increase coupling or complexity",
            reasoning="Technical debt accumulates when we deviate from best practices",
            conclusion="This pattern may increase future maintenance burden",
            confidence=0.7
        )
        steps.append(step2)
        
        return ThoughtPath(
            path_id="technical",
            description="Technical perspective on the issue",
            steps=steps,
            final_conclusion="From a technical perspective, this is a concern",
            overall_confidence=0.8
        )
    
    def _explore_business_perspective(self, issue: CodeIssue) -> ThoughtPath:
        steps = []
        
        step1 = ReasoningStep(
            step_number=1,
            description="Evaluate business impact",
            observation=f"Issue severity: {issue.severity.value}",
            reasoning="Business impact depends on how this affects delivery and operations",
            conclusion="Assessing business risk",
            confidence=0.6
        )
        steps.append(step1)
        
        step2 = ReasoningStep(
            step_number=2,
            description="Consider time-to-market",
            observation="Fixing this issue requires refactoring time",
            reasoning="Business may prioritize features over technical perfection",
            conclusion="May be acceptable as temporary technical debt if delivery is critical",
            confidence=0.5
        )
        steps.append(step2)
        
        return ThoughtPath(
            path_id="business",
            description="Business perspective on the issue",
            steps=steps,
            final_conclusion="Business perspective may justify accepting this temporarily",
            overall_confidence=0.6
        )
    
    def _explore_maintenance_perspective(self, issue: CodeIssue) -> ThoughtPath:
        steps = []
        
        step1 = ReasoningStep(
            step_number=1,
            description="Evaluate long-term maintainability",
            observation=f"Issue category: {issue.category.value}",
            reasoning="Maintainability affects total cost of ownership",
            conclusion="Analyzing impact on future development",
            confidence=0.8
        )
        steps.append(step1)
        
        step2 = ReasoningStep(
            step_number=2,
            description="Consider team knowledge",
            observation="Team familiarity with codebase varies",
            reasoning="Complex code is harder for new team members to understand",
            conclusion="This issue may slow down onboarding and future changes",
            confidence=0.7
        )
        steps.append(step2)
        
        return ThoughtPath(
            path_id="maintenance",
            description="Maintenance perspective on the issue",
            steps=steps,
            final_conclusion="From a maintenance perspective, this should be addressed",
            overall_confidence=0.75
        )
    
    def synthesize_paths(self, paths: List[ThoughtPath]) -> Dict[str, Any]:
        # Guard against an empty list so the average never divides by zero.
        avg_confidence = (
            sum(path.overall_confidence for path in paths) / len(paths)
            if paths else 0.0
        )
        
        perspectives = {path.path_id: path.to_dict() for path in paths}
        
        recommendation = self._generate_balanced_recommendation(paths)
        
        return {
            'perspectives': perspectives,
            'average_confidence': avg_confidence,
            'balanced_recommendation': recommendation,
            'decision_factors': [
                'Technical correctness and best practices',
                'Business priorities and time constraints',
                'Long-term maintainability and team capability'
            ]
        }
    
    def _generate_balanced_recommendation(self, paths: List[ThoughtPath]) -> str:
        technical_path = next((p for p in paths if p.path_id == 'technical'), None)
        business_path = next((p for p in paths if p.path_id == 'business'), None)
        maintenance_path = next((p for p in paths if p.path_id == 'maintenance'), None)
        
        if technical_path and maintenance_path:
            if technical_path.overall_confidence > 0.7 and maintenance_path.overall_confidence > 0.7:
                return "Strong recommendation to address this issue. Both technical and maintenance perspectives indicate concern."
        
        if business_path and business_path.overall_confidence > 0.6:
            return "Consider addressing this issue, but business priorities may justify deferring. Document as technical debt."
        
        return "Mixed signals. Recommend human review to balance technical, business, and maintenance concerns."


class ReasoningOrchestrator:
    def __init__(self, cot_analyzer: ChainOfThoughtAnalyzer, tot_analyzer: TreeOfThoughtAnalyzer):
        self.cot_analyzer = cot_analyzer
        self.tot_analyzer = tot_analyzer
    
    def deep_analyze_issue(self, issue: CodeIssue, file_record: Optional[CodeFileRecord] = None) -> Dict[str, Any]:
        cot_steps = []
        
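        if issue.category == IssueCategory.CODE_SMELL and file_record:
            if "Single Responsibility" in issue.title:
                class_nodes = [e for e in file_record.entities if e.entity_type == 'class']
                if class_nodes:
                    # Not wired up yet: analyze_srp_violation expects a GraphNode,
                    # so each entity must first be resolved to its node in the CodeGraph.
                    pass
        
        # Performance is its own category, so this check must sit outside the
        # CODE_SMELL branch; nested inside it, it could never match a performance issue.
        if issue.category == IssueCategory.PERFORMANCE and file_record:
            cot_steps = self.cot_analyzer.analyze_performance_issue(issue, file_record)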
        
        tot_paths = []
        if issue.category == IssueCategory.ARCHITECTURE:
            tot_paths = self.tot_analyzer.analyze_architectural_issue(issue)
        
        result = {
            'issue': issue.to_dict(),
            'chain_of_thought': [step.to_dict() for step in cot_steps] if cot_steps else None,
            'tree_of_thought': self.tot_analyzer.synthesize_paths(tot_paths) if tot_paths else None
        }
        
        return result

This Chain of Thought and Tree of Thought implementation structures the analysis as explicit reasoning steps. The ChainOfThoughtAnalyzer breaks complex evaluations into sequential steps, each recording an observation, the reasoning applied, a conclusion, and a confidence value, which makes the reasoning process transparent and verifiable.

When analyzing Single Responsibility Principle violations, the system first identifies the class, then enumerates its methods, then groups methods by responsibility, and finally evaluates cohesion. Each step builds on the previous one, and confidence levels reflect the certainty at each stage.
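
The grouping step carries most of the uncertainty in this chain, which is why step 3 is assigned a confidence of 0.7. The sketch below isolates that heuristic on plain method names. It is a simplified stand-in for `_group_methods_by_responsibility` and makes one assumption that differs from the listing above: it matches keywords against the leading word of a snake_case or camelCase name rather than anywhere in the name. The helper names `leading_word` and `group_by_responsibility` are hypothetical.

```python
import re
from collections import defaultdict
from typing import Dict, List

# Same keyword table as the listing above.
KEYWORDS = {
    "data_access": {"get", "set", "fetch", "load", "save", "update", "delete", "query"},
    "validation": {"validate", "check", "verify", "ensure"},
    "calculation": {"calculate", "compute", "process", "transform"},
    "formatting": {"format", "render", "display", "print"},
    "coordination": {"handle", "manage", "coordinate", "orchestrate"},
}


def leading_word(method_name: str) -> str:
    """Return the first word of a snake_case or camelCase identifier, lowercased."""
    match = re.match(r"_*([a-z]+|[A-Z][a-z]*)", method_name)
    return match.group(1).lower() if match else ""


def group_by_responsibility(method_names: List[str]) -> Dict[str, List[str]]:
    """Assign each method to the first group whose keywords contain its leading word."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for name in method_names:
        word = leading_word(name)
        group = next((g for g, words in KEYWORDS.items() if word in words), "other")
        groups[group].append(name)
    return dict(groups)


methods = ["get_user", "save_user", "validateEmail", "render_profile", "reset_target"]
groups = group_by_responsibility(methods)
print(groups)
```

With substring matching, `reset_target` would fall into `data_access` because the name contains both `set` and `get`; with leading-word matching it lands in `other`. Either way the result is a naming heuristic, not evidence of what the methods actually do.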

The TreeOfThoughtAnalyzer explores multiple perspectives side by side. When evaluating an architectural issue, it builds separate thought paths for technical correctness, business impact, and long-term maintainability. Each path reaches its own conclusion with its own confidence level.

The synthesis of multiple thought paths provides a balanced view. Alongside a single balanced recommendation, the result keeps each perspective's own conclusion and confidence, acknowledging that different perspectives may lead to different conclusions. This mirrors how human architects think about trade-offs.
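
As a rough illustration of the synthesis step, the sketch below averages path confidences and applies the same thresholds as `_generate_balanced_recommendation`, using a plain dictionary instead of `ThoughtPath` objects. The function name `synthesize` and the shortened recommendation strings are hypothetical, and the guard for an empty input is an assumption added for the sketch.

```python
from typing import Dict


def synthesize(confidences: Dict[str, float]) -> Dict[str, object]:
    """Combine per-perspective confidences into an average and a recommendation."""
    if not confidences:
        # No paths were explored: nothing to average, so defer to a person.
        return {"average_confidence": 0.0, "recommendation": "human review"}

    average = sum(confidences.values()) / len(confidences)
    # A missing perspective is treated as zero confidence.
    technical = confidences.get("technical", 0.0)
    maintenance = confidences.get("maintenance", 0.0)
    business = confidences.get("business", 0.0)

    if technical > 0.7 and maintenance > 0.7:
        recommendation = "address"
    elif business > 0.6:
        recommendation = "defer and document as technical debt"
    else:
        recommendation = "human review"
    return {"average_confidence": average, "recommendation": recommendation}


# Confidences hard-coded in the three perspective methods of the listing above.
result = synthesize({"technical": 0.8, "business": 0.6, "maintenance": 0.75})
print(result["recommendation"], round(result["average_confidence"], 2))
```

Because the three perspective methods return fixed confidences, every architectural issue currently produces the same recommendation. The synthesis only becomes discriminating once those values are derived from the code under analysis.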

The reasoning orchestrator coordinates both CoT and ToT analysis, applying the appropriate technique based on the issue type. Code smells benefit from step-by-step analysis, while architectural issues benefit from multi-perspective exploration.

COMPONENT ELEVEN: SUMMARIZATION SYSTEM

The Summarization System condenses findings across multiple files and creates hierarchical summaries at different levels of abstraction. When analyzing a large codebase, presenting every individual finding would overwhelm users. Instead, we need to summarize at multiple levels: individual files, modules, subsystems, and the entire system.

The summarization system implements four strategies. First, aggregation groups similar issues together. Second, prioritization highlights the most important findings. Third, hierarchical organization provides summaries at file, directory, and repository levels. Fourth, trend identification surfaces patterns that recur across multiple files.
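
The aggregation and hierarchical roll-up ideas can be sketched on plain tuples before looking at the full classes. This is an illustration only: the `(file_path, severity, title)` tuples and the `roll_up` helper are hypothetical and stand in for `CodeIssue` objects and the storage layer.

```python
from collections import Counter
from pathlib import PurePosixPath
from typing import Dict, List, Tuple

# (file_path, severity, title): a hypothetical stand-in for CodeIssue.
Issue = Tuple[str, str, str]


def roll_up(issues: List[Issue]) -> Dict[str, Counter]:
    """Count issues per file, per parent directory, and per title."""
    per_file: Counter = Counter()
    per_directory: Counter = Counter()
    per_title: Counter = Counter()
    for file_path, _severity, title in issues:
        per_file[file_path] += 1
        per_directory[str(PurePosixPath(file_path).parent)] += 1
        # The same title recurring across files hints at a repository-wide trend.
        per_title[title] += 1
    return {"file": per_file, "directory": per_directory, "title": per_title}


issues = [
    ("src/api/users.py", "high", "Long Method"),
    ("src/api/orders.py", "medium", "Long Method"),
    ("src/db/models.py", "critical", "SQL Injection"),
]
summary = roll_up(issues)
print(summary["directory"].most_common())
print(summary["title"].most_common(1))
```

Counting at each level from the same pass keeps the file, directory, and repository views consistent with one another.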

Here is the implementation of the Summarization System:

from typing import List, Dict, Optional, Set, Tuple, Any
from dataclasses import dataclass, field
from collections import defaultdict


@dataclass
class FileSummary:
    file_path: str
    language: str
    lines_of_code: int
    complexity_score: float
    maintainability_index: float
    issue_count: int
    critical_issues: int
    high_issues: int
    top_issues: List[Dict[str, Any]]
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'file': self.file_path,
            'language': self.language,
            'loc': self.lines_of_code,
            'complexity': round(self.complexity_score, 2),
            'maintainability': round(self.maintainability_index, 2),
            'total_issues': self.issue_count,
            'critical': self.critical_issues,
            'high': self.high_issues,
            'top_issues': self.top_issues
        }


@dataclass
class DirectorySummary:
    directory_path: str
    file_count: int
    total_loc: int
    avg_complexity: float
    avg_maintainability: float
    total_issues: int
    issue_distribution: Dict[str, int]
    most_problematic_files: List[str]
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'directory': self.directory_path,
            'files': self.file_count,
            'total_loc': self.total_loc,
            'avg_complexity': round(self.avg_complexity, 2),
            'avg_maintainability': round(self.avg_maintainability, 2),
            'total_issues': self.total_issues,
            'issue_distribution': self.issue_distribution,
            'most_problematic': self.most_problematic_files
        }


@dataclass
class RepositorySummary:
    total_files: int
    total_loc: int
    languages: Dict[str, int]
    total_issues: int
    issues_by_severity: Dict[str, int]
    issues_by_category: Dict[str, int]
    quality_score: float
    top_concerns: List[str]
    recommendations: List[str]
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'total_files': self.total_files,
            'total_loc': self.total_loc,
            'languages': self.languages,
            'total_issues': self.total_issues,
            'by_severity': self.issues_by_severity,
            'by_category': self.issues_by_category,
            'quality_score': round(self.quality_score, 2),
            'top_concerns': self.top_concerns,
            'recommendations': self.recommendations
        }


class SummarizationSystem:
    def __init__(self, code_storage: EnhancedCodeStorage):
        self.code_storage = code_storage
    
    def summarize_file(self, file_path: str, issues: List[CodeIssue]) -> Optional[FileSummary]:
        file_record = self.code_storage.get_file_by_path(file_path)
        
        if not file_record:
            return None
        
        file_issues = [issue for issue in issues if issue.file_path == file_path]
        
        critical_count = len([i for i in file_issues if i.severity == IssueSeverity.CRITICAL])
        high_count = len([i for i in file_issues if i.severity == IssueSeverity.HIGH])
        
        top_issues = sorted(file_issues, key=lambda x: (
            0 if x.severity == IssueSeverity.CRITICAL else
            1 if x.severity == IssueSeverity.HIGH else
            2 if x.severity == IssueSeverity.MEDIUM else 3
        ))[:5]
        
        return FileSummary(
            file_path=file_path,
            language=file_record.metadata.language,
            lines_of_code=file_record.metadata.lines_of_code,
            complexity_score=file_record.complexity_score,
            maintainability_index=file_record.maintainability_index,
            issue_count=len(file_issues),
            critical_issues=critical_count,
            high_issues=high_count,
            top_issues=[{
                'title': issue.title,
                'severity': issue.severity.value,
                'line': issue.line_number
            } for issue in top_issues]
        )
    
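    def summarize_directory(self, directory_path: str, issues: List[CodeIssue]) -> Optional[DirectorySummary]:
        # Match on a trailing separator so "src/app" does not also pick up "src/app_old".
        # Assumes stored paths use forward slashes.
        prefix = directory_path.rstrip('/') + '/' if directory_path else ''
        dir_files = [
            (path, record) for path, record in self.code_storage.files.items()
            if path.startswith(prefix)
        ]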
        
        if not dir_files:
            return None
        
        total_loc = sum(record.metadata.lines_of_code for _, record in dir_files)
        avg_complexity = sum(record.complexity_score for _, record in dir_files) / len(dir_files)
        avg_maintainability = sum(record.maintainability_index for _, record in dir_files) / len(dir_files)
        
        # Build the path set once so each membership test is O(1).
        dir_paths = {path for path, _ in dir_files}
        dir_issues = [issue for issue in issues if issue.file_path in dir_paths]
        
        issue_distribution = defaultdict(int)
        for issue in dir_issues:
            issue_distribution[issue.severity.value] += 1
        
        file_issue_counts = defaultdict(int)
        for issue in dir_issues:
            file_issue_counts[issue.file_path] += 1
        
        most_problematic = sorted(
            file_issue_counts.items(),
            key=lambda x: x[1],
            reverse=True
        )[:5]
        
        return DirectorySummary(
            directory_path=directory_path,
            file_count=len(dir_files),
            total_loc=total_loc,
            avg_complexity=avg_complexity,
            avg_maintainability=avg_maintainability,
            total_issues=len(dir_issues),
            issue_distribution=dict(issue_distribution),
            most_problematic_files=[path for path, _ in most_problematic]
        )
    
    def summarize_repository(self, issues: List[CodeIssue]) -> RepositorySummary:
        total_files = len(self.code_storage.files)
        total_loc = sum(record.metadata.lines_of_code for record in self.code_storage.files.values())
        
        languages = defaultdict(int)
        for record in self.code_storage.files.values():
            languages[record.metadata.language] += 1
        
        issues_by_severity = defaultdict(int)
        for issue in issues:
            issues_by_severity[issue.severity.value] += 1
        
        issues_by_category = defaultdict(int)
        for issue in issues:
            issues_by_category[issue.category.value] += 1
        
        quality_score = self._calculate_quality_score(issues, total_files, total_loc)
        
        top_concerns = self._identify_top_concerns(issues)
        
        recommendations = self._generate_recommendations(issues, quality_score)
        
        return RepositorySummary(
            total_files=total_files,
            total_loc=total_loc,
            languages=dict(languages),
            total_issues=len(issues),
            issues_by_severity=dict(issues_by_severity),
            issues_by_category=dict(issues_by_category),
            quality_score=quality_score,
            top_concerns=top_concerns,
            recommendations=recommendations
        )
    
    def _calculate_quality_score(self, issues: List[CodeIssue], total_files: int, total_loc: int) -> float:
        if total_files == 0:
            return 100.0
        
        critical_penalty = len([i for i in issues if i.severity == IssueSeverity.CRITICAL]) * 10
        high_penalty = len([i for i in issues if i.severity == IssueSeverity.HIGH]) * 5
        medium_penalty = len([i for i in issues if i.severity == IssueSeverity.MEDIUM]) * 2
        low_penalty = len([i for i in issues if i.severity == IssueSeverity.LOW]) * 1
        
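        total_penalty = critical_penalty + high_penalty + medium_penalty + low_penalty
        
        # Average penalty per file, scaled by 10 (total_loc is accepted but not used here)
        normalized_penalty = (total_penalty / total_files) * 10
        
        # Clamp at zero and keep the result a float to match the return annotation
        score = max(0.0, 100.0 - normalized_penalty)
        
        return score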
    
    def _identify_top_concerns(self, issues: List[CodeIssue]) -> List[str]:
        concerns = []
        
        critical_issues = [i for i in issues if i.severity == IssueSeverity.CRITICAL]
        if critical_issues:
            security_critical = [i for i in critical_issues if i.category == IssueCategory.SECURITY]
            if security_critical:
                concerns.append(f"{len(security_critical)} critical security vulnerabilities require immediate attention")
        
        category_counts = defaultdict(int)
        for issue in issues:
            category_counts[issue.category] += 1
        
        if category_counts[IssueCategory.CODE_SMELL] > len(issues) * 0.4:
            concerns.append("High prevalence of code smells suggests need for refactoring")
        
        if category_counts[IssueCategory.ARCHITECTURE] > 5:
            concerns.append("Multiple architectural issues detected, consider architecture review")
        
        return concerns[:5]
    
    def _generate_recommendations(self, issues: List[CodeIssue], quality_score: float) -> List[str]:
        recommendations = []
        
        if quality_score < 50:
            recommendations.append("Quality score is low. Prioritize addressing critical and high severity issues.")
        
        critical_issues = [i for i in issues if i.severity == IssueSeverity.CRITICAL]
        if critical_issues:
            recommendations.append(f"Address {len(critical_issues)} critical issues immediately")
        
        security_issues = [i for i in issues if i.category == IssueCategory.SECURITY]
        if security_issues:
            recommendations.append("Conduct security review and implement secure coding practices")
        
        performance_issues = [i for i in issues if i.category == IssueCategory.PERFORMANCE]
        if len(performance_issues) > 10:
            recommendations.append("Consider performance profiling to validate and prioritize performance issues")
        
        code_smells = [i for i in issues if i.category == IssueCategory.CODE_SMELL]
        if len(code_smells) > 20:
            recommendations.append("High number of code smells. Implement regular refactoring sessions")
        
        return recommendations
    
    def generate_hierarchical_summary(self, issues: List[CodeIssue]) -> Dict[str, Any]:
        repo_summary = self.summarize_repository(issues)
        
        directories = set()
        for file_path in self.code_storage.files.keys():
            dir_path = os.path.dirname(file_path)
            while dir_path:
                directories.add(dir_path)
                parent = os.path.dirname(dir_path)
                if parent == dir_path:
                    break
                dir_path = parent
        
        dir_summaries = {}
        for directory in sorted(directories):
            summary = self.summarize_directory(directory, issues)
            if summary:
                dir_summaries[directory] = summary.to_dict()
        
        file_summaries = {}
        for file_path in self.code_storage.files.keys():
            summary = self.summarize_file(file_path, issues)
            if summary:
                file_summaries[file_path] = summary.to_dict()
        
        return {
            'repository': repo_summary.to_dict(),
            'directories': dir_summaries,
            'files': file_summaries,
            'summary_timestamp': datetime.now().isoformat()
        }

This Summarization System implementation provides multi-level reporting capabilities. The FileSummary captures metrics and issues for individual files, highlighting the top issues by severity. The DirectorySummary aggregates information across all files in a directory, identifying the most problematic files. The RepositorySummary provides a high-level view of the entire codebase.

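The quality score calculation uses a weighted penalty system: each critical issue costs 10 points, each high severity issue 5, each medium severity issue 2, and each low severity issue 1. The total penalty is divided by the number of files and multiplied by 10, then subtracted from 100 with a floor of zero. Normalizing by file count makes the score comparable across projects of different sizes. A repository with no files scores 100.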
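
To make the arithmetic concrete, here is a small standalone sketch of the same formula. The function name quality_score and the plain dictionary of per-severity counts are illustrative assumptions, not part of the SummarizationSystem class; only the weights and the normalization step are taken from _calculate_quality_score.

```python
# Penalty weights as used in _calculate_quality_score.
SEVERITY_WEIGHTS = {"critical": 10, "high": 5, "medium": 2, "low": 1}


def quality_score(severity_counts: dict, total_files: int) -> float:
    """Recompute the score from per-severity issue counts (illustrative helper)."""
    if total_files == 0:
        return 100.0
    total_penalty = sum(
        SEVERITY_WEIGHTS[severity] * count
        for severity, count in severity_counts.items()
    )
    # Average penalty per file, scaled by 10
    normalized_penalty = (total_penalty / total_files) * 10
    return max(0.0, 100.0 - normalized_penalty)


# 2 critical, 3 high, 5 medium and 10 low issues spread over 20 files:
# penalty = 20 + 15 + 10 + 10 = 55, normalized = 55 / 20 * 10 = 27.5
example = quality_score({"critical": 2, "high": 3, "medium": 5, "low": 10}, total_files=20)
print(example)  # 72.5
```

One consequence of this scaling is that the score reaches zero once the average penalty per file is 10, for example one critical issue in every file.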

The top concerns identification uses heuristics to surface the most important patterns. Critical security issues are reported first, together with their count. If more than forty percent of issues are code smells, this suggests systemic quality problems. If more than five architectural issues are found, this suggests the need for an architectural review.

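The recommendations are actionable and ordered by urgency. A quality score below 50 produces a general warning first, followed by a call to address any critical issues immediately. Any security issue triggers a recommendation for a security review. More than ten performance issues trigger a recommendation for profiling, and more than twenty code smells trigger a recommendation for regular refactoring sessions.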

The hierarchical summary generation creates a complete picture at all levels, allowing users to drill down from repository to directory to file level.
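
The directory level of the hierarchy depends on collecting every ancestor directory of every file, not only the directories that contain files directly. The sketch below isolates that loop from generate_hierarchical_summary. The helper name ancestor_directories and the sample paths are assumptions made for illustration.

```python
import os
from typing import Iterable, List


def ancestor_directories(file_paths: Iterable[str]) -> List[str]:
    """Collect every directory on the path from each file up to the root."""
    directories = set()
    for file_path in file_paths:
        dir_path = os.path.dirname(file_path)
        while dir_path:
            directories.add(dir_path)
            parent = os.path.dirname(dir_path)
            # For absolute paths the root is its own parent, so stop there
            if parent == dir_path:
                break
            dir_path = parent
    return sorted(directories)


paths = ["src/api/handlers.py", "src/core/models.py", "README.md"]
print(ancestor_directories(paths))  # ['src', 'src/api', 'src/core']
```

A file in the repository root, such as README.md with a relative path, contributes no directory entry, so it appears only in the file and repository levels of the summary.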

COMPONENT TWELVE: MULTITHREADING AND PARALLEL PROCESSING

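Multithreading and parallel processing enable efficient analysis of large codebases by analyzing multiple files concurrently. Sequential analysis of hundreds or thousands of files can take too long to be practical. Note that in standard CPython the global interpreter lock prevents threads from executing Python bytecode in parallel, so a thread pool helps most when the per-file work is dominated by I/O such as reading files or waiting on LLM API calls. CPU-bound analysis may gain more from process-based parallelism.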

The multithreading system must handle several challenges. First, thread safety when multiple threads access shared data structures. Second, load balancing to ensure all threads have roughly equal work. Third, error handling to prevent one failed file from crashing the entire analysis. Fourth, result aggregation to combine findings from multiple threads.

Here is the implementation of the multithreading system:

import os
import threading
import queue
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Optional, Set, Tuple, Any, Callable


class ThreadSafeIssueCollector:
    def __init__(self):
        self.issues: List[CodeIssue] = []
        self.lock = threading.Lock()
    
    def add_issue(self, issue: CodeIssue):
        with self.lock:
            self.issues.append(issue)
    
    def add_issues(self, issues: List[CodeIssue]):
        with self.lock:
            self.issues.extend(issues)
    
    def get_all_issues(self) -> List[CodeIssue]:
        with self.lock:
            return self.issues.copy()
    
    def get_count(self) -> int:
        with self.lock:
            return len(self.issues)


class ParallelAnalyzer:
    def __init__(self, code_storage: EnhancedCodeStorage, max_workers: Optional[int] = None):
        self.code_storage = code_storage
        self.max_workers = max_workers or min(32, (os.cpu_count() or 1) + 4)
        
        self.smell_detector = CodeSmellDetector()
        self.security_analyzer = SecurityAnalyzer()
        self.performance_analyzer = PerformanceAnalyzer()
        
        self.issue_collector = ThreadSafeIssueCollector()
        self.progress_lock = threading.Lock()
        self.files_processed = 0
        self.total_files = 0
    
    def analyze_all_files(self, progress_callback: Optional[Callable[[int, int], None]] = None) -> List[CodeIssue]:
        self.issue_collector = ThreadSafeIssueCollector()
        self.files_processed = 0
        
        file_paths = list(self.code_storage.files.keys())
        self.total_files = len(file_paths)
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_file = {
                executor.submit(self._analyze_single_file, file_path): file_path
                for file_path in file_paths
            }
            
            for future in as_completed(future_to_file):
                file_path = future_to_file[future]
                
                try:
                    issues = future.result()
                    self.issue_collector.add_issues(issues)
                except Exception as e:
                    # One failing file must not abort the whole run
                    print(f"Error analyzing {file_path}: {e}")
                finally:
                    # Count the file and report progress whether it succeeded or failed
                    with self.progress_lock:
                        self.files_processed += 1
                        if progress_callback:
                            progress_callback(self.files_processed, self.total_files)
        
        return self.issue_collector.get_all_issues()
    
    def _analyze_single_file(self, file_path: str) -> List[CodeIssue]:
        issues = []
        
        file_record = self.code_storage.get_file_by_path(file_path)
        if not file_record:
            return issues
        
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
        except OSError:
            # Unreadable or missing file: skip it instead of failing the task
            return issues
        
        ast_data = self.code_storage.get_ast_data(file_path)
        
        smell_issues = self.smell_detector.analyze_file(file_record, ast_data)
        issues.extend(smell_issues)
        
        security_issues = self.security_analyzer.analyze_file(file_record, content)
        issues.extend(security_issues)
        
        performance_issues = self.performance_analyzer.analyze_file(file_record, content, ast_data)
        issues.extend(performance_issues)
        
        return issues
    
    def analyze_files_by_language(self, language: str) -> List[CodeIssue]:
        self.issue_collector = ThreadSafeIssueCollector()
        self.files_processed = 0
        
        file_records = self.code_storage.get_files_by_language(language)
        file_paths = [record.metadata.file_path for record in file_records]
        self.total_files = len(file_paths)
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_file = {
                executor.submit(self._analyze_single_file, file_path): file_path
                for file_path in file_paths
            }
            
            for future in as_completed(future_to_file):
                try:
                    issues = future.result()
                    self.issue_collector.add_issues(issues)
                except Exception as e:
                    print(f"Error in parallel analysis: {e}")
        
        return self.issue_collector.get_all_issues()


class BatchProcessor:
    def __init__(self, batch_size: int = 10):
        self.batch_size = batch_size
    
    def process_in_batches(self, items: List[Any], processor: Callable[[Any], Any],
                          max_workers: int = 4) -> List[Any]:
        results = []
        
        batches = [items[i:i + self.batch_size] for i in range(0, len(items), self.batch_size)]
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_batch = {
                executor.submit(self._process_batch, batch, processor): batch
                for batch in batches
            }
            
            for future in as_completed(future_to_batch):
                try:
                    batch_results = future.result()
                    results.extend(batch_results)
                except Exception as e:
                    print(f"Error processing batch: {e}")
        
        return results
    
    def _process_batch(self, batch: List[Any], processor: Callable[[Any], Any]) -> List[Any]:
        return [processor(item) for item in batch]

This multithreading implementation provides efficient parallel processing capabilities. The ThreadSafeIssueCollector uses a lock to ensure thread-safe access to the shared issues list. Multiple threads can add issues concurrently without race conditions.
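
The locking pattern can be tried in isolation. The following is a minimal sketch, assuming a simplified collector that stores strings instead of CodeIssue objects; the names ThreadSafeCollector, add_many, and snapshot are illustrative and do not appear in the implementation above.

```python
import threading
from typing import List


class ThreadSafeCollector:
    """Simplified stand-in for ThreadSafeIssueCollector, storing strings."""

    def __init__(self) -> None:
        self._items: List[str] = []
        self._lock = threading.Lock()

    def add_many(self, items: List[str]) -> None:
        # The lock makes the extend atomic with respect to other writers
        with self._lock:
            self._items.extend(items)

    def snapshot(self) -> List[str]:
        # Return a copy so callers never iterate over a list that is still growing
        with self._lock:
            return self._items.copy()


collector = ThreadSafeCollector()


def worker(worker_id: int) -> None:
    for n in range(100):
        collector.add_many([f"worker{worker_id}-issue{n}"])


threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

total = len(collector.snapshot())
print(total)  # 800
```

Returning a copy from the read method mirrors get_all_issues: the caller gets a stable list even if other threads keep adding findings.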

The ParallelAnalyzer uses ThreadPoolExecutor to manage a pool of worker threads and submits one task per file. The as_completed iterator yields each future as soon as it finishes, so results are collected in completion order rather than submission order, and no result has to wait for slower files.

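Progress is tracked in the files_processed counter, which is updated under a lock. In this implementation the counter is only touched by the thread that iterates over as_completed, so the lock is a safeguard that matters if counting is later moved into the worker threads. The progress callback lets external code monitor the analysis, for example to drive a progress bar in a user interface.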
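
The callback contract is simply a function that receives the number of finished items and the total. The sketch below shows that contract on its own, assuming a hypothetical run_with_progress helper and a trivial squaring task in place of ParallelAnalyzer and real file analysis.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, List, Optional


def run_with_progress(
    items: List[Any],
    work: Callable[[Any], Any],
    progress_callback: Optional[Callable[[int, int], None]] = None,
    max_workers: int = 4,
) -> List[Any]:
    """Run work(item) for each item and report (done, total) after each one."""
    done = 0
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(work, item) for item in items]
        for future in as_completed(futures):
            results.append(future.result())
            done += 1
            if progress_callback:
                progress_callback(done, len(items))
    return results


events = []


def on_progress(done: int, total: int) -> None:
    # A real interface might redraw a progress bar here
    events.append((done, total))


results = run_with_progress([1, 2, 3, 4], lambda x: x * x, on_progress)
print(events)  # [(1, 4), (2, 4), (3, 4), (4, 4)]
```

Because the callback runs in the thread that consumes as_completed, a slow callback delays result collection, so it is best kept lightweight.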

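Error handling isolates failures per file. In analyze_all_files, if the analysis of one file raises, the exception is caught when its result is retrieved and printed together with the file path, and the remaining files are still processed. Files that cannot be read are skipped and contribute no issues. This prevents a single problematic file from stopping the entire analysis.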
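
The failure isolation can be seen in a few lines. In this sketch, fake_analyze and the file names are invented for illustration; the structure of the loop follows analyze_all_files, except that failures are recorded in a list rather than printed.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List


def fake_analyze(path: str) -> List[str]:
    """Hypothetical analysis function that fails for one kind of file."""
    if path.endswith(".bad"):
        raise ValueError("cannot parse file")
    return [f"issue in {path}"]


paths = ["a.py", "b.bad", "c.py"]
findings = []
failures = []

with ThreadPoolExecutor(max_workers=2) as executor:
    future_to_path = {executor.submit(fake_analyze, p): p for p in paths}
    for future in as_completed(future_to_path):
        path = future_to_path[future]
        try:
            # result() re-raises any exception from the worker thread
            findings.extend(future.result())
        except Exception as exc:
            failures.append((path, str(exc)))

print(sorted(findings))  # ['issue in a.py', 'issue in c.py']
print(failures)          # [('b.bad', 'cannot parse file')]
```

Keeping a list of failed files, instead of only printing them, would also let the final report state which files were not analyzed.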

The BatchProcessor provides an alternative approach where items are processed in batches. This can be more efficient when there is overhead in starting each task, as batching amortizes that overhead across multiple items.
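
The following sketch shows the batching idea with a variation: it uses executor.map, which returns results in input order, whereas the BatchProcessor above uses as_completed and therefore returns batches in completion order. The helper names chunk and process_batch and the sample file names are assumptions for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List


def chunk(items: List[Any], batch_size: int) -> List[List[Any]]:
    """Split items into consecutive batches of at most batch_size elements."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


def process_batch(batch: List[str]) -> List[int]:
    # Stand-in for real per-item work: compute the length of each name
    return [len(name) for name in batch]


names = ["a.py", "bb.py", "ccc.py", "dddd.py", "e.js", "ff.js", "ggg.ts"]
batches = chunk(names, 3)

with ThreadPoolExecutor(max_workers=2) as executor:
    # map preserves the order of the input batches
    per_batch = list(executor.map(process_batch, batches))

results = [value for batch in per_batch for value in batch]
print([len(b) for b in batches])  # [3, 3, 1]
print(results)                    # [4, 5, 6, 7, 4, 5, 6]
```

Whether ordering matters depends on the caller. For issue lists that are sorted or grouped later, completion order is usually fine.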

COMPONENT THIRTEEN: MULTI-AGENT ORCHESTRATION

Multi-Agent orchestration coordinates multiple specialized agents, each with its own LLM and context. Different agents focus on different aspects of the codebase, and their findings are synthesized into a comprehensive analysis.

The multi-agent system includes several specialized agents. The Python Agent focuses on Python-specific issues and idioms. The JavaScript Agent handles JavaScript and TypeScript. The Security Agent specializes in security analysis across all languages. The Architecture Agent evaluates high-level design. The Documentation Agent assesses code documentation quality.

Each agent can use a different LLM or the same LLM with different prompts and context. Agents can communicate with each other to resolve conflicts or combine insights.

Here is the implementation of multi-agent orchestration:

import re
from typing import List, Dict, Optional, Set, Tuple, Any
from dataclasses import dataclass
from abc import ABC, abstractmethod


@dataclass
class AgentContext:
    agent_name: str
    specialization: str
    supported_languages: List[str]
    analysis_focus: List[str]


class AnalysisAgent(ABC):
    def __init__(self, context: AgentContext):
        self.context = context
        self.findings: List[CodeIssue] = []
    
    @abstractmethod
    def analyze(self, file_record: CodeFileRecord, content: str) -> List[CodeIssue]:
        pass
    
    @abstractmethod
    def can_handle(self, file_record: CodeFileRecord) -> bool:
        pass
    
    def get_findings(self) -> List[CodeIssue]:
        return self.findings


class PythonAgent(AnalysisAgent):
    def __init__(self):
        context = AgentContext(
            agent_name="Python Specialist",
            specialization="Python code analysis",
            supported_languages=["Python"],
            analysis_focus=["pythonic idioms", "type hints", "decorators", "async/await"]
        )
        super().__init__(context)
        self.smell_detector = CodeSmellDetector()
    
    def can_handle(self, file_record: CodeFileRecord) -> bool:
        return file_record.metadata.language == "Python"
    
    def analyze(self, file_record: CodeFileRecord, content: str) -> List[CodeIssue]:
        issues = []
        
        ast_data = None
        
        base_issues = self.smell_detector.analyze_file(file_record, ast_data)
        issues.extend(base_issues)
        
        python_specific = self._check_python_idioms(file_record, content)
        issues.extend(python_specific)
        
        self.findings = issues
        return issues
    
    def _check_python_idioms(self, file_record: CodeFileRecord, content: str) -> List[CodeIssue]:
        issues = []
        lines = content.split('\n')
        
        for i, line in enumerate(lines):
            if re.search(r'range\(len\(', line):
                issue = CodeIssue(
                    id=f"python_idiom_{file_record.metadata.file_path}_{i+1}",
                    category=IssueCategory.BEST_PRACTICE,
                    severity=IssueSeverity.LOW,
                    title="Non-Pythonic Iteration",
                    description="Using range(len()) instead of direct iteration or enumerate",
                    file_path=file_record.metadata.file_path,
                    line_number=i + 1,
                    code_snippet=line.strip(),
                    recommendation="Use 'for item in collection' or 'for i, item in enumerate(collection)' instead",
                    references=["PEP 8", "Python Best Practices"]
                )
                issues.append(issue)
        
        return issues


class JavaScriptAgent(AnalysisAgent):
    def __init__(self):
        context = AgentContext(
            agent_name="JavaScript/TypeScript Specialist",
            specialization="JavaScript and TypeScript analysis",
            supported_languages=["JavaScript", "TypeScript"],
            analysis_focus=["async patterns", "promises", "ES6+ features", "type safety"]
        )
        super().__init__(context)
        self.smell_detector = CodeSmellDetector()
    
    def can_handle(self, file_record: CodeFileRecord) -> bool:
        return file_record.metadata.language in ["JavaScript", "TypeScript"]
    
    def analyze(self, file_record: CodeFileRecord, content: str) -> List[CodeIssue]:
        issues = []
        
        base_issues = self.smell_detector.analyze_file(file_record, None)
        issues.extend(base_issues)
        
        js_specific = self._check_javascript_patterns(file_record, content)
        issues.extend(js_specific)
        
        self.findings = issues
        return issues
    
    def _check_javascript_patterns(self, file_record: CodeFileRecord, content: str) -> List[CodeIssue]:
        issues = []
        lines = content.split('\n')
        
        for i, line in enumerate(lines):
            if re.search(r'\bvar\s+', line):
                issue = CodeIssue(
                    id=f"js_var_{file_record.metadata.file_path}_{i+1}",
                    category=IssueCategory.BEST_PRACTICE,
                    severity=IssueSeverity.LOW,
                    title="Use of 'var' keyword",
                    description="Using 'var' instead of 'let' or 'const'",
                    file_path=file_record.metadata.file_path,
                    line_number=i + 1,
                    code_snippet=line.strip(),
                    recommendation="Use 'const' for values that don't change, 'let' for values that do",
                    references=["ES6 Best Practices"]
                )
                issues.append(issue)
        
        return issues


class SecuritySpecialistAgent(AnalysisAgent):
    def __init__(self):
        context = AgentContext(
            agent_name="Security Specialist",
            specialization="Security vulnerability detection",
            supported_languages=["Python", "JavaScript", "TypeScript", "Java", "C#"],
            analysis_focus=["injection", "XSS", "authentication", "encryption", "secrets"]
        )
        super().__init__(context)
        self.security_analyzer = SecurityAnalyzer()
    
    def can_handle(self, file_record: CodeFileRecord) -> bool:
        return file_record.metadata.language in self.context.supported_languages
    
    def analyze(self, file_record: CodeFileRecord, content: str) -> List[CodeIssue]:
        issues = self.security_analyzer.analyze_file(file_record, content)
        self.findings = issues
        return issues


class ArchitectureSpecialistAgent(AnalysisAgent):
    def __init__(self, graph: CodeGraph, context_storage: ProjectContextStorage):
        context = AgentContext(
            agent_name="Architecture Specialist",
            specialization="Architectural analysis",
            supported_languages=["all"],
            analysis_focus=["dependencies", "layering", "patterns", "principles"]
        )
        super().__init__(context)
        self.graph = graph
        self.context_storage = context_storage
        self.arch_analyzer = ArchitectureAnalyzer(graph, context_storage)
    
    def can_handle(self, file_record: CodeFileRecord) -> bool:
        return True
    
    def analyze(self, file_record: CodeFileRecord, content: str) -> List[CodeIssue]:
        return []
    
    def analyze_architecture(self) -> List[CodeIssue]:
        issues = self.arch_analyzer.analyze_architecture()
        self.findings = issues
        return issues


class DocumentationAgent(AnalysisAgent):
    def __init__(self):
        context = AgentContext(
            agent_name="Documentation Specialist",
            specialization="Documentation quality assessment",
            supported_languages=["all"],
            analysis_focus=["docstrings", "comments", "API documentation", "README"]
        )
        super().__init__(context)
    
    def can_handle(self, file_record: CodeFileRecord) -> bool:
        return True
    
    def analyze(self, file_record: CodeFileRecord, content: str) -> List[CodeIssue]:
        issues = []
        
        doc_issues = self._check_documentation(file_record, content)
        issues.extend(doc_issues)
        
        self.findings = issues
        return issues
    
    def _check_documentation(self, file_record: CodeFileRecord, content: str) -> List[CodeIssue]:
        issues = []
        
        public_functions = [
            e for e in file_record.entities
            if e.entity_type in ['function', 'method'] and not e.name.startswith('_')
        ]
        
        for func in public_functions:
            if not func.docstring:
                issue = CodeIssue(
                    id=f"missing_doc_{file_record.metadata.file_path}_{func.line_number}",
                    category=IssueCategory.BEST_PRACTICE,
                    severity=IssueSeverity.LOW,
                    title="Missing Documentation",
                    description=f"Public function '{func.name}' lacks documentation",
                    file_path=file_record.metadata.file_path,
                    line_number=func.line_number,
                    recommendation="Add a docstring describing the function's purpose, parameters, and return value",
                    references=["PEP 257", "Documentation Best Practices"]
                )
                issues.append(issue)
        
        return issues


class MultiAgentOrchestrator:
    def __init__(self, code_storage: EnhancedCodeStorage, graph: CodeGraph,
                 context_storage: ProjectContextStorage):
        self.code_storage = code_storage
        self.graph = graph
        self.context_storage = context_storage
        
        self.agents: List[AnalysisAgent] = []
        self._initialize_agents()
        
        self.all_issues: List[CodeIssue] = []
        self.agent_contributions: Dict[str, List[CodeIssue]] = {}
    
    def _initialize_agents(self):
        self.agents.append(PythonAgent())
        self.agents.append(JavaScriptAgent())
        self.agents.append(SecuritySpecialistAgent())
        self.agents.append(ArchitectureSpecialistAgent(self.graph, self.context_storage))
        self.agents.append(DocumentationAgent())
    
    def analyze_with_agents(self) -> List[CodeIssue]:
        self.all_issues = []
        self.agent_contributions = {}
        
        for file_path, file_record in self.code_storage.files.items():
            try:
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
            except OSError:
                # Unreadable file: agents still run, but only on an empty string
                content = ""
            
            for agent in self.agents:
                if agent.can_handle(file_record):
                    issues = agent.analyze(file_record, content)
                    
                    if agent.context.agent_name not in self.agent_contributions:
                        self.agent_contributions[agent.context.agent_name] = []
                    self.agent_contributions[agent.context.agent_name].extend(issues)
                    
                    self.all_issues.extend(issues)
        
        arch_agent = next((a for a in self.agents if isinstance(a, ArchitectureSpecialistAgent)), None)
        if arch_agent:
            arch_issues = arch_agent.analyze_architecture()
            self.agent_contributions[arch_agent.context.agent_name] = arch_issues
            self.all_issues.extend(arch_issues)
        
        self.all_issues = self._deduplicate_issues(self.all_issues)
        
        return self.all_issues
    
    def _deduplicate_issues(self, issues: List[CodeIssue]) -> List[CodeIssue]:
        seen = set()
        unique_issues = []
        
        for issue in issues:
            key = (issue.file_path, issue.line_number, issue.title)
            if key not in seen:
                seen.add(key)
                unique_issues.append(issue)
        
        return unique_issues
    
    def get_agent_report(self) -> Dict[str, Any]:
        return {
            'total_issues': len(self.all_issues),
            'agents_used': len(self.agents),
            'contributions': {
                agent_name: len(issues)
                for agent_name, issues in self.agent_contributions.items()
            },
            'agent_details': [
                {
                    'name': agent.context.agent_name,
                    'specialization': agent.context.specialization,
                    'languages': agent.context.supported_languages,
                    'focus': agent.context.analysis_focus,
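                    # agent.findings only holds the most recently analyzed file,
                    # so report the accumulated per-agent contributions instead
                    'findings': len(self.agent_contributions.get(agent.context.agent_name, []))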
                }
                for agent in self.agents
            ]
        }

This multi-agent orchestration implementation demonstrates how to coordinate specialized agents. Each agent has its own context and expertise. The PythonAgent understands Python-specific idioms. The JavaScriptAgent knows JavaScript best practices. The SecuritySpecialistAgent focuses solely on security across the five languages listed in its context: Python, JavaScript, TypeScript, Java, and C#.

The can_handle method allows each agent to determine whether it should analyze a given file. This enables flexible agent assignment based on file type, language, or other criteria.
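
The routing behavior can be sketched without the full analysis stack. Everything in this example is a simplified stand-in: FileInfo replaces CodeFileRecord, and LanguageAgent and AnyFileAgent are hypothetical classes that implement only can_handle.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class FileInfo:
    """Hypothetical stand-in for CodeFileRecord."""
    path: str
    language: str


class Agent(ABC):
    def __init__(self, name: str) -> None:
        self.name = name

    @abstractmethod
    def can_handle(self, file_info: FileInfo) -> bool:
        ...


class LanguageAgent(Agent):
    """Accepts only files written in one of its languages."""

    def __init__(self, name: str, languages: Iterable[str]) -> None:
        super().__init__(name)
        self.languages = set(languages)

    def can_handle(self, file_info: FileInfo) -> bool:
        return file_info.language in self.languages


class AnyFileAgent(Agent):
    """Accepts every file, like the documentation agent."""

    def can_handle(self, file_info: FileInfo) -> bool:
        return True


agents: List[Agent] = [
    LanguageAgent("python", ["Python"]),
    LanguageAgent("javascript", ["JavaScript", "TypeScript"]),
    AnyFileAgent("documentation"),
]


def route(file_info: FileInfo) -> List[str]:
    """Return the names of all agents willing to analyze the file."""
    return [agent.name for agent in agents if agent.can_handle(file_info)]


print(route(FileInfo("app.py", "Python")))  # ['python', 'documentation']
print(route(FileInfo("main.go", "Go")))     # ['documentation']
```

A file can be routed to several agents at once, which is why the orchestrator needs a deduplication step afterwards.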

The MultiAgentOrchestrator coordinates all agents, routing files to appropriate agents and collecting findings. It tracks which agent contributed which issues, enabling attribution and analysis of agent effectiveness.

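The deduplication step removes duplicate issues that might be detected by multiple agents. Two issues count as duplicates when they share the same file path, line number, and title. For example, both the PythonAgent and the SecuritySpecialistAgent might detect the same hardcoded credential.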
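
The sketch below reproduces the deduplication key on a simplified record. The Finding dataclass, its source_agent field, and the sample titles are assumptions for illustration; the key of file path, line number, and title matches _deduplicate_issues.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Finding:
    """Hypothetical, reduced version of CodeIssue."""
    file_path: str
    line_number: int
    title: str
    source_agent: str


def deduplicate(findings: List[Finding]) -> List[Finding]:
    seen = set()
    unique = []
    for finding in findings:
        key = (finding.file_path, finding.line_number, finding.title)
        if key not in seen:
            seen.add(key)
            unique.append(finding)  # the first reporter wins
    return unique


findings = [
    Finding("config.py", 12, "Hardcoded Credential", "Security Specialist"),
    Finding("config.py", 12, "Hardcoded Credential", "Python Specialist"),
    Finding("config.py", 12, "Hardcoded Secret", "Python Specialist"),
]

unique = deduplicate(findings)
print(len(unique))              # 2
print(unique[0].source_agent)   # Security Specialist
```

Because the title is part of the key, two agents that describe the same problem with different titles are not merged. Agreeing on shared titles, or keying on category instead, would make the step stricter.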

The agent report provides transparency about which agents were used and how much each contributed to the final analysis.

PRODUCTION-READY APPLICATION

Now we will integrate all components into a complete production-ready code analysis application. This application provides a command-line interface, configuration management, comprehensive reporting, and all the features we have implemented.

import os
import sys
import argparse
import json
from pathlib import Path
from typing import Any, Dict, List, Optional


class CodeAnalysisApplication:
    def __init__(self, config_path: Optional[str] = None):
        self.config = self._load_config(config_path)
        
        self.user_agent = UserInteractionAgent()
        self.adr_storage = ADRStorage()
        self.code_storage = EnhancedCodeStorage()
        self.context_storage = ProjectContextStorage()
        self.graph = CodeGraph()
        
        self.git_repo: Optional[GitRepository] = None
        self.change_agent: Optional[ChangeAgent] = None
        
        self.orchestrator: Optional[AnalysisOrchestrator] = None
        self.multi_agent_orchestrator: Optional[MultiAgentOrchestrator] = None
        self.parallel_analyzer: Optional[ParallelAnalyzer] = None
        
        self.meta_reflection: Optional[MetaReflectionLayer] = None
        self.cot_analyzer: Optional[ChainOfThoughtAnalyzer] = None
        self.tot_analyzer: Optional[TreeOfThoughtAnalyzer] = None
        
        self.summarization: Optional[SummarizationSystem] = None
        
        self.analysis_context: Optional[AnalysisContext] = None
        self.all_issues: List[CodeIssue] = []
    
    def _load_config(self, config_path: Optional[str]) -> Dict[str, Any]:
        default_config = {
            'max_workers': 8,
            'enable_parallel': True,
            'enable_multi_agent': True,
            'enable_meta_reflection': True,
            'enable_cot_tot': True,
            'output_format': 'json',
            'output_dir': './analysis_results'
        }
        
        if config_path and os.path.exists(config_path):
            with open(config_path, 'r') as f:
                user_config = json.load(f)
            default_config.update(user_config)
        
        return default_config
    
    def run_interactive(self):
        print("Starting Code Analysis Application")
        print()
        
        self.analysis_context = self.user_agent.start_interaction()
        
        self._initialize_components()
        
        self._load_context_data()
        
        self._ingest_code()
        
        self._perform_analysis()
        
        self._generate_reports()
        
        print()
        print("Analysis complete!")
    
    def run_automated(self, repo_path: str, requirements_path: Optional[str] = None,
                     architecture_path: Optional[str] = None):
        print(f"Running automated analysis on {repo_path}")
        
        self.analysis_context = AnalysisContext()
        self.analysis_context.code_location = repo_path
        self.analysis_context.is_git_repo = os.path.isdir(os.path.join(repo_path, '.git'))
        
        if requirements_path and os.path.exists(requirements_path):
            with open(requirements_path, 'r') as f:
                self.analysis_context.requirements = f.read()
            self.analysis_context.has_requirements = True
        
        if architecture_path and os.path.isdir(architecture_path):
            self.analysis_context.architecture_docs = architecture_path
            self.analysis_context.has_architecture = True
        
        self._initialize_components()
        self._load_context_data()
        self._ingest_code()
        self._perform_analysis()
        self._generate_reports()
        
        print("Automated analysis complete!")
    
    def _initialize_components(self):
        print("Initializing analysis components...")
        
        if self.analysis_context.is_git_repo:
            self.git_repo = GitRepository(self.analysis_context.code_location)
        
        self.orchestrator = AnalysisOrchestrator(
            self.code_storage,
            self.graph,
            self.context_storage
        )
        
        if self.config['enable_multi_agent']:
            self.multi_agent_orchestrator = MultiAgentOrchestrator(
                self.code_storage,
                self.graph,
                self.context_storage
            )
        
        if self.config['enable_parallel']:
            self.parallel_analyzer = ParallelAnalyzer(
                self.code_storage,
                max_workers=self.config['max_workers']
            )
        
        if self.config['enable_meta_reflection']:
            self.meta_reflection = MetaReflectionLayer(
                self.code_storage,
                self.graph,
                self.context_storage,
                self.adr_storage
            )
        
        if self.config['enable_cot_tot']:
            self.cot_analyzer = ChainOfThoughtAnalyzer(
                self.code_storage,
                self.graph,
                self.context_storage
            )
            self.tot_analyzer = TreeOfThoughtAnalyzer(
                self.code_storage,
                self.graph,
                self.context_storage
            )
        
        self.summarization = SummarizationSystem(self.code_storage)
        
        if self.git_repo and self.orchestrator:
            self.change_agent = ChangeAgent(
                self.git_repo,
                self.code_storage,
                self.graph,
                self.orchestrator
            )
        
        print("Components initialized")
    
    def _load_context_data(self):
        print("Loading context data...")
        
        if self.analysis_context.has_architecture and self.analysis_context.architecture_docs:
            if os.path.isdir(self.analysis_context.architecture_docs):
                self.adr_storage.ingest_adr_directory(self.analysis_context.architecture_docs)
            elif os.path.isfile(self.analysis_context.architecture_docs):
                self.adr_storage.ingest_adr_file(self.analysis_context.architecture_docs)
        
        if self.analysis_context.has_principles and self.analysis_context.architectural_principles:
            for principle_name in self.analysis_context.architectural_principles:
                if "SOLID" in principle_name.upper():
                    self.context_storage.load_solid_principles()
                    break
        
        if self.analysis_context.has_business_goals and self.analysis_context.business_goals:
            self.context_storage.add_domain_knowledge(
                title="Business Goals",
                content=self.analysis_context.business_goals,
                domain="business",
                tags=['goals', 'strategy']
            )
        
        print("Context data loaded")
    
    def _ingest_code(self):
        print("Ingesting code files...")
        
        if self.analysis_context.code_location:
            if os.path.isfile(self.analysis_context.code_location):
                self.code_storage.ingest_file_with_ast(self.analysis_context.code_location)
                print("Ingested 1 file")
            elif os.path.isdir(self.analysis_context.code_location):
                count = self.code_storage.ingest_directory(self.analysis_context.code_location)
                print(f"Ingested {count} files")
        
        print("Building code graph...")
        graph_builder = GraphBuilder(self.code_storage)
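        # NOTE: components created in _initialize_components() were handed the
        # self.graph object that existed at that time. If build_graph() returns
        # a new object, those components must be given the rebuilt graph too.
        self.graph = graph_builder.build_graph()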
        stats = self.graph.get_statistics()
        print(f"Graph built: {stats['total_nodes']} nodes, {stats['total_edges']} edges")
    
    def _perform_analysis(self):
        print()
        print("Performing code analysis...")
        
        if self.config['enable_multi_agent'] and self.multi_agent_orchestrator:
            print("Using multi-agent analysis...")
            self.all_issues = self.multi_agent_orchestrator.analyze_with_agents()
        elif self.config['enable_parallel'] and self.parallel_analyzer:
            print("Using parallel analysis...")
            
            def progress_callback(processed, total):
                # Guard against division by zero when there are no files to analyze
                percent = (processed / total) * 100 if total else 100.0
                print(f"\rProgress: {processed}/{total} files ({percent:.1f}%)", end='')
            
            self.all_issues = self.parallel_analyzer.analyze_all_files(progress_callback)
            print()
        else:
            print("Using sequential analysis...")
            self.all_issues = self.orchestrator.analyze_all()
        
        print(f"Found {len(self.all_issues)} issues")
        
        if self.config['enable_meta_reflection'] and self.meta_reflection:
            print("Performing meta-reflection...")
            reflections = self.meta_reflection.reflect_on_issues(self.all_issues)
            # Keep the report on the instance so later steps can reuse it
            self.review_report = self.meta_reflection.generate_review_report(self.all_issues, reflections)
            print(f"Confidence score: {self.review_report['summary']['confidence_score']:.2f}")
            print(f"Issues needing review: {self.review_report['issues_needing_review']}")
    
    def _generate_reports(self):
        print()
        print("Generating reports...")
        
        output_dir = Path(self.config['output_dir'])
        output_dir.mkdir(parents=True, exist_ok=True)
        
        summary = self.summarization.generate_hierarchical_summary(self.all_issues)
        
        summary_file = output_dir / 'summary.json'
        with open(summary_file, 'w') as f:
            json.dump(summary, f, indent=2)
        print(f"Summary report: {summary_file}")
        
        issues_file = output_dir / 'issues.json'
        with open(issues_file, 'w') as f:
            json.dump([issue.to_dict() for issue in self.all_issues], f, indent=2)
        print(f"Issues report: {issues_file}")
        
        if self.multi_agent_orchestrator:
            agent_report = self.multi_agent_orchestrator.get_agent_report()
            agent_file = output_dir / 'agents.json'
            with open(agent_file, 'w') as f:
                json.dump(agent_report, f, indent=2)
            print(f"Agent report: {agent_file}")
        
        if self.meta_reflection:
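            # Reuse a review report cached on the instance, if there is one;
            # reflection may be expensive, so avoid repeating it needlessly.
            review_report = getattr(self, 'review_report', None)
            if review_report is None:
                reflections = self.meta_reflection.reflect_on_issues(self.all_issues)
                review_report = self.meta_reflection.generate_review_report(self.all_issues, reflections)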
            review_file = output_dir / 'review.json'
            with open(review_file, 'w') as f:
                json.dump(review_report, f, indent=2)
            print(f"Review report: {review_file}")
        
        self._generate_text_report(output_dir)
    
    def _generate_text_report(self, output_dir: Path):
        report_file = output_dir / 'report.txt'
        
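        # Explicit UTF-8 so non-ASCII issue text does not fail on platforms
        # whose default encoding is not UTF-8
        with open(report_file, 'w', encoding='utf-8') as f: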
            f.write("=" * 80 + "\n")
            f.write("CODE ANALYSIS REPORT\n")
            f.write("=" * 80 + "\n\n")
            
            f.write("SUMMARY\n")
            f.write("-" * 80 + "\n")
            f.write(f"Total Files Analyzed: {len(self.code_storage.files)}\n")
            f.write(f"Total Issues Found: {len(self.all_issues)}\n")
            f.write(f"Critical Issues: {len([i for i in self.all_issues if i.severity == IssueSeverity.CRITICAL])}\n")
            f.write(f"High Severity Issues: {len([i for i in self.all_issues if i.severity == IssueSeverity.HIGH])}\n")
            f.write("\n")
            
            if self.analysis_context:
                f.write("ANALYSIS CONTEXT\n")
                f.write("-" * 80 + "\n")
                f.write(f"Requirements Available: {'Yes' if self.analysis_context.has_requirements else 'No'}\n")
                f.write(f"Business Goals Available: {'Yes' if self.analysis_context.has_business_goals else 'No'}\n")
                f.write(f"Architecture Docs Available: {'Yes' if self.analysis_context.has_architecture else 'No'}\n")
                f.write(f"Principles Defined: {'Yes' if self.analysis_context.has_principles else 'No'}\n")
                f.write("\n")
            
            critical_issues = [i for i in self.all_issues if i.severity == IssueSeverity.CRITICAL]
            if critical_issues:
                f.write("CRITICAL ISSUES\n")
                f.write("-" * 80 + "\n")
                for issue in critical_issues[:10]:
                    f.write(f"\n{issue.title}\n")
                    f.write(f"File: {issue.file_path}:{issue.line_number}\n")
                    f.write(f"Category: {issue.category.value}\n")
                    f.write(f"Description: {issue.description}\n")
                    if issue.recommendation:
                        f.write(f"Recommendation: {issue.recommendation}\n")
                f.write("\n")
            
            f.write("=" * 80 + "\n")
        
        print(f"Text report: {report_file}")


def main():
    parser = argparse.ArgumentParser(description='Code Analysis with LLMs')
    parser.add_argument('--interactive', action='store_true', help='Run in interactive mode')
    parser.add_argument('--repo', type=str, help='Repository path for automated analysis')
    parser.add_argument('--requirements', type=str, help='Path to requirements document')
    parser.add_argument('--architecture', type=str, help='Path to architecture documentation')
    parser.add_argument('--config', type=str, help='Path to configuration file')
    
    args = parser.parse_args()
    
    app = CodeAnalysisApplication(config_path=args.config)
    
    if args.interactive:
        app.run_interactive()
    elif args.repo:
        app.run_automated(
            repo_path=args.repo,
            requirements_path=args.requirements,
            architecture_path=args.architecture
        )
    else:
        print("Please specify either --interactive or --repo", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()

This production-ready application integrates all components into a cohesive system. It supports both interactive and automated modes. Interactive mode walks users through context gathering and configuration. Automated mode accepts command-line arguments for batch processing.

The application follows a clear workflow: initialize components, load context data, ingest code, perform analysis, and generate reports. Each step is modular and can be configured through the configuration file.

The configuration system allows users to enable or disable features like parallel processing, multi-agent analysis, and meta-reflection. This flexibility supports different use cases from quick scans to deep comprehensive analysis.
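To make the feature switches concrete, here is a minimal sketch of how such a configuration could be loaded. The key names are the ones the application code above reads from self.config; everything else is an assumption for illustration: the JSON file format, the default values, and the names DEFAULT_CONFIG, load_config, QUICK_SCAN, and DEEP_ANALYSIS.

```python
import json
from typing import Optional

# Hypothetical defaults. The keys mirror those read by the application code;
# the values are illustrative assumptions, not the tutorial's actual defaults.
DEFAULT_CONFIG = {
    "enable_multi_agent": False,
    "enable_parallel": True,
    "max_workers": 4,
    "enable_meta_reflection": True,
    "enable_cot_tot": False,
    "output_dir": "analysis_output",
}


def load_config(config_path: Optional[str] = None) -> dict:
    """Return the defaults, overlaid with values from a JSON file if one is given."""
    config = dict(DEFAULT_CONFIG)
    if config_path:
        with open(config_path, "r", encoding="utf-8") as f:
            overrides = json.load(f)
        # Reject misspelled keys early instead of silently ignoring them.
        unknown = set(overrides) - set(DEFAULT_CONFIG)
        if unknown:
            raise ValueError(f"Unknown configuration keys: {sorted(unknown)}")
        config.update(overrides)
    return config


# Two example profiles: a quick scan switches off the more expensive stages,
# while a deep analysis switches them on.
QUICK_SCAN = {"enable_multi_agent": False, "enable_meta_reflection": False, "enable_cot_tot": False}
DEEP_ANALYSIS = {"enable_multi_agent": True, "enable_meta_reflection": True, "enable_cot_tot": True}

# Example: start from the defaults and apply a profile on top.
quick_config = {**load_config(), **QUICK_SCAN}
```

Rejecting unknown keys is a design choice in this sketch: a mistyped flag would otherwise leave a feature quietly disabled.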

The reporting system generates multiple output formats including JSON for programmatic consumption and text for human readability. The hierarchical summary provides insights at repository, directory, and file levels.
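As an example of programmatic consumption, the following sketch reads the issues.json file written by _generate_reports and counts findings per severity, which is enough to gate a build. It assumes that each serialized issue is a dictionary with a string "severity" field; the exact output of to_dict() is not shown in this section, so adjust the field name to match it. The function names are hypothetical.

```python
import json
from collections import Counter
from pathlib import Path
from typing import Iterable, Union


def count_issues_by_severity(issues_file: Union[str, Path]) -> Counter:
    """Count the issues in an issues.json report, grouped by severity."""
    with open(issues_file, "r", encoding="utf-8") as f:
        issues = json.load(f)
    # Assumption: every issue is a dict with a string "severity" field.
    # Issues without that field are counted under "unknown".
    return Counter(str(issue.get("severity", "unknown")).lower() for issue in issues)


def has_blocking_issues(counts: Counter, blocking: Iterable[str] = ("critical",)) -> bool:
    """Return True if any issue has one of the blocking severities."""
    return any(counts[level] > 0 for level in blocking)


# Example use in a CI step (the path depends on the configured output_dir):
#
#     counts = count_issues_by_severity("analysis_output/issues.json")
#     if has_blocking_issues(counts):
#         raise SystemExit(1)
```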

CONCLUSION AND SUMMARY

We have built a comprehensive code analysis system using large language models and supporting technologies. The system addresses the LLM limitations outlined in the introduction (limited context windows, knowledge frozen at training time, and missing project context) through a multi-faceted approach.

The User Interaction Agent gathers essential context including requirements, business goals, architecture documentation, principles, and coding conventions. This context enables the analysis to evaluate code not just against generic best practices but against project-specific standards and intentions.

The RAG Storage Systems maintain multiple knowledge bases for ADRs, source code files, and project context. These enable retrieval of relevant information without overwhelming the LLM context window. The GraphRAG system represents code relationships as a graph, enabling sophisticated queries about dependencies, inheritance, and impact.

The Abstract Syntax Tree parsers provide precise code understanding for Python, JavaScript, and TypeScript, with a framework for adding additional languages. AST parsing enables accurate detection of code patterns and relationships that would be difficult or impossible with text-based analysis alone.

The Specialized Analysis Agents focus on specific quality aspects including code smells, security vulnerabilities, performance issues, and architectural concerns. Each agent applies domain-specific knowledge and techniques to produce high-quality findings.

The Change Agent integrates with Git to enable incremental analysis, tracking what changed and intelligently determining what needs re-analysis. This makes the system practical for continuous integration pipelines.

The Meta-Reflection Layer evaluates the quality and confidence of findings, identifying areas of uncertainty and determining when human review is needed. This prevents blind acceptance of automated findings and sets appropriate expectations.

The Chain of Thought and Tree of Thought reasoning enable deeper analysis of complex issues by breaking down reasoning into steps and exploring multiple perspectives. This mirrors how human experts think about architectural trade-offs.

The Summarization System provides multi-level reporting from individual files up to the entire repository, making findings actionable at all organizational levels.

The Multithreading system enables efficient parallel processing of large codebases, making the analysis practical for real-world projects with thousands of files.
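As a reminder of the underlying pattern, here is a minimal sketch of thread-based parallel analysis with the same (processed, total) progress callback that the application passes to analyze_all_files. It is independent of the ParallelAnalyzer class; the function name and the analyze_file parameter are hypothetical stand-ins for whatever performs the per-file analysis.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterable, List, Optional


def analyze_files_in_parallel(
    file_paths: Iterable[str],
    analyze_file: Callable[[str], List[dict]],
    max_workers: int = 4,
    progress_callback: Optional[Callable[[int, int], None]] = None,
) -> List[dict]:
    """Run analyze_file on every path in a thread pool and collect all issues."""
    paths = list(file_paths)
    all_issues: List[dict] = []
    if not paths:
        return all_issues

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its path so failures can be reported.
        futures = {executor.submit(analyze_file, path): path for path in paths}
        for processed, future in enumerate(as_completed(futures), start=1):
            try:
                all_issues.extend(future.result())
            except Exception as exc:
                # One failing file should not abort the whole run.
                print(f"Analysis failed for {futures[future]}: {exc}")
            if progress_callback:
                # Called from the collecting thread, one call per finished file.
                progress_callback(processed, len(paths))
    return all_issues
```

Threads fit this workload when the per-file work is dominated by waiting on LLM or network calls; CPU-bound parsing would benefit more from processes.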

The Multi-Agent Orchestration coordinates specialized agents, each potentially using different LLMs or prompts, to provide comprehensive analysis that leverages the strengths of each agent.

Together, these components create a production-ready code analysis system that mitigates the fundamental limitations of LLMs through intelligent architecture, comprehensive context management, and sophisticated reasoning techniques. The system provides verifiable, actionable findings with appropriate confidence levels and clear recommendations.

This system can be extended in several ways. Additional language support can be added by implementing new AST parsers. New specialized agents can be created for specific domains or technologies. Integration with development tools like IDEs and CI/CD systems can make the analysis part of the development workflow. Fine-tuning of LLMs on project-specific code can improve accuracy for domain-specific patterns.

The key insight is that effective LLM-based code analysis requires not just a powerful language model, but a comprehensive system that provides context, manages limitations, and applies domain knowledge. This tutorial has demonstrated how to build such a system.
