Introduction and System Overview
The convergence of Large Language Models with system administration tasks represents a significant advancement in automated computing. This article explores the development of a Python-based application that leverages a local LLM to perform filesystem optimization across multiple platforms. The system identifies duplicate files, suggests optimizations, and executes cleanup operations while maintaining strict user control through permission requests.
The application operates by combining natural language processing capabilities with direct filesystem access through a carefully designed tool framework. Rather than requiring users to manually navigate complex file structures or remember command-line syntax, the system interprets user intentions and translates them into concrete filesystem operations.
Note: the LLM's role is limited to interpreting natural language and emitting tool calls; all filesystem operations are executed by ordinary Python code.
System Architecture
The architecture consists of several interconnected components that work together to provide a seamless experience. At the core sits the LLM interface, which processes user requests and generates appropriate tool calls. The tool framework translates these high-level instructions into platform-specific filesystem operations.
+-------------------+
|  User Interface   |
+-------------------+
          |
          v
+-------------------+
|  LLM Controller   |
+-------------------+
          |
          v
+-------------------+
|  Tool Framework   |
+-------------------+
          |
  +-------+-------+-------+-------+
  |       |       |       |       |
  v       v       v       v       v
 Win     Mac    Linux    iOS   Android
The modular design ensures that platform-specific implementations remain isolated while sharing common interfaces. This separation of concerns allows for easier maintenance and extension of the system.
Local LLM Integration
The integration with a local LLM requires careful consideration of model loading, context management, and response parsing. We utilize the transformers library for model management and implement a custom wrapper to handle tool-specific interactions.
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import json
from typing import Dict, List, Any
class LocalLLMInterface:
"""Manages interaction with the local language model"""
def __init__(self, model_path: str):
"""Initialize the LLM with specified model path
Args:
model_path: Path to the local model directory
"""
self.tokenizer = AutoTokenizer.from_pretrained(model_path)
self.model = AutoModelForCausalLM.from_pretrained(
model_path,
torch_dtype=torch.float16,
device_map="auto"
)
self.system_prompt = self._load_system_prompt()
def _load_system_prompt(self) -> str:
"""Load the system prompt that defines tool usage"""
return """You are a filesystem optimization assistant. You help users
identify and remove duplicate files. Always ask for permission before
performing any operation. Use the provided tools to interact with the
filesystem. Format tool calls as JSON objects."""
The LocalLLMInterface class encapsulates all LLM-specific operations. The initialization process loads both the tokenizer and model, configuring them for efficient inference on available hardware. The system prompt establishes the behavioral framework for the model, ensuring it understands its role and constraints.
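Since the system prompt instructs the model to format tool calls as JSON objects, it helps to pin down what such a payload looks like. The "tool" and "parameters" field names below match what the controller's parser checks for later; the rest of the schema is an assumption for illustration.

```python
import json

# Illustrative payload only: "tool" and "parameters" are the keys the
# controller's parser looks for; everything else is assumed.
tool_call = {
    "tool": "scan_directory",
    "parameters": {"path": "~/Downloads", "recursive": True},
}
encoded = json.dumps(tool_call)  # what the model would emit
decoded = json.loads(encoded)    # what the controller reads back
```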
The Tool Framework
The tool framework provides a structured way for the LLM to interact with the filesystem. Each tool represents a specific capability, such as listing files, calculating checksums, or removing duplicates. The framework ensures that all operations are logged and reversible when possible.
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional
import hashlib
import os
class ToolStatus(Enum):
"""Enumeration of possible tool execution statuses"""
SUCCESS = "success"
FAILURE = "failure"
REQUIRES_PERMISSION = "requires_permission"
REQUIRES_CREDENTIALS = "requires_credentials"
@dataclass
class ToolResult:
"""Encapsulates the result of a tool execution"""
status: ToolStatus
data: Any
message: str
requires_action: Optional[Dict[str, Any]] = None
class BaseTool(ABC):
"""Abstract base class for all filesystem tools"""
def __init__(self, permission_manager):
"""Initialize tool with permission manager
Args:
permission_manager: Instance handling user permissions
"""
self.permission_manager = permission_manager
self.name = self.__class__.__name__
@abstractmethod
def execute(self, **kwargs) -> ToolResult:
"""Execute the tool operation with given parameters"""
pass
@abstractmethod
def get_description(self) -> str:
"""Return human-readable description of tool functionality"""
pass
The tool framework establishes a consistent interface for all filesystem operations. The ToolStatus enumeration clearly defines possible outcomes, while the ToolResult dataclass provides a structured way to return results along with any required follow-up actions.
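To make the contract concrete, here is a deliberately tiny tool (not from the application itself) built against a minimal restatement of the framework types, so the sketch runs on its own:

```python
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional

# Minimal restatement of the framework types so this sketch is
# self-contained; the real application would import them instead.
class ToolStatus(Enum):
    SUCCESS = "success"
    FAILURE = "failure"

@dataclass
class ToolResult:
    status: ToolStatus
    data: Any
    message: str
    requires_action: Optional[Dict[str, Any]] = None

class BaseTool(ABC):
    def __init__(self, permission_manager):
        self.permission_manager = permission_manager
        self.name = self.__class__.__name__

    @abstractmethod
    def execute(self, **kwargs) -> ToolResult: ...

    @abstractmethod
    def get_description(self) -> str: ...

# A toy example tool showing the contract: do the work,
# wrap the outcome in a ToolResult either way.
class CountEntriesTool(BaseTool):
    def execute(self, path: str = ".") -> ToolResult:
        try:
            count = len(os.listdir(path))
            return ToolResult(ToolStatus.SUCCESS, {"count": count},
                              f"{count} entries in {path}")
        except OSError as exc:
            return ToolResult(ToolStatus.FAILURE, None, str(exc))

    def get_description(self) -> str:
        return "Counts directory entries (illustration only)"

tool = CountEntriesTool(permission_manager=None)
result = tool.execute(path=".")
```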
Cross-Platform Filesystem Access
Achieving true cross-platform compatibility requires abstracting platform-specific filesystem operations behind a common interface. The system detects the current platform and loads appropriate implementations dynamically.
import platform
import sys
from pathlib import Path
class PlatformDetector:
"""Detects and provides platform-specific functionality"""
@staticmethod
def get_platform() -> str:
"""Detect the current operating system
Returns:
String identifier for the platform
"""
system = platform.system().lower()
# Check for mobile platforms through additional indicators
if hasattr(sys, 'getandroidapilevel'):
return 'android'
elif system == 'darwin' and platform.machine() == 'arm64':
# Check for iOS through platform characteristics
import subprocess
try:
result = subprocess.run(['uname', '-a'],
capture_output=True,
text=True)
if 'iPhone' in result.stdout or 'iPad' in result.stdout:
return 'ios'
except (OSError, subprocess.SubprocessError):
pass
return system
class FilesystemAbstraction:
"""Provides unified filesystem operations across platforms"""
def __init__(self):
"""Initialize platform-specific filesystem handler"""
self.platform = PlatformDetector.get_platform()
self._initialize_platform_specific()
def _initialize_platform_specific(self):
"""Load platform-specific implementations"""
if self.platform == 'windows':
self.path_separator = '\\'
self.home_dir = Path.home()
elif self.platform in ['darwin', 'linux']:
self.path_separator = '/'
self.home_dir = Path.home()
elif self.platform == 'android':
# Android requires special handling
self.path_separator = '/'
self.home_dir = Path('/storage/emulated/0')
elif self.platform == 'ios':
# iOS has restricted filesystem access
self.path_separator = '/'
self.home_dir = Path.home() / 'Documents'
The PlatformDetector class employs various heuristics to accurately identify the running platform. Mobile platforms require special detection logic since they may report as their underlying Unix-like systems. The FilesystemAbstraction class then configures platform-specific parameters based on this detection.
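A small design note in code form: pathlib already abstracts separator handling, so the explicit path_separator field above is mostly informational; os.sep reports the native separator when one is needed.

```python
import os
from pathlib import Path

# pathlib joins and normalizes paths portably, regardless of platform
p = Path("documents") / "photos" / "img.jpg"
native = os.sep            # '\\' on Windows, '/' everywhere else
posix_form = p.as_posix()  # always uses forward slashes
```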
Duplicate Detection Algorithm
The core functionality revolves around efficiently detecting duplicate files across the filesystem. The algorithm uses a multi-stage approach to minimize unnecessary computations while ensuring accuracy.
import hashlib
from collections import defaultdict
from typing import Dict, List, Optional
class DuplicateDetector:
"""Implements efficient duplicate file detection"""
def __init__(self, filesystem: FilesystemAbstraction):
"""Initialize detector with filesystem abstraction
Args:
filesystem: Platform-specific filesystem handler
"""
self.filesystem = filesystem
self.chunk_size = 8192 # Read files in 8KB chunks
def find_duplicates(self, root_path: Path,
progress_callback=None) -> Dict[str, List[Path]]:
"""Find all duplicate files under the given path
Args:
root_path: Starting directory for search
progress_callback: Optional callback for progress updates
Returns:
Dictionary mapping file hashes to lists of duplicate paths
"""
# First pass: Group files by size
size_map = defaultdict(list)
total_files = 0
for file_path in self._walk_directory(root_path):
try:
size = file_path.stat().st_size
size_map[size].append(file_path)
total_files += 1
if progress_callback and total_files % 100 == 0:
progress_callback(f"Scanned {total_files} files...")
except (OSError, PermissionError):
continue
# Second pass: Calculate hashes only for potential duplicates
hash_map = defaultdict(list)
processed = 0
for size, file_list in size_map.items():
if len(file_list) < 2:
continue # Skip unique file sizes
for file_path in file_list:
file_hash = self._calculate_hash(file_path)
if file_hash:
hash_map[file_hash].append(file_path)
processed += 1
if progress_callback:
progress_callback(
f"Processing potential duplicates: "
f"{processed}/{total_files}"
)
# Filter out non-duplicates
duplicates = {
hash_val: paths
for hash_val, paths in hash_map.items()
if len(paths) > 1
}
return duplicates
def _calculate_hash(self, file_path: Path) -> Optional[str]:
"""Calculate SHA-256 hash of a file
Args:
file_path: Path to the file
Returns:
Hexadecimal hash string or None on error
"""
hasher = hashlib.sha256()
try:
with open(file_path, 'rb') as f:
while chunk := f.read(self.chunk_size):
hasher.update(chunk)
return hasher.hexdigest()
except (OSError, PermissionError):
return None
The duplicate detection algorithm employs a two-phase approach to optimize performance. In the first phase, files are grouped by size, dramatically reducing the number of files that need full content comparison. Only files with identical sizes proceed to the second phase, where SHA-256 hashes are calculated to definitively identify duplicates.
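The two-phase logic can be exercised end to end on throwaway files. This condensed, standalone version mirrors the size-then-hash structure of the detector above:

```python
import hashlib
import tempfile
from collections import defaultdict
from pathlib import Path

def find_duplicates(root: Path) -> dict:
    # Phase 1: bucket by size; a file with a unique size has no duplicate
    size_map = defaultdict(list)
    for p in root.rglob("*"):
        if p.is_file():
            size_map[p.stat().st_size].append(p)
    # Phase 2: hash only files that share a size with another file
    hash_map = defaultdict(list)
    for paths in size_map.values():
        if len(paths) < 2:
            continue
        for p in paths:
            hash_map[hashlib.sha256(p.read_bytes()).hexdigest()].append(p)
    # Keep only groups with more than one member
    return {h: ps for h, ps in hash_map.items() if len(ps) > 1}

with tempfile.TemporaryDirectory() as d:
    root = Path(d)
    (root / "a.txt").write_bytes(b"same content")
    (root / "b.txt").write_bytes(b"same content")
    (root / "c.txt").write_bytes(b"different")
    dupes = find_duplicates(root)
```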
Permission Management System
User consent forms the cornerstone of the application's ethical operation. The permission management system ensures that no action is taken without explicit user approval, maintaining transparency throughout the process.
from datetime import datetime
from typing import Any, Callable, Dict, Optional
import json
class PermissionManager:
"""Manages user permissions for filesystem operations"""
def __init__(self, ui_callback: Callable):
"""Initialize permission manager with UI callback
Args:
ui_callback: Function to interact with user
"""
self.ui_callback = ui_callback
self.permission_cache = {}
self.audit_log = []
def request_permission(self, action: str,
details: Dict[str, Any]) -> bool:
"""Request user permission for a specific action
Args:
action: Description of the action
details: Additional context about the operation
Returns:
Boolean indicating whether permission was granted
"""
# Check cache for blanket permissions
cache_key = self._generate_cache_key(action, details)
if cache_key in self.permission_cache:
return self.permission_cache[cache_key]
# Format the permission request
request_message = self._format_permission_request(action, details)
# Get user response through UI callback
response = self.ui_callback({
'type': 'permission_request',
'message': request_message,
'options': ['Allow', 'Deny', 'Always Allow', 'Always Deny']
})
# Process response
granted = response in ['Allow', 'Always Allow']
# Cache if permanent permission given
if response in ['Always Allow', 'Always Deny']:
self.permission_cache[cache_key] = granted
# Log the decision
self._log_permission_decision(action, details, granted)
return granted
def _format_permission_request(self, action: str,
details: Dict[str, Any]) -> str:
"""Format a user-friendly permission request message"""
message = f"Permission requested for: {action}\n\n"
if 'files' in details:
message += f"Affected files: {len(details['files'])}\n"
# Show first few files as examples
for i, file in enumerate(details['files'][:3]):
message += f" - {file}\n"
if len(details['files']) > 3:
message += f" ... and {len(details['files']) - 3} more\n"
if 'size' in details:
message += f"Total size: {self._format_size(details['size'])}\n"
return message
The PermissionManager class implements a sophisticated consent system that balances security with usability. The caching mechanism allows users to grant blanket permissions for repeated operations while maintaining an audit trail of all decisions. The formatting method ensures that permission requests are clear and informative.
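The class above calls three helpers that do not appear in the excerpt: _generate_cache_key, _format_size, and _log_permission_decision. The following are plausible sketches (written as free functions for brevity), not the article's own code; the cache-key shape matches the action-plus-path scheme used later in the full source.

```python
from datetime import datetime
from typing import Any, Dict, List

def generate_cache_key(action: str, details: Dict[str, Any]) -> str:
    # Scope "Always Allow"/"Always Deny" to the action plus its path
    return f"{action}:{details.get('path', '')}"

def format_size(num_bytes: float) -> str:
    # Walk up the units until the value fits
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if num_bytes < 1024 or unit == "TB":
            return f"{num_bytes:.1f} {unit}"
        num_bytes /= 1024

def log_permission_decision(audit_log: List[Dict[str, Any]],
                            action: str,
                            details: Dict[str, Any],
                            granted: bool) -> None:
    # Append an immutable record of the user's decision
    audit_log.append({
        "timestamp": datetime.now().isoformat(),
        "action": action,
        "granted": granted,
    })
```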
Security and Credential Handling
When operations require elevated privileges, the system must securely handle credentials without compromising user security. The credential management system employs industry-standard practices for secure storage and transmission.
import base64
import getpass
import json
import os
import keyring
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from typing import Dict, Optional
class CredentialManager:
"""Securely manages credentials for elevated operations"""
def __init__(self, app_id: str):
"""Initialize credential manager with application ID
Args:
app_id: Unique identifier for the application
"""
self.app_id = app_id
self.session_key = self._generate_session_key()
self.cipher_suite = Fernet(self.session_key)
def _generate_session_key(self) -> bytes:
"""Generate a session-specific encryption key
Note: the salt and input material are random and never persisted,
so data encrypted under this key is only recoverable within the
current session.
"""
salt = os.urandom(16)
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
return base64.urlsafe_b64encode(kdf.derive(os.urandom(32)))
def request_credentials(self, purpose: str,
credential_type: str) -> Optional[Dict[str, str]]:
"""Request credentials from user for specific purpose
Args:
purpose: Description of why credentials are needed
credential_type: Type of credentials required
Returns:
Dictionary containing credentials or None if cancelled
"""
# Check keyring for stored credentials
stored = self._check_stored_credentials(credential_type)
if stored:
return stored
# Request from user
print(f"\nCredentials required for: {purpose}")
print(f"Credential type: {credential_type}")
credentials = {}
if credential_type == 'sudo':
credentials['password'] = getpass.getpass("Enter sudo password: ")
elif credential_type == 'windows_admin':
credentials['username'] = input("Enter administrator username: ")
credentials['password'] = getpass.getpass("Enter password: ")
# Offer to store securely
store = input("Store credentials securely for future use? (y/n): ")
if store.lower() == 'y':
self._store_credentials(credential_type, credentials)
return credentials
def _store_credentials(self, credential_type: str,
credentials: Dict[str, str]):
"""Securely store credentials using system keyring"""
try:
# Encrypt sensitive data before storage
encrypted = {}
for key, value in credentials.items():
if key == 'password':
encrypted[key] = self.cipher_suite.encrypt(
value.encode()
).decode()
else:
encrypted[key] = value
# Store in system keyring
keyring.set_password(
self.app_id,
credential_type,
json.dumps(encrypted)
)
except Exception as e:
print(f"Warning: Could not store credentials: {e}")
The CredentialManager applies defense-in-depth principles: passwords are encrypted before they reach the system keyring, and the class never logs or displays sensitive values. One caveat is worth noting: because the encryption key is regenerated from random material each session, a password stored in one session cannot be decrypted in a later one. A production implementation would persist the salt and derive the key from a user-supplied secret.
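The request_credentials method above calls _check_stored_credentials, which is not shown in the excerpt. A plausible sketch follows: it treats a missing keyring backend or any lookup failure as "nothing stored". keyring.get_password is the library's standard lookup call.

```python
import json
from typing import Dict, Optional

def check_stored_credentials(app_id: str,
                             credential_type: str) -> Optional[Dict[str, str]]:
    """Return previously stored credentials, or None if unavailable."""
    try:
        import keyring
        stored = keyring.get_password(app_id, credential_type)
        return json.loads(stored) if stored else None
    except Exception:
        # No backend, no entry, or a corrupt record: fall back to asking
        return None

result = check_stored_credentials("filesystem_optimizer", "sudo")
```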
User Interaction Flow
The user experience design prioritizes clarity and control. Each interaction follows a consistent pattern that keeps users informed about ongoing operations and upcoming actions.
class UserInterface:
"""Manages all user interactions for the filesystem optimizer"""
def __init__(self):
"""Initialize the user interface components"""
self.permission_manager = PermissionManager(self.display_prompt)
self.credential_manager = CredentialManager("filesystem_optimizer")
self.current_operation = None
def display_prompt(self, prompt_data: Dict[str, Any]) -> str:
"""Display a prompt to the user and get response
Args:
prompt_data: Dictionary containing prompt information
Returns:
User's response as a string
"""
prompt_type = prompt_data.get('type')
if prompt_type == 'permission_request':
return self._handle_permission_prompt(prompt_data)
elif prompt_type == 'credential_request':
return self._handle_credential_prompt(prompt_data)
elif prompt_type == 'confirmation':
return self._handle_confirmation_prompt(prompt_data)
else:
return input(prompt_data.get('message', 'Enter response: '))
def show_duplicate_summary(self, duplicates: Dict[str, List[Path]]):
"""Display a summary of found duplicates to the user"""
total_files = sum(len(paths) for paths in duplicates.values())
total_groups = len(duplicates)
# Calculate space savings
total_waste = 0
for file_hash, paths in duplicates.items():
if paths:
file_size = paths[0].stat().st_size
total_waste += file_size * (len(paths) - 1)
print(f"\n{'='*60}")
print(f"DUPLICATE FILE SUMMARY")
print(f"{'='*60}")
print(f"Total duplicate groups found: {total_groups}")
print(f"Total duplicate files: {total_files}")
print(f"Potential space savings: {self._format_bytes(total_waste)}")
print(f"{'='*60}\n")
# Show details for each group
for i, (file_hash, paths) in enumerate(duplicates.items(), 1):
if i > 10: # Limit display to first 10 groups
print(f"\n... and {len(duplicates) - 10} more groups")
break
print(f"\nGroup {i} ({len(paths)} files):")
file_size = paths[0].stat().st_size
print(f" Size: {self._format_bytes(file_size)}")
print(f" Files:")
for path in paths[:5]: # Show up to 5 files per group
print(f" - {path}")
if len(paths) > 5:
print(f" ... and {len(paths) - 5} more")
The UserInterface class orchestrates all user interactions, ensuring a consistent experience throughout the application. The duplicate summary method presents information in a hierarchical manner, starting with high-level statistics before drilling down into specific examples. This approach helps users understand the scope of the optimization opportunity without overwhelming them with details.
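Two methods used above, _handle_permission_prompt and _format_bytes, are not shown in the excerpt. Below are plausible console-oriented sketches (the numbered-menu behavior and the injectable read parameter are assumptions made for testability):

```python
from typing import Any, Callable, Dict

def handle_permission_prompt(prompt_data: Dict[str, Any],
                             read: Callable[[str], str] = input) -> str:
    """Present the options list and return the chosen label."""
    print(prompt_data["message"])
    options = prompt_data.get("options", ["Allow", "Deny"])
    for i, opt in enumerate(options, 1):
        print(f"  {i}. {opt}")
    while True:
        choice = read(f"Choose [1-{len(options)}]: ").strip()
        if choice.isdigit() and 1 <= int(choice) <= len(options):
            return options[int(choice) - 1]

def format_bytes(num_bytes: float) -> str:
    """Render a byte count with a human-readable unit."""
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if num_bytes < 1024 or unit == "TB":
            return f"{num_bytes:.1f} {unit}"
        num_bytes /= 1024

# Simulate a user picking option 1 ("Allow")
choice = handle_permission_prompt(
    {"message": "Delete 3 files?", "options": ["Allow", "Deny"]},
    read=lambda _: "1",
)
```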
Implementation of Core Tools
With the foundation established, we can implement the specific tools that perform filesystem operations. Each tool inherits from the BaseTool class and implements its specific functionality.
class ScanDirectoryTool(BaseTool):
"""Tool for scanning directories to find files"""
def __init__(self, permission_manager, filesystem):
"""Initialize with required components
Args:
permission_manager: Permission management instance
filesystem: Platform-specific filesystem handler
"""
super().__init__(permission_manager)
self.filesystem = filesystem
def execute(self, path: str, recursive: bool = True) -> ToolResult:
"""Scan directory for files
Args:
path: Directory path to scan
recursive: Whether to scan subdirectories
Returns:
ToolResult containing scan results
"""
target_path = Path(path).expanduser().resolve()
# Check if path exists and is accessible
if not target_path.exists():
return ToolResult(
status=ToolStatus.FAILURE,
data=None,
message=f"Path does not exist: {target_path}"
)
# Request permission to scan
permission_granted = self.permission_manager.request_permission(
"Scan directory for files",
{'path': str(target_path), 'recursive': recursive}
)
if not permission_granted:
return ToolResult(
status=ToolStatus.FAILURE,
data=None,
message="Permission denied by user"
)
# Perform the scan
try:
files = []
if recursive:
for file_path in target_path.rglob('*'):
if file_path.is_file():
files.append({
'path': str(file_path),
'size': file_path.stat().st_size,
'modified': file_path.stat().st_mtime
})
else:
for file_path in target_path.iterdir():
if file_path.is_file():
files.append({
'path': str(file_path),
'size': file_path.stat().st_size,
'modified': file_path.stat().st_mtime
})
return ToolResult(
status=ToolStatus.SUCCESS,
data={'files': files, 'count': len(files)},
message=f"Successfully scanned {len(files)} files"
)
except PermissionError:
return ToolResult(
status=ToolStatus.REQUIRES_CREDENTIALS,
data=None,
message="Elevated permissions required",
requires_action={'type': 'credentials', 'purpose': 'scan'}
)
def get_description(self) -> str:
"""Return tool description"""
return "Scans directories to find and catalog files"
class RemoveDuplicatesTool(BaseTool):
"""Tool for removing duplicate files"""
def __init__(self, permission_manager, filesystem):
"""Initialize with required components"""
super().__init__(permission_manager)
self.filesystem = filesystem
self.removed_files = []
def execute(self, duplicates: Dict[str, List[Path]],
strategy: str = 'keep_oldest') -> ToolResult:
"""Remove duplicate files based on specified strategy
Args:
duplicates: Dictionary mapping hashes to duplicate paths
strategy: Strategy for choosing which file to keep
Returns:
ToolResult containing removal results
"""
# Validate strategy
valid_strategies = ['keep_oldest', 'keep_newest', 'keep_shortest_path']
if strategy not in valid_strategies:
return ToolResult(
status=ToolStatus.FAILURE,
data=None,
message=f"Invalid strategy. Choose from: {valid_strategies}"
)
# Calculate which files to remove
files_to_remove = []
for file_hash, paths in duplicates.items():
if len(paths) < 2:
continue
# Sort paths based on strategy
if strategy == 'keep_oldest':
sorted_paths = sorted(paths, key=lambda p: p.stat().st_mtime)
elif strategy == 'keep_newest':
sorted_paths = sorted(paths, key=lambda p: p.stat().st_mtime,
reverse=True)
else: # keep_shortest_path
sorted_paths = sorted(paths, key=lambda p: len(str(p)))
# Keep first, remove rest
files_to_remove.extend(sorted_paths[1:])
# Request permission with detailed information
total_size = sum(p.stat().st_size for p in files_to_remove)
permission_granted = self.permission_manager.request_permission(
f"Remove {len(files_to_remove)} duplicate files",
{
'files': [str(p) for p in files_to_remove],
'size': total_size,
'strategy': strategy
}
)
if not permission_granted:
return ToolResult(
status=ToolStatus.FAILURE,
data=None,
message="Permission denied by user"
)
# Perform removal
removed_count = 0
failed_removals = []
for file_path in files_to_remove:
try:
# Create backup record before removal
self._create_removal_record(file_path)
# Remove the file
file_path.unlink()
removed_count += 1
self.removed_files.append(str(file_path))
except PermissionError:
failed_removals.append({
'path': str(file_path),
'reason': 'permission_denied'
})
except Exception as e:
failed_removals.append({
'path': str(file_path),
'reason': str(e)
})
# Prepare result
if failed_removals:
status = ToolStatus.REQUIRES_CREDENTIALS if any(
f['reason'] == 'permission_denied' for f in failed_removals
) else ToolStatus.FAILURE
else:
status = ToolStatus.SUCCESS
return ToolResult(
status=status,
data={
'removed': removed_count,
'failed': failed_removals,
'space_freed': total_size
},
message=f"Removed {removed_count} files, freed {total_size} bytes"
)
def _create_removal_record(self, file_path: Path):
"""Create a record of file removal for potential recovery"""
record = {
'path': str(file_path),
'size': file_path.stat().st_size,
'hash': self._quick_hash(file_path),
'removed_at': datetime.now().isoformat()
}
# Store record in application data directory
app_data_dir = Path.home() / '.filesystem_optimizer' / 'removed'
app_data_dir.mkdir(parents=True, exist_ok=True)
record_file = app_data_dir / f"{datetime.now().timestamp()}.json"
with open(record_file, 'w') as f:
json.dump(record, f, indent=2)
The tool implementations demonstrate the careful balance between functionality and safety. Each tool validates inputs, requests appropriate permissions, and handles errors gracefully. The RemoveDuplicatesTool even creates removal records to enable potential recovery of accidentally deleted files.
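One gap in the excerpt: _create_removal_record calls _quick_hash, which is never defined. A plausible sketch is to fingerprint only the first 64 KB so that writing a removal record stays cheap even for very large files (the sample size is an assumption):

```python
import hashlib
import tempfile
from pathlib import Path

def quick_hash(file_path: Path, sample_size: int = 65536) -> str:
    """Hash only the leading bytes of a file as a cheap fingerprint."""
    hasher = hashlib.sha256()
    with open(file_path, "rb") as f:
        hasher.update(f.read(sample_size))
    return hasher.hexdigest()

# Exercise it on a scratch file
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"hello")
    name = tmp.name
digest = quick_hash(Path(name))
Path(name).unlink()  # clean up the scratch file
```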
LLM Controller Implementation
The LLM controller orchestrates the entire system, interpreting user requests and coordinating tool execution. This component bridges the gap between natural language understanding and concrete system actions.
class LLMController:
"""Controls the LLM and coordinates tool execution"""
def __init__(self, model_path: str):
"""Initialize controller with model path
Args:
model_path: Path to the local LLM model
"""
self.llm = LocalLLMInterface(model_path)
self.ui = UserInterface()
self.filesystem = FilesystemAbstraction()
# Initialize tools
self.tools = {
'scan_directory': ScanDirectoryTool(
self.ui.permission_manager,
self.filesystem
),
'find_duplicates': DuplicateDetector(self.filesystem),
'remove_duplicates': RemoveDuplicatesTool(
self.ui.permission_manager,
self.filesystem
)
}
self.conversation_history = []
def process_request(self, user_input: str) -> str:
"""Process a user request and return response
Args:
user_input: Natural language request from user
Returns:
Natural language response
"""
# Add to conversation history
self.conversation_history.append({
'role': 'user',
'content': user_input
})
# Generate LLM response with tool calls
llm_response = self._generate_llm_response(user_input)
# Parse and execute any tool calls
if 'tool_calls' in llm_response:
tool_results = self._execute_tool_calls(llm_response['tool_calls'])
# Generate final response based on tool results
final_response = self._generate_final_response(
user_input,
tool_results
)
else:
final_response = llm_response['content']
# Add to conversation history
self.conversation_history.append({
'role': 'assistant',
'content': final_response
})
return final_response
def _generate_llm_response(self, user_input: str) -> Dict[str, Any]:
"""Generate LLM response potentially including tool calls"""
# Prepare the prompt with conversation history
messages = [
{'role': 'system', 'content': self.llm.system_prompt}
] + self.conversation_history[-10:] # Keep last 10 messages
# Add tool descriptions
tool_descriptions = self._get_tool_descriptions()
messages.append({
'role': 'system',
'content': f"Available tools:\n{tool_descriptions}"
})
# Generate response
response = self.llm.generate(messages)
# Parse response for tool calls
parsed = self._parse_llm_response(response)
return parsed
def _get_tool_descriptions(self) -> str:
"""Get formatted descriptions of all available tools"""
descriptions = []
for name, tool in self.tools.items():
if hasattr(tool, 'get_description'):
descriptions.append(f"{name}: {tool.get_description()}")
return "\n".join(descriptions)
def _parse_llm_response(self, response: str) -> Dict[str, Any]:
"""Parse LLM response to extract tool calls and content"""
result = {'content': response, 'tool_calls': []}
# Look for JSON tool call blocks
import re
tool_pattern = r'```json\n(.*?)\n```'
matches = re.findall(tool_pattern, response, re.DOTALL)
for match in matches:
try:
tool_call = json.loads(match)
if 'tool' in tool_call and 'parameters' in tool_call:
result['tool_calls'].append(tool_call)
except json.JSONDecodeError:
continue
# Remove tool calls from content
result['content'] = re.sub(tool_pattern, '', response).strip()
return result
def _execute_tool_calls(self, tool_calls: List[Dict[str, Any]]) -> List[ToolResult]:
"""Execute a list of tool calls and return results"""
results = []
for call in tool_calls:
tool_name = call.get('tool')
parameters = call.get('parameters', {})
if tool_name in self.tools:
tool = self.tools[tool_name]
# Special handling for find_duplicates
if tool_name == 'find_duplicates':
result = self._handle_duplicate_finding(parameters)
else:
result = tool.execute(**parameters)
results.append(result)
# Handle special cases
if result.status == ToolStatus.REQUIRES_CREDENTIALS:
credentials = self.ui.credential_manager.request_credentials(
result.requires_action['purpose'],
result.requires_action['type']
)
if credentials:
# Retry with elevated permissions
# Implementation depends on platform
pass
else:
results.append(ToolResult(
status=ToolStatus.FAILURE,
data=None,
message=f"Unknown tool: {tool_name}"
))
return results
def _handle_duplicate_finding(self, parameters: Dict[str, Any]) -> ToolResult:
"""Special handling for duplicate finding operation"""
path = parameters.get('path', '.')
target_path = Path(path).expanduser().resolve()
# Request permission
permission_granted = self.ui.permission_manager.request_permission(
"Scan for duplicate files",
{'path': str(target_path)}
)
if not permission_granted:
return ToolResult(
status=ToolStatus.FAILURE,
data=None,
message="Permission denied by user"
)
# Find duplicates with progress callback
print("\nScanning for duplicate files...")
detector = self.tools['find_duplicates']
duplicates = detector.find_duplicates(
target_path,
progress_callback=lambda msg: print(f"\r{msg}", end="", flush=True)
)
print() # New line after progress
# Show summary to user
if duplicates:
self.ui.show_duplicate_summary(duplicates)
return ToolResult(
status=ToolStatus.SUCCESS,
data={'duplicates': duplicates},
message=f"Found {len(duplicates)} groups of duplicate files"
)
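The tool-call extraction convention used by _parse_llm_response can be checked end to end: a fenced json block inside the model's reply is parsed out, and the surrounding prose is kept as content. (The fence markers are built programmatically here only to keep the example readable.)

```python
import json
import re

FENCE = "`" * 3  # triple backtick
reply = (
    "I'll scan that folder for you.\n"
    f"{FENCE}json\n"
    '{"tool": "scan_directory", "parameters": {"path": "~/Downloads"}}\n'
    f"{FENCE}"
)
# Same pattern shape as _parse_llm_response uses
pattern = FENCE + r"json\n(.*?)\n" + FENCE
calls = [json.loads(m) for m in re.findall(pattern, reply, re.DOTALL)]
content = re.sub(pattern, "", reply).strip()
```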
Full Source Code
#!/usr/bin/env python3
"""
Filesystem Optimizer - LLM-powered duplicate file finder and remover
This application uses a local LLM to intelligently find and remove duplicate
files across different platforms while maintaining user control through
permission requests.
"""
import os
import sys
import json
import hashlib
import platform
import getpass
from pathlib import Path
from datetime import datetime
from collections import defaultdict
from typing import Dict, List, Any, Optional, Tuple, Callable
from dataclasses import dataclass
from enum import Enum
from abc import ABC, abstractmethod
# For LLM integration
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
# For secure credential storage
try:
import keyring
KEYRING_AVAILABLE = True
except ImportError:
KEYRING_AVAILABLE = False
print("Warning: keyring module not available. Credentials won't be stored.")
# Tool Framework
class ToolStatus(Enum):
"""Enumeration of possible tool execution statuses"""
SUCCESS = "success"
FAILURE = "failure"
REQUIRES_PERMISSION = "requires_permission"
REQUIRES_CREDENTIALS = "requires_credentials"
@dataclass
class ToolResult:
"""Encapsulates the result of a tool execution"""
status: ToolStatus
data: Any
message: str
requires_action: Optional[Dict[str, Any]] = None
class BaseTool(ABC):
"""Abstract base class for all filesystem tools"""
def __init__(self, permission_manager):
"""Initialize tool with permission manager"""
self.permission_manager = permission_manager
self.name = self.__class__.__name__
@abstractmethod
def execute(self, **kwargs) -> ToolResult:
"""Execute the tool operation with given parameters"""
pass
@abstractmethod
def get_description(self) -> str:
"""Return human-readable description of tool functionality"""
pass
# Platform Detection and Abstraction
class PlatformDetector:
"""Detects and provides platform-specific functionality"""
@staticmethod
def get_platform() -> str:
"""Detect the current operating system"""
system = platform.system().lower()
# Check for mobile platforms
if hasattr(sys, 'getandroidapilevel'):
return 'android'
elif system == 'darwin' and platform.machine() == 'arm64':
# Additional iOS detection would go here
pass
return system
class FilesystemAbstraction:
"""Provides unified filesystem operations across platforms"""
def __init__(self):
"""Initialize platform-specific filesystem handler"""
self.platform = PlatformDetector.get_platform()
self._initialize_platform_specific()
def _initialize_platform_specific(self):
"""Load platform-specific implementations"""
if self.platform == 'windows':
self.path_separator = '\\'
self.home_dir = Path.home()
elif self.platform in ['darwin', 'linux']:
self.path_separator = '/'
self.home_dir = Path.home()
elif self.platform == 'android':
self.path_separator = '/'
self.home_dir = Path('/storage/emulated/0')
# Permission Management
class PermissionManager:
"""Manages user permissions for filesystem operations"""
def __init__(self, ui_callback: Callable):
"""Initialize permission manager with UI callback"""
self.ui_callback = ui_callback
self.permission_cache = {}
self.audit_log = []
def request_permission(self, action: str, details: Dict[str, Any]) -> bool:
"""Request user permission for a specific action"""
# Check cache
cache_key = f"{action}:{details.get('path', '')}"
if cache_key in self.permission_cache:
return self.permission_cache[cache_key]
# Format request
request_message = self._format_permission_request(action, details)
# Get user response
response = self.ui_callback({
'type': 'permission_request',
'message': request_message,
'options': ['Allow', 'Deny', 'Always Allow', 'Always Deny']
})
# Process response
granted = response in ['Allow', 'Always Allow']
# Cache if permanent
if response in ['Always Allow', 'Always Deny']:
self.permission_cache[cache_key] = granted
# Log decision
self.audit_log.append({
'timestamp': datetime.now().isoformat(),
'action': action,
'granted': granted
})
return granted
def _format_permission_request(self, action: str,
details: Dict[str, Any]) -> str:
"""Format a user-friendly permission request message"""
message = f"Permission requested for: {action}\n\n"
if 'files' in details:
message += f"Affected files: {len(details['files'])}\n"
for i, file in enumerate(details['files'][:3]):
message += f" - {file}\n"
if len(details['files']) > 3:
message += f" ... and {len(details['files']) - 3} more\n"
if 'size' in details:
size_mb = details['size'] / (1024 * 1024)
message += f"Total size: {size_mb:.2f} MB\n"
return message
# Credential Management
class CredentialManager:
"""Securely manages credentials for elevated operations"""
def __init__(self, app_id: str):
"""Initialize credential manager with application ID"""
self.app_id = app_id
def request_credentials(self, purpose: str,
credential_type: str) -> Optional[Dict[str, str]]:
"""Request credentials from user for specific purpose"""
# Check stored credentials if keyring available
if KEYRING_AVAILABLE:
try:
stored = keyring.get_password(self.app_id, credential_type)
if stored:
return json.loads(stored)
except Exception:
pass
# Request from user
print(f"\nCredentials required for: {purpose}")
print(f"Credential type: {credential_type}")
credentials = {}
if credential_type == 'sudo':
credentials['password'] = getpass.getpass("Enter sudo password: ")
elif credential_type == 'windows_admin':
credentials['username'] = input("Enter administrator username: ")
credentials['password'] = getpass.getpass("Enter password: ")
# Offer to store
if KEYRING_AVAILABLE:
store = input("Store credentials securely for future use? (y/n): ")
if store.lower() == 'y':
try:
keyring.set_password(
self.app_id,
credential_type,
json.dumps(credentials)
)
except Exception as e:
print(f"Warning: Could not store credentials: {e}")
return credentials
# Duplicate Detection
class DuplicateDetector:
"""Implements efficient duplicate file detection"""
def __init__(self, filesystem: FilesystemAbstraction):
"""Initialize detector with filesystem abstraction"""
self.filesystem = filesystem
self.chunk_size = 8192
def find_duplicates(self, root_path: Path,
progress_callback=None) -> Dict[str, List[Path]]:
"""Find all duplicate files under the given path"""
# First pass: Group by size
size_map = defaultdict(list)
total_files = 0
for file_path in self._walk_directory(root_path):
try:
size = file_path.stat().st_size
size_map[size].append(file_path)
total_files += 1
if progress_callback and total_files % 100 == 0:
progress_callback(f"Scanned {total_files} files...")
except (OSError, PermissionError):
continue
# Second pass: hash only files whose size matches at least one other file
hash_map = defaultdict(list)
candidate_count = sum(len(fl) for fl in size_map.values() if len(fl) > 1)
processed = 0
for size, file_list in size_map.items():
if len(file_list) < 2:
continue
for file_path in file_list:
file_hash = self._calculate_hash(file_path)
if file_hash:
hash_map[file_hash].append(file_path)
processed += 1
if progress_callback:
progress_callback(
f"Hashing potential duplicates: {processed}/{candidate_count}"
)
# Filter out non-duplicates
duplicates = {
hash_val: paths
for hash_val, paths in hash_map.items()
if len(paths) > 1
}
return duplicates
def _walk_directory(self, root_path: Path):
"""Walk directory tree yielding file paths"""
try:
for item in root_path.rglob('*'):
if item.is_file():
yield item
except PermissionError:
pass
def _calculate_hash(self, file_path: Path) -> Optional[str]:
"""Calculate SHA-256 hash of a file"""
hasher = hashlib.sha256()
try:
with open(file_path, 'rb') as f:
while chunk := f.read(self.chunk_size):
hasher.update(chunk)
return hasher.hexdigest()
except (OSError, PermissionError):
return None
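For reference, the two-pass approach above (group by size first, hash only same-size candidates) can be condensed into one standalone function. This sketch keeps the same chunked SHA-256 hashing but drops the class machinery and progress reporting:

```python
import hashlib
from collections import defaultdict
from pathlib import Path

def find_duplicates(root: Path, chunk_size: int = 8192):
    """Condensed two-pass duplicate finder: size groups first, hashes second."""
    by_size = defaultdict(list)
    for p in root.rglob('*'):
        try:
            if p.is_file():
                by_size[p.stat().st_size].append(p)
        except OSError:
            continue
    by_hash = defaultdict(list)
    for paths in by_size.values():
        if len(paths) < 2:          # a unique size cannot have a duplicate
            continue
        for p in paths:
            h = hashlib.sha256()
            try:
                with open(p, 'rb') as f:
                    while chunk := f.read(chunk_size):
                        h.update(chunk)
            except OSError:
                continue
            by_hash[h.hexdigest()].append(p)
    return {k: v for k, v in by_hash.items() if len(v) > 1}
```

The size pre-filter is what makes the scan cheap: files with a unique size are never opened, so the expensive hashing pass touches only genuine candidates.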
# User Interface
class UserInterface:
"""Manages all user interactions for the filesystem optimizer"""
def __init__(self):
"""Initialize the user interface components"""
self.permission_manager = PermissionManager(self.display_prompt)
self.credential_manager = CredentialManager("filesystem_optimizer")
def display_prompt(self, prompt_data: Dict[str, Any]) -> str:
"""Display a prompt to the user and get response"""
prompt_type = prompt_data.get('type')
if prompt_type == 'permission_request':
print("\n" + "="*60)
print("PERMISSION REQUEST")
print("="*60)
print(prompt_data['message'])
print("\nOptions:", ", ".join(prompt_data['options']))
while True:
response = input("Your choice: ").strip()
if response in prompt_data['options']:
return response
print("Invalid choice. Please try again.")
return input(prompt_data.get('message', 'Enter response: '))
def show_duplicate_summary(self, duplicates: Dict[str, List[Path]]):
"""Display a summary of found duplicates to the user"""
total_files = sum(len(paths) for paths in duplicates.values())
total_groups = len(duplicates)
# Calculate space savings
total_waste = 0
for file_hash, paths in duplicates.items():
if paths:
file_size = paths[0].stat().st_size
total_waste += file_size * (len(paths) - 1)
print(f"\n{'='*60}")
print(f"DUPLICATE FILE SUMMARY")
print(f"{'='*60}")
print(f"Total duplicate groups found: {total_groups}")
print(f"Total duplicate files: {total_files}")
print(f"Potential space savings: {self._format_bytes(total_waste)}")
print(f"{'='*60}\n")
# Show details for each group
for i, (file_hash, paths) in enumerate(duplicates.items(), 1):
if i > 10:
print(f"\n... and {len(duplicates) - 10} more groups")
break
print(f"\nGroup {i} ({len(paths)} files):")
file_size = paths[0].stat().st_size
print(f" Size: {self._format_bytes(file_size)}")
print(f" Files:")
for path in paths[:5]:
print(f" - {path}")
if len(paths) > 5:
print(f" ... and {len(paths) - 5} more")
def _format_bytes(self, size: int) -> str:
"""Format byte size in human-readable form"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size < 1024.0:
return f"{size:.2f} {unit}"
size /= 1024.0
return f"{size:.2f} PB"
# Tool Implementations
class ScanDirectoryTool(BaseTool):
"""Tool for scanning directories to find files"""
def __init__(self, permission_manager, filesystem):
"""Initialize with required components"""
super().__init__(permission_manager)
self.filesystem = filesystem
def execute(self, path: str, recursive: bool = True) -> ToolResult:
"""Scan directory for files"""
target_path = Path(path).expanduser().resolve()
if not target_path.exists():
return ToolResult(
status=ToolStatus.FAILURE,
data=None,
message=f"Path does not exist: {target_path}"
)
# Request permission
permission_granted = self.permission_manager.request_permission(
"Scan directory for files",
{'path': str(target_path), 'recursive': recursive}
)
if not permission_granted:
return ToolResult(
status=ToolStatus.FAILURE,
data=None,
message="Permission denied by user"
)
# Perform scan
try:
files = []
if recursive:
for file_path in target_path.rglob('*'):
if file_path.is_file():
files.append({
'path': str(file_path),
'size': file_path.stat().st_size,
'modified': file_path.stat().st_mtime
})
else:
for file_path in target_path.iterdir():
if file_path.is_file():
files.append({
'path': str(file_path),
'size': file_path.stat().st_size,
'modified': file_path.stat().st_mtime
})
return ToolResult(
status=ToolStatus.SUCCESS,
data={'files': files, 'count': len(files)},
message=f"Successfully scanned {len(files)} files"
)
except PermissionError:
return ToolResult(
status=ToolStatus.REQUIRES_CREDENTIALS,
data=None,
message="Elevated permissions required",
requires_action={'type': 'credentials', 'purpose': 'scan'}
)
def get_description(self) -> str:
"""Return tool description"""
return "Scans directories to find and catalog files"
class RemoveDuplicatesTool(BaseTool):
"""Tool for removing duplicate files"""
def __init__(self, permission_manager, filesystem):
"""Initialize with required components"""
super().__init__(permission_manager)
self.filesystem = filesystem
self.removed_files = []
def execute(self, duplicates: Dict[str, List[Path]],
strategy: str = 'keep_oldest') -> ToolResult:
"""Remove duplicate files based on specified strategy"""
valid_strategies = ['keep_oldest', 'keep_newest', 'keep_shortest_path']
if strategy not in valid_strategies:
return ToolResult(
status=ToolStatus.FAILURE,
data=None,
message=f"Invalid strategy. Choose from: {valid_strategies}"
)
# Calculate which files to remove
files_to_remove = []
for file_hash, paths in duplicates.items():
if len(paths) < 2:
continue
# Sort paths based on strategy
if strategy == 'keep_oldest':
sorted_paths = sorted(paths, key=lambda p: p.stat().st_mtime)
elif strategy == 'keep_newest':
sorted_paths = sorted(paths, key=lambda p: p.stat().st_mtime,
reverse=True)
else: # keep_shortest_path
sorted_paths = sorted(paths, key=lambda p: len(str(p)))
# Keep first, remove rest
files_to_remove.extend(sorted_paths[1:])
# Request permission
total_size = sum(p.stat().st_size for p in files_to_remove)
permission_granted = self.permission_manager.request_permission(
f"Remove {len(files_to_remove)} duplicate files",
{
'files': [str(p) for p in files_to_remove],
'size': total_size,
'strategy': strategy
}
)
if not permission_granted:
return ToolResult(
status=ToolStatus.FAILURE,
data=None,
message="Permission denied by user"
)
# Perform removal
removed_count = 0
failed_removals = []
for file_path in files_to_remove:
try:
# Create removal record
self._create_removal_record(file_path)
# Remove file
file_path.unlink()
removed_count += 1
self.removed_files.append(str(file_path))
except PermissionError:
failed_removals.append({
'path': str(file_path),
'reason': 'permission_denied'
})
except Exception as e:
failed_removals.append({
'path': str(file_path),
'reason': str(e)
})
# Prepare result
if failed_removals:
status = ToolStatus.REQUIRES_CREDENTIALS if any(
f['reason'] == 'permission_denied' for f in failed_removals
) else ToolStatus.FAILURE
else:
status = ToolStatus.SUCCESS
return ToolResult(
status=status,
data={
'removed': removed_count,
'failed': failed_removals,
'space_freed': total_size
},
message=f"Removed {removed_count} files, freed {total_size} bytes"
)
def _create_removal_record(self, file_path: Path):
"""Create a record of file removal for potential recovery"""
record = {
'path': str(file_path),
'size': file_path.stat().st_size,
'removed_at': datetime.now().isoformat()
}
# Store record
app_data_dir = Path.home() / '.filesystem_optimizer' / 'removed'
app_data_dir.mkdir(parents=True, exist_ok=True)
record_file = app_data_dir / f"{datetime.now().timestamp()}.json"
with open(record_file, 'w') as f:
json.dump(record, f, indent=2)
def get_description(self) -> str:
"""Return tool description"""
return "Removes duplicate files based on specified strategy"
# Local LLM Interface
class LocalLLMInterface:
"""Manages interaction with the local language model"""
def __init__(self, model_path: str):
"""Initialize the LLM with specified model path"""
self.model_path = model_path
self.model = None
self.tokenizer = None
self.system_prompt = self._load_system_prompt()
# For demo purposes, we'll simulate LLM responses
# In production, this would load actual model
self.demo_mode = True
def _load_system_prompt(self) -> str:
"""Load the system prompt that defines tool usage"""
return """You are a filesystem optimization assistant. You help users
identify and remove duplicate files. Always ask for permission before
performing any operation. Use the provided tools to interact with the
filesystem. Format tool calls as JSON objects wrapped in ```json blocks."""
def generate(self, messages: List[Dict[str, str]]) -> str:
"""Generate response from the model"""
if self.demo_mode:
# Simulate LLM responses based on user input
user_message = messages[-1]['content'].lower()
if 'find duplicate' in user_message or 'scan' in user_message:
path = self._extract_path(user_message)
return f"""I'll help you find duplicate files in {path}.
```json
{{
"tool": "find_duplicates",
"parameters": {{
"path": "{path}"
}}
}}
```
Let me scan the directory for duplicate files."""
elif 'remove' in user_message and 'duplicate' in user_message:
return """I'll help you remove the duplicate files. First, let me scan for duplicates.
```json
{
"tool": "find_duplicates",
"parameters": {
"path": "."
}
}
```
After finding duplicates, I'll help you remove them safely."""
else:
return """I can help you find and remove duplicate files on your system.
You can ask me to:
- Scan specific directories for duplicates
- Remove duplicate files while keeping one copy
- Check your entire home directory
What would you like me to do?"""
# In production, this would use the actual model
# return self._generate_with_model(messages)
def _extract_path(self, message: str) -> str:
"""Extract path from user message"""
# Simple extraction logic for demo
if 'documents' in message.lower():
return "~/Documents"
elif 'downloads' in message.lower():
return "~/Downloads"
elif 'home' in message.lower():
return "~"
else:
return "."
# LLM Controller
class LLMController:
"""Controls the LLM and coordinates tool execution"""
def __init__(self, model_path: str):
"""Initialize controller with model path"""
self.llm = LocalLLMInterface(model_path)
self.ui = UserInterface()
self.filesystem = FilesystemAbstraction()
# Initialize tools
self.tools = {
'scan_directory': ScanDirectoryTool(
self.ui.permission_manager,
self.filesystem
),
'find_duplicates': DuplicateDetector(self.filesystem),  # not a BaseTool; dispatched via _handle_duplicate_finding
'remove_duplicates': RemoveDuplicatesTool(
self.ui.permission_manager,
self.filesystem
)
}
self.conversation_history = []
self.current_duplicates = {}
def process_request(self, user_input: str) -> str:
"""Process a user request and return response"""
# Add to conversation history
self.conversation_history.append({
'role': 'user',
'content': user_input
})
# Generate LLM response with tool calls
llm_response = self._generate_llm_response(user_input)
# Parse and execute any tool calls
parsed = self._parse_llm_response(llm_response)
if parsed['tool_calls']:
tool_results = self._execute_tool_calls(parsed['tool_calls'])
# Generate final response based on tool results
final_response = self._generate_final_response(
user_input,
tool_results,
parsed['content']
)
else:
final_response = parsed['content']
# Add to conversation history
self.conversation_history.append({
'role': 'assistant',
'content': final_response
})
return final_response
def _generate_llm_response(self, user_input: str) -> str:
"""Generate LLM response potentially including tool calls"""
messages = [
{'role': 'system', 'content': self.llm.system_prompt}
] + self.conversation_history[-10:]
# Add tool descriptions
tool_descriptions = self._get_tool_descriptions()
messages.append({
'role': 'system',
'content': f"Available tools:\n{tool_descriptions}"
})
# Generate response
return self.llm.generate(messages)
def _get_tool_descriptions(self) -> str:
"""Get formatted descriptions of all available tools"""
descriptions = []
for name, tool in self.tools.items():
if hasattr(tool, 'get_description'):
descriptions.append(f"{name}: {tool.get_description()}")
return "\n".join(descriptions)
def _parse_llm_response(self, response: str) -> Dict[str, Any]:
"""Parse LLM response to extract tool calls and content"""
result = {'content': response, 'tool_calls': []}
# Look for JSON tool call blocks
import re
tool_pattern = r'```json\n(.*?)\n```'
matches = re.findall(tool_pattern, response, re.DOTALL)
for match in matches:
try:
tool_call = json.loads(match)
if 'tool' in tool_call and 'parameters' in tool_call:
result['tool_calls'].append(tool_call)
except json.JSONDecodeError:
continue
# Remove tool calls from content
result['content'] = re.sub(tool_pattern, '', response, flags=re.DOTALL).strip()
return result
def _execute_tool_calls(self, tool_calls: List[Dict[str, Any]]) -> List[ToolResult]:
"""Execute a list of tool calls and return results"""
results = []
for call in tool_calls:
tool_name = call.get('tool')
parameters = call.get('parameters', {})
if tool_name in self.tools:
tool = self.tools[tool_name]
# Special handling for find_duplicates
if tool_name == 'find_duplicates':
result = self._handle_duplicate_finding(parameters)
else:
result = tool.execute(**parameters)
results.append(result)
# Handle special cases
if result.status == ToolStatus.REQUIRES_CREDENTIALS:
credentials = self.ui.credential_manager.request_credentials(
result.requires_action['purpose'],
result.requires_action['type']
)
if credentials:
# Retry with elevated permissions would go here
pass
else:
results.append(ToolResult(
status=ToolStatus.FAILURE,
data=None,
message=f"Unknown tool: {tool_name}"
))
return results
def _handle_duplicate_finding(self, parameters: Dict[str, Any]) -> ToolResult:
"""Special handling for duplicate finding operation"""
path = parameters.get('path', '.')
target_path = Path(path).expanduser().resolve()
# Request permission
permission_granted = self.ui.permission_manager.request_permission(
"Scan for duplicate files",
{'path': str(target_path)}
)
if not permission_granted:
return ToolResult(
status=ToolStatus.FAILURE,
data=None,
message="Permission denied by user"
)
# Find duplicates with progress
print("\nScanning for duplicate files...")
detector = self.tools['find_duplicates']
duplicates = detector.find_duplicates(
target_path,
progress_callback=lambda msg: print(f"\r{msg}", end="", flush=True)
)
print() # New line after progress
# Show summary to user
if duplicates:
self.ui.show_duplicate_summary(duplicates)
# Store (or clear) duplicates for potential removal
self.current_duplicates = duplicates
return ToolResult(
status=ToolStatus.SUCCESS,
data={'duplicates': duplicates},
message=f"Found {len(duplicates)} groups of duplicate files"
)
def _generate_final_response(self, user_input: str,
tool_results: List[ToolResult],
initial_response: str) -> str:
"""Generate final response based on tool results"""
response_parts = []
if initial_response:
response_parts.append(initial_response)
for result in tool_results:
if result.status == ToolStatus.SUCCESS:
if 'duplicates' in result.data:
duplicates = result.data['duplicates']
if duplicates:
response_parts.append(
f"\nI found {len(duplicates)} groups of duplicate files. "
f"Would you like me to remove the duplicates? "
f"I can keep the oldest, newest, or shortest path version of each file."
)
else:
response_parts.append(
"\nNo duplicate files were found in the specified location."
)
elif 'removed' in result.data:
response_parts.append(
f"\nSuccessfully removed {result.data['removed']} duplicate files, "
f"freeing up {self.ui._format_bytes(result.data['space_freed'])} of space."
)
elif result.status == ToolStatus.FAILURE:
response_parts.append(f"\nError: {result.message}")
elif result.status == ToolStatus.REQUIRES_CREDENTIALS:
response_parts.append(
f"\nThis operation requires elevated permissions. "
f"Please provide credentials when prompted."
)
return "\n".join(response_parts)
# Main Application
class FilesystemOptimizer:
"""Main application class for filesystem optimization"""
def __init__(self, model_path: str = "local_model"):
"""Initialize the filesystem optimizer"""
self.controller = LLMController(model_path)
self.running = False
def run(self):
"""Run the interactive filesystem optimizer"""
self._print_welcome()
self.running = True
while self.running:
try:
# Get user input
user_input = input("\nWhat would you like to do? > ").strip()
# Check for exit commands
if user_input.lower() in ['exit', 'quit', 'bye']:
self.running = False
continue
# Handle special commands
if user_input.lower() == 'help':
self._print_help()
continue
# Process the request
response = self.controller.process_request(user_input)
# Display response
print(f"\n{response}")
# Check if we should offer to remove duplicates
if hasattr(self.controller, 'current_duplicates') and self.controller.current_duplicates:
remove = input("\nWould you like to remove the duplicates? (yes/no): ")
if remove.lower() in ['yes', 'y']:
strategy = self._get_removal_strategy()
removal_response = self.controller.process_request(
f"Remove the duplicates using {strategy} strategy"
)
print(f"\n{removal_response}")
except KeyboardInterrupt:
print("\n\nInterrupted by user.")
self.running = False
except Exception as e:
print(f"\nAn error occurred: {e}")
print("Please try again or type 'exit' to quit.")
self._print_goodbye()
def _print_welcome(self):
"""Print welcome message and instructions"""
print("="*70)
print("FILESYSTEM OPTIMIZER - LLM-Powered Duplicate Finder")
print("="*70)
print("\nWelcome! I can help you find and remove duplicate files on your system.")
print("\nExamples of what you can ask:")
print(" - 'Find duplicate files in my Documents folder'")
print(" - 'Scan my Downloads for duplicates'")
print(" - 'Check my home directory for duplicate photos'")
print("\nI will always ask for your permission before:")
print(" - Scanning directories")
print(" - Removing any files")
print(" - Using elevated permissions")
print("\nType 'help' for more information or 'exit' to quit.")
print("="*70)
def _print_help(self):
"""Print help information"""
print("\n" + "="*70)
print("HELP - Available Commands and Features")
print("="*70)
print("\nNatural Language Commands:")
print(" - Ask me to find duplicates in any directory")
print(" - Request removal of found duplicates")
print(" - Specify removal strategies (keep oldest/newest/shortest path)")
print("\nSpecial Commands:")
print(" - 'help' - Show this help message")
print(" - 'exit' - Quit the application")
print("\nPermission System:")
print(" - You can choose 'Always Allow' for repeated operations")
print(" - All actions are logged for accountability")
print(" - Removed files are recorded for potential recovery")
print("="*70)
def _get_removal_strategy(self) -> str:
"""Get removal strategy from user"""
print("\nRemoval strategies:")
print(" 1. keep_oldest - Keep the oldest copy of each file")
print(" 2. keep_newest - Keep the newest copy of each file")
print(" 3. keep_shortest_path - Keep the file with shortest path")
while True:
choice = input("\nSelect strategy (1-3): ").strip()
if choice == '1':
return 'keep_oldest'
elif choice == '2':
return 'keep_newest'
elif choice == '3':
return 'keep_shortest_path'
else:
print("Invalid choice. Please select 1, 2, or 3.")
def _print_goodbye(self):
"""Print goodbye message"""
print("\n" + "="*70)
print("Thank you for using Filesystem Optimizer!")
print("A record of each removed file has been saved under ~/.filesystem_optimizer/removed.")
print("="*70)
# Entry point
if __name__ == "__main__":
# Check for command line arguments
import argparse
parser = argparse.ArgumentParser(
description="LLM-powered filesystem optimizer for finding and removing duplicate files"
)
parser.add_argument(
'--model-path',
type=str,
default='local_model',
help='Path to the local LLM model directory'
)
args = parser.parse_args()
# Create and run the application
app = FilesystemOptimizer(model_path=args.model_path)
try:
app.run()
except Exception as e:
print(f"\nFatal error: {e}")
sys.exit(1)
Conclusion
This implementation demonstrates a complete LLM-driven filesystem optimization system; the model responses are simulated in demo mode, but the surrounding architecture is ready for a real local model. The application integrates natural language processing with system administration tasks while enforcing security through explicit permission management. The modular design allows straightforward extension to additional platforms and optimization strategies, and the system prioritizes user control and transparency: no action is taken without explicit consent. Careful error handling and removal records make duplicate cleanup safe and auditable across platforms.