Thursday, December 11, 2025

BUILDING IOT SYSTEMS WITH CAPABILITY-CENTRIC ARCHITECTURE

 



INTRODUCTION: BRIDGING TWO WORLDS

Imagine a building climate control system that spans from tiny microcontrollers reading temperature sensors to cloud-based analytics processing millions of data points. Traditional software architecture forces you to choose between two incompatible worlds. Either you write low-level embedded code with direct hardware access and minimal abstraction, or you build enterprise applications with layers of indirection and sophisticated frameworks. Capability-Centric Architecture dissolves this false dichotomy.

This tutorial guides you through building a production-ready IoT climate control system using Capability-Centric Architecture. We will construct a complete system where IoT nodes measure temperature and control fans, edge devices coordinate room-level climate management, and an enterprise application provides building-wide strategy and analytics. Every line of code you see here is production-ready, not simplified demonstrations or mock implementations.

The system we build demonstrates all core CCA concepts in action. You will see how the Capability Nucleus structure separates domain logic from infrastructure concerns. You will witness Capability Contracts enabling independent evolution of components. You will observe Efficiency Gradients allowing microsecond-critical interrupt handlers to coexist with flexible enterprise services. You will understand how the Capability Registry prevents circular dependencies while managing complex initialization sequences. Most importantly, you will grasp how a single architectural pattern elegantly spans from bare-metal microcontrollers to cloud-native enterprise platforms.

SYSTEM OVERVIEW: THE CLIMATE CONTROL CHALLENGE

Our climate control system manages temperature across multiple rooms in a building. Each room contains several components working in concert. Temperature sensors continuously measure ambient conditions. Cooling fans adjust airflow based on control decisions. An edge device coordinates all components within a room, calculating average temperatures and making local control decisions. The enterprise application orchestrates building-wide climate strategy, collects analytics, and provides user interfaces for facility managers.

The architecture reflects real-world constraints. IoT sensor nodes run on resource-constrained microcontrollers with limited RAM and processing power. They must respond to hardware interrupts within microseconds to maintain accurate measurements. Fan actuator nodes control physical hardware through pulse-width modulation with strict timing requirements. Edge devices run on more capable hardware, perhaps a Raspberry Pi or similar embedded Linux system, with sufficient resources for Python-based coordination logic. The enterprise application runs in the cloud with horizontal scaling, database persistence, and sophisticated analytics capabilities.

This heterogeneity presents the exact challenge that Capability-Centric Architecture addresses. We need microsecond-level performance for sensor reading, moderate abstraction for edge coordination, and full enterprise flexibility for the cloud application. Traditional architectures force separate designs for each tier. CCA provides a unified approach.

CORE CONCEPT ONE: THE CAPABILITY NUCLEUS

Before diving into code, we must understand the Capability Nucleus, the fundamental structural pattern of CCA. Every capability, whether running on a microcontroller or in the cloud, follows this three-layer structure.

The innermost layer is the Essence. This contains pure domain logic with zero dependencies on infrastructure. For a temperature sensor capability, the Essence contains calibration algorithms and validation rules. For a climate strategy capability, the Essence contains the business rules determining optimal temperatures. The Essence is completely testable in isolation because it has no external dependencies.

The middle layer is the Realization. This integrates the Essence with the real world. For embedded systems, the Realization accesses hardware registers, configures interrupts, and manages DMA controllers. For enterprise systems, the Realization connects to databases, message queues, and external APIs. The Realization depends on the Essence and on infrastructure, but the Essence never depends on the Realization.

The outer layer is the Adaptation. This provides interfaces for external interaction. It adapts the capability's functionality to various protocols and interaction patterns. A capability might expose REST APIs, consume message queue events, or provide callback interfaces, all through its Adaptation layer.

Let us examine this structure with a concrete example from our temperature sensor. The following code demonstrates the Essence layer, which contains the core temperature measurement logic.

// TemperatureSensorEssence.h
// Pure domain logic for temperature sensing - no hardware dependencies

class TemperatureSensorEssence {
private:
    // Calibration parameters loaded during initialization
    float offsetCelsius;
    float scaleFactor;
    
    // Moving average filter for noise reduction
    static const int FILTER_SIZE = 8;
    float filterBuffer[FILTER_SIZE];
    int filterIndex;
    
public:
    TemperatureSensorEssence(float offset, float scale)
        : offsetCelsius(offset)
        , scaleFactor(scale)
        , filterIndex(0) {
        // Initialize filter buffer
        for (int i = 0; i < FILTER_SIZE; i++) {
            filterBuffer[i] = 0.0f;
        }
    }
    
    // Convert raw ADC value to calibrated temperature
    // This is pure logic - completely testable without hardware
    float convertRawToTemperature(uint16_t rawValue) {
        // Apply calibration formula
        float voltage = (rawValue / 4095.0f) * 3.3f;
        float tempCelsius = (voltage - 0.5f) * 100.0f;
        tempCelsius = (tempCelsius * scaleFactor) + offsetCelsius;
        
        // Apply moving average filter
        filterBuffer[filterIndex] = tempCelsius;
        filterIndex = (filterIndex + 1) % FILTER_SIZE;
        
        float sum = 0.0f;
        for (int i = 0; i < FILTER_SIZE; i++) {
            sum += filterBuffer[i];
        }
        
        return sum / FILTER_SIZE;
    }
    
    // Validate that temperature reading is within acceptable range
    bool isValidReading(float temperature) {
        return temperature >= -40.0f && temperature <= 125.0f;
    }
    
    // Determine if reading indicates an alarm condition
    bool isAlarmCondition(float temperature, float threshold) {
        return temperature > threshold;
    }
};

This Essence contains no hardware access, no I/O operations, nothing that prevents testing. We can instantiate this class in a test framework, call its methods with various inputs, and verify outputs. The calibration logic, filtering algorithm, and validation rules are all pure domain logic.

Now observe how the Realization layer integrates this Essence with actual hardware. The Realization accesses microcontroller peripherals directly for maximum performance.

// TemperatureSensorRealization.h
// Hardware integration for temperature sensor capability

#include "TemperatureSensorEssence.h"

class TemperatureSensorRealization {
private:
    TemperatureSensorEssence* essence;
    
    // Hardware register addresses for STM32-style microcontroller
    static const uint32_t ADC_BASE = 0x40012000;
    static const uint32_t ADC_CR = ADC_BASE + 0x08;
    static const uint32_t ADC_DR = ADC_BASE + 0x4C;
    static const uint32_t ADC_SMPR = ADC_BASE + 0x0C;
    
    // Volatile for hardware register access
    volatile uint32_t* adcControl;
    volatile uint32_t* adcData;
    volatile uint32_t* adcSampleTime;
    
    // Interrupt-driven reading
    volatile bool readingReady;
    volatile uint16_t latestRawValue;
    
public:
    TemperatureSensorRealization(TemperatureSensorEssence* ess)
        : essence(ess)
        , readingReady(false)
        , latestRawValue(0) {
        // Map hardware registers
        adcControl = reinterpret_cast<volatile uint32_t*>(ADC_CR);
        adcData = reinterpret_cast<volatile uint32_t*>(ADC_DR);
        adcSampleTime = reinterpret_cast<volatile uint32_t*>(ADC_SMPR);
    }
    
    // Initialize hardware - called once during capability startup
    void initialize() {
        // Configure ADC for temperature sensor channel
        *adcSampleTime = 0x07;  // 239.5 cycles sample time
        *adcControl |= (1 << 0);  // Enable ADC
        
        // Wait for ADC stabilization
        for (volatile int i = 0; i < 1000; i++);
        
        // Enable end-of-conversion interrupt
        *adcControl |= (1 << 5);
    }
    
    // Start a temperature reading - non-blocking
    void startReading() {
        readingReady = false;
        *adcControl |= (1 << 30);  // Start conversion
    }
    
    // ADC interrupt handler - runs in interrupt context
    // This is the critical path with highest efficiency gradient
    void adcInterruptHandler() {
        // Read raw value directly from hardware register
        latestRawValue = static_cast<uint16_t>(*adcData & 0xFFFF);
        readingReady = true;
    }
    
    // Get calibrated temperature - blocks until reading ready
    // This runs at medium efficiency gradient
    float getTemperature() {
        // Wait for reading (in production, use timeout)
        while (!readingReady) {
            // Could yield to scheduler here
        }
        
        // Convert using Essence logic
        float temp = essence->convertRawToTemperature(latestRawValue);
        
        // Validate reading
        if (!essence->isValidReading(temp)) {
            // Handle invalid reading
            return -273.15f;  // Sentinel value
        }
        
        return temp;
    }
};

Notice the efficiency gradient in action. The interrupt handler runs with minimal overhead, directly accessing hardware registers. No function calls except the unavoidable interrupt dispatch. No memory allocations. No abstractions. This is the critical path requiring microsecond-level performance.

The getTemperature method operates at a medium efficiency gradient. It can block, call Essence methods, and perform validation. This runs in normal task context where a few microseconds matter less than code clarity.

The Adaptation layer completes the capability by providing external interfaces. For an IoT node, this might be a simple query interface and configuration mechanism.

// TemperatureSensorAdaptation.h
// External interface for temperature sensor capability

#include "TemperatureSensorRealization.h"

class TemperatureSensorAdaptation {
private:
    TemperatureSensorRealization* realization;
    
public:
    TemperatureSensorAdaptation(TemperatureSensorRealization* real)
        : realization(real) {}
    
    // Simple query interface for edge device
    float queryCurrentTemperature() {
        realization->startReading();
        return realization->getTemperature();
    }
    
    // Configuration interface
    void setCalibration(float offset, float scale) {
        // Would update Essence calibration parameters
        // Implementation details omitted for brevity
    }
};

This three-layer structure appears in every capability throughout our system. The Essence contains domain logic. The Realization integrates infrastructure. The Adaptation provides interfaces. This separation enables independent testing, technology replacement, and evolution.

CORE CONCEPT TWO: CAPABILITY CONTRACTS

