Thursday, January 08, 2026

NATURAL LANGUAGE SYSTEM CONFIGURATION CHATBOT



INTRODUCTION AND PROBLEM STATEMENT


System administration has traditionally required deep technical knowledge of command-line interfaces, configuration file formats, and operating system internals. Users often struggle with tasks as simple as changing network settings or configuring system services because they must translate their high-level intentions into precise technical commands.


The emergence of Large Language Models presents an opportunity to create intelligent interfaces that can understand natural language requests and translate them into appropriate system configuration changes. However, building such a system requires careful consideration of safety, security, and architectural design to prevent catastrophic system damage while maintaining flexibility and usability.


This article presents a comprehensive design for an LLM-powered chatbot that can safely modify system configurations through natural language prompts. We will explore the architecture, implementation details, and safety mechanisms necessary to build such a system, using macOS as a concrete example while maintaining clear separation between operating system neutral and operating system specific components.


SYSTEM ARCHITECTURE OVERVIEW


The system follows a layered architecture that separates concerns and maintains clear boundaries between different responsibilities. The architecture consists of five primary layers, each with distinct responsibilities and interfaces.


The Presentation Layer handles user interaction and natural language input processing. This layer receives user prompts, manages conversation context, and presents results back to the user in an understandable format. It serves as the primary interface between human users and the underlying system.


The Intent Recognition Layer processes natural language input and extracts structured intent from user requests. This layer utilizes Large Language Model capabilities to understand user intentions, identify target system components, and extract relevant parameters for configuration changes.


The Command Translation Layer converts structured intent into abstract system operations. This layer maintains a mapping between user intentions and system capabilities while remaining operating system agnostic. It validates user requests against system capabilities and generates abstract command representations.


The Safety and Validation Layer implements security checks, permission validation, and change impact assessment. This critical layer prevents dangerous operations, validates user permissions, and provides mechanisms for change rollback and system recovery.


The System Interface Layer provides the actual implementation of system configuration changes. This layer contains operating system specific code and handles the translation from abstract operations to concrete system commands and configuration file modifications.


CORE COMPONENTS DEEP DIVE


The Natural Language Processor serves as the primary interface between human language and system understanding. This component leverages Large Language Model capabilities to parse user requests and extract structured information about intended system changes.


The processor maintains a context window that tracks conversation history, allowing users to make follow-up requests and clarifications. It implements prompt engineering techniques to guide the LLM toward generating structured output that can be reliably parsed by downstream components.


class NaturalLanguageProcessor:

    """

    Processes natural language input and extracts structured intent

    for system configuration changes.

    """

    

    def __init__(self, llm_client, context_manager):

        self.llm_client = llm_client

        self.context_manager = context_manager

        self.system_prompt = self._build_system_prompt()

    

    def process_request(self, user_input, conversation_id):

        """

        Process a natural language request and extract structured intent.

        

        Args:

            user_input (str): The user's natural language request

            conversation_id (str): Unique identifier for the conversation

            

        Returns:

            dict: Structured intent containing action, target, and parameters

        """

        context = self.context_manager.get_context(conversation_id)

        

        # Build the complete prompt with context and user input

        full_prompt = self._build_prompt_with_context(

            user_input, context, self.system_prompt

        )

        

        # Get LLM response

        llm_response = self.llm_client.generate_completion(full_prompt)

        

        # Parse the structured response

        intent = self._parse_llm_response(llm_response)

        

        # Update conversation context

        self.context_manager.update_context(

            conversation_id, user_input, intent

        )

        

        return intent

    

    def _build_system_prompt(self):

        """Build the system prompt that guides LLM behavior."""

        return """

        You are a system configuration assistant. Your role is to understand

        user requests for system configuration changes and convert them into

        structured JSON format.

        

        Always respond with valid JSON containing:

        - action: The type of operation (get, set, enable, disable, etc.)

        - category: The system category (network, display, audio, etc.)

        - target: The specific setting or service

        - parameters: Any additional parameters needed

        - confidence: Your confidence level (0.0 to 1.0)

        

        If you cannot understand the request or it seems dangerous,

        set confidence to 0.0 and explain in the 'reason' field.

        """


The Intent Validator ensures that extracted intentions are safe, valid, and within the scope of allowed operations. This component implements a comprehensive validation framework that checks user permissions, validates parameter values, and assesses the potential impact of requested changes.


class IntentValidator:

    """

    Validates extracted intent against safety rules and system capabilities.

    """

    

    def __init__(self, permission_manager, capability_registry):

        self.permission_manager = permission_manager

        self.capability_registry = capability_registry

        self.safety_rules = self._load_safety_rules()

    

    def validate_intent(self, intent, user_context):

        """

        Validate an intent against safety rules and user permissions.

        

        Args:

            intent (dict): The structured intent to validate

            user_context (dict): User information and permissions

            

        Returns:

            ValidationResult: Contains validation status and any issues

        """

        validation_result = ValidationResult()

        

        # Check if the user has permission for this operation

        if not self._check_permissions(intent, user_context):

            validation_result.add_error(

                "Insufficient permissions for requested operation"

            )

            return validation_result

        

        # Validate against safety rules

        safety_check = self._check_safety_rules(intent)

        if not safety_check.is_safe:

            validation_result.add_error(f"Safety violation: {safety_check.reason}")

            return validation_result

        

        # Check if the system supports this operation

        if not self._check_capability_support(intent):

            validation_result.add_error(

                "Requested operation is not supported on this system"

            )

            return validation_result

        

        # Validate parameter values

        param_validation = self._validate_parameters(intent)

        if not param_validation.is_valid:

            validation_result.add_error(

                f"Invalid parameters: {param_validation.errors}"

            )

            return validation_result

        

        validation_result.mark_valid()

        return validation_result

    

    def _check_safety_rules(self, intent):

        """Check intent against predefined safety rules."""

        for rule in self.safety_rules:

            if rule.matches(intent) and not rule.is_safe(intent):

                return SafetyCheckResult(False, rule.get_reason())

        

        return SafetyCheckResult(True, "Passed all safety checks")


The Command Executor serves as the bridge between validated intentions and actual system modifications. This component maintains a registry of available operations and delegates execution to appropriate operating system specific handlers.


class CommandExecutor:

    """

    Executes validated system configuration commands.

    """

    

    def __init__(self, os_interface, backup_manager):

        self.os_interface = os_interface

        self.backup_manager = backup_manager

        self.execution_history = []

    

    def execute_command(self, validated_intent, execution_context):

        """

        Execute a validated system configuration command.

        

        Args:

            validated_intent (dict): The validated intent to execute

            execution_context (dict): Execution context and metadata

            

        Returns:

            ExecutionResult: Contains execution status and results

        """

        execution_id = self._generate_execution_id()

        

        try:

            # Create backup before making changes

            backup_info = self.backup_manager.create_backup(

                validated_intent, execution_id

            )

            

            # Execute the actual command

            result = self.os_interface.execute_configuration_change(

                validated_intent

            )

            

            # Record the execution

            execution_record = ExecutionRecord(

                execution_id=execution_id,

                intent=validated_intent,

                result=result,

                backup_info=backup_info,

                timestamp=datetime.now()

            )

            

            self.execution_history.append(execution_record)

            

            return ExecutionResult(

                success=True,

                execution_id=execution_id,

                result=result,

                backup_id=backup_info.backup_id

            )

            

        except Exception as e:

            # Handle execution failure

            error_result = ExecutionResult(

                success=False,

                execution_id=execution_id,

                error=str(e)

            )

            

            # Attempt rollback if backup was created

            if 'backup_info' in locals():

                self._attempt_rollback(backup_info, execution_id)

            

            return error_result


