Tuesday, May 05, 2026

BUILDING THE PERFECT MEMORY SYSTEM FOR LLM APPLICATIONS AND AGENTS




CHAPTER ONE: THE AMNESIA PROBLEM

Every conversation with a large language model begins the same way: with total amnesia. The model does not remember you. It does not remember what you discussed yesterday, last week, or five minutes ago in a different session. It does not remember that you prefer concise answers, that you are building a healthcare application, that you spent three hours debugging a particular piece of code together, or that you once told it your name. Each new context window is a blank slate, a mind wiped clean, a patient waking up from surgery with no recollection of the life lived before.

This is not a small inconvenience. It is a fundamental architectural liability that prevents LLMs from becoming truly intelligent, persistent agents. Humans are defined by their memories. Our identity, our skills, our relationships, our accumulated wisdom -- all of it is memory. An agent without memory is not an agent at all. It is a very sophisticated lookup table.

The irony is profound: LLMs are trained on essentially all of human knowledge, yet they cannot remember what happened in the last conversation. They know the entire history of Rome but cannot recall that you asked them about Rome yesterday. They understand the theory of episodic memory in neuroscience but have no episodic memory of their own.

This article is about fixing that. Not with a patch, not with a hack, but with a complete, principled, biologically-inspired, engineered-for-speed memory system that gives LLM applications and agentic systems a genuine cognitive memory architecture. We will call this system the Unified Cognitive Memory Architecture, or UCMA.

We will build it from the ground up, drawing on neuroscience, computer science, information retrieval theory, and the brilliant conceptual framework that Andrej Karpathy has articulated in his vision of the LLM Operating System. We will look at what exists today, dissect its failures, and then construct something far more powerful.


CHAPTER TWO: KARPATHY'S LLM OS -- THE CONCEPTUAL FOUNDATION

Before we build anything, we need the right mental model. Andrej Karpathy, a founding member of OpenAI and former director of AI at Tesla, has articulated a vision that reframes how we should think about LLMs entirely. Rather than seeing a language model as a text completion engine, Karpathy proposes seeing it as a CPU -- the central processing unit of a new kind of operating system.

In this LLM OS framework, the context window of the model is analogous to RAM -- fast, limited, immediately accessible working memory. The model weights themselves are analogous to a kind of slow, baked-in long-term memory -- the accumulated knowledge from training, frozen in place. External storage systems -- databases, vector stores, file systems -- are analogous to disk storage: large, persistent, but requiring explicit read and write operations to access.

This analogy is extraordinarily useful because it immediately reveals the architectural gaps. A real operating system has sophisticated memory management: virtual memory, paging, caching hierarchies (L1, L2, L3 cache), memory-mapped files, garbage collection, and swap space. The naive LLM application has none of this. It has RAM (the context window) and maybe a hard disk (a vector database), with nothing in between and no intelligent management layer connecting them.

Karpathy's insight points us toward what we need to build: a full memory hierarchy for LLM systems, with the same sophistication that operating system designers have brought to hardware memory management over the past sixty years. The difference is that our memory system must deal not with bytes and cache lines but with semantic meaning, temporal context, emotional salience, causal relationships, and procedural knowledge.

The LLM OS framing also clarifies the role of the agent. An agent in this system is like a process running on the operating system. It has access to the CPU (the LLM), it can read and write to memory (our UCMA), it can call system functions (tools), and it can spawn child processes (sub-agents). The memory system is the kernel-level infrastructure that makes all of this coherent and persistent.


CHAPTER THREE: THE NEUROSCIENCE OF MEMORY -- WHAT WE ARE ENGINEERING

Human memory is not a single system. Neuroscience has identified at least five distinct memory systems, each with different storage mechanisms, time scales, and retrieval characteristics. Understanding these systems is not an academic exercise -- it is the blueprint for our architecture.

Sensory memory is the briefest form: the persistence of sensory impressions for a fraction of a second after the stimulus has ended. It is the reason you can follow a moving object even when it briefly disappears. In our system, the Sensory Buffer plays this role: a high-throughput, low-latency holding area for raw input.

Working memory (short-term memory) is the conscious workspace: the roughly seven (plus or minus two) items that you can hold in mind simultaneously. It is fast, limited, and volatile. In our system, the Working Memory Manager corresponds to this: the managed context window that the LLM actively reasons over.

Episodic memory stores specific events and experiences, tagged with temporal and contextual metadata. "I remember the conversation I had with Michael on Tuesday about JWT authentication." Episodic memories are rich, contextual, and time-stamped. In our system, the Episodic Memory Store handles this.

Semantic memory stores facts, knowledge, and general truths about the world. "JWT tokens expire. FastAPI uses dependency injection." Semantic memories have no "when" -- they are timeless propositions. In our system, the Semantic Memory Store handles this, augmented with a knowledge graph.

Procedural memory stores knowledge of how to do things: skills, habits, workflows. "To implement JWT authentication in FastAPI, follow these steps..." Procedural memories are retrieved not by their content but by the situation that calls for them. In our system, the Procedural Memory Store handles this.

Hermann Ebbinghaus, the nineteenth-century psychologist who first studied memory scientifically, discovered the forgetting curve: memories decay exponentially over time unless they are reinforced. But reinforcement resets the decay clock and makes the memory more resistant to future forgetting. This is the basis of spaced repetition learning. Forgetting is not a failure of the memory system -- it is a feature. Forgetting unimportant information is what allows the system to remain fast and focused.
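
The curve itself is simple to model. A minimal sketch, assuming exponential retention R(t) = exp(-t / S) with a stability parameter S that grows on each reinforcement (the names and constants here are illustrative, not the final UCMA retention module):

```python
import math
import time
from typing import Optional


class RetentionState:
    """Minimal Ebbinghaus-style retention: R(t) = exp(-t / stability).

    Reinforcement resets the decay clock and multiplies stability, so a
    well-rehearsed memory decays more slowly each time. Illustrative
    sketch only; the constants are not calibrated.
    """

    def __init__(self, stability_hours: float = 24.0) -> None:
        self.stability_hours = stability_hours
        self.last_reinforced = time.time()

    def retention(self, now: Optional[float] = None) -> float:
        """Current retention strength in (0, 1]."""
        now = time.time() if now is None else now
        elapsed_hours = (now - self.last_reinforced) / 3600.0
        return math.exp(-elapsed_hours / self.stability_hours)

    def reinforce(self, boost: float = 1.5) -> None:
        """Reset the decay clock and slow all future decay."""
        self.stability_hours *= boost
        self.last_reinforced = time.time()
```

A memory whose retention falls below a pruning threshold becomes a forgetting candidate unless its importance protects it -- exactly the policy the Forgetting Engine implements later.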

Our UCMA will implement all of these systems. Every one of them has a direct engineering analog.


CHAPTER FOUR: THE GRAVEYARD OF CURRENT APPROACHES

Before we build the cathedral, let us walk through the ruins of what has been tried before. Understanding why existing approaches fail is essential to understanding why UCMA is designed the way it is.

The simplest approach is conversation buffer memory. You simply concatenate all previous messages into the context window. This is what most chatbot applications do. It works fine for short conversations but fails catastrophically as conversations grow. The context window fills up, older messages are truncated, and the system loses access to early context. There is no prioritization, no summarization, no persistence across sessions. This is not a memory system -- it is a scrolling text buffer.
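
The failure mode is easy to demonstrate. A hypothetical sketch, with token counting approximated by word count for brevity:

```python
class ConversationBuffer:
    """Naive buffer memory: keep messages until a token budget is hit,
    then silently drop the oldest. Illustrative sketch, not a real
    framework API."""

    def __init__(self, max_tokens: int = 8) -> None:
        self.max_tokens = max_tokens
        self.messages: list[str] = []

    def add(self, message: str) -> None:
        self.messages.append(message)
        # Evict from the front until we fit. Early context is lost
        # with no prioritization and no summary left behind.
        while sum(len(m.split()) for m in self.messages) > self.max_tokens:
            self.messages.pop(0)

    def context(self) -> str:
        return "\n".join(self.messages)
```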

The next step up is summary memory. When the conversation buffer gets too long, a secondary LLM call summarizes the older portion and replaces it with a compressed version. This is better -- it extends the effective memory horizon -- but it introduces serious problems. Summarization is lossy. Details that seemed unimportant at summarization time may turn out to be critical later. The summary is a flat blob of text with no structure, no retrievability by topic, and no temporal indexing. You cannot ask "what did we discuss about authentication?" and get a precise answer -- you get whatever the summary happened to preserve. Furthermore, summary memory is still session-scoped. When the session ends, the summary is typically discarded.
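
A sketch of the same idea, with a stub standing in for the secondary summarization call (names and the message-count budget are illustrative):

```python
def summarize(messages: list[str]) -> str:
    """Stub standing in for the secondary LLM summarization call."""
    return f"[summary of {len(messages)} earlier messages]"


class SummaryMemory:
    """Sketch of summary memory: when the buffer exceeds its budget,
    fold the older half into a lossy summary blob. Token counting is
    approximated by message count for brevity."""

    def __init__(self, max_messages: int = 6) -> None:
        self.max_messages = max_messages
        self.summary: str = ""
        self.messages: list[str] = []

    def add(self, message: str) -> None:
        self.messages.append(message)
        if len(self.messages) > self.max_messages:
            older = self.messages[: len(self.messages) // 2]
            self.messages = self.messages[len(older):]
            # Details the summarizer drops are gone for good -- the
            # summary is a flat blob with no structure or time index.
            self.summary = summarize(
                [self.summary] + older if self.summary else older
            )
```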

Vector database RAG (Retrieval Augmented Generation) is the approach that took the industry by storm around 2022 and 2023. The idea is elegant: convert documents or memories into dense vector embeddings using an embedding model, store them in a vector database like Pinecone, Weaviate, Chroma, or Qdrant, and at query time, embed the current query and retrieve the most semantically similar stored vectors. This gives you a form of long-term semantic memory that persists across sessions and scales to millions of documents.
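
Stripped of infrastructure, the retrieval step is nearest-neighbor search over embeddings. A brute-force in-memory sketch -- production systems replace the linear scan with an approximate index such as HNSW, and the vectors come from a real embedding model:

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 for zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def retrieve(query_vec: list[float],
             store: list[tuple[str, list[float]]],
             top_k: int = 3) -> list[str]:
    """store: (text, embedding) pairs. Brute-force scan; a vector
    database replaces this with an approximate nearest-neighbor index."""
    scored = [(cosine_similarity(query_vec, vec), text) for text, vec in store]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [text for _, text in scored[:top_k]]
```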

RAG is genuinely powerful, but it has serious architectural limitations that prevent it from serving as a complete memory system. It only supports one type of retrieval -- semantic similarity -- and gives you no good mechanism for time-range queries, entity-specific lookups, or procedural knowledge retrieval. RAG treats all memories as equally important, so a throwaway comment and a critical insight are stored and retrieved with equal weight. RAG has no concept of time -- it does not know that recent memories are generally more relevant than old ones, or that some memories should decay while others should be reinforced. RAG has no consolidation mechanism -- it just accumulates chunks indefinitely, growing slower and noisier over time. And perhaps most importantly, RAG retrieves isolated chunks rather than connected knowledge. It cannot follow causal chains, temporal sequences, or conceptual hierarchies.

MemGPT, developed by researchers at UC Berkeley and later commercialized as Letta, takes a different and more sophisticated approach. It treats the LLM context window as a form of RAM and implements explicit paging: a main context (always in the window), a recall storage (searchable conversation history), and archival storage (long-term vector storage). The LLM itself decides what to page in and out, using special function calls. This is clever and much closer to the right architecture, but it has its own limitations. The LLM must spend tokens deciding what to remember and what to retrieve, which is expensive and slow. The system still lacks true multi-tier memory with different time scales, lacks sleep consolidation, lacks a forgetting curve, and lacks procedural memory.

Commercial systems like Zep and Mem0 add user-level memory persistence, entity extraction, and some degree of memory management. They are useful products but are fundamentally RAG systems with additional preprocessing layers. They do not solve the fundamental architectural problems.

What is universally missing across all current approaches is a unified, hierarchical, graph-structured memory system with multiple specialized stores, intelligent routing, temporal decay with importance weighting, sleep-phase consolidation, procedural memory, and a retrieval layer that can answer not just "what is semantically similar to this query" but also "what happened recently," "what do I know how to do," "what have I learned from past mistakes," and "what is the causal chain leading to this outcome."

That is what we are going to build.


CHAPTER FIVE: THE UNIFIED COGNITIVE MEMORY ARCHITECTURE -- OVERVIEW

UCMA is organized as a set of specialized memory stores connected by a Memory Graph, managed by a Memory Controller, and served through a Unified Retrieval Layer. The following diagram shows the complete architecture:

+========================================================================+
|                      LLM AGENT / APPLICATION                          |
+========================================================================+
                                  |
                    +-------------+-------------+
                    |      MEMORY CONTROLLER    |
                    |  (orchestrates all memory |
                    |       operations)         |
                    +-------------+-------------+
                                  |
     +--------+--------+----------+---------+--------+--------+
     |        |        |          |         |        |        |
+----+----+ +-+-----+ ++---------++ +-------+-+ +----+----+ +-+-------+
|SENSORY  | |WORKING| | EPISODIC  | |SEMANTIC | |PROCEDUR | |LEARNING |
|BUFFER   | |MEMORY | | STORE     | |STORE    | |AL STORE | |STORE    |
|         | |       | |           | |         | |         | |         |
|Ring buf | |3-zone | |Short-term | |Facts +  | |Workflow | |Insights |
|Fast     | |context| |+ Medium-  | |KG +     | |templates| |Lessons  |
|ingest   | |window | |term tiers | |Entity   | |Success  | |Patterns |
|Auto-    | |mgmt   | |Temporal   | |index    | |rates    | |Validated|
|evict    | |       | |index      | |         | |         | |         |
+----+----+ +-+-----+ ++---------++ +-------+-+ +----+----+ +-+-------+
     |        |        |          |         |        |        |
     +--------+--------+----------+---------+--------+--------+
                                  |
                    +-------------+-------------+
                    |        MEMORY GRAPH       |
                    |  Nodes: all memories      |
                    |  Edges: typed relations   |
                    |  PageRank importance      |
                    |  Graph traversal          |
                    +-------------+-------------+
                                  |
          +-----------------------+-----------------------+
          |                                               |
+---------+---------+                       +------------+------------+
| UNIFIED RETRIEVAL |                       | SLEEP CONSOLIDATION     |
|      LAYER        |                       | ENGINE (background)     |
|                   |                       |                         |
|Semantic (ANN/HNSW)|                       |Compress episodes        |
|Structured indexes |                       |Abstract to semantic     |
|Temporal buckets   |                       |Reinforce via Hebb       |
|Graph traversal    |                       |Prune via Ebbinghaus     |
|Hybrid fusion      |                       +------------+------------+
|Multi-factor rank  |                                    |
+-------------------+                       +------------+------------+
                                            | FORGETTING ENGINE       |
                                            |                         |
                                            |Ebbinghaus curve         |
                                            |Stability tracking       |
                                            |Importance protection    |
                                            |LRU + PageRank guard     |
                                            +-------------------------+

DATA FLOW -- WRITE PATH:

  Raw Input --> Sensory Buffer --> Memory Controller
    --> Importance Classifier
    --> [if important] Episodic Store (short-term)
    --> Embedding Model --> vector stored with entry
    --> Memory Graph (new node registered)
    --> [async] Sleep Engine --> compress, abstract, reinforce, prune

DATA FLOW -- READ PATH:

  Agent Query --> Memory Controller --> Query Router
    --> [semantic]    Embedding Model --> ANN search --> Episodic + Semantic
    --> [structured]  Entity/Tag index --> Semantic + Procedural
    --> [temporal]    Bucket index --> Episodic
    --> [graph]       Graph traversal --> any type
    --> Result Fusion --> Deduplication --> Multi-factor Ranking
    --> Top-K results --> Working Memory (staging zone) --> LLM Context
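
The multi-factor ranking step at the end of the read path can be sketched as a weighted blend. The weights mirror the defaults that appear in the configuration chapter; all inputs except the age are assumed normalized to [0, 1]:

```python
# Weights matching the UCMAConfig defaults (illustrative).
W_REL, W_IMP, W_REC, W_RET = 0.40, 0.25, 0.20, 0.15
RECENCY_HALFLIFE_DAYS = 7.0


def rank_score(relevance: float, importance: float,
               age_days: float, retention: float) -> float:
    """Multi-factor ranking: a weighted blend of semantic relevance,
    importance, exponential recency decay, and Ebbinghaus retention."""
    # Recency halves every RECENCY_HALFLIFE_DAYS days.
    recency = 0.5 ** (age_days / RECENCY_HALFLIFE_DAYS)
    return (W_REL * relevance + W_IMP * importance
            + W_REC * recency + W_RET * retention)
```

Under this scheme a highly relevant but month-old, half-forgotten memory can still lose to a moderately relevant memory from this morning, which is usually the behavior an agent wants.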

SLEEP CYCLE (background, periodic):

  Timer fires --> SleepConsolidationEngine
    --> expire_short_term()     --> compress high-importance entries
    --> abstract_to_semantic()  --> extract facts via LLM
    --> reinforce_connections() --> update PageRank, strengthen edges
    --> prune_forgotten()       --> ForgettingEngine.run_forgetting_pass()
    --> cleanup orphaned graph nodes + temporal index entries
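
The write path can likewise be sketched end to end with stand-ins for the real components (the importance classifier here is a stub; embedding and sleep consolidation are deferred to background steps):

```python
import time
import uuid
from typing import Optional


def classify_importance(text: str) -> float:
    """Stub importance classifier -- the real system would combine
    heuristics with an LLM call. Here: longer inputs score higher."""
    return min(1.0, len(text.split()) / 50.0)


def write_memory(content: str, episodic_store: list, graph_nodes: dict,
                 importance_threshold: float = 0.2) -> Optional[str]:
    """Sketch of the write path: classify, store, register graph node."""
    importance = classify_importance(content)
    if importance < importance_threshold:
        return None  # discarded at the sensory/controller boundary
    entry_id = str(uuid.uuid4())
    episodic_store.append({
        "entry_id": entry_id,
        "content": content,
        "importance": importance,
        "timestamp": time.time(),
        "tier": "short_term",
    })
    # Register the new node so graph traversal and PageRank see it.
    graph_nodes[entry_id] = {"type": "episodic", "importance": importance}
    return entry_id
```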

CHAPTER SIX: MODULE STRUCTURE AND INSTALLATION

Before diving into the code, let us establish the module structure and installation requirements. UCMA is organized as a Python package with one module per major component. The structure is as follows:

ucma/
|-- __init__.py
|-- config.py              # Central configuration
|-- models.py              # All dataclasses and enums
|-- sensory_buffer.py      # Sensory Buffer
|-- working_memory.py      # Working Memory Manager
|-- episodic_store.py      # Episodic Memory Store
|-- semantic_store.py      # Semantic Memory Store
|-- procedural_store.py    # Procedural Memory Store
|-- learning_store.py      # Learning Store
|-- memory_graph.py        # Memory Graph
|-- retrieval.py           # Unified Retrieval Layer
|-- query_router.py        # Query Router (heuristic classifier)
|-- retention.py           # Ebbinghaus Retention State
|-- forgetting.py          # Forgetting Engine
|-- consolidation.py       # Sleep Consolidation Engine
|-- controller.py          # Memory Controller (top-level orchestrator)
|-- embedding_cache.py     # LRU embedding cache
|-- providers/
|   |-- __init__.py
|   |-- base.py            # Protocol definitions for LLM providers
|   |-- openai_provider.py # OpenAI concrete implementation
|-- main.py                # Wiring and example usage
requirements.txt
pyproject.toml

CHAPTER SEVEN: CONFIGURATION

All tunable parameters live in a single configuration dataclass. This eliminates magic numbers scattered throughout the codebase and makes the system easy to tune for different deployment scenarios. A customer service agent might want aggressive forgetting and a small context window; a long-running research agent might want conservative forgetting and a large context window.

# ucma/config.py

"""
Central configuration for the Unified Cognitive Memory Architecture.

All tunable parameters are collected here. Pass a UCMAConfig instance
to every component that needs configuration. This eliminates magic
numbers scattered throughout the codebase and makes the system easy
to tune for different deployment scenarios without touching any
component code.
"""

from __future__ import annotations

from dataclasses import dataclass


@dataclass
class UCMAConfig:
    """
    Unified configuration for all UCMA components.

    All numeric thresholds, budgets, and tuning knobs live here.
    Defaults are calibrated for a general-purpose coding assistant
    agent with moderate memory requirements.
    """

    # ------------------------------------------------------------------ #
    # Model identity                                                       #
    # ------------------------------------------------------------------ #
    model_name: str = "gpt-4o-mini"
    embedding_model: str = "text-embedding-3-small"
    embedding_dim: int = 1536

    # ------------------------------------------------------------------ #
    # Sensory Buffer                                                        #
    # ------------------------------------------------------------------ #
    sensory_buffer_capacity: int = 512

    # ------------------------------------------------------------------ #
    # Working Memory                                                        #
    # ------------------------------------------------------------------ #
    working_memory_system_tokens: int = 2048
    working_memory_active_tokens: int = 6144
    working_memory_staging_tokens: int = 512

    # ------------------------------------------------------------------ #
    # Episodic Store                                                        #
    # ------------------------------------------------------------------ #
    # How long entries stay in the short-term tier before expiry check
    short_term_ttl_hours: float = 48.0
    # Minimum importance to survive compression into medium-term
    compression_importance_base: float = 0.30
    # Maximum additional threshold penalty when many entries expire at once
    compression_volume_penalty_max: float = 0.40

    # ------------------------------------------------------------------ #
    # Sleep Consolidation Engine                                            #
    # ------------------------------------------------------------------ #
    consolidation_interval_seconds: float = 3600.0  # Every hour
    consolidation_summary_max_tokens: int = 150
    consolidation_fact_extraction_batch: int = 10

    # ------------------------------------------------------------------ #
    # Memory Graph                                                          #
    # ------------------------------------------------------------------ #
    pagerank_damping: float = 0.85
    pagerank_iterations: int = 20

    # ------------------------------------------------------------------ #
    # Embedding Cache                                                       #
    # ------------------------------------------------------------------ #
    embedding_cache_max_size: int = 256

    # ------------------------------------------------------------------ #
    # Retrieval                                                             #
    # ------------------------------------------------------------------ #
    retrieval_semantic_min_similarity: float = 0.50
    retrieval_max_results: int = 10
    retrieval_graph_expansion_hops: int = 1
    retrieval_graph_expansion_min_weight: float = 0.50
    retrieval_graph_expansion_top_k: int = 3

    # Multi-factor ranking weights (should sum to 1.0)
    rank_weight_relevance: float = 0.40
    rank_weight_importance: float = 0.25
    rank_weight_recency: float = 0.20
    rank_weight_retention: float = 0.15
    rank_recency_halflife_days: float = 7.0

    # ------------------------------------------------------------------ #
    # Forgetting Engine                                                     #
    # ------------------------------------------------------------------ #
    forgetting_retention_threshold: float = 0.10
    forgetting_importance_protection: float = 0.01
    # Stability boost per reinforcement (base, before spacing multiplier)
    retention_stability_boost_base: float = 0.50

CHAPTER EIGHT: THE DATA MODELS

All shared data structures live in a single models.py module. Using a single models file prevents circular imports and gives every component a common vocabulary. The graph-level dataclasses are intentionally lightweight -- they carry only the metadata needed for routing and ranking -- while the full memory content lives in the richer per-store entries defined below.

# ucma/models.py

"""
Shared data models for the Unified Cognitive Memory Architecture.

All dataclasses and enumerations used across UCMA components are defined
here. Keeping all models in one module prevents circular imports and
gives every component a common vocabulary.
"""

from __future__ import annotations

import hashlib
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


# ---------------------------------------------------------------------------
# Enumerations
# ---------------------------------------------------------------------------

class MemoryType(str, Enum):
    """The type of a memory node in the system."""
    EPISODIC   = "episodic"
    SEMANTIC   = "semantic"
    PROCEDURAL = "procedural"
    LEARNING   = "learning"


class QueryMode(str, Enum):
    """The retrieval strategy to use for a memory query."""
    SEMANTIC   = "semantic"    # Dense vector similarity search
    STRUCTURED = "structured"  # Metadata / index-based filtering
    HYBRID     = "hybrid"      # Semantic + structured combined
    GRAPH      = "graph"       # Graph traversal from a seed node
    TEMPORAL   = "temporal"    # Time-range retrieval via bucket index


class ProcedureStatus(str, Enum):
    """Lifecycle state of a procedural memory."""
    DRAFT      = "draft"       # Newly created, not yet validated
    ACTIVE     = "active"      # Validated and in production use
    DEPRECATED = "deprecated"  # Superseded by a better procedure


class LearningType(str, Enum):
    """The category of a learning record."""
    FAILURE_ANALYSIS = "failure_analysis"  # What went wrong and why
    SUCCESS_PATTERN  = "success_pattern"   # What worked and why
    STRATEGY_UPDATE  = "strategy_update"   # How to approach a problem class
    SELF_CORRECTION  = "self_correction"   # Agent correcting its own behavior
    USER_PREFERENCE  = "user_preference"   # Learned preferences of the user
    DOMAIN_INSIGHT   = "domain_insight"    # New understanding of the domain


# ---------------------------------------------------------------------------
# Sensory Layer
# ---------------------------------------------------------------------------

@dataclass
class SensoryEntry:
    """
    A single raw input event entering the memory system.

    This is the most primitive unit of information in UCMA. The entry
    is intentionally lightweight -- it carries raw content and minimal
    metadata -- because most entries will be discarded before expensive
    processing is applied.
    """
    content: str
    source: str                        # "user", "tool", "environment", "system"
    modality: str = "text"             # "text", "image", "audio", "structured"
    entry_id: str = field(
        default_factory=lambda: str(uuid.uuid4())
    )
    timestamp: float = field(default_factory=time.time)
    raw_token_estimate: Optional[int] = None


# ---------------------------------------------------------------------------
# Episodic Memory
# ---------------------------------------------------------------------------

@dataclass
class EpisodicEntry:
    """
    A single episodic memory: a record of something that happened.

    Episodic entries are the richest and most contextually detailed
    memories in the system. They answer the questions: what happened,
    when did it happen, who was involved, and why did it matter?
    """
    entry_id: str
    session_id: str
    agent_id: str
    timestamp: float
    content: str
    importance: float
    participants: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
    summary: Optional[str] = None
    embedding: Optional[list[float]] = None
    tier: str = "short_term"           # "short_term" or "medium_term"
    access_count: int = 0
    last_accessed: float = field(default_factory=time.time)

    def touch(self) -> None:
        """Record an access event on this entry."""
        self.access_count += 1
        self.last_accessed = time.time()


# ---------------------------------------------------------------------------
# Semantic Memory
# ---------------------------------------------------------------------------

@dataclass
class SemanticFact:
    """
    A single semantic memory: a timeless fact or piece of knowledge.

    Semantic facts are extracted from episodes during consolidation or
    can be directly asserted by the agent or user. They are stored as
    subject-predicate-object triples to enable structured querying and
    knowledge graph construction.
    """
    subject: str
    predicate: str
    object_value: str
    content: str                       # Human-readable form of the triple
    confidence: float
    source_episode_ids: list[str] = field(default_factory=list)
    embedding: Optional[list[float]] = None
    fact_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    created_at: float = field(default_factory=time.time)
    last_verified: float = field(default_factory=time.time)
    access_count: int = 0
    importance: float = 0.5

    @staticmethod
    def make_deterministic_id(
        subject: str, predicate: str, object_value: str
    ) -> str:
        """
        Generate a deterministic ID from the fact's SPO triple.

        When the same fact is derived from multiple episodes, it always
        maps to the same ID. Repeated assertions increase confidence
        rather than creating duplicate entries, keeping the store clean.
        """
        key = f"{subject.lower()}|{predicate.lower()}|{object_value.lower()}"
        return hashlib.sha256(key.encode()).hexdigest()[:32]

    def reinforce(self, source_episode_id: str) -> None:
        """
        Increase confidence and add a new source episode.
        Called when the same fact is derived from a new episode.
        """
        if source_episode_id not in self.source_episode_ids:
            self.source_episode_ids.append(source_episode_id)
        self.confidence = min(1.0, self.confidence + 0.05)
        self.last_verified = time.time()

    def touch(self) -> None:
        """Record an access event."""
        self.access_count += 1


@dataclass
class KnowledgeGraphEdge:
    """
    A directed edge in the knowledge graph connecting two entities.

    Edges represent relationships between concepts, people, systems,
    and any other entities that appear in the agent's experience.
    """
    source_entity: str
    relationship: str
    target_entity: str
    weight: float = 1.0
    created_at: float = field(default_factory=time.time)


# ---------------------------------------------------------------------------
# Procedural Memory
# ---------------------------------------------------------------------------

@dataclass
class ProcedureStep:
    """
    A single step within a procedural memory.

    Steps can be tool calls, LLM reasoning steps, or conditional
    branches. Each step carries its own success criteria so the agent
    can verify completion before proceeding to the next step.
    """
    step_id: str
    description: str
    action_type: str                   # "tool_call", "llm_reasoning", "condition"
    action_spec: dict
    success_criteria: str              # How to verify this step succeeded
    on_failure: str = "retry"          # "retry", "skip", "abort", "escalate"
    max_retries: int = 3


@dataclass
class ProceduralMemory:
    """
    A stored procedure: a reusable, parameterized workflow that the
    agent has learned to execute for a class of situations.
    """
    procedure_id: str
    name: str
    description: str
    trigger_tags: list[str]            # Situation tags that activate this procedure
    steps: list[ProcedureStep]
    status: ProcedureStatus = ProcedureStatus.DRAFT
    success_rate: float = 0.0
    execution_count: int = 0
    created_at: float = field(default_factory=time.time)
    last_used: Optional[float] = None

    def update_statistics(self, success: bool) -> None:
        """
        Update success rate using an incremental running average.

        Uses floating-point arithmetic throughout to avoid integer
        division truncation. The formula is the standard online
        mean update: new_mean = old_mean + (new_value - old_mean) / n
        """
        self.execution_count += 1
        n = float(self.execution_count)
        new_value = 1.0 if success else 0.0
        self.success_rate = self.success_rate + (new_value - self.success_rate) / n
        self.last_used = time.time()


# ---------------------------------------------------------------------------
# Learning Records
# ---------------------------------------------------------------------------

@dataclass
class LearningRecord:
    """
    A single learning: a distilled insight derived from experience.

    Learnings are the highest-level, most abstract form of memory in
    UCMA. They represent the agent's growing wisdom about how to operate
    effectively in its environment.
    """
    learning_id: str
    learning_type: LearningType
    title: str
    insight: str
    evidence: list[str]                # IDs of supporting episodes/facts
    applicability: list[str]           # Tags describing when this applies
    confidence: float
    impact_score: float
    created_at: float = field(default_factory=time.time)
    last_applied: Optional[float] = None
    application_count: int = 0
    validated: bool = False

    def apply(self) -> None:
        """Record that this learning was applied in a decision."""
        self.last_applied = time.time()
        self.application_count += 1

    def validate(self, success: bool) -> None:
        """
        Update confidence based on whether applying this learning
        led to a successful outcome.

        The asymmetric update (larger penalty for failure than reward
        for success) makes the system conservative: it requires multiple
        successes to build confidence but only a few failures to erode
        it. This prevents overconfident bad habits from forming.
        """
        if success:
            self.confidence = min(1.0, self.confidence + 0.10)
            self.validated = True
        else:
            self.confidence = max(0.0, self.confidence - 0.15)


# ---------------------------------------------------------------------------
# Memory Graph Structures
# ---------------------------------------------------------------------------

@dataclass
class MemoryNode:
    """
    A node in the Memory Graph.

    Each node represents a memory of any type, identified by its ID
    and type tag. The node carries lightweight metadata for graph
    operations; the full memory content lives in the appropriate
    typed store.
    """
    node_id: str
    memory_type: MemoryType
    importance: float
    created_at: float
    last_accessed: float
    access_count: int = 0


@dataclass
class MemoryEdge:
    """
    A directed, weighted, typed edge between two memory nodes.

    The edge type captures the semantic relationship between memories,
    enabling structured traversal and causal reasoning.
    """
    source_id: str
    target_id: str
    edge_type: str     # "caused_by", "followed_by", "supports", "derived_from", etc.
    weight: float = 1.0
    created_at: float = field(default_factory=time.time)


# ---------------------------------------------------------------------------
# Retrieval Structures
# ---------------------------------------------------------------------------

@dataclass
class MemoryQuery:
    """
    A unified query object that can express any type of memory retrieval
    request. The Unified Retrieval Layer interprets this object and
    routes it to the appropriate stores and indexes.
    """
    query_text: str
    mode: QueryMode = QueryMode.HYBRID
    max_results: int = 10
    memory_types: Optional[list[MemoryType]] = None
    time_range: Optional[tuple[float, float]] = None
    entity_filter: Optional[str] = None
    tag_filter: Optional[list[str]] = None
    seed_node_id: Optional[str] = None
    min_importance: float = 0.0
    include_graph_neighbors: bool = True


@dataclass
class RetrievalResult:
    """
    A single result from a memory retrieval operation.

    Results are typed and ranked, carrying enough metadata for the
    agent to understand the provenance and reliability of each
    retrieved memory.
    """
    memory_id: str
    memory_type: MemoryType
    content: str
    relevance_score: float
    importance: float
    source_store: str
    timestamp: Optional[float] = None
    graph_distance: int = 0            # Hops from query seed (0 = direct match)
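The two incremental-update rules defined above -- the online mean in `ProceduralMemory.update_statistics` and the asymmetric confidence update in `LearningRecord.validate` -- can be sanity-checked with a small standalone sketch (the helper names here are illustrative, not part of UCMA):

```python
def online_mean(values: list[float]) -> float:
    """The same incremental mean used by ProceduralMemory.update_statistics."""
    mean, n = 0.0, 0
    for v in values:
        n += 1
        mean += (v - mean) / n
    return mean


def validate_confidence(confidence: float, success: bool) -> float:
    """The asymmetric update used by LearningRecord.validate."""
    if success:
        return min(1.0, confidence + 0.10)
    return max(0.0, confidence - 0.15)


# Four executions, three successful: the running success_rate is exactly 3/4.
assert abs(online_mean([1.0, 0.0, 1.0, 1.0]) - 0.75) < 1e-9

# Three successes raise confidence by 0.30; a single failure claws back 0.15.
c = 0.5
for outcome in (True, True, True, False):
    c = validate_confidence(c, outcome)
assert abs(c - 0.65) < 1e-9
```

The asymmetry is visible in the final number: after three wins and one loss the confidence sits at 0.65, below the midpoint of its excursion, which is exactly the conservatism the docstring describes.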

CHAPTER NINE: THE SENSORY BUFFER

The Sensory Buffer is the entry point for all new information. It is a high-throughput, low-latency, short-lived buffer that holds raw, unprocessed input while the Memory Controller performs initial classification and routing. Most entries in the Sensory Buffer never make it to longer-term storage -- they are transient observations that are processed and discarded. Only information that the Memory Controller flags as worth retaining moves forward in the pipeline.

The buffer is implemented as a thread-safe ring buffer using Python's deque with a fixed maxlen. When the buffer is full, the oldest entry is automatically evicted to make room for the newest one. This gives us O(1) insertion and O(1) eviction with no manual management required.
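The ring-buffer behavior comes directly from `collections.deque`; a minimal sketch of the eviction semantics:

```python
from collections import deque

# A capacity-3 ring buffer: appending to a full deque drops the oldest item.
buf: deque[int] = deque(maxlen=3)
for i in range(5):
    buf.append(i)

assert list(buf) == [2, 3, 4]  # entries 0 and 1 were silently evicted
```

No explicit eviction code is needed; the `maxlen` argument makes the drop-oldest policy a property of the data structure itself.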

# ucma/sensory_buffer.py

"""
Sensory Buffer: the high-throughput entry point for all incoming information.

The Sensory Buffer is intentionally simple. Its job is to receive, not to
think. Intelligence begins in the Memory Controller, which drains the buffer
and decides what to do with each entry.
"""

from __future__ import annotations

import asyncio
import logging
from collections import deque

from ucma.models import SensoryEntry

logger = logging.getLogger(__name__)


class SensoryBuffer:
    """
    A fixed-capacity, thread-safe ring buffer for incoming sensory information.

    Operates in O(1) for both insertion and eviction. The asyncio.Lock
    ensures correctness when multiple coroutines ingest data concurrently,
    which is common in multi-tool agent loops where tool results and user
    messages can arrive simultaneously.
    """

    def __init__(self, capacity: int = 512) -> None:
        # deque with maxlen gives automatic ring-buffer eviction:
        # when full, appending a new item silently drops the oldest.
        self._buffer: deque[SensoryEntry] = deque(maxlen=capacity)
        self._capacity = capacity
        self._lock = asyncio.Lock()
        self._total_ingested: int = 0  # Monotonic counter for observability

    async def ingest(
        self,
        content: str,
        source: str,
        modality: str = "text",
    ) -> SensoryEntry:
        """
        Accept a new piece of raw input and place it in the buffer.

        Returns the created SensoryEntry so the caller can track its
        entry_id if needed for later correlation. This method is async
        to allow it to be awaited in coroutine-based agent loops without
        blocking the event loop.
        """
        entry = SensoryEntry(content=content, source=source, modality=modality)
        async with self._lock:
            self._buffer.append(entry)
            self._total_ingested += 1
        logger.debug(
            "Sensory ingest: source=%s modality=%s id=%s",
            source,
            modality,
            entry.entry_id,
        )
        return entry

    async def drain(self) -> list[SensoryEntry]:
        """
        Atomically remove and return all current entries for processing.

        After draining, the buffer is empty and ready for new input.
        The atomic swap (copy then clear) under the lock ensures that
        no entries are lost if ingest() is called concurrently.
        """
        async with self._lock:
            entries = list(self._buffer)
            self._buffer.clear()
        return entries

    @property
    def size(self) -> int:
        """Current number of entries in the buffer (approximate, no lock)."""
        return len(self._buffer)

    @property
    def total_ingested(self) -> int:
        """Total entries ever ingested (monotonic, for metrics)."""
        return self._total_ingested

CHAPTER TEN: WORKING MEMORY

Working Memory in UCMA corresponds directly to the LLM's context window, but it is managed rather than naive. The context window is treated as a precious, limited resource and managed with the same care that an operating system manages RAM. The Working Memory Manager divides the context into three zones, enforces token budgets for each zone, and makes principled eviction decisions when space is needed.

The eviction policy combines importance and recency into a single score. An important memory that was accessed recently gets the highest score and is the last to be evicted. An unimportant memory that has not been accessed in a long time gets the lowest score and is the first to go. This is a principled, tunable policy that can be adjusted by changing the relative weights.
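To make the trade-off concrete, here is the eviction score applied to two hypothetical items, using the same 0.7/0.3 weights as `_evict_if_needed`:

```python
def eviction_priority(importance: float, hours_since_access: float) -> float:
    # priority = importance * 0.7 + recency * 0.3,
    # where recency = 1 / (1 + age_in_hours).
    recency = 1.0 / (1.0 + hours_since_access)
    return importance * 0.7 + recency * 0.3

# An important but day-old item still outranks a fresh but unimportant one,
# so the unimportant item is the first to be evicted.
important_stale = eviction_priority(0.9, 24.0)   # 0.63 + 0.012 = 0.642
unimportant_fresh = eviction_priority(0.2, 0.0)  # 0.14 + 0.30  = 0.44
assert important_stale > unimportant_fresh
```

Shifting the 0.7/0.3 weights toward recency makes the policy behave more like plain LRU; shifting toward importance makes it behave more like a static priority queue.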

# ucma/working_memory.py

"""
Working Memory Manager: treats the LLM context window as a managed resource.

The context window is divided into three zones with independent token budgets.
The system zone holds core instructions and is never evicted. The active zone
holds dynamic working content and is evicted using importance-weighted LRU.
The staging zone holds recently retrieved memories awaiting promotion.
"""

from __future__ import annotations

import asyncio
import logging
import time
from dataclasses import dataclass, field

import tiktoken

from ucma.config import UCMAConfig

logger = logging.getLogger(__name__)


@dataclass
class WorkingMemoryItem:
    """
    A single item occupying space in the context window.

    Items have a zone assignment, a token cost, and an importance score
    that determines their eviction priority.
    """
    item_id: str
    content: str
    zone: str                          # "system", "active", or "staging"
    token_count: int
    importance: float                  # 0.0 to 1.0
    inserted_at: float = field(default_factory=time.time)
    last_accessed: float = field(default_factory=time.time)

    def touch(self) -> None:
        """Record an access, resetting the recency clock."""
        self.last_accessed = time.time()


class WorkingMemoryManager:
    """
    Manages the LLM context window as a structured, budgeted resource.

    Enforces per-zone token budgets and makes principled eviction decisions
    when space is needed. The system zone is always protected; only the
    active and staging zones participate in eviction.
    """

    def __init__(self, config: UCMAConfig) -> None:
        self._zone_budgets: dict[str, int] = {
            "system":  config.working_memory_system_tokens,
            "active":  config.working_memory_active_tokens,
            "staging": config.working_memory_staging_tokens,
        }
        # Load the tokenizer, falling back to cl100k_base for unknown models.
        # tiktoken raises KeyError for models it does not recognize.
        try:
            self._encoder = tiktoken.encoding_for_model(config.model_name)
        except KeyError:
            logger.warning(
                "Unknown model '%s' for tiktoken; falling back to cl100k_base.",
                config.model_name,
            )
            self._encoder = tiktoken.get_encoding("cl100k_base")

        self._items: dict[str, WorkingMemoryItem] = {}
        self._zone_usage: dict[str, int] = {z: 0 for z in self._zone_budgets}
        self._lock = asyncio.Lock()

    def _count_tokens(self, text: str) -> int:
        """Count tokens in a string using the loaded tiktoken encoder."""
        return len(self._encoder.encode(text))

    async def add(
        self,
        item_id: str,
        content: str,
        zone: str,
        importance: float,
    ) -> bool:
        """
        Add an item to the specified zone.

        Returns True if the item was added, False if it could not fit
        even after eviction. The system zone never evicts; if a system
        item does not fit, it is rejected immediately.
        """
        token_count = self._count_tokens(content)
        async with self._lock:
            # If the item already exists, update it in place. Note: a larger
            # replacement can temporarily exceed the zone budget; the next
            # add() to that zone will evict until usage fits again.
            if item_id in self._items:
                old = self._items[item_id]
                self._zone_usage[old.zone] -= old.token_count
                old.content = content
                old.token_count = token_count
                old.importance = importance
                old.touch()
                self._zone_usage[old.zone] += token_count
                return True

            # Evict if needed (system zone is never evicted)
            if zone != "system":
                await self._evict_if_needed(zone, token_count)

            budget = self._zone_budgets[zone]
            if self._zone_usage[zone] + token_count > budget:
                logger.warning(
                    "Cannot fit item %s (%d tokens) in zone '%s' "
                    "(used=%d, budget=%d).",
                    item_id, token_count, zone,
                    self._zone_usage[zone], budget,
                )
                return False

            item = WorkingMemoryItem(
                item_id=item_id,
                content=content,
                zone=zone,
                token_count=token_count,
                importance=importance,
            )
            self._items[item_id] = item
            self._zone_usage[zone] += token_count
            return True

    async def _evict_if_needed(self, zone: str, needed_tokens: int) -> None:
        """
        Evict the lowest-priority items from a zone until there is room.

        Priority score = importance * 0.7 + recency_score * 0.3
        where recency_score decays with time since last access.
        Items are evicted in ascending priority order (lowest first).

        Note: called while self._lock is already held.
        """
        budget = self._zone_budgets[zone]
        now = time.time()

        while self._zone_usage[zone] + needed_tokens > budget:
            # Snapshot to avoid modifying dict during iteration
            candidates = [
                item for item in list(self._items.values())
                if item.zone == zone
            ]
            if not candidates:
                break

            def priority(item: WorkingMemoryItem) -> float:
                age_hours = (now - item.last_accessed) / 3600.0
                recency = 1.0 / (1.0 + age_hours)
                return item.importance * 0.7 + recency * 0.3

            # Evict the item with the lowest priority
            victim = min(candidates, key=priority)
            self._zone_usage[zone] -= victim.token_count
            del self._items[victim.item_id]
            logger.debug(
                "Evicted item %s from zone '%s' (importance=%.2f).",
                victim.item_id, zone, victim.importance,
            )

    async def get(self, item_id: str) -> WorkingMemoryItem | None:
        """Retrieve an item by ID and update its access time."""
        async with self._lock:
            item = self._items.get(item_id)
            if item:
                item.touch()
            return item

    async def clear_zone(self, zone: str) -> int:
        """
        Remove all items from a zone. Returns the number of items removed.
        Used by new_session() to clear the active zone between sessions.
        """
        async with self._lock:
            to_remove = [
                item_id
                for item_id, item in self._items.items()
                if item.zone == zone
            ]
            for item_id in to_remove:
                item = self._items.pop(item_id)
                self._zone_usage[zone] -= item.token_count
            return len(to_remove)

    def render_context(self) -> str:
        """
        Produce the full context string to be sent to the LLM.

        Zones are rendered in order: system, active, staging. Empty
        zones are skipped to avoid producing blank sections. Items
        within each zone are sorted by importance (highest first).
        """
        sections: list[str] = []
        for zone in ("system", "active", "staging"):
            zone_items = sorted(
                [item for item in self._items.values() if item.zone == zone],
                key=lambda i: i.importance,
                reverse=True,
            )
            if zone_items:
                sections.append(
                    "\n".join(item.content for item in zone_items)
                )
        return "\n\n".join(sections)

    def usage_summary(self) -> dict[str, object]:
        """Return a summary of token usage per zone for observability."""
        return {
            zone: {
                "used": self._zone_usage[zone],
                "budget": self._zone_budgets[zone],
                "pct": round(
                    100.0 * self._zone_usage[zone] / self._zone_budgets[zone], 1
                ),
            }
            for zone in self._zone_budgets
        }

CHAPTER ELEVEN: THE RETENTION MODEL

The Ebbinghaus forgetting curve is the mathematical heart of UCMA's forgetting system. Before we build the Forgetting Engine, we need the retention model that it operates on. Every memory in the system has an associated MemoryRetentionState that tracks its stability and last review time. The stability increases each time the memory is accessed, making it more resistant to future decay. This is the same mechanism that underlies spaced repetition learning systems.

The retention formula is: R = exp(-t / S) where R is retention (0 to 1), t is elapsed time in days since last review, and S is stability (a positive float that grows with each reinforcement). A stability of 1.0 means the memory decays to about 37% after one day. A stability of 7.0 means it decays to about 37% after one week.
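The curve is easy to verify numerically; whenever t equals S, retention is e^-1, about 0.37:

```python
import math

def retention(elapsed_days: float, stability: float) -> float:
    # R = exp(-t / S), the Ebbinghaus forgetting curve used below.
    return math.exp(-elapsed_days / stability)

assert round(retention(1.0, 1.0), 3) == 0.368   # S=1: ~37% after one day
assert round(retention(1.0, 7.0), 3) == 0.867   # S=7: gentle decay after one day
assert round(retention(7.0, 7.0), 3) == 0.368   # S=7: ~37% after one week
```

Stability S is therefore best read as a time constant: it is the number of days it takes the memory to decay to roughly 37% retention.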

# ucma/retention.py

"""
Ebbinghaus Retention Model for the Unified Cognitive Memory Architecture.

Every memory in UCMA has an associated MemoryRetentionState. The retention
model implements the Ebbinghaus forgetting curve with SuperMemo-inspired
stability updates. Stability grows with each review, and the growth is
proportional to the elapsed time since the last review (the spacing effect).
"""

from __future__ import annotations

import math
import time
from dataclasses import dataclass, field


@dataclass
class MemoryRetentionState:
    """
    Tracks the Ebbinghaus retention state for a single memory.

    The retention formula is: R = exp(-t / S)
    where R is retention (0-1), t is elapsed time in days since last
    review, and S is stability (a positive float that grows with each
    reinforcement). Higher stability means slower decay.
    """
    memory_id: str
    stability: float = 1.0             # Days until retention drops to ~37%
    last_reviewed: float = field(default_factory=time.time)
    review_count: int = 0

    def current_retention(self) -> float:
        """
        Compute current memory retention using the Ebbinghaus formula.

        Returns a value in [0.0, 1.0]. A value below the forgetting
        threshold means the memory is a candidate for deletion.

        Guards against stability=0 (division by zero) by clamping
        stability to a minimum of 1e-6.
        """
        elapsed_days = (time.time() - self.last_reviewed) / 86400.0
        safe_stability = max(self.stability, 1e-6)
        return math.exp(-elapsed_days / safe_stability)

    def reinforce(self, boost_base: float = 0.5) -> None:
        """
        Record a review event and increase stability.

        The stability boost is proportional to the elapsed time since
        the last review (the spacing effect): reviewing a memory after
        a long gap increases stability more than reviewing it immediately.
        This matches the empirical finding that spaced repetition is
        more effective than massed practice.
        """
        elapsed_days = (time.time() - self.last_reviewed) / 86400.0
        # Spacing multiplier: at least 1.0, grows with elapsed time
        spacing_multiplier = max(1.0, elapsed_days)
        self.stability += boost_base * spacing_multiplier
        self.last_reviewed = time.time()
        self.review_count += 1


class RetentionRegistry:
    """
    A registry mapping memory IDs to their retention states.

    Thread-safety note: this registry is accessed from both the main
    agent loop (via retrieval) and the background consolidation engine.
    All mutations use a simple dict which is safe for asyncio (single-
    threaded event loop) but would need a lock for true multi-threading.
    """

    def __init__(self) -> None:
        self._states: dict[str, MemoryRetentionState] = {}

    def register(self, memory_id: str, initial_stability: float = 1.0) -> None:
        """Register a new memory with the given initial stability."""
        if memory_id not in self._states:
            state = MemoryRetentionState(
                memory_id=memory_id,
                stability=initial_stability,
            )
            self._states[memory_id] = state

    def get(self, memory_id: str) -> MemoryRetentionState | None:
        """Retrieve the retention state for a memory, or None if not found."""
        return self._states.get(memory_id)

    def reinforce(self, memory_id: str, boost_base: float = 0.5) -> None:
        """
        Reinforce a memory's retention state.
        If the memory is not registered, this is a no-op.
        """
        state = self._states.get(memory_id)
        if state:
            state.reinforce(boost_base)

    def remove(self, memory_id: str) -> None:
        """Remove a memory's retention state (called when memory is deleted)."""
        self._states.pop(memory_id, None)

    def __len__(self) -> int:
        """Return the number of tracked memories."""
        return len(self._states)

CHAPTER TWELVE: THE EPISODIC MEMORY STORE

Episodic memory stores specific events and experiences, tagged with temporal and contextual metadata. The Episodic Memory Store is divided into two sub-tiers that mirror the human distinction between recent episodic memory (hippocampal) and consolidated episodic memory (neocortical).

The Short-Term Episodic Cache holds recent events from the last few hours to a few days. Entries here are rich in detail -- they preserve the full content of interactions. The Medium-Term Episodic Store holds summarized and consolidated episodes from the past few weeks. The temporal index maps hour-aligned time buckets to entry IDs, enabling O(buckets) time-range queries rather than O(n) full scans. When entries are forgotten, the temporal index is cleaned up so that it does not grow without bound.
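The bucket index described above can be sketched independently of the full store (the timestamps and entry IDs here are illustrative):

```python
from collections import defaultdict

BUCKET_SECONDS = 3600  # hour-aligned buckets, as in EpisodicMemoryStore

events = {"e1": 1000.0, "e2": 5000.0, "e3": 9000.0}  # entry_id -> timestamp
index: dict[int, list[str]] = defaultdict(list)
for entry_id, ts in events.items():
    index[int(ts // BUCKET_SECONDS)].append(entry_id)

def time_range_query(start: float, end: float) -> list[str]:
    # Only the buckets overlapping [start, end] are examined: O(b + k),
    # where b is buckets in range and k is matching entries.
    hits = []
    for bucket in range(int(start // BUCKET_SECONDS), int(end // BUCKET_SECONDS) + 1):
        for entry_id in index.get(bucket, []):
            if start <= events[entry_id] <= end:
                hits.append(entry_id)
    return hits

assert time_range_query(0.0, 6000.0) == ["e1", "e2"]
```

The final per-entry timestamp check matters because bucket boundaries are coarse: an entry can share a bucket with the range endpoints while falling just outside the requested window.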

# ucma/episodic_store.py

"""
Episodic Memory Store: two-tier storage for event memories.

The short-term tier holds recent, detailed episodic entries. The medium-term
tier holds summarized, consolidated entries. Both tiers share a temporal
index for fast time-range queries. The store is fully async-safe.
"""

from __future__ import annotations

import asyncio
import logging
import time
from collections import defaultdict

from ucma.models import EpisodicEntry

logger = logging.getLogger(__name__)


class EpisodicMemoryStore:
    """
    Two-tier episodic memory store with temporal indexing.

    The temporal index maps hour-aligned Unix timestamp buckets to lists
    of entry IDs. A time-range query only examines the relevant buckets,
    reducing complexity from O(n) to O(b + k) where b is the number of
    buckets in the range and k is the number of matching entries.
    """

    _BUCKET_SIZE_SECONDS: int = 3600  # One bucket per hour

    def __init__(self, short_term_ttl_hours: float = 48.0) -> None:
        self._short_term: dict[str, EpisodicEntry] = {}
        self._medium_term: dict[str, EpisodicEntry] = {}
        self._short_term_ttl: float = short_term_ttl_hours * 3600.0
        # temporal_index: bucket_key -> list of entry_ids
        self._temporal_index: dict[int, list[str]] = defaultdict(list)
        self._lock = asyncio.Lock()

    def _bucket_key(self, timestamp: float) -> int:
        """Compute the hour-aligned bucket key for a timestamp."""
        return int(timestamp // self._BUCKET_SIZE_SECONDS)

    async def store(self, entry: EpisodicEntry) -> None:
        """
        Store a new episodic entry in the appropriate tier and update
        the temporal index. If an entry with the same ID already exists,
        it is replaced (used when promoting from short-term to medium-term).
        """
        async with self._lock:
            if entry.tier == "short_term":
                self._short_term[entry.entry_id] = entry
            else:
                # Remove from short-term if it was there (promotion)
                self._short_term.pop(entry.entry_id, None)
                self._medium_term[entry.entry_id] = entry

            # Add to temporal index (only if not already present)
            bucket = self._bucket_key(entry.timestamp)
            if entry.entry_id not in self._temporal_index[bucket]:
                self._temporal_index[bucket].append(entry.entry_id)

    async def get(self, entry_id: str) -> EpisodicEntry | None:
        """Retrieve a single entry by ID from either tier."""
        async with self._lock:
            entry = self._short_term.get(entry_id) or self._medium_term.get(entry_id)
            if entry:
                entry.touch()
            return entry

    async def get_expired_short_term(self) -> list[EpisodicEntry]:
        """
        Return all short-term entries that have exceeded their TTL.
        These are candidates for compression into medium-term storage.
        """
        now = time.time()
        async with self._lock:
            expired = [
                entry for entry in self._short_term.values()
                if now - entry.timestamp > self._short_term_ttl
            ]
        return expired

    async def retrieve_by_time_range(
        self,
        start_time: float,
        end_time: float,
    ) -> list[EpisodicEntry]:
        """
        Retrieve all entries (from both tiers) within a time range.

        Uses the temporal bucket index for O(b + k) performance rather
        than O(n) full scan.
        """
        start_bucket = self._bucket_key(start_time)
        end_bucket = self._bucket_key(end_time)

        async with self._lock:
            results: list[EpisodicEntry] = []
            for bucket in range(start_bucket, end_bucket + 1):
                for entry_id in self._temporal_index.get(bucket, []):
                    entry = (
                        self._short_term.get(entry_id)
                        or self._medium_term.get(entry_id)
                    )
                    if entry and start_time <= entry.timestamp <= end_time:
                        entry.touch()
                        results.append(entry)
        return results

    async def get_all_short_term(self) -> list[EpisodicEntry]:
        """Return all short-term entries (for batch processing)."""
        async with self._lock:
            return list(self._short_term.values())

    async def get_all_medium_term_ids(self) -> list[str]:
        """Return all medium-term entry IDs (for forgetting passes)."""
        async with self._lock:
            return list(self._medium_term.keys())

    async def delete(self, entry_id: str) -> bool:
        """
        Permanently delete an entry from either tier and clean up the
        temporal index. Returns True if the entry was found and deleted.
        """
        async with self._lock:
            entry = self._short_term.pop(
                entry_id, None
            ) or self._medium_term.pop(entry_id, None)
            if entry is None:
                return False
            # Clean up temporal index entry
            bucket = self._bucket_key(entry.timestamp)
            bucket_list = self._temporal_index.get(bucket)
            if bucket_list:
                try:
                    bucket_list.remove(entry_id)
                except ValueError:
                    pass
                if not bucket_list:
                    del self._temporal_index[bucket]
            return True

    def stats(self) -> dict[str, int]:
        """Return store statistics for observability."""
        return {
            "short_term_count": len(self._short_term),
            "medium_term_count": len(self._medium_term),
            "temporal_buckets": len(self._temporal_index),
        }

CHAPTER THIRTEEN: THE SEMANTIC MEMORY STORE

Semantic memory stores facts, knowledge, and general truths about the world and about the specific domain of the application. Unlike episodic memory, semantic memories have no "when" -- they are timeless propositions. The Semantic Memory Store is a vector-ready fact store augmented with entity indexing and a knowledge graph layer. The entity index enables O(1) lookup of all facts about a given entity. The knowledge graph connects entities through typed relationships, enabling multi-hop relational reasoning.

A critical design decision is the deterministic fact ID. When the same fact is derived from multiple episodes, it always maps to the same ID. Repeated assertions increase confidence rather than creating duplicate entries, keeping the store clean and its retrieval results non-redundant. Both the subject and the object of each fact are indexed, so fact lookup works from either end of a triple.
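One plausible way to build such an ID is to hash the normalized triple; the sketch below is hypothetical (the actual `SemanticFact.make_deterministic_id` lives in `ucma/models.py` and may normalize differently):

```python
import hashlib

def make_deterministic_id(subject: str, predicate: str, object_value: str) -> str:
    # Hypothetical sketch: the key idea is that the ID is a pure function
    # of the normalized triple, so identical facts collide by design.
    key = "|".join(part.strip().lower() for part in (subject, predicate, object_value))
    return "fact_" + hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]

# The same triple, however it varies in case or spacing, maps to one ID,
# so a repeated assertion reinforces rather than duplicates.
assert make_deterministic_id("Alice", "works_at", "Acme") == \
       make_deterministic_id("alice", "works_at", "ACME")
```

Any stable hash works here; the only hard requirement is determinism, since the dedup-on-assert behavior of `assert_fact` depends on it.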

# ucma/semantic_store.py

"""
Semantic Memory Store: long-term fact storage with entity indexing and
a knowledge graph layer.

Facts are stored as subject-predicate-object triples with natural language
content and optional vector embeddings. The entity index supports O(1)
structured lookup. Both subject AND object are indexed so that fact
lookup from either end of a triple is equally fast. The knowledge
graph supports multi-hop relational reasoning via breadth-first traversal.
"""

from __future__ import annotations

import asyncio
import logging
from collections import defaultdict, deque

from ucma.models import KnowledgeGraphEdge, SemanticFact

logger = logging.getLogger(__name__)


class SemanticMemoryStore:
    """
    Long-term semantic memory with entity indexing and knowledge graph.

    Combines fast structured lookup (entity index) with graph traversal
    (knowledge graph) for relational reasoning. Vector embeddings on
    SemanticFact objects enable semantic similarity search via the
    Unified Retrieval Layer.
    """

    def __init__(self) -> None:
        # Primary store: fact_id -> SemanticFact
        self._facts: dict[str, SemanticFact] = {}
        # Entity index: entity_name -> set of fact_ids mentioning it.
        # We index both subject AND object_value so that traversal from
        # either direction of a triple is equally fast.
        self._entity_index: dict[str, set[str]] = defaultdict(set)
        # Knowledge graph: entity -> list of outgoing edges
        self._graph: dict[str, list[KnowledgeGraphEdge]] = defaultdict(list)
        self._lock = asyncio.Lock()

    async def assert_fact(self, fact: SemanticFact) -> str:
        """
        Add or update a semantic fact.

        If a fact with the same subject-predicate-object triple already
        exists, we update its confidence and source list rather than
        creating a duplicate. Returns the fact_id of the stored fact.
        """
        fact_id = SemanticFact.make_deterministic_id(
            fact.subject, fact.predicate, fact.object_value
        )
        fact.fact_id = fact_id

        async with self._lock:
            if fact_id in self._facts:
                existing = self._facts[fact_id]
                for src in fact.source_episode_ids:
                    existing.reinforce(src)
                logger.debug(
                    "Reinforced existing fact %s (confidence=%.2f).",
                    fact_id, existing.confidence,
                )
            else:
                self._facts[fact_id] = fact
                # Index both subject and object so traversal works from either end
                self._entity_index[fact.subject].add(fact_id)
                self._entity_index[fact.object_value].add(fact_id)
                # Add a knowledge graph edge for this fact
                edge = KnowledgeGraphEdge(
                    source_entity=fact.subject,
                    relationship=fact.predicate,
                    target_entity=fact.object_value,
                )
                self._graph[fact.subject].append(edge)
                logger.debug(
                    "Asserted new fact %s: %s.", fact_id, fact.content
                )

        return fact_id

    async def get_facts_about(self, entity: str) -> list[SemanticFact]:
        """
        Retrieve all known facts about a specific entity (as subject or object).

        Uses the entity index for O(1) lookup. Updates access statistics
        on each retrieved fact.
        """
        async with self._lock:
            fact_ids = self._entity_index.get(entity, set())
            facts = [self._facts[fid] for fid in fact_ids if fid in self._facts]
            for fact in facts:
                fact.touch()
        return facts

    async def get_fact(self, fact_id: str) -> SemanticFact | None:
        """Retrieve a single fact by ID."""
        async with self._lock:
            fact = self._facts.get(fact_id)
            if fact:
                fact.touch()
            return fact

    async def traverse_graph(
        self, start_entity: str, max_hops: int = 3
    ) -> list[KnowledgeGraphEdge]:
        """
        Perform a breadth-first traversal of the knowledge graph starting
        from a given entity, up to max_hops away.

        This allows the retrieval system to find related knowledge that is
        not directly about the queried entity but is connected through a
        chain of relationships. Traversal follows forward (subject->object)
        edges; the entity index serves reverse lookups via get_facts_about,
        but reverse traversal would require a separate inverted adjacency.
        """
        visited: set[str] = set()
        frontier: deque[tuple[str, int]] = deque([(start_entity, 0)])
        all_edges: list[KnowledgeGraphEdge] = []

        async with self._lock:
            while frontier:
                entity, hops = frontier.popleft()
                if entity in visited or hops > max_hops:
                    continue
                visited.add(entity)
                for edge in self._graph.get(entity, []):
                    all_edges.append(edge)
                    if hops + 1 <= max_hops:
                        frontier.append((edge.target_entity, hops + 1))

        return all_edges

    async def delete_fact(self, fact_id: str) -> bool:
        """
        Permanently delete a fact and clean up indexes.
        Called by the Forgetting Engine.
        """
        async with self._lock:
            fact = self._facts.pop(fact_id, None)
            if fact is None:
                return False
            self._entity_index[fact.subject].discard(fact_id)
            self._entity_index[fact.object_value].discard(fact_id)
            # Remove empty index entries to prevent unbounded growth
            if not self._entity_index.get(fact.subject):
                self._entity_index.pop(fact.subject, None)
            if not self._entity_index.get(fact.object_value):
                self._entity_index.pop(fact.object_value, None)
            # Drop the matching knowledge-graph edge so traversal does
            # not surface the deleted fact
            if fact.subject in self._graph:
                self._graph[fact.subject] = [
                    e for e in self._graph[fact.subject]
                    if (e.relationship, e.target_entity)
                    != (fact.predicate, fact.object_value)
                ]
                if not self._graph[fact.subject]:
                    del self._graph[fact.subject]
        return True

    async def get_all_fact_ids(self) -> list[str]:
        """Return all fact IDs (for batch forgetting passes)."""
        async with self._lock:
            return list(self._facts.keys())

    def stats(self) -> dict[str, int]:
        """Return store statistics for observability."""
        return {
            "fact_count": len(self._facts),
            "entity_count": len(self._entity_index),
            "graph_entities": len(self._graph),
        }
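
The deduplication in assert_fact hinges on make_deterministic_id producing the same ID for the same triple. That method lives in ucma/models.py (not shown in this chapter); a minimal sketch of one plausible implementation, assuming case-insensitive normalization and a truncated SHA-256 digest:

```python
import hashlib

def make_deterministic_id(subject: str, predicate: str, obj: str) -> str:
    # Hash the normalized triple so the same fact always maps to the same
    # ID, which is what lets assert_fact reinforce instead of duplicate
    key = f"{subject.strip().lower()}|{predicate.strip().lower()}|{obj.strip().lower()}"
    return hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]

# The same triple, differently cased, yields the same fact_id
assert make_deterministic_id("Alice", "works_at", "Acme") == \
    make_deterministic_id("alice", "works_at", "ACME")
```

Any stable hash of the normalized triple works here; the important property is determinism, not the particular digest.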

CHAPTER FOURTEEN: THE PROCEDURAL MEMORY STORE

Procedural memory is the most underappreciated type of memory in current LLM systems. It stores not facts or episodes but knowledge of how to do things: workflows, tool usage patterns, problem-solving strategies, and learned behaviors. Procedural memories are stored as structured workflow templates with conditional logic, parameter slots, and success/failure criteria. They are retrieved not by semantic similarity to a query but by matching the current situation to known procedure patterns via tag-based lookup.

The success_rate field and the sorting logic in find_by_tags implement a crucial property: the agent learns to prefer procedures that actually work. The sorting formula combines success rate with a confidence factor that approaches 1.0 only after ten or more executions, preventing the system from over-trusting procedures with insufficient evidence.
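
To make the ranking formula concrete, here is the score evaluated for two hypothetical procedures:

```python
def procedure_score(success_rate: float, execution_count: int) -> float:
    # Confidence approaches 1.0 only after ten or more executions
    confidence = min(execution_count / 10.0, 1.0)
    return success_rate * confidence

# A perfect but barely-tested procedure ranks below a well-validated one
assert procedure_score(1.0, 2) == 0.2    # 1.0 * (2/10)
assert procedure_score(0.8, 20) == 0.8   # 0.8 * 1.0
```

The two-execution procedure may genuinely be better, but the system refuses to trust it until the evidence accumulates.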

# ucma/procedural_store.py

"""
Procedural Memory Store: storage and retrieval of reusable workflow templates.

Procedures are learned from successful episode sequences and refined over time.
Retrieval is tag-based and sorted by empirical success rate, so the agent
preferentially uses procedures that have worked well in the past.
"""

from __future__ import annotations

import asyncio
import logging
import uuid
from collections import defaultdict

from ucma.models import ProceduralMemory, ProcedureStatus, ProcedureStep

logger = logging.getLogger(__name__)


class ProceduralMemoryStore:
    """
    Stores and retrieves procedural memories.

    Retrieval is based on tag matching: given a set of situation tags,
    find procedures whose tags overlap. Results are sorted by a
    confidence-weighted success rate to prefer well-validated procedures.
    """

    def __init__(self) -> None:
        self._procedures: dict[str, ProceduralMemory] = {}
        # Tag index: tag -> set of procedure_ids with that tag
        self._tag_index: dict[str, set[str]] = defaultdict(set)
        self._lock = asyncio.Lock()

    async def store_procedure(self, procedure: ProceduralMemory) -> str:
        """Store a new procedure and update the tag index."""
        async with self._lock:
            self._procedures[procedure.procedure_id] = procedure
            for tag in procedure.trigger_tags:
                self._tag_index[tag].add(procedure.procedure_id)
        logger.debug(
            "Stored procedure '%s' with tags %s.",
            procedure.name, procedure.trigger_tags,
        )
        return procedure.procedure_id

    async def find_by_tags(
        self, tags: list[str], min_success_rate: float = 0.0
    ) -> list[ProceduralMemory]:
        """
        Find procedures whose trigger tags overlap with the given tags.

        Results are sorted by confidence-weighted success rate:
            score = success_rate * min(execution_count / 10.0, 1.0)

        The confidence factor (min(n/10, 1.0)) approaches 1.0 only after
        ten or more executions, preventing the system from over-trusting
        procedures with insufficient evidence.
        """
        async with self._lock:
            matching_ids: set[str] = set()
            for tag in tags:
                matching_ids.update(self._tag_index.get(tag, set()))

            results = [
                self._procedures[pid]
                for pid in matching_ids
                if pid in self._procedures
                and self._procedures[pid].success_rate >= min_success_rate
                and self._procedures[pid].status != ProcedureStatus.DEPRECATED
            ]

        def sort_key(proc: ProceduralMemory) -> float:
            confidence = min(proc.execution_count / 10.0, 1.0)
            return proc.success_rate * confidence

        results.sort(key=sort_key, reverse=True)
        return results

    async def get_procedure(self, procedure_id: str) -> ProceduralMemory | None:
        """Retrieve a single procedure by ID."""
        async with self._lock:
            return self._procedures.get(procedure_id)

    async def update_procedure(
        self, procedure_id: str, success: bool
    ) -> None:
        """Record the outcome of a procedure execution."""
        async with self._lock:
            proc = self._procedures.get(procedure_id)
            if proc:
                proc.update_statistics(success)

    async def delete_procedure(self, procedure_id: str) -> bool:
        """Permanently delete a procedure and clean up the tag index."""
        async with self._lock:
            proc = self._procedures.pop(procedure_id, None)
            if proc is None:
                return False
            for tag in proc.trigger_tags:
                self._tag_index[tag].discard(procedure_id)
                if not self._tag_index[tag]:
                    del self._tag_index[tag]
        return True

    async def get_all_procedure_ids(self) -> list[str]:
        """Return all procedure IDs (for batch forgetting passes)."""
        async with self._lock:
            return list(self._procedures.keys())

    def stats(self) -> dict[str, int]:
        """Return store statistics for observability."""
        return {
            "procedure_count": len(self._procedures),
            "tag_count": len(self._tag_index),
        }

CHAPTER FIFTEEN: THE LEARNING STORE

The Learning Store holds the highest-level, most abstract form of memory in UCMA: distilled insights derived from experience. A learning record is not a description of what happened (that is episodic memory) or a fact about the world (that is semantic memory). It is a lesson: a generalized principle that the agent has extracted from its experience and can apply to future situations.

Learnings are retrieved by applicability tags and learning type, filtered by minimum confidence. This prevents the agent from acting on poorly-validated insights. The asymmetric confidence update in LearningRecord.validate() makes the system conservative: it requires multiple successes to build confidence but only a few failures to erode it.
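
LearningRecord.validate() is defined in ucma/models.py (not shown in this chapter). The asymmetry it implements could look like the following sketch, where the step sizes are illustrative rather than the actual model values:

```python
def validate(confidence: float, success: bool,
             gain: float = 0.05, penalty: float = 0.15) -> float:
    # Asymmetric update: confidence is earned slowly but lost quickly
    # (gain and penalty here are illustrative, not the real constants)
    if success:
        return min(1.0, confidence + gain)
    return max(0.0, confidence - penalty)

c = 0.5
for _ in range(3):
    c = validate(c, success=True)        # three successes: 0.5 -> ~0.65
one_failure = validate(c, success=False) # one failure undoes all three
```

Because penalty exceeds gain, a single contradicting outcome outweighs several confirming ones, which is exactly the conservatism the text describes.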

# ucma/learning_store.py

"""
Learning Store: storage and retrieval of distilled insights from experience.

Learnings are the highest-level, most abstract form of memory in UCMA.
They represent the agent's growing wisdom about how to operate effectively.
Retrieval is by applicability tags and learning type, filtered by minimum
confidence to prevent the agent from acting on poorly-validated insights.
"""

from __future__ import annotations

import asyncio
import logging
from collections import defaultdict
from typing import Optional

from ucma.models import LearningRecord, LearningType

logger = logging.getLogger(__name__)


class LearningStore:
    """
    Stores and retrieves learning records.

    The Learning Store is queried at the beginning of each task to surface
    relevant lessons from past experience. It is updated at the end of each
    task by the Sleep Consolidation Engine.
    """

    def __init__(self) -> None:
        self._learnings: dict[str, LearningRecord] = {}
        # Index by applicability tags for fast situational retrieval
        self._applicability_index: dict[str, set[str]] = defaultdict(set)
        # Index by learning type
        self._type_index: dict[str, set[str]] = defaultdict(set)
        self._lock = asyncio.Lock()

    async def store_learning(self, learning: LearningRecord) -> str:
        """Store a new learning record and update indexes."""
        async with self._lock:
            self._learnings[learning.learning_id] = learning
            for tag in learning.applicability:
                self._applicability_index[tag].add(learning.learning_id)
            self._type_index[learning.learning_type.value].add(
                learning.learning_id
            )
        logger.debug(
            "Stored learning '%s' type=%s.",
            learning.title, learning.learning_type.value,
        )
        return learning.learning_id

    async def retrieve_relevant(
        self,
        tags: list[str],
        learning_type: Optional[LearningType] = None,
        min_confidence: float = 0.3,
        max_results: int = 5,
    ) -> list[LearningRecord]:
        """
        Retrieve learning records relevant to the given situation tags.

        Results are filtered by minimum confidence and sorted by a
        combined score of confidence and impact.
        """
        async with self._lock:
            matching_ids: set[str] = set()
            for tag in tags:
                matching_ids.update(self._applicability_index.get(tag, set()))

            if learning_type is not None:
                type_ids = self._type_index.get(learning_type.value, set())
                matching_ids &= type_ids

            results = [
                self._learnings[lid]
                for lid in matching_ids
                if lid in self._learnings
                and self._learnings[lid].confidence >= min_confidence
            ]

        results.sort(
            key=lambda lr: lr.confidence * lr.impact_score, reverse=True
        )
        return results[:max_results]

    async def get_learning(self, learning_id: str) -> LearningRecord | None:
        """Retrieve a single learning by ID."""
        async with self._lock:
            return self._learnings.get(learning_id)

    async def delete_learning(self, learning_id: str) -> bool:
        """Permanently delete a learning and clean up indexes."""
        async with self._lock:
            learning = self._learnings.pop(learning_id, None)
            if learning is None:
                return False
            for tag in learning.applicability:
                self._applicability_index[tag].discard(learning_id)
                if not self._applicability_index[tag]:
                    del self._applicability_index[tag]
            self._type_index[learning.learning_type.value].discard(learning_id)
        return True

    async def get_all_learning_ids(self) -> list[str]:
        """Return all learning IDs (for batch forgetting passes)."""
        async with self._lock:
            return list(self._learnings.keys())

    def stats(self) -> dict[str, int]:
        """Return store statistics for observability."""
        return {
            "learning_count": len(self._learnings),
            "applicability_tags": len(self._applicability_index),
        }

CHAPTER SIXTEEN: THE MEMORY GRAPH

The Memory Graph is the connective tissue of UCMA. It is a directed, weighted graph where nodes represent memories of any type and edges represent typed relationships between them. The graph serves three critical functions: relational retrieval (finding memories connected to a starting memory), causal reasoning (following "caused_by" and "followed_by" edges), and guiding the Sleep Consolidation Engine (densely connected clusters are candidates for abstraction).

The PageRank-style importance computation identifies memories that are structurally central to the knowledge network. These central memories are protected from forgetting even if they have not been recently accessed, because deleting them would break many connections in the graph. The dangling node problem -- where nodes with no outgoing edges absorb PageRank without redistributing it -- is handled by redistributing dangling node scores evenly across all nodes at each iteration.
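
A single iteration on a three-node chain shows the dangling redistribution at work. This hand-worked example uses damping 0.85 and ignores edge weights (all 1.0):

```python
# One PageRank iteration on the chain A -> B -> C, where C is dangling
n, d = 3, 0.85
scores = {"A": 1 / 3, "B": 1 / 3, "C": 1 / 3}

dangling_sum = scores["C"]                 # only C has no outgoing edges
base = (1 - d) / n + d * dangling_sum / n  # teleport + dangling share

new_scores = {
    "A": base,                    # nothing links to A
    "B": base + d * scores["A"],  # A's whole score flows to B
    "C": base + d * scores["B"],  # B's whole score flows to C
}

# Total probability mass is conserved because the dangling score was
# redistributed instead of silently leaking out of the system
assert abs(sum(new_scores.values()) - 1.0) < 1e-9
```

Without the dangling_sum term, C would absorb score each iteration and the totals would decay toward zero.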

# ucma/memory_graph.py

"""
Memory Graph: the connective tissue of UCMA.

A directed, weighted, typed graph over all memory nodes. Supports efficient
traversal, cluster detection, and PageRank-style importance propagation for
identifying the most central memories. Dangling nodes (no outgoing edges)
are handled correctly by redistributing their scores evenly.
"""

from __future__ import annotations

import asyncio
import heapq
import logging
from typing import Optional

from ucma.config import UCMAConfig
from ucma.models import MemoryEdge, MemoryNode, MemoryType

logger = logging.getLogger(__name__)


class MemoryGraph:
    """
    Directed, weighted, typed graph over all memory nodes.

    Nodes represent memories of any type. Edges represent typed
    relationships: "caused_by", "followed_by", "supports",
    "contradicts", "derived_from", "similar_to", "part_of".
    """

    def __init__(self, config: UCMAConfig) -> None:
        self._config = config
        self._nodes: dict[str, MemoryNode] = {}
        # Adjacency lists for O(1) neighbor lookup
        self._outgoing: dict[str, list[MemoryEdge]] = {}
        self._incoming: dict[str, list[MemoryEdge]] = {}
        self._lock = asyncio.Lock()

    async def add_node(self, node: MemoryNode) -> None:
        """Register a new memory node. Idempotent: re-adding is a no-op."""
        async with self._lock:
            if node.node_id not in self._nodes:
                self._nodes[node.node_id] = node
                self._outgoing[node.node_id] = []
                self._incoming[node.node_id] = []

    async def add_edge(self, edge: MemoryEdge) -> None:
        """
        Add a directed edge between two existing nodes.

        Silently skips if either node is not registered, rather than
        raising, because the graph is built incrementally and a node
        may not yet be registered when an edge is first created.
        """
        async with self._lock:
            if edge.source_id not in self._nodes:
                logger.warning(
                    "Edge source %s not in graph; skipping edge.",
                    edge.source_id,
                )
                return
            if edge.target_id not in self._nodes:
                logger.warning(
                    "Edge target %s not in graph; skipping edge.",
                    edge.target_id,
                )
                return
            self._outgoing[edge.source_id].append(edge)
            self._incoming[edge.target_id].append(edge)

    async def remove_node(self, node_id: str) -> None:
        """
        Remove a node and all of its incident edges from the graph.
        Called by the Forgetting Engine when a memory is deleted.
        """
        async with self._lock:
            if node_id not in self._nodes:
                return
            del self._nodes[node_id]
            # Remove all outgoing edges from this node
            for edge in self._outgoing.pop(node_id, []):
                incoming = self._incoming.get(edge.target_id, [])
                self._incoming[edge.target_id] = [
                    e for e in incoming if e.source_id != node_id
                ]
            # Remove all incoming edges to this node
            for edge in self._incoming.pop(node_id, []):
                outgoing = self._outgoing.get(edge.source_id, [])
                self._outgoing[edge.source_id] = [
                    e for e in outgoing if e.target_id != node_id
                ]

    async def find_related(
        self,
        start_node_id: str,
        max_hops: int = 2,
        edge_types: Optional[list[str]] = None,
        min_weight: float = 0.0,
    ) -> list[tuple[str, float, int]]:
        """
        Find nodes reachable from start_node_id within max_hops.

        Returns a list of (node_id, cumulative_weight, hops) tuples,
        sorted by cumulative weight descending. Uses a max-heap (via
        negated weights) for efficient best-first traversal.

        Returns an empty list if start_node_id is not in the graph.
        """
        async with self._lock:
            if start_node_id not in self._nodes:
                logger.debug(
                    "find_related: start node %s not in graph.", start_node_id
                )
                return []

            # Max-heap via negated weights: (-weight, node_id, hops)
            heap: list[tuple[float, str, int]] = [(-1.0, start_node_id, 0)]
            best_weight: dict[str, float] = {start_node_id: 1.0}
            results: list[tuple[str, float, int]] = []

            while heap:
                neg_weight, node_id, hops = heapq.heappop(heap)
                current_weight = -neg_weight

                # Skip if we have already found a better path to this node
                if current_weight < best_weight.get(node_id, 0.0):
                    continue

                if hops > max_hops:
                    continue

                if node_id != start_node_id:
                    results.append((node_id, current_weight, hops))

                if hops == max_hops:
                    continue

                for edge in self._outgoing.get(node_id, []):
                    if edge_types and edge.edge_type not in edge_types:
                        continue
                    if edge.weight < min_weight:
                        continue
                    new_weight = current_weight * edge.weight
                    neighbor = edge.target_id
                    if new_weight > best_weight.get(neighbor, 0.0):
                        best_weight[neighbor] = new_weight
                        heapq.heappush(
                            heap, (-new_weight, neighbor, hops + 1)
                        )

        results.sort(key=lambda x: x[1], reverse=True)
        return results

    async def compute_importance_scores(self) -> dict[str, float]:
        """
        Compute PageRank-style importance scores for all nodes.

        Nodes referenced by many other important nodes receive high scores.
        Dangling nodes (no outgoing edges) are handled by redistributing
        their score evenly across all nodes at each iteration, which is the
        standard solution to the dangling node problem in PageRank.
        """
        async with self._lock:
            n = len(self._nodes)
            if n == 0:
                return {}

            node_ids = list(self._nodes.keys())
            damping = self._config.pagerank_damping
            iterations = self._config.pagerank_iterations

            # Initialize scores uniformly
            scores: dict[str, float] = {nid: 1.0 / n for nid in node_ids}

            for _ in range(iterations):
                new_scores: dict[str, float] = {}

                # Identify dangling nodes (no outgoing edges)
                dangling_sum = sum(
                    scores[nid]
                    for nid in node_ids
                    if not self._outgoing.get(nid)
                )

                for node_id in node_ids:
                    # Base score: teleportation + dangling redistribution
                    base = (1.0 - damping) / n + damping * dangling_sum / n
                    # Incoming contributions
                    incoming_sum = 0.0
                    for edge in self._incoming.get(node_id, []):
                        source_out = self._outgoing.get(edge.source_id, [])
                        if source_out:
                            total_out_weight = sum(e.weight for e in source_out)
                            if total_out_weight > 0:
                                incoming_sum += (
                                    scores[edge.source_id]
                                    * edge.weight
                                    / total_out_weight
                                )
                    new_scores[node_id] = base + damping * incoming_sum

                scores = new_scores

        return scores

    async def update_node_importance(
        self, node_id: str, importance: float
    ) -> None:
        """Update the importance of a node after PageRank computation."""
        async with self._lock:
            node = self._nodes.get(node_id)
            if node:
                node.importance = importance

    def stats(self) -> dict[str, int]:
        """Return graph statistics for observability."""
        return {
            "node_count": len(self._nodes),
            "edge_count": sum(
                len(edges) for edges in self._outgoing.values()
            ),
        }

CHAPTER SEVENTEEN: THE EMBEDDING CACHE

Embedding model calls are expensive: they require a network round-trip to an external API and typically add 50-200ms of latency. The Embedding Cache eliminates redundant calls by storing the embeddings of recently seen texts in an LRU cache. In a typical session where the user asks several related questions, the hit rate can exceed 60%, substantially reducing both embedding API cost and retrieval latency.

The implementation uses Python's collections.OrderedDict, which provides O(1) move_to_end() operations. This makes both get and put operations O(1) amortized -- a significant improvement over a list-based approach, which requires O(n) removal for LRU tracking.
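
The two OrderedDict operations the cache relies on behave like this:

```python
from collections import OrderedDict

lru: OrderedDict[str, int] = OrderedDict([("a", 1), ("b", 2), ("c", 3)])

lru.move_to_end("a")      # O(1): "a" becomes the most recently used
lru.popitem(last=False)   # O(1): evicts "b", now the least recently used

assert list(lru) == ["c", "a"]
```

A get is move_to_end on a hit; a put at capacity is popitem(last=False) followed by an insertion at the end.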

# ucma/embedding_cache.py

"""
LRU Embedding Cache: avoids redundant embedding model calls.

Since the same or similar queries are often repeated within a session,
caching their embeddings eliminates redundant model calls and significantly
reduces retrieval latency. Implemented with OrderedDict for O(1) LRU
operations (move_to_end is O(1) in CPython's OrderedDict).
"""

from __future__ import annotations

import asyncio
import logging
from collections import OrderedDict

logger = logging.getLogger(__name__)


class EmbeddingCache:
    """
    Thread-safe LRU cache for text embeddings.

    Uses collections.OrderedDict which provides O(1) move_to_end(),
    making both get and put operations O(1) amortized. This is
    significantly more efficient than a list-based approach, which
    requires O(n) removal for LRU tracking.
    """

    def __init__(self, max_size: int = 256) -> None:
        self._cache: OrderedDict[str, list[float]] = OrderedDict()
        self._max_size = max_size
        self._lock = asyncio.Lock()
        self._hits: int = 0
        self._misses: int = 0

    async def get(self, text: str) -> list[float] | None:
        """
        Retrieve a cached embedding, or None if not cached.

        On a hit, the entry is moved to the end (most recently used).
        """
        async with self._lock:
            if text in self._cache:
                self._cache.move_to_end(text)
                self._hits += 1
                return self._cache[text]
            self._misses += 1
            return None

    async def put(self, text: str, embedding: list[float]) -> None:
        """
        Store an embedding in the cache.

        If the cache is full, the least recently used entry (the one
        at the front of the OrderedDict) is evicted first.
        """
        async with self._lock:
            if text in self._cache:
                self._cache.move_to_end(text)
                self._cache[text] = embedding
            else:
                if len(self._cache) >= self._max_size:
                    evicted_key, _ = self._cache.popitem(last=False)
                    logger.debug("Embedding cache evicted key: %s...", evicted_key[:40])
                self._cache[text] = embedding

    @property
    def hit_rate(self) -> float:
        """Return the cache hit rate as a fraction (0.0 to 1.0)."""
        total = self._hits + self._misses
        return self._hits / total if total > 0 else 0.0

    def stats(self) -> dict[str, object]:
        """Return cache statistics for observability."""
        return {
            "size": len(self._cache),
            "max_size": self._max_size,
            "hits": self._hits,
            "misses": self._misses,
            "hit_rate": round(self.hit_rate, 3),
        }

CHAPTER EIGHTEEN: LLM PROVIDER PROTOCOLS AND IMPLEMENTATIONS

UCMA uses Python Protocols to define the interfaces for all LLM-dependent operations. This allows any embedding provider, summarization model, or fact extraction model to be plugged in without modifying the core architecture. The OpenAI implementation is provided as the default, but any provider that implements the Protocol can be substituted.

Using typing.Protocol (structural subtyping) rather than abstract base classes means that any class implementing the required methods satisfies the interface without explicit inheritance. This makes it trivial to swap providers or use mock implementations in tests.
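
A minimal demonstration of structural subtyping. The protocol is redefined inline here so the snippet is self-contained; the real definition lives in ucma/providers/base.py:

```python
import asyncio
from typing import Protocol, runtime_checkable

@runtime_checkable
class EmbeddingProvider(Protocol):
    async def embed(self, texts: list[str]) -> list[list[float]]: ...

class FakeEmbeddingProvider:
    """Satisfies the protocol structurally -- no inheritance required."""
    async def embed(self, texts: list[str]) -> list[list[float]]:
        return [[0.0] * 4 for _ in texts]

# runtime_checkable enables isinstance checks based on method presence
assert isinstance(FakeEmbeddingProvider(), EmbeddingProvider)
assert asyncio.run(FakeEmbeddingProvider().embed(["hi"])) == [[0.0] * 4]
```

This is exactly the pattern tests use: a fake provider with deterministic output can be injected anywhere the real OpenAI provider is expected.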

# ucma/providers/base.py

"""
Protocol definitions for all LLM provider interfaces used by UCMA.

Using typing.Protocol (structural subtyping) rather than abstract base
classes means that any class implementing the required methods satisfies
the interface without explicit inheritance. This makes it trivial to swap
providers or use mock implementations in tests.
"""

from __future__ import annotations

from typing import Protocol, runtime_checkable


@runtime_checkable
class EmbeddingProvider(Protocol):
    """Interface for any embedding model provider."""

    async def embed(self, texts: list[str]) -> list[list[float]]:
        """
        Compute dense vector embeddings for a list of texts.

        Returns a list of embedding vectors, one per input text.
        The dimensionality of each vector is provider-dependent
        and must match the UCMAConfig.embedding_dim setting.
        """
        ...


@runtime_checkable
class SummarizationProvider(Protocol):
    """Interface for any LLM used for text summarization."""

    async def summarize(self, content: str, max_tokens: int = 200) -> str:
        """
        Produce a concise summary of the given content.

        The summary should preserve the most important information
        while fitting within the max_tokens budget.
        """
        ...


@runtime_checkable
class FactExtractionProvider(Protocol):
    """Interface for an LLM used to extract structured facts from text."""

    async def extract_facts(self, content: str) -> list[dict]:
        """
        Extract subject-predicate-object triples from text.

        Returns a list of dicts, each with keys:
            "subject"    (str): the entity the fact is about
            "predicate"  (str): the relationship or property
            "object"     (str): the value or related entity
            "content"    (str): human-readable form of the triple
            "confidence" (float): confidence in this extraction (0-1)
        """
        ...


@runtime_checkable
class ImportanceClassifierProvider(Protocol):
    """Interface for a model that scores the importance of content."""

    async def classify_importance(
        self, content: str, source: str
    ) -> float:
        """
        Estimate the importance of a piece of content (0.0 to 1.0).

        Higher scores indicate content that is more likely to be
        worth storing in long-term memory.
        """
        ...
# ucma/providers/openai_provider.py

"""
OpenAI concrete implementations of all UCMA provider protocols.

These implementations use the OpenAI Python SDK (v1.x) with async support.
The OPENAI_API_KEY environment variable must be set before instantiation.
All methods include graceful error handling with safe fallback values so
that the system continues operating even when the API is temporarily
unavailable.
"""

from __future__ import annotations

import json
import logging
import os

from openai import AsyncOpenAI

logger = logging.getLogger(__name__)


class OpenAIEmbeddingProvider:
    """
    Embedding provider using OpenAI's text-embedding models.

    Supports batched embedding requests to minimize API round-trips.
    The default model (text-embedding-3-small) produces 1536-dimensional
    vectors and offers an excellent quality/cost tradeoff.
    """

    def __init__(
        self,
        model: str = "text-embedding-3-small",
        api_key: str | None = None,
    ) -> None:
        self._model = model
        self._client = AsyncOpenAI(
            api_key=api_key or os.environ.get("OPENAI_API_KEY")
        )

    async def embed(self, texts: list[str]) -> list[list[float]]:
        """
        Compute embeddings for a batch of texts.

        OpenAI's embedding API accepts up to 2048 inputs per request.
        For larger batches, callers should chunk the input themselves.
        Returns zero vectors as a safe fallback if the API is unavailable.
        """
        if not texts:
            return []
        try:
            response = await self._client.embeddings.create(
                model=self._model,
                input=texts,
            )
            # Response items are ordered to match the input order
            return [item.embedding for item in response.data]
        except Exception as exc:
            logger.error("Embedding request failed: %s", exc)
            # Return zero vectors as a safe fallback so the system
            # continues operating even if the embedding model is unavailable
            dim = 1536  # Matches text-embedding-3-small; adjust for other models
            return [[0.0] * dim for _ in texts]


class OpenAISummarizationProvider:
    """
    Summarization provider using OpenAI's chat completion models.
    """

    _SYSTEM_PROMPT = (
        "You are a memory consolidation assistant. "
        "Produce a concise, factual summary of the provided text. "
        "Preserve all important facts, decisions, errors, and outcomes. "
        "Omit filler and conversational pleasantries."
    )

    def __init__(
        self,
        model: str = "gpt-4o-mini",
        api_key: str | None = None,
    ) -> None:
        self._model = model
        self._client = AsyncOpenAI(
            api_key=api_key or os.environ.get("OPENAI_API_KEY")
        )

    async def summarize(self, content: str, max_tokens: int = 200) -> str:
        """Produce a concise summary of the given content."""
        try:
            response = await self._client.chat.completions.create(
                model=self._model,
                messages=[
                    {"role": "system", "content": self._SYSTEM_PROMPT},
                    {"role": "user", "content": content},
                ],
                max_tokens=max_tokens,
                temperature=0.3,
            )
            return response.choices[0].message.content or content[:500]
        except Exception as exc:
            logger.error("Summarization request failed: %s", exc)
            # Fall back to a simple truncation
            return content[:500]


class OpenAIFactExtractionProvider:
    """
    Fact extraction provider using OpenAI's chat completion models
    with structured JSON output.
    """

    _SYSTEM_PROMPT = """You are a knowledge extraction assistant.

Extract factual subject-predicate-object triples from the given text.
Return a JSON object with a "facts" key containing an array of objects,
each with these exact keys:
  "subject"    - the entity the fact is about (string)
  "predicate"  - the relationship or property (string)
  "object"     - the value or related entity (string)
  "content"    - a natural language sentence expressing the fact (string)
  "confidence" - your confidence this is a true fact, 0.0 to 1.0 (float)
If no facts can be extracted, return {"facts": []}."""

    def __init__(
        self,
        model: str = "gpt-4o-mini",
        api_key: str | None = None,
    ) -> None:
        self._model = model
        self._client = AsyncOpenAI(
            api_key=api_key or os.environ.get("OPENAI_API_KEY")
        )

    async def extract_facts(self, content: str) -> list[dict]:
        """Extract SPO triples from text using a structured LLM prompt."""
        try:
            response = await self._client.chat.completions.create(
                model=self._model,
                messages=[
                    {"role": "system", "content": self._SYSTEM_PROMPT},
                    {"role": "user", "content": content},
                ],
                max_tokens=512,
                temperature=0.0,
                response_format={"type": "json_object"},
            )
            raw = response.choices[0].message.content or '{"facts": []}'
            parsed = json.loads(raw)
            # Handle both {"facts": [...]} and [...] response shapes
            if isinstance(parsed, dict):
                parsed = parsed.get("facts", [])
            if not isinstance(parsed, list):
                return []
            return parsed
        except Exception as exc:
            logger.error("Fact extraction request failed: %s", exc)
            return []


class OpenAIImportanceClassifier:
    """
    Importance classifier using a fast heuristic approach combined with
    optional LLM scoring for ambiguous cases.

    The heuristic handles the common cases with a single cheap scan of
    the content and no API call. The LLM is only called when the
    heuristic score falls in the ambiguous middle range (0.35 to 0.65),
    keeping API costs low.
    """

    # Sources ordered by inherent importance
    _SOURCE_WEIGHTS: dict[str, float] = {
        "system":      0.90,
        "user":        0.70,
        "tool_result": 0.50,
        "environment": 0.30,
    }

    # Keywords that strongly signal importance
    _HIGH_IMPORTANCE_KEYWORDS: frozenset[str] = frozenset({
        "error", "failed", "failure", "critical", "important", "remember",
        "always", "never", "decided", "preference", "must", "required",
        "deadline", "urgent", "warning", "success", "completed", "fixed",
    })

    def __init__(
        self,
        model: str = "gpt-4o-mini",
        api_key: str | None = None,
        use_llm_for_ambiguous: bool = False,
    ) -> None:
        self._model = model
        self._use_llm = use_llm_for_ambiguous
        self._client = (
            AsyncOpenAI(api_key=api_key or os.environ.get("OPENAI_API_KEY"))
            if use_llm_for_ambiguous
            else None
        )

    async def classify_importance(
        self, content: str, source: str
    ) -> float:
        """
        Estimate the importance of content using a fast heuristic.

        The heuristic combines source weight, keyword presence, and
        content length into a score in [0.0, 1.0]. If the score is
        ambiguous (0.35-0.65) and LLM scoring is enabled, an LLM call
        refines the estimate.
        """
        lower = content.lower()
        source_weight = self._SOURCE_WEIGHTS.get(source, 0.40)

        # Keyword boost: each matching keyword adds 0.10, capped at 0.30
        keyword_hits = sum(
            1 for kw in self._HIGH_IMPORTANCE_KEYWORDS if kw in lower
        )
        keyword_boost = min(keyword_hits * 0.10, 0.30)

        # Length signal: very short content is likely less important
        length_factor = min(len(content) / 500.0, 1.0)

        score = source_weight * 0.5 + keyword_boost * 0.3 + length_factor * 0.2

        # Optionally refine ambiguous scores with an LLM call
        if self._use_llm and self._client and 0.35 <= score <= 0.65:
            try:
                response = await self._client.chat.completions.create(
                    model=self._model,
                    messages=[
                        {
                            "role": "system",
                            "content": (
                                "Rate the importance of storing the following "
                                "text in long-term memory on a scale from 0.0 "
                                "(not important) to 1.0 (very important). "
                                "Reply with only a decimal number."
                            ),
                        },
                        {"role": "user", "content": content[:300]},
                    ],
                    max_tokens=5,
                    temperature=0.0,
                )
                llm_score = float(
                    response.choices[0].message.content.strip()
                )
                score = max(0.0, min(1.0, llm_score))
            except Exception as exc:
                logger.debug(
                    "LLM importance scoring failed, using heuristic: %s", exc
                )

        return round(max(0.0, min(1.0, score)), 3)

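To make the heuristic concrete, here is a standalone sketch of the same scoring formula with an abbreviated keyword set (the weights mirror the class above; the example strings are illustrative):

```python
def heuristic_importance(content: str, source: str) -> float:
    """Standalone sketch of the importance heuristic above."""
    source_weights = {"system": 0.90, "user": 0.70,
                      "tool_result": 0.50, "environment": 0.30}
    keywords = {"error", "failed", "critical", "important", "remember"}
    lower = content.lower()
    source_weight = source_weights.get(source, 0.40)
    # Each matching keyword adds 0.10, capped at 0.30
    keyword_boost = min(sum(kw in lower for kw in keywords) * 0.10, 0.30)
    # Short content is penalized; 500+ characters saturates the signal
    length_factor = min(len(content) / 500.0, 1.0)
    score = source_weight * 0.5 + keyword_boost * 0.3 + length_factor * 0.2
    return round(max(0.0, min(1.0, score)), 3)

# An error report from the user outranks routine environment chatter
print(heuristic_importance("Critical error: the deployment failed", "user"))
print(heuristic_importance("ok", "environment"))
```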
CHAPTER NINETEEN: THE UNIFIED RETRIEVAL LAYER

All of the components described so far are only as useful as the retrieval system that connects them to the agent's reasoning process. The Unified Retrieval Layer is the interface through which the agent accesses all memory types with a single, consistent API. It hides the complexity of the underlying stores, performs intelligent query routing, combines results from multiple stores, and ranks them into a coherent, prioritized list.

The multi-factor ranking formula brings together semantic relevance, memory importance, recency, and retention strength into a single score. The weights are configurable via UCMAConfig. The graph expansion step implements associative retrieval -- the human experience of one memory triggering another related memory -- by following edges from the top retrieved memories to their neighbors.

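Before diving into the code, here is the blend as a worked sketch. The weights and half-life below are hypothetical placeholders for the UCMAConfig values:

```python
import math

def blended_score(relevance: float, importance: float,
                  age_days: float, retention: float,
                  halflife_days: float = 7.0,
                  w_rel: float = 0.45, w_imp: float = 0.25,
                  w_rec: float = 0.20, w_ret: float = 0.10) -> float:
    """Multi-factor ranking: relevance, importance, recency, retention."""
    recency = math.exp(-age_days / halflife_days)  # exponential recency decay
    return (w_rel * relevance + w_imp * importance
            + w_rec * recency + w_ret * retention)

# Equal relevance and importance, but the fresher memory ranks higher
fresh = blended_score(relevance=0.8, importance=0.5, age_days=0, retention=0.9)
stale = blended_score(relevance=0.8, importance=0.5, age_days=30, retention=0.9)
print(fresh, stale)
```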
# ucma/query_router.py

"""
Query Router: fast heuristic-based query classification.

Classifies natural language queries into retrieval modes in microseconds
using pure string matching, before any expensive operations are performed.
This routing step can reduce average retrieval latency by 40-60% by
avoiding unnecessary vector database calls for structured queries.
"""

from __future__ import annotations

from ucma.models import QueryMode


class QueryRouter:
    """
    Analyzes incoming queries and determines the optimal retrieval strategy.

    The router uses heuristic rules to classify queries before dispatching
    them to the appropriate stores. All classification is plain substring
    matching against fixed keyword sets -- no model calls, no embeddings,
    no network latency.
    """

    # Keywords that suggest temporal queries (user asking about past events)
    _TEMPORAL_KEYWORDS: frozenset[str] = frozenset({
        "yesterday", "last week", "last session", "earlier today",
        "before", "after", "when did", "recently", "today", "ago",
        "previously", "last time", "a while ago", "the other day",
    })

    # Keywords that suggest procedural queries (user asking how to do something)
    _PROCEDURAL_KEYWORDS: frozenset[str] = frozenset({
        "how to", "how do i", "how do you", "steps to", "procedure for",
        "workflow for", "process for", "guide to", "tutorial",
    })

    # Prefixes that suggest entity/structured queries
    _ENTITY_PREFIXES: tuple[str, ...] = (
        "what do you know about",
        "tell me about",
        "what is",
        "who is",
        "facts about",
    )

    def classify(self, query_text: str) -> QueryMode:
        """
        Classify a query into the most appropriate retrieval mode.

        Returns QueryMode.HYBRID as the default when no specific
        pattern is detected, which triggers both semantic and
        structured retrieval for maximum recall.
        """
        lower = query_text.lower().strip()

        # Temporal check: is the user asking about a past event?
        if any(kw in lower for kw in self._TEMPORAL_KEYWORDS):
            return QueryMode.TEMPORAL

        # Procedural check: is the user asking how to do something?
        if any(kw in lower for kw in self._PROCEDURAL_KEYWORDS):
            return QueryMode.STRUCTURED

        # Entity check: is the user asking about a specific entity?
        if any(lower.startswith(prefix) for prefix in self._ENTITY_PREFIXES):
            return QueryMode.STRUCTURED

        # Default: hybrid search combines semantic and structured retrieval
        return QueryMode.HYBRID
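The router's behavior can be sketched standalone as follows; string labels stand in for the QueryMode enum, and the keyword sets are abbreviated:

```python
TEMPORAL = {"yesterday", "last week", "when did", "recently"}
PROCEDURAL = {"how to", "how do i", "steps to"}
ENTITY = ("what do you know about", "tell me about", "who is")

def classify(query: str) -> str:
    """Keyword-based query routing, mirroring QueryRouter.classify."""
    lower = query.lower().strip()
    if any(kw in lower for kw in TEMPORAL):
        return "temporal"
    if any(kw in lower for kw in PROCEDURAL):
        return "structured"
    if any(lower.startswith(p) for p in ENTITY):
        return "structured"
    return "hybrid"

print(classify("What did we decide yesterday?"))       # temporal
print(classify("How do I deploy the staging build?"))  # structured
print(classify("Summarize our design principles"))     # hybrid
```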

# ucma/retrieval.py

"""
Unified Retrieval Layer: the single access point for all memory retrieval.

Implements intelligent query routing, multi-store retrieval, result fusion,
graph-based result expansion, and multi-factor ranking. All retrieval
requests from the agent pass through this layer.
"""

from __future__ import annotations

import logging
import math
import time

import numpy as np

from ucma.config import UCMAConfig
from ucma.embedding_cache import EmbeddingCache
from ucma.episodic_store import EpisodicMemoryStore
from ucma.learning_store import LearningStore
from ucma.memory_graph import MemoryGraph
from ucma.models import (
    MemoryQuery,
    MemoryType,
    QueryMode,
    RetrievalResult,
)
from ucma.procedural_store import ProceduralMemoryStore
from ucma.providers.base import EmbeddingProvider
from ucma.query_router import QueryRouter
from ucma.retention import RetentionRegistry
from ucma.semantic_store import SemanticMemoryStore

logger = logging.getLogger(__name__)


def _cosine_similarity(a: list[float], b: list[float]) -> float:
    """
    Compute cosine similarity between two embedding vectors.

    Returns a value in [-1.0, 1.0]. Returns 0.0 for zero vectors
    to avoid division by zero.
    """
    va = np.array(a, dtype=np.float32)
    vb = np.array(b, dtype=np.float32)
    norm_a = float(np.linalg.norm(va))
    norm_b = float(np.linalg.norm(vb))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return float(np.dot(va, vb) / (norm_a * norm_b))


class UnifiedRetrievalLayer:
    """
    The single access point for all memory retrieval in UCMA.

    Combines semantic ANN search, structured index lookup, temporal
    bucket retrieval, and graph traversal into a unified, ranked result
    set. The Query Router selects the optimal strategy for each query.
    """

    def __init__(
        self,
        config: UCMAConfig,
        episodic_store: EpisodicMemoryStore,
        semantic_store: SemanticMemoryStore,
        procedural_store: ProceduralMemoryStore,
        learning_store: LearningStore,
        memory_graph: MemoryGraph,
        embedder: EmbeddingProvider,
        retention_registry: RetentionRegistry,
    ) -> None:
        self._config = config
        self._episodic = episodic_store
        self._semantic = semantic_store
        self._procedural = procedural_store
        self._learning = learning_store
        self._graph = memory_graph
        self._embedder = embedder
        self._retention = retention_registry
        self._router = QueryRouter()
        self._embed_cache = EmbeddingCache(config.embedding_cache_max_size)

    async def retrieve(self, query: MemoryQuery) -> list[RetrievalResult]:
        """
        Execute a memory retrieval query and return ranked results.

        This is the main entry point for all memory access in UCMA.
        The method is fully async and safe to call concurrently.
        """
        # Auto-classify mode if HYBRID is requested (use router to refine)
        effective_mode = query.mode
        if query.mode == QueryMode.HYBRID:
            routed = self._router.classify(query.query_text)
            # Only override HYBRID if the router found a more specific mode
            if routed != QueryMode.HYBRID:
                effective_mode = routed
                logger.debug(
                    "Query router overrode HYBRID -> %s for: '%s'",
                    effective_mode.value,
                    query.query_text[:60],
                )

        results: list[RetrievalResult] = []

        # Dispatch to appropriate retrieval strategies
        if effective_mode in (QueryMode.SEMANTIC, QueryMode.HYBRID):
            results.extend(await self._semantic_retrieval(query))

        if effective_mode in (QueryMode.STRUCTURED, QueryMode.HYBRID):
            results.extend(await self._structured_retrieval(query))

        if effective_mode == QueryMode.TEMPORAL and query.time_range:
            results.extend(await self._temporal_retrieval(query))

        if effective_mode == QueryMode.GRAPH and query.seed_node_id:
            results.extend(await self._graph_retrieval(query))

        # For non-semantic modes, fall back to a semantic search if the
        # primary pass returned too few results (HYBRID already ran one)
        if (len(results) < 3
                and effective_mode not in (QueryMode.SEMANTIC, QueryMode.HYBRID)):
            results.extend(await self._semantic_retrieval(query))

        # Deduplicate, keeping the highest-scoring copy of each memory
        results = self._deduplicate(results)

        # Expand with graph neighbors for associative retrieval
        if query.include_graph_neighbors and results:
            results = await self._expand_with_graph(results, query)

        # Apply multi-factor ranking
        results = self._rank_results(results)

        # Update retention states for all retrieved memories
        self._update_access_stats(results)

        top = results[:query.max_results]
        logger.debug(
            "Retrieved %d results for query '%s'",
            len(top),
            query.query_text[:60],
        )
        return top

    async def _get_query_embedding(self, query_text: str) -> list[float] | None:
        """Get or compute the embedding for a query text, using the cache."""
        cached = await self._embed_cache.get(query_text)
        if cached is not None:
            return cached
        embeddings = await self._embedder.embed([query_text])
        if embeddings:
            await self._embed_cache.put(query_text, embeddings[0])
            return embeddings[0]
        return None

    async def _semantic_retrieval(
        self, query: MemoryQuery
    ) -> list[RetrievalResult]:
        """
        Perform semantic similarity search across episodic and semantic stores.

        Uses exact cosine similarity for correctness. For production deployments
        with millions of memories, replace the inner loop with an HNSW index
        call (see Part Twenty-Five for the drop-in replacement).
        """
        query_vector = await self._get_query_embedding(query.query_text)
        if query_vector is None:
            return []

        threshold = self._config.retrieval_semantic_min_similarity
        results: list[RetrievalResult] = []

        # Search short-term episodic memories
        if not query.memory_types or MemoryType.EPISODIC in query.memory_types:
            all_episodic = await self._episodic.get_all_short_term()
            for entry in all_episodic:
                if entry.embedding is None:
                    continue
                score = _cosine_similarity(query_vector, entry.embedding)
                if score >= threshold:
                    results.append(RetrievalResult(
                        memory_id=entry.entry_id,
                        memory_type=MemoryType.EPISODIC,
                        content=entry.content,
                        relevance_score=float(score),
                        importance=entry.importance,
                        timestamp=entry.timestamp,
                        source_store="episodic_short_term",
                    ))

        # Search semantic facts
        if not query.memory_types or MemoryType.SEMANTIC in query.memory_types:
            # Snapshot the facts under the store's lock; iteration happens
            # outside the lock to keep the critical section short
            async with self._semantic._lock:
                all_facts = list(self._semantic._facts.values())
            for fact in all_facts:
                if fact.embedding is None:
                    continue
                score = _cosine_similarity(query_vector, fact.embedding)
                if score >= threshold:
                    results.append(RetrievalResult(
                        memory_id=fact.fact_id,
                        memory_type=MemoryType.SEMANTIC,
                        content=fact.content,
                        relevance_score=float(score),
                        importance=fact.importance,
                        timestamp=None,
                        source_store="semantic",
                    ))

        return results

    async def _structured_retrieval(
        self, query: MemoryQuery
    ) -> list[RetrievalResult]:
        """
        Perform structured metadata-based retrieval.
        Handles entity filters and tag filters.
        """
        results: list[RetrievalResult] = []

        if query.entity_filter:
            facts = await self._semantic.get_facts_about(query.entity_filter)
            for fact in facts:
                results.append(RetrievalResult(
                    memory_id=fact.fact_id,
                    memory_type=MemoryType.SEMANTIC,
                    content=fact.content,
                    relevance_score=fact.confidence,
                    importance=fact.importance,
                    timestamp=None,
                    source_store="semantic_entity",
                ))

        if query.tag_filter:
            procedures = await self._procedural.find_by_tags(query.tag_filter)
            for proc in procedures:
                results.append(RetrievalResult(
                    memory_id=proc.procedure_id,
                    memory_type=MemoryType.PROCEDURAL,
                    content=proc.description,
                    relevance_score=proc.success_rate,
                    importance=proc.success_rate,
                    timestamp=proc.last_used,
                    source_store="procedural",
                ))

        return results

    async def _temporal_retrieval(
        self, query: MemoryQuery
    ) -> list[RetrievalResult]:
        """Retrieve episodic memories within the query's time range."""
        if not query.time_range:
            return []
        start, end = query.time_range
        entries = await self._episodic.retrieve_by_time_range(start, end)
        return [
            RetrievalResult(
                memory_id=entry.entry_id,
                memory_type=MemoryType.EPISODIC,
                content=entry.content,
                relevance_score=entry.importance,
                importance=entry.importance,
                timestamp=entry.timestamp,
                source_store="episodic_temporal",
            )
            for entry in entries
        ]

    async def _graph_retrieval(
        self, query: MemoryQuery
    ) -> list[RetrievalResult]:
        """Retrieve memories by traversing the Memory Graph from a seed node."""
        if not query.seed_node_id:
            return []
        related = await self._graph.find_related(
            query.seed_node_id, max_hops=2
        )
        results: list[RetrievalResult] = []
        async with self._graph._lock:
            for node_id, weight, hops in related:
                node = self._graph._nodes.get(node_id)
                if not node:
                    continue
                results.append(RetrievalResult(
                    memory_id=node_id,
                    memory_type=node.memory_type,
                    content="",  # Content fetched lazily from the typed store
                    relevance_score=weight,
                    importance=node.importance,
                    timestamp=node.created_at,
                    source_store="graph",
                    graph_distance=hops,
                ))
        return results

    async def _expand_with_graph(
        self,
        results: list[RetrievalResult],
        query: MemoryQuery,
    ) -> list[RetrievalResult]:
        """
        Expand the result set by adding graph neighbors of the top results.

        This implements associative retrieval: one memory triggers related
        memories through the graph structure, mirroring how human memory
        works. Graph-expanded results receive a discounted relevance score
        (multiplied by 0.7) to rank them below direct matches.
        """
        top_k = self._config.retrieval_graph_expansion_top_k
        min_weight = self._config.retrieval_graph_expansion_min_weight
        expanded_ids = {r.memory_id for r in results}
        new_results: list[RetrievalResult] = []

        for result in results[:top_k]:
            neighbors = await self._graph.find_related(
                result.memory_id,
                max_hops=self._config.retrieval_graph_expansion_hops,
                min_weight=min_weight,
            )
            async with self._graph._lock:
                for neighbor_id, weight, hops in neighbors:
                    if neighbor_id in expanded_ids:
                        continue
                    node = self._graph._nodes.get(neighbor_id)
                    if not node:
                        continue
                    new_results.append(RetrievalResult(
                        memory_id=neighbor_id,
                        memory_type=node.memory_type,
                        content="",  # Content fetched lazily from typed store
                        relevance_score=result.relevance_score * weight * 0.7,
                        importance=node.importance,
                        timestamp=node.created_at,
                        source_store="graph_expansion",
                        graph_distance=hops,
                    ))
                    expanded_ids.add(neighbor_id)

        return results + new_results

    def _rank_results(
        self, results: list[RetrievalResult]
    ) -> list[RetrievalResult]:
        """
        Apply multi-factor ranking to retrieval results.

        The final score is a weighted combination of:
          - relevance_score: semantic similarity or structured match quality
          - importance: memory importance (from graph PageRank or heuristic)
          - recency: exponential decay based on memory age
          - retention: current Ebbinghaus retention score

        All weights are configurable via UCMAConfig.
        """
        cfg = self._config
        now = time.time()
        halflife_seconds = cfg.rank_recency_halflife_days * 86400.0

        for result in results:
            # Recency: exponential decay from memory creation time
            if result.timestamp is not None:
                age_seconds = now - result.timestamp
                recency = math.exp(-age_seconds / halflife_seconds)
            else:
                recency = 0.5  # Unknown age: assume moderate recency

            # Retention: current Ebbinghaus retention score
            state = self._retention.get(result.memory_id)
            retention = state.current_retention() if state else 0.5

            result.relevance_score = (
                cfg.rank_weight_relevance  * result.relevance_score
                + cfg.rank_weight_importance * result.importance
                + cfg.rank_weight_recency    * recency
                + cfg.rank_weight_retention  * retention
            )

        results.sort(key=lambda r: r.relevance_score, reverse=True)
        return results

    def _deduplicate(
        self, results: list[RetrievalResult]
    ) -> list[RetrievalResult]:
        """Remove duplicate results, keeping the highest-scoring copy."""
        seen: dict[str, RetrievalResult] = {}
        for result in results:
            if (result.memory_id not in seen
                    or result.relevance_score
                    > seen[result.memory_id].relevance_score):
                seen[result.memory_id] = result
        return list(seen.values())

    def _update_access_stats(self, results: list[RetrievalResult]) -> None:
        """
        Update retention states for all retrieved memories.
        Retrieval counts as a review event in the Ebbinghaus model,
        increasing stability and resetting the decay clock.
        """
        for result in results:
            self._retention.reinforce(
                result.memory_id,
                self._config.retention_stability_boost_base,
            )

CHAPTER TWENTY: THE FORGETTING ENGINE

Forgetting is not failure. It is a feature. The Forgetting Engine implements principled, mathematically grounded forgetting that keeps the memory system fast, focused, and free of noise. It is based on three well-established principles: the Ebbinghaus forgetting curve, the spacing effect, and importance weighting via PageRank.

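The core of the model is the Ebbinghaus curve R(t) = exp(-t / S), where S is a stability parameter that grows with each review -- that growth is the spacing effect. A minimal sketch, with a hypothetical multiplicative stability boost:

```python
import math

def retention(elapsed_days: float, stability_days: float) -> float:
    """Ebbinghaus forgetting curve: R(t) = exp(-t / S)."""
    return math.exp(-elapsed_days / stability_days)

def reinforce(stability_days: float, boost: float = 1.5) -> float:
    """Spacing effect sketch: each review multiplies stability."""
    return stability_days * boost

s = 7.0                      # initial stability: retention halves in ~4.9 days
r_before = retention(7, s)   # ~0.368 after one week
s = reinforce(s)             # a review raises stability to 10.5 days
r_after = retention(7, s)    # same elapsed time, slower decay
print(r_before, r_after)
```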
The confidence-adjusted forgetting threshold for semantic facts is a particularly elegant mechanism. A fact with low confidence needs a much higher retention score to survive than a fact with high confidence. This means that uncertain facts are forgotten quickly unless they are frequently reinforced, while well-established facts persist even when rarely accessed. Procedural memories with high success rates are strongly protected -- we do not forget skills that work.

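Numerically, the adjustment divides the base threshold by confidence, floored at 0.10 to avoid division blow-ups -- matching the code below. The base value 0.25 here is illustrative:

```python
def adjusted_threshold(base: float, confidence: float) -> float:
    """Confidence-adjusted forgetting threshold for semantic facts."""
    return base / max(confidence, 0.10)

base = 0.25
print(adjusted_threshold(base, 1.0))   # well-established fact: 0.25
print(adjusted_threshold(base, 0.3))   # uncertain fact: ~0.833, 3.3x harder
print(adjusted_threshold(base, 0.05))  # floor kicks in: 2.5, nearly unkeepable
```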
# ucma/forgetting.py

"""
Forgetting Engine: principled, importance-weighted memory pruning.

Implements the Ebbinghaus forgetting curve with PageRank-based importance
protection. Memories below the retention threshold are forgotten unless
they are structurally central to the Memory Graph (high PageRank score)
or are well-validated procedural memories.
"""

from __future__ import annotations

import logging

from ucma.config import UCMAConfig
from ucma.episodic_store import EpisodicMemoryStore
from ucma.learning_store import LearningStore
from ucma.memory_graph import MemoryGraph
from ucma.models import ProcedureStatus
from ucma.procedural_store import ProceduralMemoryStore
from ucma.retention import RetentionRegistry
from ucma.semantic_store import SemanticMemoryStore

logger = logging.getLogger(__name__)


class ForgettingEngine:
    """
    Implements principled, importance-weighted forgetting across all stores.

    Runs as part of the Sleep Consolidation cycle but can also be triggered
    on-demand when memory pressure is high. All deletions are coordinated
    with the Memory Graph to remove orphaned nodes and clean up edges.
    """

    def __init__(
        self,
        config: UCMAConfig,
        episodic_store: EpisodicMemoryStore,
        semantic_store: SemanticMemoryStore,
        procedural_store: ProceduralMemoryStore,
        learning_store: LearningStore,
        memory_graph: MemoryGraph,
        retention_registry: RetentionRegistry,
    ) -> None:
        self._config = config
        self._episodic = episodic_store
        self._semantic = semantic_store
        self._procedural = procedural_store
        self._learning = learning_store
        self._graph = memory_graph
        self._retention = retention_registry

    async def run_forgetting_pass(self) -> dict[str, int]:
        """
        Execute one pass of the forgetting algorithm across all stores.

        Computes PageRank importance scores first to identify structurally
        protected memories, then processes each store independently.
        Returns a summary of how many memories were forgotten per store.
        """
        logger.info("Starting forgetting pass...")

        # Compute graph importance scores once for the entire pass.
        # This is the most expensive step but must be done before any
        # deletions to ensure protection decisions are consistent.
        importance_scores = await self._graph.compute_importance_scores()

        counts = {
            "episodic":   await self._forget_episodic(importance_scores),
            "semantic":   await self._forget_semantic(importance_scores),
            "procedural": await self._forget_procedural(importance_scores),
            "learning":   await self._forget_learning(importance_scores),
        }

        logger.info(
            "Forgetting pass complete: %s",
            {k: v for k, v in counts.items() if v > 0},
        )
        return counts

    def _should_forget(
        self,
        memory_id: str,
        importance_scores: dict[str, float],
        retention_threshold: float | None = None,
    ) -> bool:
        """
        Determine whether a specific memory should be forgotten.

        A memory is protected if:
          1. Its PageRank importance score is above the protection threshold.
          2. Its current Ebbinghaus retention is above the threshold.

        If the memory has no retention state, it is treated as having
        full retention (newly created memories are never immediately forgotten).
        """
        threshold = (
            retention_threshold
            if retention_threshold is not None
            else self._config.forgetting_retention_threshold
        )

        # Protect structurally important memories (high PageRank)
        graph_importance = importance_scores.get(memory_id, 0.0)
        if graph_importance > self._config.forgetting_importance_protection:
            return False

        # Check Ebbinghaus retention
        state = self._retention.get(memory_id)
        if state is None:
            return False  # No retention state: assume newly created, protect it

        return state.current_retention() < threshold

    async def _forget_episodic(
        self, importance_scores: dict[str, float]
    ) -> int:
        """Forget eligible medium-term episodic memories."""
        count = 0
        entry_ids = await self._episodic.get_all_medium_term_ids()
        for entry_id in entry_ids:
            if self._should_forget(entry_id, importance_scores):
                deleted = await self._episodic.delete(entry_id)
                if deleted:
                    self._retention.remove(entry_id)
                    await self._graph.remove_node(entry_id)
                    count += 1
        return count

    async def _forget_semantic(
        self, importance_scores: dict[str, float]
    ) -> int:
        """
        Forget eligible semantic facts.

        Low-confidence facts are forgotten more aggressively: the effective
        retention threshold is scaled inversely by confidence. A fact with
        confidence 0.3 must hold a retention score more than three times the
        base threshold to survive, while a fact with confidence 1.0 only
        needs to meet the base threshold.
        """
        count = 0
        fact_ids = await self._semantic.get_all_fact_ids()
        for fact_id in fact_ids:
            fact = await self._semantic.get_fact(fact_id)
            if fact is None:
                continue
            # Adjust threshold inversely by confidence
            adjusted_threshold = (
                self._config.forgetting_retention_threshold
                / max(fact.confidence, 0.10)
            )
            if self._should_forget(
                fact_id, importance_scores, adjusted_threshold
            ):
                deleted = await self._semantic.delete_fact(fact_id)
                if deleted:
                    self._retention.remove(fact_id)
                    await self._graph.remove_node(fact_id)
                    count += 1
        return count

    async def _forget_procedural(
        self, importance_scores: dict[str, float]
    ) -> int:
        """
        Forget eligible procedural memories.

        Active procedures with success rates above 50% are strongly
        protected. Only draft procedures or those with very low success
        rates are candidates for forgetting.
        """
        count = 0
        proc_ids = await self._procedural.get_all_procedure_ids()
        for proc_id in proc_ids:
            proc = await self._procedural.get_procedure(proc_id)
            if proc is None:
                continue
            # Protect active procedures that work well
            if (proc.status == ProcedureStatus.ACTIVE
                    and proc.success_rate > 0.5):
                continue
            if self._should_forget(proc_id, importance_scores):
                deleted = await self._procedural.delete_procedure(proc_id)
                if deleted:
                    self._retention.remove(proc_id)
                    await self._graph.remove_node(proc_id)
                    count += 1
        return count

    async def _forget_learning(
        self, importance_scores: dict[str, float]
    ) -> int:
        """
        Forget eligible learning records.

        Validated learnings with high confidence are strongly protected.
        """
        count = 0
        learning_ids = await self._learning.get_all_learning_ids()
        for learning_id in learning_ids:
            learning = await self._learning.get_learning(learning_id)
            if learning is None:
                continue
            # Protect validated, high-confidence learnings
            if learning.validated and learning.confidence > 0.6:
                continue
            if self._should_forget(learning_id, importance_scores):
                deleted = await self._learning.delete_learning(learning_id)
                if deleted:
                    self._retention.remove(learning_id)
                    await self._graph.remove_node(learning_id)
                    count += 1
        return count

CHAPTER TWENTY-ONE: THE SLEEP CONSOLIDATION ENGINE

This is where UCMA truly distinguishes itself from every existing memory system. The Sleep Consolidation Engine is a background process that runs periodically -- during low-activity periods, analogous to biological sleep -- and performs four critical operations: compression, abstraction, reinforcement, and pruning.

Compression takes groups of related short-term episodic memories and replaces them with a more compact representation. Abstraction takes patterns found across multiple episodes and extracts them as semantic facts. Reinforcement strengthens the connections between memories that co-occur frequently. Pruning removes memories that are old, unimportant, and rarely accessed, following the Ebbinghaus forgetting curve.
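
The pruning step leans on the Ebbenhaus-style exponential decay model of retention, R(t) = exp(-t / S), where S is a stability term that grows with each access. The sketch below is illustrative only; UCMA's RetentionRegistry (used by the code later in this chapter) adds more machinery, and the 1.5 reinforcement factor is an assumption, not a value taken from UCMA:

```python
import math
import time


class RetentionState:
    """Minimal Ebbinghaus-style retention: R(t) = exp(-t / S).

    S (stability) grows each time the memory is accessed, so frequently
    used memories decay more slowly. The growth factor of 1.5 is an
    illustrative choice, not a value taken from UCMA.
    """

    def __init__(self, stability: float = 1.0) -> None:
        self.stability = stability     # days until retention falls to ~0.37
        self.last_access = time.time()

    def current_retention(self) -> float:
        elapsed_days = (time.time() - self.last_access) / 86400.0
        return math.exp(-elapsed_days / self.stability)

    def reinforce(self) -> None:
        """Accessing a memory resets the clock and raises stability."""
        self.stability *= 1.5
        self.last_access = time.time()


state = RetentionState(stability=1.0)
assert state.current_retention() > 0.99    # fresh memories are fully retained

state.last_access -= 86400.0               # pretend a day has passed
assert state.current_retention() < 0.40    # exp(-1) ~= 0.37: mostly decayed

state.reinforce()                          # access resets and slows decay
assert state.stability == 1.5
```

A memory whose retention falls below the forgetting threshold, and which is not protected by graph importance, becomes a pruning candidate.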

The engine is designed to be interruptible: if the agent needs to respond to a request, consolidation stops gracefully after its current phase and picks up the remaining work in a later cycle, once the agent is idle again.
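
The interrupt contract, finish the in-progress phase and then yield, is easy to see in miniature. The sketch below mirrors the shape of the engine's cycle loop with pure asyncio; the phase names and the point of interruption are invented for illustration:

```python
import asyncio


async def consolidation_cycle(phases, interrupt: asyncio.Event) -> list[str]:
    """Run phases in order, stopping early if interrupted.

    As in UCMA's engine, the interrupt flag is checked *between* phases,
    so an in-progress phase always runs to completion.
    """
    completed = []
    for name, work in phases:
        await work()                 # a phase is never cancelled mid-flight
        completed.append(name)
        if interrupt.is_set():
            break                    # remaining phases wait for a later cycle
    return completed


async def demo() -> list[str]:
    interrupt = asyncio.Event()

    async def compress() -> None:
        pass

    async def abstract() -> None:
        interrupt.set()              # simulate the agent becoming active here

    async def reinforce() -> None:
        pass

    return await consolidation_cycle(
        [("compress", compress), ("abstract", abstract), ("reinforce", reinforce)],
        interrupt,
    )


print(asyncio.run(demo()))           # ['compress', 'abstract']
```

Because the check happens at phase boundaries, the worst-case latency added to the agent is the duration of one phase, never a whole cycle.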

# ucma/consolidation.py

"""
Sleep Consolidation Engine: background memory maintenance and abstraction.

Runs periodically to compress episodic memories, extract semantic facts,
reinforce important connections, and prune forgotten memories. Designed
to be interruptible: consolidation pauses when the agent is active and
resumes when it becomes idle again.
"""

from __future__ import annotations

import asyncio
import logging
import time

from ucma.config import UCMAConfig
from ucma.episodic_store import EpisodicMemoryStore
from ucma.forgetting import ForgettingEngine
from ucma.learning_store import LearningStore
from ucma.memory_graph import MemoryGraph
from ucma.models import (
    EpisodicEntry,
    MemoryEdge,
    MemoryNode,
    MemoryType,
    SemanticFact,
)
from ucma.procedural_store import ProceduralMemoryStore
from ucma.providers.base import (
    EmbeddingProvider,
    FactExtractionProvider,
    SummarizationProvider,
)
from ucma.retention import RetentionRegistry
from ucma.semantic_store import SemanticMemoryStore

logger = logging.getLogger(__name__)


class SleepConsolidationEngine:
    """
    The background consolidation process that transforms raw episodic
    memories into structured long-term knowledge.

    The engine runs as an asyncio Task. It can be interrupted at any
    point by setting the _interrupt event, which causes the current
    cycle to finish its in-progress phase and then end early; the
    skipped phases run in the next cycle. This ensures consolidation
    never adds latency to the agent's critical response path.
    """

    def __init__(
        self,
        config: UCMAConfig,
        episodic_store: EpisodicMemoryStore,
        semantic_store: SemanticMemoryStore,
        procedural_store: ProceduralMemoryStore,
        learning_store: LearningStore,
        memory_graph: MemoryGraph,
        retention_registry: RetentionRegistry,
        forgetting_engine: ForgettingEngine,
        embedder: EmbeddingProvider,
        summarizer: SummarizationProvider,
        fact_extractor: FactExtractionProvider,
    ) -> None:
        self._config = config
        self._episodic = episodic_store
        self._semantic = semantic_store
        self._procedural = procedural_store
        self._learning = learning_store
        self._graph = memory_graph
        self._retention = retention_registry
        self._forgetting = forgetting_engine
        self._embedder = embedder
        self._summarizer = summarizer
        self._fact_extractor = fact_extractor

        self._task: asyncio.Task | None = None
        self._interrupt = asyncio.Event()
        self._stop = asyncio.Event()
        self._wake_event = asyncio.Event()

    async def start(self) -> None:
        """Start the background consolidation loop as an asyncio Task."""
        self._stop.clear()
        self._task = asyncio.create_task(
            self._consolidation_loop(), name="ucma-consolidation"
        )
        logger.info("Sleep Consolidation Engine started.")

    async def stop(self) -> None:
        """
        Gracefully stop the consolidation engine.
        Signals the loop to exit and waits for the current cycle to finish.
        """
        self._stop.set()
        self._wake_event.set()  # Wake up if sleeping
        if self._task and not self._task.done():
            try:
                await asyncio.wait_for(self._task, timeout=30.0)
            except asyncio.TimeoutError:
                self._task.cancel()
                logger.warning(
                    "Consolidation engine did not stop within 30s; cancelled."
                )
        logger.info("Sleep Consolidation Engine stopped.")

    def interrupt(self) -> None:
        """
        Signal that the agent is active. The engine will end the current
        cycle after completing its in-progress phase; consolidation resumes
        at the next interval or when wake_now() is called.
        """
        self._interrupt.set()

    def wake_now(self) -> None:
        """
        Trigger an immediate consolidation cycle (e.g., at session end).
        Also clears any active interrupt signal.
        """
        self._interrupt.clear()
        self._wake_event.set()

    async def _consolidation_loop(self) -> None:
        """
        The main background loop. Sleeps for the configured interval,
        then runs one consolidation cycle. Respects stop and interrupt signals.
        """
        while not self._stop.is_set():
            # Wait for the interval or a wake signal
            try:
                await asyncio.wait_for(
                    self._wake_event.wait(),
                    timeout=self._config.consolidation_interval_seconds,
                )
                self._wake_event.clear()
            except asyncio.TimeoutError:
                pass  # Normal: interval elapsed, run consolidation

            if self._stop.is_set():
                break

            # Clear interrupt before starting so new interrupts during
            # the cycle are detected correctly
            self._interrupt.clear()

            try:
                await self._run_cycle()
            except Exception as exc:
                logger.error(
                    "Consolidation cycle failed with error: %s", exc,
                    exc_info=True,
                )

    async def _run_cycle(self) -> None:
        """Execute one full consolidation cycle: compress, abstract, reinforce, prune."""
        cycle_start = time.time()
        logger.info("=== Sleep Consolidation Cycle Starting ===")

        # Phase 1: Compress expired short-term episodic memories
        expired_entries = await self._episodic.get_expired_short_term()
        promoted_count = await self._compress_episodes(expired_entries)
        logger.info(
            "Phase 1 (Compress): %d entries expired, %d promoted to medium-term.",
            len(expired_entries), promoted_count,
        )

        if self._interrupt.is_set():
            logger.info("Consolidation interrupted after Phase 1.")
            return

        # Phase 2: Abstract patterns into semantic facts
        abstracted_count = await self._abstract_to_semantic()
        logger.info(
            "Phase 2 (Abstract): %d entries processed for fact extraction.",
            abstracted_count,
        )

        if self._interrupt.is_set():
            logger.info("Consolidation interrupted after Phase 2.")
            return

        # Phase 3: Reinforce connections via PageRank
        await self._reinforce_connections()
        logger.info("Phase 3 (Reinforce): PageRank importance scores updated.")

        if self._interrupt.is_set():
            logger.info("Consolidation interrupted after Phase 3.")
            return

        # Phase 4: Prune forgotten memories
        forgotten_counts = await self._forgetting.run_forgetting_pass()
        logger.info("Phase 4 (Prune): %s memories forgotten.", forgotten_counts)

        elapsed = time.time() - cycle_start
        logger.info(
            "=== Sleep Consolidation Cycle Complete (%.2fs) ===", elapsed
        )

    async def _compress_episodes(
        self, expired_entries: list[EpisodicEntry]
    ) -> int:
        """
        Compress expired short-term episodic memories.

        High-importance entries are summarized and promoted to medium-term
        storage. Low-importance entries are discarded. The importance
        threshold is adaptive: when many entries expire at once, we raise
        the bar to avoid bloating medium-term storage.
        """
        if not expired_entries:
            return 0

        n = len(expired_entries)
        # Adaptive threshold: raise bar when many entries expire at once
        volume_penalty = min(
            (n / 100.0) * self._config.compression_volume_penalty_max,
            self._config.compression_volume_penalty_max,
        )
        threshold = self._config.compression_importance_base + volume_penalty

        promoted = 0
        for entry in expired_entries:
            if entry.importance < threshold:
                # Low importance: delete from short-term without promoting
                await self._episodic.delete(entry.entry_id)
                continue

            # Summarize the entry content
            summary = await self._summarizer.summarize(
                entry.content,
                max_tokens=self._config.consolidation_summary_max_tokens,
            )

            # Promote to medium-term with summary
            entry.summary = summary
            entry.tier = "medium_term"
            await self._episodic.store(entry)

            # Register or update in retention registry
            state = self._retention.get(entry.entry_id)
            if state is None:
                self._retention.register(entry.entry_id, initial_stability=1.0)

            promoted += 1

        return promoted

    async def _abstract_to_semantic(self) -> int:
        """
        Scan recent medium-term episodic memories and extract semantic facts.

        This is the core abstraction step: turning specific episodic content
        into timeless semantic knowledge. Each extracted fact is stored in
        the Semantic Memory Store and connected to its source episode via
        a "derived_from" edge in the Memory Graph.
        """
        now = time.time()
        # Process the last 24 hours of medium-term memories
        recent_entries = await self._episodic.retrieve_by_time_range(
            start_time=now - 86400.0,
            end_time=now,
        )

        processed = 0
        for entry in recent_entries:
            # Skip entries that have already been abstracted
            if "abstracted" in entry.tags:
                continue

            raw_facts = await self._fact_extractor.extract_facts(entry.content)

            for raw in raw_facts:
                subject   = raw.get("subject", "").strip()
                predicate = raw.get("predicate", "").strip()
                obj       = raw.get("object", "").strip()
                content   = raw.get("content", "").strip()
                confidence = float(raw.get("confidence", 0.5))

                if not (subject and predicate and obj and content):
                    continue

                # Compute embedding for the fact's natural language content
                embeddings = await self._embedder.embed([content])
                embedding = embeddings[0] if embeddings else None

                fact = SemanticFact(
                    subject=subject,
                    predicate=predicate,
                    object_value=obj,
                    content=content,
                    confidence=confidence,
                    source_episode_ids=[entry.entry_id],
                    embedding=embedding,
                    importance=entry.importance,
                )
                fact_id = await self._semantic.assert_fact(fact)

                # Register the fact as a node in the Memory Graph
                fact_node = MemoryNode(
                    node_id=fact_id,
                    memory_type=MemoryType.SEMANTIC,
                    importance=entry.importance,
                    created_at=time.time(),
                    last_accessed=time.time(),
                )
                await self._graph.add_node(fact_node)

                # Connect fact to its source episode
                async with self._graph._lock:
                    episode_in_graph = entry.entry_id in self._graph._nodes
                if episode_in_graph:
                    edge = MemoryEdge(
                        source_id=fact_id,
                        target_id=entry.entry_id,
                        edge_type="derived_from",
                        weight=0.8,
                    )
                    await self._graph.add_edge(edge)

                # Register retention state for the new fact
                if self._retention.get(fact_id) is None:
                    self._retention.register(fact_id, initial_stability=2.0)

            # Mark this entry as abstracted and persist the updated tags
            # so it is not reprocessed in the next cycle
            entry.tags.append("abstracted")
            await self._episodic.store(entry)
            processed += 1

        return processed

    async def _reinforce_connections(self) -> None:
        """
        Update node importance scores based on PageRank.

        Memories that are densely connected to other important memories
        receive higher importance scores, making them more resistant to
        forgetting and more likely to surface during retrieval.
        """
        importance_scores = await self._graph.compute_importance_scores()

        # Update node importance in the graph based on PageRank scores.
        # PageRank values sum to 1 across the graph, so individual scores
        # are small; scale by 10 to spread them across the 0-1 range and
        # cap at 1.0 so hub nodes cannot exceed the importance scale.
        for node_id, score in importance_scores.items():
            scaled = min(score * 10.0, 1.0)
            await self._graph.update_node_importance(node_id, scaled)

CHAPTER TWENTY-TWO: THE MEMORY CONTROLLER

The Memory Controller is the top-level orchestrator of UCMA. It provides a clean, simple API to the agent while managing all the complexity of the underlying memory architecture. The agent only needs to call observe(), remember(), learn(), and new_session(). Everything else is handled internally.
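
Before diving into the implementation, the call pattern is worth seeing in isolation. The toy stand-in below has the same four-method surface; everything inside it is a placeholder (a list plus substring matching), not how the real controller works:

```python
import uuid


class ToyMemoryController:
    """A toy stand-in showing the four-call surface of MemoryController.

    Deliberately naive: storage is a list and retrieval is substring
    matching. The real controller routes these calls through importance
    classification, embeddings, and the unified retrieval layer.
    """

    def __init__(self) -> None:
        self.session_id = str(uuid.uuid4())
        self.episodes: list[tuple[str, str]] = []   # (session_id, content)
        self.learnings: list[str] = []

    def observe(self, content: str) -> None:
        self.episodes.append((self.session_id, content))

    def remember(self, query: str) -> list[str]:
        return [c for _, c in self.episodes if query.lower() in c.lower()]

    def learn(self, insight: str) -> None:
        self.learnings.append(insight)

    def new_session(self) -> str:
        self.session_id = str(uuid.uuid4())
        return self.session_id


memory = ToyMemoryController()
memory.observe("User prefers concise answers")
memory.new_session()                  # memories survive session turnover
print(memory.remember("concise"))     # ['User prefers concise answers']
```

The key property the toy preserves is the one that matters: observations outlive the session that produced them.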

# ucma/controller.py

"""
Memory Controller: the top-level orchestrator of UCMA.

Provides a clean, simple API to the agent while managing all the complexity
of the underlying memory architecture. The observe() and remember() methods
form the fundamental read-write interface. All other methods are lifecycle
management (new_session, start, stop) or convenience wrappers.
"""

from __future__ import annotations

import logging
import time
import uuid

from ucma.config import UCMAConfig
from ucma.consolidation import SleepConsolidationEngine
from ucma.episodic_store import EpisodicMemoryStore
from ucma.forgetting import ForgettingEngine
from ucma.learning_store import LearningStore
from ucma.memory_graph import MemoryGraph
from ucma.models import (
    EpisodicEntry,
    LearningRecord,
    LearningType,
    MemoryEdge,
    MemoryNode,
    MemoryQuery,
    MemoryType,
    QueryMode,
    RetrievalResult,
)
from ucma.procedural_store import ProceduralMemoryStore
from ucma.providers.base import EmbeddingProvider, ImportanceClassifierProvider
from ucma.retention import RetentionRegistry
from ucma.retrieval import UnifiedRetrievalLayer
from ucma.semantic_store import SemanticMemoryStore
from ucma.sensory_buffer import SensoryBuffer
from ucma.working_memory import WorkingMemoryManager

logger = logging.getLogger(__name__)


class MemoryController:
    """
    The central orchestrator of UCMA.

    Provides a clean, simple API to the agent while managing all the
    complexity of the underlying memory architecture. The agent only
    needs to call observe(), remember(), learn(), and new_session().
    Everything else is handled internally.
    """

    def __init__(
        self,
        config: UCMAConfig,
        agent_id: str,
        sensory_buffer: SensoryBuffer,
        working_memory: WorkingMemoryManager,
        episodic_store: EpisodicMemoryStore,
        semantic_store: SemanticMemoryStore,
        procedural_store: ProceduralMemoryStore,
        learning_store: LearningStore,
        memory_graph: MemoryGraph,
        retrieval_layer: UnifiedRetrievalLayer,
        consolidation_engine: SleepConsolidationEngine,
        forgetting_engine: ForgettingEngine,
        retention_registry: RetentionRegistry,
        embedder: EmbeddingProvider,
        importance_classifier: ImportanceClassifierProvider,
    ) -> None:
        self._config = config
        self._agent_id = agent_id
        self._sensory = sensory_buffer
        self._working = working_memory
        self._episodic = episodic_store
        self._semantic = semantic_store
        self._procedural = procedural_store
        self._learning = learning_store
        self._graph = memory_graph
        self._retrieval = retrieval_layer
        self._consolidation = consolidation_engine
        self._forgetting = forgetting_engine
        self._retention = retention_registry
        self._embedder = embedder
        self._importance_classifier = importance_classifier
        self._current_session_id: str = str(uuid.uuid4())

    async def start(self) -> None:
        """Start background services (consolidation engine)."""
        await self._consolidation.start()
        logger.info(
            "MemoryController started for agent '%s', session '%s'.",
            self._agent_id,
            self._current_session_id,
        )

    async def stop(self) -> None:
        """Gracefully stop all background services."""
        await self._consolidation.stop()
        logger.info("MemoryController stopped for agent '%s'.", self._agent_id)

    async def observe(
        self,
        content: str,
        source: str,
        modality: str = "text",
    ) -> str:
        """
        The primary input method: observe a new piece of information.

        Called every time the agent receives input from the user, a tool,
        or the environment. Classifies importance, creates an episodic
        memory if the content is important enough, computes its embedding,
        and registers it in the Memory Graph.

        Returns the entry_id of the created sensory entry.
        """
        # Step 1: Place in sensory buffer (always, regardless of importance)
        entry = await self._sensory.ingest(content, source, modality)

        # Step 2: Classify importance
        importance = await self._importance_classifier.classify_importance(
            content, source
        )

        # Step 3: Create episodic memory if important enough
        if importance > 0.2:
            await self._create_episodic_memory(
                entry.entry_id, content, source, importance
            )

        logger.debug(
            "Observed: source=%s importance=%.2f id=%s",
            source, importance, entry.entry_id,
        )
        return entry.entry_id

    async def _create_episodic_memory(
        self,
        entry_id: str,
        content: str,
        source: str,
        importance: float,
    ) -> EpisodicEntry:
        """
        Transform a sensory observation into a full episodic memory.

        Computes the content embedding, stores the entry in the episodic
        store, registers it as a node in the Memory Graph, and creates
        a retention state for it.
        """
        # Compute embedding for semantic search
        embeddings = await self._embedder.embed([content])
        embedding = embeddings[0] if embeddings else None

        episodic = EpisodicEntry(
            entry_id=entry_id,
            session_id=self._current_session_id,
            agent_id=self._agent_id,
            timestamp=time.time(),
            content=content,
            importance=importance,
            participants=[source],
            embedding=embedding,
            tier="short_term",
        )
        await self._episodic.store(episodic)

        # Register in the Memory Graph
        node = MemoryNode(
            node_id=entry_id,
            memory_type=MemoryType.EPISODIC,
            importance=importance,
            created_at=episodic.timestamp,
            last_accessed=episodic.timestamp,
        )
        await self._graph.add_node(node)

        # Register retention state
        self._retention.register(entry_id, initial_stability=1.0)

        return episodic

    async def remember(self, query: MemoryQuery) -> list[RetrievalResult]:
        """
        Retrieve memories relevant to the given query.

        This is the primary read method. Results are ranked by the
        multi-factor ranking formula and injected into the Working Memory
        staging zone so the LLM can access them in its next response.
        """
        results = await self._retrieval.retrieve(query)

        # Inject top results into working memory staging zone
        for result in results[:3]:
            if result.content:
                await self._working.add(
                    item_id=result.memory_id,
                    content=result.content,
                    zone="staging",
                    importance=result.importance,
                )

        return results

    async def learn(
        self,
        insight: str,
        learning_type: LearningType,
        evidence_ids: list[str],
        applicability_tags: list[str],
        confidence: float = 0.5,
        impact_score: float = 0.5,
    ) -> str:
        """
        Store a new learning derived from experience.

        Called by the agent or the consolidation engine when a significant
        insight has been extracted from episodes. The learning is registered
        in the Memory Graph and connected to its evidence nodes.
        """
        learning_id = str(uuid.uuid4())
        learning = LearningRecord(
            learning_id=learning_id,
            learning_type=learning_type,
            title=insight[:80],
            insight=insight,
            evidence=evidence_ids,
            applicability=applicability_tags,
            confidence=confidence,
            impact_score=impact_score,
        )
        await self._learning.store_learning(learning)

        # Register in the Memory Graph as a high-importance node
        node = MemoryNode(
            node_id=learning_id,
            memory_type=MemoryType.LEARNING,
            importance=impact_score,
            created_at=learning.created_at,
            last_accessed=learning.created_at,
        )
        await self._graph.add_node(node)

        # Connect to evidence nodes in the graph
        for evidence_id in evidence_ids:
            async with self._graph._lock:
                evidence_exists = evidence_id in self._graph._nodes
            if evidence_exists:
                edge = MemoryEdge(
                    source_id=learning_id,
                    target_id=evidence_id,
                    edge_type="derived_from",
                    weight=0.8,
                )
                await self._graph.add_edge(edge)

        # Register retention state with high initial stability
        self._retention.register(learning_id, initial_stability=3.0)

        logger.info(
            "Stored learning '%s' type=%s confidence=%.2f",
            insight[:60], learning_type.value, confidence,
        )
        return learning_id

    async def new_session(self) -> str:
        """
        Start a new conversation session.

        Clears the active and staging zones of working memory (the system
        zone, containing core instructions, is preserved). Triggers the
        consolidation engine to process any pending memories from the
        previous session. Returns the new session ID.
        """
        old_session = self._current_session_id
        self._current_session_id = str(uuid.uuid4())

        # Clear dynamic working memory zones
        cleared_active  = await self._working.clear_zone("active")
        cleared_staging = await self._working.clear_zone("staging")
        logger.info(
            "New session %s started (old: %s). Cleared %d active + %d staging items.",
            self._current_session_id, old_session,
            cleared_active, cleared_staging,
        )

        # Trigger consolidation of the previous session's memories
        self._consolidation.wake_now()

        return self._current_session_id

    def trigger_consolidation(self) -> None:
        """
        Manually trigger a consolidation cycle.

        Call this when the agent knows it is entering an idle period
        (e.g., end of business hours, waiting for a long-running task).
        """
        self._consolidation.wake_now()

    def stats(self) -> dict[str, object]:
        """Return a comprehensive statistics summary for observability."""
        return {
            "agent_id": self._agent_id,
            "session_id": self._current_session_id,
            "sensory_buffer": {
                "size": self._sensory.size,
                "total_ingested": self._sensory.total_ingested,
            },
            "working_memory": self._working.usage_summary(),
            "episodic_store": self._episodic.stats(),
            "semantic_store": self._semantic.stats(),
            "procedural_store": self._procedural.stats(),
            "learning_store": self._learning.stats(),
            "memory_graph": self._graph.stats(),
            "retention_registry_size": len(self._retention),
        }

CHAPTER TWENTY-THREE: PACKAGE INIT FILES

# ucma/__init__.py

"""
Unified Cognitive Memory Architecture (UCMA).

A complete, biologically-inspired memory system for LLM agents.
Import the build_memory_controller factory from ucma.main for
the simplest way to get started.
"""

from ucma.config import UCMAConfig
from ucma.controller import MemoryController
from ucma.models import (
    EpisodicEntry,
    LearningRecord,
    LearningType,
    MemoryQuery,
    MemoryType,
    QueryMode,
    RetrievalResult,
    SemanticFact,
)

__all__ = [
    "UCMAConfig",
    "MemoryController",
    "MemoryQuery",
    "MemoryType",
    "QueryMode",
    "RetrievalResult",
    "EpisodicEntry",
    "SemanticFact",
    "LearningRecord",
    "LearningType",
]

# ucma/providers/__init__.py

"""Provider protocol definitions and concrete implementations."""

from ucma.providers.base import (
    EmbeddingProvider,
    FactExtractionProvider,
    ImportanceClassifierProvider,
    SummarizationProvider,
)

__all__ = [
    "EmbeddingProvider",
    "SummarizationProvider",
    "FactExtractionProvider",
    "ImportanceClassifierProvider",
]

CHAPTER TWENTY-FOUR: WIRING AND EXAMPLE USAGE

The build_memory_controller() factory function wires all components together with a single function call. This is the recommended way to instantiate UCMA in production. The factory handles all dependency injection, ensuring that every component receives exactly the dependencies it needs.

# ucma/main.py

"""
UCMA factory and end-to-end usage example.

The build_memory_controller() factory function wires all components together
with a single function call. The example at the bottom of this file
demonstrates UCMA across two simulated sessions, showing how memory
accumulates, consolidates, and improves agent responses over time.

To run:
    export OPENAI_API_KEY="sk-..."
    python -m ucma.main
"""

from __future__ import annotations

import asyncio
import logging
import os
import sys

import structlog

from ucma.config import UCMAConfig
from ucma.consolidation import SleepConsolidationEngine
from ucma.controller import MemoryController
from ucma.episodic_store import EpisodicMemoryStore
from ucma.forgetting import ForgettingEngine
from ucma.learning_store import LearningStore
from ucma.memory_graph import MemoryGraph
from ucma.models import LearningType, MemoryQuery, MemoryType, QueryMode
from ucma.procedural_store import ProceduralMemoryStore
from ucma.providers.openai_provider import (
    OpenAIEmbeddingProvider,
    OpenAIFactExtractionProvider,
    OpenAIImportanceClassifier,
    OpenAISummarizationProvider,
)
from ucma.retention import RetentionRegistry
from ucma.retrieval import UnifiedRetrievalLayer
from ucma.semantic_store import SemanticMemoryStore
from ucma.sensory_buffer import SensoryBuffer
from ucma.working_memory import WorkingMemoryManager


def _configure_logging() -> None:
    """Configure structured logging for the UCMA system."""
    structlog.configure(
        processors=[
            structlog.stdlib.add_log_level,
            structlog.stdlib.add_logger_name,
            structlog.dev.ConsoleRenderer(),
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
    )
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
        stream=sys.stdout,
    )


def build_memory_controller(
    agent_id: str,
    config: UCMAConfig | None = None,
    api_key: str | None = None,
) -> MemoryController:
    """
    Factory function: wire all UCMA components and return a MemoryController.

    Parameters
    ----------
    agent_id:
        A unique identifier for this agent instance. Used to tag
        episodic memories so that multi-agent systems can distinguish
        memories from different agents.
    config:
        A UCMAConfig instance. If None, the default configuration is used.
    api_key:
        OpenAI API key. If None, reads from the OPENAI_API_KEY environment
        variable.
    """
    cfg = config or UCMAConfig()
    key = api_key or os.environ.get("OPENAI_API_KEY", "")

    # ------------------------------------------------------------------ #
    # Instantiate providers                                                #
    # ------------------------------------------------------------------ #
    embedder             = OpenAIEmbeddingProvider(model=cfg.embedding_model, api_key=key)
    summarizer           = OpenAISummarizationProvider(api_key=key)
    fact_extractor       = OpenAIFactExtractionProvider(api_key=key)
    importance_classifier = OpenAIImportanceClassifier(api_key=key)

    # ------------------------------------------------------------------ #
    # Instantiate stores                                                   #
    # ------------------------------------------------------------------ #
    sensory_buffer    = SensoryBuffer(capacity=cfg.sensory_buffer_capacity)
    working_memory    = WorkingMemoryManager(config=cfg)
    episodic_store    = EpisodicMemoryStore(short_term_ttl_hours=cfg.short_term_ttl_hours)
    semantic_store    = SemanticMemoryStore()
    procedural_store  = ProceduralMemoryStore()
    learning_store    = LearningStore()
    retention_registry = RetentionRegistry()

    # ------------------------------------------------------------------ #
    # Instantiate graph                                                    #
    # ------------------------------------------------------------------ #
    memory_graph = MemoryGraph(config=cfg)

    # ------------------------------------------------------------------ #
    # Instantiate retrieval layer                                          #
    # ------------------------------------------------------------------ #
    retrieval_layer = UnifiedRetrievalLayer(
        config=cfg,
        episodic_store=episodic_store,
        semantic_store=semantic_store,
        procedural_store=procedural_store,
        learning_store=learning_store,
        memory_graph=memory_graph,
        embedder=embedder,
        retention_registry=retention_registry,
    )

    # ------------------------------------------------------------------ #
    # Instantiate forgetting engine                                        #
    # ------------------------------------------------------------------ #
    forgetting_engine = ForgettingEngine(
        config=cfg,
        episodic_store=episodic_store,
        semantic_store=semantic_store,
        procedural_store=procedural_store,
        learning_store=learning_store,
        memory_graph=memory_graph,
        retention_registry=retention_registry,
    )

    # ------------------------------------------------------------------ #
    # Instantiate sleep consolidation engine                               #
    # ------------------------------------------------------------------ #
    consolidation_engine = SleepConsolidationEngine(
        config=cfg,
        episodic_store=episodic_store,
        semantic_store=semantic_store,
        procedural_store=procedural_store,
        learning_store=learning_store,
        memory_graph=memory_graph,
        retention_registry=retention_registry,
        forgetting_engine=forgetting_engine,
        embedder=embedder,
        summarizer=summarizer,
        fact_extractor=fact_extractor,
    )

    # ------------------------------------------------------------------ #
    # Instantiate and return the Memory Controller                         #
    # ------------------------------------------------------------------ #
    return MemoryController(
        config=cfg,
        agent_id=agent_id,
        sensory_buffer=sensory_buffer,
        working_memory=working_memory,
        episodic_store=episodic_store,
        semantic_store=semantic_store,
        procedural_store=procedural_store,
        learning_store=learning_store,
        memory_graph=memory_graph,
        retrieval_layer=retrieval_layer,
        consolidation_engine=consolidation_engine,
        forgetting_engine=forgetting_engine,
        retention_registry=retention_registry,
        embedder=embedder,
        importance_classifier=importance_classifier,
    )


async def run_example() -> None:
    """
    End-to-end example: two sessions with a coding assistant agent.

    Session 1: Michael asks for help implementing JWT authentication
    in FastAPI. They encounter and fix a token expiry bug. The agent
    stores a learning from this experience.

    Between sessions: the Sleep Consolidation Engine runs, compressing
    episodic memories and extracting semantic facts.

    Session 2: Michael returns and asks about JWT. The agent recalls
    the previous session's context and the learning about token expiry,
    providing a better response than a stateless LLM could.
    """
    _configure_logging()
    logger = logging.getLogger(__name__)

    mc = build_memory_controller(agent_id="coding-assistant-001")
    await mc.start()

    try:
        # ---------------------------------------------------------------- #
        # SESSION 1                                                         #
        # ---------------------------------------------------------------- #
        logger.info("--- Session 1 Start ---")

        obs1 = await mc.observe(
            "Michael is building a FastAPI application with JWT authentication.",
            source="user",
        )
        obs2 = await mc.observe(
            "The JWT token expiry is set to 30 minutes but users are being "
            "logged out after only 5 minutes. Suspected clock skew issue.",
            source="user",
        )
        obs3 = await mc.observe(
            "Fixed: added a 60-second leeway to the JWT decode call to "
            "handle clock skew between servers. Issue resolved.",
            source="tool_result",
        )

        # Store a learning from this session
        learning_id = await mc.learn(
            insight=(
                "When JWT tokens expire unexpectedly, check for clock skew "
                "between the issuing server and the validating server. "
                "Adding a leeway parameter to jwt.decode() resolves most cases."
            ),
            learning_type=LearningType.FAILURE_ANALYSIS,
            evidence_ids=[obs1, obs2, obs3],
            applicability_tags=["jwt", "authentication", "fastapi", "debugging"],
            confidence=0.85,
            impact_score=0.80,
        )
        logger.info("Session 1 complete. Learning stored: %s", learning_id)

        # Trigger consolidation between sessions
        await mc.new_session()
        # Give consolidation a moment to start (in real usage, hours pass)
        await asyncio.sleep(0.5)

        # ---------------------------------------------------------------- #
        # SESSION 2                                                         #
        # ---------------------------------------------------------------- #
        logger.info("--- Session 2 Start ---")

        await mc.observe(
            "Michael is back. He wants to add refresh token support to his "
            "FastAPI JWT implementation.",
            source="user",
        )

        # Retrieve memories relevant to the new task
        results = await mc.remember(
            MemoryQuery(
                query_text="JWT authentication FastAPI token issues",
                mode=QueryMode.HYBRID,
                max_results=5,
                include_graph_neighbors=True,
            )
        )

        logger.info("Session 2 retrieved %d memories:", len(results))
        for i, result in enumerate(results, 1):
            logger.info(
                "  [%d] type=%-10s score=%.3f store=%-20s content=%.80s",
                i,
                result.memory_type.value,
                result.relevance_score,
                result.source_store,
                result.content,
            )

        # Print system stats
        stats = mc.stats()
        logger.info("System stats: %s", stats)

    finally:
        await mc.stop()


if __name__ == "__main__":
    asyncio.run(run_example())

CHAPTER TWENTY-FIVE: SPEED -- THE ENGINEERING OF FAST MEMORY

A memory system that is architecturally correct but too slow to use in real time is worthless. LLM applications have strict latency requirements: users expect responses in seconds, not minutes. The retrieval system must return results in milliseconds, not seconds. This section addresses the engineering choices that make UCMA fast and describes the path from the in-memory implementation shown here to a production-scale deployment.

The most important speed decision is the choice of vector index. For semantic search over millions of embeddings, exact nearest-neighbor search is too slow -- it requires computing the distance to every stored vector. The solution is Approximate Nearest Neighbor (ANN) search using the HNSW (Hierarchical Navigable Small World) algorithm. HNSW builds a multi-layer graph where each layer is a progressively sparser subset of the data. Queries start at the top layer (sparse, fast to traverse) and progressively descend to lower layers (denser, more precise). This gives sub-millisecond search times even over tens of millions of vectors, with recall rates above 95%.

The current implementation uses exact cosine similarity search, which is correct for any store size and is the right choice for development and testing. Replacing it with HNSW requires only swapping the inner loop in UnifiedRetrievalLayer._semantic_retrieval() for a call to an ANN index. The hnswlib library provides fast Python bindings to a C++ implementation:

# This replaces the inner loop in _semantic_retrieval() for the episodic store.
# Install: pip install hnswlib

import hnswlib

# Build the index once (or load from disk):
dim = cfg.embedding_dim
index = hnswlib.Index(space="cosine", dim=dim)
index.init_index(max_elements=1_000_000, ef_construction=200, M=16)

# Add all episodic embeddings to the index. hnswlib requires integer
# labels, so keep a mapping back to entry IDs. (Do not derive labels
# from Python's hash() -- it is not stable across processes.)
label_to_entry_id: dict[int, str] = {}
for label, entry in enumerate(e for e in all_episodic if e.embedding):
    label_to_entry_id[label] = entry.entry_id
    index.add_items([entry.embedding], [label])

# Query:
index.set_ef(50)  # Higher ef = better recall, slower query
labels, distances = index.knn_query([query_vector], k=10)
# In hnswlib's "cosine" space, distance = 1 - cosine_similarity,
# so convert with: similarity = 1 - distance

The second speed decision is the embedding cache. The EmbeddingCache implemented in Part Seventeen eliminates redundant embedding model calls for repeated queries. In a session where the user asks several related questions, the cache hit rate typically exceeds 60%, reducing embedding API costs and latency by more than half.
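As a concrete illustration, a minimal cache of this kind can be sketched as follows. This is a simplified stand-in for the EmbeddingCache of Part Seventeen, not its actual interface; the class name, LRU eviction policy, and whitespace-normalizing key scheme are all assumptions:

```python
import hashlib
from collections import OrderedDict


class SimpleEmbeddingCache:
    """LRU cache keyed by a hash of the normalized query text.

    Illustrative sketch only -- the real EmbeddingCache may use a
    different key scheme or eviction policy.
    """

    def __init__(self, max_entries: int = 10_000) -> None:
        self._cache: OrderedDict[str, list[float]] = OrderedDict()
        self._max = max_entries
        self.hits = 0
        self.misses = 0

    @staticmethod
    def _key(text: str) -> str:
        # Normalize case and whitespace so trivially different
        # phrasings of the same query share one cache entry.
        return hashlib.sha256(" ".join(text.lower().split()).encode()).hexdigest()

    def get_or_compute(self, text: str, embed_fn) -> list[float]:
        key = self._key(text)
        if key in self._cache:
            self.hits += 1
            self._cache.move_to_end(key)  # Mark as recently used.
            return self._cache[key]
        self.misses += 1
        vector = embed_fn(text)  # The expensive call, e.g. an embedding API.
        self._cache[key] = vector
        if len(self._cache) > self._max:
            self._cache.popitem(last=False)  # Evict least recently used.
        return vector
```

Keying on normalized text rather than raw text is what lifts the hit rate in conversational sessions, where users restate the same question with small variations.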

The third speed decision is query routing. The QueryRouter classifies queries in microseconds using pure string matching, before any expensive operations are performed. A query that contains "yesterday" only hits the temporal index. A query that starts with "how to" only hits the procedural store. A query that starts with "what do you know about" only hits the entity index. This selective routing dramatically reduces the number of store accesses per query.
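A minimal router using only the example patterns from this paragraph can be sketched as follows; the `Route` enum and `route_query` function are illustrative names, not the actual QueryRouter interface:

```python
from enum import Enum


class Route(Enum):
    TEMPORAL = "temporal"
    PROCEDURAL = "procedural"
    ENTITY = "entity"
    ALL = "all"


def route_query(query_text: str) -> Route:
    """Classify a query with pure string matching -- no model calls.

    Hypothetical sketch of the routing rules described in the text;
    a production QueryRouter would use a larger pattern table.
    """
    q = query_text.lower().strip()
    if q.startswith("how to"):
        return Route.PROCEDURAL  # Only hit the procedural store.
    if q.startswith("what do you know about"):
        return Route.ENTITY      # Only hit the entity index.
    if any(word in q for word in ("yesterday", "last week", "this morning")):
        return Route.TEMPORAL    # Only hit the temporal index.
    return Route.ALL             # Fall back to full hybrid retrieval.
```

Because the check is a handful of string operations, routing costs microseconds even under heavy load, while each avoided store access saves orders of magnitude more.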

The fourth speed decision is asynchronous consolidation. The Sleep Consolidation Engine runs in a background asyncio Task and never blocks the main agent loop. Consolidation is a batch operation that can tolerate latency; retrieval cannot. By separating these concerns into different execution contexts, UCMA ensures that consolidation never adds latency to the critical path.
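The scheduling pattern can be shown independently of the engine itself. The `BackgroundConsolidator` wrapper below is a hypothetical illustration of the pattern; the real SleepConsolidationEngine manages its own task with a richer interface:

```python
import asyncio


class BackgroundConsolidator:
    """Run a batch job as a background asyncio Task, off the critical path.

    Illustrative sketch: retrieval stays on the main loop, while
    latency-tolerant consolidation runs here on a timer.
    """

    def __init__(self, consolidate_fn, interval_seconds: float = 3600.0) -> None:
        self._consolidate = consolidate_fn
        self._interval = interval_seconds
        self._task: asyncio.Task | None = None

    def start(self) -> None:
        self._task = asyncio.create_task(self._loop())

    async def _loop(self) -> None:
        while True:
            await asyncio.sleep(self._interval)
            try:
                await self._consolidate()  # Batch work; can take minutes.
            except Exception:
                pass  # A failed cycle must never kill the loop.

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
```

Note that `asyncio.CancelledError` deliberately escapes the `except Exception` clause, so `stop()` can cancel the loop cleanly while ordinary consolidation failures are swallowed.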


CHAPTER TWENTY-SIX: WHAT IS STILL MISSING -- HONEST LIMITATIONS

UCMA as described here is a substantial advance over current approaches, but intellectual honesty requires acknowledging what it does not yet solve and where future research must go.

The first open problem is importance classification. The heuristic-based importance classifier described here is fast but imprecise. Accurately predicting the importance of a piece of information is extremely hard: what seems trivial now may turn out to be critical later. A truly intelligent importance classifier would need to understand the agent's goals, the user's context, and the long-term trajectory of the work -- which is itself an LLM-level reasoning task. Running a full LLM call for every piece of incoming information is too expensive. This is an area where specialized, fine-tuned small models could make a major contribution.
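To make the imprecision concrete, a heuristic classifier of the kind described might look like the sketch below. The keywords and weights are purely illustrative assumptions, not the classifier used elsewhere in this article:

```python
def heuristic_importance(text: str, source: str) -> float:
    """Score importance in [0, 1] from surface features alone.

    Illustrative sketch: the keyword lists and weights are arbitrary,
    which is precisely why this approach is imprecise.
    """
    score = 0.3  # Base score for any observation.
    lowered = text.lower()
    if any(kw in lowered for kw in ("error", "fixed", "bug", "decided")):
        score += 0.3  # Outcome-bearing events are often important...
    if any(kw in lowered for kw in ("my name is", "i prefer", "always", "never")):
        score += 0.2  # ...as are stated preferences and identities.
    if source == "tool_result":
        score += 0.1  # Tool outputs tend to carry concrete facts.
    # The failure mode: nothing here understands goals or long-term
    # trajectory, so a casually mentioned critical detail scores low.
    return min(score, 1.0)
```

The sketch is fast because it never calls a model, and imprecise for the same reason: a quietly stated detail that later proves pivotal receives the same base score as small talk.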

The second open problem is memory interference. In human memory, similar memories can interfere with each other -- remembering one thing makes it harder to remember a related but distinct thing. UCMA does not model this. In fact, UCMA's graph-based associative retrieval could exacerbate interference by always retrieving similar memories together. A future version of UCMA would need a mechanism to detect and manage interference between memories that are similar but distinct.

The third open problem is multi-agent memory sharing. UCMA as described is a per-agent memory system. In a multi-agent system, agents need to share some memories (shared world knowledge, shared task state) while keeping others private (agent-specific strategies, private user information). Designing a principled access control and synchronization layer for shared memory in a multi-agent UCMA is a significant engineering and research challenge.

The fourth open problem is memory verification and hallucination prevention. When the agent retrieves a memory and uses it in its response, it may be retrieving a memory that was itself generated by the LLM and contains hallucinated content. UCMA stores what the agent observed and what it generated, without distinguishing between ground truth and confabulation. A future version would need a provenance system that tracks the original source of every memory and flags memories derived from LLM generation rather than external observation.

The fifth open problem is the cold start problem. A new agent with no memories is no better than a stateless LLM. Building up a rich, useful memory store takes time and interaction. Future work could explore pre-populating the semantic memory store from domain-specific knowledge bases, or using transfer learning to initialize a new agent's memory from the aggregated (anonymized) memories of many previous agents in the same domain.

The sixth and perhaps most profound open problem is the question of what memories mean to an LLM. Human memories are experienced -- they have emotional valence, they are connected to bodily sensations, they exist in the context of a continuous subjective experience. LLM memories are text stored in a database. Whether this difference matters for the quality and utility of the memory system is an open philosophical and empirical question. What we can say with confidence is that the engineering architecture described here is the right foundation for exploring it.


CHAPTER TWENTY-SEVEN: INSTALLATION FILES

# requirements.txt

# Core LLM and embedding dependencies
openai>=1.30.0
tiktoken>=0.7.0

# Numerical computing (for cosine similarity)
numpy>=1.26.0

# Structured logging
structlog>=24.1.0

# Newer typing features ahead of the standard library
typing-extensions>=4.11.0

# Optional: ANN index for production-scale semantic search
# hnswlib>=0.8.0

# Development and testing
# pytest>=8.2.0
# pytest-asyncio>=0.23.0


# pyproject.toml

[build-system]
requires = ["setuptools>=70.0", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "ucma"
version = "1.0.0"
description = "Unified Cognitive Memory Architecture for LLM Agents"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
    "openai>=1.30.0",
    "tiktoken>=0.7.0",
    "numpy>=1.26.0",
    "typing-extensions>=4.11.0",
    "structlog>=24.1.0",
]

[project.optional-dependencies]
ann = ["hnswlib>=0.8.0"]
dev = [
    "pytest>=8.2.0",
    "pytest-asyncio>=0.23.0",
    "mypy>=1.10.0",
    "ruff>=0.4.0",
]

[tool.setuptools.packages.find]
where = ["."]
include = ["ucma*"]

[tool.pytest.ini_options]
asyncio_mode = "auto"

[tool.mypy]
python_version = "3.11"
strict = true
ignore_missing_imports = true

Install the package in development mode with the following commands:

python -m venv .venv
source .venv/bin/activate          # On Windows: .venv\Scripts\activate
pip install -e ".[dev]"
export OPENAI_API_KEY="sk-..."
python -m ucma.main

CONCLUSION: TOWARD TRULY PERSISTENT INTELLIGENT AGENTS

We have traveled a long way from the blank slate of a stateless LLM to the rich, hierarchical, biologically-inspired architecture of UCMA. Let us take a moment to appreciate what we have built.

UCMA gives LLM agents a Sensory Buffer that handles the raw flow of incoming information without overwhelming the system. It gives them a Working Memory Manager that treats the precious context window as a managed resource rather than a dumping ground. It gives them an Episodic Memory Store with two tiers -- short-term richness and medium-term persistence -- connected by temporal indexes that make time-range queries fast. It gives them a Semantic Memory Store with a knowledge graph that supports both similarity search and relational reasoning. It gives them a Procedural Memory Store that learns and improves with every successful execution. It gives them a Memory Graph that connects all memories into a coherent network, enabling associative retrieval and causal reasoning. It gives them a Sleep Consolidation Engine that transforms raw experience into structured knowledge through compression, abstraction, reinforcement, and pruning. It gives them a Forgetting Engine that implements the Ebbinghaus forgetting curve to keep the system clean and fast. It gives them a Learning Store that accumulates wisdom from experience. And it gives them a Unified Retrieval Layer that makes all of this accessible through a single, fast, intelligent interface.

This is not a patch on top of a stateless LLM. This is a complete cognitive memory architecture that transforms a stateless text processor into a persistent, learning, remembering agent. The agent that uses UCMA does not just answer questions -- it grows. It learns from its mistakes. It remembers its users. It builds up procedural expertise. It becomes, over time, genuinely better at its job.

Karpathy's vision of the LLM OS is the right frame. We have now designed the memory management subsystem of that operating system. The CPU (the LLM) is already extraordinarily powerful. The RAM (the context window) is already well-understood. What was missing was the full memory hierarchy -- the L1 cache, the L2 cache, the virtual memory system, the disk, and the intelligent OS kernel that manages them all. UCMA is that kernel.

The amnesia problem is solvable.