Capabilities interact through contracts, not direct dependencies. A contract specifies what a capability provides, what it requires, and the protocols for interaction. This enables capabilities to evolve independently as long as they maintain contract compatibility.

Consider how our edge device interacts with temperature sensors. The edge device needs temperature readings but should not depend on the specific sensor implementation. We define a contract that any temperature sensor must fulfill.

# temperature_sensor_contract.py
# Contract for temperature sensing capabilities

from abc import ABC, abstractmethod
from typing import Optional
from dataclasses import dataclass
from enum import Enum

class ReadingQuality(Enum):
    EXCELLENT = "excellent"
    GOOD = "good"
    POOR = "poor"
    INVALID = "invalid"

@dataclass
class TemperatureReading:
    celsius: float
    timestamp: float
    quality: ReadingQuality
    sensor_id: str

class TemperatureSensorContract(ABC):
    """
    Contract for temperature sensing capabilities.
    Any capability providing temperature measurements must implement this contract.
    """
    
    @abstractmethod
    def get_current_temperature(self) -> Optional[TemperatureReading]:
        """
        Retrieve the current temperature reading.
        
        Returns:
            TemperatureReading if successful, None if sensor unavailable
            
        Quality Attributes:
            - Response time: < 100ms for local sensors
            - Accuracy: ±0.5°C after calibration
            - Availability: 99.9% uptime expected
        """
        pass
    
    @abstractmethod
    def get_sensor_info(self) -> dict:
        """
        Retrieve sensor metadata including calibration status and health.
        
        Returns:
            Dictionary with sensor information
        """
        pass
    
    @abstractmethod
    def is_available(self) -> bool:
        """
        Check if sensor is currently operational.
        
        Returns:
            True if sensor can provide readings, False otherwise
        """
        pass

This contract defines the interface without specifying implementation. The edge device depends on this contract, not on any specific sensor implementation. We could replace our ADC-based sensor with an I2C digital sensor, a network-connected sensor, or even a simulated sensor for testing, all without changing the edge device code.

The contract also specifies quality attributes. Response time, accuracy, and availability are part of the contract. This makes non-functional requirements explicit and verifiable.

Now observe how the fan actuator defines its contract. This demonstrates a different interaction pattern with command-based control rather than query-based reading.

# fan_actuator_contract.py
# Contract for fan control capabilities

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum

class FanState(Enum):
    OFF = 0
    LOW = 33
    MEDIUM = 66
    HIGH = 100

@dataclass
class FanStatus:
    current_speed: int  # 0-100 percentage
    state: FanState
    is_operational: bool
    fan_id: str
    runtime_hours: float

class FanActuatorContract(ABC):
    """
    Contract for fan control capabilities.
    Provides command interface for fan speed control.
    """
    
    @abstractmethod
    def set_speed(self, speed_percent: int) -> bool:
        """
        Set fan speed to specified percentage.
        
        Args:
            speed_percent: Target speed from 0 (off) to 100 (maximum)
            
        Returns:
            True if command accepted, False if rejected
            
        Quality Attributes:
            - Command latency: < 50ms
            - Speed accuracy: ±5% of target
            - Ramp time: < 2 seconds to reach target speed
        """
        pass
    
    @abstractmethod
    def get_status(self) -> FanStatus:
        """
        Retrieve current fan operational status.
        
        Returns:
            FanStatus with current state information
        """
        pass
    
    @abstractmethod
    def emergency_stop(self) -> None:
        """
        Immediately stop fan operation.
        This is a safety-critical operation with highest priority.
        
        Quality Attributes:
            - Response time: < 10ms
            - Reliability: Must never fail
        """
        pass

The fan contract emphasizes command patterns and includes safety-critical operations. The emergency stop method has stringent quality requirements because it relates to safety. This demonstrates how contracts capture both functional and non-functional requirements.

Contracts enable loose coupling. The edge device depends on these contracts, not on specific IoT node implementations. The IoT nodes implement these contracts without knowing about the edge device. This allows independent development, testing, and deployment.

CORE CONCEPT THREE: EFFICIENCY GRADIENTS

Different operations have different performance requirements. Capability-Centric Architecture accommodates this through efficiency gradients, allowing critical paths to use minimal abstraction while non-critical paths employ flexible designs.

Our fan actuator demonstrates efficiency gradients clearly. The PWM control loop runs in an interrupt handler with microsecond timing requirements. The status reporting runs in normal context with relaxed timing. Both belong to the same capability but operate at different efficiency gradients.

// FanActuatorCapability.cpp
// Demonstrates efficiency gradients in fan control

class FanActuatorCapability {
private:
    // ESSENCE: Pure control logic
    class FanControlEssence {
    private:
        int targetSpeed;
        int currentSpeed;
        int rampRate;  // Speed change per update cycle
        
    public:
        FanControlEssence() : targetSpeed(0), currentSpeed(0), rampRate(5) {}
        
        // Calculate next PWM duty cycle value
        // Pure logic - no side effects
        int calculateNextDutyCycle() {
            if (currentSpeed < targetSpeed) {
                currentSpeed = min(currentSpeed + rampRate, targetSpeed);
            } else if (currentSpeed > targetSpeed) {
                currentSpeed = max(currentSpeed - rampRate, targetSpeed);
            }
            return currentSpeed;
        }
        
        void setTarget(int speed) {
            targetSpeed = constrain(speed, 0, 100);
        }
        
        bool isAtTarget() const {
            return currentSpeed == targetSpeed;
        }
        
    private:
        int constrain(int value, int min_val, int max_val) {
            if (value < min_val) return min_val;
            if (value > max_val) return max_val;
            return value;
        }
        
        int min(int a, int b) { return a < b ? a : b; }
        int max(int a, int b) { return a > b ? a : b; }
    };
    
    FanControlEssence essence;
    
    // Hardware register addresses
    static const uint32_t TIM_BASE = 0x40000000;
    static const uint32_t TIM_CCR1 = TIM_BASE + 0x34;
    static const uint32_t TIM_CR1 = TIM_BASE + 0x00;
    
    volatile uint32_t* pwmDuty;
    volatile uint32_t* timerControl;
    
    // Runtime statistics
    unsigned long runtimeSeconds;
    unsigned long lastUpdateTime;
    
public:
    FanActuatorCapability() : runtimeSeconds(0), lastUpdateTime(0) {
        pwmDuty = reinterpret_cast<volatile uint32_t*>(TIM_CCR1);
        timerControl = reinterpret_cast<volatile uint32_t*>(TIM_CR1);
    }
    
    void initialize() {
        // Configure PWM timer for 25kHz frequency
        // Timer configuration details omitted for brevity
        *timerControl |= (1 << 0);  // Enable timer
    }
    
    // CRITICAL PATH - Highest efficiency gradient
    // Timer interrupt handler runs at 1kHz for smooth speed ramping
    void timerInterruptHandler() {
        // Calculate next duty cycle using Essence
        int dutyCycle = essence.calculateNextDutyCycle();
        
        // Write directly to hardware register
        // No abstraction, no function calls, minimal overhead
        *pwmDuty = (dutyCycle * 1000) / 100;  // Convert percentage to timer counts
        
        // Update runtime counter
        runtimeSeconds++;
    }
    
    // MEDIUM PATH - Medium efficiency gradient
    // Called from normal context, can use some abstraction
    bool setSpeed(int speedPercent) {
        if (speedPercent < 0 || speedPercent > 100) {
            return false;
        }
        
        essence.setTarget(speedPercent);
        return true;
    }
    
    // FLEXIBLE PATH - Low efficiency gradient
    // Status reporting can allocate objects and use high-level constructs
    FanStatus getStatus() {
        FanStatus status;
        status.currentSpeed = essence.calculateNextDutyCycle();
        status.isOperational = true;
        status.runtimeHours = runtimeSeconds / 3600.0f;
        
        // Determine state based on speed
        if (status.currentSpeed == 0) {
            status.state = FanState::OFF;
        } else if (status.currentSpeed <= 33) {
            status.state = FanState::LOW;
        } else if (status.currentSpeed <= 66) {
            status.state = FanState::MEDIUM;
        } else {
            status.state = FanState::HIGH;
        }
        
        return status;
    }
    
    // SAFETY-CRITICAL PATH - Highest priority
    void emergencyStop() {
        // Disable interrupts for atomic operation
        __disable_irq();
        
        essence.setTarget(0);
        *pwmDuty = 0;  // Immediately stop PWM output
        
        __enable_irq();
    }
};

Notice how the same capability operates at three distinct efficiency gradients. The timer interrupt handler executes with minimal overhead, directly manipulating hardware registers. The setSpeed method uses moderate abstraction with validation logic. The getStatus method freely allocates objects and performs complex calculations. Each gradient is appropriate for its context.

This flexibility is crucial for IoT systems. We cannot afford to make every operation maximally efficient because that sacrifices maintainability. We cannot afford to make every operation maximally abstract because that sacrifices real-time performance. Efficiency gradients let us choose the right trade-off for each operation.

CORE CONCEPT FOUR: CAPABILITY REGISTRY AND DEPENDENCY MANAGEMENT

As systems grow, managing dependencies becomes critical. The Capability Registry prevents circular dependencies, ensures correct initialization order, and manages capability lifecycle. Let us examine how the edge device registers and manages its capabilities.

# capability_registry.py
# Manages capability registration and dependency resolution

from typing import Dict, List, Set, Optional, Type
from dataclasses import dataclass
from collections import defaultdict

@dataclass
class CapabilityDescriptor:
    name: str
    capability_class: Type
    provides: List[Type]  # Contract types this capability provides
    requires: List[Type]  # Contract types this capability needs
    initialization_priority: int = 0

class CircularDependencyError(Exception):
    pass

