You have always wanted to build your own OpenClaw-like Agentic AI, but how does that actually work? The tutorial below is written for developers who want to understand Agentic AI not just in theory, but by building something real, step by step, piece by piece.
HINT: The full source code is available in the Octopussy Light GitHub repository, so you do not need to type any code manually. But you can, of course! The Agentic AI system is called Octopussy Light; I am also working on a larger OpenClaw alternative called Octopussy. See the blog post Octopussy Article.
FOREWORD: WHY "OCTOPUSSY LIGHT"?
An octopus is one of nature's most remarkable problem-solvers. It has a central brain, but each of its eight arms also has its own neural cluster capable of semi-autonomous action. The arms can taste, feel, and react independently, yet they coordinate beautifully with the central brain to accomplish complex tasks. This is a surprisingly good metaphor for an Agentic AI system: a coordinator at the center, specialized agents operating semi-autonomously at the periphery, all working together toward a common goal.
"Octopussy Light" is our educational, incremental implementation of exactly this kind of system. It is "light" because we are building it step by step, starting from the simplest possible foundation and adding complexity only when we are ready. By the time you finish this article, you will have built a fully functional multi-agent AI system that reads email, monitors calendars, spawns worker agents on demand, equips them with predefined tools, creates new tools dynamically, and communicates using the Model Context Protocol.
This is not a survey article. You will write real code. Every increment will run. Every concept will be demonstrated with working examples before we move on.
CHAPTER ONE: UNDERSTANDING THE TERRAIN
Before we write a single line of code, we need to understand what Agentic AI actually is and why it represents a fundamentally different way of thinking about software.
Traditional software is deterministic and explicit. You write a function, it takes inputs, it produces outputs, and the path from input to output is entirely specified by the programmer. If you want the software to do something new, you write new code. The software has no agency of its own.
A large language model, on its own, is something different but still limited in a specific way. It can reason, generate text, answer questions, and even write code. But it cannot act on the world. It cannot send an email, check a calendar, or run a program. It is, in a sense, a very sophisticated oracle: you ask it questions, it gives you answers, and that is where its involvement ends.
Agentic AI closes this gap. An agent is an LLM that has been given tools and the ability to decide when and how to use them. It can look at a goal, reason about what steps are needed to achieve that goal, call the appropriate tools in the appropriate order, observe the results, and continue reasoning until the goal is accomplished. This is what makes an agent fundamentally different from a chatbot: it can act, not just respond.
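Schematically, that reason-act-observe loop looks like the toy sketch below. This is not Octopussy Light code; the stubbed model and the "add" tool are invented purely for illustration. The point is the shape of the loop: the model either requests a tool call or produces a final answer, and the loop keeps acting and observing until the answer arrives.

```python
# A schematic agent loop with a stubbed "model" standing in for a real LLM.
# The stub either asks for a tool call or returns a final answer.

def stub_model(history):
    # A real LLM would decide this; here we script two turns for illustration.
    if not any(m["role"] == "tool" for m in history):
        return {"tool": "add", "args": {"a": 2, "b": 3}}    # step 1: act
    result = [m for m in history if m["role"] == "tool"][-1]["content"]
    return {"answer": f"The sum is {result}."}              # step 2: respond

TOOLS = {"add": lambda a, b: a + b}                         # the agent's toolbox

def run_agent(goal):
    history = [{"role": "user", "content": goal}]
    while True:
        decision = stub_model(history)
        if "answer" in decision:                            # goal accomplished
            return decision["answer"]
        result = TOOLS[decision["tool"]](**decision["args"])  # act on the world
        history.append({"role": "tool", "content": result})   # observe the result

print(run_agent("What is 2 + 3?"))  # -> The sum is 5.
```

Swap the scripted stub for a real LLM and the lambda toolbox for real tools, and this is exactly the loop every agent in this article runs.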
A multi-agent system takes this further. Instead of one agent trying to do everything, you have multiple specialized agents, each with its own tools, its own memory, and its own area of responsibility. A coordinator agent manages the overall workflow and communicates with the user. A mail agent monitors incoming email. A calendar agent watches for upcoming events. Worker agents execute specific tasks on demand, equipped first with a set of predefined built-in tools, and later with the ability to create entirely new tools dynamically. A tool agent handles that dynamic creation.
This specialization is powerful for the same reason that human organizations use specialization: a focused agent with the right tools and the right context will outperform a generalist agent trying to juggle everything at once.
Now, there is one more piece of the puzzle that we need to understand before we start coding: the Model Context Protocol, or MCP.
THE MODEL CONTEXT PROTOCOL: THE LANGUAGE AGENTS SPEAK
When you build a multi-agent system, one of the first problems you encounter is the question of how agents communicate with tools. Do you write custom code for every tool? Do you use a proprietary API format? Do you invent your own protocol?
The Model Context Protocol, first introduced by Anthropic in November 2024, answers this question with a resounding "none of the above." MCP is an open standard now stewarded by the Agentic AI Foundation (AAIF), a directed fund under the Linux Foundation, co-founded by Anthropic, Block, and OpenAI, with support from Google, Microsoft, AWS, and others. Think of it as the USB standard for AI tools: just as USB lets you plug any compliant device into any compliant port without writing custom drivers, MCP lets any compliant AI agent use any compliant tool without writing custom integration code.
MCP is built on JSON-RPC 2.0, a lightweight remote procedure call protocol that uses JSON for message formatting. It supports multiple transport mechanisms: STDIO for local communication between processes on the same machine, Server-Sent Events for web-based real-time communication, and Streamable HTTP for communication with remote servers. The protocol uses string-based version identifiers in the format YYYY-MM-DD, and the version we will be targeting in Octopussy Light is the November 25, 2025 specification, which is the latest stable version at the time of writing.
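To make the JSON-RPC 2.0 framing concrete, here is what an MCP-style tools/call exchange looks like on the wire. The tool name and its arguments are invented for illustration; the envelope fields (jsonrpc, id, method, params) are what JSON-RPC 2.0 prescribes.

```python
import json

# An MCP tool invocation travels as a JSON-RPC 2.0 request ...
request = {
    "jsonrpc": "2.0",         # protocol version, always "2.0"
    "id": 1,                  # correlates the response with this request
    "method": "tools/call",   # the MCP method for invoking a tool
    "params": {
        "name": "read_mail",  # hypothetical tool name, for illustration only
        "arguments": {"folder": "INBOX", "limit": 5}
    }
}

# ... and the server answers with a response carrying the same id.
response = {
    "jsonrpc": "2.0",
    "id": 1,
    "result": {"content": [{"type": "text", "text": "5 messages found"}]}
}

wire = json.dumps(request)    # what actually travels over STDIO or HTTP
assert json.loads(wire)["method"] == "tools/call"
```

Everything else in MCP, regardless of transport, is variations on this request/response envelope.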
The 2025-11-25 specification represents a significant maturation of MCP beyond its original synchronous tool-calling roots. Its most notable additions include the Tasks primitive, which enables asynchronous long-running operations where a client receives a task handle immediately and fetches results later; simplified OAuth authorization via Client ID Metadata Documents replacing the older Dynamic Client Registration approach; Extensions as a formal concept for faster protocol innovation without bloating the core; Sampling with Tools, which allows servers to initiate sampling requests that include tool definitions and enables server-side agent loops; and a range of enterprise security and governance improvements. For Octopussy Light, we will primarily use the stable tool-calling and STDIO transport features that have been part of MCP since its inception, while the architecture is designed to accommodate the newer async Task capabilities as a future extension.
An MCP server exposes three kinds of things to clients. Tools are callable functions that can execute code or produce side effects, analogous to POST endpoints in a REST API. Resources are data sources that can be read, analogous to GET endpoints. Prompts are reusable templates for LLM interactions. In Octopussy Light, we will primarily be working with tools, since our agents need to act on the world, not just read from it.
The Python MCP SDK makes building MCP servers and clients straightforward. We will use it throughout this article. The key package is simply called "mcp" and can be installed with pip.
Now let us talk about the LLM backbone of our system.
CHOOSING YOUR LLM: LOCAL AND REMOTE OPTIONS
One of the design goals of Octopussy Light is that the LLM powering each agent should be swappable without touching the agent's core logic. This is important because the LLM landscape is evolving rapidly. The model that is best today may not be best in six months, and different agents in your system may benefit from different models. A coordinator agent that needs to understand nuanced user requests might benefit from a powerful cloud model, while a worker agent doing structured data extraction might work perfectly well with a smaller local model.
We will support two categories of LLMs. For local models, we will use Ollama, which provides a simple way to run models like Llama 3.2, Mistral, and others on your own hardware. Ollama exposes an OpenAI-compatible API, which means we can use the same client code for both local and remote models. For remote models, we will support any OpenAI-compatible endpoint, which covers OpenAI's GPT-4o, Anthropic's Claude via compatible wrappers, and many others.
The key to making models swappable is a configuration file. We will define the model for each agent in a configuration file, and the agent code will read this configuration at startup. Changing the model for any agent is then a matter of editing one line in the configuration file, with no code changes required.
Note: If you configure Octopussy Light with a local LLM, you can run the code examples in the following increments on a machine (server, notebook, or desktop) without a GPU. Be aware, however, that a local LLM will respond very slowly on a CPU-only machine.
Let us now set up our development environment and write the very first piece of Octopussy Light.
CHAPTER TWO: INCREMENT ONE - THE FOUNDATION
Our first increment is deliberately minimal. We are going to build the skeleton of the coordinator agent: a simple loop that accepts user input, sends it to an LLM, and prints the response. This is not yet an agent in the full sense, but it establishes the foundation on which everything else will be built.
First, let us set up the project structure. Create a directory called octopussy_light and inside it create the following files and subdirectories:
octopussy_light/
config.yaml
main.py
coordinator/
__init__.py
coordinator_agent.py
llm/
__init__.py
llm_client.py
utils/
__init__.py
logger.py
The config.yaml file is where we define all configurable parameters. Here is the initial version. Remember, in this and all upcoming versions of config.yaml, to enter the information specific to your environment, such as the credentials for LLM, mail, and calendar access.
# config.yaml
# Octopussy Light - Main Configuration File
#
# This file controls all configurable aspects of the system.
# Change model names here without touching any Python code.
llm:
# Provider can be "ollama" (local) or "openai" (remote/cloud)
provider: "ollama"
# For Ollama: the model name as it appears in "ollama list"
# For OpenAI: the model name as it appears in the OpenAI API
coordinator_model: "llama3.2"
worker_model: "llama3.2"
# For Ollama, the base URL is typically http://localhost:11434
# For OpenAI, use https://api.openai.com/v1
base_url: "http://localhost:11434"
# API key: required for OpenAI, can be "ollama" for local Ollama
api_key: "ollama"
coordinator:
system_prompt: |
You are the coordinator of an agentic AI system called Octopussy Light.
Your job is to understand the user's requests, coordinate specialized
agents to fulfill those requests, and present results clearly and
helpfully. You are friendly, precise, and efficient.
Notice that the configuration separates the LLM provider, the model name, the base URL, and the API key. This four-part combination is all we need to switch between a local Ollama model and a remote cloud model. The coordinator and worker agents can even use different models if desired.
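For example, switching the whole system to a cloud provider is a matter of editing those four values. The model names below are illustrative placeholders; use whichever models your account has access to:

```yaml
llm:
  provider: "openai"
  coordinator_model: "gpt-4o"       # illustrative; any OpenAI model name works
  worker_model: "gpt-4o-mini"       # workers can use a smaller, cheaper model
  base_url: "https://api.openai.com/v1"
  api_key: "sk-..."                 # your real OpenAI API key goes here
```

No Python code changes are needed; the agents pick up the new values at the next startup.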
Now let us build the LLM client. This is the module that all agents will use to communicate with their underlying language model:
# llm/llm_client.py
#
# A unified LLM client that works with both local Ollama models
# and remote OpenAI-compatible endpoints. The model and provider
# are determined entirely by the configuration, not by this code.
from openai import AsyncOpenAI
from typing import List, Dict, Optional
class LLMClient:
"""
A thin, async wrapper around the OpenAI-compatible API.
Because Ollama exposes an OpenAI-compatible REST API, we can use
the same client code for both local and remote models. The only
difference is the base_url and api_key, which come from config.
"""
def __init__(
self,
base_url: str,
api_key: str,
model: str,
system_prompt: Optional[str] = None
):
"""
Initialize the LLM client.
Args:
base_url: The base URL of the LLM API endpoint.
Use http://localhost:11434/v1 for Ollama,
or https://api.openai.com/v1 for OpenAI.
api_key: The API key. Use "ollama" for local Ollama.
model: The model identifier (e.g., "llama3.2", "gpt-4o").
system_prompt: An optional system prompt that will be
prepended to every conversation.
"""
self.model = model
self.system_prompt = system_prompt
# The AsyncOpenAI client works with any OpenAI-compatible endpoint.
# Ollama's API is compatible with the OpenAI format when accessed
# at the /v1 path, so we append /v1 if not already present.
if not base_url.endswith("/v1"):
base_url = base_url.rstrip("/") + "/v1"
self.client = AsyncOpenAI(
base_url=base_url,
api_key=api_key
)
async def chat(
self,
messages: List[Dict[str, str]],
temperature: float = 0.7,
max_tokens: Optional[int] = None
) -> str:
"""
Send a list of messages to the LLM and return the response text.
Args:
messages: A list of message dicts with "role" and "content" keys.
Roles can be "user", "assistant", or "system".
temperature: Controls randomness. Lower values are more
deterministic; higher values are more creative.
max_tokens: Optional limit on the length of the response.
Returns:
The text content of the LLM's response.
"""
# Prepend the system prompt if one was configured.
full_messages = []
if self.system_prompt:
full_messages.append({
"role": "system",
"content": self.system_prompt
})
full_messages.extend(messages)
response = await self.client.chat.completions.create(
model=self.model,
messages=full_messages,
temperature=temperature,
max_tokens=max_tokens
)
return response.choices[0].message.content
The LLMClient is deliberately simple. It wraps the AsyncOpenAI client, prepends a system prompt when one is configured, and exposes a single async chat method. Every agent in Octopussy Light will use this client to talk to its underlying model.
Now let us build the logger, because good logging is essential in a multi-agent system where many things are happening concurrently and you need to understand what is going on:
# utils/logger.py
#
# A simple, colored console logger for Octopussy Light.
# Each component uses a different color so you can visually
# distinguish coordinator output from agent output at a glance.
import logging
import sys
from typing import Optional
# ANSI color codes for terminal output
COLORS = {
"coordinator": "\033[94m", # Blue
"mail_agent": "\033[92m", # Green
"calendar": "\033[93m", # Yellow
"worker": "\033[95m", # Magenta
"tool_agent": "\033[96m", # Cyan
"system": "\033[91m", # Red
"reset": "\033[0m"
}
def get_logger(name: str, color_key: Optional[str] = None) -> logging.Logger:
"""
Create and return a named logger with optional color coding.
Args:
name: The logger name, typically the component name.
color_key: A key from the COLORS dict to color the output.
Returns:
A configured logging.Logger instance.
"""
logger = logging.getLogger(name)
if not logger.handlers:
handler = logging.StreamHandler(sys.stdout)
color = COLORS.get(color_key, "") if color_key else ""
reset = COLORS["reset"] if color_key else ""
formatter = logging.Formatter(
f"{color}[%(name)s] %(asctime)s %(levelname)s: %(message)s{reset}",
datefmt="%H:%M:%S"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
# Do not propagate records to the root logger; this avoids
# duplicate log lines if the root logger also has a handler.
logger.propagate = False
return logger
Now let us build the coordinator agent in its initial, minimal form:
# coordinator/coordinator_agent.py
#
# The Coordinator Agent - Increment 1
#
# In this first increment, the coordinator is simply a conversational
# agent that maintains a conversation history and talks to the user.
# We will add coordination capabilities in later increments.
from typing import List, Dict
from llm.llm_client import LLMClient
from utils.logger import get_logger
logger = get_logger("coordinator", "coordinator")
class CoordinatorAgent:
"""
The central coordinator of Octopussy Light.
The coordinator is responsible for two things: communicating with
the user and coordinating all other agents in the system. In this
first increment, we implement only the user communication part.
"""
def __init__(self, config: Dict):
"""
Initialize the coordinator with the application configuration.
Args:
config: The parsed contents of config.yaml.
"""
llm_config = config["llm"]
coordinator_config = config["coordinator"]
self.llm = LLMClient(
base_url=llm_config["base_url"],
api_key=llm_config["api_key"],
model=llm_config["coordinator_model"],
system_prompt=coordinator_config["system_prompt"]
)
# The conversation history is the coordinator's short-term memory.
# It accumulates the full dialogue so the LLM has context.
self.conversation_history: List[Dict[str, str]] = []
logger.info(
f"Coordinator initialized with model: {llm_config['coordinator_model']}"
)
async def chat(self, user_input: str) -> str:
"""
Process a user message and return the coordinator's response.
The conversation history is maintained across calls, giving
the LLM context about the ongoing conversation.
Args:
user_input: The raw text input from the user.
Returns:
The coordinator's response as a string.
"""
# Add the user's message to the conversation history.
self.conversation_history.append({
"role": "user",
"content": user_input
})
logger.debug(f"User said: {user_input}")
# Send the full conversation history to the LLM.
response = await self.llm.chat(self.conversation_history)
# Add the assistant's response to the history for future context.
self.conversation_history.append({
"role": "assistant",
"content": response
})
logger.debug(f"Coordinator responded: {response[:80]}...")
return response
Finally, let us write the main entry point that ties everything together:
# main.py
#
# Octopussy Light - Main Entry Point
#
# This file loads the configuration and starts the main interaction loop.
# Run this file to start Octopussy Light.
import asyncio
import yaml
import sys
from coordinator.coordinator_agent import CoordinatorAgent
from utils.logger import get_logger
logger = get_logger("system", "system")
def load_config(path: str = "config.yaml") -> dict:
"""
Load and parse the YAML configuration file.
Args:
path: Path to the configuration file.
Returns:
The parsed configuration as a Python dictionary.
"""
try:
with open(path, "r") as f:
return yaml.safe_load(f)
except FileNotFoundError:
logger.error(f"Configuration file not found: {path}")
sys.exit(1)
except yaml.YAMLError as e:
logger.error(f"Error parsing configuration file: {e}")
sys.exit(1)
async def main():
"""
The main async entry point for Octopussy Light.
Loads configuration, initializes the coordinator, and runs
the interactive conversation loop.
"""
logger.info("Starting Octopussy Light...")
config = load_config()
coordinator = CoordinatorAgent(config)
print("\nOctopussy Light is ready. Type 'quit' or 'exit' to stop.\n")
while True:
try:
# Read user input synchronously. In later increments,
# we will make this non-blocking so background agents
# can deliver notifications while the user is idle.
user_input = input("You: ").strip()
if not user_input:
continue
if user_input.lower() in ("quit", "exit"):
print("Goodbye!")
break
response = await coordinator.chat(user_input)
print(f"\nCoordinator: {response}\n")
except Exception as e:
logger.error(f"Unexpected error: {e}")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nGoodbye!")
To run this first increment, you need to install the required packages. Create a requirements.txt file:
# requirements.txt
# Core dependencies for Octopussy Light
openai>=1.0.0 # OpenAI-compatible async client (works with Ollama too)
pyyaml>=6.0 # YAML configuration parsing
mcp>=1.0.0 # Model Context Protocol SDK
aioimaplib>=1.0.0 # Async IMAP email reading
google-api-python-client>=2.0.0 # Google Calendar API
google-auth-oauthlib>=1.0.0 # Google OAuth
google-auth-httplib2>=0.2.0 # Google Auth HTTP transport
chromadb>=1.0.0 # Vector database for long-term memory
icalendar>=5.0.0 # Local .ics calendar file parsing
pytz>=2024.1 # Timezone handling for calendar events
Install them with:
pip install -r requirements.txt
Make sure Ollama is running and you have pulled the llama3.2 model:
ollama pull llama3.2
Then run the system:
python main.py
You should see something like this in your terminal:
[system] 10:23:01 INFO: Starting Octopussy Light...
[coordinator] 10:23:01 INFO: Coordinator initialized with model: llama3.2
Octopussy Light is ready. Type 'quit' or 'exit' to stop.
You: Hello, what can you do?
Coordinator: Hello! I am the coordinator of Octopussy Light, an agentic AI
system. I can help you manage your email, keep track of your calendar, and
execute tasks using specialized worker agents. What would you like to do today?
You:
This is increment one: a working conversational coordinator. It is not yet an agent in the full sense, but the foundation is solid. The LLM is configurable, the conversation history is maintained, and the code is clean and extensible. Now let us add the first real agent capability.
CHAPTER THREE: INCREMENT TWO - ASYNCIO AND THE MESSAGE BUS
Before we add specialized agents, we need to solve a fundamental architectural problem. Our current main loop blocks on user input. While the user is thinking and typing, nothing else can happen. But in a real multi-agent system, things need to happen in the background: the mail agent needs to check for new email, the calendar agent needs to monitor upcoming events, and worker agents need to execute tasks. All of this must happen concurrently with the user interaction.
The solution is Python's asyncio library combined with an internal message bus. We will use asyncio to run all agents concurrently as coroutines, and we will use asyncio.Queue objects to let agents communicate with each other without tight coupling. This is the classic producer-consumer pattern applied to agent communication.
The architecture looks like this:
User Input
|
v
[Coordinator Agent] <----> [Message Bus (asyncio.Queue)]
^ |
| +--------+--------+
| | | |
| [Mail Agent] [Calendar] [Workers]
| | | |
+--------------------+--------+--------+
(all agents send to coordinator's inbox)
Each agent has its own inbox queue. The coordinator has an inbox queue where all other agents send their notifications and results. The coordinator also has outbox queues for sending tasks to specific agents. This decoupled design means agents do not need to know about each other directly; they only need to know about the message bus.
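Underneath all of this is the plain asyncio producer-consumer idiom. The following self-contained sketch (not Octopussy Light code, just the idiom) shows one "agent" posting a notification into another agent's inbox queue while both run concurrently; the email address is invented for the example:

```python
import asyncio

async def mail_agent(coordinator_inbox: asyncio.Queue):
    # Producer: pretend we noticed a new email and notify the coordinator.
    await asyncio.sleep(0.01)                  # simulate periodic polling work
    await coordinator_inbox.put("new mail from alice@example.com")

async def coordinator(inbox: asyncio.Queue) -> str:
    # Consumer: block until a notification arrives, then handle it.
    notification = await inbox.get()
    inbox.task_done()                          # mark the item as processed
    return f"Coordinator saw: {notification}"

async def main() -> str:
    inbox = asyncio.Queue(maxsize=100)         # the coordinator's inbox
    # Run both "agents" concurrently; gather returns both results.
    _, result = await asyncio.gather(mail_agent(inbox), coordinator(inbox))
    return result

print(asyncio.run(main()))  # -> Coordinator saw: new mail from alice@example.com
```

The message bus we build next is essentially a registry of such queues, one per agent, plus routing by recipient name.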
Let us update the project structure to include the message bus:
octopussy_light/
config.yaml
main.py
coordinator/
__init__.py
coordinator_agent.py
agents/
__init__.py
base_agent.py (new)
mail_agent.py (new)
messaging/
__init__.py
message_bus.py (new)
message_types.py (new)
llm/
__init__.py
llm_client.py
utils/
__init__.py
logger.py
The message types module defines the structure of all messages that flow through the system. Using typed messages rather than raw dictionaries makes the code much easier to understand and maintain:
# messaging/message_types.py
#
# Defines all message types that flow through the Octopussy Light
# message bus. Using dataclasses gives us type safety and clear
# documentation of what each message contains.
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from enum import Enum
import uuid
import time
class MessageType(Enum):
"""All possible message types in the Octopussy Light system."""
# Messages from agents to the coordinator
AGENT_NOTIFICATION = "agent_notification" # General notification
MAIL_NOTIFICATION = "mail_notification" # New email matched filters
CALENDAR_NOTIFICATION = "calendar_notification" # Upcoming calendar event
TASK_RESULT = "task_result" # Worker completed a task
TASK_ERROR = "task_error" # Worker failed a task
TASK_PROGRESS = "task_progress" # Worker partial result
# Messages from the coordinator to agents
ASSIGN_TASK = "assign_task" # Give a task to a worker
CONFIGURE_AGENT = "configure_agent" # Update agent configuration
SHUTDOWN_AGENT = "shutdown_agent" # Tell an agent to stop
@dataclass
class Message:
"""
The universal message type for inter-agent communication.
Every message in the system is an instance of this class.
The payload field carries message-type-specific data as a dict.
"""
# The type of this message, from the MessageType enum.
message_type: MessageType
# The name of the agent that sent this message.
sender: str
# The name of the intended recipient agent.
recipient: str
# The message payload. Structure depends on message_type.
payload: Dict[str, Any] = field(default_factory=dict)
# A unique identifier for this message, auto-generated.
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
# Unix timestamp of when this message was created.
timestamp: float = field(default_factory=time.time)
# Optional: the task ID this message relates to.
task_id: Optional[str] = None
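To see what the default_factory fields buy us, here is a stripped-down, self-contained replica of the same pattern (it deliberately does not import the messaging package): every instance gets a fresh UUID and timestamp without the caller supplying either.

```python
import time
import uuid
from dataclasses import dataclass, field

@dataclass
class MiniMessage:
    # Caller-supplied fields come first ...
    sender: str
    recipient: str
    # ... and auto-generated fields use default_factory, so each
    # instance gets its own fresh value. (A plain default would be
    # evaluated once, at class-definition time, and shared.)
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)

a = MiniMessage(sender="mail_agent", recipient="coordinator")
b = MiniMessage(sender="mail_agent", recipient="coordinator")
assert a.message_id != b.message_id   # distinct IDs per message
assert a.timestamp <= b.timestamp     # created in order
```

The real Message class works identically; it just carries more fields.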
Now let us build the message bus itself. The message bus is the central communication hub of the system:
# messaging/message_bus.py
#
# The Octopussy Light Message Bus
#
# The message bus manages a set of named queues, one per agent.
# Agents register themselves to get an inbox queue, then send
# messages to other agents by name. The bus routes messages to
# the correct queue automatically.
import asyncio
from typing import Dict
from utils.logger import get_logger
from messaging.message_types import Message
logger = get_logger("message_bus", "system")
class MessageBus:
"""
A simple, asyncio-based message bus for inter-agent communication.
Each agent registers with the bus and receives its own asyncio.Queue
as an inbox. Sending a message to another agent is as simple as
calling send() with the message (which contains the recipient's name).
"""
def __init__(self):
# Maps agent names to their inbox queues.
self._queues: Dict[str, asyncio.Queue] = {}
logger.info("Message bus initialized.")
def register(self, agent_name: str, maxsize: int = 100) -> asyncio.Queue:
"""
Register an agent and create its inbox queue.
Args:
agent_name: The unique name of the agent.
maxsize: Maximum number of messages the queue can hold.
0 means unlimited. Default is 100 to prevent
runaway memory usage.
Returns:
The asyncio.Queue that serves as this agent's inbox.
"""
if agent_name in self._queues:
logger.warning(
f"Agent '{agent_name}' is already registered. "
f"Returning existing queue."
)
return self._queues[agent_name]
queue = asyncio.Queue(maxsize=maxsize)
self._queues[agent_name] = queue
logger.info(f"Agent '{agent_name}' registered with message bus.")
return queue
def unregister(self, agent_name: str):
"""
Remove an agent's inbox queue from the bus.
Args:
agent_name: The name of the agent to unregister.
"""
if agent_name in self._queues:
del self._queues[agent_name]
logger.info(f"Agent '{agent_name}' unregistered from message bus.")
async def send(self, message: Message):
"""
Send a message to the recipient specified in message.recipient.
If the recipient's queue is full, this will block until
space becomes available (backpressure). If the recipient
is not registered, logs an error and discards the message.
Args:
message: The Message object to send.
"""
recipient = message.recipient
if recipient not in self._queues:
logger.error(
f"Cannot deliver message to '{recipient}': "
f"agent not registered. Message from '{message.sender}' "
f"of type '{message.message_type.value}' discarded."
)
return
await self._queues[recipient].put(message)
logger.debug(
f"Message routed: {message.sender} -> {recipient} "
f"[{message.message_type.value}]"
)
def is_registered(self, agent_name: str) -> bool:
"""Check whether an agent is registered with the bus."""
return agent_name in self._queues
With the message bus in place, we can now define a base class for all agents. This base class handles the common pattern of running in the background, receiving messages from the bus, and dispatching them to handler methods:
# agents/base_agent.py
#
# The Base Agent class for all Octopussy Light agents.
#
# Every agent in the system inherits from this class. It provides
# the common infrastructure for registering with the message bus,
# running as an asyncio task, and processing incoming messages.
import asyncio
from abc import ABC, abstractmethod
from typing import Optional
from messaging.message_bus import MessageBus
from messaging.message_types import Message
from utils.logger import get_logger
class BaseAgent(ABC):
"""
Abstract base class for all Octopussy Light agents.
Subclasses must implement the handle_message() method to define
how they respond to incoming messages. They may also override
run_background_tasks() to perform periodic background work
(like checking email or monitoring a calendar).
"""
def __init__(
self,
name: str,
bus: MessageBus,
config: dict,
color_key: Optional[str] = None
):
"""
Initialize the base agent.
Args:
name: The unique name of this agent on the message bus.
bus: The shared MessageBus instance.
config: The full application configuration dictionary.
color_key: The color key for log output (from utils/logger.py).
"""
self.name = name
self.bus = bus
self.config = config
self.logger = get_logger(name, color_key)
self.running = False
# Register with the message bus to get our inbox queue.
self.inbox = bus.register(name)
self.logger.info(f"Agent '{name}' initialized.")
async def start(self):
"""
Start the agent's main loop and any background tasks.
This method runs indefinitely until stop() is called.
It concurrently runs the message processing loop and
any background tasks defined by the subclass.
"""
self.running = True
self.logger.info(f"Agent '{self.name}' starting.")
# Run the message loop and background tasks concurrently.
await asyncio.gather(
self._message_loop(),
self.run_background_tasks()
)
async def stop(self):
"""Signal the agent to stop gracefully."""
self.running = False
self.logger.info(f"Agent '{self.name}' stopping.")
async def _message_loop(self):
"""
The main message processing loop.
Continuously reads messages from the inbox queue and
dispatches them to handle_message(). The loop exits once
self.running becomes False; messages still queued at that
point are left unprocessed.
task_done() is called in a finally block to guarantee it is
always called even if handle_message() raises an exception,
preventing queue.join() from blocking indefinitely.
"""
while self.running:
try:
# Wait up to 0.1 seconds for a message, then loop
# to check self.running again. This allows clean shutdown.
message = await asyncio.wait_for(
self.inbox.get(),
timeout=0.1
)
try:
await self.handle_message(message)
except Exception as e:
self.logger.error(f"Error processing message: {e}")
finally:
# Always mark the task done, even if handle_message raised.
self.inbox.task_done()
except asyncio.TimeoutError:
# No message arrived; just loop and check self.running.
continue
@abstractmethod
async def handle_message(self, message: Message):
"""
Handle an incoming message from the message bus.
Subclasses must implement this method to define their
response to different message types.
Args:
message: The incoming Message object.
"""
pass
async def run_background_tasks(self):
"""
Override this method to run periodic background tasks.
The default implementation does nothing. Subclasses like
the mail agent and calendar agent override this to perform
their periodic monitoring work.
"""
pass
async def send(self, message: Message):
"""
Convenience method to send a message via the bus.
The recipient is taken from message.recipient, so no
separate recipient argument is needed.
Args:
message: The Message to send. message.recipient must be
set to the name of the intended recipient agent.
"""
await self.bus.send(message)
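The timeout-based polling in _message_loop is worth isolating, because it is the standard way to make a queue consumer stoppable. This self-contained sketch (independent of the BaseAgent class) shows the shape: asyncio.wait_for turns a potentially infinite get() into a bounded wait, so the loop can periodically re-check its running flag.

```python
import asyncio

async def stoppable_consumer(queue: asyncio.Queue, state: dict) -> list:
    processed = []
    while state["running"]:
        try:
            # Wait at most 0.05s for an item, then loop back to
            # re-check the running flag so shutdown is never stuck
            # waiting on an empty queue.
            item = await asyncio.wait_for(queue.get(), timeout=0.05)
        except asyncio.TimeoutError:
            continue                    # nothing arrived; check the flag again
        try:
            processed.append(item)      # "handle" the message
        finally:
            queue.task_done()           # always mark done, even on errors
    return processed

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    state = {"running": True}
    consumer = asyncio.create_task(stoppable_consumer(queue, state))
    await queue.put("hello")
    await queue.join()                  # wait until "hello" is processed
    state["running"] = False            # request a graceful stop
    return await consumer

print(asyncio.run(main()))  # -> ['hello']
```

Without the timeout, a consumer blocked on get() would never notice that the flag flipped, and shutdown would hang until one more message happened to arrive.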
Now we need to update the coordinator to use the message bus and run as an asyncio task alongside other agents. We also need to update the main loop to be non-blocking. Let us rewrite the coordinator and the main entry point:
# coordinator/coordinator_agent.py
#
# The Coordinator Agent - Increment 2
#
# Now the coordinator runs as an asyncio task alongside other agents.
# It processes user input AND handles messages arriving from other agents.
# The input loop is now non-blocking, using asyncio to read stdin.
import asyncio
import sys
from typing import List, Dict
from agents.base_agent import BaseAgent
from messaging.message_bus import MessageBus
from messaging.message_types import Message, MessageType
from llm.llm_client import LLMClient
from utils.logger import get_logger
logger = get_logger("coordinator", "coordinator")
class CoordinatorAgent(BaseAgent):
"""
The central coordinator of Octopussy Light.
Handles user communication and coordinates all other agents.
Runs concurrently with other agents via asyncio.
"""
def __init__(self, config: dict, bus: MessageBus):
super().__init__(
name="coordinator",
bus=bus,
config=config,
color_key="coordinator"
)
llm_config = config["llm"]
coordinator_config = config["coordinator"]
self.llm = LLMClient(
base_url=llm_config["base_url"],
api_key=llm_config["api_key"],
model=llm_config["coordinator_model"],
system_prompt=coordinator_config["system_prompt"]
)
# Short-term memory: the conversation history with the user.
self.conversation_history: List[Dict[str, str]] = []
self.logger.info(
f"Coordinator initialized with model: "
f"{llm_config['coordinator_model']}"
)
async def handle_message(self, message: Message):
"""
Handle messages arriving from other agents.
In this increment, we simply display the notification to the user.
In later increments, we will handle task results, errors, etc.
"""
if message.message_type == MessageType.MAIL_NOTIFICATION:
await self._display_mail_notification(message.payload)
elif message.message_type == MessageType.CALENDAR_NOTIFICATION:
await self._display_calendar_notification(message.payload)
elif message.message_type == MessageType.TASK_RESULT:
await self._display_task_result(message.payload)
elif message.message_type == MessageType.TASK_ERROR:
await self._display_task_error(message.payload)
else:
self.logger.warning(
f"Received unhandled message type: {message.message_type}"
)
async def _display_mail_notification(self, payload: dict):
"""Display a mail notification to the user."""
print(
f"\n[MAIL] From: {payload.get('sender', 'Unknown')}\n"
f" Subject: {payload.get('subject', 'No subject')}\n"
f" Summary: {payload.get('summary', '')}\n"
)
async def _display_calendar_notification(self, payload: dict):
"""Display a calendar event notification to the user."""
print(
f"\n[CALENDAR] Event: {payload.get('title', 'Untitled')}\n"
f" When: {payload.get('start_time', 'Unknown')}\n"
f" Location: {payload.get('location', 'Not specified')}\n"
f" Description: {payload.get('description', '')}\n"
)
async def _display_task_result(self, payload: dict):
"""Display a task result to the user."""
task_id = payload.get("task_id", "unknown")
response_num = payload.get("response_number", 1)
result = payload.get("result", "")
print(
f"\n[TASK RESULT] Task {task_id} "
f"(response #{response_num}):\n{result}\n"
)
async def _display_task_error(self, payload: dict):
"""Display a task error to the user."""
task_id = payload.get("task_id", "unknown")
error = payload.get("error", "Unknown error")
print(f"\n[TASK ERROR] Task {task_id} failed: {error}\n")
async def chat(self, user_input: str) -> str:
"""
Process user input and return the coordinator's response.
"""
self.conversation_history.append({
"role": "user",
"content": user_input
})
response = await self.llm.chat(self.conversation_history)
self.conversation_history.append({
"role": "assistant",
"content": response
})
return response
async def run_background_tasks(self):
"""
The coordinator's background task: reading user input asynchronously.
We use asyncio to read from stdin without blocking the event loop,
so that agent notifications can be displayed even while the user
is idle or typing.
"""
loop = asyncio.get_running_loop()
print("\nOctopussy Light is ready. Type 'quit' or 'exit' to stop.\n")
while self.running:
try:
# Read a line from stdin in a thread pool so we don't
# block the asyncio event loop.
print("You: ", end="", flush=True)
user_input = await loop.run_in_executor(
None, sys.stdin.readline
)
user_input = user_input.strip()
if not user_input:
continue
if user_input.lower() in ("quit", "exit"):
self.running = False
print("Goodbye!")
break
response = await self.chat(user_input)
print(f"\nCoordinator: {response}\n")
except Exception as e:
self.logger.error(f"Error in input loop: {e}")
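The run_in_executor() call is the key trick in the input loop above, and it is easiest to understand in isolation. In this sketch, time.sleep() stands in for sys.stdin.readline() (an assumption for demonstration only); the ticker coroutine keeps making progress on the event loop while the blocking call runs in a worker thread:

```python
import asyncio
import time

def blocking_read() -> str:
    # Stand-in for sys.stdin.readline(): any blocking call works.
    time.sleep(0.2)
    return "hello\n"

async def ticker() -> int:
    ticks = 0
    for _ in range(3):
        ticks += 1  # runs while blocking_read() sleeps in a thread
        await asyncio.sleep(0.05)
    return ticks

async def main() -> None:
    loop = asyncio.get_running_loop()
    # Both finish concurrently: the blocking call in the default
    # thread pool, the ticker on the event loop.
    line, ticks = await asyncio.gather(
        loop.run_in_executor(None, blocking_read),
        ticker(),
    )
    print(line.strip(), ticks)  # → hello 3

asyncio.run(main())
```

If blocking_read() were awaited directly on the event loop (which is impossible, since it is not a coroutine) or called synchronously, the ticker would stall for the full 0.2 seconds. This is precisely why the coordinator can display agent notifications while waiting for user input.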
Now update main.py to create the message bus and start the coordinator as an asyncio task:
# main.py
#
# Octopussy Light - Main Entry Point (Increment 2)
#
# Now uses asyncio to run the coordinator as a concurrent task,
# enabling background agents to run alongside user interaction.
import asyncio
import yaml
import sys
from messaging.message_bus import MessageBus
from coordinator.coordinator_agent import CoordinatorAgent
from utils.logger import get_logger
logger = get_logger("system", "system")
def load_config(path: str = "config.yaml") -> dict:
"""Load and parse the YAML configuration file."""
try:
with open(path, "r") as f:
return yaml.safe_load(f)
except FileNotFoundError:
logger.error(f"Configuration file not found: {path}")
sys.exit(1)
except yaml.YAMLError as e:
logger.error(f"Error parsing configuration file: {e}")
sys.exit(1)
async def main():
"""
The main async entry point for Octopussy Light.
Creates the message bus, initializes all agents, and runs
them all concurrently using asyncio.gather().
"""
logger.info("Starting Octopussy Light...")
config = load_config()
bus = MessageBus()
coordinator = CoordinatorAgent(config, bus)
# In later increments, we will add more agents here:
# mail_agent = MailAgent(config, bus)
# calendar_agent = CalendarAgent(config, bus)
try:
# Run all agents concurrently. asyncio.gather() runs all
# coroutines concurrently and waits for all to complete.
await asyncio.gather(
coordinator.start(),
# mail_agent.start(),
# calendar_agent.start(),
)
finally:
await coordinator.stop()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nShutting down...")
This second increment is a significant architectural leap. The system now has a proper message bus, a base agent class, and an asynchronous input loop. The coordinator runs as an asyncio coroutine, which means it can receive messages from other agents even while waiting for user input. The foundation is now ready for real agents to be added.
CHAPTER FOUR: INCREMENT THREE - THE MAIL AGENT
The mail agent is the first specialized agent we will add to Octopussy Light. Its job is to periodically check an email inbox for new messages that match user-defined filters, summarize the matching messages, and send the results to the coordinator.
The mail agent has three configurable aspects. First, the polling interval: how often it checks for new mail, in seconds. Second, the filter rules: a list of regular expression patterns to match against specific parts of each email, such as the sender address, the subject line, or the message body. Third, the IMAP server connection details: host, username, and password.
Let us add the mail agent configuration to config.yaml:
# config.yaml (additions for the mail agent)
mail_agent:
# How often to check for new mail, in seconds.
poll_interval_seconds: 60
# IMAP server connection details.
# For security, consider loading these from environment variables
# rather than storing credentials directly in this file.
imap_host: "imap.gmail.com"
imap_user: "your_email@gmail.com"
imap_password: "your_app_password"
# Filter rules: a list of rules. Each rule specifies a field to
# match (sender, subject, or body) and a regular expression pattern.
# A message matches if ANY rule matches.
filters:
- field: "sender"
pattern: "boss@company.com"
- field: "subject"
pattern: "(?i)urgent|important|action required"
- field: "body"
pattern: "(?i)please review|your approval"
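Before wiring these rules into the agent, it helps to see how re.search() applies them. This standalone sketch mirrors two of the rules above; the matches() helper is illustrative, not part of the agent code:

```python
import re

# Two rules mirroring the config above.
filters = [
    {"field": "sender", "pattern": "boss@company.com"},
    {"field": "subject", "pattern": "(?i)urgent|important|action required"},
]

def matches(sender: str, subject: str, body: str) -> bool:
    """Return True if ANY rule's pattern matches its field."""
    fields = {"sender": sender, "subject": subject, "body": body}
    return any(
        re.search(rule["pattern"], fields.get(rule["field"], ""))
        for rule in filters
    )

print(matches("boss@company.com", "Lunch?", ""))         # → True
print(matches("colleague@x.com", "URGENT: review", ""))  # → True
print(matches("colleague@x.com", "Team newsletter", "")) # → False
```

Note that the (?i) flag makes the whole pattern case-insensitive, which is why "URGENT" matches, and that re.search() finds the pattern anywhere in the string, not just at the start.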
Now let us build the mail agent. It uses aioimaplib for async IMAP access and the LLMClient to summarize matching emails:
# agents/mail_agent.py
#
# The Mail Agent for Octopussy Light.
#
# Periodically checks an IMAP mailbox for new messages that match
# user-defined regular expression filters. Matching messages are
# summarized using an LLM and sent to the coordinator as JSON payloads.
import asyncio
import email
import email.header
import re
import time
from typing import List, Dict, Optional, Set
from aioimaplib import IMAP4_SSL
from agents.base_agent import BaseAgent
from messaging.message_bus import MessageBus
from messaging.message_types import Message, MessageType
from llm.llm_client import LLMClient
class MailAgent(BaseAgent):
"""
Monitors an IMAP mailbox and notifies the coordinator of
new messages that match configured regular expression filters.
"""
def __init__(self, config: dict, bus: MessageBus):
super().__init__(
name="mail_agent",
bus=bus,
config=config,
color_key="mail_agent"
)
mail_config = config["mail_agent"]
llm_config = config["llm"]
self.imap_host = mail_config["imap_host"]
self.imap_user = mail_config["imap_user"]
self.imap_password = mail_config["imap_password"]
self.poll_interval = mail_config["poll_interval_seconds"]
self.filters = mail_config.get("filters", [])
# The LLM is used to summarize matching email content.
self.llm = LLMClient(
base_url=llm_config["base_url"],
api_key=llm_config["api_key"],
model=llm_config["coordinator_model"],
system_prompt=(
"You are an email summarizer. Given the body of an email, "
"produce a concise 2-3 sentence summary of its key points. "
"Be factual and objective. Do not add commentary."
)
)
# Track which message UIDs we have already processed so we
# do not notify the coordinator about the same email twice.
# Note: this set grows unboundedly for long-running processes.
# A production implementation would use a bounded cache or
# persist processed UIDs to disk.
self.processed_uids: Set[str] = set()
self.logger.info(
f"Mail agent initialized. Polling every "
f"{self.poll_interval}s with {len(self.filters)} filter(s)."
)
async def handle_message(self, message: Message):
"""
The mail agent currently does not respond to incoming messages.
Future increments may add reconfiguration support here.
"""
self.logger.debug(
f"Mail agent received message of type {message.message_type}, "
f"ignoring (not yet handled)."
)
async def run_background_tasks(self):
"""
The mail agent's main background loop.
Polls the IMAP server at the configured interval, checks for
new messages matching the filters, and notifies the coordinator.
"""
self.logger.info("Mail agent background polling started.")
while self.running:
try:
await self._check_mail()
except Exception as e:
self.logger.error(f"Error during mail check: {e}")
# Wait for the configured interval before checking again.
# We check self.running every second so we can shut down
# cleanly without waiting the full poll interval.
for _ in range(self.poll_interval):
if not self.running:
break
await asyncio.sleep(1)
async def _check_mail(self):
"""
Connect to the IMAP server, fetch new messages, apply filters,
and send notifications to the coordinator for any matches.
"""
self.logger.debug("Checking mail...")
imap_client = None
try:
imap_client = IMAP4_SSL(host=self.imap_host)
await imap_client.wait_hello_from_server()
await imap_client.login(self.imap_user, self.imap_password)
await imap_client.select("INBOX")
            # Search for UNSEEN messages only, to avoid reprocessing.
            # Note: unlike the standard library's imaplib, aioimaplib's
            # search() takes the criteria directly (charset is a
            # keyword argument), so no leading None is needed.
            status, message_ids = await imap_client.search("UNSEEN")
if status != "OK" or not message_ids[0]:
self.logger.debug("No new unseen messages.")
return
uid_list = message_ids[0].split()
self.logger.info(f"Found {len(uid_list)} unseen message(s).")
for uid in uid_list:
uid_str = uid.decode() if isinstance(uid, bytes) else uid
if uid_str in self.processed_uids:
continue
                # Pass the decoded string form: aioimaplib builds its
                # commands from strings, not bytes.
                parsed = await self._fetch_and_parse(imap_client, uid_str)
if parsed is None:
continue
sender, subject, body = parsed
if self._matches_filters(sender, subject, body):
self.logger.info(
f"Filter match: UID {uid_str}, "
f"From: {sender}, Subject: {subject}"
)
summary = await self._summarize(body)
await self._notify_coordinator(sender, subject, summary)
self.processed_uids.add(uid_str)
finally:
if imap_client:
try:
await imap_client.logout()
except Exception:
pass
async def _fetch_and_parse(
self,
imap_client,
        uid: str
) -> Optional[tuple]:
"""
Fetch a single email by UID and extract sender, subject, and body.
Returns:
A tuple of (sender, subject, body) strings, or None on error.
"""
try:
status, data = await imap_client.fetch(uid, "(RFC822)")
if status != "OK":
return None
            # aioimaplib returns the response as a flat list of lines
            # rather than imaplib-style tuples; the raw RFC822 message
            # bytes are typically the second element of that list.
            if len(data) < 2:
                return None
            msg = email.message_from_bytes(data[1])
            # Decode the sender and subject fields.
            sender = self._decode_header(msg.get("From", ""))
            subject = self._decode_header(msg.get("Subject", "No subject"))
            # Extract the plain text body.
            body = self._extract_body(msg)
            return sender, subject, body
except Exception as e:
self.logger.error(f"Error fetching/parsing email UID {uid}: {e}")
return None
def _decode_header(self, raw_header: str) -> str:
"""Decode a potentially encoded email header to a plain string."""
try:
parts = email.header.decode_header(raw_header)
decoded_parts = []
for part, encoding in parts:
if isinstance(part, bytes):
decoded_parts.append(
part.decode(encoding or "utf-8", errors="replace")
)
else:
decoded_parts.append(str(part))
return " ".join(decoded_parts)
except Exception:
return str(raw_header)
def _extract_body(self, msg) -> str:
"""
Extract the plain text body from an email message.
Handles both simple (non-multipart) and multipart messages.
Prefers text/plain parts over text/html.
"""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
disposition = str(part.get("Content-Disposition", ""))
if content_type == "text/plain" and "attachment" not in disposition:
try:
return part.get_payload(decode=True).decode(
"utf-8", errors="replace"
)
except Exception:
continue
else:
try:
return msg.get_payload(decode=True).decode(
"utf-8", errors="replace"
)
except Exception:
pass
return ""
def _matches_filters(self, sender: str, subject: str, body: str) -> bool:
"""
Check whether an email matches any of the configured filter rules.
Each filter rule specifies a field (sender, subject, or body)
and a regular expression pattern. The email matches if ANY
single rule matches.
Args:
sender: The decoded sender address string.
subject: The decoded subject line string.
body: The plain text body of the email.
Returns:
True if at least one filter rule matches, False otherwise.
"""
field_map = {
"sender": sender,
"subject": subject,
"body": body
}
for rule in self.filters:
field = rule.get("field", "").lower()
pattern = rule.get("pattern", "")
text = field_map.get(field, "")
if pattern and re.search(pattern, text):
return True
return False
async def _summarize(self, body: str) -> str:
"""
Use the LLM to produce a concise summary of the email body.
Args:
body: The plain text body of the email.
Returns:
A 2-3 sentence summary string.
"""
if not body.strip():
return "No body content."
# Truncate very long bodies to avoid exceeding context limits.
truncated = body[:4000] if len(body) > 4000 else body
try:
summary = await self.llm.chat([
{"role": "user", "content": f"Summarize this email:\n\n{truncated}"}
])
return summary.strip()
except Exception as e:
self.logger.error(f"LLM summarization failed: {e}")
return "Summary unavailable."
async def _notify_coordinator(
self,
sender: str,
subject: str,
summary: str
):
"""
Send a mail notification to the coordinator via the message bus.
The payload is a JSON-serializable dictionary containing the
sender, subject, and LLM-generated summary of the email.
"""
notification = Message(
message_type=MessageType.MAIL_NOTIFICATION,
sender=self.name,
recipient="coordinator",
payload={
"sender": sender,
"subject": subject,
"summary": summary
}
)
await self.bus.send(notification)
self.logger.info(
f"Sent mail notification to coordinator: '{subject}'"
)
Now update main.py to include the mail agent:
# main.py (Increment 3)
import asyncio
import yaml
import sys
from messaging.message_bus import MessageBus
from coordinator.coordinator_agent import CoordinatorAgent
from agents.mail_agent import MailAgent
from utils.logger import get_logger
logger = get_logger("system", "system")
def load_config(path: str = "config.yaml") -> dict:
"""Load and parse the YAML configuration file."""
try:
with open(path, "r") as f:
return yaml.safe_load(f)
    except FileNotFoundError:
        logger.error(f"Configuration file not found: {path}")
        sys.exit(1)
    except yaml.YAMLError as e:
        logger.error(f"Error parsing configuration file: {e}")
        sys.exit(1)
async def main():
"""Start Octopussy Light with the coordinator and mail agent."""
logger.info("Starting Octopussy Light...")
config = load_config()
bus = MessageBus()
coordinator = CoordinatorAgent(config, bus)
mail_agent = MailAgent(config, bus)
try:
await asyncio.gather(
coordinator.start(),
mail_agent.start(),
)
finally:
await coordinator.stop()
await mail_agent.stop()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nShutting down...")
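Both polling agents slice their waits into one-second sleeps so that shutdown is noticed within a second at most. An alternative sketch using asyncio.Event reacts to shutdown immediately and avoids the loop entirely; interruptible_wait() here is a suggestion, not part of the Octopussy Light codebase:

```python
import asyncio

async def interruptible_wait(seconds: float, stop: asyncio.Event) -> None:
    """Sleep up to `seconds`, returning as soon as `stop` is set."""
    try:
        await asyncio.wait_for(stop.wait(), timeout=seconds)
    except asyncio.TimeoutError:
        pass  # full interval elapsed without a stop request

async def main() -> None:
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    loop.call_later(0.05, stop.set)  # simulate a shutdown request
    t0 = loop.time()
    await interruptible_wait(60, stop)  # returns after ~0.05s, not 60s
    print(f"waited {loop.time() - t0:.2f}s")

asyncio.run(main())
```

The tutorial code sticks with the simple sleep loop because it needs no extra coordination object, but this variant is worth knowing when shutdown latency matters.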
Let us trace through what happens when a matching email arrives. The mail agent's background loop wakes up after the configured poll interval. It connects to the IMAP server, fetches unseen messages, and for each one calls _matches_filters(). If a message matches, it calls _summarize() to get an LLM-generated summary, then calls _notify_coordinator() to send a Message object to the coordinator's inbox. The coordinator's _message_loop() picks up this message and calls _display_mail_notification(), which prints something like this to the terminal:
[MAIL] From: boss@company.com
Subject: Urgent: Q4 Budget Review
Summary: The sender requests an immediate review of the Q4 budget
projections. They highlight a discrepancy in the marketing allocation
and ask for corrections before the board meeting on Friday.
The user sees this notification appear in their terminal even if they are in the middle of a conversation with the coordinator, because the mail agent and the coordinator are running concurrently as asyncio tasks.
CHAPTER FIVE: INCREMENT FOUR - THE CALENDAR AGENT
The calendar agent follows the same pattern as the mail agent but monitors calendar events rather than email. It supports two backends: a local calendar exported as an .ics file (parsed with the icalendar library) and Google Calendar (accessed through the Google Calendar API). The user configures which backend to use and how far in advance to receive notifications.
Let us add the calendar configuration to config.yaml:
# config.yaml (additions for the calendar agent)
calendar_agent:
# How often to check for upcoming events, in seconds.
poll_interval_seconds: 300
# How many minutes before an event to send a notification.
notify_minutes_before: 30
# Backend: "local" for a local .ics file, "google" for Google Calendar.
backend: "local"
# For the "local" backend: path to the .ics file.
local_ics_path: "/path/to/your/calendar.ics"
# For the "google" backend: path to the credentials file.
google_credentials_path: "credentials.json"
google_token_path: "token.json"
google_calendar_id: "primary"
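At its core, the calendar agent performs one simple check: does an event's start time fall between now and now plus the notification lead time? Here is that check as a standalone sketch; in_notify_window() is illustrative and the timestamps are made up:

```python
import datetime

def in_notify_window(
    event_start: datetime.datetime,
    now: datetime.datetime,
    notify_minutes_before: int,
) -> bool:
    """True if event_start falls within [now, now + lead time]."""
    window_end = now + datetime.timedelta(minutes=notify_minutes_before)
    return now <= event_start <= window_end

now = datetime.datetime(2025, 11, 18, 13, 40, tzinfo=datetime.timezone.utc)
soon = datetime.datetime(2025, 11, 18, 14, 0, tzinfo=datetime.timezone.utc)
late = datetime.datetime(2025, 11, 18, 15, 0, tzinfo=datetime.timezone.utc)
print(in_notify_window(soon, now, 30))  # → True (20 minutes away)
print(in_notify_window(late, now, 30))  # → False (80 minutes away)
```

Note that all datetimes are timezone-aware; comparing a naive datetime against an aware one raises a TypeError, which is why the agent normalizes everything to UTC before comparison.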
Now let us build the calendar agent. It is structured to support multiple backends through a simple strategy pattern:
# agents/calendar_agent.py
#
# The Calendar Agent for Octopussy Light.
#
# Monitors a calendar (local .ics file or Google Calendar) and notifies
# the coordinator about upcoming events within a configurable time window.
import asyncio
import datetime
from typing import List, Optional, Set
from concurrent.futures import ThreadPoolExecutor
import os
from agents.base_agent import BaseAgent
from messaging.message_bus import MessageBus
from messaging.message_types import Message, MessageType
class CalendarEvent:
"""A simple data class representing a calendar event."""
def __init__(
self,
event_id: str,
title: str,
start_time: datetime.datetime,
end_time: Optional[datetime.datetime],
location: str = "",
description: str = ""
):
self.event_id = event_id
self.title = title
self.start_time = start_time
self.end_time = end_time
self.location = location
self.description = description
class CalendarAgent(BaseAgent):
"""
Monitors a calendar and notifies the coordinator of upcoming events.
Supports local .ics files and Google Calendar as backends.
Events are notified a configurable number of minutes in advance.
"""
def __init__(self, config: dict, bus: MessageBus):
super().__init__(
name="calendar_agent",
bus=bus,
config=config,
color_key="calendar"
)
cal_config = config["calendar_agent"]
self.poll_interval = cal_config["poll_interval_seconds"]
self.notify_minutes_before = cal_config["notify_minutes_before"]
self.backend = cal_config.get("backend", "local")
# Track which events we have already notified about,
# so we do not send duplicate notifications.
# Note: this set grows unboundedly for long-running processes.
# A production implementation would prune entries for past events.
self.notified_event_ids: Set[str] = set()
if self.backend == "local":
self.ics_path = cal_config.get("local_ics_path", "")
elif self.backend == "google":
self.credentials_path = cal_config.get(
"google_credentials_path", "credentials.json"
)
self.token_path = cal_config.get(
"google_token_path", "token.json"
)
self.calendar_id = cal_config.get(
"google_calendar_id", "primary"
)
self.logger.info(
f"Calendar agent initialized. Backend: {self.backend}, "
f"Notifying {self.notify_minutes_before}min before events."
)
async def handle_message(self, message: Message):
"""Calendar agent does not currently handle incoming messages."""
pass
async def run_background_tasks(self):
"""Poll the calendar at the configured interval."""
self.logger.info("Calendar agent background polling started.")
while self.running:
try:
await self._check_calendar()
except Exception as e:
self.logger.error(f"Error during calendar check: {e}")
for _ in range(self.poll_interval):
if not self.running:
break
await asyncio.sleep(1)
async def _check_calendar(self):
"""
Fetch upcoming events and notify the coordinator for any
that fall within the notification window.
"""
now = datetime.datetime.now(datetime.timezone.utc)
notify_delta = datetime.timedelta(minutes=self.notify_minutes_before)
window_end = now + notify_delta
if self.backend == "local":
events = await self._fetch_local_events(now, window_end)
elif self.backend == "google":
events = await self._fetch_google_events(now, window_end)
else:
self.logger.error(f"Unknown calendar backend: {self.backend}")
return
for event in events:
if event.event_id not in self.notified_event_ids:
await self._notify_coordinator(event)
self.notified_event_ids.add(event.event_id)
async def _fetch_local_events(
self,
start: datetime.datetime,
end: datetime.datetime
) -> List[CalendarEvent]:
"""
Read events from a local .ics file that fall within [start, end].
Uses the icalendar library, which is synchronous, so we run it
in a thread pool to avoid blocking the asyncio event loop.
Note: Naive datetimes in the .ics file (those without timezone
information) are assumed to be UTC. If your calendar stores events
in a local timezone, you should convert them appropriately before
comparison. Consider using the 'pytz' library for timezone-aware
datetime handling in production deployments.
"""
loop = asyncio.get_running_loop()
def _read_ics():
"""Synchronous function to read and parse the .ics file."""
try:
from icalendar import Calendar
if not os.path.exists(self.ics_path):
self.logger.warning(
f"ICS file not found: {self.ics_path}"
)
return []
with open(self.ics_path, "rb") as f:
cal = Calendar.from_ical(f.read())
events = []
for component in cal.walk():
if component.name != "VEVENT":
continue
dtstart = component.get("DTSTART")
if dtstart is None:
continue
event_start = dtstart.dt
# Normalize to UTC datetime for comparison.
if isinstance(event_start, datetime.date) and not isinstance(
event_start, datetime.datetime
):
event_start = datetime.datetime.combine(
event_start,
datetime.time.min,
tzinfo=datetime.timezone.utc
)
elif isinstance(event_start, datetime.datetime):
if event_start.tzinfo is None:
# Naive datetime: assume UTC.
event_start = event_start.replace(
tzinfo=datetime.timezone.utc
)
if not (start <= event_start <= end):
continue
dtend = component.get("DTEND")
event_end = dtend.dt if dtend else None
if isinstance(event_end, datetime.date) and not isinstance(
event_end, datetime.datetime
):
event_end = datetime.datetime.combine(
event_end,
datetime.time.min,
tzinfo=datetime.timezone.utc
)
uid = str(component.get("UID", ""))
summary = str(component.get("SUMMARY", "Untitled"))
location = str(component.get("LOCATION", ""))
description = str(component.get("DESCRIPTION", ""))
events.append(CalendarEvent(
event_id=uid,
title=summary,
start_time=event_start,
end_time=event_end,
location=location,
description=description
))
return events
except Exception as e:
self.logger.error(f"Error reading ICS file: {e}")
return []
        # Reuse the event loop's default executor instead of creating
        # and tearing down a fresh thread pool on every poll.
        return await loop.run_in_executor(None, _read_ics)
async def _fetch_google_events(
self,
start: datetime.datetime,
end: datetime.datetime
) -> List[CalendarEvent]:
"""
Fetch events from Google Calendar that fall within [start, end].
The Google API client is synchronous, so we run it in a thread
pool executor to avoid blocking the asyncio event loop.
"""
loop = asyncio.get_running_loop()
def _fetch_sync():
"""Synchronous Google Calendar API call."""
try:
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
SCOPES = ["https://www.googleapis.com/auth/calendar.readonly"]
creds = None
if os.path.exists(self.token_path):
creds = Credentials.from_authorized_user_file(
self.token_path, SCOPES
)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
self.credentials_path, SCOPES
)
creds = flow.run_local_server(port=0)
with open(self.token_path, "w") as token:
token.write(creds.to_json())
service = build("calendar", "v3", credentials=creds)
result = service.events().list(
calendarId=self.calendar_id,
timeMin=start.isoformat(),
timeMax=end.isoformat(),
singleEvents=True,
orderBy="startTime"
).execute()
events = []
for item in result.get("items", []):
event_id = item.get("id", "")
title = item.get("summary", "Untitled")
location = item.get("location", "")
description = item.get("description", "")
start_str = item["start"].get(
"dateTime", item["start"].get("date")
)
end_str = item["end"].get(
"dateTime", item["end"].get("date")
) if "end" in item else None
event_start = datetime.datetime.fromisoformat(
start_str.replace("Z", "+00:00")
)
event_end = (
datetime.datetime.fromisoformat(
end_str.replace("Z", "+00:00")
)
if end_str else None
)
events.append(CalendarEvent(
event_id=event_id,
title=title,
start_time=event_start,
end_time=event_end,
location=location,
description=description
))
return events
except Exception as e:
self.logger.error(f"Error fetching Google Calendar events: {e}")
return []
        # Reuse the event loop's default executor instead of creating
        # and tearing down a fresh thread pool on every poll.
        return await loop.run_in_executor(None, _fetch_sync)
async def _notify_coordinator(self, event: CalendarEvent):
"""
Send a calendar notification to the coordinator.
The payload contains all relevant event information as a
JSON-serializable dictionary.
"""
notification = Message(
message_type=MessageType.CALENDAR_NOTIFICATION,
sender=self.name,
recipient="coordinator",
payload={
"event_id": event.event_id,
"title": event.title,
"start_time": event.start_time.isoformat(),
"end_time": event.end_time.isoformat() if event.end_time else None,
"location": event.location,
"description": event.description
}
)
await self.bus.send(notification)
self.logger.info(
f"Sent calendar notification to coordinator: '{event.title}'"
)
And update main.py to include the calendar agent:
# main.py (Increment 4)
import asyncio
import yaml
import sys
from messaging.message_bus import MessageBus
from coordinator.coordinator_agent import CoordinatorAgent
from agents.mail_agent import MailAgent
from agents.calendar_agent import CalendarAgent
from utils.logger import get_logger
logger = get_logger("system", "system")
def load_config(path: str = "config.yaml") -> dict:
"""Load and parse the YAML configuration file."""
try:
with open(path, "r") as f:
return yaml.safe_load(f)
    except FileNotFoundError:
        logger.error(f"Configuration file not found: {path}")
        sys.exit(1)
    except yaml.YAMLError as e:
        logger.error(f"Error parsing configuration file: {e}")
        sys.exit(1)
async def main():
"""Start Octopussy Light with coordinator, mail, and calendar agents."""
logger.info("Starting Octopussy Light...")
config = load_config()
bus = MessageBus()
coordinator = CoordinatorAgent(config, bus)
mail_agent = MailAgent(config, bus)
calendar_agent = CalendarAgent(config, bus)
all_agents = [coordinator, mail_agent, calendar_agent]
try:
await asyncio.gather(
*[agent.start() for agent in all_agents]
)
finally:
for agent in all_agents:
await agent.stop()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nShutting down...")
When a calendar event is within the notification window, the user will see output like this in their terminal:
[CALENDAR] Event: Q4 Budget Review Meeting
When: 2025-11-18T14:00:00+00:00
Location: Conference Room B / Zoom
Description: Quarterly budget review with the finance team.
Please bring updated projections.
This notification appears regardless of what the user is currently doing in the terminal, because the calendar agent and the coordinator are running as concurrent asyncio tasks.
CHAPTER SIX: INCREMENT FIVE - WORKER AGENTS AND TASK MANAGEMENT
Worker agents are the most powerful and flexible part of Octopussy Light. They are created on demand when the user wants to execute a task, and they can be reused if an idle worker is available. Each worker gets a unique task ID, maintains its own short-term and long-term memory, and can send multiple responses back to the coordinator for the same task.
The coordinator is responsible for creating workers, assigning tasks to them, reusing idle workers when appropriate, and cleaning up excess idle workers. Before assigning a task, the coordinator runs the task description through a prompt optimizer to improve the quality of the instructions the worker receives.
In this increment, workers reason about tasks using only their internal LLM knowledge. In the next increment, we will equip them with the predefined MCP tools that Octopussy Light ships with out of the box.
Let us start with the prompt optimizer, since it is a prerequisite for the worker system:
# coordinator/prompt_optimizer.py
#
# The Prompt Optimizer for Octopussy Light.
#
# Before a task is sent to a worker agent, the coordinator uses this
# optimizer to improve the task description. A well-crafted prompt
# leads to better, more focused results from the worker.
from llm.llm_client import LLMClient
from utils.logger import get_logger
logger = get_logger("prompt_optimizer", "coordinator")
OPTIMIZER_SYSTEM_PROMPT = """
You are a prompt engineering expert. Your job is to take a user's task
description and rewrite it as a precise, unambiguous instruction for an
AI agent. The optimized prompt should:
1. State the goal clearly and specifically.
2. Specify the expected output format (e.g., JSON, plain text, code).
3. Include any relevant constraints or requirements.
4. Remove ambiguity and vagueness.
5. Be concise but complete.
Return ONLY the optimized prompt text, with no preamble or explanation.
"""
class PromptOptimizer:
"""
Uses an LLM to optimize task descriptions before they are sent
to worker agents. Better prompts lead to better results.
"""
def __init__(self, llm_config: dict):
self.llm = LLMClient(
base_url=llm_config["base_url"],
api_key=llm_config["api_key"],
model=llm_config["coordinator_model"],
system_prompt=OPTIMIZER_SYSTEM_PROMPT
)
logger.info("Prompt optimizer initialized.")
async def optimize(self, task_description: str) -> str:
"""
Optimize a task description for delivery to a worker agent.
Args:
task_description: The raw task description from the user.
Returns:
An optimized version of the task description.
"""
logger.debug(f"Optimizing prompt: {task_description[:60]}...")
try:
optimized = await self.llm.chat([
{
"role": "user",
"content": (
f"Optimize this task description for an AI agent:\n\n"
f"{task_description}"
)
}
])
logger.debug(f"Optimized prompt: {optimized[:60]}...")
return optimized.strip()
except Exception as e:
logger.error(f"Prompt optimization failed: {e}. Using original.")
return task_description
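The important design decision here is the fallback: a failed LLM call must never block task assignment; the worker simply receives the original, unoptimized description. That contract can be seen in miniature below, with a hypothetical `FlakyLLM` standing in for the real `LLMClient` (the names in this sketch are illustrative, not part of the Octopussy Light API):

```python
import asyncio

# Hypothetical stand-in for LLMClient, used only to illustrate the
# optimizer's fallback contract: if the LLM call raises, the original
# task description is returned unchanged.
class FlakyLLM:
    def __init__(self, fail: bool):
        self.fail = fail

    async def chat(self, messages):
        if self.fail:
            raise RuntimeError("LLM unavailable")
        return "Optimized: " + messages[-1]["content"]

async def optimize(llm, task_description: str) -> str:
    # Mirrors PromptOptimizer.optimize(): try the LLM, fall back on error.
    try:
        optimized = await llm.chat(
            [{"role": "user", "content": task_description}]
        )
        return optimized.strip()
    except Exception:
        return task_description

# When the LLM fails, the original description passes through untouched.
print(asyncio.run(optimize(FlakyLLM(fail=True), "summarize the report")))
```

Degrading to the raw prompt is almost always preferable to surfacing an optimizer error to the user, which is why `optimize()` never raises.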
Now let us build the worker agent. In this increment the worker has its own short-term memory (conversation history) and long-term memory (a vector store using ChromaDB), but it does not yet call external tools — that comes in the next increment:
# agents/worker_agent.py
#
# The Worker Agent for Octopussy Light - Increment 5
#
# Worker agents are created on demand to execute specific tasks.
# Each worker has short-term memory (conversation history for the
# current task) and long-term memory (ChromaDB vector store). Workers
# can send multiple responses for a single task and include a unique
# task ID in every response so the coordinator can associate them correctly.
#
# In this increment workers rely on the LLM's internal knowledge only.
# Tool calling via MCP is added in Increment 6.
import asyncio
import uuid
from typing import List, Dict, Optional
import chromadb
from agents.base_agent import BaseAgent
from messaging.message_bus import MessageBus
from messaging.message_types import Message, MessageType
from llm.llm_client import LLMClient
from utils.logger import get_logger
WORKER_SYSTEM_PROMPT = """
You are a focused, capable worker agent in the Octopussy Light system.
You receive tasks from the coordinator and execute them to the best of
your ability. You are thorough, accurate, and efficient. When you have
completed a task, you provide a clear, well-structured response. If you
cannot complete a task, you explain why clearly and specifically.
"""
class WorkerAgent(BaseAgent):
"""
A worker agent that executes tasks assigned by the coordinator.
Each worker maintains short-term memory (conversation history for
the current task) and long-term memory (a persistent vector store
for facts and context that may be useful across tasks).
Workers are identified by a unique worker_id and can be reused
for multiple tasks sequentially. When a new task is assigned,
the worker's short-term memory is cleared.
"""
def __init__(
self,
worker_id: str,
config: dict,
bus: MessageBus
):
"""
Initialize a worker agent.
Args:
worker_id: A unique identifier for this worker instance.
config: The application configuration dictionary.
bus: The shared MessageBus instance.
"""
name = f"worker_{worker_id}"
super().__init__(
name=name,
bus=bus,
config=config,
color_key="worker"
)
self.worker_id = worker_id
llm_config = config["llm"]
self.llm = LLMClient(
base_url=llm_config["base_url"],
api_key=llm_config["api_key"],
model=llm_config["worker_model"],
system_prompt=WORKER_SYSTEM_PROMPT
)
# Short-term memory: cleared when a new task is assigned.
self.short_term_memory: List[Dict[str, str]] = []
# Long-term memory: a ChromaDB collection persisted to disk.
self._init_long_term_memory()
# The current task ID, set when a task is assigned.
self.current_task_id: Optional[str] = None
# Whether this worker is currently executing a task.
self.is_busy: bool = False
self.logger.info(f"Worker '{name}' initialized.")
def _init_long_term_memory(self):
"""
Initialize the ChromaDB vector store for long-term memory.
Each worker gets its own collection in a shared ChromaDB
instance. The collection persists to disk across restarts.
"""
try:
self.chroma_client = chromadb.PersistentClient(
path=f"./memory/worker_{self.worker_id}"
)
self.memory_collection = self.chroma_client.get_or_create_collection(
name=f"worker_{self.worker_id}_memory"
)
self.logger.info(
f"Long-term memory initialized for worker {self.worker_id}."
)
except Exception as e:
self.logger.error(f"Failed to initialize long-term memory: {e}")
self.memory_collection = None
def clear_short_term_memory(self):
"""
Clear the worker's short-term memory.
Called by the coordinator before assigning a new task to
ensure the worker starts with a clean context.
"""
self.short_term_memory = []
self.logger.debug("Short-term memory cleared.")
async def store_in_long_term_memory(
self, content: str, metadata: Optional[dict] = None
):
"""
Store a piece of information in long-term memory.
Args:
content: The text to store.
metadata: Optional metadata to associate with this memory.
"""
if self.memory_collection is None:
return
try:
self.memory_collection.add(
documents=[content],
ids=[str(uuid.uuid4())],
metadatas=[metadata or {}]
)
self.logger.debug(f"Stored in long-term memory: {content[:60]}...")
except Exception as e:
self.logger.error(f"Failed to store in long-term memory: {e}")
async def retrieve_from_long_term_memory(
self,
query: str,
n_results: int = 3
) -> List[str]:
"""
Retrieve relevant memories from long-term storage using
semantic similarity search.
Args:
query: The query text to search for.
n_results: The maximum number of results to return.
Returns:
A list of relevant memory strings.
"""
if self.memory_collection is None:
return []
try:
count = self.memory_collection.count()
if count == 0:
return []
results = self.memory_collection.query(
query_texts=[query],
n_results=min(n_results, count)
)
return results.get("documents", [[]])[0]
except Exception as e:
self.logger.error(f"Failed to retrieve from long-term memory: {e}")
return []
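ChromaDB handles embedding and similarity search for us, but the retrieval contract is easy to see in miniature. The toy scorer below ranks stored memories by word overlap with the query; real embeddings replace only the scoring function, while the shape of the operation (query in, top-n relevant documents out) stays the same:

```python
import re

# Toy stand-in for the worker's long-term memory: documents are ranked
# by word overlap with the query instead of embedding similarity.
def retrieve(documents, query, n_results=3):
    q_words = set(re.findall(r"\w+", query.lower()))
    scored = [
        (len(q_words & set(re.findall(r"\w+", doc.lower()))), doc)
        for doc in documents
    ]
    # Highest overlap first; drop documents with no overlap at all.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [doc for score, doc in scored[:n_results] if score > 0]

memories = [
    "Task: summarize Q3 sales report. Result: revenue grew 12 percent.",
    "Task: draft welcome email. Result: email drafted and sent.",
    "Task: analyze sales by region. Result: EMEA leads growth.",
]
print(retrieve(memories, "sales report summary", n_results=2))
```

The query "sales report summary" pulls back the two sales-related memories and ignores the email task, which is exactly the behavior `retrieve_from_long_term_memory()` gives the worker, only with semantic rather than lexical matching.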
async def handle_message(self, message: Message):
"""Handle an incoming message from the coordinator."""
if message.message_type == MessageType.ASSIGN_TASK:
await self._handle_task_assignment(message)
elif message.message_type == MessageType.SHUTDOWN_AGENT:
await self.stop()
else:
self.logger.warning(
f"Worker received unhandled message type: "
f"{message.message_type}"
)
async def _handle_task_assignment(self, message: Message):
"""
Process a task assignment from the coordinator.
Clears short-term memory, retrieves relevant long-term memories,
executes the task, and sends the result back to the coordinator.
"""
payload = message.payload
task_id = payload.get("task_id", str(uuid.uuid4()))
task_description = payload.get("task_description", "")
self.current_task_id = task_id
self.is_busy = True
self.logger.info(
f"Received task {task_id}: {task_description[:60]}..."
)
self.clear_short_term_memory()
# Retrieve relevant long-term memories for context.
relevant_memories = await self.retrieve_from_long_term_memory(
task_description
)
if relevant_memories:
memory_context = (
"Relevant information from previous tasks:\n"
+ "\n".join(f"- {m}" for m in relevant_memories)
)
self.short_term_memory.append({
"role": "system",
"content": memory_context
})
self.short_term_memory.append({
"role": "user",
"content": task_description
})
try:
result = await self.llm.chat(self.short_term_memory)
self.short_term_memory.append({
"role": "assistant",
"content": result
})
await self.store_in_long_term_memory(
f"Task: {task_description}\nResult: {result}",
metadata={"task_id": task_id}
)
await self._send_result(task_id, result, response_number=1)
except Exception as e:
error_msg = f"Task execution failed: {str(e)}"
self.logger.error(error_msg)
await self._send_error(task_id, error_msg)
finally:
self.is_busy = False
async def _send_result(
self,
task_id: str,
result: str,
response_number: int = 1
):
"""Send a task result back to the coordinator."""
response = Message(
message_type=MessageType.TASK_RESULT,
sender=self.name,
recipient="coordinator",
task_id=task_id,
payload={
"task_id": task_id,
"worker_id": self.worker_id,
"result": result,
"response_number": response_number
}
)
await self.bus.send(response)
self.logger.info(
f"Sent result for task {task_id} (response #{response_number})."
)
async def _send_error(self, task_id: str, error_message: str):
"""Send an error message back to the coordinator."""
error = Message(
message_type=MessageType.TASK_ERROR,
sender=self.name,
recipient="coordinator",
task_id=task_id,
payload={
"task_id": task_id,
"worker_id": self.worker_id,
"error": error_message
}
)
await self.bus.send(error)
self.logger.error(
f"Sent error for task {task_id}: {error_message}"
)
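Why does every response carry a `task_id`? Because results arrive asynchronously and possibly out of order, interleaved with mail and calendar notifications. The correlation pattern the coordinator relies on is a simple pending map, sketched here in isolation (the function names are illustrative):

```python
import uuid

# Results can arrive in any order; the task ID pairs each result
# with its original request.
pending = {}  # task_id -> original request text

def submit(request: str) -> str:
    task_id = str(uuid.uuid4())[:8]
    pending[task_id] = request
    return task_id

def on_result(task_id: str, result: str) -> str:
    request = pending.pop(task_id, "<unknown request>")
    return f"'{request}' -> {result}"

id_a = submit("write a prime checker")
id_b = submit("summarize the report")
# Results arrive out of order; the IDs still pair them correctly.
print(on_result(id_b, "summary done"))
print(on_result(id_a, "code done"))
```

The same mechanism scales to workers that send multiple responses per task: the `response_number` field in the payload orders responses within a single `task_id`.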
Now we need to update the coordinator to manage workers. The coordinator needs to detect when the user wants to create a task, spawn or reuse a worker, optimize the task prompt, and assign the task. It also needs to manage the pool of idle workers. Let us build the WorkerManager:
# coordinator/worker_manager.py
#
# The Worker Manager for Octopussy Light - Increment 5
#
# Manages the lifecycle of worker agents: creating new workers,
# reusing idle workers, assigning tasks, and cleaning up excess
# idle workers. This keeps the coordinator's code clean and focused.
import asyncio
import uuid
from typing import Dict, List
from messaging.message_bus import MessageBus
from messaging.message_types import Message, MessageType
from coordinator.prompt_optimizer import PromptOptimizer
from utils.logger import get_logger
logger = get_logger("worker_manager", "coordinator")
class WorkerManager:
"""
Manages the pool of worker agents in Octopussy Light.
Responsibilities:
- Creating new worker agents when needed.
- Reusing idle workers to avoid unnecessary resource usage.
- Assigning tasks with unique task IDs.
- Cleaning up excess idle workers.
"""
# Maximum number of idle workers to keep alive. If there are
# more idle workers than this, the excess will be shut down.
MAX_IDLE_WORKERS = 3
def __init__(self, config: dict, bus: MessageBus):
self.config = config
self.bus = bus
self.prompt_optimizer = PromptOptimizer(config["llm"])
# Maps worker names to WorkerAgent instances.
self._workers: Dict[str, object] = {}
logger.info("Worker manager initialized.")
def _get_idle_workers(self) -> List[object]:
"""Return a list of all workers that are not currently busy."""
return [w for w in self._workers.values() if not w.is_busy]
async def assign_task(self, task_description: str) -> str:
"""
Assign a task to a worker agent.
If an idle worker is available, it will be reused. Otherwise,
a new worker is created. The task description is optimized
before being sent to the worker.
Args:
task_description: The raw task description from the user.
Returns:
The unique task ID assigned to this task.
"""
# Import here to avoid circular imports at module load time.
from agents.worker_agent import WorkerAgent
task_id = str(uuid.uuid4())[:8]
optimized_description = await self.prompt_optimizer.optimize(
task_description
)
logger.info(
f"Task {task_id} optimized. "
f"Original: '{task_description[:40]}...', "
f"Optimized: '{optimized_description[:40]}...'"
)
idle_workers = self._get_idle_workers()
if idle_workers:
worker = idle_workers[0]
logger.info(
f"Reusing idle worker '{worker.name}' for task {task_id}."
)
else:
worker_id = str(uuid.uuid4())[:6]
worker = WorkerAgent(worker_id, self.config, self.bus)
self._workers[worker.name] = worker
# Note: dynamically created worker tasks are not tracked for
# cancellation on shutdown. A production implementation would
# store the task handles and cancel them during teardown.
asyncio.create_task(worker.start())
logger.info(
f"Created new worker '{worker.name}' for task {task_id}."
)
task_message = Message(
message_type=MessageType.ASSIGN_TASK,
sender="coordinator",
recipient=worker.name,
task_id=task_id,
payload={
"task_id": task_id,
"task_description": optimized_description
}
)
# Mark the worker busy before dispatch so the idle-worker cleanup
# below cannot shut it down before it processes the assignment.
worker.is_busy = True
await self.bus.send(task_message)
await self._cleanup_idle_workers()
return task_id
async def _cleanup_idle_workers(self):
"""
Shut down excess idle workers to free resources.
If there are more idle workers than MAX_IDLE_WORKERS, the
excess workers are sent a shutdown message and removed from
the pool. We iterate over a snapshot of the idle list to
avoid mutating self._workers while reading it.
"""
idle_workers = self._get_idle_workers()
excess_count = len(idle_workers) - self.MAX_IDLE_WORKERS
if excess_count <= 0:
return
logger.info(f"Cleaning up {excess_count} excess idle worker(s).")
for worker in idle_workers[:excess_count]:
shutdown_msg = Message(
message_type=MessageType.SHUTDOWN_AGENT,
sender="coordinator",
recipient=worker.name
)
await self.bus.send(shutdown_msg)
self.bus.unregister(worker.name)
del self._workers[worker.name]
logger.info(f"Shut down idle worker '{worker.name}'.")
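The cleanup policy itself is simple arithmetic: any idle workers beyond `MAX_IDLE_WORKERS` are selected for shutdown. Extracted into a standalone sketch:

```python
MAX_IDLE_WORKERS = 3

def workers_to_shut_down(idle_workers):
    # Mirrors _cleanup_idle_workers(): keep at most MAX_IDLE_WORKERS
    # idle workers alive and return the excess for shutdown.
    excess = len(idle_workers) - MAX_IDLE_WORKERS
    if excess <= 0:
        return []
    return idle_workers[:excess]

# Five idle workers, cap of three: the first two are shut down.
print(workers_to_shut_down(["w1", "w2", "w3", "w4", "w5"]))  # → ['w1', 'w2']
print(workers_to_shut_down(["w1", "w2"]))                    # → []
```

Keeping a small warm pool avoids paying worker startup cost for every task, while the cap prevents a burst of tasks from leaving dozens of idle agents behind.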
Now we need to update the coordinator to use the WorkerManager and to detect when the user is requesting a task. We will use the LLM to determine whether the user's input is a task request or a conversational message:
# coordinator/coordinator_agent.py (Increment 5 - with worker management)
import asyncio
import sys
import json
from typing import List, Dict
from agents.base_agent import BaseAgent
from messaging.message_bus import MessageBus
from messaging.message_types import Message, MessageType
from coordinator.worker_manager import WorkerManager
from llm.llm_client import LLMClient
from utils.logger import get_logger
logger = get_logger("coordinator", "coordinator")
COORDINATOR_SYSTEM_PROMPT = """
You are the coordinator of an agentic AI system called Octopussy Light.
Your job is to understand the user's requests, coordinate specialized
agents to fulfill those requests, and present results clearly.
When the user asks you to perform a task (something that requires research,
analysis, writing, coding, or other substantive work), respond with a JSON
object in this exact format:
{"action": "create_task", "task_description": "<the task>"}
For conversational messages, questions about the system, or simple requests
that you can answer directly, respond normally in plain text.
You are friendly, precise, and efficient.
"""
class CoordinatorAgent(BaseAgent):
"""
The central coordinator of Octopussy Light.
Handles user communication and coordinates all other agents.
"""
def __init__(self, config: dict, bus: MessageBus):
super().__init__(
name="coordinator",
bus=bus,
config=config,
color_key="coordinator"
)
llm_config = config["llm"]
self.llm = LLMClient(
base_url=llm_config["base_url"],
api_key=llm_config["api_key"],
model=llm_config["coordinator_model"],
system_prompt=COORDINATOR_SYSTEM_PROMPT
)
self.conversation_history: List[Dict[str, str]] = []
self.worker_manager = WorkerManager(config, bus)
self.logger.info(
f"Coordinator initialized with model: "
f"{llm_config['coordinator_model']}"
)
async def handle_message(self, message: Message):
"""Handle messages arriving from other agents."""
if message.message_type == MessageType.MAIL_NOTIFICATION:
await self._display_mail_notification(message.payload)
elif message.message_type == MessageType.CALENDAR_NOTIFICATION:
await self._display_calendar_notification(message.payload)
elif message.message_type == MessageType.TASK_RESULT:
await self._display_task_result(message.payload)
elif message.message_type == MessageType.TASK_ERROR:
await self._display_task_error(message.payload)
async def _display_mail_notification(self, payload: dict):
print(
f"\n[MAIL] From: {payload.get('sender', 'Unknown')}\n"
f" Subject: {payload.get('subject', 'No subject')}\n"
f" Summary: {payload.get('summary', '')}\n"
)
async def _display_calendar_notification(self, payload: dict):
print(
f"\n[CALENDAR] Event: {payload.get('title', 'Untitled')}\n"
f" When: {payload.get('start_time', 'Unknown')}\n"
f" Location: {payload.get('location', 'Not specified')}\n"
)
async def _display_task_result(self, payload: dict):
task_id = payload.get("task_id", "unknown")
response_num = payload.get("response_number", 1)
result = payload.get("result", "")
print(
f"\n[TASK RESULT] Task {task_id} "
f"(response #{response_num}):\n{result}\n"
)
async def _display_task_error(self, payload: dict):
task_id = payload.get("task_id", "unknown")
error = payload.get("error", "Unknown error")
print(f"\n[TASK ERROR] Task {task_id} failed: {error}\n")
async def chat(self, user_input: str) -> str:
"""
Process user input. If the LLM indicates a task should be created,
delegate to the worker manager. Otherwise, respond conversationally.
"""
self.conversation_history.append({
"role": "user",
"content": user_input
})
response = await self.llm.chat(self.conversation_history)
try:
parsed = json.loads(response.strip())
if parsed.get("action") == "create_task":
task_desc = parsed.get("task_description", user_input)
task_id = await self.worker_manager.assign_task(task_desc)
reply = (
f"I have assigned your task (ID: {task_id}) to a worker. "
f"You will receive the result shortly."
)
self.conversation_history.append({
"role": "assistant",
"content": reply
})
return reply
except (json.JSONDecodeError, AttributeError):
pass
self.conversation_history.append({
"role": "assistant",
"content": response
})
return response
async def run_background_tasks(self):
"""Read user input asynchronously."""
loop = asyncio.get_running_loop()
print("\nOctopussy Light is ready. Type 'quit' or 'exit' to stop.\n")
while self.running:
try:
print("You: ", end="", flush=True)
user_input = await loop.run_in_executor(
None, sys.stdin.readline
)
user_input = user_input.strip()
if not user_input:
continue
if user_input.lower() in ("quit", "exit"):
self.running = False
print("Goodbye!")
break
response = await self.chat(user_input)
print(f"\nCoordinator: {response}\n")
except Exception as e:
self.logger.error(f"Error in input loop: {e}")
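The task-versus-conversation decision hinges on whether the LLM's reply parses as a `create_task` JSON object. That detection logic can be extracted into a standalone sketch; the `isinstance` guard here covers replies that are valid JSON but not objects (such as a bare list), which the coordinator's version handles by also catching `AttributeError`:

```python
import json

def route(response: str):
    # A JSON object with action == "create_task" triggers delegation;
    # anything else (including invalid JSON) is a conversational reply.
    try:
        parsed = json.loads(response.strip())
        if isinstance(parsed, dict) and parsed.get("action") == "create_task":
            return ("task", parsed.get("task_description", ""))
    except json.JSONDecodeError:
        pass
    return ("chat", response)

print(route('{"action": "create_task", "task_description": "write tests"}'))
print(route("Hello! How can I help you today?"))
print(route("[1, 2]"))  # valid JSON, but not a task object
```

One practical caveat: some models wrap JSON in markdown code fences, which makes `json.loads` fail and silently demotes the reply to conversation. Stripping fences before parsing is a worthwhile hardening step in a production system.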
Here is an example interaction with the system at this point:
You: Write me a Python function that checks if a number is prime.
Coordinator: I have assigned your task (ID: a3f7b2c1) to a worker.
You will receive the result shortly.
[TASK RESULT] Task a3f7b2c1 (response #1):
Here is a Python function that checks if a number is prime:
def is_prime(n: int) -> bool:
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
for i in range(3, int(n**0.5) + 1, 2):
if n % i == 0:
return False
return True
You: Hello, how are you?
Coordinator: I am doing well, thank you for asking! I am ready to help
you with tasks, email monitoring, calendar events, and more. What would
you like to do?
The coordinator correctly distinguishes between a task request (which it delegates to a worker) and a conversational message (which it handles directly). The task ID allows the user to associate the result with the original request even if other notifications arrive in between. Workers in this increment rely entirely on the LLM's internal knowledge. In the next increment we will wire in the MCP built-in tools server and give workers real-world capabilities.
CHAPTER SEVEN: INCREMENT SIX - PREDEFINED MCP TOOLS FOR WORKERS
Workers can now reason and respond, but they are blind to the outside world. They cannot read files, execute code, or call any external service. This increment changes that by introducing the Model Context Protocol infrastructure and wiring Octopussy Light's predefined built-in tools directly to every worker agent.
The approach is deliberately incremental. We first build the MCP tools server with a curated set of built-in tools, then build the client-side infrastructure (MCPToolClient and MCPServerRegistry), then update the WorkerAgent to implement the agentic ReAct loop (Reason, Act, Observe), and finally update the WorkerManager and coordinator to pass the registry down to workers. The tool agent, which can create entirely new tools dynamically, comes in the next increment and builds naturally on top of this foundation.
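Before building the real thing, the Reason-Act-Observe cycle is worth seeing in miniature. The sketch below uses a scripted stand-in for the LLM and a single toy tool; none of these names belong to the Octopussy Light API, they only demonstrate the loop shape:

```python
# Minimal ReAct loop: the "LLM" first decides to call a tool, then,
# once a tool observation is in the history, produces a final answer.
def scripted_llm(history):
    if not any(m["role"] == "tool" for m in history):
        return {"tool": "add", "args": {"a": 2, "b": 3}}
    observation = [m for m in history if m["role"] == "tool"][-1]["content"]
    return {"final": f"The result is {observation}."}

TOOLS = {"add": lambda a, b: str(a + b)}

def react_loop(task: str, max_steps: int = 5) -> str:
    history = [{"role": "user", "content": task}]
    for _ in range(max_steps):
        decision = scripted_llm(history)                       # Reason
        if "final" in decision:
            return decision["final"]
        result = TOOLS[decision["tool"]](**decision["args"])   # Act
        history.append({"role": "tool", "content": result})    # Observe
    return "Step limit reached."

print(react_loop("What is 2 + 3?"))  # → The result is 5.
```

The `max_steps` cap is not optional decoration: it is the standard guard against an LLM that keeps calling tools without ever converging on an answer.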
Let us start by adding the MCP server configuration to config.yaml:
# config.yaml (additions for MCP servers)
mcp_servers:
# The built-in tools server ships with Octopussy Light and is
# always present. It provides file I/O and code execution tools.
- name: "builtin"
type: "local"
command: ["python", "mcp_server/builtin_tools_server.py"]
# To add a remote or custom local MCP server, uncomment and edit:
# - name: "my_custom_server"
# type: "local"
# command: ["python", "/path/to/my_mcp_server.py"]
#
# - name: "company_data"
# type: "http"
# url: "https://mcp.yourcompany.com/tools"
# api_key: "your_api_key_here"
Now let us build the MCP tools server. We use the FastMCP class from the MCP Python SDK to expose four predefined tools: reading files, writing files, listing directories, and executing Python code in a subprocess sandbox:
# mcp_server/builtin_tools_server.py
#
# The built-in MCP tools server for Octopussy Light.
#
# This server exposes the system's predefined built-in tools to all
# agents via the Model Context Protocol. It runs as a separate process
# and communicates with agents via STDIO transport.
#
# Built-in tools:
# - read_file: Read the contents of a local file.
# - write_file: Write content to a local file.
# - list_directory: List the contents of a directory.
# - execute_python: Execute a Python code snippet in a sandbox.
import os
import subprocess
import tempfile
from pathlib import Path
from mcp.server.fastmcp import FastMCP
# Initialize the FastMCP server with a descriptive name.
mcp = FastMCP("OctopussyLight-BuiltinTools")
@mcp.tool()
def read_file(file_path: str) -> str:
"""
Read the contents of a local file and return them as a string.
Args:
file_path: The absolute or relative path to the file to read.
Returns:
The file contents as a string, or an error message if the
file cannot be read.
"""
try:
path = Path(file_path)
if not path.exists():
return f"Error: File not found: {file_path}"
if not path.is_file():
return f"Error: Path is not a file: {file_path}"
with open(path, "r", encoding="utf-8", errors="replace") as f:
content = f.read()
return content
except PermissionError:
return f"Error: Permission denied reading file: {file_path}"
except Exception as e:
return f"Error reading file: {str(e)}"
@mcp.tool()
def write_file(file_path: str, content: str) -> str:
"""
Write content to a local file, creating it if it does not exist.
Args:
file_path: The path to the file to write.
content: The content to write to the file.
Returns:
A success message or an error message.
"""
try:
path = Path(file_path)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(content)
return f"Successfully wrote {len(content)} characters to {file_path}."
except PermissionError:
return f"Error: Permission denied writing to: {file_path}"
except Exception as e:
return f"Error writing file: {str(e)}"
@mcp.tool()
def list_directory(directory_path: str) -> str:
"""
List the contents of a directory.
Args:
directory_path: The path to the directory to list.
Returns:
A formatted string listing the directory contents, or an error.
"""
try:
path = Path(directory_path)
if not path.exists():
return f"Error: Directory not found: {directory_path}"
if not path.is_dir():
return f"Error: Path is not a directory: {directory_path}"
entries = []
for entry in sorted(path.iterdir()):
entry_type = "DIR " if entry.is_dir() else "FILE"
size = entry.stat().st_size if entry.is_file() else 0
entries.append(f"{entry_type} {entry.name} ({size} bytes)")
if not entries:
return f"Directory is empty: {directory_path}"
return f"Contents of {directory_path}:\n" + "\n".join(entries)
except Exception as e:
return f"Error listing directory: {str(e)}"
@mcp.tool()
def execute_python(code: str) -> str:
"""
Execute a Python code snippet in a sandboxed subprocess.
The code runs in a separate process with a 10-second timeout.
Standard output and standard error are captured and returned.
SECURITY NOTE: This tool executes arbitrary Python code. In a
production environment, use a proper sandbox like Docker or gVisor.
For Octopussy Light, we use subprocess isolation as a basic measure.
Args:
code: The Python code to execute.
Returns:
The combined stdout and stderr output, or a timeout/error message.
"""
tmp_path = None
try:
# Write the code to a temporary file to avoid shell injection.
with tempfile.NamedTemporaryFile(
mode="w",
suffix=".py",
delete=False,
encoding="utf-8"
) as tmp:
tmp.write(code)
tmp_path = tmp.name
result = subprocess.run(
["python", tmp_path],
capture_output=True,
text=True,
timeout=10,
# Restrict environment variables for basic isolation.
env={"PATH": os.environ.get("PATH", "")}
)
output = ""
if result.stdout:
output += f"STDOUT:\n{result.stdout}\n"
if result.stderr:
output += f"STDERR:\n{result.stderr}\n"
if not output:
output = "Code executed successfully with no output."
return output
except subprocess.TimeoutExpired:
return "Error: Code execution timed out after 10 seconds."
except Exception as e:
return f"Error executing code: {str(e)}"
finally:
if tmp_path is not None:
try:
os.unlink(tmp_path)
except Exception:
pass
if __name__ == "__main__":
# Run the MCP server using STDIO transport.
# Agents connect to this server by launching it as a subprocess.
mcp.run(transport="stdio")
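The sandboxing pattern behind execute_python (write to a temp file, run a fresh interpreter, enforce a timeout, capture both streams) works standalone, independent of MCP. The sketch below follows the same structure; it uses `sys.executable` instead of a bare `"python"`, a small robustness tweak that guarantees the snippet runs under the same interpreter that launched it:

```python
import os
import subprocess
import sys
import tempfile

def run_sandboxed(code: str, timeout: int = 10) -> str:
    # Same pattern as the execute_python tool: write the snippet to a
    # temp file, run it in a fresh interpreter, capture both streams.
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", delete=False, encoding="utf-8"
        ) as tmp:
            tmp.write(code)
            tmp_path = tmp.name
        result = subprocess.run(
            [sys.executable, tmp_path],
            capture_output=True, text=True, timeout=timeout,
        )
        return (result.stdout + result.stderr).strip()
    except subprocess.TimeoutExpired:
        return f"timed out after {timeout}s"
    finally:
        # Always remove the temp file, even on timeout or error.
        if tmp_path is not None:
            os.unlink(tmp_path)

print(run_sandboxed("print(sum(range(10)))"))  # → 45
```

As the security note in the server code says, subprocess isolation is a basic measure, not a real sandbox: the child process still sees the filesystem and network. Docker or gVisor remain the right answer for untrusted code in production.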
Now let us build the MCP client that agents use to connect to MCP servers, discover their tools, and call them:
# mcp_client/mcp_tool_client.py
#
# The MCP Tool Client for Octopussy Light agents.
#
# Each agent uses this client to connect to MCP servers and call
# their tools. Supports local (STDIO) servers. The client maintains
# a persistent connection across multiple tool calls.
from typing import Any, Dict, List, Optional
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from utils.logger import get_logger
logger = get_logger("mcp_client", "system")
class MCPToolClient:
"""
A client for connecting to a local MCP server and calling its tools.
The client maintains a persistent connection to the server process
across multiple tool calls. Call connect() once at startup and
disconnect() once at shutdown.
"""
def __init__(self, server_command: List[str], server_name: str = "unknown"):
"""
Initialize the MCP tool client.
Args:
server_command: The command to launch the MCP server process,
as a list of strings. For example:
["python", "mcp_server/builtin_tools_server.py"]
server_name: A human-readable name for this server,
used in log messages.
"""
self.server_command = server_command
self.server_name = server_name
self._session: Optional[ClientSession] = None
self._available_tools: List[Dict] = []
# These context manager objects are stored so that disconnect()
# can cleanly exit them even if connect() was called without
# a surrounding 'async with' block.
self._stdio_context = None
self._session_context = None
logger.info(f"MCP client initialized for server: {server_name}")
async def connect(self):
"""
Connect to the MCP server and discover available tools.
Launches the server subprocess, performs the MCP protocol
handshake, and populates the list of available tools.
Must be called before any call to call_tool().
"""
try:
server_params = StdioServerParameters(
command=self.server_command[0],
args=self.server_command[1:],
env=None
)
# Enter the stdio_client context manager manually so we can
# keep the connection alive across multiple tool calls.
self._stdio_context = stdio_client(server_params)
read, write = await self._stdio_context.__aenter__()
# Enter the ClientSession context manager manually for the
# same reason.
self._session_context = ClientSession(read, write)
self._session = await self._session_context.__aenter__()
# Perform the MCP protocol handshake.
await self._session.initialize()
# Discover available tools from the server.
tools_response = await self._session.list_tools()
self._available_tools = [
{
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema
}
for tool in tools_response.tools
]
logger.info(
f"Connected to MCP server '{self.server_name}'. "
f"Available tools: "
f"{[t['name'] for t in self._available_tools]}"
)
except Exception as e:
logger.error(
f"Failed to connect to MCP server '{self.server_name}': {e}"
)
raise
async def disconnect(self):
"""
Disconnect from the MCP server and clean up resources.
Safely exits both context managers regardless of whether
connect() completed successfully.
"""
try:
if self._session_context is not None:
await self._session_context.__aexit__(None, None, None)
self._session_context = None
if self._stdio_context is not None:
await self._stdio_context.__aexit__(None, None, None)
self._stdio_context = None
self._session = None
logger.info(
f"Disconnected from MCP server '{self.server_name}'."
)
except Exception as e:
logger.error(
f"Error disconnecting from MCP server "
f"'{self.server_name}': {e}"
)
def get_available_tools(self) -> List[Dict]:
"""
Return the list of tools available on this MCP server.
Returns:
A list of tool descriptor dicts, each with 'name',
'description', and 'input_schema' keys.
"""
return self._available_tools
async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
"""
Call a tool on the connected MCP server.
Args:
tool_name: The name of the tool to call.
arguments: A dictionary of arguments to pass to the tool.
Returns:
The tool's response as a string.
Raises:
RuntimeError: If connect() has not been called successfully.
"""
if self._session is None:
raise RuntimeError(
f"Not connected to MCP server '{self.server_name}'. "
f"Call connect() before calling tools."
)
logger.debug(
f"Calling tool '{tool_name}' on server '{self.server_name}' "
f"with args: {arguments}"
)
try:
result = await self._session.call_tool(tool_name, arguments)
# Extract the text content from the MCP result object.
if result.content:
return "\n".join(
item.text
for item in result.content
if hasattr(item, "text")
)
return "Tool returned no content."
except Exception as e:
error_msg = (
f"Error calling tool '{tool_name}' on server "
f"'{self.server_name}': {str(e)}"
)
logger.error(error_msg)
return error_msg
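The trickiest part of MCPToolClient is keeping the connection alive between calls by entering the async context managers manually and exiting them later. That pattern can be isolated with a plain `asynccontextmanager` stand-in (the class and names here are illustrative):

```python
import asyncio
from contextlib import asynccontextmanager

# A toy resource whose lifetime must span many method calls, like the
# MCP server subprocess behind MCPToolClient.
@asynccontextmanager
async def resource():
    print("opened")
    try:
        yield "handle"
    finally:
        print("closed")

class Client:
    def __init__(self):
        self._ctx = None
        self.handle = None

    async def connect(self):
        # Enter the context manager manually instead of 'async with',
        # so the resource stays open after connect() returns.
        self._ctx = resource()
        self.handle = await self._ctx.__aenter__()

    async def disconnect(self):
        # Exit it manually, mirroring MCPToolClient.disconnect().
        if self._ctx is not None:
            await self._ctx.__aexit__(None, None, None)
            self._ctx = None
            self.handle = None

async def main():
    c = Client()
    await c.connect()       # prints "opened"
    assert c.handle == "handle"
    await c.disconnect()    # prints "closed"

asyncio.run(main())
```

One caveat worth knowing: some async libraries (including anyio, which the MCP SDK builds on) expect a context manager to be entered and exited from the same task. Keeping connect() and disconnect() on the same task, as Octopussy Light does at startup and shutdown, avoids surprises here.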
Now let us build the MCP server registry, which manages connections to all configured MCP servers and provides a unified interface for tool discovery and invocation:
# mcp_client/mcp_server_registry.py
#
# The MCP Server Registry for Octopussy Light.
#
# Manages connections to all configured MCP servers and provides a
# unified interface for agents to discover and call tools regardless
# of which server hosts them.
from typing import Any, Dict, List
from mcp_client.mcp_tool_client import MCPToolClient
from utils.logger import get_logger
logger = get_logger("mcp_registry", "system")
class MCPServerRegistry:
"""
Manages all MCP server connections in Octopussy Light.
Provides a single point of access for all tool capabilities,
regardless of whether they come from local or remote MCP servers.
Agents query the registry to discover available tools and call them.
"""
def __init__(self, mcp_server_configs: List[Dict]):
"""
Initialize the registry with server configurations.
Args:
mcp_server_configs: A list of server configuration dicts
from the config.yaml mcp_servers section.
"""
self._server_configs = mcp_server_configs
self._clients: Dict[str, MCPToolClient] = {}
self._tool_index: Dict[str, str] = {} # tool_name -> server_name
logger.info(
f"MCP server registry initialized with "
f"{len(mcp_server_configs)} server(s)."
)
async def connect_all(self):
"""
Connect to all configured MCP servers and discover their tools.
Should be called once at system startup before any agents begin
processing tasks. Servers that fail to connect are skipped with
an error log; the system continues with the remaining servers.
"""
for server_config in self._server_configs:
name = server_config.get("name", "unnamed")
server_type = server_config.get("type", "local")
try:
if server_type == "local":
command = server_config.get("command", [])
client = MCPToolClient(command, server_name=name)
await client.connect()
self._clients[name] = client
# Index all tools from this server. If two servers
# expose a tool with the same name, the later server
# takes precedence and a warning is logged.
for tool in client.get_available_tools():
tool_name = tool["name"]
if tool_name in self._tool_index:
logger.warning(
f"Tool '{tool_name}' from server '{name}' "
f"conflicts with existing tool from "
f"'{self._tool_index[tool_name]}'. "
f"The new one will take precedence."
)
self._tool_index[tool_name] = name
elif server_type == "http":
logger.info(
f"HTTP MCP server '{name}' configured. HTTP "
f"transport support requires additional setup "
f"beyond this article's scope. Skipping."
)
else:
logger.warning(
f"Unknown MCP server type '{server_type}' "
f"for server '{name}'. Skipping."
)
except Exception as e:
logger.error(
f"Failed to connect to MCP server '{name}': {e}"
)
logger.info(
f"MCP registry ready. Available tools: "
f"{list(self._tool_index.keys())}"
)
async def disconnect_all(self):
"""Disconnect from all connected MCP servers."""
for name, client in self._clients.items():
try:
await client.disconnect()
except Exception as e:
logger.error(
f"Error disconnecting from server '{name}': {e}"
)
def get_all_tools(self) -> List[Dict]:
"""
Return a list of all available tools across all connected servers.
Returns:
A list of tool descriptor dicts with 'name', 'description',
and 'input_schema' keys.
"""
all_tools = []
for client in self._clients.values():
all_tools.extend(client.get_available_tools())
return all_tools
async def call_tool(
self,
tool_name: str,
arguments: Dict[str, Any]
) -> str:
"""
Call a tool by name, routing to the correct server automatically.
Args:
tool_name: The name of the tool to call.
arguments: Arguments to pass to the tool.
Returns:
The tool's response as a string.
"""
server_name = self._tool_index.get(tool_name)
if not server_name:
available = list(self._tool_index.keys())
return (
f"Error: Tool '{tool_name}' not found in any connected "
f"MCP server. Available tools: {available}"
)
client = self._clients.get(server_name)
if not client:
return (
f"Error: Server '{server_name}' is registered in the "
f"tool index but has no active client connection."
)
return await client.call_tool(tool_name, arguments)
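The registry is driven entirely by the mcp_servers section of config.yaml. A plausible excerpt, matching the keys the code reads (`name`, `type`, and a `command` list for local servers; the server name and script path below follow the conventions used elsewhere in this article, but your own configuration may differ):

```yaml
# config.yaml (excerpt) - one local MCP server entry.
mcp_servers:
  - name: builtin_tools
    type: local
    command: ["python", "mcp_server/builtin_tools_server.py"]
```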
With the registry in place, we update the WorkerAgent to implement the agentic ReAct loop: the worker can call tools, observe their results, and continue reasoning until it reaches a final answer:
# agents/worker_agent.py (updated for Increment 6 - with MCP tool support)
#
# The key addition is the _execute_with_tools() method, which implements
# an agentic loop: the LLM can call tools, observe results, and continue
# reasoning until it produces a final answer.
import asyncio
import json
import uuid
from typing import List, Dict, Optional
import chromadb
from agents.base_agent import BaseAgent
from messaging.message_bus import MessageBus
from messaging.message_types import Message, MessageType
from llm.llm_client import LLMClient
from utils.logger import get_logger
WORKER_SYSTEM_PROMPT = """
You are a focused, capable worker agent in the Octopussy Light system.
You receive tasks from the coordinator and execute them using available tools.
When you need to use a tool, respond with a JSON object in this format:
{"tool_call": {"name": "<tool_name>", "arguments": {<arguments>}}}
When you have gathered enough information and are ready to provide your
final answer, respond with plain text (not JSON).
Be thorough, accurate, and efficient. If you cannot complete a task,
explain clearly why.
"""
class WorkerAgent(BaseAgent):
"""
A worker agent that executes tasks using LLM reasoning and MCP tools.
The worker implements the ReAct agentic loop: it reasons about the
task, calls tools as needed, observes results, and continues until
it produces a final answer. If no MCP registry is provided, the
worker falls back to LLM-only reasoning (as in Increment 5).
"""
def __init__(
self,
worker_id: str,
config: dict,
bus: MessageBus,
mcp_registry=None
):
"""
Initialize a worker agent.
Args:
worker_id: A unique identifier for this worker instance.
config: The application configuration dictionary.
bus: The shared MessageBus instance.
mcp_registry: Optional MCPServerRegistry providing tool access.
If None, the worker uses LLM-only reasoning.
"""
name = f"worker_{worker_id}"
super().__init__(
name=name,
bus=bus,
config=config,
color_key="worker"
)
self.worker_id = worker_id
self.mcp_registry = mcp_registry
llm_config = config["llm"]
self.llm = LLMClient(
base_url=llm_config["base_url"],
api_key=llm_config["api_key"],
model=llm_config["worker_model"],
system_prompt=WORKER_SYSTEM_PROMPT
)
self.short_term_memory: List[Dict[str, str]] = []
self._init_long_term_memory()
self.current_task_id: Optional[str] = None
self.is_busy: bool = False
self.logger.info(f"Worker '{name}' initialized.")
def _init_long_term_memory(self):
"""Initialize ChromaDB for long-term memory."""
try:
self.chroma_client = chromadb.PersistentClient(
path=f"./memory/worker_{self.worker_id}"
)
self.memory_collection = self.chroma_client.get_or_create_collection(
name=f"worker_{self.worker_id}_memory"
)
except Exception as e:
self.logger.error(f"Failed to initialize long-term memory: {e}")
self.memory_collection = None
def clear_short_term_memory(self):
"""Clear short-term memory for a new task."""
self.short_term_memory = []
async def store_in_long_term_memory(
self, content: str, metadata: dict = None
):
"""Store information in long-term memory."""
if self.memory_collection is None:
return
try:
self.memory_collection.add(
documents=[content],
ids=[str(uuid.uuid4())],
metadatas=[metadata or {}]
)
except Exception as e:
self.logger.error(f"Failed to store in long-term memory: {e}")
async def retrieve_from_long_term_memory(
self, query: str, n_results: int = 3
) -> List[str]:
"""Retrieve relevant memories from long-term storage."""
if self.memory_collection is None:
return []
try:
count = self.memory_collection.count()
if count == 0:
return []
results = self.memory_collection.query(
query_texts=[query],
n_results=min(n_results, count)
)
return results.get("documents", [[]])[0]
except Exception as e:
self.logger.error(f"Failed to retrieve from long-term memory: {e}")
return []
async def handle_message(self, message: Message):
"""Handle incoming messages."""
if message.message_type == MessageType.ASSIGN_TASK:
await self._handle_task_assignment(message)
elif message.message_type == MessageType.SHUTDOWN_AGENT:
await self.stop()
async def _handle_task_assignment(self, message: Message):
"""Process a task assignment from the coordinator."""
payload = message.payload
task_id = payload.get("task_id", str(uuid.uuid4()))
task_description = payload.get("task_description", "")
self.current_task_id = task_id
self.is_busy = True
self.logger.info(
f"Received task {task_id}: {task_description[:60]}..."
)
self.clear_short_term_memory()
# Load relevant long-term memories.
memories = await self.retrieve_from_long_term_memory(task_description)
if memories:
memory_context = (
"Relevant information from previous tasks:\n"
+ "\n".join(f"- {m}" for m in memories)
)
self.short_term_memory.append({
"role": "system",
"content": memory_context
})
# Build the task context as a single user message that includes
# both the tool descriptions and the task itself. This is more
# compatible with LLMs that only support a single system message,
# and ensures the tool list is clearly associated with the task.
if self.mcp_registry:
tools = self.mcp_registry.get_all_tools()
if tools:
tool_descriptions = "\n".join(
f"- {t['name']}: {t['description']}"
for t in tools
)
task_context = (
f"Available tools:\n{tool_descriptions}\n\n"
f"Task: {task_description}"
)
else:
task_context = task_description
else:
task_context = task_description
self.short_term_memory.append({
"role": "user",
"content": task_context
})
try:
result = await self._execute_with_tools(task_id)
await self.store_in_long_term_memory(
f"Task: {task_description}\nResult: {result}",
metadata={"task_id": task_id}
)
await self._send_result(task_id, result, response_number=1)
except Exception as e:
error_msg = f"Task execution failed: {str(e)}"
self.logger.error(error_msg)
await self._send_error(task_id, error_msg)
finally:
self.is_busy = False
async def _execute_with_tools(self, task_id: str) -> str:
"""
Execute a task using the ReAct agentic loop with tool calling.
The worker reasons about the task, calls tools as needed,
observes results, and continues until it produces a final answer.
The loop runs for a maximum of 10 iterations to prevent runaway
execution. If no MCP registry is available, the first LLM response
is returned directly as the final answer.
Args:
task_id: The task ID for logging purposes.
Returns:
The final answer as a string.
"""
max_iterations = 10
        # Initialize response so it is always defined if the loop
        # exits without returning (see the iteration-limit path below).
response = ""
for iteration in range(max_iterations):
self.logger.debug(
f"Task {task_id}, iteration {iteration + 1}/{max_iterations}"
)
response = await self.llm.chat(self.short_term_memory)
# Check if the response is a tool call.
try:
parsed = json.loads(response.strip())
if "tool_call" in parsed and self.mcp_registry:
tool_name = parsed["tool_call"]["name"]
arguments = parsed["tool_call"].get("arguments", {})
self.logger.info(
f"Task {task_id}: calling tool '{tool_name}' "
f"with args: {arguments}"
)
# Call the tool via the MCP registry.
tool_result = await self.mcp_registry.call_tool(
tool_name, arguments
)
# Add the tool call and its result to short-term memory
# so the LLM can reason about the outcome.
self.short_term_memory.append({
"role": "assistant",
"content": response
})
self.short_term_memory.append({
"role": "user",
"content": (
f"Tool '{tool_name}' returned:\n{tool_result}"
)
})
# Continue the loop to let the LLM reason about the result.
continue
except (json.JSONDecodeError, KeyError, TypeError):
# Not a valid tool call JSON; treat as a final answer.
pass
# The response is plain text: this is the final answer.
self.short_term_memory.append({
"role": "assistant",
"content": response
})
return response
# If we exhaust all iterations without a plain-text response,
# return whatever the last LLM response was.
self.logger.warning(
f"Task {task_id} hit the iteration limit of {max_iterations}. "
f"Returning last response."
)
return (
f"Task reached the maximum reasoning iteration limit. "
f"Partial result:\n{response}"
)
async def _send_result(
self, task_id: str, result: str, response_number: int = 1
):
"""Send a task result to the coordinator."""
msg = Message(
message_type=MessageType.TASK_RESULT,
sender=self.name,
recipient="coordinator",
task_id=task_id,
payload={
"task_id": task_id,
"worker_id": self.worker_id,
"result": result,
"response_number": response_number
}
)
await self.bus.send(msg)
async def _send_error(self, task_id: str, error_message: str):
"""Send an error message to the coordinator."""
msg = Message(
message_type=MessageType.TASK_ERROR,
sender=self.name,
recipient="coordinator",
task_id=task_id,
payload={
"task_id": task_id,
"worker_id": self.worker_id,
"error": error_message
}
)
await self.bus.send(msg)
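The heart of the loop is the branch that decides whether an LLM response is a tool call or a final answer. Isolated from the agent (this standalone sketch is not part of the project files), the decision logic looks like this:

```python
import json

def classify_response(response: str):
    """Return ("tool_call", name, args) if the response is a tool-call
    JSON object, otherwise ("final", response, None) - mirroring the
    branch in _execute_with_tools()."""
    try:
        parsed = json.loads(response.strip())
        if isinstance(parsed, dict) and "tool_call" in parsed:
            call = parsed["tool_call"]
            return ("tool_call", call["name"], call.get("arguments", {}))
    except (json.JSONDecodeError, KeyError, TypeError):
        # Not valid tool-call JSON: fall through to the final-answer path.
        pass
    return ("final", response, None)

# A tool-call response is routed to the registry:
print(classify_response(
    '{"tool_call": {"name": "read_file", '
    '"arguments": {"file_path": "/tmp/x"}}}'
))
# Plain text (or any other JSON) ends the loop as the final answer:
print(classify_response("The file contains 42 lines."))
```

Note that malformed tool calls (for example, a `tool_call` object missing its `name`) are deliberately treated as final answers rather than errors, which matches the worker's forgiving behavior.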
We also need to update the WorkerManager to accept and forward the MCP registry to newly created workers:
# coordinator/worker_manager.py (updated for Increment 6 - with MCP registry)
import asyncio
import uuid
from typing import Dict, List, Optional
from messaging.message_bus import MessageBus
from messaging.message_types import Message, MessageType
from coordinator.prompt_optimizer import PromptOptimizer
from utils.logger import get_logger
logger = get_logger("worker_manager", "coordinator")
class WorkerManager:
"""
Manages the lifecycle of worker agents in Octopussy Light.
Now accepts an optional MCP registry and passes it to each worker
so they can call predefined tools during task execution.
"""
MAX_IDLE_WORKERS = 3
def __init__(
self,
config: dict,
bus: MessageBus,
mcp_registry=None
):
"""
Initialize the worker manager.
Args:
config: The application configuration dictionary.
bus: The shared MessageBus instance.
mcp_registry: Optional MCPServerRegistry. When provided, every
new worker is given access to the registry so it
can call predefined MCP tools during execution.
"""
self.config = config
self.bus = bus
self.mcp_registry = mcp_registry
self.prompt_optimizer = PromptOptimizer(config["llm"])
self._workers: Dict[str, object] = {}
logger.info("Worker manager initialized.")
def _get_idle_workers(self) -> List[object]:
"""Return all workers that are not currently busy."""
return [w for w in self._workers.values() if not w.is_busy]
async def assign_task(self, task_description: str) -> str:
"""
Assign a task to a worker, reusing an idle one if available.
New workers are created with access to the MCP registry.
"""
# Import here to avoid circular imports at module load time.
from agents.worker_agent import WorkerAgent
task_id = str(uuid.uuid4())[:8]
optimized_description = await self.prompt_optimizer.optimize(
task_description
)
idle_workers = self._get_idle_workers()
if idle_workers:
worker = idle_workers[0]
logger.info(
f"Reusing idle worker '{worker.name}' for task {task_id}."
)
else:
worker_id = str(uuid.uuid4())[:6]
worker = WorkerAgent(
worker_id,
self.config,
self.bus,
mcp_registry=self.mcp_registry
)
self._workers[worker.name] = worker
asyncio.create_task(worker.start())
logger.info(
f"Created new worker '{worker.name}' for task {task_id}."
)
task_message = Message(
message_type=MessageType.ASSIGN_TASK,
sender="coordinator",
recipient=worker.name,
task_id=task_id,
payload={
"task_id": task_id,
"task_description": optimized_description
}
)
        # Mark the worker busy immediately so the cleanup below cannot
        # reap it before it has processed the ASSIGN_TASK message.
        worker.is_busy = True
        await self.bus.send(task_message)
        await self._cleanup_idle_workers()
return task_id
async def _cleanup_idle_workers(self):
"""Shut down excess idle workers."""
idle_workers = self._get_idle_workers()
excess_count = len(idle_workers) - self.MAX_IDLE_WORKERS
if excess_count <= 0:
return
for worker in idle_workers[:excess_count]:
shutdown_msg = Message(
message_type=MessageType.SHUTDOWN_AGENT,
sender="coordinator",
recipient=worker.name
)
await self.bus.send(shutdown_msg)
self.bus.unregister(worker.name)
del self._workers[worker.name]
logger.info(f"Shut down idle worker '{worker.name}'.")
The coordinator also needs to accept the MCP registry and pass it to the WorkerManager. Here is the complete updated coordinator for Increment 6:
# coordinator/coordinator_agent.py (Increment 6 - complete file)
#
# The coordinator now accepts an optional mcp_registry parameter and
# passes it to the WorkerManager so workers can use predefined tools.
# The system prompt is also updated to mention data retrieval tasks.
import asyncio
import sys
import json
from typing import List, Dict, Optional
from agents.base_agent import BaseAgent
from messaging.message_bus import MessageBus
from messaging.message_types import Message, MessageType
from coordinator.worker_manager import WorkerManager
from llm.llm_client import LLMClient
from utils.logger import get_logger
logger = get_logger("coordinator", "coordinator")
COORDINATOR_SYSTEM_PROMPT = """
You are the coordinator of an agentic AI system called Octopussy Light.
Your job is to understand the user's requests, coordinate specialized
agents to fulfill those requests, and present results clearly.
When the user asks you to perform a task (something that requires research,
analysis, writing, coding, data retrieval, or other substantive work),
respond with a JSON object in this exact format:
{"action": "create_task", "task_description": "<the task>"}
For conversational messages, questions about the system, or simple requests
that you can answer directly from your knowledge, respond in plain text.
You are friendly, precise, and efficient.
"""
class CoordinatorAgent(BaseAgent):
"""
The central coordinator of Octopussy Light.
Handles user communication and coordinates all other agents.
"""
def __init__(
self,
config: dict,
bus: MessageBus,
mcp_registry=None
):
super().__init__(
name="coordinator",
bus=bus,
config=config,
color_key="coordinator"
)
llm_config = config["llm"]
self.llm = LLMClient(
base_url=llm_config["base_url"],
api_key=llm_config["api_key"],
model=llm_config["coordinator_model"],
system_prompt=COORDINATOR_SYSTEM_PROMPT
)
self.conversation_history: List[Dict[str, str]] = []
# Pass the MCP registry to the worker manager so every worker
# it creates will have access to the predefined tools.
self.worker_manager = WorkerManager(
config, bus, mcp_registry=mcp_registry
)
self.logger.info(
f"Coordinator initialized with model: "
f"{llm_config['coordinator_model']}"
)
async def handle_message(self, message: Message):
"""Handle messages from other agents."""
if message.message_type == MessageType.MAIL_NOTIFICATION:
await self._display_mail_notification(message.payload)
elif message.message_type == MessageType.CALENDAR_NOTIFICATION:
await self._display_calendar_notification(message.payload)
elif message.message_type == MessageType.TASK_RESULT:
await self._display_task_result(message.payload)
elif message.message_type == MessageType.TASK_ERROR:
await self._display_task_error(message.payload)
elif message.message_type == MessageType.AGENT_NOTIFICATION:
await self._display_agent_notification(message.payload)
async def _display_mail_notification(self, payload: dict):
print(
f"\n[MAIL] From: {payload.get('sender', 'Unknown')}\n"
f" Subject: {payload.get('subject', 'No subject')}\n"
f" Summary: {payload.get('summary', '')}\n"
)
async def _display_calendar_notification(self, payload: dict):
print(
f"\n[CALENDAR] Event: {payload.get('title', 'Untitled')}\n"
f" When: {payload.get('start_time', 'Unknown')}\n"
f" Location: {payload.get('location', 'Not specified')}\n"
)
async def _display_task_result(self, payload: dict):
task_id = payload.get("task_id", "unknown")
response_num = payload.get("response_number", 1)
result = payload.get("result", "")
print(
f"\n[TASK RESULT] Task {task_id} "
f"(response #{response_num}):\n{result}\n"
)
async def _display_task_error(self, payload: dict):
task_id = payload.get("task_id", "unknown")
error = payload.get("error", "Unknown error")
print(f"\n[TASK ERROR] Task {task_id} failed: {error}\n")
async def _display_agent_notification(self, payload: dict):
notification_type = payload.get("notification_type", "")
if notification_type == "tool_created":
print(
f"\n[TOOL AGENT] New tool created: "
f"{payload.get('tool_description', '')}\n"
)
elif notification_type == "tool_creation_failed":
print(
f"\n[TOOL AGENT] Tool creation failed: "
f"{payload.get('reason', 'Unknown reason')}\n"
)
else:
print(f"\n[AGENT] {payload.get('message', str(payload))}\n")
async def chat(self, user_input: str) -> str:
"""
Process user input and route to the appropriate handler.
"""
self.conversation_history.append({
"role": "user",
"content": user_input
})
response = await self.llm.chat(self.conversation_history)
try:
parsed = json.loads(response.strip())
action = parsed.get("action")
if action == "create_task":
task_desc = parsed.get("task_description", user_input)
task_id = await self.worker_manager.assign_task(task_desc)
reply = (
f"Task assigned (ID: {task_id}). "
f"A worker is now executing it. "
f"Results will appear shortly."
)
self.conversation_history.append({
"role": "assistant",
"content": reply
})
return reply
except (json.JSONDecodeError, AttributeError):
pass
self.conversation_history.append({
"role": "assistant",
"content": response
})
return response
async def run_background_tasks(self):
"""Read user input asynchronously."""
loop = asyncio.get_running_loop()
print("\nOctopussy Light is ready. Type 'quit' or 'exit' to stop.\n")
while self.running:
try:
print("You: ", end="", flush=True)
user_input = await loop.run_in_executor(
None, sys.stdin.readline
)
user_input = user_input.strip()
if not user_input:
continue
if user_input.lower() in ("quit", "exit"):
self.running = False
print("Goodbye!")
break
response = await self.chat(user_input)
print(f"\nCoordinator: {response}\n")
except Exception as e:
self.logger.error(f"Error in input loop: {e}")
Now update main.py to start the MCP server registry and wire it into the coordinator:
# main.py (Increment 6)
import asyncio
import yaml
import sys
from messaging.message_bus import MessageBus
from coordinator.coordinator_agent import CoordinatorAgent
from agents.mail_agent import MailAgent
from agents.calendar_agent import CalendarAgent
from mcp_client.mcp_server_registry import MCPServerRegistry
from utils.logger import get_logger
logger = get_logger("system", "system")
def load_config(path: str = "config.yaml") -> dict:
"""Load and parse the YAML configuration file."""
try:
with open(path, "r") as f:
return yaml.safe_load(f)
except FileNotFoundError:
logger.error(f"Configuration file not found: {path}")
sys.exit(1)
async def main():
"""
Start Octopussy Light with coordinator, mail, calendar agents,
and the MCP built-in tools server connected to all workers.
"""
logger.info("Starting Octopussy Light...")
config = load_config()
bus = MessageBus()
# Initialize and connect the MCP server registry.
mcp_configs = config.get("mcp_servers", [])
mcp_registry = MCPServerRegistry(mcp_configs)
logger.info("Connecting to MCP servers...")
await mcp_registry.connect_all()
logger.info("MCP servers connected.")
# Pass the registry to the coordinator so workers get tool access.
coordinator = CoordinatorAgent(config, bus, mcp_registry=mcp_registry)
mail_agent = MailAgent(config, bus)
calendar_agent = CalendarAgent(config, bus)
all_agents = [coordinator, mail_agent, calendar_agent]
try:
await asyncio.gather(
*[agent.start() for agent in all_agents]
)
finally:
for agent in all_agents:
await agent.stop()
await mcp_registry.disconnect_all()
logger.info("Octopussy Light has shut down cleanly.")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nShutting down...")
Let us trace through what happens when a worker uses a predefined tool. The user asks the coordinator to analyze a file, and the coordinator delegates to a worker. The worker's _execute_with_tools() loop runs: the LLM sees the task and the list of available tools, decides to call read_file, and responds in the JSON tool-call format. The worker sends the call to the MCP registry, which routes it to the built-in tools server process via STDIO and returns the file contents. The worker adds them to its short-term memory, and the LLM reasons about the content to produce the final answer. The user sees:
You: Please analyze the file /tmp/sales_data.csv and give me a summary.
Coordinator: Task assigned (ID: f3a9b1c2). A worker is now executing it.
Results will appear shortly.
[worker_a1b2c3] 10:00:16 INFO: Task f3a9b1c2: calling tool 'read_file'
with args: {'file_path': '/tmp/sales_data.csv'}
[TASK RESULT] Task f3a9b1c2 (response #1):
I have analyzed the sales data file. Here is the summary:
The CSV file contains 1,247 rows of sales transactions from Q3 2025.
Total revenue: $2,847,392. Top performing product: Widget Pro (34% of
revenue). The Western region outperformed all others by 23%. There is
a notable spike in sales during the week of September 15th, likely
corresponding to a promotional event. Average transaction value: $2,283.
Workers now have real-world capabilities through the four predefined tools. The next step is to give the system the ability to grow its own toolset dynamically.
CHAPTER EIGHT: INCREMENT SEVEN - THE TOOL AGENT
We have now built a working multi-agent system with mail monitoring, calendar monitoring, on-demand worker agents, and a set of predefined MCP tools. But those tools are fixed. If a worker needs to fetch weather data, call a company API, or perform a domain-specific calculation, no predefined tool exists for that. This is where the tool agent comes in.
The tool agent can dynamically create new MCP tools on demand. When an agent needs a capability that does not yet exist, it can request the tool agent to create one. The tool agent generates the tool code using an LLM, tests it in a sandboxed environment, and if the tests pass, adds the tool to the system's built-in tools server. The new tool is then available to all future workers.
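Before reading the full agent, it is worth seeing the sandbox test of step 2 in isolation. A minimal, standalone sketch of the same AST-based checks (valid syntax, exactly one function, docstring present); the real version below runs these checks in a separate subprocess with a timeout:

```python
import ast

def validate_tool_source(tool_code: str) -> str:
    """Return "TESTS_PASSED" or a short error tag, mirroring the
    checks the tool agent's sandbox script performs."""
    # Test 1: the code must be valid Python syntax.
    try:
        tree = ast.parse(tool_code)
    except SyntaxError as e:
        return f"SYNTAX_ERROR: {e}"
    # Test 2: the code must define exactly one function.
    functions = [
        n for n in ast.walk(tree) if isinstance(n, ast.FunctionDef)
    ]
    if len(functions) != 1:
        return f"STRUCTURE_ERROR: expected 1 function, found {len(functions)}"
    # Test 3: the function must open with a docstring.
    func = functions[0]
    has_docstring = (
        func.body
        and isinstance(func.body[0], ast.Expr)
        and isinstance(func.body[0].value, ast.Constant)
        and isinstance(func.body[0].value.value, str)
    )
    if not has_docstring:
        return "DOCSTRING_ERROR: function must have a docstring"
    return "TESTS_PASSED"

good = (
    '@mcp.tool()\n'
    'def add(a: int, b: int) -> str:\n'
    '    """Add two numbers."""\n'
    '    return str(a + b)'
)
print(validate_tool_source(good))            # TESTS_PASSED
print(validate_tool_source("def broken(:"))  # SYNTAX_ERROR: ...
```

Parsing the code with ast never executes it, which is why this static check is safe to run on untrusted LLM output; the subprocess in the real implementation adds isolation and a timeout on top.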
# agents/tool_agent.py
#
# The Tool Agent for Octopussy Light.
#
# The tool agent can dynamically create new MCP tools on demand.
# When an agent needs a capability that does not exist, it can
# request the tool agent to create one. The tool agent:
# 1. Uses an LLM to generate the tool code.
# 2. Tests the generated code in a sandboxed subprocess.
# 3. If tests pass, adds the tool to the built-in tools server.
# 4. Notifies the coordinator that the tool is available.
import asyncio
import os
import subprocess
import tempfile
import textwrap
from typing import Dict, Optional
from agents.base_agent import BaseAgent
from messaging.message_bus import MessageBus
from messaging.message_types import Message, MessageType
from llm.llm_client import LLMClient
from utils.logger import get_logger
TOOL_AGENT_SYSTEM_PROMPT = """
You are a Python tool creation expert. Your job is to create MCP (Model
Context Protocol) tool functions for the Octopussy Light system.
When asked to create a tool, you must respond with ONLY valid Python code
that defines a single function decorated with @mcp.tool(). The function
must have:
- A clear, descriptive name (snake_case).
- Type annotations for all parameters and the return value.
- A comprehensive docstring explaining what the tool does, its parameters,
and what it returns.
- Robust error handling with try/except blocks.
- A return type of str (tools always return strings in MCP).
Do not include any imports, class definitions, or other code. Only the
function definition with the @mcp.tool() decorator.
Example of correct output:
@mcp.tool()
def get_current_weather(city: str) -> str:
\"\"\"
Get the current weather for a specified city.
Args:
city: The name of the city to get weather for.
Returns:
A string describing the current weather conditions.
\"\"\"
try:
import requests
response = requests.get(
f"https://wttr.in/{city}?format=3",
timeout=5
)
return response.text
except Exception as e:
return f"Error fetching weather: {str(e)}"
"""
class ToolAgent(BaseAgent):
"""
Dynamically creates new MCP tools for the Octopussy Light system.
When an agent needs a capability that does not exist, the tool agent
generates, tests, and deploys new tools without requiring manual
code changes or system restarts.
"""
def __init__(self, config: dict, bus: MessageBus):
super().__init__(
name="tool_agent",
bus=bus,
config=config,
color_key="tool_agent"
)
llm_config = config["llm"]
self.llm = LLMClient(
base_url=llm_config["base_url"],
api_key=llm_config["api_key"],
model=llm_config["coordinator_model"],
system_prompt=TOOL_AGENT_SYSTEM_PROMPT
)
# Path to the built-in tools server file where new tools are added.
self.tools_server_path = "mcp_server/builtin_tools_server.py"
self.logger.info("Tool agent initialized.")
async def handle_message(self, message: Message):
"""
Handle tool creation requests from other agents.
The expected payload format is:
{
"request_type": "create_tool",
"tool_description": "A description of what the tool should do.",
"requester": "The name of the agent that needs the tool."
}
"""
if message.message_type == MessageType.AGENT_NOTIFICATION:
payload = message.payload
if payload.get("request_type") == "create_tool":
await self._handle_tool_creation_request(message)
else:
self.logger.debug(
f"Tool agent received unhandled message: {message.message_type}"
)
async def _handle_tool_creation_request(self, message: Message):
"""
Process a tool creation request.
Generates tool code, tests it, and if successful, adds it to
the built-in tools server.
"""
payload = message.payload
tool_description = payload.get("tool_description", "")
requester = payload.get("requester", "unknown")
self.logger.info(
f"Tool creation requested by '{requester}': "
f"{tool_description[:60]}..."
)
# Step 1: Generate the tool code using the LLM.
tool_code = await self._generate_tool_code(tool_description)
if not tool_code:
await self._notify_failure(
requester, tool_description, "LLM failed to generate tool code."
)
return
# Step 2: Test the generated code in a sandbox.
test_result = await self._test_tool_in_sandbox(tool_code)
if not test_result["success"]:
self.logger.warning(
f"Generated tool failed sandbox test: {test_result['error']}"
)
# Attempt to fix the code once before giving up.
tool_code = await self._fix_tool_code(tool_code, test_result["error"])
test_result = await self._test_tool_in_sandbox(tool_code)
if not test_result["success"]:
await self._notify_failure(
requester,
tool_description,
f"Tool failed sandbox tests: {test_result['error']}"
)
return
# Step 3: Add the tool to the built-in tools server.
success = await self._add_tool_to_server(tool_code)
if not success:
await self._notify_failure(
requester, tool_description, "Failed to add tool to server."
)
return
self.logger.info(
f"Successfully created and deployed new tool for: "
f"{tool_description[:60]}"
)
# Step 4: Notify the coordinator that the tool is ready.
notification = Message(
message_type=MessageType.AGENT_NOTIFICATION,
sender=self.name,
recipient="coordinator",
payload={
"notification_type": "tool_created",
"tool_description": tool_description,
"requester": requester,
"message": (
f"A new tool has been created and is now available: "
f"{tool_description}"
)
}
)
await self.bus.send(notification)
async def _generate_tool_code(self, tool_description: str) -> Optional[str]:
"""
Use the LLM to generate Python code for a new MCP tool.
Args:
tool_description: A natural language description of the tool.
Returns:
The generated Python function code, or None on failure.
"""
try:
code = await self.llm.chat([
{
"role": "user",
"content": (
f"Create an MCP tool function for the following "
f"purpose:\n\n{tool_description}\n\n"
f"Remember: return ONLY the function definition with "
f"the @mcp.tool() decorator. No imports, no other code."
)
}
])
            # Strip any markdown code fences the LLM may have added.
            # Lines are compared in stripped form, so trailing
            # whitespace around a closing fence cannot hide it.
code = code.strip()
if code.startswith("```"):
lines = code.splitlines()
# Find the last line that is a closing fence.
end_index = len(lines)
for i in range(len(lines) - 1, 0, -1):
if lines[i].strip() == "```":
end_index = i
break
# Skip the opening fence line (e.g. "```python") and
# the closing fence line.
code = "\n".join(lines[1:end_index]).strip()
return code if code else None
except Exception as e:
self.logger.error(f"LLM tool generation failed: {e}")
return None
async def _test_tool_in_sandbox(self, tool_code: str) -> Dict:
"""
Test the generated tool code in a sandboxed subprocess.
Checks that the code is syntactically valid Python, defines
exactly one function, and that the function has a docstring.
The tool code is written to its own temporary file and the
path is passed to the test script via a command-line argument.
This avoids any string-embedding issues (such as triple-quote
collisions) that would arise from interpolating the code
directly into the test script source.
Args:
tool_code: The generated Python function code.
Returns:
A dict with 'success' (bool) and 'error' (str) keys.
"""
# The test script reads the tool code from a file whose path
# is passed as sys.argv[1], completely avoiding any quoting or
# escaping issues with the tool code content.
test_script = textwrap.dedent("""\
import ast
import sys
# Read the tool code from the file path passed as an argument.
tool_code_path = sys.argv[1]
with open(tool_code_path, "r", encoding="utf-8") as f:
tool_code = f.read()
# Test 1: Check that the code is valid Python syntax.
try:
tree = ast.parse(tool_code)
except SyntaxError as e:
print(f"SYNTAX_ERROR: {e}")
sys.exit(1)
# Test 2: Check that the code defines exactly one function.
functions = [
node for node in ast.walk(tree)
if isinstance(node, ast.FunctionDef)
]
if len(functions) != 1:
print(
f"STRUCTURE_ERROR: Expected 1 function, "
f"found {len(functions)}"
)
sys.exit(1)
# Test 3: Check that the function has a docstring.
func = functions[0]
has_docstring = (
func.body
and isinstance(func.body[0], ast.Expr)
and isinstance(func.body[0].value, ast.Constant)
and isinstance(func.body[0].value.value, str)
)
if not has_docstring:
print("DOCSTRING_ERROR: Function must have a docstring.")
sys.exit(1)
print("TESTS_PASSED")
""")
loop = asyncio.get_running_loop()
def _run_test():
# Both temp file paths are initialized to None before the
# try block so the finally clause can safely reference them
# even if file creation fails partway through.
tool_code_path = None
test_script_path = None
try:
# Write the tool code to its own temp file.
with tempfile.NamedTemporaryFile(
mode="w",
suffix=".py",
delete=False,
encoding="utf-8"
) as tool_tmp:
tool_tmp.write(tool_code)
tool_code_path = tool_tmp.name
# Write the test script to a separate temp file.
with tempfile.NamedTemporaryFile(
mode="w",
suffix=".py",
delete=False,
encoding="utf-8"
) as script_tmp:
script_tmp.write(test_script)
test_script_path = script_tmp.name
result = subprocess.run(
["python", test_script_path, tool_code_path],
capture_output=True,
text=True,
timeout=15,
env={"PATH": os.environ.get("PATH", "")}
)
if "TESTS_PASSED" in result.stdout:
return {"success": True, "error": ""}
else:
error = result.stdout.strip() or result.stderr.strip()
return {"success": False, "error": error}
except subprocess.TimeoutExpired:
return {"success": False, "error": "Sandbox test timed out."}
except Exception as e:
return {"success": False, "error": str(e)}
finally:
for path in (tool_code_path, test_script_path):
if path is not None:
try:
os.unlink(path)
except Exception:
pass
return await loop.run_in_executor(None, _run_test)
async def _fix_tool_code(
self,
original_code: str,
error_message: str
) -> str:
"""
Ask the LLM to fix a tool that failed sandbox testing.
Args:
original_code: The code that failed.
error_message: The error message from the sandbox test.
Returns:
The (hopefully) fixed code.
"""
try:
fixed = await self.llm.chat([
{
"role": "user",
"content": (
f"This MCP tool code failed with the following error:\n\n"
f"ERROR: {error_message}\n\n"
f"ORIGINAL CODE:\n{original_code}\n\n"
f"Please fix the code. Return ONLY the corrected "
f"function definition with the @mcp.tool() decorator."
)
}
])
fixed = fixed.strip()
if fixed.startswith("```"):
lines = fixed.splitlines()
end_index = len(lines)
for i in range(len(lines) - 1, 0, -1):
if lines[i].strip() == "```":
end_index = i
break
fixed = "\n".join(lines[1:end_index]).strip()
return fixed if fixed else original_code
except Exception as e:
self.logger.error(f"Tool fix attempt failed: {e}")
return original_code
async def _add_tool_to_server(self, tool_code: str) -> bool:
"""
Append the new tool code to the built-in tools server file.
The appended tool will be available after the MCP server process
is restarted. The MCPServerRegistry holds a live connection to
the currently running server process; a full restart-and-reconnect
cycle is required to expose the new tool to agents at runtime.
This is left as a production enhancement; for Octopussy Light,
the new tool is available from the next system startup.
Args:
tool_code: The validated tool function code to add.
Returns:
True if the tool was successfully appended, False otherwise.
"""
try:
with open(self.tools_server_path, "a", encoding="utf-8") as f:
f.write(f"\n\n# Dynamically generated tool\n{tool_code}\n")
self.logger.info(
f"Tool code appended to {self.tools_server_path}."
)
return True
except Exception as e:
self.logger.error(f"Failed to add tool to server: {e}")
return False
async def _notify_failure(
self,
requester: str,
tool_description: str,
reason: str
):
"""Notify the coordinator that tool creation failed."""
notification = Message(
message_type=MessageType.AGENT_NOTIFICATION,
sender=self.name,
recipient="coordinator",
payload={
"notification_type": "tool_creation_failed",
"tool_description": tool_description,
"requester": requester,
"reason": reason
}
)
await self.bus.send(notification)
self.logger.error(
f"Tool creation failed for '{tool_description[:60]}': {reason}"
)
The final coordinator adds the create_tool action alongside the existing create_task action. Here is the complete final version of coordinator_agent.py:
# coordinator/coordinator_agent.py (Final - Increment 7)
#
# Adds "create_tool" action handling so the coordinator can route
# tool creation requests to the tool agent. This is the complete,
# final version of the coordinator for the full Octopussy Light system.
import asyncio
import sys
import json
from typing import List, Dict, Optional
from agents.base_agent import BaseAgent
from messaging.message_bus import MessageBus
from messaging.message_types import Message, MessageType
from coordinator.worker_manager import WorkerManager
from llm.llm_client import LLMClient
from utils.logger import get_logger
logger = get_logger("coordinator", "coordinator")
COORDINATOR_SYSTEM_PROMPT = """
You are the coordinator of an agentic AI system called Octopussy Light.
Your job is to understand the user's requests, coordinate specialized
agents to fulfill those requests, and present results clearly.
When the user asks you to perform a task (something that requires research,
analysis, writing, coding, data retrieval, or other substantive work),
respond with a JSON object in this exact format:
{"action": "create_task", "task_description": "<the task>"}
When the user asks you to create a new tool or add a new capability to
the system, respond with:
{"action": "create_tool", "tool_description": "<what the tool should do>"}
For conversational messages, questions about the system, or simple requests
that you can answer directly from your knowledge, respond in plain text.
You are friendly, precise, and efficient.
"""
class CoordinatorAgent(BaseAgent):
"""
The central coordinator of Octopussy Light.
Handles user communication and coordinates all other agents.
"""
def __init__(
self,
config: dict,
bus: MessageBus,
mcp_registry=None
):
super().__init__(
name="coordinator",
bus=bus,
config=config,
color_key="coordinator"
)
llm_config = config["llm"]
self.llm = LLMClient(
base_url=llm_config["base_url"],
api_key=llm_config["api_key"],
model=llm_config["coordinator_model"],
system_prompt=COORDINATOR_SYSTEM_PROMPT
)
self.conversation_history: List[Dict[str, str]] = []
# Pass the MCP registry to the worker manager so every worker
# it creates will have access to the predefined tools.
self.worker_manager = WorkerManager(
config, bus, mcp_registry=mcp_registry
)
self.logger.info(
f"Coordinator initialized with model: "
f"{llm_config['coordinator_model']}"
)
async def handle_message(self, message: Message):
"""Handle messages from other agents."""
if message.message_type == MessageType.MAIL_NOTIFICATION:
await self._display_mail_notification(message.payload)
elif message.message_type == MessageType.CALENDAR_NOTIFICATION:
await self._display_calendar_notification(message.payload)
elif message.message_type == MessageType.TASK_RESULT:
await self._display_task_result(message.payload)
elif message.message_type == MessageType.TASK_ERROR:
await self._display_task_error(message.payload)
elif message.message_type == MessageType.AGENT_NOTIFICATION:
await self._display_agent_notification(message.payload)
async def _display_mail_notification(self, payload: dict):
print(
f"\n[MAIL] From: {payload.get('sender', 'Unknown')}\n"
f" Subject: {payload.get('subject', 'No subject')}\n"
f" Summary: {payload.get('summary', '')}\n"
)
async def _display_calendar_notification(self, payload: dict):
print(
f"\n[CALENDAR] Event: {payload.get('title', 'Untitled')}\n"
f" When: {payload.get('start_time', 'Unknown')}\n"
f" Location: {payload.get('location', 'Not specified')}\n"
)
async def _display_task_result(self, payload: dict):
task_id = payload.get("task_id", "unknown")
response_num = payload.get("response_number", 1)
result = payload.get("result", "")
print(
f"\n[TASK RESULT] Task {task_id} "
f"(response #{response_num}):\n{result}\n"
)
async def _display_task_error(self, payload: dict):
task_id = payload.get("task_id", "unknown")
error = payload.get("error", "Unknown error")
print(f"\n[TASK ERROR] Task {task_id} failed: {error}\n")
async def _display_agent_notification(self, payload: dict):
notification_type = payload.get("notification_type", "")
if notification_type == "tool_created":
print(
f"\n[TOOL AGENT] New tool created: "
f"{payload.get('tool_description', '')}\n"
)
elif notification_type == "tool_creation_failed":
print(
f"\n[TOOL AGENT] Tool creation failed: "
f"{payload.get('reason', 'Unknown reason')}\n"
)
else:
print(f"\n[AGENT] {payload.get('message', str(payload))}\n")
async def chat(self, user_input: str) -> str:
"""
Process user input and route to the appropriate handler.
The LLM decides whether the input is a task request, a tool
creation request, or a conversational message, and responds
with a JSON action object or plain text accordingly.
"""
self.conversation_history.append({
"role": "user",
"content": user_input
})
response = await self.llm.chat(self.conversation_history)
try:
parsed = json.loads(response.strip())
action = parsed.get("action")
if action == "create_task":
task_desc = parsed.get("task_description", user_input)
task_id = await self.worker_manager.assign_task(task_desc)
reply = (
f"Task assigned (ID: {task_id}). "
f"A worker is now executing it. "
f"Results will appear shortly."
)
self.conversation_history.append({
"role": "assistant",
"content": reply
})
return reply
elif action == "create_tool":
tool_desc = parsed.get("tool_description", "")
await self._request_tool_creation(tool_desc)
reply = (
f"I have asked the tool agent to create a new tool: "
f"'{tool_desc}'. You will be notified when it is ready."
)
self.conversation_history.append({
"role": "assistant",
"content": reply
})
return reply
except (json.JSONDecodeError, AttributeError):
pass
self.conversation_history.append({
"role": "assistant",
"content": response
})
return response
async def _request_tool_creation(self, tool_description: str):
"""Send a tool creation request to the tool agent."""
request = Message(
message_type=MessageType.AGENT_NOTIFICATION,
sender=self.name,
recipient="tool_agent",
payload={
"request_type": "create_tool",
"tool_description": tool_description,
"requester": self.name
}
)
await self.bus.send(request)
async def run_background_tasks(self):
"""Read user input asynchronously."""
loop = asyncio.get_running_loop()
print("\nOctopussy Light is ready. Type 'quit' or 'exit' to stop.\n")
while self.running:
try:
print("You: ", end="", flush=True)
                user_input = await loop.run_in_executor(
                    None, sys.stdin.readline
                )
                if not user_input:
                    # An empty string (no trailing newline) means EOF on
                    # stdin; shut down instead of spinning forever on an
                    # exhausted input stream.
                    self.running = False
                    break
                user_input = user_input.strip()
                if not user_input:
                    continue
if user_input.lower() in ("quit", "exit"):
self.running = False
print("Goodbye!")
break
response = await self.chat(user_input)
print(f"\nCoordinator: {response}\n")
except Exception as e:
self.logger.error(f"Error in input loop: {e}")
Let us trace through a tool creation scenario. The user asks for a weather tool. The coordinator's LLM responds with {"action": "create_tool", "tool_description": "Fetch current weather for a city"}. The coordinator sends a creation request to the tool agent. The tool agent asks its LLM to generate the function, tests the syntax and structure by writing the code to a temp file and running a separate test script against it, appends the validated code to the built-in tools server file, and notifies the coordinator:
You: I need a tool that can fetch the current weather for a city.
Coordinator: I have asked the tool agent to create a new tool:
'Fetch current weather for a city using the wttr.in service'.
You will be notified when it is ready.
[tool_agent] 10:01:30 INFO: Tool creation requested: Fetch current weather...
[tool_agent] 10:01:32 INFO: Generated tool code. Running sandbox tests...
[tool_agent] 10:01:33 INFO: Sandbox tests passed. Adding tool to server.
[tool_agent] 10:01:33 INFO: Tool code appended to builtin_tools_server.py.
[TOOL AGENT] New tool created: 'Fetch current weather for a city using
the wttr.in service'. The tool is now available as 'get_current_weather'
from the next system startup.
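To see concretely what the sandbox accepts, here is a sketch of the three AST checks from the test script applied to a hypothetical generated tool. The tool body and the wttr.in call are illustrative inventions, not the actual LLM output; note that ast.parse never executes the code, so the undefined mcp name is harmless here:

```python
import ast

# A hypothetical generated tool, roughly as the tool agent's LLM
# might emit it. The function body is illustrative only.
generated = '''
@mcp.tool()
def get_current_weather(city: str) -> str:
    """Fetch the current weather for a city from wttr.in."""
    import urllib.request
    with urllib.request.urlopen(f"https://wttr.in/{city}?format=3") as r:
        return r.read().decode("utf-8")
'''

# Test 1: the code must be valid Python syntax.
tree = ast.parse(generated)

# Test 2: the code must define exactly one function.
funcs = [n for n in ast.walk(tree) if isinstance(n, ast.FunctionDef)]
assert len(funcs) == 1

# Test 3: the function must have a docstring.
first = funcs[0].body[0]
assert (isinstance(first, ast.Expr)
        and isinstance(first.value, ast.Constant)
        and isinstance(first.value.value, str))

print("TESTS_PASSED")
```

Because only the parse tree is inspected, a tool that is syntactically valid but semantically broken will still pass; the docstring check is a cheap proxy for "the LLM followed the generation instructions," not a correctness guarantee.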
CHAPTER NINE: INCREMENT EIGHT - REMOTE MCP SERVERS
The final major feature of Octopussy Light is support for remote MCP servers. Users can configure external MCP servers in config.yaml, and the agents can use the tools those servers expose. This opens up a vast ecosystem of capabilities: specialized data sources, enterprise APIs, domain-specific tools, and more. The MCPServerRegistry we built in Increment 6 already handles multiple servers; we simply need to configure additional entries.
Let us expand the MCP server configuration in config.yaml to show the full range of options:
# config.yaml (expanded mcp_servers section for Increment 8)
mcp_servers:
# Built-in server (always present)
- name: "builtin"
type: "local"
command: ["python", "mcp_server/builtin_tools_server.py"]
# Example: a custom local MCP server for specialized tasks
# - name: "data_analysis"
# type: "local"
# command: ["python", "/path/to/data_analysis_mcp_server.py"]
# Example: a remote HTTP MCP server
# - name: "company_data"
# type: "http"
# url: "https://mcp.yourcompany.com/tools"
# api_key: "your_api_key_here"
The MCPServerRegistry already handles the routing logic: when a worker calls a tool by name, the registry looks up which server hosts that tool and routes the call there automatically. Adding a new server is purely a configuration change. No code changes are required.
When multiple servers are connected, the registry merges their tool lists. If two servers expose a tool with the same name, the registry logs a warning and the later server's tool takes precedence. This makes it straightforward to override a built-in tool with a custom implementation simply by adding a server that exposes a tool with the same name.
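The precedence rule can be captured in a few lines. This is a simplified stand-in for the registry's actual merge logic, with made-up server and tool names:

```python
import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("mcp_registry")

def merge_tool_maps(servers: list[tuple[str, list[str]]]) -> dict[str, str]:
    """Map each tool name to the server that will handle its calls.

    Servers are processed in configuration order; on a name collision
    the later server wins, mirroring the behavior described above.
    """
    routing: dict[str, str] = {}
    for server_name, tool_names in servers:
        for tool in tool_names:
            if tool in routing:
                logger.warning(
                    "Tool '%s' from server '%s' overrides the one "
                    "from server '%s'.",
                    tool, server_name, routing[tool]
                )
            routing[tool] = server_name  # later server takes precedence
    return routing

# Hypothetical example: a custom server overrides builtin's read_file.
routing = merge_tool_maps([
    ("builtin", ["read_file", "write_file"]),
    ("company_data", ["read_file", "query_db"]),
])
print(routing["read_file"])  # company_data
```

A plain dict gives last-writer-wins semantics for free; the only extra work is the collision warning, which makes an accidental override visible in the logs.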
CHAPTER TEN: THE COMPLETE SYSTEM - FINAL ASSEMBLY
We have now built all the components of Octopussy Light. Let us put together the final, complete version of main.py that wires everything together, and then review the complete project structure:
# main.py (Final - Complete Octopussy Light System)
#
# This is the complete entry point for Octopussy Light.
# It initializes all components, connects to MCP servers,
# and runs all agents concurrently.
import asyncio
import yaml
import sys
from messaging.message_bus import MessageBus
from coordinator.coordinator_agent import CoordinatorAgent
from agents.mail_agent import MailAgent
from agents.calendar_agent import CalendarAgent
from agents.tool_agent import ToolAgent
from mcp_client.mcp_server_registry import MCPServerRegistry
from utils.logger import get_logger
logger = get_logger("system", "system")
def load_config(path: str = "config.yaml") -> dict:
"""Load and parse the YAML configuration file."""
try:
with open(path, "r") as f:
return yaml.safe_load(f)
except FileNotFoundError:
logger.error(f"Configuration file not found: {path}")
sys.exit(1)
except yaml.YAMLError as e:
logger.error(f"Error parsing configuration file: {e}")
sys.exit(1)
async def main():
"""
The complete Octopussy Light startup sequence.
1. Load configuration.
2. Initialize the message bus.
3. Connect to all configured MCP servers.
4. Initialize all agents with the MCP registry.
5. Run all agents concurrently.
6. Handle graceful shutdown on exit.
"""
logger.info("=" * 60)
logger.info("Starting Octopussy Light - Complete System")
logger.info("=" * 60)
config = load_config()
# Step 1: Initialize the message bus.
bus = MessageBus()
# Step 2: Initialize and connect the MCP server registry.
mcp_configs = config.get("mcp_servers", [])
mcp_registry = MCPServerRegistry(mcp_configs)
logger.info("Connecting to MCP servers...")
await mcp_registry.connect_all()
logger.info("MCP servers connected.")
# Step 3: Initialize all agents.
# The coordinator receives the MCP registry and passes it to the
# WorkerManager, which in turn passes it to each new worker agent.
coordinator = CoordinatorAgent(config, bus, mcp_registry=mcp_registry)
mail_agent = MailAgent(config, bus)
calendar_agent = CalendarAgent(config, bus)
tool_agent = ToolAgent(config, bus)
all_agents = [coordinator, mail_agent, calendar_agent, tool_agent]
logger.info("All agents initialized. Starting Octopussy Light...")
try:
# Step 4: Run all agents concurrently.
await asyncio.gather(
*[agent.start() for agent in all_agents]
)
finally:
# Step 5: Graceful shutdown.
logger.info("Stopping all agents...")
for agent in all_agents:
await agent.stop()
logger.info("Disconnecting from MCP servers...")
await mcp_registry.disconnect_all()
logger.info("Octopussy Light has shut down cleanly.")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nShutting down Octopussy Light...")
The complete project structure for Octopussy Light is as follows:
octopussy_light/
config.yaml (configuration)
main.py (entry point)
requirements.txt (dependencies)
memory/ (created at runtime by ChromaDB)
worker_<id>/ (per-worker ChromaDB stores)
coordinator/
__init__.py
coordinator_agent.py (coordinator agent - final version)
worker_manager.py (worker lifecycle management)
prompt_optimizer.py (task prompt optimization)
agents/
__init__.py
base_agent.py (abstract base class)
mail_agent.py (email monitoring)
calendar_agent.py (calendar monitoring)
worker_agent.py (task execution with MCP tools)
tool_agent.py (dynamic tool creation)
messaging/
__init__.py
message_bus.py (asyncio-based message routing)
message_types.py (message type definitions)
llm/
__init__.py
llm_client.py (unified LLM client)
mcp_server/
builtin_tools_server.py (predefined MCP tools server)
mcp_client/
__init__.py
mcp_tool_client.py (MCP client for agents)
mcp_server_registry.py (multi-server management)
utils/
__init__.py
logger.py (colored logging)
All __init__.py files are empty unless otherwise shown. The memory/ directory is created automatically by ChromaDB when the first worker agent initializes its long-term memory store.
CHAPTER ELEVEN: A COMPLETE INTERACTION WALKTHROUGH
To make the full system concrete, let us walk through a complete interaction session that exercises all the major components.
The user starts Octopussy Light:
[system] 10:00:00 INFO: Starting Octopussy Light - Complete System
[system] 10:00:00 INFO: Connecting to MCP servers...
[mcp_registry] 10:00:01 INFO: Connected to MCP server 'builtin'.
Available tools: ['read_file', 'write_file', 'list_directory',
'execute_python']
[coordinator] 10:00:01 INFO: Coordinator initialized with model: llama3.2
[mail_agent] 10:00:01 INFO: Mail agent initialized. Polling every 60s.
[calendar_agent] 10:00:01 INFO: Calendar agent initialized.
[tool_agent] 10:00:01 INFO: Tool agent initialized.
Octopussy Light is ready. Type 'quit' or 'exit' to stop.
You: Hello! What can you do?
Coordinator: Hello! I am Octopussy Light, your personal agentic AI assistant.
Here is what I can do for you: I monitor your email inbox and notify you when
messages matching your configured filters arrive. I watch your calendar and
alert you to upcoming events. I can spawn worker agents to execute complex
tasks like research, writing, coding, and data analysis. Workers have access
to built-in tools including file reading and writing, directory listing, and
sandboxed Python code execution. I can also create entirely new tools on
demand if you need a capability that does not yet exist. What would you like
to do?
You: Please analyze the file /tmp/sales_data.csv and give me a summary.
Coordinator: Task assigned (ID: f3a9b1c2). A worker is now executing it.
Results will appear shortly.
[worker_f3a9b1] 10:00:15 INFO: Received task f3a9b1c2.
[worker_f3a9b1] 10:00:16 INFO: Task f3a9b1c2: calling tool 'read_file'
with args: {'file_path': '/tmp/sales_data.csv'}
[TASK RESULT] Task f3a9b1c2 (response #1):
I have analyzed the sales data file. Here is the summary:
The CSV file contains 1,247 rows of sales transactions from Q3 2025.
Total revenue: $2,847,392. Top performing product: Widget Pro (34% of
revenue). The Western region outperformed all others by 23%. There is
a notable spike in sales during the week of September 15th, likely
corresponding to a promotional event. Average transaction value: $2,283.
You: I need a tool that can convert Celsius to Fahrenheit.
Coordinator: I have asked the tool agent to create a new tool:
'Convert temperature values between Celsius and Fahrenheit'. You will
be notified when it is ready.
[tool_agent] 10:01:30 INFO: Tool creation requested.
[tool_agent] 10:01:32 INFO: Generated tool code. Running sandbox tests...
[tool_agent] 10:01:33 INFO: Sandbox tests passed. Adding tool to server.
[TOOL AGENT] New tool created: 'Convert temperature values between Celsius
and Fahrenheit'. The tool is now available as 'celsius_to_fahrenheit'
from the next system startup.
This walkthrough demonstrates the full system in action: conversational interaction with the coordinator, task delegation to a worker that uses predefined MCP tools, dynamic tool creation by the tool agent, and background notifications from the mail and calendar agents, all happening concurrently in a single terminal session.
CONCLUSION: WHAT YOU HAVE BUILT AND WHERE TO GO NEXT
You have built Octopussy Light from scratch, starting with a simple conversational coordinator and incrementally adding every major component of a production-grade agentic AI system. Let us take stock of what the complete system contains.
The coordinator agent serves as the brain of the system, communicating with the user, routing requests to the appropriate agents, displaying notifications, and managing the overall workflow. It uses an LLM to distinguish between conversational messages, task requests, and tool creation requests, and it delegates each to the appropriate handler.
The mail agent runs in the background, polling an IMAP inbox at a configurable interval, applying regular expression filters to incoming messages, summarizing matches with an LLM, and notifying the coordinator. It handles the full complexity of email parsing, including multipart messages and encoded headers.
The calendar agent monitors either a local .ics file or Google Calendar, checking for events that fall within a configurable notification window and alerting the coordinator in advance. It handles the asynchronous integration of synchronous APIs using thread pool executors.
The worker agents are the workhorses of the system. They are created on demand, reused when idle, and cleaned up when excess. Each worker has short-term memory (conversation history for the current task) and long-term memory (a ChromaDB vector store). Workers implement the ReAct agentic loop, reasoning about tasks, calling MCP tools, observing results, and continuing until they produce a final answer.
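Stripped of memory and MCP plumbing, the ReAct loop at the heart of each worker reduces to a short reason-act-observe cycle. In this sketch, llm and call_tool are stubs standing in for the real LLM client and MCP registry, and the tool-call JSON format is illustrative:

```python
import json

def llm(history):
    """Stub LLM: request one tool call, then produce a final answer."""
    if not any(m["role"] == "tool" for m in history):
        return json.dumps({"tool": "read_file",
                           "args": {"file_path": "/tmp/sales_data.csv"}})
    return "FINAL: The file contains 3 rows."

def call_tool(name, args):
    """Stub MCP tool invocation."""
    return "row1\nrow2\nrow3"

def react_loop(task: str, max_steps: int = 5) -> str:
    history = [{"role": "user", "content": task}]
    for _ in range(max_steps):
        response = llm(history)             # reason
        if response.startswith("FINAL:"):   # done: return the answer
            return response[len("FINAL:"):].strip()
        action = json.loads(response)       # act: call the requested tool
        observation = call_tool(action["tool"], action["args"])
        history.append({"role": "tool", "content": observation})  # observe
    return "Gave up after max_steps."

print(react_loop("Summarize /tmp/sales_data.csv"))
```

The max_steps bound is essential in practice: it prevents a confused model from looping on tool calls forever, at the cost of occasionally abandoning a task that genuinely needed more steps.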
The predefined MCP tools — read_file, write_file, list_directory, and execute_python — give workers immediate real-world capabilities from the moment the system starts. These tools are exposed by the built-in MCP tools server and discovered automatically by the registry at startup.
The prompt optimizer improves task descriptions before they reach workers, using an LLM to add clarity, specificity, and structure to raw user requests. This consistently improves the quality of worker outputs.
The tool agent is the most remarkable component: it can create entirely new MCP tools on demand, using an LLM to generate the code, testing it safely by writing the code to a dedicated temp file and running a separate validation script against it, and deploying it to the system without any manual intervention. New tools are appended to the built-in tools server file and become available from the next system startup.
The MCP server registry manages connections to all configured MCP servers, both local and remote, providing a unified tool discovery and invocation interface to all agents. Adding a new server requires only a configuration change. The message bus ties everything together, routing messages between agents using asyncio queues with backpressure and clean shutdown support. The base agent's message loop guarantees that queue.task_done() is always called — even when message handling raises an exception — preventing any future use of queue.join() from blocking indefinitely.
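That task_done() guarantee boils down to a try/finally in the message loop. Here is a minimal self-contained sketch; the sentinel shutdown value and the handler are illustrative, not the base agent's exact code:

```python
import asyncio

async def message_loop(queue: asyncio.Queue, handle):
    """Drain the queue, guaranteeing task_done() even when handling fails."""
    while True:
        message = await queue.get()
        try:
            if message is None:            # sentinel: clean shutdown
                return
            await handle(message)
        except Exception as e:
            print(f"handler failed: {e}")  # log and keep processing
        finally:
            queue.task_done()              # always balances queue.get()

async def demo():
    q: asyncio.Queue = asyncio.Queue()
    handled = []

    async def handle(msg):
        if msg == "bad":
            raise ValueError("boom")       # simulate a handler crash
        handled.append(msg)

    for m in ("hello", "bad", None):
        await q.put(m)
    loop_task = asyncio.create_task(message_loop(q, handle))
    await q.join()                         # returns because task_done always runs
    await loop_task
    return handled

print(asyncio.run(demo()))                 # ['hello']
```

Without the finally, the "bad" message would leave the queue's unfinished-task counter at one forever, and q.join() would block indefinitely, which is exactly the failure mode the base agent's loop is designed to rule out.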
Where should you go from here? There are several natural extensions to explore.
You could, for example:
add a web search tool to the built-in MCP server, giving workers the ability to retrieve current information from the internet;
implement the IMAP IDLE command in the mail agent to receive instant notifications instead of polling;
add a proper REST API to the coordinator so that external systems can send tasks and receive results programmatically;
containerize the system with Docker to make it portable and deployable;
add more sophisticated memory management, such as automatic summarization of old conversation history and relevance-based pruning of long-term memories;
implement scheduled tasks, where a worker repeats a task periodically and sends multiple responses over time, or explore the MCP 2025-11-25 Tasks primitive to convert long-running worker tasks into proper async MCP tasks, enabling clients to poll for results rather than waiting for push notifications;
and harden the code with proper security measures, because Octopussy Light currently stores authentication credentials in plain text.
Octopussy Light is not just a tutorial project. It is a genuine, working foundation for building sophisticated agentic AI systems. The architecture is clean, the components are modular, and every piece is designed to be extended. The octopus has eight arms, and you have built all of them. Now it is time to teach them to swim.