Monday, December 15, 2025

CAPABILITY-CENTRIC ARCHITECTURE: A STEP-BY-STEP TUTORIAL FOR SOFTWARE ENGINEERS


Note: I used Antrophic Claude 4.5 Sonnet to generate the code examples in this document. Before, I feeded it with an article about Capability-Centric Architecture


INTRODUCTION: WHY YOU NEED A NEW ARCHITECTURAL APPROACH

Welcome to the world of Capability-Centric Architecture, a revolutionary architectural pattern that solves a problem you might not even know you have yet. If you have ever struggled with building systems that need to be both flexible and performant, that must evolve rapidly while maintaining stability, or that span the spectrum from embedded devices to cloud platforms, then this tutorial is for you.

Traditional architectural patterns force us into uncomfortable choices. Layered architectures work well for simple applications but become tangled messes as systems grow. Domain-Driven Design provides excellent domain modeling but struggles with hardware integration and real-time constraints. Clean Architecture offers beautiful separation of concerns but treats hardware as just another replaceable component, which simply does not work when your system must read sensor data every hundred microseconds.

Capability-Centric Architecture, or CCA for short, emerged from analyzing why these existing patterns fail when systems must evolve, integrate new technologies like artificial intelligence and containerization, or span from microcontrollers to cloud platforms. Instead of treating these as separate problems requiring separate solutions, CCA provides a unified conceptual framework with built-in mechanisms for managing complexity, dependencies, and change.

In this tutorial, we will build your understanding from the ground up. We will start with the core concepts, move through detailed implementation steps, and finish with real-world examples that demonstrate how CCA works in practice. By the end, you will understand not just what CCA is, but how to apply it to your own systems.

STEP ONE: UNDERSTANDING THE CAPABILITY NUCLEUS

The foundation of Capability-Centric Architecture is the Capability Nucleus. Think of a nucleus as a structured way to organize a cohesive piece of functionality that delivers value. Every capability in your system follows this same structural pattern, whether it controls a motor in an industrial robot or processes millions of payment transactions in a cloud platform.

The Capability Nucleus consists of three concentric layers, each with a distinct purpose and different rules about what it can depend on. Understanding these layers is crucial because they provide the separation of concerns that makes CCA so powerful.

The innermost layer is called the Essence. This layer contains pure domain logic or algorithmic core that defines what the capability does. For a temperature control capability, the Essence contains the control algorithm itself. For a payment processing capability, the Essence contains the business rules for validating and executing payments. The critical characteristic of the Essence is that it has no dependencies on anything outside itself except for capability contracts, which we will discuss shortly.

Let us look at a simple example to make this concrete. Imagine we are building a temperature control system. The Essence would look something like this:

public class TemperatureControlEssence {
    private final ControlParameters parameters;
    
    public TemperatureControlEssence(ControlParameters parameters) {
        this.parameters = parameters;
    }
    
    public double calculateControl(double currentTemp, double targetTemp) {
        double error = targetTemp - currentTemp;
        double proportional = parameters.getKp() * error;
        double integral = parameters.getKi() * accumulatedError;
        double derivative = parameters.getKd() * (error - previousError);
        
        double output = proportional + integral + derivative;
        
        previousError = error;
        accumulatedError += error;
        
        return clamp(output, 0.0, 1.0);
    }
    
    private double clamp(double value, double min, double max) {
        if (value < min) return min;
        if (value > max) return max;
        return value;
    }
    
    private double previousError = 0.0;
    private double accumulatedError = 0.0;
}

Notice how this code contains pure logic with no external dependencies. There are no database calls, no hardware register reads, no network communication. This is intentional and powerful. Because the Essence has no infrastructure dependencies, you can test it in milliseconds without any setup. You can run thousands of test cases to verify the control algorithm works correctly under all conditions. You can even prove mathematical properties about the algorithm if needed.

The middle layer is called the Realization. This layer contains the necessary mechanisms to make the Essence work in the real world. For embedded systems, this includes hardware access, interrupt handlers, and direct memory access controllers. For enterprise systems, this includes database access, message queue integration, and API implementations. The Realization depends on both the Essence and the external technical infrastructure.

Continuing our temperature control example, the Realization might look like this:

public class TemperatureControlRealization {
    private final TemperatureControlEssence essence;
    private static final int TEMP_SENSOR_REGISTER = 0x40001000;
    private static final int HEATER_CONTROL_REGISTER = 0x40002000;
    
    public TemperatureControlRealization(TemperatureControlEssence essence) {
        this.essence = essence;
    }
    
    public void initialize() {
        configureSensor();
        configureHeater();
    }
    
    public void controlLoop() {
        double currentTemp = readTemperature();
        double targetTemp = getTargetTemperature();
        
        double controlOutput = essence.calculateControl(currentTemp, targetTemp);
        
        setHeaterPower(controlOutput);
    }
    
    private double readTemperature() {
        int rawValue = readRegisterDirect(TEMP_SENSOR_REGISTER);
        return convertToTemperature(rawValue);
    }
    
    private void setHeaterPower(double power) {
        int rawValue = convertToPWM(power);
        writeRegisterDirect(HEATER_CONTROL_REGISTER, rawValue);
    }
    
    private native int readRegisterDirect(int address);
    private native void writeRegisterDirect(int address, int value);
    
    private void configureSensor() {
        // Hardware-specific sensor configuration
    }
    
    private void configureHeater() {
        // Hardware-specific heater configuration
    }
    
    private double convertToTemperature(int rawValue) {
        return rawValue * 0.01;
    }
    
    private int convertToPWM(double power) {
        return (int)(power * 255);
    }
    
    private double targetTemperature = 25.0;
    
    private double getTargetTemperature() {
        return targetTemperature;
    }
}

The Realization bridges the gap between pure logic and physical reality. It handles all the messy details of hardware interaction while keeping the Essence clean and testable. Notice how the Realization uses the Essence by calling its calculateControl method, but adds all the infrastructure code needed to read sensors and control heaters.

The outermost layer is called the Adaptation. This layer provides the interfaces through which other capabilities interact with this capability and through which this capability interacts with external systems. Unlike traditional adapters which are often one-directional, Adaptations in CCA are bidirectional and can have different scopes depending on the needs of the capability.

For our temperature control example, the Adaptation provides a clean interface for other capabilities:

public class TemperatureControlAdaptation {
    private final TemperatureControlRealization realization;
    
    public TemperatureControlAdaptation(TemperatureControlRealization realization) {
        this.realization = realization;
    }
    
    public void start() {
        realization.initialize();
    }
    
    public void stop() {
        // Cleanup and shutdown
    }
    
    public TemperatureStatus getStatus() {
        return new TemperatureStatus(
            realization.getCurrentTemperature(),
            realization.getTargetTemperature(),
            realization.isControlActive()
        );
    }
    
    public void setTargetTemperature(double temperature) {
        realization.setTargetTemperature(temperature);
    }
}

The Adaptation provides a high-level interface that other capabilities can use without knowing anything about hardware registers or control algorithms. This separation allows each layer to evolve independently.

The key insight is that whether you are building an embedded temperature controller or an enterprise payment processor, the same structural pattern applies. The Essence contains pure domain logic. The Realization integrates with infrastructure, whether that is hardware registers or databases and message queues. The Adaptation provides interfaces for external interaction.

STEP TWO: DEFINING CAPABILITY CONTRACTS

Capabilities do not interact with each other through direct dependencies. Instead, they interact through Contracts. This is a fundamental principle of CCA that enables independent evolution and prevents the tangled dependency graphs that plague traditional architectures.

A Capability Contract consists of three parts. The Provision declares what the capability offers to others. The Requirement declares what the capability needs from others. The Protocol defines the supported interaction patterns and quality attributes.

Let us define a contract for our temperature monitoring capability:

public interface TemperatureMonitoringContract {
    
    interface Provision {
        void subscribeToTemperature(TemperatureSubscriber subscriber, int updateRateHz);
        
        double getCurrentTemperature();
        
        TemperatureHistory getHistory(Timestamp startTime, Timestamp endTime);
    }
    
    interface Requirement {
        CalibrationParameters getCalibration(String sensorId);
        
        void onCalibrationChange(CalibrationChangeListener listener);
    }
    
    interface Protocol {
        int MAX_LATENCY_MS = 10;
        int MIN_UPDATE_RATE_HZ = 1;
        int MAX_UPDATE_RATE_HZ = 1000;
        
        enum InteractionPattern {
            SYNCHRONOUS_QUERY,
            ASYNCHRONOUS_SUBSCRIBE,
            BATCH_QUERY
        }
    }
}

This contract explicitly states what the temperature monitoring capability provides to other capabilities. It can deliver temperature updates through subscriptions, provide current readings on demand, and return historical data. The contract also states what this capability needs from others, specifically calibration data from a calibration management capability. Finally, the protocol section defines quality attributes like maximum latency and supported update rates, plus the interaction patterns that consumers can use.

Contracts enable capabilities to evolve independently. As long as a capability continues to fulfill its contract, its internal implementation can change without affecting other capabilities. This is similar to interface-based programming, but contracts are richer. They include quality attributes, interaction patterns, and both provisions and requirements.

The power of contracts becomes clear when you consider evolution. Suppose we want to improve our temperature monitoring capability by adding a new machine learning-based anomaly detection feature. We can add this to the Provision interface as a new method:

interface Provision {
    void subscribeToTemperature(TemperatureSubscriber subscriber, int updateRateHz);
    
    double getCurrentTemperature();
    
    TemperatureHistory getHistory(Timestamp startTime, Timestamp endTime);
    
    AnomalyReport detectAnomalies(Timestamp startTime, Timestamp endTime);
}

Existing consumers of the contract continue to work without any changes because we only added a new method. This is backward compatible evolution. Consumers that want to use the new anomaly detection feature can start calling the new method when they are ready.

STEP THREE: IMPLEMENTING EFFICIENCY GRADIENTS

One of the most innovative aspects of CCA is the concept of Efficiency Gradients. This addresses a fundamental challenge in building systems that span from embedded to enterprise domains. Some operations must execute with minimal overhead, direct hardware access, and predictable timing. Other operations can tolerate more abstraction in exchange for flexibility and maintainability.

An Efficiency Gradient allows different parts of the system to operate at different levels of abstraction and optimization. Critical paths can use direct hardware access with minimal indirection. Less critical paths can use higher abstractions and more flexible implementations.

Consider a data acquisition system that reads multiple sensors. The sensor reading itself must be fast and deterministic. The data processing can be more flexible. The data storage can be even more abstract. Let us see how this works in code:

public class DataAcquisitionCapability {
    
    private static final int SENSOR_BASE_ADDRESS = 0x40000000;
    private static final int SENSOR_COUNT = 8;
    
    public void sensorInterruptHandler() {
        for (int i = 0; i < SENSOR_COUNT; i++) {
            int rawValue = readRegisterDirect(SENSOR_BASE_ADDRESS + (i * 4));
            sensorBuffer[i] = rawValue;
        }
        
        dataReady = true;
    }
    
    public void processData() {
        if (!dataReady) return;
        
        SensorReading[] readings = new SensorReading[SENSOR_COUNT];
        for (int i = 0; i < SENSOR_COUNT; i++) {
            readings[i] = new SensorReading(
                i,
                convertToPhysicalValue(sensorBuffer[i]),
                System.currentTimeMillis()
            );
        }
        
        SensorReading[] filtered = applyFiltering(readings);
        
        storageQueue.add(filtered);
        
        dataReady = false;
    }
    
    public void storeData() {
        List<SensorReading[]> batch = new ArrayList<>();
        storageQueue.drainTo(batch);
        
        if (batch.isEmpty()) return;
        
        storage.beginTransaction();
        try {
            for (SensorReading[] readings : batch) {
                for (SensorReading reading : readings) {
                    storage.insert(reading);
                }
            }
            storage.commit();
            
            analytics.processNewData(batch);
            
        } catch (Exception e) {
            storage.rollback();
        }
    }
    
    private int[] sensorBuffer = new int[SENSOR_COUNT];
    private volatile boolean dataReady = false;
    private Queue<SensorReading[]> storageQueue = new ConcurrentLinkedQueue<>();
    
    private native int readRegisterDirect(int address);
    
    private double convertToPhysicalValue(int rawValue) {
        return rawValue * 0.001;
    }
    
    private SensorReading[] applyFiltering(SensorReading[] readings) {
        // Apply filtering algorithms
        return readings;
    }
}

Notice the three different efficiency gradients in this code. The sensorInterruptHandler runs at the highest efficiency gradient. It executes in interrupt context with minimal overhead, using direct register reads and a simple array for buffering. No object allocation, no abstractions, just raw performance.

The processData method runs at a medium efficiency gradient. It uses object-oriented design with SensorReading objects, applies filtering algorithms, and uses a concurrent queue for buffering. This is more flexible and maintainable than the interrupt handler, but still reasonably efficient.

The storeData method runs at the lowest efficiency gradient, meaning highest abstraction. It uses database transactions, batch processing, and triggers analytics. This is the most flexible layer where we can easily change storage mechanisms or add new analytics without affecting the critical real-time paths.

This gradient approach is crucial for embedded systems where you must balance real-time performance requirements with software engineering best practices. It is equally valuable for enterprise systems where you want to optimize high-traffic request paths while using more flexible implementations for administrative operations.

STEP FOUR: MANAGING EVOLUTION WITH EVOLUTION ENVELOPES

Systems evolve. Requirements change. Technologies advance. New features get added. Old features get deprecated. Traditional architectures handle this through informal processes and hope that changes do not break things. CCA makes evolution explicit and manageable through Evolution Envelopes.

An Evolution Envelope defines how a capability can change over time while maintaining compatibility with other capabilities. It specifies what can change, what must remain stable, and how changes are communicated. Every capability has an Evolution Envelope that includes version information, deprecation policies, and migration paths.

Here is what an Evolution Envelope looks like:

public class EvolutionEnvelope {
    private final String capabilityName;
    private final Version currentVersion;
    private final List<Version> supportedVersions;
    private final DeprecationPolicy deprecationPolicy;
    private final MigrationGuide migrationGuide;
    
    public boolean isCompatible(Version requiredVersion) {
        if (requiredVersion.getMajor() != currentVersion.getMajor()) {
            return supportedVersions.contains(requiredVersion);
        }
        
        return currentVersion.getMinor() >= requiredVersion.getMinor();
    }
    
    public List<MigrationStep> getMigrationPath(Version fromVersion) {
        return migrationGuide.getPath(fromVersion, currentVersion);
    }
    
    public DeprecationInfo getDeprecationInfo(String featureName) {
        return deprecationPolicy.getDeprecationInfo(featureName);
    }
}

The Evolution Envelope uses semantic versioning with major, minor, and patch version numbers. Patch versions are always compatible and contain only bug fixes. Minor versions add new features while maintaining backward compatibility. Major versions can break compatibility but should be rare and well-planned.

When you need to make a breaking change to a contract, you introduce it as a new major version and maintain the old version for a transition period. The Evolution Envelope documents this transition, provides migration paths, and specifies deprecation policies.

For example, suppose we want to change our temperature monitoring contract to use a more efficient data structure. We would create version two of the contract with the new structure, mark the old methods as deprecated in version one point five, provide a migration tool to help consumers update their code, and document a six-month transition period before removing version one support entirely.

This formal approach to evolution prevents the chaos that often accompanies system changes. Consumers know exactly what is changing, when it will change, and how to adapt. Providers can evolve their implementations without breaking existing consumers.

STEP FIVE: BUILDING A COMPLETE CAPABILITY

Now that we understand the core concepts, let us build a complete capability from scratch. We will create a notification capability that can send alerts and messages through multiple channels like email, SMS, and push notifications. This example will demonstrate all the key aspects of CCA implementation.

First, we define the contract:

public interface NotificationContract {
    
    NotificationResult sendNotification(Notification notification);
    
    List<NotificationResult> sendBatch(List<Notification> notifications);
    
    DeliveryStatus getStatus(String notificationId);
    
    void cancelNotification(String notificationId);
}

This contract is simple and focused. It provides methods to send individual notifications, send batches for efficiency, check delivery status, and cancel pending notifications. Notice that the contract does not specify how notifications are sent, what infrastructure is used, or how the capability is implemented. It only defines what the capability provides.

Next, we implement the Essence layer with pure business logic:

public class NotificationEssence {
    
    public ValidationResult validateNotification(Notification notification) {
        ValidationResult result = new ValidationResult();
        
        if (notification.getRecipient() == null || notification.getRecipient().isEmpty()) {
            result.addError("Recipient is required");
        }
        
        if (notification.getMessage() == null || notification.getMessage().isEmpty()) {
            result.addError("Message is required");
        }
        
        if (notification.getChannel() == NotificationChannel.SMS) {
            if (notification.getMessage().length() > 160) {
                result.addError("SMS messages cannot exceed 160 characters");
            }
        }
        
        if (notification.getChannel() == NotificationChannel.EMAIL) {
            if (!isValidEmail(notification.getRecipient())) {
                result.addError("Invalid email address format");
            }
        }
        
        return result;
    }
    