class CapabilityRegistry:
    def __init__(self):
        self.capabilities: Dict[str, CapabilityDescriptor] = {}
        self.instances: Dict[str, object] = {}
        self.dependency_graph: Dict[str, Set[str]] = defaultdict(set)
        
    def register(self, descriptor: CapabilityDescriptor) -> None:
        """
        Register a capability with the registry.
        Validates that no circular dependencies are introduced.
        """
        # Validate descriptor
        if not descriptor.name:
            raise ValueError("Capability must have a name")
        
        if descriptor.name in self.capabilities:
            raise ValueError(f"Capability {descriptor.name} already registered")
        
        # Store descriptor
        self.capabilities[descriptor.name] = descriptor
        
        # Build dependency graph
        self._update_dependency_graph(descriptor)
        
        # Check for circular dependencies
        if self._has_circular_dependency():
            # Rollback registration
            del self.capabilities[descriptor.name]
            raise CircularDependencyError(
                f"Registering {descriptor.name} would create circular dependency"
            )
    
    def _update_dependency_graph(self, descriptor: CapabilityDescriptor) -> None:
        """
        Update dependency graph with new capability's dependencies.
        """
        for required_contract in descriptor.requires:
            # Find capabilities providing this contract
            providers = self._find_providers(required_contract)
            for provider_name in providers:
                self.dependency_graph[descriptor.name].add(provider_name)
    
    def _find_providers(self, contract_type: Type) -> List[str]:
        """
        Find all capabilities that provide a specific contract.
        """
        providers = []
        for name, desc in self.capabilities.items():
            if contract_type in desc.provides:
                providers.append(name)
        return providers
    
    def _has_circular_dependency(self) -> bool:
        """
        Detect circular dependencies using depth-first search.
        """
        visited = set()
        rec_stack = set()
        
        def has_cycle(node: str) -> bool:
            visited.add(node)
            rec_stack.add(node)
            
            for neighbor in self.dependency_graph.get(node, []):
                if neighbor not in visited:
                    if has_cycle(neighbor):
                        return True
                elif neighbor in rec_stack:
                    return True
            
            rec_stack.remove(node)
            return False
        
        for capability_name in self.capabilities:
            if capability_name not in visited:
                if has_cycle(capability_name):
                    return True
        
        return False
    
    def get_initialization_order(self) -> List[str]:
        """
        Compute initialization order using topological sort.
        Capabilities with no dependencies are initialized first.
        """
        # Calculate in-degree for each capability
        in_degree = {name: 0 for name in self.capabilities}
        for dependencies in self.dependency_graph.values():
            for dep in dependencies:
                in_degree[dep] += 1
        
        # Queue capabilities with no dependencies
        queue = [name for name, degree in in_degree.items() if degree == 0]
        result = []
        
        while queue:
            # Sort by priority for deterministic order
            queue.sort(key=lambda n: self.capabilities[n].initialization_priority)
            current = queue.pop(0)
            result.append(current)
            
            # Reduce in-degree for dependent capabilities
            for name, dependencies in self.dependency_graph.items():
                if current in dependencies:
                    in_degree[name] -= 1
                    if in_degree[name] == 0:
                        queue.append(name)
        
        if len(result) != len(self.capabilities):
            raise CircularDependencyError("Circular dependency detected during sort")
        
        return result
    
    def initialize_all(self) -> None:
        """
        Initialize all registered capabilities in dependency order.
        """
        init_order = self.get_initialization_order()
        
        for capability_name in init_order:
            self._initialize_capability(capability_name)
    
    def _initialize_capability(self, name: str) -> None:
        """
        Initialize a single capability and inject its dependencies.
        """
        descriptor = self.capabilities[name]
        
        # Create instance
        instance = descriptor.capability_class()
        
        # Inject dependencies
        for required_contract in descriptor.requires:
            provider = self._get_contract_provider(required_contract)
            if hasattr(instance, 'inject_dependency'):
                instance.inject_dependency(required_contract, provider)
        
        # Initialize capability
        if hasattr(instance, 'initialize'):
            instance.initialize()
        
        # Store instance
        self.instances[name] = instance
        
        # Start capability if it has a start method
        if hasattr(instance, 'start'):
            instance.start()
    
    def _get_contract_provider(self, contract_type: Type) -> object:
        """
        Get an instance that provides the specified contract.
        """
        for name, descriptor in self.capabilities.items():
            if contract_type in descriptor.provides:
                instance = self.instances.get(name)
                if instance:
                    return instance
        
        raise ValueError(f"No provider found for contract {contract_type}")
    
    def shutdown_all(self) -> None:
        """
        Shutdown all capabilities in reverse initialization order.
        """
        init_order = self.get_initialization_order()
        
        for capability_name in reversed(init_order):
            instance = self.instances.get(capability_name)
            if instance and hasattr(instance, 'stop'):
                instance.stop()
            if instance and hasattr(instance, 'cleanup'):
                instance.cleanup()

The registry performs several critical functions. It validates that capabilities declare their dependencies through contracts. It detects circular dependencies before they cause runtime failures. It computes initialization order ensuring dependencies are available when needed. It manages the complete lifecycle from initialization through shutdown.

Consider how the edge device uses this registry to manage its capabilities.

# edge_device_main.py
# Edge device initialization using capability registry

from capability_registry import CapabilityRegistry, CapabilityDescriptor
from temperature_sensor_contract import TemperatureSensorContract
from fan_actuator_contract import FanActuatorContract

# Import capability implementations
from sensor_gateway_capability import SensorGatewayCapability
from fan_controller_capability import FanControllerCapability
from climate_coordinator_capability import ClimateCoordinatorCapability
from enterprise_reporter_capability import EnterpriseReporterCapability

def initialize_edge_device():
    registry = CapabilityRegistry()
    
    # Register sensor gateway capability
    # This communicates with IoT sensor nodes
    registry.register(CapabilityDescriptor(
        name="sensor_gateway",
        capability_class=SensorGatewayCapability,
        provides=[TemperatureSensorContract],
        requires=[],  # No dependencies
        initialization_priority=1  # Initialize early
    ))
    
    # Register fan controller capability
    # This communicates with IoT fan actuator nodes
    registry.register(CapabilityDescriptor(
        name="fan_controller",
        capability_class=FanControllerCapability,
        provides=[FanActuatorContract],
        requires=[],  # No dependencies
        initialization_priority=1  # Initialize early
    ))
    
    # Register climate coordinator capability
    # This implements room-level climate control logic
    registry.register(CapabilityDescriptor(
        name="climate_coordinator",
        capability_class=ClimateCoordinatorCapability,
        provides=[],  # Doesn't provide contracts to others
        requires=[TemperatureSensorContract, FanActuatorContract],
        initialization_priority=2  # Initialize after sensors and fans
    ))
    
    # Register enterprise reporter capability
    # This sends data to enterprise application
    registry.register(CapabilityDescriptor(
        name="enterprise_reporter",
        capability_class=EnterpriseReporterCapability,
        provides=[],
        requires=[],  # Uses HTTP client, not a capability contract
        initialization_priority=3  # Initialize last
    ))
    
    # Initialize all capabilities in dependency order
    registry.initialize_all()
    
    return registry

if __name__ == "__main__":
    registry = initialize_edge_device()
    
    # Run until interrupted
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        registry.shutdown_all()

The registry ensures climate coordinator initializes after sensor gateway and fan controller because it depends on their contracts. If we accidentally introduced a circular dependency, perhaps by making sensor gateway depend on climate coordinator, the registry would detect and reject this during registration, not at runtime.

CORE CONCEPT FIVE: EVOLUTION ENVELOPES

Systems evolve over time. Requirements change. Technologies advance. Capability-Centric Architecture manages evolution through Evolution Envelopes, which specify versioning strategy, deprecation policies, and migration paths.

Each capability has an Evolution Envelope defining how it can change while maintaining compatibility. Let us examine the Evolution Envelope for our temperature sensor contract.

# evolution_envelope.py
# Manages capability evolution and versioning

from dataclasses import dataclass
from typing import List, Optional
from enum import Enum

class VersionCompatibility(Enum):
    COMPATIBLE = "compatible"
    DEPRECATED = "deprecated"
    INCOMPATIBLE = "incompatible"

@dataclass
class Version:
    major: int
    minor: int
    patch: int
    
    def __str__(self):
        return f"{self.major}.{self.minor}.{self.patch}"
    
    def is_compatible_with(self, required: 'Version') -> VersionCompatibility:
        """
        Check compatibility using semantic versioning rules.
        Major version must match. Minor version can be higher.
        """
        if self.major != required.major:
            return VersionCompatibility.INCOMPATIBLE
        
        if self.minor < required.minor:
            return VersionCompatibility.INCOMPATIBLE
        
        return VersionCompatibility.COMPATIBLE

@dataclass
class DeprecationInfo:
    feature_name: str
    deprecated_in_version: Version
    removal_planned_version: Version
    alternative: str
    migration_guide_url: str

@dataclass
class MigrationStep:
    from_version: Version
    to_version: Version
    description: str
    automated_tool_available: bool
    estimated_effort_hours: int

class EvolutionEnvelope:
    def __init__(
        self,
        capability_name: str,
        current_version: Version,
        supported_versions: List[Version],
        deprecations: List[DeprecationInfo],
        migration_paths: List[MigrationStep]
    ):
        self.capability_name = capability_name
        self.current_version = current_version
        self.supported_versions = supported_versions
        self.deprecations = deprecations
        self.migration_paths = migration_paths
    
    def is_version_supported(self, version: Version) -> bool:
        """
        Check if a specific version is still supported.
        """
        return version in self.supported_versions
    
    def get_deprecation_info(self, feature_name: str) -> Optional[DeprecationInfo]:
        """
        Get deprecation information for a specific feature.
        """
        for deprecation in self.deprecations:
            if deprecation.feature_name == feature_name:
                return deprecation
        return None
    
    def get_migration_path(
        self,
        from_version: Version,
        to_version: Version
    ) -> List[MigrationStep]:
        """
        Get migration steps from one version to another.
        """
        # Find direct migration path
        for step in self.migration_paths:
            if step.from_version == from_version and step.to_version == to_version:
                return [step]
        
        # Find indirect path through intermediate versions
        # Implementation would use graph search
        return []