OS-NEUTRAL IMPLEMENTATION


The operating system neutral components provide a foundation that can work across different platforms while maintaining consistent interfaces and behavior. These components handle the core logic of natural language processing, intent validation, and command orchestration without depending on specific operating system features.


The Configuration Schema Manager maintains a platform-independent representation of system configuration options. This component defines abstract configuration categories, parameter types, and validation rules that can be mapped to specific operating system implementations.


class ConfigurationSchema:

    """

    Defines the abstract schema for system configuration options.

    """

    

    def __init__(self):

        self.categories = self._initialize_categories()

        self.parameter_types = self._initialize_parameter_types()

        self.validation_rules = self._initialize_validation_rules()

    

    def _initialize_categories(self):

        """Initialize the standard configuration categories."""

        return {

            'network': NetworkCategory(),

            'display': DisplayCategory(),

            'audio': AudioCategory(),

            'security': SecurityCategory(),

            'system': SystemCategory(),

            'user': UserCategory()

        }

    

    def get_category_schema(self, category_name):

        """Get the schema definition for a specific category."""

        if category_name not in self.categories:

            raise ValueError(f"Unknown configuration category: {category_name}")

        

        return self.categories[category_name].get_schema()

    

    def validate_parameter(self, category, parameter, value):

        """Validate a parameter value against the schema."""

        category_schema = self.get_category_schema(category)

        

        if parameter not in category_schema.parameters:

            return ValidationResult(

                False, f"Unknown parameter: {parameter}"

            )

        

        param_def = category_schema.parameters[parameter]

        return param_def.validate(value)


class NetworkCategory:

    """Defines network-related configuration options."""

    

    def get_schema(self):

        return CategorySchema(

            name='network',

            description='Network and connectivity settings',

            parameters={

                'wifi_enabled': BooleanParameter(

                    description='Enable or disable WiFi',

                    default=True

                ),

                'wifi_ssid': StringParameter(

                    description='WiFi network name to connect to',

                    validation_pattern=r'^[a-zA-Z0-9\s\-_]{1,32}$'

                ),

                'dns_servers': ListParameter(

                    description='DNS server addresses',

                    item_type=IPAddressParameter()

                ),

                'proxy_enabled': BooleanParameter(

                    description='Enable HTTP proxy',

                    default=False

                ),

                'proxy_host': StringParameter(

                    description='Proxy server hostname',

                    required_when='proxy_enabled=True'

                ),

                'proxy_port': IntegerParameter(

                    description='Proxy server port',

                    min_value=1,

                    max_value=65535,

                    required_when='proxy_enabled=True'

                )

            }

        )


The Abstract Command Interface provides a standardized way to represent system configuration operations without tying them to specific operating system implementations. This interface allows the same high-level logic to work across different platforms.


class AbstractCommand:

    """

    Abstract representation of a system configuration command.

    """

    

    def __init__(self, action, category, target, parameters=None):

        self.action = action

        self.category = category

        self.target = target

        self.parameters = parameters or {}

        self.command_id = self._generate_command_id()

    

    def to_dict(self):

        """Convert command to dictionary representation."""

        return {

            'command_id': self.command_id,

            'action': self.action,

            'category': self.category,

            'target': self.target,

            'parameters': self.parameters

        }

    

    def validate_against_schema(self, schema_manager):

        """Validate this command against the configuration schema."""

        try:

            category_schema = schema_manager.get_category_schema(self.category)

            

            # Validate that the target exists in the category

            if self.target not in category_schema.parameters:

                return ValidationResult(

                    False, f"Unknown target '{self.target}' in category '{self.category}'"

                )

            

            # Validate parameters

            for param_name, param_value in self.parameters.items():

                validation_result = schema_manager.validate_parameter(

                    self.category, param_name, param_value

                )

                if not validation_result.is_valid:

                    return validation_result

            

            return ValidationResult(True, "Command validation successful")

            

        except Exception as e:

            return ValidationResult(False, f"Validation error: {str(e)}")


class CommandFactory:

    """

    Factory for creating abstract commands from validated intents.

    """

    

    def __init__(self, schema_manager):

        self.schema_manager = schema_manager

    

    def create_command(self, validated_intent):

        """Create an abstract command from a validated intent."""

        return AbstractCommand(

            action=validated_intent['action'],

            category=validated_intent['category'],

            target=validated_intent['target'],

            parameters=validated_intent.get('parameters', {})

        )

    

    def create_batch_command(self, validated_intents):

        """Create a batch of related commands."""

        commands = []

        for intent in validated_intents:

            command = self.create_command(intent)

            commands.append(command)

        

        return BatchCommand(commands)


OS-SPECIFIC IMPLEMENTATION (MACOS EXAMPLE)


The macOS specific implementation demonstrates how abstract commands are translated into concrete system operations. This implementation handles the unique aspects of macOS system configuration while maintaining compatibility with the abstract interface.


The macOS System Interface provides concrete implementations for system configuration operations specific to macOS. This component understands macOS-specific tools, configuration file formats, and system APIs.


class MacOSSystemInterface:

    """

    macOS-specific implementation of system configuration operations.

    """

    

    def __init__(self):

        self.defaults_manager = MacOSDefaultsManager()

        self.networksetup_manager = NetworkSetupManager()

        self.system_preferences = SystemPreferencesManager()

    

    def execute_configuration_change(self, abstract_command):

        """

        Execute a configuration change on macOS.

        

        Args:

            abstract_command (AbstractCommand): The command to execute

            

        Returns:

            dict: Execution result with status and details

        """

        handler_map = {

            'network': self._handle_network_command,

            'display': self._handle_display_command,

            'audio': self._handle_audio_command,

            'system': self._handle_system_command

        }

        

        handler = handler_map.get(abstract_command.category)

        if not handler:

            raise ValueError(f"Unsupported category: {abstract_command.category}")

        

        return handler(abstract_command)

    

    def _handle_network_command(self, command):

        """Handle network-related configuration commands."""

        if command.target == 'wifi_enabled':

            return self._set_wifi_state(command.parameters.get('enabled', True))

        elif command.target == 'wifi_ssid':

            return self._connect_to_wifi(

                command.parameters.get('ssid'),

                command.parameters.get('password')

            )

        elif command.target == 'dns_servers':

            return self._set_dns_servers(command.parameters.get('servers'))

        else:

            raise ValueError(f"Unknown network target: {command.target}")

    

    def _set_wifi_state(self, enabled):

        """Enable or disable WiFi using networksetup command."""

        try:

            # Get the WiFi interface name

            wifi_interface = self._get_wifi_interface()

            

            if enabled:

                result = subprocess.run([

                    'networksetup', '-setairportpower', wifi_interface, 'on'

                ], capture_output=True, text=True, check=True)

            else:

                result = subprocess.run([

                    'networksetup', '-setairportpower', wifi_interface, 'off'

                ], capture_output=True, text=True, check=True)

            

            return {

                'success': True,

                'message': f"WiFi {'enabled' if enabled else 'disabled'} successfully",

                'details': result.stdout

            }

            

        except subprocess.CalledProcessError as e:

            return {

                'success': False,

                'error': f"Failed to set WiFi state: {e.stderr}",

                'exit_code': e.returncode

            }

    

    def _connect_to_wifi(self, ssid, password=None):

        """Connect to a WiFi network."""

        try:

            wifi_interface = self._get_wifi_interface()

            

            if password:

                # Connect to secured network

                result = subprocess.run([

                    'networksetup', '-setairportnetwork', 

                    wifi_interface, ssid, password

                ], capture_output=True, text=True, check=True)

            else:

                # Connect to open network

                result = subprocess.run([

                    'networksetup', '-setairportnetwork', 

                    wifi_interface, ssid

                ], capture_output=True, text=True, check=True)

            

            return {

                'success': True,

                'message': f"Connected to WiFi network: {ssid}",

                'details': result.stdout

            }

            

        except subprocess.CalledProcessError as e:

            return {

                'success': False,

                'error': f"Failed to connect to WiFi: {e.stderr}",

                'exit_code': e.returncode

            }

    

    def _get_wifi_interface(self):

        """Get the name of the WiFi interface."""

        try:

            result = subprocess.run([

                'networksetup', '-listallhardwareports'

            ], capture_output=True, text=True, check=True)

            

            lines = result.stdout.split('\n')

            for i, line in enumerate(lines):

                if 'Wi-Fi' in line or 'AirPort' in line:

                    # The device name is on the next line

                    if i + 1 < len(lines):

                        device_line = lines[i + 1]

                        if device_line.startswith('Device:'):

                            return device_line.split(':')[1].strip()

            

            raise ValueError("WiFi interface not found")

            

        except subprocess.CalledProcessError as e:

            raise RuntimeError(f"Failed to get WiFi interface: {e.stderr}")