    public Priority calculatePriority(Notification notification) {
        if (notification.getType() == NotificationType.ALERT) {
            return Priority.HIGH;
        }
        
        if (notification.getType() == NotificationType.REMINDER) {
            return Priority.MEDIUM;
        }
        
        return Priority.LOW;
    }
    
    private boolean isValidEmail(String email) {
        return email.contains("@") && email.contains(".");
    }
}

The Essence contains pure validation logic and priority calculation. There are no infrastructure dependencies, no database calls, no external service invocations. This makes it trivially easy to test. We can write hundreds of test cases that execute in milliseconds to verify all the business rules work correctly.

Now we implement the Realization layer that integrates with infrastructure:

public class NotificationRealization {
    private final NotificationEssence essence;
    private final EmailService emailService;
    private final SMSService smsService;
    private final PushNotificationService pushService;
    private final NotificationQueue queue;
    private final NotificationRepository repository;
    
    public NotificationRealization(
        NotificationEssence essence,
        EmailService emailService,
        SMSService smsService,
        PushNotificationService pushService,
        NotificationQueue queue,
        NotificationRepository repository
    ) {
        this.essence = essence;
        this.emailService = emailService;
        this.smsService = smsService;
        this.pushService = pushService;
        this.queue = queue;
        this.repository = repository;
    }
    
    public NotificationResult send(Notification notification) {
        ValidationResult validation = essence.validateNotification(notification);
        if (!validation.isValid()) {
            return NotificationResult.validationFailure(validation.getErrors());
        }
        
        Priority priority = essence.calculatePriority(notification);
        
        String notificationId = generateId();
        notification.setId(notificationId);
        
        repository.save(notification);
        
        if (priority == Priority.HIGH) {
            return sendImmediately(notification);
        } else {
            queue.enqueue(notification);
            return NotificationResult.queued(notificationId);
        }
    }
    
    private NotificationResult sendImmediately(Notification notification) {
        try {
            switch (notification.getChannel()) {
                case EMAIL:
                    emailService.send(
                        notification.getRecipient(),
                        notification.getSubject(),
                        notification.getMessage()
                    );
                    break;
                case SMS:
                    smsService.send(
                        notification.getRecipient(),
                        notification.getMessage()
                    );
                    break;
                case PUSH:
                    pushService.send(
                        notification.getRecipient(),
                        notification.getMessage()
                    );
                    break;
            }
            
            repository.updateStatus(notification.getId(), DeliveryStatus.SENT);
            return NotificationResult.success(notification.getId());
            
        } catch (Exception e) {
            repository.updateStatus(notification.getId(), DeliveryStatus.FAILED);
            return NotificationResult.failure(notification.getId(), e.getMessage());
        }
    }
    
    public List<NotificationResult> sendBatch(List<Notification> notifications) {
        Map<NotificationChannel, List<Notification>> grouped = groupByChannel(notifications);
        
        List<NotificationResult> results = new ArrayList<>();
        
        for (Map.Entry<NotificationChannel, List<Notification>> entry : grouped.entrySet()) {
            results.addAll(sendBatchThroughChannel(entry.getKey(), entry.getValue()));
        }
        
        return results;
    }
    
    private Map<NotificationChannel, List<Notification>> groupByChannel(
        List<Notification> notifications
    ) {
        Map<NotificationChannel, List<Notification>> grouped = new HashMap<>();
        
        for (Notification notification : notifications) {
            grouped.computeIfAbsent(
                notification.getChannel(),
                k -> new ArrayList<>()
            ).add(notification);
        }
        
        return grouped;
    }
    
    private List<NotificationResult> sendBatchThroughChannel(
        NotificationChannel channel,
        List<Notification> notifications
    ) {
        // Channel-specific batch sending logic
        return new ArrayList<>();
    }
    
    private String generateId() {
        return "NOTIF-" + System.currentTimeMillis();
    }
}

The Realization brings together the Essence and all the infrastructure services. It uses the Essence for validation and priority calculation, then handles all the messy details of actually sending notifications through various channels, managing queues, and updating the repository.

Next, we implement the Adaptation layer:

public class NotificationAdaptation {
    private final NotificationRealization realization;
    
    public NotificationAdaptation(NotificationRealization realization) {
        this.realization = realization;
    }
    
    public HttpResponse handleNotificationRequest(HttpRequest request) {
        try {
            Notification notification = parseNotification(request);
            NotificationResult result = realization.send(notification);
            
            if (result.isSuccessful()) {
                return HttpResponse.ok(serializeResult(result));
            } else {
                return HttpResponse.badRequest(result.getErrorMessage());
            }
        } catch (Exception e) {
            return HttpResponse.internalServerError(e.getMessage());
        }
    }
    
    public void handleNotificationMessage(Message message) {
        Notification notification = deserializeNotification(message.getBody());
        realization.send(notification);
    }
    
    private Notification parseNotification(HttpRequest request) {
        // Parse JSON or XML from HTTP request body
        return new Notification();
    }
    
    private Notification deserializeNotification(byte[] body) {
        // Deserialize from message format
        return new Notification();
    }
    
    private String serializeResult(NotificationResult result) {
        // Serialize to JSON or XML
        return "{}";
    }
}

The Adaptation provides multiple interfaces for accessing the capability. It can handle HTTP requests for synchronous notification sending, or it can consume messages from a queue for asynchronous processing. This flexibility allows the same capability to be used in different contexts without changing its core implementation.

Finally, we bring it all together in the complete capability:

public class NotificationCapability implements CapabilityInstance {
    private final NotificationEssence essence;
    private final NotificationRealization realization;
    private final NotificationAdaptation adaptation;
    private final EvolutionEnvelope evolutionEnvelope;
    
    public NotificationCapability(
        EmailService emailService,
        SMSService smsService,
        PushNotificationService pushService,
        NotificationQueue queue,
        NotificationRepository repository
    ) {
        this.essence = new NotificationEssence();
        this.realization = new NotificationRealization(
            essence,
            emailService,
            smsService,
            pushService,
            queue,
            repository
        );
        this.adaptation = new NotificationAdaptation(realization);
        this.evolutionEnvelope = createEvolutionEnvelope();
    }
    
    public void initialize() {
        // Initialize infrastructure services
    }
    
    public void start() {
        // Start message queue consumer
        // Start retry processor
    }
    
    public void stop() {
        // Stop consumers
        // Flush queues
    }
    
    public Object getContractImplementation(Class<?> contractType) {
        if (contractType == NotificationContract.class) {
            return new NotificationContractImpl(realization);
        }
        return null;
    }
    
    public void injectDependency(Class<?> contractType, Object implementation) {
        // This capability has no dependencies
    }
    
    public EvolutionEnvelope getEvolutionEnvelope() {
        return evolutionEnvelope;
    }
    
    private EvolutionEnvelope createEvolutionEnvelope() {
        return new EvolutionEnvelope(
            "NotificationCapability",
            new Version(1, 0, 0),
            Arrays.asList(new Version(1, 0, 0)),
            new DeprecationPolicy(),
            new MigrationGuide()
        );
    }
}

This complete example demonstrates all the key aspects of implementing a capability. The contract defines what the capability provides. The Essence contains pure business logic. The Realization integrates infrastructure. The Adaptation provides external interfaces. The capability ties everything together and manages the lifecycle.

STEP SIX: INJECTING DEPENDENCIES BETWEEN CAPABILITIES

Capabilities rarely work in isolation. They need services from other capabilities. In CCA, these dependencies are managed through a mechanism called dependency injection based on contracts. This is fundamentally different from traditional dependency injection because it operates at the capability level and uses contracts rather than concrete implementations.

The process works through a Capability Registry that manages all capabilities and their interactions. When a capability needs something from another capability, it declares this need in its contract requirements. The registry then binds the consumer to a provider that fulfills that contract.

Let us see how this works with a concrete example. Suppose our notification capability needs to look up user preferences to determine the best channel for each user. We would first define the contract requirement:

public interface NotificationContract {
    interface Requirement {
        UserPreferences getUserPreferences(String userId);
    }
}

Then in our capability implementation, we add a field to hold the injected dependency:

public class NotificationCapability implements CapabilityInstance {
    private final NotificationEssence essence;
    private final NotificationRealization realization;
    private final NotificationAdaptation adaptation;
    
    private UserPreferencesContract userPreferences;
    
    public void injectDependency(Class<?> contractType, Object implementation) {
        if (contractType == UserPreferencesContract.class) {
            this.userPreferences = (UserPreferencesContract) implementation;
        }
    }
}

The Capability Registry handles the actual injection. It maintains a dependency graph of all capabilities and their requirements. When initializing the system, it performs a topological sort to determine the correct initialization order, ensuring that capabilities are initialized before any capabilities that depend on them.

Here is how the registry manages this:

public class CapabilityRegistry {
    private final Map<String, CapabilityDescriptor> capabilities;
    private final Map<String, List<ContractBinding>> bindings;
    private final DependencyResolver resolver;
    
    public void bindCapabilities(
        String consumer,
        String provider,
        Class<?> contractType
    ) {
        CapabilityDescriptor consumerDesc = capabilities.get(consumer);
        CapabilityDescriptor providerDesc = capabilities.get(provider);
        
        if (!providerDesc.provides(contractType)) {
            throw new IllegalArgumentException(
                provider + " does not provide " + contractType.getName()
            );
        }
        
        if (!consumerDesc.requires(contractType)) {
            throw new IllegalArgumentException(
                consumer + " does not require " + contractType.getName()
            );
        }
        
        if (resolver.wouldCreateCycle(consumer, provider)) {
            throw new CircularDependencyException(
                "Binding would create circular dependency"
            );
        }
        
        ContractBinding binding = new ContractBinding(
            consumerDesc,
            providerDesc,
            contractType
        );
        
        bindings.computeIfAbsent(consumer, k -> new ArrayList<>()).add(binding);
        
        resolver.addDependency(consumer, provider);
    }
    
    public List<String> getInitializationOrder() {
        return resolver.topologicalSort();
    }
}

The registry prevents circular dependencies by checking the dependency graph before creating bindings. If a binding would create a cycle, the registry rejects it and forces you to restructure your capabilities. This is one of the key mechanisms for avoiding architectural antipatterns.

The Lifecycle Manager uses the registry to initialize capabilities in the correct order:

public class CapabilityLifecycleManager {
    private final CapabilityRegistry registry;
    private final Map<String, CapabilityInstance> instances;
    
    public void initializeAll() {
        List<String> initOrder = registry.getInitializationOrder();
        
        for (String capabilityName : initOrder) {
            initializeCapability(capabilityName);
        }
    }
    
    public void initializeCapability(String capabilityName) {
        CapabilityDescriptor descriptor = registry.getCapability(capabilityName);
        
        CapabilityInstance instance = createInstance(descriptor);
        
        injectDependencies(instance, capabilityName);
        
        instance.initialize();
        
        instances.put(capabilityName, instance);
        
        if (descriptor.isAutoStart()) {
            instance.start();
        }
    }
    
    private void injectDependencies(CapabilityInstance instance, String capabilityName) {
        List<ContractBinding> capabilityBindings = registry.getBindings(capabilityName);
        
        for (ContractBinding binding : capabilityBindings) {
            CapabilityInstance provider = instances.get(binding.getProviderName());
            
            if (provider == null) {
                throw new DependencyNotAvailableException(
                    capabilityName + " requires " + binding.getProviderName() + 
                    " but it is not initialized"
                );
            }
            
            Object contractImpl = provider.getContractImplementation(binding.getContractType());
            
            instance.injectDependency(binding.getContractType(), contractImpl);
        }
    }
    
    private CapabilityInstance createInstance(CapabilityDescriptor descriptor) {
        try {
            Class<?> capabilityClass = Class.forName(descriptor.getImplementationClass());
            return (CapabilityInstance) capabilityClass.getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            throw new CapabilityInstantiationException(
                "Failed to create instance of " + descriptor.getName(),
                e
            );
        }
    }
}

This dependency injection mechanism ensures that capabilities are properly initialized with all their dependencies before they start executing. It eliminates a common source of initialization bugs where components try to use dependencies that are not yet available.

STEP SEVEN: TESTING YOUR CAPABILITIES

Testing is where the layered structure of CCA really shines. The separation of Essence, Realization, and Adaptation allows different testing strategies for different parts of the capability.

The Essence can be tested with pure unit tests that require no infrastructure. Because the Essence has no external dependencies, tests are fast, deterministic, and easy to write. Let us look at tests for our notification Essence:

public class NotificationEssenceTest {
    private NotificationEssence essence;
    
    public void setUp() {
        essence = new NotificationEssence();
    }
    
    public void testValidateNotification_ValidNotification_ReturnsValid() {
        Notification notification = new Notification();
        notification.setRecipient("user@example.com");
        notification.setMessage("Test message");
        notification.setChannel(NotificationChannel.EMAIL);
        
        ValidationResult result = essence.validateNotification(notification);
        
        assertTrue(result.isValid());
        assertEquals(0, result.getErrors().size());
    }
    
    public void testValidateNotification_MissingRecipient_ReturnsInvalid() {
        Notification notification = new Notification();
        notification.setMessage("Test message");
        notification.setChannel(NotificationChannel.EMAIL);
        
        ValidationResult result = essence.validateNotification(notification);
        
        assertFalse(result.isValid());
        assertTrue(result.getErrors().contains("Recipient is required"));
    }
    
    public void testValidateNotification_SMSTooLong_ReturnsInvalid() {
        Notification notification = new Notification();
        notification.setRecipient("+1234567890");
        notification.setMessage("This message is way too long for SMS and exceeds the one hundred sixty character limit that is enforced by the validation logic in the essence layer");
        notification.setChannel(NotificationChannel.SMS);
        
        ValidationResult result = essence.validateNotification(notification);
        
        assertFalse(result.isValid());
        assertTrue(result.getErrors().contains("SMS messages cannot exceed 160 characters"));
    }
    
    public void testCalculatePriority_AlertType_ReturnsHigh() {
        Notification notification = new Notification();
        notification.setType(NotificationType.ALERT);
        
        Priority priority = essence.calculatePriority(notification);
        
        assertEquals(Priority.HIGH, priority);
    }
}

These tests run in milliseconds and provide complete coverage of the business logic. You can run thousands of them as part of your continuous integration pipeline without any performance impact.

The Realization requires integration tests that verify infrastructure interaction. However, we can use mocks to avoid depending on actual infrastructure:

public class NotificationRealizationTest {
    private NotificationEssence essence;
    private EmailService mockEmailService;
    private SMSService mockSMSService;
    private NotificationRepository mockRepository;
    private NotificationRealization realization;
    
    public void setUp() {
        essence = new NotificationEssence();
        mockEmailService = mock(EmailService.class);
        mockSMSService = mock(SMSService.class);
        mockRepository = mock(NotificationRepository.class);
        
        realization = new NotificationRealization(
            essence,
            mockEmailService,
            mockSMSService,
            mock(PushNotificationService.class),
            mock(NotificationQueue.class),
            mockRepository
        );
    }
    
    public void testSend_ValidEmailNotification_CallsEmailService() {
        Notification notification = new Notification();
        notification.setRecipient("user@example.com");
        notification.setMessage("Test message");
        notification.setChannel(NotificationChannel.EMAIL);
        notification.setType(NotificationType.ALERT);
        
        NotificationResult result = realization.send(notification);
        
        assertTrue(result.isSuccessful());
        verify(mockEmailService).send(
            eq("user@example.com"),
            anyString(),
            eq("Test message")
        );
        verify(mockRepository).save(notification);
    }
    
    public void testSend_EmailServiceFails_ReturnsFailure() {
        Notification notification = new Notification();
        notification.setRecipient("user@example.com");
        notification.setMessage("Test message");
        notification.setChannel(NotificationChannel.EMAIL);
        notification.setType(NotificationType.ALERT);
        
        when(mockEmailService.send(anyString(), anyString(), anyString()))
            .thenThrow(new EmailServiceException("Service unavailable"));
        
        NotificationResult result = realization.send(notification);
        
        assertFalse(result.isSuccessful());
        verify(mockRepository).updateStatus(anyString(), eq(DeliveryStatus.FAILED));
    }
}

These tests verify that the Realization correctly integrates with infrastructure services. They run in seconds rather than milliseconds because of the mocking overhead, but they still do not require actual infrastructure to be running.

Contract tests verify that capabilities correctly implement their contracts:

public class NotificationContractTest {
    private NotificationContract contract;
    
    public void setUp() {
        NotificationCapability capability = createTestCapability();
        contract = (NotificationContract) capability.getContractImplementation(
            NotificationContract.class
        );
    }
    
    public void testContract_SendNotification_ReturnsResult() {
        Notification notification = createValidNotification();
        
        NotificationResult result = contract.sendNotification(notification);
        
        assertNotNull(result);
    }
    
    public void testContract_SendBatch_ReturnsResultsForAll() {
        List<Notification> notifications = Arrays.asList(
            createValidNotification(),
            createValidNotification(),
            createValidNotification()
        );
        
        List<NotificationResult> results = contract.sendBatch(notifications);
        
        assertEquals(notifications.size(), results.size());
    }
    
