Introduction and Problem Statement
Modern software development environments increasingly require sophisticated code analysis tools that can understand and process multiple programming languages simultaneously. Traditional static analysis tools often operate in isolation, focusing on single languages or specific aspects of code quality. However, contemporary software systems are polyglot in nature, incorporating multiple programming languages, frameworks, and architectural patterns within the same codebase.
The challenge lies in creating a unified analysis system that can leverage the power of Large Language Models while maintaining efficiency, accuracy, and scalability across diverse programming languages including Python, C, C++, Java, C#, Rust, Go, JavaScript, and TypeScript. This system must not only understand syntax and semantics but also capture complex relationships between code artifacts, design patterns, and architectural decisions.
The core problems we address include efficient code representation and chunking strategies, relationship modeling between code entities, context-aware analysis using Retrieval-Augmented Generation, LLM-based reasoning and insight generation, and optimization techniques for large-scale codebases. Each of these problems requires careful consideration of trade-offs between accuracy, performance, and resource utilization.
System Architecture Overview
The proposed LLM-based code analyzer follows a modular architecture designed around six core subsystems. The Language Processing Pipeline handles multi-language parsing and normalization. The Intelligent Chunking Engine implements syntactic and semantic segmentation strategies. The GraphRAG Knowledge Store maintains relationships between code entities. The LLM Analysis Engine performs reasoning and insight generation using Large Language Models. The Context-Aware Analysis Engine manages context optimization and memory usage. Finally, the Query and Retrieval Interface provides user-facing functionality.
This architecture emphasizes separation of concerns while enabling tight integration between components. The system operates on the principle of progressive refinement, where initial syntactic analysis informs semantic understanding, which in turn enables relationship extraction and contextual reasoning. The LLM serves as the central reasoning engine that transforms structured code representations into meaningful insights and actionable recommendations.
Problem 1: Multi-Language Parsing and Normalization
Problem Description
Different programming languages exhibit varying syntactic structures, semantic models, and paradigmatic approaches. Creating a unified representation that preserves language-specific nuances while enabling cross-language analysis presents significant challenges. The system must handle languages with different compilation models, type systems, and execution environments.
Solution Architecture
The Language Processing Pipeline employs a plugin-based architecture where each supported language implements a common interface. This design allows for language-specific optimizations while maintaining system coherence.
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum


class LanguageType(Enum):
    PYTHON = "python"
    C = "c"
    CPP = "cpp"
    JAVA = "java"
    CSHARP = "csharp"
    RUST = "rust"
    GO = "go"
    JAVASCRIPT = "javascript"
    TYPESCRIPT = "typescript"


@dataclass
class CodeEntity:
    """Represents a normalized code entity across all languages."""
    entity_type: str
    name: str
    signature: Optional[str]
    body: str
    start_line: int
    end_line: int
    language: LanguageType
    metadata: Dict[str, Any]
    dependencies: List[str]
    complexity_metrics: Dict[str, float]


class LanguageProcessor(ABC):
    """Abstract base class for language-specific processors."""

    @abstractmethod
    def parse_file(self, file_path: str, content: str) -> List[CodeEntity]:
        """Parse a source file and extract code entities."""
        pass

    @abstractmethod
    def extract_dependencies(self, entity: CodeEntity) -> List[str]:
        """Extract dependencies for a given code entity."""
        pass

    @abstractmethod
    def calculate_complexity(self, entity: CodeEntity) -> Dict[str, float]:
        """Calculate complexity metrics for the entity."""
        pass

    @abstractmethod
    def normalize_syntax(self, code: str) -> str:
        """Normalize language-specific syntax for cross-language analysis."""
        pass
The normalization process addresses several key challenges. First, it standardizes naming conventions across languages, converting between different case styles and identifier patterns. Second, it creates unified representations for common programming constructs such as functions, classes, and modules. Third, it extracts and normalizes type information where available, creating consistent type signatures even for dynamically typed languages.
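As an illustration of the first normalization step, a minimal identifier normalizer can convert the different case styles to one canonical form. The sketch below is illustrative only; the choice of snake_case as the target style and the particular regular expressions are assumptions, not part of the system described here.

```python
import re


def normalize_identifier(name: str) -> str:
    """Convert camelCase / PascalCase / kebab-case identifiers to snake_case."""
    name = name.replace("-", "_")                        # kebab-case -> underscore form
    name = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name)   # split PascalCase word boundaries
    name = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)  # split camelCase joints
    return name.lower()


print(normalize_identifier("parseHTTPResponse"))  # parse_http_response
print(normalize_identifier("CodeEntity"))         # code_entity
```

A cross-language system would apply such a mapping uniformly so that, for example, a Java `getUserName` and a Python `get_user_name` normalize to the same token sequence.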
Implementation Strategy
Each language processor implements sophisticated parsing logic tailored to the specific language characteristics. For example, the Python processor handles dynamic typing and runtime binding, while the C++ processor manages template instantiation and namespace resolution.
import ast


class ParseError(Exception):
    """Raised when a source file cannot be parsed."""


class PythonProcessor(LanguageProcessor):
    """Python-specific code processor with AST analysis."""

    def __init__(self):
        self.ast_parser = ast
        self.complexity_calculator = PythonComplexityCalculator()

    def parse_file(self, file_path: str, content: str) -> List[CodeEntity]:
        """Parse a Python file using AST analysis."""
        try:
            tree = self.ast_parser.parse(content)
            entities = []
            for node in ast.walk(tree):
                if isinstance(node, ast.FunctionDef):
                    entities.append(self._extract_function_entity(node, file_path))
                elif isinstance(node, ast.ClassDef):
                    entities.append(self._extract_class_entity(node, file_path))
                elif isinstance(node, (ast.Import, ast.ImportFrom)):
                    entities.append(self._extract_import_entity(node, file_path))
            return entities
        except SyntaxError as e:
            raise ParseError(f"Python syntax error in {file_path}: {e}") from e

    def _extract_function_entity(self, node: ast.FunctionDef, file_path: str) -> CodeEntity:
        """Extract a function entity with comprehensive metadata."""
        signature = self._build_function_signature(node)
        body = ast.unparse(node) if hasattr(ast, 'unparse') else ast.dump(node)
        dependencies = self._extract_function_dependencies(node)
        complexity = self.complexity_calculator.calculate_function_complexity(node)
        metadata = {
            'decorators': [ast.unparse(dec) for dec in node.decorator_list],
            'arguments': [arg.arg for arg in node.args.args],
            'return_annotation': ast.unparse(node.returns) if node.returns else None,
            'docstring': ast.get_docstring(node),
            'file_path': file_path
        }
        return CodeEntity(
            entity_type='function',
            name=node.name,
            signature=signature,
            body=body,
            start_line=node.lineno,
            end_line=node.end_lineno or node.lineno,
            language=LanguageType.PYTHON,
            metadata=metadata,
            dependencies=dependencies,
            complexity_metrics=complexity
        )
The rationale for this approach centers on maintaining language fidelity while enabling cross-language analysis. By preserving language-specific metadata while creating normalized representations, the system can perform both deep language-specific analysis and broad architectural assessment.
Problem 2: Intelligent Syntactic and Semantic Chunking
Problem Description
Effective code analysis requires intelligent segmentation of source code into meaningful chunks that preserve semantic coherence while fitting within LLM context windows. Traditional line-based or character-based chunking often breaks logical units, leading to degraded analysis quality. The system must balance chunk size optimization with semantic preservation across different programming paradigms.
Solution Architecture
The Intelligent Chunking Engine implements a multi-layered approach combining syntactic analysis, semantic understanding, and context optimization. The system employs three primary chunking strategies that operate in coordination.
from typing import List, Tuple, Set
from dataclasses import dataclass
import networkx as nx


@dataclass
class CodeChunk:
    """Represents a semantically coherent code chunk."""
    chunk_id: str
    content: str
    entities: List[CodeEntity]
    chunk_type: str
    semantic_hash: str
    dependencies: Set[str]
    size_metrics: Dict[str, int]
    context_priority: float


class ChunkingStrategy(ABC):
    """Abstract base class for chunking strategies."""

    @abstractmethod
    def create_chunks(self, entities: List[CodeEntity]) -> List[CodeChunk]:
        """Create chunks from code entities."""
        pass

    @abstractmethod
    def optimize_chunk_size(self, chunks: List[CodeChunk], max_tokens: int) -> List[CodeChunk]:
        """Optimize chunk sizes for LLM processing."""
        pass


class SemanticChunkingEngine:
    """Advanced chunking engine with semantic awareness."""

    def __init__(self, max_chunk_tokens: int = 4000):
        self.max_chunk_tokens = max_chunk_tokens
        self.syntactic_chunker = SyntacticChunker()
        self.semantic_chunker = SemanticChunker()
        self.dependency_analyzer = DependencyAnalyzer()
        self.token_estimator = TokenEstimator()

    def create_optimized_chunks(self, entities: List[CodeEntity]) -> List[CodeChunk]:
        """Create optimized chunks using a multi-strategy approach."""
        # Phase 1: Syntactic chunking based on language constructs
        syntactic_chunks = self.syntactic_chunker.create_chunks(entities)
        # Phase 2: Semantic refinement based on logical cohesion
        semantic_chunks = self.semantic_chunker.refine_chunks(syntactic_chunks)
        # Phase 3: Dependency-aware optimization
        dependency_graph = self.dependency_analyzer.build_dependency_graph(entities)
        optimized_chunks = self._optimize_for_dependencies(semantic_chunks, dependency_graph)
        # Phase 4: Token-based size optimization
        return self._optimize_token_usage(optimized_chunks)

    def _optimize_for_dependencies(self, chunks: List[CodeChunk],
                                   dependency_graph: nx.DiGraph) -> List[CodeChunk]:
        """Optimize chunks based on dependency relationships.

        Assumes the dependency graph is keyed by chunk id, so that mutually
        dependent chunks form strongly connected components.
        """
        optimized_chunks = []
        processed_chunks = set()
        # Merge chunks that form strongly connected components
        for component in nx.strongly_connected_components(dependency_graph):
            if len(component) == 1:
                continue
            component_chunks = [chunk for chunk in chunks
                                if chunk.chunk_id in component and
                                chunk.chunk_id not in processed_chunks]
            if component_chunks:
                merged_chunk = self._merge_related_chunks(component_chunks)
                optimized_chunks.append(merged_chunk)
                processed_chunks.update(chunk.chunk_id for chunk in component_chunks)
        # Add remaining individual chunks
        for chunk in chunks:
            if chunk.chunk_id not in processed_chunks:
                optimized_chunks.append(chunk)
        return optimized_chunks
Syntactic Chunking Strategy
Syntactic chunking operates at the language construct level, identifying natural boundaries such as function definitions, class declarations, and module boundaries. This approach ensures that logical programming units remain intact during analysis.
class SyntacticChunker(ChunkingStrategy):
    """Chunker based on syntactic language constructs."""

    def __init__(self, max_chunk_tokens: int = 4000):
        self.max_chunk_tokens = max_chunk_tokens

    def create_chunks(self, entities: List[CodeEntity]) -> List[CodeChunk]:
        """Create chunks based on syntactic boundaries."""
        chunks = []
        current_chunk_entities = []
        current_size = 0
        # Sort entities by file and line number for coherent chunking
        sorted_entities = sorted(entities, key=lambda e: (e.metadata.get('file_path', ''), e.start_line))
        for entity in sorted_entities:
            entity_size = self._estimate_entity_size(entity)
            # Start a new chunk if adding this entity would exceed the size limit
            if current_size + entity_size > self.max_chunk_tokens and current_chunk_entities:
                chunks.append(self._create_chunk_from_entities(current_chunk_entities, 'syntactic'))
                current_chunk_entities = [entity]
                current_size = entity_size
            else:
                current_chunk_entities.append(entity)
                current_size += entity_size
        # Create final chunk if entities remain
        if current_chunk_entities:
            chunks.append(self._create_chunk_from_entities(current_chunk_entities, 'syntactic'))
        return chunks

    def optimize_chunk_size(self, chunks: List[CodeChunk], max_tokens: int) -> List[CodeChunk]:
        """Chunks are already size-bounded during creation; pass them through."""
        return chunks

    def _estimate_entity_size(self, entity: CodeEntity) -> int:
        """Estimate the token count for a code entity."""
        # Rough estimation: four characters per token on average
        content_tokens = len(entity.body) // 4
        metadata_tokens = sum(len(str(v)) for v in entity.metadata.values()) // 4
        return content_tokens + metadata_tokens + 50  # Buffer for structure
Semantic Chunking Strategy
Semantic chunking builds upon syntactic analysis by considering logical relationships between code entities. This strategy groups related functions, classes, and modules that work together to implement specific functionality.
class SemanticChunker:
    """Advanced semantic chunking with relationship analysis."""

    def __init__(self):
        self.similarity_calculator = CodeSimilarityCalculator()
        self.cohesion_analyzer = CohesionAnalyzer()

    def refine_chunks(self, syntactic_chunks: List[CodeChunk]) -> List[CodeChunk]:
        """Refine syntactic chunks using semantic analysis."""
        refined_chunks = []
        for chunk in syntactic_chunks:
            if self._should_split_chunk(chunk):
                refined_chunks.extend(self._split_semantically(chunk))
            elif self._can_merge_with_neighbors(chunk, refined_chunks):
                last_chunk = refined_chunks.pop()
                refined_chunks.append(self._merge_chunks(last_chunk, chunk))
            else:
                refined_chunks.append(chunk)
        return refined_chunks

    def _should_split_chunk(self, chunk: CodeChunk) -> bool:
        """Determine whether a chunk should be split based on semantic analysis."""
        if len(chunk.entities) < 3:
            return False
        # Calculate semantic cohesion within the chunk
        cohesion_score = self.cohesion_analyzer.calculate_cohesion(chunk.entities)
        # Split if cohesion is low and the chunk is large
        return cohesion_score < 0.3 and chunk.size_metrics['tokens'] > 2000

    def _split_semantically(self, chunk: CodeChunk) -> List[CodeChunk]:
        """Split a chunk along semantic boundaries."""
        entity_groups = self._cluster_entities_by_semantics(chunk.entities)
        sub_chunks = []
        for group in entity_groups:
            if group:  # Skip empty groups
                sub_chunks.append(self._create_chunk_from_entities(group, 'semantic'))
        return sub_chunks
The semantic chunking strategy employs clustering algorithms to group related entities based on shared functionality, naming patterns, and interaction frequency. This approach significantly improves the quality of LLM analysis by ensuring that related code elements are processed together.
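The clustering step itself (`_cluster_entities_by_semantics`) is not shown above. As a rough sketch of one ingredient, entities can be grouped greedily by token overlap in their names; the Jaccard threshold and the single-pass strategy here are illustrative assumptions, and the real engine would also weigh embeddings and interaction frequency as described.

```python
import re


def name_tokens(name: str) -> set:
    """Split an identifier into lowercase word tokens."""
    parts = re.split(r"[_\-]|(?<=[a-z0-9])(?=[A-Z])", name)
    return {p.lower() for p in parts if p}


def cluster_by_name(names, threshold: float = 0.3):
    """Greedy single-pass clustering on Jaccard similarity of name tokens."""
    clusters = []
    for name in names:
        toks = name_tokens(name)
        for cluster in clusters:
            rep = name_tokens(cluster[0])  # compare against the cluster's first member
            jaccard = len(toks & rep) / len(toks | rep)
            if jaccard >= threshold:
                cluster.append(name)
                break
        else:
            clusters.append([name])
    return clusters


groups = cluster_by_name(["parse_file", "parse_header", "render_page", "render_footer"])
print(groups)  # [['parse_file', 'parse_header'], ['render_page', 'render_footer']]
```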
Problem 3: GraphRAG Implementation for Code Relationships
Problem Description
Understanding code requires modeling complex relationships between entities across files, modules, and even programming languages. Traditional vector-based RAG systems lose important structural information about how code components interact. The system needs to capture and leverage these relationships for more accurate analysis and reasoning.
Solution Architecture
The GraphRAG Knowledge Store implements a sophisticated graph-based approach to storing and retrieving code relationships. The system models entities as nodes and relationships as edges, enabling complex queries that consider both semantic similarity and structural connectivity.
import networkx as nx
from collections import deque
from typing import Any, Dict, List, Set, Tuple, Optional
from dataclasses import dataclass, field
from enum import Enum


class RelationshipType(Enum):
    CALLS = "calls"
    INHERITS = "inherits"
    IMPLEMENTS = "implements"
    IMPORTS = "imports"
    DEPENDS_ON = "depends_on"
    CONTAINS = "contains"
    OVERRIDES = "overrides"
    INSTANTIATES = "instantiates"
    REFERENCES = "references"


@dataclass
class CodeRelationship:
    """Represents a relationship between code entities."""
    source_entity_id: str
    target_entity_id: str
    relationship_type: RelationshipType
    strength: float
    context: Dict[str, Any]
    metadata: Dict[str, Any] = field(default_factory=dict)


class GraphRAGStore:
    """Graph-based RAG store for code relationships."""

    def __init__(self):
        self.entity_graph = nx.MultiDiGraph()
        self.entity_embeddings = {}
        self.relationship_weights = {}
        self.semantic_clusters = {}
        self.embedding_model = CodeEmbeddingModel()

    def add_entity(self, entity: CodeEntity) -> None:
        """Add a code entity to the graph store."""
        entity_id = self._generate_entity_id(entity)
        # Add node with comprehensive attributes
        self.entity_graph.add_node(
            entity_id,
            entity_type=entity.entity_type,
            name=entity.name,
            language=entity.language.value,
            file_path=entity.metadata.get('file_path', ''),
            complexity=entity.complexity_metrics,
            metadata=entity.metadata
        )
        # Generate and store embeddings
        embedding = self.embedding_model.encode_entity(entity)
        self.entity_embeddings[entity_id] = embedding
        # Update semantic clusters
        self._update_semantic_clusters(entity_id, embedding)

    def add_relationship(self, relationship: CodeRelationship) -> None:
        """Add a relationship between code entities."""
        self.entity_graph.add_edge(
            relationship.source_entity_id,
            relationship.target_entity_id,
            relationship_type=relationship.relationship_type.value,
            strength=relationship.strength,
            context=relationship.context,
            metadata=relationship.metadata
        )
        # Update relationship weights for graph algorithms
        edge_key = (relationship.source_entity_id, relationship.target_entity_id)
        self.relationship_weights[edge_key] = relationship.strength

    def find_related_entities(self, entity_id: str, max_depth: int = 3,
                              relationship_types: Optional[List[RelationshipType]] = None) -> List[str]:
        """Find entities related to the given entity within the specified depth (BFS)."""
        if entity_id not in self.entity_graph:
            return []
        related_entities = set()
        queue = deque([(entity_id, 0)])
        visited = {entity_id}
        while queue:
            current_entity, depth = queue.popleft()
            if depth >= max_depth:
                continue
            # Explore outgoing relationships
            for neighbor in self.entity_graph.successors(current_entity):
                edge_data = self.entity_graph.get_edge_data(current_entity, neighbor)
                # Filter by relationship types if specified
                if relationship_types:
                    edge_types = [RelationshipType(data.get('relationship_type', ''))
                                  for data in edge_data.values()]
                    if not any(rt in relationship_types for rt in edge_types):
                        continue
                if neighbor not in visited:
                    visited.add(neighbor)
                    related_entities.add(neighbor)
                    queue.append((neighbor, depth + 1))
            # Explore incoming relationships
            for predecessor in self.entity_graph.predecessors(current_entity):
                edge_data = self.entity_graph.get_edge_data(predecessor, current_entity)
                if relationship_types:
                    edge_types = [RelationshipType(data.get('relationship_type', ''))
                                  for data in edge_data.values()]
                    if not any(rt in relationship_types for rt in edge_types):
                        continue
                if predecessor not in visited:
                    visited.add(predecessor)
                    related_entities.add(predecessor)
                    queue.append((predecessor, depth + 1))
        return list(related_entities)
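The store calls `_generate_entity_id` without defining it. One stable scheme (an assumption for illustration, not the system's actual id format) hashes the language, file path, name, and start line, so the same entity always maps to the same node id:

```python
import hashlib


def generate_entity_id(language: str, file_path: str, name: str, start_line: int) -> str:
    """Derive a short, stable identifier for a code entity."""
    key = f"{language}:{file_path}:{name}:{start_line}"
    return hashlib.sha1(key.encode("utf-8")).hexdigest()[:12]  # 12 hex chars is ample here


eid = generate_entity_id("python", "src/app.py", "main", 10)
print(len(eid))  # 12
```

Including the start line disambiguates same-named entities (overloads, nested definitions) within one file, at the cost of ids changing when code moves.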
Relationship Extraction and Analysis
The system implements sophisticated relationship extraction that goes beyond simple syntactic analysis to understand semantic connections between code entities.
class RelationshipExtractor:
    """Extracts relationships between code entities."""

    def __init__(self):
        self.call_graph_analyzer = CallGraphAnalyzer()
        self.inheritance_analyzer = InheritanceAnalyzer()
        self.dependency_analyzer = DependencyAnalyzer()
        self.semantic_analyzer = SemanticRelationshipAnalyzer()

    def extract_relationships(self, entities: List[CodeEntity]) -> List[CodeRelationship]:
        """Extract all types of relationships between entities."""
        relationships = []
        # Extract syntactic relationships
        relationships.extend(self._extract_call_relationships(entities))
        relationships.extend(self._extract_inheritance_relationships(entities))
        relationships.extend(self._extract_dependency_relationships(entities))
        # Extract semantic relationships
        relationships.extend(self._extract_semantic_relationships(entities))
        # Calculate relationship strengths
        self._calculate_relationship_strengths(relationships, entities)
        return relationships

    def _extract_call_relationships(self, entities: List[CodeEntity]) -> List[CodeRelationship]:
        """Extract function/method call relationships."""
        call_relationships = []
        entity_lookup = {entity.name: entity for entity in entities}
        for entity in entities:
            if entity.entity_type in ['function', 'method']:
                called_functions = self.call_graph_analyzer.extract_function_calls(entity)
                for called_function in called_functions:
                    if called_function in entity_lookup:
                        target_entity = entity_lookup[called_function]
                        relationship = CodeRelationship(
                            source_entity_id=self._generate_entity_id(entity),
                            target_entity_id=self._generate_entity_id(target_entity),
                            relationship_type=RelationshipType.CALLS,
                            strength=1.0,  # Refined later by _calculate_relationship_strengths
                            context={
                                'call_frequency': self._count_call_frequency(entity, called_function),
                                'call_context': self._extract_call_context(entity, called_function)
                            }
                        )
                        call_relationships.append(relationship)
        return call_relationships

    def _extract_semantic_relationships(self, entities: List[CodeEntity]) -> List[CodeRelationship]:
        """Extract semantic relationships based on naming and functionality."""
        semantic_relationships = []
        for i, entity1 in enumerate(entities):
            for entity2 in entities[i + 1:]:
                similarity_score = self.semantic_analyzer.calculate_semantic_similarity(entity1, entity2)
                if similarity_score > 0.7:  # High semantic similarity threshold
                    relationship = CodeRelationship(
                        source_entity_id=self._generate_entity_id(entity1),
                        target_entity_id=self._generate_entity_id(entity2),
                        relationship_type=RelationshipType.REFERENCES,
                        strength=similarity_score,
                        context={
                            'similarity_type': 'semantic',
                            'similarity_score': similarity_score,
                            'common_concepts': self.semantic_analyzer.extract_common_concepts(entity1, entity2)
                        }
                    )
                    semantic_relationships.append(relationship)
        return semantic_relationships
Graph-Based Retrieval and Reasoning
The GraphRAG system enables sophisticated retrieval that considers both semantic similarity and structural relationships. This approach provides more contextually relevant information for LLM analysis.
class GraphRAGRetriever:
    """Retrieves relevant code context using graph-based reasoning."""

    def __init__(self, graph_store: GraphRAGStore):
        self.graph_store = graph_store
        self.ranking_algorithm = GraphRankingAlgorithm()
        self.context_optimizer = ContextOptimizer()

    def retrieve_context(self, query: str, max_entities: int = 20,
                         context_strategy: str = 'hybrid') -> List[CodeEntity]:
        """Retrieve relevant code entities for a given query."""
        # Phase 1: Semantic similarity search
        semantic_candidates = self._semantic_search(query, max_entities * 2)
        # Phase 2: Graph-based expansion
        if context_strategy in ['graph', 'hybrid']:
            graph_candidates = self._expand_with_graph_context(semantic_candidates)
        else:
            graph_candidates = semantic_candidates
        # Phase 3: Ranking and selection
        ranked_entities = self.ranking_algorithm.rank_entities(
            graph_candidates, query, self.graph_store.entity_graph
        )
        # Phase 4: Context optimization
        return self.context_optimizer.optimize_context(
            ranked_entities[:max_entities], query
        )

    def _semantic_search(self, query: str, max_results: int) -> List[str]:
        """Perform semantic similarity search over stored embeddings."""
        query_embedding = self.graph_store.embedding_model.encode_query(query)
        similarities = []
        for entity_id, entity_embedding in self.graph_store.entity_embeddings.items():
            similarity = self._calculate_cosine_similarity(query_embedding, entity_embedding)
            similarities.append((entity_id, similarity))
        # Sort by similarity and return the top results
        similarities.sort(key=lambda x: x[1], reverse=True)
        return [entity_id for entity_id, _ in similarities[:max_results]]

    def _expand_with_graph_context(self, seed_entities: List[str]) -> List[str]:
        """Expand seed entities using graph relationships."""
        expanded_entities = set(seed_entities)
        for entity_id in seed_entities:
            # Find strongly connected entities
            related_entities = self.graph_store.find_related_entities(
                entity_id, max_depth=2,
                relationship_types=[RelationshipType.CALLS, RelationshipType.DEPENDS_ON]
            )
            # Keep only high-strength relationships
            for related_id in related_entities:
                edge_data = self.graph_store.entity_graph.get_edge_data(entity_id, related_id)
                if edge_data and any(data.get('strength', 0) > 0.5 for data in edge_data.values()):
                    expanded_entities.add(related_id)
        return list(expanded_entities)
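The retriever calls `_calculate_cosine_similarity`, which is left undefined above. A dependency-free version over plain float sequences might look like this (a sketch; production code would typically vectorize this with NumPy):

```python
import math


def cosine_similarity(a, b) -> float:
    """Cosine similarity between two equal-length numeric vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # A zero vector has no direction to compare
    return dot / (norm_a * norm_b)


print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # 1.0
```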
The GraphRAG implementation provides significant advantages over traditional vector-based approaches by preserving and leveraging the structural relationships inherent in code. This enables more accurate context retrieval and supports complex reasoning about code architecture and design patterns.
Problem 4: LLM-Based Analysis and Reasoning Engine
Problem Description
The core challenge lies in effectively utilizing Large Language Models to perform sophisticated code analysis that goes beyond simple pattern matching. The system must transform structured code representations into meaningful insights, recommendations, and architectural assessments. This requires careful prompt engineering, response parsing, and integration with the GraphRAG context to ensure accurate and actionable analysis results.
Solution Architecture
The LLM Analysis Engine serves as the central reasoning component that processes code chunks with enriched context to generate comprehensive analysis results. The engine implements specialized prompt templates for different analysis types and sophisticated response parsing to extract structured insights.
import asyncio
import json
import time
import networkx as nx
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum


class AnalysisType(Enum):
    CODE_QUALITY = "code_quality"
    ARCHITECTURE = "architecture_analysis"
    SECURITY = "security_analysis"
    PERFORMANCE = "performance_analysis"
    MAINTAINABILITY = "maintainability_analysis"
    GENERAL = "general"


@dataclass
class AnalysisContext:
    """Represents analysis context with memory optimization."""
    primary_entities: List[CodeEntity]
    supporting_entities: List[CodeEntity]
    relationship_graph: nx.Graph
    analysis_history: List[Dict[str, Any]]
    context_hash: str
    priority_scores: Dict[str, float]
    token_budget: int
    optimization_metadata: Dict[str, Any]


class LLMAnalysisEngine:
    """Core LLM-based analysis engine with comprehensive reasoning capabilities."""

    def __init__(self, model_name: str = "gpt-4"):
        self.llm_client = self._initialize_llm_client(model_name)
        self.prompt_templates = PromptTemplateManager()
        self.response_parser = ResponseParser()
        self.analysis_cache = AnalysisCache()
        self.quality_assessor = AnalysisQualityAssessor()

    async def analyze_with_context(self, context: AnalysisContext,
                                   config: Dict[str, Any]) -> Dict[str, Any]:
        """Perform comprehensive LLM-based analysis with optimized context."""
        analysis_type = AnalysisType(config.get('analysis_type', 'general'))
        # Check the cache for an existing analysis
        cache_key = self._generate_cache_key(context, analysis_type)
        cached_result = await self.analysis_cache.get_cached_analysis(cache_key)
        if cached_result:
            return cached_result
        # Build a specialized analysis prompt
        prompt = self.prompt_templates.build_analysis_prompt(
            context, analysis_type, config
        )
        # Execute LLM analysis with error handling and retries
        llm_response = await self._execute_llm_analysis(
            prompt, config, max_retries=3
        )
        # Parse and structure the response
        structured_result = self.response_parser.parse_analysis_response(
            llm_response, analysis_type
        )
        # Assess analysis quality
        quality_score = self.quality_assessor.assess_analysis_quality(
            structured_result, context, analysis_type
        )
        structured_result['quality_metadata'] = {
            'quality_score': quality_score,
            'analysis_timestamp': time.time(),
            'model_used': self.llm_client.model_name,
            'context_size': len(context.primary_entities) + len(context.supporting_entities)
        }
        # Cache the result for future use
        await self.analysis_cache.cache_analysis(cache_key, structured_result)
        return structured_result

    async def analyze_code_chunk(self, chunk: CodeChunk,
                                 analysis_queries: List[str]) -> Dict[str, Any]:
        """Analyze a specific code chunk with multiple targeted queries."""
        results = {}
        for query in analysis_queries:
            # Build a query-specific prompt
            prompt = self.prompt_templates.build_chunk_analysis_prompt(chunk, query)
            # Execute the analysis
            response = await self.llm_client.generate_response(
                prompt,
                max_tokens=1500,
                temperature=0.1
            )
            # Parse the response
            results[query] = self.response_parser.parse_chunk_response(response, query)
        return results

    async def _execute_llm_analysis(self, prompt: str, config: Dict[str, Any],
                                    max_retries: int = 3) -> str:
        """Execute LLM analysis with error handling and retries."""
        for attempt in range(max_retries):
            try:
                response = await self.llm_client.generate_response(
                    prompt,
                    max_tokens=config.get('max_response_tokens', 2000),
                    temperature=config.get('temperature', 0.1),
                    top_p=config.get('top_p', 0.9)
                )
                # Validate response quality
                if self._validate_response_quality(response):
                    return response
                print(f"Response quality validation failed, attempt {attempt + 1}")
            except Exception as e:
                print(f"LLM analysis attempt {attempt + 1} failed: {str(e)}")
                if attempt == max_retries - 1:
                    raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
        raise RuntimeError("LLM analysis failed after all retry attempts")

    def _validate_response_quality(self, response: str) -> bool:
        """Validate the quality and completeness of an LLM response."""
        if len(response.strip()) < 100:
            return False
        # Check for common error patterns
        error_indicators = [
            "I cannot", "I'm unable", "I don't have access",
            "Error:", "Exception:", "Failed to"
        ]
        response_lower = response.lower()
        return not any(indicator.lower() in response_lower for indicator in error_indicators)
Prompt Template Management
The prompt template system ensures consistent and effective communication with the LLM across different analysis types. Each template is carefully crafted to elicit specific types of insights while maintaining clarity and focus.
class PromptTemplateManager:
    """Manages specialized LLM prompts for different analysis types."""

    def build_analysis_prompt(self, context: AnalysisContext,
                              analysis_type: AnalysisType,
                              config: Dict[str, Any]) -> str:
        """Build a comprehensive analysis prompt based on type and context."""
        if analysis_type == AnalysisType.CODE_QUALITY:
            return self._build_code_quality_prompt(context, config)
        elif analysis_type == AnalysisType.ARCHITECTURE:
            return self._build_architecture_prompt(context, config)
        elif analysis_type == AnalysisType.SECURITY:
            return self._build_security_prompt(context, config)
        elif analysis_type == AnalysisType.PERFORMANCE:
            return self._build_performance_prompt(context, config)
        elif analysis_type == AnalysisType.MAINTAINABILITY:
            return self._build_maintainability_prompt(context, config)
        else:
            return self._build_general_prompt(context, config)

    def _build_code_quality_prompt(self, context: AnalysisContext,
                                   config: Dict[str, Any]) -> str:
        """Build a comprehensive code quality analysis prompt."""
        primary_code = self._format_entities_for_prompt(context.primary_entities)
        supporting_code = self._format_entities_for_prompt(context.supporting_entities)
        relationships = self._format_relationships_for_prompt(context.relationship_graph)
        prompt = f"""You are an expert software engineer and code quality analyst. Analyze the following code for quality issues, best practices adherence, and improvement opportunities.

PRIMARY CODE TO ANALYZE:
{primary_code}

SUPPORTING CONTEXT:
{supporting_code}

RELATIONSHIPS AND DEPENDENCIES:
{relationships}

ANALYSIS REQUIREMENTS:

1. Code Quality Assessment:
   - Identify code smells and anti-patterns with specific examples
   - Evaluate adherence to clean code principles (SRP, DRY, KISS, etc.)
   - Assess naming conventions and code readability
   - Check for proper error handling and edge case management
   - Evaluate documentation quality and completeness

2. Design Pattern Analysis:
   - Identify design patterns currently in use
   - Suggest appropriate patterns where missing or misapplied
   - Evaluate pattern implementation quality and correctness
   - Assess pattern consistency across the codebase

3. Best Practices Compliance:
   - Language-specific best practices adherence
   - Framework and library usage patterns
   - Testing approach and coverage considerations
   - Performance implications of the current implementation

4. Maintainability Assessment:
   - Evaluate code complexity and cognitive load
   - Assess modularity, coupling, and cohesion
   - Identify code duplication and refactoring opportunities
   - Evaluate testability and debugging ease

5. Specific Recommendations:
   - Provide concrete, actionable improvement suggestions
   - Prioritize recommendations by impact and effort required
   - Include code examples demonstrating improvements
   - Suggest refactoring strategies where appropriate

Please provide a structured analysis with:
- Overall quality score (1-10)
- Critical issues requiring immediate attention
- Medium priority improvements
- Long-term enhancement suggestions
- Specific code examples with before/after comparisons where helpful

Format your response as structured JSON with clear sections for each analysis area."""
        return prompt

    def _build_architecture_prompt(self, context: AnalysisContext,
                                   config: Dict[str, Any]) -> str:
        """Build an architectural analysis prompt focusing on system design."""
        relationships = self._format_relationships_for_prompt(context.relationship_graph)
        entities = self._format_entities_for_prompt(context.primary_entities)
        prompt = f"""You are a senior software architect with expertise in system design and architectural patterns. Analyze the following codebase structure and relationships for architectural quality and design decisions.

CODE ENTITIES AND STRUCTURE:
{entities}

COMPONENT RELATIONSHIPS:
{relationships}

ARCHITECTURAL ANALYSIS REQUIREMENTS:

1. Architecture Pattern Identification:
   - Identify architectural patterns currently implemented (MVC, MVP, MVVM, Layered, etc.)
   - Evaluate pattern implementation quality and consistency
   - Assess pattern appropriateness for the problem domain
   - Suggest alternative patterns where beneficial

2. Component and Module Analysis:
   - Analyze component boundaries and responsibilities
   - Evaluate single responsibility principle adherence
- Assess component coupling and cohesion levels
- Identify potential architectural violations or inconsistencies
3. Dependency Management:
- Analyze dependency directions and identify circular dependencies
- Evaluate dependency injection usage and patterns
- Assess abstraction levels and interface design
- Suggest dependency structure improvements
4. Scalability and Extensibility Assessment:
- Evaluate architectural scalability potential
- Identify potential bottlenecks and performance constraints
- Assess extensibility and modification ease
- Suggest improvements for future growth
5. Cross-Cutting Concerns:
- Evaluate handling of logging, error management, security
- Assess configuration management approach
- Analyze data flow and state management
- Review separation of concerns implementation
6. Technical Debt Assessment:
- Identify architectural technical debt
- Assess impact of current design decisions
- Prioritize architectural improvements
- Suggest migration strategies for major changes
Provide detailed architectural insights with:
- Architecture quality score (1-10)
- Current pattern identification and assessment
- Critical architectural issues
- Improvement recommendations with implementation strategies
- Long-term architectural evolution suggestions
Format as structured JSON with clear architectural assessment sections."""
return prompt
def _build_security_prompt(self, context: AnalysisContext,
config: Dict[str, Any]) -> str:
"""Build security-focused analysis prompt."""
primary_code = self._format_entities_for_prompt(context.primary_entities)
prompt = f"""You are a cybersecurity expert specializing in secure code analysis. Examine the following code for security vulnerabilities, weaknesses, and compliance with security best practices.
CODE TO ANALYZE:
{primary_code}
SECURITY ANALYSIS REQUIREMENTS:
1. Vulnerability Assessment:
- Identify potential security vulnerabilities (OWASP Top 10)
- Check for injection flaws (SQL, XSS, command injection)
- Assess authentication and authorization mechanisms
- Evaluate input validation and sanitization
2. Data Security Analysis:
- Assess data encryption and protection mechanisms
- Evaluate sensitive data handling practices
- Check for data leakage potential
- Analyze data storage security
3. Access Control Evaluation:
- Review authentication implementation
- Assess authorization and permission systems
- Evaluate session management security
- Check for privilege escalation risks
4. Security Best Practices Compliance:
- Evaluate secure coding practices adherence
- Assess error handling security implications
- Review logging and monitoring for security events
- Check for hardcoded secrets or credentials
Provide security assessment with:
- Security risk score (1-10)
- Critical vulnerabilities requiring immediate attention
- Medium and low priority security issues
- Specific remediation recommendations
- Security best practices implementation suggestions
Format as structured JSON with vulnerability details and remediation steps."""
return prompt
def _format_entities_for_prompt(self, entities: List[CodeEntity]) -> str:
"""Format code entities for inclusion in LLM prompts."""
if not entities:
return "No entities provided."
formatted_entities = []
for entity in entities:
entity_info = f"""
Entity Type: {entity.entity_type}
Name: {entity.name}
Language: {entity.language.value}
File: {entity.metadata.get('file_path', 'Unknown')}
Lines: {entity.start_line}-{entity.end_line}
Signature: {entity.signature or 'N/A'}
Code:
{entity.body}
Complexity Metrics: {json.dumps(entity.complexity_metrics, indent=2)}
Dependencies: {', '.join(entity.dependencies) if entity.dependencies else 'None'}
---
"""
formatted_entities.append(entity_info)
return '\n'.join(formatted_entities)
def _format_relationships_for_prompt(self, relationship_graph: nx.Graph) -> str:
"""Format relationship graph for inclusion in prompts."""
if not relationship_graph or relationship_graph.number_of_edges() == 0:
return "No relationships identified."
relationships = []
for source, target, data in relationship_graph.edges(data=True):
rel_type = data.get('relationship_type', 'unknown')
strength = data.get('strength', 0.0)
relationships.append(f"{source} --[{rel_type}:{strength:.2f}]--> {target}")
return '\n'.join(relationships)
def build_chunk_analysis_prompt(self, chunk: CodeChunk, query: str) -> str:
"""Build prompt for analyzing a specific code chunk with a targeted query."""
chunk_content = chunk.content
chunk_metadata = {
'entity_count': len(chunk.entities),
'chunk_type': chunk.chunk_type,
'dependencies': list(chunk.dependencies),
'priority': chunk.context_priority
}
prompt = f"""You are an expert code analyst. Analyze the following code chunk and answer the specific query provided.
QUERY: {query}
CODE CHUNK TO ANALYZE:
{chunk_content}
CHUNK METADATA:
{json.dumps(chunk_metadata, indent=2)}
Please provide a detailed, specific answer to the query based on your analysis of the code chunk. Include:
- Direct observations from the code
- Specific examples and line references where relevant
- Actionable recommendations if applicable
- Any potential issues or improvements related to the query
Keep your response focused on the specific query while being thorough in your analysis."""
return prompt
Response Parsing and Structuring
The response parser extracts structured information from LLM responses, ensuring consistent and actionable output across different analysis types.
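Before dispatching to type-specific text parsers, it helps to recover JSON robustly: models frequently wrap JSON in markdown code fences or surround it with prose, so a bare `json.loads` on the raw response often fails. The helper below is a minimal sketch of that recovery step (the function name and fallback order are illustrative, not part of the original design):

```python
import json
import re
from typing import Any, Dict, Optional

_FENCE = "`" * 3  # literal markdown code-fence delimiter
_FENCED_BLOCK = re.compile(_FENCE + r"(?:json)?\s*(.*?)" + _FENCE, re.DOTALL)

def extract_json_payload(response: str) -> Optional[Dict[str, Any]]:
    """Best-effort extraction of a JSON object from an LLM response.

    Tries the raw text, then any fenced code blocks, then the widest
    brace-delimited span. Returns None if nothing parses as a dict.
    """
    candidates = [response.strip()]
    candidates.extend(block.strip() for block in _FENCED_BLOCK.findall(response))
    start, end = response.find("{"), response.rfind("}")
    if start != -1 and end > start:
        candidates.append(response[start:end + 1])
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
            if isinstance(parsed, dict):
                return parsed
        except json.JSONDecodeError:
            continue
    return None
```

Ordering the candidates from strictest to loosest means well-formed responses take the fast path, while the brace-span fallback still salvages JSON embedded mid-paragraph.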
class ResponseParser:
"""Parses and structures LLM analysis responses."""
def parse_analysis_response(self, response: str,
analysis_type: AnalysisType) -> Dict[str, Any]:
"""Parse LLM response into structured analysis results."""
try:
# Attempt to parse as JSON first
if response.strip().startswith('{'):
return json.loads(response)
except json.JSONDecodeError:
pass
# Fallback to text parsing based on analysis type
if analysis_type == AnalysisType.CODE_QUALITY:
return self._parse_code_quality_response(response)
elif analysis_type == AnalysisType.ARCHITECTURE:
return self._parse_architecture_response(response)
elif analysis_type == AnalysisType.SECURITY:
return self._parse_security_response(response)
else:
return self._parse_general_response(response)
def _parse_code_quality_response(self, response: str) -> Dict[str, Any]:
"""Parse code quality analysis response."""
parsed_result = {
'analysis_type': 'code_quality',
'overall_score': self._extract_score(response),
'critical_issues': self._extract_issues(response, 'critical'),
'medium_issues': self._extract_issues(response, 'medium'),
'recommendations': self._extract_recommendations(response),
'code_smells': self._extract_code_smells(response),
'best_practices': self._extract_best_practices_assessment(response),
'maintainability_score': self._extract_maintainability_score(response),
'raw_response': response
}
return parsed_result
def _parse_architecture_response(self, response: str) -> Dict[str, Any]:
"""Parse architectural analysis response."""
parsed_result = {
'analysis_type': 'architecture',
'architecture_score': self._extract_score(response),
'patterns_identified': self._extract_patterns(response),
'architectural_issues': self._extract_architectural_issues(response),
'dependency_analysis': self._extract_dependency_analysis(response),
'scalability_assessment': self._extract_scalability_assessment(response),
'improvement_suggestions': self._extract_architectural_improvements(response),
'technical_debt': self._extract_technical_debt(response),
'raw_response': response
}
return parsed_result
def _parse_security_response(self, response: str) -> Dict[str, Any]:
"""Parse security analysis response."""
parsed_result = {
'analysis_type': 'security',
'security_score': self._extract_score(response),
'vulnerabilities': self._extract_vulnerabilities(response),
'security_issues': self._extract_security_issues(response),
'compliance_assessment': self._extract_compliance_assessment(response),
'remediation_steps': self._extract_remediation_steps(response),
'risk_assessment': self._extract_risk_assessment(response),
'raw_response': response
}
return parsed_result
def parse_chunk_response(self, response: str, query: str) -> Dict[str, Any]:
"""Parse response for chunk-specific analysis."""
return {
'query': query,
'response': response,
'key_findings': self._extract_key_findings(response),
'recommendations': self._extract_recommendations(response),
'code_references': self._extract_code_references(response),
'severity': self._assess_finding_severity(response)
}
def _extract_score(self, response: str) -> Optional[float]:
"""Extract numerical score from response."""
import re
# Look for score patterns like "score: 7/10", "7.5/10", "Score: 8"
score_patterns = [
r'score[:\s]+(\d+(?:\.\d+)?)/10',
r'score[:\s]+(\d+(?:\.\d+)?)',
r'(\d+(?:\.\d+)?)/10',
r'quality[:\s]+(\d+(?:\.\d+)?)'
]
for pattern in score_patterns:
match = re.search(pattern, response.lower())
if match:
try:
score = float(match.group(1))
return min(score, 10.0) # Cap at 10
except ValueError:
continue
return None
def _extract_issues(self, response: str, severity: str) -> List[Dict[str, Any]]:
"""Extract issues of specific severity from response."""
issues = []
# Look for sections containing the severity level
import re
terminators = '|'.join(['medium', 'low', 'recommendations', 'suggestions'])
section_pattern = rf'{severity}[^:]*:?(.*?)(?={terminators}|$)'
match = re.search(section_pattern, response.lower(), re.DOTALL | re.IGNORECASE)
if match:
section_text = match.group(1)
# Extract individual issues (assuming bullet points or numbered lists)
issue_patterns = [
r'[-*•]\s*(.+?)(?=[-*•]|$)',
r'\d+\.\s*(.+?)(?=\d+\.|$)'
]
for pattern in issue_patterns:
issue_matches = re.findall(pattern, section_text, re.DOTALL)
for issue_text in issue_matches:
if issue_text.strip():
issues.append({
'description': issue_text.strip(),
'severity': severity,
'category': self._categorize_issue(issue_text)
})
return issues
def _extract_recommendations(self, response: str) -> List[str]:
"""Extract recommendations from response."""
recommendations = []
# Look for recommendation sections
import re
rec_patterns = [
r'recommendations?[:\s]+(.*?)(?=\n\n|\n[A-Z]|$)',
r'suggestions?[:\s]+(.*?)(?=\n\n|\n[A-Z]|$)',
r'improvements?[:\s]+(.*?)(?=\n\n|\n[A-Z]|$)'
]
for pattern in rec_patterns:
matches = re.findall(pattern, response, re.DOTALL | re.IGNORECASE)
for match in matches:
# Extract individual recommendations
rec_items = re.findall(r'[-*•]\s*(.+?)(?=[-*•]|$)', match, re.DOTALL)
recommendations.extend([item.strip() for item in rec_items if item.strip()])
return recommendations
def _categorize_issue(self, issue_text: str) -> str:
"""Categorize an issue based on its content."""
issue_lower = issue_text.lower()
if any(keyword in issue_lower for keyword in ['security', 'vulnerability', 'injection', 'auth']):
return 'security'
elif any(keyword in issue_lower for keyword in ['performance', 'slow', 'optimization', 'memory']):
return 'performance'
elif any(keyword in issue_lower for keyword in ['maintainability', 'complex', 'coupling', 'cohesion']):
return 'maintainability'
elif any(keyword in issue_lower for keyword in ['style', 'naming', 'format', 'convention']):
return 'style'
else:
return 'general'
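The score-extraction regexes above can be exercised in isolation. The standalone version below mirrors `_extract_score`'s pattern list and capping behavior (assuming scores are reported in the 0-10 range the prompts request):

```python
import re
from typing import Optional

SCORE_PATTERNS = [
    r"score[:\s]+(\d+(?:\.\d+)?)/10",   # "score: 7.5/10"
    r"score[:\s]+(\d+(?:\.\d+)?)",      # "Score: 8"
    r"(\d+(?:\.\d+)?)/10",              # bare "6/10"
]

def extract_score(response: str) -> Optional[float]:
    """Return the first score found in the response, capped to 0-10."""
    for pattern in SCORE_PATTERNS:
        match = re.search(pattern, response.lower())
        if match:
            return max(0.0, min(float(match.group(1)), 10.0))
    return None
```

Note that pattern order matters: the stricter `/10` form is tried first so that "score: 7.5/10" yields 7.5 rather than a partial match, and out-of-range values like "12/10" are clamped.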
Analysis Quality Assessment
The quality assessor ensures that LLM-generated analysis meets standards for accuracy, completeness, and actionability.
class AnalysisQualityAssessor:
"""Assesses the quality of LLM-generated analysis results."""
def assess_analysis_quality(self, analysis_result: Dict[str, Any],
context: AnalysisContext,
analysis_type: AnalysisType) -> float:
"""Assess overall quality of analysis result."""
quality_factors = {
'completeness': self._assess_completeness(analysis_result, analysis_type),
'specificity': self._assess_specificity(analysis_result),
'actionability': self._assess_actionability(analysis_result),
'accuracy': self._assess_accuracy(analysis_result, context),
'relevance': self._assess_relevance(analysis_result, context)
}
# Weighted average of quality factors
weights = {
'completeness': 0.25,
'specificity': 0.20,
'actionability': 0.25,
'accuracy': 0.20,
'relevance': 0.10
}
quality_score = sum(
quality_factors[factor] * weights[factor]
for factor in quality_factors
)
return min(quality_score, 1.0)
def _assess_completeness(self, analysis_result: Dict[str, Any],
analysis_type: AnalysisType) -> float:
"""Assess completeness of analysis based on expected sections."""
expected_sections = self._get_expected_sections(analysis_type)
present_sections = set(analysis_result.keys())
coverage = len(present_sections.intersection(expected_sections)) / len(expected_sections)
return coverage
def _assess_specificity(self, analysis_result: Dict[str, Any]) -> float:
"""Assess how specific and detailed the analysis is."""
specificity_score = 0.0
# Check for specific code references
raw_response = analysis_result.get('raw_response', '')
if 'line' in raw_response.lower() or 'function' in raw_response.lower():
specificity_score += 0.3
# Check for concrete examples
if 'example' in raw_response.lower() or 'for instance' in raw_response.lower():
specificity_score += 0.3
# Check for detailed recommendations
recommendations = analysis_result.get('recommendations', [])
if recommendations and len(recommendations) > 2:
specificity_score += 0.4
return min(specificity_score, 1.0)
def _assess_actionability(self, analysis_result: Dict[str, Any]) -> float:
"""Assess how actionable the recommendations are."""
actionability_score = 0.0
recommendations = analysis_result.get('recommendations', [])
if not recommendations:
return 0.0
# Check for action verbs in recommendations
action_verbs = ['refactor', 'implement', 'add', 'remove', 'change', 'update', 'fix']
actionable_count = 0
for rec in recommendations:
if any(verb in rec.lower() for verb in action_verbs):
actionable_count += 1
actionability_score = actionable_count / len(recommendations)
return actionability_score
def _get_expected_sections(self, analysis_type: AnalysisType) -> Set[str]:
"""Get expected sections for different analysis types."""
base_sections = {'recommendations', 'raw_response'}
if analysis_type == AnalysisType.CODE_QUALITY:
return base_sections.union({
'overall_score', 'critical_issues', 'code_smells', 'best_practices'
})
elif analysis_type == AnalysisType.ARCHITECTURE:
return base_sections.union({
'architecture_score', 'patterns_identified', 'architectural_issues'
})
elif analysis_type == AnalysisType.SECURITY:
return base_sections.union({
'security_score', 'vulnerabilities', 'security_issues'
})
else:
return base_sections
The LLM Analysis Engine provides the core reasoning capabilities that transform structured code representations into meaningful insights. By implementing specialized prompts, robust response parsing, and quality assessment, the system ensures that LLM-generated analysis is accurate, actionable, and valuable for developers and architects.
Problem 5: Context-Aware Analysis with Memory Optimization
Problem Description
LLM-based code analysis faces significant challenges related to context window limitations and memory efficiency. Large codebases can easily exceed token limits, and maintaining relevant context across multiple analysis sessions requires sophisticated memory management. The system must balance comprehensive analysis with computational efficiency.
Solution Architecture
The Context-Aware Analysis Engine implements a multi-tiered approach to context management, combining intelligent context selection, hierarchical memory structures, and adaptive optimization strategies.
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
import json
from collections import deque
import hashlib
@dataclass
class AnalysisContext:
"""Represents analysis context with memory optimization."""
primary_entities: List[CodeEntity]
supporting_entities: List[CodeEntity]
relationship_graph: nx.Graph
analysis_history: List[Dict[str, Any]]
context_hash: str
priority_scores: Dict[str, float]
token_budget: int
optimization_metadata: Dict[str, Any]
class ContextMemoryManager:
"""Manages context memory with optimization strategies."""
def __init__(self, max_context_tokens: int = 8000):
self.max_context_tokens = max_context_tokens
self.context_cache = {}
self.access_frequency = {}
self.context_hierarchy = ContextHierarchy()
self.compression_engine = ContextCompressionEngine()
self.relevance_scorer = RelevanceScorer()
def build_analysis_context(self, target_entities: List[CodeEntity],
query: str, analysis_type: str) -> AnalysisContext:
"""Build optimized analysis context for LLM processing."""
# Generate context hash for caching
context_hash = self._generate_context_hash(target_entities, query, analysis_type)
# Check cache first
if context_hash in self.context_cache:
cached_context = self.context_cache[context_hash]
self._update_access_frequency(context_hash)
return cached_context
# Build new context
context = self._build_fresh_context(target_entities, query, analysis_type)
# Cache the context
self._cache_context(context_hash, context)
return context
def _build_fresh_context(self, target_entities: List[CodeEntity],
query: str, analysis_type: str) -> AnalysisContext:
"""Build a fresh analysis context with optimization."""
# Phase 1: Priority scoring
priority_scores = self.relevance_scorer.score_entities(target_entities, query, analysis_type)
# Phase 2: Hierarchical organization
hierarchical_context = self.context_hierarchy.organize_entities(target_entities, priority_scores)
# Phase 3: Token budget allocation
token_allocation = self._allocate_token_budget(hierarchical_context, priority_scores)
# Phase 4: Context compression if the pre-scaling requirement exceeded the budget
# (the allocator has already scaled totals down, so comparing the scaled total
# against max_context_tokens would never trigger compression)
if token_allocation['scaling_applied']:
compressed_context = self.compression_engine.compress_context(
hierarchical_context, token_allocation, self.max_context_tokens
)
else:
compressed_context = hierarchical_context
# Phase 5: Build final context
analysis_context = AnalysisContext(
primary_entities=compressed_context['primary'],
supporting_entities=compressed_context['supporting'],
relationship_graph=compressed_context['relationships'],
analysis_history=[],
context_hash=self._generate_context_hash(target_entities, query, analysis_type),
priority_scores=priority_scores,
token_budget=self.max_context_tokens,
optimization_metadata=compressed_context['metadata']
)
return analysis_context
def _allocate_token_budget(self, hierarchical_context: Dict[str, Any],
priority_scores: Dict[str, float]) -> Dict[str, int]:
"""Allocate token budget based on entity priorities."""
total_available = self.max_context_tokens
# Reserve tokens for system prompts and response
system_overhead = 1000
response_budget = 1500
available_for_context = total_available - system_overhead - response_budget
# Calculate entity token requirements
entity_tokens = {}
total_required = 0
for level, entities in hierarchical_context.items():
level_tokens = 0
for entity in entities:
entity_token_count = self._estimate_entity_tokens(entity)
entity_tokens[entity.name] = entity_token_count
level_tokens += entity_token_count
total_required += level_tokens
# Allocate proportionally if over budget
if total_required > available_for_context:
scaling_factor = available_for_context / total_required
for entity_name in entity_tokens:
entity_tokens[entity_name] = int(entity_tokens[entity_name] * scaling_factor)
return {
'entity_tokens': entity_tokens,
'total_tokens': sum(entity_tokens.values()),
'available_budget': available_for_context,
'scaling_applied': total_required > available_for_context
}
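The budget allocator calls `_estimate_entity_tokens`, which is not shown above. A reasonable sketch, assuming the common heuristic of roughly four characters per token for English text and code, looks like the following (the helper names and the fixed metadata overhead are illustrative assumptions; a production system would use the target model's actual tokenizer):

```python
def estimate_tokens(text: str, chars_per_token: float = 4.0) -> int:
    """Rough token estimate: ~4 characters per token.

    Cheap and close enough for budget allocation; a real implementation
    would count tokens with the target model's tokenizer.
    """
    return max(1, int(len(text) / chars_per_token))

def estimate_entity_tokens(signature: str, body: str, metadata_overhead: int = 50) -> int:
    """Estimate tokens needed to render one entity into a prompt,
    including a fixed allowance for the labels, separators, and metrics
    added by the prompt formatter."""
    return estimate_tokens(signature) + estimate_tokens(body) + metadata_overhead
```

Because this is only an estimate, the allocator's proportional scaling and the downstream compression phase absorb the error rather than relying on exact counts.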
Hierarchical Context Organization
The system organizes context information in a hierarchical structure that prioritizes the most relevant information while maintaining supporting context for comprehensive analysis.
class ContextHierarchy:
"""Organizes context in hierarchical levels based on relevance."""
def __init__(self):
self.level_definitions = {
'critical': {'priority_threshold': 0.8, 'max_entities': 5},
'important': {'priority_threshold': 0.6, 'max_entities': 10},
'supporting': {'priority_threshold': 0.4, 'max_entities': 15},
'background': {'priority_threshold': 0.2, 'max_entities': 20}
}
def organize_entities(self, entities: List[CodeEntity],
priority_scores: Dict[str, float]) -> Dict[str, List[CodeEntity]]:
"""Organize entities into hierarchical levels."""
organized_context = {level: [] for level in self.level_definitions.keys()}
# Sort entities by priority score
sorted_entities = sorted(entities,
key=lambda e: priority_scores.get(e.name, 0.0),
reverse=True)
# Assign entities to levels
for entity in sorted_entities:
entity_priority = priority_scores.get(entity.name, 0.0)
assigned = False
for level, config in self.level_definitions.items():
if (entity_priority >= config['priority_threshold'] and
len(organized_context[level]) < config['max_entities']):
organized_context[level].append(entity)
assigned = True
break
# If not assigned to any level, add to background if space available
if not assigned and len(organized_context['background']) < self.level_definitions['background']['max_entities']:
organized_context['background'].append(entity)
# Build relationship subgraph for organized entities
all_organized_entities = []
for level_entities in organized_context.values():
all_organized_entities.extend(level_entities)
relationship_graph = self._build_relationship_subgraph(all_organized_entities)
organized_context['relationships'] = relationship_graph
return organized_context
def _build_relationship_subgraph(self, entities: List[CodeEntity]) -> nx.Graph:
"""Build a subgraph of relationships between organized entities."""
subgraph = nx.Graph()
entity_names = {entity.name for entity in entities}
# Add nodes
for entity in entities:
subgraph.add_node(entity.name, entity_data=entity)
# Add edges based on dependencies and relationships
for entity in entities:
for dependency in entity.dependencies:
if dependency in entity_names:
subgraph.add_edge(entity.name, dependency, relationship_type='dependency')
return subgraph
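The level-assignment logic above can be reduced to a small standalone demonstration. The sketch below mirrors `organize_entities` on plain name-to-score mappings, including the fallback that places otherwise-unassigned entities into the background level while it has room:

```python
from typing import Dict, List

# (level name, priority threshold, capacity) mirroring level_definitions
LEVELS = [
    ("critical", 0.8, 5),
    ("important", 0.6, 10),
    ("supporting", 0.4, 15),
    ("background", 0.2, 20),
]

def assign_levels(scores: Dict[str, float]) -> Dict[str, List[str]]:
    """Assign each entity to the first level whose threshold it meets
    and that still has capacity; unplaced entities fall back to
    'background' while it has room."""
    buckets: Dict[str, List[str]] = {name: [] for name, _, _ in LEVELS}
    for entity, score in sorted(scores.items(), key=lambda kv: kv[1], reverse=True):
        for name, threshold, capacity in LEVELS:
            if score >= threshold and len(buckets[name]) < capacity:
                buckets[name].append(entity)
                break
        else:
            if len(buckets["background"]) < 20:
                buckets["background"].append(entity)
    return buckets
```

Sorting by score first matters: it guarantees that when a level fills up, the entities displaced into lower levels are always the lower-priority ones.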
Context Compression and Optimization
When context exceeds token limits, the system employs intelligent compression strategies that preserve the most important information while reducing token usage.
class ContextCompressionEngine:
"""Compresses context while preserving essential information."""
def __init__(self):
self.summarization_model = CodeSummarizationModel()
self.abstraction_engine = CodeAbstractionEngine()
self.essential_extractor = EssentialInformationExtractor()
def compress_context(self, hierarchical_context: Dict[str, List[CodeEntity]],
token_allocation: Dict[str, int],
max_tokens: int) -> Dict[str, Any]:
"""Compress context to fit within token limits."""
compressed_context = {
'primary': [],
'supporting': [],
'relationships': nx.Graph(),
'metadata': {'compression_applied': True, 'compression_ratio': 0.0}
}
total_original_tokens = token_allocation['total_tokens']
compression_target = max_tokens * 0.7 # Leave buffer for processing
# Compress each level with different strategies
for level, entities in hierarchical_context.items():
if level == 'relationships':
continue
if level in ['critical', 'important']:
# Preserve critical and important entities with minimal compression
compressed_entities = self._light_compression(entities)
if level == 'critical':
compressed_context['primary'].extend(compressed_entities)
else:
compressed_context['supporting'].extend(compressed_entities)
elif level in ['supporting', 'background']:
# Apply heavy compression or summarization
compressed_entities = self._heavy_compression(entities)
compressed_context['supporting'].extend(compressed_entities)
# Compress relationships
compressed_context['relationships'] = self._compress_relationship_graph(
hierarchical_context.get('relationships', nx.Graph())
)
# Calculate compression ratio
final_token_count = self._estimate_compressed_token_count(compressed_context)
compression_ratio = final_token_count / total_original_tokens if total_original_tokens > 0 else 1.0
compressed_context['metadata']['compression_ratio'] = compression_ratio
return compressed_context
def _light_compression(self, entities: List[CodeEntity]) -> List[CodeEntity]:
"""Apply light compression preserving most information."""
compressed_entities = []
for entity in entities:
# Extract essential information
essential_info = self.essential_extractor.extract_essentials(entity)
# Create compressed version
compressed_entity = CodeEntity(
entity_type=entity.entity_type,
name=entity.name,
signature=entity.signature,
body=essential_info['compressed_body'],
start_line=entity.start_line,
end_line=entity.end_line,
language=entity.language,
metadata={
**essential_info['essential_metadata'],
'compression_level': 'light',
'original_size': len(entity.body)
},
dependencies=entity.dependencies,
complexity_metrics=entity.complexity_metrics
)
compressed_entities.append(compressed_entity)
return compressed_entities
def _heavy_compression(self, entities: List[CodeEntity]) -> List[CodeEntity]:
"""Apply heavy compression with summarization."""
if not entities:
return []
# Group similar entities for batch summarization
entity_groups = self._group_similar_entities(entities)
compressed_entities = []
for group in entity_groups:
if len(group) == 1:
# Single entity - apply individual compression
summary = self.summarization_model.summarize_entity(group[0])
compressed_entity = self._create_summary_entity(group[0], summary)
compressed_entities.append(compressed_entity)
else:
# Multiple entities - create group summary
group_summary = self.summarization_model.summarize_entity_group(group)
summary_entity = self._create_group_summary_entity(group, group_summary)
compressed_entities.append(summary_entity)
return compressed_entities
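The batch-summarization path depends on `_group_similar_entities`, which is referenced but not defined above. One plausible sketch groups entities by type and source file, the simplest signal that a shared summary will stay coherent (the grouping key is an assumption; a richer implementation might cluster on embedding similarity, and entities are represented as plain dicts here for illustration):

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

def group_similar_entities(entities: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
    """Group entities that are likely to summarize well together.

    Grouping key: (entity_type, source file). Entities from the same
    file and of the same kind tend to share vocabulary and intent,
    so a single group summary loses less information.
    """
    groups: Dict[Tuple[str, str], List[Dict[str, Any]]] = defaultdict(list)
    for entity in entities:
        key = (entity["entity_type"], entity.get("file_path", ""))
        groups[key].append(entity)
    return list(groups.values())
```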
Adaptive Context Management
The system continuously learns from analysis patterns to optimize context selection and memory usage over time.
class AdaptiveContextManager:
"""Manages context adaptation based on usage patterns."""
def __init__(self):
self.usage_patterns = {}
self.effectiveness_metrics = {}
self.adaptation_strategies = {}
self.learning_rate = 0.1
def update_context_effectiveness(self, context_hash: str,
analysis_results: Dict[str, Any],
user_feedback: Optional[Dict[str, Any]] = None) -> None:
"""Update context effectiveness based on analysis results and feedback."""
if context_hash not in self.effectiveness_metrics:
self.effectiveness_metrics[context_hash] = {
'accuracy_score': 0.0,
'completeness_score': 0.0,
'efficiency_score': 0.0,
'user_satisfaction': 0.0,
'usage_count': 0
}
metrics = self.effectiveness_metrics[context_hash]
# Update metrics based on analysis results
if 'accuracy_indicators' in analysis_results:
new_accuracy = self._calculate_accuracy_score(analysis_results['accuracy_indicators'])
metrics['accuracy_score'] = self._update_metric(metrics['accuracy_score'], new_accuracy)
if 'completeness_indicators' in analysis_results:
new_completeness = self._calculate_completeness_score(analysis_results['completeness_indicators'])
metrics['completeness_score'] = self._update_metric(metrics['completeness_score'], new_completeness)
# Update efficiency based on token usage and processing time
if 'performance_metrics' in analysis_results:
new_efficiency = self._calculate_efficiency_score(analysis_results['performance_metrics'])
metrics['efficiency_score'] = self._update_metric(metrics['efficiency_score'], new_efficiency)
# Incorporate user feedback if available
if user_feedback:
user_score = user_feedback.get('satisfaction_score', 0.5)
metrics['user_satisfaction'] = self._update_metric(metrics['user_satisfaction'], user_score)
metrics['usage_count'] += 1
# Adapt strategies based on updated metrics
self._adapt_context_strategies(context_hash, metrics)
def _update_metric(self, current_value: float, new_value: float) -> float:
"""Update metric using exponential moving average."""
return current_value * (1 - self.learning_rate) + new_value * self.learning_rate
def _adapt_context_strategies(self, context_hash: str, metrics: Dict[str, float]) -> None:
"""Adapt context strategies based on effectiveness metrics."""
overall_effectiveness = (
metrics['accuracy_score'] * 0.3 +
metrics['completeness_score'] * 0.3 +
metrics['efficiency_score'] * 0.2 +
metrics['user_satisfaction'] * 0.2
)
if context_hash not in self.adaptation_strategies:
self.adaptation_strategies[context_hash] = {
'compression_threshold': 0.7,
'priority_boost': 1.0,
'relationship_depth': 2
}
strategy = self.adaptation_strategies[context_hash]
# Adapt based on effectiveness
if overall_effectiveness < 0.6:
# Low effectiveness - reduce compression, increase context
strategy['compression_threshold'] = max(0.5, strategy['compression_threshold'] - 0.1)
strategy['relationship_depth'] = min(3, strategy['relationship_depth'] + 1)
elif overall_effectiveness > 0.8:
# High effectiveness - can afford more compression
strategy['compression_threshold'] = min(0.9, strategy['compression_threshold'] + 0.05)
# Adapt based on specific metric weaknesses
if metrics['completeness_score'] < 0.5:
strategy['relationship_depth'] = min(4, strategy['relationship_depth'] + 1)
if metrics['efficiency_score'] < 0.5:
strategy['compression_threshold'] = min(0.9, strategy['compression_threshold'] + 0.1)
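The exponential moving average in `_update_metric` is what gives the adaptation its inertia, and its behavior is easy to see numerically in a standalone form:

```python
def update_metric(current: float, new: float, learning_rate: float = 0.1) -> float:
    """Exponential moving average, as in _update_metric above."""
    return current * (1 - learning_rate) + new * learning_rate

# A cold-start metric (0.0) receiving ten consecutive perfect scores
# converges toward 1.0 but retains inertia: after n updates the value
# is 1 - 0.9**n, so ten updates cover only ~65% of the gap.
score = 0.0
for _ in range(10):
    score = update_metric(score, 1.0)
```

With a learning rate of 0.1, a single anomalous analysis result shifts any metric by at most 10% of the gap, which keeps the strategy adjustments in `_adapt_context_strategies` from oscillating on noisy feedback.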
The context-aware analysis engine provides a sophisticated framework for managing LLM interactions with large codebases. By implementing hierarchical organization, intelligent compression, and adaptive optimization, the system maintains high analysis quality while respecting computational constraints.
Problem 6: Advanced Optimization Techniques
Problem Description
Large-scale code analysis requires sophisticated optimization techniques to maintain performance and accuracy. The system must handle codebases with millions of lines of code while providing real-time analysis capabilities. This necessitates optimizations at multiple levels including caching, parallel processing, incremental analysis, and intelligent preprocessing.
Solution Architecture
The optimization framework implements a multi-layered approach combining caching strategies, parallel processing, incremental updates, and predictive prefetching to achieve optimal performance.
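Every layer of this framework hinges on a stable identity for an analysis request: the cache, the incremental analyzer, and the prefetcher all need the same request to map to the same key. A minimal sketch of such a key function, assuming requests are JSON-serializable dicts (the function name is illustrative):

```python
import hashlib
import json
from typing import Any, Dict

def make_cache_key(analysis_request: Dict[str, Any]) -> str:
    """Derive a stable cache key from an analysis request.

    Serializing with sorted keys makes the key independent of dict
    insertion order, so logically identical requests always hit the
    same cache entry; non-JSON values fall back to str().
    """
    canonical = json.dumps(analysis_request, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Keying on content rather than on file paths alone also makes cache entries self-invalidating: any change to the request payload produces a different digest.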
import asyncio
import concurrent.futures
from typing import List, Dict, Any, Optional, Set
from dataclasses import dataclass
import threading
import time
from collections import defaultdict
import pickle
import hashlib
@dataclass
class OptimizationMetrics:
"""Tracks optimization performance metrics."""
cache_hit_rate: float
average_processing_time: float
memory_usage_mb: float
parallel_efficiency: float
incremental_update_ratio: float
prefetch_accuracy: float
class PerformanceOptimizer:
"""Comprehensive performance optimization system."""
def __init__(self):
self.cache_manager = MultiLevelCacheManager()
self.parallel_processor = ParallelProcessingEngine()
self.incremental_analyzer = IncrementalAnalysisEngine()
self.prefetch_predictor = PrefetchPredictor()
self.metrics_collector = MetricsCollector()
self.optimization_config = OptimizationConfig()
async def optimize_analysis_pipeline(self, analysis_request: Dict[str, Any]) -> Dict[str, Any]:
"""Optimize the entire analysis pipeline for maximum performance."""
start_time = time.time()
# Phase 1: Check cache for complete results
cache_result = await self.cache_manager.get_cached_result(analysis_request)
if cache_result:
self.metrics_collector.record_cache_hit()
return cache_result
# Phase 2: Incremental analysis check
incremental_result = await self.incremental_analyzer.check_incremental_update(analysis_request)
if incremental_result:
self.metrics_collector.record_incremental_hit()
return incremental_result
# Phase 3: Parallel processing optimization
optimized_tasks = self.parallel_processor.optimize_task_distribution(analysis_request)
# Phase 4: Predictive prefetching
prefetch_tasks = self.prefetch_predictor.predict_future_needs(analysis_request)
asyncio.create_task(self.prefetch_predictor.execute_prefetch_tasks(prefetch_tasks))
# Phase 5: Execute optimized analysis
results = await self.parallel_processor.execute_parallel_analysis(optimized_tasks)
# Phase 6: Cache results for future use
await self.cache_manager.cache_results(analysis_request, results)
# Phase 7: Update metrics
processing_time = time.time() - start_time
self.metrics_collector.record_processing_time(processing_time)
return results
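The pipeline above is a cascade: try the full-result cache first, then an incremental update, and only fall back to a full analysis when both miss. That control flow can be sketched in isolation, without the async machinery, using illustrative stand-in lookups (all names below are hypothetical, not part of the system):

```python
from typing import Callable, Optional


def resolve_with_fallbacks(request: str,
                           cache_lookup: Callable[[str], Optional[str]],
                           incremental_lookup: Callable[[str], Optional[str]],
                           full_analysis: Callable[[str], str]) -> str:
    """Return the first available result: cached, incrementally updated, or freshly computed."""
    for source in (cache_lookup, incremental_lookup):
        result = source(request)
        if result is not None:
            return result  # earlier, cheaper phase succeeded
    return full_analysis(request)  # last resort: full analysis


cache = {'req-1': 'cached result'}
print(resolve_with_fallbacks('req-1', cache.get, lambda r: None,
                             lambda r: 'full result'))   # cached result
print(resolve_with_fallbacks('req-2', cache.get, lambda r: None,
                             lambda r: 'full result'))   # full result
```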
Multi-Level Caching System
The caching system implements multiple levels of caching to optimize different aspects of the analysis pipeline.
class MultiLevelCacheManager:
"""Multi-level caching system with intelligent eviction."""
def __init__(self):
self.l1_cache = {} # In-memory cache for frequent access
self.l2_cache = {} # Compressed cache for medium-term storage
self.l3_cache = PersistentCache() # Disk-based cache for long-term storage
self.cache_stats = defaultdict(int)
self.access_patterns = defaultdict(list)
self.cache_locks = defaultdict(threading.RLock)
# Cache configuration
self.l1_max_size = 1000
self.l2_max_size = 5000
self.compression_threshold = 10000 # bytes
async def get_cached_result(self, request: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Retrieve cached result from appropriate cache level."""
cache_key = self._generate_cache_key(request)
# Check L1 cache first (fastest)
with self.cache_locks[cache_key]:
if cache_key in self.l1_cache:
self.cache_stats['l1_hits'] += 1
self._update_access_pattern(cache_key, 'l1')
return self.l1_cache[cache_key]
# Check L2 cache (compressed)
if cache_key in self.l2_cache:
self.cache_stats['l2_hits'] += 1
compressed_data = self.l2_cache[cache_key]
decompressed_result = self._decompress_cache_data(compressed_data)
# Promote to L1 cache if frequently accessed
if self._should_promote_to_l1(cache_key):
self._promote_to_l1(cache_key, decompressed_result)
self._update_access_pattern(cache_key, 'l2')
return decompressed_result
# Check L3 cache (persistent)
l3_result = await self.l3_cache.get(cache_key)
if l3_result:
self.cache_stats['l3_hits'] += 1
# Promote to appropriate level based on access pattern
if self._should_promote_to_l2(cache_key):
compressed_data = self._compress_cache_data(l3_result)
self.l2_cache[cache_key] = compressed_data
self._update_access_pattern(cache_key, 'l3')
return l3_result
# Cache miss
self.cache_stats['misses'] += 1
return None
async def cache_results(self, request: Dict[str, Any], results: Dict[str, Any]) -> None:
"""Cache results at appropriate level based on size and access patterns."""
cache_key = self._generate_cache_key(request)
result_size = self._estimate_data_size(results)
# Determine appropriate cache level
if result_size < self.compression_threshold and len(self.l1_cache) < self.l1_max_size:
# Store in L1 cache
with self.cache_locks[cache_key]:
self.l1_cache[cache_key] = results
self._manage_l1_eviction()
elif len(self.l2_cache) < self.l2_max_size:
# Store in L2 cache with compression
compressed_data = self._compress_cache_data(results)
self.l2_cache[cache_key] = compressed_data
self._manage_l2_eviction()
else:
# Store in L3 cache (persistent)
await self.l3_cache.set(cache_key, results)
def _manage_l1_eviction(self) -> None:
"""Manage L1 cache eviction using LRU with access frequency consideration."""
if len(self.l1_cache) > self.l1_max_size:
# Calculate eviction scores based on recency and frequency
eviction_candidates = []
current_time = time.time()
for cache_key in self.l1_cache:
access_history = self.access_patterns[cache_key]
if access_history:
last_access = access_history[-1]
access_frequency = len(access_history)
recency_score = 1.0 / (current_time - last_access + 1)
frequency_score = access_frequency / 100.0 # Normalize
# Combined score favoring both recent and frequent access
eviction_score = recency_score * 0.7 + frequency_score * 0.3
eviction_candidates.append((cache_key, eviction_score))
# Sort by eviction score (lowest first) and remove least valuable entries
eviction_candidates.sort(key=lambda x: x[1])
entries_to_remove = len(self.l1_cache) - self.l1_max_size + 1
for cache_key, _ in eviction_candidates[:entries_to_remove]:
# Move to L2 cache before evicting from L1
if cache_key in self.l1_cache:
data = self.l1_cache[cache_key]
compressed_data = self._compress_cache_data(data)
self.l2_cache[cache_key] = compressed_data
del self.l1_cache[cache_key]
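The cache manager relies on several helpers that are not shown (`_generate_cache_key`, `_compress_cache_data`, `_decompress_cache_data`, `_estimate_data_size`). A minimal sketch of how they might be implemented with the standard library, assuming requests and results are pickle-serializable dictionaries:

```python
import hashlib
import pickle
import zlib
from typing import Any, Dict


def generate_cache_key(request: Dict[str, Any]) -> str:
    """Derive a stable key by hashing a canonical serialization of the request."""
    # Sorting items makes logically identical requests hash to the same key
    canonical = pickle.dumps(sorted(request.items()))
    return hashlib.sha256(canonical).hexdigest()


def compress_cache_data(data: Dict[str, Any]) -> bytes:
    """Pickle, then zlib-compress, a result for L2 storage."""
    return zlib.compress(pickle.dumps(data))


def decompress_cache_data(blob: bytes) -> Dict[str, Any]:
    """Inverse of compress_cache_data."""
    return pickle.loads(zlib.decompress(blob))


def estimate_data_size(data: Dict[str, Any]) -> int:
    """Rough size estimate in bytes via the pickled representation."""
    return len(pickle.dumps(data))
```

This is why the listing imports `pickle` and `hashlib`; swapping in a JSON-based serialization would also work if results are JSON-compatible.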
Parallel Processing Engine
The parallel processing engine optimizes task distribution and execution across multiple cores and processes.
class ParallelProcessingEngine:
"""Advanced parallel processing with intelligent task distribution."""
def __init__(self):
self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=8)
self.process_pool = concurrent.futures.ProcessPoolExecutor(max_workers=4)
self.task_scheduler = TaskScheduler()
self.load_balancer = LoadBalancer()
self.dependency_resolver = DependencyResolver()
def optimize_task_distribution(self, analysis_request: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Optimize task distribution for parallel execution."""
# Parse analysis request into individual tasks
raw_tasks = self._decompose_analysis_request(analysis_request)
# Resolve task dependencies
dependency_graph = self.dependency_resolver.build_dependency_graph(raw_tasks)
# Optimize task scheduling
optimized_schedule = self.task_scheduler.optimize_schedule(dependency_graph)
# Balance load across available resources
balanced_tasks = self.load_balancer.balance_task_load(optimized_schedule)
return balanced_tasks
async def execute_parallel_analysis(self, optimized_tasks: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Execute analysis tasks in parallel with dependency management."""
task_results = {}
completed_tasks = set()
pending_tasks = {task['id']: task for task in optimized_tasks}
# Execute tasks in dependency order
while pending_tasks:
# Find tasks ready for execution (dependencies satisfied)
ready_tasks = []
for task_id, task in pending_tasks.items():
dependencies = task.get('dependencies', [])
if all(dep in completed_tasks for dep in dependencies):
ready_tasks.append(task)
if not ready_tasks:
raise RuntimeError("Circular dependency detected in task graph")
# Execute ready tasks in parallel
execution_futures = []
for task in ready_tasks:
if task['type'] == 'cpu_intensive':
future = self.process_pool.submit(self._execute_cpu_task, task)
else:
future = self.thread_pool.submit(self._execute_io_task, task)
execution_futures.append((task['id'], future))
del pending_tasks[task['id']]
# Wait for completion and collect results
for task_id, future in execution_futures:
try:
result = future.result(timeout=300) # 5 minute timeout
task_results[task_id] = result
completed_tasks.add(task_id)
except Exception as e:
raise RuntimeError(f"Task {task_id} failed: {str(e)}") from e
# Combine results into final analysis
final_result = self._combine_task_results(task_results)
return final_result
def _execute_cpu_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""Execute CPU-intensive analysis task."""
task_type = task['task_type']
task_data = task['data']
if task_type == 'syntax_analysis':
return self._perform_syntax_analysis(task_data)
elif task_type == 'complexity_calculation':
return self._calculate_complexity_metrics(task_data)
elif task_type == 'relationship_extraction':
return self._extract_relationships(task_data)
else:
raise ValueError(f"Unknown CPU task type: {task_type}")
def _execute_io_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""Execute I/O-intensive analysis task."""
task_type = task['task_type']
task_data = task['data']
if task_type == 'file_parsing':
return self._parse_source_files(task_data)
elif task_type == 'embedding_generation':
return self._generate_embeddings(task_data)
elif task_type == 'graph_storage':
return self._store_graph_data(task_data)
else:
raise ValueError(f"Unknown I/O task type: {task_type}")
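The wave-based scheduling loop in `execute_parallel_analysis` can be exercised on its own. The following self-contained sketch (with toy lambda tasks standing in for real analysis work) dispatches each dependency-satisfied wave to a thread pool, exactly mirroring the ready-set logic above:

```python
import concurrent.futures
from typing import Any, Dict, List


def run_in_dependency_order(tasks: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Execute tasks in waves: a task runs once all of its dependencies have completed."""
    results: Dict[str, Any] = {}
    completed = set()
    pending = {t['id']: t for t in tasks}
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        while pending:
            # A task is ready when every dependency has already finished
            ready = [t for t in pending.values()
                     if all(d in completed for d in t.get('dependencies', []))]
            if not ready:
                raise RuntimeError("Circular dependency detected in task graph")
            futures = {t['id']: pool.submit(t['fn'], results) for t in ready}
            for t in ready:
                del pending[t['id']]
            for task_id, future in futures.items():
                results[task_id] = future.result(timeout=30)
                completed.add(task_id)
    return results


# Toy tasks: 'combine' must wait for 'parse' and 'metrics'
tasks = [
    {'id': 'parse', 'dependencies': [], 'fn': lambda r: 10},
    {'id': 'metrics', 'dependencies': ['parse'], 'fn': lambda r: r['parse'] * 2},
    {'id': 'combine', 'dependencies': ['parse', 'metrics'],
     'fn': lambda r: r['parse'] + r['metrics']},
]
print(run_in_dependency_order(tasks))  # {'parse': 10, 'metrics': 20, 'combine': 30}
```

Note that lambdas are only picklable-free stand-ins here; routing CPU-bound work to a `ProcessPoolExecutor`, as the engine does, requires module-level functions.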
Incremental Analysis Engine
The incremental analysis engine minimizes redundant processing by tracking changes and updating only affected components.
class IncrementalAnalysisEngine:
"""Manages incremental analysis with change tracking."""
def __init__(self):
self.change_tracker = ChangeTracker()
self.dependency_tracker = DependencyTracker()
self.analysis_cache = AnalysisCache()
self.impact_analyzer = ImpactAnalyzer()
async def check_incremental_update(self, analysis_request: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Check if analysis can be performed incrementally."""
# Detect changes since last analysis
changes = self.change_tracker.detect_changes(analysis_request)
if not changes:
# No changes detected - return cached result
return await self.analysis_cache.get_cached_analysis(analysis_request)
# Analyze impact of changes
impact_analysis = self.impact_analyzer.analyze_change_impact(changes)
# Determine if incremental update is beneficial
if self._should_perform_incremental_update(impact_analysis):
return await self._perform_incremental_update(analysis_request, changes, impact_analysis)
# Full analysis required
return None
async def _perform_incremental_update(self, analysis_request: Dict[str, Any],
changes: List[Dict[str, Any]],
impact_analysis: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Perform incremental analysis update."""
# Get base analysis from cache
base_analysis = await self.analysis_cache.get_cached_analysis(analysis_request)
if not base_analysis:
return None # No base analysis available
# Process only affected components
affected_entities = impact_analysis['affected_entities']
affected_relationships = impact_analysis['affected_relationships']
# Update affected entities
updated_entities = {}
for entity_id in affected_entities:
entity_data = self._get_entity_data(entity_id, changes)
if entity_data:
updated_analysis = await self._analyze_single_entity(entity_data)
updated_entities[entity_id] = updated_analysis
# Update affected relationships
updated_relationships = {}
for relationship_id in affected_relationships:
relationship_data = self._get_relationship_data(relationship_id, changes)
if relationship_data:
updated_relationship = await self._analyze_single_relationship(relationship_data)
updated_relationships[relationship_id] = updated_relationship
# Merge updates with base analysis
incremental_result = self._merge_incremental_updates(
base_analysis, updated_entities, updated_relationships
)
# Update cache with new result
await self.analysis_cache.update_cached_analysis(analysis_request, incremental_result)
return incremental_result
def _should_perform_incremental_update(self, impact_analysis: Dict[str, Any]) -> bool:
"""Determine if incremental update is more efficient than full analysis."""
total_entities = impact_analysis['total_entities']
affected_entities = len(impact_analysis['affected_entities'])
# Use incremental update if less than 30% of entities are affected
incremental_threshold = 0.3
impact_ratio = affected_entities / total_entities if total_entities > 0 else 1.0
return impact_ratio < incremental_threshold
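The cost/benefit rule above reduces to a single ratio comparison. A standalone version, with an illustrative codebase of 1,000 entities:

```python
def should_update_incrementally(total_entities: int, affected_entities: int,
                                threshold: float = 0.3) -> bool:
    """Prefer incremental analysis when the affected fraction stays below the threshold."""
    if total_entities == 0:
        return False  # no baseline to patch: fall back to full analysis
    return affected_entities / total_entities < threshold


# 150 of 1,000 entities touched -> 15% affected, incremental update wins
print(should_update_incrementally(1000, 150))   # True
# 400 of 1,000 entities touched -> 40% affected, full reanalysis is cheaper
print(should_update_incrementally(1000, 400))   # False
```

The 30% default is the heuristic from the listing above; in practice it would be tuned against measured full-analysis and merge costs.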
Predictive Prefetching System
The prefetching system anticipates future analysis needs and preloads relevant data to reduce latency.
class PrefetchPredictor:
"""Predicts and prefetches likely future analysis requests."""
def __init__(self):
self.usage_pattern_analyzer = UsagePatternAnalyzer()
self.prediction_model = PredictionModel()
self.prefetch_scheduler = PrefetchScheduler()
self.prefetch_cache = PrefetchCache()
def predict_future_needs(self, current_request: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Predict likely future analysis requests based on current request."""
# Analyze current request context
request_context = self._extract_request_context(current_request)
# Get historical usage patterns
similar_patterns = self.usage_pattern_analyzer.find_similar_patterns(request_context)
# Generate predictions using machine learning model
predictions = self.prediction_model.predict_next_requests(request_context, similar_patterns)
# Filter and prioritize predictions
prioritized_predictions = self._prioritize_predictions(predictions, current_request)
# Convert to prefetch tasks
prefetch_tasks = self._create_prefetch_tasks(prioritized_predictions)
return prefetch_tasks
def _extract_request_context(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Extract contextual features from analysis request."""
context = {
'file_types': self._extract_file_types(request),
'analysis_types': request.get('analysis_types', []),
'project_structure': self._analyze_project_structure(request),
'user_patterns': self._extract_user_patterns(request),
'time_context': self._extract_time_context(request)
}
return context
def _prioritize_predictions(self, predictions: List[Dict[str, Any]],
current_request: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Prioritize predictions based on likelihood and value."""
prioritized = []
for prediction in predictions:
# Calculate priority score
likelihood = prediction['likelihood']
value = self._calculate_prediction_value(prediction, current_request)
cost = self._estimate_prefetch_cost(prediction)
priority_score = (likelihood * value) / (cost + 1)
prediction['priority_score'] = priority_score
prioritized.append(prediction)
# Sort by priority score and return top predictions
prioritized.sort(key=lambda x: x['priority_score'], reverse=True)
return prioritized[:10] # Limit to top 10 predictions
async def execute_prefetch_tasks(self, prefetch_tasks: List[Dict[str, Any]]) -> None:
"""Execute prefetch tasks asynchronously."""
for task in prefetch_tasks:
try:
# Check if already cached
if await self.prefetch_cache.is_cached(task['request']):
continue
# Execute prefetch analysis
prefetch_result = await self._execute_prefetch_analysis(task['request'])
# Cache prefetch result
await self.prefetch_cache.cache_prefetch_result(task['request'], prefetch_result)
except Exception as e:
# Log prefetch failure but don't interrupt main analysis
print(f"Prefetch task failed: {str(e)}")
continue
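The prioritization step scores each prediction as `(likelihood × value) / (cost + 1)`, i.e. expected value per unit of prefetch cost. A minimal standalone version of that ranking, using made-up predictions:

```python
from typing import Any, Dict, List


def prioritize(predictions: List[Dict[str, Any]], top_n: int = 10) -> List[Dict[str, Any]]:
    """Score predictions by expected value per unit of prefetch cost and keep the best."""
    for p in predictions:
        p['priority_score'] = (p['likelihood'] * p['value']) / (p['cost'] + 1)
    return sorted(predictions, key=lambda p: p['priority_score'], reverse=True)[:top_n]


predictions = [
    {'name': 'open_neighbor_file', 'likelihood': 0.9, 'value': 5.0, 'cost': 1.0},
    {'name': 'rerun_full_scan', 'likelihood': 0.4, 'value': 8.0, 'cost': 7.0},
    {'name': 'fetch_call_graph', 'likelihood': 0.7, 'value': 6.0, 'cost': 2.0},
]
ranked = prioritize(predictions)
print([p['name'] for p in ranked])
# ['open_neighbor_file', 'fetch_call_graph', 'rerun_full_scan']
```

The `+ 1` in the denominator keeps near-zero-cost predictions from dominating the ranking regardless of likelihood.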
The comprehensive optimization framework provides significant performance improvements through intelligent caching, parallel processing, incremental analysis, and predictive prefetching. These optimizations work together to create a highly efficient code analysis system capable of handling large-scale codebases while maintaining real-time responsiveness.
Integration and System Coordination
The various components of the LLM-based code analyzer must work together seamlessly to provide comprehensive analysis capabilities. The integration layer coordinates between the language processing pipeline, chunking engine, GraphRAG store, LLM analysis engine, context-aware analysis engine, and optimization framework.
import json
import time
from typing import Any, Dict, List
class CodeAnalysisOrchestrator:
"""Main orchestrator coordinating all analysis components with full LLM integration."""
def __init__(self):
self.language_processors = self._initialize_language_processors()
self.chunking_engine = SemanticChunkingEngine()
self.graph_store = GraphRAGStore()
self.context_manager = ContextMemoryManager()
self.analysis_engine = LLMAnalysisEngine() # Core LLM integration
self.optimizer = PerformanceOptimizer()
self.relationship_extractor = RelationshipExtractor()
self.synthesis_engine = AnalysisSynthesisEngine()
async def analyze_codebase(self, codebase_path: str,
analysis_config: Dict[str, Any]) -> Dict[str, Any]:
"""Perform comprehensive LLM-based codebase analysis."""
# Phase 1: Discovery and parsing
discovered_files = await self._discover_source_files(codebase_path)
parsed_entities = await self._parse_all_files(discovered_files)
# Phase 2: Relationship extraction and graph construction
relationships = self.relationship_extractor.extract_relationships(parsed_entities)
await self._populate_graph_store(parsed_entities, relationships)
# Phase 3: Intelligent chunking
optimized_chunks = self.chunking_engine.create_optimized_chunks(parsed_entities)
# Phase 4: LLM-based contextual analysis
analysis_results = await self._perform_llm_analysis(
optimized_chunks, analysis_config
)
# Phase 5: LLM-based synthesis and insights
final_report = await self._synthesize_with_llm(analysis_results, analysis_config)
return final_report
async def _perform_llm_analysis(self, chunks: List[CodeChunk],
config: Dict[str, Any]) -> Dict[str, Any]:
"""Perform LLM-based analysis on code chunks with GraphRAG context."""
analysis_results = {}
for chunk in chunks:
# Build GraphRAG-enhanced context for this chunk
related_entity_ids = []
for entity in chunk.entities:
entity_id = self.graph_store._generate_entity_id(entity)
related_ids = self.graph_store.find_related_entities(entity_id, max_depth=2)
related_entity_ids.extend(related_ids)
# Retrieve related entities from graph store
related_entities = []
for entity_id in set(related_entity_ids):
entity = self.graph_store.get_entity(entity_id)
if entity:
related_entities.append(entity)
# Build optimized context using context manager
context = self.context_manager.build_analysis_context(
chunk.entities + related_entities,
config.get('query', ''),
config.get('analysis_type', 'general')
)
# Perform LLM analysis with enriched context
chunk_analysis = await self.analysis_engine.analyze_with_context(context, config)
# Store results with metadata
analysis_results[chunk.chunk_id] = {
**chunk_analysis,
'chunk_metadata': {
'entity_count': len(chunk.entities),
'related_entity_count': len(related_entities),
'context_size': len(context.primary_entities) + len(context.supporting_entities),
'chunk_priority': chunk.context_priority
}
}
return analysis_results
async def _synthesize_with_llm(self, analysis_results: Dict[str, Any],
config: Dict[str, Any]) -> Dict[str, Any]:
"""Use LLM to synthesize final insights from chunk analyses."""
# Prepare synthesis data
synthesis_data = {
'chunk_count': len(analysis_results),
'analysis_type': config.get('analysis_type', 'general'),
'key_findings': self._extract_key_findings(analysis_results),
'quality_scores': self._extract_quality_scores(analysis_results),
'common_issues': self._identify_common_issues(analysis_results),
'architectural_patterns': self._identify_architectural_patterns(analysis_results)
}
# Build synthesis prompt
synthesis_prompt = self._build_synthesis_prompt(synthesis_data, analysis_results)
# Generate synthesis using LLM
synthesis_response = await self.analysis_engine.llm_client.generate_response(
synthesis_prompt,
max_tokens=3000,
temperature=0.1
)
# Parse synthesis response
parsed_synthesis = self.analysis_engine.response_parser.parse_synthesis_response(
synthesis_response, config.get('analysis_type', 'general')
)
return {
'executive_summary': parsed_synthesis,
'individual_analyses': analysis_results,
'synthesis_metadata': {
'analysis_timestamp': time.time(),
'chunks_analyzed': len(analysis_results),
'analysis_type': config.get('analysis_type', 'general'),
'total_entities': sum(
result['chunk_metadata']['entity_count']
for result in analysis_results.values()
),
'synthesis_quality_score': parsed_synthesis.get('quality_score', 0.0)
}
}
def _build_synthesis_prompt(self, synthesis_data: Dict[str, Any],
analysis_results: Dict[str, Any]) -> str:
"""Build comprehensive synthesis prompt for LLM."""
analysis_type = synthesis_data['analysis_type']
prompt = f"""You are a senior software architect and technical lead. Synthesize the following code analysis results into comprehensive insights and actionable recommendations.
ANALYSIS OVERVIEW:
- Analysis Type: {analysis_type}
- Chunks Analyzed: {synthesis_data['chunk_count']}
- Overall Quality Scores: {json.dumps(synthesis_data['quality_scores'], indent=2)}
KEY FINDINGS SUMMARY:
{json.dumps(synthesis_data['key_findings'], indent=2)}
COMMON ISSUES IDENTIFIED:
{json.dumps(synthesis_data['common_issues'], indent=2)}
ARCHITECTURAL PATTERNS:
{json.dumps(synthesis_data['architectural_patterns'], indent=2)}
DETAILED CHUNK ANALYSES:
{json.dumps(analysis_results, indent=2)}
SYNTHESIS REQUIREMENTS:
1. Executive Summary:
- Overall codebase health assessment
- Critical findings that require immediate attention
- Positive aspects and strengths identified
- Risk assessment and impact analysis
2. Prioritized Action Plan:
- Critical issues requiring immediate action
- Medium-term improvements and refactoring opportunities
- Long-term architectural evolution recommendations
- Resource allocation suggestions
3. Technical Insights:
- Code quality trends and patterns
- Architectural strengths and weaknesses
- Technology stack assessment
- Maintainability and scalability evaluation
4. Best Practices Recommendations:
- Development process improvements
- Code review and quality assurance enhancements
- Documentation and knowledge sharing suggestions
- Tool and framework recommendations
5. Implementation Roadmap:
- Phased approach to addressing identified issues
- Success metrics and monitoring strategies
- Team training and skill development needs
- Timeline and milestone suggestions
Please provide a comprehensive synthesis that transforms technical findings into strategic insights and actionable business recommendations. Include specific examples from the analysis where relevant.
Format your response as structured JSON with clear sections for each synthesis requirement."""
return prompt