FOREWORD
The history of computing is littered with patterns that were invented for one purpose and turned out to be exactly right for another. The Actor Model, conceived by Carl Hewitt at MIT in 1973, was designed to reason about concurrent computation in a principled way. Fifty years later, it has proved to be one of the most elegant and practical foundations for building Agentic AI systems, where autonomous reasoning entities must collaborate, compete, and coordinate to solve complex problems. This article takes you on a deep journey through both worlds, showing you how to marry the Actor Model with modern Large Language Model (LLM) technology to build systems that are robust, scalable, observable, and even transactional.
We will build everything from first principles. We will write real code. We will think carefully about failure, concurrency, and correctness. By the end, you will have a complete mental model and a practical toolkit for building production-grade Agentic AI systems using the Actor pattern.
CHAPTER ONE: THE ACTOR MODEL - FOUNDATIONS AND PHILOSOPHY
1.1 What Is an Actor?
An Actor is the fundamental unit of computation in the Actor Model. It is a self-contained, autonomous object that encapsulates both state and behavior, and that communicates with the outside world exclusively through asynchronous message passing. This is not a minor implementation detail - it is the defining characteristic that makes the Actor Model so powerful.
Think of an Actor as a very disciplined bureaucrat sitting in a sealed office. The bureaucrat has their own filing cabinet (private state), their own set of skills and procedures (behavior), and an inbox on their desk (message queue). Anyone who wants something from this bureaucrat must write a letter and drop it in the inbox. The bureaucrat reads letters one at a time, processes each one, possibly writes letters to other bureaucrats, possibly updates their filing cabinet, and then picks up the next letter. Nobody can walk into the office and rummage through the filing cabinet directly. Nobody can interrupt the bureaucrat mid-task. The only interface is the inbox.
This model gives us three profound guarantees. First, there are no data races on shared mutable state, because no two actors ever touch the same memory simultaneously. Second, the system is naturally concurrent, because thousands of bureaucrats can be processing their inboxes at the same time. Third, the system is naturally distributed, because it does not matter whether the bureaucrat's office is in the same building or on the other side of the world - you still just drop a letter in their inbox.
The formal definition of an Actor specifies that upon receiving a message, an Actor can do exactly three things: it can send a finite number of messages to other Actors, it can create a finite number of new Actors, and it can designate the behavior to be used for the next message it receives, which effectively means updating its own state or changing its own behavior.
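This three-part definition can be made concrete in a few lines. The sketch below is illustrative only - the Counter class and its behaviour-switching style are not part of the system built later in this article - but it shows the third capability, designating the behaviour for the next message, in miniature; the other two capabilities are indicated in comments.

```python
# A toy actor illustrating the formal definition: on each message it
# may (1) send messages, (2) create new actors, and (3) designate the
# behaviour used for the next message. Names here are illustrative.

class Counter:
    """Counts messages, then switches to an 'ignore everything' behaviour."""
    def __init__(self):
        self.count = 0
        self.behaviour = self.counting   # behaviour for the next message

    def receive(self, message):
        # The runtime would call this once per dequeued message.
        self.behaviour(message)

    def counting(self, message):
        self.count += 1
        # Here the actor could also send messages to peers or spawn
        # new actors (capabilities 1 and 2 of the formal definition).
        if self.count >= 3:
            self.behaviour = self.saturated   # capability 3: "become"

    def saturated(self, message):
        pass   # ignore all further messages

c = Counter()
for _ in range(5):
    c.receive("tick")
# c.count stays at 3: the actor changed its own behaviour mid-stream
```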
The figure below captures the essential anatomy of a single Actor and the flow of messages through it.
+---------------------------------------------------------------+
| ACTOR SYSTEM |
| |
| Actor A Actor B |
| +--------------------+ +--------------------+ |
| | Private State | | Private State | |
| | +--------------+ | | +--------------+ | |
| | | memory | | message | | memory | | |
| | | context | | --------> | | context | | |
| | | config | | | | config | | |
| | +--------------+ | | +--------------+ | |
| | | | | |
| | Inbox (Queue) | | Inbox (Queue) | |
| | +--------------+ | | +--------------+ | |
| | | msg3 | | | | msg1 | | |
| | | msg2 | | | +--------------+ | |
| | | msg1 <--- | | | | | |
| | +--------------+ | | v | |
| | | | | +------------+ | |
| | v | | | Behavior | | |
| | +------------+ | | | Handler | | |
| | | Behavior | | | +------------+ | |
| | | Handler | | | | |
| | +------------+ | +--------------------+ |
| +--------------------+ |
+---------------------------------------------------------------+
Figure 1: Two Actors within an Actor System. Each Actor owns its private state and processes messages from its inbox one at a time. Communication between actors happens exclusively by placing messages in the target actor's inbox. No actor can access another actor's private state directly.
1.2 Why Actors Are Perfect for AI Agents
An AI Agent, in the modern sense, is an autonomous reasoning entity that perceives its environment, decides on actions, executes those actions, and observes the results. It typically has access to tools, maintains memory across interactions, and may collaborate with other agents. When you list these properties and compare them to the Actor Model, the fit is almost uncanny.
An Agent needs to be autonomous and encapsulated, which is exactly what an Actor provides. An Agent needs to communicate with other agents without tight coupling, which is exactly what message passing provides. An Agent needs to maintain private state such as memory, context, and intermediate results without exposing it to other agents, which is exactly what Actor encapsulation provides. An Agent needs to be deployable on different machines or in different processes, which is exactly what the location-transparent message passing of the Actor Model enables.
Furthermore, the Actor Model gives us something that naive agent implementations often lack: a principled approach to concurrency and failure. In a naive multi-agent system, you might have agents calling each other's methods directly, sharing data structures, and creating all sorts of subtle race conditions and deadlocks. In an Actor-based system, these problems are eliminated by design.
The diagram below shows how the classic Sense-Think-Act loop of an AI agent maps naturally onto the Actor message-processing loop.
CLASSIC AGENT LOOP ACTOR MESSAGE-PROCESSING LOOP
+-------------------+ +----------------------------+
| | | |
| +----------+ | | +--------------------+ |
| | SENSE | | | | Receive message | |
| | perceive | | | | from inbox | |
| | environ. | | | +--------------------+ |
| +----+-----+ | | | |
| | | | v |
| v | | +--------------------+ |
| +----------+ | | | Inspect message | |
| | THINK | | <====> | | type and payload | |
| | reason & | | | +--------------------+ |
| | plan | | | | |
| +----+-----+ | | v |
| | | | +--------------------+ |
| v | | | Execute behavior: | |
| +----------+ | | | update state, | |
| | ACT | | | | send messages, | |
| | execute | | | | spawn new actors | |
| | actions | | | +--------------------+ |
| +----------+ | | | |
| | | | v |
| +----------+ | (wait for next message) |
| (loop) | +-----------------------+|
+-------------------+ +----------------------------+
Figure 2: The classic agent Sense-Think-Act loop maps directly onto the Actor message-processing loop. Sensing corresponds to receiving a message, thinking corresponds to inspecting it and running the behavior handler, and acting corresponds to sending messages and updating internal state.
1.3 The Constituents of an Actor System
A complete Actor system consists of several interacting components that work together to provide the full Actor Model semantics.
The Actor itself is the primary constituent. It holds private state that no other component can access directly. It holds a reference to its message queue, which serves as its inbox. It holds a behavior function or method that defines how it responds to each type of message. It may hold references to other actors, specifically their addresses rather than their internals, so it can send them messages.
The Message Queue, also called the mailbox or inbox, is the gatekeeper of the Actor. All incoming messages are placed in this queue by the runtime, and the Actor processes them one at a time in order. The queue is typically unbounded in the theoretical model, though practical implementations impose limits. The queue is the only way to interact with an Actor.
The Message itself is an immutable data structure that carries information from one Actor to another. Because messages are immutable, they can be safely shared across thread or process boundaries without copying in some implementations, or safely serialized and deserialized in distributed implementations. A message typically has a type or tag that identifies what kind of message it is, a payload that carries the actual data, and optionally a reply-to address so the recipient knows where to send a response.
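The Message class defined in the next chapter is not declared immutable at the language level, but the principle is easy to enforce in Python. The sketch below uses a hypothetical Note class, not the real protocol, to show how a frozen dataclass rejects mutation after construction.

```python
# Immutability in practice: a frozen dataclass raises an error on any
# attempt to mutate a field, which makes instances safe to share
# across threads without copying. 'Note' is a hypothetical example.
from dataclasses import dataclass, FrozenInstanceError

@dataclass(frozen=True)
class Note:
    tag: str
    body: str

n = Note(tag="PING", body="hello")
try:
    n.body = "tampered"          # rejected: the dataclass is frozen
    mutated = True
except FrozenInstanceError:
    mutated = False
# mutated is False; n.body is still "hello"
```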
The Actor Runtime, sometimes called the Actor System or ActorSystem, is the infrastructure that manages the lifecycle of Actors, delivers messages between them, and schedules their execution. In a thread-based implementation, the runtime might use a thread pool where each thread picks up an Actor with pending messages and runs its message handler. In a process-based implementation, the runtime uses operating system processes and inter-process communication mechanisms.
The Address, or Actor Reference, is what one Actor holds when it wants to communicate with another. It is not a pointer to the Actor's memory - it is an opaque identifier that the runtime uses to route messages. This indirection is what enables location transparency: the same code works whether the target Actor is in the same thread, the same process, or a remote machine.
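The sketch below illustrates this indirection with hypothetical Runtime and ActorRef classes (they are not part of the implementation developed later in this article). The sender only ever holds an opaque address; the runtime alone decides how delivery actually happens.

```python
# Location transparency in miniature: senders hold an opaque address,
# and the runtime routes the message. The delivery function could
# append to a local queue or write to a socket; the sender cannot
# tell the difference. Runtime and ActorRef are illustrative names.

class Runtime:
    def __init__(self):
        self._routes = {}            # address -> delivery function

    def register(self, address, deliver):
        self._routes[address] = deliver
        return ActorRef(self, address)

    def route(self, address, message):
        self._routes[address](message)

class ActorRef:
    """Opaque handle: holds an address, never the actor's memory."""
    def __init__(self, runtime, address):
        self._runtime = runtime
        self.address = address

    def send(self, message):
        self._runtime.route(self.address, message)

runtime = Runtime()
received = []
# "Local" delivery is just appending to a list; a remote transport
# would register a function that serialises and sends instead.
ref = runtime.register("worker-1", received.append)
ref.send({"type": "PING"})
# received == [{"type": "PING"}]
```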
CHAPTER TWO: DESIGNING THE COMMUNICATION PROTOCOL
2.1 The Importance of a Well-Defined Protocol
In any distributed system, the communication protocol is the contract between components. In an Actor-based agent system, this contract is especially important because it is the only interface that exists. There are no method calls, no shared objects, no direct memory access. If the protocol is poorly designed, the system will be brittle, hard to debug, and impossible to evolve.
A good Actor communication protocol for an AI agent system must satisfy several requirements. It must be self-describing, meaning that a message should carry enough information to be understood without external context. It must be versioned, so that the system can evolve without breaking existing actors. It must be serializable, so that messages can be sent across process or network boundaries. It must support both fire-and-forget semantics for notifications and request-response semantics for queries that expect an answer. And it must carry enough metadata for debugging, tracing, and monitoring.
2.2 Message Structure and Types
We will define a base message structure and a set of standard message types that all actors in our system understand. This forms the foundation of our protocol. The design deliberately keeps the message envelope small and consistent while allowing the payload to carry arbitrary typed data.
# protocol.py
#
# Defines the complete communication protocol for the Actor-based
# Agentic AI system. All messages exchanged between actors must
# conform to the structures defined here.
import uuid
import time
import json
from enum import Enum
from dataclasses import dataclass, field, asdict
from typing import Any, Optional, Dict
class MessageType(Enum):
"""
Enumerates every message type recognised by the actor system.
Adding a new message type here is the first step in extending
the protocol; the corresponding handler must then be registered
in each actor that needs to process it.
"""
# --- Lifecycle messages -------------------------------------------
INIT = "INIT" # Sent once when an actor starts up
SHUTDOWN = "SHUTDOWN" # Requests graceful actor shutdown
PING = "PING" # Health-check probe
PONG = "PONG" # Health-check acknowledgement
# --- General work messages ----------------------------------------
REQUEST = "REQUEST" # Generic work request
RESPONSE = "RESPONSE" # Generic work response
ERROR = "ERROR" # Signals a processing failure
# --- Agent-specific messages --------------------------------------
TASK = "TASK" # Assigns a high-level task to an agent
TOOL_CALL = "TOOL_CALL" # Asks a tool actor to run a tool
TOOL_RESULT = "TOOL_RESULT" # Returns the result of a tool execution
LLM_QUERY = "LLM_QUERY" # Sends a prompt to an LLM actor
LLM_REPLY = "LLM_REPLY" # Returns the LLM's generated response
# --- Memory messages ----------------------------------------------
MEMORY_STORE = "MEMORY_STORE" # Stores a fact in the memory actor
MEMORY_FETCH = "MEMORY_FETCH" # Retrieves facts from the memory actor
MEMORY_DATA = "MEMORY_DATA" # Carries retrieved memory data
# --- Transaction messages -----------------------------------------
TXN_BEGIN = "TXN_BEGIN" # Opens a distributed transaction
TXN_PREPARE = "TXN_PREPARE" # Phase-1 of two-phase commit
TXN_COMMIT = "TXN_COMMIT" # Commits a prepared transaction
TXN_ROLLBACK = "TXN_ROLLBACK" # Rolls back a transaction
TXN_ACK = "TXN_ACK" # Acknowledges a transaction directive
@dataclass
class Message:
"""
The universal envelope for all actor-to-actor communication.
Every field except 'payload' is part of the routing and tracing
infrastructure. The 'payload' field carries the actual business
data and is typed as Any so that each message type can define
its own payload schema independently.
"""
# The kind of message this is (drives dispatch in the handler)
msg_type: MessageType
# The address of the actor that sent this message
sender_id: str
# The address of the intended recipient
receiver_id: str
# Arbitrary business data; should be a dataclass or dict
payload: Any = None
# Unique identifier for this message (auto-generated)
msg_id: str = field(default_factory=lambda: str(uuid.uuid4()))
# Wall-clock creation time in seconds since epoch
timestamp: float = field(default_factory=time.time)
# Links this message to a parent message for tracing call chains
correlation_id: Optional[str] = None
# Protocol version for forward/backward compatibility
version: str = "1.0"
def to_json(self) -> str:
"""
Serialises the message to JSON for cross-process or
cross-network transport. The MessageType enum is converted
to its string value so JSON can represent it.
"""
d = asdict(self)
d["msg_type"] = self.msg_type.value
return json.dumps(d)
@classmethod
def from_json(cls, raw: str) -> "Message":
"""
Deserialises a message that arrived over a transport layer.
Reconstructs the MessageType enum from its string value.
"""
d = json.loads(raw)
d["msg_type"] = MessageType(d["msg_type"])
return cls(**d)
The Message class is intentionally simple. Every actor in the system, whether it is an LLM actor, a tool actor, a memory actor, or an orchestrator, speaks this same language. The msg_type field drives dispatch: when an actor receives a message, the very first thing it does is look at msg_type to decide which handler to invoke. The correlation_id field is crucial for request-response patterns: when Actor A sends a REQUEST to Actor B, Actor B copies the request's msg_id into the correlation_id field of its RESPONSE. Actor A can then match the response to the original request even if many other messages have arrived in the meantime.
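The matching logic can be sketched with a simplified dictionary-based envelope (a stand-in for the Message class, used here only so the example is self-contained):

```python
# Request-response correlation in miniature. The requester remembers
# the msg_id of each outstanding request; the responder echoes that
# id back as correlation_id, letting the requester pair them up.
import uuid

def make_msg(msg_type, sender, receiver, payload=None, correlation_id=None):
    """Simplified envelope standing in for the Message dataclass."""
    return {"msg_type": msg_type, "sender_id": sender,
            "receiver_id": receiver, "payload": payload,
            "msg_id": str(uuid.uuid4()),
            "correlation_id": correlation_id}

pending = {}   # msg_id of outstanding requests -> description

# Actor A sends a REQUEST to Actor B and records it as outstanding.
request = make_msg("REQUEST", "actor-a", "actor-b",
                   {"op": "sum", "args": [1, 2]})
pending[request["msg_id"]] = "waiting for sum"

# Actor B replies, echoing the request's msg_id as correlation_id.
response = make_msg("RESPONSE", "actor-b", "actor-a", {"result": 3},
                    correlation_id=request["msg_id"])

# Actor A matches the response to its outstanding request, even if
# unrelated messages arrived in between.
matched = response["correlation_id"] in pending
```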
2.3 Payload Schemas
While the Message envelope is universal, each message type has its own expected payload structure. Defining these payload schemas explicitly, rather than passing raw dictionaries, catches errors early and makes the protocol self-documenting.
# protocol.py (continued)
#
# Payload dataclasses define the expected structure of the 'payload'
# field for each message type. Using typed dataclasses instead of
# raw dicts gives us IDE support, runtime validation, and clear
# documentation of what each message carries.
@dataclass
class TaskPayload:
"""Payload for a TASK message sent to an agent actor."""
task_description: str # Human-readable task description
context: Dict = field(default_factory=dict) # Extra context
max_steps: int = 10 # Safety limit on reasoning steps
timeout_seconds: float = 120.0 # Maximum wall-clock time for the task
@dataclass
class LLMQueryPayload:
"""Payload for an LLM_QUERY message sent to an LLM actor."""
messages: list # OpenAI-style message list
model: str = "" # Empty means use the actor's default model
temperature: float = 0.7 # Sampling temperature
max_tokens: int = 2048 # Maximum tokens to generate
stream: bool = False # Whether to stream the response
@dataclass
class LLMReplyPayload:
"""Payload for an LLM_REPLY message returned by an LLM actor."""
content: str # The generated text
model_used: str = "" # Which model actually produced this
input_tokens: int = 0 # Tokens consumed by the prompt
output_tokens: int = 0 # Tokens in the generated response
finish_reason: str = "" # "stop", "length", "tool_calls", etc.
@dataclass
class ToolCallPayload:
"""Payload for a TOOL_CALL message sent to a tool actor."""
tool_name: str # Name of the tool to execute
arguments: Dict # Named arguments for the tool
call_id: str = field(default_factory=lambda: str(uuid.uuid4()))
@dataclass
class ToolResultPayload:
"""Payload for a TOOL_RESULT message returned by a tool actor."""
call_id: str # Matches the call_id from ToolCallPayload
result: Any = None # The tool's return value
error: Optional[str] = None # Non-None if the tool raised an error
@dataclass
class TransactionPayload:
"""Payload for all transaction-related messages."""
txn_id: str # Unique transaction identifier
participants: list = field(default_factory=list) # Actor IDs involved
data: Dict = field(default_factory=dict) # Transaction data
reason: str = "" # Human-readable reason (for rollback)
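Putting the envelope and a payload schema together, the following self-contained example (with the relevant protocol.py definitions condensed inline so it runs on its own) wraps a ToolCallPayload in a Message and round-trips it through JSON. Note one consequence of using asdict(): the payload dataclass is flattened to a plain dict during serialisation, so the receiver sees a dict unless it explicitly reconstructs the payload type.

```python
# Condensed copies of the protocol.py definitions above, inlined so
# this round-trip example is runnable in isolation.
import json, time, uuid
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Any, Dict, Optional

class MessageType(Enum):
    TOOL_CALL = "TOOL_CALL"

@dataclass
class ToolCallPayload:
    tool_name: str
    arguments: Dict
    call_id: str = field(default_factory=lambda: str(uuid.uuid4()))

@dataclass
class Message:
    msg_type: MessageType
    sender_id: str
    receiver_id: str
    payload: Any = None
    msg_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)
    correlation_id: Optional[str] = None
    version: str = "1.0"

    def to_json(self) -> str:
        d = asdict(self)                      # recurses into the payload
        d["msg_type"] = self.msg_type.value   # enums are not JSON-native
        return json.dumps(d)

    @classmethod
    def from_json(cls, raw: str) -> "Message":
        d = json.loads(raw)
        d["msg_type"] = MessageType(d["msg_type"])
        return cls(**d)

msg = Message(
    msg_type=MessageType.TOOL_CALL,
    sender_id="agent-1",
    receiver_id="tool-actor",
    payload=ToolCallPayload(tool_name="search", arguments={"q": "actors"}),
)
wire = msg.to_json()
restored = Message.from_json(wire)
# restored.payload is now a plain dict, not a ToolCallPayload:
# asdict() flattened the nested dataclass during serialisation.
```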
CHAPTER THREE: THE BASE ACTOR - THREAD AND PROCESS IMPLEMENTATIONS
3.1 Design Principles for the Base Actor
Before writing a single line of the base Actor class, it is worth articulating the design principles that will guide us. These principles are not arbitrary; they follow directly from the Actor Model's theoretical foundations and from hard-won practical experience with concurrent systems.
The first principle is that the Actor's inbox is its only public interface. No method on the Actor class should be called directly from outside the Actor, except for the send() method that places a message in the inbox. All other methods are private implementation details.
The second principle is that the Actor processes exactly one message at a time. This is the source of the Actor Model's freedom from data races. The message handler runs to completion before the next message is dequeued. This means that within a message handler, the Actor's private state can be read and written freely without any locks or synchronisation primitives.
The third principle is that the Actor is responsible for its own lifecycle. It starts itself, runs its message loop, handles its own errors, and shuts itself down gracefully when it receives a SHUTDOWN message. The runtime provides the infrastructure, but the Actor controls its own destiny.
The fourth principle is that failure is a first-class citizen. If a message handler raises an exception, the Actor should not crash silently. It should log the error, optionally send an ERROR message to a supervisor, and continue processing the next message. This is the Actor Model's approach to fault tolerance: isolate failures to individual actors and let the system continue.
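The fourth principle can be sketched independently of the full actor machinery. In the toy loop below (illustrative only), one poisoned message raises an exception, is recorded, and the loop simply moves on to the next message.

```python
# Failure isolation in miniature: a handler exception is caught,
# recorded, and the message loop continues. In a real actor this is
# where you would log the error and notify a supervisor.
processed, errors = [], []

def handler(msg):
    if msg == "bad":
        raise ValueError("handler failed")
    processed.append(msg)

for msg in ["a", "bad", "b"]:
    try:
        handler(msg)
    except Exception as exc:
        errors.append((msg, str(exc)))   # isolate: record and move on

# processed == ["a", "b"]; the failure was confined to one message
```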
3.2 Thread-Based Actor Implementation
The thread-based implementation is the simplest and most appropriate for single-machine deployments where actors need to share memory efficiently. Each Actor runs in its own thread, blocked on its inbox queue when there is no work to do, and woken up automatically when a message arrives.
# actor_base.py
#
# Provides the foundational BaseActor class that all concrete actors
# in the system inherit from. The thread-based implementation uses
# Python's threading module and queue.Queue for the inbox.
#
# Design note: We use daemon=True for actor threads so that the
# entire process exits cleanly when the main thread finishes,
# without needing to explicitly shut down every actor.
import threading
import queue
import logging
import traceback
from abc import ABC, abstractmethod
from typing import Dict, Callable, Optional
from protocol import Message, MessageType
logger = logging.getLogger(__name__)
class BaseActor(ABC):
"""
Abstract base class for all thread-based actors in the system.
Subclasses must implement on_message() to define their behaviour.
They may also override on_init() and on_shutdown() to perform
setup and teardown work when the actor starts and stops.
The actor runs its message loop in a dedicated daemon thread.
The only thread-safe public method is send(), which places a
message into the actor's inbox queue.
"""
def __init__(self, actor_id: str, inbox_maxsize: int = 0):
"""
Initialises the actor's identity, inbox, and internal state.
actor_id : Unique name for this actor within the system.
Used as the sender_id in outgoing messages.
inbox_maxsize: Maximum number of messages the inbox can hold.
0 means unlimited (the default).
"""
self.actor_id = actor_id
self._inbox = queue.Queue(maxsize=inbox_maxsize)
self._running = False
self._thread = None
# Registry of other actors this actor knows about.
# Maps actor_id -> BaseActor instance (for in-process actors)
# or actor_id -> queue.Queue (for cross-process actors).
self._registry: Dict[str, "BaseActor"] = {}
# Handlers are registered per MessageType. This dispatch table
# is populated by subclasses calling _register_handler().
self._handlers: Dict[MessageType, Callable] = {}
# Register the built-in lifecycle handlers
self._register_handler(MessageType.PING, self._handle_ping)
self._register_handler(MessageType.SHUTDOWN, self._handle_shutdown)
# ------------------------------------------------------------------
# Public interface: the ONLY methods intended to be called from
# outside the actor. Everything else is private.
# ------------------------------------------------------------------
def start(self) -> None:
"""
Starts the actor's message-processing thread.
Safe to call only once per actor instance.
"""
if self._running:
logger.warning(
"Actor %s is already running; ignoring start().",
self.actor_id
)
return
self._running = True
self._thread = threading.Thread(
target=self._run_loop,
name=f"actor-{self.actor_id}",
daemon=True
)
self._thread.start()
logger.info("Actor %s started.", self.actor_id)
def send(self, message: Message) -> None:
"""
Places a message into this actor's inbox.
This is the sole entry point for external communication.
Thread-safe: may be called from any thread.
"""
self._inbox.put(message)
def register_peer(self, actor: "BaseActor") -> None:
"""
Registers another actor so that this actor can send messages
to it by name. In a distributed system this would be replaced
by a network address lookup.
"""
self._registry[actor.actor_id] = actor
def join(self, timeout: Optional[float] = None) -> None:
"""Waits for the actor's thread to finish."""
if self._thread:
self._thread.join(timeout=timeout)
# ------------------------------------------------------------------
# Protected interface: intended for use by subclasses only.
# ------------------------------------------------------------------
def _register_handler(
self,
msg_type: MessageType,
handler: Callable
) -> None:
"""
Registers a handler function for a specific message type.
The handler receives the full Message object as its argument.
Subclasses call this in their __init__ to wire up their
message-handling logic.
"""
self._handlers[msg_type] = handler
def _send_to(self, target_id: str, message: Message) -> None:
"""
Looks up a peer actor by ID and delivers a message to it.
Raises KeyError if the target actor is not registered.
"""
if target_id not in self._registry:
raise KeyError(
f"Actor {self.actor_id} has no peer named '{target_id}'"
)
self._registry[target_id].send(message)
    def _reply(self, original: Message, msg_type: MessageType,
               payload=None) -> None:
"""
Convenience method for sending a reply to the sender of a
received message. Automatically sets the receiver_id and
correlation_id fields correctly.
"""
reply = Message(
msg_type = msg_type,
sender_id = self.actor_id,
receiver_id = original.sender_id,
payload = payload,
correlation_id = original.msg_id
)
self._send_to(original.sender_id, reply)
# ------------------------------------------------------------------
# Lifecycle hooks: override in subclasses for custom behaviour.
# ------------------------------------------------------------------
def on_init(self) -> None:
"""Called once before the message loop starts. Override freely."""
pass
def on_shutdown(self) -> None:
"""Called once after the message loop exits. Override freely."""
pass
@abstractmethod
def on_message(self, message: Message) -> None:
"""
The primary message handler. Subclasses must implement this.
It is called for every message whose type has no specific
handler registered via _register_handler().
"""
# ------------------------------------------------------------------
# Private implementation: never call these from outside the class.
# ------------------------------------------------------------------
def _run_loop(self) -> None:
"""
The actor's main message-processing loop. Runs in its own
thread. Calls on_init() once, then processes messages until
_running is set to False by _handle_shutdown().
"""
try:
self.on_init()
except Exception:
logger.error(
"Actor %s raised an exception in on_init():\n%s",
self.actor_id, traceback.format_exc()
)
while self._running:
try:
# Block for up to 1 second, then loop to check _running.
# This allows clean shutdown even if no SHUTDOWN message
# arrives (e.g., if the process is being terminated).
message = self._inbox.get(timeout=1.0)
except queue.Empty:
continue
try:
self._dispatch(message)
except Exception:
logger.error(
"Actor %s raised an unhandled exception "
"processing message %s:\n%s",
self.actor_id,
message.msg_id,
traceback.format_exc()
)
finally:
self._inbox.task_done()
try:
self.on_shutdown()
except Exception:
logger.error(
"Actor %s raised an exception in on_shutdown():\n%s",
self.actor_id, traceback.format_exc()
)
logger.info("Actor %s has stopped.", self.actor_id)
def _dispatch(self, message: Message) -> None:
"""
Routes an incoming message to the appropriate handler.
If a specific handler is registered for the message type,
it is called. Otherwise, the abstract on_message() fallback
is called, giving subclasses a catch-all hook.
"""
handler = self._handlers.get(message.msg_type)
if handler:
handler(message)
else:
self.on_message(message)
def _handle_ping(self, message: Message) -> None:
"""Responds to PING messages with a PONG."""
self._reply(message, MessageType.PONG,
payload={"status": "alive",
"actor_id": self.actor_id})
def _handle_shutdown(self, message: Message) -> None:
"""Handles a SHUTDOWN message by stopping the message loop."""
logger.info(
"Actor %s received SHUTDOWN from %s.",
self.actor_id, message.sender_id
)
self._running = False
The BaseActor class is the cornerstone of the entire system. Notice how the _run_loop method uses a one-second timeout on the inbox.get() call rather than blocking indefinitely. This is a subtle but important design choice: it allows the loop to periodically check the _running flag, which means the actor can be stopped cleanly even if no SHUTDOWN message ever arrives. In production systems you would also want to add metrics here, recording how long each message takes to process, how deep the inbox queue is, and how many messages of each type have been processed.
The _dispatch method implements a simple but powerful pattern: a dispatch table of handlers indexed by message type. This is far cleaner than a long chain of if/elif statements, and it allows subclasses to register handlers for specific message types without overriding the entire dispatch mechanism.
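The pattern is worth seeing in isolation. The standalone sketch below (plain functions and a dict, not the BaseActor class itself) shows dispatch by message type with a catch-all fallback:

```python
# The dispatch-table pattern in isolation: handlers indexed by
# message type, with a fallback for unregistered types. This mirrors
# what BaseActor._dispatch does, using plain dicts for brevity.
handled = []

def handle_ping(msg):
    handled.append("pong")

def handle_task(msg):
    handled.append(f"task:{msg['payload']}")

def fallback(msg):
    # Plays the role of the on_message() catch-all hook.
    handled.append(f"unhandled:{msg['msg_type']}")

handlers = {"PING": handle_ping, "TASK": handle_task}

def dispatch(msg):
    handlers.get(msg["msg_type"], fallback)(msg)

for m in [{"msg_type": "PING"},
          {"msg_type": "TASK", "payload": "summarise"},
          {"msg_type": "LLM_QUERY"}]:
    dispatch(m)
# handled == ["pong", "task:summarise", "unhandled:LLM_QUERY"]
```

Adding support for a new message type is then a one-line registration rather than another branch in an if/elif chain.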
3.3 Process-Based Actor Implementation
While thread-based actors are convenient for single-machine deployments, they are limited by Python's Global Interpreter Lock (GIL), which prevents true parallel execution of CPU-bound Python code across threads. For actors that do heavy computation, such as running a local LLM, a process-based implementation is more appropriate. Each actor runs in its own operating system process, with true parallelism and complete memory isolation.
The key difference in the process-based implementation is that the inbox cannot be a simple in-memory queue. It must be a queue that works across process boundaries. Python's multiprocessing module provides exactly this with multiprocessing.Queue, which uses operating system pipes or shared memory under the hood to transfer data between processes. Because data must cross a process boundary, all messages must be serialisable, which is why we implemented to_json() and from_json() on the Message class earlier.
# actor_process.py
#
# Process-based actor implementation. Each actor runs in its own
# OS process, providing true CPU parallelism and complete memory
# isolation. Communication uses multiprocessing.Queue, which
# serialises objects automatically using pickle.
#
# Use this implementation for CPU-intensive actors such as local
# LLM inference actors. Use BaseActor (thread-based) for I/O-bound
# actors such as tool actors that call external APIs.
import multiprocessing as mp
import logging
import traceback
from abc import ABC, abstractmethod
from typing import Dict, Callable, Optional
from protocol import Message, MessageType
# We must use 'spawn' start method on all platforms for safety,
# especially on macOS where 'fork' can cause deadlocks with certain
# libraries (including those used by PyTorch and MLX).
mp.set_start_method("spawn", force=True)
logger = logging.getLogger(__name__)
class ProcessActor(ABC):
"""
Abstract base class for process-isolated actors.
Each instance of this class manages a separate OS process.
The process runs _process_main(), which is the entry point
for the actor's message loop inside the child process.
Communication uses mp.Queue objects that are shared between
the parent and child process via OS-level IPC mechanisms.
"""
def __init__(self, actor_id: str, inbox_maxsize: int = 100):
self.actor_id = actor_id
# These queues are created in the parent process but are
# accessible from the child process after fork/spawn.
self._inbox = mp.Queue(maxsize=inbox_maxsize)
self._outbox = mp.Queue() # For sending results back
# Maps peer actor_id -> mp.Queue (their inbox)
self._peer_queues: Dict[str, mp.Queue] = {}
self._process: Optional[mp.Process] = None
def start(self) -> None:
"""Spawns the child process and starts the actor's message loop."""
self._process = mp.Process(
target=ProcessActor._process_main,
args=(self,),
name=f"actor-{self.actor_id}",
daemon=True
)
self._process.start()
logger.info(
"ProcessActor %s started (PID %d).",
self.actor_id, self._process.pid
)
def send(self, message: Message) -> None:
"""Places a message into this actor's inbox. Process-safe."""
self._inbox.put(message)
def register_peer_queue(
self, actor_id: str, inbox: mp.Queue
) -> None:
"""
Registers a peer actor's inbox queue so this actor can
send messages to it across process boundaries.
"""
self._peer_queues[actor_id] = inbox
@staticmethod
def _process_main(actor: "ProcessActor") -> None:
"""
        Entry point for the child process. Using a static method keeps
        the 'spawn' start-up path simple: only the actor instance is
        pickled and shipped to the child, not a bound method.
"""
logging.basicConfig(level=logging.INFO)
running = True
try:
actor.on_init()
except Exception:
logger.error(
"ProcessActor %s failed in on_init():\n%s",
actor.actor_id, traceback.format_exc()
)
return
while running:
try:
message = actor._inbox.get(timeout=1.0)
except Exception:
# Catches both queue.Empty and any IPC errors
continue
if message.msg_type == MessageType.SHUTDOWN:
running = False
continue
try:
actor.on_message(message)
except Exception:
logger.error(
"ProcessActor %s error on message %s:\n%s",
actor.actor_id,
message.msg_id,
traceback.format_exc()
)
try:
actor.on_shutdown()
except Exception:
logger.error(
"ProcessActor %s failed in on_shutdown():\n%s",
actor.actor_id, traceback.format_exc()
)
def _send_to_peer(
self, target_id: str, message: Message
) -> None:
"""Sends a message to a peer actor's inbox queue."""
if target_id not in self._peer_queues:
raise KeyError(
f"ProcessActor {self.actor_id} has no peer "
f"queue for '{target_id}'"
)
self._peer_queues[target_id].put(message)
def on_init(self) -> None:
"""Override to perform setup inside the child process."""
pass
def on_shutdown(self) -> None:
"""Override to perform teardown inside the child process."""
pass
@abstractmethod
def on_message(self, message: Message) -> None:
"""Subclasses must implement their message-handling logic here."""
def join(self, timeout: Optional[float] = None) -> None:
"""Waits for the child process to exit."""
if self._process:
self._process.join(timeout=timeout)
def terminate(self) -> None:
"""Forcefully terminates the child process."""
if self._process:
self._process.terminate()
From the outside, the process-based actor is architecturally identical to the thread-based one. Both expose a send() method and a start() method, and both process messages one at a time in a loop. The difference lies entirely in the execution environment: one uses a thread, the other a process. This uniformity is intentional. It means that the rest of the system (the orchestrators, the tool actors, the memory actors) never needs to know which kind of actor it is talking to. It just sends messages.
The diagram below shows the relationship between thread-based and process-based actors in a mixed deployment, which is the most common real-world scenario.
+================================================================+
| SINGLE MACHINE DEPLOYMENT |
| |
| OS Process: main application |
| +----------------------------------------------------------+ |
| | | |
| | Thread: OrchestratorActor Thread: MemoryActor | |
| | +----------------------+ +----------------------+ | |
| | | (BaseActor) | | (BaseActor) | | |
| | | Coordinates agents | | Stores/retrieves | | |
| | | Manages task flow | | conversation history | | |
| | +----------+-----------+ +----------+-----------+ | |
| | | mp.Queue ^ | | |
| | | or Queue | | | |
| +-------------|---------------------|-----|----------------+ |
| | | | |
| OS Process: LLM Actor | | |
| +---------------------------+ | | |
| | (ProcessActor) | | | |
| | Runs local LLM inference |<------+ | |
| | Uses GPU (CUDA/MLX/Vulkan)| | |
| | Full CPU/GPU parallelism +------------>+ |
| +---------------------------+ |
| |
| OS Process: ToolActor (web search, code exec, file I/O) |
| +----------------------------------------------------------+ |
| | (ProcessActor) Sandboxed tool execution environment | |
| +----------------------------------------------------------+ |
+================================================================+
Figure 3: A mixed deployment where I/O-bound actors (Orchestrator, Memory) run as threads within the main process, while CPU-intensive actors (LLM inference, sandboxed tool execution) run as separate OS processes. The message-passing interface is identical in both cases.
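To make this uniformity concrete, here is a minimal, self-contained sketch (the class name is illustrative, not part of our codebase). Both queue.Queue and multiprocessing.Queue expose the same put()/get() interface, which is what lets thread-based and process-based actors share one send() contract; the actor loop below is agnostic about which kind of queue it is given.

```python
import queue
import threading

# Illustrative sketch: a thread-backed echo actor whose inbox/outbox are
# duck-typed. Swapping queue.Queue for multiprocessing.Queue (and the
# thread for a process) leaves this loop unchanged.
class EchoActor:
    def __init__(self, inbox, outbox):
        self.inbox = inbox      # anything with get()/put()
        self.outbox = outbox

    def run(self):
        while True:
            msg = self.inbox.get()
            if msg == "SHUTDOWN":
                break
            self.outbox.put(f"echo:{msg}")

inbox, outbox = queue.Queue(), queue.Queue()
worker = threading.Thread(target=EchoActor(inbox, outbox).run, daemon=True)
worker.start()
inbox.put("hello")
result = outbox.get(timeout=2.0)
inbox.put("SHUTDOWN")
worker.join(timeout=2.0)
print(result)  # echo:hello
```

The caller never touches the actor's state; it only puts messages in the inbox and reads replies from the outbox, exactly as in the full implementations above.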
CHAPTER FOUR: THE LLM ACTOR - SUPPORTING MULTIPLE BACKENDS
4.1 The Challenge of Multi-Backend LLM Support
One of the most practically important actors in any Agentic AI system is the LLM actor: the actor responsible for running language model inference. The challenge is that the landscape of LLM inference backends is fragmented. NVIDIA GPUs use CUDA and are served by libraries like PyTorch with CUDA support or llama-cpp-python with a CUDA backend. Apple Silicon Macs use the MLX framework, which is optimised for the unified memory architecture of M-series chips. Cross-platform Vulkan support is emerging for AMD and Intel GPUs. And remote LLMs are accessed via HTTP APIs that follow the OpenAI specification.
The Actor pattern gives us a beautiful solution to this fragmentation. We define a single LLM actor interface, expressed entirely in terms of the message protocol we designed in Part Two. The LLM_QUERY message goes in; the LLM_REPLY message comes out. The implementation behind that interface can be swapped freely: CUDA, MLX, Vulkan, or a remote API. The rest of the system never needs to know which backend is in use.
4.2 The LLM Backend Abstraction
We start by defining an abstract backend class that all concrete backends must implement. This is the Strategy pattern applied to LLM inference.
# llm_backends.py
#
# Abstract and concrete LLM backend implementations.
# Each backend wraps a specific inference library or API and
# exposes a uniform generate() interface to the LLM actor.
#
# Supported backends:
# - OpenAIBackend : Remote API (OpenAI, Azure, Ollama, vLLM, etc.)
# - CUDABackend : Local inference via llama-cpp-python with CUDA
# - MLXBackend : Local inference via mlx-lm on Apple Silicon
# - VulkanBackend : Local inference via llama-cpp-python with Vulkan
#
# To add a new backend, subclass LLMBackend and implement generate().
import os
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Dict, Optional
logger = logging.getLogger(__name__)
@dataclass
class GenerationResult:
"""
Uniform result structure returned by every backend's generate().
Decouples the rest of the system from backend-specific response
objects.
"""
content: str
model_used: str
input_tokens: int
output_tokens: int
finish_reason: str
class LLMBackend(ABC):
"""
Abstract base for all LLM inference backends.
Subclasses wrap a specific library or API and translate the
generic generate() call into backend-specific operations.
"""
@abstractmethod
def generate(
self,
messages: List[Dict],
model: str,
temperature: float,
max_tokens: int
) -> GenerationResult:
"""
Runs inference and returns a GenerationResult.
messages : OpenAI-style list of {"role": ..., "content": ...}
model : Model name or path (backend-specific)
temperature : Sampling temperature (0.0 = greedy, 1.0 = creative)
max_tokens : Maximum number of tokens to generate
"""
@abstractmethod
def is_available(self) -> bool:
"""
Returns True if this backend's dependencies are installed
and its hardware is available. Used at startup to select
the best available backend automatically.
"""
# ------------------------------------------------------------------
# Remote API Backend (OpenAI-compatible)
# Works with OpenAI, Azure OpenAI, Ollama, vLLM, LM Studio, etc.
# ------------------------------------------------------------------
class OpenAIBackend(LLMBackend):
"""
Backend for any OpenAI-compatible HTTP API.
Set base_url to point to a local server (e.g., Ollama at
http://localhost:11434/v1) or leave it None for the real
OpenAI API.
"""
def __init__(
self,
api_key: str = "",
base_url: Optional[str] = None,
default_model: str = "gpt-4o"
):
self._default_model = default_model
self._client = None
self._api_key = api_key or os.getenv("OPENAI_API_KEY", "")
self._base_url = base_url
def _get_client(self):
"""Lazily initialises the openai client to avoid import cost."""
if self._client is None:
try:
import openai
self._client = openai.OpenAI(
api_key = self._api_key,
base_url = self._base_url
)
except ImportError:
raise RuntimeError(
"openai package not installed. "
"Run: pip install openai"
)
return self._client
def generate(
self,
messages: List[Dict],
model: str,
temperature: float,
max_tokens: int
) -> GenerationResult:
client = self._get_client()
model_name = model or self._default_model
response = client.chat.completions.create(
model = model_name,
messages = messages,
temperature = temperature,
max_tokens = max_tokens
)
choice = response.choices[0]
return GenerationResult(
content = choice.message.content or "",
model_used = response.model,
input_tokens = response.usage.prompt_tokens,
output_tokens = response.usage.completion_tokens,
finish_reason = choice.finish_reason or "stop"
)
def is_available(self) -> bool:
try:
import openai # noqa: F401
return True
except ImportError:
return False
# ------------------------------------------------------------------
# CUDA Backend (llama-cpp-python with CUDA offloading)
# Requires: pip install llama-cpp-python --extra-index-url
# https://abetlen.github.io/llama-cpp-python/whl/cu121
# ------------------------------------------------------------------
class CUDABackend(LLMBackend):
"""
Runs local GGUF models using llama-cpp-python with CUDA GPU
offloading. The n_gpu_layers parameter controls how many
transformer layers are offloaded to the GPU; set it to -1
to offload all layers (recommended if VRAM is sufficient).
"""
def __init__(
self,
model_path: str,
n_gpu_layers: int = -1, # -1 = offload all layers
n_ctx: int = 8192, # Context window size
default_model: str = "local-cuda"
):
self._model_path = model_path
self._n_gpu_layers = n_gpu_layers
self._n_ctx = n_ctx
self._default_model = default_model
self._llm = None # Loaded lazily on first use
def _load_model(self):
"""Loads the GGUF model into GPU memory on first call."""
if self._llm is None:
try:
from llama_cpp import Llama
logger.info(
"Loading GGUF model from %s with %d GPU layers...",
self._model_path, self._n_gpu_layers
)
self._llm = Llama(
model_path = self._model_path,
n_gpu_layers = self._n_gpu_layers,
n_ctx = self._n_ctx,
verbose = False
)
logger.info("CUDA model loaded successfully.")
except ImportError:
raise RuntimeError(
"llama-cpp-python not installed with CUDA support."
)
return self._llm
def generate(
self,
messages: List[Dict],
model: str,
temperature: float,
max_tokens: int
) -> GenerationResult:
llm = self._load_model()
response = llm.create_chat_completion(
messages = messages,
temperature = temperature,
max_tokens = max_tokens
)
choice = response["choices"][0]
usage = response.get("usage", {})
return GenerationResult(
content = choice["message"]["content"] or "",
model_used = self._default_model,
input_tokens = usage.get("prompt_tokens", 0),
output_tokens = usage.get("completion_tokens", 0),
finish_reason = choice.get("finish_reason", "stop")
)
def is_available(self) -> bool:
try:
from llama_cpp import Llama # noqa: F401
import ctypes
# Check that CUDA runtime is accessible
ctypes.CDLL("libcudart.so")
return True
except (ImportError, OSError):
return False
# ------------------------------------------------------------------
# Apple MLX Backend
# Requires: pip install mlx-lm
# Only available on Apple Silicon (M1/M2/M3/M4) Macs.
# ------------------------------------------------------------------
class MLXBackend(LLMBackend):
"""
Runs local models using Apple's MLX framework, which is
optimised for the unified memory architecture of Apple Silicon.
MLX models are loaded from Hugging Face Hub in MLX format.
Example model: mlx-community/Mistral-7B-Instruct-v0.3-4bit
"""
def __init__(
self,
model_name: str,
default_model: str = "local-mlx"
):
self._model_name = model_name
self._default_model = default_model
self._model = None
self._tokenizer = None
def _load_model(self):
"""Loads the MLX model on first use."""
if self._model is None:
try:
from mlx_lm import load
logger.info(
"Loading MLX model: %s", self._model_name
)
self._model, self._tokenizer = load(self._model_name)
logger.info("MLX model loaded successfully.")
except ImportError:
raise RuntimeError(
"mlx-lm not installed. Run: pip install mlx-lm"
)
return self._model, self._tokenizer
def generate(
self,
messages: List[Dict],
model: str,
temperature: float,
max_tokens: int
) -> GenerationResult:
from mlx_lm import generate as mlx_generate
mdl, tokenizer = self._load_model()
# Apply the chat template to format the message list
# into a single prompt string that the model expects.
prompt = tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True
)
response = mlx_generate(
mdl,
tokenizer,
prompt = prompt,
max_tokens = max_tokens,
temp = temperature,
verbose = False
)
# mlx_generate returns the generated text only (no prompt)
return GenerationResult(
content = response,
model_used = self._default_model,
input_tokens = 0, # MLX does not expose token counts here
output_tokens = 0,
finish_reason = "stop"
)
def is_available(self) -> bool:
try:
import mlx_lm # noqa: F401
import platform
return (
platform.system() == "Darwin" and
platform.machine() == "arm64"
)
except ImportError:
return False
# ------------------------------------------------------------------
# Vulkan Backend (llama-cpp-python with Vulkan offloading)
# Requires: pip install llama-cpp-python with Vulkan build flags
# Works on AMD, Intel, and other Vulkan-capable GPUs.
# ------------------------------------------------------------------
class VulkanBackend(LLMBackend):
"""
Runs local GGUF models using llama-cpp-python compiled with
Vulkan support. This provides GPU acceleration on AMD and Intel
GPUs that are not supported by CUDA.
Build llama-cpp-python with Vulkan:
CMAKE_ARGS="-DGGML_VULKAN=on" pip install llama-cpp-python
"""
def __init__(
self,
model_path: str,
n_gpu_layers: int = -1,
n_ctx: int = 8192,
default_model: str = "local-vulkan"
):
self._model_path = model_path
self._n_gpu_layers = n_gpu_layers
self._n_ctx = n_ctx
self._default_model = default_model
self._llm = None
def _load_model(self):
if self._llm is None:
try:
from llama_cpp import Llama
logger.info(
"Loading GGUF model via Vulkan from %s...",
self._model_path
)
# llama-cpp-python uses the same Llama class for
# Vulkan; the GPU backend is selected at compile time.
self._llm = Llama(
model_path = self._model_path,
n_gpu_layers = self._n_gpu_layers,
n_ctx = self._n_ctx,
verbose = False
)
logger.info("Vulkan model loaded successfully.")
except ImportError:
raise RuntimeError(
"llama-cpp-python not installed with Vulkan support."
)
return self._llm
def generate(
self,
messages: List[Dict],
model: str,
temperature: float,
max_tokens: int
) -> GenerationResult:
llm = self._load_model()
response = llm.create_chat_completion(
messages = messages,
temperature = temperature,
max_tokens = max_tokens
)
choice = response["choices"][0]
usage = response.get("usage", {})
return GenerationResult(
content = choice["message"]["content"] or "",
model_used = self._default_model,
input_tokens = usage.get("prompt_tokens", 0),
output_tokens = usage.get("completion_tokens", 0),
finish_reason = choice.get("finish_reason", "stop")
)
def is_available(self) -> bool:
try:
from llama_cpp import Llama # noqa: F401
return True # Assume Vulkan if llama_cpp is installed
except ImportError:
return False
# ------------------------------------------------------------------
# Backend auto-selection factory
# ------------------------------------------------------------------
def select_best_backend(
model_path_or_name: str = "",
api_key: str = "",
base_url: Optional[str] = None
) -> LLMBackend:
"""
Automatically selects the best available LLM backend based on
the current hardware and installed packages.
Priority order:
1. MLX (Apple Silicon - fastest on M-series chips)
2. CUDA (NVIDIA GPU - fastest on NVIDIA hardware)
3. Vulkan (AMD/Intel GPU - good cross-platform GPU support)
4. OpenAI API (remote fallback - always available if key set)
Raises RuntimeError if no backend is available.
"""
candidates = [
MLXBackend(model_name=model_path_or_name),
CUDABackend(model_path=model_path_or_name),
VulkanBackend(model_path=model_path_or_name),
OpenAIBackend(api_key=api_key, base_url=base_url)
]
for backend in candidates:
if backend.is_available():
logger.info(
"Auto-selected backend: %s",
type(backend).__name__
)
return backend
raise RuntimeError(
"No LLM backend is available. Install at least one of: "
"mlx-lm, llama-cpp-python (with CUDA or Vulkan), or openai."
)
4.3 The LLM Actor
With the backend abstraction in place, the LLM actor itself is remarkably clean. It receives LLM_QUERY messages, delegates to whichever backend is configured, and sends back LLM_REPLY messages. Because it is a ProcessActor, it runs in its own OS process, which means that heavy GPU computation does not block any other actor in the system.
# llm_actor.py
#
# The LLM Actor wraps an LLM backend and exposes it to the rest of
# the actor system via the standard message protocol.
# It runs as a ProcessActor to isolate GPU memory and CPU usage.
import logging
from protocol import (
Message, MessageType, LLMQueryPayload, LLMReplyPayload
)
from actor_process import ProcessActor
from llm_backends import LLMBackend, GenerationResult
logger = logging.getLogger(__name__)
class LLMActor(ProcessActor):
"""
An actor that wraps an LLM backend and serves inference requests.
Receives : LLM_QUERY messages with an LLMQueryPayload
Sends : LLM_REPLY messages with an LLMReplyPayload
ERROR messages if inference fails
Because this actor runs in a separate process, the GPU memory
used by the model is completely isolated from the main process.
Multiple LLMActor instances can run in parallel on different
GPUs by pointing each at a different backend.
"""
def __init__(self, actor_id: str, backend: LLMBackend):
super().__init__(actor_id)
# The backend is stored here but will be used inside the
# child process after the process is spawned. For this to
# work with the 'spawn' start method, the backend object
# must be picklable. All our backend classes are picklable
# because they use lazy initialisation (models are loaded
# inside _load_model(), not in __init__).
self._backend = backend
def on_init(self) -> None:
"""
Called inside the child process when it starts.
We log a startup message here; heavy model loading is
deferred to the first actual inference request so that
startup is fast and errors are reported in context.
"""
logger.info(
"LLMActor %s initialised in process. Backend: %s",
self.actor_id,
type(self._backend).__name__
)
def on_message(self, message: Message) -> None:
"""
Handles all messages received by the LLM actor.
Only LLM_QUERY is expected in normal operation.
"""
if message.msg_type == MessageType.LLM_QUERY:
self._handle_llm_query(message)
else:
logger.warning(
"LLMActor %s received unexpected message type: %s",
self.actor_id, message.msg_type
)
def _handle_llm_query(self, message: Message) -> None:
"""
Processes an LLM_QUERY message by running inference and
sending back an LLM_REPLY to the original sender.
"""
payload: LLMQueryPayload = message.payload
logger.info(
"LLMActor %s running inference for %s "
"(model=%s, temp=%.2f, max_tokens=%d)",
self.actor_id,
message.sender_id,
payload.model or "default",
payload.temperature,
payload.max_tokens
)
try:
result: GenerationResult = self._backend.generate(
messages = payload.messages,
model = payload.model,
temperature = payload.temperature,
max_tokens = payload.max_tokens
)
reply_payload = LLMReplyPayload(
content = result.content,
model_used = result.model_used,
input_tokens = result.input_tokens,
output_tokens = result.output_tokens,
finish_reason = result.finish_reason
)
reply = Message(
msg_type = MessageType.LLM_REPLY,
sender_id = self.actor_id,
receiver_id = message.sender_id,
payload = reply_payload,
correlation_id = message.msg_id
)
self._send_to_peer(message.sender_id, reply)
except Exception as exc:
# Send an ERROR message back so the caller can handle
# the failure gracefully rather than waiting forever.
error_msg = Message(
msg_type = MessageType.ERROR,
sender_id = self.actor_id,
receiver_id = message.sender_id,
payload = {"error": str(exc)},
correlation_id = message.msg_id
)
self._send_to_peer(message.sender_id, error_msg)
logger.error(
"LLMActor %s inference failed: %s",
self.actor_id, exc
)
def on_shutdown(self) -> None:
"""Releases GPU memory by deleting the backend's model."""
if hasattr(self._backend, "_llm") and self._backend._llm:
del self._backend._llm
if hasattr(self._backend, "_model") and self._backend._model:
del self._backend._model
logger.info(
"LLMActor %s shut down; GPU memory released.",
self.actor_id
)
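Notice that the reply carries correlation_id = message.msg_id. This is what lets a caller with several requests in flight tie each LLM_REPLY back to the LLM_QUERY that caused it. The sketch below shows the mechanism in isolation; the Msg class is a minimal stand-in for our Message type.

```python
import uuid
from dataclasses import dataclass, field

# Stand-in for the article's Message type, reduced to the fields that
# matter for request/reply correlation.
@dataclass
class Msg:
    msg_type: str
    payload: dict
    msg_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    correlation_id: str = ""

pending = {}                                  # msg_id -> outstanding query
query = Msg("LLM_QUERY", {"prompt": "hi"})
pending[query.msg_id] = query                 # caller remembers the request

# The LLM actor stamps the reply with the query's msg_id:
reply = Msg("LLM_REPLY", {"content": "hello"},
            correlation_id=query.msg_id)
matched = pending.pop(reply.correlation_id)   # reply matched to its query
print(matched is query)  # True
```

The same correlation scheme works for TOOL_CALL/TOOL_RESULT and MEMORY_FETCH/MEMORY_DATA pairs, which is why every reply in our protocol sets correlation_id.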
CHAPTER FIVE: TOOL ACTORS AND THE MEMORY ACTOR
5.1 Tool Actors
In modern agentic systems, tools are the hands of the agent: they allow the agent to interact with the world beyond the LLM's parametric knowledge. A tool might search the web, execute Python code, read a file, query a database, call an external API, or perform any other side-effecting operation. In our Actor-based system, each tool or group of related tools is wrapped in a Tool Actor that receives TOOL_CALL messages and responds with TOOL_RESULT messages.
The sandboxing that the Actor Model provides is especially valuable for tool actors. Because a tool actor runs in its own process (or at minimum its own thread with no shared state), a misbehaving tool cannot corrupt the state of the agent that called it. If the tool raises an exception, the exception is caught inside the tool actor's message handler, wrapped in a TOOL_RESULT message with an error field, and sent back to the caller. The calling agent receives a clean error report rather than a stack trace that unwinds through its own code.
# tool_actor.py
#
# A flexible tool actor that hosts a registry of callable tools.
# Tools are registered as plain Python functions. The actor
# receives TOOL_CALL messages, looks up the named tool, calls it
# with the provided arguments, and returns a TOOL_RESULT.
#
# This actor uses the thread-based BaseActor because most tools
# are I/O-bound (network calls, file I/O) rather than CPU-bound.
# For CPU-intensive tools (e.g., image processing), use ProcessActor.
import logging
from typing import Dict, Callable, Any
from protocol import (
Message, MessageType, ToolCallPayload, ToolResultPayload
)
from actor_base import BaseActor
logger = logging.getLogger(__name__)
class ToolActor(BaseActor):
"""
Hosts a registry of tools and serves TOOL_CALL requests.
Tools are registered as plain Python callables. Each tool
receives keyword arguments from the message payload and returns
a result that is wrapped in a TOOL_RESULT message.
Example usage:
tool_actor = ToolActor("tools")
tool_actor.register_tool("web_search", do_web_search)
tool_actor.register_tool("read_file", do_read_file)
tool_actor.start()
"""
def __init__(self, actor_id: str):
super().__init__(actor_id)
# Maps tool_name -> callable
self._tools: Dict[str, Callable] = {}
# Register the TOOL_CALL handler in the dispatch table
self._register_handler(
MessageType.TOOL_CALL, self._handle_tool_call
)
def register_tool(self, name: str, fn: Callable) -> None:
"""
Registers a tool function under the given name.
The function must accept only keyword arguments that match
the 'arguments' dict in the ToolCallPayload.
"""
self._tools[name] = fn
logger.info(
"ToolActor %s registered tool: %s", self.actor_id, name
)
def on_message(self, message: Message) -> None:
"""Catch-all for unrecognised message types."""
logger.warning(
"ToolActor %s received unhandled message type: %s",
self.actor_id, message.msg_type
)
def _handle_tool_call(self, message: Message) -> None:
"""
Looks up the requested tool and calls it with the provided
arguments. Sends a TOOL_RESULT back to the caller regardless
of whether the tool succeeded or raised an exception.
"""
payload: ToolCallPayload = message.payload
tool_name = payload.tool_name
arguments = payload.arguments
logger.info(
"ToolActor %s executing tool '%s' for %s",
self.actor_id, tool_name, message.sender_id
)
result = None
error = None
if tool_name not in self._tools:
error = f"Unknown tool: '{tool_name}'"
else:
try:
result = self._tools[tool_name](**arguments)
except Exception as exc:
error = f"{type(exc).__name__}: {exc}"
logger.error(
"ToolActor %s: tool '%s' raised: %s",
self.actor_id, tool_name, exc
)
result_payload = ToolResultPayload(
call_id = payload.call_id,
result = result,
error = error
)
self._reply(message, MessageType.TOOL_RESULT, result_payload)
# ------------------------------------------------------------------
# Example tool implementations
# These are plain functions; they know nothing about actors.
# ------------------------------------------------------------------
def tool_web_search(query: str, max_results: int = 5) -> list:
"""
Performs a web search and returns a list of result snippets.
In production, replace this stub with a real search API call
(e.g., Brave Search, SerpAPI, or DuckDuckGo).
"""
# Stub implementation for illustration
logger.info("web_search: query='%s', max_results=%d",
query, max_results)
return [
{"title": f"Result {i} for '{query}'",
"url": f"https://example.com/{i}",
"snippet": f"Snippet {i} about {query}"}
for i in range(1, max_results + 1)
]
def tool_read_file(path: str) -> str:
"""Reads a text file and returns its contents as a string."""
with open(path, "r", encoding="utf-8") as fh:
return fh.read()
def tool_write_file(path: str, content: str) -> str:
"""Writes content to a text file. Returns a confirmation."""
with open(path, "w", encoding="utf-8") as fh:
fh.write(content)
return f"Successfully wrote {len(content)} characters to {path}"
def tool_run_python(code: str) -> str:
"""
Executes a Python code snippet and returns stdout as a string.
WARNING: In production, this MUST run in a sandboxed environment
(e.g., a Docker container or a restricted subprocess) to prevent
arbitrary code execution vulnerabilities.
"""
import io
import sys
buffer = io.StringIO()
old_stdout = sys.stdout
sys.stdout = buffer
try:
exec(code, {}) # noqa: S102
finally:
sys.stdout = old_stdout
return buffer.getvalue()
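The warning on tool_run_python deserves emphasis. One modest hardening step, shown below as an illustration rather than a full sandbox, is to run the snippet in a separate interpreter with a timeout: a hung or crashing snippet then cannot take the tool actor's process down with it, and stdout/stderr come back cleanly separated. The function name is hypothetical.

```python
import subprocess
import sys

# Run untrusted-ish code in a child interpreter with a wall-clock limit.
# This is NOT a security sandbox (the child still has full OS access);
# it only contains crashes and hangs. Real isolation needs containers
# or an OS-level sandbox.
def tool_run_python_subprocess(code: str, timeout: float = 10.0) -> str:
    proc = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True, text=True, timeout=timeout
    )
    if proc.returncode != 0:
        return f"ERROR: {proc.stderr.strip()}"
    return proc.stdout

print(tool_run_python_subprocess("print(2 + 2)").strip())  # 4
```

Registering this in place of tool_run_python requires no change to the ToolActor itself, which is exactly the decoupling the actor boundary buys us.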
5.2 The Memory Actor
Memory is what separates a stateful, context-aware agent from a stateless question-answering system. In our Actor-based system, memory is managed by a dedicated Memory Actor. This actor maintains the agent's conversation history, stores facts learned during task execution, and provides retrieval services to other actors. By isolating memory in its own actor, we ensure that memory access is serialised (no concurrent writes can corrupt the memory store) and that the memory implementation can be swapped out independently of the agents that use it.
# memory_actor.py
#
# The Memory Actor maintains conversation history and a fact store
# for agent actors. It uses a simple in-memory store by default,
# but the storage backend can be replaced with Redis, SQLite, or
# any other persistence layer without changing the actor's interface.
import logging
from collections import defaultdict
from typing import List, Dict, Any, Optional
from protocol import Message, MessageType
from actor_base import BaseActor
logger = logging.getLogger(__name__)
class MemoryActor(BaseActor):
"""
Manages conversation history and a key-value fact store.
Receives:
MEMORY_STORE : Stores a message or fact
MEMORY_FETCH : Retrieves stored messages or facts
PING : Health check (handled by BaseActor)
SHUTDOWN : Graceful shutdown (handled by BaseActor)
Sends:
MEMORY_DATA : Response to MEMORY_FETCH containing results
"""
def __init__(self, actor_id: str):
super().__init__(actor_id)
# Conversation history per session_id.
# Each entry is an OpenAI-style message dict.
self._history: Dict[str, List[Dict]] = defaultdict(list)
# Key-value fact store per session_id.
self._facts: Dict[str, Dict[str, Any]] = defaultdict(dict)
self._register_handler(
MessageType.MEMORY_STORE, self._handle_store
)
self._register_handler(
MessageType.MEMORY_FETCH, self._handle_fetch
)
def on_message(self, message: Message) -> None:
logger.warning(
"MemoryActor %s received unhandled message: %s",
self.actor_id, message.msg_type
)
def _handle_store(self, message: Message) -> None:
"""
Stores a conversation message or a named fact.
Expected payload fields:
session_id : str - identifies the conversation/session
store_type : str - "message" or "fact"
data : dict - the message dict or {key: value} fact
"""
payload = message.payload
session_id = payload.get("session_id", "default")
store_type = payload.get("store_type", "message")
data = payload.get("data", {})
if store_type == "message":
self._history[session_id].append(data)
logger.debug(
"MemoryActor: stored message for session %s "
"(history length: %d)",
session_id, len(self._history[session_id])
)
elif store_type == "fact":
self._facts[session_id].update(data)
logger.debug(
"MemoryActor: stored %d fact(s) for session %s",
len(data), session_id
)
def _handle_fetch(self, message: Message) -> None:
"""
Retrieves conversation history or facts for a session.
Expected payload fields:
session_id : str - the session to fetch from
fetch_type : str - "history", "facts", or "all"
last_n : Optional[int] - only return last N messages
"""
payload = message.payload
session_id = payload.get("session_id", "default")
fetch_type = payload.get("fetch_type", "history")
last_n = payload.get("last_n", None)
result = {}
if fetch_type in ("history", "all"):
history = self._history[session_id]
if last_n is not None:
history = history[-last_n:]
result["history"] = history
if fetch_type in ("facts", "all"):
result["facts"] = dict(self._facts[session_id])
self._reply(message, MessageType.MEMORY_DATA, result)
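The store-and-fetch semantics above can be exercised without any actor infrastructure. The stand-alone sketch below mirrors the two stores and the last_n slicing of a "history" fetch; the fetch() helper is a simplified stand-in for _handle_fetch.

```python
from collections import defaultdict

# Two per-session stores, as in MemoryActor: an append-only message
# history and a key-value fact store.
history = defaultdict(list)
facts = defaultdict(dict)

history["s1"].extend([
    {"role": "user", "content": "hi"},
    {"role": "assistant", "content": "hello"},
    {"role": "user", "content": "what's the weather?"},
])
facts["s1"]["user_name"] = "Ada"

def fetch(session_id, fetch_type="history", last_n=None):
    result = {}
    if fetch_type in ("history", "all"):
        msgs = history[session_id]
        result["history"] = msgs[-last_n:] if last_n is not None else msgs
    if fetch_type in ("facts", "all"):
        result["facts"] = dict(facts[session_id])
    return result

snapshot = fetch("s1", fetch_type="all", last_n=2)
print(len(snapshot["history"]))  # 2
```

Because all reads and writes flow through one actor's mailbox, these dicts never need locks; serialisation of access is a property of the Actor Model itself.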
CHAPTER SIX: THE AGENT ACTOR - BRINGING IT ALL TOGETHER
6.1 The Agent Actor's Role
The Agent Actor is the centrepiece of the system. It is the actor that embodies the reasoning loop of an AI agent: it receives a task, consults its memory for context, queries the LLM for a plan, executes tools as directed by the LLM, stores results in memory, and iterates until the task is complete or a resource limit is reached.
What makes the Agent Actor interesting is that it must coordinate with multiple other actors: the LLM actor for reasoning, the tool actor for action execution, and the memory actor for context. All of this coordination happens through message passing. The Agent Actor sends a message and then waits for a reply, but it does not block its thread while waiting. Instead, it uses a state machine pattern: each state corresponds to what the agent is waiting for, and incoming messages drive transitions between states.
The diagram below shows the state machine that governs the Agent Actor's reasoning loop.
+-------------------------------------------------------------------+
| AGENT ACTOR STATE MACHINE |
| |
| [TASK received] |
| | |
| v |
| +------------+ MEMORY_DATA +------------------+ |
| | FETCHING | -----------------> | THINKING | |
| | MEMORY | | (waiting for LLM)| |
| +------------+ +--------+---------+ |
| | |
| LLM_REPLY | |
| v |
| +--------------+-----------+ |
| | PARSE LLM RESPONSE | |
| | - tool call? -> CALLING | |
| | - final answer? -> DONE | |
| | - error? -> ERROR | |
| +-------+----------+-------+ |
| | | |
| tool call | | final answer |
| v v |
| +-----------+ +--------+ |
| | CALLING | | DONE | |
| | TOOL | | report | |
| | (waiting) | | result | |
| +-----+-----+ +--------+ |
| | |
| TOOL_RESULT| |
| v |
| +-----------+ |
| | APPEND | |
| | tool result |
| | to context| |
| +-----+-----+ |
| | |
| | (loop back to THINKING) |
| +----> [send LLM_QUERY again] |
+-------------------------------------------------------------------+
Figure 4: The Agent Actor's internal state machine. The agent moves through states as it receives replies from the LLM actor, tool actors, and memory actor. At each state, the actor is waiting for a specific message type. This non-blocking design allows the actor's thread to be used efficiently.
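The PARSE LLM RESPONSE step in Figure 4 hinges on one decision: does the reply parse as a JSON object with a "tool" key, or is it plain text? The sketch below shows one way to implement that check; the parse_llm_response() helper is illustrative, not lifted from the listing that follows.

```python
import json

# Classify an LLM reply as a tool call or a final answer, per the
# JSON-or-text protocol in the system prompt. Anything that fails to
# parse as a JSON object with a "tool" key is treated as a final answer.
def parse_llm_response(text: str):
    stripped = text.strip()
    try:
        obj = json.loads(stripped)
        if isinstance(obj, dict) and "tool" in obj:
            return ("tool_call", obj["tool"], obj.get("arguments", {}))
    except json.JSONDecodeError:
        pass
    return ("final_answer", stripped, None)

call = parse_llm_response(
    '{"tool": "web_search", "arguments": {"query": "actor model"}}'
)
answer = parse_llm_response("The answer is 42.")
print(call[0], answer[0])  # tool_call final_answer
```

Treating unparseable output as a final answer is a deliberate fail-soft choice: a model that drifts from the JSON format degrades to a plain reply instead of crashing the loop.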
# agent_actor.py
#
# The Agent Actor implements a full ReAct-style reasoning loop
# (Reason + Act) using the Actor message-passing infrastructure.
# It coordinates with LLM, Tool, and Memory actors to complete tasks.
#
# The agent uses a simple JSON-based tool-calling protocol:
# the LLM is instructed to respond with either a JSON tool call
# or a plain text final answer, and the agent parses the response
# to decide what to do next.
import json
import logging
from enum import Enum, auto
from typing import List, Dict, Optional, Any
from protocol import (
Message, MessageType,
TaskPayload, LLMQueryPayload, LLMReplyPayload,
ToolCallPayload, ToolResultPayload
)
from actor_base import BaseActor
logger = logging.getLogger(__name__)
class AgentState(Enum):
"""
Enumerates the states of the agent's internal state machine.
The agent is always in exactly one of these states.
"""
IDLE = auto() # Waiting for a TASK message
FETCHING_MEM = auto() # Waiting for MEMORY_DATA
THINKING = auto() # Waiting for LLM_REPLY
CALLING_TOOL = auto() # Waiting for TOOL_RESULT
DONE = auto() # Task complete; sending result
# System prompt that instructs the LLM how to use tools.
# The LLM must respond with a JSON object to call a tool, or
# with plain text to provide a final answer. Note the doubled
# braces in the JSON example: str.format() would otherwise treat
# them as placeholders and raise a KeyError.
SYSTEM_PROMPT = """You are a helpful AI assistant with access to tools.
To use a tool, respond with ONLY a JSON object in this exact format: {{ "tool": "tool_name", "arguments": {{"arg1": "value1", "arg2": "value2"}} }}
To provide a final answer, respond with plain text (not JSON).
Available tools: {tool_descriptions}
Think step by step. Use tools when you need external information. When you have enough information, provide your final answer as plain text. """
class AgentActor(BaseActor):
"""
A ReAct-style agent that coordinates LLM reasoning with tool use.
The agent maintains a conversation context (list of messages) and
iterates through a Reason-Act loop until it produces a final answer
or exhausts its step budget.
Receives : TASK messages from an orchestrator or user interface
Sends : RESPONSE messages with the final answer
LLM_QUERY to the LLM actor
TOOL_CALL to the tool actor
MEMORY_STORE / MEMORY_FETCH to the memory actor
"""
def __init__(
self,
actor_id: str,
llm_actor_id: str,
tool_actor_id: str,
memory_actor_id: str,
available_tools: List[Dict],
model: str = "",
temperature: float = 0.7,
max_tokens: int = 2048
):
super().__init__(actor_id)
# IDs of peer actors this agent communicates with
self._llm_id = llm_actor_id
self._tool_id = tool_actor_id
self._memory_id = memory_actor_id
# LLM configuration
self._model = model
self._temperature = temperature
self._max_tokens = max_tokens
# Tool descriptions injected into the system prompt
self._available_tools = available_tools
tool_desc = "\n".join(
f"- {t['name']}: {t['description']}"
for t in available_tools
)
self._system_prompt = SYSTEM_PROMPT.format(
tool_descriptions=tool_desc
)
# --- Per-task state (reset for each new task) ---
self._state: AgentState = AgentState.IDLE
self._task_msg: Optional[Message] = None
self._context: List[Dict] = []
self._steps_taken: int = 0
self._max_steps: int = 10
self._session_id: str = ""
# Pending tool call ID (to match TOOL_RESULT to the call)
self._pending_tool_call_id: Optional[str] = None
# Register message handlers
self._register_handler(MessageType.TASK, self._on_task)
self._register_handler(MessageType.MEMORY_DATA, self._on_memory)
self._register_handler(MessageType.LLM_REPLY, self._on_llm_reply)
self._register_handler(MessageType.TOOL_RESULT, self._on_tool_result)
self._register_handler(MessageType.ERROR, self._on_error)
def on_message(self, message: Message) -> None:
logger.warning(
"AgentActor %s: unhandled message type %s in state %s",
self.actor_id, message.msg_type, self._state
)
# ------------------------------------------------------------------
# State machine handlers
# ------------------------------------------------------------------
def _on_task(self, message: Message) -> None:
"""
Entry point: receives a new task and begins the reasoning loop.
Resets all per-task state and fetches memory context first.
"""
if self._state != AgentState.IDLE:
logger.warning(
"AgentActor %s received TASK while in state %s; "
"ignoring.",
self.actor_id, self._state
)
return
payload: TaskPayload = message.payload
logger.info(
"AgentActor %s starting task: %s",
self.actor_id, payload.task_description
)
# Reset per-task state
self._task_msg = message
self._steps_taken = 0
self._max_steps = payload.max_steps
self._session_id = message.msg_id # Use task msg_id as session
self._context = []
# Fetch recent memory context before starting
self._state = AgentState.FETCHING_MEM
self._send_to(
self._memory_id,
Message(
msg_type = MessageType.MEMORY_FETCH,
sender_id = self.actor_id,
receiver_id = self._memory_id,
payload = {
"session_id": self._session_id,
"fetch_type": "history",
"last_n": 10
}
)
)
def _on_memory(self, message: Message) -> None:
"""
Receives memory context and initiates the first LLM call.
Transitions from FETCHING_MEM to THINKING.
"""
if self._state != AgentState.FETCHING_MEM:
return
history = message.payload.get("history", [])
task_desc = self._task_msg.payload.task_description
# Build the initial context: system prompt + history + task
self._context = [
{"role": "system", "content": self._system_prompt}
]
self._context.extend(history)
self._context.append(
{"role": "user", "content": task_desc}
)
self._state = AgentState.THINKING
self._query_llm()
def _on_llm_reply(self, message: Message) -> None:
"""
Receives the LLM's response and decides what to do next.
If the response is a JSON tool call, transitions to CALLING_TOOL.
If it is plain text, the task is complete.
"""
if self._state != AgentState.THINKING:
return
payload: LLMReplyPayload = message.payload
content = payload.content.strip()
self._steps_taken += 1
logger.info(
"AgentActor %s step %d/%d LLM response: %.100s...",
self.actor_id, self._steps_taken, self._max_steps, content
)
# Append the assistant's response to the context
self._context.append(
{"role": "assistant", "content": content}
)
# Try to parse as a tool call
tool_call = self._try_parse_tool_call(content)
if tool_call and self._steps_taken < self._max_steps:
# The LLM wants to call a tool
self._state = AgentState.CALLING_TOOL
self._execute_tool_call(tool_call)
else:
# Plain text response = final answer. This branch also fires
# when the step budget is exhausted, in which case the last
# LLM response is returned as-is.
self._finish_task(content)
def _on_tool_result(self, message: Message) -> None:
"""
Receives the result of a tool call and appends it to the
context, then loops back to query the LLM again.
"""
if self._state != AgentState.CALLING_TOOL:
return
payload: ToolResultPayload = message.payload
# Verify this result matches our pending call
if payload.call_id != self._pending_tool_call_id:
logger.warning(
"AgentActor %s received TOOL_RESULT with unexpected "
"call_id %s (expected %s)",
self.actor_id, payload.call_id,
self._pending_tool_call_id
)
return
# Format the tool result as a user message for the LLM
if payload.error:
result_text = f"Tool error: {payload.error}"
else:
result_text = (
f"Tool result: {json.dumps(payload.result, indent=2)}"
)
self._context.append(
{"role": "user", "content": result_text}
)
# Loop back to the LLM with the updated context
self._state = AgentState.THINKING
self._query_llm()
def _on_error(self, message: Message) -> None:
"""Handles ERROR messages from peer actors."""
error_info = message.payload.get("error", "Unknown error")
logger.error(
"AgentActor %s received ERROR from %s: %s",
self.actor_id, message.sender_id, error_info
)
self._finish_task(
f"I encountered an error and could not complete the task: "
f"{error_info}"
)
# ------------------------------------------------------------------
# Helper methods
# ------------------------------------------------------------------
def _query_llm(self) -> None:
"""Sends the current context to the LLM actor for processing."""
query_payload = LLMQueryPayload(
messages = self._context,
model = self._model,
temperature = self._temperature,
max_tokens = self._max_tokens
)
self._send_to(
self._llm_id,
Message(
msg_type = MessageType.LLM_QUERY,
sender_id = self.actor_id,
receiver_id = self._llm_id,
payload = query_payload
)
)
def _try_parse_tool_call(
self, content: str
) -> Optional[Dict]:
"""
Attempts to parse the LLM's response as a JSON tool call.
Returns the parsed dict if successful, None otherwise.
"""
try:
# Strip markdown code fences if present
cleaned = content.strip()
if cleaned.startswith("```"):
lines = cleaned.split("\n")
cleaned = "\n".join(lines[1:-1])
data = json.loads(cleaned)
if "tool" in data and "arguments" in data:
return data
except (json.JSONDecodeError, ValueError):
pass
return None
def _execute_tool_call(self, tool_call: Dict) -> None:
"""Sends a TOOL_CALL message to the tool actor."""
call_payload = ToolCallPayload(
tool_name = tool_call["tool"],
arguments = tool_call["arguments"]
)
self._pending_tool_call_id = call_payload.call_id
self._send_to(
self._tool_id,
Message(
msg_type = MessageType.TOOL_CALL,
sender_id = self.actor_id,
receiver_id = self._tool_id,
payload = call_payload
)
)
def _finish_task(self, answer: str) -> None:
"""
Stores the conversation in memory and sends the final
answer back to whoever sent the original TASK message.
"""
# Persist the conversation to memory
self._send_to(
self._memory_id,
Message(
msg_type = MessageType.MEMORY_STORE,
sender_id = self.actor_id,
receiver_id = self._memory_id,
payload = {
"session_id": self._session_id,
"store_type": "message",
"data": {
"role": "assistant",
"content": answer
}
}
)
)
# Send the final answer to the original requester
self._reply(
self._task_msg,
MessageType.RESPONSE,
{"answer": answer, "steps_taken": self._steps_taken}
)
# Return to idle state, ready for the next task
self._state = AgentState.IDLE
logger.info(
"AgentActor %s completed task in %d steps.",
self.actor_id, self._steps_taken
)
CHAPTER SEVEN: MULTI-AGENT SYSTEMS AND THE ORCHESTRATOR ACTOR
7.1 From Single Agent to Multi-Agent Systems
A single agent, no matter how capable, has limitations. It can only work on one task at a time. It has a single context window, which limits how much information it can hold simultaneously. It has a single set of tools and a single reasoning style. Multi-agent systems overcome these limitations by distributing work across multiple specialised agents that collaborate.
The Actor Model is uniquely well-suited to multi-agent systems because it naturally models the kind of autonomous, message-driven collaboration that multi-agent systems require. Each agent is an actor. Agents collaborate by sending messages. An orchestrator actor coordinates the work, assigning tasks to agents and collecting their results. The whole system is concurrent: all agents work in parallel, each processing its own inbox.
The diagram below shows a typical multi-agent architecture built on the Actor pattern, with an orchestrator coordinating several specialised agents.
+=====================================================================+
| MULTI-AGENT ACTOR SYSTEM |
| |
| User / External System |
| | |
| | TASK |
| v |
| +------------------+ |
| | ORCHESTRATOR | |
| | ACTOR | |
| | Decomposes task | |
| | Routes subtasks | |
| | Aggregates | |
| | results | |
| +--+--------+--+---+ |
| | | | |
| TASK| TASK| |TASK |
| v v v |
| +--------+ +--------+ +--------+ |
| | AGENT | | AGENT | | AGENT | |
| |Research| | Writer | | Coder | |
| | (Actor)| | (Actor)| | (Actor)| |
| +---+----+ +---+----+ +---+----+ |
| | | | |
| | | | (all share the same peer actors) |
| v v v |
| +----------+ +----------+ +----------+ |
| | LLM | | TOOL | | MEMORY | |
| | ACTOR | | ACTOR | | ACTOR | |
| | (shared) | | (shared) | | (shared) | |
| +----------+ +----------+ +----------+ |
+=====================================================================+
Figure 5: A multi-agent system where an Orchestrator Actor decomposes a complex task and routes subtasks to specialised Agent Actors. All agents share the same LLM, Tool, and Memory actors, which serve requests concurrently from their own inboxes.
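The orchestrator in the next section distributes subtasks over this agent pool with a simple round-robin (`agent_ids[i % len(agent_ids)]`). In miniature, with a hypothetical pool of three agents:

```python
# Round-robin assignment in miniature: subtask i goes to agent
# i mod N. The agent and subtask names are illustrative.
agents = ["researcher", "writer", "coder"]
subtasks = [
    "find sources", "draft intro", "write code",
    "check facts", "edit draft"
]

assignments = {
    task: agents[i % len(agents)]
    for i, task in enumerate(subtasks)
}
# The fourth subtask wraps around to the first agent again.
```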
7.2 The Orchestrator Actor
The Orchestrator Actor is responsible for decomposing complex tasks into subtasks, assigning those subtasks to the appropriate agent actors, collecting the results, and synthesising a final answer. It is itself an actor, which means it participates in the same message-passing system as every other component. It receives TASK messages from the outside world and sends RESPONSE messages back when the work is complete.
# orchestrator_actor.py
#
# The Orchestrator Actor coordinates multiple Agent Actors to solve
# complex tasks that benefit from parallelism or specialisation.
# It decomposes tasks, routes subtasks, and aggregates results.
import uuid
import logging
from typing import Dict, List, Optional, Any
from protocol import Message, MessageType, TaskPayload
from actor_base import BaseActor
logger = logging.getLogger(__name__)
class SubTask:
"""
Represents a unit of work assigned to a specific agent actor.
Tracks the assignment and its completion status.
"""
def __init__(
self,
subtask_id: str,
description: str,
agent_id: str,
parent_msg_id: str
):
self.subtask_id = subtask_id
self.description = description
self.agent_id = agent_id
self.parent_msg_id = parent_msg_id
self.result: Optional[Any] = None
self.completed: bool = False
self.error: Optional[str] = None
class OrchestratorActor(BaseActor):
"""
Coordinates multiple Agent Actors to complete complex tasks.
The orchestrator uses a simple decomposition strategy: it splits
the task into sentences, assigns each resulting subtask to an
agent round-robin, waits for all results, and then asks the LLM
to synthesise a final answer.
Receives : TASK - a complex task from the outside world
RESPONSE - a subtask result from an agent actor
Sends : TASK - subtasks to agent actors
RESPONSE - the final synthesised answer
"""
def __init__(
self,
actor_id: str,
agent_ids: List[str],
llm_actor_id: str
):
super().__init__(actor_id)
self._agent_ids = agent_ids
self._llm_id = llm_actor_id
# Maps correlation_id -> SubTask for tracking in-flight work
self._pending: Dict[str, SubTask] = {}
# Maps parent task msg_id -> list of SubTasks
self._task_groups: Dict[str, List[SubTask]] = {}
# Maps parent task msg_id -> original TASK message
self._original_tasks: Dict[str, Message] = {}
self._register_handler(MessageType.TASK, self._on_task)
self._register_handler(MessageType.RESPONSE, self._on_response)
self._register_handler(MessageType.LLM_REPLY, self._on_llm_reply)
def on_message(self, message: Message) -> None:
logger.warning(
"OrchestratorActor %s: unhandled message %s",
self.actor_id, message.msg_type
)
def _on_task(self, message: Message) -> None:
"""
Receives a complex task and decomposes it into subtasks.
For simplicity, this implementation splits the task into
sentences and assigns them to agents round-robin. A production
system would use the LLM to generate the decomposition.
"""
payload: TaskPayload = message.payload
task_id = message.msg_id
logger.info(
"OrchestratorActor %s received task: %s",
self.actor_id, payload.task_description
)
self._original_tasks[task_id] = message
# Simple decomposition: split by sentences as a demonstration.
# In production, query the LLM to produce a proper plan.
sentences = [
s.strip()
for s in payload.task_description.split(".")
if s.strip()
]
if not sentences:
sentences = [payload.task_description]
subtasks = []
for i, sentence in enumerate(sentences):
agent_id = self._agent_ids[i % len(self._agent_ids)]
subtask = SubTask(
subtask_id = str(uuid.uuid4()),
description = sentence,
agent_id = agent_id,
parent_msg_id = task_id
)
subtasks.append(subtask)
# Send the subtask to the assigned agent
task_msg = Message(
msg_type = MessageType.TASK,
sender_id = self.actor_id,
receiver_id = agent_id,
payload = TaskPayload(
task_description = sentence,
max_steps = payload.max_steps
)
)
# Track this subtask by the message ID we are sending
self._pending[task_msg.msg_id] = subtask
self._send_to(agent_id, task_msg)
self._task_groups[task_id] = subtasks
logger.info(
"OrchestratorActor %s dispatched %d subtasks for task %s",
self.actor_id, len(subtasks), task_id
)
def _on_response(self, message: Message) -> None:
"""
Receives a completed subtask result from an agent actor.
When all subtasks for a parent task are complete, synthesises
the final answer and sends it to the original requester.
"""
# Find the subtask this response belongs to
subtask = self._pending.get(message.correlation_id)
if not subtask:
logger.warning(
"OrchestratorActor: received RESPONSE with unknown "
"correlation_id %s", message.correlation_id
)
return
subtask.result = message.payload.get("answer", "")
subtask.completed = True
del self._pending[message.correlation_id]
logger.info(
"OrchestratorActor: subtask '%s' completed by %s",
subtask.description[:50], message.sender_id
)
# Check if all subtasks for the parent task are done
parent_id = subtask.parent_msg_id
group = self._task_groups.get(parent_id, [])
if all(st.completed for st in group):
self._synthesise_results(parent_id, group)
def _synthesise_results(
self, task_id: str, subtasks: List[SubTask]
) -> None:
"""
Combines all subtask results into a coherent final answer.
Sends the synthesis request to the LLM actor.
"""
combined = "\n\n".join(
f"Subtask: {st.description}\nResult: {st.result}"
for st in subtasks
)
synthesis_prompt = (
f"The following subtask results have been collected. "
f"Please synthesise them into a single coherent answer:\n\n"
f"{combined}"
)
from protocol import LLMQueryPayload
query = Message(
msg_type = MessageType.LLM_QUERY,
sender_id = self.actor_id,
receiver_id = self._llm_id,
payload = LLMQueryPayload(
messages = [
{"role": "user", "content": synthesis_prompt}
]
),
correlation_id = task_id # Carry task_id for matching
)
self._send_to(self._llm_id, query)
def _on_llm_reply(self, message: Message) -> None:
"""
Receives the synthesised answer from the LLM and sends it
back to the original task requester.
"""
task_id = message.correlation_id
original_msg = self._original_tasks.get(task_id)
if not original_msg:
return
self._reply(
original_msg,
MessageType.RESPONSE,
{"answer": message.payload.content}
)
# Clean up task tracking state
del self._original_tasks[task_id]
del self._task_groups[task_id]
logger.info(
"OrchestratorActor: task %s fully completed.", task_id
)
CHAPTER EIGHT: TRANSACTIONS WITH COMMIT, ROLLBACK, AND COMPENSATION
8.1 Why Transactions Matter in Agentic Systems
When an agent executes a sequence of actions, those actions often have side effects in the real world: files are written, database records are updated, emails are sent, APIs are called. If the agent fails partway through a sequence, the system may be left in an inconsistent state. Some actions have been performed, others have not. This is the classic problem of distributed transactions, and it is just as relevant in agentic AI systems as it is in traditional enterprise software.
The Actor Model gives us a natural framework for implementing transactions. Because all communication is via messages, we can implement the classic Two-Phase Commit (2PC) protocol entirely in terms of message exchanges. We can also implement the Saga pattern, which uses compensating transactions to undo the effects of completed steps when a later step fails.
The diagram below illustrates both approaches.
TWO-PHASE COMMIT (2PC)
----------------------
Coordinator Participant A Participant B
| | |
|--- TXN_PREPARE --->| |
|--- TXN_PREPARE ----------------------->|
| | |
|<-- TXN_ACK(ok) ----| |
|<-- TXN_ACK(ok) ------------------------|
| | |
|--- TXN_COMMIT ---->| |
|--- TXN_COMMIT ------------------------>|
| | |
|<-- TXN_ACK(done) --| |
|<-- TXN_ACK(done) ----------------------|
| | |
[Transaction committed successfully]
SAGA PATTERN (Compensating Transactions)
----------------------------------------
Orchestrator Step A Actor Step B Actor Step C Actor
| | | |
|--- TASK(A) --->| | |
|<-- RESPONSE(ok)| | |
|--- TASK(B) -----------------> | |
|<-- RESPONSE(ok)----------------| |
|--- TASK(C) ----------------------------------> |
|<-- ERROR(fail) --------------------------------|
| | | |
| [C failed: begin compensation] |
| | | |
|--- COMPENSATE(B) ------------>| |
|<-- RESPONSE(compensated) -----| |
|--- COMPENSATE(A) ->| | |
|<-- RESPONSE(compensated) | |
| | | |
[Saga rolled back via compensation]
Figure 6: Two transaction strategies available in an Actor-based system. Two-Phase Commit provides strong atomicity but requires all participants to be available simultaneously. The Saga pattern provides eventual consistency and is more resilient to partial failures.
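The reverse-order compensation in the Saga half of Figure 6 is the heart of the pattern: steps are undone newest-first, so each compensation runs against the state its step produced. As a minimal illustration (step records here are plain dicts, not the article's SagaStep class):

```python
# Minimal illustration of Saga compensation ordering: completed
# steps are compensated in reverse execution order.
from typing import Dict, List

def compensation_order(steps: List[Dict]) -> List[str]:
    """Return the ids of completed steps, newest first."""
    return [s["id"] for s in reversed(steps) if s["completed"]]

steps = [
    {"id": "reserve_flight", "completed": True},
    {"id": "reserve_hotel",  "completed": True},
    {"id": "charge_card",    "completed": False},  # this step failed
]
order = compensation_order(steps)
```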
8.2 The Transaction Manager Actor
The Transaction Manager Actor implements both the 2PC coordinator role and the Saga orchestrator role. It tracks the state of in-flight transactions and drives them to completion or rollback.
# transaction_actor.py
#
# Implements distributed transaction coordination using the Actor
# pattern. Supports both Two-Phase Commit (2PC) and Saga (compensating
# transactions) patterns.
#
# The TransactionManagerActor coordinates participants (other actors)
# through the phases of a transaction, handling failures gracefully
# by initiating rollback or compensation as needed.
import uuid
import logging
from enum import Enum, auto
from typing import Dict, List, Optional, Any
from protocol import Message, MessageType, TransactionPayload
from actor_base import BaseActor
logger = logging.getLogger(__name__)
class TxnState(Enum):
"""Lifecycle states of a managed transaction."""
ACTIVE = auto() # Transaction is open, steps executing
PREPARING = auto() # 2PC Phase 1: waiting for PREPARE votes
COMMITTING = auto() # 2PC Phase 2: commit messages sent
ROLLING_BACK = auto() # Rollback or compensation in progress
COMMITTED = auto() # Successfully committed
ABORTED = auto() # Rolled back or compensated
class SagaStep:
"""
Represents one step in a Saga transaction.
Each step has a forward action and a compensating action.
"""
def __init__(
self,
step_id: str,
actor_id: str,
action_payload: Any,
compensate_payload: Any
):
self.step_id = step_id
self.actor_id = actor_id
self.action_payload = action_payload
self.compensate_payload = compensate_payload
self.completed: bool = False
self.compensated: bool = False
self.result: Optional[Any] = None
class Transaction:
"""
Tracks the full state of one distributed transaction.
Holds the list of participants (for 2PC) or steps (for Saga),
the current phase, and the votes received so far.
"""
def __init__(self, txn_id: str, txn_type: str):
self.txn_id = txn_id
self.txn_type = txn_type # "2pc" or "saga"
self.state = TxnState.ACTIVE
self.participants: List[str] = []
self.votes: Dict[str, str] = {} # actor_id -> "ok"/"fail"
self.saga_steps: List[SagaStep] = []
self.current_step: int = 0
self.requester_id: str = ""
self.requester_msg_id: str = ""
class TransactionManagerActor(BaseActor):
"""
Coordinates distributed transactions across multiple actor
participants. Supports Two-Phase Commit and Saga patterns.
Receives:
TXN_BEGIN - opens a new transaction
TXN_ACK - a participant's vote or acknowledgement
RESPONSE - a saga step's completion result
ERROR - a saga step's failure
Sends:
TXN_PREPARE - Phase 1 of 2PC to all participants
TXN_COMMIT - Phase 2 commit to all participants
TXN_ROLLBACK - Rollback directive to all participants
TASK - Forward action for a saga step
(compensation messages for saga rollback)
"""
def __init__(self, actor_id: str):
super().__init__(actor_id)
self._transactions: Dict[str, Transaction] = {}
self._register_handler(MessageType.TXN_BEGIN, self._on_begin)
self._register_handler(MessageType.TXN_ACK, self._on_ack)
self._register_handler(MessageType.RESPONSE, self._on_response)
self._register_handler(MessageType.ERROR, self._on_error)
def on_message(self, message: Message) -> None:
logger.warning(
"TxnManager %s: unhandled message %s",
self.actor_id, message.msg_type
)
# ------------------------------------------------------------------
# Two-Phase Commit
# ------------------------------------------------------------------
def _on_begin(self, message: Message) -> None:
"""
Opens a new transaction. The payload specifies the type
("2pc" or "saga") and the participants or saga steps.
"""
payload = message.payload
txn_id = payload.txn_id
txn = Transaction(txn_id, payload.data.get("type", "2pc"))
txn.requester_id = message.sender_id
txn.requester_msg_id = message.msg_id
txn.participants = payload.participants
self._transactions[txn_id] = txn
if txn.txn_type == "2pc":
self._begin_2pc(txn)
elif txn.txn_type == "saga":
steps_data = payload.data.get("steps", [])
txn.saga_steps = [
SagaStep(
step_id = s["step_id"],
actor_id = s["actor_id"],
action_payload = s["action"],
compensate_payload = s["compensate"]
)
for s in steps_data
]
self._execute_saga_step(txn)
def _begin_2pc(self, txn: Transaction) -> None:
"""Sends TXN_PREPARE to all participants (Phase 1)."""
txn.state = TxnState.PREPARING
for actor_id in txn.participants:
self._send_to(
actor_id,
Message(
msg_type = MessageType.TXN_PREPARE,
sender_id = self.actor_id,
receiver_id = actor_id,
payload = TransactionPayload(
txn_id = txn.txn_id
)
)
)
logger.info(
"TxnManager: 2PC PREPARE sent to %d participants "
"for txn %s",
len(txn.participants), txn.txn_id
)
def _on_ack(self, message: Message) -> None:
"""
Receives a vote or acknowledgement from a participant.
In Phase 1 (PREPARING): collects votes and decides commit/abort.
In Phase 2 (COMMITTING/ROLLING_BACK): counts completions.
"""
payload = message.payload
txn_id = payload.txn_id
txn = self._transactions.get(txn_id)
if not txn:
return
vote = payload.data.get("vote", "ok")
txn.votes[message.sender_id] = vote
if txn.state == TxnState.PREPARING:
all_voted = (
len(txn.votes) == len(txn.participants)
)
if all_voted:
if all(v == "ok" for v in txn.votes.values()):
self._commit_2pc(txn)
else:
self._rollback_2pc(txn)
elif txn.state in (TxnState.COMMITTING, TxnState.ROLLING_BACK):
all_done = len(txn.votes) == len(txn.participants)
if all_done:
final_state = (
TxnState.COMMITTED
if txn.state == TxnState.COMMITTING
else TxnState.ABORTED
)
txn.state = final_state
self._notify_requester(txn)
def _commit_2pc(self, txn: Transaction) -> None:
"""Sends TXN_COMMIT to all participants (Phase 2)."""
txn.state = TxnState.COMMITTING
txn.votes = {} # Reset votes to count Phase 2 acks
for actor_id in txn.participants:
self._send_to(
actor_id,
Message(
msg_type = MessageType.TXN_COMMIT,
sender_id = self.actor_id,
receiver_id = actor_id,
payload = TransactionPayload(
txn_id = txn.txn_id
)
)
)
logger.info(
"TxnManager: 2PC COMMIT sent for txn %s", txn.txn_id
)
def _rollback_2pc(self, txn: Transaction) -> None:
"""Sends TXN_ROLLBACK to all participants."""
txn.state = TxnState.ROLLING_BACK
txn.votes = {}
for actor_id in txn.participants:
self._send_to(
actor_id,
Message(
msg_type = MessageType.TXN_ROLLBACK,
sender_id = self.actor_id,
receiver_id = actor_id,
payload = TransactionPayload(
txn_id = txn.txn_id,
reason = "One or more participants voted NO"
)
)
)
logger.info(
"TxnManager: 2PC ROLLBACK sent for txn %s", txn.txn_id
)
# ------------------------------------------------------------------
# Saga Pattern
# ------------------------------------------------------------------
def _execute_saga_step(self, txn: Transaction) -> None:
"""Executes the next pending saga step."""
if txn.current_step >= len(txn.saga_steps):
# All steps completed successfully
txn.state = TxnState.COMMITTED
self._notify_requester(txn)
return
step = txn.saga_steps[txn.current_step]
logger.info(
"TxnManager: executing saga step %d (%s) for txn %s",
txn.current_step, step.step_id, txn.txn_id
)
self._send_to(
step.actor_id,
Message(
msg_type = MessageType.TASK,
sender_id = self.actor_id,
receiver_id = step.actor_id,
payload = step.action_payload,
correlation_id = txn.txn_id
)
)
def _on_response(self, message: Message) -> None:
"""A saga step completed successfully; advance to next step."""
txn_id = message.correlation_id
txn = self._transactions.get(txn_id)
if not txn or txn.txn_type != "saga":
return
step = txn.saga_steps[txn.current_step]
step.completed = True
step.result = message.payload
txn.current_step += 1
self._execute_saga_step(txn)
def _on_error(self, message: Message) -> None:
"""A saga step failed; begin compensating completed steps."""
txn_id = message.correlation_id
txn = self._transactions.get(txn_id)
if not txn or txn.txn_type != "saga":
return
logger.warning(
"TxnManager: saga step %d failed for txn %s; "
"beginning compensation.",
txn.current_step, txn_id
)
txn.state = TxnState.ROLLING_BACK
self._compensate_saga(txn)
def _compensate_saga(self, txn: Transaction) -> None:
"""
Sends compensation messages to all completed saga steps
in reverse order. This is the 'rollback' of the Saga pattern.
Each compensation undoes the effect of its corresponding step.
"""
completed_steps = [
s for s in txn.saga_steps if s.completed
]
for step in reversed(completed_steps):
logger.info(
"TxnManager: compensating step %s for txn %s",
step.step_id, txn.txn_id
)
self._send_to(
step.actor_id,
Message(
msg_type = MessageType.TASK,
sender_id = self.actor_id,
receiver_id = step.actor_id,
payload = step.compensate_payload,
correlation_id = txn.txn_id
)
)
# For simplicity, the saga is marked aborted as soon as the
# compensation messages are dispatched. A production system
# would wait for acknowledgements, as 2PC Phase 2 does.
txn.state = TxnState.ABORTED
self._notify_requester(txn)
def _notify_requester(self, txn: Transaction) -> None:
"""Informs the original requester of the transaction outcome."""
outcome = (
"committed" if txn.state == TxnState.COMMITTED
else "aborted"
)
self._send_to(
txn.requester_id,
Message(
msg_type = MessageType.RESPONSE,
sender_id = self.actor_id,
receiver_id = txn.requester_id,
payload = {
"txn_id": txn.txn_id,
"outcome": outcome,
"state": txn.state.name
},
correlation_id = txn.requester_msg_id
)
)
logger.info(
"TxnManager: txn %s %s.", txn.txn_id, outcome
)
CHAPTER NINE: WIRING IT ALL TOGETHER
9.1 The Actor Registry and System Bootstrap
With all the individual actor types defined, we need a way to wire them together into a running system. The Actor Registry is a lightweight directory that maps actor IDs to actor instances and provides a convenient place to manage the lifecycle of the entire system. It is not itself an actor; it is the infrastructure that creates and connects actors.
# actor_system.py
#
# The ActorSystem is the top-level container that creates, registers,
# and connects all actors in the application. It provides a clean
# bootstrap and shutdown sequence for the entire system.
import logging
from typing import Dict, Optional
from actor_base import BaseActor
logger = logging.getLogger(__name__)
class ActorSystem:
"""
Manages the lifecycle of all actors in the application.
Responsibilities:
- Registering actors by ID
- Connecting actors to their peers (so they can send messages)
- Starting all actors in dependency order
- Shutting down all actors gracefully
"""
def __init__(self, name: str = "ActorSystem"):
self.name = name
self._actors: Dict[str, BaseActor] = {}
def register(self, actor: BaseActor) -> "ActorSystem":
"""
Adds an actor to the registry. Returns self for chaining.
Must be called before start().
"""
self._actors[actor.actor_id] = actor
logger.info(
"ActorSystem '%s': registered actor '%s'",
self.name, actor.actor_id
)
return self
def connect_all(self) -> "ActorSystem":
"""
Registers every actor as a peer of every other actor.
This is appropriate for small systems. For large systems,
use selective connect() calls to limit coupling.
"""
actors = list(self._actors.values())
for actor in actors:
for peer in actors:
if peer.actor_id != actor.actor_id:
actor.register_peer(peer)
logger.info(
"ActorSystem '%s': fully connected %d actors.",
self.name, len(actors)
)
return self
def connect(
self, from_id: str, to_id: str
) -> "ActorSystem":
"""
Registers actor 'to_id' as a peer of actor 'from_id'.
Use this for selective, explicit connection graphs.
"""
src = self._actors[from_id]
dest = self._actors[to_id]
src.register_peer(dest)
return self
def start_all(self) -> "ActorSystem":
"""Starts all registered actors."""
for actor in self._actors.values():
actor.start()
logger.info(
"ActorSystem '%s': all %d actors started.",
self.name, len(self._actors)
)
return self
def get(self, actor_id: str) -> Optional[BaseActor]:
"""Returns the actor with the given ID, or None."""
return self._actors.get(actor_id)
def shutdown_all(self, timeout: float = 5.0) -> None:
"""
Sends SHUTDOWN to every actor and waits for them to stop.
"""
from protocol import Message, MessageType
for actor in self._actors.values():
actor.send(Message(
msg_type = MessageType.SHUTDOWN,
sender_id = "ActorSystem",
receiver_id = actor.actor_id
))
for actor in self._actors.values():
actor.join(timeout=timeout)
logger.info(
"ActorSystem '%s': all actors shut down.", self.name
)
9.2 A Complete End-to-End Example
The following example assembles a complete working system: an LLM actor backed by whichever backend is available, a tool actor with web search and file tools, a memory actor, and an agent actor, all wired together and used to complete a real task. This is the payoff for all the infrastructure we have built.
# main.py
#
# End-to-end demonstration of the Actor-based Agentic AI system.
# Assembles all actor types, wires them together, and runs a task.
#
# Run with:
#   OPENAI_API_KEY=sk-... python main.py
# Or configure a local backend in select_best_backend() below.

import logging
import queue
import time

logging.basicConfig(
    level = logging.INFO,
    format = "%(asctime)s [%(name)s] %(levelname)s: %(message)s"
)
logger = logging.getLogger("main")

from protocol import (
    Message, MessageType, TaskPayload
)
from actor_base import BaseActor
from actor_system import ActorSystem
from llm_backends import select_best_backend
from llm_actor import LLMActor
from tool_actor import (
    ToolActor, tool_web_search, tool_read_file, tool_write_file
)
from memory_actor import MemoryActor
from agent_actor import AgentActor


class UserInterfaceActor(BaseActor):
    """
    A simple actor that represents the user interface.
    It sends a task and collects the response, making it easy
    to drive the system from synchronous test code.
    """

    def __init__(self, actor_id: str):
        super().__init__(actor_id)
        # A regular queue used to pass the result back to the
        # calling thread (which is blocked waiting for the answer).
        self._result_queue = queue.Queue()
        self._register_handler(
            MessageType.RESPONSE, self._on_response
        )

    def on_message(self, message: Message) -> None:
        pass

    def _on_response(self, message: Message) -> None:
        """Puts the response payload into the result queue."""
        self._result_queue.put(message.payload)

    def ask(
        self,
        agent_id: str,
        task: str,
        timeout: float = 120.0
    ) -> dict:
        """
        Sends a task to an agent and blocks until the response
        arrives or the timeout expires.
        """
        msg = Message(
            msg_type = MessageType.TASK,
            sender_id = self.actor_id,
            receiver_id = agent_id,
            payload = TaskPayload(
                task_description = task,
                max_steps = 8,
                timeout_seconds = timeout
            )
        )
        # We need to send via the registry; find the agent actor
        self._send_to(agent_id, msg)
        try:
            result = self._result_queue.get(timeout=timeout)
            return result
        except queue.Empty:
            return {"answer": "Timed out waiting for agent response."}


def build_system() -> ActorSystem:
    """
    Constructs and wires the complete actor system.
    Returns a started ActorSystem ready to process tasks.
    """
    # --- Select the best available LLM backend ---
    # For a local CUDA model, use:
    #     backend = CUDABackend("path/to/model.gguf")
    # For Apple Silicon, use:
    #     backend = MLXBackend("mlx-community/Mistral-7B-Instruct-v0.3-4bit")
    # For Vulkan, use:
    #     backend = VulkanBackend("path/to/model.gguf")
    # The auto-selector tries each in order of preference:
    backend = select_best_backend(
        api_key = "",     # Set OPENAI_API_KEY env var instead
        base_url = None   # None = use real OpenAI; set for local server
    )

    # --- Create actors ---
    llm_actor = LLMActor(
        actor_id = "llm",
        backend = backend
    )
    tool_actor = ToolActor(actor_id="tools")
    tool_actor.register_tool("web_search", tool_web_search)
    tool_actor.register_tool("read_file", tool_read_file)
    tool_actor.register_tool("write_file", tool_write_file)
    memory_actor = MemoryActor(actor_id="memory")
    agent_actor = AgentActor(
        actor_id = "agent",
        llm_actor_id = "llm",
        tool_actor_id = "tools",
        memory_actor_id = "memory",
        available_tools = [
            {
                "name": "web_search",
                "description": "Search the web for current information. "
                               "Args: query (str), max_results (int)"
            },
            {
                "name": "read_file",
                "description": "Read the contents of a text file. "
                               "Args: path (str)"
            },
            {
                "name": "write_file",
                "description": "Write text content to a file. "
                               "Args: path (str), content (str)"
            }
        ],
        model = "",        # Empty = use backend's default model
        temperature = 0.7,
        max_tokens = 2048
    )
    ui_actor = UserInterfaceActor(actor_id="ui")

    # --- Assemble and start the system ---
    system = ActorSystem("AgenticAI")
    (system
        .register(llm_actor)
        .register(tool_actor)
        .register(memory_actor)
        .register(agent_actor)
        .register(ui_actor)
        .connect_all()
        .start_all()
    )
    # Give actors a moment to initialise before sending work
    time.sleep(0.5)
    return system


def main():
    logger.info("=== Actor-Based Agentic AI System Starting ===")
    system = build_system()
    ui: UserInterfaceActor = system.get("ui")

    # --- Run a sample task ---
    task = (
        "Search the web for the latest developments in quantum computing "
        "and write a brief summary to the file 'quantum_summary.txt'."
    )
    logger.info("Sending task: %s", task)
    result = ui.ask("agent", task, timeout=120.0)

    print("\n" + "=" * 60)
    print("AGENT RESPONSE:")
    print("=" * 60)
    print(result.get("answer", "No answer received."))
    print(f"\n(Completed in {result.get('steps_taken', '?')} steps)")
    print("=" * 60)

    system.shutdown_all()
    logger.info("=== System shut down cleanly ===")


if __name__ == "__main__":
    main()
CHAPTER TEN: PUTTING IT ALL IN PERSPECTIVE
10.1 The Full Architecture at a Glance
The diagram below shows the complete architecture of the system we have built, from the user's task all the way through to the final answer, including the transaction layer.
+====================================================================+
| COMPLETE ACTOR-BASED AGENTIC AI ARCHITECTURE |
| |
| +-------------+ |
| | User / UI | |
| | Actor | |
| +------+------+ |
| | TASK |
| v |
| +------+------+ TASK +----------+ +----------+ |
| |Orchestrator |-------------->| Agent A | | Agent B | |
| | Actor | | (Researcher) | (Writer) | |
| +------+------+ +----+-----+ +----+-----+ |
| | | | |
| | TXN_BEGIN | | |
| v | | |
| +------+------+ | | |
| | Transaction | | | |
| | Manager | | | |
| | Actor | | | |
| +-------------+ | | |
| | LLM_QUERY | LLM_QUERY |
| v v |
| +------+------+ |
| | LLM Actor | |
| | CUDA / MLX | |
| | Vulkan / API| |
| +-------------+ |
| |
| +-------------+ +-------------+ |
| | Tool Actor | | Memory Actor| |
| | web_search | | history | |
| | read_file | | facts | |
| | write_file | | sessions | |
| | run_python | +-------------+ |
| +-------------+ |
+====================================================================+
Figure 7: The complete architecture. Every arrow represents a message passing through an actor's inbox. No component calls another's methods directly. The Transaction Manager Actor can wrap any sequence of agent actions in a transactional boundary with commit and rollback semantics.
10.2 Design Patterns and Lessons Learned
Several important design patterns have emerged from this implementation that are worth naming explicitly, because they recur in any serious Actor-based system.
The Dispatch Table pattern, used in BaseActor._dispatch(), is far superior to long if/elif chains for routing messages. It makes adding new message types trivial, keeps each handler small and focused, and makes the actor's capabilities immediately visible by inspecting the handler registry.
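The shape of the pattern can be shown in a few lines. This is a stripped-down sketch, not the article's actual BaseActor: the class and enum names here are illustrative, and the handlers return strings only so the routing is easy to observe.

```python
from enum import Enum, auto


class MessageType(Enum):
    TASK = auto()
    RESPONSE = auto()
    SHUTDOWN = auto()


class DispatchingActor:
    """Illustrative sketch: route messages through a handler registry
    instead of an if/elif chain."""

    def __init__(self):
        self._handlers = {}
        self._register_handler(MessageType.TASK, self._on_task)
        self._register_handler(MessageType.RESPONSE, self._on_response)

    def _register_handler(self, msg_type, handler):
        self._handlers[msg_type] = handler

    def _dispatch(self, msg_type, payload):
        # One dictionary lookup replaces the entire routing logic.
        handler = self._handlers.get(msg_type)
        if handler is None:
            return f"unhandled: {msg_type.name}"
        return handler(payload)

    def _on_task(self, payload):
        return f"task: {payload}"

    def _on_response(self, payload):
        return f"response: {payload}"
```

Adding a new message type is a one-line `_register_handler` call plus a handler method, and `self._handlers.keys()` doubles as a machine-readable list of the actor's capabilities.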
The Correlation ID pattern, used throughout the request-response flows, is essential for matching asynchronous replies to their original requests. Every actor that sends a request and expects a reply must store the request's msg_id and compare it against the correlation_id of incoming replies. Without this, an actor cannot safely handle multiple in-flight requests.
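The bookkeeping behind this pattern is small enough to sketch directly. The `RequestTracker` name and its methods are hypothetical, chosen for illustration; the idea is simply a dictionary keyed by the outgoing msg_id, consumed when a reply's correlation_id matches.

```python
import uuid


class RequestTracker:
    """Tracks in-flight requests by msg_id so that asynchronous replies
    can be matched back to their originating request."""

    def __init__(self):
        self._pending = {}  # msg_id -> context describing the request

    def send_request(self, context):
        # Generate the msg_id that the reply must echo as correlation_id.
        msg_id = str(uuid.uuid4())
        self._pending[msg_id] = context
        return msg_id

    def on_reply(self, correlation_id, result):
        # Pop so each request is matched at most once; unknown or stale
        # correlation IDs are ignored safely.
        context = self._pending.pop(correlation_id, None)
        if context is None:
            return None
        return (context, result)
```

Because each entry is popped exactly once, the same actor can have any number of requests in flight simultaneously without confusing their replies.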
The State Machine pattern, used in AgentActor, is the right way to model actors that have complex multi-step interactions with other actors. Rather than using blocking calls or callbacks, the actor records its current state and uses incoming messages to drive transitions. This keeps the actor non-blocking and makes its behaviour easy to reason about and test.
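Reduced to its essentials, the pattern is a table of (state, event) transitions driven by incoming messages. The states and event names below are a simplified, hypothetical version of the AgentActor's flow, not its actual definitions.

```python
from enum import Enum, auto


class AgentState(Enum):
    IDLE = auto()
    AWAITING_LLM = auto()
    AWAITING_TOOL = auto()


class MiniAgent:
    """Illustrative message-driven state machine: each incoming event
    either triggers a legal transition or is rejected loudly."""

    TRANSITIONS = {
        (AgentState.IDLE, "task"): AgentState.AWAITING_LLM,
        (AgentState.AWAITING_LLM, "llm_reply_tool"): AgentState.AWAITING_TOOL,
        (AgentState.AWAITING_TOOL, "tool_result"): AgentState.AWAITING_LLM,
        (AgentState.AWAITING_LLM, "llm_reply_final"): AgentState.IDLE,
    }

    def __init__(self):
        self.state = AgentState.IDLE

    def handle(self, event):
        next_state = self.TRANSITIONS.get((self.state, event))
        if next_state is None:
            raise ValueError(
                f"illegal event {event!r} in state {self.state.name}"
            )
        self.state = next_state
        return self.state
```

The transition table makes the actor's legal behaviour auditable at a glance, and testing it requires no threads at all: feed events, assert states.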
The Lazy Initialisation pattern, used in all LLM backends, defers expensive operations such as loading a multi-gigabyte model into GPU memory until the first actual request arrives. This makes startup fast, makes errors easier to diagnose in context, and avoids loading resources that might never be used.
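A minimal sketch of the idea, under the assumption that the loader is an injected callable standing in for the real model-loading code (the `LazyBackend` name is illustrative, not one of the article's backend classes):

```python
class LazyBackend:
    """Defers the expensive load until the first generate() call.
    The 'loader' callable stands in for loading a multi-gigabyte
    model into GPU memory."""

    def __init__(self, model_path, loader):
        self._model_path = model_path
        self._loader = loader
        self._model = None  # not loaded yet: construction stays cheap

    def generate(self, prompt):
        if self._model is None:
            # The first request pays the load cost, exactly once.
            self._model = self._loader(self._model_path)
        return f"{self._model}: {prompt}"
```

Because construction is cheap, dozens of backends can be registered at startup and only the ones that actually receive traffic ever touch the GPU. (In a real multi-threaded actor this check would need a lock, but inside an actor it is safe: messages are processed one at a time.)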
The Compensation pattern, used in the Saga implementation, is the practical alternative to Two-Phase Commit for long-running transactions. Rather than holding locks across multiple steps, each step defines a compensating action that undoes its effects. If any step fails, the system executes the compensating actions of all previously completed steps in reverse order.
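The core of the pattern fits in a short class. This is a deliberately simplified, synchronous sketch of the idea rather than the article's message-driven Saga implementation; the names are illustrative.

```python
class Saga:
    """Runs steps in order; on any failure, runs the compensations of
    all previously completed steps in reverse order."""

    def __init__(self):
        self._steps = []  # list of (action, compensation) pairs

    def add_step(self, action, compensation):
        self._steps.append((action, compensation))
        return self

    def run(self):
        completed = []  # compensations for steps that succeeded
        for action, compensation in self._steps:
            try:
                action()
                completed.append(compensation)
            except Exception:
                # Undo everything that already happened, newest first.
                for undo in reversed(completed):
                    undo()
                return False
        return True
```

The contract is that each compensation must be safe to run after its action succeeded, and that reverse order preserves any dependencies between steps. That is much weaker than what Two-Phase Commit demands, which is why sagas tolerate long-running, LLM-paced steps.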
10.3 Scaling and Production Considerations
The system as presented runs entirely in a single Python process with threads and subprocesses. Scaling it to a truly distributed deployment requires replacing the in-memory queues with a message broker such as RabbitMQ, Apache Kafka, or Redis Streams. The message protocol we designed, with its JSON serialisation and correlation IDs, is already prepared for this transition. The Actor interface, with its send() method as the only public API, means that swapping the transport layer requires changing only the send() and _send_to() implementations, not any of the business logic in the actors themselves.
Monitoring is another critical production concern. Because all communication flows through message queues, it is straightforward to add instrumentation at the queue level: measure queue depth, message processing latency, and throughput for every actor. These metrics give you a complete picture of where bottlenecks are forming and which actors are under load.
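One way to sketch this instrumentation, assuming a hypothetical wrapper around the plain `queue.Queue` the actors already use (the `InstrumentedInbox` name and its counters are illustrative, not part of the article's code):

```python
import queue
import time


class InstrumentedInbox:
    """Wraps an inbox queue to expose depth, throughput, and the time
    messages spend waiting before an actor picks them up."""

    def __init__(self):
        self._q = queue.Queue()
        self.processed = 0
        self.total_wait = 0.0  # seconds messages spent queued

    def put(self, item):
        # Timestamp at enqueue so wait time can be measured at dequeue.
        self._q.put((time.monotonic(), item))

    def get(self, timeout=None):
        enqueued_at, item = self._q.get(timeout=timeout)
        self.total_wait += time.monotonic() - enqueued_at
        self.processed += 1
        return item

    def depth(self):
        return self._q.qsize()
```

A steadily growing `depth()` on one actor while the others stay near zero is the classic signature of a bottleneck, and the per-actor `total_wait / processed` average tells you how badly downstream work is being delayed.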
Fault tolerance in production Actor systems is typically handled through supervision trees, a concept borrowed from Erlang's OTP framework. A supervisor actor monitors a set of child actors and restarts them if they crash. The Actor Model makes this natural: the supervisor simply receives a notification message when a child actor terminates unexpectedly, and responds by creating a new instance of that actor and re-registering it with the system.
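The restart logic itself is simple; the sketch below is a hypothetical, single-threaded illustration in which the termination notification arrives as a plain method call standing in for a message to the supervisor's inbox, and a factory callable stands in for re-creating and re-registering the child.

```python
class Supervisor:
    """Restarts a child actor when notified of its termination,
    giving up after a bounded number of restarts."""

    def __init__(self, factory, max_restarts=3):
        self._factory = factory          # creates a fresh child instance
        self._max_restarts = max_restarts
        self.restarts = 0
        self.child = factory()

    def on_child_terminated(self, reason):
        if self.restarts >= self._max_restarts:
            # Escalate instead of restarting forever: in a real tree,
            # this failure would propagate to this supervisor's parent.
            raise RuntimeError(
                f"giving up after {self.restarts} restarts: {reason}"
            )
        self.restarts += 1
        self.child = self._factory()     # replace the crashed child
        return self.child
```

The restart cap is what turns this into a tree: a supervisor that exhausts its budget fails upward, letting its own supervisor decide whether to restart a larger subsystem.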
The Actor Model, as we have seen throughout this article, is not just a concurrency pattern. It is a complete architectural philosophy that aligns remarkably well with the requirements of Agentic AI systems. By treating each agent as an autonomous, message-driven actor, we gain encapsulation, concurrency, fault isolation, and the ability to implement distributed transactions, all without sacrificing the clarity and simplicity that make systems maintainable over time. The bureaucrat in the sealed office, it turns out, is an excellent model for the AI agent of the future.