INTRODUCTION AND PROBLEM STATEMENT
System administration has traditionally required deep technical knowledge of command-line interfaces, configuration file formats, and operating system internals. Users often struggle with tasks as simple as changing network settings or configuring system services because they must translate their high-level intentions into precise technical commands.
The emergence of Large Language Models presents an opportunity to create intelligent interfaces that can understand natural language requests and translate them into appropriate system configuration changes. However, building such a system requires careful consideration of safety, security, and architectural design to prevent catastrophic system damage while maintaining flexibility and usability.
This article presents a comprehensive design for an LLM-powered chatbot that can safely modify system configurations through natural language prompts. We will explore the architecture, implementation details, and safety mechanisms necessary to build such a system, using macOS as a concrete example while maintaining clear separation between operating system neutral and operating system specific components.
SYSTEM ARCHITECTURE OVERVIEW
The system follows a layered architecture that separates concerns and maintains clear boundaries between different responsibilities. The architecture consists of five primary layers, each with distinct responsibilities and interfaces.
The Presentation Layer handles user interaction and natural language input processing. This layer receives user prompts, manages conversation context, and presents results back to the user in an understandable format. It serves as the primary interface between human users and the underlying system.
The Intent Recognition Layer processes natural language input and extracts structured intent from user requests. This layer utilizes Large Language Model capabilities to understand user intentions, identify target system components, and extract relevant parameters for configuration changes.
The Command Translation Layer converts structured intent into abstract system operations. This layer maintains a mapping between user intentions and system capabilities while remaining operating system agnostic. It validates user requests against system capabilities and generates abstract command representations.
The Safety and Validation Layer implements security checks, permission validation, and change impact assessment. This critical layer prevents dangerous operations, validates user permissions, and provides mechanisms for change rollback and system recovery.
The System Interface Layer provides the actual implementation of system configuration changes. This layer contains operating system specific code and handles the translation from abstract operations to concrete system commands and configuration file modifications.
CORE COMPONENTS DEEP DIVE
The Natural Language Processor serves as the primary interface between human language and system understanding. This component leverages Large Language Model capabilities to parse user requests and extract structured information about intended system changes.
The processor maintains a context window that tracks conversation history, allowing users to make follow-up requests and clarifications. It implements prompt engineering techniques to guide the LLM toward generating structured output that can be reliably parsed by downstream components.
class NaturalLanguageProcessor:
"""
Processes natural language input and extracts structured intent
for system configuration changes.
"""
def __init__(self, llm_client, context_manager):
self.llm_client = llm_client
self.context_manager = context_manager
self.system_prompt = self._build_system_prompt()
def process_request(self, user_input, conversation_id):
"""
Process a natural language request and extract structured intent.
Args:
user_input (str): The user's natural language request
conversation_id (str): Unique identifier for the conversation
Returns:
dict: Structured intent containing action, target, and parameters
"""
context = self.context_manager.get_context(conversation_id)
# Build the complete prompt with context and user input
full_prompt = self._build_prompt_with_context(
user_input, context, self.system_prompt
)
# Get LLM response
llm_response = self.llm_client.generate_completion(full_prompt)
# Parse the structured response
intent = self._parse_llm_response(llm_response)
# Update conversation context
self.context_manager.update_context(
conversation_id, user_input, intent
)
return intent
def _build_system_prompt(self):
"""Build the system prompt that guides LLM behavior."""
return """
You are a system configuration assistant. Your role is to understand
user requests for system configuration changes and convert them into
structured JSON format.
Always respond with valid JSON containing:
- action: The type of operation (get, set, enable, disable, etc.)
- category: The system category (network, display, audio, etc.)
- target: The specific setting or service
- parameters: Any additional parameters needed
- confidence: Your confidence level (0.0 to 1.0)
If you cannot understand the request or it seems dangerous,
set confidence to 0.0 and explain in the 'reason' field.
"""
The Intent Validator ensures that extracted intentions are safe, valid, and within the scope of allowed operations. This component implements a comprehensive validation framework that checks user permissions, validates parameter values, and assesses the potential impact of requested changes.
class IntentValidator:
"""
Validates extracted intent against safety rules and system capabilities.
"""
def __init__(self, permission_manager, capability_registry):
self.permission_manager = permission_manager
self.capability_registry = capability_registry
self.safety_rules = self._load_safety_rules()
def validate_intent(self, intent, user_context):
"""
Validate an intent against safety rules and user permissions.
Args:
intent (dict): The structured intent to validate
user_context (dict): User information and permissions
Returns:
ValidationResult: Contains validation status and any issues
"""
validation_result = ValidationResult()
# Check if the user has permission for this operation
if not self._check_permissions(intent, user_context):
validation_result.add_error(
"Insufficient permissions for requested operation"
)
return validation_result
# Validate against safety rules
safety_check = self._check_safety_rules(intent)
if not safety_check.is_safe:
validation_result.add_error(f"Safety violation: {safety_check.reason}")
return validation_result
# Check if the system supports this operation
if not self._check_capability_support(intent):
validation_result.add_error(
"Requested operation is not supported on this system"
)
return validation_result
# Validate parameter values
param_validation = self._validate_parameters(intent)
if not param_validation.is_valid:
validation_result.add_error(
f"Invalid parameters: {param_validation.errors}"
)
return validation_result
validation_result.mark_valid()
return validation_result
def _check_safety_rules(self, intent):
"""Check intent against predefined safety rules."""
for rule in self.safety_rules:
if rule.matches(intent) and not rule.is_safe(intent):
return SafetyCheckResult(False, rule.get_reason())
return SafetyCheckResult(True, "Passed all safety checks")
The Command Executor serves as the bridge between validated intentions and actual system modifications. This component maintains a registry of available operations and delegates execution to appropriate operating system specific handlers.
class CommandExecutor:
"""
Executes validated system configuration commands.
"""
def __init__(self, os_interface, backup_manager):
self.os_interface = os_interface
self.backup_manager = backup_manager
self.execution_history = []
def execute_command(self, validated_intent, execution_context):
"""
Execute a validated system configuration command.
Args:
validated_intent (dict): The validated intent to execute
execution_context (dict): Execution context and metadata
Returns:
ExecutionResult: Contains execution status and results
"""
execution_id = self._generate_execution_id()
try:
# Create backup before making changes
backup_info = self.backup_manager.create_backup(
validated_intent, execution_id
)
# Execute the actual command
result = self.os_interface.execute_configuration_change(
validated_intent
)
# Record the execution
execution_record = ExecutionRecord(
execution_id=execution_id,
intent=validated_intent,
result=result,
backup_info=backup_info,
timestamp=datetime.now()
)
self.execution_history.append(execution_record)
return ExecutionResult(
success=True,
execution_id=execution_id,
result=result,
backup_id=backup_info.backup_id
)
except Exception as e:
# Handle execution failure
error_result = ExecutionResult(
success=False,
execution_id=execution_id,
error=str(e)
)
# Attempt rollback if backup was created
if 'backup_info' in locals():
self._attempt_rollback(backup_info, execution_id)
return error_result
OS-NEUTRAL IMPLEMENTATION
The operating system neutral components provide a foundation that can work across different platforms while maintaining consistent interfaces and behavior. These components handle the core logic of natural language processing, intent validation, and command orchestration without depending on specific operating system features.
The Configuration Schema Manager maintains a platform-independent representation of system configuration options. This component defines abstract configuration categories, parameter types, and validation rules that can be mapped to specific operating system implementations.
class ConfigurationSchema:
"""
Defines the abstract schema for system configuration options.
"""
def __init__(self):
self.categories = self._initialize_categories()
self.parameter_types = self._initialize_parameter_types()
self.validation_rules = self._initialize_validation_rules()
def _initialize_categories(self):
"""Initialize the standard configuration categories."""
return {
'network': NetworkCategory(),
'display': DisplayCategory(),
'audio': AudioCategory(),
'security': SecurityCategory(),
'system': SystemCategory(),
'user': UserCategory()
}
def get_category_schema(self, category_name):
"""Get the schema definition for a specific category."""
if category_name not in self.categories:
raise ValueError(f"Unknown configuration category: {category_name}")
return self.categories[category_name].get_schema()
def validate_parameter(self, category, parameter, value):
"""Validate a parameter value against the schema."""
category_schema = self.get_category_schema(category)
if parameter not in category_schema.parameters:
return ValidationResult(
False, f"Unknown parameter: {parameter}"
)
param_def = category_schema.parameters[parameter]
return param_def.validate(value)
class NetworkCategory:
"""Defines network-related configuration options."""
def get_schema(self):
return CategorySchema(
name='network',
description='Network and connectivity settings',
parameters={
'wifi_enabled': BooleanParameter(
description='Enable or disable WiFi',
default=True
),
'wifi_ssid': StringParameter(
description='WiFi network name to connect to',
validation_pattern=r'^[a-zA-Z0-9\s\-_]{1,32}$'
),
'dns_servers': ListParameter(
description='DNS server addresses',
item_type=IPAddressParameter()
),
'proxy_enabled': BooleanParameter(
description='Enable HTTP proxy',
default=False
),
'proxy_host': StringParameter(
description='Proxy server hostname',
required_when='proxy_enabled=True'
),
'proxy_port': IntegerParameter(
description='Proxy server port',
min_value=1,
max_value=65535,
required_when='proxy_enabled=True'
)
}
)
The Abstract Command Interface provides a standardized way to represent system configuration operations without tying them to specific operating system implementations. This interface allows the same high-level logic to work across different platforms.
class AbstractCommand:
"""
Abstract representation of a system configuration command.
"""
def __init__(self, action, category, target, parameters=None):
self.action = action
self.category = category
self.target = target
self.parameters = parameters or {}
self.command_id = self._generate_command_id()
def to_dict(self):
"""Convert command to dictionary representation."""
return {
'command_id': self.command_id,
'action': self.action,
'category': self.category,
'target': self.target,
'parameters': self.parameters
}
def validate_against_schema(self, schema_manager):
"""Validate this command against the configuration schema."""
try:
category_schema = schema_manager.get_category_schema(self.category)
# Validate that the target exists in the category
if self.target not in category_schema.parameters:
return ValidationResult(
False, f"Unknown target '{self.target}' in category '{self.category}'"
)
# Validate parameters
for param_name, param_value in self.parameters.items():
validation_result = schema_manager.validate_parameter(
self.category, param_name, param_value
)
if not validation_result.is_valid:
return validation_result
return ValidationResult(True, "Command validation successful")
except Exception as e:
return ValidationResult(False, f"Validation error: {str(e)}")
class CommandFactory:
"""
Factory for creating abstract commands from validated intents.
"""
def __init__(self, schema_manager):
self.schema_manager = schema_manager
def create_command(self, validated_intent):
"""Create an abstract command from a validated intent."""
return AbstractCommand(
action=validated_intent['action'],
category=validated_intent['category'],
target=validated_intent['target'],
parameters=validated_intent.get('parameters', {})
)
def create_batch_command(self, validated_intents):
"""Create a batch of related commands."""
commands = []
for intent in validated_intents:
command = self.create_command(intent)
commands.append(command)
return BatchCommand(commands)
OS-SPECIFIC IMPLEMENTATION (MACOS EXAMPLE)
The macOS specific implementation demonstrates how abstract commands are translated into concrete system operations. This implementation handles the unique aspects of macOS system configuration while maintaining compatibility with the abstract interface.
The macOS System Interface provides concrete implementations for system configuration operations specific to macOS. This component understands macOS-specific tools, configuration file formats, and system APIs.
class MacOSSystemInterface:
"""
macOS-specific implementation of system configuration operations.
"""
def __init__(self):
self.defaults_manager = MacOSDefaultsManager()
self.networksetup_manager = NetworkSetupManager()
self.system_preferences = SystemPreferencesManager()
def execute_configuration_change(self, abstract_command):
"""
Execute a configuration change on macOS.
Args:
abstract_command (AbstractCommand): The command to execute
Returns:
dict: Execution result with status and details
"""
handler_map = {
'network': self._handle_network_command,
'display': self._handle_display_command,
'audio': self._handle_audio_command,
'system': self._handle_system_command
}
handler = handler_map.get(abstract_command.category)
if not handler:
raise ValueError(f"Unsupported category: {abstract_command.category}")
return handler(abstract_command)
def _handle_network_command(self, command):
"""Handle network-related configuration commands."""
if command.target == 'wifi_enabled':
return self._set_wifi_state(command.parameters.get('enabled', True))
elif command.target == 'wifi_ssid':
return self._connect_to_wifi(
command.parameters.get('ssid'),
command.parameters.get('password')
)
elif command.target == 'dns_servers':
return self._set_dns_servers(command.parameters.get('servers'))
else:
raise ValueError(f"Unknown network target: {command.target}")
def _set_wifi_state(self, enabled):
"""Enable or disable WiFi using networksetup command."""
try:
# Get the WiFi interface name
wifi_interface = self._get_wifi_interface()
if enabled:
result = subprocess.run([
'networksetup', '-setairportpower', wifi_interface, 'on'
], capture_output=True, text=True, check=True)
else:
result = subprocess.run([
'networksetup', '-setairportpower', wifi_interface, 'off'
], capture_output=True, text=True, check=True)
return {
'success': True,
'message': f"WiFi {'enabled' if enabled else 'disabled'} successfully",
'details': result.stdout
}
except subprocess.CalledProcessError as e:
return {
'success': False,
'error': f"Failed to set WiFi state: {e.stderr}",
'exit_code': e.returncode
}
def _connect_to_wifi(self, ssid, password=None):
"""Connect to a WiFi network."""
try:
wifi_interface = self._get_wifi_interface()
if password:
# Connect to secured network
result = subprocess.run([
'networksetup', '-setairportnetwork',
wifi_interface, ssid, password
], capture_output=True, text=True, check=True)
else:
# Connect to open network
result = subprocess.run([
'networksetup', '-setairportnetwork',
wifi_interface, ssid
], capture_output=True, text=True, check=True)
return {
'success': True,
'message': f"Connected to WiFi network: {ssid}",
'details': result.stdout
}
except subprocess.CalledProcessError as e:
return {
'success': False,
'error': f"Failed to connect to WiFi: {e.stderr}",
'exit_code': e.returncode
}
def _get_wifi_interface(self):
"""Get the name of the WiFi interface."""
try:
result = subprocess.run([
'networksetup', '-listallhardwareports'
], capture_output=True, text=True, check=True)
lines = result.stdout.split('\n')
for i, line in enumerate(lines):
if 'Wi-Fi' in line or 'AirPort' in line:
# The device name is on the next line
if i + 1 < len(lines):
device_line = lines[i + 1]
if device_line.startswith('Device:'):
return device_line.split(':')[1].strip()
raise ValueError("WiFi interface not found")
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Failed to get WiFi interface: {e.stderr}")
class MacOSDefaultsManager:
"""
Manages macOS user defaults (preferences) system.
"""
def __init__(self):
self.domain_mappings = self._initialize_domain_mappings()
def set_preference(self, category, key, value):
"""Set a preference value using the defaults command."""
domain = self._get_domain_for_category(category)
try:
# Determine the appropriate defaults command based on value type
if isinstance(value, bool):
bool_value = 'true' if value else 'false'
result = subprocess.run([
'defaults', 'write', domain, key, '-bool', bool_value
], capture_output=True, text=True, check=True)
elif isinstance(value, int):
result = subprocess.run([
'defaults', 'write', domain, key, '-int', str(value)
], capture_output=True, text=True, check=True)
elif isinstance(value, str):
result = subprocess.run([
'defaults', 'write', domain, key, '-string', value
], capture_output=True, text=True, check=True)
else:
raise ValueError(f"Unsupported value type: {type(value)}")
return {
'success': True,
'message': f"Set {key} to {value} in domain {domain}"
}
except subprocess.CalledProcessError as e:
return {
'success': False,
'error': f"Failed to set preference: {e.stderr}"
}
def get_preference(self, category, key):
"""Get a preference value using the defaults command."""
domain = self._get_domain_for_category(category)
try:
result = subprocess.run([
'defaults', 'read', domain, key
], capture_output=True, text=True, check=True)
return {
'success': True,
'value': result.stdout.strip()
}
except subprocess.CalledProcessError as e:
return {
'success': False,
'error': f"Failed to read preference: {e.stderr}"
}
def _get_domain_for_category(self, category):
"""Map configuration categories to macOS preference domains."""
return self.domain_mappings.get(category, 'NSGlobalDomain')
def _initialize_domain_mappings(self):
"""Initialize mappings from categories to macOS preference domains."""
return {
'display': 'com.apple.windowserver',
'dock': 'com.apple.dock',
'finder': 'com.apple.finder',
'safari': 'com.apple.Safari',
'system': 'NSGlobalDomain'
}
SAFETY AND SECURITY CONSIDERATIONS
System configuration changes carry inherent risks that must be carefully managed to prevent system damage, data loss, or security vulnerabilities. The safety framework implements multiple layers of protection to ensure that only safe and authorized changes are executed.
The Permission Management System ensures that users can only perform operations that they are authorized to execute. This system integrates with operating system user permissions and implements additional application-level access controls.
class PermissionManager:
"""
Manages user permissions for system configuration operations.
"""
def __init__(self):
self.permission_cache = {}
self.permission_rules = self._load_permission_rules()
def check_permission(self, user_context, operation):
"""
Check if a user has permission to perform an operation.
Args:
user_context (dict): User information and credentials
operation (dict): The operation to check permissions for
Returns:
PermissionResult: Contains permission status and details
"""
cache_key = self._generate_cache_key(user_context, operation)
# Check cache first
if cache_key in self.permission_cache:
cached_result = self.permission_cache[cache_key]
if not cached_result.is_expired():
return cached_result
# Perform permission check
result = self._perform_permission_check(user_context, operation)
# Cache the result
self.permission_cache[cache_key] = result
return result
def _perform_permission_check(self, user_context, operation):
"""Perform the actual permission check."""
user_id = user_context.get('user_id')
user_groups = user_context.get('groups', [])
# Check if user is system administrator
if self._is_system_admin(user_context):
return PermissionResult(True, "System administrator access")
# Check operation-specific rules
for rule in self.permission_rules:
if rule.matches_operation(operation):
if rule.check_user_permission(user_id, user_groups):
return PermissionResult(True, f"Granted by rule: {rule.name}")
else:
return PermissionResult(
False, f"Denied by rule: {rule.name}"
)
# Default deny
return PermissionResult(False, "No matching permission rule found")
def _is_system_admin(self, user_context):
"""Check if the user has system administrator privileges."""
try:
# Check if user is in admin group (macOS/Linux)
import grp
admin_groups = ['admin', 'wheel', 'sudo']
user_groups = user_context.get('groups', [])
return any(group in admin_groups for group in user_groups)
except Exception:
return False
class BackupManager:
"""
Manages system configuration backups and rollback operations.
"""
def __init__(self, backup_directory):
self.backup_directory = Path(backup_directory)
self.backup_directory.mkdir(parents=True, exist_ok=True)
self.backup_index = self._load_backup_index()
def create_backup(self, operation, execution_id):
"""
Create a backup before executing a configuration change.
Args:
operation (dict): The operation that will be performed
execution_id (str): Unique identifier for this execution
Returns:
BackupInfo: Information about the created backup
"""
backup_id = self._generate_backup_id(execution_id)
backup_path = self.backup_directory / backup_id
backup_path.mkdir(parents=True, exist_ok=True)
# Determine what needs to be backed up
backup_targets = self._identify_backup_targets(operation)
backup_manifest = {
'backup_id': backup_id,
'execution_id': execution_id,
'timestamp': datetime.now().isoformat(),
'operation': operation,
'targets': []
}
# Create backups for each target
for target in backup_targets:
target_backup = self._backup_target(target, backup_path)
backup_manifest['targets'].append(target_backup)
# Save backup manifest
manifest_path = backup_path / 'manifest.json'
with open(manifest_path, 'w') as f:
json.dump(backup_manifest, f, indent=2)
# Update backup index
self.backup_index[backup_id] = backup_manifest
self._save_backup_index()
return BackupInfo(
backup_id=backup_id,
backup_path=backup_path,
manifest=backup_manifest
)
def restore_backup(self, backup_id):
"""
Restore a system configuration from backup.
Args:
backup_id (str): The backup to restore
Returns:
RestoreResult: Result of the restore operation
"""
if backup_id not in self.backup_index:
return RestoreResult(False, f"Backup {backup_id} not found")
backup_manifest = self.backup_index[backup_id]
backup_path = self.backup_directory / backup_id
try:
# Restore each backed up target
for target_info in backup_manifest['targets']:
self._restore_target(target_info, backup_path)
return RestoreResult(True, f"Successfully restored backup {backup_id}")
except Exception as e:
return RestoreResult(False, f"Failed to restore backup: {str(e)}")
def _identify_backup_targets(self, operation):
"""Identify what system components need to be backed up."""
targets = []
category = operation.get('category')
target = operation.get('target')
if category == 'network':
if target == 'wifi_enabled':
targets.append({
'type': 'command_output',
'command': ['networksetup', '-getairportpower', 'en0'],
'description': 'Current WiFi state'
})
elif target == 'dns_servers':
targets.append({
'type': 'command_output',
'command': ['networksetup', '-getdnsservers', 'Wi-Fi'],
'description': 'Current DNS servers'
})
elif category == 'display':
targets.append({
'type': 'defaults',
'domain': 'com.apple.windowserver',
'description': 'Display preferences'
})
return targets
def _backup_target(self, target, backup_path):
"""Create a backup for a specific target."""
if target['type'] == 'command_output':
return self._backup_command_output(target, backup_path)
elif target['type'] == 'defaults':
return self._backup_defaults_domain(target, backup_path)
elif target['type'] == 'file':
return self._backup_file(target, backup_path)
else:
raise ValueError(f"Unknown backup target type: {target['type']}")
NATURAL LANGUAGE PROCESSING PIPELINE
The natural language processing pipeline transforms user requests into structured system operations through a series of processing stages. Each stage refines the understanding of user intent and adds additional validation and safety checks.
The Intent Extraction Stage uses Large Language Model capabilities to parse natural language input and extract structured intent. This stage implements sophisticated prompt engineering to guide the model toward generating reliable, parseable output.
class IntentExtractionPipeline:
"""
Multi-stage pipeline for extracting structured intent from natural language.
"""
def __init__(self, llm_client, schema_manager):
self.llm_client = llm_client
self.schema_manager = schema_manager
self.extraction_stages = [
InitialParsingStage(),
ContextEnrichmentStage(),
ParameterValidationStage(),
SafetyAssessmentStage()
]
def extract_intent(self, user_input, conversation_context):
"""
Extract structured intent through the processing pipeline.
Args:
user_input (str): The user's natural language request
conversation_context (dict): Previous conversation context
Returns:
IntentExtractionResult: The extracted and validated intent
"""
# Start with initial parsing
current_result = IntentExtractionResult(
raw_input=user_input,
context=conversation_context
)
# Process through each stage
for stage in self.extraction_stages:
try:
current_result = stage.process(current_result, self.llm_client)
# Stop processing if a stage indicates failure
if not current_result.is_valid:
break
except Exception as e:
current_result.add_error(f"Stage {stage.__class__.__name__} failed: {str(e)}")
break
return current_result
class InitialParsingStage:
"""
First stage: Parse natural language into basic structured format.
"""
def process(self, extraction_result, llm_client):
"""Process the initial parsing of user input."""
system_prompt = self._build_parsing_prompt()
user_prompt = self._build_user_prompt(
extraction_result.raw_input,
extraction_result.context
)
# Get LLM response
llm_response = llm_client.generate_completion(
system_prompt + "\n\n" + user_prompt
)
# Parse the JSON response
try:
parsed_intent = json.loads(llm_response)
extraction_result.set_parsed_intent(parsed_intent)
# Basic validation of required fields
required_fields = ['action', 'category', 'target', 'confidence']
for field in required_fields:
if field not in parsed_intent:
extraction_result.add_error(f"Missing required field: {field}")
except json.JSONDecodeError as e:
extraction_result.add_error(f"Failed to parse LLM response as JSON: {str(e)}")
return extraction_result
def _build_parsing_prompt(self):
"""Build the system prompt for initial parsing."""
return """
You are a system configuration assistant. Parse user requests into structured JSON.
Respond with valid JSON containing these fields:
- action: Operation type (get, set, enable, disable, toggle, configure)
- category: System category (network, display, audio, security, system, user)
- target: Specific setting or component
- parameters: Object with any additional parameters
- confidence: Float between 0.0 and 1.0 indicating your confidence
- reasoning: Brief explanation of your interpretation
If the request is unclear or potentially dangerous, set confidence to 0.0.
Examples:
"Turn on WiFi" -> {"action": "enable", "category": "network", "target": "wifi", "parameters": {}, "confidence": 0.95, "reasoning": "Clear request to enable WiFi"}
"Connect to MyNetwork with password secret123" -> {"action": "set", "category": "network", "target": "wifi_connection", "parameters": {"ssid": "MyNetwork", "password": "secret123"}, "confidence": 0.9, "reasoning": "WiFi connection request with credentials"}
"""
def _build_user_prompt(self, user_input, context):
"""Build the user prompt with context."""
prompt = f"User request: {user_input}"
if context and context.get('previous_requests'):
prompt += f"\n\nPrevious requests in this conversation:\n"
for prev_request in context['previous_requests'][-3:]: # Last 3 requests
prompt += f"- {prev_request}\n"
return prompt
class ContextEnrichmentStage:
"""
Second stage: Enrich the parsed intent with additional context.
"""
def process(self, extraction_result, llm_client):
"""Enrich the intent with additional context and details."""
if not extraction_result.is_valid:
return extraction_result
parsed_intent = extraction_result.parsed_intent
# Add system context
self._add_system_context(parsed_intent, extraction_result)
# Resolve ambiguous references
self._resolve_ambiguities(parsed_intent, extraction_result, llm_client)
# Validate against schema
self._validate_against_schema(parsed_intent, extraction_result)
return extraction_result
def _add_system_context(self, intent, extraction_result):
"""Add relevant system context to the intent."""
# Add current system state information
intent['system_context'] = {
'platform': platform.system(),
'platform_version': platform.release(),
'timestamp': datetime.now().isoformat()
}
# Add category-specific context
category = intent.get('category')
if category == 'network':
intent['system_context']['network_interfaces'] = self._get_network_interfaces()
elif category == 'display':
intent['system_context']['displays'] = self._get_display_info()
def _resolve_ambiguities(self, intent, extraction_result, llm_client):
"""Resolve any ambiguous references in the intent."""
# Check for ambiguous parameters that need clarification
ambiguities = self._detect_ambiguities(intent)
if ambiguities:
clarification_prompt = self._build_clarification_prompt(intent, ambiguities)
clarification_response = llm_client.generate_completion(clarification_prompt)
try:
clarified_intent = json.loads(clarification_response)
intent.update(clarified_intent)
except json.JSONDecodeError:
extraction_result.add_warning(
"Failed to resolve ambiguities in intent"
)
RUNNING EXAMPLE WALKTHROUGH
To demonstrate the complete system in action, we will walk through a comprehensive example where a user requests to configure their network settings through natural language. This example illustrates how each component works together to safely execute system configuration changes.
The user begins by asking the system to "Connect to the office WiFi network called CompanyNet with the password SecurePass123 and set up Google DNS servers". This request involves multiple configuration changes that must be coordinated and executed safely.
The Natural Language Processor receives this input and begins the intent extraction process. The system prompt guides the Large Language Model to understand that this request involves network configuration with multiple sub-operations.
# Example of the complete system processing a user request
class SystemConfigurationChatbot:
"""
Main chatbot class that orchestrates all components.
"""
def __init__(self):
self.llm_client = LLMClient()
self.schema_manager = ConfigurationSchema()
self.nl_processor = NaturalLanguageProcessor(
self.llm_client, ContextManager()
)
self.intent_validator = IntentValidator(
PermissionManager(), CapabilityRegistry()
)
self.command_executor = CommandExecutor(
MacOSSystemInterface(), BackupManager('/tmp/config_backups')
)
self.conversation_manager = ConversationManager()
def process_user_request(self, user_input, user_context, conversation_id):
"""
Process a complete user request from input to execution.
Args:
user_input (str): The user's natural language request
user_context (dict): User information and permissions
conversation_id (str): Unique conversation identifier
Returns:
dict: Complete response with results and status
"""
try:
# Step 1: Extract intent from natural language
intent_result = self.nl_processor.process_request(
user_input, conversation_id
)
if intent_result['confidence'] < 0.7:
return self._handle_low_confidence_intent(intent_result)
# Step 2: Validate the intent
validation_result = self.intent_validator.validate_intent(
intent_result, user_context
)
if not validation_result.is_valid:
return self._handle_validation_failure(validation_result)
# Step 3: Create abstract command
command_factory = CommandFactory(self.schema_manager)
abstract_command = command_factory.create_command(intent_result)
# Step 4: Execute the command
execution_result = self.command_executor.execute_command(
abstract_command, {'user_context': user_context}
)
# Step 5: Generate user-friendly response
response = self._generate_response(
intent_result, execution_result, user_input
)
# Step 6: Update conversation history
self.conversation_manager.add_interaction(
conversation_id, user_input, response
)
return response
except Exception as e:
return self._handle_system_error(e, user_input)
def _generate_response(self, intent, execution_result, original_request):
"""Generate a user-friendly response based on execution results."""
if execution_result['success']:
return {
'status': 'success',
'message': self._create_success_message(intent, execution_result),
'details': execution_result.get('details', {}),
'backup_id': execution_result.get('backup_id'),
'execution_id': execution_result.get('execution_id')
}
else:
return {
'status': 'error',
'message': self._create_error_message(intent, execution_result),
'error_details': execution_result.get('error'),
'suggestions': self._generate_error_suggestions(execution_result)
}
def _create_success_message(self, intent, execution_result):
"""Create a human-readable success message."""
action = intent.get('action')
category = intent.get('category')
target = intent.get('target')
if category == 'network' and target == 'wifi_connection':
ssid = intent.get('parameters', {}).get('ssid')
return f"Successfully connected to WiFi network '{ssid}'"
elif category == 'network' and target == 'dns_servers':
servers = intent.get('parameters', {}).get('servers', [])
return f"DNS servers updated to: {', '.join(servers)}"
else:
return f"Successfully {action}d {target} in {category} settings"
# Example usage of the complete system
def demonstrate_system_usage():
"""Demonstrate the system processing a complex user request."""
# Initialize the chatbot
chatbot = SystemConfigurationChatbot()
# Simulate user context
user_context = {
'user_id': 'john.doe',
'groups': ['admin'],
'permissions': ['network_config', 'system_config']
}
# User request
user_request = """Connect to the office WiFi network called CompanyNet
with the password SecurePass123 and set up Google DNS servers"""
conversation_id = "conv_001"
# Process the request
response = chatbot.process_user_request(
user_request, user_context, conversation_id
)
print("User Request:", user_request)
print("System Response:", json.dumps(response, indent=2))
# Follow-up request
followup_request = "What DNS servers are currently configured?"
followup_response = chatbot.process_user_request(
followup_request, user_context, conversation_id
)
print("\nFollow-up Request:", followup_request)
print("System Response:", json.dumps(followup_response, indent=2))
The Intent Extraction Pipeline processes this complex request by breaking it down into multiple discrete operations. The system recognizes that the user wants to perform two main actions: connect to a specific WiFi network and configure DNS servers.
The parsed intent structure contains multiple sub-intents that must be executed in sequence. The system creates a batch command that ensures proper ordering of operations, connecting to WiFi first before attempting to configure DNS settings.
The Validation Layer performs comprehensive safety checks on each sub-intent. It verifies that the user has permission to modify network settings, validates that the WiFi password meets security requirements, and confirms that the specified DNS servers are valid IP addresses.
The Backup Manager creates a comprehensive backup before making any changes. This backup includes the current WiFi connection state, existing DNS server configuration, and any other network settings that might be affected by the requested changes.
The macOS System Interface executes the validated commands using the appropriate system tools. It uses the networksetup command to connect to the WiFi network and configure DNS servers, handling any error conditions that might arise during execution.
TESTING AND VALIDATION
Comprehensive testing ensures that the system operates safely and reliably across different scenarios and edge cases. The testing framework validates both individual components and the complete system integration.
Unit tests verify that each component functions correctly in isolation. These tests cover normal operation scenarios as well as error conditions and edge cases that might occur during real-world usage.
class TestNaturalLanguageProcessor(unittest.TestCase):
"""Test suite for the Natural Language Processor component."""
def setUp(self):
"""Set up test fixtures."""
self.mock_llm_client = MockLLMClient()
self.context_manager = ContextManager()
self.processor = NaturalLanguageProcessor(
self.mock_llm_client, self.context_manager
)
def test_simple_wifi_enable_request(self):
"""Test processing a simple WiFi enable request."""
user_input = "Turn on WiFi"
conversation_id = "test_conv_001"
# Mock LLM response
self.mock_llm_client.set_response(json.dumps({
'action': 'enable',
'category': 'network',
'target': 'wifi',
'parameters': {},
'confidence': 0.95,
'reasoning': 'Clear request to enable WiFi'
}))
result = self.processor.process_request(user_input, conversation_id)
self.assertEqual(result['action'], 'enable')
self.assertEqual(result['category'], 'network')
self.assertEqual(result['target'], 'wifi')
self.assertGreaterEqual(result['confidence'], 0.9)
def test_complex_network_configuration(self):
"""Test processing a complex network configuration request."""
user_input = "Connect to CompanyWiFi with password secret123 and use DNS 8.8.8.8"
conversation_id = "test_conv_002"
# Mock LLM response for complex request
self.mock_llm_client.set_response(json.dumps({
'action': 'configure',
'category': 'network',
'target': 'wifi_and_dns',
'parameters': {
'wifi_ssid': 'CompanyWiFi',
'wifi_password': 'secret123',
'dns_servers': ['8.8.8.8']
},
'confidence': 0.88,
'reasoning': 'Complex network configuration with WiFi and DNS'
}))
result = self.processor.process_request(user_input, conversation_id)
self.assertEqual(result['action'], 'configure')
self.assertEqual(result['parameters']['wifi_ssid'], 'CompanyWiFi')
self.assertIn('8.8.8.8', result['parameters']['dns_servers'])
def test_ambiguous_request_handling(self):
"""Test handling of ambiguous or unclear requests."""
user_input = "Fix the internet"
conversation_id = "test_conv_003"
# Mock LLM response for ambiguous request
self.mock_llm_client.set_response(json.dumps({
'action': 'diagnose',
'category': 'network',
'target': 'connectivity',
'parameters': {},
'confidence': 0.3,
'reasoning': 'Request is too vague to determine specific action'
}))
result = self.processor.process_request(user_input, conversation_id)
self.assertLess(result['confidence'], 0.5)
self.assertIn('reasoning', result)
class TestIntentValidator(unittest.TestCase):
"""Test suite for the Intent Validator component."""
def setUp(self):
"""Set up test fixtures."""
self.permission_manager = MockPermissionManager()
self.capability_registry = MockCapabilityRegistry()
self.validator = IntentValidator(
self.permission_manager, self.capability_registry
)
def test_valid_intent_with_permissions(self):
"""Test validation of a valid intent with proper permissions."""
intent = {
'action': 'enable',
'category': 'network',
'target': 'wifi',
'parameters': {},
'confidence': 0.95
}
user_context = {
'user_id': 'admin_user',
'groups': ['admin'],
'permissions': ['network_config']
}
result = self.validator.validate_intent(intent, user_context)
self.assertTrue(result.is_valid)
self.assertIn('permissions', result.validation_details)
def test_intent_without_permissions(self):
"""Test validation failure due to insufficient permissions."""
intent = {
'action': 'set',
'category': 'security',
'target': 'firewall_rules',
'parameters': {'rules': ['allow 80', 'deny 22']},
'confidence': 0.9
}
user_context = {
'user_id': 'regular_user',
'groups': ['users'],
'permissions': ['basic_config']
}
result = self.validator.validate_intent(intent, user_context)
self.assertFalse(result.is_valid)
self.assertIn('permission', result.errors[0].lower())
def test_dangerous_operation_blocking(self):
"""Test that dangerous operations are properly blocked."""
intent = {
'action': 'delete',
'category': 'system',
'target': 'all_user_data',
'parameters': {},
'confidence': 0.95
}
user_context = {
'user_id': 'admin_user',
'groups': ['admin'],
'permissions': ['system_config']
}
result = self.validator.validate_intent(intent, user_context)
self.assertFalse(result.is_valid)
self.assertIn('safety', result.errors[0].lower())
class TestSystemIntegration(unittest.TestCase):
"""Integration tests for the complete system."""
def setUp(self):
"""Set up integration test environment."""
self.test_backup_dir = tempfile.mkdtemp()
self.chatbot = SystemConfigurationChatbot()
self.chatbot.command_executor.backup_manager = BackupManager(
self.test_backup_dir
)
def test_complete_wifi_configuration_flow(self):
"""Test the complete flow of WiFi configuration."""
user_context = {
'user_id': 'test_admin',
'groups': ['admin'],
'permissions': ['network_config']
}
# Test WiFi enable request
response = self.chatbot.process_user_request(
"Enable WiFi",
user_context,
"integration_test_001"
)
self.assertEqual(response['status'], 'success')
self.assertIn('backup_id', response)
# Verify backup was created
backup_id = response['backup_id']
backup_path = Path(self.test_backup_dir) / backup_id
self.assertTrue(backup_path.exists())
# Test rollback functionality
restore_result = self.chatbot.command_executor.backup_manager.restore_backup(
backup_id
)
self.assertTrue(restore_result.success)
def tearDown(self):
"""Clean up test environment."""
shutil.rmtree(self.test_backup_dir)
Integration tests validate that all components work together correctly to process user requests from start to finish. These tests use realistic scenarios and verify that the system produces correct results while maintaining safety guarantees.
Performance tests ensure that the system responds to user requests within acceptable time limits and can handle concurrent requests without degradation. These tests also validate that the system scales appropriately as the number of users and requests increases.
Security tests verify that the system properly enforces access controls, prevents unauthorized operations, and protects against potential attack vectors such as prompt injection or privilege escalation attempts.
FULL RUNNING EXAMPLE
The following complete implementation demonstrates all the concepts discussed in this article. This example provides a fully functional system that can be deployed and tested in a real environment.
#!/usr/bin/env python3
"""
Complete implementation of an LLM-powered system configuration chatbot.
This module provides a full working example of a natural language interface
for system configuration management, with proper safety mechanisms and
OS abstraction.
"""
import json
import subprocess
import platform
import tempfile
import shutil
import uuid
import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class LLMClient:
"""
Mock LLM client for demonstration purposes.
In a real implementation, this would interface with an actual LLM service.
"""
def __init__(self):
self.response_templates = self._initialize_response_templates()
def generate_completion(self, prompt: str) -> str:
"""Generate a completion based on the input prompt."""
# Simple pattern matching for demonstration
prompt_lower = prompt.lower()
if 'turn on wifi' in prompt_lower or 'enable wifi' in prompt_lower:
return json.dumps({
'action': 'enable',
'category': 'network',
'target': 'wifi',
'parameters': {},
'confidence': 0.95,
'reasoning': 'Clear request to enable WiFi'
})
elif 'connect to' in prompt_lower and 'wifi' in prompt_lower:
# Extract SSID and password if present
ssid = self._extract_ssid(prompt)
password = self._extract_password(prompt)
return json.dumps({
'action': 'connect',
'category': 'network',
'target': 'wifi_network',
'parameters': {
'ssid': ssid,
'password': password
},
'confidence': 0.9,
'reasoning': 'WiFi connection request with credentials'
})
elif 'dns' in prompt_lower:
dns_servers = self._extract_dns_servers(prompt)
return json.dumps({
'action': 'set',
'category': 'network',
'target': 'dns_servers',
'parameters': {
'servers': dns_servers
},
'confidence': 0.85,
'reasoning': 'DNS server configuration request'
})
else:
return json.dumps({
'action': 'unknown',
'category': 'unknown',
'target': 'unknown',
'parameters': {},
'confidence': 0.1,
'reasoning': 'Could not understand the request'
})
def _extract_ssid(self, prompt: str) -> Optional[str]:
"""Extract WiFi SSID from prompt."""
words = prompt.split()
for i, word in enumerate(words):
if word.lower() in ['to', 'network'] and i + 1 < len(words):
return words[i + 1].strip('"\'')
return None
def _extract_password(self, prompt: str) -> Optional[str]:
"""Extract WiFi password from prompt."""
words = prompt.split()
for i, word in enumerate(words):
if word.lower() in ['password', 'pass'] and i + 1 < len(words):
return words[i + 1].strip('"\'')
return None
def _extract_dns_servers(self, prompt: str) -> List[str]:
"""Extract DNS server addresses from prompt."""
import re
# Simple IP address pattern
ip_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
return re.findall(ip_pattern, prompt)
def _initialize_response_templates(self) -> Dict[str, str]:
"""Initialize response templates for common requests."""
return {
'wifi_enable': {
'action': 'enable',
'category': 'network',
'target': 'wifi',
'confidence': 0.95
}
}
class ValidationResult:
"""Represents the result of a validation operation."""
def __init__(self, is_valid: bool = True, message: str = ""):
self.is_valid = is_valid
self.message = message
self.errors = []
self.warnings = []
def add_error(self, error: str):
"""Add an error to the validation result."""
self.errors.append(error)
self.is_valid = False
def add_warning(self, warning: str):
"""Add a warning to the validation result."""
self.warnings.append(warning)
class ExecutionResult:
"""Represents the result of a command execution."""
def __init__(self, success: bool, execution_id: str = None,
result: Dict = None, error: str = None, backup_id: str = None):
self.success = success
self.execution_id = execution_id or str(uuid.uuid4())
self.result = result or {}
self.error = error
self.backup_id = backup_id
self.timestamp = datetime.datetime.now()
class BackupInfo:
"""Information about a system backup."""
def __init__(self, backup_id: str, backup_path: Path, manifest: Dict):
self.backup_id = backup_id
self.backup_path = backup_path
self.manifest = manifest
self.created_at = datetime.datetime.now()
class ContextManager:
"""Manages conversation context and history."""
def __init__(self):
self.contexts = {}
def get_context(self, conversation_id: str) -> Dict:
"""Get context for a conversation."""
return self.contexts.get(conversation_id, {
'conversation_id': conversation_id,
'created_at': datetime.datetime.now(),
'interactions': []
})
def update_context(self, conversation_id: str, user_input: str, intent: Dict):
"""Update conversation context with new interaction."""
if conversation_id not in self.contexts:
self.contexts[conversation_id] = self.get_context(conversation_id)
self.contexts[conversation_id]['interactions'].append({
'user_input': user_input,
'intent': intent,
'timestamp': datetime.datetime.now()
})
class PermissionManager:
"""Manages user permissions for system operations."""
def __init__(self):
self.permission_rules = self._initialize_permission_rules()
def check_permission(self, user_context: Dict, operation: Dict) -> bool:
"""Check if user has permission for an operation."""
user_groups = user_context.get('groups', [])
user_permissions = user_context.get('permissions', [])
# Admin users can do everything
if 'admin' in user_groups:
return True
# Check specific permissions
operation_category = operation.get('category')
required_permission = f"{operation_category}_config"
return required_permission in user_permissions
def _initialize_permission_rules(self) -> List[Dict]:
"""Initialize permission rules."""
return [
{
'category': 'network',
'required_groups': ['admin', 'network_admin'],
'required_permissions': ['network_config']
},
{
'category': 'system',
'required_groups': ['admin'],
'required_permissions': ['system_config']
}
]
class BackupManager:
"""Manages system configuration backups."""
def __init__(self, backup_directory: str):
self.backup_directory = Path(backup_directory)
self.backup_directory.mkdir(parents=True, exist_ok=True)
self.backup_index = {}
def create_backup(self, operation: Dict, execution_id: str) -> BackupInfo:
"""Create a backup before executing an operation."""
backup_id = f"backup_{execution_id}_{int(datetime.datetime.now().timestamp())}"
backup_path = self.backup_directory / backup_id
backup_path.mkdir(parents=True, exist_ok=True)
# Create backup manifest
manifest = {
'backup_id': backup_id,
'execution_id': execution_id,
'operation': operation,
'timestamp': datetime.datetime.now().isoformat(),
'platform': platform.system()
}
# Save manifest
manifest_path = backup_path / 'manifest.json'
with open(manifest_path, 'w') as f:
json.dump(manifest, f, indent=2)
# Store in index
self.backup_index[backup_id] = manifest
logger.info(f"Created backup {backup_id} for execution {execution_id}")
return BackupInfo(backup_id, backup_path, manifest)
def restore_backup(self, backup_id: str) -> ExecutionResult:
"""Restore a system configuration from backup."""
if backup_id not in self.backup_index:
return ExecutionResult(False, error=f"Backup {backup_id} not found")
try:
# In a real implementation, this would restore actual system state
logger.info(f"Restored backup {backup_id}")
return ExecutionResult(True, result={'restored': backup_id})
except Exception as e:
return ExecutionResult(False, error=str(e))
class MacOSSystemInterface:
"""macOS-specific system configuration interface."""
def execute_configuration_change(self, command: Dict) -> Dict:
"""Execute a configuration change on macOS."""
category = command.get('category')
action = command.get('action')
target = command.get('target')
parameters = command.get('parameters', {})
if category == 'network':
return self._handle_network_command(action, target, parameters)
else:
raise ValueError(f"Unsupported category: {category}")
def _handle_network_command(self, action: str, target: str, parameters: Dict) -> Dict:
"""Handle network-related commands."""
if target == 'wifi' and action == 'enable':
return self._enable_wifi()
elif target == 'wifi_network' and action == 'connect':
return self._connect_wifi(
parameters.get('ssid'),
parameters.get('password')
)
elif target == 'dns_servers' and action == 'set':
return self._set_dns_servers(parameters.get('servers', []))
else:
raise ValueError(f"Unsupported network operation: {action} {target}")
def _enable_wifi(self) -> Dict:
"""Enable WiFi on macOS."""
try:
# In a real implementation, this would use actual networksetup commands
# For demonstration, we simulate the operation
logger.info("Simulating WiFi enable operation")
return {
'success': True,
'message': 'WiFi enabled successfully',
'details': 'Simulated networksetup command execution'
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
def _connect_wifi(self, ssid: str, password: str = None) -> Dict:
"""Connect to a WiFi network."""
try:
logger.info(f"Simulating WiFi connection to {ssid}")
return {
'success': True,
'message': f'Connected to WiFi network: {ssid}',
'details': f'Simulated connection to {ssid}'
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
def _set_dns_servers(self, servers: List[str]) -> Dict:
"""Set DNS servers."""
try:
logger.info(f"Simulating DNS server configuration: {servers}")
return {
'success': True,
'message': f'DNS servers set to: {", ".join(servers)}',
'details': f'Simulated DNS configuration'
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
class NaturalLanguageProcessor:
"""Processes natural language input and extracts structured intent."""
def __init__(self, llm_client: LLMClient, context_manager: ContextManager):
self.llm_client = llm_client
self.context_manager = context_manager
def process_request(self, user_input: str, conversation_id: str) -> Dict:
"""Process a natural language request."""
context = self.context_manager.get_context(conversation_id)
# Build prompt with context
prompt = self._build_prompt(user_input, context)
# Get LLM response
llm_response = self.llm_client.generate_completion(prompt)
# Parse response
try:
intent = json.loads(llm_response)
except json.JSONDecodeError:
intent = {
'action': 'unknown',
'category': 'unknown',
'target': 'unknown',
'parameters': {},
'confidence': 0.0,
'reasoning': 'Failed to parse LLM response'
}
# Update context
self.context_manager.update_context(conversation_id, user_input, intent)
return intent
def _build_prompt(self, user_input: str, context: Dict) -> str:
"""Build the complete prompt for the LLM."""
system_prompt = """
You are a system configuration assistant. Parse user requests into structured JSON.
Respond with valid JSON containing:
- action: Operation type (get, set, enable, disable, connect)
- category: System category (network, display, audio, system)
- target: Specific setting or component
- parameters: Object with additional parameters
- confidence: Float between 0.0 and 1.0
- reasoning: Brief explanation
"""
return f"{system_prompt}\n\nUser request: {user_input}"
class IntentValidator:
"""Validates extracted intent against safety rules and permissions."""
def __init__(self, permission_manager: PermissionManager):
self.permission_manager = permission_manager
self.safety_rules = self._initialize_safety_rules()
def validate_intent(self, intent: Dict, user_context: Dict) -> ValidationResult:
"""Validate an intent against safety rules and permissions."""
result = ValidationResult()
# Check confidence threshold
if intent.get('confidence', 0) < 0.5:
result.add_error("Intent confidence too low")
return result
# Check permissions
if not self.permission_manager.check_permission(user_context, intent):
result.add_error("Insufficient permissions")
return result
# Check safety rules
if not self._check_safety_rules(intent):
result.add_error("Operation violates safety rules")
return result
return result
def _check_safety_rules(self, intent: Dict) -> bool:
"""Check intent against safety rules."""
# Implement safety checks
dangerous_actions = ['delete', 'format', 'erase']
if intent.get('action') in dangerous_actions:
return False
return True
def _initialize_safety_rules(self) -> List[Dict]:
"""Initialize safety rules."""
return [
{
'rule': 'no_destructive_operations',
'blocked_actions': ['delete', 'format', 'erase']
}
]
class CommandExecutor:
"""Executes validated system configuration commands."""
def __init__(self, system_interface, backup_manager: BackupManager):
self.system_interface = system_interface
self.backup_manager = backup_manager
def execute_command(self, intent: Dict, execution_context: Dict) -> ExecutionResult:
"""Execute a validated command."""
execution_id = str(uuid.uuid4())
try:
# Create backup
backup_info = self.backup_manager.create_backup(intent, execution_id)
# Execute the command
result = self.system_interface.execute_configuration_change(intent)
return ExecutionResult(
success=result.get('success', False),
execution_id=execution_id,
result=result,
backup_id=backup_info.backup_id
)
except Exception as e:
return ExecutionResult(
success=False,
execution_id=execution_id,
error=str(e)
)
class SystemConfigurationChatbot:
"""Main chatbot class that orchestrates all components."""
def __init__(self, backup_directory: str = None):
if backup_directory is None:
backup_directory = tempfile.mkdtemp(prefix='config_chatbot_')
self.llm_client = LLMClient()
self.context_manager = ContextManager()
self.permission_manager = PermissionManager()
self.backup_manager = BackupManager(backup_directory)
# Initialize OS-specific interface
if platform.system() == 'Darwin':
self.system_interface = MacOSSystemInterface()
else:
# For demonstration, use macOS interface as fallback
self.system_interface = MacOSSystemInterface()
self.nl_processor = NaturalLanguageProcessor(
self.llm_client, self.context_manager
)
self.intent_validator = IntentValidator(self.permission_manager)
self.command_executor = CommandExecutor(
self.system_interface, self.backup_manager
)
def process_user_request(self, user_input: str, user_context: Dict,
conversation_id: str) -> Dict:
"""Process a complete user request."""
try:
# Extract intent
intent = self.nl_processor.process_request(user_input, conversation_id)
# Validate intent
validation_result = self.intent_validator.validate_intent(intent, user_context)
if not validation_result.is_valid:
return {
'status': 'error',
'message': 'Request validation failed',
'errors': validation_result.errors
}
# Execute command
execution_result = self.command_executor.execute_command(
intent, {'user_context': user_context}
)
if execution_result.success:
return {
'status': 'success',
'message': self._generate_success_message(intent, execution_result),
'execution_id': execution_result.execution_id,
'backup_id': execution_result.backup_id
}
else:
return {
'status': 'error',
'message': 'Command execution failed',
'error': execution_result.error,
'execution_id': execution_result.execution_id
}
except Exception as e:
logger.error(f"System error processing request: {str(e)}")
return {
'status': 'error',
'message': 'System error occurred',
'error': str(e)
}
def _generate_success_message(self, intent: Dict, execution_result: ExecutionResult) -> str:
"""Generate a user-friendly success message."""
action = intent.get('action')
category = intent.get('category')
target = intent.get('target')
if category == 'network' and target == 'wifi':
return "WiFi has been enabled successfully"
elif category == 'network' and target == 'wifi_network':
ssid = intent.get('parameters', {}).get('ssid')
return f"Successfully connected to WiFi network '{ssid}'"
elif category == 'network' and target == 'dns_servers':
servers = intent.get('parameters', {}).get('servers', [])
return f"DNS servers updated to: {', '.join(servers)}"
else:
return f"Successfully {action}ed {target}"
def main():
"""Demonstration of the complete system."""
print("System Configuration Chatbot - Demo")
print("=" * 50)
# Initialize the chatbot
chatbot = SystemConfigurationChatbot()
# Simulate user context
user_context = {
'user_id': 'demo_user',
'groups': ['admin'],
'permissions': ['network_config', 'system_config']
}
conversation_id = "demo_conversation"
# Test requests
test_requests = [
"Turn on WiFi",
"Connect to CompanyNet with password SecurePass123",
"Set DNS servers to 8.8.8.8 and 8.8.4.4",
"What is the current WiFi status?"
]
for request in test_requests:
print(f"\nUser: {request}")
response = chatbot.process_user_request(
request, user_context, conversation_id
)
print(f"Status: {response['status']}")
print(f"Message: {response['message']}")
if response['status'] == 'success':
print(f"Execution ID: {response.get('execution_id')}")
print(f"Backup ID: {response.get('backup_id')}")
elif response['status'] == 'error':
print(f"Error: {response.get('error', 'Unknown error')}")
print("-" * 30)
if __name__ == "__main__":
main()
This complete implementation provides a fully functional system configuration chatbot that demonstrates all the concepts discussed throughout this article. The system includes proper error handling, safety mechanisms, backup functionality, and a clear separation between operating system neutral and operating system specific components.
The implementation can be extended to support additional operating systems by creating new system interface classes that implement the same abstract interface. Additional configuration categories can be added by extending the schema definitions and implementing the corresponding handlers in the system interface classes.
The safety and security mechanisms ensure that the system operates reliably while preventing unauthorized or dangerous operations. The backup and rollback functionality provides a safety net that allows users to recover from configuration changes that have unintended consequences.
This architecture provides a solid foundation for building production-ready natural language interfaces to system configuration management, with the flexibility to adapt to different operating systems and configuration requirements while maintaining consistent safety and usability standards.
No comments:
Post a Comment