Thursday, March 19, 2026

OCTOPUSSY LIGHT: A DEVELOPER'S GUIDE TO BUILDING AN AGENTIC AI SYSTEM FROM SCRATCH


 

You have always wanted to build your own OpenClaw-like Agentic AI, but how does that actually work? The tutorial below is written for developers who want to understand Agentic AI not just in theory, but by building something real, step by step, piece by piece.


HINT: The full source code is available via Octopussy Light GitHub. You do not need to type any code manually, but you can, of course! The Agentic AI system is called Octopussy Light; I am working on a larger OpenClaw alternative called Octopussy. See the blog post Octopussy Article.


FOREWORD: WHY "OCTOPUSSY LIGHT"?


An octopus is one of nature's most remarkable problem-solvers. It has a central brain, but each of its eight arms also has its own neural cluster capable of semi-autonomous action. The arms can taste, feel, and react independently, yet they coordinate beautifully with the central brain to accomplish complex tasks. This is a surprisingly good metaphor for an Agentic AI system: a coordinator at the center, specialized agents operating semi-autonomously at the periphery, all working together toward a common goal.


"Octopussy Light" is our educational, incremental implementation of exactly this kind of system. It is "light" because we are building it step by step, starting from the simplest possible foundation and adding complexity only when we are ready. By the time you finish this article, you will have built a fully functional multi-agent AI system that reads email, monitors calendars, spawns worker agents on demand, equips them with predefined tools, creates new tools dynamically, and communicates using the Model Context Protocol.


This is not a survey article. You will write real code. Every increment will run. Every concept will be demonstrated with working examples before we move on.


CHAPTER ONE: UNDERSTANDING THE TERRAIN


Before we write a single line of code, we need to understand what Agentic AI actually is and why it represents a fundamentally different way of thinking about software.


Traditional software is deterministic and explicit. You write a function, it takes inputs, it produces outputs, and the path from input to output is entirely specified by the programmer. If you want the software to do something new, you write new code. The software has no agency of its own.


A large language model, on its own, is something different but still limited in a specific way. It can reason, generate text, answer questions, and even write code. But it cannot act on the world. It cannot send an email, check a calendar, or execute a file. It is, in a sense, a very sophisticated oracle: you ask it questions, it gives you answers, and that is where its involvement ends.


Agentic AI closes this gap. An agent is an LLM that has been given tools and the ability to decide when and how to use them. It can look at a goal, reason about what steps are needed to achieve that goal, call the appropriate tools in the appropriate order, observe the results, and continue reasoning until the goal is accomplished. This is what makes an agent fundamentally different from a chatbot: it can act, not just respond.
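The reason-act-observe loop described above can be sketched in a few lines of Python. To keep the example self-contained, the "model" here is a hard-coded stub that always asks for the same tool; in a real agent, that function would be an LLM call, and the tool name is invented for illustration:

```python
# Minimal sketch of an agent loop: the model decides which tool to
# call, the runtime executes it, and the observation is fed back in.
# The "model" below is a hard-coded stub, not a real LLM.

def get_time() -> str:
    return "09:30"

TOOLS = {"get_time": get_time}   # the agent's tool registry

def stub_model(goal: str, observations: list) -> dict:
    # A real agent would send the goal plus all observations to an LLM
    # here and parse its decision from the response.
    if not observations:
        return {"action": "call_tool", "tool": "get_time"}
    return {"action": "finish",
            "answer": f"The current time is {observations[-1]}."}

def run_agent(goal: str) -> str:
    observations = []
    while True:
        decision = stub_model(goal, observations)
        if decision["action"] == "finish":
            return decision["answer"]
        result = TOOLS[decision["tool"]]()   # act on the world
        observations.append(result)          # observe the result

print(run_agent("What time is it?"))  # → The current time is 09:30.
```

The structure is the important part: the loop continues until the model decides the goal is accomplished, and every tool result flows back into the model's context.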


A multi-agent system takes this further. Instead of one agent trying to do everything, you have multiple specialized agents, each with its own tools, its own memory, and its own area of responsibility. A coordinator agent manages the overall workflow and communicates with the user. A mail agent monitors incoming email. A calendar agent watches for upcoming events. Worker agents execute specific tasks on demand, equipped first with a set of predefined built-in tools, and later with the ability to create entirely new tools dynamically. A tool agent handles that dynamic creation.


This specialization is powerful for the same reason that human organizations use specialization: a focused agent with the right tools and the right context will outperform a generalist agent trying to juggle everything at once.


Now, there is one more piece of the puzzle that we need to understand before we start coding: the Model Context Protocol, or MCP.


THE MODEL CONTEXT PROTOCOL: THE LANGUAGE AGENTS SPEAK


When you build a multi-agent system, one of the first problems you encounter is the question of how agents communicate with tools. Do you write custom code for every tool? Do you use a proprietary API format? Do you invent your own protocol?


The Model Context Protocol, first introduced by Anthropic in November 2024, answers this question with a resounding "none of the above." MCP is an open standard now stewarded by the Agentic AI Foundation (AAIF), a directed fund under the Linux Foundation, co-founded by Anthropic, Block, and OpenAI, with support from Google, Microsoft, AWS, and others. Think of it as the USB standard for AI tools: just as USB lets you plug any compliant device into any compliant port without writing custom drivers, MCP lets any compliant AI agent use any compliant tool without writing custom integration code.


MCP is built on JSON-RPC 2.0, a lightweight remote procedure call protocol that uses JSON for message formatting. It supports multiple transport mechanisms: STDIO for local communication between processes on the same machine, Server-Sent Events for web-based real-time communication, and Streamable HTTP for communication with remote servers. The protocol uses string-based version identifiers in the format YYYY-MM-DD, and the version we will be targeting in Octopussy Light is the November 25, 2025 specification, which is the latest stable version at the time of writing.
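To make the wire format concrete, here is roughly what a JSON-RPC 2.0 tool-call exchange looks like. The envelope fields (jsonrpc, id, method, params, result) come from the JSON-RPC 2.0 and MCP specifications; the tool name and arguments are invented for illustration:

```python
import json

# An MCP "tools/call" request as it travels over the transport.
# The envelope follows JSON-RPC 2.0; the tool name and arguments
# below are hypothetical.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "send_email",                      # hypothetical tool
        "arguments": {"to": "alice@example.com",
                      "subject": "Hello"},
    },
}

# A successful response carries the same id and a "result" object.
response = {
    "jsonrpc": "2.0",
    "id": 1,
    "result": {"content": [{"type": "text", "text": "Email queued."}]},
}

# Both sides are plain JSON, so serialization is a one-liner.
wire = json.dumps(request)
print(json.loads(wire)["params"]["name"])  # → send_email
```

This is all the "protocol" an agent and a tool server need to agree on, which is exactly why a common standard is so valuable.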


The 2025-11-25 specification represents a significant maturation of MCP beyond its original synchronous tool-calling roots. Its most notable additions include the Tasks primitive, which enables asynchronous long-running operations where a client receives a task handle immediately and fetches results later; simplified OAuth authorization via Client ID Metadata Documents replacing the older Dynamic Client Registration approach; Extensions as a formal concept for faster protocol innovation without bloating the core; Sampling with Tools, which allows servers to initiate sampling requests that include tool definitions and enables server-side agent loops; and a range of enterprise security and governance improvements. For Octopussy Light, we will primarily use the stable tool-calling and STDIO transport features that have been part of MCP since its inception, while the architecture is designed to accommodate the newer async Task capabilities as a future extension.


An MCP server exposes three kinds of things to clients. Tools are callable functions that can execute code or produce side effects, analogous to POST endpoints in a REST API. Resources are data sources that can be read, analogous to GET endpoints. Prompts are reusable templates for LLM interactions. In Octopussy Light, we will primarily be working with tools, since our agents need to act on the world, not just read from it.
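As an illustration of the tools primitive, a server's reply to a tools/list request has roughly this shape: each tool advertises a name, a description, and a JSON Schema for its arguments. The specific tool shown here is invented for the example:

```python
# Sketch of a "tools/list" result. An agent reads this catalog to
# learn what it can call and which arguments each tool expects.
# The "check_calendar" tool is hypothetical.
tools_list_result = {
    "tools": [
        {
            "name": "check_calendar",
            "description": "Return events in a given date range.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "start": {"type": "string"},
                    "end": {"type": "string"},
                },
                "required": ["start", "end"],
            },
        }
    ]
}

print(tools_list_result["tools"][0]["name"])  # → check_calendar
```

The inputSchema is what lets an LLM generate well-formed arguments: the schema is included in the model's context, and the model fills in values that satisfy it.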


The Python MCP SDK makes building MCP servers and clients straightforward. We will use it throughout this article. The key package is simply called "mcp" and can be installed with pip.


Now let us talk about the LLM backbone of our system.


CHOOSING YOUR LLM: LOCAL AND REMOTE OPTIONS


One of the design goals of Octopussy Light is that the LLM powering each agent should be swappable without touching the agent's core logic. This is important because the LLM landscape is evolving rapidly. The model that is best today may not be best in six months, and different agents in your system may benefit from different models. A coordinator agent that needs to understand nuanced user requests might benefit from a powerful cloud model, while a worker agent doing structured data extraction might work perfectly well with a smaller local model.


We will support two categories of LLMs. For local models, we will use Ollama, which provides a simple way to run models like Llama 3.2, Mistral, and others on your own hardware. Ollama exposes an OpenAI-compatible API, which means we can use the same client code for both local and remote models. For remote models, we will support any OpenAI-compatible endpoint, which covers OpenAI's GPT-4o, Anthropic's Claude via compatible wrappers, and many others.


The key to making models swappable is a configuration file. We will define the model for each agent in a configuration file, and the agent code will read this configuration at startup. Changing the model for any agent is then a matter of editing one line in the configuration file, with no code changes required.


Note: If you configure Octopussy Light with a local LLM, you can run the code examples in the following increments on a machine (server, notebook, desktop) without a GPU. Be aware, however, that a local LLM responds very slowly on CPU-only machines.


Let us now set up our development environment and write the very first piece of Octopussy Light.


CHAPTER TWO: INCREMENT ONE - THE FOUNDATION


Our first increment is deliberately minimal. We are going to build the skeleton of the coordinator agent: a simple loop that accepts user input, sends it to an LLM, and prints the response. This is not yet an agent in the full sense, but it establishes the foundation on which everything else will be built.


First, let us set up the project structure. Create a directory called octopussy_light and inside it create the following files and subdirectories:


octopussy_light/

    config.yaml

    main.py

    coordinator/

        __init__.py

        coordinator_agent.py

    llm/

        __init__.py

        llm_client.py

    utils/

        __init__.py

        logger.py


The config.yaml file is where we define all configurable parameters. Here is the initial version. Remember, in this and all upcoming versions of config.yaml, to enter the information specific to your environment, such as LLM access, mail access, and calendar access.


# config.yaml

# Octopussy Light - Main Configuration File

#

# This file controls all configurable aspects of the system.

# Change model names here without touching any Python code.


llm:

  # Provider can be "ollama" (local) or "openai" (remote/cloud)

  provider: "ollama"


  # For Ollama: the model name as it appears in "ollama list"

  # For OpenAI: the model name as it appears in the OpenAI API

  coordinator_model: "llama3.2"

  worker_model: "llama3.2"


  # For Ollama, the base URL is typically http://localhost:11434

  # For OpenAI, use https://api.openai.com/v1

  base_url: "http://localhost:11434"


  # API key: required for OpenAI, can be "ollama" for local Ollama

  api_key: "ollama"


coordinator:

  system_prompt: |

    You are the coordinator of an agentic AI system called Octopussy Light.

    Your job is to understand the user's requests, coordinate specialized

    agents to fulfill those requests, and present results clearly and

    helpfully. You are friendly, precise, and efficient.


Notice that the configuration separates the LLM provider, the model name, the base URL, and the API key. This four-part combination is all we need to switch between a local Ollama model and a remote cloud model. The coordinator and worker agents can even use different models if desired.
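For example, switching the whole system to a cloud provider is nothing more than swapping these four values (the model names below are placeholders; use whatever your provider offers):

```yaml
llm:
  provider: "openai"
  coordinator_model: "gpt-4o"
  worker_model: "gpt-4o-mini"
  base_url: "https://api.openai.com/v1"
  api_key: "sk-..."   # your real key, ideally loaded from an env variable
```

No Python code changes, no reinstallation: restart the system and every agent talks to the new backend.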

Now let us build the LLM client. This is the module that all agents will use to communicate with their underlying language model:


# llm/llm_client.py

#

# A unified LLM client that works with both local Ollama models

# and remote OpenAI-compatible endpoints. The model and provider

# are determined entirely by the configuration, not by this code.


from openai import AsyncOpenAI

from typing import List, Dict, Optional



class LLMClient:

    """

    A thin, async wrapper around the OpenAI-compatible API.


    Because Ollama exposes an OpenAI-compatible REST API, we can use

    the same client code for both local and remote models. The only

    difference is the base_url and api_key, which come from config.

    """


    def __init__(

        self,

        base_url: str,

        api_key: str,

        model: str,

        system_prompt: Optional[str] = None

    ):

        """

        Initialize the LLM client.


        Args:

            base_url: The base URL of the LLM API endpoint.

                      Use http://localhost:11434/v1 for Ollama,

                      or https://api.openai.com/v1 for OpenAI.

            api_key:  The API key. Use "ollama" for local Ollama.

            model:    The model identifier (e.g., "llama3.2", "gpt-4o").

            system_prompt: An optional system prompt that will be

                           prepended to every conversation.

        """

        self.model = model

        self.system_prompt = system_prompt


        # The AsyncOpenAI client works with any OpenAI-compatible endpoint.

        # Ollama's API is compatible with the OpenAI format when accessed

        # at the /v1 path, so we append /v1 if not already present.

        if not base_url.endswith("/v1"):

            base_url = base_url.rstrip("/") + "/v1"


        self.client = AsyncOpenAI(

            base_url=base_url,

            api_key=api_key

        )


    async def chat(

        self,

        messages: List[Dict[str, str]],

        temperature: float = 0.7,

        max_tokens: Optional[int] = None

    ) -> str:

        """

        Send a list of messages to the LLM and return the response text.


        Args:

            messages: A list of message dicts with "role" and "content" keys.

                      Roles can be "user", "assistant", or "system".

            temperature: Controls randomness. Lower values are more

                         deterministic; higher values are more creative.

            max_tokens: Optional limit on the length of the response.


        Returns:

            The text content of the LLM's response.

        """

        # Prepend the system prompt if one was configured.

        full_messages = []

        if self.system_prompt:

            full_messages.append({

                "role": "system",

                "content": self.system_prompt

            })

        full_messages.extend(messages)


        response = await self.client.chat.completions.create(

            model=self.model,

            messages=full_messages,

            temperature=temperature,

            max_tokens=max_tokens

        )


        return response.choices[0].message.content


The LLMClient is deliberately simple. It wraps the AsyncOpenAI client, prepends a system prompt when one is configured, and exposes a single async chat method. Every agent in Octopussy Light will use this client to talk to its underlying model.


Now let us build the logger, because good logging is essential in a multi-agent system where many things are happening concurrently and you need to understand what is going on:


# utils/logger.py

#

# A simple, colored console logger for Octopussy Light.

# Each component uses a different color so you can visually

# distinguish coordinator output from agent output at a glance.


import logging

import sys

from typing import Optional



# ANSI color codes for terminal output

COLORS = {

    "coordinator": "\033[94m",  # Blue

    "mail_agent":  "\033[92m",  # Green

    "calendar":    "\033[93m",  # Yellow

    "worker":      "\033[95m",  # Magenta

    "tool_agent":  "\033[96m",  # Cyan

    "system":      "\033[91m",  # Red

    "reset":       "\033[0m"

}



def get_logger(name: str, color_key: Optional[str] = None) -> logging.Logger:

    """

    Create and return a named logger with optional color coding.


    Args:

        name:      The logger name, typically the component name.

        color_key: A key from the COLORS dict to color the output.


    Returns:

        A configured logging.Logger instance.

    """

    logger = logging.getLogger(name)


    if not logger.handlers:

        handler = logging.StreamHandler(sys.stdout)

        color = COLORS.get(color_key, "") if color_key else ""

        reset = COLORS["reset"] if color_key else ""


        formatter = logging.Formatter(

            f"{color}[%(name)s] %(asctime)s %(levelname)s: %(message)s{reset}",

            datefmt="%H:%M:%S"

        )

        handler.setFormatter(formatter)

        logger.addHandler(handler)

        logger.setLevel(logging.DEBUG)


    return logger


Now let us build the coordinator agent in its initial, minimal form:


# coordinator/coordinator_agent.py

#

# The Coordinator Agent - Increment 1

#

# In this first increment, the coordinator is simply a conversational

# agent that maintains a conversation history and talks to the user.

# We will add coordination capabilities in later increments.


from typing import List, Dict


from llm.llm_client import LLMClient

from utils.logger import get_logger


logger = get_logger("coordinator", "coordinator")



class CoordinatorAgent:

    """

    The central coordinator of Octopussy Light.


    The coordinator is responsible for two things: communicating with

    the user and coordinating all other agents in the system. In this

    first increment, we implement only the user communication part.

    """


    def __init__(self, config: Dict):

        """

        Initialize the coordinator with the application configuration.


        Args:

            config: The parsed contents of config.yaml.

        """

        llm_config = config["llm"]

        coordinator_config = config["coordinator"]


        self.llm = LLMClient(

            base_url=llm_config["base_url"],

            api_key=llm_config["api_key"],

            model=llm_config["coordinator_model"],

            system_prompt=coordinator_config["system_prompt"]

        )


        # The conversation history is the coordinator's short-term memory.

        # It accumulates the full dialogue so the LLM has context.

        self.conversation_history: List[Dict[str, str]] = []


        logger.info(

            f"Coordinator initialized with model: {llm_config['coordinator_model']}"

        )


    async def chat(self, user_input: str) -> str:

        """

        Process a user message and return the coordinator's response.


        The conversation history is maintained across calls, giving

        the LLM context about the ongoing conversation.


        Args:

            user_input: The raw text input from the user.


        Returns:

            The coordinator's response as a string.

        """

        # Add the user's message to the conversation history.

        self.conversation_history.append({

            "role": "user",

            "content": user_input

        })


        logger.debug(f"User said: {user_input}")


        # Send the full conversation history to the LLM.

        response = await self.llm.chat(self.conversation_history)


        # Add the assistant's response to the history for future context.

        self.conversation_history.append({

            "role": "assistant",

            "content": response

        })


        logger.debug(f"Coordinator responded: {response[:80]}...")


        return response


Finally, let us write the main entry point that ties everything together:


# main.py

#

# Octopussy Light - Main Entry Point

#

# This file loads the configuration and starts the main interaction loop.

# Run this file to start Octopussy Light.


import asyncio

import yaml

import sys


from coordinator.coordinator_agent import CoordinatorAgent

from utils.logger import get_logger


logger = get_logger("system", "system")



def load_config(path: str = "config.yaml") -> dict:

    """

    Load and parse the YAML configuration file.


    Args:

        path: Path to the configuration file.


    Returns:

        The parsed configuration as a Python dictionary.

    """

    try:

        with open(path, "r") as f:

            return yaml.safe_load(f)

    except FileNotFoundError:

        logger.error(f"Configuration file not found: {path}")

        sys.exit(1)

    except yaml.YAMLError as e:

        logger.error(f"Error parsing configuration file: {e}")

        sys.exit(1)



async def main():

    """

    The main async entry point for Octopussy Light.


    Loads configuration, initializes the coordinator, and runs

    the interactive conversation loop.

    """

    logger.info("Starting Octopussy Light...")


    config = load_config()

    coordinator = CoordinatorAgent(config)


    print("\nOctopussy Light is ready. Type 'quit' or 'exit' to stop.\n")


    while True:

        try:

            # Read user input synchronously. In later increments,

            # we will make this non-blocking so background agents

            # can deliver notifications while the user is idle.

            user_input = input("You: ").strip()


            if not user_input:

                continue


            if user_input.lower() in ("quit", "exit"):

                print("Goodbye!")

                break


            response = await coordinator.chat(user_input)

            print(f"\nCoordinator: {response}\n")


        except Exception as e:

            logger.error(f"Unexpected error: {e}")



if __name__ == "__main__":

    try:

        asyncio.run(main())

    except KeyboardInterrupt:

        print("\nGoodbye!")



To run this first increment, you need to install the required packages. Create a requirements.txt file:


# requirements.txt

# Core dependencies for Octopussy Light


openai>=1.0.0          # OpenAI-compatible async client (works with Ollama too)

pyyaml>=6.0            # YAML configuration parsing

mcp>=1.0.0             # Model Context Protocol SDK

aioimaplib>=1.0.0      # Async IMAP email reading

google-api-python-client>=2.0.0  # Google Calendar API

google-auth-oauthlib>=1.0.0      # Google OAuth

google-auth-httplib2>=0.2.0      # Google Auth HTTP transport

chromadb>=1.0.0        # Vector database for long-term memory

icalendar>=5.0.0       # Local .ics calendar file parsing

pytz>=2024.1           # Timezone handling for calendar events


Install them with:


pip install -r requirements.txt


Make sure Ollama is running and you have pulled the llama3.2 model:


ollama pull llama3.2


Then run the system:


python main.py


You should see something like this in your terminal:


[system] 10:23:01 INFO: Starting Octopussy Light...

[coordinator] 10:23:01 INFO: Coordinator initialized with model: llama3.2


Octopussy Light is ready. Type 'quit' or 'exit' to stop.


You: Hello, what can you do?


Coordinator: Hello! I am the coordinator of Octopussy Light, an agentic AI

system. I can help you manage your email, keep track of your calendar, and

execute tasks using specialized worker agents. What would you like to do today?


You:


This is increment one: a working conversational coordinator. It is not yet an agent in the full sense, but the foundation is solid. The LLM is configurable, the conversation history is maintained, and the code is clean and extensible. Now let us add the first real agent capability.


CHAPTER THREE: INCREMENT TWO - ASYNCIO AND THE MESSAGE BUS


Before we add specialized agents, we need to solve a fundamental architectural problem. Our current main loop blocks on user input. While the user is thinking and typing, nothing else can happen. But in a real multi-agent system, things need to happen in the background: the mail agent needs to check for new email, the calendar agent needs to monitor upcoming events, and worker agents need to execute tasks. All of this must happen concurrently with the user interaction.


The solution is Python's asyncio library combined with an internal message bus. We will use asyncio to run all agents concurrently as coroutines, and we will use asyncio.Queue objects to let agents communicate with each other without tight coupling. This is the classic producer-consumer pattern applied to agent communication.
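A minimal illustration of why asyncio solves the blocking problem: two coroutines make progress "at the same time" because each one yields control whenever it waits. The agent names and sleep intervals below are invented purely for the demonstration:

```python
import asyncio

# Two independent "agents" running concurrently as coroutines.
# await asyncio.sleep() yields control so the other coroutine runs.
events = []

async def mail_check():
    for _ in range(2):
        await asyncio.sleep(0.01)    # stand-in for "poll the mailbox"
        events.append("mail")

async def calendar_check():
    for _ in range(2):
        await asyncio.sleep(0.015)   # stand-in for "scan the calendar"
        events.append("calendar")

async def main():
    # gather() runs both coroutines concurrently, exactly how
    # Octopussy Light will run its agents side by side.
    await asyncio.gather(mail_check(), calendar_check())

asyncio.run(main())
print(events)   # the two agents' entries interleave
```

With blocking code, one loop would have to finish completely before the other started; with coroutines, both run within a single thread, interleaved by the event loop.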


The architecture looks like this:


User Input

    |

    v

[Coordinator Agent] <----> [Message Bus (asyncio.Queue)]

    ^                              |

    |                    +--------+--------+

    |                    |        |        |

    |              [Mail Agent] [Calendar] [Workers]

    |                    |        |        |

    +--------------------+--------+--------+

                (all agents send to coordinator's inbox)


Each agent has its own inbox queue. The coordinator has an inbox queue where all other agents send their notifications and results. The coordinator also has outbox queues for sending tasks to specific agents. This decoupled design means agents do not need to know about each other directly; they only need to know about the message bus.

Let us update the project structure to include the message bus:


octopussy_light/

    config.yaml

    main.py

    coordinator/

        __init__.py

        coordinator_agent.py

    agents/

        __init__.py

        base_agent.py         (new)

        mail_agent.py         (new)

    messaging/

        __init__.py

        message_bus.py        (new)

        message_types.py      (new)

    llm/

        __init__.py

        llm_client.py

    utils/

        __init__.py

        logger.py


The message types module defines the structure of all messages that flow through the system. Using typed messages rather than raw dictionaries makes the code much easier to understand and maintain:


# messaging/message_types.py

#

# Defines all message types that flow through the Octopussy Light

# message bus. Using dataclasses gives us type safety and clear

# documentation of what each message contains.


from dataclasses import dataclass, field

from typing import Any, Dict, Optional

from enum import Enum

import uuid

import time



class MessageType(Enum):

    """All possible message types in the Octopussy Light system."""


    # Messages from agents to the coordinator

    AGENT_NOTIFICATION    = "agent_notification"   # General notification

    MAIL_NOTIFICATION     = "mail_notification"    # New email matched filters

    CALENDAR_NOTIFICATION = "calendar_notification" # Upcoming calendar event

    TASK_RESULT           = "task_result"          # Worker completed a task

    TASK_ERROR            = "task_error"           # Worker failed a task

    TASK_PROGRESS         = "task_progress"        # Worker partial result


    # Messages from the coordinator to agents

    ASSIGN_TASK           = "assign_task"          # Give a task to a worker

    CONFIGURE_AGENT       = "configure_agent"      # Update agent configuration

    SHUTDOWN_AGENT        = "shutdown_agent"       # Tell an agent to stop



@dataclass

class Message:

    """

    The universal message type for inter-agent communication.


    Every message in the system is an instance of this class.

    The payload field carries message-type-specific data as a dict.

    """


    # The type of this message, from the MessageType enum.

    message_type: MessageType


    # The name of the agent that sent this message.

    sender: str


    # The name of the intended recipient agent.

    recipient: str


    # The message payload. Structure depends on message_type.

    payload: Dict[str, Any] = field(default_factory=dict)


    # A unique identifier for this message, auto-generated.

    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))


    # Unix timestamp of when this message was created.

    timestamp: float = field(default_factory=time.time)


    # Optional: the task ID this message relates to.

    task_id: Optional[str] = None
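To see what the default_factory fields buy us, here is a condensed, self-contained replica of the pattern (the real class lives in messaging/message_types.py; this stripped-down version exists only to demonstrate the behavior):

```python
import time
import uuid
from dataclasses import dataclass, field

# Condensed replica of the Message pattern: every instance gets a
# fresh UUID and timestamp without the caller doing anything.
@dataclass
class DemoMessage:
    sender: str
    recipient: str
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)

a = DemoMessage("mail_agent", "coordinator")
b = DemoMessage("mail_agent", "coordinator")

# default_factory runs once per instance, so the ids differ.
print(a.message_id != b.message_id)  # → True
```

This is why default_factory is used instead of a plain default: a plain default would be evaluated once at class definition time, and every message would share the same id and timestamp.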


Now let us build the message bus itself. The message bus is the central communication hub of the system:


# messaging/message_bus.py

#

# The Octopussy Light Message Bus

#

# The message bus manages a set of named queues, one per agent.

# Agents register themselves to get an inbox queue, then send

# messages to other agents by name. The bus routes messages to

# the correct queue automatically.


import asyncio

from typing import Dict

from utils.logger import get_logger

from messaging.message_types import Message


logger = get_logger("message_bus", "system")



class MessageBus:

    """

    A simple, asyncio-based message bus for inter-agent communication.


    Each agent registers with the bus and receives its own asyncio.Queue

    as an inbox. Sending a message to another agent is as simple as

    calling send() with the message (which contains the recipient's name).

    """


    def __init__(self):

        # Maps agent names to their inbox queues.

        self._queues: Dict[str, asyncio.Queue] = {}

        logger.info("Message bus initialized.")


    def register(self, agent_name: str, maxsize: int = 100) -> asyncio.Queue:

        """

        Register an agent and create its inbox queue.


        Args:

            agent_name: The unique name of the agent.

            maxsize:    Maximum number of messages the queue can hold.

                        0 means unlimited. Default is 100 to prevent

                        runaway memory usage.


        Returns:

            The asyncio.Queue that serves as this agent's inbox.

        """

        if agent_name in self._queues:

            logger.warning(

                f"Agent '{agent_name}' is already registered. "

                f"Returning existing queue."

            )

            return self._queues[agent_name]


        queue = asyncio.Queue(maxsize=maxsize)

        self._queues[agent_name] = queue

        logger.info(f"Agent '{agent_name}' registered with message bus.")

        return queue


    def unregister(self, agent_name: str):

        """

        Remove an agent's inbox queue from the bus.


        Args:

            agent_name: The name of the agent to unregister.

        """

        if agent_name in self._queues:

            del self._queues[agent_name]

            logger.info(f"Agent '{agent_name}' unregistered from message bus.")


    async def send(self, message: Message):

        """

        Send a message to the recipient specified in message.recipient.


        If the recipient's queue is full, this will block until

        space becomes available (backpressure). If the recipient

        is not registered, logs an error and discards the message.


        Args:

            message: The Message object to send.

        """

        recipient = message.recipient

        if recipient not in self._queues:

            logger.error(

                f"Cannot deliver message to '{recipient}': "

                f"agent not registered. Message from '{message.sender}' "

                f"of type '{message.message_type.value}' discarded."

            )

            return


        await self._queues[recipient].put(message)

        logger.debug(

            f"Message routed: {message.sender} -> {recipient} "

            f"[{message.message_type.value}]"

        )


    def is_registered(self, agent_name: str) -> bool:

        """Check whether an agent is registered with the bus."""

        return agent_name in self._queues


With the message bus in place, we can now define a base class for all agents. This base class handles the common pattern of running in the background, receiving messages from the bus, and dispatching them to handler methods:


# agents/base_agent.py

#

# The Base Agent class for all Octopussy Light agents.

#

# Every agent in the system inherits from this class. It provides

# the common infrastructure for registering with the message bus,

# running as an asyncio task, and processing incoming messages.


import asyncio

from abc import ABC, abstractmethod

from typing import Optional


from messaging.message_bus import MessageBus

from messaging.message_types import Message

from utils.logger import get_logger



class BaseAgent(ABC):

    """

    Abstract base class for all Octopussy Light agents.


    Subclasses must implement the handle_message() method to define

    how they respond to incoming messages. They may also override

    run_background_tasks() to perform periodic background work

    (like checking email or monitoring a calendar).

    """


    def __init__(

        self,

        name: str,

        bus: MessageBus,

        config: dict,

        color_key: Optional[str] = None

    ):

        """

        Initialize the base agent.


        Args:

            name:      The unique name of this agent on the message bus.

            bus:       The shared MessageBus instance.

            config:    The full application configuration dictionary.

            color_key: The color key for log output (from utils/logger.py).

        """

        self.name = name

        self.bus = bus

        self.config = config

        self.logger = get_logger(name, color_key)

        self.running = False


        # Register with the message bus to get our inbox queue.

        self.inbox = bus.register(name)


        self.logger.info(f"Agent '{name}' initialized.")


    async def start(self):

        """

        Start the agent's main loop and any background tasks.


        This method runs indefinitely until stop() is called.

        It concurrently runs the message processing loop and

        any background tasks defined by the subclass.

        """

        self.running = True

        self.logger.info(f"Agent '{self.name}' starting.")


        # Run the message loop and background tasks concurrently.

        await asyncio.gather(

            self._message_loop(),

            self.run_background_tasks()

        )


    async def stop(self):

        """Signal the agent to stop gracefully."""

        self.running = False

        self.logger.info(f"Agent '{self.name}' stopping.")


    async def _message_loop(self):

        """

        The main message processing loop.


        Continuously reads messages from the inbox queue and

        dispatches them to handle_message(). Stops when self.running

        becomes False and the queue is empty.


        task_done() is called in a finally block to guarantee it is

        always called even if handle_message() raises an exception,

        preventing queue.join() from blocking indefinitely.

        """

        while self.running:

            try:

                # Wait up to 0.1 seconds for a message, then loop

                # to check self.running again. This allows clean shutdown.

                message = await asyncio.wait_for(

                    self.inbox.get(),

                    timeout=0.1

                )

                try:

                    await self.handle_message(message)

                except Exception as e:

                    self.logger.error(f"Error processing message: {e}")

                finally:

                    # Always mark the task done, even if handle_message raised.

                    self.inbox.task_done()

            except asyncio.TimeoutError:

                # No message arrived; just loop and check self.running.

                continue


    @abstractmethod

    async def handle_message(self, message: Message):

        """

        Handle an incoming message from the message bus.


        Subclasses must implement this method to define their

        response to different message types.


        Args:

            message: The incoming Message object.

        """

        pass


    async def run_background_tasks(self):

        """

        Override this method to run periodic background tasks.


        The default implementation does nothing. Subclasses like

        the mail agent and calendar agent override this to perform

        their periodic monitoring work.

        """

        pass


    async def send(self, message: Message):

        """

        Convenience method to send a message via the bus.


        The recipient is taken from message.recipient, so no

        separate recipient argument is needed.


        Args:

            message: The Message to send. message.recipient must be

                     set to the name of the intended recipient agent.

        """

        await self.bus.send(message)


Now we need to update the coordinator to use the message bus and run as an asyncio task alongside other agents. We also need to update the main loop to be non-blocking. Let us rewrite the coordinator and the main entry point:


# coordinator/coordinator_agent.py

#

# The Coordinator Agent - Increment 2

#

# Now the coordinator runs as an asyncio task alongside other agents.

# It processes user input AND handles messages arriving from other agents.

# The input loop is now non-blocking, using asyncio to read stdin.


import asyncio

import sys

from typing import List, Dict


from agents.base_agent import BaseAgent

from messaging.message_bus import MessageBus

from messaging.message_types import Message, MessageType

from llm.llm_client import LLMClient




class CoordinatorAgent(BaseAgent):

    """

    The central coordinator of Octopussy Light.


    Handles user communication and coordinates all other agents.

    Runs concurrently with other agents via asyncio.

    """


    def __init__(self, config: dict, bus: MessageBus):

        super().__init__(

            name="coordinator",

            bus=bus,

            config=config,

            color_key="coordinator"

        )


        llm_config = config["llm"]

        coordinator_config = config["coordinator"]


        self.llm = LLMClient(

            base_url=llm_config["base_url"],

            api_key=llm_config["api_key"],

            model=llm_config["coordinator_model"],

            system_prompt=coordinator_config["system_prompt"]

        )


        # Short-term memory: the conversation history with the user.

        self.conversation_history: List[Dict[str, str]] = []


        self.logger.info(

            f"Coordinator initialized with model: "

            f"{llm_config['coordinator_model']}"

        )


    async def handle_message(self, message: Message):

        """

        Handle messages arriving from other agents.


        In this increment, we simply display the notification to the user.

        In later increments, we will handle task results, errors, etc.

        """

        if message.message_type == MessageType.MAIL_NOTIFICATION:

            await self._display_mail_notification(message.payload)

        elif message.message_type == MessageType.CALENDAR_NOTIFICATION:

            await self._display_calendar_notification(message.payload)

        elif message.message_type == MessageType.TASK_RESULT:

            await self._display_task_result(message.payload)

        elif message.message_type == MessageType.TASK_ERROR:

            await self._display_task_error(message.payload)

        else:

            self.logger.warning(

                f"Received unhandled message type: {message.message_type}"

            )


    async def _display_mail_notification(self, payload: dict):

        """Display a mail notification to the user."""

        print(

            f"\n[MAIL] From: {payload.get('sender', 'Unknown')}\n"

            f"       Subject: {payload.get('subject', 'No subject')}\n"

            f"       Summary: {payload.get('summary', '')}\n"

        )


    async def _display_calendar_notification(self, payload: dict):

        """Display a calendar event notification to the user."""

        print(

            f"\n[CALENDAR] Event: {payload.get('title', 'Untitled')}\n"

            f"           When: {payload.get('start_time', 'Unknown')}\n"

            f"           Location: {payload.get('location', 'Not specified')}\n"

            f"           Description: {payload.get('description', '')}\n"

        )


    async def _display_task_result(self, payload: dict):

        """Display a task result to the user."""

        task_id = payload.get("task_id", "unknown")

        response_num = payload.get("response_number", 1)

        result = payload.get("result", "")

        print(

            f"\n[TASK RESULT] Task {task_id} "

            f"(response #{response_num}):\n{result}\n"

        )


    async def _display_task_error(self, payload: dict):

        """Display a task error to the user."""

        task_id = payload.get("task_id", "unknown")

        error = payload.get("error", "Unknown error")

        print(f"\n[TASK ERROR] Task {task_id} failed: {error}\n")


    async def chat(self, user_input: str) -> str:

        """

        Process user input and return the coordinator's response.

        """

        self.conversation_history.append({

            "role": "user",

            "content": user_input

        })


        response = await self.llm.chat(self.conversation_history)


        self.conversation_history.append({

            "role": "assistant",

            "content": response

        })


        return response


    async def run_background_tasks(self):

        """

        The coordinator's background task: reading user input asynchronously.


        We use asyncio to read from stdin without blocking the event loop,

        so that agent notifications can be displayed even while the user

        is idle or typing.

        """

        loop = asyncio.get_running_loop()

        print("\nOctopussy Light is ready. Type 'quit' or 'exit' to stop.\n")


        while self.running:

            try:

                # Read a line from stdin in a thread pool so we don't

                # block the asyncio event loop.

                print("You: ", end="", flush=True)

                user_input = await loop.run_in_executor(

                    None, sys.stdin.readline

                )

                user_input = user_input.strip()


                if not user_input:

                    continue


                if user_input.lower() in ("quit", "exit"):

                    self.running = False

                    print("Goodbye!")

                    break


                response = await self.chat(user_input)

                print(f"\nCoordinator: {response}\n")


            except Exception as e:

                self.logger.error(f"Error in input loop: {e}")


Now update main.py to create the message bus and start the coordinator as an asyncio task:


# main.py

#

# Octopussy Light - Main Entry Point (Increment 2)

#

# Now uses asyncio to run the coordinator as a concurrent task,

# enabling background agents to run alongside user interaction.


import asyncio

import yaml

import sys


from messaging.message_bus import MessageBus

from coordinator.coordinator_agent import CoordinatorAgent

from utils.logger import get_logger


logger = get_logger("system", "system")



def load_config(path: str = "config.yaml") -> dict:

    """Load and parse the YAML configuration file."""

    try:

        with open(path, "r") as f:

            return yaml.safe_load(f)

    except FileNotFoundError:

        logger.error(f"Configuration file not found: {path}")

        sys.exit(1)

    except yaml.YAMLError as e:

        logger.error(f"Error parsing configuration file: {e}")

        sys.exit(1)



async def main():

    """

    The main async entry point for Octopussy Light.


    Creates the message bus, initializes all agents, and runs

    them all concurrently using asyncio.gather().

    """

    logger.info("Starting Octopussy Light...")


    config = load_config()

    bus = MessageBus()


    coordinator = CoordinatorAgent(config, bus)


    # In later increments, we will add more agents here:

    # mail_agent = MailAgent(config, bus)

    # calendar_agent = CalendarAgent(config, bus)


    try:

        # Run all agents concurrently. asyncio.gather() runs all

        # coroutines concurrently and waits for all to complete.

        await asyncio.gather(

            coordinator.start(),

            # mail_agent.start(),

            # calendar_agent.start(),

        )

    finally:

        await coordinator.stop()



if __name__ == "__main__":

    try:

        asyncio.run(main())

    except KeyboardInterrupt:

        print("\nShutting down...")


This second increment is a significant architectural leap. The system now has a proper message bus, a base agent class, and an asynchronous input loop. The coordinator runs as an asyncio coroutine, which means it can receive messages from other agents even while waiting for user input. The foundation is now ready for real agents to be added.
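To see the whole pattern in isolation, here is a minimal, self-contained sketch of the same design. Note that ToyBus, producer, and coordinator_loop are simplified stand-ins invented for this illustration, not the real MessageBus and BaseAgent classes:

```python
import asyncio


# Simplified stand-in for MessageBus: one asyncio.Queue per agent name.
class ToyBus:
    def __init__(self):
        self.queues = {}

    def register(self, name):
        self.queues[name] = asyncio.Queue()
        return self.queues[name]

    async def send(self, recipient, payload):
        await self.queues[recipient].put(payload)


async def producer(bus):
    # Simulates a background agent noticing an event.
    await asyncio.sleep(0.05)
    await bus.send("coordinator", {"type": "mail", "subject": "Hello"})


async def coordinator_loop(inbox):
    # Mirrors _message_loop(): wait with a timeout so a real agent
    # could re-check its running flag between messages.
    while True:
        try:
            msg = await asyncio.wait_for(inbox.get(), timeout=0.5)
            inbox.task_done()
            return msg
        except asyncio.TimeoutError:
            continue


async def main():
    bus = ToyBus()
    inbox = bus.register("coordinator")
    _, msg = await asyncio.gather(producer(bus), coordinator_loop(inbox))
    return msg


print(asyncio.run(main())["subject"])  # Hello
```

Both coroutines share one event loop: the producer's put() wakes the coordinator's pending get(), which is exactly how mail and calendar notifications will reach the coordinator in the next increments.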


CHAPTER FOUR: INCREMENT THREE - THE MAIL AGENT


The mail agent is the first specialized agent we will add to Octopussy Light. Its job is to periodically check an email inbox for new messages that match user-defined filters, summarize the matching messages, and send the results to the coordinator.


The mail agent has three configurable aspects. First, the polling interval: how often it checks for new mail, in seconds. Second, the filter rules: a list of regular expression patterns to match against specific parts of each email, such as the sender address, the subject line, or the message body. Third, the IMAP server connection details: host, username, and password.


Let us add the mail agent configuration to config.yaml:


# config.yaml (additions for the mail agent)


mail_agent:

  # How often to check for new mail, in seconds.

  poll_interval_seconds: 60


  # IMAP server connection details.

  # For security, consider loading these from environment variables

  # rather than storing credentials directly in this file.

  imap_host: "imap.gmail.com"

  imap_user: "your_email@gmail.com"

  imap_password: "your_app_password"


  # Filter rules: a list of rules. Each rule specifies a field to

  # match (sender, subject, or body) and a regular expression pattern.

  # A message matches if ANY rule matches.

  filters:

    - field: "sender"

      pattern: "boss@company.com"

    - field: "subject"

      pattern: "(?i)urgent|important|action required"

    - field: "body"

      pattern: "(?i)please review|your approval"
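The patterns are ordinary Python regular expressions evaluated with re.search(), and a message counts as a match if any single rule matches. A quick standalone check of the rules above (a simplified copy of the matching logic, written for illustration):

```python
import re

filters = [
    {"field": "sender", "pattern": "boss@company.com"},
    {"field": "subject", "pattern": "(?i)urgent|important|action required"},
    {"field": "body", "pattern": "(?i)please review|your approval"},
]


def matches(sender: str, subject: str, body: str) -> bool:
    fields = {"sender": sender, "subject": subject, "body": body}
    # ANY-rule-matches semantics: one matching rule is enough.
    return any(
        re.search(rule["pattern"], fields.get(rule["field"], ""))
        for rule in filters
    )


print(matches("boss@company.com", "Lunch?", ""))          # True (sender rule)
print(matches("alice@x.com", "URGENT: server down", ""))  # True ((?i) = case-insensitive)
print(matches("alice@x.com", "Newsletter", "Hi there"))   # False
```

One subtlety: the dot is a regex metacharacter, so "boss@company.com" also matches addresses like "boss@companyXcom". Escape the dots ("boss@company\.com") if you need an exact address match.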


Now let us build the mail agent. It uses aioimaplib for async IMAP access and the LLMClient to summarize matching emails:


# agents/mail_agent.py

#

# The Mail Agent for Octopussy Light.

#

# Periodically checks an IMAP mailbox for new messages that match

# user-defined regular expression filters. Matching messages are

# summarized using an LLM and sent to the coordinator as JSON payloads.


import asyncio

import email

import email.header

import re

from typing import Optional, Set


from aioimaplib import IMAP4_SSL


from agents.base_agent import BaseAgent

from messaging.message_bus import MessageBus

from messaging.message_types import Message, MessageType

from llm.llm_client import LLMClient



class MailAgent(BaseAgent):

    """

    Monitors an IMAP mailbox and notifies the coordinator of

    new messages that match configured regular expression filters.

    """


    def __init__(self, config: dict, bus: MessageBus):

        super().__init__(

            name="mail_agent",

            bus=bus,

            config=config,

            color_key="mail_agent"

        )


        mail_config = config["mail_agent"]

        llm_config = config["llm"]


        self.imap_host = mail_config["imap_host"]

        self.imap_user = mail_config["imap_user"]

        self.imap_password = mail_config["imap_password"]

        self.poll_interval = mail_config["poll_interval_seconds"]

        self.filters = mail_config.get("filters", [])


        # The LLM is used to summarize matching email content.

        self.llm = LLMClient(

            base_url=llm_config["base_url"],

            api_key=llm_config["api_key"],

            model=llm_config["coordinator_model"],

            system_prompt=(

                "You are an email summarizer. Given the body of an email, "

                "produce a concise 2-3 sentence summary of its key points. "

                "Be factual and objective. Do not add commentary."

            )

        )


        # Track which message UIDs we have already processed so we

        # do not notify the coordinator about the same email twice.

        # Note: this set grows unboundedly for long-running processes.

        # A production implementation would use a bounded cache or

        # persist processed UIDs to disk.

        self.processed_uids: Set[str] = set()


        self.logger.info(

            f"Mail agent initialized. Polling every "

            f"{self.poll_interval}s with {len(self.filters)} filter(s)."

        )


    async def handle_message(self, message: Message):

        """

        The mail agent currently does not respond to incoming messages.

        Future increments may add reconfiguration support here.

        """

        self.logger.debug(

            f"Mail agent received message of type {message.message_type}, "

            f"ignoring (not yet handled)."

        )


    async def run_background_tasks(self):

        """

        The mail agent's main background loop.


        Polls the IMAP server at the configured interval, checks for

        new messages matching the filters, and notifies the coordinator.

        """

        self.logger.info("Mail agent background polling started.")


        while self.running:

            try:

                await self._check_mail()

            except Exception as e:

                self.logger.error(f"Error during mail check: {e}")


            # Wait for the configured interval before checking again.

            # We check self.running every second so we can shut down

            # cleanly without waiting the full poll interval.

            for _ in range(self.poll_interval):

                if not self.running:

                    break

                await asyncio.sleep(1)


    async def _check_mail(self):

        """

        Connect to the IMAP server, fetch new messages, apply filters,

        and send notifications to the coordinator for any matches.

        """

        self.logger.debug("Checking mail...")

        imap_client = None


        try:

            imap_client = IMAP4_SSL(host=self.imap_host)

            await imap_client.wait_hello_from_server()

            await imap_client.login(self.imap_user, self.imap_password)

            await imap_client.select("INBOX")


            # Search for UNSEEN messages only, to avoid reprocessing.

            status, message_ids = await imap_client.search("UNSEEN")


            if status != "OK" or not message_ids[0]:

                self.logger.debug("No new unseen messages.")

                return


            uid_list = message_ids[0].split()

            self.logger.info(f"Found {len(uid_list)} unseen message(s).")


            for uid in uid_list:

                uid_str = uid.decode() if isinstance(uid, bytes) else uid


                if uid_str in self.processed_uids:

                    continue


                parsed = await self._fetch_and_parse(imap_client, uid)

                if parsed is None:

                    continue


                sender, subject, body = parsed


                if self._matches_filters(sender, subject, body):

                    self.logger.info(

                        f"Filter match: UID {uid_str}, "

                        f"From: {sender}, Subject: {subject}"

                    )

                    summary = await self._summarize(body)

                    await self._notify_coordinator(sender, subject, summary)


                self.processed_uids.add(uid_str)


        finally:

            if imap_client:

                try:

                    await imap_client.logout()

                except Exception:

                    pass


    async def _fetch_and_parse(

        self,

        imap_client,

        uid: bytes

    ) -> Optional[tuple]:

        """

        Fetch a single email by UID and extract sender, subject, and body.


        Returns:

            A tuple of (sender, subject, body) strings, or None on error.

        """

        try:

            # aioimaplib expects the UID as a string; decode it if the

            # search result gave us bytes.

            uid_str = uid.decode() if isinstance(uid, bytes) else str(uid)

            status, data = await imap_client.fetch(uid_str, "(RFC822)")

            if status != "OK" or len(data) < 2:

                return None


            # aioimaplib returns the raw RFC822 message content as the

            # second response line (a bytearray), unlike imaplib, which

            # yields (header, body) tuples.

            msg = email.message_from_bytes(bytes(data[1]))


            # Decode the sender field.

            sender_raw = msg.get("From", "")

            sender = self._decode_header(sender_raw)


            # Decode the subject field.

            subject_raw = msg.get("Subject", "No subject")

            subject = self._decode_header(subject_raw)


            # Extract the plain text body.

            body = self._extract_body(msg)


            return sender, subject, body


        except Exception as e:

            self.logger.error(f"Error fetching/parsing email UID {uid}: {e}")

            return None


    def _decode_header(self, raw_header: str) -> str:

        """Decode a potentially encoded email header to a plain string."""

        try:

            parts = email.header.decode_header(raw_header)

            decoded_parts = []

            for part, encoding in parts:

                if isinstance(part, bytes):

                    decoded_parts.append(

                        part.decode(encoding or "utf-8", errors="replace")

                    )

                else:

                    decoded_parts.append(str(part))

            return " ".join(decoded_parts)

        except Exception:

            return str(raw_header)


    def _extract_body(self, msg) -> str:

        """

        Extract the plain text body from an email message.


        Handles both simple (non-multipart) and multipart messages.

        Prefers text/plain parts over text/html.

        """

        if msg.is_multipart():

            for part in msg.walk():

                content_type = part.get_content_type()

                disposition = str(part.get("Content-Disposition", ""))

                if content_type == "text/plain" and "attachment" not in disposition:

                    try:

                        return part.get_payload(decode=True).decode(

                            "utf-8", errors="replace"

                        )

                    except Exception:

                        continue

        else:

            try:

                return msg.get_payload(decode=True).decode(

                    "utf-8", errors="replace"

                )

            except Exception:

                pass


        return ""


    def _matches_filters(self, sender: str, subject: str, body: str) -> bool:

        """

        Check whether an email matches any of the configured filter rules.


        Each filter rule specifies a field (sender, subject, or body)

        and a regular expression pattern. The email matches if ANY

        single rule matches.


        Args:

            sender:  The decoded sender address string.

            subject: The decoded subject line string.

            body:    The plain text body of the email.


        Returns:

            True if at least one filter rule matches, False otherwise.

        """

        field_map = {

            "sender": sender,

            "subject": subject,

            "body": body

        }


        for rule in self.filters:

            field = rule.get("field", "").lower()

            pattern = rule.get("pattern", "")

            text = field_map.get(field, "")


            if pattern and re.search(pattern, text):

                return True


        return False


    async def _summarize(self, body: str) -> str:

        """

        Use the LLM to produce a concise summary of the email body.


        Args:

            body: The plain text body of the email.


        Returns:

            A 2-3 sentence summary string.

        """

        if not body.strip():

            return "No body content."


        # Truncate very long bodies to avoid exceeding context limits.

        truncated = body[:4000] if len(body) > 4000 else body


        try:

            summary = await self.llm.chat([

                {"role": "user", "content": f"Summarize this email:\n\n{truncated}"}

            ])

            return summary.strip()

        except Exception as e:

            self.logger.error(f"LLM summarization failed: {e}")

            return "Summary unavailable."


    async def _notify_coordinator(

        self,

        sender: str,

        subject: str,

        summary: str

    ):

        """

        Send a mail notification to the coordinator via the message bus.


        The payload is a JSON-serializable dictionary containing the

        sender, subject, and LLM-generated summary of the email.

        """

        notification = Message(

            message_type=MessageType.MAIL_NOTIFICATION,

            sender=self.name,

            recipient="coordinator",

            payload={

                "sender": sender,

                "subject": subject,

                "summary": summary

            }

        )

        await self.bus.send(notification)

        self.logger.info(

            f"Sent mail notification to coordinator: '{subject}'"

        )
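The header decoding and body extraction rely only on the standard library email package. Here is a small self-contained demonstration with a made-up raw message, using a standalone copy of the same decoding logic as _decode_header():

```python
import email
import email.header

# A made-up raw message with RFC 2047 encoded headers.
raw = (
    b"From: =?utf-8?b?Sm9zw6k=?= <jose@example.com>\r\n"
    b"Subject: =?utf-8?q?Caf=C3=A9_meeting?=\r\n"
    b"Content-Type: text/plain; charset=utf-8\r\n"
    b"\r\n"
    b"See you at the caf\xc3\xa9 at noon.\r\n"
)
msg = email.message_from_bytes(raw)


def decode_header(raw_header: str) -> str:
    # Same approach as _decode_header(): decode each (part, charset) pair.
    parts = email.header.decode_header(raw_header)
    return " ".join(
        p.decode(enc or "utf-8", errors="replace") if isinstance(p, bytes) else str(p)
        for p, enc in parts
    )


print(decode_header(msg["Subject"]))                         # Café meeting
print(decode_header(msg["From"]))                            # contains: José
print(msg.get_payload(decode=True).decode("utf-8").strip())  # See you at the café at noon.
```

This is why the agent decodes headers before applying regex filters: the raw header bytes on the wire look nothing like the text the user wrote a pattern for.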


Now update main.py to include the mail agent:


# main.py (Increment 3)


import asyncio

import yaml

import sys


from messaging.message_bus import MessageBus

from coordinator.coordinator_agent import CoordinatorAgent

from agents.mail_agent import MailAgent

from utils.logger import get_logger


logger = get_logger("system", "system")



def load_config(path: str = "config.yaml") -> dict:

    """Load and parse the YAML configuration file."""

    try:

        with open(path, "r") as f:

            return yaml.safe_load(f)

    except FileNotFoundError:

        logger.error(f"Configuration file not found: {path}")

        sys.exit(1)

    except yaml.YAMLError as e:

        logger.error(f"Error parsing configuration file: {e}")

        sys.exit(1)



async def main():

    """Start Octopussy Light with the coordinator and mail agent."""

    logger.info("Starting Octopussy Light...")


    config = load_config()

    bus = MessageBus()


    coordinator = CoordinatorAgent(config, bus)

    mail_agent = MailAgent(config, bus)


    try:

        await asyncio.gather(

            coordinator.start(),

            mail_agent.start(),

        )

    finally:

        await coordinator.stop()

        await mail_agent.stop()



if __name__ == "__main__":

    try:

        asyncio.run(main())

    except KeyboardInterrupt:

        print("\nShutting down...")


Let us trace through what happens when a matching email arrives. The mail agent's background loop wakes up after the configured poll interval. It connects to the IMAP server, fetches unseen messages, and for each one calls _matches_filters(). If a message matches, it calls _summarize() to get an LLM-generated summary, then calls _notify_coordinator() to send a Message object to the coordinator's inbox. The coordinator's _message_loop() picks up this message and calls _display_mail_notification(), which prints something like this to the terminal:


[MAIL] From: boss@company.com

       Subject: Urgent: Q4 Budget Review

       Summary: The sender requests an immediate review of the Q4 budget

       projections. They highlight a discrepancy in the marketing allocation

       and ask for corrections before the board meeting on Friday.


The user sees this notification appear in their terminal even if they are in the middle of a conversation with the coordinator, because the mail agent and the coordinator are running concurrently as asyncio tasks.


CHAPTER FIVE: INCREMENT FOUR - THE CALENDAR AGENT


The calendar agent follows the same pattern as the mail agent but monitors calendar events rather than email. It supports two calendar backends: a local .ics file (parsed with the icalendar library) and Google Calendar (via the Google Calendar API). The user configures which backend to use and how far in advance to receive notifications.


Let us add the calendar configuration to config.yaml:


# config.yaml (additions for the calendar agent)


calendar_agent:

  # How often to check for upcoming events, in seconds.

  poll_interval_seconds: 300


  # How many minutes before an event to send a notification.

  notify_minutes_before: 30


  # Backend: "local" for a local .ics file, "google" for Google Calendar.

  backend: "local"


  # For the "local" backend: path to the .ics file.

  local_ics_path: "/path/to/your/calendar.ics"


  # For the "google" backend: path to the credentials file.

  google_credentials_path: "credentials.json"

  google_token_path: "token.json"

  google_calendar_id: "primary"
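Two knobs interact here: every poll_interval_seconds the agent recomputes a notification window of [now, now + notify_minutes_before] and flags events whose start time falls inside it. The window arithmetic, sketched standalone with the config values above:

```python
import datetime

notify_minutes_before = 30  # matches the config value above

now = datetime.datetime.now(datetime.timezone.utc)
window_end = now + datetime.timedelta(minutes=notify_minutes_before)

# An event starting in 20 minutes is inside the window; one starting
# in 2 hours is not (it will be caught by a later poll).
soon = now + datetime.timedelta(minutes=20)
later = now + datetime.timedelta(hours=2)

print(now <= soon <= window_end)    # True
print(now <= later <= window_end)   # False
```

Because the poll interval (300 seconds) is much shorter than the window (30 minutes), each event falls inside the window on several consecutive polls; the notified_event_ids set in the agent is what prevents duplicate notifications.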


Now let us build the calendar agent. It is structured to support multiple backends through a simple strategy pattern:


# agents/calendar_agent.py

#

# The Calendar Agent for Octopussy Light.

#

# Monitors a calendar (local .ics file or Google Calendar) and notifies

# the coordinator about upcoming events within a configurable time window.


import asyncio

import datetime

from typing import List, Optional, Set

from concurrent.futures import ThreadPoolExecutor

import os


from agents.base_agent import BaseAgent

from messaging.message_bus import MessageBus

from messaging.message_types import Message, MessageType



class CalendarEvent:

    """A simple data class representing a calendar event."""


    def __init__(

        self,

        event_id: str,

        title: str,

        start_time: datetime.datetime,

        end_time: Optional[datetime.datetime],

        location: str = "",

        description: str = ""

    ):

        self.event_id = event_id

        self.title = title

        self.start_time = start_time

        self.end_time = end_time

        self.location = location

        self.description = description



class CalendarAgent(BaseAgent):

    """

    Monitors a calendar and notifies the coordinator of upcoming events.


    Supports local .ics files and Google Calendar as backends.

    Events are notified a configurable number of minutes in advance.

    """


    def __init__(self, config: dict, bus: MessageBus):

        super().__init__(

            name="calendar_agent",

            bus=bus,

            config=config,

            color_key="calendar"

        )


        cal_config = config["calendar_agent"]

        self.poll_interval = cal_config["poll_interval_seconds"]

        self.notify_minutes_before = cal_config["notify_minutes_before"]

        self.backend = cal_config.get("backend", "local")


        # Track which events we have already notified about,

        # so we do not send duplicate notifications.

        # Note: this set grows unboundedly for long-running processes.

        # A production implementation would prune entries for past events.

        self.notified_event_ids: Set[str] = set()


        if self.backend == "local":

            self.ics_path = cal_config.get("local_ics_path", "")

        elif self.backend == "google":

            self.credentials_path = cal_config.get(

                "google_credentials_path", "credentials.json"

            )

            self.token_path = cal_config.get(

                "google_token_path", "token.json"

            )

            self.calendar_id = cal_config.get(

                "google_calendar_id", "primary"

            )


        self.logger.info(

            f"Calendar agent initialized. Backend: {self.backend}, "

            f"Notifying {self.notify_minutes_before}min before events."

        )


    async def handle_message(self, message: Message):

        """Calendar agent does not currently handle incoming messages."""

        pass


    async def run_background_tasks(self):

        """Poll the calendar at the configured interval."""

        self.logger.info("Calendar agent background polling started.")


        while self.running:

            try:

                await self._check_calendar()

            except Exception as e:

                self.logger.error(f"Error during calendar check: {e}")


            for _ in range(self.poll_interval):

                if not self.running:

                    break

                await asyncio.sleep(1)


    async def _check_calendar(self):

        """

        Fetch upcoming events and notify the coordinator for any

        that fall within the notification window.

        """

        now = datetime.datetime.now(datetime.timezone.utc)

        notify_delta = datetime.timedelta(minutes=self.notify_minutes_before)

        window_end = now + notify_delta


        if self.backend == "local":

            events = await self._fetch_local_events(now, window_end)

        elif self.backend == "google":

            events = await self._fetch_google_events(now, window_end)

        else:

            self.logger.error(f"Unknown calendar backend: {self.backend}")

            return


        for event in events:

            if event.event_id not in self.notified_event_ids:

                await self._notify_coordinator(event)

                self.notified_event_ids.add(event.event_id)


    async def _fetch_local_events(

        self,

        start: datetime.datetime,

        end: datetime.datetime

    ) -> List[CalendarEvent]:

        """

        Read events from a local .ics file that fall within [start, end].


        Uses the icalendar library, which is synchronous, so we run it

        in a thread pool to avoid blocking the asyncio event loop.


        Note: Naive datetimes in the .ics file (those without timezone

        information) are assumed to be UTC. If your calendar stores events

        in a local timezone, you should convert them appropriately before

        comparison. Consider using the 'pytz' library for timezone-aware

        datetime handling in production deployments.

        """

        loop = asyncio.get_running_loop()


        def _read_ics():

            """Synchronous function to read and parse the .ics file."""

            try:

                from icalendar import Calendar


                if not os.path.exists(self.ics_path):

                    self.logger.warning(

                        f"ICS file not found: {self.ics_path}"

                    )

                    return []


                with open(self.ics_path, "rb") as f:

                    cal = Calendar.from_ical(f.read())


                events = []

                for component in cal.walk():

                    if component.name != "VEVENT":

                        continue


                    dtstart = component.get("DTSTART")

                    if dtstart is None:

                        continue


                    event_start = dtstart.dt

                    # Normalize to UTC datetime for comparison.

                    if isinstance(event_start, datetime.date) and not isinstance(

                        event_start, datetime.datetime

                    ):

                        event_start = datetime.datetime.combine(

                            event_start,

                            datetime.time.min,

                            tzinfo=datetime.timezone.utc

                        )

                    elif isinstance(event_start, datetime.datetime):

                        if event_start.tzinfo is None:

                            # Naive datetime: assume UTC.

                            event_start = event_start.replace(

                                tzinfo=datetime.timezone.utc

                            )


                    if not (start <= event_start <= end):

                        continue


                    dtend = component.get("DTEND")

                    event_end = dtend.dt if dtend else None

                    if isinstance(event_end, datetime.date) and not isinstance(

                        event_end, datetime.datetime

                    ):

                        event_end = datetime.datetime.combine(

                            event_end,

                            datetime.time.min,

                            tzinfo=datetime.timezone.utc

                        )


                    uid = str(component.get("UID", ""))

                    summary = str(component.get("SUMMARY", "Untitled"))

                    location = str(component.get("LOCATION", ""))

                    description = str(component.get("DESCRIPTION", ""))


                    events.append(CalendarEvent(

                        event_id=uid,

                        title=summary,

                        start_time=event_start,

                        end_time=event_end,

                        location=location,

                        description=description

                    ))


                return events


            except Exception as e:

                self.logger.error(f"Error reading ICS file: {e}")

                return []


        # Use the loop's default executor; spinning up a fresh
        # ThreadPoolExecutor on every poll adds needless overhead.
        return await loop.run_in_executor(None, _read_ics)


    async def _fetch_google_events(

        self,

        start: datetime.datetime,

        end: datetime.datetime

    ) -> List[CalendarEvent]:

        """

        Fetch events from Google Calendar that fall within [start, end].


        The Google API client is synchronous, so we run it in a thread

        pool executor to avoid blocking the asyncio event loop.

        """

        loop = asyncio.get_running_loop()


        def _fetch_sync():

            """Synchronous Google Calendar API call."""

            try:

                from google.auth.transport.requests import Request

                from google.oauth2.credentials import Credentials

                from google_auth_oauthlib.flow import InstalledAppFlow

                from googleapiclient.discovery import build


                SCOPES = ["https://www.googleapis.com/auth/calendar.readonly"]

                creds = None


                if os.path.exists(self.token_path):

                    creds = Credentials.from_authorized_user_file(

                        self.token_path, SCOPES

                    )


                if not creds or not creds.valid:

                    if creds and creds.expired and creds.refresh_token:

                        creds.refresh(Request())

                    else:

                        flow = InstalledAppFlow.from_client_secrets_file(

                            self.credentials_path, SCOPES

                        )

                        creds = flow.run_local_server(port=0)

                    with open(self.token_path, "w") as token:

                        token.write(creds.to_json())


                service = build("calendar", "v3", credentials=creds)


                result = service.events().list(

                    calendarId=self.calendar_id,

                    timeMin=start.isoformat(),

                    timeMax=end.isoformat(),

                    singleEvents=True,

                    orderBy="startTime"

                ).execute()


                events = []

                for item in result.get("items", []):

                    event_id = item.get("id", "")

                    title = item.get("summary", "Untitled")

                    location = item.get("location", "")

                    description = item.get("description", "")


                    start_str = item["start"].get(

                        "dateTime", item["start"].get("date")

                    )

                    end_str = item["end"].get(

                        "dateTime", item["end"].get("date")

                    ) if "end" in item else None


                    event_start = datetime.datetime.fromisoformat(

                        start_str.replace("Z", "+00:00")

                    )

                    event_end = (

                        datetime.datetime.fromisoformat(

                            end_str.replace("Z", "+00:00")

                        )

                        if end_str else None

                    )


                    events.append(CalendarEvent(

                        event_id=event_id,

                        title=title,

                        start_time=event_start,

                        end_time=event_end,

                        location=location,

                        description=description

                    ))


                return events


            except Exception as e:

                self.logger.error(f"Error fetching Google Calendar events: {e}")

                return []


        # Use the loop's default executor; spinning up a fresh
        # ThreadPoolExecutor on every poll adds needless overhead.
        return await loop.run_in_executor(None, _fetch_sync)


    async def _notify_coordinator(self, event: CalendarEvent):

        """

        Send a calendar notification to the coordinator.


        The payload contains all relevant event information as a

        JSON-serializable dictionary.

        """

        notification = Message(

            message_type=MessageType.CALENDAR_NOTIFICATION,

            sender=self.name,

            recipient="coordinator",

            payload={

                "event_id": event.event_id,

                "title": event.title,

                "start_time": event.start_time.isoformat(),

                "end_time": event.end_time.isoformat() if event.end_time else None,

                "location": event.location,

                "description": event.description

            }

        )

        await self.bus.send(notification)

        self.logger.info(

            f"Sent calendar notification to coordinator: '{event.title}'"

        )
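
For reference, here is a minimal .ics file that the local backend can parse. The UID, times, and text are invented for illustration. Note the trailing Z on DTSTART and DTEND: it marks the timestamps as UTC, so the normalization code above does not have to guess a timezone:

```
BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//Example//Octopussy Light//EN
BEGIN:VEVENT
UID:demo-event-1@example.com
DTSTAMP:20251118T120000Z
DTSTART:20251118T140000Z
DTEND:20251118T150000Z
SUMMARY:Q4 Budget Review Meeting
LOCATION:Conference Room B / Zoom
DESCRIPTION:Quarterly budget review with the finance team.
END:VEVENT
END:VCALENDAR
```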


And update main.py to include the calendar agent:


# main.py (Increment 4)


import asyncio

import yaml

import sys


from messaging.message_bus import MessageBus

from coordinator.coordinator_agent import CoordinatorAgent

from agents.mail_agent import MailAgent

from agents.calendar_agent import CalendarAgent

from utils.logger import get_logger


logger = get_logger("system", "system")



def load_config(path: str = "config.yaml") -> dict:

    """Load and parse the YAML configuration file."""

    try:

        with open(path, "r") as f:

            return yaml.safe_load(f)

    except FileNotFoundError:

        logger.error(f"Configuration file not found: {path}")

        sys.exit(1)



async def main():

    """Start Octopussy Light with coordinator, mail, and calendar agents."""

    logger.info("Starting Octopussy Light...")


    config = load_config()

    bus = MessageBus()


    coordinator = CoordinatorAgent(config, bus)

    mail_agent = MailAgent(config, bus)

    calendar_agent = CalendarAgent(config, bus)


    all_agents = [coordinator, mail_agent, calendar_agent]


    try:

        await asyncio.gather(

            *[agent.start() for agent in all_agents]

        )

    finally:

        for agent in all_agents:

            await agent.stop()



if __name__ == "__main__":

    try:

        asyncio.run(main())

    except KeyboardInterrupt:

        print("\nShutting down...")


When a calendar event is within the notification window, the user will see output like this in their terminal:


[CALENDAR] Event: Q4 Budget Review Meeting

           When: 2025-11-18T14:00:00+00:00

           Location: Conference Room B / Zoom

           Description: Quarterly budget review with the finance team.

           Please bring updated projections.


This notification appears regardless of what the user is currently doing in the terminal, because the calendar agent and the coordinator are running as concurrent asyncio tasks.
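
The mechanics behind this can be seen in a minimal, self-contained sketch (the coroutine names are invented for illustration): two coroutines scheduled with asyncio.gather hand control back to the event loop at every await, which is exactly how the calendar poller and the coordinator's interactive loop share a single thread.

```python
import asyncio

async def poller(log: list):
    # Stands in for the calendar agent's background polling loop.
    for i in range(3):
        log.append(f"poll {i}")
        await asyncio.sleep(0.01)

async def repl(log: list):
    # Stands in for the coordinator's interactive loop.
    for i in range(3):
        log.append(f"input {i}")
        await asyncio.sleep(0.01)

async def demo():
    log = []
    # gather() runs both coroutines concurrently on one event loop;
    # each await asyncio.sleep() yields control to the other coroutine.
    await asyncio.gather(poller(log), repl(log))
    return log

log = asyncio.run(demo())
print(log)  # entries from both coroutines, interleaved
```

Neither coroutine blocks the other; this is why a notification can surface while the user is mid-conversation.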


CHAPTER SIX: INCREMENT FIVE - WORKER AGENTS AND TASK MANAGEMENT


Worker agents are the most powerful and flexible part of Octopussy Light. They are created on demand when the user wants to execute a task, and idle workers are reused where possible. Each task gets a unique task ID, and every worker maintains its own short-term and long-term memory and can send multiple responses back to the coordinator for the same task.


The coordinator is responsible for creating workers, assigning tasks to them, reusing idle workers when appropriate, and cleaning up excess idle workers. Before assigning a task, the coordinator runs the task description through a prompt optimizer to improve the quality of the instructions the worker receives.


In this increment, workers reason about tasks using only their internal LLM knowledge. In the next increment, we will equip them with the predefined MCP tools that Octopussy Light ships with out of the box.
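
A small aside before we start: the unique task and worker IDs used throughout this increment are simply truncated UUIDs. The sketch below shows the idea; truncation trades a tiny (but nonzero) collision probability for readable log output.

```python
import uuid

# Octopussy Light derives short IDs by truncating a UUID4. The first
# characters of str(uuid.uuid4()) are lowercase hex digits, so an
# 8-character task ID and a 6-character worker ID stay log-friendly.
task_id = str(uuid.uuid4())[:8]
worker_id = str(uuid.uuid4())[:6]
print(task_id, worker_id)
```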


Let us start with the prompt optimizer, since it is a prerequisite for the worker system:


# coordinator/prompt_optimizer.py

#

# The Prompt Optimizer for Octopussy Light.

#

# Before a task is sent to a worker agent, the coordinator uses this

# optimizer to improve the task description. A well-crafted prompt

# leads to better, more focused results from the worker.


from llm.llm_client import LLMClient

from utils.logger import get_logger


logger = get_logger("prompt_optimizer", "coordinator")


OPTIMIZER_SYSTEM_PROMPT = """

You are a prompt engineering expert. Your job is to take a user's task

description and rewrite it as a precise, unambiguous instruction for an

AI agent. The optimized prompt should:


1. State the goal clearly and specifically.

2. Specify the expected output format (e.g., JSON, plain text, code).

3. Include any relevant constraints or requirements.

4. Remove ambiguity and vagueness.

5. Be concise but complete.


Return ONLY the optimized prompt text, with no preamble or explanation.

"""



class PromptOptimizer:

    """

    Uses an LLM to optimize task descriptions before they are sent

    to worker agents. Better prompts lead to better results.

    """


    def __init__(self, llm_config: dict):

        self.llm = LLMClient(

            base_url=llm_config["base_url"],

            api_key=llm_config["api_key"],

            model=llm_config["coordinator_model"],

            system_prompt=OPTIMIZER_SYSTEM_PROMPT

        )

        logger.info("Prompt optimizer initialized.")


    async def optimize(self, task_description: str) -> str:

        """

        Optimize a task description for delivery to a worker agent.


        Args:

            task_description: The raw task description from the user.


        Returns:

            An optimized version of the task description.

        """

        logger.debug(f"Optimizing prompt: {task_description[:60]}...")


        try:

            optimized = await self.llm.chat([

                {

                    "role": "user",

                    "content": (

                        f"Optimize this task description for an AI agent:\n\n"

                        f"{task_description}"

                    )

                }

            ])

            logger.debug(f"Optimized prompt: {optimized[:60]}...")

            return optimized.strip()

        except Exception as e:

            logger.error(f"Prompt optimization failed: {e}. Using original.")

            return task_description


Now let us build the worker agent. In this increment the worker has its own short-term memory (conversation history) and long-term memory (a vector store using ChromaDB), but it does not yet call external tools — that comes in the next increment:


# agents/worker_agent.py

#

# The Worker Agent for Octopussy Light - Increment 5

#

# Worker agents are created on demand to execute specific tasks.

# Each worker has short-term memory (conversation history for the

# current task) and long-term memory (ChromaDB vector store). Workers

# can send multiple responses for a single task and include a unique

# task ID in every response so the coordinator can associate them correctly.

#

# In this increment workers rely on the LLM's internal knowledge only.

# Tool calling via MCP is added in Increment 6.


import asyncio

import uuid

from typing import List, Dict, Optional


import chromadb


from agents.base_agent import BaseAgent

from messaging.message_bus import MessageBus

from messaging.message_types import Message, MessageType

from llm.llm_client import LLMClient

from utils.logger import get_logger


WORKER_SYSTEM_PROMPT = """

You are a focused, capable worker agent in the Octopussy Light system.

You receive tasks from the coordinator and execute them to the best of

your ability. You are thorough, accurate, and efficient. When you have

completed a task, you provide a clear, well-structured response. If you

cannot complete a task, you explain why clearly and specifically.

"""



class WorkerAgent(BaseAgent):

    """

    A worker agent that executes tasks assigned by the coordinator.


    Each worker maintains short-term memory (conversation history for

    the current task) and long-term memory (a persistent vector store

    for facts and context that may be useful across tasks).


    Workers are identified by a unique worker_id and can be reused

    for multiple tasks sequentially. When a new task is assigned,

    the worker's short-term memory is cleared.

    """


    def __init__(

        self,

        worker_id: str,

        config: dict,

        bus: MessageBus

    ):

        """

        Initialize a worker agent.


        Args:

            worker_id: A unique identifier for this worker instance.

            config:    The application configuration dictionary.

            bus:       The shared MessageBus instance.

        """

        name = f"worker_{worker_id}"

        super().__init__(

            name=name,

            bus=bus,

            config=config,

            color_key="worker"

        )


        self.worker_id = worker_id

        llm_config = config["llm"]


        self.llm = LLMClient(

            base_url=llm_config["base_url"],

            api_key=llm_config["api_key"],

            model=llm_config["worker_model"],

            system_prompt=WORKER_SYSTEM_PROMPT

        )


        # Short-term memory: cleared when a new task is assigned.

        self.short_term_memory: List[Dict[str, str]] = []


        # Long-term memory: a ChromaDB collection persisted to disk.

        self._init_long_term_memory()


        # The current task ID, set when a task is assigned.

        self.current_task_id: Optional[str] = None


        # Whether this worker is currently executing a task.

        self.is_busy: bool = False


        self.logger.info(f"Worker '{name}' initialized.")


    def _init_long_term_memory(self):

        """

        Initialize the ChromaDB vector store for long-term memory.


        Each worker gets its own collection in a shared ChromaDB

        instance. The collection persists to disk across restarts.

        """

        try:

            self.chroma_client = chromadb.PersistentClient(

                path=f"./memory/worker_{self.worker_id}"

            )

            self.memory_collection = self.chroma_client.get_or_create_collection(

                name=f"worker_{self.worker_id}_memory"

            )

            self.logger.info(

                f"Long-term memory initialized for worker {self.worker_id}."

            )

        except Exception as e:

            self.logger.error(f"Failed to initialize long-term memory: {e}")

            self.memory_collection = None


    def clear_short_term_memory(self):

        """

        Clear the worker's short-term memory.


        Called by the coordinator before assigning a new task to

        ensure the worker starts with a clean context.

        """

        self.short_term_memory = []

        self.logger.debug("Short-term memory cleared.")


    async def store_in_long_term_memory(

        self, content: str, metadata: Optional[dict] = None

    ):

        """

        Store a piece of information in long-term memory.


        Args:

            content:  The text to store.

            metadata: Optional metadata to associate with this memory.

        """

        if self.memory_collection is None:

            return

        try:

            self.memory_collection.add(

                documents=[content],

                ids=[str(uuid.uuid4())],

                metadatas=[metadata or {}]

            )

            self.logger.debug(f"Stored in long-term memory: {content[:60]}...")

        except Exception as e:

            self.logger.error(f"Failed to store in long-term memory: {e}")


    async def retrieve_from_long_term_memory(

        self,

        query: str,

        n_results: int = 3

    ) -> List[str]:

        """

        Retrieve relevant memories from long-term storage using

        semantic similarity search.


        Args:

            query:     The query text to search for.

            n_results: The maximum number of results to return.


        Returns:

            A list of relevant memory strings.

        """

        if self.memory_collection is None:

            return []

        try:

            count = self.memory_collection.count()

            if count == 0:

                return []

            results = self.memory_collection.query(

                query_texts=[query],

                n_results=min(n_results, count)

            )

            return results.get("documents", [[]])[0]

        except Exception as e:

            self.logger.error(f"Failed to retrieve from long-term memory: {e}")

            return []


    async def handle_message(self, message: Message):

        """Handle an incoming message from the coordinator."""

        if message.message_type == MessageType.ASSIGN_TASK:

            await self._handle_task_assignment(message)

        elif message.message_type == MessageType.SHUTDOWN_AGENT:

            await self.stop()

        else:

            self.logger.warning(

                f"Worker received unhandled message type: "

                f"{message.message_type}"

            )


    async def _handle_task_assignment(self, message: Message):

        """

        Process a task assignment from the coordinator.


        Clears short-term memory, retrieves relevant long-term memories,

        executes the task, and sends the result back to the coordinator.

        """

        payload = message.payload

        task_id = payload.get("task_id", str(uuid.uuid4()))

        task_description = payload.get("task_description", "")


        self.current_task_id = task_id

        self.is_busy = True


        self.logger.info(

            f"Received task {task_id}: {task_description[:60]}..."

        )


        self.clear_short_term_memory()


        # Retrieve relevant long-term memories for context.

        relevant_memories = await self.retrieve_from_long_term_memory(

            task_description

        )

        if relevant_memories:

            memory_context = (

                "Relevant information from previous tasks:\n"

                + "\n".join(f"- {m}" for m in relevant_memories)

            )

            self.short_term_memory.append({

                "role": "system",

                "content": memory_context

            })


        self.short_term_memory.append({

            "role": "user",

            "content": task_description

        })


        try:

            result = await self.llm.chat(self.short_term_memory)


            self.short_term_memory.append({

                "role": "assistant",

                "content": result

            })


            await self.store_in_long_term_memory(

                f"Task: {task_description}\nResult: {result}",

                metadata={"task_id": task_id}

            )


            await self._send_result(task_id, result, response_number=1)


        except Exception as e:

            error_msg = f"Task execution failed: {str(e)}"

            self.logger.error(error_msg)

            await self._send_error(task_id, error_msg)

        finally:

            self.is_busy = False


    async def _send_result(

        self,

        task_id: str,

        result: str,

        response_number: int = 1

    ):

        """Send a task result back to the coordinator."""

        response = Message(

            message_type=MessageType.TASK_RESULT,

            sender=self.name,

            recipient="coordinator",

            task_id=task_id,

            payload={

                "task_id": task_id,

                "worker_id": self.worker_id,

                "result": result,

                "response_number": response_number

            }

        )

        await self.bus.send(response)

        self.logger.info(

            f"Sent result for task {task_id} (response #{response_number})."

        )


    async def _send_error(self, task_id: str, error_message: str):

        """Send an error message back to the coordinator."""

        error = Message(

            message_type=MessageType.TASK_ERROR,

            sender=self.name,

            recipient="coordinator",

            task_id=task_id,

            payload={

                "task_id": task_id,

                "worker_id": self.worker_id,

                "error": error_message

            }

        )

        await self.bus.send(error)

        self.logger.error(

            f"Sent error for task {task_id}: {error_message}"

        )


Now we need to update the coordinator to manage workers. The coordinator needs to detect when the user wants to create a task, spawn or reuse a worker, optimize the task prompt, and assign the task. It also needs to manage the pool of idle workers. Let us build the WorkerManager:


# coordinator/worker_manager.py

#

# The Worker Manager for Octopussy Light - Increment 5

#

# Manages the lifecycle of worker agents: creating new workers,

# reusing idle workers, assigning tasks, and cleaning up excess

# idle workers. This keeps the coordinator's code clean and focused.


import asyncio

import uuid

from typing import Dict, List


from messaging.message_bus import MessageBus

from messaging.message_types import Message, MessageType

from coordinator.prompt_optimizer import PromptOptimizer

from utils.logger import get_logger


logger = get_logger("worker_manager", "coordinator")



class WorkerManager:

    """

    Manages the pool of worker agents in Octopussy Light.


    Responsibilities:

    - Creating new worker agents when needed.

    - Reusing idle workers to avoid unnecessary resource usage.

    - Assigning tasks with unique task IDs.

    - Cleaning up excess idle workers.

    """


    # Maximum number of idle workers to keep alive. If there are

    # more idle workers than this, the excess will be shut down.

    MAX_IDLE_WORKERS = 3


    def __init__(self, config: dict, bus: MessageBus):

        self.config = config

        self.bus = bus

        self.prompt_optimizer = PromptOptimizer(config["llm"])


        # Maps worker names to WorkerAgent instances.

        self._workers: Dict[str, object] = {}


        logger.info("Worker manager initialized.")


    def _get_idle_workers(self) -> List[object]:

        """Return a list of all workers that are not currently busy."""

        return [w for w in self._workers.values() if not w.is_busy]


    async def assign_task(self, task_description: str) -> str:

        """

        Assign a task to a worker agent.


        If an idle worker is available, it will be reused. Otherwise,

        a new worker is created. The task description is optimized

        before being sent to the worker.


        Args:

            task_description: The raw task description from the user.


        Returns:

            The unique task ID assigned to this task.

        """

        # Import here to avoid circular imports at module load time.

        from agents.worker_agent import WorkerAgent


        task_id = str(uuid.uuid4())[:8]


        optimized_description = await self.prompt_optimizer.optimize(

            task_description

        )

        logger.info(

            f"Task {task_id} optimized. "

            f"Original: '{task_description[:40]}...', "

            f"Optimized: '{optimized_description[:40]}...'"

        )


        idle_workers = self._get_idle_workers()

        if idle_workers:

            worker = idle_workers[0]

            logger.info(

                f"Reusing idle worker '{worker.name}' for task {task_id}."

            )

        else:

            worker_id = str(uuid.uuid4())[:6]

            worker = WorkerAgent(worker_id, self.config, self.bus)

            self._workers[worker.name] = worker

            # Note: dynamically created worker tasks are not tracked for

            # cancellation on shutdown. A production implementation would

            # store the task handles and cancel them during teardown.

            asyncio.create_task(worker.start())

            logger.info(

                f"Created new worker '{worker.name}' for task {task_id}."

            )

        # Mark the worker busy immediately. The worker only sets is_busy
        # itself when it processes the message, so without this eager flag
        # a concurrent assign_task call (or the idle-worker cleanup below)
        # could grab or shut down a worker that already has a task pending.
        worker.is_busy = True


        task_message = Message(

            message_type=MessageType.ASSIGN_TASK,

            sender="coordinator",

            recipient=worker.name,

            task_id=task_id,

            payload={

                "task_id": task_id,

                "task_description": optimized_description

            }

        )

        await self.bus.send(task_message)

        await self._cleanup_idle_workers()


        return task_id


    async def _cleanup_idle_workers(self):

        """

        Shut down excess idle workers to free resources.


        If there are more idle workers than MAX_IDLE_WORKERS, the

        excess workers are sent a shutdown message and removed from

        the pool. We iterate over a snapshot of the idle list to

        avoid mutating self._workers while reading it.

        """

        idle_workers = self._get_idle_workers()

        excess_count = len(idle_workers) - self.MAX_IDLE_WORKERS


        if excess_count <= 0:

            return


        logger.info(f"Cleaning up {excess_count} excess idle worker(s).")


        for worker in idle_workers[:excess_count]:

            shutdown_msg = Message(

                message_type=MessageType.SHUTDOWN_AGENT,

                sender="coordinator",

                recipient=worker.name

            )

            await self.bus.send(shutdown_msg)

            self.bus.unregister(worker.name)

            del self._workers[worker.name]

            logger.info(f"Shut down idle worker '{worker.name}'.")


Now we need to update the coordinator to use the WorkerManager and to detect when the user is requesting a task. We will use the LLM to determine whether the user's input is a task request or a conversational message:
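
The core of that check, in isolation, looks something like the sketch below (the helper name classify_reply is invented for illustration): if the LLM reply parses as the JSON command described in the system prompt, a task is created; otherwise the reply is treated as ordinary conversation.

```python
import json

def classify_reply(reply: str):
    # Hypothetical helper: decide whether an LLM reply is a task command.
    # Matches the {"action": "create_task", ...} format requested in the
    # coordinator's system prompt; anything else is plain conversation.
    try:
        data = json.loads(reply.strip())
        if isinstance(data, dict) and data.get("action") == "create_task":
            return ("create_task", data.get("task_description", ""))
    except (json.JSONDecodeError, TypeError):
        pass
    return ("chat", reply)

print(classify_reply(
    '{"action": "create_task", "task_description": "Summarize the Q4 report"}'
))
print(classify_reply("Hello! How can I help you today?"))
```

With that dispatch pattern in mind, here is the updated coordinator: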


# coordinator/coordinator_agent.py (Increment 5 - with worker management)


import asyncio

import sys

import json

from typing import List, Dict


from agents.base_agent import BaseAgent

from messaging.message_bus import MessageBus

from messaging.message_types import Message, MessageType

from coordinator.worker_manager import WorkerManager

from llm.llm_client import LLMClient

from utils.logger import get_logger


logger = get_logger("coordinator", "coordinator")


COORDINATOR_SYSTEM_PROMPT = """

You are the coordinator of an agentic AI system called Octopussy Light.

Your job is to understand the user's requests, coordinate specialized

agents to fulfill those requests, and present results clearly.


When the user asks you to perform a task (something that requires research,

analysis, writing, coding, or other substantive work), respond with a JSON

object in this exact format:

{"action": "create_task", "task_description": "<the task>"}


For conversational messages, questions about the system, or simple requests

that you can answer directly, respond normally in plain text.


You are friendly, precise, and efficient.

"""



class CoordinatorAgent(BaseAgent):

    """

    The central coordinator of Octopussy Light.

    Handles user communication and coordinates all other agents.

    """


    def __init__(self, config: dict, bus: MessageBus):

        super().__init__(

            name="coordinator",

            bus=bus,

            config=config,

            color_key="coordinator"

        )


        llm_config = config["llm"]


        self.llm = LLMClient(

            base_url=llm_config["base_url"],

            api_key=llm_config["api_key"],

            model=llm_config["coordinator_model"],

            system_prompt=COORDINATOR_SYSTEM_PROMPT

        )


        self.conversation_history: List[Dict[str, str]] = []

        self.worker_manager = WorkerManager(config, bus)


        self.logger.info(

            f"Coordinator initialized with model: "

            f"{llm_config['coordinator_model']}"

        )


    async def handle_message(self, message: Message):

        """Handle messages arriving from other agents."""

        if message.message_type == MessageType.MAIL_NOTIFICATION:

            await self._display_mail_notification(message.payload)

        elif message.message_type == MessageType.CALENDAR_NOTIFICATION:

            await self._display_calendar_notification(message.payload)

        elif message.message_type == MessageType.TASK_RESULT:

            await self._display_task_result(message.payload)

        elif message.message_type == MessageType.TASK_ERROR:

            await self._display_task_error(message.payload)


    async def _display_mail_notification(self, payload: dict):

        print(

            f"\n[MAIL] From: {payload.get('sender', 'Unknown')}\n"

            f"       Subject: {payload.get('subject', 'No subject')}\n"

            f"       Summary: {payload.get('summary', '')}\n"

        )


    async def _display_calendar_notification(self, payload: dict):

        print(

            f"\n[CALENDAR] Event: {payload.get('title', 'Untitled')}\n"

            f"           When: {payload.get('start_time', 'Unknown')}\n"

            f"           Location: {payload.get('location', 'Not specified')}\n"

        )


    async def _display_task_result(self, payload: dict):

        task_id = payload.get("task_id", "unknown")

        response_num = payload.get("response_number", 1)

        result = payload.get("result", "")

        print(

            f"\n[TASK RESULT] Task {task_id} "

            f"(response #{response_num}):\n{result}\n"

        )


    async def _display_task_error(self, payload: dict):

        task_id = payload.get("task_id", "unknown")

        error = payload.get("error", "Unknown error")

        print(f"\n[TASK ERROR] Task {task_id} failed: {error}\n")


    async def chat(self, user_input: str) -> str:

        """

        Process user input. If the LLM indicates a task should be created,

        delegate to the worker manager. Otherwise, respond conversationally.

        """

        self.conversation_history.append({

            "role": "user",

            "content": user_input

        })


        response = await self.llm.chat(self.conversation_history)


        try:

            parsed = json.loads(response.strip())

            if parsed.get("action") == "create_task":

                task_desc = parsed.get("task_description", user_input)

                task_id = await self.worker_manager.assign_task(task_desc)

                reply = (

                    f"I have assigned your task (ID: {task_id}) to a worker. "

                    f"You will receive the result shortly."

                )

                self.conversation_history.append({

                    "role": "assistant",

                    "content": reply

                })

                return reply

        except (json.JSONDecodeError, AttributeError):

            pass


        self.conversation_history.append({

            "role": "assistant",

            "content": response

        })

        return response


    async def run_background_tasks(self):

        """Read user input asynchronously."""

        loop = asyncio.get_running_loop()

        print("\nOctopussy Light is ready. Type 'quit' or 'exit' to stop.\n")


        while self.running:

            try:

                print("You: ", end="", flush=True)

                user_input = await loop.run_in_executor(

                    None, sys.stdin.readline

                )

                # readline() returns an empty string only at EOF

                # (a blank line arrives as "\n"), so stop cleanly

                # instead of spinning forever once stdin is closed.

                if not user_input:

                    self.running = False

                    print("Goodbye!")

                    break


                user_input = user_input.strip()


                if not user_input:

                    continue


                if user_input.lower() in ("quit", "exit"):

                    self.running = False

                    print("Goodbye!")

                    break


                response = await self.chat(user_input)

                print(f"\nCoordinator: {response}\n")


            except Exception as e:

                self.logger.error(f"Error in input loop: {e}")


Here is an example interaction with the system at this point:

You: Write me a Python function that checks if a number is prime.


Coordinator: I have assigned your task (ID: a3f7b2c1) to a worker.

You will receive the result shortly.


[TASK RESULT] Task a3f7b2c1 (response #1):

Here is a Python function that checks if a number is prime:


def is_prime(n: int) -> bool:

    if n < 2:

        return False

    if n == 2:

        return True

    if n % 2 == 0:

        return False

    for i in range(3, int(n**0.5) + 1, 2):

        if n % i == 0:

            return False

    return True


You: Hello, how are you?


Coordinator: I am doing well, thank you for asking! I am ready to help

you with tasks, email monitoring, calendar events, and more. What would

you like to do?


The coordinator correctly distinguishes between a task request (which it delegates to a worker) and a conversational message (which it handles directly). The task ID allows the user to associate the result with the original request even if other notifications arrive in between. Workers in this increment rely entirely on the LLM's internal knowledge. In the next increment we will wire in the MCP built-in tools server and give workers real-world capabilities.
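
One refinement worth knowing about: many models wrap JSON answers in Markdown code fences even when asked for raw JSON, which would make the plain json.loads call in chat() fall through to the conversational branch. Here is a small helper sketch that strips such fences before parsing (the helper name is illustrative and not part of the repository code):

```python
import json
from typing import Optional

FENCE = chr(96) * 3  # the three-backtick Markdown fence marker


def extract_task_action(response: str) -> Optional[str]:
    """Return the task description if the LLM response is a
    create_task JSON object, else None.

    Tolerates responses wrapped in Markdown code fences, which
    some models emit even when asked for raw JSON.
    """
    text = response.strip()
    if text.startswith(FENCE):
        lines = text.splitlines()
        if lines and lines[0].startswith(FENCE):
            lines = lines[1:]          # drop opening fence line
        if lines and lines[-1].strip() == FENCE:
            lines = lines[:-1]         # drop closing fence line
        text = "\n".join(lines).strip()
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return None
    if isinstance(parsed, dict) and parsed.get("action") == "create_task":
        return parsed.get("task_description")
    return None


print(extract_task_action('{"action": "create_task", "task_description": "x"}'))  # x
print(extract_task_action("Hello there!"))  # None
```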


CHAPTER SEVEN: INCREMENT SIX - PREDEFINED MCP TOOLS FOR WORKERS


Workers can now reason and respond, but they are blind to the outside world. They cannot read files, execute code, or call any external service. This increment changes that by introducing the Model Context Protocol infrastructure and wiring Octopussy Light's predefined built-in tools directly to every worker agent.


The approach is deliberately incremental. We first build the MCP tools server with a curated set of built-in tools, then build the client-side infrastructure (MCPToolClient and MCPServerRegistry), then update the WorkerAgent to implement the agentic ReAct loop (Reason, Act, Observe), and finally update the WorkerManager and coordinator to pass the registry down to workers. The tool agent — which can create entirely new tools dynamically — comes in the next increment and builds naturally on top of this foundation.
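
Before reading the real implementation, it helps to see the ReAct loop in its smallest possible form. The sketch below uses a scripted stand-in for the LLM and a single hand-written tool (all names here are illustrative, not repository code), but the shape is exactly the one described above: ask the model, execute any tool call it emits, feed the observation back, and stop when the reply is plain text:

```python
import json


def scripted_llm(history):
    """Stand-in for an LLM: request a tool on the first turn,
    then give a final answer once an observation has arrived."""
    if not any(m["role"] == "tool" for m in history):
        return json.dumps(
            {"tool_call": {"name": "add", "arguments": {"a": 2, "b": 3}}}
        )
    return "The sum is 5."


TOOLS = {"add": lambda a, b: str(a + b)}


def react_loop(task, max_steps=5):
    history = [{"role": "user", "content": task}]
    for _ in range(max_steps):
        reply = scripted_llm(history)
        try:
            call = json.loads(reply)["tool_call"]   # Reason -> Act
        except (json.JSONDecodeError, KeyError, TypeError):
            return reply                            # plain text: done
        observation = TOOLS[call["name"]](**call["arguments"])
        history.append({"role": "tool", "content": observation})  # Observe
    return "Step limit reached without a final answer."


print(react_loop("What is 2 + 3?"))  # The sum is 5.
```

The step limit matters: without it, a model that keeps emitting tool calls would loop forever, which is why any production loop should also bound its iterations.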


Let us start by adding the MCP server configuration to config.yaml:


# config.yaml (additions for MCP servers)


mcp_servers:

  # The built-in tools server ships with Octopussy Light and is

  # always present. It provides file I/O and code execution tools.

  - name: "builtin"

    type: "local"

    command: ["python", "mcp_server/builtin_tools_server.py"]


  # To add a remote or custom local MCP server, uncomment and edit:

  # - name: "my_custom_server"

  #   type: "local"

  #   command: ["python", "/path/to/my_mcp_server.py"]

  #

  # - name: "company_data"

  #   type: "http"

  #   url: "https://mcp.yourcompany.com/tools"

  #   api_key: "your_api_key_here"


Now let us build the MCP tools server. We use the FastMCP class from the MCP Python SDK to expose four predefined tools: reading files, writing files, listing directories, and executing Python code in a subprocess sandbox:


# mcp_server/builtin_tools_server.py

#

# The built-in MCP tools server for Octopussy Light.

#

# This server exposes the system's predefined built-in tools to all

# agents via the Model Context Protocol. It runs as a separate process

# and communicates with agents via STDIO transport.

#

# Built-in tools:

#   - read_file:       Read the contents of a local file.

#   - write_file:      Write content to a local file.

#   - list_directory:  List the contents of a directory.

#   - execute_python:  Execute a Python code snippet in a sandbox.


import os

import subprocess

import tempfile

from pathlib import Path


from mcp.server.fastmcp import FastMCP


# Initialize the FastMCP server with a descriptive name.

mcp = FastMCP("OctopussyLight-BuiltinTools")



@mcp.tool()

def read_file(file_path: str) -> str:

    """

    Read the contents of a local file and return them as a string.


    Args:

        file_path: The absolute or relative path to the file to read.


    Returns:

        The file contents as a string, or an error message if the

        file cannot be read.

    """

    try:

        path = Path(file_path)

        if not path.exists():

            return f"Error: File not found: {file_path}"

        if not path.is_file():

            return f"Error: Path is not a file: {file_path}"


        with open(path, "r", encoding="utf-8", errors="replace") as f:

            content = f.read()


        return content


    except PermissionError:

        return f"Error: Permission denied reading file: {file_path}"

    except Exception as e:

        return f"Error reading file: {str(e)}"



@mcp.tool()

def write_file(file_path: str, content: str) -> str:

    """

    Write content to a local file, creating it if it does not exist.


    Args:

        file_path: The path to the file to write.

        content:   The content to write to the file.


    Returns:

        A success message or an error message.

    """

    try:

        path = Path(file_path)

        path.parent.mkdir(parents=True, exist_ok=True)


        with open(path, "w", encoding="utf-8") as f:

            f.write(content)


        return f"Successfully wrote {len(content)} characters to {file_path}."


    except PermissionError:

        return f"Error: Permission denied writing to: {file_path}"

    except Exception as e:

        return f"Error writing file: {str(e)}"



@mcp.tool()

def list_directory(directory_path: str) -> str:

    """

    List the contents of a directory.


    Args:

        directory_path: The path to the directory to list.


    Returns:

        A formatted string listing the directory contents, or an error.

    """

    try:

        path = Path(directory_path)

        if not path.exists():

            return f"Error: Directory not found: {directory_path}"

        if not path.is_dir():

            return f"Error: Path is not a directory: {directory_path}"


        entries = []

        for entry in sorted(path.iterdir()):

            entry_type = "DIR " if entry.is_dir() else "FILE"

            size = entry.stat().st_size if entry.is_file() else 0

            entries.append(f"{entry_type}  {entry.name}  ({size} bytes)")


        if not entries:

            return f"Directory is empty: {directory_path}"


        return f"Contents of {directory_path}:\n" + "\n".join(entries)


    except Exception as e:

        return f"Error listing directory: {str(e)}"



@mcp.tool()

def execute_python(code: str) -> str:

    """

    Execute a Python code snippet in a sandboxed subprocess.


    The code runs in a separate process with a 10-second timeout.

    Standard output and standard error are captured and returned.


    SECURITY NOTE: This tool executes arbitrary Python code. In a

    production environment, use a proper sandbox like Docker or gVisor.

    For Octopussy Light, we use subprocess isolation as a basic measure.


    Args:

        code: The Python code to execute.


    Returns:

        The combined stdout and stderr output, or a timeout/error message.

    """

    tmp_path = None

    try:

        # Write the code to a temporary file to avoid shell injection.

        with tempfile.NamedTemporaryFile(

            mode="w",

            suffix=".py",

            delete=False,

            encoding="utf-8"

        ) as tmp:

            tmp.write(code)

            tmp_path = tmp.name


        result = subprocess.run(

            ["python", tmp_path],

            capture_output=True,

            text=True,

            timeout=10,

            # Restrict environment variables for basic isolation.

            env={"PATH": os.environ.get("PATH", "")}

        )


        output = ""

        if result.stdout:

            output += f"STDOUT:\n{result.stdout}\n"

        if result.stderr:

            output += f"STDERR:\n{result.stderr}\n"

        if not output:

            output = "Code executed successfully with no output."


        return output


    except subprocess.TimeoutExpired:

        return "Error: Code execution timed out after 10 seconds."

    except Exception as e:

        return f"Error executing code: {str(e)}"

    finally:

        if tmp_path is not None:

            try:

                os.unlink(tmp_path)

            except Exception:

                pass



if __name__ == "__main__":

    # Run the MCP server using STDIO transport.

    # Agents connect to this server by launching it as a subprocess.

    mcp.run(transport="stdio")
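
The sandbox pattern behind execute_python (write the snippet to a temporary file, run it in a subprocess with a timeout, capture both streams) can be exercised entirely outside MCP. Here is a standalone sketch, with the env restriction omitted for brevity and sys.executable used so it also works on systems where a bare "python" is not on PATH:

```python
import os
import subprocess
import sys
import tempfile


def run_sandboxed(code: str, timeout: float = 10.0) -> str:
    """Run a Python snippet in a subprocess and return its output."""
    tmp_path = None
    try:
        # Writing to a file avoids any shell-quoting issues.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", delete=False, encoding="utf-8"
        ) as tmp:
            tmp.write(code)
            tmp_path = tmp.name

        result = subprocess.run(
            [sys.executable, tmp_path],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        return (result.stdout + result.stderr) or "no output"

    except subprocess.TimeoutExpired:
        return f"Error: timed out after {timeout} seconds"

    finally:
        if tmp_path is not None:
            os.unlink(tmp_path)


print(run_sandboxed("print(6 * 7)").strip())  # 42
```

Using sys.executable instead of a bare "python" is a refinement you may also want to apply to execute_python itself if you run on a system where only python3 exists.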


Now let us build the MCP client that agents use to connect to MCP servers, discover their tools, and call them:


# mcp_client/mcp_tool_client.py

#

# The MCP Tool Client for Octopussy Light agents.

#

# Each agent uses this client to connect to MCP servers and call

# their tools. Supports local (STDIO) servers. The client maintains

# a persistent connection across multiple tool calls.


from typing import Any, Dict, List, Optional


from mcp import ClientSession, StdioServerParameters

from mcp.client.stdio import stdio_client

from utils.logger import get_logger


logger = get_logger("mcp_client", "system")



class MCPToolClient:

    """

    A client for connecting to a local MCP server and calling its tools.


    The client maintains a persistent connection to the server process

    across multiple tool calls. Call connect() once at startup and

    disconnect() once at shutdown.

    """


    def __init__(self, server_command: List[str], server_name: str = "unknown"):

        """

        Initialize the MCP tool client.


        Args:

            server_command: The command to launch the MCP server process,

                            as a list of strings. For example:

                            ["python", "mcp_server/builtin_tools_server.py"]

            server_name:    A human-readable name for this server,

                            used in log messages.

        """

        self.server_command = server_command

        self.server_name = server_name

        self._session: Optional[ClientSession] = None

        self._available_tools: List[Dict] = []


        # These context manager objects are stored so that disconnect()

        # can cleanly exit them even if connect() was called without

        # a surrounding 'async with' block.

        self._stdio_context = None

        self._session_context = None


        logger.info(f"MCP client initialized for server: {server_name}")


    async def connect(self):

        """

        Connect to the MCP server and discover available tools.


        Launches the server subprocess, performs the MCP protocol

        handshake, and populates the list of available tools.

        Must be called before any call to call_tool().

        """

        try:

            server_params = StdioServerParameters(

                command=self.server_command[0],

                args=self.server_command[1:],

                env=None

            )


            # Enter the stdio_client context manager manually so we can

            # keep the connection alive across multiple tool calls.

            self._stdio_context = stdio_client(server_params)

            read, write = await self._stdio_context.__aenter__()


            # Enter the ClientSession context manager manually for the

            # same reason.

            self._session_context = ClientSession(read, write)

            self._session = await self._session_context.__aenter__()


            # Perform the MCP protocol handshake.

            await self._session.initialize()


            # Discover available tools from the server.

            tools_response = await self._session.list_tools()

            self._available_tools = [

                {

                    "name": tool.name,

                    "description": tool.description,

                    "input_schema": tool.inputSchema

                }

                for tool in tools_response.tools

            ]


            logger.info(

                f"Connected to MCP server '{self.server_name}'. "

                f"Available tools: "

                f"{[t['name'] for t in self._available_tools]}"

            )


        except Exception as e:

            logger.error(

                f"Failed to connect to MCP server '{self.server_name}': {e}"

            )

            raise


    async def disconnect(self):

        """

        Disconnect from the MCP server and clean up resources.


        Safely exits both context managers regardless of whether

        connect() completed successfully.

        """

        try:

            if self._session_context is not None:

                await self._session_context.__aexit__(None, None, None)

                self._session_context = None

            if self._stdio_context is not None:

                await self._stdio_context.__aexit__(None, None, None)

                self._stdio_context = None

            self._session = None

            logger.info(

                f"Disconnected from MCP server '{self.server_name}'."

            )

        except Exception as e:

            logger.error(

                f"Error disconnecting from MCP server "

                f"'{self.server_name}': {e}"

            )


    def get_available_tools(self) -> List[Dict]:

        """

        Return the list of tools available on this MCP server.


        Returns:

            A list of tool descriptor dicts, each with 'name',

            'description', and 'input_schema' keys.

        """

        return self._available_tools


    async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:

        """

        Call a tool on the connected MCP server.


        Args:

            tool_name:  The name of the tool to call.

            arguments:  A dictionary of arguments to pass to the tool.


        Returns:

            The tool's response as a string.


        Raises:

            RuntimeError: If connect() has not been called successfully.

        """

        if self._session is None:

            raise RuntimeError(

                f"Not connected to MCP server '{self.server_name}'. "

                f"Call connect() before calling tools."

            )


        logger.debug(

            f"Calling tool '{tool_name}' on server '{self.server_name}' "

            f"with args: {arguments}"

        )


        try:

            result = await self._session.call_tool(tool_name, arguments)


            # Extract the text content from the MCP result object.

            if result.content:

                return "\n".join(

                    item.text

                    for item in result.content

                    if hasattr(item, "text")

                )

            return "Tool returned no content."


        except Exception as e:

            error_msg = (

                f"Error calling tool '{tool_name}' on server "

                f"'{self.server_name}': {str(e)}"

            )

            logger.error(error_msg)

            return error_msg
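
The text-extraction step at the end of call_tool deserves a second look: an MCP result may carry several content items, and not all of them are textual. With simple stand-in classes (the real content types live in the mcp package) the filtering works like this:

```python
from dataclasses import dataclass


@dataclass
class TextItem:
    """Stand-in for a textual MCP content item."""
    text: str


@dataclass
class ImageItem:
    """Stand-in for a non-text MCP content item."""
    data: bytes


def extract_text(content: list) -> str:
    """Join the text of all text-bearing content items."""
    if not content:
        return "Tool returned no content."
    return "\n".join(
        item.text for item in content if hasattr(item, "text")
    )


print(extract_text([TextItem("line 1"), ImageItem(b"png-bytes"), TextItem("line 2")]))
# line 1
# line 2
```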


Now let us build the MCP server registry, which manages connections to all configured MCP servers and provides a unified interface for tool discovery and invocation:


# mcp_client/mcp_server_registry.py

#

# The MCP Server Registry for Octopussy Light.

#

# Manages connections to all configured MCP servers and provides a

# unified interface for agents to discover and call tools regardless

# of which server hosts them.


from typing import Any, Dict, List


from mcp_client.mcp_tool_client import MCPToolClient

from utils.logger import get_logger


logger = get_logger("mcp_registry", "system")



class MCPServerRegistry:

    """

    Manages all MCP server connections in Octopussy Light.


    Provides a single point of access for all tool capabilities,

    regardless of whether they come from local or remote MCP servers.

    Agents query the registry to discover available tools and call them.

    """


    def __init__(self, mcp_server_configs: List[Dict]):

        """

        Initialize the registry with server configurations.


        Args:

            mcp_server_configs: A list of server configuration dicts

                                from the config.yaml mcp_servers section.

        """

        self._server_configs = mcp_server_configs

        self._clients: Dict[str, MCPToolClient] = {}

        self._tool_index: Dict[str, str] = {}  # tool_name -> server_name

        logger.info(

            f"MCP server registry initialized with "

            f"{len(mcp_server_configs)} server(s)."

        )


    async def connect_all(self):

        """

        Connect to all configured MCP servers and discover their tools.


        Should be called once at system startup before any agents begin

        processing tasks. Servers that fail to connect are skipped with

        an error log; the system continues with the remaining servers.

        """

        for server_config in self._server_configs:

            name = server_config.get("name", "unnamed")

            server_type = server_config.get("type", "local")


            try:

                if server_type == "local":

                    command = server_config.get("command", [])

                    client = MCPToolClient(command, server_name=name)

                    await client.connect()

                    self._clients[name] = client


                    # Index all tools from this server. If two servers

                    # expose a tool with the same name, the later server

                    # takes precedence and a warning is logged.

                    for tool in client.get_available_tools():

                        tool_name = tool["name"]

                        if tool_name in self._tool_index:

                            logger.warning(

                                f"Tool '{tool_name}' from server '{name}' "

                                f"conflicts with existing tool from "

                                f"'{self._tool_index[tool_name]}'. "

                                f"The new one will take precedence."

                            )

                        self._tool_index[tool_name] = name


                elif server_type == "http":

                    logger.info(

                        f"HTTP MCP server '{name}' configured. HTTP "

                        f"transport support requires additional setup "

                        f"beyond this article's scope. Skipping."

                    )


                else:

                    logger.warning(

                        f"Unknown MCP server type '{server_type}' "

                        f"for server '{name}'. Skipping."

                    )


            except Exception as e:

                logger.error(

                    f"Failed to connect to MCP server '{name}': {e}"

                )


        logger.info(

            f"MCP registry ready. Available tools: "

            f"{list(self._tool_index.keys())}"

        )


    async def disconnect_all(self):

        """Disconnect from all connected MCP servers."""

        for name, client in self._clients.items():

            try:

                await client.disconnect()

            except Exception as e:

                logger.error(

                    f"Error disconnecting from server '{name}': {e}"

                )


    def get_all_tools(self) -> List[Dict]:

        """

        Return a list of all available tools across all connected servers.


        Returns:

            A list of tool descriptor dicts with 'name', 'description',

            and 'input_schema' keys.

        """

        all_tools = []

        for client in self._clients.values():

            all_tools.extend(client.get_available_tools())

        return all_tools


    async def call_tool(

        self,

        tool_name: str,

        arguments: Dict[str, Any]

    ) -> str:

        """

        Call a tool by name, routing to the correct server automatically.


        Args:

            tool_name:  The name of the tool to call.

            arguments:  Arguments to pass to the tool.


        Returns:

            The tool's response as a string.

        """

        server_name = self._tool_index.get(tool_name)

        if not server_name:

            available = list(self._tool_index.keys())

            return (

                f"Error: Tool '{tool_name}' not found in any connected "

                f"MCP server. Available tools: {available}"

            )


        client = self._clients.get(server_name)

        if not client:

            return (

                f"Error: Server '{server_name}' is registered in the "

                f"tool index but has no active client connection."

            )


        return await client.call_tool(tool_name, arguments)
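
Under the hood the registry's routing table is just a dict from tool name to server name, with a deliberate last-writer-wins rule on collisions. The precedence behavior can be checked in isolation (server and tool names here are illustrative):

```python
def build_tool_index(servers):
    """Map each tool name to the server that provides it.

    If two servers expose the same tool name, the server registered
    later wins, matching the registry's precedence rule.
    """
    index = {}
    for server_name, tool_names in servers:
        for tool in tool_names:
            index[tool] = server_name
    return index


index = build_tool_index([
    ("builtin", ["read_file", "write_file"]),
    ("extras", ["read_file", "grep"]),   # read_file collides
])
print(index["read_file"])   # extras (registered later)
print(index["write_file"])  # builtin
```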


With the registry in place, we now update the WorkerAgent to implement the agentic ReAct loop. The worker can now call tools, observe their results, and continue reasoning until it reaches a final answer:


# agents/worker_agent.py (updated for Increment 6 - with MCP tool support)

#

# The key addition is the _execute_with_tools() method, which implements

# an agentic loop: the LLM can call tools, observe results, and continue

# reasoning until it produces a final answer.


import asyncio

import json

import uuid

from typing import List, Dict, Optional


import chromadb


from agents.base_agent import BaseAgent

from messaging.message_bus import MessageBus

from messaging.message_types import Message, MessageType

from llm.llm_client import LLMClient

from utils.logger import get_logger


WORKER_SYSTEM_PROMPT = """

You are a focused, capable worker agent in the Octopussy Light system.

You receive tasks from the coordinator and execute them using available tools.


When you need to use a tool, respond with a JSON object in this format:

{"tool_call": {"name": "<tool_name>", "arguments": {<arguments>}}}


When you have gathered enough information and are ready to provide your

final answer, respond with plain text (not JSON).


Be thorough, accurate, and efficient. If you cannot complete a task,

explain clearly why.

"""



class WorkerAgent(BaseAgent):

    """

    A worker agent that executes tasks using LLM reasoning and MCP tools.


    The worker implements the ReAct agentic loop: it reasons about the

    task, calls tools as needed, observes results, and continues until

    it produces a final answer. If no MCP registry is provided, the

    worker falls back to LLM-only reasoning (as in Increment 5).

    """


    def __init__(

        self,

        worker_id: str,

        config: dict,

        bus: MessageBus,

        mcp_registry=None

    ):

        """

        Initialize a worker agent.


        Args:

            worker_id:    A unique identifier for this worker instance.

            config:       The application configuration dictionary.

            bus:          The shared MessageBus instance.

            mcp_registry: Optional MCPServerRegistry providing tool access.

                          If None, the worker uses LLM-only reasoning.

        """

        name = f"worker_{worker_id}"

        super().__init__(

            name=name,

            bus=bus,

            config=config,

            color_key="worker"

        )


        self.worker_id = worker_id

        self.mcp_registry = mcp_registry

        llm_config = config["llm"]


        self.llm = LLMClient(

            base_url=llm_config["base_url"],

            api_key=llm_config["api_key"],

            model=llm_config["worker_model"],

            system_prompt=WORKER_SYSTEM_PROMPT

        )


        self.short_term_memory: List[Dict[str, str]] = []

        self._init_long_term_memory()

        self.current_task_id: Optional[str] = None

        self.is_busy: bool = False


        self.logger.info(f"Worker '{name}' initialized.")


    def _init_long_term_memory(self):

        """Initialize ChromaDB for long-term memory."""

        try:

            self.chroma_client = chromadb.PersistentClient(

                path=f"./memory/worker_{self.worker_id}"

            )

            self.memory_collection = self.chroma_client.get_or_create_collection(

                name=f"worker_{self.worker_id}_memory"

            )

        except Exception as e:

            self.logger.error(f"Failed to initialize long-term memory: {e}")

            self.memory_collection = None


    def clear_short_term_memory(self):

        """Clear short-term memory for a new task."""

        self.short_term_memory = []


    async def store_in_long_term_memory(

        self, content: str, metadata: Optional[dict] = None

    ):

        """Store information in long-term memory."""

        if self.memory_collection is None:

            return

        try:

            self.memory_collection.add(

                documents=[content],

                ids=[str(uuid.uuid4())],

                metadatas=[metadata or {}]

            )

        except Exception as e:

            self.logger.error(f"Failed to store in long-term memory: {e}")


    async def retrieve_from_long_term_memory(

        self, query: str, n_results: int = 3

    ) -> List[str]:

        """Retrieve relevant memories from long-term storage."""

        if self.memory_collection is None:

            return []

        try:

            count = self.memory_collection.count()

            if count == 0:

                return []

            results = self.memory_collection.query(

                query_texts=[query],

                n_results=min(n_results, count)

            )

            return results.get("documents", [[]])[0]

        except Exception as e:

            self.logger.error(f"Failed to retrieve from long-term memory: {e}")

            return []


    async def handle_message(self, message: Message):

        """Handle incoming messages."""

        if message.message_type == MessageType.ASSIGN_TASK:

            await self._handle_task_assignment(message)

        elif message.message_type == MessageType.SHUTDOWN_AGENT:

            await self.stop()


    async def _handle_task_assignment(self, message: Message):

        """Process a task assignment from the coordinator."""

        payload = message.payload

        task_id = payload.get("task_id", str(uuid.uuid4()))

        task_description = payload.get("task_description", "")


        self.current_task_id = task_id

        self.is_busy = True


        self.logger.info(

            f"Received task {task_id}: {task_description[:60]}..."

        )


        self.clear_short_term_memory()


        # Load relevant long-term memories.

        memories = await self.retrieve_from_long_term_memory(task_description)

        if memories:

            memory_context = (

                "Relevant information from previous tasks:\n"

                + "\n".join(f"- {m}" for m in memories)

            )

            self.short_term_memory.append({

                "role": "system",

                "content": memory_context

            })


        # Build the task context as a single user message that includes

        # both the tool descriptions and the task itself. This is more

        # compatible with LLMs that only support a single system message,

        # and ensures the tool list is clearly associated with the task.

        if self.mcp_registry:

            tools = self.mcp_registry.get_all_tools()

            if tools:

                tool_descriptions = "\n".join(

                    f"- {t['name']}: {t['description']}"

                    for t in tools

                )

                task_context = (

                    f"Available tools:\n{tool_descriptions}\n\n"

                    f"Task: {task_description}"

                )

            else:

                task_context = task_description

        else:

            task_context = task_description


        self.short_term_memory.append({

            "role": "user",

            "content": task_context

        })


        try:

            result = await self._execute_with_tools(task_id)


            await self.store_in_long_term_memory(

                f"Task: {task_description}\nResult: {result}",

                metadata={"task_id": task_id}

            )


            await self._send_result(task_id, result, response_number=1)


        except Exception as e:

            error_msg = f"Task execution failed: {str(e)}"

            self.logger.error(error_msg)

            await self._send_error(task_id, error_msg)

        finally:

            self.is_busy = False


    async def _execute_with_tools(self, task_id: str) -> str:

        """

        Execute a task using the ReAct agentic loop with tool calling.


        The worker reasons about the task, calls tools as needed,

        observes results, and continues until it produces a final answer.

        The loop runs for a maximum of 10 iterations to prevent runaway

        execution. If no MCP registry is available, the first LLM response

        is returned directly as the final answer.


        Args:

            task_id: The task ID for logging purposes.


        Returns:

            The final answer as a string.

        """

        max_iterations = 10

        # Initialize response to a safe default so it is always defined

        # when referenced after the loop, even if the loop body is

        # somehow skipped (which cannot happen with max_iterations > 0,

        # but the initialization makes the code's intent explicit).

        response = ""


        for iteration in range(max_iterations):

            self.logger.debug(

                f"Task {task_id}, iteration {iteration + 1}/{max_iterations}"

            )


            response = await self.llm.chat(self.short_term_memory)


            # Check if the response is a tool call.

            try:

                parsed = json.loads(response.strip())

                if "tool_call" in parsed and self.mcp_registry:

                    tool_name = parsed["tool_call"]["name"]

                    arguments = parsed["tool_call"].get("arguments", {})


                    self.logger.info(

                        f"Task {task_id}: calling tool '{tool_name}' "

                        f"with args: {arguments}"

                    )


                    # Call the tool via the MCP registry.

                    tool_result = await self.mcp_registry.call_tool(

                        tool_name, arguments

                    )


                    # Add the tool call and its result to short-term memory

                    # so the LLM can reason about the outcome.

                    self.short_term_memory.append({

                        "role": "assistant",

                        "content": response

                    })

                    self.short_term_memory.append({

                        "role": "user",

                        "content": (

                            f"Tool '{tool_name}' returned:\n{tool_result}"

                        )

                    })


                    # Continue the loop to let the LLM reason about the result.

                    continue


            except (json.JSONDecodeError, KeyError, TypeError):

                # Not a valid tool call JSON; treat as a final answer.

                pass


            # The response is plain text: this is the final answer.

            self.short_term_memory.append({

                "role": "assistant",

                "content": response

            })

            return response


        # If we exhaust all iterations without a plain-text response,

        # return whatever the last LLM response was.

        self.logger.warning(

            f"Task {task_id} hit the iteration limit of {max_iterations}. "

            f"Returning last response."

        )

        return (

            f"Task reached the maximum reasoning iteration limit. "

            f"Partial result:\n{response}"

        )


    async def _send_result(

        self, task_id: str, result: str, response_number: int = 1

    ):

        """Send a task result to the coordinator."""

        msg = Message(

            message_type=MessageType.TASK_RESULT,

            sender=self.name,

            recipient="coordinator",

            task_id=task_id,

            payload={

                "task_id": task_id,

                "worker_id": self.worker_id,

                "result": result,

                "response_number": response_number

            }

        )

        await self.bus.send(msg)


    async def _send_error(self, task_id: str, error_message: str):

        """Send an error message to the coordinator."""

        msg = Message(

            message_type=MessageType.TASK_ERROR,

            sender=self.name,

            recipient="coordinator",

            task_id=task_id,

            payload={

                "task_id": task_id,

                "worker_id": self.worker_id,

                "error": error_message

            }

        )

        await self.bus.send(msg)
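
The tool-call convention that `_execute_with_tools()` relies on is worth seeing in isolation. The standalone sketch below (illustrative only, not part of the Octopussy Light codebase) classifies an LLM response as either a tool call or a final answer, using the same `tool_call` / `name` / `arguments` field names as the worker above:

```python
import json


def classify_response(response: str):
    """Return ("tool_call", name, args) or ("final_answer", text, None)."""
    try:
        parsed = json.loads(response.strip())
        if isinstance(parsed, dict) and "tool_call" in parsed:
            call = parsed["tool_call"]
            return ("tool_call", call["name"], call.get("arguments", {}))
    except (json.JSONDecodeError, KeyError, TypeError):
        # Not valid tool-call JSON: fall through to the final-answer case.
        pass
    return ("final_answer", response, None)


# A tool-call response from the LLM is detected and unpacked:
kind, name, args = classify_response(
    '{"tool_call": {"name": "read_file", '
    '"arguments": {"file_path": "/tmp/sales_data.csv"}}}'
)
# kind == "tool_call", name == "read_file"

# Anything that is not valid tool-call JSON is the final answer:
kind2, text, _ = classify_response("The file contains 1,247 rows.")
# kind2 == "final_answer"
```

This mirrors the worker's loop exactly: a successful parse triggers a tool invocation and another iteration, while a parse failure ends the loop with a plain-text answer.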


We also need to update the WorkerManager to accept and forward the MCP registry to newly created workers:


# coordinator/worker_manager.py (updated for Increment 6 - with MCP registry)


import asyncio

import uuid

from typing import Dict, List, Optional


from messaging.message_bus import MessageBus

from messaging.message_types import Message, MessageType

from coordinator.prompt_optimizer import PromptOptimizer

from utils.logger import get_logger


logger = get_logger("worker_manager", "coordinator")



class WorkerManager:

    """

    Manages the lifecycle of worker agents in Octopussy Light.

    Now accepts an optional MCP registry and passes it to each worker

    so they can call predefined tools during task execution.

    """


    MAX_IDLE_WORKERS = 3


    def __init__(

        self,

        config: dict,

        bus: MessageBus,

        mcp_registry=None

    ):

        """

        Initialize the worker manager.


        Args:

            config:       The application configuration dictionary.

            bus:          The shared MessageBus instance.

            mcp_registry: Optional MCPServerRegistry. When provided, every

                          new worker is given access to the registry so it

                          can call predefined MCP tools during execution.

        """

        self.config = config

        self.bus = bus

        self.mcp_registry = mcp_registry

        self.prompt_optimizer = PromptOptimizer(config["llm"])

        self._workers: Dict[str, object] = {}

        logger.info("Worker manager initialized.")


    def _get_idle_workers(self) -> List[object]:

        """Return all workers that are not currently busy."""

        return [w for w in self._workers.values() if not w.is_busy]


    async def assign_task(self, task_description: str) -> str:

        """

        Assign a task to a worker, reusing an idle one if available.

        New workers are created with access to the MCP registry.

        """

        # Import here to avoid circular imports at module load time.

        from agents.worker_agent import WorkerAgent


        task_id = str(uuid.uuid4())[:8]


        optimized_description = await self.prompt_optimizer.optimize(

            task_description

        )


        idle_workers = self._get_idle_workers()

        if idle_workers:

            worker = idle_workers[0]

            logger.info(

                f"Reusing idle worker '{worker.name}' for task {task_id}."

            )

        else:

            worker_id = str(uuid.uuid4())[:6]

            worker = WorkerAgent(

                worker_id,

                self.config,

                self.bus,

                mcp_registry=self.mcp_registry

            )

            self._workers[worker.name] = worker

            asyncio.create_task(worker.start())

            logger.info(

                f"Created new worker '{worker.name}' for task {task_id}."

            )

        # Mark the worker busy right away so the idle-worker cleanup
        # below cannot shut this worker down before it has received
        # and started processing the task message we are about to send.
        worker.is_busy = True


        task_message = Message(

            message_type=MessageType.ASSIGN_TASK,

            sender="coordinator",

            recipient=worker.name,

            task_id=task_id,

            payload={

                "task_id": task_id,

                "task_description": optimized_description

            }

        )

        await self.bus.send(task_message)

        await self._cleanup_idle_workers()


        return task_id


    async def _cleanup_idle_workers(self):

        """Shut down excess idle workers."""

        idle_workers = self._get_idle_workers()

        excess_count = len(idle_workers) - self.MAX_IDLE_WORKERS


        if excess_count <= 0:

            return


        for worker in idle_workers[:excess_count]:

            shutdown_msg = Message(

                message_type=MessageType.SHUTDOWN_AGENT,

                sender="coordinator",

                recipient=worker.name

            )

            await self.bus.send(shutdown_msg)

            self.bus.unregister(worker.name)

            del self._workers[worker.name]

            logger.info(f"Shut down idle worker '{worker.name}'.")
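
The retention policy behind `_cleanup_idle_workers()` is simple enough to check in isolation. This standalone sketch (a hypothetical helper, not part of the project) computes which idle workers would be shut down for a given cap:

```python
def workers_to_shut_down(idle_workers: list, max_idle: int = 3) -> list:
    """Return the idle workers that exceed the retention cap."""
    excess = len(idle_workers) - max_idle
    if excess <= 0:
        return []
    # Shut down the first `excess` idle workers, keeping `max_idle` alive.
    return idle_workers[:excess]


# Five idle workers with a cap of 3: the first two are shut down.
print(workers_to_shut_down(["w1", "w2", "w3", "w4", "w5"]))  # ['w1', 'w2']
print(workers_to_shut_down(["w1", "w2"]))                    # []
```

Keeping a small pool of idle workers avoids the startup cost of creating a fresh agent for every task, while the cap prevents the pool from growing without bound.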


The coordinator also needs to accept the MCP registry and pass it to the WorkerManager. Here is the complete updated coordinator for Increment 6:


# coordinator/coordinator_agent.py (Increment 6 - complete file)

#

# The coordinator now accepts an optional mcp_registry parameter and

# passes it to the WorkerManager so workers can use predefined tools.

# The system prompt is also updated to mention data retrieval tasks.


import asyncio

import sys

import json

from typing import List, Dict, Optional


from agents.base_agent import BaseAgent

from messaging.message_bus import MessageBus

from messaging.message_types import Message, MessageType

from coordinator.worker_manager import WorkerManager

from llm.llm_client import LLMClient

from utils.logger import get_logger


logger = get_logger("coordinator", "coordinator")


COORDINATOR_SYSTEM_PROMPT = """

You are the coordinator of an agentic AI system called Octopussy Light.

Your job is to understand the user's requests, coordinate specialized

agents to fulfill those requests, and present results clearly.


When the user asks you to perform a task (something that requires research,

analysis, writing, coding, data retrieval, or other substantive work),

respond with a JSON object in this exact format:

{"action": "create_task", "task_description": "<the task>"}


For conversational messages, questions about the system, or simple requests

that you can answer directly from your knowledge, respond in plain text.


You are friendly, precise, and efficient.

"""



class CoordinatorAgent(BaseAgent):

    """

    The central coordinator of Octopussy Light.

    Handles user communication and coordinates all other agents.

    """


    def __init__(

        self,

        config: dict,

        bus: MessageBus,

        mcp_registry=None

    ):

        super().__init__(

            name="coordinator",

            bus=bus,

            config=config,

            color_key="coordinator"

        )


        llm_config = config["llm"]


        self.llm = LLMClient(

            base_url=llm_config["base_url"],

            api_key=llm_config["api_key"],

            model=llm_config["coordinator_model"],

            system_prompt=COORDINATOR_SYSTEM_PROMPT

        )


        self.conversation_history: List[Dict[str, str]] = []


        # Pass the MCP registry to the worker manager so every worker

        # it creates will have access to the predefined tools.

        self.worker_manager = WorkerManager(

            config, bus, mcp_registry=mcp_registry

        )


        self.logger.info(

            f"Coordinator initialized with model: "

            f"{llm_config['coordinator_model']}"

        )


    async def handle_message(self, message: Message):

        """Handle messages from other agents."""

        if message.message_type == MessageType.MAIL_NOTIFICATION:

            await self._display_mail_notification(message.payload)

        elif message.message_type == MessageType.CALENDAR_NOTIFICATION:

            await self._display_calendar_notification(message.payload)

        elif message.message_type == MessageType.TASK_RESULT:

            await self._display_task_result(message.payload)

        elif message.message_type == MessageType.TASK_ERROR:

            await self._display_task_error(message.payload)

        elif message.message_type == MessageType.AGENT_NOTIFICATION:

            await self._display_agent_notification(message.payload)


    async def _display_mail_notification(self, payload: dict):

        print(

            f"\n[MAIL] From: {payload.get('sender', 'Unknown')}\n"

            f"       Subject: {payload.get('subject', 'No subject')}\n"

            f"       Summary: {payload.get('summary', '')}\n"

        )


    async def _display_calendar_notification(self, payload: dict):

        print(

            f"\n[CALENDAR] Event: {payload.get('title', 'Untitled')}\n"

            f"           When: {payload.get('start_time', 'Unknown')}\n"

            f"           Location: {payload.get('location', 'Not specified')}\n"

        )


    async def _display_task_result(self, payload: dict):

        task_id = payload.get("task_id", "unknown")

        response_num = payload.get("response_number", 1)

        result = payload.get("result", "")

        print(

            f"\n[TASK RESULT] Task {task_id} "

            f"(response #{response_num}):\n{result}\n"

        )


    async def _display_task_error(self, payload: dict):

        task_id = payload.get("task_id", "unknown")

        error = payload.get("error", "Unknown error")

        print(f"\n[TASK ERROR] Task {task_id} failed: {error}\n")


    async def _display_agent_notification(self, payload: dict):

        notification_type = payload.get("notification_type", "")

        if notification_type == "tool_created":

            print(

                f"\n[TOOL AGENT] New tool created: "

                f"{payload.get('tool_description', '')}\n"

            )

        elif notification_type == "tool_creation_failed":

            print(

                f"\n[TOOL AGENT] Tool creation failed: "

                f"{payload.get('reason', 'Unknown reason')}\n"

            )

        else:

            print(f"\n[AGENT] {payload.get('message', str(payload))}\n")


    async def chat(self, user_input: str) -> str:

        """

        Process user input and route to the appropriate handler.

        """

        self.conversation_history.append({

            "role": "user",

            "content": user_input

        })


        response = await self.llm.chat(self.conversation_history)


        try:

            parsed = json.loads(response.strip())

            action = parsed.get("action")


            if action == "create_task":

                task_desc = parsed.get("task_description", user_input)

                task_id = await self.worker_manager.assign_task(task_desc)

                reply = (

                    f"Task assigned (ID: {task_id}). "

                    f"A worker is now executing it. "

                    f"Results will appear shortly."

                )

                self.conversation_history.append({

                    "role": "assistant",

                    "content": reply

                })

                return reply


        except (json.JSONDecodeError, AttributeError):

            pass


        self.conversation_history.append({

            "role": "assistant",

            "content": response

        })

        return response


    async def run_background_tasks(self):

        """Read user input asynchronously."""

        loop = asyncio.get_running_loop()

        print("\nOctopussy Light is ready. Type 'quit' or 'exit' to stop.\n")


        while self.running:

            try:

                print("You: ", end="", flush=True)

                user_input = await loop.run_in_executor(

                    None, sys.stdin.readline

                )

                user_input = user_input.strip()


                if not user_input:

                    continue


                if user_input.lower() in ("quit", "exit"):

                    self.running = False

                    print("Goodbye!")

                    break


                response = await self.chat(user_input)

                print(f"\nCoordinator: {response}\n")


            except Exception as e:

                self.logger.error(f"Error in input loop: {e}")
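
The coordinator's routing decision in `chat()` follows the same parse-or-fall-through pattern as the worker's tool-call detection. Here is a minimal standalone sketch of that decision (illustrative only), using the same `action` / `task_description` keys as the coordinator's system prompt:

```python
import json


def route_reply(response: str):
    """Return ("task", description) for a task request, else ("chat", text)."""
    try:
        parsed = json.loads(response.strip())
        if parsed.get("action") == "create_task":
            return ("task", parsed.get("task_description", ""))
    except (json.JSONDecodeError, AttributeError):
        # Not a JSON object: treat as a conversational reply.
        pass
    return ("chat", response)


print(route_reply(
    '{"action": "create_task", "task_description": "Summarize Q3 sales"}'
))
# ('task', 'Summarize Q3 sales')

print(route_reply("Hello! How can I help you today?"))
# ('chat', 'Hello! How can I help you today?')
```

Catching `AttributeError` as well as `json.JSONDecodeError` matters: a response like `[1, 2, 3]` parses as valid JSON but is not a dict, so calling `.get()` on it would otherwise crash the coordinator.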


Now update main.py to start the MCP server registry and wire it into the coordinator:


# main.py (Increment 6)

import asyncio

import yaml

import sys

from messaging.message_bus import MessageBus

from coordinator.coordinator_agent import CoordinatorAgent

from agents.mail_agent import MailAgent

from agents.calendar_agent import CalendarAgent

from mcp_client.mcp_server_registry import MCPServerRegistry

from utils.logger import get_logger

logger = get_logger("system", "system")

def load_config(path: str = "config.yaml") -> dict:

    """Load and parse the YAML configuration file."""

    try:

        with open(path, "r") as f:

            return yaml.safe_load(f)

    except FileNotFoundError:

        logger.error(f"Configuration file not found: {path}")

        sys.exit(1)

async def main():

    """

    Start Octopussy Light with coordinator, mail, calendar agents,

    and the MCP built-in tools server connected to all workers.

    """

    logger.info("Starting Octopussy Light...")

    config = load_config()

    bus = MessageBus()

    # Initialize and connect the MCP server registry.

    mcp_configs = config.get("mcp_servers", [])

    mcp_registry = MCPServerRegistry(mcp_configs)

    logger.info("Connecting to MCP servers...")

    await mcp_registry.connect_all()

    logger.info("MCP servers connected.")

    # Pass the registry to the coordinator so workers get tool access.

    coordinator = CoordinatorAgent(config, bus, mcp_registry=mcp_registry)

    mail_agent = MailAgent(config, bus)

    calendar_agent = CalendarAgent(config, bus)

    all_agents = [coordinator, mail_agent, calendar_agent]

    try:

        await asyncio.gather(

            *[agent.start() for agent in all_agents]

        )

    finally:

        for agent in all_agents:

            await agent.stop()

        await mcp_registry.disconnect_all()

        logger.info("Octopussy Light has shut down cleanly.")

if __name__ == "__main__":

    try:

        asyncio.run(main())

    except KeyboardInterrupt:

        print("\nShutting down...")


Let us trace through what happens when a worker now uses a predefined tool. The user asks the coordinator to analyze a file. The coordinator delegates to a worker. The worker's _execute_with_tools() loop runs: the LLM sees the task and the list of available tools, decides to call read_file, and responds with the JSON tool-call format. The worker calls the MCP registry, which routes the call to the built-in tools server process via STDIO and returns the file contents. The worker adds those contents to its short-term memory and lets the LLM reason about them to produce the final answer. The user sees:


You: Please analyze the file /tmp/sales_data.csv and give me a summary.


Coordinator: Task assigned (ID: f3a9b1c2). A worker is now executing it.

Results will appear shortly.


[worker_a1b2c3] 10:00:16 INFO: Task f3a9b1c2: calling tool 'read_file'

with args: {'file_path': '/tmp/sales_data.csv'}


[TASK RESULT] Task f3a9b1c2 (response #1):

I have analyzed the sales data file. Here is the summary:


The CSV file contains 1,247 rows of sales transactions from Q3 2025.

Total revenue: $2,847,392. Top performing product: Widget Pro (34% of

revenue). The Western region outperformed all others by 23%. There is

a notable spike in sales during the week of September 15th, likely

corresponding to a promotional event. Average transaction value: $2,283.


Workers now have real-world capabilities through the four predefined tools. The next step is to give the system the ability to grow its own toolset dynamically.


CHAPTER EIGHT: INCREMENT SEVEN - THE TOOL AGENT


We have now built a working multi-agent system with mail monitoring, calendar monitoring, on-demand worker agents, and a set of predefined MCP tools. But those tools are fixed. If a worker needs to fetch weather data, call a company API, or perform a domain-specific calculation, no predefined tool exists for that. This is where the tool agent comes in.


The tool agent can dynamically create new MCP tools on demand. When an agent needs a capability that does not yet exist, it can request the tool agent to create one. The tool agent generates the tool code using an LLM, tests it in a sandboxed environment, and if the tests pass, adds the tool to the system's built-in tools server. The new tool is then available to all future workers.
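
Before reading the implementation, it helps to see what a tool-creation request looks like on the message bus. The sketch below builds a hypothetical request payload; the field names match the format documented in the tool agent's handle_message(), while the description text and requester name are purely illustrative:

```python
# A hypothetical tool-creation request as a worker might send it.
# Field names follow the payload format the tool agent expects;
# the values here are made up for illustration.
request_payload = {
    "request_type": "create_tool",
    "tool_description": (
        "Fetch the current weather for a given city and return it "
        "as a one-line summary string."
    ),
    "requester": "worker_a1b2c3",
}

# The tool agent only acts on payloads with this exact request_type.
print(sorted(request_payload.keys()))
# ['request_type', 'requester', 'tool_description']
```

Wrapped in a Message with message_type AGENT_NOTIFICATION and recipient "tool_agent", this payload triggers the full pipeline: code generation, sandbox testing, deployment, and a confirmation notification back to the coordinator.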


# agents/tool_agent.py

#

# The Tool Agent for Octopussy Light.

#

# The tool agent can dynamically create new MCP tools on demand.

# When an agent needs a capability that does not exist, it can

# request the tool agent to create one. The tool agent:

#   1. Uses an LLM to generate the tool code.

#   2. Tests the generated code in a sandboxed subprocess.

#   3. If tests pass, adds the tool to the built-in tools server.

#   4. Notifies the coordinator that the tool is available.


import asyncio

import os

import subprocess

import tempfile

import textwrap

from typing import Dict, Optional


from agents.base_agent import BaseAgent

from messaging.message_bus import MessageBus

from messaging.message_types import Message, MessageType

from llm.llm_client import LLMClient

from utils.logger import get_logger


TOOL_AGENT_SYSTEM_PROMPT = """

You are a Python tool creation expert. Your job is to create MCP (Model

Context Protocol) tool functions for the Octopussy Light system.


When asked to create a tool, you must respond with ONLY valid Python code

that defines a single function decorated with @mcp.tool(). The function

must have:

  - A clear, descriptive name (snake_case).

  - Type annotations for all parameters and the return value.

  - A comprehensive docstring explaining what the tool does, its parameters,

    and what it returns.

  - Robust error handling with try/except blocks.

  - A return type of str (tools always return strings in MCP).


Do not include any imports, class definitions, or other code. Only the

function definition with the @mcp.tool() decorator.


Example of correct output:


@mcp.tool()

def get_current_weather(city: str) -> str:

    \"\"\"

    Get the current weather for a specified city.


    Args:

        city: The name of the city to get weather for.


    Returns:

        A string describing the current weather conditions.

    \"\"\"

    try:

        import requests

        response = requests.get(

            f"https://wttr.in/{city}?format=3",

            timeout=5

        )

        return response.text

    except Exception as e:

        return f"Error fetching weather: {str(e)}"

"""



class ToolAgent(BaseAgent):

    """

    Dynamically creates new MCP tools for the Octopussy Light system.


    When an agent needs a capability that does not exist, the tool agent

    generates, tests, and deploys new tools without requiring manual

    code changes or system restarts.

    """


    def __init__(self, config: dict, bus: MessageBus):

        super().__init__(

            name="tool_agent",

            bus=bus,

            config=config,

            color_key="tool_agent"

        )


        llm_config = config["llm"]

        self.llm = LLMClient(

            base_url=llm_config["base_url"],

            api_key=llm_config["api_key"],

            model=llm_config["coordinator_model"],

            system_prompt=TOOL_AGENT_SYSTEM_PROMPT

        )


        # Path to the built-in tools server file where new tools are added.

        self.tools_server_path = "mcp_server/builtin_tools_server.py"


        self.logger.info("Tool agent initialized.")


    async def handle_message(self, message: Message):

        """

        Handle tool creation requests from other agents.


        The expected payload format is:

        {

            "request_type": "create_tool",

            "tool_description": "A description of what the tool should do.",

            "requester": "The name of the agent that needs the tool."

        }

        """

        if message.message_type == MessageType.AGENT_NOTIFICATION:

            payload = message.payload

            if payload.get("request_type") == "create_tool":

                await self._handle_tool_creation_request(message)

        else:

            self.logger.debug(

                f"Tool agent received unhandled message: {message.message_type}"

            )


    async def _handle_tool_creation_request(self, message: Message):

        """

        Process a tool creation request.


        Generates tool code, tests it, and if successful, adds it to

        the built-in tools server.

        """

        payload = message.payload

        tool_description = payload.get("tool_description", "")

        requester = payload.get("requester", "unknown")


        self.logger.info(

            f"Tool creation requested by '{requester}': "

            f"{tool_description[:60]}..."

        )


        # Step 1: Generate the tool code using the LLM.

        tool_code = await self._generate_tool_code(tool_description)

        if not tool_code:

            await self._notify_failure(

                requester, tool_description, "LLM failed to generate tool code."

            )

            return


        # Step 2: Test the generated code in a sandbox.

        test_result = await self._test_tool_in_sandbox(tool_code)

        if not test_result["success"]:

            self.logger.warning(

                f"Generated tool failed sandbox test: {test_result['error']}"

            )

            # Attempt to fix the code once before giving up.

            tool_code = await self._fix_tool_code(tool_code, test_result["error"])

            test_result = await self._test_tool_in_sandbox(tool_code)


            if not test_result["success"]:

                await self._notify_failure(

                    requester,

                    tool_description,

                    f"Tool failed sandbox tests: {test_result['error']}"

                )

                return


        # Step 3: Add the tool to the built-in tools server.

        success = await self._add_tool_to_server(tool_code)

        if not success:

            await self._notify_failure(

                requester, tool_description, "Failed to add tool to server."

            )

            return


        self.logger.info(

            f"Successfully created and deployed new tool for: "

            f"{tool_description[:60]}"

        )


        # Step 4: Notify the coordinator that the tool is ready.

        notification = Message(

            message_type=MessageType.AGENT_NOTIFICATION,

            sender=self.name,

            recipient="coordinator",

            payload={

                "notification_type": "tool_created",

                "tool_description": tool_description,

                "requester": requester,

                "message": (

                    f"A new tool has been created and is now available: "

                    f"{tool_description}"

                )

            }

        )

        await self.bus.send(notification)


    async def _generate_tool_code(self, tool_description: str) -> Optional[str]:

        """

        Use the LLM to generate Python code for a new MCP tool.


        Args:

            tool_description: A natural language description of the tool.


        Returns:

            The generated Python function code, or None on failure.

        """

        try:

            code = await self.llm.chat([

                {

                    "role": "user",

                    "content": (

                        f"Create an MCP tool function for the following "

                        f"purpose:\n\n{tool_description}\n\n"

                        f"Remember: return ONLY the function definition with "

                        f"the @mcp.tool() decorator. No imports, no other code."

                    )

                }

            ])


            # Strip any markdown code fences the LLM may have added.

            # We strip trailing whitespace first so that a fence like

            # "```\n" at the end is reliably detected as "```".

            code = code.strip()

            if code.startswith("```"):

                lines = code.splitlines()

                # Find the last line that is a closing fence.

                end_index = len(lines)

                for i in range(len(lines) - 1, 0, -1):

                    if lines[i].strip() == "```":

                        end_index = i

                        break

                # Skip the opening fence line (e.g. "```python") and

                # the closing fence line.

                code = "\n".join(lines[1:end_index]).strip()


            return code if code else None


        except Exception as e:

            self.logger.error(f"LLM tool generation failed: {e}")

            return None


    async def _test_tool_in_sandbox(self, tool_code: str) -> Dict:

        """

        Test the generated tool code in a sandboxed subprocess.


        Checks that the code is syntactically valid Python, defines

        exactly one function, and that the function has a docstring.


        The tool code is written to its own temporary file and the

        path is passed to the test script via a command-line argument.

        This avoids any string-embedding issues (such as triple-quote

        collisions) that would arise from interpolating the code

        directly into the test script source.


        Args:

            tool_code: The generated Python function code.


        Returns:

            A dict with 'success' (bool) and 'error' (str) keys.

        """

        # The test script reads the tool code from a file whose path

        # is passed as sys.argv[1], completely avoiding any quoting or

        # escaping issues with the tool code content.

        test_script = textwrap.dedent("""\

            import ast

            import sys


            # Read the tool code from the file path passed as an argument.

            tool_code_path = sys.argv[1]

            with open(tool_code_path, "r", encoding="utf-8") as f:

                tool_code = f.read()


            # Test 1: Check that the code is valid Python syntax.

            try:

                tree = ast.parse(tool_code)

            except SyntaxError as e:

                print(f"SYNTAX_ERROR: {e}")

                sys.exit(1)


            # Test 2: Check that the code defines exactly one function.

            functions = [

                node for node in ast.walk(tree)

                if isinstance(node, ast.FunctionDef)

            ]

            if len(functions) != 1:

                print(

                    f"STRUCTURE_ERROR: Expected 1 function, "

                    f"found {len(functions)}"

                )

                sys.exit(1)


            # Test 3: Check that the function has a docstring.

            func = functions[0]

            has_docstring = (

                func.body

                and isinstance(func.body[0], ast.Expr)

                and isinstance(func.body[0].value, ast.Constant)

                and isinstance(func.body[0].value.value, str)

            )

            if not has_docstring:

                print("DOCSTRING_ERROR: Function must have a docstring.")

                sys.exit(1)


            print("TESTS_PASSED")

        """)


        loop = asyncio.get_running_loop()


        def _run_test():

            # Both temp file paths are initialized to None before the

            # try block so the finally clause can safely reference them

            # even if file creation fails partway through.

            tool_code_path = None

            test_script_path = None

            try:

                # Write the tool code to its own temp file.

                with tempfile.NamedTemporaryFile(

                    mode="w",

                    suffix=".py",

                    delete=False,

                    encoding="utf-8"

                ) as tool_tmp:

                    tool_tmp.write(tool_code)

                    tool_code_path = tool_tmp.name


                # Write the test script to a separate temp file.

                with tempfile.NamedTemporaryFile(

                    mode="w",

                    suffix=".py",

                    delete=False,

                    encoding="utf-8"

                ) as script_tmp:

                    script_tmp.write(test_script)

                    test_script_path = script_tmp.name


                result = subprocess.run(

                    ["python", test_script_path, tool_code_path],

                    capture_output=True,

                    text=True,

                    timeout=15,

                    env={"PATH": os.environ.get("PATH", "")}

                )


                if "TESTS_PASSED" in result.stdout:

                    return {"success": True, "error": ""}

                else:

                    error = result.stdout.strip() or result.stderr.strip()

                    return {"success": False, "error": error}


            except subprocess.TimeoutExpired:

                return {"success": False, "error": "Sandbox test timed out."}

            except Exception as e:

                return {"success": False, "error": str(e)}

            finally:

                for path in (tool_code_path, test_script_path):

                    if path is not None:

                        try:

                            os.unlink(path)

                        except Exception:

                            pass


        return await loop.run_in_executor(None, _run_test)


    async def _fix_tool_code(

        self,

        original_code: str,

        error_message: str

    ) -> str:

        """

        Ask the LLM to fix a tool that failed sandbox testing.


        Args:

            original_code:  The code that failed.

            error_message:  The error message from the sandbox test.


        Returns:

            The (hopefully) fixed code.

        """

        try:

            fixed = await self.llm.chat([

                {

                    "role": "user",

                    "content": (

                        f"This MCP tool code failed with the following error:\n\n"

                        f"ERROR: {error_message}\n\n"

                        f"ORIGINAL CODE:\n{original_code}\n\n"

                        f"Please fix the code. Return ONLY the corrected "

                        f"function definition with the @mcp.tool() decorator."

                    )

                }

            ])

            fixed = fixed.strip()

            if fixed.startswith("```"):

                lines = fixed.splitlines()

                end_index = len(lines)

                for i in range(len(lines) - 1, 0, -1):

                    if lines[i].strip() == "```":

                        end_index = i

                        break

                fixed = "\n".join(lines[1:end_index]).strip()

            return fixed if fixed else original_code

        except Exception as e:

            self.logger.error(f"Tool fix attempt failed: {e}")

            return original_code


    async def _add_tool_to_server(self, tool_code: str) -> bool:

        """

        Append the new tool code to the built-in tools server file.


        The appended tool will be available after the MCP server process

        is restarted. The MCPServerRegistry holds a live connection to

        the currently running server process; a full restart-and-reconnect

        cycle is required to expose the new tool to agents at runtime.

        This is left as a production enhancement; for Octopussy Light,

        the new tool is available from the next system startup.


        Args:

            tool_code: The validated tool function code to add.


        Returns:

            True if the tool was successfully appended, False otherwise.

        """

        try:

            with open(self.tools_server_path, "a", encoding="utf-8") as f:

                f.write(f"\n\n# Dynamically generated tool\n{tool_code}\n")


            self.logger.info(

                f"Tool code appended to {self.tools_server_path}."

            )

            return True


        except Exception as e:

            self.logger.error(f"Failed to add tool to server: {e}")

            return False


    async def _notify_failure(

        self,

        requester: str,

        tool_description: str,

        reason: str

    ):

        """Notify the coordinator that tool creation failed."""

        notification = Message(

            message_type=MessageType.AGENT_NOTIFICATION,

            sender=self.name,

            recipient="coordinator",

            payload={

                "notification_type": "tool_creation_failed",

                "tool_description": tool_description,

                "requester": requester,

                "reason": reason

            }

        )

        await self.bus.send(notification)

        self.logger.error(

            f"Tool creation failed for '{tool_description[:60]}': {reason}"

        )


The final coordinator adds the create_tool action alongside the existing create_task action. Here is the complete final version of coordinator_agent.py:


# coordinator/coordinator_agent.py (Final - Increment 7)

#

# Adds "create_tool" action handling so the coordinator can route

# tool creation requests to the tool agent. This is the complete,

# final version of the coordinator for the full Octopussy Light system.


import asyncio

import sys

import json

from typing import List, Dict, Optional


from agents.base_agent import BaseAgent

from messaging.message_bus import MessageBus

from messaging.message_types import Message, MessageType

from coordinator.worker_manager import WorkerManager

from llm.llm_client import LLMClient

from utils.logger import get_logger


logger = get_logger("coordinator", "coordinator")


COORDINATOR_SYSTEM_PROMPT = """

You are the coordinator of an agentic AI system called Octopussy Light.

Your job is to understand the user's requests, coordinate specialized

agents to fulfill those requests, and present results clearly.


When the user asks you to perform a task (something that requires research,

analysis, writing, coding, data retrieval, or other substantive work),

respond with a JSON object in this exact format:

{"action": "create_task", "task_description": "<the task>"}


When the user asks you to create a new tool or add a new capability to

the system, respond with:

{"action": "create_tool", "tool_description": "<what the tool should do>"}


For conversational messages, questions about the system, or simple requests

that you can answer directly from your knowledge, respond in plain text.


You are friendly, precise, and efficient.

"""



class CoordinatorAgent(BaseAgent):

    """

    The central coordinator of Octopussy Light.

    Handles user communication and coordinates all other agents.

    """


    def __init__(

        self,

        config: dict,

        bus: MessageBus,

        mcp_registry=None

    ):

        super().__init__(

            name="coordinator",

            bus=bus,

            config=config,

            color_key="coordinator"

        )


        llm_config = config["llm"]


        self.llm = LLMClient(

            base_url=llm_config["base_url"],

            api_key=llm_config["api_key"],

            model=llm_config["coordinator_model"],

            system_prompt=COORDINATOR_SYSTEM_PROMPT

        )


        self.conversation_history: List[Dict[str, str]] = []


        # Pass the MCP registry to the worker manager so every worker

        # it creates will have access to the predefined tools.

        self.worker_manager = WorkerManager(

            config, bus, mcp_registry=mcp_registry

        )


        self.logger.info(

            f"Coordinator initialized with model: "

            f"{llm_config['coordinator_model']}"

        )


    async def handle_message(self, message: Message):

        """Handle messages from other agents."""

        if message.message_type == MessageType.MAIL_NOTIFICATION:

            await self._display_mail_notification(message.payload)

        elif message.message_type == MessageType.CALENDAR_NOTIFICATION:

            await self._display_calendar_notification(message.payload)

        elif message.message_type == MessageType.TASK_RESULT:

            await self._display_task_result(message.payload)

        elif message.message_type == MessageType.TASK_ERROR:

            await self._display_task_error(message.payload)

        elif message.message_type == MessageType.AGENT_NOTIFICATION:

            await self._display_agent_notification(message.payload)


    async def _display_mail_notification(self, payload: dict):

        print(

            f"\n[MAIL] From: {payload.get('sender', 'Unknown')}\n"

            f"       Subject: {payload.get('subject', 'No subject')}\n"

            f"       Summary: {payload.get('summary', '')}\n"

        )


    async def _display_calendar_notification(self, payload: dict):

        print(

            f"\n[CALENDAR] Event: {payload.get('title', 'Untitled')}\n"

            f"           When: {payload.get('start_time', 'Unknown')}\n"

            f"           Location: {payload.get('location', 'Not specified')}\n"

        )


    async def _display_task_result(self, payload: dict):

        task_id = payload.get("task_id", "unknown")

        response_num = payload.get("response_number", 1)

        result = payload.get("result", "")

        print(

            f"\n[TASK RESULT] Task {task_id} "

            f"(response #{response_num}):\n{result}\n"

        )


    async def _display_task_error(self, payload: dict):

        task_id = payload.get("task_id", "unknown")

        error = payload.get("error", "Unknown error")

        print(f"\n[TASK ERROR] Task {task_id} failed: {error}\n")


    async def _display_agent_notification(self, payload: dict):

        notification_type = payload.get("notification_type", "")

        if notification_type == "tool_created":

            print(

                f"\n[TOOL AGENT] New tool created: "

                f"{payload.get('tool_description', '')}\n"

            )

        elif notification_type == "tool_creation_failed":

            print(

                f"\n[TOOL AGENT] Tool creation failed: "

                f"{payload.get('reason', 'Unknown reason')}\n"

            )

        else:

            print(f"\n[AGENT] {payload.get('message', str(payload))}\n")


    async def chat(self, user_input: str) -> str:

        """

        Process user input and route to the appropriate handler.


        The LLM decides whether the input is a task request, a tool

        creation request, or a conversational message, and responds

        with a JSON action object or plain text accordingly.

        """

        self.conversation_history.append({

            "role": "user",

            "content": user_input

        })


        response = await self.llm.chat(self.conversation_history)


        try:

            parsed = json.loads(response.strip())

            action = parsed.get("action")


            if action == "create_task":

                task_desc = parsed.get("task_description", user_input)

                task_id = await self.worker_manager.assign_task(task_desc)

                reply = (

                    f"Task assigned (ID: {task_id}). "

                    f"A worker is now executing it. "

                    f"Results will appear shortly."

                )

                self.conversation_history.append({

                    "role": "assistant",

                    "content": reply

                })

                return reply


            elif action == "create_tool":

                tool_desc = parsed.get("tool_description", "")

                await self._request_tool_creation(tool_desc)

                reply = (

                    f"I have asked the tool agent to create a new tool: "

                    f"'{tool_desc}'. You will be notified when it is ready."

                )

                self.conversation_history.append({

                    "role": "assistant",

                    "content": reply

                })

                return reply


        except (json.JSONDecodeError, AttributeError):

            pass


        self.conversation_history.append({

            "role": "assistant",

            "content": response

        })

        return response


    async def _request_tool_creation(self, tool_description: str):

        """Send a tool creation request to the tool agent."""

        request = Message(

            message_type=MessageType.AGENT_NOTIFICATION,

            sender=self.name,

            recipient="tool_agent",

            payload={

                "request_type": "create_tool",

                "tool_description": tool_description,

                "requester": self.name

            }

        )

        await self.bus.send(request)


    async def run_background_tasks(self):

        """Read user input asynchronously."""

        loop = asyncio.get_running_loop()

        print("\nOctopussy Light is ready. Type 'quit' or 'exit' to stop.\n")


        while self.running:

            try:

                print("You: ", end="", flush=True)

                user_input = await loop.run_in_executor(

                    None, sys.stdin.readline

                )

                user_input = user_input.strip()


                if not user_input:

                    continue


                if user_input.lower() in ("quit", "exit"):

                    self.running = False

                    print("Goodbye!")

                    break


                response = await self.chat(user_input)

                print(f"\nCoordinator: {response}\n")


            except Exception as e:

                self.logger.error(f"Error in input loop: {e}")


Let us trace through a tool creation scenario. The user asks for a weather tool. The coordinator's LLM responds with {"action": "create_tool", "tool_description": "Fetch current weather for a city"}. The coordinator sends a creation request to the tool agent. The tool agent asks its LLM to generate the function, tests the syntax and structure by writing the code to a temp file and running a separate test script against it, appends the validated code to the built-in tools server file, and notifies the coordinator:


You: I need a tool that can fetch the current weather for a city.


Coordinator: I have asked the tool agent to create a new tool:

'Fetch current weather for a city using the wttr.in service'.

You will be notified when it is ready.


[tool_agent] 10:01:30 INFO: Tool creation requested: Fetch current weather...

[tool_agent] 10:01:32 INFO: Generated tool code. Running sandbox tests...

[tool_agent] 10:01:33 INFO: Sandbox tests passed. Adding tool to server.

[tool_agent] 10:01:33 INFO: Tool code appended to builtin_tools_server.py.


[TOOL AGENT] New tool created: 'Fetch current weather for a city using

the wttr.in service'. The tool is now available as 'get_current_weather'

from the next system startup.
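Before moving on, note that the markdown-fence-stripping logic appears verbatim in both _generate_tool_code and _fix_tool_code. If you prefer to avoid the duplication, it can be factored into a small helper such as the sketch below (this is optional; the inlined versions above behave identically):

```python
def strip_markdown_fences(text: str) -> str:
    """Remove a surrounding markdown code fence, if present.

    LLMs often wrap generated code in ```python ... ``` fences despite
    being told not to. This helper returns the bare code in that case,
    and the stripped input unchanged otherwise.
    """
    text = text.strip()
    if not text.startswith("```"):
        return text
    lines = text.splitlines()
    # Find the last line that is a closing fence.
    end_index = len(lines)
    for i in range(len(lines) - 1, 0, -1):
        if lines[i].strip() == "```":
            end_index = i
            break
    # Drop the opening fence line (e.g. "```python") and the closing fence.
    return "\n".join(lines[1:end_index]).strip()
```

Both tool-agent methods could then call strip_markdown_fences(code) in place of their inline loops.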


CHAPTER NINE: INCREMENT EIGHT - REMOTE MCP SERVERS


The final major feature of Octopussy Light is support for remote MCP servers. Users can configure external MCP servers in config.yaml, and the agents can use the tools those servers expose. This opens up a vast ecosystem of capabilities: specialized data sources, enterprise APIs, domain-specific tools, and more. The MCPServerRegistry we built in Increment 6 already handles multiple servers; we simply need to configure additional entries.


Let us expand the MCP server configuration in config.yaml to show the full range of options:


# config.yaml (expanded mcp_servers section for Increment 8)


mcp_servers:

  # Built-in server (always present)

  - name: "builtin"

    type: "local"

    command: ["python", "mcp_server/builtin_tools_server.py"]


  # Example: a custom local MCP server for specialized tasks

  # - name: "data_analysis"

  #   type: "local"

  #   command: ["python", "/path/to/data_analysis_mcp_server.py"]


  # Example: a remote HTTP MCP server

  # - name: "company_data"

  #   type: "http"

  #   url: "https://mcp.yourcompany.com/tools"

  #   api_key: "your_api_key_here"


The MCPServerRegistry already handles the routing logic: when a worker calls a tool by name, the registry looks up which server hosts that tool and routes the call there automatically. Adding a new server is purely a configuration change. No code changes are required.


When multiple servers are connected, the registry merges their tool lists. If two servers expose a tool with the same name, the registry logs a warning and the later server's tool takes precedence. This makes it straightforward to override a built-in tool with a custom implementation simply by adding a server that exposes a tool with the same name.
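The precedence rule can be sketched as a plain dictionary merge. This is illustrative only; the real MCPServerRegistry tracks live server connections rather than bare dicts, but the name-resolution logic is essentially this:

```python
import logging

logger = logging.getLogger("mcp_registry")

def merge_tool_maps(server_tool_lists):
    """Build a tool-name -> server-name routing table.

    `server_tool_lists` is an iterable of (server_name, [tool_name, ...])
    pairs in configuration order. On a name collision, the server that
    appears later in the configuration wins and a warning is logged.
    """
    tool_to_server = {}
    for server_name, tool_names in server_tool_lists:
        for tool_name in tool_names:
            if tool_name in tool_to_server:
                logger.warning(
                    "Tool '%s' from server '%s' overrides the one from '%s'.",
                    tool_name, server_name, tool_to_server[tool_name]
                )
            tool_to_server[tool_name] = server_name
    return tool_to_server
```

With this rule, overriding the built-in read_file is just a matter of listing a second server that exposes a tool named read_file after the builtin entry in config.yaml.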


CHAPTER TEN: THE COMPLETE SYSTEM - FINAL ASSEMBLY


We have now built all the components of Octopussy Light. Let us put together the final, complete version of main.py that wires everything together, and then review the complete project structure:


# main.py (Final - Complete Octopussy Light System)

#

# This is the complete entry point for Octopussy Light.

# It initializes all components, connects to MCP servers,

# and runs all agents concurrently.


import asyncio

import yaml

import sys


from messaging.message_bus import MessageBus

from coordinator.coordinator_agent import CoordinatorAgent

from agents.mail_agent import MailAgent

from agents.calendar_agent import CalendarAgent

from agents.tool_agent import ToolAgent

from mcp_client.mcp_server_registry import MCPServerRegistry

from utils.logger import get_logger


logger = get_logger("system", "system")



def load_config(path: str = "config.yaml") -> dict:

    """Load and parse the YAML configuration file."""

    try:

        with open(path, "r") as f:

            return yaml.safe_load(f)

    except FileNotFoundError:

        logger.error(f"Configuration file not found: {path}")

        sys.exit(1)

    except yaml.YAMLError as e:

        logger.error(f"Error parsing configuration file: {e}")

        sys.exit(1)



async def main():

    """

    The complete Octopussy Light startup sequence.


    1. Load configuration.

    2. Initialize the message bus.

    3. Connect to all configured MCP servers.

    4. Initialize all agents with the MCP registry.

    5. Run all agents concurrently.

    6. Handle graceful shutdown on exit.

    """

    logger.info("=" * 60)

    logger.info("Starting Octopussy Light - Complete System")

    logger.info("=" * 60)


    config = load_config()


    # Step 1: Initialize the message bus.

    bus = MessageBus()


    # Step 2: Initialize and connect the MCP server registry.

    mcp_configs = config.get("mcp_servers", [])

    mcp_registry = MCPServerRegistry(mcp_configs)


    logger.info("Connecting to MCP servers...")

    await mcp_registry.connect_all()

    logger.info("MCP servers connected.")


    # Step 3: Initialize all agents.

    # The coordinator receives the MCP registry and passes it to the

    # WorkerManager, which in turn passes it to each new worker agent.

    coordinator = CoordinatorAgent(config, bus, mcp_registry=mcp_registry)

    mail_agent = MailAgent(config, bus)

    calendar_agent = CalendarAgent(config, bus)

    tool_agent = ToolAgent(config, bus)


    all_agents = [coordinator, mail_agent, calendar_agent, tool_agent]


    logger.info("All agents initialized. Starting Octopussy Light...")


    try:

        # Step 4: Run all agents concurrently.

        await asyncio.gather(

            *[agent.start() for agent in all_agents]

        )

    finally:

        # Step 5: Graceful shutdown.

        logger.info("Stopping all agents...")

        for agent in all_agents:

            await agent.stop()


        logger.info("Disconnecting from MCP servers...")

        await mcp_registry.disconnect_all()


        logger.info("Octopussy Light has shut down cleanly.")



if __name__ == "__main__":

    try:

        asyncio.run(main())

    except KeyboardInterrupt:

        print("\nShutting down Octopussy Light...")
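One optional hardening step on top of this entry point, sketched below: asyncio.run() already turns Ctrl-C into KeyboardInterrupt, but installing a handler for SIGTERM as well lets an external supervisor (for example, `docker stop`) trigger the same clean shutdown path instead of killing the process abruptly. This is a POSIX-only sketch, not part of the main.py above:

```python
import asyncio
import signal

async def run_until_signalled():
    """Block until SIGINT or SIGTERM arrives, then return for cleanup.

    In main(), the agents would be started before this point, and this
    await would replace waiting on asyncio.gather() directly; agent
    shutdown and MCP disconnection would follow after it returns.
    """
    stop_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    # add_signal_handler is only available on Unix event loops.
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop_event.set)
    await stop_event.wait()
```

The shutdown logic in main()'s finally block then runs regardless of whether the process was stopped by a keyboard interrupt or by a termination signal.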


The complete project structure for Octopussy Light is as follows:


octopussy_light/

    config.yaml                         (configuration)

    main.py                             (entry point)

    requirements.txt                    (dependencies)

    memory/                             (created at runtime by ChromaDB)

        worker_<id>/                    (per-worker ChromaDB stores)

    coordinator/

        __init__.py

        coordinator_agent.py            (coordinator agent - final version)

        worker_manager.py               (worker lifecycle management)

        prompt_optimizer.py             (task prompt optimization)

    agents/

        __init__.py

        base_agent.py                   (abstract base class)

        mail_agent.py                   (email monitoring)

        calendar_agent.py               (calendar monitoring)

        worker_agent.py                 (task execution with MCP tools)

        tool_agent.py                   (dynamic tool creation)

    messaging/

        __init__.py

        message_bus.py                  (asyncio-based message routing)

        message_types.py                (message type definitions)

    llm/

        __init__.py

        llm_client.py                   (unified LLM client)

    mcp_server/

        builtin_tools_server.py         (predefined MCP tools server)

    mcp_client/

        __init__.py

        mcp_tool_client.py              (MCP client for agents)

        mcp_server_registry.py          (multi-server management)

    utils/

        __init__.py

        logger.py                       (colored logging)


All __init__.py files are empty unless otherwise shown. The memory/ directory is created automatically by ChromaDB when the first worker agent initializes its long-term memory store.


CHAPTER ELEVEN: A COMPLETE INTERACTION WALKTHROUGH


To make the full system concrete, let us walk through a complete interaction session that exercises all the major components.


The user starts Octopussy Light:


[system] 10:00:00 INFO: Starting Octopussy Light - Complete System

[system] 10:00:00 INFO: Connecting to MCP servers...

[mcp_registry] 10:00:01 INFO: Connected to MCP server 'builtin'.

Available tools: ['read_file', 'write_file', 'list_directory',

'execute_python']

[coordinator] 10:00:01 INFO: Coordinator initialized with model: llama3.2

[mail_agent] 10:00:01 INFO: Mail agent initialized. Polling every 60s.

[calendar_agent] 10:00:01 INFO: Calendar agent initialized.

[tool_agent] 10:00:01 INFO: Tool agent initialized.


Octopussy Light is ready. Type 'quit' or 'exit' to stop.


You: Hello! What can you do?


Coordinator: Hello! I am Octopussy Light, your personal agentic AI assistant.

Here is what I can do for you: I monitor your email inbox and notify you when

messages matching your configured filters arrive. I watch your calendar and

alert you to upcoming events. I can spawn worker agents to execute complex

tasks like research, writing, coding, and data analysis. Workers have access

to built-in tools including file reading and writing, directory listing, and

sandboxed Python code execution. I can also create entirely new tools on

demand if you need a capability that does not yet exist. What would you like

to do?


You: Please analyze the file /tmp/sales_data.csv and give me a summary.


Coordinator: Task assigned (ID: f3a9b1c2). A worker is now executing it.

Results will appear shortly.


[worker_f3a9b1] 10:00:15 INFO: Received task f3a9b1c2.

[worker_f3a9b1] 10:00:16 INFO: Task f3a9b1c2: calling tool 'read_file'

with args: {'file_path': '/tmp/sales_data.csv'}


[TASK RESULT] Task f3a9b1c2 (response #1):

I have analyzed the sales data file. Here is the summary:


The CSV file contains 1,247 rows of sales transactions from Q3 2025.

Total revenue: $2,847,392. Top performing product: Widget Pro (34% of

revenue). The Western region outperformed all others by 23%. There is

a notable spike in sales during the week of September 15th, likely

corresponding to a promotional event. Average transaction value: $2,283.


You: I need a tool that can convert Celsius to Fahrenheit.


Coordinator: I have asked the tool agent to create a new tool:

'Convert temperature values between Celsius and Fahrenheit'. You will

be notified when it is ready.


[tool_agent] 10:01:30 INFO: Tool creation requested.

[tool_agent] 10:01:32 INFO: Generated tool code. Running sandbox tests...

[tool_agent] 10:01:33 INFO: Sandbox tests passed. Adding tool to server.


[TOOL AGENT] New tool created: 'Convert temperature values between Celsius

and Fahrenheit'. The tool is now available as 'celsius_to_fahrenheit'

from the next system startup.
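For reference, the generated tool might look something like the sketch below. This is illustrative only; the actual LLM output will vary from run to run. The stub decorator exists purely so the sketch runs standalone — in builtin_tools_server.py, mcp is the real FastMCP instance, and the generated function is appended with no imports or other code around it:

```python
# Stub so this sketch is self-contained; in builtin_tools_server.py the
# @mcp.tool() decorator comes from the real FastMCP instance.
class _StubMCP:
    def tool(self):
        def decorator(fn):
            return fn
        return decorator

mcp = _StubMCP()

@mcp.tool()
def celsius_to_fahrenheit(celsius: float) -> float:
    """Convert a temperature from degrees Celsius to degrees Fahrenheit."""
    return celsius * 9.0 / 5.0 + 32.0
```

Note that this hypothetical output would pass all three sandbox checks: it parses, defines exactly one function, and that function has a docstring.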



This walkthrough demonstrates the full system in action: conversational interaction with the coordinator, task delegation to a worker that uses predefined MCP tools, dynamic tool creation by the tool agent, and background notifications from the mail and calendar agents, all happening concurrently in a single terminal session.


CONCLUSION: WHAT YOU HAVE BUILT AND WHERE TO GO NEXT


You have built Octopussy Light from scratch, starting with a simple conversational coordinator and incrementally adding every major component of a production-grade agentic AI system. Let us take stock of what the complete system contains.

The coordinator agent serves as the brain of the system, communicating with the user, routing requests to the appropriate agents, displaying notifications, and managing the overall workflow. It uses an LLM to distinguish between conversational messages, task requests, and tool creation requests, and it delegates each to the appropriate handler.

The mail agent runs in the background, polling an IMAP inbox at a configurable interval, applying regular expression filters to incoming messages, summarizing matches with an LLM, and notifying the coordinator. It handles the full complexity of email parsing, including multipart messages and encoded headers.

The calendar agent monitors either a local .ics file or Google Calendar, checking for events that fall within a configurable notification window and alerting the coordinator in advance. It handles the asynchronous integration of synchronous APIs using thread pool executors.

The worker agents are the workhorses of the system. They are created on demand, reused when idle, and cleaned up when they are no longer needed. Each worker has short-term memory (conversation history for the current task) and long-term memory (a ChromaDB vector store). Workers implement the ReAct agentic loop, reasoning about tasks, calling MCP tools, observing results, and continuing until they produce a final answer.

The predefined MCP tools — read_file, write_file, list_directory, and execute_python — give workers immediate real-world capabilities from the moment the system starts. These tools are exposed by the built-in MCP tools server and discovered automatically by the registry at startup.

The prompt optimizer improves task descriptions before they reach workers, using an LLM to add clarity, specificity, and structure to raw user requests. This consistently improves the quality of worker outputs.

The tool agent is the most remarkable component: it can create entirely new MCP tools on demand, using an LLM to generate the code, testing it safely by writing the code to a dedicated temp file and running a separate validation script against it, and deploying it to the system without any manual intervention. New tools are appended to the built-in tools server file and become available from the next system startup.

The MCP server registry manages connections to all configured MCP servers, both local and remote, providing a unified tool discovery and invocation interface to all agents. Adding a new server requires only a configuration change. The message bus ties everything together, routing messages between agents using asyncio queues with backpressure and clean shutdown support. The base agent's message loop guarantees that queue.task_done() is always called — even when message handling raises an exception — preventing any future use of queue.join() from blocking indefinitely.

Where should you go from here? There are several natural extensions to explore. You could, for example, add a web search tool to the built-in MCP server, giving workers the ability to retrieve current information from the internet; implement the IMAP IDLE command in the mail agent to receive instant notifications instead of polling; add a proper REST API to the coordinator so that external systems can send tasks and receive results programmatically; or containerize the system with Docker to make it portable and deployable.

You could add more sophisticated memory management, such as automatic summarization of old conversation history and relevance-based pruning of long-term memories, or implement scheduled tasks, where a worker repeats a task periodically and sends multiple responses over time. You could also explore the MCP 2025-11-25 Tasks primitive to convert long-running worker tasks into proper async MCP tasks, enabling clients to poll for results rather than waiting for push notifications.

Finally, you should harden the code and add proper security measures: Octopussy Light currently stores authentication credentials in plain text.
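The scheduled-tasks idea can be sketched with a small asyncio helper. This is a sketch under assumed interfaces, not part of the repository: a real implementation would run inside a worker agent and publish each result on the message bus rather than collecting them in a list.

```python
import asyncio

async def run_periodic_task(execute, interval_seconds, repetitions):
    """Run `execute` every `interval_seconds`, `repetitions` times.

    `execute` is an async callable representing one execution of the
    task; each result is collected and returned when all repetitions
    are done. A worker would instead send each result to the
    coordinator as a TASK_RESULT message as soon as it is produced.
    """
    results = []
    for _ in range(repetitions):
        results.append(await execute())
        await asyncio.sleep(interval_seconds)
    return results
```

Because the loop awaits between repetitions, the worker's message handling and the rest of the system keep running concurrently while the schedule ticks.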

Octopussy Light is not just a tutorial project. It is a genuine, working foundation for building sophisticated agentic AI systems. The architecture is clean, the components are modular, and every piece is designed to be extended. The octopus has eight arms, and you have built all of them. Now it is time to teach them to swim.