Introduction and Problem Definition
The concept of controlling an operating system through natural language commands represents a significant leap forward in human-computer interaction. Traditional command-line interfaces require users to memorize specific syntax and parameters, while graphical user interfaces limit users to predefined actions through menus and buttons. An LLM-powered agent bridges this gap by interpreting natural language instructions and translating them into executable system operations.
The challenge lies not merely in understanding what the user wants to accomplish, but in safely and accurately executing those intentions within the constraints and capabilities of the underlying operating system. When a user says "move window Y to the middle of the screen," the agent must identify the specific window, calculate screen dimensions, determine the appropriate positioning coordinates, and execute the window management commands through the operating system's API.
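To make the centering step concrete, the coordinate arithmetic alone can be sketched independently of any windowing API. The function name and the 1920x1080 display in the example are illustrative assumptions, not part of any particular platform's interface:

```python
def center_window(screen_w: int, screen_h: int, win_w: int, win_h: int):
    """Return the (x, y) top-left corner that centers a window on the screen."""
    x = (screen_w - win_w) // 2
    y = (screen_h - win_h) // 2
    return x, y

# On a 1920x1080 display, an 800x600 window is centered at (560, 240).
print(center_window(1920, 1080, 800, 600))
```

The resulting coordinates would then be passed to whatever window-management call the target platform exposes.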
This implementation requires several interconnected components working in harmony. The natural language processing component must parse and understand user intent. The command classification system must map intentions to specific system operations. The execution engine must safely carry out these operations while providing appropriate feedback to the user.
Core Architecture Overview
The architecture of such an agent follows a layered approach where each component has distinct responsibilities. The topmost layer handles natural language input from users and converts it into structured intent representations. The middle layer contains the business logic for command classification, parameter extraction, and safety validation. The bottom layer interfaces directly with the operating system through various APIs and system calls.
The flow begins when a user provides a natural language command. This input passes through a preprocessing stage where the text is cleaned and normalized. The processed text then enters the intent recognition system, which leverages the LLM to understand what action the user wants to perform. Once the intent is clear, the system extracts relevant parameters such as file names, application names, or window identifiers.
The extracted intent and parameters undergo validation to ensure the requested operation is safe and feasible. This validation includes checking file permissions, verifying that target applications exist, and ensuring the user has sufficient privileges for the requested action. Only after successful validation does the system proceed to execute the command through the appropriate operating system interface.
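As a rough sketch, the preprocess-recognize-validate-execute flow might be wired together like this, with stub stages standing in for the real LLM, validator, and OS interface (all names here are illustrative):

```python
def preprocess(text: str) -> str:
    # Collapse runs of whitespace in the raw input
    return " ".join(text.split()).strip()

def run_pipeline(raw_input: str, recognize, validate, execute) -> dict:
    """Drive a command through the layered flow: each stage can short-circuit."""
    text = preprocess(raw_input)
    intent = recognize(text)          # LLM-backed intent recognition
    if "error" in intent:
        return intent
    if not validate(intent):          # permission / feasibility checks
        return {"error": "validation failed"}
    return execute(intent)            # OS interface layer

# Stub stages to illustrate the wiring:
result = run_pipeline(
    "  list   files ",
    recognize=lambda t: {"action": "list", "target": t},
    validate=lambda i: i["action"] in {"list", "open"},
    execute=lambda i: {"success": True, "action": i["action"]},
)
print(result)  # {'success': True, 'action': 'list'}
```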
Natural Language Processing Component
The natural language processing component serves as the entry point for user commands and must handle the inherent ambiguity and variability of human language. Users may express the same intent in numerous ways, and the system must recognize these variations while extracting the essential information needed for execution.
Consider the following code example that demonstrates how to implement a basic intent recognition system using a language model:
import openai
import json

class IntentRecognizer:
    def __init__(self, api_key):
        self.client = openai.OpenAI(api_key=api_key)
        self.system_prompt = """
        You are an OS command interpreter. Parse user commands and return JSON with:
        - action: the primary action (run, move, create, list, change_directory, open)
        - target: the object being acted upon
        - parameters: additional details needed for execution
        - confidence: your confidence level (0-1)
        """

    def parse_command(self, user_input):
        try:
            response = self.client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": user_input}
                ],
                temperature=0.1
            )
            result = json.loads(response.choices[0].message.content)
            return self.validate_intent(result)
        except Exception as e:
            return {"error": f"Intent recognition failed: {str(e)}"}

    def validate_intent(self, intent):
        required_fields = ["action", "target", "parameters", "confidence"]
        if not all(field in intent for field in required_fields):
            return {"error": "Incomplete intent structure"}
        if intent["confidence"] < 0.7:
            return {"error": "Low confidence in intent recognition"}
        return intent
This code example demonstrates a basic intent recognition system that uses an LLM to parse natural language commands. The IntentRecognizer class initializes with an API key for the language model service and defines a system prompt that instructs the model on how to interpret user commands. The parse_command method sends the user input to the language model along with the system prompt and expects a JSON response containing the parsed intent.
The system prompt is crucial because it defines the expected output format and the types of actions the system can handle. By specifying that the response should include action, target, parameters, and confidence fields, we ensure consistent output that subsequent components can reliably process. The confidence score allows the system to reject ambiguous or unclear commands rather than attempting potentially incorrect operations.
The validate_intent method performs basic validation on the parsed intent to ensure it contains all required fields and meets minimum confidence thresholds. This validation step prevents the system from proceeding with poorly understood commands that could lead to unintended consequences.
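The validation path can be exercised without any API call by feeding it a hand-written intent dictionary that mimics what the model might return. The snippet below is a standalone replica of the validation logic, shown for illustration:

```python
REQUIRED_FIELDS = ("action", "target", "parameters", "confidence")

def validate_intent(intent: dict) -> dict:
    """Reject structurally incomplete or low-confidence intents."""
    if not all(field in intent for field in REQUIRED_FIELDS):
        return {"error": "Incomplete intent structure"}
    if intent["confidence"] < 0.7:
        return {"error": "Low confidence in intent recognition"}
    return intent

# A well-formed, confident intent passes through unchanged:
good = {"action": "open", "target": "notepad", "parameters": {}, "confidence": 0.92}
assert validate_intent(good) is good

# An ambiguous command is rejected rather than guessed at:
vague = {"action": "open", "target": "it", "parameters": {}, "confidence": 0.4}
print(validate_intent(vague))  # {'error': 'Low confidence in intent recognition'}
```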
Command Classification and Intent Recognition
Once the natural language input has been processed, the system must classify the command into specific categories that correspond to different types of operating system operations. This classification goes beyond simple keyword matching and requires understanding the context and relationships between different elements of the command.
The classification system must handle various command types including application management, file system operations, window management, and system configuration changes. Each category requires different parameter extraction and validation logic. For instance, a file creation command needs a filename and potentially a directory path, while a window movement command requires window identification and positioning coordinates.
Here is an implementation example that demonstrates how to build a comprehensive command classifier:
import subprocess
import psutil
from typing import Dict, List, Optional

class CommandClassifier:
    def __init__(self):
        self.action_handlers = {
            "run": self.handle_run_application,
            "move": self.handle_move_window,
            "create": self.handle_create_file,
            "list": self.handle_list_directory,
            "change_directory": self.handle_change_directory,
            "open": self.handle_open_application
        }
        self.running_processes = self.get_running_processes()

    def classify_and_execute(self, intent: Dict) -> Dict:
        action = intent.get("action")
        if action not in self.action_handlers:
            return {"error": f"Unsupported action: {action}"}
        handler = self.action_handlers[action]
        return handler(intent)

    def handle_run_application(self, intent: Dict) -> Dict:
        app_name = intent["target"]
        parameters = intent.get("parameters", {})
        # Normalize application name
        normalized_name = self.normalize_app_name(app_name)
        if not normalized_name:
            return {"error": f"Application '{app_name}' not found"}
        try:
            if parameters.get("arguments"):
                cmd = [normalized_name] + parameters["arguments"]
            else:
                cmd = [normalized_name]
            process = subprocess.Popen(cmd,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
            return {"success": True, "pid": process.pid, "command": cmd}
        except Exception as e:
            return {"error": f"Failed to run {app_name}: {str(e)}"}

    def normalize_app_name(self, app_name: str) -> Optional[str]:
        # This method would contain logic to map common application names
        # to their actual executable names on the system
        app_mappings = {
            "notepad": "notepad.exe",
            "calculator": "calc.exe",
            "excel": "excel.exe",
            "word": "winword.exe",
            "chrome": "chrome.exe",
            "firefox": "firefox.exe"
        }
        app_lower = app_name.lower()
        if app_lower in app_mappings:
            return app_mappings[app_lower]
        # Try to find the application in the system PATH
        # ("where" is Windows-specific; the POSIX equivalent is "which")
        try:
            result = subprocess.run(["where", app_name],
                                    capture_output=True, text=True)
            if result.returncode == 0:
                return result.stdout.strip().split('\n')[0]
        except OSError:
            pass
        return None

    def get_running_processes(self) -> List[Dict]:
        processes = []
        for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
            try:
                processes.append(proc.info)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        return processes

    # The remaining handlers follow the same pattern; stubs shown for brevity.
    def handle_move_window(self, intent: Dict) -> Dict: ...
    def handle_create_file(self, intent: Dict) -> Dict: ...
    def handle_list_directory(self, intent: Dict) -> Dict: ...
    def handle_change_directory(self, intent: Dict) -> Dict: ...
    def handle_open_application(self, intent: Dict) -> Dict: ...
This code example shows how to implement a command classifier that can handle different types of operating system operations. The CommandClassifier class maintains a dictionary of action handlers, where each handler is responsible for processing a specific type of command. This design allows for easy extension when adding support for new command types.
The handle_run_application method demonstrates how to process application launch commands. It extracts the application name from the intent, normalizes it to find the actual executable, and then uses subprocess.Popen to launch the application. The normalization step is crucial because users often refer to applications by their common names rather than their executable filenames.
The normalize_app_name method shows how to map user-friendly application names to their actual executable names. This mapping can be extended to include more applications and can even query the system PATH to find executables that are not in the predefined mapping. This flexibility allows the system to handle both common applications and user-installed software.
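One possible cross-platform refinement is to replace the `where` lookup with the standard library's `shutil.which`, which searches the PATH on both Windows and POSIX systems. This is a sketch; `find_executable` is an illustrative name:

```python
import shutil
from typing import Dict, Optional

def find_executable(app_name: str, app_mappings: Dict[str, str]) -> Optional[str]:
    """Map a friendly name to an executable name, then locate it on the PATH."""
    candidate = app_mappings.get(app_name.lower(), app_name)
    # shutil.which respects PATH (and PATHEXT on Windows) on every platform
    return shutil.which(candidate)
```

Because `shutil.which` returns None when nothing matches, the caller can keep the same "application not found" handling as before.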
Operating System Interface Layer
The operating system interface layer provides the bridge between high-level command intentions and low-level system operations. This layer must abstract the differences between operating systems while providing consistent functionality for the command execution engine. Different operating systems expose their functionality through various APIs, system calls, and command-line utilities.
On Windows systems, the interface layer might use the Windows API through libraries like pywin32, while on Linux systems it might rely more heavily on command-line utilities and system calls. The abstraction layer ensures that higher-level components do not need to be aware of these platform-specific details.
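As a small illustration of such platform dispatch, opening a file with its default application maps to a different command on each system. The commands shown are common conventions for each platform, not the only options:

```python
import platform

def open_file_command(path: str) -> list:
    """Return a platform-appropriate argv for opening a file with its default app."""
    system = platform.system().lower()
    if system == "windows":
        return ["cmd", "/c", "start", "", path]   # empty string is the window title
    if system == "darwin":
        return ["open", path]
    return ["xdg-open", path]                      # most Linux desktops
```

Higher layers only ever call `open_file_command`; the platform branch stays hidden behind the interface.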
Consider this implementation example that demonstrates how to create a cross-platform file system interface:
import os
import platform
from pathlib import Path
from typing import Dict, Optional

class OSInterface:
    def __init__(self):
        self.platform = platform.system().lower()
        self.current_directory = os.getcwd()

    def create_file(self, filename: str, content: str = "", directory: Optional[str] = None) -> Dict:
        try:
            if directory:
                target_dir = Path(directory)
                if not target_dir.exists():
                    return {"error": f"Directory {directory} does not exist"}
                file_path = target_dir / filename
            else:
                file_path = Path(self.current_directory) / filename
            # Check if file already exists
            if file_path.exists():
                return {"error": f"File {file_path} already exists"}
            # Create the file
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(content)
            return {"success": True, "path": str(file_path), "size": len(content)}
        except PermissionError:
            return {"error": f"Permission denied creating file {filename}"}
        except Exception as e:
            return {"error": f"Failed to create file {filename}: {str(e)}"}

    def list_directory(self, directory: Optional[str] = None) -> Dict:
        try:
            target_dir = Path(directory) if directory else Path(self.current_directory)
            if not target_dir.exists():
                return {"error": f"Directory {directory} does not exist"}
            if not target_dir.is_dir():
                return {"error": f"{directory} is not a directory"}
            items = []
            for item in target_dir.iterdir():
                item_info = {
                    "name": item.name,
                    "type": "directory" if item.is_dir() else "file",
                    "size": item.stat().st_size if item.is_file() else None,
                    "modified": item.stat().st_mtime
                }
                items.append(item_info)
            return {"success": True, "directory": str(target_dir), "items": items}
        except PermissionError:
            return {"error": f"Permission denied accessing directory {directory}"}
        except Exception as e:
            return {"error": f"Failed to list directory: {str(e)}"}

    def change_directory(self, new_directory: str) -> Dict:
        try:
            target_path = Path(new_directory)
            # Handle relative paths
            if not target_path.is_absolute():
                target_path = Path(self.current_directory) / target_path
            # Resolve any symbolic links and relative components
            target_path = target_path.resolve()
            if not target_path.exists():
                return {"error": f"Directory {new_directory} does not exist"}
            if not target_path.is_dir():
                return {"error": f"{new_directory} is not a directory"}
            # Change the current directory
            os.chdir(target_path)
            self.current_directory = str(target_path)
            return {"success": True, "new_directory": self.current_directory}
        except PermissionError:
            return {"error": f"Permission denied accessing directory {new_directory}"}
        except Exception as e:
            return {"error": f"Failed to change directory: {str(e)}"}
This code example demonstrates how to implement a cross-platform operating system interface that handles common file system operations. The OSInterface class provides methods for creating files, listing directory contents, and changing directories while handling platform-specific differences and error conditions.
The create_file method shows how to safely create files with proper error handling. It checks whether the target directory exists, whether the file already exists, and handles permission errors gracefully. The method also supports creating files in specific directories or in the current working directory.
The list_directory method demonstrates how to gather comprehensive information about directory contents. It returns not just file names but also metadata such as file types, sizes, and modification times. This additional information can be useful for more sophisticated commands that need to operate on files based on their properties.
The change_directory method illustrates how to handle path resolution and validation. It supports both absolute and relative paths, resolves symbolic links, and validates that the target is actually a directory before attempting to change to it. This robust validation prevents common errors and provides clear feedback when operations cannot be completed.
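The resolution behavior described above can be distilled into a few lines using pathlib; the sketch below mirrors the same logic as a standalone function:

```python
from pathlib import Path

def resolve_target(current_directory: str, raw: str) -> Path:
    """Resolve a user-supplied path against the current directory,
    collapsing '.' and '..' components and following symlinks."""
    target = Path(raw)
    if not target.is_absolute():
        target = Path(current_directory) / target
    return target.resolve()

print(resolve_target("/home/user/projects", "../documents/./notes"))
# e.g. /home/user/documents/notes on a typical Linux system
```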
Command Execution Engine
The command execution engine serves as the orchestrator that coordinates between the intent recognition system and the operating system interface layer. This component must ensure that commands are executed in the correct order, handle dependencies between operations, and manage system resources appropriately.
The execution engine must also implement safety mechanisms to prevent dangerous operations and provide rollback capabilities when possible. For instance, before deleting files, the engine might create backups or move files to a trash directory instead of permanently removing them.
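One way to sketch the trash-directory idea is to move the file into a dedicated directory under a timestamped name rather than unlinking it. The `soft_delete` name and the trash location are illustrative choices:

```python
import shutil
import time
from pathlib import Path

def soft_delete(path: str, trash_dir: str = ".agent_trash") -> dict:
    """Move a file into a trash directory instead of deleting it permanently."""
    source = Path(path)
    if not source.exists():
        return {"error": f"{path} does not exist"}
    trash = Path(trash_dir)
    trash.mkdir(parents=True, exist_ok=True)
    # Timestamp the name so repeated deletions of the same file don't collide
    target = trash / f"{int(time.time() * 1000)}_{source.name}"
    shutil.move(str(source), str(target))
    return {"success": True, "trashed_to": str(target)}
```

A matching restore operation would simply move the newest timestamped entry back to its original location.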
Here is an implementation example that shows how to build a robust command execution engine:
import threading
import time
import uuid
from typing import Dict, Optional, Callable
from dataclasses import dataclass
from enum import Enum

class ExecutionStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class ExecutionContext:
    command_id: str
    user_command: str
    parsed_intent: Dict
    status: ExecutionStatus
    start_time: Optional[float] = None
    end_time: Optional[float] = None
    result: Optional[Dict] = None
    error: Optional[str] = None

class CommandExecutionEngine:
    def __init__(self, os_interface, max_concurrent_commands=5):
        self.os_interface = os_interface
        self.max_concurrent_commands = max_concurrent_commands
        self.active_commands = {}
        self.command_history = []
        self.safety_checks = []
        self.execution_lock = threading.Lock()

    def register_safety_check(self, check_function: Callable[[Dict], bool]):
        """Register a safety check function that validates commands before execution"""
        self.safety_checks.append(check_function)

    def execute_command(self, user_command: str, parsed_intent: Dict) -> str:
        """Execute a command and return a command ID for tracking"""
        command_id = str(uuid.uuid4())
        context = ExecutionContext(
            command_id=command_id,
            user_command=user_command,
            parsed_intent=parsed_intent,
            status=ExecutionStatus.PENDING
        )
        # Run safety checks
        for safety_check in self.safety_checks:
            if not safety_check(parsed_intent):
                context.status = ExecutionStatus.FAILED
                context.error = "Command failed safety validation"
                self.command_history.append(context)
                return command_id
        # Check if we can execute more commands
        with self.execution_lock:
            if len(self.active_commands) >= self.max_concurrent_commands:
                context.status = ExecutionStatus.FAILED
                context.error = "Maximum concurrent commands reached"
                self.command_history.append(context)
                return command_id
            self.active_commands[command_id] = context
        # Start execution in a separate thread
        thread = threading.Thread(target=self._execute_command_thread, args=(context,))
        thread.daemon = True
        thread.start()
        return command_id

    def _execute_command_thread(self, context: ExecutionContext):
        """Execute a command in a separate thread"""
        try:
            context.status = ExecutionStatus.RUNNING
            context.start_time = time.time()
            action = context.parsed_intent["action"]
            result = self._dispatch_command(action, context.parsed_intent)
            context.end_time = time.time()
            context.result = result
            if "error" in result:
                context.status = ExecutionStatus.FAILED
                context.error = result["error"]
            else:
                context.status = ExecutionStatus.COMPLETED
        except Exception as e:
            context.status = ExecutionStatus.FAILED
            context.error = f"Execution error: {str(e)}"
            context.end_time = time.time()
        finally:
            # Move from active to history
            with self.execution_lock:
                if context.command_id in self.active_commands:
                    del self.active_commands[context.command_id]
                self.command_history.append(context)

    def _dispatch_command(self, action: str, intent: Dict) -> Dict:
        """Dispatch command to appropriate handler"""
        if action == "create":
            return self.os_interface.create_file(
                intent["target"],
                intent.get("parameters", {}).get("content", "")
            )
        elif action == "list":
            directory = intent.get("parameters", {}).get("directory")
            return self.os_interface.list_directory(directory)
        elif action == "change_directory":
            return self.os_interface.change_directory(intent["target"])
        else:
            return {"error": f"Unsupported action: {action}"}

    def get_command_status(self, command_id: str) -> Optional[ExecutionContext]:
        """Get the current status of a command"""
        # Check active commands first
        if command_id in self.active_commands:
            return self.active_commands[command_id]
        # Check command history
        for context in self.command_history:
            if context.command_id == command_id:
                return context
        return None

    def cancel_command(self, command_id: str) -> bool:
        """Attempt to cancel a running command"""
        with self.execution_lock:
            if command_id in self.active_commands:
                context = self.active_commands[command_id]
                if context.status == ExecutionStatus.PENDING:
                    context.status = ExecutionStatus.CANCELLED
                    del self.active_commands[command_id]
                    self.command_history.append(context)
                    return True
        return False

# Example safety check functions
def prevent_system_file_deletion(intent: Dict) -> bool:
    """Prevent deletion of critical system files"""
    if intent["action"] == "delete":
        target = intent["target"].lower()
        dangerous_patterns = ["system32", "boot", "etc/passwd", "etc/shadow"]
        return not any(pattern in target for pattern in dangerous_patterns)
    return True

def validate_file_permissions(intent: Dict) -> bool:
    """Validate that the user has appropriate permissions"""
    if intent["action"] in ["create", "delete", "modify"]:
        # This would contain actual permission checking logic
        # For now, we'll just return True
        return True
    return True
This code example demonstrates a comprehensive command execution engine that handles concurrent command execution, safety validation, and command tracking. The ExecutionContext dataclass maintains the state of each command throughout its lifecycle, including timing information and results.
The CommandExecutionEngine class implements several important features for robust command execution. It limits the number of concurrent commands to prevent system overload, runs safety checks before executing commands, and provides detailed tracking of command status and results.
The safety check system allows the registration of validation functions that can prevent dangerous operations. The example includes safety checks for preventing system file deletion and validating file permissions. These checks run before command execution begins, providing an early opportunity to reject potentially harmful operations.
The threading implementation allows multiple commands to execute concurrently while maintaining thread safety through proper locking mechanisms. Each command executes in its own thread, preventing long-running operations from blocking other commands.
Safety and Security Considerations
Safety and security represent critical aspects of any system that can execute arbitrary commands on behalf of users. The agent must implement multiple layers of protection to prevent accidental damage, malicious exploitation, and unauthorized access to system resources.
The first layer of protection involves validating user permissions and ensuring that the agent only performs operations that the user is authorized to execute. This validation must occur both at the command level and at the individual operation level, since some commands might require elevated privileges that the user does not possess.
Here is an implementation example that demonstrates how to implement comprehensive safety and security measures:
import os
import pwd    # POSIX-only; not available on Windows
import grp    # POSIX-only; not available on Windows
import stat
import time
from typing import Dict, Optional
from pathlib import Path

class SecurityManager:
    def __init__(self):
        self.dangerous_commands = {
            "delete", "remove", "format", "shutdown", "reboot"
        }
        self.protected_directories = {
            "/etc", "/bin", "/sbin", "/usr/bin", "/usr/sbin",
            "/boot", "/sys", "/proc", "/dev"
        }
        self.protected_files = {
            "/etc/passwd", "/etc/shadow", "/etc/sudoers",
            "/boot/grub/grub.cfg"
        }
        self.command_rate_limits = {}
        self.max_commands_per_minute = 30

    def validate_command_safety(self, intent: Dict, user_context: Dict) -> Dict:
        """Comprehensive safety validation for commands"""
        validation_result = {
            "safe": True,
            "warnings": [],
            "errors": [],
            "requires_confirmation": False
        }
        # Check rate limiting
        if not self._check_rate_limit(user_context["user_id"]):
            validation_result["safe"] = False
            validation_result["errors"].append("Rate limit exceeded")
            return validation_result
        # Validate dangerous commands
        if intent["action"] in self.dangerous_commands:
            validation_result["requires_confirmation"] = True
            validation_result["warnings"].append(
                f"Command '{intent['action']}' is potentially dangerous"
            )
        # Validate file system operations
        if intent["action"] in ["create", "delete", "modify"]:
            file_validation = self._validate_file_operation(intent, user_context)
            validation_result["warnings"].extend(file_validation["warnings"])
            validation_result["errors"].extend(file_validation["errors"])
            if file_validation["errors"]:
                validation_result["safe"] = False
        # Validate application execution
        if intent["action"] == "run":
            app_validation = self._validate_application_execution(intent, user_context)
            validation_result["warnings"].extend(app_validation["warnings"])
            validation_result["errors"].extend(app_validation["errors"])
            if app_validation["errors"]:
                validation_result["safe"] = False
        return validation_result

    def _check_rate_limit(self, user_id: str) -> bool:
        """Check if user has exceeded command rate limits"""
        current_time = time.time()
        minute_ago = current_time - 60
        if user_id not in self.command_rate_limits:
            self.command_rate_limits[user_id] = []
        # Remove old entries
        self.command_rate_limits[user_id] = [
            timestamp for timestamp in self.command_rate_limits[user_id]
            if timestamp > minute_ago
        ]
        # Check if under limit
        if len(self.command_rate_limits[user_id]) >= self.max_commands_per_minute:
            return False
        # Add current command
        self.command_rate_limits[user_id].append(current_time)
        return True

    def _validate_file_operation(self, intent: Dict, user_context: Dict) -> Dict:
        """Validate file system operations for safety"""
        result = {"warnings": [], "errors": []}
        target = intent["target"]
        try:
            target_path = Path(target).resolve()
            # Check protected directories
            for protected_dir in self.protected_directories:
                if str(target_path).startswith(protected_dir):
                    result["errors"].append(
                        f"Access to protected directory {protected_dir} denied"
                    )
                    return result
            # Check protected files
            if str(target_path) in self.protected_files:
                result["errors"].append(
                    f"Access to protected file {target_path} denied"
                )
                return result
            # Check file permissions
            if target_path.exists():
                file_stat = target_path.stat()
                # Check if user owns the file or has write permissions
                if not self._check_file_permissions(target_path, user_context, intent["action"]):
                    result["errors"].append(
                        f"Insufficient permissions for {intent['action']} on {target_path}"
                    )
                # Warn about files with setuid/setgid bits
                if file_stat.st_mode & stat.S_ISUID or file_stat.st_mode & stat.S_ISGID:
                    result["warnings"].append(
                        f"Target file {target_path} has special permissions"
                    )
        except Exception as e:
            result["errors"].append(f"Error validating file operation: {str(e)}")
        return result

    def _check_file_permissions(self, file_path: Path, user_context: Dict, action: str) -> bool:
        """Check if user has appropriate permissions for file operation"""
        try:
            file_stat = file_path.stat()
            # Get user and group information
            user_info = pwd.getpwuid(os.getuid())
            user_groups = [g.gr_gid for g in grp.getgrall() if user_info.pw_name in g.gr_mem]
            # Check owner permissions
            if file_stat.st_uid == os.getuid():
                if action in ["read", "list"]:
                    return bool(file_stat.st_mode & stat.S_IRUSR)
                elif action in ["create", "modify", "delete"]:
                    return bool(file_stat.st_mode & stat.S_IWUSR)
                elif action == "execute":
                    return bool(file_stat.st_mode & stat.S_IXUSR)
            # Check group permissions
            if file_stat.st_gid in user_groups:
                if action in ["read", "list"]:
                    return bool(file_stat.st_mode & stat.S_IRGRP)
                elif action in ["create", "modify", "delete"]:
                    return bool(file_stat.st_mode & stat.S_IWGRP)
                elif action == "execute":
                    return bool(file_stat.st_mode & stat.S_IXGRP)
            # Check other permissions
            if action in ["read", "list"]:
                return bool(file_stat.st_mode & stat.S_IROTH)
            elif action in ["create", "modify", "delete"]:
                return bool(file_stat.st_mode & stat.S_IWOTH)
            elif action == "execute":
                return bool(file_stat.st_mode & stat.S_IXOTH)
        except Exception:
            return False
        return False

    def _validate_application_execution(self, intent: Dict, user_context: Dict) -> Dict:
        """Validate application execution for safety"""
        result = {"warnings": [], "errors": []}
        app_name = intent["target"]
        # List of potentially dangerous applications
        dangerous_apps = {
            "rm", "dd", "mkfs", "fdisk", "parted", "shutdown", "reboot"
        }
        if app_name.lower() in dangerous_apps:
            result["warnings"].append(
                f"Application '{app_name}' can perform dangerous system operations"
            )
        # Check if application exists and is executable
        try:
            app_path = self._find_application_path(app_name)
            if not app_path:
                result["errors"].append(f"Application '{app_name}' not found")
                return result
            if not os.access(app_path, os.X_OK):
                result["errors"].append(f"No execute permission for '{app_name}'")
        except Exception as e:
            result["errors"].append(f"Error validating application: {str(e)}")
        return result

    def _find_application_path(self, app_name: str) -> Optional[str]:
        """Find the full path of an application"""
        # Check if it's already a full path
        if os.path.isabs(app_name) and os.path.isfile(app_name):
            return app_name
        # Search in PATH
        for path_dir in os.environ.get("PATH", "").split(os.pathsep):
            app_path = os.path.join(path_dir, app_name)
            if os.path.isfile(app_path) and os.access(app_path, os.X_OK):
                return app_path
        return None
This code example demonstrates a comprehensive security manager that implements multiple layers of protection for command execution. The SecurityManager class validates commands against various security criteria including rate limiting, protected file access, and dangerous command detection.
The validate_command_safety method serves as the main entry point for security validation. It performs multiple checks including rate limiting to prevent abuse, dangerous command detection to warn users about potentially harmful operations, and specific validations for different types of operations.
The rate limiting mechanism prevents users from overwhelming the system with too many commands in a short period. This protection helps prevent both accidental abuse and potential denial-of-service attacks. The system maintains a sliding window of command timestamps for each user and rejects commands that would exceed the configured rate limit.
The file operation validation demonstrates how to check file permissions and protect critical system files and directories. The system maintains lists of protected locations and validates that users have appropriate permissions before allowing file operations to proceed.
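The sliding-window mechanism can be isolated into a small sketch; the class and parameter names below are illustrative, and the injectable `now` argument exists only to make the behavior easy to demonstrate:

```python
import time
from typing import Dict, List, Optional

class SlidingWindowLimiter:
    """Allow at most max_per_window calls per user within window_s seconds."""

    def __init__(self, max_per_window: int = 30, window_s: float = 60.0):
        self.max_per_window = max_per_window
        self.window_s = window_s
        self.history: Dict[str, List[float]] = {}

    def allow(self, user_id: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        window_start = now - self.window_s
        # Drop timestamps that have aged out of the window
        timestamps = [t for t in self.history.get(user_id, []) if t > window_start]
        if len(timestamps) >= self.max_per_window:
            self.history[user_id] = timestamps
            return False
        timestamps.append(now)
        self.history[user_id] = timestamps
        return True

limiter = SlidingWindowLimiter(max_per_window=3, window_s=60.0)
print([limiter.allow("alice", now=100.0 + i) for i in range(4)])
# [True, True, True, False]
```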
Error Handling and User Feedback
Effective error handling and user feedback mechanisms are essential for creating a reliable and user-friendly LLM agent. The system must gracefully handle various types of errors including network failures, permission issues, invalid commands, and system resource limitations. More importantly, it must provide clear and actionable feedback to users when problems occur.
The error handling system should distinguish between different types of errors and respond appropriately to each. Temporary errors such as network timeouts might trigger automatic retries, while permanent errors such as permission denials should immediately inform the user with suggestions for resolution.
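A retry policy for transient errors might look like the following sketch, using exponential backoff and re-raising once the retry budget is exhausted. The function name and the set of exception types treated as transient are assumptions for illustration:

```python
import time

def with_retries(operation, max_retries: int = 3, base_delay: float = 0.1,
                 transient=(ConnectionError, TimeoutError)):
    """Retry an operation on transient errors with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except transient:
            if attempt == max_retries:
                raise                       # budget exhausted: surface the error
            time.sleep(base_delay * (2 ** attempt))  # 0.1s, 0.2s, 0.4s, ...

# A flaky operation that fails twice, then succeeds:
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary outage")
    return "ok"

print(with_retries(flaky, base_delay=0.01))  # ok
```

Permanent errors such as PermissionError are deliberately not in the transient set, so they propagate to the user immediately.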
Here is an implementation example that demonstrates comprehensive error handling and user feedback:
import logging
import traceback
import time
from typing import Dict, List, Optional, Any
from enum import Enum
from dataclasses import dataclass

class ErrorSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

class ErrorCategory(Enum):
    PERMISSION = "permission"
    NETWORK = "network"
    VALIDATION = "validation"
    EXECUTION = "execution"
    SYSTEM = "system"
    USER_INPUT = "user_input"

@dataclass
class ErrorContext:
    error_id: str
    category: ErrorCategory
    severity: ErrorSeverity
    message: str
    user_message: str
    technical_details: Optional[str] = None
    suggested_actions: List[str] = None
    timestamp: float = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = time.time()
        if self.suggested_actions is None:
            self.suggested_actions = []

class ErrorHandler:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.error_history = []
        self.retry_strategies = {
            ErrorCategory.NETWORK: self._retry_network_operation,
            ErrorCategory.SYSTEM: self._retry_system_operation
        }
        self.max_retries = 3
        self.retry_delay = 1.0

    def handle_error(self, error: Exception, context: Dict[str, Any]) -> ErrorContext:
        """Handle an error and return appropriate error context"""
        error_context = self._classify_error(error, context)
        # Log the error
        self._log_error(error_context, error)
        # Add to error history
        self.error_history.append(error_context)
        # Determine if retry is appropriate
        if self._should_retry(error_context):
            retry_result = self._attempt_retry(error_context, context)
            if retry_result["success"]:
                return self._create_success_context(retry_result)
        return error_context

    def _classify_error(self, error: Exception, context: Dict[str, Any]) -> ErrorContext:
        """Classify an error and create appropriate error context"""
        error_type = type(error).__name__
        error_message = str(error)
        # Permission errors
        if isinstance(error, PermissionError) or "permission" in error_message.lower():
            return ErrorContext(
                error_id=self._generate_error_id(),
                category=ErrorCategory.PERMISSION,
                severity=ErrorSeverity.ERROR,
                message=f"Permission denied: {error_message}",
                user_message="You don't have permission to perform this operation.",
                technical_details=f"Error type: {error_type}, Details: {error_message}",
                suggested_actions=[
                    "Check if you have the necessary permissions",
                    "Try running with elevated privileges if appropriate",
                    "Contact your system administrator if needed"
                ]
            )
        # File not found errors
        if isinstance(error, FileNotFoundError):
            return ErrorContext(
                error_id=self._generate_error_id(),
                category=ErrorCategory.VALIDATION,
                severity=ErrorSeverity.ERROR,
                message=f"File or directory not found: {error_message}",
                user_message="The specified file or directory could not be found.",
                technical_details=f"Error type: {error_type}, Details: {error_message}",
                suggested_actions=[
                    "Check that the file or directory path is correct",
                    "Verify that the file or directory exists",
                    "Use absolute paths to avoid confusion"
                ]
            )
        # Network-related errors
        if "network" in error_message.lower() or "connection" in error_message.lower():
            return ErrorContext(
                error_id=self._generate_error_id(),
                category=ErrorCategory.NETWORK,
                severity=ErrorSeverity.WARNING,
                message=f"Network error: {error_message}",
                user_message="A network error occurred. The operation will be retried automatically.",
                technical_details=f"Error type: {error_type}, Details: {error_message}",
                suggested_actions=[
                    "Check your internet connection",
                    "Wait a moment and try again",
                    "Contact support if the problem persists"
                ]
            )
        # Generic system errors
        return ErrorContext(
            error_id=self._generate_error_id(),
            category=ErrorCategory.SYSTEM,
            severity=ErrorSeverity.ERROR,
            message=f"System error: {error_message}",
            user_message="An unexpected system error occurred.",
            technical_details=f"Error type: {error_type}, Details: {error_message}",
            suggested_actions=[
                "Try the operation again",
                "Check system resources and permissions",
                "Contact support with the error ID if the problem persists
]
)
def _should_retry(self, error_context: ErrorContext) -> bool:
"""Determine if an error should trigger a retry attempt"""
retryable_categories = {ErrorCategory.NETWORK, ErrorCategory.SYSTEM}
retryable_severities = {ErrorSeverity.WARNING, ErrorSeverity.ERROR}
return (error_context.category in retryable_categories and
error_context.severity in retryable_severities)
def _attempt_retry(self, error_context: ErrorContext, context: Dict[str, Any]) -> Dict:
"""Attempt to retry a failed operation"""
if error_context.category not in self.retry_strategies:
return {"success": False, "reason": "No retry strategy available"}
retry_strategy = self.retry_strategies[error_context.category]
for attempt in range(self.max_retries):
try:
time.sleep(self.retry_delay * (2 ** attempt))  # Exponential backoff: 1s, 2s, 4s with the default delay
result = retry_strategy(context)
if result["success"]:
self.logger.info(f"Retry successful after {attempt + 1} attempts")
return result
except Exception as retry_error:
self.logger.warning(f"Retry attempt {attempt + 1} failed: {retry_error}")
return {"success": False, "reason": "All retry attempts failed"}
def _retry_network_operation(self, context: Dict[str, Any]) -> Dict:
"""Retry strategy for network operations"""
# This would contain the actual retry logic for network operations
# For demonstration, we'll simulate a retry
return {"success": True, "message": "Network operation retried successfully"}
def _retry_system_operation(self, context: Dict[str, Any]) -> Dict:
"""Retry strategy for system operations"""
# This would contain the actual retry logic for system operations
# For demonstration, we'll simulate a retry
return {"success": True, "message": "System operation retried successfully"}
def _log_error(self, error_context: ErrorContext, original_error: Exception):
"""Log error details for debugging and monitoring"""
log_level = {
ErrorSeverity.INFO: logging.INFO,
ErrorSeverity.WARNING: logging.WARNING,
ErrorSeverity.ERROR: logging.ERROR,
ErrorSeverity.CRITICAL: logging.CRITICAL
}[error_context.severity]
self.logger.log(
log_level,
f"Error {error_context.error_id}: {error_context.message}",
extra={
"error_id": error_context.error_id,
"category": error_context.category.value,
"severity": error_context.severity.value,
"technical_details": error_context.technical_details,
"traceback": traceback.format_exc()
}
)
def _generate_error_id(self) -> str:
"""Generate a unique error ID for tracking"""
import uuid
return f"ERR-{uuid.uuid4().hex[:8].upper()}"
def _create_success_context(self, retry_result: Dict) -> ErrorContext:
"""Create a success context after successful retry"""
return ErrorContext(
error_id=self._generate_error_id(),
category=ErrorCategory.SYSTEM,
severity=ErrorSeverity.INFO,
message="Operation completed successfully after retry",
user_message=retry_result["message"],
suggested_actions=[]
)
def format_user_message(self, error_context: ErrorContext) -> str:
"""Format error context into a user-friendly message"""
message_parts = [
f"Error ID: {error_context.error_id}",
f"Message: {error_context.user_message}"
]
if error_context.suggested_actions:
message_parts.append("Suggested actions:")
for action in error_context.suggested_actions:
message_parts.append(f" - {action}")
return "\n".join(message_parts)
This code example demonstrates a sophisticated error handling system that categorizes errors, provides appropriate user feedback, and implements retry strategies for recoverable failures. The ErrorHandler class maintains a comprehensive error classification system that maps different types of exceptions to appropriate user messages and suggested actions.
The error classification system distinguishes between different categories of errors such as permission issues, network problems, and validation failures. Each category receives different treatment in terms of severity assessment, retry strategies, and user communication. This differentiated approach ensures that users receive relevant and actionable feedback for each type of problem.
The retry mechanism implements exponential backoff for network and system errors that might be temporary. This automatic recovery capability improves the user experience by resolving transient issues without requiring user intervention. The system logs all retry attempts for debugging and monitoring purposes.
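To make the classify-then-format flow concrete, here is a minimal, self-contained sketch of the same pattern. The category strings mirror the ErrorCategory values above; the helper names (classify, format_user_message) are illustrative stand-ins, not the full ErrorHandler.

```python
# Minimal sketch of the classify-then-format pattern used by ErrorHandler.
# Category strings mirror the ErrorCategory enum above; the rest is illustrative.
def classify(error: Exception) -> dict:
    """Map an exception to a category and a user-facing message."""
    if isinstance(error, PermissionError):
        return {"category": "permission",
                "user_message": "You don't have permission to perform this operation."}
    if isinstance(error, FileNotFoundError):
        return {"category": "validation",
                "user_message": "The specified file or directory could not be found."}
    return {"category": "system",
            "user_message": "An unexpected system error occurred."}

def format_user_message(ctx: dict) -> str:
    """Render the context as the short message a user would see."""
    return f"[{ctx['category']}] {ctx['user_message']}"

try:
    open("/no/such/path/at/all")
except OSError as exc:
    print(format_user_message(classify(exc)))
```

The full ErrorHandler adds error IDs, logging, history, and retries on top of this core mapping, but the decision structure is the same.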
Testing and Validation Strategies
Testing an LLM agent for operating system control requires a comprehensive approach that covers multiple aspects including natural language understanding accuracy, command execution correctness, safety mechanism effectiveness, and system integration reliability. The testing strategy must account for the inherent variability in natural language input and the potential consequences of incorrect command execution.
The testing approach should include unit tests for individual components, integration tests for component interactions, end-to-end tests for complete user scenarios, and safety tests to verify that protective mechanisms work correctly. Additionally, the system requires ongoing monitoring and validation in production environments to detect and address issues that may not appear during development testing.
Here is an implementation example that demonstrates how to build a comprehensive testing framework for the LLM agent:
import unittest
import tempfile
import shutil
import os
import json
import time
from unittest.mock import Mock, patch, MagicMock
from typing import Dict, List, Any
import subprocess
class LLMAgentTestFramework:
def __init__(self, agent_instance):
self.agent = agent_instance
self.test_results = []
self.temp_directories = []
self.created_files = []
def setup_test_environment(self):
"""Create isolated test environment"""
# Create temporary directory for file operations
temp_dir = tempfile.mkdtemp(prefix="llm_agent_test_")
self.temp_directories.append(temp_dir)
# Change to test directory
self.original_cwd = os.getcwd()
os.chdir(temp_dir)
return temp_dir
def cleanup_test_environment(self):
"""Clean up test environment"""
# Restore original directory
os.chdir(self.original_cwd)
# Remove temporary directories
for temp_dir in self.temp_directories:
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
# Clean up created files
for file_path in self.created_files:
if os.path.exists(file_path):
os.remove(file_path)
def test_natural_language_understanding(self) -> Dict[str, Any]:
"""Test natural language understanding accuracy"""
test_cases = [
{
"input": "create a file called test.txt",
"expected_action": "create",
"expected_target": "test.txt",
"description": "Simple file creation command"
},
{
"input": "list the contents of the current directory",
"expected_action": "list",
"expected_target": ".",
"description": "Directory listing command"
},
{
"input": "run notepad",
"expected_action": "run",
"expected_target": "notepad",
"description": "Application launch command"
},
{
"input": "move the calculator window to the center of the screen",
"expected_action": "move",
"expected_target": "calculator",
"description": "Window management command"
}
]
results = {
"total_tests": len(test_cases),
"passed": 0,
"failed": 0,
"details": []
}
for test_case in test_cases:
try:
parsed_intent = self.agent.intent_recognizer.parse_command(test_case["input"])
test_result = {
"description": test_case["description"],
"input": test_case["input"],
"expected": {
"action": test_case["expected_action"],
"target": test_case["expected_target"]
},
"actual": parsed_intent,
"passed": False
}
if ("error" not in parsed_intent and
parsed_intent.get("action") == test_case["expected_action"] and
parsed_intent.get("target") == test_case["expected_target"]):
test_result["passed"] = True
results["passed"] += 1
else:
results["failed"] += 1
results["details"].append(test_result)
except Exception as e:
results["failed"] += 1
results["details"].append({
"description": test_case["description"],
"input": test_case["input"],
"error": str(e),
"passed": False
})
return results
def test_command_execution_safety(self) -> Dict[str, Any]:
"""Test safety mechanisms for dangerous commands"""
dangerous_test_cases = [
{
"input": "delete all files in /etc",
"should_block": True,
"description": "Attempt to delete system files"
},
{
"input": "format the hard drive",
"should_block": True,
"description": "Attempt to format storage"
},
{
"input": "create a file at /tmp/safe_test.txt",
"should_block": False,
"description": "Safe file creation"
}
]
results = {
"total_tests": len(dangerous_test_cases),
"passed": 0,
"failed": 0,
"details": []
}
for test_case in dangerous_test_cases:
try:
parsed_intent = self.agent.intent_recognizer.parse_command(test_case["input"])
if "error" in parsed_intent:
# Intent parsing failed
test_result = {
"description": test_case["description"],
"input": test_case["input"],
"result": "Intent parsing failed",
"passed": test_case["should_block"]
}
else:
# Check safety validation
safety_result = self.agent.security_manager.validate_command_safety(
parsed_intent, {"user_id": "test_user"}
)
blocked = not safety_result["safe"] or safety_result["requires_confirmation"]
test_result = {
"description": test_case["description"],
"input": test_case["input"],
"blocked": blocked,
"should_block": test_case["should_block"],
"passed": blocked == test_case["should_block"],
"safety_details": safety_result
}
if test_result["passed"]:
results["passed"] += 1
else:
results["failed"] += 1
results["details"].append(test_result)
except Exception as e:
results["failed"] += 1
results["details"].append({
"description": test_case["description"],
"input": test_case["input"],
"error": str(e),
"passed": False
})
return results
def test_file_operations(self) -> Dict[str, Any]:
"""Test file system operations in isolated environment"""
test_dir = self.setup_test_environment()
file_operation_tests = [
{
"command": "create a file called test1.txt with content 'Hello World'",
"verify": lambda: os.path.exists("test1.txt") and
open("test1.txt").read() == "Hello World",
"description": "File creation with content"
},
{
"command": "list the files in the current directory",
"verify": lambda: True, # Just check it doesn't crash
"description": "Directory listing"
},
{
"command": "create a subdirectory called testdir",
"verify": lambda: os.path.isdir("testdir"),
"description": "Directory creation"
}
]
results = {
"total_tests": len(file_operation_tests),
"passed": 0,
"failed": 0,
"details": []
}
try:
for test_case in file_operation_tests:
try:
# Execute the command
parsed_intent = self.agent.intent_recognizer.parse_command(test_case["command"])
if "error" not in parsed_intent:
command_id = self.agent.execution_engine.execute_command(
test_case["command"], parsed_intent
)
# Wait for completion
timeout = 10
start_time = time.time()
while time.time() - start_time < timeout:
status = self.agent.execution_engine.get_command_status(command_id)
if status and status.status.value in ["completed", "failed"]:
break
time.sleep(0.1)
# Verify result
verification_passed = test_case["verify"]()
test_result = {
"description": test_case["description"],
"command": test_case["command"],
"execution_status": status.status.value if status else "timeout",
"verification_passed": verification_passed,
"passed": verification_passed and status and status.status.value == "completed"
}
else:
test_result = {
"description": test_case["description"],
"command": test_case["command"],
"error": "Intent parsing failed",
"passed": False
}
if test_result["passed"]:
results["passed"] += 1
else:
results["failed"] += 1
results["details"].append(test_result)
except Exception as e:
results["failed"] += 1
results["details"].append({
"description": test_case["description"],
"command": test_case["command"],
"error": str(e),
"passed": False
})
finally:
self.cleanup_test_environment()
return results
def run_performance_tests(self) -> Dict[str, Any]:
"""Test system performance under various loads"""
performance_tests = [
{
"name": "Sequential command execution",
"test_func": self._test_sequential_performance,
"description": "Measure time for sequential command execution"
},
{
"name": "Concurrent command execution",
"test_func": self._test_concurrent_performance,
"description": "Measure performance with concurrent commands"
},
{
"name": "Memory usage",
"test_func": self._test_memory_usage,
"description": "Monitor memory usage during operation"
}
]
results = {
"total_tests": len(performance_tests),
"results": []
}
for test in performance_tests:
try:
test_result = test["test_func"]()
test_result["name"] = test["name"]
test_result["description"] = test["description"]
results["results"].append(test_result)
except Exception as e:
results["results"].append({
"name": test["name"],
"description": test["description"],
"error": str(e),
"passed": False
})
return results
def _test_sequential_performance(self) -> Dict[str, Any]:
"""Test sequential command execution performance"""
commands = [
"create file test1.txt",
"create file test2.txt",
"list current directory",
"create file test3.txt"
]
start_time = time.time()
for command in commands:
parsed_intent = self.agent.intent_recognizer.parse_command(command)
if "error" not in parsed_intent:
command_id = self.agent.execution_engine.execute_command(command, parsed_intent)
# Wait for completion (bounded so a stuck command cannot hang the benchmark)
deadline = time.time() + 30
while time.time() < deadline:
status = self.agent.execution_engine.get_command_status(command_id)
if status and status.status.value in ["completed", "failed"]:
break
time.sleep(0.01)
total_time = time.time() - start_time
return {
"total_time": total_time,
"commands_per_second": len(commands) / total_time,
"passed": total_time < 30 # Arbitrary threshold
}
def _test_concurrent_performance(self) -> Dict[str, Any]:
"""Test concurrent command execution performance"""
# This would implement concurrent command testing
return {"passed": True, "note": "Concurrent testing not implemented"}
def _test_memory_usage(self) -> Dict[str, Any]:
"""Test memory usage during operation"""
# This would implement memory usage monitoring
return {"passed": True, "note": "Memory testing not implemented"}
def generate_test_report(self, test_results: Dict[str, Any]) -> str:
"""Generate a comprehensive test report"""
report_lines = [
"LLM Agent Test Report",
"=" * 50,
f"Generated at: {time.strftime('%Y-%m-%d %H:%M:%S')}",
""
]
for test_name, results in test_results.items():
report_lines.extend([
f"{test_name}:",
f" Total Tests: {results.get('total_tests', 'N/A')}",
f" Passed: {results.get('passed', 'N/A')}",
f" Failed: {results.get('failed', 'N/A')}",
""
])
if "details" in results:
for detail in results["details"]:
status = "PASS" if detail.get("passed", False) else "FAIL"
report_lines.append(f" [{status}] {detail.get('description', 'Unknown test')}")
report_lines.append("")
return "\n".join(report_lines)
This code example demonstrates a comprehensive testing framework that validates multiple aspects of the LLM agent's functionality. The LLMAgentTestFramework class provides methods for testing natural language understanding, safety mechanisms, file operations, and system performance.
The natural language understanding tests verify that the agent correctly interprets various types of user commands and extracts the appropriate action and target information. These tests help ensure that the intent recognition system works reliably across different phrasings and command types.
The safety testing validates that the security mechanisms correctly identify and block dangerous operations while allowing safe commands to proceed. This testing is crucial for ensuring that the agent cannot be used to perform unauthorized or harmful actions on the system.
The file operations testing creates an isolated environment where actual file system operations can be tested without affecting the host system. This approach allows for comprehensive validation of command execution while maintaining system safety during testing.
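Because the framework drives a live agent object, unit tests can substitute a stub via unittest.mock so the understanding suite runs deterministically and offline, without a real LLM behind it. A minimal sketch follows; the attribute path (agent.intent_recognizer.parse_command) mirrors the framework above, and the canned intent is illustrative.

```python
from unittest.mock import MagicMock

# Stub the agent so NLU tests run without a live LLM. The attribute path
# mirrors the framework above; the canned return value is illustrative.
agent = MagicMock()
agent.intent_recognizer.parse_command.return_value = {
    "action": "create",
    "target": "test.txt",
}

parsed = agent.intent_recognizer.parse_command("create a file called test.txt")
assert parsed == {"action": "create", "target": "test.txt"}

# The mock also records how it was called, which is useful for verifying
# that preprocessing passed the expected text through:
agent.intent_recognizer.parse_command.assert_called_with(
    "create a file called test.txt"
)
```

The same technique applies to the execution engine and security manager, letting each suite isolate the component it actually exercises.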
Performance Optimization
Performance optimization for an LLM agent involves multiple considerations including response time for natural language processing, efficiency of command execution, resource utilization, and scalability under concurrent usage. The system must balance accuracy with speed while maintaining safety and reliability.
The optimization strategy should focus on caching frequently used results, optimizing API calls to language models, implementing efficient command queuing and execution, and minimizing system resource consumption. Additionally, the system should be designed to scale gracefully as the number of concurrent users increases.
The implementation requires careful attention to bottlenecks in the natural language processing pipeline, efficient management of system resources, and intelligent caching strategies that reduce redundant processing while maintaining accuracy and freshness of results.
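One caching strategy mentioned above can be sketched as a small TTL cache keyed on the normalized command text, so repeated phrasings of the same command skip a round trip to the language model. The parse function here is a hypothetical stand-in for the real intent recognizer.

```python
import time

# Hypothetical TTL cache for parsed intents: identical (normalized) commands
# reuse the cached parse instead of calling the LLM again.
class TTLCache:
    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._store = {}  # normalized command -> (timestamp, parsed intent)

    def get(self, key):
        entry = self._store.get(key)
        if entry is not None and time.time() - entry[0] < self.ttl:
            return entry[1]
        return None

    def put(self, key, value):
        self._store[key] = (time.time(), value)

cache = TTLCache(ttl_seconds=60)

def parse_with_cache(command: str, parse_fn):
    key = " ".join(command.lower().split())  # normalize case and whitespace
    cached = cache.get(key)
    if cached is not None:
        return cached
    result = parse_fn(command)
    cache.put(key, result)
    return result

calls = []
def fake_parse(cmd):  # stand-in for the real (expensive) LLM call
    calls.append(cmd)
    return {"action": "list", "target": "."}

parse_with_cache("list current directory", fake_parse)
parse_with_cache("List  current  directory", fake_parse)  # normalized cache hit
assert len(calls) == 1  # the second request never reached the parser
```

The TTL keeps stale interpretations from persisting indefinitely; how aggressive the normalization should be is a tuning decision, since over-normalizing risks conflating commands that differ in meaningful details.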
Tool Calling as a Foundation for LLM Agents
Tool calling, also known as function calling, represents a paradigm shift in how Large Language Models interact with external systems. Instead of relying solely on text generation and parsing, modern LLMs can be configured to recognize when they need to invoke specific functions or tools to accomplish user requests. This approach provides a more structured and reliable method for implementing an operating system control agent.
The fundamental concept behind tool calling involves defining a set of available functions that the LLM can invoke, along with their parameters and expected behaviors. When a user provides a natural language command, the LLM analyzes the request and determines which tools need to be called, what parameters to pass, and in what sequence to execute them.
This approach offers several advantages over traditional text parsing methods. The LLM can make more informed decisions about which operations to perform, handle complex multi-step procedures more effectively, and provide better error handling when operations fail. Additionally, tool calling provides a clear separation between the language understanding capabilities of the LLM and the actual system operations.
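Concretely, each tool is exposed to the model as a JSON schema, and the model answers with the tool name plus JSON-encoded arguments. A minimal sketch of that round trip in the OpenAI-style function format; the create_file tool shown here anticipates the registry implemented in the next section.

```python
import json

# One tool, described in the OpenAI-style function-calling schema format.
create_file_schema = {
    "type": "function",
    "function": {
        "name": "create_file",
        "description": "Create a new file with optional content",
        "parameters": {
            "type": "object",
            "properties": {
                "filename": {"type": "string", "description": "Name of the file to create"},
                "content": {"type": "string", "description": "Content to write"},
            },
            "required": ["filename"],
        },
    },
}

# When the model decides to use the tool, it returns the tool name together
# with the arguments as a JSON string, which the agent decodes and dispatches:
tool_call_arguments = '{"filename": "notes.txt", "content": "hello"}'
arguments = json.loads(tool_call_arguments)
assert arguments == {"filename": "notes.txt", "content": "hello"}
```

Keeping the schema declarative means the same tool definitions can be regenerated for different model providers without touching the underlying implementation functions.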
Implementing Tool Calling Architecture
The implementation of a tool calling based agent requires careful design of the tool registry, parameter validation, and execution coordination. The system must define tools in a format that the LLM can understand while ensuring that each tool provides appropriate safety checks and error handling.
Here is a comprehensive implementation example that demonstrates how to build a tool calling system for operating system control:
import json
import time
import inspect
import functools
import os
import subprocess
import psutil
from typing import Dict, List, Any, Callable, Optional
from dataclasses import dataclass, asdict
from enum import Enum
import openai
class ToolParameterType(Enum):
STRING = "string"
INTEGER = "integer"
BOOLEAN = "boolean"
ARRAY = "array"
OBJECT = "object"
@dataclass
class ToolParameter:
name: str
type: ToolParameterType
description: str
required: bool = True
enum_values: Optional[List[str]] = None
default_value: Any = None
@dataclass
class ToolDefinition:
name: str
description: str
parameters: List[ToolParameter]
function: Callable
requires_confirmation: bool = False
safety_level: str = "safe" # safe, warning, dangerous
class ToolRegistry:
def __init__(self):
self.tools = {}
self.execution_history = []
def register_tool(self, tool_def: ToolDefinition):
"""Register a tool with the registry"""
self.tools[tool_def.name] = tool_def
def get_tool_schemas(self) -> List[Dict]:
"""Generate OpenAI function calling schemas for all registered tools"""
schemas = []
for tool_name, tool_def in self.tools.items():
schema = {
"type": "function",
"function": {
"name": tool_name,
"description": tool_def.description,
"parameters": {
"type": "object",
"properties": {},
"required": []
}
}
}
for param in tool_def.parameters:
param_schema = {
"type": param.type.value,
"description": param.description
}
if param.enum_values:
param_schema["enum"] = param.enum_values
schema["function"]["parameters"]["properties"][param.name] = param_schema
if param.required:
schema["function"]["parameters"]["required"].append(param.name)
schemas.append(schema)
return schemas
def execute_tool(self, tool_name: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a tool with given parameters"""
if tool_name not in self.tools:
return {"error": f"Tool '{tool_name}' not found"}
tool_def = self.tools[tool_name]
# Validate parameters
validation_result = self._validate_parameters(tool_def, parameters)
if not validation_result["valid"]:
return {"error": f"Parameter validation failed: {validation_result['errors']}"}
# Check safety requirements
if tool_def.requires_confirmation and not parameters.get("confirmed", False):
return {
"requires_confirmation": True,
"message": f"Tool '{tool_name}' requires explicit confirmation due to safety level: {tool_def.safety_level}",
"tool_name": tool_name,
"parameters": parameters
}
try:
# Drop the confirmation flag before dispatch so it is not passed to the tool function
call_args = {k: v for k, v in parameters.items() if k != "confirmed"}
result = tool_def.function(**call_args)
# Record execution
self.execution_history.append({
"tool_name": tool_name,
"parameters": parameters,
"result": result,
"timestamp": time.time()
})
return result
except Exception as e:
error_result = {"error": f"Tool execution failed: {str(e)}"}
self.execution_history.append({
"tool_name": tool_name,
"parameters": parameters,
"result": error_result,
"timestamp": time.time()
})
return error_result
def _validate_parameters(self, tool_def: ToolDefinition, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Validate parameters against tool definition"""
errors = []
# Check required parameters
for param in tool_def.parameters:
if param.required and param.name not in parameters:
errors.append(f"Required parameter '{param.name}' missing")
if param.name in parameters:
value = parameters[param.name]
# Type validation
if param.type == ToolParameterType.STRING and not isinstance(value, str):
errors.append(f"Parameter '{param.name}' must be a string")
elif param.type == ToolParameterType.INTEGER and not isinstance(value, int):
errors.append(f"Parameter '{param.name}' must be an integer")
elif param.type == ToolParameterType.BOOLEAN and not isinstance(value, bool):
errors.append(f"Parameter '{param.name}' must be a boolean")
elif param.type == ToolParameterType.ARRAY and not isinstance(value, list):
errors.append(f"Parameter '{param.name}' must be an array")
# Enum validation
if param.enum_values and value not in param.enum_values:
errors.append(f"Parameter '{param.name}' must be one of: {param.enum_values}")
return {"valid": len(errors) == 0, "errors": errors}
class OSControlTools:
def __init__(self):
self.registry = ToolRegistry()
self._register_all_tools()
def _register_all_tools(self):
"""Register all available OS control tools"""
# File creation tool
self.registry.register_tool(ToolDefinition(
name="create_file",
description="Create a new file with optional content",
parameters=[
ToolParameter("filename", ToolParameterType.STRING, "Name of the file to create"),
ToolParameter("content", ToolParameterType.STRING, "Content to write to the file", required=False, default_value=""),
ToolParameter("directory", ToolParameterType.STRING, "Directory where to create the file", required=False)
],
function=self._create_file,
safety_level="safe"
))
# Directory listing tool
self.registry.register_tool(ToolDefinition(
name="list_directory",
description="List contents of a directory",
parameters=[
ToolParameter("path", ToolParameterType.STRING, "Path to the directory to list", required=False, default_value="."),
ToolParameter("show_hidden", ToolParameterType.BOOLEAN, "Whether to show hidden files", required=False, default_value=False)
],
function=self._list_directory,
safety_level="safe"
))
# Application launch tool
self.registry.register_tool(ToolDefinition(
name="launch_application",
description="Launch an application or program",
parameters=[
ToolParameter("application", ToolParameterType.STRING, "Name or path of the application to launch"),
ToolParameter("arguments", ToolParameterType.ARRAY, "Command line arguments", required=False, default_value=[]),
ToolParameter("wait_for_completion", ToolParameterType.BOOLEAN, "Wait for application to complete", required=False, default_value=False)
],
function=self._launch_application,
safety_level="warning"
))
# File deletion tool
self.registry.register_tool(ToolDefinition(
name="delete_file",
description="Delete a file or directory",
parameters=[
ToolParameter("path", ToolParameterType.STRING, "Path to the file or directory to delete"),
ToolParameter("recursive", ToolParameterType.BOOLEAN, "Delete directories recursively", required=False, default_value=False)
],
function=self._delete_file,
requires_confirmation=True,
safety_level="dangerous"
))
# System information tool
self.registry.register_tool(ToolDefinition(
name="get_system_info",
description="Get system information including CPU, memory, and disk usage",
parameters=[
ToolParameter("info_type", ToolParameterType.STRING, "Type of information to retrieve",
required=False, enum_values=["cpu", "memory", "disk", "all"], default_value="all")
],
function=self._get_system_info,
safety_level="safe"
))
def _create_file(self, filename: str, content: str = "", directory: str = None) -> Dict[str, Any]:
"""Implementation of file creation tool"""
try:
if directory:
file_path = os.path.join(directory, filename)
os.makedirs(directory, exist_ok=True)
else:
file_path = filename
if os.path.exists(file_path):
return {"error": f"File '{file_path}' already exists"}
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
return {
"success": True,
"message": f"File '{file_path}' created successfully",
"path": os.path.abspath(file_path),
"size": len(content.encode('utf-8'))
}
except PermissionError:
return {"error": f"Permission denied creating file '{file_path}'"}
except Exception as e:
return {"error": f"Failed to create file: {str(e)}"}
def _list_directory(self, path: str = ".", show_hidden: bool = False) -> Dict[str, Any]:
"""Implementation of directory listing tool"""
try:
if not os.path.exists(path):
return {"error": f"Directory '{path}' does not exist"}
if not os.path.isdir(path):
return {"error": f"'{path}' is not a directory"}
items = []
for item_name in os.listdir(path):
if not show_hidden and item_name.startswith('.'):
continue
item_path = os.path.join(path, item_name)
item_stat = os.stat(item_path)
items.append({
"name": item_name,
"type": "directory" if os.path.isdir(item_path) else "file",
"size": item_stat.st_size,
"modified": item_stat.st_mtime,
"permissions": oct(item_stat.st_mode)[-3:]
})
return {
"success": True,
"directory": os.path.abspath(path),
"item_count": len(items),
"items": items
}
except PermissionError:
return {"error": f"Permission denied accessing directory '{path}'"}
except Exception as e:
return {"error": f"Failed to list directory: {str(e)}"}
def _launch_application(self, application: str, arguments: List[str] = None,
wait_for_completion: bool = False) -> Dict[str, Any]:
"""Implementation of application launch tool"""
try:
if arguments is None:
arguments = []
cmd = [application] + arguments
if wait_for_completion:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
return {
"success": True,
"message": f"Application '{application}' completed",
"return_code": result.returncode,
"stdout": result.stdout,
"stderr": result.stderr
}
else:
process = subprocess.Popen(cmd)
return {
"success": True,
"message": f"Application '{application}' launched",
"pid": process.pid,
"command": cmd
}
except FileNotFoundError:
return {"error": f"Application '{application}' not found"}
except subprocess.TimeoutExpired:
return {"error": f"Application '{application}' timed out"}
except Exception as e:
return {"error": f"Failed to launch application: {str(e)}"}
def _delete_file(self, path: str, recursive: bool = False) -> Dict[str, Any]:
"""Implementation of file deletion tool"""
try:
if not os.path.exists(path):
return {"error": f"Path '{path}' does not exist"}
# Safety check for critical system paths
critical_paths = ['/etc', '/bin', '/sbin', '/usr/bin', '/boot', '/sys', '/proc']
abs_path = os.path.abspath(path)
for critical_path in critical_paths:
if abs_path == critical_path or abs_path.startswith(critical_path + os.sep):
return {"error": f"Cannot delete critical system path '{path}'"}
if os.path.isfile(path):
os.remove(path)
return {
"success": True,
"message": f"File '{path}' deleted successfully",
"type": "file"
}
elif os.path.isdir(path):
if recursive:
import shutil
shutil.rmtree(path)
return {
"success": True,
"message": f"Directory '{path}' deleted recursively",
"type": "directory"
}
else:
os.rmdir(path)
return {
"success": True,
"message": f"Empty directory '{path}' deleted",
"type": "directory"
}
except PermissionError:
return {"error": f"Permission denied deleting '{path}'"}
except OSError as e:
import errno
if e.errno == errno.ENOTEMPTY:  # Directory not empty (portable, unlike hardcoding errno 39)
return {"error": f"Directory '{path}' is not empty. Use recursive=true to delete"}
return {"error": f"Failed to delete '{path}': {str(e)}"}
except Exception as e:
return {"error": f"Failed to delete '{path}': {str(e)}"}
def _get_system_info(self, info_type: str = "all") -> Dict[str, Any]:
"""Implementation of system information tool"""
try:
info = {}
if info_type in ["cpu", "all"]:
info["cpu"] = {
"usage_percent": psutil.cpu_percent(interval=1),
"count": psutil.cpu_count(),
"frequency": psutil.cpu_freq()._asdict() if psutil.cpu_freq() else None
}
if info_type in ["memory", "all"]:
memory = psutil.virtual_memory()
info["memory"] = {
"total": memory.total,
"available": memory.available,
"used": memory.used,
"percent": memory.percent
}
if info_type in ["disk", "all"]:
disk = psutil.disk_usage('/')
info["disk"] = {
"total": disk.total,
"used": disk.used,
"free": disk.free,
"percent": (disk.used / disk.total) * 100
}
return {
"success": True,
"system_info": info,
"timestamp": time.time()
}
except Exception as e:
return {"error": f"Failed to get system information: {str(e)}"}
class LLMAgentWithToolCalling:
    def __init__(self, openai_api_key: str):
        self.client = openai.OpenAI(api_key=openai_api_key)
        self.os_tools = OSControlTools()
        self.conversation_history = []
        self.pending_confirmations = {}

    def process_user_command(self, user_input: str) -> Dict[str, Any]:
        """Process a user command using tool calling"""
        try:
            # Add user message to conversation history
            self.conversation_history.append({
                "role": "user",
                "content": user_input
            })
            # Get tool schemas for the LLM
            tool_schemas = self.os_tools.registry.get_tool_schemas()
            # Call the LLM with tool calling capability
            response = self.client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {
                        "role": "system",
                        "content": """You are an AI assistant that can control operating system functions through tool calls.
When a user requests an operation, use the appropriate tools to accomplish their goal.
Always explain what you're doing and ask for confirmation for potentially dangerous operations.
If a tool requires confirmation, inform the user and wait for their approval."""
                    }
                ] + self.conversation_history,
                tools=tool_schemas,
                tool_choice="auto"
            )
            response_message = response.choices[0].message
            # Add assistant response to conversation history; include tool_calls
            # only when present, since the API rejects a null tool_calls field
            assistant_message = {"role": "assistant", "content": response_message.content}
            if response_message.tool_calls:
                assistant_message["tool_calls"] = response_message.tool_calls
            self.conversation_history.append(assistant_message)
            results = []
            # Process tool calls if any
            if response_message.tool_calls:
                for tool_call in response_message.tool_calls:
                    function_name = tool_call.function.name
                    function_args = json.loads(tool_call.function.arguments)
                    # Execute the tool
                    tool_result = self.os_tools.registry.execute_tool(function_name, function_args)
                    # Handle confirmation requirements
                    if tool_result.get("requires_confirmation"):
                        confirmation_id = f"confirm_{len(self.pending_confirmations)}"
                        self.pending_confirmations[confirmation_id] = {
                            "tool_name": function_name,
                            "parameters": function_args,
                            "tool_call_id": tool_call.id
                        }
                        tool_result["confirmation_id"] = confirmation_id
                    results.append({
                        "tool_call_id": tool_call.id,
                        "function_name": function_name,
                        "arguments": function_args,
                        "result": tool_result
                    })
                    # Add tool result to conversation history
                    self.conversation_history.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": json.dumps(tool_result)
                    })
            return {
                "success": True,
                "response": response_message.content,
                "tool_results": results,
                "requires_confirmation": any(r["result"].get("requires_confirmation") for r in results)
            }
        except Exception as e:
            return {"error": f"Failed to process command: {str(e)}"}

    def confirm_operation(self, confirmation_id: str, confirmed: bool) -> Dict[str, Any]:
        """Confirm or deny a pending operation"""
        if confirmation_id not in self.pending_confirmations:
            return {"error": "Invalid confirmation ID"}
        pending_op = self.pending_confirmations[confirmation_id]
        if confirmed:
            # Add confirmation parameter and execute
            pending_op["parameters"]["confirmed"] = True
            result = self.os_tools.registry.execute_tool(
                pending_op["tool_name"],
                pending_op["parameters"]
            )
            # Add result to conversation history
            self.conversation_history.append({
                "role": "tool",
                "tool_call_id": pending_op["tool_call_id"],
                "content": json.dumps(result)
            })
            del self.pending_confirmations[confirmation_id]
            return {"success": True, "result": result, "message": "Operation confirmed and executed"}
        else:
            del self.pending_confirmations[confirmation_id]
            return {"success": True, "message": "Operation cancelled by user"}
This comprehensive implementation demonstrates how tool calling can be used to create a robust LLM agent for operating system control. The ToolRegistry class manages the available tools and their schemas, while the OSControlTools class implements specific operating system functions as callable tools.
The tool calling approach provides several key advantages. The LLM receives structured information about available functions, including parameter types and descriptions, which enables more accurate function selection and parameter extraction. The system can handle complex multi-step operations by calling multiple tools in sequence, and it provides built-in safety mechanisms through confirmation requirements and parameter validation.
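To make this concrete, a single entry from `get_tool_schemas()` might look like the following, a hypothetical schema for the delete tool shown earlier in the OpenAI function-calling format. The outer field names (`type`, `function`, `parameters`) are fixed by the API; the tool name and descriptions here are illustrative:

```python
# Hypothetical schema entry for the delete tool, in OpenAI function-calling format.
delete_path_schema = {
    "type": "function",
    "function": {
        "name": "delete_path",
        "description": "Delete a file or directory, refusing critical system paths.",
        "parameters": {
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "Path to delete"},
                "recursive": {
                    "type": "boolean",
                    "description": "Delete non-empty directories recursively",
                },
            },
            "required": ["path"],  # recursive defaults to false when omitted
        },
    },
}
```

Because the schema declares `recursive` as a boolean with a description, the model can infer from a phrase like "delete the whole folder" that it should pass `recursive: true` rather than asking the user for a flag.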
The LLMAgentWithToolCalling class demonstrates how to integrate tool calling with conversation management and safety confirmations. When the LLM determines that a tool should be called, it provides the function name and parameters in a structured format that can be directly executed by the tool registry.
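The confirmation mechanism can also be isolated into a small, self-contained sketch. The `ConfirmationGate` name and the `execute` callback below are illustrative, not part of the classes above: a dangerous call is parked under an ID, then either executed with `confirmed=True` or discarded.

```python
import uuid

class ConfirmationGate:
    """Minimal sketch of the deferred-execution pattern: park, then approve or cancel."""

    def __init__(self):
        self.pending = {}

    def request(self, tool_name, parameters):
        """Park a dangerous operation and hand back an ID for the user to approve."""
        confirmation_id = f"confirm_{uuid.uuid4().hex[:8]}"
        self.pending[confirmation_id] = (tool_name, parameters)
        return confirmation_id

    def resolve(self, confirmation_id, confirmed, execute):
        """Execute or discard the parked operation; `execute` is the caller's dispatcher."""
        if confirmation_id not in self.pending:
            return {"error": "Invalid confirmation ID"}
        tool_name, parameters = self.pending.pop(confirmation_id)
        if not confirmed:
            return {"success": True, "message": "Operation cancelled by user"}
        return execute(tool_name, {**parameters, "confirmed": True})
```

In the agent above, the tool registry's `execute_tool` plays the role of the `execute` callback; popping the entry on either outcome ensures a confirmation ID cannot be replayed.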
Advanced Tool Calling Patterns
Tool calling can be extended to support more sophisticated patterns including tool chaining, conditional execution, and dynamic tool generation. These advanced patterns enable the creation of more capable agents that can handle complex workflows and adapt to different operating environments.
Tool chaining allows the output of one tool to be used as input for another tool, enabling complex multi-step operations. Conditional execution enables tools to make decisions based on the results of previous operations. Dynamic tool generation allows the system to create new tools based on user requirements or system capabilities.
Here is an implementation example that demonstrates these advanced patterns:
import time
import uuid
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum

class WorkflowStepType(Enum):
    TOOL_CALL = "tool_call"
    CONDITION = "condition"
    LOOP = "loop"
    PARALLEL = "parallel"

@dataclass
class WorkflowStep:
    step_id: str
    step_type: WorkflowStepType
    tool_name: Optional[str] = None
    parameters: Optional[Dict[str, Any]] = None
    condition: Optional[str] = None
    sub_steps: Optional[List['WorkflowStep']] = None
    depends_on: Optional[List[str]] = None
class WorkflowEngine:
    def __init__(self, tool_registry: ToolRegistry):
        self.tool_registry = tool_registry
        self.workflow_history = []
        self.variable_context = {}

    def execute_workflow(self, steps: List[WorkflowStep]) -> Dict[str, Any]:
        """Execute a workflow consisting of multiple steps"""
        workflow_id = str(uuid.uuid4())
        execution_context = {
            "workflow_id": workflow_id,
            "start_time": time.time(),
            "steps_completed": [],
            "step_results": {},
            "variables": self.variable_context.copy()
        }
        try:
            for step in steps:
                if self._check_dependencies(step, execution_context):
                    result = self._execute_step(step, execution_context)
                    execution_context["step_results"][step.step_id] = result
                    execution_context["steps_completed"].append(step.step_id)
                    if not result.get("success", False):
                        return {
                            "success": False,
                            "error": f"Step {step.step_id} failed: {result.get('error', 'Unknown error')}",
                            "execution_context": execution_context
                        }
            execution_context["end_time"] = time.time()
            execution_context["duration"] = execution_context["end_time"] - execution_context["start_time"]
            self.workflow_history.append(execution_context)
            return {
                "success": True,
                "workflow_id": workflow_id,
                "execution_context": execution_context
            }
        except Exception as e:
            return {
                "success": False,
                "error": f"Workflow execution failed: {str(e)}",
                "execution_context": execution_context
            }

    def _check_dependencies(self, step: WorkflowStep, context: Dict[str, Any]) -> bool:
        """Check if step dependencies are satisfied"""
        if not step.depends_on:
            return True
        return all(dep_id in context["steps_completed"] for dep_id in step.depends_on)

    def _execute_step(self, step: WorkflowStep, context: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a single workflow step"""
        if step.step_type == WorkflowStepType.TOOL_CALL:
            return self._execute_tool_step(step, context)
        elif step.step_type == WorkflowStepType.CONDITION:
            return self._execute_condition_step(step, context)
        elif step.step_type == WorkflowStepType.PARALLEL:
            return self._execute_parallel_step(step, context)
        else:
            return {"success": False, "error": f"Unsupported step type: {step.step_type}"}

    def _execute_tool_step(self, step: WorkflowStep, context: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a tool call step with variable substitution"""
        # Substitute variables in parameters
        resolved_params = self._resolve_variables(step.parameters, context)
        # Execute the tool
        result = self.tool_registry.execute_tool(step.tool_name, resolved_params)
        # Store result variables
        if result.get("success") and "variables" in result:
            context["variables"].update(result["variables"])
        return result

    def _resolve_variables(self, parameters: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        """Resolve variable references in parameters"""
        if not parameters:
            return {}
        resolved = {}
        for key, value in parameters.items():
            if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
                var_name = value[2:-1]
                if var_name in context["variables"]:
                    resolved[key] = context["variables"][var_name]
                elif var_name in context["step_results"]:
                    resolved[key] = context["step_results"][var_name]
                else:
                    resolved[key] = value  # Keep original if variable not found
            else:
                resolved[key] = value
        return resolved

    def _execute_condition_step(self, step: WorkflowStep, context: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a conditional step"""
        # This would implement condition evaluation logic
        # For now, we'll return a simple success
        return {"success": True, "message": "Condition step executed"}

    def _execute_parallel_step(self, step: WorkflowStep, context: Dict[str, Any]) -> Dict[str, Any]:
        """Execute parallel sub-steps"""
        # This would implement parallel execution logic
        # For now, we'll execute sequentially
        results = []
        for sub_step in step.sub_steps or []:
            result = self._execute_step(sub_step, context)
            results.append(result)
        return {"success": True, "sub_results": results}
class AdvancedLLMAgent:
    def __init__(self, openai_api_key: str):
        self.client = openai.OpenAI(api_key=openai_api_key)
        self.os_tools = OSControlTools()
        self.workflow_engine = WorkflowEngine(self.os_tools.registry)
        self.conversation_history = []

    def process_complex_command(self, user_input: str) -> Dict[str, Any]:
        """Process complex commands that may require multiple tool calls"""
        try:
            # Enhanced system prompt for complex operations
            system_prompt = """You are an advanced AI assistant that can control operating systems through tool calls.
For complex operations, you can plan and execute multi-step workflows.
Available capabilities:
1. Single tool calls for simple operations
2. Multi-step workflows for complex operations
3. Variable passing between steps
4. Conditional execution based on results
When planning complex operations:
1. Break down the task into logical steps
2. Identify dependencies between steps
3. Plan for error handling and rollback if needed
4. Use variables to pass data between steps
Always explain your plan before executing and ask for confirmation for potentially dangerous operations."""
            self.conversation_history.append({"role": "user", "content": user_input})
            # Get tool schemas
            tool_schemas = self.os_tools.registry.get_tool_schemas()
            # Add workflow planning tool
            workflow_tool = {
                "type": "function",
                "function": {
                    "name": "plan_workflow",
                    "description": "Plan a multi-step workflow for complex operations",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "steps": {
                                "type": "array",
                                "description": "List of workflow steps",
                                "items": {
                                    "type": "object",
                                    "properties": {
                                        "step_id": {"type": "string"},
                                        "tool_name": {"type": "string"},
                                        "parameters": {"type": "object"},
                                        "depends_on": {"type": "array", "items": {"type": "string"}}
                                    }
                                }
                            },
                            "description": {"type": "string", "description": "Description of the workflow"}
                        },
                        "required": ["steps", "description"]
                    }
                }
            }
            all_tools = tool_schemas + [workflow_tool]
            response = self.client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "system", "content": system_prompt}] + self.conversation_history,
                tools=all_tools,
                tool_choice="auto"
            )
            response_message = response.choices[0].message
            # Include tool_calls only when present; the API rejects a null tool_calls field
            assistant_message = {"role": "assistant", "content": response_message.content}
            if response_message.tool_calls:
                assistant_message["tool_calls"] = response_message.tool_calls
            self.conversation_history.append(assistant_message)
            results = []
            if response_message.tool_calls:
                for tool_call in response_message.tool_calls:
                    function_name = tool_call.function.name
                    function_args = json.loads(tool_call.function.arguments)
                    if function_name == "plan_workflow":
                        # Execute a planned multi-step workflow
                        tool_result = self._execute_planned_workflow(function_args)
                    else:
                        # Execute a single tool
                        tool_result = self.os_tools.registry.execute_tool(function_name, function_args)
                    results.append({
                        "tool_call_id": tool_call.id,
                        "function_name": function_name,
                        "result": tool_result
                    })
                    # Record the tool result so later API calls see a complete history
                    self.conversation_history.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": json.dumps(tool_result)
                    })
            return {
                "success": True,
                "response": response_message.content,
                "tool_results": results
            }
        except Exception as e:
            return {"error": f"Failed to process complex command: {str(e)}"}

    def _execute_planned_workflow(self, workflow_plan: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a planned workflow"""
        try:
            steps = []
            for step_data in workflow_plan["steps"]:
                step = WorkflowStep(
                    step_id=step_data["step_id"],
                    step_type=WorkflowStepType.TOOL_CALL,
                    tool_name=step_data["tool_name"],
                    parameters=step_data["parameters"],
                    depends_on=step_data.get("depends_on", [])
                )
                steps.append(step)
            return self.workflow_engine.execute_workflow(steps)
        except Exception as e:
            return {"success": False, "error": f"Workflow execution failed: {str(e)}"}
This advanced implementation demonstrates how tool calling can be extended to support complex workflows and multi-step operations. The WorkflowEngine class manages the execution of complex workflows that involve multiple tools, variable passing between steps, and dependency management.
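For illustration, the arguments the LLM might pass to `plan_workflow` for a request like "create a project folder with a README" could look like this. The `create_directory` and `write_file` tool names are hypothetical and assume matching entries in the tool registry:

```python
# Hypothetical plan_workflow arguments, matching the schema defined above.
workflow_plan = {
    "description": "Create a project directory and add a README",
    "steps": [
        {
            "step_id": "make_dir",
            "tool_name": "create_directory",  # assumed registry entry
            "parameters": {"path": "/tmp/demo_project"},
        },
        {
            "step_id": "write_readme",
            "tool_name": "write_file",  # assumed registry entry
            "parameters": {"path": "/tmp/demo_project/README.md", "content": "# Demo"},
            "depends_on": ["make_dir"],  # runs only after make_dir completes
        },
    ],
}
```

`_execute_planned_workflow` would convert each entry into a `WorkflowStep`, and the engine's dependency check guarantees the README is only written after its directory exists.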
The workflow system enables the LLM to plan and execute sophisticated operations that require multiple steps, such as creating a project directory structure, setting up development environments, or performing complex file management tasks. The variable resolution system allows data to flow between workflow steps, enabling more dynamic and flexible operations.
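The substitution mechanism itself is simple enough to show standalone. This is a minimal sketch mirroring `_resolve_variables`; the `resolve_variables` function name and example values are illustrative:

```python
def resolve_variables(parameters: dict, variables: dict) -> dict:
    """Replace string values of the form ${name} with entries from the variable context."""
    resolved = {}
    for key, value in parameters.items():
        if isinstance(value, str) and value.startswith("${") and value.endswith("}"):
            # Fall back to the literal string when the variable is unknown
            resolved[key] = variables.get(value[2:-1], value)
        else:
            resolved[key] = value
    return resolved

params = {"path": "${project_dir}", "recursive": True}
print(resolve_variables(params, {"project_dir": "/tmp/myproject"}))
# → {'path': '/tmp/myproject', 'recursive': True}
```

An earlier step could thus publish `project_dir` into the variable context, and every later step that writes `"${project_dir}"` in its parameters receives the concrete path at execution time.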
Tool calling represents a significant advancement in LLM agent capabilities, providing a structured and reliable method for implementing operating system control while maintaining safety and flexibility. This approach enables the creation of more sophisticated agents that can handle complex user requests while providing clear audit trails and error handling mechanisms.
Future Considerations and Limitations
The development of LLM agents for operating system control represents an evolving field with significant potential for advancement and several important limitations that must be acknowledged. Current implementations face challenges related to the accuracy of natural language understanding, the complexity of mapping intentions to system operations, and the inherent security risks of automated command execution.
Future developments may include improved language models with better understanding of technical contexts, more sophisticated safety mechanisms that can handle complex scenarios, and enhanced integration with operating system APIs for more comprehensive control capabilities. However, fundamental challenges remain around ensuring safety, handling ambiguous commands, and maintaining system security.
The limitations of current approaches include dependency on external language model services, potential for misinterpretation of user intentions, limited ability to handle complex multi-step operations, and challenges in maintaining consistency across different operating systems and configurations. These limitations must be carefully considered when deploying such systems in production environments.
The field continues to evolve rapidly, with ongoing research into more reliable natural language understanding, stronger safety mechanisms, and more efficient execution strategies, alongside improving operating system APIs and a growing body of knowledge about how people interact with natural language interfaces.
Understanding these considerations and limitations is essential for software engineers working on similar systems, as it helps inform design decisions, set appropriate expectations, and identify areas where additional research and development efforts may be most beneficial.