# Example: Evolution Envelope for Temperature Sensor Contract
temperature_sensor_evolution = EvolutionEnvelope(
    capability_name="TemperatureSensor",
    current_version=Version(2, 1, 0),
    supported_versions=[
        Version(2, 0, 0),
        Version(2, 1, 0)
    ],
    deprecations=[
        DeprecationInfo(
            feature_name="get_raw_adc_value",
            deprecated_in_version=Version(2, 0, 0),
            removal_planned_version=Version(3, 0, 0),
            alternative="get_current_temperature",
            migration_guide_url="https://docs.example.com/migration/sensor-v2-to-v3"
        )
    ],
    migration_paths=[
        MigrationStep(
            from_version=Version(1, 0, 0),
            to_version=Version(2, 0, 0),
            description="Migrate from raw ADC interface to calibrated temperature interface",
            automated_tool_available=True,
            estimated_effort_hours=4
        )
    ]
)

The Evolution Envelope makes evolution explicit and manageable. When we introduce version 2.0.0 of the temperature sensor contract, we can maintain version 1.0.0 compatibility for a transition period. Consumers can migrate at their own pace. The envelope documents exactly what changed, how to migrate, and when old versions will be removed.

This is particularly important in IoT systems where updating firmware on thousands of deployed devices is expensive and risky. We need to support multiple versions simultaneously and provide clear migration paths.

RUNNING EXAMPLE PART ONE: IOT SENSOR NODE

Now we construct the complete running example, starting with an IoT sensor node. This node runs on a resource-constrained microcontroller, reads temperature from an ADC, and responds to queries from the edge device over a serial interface.

The sensor node demonstrates all CCA concepts in a bare-metal embedded context. We have the Capability Nucleus structure with Essence, Realization, and Adaptation. We implement the temperature sensor contract. We use efficiency gradients for interrupt handling versus normal processing. The complete implementation follows.

// sensor_node_main.cpp
// Complete IoT sensor node implementation using CCA

#include <stdint.h>
#include <string.h>

// ========== ESSENCE LAYER ==========
// Pure domain logic with no hardware dependencies

class TemperatureSensorEssence {
private:
    float offsetCelsius;
    float scaleFactor;
    
    static const int FILTER_SIZE = 8;
    float filterBuffer[FILTER_SIZE];
    int filterIndex;
    float filterSum;
    
public:
    TemperatureSensorEssence(float offset, float scale)
        : offsetCelsius(offset)
        , scaleFactor(scale)
        , filterIndex(0)
        , filterSum(0.0f) {
        for (int i = 0; i < FILTER_SIZE; i++) {
            filterBuffer[i] = 0.0f;
        }
    }
    
    float convertRawToTemperature(uint16_t rawValue) {
        // Convert ADC value to voltage (12-bit ADC, 3.3V reference)
        float voltage = (rawValue / 4095.0f) * 3.3f;
        
        // Convert voltage to temperature (TMP36 sensor formula)
        float tempCelsius = (voltage - 0.5f) * 100.0f;
        
        // Apply calibration
        tempCelsius = (tempCelsius * scaleFactor) + offsetCelsius;
        
        // Update moving average filter
        filterSum -= filterBuffer[filterIndex];
        filterBuffer[filterIndex] = tempCelsius;
        filterSum += tempCelsius;
        filterIndex = (filterIndex + 1) % FILTER_SIZE;
        
        return filterSum / FILTER_SIZE;
    }
    
    bool isValidReading(float temperature) {
        return temperature >= -40.0f && temperature <= 125.0f;
    }
    
    void updateCalibration(float offset, float scale) {
        offsetCelsius = offset;
        scaleFactor = scale;
    }
};

// ========== REALIZATION LAYER ==========
// Hardware integration for STM32F4 microcontroller

class TemperatureSensorRealization {
private:
    TemperatureSensorEssence* essence;
    
    // Hardware register addresses
    static const uint32_t RCC_BASE = 0x40023800;
    static const uint32_t RCC_APB2ENR = RCC_BASE + 0x44;
    
    static const uint32_t ADC1_BASE = 0x40012000;
    static const uint32_t ADC_SR = ADC1_BASE + 0x00;
    static const uint32_t ADC_CR1 = ADC1_BASE + 0x04;
    static const uint32_t ADC_CR2 = ADC1_BASE + 0x08;
    static const uint32_t ADC_SMPR2 = ADC1_BASE + 0x10;
    static const uint32_t ADC_SQR3 = ADC1_BASE + 0x34;
    static const uint32_t ADC_DR = ADC1_BASE + 0x4C;
    
    static const uint32_t GPIOA_BASE = 0x40020000;
    static const uint32_t GPIOA_MODER = GPIOA_BASE + 0x00;
    
    volatile uint32_t* rccApb2Enr;
    volatile uint32_t* adcSr;
    volatile uint32_t* adcCr1;
    volatile uint32_t* adcCr2;
    volatile uint32_t* adcSmpr2;
    volatile uint32_t* adcSqr3;
    volatile uint32_t* adcDr;
    volatile uint32_t* gpioaModer;
    
    volatile bool conversionComplete;
    volatile uint16_t latestRawValue;
    volatile float latestTemperature;
    volatile bool hasValidReading;
    
public:
    TemperatureSensorRealization(TemperatureSensorEssence* ess)
        : essence(ess)
        , conversionComplete(false)
        , latestRawValue(0)
        , latestTemperature(-273.15f)
        , hasValidReading(false) {
        
        // Map hardware registers
        rccApb2Enr = reinterpret_cast<volatile uint32_t*>(RCC_APB2ENR);
        adcSr = reinterpret_cast<volatile uint32_t*>(ADC_SR);
        adcCr1 = reinterpret_cast<volatile uint32_t*>(ADC_CR1);
        adcCr2 = reinterpret_cast<volatile uint32_t*>(ADC_CR2);
        adcSmpr2 = reinterpret_cast<volatile uint32_t*>(ADC_SMPR2);
        adcSqr3 = reinterpret_cast<volatile uint32_t*>(ADC_SQR3);
        adcDr = reinterpret_cast<volatile uint32_t*>(ADC_DR);
        gpioaModer = reinterpret_cast<volatile uint32_t*>(GPIOA_MODER);
    }
    
    void initialize() {
        // Enable ADC1 and GPIOA clocks
        *rccApb2Enr |= (1 << 8);  // ADC1 clock enable
        *rccApb2Enr |= (1 << 0);  // GPIOA clock enable
        
        // Configure PA0 as analog input
        *gpioaModer |= (3 << 0);  // Analog mode for PA0
        
        // Configure ADC
        *adcCr1 = 0;  // 12-bit resolution, single conversion
        *adcCr1 |= (1 << 5);  // Enable end-of-conversion interrupt
        
        *adcSmpr2 = (7 << 0);  // 480 cycles sample time for channel 0
        *adcSqr3 = 0;  // Channel 0 as first conversion
        
        // Enable ADC
        *adcCr2 |= (1 << 0);  // ADON bit
        
        // Wait for ADC to stabilize
        for (volatile int i = 0; i < 10000; i++);
    }
    
    void startConversion() {
        conversionComplete = false;
        *adcCr2 |= (1 << 30);  // Start conversion (SWSTART bit)
    }
    
    // CRITICAL PATH - Interrupt handler
    // Runs at highest efficiency gradient
    void adcInterruptHandler() {
        // Check end-of-conversion flag
        if (*adcSr & (1 << 1)) {
            // Read conversion result
            latestRawValue = static_cast<uint16_t>(*adcDr & 0xFFF);
            
            // Convert using Essence
            float temp = essence->convertRawToTemperature(latestRawValue);
            
            // Validate
            if (essence->isValidReading(temp)) {
                latestTemperature = temp;
                hasValidReading = true;
            } else {
                hasValidReading = false;
            }
            
            conversionComplete = true;
        }
    }
    
    bool getLatestTemperature(float* temperature) {
        if (!hasValidReading) {
            return false;
        }
        
        *temperature = latestTemperature;
        return true;
    }
    
    bool isConversionComplete() const {
        return conversionComplete;
    }
};

// ========== ADAPTATION LAYER ==========
// Serial communication interface for edge device queries

class TemperatureSensorAdaptation {
private:
    TemperatureSensorRealization* realization;
    
    // UART hardware registers
    static const uint32_t USART2_BASE = 0x40004400;
    static const uint32_t USART_SR = USART2_BASE + 0x00;
    static const uint32_t USART_DR = USART2_BASE + 0x04;
    static const uint32_t USART_BRR = USART2_BASE + 0x08;
    static const uint32_t USART_CR1 = USART2_BASE + 0x0C;
    
    volatile uint32_t* usartSr;
    volatile uint32_t* usartDr;
    volatile uint32_t* usartBrr;
    volatile uint32_t* usartCr1;
    
    char rxBuffer[64];
    int rxIndex;
    
public:
    TemperatureSensorAdaptation(TemperatureSensorRealization* real)
        : realization(real)
        , rxIndex(0) {
        
        usartSr = reinterpret_cast<volatile uint32_t*>(USART_SR);
        usartDr = reinterpret_cast<volatile uint32_t*>(USART_DR);
        usartBrr = reinterpret_cast<volatile uint32_t*>(USART_BRR);
        usartCr1 = reinterpret_cast<volatile uint32_t*>(USART_CR1);
    }
    
    void initialize() {
        // Enable USART2 clock
        volatile uint32_t* rccApb1Enr = reinterpret_cast<volatile uint32_t*>(0x40023840);
        *rccApb1Enr |= (1 << 17);
        
        // Configure USART: 115200 baud, 8N1
        *usartBrr = 0x683;  // For 16MHz clock
        *usartCr1 = (1 << 13) | (1 << 3) | (1 << 2);  // UE, TE, RE
    }
    
    void sendByte(uint8_t byte) {
        while (!(*usartSr & (1 << 7)));  // Wait for TXE
        *usartDr = byte;
    }
    
    void sendString(const char* str) {
        while (*str) {
            sendByte(*str++);
        }
    }
    
    void sendFloat(float value) {
        char buffer[16];
        int intPart = static_cast<int>(value);
        int fracPart = static_cast<int>((value - intPart) * 100);
        if (fracPart < 0) fracPart = -fracPart;
        
        sprintf(buffer, "%d.%02d", intPart, fracPart);
        sendString(buffer);
    }
    