class MacOSDefaultsManager:

    """

    Manages macOS user defaults (preferences) system.

    """

    

    def __init__(self):

        self.domain_mappings = self._initialize_domain_mappings()

    

    def set_preference(self, category, key, value):

        """Set a preference value using the defaults command."""

        domain = self._get_domain_for_category(category)

        

        try:

            # Determine the appropriate defaults command based on value type

            if isinstance(value, bool):

                bool_value = 'true' if value else 'false'

                result = subprocess.run([

                    'defaults', 'write', domain, key, '-bool', bool_value

                ], capture_output=True, text=True, check=True)

            elif isinstance(value, int):

                result = subprocess.run([

                    'defaults', 'write', domain, key, '-int', str(value)

                ], capture_output=True, text=True, check=True)

            elif isinstance(value, str):

                result = subprocess.run([

                    'defaults', 'write', domain, key, '-string', value

                ], capture_output=True, text=True, check=True)

            else:

                raise ValueError(f"Unsupported value type: {type(value)}")

            

            return {

                'success': True,

                'message': f"Set {key} to {value} in domain {domain}"

            }

            

        except subprocess.CalledProcessError as e:

            return {

                'success': False,

                'error': f"Failed to set preference: {e.stderr}"

            }

    

    def get_preference(self, category, key):

        """Get a preference value using the defaults command."""

        domain = self._get_domain_for_category(category)

        

        try:

            result = subprocess.run([

                'defaults', 'read', domain, key

            ], capture_output=True, text=True, check=True)

            

            return {

                'success': True,

                'value': result.stdout.strip()

            }

            

        except subprocess.CalledProcessError as e:

            return {

                'success': False,

                'error': f"Failed to read preference: {e.stderr}"

            }

    

    def _get_domain_for_category(self, category):

        """Map configuration categories to macOS preference domains."""

        return self.domain_mappings.get(category, 'NSGlobalDomain')

    

    def _initialize_domain_mappings(self):

        """Initialize mappings from categories to macOS preference domains."""

        return {

            'display': 'com.apple.windowserver',

            'dock': 'com.apple.dock',

            'finder': 'com.apple.finder',

            'safari': 'com.apple.Safari',

            'system': 'NSGlobalDomain'

        }


SAFETY AND SECURITY CONSIDERATIONS


System configuration changes carry inherent risks that must be carefully managed to prevent system damage, data loss, or security vulnerabilities. The safety framework implements multiple layers of protection to ensure that only safe and authorized changes are executed.


The Permission Management System ensures that users can only perform operations that they are authorized to execute. This system integrates with operating system user permissions and implements additional application-level access controls.


class PermissionManager:

    """

    Manages user permissions for system configuration operations.

    """

    

    def __init__(self):

        self.permission_cache = {}

        self.permission_rules = self._load_permission_rules()

    

    def check_permission(self, user_context, operation):

        """

        Check if a user has permission to perform an operation.

        

        Args:

            user_context (dict): User information and credentials

            operation (dict): The operation to check permissions for

            

        Returns:

            PermissionResult: Contains permission status and details

        """

        cache_key = self._generate_cache_key(user_context, operation)

        

        # Check cache first

        if cache_key in self.permission_cache:

            cached_result = self.permission_cache[cache_key]

            if not cached_result.is_expired():

                return cached_result

        

        # Perform permission check

        result = self._perform_permission_check(user_context, operation)

        

        # Cache the result

        self.permission_cache[cache_key] = result

        

        return result

    

    def _perform_permission_check(self, user_context, operation):

        """Perform the actual permission check."""

        user_id = user_context.get('user_id')

        user_groups = user_context.get('groups', [])

        

        # Check if user is system administrator

        if self._is_system_admin(user_context):

            return PermissionResult(True, "System administrator access")

        

        # Check operation-specific rules

        for rule in self.permission_rules:

            if rule.matches_operation(operation):

                if rule.check_user_permission(user_id, user_groups):

                    return PermissionResult(True, f"Granted by rule: {rule.name}")

                else:

                    return PermissionResult(

                        False, f"Denied by rule: {rule.name}"

                    )

        

        # Default deny

        return PermissionResult(False, "No matching permission rule found")

    

    def _is_system_admin(self, user_context):

        """Check if the user has system administrator privileges."""

        try:

            # Check if user is in admin group (macOS/Linux)

            import grp

            admin_groups = ['admin', 'wheel', 'sudo']

            user_groups = user_context.get('groups', [])

            

            return any(group in admin_groups for group in user_groups)

            

        except Exception:

            return False


class BackupManager:

    """

    Manages system configuration backups and rollback operations.

    """

    

    def __init__(self, backup_directory):

        self.backup_directory = Path(backup_directory)

        self.backup_directory.mkdir(parents=True, exist_ok=True)

        self.backup_index = self._load_backup_index()

    

    def create_backup(self, operation, execution_id):

        """

        Create a backup before executing a configuration change.

        

        Args:

            operation (dict): The operation that will be performed

            execution_id (str): Unique identifier for this execution

            

        Returns:

            BackupInfo: Information about the created backup

        """

        backup_id = self._generate_backup_id(execution_id)

        backup_path = self.backup_directory / backup_id

        backup_path.mkdir(parents=True, exist_ok=True)

        

        # Determine what needs to be backed up

        backup_targets = self._identify_backup_targets(operation)

        

        backup_manifest = {

            'backup_id': backup_id,

            'execution_id': execution_id,

            'timestamp': datetime.now().isoformat(),

            'operation': operation,

            'targets': []

        }

        

        # Create backups for each target

        for target in backup_targets:

            target_backup = self._backup_target(target, backup_path)

            backup_manifest['targets'].append(target_backup)

        

        # Save backup manifest

        manifest_path = backup_path / 'manifest.json'

        with open(manifest_path, 'w') as f:

            json.dump(backup_manifest, f, indent=2)

        

        # Update backup index

        self.backup_index[backup_id] = backup_manifest

        self._save_backup_index()

        

        return BackupInfo(

            backup_id=backup_id,

            backup_path=backup_path,

            manifest=backup_manifest

        )

    

    def restore_backup(self, backup_id):

        """

        Restore a system configuration from backup.

        

        Args:

            backup_id (str): The backup to restore

            

        Returns:

            RestoreResult: Result of the restore operation

        """

        if backup_id not in self.backup_index:

            return RestoreResult(False, f"Backup {backup_id} not found")

        

        backup_manifest = self.backup_index[backup_id]

        backup_path = self.backup_directory / backup_id

        

        try:

            # Restore each backed up target

            for target_info in backup_manifest['targets']:

                self._restore_target(target_info, backup_path)

            

            return RestoreResult(True, f"Successfully restored backup {backup_id}")

            

        except Exception as e:

            return RestoreResult(False, f"Failed to restore backup: {str(e)}")

    

    def _identify_backup_targets(self, operation):

        """Identify what system components need to be backed up."""

        targets = []

        

        category = operation.get('category')

        target = operation.get('target')

        

        if category == 'network':

            if target == 'wifi_enabled':

                targets.append({

                    'type': 'command_output',

                    'command': ['networksetup', '-getairportpower', 'en0'],

                    'description': 'Current WiFi state'

                })

            elif target == 'dns_servers':

                targets.append({

                    'type': 'command_output',

                    'command': ['networksetup', '-getdnsservers', 'Wi-Fi'],

                    'description': 'Current DNS servers'

                })

        

        elif category == 'display':

            targets.append({

                'type': 'defaults',

                'domain': 'com.apple.windowserver',

                'description': 'Display preferences'

            })

        

        return targets

    

    def _backup_target(self, target, backup_path):

        """Create a backup for a specific target."""

        if target['type'] == 'command_output':

            return self._backup_command_output(target, backup_path)

        elif target['type'] == 'defaults':

            return self._backup_defaults_domain(target, backup_path)

        elif target['type'] == 'file':

            return self._backup_file(target, backup_path)

        else:

            raise ValueError(f"Unknown backup target type: {target['type']}")