    private NotificationCapability createTestCapability() {
        return new NotificationCapability(
            new TestEmailService(),
            new TestSMSService(),
            new TestPushService(),
            new TestNotificationQueue(),
            new TestNotificationRepository()
        );
    }
    
    private Notification createValidNotification() {
        Notification notification = new Notification();
        notification.setRecipient("test@example.com");
        notification.setMessage("Test message");
        notification.setChannel(NotificationChannel.EMAIL);
        return notification;
    }
}

These testing strategies provide comprehensive coverage while keeping tests fast and maintainable. The Essence tests are pure unit tests that run in milliseconds. The Realization tests use mocks to verify infrastructure integration. Contract tests ensure that capabilities fulfill their promises.

CONCLUSION: PUTTING IT ALL TOGETHER

Capability-Centric Architecture provides a unified architectural pattern that works equally well for embedded and enterprise systems. By organizing systems around capabilities structured as nuclei with Essence, Realization, and Adaptation layers, software engineers achieve a separation of concerns that enables independent evolution, testing, and deployment.

The pattern addresses fundamental architectural challenges that have plagued software development for decades. Circular dependencies are prevented through contract-based interaction and dependency graph management. Technology dependencies are isolated in the Realization layer, allowing technologies to be replaced without affecting business logic. Quality attributes are addressed explicitly through contracts and efficiency gradients.

For embedded systems, efficiency gradients allow critical paths to use direct hardware access while non-critical paths use higher abstractions. This balances real-time performance requirements with software engineering best practices. Resource contracts make resource usage explicit and manageable.

For enterprise systems, contract-based interaction enables independent deployment and scaling of capabilities. Evolution envelopes provide a formal mechanism for managing change over time. Support for modern technologies like artificial intelligence, big data, and containerization is built into the architecture rather than added as an afterthought.

The architecture is practical to implement. Capabilities follow a clear structure that developers can consistently understand and apply. Testing strategies leverage the separation of Essence, Realization, and Adaptation to provide comprehensive coverage with fast, maintainable tests. Deployment flexibility allows the same capability code to run in different environments, from embedded devices to cloud platforms.

By following the principles outlined in this tutorial, you can build systems that are easier to understand, test, deploy, and evolve over time, whether those systems control industrial machinery, process billions of transactions, or anything in between.


Building a Natural Language Interface for Email Management Using Local LLMs




Introduction and Problem Analysis


The proliferation of email communication has created a significant challenge for users managing large volumes of messages across multiple accounts and folders. Traditional email clients require users to navigate complex graphical interfaces, remember specific menu locations, and perform repetitive tasks manually. This article presents a comprehensive approach to building a natural language interface that allows users to interact with their email applications using conversational commands processed by a local Large Language Model (LLM).

The core challenge lies in bridging the gap between human natural language expressions and the structured API calls required by email applications. Users naturally express their intentions using varied linguistic patterns, temporal references, and contextual assumptions that must be accurately interpreted and translated into precise programmatic actions. Additionally, privacy concerns necessitate the use of local LLM processing rather than cloud-based solutions.


System Architecture Overview


The proposed system follows a layered architecture that separates concerns and maintains modularity. The architecture consists of five primary components: the Natural Language Processing Layer, the Intent Recognition Engine, the Email Abstraction Layer, the Vendor-Specific Adapters, and the Security and Confirmation Layer.

The Natural Language Processing Layer receives user input and performs initial text preprocessing, including tokenization, normalization, and context extraction. This layer interfaces directly with the local LLM to generate structured interpretations of user commands.

The Intent Recognition Engine analyzes the LLM output to identify specific actions, extract parameters, and validate the completeness of the request. This component maintains a registry of supported operations and their required parameters.

The Email Abstraction Layer provides a unified interface for email operations, abstracting away vendor-specific implementation details. This layer defines standard operations such as message retrieval, folder management, and account configuration.

The Vendor-Specific Adapters implement the abstract email operations for particular email clients. For Apple Mail, this involves interfacing with the Mail.app through AppleScript or the EventKit framework.

The Security and Confirmation Layer manages user authentication, validates permissions, and handles confirmation workflows for potentially destructive operations.


Natural Language Processing Implementation


The natural language processing component serves as the primary interface between user input and system interpretation. This component must handle the inherent ambiguity and variability of human language while extracting precise operational parameters.


import json

import re

from typing import Dict, List, Optional, Tuple

from dataclasses import dataclass

from enum import Enum


class ActionType(Enum):

    DELETE_MESSAGES = "delete_messages"

    MOVE_MESSAGES = "move_messages"

    LIST_MESSAGES = "list_messages"

    CREATE_ACCOUNT = "create_account"

    SEARCH_MESSAGES = "search_messages"


@dataclass

class EmailCommand:

    action: ActionType

    parameters: Dict[str, any]

    confidence: float

    requires_confirmation: bool


class NaturalLanguageProcessor:

    def __init__(self, llm_model_path: str):

        """

        Initialize the NLP processor with a local LLM model.

        

        Args:

            llm_model_path: Path to the local LLM model file

        """

        self.llm_model = self._load_local_llm(llm_model_path)

        self.command_patterns = self._initialize_command_patterns()

        

    def _load_local_llm(self, model_path: str):

        """

        Load and initialize the local LLM model for processing.

        This implementation assumes a model compatible with the llama.cpp

        Python bindings for local inference.

        """

        try:

            from llama_cpp import Llama

            return Llama(

                model_path=model_path,

                n_ctx=2048,

                n_threads=4,

                verbose=False

            )

        except ImportError:

            raise ImportError("llama-cpp-python required for local LLM processing")

    

    def _initialize_command_patterns(self) -> Dict[str, str]:

        """

        Initialize regex patterns for common email command structures.

        These patterns serve as fallback mechanisms when LLM processing

        fails or produces low-confidence results.

        """

        return {

            'delete_subject': r'delete.*mails?.*subject\s+["\']?([^"\']+)["\']?.*folder\s+["\']?([^"\']+)["\']?',

            'move_regex': r'move.*mails?.*contain\s+["\']?([^"\']+)["\']?.*folder\s+["\']?([^"\']+)["\']?',

            'list_sender': r'list.*mails?.*account\s+([^\s]+).*sent by\s+["\']?([^"\']+)["\']?',

            'create_account': r'create.*mail account.*for\s+([^\s]+)',

            'move_receiver': r'move.*mails?.*receiver\s+([^\s]+).*folder\s+["\']?([^"\']+)["\']?'

        }

    

    def process_command(self, user_input: str) -> EmailCommand:

        """

        Process natural language input and extract structured command information.

        

        Args:

            user_input: Raw natural language command from user

            

        Returns:

            EmailCommand object containing parsed action and parameters

        """

        # Preprocess the input text

        normalized_input = self._preprocess_text(user_input)

        

        # Generate LLM prompt for command interpretation

        prompt = self._create_interpretation_prompt(normalized_input)

        

        # Get LLM response

        llm_response = self._query_llm(prompt)

        

        # Parse LLM response into structured format

        try:

            command = self._parse_llm_response(llm_response)

            if command.confidence > 0.7:

                return command

        except Exception as e:

            print(f"LLM parsing failed: {e}")

        

        # Fallback to pattern matching if LLM fails

        return self._fallback_pattern_matching(normalized_input)

    

    def _preprocess_text(self, text: str) -> str:

        """

        Normalize and clean input text for better processing.

        """

        # Convert to lowercase for consistency

        text = text.lower().strip()

        

        # Normalize whitespace

        text = re.sub(r'\s+', ' ', text)

        

        # Handle common abbreviations

        text = text.replace("e-mail", "email")

        text = text.replace("e-mails", "emails")

        

        return text

    

    def _create_interpretation_prompt(self, user_input: str) -> str:

        """

        Create a structured prompt for the LLM to interpret email commands.

        """

        prompt = f"""

You are an email command interpreter. Parse the following user command and return a JSON response with the action type and parameters.


Supported actions:

- delete_messages: Delete emails based on criteria

- move_messages: Move emails to a different folder

- list_messages: List emails matching criteria

- create_account: Create a new email account

- search_messages: Search for emails


User command: "{user_input}"


Return JSON format:

{{

    "action": "action_type",

    "parameters": {{

        "subject": "email subject (if applicable)",

        "folder": "folder name (if applicable)",

        "sender": "sender email/name (if applicable)",

        "receiver": "receiver email (if applicable)",

        "account": "email account (if applicable)",

        "regex_pattern": "regex pattern (if applicable)",

        "destination_folder": "target folder for moves (if applicable)"

    }},

    "confidence": 0.95,

    "requires_confirmation": true

}}


JSON Response:

"""

        return prompt

    

    def _query_llm(self, prompt: str) -> str:

        """

        Query the local LLM with the interpretation prompt.

        """

        response = self.llm_model(

            prompt,

            max_tokens=512,

            temperature=0.1,

            stop=["User command:", "\n\n"]

        )

        return response['choices'][0]['text'].strip()

    

    def _parse_llm_response(self, response: str) -> EmailCommand:

        """

        Parse the LLM JSON response into an EmailCommand object.

        """

        try:

            # Extract JSON from response

            json_start = response.find('{')

            json_end = response.rfind('}') + 1

            json_str = response[json_start:json_end]

            

            parsed = json.loads(json_str)

            

            action = ActionType(parsed['action'])

            parameters = {k: v for k, v in parsed['parameters'].items() if v}

            confidence = float(parsed.get('confidence', 0.5))

            requires_confirmation = bool(parsed.get('requires_confirmation', True))

            

            return EmailCommand(

                action=action,

                parameters=parameters,

                confidence=confidence,

                requires_confirmation=requires_confirmation

            )

        except (json.JSONDecodeError, KeyError, ValueError) as e:

            raise ValueError(f"Failed to parse LLM response: {e}")

    

    def _fallback_pattern_matching(self, text: str) -> EmailCommand:

        """

        Fallback pattern matching when LLM processing fails.

        """

        for pattern_name, pattern in self.command_patterns.items():

            match = re.search(pattern, text, re.IGNORECASE)

            if match:

                return self._create_command_from_pattern(pattern_name, match.groups())

        

        # Default fallback

        return EmailCommand(

            action=ActionType.SEARCH_MESSAGES,

            parameters={"query": text},

            confidence=0.3,

            requires_confirmation=True

        )

    

    def _create_command_from_pattern(self, pattern_name: str, groups: Tuple[str, ...]) -> EmailCommand:

        """

        Create EmailCommand from regex pattern matches.

        """

        if pattern_name == 'delete_subject':

            return EmailCommand(

                action=ActionType.DELETE_MESSAGES,

                parameters={"subject": groups[0], "folder": groups[1]},

                confidence=0.8,

                requires_confirmation=True

            )

        elif pattern_name == 'move_regex':

            return EmailCommand(

                action=ActionType.MOVE_MESSAGES,

                parameters={"regex_pattern": groups[0], "destination_folder": groups[1]},

                confidence=0.8,

                requires_confirmation=True

            )

        elif pattern_name == 'list_sender':

            return EmailCommand(

                action=ActionType.LIST_MESSAGES,

                parameters={"account": groups[0], "sender": groups[1]},

                confidence=0.8,

                requires_confirmation=False

            )

        elif pattern_name == 'create_account':

            return EmailCommand(

                action=ActionType.CREATE_ACCOUNT,

                parameters={"email_address": groups[0]},

                confidence=0.8,

                requires_confirmation=True

            )

        elif pattern_name == 'move_receiver':

            return EmailCommand(

                action=ActionType.MOVE_MESSAGES,

                parameters={"receiver": groups[0], "destination_folder": groups[1]},

                confidence=0.8,

                requires_confirmation=True

            )

        

        return EmailCommand(

            action=ActionType.SEARCH_MESSAGES,

            parameters={"query": " ".join(groups)},

            confidence=0.5,

            requires_confirmation=True

        )


Email Abstraction Layer Design


The email abstraction layer provides a unified interface that isolates the application logic from vendor-specific implementation details. This design enables support for multiple email clients while maintaining a consistent programming interface.


from abc import ABC, abstractmethod

from typing import List, Dict, Optional, Any

from dataclasses import dataclass

from datetime import datetime


@dataclass

class EmailMessage:

    """

    Unified representation of an email message across different clients.

    """

    message_id: str

    subject: str

    sender: str

    recipients: List[str]

    cc_recipients: List[str]

    bcc_recipients: List[str]

    body: str

    html_body: Optional[str]

    date_sent: datetime

    date_received: datetime

    folder: str

    account: str

    is_read: bool

    is_flagged: bool

    attachments: List[str]

    headers: Dict[str, str]


@dataclass

class EmailFolder:

    """

    Representation of an email folder structure.

    """

    name: str

    path: str

    message_count: int

    unread_count: int

    subfolders: List['EmailFolder']

    account: str


@dataclass

class EmailAccount:

    """

    Representation of an email account configuration.

    """

    email_address: str

    display_name: str

    account_type: str  # IMAP, POP3, Exchange, etc.

    server_settings: Dict[str, Any]

    is_enabled: bool

    folders: List[EmailFolder]


class EmailClientInterface(ABC):

    """

    Abstract interface defining operations that email clients must implement.

    This interface ensures consistency across different email client implementations.

    """

    

    @abstractmethod

    def connect(self) -> bool:

        """

        Establish connection to the email client.

        

        Returns:

            True if connection successful, False otherwise

        """

        pass

    

    @abstractmethod

    def disconnect(self) -> None:

        """

        Close connection to the email client.

        """

        pass

    

    @abstractmethod

    def get_accounts(self) -> List[EmailAccount]:

        """

        Retrieve all configured email accounts.

        

        Returns:

            List of EmailAccount objects

        """

        pass

    

    @abstractmethod

    def get_folders(self, account: str) -> List[EmailFolder]:

        """

        Get folder structure for a specific account.

        

        Args:

            account: Email account identifier

            

        Returns:

            List of EmailFolder objects

        """

        pass

    

    @abstractmethod

    def get_messages(self, account: str, folder: str, 

                    limit: Optional[int] = None,

                    filters: Optional[Dict[str, Any]] = None) -> List[EmailMessage]:

        """

        Retrieve messages from a specific folder.

        

        Args:

            account: Email account identifier

            folder: Folder name or path

            limit: Maximum number of messages to retrieve

            filters: Dictionary of filter criteria

            

        Returns:

            List of EmailMessage objects

        """

        pass

    

    @abstractmethod

    def search_messages(self, account: str, query: str,

                       folder: Optional[str] = None) -> List[EmailMessage]:

        """

        Search for messages matching query criteria.

        

        Args:

            account: Email account identifier

            query: Search query string

            folder: Optional folder to limit search scope

            

        Returns:

            List of matching EmailMessage objects

        """

        pass

    

    @abstractmethod

    def delete_messages(self, message_ids: List[str], 

                       account: str) -> bool:

        """

        Delete specified messages.

        

        Args:

            message_ids: List of message identifiers to delete

            account: Email account identifier

            

        Returns:

            True if deletion successful, False otherwise

        """

        pass

    

    @abstractmethod

    def move_messages(self, message_ids: List[str], 

                     source_folder: str, destination_folder: str,

                     account: str) -> bool:

        """

        Move messages between folders.

        

        Args:

            message_ids: List of message identifiers to move

            source_folder: Source folder name

            destination_folder: Destination folder name

            account: Email account identifier

            

        Returns:

            True if move successful, False otherwise

        """

        pass

    

    @abstractmethod

    def create_folder(self, folder_name: str, parent_folder: str,

                     account: str) -> bool:

        """

        Create a new email folder.

        

        Args:

            folder_name: Name of the new folder

            parent_folder: Parent folder path

            account: Email account identifier

            

        Returns:

            True if creation successful, False otherwise

        """

        pass

    

    @abstractmethod

    def mark_as_read(self, message_ids: List[str], account: str) -> bool:

        """

        Mark messages as read.

        

        Args:

            message_ids: List of message identifiers

            account: Email account identifier

            

        Returns:

            True if operation successful, False otherwise

        """

        pass

    

    @abstractmethod

    def mark_as_unread(self, message_ids: List[str], account: str) -> bool:

        """

        Mark messages as unread.

        

        Args:

            message_ids: List of message identifiers

            account: Email account identifier

            

        Returns:

            True if operation successful, False otherwise

        """

        pass

    

    @abstractmethod

    def create_account(self, account_config: EmailAccount) -> bool:

        """

        Create a new email account configuration.

        

        Args:

            account_config: EmailAccount object with configuration details

            

        Returns:

            True if account creation successful, False otherwise

        """

        pass


class EmailOperationResult:

    """

    Standardized result object for email operations.

    """

    def __init__(self, success: bool, message: str, 

                 affected_count: int = 0, data: Any = None):

        self.success = success

        self.message = message

        self.affected_count = affected_count

        self.data = data

        self.timestamp = datetime.now()