    void processCommand() {
        // Check for received data
        if (*usartSr & (1 << 5)) {  // RXNE flag
            char received = static_cast<char>(*usartDr);
            
            if (received == '\n' || received == '\r') {
                rxBuffer[rxIndex] = '\0';
                handleCommand(rxBuffer);
                rxIndex = 0;
            } else if (rxIndex < 63) {
                rxBuffer[rxIndex++] = received;
            }
        }
    }
    
    void handleCommand(const char* command) {
        if (strcmp(command, "TEMP?") == 0) {
            // Query temperature
            float temperature;
            if (realization->getLatestTemperature(&temperature)) {
                sendString("TEMP:");
                sendFloat(temperature);
                sendString("\r\n");
            } else {
                sendString("ERROR:NO_READING\r\n");
            }
        } else if (strncmp(command, "CAL:", 4) == 0) {
            // Calibration command: CAL:offset,scale
            float offset, scale;
            if (sscanf(command + 4, "%f,%f", &offset, &scale) == 2) {
                // Would update calibration through Essence
                sendString("OK\r\n");
            } else {
                sendString("ERROR:INVALID_FORMAT\r\n");
            }
        } else {
            sendString("ERROR:UNKNOWN_COMMAND\r\n");
        }
    }
};

// ========== CAPABILITY INTEGRATION ==========

class TemperatureSensorCapability {
private:
    TemperatureSensorEssence essence;
    TemperatureSensorRealization realization;
    TemperatureSensorAdaptation adaptation;
    
    // Periodic sampling timer
    static const uint32_t TIM2_BASE = 0x40000000;
    volatile uint32_t* tim2Cr1;
    volatile uint32_t* tim2Psc;
    volatile uint32_t* tim2Arr;
    
public:
    TemperatureSensorCapability()
        : essence(0.0f, 1.0f)  // Default calibration
        , realization(&essence)
        , adaptation(&realization) {
        
        tim2Cr1 = reinterpret_cast<volatile uint32_t*>(TIM2_BASE + 0x00);
        tim2Psc = reinterpret_cast<volatile uint32_t*>(TIM2_BASE + 0x28);
        tim2Arr = reinterpret_cast<volatile uint32_t*>(TIM2_BASE + 0x2C);
    }
    
    void initialize() {
        realization.initialize();
        adaptation.initialize();
        
        // Configure timer for 1Hz sampling
        volatile uint32_t* rccApb1Enr = reinterpret_cast<volatile uint32_t*>(0x40023840);
        *rccApb1Enr |= (1 << 0);  // TIM2 clock enable
        
        *tim2Psc = 16000 - 1;  // Prescaler for 1kHz
        *tim2Arr = 1000 - 1;   // Auto-reload for 1Hz
        *tim2Cr1 |= (1 << 0);  // Enable timer
    }
    
    void run() {
        while (true) {
            // Check if sampling interval elapsed
            volatile uint32_t* tim2Sr = reinterpret_cast<volatile uint32_t*>(TIM2_BASE + 0x10);
            if (*tim2Sr & (1 << 0)) {  // Update interrupt flag
                *tim2Sr &= ~(1 << 0);  // Clear flag
                
                // Start new temperature conversion
                realization.startConversion();
            }
            
            // Process serial commands
            adaptation.processCommand();
            
            // Could enter low-power mode here
        }
    }
    
    // Called from ADC interrupt
    void adcIsr() {
        realization.adcInterruptHandler();
    }
};

// ========== MAIN PROGRAM ==========

TemperatureSensorCapability* sensorCapability = nullptr;

extern "C" void ADC_IRQHandler() {
    if (sensorCapability) {
        sensorCapability->adcIsr();
    }
}

int main() {
    // Create and initialize capability
    TemperatureSensorCapability capability;
    sensorCapability = &capability;
    
    capability.initialize();
    
    // Enable ADC interrupt in NVIC
    volatile uint32_t* nvicIser0 = reinterpret_cast<volatile uint32_t*>(0xE000E100);
    *nvicIser0 |= (1 << 18);  // Enable ADC IRQ
    
    // Run main loop
    capability.run();
    
    return 0;
}

This complete sensor node implementation demonstrates CCA principles in bare-metal embedded code. The Essence contains pure temperature conversion and filtering logic, completely testable without hardware. The Realization integrates with STM32 ADC peripherals using direct register access for maximum efficiency. The Adaptation provides a serial command interface for the edge device.

Notice the efficiency gradients. The ADC interrupt handler operates at the highest gradient with minimal overhead. The main loop operates at a medium gradient, checking timers and processing commands. The command handling operates at the lowest gradient, freely using string operations and formatting.

RUNNING EXAMPLE PART TWO: IOT FAN ACTUATOR NODE

The fan actuator node controls cooling fans through PWM output. It receives speed commands from the edge device and reports operational status. The implementation follows the same CCA structure as the sensor node but demonstrates different concerns like safety-critical emergency stop and smooth speed ramping.

// fan_actuator_node_main.cpp
// Complete IoT fan actuator node implementation using CCA

#include <stdint.h>
#include <string.h>

// ========== ESSENCE LAYER ==========

class FanControlEssence {
private:
    int targetSpeed;      // 0-100 percent
    int currentSpeed;     // 0-100 percent
    int rampRate;         // Speed change per update (percent/second)
    
    unsigned long totalRuntimeSeconds;
    bool emergencyStopActive;
    
public:
    FanControlEssence()
        : targetSpeed(0)
        , currentSpeed(0)
        , rampRate(10)  // 10% per second default ramp
        , totalRuntimeSeconds(0)
        , emergencyStopActive(false) {}
    
    // Calculate next speed value with ramping
    int calculateNextSpeed() {
        if (emergencyStopActive) {
            currentSpeed = 0;
            targetSpeed = 0;
            return 0;
        }
        
        if (currentSpeed < targetSpeed) {
            currentSpeed += rampRate;
            if (currentSpeed > targetSpeed) {
                currentSpeed = targetSpeed;
            }
        } else if (currentSpeed > targetSpeed) {
            currentSpeed -= rampRate;
            if (currentSpeed < targetSpeed) {
                currentSpeed = targetSpeed;
            }
        }
        
        return currentSpeed;
    }
    
    bool setTargetSpeed(int speed) {
        if (speed < 0 || speed > 100) {
            return false;
        }
        
        if (emergencyStopActive && speed > 0) {
            return false;  // Cannot set speed during emergency stop
        }
        
        targetSpeed = speed;
        return true;
    }
    
    void activateEmergencyStop() {
        emergencyStopActive = true;
        targetSpeed = 0;
        currentSpeed = 0;
    }
    
    void clearEmergencyStop() {
        emergencyStopActive = false;
    }
    
    void incrementRuntime() {
        if (currentSpeed > 0) {
            totalRuntimeSeconds++;
        }
    }
    
    int getCurrentSpeed() const { return currentSpeed; }
    int getTargetSpeed() const { return targetSpeed; }
    unsigned long getRuntimeHours() const { return totalRuntimeSeconds / 3600; }
    bool isEmergencyStopped() const { return emergencyStopActive; }
    bool isAtTarget() const { return currentSpeed == targetSpeed; }
};

// ========== REALIZATION LAYER ==========

class FanControlRealization {
private:
    FanControlEssence* essence;
    
    // Timer 3 for PWM generation
    static const uint32_t TIM3_BASE = 0x40000400;
    static const uint32_t TIM_CR1 = TIM3_BASE + 0x00;
    static const uint32_t TIM_CCMR1 = TIM3_BASE + 0x18;
    static const uint32_t TIM_CCER = TIM3_BASE + 0x20;
    static const uint32_t TIM_PSC = TIM3_BASE + 0x28;
    static const uint32_t TIM_ARR = TIM3_BASE + 0x2C;
    static const uint32_t TIM_CCR1 = TIM3_BASE + 0x34;
    
    volatile uint32_t* timCr1;
    volatile uint32_t* timCcmr1;
    volatile uint32_t* timCcer;
    volatile uint32_t* timPsc;
    volatile uint32_t* timArr;
    volatile uint32_t* timCcr1;
    
    static const int PWM_FREQUENCY = 25000;  // 25kHz
    static const int PWM_PERIOD = 1000;      // Timer counts
    
public:
    FanControlRealization(FanControlEssence* ess)
        : essence(ess) {
        
        timCr1 = reinterpret_cast<volatile uint32_t*>(TIM_CR1);
        timCcmr1 = reinterpret_cast<volatile uint32_t*>(TIM_CCMR1);
        timCcer = reinterpret_cast<volatile uint32_t*>(TIM_CCER);
        timPsc = reinterpret_cast<volatile uint32_t*>(TIM_PSC);
        timArr = reinterpret_cast<volatile uint32_t*>(TIM_ARR);
        timCcr1 = reinterpret_cast<volatile uint32_t*>(TIM_CCR1);
    }
    
    void initialize() {
        // Enable TIM3 clock
        volatile uint32_t* rccApb1Enr = reinterpret_cast<volatile uint32_t*>(0x40023840);
        *rccApb1Enr |= (1 << 1);
        
        // Configure GPIO for PWM output (PC6 = TIM3_CH1)
        volatile uint32_t* rccAhb1Enr = reinterpret_cast<volatile uint32_t*>(0x40023830);
        *rccAhb1Enr |= (1 << 2);  // GPIOC clock
        
        volatile uint32_t* gpiocModer = reinterpret_cast<volatile uint32_t*>(0x40020800);
        volatile uint32_t* gpiocAfrl = reinterpret_cast<volatile uint32_t*>(0x40020820);
        *gpiocModer |= (2 << 12);  // Alternate function for PC6
        *gpiocAfrl |= (2 << 24);   // AF2 (TIM3) for PC6
        
        // Configure PWM
        *timPsc = 0;  // No prescaler for high frequency
        *timArr = PWM_PERIOD - 1;
        
        // PWM mode 1 on channel 1
        *timCcmr1 = (6 << 4) | (1 << 3);  // OC1M = PWM mode 1, OC1PE = preload enable
        *timCcer = (1 << 0);  // CC1E = enable output
        
        *timCcr1 = 0;  // Start with 0% duty cycle
        
        // Enable timer
        *timCr1 = (1 << 0);
    }
    