NATURAL LANGUAGE PROCESSING PIPELINE


The natural language processing pipeline transforms user requests into structured system operations through a series of processing stages. Each stage refines the understanding of user intent and adds additional validation and safety checks.


The Intent Extraction Stage uses Large Language Model capabilities to parse natural language input and extract structured intent. This stage implements sophisticated prompt engineering to guide the model toward generating reliable, parseable output.


class IntentExtractionPipeline:

    """

    Multi-stage pipeline for extracting structured intent from natural language.

    """

    

    def __init__(self, llm_client, schema_manager):

        self.llm_client = llm_client

        self.schema_manager = schema_manager

        self.extraction_stages = [

            InitialParsingStage(),

            ContextEnrichmentStage(),

            ParameterValidationStage(),

            SafetyAssessmentStage()

        ]

    

    def extract_intent(self, user_input, conversation_context):

        """

        Extract structured intent through the processing pipeline.

        

        Args:

            user_input (str): The user's natural language request

            conversation_context (dict): Previous conversation context

            

        Returns:

            IntentExtractionResult: The extracted and validated intent

        """

        # Start with initial parsing

        current_result = IntentExtractionResult(

            raw_input=user_input,

            context=conversation_context

        )

        

        # Process through each stage

        for stage in self.extraction_stages:

            try:

                current_result = stage.process(current_result, self.llm_client)

                

                # Stop processing if a stage indicates failure

                if not current_result.is_valid:

                    break

                    

            except Exception as e:

                current_result.add_error(f"Stage {stage.__class__.__name__} failed: {str(e)}")

                break

        

        return current_result


class InitialParsingStage:

    """

    First stage: Parse natural language into basic structured format.

    """

    

    def process(self, extraction_result, llm_client):

        """Process the initial parsing of user input."""

        system_prompt = self._build_parsing_prompt()

        user_prompt = self._build_user_prompt(

            extraction_result.raw_input,

            extraction_result.context

        )

        

        # Get LLM response

        llm_response = llm_client.generate_completion(

            system_prompt + "\n\n" + user_prompt

        )

        

        # Parse the JSON response

        try:

            parsed_intent = json.loads(llm_response)

            extraction_result.set_parsed_intent(parsed_intent)

            

            # Basic validation of required fields

            required_fields = ['action', 'category', 'target', 'confidence']

            for field in required_fields:

                if field not in parsed_intent:

                    extraction_result.add_error(f"Missing required field: {field}")

            

        except json.JSONDecodeError as e:

            extraction_result.add_error(f"Failed to parse LLM response as JSON: {str(e)}")

        

        return extraction_result

    

    def _build_parsing_prompt(self):

        """Build the system prompt for initial parsing."""

        return """

        You are a system configuration assistant. Parse user requests into structured JSON.

        

        Respond with valid JSON containing these fields:

        - action: Operation type (get, set, enable, disable, toggle, configure)

        - category: System category (network, display, audio, security, system, user)

        - target: Specific setting or component

        - parameters: Object with any additional parameters

        - confidence: Float between 0.0 and 1.0 indicating your confidence

        - reasoning: Brief explanation of your interpretation

        

        If the request is unclear or potentially dangerous, set confidence to 0.0.

        

        Examples:

        "Turn on WiFi" -> {"action": "enable", "category": "network", "target": "wifi", "parameters": {}, "confidence": 0.95, "reasoning": "Clear request to enable WiFi"}

        "Connect to MyNetwork with password secret123" -> {"action": "set", "category": "network", "target": "wifi_connection", "parameters": {"ssid": "MyNetwork", "password": "secret123"}, "confidence": 0.9, "reasoning": "WiFi connection request with credentials"}

        """

    

    def _build_user_prompt(self, user_input, context):

        """Build the user prompt with context."""

        prompt = f"User request: {user_input}"

        

        if context and context.get('previous_requests'):

            prompt += f"\n\nPrevious requests in this conversation:\n"

            for prev_request in context['previous_requests'][-3:]:  # Last 3 requests

                prompt += f"- {prev_request}\n"

        

        return prompt


class ContextEnrichmentStage:

    """

    Second stage: Enrich the parsed intent with additional context.

    """

    

    def process(self, extraction_result, llm_client):

        """Enrich the intent with additional context and details."""

        if not extraction_result.is_valid:

            return extraction_result

        

        parsed_intent = extraction_result.parsed_intent

        

        # Add system context

        self._add_system_context(parsed_intent, extraction_result)

        

        # Resolve ambiguous references

        self._resolve_ambiguities(parsed_intent, extraction_result, llm_client)

        

        # Validate against schema

        self._validate_against_schema(parsed_intent, extraction_result)

        

        return extraction_result

    

    def _add_system_context(self, intent, extraction_result):

        """Add relevant system context to the intent."""

        # Add current system state information

        intent['system_context'] = {

            'platform': platform.system(),

            'platform_version': platform.release(),

            'timestamp': datetime.now().isoformat()

        }

        

        # Add category-specific context

        category = intent.get('category')

        if category == 'network':

            intent['system_context']['network_interfaces'] = self._get_network_interfaces()

        elif category == 'display':

            intent['system_context']['displays'] = self._get_display_info()

    

    def _resolve_ambiguities(self, intent, extraction_result, llm_client):

        """Resolve any ambiguous references in the intent."""

        # Check for ambiguous parameters that need clarification

        ambiguities = self._detect_ambiguities(intent)

        

        if ambiguities:

            clarification_prompt = self._build_clarification_prompt(intent, ambiguities)

            clarification_response = llm_client.generate_completion(clarification_prompt)

            

            try:

                clarified_intent = json.loads(clarification_response)

                intent.update(clarified_intent)

            except json.JSONDecodeError:

                extraction_result.add_warning(

                    "Failed to resolve ambiguities in intent"

                )


RUNNING EXAMPLE WALKTHROUGH


To demonstrate the complete system in action, we will walk through a comprehensive example where a user requests to configure their network settings through natural language. This example illustrates how each component works together to safely execute system configuration changes.


The user begins by asking the system to "Connect to the office WiFi network called CompanyNet with the password SecurePass123 and set up Google DNS servers". This request involves multiple configuration changes that must be coordinated and executed safely.


