Monday, December 15, 2025

Building a Natural Language Interface for Email Management Using Local LLMs




Introduction and Problem Analysis


The proliferation of email communication has created a significant challenge for users managing large volumes of messages across multiple accounts and folders. Traditional email clients require users to navigate complex graphical interfaces, remember specific menu locations, and perform repetitive tasks manually. This article presents a comprehensive approach to building a natural language interface that allows users to interact with their email applications using conversational commands processed by a local Large Language Model (LLM).

The core challenge is bridging the gap between natural language and the structured API calls that email applications require. Users phrase the same intention in many different ways, refer to time relatively ("last week"), and leave context implicit; all of this must be interpreted and translated into precise programmatic actions. Because email content is sensitive, the design runs the LLM locally rather than sending commands and message data to a cloud service.


System Architecture Overview


The proposed system follows a layered architecture that separates concerns and maintains modularity. The architecture consists of five primary components: the Natural Language Processing Layer, the Intent Recognition Engine, the Email Abstraction Layer, the Vendor-Specific Adapters, and the Security and Confirmation Layer.

The Natural Language Processing Layer receives user input and performs initial text preprocessing, including tokenization, normalization, and context extraction. This layer interfaces directly with the local LLM to generate structured interpretations of user commands.

The Intent Recognition Engine analyzes the LLM output to identify specific actions, extract parameters, and validate the completeness of the request. This component maintains a registry of supported operations and their required parameters.

The Email Abstraction Layer provides a unified interface for email operations, abstracting away vendor-specific implementation details. This layer defines standard operations such as message retrieval, folder management, and account configuration.

The Vendor-Specific Adapters implement the abstract email operations for particular email clients. For Apple Mail, this means driving Mail.app through AppleScript, with an Objective-C bridge used for tasks such as checking whether the application is running.

The Security and Confirmation Layer manages user authentication, validates permissions, and handles confirmation workflows for potentially destructive operations.
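
The flow through these layers can be sketched as a single function. This is a minimal illustration rather than the article's implementation: ParsedCommand, handle_request, and the toy_* functions are hypothetical names that stand in for the layers described above, and the three callables are assumed to be supplied by the caller.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict


@dataclass
class ParsedCommand:
    """Hypothetical stand-in for the structured output of the NLP layer."""
    action: str
    parameters: Dict[str, Any] = field(default_factory=dict)
    requires_confirmation: bool = True


def handle_request(
    user_input: str,
    interpret: Callable[[str], ParsedCommand],  # NLP layer + intent recognition
    confirm: Callable[[ParsedCommand], bool],   # security and confirmation layer
    execute: Callable[[ParsedCommand], str],    # abstraction layer + vendor adapter
) -> str:
    """Run one command through the layers in order and return a status message."""
    command = interpret(user_input)
    if command.requires_confirmation and not confirm(command):
        return "cancelled"
    return execute(command)


# Toy layer implementations, for illustration only.
def toy_interpret(text: str) -> ParsedCommand:
    destructive = text.lower().startswith("delete")
    return ParsedCommand(
        action="delete_messages" if destructive else "list_messages",
        parameters={"query": text},
        requires_confirmation=destructive,
    )


def toy_execute(command: ParsedCommand) -> str:
    return f"executed {command.action}"


if __name__ == "__main__":
    deny = lambda command: False  # a user who declines every confirmation
    print(handle_request("list mails from alice", toy_interpret, deny, toy_execute))
    print(handle_request("delete old newsletters", toy_interpret, deny, toy_execute))
```

Passing the layers in as callables keeps each one replaceable, which is the same separation of concerns the layered architecture aims for.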


Natural Language Processing Implementation


The natural language processing component serves as the primary interface between user input and system interpretation. This component must handle the inherent ambiguity and variability of human language while extracting precise operational parameters.


import json

import re

from typing import Any, Dict, List, Optional, Tuple

from dataclasses import dataclass

from enum import Enum


class ActionType(Enum):

    DELETE_MESSAGES = "delete_messages"

    MOVE_MESSAGES = "move_messages"

    LIST_MESSAGES = "list_messages"

    CREATE_ACCOUNT = "create_account"

    SEARCH_MESSAGES = "search_messages"


@dataclass

class EmailCommand:

    action: ActionType

    # typing.Any, not the builtin any() function

    parameters: Dict[str, Any]

    confidence: float

    requires_confirmation: bool


class NaturalLanguageProcessor:

    def __init__(self, llm_model_path: str):

        """

        Initialize the NLP processor with a local LLM model.

        

        Args:

            llm_model_path: Path to the local LLM model file

        """

        self.llm_model = self._load_local_llm(llm_model_path)

        self.command_patterns = self._initialize_command_patterns()

        

    def _load_local_llm(self, model_path: str):

        """

        Load and initialize the local LLM model for processing.

        This implementation assumes a model compatible with the llama.cpp

        Python bindings for local inference.

        """

        try:

            from llama_cpp import Llama

            return Llama(

                model_path=model_path,

                n_ctx=2048,

                n_threads=4,

                verbose=False

            )

        except ImportError:

            raise ImportError("llama-cpp-python required for local LLM processing")

    

    def _initialize_command_patterns(self) -> Dict[str, str]:

        """

        Initialize regex patterns for common email command structures.

        These patterns serve as fallback mechanisms when LLM processing

        fails or produces low-confidence results.

        """

        return {

            'delete_subject': r'delete.*mails?.*subject\s+["\']?([^"\']+)["\']?.*folder\s+["\']?([^"\']+)["\']?',

            'move_regex': r'move.*mails?.*contain\s+["\']?([^"\']+)["\']?.*folder\s+["\']?([^"\']+)["\']?',

            'list_sender': r'list.*mails?.*account\s+([^\s]+).*sent by\s+["\']?([^"\']+)["\']?',

            'create_account': r'create.*mail account.*for\s+([^\s]+)',

            'move_receiver': r'move.*mails?.*receiver\s+([^\s]+).*folder\s+["\']?([^"\']+)["\']?'

        }

    

    def process_command(self, user_input: str) -> EmailCommand:

        """

        Process natural language input and extract structured command information.

        

        Args:

            user_input: Raw natural language command from user

            

        Returns:

            EmailCommand object containing parsed action and parameters

        """

        # Preprocess the input text

        normalized_input = self._preprocess_text(user_input)

        

        # Generate LLM prompt for command interpretation

        prompt = self._create_interpretation_prompt(normalized_input)

        

        # Get LLM response

        llm_response = self._query_llm(prompt)

        

        # Parse LLM response into structured format

        try:

            command = self._parse_llm_response(llm_response)

            if command.confidence > 0.7:

                return command

        except Exception as e:

            print(f"LLM parsing failed: {e}")

        

        # Fallback to pattern matching if LLM fails

        return self._fallback_pattern_matching(normalized_input)

    

    def _preprocess_text(self, text: str) -> str:

        """

        Normalize and clean input text for better processing.

        """

        # Convert to lowercase for consistency

        text = text.lower().strip()

        

        # Normalize whitespace

        text = re.sub(r'\s+', ' ', text)

        

        # Normalize the hyphenated spelling; this also turns "e-mails" into "emails"

        text = text.replace("e-mail", "email")

        

        return text

    

    def _create_interpretation_prompt(self, user_input: str) -> str:

        """

        Create a structured prompt for the LLM to interpret email commands.

        """

        prompt = f"""

You are an email command interpreter. Parse the following user command and return a JSON response with the action type and parameters.


Supported actions:

- delete_messages: Delete emails based on criteria

- move_messages: Move emails to a different folder

- list_messages: List emails matching criteria

- create_account: Create a new email account

- search_messages: Search for emails


User command: "{user_input}"


Return JSON format:

{{

    "action": "action_type",

    "parameters": {{

        "subject": "email subject (if applicable)",

        "folder": "folder name (if applicable)",

        "sender": "sender email/name (if applicable)",

        "receiver": "receiver email (if applicable)",

        "account": "email account (if applicable)",

        "regex_pattern": "regex pattern (if applicable)",

        "destination_folder": "target folder for moves (if applicable)"

    }},

    "confidence": 0.95,

    "requires_confirmation": true

}}


JSON Response:

"""

        return prompt

    

    def _query_llm(self, prompt: str) -> str:

        """

        Query the local LLM with the interpretation prompt.

        """

        response = self.llm_model(

            prompt,

            max_tokens=512,

            temperature=0.1,

            stop=["User command:", "\n\n"]

        )

        return response['choices'][0]['text'].strip()

    

    def _parse_llm_response(self, response: str) -> EmailCommand:

        """

        Parse the LLM JSON response into an EmailCommand object.

        """

        try:

            # Extract JSON from response

            json_start = response.find('{')

            json_end = response.rfind('}') + 1

            json_str = response[json_start:json_end]

            

            parsed = json.loads(json_str)

            

            action = ActionType(parsed['action'])

            parameters = {k: v for k, v in parsed['parameters'].items() if v}

            confidence = float(parsed.get('confidence', 0.5))

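            # State-changing actions always require confirmation, whatever the model reports

            requires_confirmation = (

                bool(parsed.get('requires_confirmation', True))

                or action in (ActionType.DELETE_MESSAGES, ActionType.MOVE_MESSAGES, ActionType.CREATE_ACCOUNT)

            )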

            

            return EmailCommand(

                action=action,

                parameters=parameters,

                confidence=confidence,

                requires_confirmation=requires_confirmation

            )

        except (json.JSONDecodeError, KeyError, ValueError) as e:

            raise ValueError(f"Failed to parse LLM response: {e}")

    

    def _fallback_pattern_matching(self, text: str) -> EmailCommand:

        """

        Fallback pattern matching when LLM processing fails.

        """

        for pattern_name, pattern in self.command_patterns.items():

            match = re.search(pattern, text, re.IGNORECASE)

            if match:

                return self._create_command_from_pattern(pattern_name, match.groups())

        

        # Default fallback

        return EmailCommand(

            action=ActionType.SEARCH_MESSAGES,

            parameters={"query": text},

            confidence=0.3,

            requires_confirmation=True

        )

    

    def _create_command_from_pattern(self, pattern_name: str, groups: Tuple[str, ...]) -> EmailCommand:

        """

        Create EmailCommand from regex pattern matches.

        """

        if pattern_name == 'delete_subject':

            return EmailCommand(

                action=ActionType.DELETE_MESSAGES,

                parameters={"subject": groups[0], "folder": groups[1]},

                confidence=0.8,

                requires_confirmation=True

            )

        elif pattern_name == 'move_regex':

            return EmailCommand(

                action=ActionType.MOVE_MESSAGES,

                parameters={"regex_pattern": groups[0], "destination_folder": groups[1]},

                confidence=0.8,

                requires_confirmation=True

            )

        elif pattern_name == 'list_sender':

            return EmailCommand(

                action=ActionType.LIST_MESSAGES,

                parameters={"account": groups[0], "sender": groups[1]},

                confidence=0.8,

                requires_confirmation=False

            )

        elif pattern_name == 'create_account':

            return EmailCommand(

                action=ActionType.CREATE_ACCOUNT,

                parameters={"email_address": groups[0]},

                confidence=0.8,

                requires_confirmation=True

            )

        elif pattern_name == 'move_receiver':

            return EmailCommand(

                action=ActionType.MOVE_MESSAGES,

                parameters={"receiver": groups[0], "destination_folder": groups[1]},

                confidence=0.8,

                requires_confirmation=True

            )

        

        return EmailCommand(

            action=ActionType.SEARCH_MESSAGES,

            parameters={"query": " ".join(groups)},

            confidence=0.5,

            requires_confirmation=True

        )
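

The parse-then-fall-back behaviour of process_command can be exercised without loading a model. The following is a minimal sketch, assuming the LLM reply is plain text that may wrap the JSON object in extra chatter. The helper names extract_json_object and interpret are hypothetical; the 0.7 threshold and the 0.3 default confidence mirror the values used in the listing above. For brevity the sketch skips the regex patterns and falls straight back to a search.

```python
import json
from typing import Any, Dict, Optional


def extract_json_object(response: str) -> Optional[Dict[str, Any]]:
    """Return the outermost {...} span of an LLM reply as a dict, or None."""
    start = response.find("{")
    end = response.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        parsed = json.loads(response[start:end + 1])
    except json.JSONDecodeError:
        return None
    return parsed if isinstance(parsed, dict) else None


def interpret(response: str, raw_text: str, threshold: float = 0.7) -> Dict[str, Any]:
    """Accept the LLM result only if it parses and is confident enough."""
    parsed = extract_json_object(response)
    if parsed is not None:
        try:
            confidence = float(parsed.get("confidence", 0.5))
        except (TypeError, ValueError):
            confidence = 0.0
        if confidence > threshold:
            return parsed
    # Unparseable or low-confidence replies degrade to a plain search,
    # which does not modify any mail.
    return {
        "action": "search_messages",
        "parameters": {"query": raw_text},
        "confidence": 0.3,
        "requires_confirmation": True,
    }


if __name__ == "__main__":
    reply = 'Sure! {"action": "list_messages", "parameters": {"sender": "alice"}, "confidence": 0.9}'
    print(interpret(reply, "list mails from alice")["action"])
    print(interpret("I could not parse that.", "list mails from alice")["action"])
```

Falling back to a search keeps the failure mode harmless: a misread command produces a list of results rather than a deletion or a move.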


Email Abstraction Layer Design


The email abstraction layer provides a unified interface that isolates the application logic from vendor-specific implementation details. This design enables support for multiple email clients while maintaining a consistent programming interface.


from abc import ABC, abstractmethod

from typing import List, Dict, Optional, Any

from dataclasses import dataclass

from datetime import datetime


@dataclass

class EmailMessage:

    """

    Unified representation of an email message across different clients.

    """

    message_id: str

    subject: str

    sender: str

    recipients: List[str]

    cc_recipients: List[str]

    bcc_recipients: List[str]

    body: str

    html_body: Optional[str]

    date_sent: datetime

    date_received: datetime

    folder: str

    account: str

    is_read: bool

    is_flagged: bool

    attachments: List[str]

    headers: Dict[str, str]


@dataclass

class EmailFolder:

    """

    Representation of an email folder structure.

    """

    name: str

    path: str

    message_count: int

    unread_count: int

    subfolders: List['EmailFolder']

    account: str


@dataclass

class EmailAccount:

    """

    Representation of an email account configuration.

    """

    email_address: str

    display_name: str

    account_type: str  # IMAP, POP3, Exchange, etc.

    server_settings: Dict[str, Any]

    is_enabled: bool

    folders: List[EmailFolder]


class EmailClientInterface(ABC):

    """

    Abstract interface defining operations that email clients must implement.

    This interface ensures consistency across different email client implementations.

    """

    

    @abstractmethod

    def connect(self) -> bool:

        """

        Establish connection to the email client.

        

        Returns:

            True if connection successful, False otherwise

        """

        pass

    

    @abstractmethod

    def disconnect(self) -> None:

        """

        Close connection to the email client.

        """

        pass

    

    @abstractmethod

    def get_accounts(self) -> List[EmailAccount]:

        """

        Retrieve all configured email accounts.

        

        Returns:

            List of EmailAccount objects

        """

        pass

    

    @abstractmethod

    def get_folders(self, account: str) -> List[EmailFolder]:

        """

        Get folder structure for a specific account.

        

        Args:

            account: Email account identifier

            

        Returns:

            List of EmailFolder objects

        """

        pass

    

    @abstractmethod

    def get_messages(self, account: str, folder: str, 

                    limit: Optional[int] = None,

                    filters: Optional[Dict[str, Any]] = None) -> List[EmailMessage]:

        """

        Retrieve messages from a specific folder.

        

        Args:

            account: Email account identifier

            folder: Folder name or path

            limit: Maximum number of messages to retrieve

            filters: Dictionary of filter criteria

            

        Returns:

            List of EmailMessage objects

        """

        pass

    

    @abstractmethod

    def search_messages(self, account: str, query: str,

                       folder: Optional[str] = None) -> List[EmailMessage]:

        """

        Search for messages matching query criteria.

        

        Args:

            account: Email account identifier

            query: Search query string

            folder: Optional folder to limit search scope

            

        Returns:

            List of matching EmailMessage objects

        """

        pass

    

    @abstractmethod

    def delete_messages(self, message_ids: List[str], 

                       account: str) -> bool:

        """

        Delete specified messages.

        

        Args:

            message_ids: List of message identifiers to delete

            account: Email account identifier

            

        Returns:

            True if deletion successful, False otherwise

        """

        pass

    

    @abstractmethod

    def move_messages(self, message_ids: List[str], 

                     source_folder: str, destination_folder: str,

                     account: str) -> bool:

        """

        Move messages between folders.

        

        Args:

            message_ids: List of message identifiers to move

            source_folder: Source folder name

            destination_folder: Destination folder name

            account: Email account identifier

            

        Returns:

            True if move successful, False otherwise

        """

        pass

    

    @abstractmethod

    def create_folder(self, folder_name: str, parent_folder: str,

                     account: str) -> bool:

        """

        Create a new email folder.

        

        Args:

            folder_name: Name of the new folder

            parent_folder: Parent folder path

            account: Email account identifier

            

        Returns:

            True if creation successful, False otherwise

        """

        pass

    

    @abstractmethod

    def mark_as_read(self, message_ids: List[str], account: str) -> bool:

        """

        Mark messages as read.

        

        Args:

            message_ids: List of message identifiers

            account: Email account identifier

            

        Returns:

            True if operation successful, False otherwise

        """

        pass

    

    @abstractmethod

    def mark_as_unread(self, message_ids: List[str], account: str) -> bool:

        """

        Mark messages as unread.

        

        Args:

            message_ids: List of message identifiers

            account: Email account identifier

            

        Returns:

            True if operation successful, False otherwise

        """

        pass

    

    @abstractmethod

    def create_account(self, account_config: EmailAccount) -> bool:

        """

        Create a new email account configuration.

        

        Args:

            account_config: EmailAccount object with configuration details

            

        Returns:

            True if account creation successful, False otherwise

        """

        pass


class EmailOperationResult:

    """

    Standardized result object for email operations.

    """

    def __init__(self, success: bool, message: str, 

                 affected_count: int = 0, data: Any = None):

        self.success = success

        self.message = message

        self.affected_count = affected_count

        self.data = data

        self.timestamp = datetime.now()


class EmailManager:

    """

    High-level email management class that coordinates operations

    across the abstraction layer.

    """

    

    def __init__(self, client: EmailClientInterface):

        """

        Initialize email manager with a specific client implementation.

        

        Args:

            client: EmailClientInterface implementation

        """

        self.client = client

        self.connected = False

    

    def initialize(self) -> bool:

        """

        Initialize connection to the email client.

        

        Returns:

            True if initialization successful, False otherwise

        """

        try:

            self.connected = self.client.connect()

            return self.connected

        except Exception as e:

            print(f"Failed to initialize email client: {e}")

            return False

    

    def execute_command(self, command: EmailCommand) -> EmailOperationResult:

        """

        Execute a parsed email command through the abstraction layer.

        

        Args:

            command: EmailCommand object containing action and parameters

            

        Returns:

            EmailOperationResult with operation outcome

        """

        if not self.connected:

            return EmailOperationResult(

                success=False,

                message="Email client not connected"

            )

        

        try:

            if command.action == ActionType.DELETE_MESSAGES:

                return self._execute_delete_command(command.parameters)

            elif command.action == ActionType.MOVE_MESSAGES:

                return self._execute_move_command(command.parameters)

            elif command.action == ActionType.LIST_MESSAGES:

                return self._execute_list_command(command.parameters)

            elif command.action == ActionType.CREATE_ACCOUNT:

                return self._execute_create_account_command(command.parameters)

            elif command.action == ActionType.SEARCH_MESSAGES:

                return self._execute_search_command(command.parameters)

            else:

                return EmailOperationResult(

                    success=False,

                    message=f"Unsupported action: {command.action}"

                )

        except Exception as e:

            return EmailOperationResult(

                success=False,

                message=f"Command execution failed: {str(e)}"

            )

    

    def _execute_delete_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        """

        Execute delete messages command.

        """

        account = parameters.get('account', 'default')

        folder = parameters.get('folder', 'Inbox')

        subject = parameters.get('subject')

        

        if not subject:

            return EmailOperationResult(

                success=False,

                message="Subject parameter required for delete operation"

            )

        

        # Find messages matching criteria

        messages = self.client.get_messages(account, folder)

        matching_messages = [

            msg for msg in messages 

            if subject.lower() in msg.subject.lower()

        ]

        

        if not matching_messages:

            return EmailOperationResult(

                success=True,

                message="No messages found matching criteria",

                affected_count=0

            )

        

        # Delete matching messages

        message_ids = [msg.message_id for msg in matching_messages]

        success = self.client.delete_messages(message_ids, account)

        

        return EmailOperationResult(

            success=success,

            message=f"Deleted {len(matching_messages)} messages" if success 

                   else "Failed to delete messages",

            affected_count=len(matching_messages) if success else 0

        )

    

    def _execute_move_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        """

        Execute move messages command.

        """

        account = parameters.get('account', 'default')

        source_folder = parameters.get('folder', 'Inbox')

        destination_folder = parameters.get('destination_folder')

        regex_pattern = parameters.get('regex_pattern')

        receiver = parameters.get('receiver')

        

        if not destination_folder:

            return EmailOperationResult(

                success=False,

                message="Destination folder required for move operation"

            )

        

        # Get messages from source folder

        messages = self.client.get_messages(account, source_folder)

        matching_messages = []

        

        if regex_pattern:

            import re

            pattern = re.compile(regex_pattern, re.IGNORECASE)

            matching_messages = [

                msg for msg in messages

                if pattern.search(msg.body) or pattern.search(msg.subject)

            ]

        elif receiver:

            matching_messages = [

                msg for msg in messages

                if receiver.lower() in [r.lower() for r in msg.recipients]

            ]

        

        if not matching_messages:

            return EmailOperationResult(

                success=True,

                message="No messages found matching criteria",

                affected_count=0

            )

        

        # Move matching messages

        message_ids = [msg.message_id for msg in matching_messages]

        success = self.client.move_messages(

            message_ids, source_folder, destination_folder, account

        )

        

        return EmailOperationResult(

            success=success,

            message=f"Moved {len(matching_messages)} messages" if success 

                   else "Failed to move messages",

            affected_count=len(matching_messages) if success else 0

        )

    

    def _execute_list_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        """

        Execute list messages command.

        """

        account = parameters.get('account', 'default')

        folder = parameters.get('folder', 'Inbox')

        sender = parameters.get('sender')

        

        messages = self.client.get_messages(account, folder)

        

        if sender:

            matching_messages = [

                msg for msg in messages

                if sender.lower() in msg.sender.lower()

            ]

        else:

            matching_messages = messages

        

        return EmailOperationResult(

            success=True,

            message=f"Found {len(matching_messages)} messages",

            affected_count=len(matching_messages),

            data=matching_messages

        )

    

    def _execute_search_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        """

        Execute search messages command.

        """

        account = parameters.get('account', 'default')

        query = parameters.get('query', '')

        folder = parameters.get('folder')

        

        matching_messages = self.client.search_messages(account, query, folder)

        

        return EmailOperationResult(

            success=True,

            message=f"Search found {len(matching_messages)} messages",

            affected_count=len(matching_messages),

            data=matching_messages

        )

    

    def _execute_create_account_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        """

        Execute create account command.

        """

        email_address = parameters.get('email_address')

        

        if not email_address:

            return EmailOperationResult(

                success=False,

                message="Email address required for account creation"

            )

        

        # Create basic account configuration

        account_config = EmailAccount(

            email_address=email_address,

            display_name=email_address.split('@')[0],

            account_type='IMAP',

            server_settings={},

            is_enabled=True,

            folders=[]

        )

        

        success = self.client.create_account(account_config)

        

        return EmailOperationResult(

            success=success,

            message=f"Created account for {email_address}" if success 

                   else "Failed to create account",

            affected_count=1 if success else 0

        )
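

Because EmailManager depends only on the abstract interface, the filtering logic can be tried against an in-memory stand-in before any real mail client is involved. The sketch below is self-contained and deliberately simplified: Message, InMemoryClient, and delete_by_subject are hypothetical names that mirror only the get_messages and delete_messages parts of the interface, and the dry_run flag is an added assumption for previewing a destructive operation.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Message:
    """Reduced stand-in for EmailMessage with only the fields used here."""
    message_id: str
    subject: str
    folder: str


class InMemoryClient:
    """Hypothetical test double that keeps messages in a list per account."""

    def __init__(self, messages: Dict[str, List[Message]]):
        self._messages = messages

    def get_messages(self, account: str, folder: str) -> List[Message]:
        return [m for m in self._messages.get(account, []) if m.folder == folder]

    def delete_messages(self, message_ids: List[str], account: str) -> bool:
        ids = set(message_ids)
        kept = [m for m in self._messages.get(account, []) if m.message_id not in ids]
        self._messages[account] = kept
        return True


def delete_by_subject(client: InMemoryClient, account: str, folder: str,
                      subject: str, dry_run: bool = True) -> Tuple[int, List[str]]:
    """Case-insensitive substring match on the subject line.

    Returns (number of messages deleted, ids that matched). With dry_run=True
    nothing is deleted and the count is 0.
    """
    needle = subject.lower()
    matches = [m.message_id for m in client.get_messages(account, folder)
               if needle in m.subject.lower()]
    if dry_run or not matches:
        return 0, matches
    deleted = client.delete_messages(matches, account)
    return (len(matches) if deleted else 0), matches


if __name__ == "__main__":
    client = InMemoryClient({"work": [
        Message("1", "Weekly Newsletter", "Inbox"),
        Message("2", "Invoice 42", "Inbox"),
        Message("3", "newsletter archive", "Archive"),
    ]})
    print(delete_by_subject(client, "work", "Inbox", "newsletter"))
    print(delete_by_subject(client, "work", "Inbox", "newsletter", dry_run=False))
    print(len(client.get_messages("work", "Inbox")))
```

The dry run reports which messages would be affected without removing anything, which gives a confirmation prompt something concrete to show the user.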


Apple Mail Integration Implementation


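The Apple Mail adapter implements the email abstraction interface by sending AppleScript to Mail.app and by using an Objective-C bridge to inspect the running applications. The objc, Foundation, and AppKit imports below are provided by the PyObjC package, so the adapter runs only on macOS. This implementation demonstrates how vendor-specific functionality plugs into the abstract interface.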
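
The adapter that follows builds AppleScript source by interpolating account and folder names into string literals. Because those values ultimately come from user input or LLM output, a name containing a double quote could break the script or change its meaning. One possible mitigation, sketched here under the assumption that values are only ever placed inside double-quoted AppleScript string literals, is to escape backslashes and double quotes first. The helpers quote_applescript_string and build_mailbox_count_script are hypothetical and not part of the adapter.

```python
def quote_applescript_string(value: str) -> str:
    """Return value as an AppleScript string literal, including the quotes."""
    # Escape backslashes first so the backslashes added for quotes are not doubled.
    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def build_mailbox_count_script(account: str, folder: str) -> str:
    """Build a script that counts the messages in one mailbox of one account."""
    return (
        'tell application "Mail"\n'
        f'    set mbox to mailbox {quote_applescript_string(folder)} '
        f'of account {quote_applescript_string(account)}\n'
        '    return count of messages of mbox\n'
        'end tell'
    )


if __name__ == "__main__":
    # A folder name containing a quote stays inside its string literal.
    print(build_mailbox_count_script("Work", 'Reports "2025"'))
```

Building the script only from quoted literals keeps untrusted text in the data position of the script, never in the command position.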


import subprocess

import json

import re

from typing import List, Dict, Optional, Any

from datetime import datetime

import objc

from Foundation import NSAppleScript, NSString

from AppKit import NSWorkspace


class AppleMailClient(EmailClientInterface):

    """

    Apple Mail implementation of the EmailClientInterface.

    Uses AppleScript and Objective-C runtime for Mail.app integration.

    """

    

    def __init__(self):

        """

        Initialize Apple Mail client interface.

        """

        self.app_name = "Mail"

        self.is_connected = False

        self.applescript_runner = AppleScriptRunner()

    

    def connect(self) -> bool:

        """

        Establish connection to Apple Mail application.

        """

        try:

            # Check if Mail.app is running

            workspace = NSWorkspace.sharedWorkspace()

            running_apps = workspace.runningApplications()

            

            mail_running = any(

                app.bundleIdentifier() == "com.apple.mail" 

                for app in running_apps

            )

            

            if not mail_running:

                # Launch Mail.app

                workspace.launchApplication_(self.app_name)

                # Wait for application to start

                import time

                time.sleep(3)

            

            # Test connection with a simple AppleScript command

            test_script = 'tell application "Mail" to get name of every account'

            result = self.applescript_runner.run_script(test_script)

            

            self.is_connected = result.success

            return self.is_connected

            

        except Exception as e:

            print(f"Failed to connect to Apple Mail: {e}")

            return False

    

    def disconnect(self) -> None:

        """

        Close connection to Apple Mail.

        """

        self.is_connected = False

    

    def get_accounts(self) -> List[EmailAccount]:

        """

        Retrieve all configured email accounts from Apple Mail.

        """

        script = '''

        tell application "Mail"

            set accountList to {}

            repeat with acc in every account

                set accountInfo to {name of acc, user name of acc, email addresses of acc}

                set end of accountList to accountInfo

            end repeat

            return accountList

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return []

        

        accounts = []

        try:

            # Parse AppleScript result

            account_data = self._parse_applescript_list(result.output)

            

            for acc_info in account_data:

                if len(acc_info) >= 3:

                    account = EmailAccount(

                        email_address=acc_info[2][0] if acc_info[2] else acc_info[1],

                        display_name=acc_info[0],

                        account_type="Apple Mail",

                        server_settings={},

                        is_enabled=True,

                        folders=self._get_account_folders(acc_info[0])

                    )

                    accounts.append(account)

        except Exception as e:

            print(f"Error parsing account data: {e}")

        

        return accounts

    

    def get_folders(self, account: str) -> List[EmailFolder]:

        """

        Get folder structure for a specific account.

        """

        return self._get_account_folders(account)

    

    def _get_account_folders(self, account_name: str) -> List[EmailFolder]:

        """

        Helper method to retrieve folders for an account.

        """

        script = f'''

        tell application "Mail"

            set folderList to {{}}

            try

                set acc to account "{account_name}"

                repeat with mbox in every mailbox of acc

                    set folderInfo to {{name of mbox, (count of messages of mbox), (count of (messages of mbox whose read status is false))}}

                    set end of folderList to folderInfo

                end repeat

            end try

            return folderList

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return []

        

        folders = []

        try:

            folder_data = self._parse_applescript_list(result.output)

            

            for folder_info in folder_data:

                if len(folder_info) >= 3:

                    folder = EmailFolder(

                        name=folder_info[0],

                        path=folder_info[0],

                        message_count=int(folder_info[1]),

                        unread_count=int(folder_info[2]),

                        subfolders=[],

                        account=account_name

                    )

                    folders.append(folder)

        except Exception as e:

            print(f"Error parsing folder data: {e}")

        

        return folders

    

    def get_messages(self, account: str, folder: str, 

                    limit: Optional[int] = None,

                    filters: Optional[Dict[str, Any]] = None) -> List[EmailMessage]:

        """

        Retrieve messages from a specific folder in Apple Mail.

        """

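        # "items 1 thru N" raises an AppleScript error when the mailbox holds

        # fewer than N messages, so the limit is applied inside the script only

        # when the mailbox is larger than the limit. 0 means "no limit".

        limit_value = int(limit) if limit else 0

        

        script = f'''

        tell application "Mail"

            set messageList to {{}}

            try

                set acc to account "{account}"

                set mbox to mailbox "{folder}" of acc

                set msgs to every message of mbox

                if {limit_value} > 0 and (count of msgs) > {limit_value} then

                    set msgs to items 1 thru {limit_value} of msgs

                end if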

                

                repeat with msg in msgs

                    set msgInfo to {{id of msg, subject of msg, sender of msg, ¬

                                   content of msg, date received of msg, ¬

                                   read status of msg, flagged status of msg}}

                    set end of messageList to msgInfo

                end repeat

            end try

            return messageList

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return []

        

        messages = []

        try:

            message_data = self._parse_applescript_list(result.output)

            

            for msg_info in message_data:

                if len(msg_info) >= 7:

                    # Parse recipients separately

                    recipients = self._get_message_recipients(msg_info[0])

                    

                    message = EmailMessage(

                        message_id=str(msg_info[0]),

                        subject=msg_info[1] or "",

                        sender=msg_info[2] or "",

                        recipients=recipients.get('to', []),

                        cc_recipients=recipients.get('cc', []),

                        bcc_recipients=recipients.get('bcc', []),

                        body=msg_info[3] or "",

                        html_body=None,

                        # Only "date received" is fetched, so it also stands in for the sent date

                        date_sent=self._parse_applescript_date(msg_info[4]),

                        date_received=self._parse_applescript_date(msg_info[4]),

                        folder=folder,

                        account=account,

                        is_read=bool(msg_info[5]),

                        is_flagged=bool(msg_info[6]),

                        attachments=[],

                        headers={}

                    )

                    messages.append(message)

        except Exception as e:

            print(f"Error parsing message data: {e}")

        

        return messages

    

    def _get_message_recipients(self, message_id: str) -> Dict[str, List[str]]:

        """

        Get detailed recipient information for a message.

        """

        script = f'''

        tell application "Mail"

            try

                set msg to message id {message_id}

                set toRecipients to {{}}

                set ccRecipients to {{}}

                

                repeat with recip in to recipients of msg

                    set end of toRecipients to address of recip

                end repeat

                

                repeat with recip in cc recipients of msg

                    set end of ccRecipients to address of recip

                end repeat

                

                return {{toRecipients, ccRecipients}}

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return {'to': [], 'cc': [], 'bcc': []}

        

        try:

            recipient_data = self._parse_applescript_list(result.output)

            return {

                'to': recipient_data[0] if len(recipient_data) > 0 else [],

                'cc': recipient_data[1] if len(recipient_data) > 1 else [],

                'bcc': []  # BCC recipients not accessible via AppleScript

            }

        except Exception:

            return {'to': [], 'cc': [], 'bcc': []}

    

    def search_messages(self, account: str, query: str,

                       folder: Optional[str] = None) -> List[EmailMessage]:

        """

        Search Apple Mail for messages whose subject or content contains the query.

        """

        folder_clause = f'in mailbox "{folder}" of account "{account}"' if folder else f'in account "{account}"'

        

        script = f'''

        tell application "Mail"

            set searchResults to {{}}

            try

                set msgs to (every message {folder_clause} whose subject contains "{query}" or content contains "{query}")

                

                repeat with msg in msgs

                    set msgInfo to {{id of msg, subject of msg, sender of msg, ¬

                                   content of msg, date received of msg, ¬

                                   read status of msg, flagged status of msg, ¬

                                   name of mailbox of msg}}

                    set end of searchResults to msgInfo

                end repeat

            end try

            return searchResults

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return []

        

        messages = []

        try:

            message_data = self._parse_applescript_list(result.output)

            

            for msg_info in message_data:

                if len(msg_info) >= 8:

                    recipients = self._get_message_recipients(msg_info[0])

                    

                    message = EmailMessage(

                        message_id=str(msg_info[0]),

                        subject=msg_info[1] or "",

                        sender=msg_info[2] or "",

                        recipients=recipients.get('to', []),

                        cc_recipients=recipients.get('cc', []),

                        bcc_recipients=recipients.get('bcc', []),

                        body=msg_info[3] or "",

                        html_body=None,

                        # Only "date received" is fetched, so it also stands in for the sent date

                        date_sent=self._parse_applescript_date(msg_info[4]),

                        date_received=self._parse_applescript_date(msg_info[4]),

                        folder=msg_info[7] or folder or "Unknown",

                        account=account,

                        is_read=bool(msg_info[5]),

                        is_flagged=bool(msg_info[6]),

                        attachments=[],

                        headers={}

                    )

                    messages.append(message)

        except Exception as e:

            print(f"Error parsing search results: {e}")

        

        return messages

    

    def delete_messages(self, message_ids: List[str], account: str) -> bool:

        """

        Delete specified messages in Apple Mail.

        """

        if not message_ids:

            return True

        

        # Convert message IDs to AppleScript list format

        id_list = ", ".join(message_ids)

        

        script = f'''

        tell application "Mail"

            try

                set msgIds to {{{id_list}}}

                repeat with msgId in msgIds

                    set msg to message id msgId

                    delete msg

                end repeat

                return true

            on error

                return false

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        return result.success and "true" in result.output.lower()

    

    def move_messages(self, message_ids: List[str], 

                     source_folder: str, destination_folder: str,

                     account: str) -> bool:

        """

        Move messages between folders in Apple Mail.

        """

        if not message_ids:

            return True

        

        id_list = ", ".join(message_ids)

        

        script = f'''

        tell application "Mail"

            try

                set acc to account "{account}"

                set destBox to mailbox "{destination_folder}" of acc

                set msgIds to {{{id_list}}}

                

                repeat with msgId in msgIds

                    set msg to message id msgId

                    set mailbox of msg to destBox

                end repeat

                return true

            on error errMsg

                return false

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        return result.success and "true" in result.output.lower()

    

    def create_folder(self, folder_name: str, parent_folder: str,

                     account: str) -> bool:

        """

        Create a new mailbox in Apple Mail.

        """

        script = f'''

        tell application "Mail"

            try

                set acc to account "{account}"

                if "{parent_folder}" is not "" then

                    set parentBox to mailbox "{parent_folder}" of acc

                    make new mailbox with properties {{name:"{folder_name}"}} at parentBox

                else

                    make new mailbox with properties {{name:"{folder_name}"}} at acc

                end if

                return true

            on error

                return false

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        return result.success and "true" in result.output.lower()

    

    def mark_as_read(self, message_ids: List[str], account: str) -> bool:

        """

        Mark messages as read in Apple Mail.

        """

        return self._set_read_status(message_ids, True)

    

    def mark_as_unread(self, message_ids: List[str], account: str) -> bool:

        """

        Mark messages as unread in Apple Mail.

        """

        return self._set_read_status(message_ids, False)

    

    def _set_read_status(self, message_ids: List[str], read_status: bool) -> bool:

        """

        Helper method to set read status of messages.

        """

        if not message_ids:

            return True

        

        id_list = ", ".join(message_ids)

        status_value = "true" if read_status else "false"

        

        script = f'''

        tell application "Mail"

            try

                set msgIds to {{{id_list}}}

                repeat with msgId in msgIds

                    set msg to message id msgId

                    set read status of msg to {status_value}

                end repeat

                return true

            on error

                return false

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        return result.success and "true" in result.output.lower()

    

    def create_account(self, account_config: EmailAccount) -> bool:

        """

        Create a new email account in Apple Mail.

        Note: This operation typically requires user interaction

        and may not be fully automatable via AppleScript.

        """

        # Apple Mail account creation usually requires manual setup

        # or configuration profiles. This is a simplified implementation.

        print(f"Account creation for {account_config.email_address} requires manual setup in Apple Mail")

        return False

    

    def _parse_applescript_list(self, output: str) -> List[Any]:

        """

        Parse AppleScript list output into Python data structures.

        """

        try:

            # Remove AppleScript list formatting and convert to Python-parseable format

            cleaned = output.strip()

            if cleaned.startswith('{') and cleaned.endswith('}'):

                cleaned = cleaned[1:-1]  # Remove outer braces

            

            # This is a simplified parser - a production implementation

            # would need more robust AppleScript result parsing

            items = []

            current_item = ""

            brace_count = 0

            in_quotes = False

            

            def finish_item(raw: str) -> Any:

                # Nested AppleScript lists are parsed recursively so callers

                # can index into them (e.g. acc_info[2][0])

                raw = raw.strip()

                if raw.startswith('{') and raw.endswith('}'):

                    return self._parse_applescript_list(raw)

                return raw.strip('"')

            

            for char in cleaned:

                if char == '"' and (not current_item or current_item[-1] != '\\'):

                    in_quotes = not in_quotes

                elif char == '{' and not in_quotes:

                    brace_count += 1

                elif char == '}' and not in_quotes:

                    brace_count -= 1

                elif char == ',' and brace_count == 0 and not in_quotes:

                    items.append(finish_item(current_item))

                    current_item = ""

                    continue

                

                current_item += char

            

            if current_item.strip():

                items.append(finish_item(current_item))

            

            return items

        except Exception as e:

            print(f"Error parsing AppleScript output: {e}")

            return []

    

    def _parse_applescript_date(self, date_str: str) -> datetime:

        """

        Parse AppleScript date format into Python datetime.

        """

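        try:

            # AppleScript dates are typically in format like:

            # "date \"Wednesday, January 1, 2025 at 12:00:00 PM\""

            import dateutil.parser

            

            # Strip the surrounding date "..." wrapper before parsing

            cleaned = date_str.strip()

            if cleaned.startswith('date'):

                cleaned = cleaned[4:]

            cleaned = cleaned.strip().strip('"')

            return dateutil.parser.parse(cleaned, fuzzy=True)

        except Exception:

            # Fall back to the current time when the date cannot be parsed

            return datetime.now()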


class AppleScriptRunner:

    """

    Utility class for executing AppleScript commands.

    """

    

    def __init__(self):

        self.timeout = 30  # seconds

    

    def run_script(self, script: str) -> 'AppleScriptResult':

        """

        Execute an AppleScript and return the result.

        """

        try:

            # Use NSAppleScript for better integration

            applescript = NSAppleScript.alloc().initWithSource_(script)

            result, error = applescript.executeAndReturnError_(None)

            

            if error:

                return AppleScriptResult(

                    success=False,

                    output="",

                    error=str(error)

                )

            

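            # stringValue() can return None when the result is not plain text;

            # fall back to an empty string instead of the literal "None"

            output = str(result.stringValue() or "") if result else ""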

            return AppleScriptResult(

                success=True,

                output=output,

                error=""

            )

            

        except Exception as e:

            return AppleScriptResult(

                success=False,

                output="",

                error=str(e)

            )


class AppleScriptResult:

    """

    Result object for AppleScript execution.

    """

    def __init__(self, success: bool, output: str, error: str):

        self.success = success

        self.output = output

        self.error = error
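

The adapter methods above interpolate account names, folder names, search queries, and message IDs directly into AppleScript source, so a value containing a double quote could break the script or change its meaning. The following is a minimal sketch of one way to guard against that. The helper names `escape_applescript_string` and `format_id_list` are hypothetical and not part of the adapter, and the sketch assumes that Mail message IDs are plain integers, as the `set msgIds to {...}` usage suggests.

```python
from typing import Iterable


def escape_applescript_string(value: str) -> str:
    """Escape a value for use inside a double-quoted AppleScript string literal."""
    # Escape backslashes first so the backslashes added for quotes are not doubled.
    return value.replace("\\", "\\\\").replace('"', '\\"')


def format_id_list(message_ids: Iterable[str]) -> str:
    """Render message IDs as a comma-separated list, rejecting non-integers."""
    # int() raises ValueError for anything that is not an integer, so script
    # text cannot be smuggled in through an ID (assumes integer message IDs).
    return ", ".join(str(int(message_id)) for message_id in message_ids)


# Hypothetical usage: build a script fragment from untrusted input.
folder = 'Projects "2025"'
fragment = f'set mbox to mailbox "{escape_applescript_string(folder)}"'
print(fragment)                        # set mbox to mailbox "Projects \"2025\""
print(format_id_list(["101", "102"]))  # 101, 102
```

Applying helpers like these at every interpolation point would complement the pattern checks in the security layer, which look for web-style payloads rather than AppleScript quoting problems.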


Security and Confirmation System


The security layer ensures that potentially destructive operations require explicit user confirmation and that every operation runs under an authenticated session with the appropriate permissions. It also enforces per-action batch-size limits, validates command parameters, and writes an audit log entry for each executed command.
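
The confirmation requirement can be illustrated in isolation before looking at the full implementation. The sketch below is a simplified stand-in, not the SecurityManager itself: the function name `confirm_operation` and its `prompt` parameter are assumptions made for illustration. Passing the prompt function in as a parameter lets the gate be exercised without a real terminal.

```python
from typing import Callable


def confirm_operation(message: str, critical: bool,
                      prompt: Callable[[str], str] = input) -> bool:
    """Return True only when the user explicitly approves the operation."""
    print(message)
    if critical:
        # Critical operations require typing the exact word, not just "y".
        return prompt("Type 'CONFIRM' to proceed: ").strip() == "CONFIRM"
    return prompt("Continue? [y/N] ").strip().lower() in ("y", "yes")


# Scripted prompts stand in for keyboard input here.
print(confirm_operation("Delete 3 messages?", critical=False, prompt=lambda _: "yes"))  # True
print(confirm_operation("Create account?", critical=True, prompt=lambda _: "yes"))      # False
```

Any answer other than an explicit approval cancels the operation, which keeps the default outcome on the safe side.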


import hashlib

import json

import getpass

import re

from typing import Dict, List, Optional, Callable

from datetime import datetime, timedelta

from dataclasses import dataclass

from enum import Enum


class SecurityLevel(Enum):

    LOW = "low"

    MEDIUM = "medium"

    HIGH = "high"

    CRITICAL = "critical"


class OperationType(Enum):

    READ = "read"

    MODIFY = "modify"

    DELETE = "delete"

    CREATE = "create"

    MOVE = "move"


@dataclass

class SecurityPolicy:

    """

    Defines security requirements for different operation types.

    """

    operation_type: OperationType

    security_level: SecurityLevel

    requires_confirmation: bool

    requires_authentication: bool

    max_batch_size: Optional[int]

    confirmation_message: str


@dataclass

class UserSession:

    """

    Represents an authenticated user session.

    """

    user_id: str

    session_token: str

    created_at: datetime

    last_activity: datetime

    permissions: List[str]

    is_authenticated: bool


class SecurityManager:

    """

    Manages security policies, user authentication, and operation confirmation.

    """

    

    def __init__(self):

        """

        Initialize security manager with default policies.

        """

        self.policies = self._initialize_security_policies()

        self.current_session: Optional[UserSession] = None

        self.confirmation_callbacks: Dict[str, Callable] = {}

        self.session_timeout = timedelta(hours=2)

    

    def _initialize_security_policies(self) -> Dict[ActionType, SecurityPolicy]:

        """

        Initialize default security policies for different operations.

        """

        return {

            ActionType.LIST_MESSAGES: SecurityPolicy(

                operation_type=OperationType.READ,

                security_level=SecurityLevel.LOW,

                requires_confirmation=False,

                requires_authentication=True,

                max_batch_size=None,

                confirmation_message=""

            ),

            ActionType.SEARCH_MESSAGES: SecurityPolicy(

                operation_type=OperationType.READ,

                security_level=SecurityLevel.LOW,

                requires_confirmation=False,

                requires_authentication=True,

                max_batch_size=None,

                confirmation_message=""

            ),

            ActionType.MOVE_MESSAGES: SecurityPolicy(

                operation_type=OperationType.MODIFY,

                security_level=SecurityLevel.MEDIUM,

                requires_confirmation=True,

                requires_authentication=True,

                max_batch_size=100,

                confirmation_message="This will move {count} messages to {destination}. Continue?"

            ),

            ActionType.DELETE_MESSAGES: SecurityPolicy(

                operation_type=OperationType.DELETE,

                security_level=SecurityLevel.HIGH,

                requires_confirmation=True,

                requires_authentication=True,

                max_batch_size=50,

                confirmation_message="This will permanently delete {count} messages. This action cannot be undone. Continue?"

            ),

            ActionType.CREATE_ACCOUNT: SecurityPolicy(

                operation_type=OperationType.CREATE,

                security_level=SecurityLevel.CRITICAL,

                requires_confirmation=True,

                requires_authentication=True,

                max_batch_size=1,

                confirmation_message="This will create a new email account for {email}. Continue?"

            )

        }

    

    def authenticate_user(self, username: str, password: str) -> bool:

        """

        Authenticate user and create session.

        

        Args:

            username: User identifier

            password: User password

            

        Returns:

            True if authentication successful, False otherwise

        """

        # In a production system, this would verify the password against a secure

        # credential store. This example performs no verification: any non-empty

        # username and password pair is accepted.

        if not username or not password:

            return False

        

        try:

            # Generate an unpredictable session token

            import secrets

            session_token = secrets.token_hex(32)

            

            # Create user session

            self.current_session = UserSession(

                user_id=username,

                session_token=session_token,

                created_at=datetime.now(),

                last_activity=datetime.now(),

                permissions=["email:read", "email:write", "email:delete"],

                is_authenticated=True

            )

            

            return True

            

        except Exception as e:

            print(f"Authentication failed: {e}")

            return False

    

    def is_session_valid(self) -> bool:

        """

        Check if current session is valid and not expired.

        

        Returns:

            True if session is valid, False otherwise

        """

        if not self.current_session or not self.current_session.is_authenticated:

            return False

        

        # Check session timeout

        time_since_activity = datetime.now() - self.current_session.last_activity

        if time_since_activity > self.session_timeout:

            self.current_session = None

            return False

        

        # Update last activity

        self.current_session.last_activity = datetime.now()

        return True

    

    def check_operation_permission(self, command: EmailCommand) -> bool:

        """

        Check if current user has permission to execute the command.

        

        Args:

            command: EmailCommand to check permissions for

            

        Returns:

            True if operation is permitted, False otherwise

        """

        if not self.is_session_valid():

            return False

        

        policy = self.policies.get(command.action)

        if not policy:

            return False

        

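        # Check if user has required permissions. Sessions carry the coarse

        # permissions "email:read", "email:write" and "email:delete", so modify,

        # create and move operations are all checked against "email:write".

        if policy.operation_type in (OperationType.READ, OperationType.DELETE):

            permission_name = policy.operation_type.value

        else:

            permission_name = "write"

        required_permission = f"email:{permission_name}"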

        if required_permission not in self.current_session.permissions:

            return False

        

        return True

    

    def require_confirmation(self, command: EmailCommand, 

                           affected_count: int = 0,

                           additional_context: Optional[Dict[str, str]] = None) -> bool:

        """

        Handle confirmation requirement for operations.

        

        Args:

            command: EmailCommand requiring confirmation

            affected_count: Number of items that will be affected

            additional_context: Additional context for confirmation message

            

        Returns:

            True if user confirms, False otherwise

        """

        policy = self.policies.get(command.action)

        if not policy or not policy.requires_confirmation:

            return True

        

        # Check batch size limits

        if policy.max_batch_size and affected_count > policy.max_batch_size:

            print(f"Operation exceeds maximum batch size of {policy.max_batch_size}")

            return False

        

        # Format confirmation message (copy the context so the caller's dict is not mutated)

        context = dict(additional_context or {})

        context.update({

            'count': str(affected_count),

            'action': command.action.value

        })

        

        # Add command-specific context

        if command.action == ActionType.MOVE_MESSAGES:

            context['destination'] = command.parameters.get('destination_folder', 'Unknown')

        elif command.action == ActionType.CREATE_ACCOUNT:

            context['email'] = command.parameters.get('email_address', 'Unknown')

        

        confirmation_message = policy.confirmation_message.format(**context)

        

        # Get user confirmation

        return self._get_user_confirmation(confirmation_message, policy.security_level)

    

    def _get_user_confirmation(self, message: str, security_level: SecurityLevel) -> bool:

        """

        Get user confirmation for an operation.

        

        Args:

            message: Confirmation message to display

            security_level: Security level of the operation

            

        Returns:

            True if user confirms, False otherwise

        """

        print(f"\n[{security_level.value.upper()}] CONFIRMATION REQUIRED")

        print(f"{message}")

        

        if security_level == SecurityLevel.CRITICAL:

            print("This is a critical operation. Please type 'CONFIRM' to proceed:")

            response = input("> ").strip()

            return response == "CONFIRM"

        else:

            print("Type 'y' or 'yes' to confirm, any other input to cancel:")

            response = input("> ").strip().lower()

            return response in ['y', 'yes']

    

    def log_operation(self, command: EmailCommand, result: EmailOperationResult) -> None:

        """

        Log security-relevant operations for audit purposes.

        

        Args:

            command: EmailCommand that was executed

            result: Result of the operation

        """

        log_entry = {

            'timestamp': datetime.now().isoformat(),

            'user_id': self.current_session.user_id if self.current_session else 'unknown',

            'action': command.action.value,

            'parameters': command.parameters,

            'success': result.success,

            'affected_count': result.affected_count,

            'message': result.message

        }

        

        # In a production system, this would write to a secure audit log.

        # default=str keeps non-JSON values such as datetimes from raising TypeError.

        print(f"AUDIT LOG: {json.dumps(log_entry, indent=2, default=str)}")

    

    def validate_command_parameters(self, command: EmailCommand) -> bool:

        """

        Validate command parameters for security issues.

        

        Args:

            command: EmailCommand to validate

            

        Returns:

            True if parameters are valid, False otherwise

        """

        # Check for potentially dangerous patterns

        dangerous_patterns = [

            r'\.\./',  # Directory traversal

            r'<script',  # Script injection

            r'javascript:',  # JavaScript execution

            r'file://',  # File protocol access

        ]

        

        # Validate all string parameters

        for key, value in command.parameters.items():

            if isinstance(value, str):

                for pattern in dangerous_patterns:

                    if re.search(pattern, value, re.IGNORECASE):

                        print(f"Security violation: Dangerous pattern detected in parameter '{key}'")

                        return False

        

        # Validate email addresses

        email_fields = ['email_address', 'sender', 'receiver']

        for field in email_fields:

            if field in command.parameters:

                email = command.parameters[field]

                if isinstance(email, str) and not self._is_valid_email(email):

                    print(f"Invalid email address in parameter '{field}': {email}")

                    return False

        

        return True

    

    def _is_valid_email(self, email: str) -> bool:

        """

        Validate email address format.

        

        Args:

            email: Email address to validate

            

        Returns:

            True if email format is valid, False otherwise

        """

        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

        return re.match(email_pattern, email) is not None


class SecureEmailManager(EmailManager):

    """

    Extended EmailManager with integrated security controls.

    """

    

    def __init__(self, client: EmailClientInterface, security_manager: SecurityManager):

        """

        Initialize secure email manager.

        

        Args:

            client: EmailClientInterface implementation

            security_manager: SecurityManager instance

        """

        super().__init__(client)

        self.security_manager = security_manager

    

    def execute_command(self, command: EmailCommand) -> EmailOperationResult:

        """

        Execute email command with security controls.

        

        Args:

            command: EmailCommand to execute

            

        Returns:

            EmailOperationResult with security validation

        """

        # Validate session

        if not self.security_manager.is_session_valid():

            return EmailOperationResult(

                success=False,

                message="Authentication required"

            )

        

        # Check permissions

        if not self.security_manager.check_operation_permission(command):

            return EmailOperationResult(

                success=False,

                message="Insufficient permissions for this operation"

            )

        

        # Validate command parameters

        if not self.security_manager.validate_command_parameters(command):

            return EmailOperationResult(

                success=False,

                message="Invalid or potentially dangerous parameters"

            )

        

        # For operations that modify data, get affected count for confirmation

        affected_count = 0

        if command.action in [ActionType.DELETE_MESSAGES, ActionType.MOVE_MESSAGES]:

            affected_count = self._estimate_affected_count(command)

        

        # Handle confirmation requirement

        if not self.security_manager.require_confirmation(command, affected_count):

            return EmailOperationResult(

                success=False,

                message="Operation cancelled by user"

            )

        

        # Execute the command

        result = super().execute_command(command)

        

        # Log the operation

        self.security_manager.log_operation(command, result)

        

        return result

    

    def _estimate_affected_count(self, command: EmailCommand) -> int:

        """

        Estimate number of items that will be affected by the command.

        

        Args:

            command: EmailCommand to estimate for

            

        Returns:

            Estimated number of affected items

        """

        try:

            if command.action == ActionType.DELETE_MESSAGES:

                account = command.parameters.get('account', 'default')

                folder = command.parameters.get('folder', 'Inbox')

                subject = command.parameters.get('subject')

                

                if subject:

                    messages = self.client.get_messages(account, folder)

                    return len([

                        msg for msg in messages 

                        if subject.lower() in msg.subject.lower()

                    ])

            

            elif command.action == ActionType.MOVE_MESSAGES:

                account = command.parameters.get('account', 'default')

                folder = command.parameters.get('folder', 'Inbox')

                regex_pattern = command.parameters.get('regex_pattern')

                receiver = command.parameters.get('receiver')

                

                messages = self.client.get_messages(account, folder)

                

                if regex_pattern:

                    import re

                    pattern = re.compile(regex_pattern, re.IGNORECASE)

                    return len([

                        msg for msg in messages

                        if pattern.search(msg.body) or pattern.search(msg.subject)

                    ])

                elif receiver:

                    return len([

                        msg for msg in messages

                        if receiver.lower() in [r.lower() for r in msg.recipients]

                    ])

        

        except Exception as e:

            print(f"Error estimating affected count: {e}")

        

        return 0
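
The counting logic in `_estimate_affected_count` can be tried without Apple Mail by separating the filters from the client call. The following is a minimal sketch under stated assumptions: `Msg` is a hypothetical stand-in for `EmailMessage` with only the fields the filters read, and `estimate_matches` is an illustrative helper, not part of the listing above. Unlike the original method, it does not branch on the action type and simply applies the first filter it finds in the parameters.

```python
import re
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Msg:
    """Hypothetical stand-in for EmailMessage (only the fields used below)."""
    subject: str
    body: str = ""
    recipients: List[str] = field(default_factory=list)


def estimate_matches(messages: List[Msg], params: Dict[str, str]) -> int:
    """Count messages matching the first filter present in params."""
    subject = params.get("subject")
    regex_pattern = params.get("regex_pattern")
    receiver = params.get("receiver")

    if subject:
        # Case-insensitive substring match on the subject line
        needle = subject.lower()
        return sum(1 for m in messages if needle in m.subject.lower())

    if regex_pattern:
        # Case-insensitive regex match against body or subject
        pattern = re.compile(regex_pattern, re.IGNORECASE)
        return sum(
            1 for m in messages
            if pattern.search(m.body) or pattern.search(m.subject)
        )

    if receiver:
        # Case-insensitive exact match against any "to" recipient
        target = receiver.lower()
        return sum(
            1 for m in messages
            if target in (r.lower() for r in m.recipients)
        )

    # No recognised filter: report zero, as the original method does
    return 0


inbox = [
    Msg("Team meeting", "Agenda attached", ["team@company.com"]),
    Msg("URGENT: server down", "Please respond", ["ops@company.com"]),
    Msg("Lunch", "This is urgent too", ["Team@Company.com"]),
]

print(estimate_matches(inbox, {"subject": "meeting"}))
print(estimate_matches(inbox, {"regex_pattern": "urgent"}))
print(estimate_matches(inbox, {"receiver": "team@company.com"}))
```

Keeping the filters in a pure function means the confirmation prompt can be tested against a fixed list of messages before it is wired to a real mailbox.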


Main Application Integration


The main application component orchestrates all the system components and provides the primary user interface for the natural language email management system.


import sys

import os

import getpass  # used for the password prompt in _authenticate_user

from typing import Optional

import argparse


class EmailAssistant:

    """

    Main application class that integrates all components of the

    natural language email management system.

    """

    

    def __init__(self, llm_model_path: str, email_client_type: str = "apple_mail"):

        """

        Initialize the email assistant application.

        

        Args:

            llm_model_path: Path to the local LLM model file

            email_client_type: Type of email client to use

        """

        self.llm_model_path = llm_model_path

        self.email_client_type = email_client_type

        

        # Initialize components

        self.nlp_processor = None

        self.email_client = None

        self.security_manager = None

        self.email_manager = None

        

        self._initialize_components()

    

    def _initialize_components(self) -> None:

        """

        Initialize all system components.

        """

        try:

            # Initialize NLP processor

            print("Initializing natural language processor...")

            self.nlp_processor = NaturalLanguageProcessor(self.llm_model_path)

            

            # Initialize email client

            print(f"Initializing {self.email_client_type} client...")

            if self.email_client_type.lower() == "apple_mail":

                self.email_client = AppleMailClient()

            else:

                raise ValueError(f"Unsupported email client type: {self.email_client_type}")

            

            # Initialize security manager

            print("Initializing security manager...")

            self.security_manager = SecurityManager()

            

            # Initialize email manager

            print("Initializing email manager...")

            self.email_manager = SecureEmailManager(self.email_client, self.security_manager)

            

            print("All components initialized successfully.")

            

        except Exception as e:

            print(f"Failed to initialize components: {e}")

            sys.exit(1)

    

    def start(self) -> None:

        """

        Start the email assistant application.

        """

        print("\n" + "="*60)

        print("Natural Language Email Assistant")

        print("="*60)

        print("This assistant allows you to manage your email using natural language commands.")

        print("Type 'help' for available commands or 'quit' to exit.\n")

        

        # Authenticate user

        if not self._authenticate_user():

            print("Authentication failed. Exiting.")

            return

        

        # Initialize email client connection

        if not self.email_manager.initialize():

            print("Failed to connect to email client. Exiting.")

            return

        

        print("Email assistant ready. You can now enter commands.\n")

        

        # Main command loop

        self._command_loop()

    

    def _authenticate_user(self) -> bool:

        """

        Handle user authentication.

        

        Returns:

            True if authentication successful, False otherwise

        """

        print("Authentication required to access email functions.")

        

        max_attempts = 3

        for attempt in range(max_attempts):

            try:

                username = input("Username: ").strip()

                password = getpass.getpass("Password: ")

                

                if self.security_manager.authenticate_user(username, password):

                    print("Authentication successful.")

                    return True

                else:

                    remaining = max_attempts - attempt - 1

                    if remaining > 0:

                        print(f"Authentication failed. {remaining} attempts remaining.")

                    else:

                        print("Authentication failed. Maximum attempts exceeded.")

                        

            except KeyboardInterrupt:

                print("\nAuthentication cancelled.")

                return False

            except Exception as e:

                print(f"Authentication error: {e}")

        

        return False

    

    def _command_loop(self) -> None:

        """

        Main command processing loop.

        """

        while True:

            try:

                # Get user input

                user_input = input("Email Assistant> ").strip()

                

                if not user_input:

                    continue

                

                # Handle special commands

                if user_input.lower() in ['quit', 'exit', 'q']:

                    print("Goodbye!")

                    break

                elif user_input.lower() == 'help':

                    self._show_help()

                    continue

                elif user_input.lower() == 'status':

                    self._show_status()

                    continue

                elif user_input.lower().startswith('accounts'):

                    self._show_accounts()

                    continue

                

                # Process natural language command

                self._process_command(user_input)

                

            except KeyboardInterrupt:

                print("\nUse 'quit' to exit.")

            except Exception as e:

                print(f"Error processing command: {e}")

    

    def _process_command(self, user_input: str) -> None:

        """

        Process a natural language command.

        

        Args:

            user_input: Raw user command string

        """

        try:

            print("Processing command...")

            

            # Parse natural language input

            command = self.nlp_processor.process_command(user_input)

            

            print(f"Interpreted action: {command.action.value}")

            print(f"Confidence: {command.confidence:.2f}")

            

            if command.confidence < 0.5:

                print("Low confidence in command interpretation. Please rephrase your request.")

                return

            

            # Execute command

            result = self.email_manager.execute_command(command)

            

            # Display result

            self._display_result(result)

            

        except Exception as e:

            print(f"Failed to process command: {e}")

    

    def _display_result(self, result: EmailOperationResult) -> None:

        """

        Display the result of an email operation.

        

        Args:

            result: EmailOperationResult to display

        """

        if result.success:

            print(f"✓ {result.message}")

            if result.affected_count > 0:

                print(f"  Affected items: {result.affected_count}")

            

            # Display additional data if available

            if result.data and isinstance(result.data, list):

                if len(result.data) <= 10:  # Show details for small result sets

                    for item in result.data:

                        if isinstance(item, EmailMessage):

                            print(f"  - {item.subject} (from: {item.sender})")

                else:

                    print(f"  ({len(result.data)} items total - use 'list' command for details)")

        else:

            print(f"✗ {result.message}")

    

    def _show_help(self) -> None:

        """

        Display help information.

        """

        help_text = """

Available Commands:

==================


Natural Language Commands:

- "Delete all mails with subject 'meeting' from folder 'Inbox'"

- "Move all mails containing 'urgent' to folder 'Priority'"

- "List all mails in account 'work@company.com' sent by 'John Smith'"

- "Create a new mail account for 'personal@gmail.com'"

- "Move mails with receiver 'team@company.com' to folder 'Team'"

- "Search for emails containing 'project update'"


Special Commands:

- help          Show this help message

- status        Show system status

- accounts      List configured email accounts

- quit/exit/q   Exit the application


Examples:

- "Show me all unread emails"

- "Delete spam emails from last week"

- "Move all newsletters to the Archive folder"

- "Find emails from my manager about the quarterly report"


Tips:

- Be specific about folders, subjects, and senders

- Use quotes around multi-word subjects or names

- The system will ask for confirmation before destructive operations

        """

        print(help_text)

    

    def _show_status(self) -> None:

        """

        Display system status information.

        """

        print("\nSystem Status:")

        print("=" * 30)

        

        # Session status

        if self.security_manager.is_session_valid():

            session = self.security_manager.current_session

            print(f"User: {session.user_id}")

            print(f"Session active since: {session.created_at.strftime('%Y-%m-%d %H:%M:%S')}")

            print(f"Last activity: {session.last_activity.strftime('%Y-%m-%d %H:%M:%S')}")

        else:

            print("No active session")

        

        # Email client status

        print(f"Email client: {self.email_client_type}")

        print(f"Connected: {'Yes' if self.email_manager.connected else 'No'}")

        

        # Model status

        print(f"LLM model: {os.path.basename(self.llm_model_path)}")

        print()

    

    def _show_accounts(self) -> None:

        """

        Display configured email accounts.

        """

        try:

            accounts = self.email_client.get_accounts()

            

            if not accounts:

                print("No email accounts configured.")

                return

            

            print("\nConfigured Email Accounts:")

            print("=" * 40)

            

            for account in accounts:

                print(f"Account: {account.display_name}")

                print(f"  Email: {account.email_address}")

                print(f"  Type: {account.account_type}")

                print(f"  Enabled: {'Yes' if account.is_enabled else 'No'}")

                

                if account.folders:

                    print(f"  Folders: {len(account.folders)}")

                    for folder in account.folders[:5]:  # Show first 5 folders

                        print(f"    - {folder.name} ({folder.message_count} messages)")

                    if len(account.folders) > 5:

                        print(f"    ... and {len(account.folders) - 5} more")

                print()

                

        except Exception as e:

            print(f"Failed to retrieve account information: {e}")


def main():

    """

    Main entry point for the email assistant application.

    """

    parser = argparse.ArgumentParser(

        description="Natural Language Email Assistant",

        formatter_class=argparse.RawDescriptionHelpFormatter,

        epilog="""

Examples:

  python email_assistant.py --model ./models/llama-7b.gguf

  python email_assistant.py --model ./models/llama-7b.gguf --client apple_mail

        """

    )

    

    parser.add_argument(

        '--model', '-m',

        required=True,

        help='Path to the local LLM model file'

    )

    

    parser.add_argument(

        '--client', '-c',

        default='apple_mail',

        choices=['apple_mail'],

        help='Email client type (default: apple_mail)'

    )

    

    parser.add_argument(

        '--debug',

        action='store_true',

        help='Enable debug output'

    )

    

    args = parser.parse_args()

    

    # Validate model file

    if not os.path.isfile(args.model):

        print(f"Error: Model file not found: {args.model}")

        sys.exit(1)

    

    # Set debug mode

    if args.debug:

        import logging

        logging.basicConfig(level=logging.DEBUG)

    

    try:

        # Create and start the email assistant

        assistant = EmailAssistant(

            llm_model_path=args.model,

            email_client_type=args.client

        )

        assistant.start()

        

    except KeyboardInterrupt:

        print("\nApplication interrupted by user.")

    except Exception as e:

        print(f"Application error: {e}")

        if args.debug:

            import traceback

            traceback.print_exc()

        sys.exit(1)


if __name__ == "__main__":

    main()
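
The special-command handling in `_command_loop` can be checked without a model or a running Mail instance if the routing decision is factored into a pure function. The sketch below illustrates that idea; `route_input` and its string labels are hypothetical names introduced here, and the matching rules mirror the loop above, including the prefix match on `accounts`.

```python
from typing import Optional


def route_input(user_input: str) -> Optional[str]:
    """Return a label for the handler that should receive user_input.

    Returns None for empty input, a special-command label for built-in
    commands, and "nlp" for everything that should go to the LLM.
    """
    text = user_input.strip()
    if not text:
        return None

    lowered = text.lower()
    if lowered in ("quit", "exit", "q"):
        return "quit"
    if lowered == "help":
        return "help"
    if lowered == "status":
        return "status"
    if lowered.startswith("accounts"):
        # Prefix match, as in the loop above
        return "accounts"

    # Anything else is treated as a natural language command
    return "nlp"


for sample in ["help", "  QUIT ", "accounts work", "delete spam from last week", ""]:
    print(repr(sample), "->", route_input(sample))
```

One consequence of the prefix match is that a natural language request that happens to begin with the word "accounts" is routed to the account listing rather than to the LLM. An exact match would avoid this if that behaviour is not wanted.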


Complete Running Example


The following listing consolidates the components discussed in this article into a single script that provides a natural language interface for Apple Mail. Running it requires macOS with Apple Mail, the Python packages listed in the docstring, and a local model file passed via --model.



#!/usr/bin/env python3

"""

Natural Language Email Assistant - Complete Implementation

=========================================================


An email management system that uses local LLM processing

to interpret natural language commands and execute email operations

through a vendor-agnostic abstraction layer.


Requirements:

- Python 3.8+

- llama-cpp-python

- PyObjC (for macOS/Apple Mail integration)

- python-dateutil


Installation:

pip install llama-cpp-python PyObjC python-dateutil


Usage:

python email_assistant.py --model path/to/your/model.gguf

"""


import json

import re

import sys

import os

import hashlib

import getpass

import subprocess

import argparse

from typing import Dict, List, Optional, Any, Tuple, Callable

from dataclasses import dataclass

from enum import Enum

from datetime import datetime, timedelta

from abc import ABC, abstractmethod


# Check for required imports

try:

    from llama_cpp import Llama

except ImportError:

    print("Error: llama-cpp-python is required. Install with: pip install llama-cpp-python")

    sys.exit(1)


try:

    import objc

    from Foundation import NSAppleScript, NSString

    from AppKit import NSWorkspace

except ImportError:

    print("Error: PyObjC is required for Apple Mail integration. Install with: pip install PyObjC")

    sys.exit(1)


try:

    import dateutil.parser

except ImportError:

    print("Error: python-dateutil is required. Install with: pip install python-dateutil")

    sys.exit(1)


# Enums and Data Classes

class ActionType(Enum):

    DELETE_MESSAGES = "delete_messages"

    MOVE_MESSAGES = "move_messages"

    LIST_MESSAGES = "list_messages"

    CREATE_ACCOUNT = "create_account"

    SEARCH_MESSAGES = "search_messages"


class SecurityLevel(Enum):

    LOW = "low"

    MEDIUM = "medium"

    HIGH = "high"

    CRITICAL = "critical"


class OperationType(Enum):

    READ = "read"

    MODIFY = "modify"

    DELETE = "delete"

    CREATE = "create"

    MOVE = "move"


@dataclass

class EmailCommand:

    action: ActionType

    parameters: Dict[str, Any]

    confidence: float

    requires_confirmation: bool


@dataclass

class EmailMessage:

    message_id: str

    subject: str

    sender: str

    recipients: List[str]

    cc_recipients: List[str]

    bcc_recipients: List[str]

    body: str

    html_body: Optional[str]

    date_sent: datetime

    date_received: datetime

    folder: str

    account: str

    is_read: bool

    is_flagged: bool

    attachments: List[str]

    headers: Dict[str, str]


@dataclass

class EmailFolder:

    name: str

    path: str

    message_count: int

    unread_count: int

    subfolders: List['EmailFolder']

    account: str


@dataclass

class EmailAccount:

    email_address: str

    display_name: str

    account_type: str

    server_settings: Dict[str, Any]

    is_enabled: bool

    folders: List[EmailFolder]


@dataclass

class SecurityPolicy:

    operation_type: OperationType

    security_level: SecurityLevel

    requires_confirmation: bool

    requires_authentication: bool

    max_batch_size: Optional[int]

    confirmation_message: str


@dataclass

class UserSession:

    user_id: str

    session_token: str

    created_at: datetime

    last_activity: datetime

    permissions: List[str]

    is_authenticated: bool


# Core Classes

class NaturalLanguageProcessor:

    """Natural language processing component for email commands."""

    

    def __init__(self, llm_model_path: str):

        self.llm_model = self._load_local_llm(llm_model_path)

        self.command_patterns = self._initialize_command_patterns()

        

    def _load_local_llm(self, model_path: str):

        try:

            return Llama(

                model_path=model_path,

                n_ctx=2048,

                n_threads=4,

                verbose=False

            )

        except Exception as e:

            # A failed model load is a runtime problem, not a missing import
            raise RuntimeError(f"Failed to load LLM model: {e}") from e

    

    def _initialize_command_patterns(self) -> Dict[str, str]:

        return {

            'delete_subject': r'delete.*mails?.*subject\s+["\']?([^"\']+)["\']?.*folder\s+["\']?([^"\']+)["\']?',

            # Accept "contain", "contains" and "containing" (the help text uses "containing")
            'move_regex': r'move.*mails?.*contain(?:s|ing)?\s+["\']?([^"\']+)["\']?.*folder\s+["\']?([^"\']+)["\']?',

            'list_sender': r'list.*mails?.*account\s+([^\s]+).*sent by\s+["\']?([^"\']+)["\']?',

            'create_account': r'create.*mail account.*for\s+([^\s]+)',

            'move_receiver': r'move.*mails?.*receiver\s+([^\s]+).*folder\s+["\']?([^"\']+)["\']?'

        }

    

    def process_command(self, user_input: str) -> EmailCommand:

        normalized_input = self._preprocess_text(user_input)

        prompt = self._create_interpretation_prompt(normalized_input)

        llm_response = self._query_llm(prompt)

        

        try:

            command = self._parse_llm_response(llm_response)

            if command.confidence > 0.7:

                return command

        except Exception as e:

            print(f"LLM parsing failed: {e}")

        

        return self._fallback_pattern_matching(normalized_input)

    

    def _preprocess_text(self, text: str) -> str:

        text = text.lower().strip()

        text = re.sub(r'\s+', ' ', text)

        text = text.replace("e-mail", "email")  # also turns "e-mails" into "emails"

        return text

    

    def _create_interpretation_prompt(self, user_input: str) -> str:

        return f"""

You are an email command interpreter. Parse the following user command and return a JSON response.


Supported actions:

- delete_messages: Delete emails based on criteria

- move_messages: Move emails to a different folder

- list_messages: List emails matching criteria

- create_account: Create a new email account

- search_messages: Search for emails


User command: "{user_input}"


Return JSON format:

{{

    "action": "action_type",

    "parameters": {{

        "subject": "email subject (if applicable)",

        "folder": "folder name (if applicable)",

        "sender": "sender email/name (if applicable)",

        "receiver": "receiver email (if applicable)",

        "account": "email account (if applicable)",

        "regex_pattern": "regex pattern (if applicable)",

        "destination_folder": "target folder for moves (if applicable)"

    }},

    "confidence": 0.95,

    "requires_confirmation": true

}}


JSON Response:

"""

    

    def _query_llm(self, prompt: str) -> str:

        response = self.llm_model(

            prompt,

            max_tokens=512,

            temperature=0.1,

            stop=["User command:", "\n\n"]

        )

        return response['choices'][0]['text'].strip()

    

    def _parse_llm_response(self, response: str) -> EmailCommand:

        try:

            json_start = response.find('{')

            json_end = response.rfind('}') + 1

            json_str = response[json_start:json_end]

            

            parsed = json.loads(json_str)

            

            action = ActionType(parsed['action'])

            parameters = {k: v for k, v in parsed['parameters'].items() if v}

            confidence = float(parsed.get('confidence', 0.5))

            requires_confirmation = bool(parsed.get('requires_confirmation', True))

            

            return EmailCommand(

                action=action,

                parameters=parameters,

                confidence=confidence,

                requires_confirmation=requires_confirmation

            )

        except (json.JSONDecodeError, KeyError, ValueError) as e:

            raise ValueError(f"Failed to parse LLM response: {e}")

    

    def _fallback_pattern_matching(self, text: str) -> EmailCommand:

        for pattern_name, pattern in self.command_patterns.items():

            match = re.search(pattern, text, re.IGNORECASE)

            if match:

                return self._create_command_from_pattern(pattern_name, match.groups())

        

        return EmailCommand(

            action=ActionType.SEARCH_MESSAGES,

            parameters={"query": text},

            confidence=0.3,

            requires_confirmation=True

        )

    

    def _create_command_from_pattern(self, pattern_name: str, groups: Tuple[str, ...]) -> EmailCommand:

        if pattern_name == 'delete_subject':

            return EmailCommand(

                action=ActionType.DELETE_MESSAGES,

                parameters={"subject": groups[0], "folder": groups[1]},

                confidence=0.8,

                requires_confirmation=True

            )

        elif pattern_name == 'move_regex':

            return EmailCommand(

                action=ActionType.MOVE_MESSAGES,

                parameters={"regex_pattern": groups[0], "destination_folder": groups[1]},

                confidence=0.8,

                requires_confirmation=True

            )

        elif pattern_name == 'list_sender':

            return EmailCommand(

                action=ActionType.LIST_MESSAGES,

                parameters={"account": groups[0], "sender": groups[1]},

                confidence=0.8,

                requires_confirmation=False

            )

        elif pattern_name == 'create_account':

            return EmailCommand(

                action=ActionType.CREATE_ACCOUNT,

                parameters={"email_address": groups[0]},

                confidence=0.8,

                requires_confirmation=True

            )

        elif pattern_name == 'move_receiver':

            return EmailCommand(

                action=ActionType.MOVE_MESSAGES,

                parameters={"receiver": groups[0], "destination_folder": groups[1]},

                confidence=0.8,

                requires_confirmation=True

            )

        

        return EmailCommand(

            action=ActionType.SEARCH_MESSAGES,

            parameters={"query": " ".join(groups)},

            confidence=0.5,

            requires_confirmation=True

        )


class EmailClientInterface(ABC):

    """Abstract interface for email client implementations."""

    

    @abstractmethod

    def connect(self) -> bool:

        pass

    

    @abstractmethod

    def disconnect(self) -> None:

        pass

    

    @abstractmethod

    def get_accounts(self) -> List[EmailAccount]:

        pass

    

    @abstractmethod

    def get_folders(self, account: str) -> List[EmailFolder]:

        pass

    

    @abstractmethod

    def get_messages(self, account: str, folder: str, 

                    limit: Optional[int] = None,

                    filters: Optional[Dict[str, Any]] = None) -> List[EmailMessage]:

        pass

    

    @abstractmethod

    def search_messages(self, account: str, query: str,

                       folder: Optional[str] = None) -> List[EmailMessage]:

        pass

    

    @abstractmethod

    def delete_messages(self, message_ids: List[str], account: str) -> bool:

        pass

    

    @abstractmethod

    def move_messages(self, message_ids: List[str], 

                     source_folder: str, destination_folder: str,

                     account: str) -> bool:

        pass

    

    @abstractmethod

    def create_folder(self, folder_name: str, parent_folder: str,

                     account: str) -> bool:

        pass

    

    @abstractmethod

    def mark_as_read(self, message_ids: List[str], account: str) -> bool:

        pass

    

    @abstractmethod

    def mark_as_unread(self, message_ids: List[str], account: str) -> bool:

        pass

    

    @abstractmethod

    def create_account(self, account_config: EmailAccount) -> bool:

        pass


class AppleScriptResult:

    def __init__(self, success: bool, output: str, error: str):

        self.success = success

        self.output = output

        self.error = error


class AppleScriptRunner:

    def __init__(self):

        self.timeout = 30

    

    def run_script(self, script: str) -> AppleScriptResult:

        try:

            applescript = NSAppleScript.alloc().initWithSource_(script)

            result, error = applescript.executeAndReturnError_(None)

            

            if error:

                return AppleScriptResult(

                    success=False,

                    output="",

                    error=str(error)

                )

            

            output = str(result.stringValue()) if result else ""

            return AppleScriptResult(

                success=True,

                output=output,

                error=""

            )

            

        except Exception as e:

            return AppleScriptResult(

                success=False,

                output="",

                error=str(e)

            )


class AppleMailClient(EmailClientInterface):

    """Apple Mail implementation of EmailClientInterface."""

    

    def __init__(self):

        self.app_name = "Mail"

        self.is_connected = False

        self.applescript_runner = AppleScriptRunner()

    

    def connect(self) -> bool:

        try:

            workspace = NSWorkspace.sharedWorkspace()

            running_apps = workspace.runningApplications()

            

            mail_running = any(

                app.bundleIdentifier() == "com.apple.mail" 

                for app in running_apps

            )

            

            if not mail_running:

                workspace.launchApplication_(self.app_name)

                import time

                time.sleep(3)

            

            test_script = 'tell application "Mail" to get name of every account'

            result = self.applescript_runner.run_script(test_script)

            

            self.is_connected = result.success

            return self.is_connected

            

        except Exception as e:

            print(f"Failed to connect to Apple Mail: {e}")

            return False

    

    def disconnect(self) -> None:

        self.is_connected = False

    

    def get_accounts(self) -> List[EmailAccount]:

        script = '''

        tell application "Mail"

            set accountList to {}

            repeat with acc in every account

                set accountInfo to {name of acc, user name of acc, email addresses of acc}

                set end of accountList to accountInfo

            end repeat

            return accountList

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return []

        

        accounts = []

        try:

            account_data = self._parse_applescript_list(result.output)

            

            for acc_info in account_data:

                if len(acc_info) >= 3:

                    account = EmailAccount(

                        email_address=acc_info[2][0] if acc_info[2] else acc_info[1],

                        display_name=acc_info[0],

                        account_type="Apple Mail",

                        server_settings={},

                        is_enabled=True,

                        folders=self._get_account_folders(acc_info[0])

                    )

                    accounts.append(account)

        except Exception as e:

            print(f"Error parsing account data: {e}")

        

        return accounts

    

    def get_folders(self, account: str) -> List[EmailFolder]:

        return self._get_account_folders(account)

    

    def _get_account_folders(self, account_name: str) -> List[EmailFolder]:

        script = f'''

        tell application "Mail"

            set folderList to {{}}

            try

                set acc to account "{account_name}"

                repeat with mbox in every mailbox of acc

                    set folderInfo to {{name of mbox, (count of messages of mbox), (count of (messages of mbox whose read status is false))}}

                    set end of folderList to folderInfo

                end repeat

            end try

            return folderList

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return []

        

        folders = []

        try:

            folder_data = self._parse_applescript_list(result.output)

            

            for folder_info in folder_data:

                if len(folder_info) >= 3:

                    folder = EmailFolder(

                        name=folder_info[0],

                        path=folder_info[0],

                        message_count=int(folder_info[1]),

                        unread_count=int(folder_info[2]),

                        subfolders=[],

                        account=account_name

                    )

                    folders.append(folder)

        except Exception as e:

            print(f"Error parsing folder data: {e}")

        

        return folders

    

    def get_messages(self, account: str, folder: str, 

                    limit: Optional[int] = None,

                    filters: Optional[Dict[str, Any]] = None) -> List[EmailMessage]:

        limit_clause = f"items 1 thru {limit} of " if limit else ""

        

        script = f'''

        tell application "Mail"

            set messageList to {{}}

            try

                set acc to account "{account}"

                set mbox to mailbox "{folder}" of acc

                set msgs to {limit_clause}(every message of mbox)

                

                repeat with msg in msgs

                    set msgInfo to {{id of msg, subject of msg, sender of msg, ¬

                                   content of msg, date received of msg, ¬

                                   read status of msg, flagged status of msg}}

                    set end of messageList to msgInfo

                end repeat

            end try

            return messageList

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return []

        

        messages = []

        try:

            message_data = self._parse_applescript_list(result.output)

            

            for msg_info in message_data:

                if len(msg_info) >= 7:

                    recipients = self._get_message_recipients(msg_info[0])

                    

                    message = EmailMessage(

                        message_id=str(msg_info[0]),

                        subject=msg_info[1] or "",

                        sender=msg_info[2] or "",

                        recipients=recipients.get('to', []),

                        cc_recipients=recipients.get('cc', []),

                        bcc_recipients=recipients.get('bcc', []),

                        body=msg_info[3] or "",

                        html_body=None,

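                        # Only the received date is fetched, so it also stands in for the sent date
                        date_sent=self._parse_applescript_date(msg_info[4]),
                        date_received=self._parse_applescript_date(msg_info[4]),
                        folder=folder,
                        account=account,
                        # Parsed values are strings, and bool("false") is True, so compare the text instead
                        is_read=str(msg_info[5]).strip().lower() == "true",
                        is_flagged=str(msg_info[6]).strip().lower() == "true",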

                        attachments=[],

                        headers={}

                    )

                    messages.append(message)

        except Exception as e:

            print(f"Error parsing message data: {e}")

        

        return messages

    

    def _get_message_recipients(self, message_id: str) -> Dict[str, List[str]]:

        script = f'''

        tell application "Mail"

            try

                set msg to message id {message_id}

                set toRecipients to {{}}

                set ccRecipients to {{}}

                

                repeat with recip in to recipients of msg

                    set end of toRecipients to address of recip

                end repeat

                

                repeat with recip in cc recipients of msg

                    set end of ccRecipients to address of recip

                end repeat

                

                return {{toRecipients, ccRecipients}}

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return {'to': [], 'cc': [], 'bcc': []}

        

        try:

            recipient_data = self._parse_applescript_list(result.output)

            return {

                'to': recipient_data[0] if len(recipient_data) > 0 else [],

                'cc': recipient_data[1] if len(recipient_data) > 1 else [],

                'bcc': []

            }

        except Exception:

            return {'to': [], 'cc': [], 'bcc': []}

    

    def search_messages(self, account: str, query: str,

                       folder: Optional[str] = None) -> List[EmailMessage]:

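        # Escape backslashes and quotes so user-supplied text cannot break out of the AppleScript string literals
        def esc(value: str) -> str:
            return value.replace('\\', '\\\\').replace('"', '\\"')

        safe_query = esc(query)
        safe_account = esc(account)
        folder_clause = (
            f'in mailbox "{esc(folder)}" of account "{safe_account}"'
            if folder else f'in account "{safe_account}"'
        )

        script = f'''
        tell application "Mail"
            set searchResults to {{}}
            try
                set msgs to (every message {folder_clause} whose subject contains "{safe_query}" or content contains "{safe_query}")

                repeat with msg in msgs
                    set msgInfo to {{id of msg, subject of msg, sender of msg, content of msg, date received of msg, read status of msg, flagged status of msg, name of mailbox of msg}}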

                    set end of searchResults to msgInfo

                end repeat

            end try

            return searchResults

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return []

        

        messages = []

        try:

            message_data = self._parse_applescript_list(result.output)

            

            for msg_info in message_data:

                if len(msg_info) >= 8:

                    recipients = self._get_message_recipients(msg_info[0])

                    

                    message = EmailMessage(

                        message_id=str(msg_info[0]),

                        subject=msg_info[1] or "",

                        sender=msg_info[2] or "",

                        recipients=recipients.get('to', []),

                        cc_recipients=recipients.get('cc', []),

                        bcc_recipients=recipients.get('bcc', []),

                        body=msg_info[3] or "",

                        html_body=None,

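                        # Only the received date is fetched, so it also stands in for the sent date
                        date_sent=self._parse_applescript_date(msg_info[4]),
                        date_received=self._parse_applescript_date(msg_info[4]),
                        folder=msg_info[7] if len(msg_info) > 7 else folder or "Unknown",
                        account=account,
                        # Parsed values are strings, and bool("false") is True, so compare the text instead
                        is_read=str(msg_info[5]).strip().lower() == "true",
                        is_flagged=str(msg_info[6]).strip().lower() == "true",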

                        attachments=[],

                        headers={}

                    )

                    messages.append(message)

        except Exception as e:

            print(f"Error parsing search results: {e}")

        

        return messages

    

    def delete_messages(self, message_ids: List[str], account: str) -> bool:

        if not message_ids:

            return True

        

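        # Message ids are integers in Mail; coercing them rejects anything that is not a number
        id_list = ", ".join(str(int(message_id)) for message_id in message_ids)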

        

        script = f'''

        tell application "Mail"

            try

                set msgIds to {{{id_list}}}

                repeat with msgId in msgIds

                    set msg to message id msgId

                    delete msg

                end repeat

                return true

            on error

                return false

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        return result.success and "true" in result.output.lower()

    

    def move_messages(self, message_ids: List[str], 

                     source_folder: str, destination_folder: str,

                     account: str) -> bool:

        if not message_ids:

            return True

        

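        # Message ids are integers in Mail; coercing them rejects anything that is not a number
        id_list = ", ".join(str(int(message_id)) for message_id in message_ids)

        # Escape backslashes and quotes so names cannot break out of the AppleScript string literals
        def esc(value: str) -> str:
            return value.replace('\\', '\\\\').replace('"', '\\"')

        script = f'''
        tell application "Mail"
            try
                set acc to account "{esc(account)}"
                set destBox to mailbox "{esc(destination_folder)}" of acc
                set msgIds to {{{id_list}}}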

                

                repeat with msgId in msgIds

                    set msg to message id msgId

                    set mailbox of msg to destBox

                end repeat

                return true

            on error errMsg

                return false

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        return result.success and "true" in result.output.lower()

    

    def create_folder(self, folder_name: str, parent_folder: str,

                     account: str) -> bool:

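        # Escape backslashes and quotes so names cannot break out of the AppleScript string literals;
        # a missing parent folder is treated as an empty string
        def esc(value: str) -> str:
            return (value or "").replace('\\', '\\\\').replace('"', '\\"')

        script = f'''
        tell application "Mail"
            try
                set acc to account "{esc(account)}"
                if "{esc(parent_folder)}" is not "" then
                    set parentBox to mailbox "{esc(parent_folder)}" of acc
                    make new mailbox with properties {{name:"{esc(folder_name)}"}} at parentBox
                else
                    make new mailbox with properties {{name:"{esc(folder_name)}"}} at acc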

                end if

                return true

            on error

                return false

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        return result.success and "true" in result.output.lower()

    

    def mark_as_read(self, message_ids: List[str], account: str) -> bool:

        return self._set_read_status(message_ids, True)

    

    def mark_as_unread(self, message_ids: List[str], account: str) -> bool:

        return self._set_read_status(message_ids, False)

    

    def _set_read_status(self, message_ids: List[str], read_status: bool) -> bool:

        if not message_ids:

            return True

        

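        # Message ids are integers in Mail; coercing them rejects anything that is not a number
        id_list = ", ".join(str(int(message_id)) for message_id in message_ids)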

        status_value = "true" if read_status else "false"

        

        script = f'''

        tell application "Mail"

            try

                set msgIds to {{{id_list}}}

                repeat with msgId in msgIds

                    set msg to message id msgId

                    set read status of msg to {status_value}

                end repeat

                return true

            on error

                return false

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        return result.success and "true" in result.output.lower()

    

    def create_account(self, account_config: EmailAccount) -> bool:

        print(f"Account creation for {account_config.email_address} requires manual setup in Apple Mail")

        return False

    

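    def _parse_applescript_list(self, output: str) -> List[Any]:
        """Parse AppleScript list text such as '{{1, "a"}, {2, "b"}}' into nested Python lists.

        Leaf values are returned as strings with surrounding quotes removed.
        This assumes the runner returns source-form output (for example via
        `osascript -s s`), which keeps the braces and quotes.
        """
        try:
            cleaned = output.strip()
            if cleaned.startswith('{') and cleaned.endswith('}'):
                cleaned = cleaned[1:-1]

            def finish(item: str) -> Any:
                item = item.strip()
                # Parse nested lists recursively so callers can index into each record
                if item.startswith('{') and item.endswith('}'):
                    return self._parse_applescript_list(item)
                return item.strip('"')

            items: List[Any] = []
            current_item = ""
            brace_count = 0
            in_quotes = False

            for char in cleaned:
                # A quote toggles string mode unless it is escaped with a backslash
                if char == '"' and not current_item.endswith('\\'):
                    in_quotes = not in_quotes
                elif char == '{' and not in_quotes:
                    brace_count += 1
                elif char == '}' and not in_quotes:
                    brace_count -= 1
                elif char == ',' and brace_count == 0 and not in_quotes:
                    # A top-level comma ends the current item
                    items.append(finish(current_item))
                    current_item = ""
                    continue

                current_item += char

            if current_item.strip():
                items.append(finish(current_item))

            return items
        except Exception as e:
            print(f"Error parsing AppleScript output: {e}")
            return []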

    

    def _parse_applescript_date(self, date_str: str) -> datetime:

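        try:
            # Source-form output wraps the value as: date "..."; strip that wrapper before parsing
            cleaned = str(date_str).strip()
            if cleaned.startswith('date '):
                cleaned = cleaned[len('date '):]
            return dateutil.parser.parse(cleaned.strip().strip('"'))
        except (ValueError, OverflowError):
            # Fall back to the current time when the date cannot be parsed
            return datetime.now()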


class EmailOperationResult:

    def __init__(self, success: bool, message: str, 

                 affected_count: int = 0, data: Any = None):

        self.success = success

        self.message = message

        self.affected_count = affected_count

        self.data = data

        self.timestamp = datetime.now()


class SecurityManager:

    def __init__(self):

        self.policies = self._initialize_security_policies()

        self.current_session: Optional[UserSession] = None

        self.session_timeout = timedelta(hours=2)

    

    def _initialize_security_policies(self) -> Dict[ActionType, SecurityPolicy]:

        return {

            ActionType.LIST_MESSAGES: SecurityPolicy(

                operation_type=OperationType.READ,

                security_level=SecurityLevel.LOW,

                requires_confirmation=False,

                requires_authentication=True,

                max_batch_size=None,

                confirmation_message=""

            ),

            ActionType.SEARCH_MESSAGES: SecurityPolicy(

                operation_type=OperationType.READ,

                security_level=SecurityLevel.LOW,

                requires_confirmation=False,

                requires_authentication=True,

                max_batch_size=None,

                confirmation_message=""

            ),

            ActionType.MOVE_MESSAGES: SecurityPolicy(

                operation_type=OperationType.MODIFY,

                security_level=SecurityLevel.MEDIUM,

                requires_confirmation=True,

                requires_authentication=True,

                max_batch_size=100,

                confirmation_message="This will move {count} messages to {destination}. Continue?"

            ),

            ActionType.DELETE_MESSAGES: SecurityPolicy(

                operation_type=OperationType.DELETE,

                security_level=SecurityLevel.HIGH,

                requires_confirmation=True,

                requires_authentication=True,

                max_batch_size=50,

                confirmation_message="This will permanently delete {count} messages. This action cannot be undone. Continue?"

            ),

            ActionType.CREATE_ACCOUNT: SecurityPolicy(

                operation_type=OperationType.CREATE,

                security_level=SecurityLevel.CRITICAL,

                requires_confirmation=True,

                requires_authentication=True,

                max_batch_size=1,

                confirmation_message="This will create a new email account for {email}. Continue?"

            )

        }

    

    def authenticate_user(self, username: str, password: str) -> bool:
        # NOTE: this placeholder does not verify the password against any credential
        # store; it only rejects empty input. Add real verification before relying on it.
        if not username or not password:
            return False

        try:
            import secrets

            # Use a random token; a hash of the username and timestamp would be predictable
            session_token = secrets.token_hex(32)
            now = datetime.now()

            self.current_session = UserSession(
                user_id=username,
                session_token=session_token,
                created_at=now,
                last_activity=now,
                permissions=["email:read", "email:write", "email:delete"],
                is_authenticated=True
            )

            return True

        except Exception as e:
            print(f"Authentication failed: {e}")
            return False

    

    def is_session_valid(self) -> bool:

        if not self.current_session or not self.current_session.is_authenticated:

            return False

        

        time_since_activity = datetime.now() - self.current_session.last_activity

        if time_since_activity > self.session_timeout:

            self.current_session = None

            return False

        

        self.current_session.last_activity = datetime.now()

        return True

    

    def check_operation_permission(self, command: EmailCommand) -> bool:

        if not self.is_session_valid():

            return False

        

        policy = self.policies.get(command.action)

        if not policy:

            return False

        

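        # Sessions are granted "email:write" rather than separate modify/create permissions,
        # so both operation types are checked against it (assumed to be the intended mapping)
        if policy.operation_type in (OperationType.MODIFY, OperationType.CREATE):
            required_permission = "email:write"
        else:
            required_permission = f"email:{policy.operation_type.value}"

        if required_permission not in self.current_session.permissions:
            return False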

        

        return True

    

    def require_confirmation(self, command: EmailCommand,
                           affected_count: int = 0,
                           additional_context: Optional[Dict[str, str]] = None) -> bool:
        policy = self.policies.get(command.action)
        if not policy or not policy.requires_confirmation:
            return True

        if policy.max_batch_size and affected_count > policy.max_batch_size:
            print(f"Operation exceeds maximum batch size of {policy.max_batch_size}")
            return False

        # Copy the caller's dictionary so it is not modified as a side effect
        context = dict(additional_context or {})
        context.update({
            'count': str(affected_count),
            'action': command.action.value
        })

        

        if command.action == ActionType.MOVE_MESSAGES:

            context['destination'] = command.parameters.get('destination_folder', 'Unknown')

        elif command.action == ActionType.CREATE_ACCOUNT:

            context['email'] = command.parameters.get('email_address', 'Unknown')

        

        confirmation_message = policy.confirmation_message.format(**context)

        

        return self._get_user_confirmation(confirmation_message, policy.security_level)

    

    def _get_user_confirmation(self, message: str, security_level: SecurityLevel) -> bool:

        print(f"\n[{security_level.value.upper()}] CONFIRMATION REQUIRED")

        print(f"{message}")

        

        if security_level == SecurityLevel.CRITICAL:

            print("This is a critical operation. Please type 'CONFIRM' to proceed:")

            response = input("> ").strip()

            return response == "CONFIRM"

        else:

            print("Type 'y' or 'yes' to confirm, any other input to cancel:")

            response = input("> ").strip().lower()

            return response in ['y', 'yes']

    

    def log_operation(self, command: EmailCommand, result: EmailOperationResult) -> None:

        log_entry = {

            'timestamp': datetime.now().isoformat(),

            'user_id': self.current_session.user_id if self.current_session else 'unknown',

            'action': command.action.value,

            'parameters': command.parameters,

            'success': result.success,

            'affected_count': result.affected_count,

            'message': result.message

        }

        

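        # default=str keeps logging from failing on values JSON cannot serialize (e.g. datetime)
        print(f"AUDIT LOG: {json.dumps(log_entry, indent=2, default=str)}")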

    

    def validate_command_parameters(self, command: EmailCommand) -> bool:

        dangerous_patterns = [

            r'\.\./',

            r'<script',

            r'javascript:',

            r'file://',

        ]

        

        for key, value in command.parameters.items():

            if isinstance(value, str):

                for pattern in dangerous_patterns:

                    if re.search(pattern, value, re.IGNORECASE):

                        print(f"Security violation: Dangerous pattern detected in parameter '{key}'")

                        return False

        

        email_fields = ['email_address', 'sender', 'receiver']

        for field in email_fields:

            if field in command.parameters:

                email = command.parameters[field]

                if isinstance(email, str) and not self._is_valid_email(email):

                    print(f"Invalid email address in parameter '{field}': {email}")

                    return False

        

        return True

    

    def _is_valid_email(self, email: str) -> bool:

        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

        return re.match(email_pattern, email) is not None


class EmailManager:

    def __init__(self, client: EmailClientInterface):

        self.client = client

        self.connected = False

    

    def initialize(self) -> bool:

        try:

            self.connected = self.client.connect()

            return self.connected

        except Exception as e:

            print(f"Failed to initialize email client: {e}")

            return False

    

    def execute_command(self, command: EmailCommand) -> EmailOperationResult:

        if not self.connected:

            return EmailOperationResult(

                success=False,

                message="Email client not connected"

            )

        

        try:

            if command.action == ActionType.DELETE_MESSAGES:

                return self._execute_delete_command(command.parameters)

            elif command.action == ActionType.MOVE_MESSAGES:

                return self._execute_move_command(command.parameters)

            elif command.action == ActionType.LIST_MESSAGES:

                return self._execute_list_command(command.parameters)

            elif command.action == ActionType.CREATE_ACCOUNT:

                return self._execute_create_account_command(command.parameters)

            elif command.action == ActionType.SEARCH_MESSAGES:

                return self._execute_search_command(command.parameters)

            else:

                return EmailOperationResult(

                    success=False,

                    message=f"Unsupported action: {command.action}"

                )

        except Exception as e:

            return EmailOperationResult(

                success=False,

                message=f"Command execution failed: {str(e)}"

            )

    

    def _execute_delete_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        account = parameters.get('account', 'default')

        folder = parameters.get('folder', 'Inbox')

        subject = parameters.get('subject')

        

        if not subject:

            return EmailOperationResult(

                success=False,

                message="Subject parameter required for delete operation"

            )

        

        messages = self.client.get_messages(account, folder)

        matching_messages = [

            msg for msg in messages 

            if subject.lower() in msg.subject.lower()

        ]

        

        if not matching_messages:

            return EmailOperationResult(

                success=True,

                message="No messages found matching criteria",

                affected_count=0

            )

        

        message_ids = [msg.message_id for msg in matching_messages]

        success = self.client.delete_messages(message_ids, account)

        

        return EmailOperationResult(

            success=success,

            message=f"Deleted {len(matching_messages)} messages" if success 

                   else "Failed to delete messages",

            affected_count=len(matching_messages) if success else 0

        )

    

    def _execute_move_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        account = parameters.get('account', 'default')

        source_folder = parameters.get('folder', 'Inbox')

        destination_folder = parameters.get('destination_folder')

        regex_pattern = parameters.get('regex_pattern')

        receiver = parameters.get('receiver')

        

        if not destination_folder:

            return EmailOperationResult(

                success=False,

                message="Destination folder required for move operation"

            )

        

        messages = self.client.get_messages(account, source_folder)

        matching_messages = []

        

        if regex_pattern:

            import re

            pattern = re.compile(regex_pattern, re.IGNORECASE)

            matching_messages = [

                msg for msg in messages

                if pattern.search(msg.body) or pattern.search(msg.subject)

            ]

        elif receiver:

            matching_messages = [

                msg for msg in messages

                if receiver.lower() in [r.lower() for r in msg.recipients]

            ]

        

        if not matching_messages:

            return EmailOperationResult(

                success=True,

                message="No messages found matching criteria",

                affected_count=0

            )

        

        message_ids = [msg.message_id for msg in matching_messages]

        success = self.client.move_messages(

            message_ids, source_folder, destination_folder, account

        )

        

        return EmailOperationResult(

            success=success,

            message=f"Moved {len(matching_messages)} messages" if success 

                   else "Failed to move messages",

            affected_count=len(matching_messages) if success else 0

        )

    

    def _execute_list_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        account = parameters.get('account', 'default')

        folder = parameters.get('folder', 'Inbox')

        sender = parameters.get('sender')

        

        messages = self.client.get_messages(account, folder)

        

        if sender:

            matching_messages = [

                msg for msg in messages

                if sender.lower() in msg.sender.lower()

            ]

        else:

            matching_messages = messages

        

        return EmailOperationResult(

            success=True,

            message=f"Found {len(matching_messages)} messages",

            affected_count=len(matching_messages),

            data=matching_messages

        )

    

    def _execute_search_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        account = parameters.get('account', 'default')

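        query = parameters.get('query', '')
        folder = parameters.get('folder')

        # An empty query would match every message, so reject it explicitly
        if not query or not query.strip():
            return EmailOperationResult(
                success=False,
                message="Search query required for search operation"
            )

        matching_messages = self.client.search_messages(account, query, folder)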

        

        return EmailOperationResult(

            success=True,

            message=f"Search found {len(matching_messages)} messages",

            affected_count=len(matching_messages),

            data=matching_messages

        )

    

    def _execute_create_account_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        email_address = parameters.get('email_address')

        

        if not email_address:

            return EmailOperationResult(

                success=False,

                message="Email address required for account creation"

            )

        

        account_config = EmailAccount(

            email_address=email_address,

            display_name=email_address.split('@')[0],

            account_type='IMAP',

            server_settings={},

            is_enabled=True,

            folders=[]

        )

        

        success = self.client.create_account(account_config)

        

        return EmailOperationResult(

            success=success,

            message=f"Created account for {email_address}" if success 

                   else "Failed to create account",

            affected_count=1 if success else 0

        )


class SecureEmailManager(EmailManager):

    def __init__(self, client: EmailClientInterface, security_manager: SecurityManager):

        super().__init__(client)

        self.security_manager = security_manager

    

    def execute_command(self, command: EmailCommand) -> EmailOperationResult:

        if not self.security_manager.is_session_valid():

            return EmailOperationResult(

                success=False,

                message="Authentication required"

            )

        

        if not self.security_manager.check_operation_permission(command):

            return EmailOperationResult(

                success=False,

                message="Insufficient permissions for this operation"

            )

        

        if not self.security_manager.validate_command_parameters(command):

            return EmailOperationResult(

                success=False,

                message="Invalid or potentially dangerous parameters"

            )

        

        affected_count = 0

        if command.action in [ActionType.DELETE_MESSAGES, ActionType.MOVE_MESSAGES]:

            affected_count = self._estimate_affected_count(command)

        

        if not self.security_manager.require_confirmation(command, affected_count):

            return EmailOperationResult(

                success=False,

                message="Operation cancelled by user"

            )

        

        result = super().execute_command(command)

        

        self.security_manager.log_operation(command, result)

        

        return result

    

    def _estimate_affected_count(self, command: EmailCommand) -> int:

        try:

            if command.action == ActionType.DELETE_MESSAGES:

                account = command.parameters.get('account', 'default')

                folder = command.parameters.get('folder', 'Inbox')

                subject = command.parameters.get('subject')

                

                if subject:

                    messages = self.client.get_messages(account, folder)

                    return len([

                        msg for msg in messages 

                        if subject.lower() in msg.subject.lower()

                    ])

            

            elif command.action == ActionType.MOVE_MESSAGES:

                account = command.parameters.get('account', 'default')

                folder = command.parameters.get('folder', 'Inbox')

                regex_pattern = command.parameters.get('regex_pattern')

                receiver = command.parameters.get('receiver')

                

                messages = self.client.get_messages(account, folder)

                

                if regex_pattern:

                    import re

                    pattern = re.compile(regex_pattern, re.IGNORECASE)

                    return len([

                        msg for msg in messages

                        if pattern.search(msg.body) or pattern.search(msg.subject)

                    ])

                elif receiver:

                    return len([

                        msg for msg in messages

                        if receiver.lower() in [r.lower() for r in msg.recipients]

                    ])

        

        except Exception as e:

            print(f"Error estimating affected count: {e}")

        

        return 0


class EmailAssistant:

    def __init__(self, llm_model_path: str, email_client_type: str = "apple_mail"):

        self.llm_model_path = llm_model_path

        self.email_client_type = email_client_type

        

        self.nlp_processor = None

        self.email_client = None

        self.security_manager = None

        self.email_manager = None

        

        self._initialize_components()

    

    def _initialize_components(self) -> None:

        try:

            print("Initializing natural language processor...")

            self.nlp_processor = NaturalLanguageProcessor(self.llm_model_path)

            

            print(f"Initializing {self.email_client_type} client...")

            if self.email_client_type.lower() == "apple_mail":

                self.email_client = AppleMailClient()

            else:

                raise ValueError(f"Unsupported email client type: {self.email_client_type}")

            

            print("Initializing security manager...")

            self.security_manager = SecurityManager()

            

            print("Initializing email manager...")

            self.email_manager = SecureEmailManager(self.email_client, self.security_manager)

            

            print("All components initialized successfully.")

            

        except Exception as e:

            print(f"Failed to initialize components: {e}")

            sys.exit(1)

    

    def start(self) -> None:

        print("\n" + "="*60)

        print("Natural Language Email Assistant")

        print("="*60)

        print("This assistant allows you to manage your email using natural language commands.")

        print("Type 'help' for available commands or 'quit' to exit.\n")

        

        if not self._authenticate_user():

            print("Authentication failed. Exiting.")

            return

        

        if not self.email_manager.initialize():

            print("Failed to connect to email client. Exiting.")

            return

        

        print("Email assistant ready. You can now enter commands.\n")

        

        self._command_loop()

    

    def _authenticate_user(self) -> bool:

        print("Authentication required to access email functions.")

        

        max_attempts = 3

        for attempt in range(max_attempts):

            try:

                username = input("Username: ").strip()

                password = getpass.getpass("Password: ")

                

                if self.security_manager.authenticate_user(username, password):

                    print("Authentication successful.")

                    return True

                else:

                    remaining = max_attempts - attempt - 1

                    if remaining > 0:

                        print(f"Authentication failed. {remaining} attempts remaining.")

                    else:

                        print("Authentication failed. Maximum attempts exceeded.")

                        

            except (KeyboardInterrupt, EOFError):

                print("\nAuthentication cancelled.")

                return False

            except Exception as e:

                print(f"Authentication error: {e}")

        

        return False

    

    def _command_loop(self) -> None:

        while True:

            try:

                user_input = input("Email Assistant> ").strip()

                

                if not user_input:

                    continue

                

                if user_input.lower() in ['quit', 'exit', 'q']:

                    print("Goodbye!")

                    break

                elif user_input.lower() == 'help':

                    self._show_help()

                    continue

                elif user_input.lower() == 'status':

                    self._show_status()

                    continue

                elif user_input.lower() == 'accounts':

                    self._show_accounts()

                    continue

                

                self._process_command(user_input)

                

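            except KeyboardInterrupt:

                print("\nUse 'quit' to exit.")

            except EOFError:

                # input() raises EOFError once stdin is closed (e.g. Ctrl-D).

                # Without this branch the generic handler below would catch it

                # on every iteration and the loop would never terminate.

                print("\nGoodbye!")

                break

            except Exception as e:

                print(f"Error processing command: {e}")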

    

    def _process_command(self, user_input: str) -> None:

        try:

            print("Processing command...")

            

            command = self.nlp_processor.process_command(user_input)

            

            print(f"Interpreted action: {command.action.value}")

            print(f"Confidence: {command.confidence:.2f}")

            

            if command.confidence < 0.5:

                print("Low confidence in command interpretation. Please rephrase your request.")

                return

            

            result = self.email_manager.execute_command(command)

            

            self._display_result(result)

            

        except Exception as e:

            print(f"Failed to process command: {e}")

    

    def _display_result(self, result: EmailOperationResult) -> None:

        if result.success:

            print(f"✓ {result.message}")

            if result.affected_count > 0:

                print(f"  Affected items: {result.affected_count}")

            

            if result.data and isinstance(result.data, list):

                if len(result.data) <= 10:

                    for item in result.data:

                        if isinstance(item, EmailMessage):

                            print(f"  - {item.subject} (from: {item.sender})")

                else:

                    print(f"  ({len(result.data)} items total - use 'list' command for details)")

        else:

            print(f"✗ {result.message}")

    

    def _show_help(self) -> None:

        help_text = """

Available Commands:

==================


Natural Language Commands:

- "Delete all mails with subject 'meeting' from folder 'Inbox'"

- "Move all mails containing 'urgent' to folder 'Priority'"

- "List all mails in account 'work@company.com' sent by 'John Smith'"

- "Create a new mail account for 'personal@gmail.com'"

- "Move mails with receiver 'team@company.com' to folder 'Team'"

- "Search for emails containing 'project update'"


Special Commands:

- help          Show this help message

- status        Show system status

- accounts      List configured email accounts

- quit/exit/q   Exit the application


Examples:

- "Show me all unread emails"

- "Delete spam emails from last week"

- "Move all newsletters to the Archive folder"

- "Find emails from my manager about the quarterly report"


Tips:

- Be specific about folders, subjects, and senders

- Use quotes around multi-word subjects or names

- The system will ask for confirmation before destructive operations

        """

        print(help_text)

    

    def _show_status(self) -> None:

        print("\nSystem Status:")

        print("=" * 30)

        

        if self.security_manager.is_session_valid():

            session = self.security_manager.current_session

            print(f"User: {session.user_id}")

            print(f"Session active since: {session.created_at.strftime('%Y-%m-%d %H:%M:%S')}")

            print(f"Last activity: {session.last_activity.strftime('%Y-%m-%d %H:%M:%S')}")

        else:

            print("No active session")

        

        print(f"Email client: {self.email_client_type}")

        print(f"Connected: {'Yes' if self.email_manager.connected else 'No'}")

        

        print(f"LLM model: {os.path.basename(self.llm_model_path)}")

        print()

    

    def _show_accounts(self) -> None:

        try:

            accounts = self.email_client.get_accounts()

            

            if not accounts:

                print("No email accounts configured.")

                return

            

            print("\nConfigured Email Accounts:")

            print("=" * 40)

            

            for account in accounts:

                print(f"Account: {account.display_name}")

                print(f"  Email: {account.email_address}")

                print(f"  Type: {account.account_type}")

                print(f"  Enabled: {'Yes' if account.is_enabled else 'No'}")

                

                if account.folders:

                    print(f"  Folders: {len(account.folders)}")

                    for folder in account.folders[:5]:

                        print(f"    - {folder.name} ({folder.message_count} messages)")

                    if len(account.folders) > 5:

                        print(f"    ... and {len(account.folders) - 5} more")

                print()

                

        except Exception as e:

            print(f"Failed to retrieve account information: {e}")


def main():

    parser = argparse.ArgumentParser(

        description="Natural Language Email Assistant",

        formatter_class=argparse.RawDescriptionHelpFormatter,

        epilog="""

Examples:

  python email_assistant.py --model ./models/llama-7b.gguf

  python email_assistant.py --model ./models/llama-7b.gguf --client apple_mail

        """

    )

    

    parser.add_argument(

        '--model', '-m',

        required=True,

        help='Path to the local LLM model file'

    )

    

    parser.add_argument(

        '--client', '-c',

        default='apple_mail',

        choices=['apple_mail'],

        help='Email client type (default: apple_mail)'

    )

    

    parser.add_argument(

        '--debug',

        action='store_true',

        help='Enable debug output'

    )

    

    args = parser.parse_args()

    

    if not os.path.exists(args.model):

        print(f"Error: Model file not found: {args.model}")

        sys.exit(1)

    

    if args.debug:

        import logging

        logging.basicConfig(level=logging.DEBUG)

    

    try:

        assistant = EmailAssistant(

            llm_model_path=args.model,

            email_client_type=args.client

        )

        assistant.start()

        

    except KeyboardInterrupt:

        print("\nApplication interrupted by user.")

    except Exception as e:

        print(f"Application error: {e}")

        if args.debug:

            import traceback

            traceback.print_exc()

        sys.exit(1)


if __name__ == "__main__":

    main()



Conclusion and Future Considerations


This implementation shows how natural language processing can be integrated with email management through a modular, layered design. It addresses the primary challenges of interpreting human language, maintaining security, and providing a unified interface over email clients, although only the Apple Mail adapter is implemented here.

The layered architecture keeps the system maintainable and extensible: new email clients can be added as adapters without modifying the core application logic. The security framework guards against unauthorized access and requires confirmation before potentially destructive operations, while keeping routine commands usable.
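One way to add a client without touching the core logic is a small adapter registry. The sketch below is illustrative only: `EmailClientBase`, `register_client`, `create_client`, and `ThunderbirdClient` are hypothetical names and are not part of the implementation above, which selects the client with an if/else chain.

```python
from abc import ABC, abstractmethod
from typing import Callable, Dict, List, Type


class EmailClientBase(ABC):
    """Hypothetical minimal interface that every vendor adapter implements."""

    @abstractmethod
    def connect(self) -> bool: ...

    @abstractmethod
    def get_accounts(self) -> List[str]: ...


# Maps the value passed via --client to the adapter class that handles it.
_CLIENT_REGISTRY: Dict[str, Type[EmailClientBase]] = {}


def register_client(name: str) -> Callable[[Type[EmailClientBase]], Type[EmailClientBase]]:
    """Class decorator that records an adapter under the given name."""
    def decorator(cls: Type[EmailClientBase]) -> Type[EmailClientBase]:
        _CLIENT_REGISTRY[name] = cls
        return cls
    return decorator


def create_client(name: str) -> EmailClientBase:
    """Instantiate the adapter registered under `name`, or raise ValueError."""
    try:
        return _CLIENT_REGISTRY[name]()
    except KeyError:
        raise ValueError(f"Unsupported email client type: {name}") from None


@register_client("thunderbird")
class ThunderbirdClient(EmailClientBase):
    """Placeholder adapter; a real one would talk to the mail client."""

    def connect(self) -> bool:
        return True

    def get_accounts(self) -> List[str]:
        return []
```

With a registry like this, supporting another client would mean adding one decorated class; the assistant would call `create_client(args.client)` and never need a new branch.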

The use of local LLM processing ensures privacy and data sovereignty, addressing concerns about sensitive email content being transmitted to external services. The fallback pattern matching system provides reliability when LLM processing encounters unexpected inputs or fails to generate confident interpretations.
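The fallback idea can be sketched as follows. This is a minimal illustration, not the article's actual fallback code: `ParsedCommand`, `fallback_parse`, `interpret`, the three regular expressions, and the fixed fallback confidence of 0.6 are all assumptions. Only the 0.5 threshold is taken from the command-processing code above.

```python
import re
from dataclasses import dataclass, field
from typing import Dict, Optional

CONFIDENCE_THRESHOLD = 0.5  # same cut-off used in _process_command above


@dataclass
class ParsedCommand:
    """Hypothetical stand-in for the structured command produced by the NLP layer."""
    action: str
    parameters: Dict[str, str] = field(default_factory=dict)
    confidence: float = 0.0


# Illustrative patterns only; a real fallback would cover many more phrasings.
_FALLBACK_PATTERNS = [
    ("delete", re.compile(r"^delete\b.*?\bsubject\s+'(?P<subject>[^']+)'", re.IGNORECASE)),
    ("move", re.compile(r"^move\b.*?\bto\s+folder\s+'(?P<folder>[^']+)'", re.IGNORECASE)),
    ("search", re.compile(r"^(?:search|find)\b.*?\bcontaining\s+'(?P<query>[^']+)'", re.IGNORECASE)),
]


def fallback_parse(text: str) -> Optional[ParsedCommand]:
    """Try each pattern in order and return the first match, or None."""
    for action, pattern in _FALLBACK_PATTERNS:
        match = pattern.search(text.strip())
        if match:
            # Assumed fixed confidence, chosen to sit just above the threshold.
            return ParsedCommand(action, match.groupdict(), confidence=0.6)
    return None


def interpret(text: str, llm_result: Optional[ParsedCommand]) -> Optional[ParsedCommand]:
    """Prefer a confident LLM result; otherwise fall back to pattern matching."""
    if llm_result is not None and llm_result.confidence >= CONFIDENCE_THRESHOLD:
        return llm_result
    return fallback_parse(text)
```

Here `interpret` returns `None` when neither path yields a usable command, which a caller could treat the same way the assistant treats low confidence: ask the user to rephrase.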

Future enhancements could include support for additional email clients such as Microsoft Outlook, Thunderbird, or web-based services through API integration. Advanced natural language understanding could be achieved through fine-tuning the local LLM on email-specific datasets, improving accuracy for domain-specific terminology and command patterns.

The system architecture supports the integration of additional features such as email composition assistance, intelligent categorization, and automated response generation. The modular design facilitates these extensions without requiring fundamental changes to the existing codebase.

Performance optimization opportunities exist in caching frequently accessed email data, implementing incremental synchronization for large mailboxes, and optimizing the LLM inference pipeline for faster response times. The security framework could be enhanced with role-based access control, audit trail encryption, and integration with enterprise authentication systems.
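As an illustration of the caching idea, the sketch below keeps loaded values for a fixed time-to-live. `TTLCache`, `get_or_load`, and `invalidate` are hypothetical names, and the policy itself is an assumption: the article does not prescribe how cached email data should expire.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple


class TTLCache:
    """Tiny time-based cache for values that are expensive to fetch."""

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so tests can control time
        self._entries: Dict[Hashable, Tuple[float, Any]] = {}

    def get_or_load(self, key: Hashable, loader: Callable[[], Any]) -> Any:
        """Return the cached value if still fresh, else call loader and store it."""
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]
        value = loader()
        self._entries[key] = (now, value)
        return value

    def invalidate(self, key: Hashable) -> None:
        """Drop one entry, e.g. after a move or delete changes a folder."""
        self._entries.pop(key, None)
```

A caller might wrap an account lookup as `cache.get_or_load("accounts", email_client.get_accounts)` and call `invalidate` after any operation that modifies the mailbox, so that stale listings are never shown after a destructive command.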

This implementation serves as a foundation for email management tools that bridge the gap between natural language and programmatic email operations, and it illustrates one practical use of local LLMs in productivity software.