class EmailManager:

    """

    High-level email management class that coordinates operations

    across the abstraction layer.

    """

    

    def __init__(self, client: EmailClientInterface):

        """

        Initialize email manager with a specific client implementation.

        

        Args:

            client: EmailClientInterface implementation

        """

        self.client = client

        self.connected = False

    

    def initialize(self) -> bool:

        """

        Initialize connection to the email client.

        

        Returns:

            True if initialization successful, False otherwise

        """

        try:

            self.connected = self.client.connect()

            return self.connected

        except Exception as e:

            print(f"Failed to initialize email client: {e}")

            return False

    

    def execute_command(self, command: EmailCommand) -> EmailOperationResult:

        """

        Execute a parsed email command through the abstraction layer.

        

        Args:

            command: EmailCommand object containing action and parameters

            

        Returns:

            EmailOperationResult with operation outcome

        """

        if not self.connected:

            return EmailOperationResult(

                success=False,

                message="Email client not connected"

            )

        

        try:

            if command.action == ActionType.DELETE_MESSAGES:

                return self._execute_delete_command(command.parameters)

            elif command.action == ActionType.MOVE_MESSAGES:

                return self._execute_move_command(command.parameters)

            elif command.action == ActionType.LIST_MESSAGES:

                return self._execute_list_command(command.parameters)

            elif command.action == ActionType.CREATE_ACCOUNT:

                return self._execute_create_account_command(command.parameters)

            elif command.action == ActionType.SEARCH_MESSAGES:

                return self._execute_search_command(command.parameters)

            else:

                return EmailOperationResult(

                    success=False,

                    message=f"Unsupported action: {command.action}"

                )

        except Exception as e:

            return EmailOperationResult(

                success=False,

                message=f"Command execution failed: {str(e)}"

            )

    

    def _execute_delete_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        """

        Execute delete messages command.

        """

        account = parameters.get('account', 'default')

        folder = parameters.get('folder', 'Inbox')

        subject = parameters.get('subject')

        

        if not subject:

            return EmailOperationResult(

                success=False,

                message="Subject parameter required for delete operation"

            )

        

        # Find messages matching criteria

        messages = self.client.get_messages(account, folder)

        matching_messages = [

            msg for msg in messages 

            if subject.lower() in msg.subject.lower()

        ]

        

        if not matching_messages:

            return EmailOperationResult(

                success=True,

                message="No messages found matching criteria",

                affected_count=0

            )

        

        # Delete matching messages

        message_ids = [msg.message_id for msg in matching_messages]

        success = self.client.delete_messages(message_ids, account)

        

        return EmailOperationResult(

            success=success,

            message=f"Deleted {len(matching_messages)} messages" if success 

                   else "Failed to delete messages",

            affected_count=len(matching_messages) if success else 0

        )

    

    def _execute_move_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        """

        Execute move messages command.

        """

        account = parameters.get('account', 'default')

        source_folder = parameters.get('folder', 'Inbox')

        destination_folder = parameters.get('destination_folder')

        regex_pattern = parameters.get('regex_pattern')

        receiver = parameters.get('receiver')

        

        if not destination_folder:

            return EmailOperationResult(

                success=False,

                message="Destination folder required for move operation"

            )

        

        # Get messages from source folder

        messages = self.client.get_messages(account, source_folder)

        matching_messages = []

        

        if regex_pattern:

            import re

            pattern = re.compile(regex_pattern, re.IGNORECASE)

            matching_messages = [

                msg for msg in messages

                if pattern.search(msg.body) or pattern.search(msg.subject)

            ]

        elif receiver:

            matching_messages = [

                msg for msg in messages

                if receiver.lower() in [r.lower() for r in msg.recipients]

            ]

        

        if not matching_messages:

            return EmailOperationResult(

                success=True,

                message="No messages found matching criteria",

                affected_count=0

            )

        

        # Move matching messages

        message_ids = [msg.message_id for msg in matching_messages]

        success = self.client.move_messages(

            message_ids, source_folder, destination_folder, account

        )

        

        return EmailOperationResult(

            success=success,

            message=f"Moved {len(matching_messages)} messages" if success 

                   else "Failed to move messages",

            affected_count=len(matching_messages) if success else 0

        )

    

    def _execute_list_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        """

        Execute list messages command.

        """

        account = parameters.get('account', 'default')

        folder = parameters.get('folder', 'Inbox')

        sender = parameters.get('sender')

        

        messages = self.client.get_messages(account, folder)

        

        if sender:

            matching_messages = [

                msg for msg in messages

                if sender.lower() in msg.sender.lower()

            ]

        else:

            matching_messages = messages

        

        return EmailOperationResult(

            success=True,

            message=f"Found {len(matching_messages)} messages",

            affected_count=len(matching_messages),

            data=matching_messages

        )

    

    def _execute_search_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        """

        Execute search messages command.

        """

        account = parameters.get('account', 'default')

        query = parameters.get('query', '')

        folder = parameters.get('folder')

        

        matching_messages = self.client.search_messages(account, query, folder)

        

        return EmailOperationResult(

            success=True,

            message=f"Search found {len(matching_messages)} messages",

            affected_count=len(matching_messages),

            data=matching_messages

        )

    

    def _execute_create_account_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        """

        Execute create account command.

        """

        email_address = parameters.get('email_address')

        

        if not email_address:

            return EmailOperationResult(

                success=False,

                message="Email address required for account creation"

            )

        

        # Create basic account configuration

        account_config = EmailAccount(

            email_address=email_address,

            display_name=email_address.split('@')[0],

            account_type='IMAP',

            server_settings={},

            is_enabled=True,

            folders=[]

        )

        

        success = self.client.create_account(account_config)

        

        return EmailOperationResult(

            success=success,

            message=f"Created account for {email_address}" if success 

                   else "Failed to create account",

            affected_count=1 if success else 0

        )


Apple Mail Integration Implementation


The Apple Mail adapter implements the email abstraction interface using AppleScript and Objective-C bridges to interact with the Mail.app application. This implementation demonstrates how vendor-specific functionality integrates with the abstract interface.


import subprocess

import json

import re

from typing import List, Dict, Optional, Any

from datetime import datetime

import objc

from Foundation import NSAppleScript, NSString

from AppKit import NSWorkspace