The Natural Language Processor receives this input and begins the intent extraction process. The system prompt guides the Large Language Model to understand that this request involves network configuration with multiple sub-operations.


# Example of the complete system processing a user request

class SystemConfigurationChatbot:

    """

    Main chatbot class that orchestrates all components.

    """

    

    def __init__(self):

        self.llm_client = LLMClient()

        self.schema_manager = ConfigurationSchema()

        self.nl_processor = NaturalLanguageProcessor(

            self.llm_client, ContextManager()

        )

        self.intent_validator = IntentValidator(

            PermissionManager(), CapabilityRegistry()

        )

        self.command_executor = CommandExecutor(

            MacOSSystemInterface(), BackupManager('/tmp/config_backups')

        )

        self.conversation_manager = ConversationManager()

    

    def process_user_request(self, user_input, user_context, conversation_id):

        """

        Process a complete user request from input to execution.

        

        Args:

            user_input (str): The user's natural language request

            user_context (dict): User information and permissions

            conversation_id (str): Unique conversation identifier

            

        Returns:

            dict: Complete response with results and status

        """

        try:

            # Step 1: Extract intent from natural language

            intent_result = self.nl_processor.process_request(

                user_input, conversation_id

            )

            

            if intent_result['confidence'] < 0.7:

                return self._handle_low_confidence_intent(intent_result)

            

            # Step 2: Validate the intent

            validation_result = self.intent_validator.validate_intent(

                intent_result, user_context

            )

            

            if not validation_result.is_valid:

                return self._handle_validation_failure(validation_result)

            

            # Step 3: Create abstract command

            command_factory = CommandFactory(self.schema_manager)

            abstract_command = command_factory.create_command(intent_result)

            

            # Step 4: Execute the command

            execution_result = self.command_executor.execute_command(

                abstract_command, {'user_context': user_context}

            )

            

            # Step 5: Generate user-friendly response

            response = self._generate_response(

                intent_result, execution_result, user_input

            )

            

            # Step 6: Update conversation history

            self.conversation_manager.add_interaction(

                conversation_id, user_input, response

            )

            

            return response

            

        except Exception as e:

            return self._handle_system_error(e, user_input)

    

    def _generate_response(self, intent, execution_result, original_request):

        """Generate a user-friendly response based on execution results."""

        if execution_result['success']:

            return {

                'status': 'success',

                'message': self._create_success_message(intent, execution_result),

                'details': execution_result.get('details', {}),

                'backup_id': execution_result.get('backup_id'),

                'execution_id': execution_result.get('execution_id')

            }

        else:

            return {

                'status': 'error',

                'message': self._create_error_message(intent, execution_result),

                'error_details': execution_result.get('error'),

                'suggestions': self._generate_error_suggestions(execution_result)

            }

    

    def _create_success_message(self, intent, execution_result):

        """Create a human-readable success message."""

        action = intent.get('action')

        category = intent.get('category')

        target = intent.get('target')

        

        if category == 'network' and target == 'wifi_connection':

            ssid = intent.get('parameters', {}).get('ssid')

            return f"Successfully connected to WiFi network '{ssid}'"

        elif category == 'network' and target == 'dns_servers':

            servers = intent.get('parameters', {}).get('servers', [])

            return f"DNS servers updated to: {', '.join(servers)}"

        else:

            return f"Successfully {action}d {target} in {category} settings"


# Example usage of the complete system

def demonstrate_system_usage():

    """Demonstrate the system processing a complex user request."""

    

    # Initialize the chatbot

    chatbot = SystemConfigurationChatbot()

    

    # Simulate user context

    user_context = {

        'user_id': 'john.doe',

        'groups': ['admin'],

        'permissions': ['network_config', 'system_config']

    }

    

    # User request

    user_request = """Connect to the office WiFi network called CompanyNet 

                     with the password SecurePass123 and set up Google DNS servers"""

    

    conversation_id = "conv_001"

    

    # Process the request

    response = chatbot.process_user_request(

        user_request, user_context, conversation_id

    )

    

    print("User Request:", user_request)

    print("System Response:", json.dumps(response, indent=2))

    

    # Follow-up request

    followup_request = "What DNS servers are currently configured?"

    

    followup_response = chatbot.process_user_request(

        followup_request, user_context, conversation_id

    )

    

    print("\nFollow-up Request:", followup_request)

    print("System Response:", json.dumps(followup_response, indent=2))


The Intent Extraction Pipeline processes this complex request by breaking it down into multiple discrete operations. The system recognizes that the user wants to perform two main actions: connect to a specific WiFi network and configure DNS servers.


The parsed intent structure contains multiple sub-intents that must be executed in sequence. The system creates a batch command that ensures proper ordering of operations, connecting to WiFi first before attempting to configure DNS settings.


The Validation Layer performs comprehensive safety checks on each sub-intent. It verifies that the user has permission to modify network settings, validates that the WiFi password meets security requirements, and confirms that the specified DNS servers are valid IP addresses.


The Backup Manager creates a comprehensive backup before making any changes. This backup includes the current WiFi connection state, existing DNS server configuration, and any other network settings that might be affected by the requested changes.


The macOS System Interface executes the validated commands using the appropriate system tools. It uses the networksetup command to connect to the WiFi network and configure DNS servers, handling any error conditions that might arise during execution.


TESTING AND VALIDATION


Comprehensive testing ensures that the system operates safely and reliably across different scenarios and edge cases. The testing framework validates both individual components and the complete system integration.


Unit tests verify that each component functions correctly in isolation. These tests cover normal operation scenarios as well as error conditions and edge cases that might occur during real-world usage.


class TestNaturalLanguageProcessor(unittest.TestCase):

    """Test suite for the Natural Language Processor component."""

    

    def setUp(self):

        """Set up test fixtures."""

        self.mock_llm_client = MockLLMClient()

        self.context_manager = ContextManager()

        self.processor = NaturalLanguageProcessor(

            self.mock_llm_client, self.context_manager

        )

    

    def test_simple_wifi_enable_request(self):

        """Test processing a simple WiFi enable request."""

        user_input = "Turn on WiFi"

        conversation_id = "test_conv_001"

        

        # Mock LLM response

        self.mock_llm_client.set_response(json.dumps({

            'action': 'enable',

            'category': 'network',

            'target': 'wifi',

            'parameters': {},

            'confidence': 0.95,

            'reasoning': 'Clear request to enable WiFi'

        }))

        

        result = self.processor.process_request(user_input, conversation_id)

        

        self.assertEqual(result['action'], 'enable')

        self.assertEqual(result['category'], 'network')

        self.assertEqual(result['target'], 'wifi')

        self.assertGreaterEqual(result['confidence'], 0.9)

    

    def test_complex_network_configuration(self):

        """Test processing a complex network configuration request."""

        user_input = "Connect to CompanyWiFi with password secret123 and use DNS 8.8.8.8"

        conversation_id = "test_conv_002"

        

        # Mock LLM response for complex request

        self.mock_llm_client.set_response(json.dumps({

            'action': 'configure',

            'category': 'network',

            'target': 'wifi_and_dns',

            'parameters': {

                'wifi_ssid': 'CompanyWiFi',

                'wifi_password': 'secret123',

                'dns_servers': ['8.8.8.8']

            },

            'confidence': 0.88,

            'reasoning': 'Complex network configuration with WiFi and DNS'

        }))

        

        result = self.processor.process_request(user_input, conversation_id)

        

        self.assertEqual(result['action'], 'configure')

        self.assertEqual(result['parameters']['wifi_ssid'], 'CompanyWiFi')

        self.assertIn('8.8.8.8', result['parameters']['dns_servers'])

    

    def test_ambiguous_request_handling(self):

        """Test handling of ambiguous or unclear requests."""

        user_input = "Fix the internet"

        conversation_id = "test_conv_003"

        

        # Mock LLM response for ambiguous request

        self.mock_llm_client.set_response(json.dumps({

            'action': 'diagnose',

            'category': 'network',

            'target': 'connectivity',

            'parameters': {},

            'confidence': 0.3,

            'reasoning': 'Request is too vague to determine specific action'

        }))

        

        result = self.processor.process_request(user_input, conversation_id)

        

        self.assertLess(result['confidence'], 0.5)

        self.assertIn('reasoning', result)