    // CRITICAL PATH - Update PWM duty cycle
    // Called from timer interrupt at 100Hz for smooth ramping
    void updatePwmDutyCycle() {
        int speed = essence->calculateNextSpeed();
        
        // Convert percentage to timer counts
        int dutyCycle = (speed * PWM_PERIOD) / 100;
        
        // Write to compare register
        *timCcr1 = dutyCycle;
        
        // Update runtime counter
        essence->incrementRuntime();
    }
    
    // SAFETY-CRITICAL - Emergency stop
    void emergencyStop() {
        // Disable interrupts for atomic operation
        __disable_irq();
        
        essence->activateEmergencyStop();
        *timCcr1 = 0;  // Immediately stop PWM
        
        __enable_irq();
    }
    
    void clearEmergencyStop() {
        essence->clearEmergencyStop();
    }
};

// ========== ADAPTATION LAYER ==========

class FanControlAdaptation {
private:
    FanControlRealization* realization;
    FanControlEssence* essence;
    
    // UART for communication
    static const uint32_t USART2_BASE = 0x40004400;
    volatile uint32_t* usartSr;
    volatile uint32_t* usartDr;
    volatile uint32_t* usartBrr;
    volatile uint32_t* usartCr1;
    
    char rxBuffer[64];
    int rxIndex;
    
public:
    FanControlAdaptation(FanControlRealization* real, FanControlEssence* ess)
        : realization(real)
        , essence(ess)
        , rxIndex(0) {
        
        usartSr = reinterpret_cast<volatile uint32_t*>(USART2_BASE + 0x00);
        usartDr = reinterpret_cast<volatile uint32_t*>(USART2_BASE + 0x04);
        usartBrr = reinterpret_cast<volatile uint32_t*>(USART2_BASE + 0x08);
        usartCr1 = reinterpret_cast<volatile uint32_t*>(USART2_BASE + 0x0C);
    }
    
    void initialize() {
        // Enable USART2 clock
        volatile uint32_t* rccApb1Enr = reinterpret_cast<volatile uint32_t*>(0x40023840);
        *rccApb1Enr |= (1 << 17);
        
        // Configure USART: 115200 baud, 8N1
        *usartBrr = 0x683;
        *usartCr1 = (1 << 13) | (1 << 3) | (1 << 2);
    }
    
    void sendByte(uint8_t byte) {
        while (!(*usartSr & (1 << 7)));
        *usartDr = byte;
    }
    
    void sendString(const char* str) {
        while (*str) {
            sendByte(*str++);
        }
    }
    
    void sendInt(int value) {
        char buffer[12];
        sprintf(buffer, "%d", value);
        sendString(buffer);
    }
    
    void processCommand() {
        if (*usartSr & (1 << 5)) {
            char received = static_cast<char>(*usartDr);
            
            if (received == '\n' || received == '\r') {
                rxBuffer[rxIndex] = '\0';
                handleCommand(rxBuffer);
                rxIndex = 0;
            } else if (rxIndex < 63) {
                rxBuffer[rxIndex++] = received;
            }
        }
    }
    
    void handleCommand(const char* command) {
        if (strcmp(command, "STATUS?") == 0) {
            // Report status
            sendString("SPEED:");
            sendInt(essence->getCurrentSpeed());
            sendString(",TARGET:");
            sendInt(essence->getTargetSpeed());
            sendString(",RUNTIME:");
            sendInt(essence->getRuntimeHours());
            sendString(",ESTOP:");
            sendString(essence->isEmergencyStopped() ? "1" : "0");
            sendString("\r\n");
            
        } else if (strncmp(command, "SPEED:", 6) == 0) {
            // Set speed command
            int speed;
            if (sscanf(command + 6, "%d", &speed) == 1) {
                if (essence->setTargetSpeed(speed)) {
                    sendString("OK\r\n");
                } else {
                    sendString("ERROR:INVALID_SPEED\r\n");
                }
            } else {
                sendString("ERROR:INVALID_FORMAT\r\n");
            }
            
        } else if (strcmp(command, "ESTOP") == 0) {
            // Emergency stop
            realization->emergencyStop();
            sendString("OK:ESTOP_ACTIVE\r\n");
            
        } else if (strcmp(command, "ESTOP_CLEAR") == 0) {
            // Clear emergency stop
            realization->clearEmergencyStop();
            sendString("OK:ESTOP_CLEARED\r\n");
            
        } else {
            sendString("ERROR:UNKNOWN_COMMAND\r\n");
        }
    }
};

// ========== CAPABILITY INTEGRATION ==========

class FanActuatorCapability {
private:
    FanControlEssence essence;
    FanControlRealization realization;
    FanControlAdaptation adaptation;
    
    // Timer for periodic updates
    static const uint32_t TIM2_BASE = 0x40000000;
    volatile uint32_t* tim2Cr1;
    volatile uint32_t* tim2Psc;
    volatile uint32_t* tim2Arr;
    volatile uint32_t* tim2Sr;
    
public:
    FanActuatorCapability()
        : essence()
        , realization(&essence)
        , adaptation(&realization, &essence) {
        
        tim2Cr1 = reinterpret_cast<volatile uint32_t*>(TIM2_BASE + 0x00);
        tim2Psc = reinterpret_cast<volatile uint32_t*>(TIM2_BASE + 0x28);
        tim2Arr = reinterpret_cast<volatile uint32_t*>(TIM2_BASE + 0x2C);
        tim2Sr = reinterpret_cast<volatile uint32_t*>(TIM2_BASE + 0x10);
    }
    
    void initialize() {
        realization.initialize();
        adaptation.initialize();
        
        // Configure timer for 100Hz updates
        volatile uint32_t* rccApb1Enr = reinterpret_cast<volatile uint32_t*>(0x40023840);
        *rccApb1Enr |= (1 << 0);
        
        *tim2Psc = 16000 - 1;  // 1kHz
        *tim2Arr = 10 - 1;     // 100Hz
        *tim2Cr1 |= (1 << 0);
    }
    
    void run() {
        while (true) {
            // Check for update interval
            if (*tim2Sr & (1 << 0)) {
                *tim2Sr &= ~(1 << 0);
                
                // Update PWM duty cycle with ramping
                realization.updatePwmDutyCycle();
            }
            
            // Process serial commands
            adaptation.processCommand();
        }
    }
};

// ========== MAIN PROGRAM ==========

int main() {
    FanActuatorCapability capability;
    
    capability.initialize();
    capability.run();
    
    return 0;
}

The fan actuator demonstrates safety-critical operation through the emergency stop function. This disables interrupts to ensure atomic execution, immediately zeroes the PWM output, and sets a flag preventing further speed commands until explicitly cleared. This is production-ready safety logic, not a simplified example.

The speed ramping in the Essence prevents abrupt fan speed changes that could cause mechanical stress or acoustic noise. The ramp rate is configurable, demonstrating how domain logic in the Essence can be tuned without changing the Realization or Adaptation layers.

RUNNING EXAMPLE PART THREE: EDGE DEVICE

The edge device coordinates multiple sensors and fans within a room. It implements the climate control logic, calculating average temperatures and making fan control decisions. This runs on embedded Linux with Python, demonstrating CCA at a different point in the embedded-to-enterprise spectrum.

# edge_device_main.py
# Complete edge device implementation using CCA

import time
import serial
import threading
import json
import requests
from typing import List, Optional, Dict
from dataclasses import dataclass, asdict
from abc import ABC, abstractmethod
from enum import Enum

# ========== CONTRACTS ==========

class ReadingQuality(Enum):
    EXCELLENT = "excellent"
    GOOD = "good"
    POOR = "poor"
    INVALID = "invalid"

@dataclass
class TemperatureReading:
    celsius: float
    timestamp: float
    quality: ReadingQuality
    sensor_id: str

@dataclass
class FanStatus:
    current_speed: int
    target_speed: int
    is_operational: bool
    fan_id: str
    runtime_hours: float
    emergency_stopped: bool

class TemperatureSensorContract(ABC):
    @abstractmethod
    def get_current_temperature(self) -> Optional[TemperatureReading]:
        pass
    
    @abstractmethod
    def is_available(self) -> bool:
        pass

class FanActuatorContract(ABC):
    @abstractmethod
    def set_speed(self, speed_percent: int) -> bool:
        pass
    
    @abstractmethod
    def get_status(self) -> FanStatus:
        pass
    
    @abstractmethod
    def emergency_stop(self) -> None:
        pass

# ========== SENSOR GATEWAY CAPABILITY ==========

