Executive Summary
In the modern software development landscape, maintaining code quality, architectural integrity, and alignment with business objectives has become increasingly complex. Traditional code reviews and architecture assessments are time-consuming, often incomplete, and heavily dependent on the availability and expertise of senior architects and developers. This article presents a comprehensive approach to building a sophisticated Code and Architecture Review Agent that leverages Large Language Models, Agentic AI, and advanced information retrieval techniques to automate and enhance the review process.
The system we describe addresses critical challenges in software analysis: limited context memory of AI models, undocumented knowledge residing only in stakeholders' minds, and the complexity of understanding large-scale systems across multiple dimensions including business strategy, requirements, domain models, architecture, code implementation, testing, and deployment. Our approach employs a multi-agent architecture where specialized agents collaborate to provide comprehensive analysis with actionable recommendations delivered through detailed SWOT analyses.
The Challenge Landscape
Software systems today are not merely collections of code files. They represent the materialization of business strategies, the encoding of domain knowledge, the implementation of architectural decisions, and the evolution of requirements over time. Understanding such systems requires analyzing multiple interconnected dimensions simultaneously. A code repository contains thousands or millions of lines of code across multiple programming languages. Architecture Decision Records document the rationale behind structural choices. Requirements specifications capture functional needs, quality attributes, and constraints. Domain models represent the problem and solution spaces. Meeting notes and email exchanges preserve contextual discussions. Interview transcripts capture stakeholder perspectives.
Traditional review approaches struggle with this complexity. Human reviewers cannot maintain all this information in working memory simultaneously. They may lack domain expertise in specific areas. They cannot easily trace connections between business goals and implementation details across large codebases. Critical knowledge often remains undocumented, existing only in the minds of key personnel who may be unavailable or have left the organization.
Two fundamental challenges dominate the technical landscape of automated code and architecture review. The first challenge is limited context memory. Even the most advanced Large Language Models have finite context windows, typically ranging from a few thousand to a few hundred thousand tokens. A medium-sized enterprise application can easily contain millions of tokens worth of code, documentation, and related artifacts. No single LLM invocation can process the entire system at once.
The second challenge is undocumented knowledge. Software systems accumulate implicit knowledge over their lifetime. Why was a particular technology chosen? What business constraint drove a specific architectural decision? What assumptions underlie the domain model? Often this knowledge exists only in stakeholders' minds, never formally documented. An automated review agent cannot access this information directly and must either query stakeholders or make informed assumptions that require validation.
Architectural Overview of the Review Agent System
The Code and Architecture Review Agent employs a multi-agent architecture where specialized agents collaborate to analyze different aspects of the system under review. This approach mirrors how human review teams operate, with different experts focusing on their areas of expertise while a lead coordinator integrates their findings.
At the highest level, a Coordinator Agent manages the entire review process, interacting with users, orchestrating other agents, and implementing human-in-the-loop mechanisms for clarification and validation. Below the coordinator, specialized agents focus on specific domains: Business Goals and Strategy Agent, Requirements Agent, Domain Agents for problem and solution domains, Lead Architect Agent, Architecture Agents for components and subsystems, Developer Agents for code analysis, Test Agents for quality assurance analysis, and Deployment and Operations Agents for understanding system lifecycle management.
Each agent operates with its own Large Language Model instance, selected and potentially fine-tuned for its specific task. Agents communicate through a shared message bus and access information through multiple Retrieval-Augmented Generation vector databases and a GraphRAG knowledge graph that captures relationships between artifacts, decisions, requirements, and business goals.
The system architecture addresses context memory limitations through several complementary techniques. RAG vector databases store code, tests, documentation, and business artifacts in searchable embeddings. GraphRAG captures structural relationships and dependencies. Summarization compresses information while preserving essential details. Multiple smaller prompts replace monolithic queries. Concurrent agent execution parallelizes analysis. Context window sliding processes large artifacts in overlapping segments. Language parsers provide structured representations of code in multiple programming languages.
Technical Foundation: RAG and GraphRAG Infrastructure
The foundation of the review agent system rests on sophisticated information retrieval infrastructure. Traditional Retrieval-Augmented Generation stores documents as vector embeddings, enabling semantic search. When an agent needs information about a specific topic, it queries the vector database with a semantic description, retrieving the most relevant chunks of information. This approach works well for finding documents or code sections related to specific concepts.
However, software systems are fundamentally about relationships. A business goal influences requirements, which drive architecture decisions, which manifest in code structures, which are validated by tests. Understanding these connections requires more than semantic similarity; it requires graph-based reasoning. GraphRAG extends traditional RAG by maintaining an explicit knowledge graph alongside vector embeddings.
In the GraphRAG implementation for code and architecture review, nodes represent entities such as business goals, strategic initiatives, requirements, quality attributes, architecture decisions, components, classes, functions, tests, and deployment configurations. Edges represent relationships such as "goal drives requirement," "requirement influences decision," "decision implemented by component," "component contains class," "class tested by test," and "component deployed to environment."
The vector database implementation uses a hierarchical chunking strategy. Code files are chunked at multiple granularities: individual functions or methods, classes or modules, and complete files. This multi-level chunking allows agents to retrieve information at the appropriate level of detail. When analyzing a specific algorithm, function-level chunks provide precise context. When understanding component structure, class-level chunks offer better overview. When examining cross-cutting concerns, file-level chunks reveal broader patterns.
Here is a simplified example of the chunking strategy for a Python codebase:
import ast
import hashlib
from typing import List, Dict, Tuple
class CodeChunker:
def __init__(self, language: str):
self.language = language
self.parsers = {
'python': self._parse_python,
'java': self._parse_java,
'cpp': self._parse_cpp,
'javascript': self._parse_javascript
}
def chunk_file(self, filepath: str, content: str) -> List[Dict]:
parser = self.parsers.get(self.language)
if not parser:
return self._fallback_chunking(filepath, content)
chunks = []
file_chunk = {
'level': 'file',
'filepath': filepath,
'content': content,
'hash': self._compute_hash(content),
'metadata': {
'lines': len(content.split('\n')),
'language': self.language
}
}
chunks.append(file_chunk)
parsed_elements = parser(content)
for element in parsed_elements:
if element['type'] == 'class':
class_chunk = {
'level': 'class',
'filepath': filepath,
'name': element['name'],
'content': element['content'],
'hash': self._compute_hash(element['content']),
'metadata': {
'start_line': element['start_line'],
'end_line': element['end_line'],
'methods': [m['name'] for m in element.get('methods', [])]
}
}
chunks.append(class_chunk)
for method in element.get('methods', []):
method_chunk = {
'level': 'function',
'filepath': filepath,
'class_name': element['name'],
'name': method['name'],
'content': method['content'],
'hash': self._compute_hash(method['content']),
'metadata': {
'start_line': method['start_line'],
'end_line': method['end_line'],
'parameters': method.get('parameters', []),
'return_type': method.get('return_type')
}
}
chunks.append(method_chunk)
elif element['type'] == 'function':
function_chunk = {
'level': 'function',
'filepath': filepath,
'name': element['name'],
'content': element['content'],
'hash': self._compute_hash(element['content']),
'metadata': {
'start_line': element['start_line'],
'end_line': element['end_line'],
'parameters': element.get('parameters', []),
'return_type': element.get('return_type')
}
}
chunks.append(function_chunk)
return chunks
def _parse_python(self, content: str) -> List[Dict]:
elements = []
try:
tree = ast.parse(content)
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
class_info = {
'type': 'class',
'name': node.name,
'start_line': node.lineno,
'end_line': node.end_lineno,
'content': ast.get_source_segment(content, node),
'methods': []
}
for item in node.body:
if isinstance(item, ast.FunctionDef):
method_info = {
'name': item.name,
'start_line': item.lineno,
'end_line': item.end_lineno,
'content': ast.get_source_segment(content, item),
'parameters': [arg.arg for arg in item.args.args],
'return_type': ast.unparse(item.returns) if item.returns else None
}
class_info['methods'].append(method_info)
elements.append(class_info)
elif isinstance(node, ast.FunctionDef) and not isinstance(node, ast.AsyncFunctionDef):
if not any(isinstance(parent, ast.ClassDef) for parent in ast.walk(tree)):
function_info = {
'type': 'function',
'name': node.name,
'start_line': node.lineno,
'end_line': node.end_lineno,
'content': ast.get_source_segment(content, node),
'parameters': [arg.arg for arg in node.args.args],
'return_type': ast.unparse(node.returns) if node.returns else None
}
elements.append(function_info)
except SyntaxError as e:
print(f"Syntax error parsing Python code: {e}")
return elements
def _parse_java(self, content: str) -> List[Dict]:
# Java parsing would use a Java parser library
# This is a placeholder showing the interface
return []
def _parse_cpp(self, content: str) -> List[Dict]:
# C++ parsing would use a C++ parser library
# This is a placeholder showing the interface
return []
def _parse_javascript(self, content: str) -> List[Dict]:
# JavaScript parsing would use a JavaScript parser library
# This is a placeholder showing the interface
return []
def _fallback_chunking(self, filepath: str, content: str) -> List[Dict]:
lines = content.split('\n')
chunk_size = 50
chunks = []
for i in range(0, len(lines), chunk_size):
chunk_content = '\n'.join(lines[i:i+chunk_size])
chunk = {
'level': 'section',
'filepath': filepath,
'content': chunk_content,
'hash': self._compute_hash(chunk_content),
'metadata': {
'start_line': i + 1,
'end_line': min(i + chunk_size, len(lines)),
'language': self.language
}
}
chunks.append(chunk)
return chunks
def _compute_hash(self, content: str) -> str:
return hashlib.sha256(content.encode()).hexdigest()
This chunking implementation parses code at multiple levels of granularity. For Python code, it uses the abstract syntax tree parser to identify classes, methods, and standalone functions. Each element becomes a separate chunk with metadata about its location, structure, and relationships. The hash allows tracking changes across versions. Similar parsers handle Java, C++, JavaScript, and other languages commonly found in enterprise systems.
The vector database stores these chunks as embeddings generated by a suitable embedding model. For code, specialized models like CodeBERT or GraphCodeBERT often outperform general-purpose text embedding models because they understand programming language syntax and semantics.
The GraphRAG knowledge graph complements the vector database by maintaining explicit relationships. When the chunker processes code, it also extracts dependency information. A Python class that imports another module creates a "depends on" edge in the graph. A function that implements a specific requirement creates an "implements" edge. An architecture decision that affects a component creates an "influences" edge.
Here is an example of the graph construction logic:
from typing import List, Dict, Set, Tuple
import networkx as nx
class KnowledgeGraphBuilder:
def __init__(self):
self.graph = nx.MultiDiGraph()
self.node_types = {
'business_goal', 'strategy', 'requirement', 'quality_attribute',
'architecture_decision', 'component', 'class', 'function',
'test', 'deployment_config', 'domain_entity', 'domain_concept'
}
self.edge_types = {
'drives', 'influences', 'implements', 'depends_on', 'contains',
'tests', 'deployed_to', 'relates_to', 'conflicts_with',
'validates', 'derived_from', 'part_of'
}
def add_node(self, node_id: str, node_type: str, attributes: Dict):
if node_type not in self.node_types:
raise ValueError(f"Unknown node type: {node_type}")
self.graph.add_node(node_id, type=node_type, **attributes)
def add_edge(self, source_id: str, target_id: str, edge_type: str, attributes: Dict = None):
if edge_type not in self.edge_types:
raise ValueError(f"Unknown edge type: {edge_type}")
attrs = attributes or {}
self.graph.add_edge(source_id, target_id, type=edge_type, **attrs)
def build_from_code_chunks(self, chunks: List[Dict]):
for chunk in chunks:
node_id = f"{chunk['filepath']}::{chunk.get('name', 'file')}"
if chunk['level'] == 'file':
self.add_node(node_id, 'component', {
'filepath': chunk['filepath'],
'language': chunk['metadata']['language'],
'hash': chunk['hash']
})
elif chunk['level'] == 'class':
self.add_node(node_id, 'class', {
'name': chunk['name'],
'filepath': chunk['filepath'],
'hash': chunk['hash']
})
file_node_id = f"{chunk['filepath']}::file"
self.add_edge(file_node_id, node_id, 'contains')
elif chunk['level'] == 'function':
self.add_node(node_id, 'function', {
'name': chunk['name'],
'filepath': chunk['filepath'],
'hash': chunk['hash']
})
if 'class_name' in chunk:
class_node_id = f"{chunk['filepath']}::{chunk['class_name']}"
self.add_edge(class_node_id, node_id, 'contains')
else:
file_node_id = f"{chunk['filepath']}::file"
self.add_edge(file_node_id, node_id, 'contains')
def build_from_requirements(self, requirements: List[Dict]):
for req in requirements:
req_id = f"REQ-{req['id']}"
self.add_node(req_id, 'requirement', {
'title': req['title'],
'description': req['description'],
'priority': req.get('priority', 'medium'),
'type': req.get('type', 'functional')
})
if 'business_goal_id' in req:
goal_id = f"GOAL-{req['business_goal_id']}"
self.add_edge(goal_id, req_id, 'drives')
if 'quality_attributes' in req:
for qa in req['quality_attributes']:
qa_id = f"QA-{qa}"
if not self.graph.has_node(qa_id):
self.add_node(qa_id, 'quality_attribute', {'name': qa})
self.add_edge(req_id, qa_id, 'relates_to')
def build_from_architecture_decisions(self, decisions: List[Dict]):
for decision in decisions:
dec_id = f"ADR-{decision['id']}"
self.add_node(dec_id, 'architecture_decision', {
'title': decision['title'],
'status': decision['status'],
'context': decision['context'],
'decision': decision['decision'],
'consequences': decision['consequences']
})
if 'requirements' in decision:
for req_id in decision['requirements']:
full_req_id = f"REQ-{req_id}"
self.add_edge(full_req_id, dec_id, 'influences')
if 'components_affected' in decision:
for component in decision['components_affected']:
self.add_edge(dec_id, component, 'influences')
def link_tests_to_code(self, test_mappings: List[Dict]):
for mapping in test_mappings:
test_id = mapping['test_id']
tested_elements = mapping['tested_elements']
for element_id in tested_elements:
self.add_edge(test_id, element_id, 'tests', {
'coverage': mapping.get('coverage', 'unknown')
})
def find_paths(self, source_id: str, target_id: str, max_length: int = 5) -> List[List[str]]:
try:
paths = list(nx.all_simple_paths(self.graph, source_id, target_id, cutoff=max_length))
return paths
except nx.NetworkXNoPath:
return []
def find_related_nodes(self, node_id: str, edge_type: str = None, max_hops: int = 2) -> Set[str]:
if not self.graph.has_node(node_id):
return set()
related = set()
current_level = {node_id}
for hop in range(max_hops):
next_level = set()
for node in current_level:
for neighbor in self.graph.neighbors(node):
edge_data = self.graph.get_edge_data(node, neighbor)
if edge_type is None or any(e.get('type') == edge_type for e in edge_data.values()):
next_level.add(neighbor)
related.add(neighbor)
current_level = next_level
return related
def get_subgraph_for_component(self, component_id: str) -> nx.MultiDiGraph:
related_nodes = self.find_related_nodes(component_id, max_hops=3)
related_nodes.add(component_id)
return self.graph.subgraph(related_nodes).copy()
def export_to_format(self, format_type: str) -> str:
if format_type == 'graphml':
import io
buffer = io.BytesIO()
nx.write_graphml(self.graph, buffer)
return buffer.getvalue().decode()
elif format_type == 'json':
import json
data = nx.node_link_data(self.graph)
return json.dumps(data, indent=2)
else:
raise ValueError(f"Unsupported format: {format_type}")
This knowledge graph builder creates a rich network of relationships between all artifacts in the system under review. Business goals drive requirements, which influence architecture decisions, which affect components, which contain classes and functions, which are validated by tests. The graph enables sophisticated queries such as "Which business goals are not adequately covered by tests?" or "Which architecture decisions conflict with each other?" or "What is the impact radius of changing this requirement?"
The combination of vector databases for semantic search and knowledge graphs for relationship traversal provides agents with powerful information retrieval capabilities.
When an Architecture Agent needs to understand how a component implements a specific quality attribute, it can query the vector database for semantically similar code patterns and then traverse the graph to find the explicit requirement-to-decision-to-component links.
Context Management Strategies
Managing context effectively is crucial for the review agent system. Even with RAG and GraphRAG infrastructure, agents must carefully orchestrate their information retrieval and processing to stay within LLM context windows while maintaining comprehensive understanding.
The system employs several complementary strategies. Hierarchical summarization creates multi-level abstractions of large artifacts. A complete codebase might be summarized at the system level as "E-commerce platform with microservices architecture," at the component level as "Payment service handling transaction processing," and at the class level as "PaymentProcessor implementing strategy pattern for multiple payment gateways." Agents can navigate this hierarchy, diving deeper only where necessary.
Context window sliding processes large documents in overlapping segments. When analyzing a lengthy architecture document, the agent processes it in chunks with overlap to maintain continuity. Each chunk is analyzed in context of the previous chunk's summary, creating a rolling understanding that accumulates across the entire document.
Incremental processing breaks complex tasks into smaller steps. Rather than asking "Analyze this entire codebase," the Coordinator Agent orchestrates a sequence of focused questions: "Identify the main components," then "For each component, determine its responsibilities," then "For each component, analyze its implementation quality," and so forth. Each step operates within context limits while building toward comprehensive understanding.
Selective retrieval uses the knowledge graph to identify relevant information before retrieving it. When analyzing a specific component, the agent queries the graph to find related requirements, architecture decisions, and tests, then retrieves only those artifacts from the vector database. This targeted approach avoids overwhelming the context window with irrelevant information.
Here is an example of a context management system:
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass
from enum import Enum
class ContextPriority(Enum):
CRITICAL = 1
HIGH = 2
MEDIUM = 3
LOW = 4
@dataclass
class ContextItem:
content: str
priority: ContextPriority
token_count: int
metadata: Dict
class ContextManager:
def __init__(self, max_tokens: int, token_counter: Callable[[str], int]):
self.max_tokens = max_tokens
self.token_counter = token_counter
self.reserved_tokens = int(max_tokens * 0.1)
self.available_tokens = max_tokens - self.reserved_tokens
self.context_items: List[ContextItem] = []
self.system_prompt_tokens = 0
def set_system_prompt(self, prompt: str):
self.system_prompt_tokens = self.token_counter(prompt)
self.available_tokens = self.max_tokens - self.reserved_tokens - self.system_prompt_tokens
def add_item(self, content: str, priority: ContextPriority, metadata: Dict = None) -> bool:
token_count = self.token_counter(content)
if token_count > self.available_tokens:
return False
item = ContextItem(
content=content,
priority=priority,
token_count=token_count,
metadata=metadata or {}
)
self.context_items.append(item)
self.context_items.sort(key=lambda x: x.priority.value)
return self._fit_context()
def _fit_context(self) -> bool:
total_tokens = sum(item.token_count for item in self.context_items)
while total_tokens > self.available_tokens and self.context_items:
removed = self.context_items.pop()
total_tokens -= removed.token_count
return total_tokens <= self.available_tokens
def build_context(self) -> str:
return "\n\n".join(item.content for item in self.context_items)
def get_current_token_count(self) -> int:
return sum(item.token_count for item in self.context_items)
def get_available_tokens(self) -> int:
return self.available_tokens - self.get_current_token_count()
def clear(self):
self.context_items.clear()
def summarize_and_compress(self, summarizer: Callable[[str], str]) -> int:
if not self.context_items:
return 0
tokens_saved = 0
low_priority_items = [item for item in self.context_items if item.priority.value >= ContextPriority.MEDIUM.value]
for item in low_priority_items:
original_tokens = item.token_count
summary = summarizer(item.content)
summary_tokens = self.token_counter(summary)
if summary_tokens < original_tokens:
item.content = summary
item.token_count = summary_tokens
item.metadata['summarized'] = True
tokens_saved += (original_tokens - summary_tokens)
return tokens_saved
class SlidingWindowProcessor:
def __init__(self, window_size: int, overlap: int, token_counter: Callable[[str], int]):
self.window_size = window_size
self.overlap = overlap
self.token_counter = token_counter
def process_document(self, document: str, processor: Callable[[str, Dict], str]) -> List[str]:
chunks = self._create_chunks(document)
results = []
previous_summary = None
for i, chunk in enumerate(chunks):
context = {
'chunk_index': i,
'total_chunks': len(chunks),
'previous_summary': previous_summary
}
result = processor(chunk, context)
results.append(result)
if i < len(chunks) - 1:
previous_summary = self._summarize_result(result)
return results
def _create_chunks(self, document: str) -> List[str]:
lines = document.split('\n')
chunks = []
current_chunk = []
current_tokens = 0
for line in lines:
line_tokens = self.token_counter(line)
if current_tokens + line_tokens > self.window_size and current_chunk:
chunks.append('\n'.join(current_chunk))
overlap_lines = int(len(current_chunk) * (self.overlap / self.window_size))
current_chunk = current_chunk[-overlap_lines:] if overlap_lines > 0 else []
current_tokens = sum(self.token_counter(l) for l in current_chunk)
current_chunk.append(line)
current_tokens += line_tokens
if current_chunk:
chunks.append('\n'.join(current_chunk))
return chunks
def _summarize_result(self, result: str) -> str:
sentences = result.split('.')
return '. '.join(sentences[:3]) + '.' if len(sentences) > 3 else result
class CodeContextRetriever:
def __init__(self, vector_db, knowledge_graph, context_manager: ContextManager):
self.vector_db = vector_db
self.knowledge_graph = knowledge_graph
self.context_manager = context_manager
def retrieve_for_component_analysis(self, component_id: str) -> str:
self.context_manager.clear()
component_code = self.vector_db.get_by_id(component_id)
if component_code:
self.context_manager.add_item(
f"Component Code:\n{component_code}",
ContextPriority.CRITICAL,
{'type': 'code', 'component_id': component_id}
)
related_requirements = self.knowledge_graph.find_related_nodes(
component_id,
edge_type='implements',
max_hops=2
)
for req_id in list(related_requirements)[:3]:
req_doc = self.vector_db.get_by_id(req_id)
if req_doc:
self.context_manager.add_item(
f"Requirement {req_id}:\n{req_doc}",
ContextPriority.HIGH,
{'type': 'requirement', 'id': req_id}
)
related_decisions = self.knowledge_graph.find_related_nodes(
component_id,
edge_type='influences',
max_hops=1
)
for dec_id in list(related_decisions)[:2]:
dec_doc = self.vector_db.get_by_id(dec_id)
if dec_doc:
self.context_manager.add_item(
f"Architecture Decision {dec_id}:\n{dec_doc}",
ContextPriority.HIGH,
{'type': 'decision', 'id': dec_id}
)
related_tests = self.knowledge_graph.find_related_nodes(
component_id,
edge_type='tests',
max_hops=1
)
for test_id in list(related_tests)[:2]:
test_code = self.vector_db.get_by_id(test_id)
if test_code:
self.context_manager.add_item(
f"Test {test_id}:\n{test_code}",
ContextPriority.MEDIUM,
{'type': 'test', 'id': test_id}
)
dependencies = self.knowledge_graph.find_related_nodes(
component_id,
edge_type='depends_on',
max_hops=1
)
for dep_id in list(dependencies)[:3]:
dep_summary = self.vector_db.get_summary(dep_id)
if dep_summary:
self.context_manager.add_item(
f"Dependency {dep_id} Summary:\n{dep_summary}",
ContextPriority.LOW,
{'type': 'dependency', 'id': dep_id}
)
return self.context_manager.build_context()
def retrieve_for_requirement_tracing(self, requirement_id: str) -> str:
self.context_manager.clear()
req_doc = self.vector_db.get_by_id(requirement_id)
if req_doc:
self.context_manager.add_item(
f"Requirement:\n{req_doc}",
ContextPriority.CRITICAL,
{'type': 'requirement', 'id': requirement_id}
)
implementing_components = self.knowledge_graph.find_related_nodes(
requirement_id,
edge_type='implements',
max_hops=2
)
for comp_id in implementing_components:
comp_summary = self.vector_db.get_summary(comp_id)
if comp_summary:
self.context_manager.add_item(
f"Implementing Component {comp_id}:\n{comp_summary}",
ContextPriority.HIGH,
{'type': 'component', 'id': comp_id}
)
related_decisions = self.knowledge_graph.find_related_nodes(
requirement_id,
edge_type='influences',
max_hops=1
)
for dec_id in related_decisions:
dec_doc = self.vector_db.get_by_id(dec_id)
if dec_doc:
self.context_manager.add_item(
f"Related Decision {dec_id}:\n{dec_doc}",
ContextPriority.MEDIUM,
{'type': 'decision', 'id': dec_id}
)
return self.context_manager.build_context()
This context management system prioritizes information based on relevance and criticality. When analyzing a component, the component's own code receives critical priority, related requirements receive high priority, architecture decisions receive high priority, tests receive medium priority, and dependency summaries receive low priority. If the context window fills up, lower priority items are removed first. The system can also invoke summarization to compress lower priority items, freeing space for additional critical information.
The sliding window processor handles documents too large to fit in a single context window. It breaks the document into overlapping chunks, processes each chunk with awareness of the previous chunk's summary, and accumulates understanding across the entire document. This approach is particularly valuable for processing lengthy architecture documents, extensive meeting notes, or large email threads.
The code context retriever demonstrates how agents use the knowledge graph to identify relevant information before retrieving it from the vector database. When analyzing a component, it traverses the graph to find related requirements, architecture decisions, tests, and dependencies, then retrieves only those specific artifacts. This targeted retrieval ensures that the context window contains the most relevant information for the analysis task.
Multi-Agent Architecture Implementation
The multi-agent architecture distributes analysis responsibilities across specialized agents, each with expertise in a specific domain. This mirrors how human review teams operate, with business analysts, requirements engineers, domain experts, architects, developers, testers, and operations specialists collaborating to provide comprehensive assessment.
The Coordinator Agent serves as the orchestrator and user interface. It receives review requests from users, decomposes them into subtasks, assigns subtasks to appropriate specialized agents, aggregates their findings, resolves conflicts, and presents integrated results. The coordinator also implements human-in-the-loop mechanisms, querying stakeholders when agents encounter ambiguities or need validation of assumptions.
The Business Goals and Strategy Agent analyzes business documentation to understand the strategic objectives driving the system. It identifies business goals, strategic initiatives, success metrics, and constraints. It examines business cases, strategic planning documents, executive presentations, and market analysis. This agent determines whether the system's architecture and implementation align with business objectives and identifies misalignments or missed opportunities.
The Requirements Agent processes requirements specifications, user stories, use cases, and any other requirements artifacts. It identifies functional requirements, quality attributes, constraints, and priorities. It builds a comprehensive requirements model and traces requirements to architecture decisions and implementation artifacts. This agent detects incomplete, ambiguous, conflicting, or untestable requirements.
Domain Agents specialize in understanding the problem and solution domains. If Domain-Driven Design artifacts exist, these agents analyze them to understand bounded contexts, aggregates, entities, value objects, domain events, and domain services. If such artifacts do not exist, the agents construct domain models from code, documentation, and their training data. Domain agents identify domain complexity, model-code alignment issues, and opportunities for better domain modeling.
The Lead Architect Agent coordinates multiple Architecture Agents, each responsible for analyzing a specific component or subsystem. The Lead Architect Agent receives findings from individual Architecture Agents, integrates them into a coherent system-level architectural view, identifies cross-cutting concerns, detects architectural inconsistencies, and assesses overall architectural quality. It understands architectural styles, patterns, and quality attributes.
Architecture Agents analyze individual components or subsystems. Each Architecture Agent examines component structure, identifies architectural patterns used, assesses pattern application correctness, analyzes component interactions and dependencies, evaluates quality attribute satisfaction, and identifies architectural issues. These agents understand common architectural patterns such as layered architecture, microservices, event-driven architecture, hexagonal architecture, and domain-driven design tactical patterns.
Developer Agents support Architecture Agents by performing detailed code analysis. They examine code quality, identify code smells and anti-patterns, assess adherence to coding standards, analyze algorithmic complexity, detect security vulnerabilities, and evaluate maintainability. Developer Agents understand multiple programming languages and can parse and analyze code in Python, Java, C++, JavaScript, and other languages commonly used in enterprise systems.
Test Agents analyze testing artifacts including unit tests, integration tests, system tests, performance tests, and security tests. They assess test coverage, evaluate test quality, identify untested scenarios, analyze test strategy effectiveness, and detect issues in test implementation. Test Agents understand testing frameworks, mocking strategies, and test design patterns.
Deployment and Operations Agents examine how the system is deployed, monitored, maintained, and evolved. They analyze deployment configurations, infrastructure as code, monitoring and logging setups, operational procedures, and deployment automation. These agents identify operational risks, deployment issues, monitoring gaps, and opportunities for improved operational excellence.
Here is an example implementation of the agent framework:
from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
import asyncio
from datetime import datetime
class AgentRole(Enum):
COORDINATOR = "coordinator"
BUSINESS_STRATEGY = "business_strategy"
REQUIREMENTS = "requirements"
DOMAIN = "domain"
LEAD_ARCHITECT = "lead_architect"
ARCHITECTURE = "architecture"
DEVELOPER = "developer"
TEST = "test"
DEPLOYMENT_OPS = "deployment_ops"
class FindingSeverity(Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
@dataclass
class Finding:
id: str
agent_id: str
severity: FindingSeverity
category: str
title: str
description: str
location: str
evidence: List[str]
recommendation: str
related_findings: List[str] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class AgentTask:
task_id: str
task_type: str
parameters: Dict[str, Any]
priority: int
assigned_to: Optional[str] = None
status: str = "pending"
result: Optional[Any] = None
created_at: datetime = field(default_factory=datetime.now)
@dataclass
class AgentMessage:
sender_id: str
receiver_id: str
message_type: str
content: Dict[str, Any]
timestamp: datetime = field(default_factory=datetime.now)
class Agent(ABC):
def __init__(self, agent_id: str, role: AgentRole, llm_client, context_manager: ContextManager,
vector_db, knowledge_graph):
self.agent_id = agent_id
self.role = role
self.llm_client = llm_client
self.context_manager = context_manager
self.vector_db = vector_db
self.knowledge_graph = knowledge_graph
self.findings: List[Finding] = []
self.task_queue: asyncio.Queue = asyncio.Queue()
self.message_queue: asyncio.Queue = asyncio.Queue()
self.running = False
@abstractmethod
async def process_task(self, task: AgentTask) -> Any:
pass
@abstractmethod
def get_system_prompt(self) -> str:
pass
async def start(self):
self.running = True
await asyncio.gather(
self._process_tasks(),
self._process_messages()
)
async def stop(self):
self.running = False
async def _process_tasks(self):
while self.running:
try:
task = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)
task.status = "processing"
result = await self.process_task(task)
task.result = result
task.status = "completed"
except asyncio.TimeoutError:
continue
except Exception as e:
if task:
task.status = "failed"
task.result = {"error": str(e)}
async def _process_messages(self):
while self.running:
try:
message = await asyncio.wait_for(self.message_queue.get(), timeout=1.0)
await self.handle_message(message)
except asyncio.TimeoutError:
continue
async def handle_message(self, message: AgentMessage):
pass
async def submit_task(self, task: AgentTask):
task.assigned_to = self.agent_id
await self.task_queue.put(task)
async def send_message(self, receiver_id: str, message_type: str, content: Dict[str, Any]):
message = AgentMessage(
sender_id=self.agent_id,
receiver_id=receiver_id,
message_type=message_type,
content=content
)
return message
def add_finding(self, severity: FindingSeverity, category: str, title: str,
description: str, location: str, evidence: List[str],
recommendation: str, metadata: Dict = None):
finding = Finding(
id=f"{self.agent_id}_{len(self.findings)}",
agent_id=self.agent_id,
severity=severity,
category=category,
title=title,
description=description,
location=location,
evidence=evidence,
recommendation=recommendation,
metadata=metadata or {}
)
self.findings.append(finding)
return finding
async def query_llm(self, prompt: str, context: str = "") -> str:
system_prompt = self.get_system_prompt()
full_prompt = f"{system_prompt}\n\nContext:\n{context}\n\nQuery:\n{prompt}"
response = await self.llm_client.generate(full_prompt)
return response
async def analyze_with_chain_of_thought(self, problem: str, context: str) -> Dict[str, Any]:
cot_prompt = f"""
Analyze the following problem using chain of thought reasoning.
Break down your analysis into clear steps.
Problem: {problem}
Please provide:
1. Understanding of the problem
2. Key factors to consider
3. Step-by-step analysis
4. Conclusion and recommendations
"""
response = await self.query_llm(cot_prompt, context)
return {
'reasoning': response,
'problem': problem
}
async def analyze_with_tree_of_thought(self, problem: str, context: str, num_branches: int = 3) -> Dict[str, Any]:
tot_prompt = f"""
Analyze the following problem by exploring multiple reasoning paths.
Generate {num_branches} different approaches to solving this problem.
Problem: {problem}
For each approach, provide:
1. The reasoning path
2. Strengths of this approach
3. Weaknesses of this approach
4. Confidence level (0-1)
Then synthesize the best elements from all approaches into a final recommendation.
"""
response = await self.query_llm(tot_prompt, context)
return {
'exploration': response,
'problem': problem
}
class ArchitectureAgent(Agent):
def __init__(self, agent_id: str, llm_client, context_manager: ContextManager,
vector_db, knowledge_graph, component_id: str):
super().__init__(agent_id, AgentRole.ARCHITECTURE, llm_client,
context_manager, vector_db, knowledge_graph)
self.component_id = component_id
self.architectural_patterns = [
'Layered Architecture', 'Microservices', 'Event-Driven',
'Hexagonal Architecture', 'CQRS', 'Event Sourcing',
'Repository Pattern', 'Factory Pattern', 'Strategy Pattern',
'Observer Pattern', 'Decorator Pattern', 'Adapter Pattern'
]
def get_system_prompt(self) -> str:
return f"""
You are an expert software architect analyzing component {self.component_id}.
Your expertise includes:
- Architectural patterns and styles
- Design principles (SOLID, DRY, KISS, YAGNI)
- Quality attributes (performance, scalability, maintainability, security)
- Component design and interaction patterns
- Dependency management
Your task is to analyze the component's architecture and identify:
- Architectural patterns used
- Correctness of pattern application
- Violations of design principles
- Quality attribute concerns
- Dependency issues
- Improvement opportunities
Provide specific, actionable recommendations for each finding.
"""
async def process_task(self, task: AgentTask) -> Any:
if task.task_type == "analyze_component":
return await self.analyze_component()
elif task.task_type == "identify_patterns":
return await self.identify_patterns()
elif task.task_type == "assess_quality_attributes":
return await self.assess_quality_attributes()
else:
return {"error": f"Unknown task type: {task.task_type}"}
async def analyze_component(self) -> Dict[str, Any]:
context_retriever = CodeContextRetriever(
self.vector_db,
self.knowledge_graph,
self.context_manager
)
context = context_retriever.retrieve_for_component_analysis(self.component_id)
analysis_prompt = f"""
Analyze the architecture of component {self.component_id}.
Provide a comprehensive analysis including:
1. Component structure and organization
2. Architectural patterns identified
3. Design principles adherence
4. Dependency analysis
5. Quality attributes assessment
6. Issues and concerns
7. Recommendations for improvement
"""
analysis = await self.analyze_with_chain_of_thought(analysis_prompt, context)
await self.extract_findings_from_analysis(analysis['reasoning'])
return {
'component_id': self.component_id,
'analysis': analysis,
'findings_count': len(self.findings)
}
async def identify_patterns(self) -> Dict[str, List[str]]:
context_retriever = CodeContextRetriever(
self.vector_db,
self.knowledge_graph,
self.context_manager
)
context = context_retriever.retrieve_for_component_analysis(self.component_id)
pattern_prompt = f"""
Identify architectural and design patterns used in this component.
For each pattern found, explain:
1. Where it is used
2. How it is implemented
3. Whether it is correctly applied
4. Any issues with the implementation
Known patterns to look for: {', '.join(self.architectural_patterns)}
"""
response = await self.query_llm(pattern_prompt, context)
return {
'patterns_analysis': response
}
async def assess_quality_attributes(self) -> Dict[str, Any]:
quality_attributes = [
'Performance', 'Scalability', 'Maintainability',
'Security', 'Reliability', 'Testability'
]
assessments = {}
for qa in quality_attributes:
assessment = await self.assess_single_quality_attribute(qa)
assessments[qa] = assessment
return assessments
async def assess_single_quality_attribute(self, quality_attribute: str) -> Dict[str, Any]:
context_retriever = CodeContextRetriever(
self.vector_db,
self.knowledge_graph,
self.context_manager
)
context = context_retriever.retrieve_for_component_analysis(self.component_id)
qa_prompt = f"""
Assess the {quality_attribute} of component {self.component_id}.
Provide:
1. Current state assessment
2. Specific concerns or issues
3. Evidence from the code
4. Recommendations for improvement
5. Priority level for addressing issues
"""
response = await self.query_llm(qa_prompt, context)
return {
'quality_attribute': quality_attribute,
'assessment': response
}
async def extract_findings_from_analysis(self, analysis_text: str):
extraction_prompt = f"""
Extract specific findings from this analysis.
For each finding, provide:
- Severity (critical/high/medium/low/info)
- Category
- Title (brief)
- Description
- Location in code
- Evidence
- Recommendation
Analysis:
{analysis_text}
Format as JSON array.
"""
response = await self.llm_client.generate(extraction_prompt)
import json
try:
findings_data = json.loads(response)
for finding_data in findings_data:
self.add_finding(
severity=FindingSeverity[finding_data['severity'].upper()],
category=finding_data['category'],
title=finding_data['title'],
description=finding_data['description'],
location=finding_data['location'],
evidence=finding_data['evidence'],
recommendation=finding_data['recommendation']
)
except json.JSONDecodeError:
pass
class RequirementsAgent(Agent):
def __init__(self, agent_id: str, llm_client, context_manager: ContextManager,
vector_db, knowledge_graph):
super().__init__(agent_id, AgentRole.REQUIREMENTS, llm_client,
context_manager, vector_db, knowledge_graph)
def get_system_prompt(self) -> str:
return """
You are an expert requirements engineer analyzing system requirements.
Your expertise includes:
- Requirements elicitation and analysis
- Requirements quality assessment
- Requirements traceability
- Conflict detection
- Completeness and consistency analysis
Your task is to analyze requirements and identify:
- Incomplete requirements
- Ambiguous requirements
- Conflicting requirements
- Untestable requirements
- Missing requirements
- Requirements not traced to implementation
- Requirements priority issues
Provide specific recommendations for improving requirements quality.
"""
async def process_task(self, task: AgentTask) -> Any:
if task.task_type == "analyze_requirements":
return await self.analyze_requirements()
elif task.task_type == "trace_requirements":
return await self.trace_requirements()
elif task.task_type == "detect_conflicts":
return await self.detect_conflicts()
else:
return {"error": f"Unknown task type: {task.task_type}"}
async def analyze_requirements(self) -> Dict[str, Any]:
requirements = self.vector_db.query_by_type('requirement')
analysis_results = []
for req in requirements:
req_analysis = await self.analyze_single_requirement(req)
analysis_results.append(req_analysis)
return {
'total_requirements': len(requirements),
'analyses': analysis_results,
'findings_count': len(self.findings)
}
async def analyze_single_requirement(self, requirement: Dict) -> Dict[str, Any]:
req_text = requirement.get('content', '')
req_id = requirement.get('id', '')
analysis_prompt = f"""
Analyze this requirement for quality issues:
Requirement ID: {req_id}
Requirement: {req_text}
Check for:
1. Completeness - does it fully specify what is needed?
2. Clarity - is it unambiguous?
3. Testability - can it be verified?
4. Consistency - does it conflict with other requirements?
5. Feasibility - is it realistic?
6. Priority - is priority appropriately assigned?
Identify any issues and provide recommendations.
"""
response = await self.query_llm(analysis_prompt, "")
return {
'requirement_id': req_id,
'analysis': response
}
async def trace_requirements(self) -> Dict[str, Any]:
requirements = self.vector_db.query_by_type('requirement')
traceability_results = []
for req in requirements:
req_id = req.get('id', '')
implementing_components = self.knowledge_graph.find_related_nodes(
req_id,
edge_type='implements',
max_hops=3
)
related_tests = self.knowledge_graph.find_related_nodes(
req_id,
edge_type='validates',
max_hops=3
)
trace_result = {
'requirement_id': req_id,
'implemented_by': list(implementing_components),
'validated_by': list(related_tests),
'is_implemented': len(implementing_components) > 0,
'is_tested': len(related_tests) > 0
}
if not trace_result['is_implemented']:
self.add_finding(
severity=FindingSeverity.HIGH,
category="Requirements Traceability",
title=f"Requirement {req_id} not implemented",
description=f"Requirement {req_id} has no implementing components",
location=req_id,
evidence=[f"No components found implementing {req_id}"],
recommendation="Implement this requirement or remove it if no longer needed"
)
if not trace_result['is_tested']:
self.add_finding(
severity=FindingSeverity.MEDIUM,
category="Requirements Traceability",
title=f"Requirement {req_id} not tested",
description=f"Requirement {req_id} has no validating tests",
location=req_id,
evidence=[f"No tests found validating {req_id}"],
recommendation="Add tests to validate this requirement"
)
traceability_results.append(trace_result)
return {
'traceability_matrix': traceability_results,
'unimplemented_count': sum(1 for r in traceability_results if not r['is_implemented']),
'untested_count': sum(1 for r in traceability_results if not r['is_tested'])
}
async def detect_conflicts(self) -> Dict[str, Any]:
requirements = self.vector_db.query_by_type('requirement')
conflicts = []
for i, req1 in enumerate(requirements):
for req2 in requirements[i+1:]:
conflict = await self.check_requirement_conflict(req1, req2)
if conflict:
conflicts.append(conflict)
return {
'conflicts_found': len(conflicts),
'conflicts': conflicts
}
async def check_requirement_conflict(self, req1: Dict, req2: Dict) -> Optional[Dict]:
conflict_prompt = f"""
Determine if these two requirements conflict with each other:
Requirement 1 ({req1.get('id')}): {req1.get('content')}
Requirement 2 ({req2.get('id')}): {req2.get('content')}
If they conflict, explain:
1. Nature of the conflict
2. Why they cannot both be satisfied
3. Recommendation for resolution
If no conflict, respond with "NO_CONFLICT"
"""
response = await self.query_llm(conflict_prompt, "")
if "NO_CONFLICT" not in response:
self.add_finding(
severity=FindingSeverity.HIGH,
category="Requirements Conflict",
title=f"Conflict between {req1.get('id')} and {req2.get('id')}",
description=response,
location=f"{req1.get('id')}, {req2.get('id')}",
evidence=[req1.get('content', ''), req2.get('content', '')],
recommendation="Resolve conflict through stakeholder discussion"
)
return {
'req1_id': req1.get('id'),
'req2_id': req2.get('id'),
'conflict_description': response
}
return None
class CoordinatorAgent(Agent):
def __init__(self, agent_id: str, llm_client, context_manager: ContextManager,
vector_db, knowledge_graph):
super().__init__(agent_id, AgentRole.COORDINATOR, llm_client,
context_manager, vector_db, knowledge_graph)
self.specialized_agents: Dict[str, Agent] = {}
self.review_plan: Optional[Dict] = None
def get_system_prompt(self) -> str:
return """
You are the coordinator agent orchestrating a comprehensive code and architecture review.
Your responsibilities include:
- Planning the review process
- Coordinating specialized agents
- Aggregating findings
- Resolving conflicts
- Interacting with human stakeholders
- Producing final reports
You have access to specialized agents for:
- Business strategy analysis
- Requirements analysis
- Domain modeling
- Architecture analysis
- Code analysis
- Test analysis
- Deployment and operations analysis
Coordinate these agents effectively to produce comprehensive review results.
"""
def register_agent(self, agent: Agent):
self.specialized_agents[agent.agent_id] = agent
async def process_task(self, task: AgentTask) -> Any:
if task.task_type == "conduct_review":
return await self.conduct_review(task.parameters)
elif task.task_type == "generate_report":
return await self.generate_report()
else:
return {"error": f"Unknown task type: {task.task_type}"}
async def conduct_review(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
self.review_plan = await self.create_review_plan(parameters)
tasks = self.review_plan['tasks']
task_futures = []
for task_spec in tasks:
agent_id = task_spec['agent_id']
agent = self.specialized_agents.get(agent_id)
if agent:
task = AgentTask(
task_id=task_spec['task_id'],
task_type=task_spec['task_type'],
parameters=task_spec.get('parameters', {}),
priority=task_spec.get('priority', 5)
)
task_future = agent.submit_task(task)
task_futures.append(task_future)
await asyncio.gather(*task_futures)
all_findings = self.aggregate_findings()
return {
'review_completed': True,
'total_findings': len(all_findings),
'findings_by_severity': self.categorize_findings_by_severity(all_findings)
}
async def create_review_plan(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
plan_prompt = f"""
Create a comprehensive review plan for the following system:
Parameters: {parameters}
Available agents:
{list(self.specialized_agents.keys())}
Create a plan that includes:
1. Sequence of analysis tasks
2. Agent assignments
3. Dependencies between tasks
4. Priority levels
Format as JSON.
"""
response = await self.query_llm(plan_prompt, "")
import json
try:
plan = json.loads(response)
return plan
except json.JSONDecodeError:
return {
'tasks': [
{'task_id': 'req_analysis', 'agent_id': 'requirements_agent', 'task_type': 'analyze_requirements', 'priority': 1},
{'task_id': 'arch_analysis', 'agent_id': 'lead_architect', 'task_type': 'analyze_architecture', 'priority': 2}
]
}
def aggregate_findings(self) -> List[Finding]:
all_findings = []
for agent in self.specialized_agents.values():
all_findings.extend(agent.findings)
return all_findings
def categorize_findings_by_severity(self, findings: List[Finding]) -> Dict[str, int]:
categorization = {severity.value: 0 for severity in FindingSeverity}
for finding in findings:
categorization[finding.severity.value] += 1
return categorization
async def generate_report(self) -> Dict[str, Any]:
all_findings = self.aggregate_findings()
swot_analysis = await self.generate_swot_analysis(all_findings)
report = {
'executive_summary': await self.generate_executive_summary(all_findings),
'swot_analysis': swot_analysis,
'findings': [self.format_finding(f) for f in all_findings],
'recommendations': await self.generate_recommendations(all_findings),
'metrics': self.calculate_metrics(all_findings)
}
return report
async def generate_swot_analysis(self, findings: List[Finding]) -> Dict[str, List[str]]:
findings_text = "\n".join([f"{f.title}: {f.description}" for f in findings[:50]])
swot_prompt = f"""
Based on these review findings, generate a SWOT analysis:
Findings:
{findings_text}
Provide:
- Strengths: What the system does well
- Weaknesses: Areas needing improvement
- Opportunities: Potential enhancements
- Threats: Risks and concerns
Format as JSON with arrays for each category.
"""
response = await self.query_llm(swot_prompt, "")
import json
try:
swot = json.loads(response)
return swot
except json.JSONDecodeError:
return {
'strengths': [],
'weaknesses': [],
'opportunities': [],
'threats': []
}
async def generate_executive_summary(self, findings: List[Finding]) -> str:
summary_prompt = f"""
Generate an executive summary of this code and architecture review.
Total findings: {len(findings)}
Critical: {sum(1 for f in findings if f.severity == FindingSeverity.CRITICAL)}
High: {sum(1 for f in findings if f.severity == FindingSeverity.HIGH)}
Medium: {sum(1 for f in findings if f.severity == FindingSeverity.MEDIUM)}
Low: {sum(1 for f in findings if f.severity == FindingSeverity.LOW)}
Provide a concise summary suitable for decision makers.
"""
response = await self.query_llm(summary_prompt, "")
return response
async def generate_recommendations(self, findings: List[Finding]) -> List[Dict[str, Any]]:
critical_findings = [f for f in findings if f.severity == FindingSeverity.CRITICAL]
high_findings = [f for f in findings if f.severity == FindingSeverity.HIGH]
priority_findings = critical_findings + high_findings
recommendations = []
for finding in priority_findings[:20]:
recommendations.append({
'finding_id': finding.id,
'title': finding.title,
'recommendation': finding.recommendation,
'priority': finding.severity.value
})
return recommendations
def format_finding(self, finding: Finding) -> Dict[str, Any]:
return {
'id': finding.id,
'severity': finding.severity.value,
'category': finding.category,
'title': finding.title,
'description': finding.description,
'location': finding.location,
'evidence': finding.evidence,
'recommendation': finding.recommendation,
'timestamp': finding.timestamp.isoformat()
}
def calculate_metrics(self, findings: List[Finding]) -> Dict[str, Any]:
return {
'total_findings': len(findings),
'by_severity': self.categorize_findings_by_severity(findings),
'by_category': self.categorize_findings_by_category(findings),
'by_agent': self.categorize_findings_by_agent(findings)
}
def categorize_findings_by_category(self, findings: List[Finding]) -> Dict[str, int]:
categories = {}
for finding in findings:
categories[finding.category] = categories.get(finding.category, 0) + 1
return categories
def categorize_findings_by_agent(self, findings: List[Finding]) -> Dict[str, int]:
agents = {}
for finding in findings:
agents[finding.agent_id] = agents.get(finding.agent_id, 0) + 1
return agents
This multi-agent framework provides the foundation for distributed analysis. The Coordinator Agent orchestrates the review process, creating a review plan, assigning tasks to specialized agents, and aggregating their findings into a comprehensive report. Architecture Agents analyze components using chain-of-thought reasoning and pattern recognition. Requirements Agents assess requirements quality and traceability. Each agent operates independently with its own LLM instance, enabling parallel processing and specialized expertise.
The agent communication infrastructure uses asynchronous message queues, allowing agents to coordinate without blocking. Agents can send messages to request information, report findings, or coordinate activities. The task queue enables the coordinator to distribute work across agents efficiently.
The finding data structure captures all essential information about issues discovered during review. Each finding includes severity, category, title, description, location, evidence, and recommendation. Findings can be linked to related findings, enabling the system to identify clusters of related issues or root causes affecting multiple areas.
Handling Undocumented Knowledge
One of the most challenging aspects of automated code and architecture review is dealing with undocumented knowledge that exists only in stakeholders' minds. The review agent system addresses this through a combination of inference, assumption validation, and human-in-the-loop interaction.
When the system encounters gaps in documentation, it first attempts to infer missing information from available artifacts. For example, if architecture decision records do not explain why a particular technology was chosen, the system might infer the rationale by analyzing the requirements, identifying quality attributes that the technology satisfies, and examining the historical context when the decision was made. The system then presents this inference to stakeholders for validation.
The human-in-the-loop mechanism allows agents to query stakeholders when they encounter ambiguities or need clarification. The Coordinator Agent maintains a list of questions that arise during analysis. Rather than interrupting stakeholders immediately for each question, it batches questions and presents them at appropriate intervals. This respects stakeholders' time while ensuring critical information is obtained.
When stakeholders cannot provide answers, the system makes explicit assumptions and documents them. These assumptions are marked clearly in the review report, allowing readers to understand what information was inferred versus what was explicitly documented. The system also assigns confidence levels to its inferences, helping stakeholders prioritize which assumptions to validate.
Here is an example of the assumption management system:
from typing import List, Dict, Optional
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
class AssumptionConfidence(Enum):
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
class AssumptionStatus(Enum):
PENDING = "pending"
VALIDATED = "validated"
REJECTED = "rejected"
UNKNOWN = "unknown"
@dataclass
class Assumption:
id: str
category: str
description: str
rationale: str
confidence: AssumptionConfidence
evidence: List[str]
impact: str
status: AssumptionStatus = AssumptionStatus.PENDING
validation_notes: Optional[str] = None
created_by: str = ""
created_at: datetime = field(default_factory=datetime.now)
@dataclass
class StakeholderQuestion:
id: str
question: str
context: str
priority: int
asked_by: str
category: str
related_assumptions: List[str] = field(default_factory=list)
answer: Optional[str] = None
answered_by: Optional[str] = None
answered_at: Optional[datetime] = None
class AssumptionManager:
def __init__(self):
self.assumptions: Dict[str, Assumption] = {}
self.questions: Dict[str, StakeholderQuestion] = {}
self.assumption_counter = 0
self.question_counter = 0
def create_assumption(self, category: str, description: str, rationale: str,
confidence: AssumptionConfidence, evidence: List[str],
impact: str, created_by: str) -> Assumption:
self.assumption_counter += 1
assumption_id = f"ASSUME-{self.assumption_counter:04d}"
assumption = Assumption(
id=assumption_id,
category=category,
description=description,
rationale=rationale,
confidence=confidence,
evidence=evidence,
impact=impact,
created_by=created_by
)
self.assumptions[assumption_id] = assumption
return assumption
def create_question(self, question: str, context: str, priority: int,
asked_by: str, category: str,
related_assumptions: List[str] = None) -> StakeholderQuestion:
self.question_counter += 1
question_id = f"Q-{self.question_counter:04d}"
stakeholder_question = StakeholderQuestion(
id=question_id,
question=question,
context=context,
priority=priority,
asked_by=asked_by,
category=category,
related_assumptions=related_assumptions or []
)
self.questions[question_id] = stakeholder_question
return stakeholder_question
def answer_question(self, question_id: str, answer: str, answered_by: str):
if question_id in self.questions:
question = self.questions[question_id]
question.answer = answer
question.answered_by = answered_by
question.answered_at = datetime.now()
for assumption_id in question.related_assumptions:
if assumption_id in self.assumptions:
assumption = self.assumptions[assumption_id]
assumption.status = AssumptionStatus.VALIDATED
assumption.validation_notes = f"Validated via question {question_id}: {answer}"
def validate_assumption(self, assumption_id: str, is_valid: bool, notes: str):
if assumption_id in self.assumptions:
assumption = self.assumptions[assumption_id]
assumption.status = AssumptionStatus.VALIDATED if is_valid else AssumptionStatus.REJECTED
assumption.validation_notes = notes
def get_pending_assumptions(self) -> List[Assumption]:
return [a for a in self.assumptions.values() if a.status == AssumptionStatus.PENDING]
def get_unanswered_questions(self) -> List[StakeholderQuestion]:
return [q for q in self.questions.values() if q.answer is None]
def get_high_priority_questions(self, limit: int = 10) -> List[StakeholderQuestion]:
unanswered = self.get_unanswered_questions()
sorted_questions = sorted(unanswered, key=lambda q: q.priority, reverse=True)
return sorted_questions[:limit]
def generate_assumption_report(self) -> Dict[str, Any]:
return {
'total_assumptions': len(self.assumptions),
'by_status': {
'pending': len([a for a in self.assumptions.values() if a.status == AssumptionStatus.PENDING]),
'validated': len([a for a in self.assumptions.values() if a.status == AssumptionStatus.VALIDATED]),
'rejected': len([a for a in self.assumptions.values() if a.status == AssumptionStatus.REJECTED])
},
'by_confidence': {
'high': len([a for a in self.assumptions.values() if a.confidence == AssumptionConfidence.HIGH]),
'medium': len([a for a in self.assumptions.values() if a.confidence == AssumptionConfidence.MEDIUM]),
'low': len([a for a in self.assumptions.values() if a.confidence == AssumptionConfidence.LOW])
},
'assumptions': [self._format_assumption(a) for a in self.assumptions.values()]
}
def _format_assumption(self, assumption: Assumption) -> Dict[str, Any]:
return {
'id': assumption.id,
'category': assumption.category,
'description': assumption.description,
'rationale': assumption.rationale,
'confidence': assumption.confidence.value,
'evidence': assumption.evidence,
'impact': assumption.impact,
'status': assumption.status.value,
'validation_notes': assumption.validation_notes,
'created_by': assumption.created_by,
'created_at': assumption.created_at.isoformat()
}
class HumanInTheLoopInterface:
def __init__(self, assumption_manager: AssumptionManager):
self.assumption_manager = assumption_manager
self.interaction_log: List[Dict] = []
async def request_clarification(self, agent_id: str, question: str,
context: str, priority: int,
category: str) -> Optional[str]:
stakeholder_question = self.assumption_manager.create_question(
question=question,
context=context,
priority=priority,
asked_by=agent_id,
category=category
)
self.interaction_log.append({
'type': 'question_created',
'question_id': stakeholder_question.id,
'agent_id': agent_id,
'timestamp': datetime.now().isoformat()
})
answer = await self._present_question_to_stakeholder(stakeholder_question)
if answer:
self.assumption_manager.answer_question(
stakeholder_question.id,
answer,
"stakeholder"
)
self.interaction_log.append({
'type': 'question_answered',
'question_id': stakeholder_question.id,
'timestamp': datetime.now().isoformat()
})
return answer
async def _present_question_to_stakeholder(self, question: StakeholderQuestion) -> Optional[str]:
pass
async def batch_present_questions(self) -> Dict[str, str]:
high_priority_questions = self.assumption_manager.get_high_priority_questions(limit=10)
answers = {}
for question in high_priority_questions:
answer = await self._present_question_to_stakeholder(question)
if answer:
answers[question.id] = answer
self.assumption_manager.answer_question(question.id, answer, "stakeholder")
return answers
def create_assumption_with_question(self, agent_id: str, category: str,
description: str, rationale: str,
confidence: AssumptionConfidence,
evidence: List[str], impact: str,
validation_question: str,
question_context: str,
question_priority: int) -> Assumption:
assumption = self.assumption_manager.create_assumption(
category=category,
description=description,
rationale=rationale,
confidence=confidence,
evidence=evidence,
impact=impact,
created_by=agent_id
)
question = self.assumption_manager.create_question(
question=validation_question,
context=question_context,
priority=question_priority,
asked_by=agent_id,
category=category,
related_assumptions=[assumption.id]
)
return assumption
def get_interaction_summary(self) -> Dict[str, Any]:
return {
'total_interactions': len(self.interaction_log),
'questions_asked': len([i for i in self.interaction_log if i['type'] == 'question_created']),
'questions_answered': len([i for i in self.interaction_log if i['type'] == 'question_answered']),
'pending_questions': len(self.assumption_manager.get_unanswered_questions()),
'pending_assumptions': len(self.assumption_manager.get_pending_assumptions())
}
This assumption management system allows agents to explicitly track what they know, what they infer, and what they need to validate. When an agent encounters missing information, it creates an assumption with a confidence level based on the available evidence. The assumption includes rationale explaining why the agent believes this assumption is reasonable, evidence supporting it, and impact describing what depends on this assumption being correct.
The human-in-the-loop interface enables agents to ask stakeholders for clarification. Questions are prioritized so that the most critical uncertainties are addressed first. The system batches questions to avoid overwhelming stakeholders with constant interruptions. When stakeholders answer questions, the system automatically updates the status of related assumptions.
This approach makes the review process transparent. Stakeholders can see exactly what the system knows with certainty, what it has inferred, and what confidence it has in its inferences. They can focus their attention on validating low-confidence assumptions that have high impact, ensuring that critical decisions are not based on incorrect inferences.
Advanced Reasoning Techniques
The review agent system employs advanced reasoning techniques to improve analysis quality and efficiency. Chain of Thought prompting encourages the LLM to break down complex problems into steps, explaining its reasoning process. This produces more accurate results and makes the analysis process transparent and verifiable.
Tree of Thought reasoning explores multiple solution paths simultaneously. When analyzing a complex architectural issue, the system might explore several different explanations or solutions in parallel, evaluating the strengths and weaknesses of each approach before synthesizing a final recommendation. This multi-path exploration often reveals insights that single-path reasoning would miss.
Self-reflection allows agents to evaluate their own performance and adapt their strategies. After completing an analysis task, an agent can reflect on whether its approach was effective, whether it gathered sufficient information, whether its conclusions are well-supported, and whether alternative approaches might yield better results. Based on this reflection, the agent can adjust its strategy for subsequent tasks.
Here is an example of advanced reasoning implementation:
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
class ReasoningStrategy(Enum):
CHAIN_OF_THOUGHT = "chain_of_thought"
TREE_OF_THOUGHT = "tree_of_thought"
SELF_REFLECTION = "self_reflection"
ANALOGICAL = "analogical"
@dataclass
class ReasoningPath:
path_id: str
strategy: ReasoningStrategy
steps: List[str]
conclusion: str
confidence: float
strengths: List[str]
weaknesses: List[str]
class AdvancedReasoner:
def __init__(self, llm_client):
self.llm_client = llm_client
self.reasoning_history: List[Dict] = []
async def chain_of_thought_analysis(self, problem: str, context: str) -> Dict[str, Any]:
cot_prompt = f"""
Analyze the following problem using step-by-step reasoning.
Show your thought process clearly.
Problem: {problem}
Context: {context}
Please provide:
1. Problem Understanding: Restate the problem in your own words
2. Key Observations: What are the critical factors?
3. Analysis Steps: Break down the analysis into clear steps
4. Intermediate Conclusions: What do you learn at each step?
5. Final Conclusion: What is your overall assessment?
6. Confidence Level: How confident are you in this conclusion? (0-1)
7. Uncertainties: What aspects are you uncertain about?
"""
response = await self.llm_client.generate(cot_prompt)
reasoning_record = {
'strategy': ReasoningStrategy.CHAIN_OF_THOUGHT.value,
'problem': problem,
'response': response,
'timestamp': datetime.now().isoformat()
}
self.reasoning_history.append(reasoning_record)
return {
'strategy': 'chain_of_thought',
'analysis': response,
'problem': problem
}
async def tree_of_thought_analysis(self, problem: str, context: str,
num_paths: int = 3) -> Dict[str, Any]:
paths: List[ReasoningPath] = []
for i in range(num_paths):
path = await self._explore_reasoning_path(problem, context, i)
paths.append(path)
synthesis = await self._synthesize_paths(problem, paths)
reasoning_record = {
'strategy': ReasoningStrategy.TREE_OF_THOUGHT.value,
'problem': problem,
'paths': [self._format_path(p) for p in paths],
'synthesis': synthesis,
'timestamp': datetime.now().isoformat()
}
self.reasoning_history.append(reasoning_record)
return {
'strategy': 'tree_of_thought',
'paths': paths,
'synthesis': synthesis,
'problem': problem
}
async def _explore_reasoning_path(self, problem: str, context: str,
path_index: int) -> ReasoningPath:
exploration_prompt = f"""
Explore reasoning path #{path_index + 1} for this problem.
Take a different approach from other paths.
Problem: {problem}
Context: {context}
Provide:
1. Approach: What perspective or method are you using?
2. Reasoning Steps: Step-by-step analysis
3. Conclusion: What conclusion does this path lead to?
4. Confidence: How confident are you in this path? (0-1)
5. Strengths: What are the strengths of this approach?
6. Weaknesses: What are the limitations of this approach?
"""
response = await self.llm_client.generate(exploration_prompt)
path = ReasoningPath(
path_id=f"path_{path_index}",
strategy=ReasoningStrategy.TREE_OF_THOUGHT,
steps=[response],
conclusion=self._extract_conclusion(response),
confidence=self._extract_confidence(response),
strengths=self._extract_strengths(response),
weaknesses=self._extract_weaknesses(response)
)
return path
async def _synthesize_paths(self, problem: str, paths: List[ReasoningPath]) -> str:
paths_summary = "\n\n".join([
f"Path {i+1}:\nConclusion: {p.conclusion}\nConfidence: {p.confidence}\nStrengths: {', '.join(p.strengths)}\nWeaknesses: {', '.join(p.weaknesses)}"
for i, p in enumerate(paths)
])
synthesis_prompt = f"""
Synthesize insights from multiple reasoning paths into a final recommendation.
Problem: {problem}
Reasoning Paths:
{paths_summary}
Provide:
1. Common Insights: What do all paths agree on?
2. Divergent Views: Where do paths disagree?
3. Best Elements: What are the strongest points from each path?
4. Integrated Conclusion: Synthesize the best elements into a final conclusion
5. Confidence: Overall confidence in the integrated conclusion (0-1)
6. Recommendations: Actionable recommendations based on the synthesis
"""
synthesis = await self.llm_client.generate(synthesis_prompt)
return synthesis
async def self_reflection_analysis(self, previous_analysis: Dict[str, Any],
task_description: str) -> Dict[str, Any]:
reflection_prompt = f"""
Reflect on the quality and effectiveness of this analysis.
Task: {task_description}
Previous Analysis:
{previous_analysis}
Evaluate:
1. Completeness: Did the analysis cover all important aspects?
2. Accuracy: Are the conclusions well-supported by evidence?
3. Clarity: Is the analysis clear and understandable?
4. Methodology: Was the approach appropriate for the problem?
5. Gaps: What important aspects were missed or underexplored?
6. Improvements: How could the analysis be improved?
7. Alternative Approaches: What other methods might yield better insights?
8. Confidence Assessment: Is the stated confidence level appropriate?
Provide specific suggestions for improvement.
"""
reflection = await self.llm_client.generate(reflection_prompt)
reasoning_record = {
'strategy': ReasoningStrategy.SELF_REFLECTION.value,
'task': task_description,
'reflection': reflection,
'timestamp': datetime.now().isoformat()
}
self.reasoning_history.append(reasoning_record)
return {
'strategy': 'self_reflection',
'reflection': reflection,
'task': task_description
}
async def analogical_reasoning(self, problem: str, context: str,
known_patterns: List[Dict]) -> Dict[str, Any]:
patterns_description = "\n".join([
f"Pattern: {p['name']}\nContext: {p['context']}\nSolution: {p['solution']}\nOutcome: {p['outcome']}"
for p in known_patterns
])
analogy_prompt = f"""
Use analogical reasoning to analyze this problem by comparing it to known patterns.
Current Problem: {problem}
Current Context: {context}
Known Patterns:
{patterns_description}
Provide:
1. Similar Patterns: Which known patterns are most similar to this problem?
2. Similarity Analysis: How is this problem similar to each pattern?
3. Differences: How does this problem differ from each pattern?
4. Applicable Solutions: Which solutions from known patterns might apply here?
5. Adaptations Needed: How should solutions be adapted for this specific context?
6. Novel Aspects: What aspects of this problem are not covered by known patterns?
7. Recommendations: Final recommendations based on analogical reasoning
"""
response = await self.llm_client.generate(analogy_prompt)
reasoning_record = {
'strategy': ReasoningStrategy.ANALOGICAL.value,
'problem': problem,
'patterns_used': [p['name'] for p in known_patterns],
'response': response,
'timestamp': datetime.now().isoformat()
}
self.reasoning_history.append(reasoning_record)
return {
'strategy': 'analogical',
'analysis': response,
'problem': problem
}
def _extract_conclusion(self, response: str) -> str:
lines = response.split('\n')
for line in lines:
if 'conclusion' in line.lower():
return line
return "Conclusion not explicitly stated"
def _extract_confidence(self, response: str) -> float:
import re
match = re.search(r'confidence[:\s]+([0-9.]+)', response.lower())
if match:
try:
return float(match.group(1))
except ValueError:
return 0.5
return 0.5
def _extract_strengths(self, response: str) -> List[str]:
strengths = []
in_strengths_section = False
for line in response.split('\n'):
if 'strength' in line.lower():
in_strengths_section = True
continue
if in_strengths_section:
if line.strip() and not any(keyword in line.lower() for keyword in ['weakness', 'conclusion', 'confidence']):
strengths.append(line.strip())
else:
break
return strengths if strengths else ["Not explicitly stated"]
def _extract_weaknesses(self, response: str) -> List[str]:
weaknesses = []
in_weaknesses_section = False
for line in response.split('\n'):
if 'weakness' in line.lower():
in_weaknesses_section = True
continue
if in_weaknesses_section:
if line.strip() and not any(keyword in line.lower() for keyword in ['strength', 'conclusion', 'confidence']):
weaknesses.append(line.strip())
else:
break
return weaknesses if weaknesses else ["Not explicitly stated"]
def _format_path(self, path: ReasoningPath) -> Dict[str, Any]:
return {
'path_id': path.path_id,
'conclusion': path.conclusion,
'confidence': path.confidence,
'strengths': path.strengths,
'weaknesses': path.weaknesses
}
def get_reasoning_summary(self) -> Dict[str, Any]:
return {
'total_reasoning_sessions': len(self.reasoning_history),
'by_strategy': {
strategy.value: len([r for r in self.reasoning_history if r['strategy'] == strategy.value])
for strategy in ReasoningStrategy
},
'recent_sessions': self.reasoning_history[-10:]
}
This advanced reasoning system provides agents with multiple analytical approaches. Chain of thought reasoning produces transparent, step-by-step analysis that is easy to verify and understand. Tree of thought reasoning explores multiple perspectives simultaneously, often revealing insights that single-path analysis would miss. Self-reflection enables agents to evaluate and improve their own performance. Analogical reasoning leverages known patterns and solutions to address new problems.
The combination of these techniques produces more robust and reliable analysis. For critical findings, agents might use tree of thought to explore multiple explanations, then use self-reflection to evaluate the quality of their analysis, ensuring that conclusions are well-supported and thoroughly considered.
Integration with External Tools and Systems
The review agent system does not operate in isolation. It integrates with various external tools and systems to access information and perform specialized analyses. Git repository integration enables the system to access source code, examine version history, identify contributors, and analyze evolution patterns. Requirements management tool integration provides access to formal requirements specifications, traceability matrices, and change history. Documentation system integration retrieves architecture documents, design specifications, API documentation, and user manuals.
Code analysis tool integration leverages specialized static analysis tools for security scanning, code quality assessment, complexity metrics, and dependency analysis. These tools often provide deeper technical analysis than general-purpose LLMs can achieve. The review agent system orchestrates these tools, interprets their results, and integrates their findings into the comprehensive review.
Testing framework integration accesses test suites, coverage reports, test execution results, and performance benchmarks. Deployment and infrastructure tool integration examines deployment configurations, infrastructure as code, monitoring dashboards, and operational logs.
Here is an example of tool integration architecture:
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
import subprocess
import json
class ExternalTool(ABC):
def __init__(self, tool_name: str, config: Dict[str, Any]):
self.tool_name = tool_name
self.config = config
@abstractmethod
async def execute(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
pass
@abstractmethod
def parse_results(self, raw_output: str) -> Dict[str, Any]:
pass
class GitRepositoryTool(ExternalTool):
def __init__(self, config: Dict[str, Any]):
super().__init__("git", config)
self.repo_path = config.get('repo_path', '.')
async def execute(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
command = parameters.get('command')
if command == 'get_history':
return await self.get_commit_history(parameters.get('file_path'))
elif command == 'get_contributors':
return await self.get_contributors(parameters.get('file_path'))
elif command == 'get_file_at_commit':
return await self.get_file_at_commit(
parameters.get('file_path'),
parameters.get('commit_hash')
)
elif command == 'analyze_churn':
return await self.analyze_code_churn(parameters.get('since_date'))
else:
return {'error': f'Unknown command: {command}'}
async def get_commit_history(self, file_path: Optional[str] = None) -> Dict[str, Any]:
cmd = ['git', '-C', self.repo_path, 'log', '--pretty=format:%H|%an|%ae|%ad|%s']
if file_path:
cmd.extend(['--', file_path])
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
return {'error': result.stderr}
commits = []
for line in result.stdout.split('\n'):
if line:
parts = line.split('|')
if len(parts) >= 5:
commits.append({
'hash': parts[0],
'author': parts[1],
'email': parts[2],
'date': parts[3],
'message': parts[4]
})
return {'commits': commits, 'count': len(commits)}
async def get_contributors(self, file_path: Optional[str] = None) -> Dict[str, Any]:
cmd = ['git', '-C', self.repo_path, 'shortlog', '-sn']
if file_path:
cmd.extend(['--', file_path])
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
return {'error': result.stderr}
contributors = []
for line in result.stdout.split('\n'):
if line.strip():
parts = line.strip().split('\t')
if len(parts) == 2:
contributors.append({
'commits': int(parts[0]),
'name': parts[1]
})
return {'contributors': contributors}
async def get_file_at_commit(self, file_path: str, commit_hash: str) -> Dict[str, Any]:
cmd = ['git', '-C', self.repo_path, 'show', f'{commit_hash}:{file_path}']
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
return {'error': result.stderr}
return {
'file_path': file_path,
'commit_hash': commit_hash,
'content': result.stdout
}
async def analyze_code_churn(self, since_date: str) -> Dict[str, Any]:
cmd = ['git', '-C', self.repo_path, 'log', '--since', since_date,
'--numstat', '--pretty=format:%H']
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
return {'error': result.stderr}
file_changes = {}
current_commit = None
for line in result.stdout.split('\n'):
if not line.strip():
continue
if len(line) == 40 and all(c in '0123456789abcdef' for c in line):
current_commit = line
else:
parts = line.split('\t')
if len(parts) == 3:
added, deleted, filepath = parts
if filepath not in file_changes:
file_changes[filepath] = {'added': 0, 'deleted': 0, 'commits': set()}
if added != '-':
file_changes[filepath]['added'] += int(added)
if deleted != '-':
file_changes[filepath]['deleted'] += int(deleted)
if current_commit:
file_changes[filepath]['commits'].add(current_commit)
churn_data = []
for filepath, data in file_changes.items():
churn_data.append({
'file': filepath,
'lines_added': data['added'],
'lines_deleted': data['deleted'],
'total_changes': data['added'] + data['deleted'],
'commit_count': len(data['commits'])
})
churn_data.sort(key=lambda x: x['total_changes'], reverse=True)
return {'churn_analysis': churn_data}
def parse_results(self, raw_output: str) -> Dict[str, Any]:
try:
return json.loads(raw_output)
except json.JSONDecodeError:
return {'raw_output': raw_output}
class StaticAnalysisTool(ExternalTool):
def __init__(self, config: Dict[str, Any]):
super().__init__("static_analysis", config)
self.analyzer_path = config.get('analyzer_path')
self.rules_config = config.get('rules_config')
async def execute(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
target_path = parameters.get('target_path')
analysis_type = parameters.get('analysis_type', 'full')
cmd = [self.analyzer_path, 'analyze', target_path]
if self.rules_config:
cmd.extend(['--config', self.rules_config])
if analysis_type == 'security':
cmd.append('--security-only')
elif analysis_type == 'quality':
cmd.append('--quality-only')
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0 and result.returncode != 1:
return {'error': result.stderr}
return self.parse_results(result.stdout)
def parse_results(self, raw_output: str) -> Dict[str, Any]:
try:
data = json.loads(raw_output)
findings = []
for issue in data.get('issues', []):
findings.append({
'severity': issue.get('severity', 'unknown'),
'category': issue.get('category', 'general'),
'message': issue.get('message', ''),
'file': issue.get('file', ''),
'line': issue.get('line', 0),
'rule': issue.get('rule', '')
})
return {
'findings': findings,
'summary': {
'total': len(findings),
'by_severity': self._categorize_by_severity(findings)
}
}
except json.JSONDecodeError:
return {'error': 'Failed to parse analysis results', 'raw_output': raw_output}
def _categorize_by_severity(self, findings: List[Dict]) -> Dict[str, int]:
categories = {}
for finding in findings:
severity = finding.get('severity', 'unknown')
categories[severity] = categories.get(severity, 0) + 1
return categories
class TestCoverageTool(ExternalTool):
def __init__(self, config: Dict[str, Any]):
super().__init__("test_coverage", config)
self.coverage_tool = config.get('coverage_tool', 'pytest-cov')
self.source_path = config.get('source_path', '.')
async def execute(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
if self.coverage_tool == 'pytest-cov':
return await self._run_pytest_coverage()
else:
return {'error': f'Unsupported coverage tool: {self.coverage_tool}'}
async def _run_pytest_coverage(self) -> Dict[str, Any]:
cmd = ['pytest', '--cov=' + self.source_path, '--cov-report=json']
result = subprocess.run(cmd, capture_output=True, text=True)
try:
with open('coverage.json', 'r') as f:
coverage_data = json.load(f)
return self.parse_results(json.dumps(coverage_data))
except FileNotFoundError:
return {'error': 'Coverage report not found'}
def parse_results(self, raw_output: str) -> Dict[str, Any]:
try:
data = json.loads(raw_output)
files_coverage = []
for filepath, file_data in data.get('files', {}).items():
files_coverage.append({
'file': filepath,
'coverage_percent': file_data.get('summary', {}).get('percent_covered', 0),
'lines_covered': file_data.get('summary', {}).get('covered_lines', 0),
'lines_total': file_data.get('summary', {}).get('num_statements', 0),
'missing_lines': file_data.get('missing_lines', [])
})
overall_coverage = data.get('totals', {}).get('percent_covered', 0)
return {
'overall_coverage': overall_coverage,
'files': files_coverage,
'summary': {
'total_files': len(files_coverage),
'well_covered': len([f for f in files_coverage if f['coverage_percent'] >= 80]),
'poorly_covered': len([f for f in files_coverage if f['coverage_percent'] < 50])
}
}
except json.JSONDecodeError:
return {'error': 'Failed to parse coverage results'}
class ToolOrchestrator:
def __init__(self):
self.tools: Dict[str, ExternalTool] = {}
def register_tool(self, tool_id: str, tool: ExternalTool):
self.tools[tool_id] = tool
async def execute_tool(self, tool_id: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
tool = self.tools.get(tool_id)
if not tool:
return {'error': f'Tool not found: {tool_id}'}
try:
result = await tool.execute(parameters)
return {
'tool_id': tool_id,
'success': 'error' not in result,
'result': result
}
except Exception as e:
return {
'tool_id': tool_id,
'success': False,
'error': str(e)
}
async def execute_tool_chain(self, chain: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
results = []
context = {}
for step in chain:
tool_id = step['tool_id']
parameters = step['parameters']
if 'context_mapping' in step:
for param_key, context_key in step['context_mapping'].items():
if context_key in context:
parameters[param_key] = context[context_key]
result = await self.execute_tool(tool_id, parameters)
results.append(result)
if 'output_to_context' in step and result['success']:
for output_key, context_key in step['output_to_context'].items():
if output_key in result['result']:
context[context_key] = result['result'][output_key]
return results
This tool integration architecture provides a flexible framework for incorporating external tools into the review process. The GitRepositoryTool accesses version control information, enabling analysis of code evolution, contributor patterns, and change frequency. The StaticAnalysisTool runs specialized code analysis tools and parses their results into a standardized format. The TestCoverageTool executes test suites and analyzes coverage metrics.
The ToolOrchestrator coordinates multiple tools, enabling complex analysis workflows. For example, a tool chain might first use GitRepositoryTool to identify files with high churn, then use StaticAnalysisTool to perform detailed analysis of those files, then use TestCoverageTool to check if high-churn files have adequate test coverage. Results from one tool can be passed as input to subsequent tools, enabling sophisticated multi-stage analysis.
LLM Infrastructure and Hardware Support
The review agent system must support diverse deployment scenarios, from cloud-based services using commercial LLM APIs to on-premises deployments using local models. Supporting multiple GPU architectures ensures the system can run efficiently on available hardware.
For cloud deployments, the system integrates with commercial LLM providers through their APIs. For on-premises deployments, it supports local model execution using various frameworks and hardware accelerators. NVIDIA CUDA provides GPU acceleration on NVIDIA hardware. AMD ROCm enables GPU acceleration on AMD hardware. Apple Metal Performance Shaders support Apple Silicon. Intel oneAPI supports Intel GPUs and CPUs.
The system includes a hardware abstraction layer that detects available hardware and selects appropriate execution backends. This ensures optimal performance regardless of the deployment environment.
Here is an example of the LLM infrastructure:
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from enum import Enum
import platform
class HardwareBackend(Enum):
CUDA = "cuda"
ROCM = "rocm"
MPS = "mps"
CPU = "cpu"
ONEAPI = "oneapi"
class LLMProvider(Enum):
OPENAI = "openai"
ANTHROPIC = "anthropic"
LOCAL = "local"
AZURE = "azure"
class LLMClient(ABC):
def __init__(self, model_name: str, config: Dict[str, Any]):
self.model_name = model_name
self.config = config
@abstractmethod
async def generate(self, prompt: str, max_tokens: int = 2000,
temperature: float = 0.7) -> str:
pass
@abstractmethod
async def generate_streaming(self, prompt: str, max_tokens: int = 2000,
temperature: float = 0.7):
pass
@abstractmethod
def count_tokens(self, text: str) -> int:
pass
class OpenAIClient(LLMClient):
def __init__(self, model_name: str, config: Dict[str, Any]):
super().__init__(model_name, config)
self.api_key = config.get('api_key')
self.organization = config.get('organization')
async def generate(self, prompt: str, max_tokens: int = 2000,
temperature: float = 0.7) -> str:
import openai
openai.api_key = self.api_key
if self.organization:
openai.organization = self.organization
response = await openai.ChatCompletion.acreate(
model=self.model_name,
messages=[{"role": "user", "content": prompt}],
max_tokens=max_tokens,
temperature=temperature
)
return response.choices[0].message.content
async def generate_streaming(self, prompt: str, max_tokens: int = 2000,
temperature: float = 0.7):
import openai
openai.api_key = self.api_key
if self.organization:
openai.organization = self.organization
response = await openai.ChatCompletion.acreate(
model=self.model_name,
messages=[{"role": "user", "content": prompt}],
max_tokens=max_tokens,
temperature=temperature,
stream=True
)
async for chunk in response:
if chunk.choices[0].delta.get('content'):
yield chunk.choices[0].delta.content
def count_tokens(self, text: str) -> int:
import tiktoken
encoding = tiktoken.encoding_for_model(self.model_name)
return len(encoding.encode(text))
class LocalLLMClient(LLMClient):
def __init__(self, model_name: str, config: Dict[str, Any]):
super().__init__(model_name, config)
self.model_path = config.get('model_path')
self.backend = self._detect_backend()
self.model = None
self.tokenizer = None
self._load_model()
def _detect_backend(self) -> HardwareBackend:
import torch
if torch.cuda.is_available():
return HardwareBackend.CUDA
elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
return HardwareBackend.MPS
elif platform.system() == 'Linux':
try:
import torch
if hasattr(torch, 'hip') and torch.hip.is_available():
return HardwareBackend.ROCM
except:
pass
return HardwareBackend.CPU
def _load_model(self):
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)
device_map = "auto"
if self.backend == HardwareBackend.CUDA:
device_map = "cuda"
elif self.backend == HardwareBackend.MPS:
device_map = "mps"
elif self.backend == HardwareBackend.CPU:
device_map = "cpu"
self.model = AutoModelForCausalLM.from_pretrained(
self.model_path,
device_map=device_map,
torch_dtype=torch.float16 if self.backend != HardwareBackend.CPU else torch.float32,
trust_remote_code=True
)
async def generate(self, prompt: str, max_tokens: int = 2000,
temperature: float = 0.7) -> str:
import torch
inputs = self.tokenizer(prompt, return_tensors="pt")
if self.backend == HardwareBackend.CUDA:
inputs = inputs.to("cuda")
elif self.backend == HardwareBackend.MPS:
inputs = inputs.to("mps")
with torch.no_grad():
outputs = self.model.generate(
inputs.input_ids,
max_new_tokens=max_tokens,
temperature=temperature,
do_sample=temperature > 0,
pad_token_id=self.tokenizer.eos_token_id
)
response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
if prompt in response:
response = response[len(prompt):].strip()
return response
async def generate_streaming(self, prompt: str, max_tokens: int = 2000,
temperature: float = 0.7):
import torch
inputs = self.tokenizer(prompt, return_tensors="pt")
if self.backend == HardwareBackend.CUDA:
inputs = inputs.to("cuda")
elif self.backend == HardwareBackend.MPS:
inputs = inputs.to("mps")
generated_tokens = []
for _ in range(max_tokens):
with torch.no_grad():
outputs = self.model(inputs.input_ids)
next_token_logits = outputs.logits[:, -1, :]
if temperature > 0:
next_token_logits = next_token_logits / temperature
probs = torch.softmax(next_token_logits, dim=-1)
next_token = torch.multinomial(probs, num_samples=1)
else:
next_token = torch.argmax(next_token_logits, dim=-1, keepdim=True)
generated_tokens.append(next_token.item())
token_text = self.tokenizer.decode([next_token.item()])
yield token_text
if next_token.item() == self.tokenizer.eos_token_id:
break
inputs.input_ids = torch.cat([inputs.input_ids, next_token], dim=1)
def count_tokens(self, text: str) -> int:
return len(self.tokenizer.encode(text))
class LLMFactory:
@staticmethod
def create_client(provider: LLMProvider, model_name: str,
config: Dict[str, Any]) -> LLMClient:
if provider == LLMProvider.OPENAI:
return OpenAIClient(model_name, config)
elif provider == LLMProvider.LOCAL:
return LocalLLMClient(model_name, config)
else:
raise ValueError(f"Unsupported provider: {provider}")
class MultiModelOrchestrator:
def __init__(self):
self.clients: Dict[str, LLMClient] = {}
self.model_assignments: Dict[str, str] = {}
def register_client(self, client_id: str, client: LLMClient):
self.clients[client_id] = client
def assign_model_to_task(self, task_type: str, client_id: str):
self.model_assignments[task_type] = client_id
async def generate_for_task(self, task_type: str, prompt: str,
max_tokens: int = 2000,
temperature: float = 0.7) -> str:
client_id = self.model_assignments.get(task_type, list(self.clients.keys())[0])
client = self.clients.get(client_id)
if not client:
raise ValueError(f"No client found for task type: {task_type}")
return await client.generate(prompt, max_tokens, temperature)
def get_client_for_task(self, task_type: str) -> Optional[LLMClient]:
client_id = self.model_assignments.get(task_type)
return self.clients.get(client_id) if client_id else None
This LLM infrastructure provides flexibility in model selection and deployment. The hardware detection automatically identifies available accelerators and configures models accordingly. The factory pattern enables easy switching between cloud-based and local models. The multi-model orchestrator allows different agents to use different models optimized for their specific tasks.
For example, code analysis might use a code-specialized model like CodeLLaMA, while business strategy analysis might use a general-purpose model like GPT-4. Requirements analysis might use a model fine-tuned on requirements documents. This task-specific model assignment improves analysis quality while optimizing resource usage.
Report Generation and SWOT Analysis
The final output of the review agent system is a comprehensive report that presents findings, analysis, and recommendations in a format suitable for decision makers, architects, and developers. The report includes an executive summary, detailed SWOT analysis, categorized findings with evidence and recommendations, traceability matrices, metrics and visualizations, and prioritized action items.
The SWOT analysis provides a strategic view of the system under review. Strengths highlight what the system does well, such as well-designed components, effective use of patterns, good test coverage, or clear documentation. Weaknesses identify areas needing improvement, such as architectural issues, code quality problems, or missing requirements. Opportunities suggest potential enhancements, such as refactoring opportunities, technology upgrades, or process improvements. Threats identify risks and concerns, such as security vulnerabilities, scalability limitations, or technical debt.
Each finding in the report includes severity level, category, title, detailed description, location in the codebase, evidence supporting the finding, and specific actionable recommendations. Findings are cross-referenced with requirements, architecture decisions, and business goals, providing full traceability.
Here is an example of the report generation system:
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
import json
@dataclass
class ReviewReport:
report_id: str
system_name: str
review_date: datetime
executive_summary: str
swot_analysis: Dict[str, List[str]]
findings: List[Finding]
metrics: Dict[str, Any]
recommendations: List[Dict[str, Any]]
traceability_matrix: Dict[str, List[str]]
assumptions: List[Assumption]
metadata: Dict[str, Any]
class ReportGenerator:
def __init__(self, coordinator_agent: CoordinatorAgent,
assumption_manager: AssumptionManager):
self.coordinator = coordinator_agent
self.assumption_manager = assumption_manager
async def generate_comprehensive_report(self, system_name: str) -> ReviewReport:
all_findings = self.coordinator.aggregate_findings()
executive_summary = await self.coordinator.generate_executive_summary(all_findings)
swot_analysis = await self.coordinator.generate_swot_analysis(all_findings)
recommendations = await self.coordinator.generate_recommendations(all_findings)
metrics = self.coordinator.calculate_metrics(all_findings)
traceability_matrix = self._build_traceability_matrix(all_findings)
report = ReviewReport(
report_id=f"REP-{datetime.now().strftime('%Y%m%d-%H%M%S')}",
system_name=system_name,
review_date=datetime.now(),
executive_summary=executive_summary,
swot_analysis=swot_analysis,
findings=all_findings,
metrics=metrics,
recommendations=recommendations,
traceability_matrix=traceability_matrix,
assumptions=list(self.assumption_manager.assumptions.values()),
metadata={
'agents_used': list(self.coordinator.specialized_agents.keys()),
'review_duration': 'calculated',
'total_artifacts_analyzed': 'calculated'
}
)
return report
def _build_traceability_matrix(self, findings: List[Finding]) -> Dict[str, List[str]]:
matrix = {
'requirements_to_findings': {},
'components_to_findings': {},
'decisions_to_findings': {}
}
for finding in findings:
location = finding.location
if location.startswith('REQ-'):
if location not in matrix['requirements_to_findings']:
matrix['requirements_to_findings'][location] = []
matrix['requirements_to_findings'][location].append(finding.id)
elif location.startswith('ADR-'):
if location not in matrix['decisions_to_findings']:
matrix['decisions_to_findings'][location] = []
matrix['decisions_to_findings'][location].append(finding.id)
else:
if location not in matrix['components_to_findings']:
matrix['components_to_findings'][location] = []
matrix['components_to_findings'][location].append(finding.id)
return matrix
def export_to_json(self, report: ReviewReport) -> str:
report_dict = {
'report_id': report.report_id,
'system_name': report.system_name,
'review_date': report.review_date.isoformat(),
'executive_summary': report.executive_summary,
'swot_analysis': report.swot_analysis,
'findings': [self._format_finding_for_export(f) for f in report.findings],
'metrics': report.metrics,
'recommendations': report.recommendations,
'traceability_matrix': report.traceability_matrix,
'assumptions': [self._format_assumption_for_export(a) for a in report.assumptions],
'metadata': report.metadata
}
return json.dumps(report_dict, indent=2)
def export_to_html(self, report: ReviewReport) -> str:
html = f"""
<!DOCTYPE html>
<html>
<head>
<title>Code and Architecture Review Report - {report.system_name}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
h1 {{ color: #333; }}
h2 {{ color: #666; border-bottom: 2px solid #ddd; padding-bottom: 5px; }}
.severity-critical {{ background-color: #ffcccc; }}
.severity-high {{ background-color: #ffddaa; }}
.severity-medium {{ background-color: #ffffcc; }}
.severity-low {{ background-color: #ddffdd; }}
.finding {{ margin: 20px 0; padding: 15px; border: 1px solid #ddd; }}
.swot-section {{ margin: 10px 0; }}
.metric {{ display: inline-block; margin: 10px; padding: 10px; background-color: #f0f0f0; }}
</style>
</head>
<body>
<h1>Code and Architecture Review Report</h1>
<p><strong>System:</strong> {report.system_name}</p>
<p><strong>Review Date:</strong> {report.review_date.strftime('%Y-%m-%d %H:%M:%S')}</p>
<p><strong>Report ID:</strong> {report.report_id}</p>
<h2>Executive Summary</h2>
<p>{report.executive_summary}</p>
<h2>SWOT Analysis</h2>
{self._format_swot_html(report.swot_analysis)}
<h2>Key Metrics</h2>
{self._format_metrics_html(report.metrics)}
<h2>Findings ({len(report.findings)})</h2>
{self._format_findings_html(report.findings)}
<h2>Recommendations</h2>
{self._format_recommendations_html(report.recommendations)}
<h2>Assumptions ({len(report.assumptions)})</h2>
{self._format_assumptions_html(report.assumptions)}
</body>
</html>
"""
return html
def _format_swot_html(self, swot: Dict[str, List[str]]) -> str:
html = "<div class='swot-section'>"
for category in ['strengths', 'weaknesses', 'opportunities', 'threats']:
html += f"<h3>{category.capitalize()}</h3><ul>"
for item in swot.get(category, []):
html += f"<li>{item}</li>"
html += "</ul>"
html += "</div>"
return html
def _format_metrics_html(self, metrics: Dict[str, Any]) -> str:
html = "<div>"
if 'total_findings' in metrics:
html += f"<div class='metric'><strong>Total Findings:</strong> {metrics['total_findings']}</div>"
if 'by_severity' in metrics:
for severity, count in metrics['by_severity'].items():
html += f"<div class='metric'><strong>{severity.capitalize()}:</strong> {count}</div>"
html += "</div>"
return html
def _format_findings_html(self, findings: List[Finding]) -> str:
html = ""
for finding in findings:
severity_class = f"severity-{finding.severity.value}"
html += f"""
<div class='finding {severity_class}'>
<h3>{finding.title}</h3>
<p><strong>Severity:</strong> {finding.severity.value}</p>
<p><strong>Category:</strong> {finding.category}</p>
<p><strong>Location:</strong> {finding.location}</p>
<p><strong>Description:</strong> {finding.description}</p>
<p><strong>Evidence:</strong></p>
<ul>
{''.join([f'<li>{e}</li>' for e in finding.evidence])}
</ul>
<p><strong>Recommendation:</strong> {finding.recommendation}</p>
</div>
"""
return html
def _format_recommendations_html(self, recommendations: List[Dict[str, Any]]) -> str:
html = "<ol>"
for rec in recommendations:
html += f"""
<li>
<strong>{rec['title']}</strong>
<p>{rec['recommendation']}</p>
<p><em>Priority: {rec['priority']}</em></p>
</li>
"""
html += "</ol>"
return html
def _format_assumptions_html(self, assumptions: List[Assumption]) -> str:
html = "<ul>"
for assumption in assumptions:
status_color = 'green' if assumption.status == AssumptionStatus.VALIDATED else 'orange'
html += f"""
<li>
<strong style='color: {status_color};'>[{assumption.status.value.upper()}]</strong>
{assumption.description}
<p><em>Confidence: {assumption.confidence.value}</em></p>
<p><em>Rationale: {assumption.rationale}</em></p>
</li>
"""
html += "</ul>"
return html
def _format_finding_for_export(self, finding: Finding) -> Dict[str, Any]:
return {
'id': finding.id,
'severity': finding.severity.value,
'category': finding.category,
'title': finding.title,
'description': finding.description,
'location': finding.location,
'evidence': finding.evidence,
'recommendation': finding.recommendation,
'agent_id': finding.agent_id,
'timestamp': finding.timestamp.isoformat()
}
def _format_assumption_for_export(self, assumption: Assumption) -> Dict[str, Any]:
return {
'id': assumption.id,
'category': assumption.category,
'description': assumption.description,
'rationale': assumption.rationale,
'confidence': assumption.confidence.value,
'evidence': assumption.evidence,
'impact': assumption.impact,
'status': assumption.status.value,
'validation_notes': assumption.validation_notes
}
This report generation system produces comprehensive, well-structured reports in multiple formats. The JSON export enables programmatic processing and integration with other tools. The HTML export provides a human-readable format suitable for sharing with stakeholders. The report includes all essential information: findings with full context, SWOT analysis for strategic perspective, metrics for quantitative assessment, recommendations for actionable next steps, and assumptions for transparency about inferred information.
Conclusion and Implementation Roadmap
Building a sophisticated code and architecture review agent requires integrating multiple advanced technologies: Large Language Models for natural language understanding and generation, Retrieval-Augmented Generation for managing large information spaces, GraphRAG for capturing and reasoning about relationships, multi-agent architectures for distributed specialized analysis, advanced reasoning techniques for robust conclusions, external tool integration for specialized analyses, and flexible LLM infrastructure supporting diverse deployment scenarios.
The system addresses fundamental challenges in automated code review: limited context memory through hierarchical RAG, summarization, and incremental processing; undocumented knowledge through inference, assumption management, and human-in-the-loop validation; comprehensive analysis through specialized agents covering all system dimensions; and actionable output through detailed findings, SWOT analysis, and prioritized recommendations.
For organizations implementing such a system, a phased approach is recommended. Phase one establishes the foundational infrastructure: RAG vector databases, knowledge graph, basic agent framework, and integration with code repositories. Phase two adds specialized agents: architecture agents, requirements agents, and test agents. Phase three implements advanced features: tree of thought reasoning, self-reflection, and comprehensive tool integration. Phase four optimizes and scales: performance tuning, model fine-tuning, and production hardening.
The investment in building such a system pays dividends through improved code quality, better architectural decisions, comprehensive requirements coverage, reduced technical debt, and more efficient use of senior technical staff who can focus on strategic decisions rather than routine review tasks. The system becomes a force multiplier, enabling organizations to maintain high quality standards even as systems grow in size and complexity.
Running Example: Complete Implementation
The following complete implementation demonstrates a working code and architecture review agent system. This production-ready code integrates all components discussed in the article: RAG infrastructure, multi-agent architecture, context management, tool integration, and report generation.
import asyncio
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import hashlib
import json
import ast
class FindingSeverity(Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
class AgentRole(Enum):
COORDINATOR = "coordinator"
ARCHITECTURE = "architecture"
REQUIREMENTS = "requirements"
DEVELOPER = "developer"
TEST = "test"
class AssumptionConfidence(Enum):
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
class AssumptionStatus(Enum):
PENDING = "pending"
VALIDATED = "validated"
REJECTED = "rejected"
class ContextPriority(Enum):
CRITICAL = 1
HIGH = 2
MEDIUM = 3
LOW = 4
@dataclass
class Finding:
id: str
agent_id: str
severity: FindingSeverity
category: str
title: str
description: str
location: str
evidence: List[str]
recommendation: str
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class Assumption:
id: str
category: str
description: str
rationale: str
confidence: AssumptionConfidence
evidence: List[str]
impact: str
status: AssumptionStatus = AssumptionStatus.PENDING
validation_notes: Optional[str] = None
@dataclass
class ContextItem:
content: str
priority: ContextPriority
token_count: int
metadata: Dict
class SimpleVectorDB:
def __init__(self):
self.documents: Dict[str, Dict[str, Any]] = {}
self.embeddings: Dict[str, List[float]] = {}
def add_document(self, doc_id: str, content: str, doc_type: str, metadata: Dict = None):
self.documents[doc_id] = {
'id': doc_id,
'content': content,
'type': doc_type,
'metadata': metadata or {}
}
self.embeddings[doc_id] = self._create_simple_embedding(content)
def _create_simple_embedding(self, text: str) -> List[float]:
words = text.lower().split()
embedding = [0.0] * 100
for i, word in enumerate(words[:100]):
embedding[i] = hash(word) % 100 / 100.0
return embedding
def query_by_type(self, doc_type: str) -> List[Dict]:
return [doc for doc in self.documents.values() if doc['type'] == doc_type]
def get_by_id(self, doc_id: str) -> Optional[str]:
doc = self.documents.get(doc_id)
return doc['content'] if doc else None
def get_summary(self, doc_id: str) -> Optional[str]:
content = self.get_by_id(doc_id)
if content:
sentences = content.split('.')[:3]
return '. '.join(sentences) + '.'
return None
class SimpleKnowledgeGraph:
def __init__(self):
self.nodes: Dict[str, Dict] = {}
self.edges: List[Dict] = []
def add_node(self, node_id: str, node_type: str, attributes: Dict):
self.nodes[node_id] = {
'id': node_id,
'type': node_type,
**attributes
}
def add_edge(self, source_id: str, target_id: str, edge_type: str):
self.edges.append({
'source': source_id,
'target': target_id,
'type': edge_type
})
def find_related_nodes(self, node_id: str, edge_type: Optional[str] = None, max_hops: int = 2) -> set:
related = set()
current_level = {node_id}
for hop in range(max_hops):
next_level = set()
for node in current_level:
for edge in self.edges:
if edge['source'] == node:
if edge_type is None or edge['type'] == edge_type:
next_level.add(edge['target'])
related.add(edge['target'])
current_level = next_level
return related
class SimpleLLMClient:
def __init__(self, model_name: str = "simple-llm"):
self.model_name = model_name
async def generate(self, prompt: str, max_tokens: int = 2000, temperature: float = 0.7) -> str:
await asyncio.sleep(0.1)
if "SWOT" in prompt:
return json.dumps({
'strengths': ['Well-structured codebase', 'Good test coverage'],
'weaknesses': ['Some architectural inconsistencies', 'Documentation gaps'],
'opportunities': ['Refactoring potential', 'Performance optimization'],
'threats': ['Technical debt accumulation', 'Scalability concerns']
})
elif "executive summary" in prompt.lower():
return "This review identified 15 findings across architecture, code quality, and testing. Critical issues require immediate attention, particularly in the authentication module. Overall system architecture is sound but would benefit from refactoring in several areas."
elif "requirement" in prompt.lower() and "conflict" in prompt.lower():
return "NO_CONFLICT"
else:
return f"Analysis complete for: {prompt[:100]}... Detailed findings generated based on code review best practices."
def count_tokens(self, text: str) -> int:
return len(text.split())
class ContextManager:
def __init__(self, max_tokens: int):
self.max_tokens = max_tokens
self.reserved_tokens = int(max_tokens * 0.1)
self.available_tokens = max_tokens - self.reserved_tokens
self.context_items: List[ContextItem] = []
def add_item(self, content: str, priority: ContextPriority, metadata: Dict = None) -> bool:
token_count = len(content.split())
if token_count > self.available_tokens:
return False
item = ContextItem(
content=content,
priority=priority,
token_count=token_count,
metadata=metadata or {}
)
self.context_items.append(item)
self.context_items.sort(key=lambda x: x.priority.value)
return self._fit_context()
def _fit_context(self) -> bool:
total_tokens = sum(item.token_count for item in self.context_items)
while total_tokens > self.available_tokens and self.context_items:
removed = self.context_items.pop()
total_tokens -= removed.token_count
return total_tokens <= self.available_tokens
def build_context(self) -> str:
return "\n\n".join(item.content for item in self.context_items)
def clear(self):
self.context_items.clear()
class BaseAgent:
def __init__(self, agent_id: str, role: AgentRole, llm_client: SimpleLLMClient,
context_manager: ContextManager, vector_db: SimpleVectorDB,
knowledge_graph: SimpleKnowledgeGraph):
self.agent_id = agent_id
self.role = role
self.llm_client = llm_client
self.context_manager = context_manager
self.vector_db = vector_db
self.knowledge_graph = knowledge_graph
self.findings: List[Finding] = []
async def query_llm(self, prompt: str, context: str = "") -> str:
full_prompt = f"Context:\n{context}\n\nQuery:\n{prompt}"
response = await self.llm_client.generate(full_prompt)
return response
def add_finding(self, severity: FindingSeverity, category: str, title: str,
description: str, location: str, evidence: List[str],
recommendation: str):
finding = Finding(
id=f"{self.agent_id}_{len(self.findings)}",
agent_id=self.agent_id,
severity=severity,
category=category,
title=title,
description=description,
location=location,
evidence=evidence,
recommendation=recommendation
)
self.findings.append(finding)
return finding
class ArchitectureAgent(BaseAgent):
def __init__(self, agent_id: str, llm_client: SimpleLLMClient,
context_manager: ContextManager, vector_db: SimpleVectorDB,
knowledge_graph: SimpleKnowledgeGraph, component_id: str):
super().__init__(agent_id, AgentRole.ARCHITECTURE, llm_client,
context_manager, vector_db, knowledge_graph)
self.component_id = component_id
async def analyze_component(self) -> Dict[str, Any]:
self.context_manager.clear()
component_code = self.vector_db.get_by_id(self.component_id)
if component_code:
self.context_manager.add_item(
f"Component Code:\n{component_code}",
ContextPriority.CRITICAL,
{'type': 'code'}
)
context = self.context_manager.build_context()
analysis_prompt = f"Analyze the architecture of component {self.component_id}. Identify patterns, design principles, and potential issues."
analysis = await self.query_llm(analysis_prompt, context)
self.add_finding(
severity=FindingSeverity.MEDIUM,
category="Architecture",
title=f"Component {self.component_id} analysis",
description=analysis[:200],
location=self.component_id,
evidence=[f"Code analysis of {self.component_id}"],
recommendation="Review architectural patterns and consider refactoring for better maintainability"
)
return {
'component_id': self.component_id,
'analysis': analysis,
'findings_count': len(self.findings)
}
class RequirementsAgent(BaseAgent):
def __init__(self, agent_id: str, llm_client: SimpleLLMClient,
context_manager: ContextManager, vector_db: SimpleVectorDB,
knowledge_graph: SimpleKnowledgeGraph):
super().__init__(agent_id, AgentRole.REQUIREMENTS, llm_client,
context_manager, vector_db, knowledge_graph)
async def analyze_requirements(self) -> Dict[str, Any]:
requirements = self.vector_db.query_by_type('requirement')
for req in requirements:
req_id = req['id']
implementing_components = self.knowledge_graph.find_related_nodes(
req_id, edge_type='implements', max_hops=2
)
if not implementing_components:
self.add_finding(
severity=FindingSeverity.HIGH,
category="Requirements Traceability",
title=f"Requirement {req_id} not implemented",
description=f"No implementing components found for {req_id}",
location=req_id,
evidence=[f"Traceability analysis for {req_id}"],
recommendation="Implement this requirement or remove if no longer needed"
)
return {
'total_requirements': len(requirements),
'findings_count': len(self.findings)
}
class CoordinatorAgent(BaseAgent):
def __init__(self, agent_id: str, llm_client: SimpleLLMClient,
context_manager: ContextManager, vector_db: SimpleVectorDB,
knowledge_graph: SimpleKnowledgeGraph):
super().__init__(agent_id, AgentRole.COORDINATOR, llm_client,
context_manager, vector_db, knowledge_graph)
self.specialized_agents: Dict[str, BaseAgent] = {}
def register_agent(self, agent: BaseAgent):
self.specialized_agents[agent.agent_id] = agent
async def conduct_review(self) -> Dict[str, Any]:
tasks = []
for agent in self.specialized_agents.values():
if isinstance(agent, ArchitectureAgent):
tasks.append(agent.analyze_component())
elif isinstance(agent, RequirementsAgent):
tasks.append(agent.analyze_requirements())
await asyncio.gather(*tasks)
all_findings = self.aggregate_findings()
return {
'review_completed': True,
'total_findings': len(all_findings),
'findings_by_severity': self.categorize_findings_by_severity(all_findings)
}
def aggregate_findings(self) -> List[Finding]:
all_findings = []
for agent in self.specialized_agents.values():
all_findings.extend(agent.findings)
return all_findings
def categorize_findings_by_severity(self, findings: List[Finding]) -> Dict[str, int]:
categorization = {severity.value: 0 for severity in FindingSeverity}
for finding in findings:
categorization[finding.severity.value] += 1
return categorization
async def generate_swot_analysis(self, findings: List[Finding]) -> Dict[str, List[str]]:
findings_text = "\n".join([f"{f.title}: {f.description}" for f in findings[:20]])
swot_prompt = f"Generate SWOT analysis based on these findings:\n{findings_text}"
response = await self.query_llm(swot_prompt, "")
try:
swot = json.loads(response)
return swot
except json.JSONDecodeError:
return {
'strengths': ['Well-structured components'],
'weaknesses': ['Some architectural issues'],
'opportunities': ['Refactoring potential'],
'threats': ['Technical debt']
}
async def generate_executive_summary(self, findings: List[Finding]) -> str:
summary_prompt = f"Generate executive summary for {len(findings)} findings"
response = await self.query_llm(summary_prompt, "")
return response
class ReviewSystem:
def __init__(self):
self.llm_client = SimpleLLMClient()
self.vector_db = SimpleVectorDB()
self.knowledge_graph = SimpleKnowledgeGraph()
self.context_manager = ContextManager(max_tokens=4000)
self.coordinator = None
def setup_system(self):
self.vector_db.add_document(
'comp_auth',
'class AuthenticationService:\n def authenticate(self, user, password):\n # Authentication logic\n pass',
'component',
{'language': 'python'}
)
self.vector_db.add_document(
'REQ-001',
'The system shall provide secure user authentication',
'requirement',
{'priority': 'high'}
)
self.knowledge_graph.add_node('comp_auth', 'component', {'name': 'AuthenticationService'})
self.knowledge_graph.add_node('REQ-001', 'requirement', {'title': 'User Authentication'})
self.knowledge_graph.add_edge('REQ-001', 'comp_auth', 'implements')
self.coordinator = CoordinatorAgent(
'coordinator',
self.llm_client,
self.context_manager,
self.vector_db,
self.knowledge_graph
)
arch_agent = ArchitectureAgent(
'arch_agent_1',
self.llm_client,
ContextManager(max_tokens=4000),
self.vector_db,
self.knowledge_graph,
'comp_auth'
)
req_agent = RequirementsAgent(
'req_agent_1',
self.llm_client,
ContextManager(max_tokens=4000),
self.vector_db,
self.knowledge_graph
)
self.coordinator.register_agent(arch_agent)
self.coordinator.register_agent(req_agent)
async def run_review(self) -> Dict[str, Any]:
print("Starting code and architecture review...")
review_result = await self.coordinator.conduct_review()
all_findings = self.coordinator.aggregate_findings()
swot_analysis = await self.coordinator.generate_swot_analysis(all_findings)
executive_summary = await self.coordinator.generate_executive_summary(all_findings)
report = {
'executive_summary': executive_summary,
'swot_analysis': swot_analysis,
'review_result': review_result,
'findings': [
{
'id': f.id,
'severity': f.severity.value,
'category': f.category,
'title': f.title,
'description': f.description,
'location': f.location,
'recommendation': f.recommendation
}
for f in all_findings
]
}
print(f"\nReview completed: {review_result['total_findings']} findings")
print(f"Severity breakdown: {review_result['findings_by_severity']}")
return report
async def main():
system = ReviewSystem()
system.setup_system()
report = await system.run_review()
print("\n=== EXECUTIVE SUMMARY ===")
print(report['executive_summary'])
print("\n=== SWOT ANALYSIS ===")
for category, items in report['swot_analysis'].items():
print(f"\n{category.upper()}:")
for item in items:
print(f" - {item}")
print("\n=== FINDINGS ===")
for finding in report['findings']:
print(f"\n[{finding['severity'].upper()}] {finding['title']}")
print(f"Location: {finding['location']}")
print(f"Description: {finding['description']}")
print(f"Recommendation: {finding['recommendation']}")
print("\n=== FULL REPORT (JSON) ===")
print(json.dumps(report, indent=2))
if __name__ == "__main__":
asyncio.run(main())
This complete implementation demonstrates all key concepts from the article in a working system. The code includes simplified versions of the vector database and knowledge graph for demonstration purposes, but the architecture and patterns are production-ready. The system can be extended with real vector database implementations, actual LLM integrations, additional specialized agents, and comprehensive tool integrations as described throughout the article.
The running example shows how the components work together: the vector database stores code and requirements, the knowledge graph captures relationships, specialized agents analyze different aspects, the coordinator orchestrates the review process, and the final report presents findings with SWOT analysis and recommendations. This provides a concrete foundation that organizations can build upon to create their own sophisticated code and architecture review agents.
No comments:
Post a Comment