class AppleMailClient(EmailClientInterface):

    """

    Apple Mail implementation of the EmailClientInterface.

    Uses AppleScript and Objective-C runtime for Mail.app integration.

    """

    

    def __init__(self):

        """

        Initialize Apple Mail client interface.

        """

        self.app_name = "Mail"

        self.is_connected = False

        self.applescript_runner = AppleScriptRunner()

    

    def connect(self) -> bool:

        """

        Establish connection to Apple Mail application.

        """

        try:

            # Check if Mail.app is running

            workspace = NSWorkspace.sharedWorkspace()

            running_apps = workspace.runningApplications()

            

            mail_running = any(

                app.bundleIdentifier() == "com.apple.mail" 

                for app in running_apps

            )

            

            if not mail_running:

                # Launch Mail.app

                workspace.launchApplication_(self.app_name)

                # Wait for application to start

                import time

                time.sleep(3)

            

            # Test connection with a simple AppleScript command

            test_script = 'tell application "Mail" to get name of every account'

            result = self.applescript_runner.run_script(test_script)

            

            self.is_connected = result.success

            return self.is_connected

            

        except Exception as e:

            print(f"Failed to connect to Apple Mail: {e}")

            return False

    

    def disconnect(self) -> None:

        """

        Close connection to Apple Mail.

        """

        self.is_connected = False

    

    def get_accounts(self) -> List[EmailAccount]:

        """

        Retrieve all configured email accounts from Apple Mail.

        """

        script = '''

        tell application "Mail"

            set accountList to {}

            repeat with acc in every account

                set accountInfo to {name of acc, user name of acc, email addresses of acc}

                set end of accountList to accountInfo

            end repeat

            return accountList

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return []

        

        accounts = []

        try:

            # Parse AppleScript result

            account_data = self._parse_applescript_list(result.output)

            

            for acc_info in account_data:

                if len(acc_info) >= 3:

                    account = EmailAccount(

                        email_address=acc_info[2][0] if acc_info[2] else acc_info[1],

                        display_name=acc_info[0],

                        account_type="Apple Mail",

                        server_settings={},

                        is_enabled=True,

                        folders=self._get_account_folders(acc_info[0])

                    )

                    accounts.append(account)

        except Exception as e:

            print(f"Error parsing account data: {e}")

        

        return accounts

    

    def get_folders(self, account: str) -> List[EmailFolder]:

        """

        Get folder structure for a specific account.

        """

        return self._get_account_folders(account)

    

    def _get_account_folders(self, account_name: str) -> List[EmailFolder]:

        """

        Helper method to retrieve folders for an account.

        """

        script = f'''

        tell application "Mail"

            set folderList to {{}}

            try

                set acc to account "{account_name}"

                repeat with mbox in every mailbox of acc

                    set folderInfo to {{name of mbox, (count of messages of mbox), (count of (messages of mbox whose read status is false))}}

                    set end of folderList to folderInfo

                end repeat

            end try

            return folderList

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return []

        

        folders = []

        try:

            folder_data = self._parse_applescript_list(result.output)

            

            for folder_info in folder_data:

                if len(folder_info) >= 3:

                    folder = EmailFolder(

                        name=folder_info[0],

                        path=folder_info[0],

                        message_count=int(folder_info[1]),

                        unread_count=int(folder_info[2]),

                        subfolders=[],

                        account=account_name

                    )

                    folders.append(folder)

        except Exception as e:

            print(f"Error parsing folder data: {e}")

        

        return folders

    

    def get_messages(self, account: str, folder: str, 

                    limit: Optional[int] = None,

                    filters: Optional[Dict[str, Any]] = None) -> List[EmailMessage]:

        """

        Retrieve messages from a specific folder in Apple Mail.

        """

        limit_clause = f"items 1 thru {limit} of " if limit else ""

        

        script = f'''

        tell application "Mail"

            set messageList to {{}}

            try

                set acc to account "{account}"

                set mbox to mailbox "{folder}" of acc

                set msgs to {limit_clause}(every message of mbox)

                

                repeat with msg in msgs

                    set msgInfo to {{id of msg, subject of msg, sender of msg, ¬

                                   content of msg, date received of msg, ¬

                                   read status of msg, flagged status of msg}}

                    set end of messageList to msgInfo

                end repeat

            end try

            return messageList

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return []

        

        messages = []

        try:

            message_data = self._parse_applescript_list(result.output)

            

            for msg_info in message_data:

                if len(msg_info) >= 7:

                    # Parse recipients separately

                    recipients = self._get_message_recipients(msg_info[0])

                    

                    message = EmailMessage(

                        message_id=str(msg_info[0]),

                        subject=msg_info[1] or "",

                        sender=msg_info[2] or "",

                        recipients=recipients.get('to', []),

                        cc_recipients=recipients.get('cc', []),

                        bcc_recipients=recipients.get('bcc', []),

                        body=msg_info[3] or "",

                        html_body=None,

                        date_sent=self._parse_applescript_date(msg_info[4]),

                        date_received=self._parse_applescript_date(msg_info[4]),

                        folder=folder,

                        account=account,

                        is_read=bool(msg_info[5]),

                        is_flagged=bool(msg_info[6]),

                        attachments=[],

                        headers={}

                    )

                    messages.append(message)

        except Exception as e:

            print(f"Error parsing message data: {e}")

        

        return messages

    

    def _get_message_recipients(self, message_id: str) -> Dict[str, List[str]]:

        """

        Get detailed recipient information for a message.

        """

        script = f'''

        tell application "Mail"

            try

                set msg to message id {message_id}

                set toRecipients to {{}}

                set ccRecipients to {{}}

                

                repeat with recip in to recipients of msg

                    set end of toRecipients to address of recip

                end repeat

                

                repeat with recip in cc recipients of msg

                    set end of ccRecipients to address of recip

                end repeat

                

                return {{toRecipients, ccRecipients}}

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return {'to': [], 'cc': [], 'bcc': []}

        

        try:

            recipient_data = self._parse_applescript_list(result.output)

            return {

                'to': recipient_data[0] if len(recipient_data) > 0 else [],

                'cc': recipient_data[1] if len(recipient_data) > 1 else [],

                'bcc': []  # BCC recipients not accessible via AppleScript

            }

        except:

            return {'to': [], 'cc': [], 'bcc': []}

    

    def search_messages(self, account: str, query: str,

                       folder: Optional[str] = None) -> List[EmailMessage]:

        """

        Search for messages in Apple Mail using Spotlight integration.

        """

        folder_clause = f'in mailbox "{folder}" of account "{account}"' if folder else f'in account "{account}"'

        

        script = f'''

        tell application "Mail"

            set searchResults to {{}}

            try

                set msgs to (every message {folder_clause} whose subject contains "{query}" or content contains "{query}")

                

                repeat with msg in msgs

                    set msgInfo to {{id of msg, subject of msg, sender of msg, ¬

                                   content of msg, date received of msg, ¬

                                   read status of msg, flagged status of msg, ¬

                                   name of mailbox of msg}}

                    set end of searchResults to msgInfo

                end repeat

            end try

            return searchResults

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return []

        

        messages = []

        try:

            message_data = self._parse_applescript_list(result.output)

            

            for msg_info in message_data:

                if len(msg_info) >= 8:

                    recipients = self._get_message_recipients(msg_info[0])

                    

                    message = EmailMessage(

                        message_id=str(msg_info[0]),

                        subject=msg_info[1] or "",

                        sender=msg_info[2] or "",

                        recipients=recipients.get('to', []),

                        cc_recipients=recipients.get('cc', []),

                        bcc_recipients=recipients.get('bcc', []),

                        body=msg_info[3] or "",

                        html_body=None,

                        date_sent=self._parse_applescript_date(msg_info[4]),

                        date_received=self._parse_applescript_date(msg_info[4]),

                        folder=msg_info[7] if len(msg_info) > 7 else folder or "Unknown",

                        account=account,

                        is_read=bool(msg_info[5]),

                        is_flagged=bool(msg_info[6]),

                        attachments=[],

                        headers={}

                    )

                    messages.append(message)

        except Exception as e:

            print(f"Error parsing search results: {e}")

        

        return messages

    

    def delete_messages(self, message_ids: List[str], account: str) -> bool:

        """

        Delete specified messages in Apple Mail.

        """

        if not message_ids:

            return True

        

        # Convert message IDs to AppleScript list format

        id_list = ", ".join(message_ids)

        

        script = f'''

        tell application "Mail"

            try

                set msgIds to {{{id_list}}}

                repeat with msgId in msgIds

                    set msg to message id msgId

                    delete msg

                end repeat

                return true

            on error

                return false

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        return result.success and "true" in result.output.lower()

    

    def move_messages(self, message_ids: List[str], 

                     source_folder: str, destination_folder: str,

                     account: str) -> bool:

        """

        Move messages between folders in Apple Mail.

        """

        if not message_ids:

            return True

        

        id_list = ", ".join(message_ids)

        

        script = f'''

        tell application "Mail"

            try

                set acc to account "{account}"

                set destBox to mailbox "{destination_folder}" of acc

                set msgIds to {{{id_list}}}

                

                repeat with msgId in msgIds

                    set msg to message id msgId

                    set mailbox of msg to destBox

                end repeat

                return true

            on error errMsg

                return false

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        return result.success and "true" in result.output.lower()

    

    def create_folder(self, folder_name: str, parent_folder: str,

                     account: str) -> bool:

        """

        Create a new mailbox in Apple Mail.

        """

        script = f'''

        tell application "Mail"

            try

                set acc to account "{account}"

                if "{parent_folder}" is not "" then

                    set parentBox to mailbox "{parent_folder}" of acc

                    make new mailbox with properties {{name:"{folder_name}"}} at parentBox

                else

                    make new mailbox with properties {{name:"{folder_name}"}} at acc

                end if

                return true

            on error

                return false

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        return result.success and "true" in result.output.lower()

    

    def mark_as_read(self, message_ids: List[str], account: str) -> bool:

        """

        Mark messages as read in Apple Mail.

        """

        return self._set_read_status(message_ids, True)

    

    def mark_as_unread(self, message_ids: List[str], account: str) -> bool:

        """

        Mark messages as unread in Apple Mail.

        """

        return self._set_read_status(message_ids, False)

    

    def _set_read_status(self, message_ids: List[str], read_status: bool) -> bool:

        """

        Helper method to set read status of messages.

        """

        if not message_ids:

            return True

        

        id_list = ", ".join(message_ids)

        status_value = "true" if read_status else "false"

        

        script = f'''

        tell application "Mail"

            try

                set msgIds to {{{id_list}}}

                repeat with msgId in msgIds

                    set msg to message id msgId

                    set read status of msg to {status_value}

                end repeat

                return true

            on error

                return false

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        return result.success and "true" in result.output.lower()

    

    def create_account(self, account_config: EmailAccount) -> bool:

        """

        Create a new email account in Apple Mail.

        Note: This operation typically requires user interaction

        and may not be fully automatable via AppleScript.

        """

        # Apple Mail account creation usually requires manual setup

        # or configuration profiles. This is a simplified implementation.

        print(f"Account creation for {account_config.email_address} requires manual setup in Apple Mail")

        return False

    

    def _parse_applescript_list(self, output: str) -> List[Any]:

        """

        Parse AppleScript list output into Python data structures.

        """

        try:

            # Remove AppleScript list formatting and convert to Python-parseable format

            cleaned = output.strip()

            if cleaned.startswith('{') and cleaned.endswith('}'):

                cleaned = cleaned[1:-1]  # Remove outer braces

            

            # This is a simplified parser - a production implementation

            # would need more robust AppleScript result parsing

            items = []

            current_item = ""

            brace_count = 0

            in_quotes = False

            

            for char in cleaned:

                if char == '"' and (not current_item or current_item[-1] != '\\'):

                    in_quotes = not in_quotes

                elif char == '{' and not in_quotes:

                    brace_count += 1

                elif char == '}' and not in_quotes:

                    brace_count -= 1

                elif char == ',' and brace_count == 0 and not in_quotes:

                    items.append(current_item.strip().strip('"'))

                    current_item = ""

                    continue

                

                current_item += char

            

            if current_item.strip():

                items.append(current_item.strip().strip('"'))

            

            return items

        except Exception as e:

            print(f"Error parsing AppleScript output: {e}")

            return []

    

    def _parse_applescript_date(self, date_str: str) -> datetime:

        """

        Parse AppleScript date format into Python datetime.

        """

        try:

            # AppleScript dates are typically in format like:

            # "date \"Wednesday, January 1, 2025 at 12:00:00 PM\""

            import dateutil.parser

            return dateutil.parser.parse(date_str)

        except:

            return datetime.now()


class AppleScriptRunner:

    """

    Utility class for executing AppleScript commands.

    """

    

    def __init__(self):

        self.timeout = 30  # seconds

    

    def run_script(self, script: str) -> 'AppleScriptResult':

        """

        Execute an AppleScript and return the result.

        """

        try:

            # Use NSAppleScript for better integration

            applescript = NSAppleScript.alloc().initWithSource_(script)

            result, error = applescript.executeAndReturnError_(None)

            

            if error:

                return AppleScriptResult(

                    success=False,

                    output="",

                    error=str(error)

                )

            

            output = str(result.stringValue()) if result else ""

            return AppleScriptResult(

                success=True,

                output=output,

                error=""

            )

            

        except Exception as e:

            return AppleScriptResult(

                success=False,

                output="",

                error=str(e)

            )


class AppleScriptResult:

    """

    Result object for AppleScript execution.

    """

    def __init__(self, success: bool, output: str, error: str):

        self.success = success

        self.output = output

        self.error = error


Security and Confirmation System


The security layer ensures that potentially destructive operations require explicit user confirmation and that all operations are performed with appropriate permissions. This component also handles authentication and access control.


import hashlib

import json

import getpass

from typing import Dict, List, Optional, Callable

from datetime import datetime, timedelta

from dataclasses import dataclass

from enum import Enum


class SecurityLevel(Enum):

    LOW = "low"

    MEDIUM = "medium"

    HIGH = "high"

    CRITICAL = "critical"


class OperationType(Enum):

    READ = "read"

    MODIFY = "modify"

    DELETE = "delete"

    CREATE = "create"

    MOVE = "move"


@dataclass

class SecurityPolicy:

    """

    Defines security requirements for different operation types.

    """

    operation_type: OperationType

    security_level: SecurityLevel

    requires_confirmation: bool

    requires_authentication: bool

    max_batch_size: Optional[int]

    confirmation_message: str


@dataclass

class UserSession:

    """

    Represents an authenticated user session.

    """

    user_id: str

    session_token: str

    created_at: datetime

    last_activity: datetime

    permissions: List[str]

    is_authenticated: bool


class SecurityManager:

    """

    Manages security policies, user authentication, and operation confirmation.

    """

    

    def __init__(self):

        """

        Initialize security manager with default policies.

        """

        self.policies = self._initialize_security_policies()

        self.current_session: Optional[UserSession] = None

        self.confirmation_callbacks: Dict[str, Callable] = {}

        self.session_timeout = timedelta(hours=2)

    

    def _initialize_security_policies(self) -> Dict[ActionType, SecurityPolicy]:

        """

        Initialize default security policies for different operations.

        """

        return {

            ActionType.LIST_MESSAGES: SecurityPolicy(

                operation_type=OperationType.READ,

                security_level=SecurityLevel.LOW,

                requires_confirmation=False,

                requires_authentication=True,

                max_batch_size=None,

                confirmation_message=""

            ),

            ActionType.SEARCH_MESSAGES: SecurityPolicy(

                operation_type=OperationType.READ,

                security_level=SecurityLevel.LOW,

                requires_confirmation=False,

                requires_authentication=True,

                max_batch_size=None,

                confirmation_message=""

            ),

            ActionType.MOVE_MESSAGES: SecurityPolicy(

                operation_type=OperationType.MODIFY,

                security_level=SecurityLevel.MEDIUM,

                requires_confirmation=True,

                requires_authentication=True,

                max_batch_size=100,

                confirmation_message="This will move {count} messages to {destination}. Continue?"

            ),

            ActionType.DELETE_MESSAGES: SecurityPolicy(

                operation_type=OperationType.DELETE,

                security_level=SecurityLevel.HIGH,

                requires_confirmation=True,

                requires_authentication=True,

                max_batch_size=50,

                confirmation_message="This will permanently delete {count} messages. This action cannot be undone. Continue?"

            ),

            ActionType.CREATE_ACCOUNT: SecurityPolicy(

                operation_type=OperationType.CREATE,

                security_level=SecurityLevel.CRITICAL,

                requires_confirmation=True,

                requires_authentication=True,

                max_batch_size=1,

                confirmation_message="This will create a new email account for {email}. Continue?"

            )

        }

    

    def authenticate_user(self, username: str, password: str) -> bool:

        """

        Authenticate user and create session.

        

        Args:

            username: User identifier

            password: User password

            

        Returns:

            True if authentication successful, False otherwise

        """

        # In a production system, this would verify against a secure credential store

        # For this example, we'll use a simple hash-based verification

        

        try:

            # Generate session token

            session_data = f"{username}:{datetime.now().isoformat()}"

            session_token = hashlib.sha256(session_data.encode()).hexdigest()

            

            # Create user session

            self.current_session = UserSession(

                user_id=username,

                session_token=session_token,

                created_at=datetime.now(),

                last_activity=datetime.now(),

                permissions=["email:read", "email:write", "email:delete"],

                is_authenticated=True

            )

            

            return True

            

        except Exception as e:

            print(f"Authentication failed: {e}")

            return False

    

    def is_session_valid(self) -> bool:

        """

        Check if current session is valid and not expired.

        

        Returns:

            True if session is valid, False otherwise

        """

        if not self.current_session or not self.current_session.is_authenticated:

            return False

        

        # Check session timeout

        time_since_activity = datetime.now() - self.current_session.last_activity

        if time_since_activity > self.session_timeout:

            self.current_session = None

            return False

        

        # Update last activity

        self.current_session.last_activity = datetime.now()

        return True

    

    def check_operation_permission(self, command: EmailCommand) -> bool:

        """

        Check if current user has permission to execute the command.

        

        Args:

            command: EmailCommand to check permissions for

            

        Returns:

            True if operation is permitted, False otherwise

        """

        if not self.is_session_valid():

            return False

        

        policy = self.policies.get(command.action)

        if not policy:

            return False

        

        # Check if user has required permissions

        required_permission = f"email:{policy.operation_type.value}"

        if required_permission not in self.current_session.permissions:

            return False

        

        return True

    

    def require_confirmation(self, command: EmailCommand, 

                           affected_count: int = 0,

                           additional_context: Dict[str, str] = None) -> bool:

        """

        Handle confirmation requirement for operations.

        

        Args:

            command: EmailCommand requiring confirmation

            affected_count: Number of items that will be affected

            additional_context: Additional context for confirmation message

            

        Returns:

            True if user confirms, False otherwise

        """

        policy = self.policies.get(command.action)

        if not policy or not policy.requires_confirmation:

            return True

        

        # Check batch size limits

        if policy.max_batch_size and affected_count > policy.max_batch_size:

            print(f"Operation exceeds maximum batch size of {policy.max_batch_size}")

            return False

        

        # Format confirmation message

        context = additional_context or {}

        context.update({

            'count': str(affected_count),

            'action': command.action.value

        })

        

        # Add command-specific context

        if command.action == ActionType.MOVE_MESSAGES:

            context['destination'] = command.parameters.get('destination_folder', 'Unknown')

        elif command.action == ActionType.CREATE_ACCOUNT:

            context['email'] = command.parameters.get('email_address', 'Unknown')

        

        confirmation_message = policy.confirmation_message.format(**context)

        

        # Get user confirmation

        return self._get_user_confirmation(confirmation_message, policy.security_level)

    

    def _get_user_confirmation(self, message: str, security_level: SecurityLevel) -> bool:

        """

        Get user confirmation for an operation.

        

        Args:

            message: Confirmation message to display

            security_level: Security level of the operation

            

        Returns:

            True if user confirms, False otherwise

        """

        print(f"\n[{security_level.value.upper()}] CONFIRMATION REQUIRED")

        print(f"{message}")

        

        if security_level == SecurityLevel.CRITICAL:

            print("This is a critical operation. Please type 'CONFIRM' to proceed:")

            response = input("> ").strip()

            return response == "CONFIRM"

        else:

            print("Type 'y' or 'yes' to confirm, any other input to cancel:")

            response = input("> ").strip().lower()

            return response in ['y', 'yes']

    

    def log_operation(self, command: EmailCommand, result: EmailOperationResult) -> None:

        """

        Log security-relevant operations for audit purposes.

        

        Args:

            command: EmailCommand that was executed

            result: Result of the operation

        """

        log_entry = {

            'timestamp': datetime.now().isoformat(),

            'user_id': self.current_session.user_id if self.current_session else 'unknown',

            'action': command.action.value,

            'parameters': command.parameters,

            'success': result.success,

            'affected_count': result.affected_count,

            'message': result.message

        }

        

        # In a production system, this would write to a secure audit log

        print(f"AUDIT LOG: {json.dumps(log_entry, indent=2)}")

    

    def validate_command_parameters(self, command: EmailCommand) -> bool:

        """

        Validate command parameters for security issues.

        

        Args:

            command: EmailCommand to validate

            

        Returns:

            True if parameters are valid, False otherwise

        """

        # Check for potentially dangerous patterns

        dangerous_patterns = [

            r'\.\./',  # Directory traversal

            r'<script',  # Script injection

            r'javascript:',  # JavaScript execution

            r'file://',  # File protocol access

        ]

        

        # Validate all string parameters

        for key, value in command.parameters.items():

            if isinstance(value, str):

                for pattern in dangerous_patterns:

                    if re.search(pattern, value, re.IGNORECASE):

                        print(f"Security violation: Dangerous pattern detected in parameter '{key}'")

                        return False

        

        # Validate email addresses

        email_fields = ['email_address', 'sender', 'receiver']

        for field in email_fields:

            if field in command.parameters:

                email = command.parameters[field]

                if isinstance(email, str) and not self._is_valid_email(email):

                    print(f"Invalid email address in parameter '{field}': {email}")

                    return False

        

        return True

    

    def _is_valid_email(self, email: str) -> bool:

        """

        Validate email address format.

        

        Args:

            email: Email address to validate

            

        Returns:

            True if email format is valid, False otherwise

        """

        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

        return re.match(email_pattern, email) is not None


class SecureEmailManager(EmailManager):

    """

    Extended EmailManager with integrated security controls.

    """

    

    def __init__(self, client: EmailClientInterface, security_manager: SecurityManager):

        """

        Initialize secure email manager.

        

        Args:

            client: EmailClientInterface implementation

            security_manager: SecurityManager instance

        """

        super().__init__(client)

        self.security_manager = security_manager

    

    def execute_command(self, command: EmailCommand) -> EmailOperationResult:

        """

        Execute email command with security controls.

        

        Args:

            command: EmailCommand to execute

            

        Returns:

            EmailOperationResult with security validation

        """

        # Validate session

        if not self.security_manager.is_session_valid():

            return EmailOperationResult(

                success=False,

                message="Authentication required"

            )

        

        # Check permissions

        if not self.security_manager.check_operation_permission(command):

            return EmailOperationResult(

                success=False,

                message="Insufficient permissions for this operation"

            )

        

        # Validate command parameters

        if not self.security_manager.validate_command_parameters(command):

            return EmailOperationResult(

                success=False,

                message="Invalid or potentially dangerous parameters"

            )

        

        # For operations that modify data, get affected count for confirmation

        affected_count = 0

        if command.action in [ActionType.DELETE_MESSAGES, ActionType.MOVE_MESSAGES]:

            affected_count = self._estimate_affected_count(command)

        

        # Handle confirmation requirement

        if not self.security_manager.require_confirmation(command, affected_count):

            return EmailOperationResult(

                success=False,

                message="Operation cancelled by user"

            )

        

        # Execute the command

        result = super().execute_command(command)

        

        # Log the operation

        self.security_manager.log_operation(command, result)

        

        return result

    

    def _estimate_affected_count(self, command: EmailCommand) -> int:

        """

        Estimate number of items that will be affected by the command.

        

        Args:

            command: EmailCommand to estimate for

            

        Returns:

            Estimated number of affected items

        """

        try:

            if command.action == ActionType.DELETE_MESSAGES:

                account = command.parameters.get('account', 'default')

                folder = command.parameters.get('folder', 'Inbox')

                subject = command.parameters.get('subject')

                

                if subject:

                    messages = self.client.get_messages(account, folder)

                    return len([

                        msg for msg in messages 

                        if subject.lower() in msg.subject.lower()

                    ])

            

            elif command.action == ActionType.MOVE_MESSAGES:

                account = command.parameters.get('account', 'default')

                folder = command.parameters.get('folder', 'Inbox')

                regex_pattern = command.parameters.get('regex_pattern')

                receiver = command.parameters.get('receiver')

                

                messages = self.client.get_messages(account, folder)

                

                if regex_pattern:

                    import re

                    pattern = re.compile(regex_pattern, re.IGNORECASE)

                    return len([

                        msg for msg in messages

                        if pattern.search(msg.body) or pattern.search(msg.subject)

                    ])

                elif receiver:

                    return len([

                        msg for msg in messages

                        if receiver.lower() in [r.lower() for r in msg.recipients]

                    ])

        

        except Exception as e:

            print(f"Error estimating affected count: {e}")

        

        return 0


Main Application Integration


The main application component orchestrates all the system components and provides the primary user interface for the natural language email management system.


import sys

import os

from typing import Optional

import argparse


class EmailAssistant:

    """

    Main application class that integrates all components of the

    natural language email management system.

    """

    

    def __init__(self, llm_model_path: str, email_client_type: str = "apple_mail"):

        """

        Initialize the email assistant application.

        

        Args:

            llm_model_path: Path to the local LLM model file

            email_client_type: Type of email client to use

        """

        self.llm_model_path = llm_model_path

        self.email_client_type = email_client_type

        

        # Initialize components

        self.nlp_processor = None

        self.email_client = None

        self.security_manager = None

        self.email_manager = None

        

        self._initialize_components()

    

    def _initialize_components(self) -> None:

        """

        Initialize all system components.

        """

        try:

            # Initialize NLP processor

            print("Initializing natural language processor...")

            self.nlp_processor = NaturalLanguageProcessor(self.llm_model_path)

            

            # Initialize email client

            print(f"Initializing {self.email_client_type} client...")

            if self.email_client_type.lower() == "apple_mail":

                self.email_client = AppleMailClient()

            else:

                raise ValueError(f"Unsupported email client type: {self.email_client_type}")

            

            # Initialize security manager

            print("Initializing security manager...")

            self.security_manager = SecurityManager()

            

            # Initialize email manager

            print("Initializing email manager...")

            self.email_manager = SecureEmailManager(self.email_client, self.security_manager)

            

            print("All components initialized successfully.")

            

        except Exception as e:

            print(f"Failed to initialize components: {e}")

            sys.exit(1)

    

    def start(self) -> None:

        """

        Start the email assistant application.

        """

        print("\n" + "="*60)

        print("Natural Language Email Assistant")

        print("="*60)

        print("This assistant allows you to manage your email using natural language commands.")

        print("Type 'help' for available commands or 'quit' to exit.\n")

        

        # Authenticate user

        if not self._authenticate_user():

            print("Authentication failed. Exiting.")

            return

        

        # Initialize email client connection

        if not self.email_manager.initialize():

            print("Failed to connect to email client. Exiting.")

            return

        

        print("Email assistant ready. You can now enter commands.\n")

        

        # Main command loop

        self._command_loop()

    

    def _authenticate_user(self) -> bool:

        """

        Handle user authentication.

        

        Returns:

            True if authentication successful, False otherwise

        """

        print("Authentication required to access email functions.")

        

        max_attempts = 3

        for attempt in range(max_attempts):

            try:

                username = input("Username: ").strip()

                password = getpass.getpass("Password: ")

                

                if self.security_manager.authenticate_user(username, password):

                    print("Authentication successful.")

                    return True

                else:

                    remaining = max_attempts - attempt - 1

                    if remaining > 0:

                        print(f"Authentication failed. {remaining} attempts remaining.")

                    else:

                        print("Authentication failed. Maximum attempts exceeded.")

                        

            except KeyboardInterrupt:

                print("\nAuthentication cancelled.")

                return False

            except Exception as e:

                print(f"Authentication error: {e}")

        

        return False

    

    def _command_loop(self) -> None:

        """

        Main command processing loop.

        """

        while True:

            try:

                # Get user input

                user_input = input("Email Assistant> ").strip()

                

                if not user_input:

                    continue

                

                # Handle special commands

                if user_input.lower() in ['quit', 'exit', 'q']:

                    print("Goodbye!")

                    break

                elif user_input.lower() == 'help':

                    self._show_help()

                    continue

                elif user_input.lower() == 'status':

                    self._show_status()

                    continue

                elif user_input.lower().startswith('accounts'):

                    self._show_accounts()

                    continue

                

                # Process natural language command

                self._process_command(user_input)

                

            except KeyboardInterrupt:

                print("\nUse 'quit' to exit.")

            except Exception as e:

                print(f"Error processing command: {e}")

    

    def _process_command(self, user_input: str) -> None:

        """

        Process a natural language command.

        

        Args:

            user_input: Raw user command string

        """

        try:

            print("Processing command...")

            

            # Parse natural language input

            command = self.nlp_processor.process_command(user_input)

            

            print(f"Interpreted action: {command.action.value}")

            print(f"Confidence: {command.confidence:.2f}")

            

            if command.confidence < 0.5:

                print("Low confidence in command interpretation. Please rephrase your request.")

                return

            

            # Execute command

            result = self.email_manager.execute_command(command)

            

            # Display result

            self._display_result(result)

            

        except Exception as e:

            print(f"Failed to process command: {e}")

    

    def _display_result(self, result: EmailOperationResult) -> None:

        """

        Display the result of an email operation.

        

        Args:

            result: EmailOperationResult to display

        """

        if result.success:

            print(f"✓ {result.message}")

            if result.affected_count > 0:

                print(f"  Affected items: {result.affected_count}")

            

            # Display additional data if available

            if result.data and isinstance(result.data, list):

                if len(result.data) <= 10:  # Show details for small result sets

                    for item in result.data:

                        if isinstance(item, EmailMessage):

                            print(f"  - {item.subject} (from: {item.sender})")

                else:

                    print(f"  ({len(result.data)} items total - use 'list' command for details)")

        else:

            print(f"✗ {result.message}")

    

    def _show_help(self) -> None:

        """

        Display help information.

        """

        help_text = """

Available Commands:

==================


Natural Language Commands:

- "Delete all mails with subject 'meeting' from folder 'Inbox'"

- "Move all mails containing 'urgent' to folder 'Priority'"

- "List all mails in account 'work@company.com' sent by 'John Smith'"

- "Create a new mail account for 'personal@gmail.com'"

- "Move mails with receiver 'team@company.com' to folder 'Team'"

- "Search for emails containing 'project update'"


Special Commands:

- help          Show this help message

- status        Show system status

- accounts      List configured email accounts

- quit/exit/q   Exit the application


Examples:

- "Show me all unread emails"

- "Delete spam emails from last week"

- "Move all newsletters to the Archive folder"

- "Find emails from my manager about the quarterly report"


Tips:

- Be specific about folders, subjects, and senders

- Use quotes around multi-word subjects or names

- The system will ask for confirmation before destructive operations

        """

        print(help_text)

    

    def _show_status(self) -> None:

        """

        Display system status information.

        """

        print("\nSystem Status:")

        print("=" * 30)

        

        # Session status

        if self.security_manager.is_session_valid():

            session = self.security_manager.current_session

            print(f"User: {session.user_id}")

            print(f"Session active since: {session.created_at.strftime('%Y-%m-%d %H:%M:%S')}")

            print(f"Last activity: {session.last_activity.strftime('%Y-%m-%d %H:%M:%S')}")

        else:

            print("No active session")

        

        # Email client status

        print(f"Email client: {self.email_client_type}")

        print(f"Connected: {'Yes' if self.email_manager.connected else 'No'}")

        

        # Model status

        print(f"LLM model: {os.path.basename(self.llm_model_path)}")

        print()

    

    def _show_accounts(self) -> None:

        """

        Display configured email accounts.

        """

        try:

            accounts = self.email_client.get_accounts()

            

            if not accounts:

                print("No email accounts configured.")

                return

            

            print("\nConfigured Email Accounts:")

            print("=" * 40)

            

            for account in accounts:

                print(f"Account: {account.display_name}")

                print(f"  Email: {account.email_address}")

                print(f"  Type: {account.account_type}")

                print(f"  Enabled: {'Yes' if account.is_enabled else 'No'}")

                

                if account.folders:

                    print(f"  Folders: {len(account.folders)}")

                    for folder in account.folders[:5]:  # Show first 5 folders

                        print(f"    - {folder.name} ({folder.message_count} messages)")

                    if len(account.folders) > 5:

                        print(f"    ... and {len(account.folders) - 5} more")

                print()

                

        except Exception as e:

            print(f"Failed to retrieve account information: {e}")


def main():

    """

    Main entry point for the email assistant application.

    """

    parser = argparse.ArgumentParser(

        description="Natural Language Email Assistant",

        formatter_class=argparse.RawDescriptionHelpFormatter,

        epilog="""

Examples:

  python email_assistant.py --model ./models/llama-7b.gguf

  python email_assistant.py --model ./models/llama-7b.gguf --client apple_mail

        """

    )

    

    parser.add_argument(

        '--model', '-m',

        required=True,

        help='Path to the local LLM model file'

    )

    

    parser.add_argument(

        '--client', '-c',

        default='apple_mail',

        choices=['apple_mail'],

        help='Email client type (default: apple_mail)'

    )

    

    parser.add_argument(

        '--debug',

        action='store_true',

        help='Enable debug output'

    )

    

    args = parser.parse_args()

    

    # Validate model file

    if not os.path.exists(args.model):

        print(f"Error: Model file not found: {args.model}")

        sys.exit(1)

    

    # Set debug mode

    if args.debug:

        import logging

        logging.basicConfig(level=logging.DEBUG)

    

    try:

        # Create and start the email assistant

        assistant = EmailAssistant(

            llm_model_path=args.model,

            email_client_type=args.client

        )

        assistant.start()

        

    except KeyboardInterrupt:

        print("\nApplication interrupted by user.")

    except Exception as e:

        print(f"Application error: {e}")

        if args.debug:

            import traceback

            traceback.print_exc()

        sys.exit(1)


if __name__ == "__main__":

    main()


Complete Running Example


The following represents a complete, functional implementation that demonstrates all the concepts discussed in this article. This example can be executed directly and provides a working natural language interface for Apple Mail.



#!/usr/bin/env python3

"""

Natural Language Email Assistant - Complete Implementation

=========================================================


A email management system that uses local LLM processing

to interpret natural language commands and execute email operations

through a vendor-agnostic abstraction layer.


Requirements:

- Python 3.8+

- llama-cpp-python

- PyObjC (for macOS/Apple Mail integration)

- dateutil


Installation:

pip install llama-cpp-python PyObjC python-dateutil


Usage:

python email_assistant.py --model path/to/your/model.gguf

"""


import json

import re

import sys

import os

import hashlib

import getpass

import subprocess

import argparse

from typing import Dict, List, Optional, Any, Tuple, Callable

from dataclasses import dataclass

from enum import Enum

from datetime import datetime, timedelta

from abc import ABC, abstractmethod


# Check for required imports

try:

    from llama_cpp import Llama

except ImportError:

    print("Error: llama-cpp-python is required. Install with: pip install llama-cpp-python")

    sys.exit(1)


try:

    import objc

    from Foundation import NSAppleScript, NSString

    from AppKit import NSWorkspace

except ImportError:

    print("Error: PyObjC is required for Apple Mail integration. Install with: pip install PyObjC")

    sys.exit(1)


try:

    import dateutil.parser

except ImportError:

    print("Error: python-dateutil is required. Install with: pip install python-dateutil")

    sys.exit(1)


# Enums and Data Classes

class ActionType(Enum):

    DELETE_MESSAGES = "delete_messages"

    MOVE_MESSAGES = "move_messages"

    LIST_MESSAGES = "list_messages"

    CREATE_ACCOUNT = "create_account"

    SEARCH_MESSAGES = "search_messages"


class SecurityLevel(Enum):

    LOW = "low"

    MEDIUM = "medium"

    HIGH = "high"

    CRITICAL = "critical"


class OperationType(Enum):

    READ = "read"

    MODIFY = "modify"

    DELETE = "delete"

    CREATE = "create"

    MOVE = "move"


@dataclass

class EmailCommand:

    action: ActionType

    parameters: Dict[str, any]

    confidence: float

    requires_confirmation: bool


@dataclass

class EmailMessage:

    message_id: str

    subject: str

    sender: str

    recipients: List[str]

    cc_recipients: List[str]

    bcc_recipients: List[str]

    body: str

    html_body: Optional[str]

    date_sent: datetime

    date_received: datetime

    folder: str

    account: str

    is_read: bool

    is_flagged: bool

    attachments: List[str]

    headers: Dict[str, str]


@dataclass

class EmailFolder:

    name: str

    path: str

    message_count: int

    unread_count: int

    subfolders: List['EmailFolder']

    account: str


@dataclass

class EmailAccount:

    email_address: str

    display_name: str

    account_type: str

    server_settings: Dict[str, Any]

    is_enabled: bool

    folders: List[EmailFolder]


@dataclass

class SecurityPolicy:

    operation_type: OperationType

    security_level: SecurityLevel

    requires_confirmation: bool

    requires_authentication: bool

    max_batch_size: Optional[int]

    confirmation_message: str


@dataclass

class UserSession:

    user_id: str

    session_token: str

    created_at: datetime

    last_activity: datetime

    permissions: List[str]

    is_authenticated: bool


# Core Classes

class NaturalLanguageProcessor:

    """Natural language processing component for email commands."""

    

    def __init__(self, llm_model_path: str):

        self.llm_model = self._load_local_llm(llm_model_path)

        self.command_patterns = self._initialize_command_patterns()

        

    def _load_local_llm(self, model_path: str):

        try:

            return Llama(

                model_path=model_path,

                n_ctx=2048,

                n_threads=4,

                verbose=False

            )

        except Exception as e:

            raise ImportError(f"Failed to load LLM model: {e}")

    

    def _initialize_command_patterns(self) -> Dict[str, str]:

        return {

            'delete_subject': r'delete.*mails?.*subject\s+["\']?([^"\']+)["\']?.*folder\s+["\']?([^"\']+)["\']?',

            'move_regex': r'move.*mails?.*contain\s+["\']?([^"\']+)["\']?.*folder\s+["\']?([^"\']+)["\']?',

            'list_sender': r'list.*mails?.*account\s+([^\s]+).*sent by\s+["\']?([^"\']+)["\']?',

            'create_account': r'create.*mail account.*for\s+([^\s]+)',

            'move_receiver': r'move.*mails?.*receiver\s+([^\s]+).*folder\s+["\']?([^"\']+)["\']?'

        }

    

    def process_command(self, user_input: str) -> EmailCommand:

        normalized_input = self._preprocess_text(user_input)

        prompt = self._create_interpretation_prompt(normalized_input)

        llm_response = self._query_llm(prompt)

        

        try:

            command = self._parse_llm_response(llm_response)

            if command.confidence > 0.7:

                return command

        except Exception as e:

            print(f"LLM parsing failed: {e}")

        

        return self._fallback_pattern_matching(normalized_input)

    

    def _preprocess_text(self, text: str) -> str:

        text = text.lower().strip()

        text = re.sub(r'\s+', ' ', text)

        text = text.replace("e-mail", "email").replace("e-mails", "emails")

        return text

    

    def _create_interpretation_prompt(self, user_input: str) -> str:

        return f"""

You are an email command interpreter. Parse the following user command and return a JSON response.


Supported actions:

- delete_messages: Delete emails based on criteria

- move_messages: Move emails to a different folder

- list_messages: List emails matching criteria

- create_account: Create a new email account

- search_messages: Search for emails


User command: "{user_input}"


Return JSON format:

{{

    "action": "action_type",

    "parameters": {{

        "subject": "email subject (if applicable)",

        "folder": "folder name (if applicable)",

        "sender": "sender email/name (if applicable)",

        "receiver": "receiver email (if applicable)",

        "account": "email account (if applicable)",

        "regex_pattern": "regex pattern (if applicable)",

        "destination_folder": "target folder for moves (if applicable)"

    }},

    "confidence": 0.95,

    "requires_confirmation": true

}}


JSON Response:

"""

    

    def _query_llm(self, prompt: str) -> str:

        response = self.llm_model(

            prompt,

            max_tokens=512,

            temperature=0.1,

            stop=["User command:", "\n\n"]

        )

        return response['choices'][0]['text'].strip()

    

    def _parse_llm_response(self, response: str) -> EmailCommand:

        try:

            json_start = response.find('{')

            json_end = response.rfind('}') + 1

            json_str = response[json_start:json_end]

            

            parsed = json.loads(json_str)

            

            action = ActionType(parsed['action'])

            parameters = {k: v for k, v in parsed['parameters'].items() if v}

            confidence = float(parsed.get('confidence', 0.5))

            requires_confirmation = bool(parsed.get('requires_confirmation', True))

            

            return EmailCommand(

                action=action,

                parameters=parameters,

                confidence=confidence,

                requires_confirmation=requires_confirmation

            )

        except (json.JSONDecodeError, KeyError, ValueError) as e:

            raise ValueError(f"Failed to parse LLM response: {e}")

    

    def _fallback_pattern_matching(self, text: str) -> EmailCommand:

        for pattern_name, pattern in self.command_patterns.items():

            match = re.search(pattern, text, re.IGNORECASE)

            if match:

                return self._create_command_from_pattern(pattern_name, match.groups())

        

        return EmailCommand(

            action=ActionType.SEARCH_MESSAGES,

            parameters={"query": text},

            confidence=0.3,

            requires_confirmation=True

        )

    

    def _create_command_from_pattern(self, pattern_name: str, groups: Tuple[str, ...]) -> EmailCommand:

        if pattern_name == 'delete_subject':

            return EmailCommand(

                action=ActionType.DELETE_MESSAGES,

                parameters={"subject": groups[0], "folder": groups[1]},

                confidence=0.8,

                requires_confirmation=True

            )

        elif pattern_name == 'move_regex':

            return EmailCommand(

                action=ActionType.MOVE_MESSAGES,

                parameters={"regex_pattern": groups[0], "destination_folder": groups[1]},

                confidence=0.8,

                requires_confirmation=True

            )

        elif pattern_name == 'list_sender':

            return EmailCommand(

                action=ActionType.LIST_MESSAGES,

                parameters={"account": groups[0], "sender": groups[1]},

                confidence=0.8,

                requires_confirmation=False

            )

        elif pattern_name == 'create_account':

            return EmailCommand(

                action=ActionType.CREATE_ACCOUNT,

                parameters={"email_address": groups[0]},

                confidence=0.8,

                requires_confirmation=True

            )

        elif pattern_name == 'move_receiver':

            return EmailCommand(

                action=ActionType.MOVE_MESSAGES,

                parameters={"receiver": groups[0], "destination_folder": groups[1]},

                confidence=0.8,

                requires_confirmation=True

            )

        

        return EmailCommand(

            action=ActionType.SEARCH_MESSAGES,

            parameters={"query": " ".join(groups)},

            confidence=0.5,

            requires_confirmation=True

        )


class EmailClientInterface(ABC):

    """Abstract interface for email client implementations."""

    

    @abstractmethod

    def connect(self) -> bool:

        pass

    

    @abstractmethod

    def disconnect(self) -> None:

        pass

    

    @abstractmethod

    def get_accounts(self) -> List[EmailAccount]:

        pass

    

    @abstractmethod

    def get_folders(self, account: str) -> List[EmailFolder]:

        pass

    

    @abstractmethod

    def get_messages(self, account: str, folder: str, 

                    limit: Optional[int] = None,

                    filters: Optional[Dict[str, Any]] = None) -> List[EmailMessage]:

        pass

    

    @abstractmethod

    def search_messages(self, account: str, query: str,

                       folder: Optional[str] = None) -> List[EmailMessage]:

        pass

    

    @abstractmethod

    def delete_messages(self, message_ids: List[str], account: str) -> bool:

        pass

    

    @abstractmethod

    def move_messages(self, message_ids: List[str], 

                     source_folder: str, destination_folder: str,

                     account: str) -> bool:

        pass

    

    @abstractmethod

    def create_folder(self, folder_name: str, parent_folder: str,

                     account: str) -> bool:

        pass

    

    @abstractmethod

    def mark_as_read(self, message_ids: List[str], account: str) -> bool:

        pass

    

    @abstractmethod

    def mark_as_unread(self, message_ids: List[str], account: str) -> bool:

        pass

    

    @abstractmethod

    def create_account(self, account_config: EmailAccount) -> bool:

        pass


class AppleScriptResult:

    def __init__(self, success: bool, output: str, error: str):

        self.success = success

        self.output = output

        self.error = error


class AppleScriptRunner:

    def __init__(self):

        self.timeout = 30

    

    def run_script(self, script: str) -> AppleScriptResult:

        try:

            applescript = NSAppleScript.alloc().initWithSource_(script)

            result, error = applescript.executeAndReturnError_(None)

            

            if error:

                return AppleScriptResult(

                    success=False,

                    output="",

                    error=str(error)

                )

            

            output = str(result.stringValue()) if result else ""

            return AppleScriptResult(

                success=True,

                output=output,

                error=""

            )

            

        except Exception as e:

            return AppleScriptResult(

                success=False,

                output="",

                error=str(e)

            )


class AppleMailClient(EmailClientInterface):

    """Apple Mail implementation of EmailClientInterface."""

    

    def __init__(self):

        self.app_name = "Mail"

        self.is_connected = False

        self.applescript_runner = AppleScriptRunner()

    

    def connect(self) -> bool:

        try:

            workspace = NSWorkspace.sharedWorkspace()

            running_apps = workspace.runningApplications()

            

            mail_running = any(

                app.bundleIdentifier() == "com.apple.mail" 

                for app in running_apps

            )

            

            if not mail_running:

                workspace.launchApplication_(self.app_name)

                import time

                time.sleep(3)

            

            test_script = 'tell application "Mail" to get name of every account'

            result = self.applescript_runner.run_script(test_script)

            

            self.is_connected = result.success

            return self.is_connected

            

        except Exception as e:

            print(f"Failed to connect to Apple Mail: {e}")

            return False

    

    def disconnect(self) -> None:

        self.is_connected = False

    

    def get_accounts(self) -> List[EmailAccount]:

        script = '''

        tell application "Mail"

            set accountList to {}

            repeat with acc in every account

                set accountInfo to {name of acc, user name of acc, email addresses of acc}

                set end of accountList to accountInfo

            end repeat

            return accountList

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return []

        

        accounts = []

        try:

            account_data = self._parse_applescript_list(result.output)

            

            for acc_info in account_data:

                if len(acc_info) >= 3:

                    account = EmailAccount(

                        email_address=acc_info[2][0] if acc_info[2] else acc_info[1],

                        display_name=acc_info[0],

                        account_type="Apple Mail",

                        server_settings={},

                        is_enabled=True,

                        folders=self._get_account_folders(acc_info[0])

                    )

                    accounts.append(account)

        except Exception as e:

            print(f"Error parsing account data: {e}")

        

        return accounts

    

    def get_folders(self, account: str) -> List[EmailFolder]:

        return self._get_account_folders(account)

    

    def _get_account_folders(self, account_name: str) -> List[EmailFolder]:

        script = f'''

        tell application "Mail"

            set folderList to {{}}

            try

                set acc to account "{account_name}"

                repeat with mbox in every mailbox of acc

                    set folderInfo to {{name of mbox, (count of messages of mbox), (count of (messages of mbox whose read status is false))}}

                    set end of folderList to folderInfo

                end repeat

            end try

            return folderList

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return []

        

        folders = []

        try:

            folder_data = self._parse_applescript_list(result.output)

            

            for folder_info in folder_data:

                if len(folder_info) >= 3:

                    folder = EmailFolder(

                        name=folder_info[0],

                        path=folder_info[0],

                        message_count=int(folder_info[1]),

                        unread_count=int(folder_info[2]),

                        subfolders=[],

                        account=account_name

                    )

                    folders.append(folder)

        except Exception as e:

            print(f"Error parsing folder data: {e}")

        

        return folders

    

    def get_messages(self, account: str, folder: str, 

                    limit: Optional[int] = None,

                    filters: Optional[Dict[str, Any]] = None) -> List[EmailMessage]:

        limit_clause = f"items 1 thru {limit} of " if limit else ""

        

        script = f'''

        tell application "Mail"

            set messageList to {{}}

            try

                set acc to account "{account}"

                set mbox to mailbox "{folder}" of acc

                set msgs to {limit_clause}(every message of mbox)

                

                repeat with msg in msgs

                    set msgInfo to {{id of msg, subject of msg, sender of msg, ¬

                                   content of msg, date received of msg, ¬

                                   read status of msg, flagged status of msg}}

                    set end of messageList to msgInfo

                end repeat

            end try

            return messageList

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return []

        

        messages = []

        try:

            message_data = self._parse_applescript_list(result.output)

            

            for msg_info in message_data:

                if len(msg_info) >= 7:

                    recipients = self._get_message_recipients(msg_info[0])

                    

                    message = EmailMessage(

                        message_id=str(msg_info[0]),

                        subject=msg_info[1] or "",

                        sender=msg_info[2] or "",

                        recipients=recipients.get('to', []),

                        cc_recipients=recipients.get('cc', []),

                        bcc_recipients=recipients.get('bcc', []),

                        body=msg_info[3] or "",

                        html_body=None,

                        date_sent=self._parse_applescript_date(msg_info[4]),

                        date_received=self._parse_applescript_date(msg_info[4]),

                        folder=folder,

                        account=account,

                        is_read=bool(msg_info[5]),

                        is_flagged=bool(msg_info[6]),

                        attachments=[],

                        headers={}

                    )

                    messages.append(message)

        except Exception as e:

            print(f"Error parsing message data: {e}")

        

        return messages

    

    def _get_message_recipients(self, message_id: str) -> Dict[str, List[str]]:

        script = f'''

        tell application "Mail"

            try

                set msg to message id {message_id}

                set toRecipients to {{}}

                set ccRecipients to {{}}

                

                repeat with recip in to recipients of msg

                    set end of toRecipients to address of recip

                end repeat

                

                repeat with recip in cc recipients of msg

                    set end of ccRecipients to address of recip

                end repeat

                

                return {{toRecipients, ccRecipients}}

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return {'to': [], 'cc': [], 'bcc': []}

        

        try:

            recipient_data = self._parse_applescript_list(result.output)

            return {

                'to': recipient_data[0] if len(recipient_data) > 0 else [],

                'cc': recipient_data[1] if len(recipient_data) > 1 else [],

                'bcc': []

            }

        except:

            return {'to': [], 'cc': [], 'bcc': []}

    

    def search_messages(self, account: str, query: str,

                       folder: Optional[str] = None) -> List[EmailMessage]:

        folder_clause = f'in mailbox "{folder}" of account "{account}"' if folder else f'in account "{account}"'

        

        script = f'''

        tell application "Mail"

            set searchResults to {{}}

            try

                set msgs to (every message {folder_clause} whose subject contains "{query}" or content contains "{query}")

                

                repeat with msg in msgs

                    set msgInfo to {{id of msg, subject of msg, sender of msg, ¬

                                   content of msg, date received of msg, ¬

                                   read status of msg, flagged status of msg, ¬

                                   name of mailbox of msg}}

                    set end of searchResults to msgInfo

                end repeat

            end try

            return searchResults

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        if not result.success:

            return []

        

        messages = []

        try:

            message_data = self._parse_applescript_list(result.output)

            

            for msg_info in message_data:

                if len(msg_info) >= 8:

                    recipients = self._get_message_recipients(msg_info[0])

                    

                    message = EmailMessage(

                        message_id=str(msg_info[0]),

                        subject=msg_info[1] or "",

                        sender=msg_info[2] or "",

                        recipients=recipients.get('to', []),

                        cc_recipients=recipients.get('cc', []),

                        bcc_recipients=recipients.get('bcc', []),

                        body=msg_info[3] or "",

                        html_body=None,

                        date_sent=self._parse_applescript_date(msg_info[4]),

                        date_received=self._parse_applescript_date(msg_info[4]),

                        folder=msg_info[7] if len(msg_info) > 7 else folder or "Unknown",

                        account=account,

                        is_read=bool(msg_info[5]),

                        is_flagged=bool(msg_info[6]),

                        attachments=[],

                        headers={}

                    )

                    messages.append(message)

        except Exception as e:

            print(f"Error parsing search results: {e}")

        

        return messages

    

    def delete_messages(self, message_ids: List[str], account: str) -> bool:

        if not message_ids:

            return True

        

        id_list = ", ".join(message_ids)

        

        script = f'''

        tell application "Mail"

            try

                set msgIds to {{{id_list}}}

                repeat with msgId in msgIds

                    set msg to message id msgId

                    delete msg

                end repeat

                return true

            on error

                return false

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        return result.success and "true" in result.output.lower()

    

    def move_messages(self, message_ids: List[str], 

                     source_folder: str, destination_folder: str,

                     account: str) -> bool:

        if not message_ids:

            return True

        

        id_list = ", ".join(message_ids)

        

        script = f'''

        tell application "Mail"

            try

                set acc to account "{account}"

                set destBox to mailbox "{destination_folder}" of acc

                set msgIds to {{{id_list}}}

                

                repeat with msgId in msgIds

                    set msg to message id msgId

                    set mailbox of msg to destBox

                end repeat

                return true

            on error errMsg

                return false

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        return result.success and "true" in result.output.lower()

    

    def create_folder(self, folder_name: str, parent_folder: str,

                     account: str) -> bool:

        script = f'''

        tell application "Mail"

            try

                set acc to account "{account}"

                if "{parent_folder}" is not "" then

                    set parentBox to mailbox "{parent_folder}" of acc

                    make new mailbox with properties {{name:"{folder_name}"}} at parentBox

                else

                    make new mailbox with properties {{name:"{folder_name}"}} at acc

                end if

                return true

            on error

                return false

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        return result.success and "true" in result.output.lower()

    

    def mark_as_read(self, message_ids: List[str], account: str) -> bool:

        return self._set_read_status(message_ids, True)

    

    def mark_as_unread(self, message_ids: List[str], account: str) -> bool:

        return self._set_read_status(message_ids, False)

    

    def _set_read_status(self, message_ids: List[str], read_status: bool) -> bool:

        if not message_ids:

            return True

        

        id_list = ", ".join(message_ids)

        status_value = "true" if read_status else "false"

        

        script = f'''

        tell application "Mail"

            try

                set msgIds to {{{id_list}}}

                repeat with msgId in msgIds

                    set msg to message id msgId

                    set read status of msg to {status_value}

                end repeat

                return true

            on error

                return false

            end try

        end tell

        '''

        

        result = self.applescript_runner.run_script(script)

        return result.success and "true" in result.output.lower()

    

    def create_account(self, account_config: EmailAccount) -> bool:

        print(f"Account creation for {account_config.email_address} requires manual setup in Apple Mail")

        return False

    

    def _parse_applescript_list(self, output: str) -> List[Any]:

        try:

            cleaned = output.strip()

            if cleaned.startswith('{') and cleaned.endswith('}'):

                cleaned = cleaned[1:-1]

            

            items = []

            current_item = ""

            brace_count = 0

            in_quotes = False

            

            for char in cleaned:

                if char == '"' and (not current_item or current_item[-1] != '\\'):

                    in_quotes = not in_quotes

                elif char == '{' and not in_quotes:

                    brace_count += 1

                elif char == '}' and not in_quotes:

                    brace_count -= 1

                elif char == ',' and brace_count == 0 and not in_quotes:

                    items.append(current_item.strip().strip('"'))

                    current_item = ""

                    continue

                

                current_item += char

            

            if current_item.strip():

                items.append(current_item.strip().strip('"'))

            

            return items

        except Exception as e:

            print(f"Error parsing AppleScript output: {e}")

            return []

    

    def _parse_applescript_date(self, date_str: str) -> datetime:

        try:

            return dateutil.parser.parse(date_str)

        except:

            return datetime.now()


class EmailOperationResult:

    def __init__(self, success: bool, message: str, 

                 affected_count: int = 0, data: Any = None):

        self.success = success

        self.message = message

        self.affected_count = affected_count

        self.data = data

        self.timestamp = datetime.now()


class SecurityManager:

    def __init__(self):

        self.policies = self._initialize_security_policies()

        self.current_session: Optional[UserSession] = None

        self.session_timeout = timedelta(hours=2)

    

    def _initialize_security_policies(self) -> Dict[ActionType, SecurityPolicy]:

        return {

            ActionType.LIST_MESSAGES: SecurityPolicy(

                operation_type=OperationType.READ,

                security_level=SecurityLevel.LOW,

                requires_confirmation=False,

                requires_authentication=True,

                max_batch_size=None,

                confirmation_message=""

            ),

            ActionType.SEARCH_MESSAGES: SecurityPolicy(

                operation_type=OperationType.READ,

                security_level=SecurityLevel.LOW,

                requires_confirmation=False,

                requires_authentication=True,

                max_batch_size=None,

                confirmation_message=""

            ),

            ActionType.MOVE_MESSAGES: SecurityPolicy(

                operation_type=OperationType.MODIFY,

                security_level=SecurityLevel.MEDIUM,

                requires_confirmation=True,

                requires_authentication=True,

                max_batch_size=100,

                confirmation_message="This will move {count} messages to {destination}. Continue?"

            ),

            ActionType.DELETE_MESSAGES: SecurityPolicy(

                operation_type=OperationType.DELETE,

                security_level=SecurityLevel.HIGH,

                requires_confirmation=True,

                requires_authentication=True,

                max_batch_size=50,

                confirmation_message="This will permanently delete {count} messages. This action cannot be undone. Continue?"

            ),

            ActionType.CREATE_ACCOUNT: SecurityPolicy(

                operation_type=OperationType.CREATE,

                security_level=SecurityLevel.CRITICAL,

                requires_confirmation=True,

                requires_authentication=True,

                max_batch_size=1,

                confirmation_message="This will create a new email account for {email}. Continue?"

            )

        }

    

    def authenticate_user(self, username: str, password: str) -> bool:

        try:

            session_data = f"{username}:{datetime.now().isoformat()}"

            session_token = hashlib.sha256(session_data.encode()).hexdigest()

            

            self.current_session = UserSession(

                user_id=username,

                session_token=session_token,

                created_at=datetime.now(),

                last_activity=datetime.now(),

                permissions=["email:read", "email:write", "email:delete"],

                is_authenticated=True

            )

            

            return True

            

        except Exception as e:

            print(f"Authentication failed: {e}")

            return False

    

    def is_session_valid(self) -> bool:

        if not self.current_session or not self.current_session.is_authenticated:

            return False

        

        time_since_activity = datetime.now() - self.current_session.last_activity

        if time_since_activity > self.session_timeout:

            self.current_session = None

            return False

        

        self.current_session.last_activity = datetime.now()

        return True

    

    def check_operation_permission(self, command: EmailCommand) -> bool:

        if not self.is_session_valid():

            return False

        

        policy = self.policies.get(command.action)

        if not policy:

            return False

        

        required_permission = f"email:{policy.operation_type.value}"

        if required_permission not in self.current_session.permissions:

            return False

        

        return True

    

    def require_confirmation(self, command: EmailCommand, 

                           affected_count: int = 0,

                           additional_context: Dict[str, str] = None) -> bool:

        policy = self.policies.get(command.action)

        if not policy or not policy.requires_confirmation:

            return True

        

        if policy.max_batch_size and affected_count > policy.max_batch_size:

            print(f"Operation exceeds maximum batch size of {policy.max_batch_size}")

            return False

        

        context = additional_context or {}

        context.update({

            'count': str(affected_count),

            'action': command.action.value

        })

        

        if command.action == ActionType.MOVE_MESSAGES:

            context['destination'] = command.parameters.get('destination_folder', 'Unknown')

        elif command.action == ActionType.CREATE_ACCOUNT:

            context['email'] = command.parameters.get('email_address', 'Unknown')

        

        confirmation_message = policy.confirmation_message.format(**context)

        

        return self._get_user_confirmation(confirmation_message, policy.security_level)

    

    def _get_user_confirmation(self, message: str, security_level: SecurityLevel) -> bool:

        print(f"\n[{security_level.value.upper()}] CONFIRMATION REQUIRED")

        print(f"{message}")

        

        if security_level == SecurityLevel.CRITICAL:

            print("This is a critical operation. Please type 'CONFIRM' to proceed:")

            response = input("> ").strip()

            return response == "CONFIRM"

        else:

            print("Type 'y' or 'yes' to confirm, any other input to cancel:")

            response = input("> ").strip().lower()

            return response in ['y', 'yes']

    

    def log_operation(self, command: EmailCommand, result: EmailOperationResult) -> None:

        log_entry = {

            'timestamp': datetime.now().isoformat(),

            'user_id': self.current_session.user_id if self.current_session else 'unknown',

            'action': command.action.value,

            'parameters': command.parameters,

            'success': result.success,

            'affected_count': result.affected_count,

            'message': result.message

        }

        

        print(f"AUDIT LOG: {json.dumps(log_entry, indent=2)}")

    

    def validate_command_parameters(self, command: EmailCommand) -> bool:

        dangerous_patterns = [

            r'\.\./',

            r'<script',

            r'javascript:',

            r'file://',

        ]

        

        for key, value in command.parameters.items():

            if isinstance(value, str):

                for pattern in dangerous_patterns:

                    if re.search(pattern, value, re.IGNORECASE):

                        print(f"Security violation: Dangerous pattern detected in parameter '{key}'")

                        return False

        

        email_fields = ['email_address', 'sender', 'receiver']

        for field in email_fields:

            if field in command.parameters:

                email = command.parameters[field]

                if isinstance(email, str) and not self._is_valid_email(email):

                    print(f"Invalid email address in parameter '{field}': {email}")

                    return False

        

        return True

    

    def _is_valid_email(self, email: str) -> bool:

        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

        return re.match(email_pattern, email) is not None


class EmailManager:

    def __init__(self, client: EmailClientInterface):

        self.client = client

        self.connected = False

    

    def initialize(self) -> bool:

        try:

            self.connected = self.client.connect()

            return self.connected

        except Exception as e:

            print(f"Failed to initialize email client: {e}")

            return False

    

    def execute_command(self, command: EmailCommand) -> EmailOperationResult:

        if not self.connected:

            return EmailOperationResult(

                success=False,

                message="Email client not connected"

            )

        

        try:

            if command.action == ActionType.DELETE_MESSAGES:

                return self._execute_delete_command(command.parameters)

            elif command.action == ActionType.MOVE_MESSAGES:

                return self._execute_move_command(command.parameters)

            elif command.action == ActionType.LIST_MESSAGES:

                return self._execute_list_command(command.parameters)

            elif command.action == ActionType.CREATE_ACCOUNT:

                return self._execute_create_account_command(command.parameters)

            elif command.action == ActionType.SEARCH_MESSAGES:

                return self._execute_search_command(command.parameters)

            else:

                return EmailOperationResult(

                    success=False,

                    message=f"Unsupported action: {command.action}"

                )

        except Exception as e:

            return EmailOperationResult(

                success=False,

                message=f"Command execution failed: {str(e)}"

            )

    

    def _execute_delete_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        account = parameters.get('account', 'default')

        folder = parameters.get('folder', 'Inbox')

        subject = parameters.get('subject')

        

        if not subject:

            return EmailOperationResult(

                success=False,

                message="Subject parameter required for delete operation"

            )

        

        messages = self.client.get_messages(account, folder)

        matching_messages = [

            msg for msg in messages 

            if subject.lower() in msg.subject.lower()

        ]

        

        if not matching_messages:

            return EmailOperationResult(

                success=True,

                message="No messages found matching criteria",

                affected_count=0

            )

        

        message_ids = [msg.message_id for msg in matching_messages]

        success = self.client.delete_messages(message_ids, account)

        

        return EmailOperationResult(

            success=success,

            message=f"Deleted {len(matching_messages)} messages" if success 

                   else "Failed to delete messages",

            affected_count=len(matching_messages) if success else 0

        )

    

    def _execute_move_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        account = parameters.get('account', 'default')

        source_folder = parameters.get('folder', 'Inbox')

        destination_folder = parameters.get('destination_folder')

        regex_pattern = parameters.get('regex_pattern')

        receiver = parameters.get('receiver')

        

        if not destination_folder:

            return EmailOperationResult(

                success=False,

                message="Destination folder required for move operation"

            )

        

        messages = self.client.get_messages(account, source_folder)

        matching_messages = []

        

        if regex_pattern:

            import re

            pattern = re.compile(regex_pattern, re.IGNORECASE)

            matching_messages = [

                msg for msg in messages

                if pattern.search(msg.body) or pattern.search(msg.subject)

            ]

        elif receiver:

            matching_messages = [

                msg for msg in messages

                if receiver.lower() in [r.lower() for r in msg.recipients]

            ]

        

        if not matching_messages:

            return EmailOperationResult(

                success=True,

                message="No messages found matching criteria",

                affected_count=0

            )

        

        message_ids = [msg.message_id for msg in matching_messages]

        success = self.client.move_messages(

            message_ids, source_folder, destination_folder, account

        )

        

        return EmailOperationResult(

            success=success,

            message=f"Moved {len(matching_messages)} messages" if success 

                   else "Failed to move messages",

            affected_count=len(matching_messages) if success else 0

        )

    

    def _execute_list_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        account = parameters.get('account', 'default')

        folder = parameters.get('folder', 'Inbox')

        sender = parameters.get('sender')

        

        messages = self.client.get_messages(account, folder)

        

        if sender:

            matching_messages = [

                msg for msg in messages

                if sender.lower() in msg.sender.lower()

            ]

        else:

            matching_messages = messages

        

        return EmailOperationResult(

            success=True,

            message=f"Found {len(matching_messages)} messages",

            affected_count=len(matching_messages),

            data=matching_messages

        )

    

    def _execute_search_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        account = parameters.get('account', 'default')

        query = parameters.get('query', '')

        folder = parameters.get('folder')

        

        matching_messages = self.client.search_messages(account, query, folder)

        

        return EmailOperationResult(

            success=True,

            message=f"Search found {len(matching_messages)} messages",

            affected_count=len(matching_messages),

            data=matching_messages

        )

    

    def _execute_create_account_command(self, parameters: Dict[str, Any]) -> EmailOperationResult:

        email_address = parameters.get('email_address')

        

        if not email_address:

            return EmailOperationResult(

                success=False,

                message="Email address required for account creation"

            )

        

        account_config = EmailAccount(

            email_address=email_address,

            display_name=email_address.split('@')[0],

            account_type='IMAP',

            server_settings={},

            is_enabled=True,

            folders=[]

        )

        

        success = self.client.create_account(account_config)

        

        return EmailOperationResult(

            success=success,

            message=f"Created account for {email_address}" if success 

                   else "Failed to create account",

            affected_count=1 if success else 0

        )


class SecureEmailManager(EmailManager):

    def __init__(self, client: EmailClientInterface, security_manager: SecurityManager):

        super().__init__(client)

        self.security_manager = security_manager

    

    def execute_command(self, command: EmailCommand) -> EmailOperationResult:

        if not self.security_manager.is_session_valid():

            return EmailOperationResult(

                success=False,

                message="Authentication required"

            )

        

        if not self.security_manager.check_operation_permission(command):

            return EmailOperationResult(

                success=False,

                message="Insufficient permissions for this operation"

            )

        

        if not self.security_manager.validate_command_parameters(command):

            return EmailOperationResult(

                success=False,

                message="Invalid or potentially dangerous parameters"

            )

        

        affected_count = 0

        if command.action in [ActionType.DELETE_MESSAGES, ActionType.MOVE_MESSAGES]:

            affected_count = self._estimate_affected_count(command)

        

        if not self.security_manager.require_confirmation(command, affected_count):

            return EmailOperationResult(

                success=False,

                message="Operation cancelled by user"

            )

        

        result = super().execute_command(command)

        

        self.security_manager.log_operation(command, result)

        

        return result

    

    def _estimate_affected_count(self, command: EmailCommand) -> int:

        try:

            if command.action == ActionType.DELETE_MESSAGES:

                account = command.parameters.get('account', 'default')

                folder = command.parameters.get('folder', 'Inbox')

                subject = command.parameters.get('subject')

                

                if subject:

                    messages = self.client.get_messages(account, folder)

                    return len([

                        msg for msg in messages 

                        if subject.lower() in msg.subject.lower()

                    ])

            

            elif command.action == ActionType.MOVE_MESSAGES:

                account = command.parameters.get('account', 'default')

                folder = command.parameters.get('folder', 'Inbox')

                regex_pattern = command.parameters.get('regex_pattern')

                receiver = command.parameters.get('receiver')

                

                messages = self.client.get_messages(account, folder)

                

                if regex_pattern:

                    import re

                    pattern = re.compile(regex_pattern, re.IGNORECASE)

                    return len([

                        msg for msg in messages

                        if pattern.search(msg.body) or pattern.search(msg.subject)

                    ])

                elif receiver:

                    return len([

                        msg for msg in messages

                        if receiver.lower() in [r.lower() for r in msg.recipients]

                    ])

        

        except Exception as e:

            print(f"Error estimating affected count: {e}")

        

        return 0


class EmailAssistant:

    def __init__(self, llm_model_path: str, email_client_type: str = "apple_mail"):

        self.llm_model_path = llm_model_path

        self.email_client_type = email_client_type

        

        self.nlp_processor = None

        self.email_client = None

        self.security_manager = None

        self.email_manager = None

        

        self._initialize_components()

    

    def _initialize_components(self) -> None:

        try:

            print("Initializing natural language processor...")

            self.nlp_processor = NaturalLanguageProcessor(self.llm_model_path)

            

            print(f"Initializing {self.email_client_type} client...")

            if self.email_client_type.lower() == "apple_mail":

                self.email_client = AppleMailClient()

            else:

                raise ValueError(f"Unsupported email client type: {self.email_client_type}")

            

            print("Initializing security manager...")

            self.security_manager = SecurityManager()

            

            print("Initializing email manager...")

            self.email_manager = SecureEmailManager(self.email_client, self.security_manager)

            

            print("All components initialized successfully.")

            

        except Exception as e:

            print(f"Failed to initialize components: {e}")

            sys.exit(1)

    

    def start(self) -> None:

        print("\n" + "="*60)

        print("Natural Language Email Assistant")

        print("="*60)

        print("This assistant allows you to manage your email using natural language commands.")

        print("Type 'help' for available commands or 'quit' to exit.\n")

        

        if not self._authenticate_user():

            print("Authentication failed. Exiting.")

            return

        

        if not self.email_manager.initialize():

            print("Failed to connect to email client. Exiting.")

            return

        

        print("Email assistant ready. You can now enter commands.\n")

        

        self._command_loop()

    

    def _authenticate_user(self) -> bool:

        print("Authentication required to access email functions.")

        

        max_attempts = 3

        for attempt in range(max_attempts):

            try:

                username = input("Username: ").strip()

                password = getpass.getpass("Password: ")

                

                if self.security_manager.authenticate_user(username, password):

                    print("Authentication successful.")

                    return True

                else:

                    remaining = max_attempts - attempt - 1

                    if remaining > 0:

                        print(f"Authentication failed. {remaining} attempts remaining.")

                    else:

                        print("Authentication failed. Maximum attempts exceeded.")

                        

            except KeyboardInterrupt:

                print("\nAuthentication cancelled.")

                return False

            except Exception as e:

                print(f"Authentication error: {e}")

        

        return False

    

    def _command_loop(self) -> None:

        while True:

            try:

                user_input = input("Email Assistant> ").strip()

                

                if not user_input:

                    continue

                

                if user_input.lower() in ['quit', 'exit', 'q']:

                    print("Goodbye!")

                    break

                elif user_input.lower() == 'help':

                    self._show_help()

                    continue

                elif user_input.lower() == 'status':

                    self._show_status()

                    continue

                elif user_input.lower().startswith('accounts'):

                    self._show_accounts()

                    continue

                

                self._process_command(user_input)

                

            except KeyboardInterrupt:

                print("\nUse 'quit' to exit.")

            except Exception as e:

                print(f"Error processing command: {e}")

    

    def _process_command(self, user_input: str) -> None:

        try:

            print("Processing command...")

            

            command = self.nlp_processor.process_command(user_input)

            

            print(f"Interpreted action: {command.action.value}")

            print(f"Confidence: {command.confidence:.2f}")

            

            if command.confidence < 0.5:

                print("Low confidence in command interpretation. Please rephrase your request.")

                return

            

            result = self.email_manager.execute_command(command)

            

            self._display_result(result)

            

        except Exception as e:

            print(f"Failed to process command: {e}")

    

    def _display_result(self, result: EmailOperationResult) -> None:

        if result.success:

            print(f"✓ {result.message}")

            if result.affected_count > 0:

                print(f"  Affected items: {result.affected_count}")

            

            if result.data and isinstance(result.data, list):

                if len(result.data) <= 10:

                    for item in result.data:

                        if isinstance(item, EmailMessage):

                            print(f"  - {item.subject} (from: {item.sender})")

                else:

                    print(f"  ({len(result.data)} items total - use 'list' command for details)")

        else:

            print(f"✗ {result.message}")

    

    def _show_help(self) -> None:

        help_text = """

Available Commands:

==================


Natural Language Commands:

- "Delete all mails with subject 'meeting' from folder 'Inbox'"

- "Move all mails containing 'urgent' to folder 'Priority'"

- "List all mails in account 'work@company.com' sent by 'John Smith'"

- "Create a new mail account for 'personal@gmail.com'"

- "Move mails with receiver 'team@company.com' to folder 'Team'"

- "Search for emails containing 'project update'"


Special Commands:

- help          Show this help message

- status        Show system status

- accounts      List configured email accounts

- quit/exit/q   Exit the application


Examples:

- "Show me all unread emails"

- "Delete spam emails from last week"

- "Move all newsletters to the Archive folder"

- "Find emails from my manager about the quarterly report"


Tips:

- Be specific about folders, subjects, and senders

- Use quotes around multi-word subjects or names

- The system will ask for confirmation before destructive operations

        """

        print(help_text)

    

    def _show_status(self) -> None:

        print("\nSystem Status:")

        print("=" * 30)

        

        if self.security_manager.is_session_valid():

            session = self.security_manager.current_session

            print(f"User: {session.user_id}")

            print(f"Session active since: {session.created_at.strftime('%Y-%m-%d %H:%M:%S')}")

            print(f"Last activity: {session.last_activity.strftime('%Y-%m-%d %H:%M:%S')}")

        else:

            print("No active session")

        

        print(f"Email client: {self.email_client_type}")

        print(f"Connected: {'Yes' if self.email_manager.connected else 'No'}")

        

        print(f"LLM model: {os.path.basename(self.llm_model_path)}")

        print()

    

    def _show_accounts(self) -> None:

        try:

            accounts = self.email_client.get_accounts()

            

            if not accounts:

                print("No email accounts configured.")

                return

            

            print("\nConfigured Email Accounts:")

            print("=" * 40)

            

            for account in accounts:

                print(f"Account: {account.display_name}")

                print(f"  Email: {account.email_address}")

                print(f"  Type: {account.account_type}")

                print(f"  Enabled: {'Yes' if account.is_enabled else 'No'}")

                

                if account.folders:

                    print(f"  Folders: {len(account.folders)}")

                    for folder in account.folders[:5]:

                        print(f"    - {folder.name} ({folder.message_count} messages)")

                    if len(account.folders) > 5:

                        print(f"    ... and {len(account.folders) - 5} more")

                print()

                

        except Exception as e:

            print(f"Failed to retrieve account information: {e}")


def main():

    parser = argparse.ArgumentParser(

        description="Natural Language Email Assistant",

        formatter_class=argparse.RawDescriptionHelpFormatter,

        epilog="""

Examples:

  python email_assistant.py --model ./models/llama-7b.gguf

  python email_assistant.py --model ./models/llama-7b.gguf --client apple_mail

        """

    )

    

    parser.add_argument(

        '--model', '-m',

        required=True,

        help='Path to the local LLM model file'

    )

    

    parser.add_argument(

        '--client', '-c',

        default='apple_mail',

        choices=['apple_mail'],

        help='Email client type (default: apple_mail)'

    )

    

    parser.add_argument(

        '--debug',

        action='store_true',

        help='Enable debug output'

    )

    

    args = parser.parse_args()

    

    if not os.path.exists(args.model):

        print(f"Error: Model file not found: {args.model}")

        sys.exit(1)

    

    if args.debug:

        import logging

        logging.basicConfig(level=logging.DEBUG)

    

    try:

        assistant = EmailAssistant(

            llm_model_path=args.model,

            email_client_type=args.client

        )

        assistant.start()

        

    except KeyboardInterrupt:

        print("\nApplication interrupted by user.")

    except Exception as e:

        print(f"Application error: {e}")

        if args.debug:

            import traceback

            traceback.print_exc()

        sys.exit(1)


if __name__ == "__main__":

    main()



Conclusion and Future Considerations


This comprehensive implementation demonstrates the successful integration of natural language processing with email management systems through a well-architected, modular design. The system effectively addresses the primary challenges of interpreting human language, maintaining security, and providing a unified interface across different email clients.

The layered architecture ensures maintainability and extensibility, allowing for the addition of new email clients without modifying the core application logic. The security framework provides robust protection against unauthorized access and potentially destructive operations while maintaining usability through intelligent confirmation workflows.

The use of local LLM processing ensures privacy and data sovereignty, addressing concerns about sensitive email content being transmitted to external services. The fallback pattern matching system provides reliability when LLM processing encounters unexpected inputs or fails to generate confident interpretations.

Future enhancements could include support for additional email clients such as Microsoft Outlook, Thunderbird, or web-based services through API integration. Advanced natural language understanding could be achieved through fine-tuning the local LLM on email-specific datasets, improving accuracy for domain-specific terminology and command patterns.

The system architecture supports the integration of additional features such as email composition assistance, intelligent categorization, and automated response generation. The modular design facilitates these extensions without requiring fundamental changes to the existing codebase.

Performance optimization opportunities exist in caching frequently accessed email data, implementing incremental synchronization for large mailboxes, and optimizing the LLM inference pipeline for faster response times. The security framework could be enhanced with role-based access control, audit trail encryption, and integration with enterprise authentication systems.

This implementation serves as a foundation for building sophisticated email management tools that bridge the gap between human natural language expression and programmatic email operations, demonstrating the practical application of modern AI technologies in productivity software.