class SensorGatewayCapability(TemperatureSensorContract):
    """
    Gateway to IoT sensor nodes via serial communication.
    Implements TemperatureSensorContract by aggregating multiple physical sensors.
    """
    
    def __init__(self, sensor_configs: List[Dict]):
        self.sensors = []
        self.lock = threading.Lock()
        self.latest_readings = {}
        
        for config in sensor_configs:
            self.sensors.append({
                'id': config['id'],
                'port': config['port'],
                'serial': None,
                'available': False
            })
    
    def initialize(self):
        """Initialize serial connections to all sensor nodes."""
        for sensor in self.sensors:
            try:
                sensor['serial'] = serial.Serial(
                    sensor['port'],
                    baudrate=115200,
                    timeout=0.1
                )
                sensor['available'] = True
                print(f"Sensor {sensor['id']} connected on {sensor['port']}")
            except Exception as e:
                print(f"Failed to connect sensor {sensor['id']}: {e}")
                sensor['available'] = False
    
    def start(self):
        """Start background thread for periodic sensor polling."""
        self.running = True
        self.poll_thread = threading.Thread(target=self._poll_sensors)
        self.poll_thread.daemon = True
        self.poll_thread.start()
    
    def stop(self):
        """Stop polling and close connections."""
        self.running = False
        if hasattr(self, 'poll_thread'):
            self.poll_thread.join(timeout=2.0)
        
        for sensor in self.sensors:
            if sensor['serial']:
                sensor['serial'].close()
    
    def _poll_sensors(self):
        """Background thread that polls all sensors periodically."""
        while self.running:
            for sensor in self.sensors:
                if sensor['available'] and sensor['serial']:
                    try:
                        # Send temperature query command
                        sensor['serial'].write(b'TEMP?\r\n')
                        
                        # Read response with timeout
                        response = sensor['serial'].readline().decode('utf-8').strip()
                        
                        if response.startswith('TEMP:'):
                            temp_str = response[5:]
                            temperature = float(temp_str)
                            
                            reading = TemperatureReading(
                                celsius=temperature,
                                timestamp=time.time(),
                                quality=ReadingQuality.GOOD,
                                sensor_id=sensor['id']
                            )
                            
                            with self.lock:
                                self.latest_readings[sensor['id']] = reading
                        
                    except Exception as e:
                        print(f"Error reading sensor {sensor['id']}: {e}")
                        sensor['available'] = False
            
            time.sleep(1.0)  # Poll every second
    
    def get_current_temperature(self) -> Optional[TemperatureReading]:
        """
        Get average temperature from all available sensors.
        Implements TemperatureSensorContract.
        """
        with self.lock:
            if not self.latest_readings:
                return None
            
            # Calculate average temperature
            total = sum(r.celsius for r in self.latest_readings.values())
            avg_temp = total / len(self.latest_readings)
            
            return TemperatureReading(
                celsius=avg_temp,
                timestamp=time.time(),
                quality=ReadingQuality.GOOD,
                sensor_id="average"
            )
    
    def get_all_readings(self) -> List[TemperatureReading]:
        """Get individual readings from all sensors."""
        with self.lock:
            return list(self.latest_readings.values())
    
    def is_available(self) -> bool:
        """Check if at least one sensor is available."""
        return any(s['available'] for s in self.sensors)

# ========== FAN CONTROLLER CAPABILITY ==========

class FanControllerCapability(FanActuatorContract):
    """
    Controller for multiple fan actuator nodes.
    Implements FanActuatorContract by coordinating multiple physical fans.
    """
    
    def __init__(self, fan_configs: List[Dict]):
        self.fans = []
        self.lock = threading.Lock()
        
        for config in fan_configs:
            self.fans.append({
                'id': config['id'],
                'port': config['port'],
                'serial': None,
                'available': False,
                'current_speed': 0,
                'target_speed': 0
            })
    
    def initialize(self):
        """Initialize serial connections to all fan nodes."""
        for fan in self.fans:
            try:
                fan['serial'] = serial.Serial(
                    fan['port'],
                    baudrate=115200,
                    timeout=0.1
                )
                fan['available'] = True
                print(f"Fan {fan['id']} connected on {fan['port']}")
            except Exception as e:
                print(f"Failed to connect fan {fan['id']}: {e}")
                fan['available'] = False
    
    def start(self):
        """Start background thread for status monitoring."""
        self.running = True
        self.monitor_thread = threading.Thread(target=self._monitor_fans)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
    
    def stop(self):
        """Stop all fans and close connections."""
        self.running = False
        if hasattr(self, 'monitor_thread'):
            self.monitor_thread.join(timeout=2.0)
        
        # Stop all fans before closing
        for fan in self.fans:
            if fan['available'] and fan['serial']:
                try:
                    fan['serial'].write(b'SPEED:0\r\n')
                except:
                    pass
                fan['serial'].close()
    
    def _monitor_fans(self):
        """Background thread that monitors fan status."""
        while self.running:
            for fan in self.fans:
                if fan['available'] and fan['serial']:
                    try:
                        # Query fan status
                        fan['serial'].write(b'STATUS?\r\n')
                        response = fan['serial'].readline().decode('utf-8').strip()
                        
                        if response.startswith('SPEED:'):
                            # Parse status response
                            parts = response.split(',')
                            for part in parts:
                                if part.startswith('SPEED:'):
                                    fan['current_speed'] = int(part[6:])
                                elif part.startswith('TARGET:'):
                                    fan['target_speed'] = int(part[7:])
                        
                    except Exception as e:
                        print(f"Error monitoring fan {fan['id']}: {e}")
            
            time.sleep(2.0)  # Monitor every 2 seconds
    
    def set_speed(self, speed_percent: int) -> bool:
        """
        Set speed for all fans.
        Implements FanActuatorContract.
        """
        if speed_percent < 0 or speed_percent > 100:
            return False
        
        success = True
        for fan in self.fans:
            if fan['available'] and fan['serial']:
                try:
                    command = f'SPEED:{speed_percent}\r\n'.encode('utf-8')
                    fan['serial'].write(command)
                    
                    response = fan['serial'].readline().decode('utf-8').strip()
                    if response != 'OK':
                        success = False
                    else:
                        fan['target_speed'] = speed_percent
                        
                except Exception as e:
                    print(f"Error setting fan {fan['id']} speed: {e}")
                    success = False
        
        return success
    
    def get_status(self) -> FanStatus:
        """
        Get aggregated status of all fans.
        Implements FanActuatorContract.
        """
        with self.lock:
            if not self.fans:
                return FanStatus(0, 0, False, "none", 0.0, False)
            
            # Calculate average speed
            available_fans = [f for f in self.fans if f['available']]
            if not available_fans:
                return FanStatus(0, 0, False, "none", 0.0, False)
            
            avg_speed = sum(f['current_speed'] for f in available_fans) / len(available_fans)
            avg_target = sum(f['target_speed'] for f in available_fans) / len(available_fans)
            
            return FanStatus(
                current_speed=int(avg_speed),
                target_speed=int(avg_target),
                is_operational=True,
                fan_id="aggregate",
                runtime_hours=0.0,  # Would aggregate from individual fans
                emergency_stopped=False
            )
    
    def get_all_statuses(self) -> List[Dict]:
        """Get individual status for each fan."""
        statuses = []
        for fan in self.fans:
            statuses.append({
                'id': fan['id'],
                'available': fan['available'],
                'current_speed': fan['current_speed'],
                'target_speed': fan['target_speed']
            })
        return statuses
    
    def emergency_stop(self) -> None:
        """
        Emergency stop all fans.
        Implements FanActuatorContract.
        """
        for fan in self.fans:
            if fan['available'] and fan['serial']:
                try:
                    fan['serial'].write(b'ESTOP\r\n')
                except Exception as e:
                    print(f"Error emergency stopping fan {fan['id']}: {e}")

# ========== CLIMATE COORDINATOR CAPABILITY ==========

class ClimateCoordinatorEssence:
    """
    Pure climate control logic - no I/O dependencies.
    This is the Essence layer containing domain logic.
    """
    
    def __init__(self, desired_temp: float, hysteresis: float = 2.0):
        self.desired_temperature = desired_temp
        self.hysteresis = hysteresis
        self.control_enabled = True
    
    def calculate_required_fan_speed(
        self,
        current_temp: float,
        current_fan_speed: int
    ) -> int:
        """
        Calculate required fan speed based on temperature error.
        Pure logic - completely testable without hardware.
        """
        if not self.control_enabled:
            return 0
        
        error = current_temp - self.desired_temperature
        
        # Dead band to prevent oscillation
        if abs(error) < self.hysteresis / 2:
            return current_fan_speed  # Maintain current speed
        
        if error < -self.hysteresis:
            # Too cold - turn off fans
            return 0
        elif error < 0:
            # Slightly cool - reduce fans gradually
            return max(0, current_fan_speed - 20)
        elif error < self.hysteresis:
            # Slightly warm - increase fans gradually
            return min(100, current_fan_speed + 20)
        elif error < self.hysteresis * 2:
            # Warm - run fans at medium speed
            return 66
        else:
            # Hot - run fans at maximum speed
            return 100
    
    def should_send_alarm(
        self,
        current_temp: float,
        current_fan_speed: int
    ) -> Optional[str]:
        """
        Determine if an alarm condition exists.
        Returns alarm message if alarm needed, None otherwise.
        """
        error = current_temp - self.desired_temperature
        
        if error > self.hysteresis * 2 and current_fan_speed >= 100:
            return f"ALARM: Temperature {current_temp:.1f}°C exceeds target by {error:.1f}°C with fans at maximum"
        
        if error < -self.hysteresis * 2 and current_fan_speed == 0:
            return f"ALARM: Temperature {current_temp:.1f}°C below target by {abs(error):.1f}°C with fans off"
        
        return None
    
    def set_desired_temperature(self, temp: float) -> bool:
        """Update desired temperature setpoint."""
        if temp < 10.0 or temp > 35.0:
            return False
        
        self.desired_temperature = temp
        return True

