CHAPTER ONE: THE AMNESIA PROBLEM
Every conversation with a large language model begins the same way: with total amnesia. The model does not remember you. It does not remember what you discussed yesterday, last week, or five minutes ago in a different session. It does not remember that you prefer concise answers, that you are building a healthcare application, that you spent three hours debugging a particular piece of code together, or that you once told it your name. Each new context window is a blank slate, a mind wiped clean, a patient waking up from surgery with no recollection of the life lived before.
This is not a small inconvenience. It is a fundamental architectural liability that prevents LLMs from becoming truly intelligent, persistent agents. Humans are defined by their memories. Our identity, our skills, our relationships, our accumulated wisdom -- all of it is memory. An agent without memory is not an agent at all. It is a very sophisticated lookup table.
The irony is profound: LLMs are trained on essentially all of human knowledge, yet they cannot remember what happened in the last conversation. They know the entire history of Rome but cannot recall that you asked them about Rome yesterday. They understand the theory of episodic memory in neuroscience but have no episodic memory of their own.
This article is about fixing that. Not with a patch, not with a hack, but with a complete, principled, biologically inspired memory system, engineered for speed, that gives LLM applications and agentic systems a genuine cognitive memory architecture. We will call this system the Unified Cognitive Memory Architecture, or UCMA.
We will build it from the ground up, drawing on neuroscience, computer science, information retrieval theory, and the brilliant conceptual framework that Andrej Karpathy has articulated in his vision of the LLM Operating System. We will look at what exists today, dissect its failures, and then construct something far more powerful.
CHAPTER TWO: KARPATHY'S LLM OS -- THE CONCEPTUAL FOUNDATION
Before we build anything, we need the right mental model. Andrej Karpathy, a founding member of OpenAI and former director of AI at Tesla, has articulated a vision that reframes how we should think about LLMs entirely. Rather than seeing a language model as a text completion engine, Karpathy proposes seeing it as a CPU -- the central processing unit of a new kind of operating system.
In this LLM OS framework, the context window of the model is analogous to RAM -- fast, limited, immediately accessible working memory. The model weights themselves are analogous to a kind of slow, baked-in long-term memory -- the accumulated knowledge from training, frozen in place. External storage systems -- databases, vector stores, file systems -- are analogous to disk storage: large, persistent, but requiring explicit read and write operations to access.
This analogy is extraordinarily useful because it immediately reveals the architectural gaps. A real operating system has sophisticated memory management: virtual memory, paging, caching hierarchies (L1, L2, L3 cache), memory-mapped files, garbage collection, and swap space. The naive LLM application has none of this. It has RAM (the context window) and maybe a hard disk (a vector database), with nothing in between and no intelligent management layer connecting them.
Karpathy's insight points us toward what we need to build: a full memory hierarchy for LLM systems, with the same sophistication that operating system designers have brought to hardware memory management over the past sixty years. The difference is that our memory system must deal not with bytes and cache lines but with semantic meaning, temporal context, emotional salience, causal relationships, and procedural knowledge.
The LLM OS framing also clarifies the role of the agent. An agent in this system is like a process running on the operating system. It has access to the CPU (the LLM), it can read and write to memory (our UCMA), it can call system functions (tools), and it can spawn child processes (sub-agents). The memory system is the kernel-level infrastructure that makes all of this coherent and persistent.
CHAPTER THREE: THE NEUROSCIENCE OF MEMORY -- WHAT WE ARE ENGINEERING
Human memory is not a single system. Neuroscience has identified at least five distinct memory systems, each with different storage mechanisms, time scales, and retrieval characteristics. Understanding these systems is not an academic exercise -- it is the blueprint for our architecture.
Sensory memory is the briefest form: the persistence of sensory impressions for a fraction of a second after the stimulus has ended. It is the reason you can follow a moving object even when it briefly disappears. In our system, the Sensory Buffer plays this role: a high-throughput, low-latency holding area for raw input.
Working memory (short-term memory) is the conscious workspace: the roughly seven items that you can hold in mind simultaneously. It is fast, limited, and volatile. In our system, the Working Memory Manager corresponds to this: the managed context window that the LLM actively reasons over.
Episodic memory stores specific events and experiences, tagged with temporal and contextual metadata. "I remember the conversation I had with Michael on Tuesday about JWT authentication." Episodic memories are rich, contextual, and time-stamped. In our system, the Episodic Memory Store handles this.
Semantic memory stores facts, knowledge, and general truths about the world. "JWT tokens expire. FastAPI uses dependency injection." Semantic memories have no "when" -- they are timeless propositions. In our system, the Semantic Memory Store handles this, augmented with a knowledge graph.
Procedural memory stores knowledge of how to do things: skills, habits, workflows. "To implement JWT authentication in FastAPI, follow these steps..." Procedural memories are retrieved not by their content but by the situation that calls for them. In our system, the Procedural Memory Store handles this.
Hermann Ebbinghaus, the nineteenth-century psychologist who first studied memory scientifically, discovered the forgetting curve: memories decay exponentially over time unless they are reinforced. But reinforcement resets the decay clock and makes the memory more resistant to future forgetting. This is the basis of spaced repetition learning. Forgetting is not a failure of the memory system -- it is a feature. Forgetting unimportant information is what allows the system to remain fast and focused.
Our UCMA will implement all of these systems. Every one of them has a direct engineering analog.
CHAPTER FOUR: THE GRAVEYARD OF CURRENT APPROACHES
Before we build the cathedral, let us walk through the ruins of what has been tried before. Understanding why existing approaches fail is essential to understanding why UCMA is designed the way it is.
The simplest approach is conversation buffer memory. You simply concatenate all previous messages into the context window. This is what most chatbot applications do. It works fine for short conversations but fails catastrophically as conversations grow. The context window fills up, older messages are truncated, and the system loses access to early context. There is no prioritization, no summarization, no persistence across sessions. This is not a memory system -- it is a scrolling text buffer.
The next step up is summary memory. When the conversation buffer gets too long, a secondary LLM call summarizes the older portion and replaces it with a compressed version. This is better -- it extends the effective memory horizon -- but it introduces serious problems. Summarization is lossy. Details that seemed unimportant at summarization time may turn out to be critical later. The summary is a flat blob of text with no structure, no retrievability by topic, and no temporal indexing. You cannot ask "what did we discuss about authentication?" and get a precise answer -- you get whatever the summary happened to preserve. Furthermore, summary memory is still session-scoped. When the session ends, the summary is typically discarded.
Vector database RAG (Retrieval Augmented Generation) is the approach that took the industry by storm around 2022 and 2023. The idea is elegant: convert documents or memories into dense vector embeddings using an embedding model, store them in a vector database like Pinecone, Weaviate, Chroma, or Qdrant, and at query time, embed the current query and retrieve the most semantically similar stored vectors. This gives you a form of long-term semantic memory that persists across sessions and scales to millions of documents.
RAG is genuinely powerful, but it has serious architectural limitations that prevent it from serving as a complete memory system. It only supports one type of retrieval -- semantic similarity -- and gives you no good mechanism for time-range queries, entity-specific lookups, or procedural knowledge retrieval. RAG treats all memories as equally important, so a throwaway comment and a critical insight are stored and retrieved with equal weight. RAG has no concept of time -- it does not know that recent memories are generally more relevant than old ones, or that some memories should decay while others should be reinforced. RAG has no consolidation mechanism -- it just accumulates chunks indefinitely, growing slower and noisier over time. And perhaps most importantly, RAG retrieves isolated chunks rather than connected knowledge. It cannot follow causal chains, temporal sequences, or conceptual hierarchies.
MemGPT, developed by researchers at UC Berkeley and later commercialized as Letta, takes a different and more sophisticated approach. It treats the LLM context window as a form of RAM and implements explicit paging: a main context (always in the window), a recall storage (searchable conversation history), and archival storage (long-term vector storage). The LLM itself decides what to page in and out, using special function calls. This is clever and much closer to the right architecture, but it has its own limitations. The LLM must spend tokens deciding what to remember and what to retrieve, which is expensive and slow. The system still lacks true multi-tier memory with different time scales, lacks sleep consolidation, lacks a forgetting curve, and lacks procedural memory.
Commercial systems like Zep and Mem0 add user-level memory persistence, entity extraction, and some degree of memory management. They are useful products but are fundamentally RAG systems with additional preprocessing layers. They do not solve the fundamental architectural problems.
What is universally missing across all current approaches is a unified, hierarchical, graph-structured memory system with multiple specialized stores, intelligent routing, temporal decay with importance weighting, sleep-phase consolidation, procedural memory, and a retrieval layer that can answer not just "what is semantically similar to this query" but also "what happened recently," "what do I know how to do," "what have I learned from past mistakes," and "what is the causal chain leading to this outcome."
That is what we are going to build.
CHAPTER FIVE: THE UNIFIED COGNITIVE MEMORY ARCHITECTURE -- OVERVIEW
UCMA is organized as a set of specialized memory stores connected by a Memory Graph, managed by a Memory Controller, and served through a Unified Retrieval Layer. The following diagram shows the complete architecture:
+========================================================================+
|                        LLM AGENT / APPLICATION                         |
+========================================================================+
                                         |
                           +-------------+-------------+
                           |     MEMORY CONTROLLER     |
                           | (orchestrates all memory  |
                           |        operations)        |
                           +-------------+-------------+
                                         |
      +-------------+-------------+------+------+-------------+-------------+
      |             |             |             |             |             |
+-----+-----+ +-----+-----+ +-----+-----+ +-----+-----+ +-----+-----+ +-----+-----+
|  SENSORY  | |  WORKING  | | EPISODIC  | | SEMANTIC  | |PROCEDURAL | | LEARNING  |
|  BUFFER   | |  MEMORY   | |   STORE   | |   STORE   | |   STORE   | |   STORE   |
|           | |           | |           | |           | |           | |           |
|Ring buf   | |3-zone     | |Short-term | |Facts +    | |Workflow   | |Insights   |
|Fast       | |context    | |+ Medium-  | |KG +       | |templates  | |Lessons    |
|ingest     | |window     | |term tiers | |Entity     | |Success    | |Patterns   |
|Auto-evict | |mgmt       | |Temporal   | |index      | |rates      | |Validated  |
|           | |           | |index      | |           | |           | |           |
+-----+-----+ +-----+-----+ +-----+-----+ +-----+-----+ +-----+-----+ +-----+-----+
      |             |             |             |             |             |
      +-------------+-------------+------+------+-------------+-------------+
                                         |
                           +-------------+-------------+
                           |       MEMORY GRAPH        |
                           |    Nodes: all memories    |
                           |  Edges: typed relations   |
                           |    PageRank importance    |
                           |      Graph traversal      |
                           +-------------+-------------+
                                         |
                 +-----------------------+-----------------------+
                 |                                               |
       +---------+---------+                        +------------+------------+
       | UNIFIED RETRIEVAL |                        |   SLEEP CONSOLIDATION   |
       |       LAYER       |                        |   ENGINE (background)   |
       |                   |                        |                         |
       |Semantic (ANN/HNSW)|                        |Compress episodes        |
       |Structured indexes |                        |Abstract to semantic     |
       |Temporal buckets   |                        |Reinforce via Hebb       |
       |Graph traversal    |                        |Prune via Ebbinghaus     |
       |Hybrid fusion      |                        +------------+------------+
       |Multi-factor rank  |                                     |
       +-------------------+                        +------------+------------+
                                                    |    FORGETTING ENGINE    |
                                                    |                         |
                                                    |Ebbinghaus curve         |
                                                    |Stability tracking       |
                                                    |Importance protection    |
                                                    |LRU + PageRank guard     |
                                                    +-------------------------+
DATA FLOW -- WRITE PATH:
Raw Input --> Sensory Buffer --> Memory Controller
--> Importance Classifier
--> [if important] Episodic Store (short-term)
--> Embedding Model --> vector stored with entry
--> Memory Graph (new node registered)
--> [async] Sleep Engine --> compress, abstract, reinforce, prune
DATA FLOW -- READ PATH:
Agent Query --> Memory Controller --> Query Router
--> [semantic] Embedding Model --> ANN search --> Episodic + Semantic
--> [structured] Entity/Tag index --> Semantic + Procedural
--> [temporal] Bucket index --> Episodic
--> [graph] Graph traversal --> any type
--> Result Fusion --> Deduplication --> Multi-factor Ranking
--> Top-K results --> Working Memory (staging zone) --> LLM Context
SLEEP CYCLE (background, periodic):
Timer fires --> SleepConsolidationEngine
--> expire_short_term() --> compress high-importance entries
--> abstract_to_semantic() --> extract facts via LLM
--> reinforce_connections() --> update PageRank, strengthen edges
--> prune_forgotten() --> ForgettingEngine.run_forgetting_pass()
--> cleanup orphaned graph nodes + temporal index entries
CHAPTER SIX: MODULE STRUCTURE AND INSTALLATION
Before diving into the code, let us establish the module structure and installation requirements. UCMA is organized as a Python package with one module per major component. The structure is as follows:
ucma/
|-- __init__.py
|-- config.py # Central configuration
|-- models.py # All dataclasses and enums
|-- sensory_buffer.py # Sensory Buffer
|-- working_memory.py # Working Memory Manager
|-- episodic_store.py # Episodic Memory Store
|-- semantic_store.py # Semantic Memory Store
|-- procedural_store.py # Procedural Memory Store
|-- learning_store.py # Learning Store
|-- memory_graph.py # Memory Graph
|-- retrieval.py # Unified Retrieval Layer
|-- query_router.py # Query Router (heuristic classifier)
|-- retention.py # Ebbinghaus Retention State
|-- forgetting.py # Forgetting Engine
|-- consolidation.py # Sleep Consolidation Engine
|-- controller.py # Memory Controller (top-level orchestrator)
|-- embedding_cache.py # LRU embedding cache
|-- providers/
| |-- __init__.py
| |-- base.py # Protocol definitions for LLM providers
| |-- openai_provider.py # OpenAI concrete implementation
|-- main.py # Wiring and example usage
requirements.txt
pyproject.toml
CHAPTER SEVEN: CONFIGURATION
All tunable parameters live in a single configuration dataclass. This eliminates magic numbers scattered throughout the codebase and makes the system easy to tune for different deployment scenarios. A customer service agent might want aggressive forgetting and a small context window; a long-running research agent might want conservative forgetting and a large context window.
# ucma/config.py
"""
Central configuration for the Unified Cognitive Memory Architecture.
All tunable parameters are collected here. Pass a UCMAConfig instance
to every component that needs configuration. This eliminates magic
numbers scattered throughout the codebase and makes the system easy
to tune for different deployment scenarios without touching any
component code.
"""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class UCMAConfig:
"""
Unified configuration for all UCMA components.
All numeric thresholds, budgets, and tuning knobs live here.
Defaults are calibrated for a general-purpose coding assistant
agent with moderate memory requirements.
"""
# ------------------------------------------------------------------ #
# Model identity #
# ------------------------------------------------------------------ #
model_name: str = "gpt-4o-mini"
embedding_model: str = "text-embedding-3-small"
embedding_dim: int = 1536
# ------------------------------------------------------------------ #
# Sensory Buffer #
# ------------------------------------------------------------------ #
sensory_buffer_capacity: int = 512
# ------------------------------------------------------------------ #
# Working Memory #
# ------------------------------------------------------------------ #
working_memory_system_tokens: int = 2048
working_memory_active_tokens: int = 6144
working_memory_staging_tokens: int = 512
# ------------------------------------------------------------------ #
# Episodic Store #
# ------------------------------------------------------------------ #
# How long entries stay in the short-term tier before expiry check
short_term_ttl_hours: float = 48.0
# Minimum importance to survive compression into medium-term
compression_importance_base: float = 0.30
# Maximum additional threshold penalty when many entries expire at once
compression_volume_penalty_max: float = 0.40
# ------------------------------------------------------------------ #
# Sleep Consolidation Engine #
# ------------------------------------------------------------------ #
consolidation_interval_seconds: float = 3600.0 # Every hour
consolidation_summary_max_tokens: int = 150
consolidation_fact_extraction_batch: int = 10
# ------------------------------------------------------------------ #
# Memory Graph #
# ------------------------------------------------------------------ #
pagerank_damping: float = 0.85
pagerank_iterations: int = 20
# ------------------------------------------------------------------ #
# Embedding Cache #
# ------------------------------------------------------------------ #
embedding_cache_max_size: int = 256
# ------------------------------------------------------------------ #
# Retrieval #
# ------------------------------------------------------------------ #
retrieval_semantic_min_similarity: float = 0.50
retrieval_max_results: int = 10
retrieval_graph_expansion_hops: int = 1
retrieval_graph_expansion_min_weight: float = 0.50
retrieval_graph_expansion_top_k: int = 3
# Multi-factor ranking weights (should sum to 1.0)
rank_weight_relevance: float = 0.40
rank_weight_importance: float = 0.25
rank_weight_recency: float = 0.20
rank_weight_retention: float = 0.15
rank_recency_halflife_days: float = 7.0
# ------------------------------------------------------------------ #
# Forgetting Engine #
# ------------------------------------------------------------------ #
forgetting_retention_threshold: float = 0.10
forgetting_importance_protection: float = 0.01
# Stability boost per reinforcement (base, before spacing multiplier)
retention_stability_boost_base: float = 0.50
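Before moving on, a short usage sketch shows how these knobs map onto the deployment scenarios described above. The snippet is illustrative rather than part of the package: it touches only fields defined in UCMAConfig, and the two profile names are hypothetical.
# example: deployment-specific configuration profiles (illustrative)
from ucma.config import UCMAConfig

# A customer service agent: aggressive forgetting, small context budgets.
support_config = UCMAConfig(
    short_term_ttl_hours=12.0,
    forgetting_retention_threshold=0.25,
    working_memory_active_tokens=3072,
)

# A long-running research agent: conservative forgetting, large context.
research_config = UCMAConfig(
    short_term_ttl_hours=168.0,           # A full week of detailed episodes
    forgetting_retention_threshold=0.05,  # Forget only near-zero retention
    working_memory_active_tokens=16384,
)

# The multi-factor ranking weights should sum to 1.0.
weights = (
    support_config.rank_weight_relevance
    + support_config.rank_weight_importance
    + support_config.rank_weight_recency
    + support_config.rank_weight_retention
)
assert abs(weights - 1.0) < 1e-9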
CHAPTER EIGHT: THE DATA MODELS
All shared data structures live in a single models.py module. Using a single models file prevents circular imports and gives every component a common vocabulary. The dataclasses are intentionally lightweight -- they carry metadata for routing and ranking, while the full content lives in the appropriate typed store.
# ucma/models.py
"""
Shared data models for the Unified Cognitive Memory Architecture.
All dataclasses and enumerations used across UCMA components are defined
here. Keeping all models in one module prevents circular imports and
gives every component a common vocabulary.
"""
from __future__ import annotations
import hashlib
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
# ---------------------------------------------------------------------------
# Enumerations
# ---------------------------------------------------------------------------
class MemoryType(str, Enum):
"""The type of a memory node in the system."""
EPISODIC = "episodic"
SEMANTIC = "semantic"
PROCEDURAL = "procedural"
LEARNING = "learning"
class QueryMode(str, Enum):
"""The retrieval strategy to use for a memory query."""
SEMANTIC = "semantic" # Dense vector similarity search
STRUCTURED = "structured" # Metadata / index-based filtering
HYBRID = "hybrid" # Semantic + structured combined
GRAPH = "graph" # Graph traversal from a seed node
TEMPORAL = "temporal" # Time-range retrieval via bucket index
class ProcedureStatus(str, Enum):
"""Lifecycle state of a procedural memory."""
DRAFT = "draft" # Newly created, not yet validated
ACTIVE = "active" # Validated and in production use
DEPRECATED = "deprecated" # Superseded by a better procedure
class LearningType(str, Enum):
"""The category of a learning record."""
FAILURE_ANALYSIS = "failure_analysis" # What went wrong and why
SUCCESS_PATTERN = "success_pattern" # What worked and why
STRATEGY_UPDATE = "strategy_update" # How to approach a problem class
SELF_CORRECTION = "self_correction" # Agent correcting its own behavior
USER_PREFERENCE = "user_preference" # Learned preferences of the user
DOMAIN_INSIGHT = "domain_insight" # New understanding of the domain
# ---------------------------------------------------------------------------
# Sensory Layer
# ---------------------------------------------------------------------------
@dataclass
class SensoryEntry:
"""
A single raw input event entering the memory system.
This is the most primitive unit of information in UCMA. The entry
is intentionally lightweight -- it carries raw content and minimal
metadata -- because most entries will be discarded before expensive
processing is applied.
"""
content: str
source: str # "user", "tool", "environment", "system"
modality: str = "text" # "text", "image", "audio", "structured"
entry_id: str = field(
default_factory=lambda: str(uuid.uuid4())
)
timestamp: float = field(default_factory=time.time)
raw_token_estimate: Optional[int] = None
# ---------------------------------------------------------------------------
# Episodic Memory
# ---------------------------------------------------------------------------
@dataclass
class EpisodicEntry:
"""
A single episodic memory: a record of something that happened.
Episodic entries are the richest and most contextually detailed
memories in the system. They answer the questions: what happened,
when did it happen, who was involved, and why did it matter?
"""
entry_id: str
session_id: str
agent_id: str
timestamp: float
content: str
importance: float
participants: list[str] = field(default_factory=list)
tags: list[str] = field(default_factory=list)
summary: Optional[str] = None
embedding: Optional[list[float]] = None
tier: str = "short_term" # "short_term" or "medium_term"
access_count: int = 0
last_accessed: float = field(default_factory=time.time)
def touch(self) -> None:
"""Record an access event on this entry."""
self.access_count += 1
self.last_accessed = time.time()
# ---------------------------------------------------------------------------
# Semantic Memory
# ---------------------------------------------------------------------------
@dataclass
class SemanticFact:
"""
A single semantic memory: a timeless fact or piece of knowledge.
Semantic facts are extracted from episodes during consolidation or
can be directly asserted by the agent or user. They are stored as
subject-predicate-object triples to enable structured querying and
knowledge graph construction.
"""
subject: str
predicate: str
object_value: str
content: str # Human-readable form of the triple
confidence: float
source_episode_ids: list[str] = field(default_factory=list)
embedding: Optional[list[float]] = None
fact_id: str = field(default_factory=lambda: str(uuid.uuid4()))
created_at: float = field(default_factory=time.time)
last_verified: float = field(default_factory=time.time)
access_count: int = 0
importance: float = 0.5
@staticmethod
def make_deterministic_id(
subject: str, predicate: str, object_value: str
) -> str:
"""
Generate a deterministic ID from the fact's SPO triple.
When the same fact is derived from multiple episodes, it always
maps to the same ID. Repeated assertions increase confidence
rather than creating duplicate entries, keeping the store clean.
"""
key = f"{subject.lower()}|{predicate.lower()}|{object_value.lower()}"
return hashlib.sha256(key.encode()).hexdigest()[:32]
def reinforce(self, source_episode_id: str) -> None:
"""
Increase confidence and add a new source episode.
Called when the same fact is derived from a new episode.
"""
if source_episode_id not in self.source_episode_ids:
self.source_episode_ids.append(source_episode_id)
self.confidence = min(1.0, self.confidence + 0.05)
self.last_verified = time.time()
def touch(self) -> None:
"""Record an access event."""
self.access_count += 1
@dataclass
class KnowledgeGraphEdge:
"""
A directed edge in the knowledge graph connecting two entities.
Edges represent relationships between concepts, people, systems,
and any other entities that appear in the agent's experience.
"""
source_entity: str
relationship: str
target_entity: str
weight: float = 1.0
created_at: float = field(default_factory=time.time)
# ---------------------------------------------------------------------------
# Procedural Memory
# ---------------------------------------------------------------------------
@dataclass
class ProcedureStep:
"""
A single step within a procedural memory.
Steps can be tool calls, LLM reasoning steps, or conditional
branches. Each step carries its own success criteria so the agent
can verify completion before proceeding to the next step.
"""
step_id: str
description: str
action_type: str # "tool_call", "llm_reasoning", "condition"
action_spec: dict
success_criteria: str # How to verify this step succeeded
on_failure: str = "retry" # "retry", "skip", "abort", "escalate"
max_retries: int = 3
@dataclass
class ProceduralMemory:
"""
A stored procedure: a reusable, parameterized workflow that the
agent has learned to execute for a class of situations.
"""
procedure_id: str
name: str
description: str
trigger_tags: list[str] # Situation tags that activate this procedure
steps: list[ProcedureStep]
status: ProcedureStatus = ProcedureStatus.DRAFT
success_rate: float = 0.0
execution_count: int = 0
created_at: float = field(default_factory=time.time)
last_used: Optional[float] = None
def update_statistics(self, success: bool) -> None:
"""
Update success rate using an incremental running average.
Uses floating-point arithmetic throughout to avoid integer
division truncation. The formula is the standard online
mean update: new_mean = old_mean + (new_value - old_mean) / n
"""
self.execution_count += 1
n = float(self.execution_count)
new_value = 1.0 if success else 0.0
self.success_rate = self.success_rate + (new_value - self.success_rate) / n
self.last_used = time.time()
# ---------------------------------------------------------------------------
# Learning Records
# ---------------------------------------------------------------------------
@dataclass
class LearningRecord:
"""
A single learning: a distilled insight derived from experience.
Learnings are the highest-level, most abstract form of memory in
UCMA. They represent the agent's growing wisdom about how to operate
effectively in its environment.
"""
learning_id: str
learning_type: LearningType
title: str
insight: str
evidence: list[str] # IDs of supporting episodes/facts
applicability: list[str] # Tags describing when this applies
confidence: float
impact_score: float
created_at: float = field(default_factory=time.time)
last_applied: Optional[float] = None
application_count: int = 0
validated: bool = False
def apply(self) -> None:
"""Record that this learning was applied in a decision."""
self.last_applied = time.time()
self.application_count += 1
def validate(self, success: bool) -> None:
"""
Update confidence based on whether applying this learning
led to a successful outcome.
The asymmetric update (larger penalty for failure than reward
for success) makes the system conservative: it requires multiple
successes to build confidence but only a few failures to erode
it. This prevents overconfident bad habits from forming.
"""
if success:
self.confidence = min(1.0, self.confidence + 0.10)
self.validated = True
else:
self.confidence = max(0.0, self.confidence - 0.15)
# ---------------------------------------------------------------------------
# Memory Graph Structures
# ---------------------------------------------------------------------------
@dataclass
class MemoryNode:
"""
A node in the Memory Graph.
Each node represents a memory of any type, identified by its ID
and type tag. The node carries lightweight metadata for graph
operations; the full memory content lives in the appropriate
typed store.
"""
node_id: str
memory_type: MemoryType
importance: float
created_at: float
last_accessed: float
access_count: int = 0
@dataclass
class MemoryEdge:
"""
A directed, weighted, typed edge between two memory nodes.
The edge type captures the semantic relationship between memories,
enabling structured traversal and causal reasoning.
"""
source_id: str
target_id: str
edge_type: str # "caused_by", "followed_by", "supports", "derived_from", etc.
weight: float = 1.0
created_at: float = field(default_factory=time.time)
# ---------------------------------------------------------------------------
# Retrieval Structures
# ---------------------------------------------------------------------------
@dataclass
class MemoryQuery:
"""
A unified query object that can express any type of memory retrieval
request. The Unified Retrieval Layer interprets this object and
routes it to the appropriate stores and indexes.
"""
query_text: str
mode: QueryMode = QueryMode.HYBRID
max_results: int = 10
memory_types: Optional[list[MemoryType]] = None
time_range: Optional[tuple[float, float]] = None
entity_filter: Optional[str] = None
tag_filter: Optional[list[str]] = None
seed_node_id: Optional[str] = None
min_importance: float = 0.0
include_graph_neighbors: bool = True
@dataclass
class RetrievalResult:
"""
A single result from a memory retrieval operation.
Results are typed and ranked, carrying enough metadata for the
agent to understand the provenance and reliability of each
retrieved memory.
"""
memory_id: str
memory_type: MemoryType
content: str
relevance_score: float
importance: float
source_store: str
timestamp: Optional[float] = None
graph_distance: int = 0 # Hops from query seed (0 = direct match)
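Before wiring these models into stores, a small standalone sketch (assuming only the definitions above) demonstrates two behaviors the later chapters rely on: deterministic fact IDs deduplicating repeated assertions, and the online success-rate update on procedures. The IDs and values are invented for illustration.
# example: exercising the shared data models (illustrative)
from ucma.models import ProceduralMemory, SemanticFact

fact = SemanticFact(
    subject="JWT tokens",
    predicate="have property",
    object_value="expiry",
    content="JWT tokens expire.",
    confidence=0.6,
    source_episode_ids=["ep-1"],
)

# The deterministic ID is case-insensitive, so re-derived facts collide.
assert SemanticFact.make_deterministic_id(
    "JWT Tokens", "HAVE PROPERTY", "Expiry"
) == SemanticFact.make_deterministic_id(
    fact.subject, fact.predicate, fact.object_value
)

fact.reinforce("ep-2")  # New evidence: confidence rises, source recorded
assert abs(fact.confidence - 0.65) < 1e-9
assert "ep-2" in fact.source_episode_ids

proc = ProceduralMemory(
    procedure_id="p-1",
    name="jwt-setup",
    description="Set up JWT authentication",
    trigger_tags=["auth"],
    steps=[],
)
for success in (True, True, False):
    proc.update_statistics(success)
assert abs(proc.success_rate - 2.0 / 3.0) < 1e-9  # Online mean of 1, 1, 0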
CHAPTER NINE: THE SENSORY BUFFER
The Sensory Buffer is the entry point for all new information. It is a high-throughput, low-latency, short-lived buffer that holds raw, unprocessed input while the Memory Controller performs initial classification and routing. Most entries in the Sensory Buffer never make it to longer-term storage -- they are transient observations that are processed and discarded. Only information that the Memory Controller flags as worth retaining moves forward in the pipeline.
The buffer is implemented as a thread-safe ring buffer using Python's deque with a fixed maxlen. When the buffer is full, the oldest entry is automatically evicted to make room for the newest one. This gives us O(1) insertion and O(1) eviction with no manual management required.
# ucma/sensory_buffer.py
"""
Sensory Buffer: the high-throughput entry point for all incoming information.
The Sensory Buffer is intentionally simple. Its job is to receive, not to
think. Intelligence begins in the Memory Controller, which drains the buffer
and decides what to do with each entry.
"""
from __future__ import annotations
import asyncio
import logging
from collections import deque
from ucma.models import SensoryEntry
logger = logging.getLogger(__name__)
class SensoryBuffer:
"""
A fixed-capacity, thread-safe ring buffer for incoming sensory information.
Operates in O(1) for both insertion and eviction. The asyncio.Lock
ensures correctness when multiple coroutines ingest data concurrently,
which is common in multi-tool agent loops where tool results and user
messages can arrive simultaneously.
"""
def __init__(self, capacity: int = 512) -> None:
# deque with maxlen gives automatic ring-buffer eviction:
# when full, appending a new item silently drops the oldest.
self._buffer: deque[SensoryEntry] = deque(maxlen=capacity)
self._capacity = capacity
self._lock = asyncio.Lock()
self._total_ingested: int = 0 # Monotonic counter for observability
async def ingest(
self,
content: str,
source: str,
modality: str = "text",
) -> SensoryEntry:
"""
Accept a new piece of raw input and place it in the buffer.
Returns the created SensoryEntry so the caller can track its
entry_id if needed for later correlation. This method is async
to allow it to be awaited in coroutine-based agent loops without
blocking the event loop.
"""
entry = SensoryEntry(content=content, source=source, modality=modality)
async with self._lock:
self._buffer.append(entry)
self._total_ingested += 1
logger.debug(
"Sensory ingest: source=%s modality=%s id=%s",
source,
modality,
entry.entry_id,
)
return entry
async def drain(self) -> list[SensoryEntry]:
"""
Atomically remove and return all current entries for processing.
After draining, the buffer is empty and ready for new input.
The atomic swap (copy then clear) under the lock ensures that
no entries are lost if ingest() is called concurrently.
"""
async with self._lock:
entries = list(self._buffer)
self._buffer.clear()
return entries
@property
def size(self) -> int:
"""Current number of entries in the buffer (approximate, no lock)."""
return len(self._buffer)
@property
def total_ingested(self) -> int:
"""Total entries ever ingested (monotonic, for metrics)."""
return self._total_ingested
CHAPTER TEN: WORKING MEMORY
Working Memory in UCMA corresponds directly to the LLM's context window, but it is managed rather than naive. The context window is treated as a precious, limited resource and managed with the same care that an operating system manages RAM. The Working Memory Manager divides the context into three zones, enforces token budgets for each zone, and makes principled eviction decisions when space is needed.
The eviction policy combines importance and recency into a single score. An important memory that was accessed recently gets the highest score and is the last to be evicted. An unimportant memory that has not been accessed in a long time gets the lowest score and is the first to go. This is a principled, tunable policy that can be adjusted by changing the relative weights.
# ucma/working_memory.py
"""
Working Memory Manager: treats the LLM context window as a managed resource.
The context window is divided into three zones with independent token budgets.
The system zone holds core instructions and is never evicted. The active zone
holds dynamic working content and is evicted using importance-weighted LRU.
The staging zone holds recently retrieved memories awaiting promotion.
"""
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass, field
import tiktoken
from ucma.config import UCMAConfig
logger = logging.getLogger(__name__)
@dataclass
class WorkingMemoryItem:
"""
A single item occupying space in the context window.
Items have a zone assignment, a token cost, and an importance score
that determines their eviction priority.
"""
item_id: str
content: str
zone: str # "system", "active", or "staging"
token_count: int
importance: float # 0.0 to 1.0
inserted_at: float = field(default_factory=time.time)
last_accessed: float = field(default_factory=time.time)
def touch(self) -> None:
"""Record an access, resetting the recency clock."""
self.last_accessed = time.time()
class WorkingMemoryManager:
"""
Manages the LLM context window as a structured, budgeted resource.
Enforces per-zone token budgets and makes principled eviction decisions
when space is needed. The system zone is always protected; only the
active and staging zones participate in eviction.
"""
def __init__(self, config: UCMAConfig) -> None:
self._zone_budgets: dict[str, int] = {
"system": config.working_memory_system_tokens,
"active": config.working_memory_active_tokens,
"staging": config.working_memory_staging_tokens,
}
# Load the tokenizer, falling back to cl100k_base for unknown models.
# tiktoken raises KeyError for models it does not recognize.
try:
self._encoder = tiktoken.encoding_for_model(config.model_name)
except KeyError:
logger.warning(
"Unknown model '%s' for tiktoken; falling back to cl100k_base.",
config.model_name,
)
self._encoder = tiktoken.get_encoding("cl100k_base")
self._items: dict[str, WorkingMemoryItem] = {}
self._zone_usage: dict[str, int] = {z: 0 for z in self._zone_budgets}
self._lock = asyncio.Lock()
def _count_tokens(self, text: str) -> int:
"""Count tokens in a string using the loaded tiktoken encoder."""
return len(self._encoder.encode(text))
async def add(
self,
item_id: str,
content: str,
zone: str,
importance: float,
) -> bool:
"""
Add an item to the specified zone.
Returns True if the item was added, False if it could not fit
even after eviction. The system zone never evicts; if a system
item does not fit, it is rejected immediately.
"""
token_count = self._count_tokens(content)
async with self._lock:
# If item already exists, update it in place
if item_id in self._items:
old = self._items[item_id]
self._zone_usage[old.zone] -= old.token_count
old.content = content
old.token_count = token_count
old.importance = importance
old.touch()
self._zone_usage[old.zone] += token_count
return True
# Evict if needed (system zone is never evicted)
if zone != "system":
await self._evict_if_needed(zone, token_count)
budget = self._zone_budgets[zone]
if self._zone_usage[zone] + token_count > budget:
logger.warning(
"Cannot fit item %s (%d tokens) in zone '%s' "
"(used=%d, budget=%d).",
item_id, token_count, zone,
self._zone_usage[zone], budget,
)
return False
item = WorkingMemoryItem(
item_id=item_id,
content=content,
zone=zone,
token_count=token_count,
importance=importance,
)
self._items[item_id] = item
self._zone_usage[zone] += token_count
return True
async def _evict_if_needed(self, zone: str, needed_tokens: int) -> None:
"""
Evict the lowest-priority items from a zone until there is room.
Priority score = importance * 0.7 + recency_score * 0.3
where recency_score decays with time since last access.
Items are evicted in ascending priority order (lowest first).
Note: called while self._lock is already held.
"""
budget = self._zone_budgets[zone]
now = time.time()
while self._zone_usage[zone] + needed_tokens > budget:
# Snapshot to avoid modifying dict during iteration
candidates = [
item for item in list(self._items.values())
if item.zone == zone
]
if not candidates:
break
def priority(item: WorkingMemoryItem) -> float:
age_hours = (now - item.last_accessed) / 3600.0
recency = 1.0 / (1.0 + age_hours)
return item.importance * 0.7 + recency * 0.3
# Evict the item with the lowest priority
victim = min(candidates, key=priority)
self._zone_usage[zone] -= victim.token_count
del self._items[victim.item_id]
logger.debug(
"Evicted item %s from zone '%s' (importance=%.2f).",
victim.item_id, zone, victim.importance,
)
async def get(self, item_id: str) -> WorkingMemoryItem | None:
"""Retrieve an item by ID and update its access time."""
async with self._lock:
item = self._items.get(item_id)
if item:
item.touch()
return item
async def clear_zone(self, zone: str) -> int:
"""
Remove all items from a zone. Returns the number of items removed.
Used by new_session() to clear the active zone between sessions.
"""
async with self._lock:
to_remove = [
item_id
for item_id, item in self._items.items()
if item.zone == zone
]
for item_id in to_remove:
item = self._items.pop(item_id)
self._zone_usage[zone] -= item.token_count
return len(to_remove)
def render_context(self) -> str:
"""
Produce the full context string to be sent to the LLM.
Zones are rendered in order: system, active, staging. Empty
zones are skipped to avoid producing blank sections. Items
within each zone are sorted by importance (highest first).
"""
sections: list[str] = []
for zone in ("system", "active", "staging"):
zone_items = sorted(
[item for item in self._items.values() if item.zone == zone],
key=lambda i: i.importance,
reverse=True,
)
if zone_items:
sections.append(
"\n".join(item.content for item in zone_items)
)
return "\n\n".join(sections)
def usage_summary(self) -> dict[str, object]:
"""Return a summary of token usage per zone for observability."""
return {
zone: {
"used": self._zone_usage[zone],
"budget": self._zone_budgets[zone],
"pct": round(
100.0 * self._zone_usage[zone] / self._zone_budgets[zone], 1
),
}
for zone in self._zone_budgets
}
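A short, hypothetical driver script illustrates the zone budgets and the eviction policy end to end. The budgets below are deliberately tiny so that eviction is easy to trigger; because exact token counts depend on which tiktoken encoder loads, the script prints the outcome rather than asserting it.
# example: zone budgets and importance-weighted eviction (illustrative)
import asyncio

from ucma.config import UCMAConfig
from ucma.working_memory import WorkingMemoryManager


async def demo() -> None:
    # Tiny active-zone budget (roughly two short sentences).
    config = UCMAConfig(
        working_memory_system_tokens=64,
        working_memory_active_tokens=14,
        working_memory_staging_tokens=16,
    )
    wm = WorkingMemoryManager(config)
    await wm.add("sys-prompt", "You are a helpful agent.", "system", 1.0)
    await wm.add("note-low", "User asked about the weather.", "active", 0.1)
    await wm.add("note-high", "User is building a healthcare app.", "active", 0.9)
    # This add overflows the active zone; the lowest-priority item
    # (low importance, oldest access) is evicted to make room.
    await wm.add("note-new", "User prefers concise answers.", "active", 0.8)
    print("note-low survived:", await wm.get("note-low") is not None)
    print(wm.usage_summary())
    print(wm.render_context())


asyncio.run(demo())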
CHAPTER ELEVEN: THE RETENTION MODEL
The Ebbinghaus forgetting curve is the mathematical heart of UCMA's forgetting system. Before we build the Forgetting Engine, we need the retention model that it operates on. Every memory in the system has an associated MemoryRetentionState that tracks its stability and last review time. The stability increases each time the memory is accessed, making it more resistant to future decay. This is the mechanism behind spaced repetition learning systems.
The retention formula is: R = exp(-t / S) where R is retention (0 to 1), t is elapsed time in days since last review, and S is stability (a positive float that grows with each reinforcement). A stability of 1.0 means the memory decays to about 37% after one day. A stability of 7.0 means it decays to about 37% after one week.
# ucma/retention.py
"""
Ebbinghaus Retention Model for the Unified Cognitive Memory Architecture.
Every memory in UCMA has an associated MemoryRetentionState. The retention
model implements the Ebbinghaus forgetting curve with SuperMemo-inspired
stability updates. Stability grows with each review, and the growth is
proportional to the elapsed time since the last review (the spacing effect).
"""
from __future__ import annotations
import math
import time
from dataclasses import dataclass, field
@dataclass
class MemoryRetentionState:
"""
Tracks the Ebbinghaus retention state for a single memory.
The retention formula is: R = exp(-t / S)
where R is retention (0-1), t is elapsed time in days since last
review, and S is stability (a positive float that grows with each
reinforcement). Higher stability means slower decay.
"""
memory_id: str
stability: float = 1.0 # Days until retention drops to ~37%
last_reviewed: float = field(default_factory=time.time)
review_count: int = 0
def current_retention(self) -> float:
"""
Compute current memory retention using the Ebbinghaus formula.
Returns a value in [0.0, 1.0]. A value below the forgetting
threshold means the memory is a candidate for deletion.
Guards against stability=0 (division by zero) by clamping
stability to a minimum of 1e-6.
"""
elapsed_days = (time.time() - self.last_reviewed) / 86400.0
safe_stability = max(self.stability, 1e-6)
return math.exp(-elapsed_days / safe_stability)
def reinforce(self, boost_base: float = 0.5) -> None:
"""
Record a review event and increase stability.
The stability boost is proportional to the elapsed time since
the last review (the spacing effect): reviewing a memory after
a long gap increases stability more than reviewing it immediately.
This matches the empirical finding that spaced repetition is
more effective than massed practice.
"""
elapsed_days = (time.time() - self.last_reviewed) / 86400.0
# Spacing multiplier: at least 1.0, grows with elapsed time
spacing_multiplier = max(1.0, elapsed_days)
self.stability += boost_base * spacing_multiplier
self.last_reviewed = time.time()
self.review_count += 1
class RetentionRegistry:
"""
A registry mapping memory IDs to their retention states.
Thread-safety note: this registry is accessed from both the main
agent loop (via retrieval) and the background consolidation engine.
All mutations use a simple dict which is safe for asyncio (single-
threaded event loop) but would need a lock for true multi-threading.
"""
def __init__(self) -> None:
self._states: dict[str, MemoryRetentionState] = {}
def register(self, memory_id: str, initial_stability: float = 1.0) -> None:
"""Register a new memory with the given initial stability."""
if memory_id not in self._states:
state = MemoryRetentionState(
memory_id=memory_id,
stability=initial_stability,
)
self._states[memory_id] = state
def get(self, memory_id: str) -> MemoryRetentionState | None:
"""Retrieve the retention state for a memory, or None if not found."""
return self._states.get(memory_id)
def reinforce(self, memory_id: str, boost_base: float = 0.5) -> None:
"""
Reinforce a memory's retention state.
If the memory is not registered, this is a no-op.
"""
state = self._states.get(memory_id)
if state:
state.reinforce(boost_base)
def remove(self, memory_id: str) -> None:
"""Remove a memory's retention state (called when memory is deleted)."""
self._states.pop(memory_id, None)
def __len__(self) -> int:
"""Return the number of tracked memories."""
return len(self._states)
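The spacing effect is easiest to see numerically. The following standalone sketch, which assumes only the module above, back-dates last_reviewed to simulate a three-day gap; the same base boost then yields roughly three times the stability gain of an immediate review.
# example: the spacing effect on stability (illustrative)
import time

from ucma.retention import MemoryRetentionState

state = MemoryRetentionState(memory_id="m-1", stability=1.0)

# Immediate review: elapsed time is ~0 days, so the spacing multiplier
# clamps to 1.0 and stability grows by the base boost only.
state.reinforce(boost_base=0.5)
assert abs(state.stability - 1.5) < 1e-6

# Simulate a review after a three-day gap by back-dating last_reviewed.
state.last_reviewed = time.time() - 3 * 86400.0
state.reinforce(boost_base=0.5)  # Spacing multiplier is now ~3.0
assert state.stability > 2.9     # 1.5 + 0.5 * ~3.0, versus 2.0 if massed
print(f"stability={state.stability:.2f} "
      f"retention={state.current_retention():.3f}")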
CHAPTER TWELVE: THE EPISODIC MEMORY STORE
Episodic memory stores specific events and experiences, tagged with temporal and contextual metadata. The Episodic Memory Store is divided into two sub-tiers that mirror the human distinction between recent episodic memory (hippocampal) and consolidated episodic memory (neocortical).
The Short-Term Episodic Cache holds recent events from the last few hours to a few days. Entries here are rich in detail -- they preserve the full content of interactions. The Medium-Term Episodic Store holds summarized and consolidated episodes from the past few weeks. The temporal index maps hour-aligned time buckets to entry IDs, enabling O(buckets) time-range queries rather than O(n) full scans. When entries are forgotten, their temporal index entries are removed as well, so the index does not grow without bound.
# ucma/episodic_store.py
"""
Episodic Memory Store: two-tier storage for event memories.
The short-term tier holds recent, detailed episodic entries. The medium-term
tier holds summarized, consolidated entries. Both tiers share a temporal
index for fast time-range queries. The store is fully async-safe.
"""
from __future__ import annotations
import asyncio
import logging
import time
from collections import defaultdict
from ucma.models import EpisodicEntry
logger = logging.getLogger(__name__)
class EpisodicMemoryStore:
"""
Two-tier episodic memory store with temporal indexing.
The temporal index maps hour-aligned Unix timestamp buckets to lists
of entry IDs. A time-range query only examines the relevant buckets,
reducing complexity from O(n) to O(b + k) where b is the number of
buckets in the range and k is the number of matching entries.
"""
_BUCKET_SIZE_SECONDS: int = 3600 # One bucket per hour
def __init__(self, short_term_ttl_hours: float = 48.0) -> None:
self._short_term: dict[str, EpisodicEntry] = {}
self._medium_term: dict[str, EpisodicEntry] = {}
self._short_term_ttl: float = short_term_ttl_hours * 3600.0
# temporal_index: bucket_key -> list of entry_ids
self._temporal_index: dict[int, list[str]] = defaultdict(list)
self._lock = asyncio.Lock()
def _bucket_key(self, timestamp: float) -> int:
"""Compute the hour-aligned bucket key for a timestamp."""
return int(timestamp // self._BUCKET_SIZE_SECONDS)
async def store(self, entry: EpisodicEntry) -> None:
"""
Store a new episodic entry in the appropriate tier and update
the temporal index. If an entry with the same ID already exists,
it is replaced (used when promoting from short-term to medium-term).
"""
async with self._lock:
if entry.tier == "short_term":
self._short_term[entry.entry_id] = entry
else:
# Remove from short-term if it was there (promotion)
self._short_term.pop(entry.entry_id, None)
self._medium_term[entry.entry_id] = entry
# Add to temporal index (only if not already present)
bucket = self._bucket_key(entry.timestamp)
if entry.entry_id not in self._temporal_index[bucket]:
self._temporal_index[bucket].append(entry.entry_id)
async def get(self, entry_id: str) -> EpisodicEntry | None:
"""Retrieve a single entry by ID from either tier."""
async with self._lock:
entry = self._short_term.get(entry_id) or self._medium_term.get(entry_id)
if entry:
entry.touch()
return entry
async def get_expired_short_term(self) -> list[EpisodicEntry]:
"""
Return all short-term entries that have exceeded their TTL.
These are candidates for compression into medium-term storage.
"""
now = time.time()
async with self._lock:
expired = [
entry for entry in self._short_term.values()
if now - entry.timestamp > self._short_term_ttl
]
return expired
async def retrieve_by_time_range(
self,
start_time: float,
end_time: float,
) -> list[EpisodicEntry]:
"""
Retrieve all entries (from both tiers) within a time range.
Uses the temporal bucket index for O(b + k) performance rather
than O(n) full scan.
"""
start_bucket = self._bucket_key(start_time)
end_bucket = self._bucket_key(end_time)
async with self._lock:
results: list[EpisodicEntry] = []
for bucket in range(start_bucket, end_bucket + 1):
for entry_id in self._temporal_index.get(bucket, []):
entry = (
self._short_term.get(entry_id)
or self._medium_term.get(entry_id)
)
if entry and start_time <= entry.timestamp <= end_time:
entry.touch()
results.append(entry)
return results
async def get_all_short_term(self) -> list[EpisodicEntry]:
"""Return all short-term entries (for batch processing)."""
async with self._lock:
return list(self._short_term.values())
async def get_all_medium_term_ids(self) -> list[str]:
"""Return all medium-term entry IDs (for forgetting passes)."""
async with self._lock:
return list(self._medium_term.keys())
async def delete(self, entry_id: str) -> bool:
"""
Permanently delete an entry from either tier and clean up the
temporal index. Returns True if the entry was found and deleted.
"""
async with self._lock:
entry = self._short_term.pop(
entry_id, None
) or self._medium_term.pop(entry_id, None)
if entry is None:
return False
# Clean up temporal index entry
bucket = self._bucket_key(entry.timestamp)
bucket_list = self._temporal_index.get(bucket)
if bucket_list:
try:
bucket_list.remove(entry_id)
except ValueError:
pass
if not bucket_list:
del self._temporal_index[bucket]
return True
def stats(self) -> dict[str, int]:
"""Return store statistics for observability."""
return {
"short_term_count": len(self._short_term),
"medium_term_count": len(self._medium_term),
"temporal_buckets": len(self._temporal_index),
}
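A brief sketch, assuming only the module above, shows the bucket index doing its job: two episodes are stored at different ages, and a six-hour range query returns only the recent one. The IDs and content are invented for the example.
# example: storing episodes and querying by time range (illustrative)
import asyncio
import time

from ucma.episodic_store import EpisodicMemoryStore
from ucma.models import EpisodicEntry


async def demo() -> None:
    store = EpisodicMemoryStore(short_term_ttl_hours=48.0)
    now = time.time()
    episodes = [
        (30, "Debugged a JWT expiry bug with Michael"),
        (2, "Discussed FastAPI dependency injection"),
    ]
    for hours_ago, text in episodes:
        await store.store(EpisodicEntry(
            entry_id=f"ep-{hours_ago}h",
            session_id="s-1",
            agent_id="agent-1",
            timestamp=now - hours_ago * 3600,
            content=text,
            importance=0.7,
        ))
    # Only the two-hour-old entry falls inside the six-hour window.
    recent = await store.retrieve_by_time_range(now - 6 * 3600, now)
    assert [e.entry_id for e in recent] == ["ep-2h"]
    print(store.stats())


asyncio.run(demo())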
CHAPTER THIRTEEN: THE SEMANTIC MEMORY STORE
Semantic memory stores facts, knowledge, and general truths about the world and about the specific domain of the application. Unlike episodic memory, semantic memories have no "when" -- they are timeless propositions. The Semantic Memory Store is a vector-ready fact store augmented with entity indexing and a knowledge graph layer. The entity index enables O(1) lookup of all facts about a given entity. The knowledge graph connects entities through typed relationships, enabling multi-hop relational reasoning.
A critical design decision is the deterministic fact ID. When the same fact is derived from multiple episodes, it always maps to the same ID. Repeated assertions increase confidence rather than creating duplicate entries, keeping the store clean and its retrieval results non-redundant. Both the subject and the object of each fact are indexed, so graph traversal works from either end of a triple.
# ucma/semantic_store.py
"""
Semantic Memory Store: long-term fact storage with entity indexing and
a knowledge graph layer.
Facts are stored as subject-predicate-object triples with natural language
content and optional vector embeddings. The entity index supports O(1)
structured lookup. Both subject AND object are indexed so that graph
traversal from either direction of a triple is equally fast. The knowledge
graph supports multi-hop relational reasoning via breadth-first traversal.
"""
from __future__ import annotations
import asyncio
import logging
from collections import defaultdict, deque
from ucma.models import KnowledgeGraphEdge, SemanticFact
logger = logging.getLogger(__name__)
class SemanticMemoryStore:
"""
Long-term semantic memory with entity indexing and knowledge graph.
Combines fast structured lookup (entity index) with graph traversal
(knowledge graph) for relational reasoning. Vector embeddings on
SemanticFact objects enable semantic similarity search via the
Unified Retrieval Layer.
"""
def __init__(self) -> None:
# Primary store: fact_id -> SemanticFact
self._facts: dict[str, SemanticFact] = {}
# Entity index: entity_name -> set of fact_ids mentioning it.
# We index both subject AND object_value so that traversal from
# either direction of a triple is equally fast.
self._entity_index: dict[str, set[str]] = defaultdict(set)
# Knowledge graph: entity -> list of outgoing edges
self._graph: dict[str, list[KnowledgeGraphEdge]] = defaultdict(list)
self._lock = asyncio.Lock()
async def assert_fact(self, fact: SemanticFact) -> str:
"""
Add or update a semantic fact.
If a fact with the same subject-predicate-object triple already
exists, we update its confidence and source list rather than
creating a duplicate. Returns the fact_id of the stored fact.
"""
fact_id = SemanticFact.make_deterministic_id(
fact.subject, fact.predicate, fact.object_value
)
fact.fact_id = fact_id
async with self._lock:
if fact_id in self._facts:
existing = self._facts[fact_id]
for src in fact.source_episode_ids:
existing.reinforce(src)
logger.debug(
"Reinforced existing fact %s (confidence=%.2f).",
fact_id, existing.confidence,
)
else:
self._facts[fact_id] = fact
# Index both subject and object so traversal works from either end
self._entity_index[fact.subject].add(fact_id)
self._entity_index[fact.object_value].add(fact_id)
# Add a knowledge graph edge for this fact
edge = KnowledgeGraphEdge(
source_entity=fact.subject,
relationship=fact.predicate,
target_entity=fact.object_value,
)
self._graph[fact.subject].append(edge)
logger.debug(
"Asserted new fact %s: %s.", fact_id, fact.content
)
return fact_id
async def get_facts_about(self, entity: str) -> list[SemanticFact]:
"""
Retrieve all known facts about a specific entity (as subject or object).
Uses the entity index for O(1) lookup. Updates access statistics
on each retrieved fact.
"""
async with self._lock:
fact_ids = self._entity_index.get(entity, set())
facts = [self._facts[fid] for fid in fact_ids if fid in self._facts]
for fact in facts:
fact.touch()
return facts
async def get_fact(self, fact_id: str) -> SemanticFact | None:
"""Retrieve a single fact by ID."""
async with self._lock:
fact = self._facts.get(fact_id)
if fact:
fact.touch()
return fact
async def traverse_graph(
self, start_entity: str, max_hops: int = 3
) -> list[KnowledgeGraphEdge]:
"""
Perform a breadth-first traversal of the knowledge graph starting
from a given entity, up to max_hops away.
This allows the retrieval system to find related knowledge that is
not directly about the queried entity but is connected through a
chain of relationships. Both forward (subject->object) and reverse
(object->subject) edges are traversable because we index both ends.
"""
visited: set[str] = set()
frontier: deque[tuple[str, int]] = deque([(start_entity, 0)])
all_edges: list[KnowledgeGraphEdge] = []
async with self._lock:
while frontier:
entity, hops = frontier.popleft()
if entity in visited or hops > max_hops:
continue
visited.add(entity)
for edge in self._graph.get(entity, []):
all_edges.append(edge)
if hops + 1 <= max_hops:
frontier.append((edge.target_entity, hops + 1))
return all_edges
async def delete_fact(self, fact_id: str) -> bool:
"""
Permanently delete a fact and clean up indexes.
Called by the Forgetting Engine.
"""
async with self._lock:
fact = self._facts.pop(fact_id, None)
if fact is None:
return False
self._entity_index[fact.subject].discard(fact_id)
self._entity_index[fact.object_value].discard(fact_id)
# Remove empty index entries to prevent unbounded growth
if not self._entity_index[fact.subject]:
del self._entity_index[fact.subject]
            # Use .get() so the defaultdict does not resurrect deleted keys
            # (relevant when subject == object_value and it was removed above).
            if not self._entity_index.get(fact.object_value):
                self._entity_index.pop(fact.object_value, None)
return True
async def get_all_fact_ids(self) -> list[str]:
"""Return all fact IDs (for batch forgetting passes)."""
async with self._lock:
return list(self._facts.keys())
def stats(self) -> dict[str, int]:
"""Return store statistics for observability."""
return {
"fact_count": len(self._facts),
"entity_count": len(self._entity_index),
"graph_entities": len(self._graph),
}
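To make the store concrete, here is a minimal usage sketch. The SemanticFact constructor fields are assumed from what the store reads (the real signature lives in ucma/models.py), so treat this as illustrative rather than canonical.
# Hypothetical usage sketch; SemanticFact fields assumed from store usage.
import asyncio
from ucma.models import SemanticFact
from ucma.semantic_store import SemanticMemoryStore
async def demo() -> None:
    store = SemanticMemoryStore()
    await store.assert_fact(SemanticFact(
        subject="user", predicate="prefers", object_value="concise answers",
        content="The user prefers concise answers.",
        confidence=0.8, source_episode_ids=["ep-001"],
    ))
    # Asserting the same triple again reinforces instead of duplicating.
    await store.assert_fact(SemanticFact(
        subject="user", predicate="prefers", object_value="concise answers",
        content="The user prefers concise answers.",
        confidence=0.8, source_episode_ids=["ep-002"],
    ))
    as_subject = await store.get_facts_about("user")
    as_object = await store.get_facts_about("concise answers")
    edges = await store.traverse_graph("user", max_hops=2)
    print(store.stats(), len(as_subject), len(as_object), len(edges))
asyncio.run(demo())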
CHAPTER FOURTEEN: THE PROCEDURAL MEMORY STORE
Procedural memory is the most underappreciated type of memory in current LLM systems. It stores not facts or episodes but knowledge of how to do things: workflows, tool usage patterns, problem-solving strategies, and learned behaviors. Procedural memories are stored as structured workflow templates with conditional logic, parameter slots, and success/failure criteria. They are retrieved not by semantic similarity to a query but by matching the current situation to known procedure patterns via tag-based lookup.
The success_rate field and the sorting logic in find_by_tags implement a crucial property: the agent learns to prefer procedures that actually work. The sorting formula multiplies the success rate by a confidence factor, min(execution_count / 10, 1.0), which reaches 1.0 only once a procedure has been executed ten or more times, preventing the system from over-trusting procedures with insufficient evidence. For example, a procedure that has succeeded in both of its two runs scores 1.0 x 0.2 = 0.20, while one that has succeeded in eight of ten runs scores 0.8 x 1.0 = 0.80 and is ranked first.
# ucma/procedural_store.py
"""
Procedural Memory Store: storage and retrieval of reusable workflow templates.
Procedures are learned from successful episode sequences and refined over time.
Retrieval is tag-based and sorted by empirical success rate, so the agent
preferentially uses procedures that have worked well in the past.
"""
from __future__ import annotations
import asyncio
import logging
from collections import defaultdict
from ucma.models import ProceduralMemory, ProcedureStatus, ProcedureStep
logger = logging.getLogger(__name__)
class ProceduralMemoryStore:
"""
Stores and retrieves procedural memories.
Retrieval is based on tag matching: given a set of situation tags,
find procedures whose tags overlap. Results are sorted by a
confidence-weighted success rate to prefer well-validated procedures.
"""
def __init__(self) -> None:
self._procedures: dict[str, ProceduralMemory] = {}
# Tag index: tag -> set of procedure_ids with that tag
self._tag_index: dict[str, set[str]] = defaultdict(set)
self._lock = asyncio.Lock()
async def store_procedure(self, procedure: ProceduralMemory) -> str:
"""Store a new procedure and update the tag index."""
async with self._lock:
self._procedures[procedure.procedure_id] = procedure
for tag in procedure.trigger_tags:
self._tag_index[tag].add(procedure.procedure_id)
logger.debug(
"Stored procedure '%s' with tags %s.",
procedure.name, procedure.trigger_tags,
)
return procedure.procedure_id
async def find_by_tags(
self, tags: list[str], min_success_rate: float = 0.0
) -> list[ProceduralMemory]:
"""
Find procedures whose trigger tags overlap with the given tags.
Results are sorted by confidence-weighted success rate:
score = success_rate * min(execution_count / 10.0, 1.0)
        The confidence factor (min(n/10, 1.0)) reaches 1.0 only once a
        procedure has been executed ten or more times, preventing the
        system from over-trusting procedures with insufficient evidence.
"""
async with self._lock:
matching_ids: set[str] = set()
for tag in tags:
matching_ids.update(self._tag_index.get(tag, set()))
results = [
self._procedures[pid]
for pid in matching_ids
if pid in self._procedures
and self._procedures[pid].success_rate >= min_success_rate
and self._procedures[pid].status != ProcedureStatus.DEPRECATED
]
def sort_key(proc: ProceduralMemory) -> float:
confidence = min(proc.execution_count / 10.0, 1.0)
return proc.success_rate * confidence
results.sort(key=sort_key, reverse=True)
return results
async def get_procedure(self, procedure_id: str) -> ProceduralMemory | None:
"""Retrieve a single procedure by ID."""
async with self._lock:
return self._procedures.get(procedure_id)
async def update_procedure(
self, procedure_id: str, success: bool
) -> None:
"""Record the outcome of a procedure execution."""
async with self._lock:
proc = self._procedures.get(procedure_id)
if proc:
proc.update_statistics(success)
async def delete_procedure(self, procedure_id: str) -> bool:
"""Permanently delete a procedure and clean up the tag index."""
async with self._lock:
proc = self._procedures.pop(procedure_id, None)
if proc is None:
return False
for tag in proc.trigger_tags:
self._tag_index[tag].discard(procedure_id)
if not self._tag_index[tag]:
del self._tag_index[tag]
return True
async def get_all_procedure_ids(self) -> list[str]:
"""Return all procedure IDs (for batch forgetting passes)."""
async with self._lock:
return list(self._procedures.keys())
def stats(self) -> dict[str, int]:
"""Return store statistics for observability."""
return {
"procedure_count": len(self._procedures),
"tag_count": len(self._tag_index),
}
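From the call side, the intended loop is: match the situation's tags, run the best procedure, and report the outcome so the ranking improves over time. A minimal sketch against a populated store (procedure creation itself is covered in the consolidation chapter):
# Hypothetical call-side sketch; assumes the store already holds procedures.
async def run_best_procedure(
    store: ProceduralMemoryStore, situation_tags: list[str]
) -> None:
    candidates = await store.find_by_tags(situation_tags, min_success_rate=0.4)
    if not candidates:
        return  # No known procedure matches this situation
    best = candidates[0]  # Highest confidence-weighted success rate
    succeeded = True  # ... execute best's steps, then record the outcome ...
    await store.update_procedure(best.procedure_id, success=succeeded)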
CHAPTER FIFTEEN: THE LEARNING STORE
The Learning Store holds the highest-level, most abstract form of memory in UCMA: distilled insights derived from experience. A learning record is not a description of what happened (that is episodic memory) or a fact about the world (that is semantic memory). It is a lesson: a generalized principle that the agent has extracted from its experience and can apply to future situations.
Learnings are retrieved by applicability tags and learning type, filtered by a minimum confidence; this prevents the agent from acting on poorly validated insights. The asymmetric confidence update in LearningRecord.validate() makes the system conservative: it takes multiple successes to build confidence but only a few failures to erode it.
# ucma/learning_store.py
"""
Learning Store: storage and retrieval of distilled insights from experience.
Learnings are the highest-level, most abstract form of memory in UCMA.
They represent the agent's growing wisdom about how to operate effectively.
Retrieval is by applicability tags and learning type, filtered by minimum
confidence to prevent the agent from acting on poorly validated insights.
"""
from __future__ import annotations
import asyncio
import logging
from collections import defaultdict
from typing import Optional
from ucma.models import LearningRecord, LearningType
logger = logging.getLogger(__name__)
class LearningStore:
"""
Stores and retrieves learning records.
The Learning Store is queried at the beginning of each task to surface
relevant lessons from past experience. It is updated at the end of each
task by the Sleep Consolidation Engine.
"""
def __init__(self) -> None:
self._learnings: dict[str, LearningRecord] = {}
# Index by applicability tags for fast situational retrieval
self._applicability_index: dict[str, set[str]] = defaultdict(set)
# Index by learning type
self._type_index: dict[str, set[str]] = defaultdict(set)
self._lock = asyncio.Lock()
async def store_learning(self, learning: LearningRecord) -> str:
"""Store a new learning record and update indexes."""
async with self._lock:
self._learnings[learning.learning_id] = learning
for tag in learning.applicability:
self._applicability_index[tag].add(learning.learning_id)
self._type_index[learning.learning_type.value].add(
learning.learning_id
)
logger.debug(
"Stored learning '%s' type=%s.",
learning.title, learning.learning_type.value,
)
return learning.learning_id
async def retrieve_relevant(
self,
tags: list[str],
learning_type: Optional[LearningType] = None,
min_confidence: float = 0.3,
max_results: int = 5,
) -> list[LearningRecord]:
"""
Retrieve learning records relevant to the given situation tags.
Results are filtered by minimum confidence and sorted by a
combined score of confidence and impact.
"""
async with self._lock:
matching_ids: set[str] = set()
for tag in tags:
matching_ids.update(self._applicability_index.get(tag, set()))
if learning_type is not None:
type_ids = self._type_index.get(learning_type.value, set())
matching_ids &= type_ids
results = [
self._learnings[lid]
for lid in matching_ids
if lid in self._learnings
and self._learnings[lid].confidence >= min_confidence
]
results.sort(
key=lambda lr: lr.confidence * lr.impact_score, reverse=True
)
return results[:max_results]
async def get_learning(self, learning_id: str) -> LearningRecord | None:
"""Retrieve a single learning by ID."""
async with self._lock:
return self._learnings.get(learning_id)
async def delete_learning(self, learning_id: str) -> bool:
"""Permanently delete a learning and clean up indexes."""
async with self._lock:
learning = self._learnings.pop(learning_id, None)
if learning is None:
return False
for tag in learning.applicability:
self._applicability_index[tag].discard(learning_id)
if not self._applicability_index[tag]:
del self._applicability_index[tag]
self._type_index[learning.learning_type.value].discard(learning_id)
return True
async def get_all_learning_ids(self) -> list[str]:
"""Return all learning IDs (for batch forgetting passes)."""
async with self._lock:
return list(self._learnings.keys())
def stats(self) -> dict[str, int]:
"""Return store statistics for observability."""
return {
"learning_count": len(self._learnings),
"applicability_tags": len(self._applicability_index),
}
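At the start of a task, surfacing lessons is a single call. A small sketch (LearningType.STRATEGY is an assumed member name; substitute whatever the enum actually defines):
# Hypothetical sketch, inside an async context.
lessons = await learning_store.retrieve_relevant(
    tags=["api_integration", "retries"],
    learning_type=LearningType.STRATEGY,  # Assumed member name
    min_confidence=0.5,
    max_results=3,
)
for lesson in lessons:
    print(f"{lesson.confidence:.2f} {lesson.title}")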
CHAPTER SIXTEEN: THE MEMORY GRAPH
The Memory Graph is the connective tissue of UCMA. It is a directed, weighted graph where nodes represent memories of any type and edges represent typed relationships between them. The graph serves three critical functions: relational retrieval (finding memories connected to a starting memory), causal reasoning (following "caused_by" and "followed_by" edges), and guiding the Sleep Consolidation Engine (densely connected clusters are candidates for abstraction).
The PageRank-style importance computation identifies memories that are structurally central to the knowledge network. These central memories are protected from forgetting even if they have not been recently accessed, because deleting them would break many connections in the graph. The dangling node problem -- where nodes with no outgoing edges absorb PageRank without redistributing it -- is handled by redistributing dangling node scores evenly across all nodes at each iteration.
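Written out, the per-node update that compute_importance_scores performs on each iteration is:
    score(v) = (1 - d)/N + d * [ dangling_sum/N + sum over edges u->v of score(u) * w(u,v) / W_out(u) ]
where d is the damping factor, N the node count, w(u,v) the edge weight, W_out(u) the total outgoing edge weight of u, and dangling_sum the total score currently held by nodes with no outgoing edges.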
# ucma/memory_graph.py
"""
Memory Graph: the connective tissue of UCMA.
A directed, weighted, typed graph over all memory nodes. Supports efficient
traversal, cluster detection, and PageRank-style importance propagation for
identifying the most central memories. Dangling nodes (no outgoing edges)
are handled correctly by redistributing their scores evenly.
"""
from __future__ import annotations
import asyncio
import heapq
import logging
from typing import Optional
from ucma.config import UCMAConfig
from ucma.models import MemoryEdge, MemoryNode, MemoryType
logger = logging.getLogger(__name__)
class MemoryGraph:
"""
Directed, weighted, typed graph over all memory nodes.
Nodes represent memories of any type. Edges represent typed
relationships: "caused_by", "followed_by", "supports",
"contradicts", "derived_from", "similar_to", "part_of".
"""
def __init__(self, config: UCMAConfig) -> None:
self._config = config
self._nodes: dict[str, MemoryNode] = {}
# Adjacency lists for O(1) neighbor lookup
self._outgoing: dict[str, list[MemoryEdge]] = {}
self._incoming: dict[str, list[MemoryEdge]] = {}
self._lock = asyncio.Lock()
async def add_node(self, node: MemoryNode) -> None:
"""Register a new memory node. Idempotent: re-adding is a no-op."""
async with self._lock:
if node.node_id not in self._nodes:
self._nodes[node.node_id] = node
self._outgoing[node.node_id] = []
self._incoming[node.node_id] = []
async def add_edge(self, edge: MemoryEdge) -> None:
"""
Add a directed edge between two existing nodes.
        Skips (with a warning) if either node is not registered, rather
        than raising, because the graph is built incrementally and a node
        may not yet be registered when an edge is first created.
"""
async with self._lock:
if edge.source_id not in self._nodes:
logger.warning(
"Edge source %s not in graph; skipping edge.",
edge.source_id,
)
return
if edge.target_id not in self._nodes:
logger.warning(
"Edge target %s not in graph; skipping edge.",
edge.target_id,
)
return
self._outgoing[edge.source_id].append(edge)
self._incoming[edge.target_id].append(edge)
async def remove_node(self, node_id: str) -> None:
"""
Remove a node and all of its incident edges from the graph.
Called by the Forgetting Engine when a memory is deleted.
"""
async with self._lock:
if node_id not in self._nodes:
return
del self._nodes[node_id]
# Remove all outgoing edges from this node
for edge in self._outgoing.pop(node_id, []):
incoming = self._incoming.get(edge.target_id, [])
self._incoming[edge.target_id] = [
e for e in incoming if e.source_id != node_id
]
# Remove all incoming edges to this node
for edge in self._incoming.pop(node_id, []):
outgoing = self._outgoing.get(edge.source_id, [])
self._outgoing[edge.source_id] = [
e for e in outgoing if e.target_id != node_id
]
async def find_related(
self,
start_node_id: str,
max_hops: int = 2,
edge_types: Optional[list[str]] = None,
min_weight: float = 0.0,
) -> list[tuple[str, float, int]]:
"""
Find nodes reachable from start_node_id within max_hops.
Returns a list of (node_id, cumulative_weight, hops) tuples,
sorted by cumulative weight descending. Uses a max-heap (via
negated weights) for efficient best-first traversal.
Returns an empty list if start_node_id is not in the graph.
"""
async with self._lock:
if start_node_id not in self._nodes:
logger.debug(
"find_related: start node %s not in graph.", start_node_id
)
return []
# Max-heap via negated weights: (-weight, node_id, hops)
heap: list[tuple[float, str, int]] = [(-1.0, start_node_id, 0)]
best_weight: dict[str, float] = {start_node_id: 1.0}
results: list[tuple[str, float, int]] = []
while heap:
neg_weight, node_id, hops = heapq.heappop(heap)
current_weight = -neg_weight
# Skip if we have already found a better path to this node
if current_weight < best_weight.get(node_id, 0.0):
continue
if hops > max_hops:
continue
if node_id != start_node_id:
results.append((node_id, current_weight, hops))
if hops == max_hops:
continue
for edge in self._outgoing.get(node_id, []):
if edge_types and edge.edge_type not in edge_types:
continue
if edge.weight < min_weight:
continue
new_weight = current_weight * edge.weight
neighbor = edge.target_id
if new_weight > best_weight.get(neighbor, 0.0):
best_weight[neighbor] = new_weight
heapq.heappush(
heap, (-new_weight, neighbor, hops + 1)
)
results.sort(key=lambda x: x[1], reverse=True)
return results
async def compute_importance_scores(self) -> dict[str, float]:
"""
Compute PageRank-style importance scores for all nodes.
Nodes referenced by many other important nodes receive high scores.
Dangling nodes (no outgoing edges) are handled by redistributing
their score evenly across all nodes at each iteration, which is the
standard solution to the dangling node problem in PageRank.
"""
async with self._lock:
n = len(self._nodes)
if n == 0:
return {}
node_ids = list(self._nodes.keys())
damping = self._config.pagerank_damping
iterations = self._config.pagerank_iterations
# Initialize scores uniformly
scores: dict[str, float] = {nid: 1.0 / n for nid in node_ids}
for _ in range(iterations):
new_scores: dict[str, float] = {}
# Identify dangling nodes (no outgoing edges)
dangling_sum = sum(
scores[nid]
for nid in node_ids
if not self._outgoing.get(nid)
)
for node_id in node_ids:
# Base score: teleportation + dangling redistribution
base = (1.0 - damping) / n + damping * dangling_sum / n
# Incoming contributions
incoming_sum = 0.0
for edge in self._incoming.get(node_id, []):
source_out = self._outgoing.get(edge.source_id, [])
if source_out:
total_out_weight = sum(e.weight for e in source_out)
if total_out_weight > 0:
incoming_sum += (
scores[edge.source_id]
* edge.weight
/ total_out_weight
)
new_scores[node_id] = base + damping * incoming_sum
scores = new_scores
return scores
async def update_node_importance(
self, node_id: str, importance: float
) -> None:
"""Update the importance of a node after PageRank computation."""
async with self._lock:
node = self._nodes.get(node_id)
if node:
node.importance = importance
def stats(self) -> dict[str, int]:
"""Return graph statistics for observability."""
return {
"node_count": len(self._nodes),
"edge_count": sum(
len(edges) for edges in self._outgoing.values()
),
}
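A minimal sketch of the graph in action. The MemoryNode and MemoryEdge field names are assumed from how MemoryGraph reads them; the real definitions live in ucma/models.py.
# Hypothetical usage sketch, inside an async context.
graph = MemoryGraph(config)
await graph.add_node(MemoryNode(node_id="ep-1", memory_type=MemoryType.EPISODIC))
await graph.add_node(MemoryNode(node_id="fact-1", memory_type=MemoryType.SEMANTIC))
await graph.add_edge(MemoryEdge(
    source_id="ep-1", target_id="fact-1",
    edge_type="derived_from", weight=0.9,
))
related = await graph.find_related("ep-1", max_hops=2)
# -> [("fact-1", 0.9, 1)]: cumulative weight is the product along the path
scores = await graph.compute_importance_scores()
await graph.update_node_importance("fact-1", scores["fact-1"])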
CHAPTER SEVENTEEN: THE EMBEDDING CACHE
Embedding model calls are expensive: they require a network round-trip to an external API and typically add 50-200 ms of latency. The Embedding Cache eliminates redundant calls by storing the embeddings of recently seen texts in an LRU cache. In a typical session, where the user asks several related questions, the hit rate can exceed 60 percent, substantially reducing both embedding API cost and retrieval latency.
The implementation uses Python's collections.OrderedDict, whose move_to_end() runs in O(1). This makes both get and put operations O(1) amortized, a significant improvement over a list-based approach, which needs an O(n) removal to maintain recency order.
# ucma/embedding_cache.py
"""
LRU Embedding Cache: avoids redundant embedding model calls.
Since the same or similar queries are often repeated within a session,
caching their embeddings eliminates redundant model calls and significantly
reduces retrieval latency. Implemented with OrderedDict for O(1) LRU
operations (move_to_end is O(1) in CPython's OrderedDict).
"""
from __future__ import annotations
import asyncio
import logging
from collections import OrderedDict
logger = logging.getLogger(__name__)
class EmbeddingCache:
"""
Thread-safe LRU cache for text embeddings.
Uses collections.OrderedDict which provides O(1) move_to_end(),
making both get and put operations O(1) amortized. This is
significantly more efficient than a list-based approach, which
requires O(n) removal for LRU tracking.
"""
def __init__(self, max_size: int = 256) -> None:
self._cache: OrderedDict[str, list[float]] = OrderedDict()
self._max_size = max_size
self._lock = asyncio.Lock()
self._hits: int = 0
self._misses: int = 0
async def get(self, text: str) -> list[float] | None:
"""
Retrieve a cached embedding, or None if not cached.
On a hit, the entry is moved to the end (most recently used).
"""
async with self._lock:
if text in self._cache:
self._cache.move_to_end(text)
self._hits += 1
return self._cache[text]
self._misses += 1
return None
async def put(self, text: str, embedding: list[float]) -> None:
"""
Store an embedding in the cache.
If the cache is full, the least recently used entry (the one
at the front of the OrderedDict) is evicted first.
"""
async with self._lock:
if text in self._cache:
self._cache.move_to_end(text)
self._cache[text] = embedding
else:
if len(self._cache) >= self._max_size:
evicted_key, _ = self._cache.popitem(last=False)
logger.debug("Embedding cache evicted key: %s...", evicted_key[:40])
self._cache[text] = embedding
@property
def hit_rate(self) -> float:
"""Return the cache hit rate as a fraction (0.0 to 1.0)."""
total = self._hits + self._misses
return self._hits / total if total > 0 else 0.0
def stats(self) -> dict[str, object]:
"""Return cache statistics for observability."""
return {
"size": len(self._cache),
"max_size": self._max_size,
"hits": self._hits,
"misses": self._misses,
"hit_rate": round(self.hit_rate, 3),
}
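The eviction order is easiest to see in a tiny runnable example:
import asyncio
from ucma.embedding_cache import EmbeddingCache
async def demo() -> None:
    cache = EmbeddingCache(max_size=2)
    await cache.put("alpha", [0.1])
    await cache.put("beta", [0.2])
    await cache.get("alpha")  # "alpha" is now the most recently used
    await cache.put("gamma", [0.3])  # Evicts "beta", the least recently used
    assert await cache.get("beta") is None
    print(cache.stats())  # size=2, hits=1, misses=1, hit_rate=0.5
asyncio.run(demo())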
CHAPTER EIGHTEEN: LLM PROVIDER PROTOCOLS AND IMPLEMENTATIONS
UCMA uses Python Protocols to define the interfaces for all LLM-dependent operations. This allows any embedding provider, summarization model, or fact extraction model to be plugged in without modifying the core architecture. The OpenAI implementation is provided as the default, but any provider that implements the Protocol can be substituted.
Using typing.Protocol (structural subtyping) rather than abstract base classes means that any class implementing the required methods satisfies the interface without explicit inheritance. This makes it trivial to swap providers or use mock implementations in tests.
# ucma/providers/base.py
"""
Protocol definitions for all LLM provider interfaces used by UCMA.
Using typing.Protocol (structural subtyping) rather than abstract base
classes means that any class implementing the required methods satisfies
the interface without explicit inheritance. This makes it trivial to swap
providers or use mock implementations in tests.
"""
from __future__ import annotations
from typing import Protocol, runtime_checkable
@runtime_checkable
class EmbeddingProvider(Protocol):
"""Interface for any embedding model provider."""
async def embed(self, texts: list[str]) -> list[list[float]]:
"""
Compute dense vector embeddings for a list of texts.
Returns a list of embedding vectors, one per input text.
The dimensionality of each vector is provider-dependent
and must match the UCMAConfig.embedding_dim setting.
"""
...
@runtime_checkable
class SummarizationProvider(Protocol):
"""Interface for any LLM used for text summarization."""
async def summarize(self, content: str, max_tokens: int = 200) -> str:
"""
Produce a concise summary of the given content.
The summary should preserve the most important information
while fitting within the max_tokens budget.
"""
...
@runtime_checkable
class FactExtractionProvider(Protocol):
"""Interface for an LLM used to extract structured facts from text."""
async def extract_facts(self, content: str) -> list[dict]:
"""
Extract subject-predicate-object triples from text.
Returns a list of dicts, each with keys:
"subject" (str): the entity the fact is about
"predicate" (str): the relationship or property
"object" (str): the value or related entity
"content" (str): human-readable form of the triple
"confidence" (float): confidence in this extraction (0-1)
"""
...
@runtime_checkable
class ImportanceClassifierProvider(Protocol):
"""Interface for a model that scores the importance of content."""
async def classify_importance(
self, content: str, source: str
) -> float:
"""
Estimate the importance of a piece of content (0.0 to 1.0).
Higher scores indicate content that is more likely to be
worth storing in long-term memory.
"""
...
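Structural subtyping makes test doubles trivial: any class with a matching embed() method satisfies EmbeddingProvider, with no inheritance required. A sketch of a deterministic fake for offline tests:
# Hypothetical test double; note the absence of a base class.
from ucma.providers.base import EmbeddingProvider
class FakeEmbeddingProvider:
    """Deterministic pseudo-embeddings for tests: no network, no API key."""
    def __init__(self, dim: int = 8) -> None:
        self._dim = dim
    async def embed(self, texts: list[str]) -> list[list[float]]:
        # Hash-derived vectors; stable within a single process run.
        return [
            [((hash(t) >> i) & 0xFF) / 255.0 for i in range(self._dim)]
            for t in texts
        ]
assert isinstance(FakeEmbeddingProvider(), EmbeddingProvider)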
# ucma/providers/openai_provider.py
"""
OpenAI concrete implementations of all UCMA provider protocols.
These implementations use the OpenAI Python SDK (v1.x) with async support.
The OPENAI_API_KEY environment variable (or an explicit api_key argument)
must be provided before instantiation.
All methods include graceful error handling with safe fallback values so
that the system continues operating even when the API is temporarily
unavailable.
"""
from __future__ import annotations
import json
import logging
import os
from openai import AsyncOpenAI
logger = logging.getLogger(__name__)
class OpenAIEmbeddingProvider:
"""
Embedding provider using OpenAI's text-embedding models.
Supports batched embedding requests to minimize API round-trips.
The default model (text-embedding-3-small) produces 1536-dimensional
vectors and offers an excellent quality/cost tradeoff.
"""
def __init__(
self,
model: str = "text-embedding-3-small",
api_key: str | None = None,
) -> None:
self._model = model
self._client = AsyncOpenAI(
api_key=api_key or os.environ.get("OPENAI_API_KEY")
)
async def embed(self, texts: list[str]) -> list[list[float]]:
"""
Compute embeddings for a batch of texts.
OpenAI's embedding API accepts up to 2048 inputs per request.
For larger batches, callers should chunk the input themselves.
Returns zero vectors as a safe fallback if the API is unavailable.
"""
if not texts:
return []
try:
response = await self._client.embeddings.create(
model=self._model,
input=texts,
)
# Response items are ordered to match the input order
return [item.embedding for item in response.data]
except Exception as exc:
logger.error("Embedding request failed: %s", exc)
# Return zero vectors as a safe fallback so the system
# continues operating even if the embedding model is unavailable
            # Fallback dimensionality for text-embedding-3-small; keep in
            # sync with the configured model. Zero vectors score 0.0 cosine
            # similarity, so they never pollute retrieval results.
            dim = 1536
return [[0.0] * dim for _ in texts]
class OpenAISummarizationProvider:
"""
Summarization provider using OpenAI's chat completion models.
"""
_SYSTEM_PROMPT = (
"You are a memory consolidation assistant. "
"Produce a concise, factual summary of the provided text. "
"Preserve all important facts, decisions, errors, and outcomes. "
"Omit filler and conversational pleasantries."
)
def __init__(
self,
model: str = "gpt-4o-mini",
api_key: str | None = None,
) -> None:
self._model = model
self._client = AsyncOpenAI(
api_key=api_key or os.environ.get("OPENAI_API_KEY")
)
async def summarize(self, content: str, max_tokens: int = 200) -> str:
"""Produce a concise summary of the given content."""
try:
response = await self._client.chat.completions.create(
model=self._model,
messages=[
{"role": "system", "content": self._SYSTEM_PROMPT},
{"role": "user", "content": content},
],
max_tokens=max_tokens,
temperature=0.3,
)
return response.choices[0].message.content or content[:500]
except Exception as exc:
logger.error("Summarization request failed: %s", exc)
# Fall back to a simple truncation
return content[:500]
class OpenAIFactExtractionProvider:
"""
Fact extraction provider using OpenAI's chat completion models
with structured JSON output.
"""
_SYSTEM_PROMPT = """You are a knowledge extraction assistant.
Extract factual subject-predicate-object triples from the given text.
Return a JSON object with a "facts" key containing an array of objects,
each with these exact keys:
"subject" - the entity the fact is about (string)
"predicate" - the relationship or property (string)
"object" - the value or related entity (string)
"content" - a natural language sentence expressing the fact (string)
"confidence" - your confidence this is a true fact, 0.0 to 1.0 (float)
If no facts can be extracted, return {"facts": []}."""
def __init__(
self,
model: str = "gpt-4o-mini",
api_key: str | None = None,
) -> None:
self._model = model
self._client = AsyncOpenAI(
api_key=api_key or os.environ.get("OPENAI_API_KEY")
)
async def extract_facts(self, content: str) -> list[dict]:
"""Extract SPO triples from text using a structured LLM prompt."""
try:
response = await self._client.chat.completions.create(
model=self._model,
messages=[
{"role": "system", "content": self._SYSTEM_PROMPT},
{"role": "user", "content": content},
],
max_tokens=512,
temperature=0.0,
response_format={"type": "json_object"},
)
raw = response.choices[0].message.content or '{"facts": []}'
parsed = json.loads(raw)
# Handle both {"facts": [...]} and [...] response shapes
if isinstance(parsed, dict):
parsed = parsed.get("facts", [])
if not isinstance(parsed, list):
return []
return parsed
except Exception as exc:
logger.error("Fact extraction request failed: %s", exc)
return []
class OpenAIImportanceClassifier:
"""
Importance classifier using a fast heuristic approach combined with
optional LLM scoring for ambiguous cases.
The heuristic handles the common cases in O(1) time. The LLM is
only called when the heuristic score falls in the ambiguous middle
range (0.35 to 0.65), keeping API costs low.
"""
    # Source channels weighted by inherent importance
_SOURCE_WEIGHTS: dict[str, float] = {
"system": 0.90,
"user": 0.70,
"tool_result": 0.50,
"environment": 0.30,
}
# Keywords that strongly signal importance
_HIGH_IMPORTANCE_KEYWORDS: frozenset[str] = frozenset({
"error", "failed", "failure", "critical", "important", "remember",
"always", "never", "decided", "preference", "must", "required",
"deadline", "urgent", "warning", "success", "completed", "fixed",
})
def __init__(
self,
model: str = "gpt-4o-mini",
api_key: str | None = None,
use_llm_for_ambiguous: bool = False,
) -> None:
self._model = model
self._use_llm = use_llm_for_ambiguous
self._client = (
AsyncOpenAI(api_key=api_key or os.environ.get("OPENAI_API_KEY"))
if use_llm_for_ambiguous
else None
)
async def classify_importance(
self, content: str, source: str
) -> float:
"""
Estimate the importance of content using a fast heuristic.
The heuristic combines source weight, keyword presence, and
content length into a score in [0.0, 1.0]. If the score is
ambiguous (0.35-0.65) and LLM scoring is enabled, an LLM call
refines the estimate.
"""
lower = content.lower()
source_weight = self._SOURCE_WEIGHTS.get(source, 0.40)
# Keyword boost: each matching keyword adds 0.10, capped at 0.30
keyword_hits = sum(
1 for kw in self._HIGH_IMPORTANCE_KEYWORDS if kw in lower
)
keyword_boost = min(keyword_hits * 0.10, 0.30)
# Length signal: very short content is likely less important
length_factor = min(len(content) / 500.0, 1.0)
score = source_weight * 0.5 + keyword_boost * 0.3 + length_factor * 0.2
# Optionally refine ambiguous scores with an LLM call
if self._use_llm and self._client and 0.35 <= score <= 0.65:
try:
response = await self._client.chat.completions.create(
model=self._model,
messages=[
{
"role": "system",
"content": (
"Rate the importance of storing the following "
"text in long-term memory on a scale from 0.0 "
"(not important) to 1.0 (very important). "
"Reply with only a decimal number."
),
},
{"role": "user", "content": content[:300]},
],
max_tokens=5,
temperature=0.0,
)
llm_score = float(
response.choices[0].message.content.strip()
)
score = max(0.0, min(1.0, llm_score))
except Exception as exc:
logger.debug(
"LLM importance scoring failed, using heuristic: %s", exc
)
return round(max(0.0, min(1.0, score)), 3)
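Because the classifier defaults to heuristic-only mode, the arithmetic is easy to verify by hand (no API key is needed for this example):
import asyncio
from ucma.providers.openai_provider import OpenAIImportanceClassifier
async def demo() -> None:
    clf = OpenAIImportanceClassifier()  # use_llm_for_ambiguous defaults to False
    score = await clf.classify_importance(
        "Remember: the production deploy failed with a critical error.",
        source="user",
    )
    # Source weight 0.70; four keyword hits ("remember", "failed",
    # "critical", "error") cap the boost at 0.30; length 61/500 = 0.122.
    # 0.70*0.5 + 0.30*0.3 + 0.122*0.2 = 0.464
    print(score)  # ~0.464
asyncio.run(demo())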
CHAPTER NINETEEN: THE UNIFIED RETRIEVAL LAYER
All of the components described so far are only as useful as the retrieval system that connects them to the agent's reasoning process. The Unified Retrieval Layer is the interface through which the agent accesses all memory types with a single, consistent API. It hides the complexity of the underlying stores, performs intelligent query routing, combines results from multiple stores, and ranks them into a coherent, prioritized list.
The multi-factor ranking formula brings together semantic relevance, memory importance, recency, and retention strength into a single score. The weights are configurable via UCMAConfig. The graph expansion step implements associative retrieval -- the human experience of one memory triggering another related memory -- by following edges from the top retrieved memories to their neighbors.
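Spelled out, the score assigned by _rank_results later in this chapter is:
    final = w_rel * relevance + w_imp * importance + w_rec * 0.5^(age / halflife) + w_ret * retention
where the four weights (rank_weight_relevance, rank_weight_importance, rank_weight_recency, rank_weight_retention) and the recency half-life (rank_recency_halflife_days) come from UCMAConfig, and retention is the memory's current Ebbinghaus retention score.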
# ucma/query_router.py
"""
Query Router: fast heuristic-based query classification.
Classifies natural language queries into retrieval modes in microseconds
using pure string matching, before any expensive operations are performed.
This routing step can reduce average retrieval latency by 40-60% by
avoiding unnecessary vector database calls for structured queries.
"""
from __future__ import annotations
from ucma.models import QueryMode
class QueryRouter:
"""
Analyzes incoming queries and determines the optimal retrieval strategy.
The router uses heuristic rules to classify queries before dispatching
them to the appropriate stores. All classification is done with O(k)
string matching where k is the number of keywords -- no model calls,
no embeddings, no latency.
"""
# Keywords that suggest temporal queries (user asking about past events)
_TEMPORAL_KEYWORDS: frozenset[str] = frozenset({
"yesterday", "last week", "last session", "earlier today",
"before", "after", "when did", "recently", "today", "ago",
"previously", "last time", "a while ago", "the other day",
})
# Keywords that suggest procedural queries (user asking how to do something)
_PROCEDURAL_KEYWORDS: frozenset[str] = frozenset({
"how to", "how do i", "how do you", "steps to", "procedure for",
"workflow for", "process for", "guide to", "tutorial",
})
# Prefixes that suggest entity/structured queries
_ENTITY_PREFIXES: tuple[str, ...] = (
"what do you know about",
"tell me about",
"what is",
"who is",
"facts about",
)
def classify(self, query_text: str) -> QueryMode:
"""
Classify a query into the most appropriate retrieval mode.
Returns QueryMode.HYBRID as the default when no specific
pattern is detected, which triggers both semantic and
structured retrieval for maximum recall.
"""
lower = query_text.lower().strip()
# Temporal check: is the user asking about a past event?
if any(kw in lower for kw in self._TEMPORAL_KEYWORDS):
return QueryMode.TEMPORAL
# Procedural check: is the user asking how to do something?
if any(kw in lower for kw in self._PROCEDURAL_KEYWORDS):
return QueryMode.STRUCTURED
# Entity check: is the user asking about a specific entity?
if any(lower.startswith(prefix) for prefix in self._ENTITY_PREFIXES):
return QueryMode.STRUCTURED
# Default: hybrid search combines semantic and structured retrieval
return QueryMode.HYBRID
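Because classification is pure string matching, the router is trivially testable:
from ucma.models import QueryMode
from ucma.query_router import QueryRouter
router = QueryRouter()
assert router.classify("What did we decide yesterday?") == QueryMode.TEMPORAL
assert router.classify("How do I rotate the API keys?") == QueryMode.STRUCTURED
assert router.classify("Tell me about the staging cluster") == QueryMode.STRUCTURED
assert router.classify("ideas for reducing tail latency") == QueryMode.HYBRID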
# ucma/retrieval.py
"""
Unified Retrieval Layer: the single access point for all memory retrieval.
Implements intelligent query routing, multi-store retrieval, result fusion,
graph-based result expansion, and multi-factor ranking. All retrieval
requests from the agent pass through this layer.
"""
from __future__ import annotations
import logging
import math
import time
import numpy as np
from ucma.config import UCMAConfig
from ucma.embedding_cache import EmbeddingCache
from ucma.episodic_store import EpisodicMemoryStore
from ucma.learning_store import LearningStore
from ucma.memory_graph import MemoryGraph
from ucma.models import (
MemoryQuery,
MemoryType,
QueryMode,
RetrievalResult,
)
from ucma.procedural_store import ProceduralMemoryStore
from ucma.providers.base import EmbeddingProvider
from ucma.query_router import QueryRouter
from ucma.retention import RetentionRegistry
from ucma.semantic_store import SemanticMemoryStore
logger = logging.getLogger(__name__)
def _cosine_similarity(a: list[float], b: list[float]) -> float:
"""
Compute cosine similarity between two embedding vectors.
Returns a value in [-1.0, 1.0]. Returns 0.0 for zero vectors
to avoid division by zero.
"""
va = np.array(a, dtype=np.float32)
vb = np.array(b, dtype=np.float32)
norm_a = float(np.linalg.norm(va))
norm_b = float(np.linalg.norm(vb))
if norm_a == 0.0 or norm_b == 0.0:
return 0.0
return float(np.dot(va, vb) / (norm_a * norm_b))
class UnifiedRetrievalLayer:
"""
The single access point for all memory retrieval in UCMA.
Combines semantic ANN search, structured index lookup, temporal
bucket retrieval, and graph traversal into a unified, ranked result
set. The Query Router selects the optimal strategy for each query.
"""
def __init__(
self,
config: UCMAConfig,
episodic_store: EpisodicMemoryStore,
semantic_store: SemanticMemoryStore,
procedural_store: ProceduralMemoryStore,
learning_store: LearningStore,
memory_graph: MemoryGraph,
embedder: EmbeddingProvider,
retention_registry: RetentionRegistry,
) -> None:
self._config = config
self._episodic = episodic_store
self._semantic = semantic_store
self._procedural = procedural_store
self._learning = learning_store
self._graph = memory_graph
self._embedder = embedder
self._retention = retention_registry
self._router = QueryRouter()
self._embed_cache = EmbeddingCache(config.embedding_cache_max_size)
async def retrieve(self, query: MemoryQuery) -> list[RetrievalResult]:
"""
Execute a memory retrieval query and return ranked results.
This is the main entry point for all memory access in UCMA.
The method is fully async and safe to call concurrently.
"""
# Auto-classify mode if HYBRID is requested (use router to refine)
effective_mode = query.mode
if query.mode == QueryMode.HYBRID:
routed = self._router.classify(query.query_text)
# Only override HYBRID if the router found a more specific mode
if routed != QueryMode.HYBRID:
effective_mode = routed
logger.debug(
"Query router overrode HYBRID -> %s for: '%s'",
effective_mode.value,
query.query_text[:60],
)
results: list[RetrievalResult] = []
# Dispatch to appropriate retrieval strategies
if effective_mode in (QueryMode.SEMANTIC, QueryMode.HYBRID):
results.extend(await self._semantic_retrieval(query))
if effective_mode in (QueryMode.STRUCTURED, QueryMode.HYBRID):
results.extend(await self._structured_retrieval(query))
if effective_mode == QueryMode.TEMPORAL and query.time_range:
results.extend(await self._temporal_retrieval(query))
if effective_mode == QueryMode.GRAPH and query.seed_node_id:
results.extend(await self._graph_retrieval(query))
        # For non-semantic modes, supplement with a semantic pass when
        # results are sparse. HYBRID is excluded because it already ran
        # semantic retrieval above.
        if (len(results) < 3
                and effective_mode not in (QueryMode.SEMANTIC, QueryMode.HYBRID)):
            results.extend(await self._semantic_retrieval(query))
# Deduplicate, keeping the highest-scoring copy of each memory
results = self._deduplicate(results)
# Expand with graph neighbors for associative retrieval
if query.include_graph_neighbors and results:
results = await self._expand_with_graph(results, query)
# Apply multi-factor ranking
results = self._rank_results(results)
# Update retention states for all retrieved memories
self._update_access_stats(results)
top = results[:query.max_results]
logger.debug(
"Retrieved %d results for query '%s'",
len(top),
query.query_text[:60],
)
return top
async def _get_query_embedding(self, query_text: str) -> list[float] | None:
"""Get or compute the embedding for a query text, using the cache."""
cached = await self._embed_cache.get(query_text)
if cached is not None:
return cached
embeddings = await self._embedder.embed([query_text])
if embeddings:
await self._embed_cache.put(query_text, embeddings[0])
return embeddings[0]
return None
async def _semantic_retrieval(
self, query: MemoryQuery
) -> list[RetrievalResult]:
"""
Perform semantic similarity search across episodic and semantic stores.
Uses exact cosine similarity for correctness. For production deployments
with millions of memories, replace the inner loop with an HNSW index
        call (see Chapter Twenty-Five for the drop-in replacement).
"""
query_vector = await self._get_query_embedding(query.query_text)
if query_vector is None:
return []
threshold = self._config.retrieval_semantic_min_similarity
results: list[RetrievalResult] = []
# Search short-term episodic memories
if not query.memory_types or MemoryType.EPISODIC in query.memory_types:
all_episodic = await self._episodic.get_all_short_term()
for entry in all_episodic:
if entry.embedding is None:
continue
score = _cosine_similarity(query_vector, entry.embedding)
if score >= threshold:
results.append(RetrievalResult(
memory_id=entry.entry_id,
memory_type=MemoryType.EPISODIC,
content=entry.content,
relevance_score=float(score),
importance=entry.importance,
timestamp=entry.timestamp,
source_store="episodic_short_term",
))
# Search semantic facts
if not query.memory_types or MemoryType.SEMANTIC in query.memory_types:
            # Snapshot the facts under the store's lock. This reaches into
            # private state; a public accessor would be the cleaner API.
            async with self._semantic._lock:
                all_facts = list(self._semantic._facts.values())
for fact in all_facts:
if fact.embedding is None:
continue
score = _cosine_similarity(query_vector, fact.embedding)
if score >= threshold:
results.append(RetrievalResult(
memory_id=fact.fact_id,
memory_type=MemoryType.SEMANTIC,
content=fact.content,
relevance_score=float(score),
importance=fact.importance,
timestamp=None,
source_store="semantic",
))
return results
async def _structured_retrieval(
self, query: MemoryQuery
) -> list[RetrievalResult]:
"""
Perform structured metadata-based retrieval.
Handles entity filters and tag filters.
"""
results: list[RetrievalResult] = []
if query.entity_filter:
facts = await self._semantic.get_facts_about(query.entity_filter)
for fact in facts:
results.append(RetrievalResult(
memory_id=fact.fact_id,
memory_type=MemoryType.SEMANTIC,
content=fact.content,
relevance_score=fact.confidence,
importance=fact.importance,
timestamp=None,
source_store="semantic_entity",
))
if query.tag_filter:
procedures = await self._procedural.find_by_tags(query.tag_filter)
for proc in procedures:
results.append(RetrievalResult(
memory_id=proc.procedure_id,
memory_type=MemoryType.PROCEDURAL,
content=proc.description,
relevance_score=proc.success_rate,
importance=proc.success_rate,
timestamp=proc.last_used,
source_store="procedural",
))
return results
async def _temporal_retrieval(
self, query: MemoryQuery
) -> list[RetrievalResult]:
"""Retrieve episodic memories within the query's time range."""
if not query.time_range:
return []
start, end = query.time_range
entries = await self._episodic.retrieve_by_time_range(start, end)
return [
RetrievalResult(
memory_id=entry.entry_id,
memory_type=MemoryType.EPISODIC,
content=entry.content,
relevance_score=entry.importance,
importance=entry.importance,
timestamp=entry.timestamp,
source_store="episodic_temporal",
)
for entry in entries
]
async def _graph_retrieval(
self, query: MemoryQuery
) -> list[RetrievalResult]:
"""Retrieve memories by traversing the Memory Graph from a seed node."""
if not query.seed_node_id:
return []
related = await self._graph.find_related(
query.seed_node_id, max_hops=2
)
results: list[RetrievalResult] = []
async with self._graph._lock:
for node_id, weight, hops in related:
node = self._graph._nodes.get(node_id)
if not node:
continue
results.append(RetrievalResult(
memory_id=node_id,
memory_type=node.memory_type,
content="", # Content fetched lazily from the typed store
relevance_score=weight,
importance=node.importance,
timestamp=node.created_at,
source_store="graph",
graph_distance=hops,
))
return results
async def _expand_with_graph(
self,
results: list[RetrievalResult],
query: MemoryQuery,
) -> list[RetrievalResult]:
"""
Expand the result set by adding graph neighbors of the top results.
This implements associative retrieval: one memory triggers related
memories through the graph structure, mirroring how human memory
works. Graph-expanded results receive a discounted relevance score
(multiplied by 0.7) to rank them below direct matches.
"""
top_k = self._config.retrieval_graph_expansion_top_k
min_weight = self._config.retrieval_graph_expansion_min_weight
expanded_ids = {r.memory_id for r in results}
new_results: list[RetrievalResult] = []
for result in results[:top_k]:
neighbors = await self._graph.find_related(
result.memory_id,
max_hops=self._config.retrieval_graph_expansion_hops,
min_weight=min_weight,
)
async with self._graph._lock:
for neighbor_id, weight, _ in neighbors:
if neighbor_id in expanded_ids:
continue
node = self._graph._nodes.get(neighbor_id)
if not node:
continue
new_results.append(RetrievalResult(
memory_id=neighbor_id,
memory_type=node.memory_type,
content="", # Content fetched lazily from typed store
relevance_score=result.relevance_score * weight * 0.7,
importance=node.importance,
timestamp=node.created_at,
source_store="graph_expansion",
graph_distance=1,
))
expanded_ids.add(neighbor_id)
return results + new_results
def _rank_results(
self, results: list[RetrievalResult]
) -> list[RetrievalResult]:
"""
Apply multi-factor ranking to retrieval results.
The final score is a weighted combination of:
- relevance_score: semantic similarity or structured match quality
- importance: memory importance (from graph PageRank or heuristic)
- recency: exponential decay based on memory age
- retention: current Ebbinghaus retention score
All weights are configurable via UCMAConfig.
"""
cfg = self._config
now = time.time()
halflife_seconds = cfg.rank_recency_halflife_days * 86400.0
for result in results:
            # Recency: exponential decay from memory creation time. The
            # ln(2) factor makes recency halve once per configured
            # half-life, matching the rank_recency_halflife_days name.
            if result.timestamp is not None:
                age_seconds = now - result.timestamp
                recency = math.exp(
                    -math.log(2.0) * age_seconds / halflife_seconds
                )
else:
recency = 0.5 # Unknown age: assume moderate recency
# Retention: current Ebbinghaus retention score
state = self._retention.get(result.memory_id)
retention = state.current_retention() if state else 0.5
result.relevance_score = (
cfg.rank_weight_relevance * result.relevance_score
+ cfg.rank_weight_importance * result.importance
+ cfg.rank_weight_recency * recency
+ cfg.rank_weight_retention * retention
)
results.sort(key=lambda r: r.relevance_score, reverse=True)
return results
def _deduplicate(
self, results: list[RetrievalResult]
) -> list[RetrievalResult]:
"""Remove duplicate results, keeping the highest-scoring copy."""
seen: dict[str, RetrievalResult] = {}
for result in results:
if (result.memory_id not in seen
or result.relevance_score
> seen[result.memory_id].relevance_score):
seen[result.memory_id] = result
return list(seen.values())
def _update_access_stats(self, results: list[RetrievalResult]) -> None:
"""
Update retention states for all retrieved memories.
Retrieval counts as a review event in the Ebbinghaus model,
increasing stability and resetting the decay clock.
"""
for result in results:
self._retention.reinforce(
result.memory_id,
self._config.retention_stability_boost_base,
)
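From the agent's perspective, everything above collapses into a single call. A sketch (MemoryQuery defaults are assumed for the fields not set here):
# Hypothetical call-side sketch, inside an async context.
from ucma.models import MemoryQuery, QueryMode
query = MemoryQuery(
    query_text="what do you know about the staging database?",
    mode=QueryMode.HYBRID,
    max_results=8,
)
results = await retrieval_layer.retrieve(query)
for r in results:
    print(f"{r.source_store:20s} {r.relevance_score:.3f} {r.content[:60]}")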
CHAPTER TWENTY: THE FORGETTING ENGINE
Forgetting is not failure. It is a feature. The Forgetting Engine implements principled, mathematically grounded forgetting that keeps the memory system fast, focused, and free of noise. It is based on three well-established principles: the Ebbinghaus forgetting curve, the spacing effect, and importance weighting via PageRank.
The confidence-adjusted forgetting threshold for semantic facts is a particularly elegant mechanism. A fact with low confidence needs a much higher retention score to survive than a fact with high confidence. This means that uncertain facts are forgotten quickly unless they are frequently reinforced, while well-established facts persist even when rarely accessed. Procedural memories with high success rates are strongly protected -- we do not forget skills that work.
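Concretely: with a base retention threshold of, say, 0.25 (the actual value comes from forgetting_retention_threshold in UCMAConfig), a fact with confidence 0.3 survives only if its current retention is at least 0.25 / 0.3, roughly 0.83, while a fully trusted fact needs only 0.25. The max(confidence, 0.10) floor in the code caps the adjustment at a factor of ten.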
# ucma/forgetting.py
"""
Forgetting Engine: principled, importance-weighted memory pruning.
Implements the Ebbinghaus forgetting curve with PageRank-based importance
protection. Memories below the retention threshold are forgotten unless
they are structurally central to the Memory Graph (high PageRank score)
or are well-validated procedural memories.
"""
from __future__ import annotations
import logging
from ucma.config import UCMAConfig
from ucma.episodic_store import EpisodicMemoryStore
from ucma.learning_store import LearningStore
from ucma.memory_graph import MemoryGraph
from ucma.models import ProcedureStatus
from ucma.procedural_store import ProceduralMemoryStore
from ucma.retention import RetentionRegistry
from ucma.semantic_store import SemanticMemoryStore
logger = logging.getLogger(__name__)
class ForgettingEngine:
"""
Implements principled, importance-weighted forgetting across all stores.
Runs as part of the Sleep Consolidation cycle but can also be triggered
on-demand when memory pressure is high. All deletions are coordinated
with the Memory Graph to remove orphaned nodes and clean up edges.
"""
def __init__(
self,
config: UCMAConfig,
episodic_store: EpisodicMemoryStore,
semantic_store: SemanticMemoryStore,
procedural_store: ProceduralMemoryStore,
learning_store: LearningStore,
memory_graph: MemoryGraph,
retention_registry: RetentionRegistry,
) -> None:
self._config = config
self._episodic = episodic_store
self._semantic = semantic_store
self._procedural = procedural_store
self._learning = learning_store
self._graph = memory_graph
self._retention = retention_registry
async def run_forgetting_pass(self) -> dict[str, int]:
"""
Execute one pass of the forgetting algorithm across all stores.
Computes PageRank importance scores first to identify structurally
protected memories, then processes each store independently.
Returns a summary of how many memories were forgotten per store.
"""
logger.info("Starting forgetting pass...")
# Compute graph importance scores once for the entire pass.
# This is the most expensive step but must be done before any
# deletions to ensure protection decisions are consistent.
importance_scores = await self._graph.compute_importance_scores()
counts = {
"episodic": await self._forget_episodic(importance_scores),
"semantic": await self._forget_semantic(importance_scores),
"procedural": await self._forget_procedural(importance_scores),
"learning": await self._forget_learning(importance_scores),
}
logger.info(
"Forgetting pass complete: %s",
{k: v for k, v in counts.items() if v > 0},
)
return counts
def _should_forget(
self,
memory_id: str,
importance_scores: dict[str, float],
retention_threshold: float | None = None,
) -> bool:
"""
Determine whether a specific memory should be forgotten.
A memory is protected if:
1. Its PageRank importance score is above the protection threshold.
2. Its current Ebbinghaus retention is above the threshold.
If the memory has no retention state, it is treated as having
full retention (newly created memories are never immediately forgotten).
"""
threshold = (
retention_threshold
if retention_threshold is not None
else self._config.forgetting_retention_threshold
)
# Protect structurally important memories (high PageRank)
graph_importance = importance_scores.get(memory_id, 0.0)
if graph_importance > self._config.forgetting_importance_protection:
return False
# Check Ebbinghaus retention
state = self._retention.get(memory_id)
if state is None:
return False # No retention state: assume newly created, protect it
return state.current_retention() < threshold
async def _forget_episodic(
self, importance_scores: dict[str, float]
) -> int:
"""Forget eligible medium-term episodic memories."""
count = 0
entry_ids = await self._episodic.get_all_medium_term_ids()
for entry_id in entry_ids:
if self._should_forget(entry_id, importance_scores):
deleted = await self._episodic.delete(entry_id)
if deleted:
self._retention.remove(entry_id)
await self._graph.remove_node(entry_id)
count += 1
return count
async def _forget_semantic(
self, importance_scores: dict[str, float]
) -> int:
"""
Forget eligible semantic facts.
Low-confidence facts are forgotten more aggressively: the effective
retention threshold is scaled inversely by confidence. A fact with
confidence 0.3 must have a retention score three times higher than
the base threshold to survive, while a fact with confidence 1.0
only needs to meet the base threshold.
"""
count = 0
fact_ids = await self._semantic.get_all_fact_ids()
for fact_id in fact_ids:
fact = await self._semantic.get_fact(fact_id)
if fact is None:
continue
# Adjust threshold inversely by confidence
adjusted_threshold = (
self._config.forgetting_retention_threshold
/ max(fact.confidence, 0.10)
)
if self._should_forget(
fact_id, importance_scores, adjusted_threshold
):
deleted = await self._semantic.delete_fact(fact_id)
if deleted:
self._retention.remove(fact_id)
await self._graph.remove_node(fact_id)
count += 1
return count
async def _forget_procedural(
self, importance_scores: dict[str, float]
) -> int:
"""
Forget eligible procedural memories.
Active procedures with success rates above 50% are strongly
protected. Only draft procedures or those with very low success
rates are candidates for forgetting.
"""
count = 0
proc_ids = await self._procedural.get_all_procedure_ids()
for proc_id in proc_ids:
proc = await self._procedural.get_procedure(proc_id)
if proc is None:
continue
# Protect active procedures that work well
if (proc.status == ProcedureStatus.ACTIVE
and proc.success_rate > 0.5):
continue
if self._should_forget(proc_id, importance_scores):
deleted = await self._procedural.delete_procedure(proc_id)
if deleted:
self._retention.remove(proc_id)
await self._graph.remove_node(proc_id)
count += 1
return count
async def _forget_learning(
self, importance_scores: dict[str, float]
) -> int:
"""
Forget eligible learning records.
Validated learnings with high confidence are strongly protected.
"""
count = 0
learning_ids = await self._learning.get_all_learning_ids()
for learning_id in learning_ids:
learning = await self._learning.get_learning(learning_id)
if learning is None:
continue
# Protect validated, high-confidence learnings
if learning.validated and learning.confidence > 0.6:
continue
if self._should_forget(learning_id, importance_scores):
deleted = await self._learning.delete_learning(learning_id)
if deleted:
self._retention.remove(learning_id)
await self._graph.remove_node(learning_id)
count += 1
return count
CHAPTER TWENTY-ONE: THE SLEEP CONSOLIDATION ENGINE
This is where UCMA most clearly distinguishes itself from existing memory systems. The Sleep Consolidation Engine is a background process that runs periodically -- during low-activity periods, analogous to biological sleep -- and performs four critical operations: compression, abstraction, reinforcement, and pruning.
Compression takes groups of related short-term episodic memories and replaces them with a more compact representation. Abstraction takes patterns found across multiple episodes and extracts them as semantic facts. Reinforcement strengthens the connections between memories that co-occur frequently. Pruning removes memories that are old, unimportant, and rarely accessed, following the Ebbinghaus forgetting curve.
The engine is designed to be interruptible: if the agent needs to respond to a request, the current cycle stops at the next phase boundary, and consolidation picks up again on a later cycle once the agent is idle.
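From the host application's point of view, the whole lifecycle is a handful of calls (a sketch; the constructor arguments follow the signature shown below):
# Hypothetical lifecycle sketch, inside an async context.
engine = SleepConsolidationEngine(
    config, episodic, semantic, procedural, learning,
    graph, retention, forgetting, embedder, summarizer, extractor,
)
await engine.start()  # Background loop begins
engine.interrupt()  # Agent became active: abort after the current phase
engine.wake_now()  # Session ended: run a consolidation cycle now
await engine.stop()  # Graceful shutdown (30s grace period, then cancel)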
# ucma/consolidation.py
"""
Sleep Consolidation Engine: background memory maintenance and abstraction.
Runs periodically to compress episodic memories, extract semantic facts,
reinforce important connections, and prune forgotten memories. Designed
to be interruptible: consolidation pauses when the agent is active and
resumes when it becomes idle again.
"""
from __future__ import annotations
import asyncio
import logging
import time
import uuid
from ucma.config import UCMAConfig
from ucma.episodic_store import EpisodicMemoryStore
from ucma.forgetting import ForgettingEngine
from ucma.learning_store import LearningStore
from ucma.memory_graph import MemoryGraph
from ucma.models import (
EpisodicEntry,
MemoryEdge,
MemoryNode,
MemoryType,
SemanticFact,
)
from ucma.procedural_store import ProceduralMemoryStore
from ucma.providers.base import (
EmbeddingProvider,
FactExtractionProvider,
SummarizationProvider,
)
from ucma.retention import RetentionRegistry
from ucma.semantic_store import SemanticMemoryStore
logger = logging.getLogger(__name__)
class SleepConsolidationEngine:
"""
The background consolidation process that transforms raw episodic
memories into structured long-term knowledge.
    The engine runs as an asyncio Task. It can be interrupted at any
    point by setting the _interrupt event, which causes the current
    cycle to finish its in-progress phase and then abort; the loop runs
    again at the next interval or when wake_now() is called. This keeps
    consolidation off the agent's critical response path.
"""
def __init__(
self,
config: UCMAConfig,
episodic_store: EpisodicMemoryStore,
semantic_store: SemanticMemoryStore,
procedural_store: ProceduralMemoryStore,
learning_store: LearningStore,
memory_graph: MemoryGraph,
retention_registry: RetentionRegistry,
forgetting_engine: ForgettingEngine,
embedder: EmbeddingProvider,
summarizer: SummarizationProvider,
fact_extractor: FactExtractionProvider,
) -> None:
self._config = config
self._episodic = episodic_store
self._semantic = semantic_store
self._procedural = procedural_store
self._learning = learning_store
self._graph = memory_graph
self._retention = retention_registry
self._forgetting = forgetting_engine
self._embedder = embedder
self._summarizer = summarizer
self._fact_extractor = fact_extractor
self._task: asyncio.Task | None = None
self._interrupt = asyncio.Event()
self._stop = asyncio.Event()
self._wake_event = asyncio.Event()
async def start(self) -> None:
"""Start the background consolidation loop as an asyncio Task."""
self._stop.clear()
self._task = asyncio.create_task(
self._consolidation_loop(), name="ucma-consolidation"
)
logger.info("Sleep Consolidation Engine started.")
async def stop(self) -> None:
"""
Gracefully stop the consolidation engine.
Signals the loop to exit and waits for the current cycle to finish.
"""
self._stop.set()
self._wake_event.set() # Wake up if sleeping
if self._task and not self._task.done():
try:
await asyncio.wait_for(self._task, timeout=30.0)
except asyncio.TimeoutError:
self._task.cancel()
logger.warning(
"Consolidation engine did not stop within 30s; cancelled."
)
logger.info("Sleep Consolidation Engine stopped.")
def interrupt(self) -> None:
"""
        Signal that the agent is active. The current cycle aborts after
        completing its in-progress phase; consolidation resumes on the
        next scheduled cycle or when wake_now() is called.
"""
self._interrupt.set()
def wake_now(self) -> None:
"""
Trigger an immediate consolidation cycle (e.g., at session end).
Also clears any active interrupt signal.
"""
self._interrupt.clear()
self._wake_event.set()
async def _consolidation_loop(self) -> None:
"""
The main background loop. Sleeps for the configured interval,
then runs one consolidation cycle. Respects stop and interrupt signals.
"""
while not self._stop.is_set():
# Wait for the interval or a wake signal
try:
await asyncio.wait_for(
self._wake_event.wait(),
timeout=self._config.consolidation_interval_seconds,
)
self._wake_event.clear()
except asyncio.TimeoutError:
pass # Normal: interval elapsed, run consolidation
if self._stop.is_set():
break
# Clear interrupt before starting so new interrupts during
# the cycle are detected correctly
self._interrupt.clear()
try:
await self._run_cycle()
except Exception as exc:
logger.error(
"Consolidation cycle failed with error: %s", exc,
exc_info=True,
)
async def _run_cycle(self) -> None:
"""Execute one full consolidation cycle: compress, abstract, reinforce, prune."""
cycle_start = time.time()
logger.info("=== Sleep Consolidation Cycle Starting ===")
# Phase 1: Compress expired short-term episodic memories
expired_entries = await self._episodic.get_expired_short_term()
promoted_count = await self._compress_episodes(expired_entries)
logger.info(
"Phase 1 (Compress): %d entries expired, %d promoted to medium-term.",
len(expired_entries), promoted_count,
)
if self._interrupt.is_set():
logger.info("Consolidation interrupted after Phase 1.")
return
# Phase 2: Abstract patterns into semantic facts
abstracted_count = await self._abstract_to_semantic()
logger.info(
"Phase 2 (Abstract): %d entries processed for fact extraction.",
abstracted_count,
)
if self._interrupt.is_set():
logger.info("Consolidation interrupted after Phase 2.")
return
# Phase 3: Reinforce connections via PageRank
await self._reinforce_connections()
logger.info("Phase 3 (Reinforce): PageRank importance scores updated.")
if self._interrupt.is_set():
logger.info("Consolidation interrupted after Phase 3.")
return
# Phase 4: Prune forgotten memories
forgotten_counts = await self._forgetting.run_forgetting_pass()
logger.info("Phase 4 (Prune): %s memories forgotten.", forgotten_counts)
elapsed = time.time() - cycle_start
logger.info(
"=== Sleep Consolidation Cycle Complete (%.2fs) ===", elapsed
)
async def _compress_episodes(
self, expired_entries: list[EpisodicEntry]
) -> int:
"""
Compress expired short-term episodic memories.
High-importance entries are summarized and promoted to medium-term
storage. Low-importance entries are discarded. The importance
threshold is adaptive: when many entries expire at once, we raise
the bar to avoid bloating medium-term storage.
"""
if not expired_entries:
return 0
n = len(expired_entries)
# Adaptive threshold: raise bar when many entries expire at once
volume_penalty = min(
(n / 100.0) * self._config.compression_volume_penalty_max,
self._config.compression_volume_penalty_max,
)
threshold = self._config.compression_importance_base + volume_penalty
promoted = 0
for entry in expired_entries:
if entry.importance < threshold:
# Low importance: delete from short-term without promoting
await self._episodic.delete(entry.entry_id)
continue
# Summarize the entry content
summary = await self._summarizer.summarize(
entry.content,
max_tokens=self._config.consolidation_summary_max_tokens,
)
# Promote to medium-term with summary
entry.summary = summary
entry.tier = "medium_term"
await self._episodic.store(entry)
            # Ensure the promoted entry is tracked by the retention registry
            # (entries created via observe() are usually already registered)
            if self._retention.get(entry.entry_id) is None:
                self._retention.register(entry.entry_id, initial_stability=1.0)
promoted += 1
return promoted
async def _abstract_to_semantic(self) -> int:
"""
Scan recent medium-term episodic memories and extract semantic facts.
This is the core abstraction step: turning specific episodic content
into timeless semantic knowledge. Each extracted fact is stored in
the Semantic Memory Store and connected to its source episode via
a "derived_from" edge in the Memory Graph.
"""
now = time.time()
# Process the last 24 hours of medium-term memories
recent_entries = await self._episodic.retrieve_by_time_range(
start_time=now - 86400.0,
end_time=now,
)
processed = 0
for entry in recent_entries:
# Skip entries that have already been abstracted
if "abstracted" in entry.tags:
continue
raw_facts = await self._fact_extractor.extract_facts(entry.content)
for raw in raw_facts:
subject = raw.get("subject", "").strip()
predicate = raw.get("predicate", "").strip()
obj = raw.get("object", "").strip()
content = raw.get("content", "").strip()
confidence = float(raw.get("confidence", 0.5))
if not (subject and predicate and obj and content):
continue
# Compute embedding for the fact's natural language content
embeddings = await self._embedder.embed([content])
embedding = embeddings[0] if embeddings else None
fact = SemanticFact(
subject=subject,
predicate=predicate,
object_value=obj,
content=content,
confidence=confidence,
source_episode_ids=[entry.entry_id],
embedding=embedding,
importance=entry.importance,
)
fact_id = await self._semantic.assert_fact(fact)
# Register the fact as a node in the Memory Graph
fact_node = MemoryNode(
node_id=fact_id,
memory_type=MemoryType.SEMANTIC,
importance=entry.importance,
created_at=time.time(),
last_accessed=time.time(),
)
await self._graph.add_node(fact_node)
                # Connect fact to its source episode. (This reaches into
                # the graph's internals under its lock; a public has_node()
                # accessor would be the cleaner long-term API.)
                async with self._graph._lock:
                    episode_in_graph = entry.entry_id in self._graph._nodes
if episode_in_graph:
edge = MemoryEdge(
source_id=fact_id,
target_id=entry.entry_id,
edge_type="derived_from",
weight=0.8,
)
await self._graph.add_edge(edge)
# Register retention state for the new fact
if self._retention.get(fact_id) is None:
self._retention.register(fact_id, initial_stability=2.0)
            # Mark this entry as abstracted to avoid reprocessing. Mutating
            # the live object suffices for the in-memory store; a persistent
            # backend would need an explicit store/update call here.
            entry.tags.append("abstracted")
processed += 1
return processed
async def _reinforce_connections(self) -> None:
"""
        Update node importance scores based on PageRank.
        This is Hebb's rule applied at the graph level in aggregate:
        memories that are densely connected to other important,
        frequently accessed memories receive higher importance scores,
        making them more likely to surface during future retrieval.
"""
importance_scores = await self._graph.compute_importance_scores()
# Update node importance in the graph based on PageRank scores.
# Scale by 10 to map PageRank's small values (typically 0.001-0.01
# for large graphs) to a more useful 0-1 range.
for node_id, score in importance_scores.items():
scaled = min(score * 10.0, 1.0)
await self._graph.update_node_importance(node_id, scaled)
CHAPTER TWENTY-TWO: THE MEMORY CONTROLLER
The Memory Controller is the top-level orchestrator of UCMA. It provides a clean, simple API to the agent while managing all the complexity of the underlying memory architecture. The agent only needs to call observe(), remember(), learn(), and new_session(). Everything else is handled internally.
# ucma/controller.py
"""
Memory Controller: the top-level orchestrator of UCMA.
Provides a clean, simple API to the agent while managing all the complexity
of the underlying memory architecture. The observe() and remember() methods
form the fundamental read-write interface. All other methods are lifecycle
management (new_session, start, stop) or convenience wrappers.
"""
from __future__ import annotations
import logging
import time
import uuid
from ucma.config import UCMAConfig
from ucma.consolidation import SleepConsolidationEngine
from ucma.episodic_store import EpisodicMemoryStore
from ucma.forgetting import ForgettingEngine
from ucma.learning_store import LearningStore
from ucma.memory_graph import MemoryGraph
from ucma.models import (
EpisodicEntry,
LearningRecord,
LearningType,
MemoryEdge,
MemoryNode,
MemoryQuery,
MemoryType,
QueryMode,
RetrievalResult,
)
from ucma.procedural_store import ProceduralMemoryStore
from ucma.providers.base import EmbeddingProvider, ImportanceClassifierProvider
from ucma.retention import RetentionRegistry
from ucma.retrieval import UnifiedRetrievalLayer
from ucma.semantic_store import SemanticMemoryStore
from ucma.sensory_buffer import SensoryBuffer
from ucma.working_memory import WorkingMemoryManager
logger = logging.getLogger(__name__)
class MemoryController:
"""
The central orchestrator of UCMA.
Provides a clean, simple API to the agent while managing all the
complexity of the underlying memory architecture. The agent only
needs to call observe(), remember(), learn(), and new_session().
Everything else is handled internally.
"""
def __init__(
self,
config: UCMAConfig,
agent_id: str,
sensory_buffer: SensoryBuffer,
working_memory: WorkingMemoryManager,
episodic_store: EpisodicMemoryStore,
semantic_store: SemanticMemoryStore,
procedural_store: ProceduralMemoryStore,
learning_store: LearningStore,
memory_graph: MemoryGraph,
retrieval_layer: UnifiedRetrievalLayer,
consolidation_engine: SleepConsolidationEngine,
forgetting_engine: ForgettingEngine,
retention_registry: RetentionRegistry,
embedder: EmbeddingProvider,
importance_classifier: ImportanceClassifierProvider,
) -> None:
self._config = config
self._agent_id = agent_id
self._sensory = sensory_buffer
self._working = working_memory
self._episodic = episodic_store
self._semantic = semantic_store
self._procedural = procedural_store
self._learning = learning_store
self._graph = memory_graph
self._retrieval = retrieval_layer
self._consolidation = consolidation_engine
self._forgetting = forgetting_engine
self._retention = retention_registry
self._embedder = embedder
self._importance_classifier = importance_classifier
self._current_session_id: str = str(uuid.uuid4())
async def start(self) -> None:
"""Start background services (consolidation engine)."""
await self._consolidation.start()
logger.info(
"MemoryController started for agent '%s', session '%s'.",
self._agent_id,
self._current_session_id,
)
async def stop(self) -> None:
"""Gracefully stop all background services."""
await self._consolidation.stop()
logger.info("MemoryController stopped for agent '%s'.", self._agent_id)
async def observe(
self,
content: str,
source: str,
modality: str = "text",
) -> str:
"""
The primary input method: observe a new piece of information.
Called every time the agent receives input from the user, a tool,
or the environment. Classifies importance, creates an episodic
memory if the content is important enough, computes its embedding,
and registers it in the Memory Graph.
Returns the entry_id of the created sensory entry.
"""
# Step 1: Place in sensory buffer (always, regardless of importance)
entry = await self._sensory.ingest(content, source, modality)
# Step 2: Classify importance
importance = await self._importance_classifier.classify_importance(
content, source
)
# Step 3: Create episodic memory if important enough
if importance > 0.2:
await self._create_episodic_memory(
entry.entry_id, content, source, importance
)
logger.debug(
"Observed: source=%s importance=%.2f id=%s",
source, importance, entry.entry_id,
)
return entry.entry_id
async def _create_episodic_memory(
self,
entry_id: str,
content: str,
source: str,
importance: float,
) -> EpisodicEntry:
"""
Transform a sensory observation into a full episodic memory.
Computes the content embedding, stores the entry in the episodic
store, registers it as a node in the Memory Graph, and creates
a retention state for it.
"""
# Compute embedding for semantic search
embeddings = await self._embedder.embed([content])
embedding = embeddings[0] if embeddings else None
episodic = EpisodicEntry(
entry_id=entry_id,
session_id=self._current_session_id,
agent_id=self._agent_id,
timestamp=time.time(),
content=content,
importance=importance,
participants=[source],
embedding=embedding,
tier="short_term",
)
await self._episodic.store(episodic)
# Register in the Memory Graph
node = MemoryNode(
node_id=entry_id,
memory_type=MemoryType.EPISODIC,
importance=importance,
created_at=episodic.timestamp,
last_accessed=episodic.timestamp,
)
await self._graph.add_node(node)
# Register retention state
self._retention.register(entry_id, initial_stability=1.0)
return episodic
async def remember(self, query: MemoryQuery) -> list[RetrievalResult]:
"""
Retrieve memories relevant to the given query.
This is the primary read method. Results are ranked by the
multi-factor ranking formula and injected into the Working Memory
staging zone so the LLM can access them in its next response.
"""
results = await self._retrieval.retrieve(query)
# Inject top results into working memory staging zone
for result in results[:3]:
if result.content:
await self._working.add(
item_id=result.memory_id,
content=result.content,
zone="staging",
importance=result.importance,
)
return results
async def learn(
self,
insight: str,
learning_type: LearningType,
evidence_ids: list[str],
applicability_tags: list[str],
confidence: float = 0.5,
impact_score: float = 0.5,
) -> str:
"""
Store a new learning derived from experience.
Called by the agent or the consolidation engine when a significant
insight has been extracted from episodes. The learning is registered
in the Memory Graph and connected to its evidence nodes.
"""
learning_id = str(uuid.uuid4())
learning = LearningRecord(
learning_id=learning_id,
learning_type=learning_type,
title=insight[:80],
insight=insight,
evidence=evidence_ids,
applicability=applicability_tags,
confidence=confidence,
impact_score=impact_score,
)
await self._learning.store_learning(learning)
# Register in the Memory Graph as a high-importance node
node = MemoryNode(
node_id=learning_id,
memory_type=MemoryType.LEARNING,
importance=impact_score,
created_at=learning.created_at,
last_accessed=learning.created_at,
)
await self._graph.add_node(node)
# Connect to evidence nodes in the graph
for evidence_id in evidence_ids:
async with self._graph._lock:
evidence_exists = evidence_id in self._graph._nodes
if evidence_exists:
edge = MemoryEdge(
source_id=learning_id,
target_id=evidence_id,
edge_type="derived_from",
weight=0.8,
)
await self._graph.add_edge(edge)
# Register retention state with high initial stability
self._retention.register(learning_id, initial_stability=3.0)
logger.info(
"Stored learning '%s' type=%s confidence=%.2f",
insight[:60], learning_type.value, confidence,
)
return learning_id
async def new_session(self) -> str:
"""
Start a new conversation session.
Clears the active and staging zones of working memory (the system
zone, containing core instructions, is preserved). Triggers the
consolidation engine to process any pending memories from the
previous session. Returns the new session ID.
"""
old_session = self._current_session_id
self._current_session_id = str(uuid.uuid4())
# Clear dynamic working memory zones
cleared_active = await self._working.clear_zone("active")
cleared_staging = await self._working.clear_zone("staging")
logger.info(
"New session %s started (old: %s). Cleared %d active + %d staging items.",
self._current_session_id, old_session,
cleared_active, cleared_staging,
)
# Trigger consolidation of the previous session's memories
self._consolidation.wake_now()
return self._current_session_id
def trigger_consolidation(self) -> None:
"""
Manually trigger a consolidation cycle.
Call this when the agent knows it is entering an idle period
(e.g., end of business hours, waiting for a long-running task).
"""
self._consolidation.wake_now()
def stats(self) -> dict[str, object]:
"""Return a comprehensive statistics summary for observability."""
return {
"agent_id": self._agent_id,
"session_id": self._current_session_id,
"sensory_buffer": {
"size": self._sensory.size,
"total_ingested": self._sensory.total_ingested,
},
"working_memory": self._working.usage_summary(),
"episodic_store": self._episodic.stats(),
"semantic_store": self._semantic.stats(),
"procedural_store": self._procedural.stats(),
"learning_store": self._learning.stats(),
"memory_graph": self._graph.stats(),
"retention_registry_size": len(self._retention),
}
CHAPTER TWENTY-THREE: PACKAGE INIT FILES
# ucma/__init__.py
"""
Unified Cognitive Memory Architecture (UCMA).
A complete, biologically-inspired memory system for LLM agents.
Import the build_memory_controller factory from ucma.main for
the simplest way to get started.
"""
from ucma.config import UCMAConfig
from ucma.controller import MemoryController
from ucma.models import (
EpisodicEntry,
LearningRecord,
LearningType,
MemoryQuery,
MemoryType,
QueryMode,
RetrievalResult,
SemanticFact,
)
__all__ = [
"UCMAConfig",
"MemoryController",
"MemoryQuery",
"MemoryType",
"QueryMode",
"RetrievalResult",
"EpisodicEntry",
"SemanticFact",
"LearningRecord",
"LearningType",
]
# ucma/providers/__init__.py
"""Provider protocol definitions and concrete implementations."""
from ucma.providers.base import (
EmbeddingProvider,
FactExtractionProvider,
ImportanceClassifierProvider,
SummarizationProvider,
)
__all__ = [
"EmbeddingProvider",
"SummarizationProvider",
"FactExtractionProvider",
"ImportanceClassifierProvider",
]
CHAPTER TWENTY-FOUR: WIRING AND EXAMPLE USAGE
The build_memory_controller() factory function wires all components together with a single function call. This is the recommended way to instantiate UCMA in production. The factory handles all dependency injection, ensuring that every component receives exactly the dependencies it needs.
# ucma/main.py
"""
UCMA factory and end-to-end usage example.
The build_memory_controller() factory function wires all components together
with a single function call. The example at the bottom of this file
demonstrates UCMA across two simulated sessions, showing how memory
accumulates, consolidates, and improves agent responses over time.
To run:
export OPENAI_API_KEY="sk-..."
python -m ucma.main
"""
from __future__ import annotations
import asyncio
import logging
import os
import sys
import structlog
from ucma.config import UCMAConfig
from ucma.consolidation import SleepConsolidationEngine
from ucma.controller import MemoryController
from ucma.episodic_store import EpisodicMemoryStore
from ucma.forgetting import ForgettingEngine
from ucma.learning_store import LearningStore
from ucma.memory_graph import MemoryGraph
from ucma.models import LearningType, MemoryQuery, MemoryType, QueryMode
from ucma.procedural_store import ProceduralMemoryStore
from ucma.providers.openai_provider import (
OpenAIEmbeddingProvider,
OpenAIFactExtractionProvider,
OpenAIImportanceClassifier,
OpenAISummarizationProvider,
)
from ucma.retention import RetentionRegistry
from ucma.retrieval import UnifiedRetrievalLayer
from ucma.semantic_store import SemanticMemoryStore
from ucma.sensory_buffer import SensoryBuffer
from ucma.working_memory import WorkingMemoryManager
def _configure_logging() -> None:
"""Configure structured logging for the UCMA system."""
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name,
structlog.dev.ConsoleRenderer(),
],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s %(levelname)s %(message)s",
stream=sys.stdout,
)
def build_memory_controller(
agent_id: str,
config: UCMAConfig | None = None,
api_key: str | None = None,
) -> MemoryController:
"""
Factory function: wire all UCMA components and return a MemoryController.
Parameters
----------
agent_id:
A unique identifier for this agent instance. Used to tag
episodic memories so that multi-agent systems can distinguish
memories from different agents.
config:
A UCMAConfig instance. If None, the default configuration is used.
api_key:
OpenAI API key. If None, reads from the OPENAI_API_KEY environment
variable.
"""
cfg = config or UCMAConfig()
key = api_key or os.environ.get("OPENAI_API_KEY", "")
# ------------------------------------------------------------------ #
# Instantiate providers #
# ------------------------------------------------------------------ #
embedder = OpenAIEmbeddingProvider(model=cfg.embedding_model, api_key=key)
summarizer = OpenAISummarizationProvider(api_key=key)
fact_extractor = OpenAIFactExtractionProvider(api_key=key)
importance_classifier = OpenAIImportanceClassifier(api_key=key)
# ------------------------------------------------------------------ #
# Instantiate stores #
# ------------------------------------------------------------------ #
sensory_buffer = SensoryBuffer(capacity=cfg.sensory_buffer_capacity)
working_memory = WorkingMemoryManager(config=cfg)
episodic_store = EpisodicMemoryStore(short_term_ttl_hours=cfg.short_term_ttl_hours)
semantic_store = SemanticMemoryStore()
procedural_store = ProceduralMemoryStore()
learning_store = LearningStore()
retention_registry = RetentionRegistry()
# ------------------------------------------------------------------ #
# Instantiate graph #
# ------------------------------------------------------------------ #
memory_graph = MemoryGraph(config=cfg)
# ------------------------------------------------------------------ #
# Instantiate retrieval layer #
# ------------------------------------------------------------------ #
retrieval_layer = UnifiedRetrievalLayer(
config=cfg,
episodic_store=episodic_store,
semantic_store=semantic_store,
procedural_store=procedural_store,
learning_store=learning_store,
memory_graph=memory_graph,
embedder=embedder,
retention_registry=retention_registry,
)
# ------------------------------------------------------------------ #
# Instantiate forgetting engine #
# ------------------------------------------------------------------ #
forgetting_engine = ForgettingEngine(
config=cfg,
episodic_store=episodic_store,
semantic_store=semantic_store,
procedural_store=procedural_store,
learning_store=learning_store,
memory_graph=memory_graph,
retention_registry=retention_registry,
)
# ------------------------------------------------------------------ #
# Instantiate sleep consolidation engine #
# ------------------------------------------------------------------ #
consolidation_engine = SleepConsolidationEngine(
config=cfg,
episodic_store=episodic_store,
semantic_store=semantic_store,
procedural_store=procedural_store,
learning_store=learning_store,
memory_graph=memory_graph,
retention_registry=retention_registry,
forgetting_engine=forgetting_engine,
embedder=embedder,
summarizer=summarizer,
fact_extractor=fact_extractor,
)
# ------------------------------------------------------------------ #
# Instantiate and return the Memory Controller #
# ------------------------------------------------------------------ #
return MemoryController(
config=cfg,
agent_id=agent_id,
sensory_buffer=sensory_buffer,
working_memory=working_memory,
episodic_store=episodic_store,
semantic_store=semantic_store,
procedural_store=procedural_store,
learning_store=learning_store,
memory_graph=memory_graph,
retrieval_layer=retrieval_layer,
consolidation_engine=consolidation_engine,
forgetting_engine=forgetting_engine,
retention_registry=retention_registry,
embedder=embedder,
importance_classifier=importance_classifier,
)
async def run_example() -> None:
"""
End-to-end example: two sessions with a coding assistant agent.
Session 1: Michael asks for help implementing JWT authentication
in FastAPI. They encounter and fix a token expiry bug. The agent
stores a learning from this experience.
Between sessions: the Sleep Consolidation Engine runs, compressing
episodic memories and extracting semantic facts.
Session 2: Michael returns and asks about JWT. The agent recalls
the previous session's context and the learning about token expiry,
providing a better response than a stateless LLM could.
"""
_configure_logging()
logger = logging.getLogger(__name__)
mc = build_memory_controller(agent_id="coding-assistant-001")
await mc.start()
try:
# ---------------------------------------------------------------- #
# SESSION 1 #
# ---------------------------------------------------------------- #
logger.info("--- Session 1 Start ---")
session1_id = mc._current_session_id
obs1 = await mc.observe(
"Michael is building a FastAPI application with JWT authentication.",
source="user",
)
obs2 = await mc.observe(
"The JWT token expiry is set to 30 minutes but users are being "
"logged out after only 5 minutes. Suspected clock skew issue.",
source="user",
)
obs3 = await mc.observe(
"Fixed: added a 60-second leeway to the JWT decode call to "
"handle clock skew between servers. Issue resolved.",
source="tool_result",
)
# Store a learning from this session
learning_id = await mc.learn(
insight=(
"When JWT tokens expire unexpectedly, check for clock skew "
"between the issuing server and the validating server. "
"Adding a leeway parameter to jwt.decode() resolves most cases."
),
learning_type=LearningType.FAILURE_ANALYSIS,
evidence_ids=[obs1, obs2, obs3],
applicability_tags=["jwt", "authentication", "fastapi", "debugging"],
confidence=0.85,
impact_score=0.80,
)
logger.info("Session 1 complete. Learning stored: %s", learning_id)
# Trigger consolidation between sessions
await mc.new_session()
# Give consolidation a moment to start (in real usage, hours pass)
await asyncio.sleep(0.5)
# ---------------------------------------------------------------- #
# SESSION 2 #
# ---------------------------------------------------------------- #
logger.info("--- Session 2 Start ---")
await mc.observe(
"Michael is back. He wants to add refresh token support to his "
"FastAPI JWT implementation.",
source="user",
)
# Retrieve memories relevant to the new task
results = await mc.remember(
MemoryQuery(
query_text="JWT authentication FastAPI token issues",
mode=QueryMode.HYBRID,
max_results=5,
include_graph_neighbors=True,
)
)
logger.info("Session 2 retrieved %d memories:", len(results))
for i, result in enumerate(results, 1):
logger.info(
" [%d] type=%-10s score=%.3f store=%-20s content=%.80s",
i,
result.memory_type.value,
result.relevance_score,
result.source_store,
result.content,
)
# Print system stats
stats = mc.stats()
logger.info("System stats: %s", stats)
finally:
await mc.stop()
if __name__ == "__main__":
asyncio.run(run_example())
CHAPTER TWENTY-FIVE: SPEED -- THE ENGINEERING OF FAST MEMORY
A memory system that is architecturally correct but too slow to use in real time is worthless. LLM applications have strict latency requirements: users expect responses in seconds, not minutes. The retrieval system must return results in milliseconds, not seconds. This section addresses the engineering choices that make UCMA fast and describes the path from the in-memory implementation shown here to a production-scale deployment.
The most important speed decision is the choice of vector index. For semantic search over millions of embeddings, exact nearest-neighbor search is too slow -- it requires computing the distance to every stored vector. The solution is Approximate Nearest Neighbor (ANN) search using the HNSW (Hierarchical Navigable Small World) algorithm. HNSW builds a multi-layer graph where each layer is a progressively sparser subset of the data. Queries start at the top layer (sparse, fast to traverse) and progressively descend to lower layers (denser, more precise). This gives sub-millisecond search times even over tens of millions of vectors, with recall rates above 95%.
The current implementation uses exact cosine similarity search, which is correct for any store size and is the right choice for development and testing. Replacing it with HNSW requires only swapping the inner loop in UnifiedRetrievalLayer._semantic_retrieval() with a call to an ANN index. The hnswlib library provides fast Python bindings to a C++ implementation:
# This replaces the inner loop in _semantic_retrieval() for the episodic store.
# Install: pip install hnswlib
import hnswlib
# Build the index once (or load from disk):
dim = cfg.embedding_dim
index = hnswlib.Index(space="cosine", dim=dim)
index.init_index(max_elements=1_000_000, ef_construction=200, M=16)
# Add all episodic embeddings to the index. hnswlib requires integer
# labels, and Python's built-in hash() of strings is salted per process
# (so labels would differ across runs); an explicit counter plus a
# label -> entry_id map is the safe way to translate results back:
label_to_id: dict[int, str] = {}
for label, entry in enumerate(all_episodic):
    if entry.embedding:
        index.add_items([entry.embedding], [label])
        label_to_id[label] = entry.entry_id
# Query:
index.set_ef(50)  # Higher ef = better recall, slower query
labels, distances = index.knn_query([query_vector], k=10)
matched_ids = [label_to_id[int(label)] for label in labels[0]]
# hnswlib's "cosine" distance is 1 - cosine_similarity, so convert back:
# similarity = 1.0 - distance
The second speed decision is the embedding cache. The EmbeddingCache implemented in Chapter Seventeen eliminates redundant embedding model calls for repeated queries. In a session where the user asks several related questions, the cache hit rate typically exceeds 60%, reducing embedding API costs and latency by more than half.
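The essence of the pattern fits in a short sketch: an LRU map keyed by a content hash, wrapped around any embedding provider. The class name and eviction policy below are illustrative, not the exact implementation from the earlier chapter:
# A minimal sketch of the embedding-cache pattern. Illustrative names;
# any object satisfying the EmbeddingProvider protocol can be wrapped.
import hashlib
from collections import OrderedDict
from ucma.providers.base import EmbeddingProvider

class LRUEmbeddingCache:
    def __init__(self, inner: EmbeddingProvider, max_entries: int = 10_000) -> None:
        self._inner = inner
        self._cache: OrderedDict[str, list[float]] = OrderedDict()
        self._max = max_entries

    @staticmethod
    def _key(text: str) -> str:
        # Content-addressed key: identical text always maps to the same slot
        return hashlib.sha256(text.encode("utf-8")).hexdigest()

    async def embed(self, texts: list[str]) -> list[list[float]]:
        results: list[list[float] | None] = [None] * len(texts)
        misses: list[tuple[int, str]] = []
        for i, text in enumerate(texts):
            key = self._key(text)
            if key in self._cache:
                self._cache.move_to_end(key)  # refresh LRU position
                results[i] = self._cache[key]
            else:
                misses.append((i, text))
        if misses:
            # One batched provider call covers all cache misses
            fresh = await self._inner.embed([t for _, t in misses])
            for (i, text), vector in zip(misses, fresh):
                results[i] = vector
                self._cache[self._key(text)] = vector
                if len(self._cache) > self._max:
                    self._cache.popitem(last=False)  # evict least recently used
        return [v for v in results if v is not None]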
The third speed decision is query routing. The QueryRouter classifies queries in microseconds using pure string matching, before any expensive operations are performed. A query that contains "yesterday" only hits the temporal index. A query that starts with "how to" only hits the procedural store. A query that starts with "what do you know about" only hits the entity index. This selective routing dramatically reduces the number of store accesses per query.
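A sketch of the routing idea follows; the marker tuples and route names are illustrative, but the principle is exactly this kind of plain string matching, which is why it runs in microseconds:
# Keyword-based query routing, sketched with illustrative route names.
TEMPORAL_MARKERS = ("yesterday", "last week", "this morning", "days ago")

def route_query(query_text: str) -> set[str]:
    q = query_text.lower().strip()
    targets: set[str] = set()
    if any(marker in q for marker in TEMPORAL_MARKERS):
        targets.add("temporal_index")
    if q.startswith(("how to", "how do i")):
        targets.add("procedural_store")
    if q.startswith("what do you know about"):
        targets.add("entity_index")
    # No cheap signal matched: fall back to a full hybrid search
    return targets or {"episodic", "semantic", "learning"}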
The fourth speed decision is asynchronous consolidation. The Sleep Consolidation Engine runs in a background asyncio Task and never blocks the main agent loop. Consolidation is a batch operation that can tolerate latency; retrieval cannot. By separating these concerns into different execution contexts, UCMA ensures that consolidation never adds latency to the critical path.
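In usage terms the pattern looks like the sketch below. The handler names and the idle detector are illustrative; interrupt(), observe(), and trigger_consolidation() are the real API:
# Keeping consolidation off the critical path, sketched.
from ucma.consolidation import SleepConsolidationEngine
from ucma.controller import MemoryController

async def handle_turn(
    engine: SleepConsolidationEngine,
    mc: MemoryController,
    user_message: str,
) -> None:
    # The agent is active: any in-flight cycle aborts after its current phase
    engine.interrupt()
    await mc.observe(user_message, source="user")
    # ... retrieve context with mc.remember(...) and generate the response ...

def on_idle(mc: MemoryController) -> None:
    # Idle period detected: clear the interrupt and run a cycle immediately
    mc.trigger_consolidation()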
CHAPTER TWENTY-SIX: WHAT IS STILL MISSING -- HONEST LIMITATIONS
UCMA as described here is a substantial advance over current approaches, but intellectual honesty requires acknowledging what it does not yet solve and where future research must go.
The first open problem is importance classification. The heuristic-based importance classifier described here is fast but imprecise. Accurately predicting the importance of a piece of information is extremely hard: what seems trivial now may turn out to be critical later. A truly intelligent importance classifier would need to understand the agent's goals, the user's context, and the long-term trajectory of the work -- which is itself an LLM-level reasoning task. Running a full LLM call for every piece of incoming information is too expensive. This is an area where specialized, fine-tuned small models could make a major contribution.
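One plausible mitigation, sketched below, is a cascade: a cheap heuristic scores every observation, and only the ambiguous middle band pays for a model call. The keyword list, band boundaries, and the llm_classify callable are illustrative assumptions, not part of UCMA:
# A two-stage importance cascade, sketched with illustrative values.
from collections.abc import Awaitable, Callable

SALIENT = ("error", "fixed", "decided", "deadline", "prefer", "always")

async def cascade_importance(
    content: str,
    source: str,
    llm_classify: Callable[[str, str], Awaitable[float]],
) -> float:
    score = 0.3
    if source in ("user", "tool_result"):
        score += 0.2
    if any(kw in content.lower() for kw in SALIENT):
        score += 0.2
    if score < 0.35 or score > 0.65:
        return min(score, 1.0)  # confident either way: skip the model call
    return await llm_classify(content, source)  # ambiguous band: escalate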
The second open problem is memory interference. In human memory, similar memories can interfere with each other -- remembering one thing makes it harder to remember a related but distinct thing. UCMA does not model this. In fact, UCMA's graph-based associative retrieval could exacerbate interference by always retrieving similar memories together. A future version of UCMA would need a mechanism to detect and manage interference between memories that are similar but distinct.
The third open problem is multi-agent memory sharing. UCMA as described is a per-agent memory system. In a multi-agent system, agents need to share some memories (shared world knowledge, shared task state) while keeping others private (agent-specific strategies, private user information). Designing a principled access control and synchronization layer for shared memory in a multi-agent UCMA is a significant engineering and research challenge.
The fourth open problem is memory verification and hallucination prevention. When the agent retrieves a memory and uses it in its response, it may be retrieving a memory that was itself generated by the LLM and contains hallucinated content. UCMA stores what the agent observed and what it generated, without distinguishing between ground truth and confabulation. A future version would need a provenance system that tracks the original source of every memory and flags memories derived from LLM generation rather than external observation.
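A first step would be a provenance tag attached to every memory at creation time. The enum below is a hypothetical sketch of such a future addition, not part of UCMA as built:
# Hypothetical provenance tagging for a future UCMA version.
from enum import Enum

class Provenance(str, Enum):
    EXTERNAL = "external"    # observed from the user, a tool, or the environment
    GENERATED = "generated"  # produced by the LLM itself; may be confabulated
    DERIVED = "derived"      # abstracted from other memories during consolidation

# Retrieval could then down-weight or explicitly flag GENERATED-provenance
# memories before injecting them into the context window.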
The fifth open problem is the cold start problem. A new agent with no memories is no better than a stateless LLM. Building up a rich, useful memory store takes time and interaction. Future work could explore pre-populating the semantic memory store from domain-specific knowledge bases, or using transfer learning to initialize a new agent's memory from the aggregated (anonymized) memories of many previous agents in the same domain.
The sixth and perhaps most profound open problem is the question of what memories mean to an LLM. Human memories are experienced -- they have emotional valence, they are connected to bodily sensations, they exist in the context of a continuous subjective experience. LLM memories are text stored in a database. Whether this difference matters for the quality and utility of the memory system is an open philosophical and empirical question. What we can say with confidence is that the engineering architecture described here is the right foundation for exploring it.
CHAPTER TWENTY-SEVEN: INSTALLATION FILES
# requirements.txt
# Core LLM and embedding dependencies
openai>=1.30.0
tiktoken>=0.7.0
# Numerical computing (for cosine similarity)
numpy>=1.26.0
# Structured logging
structlog>=24.1.0
# Typing backports (newer typing features on Python 3.11)
typing-extensions>=4.11.0
# Optional: ANN index for production-scale semantic search
# hnswlib>=0.8.0
# Development and testing
# pytest>=8.2.0
# pytest-asyncio>=0.23.0
# pyproject.toml
[build-system]
requires = ["setuptools>=70.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "ucma"
version = "1.0.0"
description = "Unified Cognitive Memory Architecture for LLM Agents"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"openai>=1.30.0",
"tiktoken>=0.7.0",
"numpy>=1.26.0",
"typing-extensions>=4.11.0",
"structlog>=24.1.0",
]
[project.optional-dependencies]
ann = ["hnswlib>=0.8.0"]
dev = [
"pytest>=8.2.0",
"pytest-asyncio>=0.23.0",
"mypy>=1.10.0",
"ruff>=0.4.0",
]
[tool.setuptools.packages.find]
where = ["."]
include = ["ucma*"]
[tool.pytest.ini_options]
asyncio_mode = "auto"
[tool.mypy]
python_version = "3.11"
strict = true
ignore_missing_imports = true
Install the package in development mode with the following commands:
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
pip install -e ".[dev]"
export OPENAI_API_KEY="sk-..."
python -m ucma.main
CONCLUSION: TOWARD TRULY PERSISTENT INTELLIGENT AGENTS
We have traveled a long way from the blank slate of a stateless LLM to the rich, hierarchical, biologically-inspired architecture of UCMA. Let us take a moment to appreciate what we have built.
UCMA gives LLM agents a Sensory Buffer that handles the raw flow of incoming information without overwhelming the system. It gives them a Working Memory Manager that treats the precious context window as a managed resource rather than a dumping ground. It gives them an Episodic Memory Store with two tiers -- short-term richness and medium-term persistence -- connected by temporal indexes that make time-range queries fast. It gives them a Semantic Memory Store with a knowledge graph that supports both similarity search and relational reasoning. It gives them a Procedural Memory Store that learns and improves with every successful execution. It gives them a Memory Graph that connects all memories into a coherent network, enabling associative retrieval and causal reasoning. It gives them a Sleep Consolidation Engine that transforms raw experience into structured knowledge through compression, abstraction, reinforcement, and pruning. It gives them a Forgetting Engine that implements the Ebbinghaus forgetting curve to keep the system clean and fast. It gives them a Learning Store that accumulates wisdom from experience. And it gives them a Unified Retrieval Layer that makes all of this accessible through a single, fast, intelligent interface.
This is not a patch on top of a stateless LLM. This is a complete cognitive memory architecture that transforms a stateless text processor into a persistent, learning, remembering agent. The agent that uses UCMA does not just answer questions -- it grows. It learns from its mistakes. It remembers its users. It builds up procedural expertise. It becomes, over time, genuinely better at its job.
Karpathy's vision of the LLM OS is the right frame. We have now designed the memory management subsystem of that operating system. The CPU (the LLM) is already extraordinarily powerful. The RAM (the context window) is already well-understood. What was missing was the full memory hierarchy -- the L1 cache, the L2 cache, the virtual memory system, the disk, and the intelligent OS kernel that manages them all. UCMA is that kernel.
The amnesia problem is solvable.