Introduction and Problem Analysis
The proliferation of email communication has created a significant challenge for users managing large volumes of messages across multiple accounts and folders. Traditional email clients require users to navigate complex graphical interfaces, remember specific menu locations, and perform repetitive tasks manually. This article presents a comprehensive approach to building a natural language interface that allows users to interact with their email applications using conversational commands processed by a local Large Language Model (LLM).
The core challenge lies in bridging the gap between human natural language expressions and the structured API calls required by email applications. Users naturally express their intentions using varied linguistic patterns, temporal references, and contextual assumptions that must be accurately interpreted and translated into precise programmatic actions. Because commands and message contents are private, interpretation also has to run on a local LLM rather than on a cloud-based service.
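To make the translation problem concrete, the following minimal sketch resolves a few relative temporal phrases into explicit date ranges that a structured email query could use. The function name `resolve_temporal_reference` and the small set of supported phrases are assumptions made for illustration; they are not part of the system described in this article.

```python
from datetime import date, timedelta
from typing import Optional, Tuple


def resolve_temporal_reference(phrase: str, today: date) -> Optional[Tuple[date, date]]:
    """Map a relative phrase to an inclusive (start, end) date range.

    Returns None for phrases this sketch does not recognize.
    """
    phrase = phrase.lower().strip()
    if phrase == "today":
        return today, today
    if phrase == "yesterday":
        day = today - timedelta(days=1)
        return day, day
    if phrase == "last week":
        # Monday through Sunday of the previous calendar week
        start_of_this_week = today - timedelta(days=today.weekday())
        start = start_of_this_week - timedelta(days=7)
        return start, start + timedelta(days=6)
    if phrase == "this month":
        return today.replace(day=1), today
    return None


# Example: relative to Wednesday, 15 January 2025
print(resolve_temporal_reference("last week", date(2025, 1, 15)))
```

Passing `today` explicitly keeps the function deterministic, which makes this kind of interpretation step easy to test in isolation.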
System Architecture Overview
The proposed system follows a layered architecture that separates concerns and maintains modularity. The architecture consists of five primary components: the Natural Language Processing Layer, the Intent Recognition Engine, the Email Abstraction Layer, the Vendor-Specific Adapters, and the Security and Confirmation Layer.
The Natural Language Processing Layer receives user input and performs initial text preprocessing, including tokenization, normalization, and context extraction. This layer interfaces directly with the local LLM to generate structured interpretations of user commands.
The Intent Recognition Engine analyzes the LLM output to identify specific actions, extract parameters, and validate the completeness of the request. This component maintains a registry of supported operations and their required parameters.
The Email Abstraction Layer provides a unified interface for email operations, abstracting away vendor-specific implementation details. This layer defines standard operations such as message retrieval, folder management, and account configuration.
The Vendor-Specific Adapters implement the abstract email operations for particular email clients. For Apple Mail, this involves driving Mail.app through AppleScript, which can be executed from Python via the NSAppleScript class exposed by the PyObjC bridge.
The Security and Confirmation Layer manages user authentication, validates permissions, and handles confirmation workflows for potentially destructive operations.
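As an illustration of the registry idea described for the Intent Recognition Engine, the sketch below keeps a table of supported operations and reports which required parameters a request is missing. The names `OperationSpec`, `OPERATION_REGISTRY`, and `missing_parameters`, as well as the specific required-parameter sets, are assumptions chosen for this example.

```python
from dataclasses import dataclass
from typing import Any, Dict, FrozenSet, List


@dataclass(frozen=True)
class OperationSpec:
    required: FrozenSet[str]   # parameters that must be present
    destructive: bool          # whether the operation changes or removes data


# Hypothetical registry: the action names follow the ones used in this article,
# but the required-parameter sets are assumptions made for this example.
OPERATION_REGISTRY: Dict[str, OperationSpec] = {
    "delete_messages": OperationSpec(frozenset({"subject"}), destructive=True),
    "move_messages": OperationSpec(frozenset({"destination_folder"}), destructive=True),
    "list_messages": OperationSpec(frozenset(), destructive=False),
    "create_account": OperationSpec(frozenset({"email_address"}), destructive=False),
    "search_messages": OperationSpec(frozenset({"query"}), destructive=False),
}


def missing_parameters(action: str, parameters: Dict[str, Any]) -> List[str]:
    """Return the sorted names of required parameters that are absent or empty."""
    spec = OPERATION_REGISTRY.get(action)
    if spec is None:
        raise KeyError(f"Unsupported action: {action}")
    present = {name for name, value in parameters.items() if value not in (None, "")}
    return sorted(spec.required - present)


print(missing_parameters("move_messages", {"regex_pattern": "invoice"}))
```

A non-empty result can be turned into a follow-up question to the user instead of executing an incomplete request.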
Natural Language Processing Implementation
The natural language processing component serves as the primary interface between user input and system interpretation. This component must handle the inherent ambiguity and variability of human language while extracting precise operational parameters.
import json
import re
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum


class ActionType(Enum):
    """Operations the command interpreter can emit."""
    DELETE_MESSAGES = "delete_messages"
    MOVE_MESSAGES = "move_messages"
    LIST_MESSAGES = "list_messages"
    CREATE_ACCOUNT = "create_account"
    SEARCH_MESSAGES = "search_messages"


@dataclass
class EmailCommand:
    """Structured result of interpreting one user command."""
    action: ActionType
    parameters: Dict[str, Any]  # typing.Any; the builtin any() is not a type
    confidence: float
    requires_confirmation: bool
class NaturalLanguageProcessor:
def __init__(self, llm_model_path: str):
"""
Initialize the NLP processor with a local LLM model.
Args:
llm_model_path: Path to the local LLM model file
"""
self.llm_model = self._load_local_llm(llm_model_path)
self.command_patterns = self._initialize_command_patterns()
def _load_local_llm(self, model_path: str):
"""
Load and initialize the local LLM model for processing.
This implementation assumes a model compatible with the llama.cpp
Python bindings for local inference.
"""
try:
from llama_cpp import Llama
return Llama(
model_path=model_path,
n_ctx=2048,
n_threads=4,
verbose=False
)
except ImportError:
raise ImportError("llama-cpp-python required for local LLM processing")
def _initialize_command_patterns(self) -> Dict[str, str]:
"""
Initialize regex patterns for common email command structures.
These patterns serve as fallback mechanisms when LLM processing
fails or produces low-confidence results.
"""
return {
'delete_subject': r'delete.*mails?.*subject\s+["\']?([^"\']+)["\']?.*folder\s+["\']?([^"\']+)["\']?',
'move_regex': r'move.*mails?.*contain\s+["\']?([^"\']+)["\']?.*folder\s+["\']?([^"\']+)["\']?',
'list_sender': r'list.*mails?.*account\s+([^\s]+).*sent by\s+["\']?([^"\']+)["\']?',
'create_account': r'create.*mail account.*for\s+([^\s]+)',
'move_receiver': r'move.*mails?.*receiver\s+([^\s]+).*folder\s+["\']?([^"\']+)["\']?'
}
def process_command(self, user_input: str) -> EmailCommand:
"""
Process natural language input and extract structured command information.
Args:
user_input: Raw natural language command from user
Returns:
EmailCommand object containing parsed action and parameters
"""
# Preprocess the input text
normalized_input = self._preprocess_text(user_input)
# Generate LLM prompt for command interpretation
prompt = self._create_interpretation_prompt(normalized_input)
# Get LLM response
llm_response = self._query_llm(prompt)
# Parse LLM response into structured format
try:
command = self._parse_llm_response(llm_response)
if command.confidence > 0.7:
return command
except Exception as e:
print(f"LLM parsing failed: {e}")
# Fallback to pattern matching if LLM fails
return self._fallback_pattern_matching(normalized_input)
def _preprocess_text(self, text: str) -> str:
"""
Normalize and clean input text for better processing.
"""
# Convert to lowercase for consistency
text = text.lower().strip()
# Normalize whitespace
text = re.sub(r'\s+', ' ', text)
# Normalize the hyphenated spelling; this single replacement also covers "e-mails"
text = text.replace("e-mail", "email")
return text
def _create_interpretation_prompt(self, user_input: str) -> str:
"""
Create a structured prompt for the LLM to interpret email commands.
"""
prompt = f"""
You are an email command interpreter. Parse the following user command and return a JSON response with the action type and parameters.
Supported actions:
- delete_messages: Delete emails based on criteria
- move_messages: Move emails to a different folder
- list_messages: List emails matching criteria
- create_account: Create a new email account
- search_messages: Search for emails
User command: "{user_input}"
Return JSON format:
{{
"action": "action_type",
"parameters": {{
"subject": "email subject (if applicable)",
"folder": "folder name (if applicable)",
"sender": "sender email/name (if applicable)",
"receiver": "receiver email (if applicable)",
"account": "email account (if applicable)",
"regex_pattern": "regex pattern (if applicable)",
"destination_folder": "target folder for moves (if applicable)"
}},
"confidence": 0.95,
"requires_confirmation": true
}}
JSON Response:
"""
return prompt
def _query_llm(self, prompt: str) -> str:
"""
Query the local LLM with the interpretation prompt.
"""
response = self.llm_model(
prompt,
max_tokens=512,
temperature=0.1,
stop=["User command:", "\n\n"]
)
return response['choices'][0]['text'].strip()
def _parse_llm_response(self, response: str) -> EmailCommand:
"""
Parse the LLM JSON response into an EmailCommand object.
"""
try:
# Extract JSON from response
json_start = response.find('{')
json_end = response.rfind('}') + 1
json_str = response[json_start:json_end]
parsed = json.loads(json_str)
action = ActionType(parsed['action'])
parameters = {k: v for k, v in parsed['parameters'].items() if v}
confidence = float(parsed.get('confidence', 0.5))
requires_confirmation = bool(parsed.get('requires_confirmation', True))
return EmailCommand(
action=action,
parameters=parameters,
confidence=confidence,
requires_confirmation=requires_confirmation
)
except (json.JSONDecodeError, KeyError, ValueError) as e:
raise ValueError(f"Failed to parse LLM response: {e}")
def _fallback_pattern_matching(self, text: str) -> EmailCommand:
"""
Fallback pattern matching when LLM processing fails.
"""
for pattern_name, pattern in self.command_patterns.items():
match = re.search(pattern, text, re.IGNORECASE)
if match:
return self._create_command_from_pattern(pattern_name, match.groups())
# Default fallback
return EmailCommand(
action=ActionType.SEARCH_MESSAGES,
parameters={"query": text},
confidence=0.3,
requires_confirmation=True
)
def _create_command_from_pattern(self, pattern_name: str, groups: Tuple[str, ...]) -> EmailCommand:
"""
Create EmailCommand from regex pattern matches.
"""
if pattern_name == 'delete_subject':
return EmailCommand(
action=ActionType.DELETE_MESSAGES,
parameters={"subject": groups[0], "folder": groups[1]},
confidence=0.8,
requires_confirmation=True
)
elif pattern_name == 'move_regex':
return EmailCommand(
action=ActionType.MOVE_MESSAGES,
parameters={"regex_pattern": groups[0], "destination_folder": groups[1]},
confidence=0.8,
requires_confirmation=True
)
elif pattern_name == 'list_sender':
return EmailCommand(
action=ActionType.LIST_MESSAGES,
parameters={"account": groups[0], "sender": groups[1]},
confidence=0.8,
requires_confirmation=False
)
elif pattern_name == 'create_account':
return EmailCommand(
action=ActionType.CREATE_ACCOUNT,
parameters={"email_address": groups[0]},
confidence=0.8,
requires_confirmation=True
)
elif pattern_name == 'move_receiver':
return EmailCommand(
action=ActionType.MOVE_MESSAGES,
parameters={"receiver": groups[0], "destination_folder": groups[1]},
confidence=0.8,
requires_confirmation=True
)
return EmailCommand(
action=ActionType.SEARCH_MESSAGES,
parameters={"query": " ".join(groups)},
confidence=0.5,
requires_confirmation=True
)
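The `_parse_llm_response` method above slices the response from the first `{` to the last `}`, which fails when the model appends extra text that itself contains braces. A possible alternative, sketched here under the assumption that the first decodable JSON object is the one of interest, uses the standard library's `json.JSONDecoder.raw_decode` to stop at the end of the first valid object. The helper name `extract_first_json_object` is hypothetical.

```python
import json
from typing import Any, Dict, Optional


def extract_first_json_object(text: str) -> Optional[Dict[str, Any]]:
    """Return the first JSON object embedded in text, or None if there is none."""
    decoder = json.JSONDecoder()
    position = text.find("{")
    while position != -1:
        try:
            # raw_decode parses one JSON value starting at `position`
            # and ignores whatever follows it.
            value, _end = decoder.raw_decode(text, position)
        except json.JSONDecodeError:
            value = None
        if isinstance(value, dict):
            return value
        # Not valid JSON at this brace; try the next opening brace.
        position = text.find("{", position + 1)
    return None


reply = 'Sure! {"action": "list_messages", "parameters": {"sender": "bob"}} (format: {action, parameters})'
print(extract_first_json_object(reply))
```

The returned dictionary can then go through the same `ActionType` and parameter validation as before; the sketch only changes how the JSON text is located.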
Email Abstraction Layer Design
The email abstraction layer provides a unified interface that isolates the application logic from vendor-specific implementation details. This design enables support for multiple email clients while maintaining a consistent programming interface.
from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
from datetime import datetime
@dataclass
class EmailMessage:
"""
Unified representation of an email message across different clients.
"""
message_id: str
subject: str
sender: str
recipients: List[str]
cc_recipients: List[str]
bcc_recipients: List[str]
body: str
html_body: Optional[str]
date_sent: datetime
date_received: datetime
folder: str
account: str
is_read: bool
is_flagged: bool
attachments: List[str]
headers: Dict[str, str]
@dataclass
class EmailFolder:
"""
Representation of an email folder structure.
"""
name: str
path: str
message_count: int
unread_count: int
subfolders: List['EmailFolder']
account: str
@dataclass
class EmailAccount:
"""
Representation of an email account configuration.
"""
email_address: str
display_name: str
account_type: str # IMAP, POP3, Exchange, etc.
server_settings: Dict[str, Any]
is_enabled: bool
folders: List[EmailFolder]
class EmailClientInterface(ABC):
"""
Abstract interface defining operations that email clients must implement.
This interface ensures consistency across different email client implementations.
"""
@abstractmethod
def connect(self) -> bool:
"""
Establish connection to the email client.
Returns:
True if connection successful, False otherwise
"""
pass
@abstractmethod
def disconnect(self) -> None:
"""
Close connection to the email client.
"""
pass
@abstractmethod
def get_accounts(self) -> List[EmailAccount]:
"""
Retrieve all configured email accounts.
Returns:
List of EmailAccount objects
"""
pass
@abstractmethod
def get_folders(self, account: str) -> List[EmailFolder]:
"""
Get folder structure for a specific account.
Args:
account: Email account identifier
Returns:
List of EmailFolder objects
"""
pass
@abstractmethod
def get_messages(self, account: str, folder: str,
limit: Optional[int] = None,
filters: Optional[Dict[str, Any]] = None) -> List[EmailMessage]:
"""
Retrieve messages from a specific folder.
Args:
account: Email account identifier
folder: Folder name or path
limit: Maximum number of messages to retrieve
filters: Dictionary of filter criteria
Returns:
List of EmailMessage objects
"""
pass
@abstractmethod
def search_messages(self, account: str, query: str,
folder: Optional[str] = None) -> List[EmailMessage]:
"""
Search for messages matching query criteria.
Args:
account: Email account identifier
query: Search query string
folder: Optional folder to limit search scope
Returns:
List of matching EmailMessage objects
"""
pass
@abstractmethod
def delete_messages(self, message_ids: List[str],
account: str) -> bool:
"""
Delete specified messages.
Args:
message_ids: List of message identifiers to delete
account: Email account identifier
Returns:
True if deletion successful, False otherwise
"""
pass
@abstractmethod
def move_messages(self, message_ids: List[str],
source_folder: str, destination_folder: str,
account: str) -> bool:
"""
Move messages between folders.
Args:
message_ids: List of message identifiers to move
source_folder: Source folder name
destination_folder: Destination folder name
account: Email account identifier
Returns:
True if move successful, False otherwise
"""
pass
@abstractmethod
def create_folder(self, folder_name: str, parent_folder: str,
account: str) -> bool:
"""
Create a new email folder.
Args:
folder_name: Name of the new folder
parent_folder: Parent folder path
account: Email account identifier
Returns:
True if creation successful, False otherwise
"""
pass
@abstractmethod
def mark_as_read(self, message_ids: List[str], account: str) -> bool:
"""
Mark messages as read.
Args:
message_ids: List of message identifiers
account: Email account identifier
Returns:
True if operation successful, False otherwise
"""
pass
@abstractmethod
def mark_as_unread(self, message_ids: List[str], account: str) -> bool:
"""
Mark messages as unread.
Args:
message_ids: List of message identifiers
account: Email account identifier
Returns:
True if operation successful, False otherwise
"""
pass
@abstractmethod
def create_account(self, account_config: EmailAccount) -> bool:
"""
Create a new email account configuration.
Args:
account_config: EmailAccount object with configuration details
Returns:
True if account creation successful, False otherwise
"""
pass
class EmailOperationResult:
"""
Standardized result object for email operations.
"""
def __init__(self, success: bool, message: str,
affected_count: int = 0, data: Any = None):
self.success = success
self.message = message
self.affected_count = affected_count
self.data = data
self.timestamp = datetime.now()
class EmailManager:
"""
High-level email management class that coordinates operations
across the abstraction layer.
"""
def __init__(self, client: EmailClientInterface):
"""
Initialize email manager with a specific client implementation.
Args:
client: EmailClientInterface implementation
"""
self.client = client
self.connected = False
def initialize(self) -> bool:
"""
Initialize connection to the email client.
Returns:
True if initialization successful, False otherwise
"""
try:
self.connected = self.client.connect()
return self.connected
except Exception as e:
print(f"Failed to initialize email client: {e}")
return False
def execute_command(self, command: EmailCommand) -> EmailOperationResult:
"""
Execute a parsed email command through the abstraction layer.
Args:
command: EmailCommand object containing action and parameters
Returns:
EmailOperationResult with operation outcome
"""
if not self.connected:
return EmailOperationResult(
success=False,
message="Email client not connected"
)
try:
if command.action == ActionType.DELETE_MESSAGES:
return self._execute_delete_command(command.parameters)
elif command.action == ActionType.MOVE_MESSAGES:
return self._execute_move_command(command.parameters)
elif command.action == ActionType.LIST_MESSAGES:
return self._execute_list_command(command.parameters)
elif command.action == ActionType.CREATE_ACCOUNT:
return self._execute_create_account_command(command.parameters)
elif command.action == ActionType.SEARCH_MESSAGES:
return self._execute_search_command(command.parameters)
else:
return EmailOperationResult(
success=False,
message=f"Unsupported action: {command.action}"
)
except Exception as e:
return EmailOperationResult(
success=False,
message=f"Command execution failed: {str(e)}"
)
def _execute_delete_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:
"""
Execute delete messages command.
"""
account = parameters.get('account', 'default')
folder = parameters.get('folder', 'Inbox')
subject = parameters.get('subject')
if not subject:
return EmailOperationResult(
success=False,
message="Subject parameter required for delete operation"
)
# Find messages matching criteria
messages = self.client.get_messages(account, folder)
matching_messages = [
msg for msg in messages
if subject.lower() in msg.subject.lower()
]
if not matching_messages:
return EmailOperationResult(
success=True,
message="No messages found matching criteria",
affected_count=0
)
# Delete matching messages
message_ids = [msg.message_id for msg in matching_messages]
success = self.client.delete_messages(message_ids, account)
return EmailOperationResult(
success=success,
message=f"Deleted {len(matching_messages)} messages" if success
else "Failed to delete messages",
affected_count=len(matching_messages) if success else 0
)
def _execute_move_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:
"""
Execute move messages command.
"""
account = parameters.get('account', 'default')
source_folder = parameters.get('folder', 'Inbox')
destination_folder = parameters.get('destination_folder')
regex_pattern = parameters.get('regex_pattern')
receiver = parameters.get('receiver')
if not destination_folder:
return EmailOperationResult(
success=False,
message="Destination folder required for move operation"
)
# Get messages from source folder
messages = self.client.get_messages(account, source_folder)
matching_messages = []
if regex_pattern:
import re
pattern = re.compile(regex_pattern, re.IGNORECASE)
matching_messages = [
msg for msg in messages
if pattern.search(msg.body) or pattern.search(msg.subject)
]
elif receiver:
matching_messages = [
msg for msg in messages
if receiver.lower() in [r.lower() for r in msg.recipients]
]
if not matching_messages:
return EmailOperationResult(
success=True,
message="No messages found matching criteria",
affected_count=0
)
# Move matching messages
message_ids = [msg.message_id for msg in matching_messages]
success = self.client.move_messages(
message_ids, source_folder, destination_folder, account
)
return EmailOperationResult(
success=success,
message=f"Moved {len(matching_messages)} messages" if success
else "Failed to move messages",
affected_count=len(matching_messages) if success else 0
)
def _execute_list_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:
"""
Execute list messages command.
"""
account = parameters.get('account', 'default')
folder = parameters.get('folder', 'Inbox')
sender = parameters.get('sender')
messages = self.client.get_messages(account, folder)
if sender:
matching_messages = [
msg for msg in messages
if sender.lower() in msg.sender.lower()
]
else:
matching_messages = messages
return EmailOperationResult(
success=True,
message=f"Found {len(matching_messages)} messages",
affected_count=len(matching_messages),
data=matching_messages
)
def _execute_search_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:
"""
Execute search messages command.
"""
account = parameters.get('account', 'default')
query = parameters.get('query', '')
folder = parameters.get('folder')
matching_messages = self.client.search_messages(account, query, folder)
return EmailOperationResult(
success=True,
message=f"Search found {len(matching_messages)} messages",
affected_count=len(matching_messages),
data=matching_messages
)
def _execute_create_account_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:
"""
Execute create account command.
"""
email_address = parameters.get('email_address')
if not email_address:
return EmailOperationResult(
success=False,
message="Email address required for account creation"
)
# Create basic account configuration
account_config = EmailAccount(
email_address=email_address,
display_name=email_address.split('@')[0],
account_type='IMAP',
server_settings={},
is_enabled=True,
folders=[]
)
success = self.client.create_account(account_config)
return EmailOperationResult(
success=success,
message=f"Created account for {email_address}" if success
else "Failed to create account",
affected_count=1 if success else 0
)
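One practical benefit of the abstraction layer is that command logic can be exercised without a real mail client. The sketch below is a minimal illustration of that idea: `StubMessage`, `InMemoryClient`, and `delete_by_subject` are hypothetical stand-ins that mirror only the two client calls used by the delete path, not a full `EmailClientInterface` implementation.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class StubMessage:
    message_id: str
    subject: str


class InMemoryClient:
    """Hypothetical test double exposing only the two calls the delete path uses."""

    def __init__(self, mailboxes: Dict[Tuple[str, str], List[StubMessage]]):
        # Keys are (account, folder) pairs
        self.mailboxes = mailboxes

    def get_messages(self, account: str, folder: str) -> List[StubMessage]:
        return list(self.mailboxes.get((account, folder), []))

    def delete_messages(self, message_ids: List[str], account: str) -> bool:
        doomed = set(message_ids)
        for (acct, _folder), messages in self.mailboxes.items():
            if acct == account:
                # Remove matching messages in place
                messages[:] = [m for m in messages if m.message_id not in doomed]
        return True


def delete_by_subject(client: InMemoryClient, account: str, folder: str, subject: str) -> int:
    """Same match-then-delete flow as _execute_delete_command, reduced to essentials."""
    matches = [m for m in client.get_messages(account, folder)
               if subject.lower() in m.subject.lower()]
    if matches:
        client.delete_messages([m.message_id for m in matches], account)
    return len(matches)


client = InMemoryClient({("work", "Inbox"): [
    StubMessage("1", "Weekly Report"),
    StubMessage("2", "Lunch?"),
    StubMessage("3", "report draft"),
]})
print(delete_by_subject(client, "work", "Inbox", "report"))
```

Because the subject match is a case-insensitive substring test, a short subject such as "re" would match far more messages than intended, which is one reason destructive commands should preview their matches before executing.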
Apple Mail Integration Implementation
The Apple Mail adapter implements the email abstraction interface by running AppleScript against Mail.app through the PyObjC bridge, using NSAppleScript for script execution and NSWorkspace for launching the application. This implementation demonstrates how vendor-specific functionality integrates with the abstract interface.
import subprocess
import json
import re
from typing import List, Dict, Optional, Any
from datetime import datetime
import objc
from Foundation import NSAppleScript, NSString
from AppKit import NSWorkspace
class AppleMailClient(EmailClientInterface):
"""
Apple Mail implementation of the EmailClientInterface.
Uses AppleScript and Objective-C runtime for Mail.app integration.
"""
def __init__(self):
"""
Initialize Apple Mail client interface.
"""
self.app_name = "Mail"
self.is_connected = False
self.applescript_runner = AppleScriptRunner()
def connect(self) -> bool:
"""
Establish connection to Apple Mail application.
"""
try:
# Check if Mail.app is running
workspace = NSWorkspace.sharedWorkspace()
running_apps = workspace.runningApplications()
mail_running = any(
app.bundleIdentifier() == "com.apple.mail"
for app in running_apps
)
if not mail_running:
# Launch Mail.app
workspace.launchApplication_(self.app_name)
# Wait for application to start
import time
time.sleep(3)
# Test connection with a simple AppleScript command
test_script = 'tell application "Mail" to get name of every account'
result = self.applescript_runner.run_script(test_script)
self.is_connected = result.success
return self.is_connected
except Exception as e:
print(f"Failed to connect to Apple Mail: {e}")
return False
def disconnect(self) -> None:
"""
Close connection to Apple Mail.
"""
self.is_connected = False
def get_accounts(self) -> List[EmailAccount]:
"""
Retrieve all configured email accounts from Apple Mail.
"""
script = '''
tell application "Mail"
set accountList to {}
repeat with acc in every account
set accountInfo to {name of acc, user name of acc, email addresses of acc}
set end of accountList to accountInfo
end repeat
return accountList
end tell
'''
result = self.applescript_runner.run_script(script)
if not result.success:
return []
accounts = []
try:
# Parse AppleScript result
account_data = self._parse_applescript_list(result.output)
for acc_info in account_data:
if len(acc_info) >= 3:
account = EmailAccount(
email_address=acc_info[2][0] if acc_info[2] else acc_info[1],
display_name=acc_info[0],
account_type="Apple Mail",
server_settings={},
is_enabled=True,
folders=self._get_account_folders(acc_info[0])
)
accounts.append(account)
except Exception as e:
print(f"Error parsing account data: {e}")
return accounts
def get_folders(self, account: str) -> List[EmailFolder]:
"""
Get folder structure for a specific account.
"""
return self._get_account_folders(account)
def _get_account_folders(self, account_name: str) -> List[EmailFolder]:
"""
Helper method to retrieve folders for an account.
"""
script = f'''
tell application "Mail"
set folderList to {{}}
try
set acc to account "{account_name}"
repeat with mbox in every mailbox of acc
set folderInfo to {{name of mbox, (count of messages of mbox), (count of (messages of mbox whose read status is false))}}
set end of folderList to folderInfo
end repeat
end try
return folderList
end tell
'''
result = self.applescript_runner.run_script(script)
if not result.success:
return []
folders = []
try:
folder_data = self._parse_applescript_list(result.output)
for folder_info in folder_data:
if len(folder_info) >= 3:
folder = EmailFolder(
name=folder_info[0],
path=folder_info[0],
message_count=int(folder_info[1]),
unread_count=int(folder_info[2]),
subfolders=[],
account=account_name
)
folders.append(folder)
except Exception as e:
print(f"Error parsing folder data: {e}")
return folders
def get_messages(self, account: str, folder: str,
limit: Optional[int] = None,
filters: Optional[Dict[str, Any]] = None) -> List[EmailMessage]:
"""
Retrieve messages from a specific folder in Apple Mail.
"""
limit_clause = f"items 1 thru {limit} of " if limit else ""
script = f'''
tell application "Mail"
set messageList to {{}}
try
set acc to account "{account}"
set mbox to mailbox "{folder}" of acc
set msgs to {limit_clause}(every message of mbox)
repeat with msg in msgs
set msgInfo to {{id of msg, subject of msg, sender of msg, ¬
content of msg, date received of msg, ¬
read status of msg, flagged status of msg}}
set end of messageList to msgInfo
end repeat
end try
return messageList
end tell
'''
result = self.applescript_runner.run_script(script)
if not result.success:
return []
messages = []
try:
message_data = self._parse_applescript_list(result.output)
for msg_info in message_data:
if len(msg_info) >= 7:
# Parse recipients separately
recipients = self._get_message_recipients(msg_info[0])
message = EmailMessage(
message_id=str(msg_info[0]),
subject=msg_info[1] or "",
sender=msg_info[2] or "",
recipients=recipients.get('to', []),
cc_recipients=recipients.get('cc', []),
bcc_recipients=recipients.get('bcc', []),
body=msg_info[3] or "",
html_body=None,
# The script above fetches only "date received", so it doubles as the sent date here
date_sent=self._parse_applescript_date(msg_info[4]),
date_received=self._parse_applescript_date(msg_info[4]),
folder=folder,
account=account,
is_read=bool(msg_info[5]),
is_flagged=bool(msg_info[6]),
attachments=[],
headers={}
)
messages.append(message)
except Exception as e:
print(f"Error parsing message data: {e}")
return messages
def _get_message_recipients(self, message_id: str) -> Dict[str, List[str]]:
"""
Get detailed recipient information for a message.
"""
script = f'''
tell application "Mail"
try
set msg to message id {message_id}
set toRecipients to {{}}
set ccRecipients to {{}}
repeat with recip in to recipients of msg
set end of toRecipients to address of recip
end repeat
repeat with recip in cc recipients of msg
set end of ccRecipients to address of recip
end repeat
return {{toRecipients, ccRecipients}}
end try
end tell
'''
result = self.applescript_runner.run_script(script)
if not result.success:
return {'to': [], 'cc': [], 'bcc': []}
try:
recipient_data = self._parse_applescript_list(result.output)
return {
'to': recipient_data[0] if len(recipient_data) > 0 else [],
'cc': recipient_data[1] if len(recipient_data) > 1 else [],
'bcc': [] # Not requested by the script above; received messages rarely expose BCC
}
except Exception:
return {'to': [], 'cc': [], 'bcc': []}
def search_messages(self, account: str, query: str,
folder: Optional[str] = None) -> List[EmailMessage]:
"""
Search for messages in Apple Mail using an AppleScript 'whose' filter on subject and content.
"""
folder_clause = f'in mailbox "{folder}" of account "{account}"' if folder else f'in account "{account}"'
script = f'''
tell application "Mail"
set searchResults to {{}}
try
set msgs to (every message {folder_clause} whose subject contains "{query}" or content contains "{query}")
repeat with msg in msgs
set msgInfo to {{id of msg, subject of msg, sender of msg, ¬
content of msg, date received of msg, ¬
read status of msg, flagged status of msg, ¬
name of mailbox of msg}}
set end of searchResults to msgInfo
end repeat
end try
return searchResults
end tell
'''
result = self.applescript_runner.run_script(script)
if not result.success:
return []
messages = []
try:
message_data = self._parse_applescript_list(result.output)
for msg_info in message_data:
if len(msg_info) >= 8:
recipients = self._get_message_recipients(msg_info[0])
message = EmailMessage(
message_id=str(msg_info[0]),
subject=msg_info[1] or "",
sender=msg_info[2] or "",
recipients=recipients.get('to', []),
cc_recipients=recipients.get('cc', []),
bcc_recipients=recipients.get('bcc', []),
body=msg_info[3] or "",
html_body=None,
# The script above fetches only "date received", so it doubles as the sent date here
date_sent=self._parse_applescript_date(msg_info[4]),
date_received=self._parse_applescript_date(msg_info[4]),
folder=msg_info[7] if len(msg_info) > 7 else folder or "Unknown",
account=account,
is_read=bool(msg_info[5]),
is_flagged=bool(msg_info[6]),
attachments=[],
headers={}
)
messages.append(message)
except Exception as e:
print(f"Error parsing search results: {e}")
return messages
def delete_messages(self, message_ids: List[str], account: str) -> bool:
"""
Delete specified messages in Apple Mail.
"""
if not message_ids:
return True
# Convert message IDs to AppleScript list format
id_list = ", ".join(message_ids)
script = f'''
tell application "Mail"
try
set msgIds to {{{id_list}}}
repeat with msgId in msgIds
set msg to message id msgId
delete msg
end repeat
return true
on error
return false
end try
end tell
'''
result = self.applescript_runner.run_script(script)
return result.success and "true" in result.output.lower()
def move_messages(self, message_ids: List[str],
source_folder: str, destination_folder: str,
account: str) -> bool:
"""
Move messages between folders in Apple Mail.
"""
if not message_ids:
return True
id_list = ", ".join(message_ids)
script = f'''
tell application "Mail"
try
set acc to account "{account}"
set destBox to mailbox "{destination_folder}" of acc
set msgIds to {{{id_list}}}
repeat with msgId in msgIds
set msg to message id msgId
set mailbox of msg to destBox
end repeat
return true
on error errMsg
return false
end try
end tell
'''
result = self.applescript_runner.run_script(script)
return result.success and "true" in result.output.lower()
def create_folder(self, folder_name: str, parent_folder: str,
account: str) -> bool:
"""
Create a new mailbox in Apple Mail.
"""
script = f'''
tell application "Mail"
try
set acc to account "{account}"
if "{parent_folder}" is not "" then
set parentBox to mailbox "{parent_folder}" of acc
make new mailbox with properties {{name:"{folder_name}"}} at parentBox
else
make new mailbox with properties {{name:"{folder_name}"}} at acc
end if
return true
on error
return false
end try
end tell
'''
result = self.applescript_runner.run_script(script)
return result.success and "true" in result.output.lower()
def mark_as_read(self, message_ids: List[str], account: str) -> bool:
"""
Mark messages as read in Apple Mail.
"""
return self._set_read_status(message_ids, True)
def mark_as_unread(self, message_ids: List[str], account: str) -> bool:
"""
Mark messages as unread in Apple Mail.
"""
return self._set_read_status(message_ids, False)
def _set_read_status(self, message_ids: List[str], read_status: bool) -> bool:
"""
Helper method to set read status of messages.
"""
if not message_ids:
return True
id_list = ", ".join(message_ids)
status_value = "true" if read_status else "false"
script = f'''
tell application "Mail"
try
set msgIds to {{{id_list}}}
repeat with msgId in msgIds
set msg to message id msgId
set read status of msg to {status_value}
end repeat
return true
on error
return false
end try
end tell
'''
result = self.applescript_runner.run_script(script)
return result.success and "true" in result.output.lower()
def create_account(self, account_config: EmailAccount) -> bool:
"""
Create a new email account in Apple Mail.
Note: This operation typically requires user interaction
and may not be fully automatable via AppleScript.
"""
# Apple Mail account creation usually requires manual setup
# or configuration profiles. This is a simplified implementation.
print(f"Account creation for {account_config.email_address} requires manual setup in Apple Mail")
return False
    def _parse_applescript_list(self, output: str) -> List[Any]:
        """
        Parse AppleScript list text such as {"a", {1, 2}} into nested Python lists.
        Scalars are returned as strings with surrounding quotes removed.
        """
        try:
            cleaned = output.strip()
            if cleaned.startswith('{') and cleaned.endswith('}'):
                cleaned = cleaned[1:-1]  # Remove outer braces
            # This is a simplified parser - a production implementation
            # would need more robust AppleScript result parsing
            items: List[Any] = []
            current_item = ""
            brace_count = 0
            in_quotes = False

            def finish(raw: str) -> Any:
                # Nested lists are parsed recursively so callers can index into them
                raw = raw.strip()
                if raw.startswith('{') and raw.endswith('}'):
                    return self._parse_applescript_list(raw)
                return raw.strip('"')

            for char in cleaned:
                if char == '"' and not current_item.endswith('\\'):
                    in_quotes = not in_quotes
                elif char == '{' and not in_quotes:
                    brace_count += 1
                elif char == '}' and not in_quotes:
                    brace_count -= 1
                elif char == ',' and brace_count == 0 and not in_quotes:
                    items.append(finish(current_item))
                    current_item = ""
                    continue
                current_item += char
            if current_item.strip():
                items.append(finish(current_item))
            return items
        except Exception as e:
            print(f"Error parsing AppleScript output: {e}")
            return []
def _parse_applescript_date(self, date_str: str) -> datetime:
"""
Parse AppleScript date format into Python datetime.
"""
try:
# AppleScript dates are typically in format like:
# "date \"Wednesday, January 1, 2025 at 12:00:00 PM\""
import dateutil.parser
return dateutil.parser.parse(date_str)
except (ValueError, OverflowError, TypeError):
# Unparseable date strings fall back to the current time
return datetime.now()
class AppleScriptRunner:
"""
Utility class for executing AppleScript commands.
"""
def __init__(self):
self.timeout = 30 # seconds
def run_script(self, script: str) -> 'AppleScriptResult':
"""
Execute an AppleScript and return the result.
"""
try:
# Use NSAppleScript for better integration
applescript = NSAppleScript.alloc().initWithSource_(script)
result, error = applescript.executeAndReturnError_(None)
if error:
return AppleScriptResult(
success=False,
output="",
error=str(error)
)
output = str(result.stringValue()) if result else ""
return AppleScriptResult(
success=True,
output=output,
error=""
)
except Exception as e:
return AppleScriptResult(
success=False,
output="",
error=str(e)
)
class AppleScriptResult:
"""
Result object for AppleScript execution.
"""
def __init__(self, success: bool, output: str, error: str):
self.success = success
self.output = output
self.error = error
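The quote- and brace-aware splitting used by `_parse_applescript_list` above can be tried without Mail or PyObjC. The following is a minimal standalone sketch of the same idea; the function name `split_applescript_list` and the sample input are illustrative assumptions, not part of the adapter, and like the original it only splits the top level of a list (nested lists are returned as raw strings).

```python
from typing import List


def split_applescript_list(output: str) -> List[str]:
    """Split one level of an AppleScript list literal into its items."""
    cleaned = output.strip()
    if cleaned.startswith("{") and cleaned.endswith("}"):
        cleaned = cleaned[1:-1]  # drop the outer braces only

    items: List[str] = []
    current = ""
    depth = 0          # nesting depth of inner {...} lists
    in_quotes = False  # True while inside a "..." string

    for char in cleaned:
        if char == '"' and not current.endswith("\\"):
            in_quotes = not in_quotes
        elif char == "{" and not in_quotes:
            depth += 1
        elif char == "}" and not in_quotes:
            depth -= 1
        elif char == "," and depth == 0 and not in_quotes:
            # A top-level comma closes the current item
            items.append(current.strip().strip('"'))
            current = ""
            continue
        current += char

    if current.strip():
        items.append(current.strip().strip('"'))
    return items


if __name__ == "__main__":
    # Commas inside quotes or nested braces do not split the item
    print(split_applescript_list('{"Inbox", "Sent, 2024", {"a", "b"}}'))
```

Nested records such as the per-account lists returned by the Mail scripts would need a second pass over each returned item, which is one reason the article calls the parser simplified.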
Security and Confirmation System
The security layer ensures that potentially destructive operations require explicit user confirmation and that all operations are performed with appropriate permissions. This component also handles authentication and access control.
import hashlib
import json
import getpass
import re
from typing import Dict, List, Optional, Callable
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
class SecurityLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class OperationType(Enum):
READ = "read"
MODIFY = "modify"
DELETE = "delete"
CREATE = "create"
MOVE = "move"
@dataclass
class SecurityPolicy:
"""
Defines security requirements for different operation types.
"""
operation_type: OperationType
security_level: SecurityLevel
requires_confirmation: bool
requires_authentication: bool
max_batch_size: Optional[int]
confirmation_message: str
@dataclass
class UserSession:
"""
Represents an authenticated user session.
"""
user_id: str
session_token: str
created_at: datetime
last_activity: datetime
permissions: List[str]
is_authenticated: bool
class SecurityManager:
"""
Manages security policies, user authentication, and operation confirmation.
"""
def __init__(self):
"""
Initialize security manager with default policies.
"""
self.policies = self._initialize_security_policies()
self.current_session: Optional[UserSession] = None
self.confirmation_callbacks: Dict[str, Callable] = {}
self.session_timeout = timedelta(hours=2)
def _initialize_security_policies(self) -> Dict[ActionType, SecurityPolicy]:
"""
Initialize default security policies for different operations.
"""
return {
ActionType.LIST_MESSAGES: SecurityPolicy(
operation_type=OperationType.READ,
security_level=SecurityLevel.LOW,
requires_confirmation=False,
requires_authentication=True,
max_batch_size=None,
confirmation_message=""
),
ActionType.SEARCH_MESSAGES: SecurityPolicy(
operation_type=OperationType.READ,
security_level=SecurityLevel.LOW,
requires_confirmation=False,
requires_authentication=True,
max_batch_size=None,
confirmation_message=""
),
ActionType.MOVE_MESSAGES: SecurityPolicy(
operation_type=OperationType.MODIFY,
security_level=SecurityLevel.MEDIUM,
requires_confirmation=True,
requires_authentication=True,
max_batch_size=100,
confirmation_message="This will move {count} messages to {destination}. Continue?"
),
ActionType.DELETE_MESSAGES: SecurityPolicy(
operation_type=OperationType.DELETE,
security_level=SecurityLevel.HIGH,
requires_confirmation=True,
requires_authentication=True,
max_batch_size=50,
confirmation_message="This will permanently delete {count} messages. This action cannot be undone. Continue?"
),
ActionType.CREATE_ACCOUNT: SecurityPolicy(
operation_type=OperationType.CREATE,
security_level=SecurityLevel.CRITICAL,
requires_confirmation=True,
requires_authentication=True,
max_batch_size=1,
confirmation_message="This will create a new email account for {email}. Continue?"
)
}
def authenticate_user(self, username: str, password: str) -> bool:
"""
Authenticate user and create session.
Args:
username: User identifier
password: User password
Returns:
True if authentication successful, False otherwise
"""
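# In a production system, this would verify the password against a secure credential store.
# This simplified example performs no verification: any username/password pair is accepted.
try:
# Generate session token (illustrative only: a hash of username and timestamp is predictable,
# so a real system should use a random token, e.g. secrets.token_hex(32))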
session_data = f"{username}:{datetime.now().isoformat()}"
session_token = hashlib.sha256(session_data.encode()).hexdigest()
# Create user session
self.current_session = UserSession(
user_id=username,
session_token=session_token,
created_at=datetime.now(),
last_activity=datetime.now(),
permissions=[f"email:{op.value}" for op in OperationType],  # must match the "email:<operation>" strings built in check_operation_permission
is_authenticated=True
)
return True
except Exception as e:
print(f"Authentication failed: {e}")
return False
def is_session_valid(self) -> bool:
"""
Check if current session is valid and not expired.
Returns:
True if session is valid, False otherwise
"""
if not self.current_session or not self.current_session.is_authenticated:
return False
# Check session timeout
time_since_activity = datetime.now() - self.current_session.last_activity
if time_since_activity > self.session_timeout:
self.current_session = None
return False
# Update last activity
self.current_session.last_activity = datetime.now()
return True
def check_operation_permission(self, command: EmailCommand) -> bool:
"""
Check if current user has permission to execute the command.
Args:
command: EmailCommand to check permissions for
Returns:
True if operation is permitted, False otherwise
"""
if not self.is_session_valid():
return False
policy = self.policies.get(command.action)
if not policy:
return False
# Check if user has required permissions
required_permission = f"email:{policy.operation_type.value}"
if required_permission not in self.current_session.permissions:
return False
return True
def require_confirmation(self, command: EmailCommand,
affected_count: int = 0,
additional_context: Optional[Dict[str, str]] = None) -> bool:
"""
Handle confirmation requirement for operations.
Args:
command: EmailCommand requiring confirmation
affected_count: Number of items that will be affected
additional_context: Additional context for confirmation message
Returns:
True if user confirms, False otherwise
"""
policy = self.policies.get(command.action)
if not policy or not policy.requires_confirmation:
return True
# Check batch size limits
if policy.max_batch_size and affected_count > policy.max_batch_size:
print(f"Operation exceeds maximum batch size of {policy.max_batch_size}")
return False
# Format confirmation message
context = dict(additional_context or {})  # copy so the caller's dict is not mutated
context.update({
'count': str(affected_count),
'action': command.action.value
})
# Add command-specific context
if command.action == ActionType.MOVE_MESSAGES:
context['destination'] = command.parameters.get('destination_folder', 'Unknown')
elif command.action == ActionType.CREATE_ACCOUNT:
context['email'] = command.parameters.get('email_address', 'Unknown')
confirmation_message = policy.confirmation_message.format(**context)
# Get user confirmation
return self._get_user_confirmation(confirmation_message, policy.security_level)
def _get_user_confirmation(self, message: str, security_level: SecurityLevel) -> bool:
"""
Get user confirmation for an operation.
Args:
message: Confirmation message to display
security_level: Security level of the operation
Returns:
True if user confirms, False otherwise
"""
print(f"\n[{security_level.value.upper()}] CONFIRMATION REQUIRED")
print(f"{message}")
if security_level == SecurityLevel.CRITICAL:
print("This is a critical operation. Please type 'CONFIRM' to proceed:")
response = input("> ").strip()
return response == "CONFIRM"
else:
print("Type 'y' or 'yes' to confirm, any other input to cancel:")
response = input("> ").strip().lower()
return response in ['y', 'yes']
def log_operation(self, command: EmailCommand, result: EmailOperationResult) -> None:
"""
Log security-relevant operations for audit purposes.
Args:
command: EmailCommand that was executed
result: Result of the operation
"""
log_entry = {
'timestamp': datetime.now().isoformat(),
'user_id': self.current_session.user_id if self.current_session else 'unknown',
'action': command.action.value,
'parameters': command.parameters,
'success': result.success,
'affected_count': result.affected_count,
'message': result.message
}
# In a production system, this would write to a secure audit log
print(f"AUDIT LOG: {json.dumps(log_entry, indent=2, default=str)}")  # default=str keeps non-JSON values (e.g. datetimes) loggable
def validate_command_parameters(self, command: EmailCommand) -> bool:
"""
Validate command parameters for security issues.
Args:
command: EmailCommand to validate
Returns:
True if parameters are valid, False otherwise
"""
# Check for potentially dangerous patterns
dangerous_patterns = [
r'\.\./', # Directory traversal
r'<script', # Script injection
r'javascript:', # JavaScript execution
r'file://', # File protocol access
]
# Validate all string parameters
for key, value in command.parameters.items():
if isinstance(value, str):
for pattern in dangerous_patterns:
if re.search(pattern, value, re.IGNORECASE):
print(f"Security violation: Dangerous pattern detected in parameter '{key}'")
return False
# Validate email addresses
email_fields = ['email_address', 'sender', 'receiver']
for field in email_fields:
if field in command.parameters:
email = command.parameters[field]
if isinstance(email, str) and not self._is_valid_email(email):
print(f"Invalid email address in parameter '{field}': {email}")
return False
return True
def _is_valid_email(self, email: str) -> bool:
"""
Validate email address format.
Args:
email: Email address to validate
Returns:
True if email format is valid, False otherwise
"""
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(email_pattern, email) is not None
class SecureEmailManager(EmailManager):
"""
Extended EmailManager with integrated security controls.
"""
def __init__(self, client: EmailClientInterface, security_manager: SecurityManager):
"""
Initialize secure email manager.
Args:
client: EmailClientInterface implementation
security_manager: SecurityManager instance
"""
super().__init__(client)
self.security_manager = security_manager
def execute_command(self, command: EmailCommand) -> EmailOperationResult:
"""
Execute email command with security controls.
Args:
command: EmailCommand to execute
Returns:
EmailOperationResult with security validation
"""
# Validate session
if not self.security_manager.is_session_valid():
return EmailOperationResult(
success=False,
message="Authentication required"
)
# Check permissions
if not self.security_manager.check_operation_permission(command):
return EmailOperationResult(
success=False,
message="Insufficient permissions for this operation"
)
# Validate command parameters
if not self.security_manager.validate_command_parameters(command):
return EmailOperationResult(
success=False,
message="Invalid or potentially dangerous parameters"
)
# For operations that modify data, get affected count for confirmation
affected_count = 0
if command.action in [ActionType.DELETE_MESSAGES, ActionType.MOVE_MESSAGES]:
affected_count = self._estimate_affected_count(command)
# Handle confirmation requirement
if not self.security_manager.require_confirmation(command, affected_count):
return EmailOperationResult(
success=False,
message="Operation cancelled by user"
)
# Execute the command
result = super().execute_command(command)
# Log the operation
self.security_manager.log_operation(command, result)
return result
def _estimate_affected_count(self, command: EmailCommand) -> int:
"""
Estimate number of items that will be affected by the command.
Args:
command: EmailCommand to estimate for
Returns:
Estimated number of affected items
"""
try:
if command.action == ActionType.DELETE_MESSAGES:
account = command.parameters.get('account', 'default')
folder = command.parameters.get('folder', 'Inbox')
subject = command.parameters.get('subject')
if subject:
messages = self.client.get_messages(account, folder)
return len([
msg for msg in messages
if subject.lower() in msg.subject.lower()
])
elif command.action == ActionType.MOVE_MESSAGES:
account = command.parameters.get('account', 'default')
folder = command.parameters.get('folder', 'Inbox')
regex_pattern = command.parameters.get('regex_pattern')
receiver = command.parameters.get('receiver')
messages = self.client.get_messages(account, folder)
if regex_pattern:
import re
pattern = re.compile(regex_pattern, re.IGNORECASE)
return len([
msg for msg in messages
if pattern.search(msg.body) or pattern.search(msg.subject)
])
elif receiver:
return len([
msg for msg in messages
if receiver.lower() in [r.lower() for r in msg.recipients]
])
except Exception as e:
print(f"Error estimating affected count: {e}")
return 0
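The order of checks in the security layer (permission first, then batch size, then confirmation) can be sketched in isolation. The example below is a simplified illustration under stated assumptions: `Policy` and `precheck` are hypothetical names, the enum is reduced to three members, and the function returns a decision string instead of prompting the user.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional


class OperationType(Enum):
    READ = "read"
    MODIFY = "modify"
    DELETE = "delete"


@dataclass
class Policy:
    operation_type: OperationType
    requires_confirmation: bool
    max_batch_size: Optional[int]


def precheck(policy: Policy, permissions: List[str], affected_count: int) -> str:
    """Return 'denied', 'too_large', 'confirm', or 'allow' for one operation."""
    # The permission name is derived from the operation type, as in
    # check_operation_permission, so the session must hold exactly that string.
    required = f"email:{policy.operation_type.value}"
    if required not in permissions:
        return "denied"
    # Batch limits are enforced before the user is asked anything
    if policy.max_batch_size is not None and affected_count > policy.max_batch_size:
        return "too_large"
    return "confirm" if policy.requires_confirmation else "allow"


if __name__ == "__main__":
    delete_policy = Policy(OperationType.DELETE, True, 50)
    move_policy = Policy(OperationType.MODIFY, True, 100)
    session_permissions = ["email:read", "email:delete"]

    print(precheck(delete_policy, session_permissions, 10))  # confirm
    print(precheck(delete_policy, session_permissions, 51))  # too_large
    print(precheck(move_policy, session_permissions, 5))     # denied
```

The last call shows why the permission strings granted to a session have to use the same vocabulary as the `OperationType` values: a session holding only `email:write` would never be allowed to run a `modify` operation.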
Main Application Integration
The main application component orchestrates all the system components and provides the primary user interface for the natural language email management system.
import sys
import os
import getpass
from typing import Optional
import argparse
class EmailAssistant:
"""
Main application class that integrates all components of the
natural language email management system.
"""
def __init__(self, llm_model_path: str, email_client_type: str = "apple_mail"):
"""
Initialize the email assistant application.
Args:
llm_model_path: Path to the local LLM model file
email_client_type: Type of email client to use
"""
self.llm_model_path = llm_model_path
self.email_client_type = email_client_type
# Initialize components
self.nlp_processor = None
self.email_client = None
self.security_manager = None
self.email_manager = None
self._initialize_components()
def _initialize_components(self) -> None:
"""
Initialize all system components.
"""
try:
# Initialize NLP processor
print("Initializing natural language processor...")
self.nlp_processor = NaturalLanguageProcessor(self.llm_model_path)
# Initialize email client
print(f"Initializing {self.email_client_type} client...")
if self.email_client_type.lower() == "apple_mail":
self.email_client = AppleMailClient()
else:
raise ValueError(f"Unsupported email client type: {self.email_client_type}")
# Initialize security manager
print("Initializing security manager...")
self.security_manager = SecurityManager()
# Initialize email manager
print("Initializing email manager...")
self.email_manager = SecureEmailManager(self.email_client, self.security_manager)
print("All components initialized successfully.")
except Exception as e:
print(f"Failed to initialize components: {e}")
sys.exit(1)
def start(self) -> None:
"""
Start the email assistant application.
"""
print("\n" + "="*60)
print("Natural Language Email Assistant")
print("="*60)
print("This assistant allows you to manage your email using natural language commands.")
print("Type 'help' for available commands or 'quit' to exit.\n")
# Authenticate user
if not self._authenticate_user():
print("Authentication failed. Exiting.")
return
# Initialize email client connection
if not self.email_manager.initialize():
print("Failed to connect to email client. Exiting.")
return
print("Email assistant ready. You can now enter commands.\n")
# Main command loop
self._command_loop()
def _authenticate_user(self) -> bool:
"""
Handle user authentication.
Returns:
True if authentication successful, False otherwise
"""
print("Authentication required to access email functions.")
max_attempts = 3
for attempt in range(max_attempts):
try:
username = input("Username: ").strip()
password = getpass.getpass("Password: ")
if self.security_manager.authenticate_user(username, password):
print("Authentication successful.")
return True
else:
remaining = max_attempts - attempt - 1
if remaining > 0:
print(f"Authentication failed. {remaining} attempts remaining.")
else:
print("Authentication failed. Maximum attempts exceeded.")
except KeyboardInterrupt:
print("\nAuthentication cancelled.")
return False
except Exception as e:
print(f"Authentication error: {e}")
return False
def _command_loop(self) -> None:
"""
Main command processing loop.
"""
while True:
try:
# Get user input
user_input = input("Email Assistant> ").strip()
if not user_input:
continue
# Handle special commands
if user_input.lower() in ['quit', 'exit', 'q']:
print("Goodbye!")
break
elif user_input.lower() == 'help':
self._show_help()
continue
elif user_input.lower() == 'status':
self._show_status()
continue
elif user_input.lower().startswith('accounts'):
self._show_accounts()
continue
# Process natural language command
self._process_command(user_input)
except KeyboardInterrupt:
print("\nUse 'quit' to exit.")
except Exception as e:
print(f"Error processing command: {e}")
def _process_command(self, user_input: str) -> None:
"""
Process a natural language command.
Args:
user_input: Raw user command string
"""
try:
print("Processing command...")
# Parse natural language input
command = self.nlp_processor.process_command(user_input)
print(f"Interpreted action: {command.action.value}")
print(f"Confidence: {command.confidence:.2f}")
if command.confidence < 0.5:
print("Low confidence in command interpretation. Please rephrase your request.")
return
# Execute command
result = self.email_manager.execute_command(command)
# Display result
self._display_result(result)
except Exception as e:
print(f"Failed to process command: {e}")
def _display_result(self, result: EmailOperationResult) -> None:
"""
Display the result of an email operation.
Args:
result: EmailOperationResult to display
"""
if result.success:
print(f"✓ {result.message}")
if result.affected_count > 0:
print(f" Affected items: {result.affected_count}")
# Display additional data if available
if result.data and isinstance(result.data, list):
if len(result.data) <= 10: # Show details for small result sets
for item in result.data:
if isinstance(item, EmailMessage):
print(f" - {item.subject} (from: {item.sender})")
else:
print(f" ({len(result.data)} items total - use 'list' command for details)")
else:
print(f"✗ {result.message}")
def _show_help(self) -> None:
"""
Display help information.
"""
help_text = """
Available Commands:
==================
Natural Language Commands:
- "Delete all mails with subject 'meeting' from folder 'Inbox'"
- "Move all mails containing 'urgent' to folder 'Priority'"
- "List all mails in account 'work@company.com' sent by 'John Smith'"
- "Create a new mail account for 'personal@gmail.com'"
- "Move mails with receiver 'team@company.com' to folder 'Team'"
- "Search for emails containing 'project update'"
Special Commands:
- help Show this help message
- status Show system status
- accounts List configured email accounts
- quit/exit/q Exit the application
Examples:
- "Show me all unread emails"
- "Delete spam emails from last week"
- "Move all newsletters to the Archive folder"
- "Find emails from my manager about the quarterly report"
Tips:
- Be specific about folders, subjects, and senders
- Use quotes around multi-word subjects or names
- The system will ask for confirmation before destructive operations
"""
print(help_text)
def _show_status(self) -> None:
"""
Display system status information.
"""
print("\nSystem Status:")
print("=" * 30)
# Session status
if self.security_manager.is_session_valid():
session = self.security_manager.current_session
print(f"User: {session.user_id}")
print(f"Session active since: {session.created_at.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Last activity: {session.last_activity.strftime('%Y-%m-%d %H:%M:%S')}")
else:
print("No active session")
# Email client status
print(f"Email client: {self.email_client_type}")
print(f"Connected: {'Yes' if self.email_manager.connected else 'No'}")
# Model status
print(f"LLM model: {os.path.basename(self.llm_model_path)}")
print()
def _show_accounts(self) -> None:
"""
Display configured email accounts.
"""
try:
accounts = self.email_client.get_accounts()
if not accounts:
print("No email accounts configured.")
return
print("\nConfigured Email Accounts:")
print("=" * 40)
for account in accounts:
print(f"Account: {account.display_name}")
print(f" Email: {account.email_address}")
print(f" Type: {account.account_type}")
print(f" Enabled: {'Yes' if account.is_enabled else 'No'}")
if account.folders:
print(f" Folders: {len(account.folders)}")
for folder in account.folders[:5]: # Show first 5 folders
print(f" - {folder.name} ({folder.message_count} messages)")
if len(account.folders) > 5:
print(f" ... and {len(account.folders) - 5} more")
print()
except Exception as e:
print(f"Failed to retrieve account information: {e}")
def main():
"""
Main entry point for the email assistant application.
"""
parser = argparse.ArgumentParser(
description="Natural Language Email Assistant",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python email_assistant.py --model ./models/llama-7b.gguf
python email_assistant.py --model ./models/llama-7b.gguf --client apple_mail
"""
)
parser.add_argument(
'--model', '-m',
required=True,
help='Path to the local LLM model file'
)
parser.add_argument(
'--client', '-c',
default='apple_mail',
choices=['apple_mail'],
help='Email client type (default: apple_mail)'
)
parser.add_argument(
'--debug',
action='store_true',
help='Enable debug output'
)
args = parser.parse_args()
# Validate model file
if not os.path.exists(args.model):
print(f"Error: Model file not found: {args.model}")
sys.exit(1)
# Set debug mode
if args.debug:
import logging
logging.basicConfig(level=logging.DEBUG)
try:
# Create and start the email assistant
assistant = EmailAssistant(
llm_model_path=args.model,
email_client_type=args.client
)
assistant.start()
except KeyboardInterrupt:
print("\nApplication interrupted by user.")
except Exception as e:
print(f"Application error: {e}")
if args.debug:
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()
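The routing performed by `_command_loop` and `_process_command` (built-in commands first, then the confidence gate before execution) can be exercised without a model or mail client. This is a minimal sketch: `route_input` and `fake_interpret` are hypothetical stand-ins, and the interpreter is reduced to a function returning an action name and a confidence value.

```python
from typing import Callable, Tuple

CONFIDENCE_THRESHOLD = 0.5  # same cut-off as in _process_command


def route_input(user_input: str,
                interpret: Callable[[str], Tuple[str, float]]) -> str:
    """Decide what the command loop should do with one line of input."""
    text = user_input.strip()
    if not text:
        return "ignore"
    lowered = text.lower()
    # Built-in commands never reach the language model
    if lowered in ("quit", "exit", "q"):
        return "quit"
    if lowered in ("help", "status"):
        return lowered
    if lowered.startswith("accounts"):
        return "accounts"
    # Everything else is interpreted, then gated on confidence
    action, confidence = interpret(text)
    if confidence < CONFIDENCE_THRESHOLD:
        return "rephrase"
    return f"execute:{action}"


def fake_interpret(text: str) -> Tuple[str, float]:
    """Stand-in for the LLM: confident only about delete requests."""
    if "delete" in text.lower():
        return ("delete_messages", 0.9)
    return ("search_messages", 0.3)


if __name__ == "__main__":
    for line in ["help", "Delete spam from last week", "hmm", "quit"]:
        print(f"{line!r} -> {route_input(line, fake_interpret)}")
```

Keeping the routing decision in a pure function like this makes the loop testable without blocking on `input()`.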
Complete Running Example
The following listing combines the components discussed in this article into a single script. It requires macOS with Apple Mail, the packages listed in its header, and a local model file, and it provides a natural language interface for Apple Mail.
#!/usr/bin/env python3
"""
Natural Language Email Assistant - Complete Implementation
=========================================================
An email management system that uses local LLM processing
to interpret natural language commands and execute email operations
through a vendor-agnostic abstraction layer.
Requirements:
- Python 3.8+
- llama-cpp-python
- PyObjC (for macOS/Apple Mail integration)
- python-dateutil
Installation:
pip install llama-cpp-python PyObjC python-dateutil
Usage:
python email_assistant.py --model path/to/your/model.gguf
"""
import json
import re
import sys
import os
import hashlib
import getpass
import subprocess
import argparse
from typing import Dict, List, Optional, Any, Tuple, Callable
from dataclasses import dataclass
from enum import Enum
from datetime import datetime, timedelta
from abc import ABC, abstractmethod
# Check for required imports
try:
from llama_cpp import Llama
except ImportError:
print("Error: llama-cpp-python is required. Install with: pip install llama-cpp-python")
sys.exit(1)
try:
import objc
from Foundation import NSAppleScript, NSString
from AppKit import NSWorkspace
except ImportError:
print("Error: PyObjC is required for Apple Mail integration. Install with: pip install PyObjC")
sys.exit(1)
try:
import dateutil.parser
except ImportError:
print("Error: python-dateutil is required. Install with: pip install python-dateutil")
sys.exit(1)
# Enums and Data Classes
class ActionType(Enum):
DELETE_MESSAGES = "delete_messages"
MOVE_MESSAGES = "move_messages"
LIST_MESSAGES = "list_messages"
CREATE_ACCOUNT = "create_account"
SEARCH_MESSAGES = "search_messages"
class SecurityLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class OperationType(Enum):
READ = "read"
MODIFY = "modify"
DELETE = "delete"
CREATE = "create"
MOVE = "move"
@dataclass
class EmailCommand:
action: ActionType
parameters: Dict[str, Any]
confidence: float
requires_confirmation: bool
@dataclass
class EmailMessage:
message_id: str
subject: str
sender: str
recipients: List[str]
cc_recipients: List[str]
bcc_recipients: List[str]
body: str
html_body: Optional[str]
date_sent: datetime
date_received: datetime
folder: str
account: str
is_read: bool
is_flagged: bool
attachments: List[str]
headers: Dict[str, str]
@dataclass
class EmailFolder:
name: str
path: str
message_count: int
unread_count: int
subfolders: List['EmailFolder']
account: str
@dataclass
class EmailAccount:
email_address: str
display_name: str
account_type: str
server_settings: Dict[str, Any]
is_enabled: bool
folders: List[EmailFolder]
@dataclass
class SecurityPolicy:
operation_type: OperationType
security_level: SecurityLevel
requires_confirmation: bool
requires_authentication: bool
max_batch_size: Optional[int]
confirmation_message: str
@dataclass
class UserSession:
user_id: str
session_token: str
created_at: datetime
last_activity: datetime
permissions: List[str]
is_authenticated: bool
# Core Classes
class NaturalLanguageProcessor:
"""Natural language processing component for email commands."""
def __init__(self, llm_model_path: str):
self.llm_model = self._load_local_llm(llm_model_path)
self.command_patterns = self._initialize_command_patterns()
def _load_local_llm(self, model_path: str):
try:
return Llama(
model_path=model_path,
n_ctx=2048,
n_threads=4,
verbose=False
)
except Exception as e:
raise RuntimeError(f"Failed to load LLM model: {e}") from e
def _initialize_command_patterns(self) -> Dict[str, str]:
return {
'delete_subject': r'delete.*mails?.*subject\s+["\']?([^"\']+)["\']?.*folder\s+["\']?([^"\']+)["\']?',
'move_regex': r'move.*mails?.*contain\s+["\']?([^"\']+)["\']?.*folder\s+["\']?([^"\']+)["\']?',
'list_sender': r'list.*mails?.*account\s+([^\s]+).*sent by\s+["\']?([^"\']+)["\']?',
'create_account': r'create.*mail account.*for\s+([^\s]+)',
'move_receiver': r'move.*mails?.*receiver\s+([^\s]+).*folder\s+["\']?([^"\']+)["\']?'
}
def process_command(self, user_input: str) -> EmailCommand:
normalized_input = self._preprocess_text(user_input)
prompt = self._create_interpretation_prompt(normalized_input)
llm_response = self._query_llm(prompt)
try:
command = self._parse_llm_response(llm_response)
if command.confidence > 0.7:
return command
except Exception as e:
print(f"LLM parsing failed: {e}")
return self._fallback_pattern_matching(normalized_input)
def _preprocess_text(self, text: str) -> str:
text = text.lower().strip()
text = re.sub(r'\s+', ' ', text)
text = text.replace("e-mail", "email")  # also turns "e-mails" into "emails"
return text
def _create_interpretation_prompt(self, user_input: str) -> str:
return f"""
You are an email command interpreter. Parse the following user command and return a JSON response.
Supported actions:
- delete_messages: Delete emails based on criteria
- move_messages: Move emails to a different folder
- list_messages: List emails matching criteria
- create_account: Create a new email account
- search_messages: Search for emails
User command: "{user_input}"
Return JSON format:
{{
"action": "action_type",
"parameters": {{
"subject": "email subject (if applicable)",
"folder": "folder name (if applicable)",
"sender": "sender email/name (if applicable)",
"receiver": "receiver email (if applicable)",
"account": "email account (if applicable)",
"regex_pattern": "regex pattern (if applicable)",
"destination_folder": "target folder for moves (if applicable)"
}},
"confidence": 0.95,
"requires_confirmation": true
}}
JSON Response:
"""
def _query_llm(self, prompt: str) -> str:
response = self.llm_model(
prompt,
max_tokens=512,
temperature=0.1,
stop=["User command:", "\n\n"]
)
return response['choices'][0]['text'].strip()
def _parse_llm_response(self, response: str) -> EmailCommand:
try:
json_start = response.find('{')
json_end = response.rfind('}') + 1
json_str = response[json_start:json_end]
parsed = json.loads(json_str)
action = ActionType(parsed['action'])
parameters = {k: v for k, v in parsed['parameters'].items() if v}
confidence = float(parsed.get('confidence', 0.5))
requires_confirmation = bool(parsed.get('requires_confirmation', True))
return EmailCommand(
action=action,
parameters=parameters,
confidence=confidence,
requires_confirmation=requires_confirmation
)
except (json.JSONDecodeError, KeyError, ValueError) as e:
raise ValueError(f"Failed to parse LLM response: {e}")
def _fallback_pattern_matching(self, text: str) -> EmailCommand:
for pattern_name, pattern in self.command_patterns.items():
match = re.search(pattern, text, re.IGNORECASE)
if match:
return self._create_command_from_pattern(pattern_name, match.groups())
return EmailCommand(
action=ActionType.SEARCH_MESSAGES,
parameters={"query": text},
confidence=0.3,
requires_confirmation=True
)
def _create_command_from_pattern(self, pattern_name: str, groups: Tuple[str, ...]) -> EmailCommand:
if pattern_name == 'delete_subject':
return EmailCommand(
action=ActionType.DELETE_MESSAGES,
parameters={"subject": groups[0], "folder": groups[1]},
confidence=0.8,
requires_confirmation=True
)
elif pattern_name == 'move_regex':
return EmailCommand(
action=ActionType.MOVE_MESSAGES,
parameters={"regex_pattern": groups[0], "destination_folder": groups[1]},
confidence=0.8,
requires_confirmation=True
)
elif pattern_name == 'list_sender':
return EmailCommand(
action=ActionType.LIST_MESSAGES,
parameters={"account": groups[0], "sender": groups[1]},
confidence=0.8,
requires_confirmation=False
)
elif pattern_name == 'create_account':
return EmailCommand(
action=ActionType.CREATE_ACCOUNT,
parameters={"email_address": groups[0]},
confidence=0.8,
requires_confirmation=True
)
elif pattern_name == 'move_receiver':
return EmailCommand(
action=ActionType.MOVE_MESSAGES,
parameters={"receiver": groups[0], "destination_folder": groups[1]},
confidence=0.8,
requires_confirmation=True
)
return EmailCommand(
action=ActionType.SEARCH_MESSAGES,
parameters={"query": " ".join(groups)},
confidence=0.5,
requires_confirmation=True
)
class EmailClientInterface(ABC):
"""Abstract interface for email client implementations."""
@abstractmethod
def connect(self) -> bool:
pass
@abstractmethod
def disconnect(self) -> None:
pass
@abstractmethod
def get_accounts(self) -> List[EmailAccount]:
pass
@abstractmethod
def get_folders(self, account: str) -> List[EmailFolder]:
pass
@abstractmethod
def get_messages(self, account: str, folder: str,
limit: Optional[int] = None,
filters: Optional[Dict[str, Any]] = None) -> List[EmailMessage]:
pass
@abstractmethod
def search_messages(self, account: str, query: str,
folder: Optional[str] = None) -> List[EmailMessage]:
pass
@abstractmethod
def delete_messages(self, message_ids: List[str], account: str) -> bool:
pass
@abstractmethod
def move_messages(self, message_ids: List[str],
source_folder: str, destination_folder: str,
account: str) -> bool:
pass
@abstractmethod
def create_folder(self, folder_name: str, parent_folder: str,
account: str) -> bool:
pass
@abstractmethod
def mark_as_read(self, message_ids: List[str], account: str) -> bool:
pass
@abstractmethod
def mark_as_unread(self, message_ids: List[str], account: str) -> bool:
pass
@abstractmethod
def create_account(self, account_config: EmailAccount) -> bool:
pass
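class AppleScriptResult:
    """Outcome of running one AppleScript snippet."""

    def __init__(self, success: bool, output: str, error: str):
        self.success = success
        self.output = output
        self.error = error


class AppleScriptRunner:
    """Runs AppleScript source through NSAppleScript (PyObjC)."""

    def __init__(self):
        # Kept for callers that read it; run_script does not enforce it yet.
        self.timeout = 30

    def run_script(self, script: str) -> AppleScriptResult:
        try:
            applescript = NSAppleScript.alloc().initWithSource_(script)
            result, error = applescript.executeAndReturnError_(None)
            if error:
                return AppleScriptResult(success=False, output="", error=str(error))
            # stringValue() returns None when the result cannot be coerced to
            # text; treat that as empty output rather than the string "None".
            text = result.stringValue() if result is not None else None
            return AppleScriptResult(
                success=True,
                output=str(text) if text is not None else "",
                error=""
            )
        except Exception as e:
            return AppleScriptResult(success=False, output="", error=str(e))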
class AppleMailClient(EmailClientInterface):
"""Apple Mail implementation of EmailClientInterface."""
def __init__(self):
self.app_name = "Mail"
self.is_connected = False
self.applescript_runner = AppleScriptRunner()
def connect(self) -> bool:
try:
workspace = NSWorkspace.sharedWorkspace()
running_apps = workspace.runningApplications()
mail_running = any(
app.bundleIdentifier() == "com.apple.mail"
for app in running_apps
)
if not mail_running:
workspace.launchApplication_(self.app_name)
import time
time.sleep(3)
test_script = 'tell application "Mail" to get name of every account'
result = self.applescript_runner.run_script(test_script)
self.is_connected = result.success
return self.is_connected
except Exception as e:
print(f"Failed to connect to Apple Mail: {e}")
return False
def disconnect(self) -> None:
self.is_connected = False
def get_accounts(self) -> List[EmailAccount]:
script = '''
tell application "Mail"
set accountList to {}
repeat with acc in every account
set accountInfo to {name of acc, user name of acc, email addresses of acc}
set end of accountList to accountInfo
end repeat
return accountList
end tell
'''
result = self.applescript_runner.run_script(script)
if not result.success:
return []
accounts = []
try:
account_data = self._parse_applescript_list(result.output)
for acc_info in account_data:
if len(acc_info) >= 3:
account = EmailAccount(
email_address=acc_info[2][0] if acc_info[2] else acc_info[1],
display_name=acc_info[0],
account_type="Apple Mail",
server_settings={},
is_enabled=True,
folders=self._get_account_folders(acc_info[0])
)
accounts.append(account)
except Exception as e:
print(f"Error parsing account data: {e}")
return accounts
def get_folders(self, account: str) -> List[EmailFolder]:
return self._get_account_folders(account)
def _get_account_folders(self, account_name: str) -> List[EmailFolder]:
script = f'''
tell application "Mail"
set folderList to {{}}
try
set acc to account "{account_name}"
repeat with mbox in every mailbox of acc
set folderInfo to {{name of mbox, (count of messages of mbox), (count of (messages of mbox whose read status is false))}}
set end of folderList to folderInfo
end repeat
end try
return folderList
end tell
'''
result = self.applescript_runner.run_script(script)
if not result.success:
return []
folders = []
try:
folder_data = self._parse_applescript_list(result.output)
for folder_info in folder_data:
if len(folder_info) >= 3:
folder = EmailFolder(
name=folder_info[0],
path=folder_info[0],
message_count=int(folder_info[1]),
unread_count=int(folder_info[2]),
subfolders=[],
account=account_name
)
folders.append(folder)
except Exception as e:
print(f"Error parsing folder data: {e}")
return folders
def get_messages(self, account: str, folder: str,
limit: Optional[int] = None,
filters: Optional[Dict[str, Any]] = None) -> List[EmailMessage]:
limit_clause = f"items 1 thru {limit} of " if limit else ""
script = f'''
tell application "Mail"
set messageList to {{}}
try
set acc to account "{account}"
set mbox to mailbox "{folder}" of acc
set msgs to {limit_clause}(every message of mbox)
repeat with msg in msgs
set msgInfo to {{id of msg, subject of msg, sender of msg, ¬
content of msg, date received of msg, ¬
read status of msg, flagged status of msg}}
set end of messageList to msgInfo
end repeat
end try
return messageList
end tell
'''
result = self.applescript_runner.run_script(script)
if not result.success:
return []
messages = []
try:
message_data = self._parse_applescript_list(result.output)
for msg_info in message_data:
if len(msg_info) >= 7:
recipients = self._get_message_recipients(msg_info[0])
message = EmailMessage(
message_id=str(msg_info[0]),
subject=msg_info[1] or "",
sender=msg_info[2] or "",
recipients=recipients.get('to', []),
cc_recipients=recipients.get('cc', []),
bcc_recipients=recipients.get('bcc', []),
body=msg_info[3] or "",
html_body=None,
date_sent=self._parse_applescript_date(msg_info[4]),
date_received=self._parse_applescript_date(msg_info[4]),
folder=folder,
account=account,
is_read=bool(msg_info[5]),
is_flagged=bool(msg_info[6]),
attachments=[],
headers={}
)
messages.append(message)
except Exception as e:
print(f"Error parsing message data: {e}")
return messages
    def _get_message_recipients(self, message_id: str) -> Dict[str, List[str]]:
        """Return the to/cc addresses of one message (bcc is not queried)."""
        script = f'''
        tell application "Mail"
            try
                set msg to message id {message_id}
                set toRecipients to {{}}
                set ccRecipients to {{}}
                repeat with recip in to recipients of msg
                    set end of toRecipients to address of recip
                end repeat
                repeat with recip in cc recipients of msg
                    set end of ccRecipients to address of recip
                end repeat
                return {{toRecipients, ccRecipients}}
            end try
        end tell
        '''
        result = self.applescript_runner.run_script(script)
        if not result.success:
            return {'to': [], 'cc': [], 'bcc': []}
        try:
            recipient_data = self._parse_applescript_list(result.output)
            return {
                'to': recipient_data[0] if len(recipient_data) > 0 else [],
                'cc': recipient_data[1] if len(recipient_data) > 1 else [],
                'bcc': []
            }
        except Exception:
            # A bare "except:" would also swallow KeyboardInterrupt and SystemExit.
            return {'to': [], 'cc': [], 'bcc': []}
def search_messages(self, account: str, query: str,
folder: Optional[str] = None) -> List[EmailMessage]:
folder_clause = f'in mailbox "{folder}" of account "{account}"' if folder else f'in account "{account}"'
script = f'''
tell application "Mail"
set searchResults to {{}}
try
set msgs to (every message {folder_clause} whose subject contains "{query}" or content contains "{query}")
repeat with msg in msgs
set msgInfo to {{id of msg, subject of msg, sender of msg, ¬
content of msg, date received of msg, ¬
read status of msg, flagged status of msg, ¬
name of mailbox of msg}}
set end of searchResults to msgInfo
end repeat
end try
return searchResults
end tell
'''
result = self.applescript_runner.run_script(script)
if not result.success:
return []
messages = []
try:
message_data = self._parse_applescript_list(result.output)
for msg_info in message_data:
if len(msg_info) >= 8:
recipients = self._get_message_recipients(msg_info[0])
message = EmailMessage(
message_id=str(msg_info[0]),
subject=msg_info[1] or "",
sender=msg_info[2] or "",
recipients=recipients.get('to', []),
cc_recipients=recipients.get('cc', []),
bcc_recipients=recipients.get('bcc', []),
body=msg_info[3] or "",
html_body=None,
date_sent=self._parse_applescript_date(msg_info[4]),
date_received=self._parse_applescript_date(msg_info[4]),
folder=msg_info[7] if len(msg_info) > 7 else folder or "Unknown",
account=account,
is_read=bool(msg_info[5]),
is_flagged=bool(msg_info[6]),
attachments=[],
headers={}
)
messages.append(message)
except Exception as e:
print(f"Error parsing search results: {e}")
return messages
def delete_messages(self, message_ids: List[str], account: str) -> bool:
if not message_ids:
return True
id_list = ", ".join(message_ids)
script = f'''
tell application "Mail"
try
set msgIds to {{{id_list}}}
repeat with msgId in msgIds
set msg to message id msgId
delete msg
end repeat
return true
on error
return false
end try
end tell
'''
result = self.applescript_runner.run_script(script)
return result.success and "true" in result.output.lower()
def move_messages(self, message_ids: List[str],
source_folder: str, destination_folder: str,
account: str) -> bool:
if not message_ids:
return True
id_list = ", ".join(message_ids)
script = f'''
tell application "Mail"
try
set acc to account "{account}"
set destBox to mailbox "{destination_folder}" of acc
set msgIds to {{{id_list}}}
repeat with msgId in msgIds
set msg to message id msgId
set mailbox of msg to destBox
end repeat
return true
on error errMsg
return false
end try
end tell
'''
result = self.applescript_runner.run_script(script)
return result.success and "true" in result.output.lower()
def create_folder(self, folder_name: str, parent_folder: str,
account: str) -> bool:
script = f'''
tell application "Mail"
try
set acc to account "{account}"
if "{parent_folder}" is not "" then
set parentBox to mailbox "{parent_folder}" of acc
make new mailbox with properties {{name:"{folder_name}"}} at parentBox
else
make new mailbox with properties {{name:"{folder_name}"}} at acc
end if
return true
on error
return false
end try
end tell
'''
result = self.applescript_runner.run_script(script)
return result.success and "true" in result.output.lower()
def mark_as_read(self, message_ids: List[str], account: str) -> bool:
return self._set_read_status(message_ids, True)
def mark_as_unread(self, message_ids: List[str], account: str) -> bool:
return self._set_read_status(message_ids, False)
def _set_read_status(self, message_ids: List[str], read_status: bool) -> bool:
if not message_ids:
return True
id_list = ", ".join(message_ids)
status_value = "true" if read_status else "false"
script = f'''
tell application "Mail"
try
set msgIds to {{{id_list}}}
repeat with msgId in msgIds
set msg to message id msgId
set read status of msg to {status_value}
end repeat
return true
on error
return false
end try
end tell
'''
result = self.applescript_runner.run_script(script)
return result.success and "true" in result.output.lower()
def create_account(self, account_config: EmailAccount) -> bool:
print(f"Account creation for {account_config.email_address} requires manual setup in Apple Mail")
return False
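    def _parse_applescript_list(self, output: str) -> List[Any]:
        """Parse AppleScript list text such as {"a", {1, 2}} into nested lists.

        Nested braces become nested Python lists, because callers index into
        each record (for example acc_info[2][0]). Scalars are returned as
        strings with surrounding double quotes removed.
        """
        try:
            cleaned = output.strip()
            if cleaned.startswith('{') and cleaned.endswith('}'):
                cleaned = cleaned[1:-1]

            def convert(raw: str) -> Any:
                token = raw.strip()
                if token.startswith('{') and token.endswith('}'):
                    # Recurse so a nested record is a list, not a raw string.
                    return self._parse_applescript_list(token)
                return token.strip('"')

            items: List[Any] = []
            current_item = ""
            brace_count = 0
            in_quotes = False
            for char in cleaned:
                if char == '"' and (not current_item or current_item[-1] != '\\'):
                    in_quotes = not in_quotes
                elif char == '{' and not in_quotes:
                    brace_count += 1
                elif char == '}' and not in_quotes:
                    brace_count -= 1
                elif char == ',' and brace_count == 0 and not in_quotes:
                    # Top-level comma: the current item is complete.
                    items.append(convert(current_item))
                    current_item = ""
                    continue
                current_item += char
            if current_item.strip():
                items.append(convert(current_item))
            return items
        except Exception as e:
            print(f"Error parsing AppleScript output: {e}")
            return []

    def _parse_applescript_date(self, date_str: str) -> datetime:
        try:
            return dateutil.parser.parse(date_str)
        except (ValueError, OverflowError, TypeError):
            # Unparseable date: fall back to the current time, as before.
            return datetime.now()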
class EmailOperationResult:
def __init__(self, success: bool, message: str,
affected_count: int = 0, data: Any = None):
self.success = success
self.message = message
self.affected_count = affected_count
self.data = data
self.timestamp = datetime.now()
class SecurityManager:
def __init__(self):
self.policies = self._initialize_security_policies()
self.current_session: Optional[UserSession] = None
self.session_timeout = timedelta(hours=2)
def _initialize_security_policies(self) -> Dict[ActionType, SecurityPolicy]:
return {
ActionType.LIST_MESSAGES: SecurityPolicy(
operation_type=OperationType.READ,
security_level=SecurityLevel.LOW,
requires_confirmation=False,
requires_authentication=True,
max_batch_size=None,
confirmation_message=""
),
ActionType.SEARCH_MESSAGES: SecurityPolicy(
operation_type=OperationType.READ,
security_level=SecurityLevel.LOW,
requires_confirmation=False,
requires_authentication=True,
max_batch_size=None,
confirmation_message=""
),
ActionType.MOVE_MESSAGES: SecurityPolicy(
operation_type=OperationType.MODIFY,
security_level=SecurityLevel.MEDIUM,
requires_confirmation=True,
requires_authentication=True,
max_batch_size=100,
confirmation_message="This will move {count} messages to {destination}. Continue?"
),
ActionType.DELETE_MESSAGES: SecurityPolicy(
operation_type=OperationType.DELETE,
security_level=SecurityLevel.HIGH,
requires_confirmation=True,
requires_authentication=True,
max_batch_size=50,
confirmation_message="This will permanently delete {count} messages. This action cannot be undone. Continue?"
),
ActionType.CREATE_ACCOUNT: SecurityPolicy(
operation_type=OperationType.CREATE,
security_level=SecurityLevel.CRITICAL,
requires_confirmation=True,
requires_authentication=True,
max_batch_size=1,
confirmation_message="This will create a new email account for {email}. Continue?"
)
}
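    def authenticate_user(self, username: str, password: str) -> bool:
        # NOTE: placeholder only. The password is never checked, so any
        # credentials are accepted. Verify it against a real credential
        # store before relying on this for access control.
        try:
            session_data = f"{username}:{datetime.now().isoformat()}"
            session_token = hashlib.sha256(session_data.encode()).hexdigest()
            self.current_session = UserSession(
                user_id=username,
                session_token=session_token,
                created_at=datetime.now(),
                last_activity=datetime.now(),
                # One permission per OperationType member, so the strings match
                # what check_operation_permission builds from
                # policy.operation_type.value (assumes OperationType is an Enum).
                permissions=[f"email:{op.value}" for op in OperationType],
                is_authenticated=True
            )
            return True
        except Exception as e:
            print(f"Authentication failed: {e}")
            return False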
def is_session_valid(self) -> bool:
if not self.current_session or not self.current_session.is_authenticated:
return False
time_since_activity = datetime.now() - self.current_session.last_activity
if time_since_activity > self.session_timeout:
self.current_session = None
return False
self.current_session.last_activity = datetime.now()
return True
def check_operation_permission(self, command: EmailCommand) -> bool:
if not self.is_session_valid():
return False
policy = self.policies.get(command.action)
if not policy:
return False
required_permission = f"email:{policy.operation_type.value}"
if required_permission not in self.current_session.permissions:
return False
return True
def require_confirmation(self, command: EmailCommand,
affected_count: int = 0,
additional_context: Dict[str, str] = None) -> bool:
policy = self.policies.get(command.action)
if not policy or not policy.requires_confirmation:
return True
if policy.max_batch_size and affected_count > policy.max_batch_size:
print(f"Operation exceeds maximum batch size of {policy.max_batch_size}")
return False
context = additional_context or {}
context.update({
'count': str(affected_count),
'action': command.action.value
})
if command.action == ActionType.MOVE_MESSAGES:
context['destination'] = command.parameters.get('destination_folder', 'Unknown')
elif command.action == ActionType.CREATE_ACCOUNT:
context['email'] = command.parameters.get('email_address', 'Unknown')
confirmation_message = policy.confirmation_message.format(**context)
return self._get_user_confirmation(confirmation_message, policy.security_level)
def _get_user_confirmation(self, message: str, security_level: SecurityLevel) -> bool:
print(f"\n[{security_level.value.upper()}] CONFIRMATION REQUIRED")
print(f"{message}")
if security_level == SecurityLevel.CRITICAL:
print("This is a critical operation. Please type 'CONFIRM' to proceed:")
response = input("> ").strip()
return response == "CONFIRM"
else:
print("Type 'y' or 'yes' to confirm, any other input to cancel:")
response = input("> ").strip().lower()
return response in ['y', 'yes']
def log_operation(self, command: EmailCommand, result: EmailOperationResult) -> None:
log_entry = {
'timestamp': datetime.now().isoformat(),
'user_id': self.current_session.user_id if self.current_session else 'unknown',
'action': command.action.value,
'parameters': command.parameters,
'success': result.success,
'affected_count': result.affected_count,
'message': result.message
}
print(f"AUDIT LOG: {json.dumps(log_entry, indent=2)}")
    def validate_command_parameters(self, command: EmailCommand) -> bool:
        dangerous_patterns = [
            r'\.\./',
            r'<script',
            r'javascript:',
            r'file://',
        ]
        for key, value in command.parameters.items():
            if isinstance(value, str):
                for pattern in dangerous_patterns:
                    if re.search(pattern, value, re.IGNORECASE):
                        print(f"Security violation: Dangerous pattern detected in parameter '{key}'")
                        return False
        # email_address must be a full address. sender and receiver may be a
        # display name (the help text uses 'John Smith'), so they are checked
        # only when they look like an address.
        for field in ('email_address', 'sender', 'receiver'):
            value = command.parameters.get(field)
            if not isinstance(value, str):
                continue
            must_be_address = field == 'email_address' or '@' in value
            if must_be_address and not self._is_valid_email(value):
                print(f"Invalid email address in parameter '{field}': {value}")
                return False
        return True

    def _is_valid_email(self, email: str) -> bool:
        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return re.match(email_pattern, email) is not None
class EmailManager:
def __init__(self, client: EmailClientInterface):
self.client = client
self.connected = False
def initialize(self) -> bool:
try:
self.connected = self.client.connect()
return self.connected
except Exception as e:
print(f"Failed to initialize email client: {e}")
return False
def execute_command(self, command: EmailCommand) -> EmailOperationResult:
if not self.connected:
return EmailOperationResult(
success=False,
message="Email client not connected"
)
try:
if command.action == ActionType.DELETE_MESSAGES:
return self._execute_delete_command(command.parameters)
elif command.action == ActionType.MOVE_MESSAGES:
return self._execute_move_command(command.parameters)
elif command.action == ActionType.LIST_MESSAGES:
return self._execute_list_command(command.parameters)
elif command.action == ActionType.CREATE_ACCOUNT:
return self._execute_create_account_command(command.parameters)
elif command.action == ActionType.SEARCH_MESSAGES:
return self._execute_search_command(command.parameters)
else:
return EmailOperationResult(
success=False,
message=f"Unsupported action: {command.action}"
)
except Exception as e:
return EmailOperationResult(
success=False,
message=f"Command execution failed: {str(e)}"
)
def _execute_delete_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:
account = parameters.get('account', 'default')
folder = parameters.get('folder', 'Inbox')
subject = parameters.get('subject')
if not subject:
return EmailOperationResult(
success=False,
message="Subject parameter required for delete operation"
)
messages = self.client.get_messages(account, folder)
matching_messages = [
msg for msg in messages
if subject.lower() in msg.subject.lower()
]
if not matching_messages:
return EmailOperationResult(
success=True,
message="No messages found matching criteria",
affected_count=0
)
message_ids = [msg.message_id for msg in matching_messages]
success = self.client.delete_messages(message_ids, account)
return EmailOperationResult(
success=success,
message=f"Deleted {len(matching_messages)} messages" if success
else "Failed to delete messages",
affected_count=len(matching_messages) if success else 0
)
    def _execute_move_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:
        account = parameters.get('account', 'default')
        source_folder = parameters.get('folder', 'Inbox')
        destination_folder = parameters.get('destination_folder')
        regex_pattern = parameters.get('regex_pattern')
        receiver = parameters.get('receiver')
        if not destination_folder:
            return EmailOperationResult(
                success=False,
                message="Destination folder required for move operation"
            )
        if not regex_pattern and not receiver:
            # Without a criterion nothing can match, so report it explicitly.
            return EmailOperationResult(
                success=False,
                message="A regex pattern or receiver is required for move operation"
            )
        pattern = None
        if regex_pattern:
            try:
                pattern = re.compile(regex_pattern, re.IGNORECASE)
            except re.error as e:
                # The pattern comes from user input and may not be valid regex.
                return EmailOperationResult(
                    success=False,
                    message=f"Invalid regex pattern: {e}"
                )
        messages = self.client.get_messages(account, source_folder)
        if pattern is not None:
            matching_messages = [
                msg for msg in messages
                if pattern.search(msg.body) or pattern.search(msg.subject)
            ]
        else:
            matching_messages = [
                msg for msg in messages
                if receiver.lower() in [r.lower() for r in msg.recipients]
            ]
        if not matching_messages:
            return EmailOperationResult(
                success=True,
                message="No messages found matching criteria",
                affected_count=0
            )
        message_ids = [msg.message_id for msg in matching_messages]
        success = self.client.move_messages(
            message_ids, source_folder, destination_folder, account
        )
        return EmailOperationResult(
            success=success,
            message=f"Moved {len(matching_messages)} messages" if success
            else "Failed to move messages",
            affected_count=len(matching_messages) if success else 0
        )
def _execute_list_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:
account = parameters.get('account', 'default')
folder = parameters.get('folder', 'Inbox')
sender = parameters.get('sender')
messages = self.client.get_messages(account, folder)
if sender:
matching_messages = [
msg for msg in messages
if sender.lower() in msg.sender.lower()
]
else:
matching_messages = messages
return EmailOperationResult(
success=True,
message=f"Found {len(matching_messages)} messages",
affected_count=len(matching_messages),
data=matching_messages
)
def _execute_search_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:
account = parameters.get('account', 'default')
query = parameters.get('query', '')
folder = parameters.get('folder')
matching_messages = self.client.search_messages(account, query, folder)
return EmailOperationResult(
success=True,
message=f"Search found {len(matching_messages)} messages",
affected_count=len(matching_messages),
data=matching_messages
)
def _execute_create_account_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:
email_address = parameters.get('email_address')
if not email_address:
return EmailOperationResult(
success=False,
message="Email address required for account creation"
)
account_config = EmailAccount(
email_address=email_address,
display_name=email_address.split('@')[0],
account_type='IMAP',
server_settings={},
is_enabled=True,
folders=[]
)
success = self.client.create_account(account_config)
return EmailOperationResult(
success=success,
message=f"Created account for {email_address}" if success
else "Failed to create account",
affected_count=1 if success else 0
)
class SecureEmailManager(EmailManager):
def __init__(self, client: EmailClientInterface, security_manager: SecurityManager):
super().__init__(client)
self.security_manager = security_manager
def execute_command(self, command: EmailCommand) -> EmailOperationResult:
if not self.security_manager.is_session_valid():
return EmailOperationResult(
success=False,
message="Authentication required"
)
if not self.security_manager.check_operation_permission(command):
return EmailOperationResult(
success=False,
message="Insufficient permissions for this operation"
)
if not self.security_manager.validate_command_parameters(command):
return EmailOperationResult(
success=False,
message="Invalid or potentially dangerous parameters"
)
affected_count = 0
if command.action in [ActionType.DELETE_MESSAGES, ActionType.MOVE_MESSAGES]:
affected_count = self._estimate_affected_count(command)
if not self.security_manager.require_confirmation(command, affected_count):
return EmailOperationResult(
success=False,
message="Operation cancelled by user"
)
result = super().execute_command(command)
self.security_manager.log_operation(command, result)
return result
def _estimate_affected_count(self, command: EmailCommand) -> int:
try:
if command.action == ActionType.DELETE_MESSAGES:
account = command.parameters.get('account', 'default')
folder = command.parameters.get('folder', 'Inbox')
subject = command.parameters.get('subject')
if subject:
messages = self.client.get_messages(account, folder)
return len([
msg for msg in messages
if subject.lower() in msg.subject.lower()
])
elif command.action == ActionType.MOVE_MESSAGES:
account = command.parameters.get('account', 'default')
folder = command.parameters.get('folder', 'Inbox')
regex_pattern = command.parameters.get('regex_pattern')
receiver = command.parameters.get('receiver')
messages = self.client.get_messages(account, folder)
if regex_pattern:
import re
pattern = re.compile(regex_pattern, re.IGNORECASE)
return len([
msg for msg in messages
if pattern.search(msg.body) or pattern.search(msg.subject)
])
elif receiver:
return len([
msg for msg in messages
if receiver.lower() in [r.lower() for r in msg.recipients]
])
except Exception as e:
print(f"Error estimating affected count: {e}")
return 0
class EmailAssistant:
def __init__(self, llm_model_path: str, email_client_type: str = "apple_mail"):
self.llm_model_path = llm_model_path
self.email_client_type = email_client_type
self.nlp_processor = None
self.email_client = None
self.security_manager = None
self.email_manager = None
self._initialize_components()
def _initialize_components(self) -> None:
try:
print("Initializing natural language processor...")
self.nlp_processor = NaturalLanguageProcessor(self.llm_model_path)
print(f"Initializing {self.email_client_type} client...")
if self.email_client_type.lower() == "apple_mail":
self.email_client = AppleMailClient()
else:
raise ValueError(f"Unsupported email client type: {self.email_client_type}")
print("Initializing security manager...")
self.security_manager = SecurityManager()
print("Initializing email manager...")
self.email_manager = SecureEmailManager(self.email_client, self.security_manager)
print("All components initialized successfully.")
except Exception as e:
print(f"Failed to initialize components: {e}")
sys.exit(1)
def start(self) -> None:
print("\n" + "="*60)
print("Natural Language Email Assistant")
print("="*60)
print("This assistant allows you to manage your email using natural language commands.")
print("Type 'help' for available commands or 'quit' to exit.\n")
if not self._authenticate_user():
print("Authentication failed. Exiting.")
return
if not self.email_manager.initialize():
print("Failed to connect to email client. Exiting.")
return
print("Email assistant ready. You can now enter commands.\n")
self._command_loop()
def _authenticate_user(self) -> bool:
print("Authentication required to access email functions.")
max_attempts = 3
for attempt in range(max_attempts):
try:
username = input("Username: ").strip()
password = getpass.getpass("Password: ")
if self.security_manager.authenticate_user(username, password):
print("Authentication successful.")
return True
else:
remaining = max_attempts - attempt - 1
if remaining > 0:
print(f"Authentication failed. {remaining} attempts remaining.")
else:
print("Authentication failed. Maximum attempts exceeded.")
except KeyboardInterrupt:
print("\nAuthentication cancelled.")
return False
except Exception as e:
print(f"Authentication error: {e}")
return False
def _command_loop(self) -> None:
while True:
try:
user_input = input("Email Assistant> ").strip()
if not user_input:
continue
if user_input.lower() in ['quit', 'exit', 'q']:
print("Goodbye!")
break
elif user_input.lower() == 'help':
self._show_help()
continue
elif user_input.lower() == 'status':
self._show_status()
continue
elif user_input.lower().startswith('accounts'):
self._show_accounts()
continue
self._process_command(user_input)
except KeyboardInterrupt:
print("\nUse 'quit' to exit.")
except Exception as e:
print(f"Error processing command: {e}")
def _process_command(self, user_input: str) -> None:
try:
print("Processing command...")
command = self.nlp_processor.process_command(user_input)
print(f"Interpreted action: {command.action.value}")
print(f"Confidence: {command.confidence:.2f}")
if command.confidence < 0.5:
print("Low confidence in command interpretation. Please rephrase your request.")
return
result = self.email_manager.execute_command(command)
self._display_result(result)
except Exception as e:
print(f"Failed to process command: {e}")
def _display_result(self, result: EmailOperationResult) -> None:
if result.success:
print(f"✓ {result.message}")
if result.affected_count > 0:
print(f" Affected items: {result.affected_count}")
if result.data and isinstance(result.data, list):
if len(result.data) <= 10:
for item in result.data:
if isinstance(item, EmailMessage):
print(f" - {item.subject} (from: {item.sender})")
else:
print(f" ({len(result.data)} items total - use 'list' command for details)")
else:
print(f"✗ {result.message}")
def _show_help(self) -> None:
help_text = """
Available Commands:
==================
Natural Language Commands:
- "Delete all mails with subject 'meeting' from folder 'Inbox'"
- "Move all mails containing 'urgent' to folder 'Priority'"
- "List all mails in account 'work@company.com' sent by 'John Smith'"
- "Create a new mail account for 'personal@gmail.com'"
- "Move mails with receiver 'team@company.com' to folder 'Team'"
- "Search for emails containing 'project update'"
Special Commands:
- help Show this help message
- status Show system status
- accounts List configured email accounts
- quit/exit/q Exit the application
Examples:
- "Show me all unread emails"
- "Delete spam emails from last week"
- "Move all newsletters to the Archive folder"
- "Find emails from my manager about the quarterly report"
Tips:
- Be specific about folders, subjects, and senders
- Use quotes around multi-word subjects or names
- The system will ask for confirmation before destructive operations
"""
print(help_text)
def _show_status(self) -> None:
print("\nSystem Status:")
print("=" * 30)
if self.security_manager.is_session_valid():
session = self.security_manager.current_session
print(f"User: {session.user_id}")
print(f"Session active since: {session.created_at.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Last activity: {session.last_activity.strftime('%Y-%m-%d %H:%M:%S')}")
else:
print("No active session")
print(f"Email client: {self.email_client_type}")
print(f"Connected: {'Yes' if self.email_manager.connected else 'No'}")
print(f"LLM model: {os.path.basename(self.llm_model_path)}")
print()
def _show_accounts(self) -> None:
try:
accounts = self.email_client.get_accounts()
if not accounts:
print("No email accounts configured.")
return
print("\nConfigured Email Accounts:")
print("=" * 40)
for account in accounts:
print(f"Account: {account.display_name}")
print(f" Email: {account.email_address}")
print(f" Type: {account.account_type}")
print(f" Enabled: {'Yes' if account.is_enabled else 'No'}")
if account.folders:
print(f" Folders: {len(account.folders)}")
for folder in account.folders[:5]:
print(f" - {folder.name} ({folder.message_count} messages)")
if len(account.folders) > 5:
print(f" ... and {len(account.folders) - 5} more")
print()
except Exception as e:
print(f"Failed to retrieve account information: {e}")
def main():
parser = argparse.ArgumentParser(
description="Natural Language Email Assistant",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python email_assistant.py --model ./models/llama-7b.gguf
python email_assistant.py --model ./models/llama-7b.gguf --client apple_mail
"""
)
parser.add_argument(
'--model', '-m',
required=True,
help='Path to the local LLM model file'
)
parser.add_argument(
'--client', '-c',
default='apple_mail',
choices=['apple_mail'],
help='Email client type (default: apple_mail)'
)
parser.add_argument(
'--debug',
action='store_true',
help='Enable debug output'
)
args = parser.parse_args()
if not os.path.exists(args.model):
print(f"Error: Model file not found: {args.model}")
sys.exit(1)
if args.debug:
import logging
logging.basicConfig(level=logging.DEBUG)
try:
assistant = EmailAssistant(
llm_model_path=args.model,
email_client_type=args.client
)
assistant.start()
except KeyboardInterrupt:
print("\nApplication interrupted by user.")
except Exception as e:
print(f"Application error: {e}")
if args.debug:
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()
Conclusion and Future Considerations
This implementation shows how natural language processing can be integrated with email management through a modular, layered design. It addresses three challenges: interpreting free-form commands, guarding destructive operations, and exposing a single interface that different email clients can implement. Only the Apple Mail adapter is implemented in the listing above.
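The layered architecture keeps the system maintainable and extensible: a new email client only has to implement EmailClientInterface and be selected in EmailAssistant, while the command handling in EmailManager stays unchanged. The security layer combines session checks, parameter validation, batch-size limits, and confirmation prompts whose strictness scales with the risk of each operation. Note that authenticate_user in the listing accepts any password, so real credential verification must be added before these checks amount to access control.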
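One gap worth closing in that security layer: the Apple Mail adapter builds its scripts by interpolating account names, folder names, and queries directly into AppleScript source, so a stray double quote in a parameter can end the string literal early. The following is a minimal sketch of an escaping helper, assuming AppleScript's backslash escapes for `\` and `"` in string literals. The function names are hypothetical and not part of the listing.

```python
def escape_applescript_string(value: str) -> str:
    """Escape backslashes and double quotes for an AppleScript string literal."""
    # Backslashes first, otherwise the backslashes added for quotes get doubled.
    return value.replace("\\", "\\\\").replace('"', '\\"')


def quote_applescript(value: str) -> str:
    """Return value as a complete double-quoted AppleScript string literal."""
    return f'"{escape_applescript_string(value)}"'


def applescript_integer(value: str) -> str:
    """Validate a message id before it is pasted into a script unquoted."""
    text = str(value).strip()
    if not text.isdigit():
        raise ValueError(f"Not a numeric message id: {value!r}")
    return text


# Hypothetical usage when building a script in the adapter:
folder = 'Inbox" of account "other'
script = f"set mbox to mailbox {quote_applescript(folder)}"
```

With this in place the adapter would write `mailbox {quote_applescript(folder)}` instead of `mailbox "{folder}"`. Message ids are inserted without quotes in the listing, which is why the sketch validates them as digits rather than escaping them.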
The use of local LLM processing ensures privacy and data sovereignty, addressing concerns about sensitive email content being transmitted to external services. The fallback pattern matching system provides reliability when LLM processing encounters unexpected inputs or fails to generate confident interpretations.
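The fallback behaviour described above can be sketched as follows. This is an illustration under stated assumptions, not the article's NaturalLanguageProcessor: the two regular expressions, the `interpret` function, and the tuple-based result type are hypothetical, and only the confidence values (0.8 for a pattern match, 0.5 as the threshold) mirror the listing.

```python
import re
from typing import Callable, Dict, Optional, Tuple

# (action, parameters, confidence); a simplified stand-in for EmailCommand.
Interpretation = Tuple[str, Dict[str, str], float]

# Hypothetical patterns for two of the commands shown in the help text.
CREATE_ACCOUNT = re.compile(
    r"create (?:a )?(?:new )?(?:mail|email) account for '([^']+)'", re.IGNORECASE
)
MOVE_RECEIVER = re.compile(
    r"move (?:all )?mails? with receiver '([^']+)' to folder '([^']+)'", re.IGNORECASE
)


def fallback_parse(text: str) -> Optional[Interpretation]:
    """Try the fixed patterns; return None when none of them matches."""
    match = CREATE_ACCOUNT.search(text)
    if match:
        return ("create_account", {"email_address": match.group(1)}, 0.8)
    match = MOVE_RECEIVER.search(text)
    if match:
        params = {"receiver": match.group(1), "destination_folder": match.group(2)}
        return ("move_messages", params, 0.8)
    return None


def interpret(
    text: str,
    llm_parse: Callable[[str], Optional[Interpretation]],
    min_confidence: float = 0.5,
) -> Optional[Interpretation]:
    """Prefer the LLM result; use pattern matching when it fails or is unsure."""
    try:
        result = llm_parse(text)
    except Exception:
        result = None  # Model errors are treated like "no interpretation".
    if result is not None and result[2] >= min_confidence:
        return result
    # Keep the low-confidence LLM result only if no pattern matches either.
    return fallback_parse(text) or result
```

The caller still has to handle a `None` or low-confidence return, which corresponds to the "please rephrase" branch in `_process_command`.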
Future enhancements could include support for additional email clients such as Microsoft Outlook, Thunderbird, or web-based services through API integration. Advanced natural language understanding could be achieved through fine-tuning the local LLM on email-specific datasets, improving accuracy for domain-specific terminology and command patterns.
The system architecture supports the integration of additional features such as email composition assistance, intelligent categorization, and automated response generation. The modular design facilitates these extensions without requiring fundamental changes to the existing codebase.
Performance optimization opportunities exist in caching frequently accessed email data, implementing incremental synchronization for large mailboxes, and optimizing the LLM inference pipeline for faster response times. The security framework could be enhanced with role-based access control, audit trail encryption, and integration with enterprise authentication systems.
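As an illustration of the caching idea, here is a small time-to-live cache that could sit in front of `get_messages`, which the listing calls twice per destructive command (once to estimate the affected count, once to execute). It is a sketch under assumptions: the `TTLCache` class and its method names are hypothetical, and a real integration would also need to invalidate entries after delete and move operations.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple


class TTLCache:
    """Cache values per key and reload them once they are older than the TTL."""

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # Injectable so tests can control time.
        self._entries: Dict[Hashable, Tuple[float, Any]] = {}

    def get_or_load(self, key: Hashable, loader: Callable[[], Any]) -> Any:
        """Return the cached value for key, calling loader() if missing or stale."""
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]
        value = loader()
        self._entries[key] = (now, value)
        return value

    def invalidate(self, key: Hashable) -> None:
        """Drop one entry, for example after messages in that folder changed."""
        self._entries.pop(key, None)
```

A caller might key the cache by `(account, folder)` and pass `lambda: client.get_messages(account, folder)` as the loader, then call `invalidate` for the source folder after a successful move or delete.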
This implementation serves as a foundation for building sophisticated email management tools that bridge the gap between human natural language expression and programmatic email operations, demonstrating the practical application of modern AI technologies in productivity software.