class TestIntentValidator(unittest.TestCase):

    """Test suite for the Intent Validator component."""

    

    def setUp(self):

        """Set up test fixtures."""

        self.permission_manager = MockPermissionManager()

        self.capability_registry = MockCapabilityRegistry()

        self.validator = IntentValidator(

            self.permission_manager, self.capability_registry

        )

    

    def test_valid_intent_with_permissions(self):

        """Test validation of a valid intent with proper permissions."""

        intent = {

            'action': 'enable',

            'category': 'network',

            'target': 'wifi',

            'parameters': {},

            'confidence': 0.95

        }

        

        user_context = {

            'user_id': 'admin_user',

            'groups': ['admin'],

            'permissions': ['network_config']

        }

        

        result = self.validator.validate_intent(intent, user_context)

        

        self.assertTrue(result.is_valid)

        self.assertIn('permissions', result.validation_details)

    

    def test_intent_without_permissions(self):

        """Test validation failure due to insufficient permissions."""

        intent = {

            'action': 'set',

            'category': 'security',

            'target': 'firewall_rules',

            'parameters': {'rules': ['allow 80', 'deny 22']},

            'confidence': 0.9

        }

        

        user_context = {

            'user_id': 'regular_user',

            'groups': ['users'],

            'permissions': ['basic_config']

        }

        

        result = self.validator.validate_intent(intent, user_context)

        

        self.assertFalse(result.is_valid)

        self.assertIn('permission', result.errors[0].lower())

    

    def test_dangerous_operation_blocking(self):

        """Test that dangerous operations are properly blocked."""

        intent = {

            'action': 'delete',

            'category': 'system',

            'target': 'all_user_data',

            'parameters': {},

            'confidence': 0.95

        }

        

        user_context = {

            'user_id': 'admin_user',

            'groups': ['admin'],

            'permissions': ['system_config']

        }

        

        result = self.validator.validate_intent(intent, user_context)

        

        self.assertFalse(result.is_valid)

        self.assertIn('safety', result.errors[0].lower())


class TestSystemIntegration(unittest.TestCase):

    """Integration tests for the complete system."""

    

    def setUp(self):

        """Set up integration test environment."""

        self.test_backup_dir = tempfile.mkdtemp()

        self.chatbot = SystemConfigurationChatbot()

        self.chatbot.command_executor.backup_manager = BackupManager(

            self.test_backup_dir

        )

    

    def test_complete_wifi_configuration_flow(self):

        """Test the complete flow of WiFi configuration."""

        user_context = {

            'user_id': 'test_admin',

            'groups': ['admin'],

            'permissions': ['network_config']

        }

        

        # Test WiFi enable request

        response = self.chatbot.process_user_request(

            "Enable WiFi",

            user_context,

            "integration_test_001"

        )

        

        self.assertEqual(response['status'], 'success')

        self.assertIn('backup_id', response)

        

        # Verify backup was created

        backup_id = response['backup_id']

        backup_path = Path(self.test_backup_dir) / backup_id

        self.assertTrue(backup_path.exists())

        

        # Test rollback functionality

        restore_result = self.chatbot.command_executor.backup_manager.restore_backup(

            backup_id

        )

        self.assertTrue(restore_result.success)

    

    def tearDown(self):

        """Clean up test environment."""

        shutil.rmtree(self.test_backup_dir)


Integration tests validate that all components work together correctly to process user requests from start to finish. These tests use realistic scenarios and verify that the system produces correct results while maintaining safety guarantees.


Performance tests ensure that the system responds to user requests within acceptable time limits and can handle concurrent requests without degradation. These tests also validate that the system scales appropriately as the number of users and requests increases.


Security tests verify that the system properly enforces access controls, prevents unauthorized operations, and protects against potential attack vectors such as prompt injection or privilege escalation attempts.


FULL RUNNING EXAMPLE


The following complete implementation demonstrates all the concepts discussed in this article. This example provides a fully functional system that can be deployed and tested in a real environment.


#!/usr/bin/env python3

"""

Complete implementation of an LLM-powered system configuration chatbot.


This module provides a full working example of a natural language interface

for system configuration management, with proper safety mechanisms and

OS abstraction.

"""


import json

import subprocess

import platform

import tempfile

import shutil

import uuid

import datetime

from pathlib import Path

from typing import Dict, List, Any, Optional

import logging


# Configure logging

logging.basicConfig(level=logging.INFO)

logger = logging.getLogger(__name__)


class LLMClient:

    """

    Mock LLM client for demonstration purposes.

    In a real implementation, this would interface with an actual LLM service.

    """

    

    def __init__(self):

        self.response_templates = self._initialize_response_templates()

    

    def generate_completion(self, prompt: str) -> str:

        """Generate a completion based on the input prompt."""

        # Simple pattern matching for demonstration

        prompt_lower = prompt.lower()

        

        if 'turn on wifi' in prompt_lower or 'enable wifi' in prompt_lower:

            return json.dumps({

                'action': 'enable',

                'category': 'network',

                'target': 'wifi',

                'parameters': {},

                'confidence': 0.95,

                'reasoning': 'Clear request to enable WiFi'

            })

        

        elif 'connect to' in prompt_lower and 'wifi' in prompt_lower:

            # Extract SSID and password if present

            ssid = self._extract_ssid(prompt)

            password = self._extract_password(prompt)

            

            return json.dumps({

                'action': 'connect',

                'category': 'network',

                'target': 'wifi_network',

                'parameters': {

                    'ssid': ssid,

                    'password': password

                },

                'confidence': 0.9,

                'reasoning': 'WiFi connection request with credentials'

            })

        

        elif 'dns' in prompt_lower:

            dns_servers = self._extract_dns_servers(prompt)

            return json.dumps({

                'action': 'set',

                'category': 'network',

                'target': 'dns_servers',

                'parameters': {

                    'servers': dns_servers

                },

                'confidence': 0.85,

                'reasoning': 'DNS server configuration request'

            })

        

        else:

            return json.dumps({

                'action': 'unknown',

                'category': 'unknown',

                'target': 'unknown',

                'parameters': {},

                'confidence': 0.1,

                'reasoning': 'Could not understand the request'

            })

    

    def _extract_ssid(self, prompt: str) -> Optional[str]:

        """Extract WiFi SSID from prompt."""

        words = prompt.split()

        for i, word in enumerate(words):

            if word.lower() in ['to', 'network'] and i + 1 < len(words):

                return words[i + 1].strip('"\'')

        return None

    

    def _extract_password(self, prompt: str) -> Optional[str]:

        """Extract WiFi password from prompt."""

        words = prompt.split()

        for i, word in enumerate(words):

            if word.lower() in ['password', 'pass'] and i + 1 < len(words):

                return words[i + 1].strip('"\'')

        return None

    

    def _extract_dns_servers(self, prompt: str) -> List[str]:

        """Extract DNS server addresses from prompt."""

        import re

        # Simple IP address pattern

        ip_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'

        return re.findall(ip_pattern, prompt)

    

    def _initialize_response_templates(self) -> Dict[str, str]:

        """Initialize response templates for common requests."""

        return {

            'wifi_enable': {

                'action': 'enable',

                'category': 'network',

                'target': 'wifi',

                'confidence': 0.95

            }

        }


