Friday, February 20, 2026

LLM-BASED FILESYSTEM OPTIMIZATION WITH TOOL INTEGRATION




Introduction and System Overview


Large Language Models can now take on routine system administration tasks. This article explores the development of a Python-based application that uses a local LLM to perform filesystem optimization across multiple platforms. The system identifies duplicate files, suggests optimizations, and executes cleanup operations, and it asks the user for permission before each operation.


The application operates by combining natural language processing capabilities with direct filesystem access through a carefully designed tool framework. Rather than requiring users to manually navigate complex file structures or remember command-line syntax, the system interprets user intentions and translates them into concrete filesystem operations.


Note: The LLM is used mainly to interpret natural-language requests and to emit tool calls. The filesystem work itself is done by ordinary Python code.


System Architecture


The architecture consists of several interconnected components that work together to provide a seamless experience. At the core sits the LLM interface, which processes user requests and generates appropriate tool calls. The tool framework translates these high-level instructions into platform-specific filesystem operations.


    +-------------------+
    |   User Interface  |
    +-------------------+
              |
              v
    +-------------------+
    |   LLM Controller  |
    +-------------------+
              |
              v
    +-------------------+
    |   Tool Framework  |
    +-------------------+
              |
    +----+----+----+----+
    |    |    |    |    |
    v    v    v    v    v
    Win  Mac Linux iOS Android


The modular design ensures that platform-specific implementations remain isolated while sharing common interfaces. This separation of concerns allows for easier maintenance and extension of the system.


Local LLM Integration


The integration with a local LLM requires careful consideration of model loading, context management, and response parsing. We utilize the transformers library for model management and implement a custom wrapper to handle tool-specific interactions.


    import torch

    from transformers import AutoModelForCausalLM, AutoTokenizer

    import json

    from typing import Dict, List, Any

    

    class LocalLLMInterface:

        """Manages interaction with the local language model"""

        

        def __init__(self, model_path: str):

            """Initialize the LLM with specified model path

            

            Args:

                model_path: Path to the local model directory

            """

            self.tokenizer = AutoTokenizer.from_pretrained(model_path)

            self.model = AutoModelForCausalLM.from_pretrained(

                model_path,

                torch_dtype=torch.float16,

                device_map="auto"

            )

            self.system_prompt = self._load_system_prompt()

            

        def _load_system_prompt(self) -> str:

            """Load the system prompt that defines tool usage"""

            return """You are a filesystem optimization assistant. You help users 

            identify and remove duplicate files. Always ask for permission before 

            performing any operation. Use the provided tools to interact with the 

            filesystem. Format tool calls as JSON objects."""


The LocalLLMInterface class encapsulates all LLM-specific operations. Initialization loads both the tokenizer and the model. The model is loaded in half precision (float16) with device_map="auto", which lets the library place the weights on whatever devices are available (this option relies on the accelerate package being installed). The system prompt establishes the behavioral framework for the model, ensuring it understands its role and constraints.
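Because the system prompt asks the model to format tool calls as JSON objects, the generated text has to be searched for such an object before anything can be executed. The following is a minimal sketch of that parsing step using only the standard library. The function name extract_tool_call and the {"tool": ..., "arguments": ...} shape are assumptions made for illustration; the article does not specify the exact schema.

```python
import json
from typing import Any, Dict, Optional


def extract_tool_call(response_text: str) -> Optional[Dict[str, Any]]:
    """Return the first JSON object in the text that has a "tool" key.

    The {"tool": ..., "arguments": ...} shape is an assumed convention.
    """
    decoder = json.JSONDecoder()
    position = response_text.find("{")
    while position != -1:
        try:
            # raw_decode parses one JSON value starting at the given index
            candidate, _end = decoder.raw_decode(response_text, position)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict) and "tool" in candidate:
            return candidate
        # Not a usable object here; try the next opening brace
        position = response_text.find("{", position + 1)
    return None


reply = (
    'I will scan that folder. '
    '{"tool": "scan_directory", "arguments": {"path": "~/Downloads"}}'
)
call = extract_tool_call(reply)
```

Returning None when no tool call is found lets the caller treat the reply as plain conversation rather than raising an error.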


The Tool Framework


The tool framework provides a structured way for the LLM to interact with the filesystem. Each tool represents a specific capability, such as listing files, calculating checksums, or removing duplicates. The framework ensures that all operations are logged and reversible when possible.


    from abc import ABC, abstractmethod
    from dataclasses import dataclass
    from enum import Enum
    from typing import Any, Dict, Optional
    import hashlib
    import os

    class ToolStatus(Enum):
        """Enumeration of possible tool execution statuses"""
        SUCCESS = "success"
        FAILURE = "failure"
        REQUIRES_PERMISSION = "requires_permission"
        REQUIRES_CREDENTIALS = "requires_credentials"

    @dataclass
    class ToolResult:
        """Encapsulates the result of a tool execution"""
        status: ToolStatus
        data: Any
        message: str
        # None means no follow-up action is needed
        requires_action: Optional[Dict[str, Any]] = None

    

    class BaseTool(ABC):

        """Abstract base class for all filesystem tools"""

        

        def __init__(self, permission_manager):

            """Initialize tool with permission manager

            

            Args:

                permission_manager: Instance handling user permissions

            """

            self.permission_manager = permission_manager

            self.name = self.__class__.__name__

            

        @abstractmethod

        def execute(self, **kwargs) -> ToolResult:

            """Execute the tool operation with given parameters"""

            pass

            

        @abstractmethod

        def get_description(self) -> str:

            """Return human-readable description of tool functionality"""

            pass


The tool framework establishes a consistent interface for all filesystem operations. The ToolStatus enumeration clearly defines possible outcomes, while the ToolResult dataclass provides a structured way to return results along with any required follow-up actions.
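To make the idea of a consistent interface concrete, here is a small sketch of how tool calls might be routed by name. It is self-contained, so it uses simplified stand-ins (Status, Result) rather than the classes above, and the ToolRegistry class and the count_items tool are hypothetical names that do not appear in the original design.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict


class Status(Enum):
    SUCCESS = "success"
    FAILURE = "failure"


@dataclass
class Result:
    status: Status
    data: Any
    message: str


class ToolRegistry:
    """Maps tool names to callables and reports problems as Result objects."""

    def __init__(self) -> None:
        self._tools: Dict[str, Callable[..., Result]] = {}

    def register(self, name: str, tool: Callable[..., Result]) -> None:
        self._tools[name] = tool

    def dispatch(self, name: str, **arguments: Any) -> Result:
        tool = self._tools.get(name)
        if tool is None:
            return Result(Status.FAILURE, None, f"Unknown tool: {name}")
        try:
            return tool(**arguments)
        except TypeError as exc:
            # Typically raised when the model supplies wrong argument names
            return Result(Status.FAILURE, None, f"Bad arguments: {exc}")


def count_items(items: list) -> Result:
    """Hypothetical example tool."""
    return Result(Status.SUCCESS, len(items), f"Counted {len(items)} items")


registry = ToolRegistry()
registry.register("count_items", count_items)
ok = registry.dispatch("count_items", items=["a", "b"])
missing = registry.dispatch("delete_everything")
```

Returning a failure result for unknown tools and bad arguments, instead of raising, keeps a malformed tool call from the model from crashing the application.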


Cross-Platform Filesystem Access


Achieving true cross-platform compatibility requires abstracting platform-specific filesystem operations behind a common interface. The system detects the current platform and loads appropriate implementations dynamically.


    import platform

    import sys

    from pathlib import Path

    

    class PlatformDetector:

        """Detects and provides platform-specific functionality"""

        

        @staticmethod

        def get_platform() -> str:

            """Detect the current operating system

            

            Returns:

                String identifier for the platform

            """

            system = platform.system().lower()

            

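            # Check for mobile platforms through additional indicators.
            # Python 3.13+ reports 'android' and 'ios' through sys.platform.
            if hasattr(sys, 'getandroidapilevel') or sys.platform == 'android':
                return 'android'
            if sys.platform == 'ios' or system in ('ios', 'ipados'):
                return 'ios'
            if system == 'darwin' and platform.machine() == 'arm64':
                # Fallback heuristic for builds that report as Darwin
                import subprocess
                try:
                    result = subprocess.run(['uname', '-a'],
                                          capture_output=True,
                                          text=True)
                    if 'iPhone' in result.stdout or 'iPad' in result.stdout:
                        return 'ios'
                except (OSError, subprocess.SubprocessError):
                    # uname unavailable; treat as regular macOS
                    pass

            return system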

    

    class FilesystemAbstraction:

        """Provides unified filesystem operations across platforms"""

        

        def __init__(self):

            """Initialize platform-specific filesystem handler"""

            self.platform = PlatformDetector.get_platform()

            self._initialize_platform_specific()

            

        def _initialize_platform_specific(self):

            """Load platform-specific implementations"""

            if self.platform == 'windows':

                self.path_separator = '\\'

                self.home_dir = Path.home()

            elif self.platform in ['darwin', 'linux']:

                self.path_separator = '/'

                self.home_dir = Path.home()

            elif self.platform == 'android':

                # Android requires special handling

                self.path_separator = '/'

                self.home_dir = Path('/storage/emulated/0')

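            elif self.platform == 'ios':
                # iOS has restricted filesystem access
                self.path_separator = '/'
                self.home_dir = Path.home() / 'Documents'
            else:
                # Unknown platform: fall back to the interpreter's defaults
                self.path_separator = os.sep
                self.home_dir = Path.home()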


The PlatformDetector class employs various heuristics to accurately identify the running platform. Mobile platforms require special detection logic since they may report as their underlying Unix-like systems. The FilesystemAbstraction class then configures platform-specific parameters based on this detection.
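One design note on the separator handling: pathlib already joins path components with the correct separator for each platform flavor, so an explicit path_separator is rarely needed when paths are built with the / operator. A brief sketch, using the pure path classes so it behaves the same on any host (the directory and file names are made up for illustration):

```python
from pathlib import PurePosixPath, PureWindowsPath

# Pure paths do no I/O, so both flavors can be built on any operating system
posix_target = PurePosixPath("/storage/emulated/0") / "Download" / "report.pdf"
windows_target = PureWindowsPath("C:/Users/alice") / "Downloads" / "report.pdf"

posix_text = str(posix_target)
windows_text = str(windows_target)
```

In application code, the concrete Path class picks the right flavor automatically for the running platform.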


Duplicate Detection Algorithm



The core functionality revolves around efficiently detecting duplicate files across the filesystem. The algorithm uses a multi-stage approach to minimize unnecessary computations while ensuring accuracy.


    import hashlib
    import os
    from collections import defaultdict
    from pathlib import Path
    from typing import Dict, List

    

    class DuplicateDetector:

        """Implements efficient duplicate file detection"""

        

        def __init__(self, filesystem: FilesystemAbstraction):

            """Initialize detector with filesystem abstraction

            

            Args:

                filesystem: Platform-specific filesystem handler

            """

            self.filesystem = filesystem

            self.chunk_size = 8192  # Read files in 8KB chunks

            

        def find_duplicates(self, root_path: Path, 

                          progress_callback=None) -> Dict[str, List[Path]]:

            """Find all duplicate files under the given path

            

            Args:

                root_path: Starting directory for search

                progress_callback: Optional callback for progress updates

                

            Returns:

                Dictionary mapping file hashes to lists of duplicate paths

            """

            # First pass: Group files by size

            size_map = defaultdict(list)

            total_files = 0

            

            for file_path in self._walk_directory(root_path):

                try:

                    size = file_path.stat().st_size

                    size_map[size].append(file_path)

                    total_files += 1

                    

                    if progress_callback and total_files % 100 == 0:

                        progress_callback(f"Scanned {total_files} files...")

                except (OSError, PermissionError):

                    continue

            

            # Second pass: Calculate hashes only for potential duplicates
            hash_map = defaultdict(list)
            processed = 0
            # Only files that share a size with another file need hashing
            total_candidates = sum(
                len(file_list) for file_list in size_map.values()
                if len(file_list) > 1
            )

            for size, file_list in size_map.items():
                if len(file_list) < 2:
                    continue  # Skip unique file sizes

                for file_path in file_list:
                    file_hash = self._calculate_hash(file_path)
                    if file_hash:
                        hash_map[file_hash].append(file_path)

                    processed += 1
                    if progress_callback:
                        progress_callback(
                            f"Processing potential duplicates: "
                            f"{processed}/{total_candidates}"
                        )

            

            # Filter out non-duplicates

            duplicates = {

                hash_val: paths 

                for hash_val, paths in hash_map.items() 

                if len(paths) > 1

            }

            

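            return duplicates

        def _walk_directory(self, root_path: Path):
            """Yield regular files under root_path, skipping symbolic links"""
            # os.walk skips unreadable directories unless onerror is supplied
            for dirpath, _dirnames, filenames in os.walk(root_path):
                for name in filenames:
                    file_path = Path(dirpath) / name
                    try:
                        if not file_path.is_symlink() and file_path.is_file():
                            yield file_path
                    except OSError:
                        continue  # Entry vanished or is unreadable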

        

        def _calculate_hash(self, file_path: Path) -> str:

            """Calculate SHA-256 hash of a file

            

            Args:

                file_path: Path to the file

                

            Returns:

                Hexadecimal hash string or None on error

            """

            hasher = hashlib.sha256()

            

            try:

                with open(file_path, 'rb') as f:

                    while chunk := f.read(self.chunk_size):

                        hasher.update(chunk)

                return hasher.hexdigest()

            except (OSError, PermissionError):

                return None


The duplicate detection algorithm uses two phases to limit work. In the first phase, files are grouped by size, which is cheap because it needs only a stat call per file; a file whose size is unique cannot have a duplicate and is never read. Only files that share a size proceed to the second phase, where SHA-256 hashes of their contents identify duplicates (an accidental SHA-256 collision is negligible in practice).
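The two phases can be seen in isolation in the following self-contained sketch, which runs against a temporary directory. It is a simplified illustration rather than the class above: it omits progress reporting and error handling, and the helper names sha256_of and find_duplicates are chosen here for the example.

```python
import hashlib
import tempfile
from collections import defaultdict
from pathlib import Path
from typing import Dict, List


def sha256_of(path: Path, chunk_size: int = 8192) -> str:
    """Hash a file in chunks so large files never sit fully in memory."""
    hasher = hashlib.sha256()
    with open(path, "rb") as handle:
        while chunk := handle.read(chunk_size):
            hasher.update(chunk)
    return hasher.hexdigest()


def find_duplicates(root: Path) -> Dict[str, List[Path]]:
    # Phase 1: group by size (metadata only, no file contents read)
    by_size: Dict[int, List[Path]] = defaultdict(list)
    for path in root.rglob("*"):
        if path.is_file():
            by_size[path.stat().st_size].append(path)

    # Phase 2: hash only the files whose size is shared
    by_hash: Dict[str, List[Path]] = defaultdict(list)
    for paths in by_size.values():
        if len(paths) > 1:
            for path in paths:
                by_hash[sha256_of(path)].append(path)

    return {
        digest: sorted(paths)
        for digest, paths in by_hash.items()
        if len(paths) > 1
    }


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "a.txt").write_bytes(b"same content")
    (root / "b.txt").write_bytes(b"same content")
    (root / "c.txt").write_bytes(b"diff content")   # same size, other bytes
    (root / "d.txt").write_bytes(b"unique size!!")  # never hashed
    groups = find_duplicates(root)
    duplicate_names = [[p.name for p in paths] for paths in groups.values()]
```

Here c.txt survives the size filter but is separated from a.txt and b.txt by its hash, while d.txt is excluded without being read at all.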


Permission Management System



User consent forms the cornerstone of the application's ethical operation. The permission management system ensures that no action is taken without explicit user approval, maintaining transparency throughout the process.


    from datetime import datetime

    from typing import Any, Callable, Dict, Optional

    import json

    

    class PermissionManager:

        """Manages user permissions for filesystem operations"""

        

        def __init__(self, ui_callback: Callable):

            """Initialize permission manager with UI callback

            

            Args:

                ui_callback: Function to interact with user

            """

            self.ui_callback = ui_callback

            self.permission_cache = {}

            self.audit_log = []

            

        def request_permission(self, action: str, 

                             details: Dict[str, Any]) -> bool:

            """Request user permission for a specific action

            

            Args:

                action: Description of the action

                details: Additional context about the operation

                

            Returns:

                Boolean indicating whether permission was granted

            """

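            # Check cache for blanket permissions
            cache_key = self._generate_cache_key(action, details)
            if cache_key in self.permission_cache:
                granted = self.permission_cache[cache_key]
                # Log cached decisions too, so the audit trail stays complete
                self._log_permission_decision(action, details, granted)
                return granted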

            

            # Format the permission request

            request_message = self._format_permission_request(action, details)

            

            # Get user response through UI callback

            response = self.ui_callback({

                'type': 'permission_request',

                'message': request_message,

                'options': ['Allow', 'Deny', 'Always Allow', 'Always Deny']

            })

            

            # Process response

            granted = response in ['Allow', 'Always Allow']

            

            # Cache if permanent permission given

            if response in ['Always Allow', 'Always Deny']:

                self.permission_cache[cache_key] = granted

            

            # Log the decision

            self._log_permission_decision(action, details, granted)

            

            return granted

        

        def _format_permission_request(self, action: str, 

                                     details: Dict[str, Any]) -> str:

            """Format a user-friendly permission request message"""

            message = f"Permission requested for: {action}\n\n"

            

            if 'files' in details:

                message += f"Affected files: {len(details['files'])}\n"

                # Show first few files as examples

                for i, file in enumerate(details['files'][:3]):

                    message += f"  - {file}\n"

                if len(details['files']) > 3:

                    message += f"  ... and {len(details['files']) - 3} more\n"

            

            if 'size' in details:

                message += f"Total size: {self._format_size(details['size'])}\n"

            

            return message


The PermissionManager class implements a consent system that balances security with usability. The caching mechanism lets users grant or deny blanket permission for repeated operations ("Always Allow" and "Always Deny"), while one-off answers are never cached. Every decision is meant to be recorded in the audit log. The formatting method keeps permission requests clear and informative by showing the number of affected files, the first three paths, and the total size.
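The listing calls _generate_cache_key and _log_permission_decision without showing them. The sketch below is one possible way to write such helpers, given here as standalone functions; the hashing scheme and the log record fields are assumptions, not the author's implementation.

```python
import hashlib
import json
from datetime import datetime, timezone
from typing import Any, Dict, List


def generate_cache_key(action: str, details: Dict[str, Any]) -> str:
    """Build a stable key from the action and its details (assumed scheme)."""
    # sort_keys makes the key independent of dict insertion order;
    # default=str covers values such as Path objects
    payload = json.dumps(
        {"action": action, "details": details},
        sort_keys=True,
        default=str,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def log_permission_decision(audit_log: List[Dict[str, Any]], action: str,
                            details: Dict[str, Any], granted: bool) -> None:
    """Append one audit record; the field names are illustrative."""
    audit_log.append({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "action": action,
        "details": details,
        "granted": granted,
    })


key_a = generate_cache_key("Scan directory for files",
                           {"path": "/tmp", "recursive": True})
key_b = generate_cache_key("Scan directory for files",
                           {"recursive": True, "path": "/tmp"})
key_c = generate_cache_key("Scan directory for files",
                           {"path": "/home", "recursive": True})

log: List[Dict[str, Any]] = []
log_permission_decision(log, "Scan directory for files", {"path": "/tmp"}, True)
```

Because this key includes the details, an "Always Allow" answer applies only to the exact same request. Keying on the action alone would make blanket permissions broader, at the cost of approving operations the user has not seen.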


Security and Credential Handling



When operations require elevated privileges, the system must handle credentials without exposing them. The credential management system prompts with getpass so that passwords are not echoed to the terminal, keeps them out of printed output, and delegates persistent storage to the operating system keyring.


    import getpass

    import keyring

    from cryptography.fernet import Fernet

    from cryptography.hazmat.primitives import hashes

    from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

    import base64

    import os

    

    class CredentialManager:

        """Securely manages credentials for elevated operations"""

        

        def __init__(self, app_id: str):

            """Initialize credential manager with application ID

            

            Args:

                app_id: Unique identifier for the application

            """

            self.app_id = app_id

            self.session_key = self._generate_session_key()

            self.cipher_suite = Fernet(self.session_key)

            

        def _generate_session_key(self) -> bytes:

            """Generate a session-specific encryption key"""

            salt = os.urandom(16)

            kdf = PBKDF2HMAC(

                algorithm=hashes.SHA256(),

                length=32,

                salt=salt,

                iterations=100000,

            )

            key = base64.urlsafe_b64encode(kdf.derive(os.urandom(32)))

            return key

        

        def request_credentials(self, purpose: str, 

                              credential_type: str) -> Optional[Dict[str, str]]:

            """Request credentials from user for specific purpose

            

            Args:

                purpose: Description of why credentials are needed

                credential_type: Type of credentials required

                

            Returns:

                Dictionary containing credentials or None if cancelled

            """

            # Check keyring for stored credentials

            stored = self._check_stored_credentials(credential_type)

            if stored:

                return stored

            

            # Request from user

            print(f"\nCredentials required for: {purpose}")

            print(f"Credential type: {credential_type}")

            

            credentials = {}

            

            if credential_type == 'sudo':

                credentials['password'] = getpass.getpass("Enter sudo password: ")

            elif credential_type == 'windows_admin':

                credentials['username'] = input("Enter administrator username: ")

                credentials['password'] = getpass.getpass("Enter password: ")

            

            # Offer to store securely

            store = input("Store credentials securely for future use? (y/n): ")

            if store.lower() == 'y':

                self._store_credentials(credential_type, credentials)

            

            return credentials

        

        def _store_credentials(self, credential_type: str, 

                             credentials: Dict[str, str]):

            """Securely store credentials using system keyring"""

            try:
                # The session key is regenerated on every run, so anything
                # encrypted with it could not be decrypted in a later session.
                # Rely on the OS keyring for at-rest protection instead.
                keyring.set_password(
                    self.app_id,
                    credential_type,
                    json.dumps(credentials)
                )
            except Exception as e:
                print(f"Warning: Could not store credentials: {e}")


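The CredentialManager combines two mechanisms: a Fernet key generated for each session and the operating system keyring for persistent storage. A key that is generated fresh for every session cannot decrypt values written during an earlier session, so persistent entries must either rely on the keyring's own protection or use a key that is itself stored or derived reproducibly. The code shown never logs or displays passwords. Clearing credentials from memory after use is not shown in this listing.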
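If an extra encryption layer on top of the keyring is wanted, the key has to be reproducible across sessions. One common option is to derive it from a user passphrase and a stored salt with PBKDF2. The following sketch uses only the standard library; the function name derive_key and the passphrases are illustrative, and the iteration count simply mirrors the value used in the listing above.

```python
import base64
import hashlib
import os


def derive_key(passphrase: str, salt: bytes, iterations: int = 100_000) -> bytes:
    """Derive a 32-byte key and return it URL-safe base64 encoded."""
    raw = hashlib.pbkdf2_hmac(
        "sha256",
        passphrase.encode("utf-8"),
        salt,
        iterations,
        dklen=32,
    )
    return base64.urlsafe_b64encode(raw)


salt = os.urandom(16)  # Would be stored alongside the encrypted data
first_session_key = derive_key("correct horse battery staple", salt)
second_session_key = derive_key("correct horse battery staple", salt)
other_key = derive_key("a different passphrase", salt)
```

The same passphrase and salt always give the same key, so data encrypted in one session can be decrypted in the next. The result is 32 bytes in URL-safe base64, which is the key format that Fernet accepts.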


User Interaction Flow


The user experience design prioritizes clarity and control. Each interaction follows a consistent pattern that keeps users informed about ongoing operations and upcoming actions.


    class UserInterface:

        """Manages all user interactions for the filesystem optimizer"""

        

        def __init__(self):

            """Initialize the user interface components"""

            self.permission_manager = PermissionManager(self.display_prompt)

            self.credential_manager = CredentialManager("filesystem_optimizer")

            self.current_operation = None

            

        def display_prompt(self, prompt_data: Dict[str, Any]) -> str:

            """Display a prompt to the user and get response

            

            Args:

                prompt_data: Dictionary containing prompt information

                

            Returns:

                User's response as a string

            """

            prompt_type = prompt_data.get('type')

            

            if prompt_type == 'permission_request':

                return self._handle_permission_prompt(prompt_data)

            elif prompt_type == 'credential_request':

                return self._handle_credential_prompt(prompt_data)

            elif prompt_type == 'confirmation':

                return self._handle_confirmation_prompt(prompt_data)

            else:

                return input(prompt_data.get('message', 'Enter response: '))

        

        def show_duplicate_summary(self, duplicates: Dict[str, List[Path]]):

            """Display a summary of found duplicates to the user"""

            total_files = sum(len(paths) for paths in duplicates.values())

            total_groups = len(duplicates)

            

            # Calculate space savings

            total_waste = 0

            for file_hash, paths in duplicates.items():

                if paths:

                    file_size = paths[0].stat().st_size

                    total_waste += file_size * (len(paths) - 1)

            

            print(f"\n{'='*60}")

            print(f"DUPLICATE FILE SUMMARY")

            print(f"{'='*60}")

            print(f"Total duplicate groups found: {total_groups}")

            print(f"Total duplicate files: {total_files}")

            print(f"Potential space savings: {self._format_bytes(total_waste)}")

            print(f"{'='*60}\n")

            

            # Show details for each group

            for i, (file_hash, paths) in enumerate(duplicates.items(), 1):

                if i > 10:  # Limit display to first 10 groups

                    print(f"\n... and {len(duplicates) - 10} more groups")

                    break

                    

                print(f"\nGroup {i} ({len(paths)} files):")

                file_size = paths[0].stat().st_size

                print(f"  Size: {self._format_bytes(file_size)}")

                print(f"  Files:")

                for path in paths[:5]:  # Show up to 5 files per group

                    print(f"    - {path}")

                if len(paths) > 5:

                    print(f"    ... and {len(paths) - 5} more")


The UserInterface class orchestrates all user interactions, ensuring a consistent experience throughout the application. The duplicate summary method presents information in a hierarchical manner, starting with high-level statistics before drilling down into specific examples. This approach helps users understand the scope of the optimization opportunity without overwhelming them with details.
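The summary relies on a _format_bytes helper that is not shown. A minimal sketch of such a helper follows, written as a standalone function; the choice of binary units and one decimal place is an assumption.

```python
def format_bytes(num_bytes: int) -> str:
    """Render a byte count using binary units (1 KiB = 1024 bytes)."""
    units = ("B", "KiB", "MiB", "GiB", "TiB")
    value = float(num_bytes)
    index = 0
    # Step up one unit at a time until the value fits or units run out
    while value >= 1024 and index < len(units) - 1:
        value /= 1024
        index += 1
    if index == 0:
        return f"{num_bytes} B"
    return f"{value:.1f} {units[index]}"


examples = [format_bytes(n) for n in (512, 1536, 5 * 1024 ** 2)]
```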


Implementation of Core Tools


With the foundation established, we can implement the specific tools that perform filesystem operations. Each tool inherits from the BaseTool class and implements its specific functionality.


    class ScanDirectoryTool(BaseTool):

        """Tool for scanning directories to find files"""

        

        def __init__(self, permission_manager, filesystem):

            """Initialize with required components

            

            Args:

                permission_manager: Permission management instance

                filesystem: Platform-specific filesystem handler

            """

            super().__init__(permission_manager)

            self.filesystem = filesystem

            

        def execute(self, path: str, recursive: bool = True) -> ToolResult:

            """Scan directory for files

            

            Args:

                path: Directory path to scan

                recursive: Whether to scan subdirectories

                

            Returns:

                ToolResult containing scan results

            """

            target_path = Path(path).expanduser().resolve()

            

            # Check if path exists and is accessible

            if not target_path.exists():

                return ToolResult(

                    status=ToolStatus.FAILURE,

                    data=None,

                    message=f"Path does not exist: {target_path}"

                )

            

            # Request permission to scan

            permission_granted = self.permission_manager.request_permission(

                "Scan directory for files",

                {'path': str(target_path), 'recursive': recursive}

            )

            

            if not permission_granted:

                return ToolResult(

                    status=ToolStatus.FAILURE,

                    data=None,

                    message="Permission denied by user"

                )

            

            # Perform the scan

            try:
                files = []
                # Pick the iterator once instead of duplicating the loop body
                entries = target_path.rglob('*') if recursive else target_path.iterdir()
                for file_path in entries:
                    if file_path.is_file():
                        file_stat = file_path.stat()  # One stat call per file
                        files.append({
                            'path': str(file_path),
                            'size': file_stat.st_size,
                            'modified': file_stat.st_mtime
                        })

                

                return ToolResult(

                    status=ToolStatus.SUCCESS,

                    data={'files': files, 'count': len(files)},

                    message=f"Successfully scanned {len(files)} files"

                )

                

            except PermissionError:

                return ToolResult(

                    status=ToolStatus.REQUIRES_CREDENTIALS,

                    data=None,

                    message="Elevated permissions required",

                    requires_action={'type': 'credentials', 'purpose': 'scan'}

                )

        

        def get_description(self) -> str:

            """Return tool description"""

            return "Scans directories to find and catalog files"

    

    

    class RemoveDuplicatesTool(BaseTool):

        """Tool for removing duplicate files"""

        

        def __init__(self, permission_manager, filesystem):

            """Initialize with required components"""

            super().__init__(permission_manager)

            self.filesystem = filesystem

            self.removed_files = []

            

        def execute(self, duplicates: Dict[str, List[Path]], 

                   strategy: str = 'keep_oldest') -> ToolResult:

            """Remove duplicate files based on specified strategy

            

            Args:

                duplicates: Dictionary mapping hashes to duplicate paths

                strategy: Strategy for choosing which file to keep

                

            Returns:

                ToolResult containing removal results

            """

            # Validate strategy

            valid_strategies = ['keep_oldest', 'keep_newest', 'keep_shortest_path']

            if strategy not in valid_strategies:

                return ToolResult(

                    status=ToolStatus.FAILURE,

                    data=None,

                    message=f"Invalid strategy. Choose from: {valid_strategies}"

                )

            

            # Calculate which files to remove

            files_to_remove = []

            for file_hash, paths in duplicates.items():

                if len(paths) < 2:

                    continue

                    

                # Sort paths based on strategy

                if strategy == 'keep_oldest':

                    sorted_paths = sorted(paths, key=lambda p: p.stat().st_mtime)

                elif strategy == 'keep_newest':

                    sorted_paths = sorted(paths, key=lambda p: p.stat().st_mtime, 

                                        reverse=True)

                else:  # keep_shortest_path

                    sorted_paths = sorted(paths, key=lambda p: len(str(p)))

                

                # Keep first, remove rest

                files_to_remove.extend(sorted_paths[1:])

            

            # Request permission with detailed information

            total_size = sum(p.stat().st_size for p in files_to_remove)

            permission_granted = self.permission_manager.request_permission(

                f"Remove {len(files_to_remove)} duplicate files",

                {

                    'files': [str(p) for p in files_to_remove],

                    'size': total_size,

                    'strategy': strategy

                }

            )

            

            if not permission_granted:

                return ToolResult(

                    status=ToolStatus.FAILURE,

                    data=None,

                    message="Permission denied by user"

                )

            

            # Perform removal
            removed_count = 0
            space_freed = 0
            failed_removals = []

            for file_path in files_to_remove:
                try:
                    file_size = file_path.stat().st_size

                    # Create backup record before removal
                    self._create_removal_record(file_path)

                    # Remove the file
                    file_path.unlink()
                    removed_count += 1
                    space_freed += file_size  # Count only bytes actually freed
                    self.removed_files.append(str(file_path))

                except PermissionError:
                    failed_removals.append({
                        'path': str(file_path),
                        'reason': 'permission_denied'
                    })
                except Exception as e:
                    failed_removals.append({
                        'path': str(file_path),
                        'reason': str(e)
                    })

            # Prepare result
            if failed_removals:
                status = ToolStatus.REQUIRES_CREDENTIALS if any(
                    f['reason'] == 'permission_denied' for f in failed_removals
                ) else ToolStatus.FAILURE
            else:
                status = ToolStatus.SUCCESS

            return ToolResult(
                status=status,
                data={
                    'removed': removed_count,
                    'failed': failed_removals,
                    'space_freed': space_freed
                },
                message=f"Removed {removed_count} files, freed {space_freed} bytes"
            )

        

        def _create_removal_record(self, file_path: Path):

            """Create a record of file removal for potential recovery"""

            record = {

                'path': str(file_path),

                'size': file_path.stat().st_size,

                'hash': self._quick_hash(file_path),

                'removed_at': datetime.now().isoformat()

            }

            

            # Store record in application data directory

            app_data_dir = Path.home() / '.filesystem_optimizer' / 'removed'

            app_data_dir.mkdir(parents=True, exist_ok=True)

            

            record_file = app_data_dir / f"{datetime.now().timestamp()}.json"

            with open(record_file, 'w') as f:

                json.dump(record, f, indent=2)


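The tool implementations balance functionality with safety. Each tool validates its inputs, asks the user for permission before acting, and reports failures through a ToolResult instead of raising exceptions. RemoveDuplicatesTool also writes a JSON removal record for every deleted file under ~/.filesystem_optimizer/removed, noting the path, size, and removal time. These records document what was removed. Because they hold metadata only, restoring a file still depends on another copy, such as the duplicate that was kept.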
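
Since the removal record holds metadata rather than file content, one way to make deletions reversible is to move files into a quarantine directory instead of unlinking them. The following is a minimal sketch of that idea and is not part of the article's implementation: quarantine_file, restore_file, and the directory layout are hypothetical names chosen for illustration.

```python
import json
import shutil
import tempfile
from datetime import datetime
from pathlib import Path


def quarantine_file(file_path: Path, quarantine_dir: Path) -> Path:
    """Move a file into quarantine_dir and return the path of its JSON record.

    Hypothetical helper, not part of RemoveDuplicatesTool.
    """
    quarantine_dir.mkdir(parents=True, exist_ok=True)
    original = file_path.absolute()
    stamp = datetime.now().strftime("%Y%m%dT%H%M%S%f")
    stored = quarantine_dir.absolute() / f"{stamp}_{original.name}"

    record = {
        "original_path": str(original),
        "stored_path": str(stored),
        "size": original.stat().st_size,
        "removed_at": datetime.now().isoformat(),
    }

    # Move first, then write the record, so a failed move leaves no stale record
    shutil.move(str(original), str(stored))
    record_path = stored.with_name(stored.name + ".json")
    record_path.write_text(json.dumps(record, indent=2), encoding="utf-8")
    return record_path


def restore_file(record_path: Path) -> Path:
    """Move a quarantined file back to where it came from."""
    record = json.loads(record_path.read_text(encoding="utf-8"))
    original = Path(record["original_path"])
    if original.exists():
        raise FileExistsError(f"Refusing to overwrite {original}")

    original.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(record["stored_path"], str(original))
    record_path.unlink()
    return original


if __name__ == "__main__":
    # Demonstrate a round trip inside a throwaway directory
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp)
        duplicate = base / "copy.txt"
        duplicate.write_text("same content")

        record = quarantine_file(duplicate, base / "quarantine")
        print("exists after quarantine:", duplicate.exists())

        restore_file(record)
        print("content after restore:", duplicate.read_text())
```

A quarantine of this kind frees no disk space until it is emptied, so a real tool would likely pair it with a retention period or an explicit purge step.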


LLM Controller Implementation


The LLM controller orchestrates the entire system, interpreting user requests and coordinating tool execution. This component bridges the gap between natural language understanding and concrete system actions.


    class LLMController:

        """Controls the LLM and coordinates tool execution"""

        

        def __init__(self, model_path: str):

            """Initialize controller with model path

            

            Args:

                model_path: Path to the local LLM model

            """

            self.llm = LocalLLMInterface(model_path)

            self.ui = UserInterface()

            self.filesystem = FilesystemAbstraction()

            

            # Initialize tools

            self.tools = {

                'scan_directory': ScanDirectoryTool(

                    self.ui.permission_manager, 

                    self.filesystem

                ),

                'find_duplicates': DuplicateDetector(self.filesystem),

                'remove_duplicates': RemoveDuplicatesTool(

                    self.ui.permission_manager,

                    self.filesystem

                )

            }

            

            self.conversation_history = []

            

        def process_request(self, user_input: str) -> str:

            """Process a user request and return response

            

            Args:

                user_input: Natural language request from user

                

            Returns:

                Natural language response

            """

            # Add to conversation history

            self.conversation_history.append({

                'role': 'user',

                'content': user_input

            })

            

            # Generate LLM response with tool calls

            llm_response = self._generate_llm_response(user_input)

            

            # Parse and execute any tool calls

            # _parse_llm_response always sets 'tool_calls', so test for a non-empty list

            if llm_response.get('tool_calls'):

                tool_results = self._execute_tool_calls(llm_response['tool_calls'])

                

                # Generate final response based on tool results

                final_response = self._generate_final_response(

                    user_input, 

                    tool_results

                )

            else:

                final_response = llm_response['content']

            

            # Add to conversation history

            self.conversation_history.append({

                'role': 'assistant',

                'content': final_response

            })

            

            return final_response

        

        def _generate_llm_response(self, user_input: str) -> Dict[str, Any]:

            """Generate LLM response potentially including tool calls"""

            # Prepare the prompt with conversation history

            messages = [

                {'role': 'system', 'content': self.llm.system_prompt}

            ] + self.conversation_history[-10:]  # Keep last 10 messages

            

            # Add tool descriptions

            tool_descriptions = self._get_tool_descriptions()

            messages.append({

                'role': 'system',

                'content': f"Available tools:\n{tool_descriptions}"

            })



            # Generate response

            response = self.llm.generate(messages)



            # Parse response for tool calls

            parsed = self._parse_llm_response(response)

            return parsed



        def _get_tool_descriptions(self) -> str:

            """Get formatted descriptions of all available tools"""

            descriptions = []

            for name, tool in self.tools.items():

                if hasattr(tool, 'get_description'):

                    descriptions.append(f"{name}: {tool.get_description()}")

            return "\n".join(descriptions)



        def _parse_llm_response(self, response: str) -> Dict[str, Any]:

            """Parse LLM response to extract tool calls and content"""

            result = {'content': response, 'tool_calls': []}



            # Look for JSON tool call blocks

            import re

            tool_pattern = r'```json\n(.*?)\n```'

            matches = re.findall(tool_pattern, response, re.DOTALL)



            for match in matches:

                try:

                    tool_call = json.loads(match)

                    if 'tool' in tool_call and 'parameters' in tool_call:

                        result['tool_calls'].append(tool_call)

                except json.JSONDecodeError:

                    continue



            # Remove tool calls from content

            result['content'] = re.sub(tool_pattern, '', response).strip()



            return result



        def _execute_tool_calls(self, tool_calls: List[Dict[str, Any]]) -> List[ToolResult]:

            """Execute a list of tool calls and return results"""

            results = []



            for call in tool_calls:

                tool_name = call.get('tool')

                parameters = call.get('parameters', {})



                if tool_name in self.tools:

                    tool = self.tools[tool_name]



                    # Special handling for find_duplicates

                    if tool_name == 'find_duplicates':

                        result = self._handle_duplicate_finding(parameters)

                    else:

                        try:

                            result = tool.execute(**parameters)

                        except TypeError as e:

                            # The LLM supplied missing or unexpected parameters

                            result = ToolResult(

                                status=ToolStatus.FAILURE,

                                data=None,

                                message=f"Invalid parameters for {tool_name}: {e}"

                            )



                    results.append(result)



                    # Handle special cases; requires_action may be None

                    if (result.status == ToolStatus.REQUIRES_CREDENTIALS

                            and result.requires_action):

                        credentials = self.ui.credential_manager.request_credentials(

                            result.requires_action['purpose'],

                            result.requires_action['type']

                        )

                        if credentials:

                            # Retry with elevated permissions

                            # Implementation depends on platform

                            pass

                else:

                    results.append(ToolResult(

                        status=ToolStatus.FAILURE,

                        data=None,

                        message=f"Unknown tool: {tool_name}"

                    ))



            return results



        def _handle_duplicate_finding(self, parameters: Dict[str, Any]) -> ToolResult:

            """Special handling for duplicate finding operation"""

            path = parameters.get('path', '.')

            target_path = Path(path).expanduser().resolve()



            # Request permission

            permission_granted = self.ui.permission_manager.request_permission(

                "Scan for duplicate files",

                {'path': str(target_path)}

            )



            if not permission_granted:

                return ToolResult(

                    status=ToolStatus.FAILURE,

                    data=None,

                    message="Permission denied by user"

                )



            # Find duplicates with progress callback

            print("\nScanning for duplicate files...")

            detector = self.tools['find_duplicates']

            duplicates = detector.find_duplicates(

                target_path,

                progress_callback=lambda msg: print(f"\r{msg}", end="", flush=True)

            )

            print()  # New line after progress



            # Show summary to user

            if duplicates:

                self.ui.show_duplicate_summary(duplicates)



            return ToolResult(

                status=ToolStatus.SUCCESS,

                data={'duplicates': duplicates},

                message=f"Found {len(duplicates)} groups of duplicate files"

            )
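

The parsing and dispatch steps of the controller can be exercised in isolation. The sketch below is a standalone variant that assumes the same convention of tool calls as fenced json blocks. The helper names extract_tool_calls and check_parameters are hypothetical, and the whitespace-tolerant pattern and the signature check are additions that the controller listing does not contain.

```python
import inspect
import json
import re
from typing import Any, Callable, Dict, List

# The fence marker is built from a constant so that this example can itself
# be placed inside a fenced block without ending it early.
FENCE = "`" * 3
TOOL_CALL_PATTERN = re.compile(
    re.escape(FENCE) + r"json\s*(.*?)\s*" + re.escape(FENCE),
    re.DOTALL,
)


def extract_tool_calls(response: str) -> List[Dict[str, Any]]:
    """Return every well-formed tool call found in an LLM response."""
    calls = []
    for body in TOOL_CALL_PATTERN.findall(response):
        try:
            call = json.loads(body)
        except json.JSONDecodeError:
            continue  # Ignore blocks that are not valid JSON
        if (isinstance(call, dict)
                and isinstance(call.get("tool"), str)
                and isinstance(call.get("parameters"), dict)):
            calls.append(call)
    return calls


def check_parameters(func: Callable[..., Any],
                     parameters: Dict[str, Any]) -> List[str]:
    """List the problems that would make func(**parameters) fail to bind."""
    params = inspect.signature(func).parameters
    accepts_extra = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)

    problems = []
    for name, param in params.items():
        if param.kind in variadic:
            continue
        if param.default is inspect.Parameter.empty and name not in parameters:
            problems.append(f"missing parameter: {name}")
    if not accepts_extra:
        for name in parameters:
            if name not in params:
                problems.append(f"unexpected parameter: {name}")
    return problems


if __name__ == "__main__":
    def scan_directory(path: str, recursive: bool = True) -> str:
        """Hypothetical stand-in for a tool's execute method."""
        return f"scanning {path}"

    reply = (
        "I'll scan your home directory.\n"
        f"{FENCE}json\n"
        '{"tool": "scan_directory", "parameters": {"path": "~"}}\n'
        f"{FENCE}\n"
    )
    for call in extract_tool_calls(reply):
        problems = check_parameters(scan_directory, call["parameters"])
        print(call["tool"], problems or "ok")
```

Checking parameters against the tool's signature before dispatch lets the controller return a precise error message to the model instead of failing inside the tool call.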



Full Source Code


#!/usr/bin/env python3

"""

Filesystem Optimizer - LLM-powered duplicate file finder and remover


This application uses a local LLM to intelligently find and remove duplicate

files across different platforms while maintaining user control through

permission requests.

"""


import os

import sys

import json

import hashlib

import platform

import getpass

from pathlib import Path

from datetime import datetime

from collections import defaultdict

from typing import Dict, List, Any, Optional, Tuple, Callable

from dataclasses import dataclass

from enum import Enum

from abc import ABC, abstractmethod


# For LLM integration

import torch

from transformers import AutoModelForCausalLM, AutoTokenizer


# For secure credential storage

try:

    import keyring

    KEYRING_AVAILABLE = True

except ImportError:

    KEYRING_AVAILABLE = False

    print("Warning: keyring module not available. Credentials won't be stored.")



# Tool Framework

class ToolStatus(Enum):

    """Enumeration of possible tool execution statuses"""

    SUCCESS = "success"

    FAILURE = "failure"

    REQUIRES_PERMISSION = "requires_permission"

    REQUIRES_CREDENTIALS = "requires_credentials"



@dataclass

class ToolResult:

    """Encapsulates the result of a tool execution"""

    status: ToolStatus

    data: Any

    message: str

    requires_action: Optional[Dict[str, Any]] = None



class BaseTool(ABC):

    """Abstract base class for all filesystem tools"""

    

    def __init__(self, permission_manager):

        """Initialize tool with permission manager"""

        self.permission_manager = permission_manager

        self.name = self.__class__.__name__

        

    @abstractmethod

    def execute(self, **kwargs) -> ToolResult:

        """Execute the tool operation with given parameters"""

        pass

        

    @abstractmethod

    def get_description(self) -> str:

        """Return human-readable description of tool functionality"""

        pass



# Platform Detection and Abstraction

class PlatformDetector:

    """Detects and provides platform-specific functionality"""

    

    @staticmethod

    def get_platform() -> str:

        """Detect the current operating system"""

        system = platform.system().lower()

        

        # Check for mobile platforms

        if hasattr(sys, 'getandroidapilevel'):

            return 'android'

        elif system == 'darwin' and platform.machine() == 'arm64':

            # Additional iOS detection would go here

            pass

            

        return system



class FilesystemAbstraction:

    """Provides unified filesystem operations across platforms"""

    

    def __init__(self):

        """Initialize platform-specific filesystem handler"""

        self.platform = PlatformDetector.get_platform()

        self._initialize_platform_specific()

        

    def _initialize_platform_specific(self):

        """Load platform-specific implementations"""

        if self.platform == 'windows':

            self.path_separator = '\\'

            self.home_dir = Path.home()

        elif self.platform in ['darwin', 'linux']:

            self.path_separator = '/'

            self.home_dir = Path.home()

        elif self.platform == 'android':

            self.path_separator = '/'

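            self.home_dir = Path('/storage/emulated/0')

        else:

            # Fall back to the interpreter's defaults on unrecognized platforms

            self.path_separator = os.sep

            self.home_dir = Path.home()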



# Permission Management

class PermissionManager:

    """Manages user permissions for filesystem operations"""

    

    def __init__(self, ui_callback: Callable):

        """Initialize permission manager with UI callback"""

        self.ui_callback = ui_callback

        self.permission_cache = {}

        self.audit_log = []

        

    def request_permission(self, action: str, details: Dict[str, Any]) -> bool:

        """Request user permission for a specific action"""

        # Check cache

        cache_key = f"{action}:{details.get('path', '')}"

        if cache_key in self.permission_cache:

            return self.permission_cache[cache_key]

        

        # Format request

        request_message = self._format_permission_request(action, details)

        

        # Get user response

        response = self.ui_callback({

            'type': 'permission_request',

            'message': request_message,

            'options': ['Allow', 'Deny', 'Always Allow', 'Always Deny']

        })

        

        # Process response

        granted = response in ['Allow', 'Always Allow']

        

        # Cache if permanent

        if response in ['Always Allow', 'Always Deny']:

            self.permission_cache[cache_key] = granted

        

        # Log decision

        self.audit_log.append({

            'timestamp': datetime.now().isoformat(),

            'action': action,

            'granted': granted

        })

        

        return granted

    

    def _format_permission_request(self, action: str, 

                                 details: Dict[str, Any]) -> str:

        """Format a user-friendly permission request message"""

        message = f"Permission requested for: {action}\n\n"

        

        if 'files' in details:

            message += f"Affected files: {len(details['files'])}\n"

            for file in details['files'][:3]:

                message += f"  - {file}\n"

            if len(details['files']) > 3:

                message += f"  ... and {len(details['files']) - 3} more\n"

        

        if 'size' in details:

            size_mb = details['size'] / (1024 * 1024)

            message += f"Total size: {size_mb:.2f} MB\n"

        

        return message



# Credential Management

class CredentialManager:

    """Securely manages credentials for elevated operations"""

    

    def __init__(self, app_id: str):

        """Initialize credential manager with application ID"""

        self.app_id = app_id

        

    def request_credentials(self, purpose: str, 

                          credential_type: str) -> Optional[Dict[str, str]]:

        """Request credentials from user for specific purpose"""

        # Check stored credentials if keyring available

        if KEYRING_AVAILABLE:

            try:

                stored = keyring.get_password(self.app_id, credential_type)

                if stored:

                    return json.loads(stored)

            except Exception:

                pass

        

        # Request from user

        print(f"\nCredentials required for: {purpose}")

        print(f"Credential type: {credential_type}")

        

        credentials = {}

        

        if credential_type == 'sudo':

            credentials['password'] = getpass.getpass("Enter sudo password: ")

        elif credential_type == 'windows_admin':

            credentials['username'] = input("Enter administrator username: ")

            credentials['password'] = getpass.getpass("Enter password: ")

        

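        # Unknown credential types collect nothing, so there is nothing to store

        if not credentials:

            return None

        

        # Offer to store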

        if KEYRING_AVAILABLE:

            store = input("Store credentials securely for future use? (y/n): ")

            if store.lower() == 'y':

                try:

                    keyring.set_password(

                        self.app_id,

                        credential_type,

                        json.dumps(credentials)

                    )

                except Exception as e:

                    print(f"Warning: Could not store credentials: {e}")

        

        return credentials



# Duplicate Detection

class DuplicateDetector:

    """Implements efficient duplicate file detection"""

    

    def __init__(self, filesystem: FilesystemAbstraction):

        """Initialize detector with filesystem abstraction"""

        self.filesystem = filesystem

        self.chunk_size = 8192

        

    def find_duplicates(self, root_path: Path, 

                      progress_callback=None) -> Dict[str, List[Path]]:

        """Find all duplicate files under the given path"""

        # First pass: Group by size

        size_map = defaultdict(list)

        total_files = 0

        

        for file_path in self._walk_directory(root_path):

            try:

                size = file_path.stat().st_size

                size_map[size].append(file_path)

                total_files += 1

                

                if progress_callback and total_files % 100 == 0:

                    progress_callback(f"Scanned {total_files} files...")

            except (OSError, PermissionError):

                continue

        

        # Second pass: Calculate hashes for potential duplicates

        hash_map = defaultdict(list)

        processed = 0

        

        for size, file_list in size_map.items():

            if len(file_list) < 2:

                continue

                

            for file_path in file_list:

                file_hash = self._calculate_hash(file_path)

                if file_hash:

                    hash_map[file_hash].append(file_path)

                

                processed += 1

                if progress_callback:

                    progress_callback(

                        f"Processing potential duplicates: {processed}/{total_files}"

                    )

        

        # Filter out non-duplicates

        duplicates = {

            hash_val: paths 

            for hash_val, paths in hash_map.items() 

            if len(paths) > 1

        }

        

        return duplicates

    

    def _walk_directory(self, root_path: Path):

        """Walk directory tree yielding file paths"""

        try:

            for item in root_path.rglob('*'):

                if item.is_file():

                    yield item

        except PermissionError:

            pass

    

    def _calculate_hash(self, file_path: Path) -> Optional[str]:

        """Calculate SHA-256 hash of a file"""

        hasher = hashlib.sha256()

        

        try:

            with open(file_path, 'rb') as f:

                while chunk := f.read(self.chunk_size):

                    hasher.update(chunk)

            return hasher.hexdigest()

        except (OSError, PermissionError):

            return None



# User Interface

class UserInterface:

    """Manages all user interactions for the filesystem optimizer"""

    

    def __init__(self):

        """Initialize the user interface components"""

        self.permission_manager = PermissionManager(self.display_prompt)

        self.credential_manager = CredentialManager("filesystem_optimizer")

        

    def display_prompt(self, prompt_data: Dict[str, Any]) -> str:

        """Display a prompt to the user and get response"""

        prompt_type = prompt_data.get('type')

        

        if prompt_type == 'permission_request':

            print("\n" + "="*60)

            print("PERMISSION REQUEST")

            print("="*60)

            print(prompt_data['message'])

            print("\nOptions:", ", ".join(prompt_data['options']))

            

            while True:

                response = input("Your choice: ").strip()

                if response in prompt_data['options']:

                    return response

                print("Invalid choice. Please try again.")

        

        return input(prompt_data.get('message', 'Enter response: '))

    

    def show_duplicate_summary(self, duplicates: Dict[str, List[Path]]):

        """Display a summary of found duplicates to the user"""

        total_files = sum(len(paths) for paths in duplicates.values())

        total_groups = len(duplicates)

        

        # Calculate space savings

        total_waste = 0

        for file_hash, paths in duplicates.items():

            if paths:

                file_size = paths[0].stat().st_size

                total_waste += file_size * (len(paths) - 1)

        

        print(f"\n{'='*60}")

        print("DUPLICATE FILE SUMMARY")

        print(f"{'='*60}")

        print(f"Total duplicate groups found: {total_groups}")

        print(f"Total duplicate files: {total_files}")

        print(f"Potential space savings: {self._format_bytes(total_waste)}")

        print(f"{'='*60}\n")

        

        # Show details for each group

        for i, (file_hash, paths) in enumerate(duplicates.items(), 1):

            if i > 10:

                print(f"\n... and {len(duplicates) - 10} more groups")

                break

                

            print(f"\nGroup {i} ({len(paths)} files):")

            file_size = paths[0].stat().st_size

            print(f"  Size: {self._format_bytes(file_size)}")

            print("  Files:")

            for path in paths[:5]:

                print(f"    - {path}")

            if len(paths) > 5:

                print(f"    ... and {len(paths) - 5} more")

    

    def _format_bytes(self, size: int) -> str:

        """Format byte size in human-readable form"""

        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:

            if size < 1024.0:

                return f"{size:.2f} {unit}"

            size /= 1024.0

        return f"{size:.2f} PB"



# Tool Implementations

class ScanDirectoryTool(BaseTool):

    """Tool for scanning directories to find files"""

    

    def __init__(self, permission_manager, filesystem):

        """Initialize with required components"""

        super().__init__(permission_manager)

        self.filesystem = filesystem

        

    def execute(self, path: str, recursive: bool = True) -> ToolResult:

        """Scan directory for files"""

        target_path = Path(path).expanduser().resolve()

        

        if not target_path.exists():

            return ToolResult(

                status=ToolStatus.FAILURE,

                data=None,

                message=f"Path does not exist: {target_path}"

            )

        

        # Request permission

        permission_granted = self.permission_manager.request_permission(

            "Scan directory for files",

            {'path': str(target_path), 'recursive': recursive}

        )

        

        if not permission_granted:

            return ToolResult(

                status=ToolStatus.FAILURE,

                data=None,

                message="Permission denied by user"

            )

        

        # Perform scan

        try:

            files = []

            if recursive:

                for file_path in target_path.rglob('*'):

                    if file_path.is_file():

                        files.append({

                            'path': str(file_path),

                            'size': file_path.stat().st_size,

                            'modified': file_path.stat().st_mtime

                        })

            else:

                for file_path in target_path.iterdir():

                    if file_path.is_file():

                        files.append({

                            'path': str(file_path),

                            'size': file_path.stat().st_size,

                            'modified': file_path.stat().st_mtime

                        })

            

            return ToolResult(

                status=ToolStatus.SUCCESS,

                data={'files': files, 'count': len(files)},

                message=f"Successfully scanned {len(files)} files"

            )

            

        except PermissionError:

            return ToolResult(

                status=ToolStatus.REQUIRES_CREDENTIALS,

                data=None,

                message="Elevated permissions required",

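                # Use a credential type that CredentialManager.request_credentials handles

                requires_action={

                    'type': 'windows_admin' if self.filesystem.platform == 'windows' else 'sudo',

                    'purpose': 'scan'

                }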

            )

    

    def get_description(self) -> str:

        """Return tool description"""

        return "Scans directories to find and catalog files"



class RemoveDuplicatesTool(BaseTool):

    """Tool for removing duplicate files"""

    

    def __init__(self, permission_manager, filesystem):

        """Initialize with required components"""

        super().__init__(permission_manager)

        self.filesystem = filesystem

        self.removed_files = []

        

    def execute(self, duplicates: Dict[str, List[Path]], 

               strategy: str = 'keep_oldest') -> ToolResult:

        """Remove duplicate files based on specified strategy"""

        valid_strategies = ['keep_oldest', 'keep_newest', 'keep_shortest_path']

        if strategy not in valid_strategies:

            return ToolResult(

                status=ToolStatus.FAILURE,

                data=None,

                message=f"Invalid strategy. Choose from: {valid_strategies}"

            )

        

        # Calculate which files to remove

        files_to_remove = []

        for file_hash, paths in duplicates.items():

            if len(paths) < 2:

                continue

                

            # Sort paths based on strategy

            if strategy == 'keep_oldest':

                sorted_paths = sorted(paths, key=lambda p: p.stat().st_mtime)

            elif strategy == 'keep_newest':

                sorted_paths = sorted(paths, key=lambda p: p.stat().st_mtime, 

                                    reverse=True)

            else:  # keep_shortest_path

                sorted_paths = sorted(paths, key=lambda p: len(str(p)))

            

            # Keep first, remove rest

            files_to_remove.extend(sorted_paths[1:])

        

        # Request permission

        total_size = sum(p.stat().st_size for p in files_to_remove)

        permission_granted = self.permission_manager.request_permission(

            f"Remove {len(files_to_remove)} duplicate files",

            {

                'files': [str(p) for p in files_to_remove],

                'size': total_size,

                'strategy': strategy

            }

        )

        

        if not permission_granted:

            return ToolResult(

                status=ToolStatus.FAILURE,

                data=None,

                message="Permission denied by user"

            )

        

        # Perform removal

        removed_count = 0

        failed_removals = []

        

        for file_path in files_to_remove:

            try:

                # Create removal record

                self._create_removal_record(file_path)

                

                # Remove file

                file_path.unlink()

                removed_count += 1

                self.removed_files.append(str(file_path))



            except PermissionError:

                failed_removals.append({

                    'path': str(file_path),

                    'reason': 'permission_denied'

                })

            except Exception as e:

                failed_removals.append({

                    'path': str(file_path),

                    'reason': str(e)

                })



        # Prepare result

        requires_action = None

        if failed_removals:

            if any(f['reason'] == 'permission_denied' for f in failed_removals):

                status = ToolStatus.REQUIRES_CREDENTIALS

                # The controller reads 'purpose' and 'type' when it asks for

                # credentials; the type names match CredentialManager

                requires_action = {

                    'purpose': 'remove duplicate files',

                    'type': 'windows_admin' if self.filesystem.platform == 'windows' else 'sudo'

                }

            else:

                status = ToolStatus.FAILURE

        else:

            status = ToolStatus.SUCCESS



        # Files that could not be removed still occupy their space

        space_freed = total_size - sum(

            Path(f['path']).stat().st_size

            for f in failed_removals

            if Path(f['path']).exists()

        )



        return ToolResult(

            status=status,

            data={

                'removed': removed_count,

                'failed': failed_removals,

                'space_freed': space_freed

            },

            message=f"Removed {removed_count} files, freed {space_freed} bytes",

            requires_action=requires_action

        )



    def _create_removal_record(self, file_path: Path):

        """Create a record of file removal for potential recovery"""

        record = {

            'path': str(file_path),

            'size': file_path.stat().st_size,

            'removed_at': datetime.now().isoformat()

        }



        # Store record

        app_data_dir = Path.home() / '.filesystem_optimizer' / 'removed'

        app_data_dir.mkdir(parents=True, exist_ok=True)



        record_file = app_data_dir / f"{datetime.now().timestamp()}.json"

        with open(record_file, 'w') as f:

            json.dump(record, f, indent=2)



    def get_description(self) -> str:

        """Return tool description"""

        return "Removes duplicate files based on specified strategy"

    

    

    # Local LLM Interface

    class LocalLLMInterface:

        """Manages interaction with the local language model"""

        

        def __init__(self, model_path: str):

            """Initialize the LLM with specified model path"""

            self.model_path = model_path

            self.model = None

            self.tokenizer = None

            self.system_prompt = self._load_system_prompt()

            

            # For demo purposes, we'll simulate LLM responses

            # In production, this would load actual model

            self.demo_mode = True

            

        def _load_system_prompt(self) -> str:

            """Load the system prompt that defines tool usage"""

            return """You are a filesystem optimization assistant. You help users 

            identify and remove duplicate files. Always ask for permission before 

            performing any operation. Use the provided tools to interact with the 

            filesystem. Format tool calls as JSON objects wrapped in ```json blocks."""

        

        def generate(self, messages: List[Dict[str, str]]) -> str:

            """Generate response from the model"""

            if self.demo_mode:

                # Simulate LLM responses based on user input

                user_message = messages[-1]['content'].lower()

                

                if 'find duplicate' in user_message or 'scan' in user_message:

                    path = self._extract_path(user_message)

                    return f"""I'll help you find duplicate files in {path}.


```json

{{

    "tool": "find_duplicates",

    "parameters": {{

        "path": "{path}"

    }}

}}

```


Let me scan the directory for duplicate files."""

                

                elif 'remove' in user_message and 'duplicate' in user_message:

                    return """I'll help you remove the duplicate files. First, let me scan for duplicates.


```json

{{

    "tool": "find_duplicates",

    "parameters": {{

        "path": "."

    }}

}}

```


After finding duplicates, I'll help you remove them safely."""

                

                else:

                    return """I can help you find and remove duplicate files on your system. 

                    

You can ask me to:

- Scan specific directories for duplicates

- Remove duplicate files while keeping one copy

- Check your entire home directory


What would you like me to do?"""

            

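            # In production, this would use the actual model:

            # return self._generate_with_model(messages)

            # Until then, fail loudly instead of silently returning None

            raise NotImplementedError("Model inference is not implemented; use demo mode")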

        

        def _extract_path(self, message: str) -> str:

            """Extract path from user message"""

            # Simple extraction logic for demo

            if 'documents' in message.lower():

                return "~/Documents"

            elif 'downloads' in message.lower():

                return "~/Downloads"

            elif 'home' in message.lower():

                return "~"

            else:

                return "."

    

    

    # LLM Controller

    class LLMController:

        """Controls the LLM and coordinates tool execution"""

        

        def __init__(self, model_path: str):

            """Initialize controller with model path"""

            self.llm = LocalLLMInterface(model_path)

            self.ui = UserInterface()

            self.filesystem = FilesystemAbstraction()

            

            # Initialize tools

            self.tools = {

                'scan_directory': ScanDirectoryTool(

                    self.ui.permission_manager, 

                    self.filesystem

                ),

                'find_duplicates': DuplicateDetector(self.filesystem),

                'remove_duplicates': RemoveDuplicatesTool(

                    self.ui.permission_manager,

                    self.filesystem

                )

            }

            

            self.conversation_history = []

            # Most recent scan results, set by _handle_duplicate_finding

            self.current_duplicates = None

            

        def process_request(self, user_input: str) -> str:

            """Process a user request and return response"""

            # Add to conversation history

            self.conversation_history.append({

                'role': 'user',

                'content': user_input

            })

            

            # Generate LLM response with tool calls

            llm_response = self._generate_llm_response(user_input)

            

            # Parse and execute any tool calls

            parsed = self._parse_llm_response(llm_response)

            

            if parsed['tool_calls']:

                tool_results = self._execute_tool_calls(parsed['tool_calls'])

                

                # Generate final response based on tool results

                final_response = self._generate_final_response(

                    user_input, 

                    tool_results,

                    parsed['content']

                )

            else:

                final_response = parsed['content']

            

            # Add to conversation history

            self.conversation_history.append({

                'role': 'assistant',

                'content': final_response

            })

            

            return final_response

        

        def _generate_llm_response(self, user_input: str) -> str:

            """Generate LLM response potentially including tool calls"""

            messages = [

                {'role': 'system', 'content': self.llm.system_prompt}

            ] + self.conversation_history[-10:]

            

            # Add tool descriptions

            tool_descriptions = self._get_tool_descriptions()

            messages.append({

                'role': 'system',

                'content': f"Available tools:\n{tool_descriptions}"

            })

            

            # Generate response

            return self.llm.generate(messages)

        

        def _get_tool_descriptions(self) -> str:

            """Get formatted descriptions of all available tools"""

            descriptions = []

            for name, tool in self.tools.items():

                if hasattr(tool, 'get_description'):

                    descriptions.append(f"{name}: {tool.get_description()}")

            return "\n".join(descriptions)

        

        def _parse_llm_response(self, response: str) -> Dict[str, Any]:

            """Parse LLM response to extract tool calls and content"""

            result = {'content': response, 'tool_calls': []}

            

            # Look for JSON tool call blocks

            import re

            tool_pattern = r'```json\n(.*?)\n```'

            matches = re.findall(tool_pattern, response, re.DOTALL)

            

            for match in matches:

                try:

                    tool_call = json.loads(match)

                    if 'tool' in tool_call and 'parameters' in tool_call:

                        result['tool_calls'].append(tool_call)

                except json.JSONDecodeError:

                    continue

            

            # Remove tool calls from content

            result['content'] = re.sub(tool_pattern, '', response).strip()

            

            return result

        

        def _execute_tool_calls(self, tool_calls: List[Dict[str, Any]]) -> List[ToolResult]:

            """Execute a list of tool calls and return results"""

            results = []

            

            for call in tool_calls:

                tool_name = call.get('tool')

                parameters = call.get('parameters', {})

                

                if tool_name in self.tools:

                    tool = self.tools[tool_name]

                    

                    # Special handling for find_duplicates

                    if tool_name == 'find_duplicates':

                        result = self._handle_duplicate_finding(parameters)

                    else:

                        result = tool.execute(**parameters)

                    

                    results.append(result)

                    

                    # Handle special cases

                    if result.status == ToolStatus.REQUIRES_CREDENTIALS:

                        credentials = self.ui.credential_manager.request_credentials(

                            result.requires_action['purpose'],

                            result.requires_action['type']

                        )

                        if credentials:

                            # Retry with elevated permissions would go here

                            pass

                else:

                    results.append(ToolResult(

                        status=ToolStatus.FAILURE,

                        data=None,

                        message=f"Unknown tool: {tool_name}"

                    ))

            

            return results

        

        def _handle_duplicate_finding(self, parameters: Dict[str, Any]) -> ToolResult:

            """Special handling for duplicate finding operation"""

            path = parameters.get('path', '.')

            target_path = Path(path).expanduser().resolve()

            

            # Request permission

            permission_granted = self.ui.permission_manager.request_permission(

                "Scan for duplicate files",

                {'path': str(target_path)}

            )

            

            if not permission_granted:

                return ToolResult(

                    status=ToolStatus.FAILURE,

                    data=None,

                    message="Permission denied by user"

                )

            

            # Find duplicates with progress

            print("\nScanning for duplicate files...")

            detector = self.tools['find_duplicates']

            duplicates = detector.find_duplicates(

                target_path,

                progress_callback=lambda msg: print(f"\r{msg}", end="", flush=True)

            )

            print()  # New line after progress

            

            # Store results for potential removal; an empty result also

            # clears anything left over from an earlier scan

            self.current_duplicates = duplicates

            

            # Show summary to user

            if duplicates:

                self.ui.show_duplicate_summary(duplicates)

            

            return ToolResult(

                status=ToolStatus.SUCCESS,

                data={'duplicates': duplicates},

                message=f"Found {len(duplicates)} groups of duplicate files"

            )

        

        def _generate_final_response(self, user_input: str, 

                                   tool_results: List[ToolResult],

                                   initial_response: str) -> str:

            """Generate final response based on tool results"""

            response_parts = []

            

            if initial_response:

                response_parts.append(initial_response)

            

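            for result in tool_results:

                # A tool may report success without a payload; nothing to summarize then

                if result.status == ToolStatus.SUCCESS and not result.data:

                    continue

                if result.status == ToolStatus.SUCCESS: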

                    if 'duplicates' in result.data:

                        duplicates = result.data['duplicates']

                        if duplicates:

                            response_parts.append(

                                f"\nI found {len(duplicates)} groups of duplicate files. "

                                f"Would you like me to remove the duplicates? "

                                f"I can keep the oldest, newest, or shortest path version of each file."

                            )

                        else:

                            response_parts.append(

                                "\nNo duplicate files were found in the specified location."

                            )

                    elif 'removed' in result.data:

                        response_parts.append(

                            f"\nSuccessfully removed {result.data['removed']} duplicate files, "

                            f"freeing up {self.ui._format_bytes(result.data['space_freed'])} of space."

                        )

                elif result.status == ToolStatus.FAILURE:

                    response_parts.append(f"\nError: {result.message}")

                elif result.status == ToolStatus.REQUIRES_CREDENTIALS:

                    response_parts.append(

                        f"\nThis operation requires elevated permissions. "

                        f"Please provide credentials when prompted."

                    )

            

            return "\n".join(response_parts)

    

    

    # Main Application

    class FilesystemOptimizer:

        """Main application class for filesystem optimization"""

        

        def __init__(self, model_path: str = "local_model"):

            """Initialize the filesystem optimizer"""

            self.controller = LLMController(model_path)

            self.running = False

            

        def run(self):

            """Run the interactive filesystem optimizer"""

            self._print_welcome()

            self.running = True

            

            while self.running:

                try:

                    # Get user input

                    user_input = input("\nWhat would you like to do? > ").strip()

                    

                    # Ignore empty input instead of sending it to the LLM

                    if not user_input:

                        continue

                    

                    # Check for exit commands

                    if user_input.lower() in ['exit', 'quit', 'bye']:

                        self.running = False

                        continue

                    

                    # Handle special commands

                    if user_input.lower() == 'help':

                        self._print_help()

                        continue

                    

                    # Process the request

                    response = self.controller.process_request(user_input)

                    

                    # Display response

                    print(f"\n{response}")

                    

                    # Check if we should offer to remove duplicates

                    if getattr(self.controller, 'current_duplicates', None):

                        remove = input("\nWould you like to remove the duplicates? (yes/no): ")

                        if remove.strip().lower() in ['yes', 'y']:

                            strategy = self._get_removal_strategy()

                            removal_response = self.controller.process_request(

                                f"Remove the duplicates using {strategy} strategy"

                            )

                            print(f"\n{removal_response}")

                        

                        # Offer removal once per scan so the prompt does not

                        # reappear after every later request

                        self.controller.current_duplicates = None

                    

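                # EOFError covers a closed stdin (Ctrl-D, piped input); left to the

                # generic handler below, it would be raised again on every iteration

                except (KeyboardInterrupt, EOFError):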

                    print("\n\nInterrupted by user.")

                    self.running = False

                except Exception as e:

                    print(f"\nAn error occurred: {e}")

                    print("Please try again or type 'exit' to quit.")

            

            self._print_goodbye()

        

        def _print_welcome(self):

            """Print welcome message and instructions"""

            print("="*70)

            print("FILESYSTEM OPTIMIZER - LLM-Powered Duplicate Finder")

            print("="*70)

            print("\nWelcome! I can help you find and remove duplicate files on your system.")

            print("\nExamples of what you can ask:")

            print("  - 'Find duplicate files in my Documents folder'")

            print("  - 'Scan my Downloads for duplicates'")

            print("  - 'Check my home directory for duplicate photos'")

            print("\nI will always ask for your permission before:")

            print("  - Scanning directories")

            print("  - Removing any files")

            print("  - Using elevated permissions")

            print("\nType 'help' for more information or 'exit' to quit.")

            print("="*70)

        

        def _print_help(self):

            """Print help information"""

            print("\n" + "="*70)

            print("HELP - Available Commands and Features")

            print("="*70)

            print("\nNatural Language Commands:")

            print("  - Ask me to find duplicates in any directory")

            print("  - Request removal of found duplicates")

            print("  - Specify removal strategies (keep oldest/newest/shortest path)")

            print("\nSpecial Commands:")

            print("  - 'help' - Show this help message")

            print("  - 'exit' - Quit the application")

            print("\nPermission System:")

            print("  - You can choose 'Always Allow' for repeated operations")

            print("  - All actions are logged for accountability")

            print("  - Removed files are recorded for potential recovery")

            print("="*70)

        

        def _get_removal_strategy(self) -> str:

            """Get removal strategy from user"""

            print("\nRemoval strategies:")

            print("  1. keep_oldest - Keep the oldest copy of each file")

            print("  2. keep_newest - Keep the newest copy of each file")

            print("  3. keep_shortest_path - Keep the file with shortest path")

            

            while True:

                choice = input("\nSelect strategy (1-3): ").strip()

                if choice == '1':

                    return 'keep_oldest'

                elif choice == '2':

                    return 'keep_newest'

                elif choice == '3':

                    return 'keep_shortest_path'

                else:

                    print("Invalid choice. Please select 1, 2, or 3.")

        

        def _print_goodbye(self):

            """Print goodbye message"""

            print("\n" + "="*70)

            print("Thank you for using Filesystem Optimizer!")

            print("All removed files have been logged for potential recovery.")

            print("="*70)

    

    

    # Entry point

    if __name__ == "__main__":

        # Parse command line arguments

        import argparse

        import sys  # needed for sys.exit below; harmless if already imported

        

        parser = argparse.ArgumentParser(

            description="LLM-powered filesystem optimizer for finding and removing duplicate files"

        )

        parser.add_argument(

            '--model-path',

            type=str,

            default='local_model',

            help='Path to the local LLM model directory'

        )

        

        args = parser.parse_args()

        

        # Create and run the application

        app = FilesystemOptimizer(model_path=args.model_path)

        

        try:

            app.run()

        except Exception as e:

            print(f"\nFatal error: {e}")

            sys.exit(1)
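

The tool-call format that `_parse_llm_response` expects can be exercised on its own, without the rest of the application. The following is a minimal sketch rather than the listing's exact code: the function name `extract_tool_calls` and the `FENCE` constant are hypothetical, and the pattern allows any whitespace around the JSON body (the listing requires exactly one newline), which is an assumption made so that responses with extra blank lines still parse.

```python
import json
import re
from typing import Any, Dict, List, Tuple

# Three backticks, built programmatically so this example can itself be fenced
FENCE = "`" * 3

# Assumption: tolerate any whitespace around the JSON body
TOOL_BLOCK = re.compile(FENCE + r"json\s*(.*?)\s*" + FENCE, re.DOTALL)


def extract_tool_calls(response: str) -> Tuple[str, List[Dict[str, Any]]]:
    """Return (text without tool blocks, list of well-formed tool calls)."""
    calls = []
    for body in TOOL_BLOCK.findall(response):
        try:
            call = json.loads(body)
        except json.JSONDecodeError:
            continue  # skip malformed blocks, as the controller does
        # Only accept JSON objects that name a tool and carry parameters
        if isinstance(call, dict) and "tool" in call and "parameters" in call:
            calls.append(call)
    text = TOOL_BLOCK.sub("", response).strip()
    return text, calls


sample = (
    "I'll scan that folder.\n\n"
    + FENCE + "json\n"
    + '{"tool": "find_duplicates", "parameters": {"path": "~/Downloads"}}\n'
    + FENCE + "\n\n"
    + "One moment."
)
text, calls = extract_tool_calls(sample)
print(calls[0]["tool"], calls[0]["parameters"]["path"])
```

Running a check like this against each simulated response is a quick way to confirm that every block the demo emits is valid JSON before it reaches the controller.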



CONCLUSION


This implementation demonstrates a working skeleton of an LLM-based filesystem optimization system. The application connects natural language requests to filesystem tools while gating scans, removals, and privileged operations behind permission requests. As listed, the LLM interface runs in demo mode with simulated responses, and the retry after a credential request is left as a placeholder, so a real model and elevated-permission handling still need to be wired in. The modular architecture leaves room for additional platforms and optimization strategies. The design prioritizes user control and transparency: no action is meant to run without explicit consent, errors are caught and reported rather than ending the session, and removed files are logged for potential recovery.
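
To make the three removal strategies named in the listing concrete, the sketch below shows one way the file to keep could be chosen from a group of identical files. It is an illustration under stated assumptions, not the `RemoveDuplicatesTool` implementation: the `FileInfo` record, the `STRATEGIES` table, the `choose_keeper` function, and the use of modification time as the measure of age are all hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass(frozen=True)
class FileInfo:
    """Hypothetical record for one file in a duplicate group."""
    path: str
    mtime: float  # modification time, seconds since the epoch


# Each strategy maps to a sort key; the file with the smallest key is kept.
STRATEGIES: Dict[str, Callable[[FileInfo], object]] = {
    "keep_oldest": lambda f: f.mtime,
    "keep_newest": lambda f: -f.mtime,
    "keep_shortest_path": lambda f: (len(f.path), f.path),
}


def choose_keeper(group: List[FileInfo], strategy: str) -> FileInfo:
    """Pick the file to keep; the rest of the group are removal candidates."""
    if strategy not in STRATEGIES:
        raise ValueError(f"Unknown strategy: {strategy}")
    if not group:
        raise ValueError("Duplicate group is empty")
    return min(group, key=STRATEGIES[strategy])


group = [
    FileInfo("/home/user/Downloads/report (1).pdf", mtime=1_700_000_500.0),
    FileInfo("/home/user/Documents/report.pdf", mtime=1_700_000_000.0),
]
for name in STRATEGIES:
    print(name, "->", choose_keeper(group, name).path)
```

Keeping the strategies in a table means a new one is a single added entry, which is one way the extension to further strategies could look.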
