Introduction
The landscape of software development is rapidly evolving with the introduction of Agentic AI systems that can autonomously analyze, understand, and improve codebases. Where traditional static analysis tools merely flag issues, an Agentic AI for code refactoring understands code semantics, proposes meaningful improvements, and executes changes under human oversight. This article explores the architecture, functionality, and implementation of such a system designed to refactor individual files, directories, and entire repositories.
Understanding Agentic AI in the Context of Code Refactoring
An Agentic AI system for code refactoring differs fundamentally from conventional automated refactoring tools. While traditional tools apply predefined transformation rules, an Agentic AI leverages large language models to understand code context, identify complex patterns, and propose sophisticated improvements that go beyond simple syntactic changes. The system operates as an intelligent agent that can reason about code quality, maintainability, performance implications, and architectural concerns.
The core principle behind such a system is its ability to maintain agency throughout the refactoring process: the AI decides what changes to propose, how to prioritize them, and when to seek human input. It maintains a continuous feedback loop with the developer, ensuring that all modifications align with the project's specific requirements and coding standards.
Core Architecture and System Components
The architecture of an Agentic AI refactoring system consists of several interconnected components that work together to provide comprehensive code analysis and improvement capabilities. The foundation of this system rests on a sophisticated analysis engine that can parse and understand code across multiple programming languages and paradigms.
At the heart of the system lies the Code Analysis Engine, which performs deep semantic analysis of source code. This engine goes beyond simple syntax parsing to understand the intent and purpose of code segments. It identifies patterns, anti-patterns, code smells, and potential optimization opportunities. The engine maintains a comprehensive understanding of programming best practices, design patterns, and language-specific idioms.
The Decision Making Component serves as the brain of the system, utilizing the analysis results to formulate refactoring strategies. This component weighs various factors such as code complexity, maintainability impact, performance implications, and risk assessment to prioritize potential improvements. It considers the broader context of the codebase, including dependencies, usage patterns, and architectural constraints.
The User Interface and Interaction Layer provides the mechanism through which developers communicate with the AI system. This component presents analysis results in a clear, actionable format and facilitates the decision-making process regarding which refactorings to apply. It maintains a conversational interface that allows developers to ask questions, request clarifications, and provide feedback on proposed changes.
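The interaction between these components can be captured as a small set of narrow interfaces. The sketch below is illustrative only; the names `AnalysisEngine`, `DecisionEngine`, and `InteractionLayer` are placeholders for the concrete classes built later in this article.

```python
from typing import Protocol, List, Dict, Any

class AnalysisEngine(Protocol):
    """Produces structured findings from raw source code."""
    def analyze(self, source: str) -> List[Dict[str, Any]]: ...

class DecisionEngine(Protocol):
    """Turns findings into prioritized, reviewable proposals."""
    def propose(self, findings: List[Dict[str, Any]]) -> List[Dict[str, Any]]: ...

class InteractionLayer(Protocol):
    """Presents proposals to the developer and records approvals."""
    def review(self, proposals: List[Dict[str, Any]]) -> List[Dict[str, Any]]: ...

def refactoring_cycle(source: str, analyzer: AnalysisEngine,
                      decider: DecisionEngine, ui: InteractionLayer) -> List[Dict[str, Any]]:
    """One pass of the agentic loop: analyze, decide, then ask the human."""
    findings = analyzer.analyze(source)
    proposals = decider.propose(findings)
    return ui.review(proposals)
```

The remainder of this article fills in concrete implementations behind each of these interfaces.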
Implementing an Agentic AI System for Refactoring
Creating an Agentic AI for code refactoring requires implementing several interconnected components that work together to analyze, understand, and improve code. The following sections provide detailed code examples that demonstrate how to build each major component of the system.
Building the Core Analysis Engine
The analysis engine serves as the foundation of the Agentic AI system. This component must parse code, understand its structure, and identify potential improvements. The following example demonstrates how to implement a basic analysis engine that can process Python code and identify common refactoring opportunities.
The analysis engine begins with a code parser that creates an abstract syntax tree representation of the source code. This AST allows the system to understand code structure and identify patterns that might benefit from refactoring. Here is an implementation of the core analysis engine:
import ast
import inspect
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
class IssueType(Enum):
COMPLEXITY = "complexity"
DUPLICATION = "duplication"
NAMING = "naming"
STRUCTURE = "structure"
PERFORMANCE = "performance"
@dataclass
class CodeIssue:
issue_type: IssueType
severity: str
line_number: int
description: str
suggestion: str
confidence: float
class CodeAnalysisEngine:
def __init__(self):
self.issues = []
self.metrics = {}
def analyze_file(self, file_path: str) -> List[CodeIssue]:
"""Analyze a Python file and return identified issues."""
with open(file_path, 'r') as file:
source_code = file.read()
tree = ast.parse(source_code)
self.issues = []
# Perform various analysis passes
self._analyze_complexity(tree)
self._analyze_naming_conventions(tree)
self._analyze_code_structure(tree)
self._analyze_duplication(tree)
return self.issues
def _analyze_complexity(self, tree: ast.AST) -> None:
"""Analyze cyclomatic complexity of functions."""
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
complexity = self._calculate_complexity(node)
if complexity > 10:
issue = CodeIssue(
issue_type=IssueType.COMPLEXITY,
severity="high",
line_number=node.lineno,
description=f"Function '{node.name}' has high complexity ({complexity})",
suggestion="Consider breaking this function into smaller functions",
confidence=0.9
)
self.issues.append(issue)
def _calculate_complexity(self, node: ast.FunctionDef) -> int:
"""Calculate cyclomatic complexity for a function."""
complexity = 1 # Base complexity
for child in ast.walk(node):
if isinstance(child, (ast.If, ast.While, ast.For, ast.With)):
complexity += 1
elif isinstance(child, ast.ExceptHandler):
complexity += 1
elif isinstance(child, ast.BoolOp):
complexity += len(child.values) - 1
return complexity
def _analyze_naming_conventions(self, tree: ast.AST) -> None:
"""Check for naming convention violations."""
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
if not node.name.islower() or '__' in node.name:
if not node.name.startswith('_'):
issue = CodeIssue(
issue_type=IssueType.NAMING,
severity="medium",
line_number=node.lineno,
description=f"Function '{node.name}' doesn't follow snake_case convention",
suggestion="Rename function to use snake_case naming",
confidence=0.8
)
self.issues.append(issue)
def _analyze_code_structure(self, tree: ast.AST) -> None:
"""Analyze code structure for improvement opportunities."""
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
# Check for deeply nested code
max_depth = self._calculate_nesting_depth(node)
if max_depth > 4:
issue = CodeIssue(
issue_type=IssueType.STRUCTURE,
severity="medium",
line_number=node.lineno,
description=f"Function '{node.name}' has deep nesting (depth: {max_depth})",
suggestion="Consider using early returns or extracting nested logic",
confidence=0.7
)
self.issues.append(issue)
def _calculate_nesting_depth(self, node: ast.AST, current_depth: int = 0) -> int:
"""Calculate maximum nesting depth in a code block."""
max_depth = current_depth
for child in ast.iter_child_nodes(node):
if isinstance(child, (ast.If, ast.While, ast.For, ast.With, ast.Try)):
child_depth = self._calculate_nesting_depth(child, current_depth + 1)
max_depth = max(max_depth, child_depth)
return max_depth
def _analyze_duplication(self, tree: ast.AST) -> None:
"""Identify potential code duplication."""
function_bodies = {}
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
body_hash = self._hash_function_body(node)
if body_hash in function_bodies:
issue = CodeIssue(
issue_type=IssueType.DUPLICATION,
severity="medium",
line_number=node.lineno,
description=f"Function '{node.name}' appears to duplicate logic",
suggestion="Consider extracting common logic into a shared function",
confidence=0.6
)
self.issues.append(issue)
else:
function_bodies[body_hash] = node.name
def _hash_function_body(self, node: ast.FunctionDef) -> str:
"""Create a simple hash of function body for duplication detection."""
body_str = ""
for stmt in node.body:
body_str += ast.dump(stmt)
return str(hash(body_str))
This analysis engine demonstrates the fundamental approach to code analysis within an Agentic AI system. The engine parses Python source code into an abstract syntax tree and then performs multiple analysis passes to identify different types of issues. Each analysis method focuses on a specific aspect of code quality, such as complexity, naming conventions, or structural problems.
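As a quick sanity check, the engine can be exercised directly; the module path below is a placeholder, and the output format is simply one way to render the resulting `CodeIssue` records.

```python
# Hypothetical usage of the analysis engine defined above.
engine = CodeAnalysisEngine()
issues = engine.analyze_file("example_module.py")  # placeholder path

for issue in issues:
    print(f"[{issue.severity}] line {issue.line_number}: {issue.description}")
    print(f"    suggestion: {issue.suggestion} (confidence {issue.confidence:.0%})")
```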
Implementing the LLM Integration Layer
The Large Language Model integration layer provides the AI capabilities that enable the system to understand code semantics and generate intelligent refactoring suggestions. This component abstracts the interaction with both local and remote LLM services, providing a unified interface for code analysis and generation.
The following implementation shows how to create a flexible LLM integration system that supports both local and remote models:
from abc import ABC, abstractmethod
import json
import requests
from typing import Any, Dict, List, Optional
import openai
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
class LLMProvider(ABC):
"""Abstract base class for LLM providers."""
@abstractmethod
def analyze_code(self, code: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze code and return suggestions."""
pass
@abstractmethod
def generate_refactored_code(self, original_code: str, instructions: str) -> str:
"""Generate refactored code based on instructions."""
pass
class RemoteLLMProvider(LLMProvider):
"""Provider for remote LLM services like OpenAI GPT."""
def __init__(self, api_key: str, model_name: str = "gpt-4"):
self.client = openai.OpenAI(api_key=api_key)
self.model_name = model_name
def analyze_code(self, code: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze code using remote LLM service."""
prompt = self._create_analysis_prompt(code, context)
try:
response = self.client.chat.completions.create(
model=self.model_name,
messages=[
{"role": "system", "content": "You are an expert code analyzer. Analyze the provided code and suggest improvements."},
{"role": "user", "content": prompt}
],
temperature=0.1,
max_tokens=2000
)
return self._parse_analysis_response(response.choices[0].message.content)
except Exception as e:
return {"error": f"LLM analysis failed: {str(e)}"}
def generate_refactored_code(self, original_code: str, instructions: str) -> str:
"""Generate refactored code using remote LLM."""
prompt = f"""
Original code:
{original_code}
Refactoring instructions:
{instructions}
Please provide the refactored code that implements these improvements while maintaining the same functionality.
"""
try:
response = self.client.chat.completions.create(
model=self.model_name,
messages=[
{"role": "system", "content": "You are an expert software engineer. Refactor the provided code according to the given instructions."},
{"role": "user", "content": prompt}
],
temperature=0.1,
max_tokens=3000
)
return self._extract_code_from_response(response.choices[0].message.content)
except Exception as e:
return f"# Error generating refactored code: {str(e)}\n{original_code}"
def _create_analysis_prompt(self, code: str, context: Dict[str, Any]) -> str:
"""Create a detailed prompt for code analysis."""
return f"""
Please analyze the following code for potential improvements:
Code:
{code}
Context:
- Language: {context.get('language', 'Python')}
- Project type: {context.get('project_type', 'General')}
- Performance requirements: {context.get('performance_requirements', 'Standard')}
Please identify:
1. Code smells and anti-patterns
2. Performance optimization opportunities
3. Readability improvements
4. Maintainability enhancements
5. Security considerations
Format your response as JSON with the following structure:
{{
"issues": [
{{
"type": "issue_type",
"severity": "low|medium|high",
"description": "detailed description",
"suggestion": "specific improvement suggestion",
"line_numbers": [1, 2, 3]
}}
],
"overall_assessment": "general code quality assessment"
}}
"""
def _parse_analysis_response(self, response: str) -> Dict[str, Any]:
"""Parse the LLM response into structured data."""
try:
# Extract JSON from response if it's embedded in text
start_idx = response.find('{')
end_idx = response.rfind('}') + 1
if start_idx != -1 and end_idx != 0:
json_str = response[start_idx:end_idx]
return json.loads(json_str)
else:
return {"error": "Could not parse LLM response"}
except json.JSONDecodeError:
return {"error": "Invalid JSON in LLM response", "raw_response": response}
def _extract_code_from_response(self, response: str) -> str:
"""Extract code from LLM response."""
# Look for code blocks marked with triple backticks
lines = response.split('\n')
code_lines = []
in_code_block = False
for line in lines:
if line.strip().startswith('```'):
in_code_block = not in_code_block
continue
if in_code_block:
code_lines.append(line)
if code_lines:
return '\n'.join(code_lines)
else:
return response # Return full response if no code blocks found
class LocalLLMProvider(LLMProvider):
"""Provider for local LLM models."""
def __init__(self, model_path: str, device: str = "cuda" if torch.cuda.is_available() else "cpu"):
self.device = device
self.tokenizer = AutoTokenizer.from_pretrained(model_path)
self.model = AutoModelForCausalLM.from_pretrained(
model_path,
torch_dtype=torch.float16 if device == "cuda" else torch.float32,
device_map="auto" if device == "cuda" else None
)
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
def analyze_code(self, code: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze code using local LLM."""
prompt = f"""
Analyze this code for improvements:
{code}
Identify issues and suggest improvements. Respond in JSON format.
"""
try:
response = self._generate_response(prompt, max_length=1000)
return self._parse_analysis_response(response)
except Exception as e:
return {"error": f"Local LLM analysis failed: {str(e)}"}
def generate_refactored_code(self, original_code: str, instructions: str) -> str:
"""Generate refactored code using local LLM."""
prompt = f"""
Refactor this code according to the instructions:
Original code:
{original_code}
Instructions:
{instructions}
Refactored code:
"""
try:
return self._generate_response(prompt, max_length=2000)
except Exception as e:
return f"# Error: {str(e)}\n{original_code}"
def _generate_response(self, prompt: str, max_length: int = 1000) -> str:
"""Generate response using the local model."""
inputs = self.tokenizer.encode(prompt, return_tensors="pt", truncation=True, max_length=512)
inputs = inputs.to(self.device)
with torch.no_grad():
outputs = self.model.generate(
inputs,
max_length=max_length,
num_return_sequences=1,
temperature=0.1,
do_sample=True,
pad_token_id=self.tokenizer.eos_token_id
)
response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
# Remove the original prompt from the response
response = response[len(prompt):].strip()
return response
def _parse_analysis_response(self, response: str) -> Dict[str, Any]:
"""Parse analysis response from local LLM."""
try:
# Try to extract JSON from the response
start_idx = response.find('{')
end_idx = response.rfind('}') + 1
if start_idx != -1 and end_idx != 0:
json_str = response[start_idx:end_idx]
return json.loads(json_str)
else:
# If no JSON found, create a simple structure
return {
"issues": [],
"overall_assessment": response,
"note": "Could not parse structured response"
}
except json.JSONDecodeError:
return {
"error": "Could not parse JSON response",
"raw_response": response
}
class LLMManager:
"""Manager class for handling different LLM providers."""
def __init__(self, provider: LLMProvider):
self.provider = provider
def switch_provider(self, new_provider: LLMProvider):
"""Switch to a different LLM provider."""
self.provider = new_provider
def analyze_code(self, code: str, context: Dict[str, Any] = None) -> Dict[str, Any]:
"""Analyze code using the current provider."""
if context is None:
context = {}
return self.provider.analyze_code(code, context)
def generate_refactored_code(self, original_code: str, instructions: str) -> str:
"""Generate refactored code using the current provider."""
return self.provider.generate_refactored_code(original_code, instructions)
This LLM integration layer provides a flexible foundation for incorporating AI capabilities into the refactoring system. The abstract base class allows for easy switching between different LLM providers, while the concrete implementations handle the specifics of remote and local model interaction.
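For instance, a team could start with a remote provider and later swap in a locally hosted model without touching any calling code. The credentials, model path, and sample snippet below are placeholders.

```python
# Hypothetical provider switch; the API key and model path are placeholders.
manager = LLMManager(RemoteLLMProvider(api_key="your-api-key", model_name="gpt-4"))
analysis = manager.analyze_code("def add(a, b): return a + b", {"language": "Python"})

# Later, switch to a local model for privacy-sensitive codebases.
manager.switch_provider(LocalLLMProvider(model_path="/models/local-code-llm"))
refactored = manager.generate_refactored_code(
    "def add(a,b): return a+b",
    "Add type hints and a docstring."
)
```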
Creating the Decision Making and User Interaction Component
The decision-making component orchestrates the interaction between the analysis engine, LLM provider, and user interface. This component presents findings to the user, collects feedback, and coordinates the application of approved refactorings.
Here is an implementation of the decision-making and user interaction system:
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, asdict
import json
from enum import Enum
class RefactoringAction(Enum):
EXTRACT_METHOD = "extract_method"
RENAME_VARIABLE = "rename_variable"
SIMPLIFY_CONDITIONAL = "simplify_conditional"
REDUCE_COMPLEXITY = "reduce_complexity"
OPTIMIZE_PERFORMANCE = "optimize_performance"
@dataclass
class RefactoringSuggestion:
action: RefactoringAction
description: str
original_code: str
refactored_code: str
confidence: float
impact_assessment: str
line_range: Tuple[int, int]
benefits: List[str]
risks: List[str]
class RefactoringDecisionEngine:
"""Engine for making refactoring decisions and managing user interaction."""
def __init__(self, analysis_engine: CodeAnalysisEngine, llm_manager: LLMManager):
self.analysis_engine = analysis_engine
self.llm_manager = llm_manager
self.suggestions = []
self.approved_suggestions = []
def analyze_and_suggest(self, file_path: str, code_content: str = None) -> List[RefactoringSuggestion]:
"""Analyze code and generate refactoring suggestions."""
if code_content is None:
with open(file_path, 'r') as file:
code_content = file.read()
# Perform static analysis
issues = self.analysis_engine.analyze_file(file_path)
# Get LLM insights
context = {
'language': 'Python',
'file_path': file_path,
'static_issues': [asdict(issue) for issue in issues]
}
llm_analysis = self.llm_manager.analyze_code(code_content, context)
# Generate refactoring suggestions
self.suggestions = self._create_refactoring_suggestions(
code_content, issues, llm_analysis
)
return self.suggestions
def _create_refactoring_suggestions(
self,
code: str,
static_issues: List[CodeIssue],
llm_analysis: Dict[str, Any]
) -> List[RefactoringSuggestion]:
"""Create detailed refactoring suggestions from analysis results."""
suggestions = []
code_lines = code.split('\n')
# Process static analysis issues
for issue in static_issues:
suggestion = self._create_suggestion_from_issue(issue, code, code_lines)
if suggestion:
suggestions.append(suggestion)
# Process LLM suggestions
if 'issues' in llm_analysis:
for llm_issue in llm_analysis['issues']:
suggestion = self._create_suggestion_from_llm_issue(llm_issue, code, code_lines)
if suggestion:
suggestions.append(suggestion)
return suggestions
def _create_suggestion_from_issue(
self,
issue: CodeIssue,
code: str,
code_lines: List[str]
) -> Optional[RefactoringSuggestion]:
"""Create a refactoring suggestion from a static analysis issue."""
if issue.issue_type == IssueType.COMPLEXITY:
return self._create_complexity_reduction_suggestion(issue, code, code_lines)
elif issue.issue_type == IssueType.NAMING:
return self._create_naming_suggestion(issue, code, code_lines)
elif issue.issue_type == IssueType.STRUCTURE:
return self._create_structure_improvement_suggestion(issue, code, code_lines)
return None
def _create_complexity_reduction_suggestion(
self,
issue: CodeIssue,
code: str,
code_lines: List[str]
) -> RefactoringSuggestion:
"""Create a suggestion for reducing code complexity."""
# Extract the problematic function
function_code = self._extract_function_at_line(code_lines, issue.line_number)
# Generate refactored version using LLM
instructions = f"""
Reduce the complexity of this function by:
1. Extracting helper methods for complex logic
2. Using early returns to reduce nesting
3. Simplifying conditional expressions
4. Breaking down large functions into smaller, focused functions
Maintain the same functionality while improving readability and maintainability.
"""
refactored_code = self.llm_manager.generate_refactored_code(function_code, instructions)
return RefactoringSuggestion(
action=RefactoringAction.REDUCE_COMPLEXITY,
description=f"Reduce complexity in function starting at line {issue.line_number}",
original_code=function_code,
refactored_code=refactored_code,
confidence=issue.confidence,
impact_assessment="Medium - Improves maintainability and testability",
line_range=(issue.line_number, issue.line_number + function_code.count('\n')),
benefits=[
"Improved readability",
"Easier testing",
"Better maintainability",
"Reduced cognitive load"
],
risks=[
"Potential introduction of bugs if not properly tested",
"May require updates to existing tests"
]
)
def _create_naming_suggestion(
self,
issue: CodeIssue,
code: str,
code_lines: List[str]
) -> RefactoringSuggestion:
"""Create a suggestion for improving naming conventions."""
line_content = code_lines[issue.line_number - 1] if issue.line_number <= len(code_lines) else ""
# Generate better name using LLM
instructions = f"""
Improve the naming in this code line to follow Python naming conventions:
{line_content}
Provide a more descriptive and convention-compliant name.
"""
refactored_line = self.llm_manager.generate_refactored_code(line_content, instructions)
return RefactoringSuggestion(
action=RefactoringAction.RENAME_VARIABLE,
description=f"Improve naming convention at line {issue.line_number}",
original_code=line_content,
refactored_code=refactored_line,
confidence=issue.confidence,
impact_assessment="Low - Simple naming improvement",
line_range=(issue.line_number, issue.line_number),
benefits=[
"Better code readability",
"Follows Python conventions",
"Improved code consistency"
],
risks=[
"Minimal risk - simple rename operation"
]
)
def _create_structure_improvement_suggestion(
self,
issue: CodeIssue,
code: str,
code_lines: List[str]
) -> RefactoringSuggestion:
"""Create a suggestion for improving code structure."""
function_code = self._extract_function_at_line(code_lines, issue.line_number)
instructions = f"""
Improve the structure of this function by:
1. Reducing nesting levels using early returns
2. Extracting complex conditional logic into helper functions
3. Using guard clauses where appropriate
4. Improving the overall flow and readability
Keep the same functionality while making the code more readable.
"""
refactored_code = self.llm_manager.generate_refactored_code(function_code, instructions)
return RefactoringSuggestion(
action=RefactoringAction.SIMPLIFY_CONDITIONAL,
description=f"Improve code structure starting at line {issue.line_number}",
original_code=function_code,
refactored_code=refactored_code,
confidence=issue.confidence,
impact_assessment="Medium - Improves code readability and flow",
line_range=(issue.line_number, issue.line_number + function_code.count('\n')),
benefits=[
"Reduced nesting complexity",
"Improved readability",
"Better code flow",
"Easier to understand logic"
],
risks=[
"May require careful testing of conditional logic",
"Potential for logic errors if not properly validated"
]
)
def _extract_function_at_line(self, code_lines: List[str], line_number: int) -> str:
"""Extract the complete function that starts at or contains the given line."""
# Simple implementation - in practice, this would use AST analysis
start_line = line_number - 1
# Find the start of the function
while start_line > 0 and not code_lines[start_line].strip().startswith('def '):
start_line -= 1
# Find the end of the function
end_line = start_line + 1
base_indent = len(code_lines[start_line]) - len(code_lines[start_line].lstrip())
while end_line < len(code_lines):
line = code_lines[end_line]
if line.strip() == "":
end_line += 1
continue
current_indent = len(line) - len(line.lstrip())
if current_indent <= base_indent and line.strip():
break
end_line += 1
return '\n'.join(code_lines[start_line:end_line])
def _create_suggestion_from_llm_issue(
self,
llm_issue: Dict[str, Any],
code: str,
code_lines: List[str]
) -> Optional[RefactoringSuggestion]:
"""Create a refactoring suggestion from an LLM-identified issue."""
issue_type = llm_issue.get('type', 'general')
line_numbers = llm_issue.get('line_numbers', [1])
start_line = min(line_numbers) if line_numbers else 1
# Extract relevant code section
relevant_code = self._extract_code_section(code_lines, line_numbers)
# Generate refactored version
instructions = f"""
Address this issue: {llm_issue.get('description', '')}
Suggestion: {llm_issue.get('suggestion', '')}
Please provide improved code that addresses these concerns.
"""
refactored_code = self.llm_manager.generate_refactored_code(relevant_code, instructions)
return RefactoringSuggestion(
action=RefactoringAction.OPTIMIZE_PERFORMANCE, # Default action
description=llm_issue.get('description', 'LLM-identified improvement'),
original_code=relevant_code,
refactored_code=refactored_code,
confidence=0.7, # Default confidence for LLM suggestions
impact_assessment=f"Severity: {llm_issue.get('severity', 'medium')}",
line_range=(start_line, max(line_numbers) if line_numbers else start_line),
benefits=[llm_issue.get('suggestion', 'General improvement')],
risks=["Requires careful review and testing"]
)
def _extract_code_section(self, code_lines: List[str], line_numbers: List[int]) -> str:
"""Extract a section of code based on line numbers."""
if not line_numbers:
return ""
start_line = max(0, min(line_numbers) - 1)
end_line = min(len(code_lines), max(line_numbers))
return '\n'.join(code_lines[start_line:end_line])
def present_suggestions_to_user(self) -> List[RefactoringSuggestion]:
"""Present suggestions to user and collect their decisions."""
print("=== Code Refactoring Suggestions ===\n")
approved_suggestions = []
for i, suggestion in enumerate(self.suggestions, 1):
print(f"Suggestion {i}: {suggestion.description}")
print(f"Action: {suggestion.action.value}")
print(f"Confidence: {suggestion.confidence:.1%}")
print(f"Impact: {suggestion.impact_assessment}")
print(f"Lines: {suggestion.line_range[0]}-{suggestion.line_range[1]}")
print("\nBenefits:")
for benefit in suggestion.benefits:
print(f" + {benefit}")
print("\nRisks:")
for risk in suggestion.risks:
print(f" - {risk}")
print(f"\nOriginal code:")
print(f"```\n{suggestion.original_code}\n```")
print(f"\nRefactored code:")
print(f"```\n{suggestion.refactored_code}\n```")
while True:
choice = input(f"\nApply this refactoring? (y/n/s for show details): ").lower().strip()
if choice in ['y', 'yes']:
approved_suggestions.append(suggestion)
print("✓ Refactoring approved")
break
elif choice in ['n', 'no']:
print("✗ Refactoring declined")
break
elif choice in ['s', 'show']:
self._show_detailed_comparison(suggestion)
else:
print("Please enter 'y' for yes, 'n' for no, or 's' to show details")
print("\n" + "="*60 + "\n")
self.approved_suggestions = approved_suggestions
return approved_suggestions
def _show_detailed_comparison(self, suggestion: RefactoringSuggestion):
"""Show a detailed side-by-side comparison of original and refactored code."""
print("\n--- DETAILED COMPARISON ---")
original_lines = suggestion.original_code.split('\n')
refactored_lines = suggestion.refactored_code.split('\n')
max_lines = max(len(original_lines), len(refactored_lines))
print(f"{'ORIGINAL':<40} | {'REFACTORED':<40}")
print("-" * 83)
for i in range(max_lines):
orig_line = original_lines[i] if i < len(original_lines) else ""
refact_line = refactored_lines[i] if i < len(refactored_lines) else ""
print(f"{orig_line:<40} | {refact_line:<40}")
print("-" * 83)
This decision-making engine provides a comprehensive interface for presenting refactoring suggestions to users and collecting their feedback. The system presents detailed information about each suggestion, including benefits, risks, and confidence levels, allowing users to make informed decisions about which refactorings to apply.
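A minimal end-to-end invocation of the decision engine might look like the following; the API key and file path are placeholders.

```python
# Hypothetical wiring of the components defined so far.
analysis_engine = CodeAnalysisEngine()
llm_manager = LLMManager(RemoteLLMProvider(api_key="your-api-key"))
decision_engine = RefactoringDecisionEngine(analysis_engine, llm_manager)

suggestions = decision_engine.analyze_and_suggest("src/order_service.py")  # placeholder path
approved = decision_engine.present_suggestions_to_user()
print(f"{len(approved)} of {len(suggestions)} suggestions approved")
```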
The implementation continues with the backup system and code application components.
Implementing the Backup and Rollback System
The backup and rollback system ensures that all refactoring operations can be safely reversed if needed. This component creates comprehensive backups before any modifications and provides mechanisms for restoring previous states of the codebase.
Here is a detailed implementation of the backup and rollback system:
import os
import shutil
import json
import hashlib
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
from pathlib import Path
import git
from git import Repo, InvalidGitRepositoryError
@dataclass
class BackupMetadata:
backup_id: str
timestamp: datetime
original_files: List[str]
backup_location: str
operation_description: str
file_checksums: Dict[str, str]
git_commit_hash: Optional[str] = None
git_branch: Optional[str] = None
class BackupManager:
"""Manages backup and rollback operations for code refactoring."""
def __init__(self, project_root: str, backup_root: str = None):
self.project_root = Path(project_root).resolve()
self.backup_root = Path(backup_root or self.project_root / ".refactoring_backups").resolve()
self.backup_root.mkdir(exist_ok=True)
self.metadata_file = self.backup_root / "backup_metadata.json"
self.backups = self._load_backup_metadata()
# Try to initialize git repository information
self.git_repo = self._initialize_git_repo()
def _initialize_git_repo(self) -> Optional[Repo]:
"""Initialize git repository if available."""
try:
return Repo(self.project_root, search_parent_directories=True)
except InvalidGitRepositoryError:
return None
def _load_backup_metadata(self) -> Dict[str, BackupMetadata]:
"""Load existing backup metadata from disk."""
if not self.metadata_file.exists():
return {}
try:
with open(self.metadata_file, 'r') as f:
data = json.load(f)
backups = {}
for backup_id, metadata in data.items():
metadata['timestamp'] = datetime.fromisoformat(metadata['timestamp'])
backups[backup_id] = BackupMetadata(**metadata)
return backups
except (json.JSONDecodeError, KeyError, ValueError) as e:
print(f"Warning: Could not load backup metadata: {e}")
return {}
def _save_backup_metadata(self):
"""Save backup metadata to disk."""
data = {}
for backup_id, metadata in self.backups.items():
metadata_dict = asdict(metadata)
metadata_dict['timestamp'] = metadata.timestamp.isoformat()
data[backup_id] = metadata_dict
with open(self.metadata_file, 'w') as f:
json.dump(data, f, indent=2)
def create_backup(self, files_to_backup: List[str], operation_description: str) -> str:
"""Create a backup of specified files before refactoring."""
backup_id = self._generate_backup_id(operation_description)
backup_dir = self.backup_root / backup_id
backup_dir.mkdir(exist_ok=True)
file_checksums = {}
backed_up_files = []
print(f"Creating backup {backup_id}...")
for file_path in files_to_backup:
source_path = Path(file_path)
if not source_path.is_absolute():
source_path = self.project_root / source_path
if not source_path.exists():
print(f"Warning: File {source_path} does not exist, skipping backup")
continue
# Calculate file checksum
checksum = self._calculate_file_checksum(source_path)
file_checksums[str(source_path.relative_to(self.project_root))] = checksum
# Create backup directory structure
relative_path = source_path.relative_to(self.project_root)
backup_file_path = backup_dir / relative_path
backup_file_path.parent.mkdir(parents=True, exist_ok=True)
# Copy file to backup location
shutil.copy2(source_path, backup_file_path)
backed_up_files.append(str(relative_path))
print(f" Backed up: {relative_path}")
# Capture git information if available
git_commit_hash = None
git_branch = None
if self.git_repo:
try:
git_commit_hash = self.git_repo.head.commit.hexsha
git_branch = self.git_repo.active_branch.name
except Exception as e:
print(f"Warning: Could not capture git information: {e}")
# Create backup metadata
metadata = BackupMetadata(
backup_id=backup_id,
timestamp=datetime.now(),
original_files=backed_up_files,
backup_location=str(backup_dir),
operation_description=operation_description,
file_checksums=file_checksums,
git_commit_hash=git_commit_hash,
git_branch=git_branch
)
self.backups[backup_id] = metadata
self._save_backup_metadata()
print(f"Backup {backup_id} created successfully")
return backup_id
def restore_backup(self, backup_id: str, files_to_restore: List[str] = None) -> bool:
"""Restore files from a specific backup."""
if backup_id not in self.backups:
print(f"Error: Backup {backup_id} not found")
return False
metadata = self.backups[backup_id]
backup_dir = Path(metadata.backup_location)
if not backup_dir.exists():
print(f"Error: Backup directory {backup_dir} does not exist")
return False
files_to_restore = files_to_restore or metadata.original_files
print(f"Restoring backup {backup_id}...")
restored_files = []
for relative_file_path in files_to_restore:
if relative_file_path not in metadata.original_files:
print(f"Warning: File {relative_file_path} was not in the original backup")
continue
backup_file_path = backup_dir / relative_file_path
target_file_path = self.project_root / relative_file_path
if not backup_file_path.exists():
print(f"Error: Backup file {backup_file_path} does not exist")
continue
# Ensure target directory exists
target_file_path.parent.mkdir(parents=True, exist_ok=True)
# Restore the file
shutil.copy2(backup_file_path, target_file_path)
restored_files.append(relative_file_path)
print(f" Restored: {relative_file_path}")
print(f"Restored {len(restored_files)} files from backup {backup_id}")
return True
def list_backups(self) -> List[BackupMetadata]:
"""List all available backups."""
return sorted(self.backups.values(), key=lambda x: x.timestamp, reverse=True)
def delete_backup(self, backup_id: str) -> bool:
"""Delete a specific backup."""
if backup_id not in self.backups:
print(f"Error: Backup {backup_id} not found")
return False
metadata = self.backups[backup_id]
backup_dir = Path(metadata.backup_location)
if backup_dir.exists():
shutil.rmtree(backup_dir)
del self.backups[backup_id]
self._save_backup_metadata()
print(f"Backup {backup_id} deleted successfully")
return True
def verify_backup_integrity(self, backup_id: str) -> bool:
"""Verify the integrity of a backup by checking file checksums."""
if backup_id not in self.backups:
print(f"Error: Backup {backup_id} not found")
return False
metadata = self.backups[backup_id]
backup_dir = Path(metadata.backup_location)
print(f"Verifying backup {backup_id}...")
all_files_valid = True
for relative_file_path, expected_checksum in metadata.file_checksums.items():
backup_file_path = backup_dir / relative_file_path
if not backup_file_path.exists():
print(f"Error: Backup file {backup_file_path} is missing")
all_files_valid = False
continue
actual_checksum = self._calculate_file_checksum(backup_file_path)
if actual_checksum != expected_checksum:
print(f"Error: Checksum mismatch for {relative_file_path}")
all_files_valid = False
else:
print(f" ✓ {relative_file_path}")
if all_files_valid:
print(f"Backup {backup_id} integrity verified successfully")
else:
print(f"Backup {backup_id} has integrity issues")
return all_files_valid
def cleanup_old_backups(self, max_backups: int = 10, max_age_days: int = 30):
"""Clean up old backups based on count and age limits."""
backups_by_date = sorted(self.backups.items(),
key=lambda x: x[1].timestamp, reverse=True)
cutoff_date = datetime.now() - timedelta(days=max_age_days)
backups_to_delete = []
# Mark backups for deletion based on count limit
if len(backups_by_date) > max_backups:
for backup_id, metadata in backups_by_date[max_backups:]:
backups_to_delete.append(backup_id)
# Mark backups for deletion based on age limit
for backup_id, metadata in backups_by_date:
if metadata.timestamp < cutoff_date and backup_id not in backups_to_delete:
backups_to_delete.append(backup_id)
# Delete marked backups
for backup_id in backups_to_delete:
print(f"Cleaning up old backup: {backup_id}")
self.delete_backup(backup_id)
if backups_to_delete:
print(f"Cleaned up {len(backups_to_delete)} old backups")
else:
print("No old backups to clean up")
def _generate_backup_id(self, operation_description: str) -> str:
"""Generate a unique backup ID."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
description_hash = hashlib.md5(operation_description.encode()).hexdigest()[:8]
return f"backup_{timestamp}_{description_hash}"
def _calculate_file_checksum(self, file_path: Path) -> str:
"""Calculate SHA-256 checksum of a file."""
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
sha256_hash.update(chunk)
return sha256_hash.hexdigest()
class GitIntegratedBackupManager(BackupManager):
"""Extended backup manager with enhanced git integration."""
def create_git_backup_branch(self, operation_description: str) -> Optional[str]:
"""Create a git branch as a backup before refactoring."""
if not self.git_repo:
print("Warning: No git repository found, cannot create git backup branch")
return None
try:
# Ensure we're on a clean working directory
if self.git_repo.is_dirty():
print("Warning: Working directory has uncommitted changes")
response = input("Commit changes before creating backup branch? (y/n): ")
if response.lower() in ['y', 'yes']:
self.git_repo.git.add(A=True)
self.git_repo.git.commit(m="Pre-refactoring commit")
else:
print("Proceeding with dirty working directory")
# Create backup branch
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
branch_name = f"refactoring_backup_{timestamp}"
backup_branch = self.git_repo.create_head(branch_name)
print(f"Created git backup branch: {branch_name}")
return branch_name
except Exception as e:
print(f"Error creating git backup branch: {e}")
return None
def restore_from_git_branch(self, branch_name: str) -> bool:
"""Restore code from a git backup branch."""
if not self.git_repo:
print("Error: No git repository found")
return False
try:
# Check if branch exists
if branch_name not in [branch.name for branch in self.git_repo.branches]:
print(f"Error: Branch {branch_name} does not exist")
return False
# Warn about uncommitted changes
if self.git_repo.is_dirty():
print("Warning: You have uncommitted changes that will be lost")
response = input("Continue with restore? (y/n): ")
if response.lower() not in ['y', 'yes']:
return False
# Switch to backup branch
self.git_repo.git.checkout(branch_name)
print(f"Restored code from git branch: {branch_name}")
return True
except Exception as e:
print(f"Error restoring from git branch: {e}")
return False
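Before moving on, here is a brief sketch of how the backup manager might wrap a refactoring run; the project path, file list, and operation description are placeholders.

```python
# Hypothetical backup workflow around a refactoring operation.
backup_manager = BackupManager(project_root="/path/to/project")

backup_id = backup_manager.create_backup(
    ["src/billing.py", "src/utils.py"],           # placeholder files
    "Extract helper methods from billing module"  # placeholder description
)

# ... refactorings would be applied here ...

if not backup_manager.verify_backup_integrity(backup_id):
    print("Backup is corrupted; manual recovery may be required")
elif input("Roll back the changes? (y/n): ").lower() in ("y", "yes"):
    backup_manager.restore_backup(backup_id)
```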
Implementing the Code Application and Generation System
The code application system is responsible for taking approved refactoring suggestions and applying them to the actual source files. This component must handle the complexities of modifying code while maintaining file integrity and proper error handling.
Here is the implementation of the code application system:
import ast
import re
from typing import List, Dict, Any, Optional, Tuple
from pathlib import Path
import difflib
from dataclasses import dataclass
@dataclass
class CodeModification:
file_path: str
start_line: int
end_line: int
original_content: str
new_content: str
modification_type: str
class CodeApplicationEngine:
"""Engine for applying refactoring changes to source code files."""
def __init__(self, backup_manager: BackupManager):
self.backup_manager = backup_manager
self.applied_modifications = []
def apply_refactoring_suggestions(
self,
suggestions: List[RefactoringSuggestion],
create_backup: bool = True
) -> Dict[str, Any]:
"""Apply a list of refactoring suggestions to the codebase."""
if not suggestions:
return {"success": True, "message": "No suggestions to apply"}
# Group suggestions by file
suggestions_by_file = self._group_suggestions_by_file(suggestions)
# Create backup if requested
backup_id = None
if create_backup:
files_to_backup = list(suggestions_by_file.keys())
backup_id = self.backup_manager.create_backup(
files_to_backup,
f"Refactoring operation with {len(suggestions)} suggestions"
)
results = {
"success": True,
"backup_id": backup_id,
"applied_suggestions": [],
"failed_suggestions": [],
"modifications": []
}
try:
# Apply suggestions file by file
for file_path, file_suggestions in suggestions_by_file.items():
file_results = self._apply_suggestions_to_file(file_path, file_suggestions)
results["applied_suggestions"].extend(file_results["applied"])
results["failed_suggestions"].extend(file_results["failed"])
results["modifications"].extend(file_results["modifications"])
if file_results["failed"]:
results["success"] = False
except Exception as e:
results["success"] = False
results["error"] = str(e)
# Restore backup if something went wrong
if backup_id:
print(f"Error occurred, restoring backup {backup_id}")
self.backup_manager.restore_backup(backup_id)
return results
def _group_suggestions_by_file(
self,
suggestions: List[RefactoringSuggestion]
) -> Dict[str, List[RefactoringSuggestion]]:
"""Group refactoring suggestions by the files they affect."""
suggestions_by_file = {}
for suggestion in suggestions:
# For this implementation, we assume each suggestion affects one file
# In practice, you might need to analyze the suggestion to determine affected files
file_path = self._determine_file_from_suggestion(suggestion)
if file_path not in suggestions_by_file:
suggestions_by_file[file_path] = []
suggestions_by_file[file_path].append(suggestion)
return suggestions_by_file
def _determine_file_from_suggestion(self, suggestion: RefactoringSuggestion) -> str:
"""Determine which file a suggestion affects."""
# This is a simplified implementation
# In practice, you would need more sophisticated logic to determine the target file
# For now, we assume the suggestion contains file information or we use a default
return getattr(suggestion, 'file_path', 'current_file.py')
def _apply_suggestions_to_file(
self,
file_path: str,
suggestions: List[RefactoringSuggestion]
) -> Dict[str, List]:
"""Apply multiple suggestions to a single file."""
# Read the current file content
try:
with open(file_path, 'r', encoding='utf-8') as f:
original_content = f.read()
except FileNotFoundError:
return {
"applied": [],
"failed": suggestions,
"modifications": [],
"error": f"File {file_path} not found"
}
# Sort suggestions by line number (reverse order to avoid line number shifts)
sorted_suggestions = sorted(
suggestions,
key=lambda s: s.line_range[0],
reverse=True
)
current_content = original_content
applied_suggestions = []
failed_suggestions = []
modifications = []
for suggestion in sorted_suggestions:
try:
modification_result = self._apply_single_suggestion(
current_content, suggestion, file_path
)
if modification_result["success"]:
current_content = modification_result["new_content"]
applied_suggestions.append(suggestion)
modifications.append(modification_result["modification"])
print(f"✓ Applied: {suggestion.description}")
else:
failed_suggestions.append(suggestion)
print(f"✗ Failed: {suggestion.description} - {modification_result['error']}")
except Exception as e:
failed_suggestions.append(suggestion)
print(f"✗ Error applying {suggestion.description}: {str(e)}")
# Write the modified content back to the file
if applied_suggestions:
try:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(current_content)
print(f"Updated file: {file_path}")
except Exception as e:
return {
"applied": [],
"failed": suggestions,
"modifications": [],
"error": f"Failed to write to {file_path}: {str(e)}"
}
return {
"applied": applied_suggestions,
"failed": failed_suggestions,
"modifications": modifications
}
def _apply_single_suggestion(
self,
content: str,
suggestion: RefactoringSuggestion,
file_path: str
) -> Dict[str, Any]:
"""Apply a single refactoring suggestion to file content."""
lines = content.split('\n')
start_line = suggestion.line_range[0] - 1 # Convert to 0-based indexing
end_line = suggestion.line_range[1] - 1
# Validate line ranges
if start_line < 0 or end_line >= len(lines) or start_line > end_line:
return {
"success": False,
"error": f"Invalid line range: {suggestion.line_range}"
}
# Extract the original code section
original_section = '\n'.join(lines[start_line:end_line + 1])
# Verify that the original code matches what we expect
if not self._code_sections_match(original_section, suggestion.original_code):
return {
"success": False,
"error": "Original code doesn't match current file content"
}
# Apply the refactoring
if suggestion.action == RefactoringAction.EXTRACT_METHOD:
new_content = self._apply_extract_method(content, suggestion)
elif suggestion.action == RefactoringAction.RENAME_VARIABLE:
new_content = self._apply_rename_variable(content, suggestion)
elif suggestion.action == RefactoringAction.SIMPLIFY_CONDITIONAL:
new_content = self._apply_simplify_conditional(content, suggestion)
elif suggestion.action == RefactoringAction.REDUCE_COMPLEXITY:
new_content = self._apply_reduce_complexity(content, suggestion)
else:
# Generic replacement
new_content = self._apply_generic_replacement(content, suggestion)
# Validate the new content
if not self._validate_syntax(new_content, file_path):
return {
"success": False,
"error": "Refactored code has syntax errors"
}
modification = CodeModification(
file_path=file_path,
start_line=start_line + 1,
end_line=end_line + 1,
original_content=original_section,
new_content=suggestion.refactored_code,
modification_type=suggestion.action.value
)
return {
"success": True,
"new_content": new_content,
"modification": modification
}
def _code_sections_match(self, section1: str, section2: str) -> bool:
"""Check if two code sections are functionally equivalent."""
# Normalize whitespace and compare
normalized1 = re.sub(r'\s+', ' ', section1.strip())
normalized2 = re.sub(r'\s+', ' ', section2.strip())
# Allow for some flexibility in matching
similarity = difflib.SequenceMatcher(None, normalized1, normalized2).ratio()
return similarity > 0.8 # 80% similarity threshold
def _apply_generic_replacement(
self,
content: str,
suggestion: RefactoringSuggestion
) -> str:
"""Apply a generic code replacement."""
lines = content.split('\n')
start_line = suggestion.line_range[0] - 1
end_line = suggestion.line_range[1] - 1
# Replace the specified lines with the refactored code
refactored_lines = suggestion.refactored_code.split('\n')
new_lines = lines[:start_line] + refactored_lines + lines[end_line + 1:]
return '\n'.join(new_lines)
def _apply_extract_method(
self,
content: str,
suggestion: RefactoringSuggestion
) -> str:
"""Apply method extraction refactoring."""
# This is a simplified implementation
# In practice, you would need sophisticated AST manipulation
return self._apply_generic_replacement(content, suggestion)
def _apply_rename_variable(
self,
content: str,
suggestion: RefactoringSuggestion
) -> str:
"""Apply variable renaming refactoring."""
# Extract old and new variable names from the suggestion
# This would require more sophisticated parsing in practice
return self._apply_generic_replacement(content, suggestion)
def _apply_simplify_conditional(
self,
content: str,
suggestion: RefactoringSuggestion
) -> str:
"""Apply conditional simplification refactoring."""
return self._apply_generic_replacement(content, suggestion)
def _apply_reduce_complexity(
self,
content: str,
suggestion: RefactoringSuggestion
) -> str:
"""Apply complexity reduction refactoring."""
return self._apply_generic_replacement(content, suggestion)
def _validate_syntax(self, content: str, file_path: str) -> bool:
"""Validate that the refactored code has correct syntax."""
try:
# For Python files, try to parse as AST
if file_path.endswith('.py'):
ast.parse(content)
return True
except SyntaxError:
return False
except Exception:
# For other file types, assume valid for now
return True
def generate_diff_report(self, modifications: List[CodeModification]) -> str:
"""Generate a detailed diff report of all modifications."""
report_lines = ["=== REFACTORING DIFF REPORT ===\n"]
for modification in modifications:
report_lines.append(f"File: {modification.file_path}")
report_lines.append(f"Lines: {modification.start_line}-{modification.end_line}")
report_lines.append(f"Type: {modification.modification_type}")
report_lines.append("")
# Generate unified diff
original_lines = modification.original_content.split('\n')
new_lines = modification.new_content.split('\n')
diff = difflib.unified_diff(
original_lines,
new_lines,
fromfile=f"{modification.file_path} (original)",
tofile=f"{modification.file_path} (refactored)",
lineterm=""
)
report_lines.extend(diff)
report_lines.append("\n" + "="*60 + "\n")
return '\n'.join(report_lines)
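The application engine is typically driven with the suggestions approved earlier in the workflow. The sketch below assumes that `backup_manager` and `approved_suggestions` were produced by the components shown previously.

```python
# Hypothetical application step using the approved suggestions.
application_engine = CodeApplicationEngine(backup_manager)
results = application_engine.apply_refactoring_suggestions(approved_suggestions)

if results["success"]:
    print(application_engine.generate_diff_report(results["modifications"]))
else:
    print(f"Some refactorings failed; backup {results['backup_id']} is available for rollback")
```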
Creating the Main Agentic AI Orchestrator
The main orchestrator brings all components together into a cohesive system that can be easily used by developers. This component provides the primary interface for the Agentic AI refactoring system.
Here is the implementation of the main orchestrator:
from typing import List, Dict, Any, Optional
import argparse
import sys
from pathlib import Path
class AgenticRefactoringAI:
"""Main orchestrator for the Agentic AI refactoring system."""
def __init__(
self,
project_root: str,
llm_provider_type: str = "remote",
llm_config: Dict[str, Any] = None
):
self.project_root = Path(project_root).resolve()
self.llm_config = llm_config or {}
# Initialize components
self.analysis_engine = CodeAnalysisEngine()
self.backup_manager = BackupManager(str(self.project_root))
self.llm_manager = self._initialize_llm_manager(llm_provider_type)
self.decision_engine = RefactoringDecisionEngine(
self.analysis_engine,
self.llm_manager
)
self.application_engine = CodeApplicationEngine(self.backup_manager)
print(f"Agentic Refactoring AI initialized for project: {self.project_root}")
def _initialize_llm_manager(self, provider_type: str) -> LLMManager:
"""Initialize the appropriate LLM manager based on configuration."""
if provider_type == "remote":
api_key = self.llm_config.get("api_key")
model_name = self.llm_config.get("model_name", "gpt-4")
if not api_key:
raise ValueError("API key required for remote LLM provider")
provider = RemoteLLMProvider(api_key, model_name)
elif provider_type == "local":
model_path = self.llm_config.get("model_path")
device = self.llm_config.get("device", "auto")
if not model_path:
raise ValueError("Model path required for local LLM provider")
provider = LocalLLMProvider(model_path, device)
else:
raise ValueError(f"Unknown LLM provider type: {provider_type}")
return LLMManager(provider)
def analyze_file(self, file_path: str, interactive: bool = True) -> Dict[str, Any]:
"""Analyze a single file and optionally apply refactorings interactively."""
file_path = self._resolve_file_path(file_path)
print(f"Analyzing file: {file_path}")
try:
# Generate refactoring suggestions
suggestions = self.decision_engine.analyze_and_suggest(str(file_path))
if not suggestions:
return {
"success": True,
"message": "No refactoring suggestions found",
"suggestions": []
}
print(f"Found {len(suggestions)} refactoring suggestions")
if interactive:
# Present suggestions to user for approval
approved_suggestions = self.decision_engine.present_suggestions_to_user()
if approved_suggestions:
# Apply approved refactorings
application_results = self.application_engine.apply_refactoring_suggestions(
approved_suggestions
)
return {
"success": application_results["success"],
"suggestions": suggestions,
"approved_suggestions": approved_suggestions,
"application_results": application_results
}
else:
return {
"success": True,
"message": "No refactorings were approved",
"suggestions": suggestions
}
else:
return {
"success": True,
"suggestions": suggestions
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def analyze_directory(
self,
directory_path: str = None,
file_patterns: List[str] = None,
interactive: bool = True
) -> Dict[str, Any]:
"""Analyze all files in a directory matching specified patterns."""
if directory_path is None:
directory_path = self.project_root
else:
directory_path = self._resolve_file_path(directory_path)
if file_patterns is None:
file_patterns = ["*.py"] # Default to Python files
print(f"Analyzing directory: {directory_path}")
print(f"File patterns: {file_patterns}")
# Find matching files
matching_files = []
for pattern in file_patterns:
matching_files.extend(directory_path.glob(f"**/{pattern}"))
if not matching_files:
return {
"success": True,
"message": "No matching files found",
"files_analyzed": []
}
print(f"Found {len(matching_files)} files to analyze")
all_suggestions = []
file_results = {}
# Analyze each file
for file_path in matching_files:
print(f"\n--- Analyzing {file_path.relative_to(self.project_root)} ---")
try:
suggestions = self.decision_engine.analyze_and_suggest(str(file_path))
file_results[str(file_path)] = {
"suggestions": suggestions,
"success": True
}
all_suggestions.extend(suggestions)
print(f"Found {len(suggestions)} suggestions for {file_path.name}")
except Exception as e:
file_results[str(file_path)] = {
"suggestions": [],
"success": False,
"error": str(e)
}
print(f"Error analyzing {file_path.name}: {e}")
if not all_suggestions:
return {
"success": True,
"message": "No refactoring suggestions found",
"file_results": file_results
}
print(f"\nTotal suggestions found: {len(all_suggestions)}")
if interactive:
# Present all suggestions to user
self.decision_engine.suggestions = all_suggestions
approved_suggestions = self.decision_engine.present_suggestions_to_user()
if approved_suggestions:
# Apply approved refactorings
application_results = self.application_engine.apply_refactoring_suggestions(
approved_suggestions
)
return {
"success": application_results["success"],
"file_results": file_results,
"approved_suggestions": approved_suggestions,
"application_results": application_results
}
else:
return {
"success": True,
"message": "No refactorings were approved",
"file_results": file_results
}
else:
return {
"success": True,
"file_results": file_results,
"total_suggestions": len(all_suggestions)
}
def restore_backup(self, backup_id: str = None) -> bool:
"""Restore from a backup."""
if backup_id is None:
# Show available backups and let user choose
backups = self.backup_manager.list_backups()
if not backups:
print("No backups available")
return False
print("Available backups:")
for i, backup in enumerate(backups):
print(f"{i + 1}. {backup.backup_id} - {backup.timestamp} - {backup.operation_description}")
try:
choice = int(input("Select backup to restore (number): ")) - 1
if 0 <= choice < len(backups):
backup_id = backups[choice].backup_id
else:
print("Invalid selection")
return False
except ValueError:
print("Invalid input")
return False
return self.backup_manager.restore_backup(backup_id)
def list_backups(self):
"""List all available backups."""
backups = self.backup_manager.list_backups()
if not backups:
print("No backups available")
return
print("Available backups:")
for backup in backups:
print(f"ID: {backup.backup_id}")
print(f"Date: {backup.timestamp}")
print(f"Description: {backup.operation_description}")
print(f"Files: {len(backup.original_files)}")
if backup.git_commit_hash:
print(f"Git commit: {backup.git_commit_hash[:8]}")
print("-" * 40)
def _resolve_file_path(self, file_path: str) -> Path:
"""Resolve a file path relative to the project root."""
path = Path(file_path)
if not path.is_absolute():
path = self.project_root / path
return path.resolve()
def main():
"""Command-line interface for the Agentic Refactoring AI."""
parser = argparse.ArgumentParser(description="Agentic AI Code Refactoring Tool")
parser.add_argument("project_root", help="Root directory of the project to analyze")
parser.add_argument("--file", help="Specific file to analyze")
parser.add_argument("--directory", help="Directory to analyze (default: project root)")
parser.add_argument("--patterns", nargs="+", default=["*.py"],
help="File patterns to match (default: *.py)")
parser.add_argument("--llm-type", choices=["remote", "local"], default="remote",
help="Type of LLM provider to use")
parser.add_argument("--api-key", help="API key for remote LLM provider")
parser.add_argument("--model-path", help="Path to local LLM model")
parser.add_argument("--model-name", default="gpt-4", help="Name of remote model to use")
parser.add_argument("--non-interactive", action="store_true",
help="Run in non-interactive mode (analysis only)")
parser.add_argument("--list-backups", action="store_true",
help="List available backups")
parser.add_argument("--restore-backup", help="Restore from specific backup ID")
args = parser.parse_args()
# Prepare LLM configuration
llm_config = {}
if args.llm_type == "remote":
if not args.api_key:
print("Error: API key required for remote LLM provider")
sys.exit(1)
llm_config = {
"api_key": args.api_key,
"model_name": args.model_name
}
elif args.llm_type == "local":
if not args.model_path:
print("Error: Model path required for local LLM provider")
sys.exit(1)
llm_config = {
"model_path": args.model_path
}
try:
# Initialize the AI system
ai = AgenticRefactoringAI(
args.project_root,
args.llm_type,
llm_config
)
# Handle different commands
if args.list_backups:
ai.list_backups()
elif args.restore_backup:
success = ai.restore_backup(args.restore_backup)
if success:
print("Backup restored successfully")
else:
print("Failed to restore backup")
elif args.file:
# Analyze specific file
result = ai.analyze_file(args.file, not args.non_interactive)
if result["success"]:
print("File analysis completed successfully")
else:
print(f"File analysis failed: {result.get('error', 'Unknown error')}")
else:
# Analyze directory
result = ai.analyze_directory(
args.directory,
args.patterns,
not args.non_interactive
)
if result["success"]:
print("Directory analysis completed successfully")
else:
print(f"Directory analysis failed: {result.get('error', 'Unknown error')}")
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
Configuration and Usage Examples
The Agentic AI system can be configured and used in various ways depending on the specific requirements of the development team and project. The following examples demonstrate different usage scenarios and configuration options.
For basic usage with a remote LLM provider, developers can initialize the system with minimal configuration. The system requires an API key for services like OpenAI and can be configured to use different models based on performance and cost requirements. Here is an example of basic initialization and usage:
# Basic usage example with remote LLM
api_key = "your-openai-api-key"
project_path = "/path/to/your/project"
# Initialize the AI system
ai = AgenticRefactoringAI(
    project_root=project_path,
    llm_provider_type="remote",
    llm_config={
        "api_key": api_key,
        "model_name": "gpt-4"
    }
)
# Analyze a specific file
result = ai.analyze_file("src/main.py")
# Analyze entire project
result = ai.analyze_directory(
    file_patterns=["*.py", "*.js"],
    interactive=True
)
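When the analysis is run non-interactively, the system only reports its findings and applies nothing, so the returned dictionary can be inspected programmatically. The following is a minimal sketch that assumes the ai instance created above and uses only the result keys returned by analyze_directory in the listing:
# Non-interactive analysis: collect suggestions without applying any changes
result = ai.analyze_directory(interactive=False)
if result["success"]:
    print(f"Total suggestions: {result.get('total_suggestions', 0)}")
    for path, file_result in result["file_results"].items():
        if file_result["success"]:
            print(f"{path}: {len(file_result['suggestions'])} suggestions")
        else:
            print(f"{path}: analysis failed ({file_result.get('error')})")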
For organizations that prefer to keep their code private and have the computational resources available, the system can be configured to use local LLM models. This approach provides complete data privacy and eliminates dependencies on external services:
# Local LLM usage example
ai = AgenticRefactoringAI(
    project_root="/path/to/project",
    llm_provider_type="local",
    llm_config={
        "model_path": "/path/to/local/model",
        "device": "cuda"  # or "cpu"
    }
)
# The usage remains the same regardless of LLM provider
result = ai.analyze_file("complex_module.py")
The system provides extensive configuration options for customizing the analysis and refactoring behavior. Teams can adjust the sensitivity of different analysis components, specify which types of refactorings to prioritize, and configure backup retention policies:
# Advanced configuration example
class CustomAgenticAI(AgenticRefactoringAI):
    def __init__(self, project_root: str, **kwargs):
        super().__init__(project_root, **kwargs)
        # Customize analysis engine settings
        self.analysis_engine.complexity_threshold = 8
        self.analysis_engine.enable_performance_analysis = True
        self.analysis_engine.enable_security_analysis = True
        # Configure backup retention
        self.backup_manager.max_backups = 20
        self.backup_manager.max_age_days = 60
        # Set refactoring preferences
        self.decision_engine.preferred_patterns = [
            "extract_method",
            "simplify_conditional",
            "reduce_complexity"
        ]

# Usage with custom configuration
custom_ai = CustomAgenticAI(
    project_root="/path/to/project",
    llm_provider_type="remote",
    llm_config={"api_key": "your-key"}
)
Benefits and Limitations of the Agentic AI Approach
The Agentic AI approach to code refactoring offers significant advantages over traditional static analysis tools and manual refactoring processes. The system's ability to understand code semantics and context allows it to propose sophisticated improvements that go beyond simple pattern matching. The AI can identify complex architectural issues, suggest design pattern implementations, and propose optimizations that consider the broader codebase context.
The interactive nature of the system ensures that developers maintain control over the refactoring process while benefiting from AI insights. The comprehensive backup and rollback mechanisms provide safety nets that encourage experimentation with AI suggestions without fear of irreversible changes. The system's ability to work with both remote and local LLM providers offers flexibility in deployment scenarios, accommodating different security and privacy requirements.
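For example, the backup facilities shown in the implementation above can be exercised directly on the same AI instance. The snippet below is an illustrative sketch; the backup ID is a hypothetical placeholder, and calling restore_backup without an argument instead prompts for an interactive selection:
# Review the restore points recorded before refactorings were applied
ai.list_backups()
# Roll back to a specific backup by ID
restored = ai.restore_backup("backup_20240101_120000")  # hypothetical backup ID
if restored:
    print("Backup restored successfully")
else:
    print("Restore failed or was cancelled")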
However, the system also has important limitations that users must understand. The quality of refactoring suggestions depends heavily on the underlying LLM's training data and capabilities. The system may occasionally propose changes that, while syntactically correct, do not align with project-specific requirements or architectural decisions. The AI's understanding of business logic and domain-specific constraints may be limited, requiring careful human review of all suggestions.
The computational requirements for local LLM deployment can be substantial, particularly for larger models that provide better code understanding capabilities. Remote LLM usage introduces dependencies on external services and potential latency issues that may affect the user experience. The system's effectiveness may vary significantly across different programming languages and paradigms, with better performance typically observed for more commonly used languages like Python and JavaScript.
Future Considerations and Enhancements
The field of AI-assisted code refactoring continues to evolve rapidly, with several promising directions for future enhancement. Integration with advanced static analysis tools could provide the AI system with deeper insights into code quality metrics, dependency relationships, and architectural patterns. Machine learning techniques could be employed to learn from developer feedback and improve suggestion quality over time.
Enhanced integration with development environments and continuous integration pipelines could make the refactoring process more seamless and automated. The system could potentially be extended to support automated testing of refactored code, ensuring that changes maintain functional correctness while improving code quality.
The development of specialized models trained specifically on code refactoring tasks could significantly improve the quality and relevance of suggestions. These models could be fine-tuned on specific programming languages, frameworks, or domain-specific codebases to provide more targeted and accurate recommendations.
As the technology matures, we can expect to see more sophisticated understanding of code semantics, better integration with version control systems, and enhanced collaboration features that allow teams to share refactoring insights and best practices. The ultimate goal is to create AI systems that can serve as intelligent coding partners, helping developers maintain high-quality codebases while reducing the manual effort required for code improvement tasks.
The Agentic AI approach represents a significant step forward in automated code improvement, offering a balance between AI capability and human oversight that can meaningfully improve software development productivity and code quality. As these systems continue to evolve, they will likely become essential tools in the modern software development toolkit, helping teams manage the growing complexity of software systems while maintaining high standards of quality and maintainability.