class ValidationResult:

    """Represents the result of a validation operation."""

    

    def __init__(self, is_valid: bool = True, message: str = ""):

        self.is_valid = is_valid

        self.message = message

        self.errors = []

        self.warnings = []

    

    def add_error(self, error: str):

        """Add an error to the validation result."""

        self.errors.append(error)

        self.is_valid = False

    

    def add_warning(self, warning: str):

        """Add a warning to the validation result."""

        self.warnings.append(warning)


class ExecutionResult:

    """Represents the result of a command execution."""

    

    def __init__(self, success: bool, execution_id: str = None, 

                 result: Dict = None, error: str = None, backup_id: str = None):

        self.success = success

        self.execution_id = execution_id or str(uuid.uuid4())

        self.result = result or {}

        self.error = error

        self.backup_id = backup_id

        self.timestamp = datetime.datetime.now()


class BackupInfo:

    """Information about a system backup."""

    

    def __init__(self, backup_id: str, backup_path: Path, manifest: Dict):

        self.backup_id = backup_id

        self.backup_path = backup_path

        self.manifest = manifest

        self.created_at = datetime.datetime.now()


class ContextManager:

    """Manages conversation context and history."""

    

    def __init__(self):

        self.contexts = {}

    

    def get_context(self, conversation_id: str) -> Dict:

        """Get context for a conversation."""

        return self.contexts.get(conversation_id, {

            'conversation_id': conversation_id,

            'created_at': datetime.datetime.now(),

            'interactions': []

        })

    

    def update_context(self, conversation_id: str, user_input: str, intent: Dict):

        """Update conversation context with new interaction."""

        if conversation_id not in self.contexts:

            self.contexts[conversation_id] = self.get_context(conversation_id)

        

        self.contexts[conversation_id]['interactions'].append({

            'user_input': user_input,

            'intent': intent,

            'timestamp': datetime.datetime.now()

        })


class PermissionManager:

    """Manages user permissions for system operations."""

    

    def __init__(self):

        self.permission_rules = self._initialize_permission_rules()

    

    def check_permission(self, user_context: Dict, operation: Dict) -> bool:

        """Check if user has permission for an operation."""

        user_groups = user_context.get('groups', [])

        user_permissions = user_context.get('permissions', [])

        

        # Admin users can do everything

        if 'admin' in user_groups:

            return True

        

        # Check specific permissions

        operation_category = operation.get('category')

        required_permission = f"{operation_category}_config"

        

        return required_permission in user_permissions

    

    def _initialize_permission_rules(self) -> List[Dict]:

        """Initialize permission rules."""

        return [

            {

                'category': 'network',

                'required_groups': ['admin', 'network_admin'],

                'required_permissions': ['network_config']

            },

            {

                'category': 'system',

                'required_groups': ['admin'],

                'required_permissions': ['system_config']

            }

        ]


class BackupManager:

    """Manages system configuration backups."""

    

    def __init__(self, backup_directory: str):

        self.backup_directory = Path(backup_directory)

        self.backup_directory.mkdir(parents=True, exist_ok=True)

        self.backup_index = {}

    

    def create_backup(self, operation: Dict, execution_id: str) -> BackupInfo:

        """Create a backup before executing an operation."""

        backup_id = f"backup_{execution_id}_{int(datetime.datetime.now().timestamp())}"

        backup_path = self.backup_directory / backup_id

        backup_path.mkdir(parents=True, exist_ok=True)

        

        # Create backup manifest

        manifest = {

            'backup_id': backup_id,

            'execution_id': execution_id,

            'operation': operation,

            'timestamp': datetime.datetime.now().isoformat(),

            'platform': platform.system()

        }

        

        # Save manifest

        manifest_path = backup_path / 'manifest.json'

        with open(manifest_path, 'w') as f:

            json.dump(manifest, f, indent=2)

        

        # Store in index

        self.backup_index[backup_id] = manifest

        

        logger.info(f"Created backup {backup_id} for execution {execution_id}")

        

        return BackupInfo(backup_id, backup_path, manifest)

    

    def restore_backup(self, backup_id: str) -> ExecutionResult:

        """Restore a system configuration from backup."""

        if backup_id not in self.backup_index:

            return ExecutionResult(False, error=f"Backup {backup_id} not found")

        

        try:

            # In a real implementation, this would restore actual system state

            logger.info(f"Restored backup {backup_id}")

            return ExecutionResult(True, result={'restored': backup_id})

        except Exception as e:

            return ExecutionResult(False, error=str(e))


class MacOSSystemInterface:

    """macOS-specific system configuration interface."""

    

    def execute_configuration_change(self, command: Dict) -> Dict:

        """Execute a configuration change on macOS."""

        category = command.get('category')

        action = command.get('action')

        target = command.get('target')

        parameters = command.get('parameters', {})

        

        if category == 'network':

            return self._handle_network_command(action, target, parameters)

        else:

            raise ValueError(f"Unsupported category: {category}")

    

    def _handle_network_command(self, action: str, target: str, parameters: Dict) -> Dict:

        """Handle network-related commands."""

        if target == 'wifi' and action == 'enable':

            return self._enable_wifi()

        elif target == 'wifi_network' and action == 'connect':

            return self._connect_wifi(

                parameters.get('ssid'),

                parameters.get('password')

            )

        elif target == 'dns_servers' and action == 'set':

            return self._set_dns_servers(parameters.get('servers', []))

        else:

            raise ValueError(f"Unsupported network operation: {action} {target}")

    

    def _enable_wifi(self) -> Dict:

        """Enable WiFi on macOS."""

        try:

            # In a real implementation, this would use actual networksetup commands

            # For demonstration, we simulate the operation

            logger.info("Simulating WiFi enable operation")

            

            return {

                'success': True,

                'message': 'WiFi enabled successfully',

                'details': 'Simulated networksetup command execution'

            }

        except Exception as e:

            return {

                'success': False,

                'error': str(e)

            }

    

    def _connect_wifi(self, ssid: str, password: str = None) -> Dict:

        """Connect to a WiFi network."""

        try:

            logger.info(f"Simulating WiFi connection to {ssid}")

            

            return {

                'success': True,

                'message': f'Connected to WiFi network: {ssid}',

                'details': f'Simulated connection to {ssid}'

            }

        except Exception as e:

            return {

                'success': False,

                'error': str(e)

            }

    

    def _set_dns_servers(self, servers: List[str]) -> Dict:

        """Set DNS servers."""

        try:

            logger.info(f"Simulating DNS server configuration: {servers}")

            

            return {

                'success': True,

                'message': f'DNS servers set to: {", ".join(servers)}',

                'details': f'Simulated DNS configuration'

            }

        except Exception as e:

            return {

                'success': False,

                'error': str(e)

            }


class NaturalLanguageProcessor:

    """Processes natural language input and extracts structured intent."""

    

    def __init__(self, llm_client: LLMClient, context_manager: ContextManager):

        self.llm_client = llm_client

        self.context_manager = context_manager

    

    def process_request(self, user_input: str, conversation_id: str) -> Dict:

        """Process a natural language request."""

        context = self.context_manager.get_context(conversation_id)

        

        # Build prompt with context

        prompt = self._build_prompt(user_input, context)

        

        # Get LLM response

        llm_response = self.llm_client.generate_completion(prompt)

        

        # Parse response

        try:

            intent = json.loads(llm_response)

        except json.JSONDecodeError:

            intent = {

                'action': 'unknown',

                'category': 'unknown',

                'target': 'unknown',

                'parameters': {},

                'confidence': 0.0,

                'reasoning': 'Failed to parse LLM response'

            }

        

        # Update context

        self.context_manager.update_context(conversation_id, user_input, intent)

        

        return intent

    

    def _build_prompt(self, user_input: str, context: Dict) -> str:

        """Build the complete prompt for the LLM."""

        system_prompt = """

        You are a system configuration assistant. Parse user requests into structured JSON.

        

        Respond with valid JSON containing:

        - action: Operation type (get, set, enable, disable, connect)

        - category: System category (network, display, audio, system)

        - target: Specific setting or component

        - parameters: Object with additional parameters

        - confidence: Float between 0.0 and 1.0

        - reasoning: Brief explanation

        """

        

        return f"{system_prompt}\n\nUser request: {user_input}"


