"Architecture is the art of how to waste space." — Philip Johnson. In Agentic AI, architecture is the art of how not to waste tokens, time, or trust.
Preface
The emergence of Large Language Models (LLMs) as reasoning engines has fundamentally disrupted how we think about software architecture. For decades, software systems were deterministic: given the same input, you could guarantee the same output. Agentic AI systems shatter that assumption. They reason, plan, delegate, reflect, and sometimes surprise you — occasionally in ways you did not intend.
This creates a fascinating and urgent challenge for software architects. The patterns we relied upon for microservices, event-driven systems, and distributed computing still apply — but they must be extended, adapted, and in some cases entirely reimagined to accommodate agents that think, tools that can have side effects, and pipelines where a single hallucination can cascade into a catastrophic failure.
This article presents a comprehensive catalog of software architecture patterns for Agentic AI systems. Each pattern is described with its intent, motivation, structure, implementation guidance, trade-offs, and real-world applicability. Whether you are building a single autonomous agent or a fleet of collaborating AI workers, these patterns will help you design systems that are reliable, maintainable, observable, and safe.
The patterns are organized into six thematic chapters:
- Foundational Agent Patterns — the core building blocks
- Memory and State Patterns — managing what agents know and remember
- Multi-Agent Coordination Patterns — how agents work together
- Reliability and Safety Patterns — keeping systems trustworthy
- Integration Patterns — connecting agents to the broader world
- Anti-Patterns — what to actively avoid
Let us begin.
CHAPTER 1 — Foundational Agent Patterns
The patterns in this section form the bedrock of any agentic system. Before you can build a fleet of collaborating agents, you need to understand how a single agent reasons, acts, manages its tools, and remains model-agnostic. These are not optional abstractions — they are the vocabulary of agentic software design.
1.1 — ReAct Loop Pattern
The Canonical Reason-Act-Observe Cycle
Intent
Enable an agent to interleave reasoning and acting in an iterative loop, observing the results of each action before deciding what to do next. This creates a dynamic, adaptive execution model that is far more powerful than a single-shot prompt-response interaction.
Motivation
Early LLM applications were essentially sophisticated autocomplete engines: you sent a prompt, you received a response, and that was the end of the interaction. This works well for simple tasks — summarizing a document, drafting an email, translating text — but it breaks down completely for complex, multi-step tasks where the agent needs to gather information, make decisions based on intermediate results, and adapt its plan as new data arrives.
The ReAct pattern (Reasoning + Acting), introduced in the seminal 2022 paper "ReAct: Synergizing Reasoning and Acting in Language Models" by Yao et al., addresses this limitation by structuring agent execution as a loop with three distinct phases:
- Reason (Thought): The agent articulates its current understanding of the problem and decides what to do next.
- Act (Action): The agent executes a specific action — typically a tool call — based on its reasoning.
- Observe (Observation): The agent receives the result of the action and incorporates it into its context.
This loop continues until the agent determines that it has sufficient information to produce a final answer, or until a termination condition is met.
Structure
┌─────────────────────────────────────────────────────┐
│ ReAct Agent │
│ │
│ ┌──────────┐ ┌──────────┐ ┌─────────────┐ │
│ │ REASON │───▶│ ACT │───▶│ OBSERVE │ │
│ │ (Thought)│ │ (Action) │ │(Observation)│ │
│ └──────────┘ └──────────┘ └──────┬──────┘ │
│ ▲ │ │
│ └─────────────────────────────────┘ │
│ │
│ ┌─────────┐ │
│ │ FINISH │ ◀── Terminal State │
│ └─────────┘ │
└─────────────────────────────────────────────────────┘
Implementation
A ReAct loop is typically implemented as a while loop that continues until the agent signals completion. The agent's "scratchpad" — the accumulated history of thoughts, actions, and observations — is passed back to the LLM on each iteration, allowing it to build on prior reasoning.
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
class AgentState(Enum):
REASONING = "reasoning"
ACTING = "acting"
OBSERVING = "observing"
FINISHED = "finished"
@dataclass
class AgentStep:
thought: str
action: Optional[str] = None
action_input: Optional[dict] = None
observation: Optional[str] = None
@dataclass
class ReActAgent:
llm_client: "LLMClient"
tool_registry: "ToolRegistry"
system_prompt: str
max_iterations: int = 10
scratchpad: list[AgentStep] = field(default_factory=list)
def run(self, user_query: str) -> str:
"""
Execute the ReAct loop until a final answer is produced
or the maximum iteration limit is reached.
"""
iteration = 0
while iteration < self.max_iterations:
iteration += 1
# Phase 1: REASON
# Build the full context from scratchpad history
context = self._build_context(user_query)
llm_response = self.llm_client.complete(
system=self.system_prompt,
user=context
)
# Parse the LLM response into thought + action
step = self._parse_llm_response(llm_response)
self.scratchpad.append(step)
# Check for terminal state
if step.action == "FINISH":
return step.action_input.get("answer", "No answer produced.")
# Phase 2: ACT
if step.action:
tool = self.tool_registry.get(step.action)
if not tool:
step.observation = f"Error: Tool '{step.action}' not found."
else:
# Phase 3: OBSERVE
try:
result = tool.execute(**step.action_input)
step.observation = str(result)
except Exception as e:
step.observation = f"Tool execution error: {str(e)}"
return "Maximum iterations reached without a final answer."
def _build_context(self, query: str) -> str:
"""Serialize the scratchpad into a prompt-friendly format."""
context = f"Question: {query}\n\n"
for step in self.scratchpad:
context += f"Thought: {step.thought}\n"
if step.action:
context += f"Action: {step.action}\n"
context += f"Action Input: {step.action_input}\n"
if step.observation:
context += f"Observation: {step.observation}\n\n"
context += "Thought:" # Prompt the next reasoning step
return context
def _parse_llm_response(self, response: str) -> AgentStep:
"""
Parse LLM output into a structured AgentStep.
In production, use a more robust parser or structured outputs.
"""
# Simplified parsing logic — production systems should use
# structured output schemas (e.g., JSON mode or function calling)
lines = response.strip().split("\n")
thought = ""
action = None
action_input = {}
for line in lines:
if line.startswith("Thought:"):
thought = line.replace("Thought:", "").strip()
elif line.startswith("Action:"):
action = line.replace("Action:", "").strip()
elif line.startswith("Action Input:"):
import json
raw = line.replace("Action Input:", "").strip()
action_input = json.loads(raw)
return AgentStep(
thought=thought,
action=action,
action_input=action_input
)
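To make the control flow concrete, here is a minimal, self-contained sketch of the loop driven by a scripted stand-in for the LLM. The `FakeLLM` class and the `multiply` tool are hypothetical test fixtures, not part of the pattern itself, and the parsing is deliberately as naive as the `_parse_llm_response` sketch above:

```python
import json

class FakeLLM:
    """Hypothetical stand-in for a real LLM client: replays scripted ReAct turns."""
    def __init__(self, responses):
        self._responses = iter(responses)

    def complete(self, system: str, user: str) -> str:
        return next(self._responses)

# A toy tool table standing in for the ToolRegistry
TOOLS = {"multiply": lambda a, b: a * b}

def react_loop(llm, query: str, max_iterations: int = 5) -> str:
    scratchpad: list[str] = []
    for _ in range(max_iterations):
        # REASON: ask the LLM for the next thought + action
        context = f"Question: {query}\n" + "\n".join(scratchpad) + "\nThought:"
        response = llm.complete(system="You are a ReAct agent.", user=context)
        fields = dict(line.split(":", 1) for line in response.strip().split("\n"))
        action = fields["Action"].strip()
        action_input = json.loads(fields["Action Input"].strip())
        scratchpad.append(f"Thought: {fields['Thought'].strip()}\nAction: {action}")
        if action == "FINISH":                       # terminal state
            return action_input["answer"]
        observation = TOOLS[action](**action_input)  # ACT
        scratchpad.append(f"Observation: {observation}")  # OBSERVE
    return "Maximum iterations reached."

llm = FakeLLM([
    'Thought: I should multiply 6 by 7.\nAction: multiply\nAction Input: {"a": 6, "b": 7}',
    'Thought: The product is 42.\nAction: FINISH\nAction Input: {"answer": "42"}',
])
print(react_loop(llm, "What is 6 times 7?"))  # → 42
```

Because each scripted turn flows through the same reason-act-observe path a real model would, this shape is also convenient for unit-testing agent plumbing without paying for LLM calls.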
Trade-offs
| Dimension | Benefit | Cost |
|---|---|---|
| Flexibility | Adapts dynamically to intermediate results | Unpredictable number of LLM calls |
| Transparency | Reasoning chain is fully visible | Scratchpad grows large, consuming tokens |
| Correctness | Can self-correct based on observations | May loop or hallucinate tool calls |
| Quality vs. latency | Produces better answers for complex tasks | Sequential LLM calls add significant latency |
When to Use
- Tasks requiring information gathering from multiple sources
- Problems where the path to the answer is not known upfront
- Scenarios where intermediate results influence subsequent steps
- Any task that benefits from dynamic tool selection
When Not to Use
- Simple, single-step tasks where a direct prompt suffices
- Latency-critical applications where sequential LLM calls are prohibitive
- Tasks with a fixed, well-known execution path (use Plan-and-Execute instead)
1.2 — Plan-and-Execute Pattern
Separate Planning from Execution for Better Control
Intent
Decompose agent execution into two distinct phases: a planning phase, where a high-level plan is generated upfront, and an execution phase, where each step of the plan is carried out sequentially or in parallel. This separation provides greater predictability, observability, and control compared to the fully dynamic ReAct loop.
Motivation
The ReAct loop is powerful, but its dynamism is a double-edged sword. Because the agent decides its next action at every iteration, the overall execution path is opaque until it completes. This makes it difficult to:
- Preview what the agent is going to do before it does it
- Parallelize independent steps
- Interrupt execution at a meaningful boundary
- Audit the agent's decision-making process
The Plan-and-Execute pattern addresses these concerns by front-loading the reasoning. A Planner agent (or a planning-specific prompt) generates a structured, multi-step plan. An Executor then carries out each step, potentially using a simpler, cheaper model for execution since the hard reasoning has already been done.
This mirrors how human teams operate: a senior architect designs the system; junior engineers implement the components. The planner is the architect; the executor is the engineer.
Structure
┌────────────────────────────────────────────────────────────┐
│ Plan-and-Execute Agent │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ PLANNER │ │
│ │ Input: User Query │ │
│ │ Output: Structured Plan (list of steps) │ │
│ └──────────────────────┬──────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ EXECUTOR │ │
│ │ │ │
│ │ Step 1 ──▶ Tool Call ──▶ Result │ │
│ │ Step 2 ──▶ Tool Call ──▶ Result │ │
│ │ Step 3 ──▶ Tool Call ──▶ Result │ │
│ │ ... │ │
│ │ Step N ──▶ Final Answer │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Optional: Re-Planner (updates plan based on results) │
└────────────────────────────────────────────────────────────┘
Implementation
from dataclasses import dataclass, field
from typing import Optional
import json
@dataclass
class PlanStep:
step_id: int
description: str
tool: str
tool_input: dict
depends_on: list[int] = field(default_factory=list)
result: Optional[str] = None
status: str = "pending" # pending | running | complete | failed
@dataclass
class ExecutionPlan:
goal: str
steps: list[PlanStep]
class Planner:
"""
Responsible for generating a structured execution plan
from a high-level user goal.
"""
def __init__(self, llm_client: "LLMClient"):
self.llm = llm_client
self.system_prompt = """
You are a planning agent. Given a user goal, produce a structured
JSON execution plan with numbered steps. Each step must specify:
- step_id: integer
- description: what this step accomplishes
- tool: the tool to invoke
- tool_input: parameters for the tool
- depends_on: list of step_ids that must complete first
Return ONLY valid JSON. No prose.
"""
def create_plan(self, goal: str) -> ExecutionPlan:
response = self.llm.complete(
system=self.system_prompt,
user=f"Goal: {goal}"
)
raw_plan = json.loads(response)
steps = [PlanStep(**step) for step in raw_plan["steps"]]
return ExecutionPlan(goal=goal, steps=steps)
def replan(self, original_plan: ExecutionPlan, failed_step: PlanStep,
error: str) -> ExecutionPlan:
"""
Optionally regenerate the plan when a step fails.
This is the 'Re-Planner' component.
"""
context = f"""
Original goal: {original_plan.goal}
Completed steps: {[s for s in original_plan.steps if s.status == 'complete']}
Failed step: {failed_step}
Error: {error}
Please generate a revised plan to complete the remaining goal.
"""
response = self.llm.complete(
system=self.system_prompt,
user=context
)
raw_plan = json.loads(response)
steps = [PlanStep(**step) for step in raw_plan["steps"]]
return ExecutionPlan(goal=original_plan.goal, steps=steps)
class Executor:
"""
Responsible for executing each step of a plan,
respecting dependencies between steps.
"""
def __init__(self, tool_registry: "ToolRegistry"):
self.tool_registry = tool_registry
def execute_plan(self, plan: ExecutionPlan) -> dict[int, str]:
results = {}
for step in self._topological_sort(plan.steps):
step.status = "running"
# Inject results from dependent steps into tool_input
enriched_input = self._inject_dependencies(
step.tool_input, step.depends_on, results
)
tool = self.tool_registry.get(step.tool)
if not tool:
step.status = "failed"
step.result = f"Tool '{step.tool}' not found."
results[step.step_id] = step.result
continue
try:
result = tool.execute(**enriched_input)
step.result = str(result)
step.status = "complete"
results[step.step_id] = step.result
except Exception as e:
step.status = "failed"
step.result = f"Execution error: {str(e)}"
results[step.step_id] = step.result
return results
def _topological_sort(self, steps: list[PlanStep]) -> list[PlanStep]:
"""Sort steps respecting dependency order."""
sorted_steps = []
visited = set()
def visit(step: PlanStep):
if step.step_id in visited:
return
for dep_id in step.depends_on:
dep = next(s for s in steps if s.step_id == dep_id)
visit(dep)
visited.add(step.step_id)
sorted_steps.append(step)
for step in steps:
visit(step)
return sorted_steps
def _inject_dependencies(self, tool_input: dict,
depends_on: list[int],
results: dict[int, str]) -> dict:
"""Replace dependency placeholders with actual results."""
enriched = dict(tool_input)
for dep_id in depends_on:
placeholder = f"{{step_{dep_id}_result}}"
for key, value in enriched.items():
if isinstance(value, str) and placeholder in value:
enriched[key] = value.replace(placeholder, results.get(dep_id, ""))
return enriched
class PlanAndExecuteAgent:
def __init__(self, planner: Planner, executor: Executor,
enable_replanning: bool = True):
self.planner = planner
self.executor = executor
self.enable_replanning = enable_replanning
    def run(self, goal: str) -> str:
        plan = self.planner.create_plan(goal)
        results = self.executor.execute_plan(plan)
        # If a step failed and replanning is enabled, ask the planner
        # for a revised plan and execute it once more
        failed = next((s for s in plan.steps if s.status == "failed"), None)
        if failed and self.enable_replanning:
            plan = self.planner.replan(plan, failed, failed.result or "")
            results = self.executor.execute_plan(plan)
        # Collect final output from the last step
        final_step = max(plan.steps, key=lambda s: s.step_id)
        return results.get(final_step.step_id, "Execution completed.")
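Before wiring in a real Planner, the Executor's dependency mechanics can be exercised with a hand-written plan. The sketch below is a condensed, self-contained version of the topological sort and placeholder injection shown above; the `fetch_price` and `summarize` tools, and their canned outputs, are hypothetical fixtures:

```python
from dataclasses import dataclass, field

@dataclass
class PlanStep:
    step_id: int
    description: str
    tool: str
    tool_input: dict
    depends_on: list[int] = field(default_factory=list)

# Hypothetical tools with canned outputs, standing in for a ToolRegistry
TOOLS = {
    "fetch_price": lambda symbol: "190.50",
    "summarize": lambda text: f"Summary: {text}",
}

def topological_sort(steps: list[PlanStep]) -> list[PlanStep]:
    ordered, visited = [], set()
    def visit(step: PlanStep) -> None:
        if step.step_id in visited:
            return
        for dep_id in step.depends_on:  # dependencies execute first
            visit(next(s for s in steps if s.step_id == dep_id))
        visited.add(step.step_id)
        ordered.append(step)
    for step in steps:
        visit(step)
    return ordered

def execute_plan(steps: list[PlanStep]) -> dict[int, str]:
    results: dict[int, str] = {}
    for step in topological_sort(steps):
        kwargs = dict(step.tool_input)
        for dep_id in step.depends_on:  # inject upstream results
            placeholder = f"{{step_{dep_id}_result}}"
            kwargs = {k: v.replace(placeholder, results[dep_id])
                      if isinstance(v, str) else v
                      for k, v in kwargs.items()}
        results[step.step_id] = str(TOOLS[step.tool](**kwargs))
    return results

# Deliberately listed out of order: the sort runs step 1 before step 2
plan = [
    PlanStep(2, "Summarize the price", "summarize",
             {"text": "AAPL trades at {step_1_result}"}, depends_on=[1]),
    PlanStep(1, "Fetch the stock price", "fetch_price", {"symbol": "AAPL"}),
]
print(execute_plan(plan)[2])  # → Summary: AAPL trades at 190.50
```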
Trade-offs
| Dimension | Benefit | Cost |
|---|---|---|
| Predictability | Full plan visible before execution begins | Plan may be wrong; requires replanning logic |
| Parallelism | Independent steps can run concurrently | Dependency graph adds complexity |
| Cost | Executor can use cheaper models | Planner requires a capable, expensive model |
| Control | Easy to insert human approval at plan stage | Less adaptive than ReAct for dynamic tasks |
1.3 — Reflection Pattern
Agents That Critique and Revise Their Own Outputs
Intent
Introduce a self-critique loop in which an agent evaluates its own output against a set of quality criteria and iteratively revises it until the output meets a defined standard or a maximum revision count is reached.
Motivation
LLMs are remarkably capable at generating plausible-sounding text, but "plausible" is not the same as "correct," "complete," or "well-structured." A first-pass response from an LLM is often good but rarely optimal. Human experts routinely review and revise their own work — why shouldn't AI agents?
The Reflection pattern formalizes this self-improvement loop. It is inspired by the Constitutional AI approach from Anthropic and the Self-Refine paper by Madaan et al. (2023), which demonstrated that LLMs can meaningfully improve their outputs through iterative self-critique.
The pattern introduces three conceptual roles (which may be implemented as a single LLM with different prompts, or as separate models):
- Generator: Produces an initial output
- Critic: Evaluates the output and provides structured feedback
- Reviser: Incorporates the critique to produce an improved output
Structure
┌──────────────────────────────────────────────────────────┐
│ Reflection Agent │
│ │
│ User Input │
│ │ │
│ ▼ │
│ ┌─────────┐ │
│ │GENERATOR│ ──▶ Initial Output │
│ └─────────┘ │ │
│ ▼ │
│ ┌─────────┐ │
│ │ CRITIC │ ──▶ Critique + Score │
│ └─────────┘ │ │
│ ▼ │
│ Score ≥ Threshold? │
│ ┌────────────────┐ │
│ Yes │ No │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ FINISH ┌──────────────────┐ │
│ │ REVISER │ │
│ │ (Improved Output)│ │
│ └────────┬─────────┘ │
│ │ │
│ Back to CRITIC │
└──────────────────────────────────────────────────────────┘
Implementation
from dataclasses import dataclass
from typing import Optional
import json
@dataclass
class CritiqueResult:
score: float # 0.0 to 1.0
issues: list[str] # Specific problems identified
suggestions: list[str] # Actionable improvement suggestions
is_acceptable: bool # Whether output meets the quality bar
class ReflectionAgent:
"""
An agent that generates, critiques, and revises its own output
until quality criteria are met.
"""
GENERATOR_PROMPT = """
You are an expert assistant. Produce a high-quality response to the
user's request. Be thorough, accurate, and well-structured.
"""
CRITIC_PROMPT = """
You are a rigorous quality evaluator. Analyze the provided output
against these criteria:
1. Accuracy: Is all information factually correct?
2. Completeness: Does it fully address the request?
3. Clarity: Is it well-structured and easy to understand?
4. Conciseness: Is it free of unnecessary verbosity?
Return a JSON object with:
- score: float between 0.0 and 1.0
- issues: list of specific problems
- suggestions: list of actionable improvements
- is_acceptable: boolean (true if score >= 0.8)
"""
REVISER_PROMPT = """
You are a skilled editor. Given an original output and a critique,
produce an improved version that addresses all identified issues.
Preserve what works well; fix what does not.
"""
def __init__(self, llm_client: "LLMClient", max_revisions: int = 3,
quality_threshold: float = 0.8):
self.llm = llm_client
self.max_revisions = max_revisions
self.quality_threshold = quality_threshold
def run(self, user_request: str) -> tuple[str, list[CritiqueResult]]:
"""
Returns the final output and the history of critiques.
"""
critique_history = []
# Step 1: Generate initial output
current_output = self.llm.complete(
system=self.GENERATOR_PROMPT,
user=user_request
)
for revision_num in range(self.max_revisions + 1):
# Step 2: Critique the current output
critique_response = self.llm.complete(
system=self.CRITIC_PROMPT,
user=f"Original Request: {user_request}\n\nOutput to Evaluate:\n{current_output}"
)
critique = self._parse_critique(critique_response)
critique_history.append(critique)
# Step 3: Check if quality threshold is met
if critique.is_acceptable or revision_num == self.max_revisions:
break
# Step 4: Revise based on critique
revision_prompt = f"""
Original Request: {user_request}
Current Output:
{current_output}
Critique:
Issues: {critique.issues}
Suggestions: {critique.suggestions}
Please produce an improved version.
"""
current_output = self.llm.complete(
system=self.REVISER_PROMPT,
user=revision_prompt
)
return current_output, critique_history
def _parse_critique(self, response: str) -> CritiqueResult:
data = json.loads(response)
return CritiqueResult(
score=data["score"],
issues=data["issues"],
suggestions=data["suggestions"],
is_acceptable=data.get("is_acceptable", data["score"] >= self.quality_threshold)
)
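A scripted run makes the generate-critique-revise cycle visible end to end. `ScriptedLLM` is a hypothetical test double that replays canned responses, and `reflect` is a condensed version of `ReflectionAgent.run` above:

```python
import json

class ScriptedLLM:
    """Hypothetical test double: replays canned responses in order."""
    def __init__(self, responses):
        self._responses = iter(responses)

    def complete(self, system: str, user: str) -> str:
        return next(self._responses)

def reflect(llm, request: str, threshold: float = 0.8,
            max_revisions: int = 3) -> tuple[str, list[dict]]:
    output = llm.complete(system="generator", user=request)   # initial draft
    history: list[dict] = []
    for revision in range(max_revisions + 1):
        critique = json.loads(llm.complete(system="critic", user=output))
        history.append(critique)
        if critique["score"] >= threshold or revision == max_revisions:
            break                                             # quality bar met
        output = llm.complete(system="reviser", user=output)  # revise and retry
    return output, history

llm = ScriptedLLM([
    "Draft answer.",                            # generator
    '{"score": 0.5, "issues": ["too vague"]}',  # critic, round 1: fails the bar
    "Improved answer.",                         # reviser
    '{"score": 0.9, "issues": []}',             # critic, round 2: passes
])
final, history = reflect(llm, "Explain the pattern.")
print(final, len(history))  # → Improved answer. 2
```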
Variants
- Peer Reflection: Two separate LLM instances critique each other's outputs, avoiding the echo-chamber problem of self-critique
- Constitutional Reflection: The critic evaluates against a fixed set of principles (e.g., safety guidelines, brand voice, regulatory requirements)
- Targeted Reflection: The critic focuses on a single dimension (e.g., factual accuracy only), reducing noise in the feedback
1.4 — Tool Registry Pattern
Centralized Tool Management and Discovery
Intent
Provide a single, centralized registry through which agents discover, validate, and invoke tools. The registry acts as the authoritative source of truth for available capabilities, enforcing schemas, permissions, and versioning.
Motivation
As agentic systems grow, the number of tools available to agents proliferates rapidly. Without a centralized registry, you end up with tools defined inline in prompts, duplicated across agents, with no consistent validation or access control. The Tool Registry pattern brings order to this chaos.
It is the agentic equivalent of a service registry in microservices architecture (think Consul or Eureka), but specialized for LLM-callable tools.
Implementation
from dataclasses import dataclass, field
from typing import Callable, Any, Optional
from enum import Enum
import inspect
import json
class ToolCategory(Enum):
SEARCH = "search"
COMPUTATION = "computation"
DATA_RETRIEVAL = "data_retrieval"
COMMUNICATION = "communication"
FILE_SYSTEM = "file_system"
EXTERNAL_API = "external_api"
@dataclass
class ToolSchema:
name: str
description: str
category: ToolCategory
parameters: dict # JSON Schema for parameters
returns: str # Description of return value
requires_confirmation: bool = False # Human-in-the-loop flag
max_calls_per_session: Optional[int] = None
allowed_roles: list[str] = field(default_factory=lambda: ["agent"])
@dataclass
class RegisteredTool:
schema: ToolSchema
handler: Callable
call_count: int = 0
class ToolRegistry:
"""
Centralized registry for all agent-callable tools.
Provides discovery, validation, access control, and invocation.
"""
def __init__(self):
self._tools: dict[str, RegisteredTool] = {}
def register(self, schema: ToolSchema, handler: Callable) -> None:
"""Register a new tool with its schema and handler."""
self._validate_handler_signature(schema, handler)
self._tools[schema.name] = RegisteredTool(schema=schema, handler=handler)
def tool(self, schema: ToolSchema):
"""Decorator for convenient tool registration."""
def decorator(func: Callable):
self.register(schema, func)
return func
return decorator
def get(self, name: str) -> Optional[RegisteredTool]:
return self._tools.get(name)
def discover(self, category: Optional[ToolCategory] = None,
role: str = "agent") -> list[ToolSchema]:
"""
Return schemas for all tools accessible to a given role,
optionally filtered by category.
"""
return [
tool.schema
for tool in self._tools.values()
if role in tool.schema.allowed_roles
and (category is None or tool.schema.category == category)
]
def invoke(self, name: str, caller_role: str = "agent",
**kwargs) -> Any:
"""
Invoke a tool by name with access control and rate limiting.
"""
tool = self._tools.get(name)
if not tool:
raise ValueError(f"Tool '{name}' not found in registry.")
# Access control
if caller_role not in tool.schema.allowed_roles:
raise PermissionError(
f"Role '{caller_role}' is not authorized to use tool '{name}'."
)
# Rate limiting
if (tool.schema.max_calls_per_session is not None and
tool.call_count >= tool.schema.max_calls_per_session):
raise RuntimeError(
f"Tool '{name}' has reached its maximum call limit "
f"({tool.schema.max_calls_per_session}) for this session."
)
# Parameter validation against JSON schema
self._validate_parameters(tool.schema, kwargs)
# Execute
tool.call_count += 1
return tool.handler(**kwargs)
def to_llm_format(self, role: str = "agent") -> list[dict]:
"""
Export tool schemas in the format expected by LLM APIs
(e.g., OpenAI function calling format).
"""
schemas = self.discover(role=role)
return [
{
"type": "function",
"function": {
"name": s.name,
"description": s.description,
"parameters": s.parameters
}
}
for s in schemas
]
def _validate_handler_signature(self, schema: ToolSchema,
handler: Callable) -> None:
"""Ensure handler parameters match the schema."""
sig = inspect.signature(handler)
schema_params = set(schema.parameters.get("properties", {}).keys())
handler_params = set(sig.parameters.keys())
if not schema_params.issubset(handler_params):
missing = schema_params - handler_params
raise ValueError(
f"Handler for '{schema.name}' is missing parameters: {missing}"
)
def _validate_parameters(self, schema: ToolSchema, kwargs: dict) -> None:
"""Basic parameter presence validation."""
required = schema.parameters.get("required", [])
for param in required:
if param not in kwargs:
raise ValueError(
f"Missing required parameter '{param}' for tool '{schema.name}'."
)
# Example usage
registry = ToolRegistry()
@registry.tool(ToolSchema(
name="web_search",
description="Search the web for current information on a topic.",
category=ToolCategory.SEARCH,
parameters={
"type": "object",
"properties": {
"query": {"type": "string", "description": "The search query"},
"max_results": {"type": "integer", "default": 5}
},
"required": ["query"]
},
returns="A list of search result snippets with URLs.",
max_calls_per_session=20
))
def web_search(query: str, max_results: int = 5) -> list[dict]:
# Implementation would call a real search API
return [{"title": "Result", "url": "https://example.com", "snippet": "..."}]
1.5 — Adapter Pattern for LLM Clients
Model-Agnostic Agent Code
Intent
Define a uniform interface for interacting with LLM providers, allowing agent code to be written against an abstraction rather than a specific provider's SDK. Switching between models (OpenAI, Anthropic, Google, local models) requires only a configuration change, not a code rewrite.
Motivation
The LLM landscape is evolving at extraordinary speed. A model that is state-of-the-art today may be superseded in six months. Provider APIs differ in their request/response formats, streaming behavior, token counting, error types, and function-calling conventions. Tying your agent code directly to a specific provider's SDK is a form of vendor lock-in that will cost you dearly when you need to migrate.
The Adapter pattern, a classic from the Gang of Four, is the natural solution. You define a target interface that your agents depend on, and write adapters that translate between this interface and each provider's specific API.
Implementation
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, Iterator
@dataclass
class LLMMessage:
role: str # "system" | "user" | "assistant" | "tool"
content: str
tool_call_id: Optional[str] = None
tool_calls: Optional[list[dict]] = None
@dataclass
class LLMResponse:
content: str
model: str
input_tokens: int
output_tokens: int
tool_calls: Optional[list[dict]] = None
finish_reason: str = "stop"
class LLMClient(ABC):
"""
Target interface — all agent code depends only on this abstraction.
"""
@abstractmethod
def complete(self, messages: list[LLMMessage],
tools: Optional[list[dict]] = None,
temperature: float = 0.7,
max_tokens: int = 4096) -> LLMResponse:
"""Generate a completion from a list of messages."""
pass
@abstractmethod
def stream(self, messages: list[LLMMessage],
temperature: float = 0.7) -> Iterator[str]:
"""Stream a completion token by token."""
pass
@abstractmethod
def count_tokens(self, text: str) -> int:
"""Count the number of tokens in a string."""
pass
class OpenAIAdapter(LLMClient):
"""Adapter for OpenAI's Chat Completions API."""
def __init__(self, api_key: str, model: str = "gpt-4o"):
import openai
self.client = openai.OpenAI(api_key=api_key)
self.model = model
def complete(self, messages: list[LLMMessage],
tools: Optional[list[dict]] = None,
temperature: float = 0.7,
max_tokens: int = 4096) -> LLMResponse:
oai_messages = [
{"role": m.role, "content": m.content}
for m in messages
]
kwargs = {
"model": self.model,
"messages": oai_messages,
"temperature": temperature,
"max_tokens": max_tokens
}
if tools:
kwargs["tools"] = tools
response = self.client.chat.completions.create(**kwargs)
choice = response.choices[0]
return LLMResponse(
content=choice.message.content or "",
model=self.model,
input_tokens=response.usage.prompt_tokens,
output_tokens=response.usage.completion_tokens,
tool_calls=[tc.model_dump() for tc in (choice.message.tool_calls or [])],
finish_reason=choice.finish_reason
)
def stream(self, messages: list[LLMMessage],
temperature: float = 0.7) -> Iterator[str]:
oai_messages = [{"role": m.role, "content": m.content} for m in messages]
stream = self.client.chat.completions.create(
model=self.model,
messages=oai_messages,
temperature=temperature,
stream=True
)
for chunk in stream:
delta = chunk.choices[0].delta.content
if delta:
yield delta
def count_tokens(self, text: str) -> int:
import tiktoken
enc = tiktoken.encoding_for_model(self.model)
return len(enc.encode(text))
class AnthropicAdapter(LLMClient):
"""Adapter for Anthropic's Claude API."""
def __init__(self, api_key: str, model: str = "claude-opus-4-5"):
import anthropic
self.client = anthropic.Anthropic(api_key=api_key)
self.model = model
def complete(self, messages: list[LLMMessage],
tools: Optional[list[dict]] = None,
temperature: float = 0.7,
max_tokens: int = 4096) -> LLMResponse:
# Anthropic separates system messages from the conversation
system_msg = next(
(m.content for m in messages if m.role == "system"), ""
)
conv_messages = [
{"role": m.role, "content": m.content}
for m in messages if m.role != "system"
]
kwargs = {
"model": self.model,
"system": system_msg,
"messages": conv_messages,
"temperature": temperature,
"max_tokens": max_tokens
}
if tools:
# Anthropic uses a different tool format — adapter handles translation
kwargs["tools"] = self._translate_tools(tools)
response = self.client.messages.create(**kwargs)
content_block = response.content[0]
return LLMResponse(
content=content_block.text if hasattr(content_block, "text") else "",
model=self.model,
input_tokens=response.usage.input_tokens,
output_tokens=response.usage.output_tokens,
finish_reason=response.stop_reason
)
def stream(self, messages: list[LLMMessage],
temperature: float = 0.7) -> Iterator[str]:
system_msg = next(
(m.content for m in messages if m.role == "system"), ""
)
conv_messages = [
{"role": m.role, "content": m.content}
for m in messages if m.role != "system"
]
with self.client.messages.stream(
model=self.model,
system=system_msg,
messages=conv_messages,
temperature=temperature,
max_tokens=4096
) as stream:
for text in stream.text_stream:
yield text
    def count_tokens(self, text: str) -> int:
        # Anthropic exposes a dedicated token-counting endpoint
        response = self.client.messages.count_tokens(
            model=self.model,
            messages=[{"role": "user", "content": text}]
        )
        return response.input_tokens
def _translate_tools(self, openai_tools: list[dict]) -> list[dict]:
"""Translate OpenAI tool format to Anthropic format."""
anthropic_tools = []
for tool in openai_tools:
func = tool.get("function", {})
anthropic_tools.append({
"name": func["name"],
"description": func["description"],
"input_schema": func["parameters"]
})
return anthropic_tools
class LLMClientFactory:
"""Factory for creating LLM clients from configuration."""
@staticmethod
def create(provider: str, **kwargs) -> LLMClient:
adapters = {
"openai": OpenAIAdapter,
"anthropic": AnthropicAdapter,
}
if provider not in adapters:
raise ValueError(f"Unknown provider: {provider}. "
f"Supported: {list(adapters.keys())}")
return adapters[provider](**kwargs)
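The payoff of the abstraction is that agent code can be exercised without any vendor SDK installed. Below, a hypothetical `EchoAdapter` satisfies the same `complete` contract, so the (also hypothetical) `summarize` helper runs unchanged against it or against any real adapter; the condensed dataclasses restate the interface types above:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class LLMMessage:
    role: str
    content: str

@dataclass
class LLMResponse:
    content: str
    model: str
    input_tokens: int
    output_tokens: int

class EchoAdapter:
    """Hypothetical adapter for tests: echoes the last user message back."""
    def __init__(self, model: str = "echo-1"):
        self.model = model

    def complete(self, messages: list[LLMMessage],
                 tools: Optional[list[dict]] = None,
                 temperature: float = 0.7,
                 max_tokens: int = 4096) -> LLMResponse:
        last_user = next(m.content for m in reversed(messages) if m.role == "user")
        return LLMResponse(content=f"echo: {last_user}", model=self.model,
                           input_tokens=len(last_user.split()), output_tokens=2)

def summarize(client, text: str) -> str:
    """Agent code written against the LLMClient contract, not a vendor SDK."""
    messages = [
        LLMMessage("system", "Summarize the user's text."),
        LLMMessage("user", text),
    ]
    return client.complete(messages).content

# Swapping EchoAdapter for OpenAIAdapter or AnthropicAdapter changes nothing here
print(summarize(EchoAdapter(), "hello world"))  # → echo: hello world
```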
CHAPTER 2 — Memory and State Patterns
Memory is the Achilles' heel of LLM-based agents. Every model has a finite context window — a hard limit on how much information it can process in a single call. As conversations grow longer, as agents accumulate observations, and as multi-agent systems share state, memory management becomes one of the most critical architectural concerns. The patterns in this section address the full spectrum of memory challenges.
2.1 — Context Window Management Pattern
Strategies for Staying Within Token Limits
Intent
Implement a systematic strategy for managing the content of the LLM's context window, ensuring that the most relevant information is always present while staying within token limits and minimizing cost.
Motivation
The context window is the agent's working memory. Everything the agent knows in a given moment — the conversation history, tool results, system instructions, retrieved documents — must fit within this window. Exceeding it causes truncation errors; approaching it inflates costs and degrades performance (research has shown that LLMs attend less effectively to content in the middle of very long contexts — the so-called "lost in the middle" phenomenon).
Effective context window management is not simply about truncation. It requires a principled strategy for what to keep, what to compress, and what to retrieve on demand.
Implementation
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
class MessagePriority(Enum):
CRITICAL = 0 # System prompt, current task — never evict
HIGH = 1 # Recent messages, active tool results
MEDIUM = 2 # Older conversation turns
LOW = 3 # Background context, cached summaries
@dataclass
class ManagedMessage:
message: "LLMMessage"
priority: MessagePriority
token_count: int
timestamp: float
class ContextWindowManager:
"""
Manages the agent's context window using a priority-based
eviction strategy with sliding window and summarization fallback.
"""
def __init__(self, llm_client: "LLMClient",
max_tokens: int = 128_000,
target_tokens: int = 100_000, # Leave headroom for response
summarizer: Optional["MemoryDistiller"] = None):
self.llm = llm_client
self.max_tokens = max_tokens
self.target_tokens = target_tokens
self.summarizer = summarizer
self._messages: list[ManagedMessage] = []
self._total_tokens: int = 0
def add(self, message: "LLMMessage",
priority: MessagePriority = MessagePriority.MEDIUM) -> None:
"""Add a message to the context, evicting lower-priority content if needed."""
token_count = self.llm.count_tokens(message.content)
import time
managed = ManagedMessage(
message=message,
priority=priority,
token_count=token_count,
timestamp=time.time()
)
self._messages.append(managed)
self._total_tokens += token_count
# Evict if necessary
if self._total_tokens > self.target_tokens:
self._evict()
def get_messages(self) -> list["LLMMessage"]:
"""Return the current context as a list of LLM messages."""
return [m.message for m in self._messages]
def get_token_count(self) -> int:
return self._total_tokens
def _evict(self) -> None:
"""
Evict messages using a priority + recency strategy.
Lower priority and older messages are evicted first.
"""
# Sort so the last element is the best eviction candidate:
# lowest importance (highest priority value), oldest within that tier
evictable = [
m for m in self._messages
if m.priority is not MessagePriority.CRITICAL
]
evictable.sort(key=lambda m: (m.priority.value, -m.timestamp))
while self._total_tokens > self.target_tokens and evictable:
to_evict = evictable.pop() # Evict lowest priority, oldest first
# Optionally summarize before evicting
if self.summarizer and to_evict.priority == MessagePriority.MEDIUM:
result = self.summarizer.distill([to_evict.message])
self._replace_with_summary(to_evict, result.summary)
else:
self._messages.remove(to_evict)
self._total_tokens -= to_evict.token_count
def _replace_with_summary(self, original: ManagedMessage,
summary: str) -> None:
"""Replace a message with a compressed summary."""
summary_message = LLMMessage(
role="system",
content=f"[Summary of earlier context]: {summary}"
)
summary_tokens = self.llm.count_tokens(summary_message.content)
idx = self._messages.index(original)
self._messages[idx] = ManagedMessage(
message=summary_message,
priority=MessagePriority.LOW,
token_count=summary_tokens,
timestamp=original.timestamp
)
self._total_tokens = self._total_tokens - original.token_count + summary_tokens
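To make the eviction policy concrete, here is a standalone sketch (toy data, with hypothetical priority values mirroring `MessagePriority`) of the order in which messages should be evicted: least important tier first, oldest first within a tier.

```python
from dataclasses import dataclass

@dataclass
class Item:
    priority: int    # higher value = less important (HIGH=1 ... LOW=3)
    timestamp: float
    label: str

# Candidates for eviction (CRITICAL messages are never in this list).
items = [
    Item(1, 10.0, "high-old"),
    Item(3, 30.0, "low-new"),
    Item(2, 20.0, "med-old"),
    Item(3, 5.0, "low-old"),
]

# Sorting by (-priority, timestamp) yields the intended eviction order:
# lowest-importance messages first, oldest first within each tier.
eviction_order = sorted(items, key=lambda m: (-m.priority, m.timestamp))
print([m.label for m in eviction_order])
# → ['low-old', 'low-new', 'med-old', 'high-old']
```

Inverting either component of the sort key silently evicts your most valuable context first, which is why this ordering deserves a unit test in any real implementation.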
2.2 — Memory Distillation Pattern
Compressing Long Histories into Summaries
Intent
Periodically compress the agent's conversation history or observation log into a compact summary that preserves essential information while dramatically reducing token consumption.
Motivation
In long-running agent sessions — customer support conversations, research assistants, coding sessions — the raw message history can grow to hundreds of thousands of tokens. Even with large context windows, this is expensive and slow. Memory distillation is the process of taking a long history and producing a shorter, semantically equivalent summary that can stand in for the original.
This is analogous to how humans remember: we do not replay every conversation verbatim; we retain the key facts, decisions, and outcomes.
Implementation
from dataclasses import dataclass
from typing import Optional
@dataclass
class DistillationResult:
summary: str
original_token_count: int
summary_token_count: int
compression_ratio: float
key_facts: list[str]
pending_actions: list[str]
class MemoryDistiller:
"""
Compresses conversation history into structured summaries,
preserving key facts, decisions, and pending actions.
"""
DISTILLATION_PROMPT = """
You are a memory distillation system. Given a conversation history,
produce a structured summary that preserves all information needed
to continue the conversation intelligently.
Your summary must include:
1. A concise narrative summary (2-4 sentences)
2. Key facts established in the conversation
3. Decisions made
4. Any pending actions or open questions
5. User preferences or constraints mentioned
Return as JSON with keys: summary, key_facts, decisions,
pending_actions, preferences.
"""
def __init__(self, llm_client: "LLMClient",
distillation_threshold: int = 50_000):
self.llm = llm_client
self.threshold = distillation_threshold
def should_distill(self, messages: list["LLMMessage"],
token_count: int) -> bool:
"""Determine if distillation is warranted."""
return token_count >= self.threshold
def distill(self, messages: list["LLMMessage"]) -> DistillationResult:
"""Compress a list of messages into a structured summary."""
import json
# Format messages for the distillation prompt
history_text = "\n".join([
f"{m.role.upper()}: {m.content}"
for m in messages
])
original_tokens = sum(
self.llm.count_tokens(m.content) for m in messages
)
response = self.llm.complete(
messages=[
LLMMessage(role="system", content=self.DISTILLATION_PROMPT),
LLMMessage(
role="user",
content=f"Conversation to distill:\n\n{history_text}"
)
]
)
data = json.loads(response.content)
summary_text = (
f"CONVERSATION SUMMARY:\n{data['summary']}\n\n"
f"KEY FACTS: {'; '.join(data['key_facts'])}\n"
f"DECISIONS: {'; '.join(data.get('decisions', []))}\n"
f"PENDING: {'; '.join(data.get('pending_actions', []))}"
)
summary_tokens = self.llm.count_tokens(summary_text)
return DistillationResult(
summary=summary_text,
original_token_count=original_tokens,
summary_token_count=summary_tokens,
compression_ratio=original_tokens / max(summary_tokens, 1),
key_facts=data.get("key_facts", []),
pending_actions=data.get("pending_actions", [])
)
def rolling_distill(self, messages: list["LLMMessage"],
keep_recent: int = 10) -> tuple[str, list["LLMMessage"]]:
"""
Distill all but the most recent N messages,
keeping recent context intact.
"""
if len(messages) <= keep_recent:
return "", messages
to_distill = messages[:-keep_recent]
recent = messages[-keep_recent:]
result = self.distill(to_distill)
return result.summary, recent
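The windowing performed by rolling_distill is easy to verify in isolation. A toy sketch with string stand-ins for messages:

```python
def rolling_split(messages: list, keep_recent: int = 10) -> tuple[list, list]:
    """Partition history the way rolling_distill does: everything except
    the most recent N messages is handed to the distiller."""
    if len(messages) <= keep_recent:
        return [], messages
    return messages[:-keep_recent], messages[-keep_recent:]

history = [f"msg-{i}" for i in range(15)]
to_distill, recent = rolling_split(history, keep_recent=10)
print(len(to_distill), len(recent))  # → 5 10
```

Only the older slice is summarized; the recent slice stays verbatim so the agent retains full fidelity for the active part of the conversation.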
2.3 — Semantic Cache Pattern
Caching LLM Responses for Similar Queries
Intent
Cache LLM responses indexed by semantic similarity rather than exact string matching, so that queries that are semantically equivalent — even if worded differently — can be served from cache, reducing latency and cost.
Motivation
Traditional caching relies on exact key matching. This works well for deterministic systems but is nearly useless for LLM applications, where users phrase the same question in dozens of different ways. "What is the capital of France?", "Tell me the capital city of France", and "Which city is France's capital?" are semantically identical but lexically distinct.
Semantic caching uses embedding models to convert queries into dense vector representations, then performs approximate nearest-neighbor search to find cached responses that are semantically similar to the incoming query. If a sufficiently similar cached response exists, it is returned immediately without invoking the LLM.
Implementation
from dataclasses import dataclass, field
from typing import Optional
import time
@dataclass
class CacheEntry:
query: str
query_embedding: list[float]
response: str
model: str
created_at: float
hit_count: int = 0
ttl_seconds: Optional[float] = None
def is_expired(self) -> bool:
if self.ttl_seconds is None:
return False
return (time.time() - self.created_at) > self.ttl_seconds
class SemanticCache:
"""
Caches LLM responses indexed by semantic similarity.
Uses cosine similarity on embeddings for cache lookup.
"""
def __init__(self, embedding_client: "EmbeddingClient",
similarity_threshold: float = 0.92,
max_entries: int = 10_000,
default_ttl: Optional[float] = 3600):
self.embedder = embedding_client
self.threshold = similarity_threshold
self.max_entries = max_entries
self.default_ttl = default_ttl
self._cache: list[CacheEntry] = []
self._stats = {"hits": 0, "misses": 0}
def get(self, query: str) -> Optional[str]:
"""
Look up a cached response for a semantically similar query.
Returns None if no sufficiently similar cached response exists.
"""
query_embedding = self.embedder.embed(query)
best_match: Optional[CacheEntry] = None
best_similarity = 0.0
# Linear scan for clarity; production systems use an approximate
# nearest-neighbor index (e.g., FAISS or a vector database)
for entry in self._cache:
if entry.is_expired():
continue
similarity = self._cosine_similarity(query_embedding,
entry.query_embedding)
if similarity > best_similarity:
best_similarity = similarity
best_match = entry
if best_match and best_similarity >= self.threshold:
best_match.hit_count += 1
self._stats["hits"] += 1
return best_match.response
self._stats["misses"] += 1
return None
def set(self, query: str, response: str, model: str,
ttl: Optional[float] = None) -> None:
"""Cache a query-response pair."""
# Evict expired entries and enforce size limit
self._evict()
query_embedding = self.embedder.embed(query)
entry = CacheEntry(
query=query,
query_embedding=query_embedding,
response=response,
model=model,
created_at=time.time(),
ttl_seconds=ttl or self.default_ttl
)
self._cache.append(entry)
def get_stats(self) -> dict:
total = self._stats["hits"] + self._stats["misses"]
hit_rate = self._stats["hits"] / total if total > 0 else 0.0
return {**self._stats, "hit_rate": hit_rate, "cache_size": len(self._cache)}
def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
"""Compute cosine similarity between two embedding vectors."""
import math
dot = sum(x * y for x, y in zip(a, b))
norm_a = math.sqrt(sum(x ** 2 for x in a))
norm_b = math.sqrt(sum(x ** 2 for x in b))
if norm_a == 0 or norm_b == 0:
return 0.0
return dot / (norm_a * norm_b)
def _evict(self) -> None:
"""Remove expired entries and enforce max size."""
self._cache = [e for e in self._cache if not e.is_expired()]
if len(self._cache) >= self.max_entries:
# Evict least recently used (lowest hit count, oldest)
self._cache.sort(key=lambda e: (e.hit_count, e.created_at))
self._cache = self._cache[self.max_entries // 4:] # Evict 25%
class CachedLLMClient:
"""
Decorator that wraps an LLMClient with semantic caching.
"""
def __init__(self, llm_client: "LLMClient", cache: SemanticCache,
cacheable_roles: Optional[set[str]] = None):
self.llm = llm_client
self.cache = cache
self.cacheable_roles = cacheable_roles or {"user"}
def complete(self, messages: list["LLMMessage"], **kwargs) -> "LLMResponse":
# Key the cache on the most recent user message; earlier turns are
# ignored, so this is only safe for effectively single-turn queries
last_user_msg = next(
(m for m in reversed(messages) if m.role in self.cacheable_roles),
None
)
if last_user_msg:
cached = self.cache.get(last_user_msg.content)
if cached:
return LLMResponse(
content=cached,
model="cache",
input_tokens=0,
output_tokens=0,
finish_reason="cache_hit"
)
# Cache miss — invoke the real LLM
response = self.llm.complete(messages, **kwargs)
if last_user_msg and response.finish_reason == "stop":
self.cache.set(
query=last_user_msg.content,
response=response.content,
model=response.model
)
return response
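A minimal sketch of the hit/miss decision, using 3-dimensional toy vectors in place of real embeddings (which typically have hundreds or thousands of dimensions):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two vectors; 1.0 means identical direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

THRESHOLD = 0.92
cached_embedding = [0.9, 0.1, 0.0]

# A near-paraphrase lands above the threshold; an unrelated query does not.
paraphrase = [0.85, 0.15, 0.05]
unrelated = [0.0, 0.1, 0.9]

print(cosine_similarity(paraphrase, cached_embedding) >= THRESHOLD)  # → True
print(cosine_similarity(unrelated, cached_embedding) >= THRESHOLD)   # → False
```

The threshold is the critical tuning knob: too low and users receive stale answers to genuinely different questions; too high and the cache never hits.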
2.4 — Shared Blackboard Pattern
Multi-Agent Shared Knowledge Store
Intent
Provide a shared, structured knowledge store — the blackboard — that multiple agents can read from and write to, enabling asynchronous, decoupled collaboration without direct agent-to-agent communication.
Motivation
In multi-agent systems, agents frequently need to share information: one agent discovers a fact, another needs to act on it. The naive approach is direct agent-to-agent messaging, but this creates tight coupling and makes the system brittle. The Blackboard pattern, borrowed from AI research of the 1970s (the HEARSAY-II speech understanding system), provides a more elegant solution.
The blackboard is a shared data structure — essentially a structured whiteboard — that any agent can read from or write to. Agents are decoupled: they do not need to know about each other, only about the blackboard. A Controller component monitors the blackboard and decides which agent to activate next based on the current state of knowledge.
Implementation
from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from enum import Enum
import threading
import time
import uuid
class EntryStatus(Enum):
PROPOSED = "proposed"
VALIDATED = "validated"
REJECTED = "rejected"
SUPERSEDED = "superseded"
@dataclass
class BlackboardEntry:
entry_id: str
key: str
value: Any
author_agent: str
confidence: float # 0.0 to 1.0
status: EntryStatus = EntryStatus.PROPOSED
timestamp: float = field(default_factory=time.time)
metadata: dict = field(default_factory=dict)
class Blackboard:
"""
Thread-safe shared knowledge store for multi-agent systems.
Supports structured reads, writes, conflict resolution,
and change notification.
"""
def __init__(self):
self._entries: dict[str, list[BlackboardEntry]] = {}
self._lock = threading.RLock()
self._subscribers: dict[str, list[Callable]] = {}
def write(self, key: str, value: Any, author: str,
confidence: float = 1.0, metadata: dict = None) -> str:
"""Write a new entry to the blackboard."""
with self._lock:
entry = BlackboardEntry(
entry_id=str(uuid.uuid4()),
key=key,
value=value,
author_agent=author,
confidence=confidence,
metadata=metadata or {}
)
if key not in self._entries:
self._entries[key] = []
self._entries[key].append(entry)
# Notify subscribers
self._notify(key, entry)
return entry.entry_id
def read(self, key: str,
min_confidence: float = 0.0) -> Optional[BlackboardEntry]:
"""
Read the highest-confidence validated entry for a key.
"""
with self._lock:
entries = self._entries.get(key, [])
valid = [
e for e in entries
if e.status == EntryStatus.VALIDATED
and e.confidence >= min_confidence
]
if not valid:
# Fall back to proposed entries if no validated ones exist
valid = [
e for e in entries
if e.status == EntryStatus.PROPOSED
and e.confidence >= min_confidence
]
if not valid:
return None
return max(valid, key=lambda e: (e.confidence, e.timestamp))
def read_all(self, key: str) -> list[BlackboardEntry]:
"""Read all entries for a key (useful for consensus checking)."""
with self._lock:
return list(self._entries.get(key, []))
def validate(self, entry_id: str) -> None:
"""Mark an entry as validated (typically by a supervisor agent)."""
with self._lock:
for entries in self._entries.values():
for entry in entries:
if entry.entry_id == entry_id:
entry.status = EntryStatus.VALIDATED
return
def reject(self, entry_id: str, reason: str = "") -> None:
"""Mark an entry as rejected."""
with self._lock:
for entries in self._entries.values():
for entry in entries:
if entry.entry_id == entry_id:
entry.status = EntryStatus.REJECTED
entry.metadata["rejection_reason"] = reason
return
def subscribe(self, key: str, callback: Callable) -> None:
"""Subscribe to changes on a specific key."""
with self._lock:
if key not in self._subscribers:
self._subscribers[key] = []
self._subscribers[key].append(callback)
def snapshot(self) -> dict[str, Any]:
"""Return the current best-known state of the blackboard."""
with self._lock:
return {
key: self.read(key).value
for key in self._entries
if self.read(key) is not None
}
def _notify(self, key: str, entry: BlackboardEntry) -> None:
for callback in self._subscribers.get(key, []):
try:
callback(key, entry)
except Exception:
pass # Subscribers must not crash the blackboard
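The read policy (highest confidence wins, with recency as the tiebreaker) can be exercised standalone. A toy sketch with hypothetical entries:

```python
from dataclasses import dataclass

@dataclass
class Entry:
    value: str
    confidence: float
    timestamp: float

# Two agents posted competing values for the same key; the read policy
# resolves the conflict by confidence, then by recency.
entries = [
    Entry("Paris", confidence=0.7, timestamp=1.0),
    Entry("Lyon", confidence=0.4, timestamp=2.0),
]
best = max(entries, key=lambda e: (e.confidence, e.timestamp))
print(best.value)  # → Paris
```

Because tuple comparison is lexicographic, recency only matters when two entries tie on confidence, which is exactly the behavior the Blackboard's read method implements.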
CHAPTER 3 — Multi-Agent Coordination Patterns
When a single agent is not enough — when tasks are too complex, too broad, or require specialized expertise — you need multiple agents working together. But coordination is hard. These patterns provide proven strategies for organizing agent collaboration, from simple orchestration to sophisticated debate and consensus mechanisms.
3.1 — Orchestrator-Worker Pattern
Central Coordinator with Specialized Workers
Intent
Establish a central orchestrator agent that decomposes tasks, delegates subtasks to specialized worker agents, collects their results, and synthesizes a final output. Workers are focused, efficient, and unaware of the broader task context.
Motivation
The division of labor is one of humanity's most powerful productivity innovations. The same principle applies to AI agents. A single general-purpose agent attempting to research a topic, write code, run tests, and generate a report will be less effective than a team of specialized agents — a Researcher, a Coder, a Tester, and a Writer — coordinated by an Orchestrator.
The Orchestrator-Worker pattern is the most common multi-agent architecture pattern and the foundation upon which more sophisticated patterns are built.
Implementation
from dataclasses import dataclass, field
from typing import Optional
from abc import ABC, abstractmethod
import asyncio
@dataclass
class WorkerTask:
task_id: str
worker_type: str
instruction: str
context: dict = field(default_factory=dict)
priority: int = 0
@dataclass
class WorkerResult:
task_id: str
worker_type: str
output: str
success: bool
error: Optional[str] = None
metadata: dict = field(default_factory=dict)
class WorkerAgent(ABC):
"""Base class for all worker agents."""
def __init__(self, agent_id: str, llm_client: "LLMClient",
tool_registry: "ToolRegistry"):
self.agent_id = agent_id
self.llm = llm_client
self.tools = tool_registry
@property
@abstractmethod
def worker_type(self) -> str:
pass
@property
@abstractmethod
def system_prompt(self) -> str:
pass
async def execute(self, task: WorkerTask) -> WorkerResult:
try:
result = await self._execute_impl(task)
return WorkerResult(
task_id=task.task_id,
worker_type=self.worker_type,
output=result,
success=True
)
except Exception as e:
return WorkerResult(
task_id=task.task_id,
worker_type=self.worker_type,
output="",
success=False,
error=str(e)
)
@abstractmethod
async def _execute_impl(self, task: WorkerTask) -> str:
pass
class ResearchWorker(WorkerAgent):
@property
def worker_type(self) -> str:
return "researcher"
@property
def system_prompt(self) -> str:
return """You are a research specialist. Your job is to gather
accurate, comprehensive information on the given topic using
available search tools. Always cite your sources."""
async def _execute_impl(self, task: WorkerTask) -> str:
# Use ReAct loop internally for research tasks
agent = ReActAgent(
llm_client=self.llm,
tool_registry=self.tools,
system_prompt=self.system_prompt
)
return agent.run(task.instruction)
class OrchestratorAgent:
"""
Central coordinator that decomposes tasks, delegates to workers,
and synthesizes results.
"""
ORCHESTRATOR_PROMPT = """
You are an orchestration agent. Your role is to:
1. Analyze the user's request
2. Decompose it into subtasks for specialized workers
3. Synthesize worker results into a coherent final answer
Available worker types: {worker_types}
When decomposing tasks, return a JSON array of task objects with:
- worker_type: which worker to use
- instruction: specific instruction for that worker
- depends_on: list of task indices this task depends on
"""
def __init__(self, llm_client: "LLMClient",
workers: dict[str, WorkerAgent]):
self.llm = llm_client
self.workers = workers
async def run(self, user_request: str) -> str:
import json
import uuid
# Step 1: Decompose into worker tasks
decomposition_prompt = self.ORCHESTRATOR_PROMPT.format(
worker_types=list(self.workers.keys())
)
decomposition_response = self.llm.complete(
messages=[
LLMMessage(role="system", content=decomposition_prompt),
LLMMessage(role="user", content=user_request)
]
)
task_specs = json.loads(decomposition_response.content)
tasks = [
WorkerTask(
task_id=str(uuid.uuid4()),
worker_type=spec["worker_type"],
instruction=spec["instruction"]
)
for spec in task_specs
]
# Step 2: Execute tasks (respecting dependencies)
results = await self._execute_with_dependencies(tasks, task_specs)
# Step 3: Synthesize results
synthesis_context = "\n\n".join([
f"[{r.worker_type.upper()} RESULT]:\n{r.output}"
for r in results if r.success
])
final_response = self.llm.complete(
messages=[
LLMMessage(
role="system",
content="Synthesize the following worker outputs into a "
"coherent, well-structured final answer."
),
LLMMessage(
role="user",
content=f"Original request: {user_request}\n\n"
f"Worker outputs:\n{synthesis_context}"
)
]
)
return final_response.content
async def _execute_with_dependencies(
self, tasks: list[WorkerTask], specs: list[dict]
) -> list[WorkerResult]:
results = []
completed_indices = set()
# Simple topological execution
remaining = list(enumerate(zip(tasks, specs)))
while remaining:
# Find tasks whose dependencies are all satisfied
ready = [
(i, task, spec)
for i, (task, spec) in remaining
if all(dep in completed_indices
for dep in spec.get("depends_on", []))
]
if not ready:
break # Circular dependency or all blocked
# Execute ready tasks concurrently. Tasks with unregistered worker
# types are marked complete (skipped) so they cannot stall the loop,
# and results stay aligned with the tasks that actually ran.
runnable = [(i, task) for i, task, _ in ready
if task.worker_type in self.workers]
for i, task, _ in ready:
if task.worker_type not in self.workers:
completed_indices.add(i)
batch_results = await asyncio.gather(*[
self.workers[task.worker_type].execute(task)
for _, task in runnable
])
for (i, _), result in zip(runnable, batch_results):
results.append(result)
completed_indices.add(i)
remaining = [(i, t, s) for i, (t, s) in remaining
if i not in completed_indices]
return results
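The dependency-respecting scheduling in _execute_with_dependencies amounts to running tasks in "waves", where each wave contains only tasks whose dependencies have already completed. A standalone sketch with hypothetical task specs:

```python
def execution_waves(specs: list[dict]) -> list[list[int]]:
    """Group task indices into batches that can run concurrently,
    honoring each spec's depends_on list."""
    waves, done = [], set()
    remaining = list(range(len(specs)))
    while remaining:
        ready = [i for i in remaining
                 if all(d in done for d in specs[i].get("depends_on", []))]
        if not ready:
            raise ValueError("Circular dependency detected")
        waves.append(ready)
        done.update(ready)
        remaining = [i for i in remaining if i not in done]
    return waves

specs = [
    {"worker_type": "researcher"},                  # 0: no dependencies
    {"worker_type": "coder", "depends_on": [0]},    # 1
    {"worker_type": "tester", "depends_on": [1]},   # 2
    {"worker_type": "writer", "depends_on": [0]},   # 3
]
print(execution_waves(specs))  # → [[0], [1, 3], [2]]
```

The coder and writer run concurrently in the second wave because both depend only on the researcher; the tester must wait for the coder.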
3.2 — Supervisor Pattern
A Meta-Agent That Monitors and Corrects Other Agents
Intent
Deploy a supervisor agent that monitors the outputs of worker agents in real time, detects errors, inconsistencies, or policy violations, and intervenes to correct or redirect agent behavior before errors propagate downstream.
Motivation
In complex multi-agent pipelines, errors compound. A worker agent that produces a subtly incorrect result can cause downstream agents to build on a flawed foundation, leading to catastrophic final outputs. The Supervisor pattern introduces a meta-level agent whose sole responsibility is quality assurance and error correction.
The supervisor does not do the work — it watches the work being done and intervenes when necessary. This is analogous to a code reviewer in a software team: they do not write the code, but they prevent bad code from reaching production.
Implementation
from dataclasses import dataclass
from typing import Optional
from enum import Enum
class SupervisorAction(Enum):
APPROVE = "approve"
REJECT = "reject"
REVISE = "revise"
ESCALATE = "escalate"
@dataclass
class SupervisorDecision:
action: SupervisorAction
reason: str
revised_output: Optional[str] = None
escalation_message: Optional[str] = None
class SupervisorAgent:
"""
Meta-agent that monitors worker outputs and intervenes when necessary.
"""
SUPERVISOR_PROMPT = """
You are a supervisor agent responsible for quality assurance.
Review the worker's output against these criteria:
1. ACCURACY: Is the information factually correct?
2. COMPLETENESS: Does it fully address the task?
3. SAFETY: Does it comply with safety and policy guidelines?
4. CONSISTENCY: Is it consistent with established facts in the context?
5. FORMAT: Is it in the expected format?
Return a JSON object with:
- action: one of "approve", "reject", "revise", "escalate"
- reason: explanation of your decision
- revised_output: (if action is "revise") the corrected output
- escalation_message: (if action is "escalate") message for human review
"""
def __init__(self, llm_client: "LLMClient",
max_revision_attempts: int = 2):
self.llm = llm_client
self.max_revisions = max_revision_attempts
def supervise(self, task: "WorkerTask",
output: str,
context: dict = None) -> tuple[str, SupervisorDecision]:
"""
Review a worker's output. Returns the final (possibly revised)
output and the supervisor's decision.
"""
import json
for attempt in range(self.max_revisions + 1):
review_prompt = f"""
Task: {task.instruction}
Worker Type: {task.worker_type}
Context: {context or {}}
Worker Output:
{output}
Revision Attempt: {attempt + 1} of {self.max_revisions + 1}
"""
response = self.llm.complete(
messages=[
LLMMessage(role="system", content=self.SUPERVISOR_PROMPT),
LLMMessage(role="user", content=review_prompt)
]
)
data = json.loads(response.content)
decision = SupervisorDecision(
action=SupervisorAction(data["action"]),
reason=data["reason"],
revised_output=data.get("revised_output"),
escalation_message=data.get("escalation_message")
)
if decision.action == SupervisorAction.APPROVE:
return output, decision
elif decision.action == SupervisorAction.REVISE:
if decision.revised_output:
output = decision.revised_output
# Continue loop to re-review the revision
else:
break
elif decision.action in (SupervisorAction.REJECT,
SupervisorAction.ESCALATE):
return output, decision
return output, decision
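The review loop's control flow can be exercised without an LLM by plugging in a stub review function (hypothetical, standing in for the supervisor's LLM call):

```python
def review_loop(output: str, review, max_revisions: int = 2):
    """Drive the approve/revise cycle the way SupervisorAgent.supervise does."""
    decision = {"action": "reject"}
    for _ in range(max_revisions + 1):
        decision = review(output)
        if decision["action"] == "approve":
            return output, decision
        if decision["action"] == "revise" and decision.get("revised_output"):
            output = decision["revised_output"]  # re-review the revision
            continue
        return output, decision  # reject / escalate / revise with no output
    return output, decision

# Stub reviewer: demands capitalization once, then approves.
def review(text: str) -> dict:
    if text.islower():
        return {"action": "revise", "revised_output": text.capitalize()}
    return {"action": "approve", "reason": "meets criteria"}

final, decision = review_loop("hello world", review)
print(final, decision["action"])  # → Hello world approve
```

Note that every revision is re-reviewed before approval; a supervisor that trusts its own corrections without a second look defeats the purpose of the pattern.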
3.3 — Handoff Pattern
Clean Agent-to-Agent Task Delegation
Intent
Define a structured protocol for transferring task ownership from one agent to another, ensuring that all necessary context, state, and instructions are passed cleanly, and that the receiving agent has everything it needs to continue without ambiguity.
Motivation
In multi-agent workflows, tasks often need to transition between agents — from a triage agent to a specialist, from a planning agent to an execution agent, from a customer-facing agent to a backend processing agent. Without a structured handoff protocol, critical context is lost, agents make incorrect assumptions, and the user experience degrades.
The Handoff pattern defines a handoff packet — a structured data object that carries all the information the receiving agent needs — and a handoff protocol that ensures the transfer is acknowledged and the sending agent's responsibilities are cleanly terminated.
Implementation
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
import time
import uuid
class HandoffStatus(Enum):
PENDING = "pending"
ACCEPTED = "accepted"
REJECTED = "rejected"
COMPLETED = "completed"
@dataclass
class HandoffPacket:
"""
The structured data object transferred between agents.
Contains everything the receiving agent needs to continue the task.
"""
handoff_id: str = field(default_factory=lambda: str(uuid.uuid4()))
from_agent: str = ""
to_agent: str = ""
task_description: str = ""
conversation_history: list[dict] = field(default_factory=list)
established_facts: dict[str, Any] = field(default_factory=dict)
completed_steps: list[str] = field(default_factory=list)
remaining_steps: list[str] = field(default_factory=list)
constraints: list[str] = field(default_factory=list)
user_preferences: dict[str, Any] = field(default_factory=dict)
artifacts: dict[str, Any] = field(default_factory=dict)
priority: int = 0
deadline: Optional[float] = None
status: HandoffStatus = HandoffStatus.PENDING
created_at: float = field(default_factory=time.time)
metadata: dict = field(default_factory=dict)
class HandoffCoordinator:
"""
Manages the handoff protocol between agents.
Ensures packets are delivered, acknowledged, and tracked.
"""
def __init__(self):
self._pending_handoffs: dict[str, HandoffPacket] = {}
self._agent_registry: dict[str, "WorkerAgent"] = {}
def register_agent(self, agent_type: str, agent: "WorkerAgent") -> None:
self._agent_registry[agent_type] = agent
def initiate_handoff(self, packet: HandoffPacket) -> str:
"""Initiate a handoff. Returns the handoff ID."""
if packet.to_agent not in self._agent_registry:
raise ValueError(f"Target agent '{packet.to_agent}' not registered.")
self._pending_handoffs[packet.handoff_id] = packet
return packet.handoff_id
async def execute_handoff(self, handoff_id: str) -> "WorkerResult":
"""Execute a pending handoff."""
packet = self._pending_handoffs.get(handoff_id)
if not packet:
raise ValueError(f"Handoff '{handoff_id}' not found.")
target_agent = self._agent_registry[packet.to_agent]
# Convert handoff packet to a worker task
task = WorkerTask(
task_id=handoff_id,
worker_type=packet.to_agent,
instruction=self._build_instruction_from_packet(packet),
context={
"established_facts": packet.established_facts,
"completed_steps": packet.completed_steps,
"constraints": packet.constraints,
"user_preferences": packet.user_preferences,
"artifacts": packet.artifacts
}
)
packet.status = HandoffStatus.ACCEPTED
result = await target_agent.execute(task)
packet.status = (HandoffStatus.COMPLETED if result.success
else HandoffStatus.REJECTED)
del self._pending_handoffs[handoff_id]
return result
def _build_instruction_from_packet(self, packet: HandoffPacket) -> str:
return f"""
HANDOFF FROM: {packet.from_agent}
TASK: {packet.task_description}
CONTEXT:
- Completed steps: {packet.completed_steps}
- Remaining steps: {packet.remaining_steps}
- Established facts: {packet.established_facts}
- Constraints: {packet.constraints}
Please continue from where the previous agent left off,
completing the remaining steps.
"""
3.4 — Consensus Pattern
Multiple Agents Vote on an Answer for Reliability
Intent
Have multiple independent agents (or multiple independent runs of the same agent) produce answers to the same question, then aggregate their responses through a voting or confidence-weighted consensus mechanism to produce a more reliable final answer.
Motivation
LLMs are stochastic. The same prompt, run multiple times, can produce different answers — and some of those answers may be wrong. For high-stakes questions where accuracy is paramount, relying on a single model run is risky. The Consensus pattern mitigates this by exploiting the statistical property that, across multiple independent runs, correct answers tend to cluster while incorrect answers tend to diverge.
This is the AI equivalent of a jury system: no single juror's opinion is definitive, but the collective judgment of twelve independent minds is far more reliable.
Implementation
from dataclasses import dataclass
from collections import Counter
import asyncio
@dataclass
class ConsensusResult:
final_answer: str
confidence: float
vote_distribution: dict[str, int]
dissenting_answers: list[str]
reached_consensus: bool
consensus_threshold: float
class ConsensusAgent:
"""
Runs multiple independent agent instances and aggregates
their answers through majority voting.
"""
def __init__(self, llm_client: "LLMClient",
num_agents: int = 5,
consensus_threshold: float = 0.6,
temperature_spread: list[float] = None):
self.llm = llm_client
self.num_agents = num_agents
self.consensus_threshold = consensus_threshold
# Use varied temperatures to encourage response diversity
# (cycled if num_agents exceeds the spread length)
spread = temperature_spread or [0.3, 0.5, 0.7, 0.9, 1.0]
self.temperatures = [spread[i % len(spread)] for i in range(num_agents)]
async def run(self, question: str,
system_prompt: str = "") -> ConsensusResult:
"""
Run multiple agents in parallel and aggregate their answers.
"""
# Run all agents concurrently with different temperatures
tasks = [
self._single_agent_run(question, system_prompt, temp)
for temp in self.temperatures[:self.num_agents]
]
answers = await asyncio.gather(*tasks)
# Normalize answers for comparison
normalized = [self._normalize_answer(a) for a in answers]
# Count votes
vote_counts = Counter(normalized)
total_votes = len(normalized)
# Find the majority answer
winner, winner_votes = vote_counts.most_common(1)[0]
confidence = winner_votes / total_votes
reached_consensus = confidence >= self.consensus_threshold
# Map normalized winner back to original answer
final_answer = next(
a for a, n in zip(answers, normalized) if n == winner
)
dissenting = [
a for a, n in zip(answers, normalized) if n != winner
]
return ConsensusResult(
final_answer=final_answer,
confidence=confidence,
vote_distribution=dict(vote_counts),
dissenting_answers=dissenting,
reached_consensus=reached_consensus,
consensus_threshold=self.consensus_threshold
)
    async def _single_agent_run(self, question: str,
                                system_prompt: str,
                                temperature: float) -> str:
        # Run the synchronous client in a worker thread so that
        # asyncio.gather() above achieves real concurrency.
        response = await asyncio.to_thread(
            self.llm.complete,
            messages=[
                LLMMessage(role="system", content=system_prompt),
                LLMMessage(role="user", content=question)
            ],
            temperature=temperature
        )
        return response.content
def _normalize_answer(self, answer: str) -> str:
"""
Normalize answers for comparison.
In production, use semantic similarity rather than string matching.
"""
return answer.strip().lower()
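The aggregation step can be seen in isolation with a minimal sketch of the normalize-count-threshold logic, using stubbed answers instead of LLM calls (the helper name is illustrative):

```python
from collections import Counter

def majority_vote(answers: list[str],
                  threshold: float = 0.6) -> tuple[str, float, bool]:
    """Normalize answers, count votes, and test the consensus threshold."""
    normalized = [a.strip().lower() for a in answers]
    winner, votes = Counter(normalized).most_common(1)[0]
    confidence = votes / len(normalized)
    return winner, confidence, confidence >= threshold

# Five stubbed agent answers: four cluster after normalization.
answer, confidence, reached = majority_vote(
    ["Paris", "paris", " Paris ", "Lyon", "Paris"]
)
# answer == "paris", confidence == 0.8, reached is True
```

Note that string normalization treats "Paris" and "Paris is the capital" as different answers, which is exactly why the production note above recommends semantic similarity.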
3.5 — Debate Pattern
Agents Argue Opposing Positions to Improve Reasoning Quality
Intent
Assign opposing positions to multiple agents and have them argue their cases in a structured debate, with a judge agent evaluating the arguments and rendering a final, well-reasoned verdict. The adversarial dynamic forces agents to surface weaknesses in each other's reasoning.
Motivation
The Consensus pattern assumes that correct answers cluster. But for complex, nuanced questions — ethical dilemmas, strategic decisions, ambiguous interpretations — there may not be a clear majority answer. What is needed is not a vote, but a rigorous examination of the evidence and arguments on all sides.
The Debate pattern is inspired by adversarial legal proceedings and Socratic dialogue. By forcing agents to argue opposing positions, the pattern ensures that the strongest counterarguments are surfaced and addressed, leading to more robust and defensible conclusions.
Research by Du et al. (2023) in "Improving Factuality and Reasoning in Language Models through Multiagent Debate" demonstrated that debate-based approaches can significantly improve factual accuracy and reasoning quality over single-agent and consensus approaches.
Implementation
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class DebateArgument:
agent_role: str # "proponent" | "opponent"
round_number: int
argument: str
key_points: list[str]
rebuttals: list[str]
@dataclass
class DebateResult:
question: str
proponent_position: str
opponent_position: str
rounds: list[DebateArgument]
judge_verdict: str
winning_position: str
confidence: float
key_insights: list[str]
class DebateOrchestrator:
"""
Orchestrates a structured debate between two agent positions,
with a judge rendering a final verdict.
"""
PROPONENT_PROMPT = """
You are arguing IN FAVOR of the following position: {position}
In each round:
1. Present your strongest arguments with evidence
2. Directly rebut the opponent's previous arguments
3. Acknowledge any valid points the opponent made
4. Be intellectually honest — do not make claims you cannot support
Be rigorous, specific, and persuasive.
"""
OPPONENT_PROMPT = """
You are arguing AGAINST the following position: {position}
(i.e., you are arguing that it is false, harmful, or insufficient)
In each round:
1. Present your strongest counterarguments with evidence
2. Directly rebut the proponent's previous arguments
3. Acknowledge any valid points the proponent made
4. Be intellectually honest — do not make claims you cannot support
Be rigorous, specific, and persuasive.
"""
JUDGE_PROMPT = """
You are an impartial judge evaluating a structured debate.
Your role is to:
1. Evaluate the quality and validity of arguments from both sides
2. Identify which position was better supported by evidence and logic
3. Note the strongest insights from each side
4. Render a nuanced verdict that acknowledges complexity
You are NOT simply picking a "winner" — you are synthesizing the
best thinking from both sides into a well-reasoned conclusion.
Return JSON with:
- verdict: your final conclusion
- winning_position: "proponent", "opponent", or "neither"
- confidence: 0.0 to 1.0
- key_insights: list of the most valuable insights from the debate
- nuances: important caveats or conditions on your verdict
"""
def __init__(self, llm_client: "LLMClient", num_rounds: int = 3):
self.llm = llm_client
self.num_rounds = num_rounds
def debate(self, question: str,
proponent_position: str,
opponent_position: str) -> DebateResult:
import json
rounds: list[DebateArgument] = []
debate_history = []
for round_num in range(1, self.num_rounds + 1):
# Proponent's turn
proponent_context = self._build_debate_context(
question, debate_history, "proponent", round_num
)
proponent_response = self.llm.complete(
messages=[
LLMMessage(
role="system",
content=self.PROPONENT_PROMPT.format(
position=proponent_position
)
),
LLMMessage(role="user", content=proponent_context)
],
temperature=0.8
)
proponent_arg = DebateArgument(
agent_role="proponent",
round_number=round_num,
argument=proponent_response.content,
key_points=self._extract_key_points(proponent_response.content),
rebuttals=[]
)
rounds.append(proponent_arg)
debate_history.append(proponent_arg)
# Opponent's turn
opponent_context = self._build_debate_context(
question, debate_history, "opponent", round_num
)
opponent_response = self.llm.complete(
messages=[
LLMMessage(
role="system",
content=self.OPPONENT_PROMPT.format(
position=proponent_position
)
),
LLMMessage(role="user", content=opponent_context)
],
temperature=0.8
)
opponent_arg = DebateArgument(
agent_role="opponent",
round_number=round_num,
argument=opponent_response.content,
key_points=self._extract_key_points(opponent_response.content),
rebuttals=[]
)
rounds.append(opponent_arg)
debate_history.append(opponent_arg)
# Judge renders verdict
full_debate_text = self._format_full_debate(
question, proponent_position, opponent_position, rounds
)
judge_response = self.llm.complete(
messages=[
LLMMessage(role="system", content=self.JUDGE_PROMPT),
LLMMessage(role="user", content=full_debate_text)
],
temperature=0.3 # Low temperature for consistent judgment
)
verdict_data = json.loads(judge_response.content)
return DebateResult(
question=question,
proponent_position=proponent_position,
opponent_position=opponent_position,
rounds=rounds,
judge_verdict=verdict_data["verdict"],
winning_position=verdict_data["winning_position"],
confidence=verdict_data["confidence"],
key_insights=verdict_data.get("key_insights", [])
)
def _build_debate_context(self, question: str,
history: list[DebateArgument],
current_role: str,
round_num: int) -> str:
context = f"Question under debate: {question}\n\nRound {round_num}\n\n"
for arg in history:
context += f"[{arg.agent_role.upper()} - Round {arg.round_number}]:\n"
context += f"{arg.argument}\n\n"
context += f"Now present your arguments as the {current_role}:"
return context
def _extract_key_points(self, argument: str) -> list[str]:
# Simplified — in production, use structured output
lines = argument.split("\n")
return [l.strip() for l in lines if l.strip().startswith(("-", "•", "*"))]
def _format_full_debate(self, question: str, pro_pos: str,
opp_pos: str, rounds: list[DebateArgument]) -> str:
text = f"DEBATE QUESTION: {question}\n"
text += f"PROPONENT POSITION: {pro_pos}\n"
text += f"OPPONENT POSITION: {opp_pos}\n\n"
text += "=== DEBATE TRANSCRIPT ===\n\n"
for arg in rounds:
text += f"[{arg.agent_role.upper()} - Round {arg.round_number}]:\n"
text += f"{arg.argument}\n\n"
return text
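One fragile point in the orchestrator above is the bare `json.loads` on the judge's reply: models often wrap their JSON in prose or a code fence. A hedged, hypothetical helper that extracts the outermost object first:

```python
import json

def parse_verdict(raw: str) -> dict:
    """Best-effort extraction of a JSON object from a judge response."""
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("No JSON object found in judge response")
    return json.loads(raw[start:end + 1])

verdict = parse_verdict(
    'Here is my ruling:\n'
    '{"verdict": "nuanced", "winning_position": "neither", "confidence": 0.7}'
)
```

In production, prefer the provider's structured-output mode where available rather than parsing free text.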
CHAPTER 4 — Reliability and Safety Patterns
Reliability and safety are not afterthoughts in agentic AI — they are first-class architectural concerns. Agents that can take actions in the world — sending emails, executing code, modifying databases, making API calls — can cause real harm if they malfunction. The patterns in this section are your safety net.
4.1 — Guardrail Pattern
Input/Output Validation Wrappers Around LLM Calls
Intent
Wrap LLM calls with validation layers that screen inputs for policy violations before they reach the model, and screen outputs for harmful, incorrect, or non-compliant content before they are returned to the caller.
Motivation
LLMs can be manipulated through adversarial prompts (prompt injection), can be coaxed into producing harmful content (jailbreaks), can leak sensitive information, and can generate outputs that violate regulatory requirements. Guardrails are the defensive perimeter around your LLM calls — they do not prevent all problems, but they catch the most common and most dangerous ones.
The Guardrail pattern is analogous to input validation in web security: you never trust user input; you always validate it before processing.
Implementation
from dataclasses import dataclass, field
from typing import Callable, Optional
from abc import ABC, abstractmethod
from enum import Enum
class GuardrailAction(Enum):
ALLOW = "allow"
BLOCK = "block"
MODIFY = "modify"
FLAG = "flag"
@dataclass
class GuardrailResult:
action: GuardrailAction
original_content: str
modified_content: Optional[str] = None
reason: Optional[str] = None
guardrail_name: str = ""
severity: str = "low" # low | medium | high | critical
class Guardrail(ABC):
"""Abstract base class for all guardrails."""
@property
@abstractmethod
def name(self) -> str:
pass
@abstractmethod
def check(self, content: str, context: dict = None) -> GuardrailResult:
pass
class PIIDetectionGuardrail(Guardrail):
"""Detects and redacts Personally Identifiable Information."""
PII_PATTERNS = {
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
"ssn": r"\b\d{3}-\d{2}-\d{4}\b",
"credit_card": r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b"
}
@property
def name(self) -> str:
return "pii_detection"
def check(self, content: str, context: dict = None) -> GuardrailResult:
import re
modified = content
detected_types = []
for pii_type, pattern in self.PII_PATTERNS.items():
matches = re.findall(pattern, content)
if matches:
detected_types.append(pii_type)
modified = re.sub(pattern, f"[REDACTED_{pii_type.upper()}]", modified)
if detected_types:
return GuardrailResult(
action=GuardrailAction.MODIFY,
original_content=content,
modified_content=modified,
reason=f"PII detected and redacted: {detected_types}",
guardrail_name=self.name,
severity="high"
)
return GuardrailResult(
action=GuardrailAction.ALLOW,
original_content=content,
guardrail_name=self.name
)
class PromptInjectionGuardrail(Guardrail):
"""Detects common prompt injection patterns."""
INJECTION_PATTERNS = [
"ignore previous instructions",
"ignore all previous",
"disregard your instructions",
"you are now",
"pretend you are",
"act as if you have no restrictions",
"jailbreak",
"dan mode",
"developer mode"
]
@property
def name(self) -> str:
return "prompt_injection"
def check(self, content: str, context: dict = None) -> GuardrailResult:
content_lower = content.lower()
detected = [
pattern for pattern in self.INJECTION_PATTERNS
if pattern in content_lower
]
if detected:
return GuardrailResult(
action=GuardrailAction.BLOCK,
original_content=content,
reason=f"Potential prompt injection detected: {detected}",
guardrail_name=self.name,
severity="critical"
)
return GuardrailResult(
action=GuardrailAction.ALLOW,
original_content=content,
guardrail_name=self.name
)
class GuardrailPipeline:
"""
Runs content through a sequence of guardrails.
Stops at the first BLOCK; applies all MODIFYs.
"""
def __init__(self, guardrails: list[Guardrail]):
self.guardrails = guardrails
def check_input(self, content: str,
context: dict = None) -> tuple[str, list[GuardrailResult]]:
"""
Run input through all guardrails.
Returns (processed_content, list_of_results).
Raises ValueError if content is blocked.
"""
results = []
current_content = content
for guardrail in self.guardrails:
result = guardrail.check(current_content, context)
results.append(result)
if result.action == GuardrailAction.BLOCK:
raise ValueError(
f"Content blocked by guardrail '{guardrail.name}': "
f"{result.reason}"
)
elif result.action == GuardrailAction.MODIFY:
current_content = result.modified_content or current_content
return current_content, results
def check_output(self, content: str,
context: dict = None) -> tuple[str, list[GuardrailResult]]:
"""Same as check_input but applied to LLM outputs."""
return self.check_input(content, context)
class GuardedLLMClient:
"""LLM client wrapper with integrated guardrail pipeline."""
def __init__(self, llm_client: "LLMClient",
input_guardrails: GuardrailPipeline,
output_guardrails: GuardrailPipeline):
self.llm = llm_client
self.input_pipeline = input_guardrails
self.output_pipeline = output_guardrails
def complete(self, messages: list["LLMMessage"], **kwargs) -> "LLMResponse":
# Screen the last user message
last_user = next(
(m for m in reversed(messages) if m.role == "user"), None
)
if last_user:
screened_content, _ = self.input_pipeline.check_input(
last_user.content
)
last_user.content = screened_content
# Call the LLM
response = self.llm.complete(messages, **kwargs)
# Screen the output
screened_output, _ = self.output_pipeline.check_output(
response.content
)
response.content = screened_output
return response
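The redaction mechanics can be exercised standalone. Here is a sketch using the same regex approach (patterns deliberately simplified; production systems typically use a dedicated PII detection service):

```python
import re

PII_PATTERNS = {
    "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
    "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
}

def redact(text: str) -> tuple[str, list[str]]:
    """Replace each PII match with a typed placeholder, as the guardrail does."""
    detected = []
    for pii_type, pattern in PII_PATTERNS.items():
        if re.search(pattern, text):
            detected.append(pii_type)
            text = re.sub(pattern, f"[REDACTED_{pii_type.upper()}]", text)
    return text, detected

clean, found = redact("Contact jane@example.com, SSN 123-45-6789.")
# clean == "Contact [REDACTED_EMAIL], SSN [REDACTED_SSN]."
```

Regex-based detection has false negatives (obfuscated emails, international phone formats), which is why the pipeline treats PII redaction as MODIFY rather than a guarantee.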
4.2 — Circuit Breaker Pattern
Preventing Cascading Failures in Agent Pipelines
Intent
Wrap calls to external services (LLM APIs, tools, databases) with a circuit breaker that monitors failure rates and, when failures exceed a threshold, "opens" the circuit to prevent further calls, allowing the system to fail fast and recover gracefully rather than cascading into total failure.
Motivation
In distributed systems, a slow or failing dependency can cause cascading failures: threads pile up waiting for responses, queues fill, and the entire system grinds to a halt. The Circuit Breaker pattern, popularized by Michael Nygard in Release It!, prevents this by acting as an automatic switch that cuts off calls to a failing service.
In agentic systems, this is particularly important because LLM API calls are expensive, slow, and subject to rate limiting, quota exhaustion, and service outages. Without a circuit breaker, a single failing LLM provider can cause your entire agent pipeline to hang.
Implementation
from dataclasses import dataclass, field
from typing import Callable, Any, Optional
from enum import Enum
import time
import threading
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing — reject all calls
HALF_OPEN = "half_open" # Testing recovery
@dataclass
class CircuitBreakerConfig:
failure_threshold: int = 5 # Failures before opening
success_threshold: int = 2 # Successes to close from half-open
timeout_seconds: float = 60.0 # Time before trying half-open
    call_timeout_seconds: float = 30.0 # Max time for a single call (enforcement left to the caller)
class CircuitBreaker:
"""
Monitors calls to a service and opens the circuit when
failure rate exceeds the threshold.
"""
def __init__(self, name: str, config: CircuitBreakerConfig = None):
self.name = name
self.config = config or CircuitBreakerConfig()
self._state = CircuitState.CLOSED
self._failure_count = 0
self._success_count = 0
self._last_failure_time: Optional[float] = None
self._lock = threading.Lock()
@property
def state(self) -> CircuitState:
with self._lock:
if (self._state == CircuitState.OPEN and
self._last_failure_time and
time.time() - self._last_failure_time >
self.config.timeout_seconds):
self._state = CircuitState.HALF_OPEN
self._success_count = 0
return self._state
def call(self, func: Callable, *args, **kwargs) -> Any:
"""
Execute a function through the circuit breaker.
Raises CircuitOpenError if the circuit is open.
"""
state = self.state
if state == CircuitState.OPEN:
raise CircuitOpenError(
f"Circuit breaker '{self.name}' is OPEN. "
f"Service unavailable. Retry after "
f"{self.config.timeout_seconds}s."
)
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self) -> None:
with self._lock:
if self._state == CircuitState.HALF_OPEN:
self._success_count += 1
if self._success_count >= self.config.success_threshold:
self._state = CircuitState.CLOSED
self._failure_count = 0
elif self._state == CircuitState.CLOSED:
self._failure_count = max(0, self._failure_count - 1)
def _on_failure(self) -> None:
with self._lock:
self._failure_count += 1
self._last_failure_time = time.time()
if self._failure_count >= self.config.failure_threshold:
self._state = CircuitState.OPEN
def get_metrics(self) -> dict:
return {
"name": self.name,
"state": self.state.value,
"failure_count": self._failure_count,
"last_failure": self._last_failure_time
}
class CircuitOpenError(Exception):
pass
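To make the state transitions concrete, here is a toy breaker (illustrative names only; the full implementation above adds the half-open probe count and thread safety) tripping after two consecutive failures:

```python
import time

class MiniBreaker:
    """Toy circuit breaker: opens after `threshold` consecutive failures."""
    def __init__(self, threshold: int = 3, timeout: float = 60.0):
        self.threshold, self.timeout = threshold, timeout
        self.failures, self.opened_at = 0, None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if time.time() - self.opened_at > self.timeout:
            return "half_open"   # time to probe the service again
        return "open"

    def call(self, func):
        if self.state == "open":
            raise RuntimeError("circuit open: failing fast")
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = time.time()
            raise
        self.failures, self.opened_at = 0, None
        return result

breaker = MiniBreaker(threshold=2)

def flaky():
    raise ConnectionError("provider unavailable")

for _ in range(2):           # two failures trip the breaker
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass
# breaker.state == "open": further calls fail fast without touching the service
```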
4.3 — Retry with Backoff Pattern
Handling Transient LLM API Failures
Intent
Automatically retry failed LLM API calls with an exponentially increasing delay between attempts, preventing thundering-herd problems and giving transient failures time to resolve.
Implementation
from dataclasses import dataclass
from typing import Callable, Any, Optional
import time
import random
import logging
logger = logging.getLogger(__name__)
@dataclass
class RetryConfig:
max_attempts: int = 3
base_delay_seconds: float = 1.0
max_delay_seconds: float = 60.0
exponential_base: float = 2.0
jitter: bool = True # Add randomness to prevent thundering herd
retryable_exceptions: tuple = (
ConnectionError,
TimeoutError,
# Add provider-specific rate limit exceptions here
)
class RetryWithBackoff:
"""
Decorator/wrapper that retries a function with exponential backoff.
"""
def __init__(self, config: RetryConfig = None):
self.config = config or RetryConfig()
def execute(self, func: Callable, *args, **kwargs) -> Any:
"""Execute a function with retry logic."""
last_exception = None
for attempt in range(1, self.config.max_attempts + 1):
try:
return func(*args, **kwargs)
except self.config.retryable_exceptions as e:
last_exception = e
if attempt == self.config.max_attempts:
logger.error(
f"All {self.config.max_attempts} attempts failed. "
f"Last error: {e}"
)
raise
delay = self._calculate_delay(attempt)
logger.warning(
f"Attempt {attempt}/{self.config.max_attempts} failed: {e}. "
f"Retrying in {delay:.2f}s..."
)
time.sleep(delay)
except Exception as e:
# Non-retryable exception — fail immediately
logger.error(f"Non-retryable error: {e}")
raise
raise last_exception
def _calculate_delay(self, attempt: int) -> float:
"""Calculate delay with exponential backoff and optional jitter."""
delay = min(
self.config.base_delay_seconds * (self.config.exponential_base ** (attempt - 1)),
self.config.max_delay_seconds
)
if self.config.jitter:
# Full jitter: random value between 0 and calculated delay
delay = random.uniform(0, delay)
return delay
def __call__(self, func: Callable) -> Callable:
"""Use as a decorator."""
def wrapper(*args, **kwargs):
return self.execute(func, *args, **kwargs)
return wrapper
class ResilientLLMClient:
"""
LLM client with integrated retry, circuit breaker, and fallback.
"""
def __init__(self, primary: "LLMClient",
fallback: Optional["LLMClient"] = None,
retry_config: RetryConfig = None,
circuit_config: CircuitBreakerConfig = None):
self.primary = primary
self.fallback = fallback
self.retry = RetryWithBackoff(retry_config or RetryConfig())
self.circuit = CircuitBreaker("llm_primary", circuit_config)
def complete(self, messages: list["LLMMessage"], **kwargs) -> "LLMResponse":
try:
return self.circuit.call(
lambda: self.retry.execute(
self.primary.complete, messages, **kwargs
)
)
        except Exception as e:  # includes CircuitOpenError; both routes use the fallback
if self.fallback:
logger.warning(f"Primary LLM failed ({e}), using fallback.")
return self.fallback.complete(messages, **kwargs)
raise
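The delay schedule produced by `_calculate_delay` (with jitter disabled) is easy to verify in isolation. This mirrors the formula above with the default config values:

```python
def backoff_delay(attempt: int, base: float = 1.0,
                  factor: float = 2.0, cap: float = 60.0) -> float:
    """Delay before retry `attempt` (1-indexed), capped, jitter disabled."""
    return min(base * factor ** (attempt - 1), cap)

schedule = [backoff_delay(a) for a in range(1, 8)]
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0] (attempt 7 is capped from 64.0)
```

With full jitter enabled, each delay instead becomes a uniform draw between 0 and this value, which spreads retries from many concurrent clients across time.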
4.4 — Human-in-the-Loop Pattern
Approval Gates for High-Stakes Actions
Intent
Insert human approval checkpoints at critical decision points in the agent pipeline, requiring explicit human confirmation before the agent takes actions that are irreversible, high-risk, or high-cost.
Motivation
Autonomous agents are powerful, but autonomy without accountability is dangerous. When an agent is about to send an email to 10,000 customers, execute a database migration, make a financial transaction, or deploy code to production, you want a human to review and approve that action before it happens. The Human-in-the-Loop pattern provides the architectural mechanism for this.
Implementation
from dataclasses import dataclass, field
from typing import Optional, Callable, Any
from enum import Enum
import uuid
import time
class ApprovalStatus(Enum):
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
TIMED_OUT = "timed_out"
@dataclass
class ApprovalRequest:
request_id: str = field(default_factory=lambda: str(uuid.uuid4()))
agent_id: str = ""
action_type: str = ""
action_description: str = ""
action_parameters: dict = field(default_factory=dict)
risk_level: str = "medium" # low | medium | high | critical
context: str = ""
requested_at: float = field(default_factory=time.time)
expires_at: Optional[float] = None
status: ApprovalStatus = ApprovalStatus.PENDING
reviewer: Optional[str] = None
reviewer_notes: Optional[str] = None
decided_at: Optional[float] = None
class ApprovalGateway:
"""
Manages approval requests for high-stakes agent actions.
Integrates with notification systems and approval UIs.
"""
def __init__(self, notification_service: "NotificationService",
default_timeout_seconds: float = 3600,
auto_approve_low_risk: bool = False):
self.notifier = notification_service
self.default_timeout = default_timeout_seconds
self.auto_approve_low_risk = auto_approve_low_risk
self._pending: dict[str, ApprovalRequest] = {}
self._callbacks: dict[str, Callable] = {}
def request_approval(self, request: ApprovalRequest,
on_approved: Callable = None,
on_rejected: Callable = None) -> str:
"""
Submit an action for human approval.
Returns the request ID.
"""
# Auto-approve low-risk actions if configured
if self.auto_approve_low_risk and request.risk_level == "low":
request.status = ApprovalStatus.APPROVED
request.decided_at = time.time()
if on_approved:
on_approved(request)
return request.request_id
# Set expiry
if request.expires_at is None:
request.expires_at = time.time() + self.default_timeout
self._pending[request.request_id] = request
if on_approved:
self._callbacks[f"{request.request_id}_approved"] = on_approved
if on_rejected:
self._callbacks[f"{request.request_id}_rejected"] = on_rejected
# Notify reviewers
self.notifier.send_approval_request(request)
return request.request_id
def approve(self, request_id: str, reviewer: str,
notes: str = "") -> None:
"""Human approves an action."""
request = self._get_active_request(request_id)
request.status = ApprovalStatus.APPROVED
request.reviewer = reviewer
request.reviewer_notes = notes
request.decided_at = time.time()
callback = self._callbacks.pop(f"{request_id}_approved", None)
if callback:
callback(request)
self._cleanup(request_id)
def reject(self, request_id: str, reviewer: str,
reason: str = "") -> None:
"""Human rejects an action."""
request = self._get_active_request(request_id)
request.status = ApprovalStatus.REJECTED
request.reviewer = reviewer
request.reviewer_notes = reason
request.decided_at = time.time()
callback = self._callbacks.pop(f"{request_id}_rejected", None)
if callback:
callback(request)
self._cleanup(request_id)
def wait_for_decision(self, request_id: str,
poll_interval: float = 5.0) -> ApprovalRequest:
"""
Synchronously wait for a human decision (blocking).
In production, use async/event-driven approach instead.
"""
while True:
request = self._pending.get(request_id)
if not request:
raise ValueError(f"Request {request_id} not found.")
if time.time() > request.expires_at:
request.status = ApprovalStatus.TIMED_OUT
self._cleanup(request_id)
return request
if request.status != ApprovalStatus.PENDING:
return request
time.sleep(poll_interval)
def _get_active_request(self, request_id: str) -> ApprovalRequest:
request = self._pending.get(request_id)
if not request:
raise ValueError(f"Approval request '{request_id}' not found.")
if time.time() > request.expires_at:
request.status = ApprovalStatus.TIMED_OUT
raise TimeoutError(f"Approval request '{request_id}' has expired.")
return request
def _cleanup(self, request_id: str) -> None:
self._pending.pop(request_id, None)
self._callbacks.pop(f"{request_id}_approved", None)
self._callbacks.pop(f"{request_id}_rejected", None)
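A toy version of the request/approve flow shows how the deferred callback fires only after a human decision (all names here are hypothetical; the real gateway above adds expiry, rejection, and notifications):

```python
import time
import uuid
from typing import Callable

class MiniGateway:
    """Toy approval gateway: pending requests plus decision callbacks."""
    def __init__(self):
        self.pending: dict[str, dict] = {}
        self.callbacks: dict[str, Callable] = {}

    def request(self, description: str, on_approved: Callable) -> str:
        request_id = str(uuid.uuid4())
        self.pending[request_id] = {"description": description,
                                    "status": "pending"}
        self.callbacks[request_id] = on_approved
        return request_id

    def approve(self, request_id: str, reviewer: str) -> None:
        req = self.pending.pop(request_id)
        req.update(status="approved", reviewer=reviewer,
                   decided_at=time.time())
        self.callbacks.pop(request_id)(req)

executed = []
gateway = MiniGateway()
rid = gateway.request(
    "Send campaign email to 10,000 customers",
    on_approved=lambda req: executed.append(req["description"]),
)
# Nothing has executed yet; a human approves out of band:
gateway.approve(rid, reviewer="ops@example.com")
```

The key design point is that the agent never holds the capability to act directly; the action runs inside the approval callback, so an unapproved request simply never executes.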
4.5 — Audit Trail Pattern
Immutable Logging of All Agent Decisions and Actions
Intent
Maintain a tamper-evident, append-only log of every decision, action, tool call, and output produced by agents, providing full accountability, reproducibility, and forensic capability.
Motivation
When an agent makes a mistake — and it will — you need to understand exactly what happened, why, and how to prevent it from happening again. Without a comprehensive audit trail, debugging agentic systems is like investigating a crime scene without any evidence. The Audit Trail pattern ensures that every agent action is recorded with sufficient context to reconstruct the full sequence of events.
Beyond debugging, audit trails are essential for regulatory compliance (GDPR, SOX, HIPAA), security forensics, and model governance.
Implementation
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum
import hashlib
import json
import time
import uuid
class AuditEventType(Enum):
AGENT_STARTED = "agent_started"
AGENT_COMPLETED = "agent_completed"
AGENT_FAILED = "agent_failed"
LLM_CALL = "llm_call"
TOOL_INVOKED = "tool_invoked"
TOOL_RESULT = "tool_result"
DECISION_MADE = "decision_made"
HANDOFF = "handoff"
APPROVAL_REQUESTED = "approval_requested"
APPROVAL_GRANTED = "approval_granted"
APPROVAL_DENIED = "approval_denied"
GUARDRAIL_TRIGGERED = "guardrail_triggered"
ERROR = "error"
@dataclass
class AuditEvent:
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
event_type: AuditEventType = AuditEventType.DECISION_MADE
session_id: str = ""
agent_id: str = ""
timestamp: float = field(default_factory=time.time)
data: dict = field(default_factory=dict)
previous_event_hash: Optional[str] = None
event_hash: str = field(default="", init=False)
def __post_init__(self):
self.event_hash = self._compute_hash()
def _compute_hash(self) -> str:
"""Compute a hash of this event for chain integrity."""
content = json.dumps({
"event_id": self.event_id,
"event_type": self.event_type.value,
"session_id": self.session_id,
"agent_id": self.agent_id,
"timestamp": self.timestamp,
"data": self.data,
"previous_event_hash": self.previous_event_hash
}, sort_keys=True)
return hashlib.sha256(content.encode()).hexdigest()
class AuditTrail:
"""
Immutable, append-only audit log with hash chaining
for tamper detection.
"""
def __init__(self, storage_backend: "AuditStorage"):
self.storage = storage_backend
self._last_hash: Optional[str] = None
self._session_id: str = str(uuid.uuid4())
def log(self, event_type: AuditEventType, agent_id: str,
data: dict = None) -> AuditEvent:
"""Append an event to the audit trail."""
event = AuditEvent(
event_type=event_type,
session_id=self._session_id,
agent_id=agent_id,
data=data or {},
previous_event_hash=self._last_hash
)
self._last_hash = event.event_hash
self.storage.append(event)
return event
def verify_integrity(self) -> tuple[bool, Optional[str]]:
"""
Verify the integrity of the audit chain.
Returns (is_valid, error_message).
"""
events = self.storage.get_session(self._session_id)
previous_hash = None
for event in events:
expected_hash = event._compute_hash()
if event.event_hash != expected_hash:
return False, f"Hash mismatch at event {event.event_id}"
if event.previous_event_hash != previous_hash:
return False, f"Chain broken at event {event.event_id}"
previous_hash = event.event_hash
return True, None
def get_session_summary(self) -> dict:
"""Return a summary of all events in the current session."""
events = self.storage.get_session(self._session_id)
return {
"session_id": self._session_id,
"total_events": len(events),
"event_types": {
et.value: sum(1 for e in events if e.event_type == et)
for et in AuditEventType
},
"agents_involved": list({e.agent_id for e in events}),
"duration_seconds": (
events[-1].timestamp - events[0].timestamp
if len(events) >= 2 else 0
),
"integrity_valid": self.verify_integrity()[0]
}
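The tamper-evidence property rests entirely on the hash chain. A standalone sketch of the same mechanism, with toy event dicts in place of AuditEvent:

```python
import hashlib
import json

def chain_events(events: list[dict]) -> list[dict]:
    """Link events by embedding each predecessor's hash before hashing."""
    prev, chained = None, []
    for e in events:
        record = dict(e, previous_hash=prev)
        record["hash"] = hashlib.sha256(
            json.dumps(record, sort_keys=True).encode()
        ).hexdigest()
        chained.append(record)
        prev = record["hash"]
    return chained

def verify(chained: list[dict]) -> bool:
    """Recompute every hash; editing any event breaks the chain."""
    prev = None
    for record in chained:
        body = {k: v for k, v in record.items() if k != "hash"}
        expected = hashlib.sha256(
            json.dumps(body, sort_keys=True).encode()
        ).hexdigest()
        if record["previous_hash"] != prev or record["hash"] != expected:
            return False
        prev = record["hash"]
    return True

log = chain_events([{"event": "tool_invoked"}, {"event": "decision_made"}])
intact = verify(log)          # True on the untouched log
log[0]["event"] = "tampered"  # a single edit invalidates the chain
still_intact = verify(log)    # False
```

This makes tampering detectable, not impossible; an attacker with write access could re-hash the whole chain, which is why production audit stores also anchor periodic checkpoints in write-once storage.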
CHAPTER 5 — Integration Patterns
Agents do not exist in isolation. They must integrate with existing services, respond to events, expose their capabilities to other systems, and operate within the broader enterprise architecture. These patterns address the integration challenges unique to agentic systems.
5.1 — MCP Server Pattern
Standardized Tool Exposure via MCP
Intent
Expose agent tools and resources through the Model Context Protocol (MCP), a standardized open protocol that enables LLMs and agents to discover and invoke tools in a provider-agnostic, interoperable way.
Motivation
Before MCP, every agent framework had its own proprietary tool format. OpenAI had function calling, LangChain had its tool interface, AutoGen had its own convention. This fragmentation meant that tools built for one framework could not be reused in another.
The Model Context Protocol, introduced by Anthropic in 2024, provides a standardized JSON-RPC-based protocol for exposing tools, resources, and prompts to LLM clients. An MCP Server exposes capabilities; an MCP Client (typically an agent or IDE) discovers and invokes them. This separation of concerns enables a rich ecosystem of reusable, interoperable tools.
Implementation
from dataclasses import dataclass, field
from typing import Any, Optional, Callable
import json
@dataclass
class MCPToolDefinition:
"""MCP-compliant tool definition."""
name: str
description: str
input_schema: dict # JSON Schema
@dataclass
class MCPResource:
"""An MCP resource (data source accessible to agents)."""
uri: str
name: str
description: str
mime_type: str = "text/plain"
@dataclass
class MCPCallResult:
content: list[dict] # MCP content blocks
is_error: bool = False
class MCPServer:
"""
A Model Context Protocol server that exposes tools and resources
to MCP-compatible clients.
"""
def __init__(self, server_name: str, version: str = "1.0.0"):
self.server_name = server_name
self.version = version
self._tools: dict[str, tuple[MCPToolDefinition, Callable]] = {}
self._resources: dict[str, tuple[MCPResource, Callable]] = {}
def register_tool(self, definition: MCPToolDefinition,
handler: Callable) -> None:
"""Register a tool with the MCP server."""
self._tools[definition.name] = (definition, handler)
def register_resource(self, resource: MCPResource,
reader: Callable) -> None:
"""Register a resource with the MCP server."""
self._resources[resource.uri] = (resource, reader)
def tool(self, name: str, description: str, input_schema: dict):
"""Decorator for registering MCP tools."""
def decorator(func: Callable):
self.register_tool(
MCPToolDefinition(name=name, description=description,
input_schema=input_schema),
func
)
return func
return decorator
def handle_request(self, request: dict) -> dict:
"""
Handle an MCP JSON-RPC request.
Dispatches to the appropriate handler based on method.
"""
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
handlers = {
"initialize": self._handle_initialize,
"tools/list": self._handle_tools_list,
"tools/call": self._handle_tool_call,
"resources/list": self._handle_resources_list,
"resources/read": self._handle_resource_read,
}
handler = handlers.get(method)
if not handler:
return self._error_response(request_id, -32601,
f"Method not found: {method}")
try:
result = handler(params)
return {
"jsonrpc": "2.0",
"id": request_id,
"result": result
}
except Exception as e:
return self._error_response(request_id, -32603, str(e))
def _handle_initialize(self, params: dict) -> dict:
return {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {"listChanged": False},
"resources": {"subscribe": False, "listChanged": False}
},
"serverInfo": {
"name": self.server_name,
"version": self.version
}
}
def _handle_tools_list(self, params: dict) -> dict:
return {
"tools": [
{
"name": defn.name,
"description": defn.description,
"inputSchema": defn.input_schema
}
for defn, _ in self._tools.values()
]
}
def _handle_tool_call(self, params: dict) -> dict:
tool_name = params.get("name")
arguments = params.get("arguments", {})
if tool_name not in self._tools:
raise ValueError(f"Tool '{tool_name}' not found.")
_, handler = self._tools[tool_name]
try:
result = handler(**arguments)
return {
"content": [
{"type": "text", "text": str(result)}
],
"isError": False
}
except Exception as e:
return {
"content": [
{"type": "text", "text": f"Error: {str(e)}"}
],
"isError": True
}
def _handle_resources_list(self, params: dict) -> dict:
return {
"resources": [
{
"uri": res.uri,
"name": res.name,
"description": res.description,
"mimeType": res.mime_type
}
for res, _ in self._resources.values()
]
}
def _handle_resource_read(self, params: dict) -> dict:
uri = params.get("uri")
if uri not in self._resources:
raise ValueError(f"Resource '{uri}' not found.")
resource, reader = self._resources[uri]
content = reader(uri)
return {
"contents": [
{
"uri": uri,
"mimeType": resource.mime_type,
"text": str(content)
}
]
}
def _error_response(self, request_id: Any, code: int,
message: str) -> dict:
return {
"jsonrpc": "2.0",
"id": request_id,
"error": {"code": code, "message": message}
}
# Industrial MCP Server
industrial_mcp = MCPServer("industrial-tools", "2.0.0")
@industrial_mcp.tool(
name="get_sensor_reading",
description="Retrieve the latest reading from an industrial sensor.",
input_schema={
"type": "object",
"properties": {
"sensor_id": {"type": "string", "description": "Sensor identifier"},
"metric": {"type": "string", "enum": ["temperature", "pressure", "flow"]}
},
"required": ["sensor_id", "metric"]
}
)
def get_sensor_reading(sensor_id: str, metric: str) -> dict:
    # In production, this would query the actual sensor API.
    units = {"temperature": "°C", "pressure": "bar", "flow": "m³/h"}
    return {"sensor_id": sensor_id, "metric": metric,
            "value": 42.7, "unit": units[metric]}
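To make the JSON-RPC envelope concrete, here is a condensed, standalone re-sketch of the dispatch logic above, exercised with a `tools/call` round trip. `MiniMCPServer` and its collapsed error handling are illustrative simplifications for this sketch, not part of the MCP specification:

```python
from typing import Callable

class MiniMCPServer:
    """Condensed re-sketch of the MCP dispatch above (illustration only)."""
    def __init__(self):
        self._tools: dict[str, tuple[dict, Callable]] = {}

    def register(self, name: str, description: str, schema: dict, fn: Callable):
        self._tools[name] = ({"name": name, "description": description,
                              "inputSchema": schema}, fn)

    def handle_request(self, request: dict) -> dict:
        method = request.get("method")
        params = request.get("params", {})
        rid = request.get("id")
        if method == "tools/list":
            return {"jsonrpc": "2.0", "id": rid,
                    "result": {"tools": [d for d, _ in self._tools.values()]}}
        if method == "tools/call":
            entry = self._tools.get(params.get("name"))
            if entry is None:
                return {"jsonrpc": "2.0", "id": rid,
                        "error": {"code": -32602, "message": "unknown tool"}}
            result = entry[1](**params.get("arguments", {}))
            return {"jsonrpc": "2.0", "id": rid,
                    "result": {"content": [{"type": "text", "text": str(result)}],
                               "isError": False}}
        return {"jsonrpc": "2.0", "id": rid,
                "error": {"code": -32601, "message": f"Method not found: {method}"}}

server = MiniMCPServer()
server.register("get_sensor_reading", "Latest sensor reading",
                {"type": "object"},
                lambda sensor_id, metric: {"sensor_id": sensor_id,
                                           "metric": metric, "value": 42.7})

response = server.handle_request({
    "jsonrpc": "2.0", "id": 1, "method": "tools/call",
    "params": {"name": "get_sensor_reading",
               "arguments": {"sensor_id": "TT-101", "metric": "temperature"}},
})
```

Note how every reply, success or failure, carries the same `jsonrpc`/`id` envelope — that is what lets any MCP-aware client correlate responses with requests.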
5.2 — Event-Driven Agent Pattern
Agents Triggered by Events, Not Just Requests
Intent
Design agents to be event-driven — triggered by events from message queues, webhooks, or event streams — rather than exclusively responding to synchronous user requests. This enables agents to operate asynchronously, at scale, and in response to real-world triggers.
Motivation
Most agent frameworks are designed around the request-response model: a user sends a message, the agent responds. But many real-world use cases are event-driven: a new customer order arrives, a sensor threshold is breached, a CI/CD pipeline fails, a document is uploaded. In these scenarios, you want an agent to spring into action automatically, without a human initiating the interaction.
The Event-Driven Agent pattern connects agents to event infrastructure (Kafka, RabbitMQ, AWS EventBridge, webhooks) and defines how agents consume, process, and produce events.
Implementation
from dataclasses import dataclass, field
from typing import Callable, Any, Optional
from abc import ABC, abstractmethod
from enum import Enum
import asyncio
import json
@dataclass
class AgentEvent:
event_id: str
event_type: str
source: str
payload: dict
timestamp: float
correlation_id: Optional[str] = None
metadata: dict = field(default_factory=dict)
class EventHandler(ABC):
"""Abstract base for event-driven agent handlers."""
@property
@abstractmethod
def handles_event_types(self) -> list[str]:
"""List of event types this handler processes."""
pass
@abstractmethod
async def handle(self, event: AgentEvent) -> Optional[AgentEvent]:
"""
Process an event. Optionally return a new event to publish.
"""
pass
class EventDrivenAgent:
"""
An agent that subscribes to an event stream and processes
events asynchronously.
"""
def __init__(self, agent_id: str,
event_bus: "EventBus",
llm_client: "LLMClient",
tool_registry: "ToolRegistry"):
self.agent_id = agent_id
self.event_bus = event_bus
self.llm = llm_client
self.tools = tool_registry
self._handlers: dict[str, list[EventHandler]] = {}
self._running = False
def register_handler(self, handler: EventHandler) -> None:
"""Register an event handler for specific event types."""
for event_type in handler.handles_event_types:
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(handler)
async def start(self) -> None:
"""Start consuming events from the event bus."""
self._running = True
async for event in self.event_bus.subscribe(
agent_id=self.agent_id,
event_types=list(self._handlers.keys())
):
if not self._running:
break
await self._process_event(event)
async def stop(self) -> None:
self._running = False
async def _process_event(self, event: AgentEvent) -> None:
handlers = self._handlers.get(event.event_type, [])
for handler in handlers:
try:
result_event = await handler.handle(event)
if result_event:
await self.event_bus.publish(result_event)
            except Exception as e:
                import time
                error_event = AgentEvent(
                    event_id=f"error-{event.event_id}",
                    event_type="agent.error",
                    source=self.agent_id,
                    payload={
                        "original_event_id": event.event_id,
                        "error": str(e),
                        "agent_id": self.agent_id
                    },
                    timestamp=time.time(),
                    correlation_id=event.correlation_id
                )
await self.event_bus.publish(error_event)
# Example: Alert processing handler
class AlertProcessingHandler(EventHandler):
"""Processes industrial alerts and generates recommended actions."""
def __init__(self, llm_client: "LLMClient"):
self.llm = llm_client
@property
def handles_event_types(self) -> list[str]:
return ["sensor.alert", "equipment.fault", "threshold.exceeded"]
    async def handle(self, event: AgentEvent) -> Optional[AgentEvent]:
        import time
        # Use the LLM to analyze the alert and recommend actions.
        # In production, prefer an async LLM client here so this call
        # does not block the event loop (see the Synchronous Blocking
        # anti-pattern in Chapter 6).
        response = self.llm.complete(
messages=[
LLMMessage(
role="system",
content="You are an industrial operations expert. "
"Analyze alerts and recommend immediate actions."
),
LLMMessage(
role="user",
content=f"Alert received: {json.dumps(event.payload)}\n"
f"What immediate actions should be taken?"
)
]
)
# Publish a recommendation event
return AgentEvent(
event_id=f"rec-{event.event_id}",
event_type="agent.recommendation",
source="alert-processing-agent",
payload={
"original_alert": event.payload,
"recommendation": response.content,
"urgency": "high" if "fault" in event.event_type else "medium"
},
timestamp=time.time(),
correlation_id=event.event_id
)
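The `EventBus` is referenced throughout but never defined. A minimal single-process sketch built on `asyncio.Queue` shows the publish/subscribe contract the agent assumes (`InMemoryEventBus` is illustrative; production systems would sit on Kafka, RabbitMQ, or EventBridge):

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import AsyncIterator, Optional

@dataclass
class AgentEvent:  # repeated from the pattern above so the snippet is standalone
    event_id: str
    event_type: str
    source: str
    payload: dict
    timestamp: float
    correlation_id: Optional[str] = None
    metadata: dict = field(default_factory=dict)

class InMemoryEventBus:
    """Single-process event bus matching the publish/subscribe calls above."""
    def __init__(self):
        self._queue = asyncio.Queue()

    async def publish(self, event: AgentEvent) -> None:
        await self._queue.put(event)

    async def subscribe(self, agent_id: str,
                        event_types: list[str]) -> AsyncIterator[AgentEvent]:
        # Yield only the event types this subscriber registered for.
        while True:
            event = await self._queue.get()
            if event.event_type in event_types:
                yield event

async def demo() -> AgentEvent:
    bus = InMemoryEventBus()
    await bus.publish(AgentEvent("e1", "sensor.alert", "plc-1",
                                 {"level": "high"}, time.time()))
    async for event in bus.subscribe("alert-agent", ["sensor.alert"]):
        return event  # first matching event ends the demo

received = asyncio.run(demo())
```

The filtering-by-subscription detail matters: it is what lets many specialized agents share one bus without processing each other's traffic.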
5.3 — Sidecar Agent Pattern
Agents Deployed Alongside Existing Services
Intent
Deploy an agent as a sidecar — a companion process running alongside an existing service — that augments the service with AI capabilities without requiring modifications to the service itself.
Motivation
Not every organization can afford to rewrite existing services to incorporate AI. The Sidecar Agent pattern, borrowed from the service mesh world (think Envoy in Istio), allows you to add AI capabilities to existing services by deploying an agent process alongside them. The sidecar intercepts or augments requests, adds AI-powered processing, and forwards results — all transparently to the original service.
This is particularly powerful for adding capabilities like intelligent log analysis, automatic documentation generation, anomaly detection, and natural language interfaces to legacy systems.
Implementation
from dataclasses import dataclass
from typing import Optional, Callable, Any
import asyncio
@dataclass
class SidecarConfig:
target_service_url: str
intercept_requests: bool = False # If True, pre-process requests
intercept_responses: bool = True # If True, post-process responses
augmentation_mode: str = "enrich" # "enrich" | "transform" | "validate"
async_mode: bool = True # Process augmentation asynchronously
class SidecarAgent:
"""
An agent that runs alongside an existing service,
augmenting its inputs or outputs with AI capabilities.
"""
def __init__(self, config: SidecarConfig,
llm_client: "LLMClient",
service_client: "ServiceClient"):
self.config = config
self.llm = llm_client
self.service = service_client
async def handle_request(self, request: dict) -> dict:
"""
Process a request, optionally augmenting it before
forwarding to the target service.
"""
processed_request = request
# Pre-processing (request interception)
if self.config.intercept_requests:
processed_request = await self._augment_request(request)
# Forward to target service
response = await self.service.call(
self.config.target_service_url,
processed_request
)
# Post-processing (response interception)
if self.config.intercept_responses:
response = await self._augment_response(request, response)
return response
async def _augment_request(self, request: dict) -> dict:
"""Augment an incoming request with AI-derived context."""
augmentation = self.llm.complete(
messages=[
LLMMessage(
role="system",
content="Extract and structure key information from this "
"request to improve downstream processing."
),
LLMMessage(
role="user",
content=f"Request: {request}"
)
]
)
request["_ai_context"] = augmentation.content
return request
async def _augment_response(self, original_request: dict,
response: dict) -> dict:
"""Augment a service response with AI-derived insights."""
if self.config.augmentation_mode == "enrich":
enrichment = self.llm.complete(
messages=[
LLMMessage(
role="system",
content="Enrich this service response with additional "
"context, explanations, or recommendations."
),
LLMMessage(
role="user",
content=f"Request: {original_request}\n"
f"Response: {response}"
)
]
)
response["_ai_enrichment"] = enrichment.content
return response
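As a usage sketch, the enrich flow can be wired up with stub collaborators. `StubLLM`, `StubService`, and their canned values are placeholders standing in for the real `LLMClient` and `ServiceClient` interfaces assumed by the pattern above:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class LLMMessage:
    role: str
    content: str

class StubLLM:
    """Stand-in for a real LLM client; returns a canned enrichment."""
    def complete(self, messages):
        return type("Resp", (), {
            "content": "Latency spike likely caused by nightly batch job."
        })()

class StubService:
    """Stand-in for the legacy service behind the sidecar."""
    async def call(self, url: str, request: dict) -> dict:
        return {"status": "ok", "latency_ms": 1870}

class EnrichingSidecar:
    """Condensed sidecar: forward the call, then attach an AI enrichment."""
    def __init__(self, llm, service, target_url: str):
        self.llm, self.service, self.target_url = llm, service, target_url

    async def handle_request(self, request: dict) -> dict:
        response = await self.service.call(self.target_url, request)
        enrichment = self.llm.complete([
            LLMMessage("system", "Enrich this service response."),
            LLMMessage("user", f"Request: {request}\nResponse: {response}"),
        ])
        response["_ai_enrichment"] = enrichment.content
        return response

sidecar = EnrichingSidecar(StubLLM(), StubService(), "http://legacy-svc/health")
result = asyncio.run(sidecar.handle_request({"check": "latency"}))
```

The key property is visible in the result: the legacy service's payload is untouched, and the AI contribution arrives under a namespaced key (`_ai_enrichment`) that existing consumers can safely ignore.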
5.4 — API Gateway Pattern for Agents
Unified Entry Point for Multi-Agent Systems
Intent
Provide a single, unified API gateway that serves as the entry point for all external interactions with a multi-agent system, handling routing, authentication, rate limiting, load balancing, and observability in one place.
Motivation
As multi-agent systems grow, they expose a proliferating set of endpoints: different agents for different tasks, different versions of the same agent, different deployment environments. Without a unified gateway, clients must know about the internal topology of the agent system — a violation of encapsulation that makes the system brittle and hard to evolve.
The API Gateway pattern provides a stable, well-defined external interface that hides the internal complexity of the agent system.
Implementation
from dataclasses import dataclass, field
from typing import Optional, Callable
import asyncio
import time
import uuid
@dataclass
class AgentRoute:
path: str
agent_type: str
version: str = "latest"
requires_auth: bool = True
rate_limit_rpm: int = 60 # Requests per minute
timeout_seconds: float = 120.0
@dataclass
class GatewayRequest:
request_id: str = field(default_factory=lambda: str(uuid.uuid4()))
path: str = ""
method: str = "POST"
headers: dict = field(default_factory=dict)
body: dict = field(default_factory=dict)
client_id: str = ""
received_at: float = field(default_factory=time.time)
@dataclass
class GatewayResponse:
request_id: str
status_code: int
body: dict
agent_type: str
processing_time_ms: float
headers: dict = field(default_factory=dict)
class AgentAPIGateway:
"""
Unified API gateway for multi-agent systems.
Handles routing, auth, rate limiting, and observability.
"""
def __init__(self, auth_service: "AuthService",
rate_limiter: "RateLimiter",
agent_registry: dict[str, "WorkerAgent"],
audit_trail: "AuditTrail"):
self.auth = auth_service
self.rate_limiter = rate_limiter
self.agents = agent_registry
self.audit = audit_trail
self._routes: dict[str, AgentRoute] = {}
def add_route(self, route: AgentRoute) -> None:
self._routes[route.path] = route
async def handle(self, request: GatewayRequest) -> GatewayResponse:
start_time = time.time()
# 1. Route matching
route = self._routes.get(request.path)
if not route:
return GatewayResponse(
request_id=request.request_id,
status_code=404,
body={"error": f"No route found for path: {request.path}"},
agent_type="gateway",
processing_time_ms=0
)
# 2. Authentication
if route.requires_auth:
auth_result = self.auth.verify(request.headers.get("Authorization"))
if not auth_result.is_valid:
return GatewayResponse(
request_id=request.request_id,
status_code=401,
body={"error": "Unauthorized"},
agent_type="gateway",
processing_time_ms=0
)
request.client_id = auth_result.client_id
# 3. Rate limiting
if not self.rate_limiter.allow(request.client_id, route.rate_limit_rpm):
return GatewayResponse(
request_id=request.request_id,
status_code=429,
body={"error": "Rate limit exceeded. Please retry later."},
agent_type="gateway",
processing_time_ms=0
)
# 4. Route to appropriate agent
agent = self.agents.get(route.agent_type)
if not agent:
return GatewayResponse(
request_id=request.request_id,
status_code=503,
body={"error": f"Agent '{route.agent_type}' unavailable"},
agent_type=route.agent_type,
processing_time_ms=0
)
# 5. Execute with timeout
try:
task = WorkerTask(
task_id=request.request_id,
worker_type=route.agent_type,
instruction=request.body.get("message", ""),
context=request.body.get("context", {})
)
result = await asyncio.wait_for(
agent.execute(task),
timeout=route.timeout_seconds
)
processing_time = (time.time() - start_time) * 1000
# 6. Audit logging
self.audit.log(
AuditEventType.AGENT_COMPLETED,
agent_id=route.agent_type,
data={
"request_id": request.request_id,
"client_id": request.client_id,
"processing_time_ms": processing_time
}
)
return GatewayResponse(
request_id=request.request_id,
status_code=200,
body={"result": result.output, "success": result.success},
agent_type=route.agent_type,
processing_time_ms=processing_time
)
except asyncio.TimeoutError:
return GatewayResponse(
request_id=request.request_id,
status_code=504,
body={"error": "Agent request timed out"},
agent_type=route.agent_type,
processing_time_ms=(time.time() - start_time) * 1000
)
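The gateway assumes a `RateLimiter` with an `allow(client_id, rate_limit_rpm)` method but never defines one. One minimal sliding-window sketch (illustrative rather than production-grade; a real deployment would back this with a shared store such as Redis so limits hold across gateway replicas):

```python
import time
from collections import deque

class RateLimiter:
    """Minimal sliding-window limiter matching the allow() call above."""
    def __init__(self):
        self._hits: dict[str, deque] = {}

    def allow(self, client_id: str, limit_rpm: int) -> bool:
        now = time.monotonic()
        window = self._hits.setdefault(client_id, deque())
        # Drop timestamps that have aged out of the 60-second window.
        while window and now - window[0] > 60.0:
            window.popleft()
        if len(window) >= limit_rpm:
            return False
        window.append(now)
        return True

limiter = RateLimiter()
results = [limiter.allow("client-a", 3) for _ in range(5)]  # → [T, T, T, F, F]
```

Each client gets an independent window, so one noisy client hitting its limit never blocks the others — exactly the isolation the 429 branch in the gateway relies on.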
CHAPTER 6 — Patterns to Avoid: Anti-Patterns in Agentic AI
Every catalog of patterns must include its dark mirror: the anti-patterns. These are the architectural mistakes that seem reasonable at first but lead to systems that are unmaintainable, unreliable, insecure, or simply ineffective. Study these carefully — not to implement them, but to recognize and avoid them.
6.1 — God Agent Anti-Pattern
One Agent Trying to Do Everything
What It Looks Like
# THE GOD AGENT — DO NOT DO THIS
class GodAgent:
"""
This agent can: search the web, write code, execute code,
send emails, manage databases, analyze images, generate reports,
handle customer support, manage inventory, process payments,
schedule meetings, and make coffee (metaphorically).
"""
SYSTEM_PROMPT = """
You are an all-powerful AI assistant. You can do ANYTHING.
You have access to 150 different tools. Use them as needed.
You handle all tasks for all departments of the company.
You are the single point of contact for all AI needs.
"""
def __init__(self):
self.tools = [tool for tool in ALL_POSSIBLE_TOOLS] # 150+ tools
self.context = [] # This will grow unboundedly
self.state = {} # Shared mutable state for everything
Why It Fails
1. Tool Confusion: With 150 tools available, the LLM struggles to select the right one. Research shows that LLM tool selection accuracy degrades significantly beyond 20-30 tools. The agent begins making incorrect tool choices, hallucinating tool names, or simply ignoring available tools.
2. Context Collapse: A God Agent accumulates context from all tasks, all users, and all domains. The context window fills rapidly, important information gets evicted or lost, and the agent begins confusing context from different tasks.
3. Blast Radius: When the God Agent fails — and it will fail — everything fails. There is no isolation, no fallback, no graceful degradation. A single bug or hallucination can have system-wide consequences.
4. Untestability: How do you write unit tests for an agent that does everything? You cannot. The God Agent is a testing nightmare.
5. Prompt Complexity: The system prompt for a God Agent becomes a sprawling, contradictory document that no single LLM can reliably follow.
The Fix
Apply the Single Responsibility Principle to agents. Each agent should have one clear purpose, a focused set of tools (ideally fewer than 15), and a well-defined scope of authority. Use the Orchestrator-Worker Pattern to coordinate specialized agents.
# THE RIGHT WAY — Specialized agents with clear responsibilities
class ResearchAgent:
"""Searches and synthesizes information. Tools: web_search, arxiv, wikipedia."""
pass
class CodeAgent:
"""Writes, reviews, and executes code. Tools: code_interpreter, github, linter."""
pass
class CommunicationAgent:
"""Handles emails and notifications. Tools: email, slack, calendar."""
pass
class OrchestratorAgent:
"""Coordinates the above agents. No tools — only delegation."""
pass
6.2 — Prompt Spaghetti Anti-Pattern
Unmanageable Prompt Engineering
What It Looks Like
# PROMPT SPAGHETTI — DO NOT DO THIS
def build_prompt(user_input, context, history, tools, persona,
constraints, examples, format_instructions,
safety_rules, company_guidelines, seasonal_adjustments,
user_tier, feature_flags):
prompt = f"""
You are {persona}. But also remember that you are {OTHER_PERSONA}.
Unless the user is a premium user ({user_tier}), in which case
you should be {PREMIUM_PERSONA}. But if it's Q4 ({seasonal_adjustments}),
be more formal. Unless the user asked for casual tone, then be casual,
but not too casual if {feature_flags['strict_mode']} is enabled.
IMPORTANT: Always follow these rules:
{safety_rules}
But sometimes ignore rule 3 if {some_condition}.
Actually, rule 7 overrides rule 3 in most cases.
Except on weekends.
Here are 47 examples of how to respond:
{examples} # 15,000 tokens of examples
Format your response as JSON. But if the user asks for markdown,
use markdown. Unless they're on mobile, then use plain text.
Actually, always use JSON. No wait, use whatever feels right.
{history} # Entire uncompressed conversation history
{context} # Everything we know about everything
{tools} # All 150 tools described in prose
User said: {user_input}
"""
return prompt
Why It Fails
1. Contradictions: When a prompt contains contradictory instructions, the LLM must choose which to follow — and its choice will be unpredictable and inconsistent.
2. Instruction Dilution: Research on LLM attention patterns shows that instructions buried in the middle of a very long prompt are frequently ignored or underweighted. Critical rules hidden in a wall of text will be missed.
3. Untestability: You cannot write deterministic tests for a prompt that behaves differently depending on which instructions the LLM happened to attend to.
4. Maintenance Nightmare: When behavior needs to change, you must hunt through thousands of tokens of prose to find and update the relevant instruction — and hope you do not break something else in the process.
5. Token Cost: Prompt spaghetti is expensive. Paying for 15,000 tokens of examples on every single API call is financially unsustainable at scale.
The Fix
Apply software engineering discipline to prompt management:
# THE RIGHT WAY — Structured, modular prompt management
class PromptTemplate:
"""A versioned, testable prompt component."""
def __init__(self, name: str, version: str, template: str):
self.name = name
self.version = version
self.template = template
def render(self, **kwargs) -> str:
return self.template.format(**kwargs)
class PromptComposer:
"""
Composes prompts from modular, versioned components.
Each component has a single responsibility.
"""
def __init__(self):
self._components: dict[str, PromptTemplate] = {}
def register(self, template: PromptTemplate) -> None:
self._components[template.name] = template
def compose(self, component_names: list[str], **kwargs) -> str:
parts = []
for name in component_names:
template = self._components.get(name)
if template:
parts.append(template.render(**kwargs))
return "\n\n".join(parts)
# Each component is focused, versioned, and independently testable
persona_template = PromptTemplate(
name="persona",
version="2.1.0",
template="You are a helpful {role} specializing in {domain}."
)
safety_template = PromptTemplate(
name="safety",
version="1.3.0",
template="Always follow these safety guidelines:\n{safety_rules}"
)
format_template = PromptTemplate(
name="format",
version="1.0.0",
template="Respond in {format} format."
)
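A short usage example of the composer (classes repeated in condensed form so the snippet runs standalone; the component names and fill-in values are illustrative):

```python
class PromptTemplate:
    """A versioned, testable prompt component (condensed from above)."""
    def __init__(self, name: str, version: str, template: str):
        self.name, self.version, self.template = name, version, template

    def render(self, **kwargs) -> str:
        return self.template.format(**kwargs)

class PromptComposer:
    """Composes prompts from modular, versioned components."""
    def __init__(self):
        self._components: dict[str, PromptTemplate] = {}

    def register(self, template: PromptTemplate) -> None:
        self._components[template.name] = template

    def compose(self, component_names: list[str], **kwargs) -> str:
        return "\n\n".join(self._components[n].render(**kwargs)
                           for n in component_names if n in self._components)

composer = PromptComposer()
composer.register(PromptTemplate("persona", "2.1.0",
    "You are a helpful {role} specializing in {domain}."))
composer.register(PromptTemplate("format", "1.0.0",
    "Respond in {format} format."))

system_prompt = composer.compose(
    ["persona", "format"],
    role="analyst", domain="industrial telemetry", format="JSON",
)
```

Because each component is named and versioned, a behavior change is a one-line edit to one template — the opposite of hunting through a monolithic prompt.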
Additional rules for prompt hygiene:
- Keep system prompts under 2,000 tokens where possible
- Use structured output (JSON mode, function calling) instead of prose format instructions
- Version control your prompts like code
- Write unit tests for prompt behavior
- Use few-shot examples sparingly and store them in a retrieval system, not inline
6.3 — Synchronous Blocking Anti-Pattern
Blocking the Main Thread on LLM Calls
What It Looks Like
# SYNCHRONOUS BLOCKING — DO NOT DO THIS
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route("/chat", methods=["POST"])
def chat():
user_message = request.json["message"]
# This blocks the entire thread for 10-60 seconds
# while waiting for the LLM response.
# No other requests can be served during this time.
response = llm_client.complete(user_message) # Blocking call
# If you have 10 concurrent users, 9 of them are waiting.
# If you have 100 concurrent users, your server is effectively down.
return jsonify({"response": response})
Why It Fails
LLM API calls are slow — typically 5-60 seconds for complex tasks. Blocking the main thread on these calls means:
- Throughput collapses: A synchronous server can handle at most 1 / avg_latency requests per second per thread
- Timeouts cascade: Users waiting for responses time out, triggering retries, which make the problem worse
- Resource exhaustion: Thread pools fill up, memory grows, and the server becomes unresponsive
- User experience degrades: Users see spinning loaders or error messages instead of responses
The Fix
Use asynchronous programming and streaming for all LLM interactions:
# THE RIGHT WAY — Async and streaming
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
@app.post("/chat")
async def chat(request_body: dict):
user_message = request_body["message"]
# Non-blocking: other requests can be served while this awaits
async def generate():
async for token in llm_client.stream_async(user_message):
yield f"data: {token}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
@app.post("/chat/async")
async def chat_async(request_body: dict):
"""
For long-running tasks: return a job ID immediately,
process asynchronously, and let the client poll for results.
"""
import uuid
job_id = str(uuid.uuid4())
    # Queue the job for background processing (in production, keep a
    # reference to the task so it is not garbage-collected mid-flight)
    asyncio.create_task(
        process_agent_task(job_id, request_body["message"])
    )
return {"job_id": job_id, "status": "processing",
"poll_url": f"/jobs/{job_id}"}
async def process_agent_task(job_id: str, message: str):
"""Background task — does not block any request handler."""
result = await agent.run_async(message)
await job_store.set(job_id, result)
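The `job_store` used above is assumed rather than defined. A minimal in-memory sketch (`InMemoryJobStore` is an illustrative name) shows the poll-before/poll-after lifecycle that a `/jobs/{job_id}` endpoint would expose to clients:

```python
import asyncio
import uuid

class InMemoryJobStore:
    """Minimal job store: set/get results keyed by job ID."""
    def __init__(self):
        self._jobs: dict[str, dict] = {}

    async def set(self, job_id: str, result) -> None:
        self._jobs[job_id] = {"status": "done", "result": result}

    async def get(self, job_id: str) -> dict:
        # Unknown or unfinished jobs report "processing".
        return self._jobs.get(job_id, {"status": "processing"})

async def demo() -> tuple[dict, dict]:
    store = InMemoryJobStore()
    job_id = str(uuid.uuid4())

    async def background_task():
        await asyncio.sleep(0)  # stand-in for the slow agent call
        await store.set(job_id, "analysis complete")

    task = asyncio.create_task(background_task())
    before = await store.get(job_id)  # client polls before completion
    await task
    after = await store.get(job_id)   # client polls after completion
    return before, after

before, after = asyncio.run(demo())
```

The client contract is simple: poll until `status` flips from "processing" to "done", then read `result`. A production store would live in Redis or a database so results survive process restarts.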
6.4 — Hardcoded Model Anti-Pattern
Tying Code to a Specific LLM Version
What It Looks Like
# HARDCODED MODEL — DO NOT DO THIS
import openai
def analyze_document(text: str) -> str:
# Hardcoded to a specific model version
response = openai.chat.completions.create(
model="gpt-4-0613", # This model is now deprecated
messages=[{"role": "user", "content": text}],
# OpenAI-specific parameters hardcoded throughout
max_tokens=4096,
temperature=0.7,
presence_penalty=0.1,
frequency_penalty=0.1
)
return response.choices[0].message.content
# Scattered throughout the codebase:
# - 47 direct calls to openai.chat.completions.create
# - 12 calls using the Anthropic SDK directly
# - 8 calls to a local Ollama instance
# - No abstraction layer whatsoever
Why It Fails
1. Deprecation Risk: LLM providers regularly retire model versions. When a pinned model such as gpt-4-0613 is withdrawn, every hardcoded reference must be found and updated — and you will miss some.
2. Vendor Lock-in: Switching providers (e.g., from OpenAI to Anthropic for cost reasons) requires rewriting every LLM call in the codebase.
3. Testing Difficulty: You cannot mock or stub hardcoded SDK calls without monkey-patching, making unit tests fragile and slow.
4. Cost Optimization Blocked: You cannot route different types of tasks to different models (e.g., cheap models for simple tasks, expensive models for complex ones) without a refactor.
5. Inconsistent Behavior: Different parts of the codebase may use different models, leading to inconsistent behavior that is hard to debug.
The Fix
Always use the Adapter Pattern for LLM Clients (Section 1.5) and externalize model configuration:
# THE RIGHT WAY — Configuration-driven model selection
# Model configuration (in practice, loaded from a file such as config.yaml)
llm_config = {
"default": {
"provider": "openai",
"model": "gpt-4o",
"temperature": 0.7
},
"fast": {
"provider": "openai",
"model": "gpt-4o-mini",
"temperature": 0.3
},
"reasoning": {
"provider": "anthropic",
"model": "claude-opus-4-5",
"temperature": 0.5
}
}
# All code uses the abstraction, never the SDK directly
def analyze_document(text: str, llm_client: LLMClient) -> str:
response = llm_client.complete(
messages=[LLMMessage(role="user", content=text)]
)
return response.content
# Model selection is a configuration concern, not a code concern
client = LLMClientFactory.create(
    provider=llm_config["default"]["provider"],
    model=llm_config["default"]["model"]
)
result = analyze_document(text, client)
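The `LLMClientFactory` is referenced above but not shown. One possible minimal shape, with stub adapters standing in for real SDK wrappers (`OpenAIClient` and `AnthropicClient` here hold only a model name; a real adapter would implement `complete()` against the provider SDK, per the Adapter Pattern in Section 1.5):

```python
class LLMClient:
    """Provider-agnostic interface (matches the adapter in Section 1.5)."""
    def complete(self, messages):
        raise NotImplementedError

class OpenAIClient(LLMClient):
    def __init__(self, model: str):
        self.model = model

class AnthropicClient(LLMClient):
    def __init__(self, model: str):
        self.model = model

class LLMClientFactory:
    """Maps a provider name from configuration to a concrete adapter."""
    _providers = {"openai": OpenAIClient, "anthropic": AnthropicClient}

    @classmethod
    def create(cls, provider: str, model: str) -> LLMClient:
        if provider not in cls._providers:
            raise ValueError(f"Unknown provider: {provider}")
        return cls._providers[provider](model)

client = LLMClientFactory.create(provider="anthropic", model="claude-opus-4-5")
```

Adding a provider is now a one-line registry change rather than a codebase-wide rewrite, which is the whole point of the pattern.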
6.5 — Unbounded Tool Access Anti-Pattern
Agents with Too Many Tools and No Scope
What It Looks Like
# UNBOUNDED TOOL ACCESS — DO NOT DO THIS
class CustomerSupportAgent:
"""
A customer support agent that somehow has access to:
- The production database (read AND write)
- The email system (can send to anyone)
- The billing system (can issue refunds of any amount)
- The user management system (can delete accounts)
- The internal HR system (why???)
- The code deployment pipeline (absolutely not)
- External payment processors
- Social media APIs
"""
def __init__(self):
self.tools = [
DatabaseReadTool(),
DatabaseWriteTool(), # ← Dangerous
EmailSendTool(), # ← No recipient restrictions
BillingTool(max_refund=None), # ← Unlimited refunds
UserDeletionTool(), # ← Irreversible
HRSystemTool(), # ← Completely out of scope
DeploymentTool(), # ← Catastrophically dangerous
PaymentProcessorTool() # ← Financial risk
]
Why It Fails
1. Catastrophic Failure Modes: An agent with access to the deployment pipeline and a hallucination about what "restart the service" means can take down production. This is not theoretical — it has happened.
2. Principle of Least Privilege Violated: The fundamental security principle that every component should have only the minimum access necessary to perform its function is completely ignored.
3. Prompt Injection Amplification: If an attacker can inject a prompt into the agent's input (e.g., through a malicious customer message), they can leverage the agent's broad tool access to cause significant damage.
4. Audit and Compliance Failures: Regulators and auditors expect that systems with access to sensitive data and financial operations have appropriate access controls. An agent with unbounded access is a compliance nightmare.
5. Debugging Complexity: When something goes wrong, determining which tool caused the problem in a sea of 50+ tools is extremely difficult.
The Fix
Apply strict scope limitation and the principle of least privilege to every agent:
# THE RIGHT WAY — Scoped, restricted tool access
class CustomerSupportAgent:
"""
A customer support agent with carefully scoped tool access.
"""
def __init__(self, tool_registry: ToolRegistry, agent_role: str = "support"):
# Only discover tools allowed for the support role
allowed_tools = tool_registry.discover(role=agent_role)
self.tools = allowed_tools
# In the tool registry, tools are scoped by role:
# - "support" role: read customer data, issue refunds up to $50,
# send emails to the requesting customer only
# - "support_senior" role: refunds up to $500, escalation tools
# - "support_admin" role: account modifications, large refunds
# - "engineer" role: deployment tools, database write access
# Tool-level restrictions:
refund_tool = ToolSchema(
name="issue_refund",
description="Issue a refund to a customer",
parameters={
"type": "object",
"properties": {
"order_id": {"type": "string"},
"amount": {
"type": "number",
"maximum": 50.0 # Hard limit enforced at schema level
}
},
"required": ["order_id", "amount"]
},
allowed_roles=["support", "support_senior", "support_admin"],
requires_confirmation=True # Human approval for any refund
)
# Additionally, implement runtime guardrails:
class ScopedToolExecutor:
    """Enforces tool scope at execution time."""
    def __init__(self, registry: "ToolRegistry", audit: "AuditTrail"):
        self.registry = registry
        self.audit = audit
    def execute(self, tool_name: str, agent_id: str,
                session_id: str, **kwargs) -> Any:
# Check session-level limits
if self._session_limit_exceeded(agent_id, session_id, tool_name):
raise PermissionError(
f"Session limit exceeded for tool '{tool_name}'"
)
# Check parameter bounds
self._validate_parameter_bounds(tool_name, kwargs)
# Log every tool invocation
self.audit.log(
AuditEventType.TOOL_INVOKED,
agent_id=agent_id,
data={"tool": tool_name, "params": kwargs, "session": session_id}
)
return self.registry.invoke(tool_name, **kwargs)
Conclusion: Architecture as a First-Class Concern in Agentic AI
The patterns cataloged in this article represent the current state of the art in agentic AI system design. They are not academic exercises — they are hard-won lessons from teams who built agentic systems, watched them fail in unexpected ways, and iterated toward more robust designs.
Several overarching principles emerge from studying these patterns together:
1. Separation of Concerns is Non-Negotiable. The most reliable agentic systems are those where reasoning, acting, memory, coordination, and safety are cleanly separated. The God Agent anti-pattern fails precisely because it collapses all these concerns into one. Every pattern in this catalog is, at its core, an application of separation of concerns to a specific agentic challenge.
2. Safety is Structural, Not Incidental. Guardrails, circuit breakers, human-in-the-loop gates, and audit trails are not features you add after the system is built. They are structural elements that must be designed in from the beginning. An agentic system without these patterns is not a system — it is a liability.
3. Observability Enables Reliability. Agents that cannot be observed cannot be debugged, and systems that cannot be debugged cannot be trusted. The Audit Trail pattern, combined with structured logging throughout the pipeline, transforms agentic systems from black boxes into transparent, accountable systems.
4. Composition Over Monolithism. The most powerful agentic systems are not single, omnipotent agents — they are compositions of specialized, focused agents coordinated by well-designed orchestration patterns. This mirrors the most successful paradigm in software engineering: microservices over monoliths, components over god objects.
5. The Model is a Dependency, Not the Architecture. Perhaps the most important lesson: the LLM is a component in your architecture, not the architecture itself. The Adapter Pattern for LLM Clients, the Hardcoded Model anti-pattern, and the MCP Server Pattern all point to the same truth — your architecture must be model-agnostic, because the models will change faster than your architecture should.
The field of agentic AI is evolving rapidly. New patterns will emerge, existing patterns will be refined, and some of what we consider best practice today will be superseded by better approaches tomorrow. But the underlying principles — separation of concerns, least privilege, observability, composability, and model-agnosticism — are timeless. Build your agentic systems on these foundations, and they will remain maintainable, reliable, and trustworthy as the technology evolves.