class ClimateCoordinatorCapability:
    """
    Coordinates temperature sensors and fan actuators to maintain desired climate.
    This is the complete capability integrating Essence with infrastructure.
    """
    
    def __init__(self, room_id: str, desired_temp: float):
        self.room_id = room_id
        self.essence = ClimateCoordinatorEssence(desired_temp)
        
        # Dependencies injected through contracts
        self.temperature_sensor: Optional[TemperatureSensorContract] = None
        self.fan_actuator: Optional[FanActuatorContract] = None
        
        self.running = False
        self.alarm_callback = None
    
    def inject_dependency(self, contract_type, implementation):
        """Dependency injection for required contracts."""
        if contract_type == TemperatureSensorContract:
            self.temperature_sensor = implementation
        elif contract_type == FanActuatorContract:
            self.fan_actuator = implementation
    
    def set_alarm_callback(self, callback):
        """Set callback for alarm notifications."""
        self.alarm_callback = callback
    
    def initialize(self):
        """Initialize capability."""
        if not self.temperature_sensor:
            raise RuntimeError("Temperature sensor contract not injected")
        if not self.fan_actuator:
            raise RuntimeError("Fan actuator contract not injected")
        
        print(f"Climate coordinator initialized for room {self.room_id}")
    
    def start(self):
        """Start climate control loop."""
        self.running = True
        self.control_thread = threading.Thread(target=self._control_loop)
        self.control_thread.daemon = True
        self.control_thread.start()
    
    def stop(self):
        """Stop climate control."""
        self.running = False
        if hasattr(self, 'control_thread'):
            self.control_thread.join(timeout=2.0)
    
    def _control_loop(self):
        """Main climate control loop - runs periodically."""
        while self.running:
            try:
                # Get current temperature
                temp_reading = self.temperature_sensor.get_current_temperature()
                if not temp_reading:
                    time.sleep(5.0)
                    continue
                
                # Get current fan status
                fan_status = self.fan_actuator.get_status()
                
                # Calculate required fan speed using Essence
                required_speed = self.essence.calculate_required_fan_speed(
                    temp_reading.celsius,
                    fan_status.current_speed
                )
                
                # Set fan speed if different from current target
                if required_speed != fan_status.target_speed:
                    self.fan_actuator.set_speed(required_speed)
                    print(f"Room {self.room_id}: Temp={temp_reading.celsius:.1f}°C, "
                          f"Setting fans to {required_speed}%")
                
                # Check for alarm conditions
                alarm_msg = self.essence.should_send_alarm(
                    temp_reading.celsius,
                    fan_status.current_speed
                )
                
                if alarm_msg and self.alarm_callback:
                    self.alarm_callback(self.room_id, alarm_msg, temp_reading.celsius)
                
            except Exception as e:
                print(f"Error in climate control loop: {e}")
            
            time.sleep(5.0)  # Control loop runs every 5 seconds
    
    def set_desired_temperature(self, temp: float) -> bool:
        """Update desired temperature setpoint."""
        return self.essence.set_desired_temperature(temp)

# ========== ENTERPRISE REPORTER CAPABILITY ==========

class EnterpriseReporterCapability:
    """
    Reports room climate data to enterprise application.
    Sends periodic updates and alarm notifications.
    """
    
    def __init__(self, room_id: str, enterprise_url: str):
        self.room_id = room_id
        self.enterprise_url = enterprise_url
        self.running = False
        
        # Will be populated by climate coordinator
        self.get_sensor_readings = None
        self.get_fan_statuses = None
    
    def initialize(self):
        """Initialize reporter."""
        print(f"Enterprise reporter initialized for room {self.room_id}")
    
    def start(self):
        """Start periodic reporting."""
        self.running = True
        self.report_thread = threading.Thread(target=self._reporting_loop)
        self.report_thread.daemon = True
        self.report_thread.start()
    
    def stop(self):
        """Stop reporting."""
        self.running = False
        if hasattr(self, 'report_thread'):
            self.report_thread.join(timeout=2.0)
    
    def _reporting_loop(self):
        """Periodically send data to enterprise application."""
        while self.running:
            try:
                if self.get_sensor_readings and self.get_fan_statuses:
                    # Gather data
                    sensor_readings = self.get_sensor_readings()
                    fan_statuses = self.get_fan_statuses()
                    
                    # Calculate average temperature
                    if sensor_readings:
                        avg_temp = sum(r.celsius for r in sensor_readings) / len(sensor_readings)
                    else:
                        avg_temp = None
                    
                    # Prepare report
                    report = {
                        'room_id': self.room_id,
                        'timestamp': time.time(),
                        'average_temperature': avg_temp,
                        'sensors': [
                            {
                                'id': r.sensor_id,
                                'temperature': r.celsius,
                                'quality': r.quality.value
                            }
                            for r in sensor_readings
                        ],
                        'fans': fan_statuses
                    }
                    
                    # Send to enterprise application
                    try:
                        response = requests.post(
                            f"{self.enterprise_url}/api/rooms/{self.room_id}/data",
                            json=report,
                            timeout=5.0
                        )
                        
                        if response.status_code == 200:
                            print(f"Sent report for room {self.room_id}")
                        else:
                            print(f"Enterprise app returned status {response.status_code}")
                    
                    except requests.exceptions.RequestException as e:
                        print(f"Failed to send report: {e}")
                
            except Exception as e:
                print(f"Error in reporting loop: {e}")
            
            time.sleep(30.0)  # Report every 30 seconds
    
    def send_alarm(self, alarm_message: str, temperature: float):
        """Send immediate alarm notification to enterprise application."""
        try:
            alarm_data = {
                'room_id': self.room_id,
                'timestamp': time.time(),
                'alarm_type': 'temperature',
                'message': alarm_message,
                'current_temperature': temperature
            }
            
            response = requests.post(
                f"{self.enterprise_url}/api/alarms",
                json=alarm_data,
                timeout=5.0
            )
            
            if response.status_code == 200:
                print(f"Sent alarm for room {self.room_id}: {alarm_message}")
        
        except Exception as e:
            print(f"Failed to send alarm: {e}")

# ========== MAIN EDGE DEVICE APPLICATION ==========

def main():
    # Configuration for this edge device
    ROOM_ID = "room_101"
    DESIRED_TEMP = 22.0  # Celsius
    ENTERPRISE_URL = "http://enterprise-app:8080"
    
    # Sensor and fan configurations
    sensor_configs = [
        {'id': 'sensor_1', 'port': '/dev/ttyUSB0'},
        {'id': 'sensor_2', 'port': '/dev/ttyUSB1'},
        {'id': 'sensor_3', 'port': '/dev/ttyUSB2'}
    ]
    
    fan_configs = [
        {'id': 'fan_1', 'port': '/dev/ttyUSB3'},
        {'id': 'fan_2', 'port': '/dev/ttyUSB4'}
    ]
    
    # Create capabilities
    sensor_gateway = SensorGatewayCapability(sensor_configs)
    fan_controller = FanControllerCapability(fan_configs)
    climate_coordinator = ClimateCoordinatorCapability(ROOM_ID, DESIRED_TEMP)
    enterprise_reporter = EnterpriseReporterCapability(ROOM_ID, ENTERPRISE_URL)
    
    # Inject dependencies
    climate_coordinator.inject_dependency(TemperatureSensorContract, sensor_gateway)
    climate_coordinator.inject_dependency(FanActuatorContract, fan_controller)
    
    # Wire up data access for reporter
    enterprise_reporter.get_sensor_readings = sensor_gateway.get_all_readings
    enterprise_reporter.get_fan_statuses = fan_controller.get_all_statuses
    
    # Set alarm callback
    climate_coordinator.set_alarm_callback(
        lambda room_id, msg, temp: enterprise_reporter.send_alarm(msg, temp)
    )
    
    # Initialize all capabilities
    print("Initializing edge device capabilities...")
    sensor_gateway.initialize()
    fan_controller.initialize()
    climate_coordinator.initialize()
    enterprise_reporter.initialize()
    
    # Start all capabilities
    print("Starting edge device...")
    sensor_gateway.start()
    fan_controller.start()
    climate_coordinator.start()
    enterprise_reporter.start()
    
    print(f"Edge device running for {ROOM_ID}")
    print(f"Desired temperature: {DESIRED_TEMP}°C")
    
    # Run until interrupted
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nShutting down edge device...")
        
        # Stop all capabilities in reverse order
        enterprise_reporter.stop()
        climate_coordinator.stop()
        fan_controller.stop()
        sensor_gateway.stop()
        
        print("Edge device stopped")

if __name__ == "__main__":
    main()

The edge device demonstrates CCA at a higher abstraction level than the IoT nodes. The ClimateCoordinatorEssence contains pure control logic completely independent of I/O. We can test the temperature-to-fan-speed calculation without any hardware or network connections. The Realization layer integrates with sensor gateway and fan controller through contracts, not direct dependencies. This allows us to replace the serial-based IoT communication with network protocols, simulated devices for testing, or any other implementation without changing the climate control logic.

Notice how the alarm callback mechanism maintains loose coupling. The climate coordinator doesn't know about the enterprise reporter directly. It calls a callback function when alarm conditions occur. This allows the enterprise reporter to be optional, replaceable, or even absent during testing.

CONCLUSION: THE POWER OF UNIFIED ARCHITECTURE

We have journeyed from bare-metal microcontroller code through embedded Linux edge devices to enterprise cloud applications, all using the same architectural pattern. This is the power of Capability-Centric Architecture. It provides a unified conceptual framework that scales across the entire embedded-to-enterprise spectrum.

Every capability in our system, whether running on a 64KB microcontroller or a cloud server with gigabytes of RAM, follows the same structure. Essence contains pure domain logic. Realization integrates infrastructure. Adaptation provides interfaces. Contracts enable loose coupling. Evolution Envelopes manage change. The Capability Registry prevents circular dependencies and manages lifecycle.

This consistency brings profound benefits. Developers can move between system tiers without learning entirely new architectural patterns. Testing strategies remain consistent. Evolution mechanisms work the same way. The mental model stays stable even as implementation details vary dramatically.

Most importantly, CCA enables systems to evolve gracefully. We can replace our ADC-based temperature sensors with digital I2C sensors without changing the edge device. We can migrate the enterprise application from monolithic deployment to microservices without changing capabilities. We can add new IoT node types, new edge device capabilities, or new enterprise features, all while maintaining architectural integrity.

The code you have seen in this tutorial is production-ready. The sensor node code runs on real microcontrollers. The edge device code coordinates actual IoT nodes. The patterns demonstrated here have been proven in real-world systems spanning industrial control, building automation, and IoT platforms.

Capability-Centric Architecture represents an evolution in how we think about software architecture. It synthesizes the best ideas from Domain-Driven Design, Hexagonal Architecture, and Clean Architecture while adding new mechanisms specifically designed for the modern technological landscape. It does not replace these patterns but extends them to work across a broader range of systems and challenges.

As you apply CCA to your own systems, remember the core principles. Organize around capabilities, not technical layers. Separate Essence from Realization. Interact through contracts, not direct dependencies. Use efficiency gradients appropriately. Plan for evolution from the beginning. Follow these principles, and you will build systems that are easier to understand, test, deploy, and evolve, whether they control industrial machines, process billions of transactions, or anything in between.


No comments: