Tuesday, October 28, 2025

Agentic AI for Intelligent Code Refactoring: A Smart Guide




Introduction

The landscape of software development is rapidly evolving with the introduction of Agentic AI systems that can autonomously analyze, understand, and improve codebases. Unlike traditional static analysis tools that merely identify issues, an Agentic AI for code refactoring represents a sophisticated system capable of understanding code semantics, proposing meaningful improvements, and executing changes with human oversight. This article explores the architecture, functionality, and implementation details of such a system designed specifically for refactoring code files, directories, and entire repositories.


Understanding Agentic AI in the Context of Code Refactoring

An Agentic AI system for code refactoring differs fundamentally from conventional automated refactoring tools. While traditional tools apply predefined transformation rules, an Agentic AI leverages large language models to understand code context, identify complex patterns, and propose sophisticated improvements that go beyond simple syntactic changes. The system operates as an intelligent agent that can reason about code quality, maintainability, performance implications, and architectural concerns.

The core principle behind such a system lies in its ability to maintain agency throughout the refactoring process. This means the AI can make informed decisions about what changes to propose, how to prioritize them, and when to seek human input. The system maintains a continuous feedback loop with the developer, ensuring that all modifications align with the project's specific requirements and coding standards.


Core Architecture and System Components

The architecture of an Agentic AI refactoring system consists of several interconnected components that work together to provide comprehensive code analysis and improvement capabilities. The foundation of this system rests on a sophisticated analysis engine that can parse and understand code across multiple programming languages and paradigms.

At the heart of the system lies the Code Analysis Engine, which performs deep semantic analysis of source code. This engine goes beyond simple syntax parsing to understand the intent and purpose of code segments. It identifies patterns, anti-patterns, code smells, and potential optimization opportunities. The engine maintains a comprehensive understanding of programming best practices, design patterns, and language-specific idioms.

The Decision Making Component serves as the brain of the system, utilizing the analysis results to formulate refactoring strategies. This component weighs various factors such as code complexity, maintainability impact, performance implications, and risk assessment to prioritize potential improvements. It considers the broader context of the codebase, including dependencies, usage patterns, and architectural constraints.
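
To make this weighing concrete, consider the following minimal scoring sketch. It is purely illustrative rather than part of any particular system: the PrioritizedChange class, its three factor fields, and the multiplicative formula are assumptions chosen to show how severity, analyzer confidence, and estimated risk might combine into a single ranking.

from dataclasses import dataclass
from typing import List

@dataclass
class PrioritizedChange:
    description: str
    severity: float    # 0.0 (cosmetic) to 1.0 (critical)
    confidence: float  # how certain the analyzer is that the issue is real
    risk: float        # estimated probability that the change alters behavior

    def score(self) -> float:
        # Favor severe, high-confidence findings and discount risky changes
        return self.severity * self.confidence * (1.0 - self.risk)

def rank_changes(changes: List[PrioritizedChange]) -> List[PrioritizedChange]:
    """Order candidate refactorings from most to least worth applying."""
    return sorted(changes, key=lambda c: c.score(), reverse=True)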

The User Interface and Interaction Layer provides the mechanism through which developers communicate with the AI system. This component presents analysis results in a clear, actionable format and facilitates the decision-making process regarding which refactorings to apply. It maintains a conversational interface that allows developers to ask questions, request clarifications, and provide feedback on proposed changes.


Implementing an Agentic AI System for Refactoring

Creating an Agentic AI for code refactoring requires implementing several interconnected components that work together to analyze, understand, and improve code. The following sections provide detailed code examples that demonstrate how to build each major component of the system.

Building the Core Analysis Engine

The analysis engine serves as the foundation of the Agentic AI system. This component must parse code, understand its structure, and identify potential improvements. The following example demonstrates how to implement a basic analysis engine that can process Python code and identify common refactoring opportunities.

The analysis engine begins with a code parser that creates an abstract syntax tree representation of the source code. This AST allows the system to understand code structure and identify patterns that might benefit from refactoring. Here is an implementation of the core analysis engine:


import ast

from typing import List, Dict, Any, Optional

from dataclasses import dataclass

from enum import Enum


class IssueType(Enum):

    COMPLEXITY = "complexity"

    DUPLICATION = "duplication"

    NAMING = "naming"

    STRUCTURE = "structure"

    PERFORMANCE = "performance"


@dataclass

class CodeIssue:

    issue_type: IssueType

    severity: str

    line_number: int

    description: str

    suggestion: str

    confidence: float


class CodeAnalysisEngine:

    def __init__(self):

        self.issues = []

        self.metrics = {}

        

    def analyze_file(self, file_path: str) -> List[CodeIssue]:

        """Analyze a Python file and return identified issues."""

        with open(file_path, 'r') as file:

            source_code = file.read()

        

        tree = ast.parse(source_code)

        self.issues = []

        

        # Perform various analysis passes

        self._analyze_complexity(tree)

        self._analyze_naming_conventions(tree)

        self._analyze_code_structure(tree)

        self._analyze_duplication(tree)

        

        return self.issues

    

    def _analyze_complexity(self, tree: ast.AST) -> None:

        """Analyze cyclomatic complexity of functions."""

        for node in ast.walk(tree):

            if isinstance(node, ast.FunctionDef):

                complexity = self._calculate_complexity(node)

                if complexity > 10:

                    issue = CodeIssue(

                        issue_type=IssueType.COMPLEXITY,

                        severity="high",

                        line_number=node.lineno,

                        description=f"Function '{node.name}' has high complexity ({complexity})",

                        suggestion="Consider breaking this function into smaller functions",

                        confidence=0.9

                    )

                    self.issues.append(issue)

    

    def _calculate_complexity(self, node: ast.FunctionDef) -> int:

        """Calculate cyclomatic complexity for a function."""

        complexity = 1  # Base complexity

        

        for child in ast.walk(node):

            if isinstance(child, (ast.If, ast.While, ast.For, ast.With)):

                complexity += 1

            elif isinstance(child, ast.ExceptHandler):

                complexity += 1

            elif isinstance(child, ast.BoolOp):

                complexity += len(child.values) - 1

        

        return complexity

    

    def _analyze_naming_conventions(self, tree: ast.AST) -> None:

        """Check for naming convention violations."""

        for node in ast.walk(tree):

            if isinstance(node, ast.FunctionDef):

                if not node.name.islower() or '__' in node.name:

                    if not node.name.startswith('_'):

                        issue = CodeIssue(

                            issue_type=IssueType.NAMING,

                            severity="medium",

                            line_number=node.lineno,

                            description=f"Function '{node.name}' doesn't follow snake_case convention",

                            suggestion="Rename function to use snake_case naming",

                            confidence=0.8

                        )

                        self.issues.append(issue)

    

    def _analyze_code_structure(self, tree: ast.AST) -> None:

        """Analyze code structure for improvement opportunities."""

        for node in ast.walk(tree):

            if isinstance(node, ast.FunctionDef):

                # Check for deeply nested code

                max_depth = self._calculate_nesting_depth(node)

                if max_depth > 4:

                    issue = CodeIssue(

                        issue_type=IssueType.STRUCTURE,

                        severity="medium",

                        line_number=node.lineno,

                        description=f"Function '{node.name}' has deep nesting (depth: {max_depth})",

                        suggestion="Consider using early returns or extracting nested logic",

                        confidence=0.7

                    )

                    self.issues.append(issue)

    

    def _calculate_nesting_depth(self, node: ast.AST, current_depth: int = 0) -> int:

        """Calculate maximum nesting depth in a code block."""

        max_depth = current_depth

        

        for child in ast.iter_child_nodes(node):

            if isinstance(child, (ast.If, ast.While, ast.For, ast.With, ast.Try)):

                child_depth = self._calculate_nesting_depth(child, current_depth + 1)

                max_depth = max(max_depth, child_depth)

        

        return max_depth

    

    def _analyze_duplication(self, tree: ast.AST) -> None:

        """Identify potential code duplication."""

        function_bodies = {}

        

        for node in ast.walk(tree):

            if isinstance(node, ast.FunctionDef):

                body_hash = self._hash_function_body(node)

                if body_hash in function_bodies:

                    issue = CodeIssue(

                        issue_type=IssueType.DUPLICATION,

                        severity="medium",

                        line_number=node.lineno,

                        description=f"Function '{node.name}' appears to duplicate logic",

                        suggestion="Consider extracting common logic into a shared function",

                        confidence=0.6

                    )

                    self.issues.append(issue)

                else:

                    function_bodies[body_hash] = node.name

    

    def _hash_function_body(self, node: ast.FunctionDef) -> str:

        """Create a simple hash of function body for duplication detection."""

        body_str = ""

        for stmt in node.body:

            body_str += ast.dump(stmt)

        return str(hash(body_str))


This analysis engine demonstrates the fundamental approach to code analysis within an Agentic AI system. The engine parses Python source code into an abstract syntax tree and then performs multiple analysis passes to identify different types of issues. Each analysis method focuses on a specific aspect of code quality, such as complexity, naming conventions, or structural problems.
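
Driving the engine takes only a few lines of client code. The following usage sketch assumes the CodeAnalysisEngine class above is in scope and that example.py is a placeholder path to an existing Python file:

engine = CodeAnalysisEngine()
issues = engine.analyze_file("example.py")  # placeholder path

for issue in issues:
    print(f"[{issue.severity}] line {issue.line_number}: {issue.description}")
    print(f"    suggestion: {issue.suggestion} (confidence: {issue.confidence:.0%})")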


Implementing the LLM Integration Layer

The Large Language Model integration layer provides the AI capabilities that enable the system to understand code semantics and generate intelligent refactoring suggestions. This component abstracts the interaction with both local and remote LLM services, providing a unified interface for code analysis and generation.

The following implementation shows how to create a flexible LLM integration system that supports both local and remote models:


from abc import ABC, abstractmethod

import json

from typing import Any, Dict, Optional

import openai

from transformers import AutoTokenizer, AutoModelForCausalLM

import torch


class LLMProvider(ABC):

    """Abstract base class for LLM providers."""

    

    @abstractmethod

    def analyze_code(self, code: str, context: Dict[str, Any]) -> Dict[str, Any]:

        """Analyze code and return suggestions."""

        pass

    

    @abstractmethod

    def generate_refactored_code(self, original_code: str, instructions: str) -> str:

        """Generate refactored code based on instructions."""

        pass


class RemoteLLMProvider(LLMProvider):

    """Provider for remote LLM services like OpenAI GPT."""

    

    def __init__(self, api_key: str, model_name: str = "gpt-4"):

        self.client = openai.OpenAI(api_key=api_key)

        self.model_name = model_name

    

    def analyze_code(self, code: str, context: Dict[str, Any]) -> Dict[str, Any]:

        """Analyze code using remote LLM service."""

        prompt = self._create_analysis_prompt(code, context)

        

        try:

            response = self.client.chat.completions.create(

                model=self.model_name,

                messages=[

                    {"role": "system", "content": "You are an expert code analyzer. Analyze the provided code and suggest improvements."},

                    {"role": "user", "content": prompt}

                ],

                temperature=0.1,

                max_tokens=2000

            )

            

            return self._parse_analysis_response(response.choices[0].message.content)

        

        except Exception as e:

            return {"error": f"LLM analysis failed: {str(e)}"}

    

    def generate_refactored_code(self, original_code: str, instructions: str) -> str:

        """Generate refactored code using remote LLM."""

        prompt = f"""

        Original code:

        {original_code}

        

        Refactoring instructions:

        {instructions}

        

        Please provide the refactored code that implements these improvements while maintaining the same functionality.

        """

        

        try:

            response = self.client.chat.completions.create(

                model=self.model_name,

                messages=[

                    {"role": "system", "content": "You are an expert software engineer. Refactor the provided code according to the given instructions."},

                    {"role": "user", "content": prompt}

                ],

                temperature=0.1,

                max_tokens=3000

            )

            

            return self._extract_code_from_response(response.choices[0].message.content)

        

        except Exception as e:

            return f"# Error generating refactored code: {str(e)}\n{original_code}"

    

    def _create_analysis_prompt(self, code: str, context: Dict[str, Any]) -> str:

        """Create a detailed prompt for code analysis."""

        return f"""

        Please analyze the following code for potential improvements:

        

        Code:

        {code}

        

        Context:

        - Language: {context.get('language', 'Python')}

        - Project type: {context.get('project_type', 'General')}

        - Performance requirements: {context.get('performance_requirements', 'Standard')}

        

        Please identify:

        1. Code smells and anti-patterns

        2. Performance optimization opportunities

        3. Readability improvements

        4. Maintainability enhancements

        5. Security considerations

        

        Format your response as JSON with the following structure:

        {{

            "issues": [

                {{

                    "type": "issue_type",

                    "severity": "low|medium|high",

                    "description": "detailed description",

                    "suggestion": "specific improvement suggestion",

                    "line_numbers": [1, 2, 3]

                }}

            ],

            "overall_assessment": "general code quality assessment"

        }}

        """

    

    def _parse_analysis_response(self, response: str) -> Dict[str, Any]:

        """Parse the LLM response into structured data."""

        try:

            # Extract JSON from response if it's embedded in text

            start_idx = response.find('{')

            end_idx = response.rfind('}') + 1

            

            if start_idx != -1 and end_idx != 0:

                json_str = response[start_idx:end_idx]

                return json.loads(json_str)

            else:

                return {"error": "Could not parse LLM response"}

        

        except json.JSONDecodeError:

            return {"error": "Invalid JSON in LLM response", "raw_response": response}

    

    def _extract_code_from_response(self, response: str) -> str:

        """Extract code from LLM response."""

        # Look for code blocks marked with triple backticks

        lines = response.split('\n')

        code_lines = []

        in_code_block = False

        

        for line in lines:

            if line.strip().startswith('```'):

                in_code_block = not in_code_block

                continue

            

            if in_code_block:

                code_lines.append(line)

        

        if code_lines:

            return '\n'.join(code_lines)

        else:

            return response  # Return full response if no code blocks found


class LocalLLMProvider(LLMProvider):

    """Provider for local LLM models."""

    

    def __init__(self, model_path: str, device: str = "cuda" if torch.cuda.is_available() else "cpu"):

        self.device = device

        self.tokenizer = AutoTokenizer.from_pretrained(model_path)

        self.model = AutoModelForCausalLM.from_pretrained(

            model_path,

            torch_dtype=torch.float16 if device == "cuda" else torch.float32,

            device_map="auto" if device == "cuda" else None

        )

        

        if self.tokenizer.pad_token is None:

            self.tokenizer.pad_token = self.tokenizer.eos_token

    

    def analyze_code(self, code: str, context: Dict[str, Any]) -> Dict[str, Any]:

        """Analyze code using local LLM."""

        prompt = f"""

        Analyze this code for improvements:

        

        {code}

        

        Identify issues and suggest improvements. Respond in JSON format.

        """

        

        try:

            response = self._generate_response(prompt, max_length=1000)

            return self._parse_analysis_response(response)

        

        except Exception as e:

            return {"error": f"Local LLM analysis failed: {str(e)}"}

    

    def generate_refactored_code(self, original_code: str, instructions: str) -> str:

        """Generate refactored code using local LLM."""

        prompt = f"""

        Refactor this code according to the instructions:

        

        Original code:

        {original_code}

        

        Instructions:

        {instructions}

        

        Refactored code:

        """

        

        try:

            return self._generate_response(prompt, max_length=2000)

        

        except Exception as e:

            return f"# Error: {str(e)}\n{original_code}"

    

    def _generate_response(self, prompt: str, max_length: int = 1000) -> str:

        """Generate response using the local model."""

        inputs = self.tokenizer.encode(prompt, return_tensors="pt", truncation=True, max_length=512)

        inputs = inputs.to(self.device)

        

        with torch.no_grad():

            outputs = self.model.generate(

                inputs,

                max_length=max_length,

                num_return_sequences=1,

                temperature=0.1,

                do_sample=True,

                pad_token_id=self.tokenizer.eos_token_id

            )

        

        # Decode only the newly generated tokens so the prompt is excluded reliably
        # (string-slicing the decoded text can fail after tokenizer truncation)
        response = self.tokenizer.decode(outputs[0][inputs.shape[-1]:], skip_special_tokens=True).strip()

        

        return response

    

    def _parse_analysis_response(self, response: str) -> Dict[str, Any]:

        """Parse analysis response from local LLM."""

        try:

            # Try to extract JSON from the response

            start_idx = response.find('{')

            end_idx = response.rfind('}') + 1

            

            if start_idx != -1 and end_idx != 0:

                json_str = response[start_idx:end_idx]

                return json.loads(json_str)

            else:

                # If no JSON found, create a simple structure

                return {

                    "issues": [],

                    "overall_assessment": response,

                    "note": "Could not parse structured response"

                }

        

        except json.JSONDecodeError:

            return {

                "error": "Could not parse JSON response",

                "raw_response": response

            }


class LLMManager:

    """Manager class for handling different LLM providers."""

    

    def __init__(self, provider: LLMProvider):

        self.provider = provider

    

    def switch_provider(self, new_provider: LLMProvider):

        """Switch to a different LLM provider."""

        self.provider = new_provider

    

    def analyze_code(self, code: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:

        """Analyze code using the current provider."""

        if context is None:

            context = {}

        

        return self.provider.analyze_code(code, context)

    

    def generate_refactored_code(self, original_code: str, instructions: str) -> str:

        """Generate refactored code using the current provider."""

        return self.provider.generate_refactored_code(original_code, instructions)



This LLM integration layer provides a flexible foundation for incorporating AI capabilities into the refactoring system. The abstract base class allows for easy switching between different LLM providers, while the concrete implementations handle the specifics of remote and local model interaction.
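
The following minimal usage sketch shows the manager in action. It assumes the classes above are in scope, that an OpenAI API key is available in the OPENAI_API_KEY environment variable, and that the analyzed snippet is an arbitrary example:

import os

provider = RemoteLLMProvider(api_key=os.environ["OPENAI_API_KEY"], model_name="gpt-4")
manager = LLMManager(provider)

analysis = manager.analyze_code("def f(a, b): return a + b", {"language": "Python"})
print(analysis.get("overall_assessment", analysis))

# Swapping in a local model is a one-line change (the model path is a placeholder):
# manager.switch_provider(LocalLLMProvider("/path/to/local-model"))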


Creating the Decision Making and User Interaction Component

The decision-making component orchestrates the interaction between the analysis engine, LLM provider, and user interface. This component presents findings to the user, collects feedback, and coordinates the application of approved refactorings.

Here is an implementation of the decision-making and user interaction system:


from typing import List, Dict, Any, Optional, Tuple

from dataclasses import dataclass, asdict

from enum import Enum


class RefactoringAction(Enum):

    EXTRACT_METHOD = "extract_method"

    RENAME_VARIABLE = "rename_variable"

    SIMPLIFY_CONDITIONAL = "simplify_conditional"

    REDUCE_COMPLEXITY = "reduce_complexity"

    OPTIMIZE_PERFORMANCE = "optimize_performance"


@dataclass

class RefactoringSuggestion:

    action: RefactoringAction

    description: str

    original_code: str

    refactored_code: str

    confidence: float

    impact_assessment: str

    line_range: Tuple[int, int]

    benefits: List[str]

    risks: List[str]


class RefactoringDecisionEngine:

    """Engine for making refactoring decisions and managing user interaction."""

    

    def __init__(self, analysis_engine: CodeAnalysisEngine, llm_manager: LLMManager):

        self.analysis_engine = analysis_engine

        self.llm_manager = llm_manager

        self.suggestions = []

        self.approved_suggestions = []

    

    def analyze_and_suggest(self, file_path: str, code_content: Optional[str] = None) -> List[RefactoringSuggestion]:

        """Analyze code and generate refactoring suggestions."""

        if code_content is None:

            with open(file_path, 'r') as file:

                code_content = file.read()

        

        # Perform static analysis

        issues = self.analysis_engine.analyze_file(file_path)

        

        # Get LLM insights

        context = {

            'language': 'Python',

            'file_path': file_path,

            'static_issues': [asdict(issue) for issue in issues]

        }

        

        llm_analysis = self.llm_manager.analyze_code(code_content, context)

        

        # Generate refactoring suggestions

        self.suggestions = self._create_refactoring_suggestions(

            code_content, issues, llm_analysis

        )

        

        return self.suggestions

    

    def _create_refactoring_suggestions(

        self, 

        code: str, 

        static_issues: List[CodeIssue], 

        llm_analysis: Dict[str, Any]

    ) -> List[RefactoringSuggestion]:

        """Create detailed refactoring suggestions from analysis results."""

        suggestions = []

        code_lines = code.split('\n')

        

        # Process static analysis issues

        for issue in static_issues:

            suggestion = self._create_suggestion_from_issue(issue, code, code_lines)

            if suggestion:

                suggestions.append(suggestion)

        

        # Process LLM suggestions

        if 'issues' in llm_analysis:

            for llm_issue in llm_analysis['issues']:

                suggestion = self._create_suggestion_from_llm_issue(llm_issue, code, code_lines)

                if suggestion:

                    suggestions.append(suggestion)

        

        return suggestions

    

    def _create_suggestion_from_issue(

        self, 

        issue: CodeIssue, 

        code: str, 

        code_lines: List[str]

    ) -> Optional[RefactoringSuggestion]:

        """Create a refactoring suggestion from a static analysis issue."""

        

        if issue.issue_type == IssueType.COMPLEXITY:

            return self._create_complexity_reduction_suggestion(issue, code, code_lines)

        elif issue.issue_type == IssueType.NAMING:

            return self._create_naming_suggestion(issue, code, code_lines)

        elif issue.issue_type == IssueType.STRUCTURE:

            return self._create_structure_improvement_suggestion(issue, code, code_lines)

        

        return None

    

    def _create_complexity_reduction_suggestion(

        self, 

        issue: CodeIssue, 

        code: str, 

        code_lines: List[str]

    ) -> RefactoringSuggestion:

        """Create a suggestion for reducing code complexity."""

        

        # Extract the problematic function

        function_code = self._extract_function_at_line(code_lines, issue.line_number)

        

        # Generate refactored version using LLM

        instructions = f"""

        Reduce the complexity of this function by:

        1. Extracting helper methods for complex logic

        2. Using early returns to reduce nesting

        3. Simplifying conditional expressions

        4. Breaking down large functions into smaller, focused functions

        

        Maintain the same functionality while improving readability and maintainability.

        """

        

        refactored_code = self.llm_manager.generate_refactored_code(function_code, instructions)

        

        return RefactoringSuggestion(

            action=RefactoringAction.REDUCE_COMPLEXITY,

            description=f"Reduce complexity in function starting at line {issue.line_number}",

            original_code=function_code,

            refactored_code=refactored_code,

            confidence=issue.confidence,

            impact_assessment="Medium - Improves maintainability and testability",

            line_range=(issue.line_number, issue.line_number + function_code.count('\n')),

            benefits=[

                "Improved readability",

                "Easier testing",

                "Better maintainability",

                "Reduced cognitive load"

            ],

            risks=[

                "Potential introduction of bugs if not properly tested",

                "May require updates to existing tests"

            ]

        )

    

    def _create_naming_suggestion(

        self, 

        issue: CodeIssue, 

        code: str, 

        code_lines: List[str]

    ) -> RefactoringSuggestion:

        """Create a suggestion for improving naming conventions."""

        

        line_content = code_lines[issue.line_number - 1] if issue.line_number <= len(code_lines) else ""

        

        # Generate better name using LLM

        instructions = f"""

        Improve the naming in this code line to follow Python naming conventions:

        {line_content}

        

        Provide a more descriptive and convention-compliant name.

        """

        

        refactored_line = self.llm_manager.generate_refactored_code(line_content, instructions)

        

        return RefactoringSuggestion(

            action=RefactoringAction.RENAME_VARIABLE,

            description=f"Improve naming convention at line {issue.line_number}",

            original_code=line_content,

            refactored_code=refactored_line,

            confidence=issue.confidence,

            impact_assessment="Low - Simple naming improvement",

            line_range=(issue.line_number, issue.line_number),

            benefits=[

                "Better code readability",

                "Follows Python conventions",

                "Improved code consistency"

            ],

            risks=[

                "Minimal risk - simple rename operation"

            ]

        )

    

    def _create_structure_improvement_suggestion(

        self, 

        issue: CodeIssue, 

        code: str, 

        code_lines: List[str]

    ) -> RefactoringSuggestion:

        """Create a suggestion for improving code structure."""

        

        function_code = self._extract_function_at_line(code_lines, issue.line_number)

        

        instructions = f"""

        Improve the structure of this function by:

        1. Reducing nesting levels using early returns

        2. Extracting complex conditional logic into helper functions

        3. Using guard clauses where appropriate

        4. Improving the overall flow and readability

        

        Keep the same functionality while making the code more readable.

        """

        

        refactored_code = self.llm_manager.generate_refactored_code(function_code, instructions)

        

        return RefactoringSuggestion(

            action=RefactoringAction.SIMPLIFY_CONDITIONAL,

            description=f"Improve code structure starting at line {issue.line_number}",

            original_code=function_code,

            refactored_code=refactored_code,

            confidence=issue.confidence,

            impact_assessment="Medium - Improves code readability and flow",

            line_range=(issue.line_number, issue.line_number + function_code.count('\n')),

            benefits=[

                "Reduced nesting complexity",

                "Improved readability",

                "Better code flow",

                "Easier to understand logic"

            ],

            risks=[

                "May require careful testing of conditional logic",

                "Potential for logic errors if not properly validated"

            ]

        )

    

    def _extract_function_at_line(self, code_lines: List[str], line_number: int) -> str:

        """Extract the complete function that starts at or contains the given line."""

        # Simple implementation - in practice, this would use AST analysis

        start_line = line_number - 1

        

        # Find the start of the function

        while start_line > 0 and not code_lines[start_line].strip().startswith('def '):

            start_line -= 1

        

        # Find the end of the function

        end_line = start_line + 1

        base_indent = len(code_lines[start_line]) - len(code_lines[start_line].lstrip())

        

        while end_line < len(code_lines):

            line = code_lines[end_line]

            if line.strip() == "":

                end_line += 1

                continue

            

            current_indent = len(line) - len(line.lstrip())

            if current_indent <= base_indent and line.strip():

                break

            

            end_line += 1

        

        return '\n'.join(code_lines[start_line:end_line])

    

    def _create_suggestion_from_llm_issue(

        self, 

        llm_issue: Dict[str, Any], 

        code: str, 

        code_lines: List[str]

    ) -> Optional[RefactoringSuggestion]:

        """Create a refactoring suggestion from an LLM-identified issue."""

        

        issue_type = llm_issue.get('type', 'general')

        line_numbers = llm_issue.get('line_numbers', [1])

        start_line = min(line_numbers) if line_numbers else 1

        

        # Extract relevant code section

        relevant_code = self._extract_code_section(code_lines, line_numbers)

        

        # Generate refactored version

        instructions = f"""

        Address this issue: {llm_issue.get('description', '')}

        

        Suggestion: {llm_issue.get('suggestion', '')}

        

        Please provide improved code that addresses these concerns.

        """

        

        refactored_code = self.llm_manager.generate_refactored_code(relevant_code, instructions)

        

        return RefactoringSuggestion(

            action=RefactoringAction.OPTIMIZE_PERFORMANCE,  # Default action

            description=llm_issue.get('description', 'LLM-identified improvement'),

            original_code=relevant_code,

            refactored_code=refactored_code,

            confidence=0.7,  # Default confidence for LLM suggestions

            impact_assessment=f"Severity: {llm_issue.get('severity', 'medium')}",

            line_range=(start_line, max(line_numbers) if line_numbers else start_line),

            benefits=[llm_issue.get('suggestion', 'General improvement')],

            risks=["Requires careful review and testing"]

        )

    

    def _extract_code_section(self, code_lines: List[str], line_numbers: List[int]) -> str:

        """Extract a section of code based on line numbers."""

        if not line_numbers:

            return ""

        

        start_line = max(0, min(line_numbers) - 1)

        end_line = min(len(code_lines), max(line_numbers))

        

        return '\n'.join(code_lines[start_line:end_line])

    

    def present_suggestions_to_user(self) -> List[RefactoringSuggestion]:

        """Present suggestions to user and collect their decisions."""

        print("=== Code Refactoring Suggestions ===\n")

        

        approved_suggestions = []

        

        for i, suggestion in enumerate(self.suggestions, 1):

            print(f"Suggestion {i}: {suggestion.description}")

            print(f"Action: {suggestion.action.value}")

            print(f"Confidence: {suggestion.confidence:.1%}")

            print(f"Impact: {suggestion.impact_assessment}")

            print(f"Lines: {suggestion.line_range[0]}-{suggestion.line_range[1]}")

            

            print("\nBenefits:")

            for benefit in suggestion.benefits:

                print(f"  + {benefit}")

            

            print("\nRisks:")

            for risk in suggestion.risks:

                print(f"  - {risk}")

            

            print(f"\nOriginal code:")

            print(f"```\n{suggestion.original_code}\n```")

            

            print(f"\nRefactored code:")

            print(f"```\n{suggestion.refactored_code}\n```")

            

            while True:

                choice = input(f"\nApply this refactoring? (y/n/s for show details): ").lower().strip()

                if choice in ['y', 'yes']:

                    approved_suggestions.append(suggestion)

                    print("✓ Refactoring approved")

                    break

                elif choice in ['n', 'no']:

                    print("✗ Refactoring declined")

                    break

                elif choice in ['s', 'show']:

                    self._show_detailed_comparison(suggestion)

                else:

                    print("Please enter 'y' for yes, 'n' for no, or 's' to show details")

            

            print("\n" + "="*60 + "\n")

        

        self.approved_suggestions = approved_suggestions

        return approved_suggestions

    

    def _show_detailed_comparison(self, suggestion: RefactoringSuggestion):

        """Show a detailed side-by-side comparison of original and refactored code."""

        print("\n--- DETAILED COMPARISON ---")

        

        original_lines = suggestion.original_code.split('\n')

        refactored_lines = suggestion.refactored_code.split('\n')

        

        max_lines = max(len(original_lines), len(refactored_lines))

        

        print(f"{'ORIGINAL':<40} | {'REFACTORED':<40}")

        print("-" * 83)

        

        for i in range(max_lines):

            orig_line = original_lines[i] if i < len(original_lines) else ""

            refact_line = refactored_lines[i] if i < len(refactored_lines) else ""

            

            print(f"{orig_line:<40} | {refact_line:<40}")

        

        print("-" * 83)


This decision-making engine provides a comprehensive interface for presenting refactoring suggestions to users and collecting their feedback. The system presents detailed information about each suggestion, including benefits, risks, and confidence levels, allowing users to make informed decisions about which refactorings to apply.
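
As the comment inside _extract_function_at_line acknowledges, scanning lines by indentation is a simplification. A more robust variant can locate the enclosing function through the AST instead. The sketch below assumes Python 3.8 or later, where AST nodes carry an end_lineno attribute and ast.get_source_segment is available:

import ast
from typing import Optional

def extract_function_at_line(source: str, line_number: int) -> Optional[str]:
    """Return the source of the function containing the given line, if any."""
    tree = ast.parse(source)
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            if node.lineno <= line_number <= (node.end_lineno or node.lineno):
                # Slices the original source using the node's position metadata
                return ast.get_source_segment(source, node)
    return None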

The implementation continues with the backup system and code application components.


Implementing the Backup and Rollback System

The backup and rollback system ensures that all refactoring operations can be safely reversed if needed. This component creates comprehensive backups before any modifications and provides mechanisms for restoring previous states of the codebase.

Here is a detailed implementation of the backup and rollback system:


import os

import shutil

import json

import hashlib

from datetime import datetime, timedelta

from typing import Dict, List, Optional, Tuple

from dataclasses import dataclass, asdict

from pathlib import Path

from git import Repo, InvalidGitRepositoryError


@dataclass

class BackupMetadata:

    backup_id: str

    timestamp: datetime

    original_files: List[str]

    backup_location: str

    operation_description: str

    file_checksums: Dict[str, str]

    git_commit_hash: Optional[str] = None

    git_branch: Optional[str] = None


class BackupManager:

    """Manages backup and rollback operations for code refactoring."""

    

    def __init__(self, project_root: str, backup_root: Optional[str] = None):

        self.project_root = Path(project_root).resolve()

        self.backup_root = Path(backup_root or self.project_root / ".refactoring_backups").resolve()

        self.backup_root.mkdir(exist_ok=True)

        self.metadata_file = self.backup_root / "backup_metadata.json"

        self.backups = self._load_backup_metadata()

        

        # Try to initialize git repository information

        self.git_repo = self._initialize_git_repo()

    

    def _initialize_git_repo(self) -> Optional[Repo]:

        """Initialize git repository if available."""

        try:

            return Repo(self.project_root, search_parent_directories=True)

        except InvalidGitRepositoryError:

            return None

    

    def _load_backup_metadata(self) -> Dict[str, BackupMetadata]:

        """Load existing backup metadata from disk."""

        if not self.metadata_file.exists():

            return {}

        

        try:

            with open(self.metadata_file, 'r') as f:

                data = json.load(f)

            

            backups = {}

            for backup_id, metadata in data.items():

                metadata['timestamp'] = datetime.fromisoformat(metadata['timestamp'])

                backups[backup_id] = BackupMetadata(**metadata)

            

            return backups

        

        except (json.JSONDecodeError, KeyError, ValueError) as e:

            print(f"Warning: Could not load backup metadata: {e}")

            return {}

    

    def _save_backup_metadata(self):

        """Save backup metadata to disk."""

        data = {}

        for backup_id, metadata in self.backups.items():

            metadata_dict = asdict(metadata)

            metadata_dict['timestamp'] = metadata.timestamp.isoformat()

            data[backup_id] = metadata_dict

        

        with open(self.metadata_file, 'w') as f:

            json.dump(data, f, indent=2)

    

    def create_backup(self, files_to_backup: List[str], operation_description: str) -> str:

        """Create a backup of specified files before refactoring."""

        backup_id = self._generate_backup_id(operation_description)

        backup_dir = self.backup_root / backup_id

        backup_dir.mkdir(exist_ok=True)

        

        file_checksums = {}

        backed_up_files = []

        

        print(f"Creating backup {backup_id}...")

        

        for file_path in files_to_backup:

            source_path = Path(file_path)

            if not source_path.is_absolute():

                source_path = self.project_root / source_path

            

            if not source_path.exists():

                print(f"Warning: File {source_path} does not exist, skipping backup")

                continue

            

            # Calculate file checksum

            checksum = self._calculate_file_checksum(source_path)

            file_checksums[str(source_path.relative_to(self.project_root))] = checksum

            

            # Create backup directory structure

            relative_path = source_path.relative_to(self.project_root)

            backup_file_path = backup_dir / relative_path

            backup_file_path.parent.mkdir(parents=True, exist_ok=True)

            

            # Copy file to backup location

            shutil.copy2(source_path, backup_file_path)

            backed_up_files.append(str(relative_path))

            

            print(f"  Backed up: {relative_path}")

        

        # Capture git information if available

        git_commit_hash = None

        git_branch = None

        if self.git_repo:

            try:

                git_commit_hash = self.git_repo.head.commit.hexsha

                git_branch = self.git_repo.active_branch.name

            except Exception as e:

                print(f"Warning: Could not capture git information: {e}")

        

        # Create backup metadata

        metadata = BackupMetadata(

            backup_id=backup_id,

            timestamp=datetime.now(),

            original_files=backed_up_files,

            backup_location=str(backup_dir),

            operation_description=operation_description,

            file_checksums=file_checksums,

            git_commit_hash=git_commit_hash,

            git_branch=git_branch

        )

        

        self.backups[backup_id] = metadata

        self._save_backup_metadata()

        

        print(f"Backup {backup_id} created successfully")

        return backup_id

    

    def restore_backup(self, backup_id: str, files_to_restore: Optional[List[str]] = None) -> bool:

        """Restore files from a specific backup."""

        if backup_id not in self.backups:

            print(f"Error: Backup {backup_id} not found")

            return False

        

        metadata = self.backups[backup_id]

        backup_dir = Path(metadata.backup_location)

        

        if not backup_dir.exists():

            print(f"Error: Backup directory {backup_dir} does not exist")

            return False

        

        files_to_restore = files_to_restore or metadata.original_files

        

        print(f"Restoring backup {backup_id}...")

        

        restored_files = []

        for relative_file_path in files_to_restore:

            if relative_file_path not in metadata.original_files:

                print(f"Warning: File {relative_file_path} was not in the original backup")

                continue

            

            backup_file_path = backup_dir / relative_file_path

            target_file_path = self.project_root / relative_file_path

            

            if not backup_file_path.exists():

                print(f"Error: Backup file {backup_file_path} does not exist")

                continue

            

            # Ensure target directory exists

            target_file_path.parent.mkdir(parents=True, exist_ok=True)

            

            # Restore the file

            shutil.copy2(backup_file_path, target_file_path)

            restored_files.append(relative_file_path)

            

            print(f"  Restored: {relative_file_path}")

        

        print(f"Restored {len(restored_files)} files from backup {backup_id}")

        return True

    

    def list_backups(self) -> List[BackupMetadata]:

        """List all available backups."""

        return sorted(self.backups.values(), key=lambda x: x.timestamp, reverse=True)

    

    def delete_backup(self, backup_id: str) -> bool:

        """Delete a specific backup."""

        if backup_id not in self.backups:

            print(f"Error: Backup {backup_id} not found")

            return False

        

        metadata = self.backups[backup_id]

        backup_dir = Path(metadata.backup_location)

        

        if backup_dir.exists():

            shutil.rmtree(backup_dir)

        

        del self.backups[backup_id]

        self._save_backup_metadata()

        

        print(f"Backup {backup_id} deleted successfully")

        return True

    

    def verify_backup_integrity(self, backup_id: str) -> bool:

        """Verify the integrity of a backup by checking file checksums."""

        if backup_id not in self.backups:

            print(f"Error: Backup {backup_id} not found")

            return False

        

        metadata = self.backups[backup_id]

        backup_dir = Path(metadata.backup_location)

        

        print(f"Verifying backup {backup_id}...")

        

        all_files_valid = True

        for relative_file_path, expected_checksum in metadata.file_checksums.items():

            backup_file_path = backup_dir / relative_file_path

            

            if not backup_file_path.exists():

                print(f"Error: Backup file {backup_file_path} is missing")

                all_files_valid = False

                continue

            

            actual_checksum = self._calculate_file_checksum(backup_file_path)

            if actual_checksum != expected_checksum:

                print(f"Error: Checksum mismatch for {relative_file_path}")

                all_files_valid = False

            else:

                print(f"  ✓ {relative_file_path}")

        

        if all_files_valid:

            print(f"Backup {backup_id} integrity verified successfully")

        else:

            print(f"Backup {backup_id} has integrity issues")

        

        return all_files_valid

    

    def cleanup_old_backups(self, max_backups: int = 10, max_age_days: int = 30):

        """Clean up old backups based on count and age limits."""

        backups_by_date = sorted(self.backups.items(), 

                                key=lambda x: x[1].timestamp, reverse=True)

        

        cutoff_date = datetime.now() - timedelta(days=max_age_days)

        backups_to_delete = []

        

        # Mark backups for deletion based on count limit

        if len(backups_by_date) > max_backups:

            for backup_id, metadata in backups_by_date[max_backups:]:

                backups_to_delete.append(backup_id)

        

        # Mark backups for deletion based on age limit

        for backup_id, metadata in backups_by_date:

            if metadata.timestamp < cutoff_date and backup_id not in backups_to_delete:

                backups_to_delete.append(backup_id)

        

        # Delete marked backups

        for backup_id in backups_to_delete:

            print(f"Cleaning up old backup: {backup_id}")

            self.delete_backup(backup_id)

        

        if backups_to_delete:

            print(f"Cleaned up {len(backups_to_delete)} old backups")

        else:

            print("No old backups to clean up")

    

    def _generate_backup_id(self, operation_description: str) -> str:

        """Generate a unique backup ID."""

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        description_hash = hashlib.md5(operation_description.encode()).hexdigest()[:8]

        return f"backup_{timestamp}_{description_hash}"

    

    def _calculate_file_checksum(self, file_path: Path) -> str:

        """Calculate SHA-256 checksum of a file."""

        sha256_hash = hashlib.sha256()

        with open(file_path, "rb") as f:

            for chunk in iter(lambda: f.read(4096), b""):

                sha256_hash.update(chunk)

        return sha256_hash.hexdigest()


class GitIntegratedBackupManager(BackupManager):

    """Extended backup manager with enhanced git integration."""

    

    def create_git_backup_branch(self, operation_description: str) -> Optional[str]:

        """Create a git branch as a backup before refactoring."""

        if not self.git_repo:

            print("Warning: No git repository found, cannot create git backup branch")

            return None

        

        try:

            # Ensure we're on a clean working directory

            if self.git_repo.is_dirty():

                print("Warning: Working directory has uncommitted changes")

                response = input("Commit changes before creating backup branch? (y/n): ")

                if response.lower() in ['y', 'yes']:

                    self.git_repo.git.add(A=True)

                    self.git_repo.git.commit(m="Pre-refactoring commit")

                else:

                    print("Proceeding with dirty working directory")

            

            # Create backup branch

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

            branch_name = f"refactoring_backup_{timestamp}"

            

            backup_branch = self.git_repo.create_head(branch_name)

            print(f"Created git backup branch: {branch_name}")

            

            return branch_name

        

        except Exception as e:

            print(f"Error creating git backup branch: {e}")

            return None

    

    def restore_from_git_branch(self, branch_name: str) -> bool:

        """Restore code from a git backup branch."""

        if not self.git_repo:

            print("Error: No git repository found")

            return False

        

        try:

            # Check if branch exists

            if branch_name not in [branch.name for branch in self.git_repo.branches]:

                print(f"Error: Branch {branch_name} does not exist")

                return False

            

            # Warn about uncommitted changes

            if self.git_repo.is_dirty():

                print("Warning: You have uncommitted changes that will be lost")

                response = input("Continue with restore? (y/n): ")

                if response.lower() not in ['y', 'yes']:

                    return False

            

            # Switch to backup branch

            self.git_repo.git.checkout(branch_name)

            print(f"Restored code from git branch: {branch_name}")

            

            return True

        

        except Exception as e:

            print(f"Error restoring from git branch: {e}")

            return False
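
Before moving on, here is a brief usage sketch of the backup manager, assuming the classes above are in scope; the project root and file path are placeholders:

manager = BackupManager(project_root=".")  # treat the current directory as the project root

# Snapshot the files about to be modified (the path is a placeholder)
backup_id = manager.create_backup(["src/app.py"], "Simplify conditionals in app.py")

# ... refactorings are applied here ...

# Verify the snapshot; if the changes misbehave, roll everything back
if manager.verify_backup_integrity(backup_id):
    manager.restore_backup(backup_id)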


Implementing the Code Application and Generation System

The code application system is responsible for taking approved refactoring suggestions and applying them to the actual source files. This component must handle the complexities of modifying code while maintaining file integrity and proper error handling.

Here is the implementation of the code application system:


import ast

import re

from typing import List, Dict, Any, Optional, Tuple

from pathlib import Path

import difflib

from dataclasses import dataclass


@dataclass

class CodeModification:

    file_path: str

    start_line: int

    end_line: int

    original_content: str

    new_content: str

    modification_type: str


class CodeApplicationEngine:

    """Engine for applying refactoring changes to source code files."""

    

    def __init__(self, backup_manager: BackupManager):

        self.backup_manager = backup_manager

        self.applied_modifications = []

    

    def apply_refactoring_suggestions(

        self, 

        suggestions: List[RefactoringSuggestion],

        create_backup: bool = True

    ) -> Dict[str, Any]:

        """Apply a list of refactoring suggestions to the codebase."""

        

        if not suggestions:

            return {"success": True, "message": "No suggestions to apply"}

        

        # Group suggestions by file

        suggestions_by_file = self._group_suggestions_by_file(suggestions)

        

        # Create backup if requested

        backup_id = None

        if create_backup:

            files_to_backup = list(suggestions_by_file.keys())

            backup_id = self.backup_manager.create_backup(

                files_to_backup, 

                f"Refactoring operation with {len(suggestions)} suggestions"

            )

        

        results = {

            "success": True,

            "backup_id": backup_id,

            "applied_suggestions": [],

            "failed_suggestions": [],

            "modifications": []

        }

        

        try:

            # Apply suggestions file by file

            for file_path, file_suggestions in suggestions_by_file.items():

                file_results = self._apply_suggestions_to_file(file_path, file_suggestions)

                

                results["applied_suggestions"].extend(file_results["applied"])

                results["failed_suggestions"].extend(file_results["failed"])

                results["modifications"].extend(file_results["modifications"])

                

                if file_results["failed"]:

                    results["success"] = False

        

        except Exception as e:

            results["success"] = False

            results["error"] = str(e)

            

            # Restore backup if something went wrong

            if backup_id:

                print(f"Error occurred, restoring backup {backup_id}")

                self.backup_manager.restore_backup(backup_id)

        

        return results

    

    def _group_suggestions_by_file(

        self, 

        suggestions: List[RefactoringSuggestion]

    ) -> Dict[str, List[RefactoringSuggestion]]:

        """Group refactoring suggestions by the files they affect."""

        suggestions_by_file = {}

        

        for suggestion in suggestions:

            # For this implementation, we assume each suggestion affects one file

            # In practice, you might need to analyze the suggestion to determine affected files

            file_path = self._determine_file_from_suggestion(suggestion)

            

            if file_path not in suggestions_by_file:

                suggestions_by_file[file_path] = []

            

            suggestions_by_file[file_path].append(suggestion)

        

        return suggestions_by_file

    

    def _determine_file_from_suggestion(self, suggestion: RefactoringSuggestion) -> str:

        """Determine which file a suggestion affects."""

        # This is a simplified implementation

        # In practice, you would need more sophisticated logic to determine the target file

        # For now, we assume the suggestion contains file information or we use a default

        return getattr(suggestion, 'file_path', 'current_file.py')

    

    def _apply_suggestions_to_file(

        self, 

        file_path: str, 

        suggestions: List[RefactoringSuggestion]

    ) -> Dict[str, List]:

        """Apply multiple suggestions to a single file."""

        

        # Read the current file content

        try:

            with open(file_path, 'r', encoding='utf-8') as f:

                original_content = f.read()

        except FileNotFoundError:

            return {

                "applied": [],

                "failed": suggestions,

                "modifications": [],

                "error": f"File {file_path} not found"

            }

        

        # Sort suggestions by line number (reverse order to avoid line number shifts)

        sorted_suggestions = sorted(

            suggestions, 

            key=lambda s: s.line_range[0], 

            reverse=True

        )

        

        current_content = original_content

        applied_suggestions = []

        failed_suggestions = []

        modifications = []

        

        for suggestion in sorted_suggestions:

            try:

                modification_result = self._apply_single_suggestion(

                    current_content, suggestion, file_path

                )

                

                if modification_result["success"]:

                    current_content = modification_result["new_content"]

                    applied_suggestions.append(suggestion)

                    modifications.append(modification_result["modification"])

                    

                    print(f"✓ Applied: {suggestion.description}")

                else:

                    failed_suggestions.append(suggestion)

                    print(f"✗ Failed: {suggestion.description} - {modification_result['error']}")

            

            except Exception as e:

                failed_suggestions.append(suggestion)

                print(f"✗ Error applying {suggestion.description}: {str(e)}")

        

        # Write the modified content back to the file

        if applied_suggestions:

            try:

                with open(file_path, 'w', encoding='utf-8') as f:

                    f.write(current_content)

                

                print(f"Updated file: {file_path}")

            

            except Exception as e:

                return {

                    "applied": [],

                    "failed": suggestions,

                    "modifications": [],

                    "error": f"Failed to write to {file_path}: {str(e)}"

                }

        

        return {

            "applied": applied_suggestions,

            "failed": failed_suggestions,

            "modifications": modifications

        }

    

    def _apply_single_suggestion(

        self, 

        content: str, 

        suggestion: RefactoringSuggestion,

        file_path: str

    ) -> Dict[str, Any]:

        """Apply a single refactoring suggestion to file content."""

        

        lines = content.split('\n')

        start_line = suggestion.line_range[0] - 1  # Convert to 0-based indexing

        end_line = suggestion.line_range[1] - 1

        

        # Validate line ranges

        if start_line < 0 or end_line >= len(lines) or start_line > end_line:

            return {

                "success": False,

                "error": f"Invalid line range: {suggestion.line_range}"

            }

        

        # Extract the original code section

        original_section = '\n'.join(lines[start_line:end_line + 1])

        

        # Verify that the original code matches what we expect

        if not self._code_sections_match(original_section, suggestion.original_code):

            return {

                "success": False,

                "error": "Original code doesn't match current file content"

            }

        

        # Apply the refactoring

        if suggestion.action == RefactoringAction.EXTRACT_METHOD:

            new_content = self._apply_extract_method(content, suggestion)

        elif suggestion.action == RefactoringAction.RENAME_VARIABLE:

            new_content = self._apply_rename_variable(content, suggestion)

        elif suggestion.action == RefactoringAction.SIMPLIFY_CONDITIONAL:

            new_content = self._apply_simplify_conditional(content, suggestion)

        elif suggestion.action == RefactoringAction.REDUCE_COMPLEXITY:

            new_content = self._apply_reduce_complexity(content, suggestion)

        else:

            # Generic replacement

            new_content = self._apply_generic_replacement(content, suggestion)

        

        # Validate the new content

        if not self._validate_syntax(new_content, file_path):

            return {

                "success": False,

                "error": "Refactored code has syntax errors"

            }

        

        modification = CodeModification(

            file_path=file_path,

            start_line=start_line + 1,

            end_line=end_line + 1,

            original_content=original_section,

            new_content=suggestion.refactored_code,

            modification_type=suggestion.action.value

        )

        

        return {

            "success": True,

            "new_content": new_content,

            "modification": modification

        }

    

    def _code_sections_match(self, section1: str, section2: str) -> bool:

        """Check if two code sections are functionally equivalent."""

        # Normalize whitespace and compare

        normalized1 = re.sub(r'\s+', ' ', section1.strip())

        normalized2 = re.sub(r'\s+', ' ', section2.strip())

        

        # Allow for some flexibility in matching

        similarity = difflib.SequenceMatcher(None, normalized1, normalized2).ratio()

        return similarity > 0.8  # 80% similarity threshold

    

    def _apply_generic_replacement(

        self, 

        content: str, 

        suggestion: RefactoringSuggestion

    ) -> str:

        """Apply a generic code replacement."""

        lines = content.split('\n')

        start_line = suggestion.line_range[0] - 1

        end_line = suggestion.line_range[1] - 1

        

        # Replace the specified lines with the refactored code

        refactored_lines = suggestion.refactored_code.split('\n')

        new_lines = lines[:start_line] + refactored_lines + lines[end_line + 1:]

        

        return '\n'.join(new_lines)

    

    def _apply_extract_method(

        self, 

        content: str, 

        suggestion: RefactoringSuggestion

    ) -> str:

        """Apply method extraction refactoring."""

        # This is a simplified implementation

        # In practice, you would need sophisticated AST manipulation

        return self._apply_generic_replacement(content, suggestion)

    

    def _apply_rename_variable(

        self, 

        content: str, 

        suggestion: RefactoringSuggestion

    ) -> str:

        """Apply variable renaming refactoring."""

        # Extract old and new variable names from the suggestion

        # This would require more sophisticated parsing in practice

        return self._apply_generic_replacement(content, suggestion)

    

    def _apply_simplify_conditional(

        self, 

        content: str, 

        suggestion: RefactoringSuggestion

    ) -> str:

        """Apply conditional simplification refactoring."""

        return self._apply_generic_replacement(content, suggestion)

    

    def _apply_reduce_complexity(

        self, 

        content: str, 

        suggestion: RefactoringSuggestion

    ) -> str:

        """Apply complexity reduction refactoring."""

        return self._apply_generic_replacement(content, suggestion)

    

    def _validate_syntax(self, content: str, file_path: str) -> bool:

        """Validate that the refactored code has correct syntax."""

        # Only Python files are syntax-checked; other file types pass through

        if not file_path.endswith('.py'):

            return True

        try:

            ast.parse(content)

            return True

        except SyntaxError:

            return False

        except Exception:

            # ast.parse can also raise non-SyntaxError exceptions (e.g. a

            # ValueError for null bytes); treat these as failed validation

            return False

    

    def generate_diff_report(self, modifications: List[CodeModification]) -> str:

        """Generate a detailed diff report of all modifications."""

        report_lines = ["=== REFACTORING DIFF REPORT ===\n"]

        

        for modification in modifications:

            report_lines.append(f"File: {modification.file_path}")

            report_lines.append(f"Lines: {modification.start_line}-{modification.end_line}")

            report_lines.append(f"Type: {modification.modification_type}")

            report_lines.append("")

            

            # Generate unified diff

            original_lines = modification.original_content.split('\n')

            new_lines = modification.new_content.split('\n')

            

            diff = difflib.unified_diff(

                original_lines,

                new_lines,

                fromfile=f"{modification.file_path} (original)",

                tofile=f"{modification.file_path} (refactored)",

                lineterm=""

            )

            

            report_lines.extend(diff)

            report_lines.append("\n" + "="*60 + "\n")

        

        return '\n'.join(report_lines)
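

The diff report ties directly into the application flow: the modifications returned by apply_refactoring_suggestions can be passed straight to generate_diff_report for review. The short sketch below illustrates this; the engine and approved_suggestions names are placeholders for an already-constructed CodeApplicationEngine and a list of developer-approved suggestions:


# Illustrative usage of the application engine and diff report.
# `engine` is a CodeApplicationEngine; `approved_suggestions` holds
# RefactoringSuggestion objects approved by the developer.
results = engine.apply_refactoring_suggestions(approved_suggestions)

if results["success"]:
    print(engine.generate_diff_report(results["modifications"]))
else:
    print(f"Refactorings failed: {len(results['failed_suggestions'])}")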


Creating the Main Agentic AI Orchestrator


The main orchestrator brings all components together into a cohesive system that can be easily used by developers. This component provides the primary interface for the Agentic AI refactoring system.

Here is the implementation of the main orchestrator:


from typing import List, Dict, Any, Optional

import argparse

import sys

from pathlib import Path


class AgenticRefactoringAI:

    """Main orchestrator for the Agentic AI refactoring system."""

    

    def __init__(

        self, 

        project_root: str,

        llm_provider_type: str = "remote",

        llm_config: Optional[Dict[str, Any]] = None

    ):

        self.project_root = Path(project_root).resolve()

        self.llm_config = llm_config or {}

        

        # Initialize components

        self.analysis_engine = CodeAnalysisEngine()

        self.backup_manager = BackupManager(str(self.project_root))

        self.llm_manager = self._initialize_llm_manager(llm_provider_type)

        self.decision_engine = RefactoringDecisionEngine(

            self.analysis_engine, 

            self.llm_manager

        )

        self.application_engine = CodeApplicationEngine(self.backup_manager)

        

        print(f"Agentic Refactoring AI initialized for project: {self.project_root}")

    

    def _initialize_llm_manager(self, provider_type: str) -> LLMManager:

        """Initialize the appropriate LLM manager based on configuration."""

        if provider_type == "remote":

            api_key = self.llm_config.get("api_key")

            model_name = self.llm_config.get("model_name", "gpt-4")

            

            if not api_key:

                raise ValueError("API key required for remote LLM provider")

            

            provider = RemoteLLMProvider(api_key, model_name)

        

        elif provider_type == "local":

            model_path = self.llm_config.get("model_path")

            device = self.llm_config.get("device", "auto")

            

            if not model_path:

                raise ValueError("Model path required for local LLM provider")

            

            provider = LocalLLMProvider(model_path, device)

        

        else:

            raise ValueError(f"Unknown LLM provider type: {provider_type}")

        

        return LLMManager(provider)

    

    def analyze_file(self, file_path: str, interactive: bool = True) -> Dict[str, Any]:

        """Analyze a single file and optionally apply refactorings interactively."""

        file_path = self._resolve_file_path(file_path)

        

        print(f"Analyzing file: {file_path}")

        

        try:

            # Generate refactoring suggestions

            suggestions = self.decision_engine.analyze_and_suggest(str(file_path))

            

            if not suggestions:

                return {

                    "success": True,

                    "message": "No refactoring suggestions found",

                    "suggestions": []

                }

            

            print(f"Found {len(suggestions)} refactoring suggestions")

            

            if interactive:

                # Present suggestions to user for approval

                approved_suggestions = self.decision_engine.present_suggestions_to_user()

                

                if approved_suggestions:

                    # Apply approved refactorings

                    application_results = self.application_engine.apply_refactoring_suggestions(

                        approved_suggestions

                    )

                    

                    return {

                        "success": application_results["success"],

                        "suggestions": suggestions,

                        "approved_suggestions": approved_suggestions,

                        "application_results": application_results

                    }

                else:

                    return {

                        "success": True,

                        "message": "No refactorings were approved",

                        "suggestions": suggestions

                    }

            else:

                return {

                    "success": True,

                    "suggestions": suggestions

                }

        

        except Exception as e:

            return {

                "success": False,

                "error": str(e)

            }

    

    def analyze_directory(

        self, 

        directory_path: Optional[str] = None,

        file_patterns: Optional[List[str]] = None,

        interactive: bool = True

    ) -> Dict[str, Any]:

        """Analyze all files in a directory matching specified patterns."""

        

        if directory_path is None:

            directory_path = self.project_root

        else:

            directory_path = self._resolve_file_path(directory_path)

        

        if file_patterns is None:

            file_patterns = ["*.py"]  # Default to Python files

        

        print(f"Analyzing directory: {directory_path}")

        print(f"File patterns: {file_patterns}")

        

        # Find matching files

        matching_files = []

        for pattern in file_patterns:

            matching_files.extend(directory_path.glob(f"**/{pattern}"))

        

        if not matching_files:

            return {

                "success": True,

                "message": "No matching files found",

                "files_analyzed": []

            }

        

        print(f"Found {len(matching_files)} files to analyze")

        

        all_suggestions = []

        file_results = {}

        

        # Analyze each file

        for file_path in matching_files:

            print(f"\n--- Analyzing {file_path.relative_to(self.project_root)} ---")

            

            try:

                suggestions = self.decision_engine.analyze_and_suggest(str(file_path))

                file_results[str(file_path)] = {

                    "suggestions": suggestions,

                    "success": True

                }

                all_suggestions.extend(suggestions)

                

                print(f"Found {len(suggestions)} suggestions for {file_path.name}")

            

            except Exception as e:

                file_results[str(file_path)] = {

                    "suggestions": [],

                    "success": False,

                    "error": str(e)

                }

                print(f"Error analyzing {file_path.name}: {e}")

        

        if not all_suggestions:

            return {

                "success": True,

                "message": "No refactoring suggestions found",

                "file_results": file_results

            }

        

        print(f"\nTotal suggestions found: {len(all_suggestions)}")

        

        if interactive:

            # Present all suggestions to user

            self.decision_engine.suggestions = all_suggestions

            approved_suggestions = self.decision_engine.present_suggestions_to_user()

            

            if approved_suggestions:

                # Apply approved refactorings

                application_results = self.application_engine.apply_refactoring_suggestions(

                    approved_suggestions

                )

                

                return {

                    "success": application_results["success"],

                    "file_results": file_results,

                    "approved_suggestions": approved_suggestions,

                    "application_results": application_results

                }

            else:

                return {

                    "success": True,

                    "message": "No refactorings were approved",

                    "file_results": file_results

                }

        else:

            return {

                "success": True,

                "file_results": file_results,

                "total_suggestions": len(all_suggestions)

            }

    

    def restore_backup(self, backup_id: Optional[str] = None) -> bool:

        """Restore from a backup."""

        if backup_id is None:

            # Show available backups and let user choose

            backups = self.backup_manager.list_backups()

            

            if not backups:

                print("No backups available")

                return False

            

            print("Available backups:")

            for i, backup in enumerate(backups):

                print(f"{i + 1}. {backup.backup_id} - {backup.timestamp} - {backup.operation_description}")

            

            try:

                choice = int(input("Select backup to restore (number): ")) - 1

                if 0 <= choice < len(backups):

                    backup_id = backups[choice].backup_id

                else:

                    print("Invalid selection")

                    return False

            except ValueError:

                print("Invalid input")

                return False

        

        return self.backup_manager.restore_backup(backup_id)

    

    def list_backups(self):

        """List all available backups."""

        backups = self.backup_manager.list_backups()

        

        if not backups:

            print("No backups available")

            return

        

        print("Available backups:")

        for backup in backups:

            print(f"ID: {backup.backup_id}")

            print(f"Date: {backup.timestamp}")

            print(f"Description: {backup.operation_description}")

            print(f"Files: {len(backup.original_files)}")

            if backup.git_commit_hash:

                print(f"Git commit: {backup.git_commit_hash[:8]}")

            print("-" * 40)

    

    def _resolve_file_path(self, file_path: str) -> Path:

        """Resolve a file path relative to the project root."""

        path = Path(file_path)

        if not path.is_absolute():

            path = self.project_root / path

        return path.resolve()


def main():

    """Command-line interface for the Agentic Refactoring AI."""

    parser = argparse.ArgumentParser(description="Agentic AI Code Refactoring Tool")

    parser.add_argument("project_root", help="Root directory of the project to analyze")

    parser.add_argument("--file", help="Specific file to analyze")

    parser.add_argument("--directory", help="Directory to analyze (default: project root)")

    parser.add_argument("--patterns", nargs="+", default=["*.py"], 

                       help="File patterns to match (default: *.py)")

    parser.add_argument("--llm-type", choices=["remote", "local"], default="remote",

                       help="Type of LLM provider to use")

    parser.add_argument("--api-key", help="API key for remote LLM provider")

    parser.add_argument("--model-path", help="Path to local LLM model")

    parser.add_argument("--model-name", default="gpt-4", help="Name of remote model to use")

    parser.add_argument("--non-interactive", action="store_true",

                       help="Run in non-interactive mode (analysis only)")

    parser.add_argument("--list-backups", action="store_true",

                       help="List available backups")

    parser.add_argument("--restore-backup", help="Restore from specific backup ID")

    

    args = parser.parse_args()

    

    # Prepare LLM configuration

    llm_config = {}

    if args.llm_type == "remote":

        if not args.api_key:

            print("Error: API key required for remote LLM provider")

            sys.exit(1)

        llm_config = {

            "api_key": args.api_key,

            "model_name": args.model_name

        }

    elif args.llm_type == "local":

        if not args.model_path:

            print("Error: Model path required for local LLM provider")

            sys.exit(1)

        llm_config = {

            "model_path": args.model_path

        }

    

    try:

        # Initialize the AI system

        ai = AgenticRefactoringAI(

            args.project_root,

            args.llm_type,

            llm_config

        )

        

        # Handle different commands

        if args.list_backups:

            ai.list_backups()

        elif args.restore_backup:

            success = ai.restore_backup(args.restore_backup)

            if success:

                print("Backup restored successfully")

            else:

                print("Failed to restore backup")

        elif args.file:

            # Analyze specific file

            result = ai.analyze_file(args.file, not args.non_interactive)

            if result["success"]:

                print("File analysis completed successfully")

            else:

                print(f"File analysis failed: {result.get('error', 'Unknown error')}")

        else:

            # Analyze directory

            result = ai.analyze_directory(

                args.directory,

                args.patterns,

                not args.non_interactive

            )

            if result["success"]:

                print("Directory analysis completed successfully")

            else:

                print(f"Directory analysis failed: {result.get('error', 'Unknown error')}")

    

    except Exception as e:

        print(f"Error: {e}")

        sys.exit(1)


if __name__ == "__main__":

    main()
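

With the CLI in place, the tool can be driven directly from the shell. The following invocations are illustrative only; they assume the code above is saved as agentic_refactor.py, and the paths and API key shown are placeholders:


# Analyze a single file interactively with a remote LLM
python agentic_refactor.py /path/to/project --file src/main.py --api-key YOUR_KEY

# Analyze a whole directory non-interactively with a local model
python agentic_refactor.py /path/to/project --llm-type local \
    --model-path /path/to/local/model --patterns "*.py" --non-interactive

# List available backups, or restore a specific one
python agentic_refactor.py /path/to/project --list-backups
python agentic_refactor.py /path/to/project --restore-backup BACKUP_ID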


Configuration and Usage Examples

The Agentic AI system can be configured and used in various ways depending on the specific requirements of the development team and project. The following examples demonstrate different usage scenarios and configuration options.

For basic usage with a remote LLM provider, developers can initialize the system with minimal configuration. The system requires an API key for services like OpenAI and can be configured to use different models based on performance and cost requirements. Here is an example of basic initialization and usage:


# Basic usage example with remote LLM

api_key = "your-openai-api-key"

project_path = "/path/to/your/project"


# Initialize the AI system

ai = AgenticRefactoringAI(

    project_root=project_path,

    llm_provider_type="remote",

    llm_config={

        "api_key": api_key,

        "model_name": "gpt-4"

    }

)


# Analyze a specific file

result = ai.analyze_file("src/main.py")


# Analyze entire project

result = ai.analyze_directory(

    file_patterns=["*.py", "*.js"],

    interactive=True

)


For organizations that prefer to keep their code private and have the computational resources available, the system can be configured to use local LLM models. This approach provides complete data privacy and eliminates dependencies on external services:


# Local LLM usage example

ai = AgenticRefactoringAI(

    project_root="/path/to/project",

    llm_provider_type="local",

    llm_config={

        "model_path": "/path/to/local/model",

        "device": "cuda"  # or "cpu"

    }

)


# The usage remains the same regardless of LLM provider

result = ai.analyze_file("complex_module.py")


The system provides extensive configuration options for customizing the analysis and refactoring behavior. Teams can adjust the sensitivity of different analysis components, specify which types of refactorings to prioritize, and configure backup retention policies:


# Advanced configuration example

class CustomAgenticAI(AgenticRefactoringAI):

    def __init__(self, project_root: str, **kwargs):

        super().__init__(project_root, **kwargs)

        

        # Customize analysis engine settings

        self.analysis_engine.complexity_threshold = 8

        self.analysis_engine.enable_performance_analysis = True

        self.analysis_engine.enable_security_analysis = True

        

        # Configure backup retention

        self.backup_manager.max_backups = 20

        self.backup_manager.max_age_days = 60

        

        # Set refactoring preferences

        self.decision_engine.preferred_patterns = [

            "extract_method",

            "simplify_conditional",

            "reduce_complexity"

        ]


# Usage with custom configuration

custom_ai = CustomAgenticAI(

    project_root="/path/to/project",

    llm_provider_type="remote",

    llm_config={"api_key": "your-key"}

)


Benefits and Limitations of the Agentic AI Approach

The Agentic AI approach to code refactoring offers significant advantages over traditional static analysis tools and manual refactoring processes. The system's ability to understand code semantics and context allows it to propose sophisticated improvements that go beyond simple pattern matching. The AI can identify complex architectural issues, suggest design pattern implementations, and propose optimizations that consider the broader codebase context.

The interactive nature of the system ensures that developers maintain control over the refactoring process while benefiting from AI insights. The comprehensive backup and rollback mechanisms provide safety nets that encourage experimentation with AI suggestions without fear of irreversible changes. The system's ability to work with both remote and local LLM providers offers flexibility in deployment scenarios, accommodating different security and privacy requirements.
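
To make that rollback safety net concrete, the snippet below sketches how a developer might inspect and restore backups through the orchestrator; the backup ID shown is a hypothetical example:


# List available backups, then roll back to a chosen snapshot.
ai.list_backups()

# Restore by explicit ID (hypothetical value shown); calling
# restore_backup() with no argument prompts for a selection instead.
ai.restore_backup("backup_20251028_101500")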

However, the system also has important limitations that users must understand. The quality of refactoring suggestions depends heavily on the underlying LLM's training data and capabilities. The system may occasionally propose changes that, while syntactically correct, do not align with project-specific requirements or architectural decisions. The AI's understanding of business logic and domain-specific constraints may be limited, requiring careful human review of all suggestions.

The computational requirements for local LLM deployment can be substantial, particularly for larger models that provide better code understanding capabilities. Remote LLM usage introduces dependencies on external services and potential latency issues that may affect the user experience. The system's effectiveness may vary significantly across different programming languages and paradigms, with better performance typically observed for more commonly used languages like Python and JavaScript.


Future Considerations and Enhancements

The field of AI-assisted code refactoring continues to evolve rapidly, with several promising directions for future enhancement. Integration with advanced static analysis tools could provide the AI system with deeper insights into code quality metrics, dependency relationships, and architectural patterns. Machine learning techniques could be employed to learn from developer feedback and improve suggestion quality over time.

Enhanced integration with development environments and continuous integration pipelines could make the refactoring process more seamless and automated. The system could potentially be extended to support automated testing of refactored code, ensuring that changes maintain functional correctness while improving code quality.
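
As a rough illustration of that direction, a post-refactoring verification step might run the project's test suite and roll back automatically on failure. The sketch below is not part of the system above; it assumes a pytest-based suite runnable from the project root:


import subprocess

def verify_and_rollback(ai: AgenticRefactoringAI, backup_id: str) -> bool:
    """Hypothetical extension: run the test suite after refactoring
    and restore the backup if any tests fail."""
    result = subprocess.run(
        ["python", "-m", "pytest", "--quiet"],
        cwd=str(ai.project_root),
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        print("Tests failed after refactoring; restoring backup.")
        ai.backup_manager.restore_backup(backup_id)
        return False
    print("Tests passed; refactoring verified.")
    return True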

The development of specialized models trained specifically on code refactoring tasks could significantly improve the quality and relevance of suggestions. These models could be fine-tuned on specific programming languages, frameworks, or domain-specific codebases to provide more targeted and accurate recommendations.

As the technology matures, we can expect to see more sophisticated understanding of code semantics, better integration with version control systems, and enhanced collaboration features that allow teams to share refactoring insights and best practices. The ultimate goal is to create AI systems that can serve as intelligent coding partners, helping developers maintain high-quality codebases while reducing the manual effort required for code improvement tasks.

The Agentic AI approach represents a significant step forward in automated code improvement, offering a balance between AI capabilities and human oversight that can significantly enhance software development productivity and code quality. As these systems continue to evolve, they will likely become essential tools in the modern software development toolkit, helping teams manage the growing complexity of software systems while maintaining high standards of code quality and maintainability.