class IntentValidator:

    """Validates extracted intent against safety rules and permissions."""

    

    def __init__(self, permission_manager: PermissionManager):

        self.permission_manager = permission_manager

        self.safety_rules = self._initialize_safety_rules()

    

    def validate_intent(self, intent: Dict, user_context: Dict) -> ValidationResult:

        """Validate an intent against safety rules and permissions."""

        result = ValidationResult()

        

        # Check confidence threshold

        if intent.get('confidence', 0) < 0.5:

            result.add_error("Intent confidence too low")

            return result

        

        # Check permissions

        if not self.permission_manager.check_permission(user_context, intent):

            result.add_error("Insufficient permissions")

            return result

        

        # Check safety rules

        if not self._check_safety_rules(intent):

            result.add_error("Operation violates safety rules")

            return result

        

        return result

    

    def _check_safety_rules(self, intent: Dict) -> bool:

        """Check intent against safety rules."""

        # Implement safety checks

        dangerous_actions = ['delete', 'format', 'erase']

        if intent.get('action') in dangerous_actions:

            return False

        

        return True

    

    def _initialize_safety_rules(self) -> List[Dict]:

        """Initialize safety rules."""

        return [

            {

                'rule': 'no_destructive_operations',

                'blocked_actions': ['delete', 'format', 'erase']

            }

        ]


class CommandExecutor:

    """Executes validated system configuration commands."""

    

    def __init__(self, system_interface, backup_manager: BackupManager):

        self.system_interface = system_interface

        self.backup_manager = backup_manager

    

    def execute_command(self, intent: Dict, execution_context: Dict) -> ExecutionResult:

        """Execute a validated command."""

        execution_id = str(uuid.uuid4())

        

        try:

            # Create backup

            backup_info = self.backup_manager.create_backup(intent, execution_id)

            

            # Execute the command

            result = self.system_interface.execute_configuration_change(intent)

            

            return ExecutionResult(

                success=result.get('success', False),

                execution_id=execution_id,

                result=result,

                backup_id=backup_info.backup_id

            )

            

        except Exception as e:

            return ExecutionResult(

                success=False,

                execution_id=execution_id,

                error=str(e)

            )


class SystemConfigurationChatbot:

    """Main chatbot class that orchestrates all components."""

    

    def __init__(self, backup_directory: str = None):

        if backup_directory is None:

            backup_directory = tempfile.mkdtemp(prefix='config_chatbot_')

        

        self.llm_client = LLMClient()

        self.context_manager = ContextManager()

        self.permission_manager = PermissionManager()

        self.backup_manager = BackupManager(backup_directory)

        

        # Initialize OS-specific interface

        if platform.system() == 'Darwin':

            self.system_interface = MacOSSystemInterface()

        else:

            # For demonstration, use macOS interface as fallback

            self.system_interface = MacOSSystemInterface()

        

        self.nl_processor = NaturalLanguageProcessor(

            self.llm_client, self.context_manager

        )

        self.intent_validator = IntentValidator(self.permission_manager)

        self.command_executor = CommandExecutor(

            self.system_interface, self.backup_manager

        )

    

    def process_user_request(self, user_input: str, user_context: Dict, 

                           conversation_id: str) -> Dict:

        """Process a complete user request."""

        try:

            # Extract intent

            intent = self.nl_processor.process_request(user_input, conversation_id)

            

            # Validate intent

            validation_result = self.intent_validator.validate_intent(intent, user_context)

            

            if not validation_result.is_valid:

                return {

                    'status': 'error',

                    'message': 'Request validation failed',

                    'errors': validation_result.errors

                }

            

            # Execute command

            execution_result = self.command_executor.execute_command(

                intent, {'user_context': user_context}

            )

            

            if execution_result.success:

                return {

                    'status': 'success',

                    'message': self._generate_success_message(intent, execution_result),

                    'execution_id': execution_result.execution_id,

                    'backup_id': execution_result.backup_id

                }

            else:

                return {

                    'status': 'error',

                    'message': 'Command execution failed',

                    'error': execution_result.error,

                    'execution_id': execution_result.execution_id

                }

                

        except Exception as e:

            logger.error(f"System error processing request: {str(e)}")

            return {

                'status': 'error',

                'message': 'System error occurred',

                'error': str(e)

            }

    

    def _generate_success_message(self, intent: Dict, execution_result: ExecutionResult) -> str:

        """Generate a user-friendly success message."""

        action = intent.get('action')

        category = intent.get('category')

        target = intent.get('target')

        

        if category == 'network' and target == 'wifi':

            return "WiFi has been enabled successfully"

        elif category == 'network' and target == 'wifi_network':

            ssid = intent.get('parameters', {}).get('ssid')

            return f"Successfully connected to WiFi network '{ssid}'"

        elif category == 'network' and target == 'dns_servers':

            servers = intent.get('parameters', {}).get('servers', [])

            return f"DNS servers updated to: {', '.join(servers)}"

        else:

            return f"Successfully {action}ed {target}"


def main():

    """Demonstration of the complete system."""

    print("System Configuration Chatbot - Demo")

    print("=" * 50)

    

    # Initialize the chatbot

    chatbot = SystemConfigurationChatbot()

    

    # Simulate user context

    user_context = {

        'user_id': 'demo_user',

        'groups': ['admin'],

        'permissions': ['network_config', 'system_config']

    }

    

    conversation_id = "demo_conversation"

    

    # Test requests

    test_requests = [

        "Turn on WiFi",

        "Connect to CompanyNet with password SecurePass123",

        "Set DNS servers to 8.8.8.8 and 8.8.4.4",

        "What is the current WiFi status?"

    ]

    

    for request in test_requests:

        print(f"\nUser: {request}")

        

        response = chatbot.process_user_request(

            request, user_context, conversation_id

        )

        

        print(f"Status: {response['status']}")

        print(f"Message: {response['message']}")

        

        if response['status'] == 'success':

            print(f"Execution ID: {response.get('execution_id')}")

            print(f"Backup ID: {response.get('backup_id')}")

        elif response['status'] == 'error':

            print(f"Error: {response.get('error', 'Unknown error')}")

        

        print("-" * 30)


if __name__ == "__main__":

    main()


This complete implementation provides a fully functional system configuration chatbot that demonstrates all the concepts discussed throughout this article. The system includes proper error handling, safety mechanisms, backup functionality, and a clear separation between operating system neutral and operating system specific components.


The implementation can be extended to support additional operating systems by creating new system interface classes that implement the same abstract interface. Additional configuration categories can be added by extending the schema definitions and implementing the corresponding handlers in the system interface classes.


The safety and security mechanisms ensure that the system operates reliably while preventing unauthorized or dangerous operations. The backup and rollback functionality provides a safety net that allows users to recover from configuration changes that have unintended consequences.


This architecture provides a solid foundation for building production-ready natural language interfaces to system configuration management, with the flexibility to adapt to different operating systems and configuration requirements while maintaining consistent safety and usability standards.

No comments: