Introduction to AI Agents and Agentic AI
Artificial Intelligence agents represent a paradigm shift in how we interact with and deploy AI systems. An AI agent is an autonomous entity that can perceive its environment, make decisions, and take actions to achieve specific goals. Unlike traditional AI systems that respond reactively to user queries, agents proactively work toward objectives, maintain state, and adapt their strategies based on feedback and changing conditions.
Agentic AI refers to AI systems that possess agency - the capacity to act independently, make decisions, and take goal-directed actions. These systems go beyond simple input-output mappings to incorporate planning, reasoning, and autonomous action. The core premise of agentic AI is that the system can decompose complex tasks into manageable steps, reason about which tools or approaches to use, and execute actions in a coherent sequence to achieve an objective.
The evolution from passive, query-based AI to agentic systems represents a fundamental shift in AI capabilities. While large language models (LLMs) provide the cognitive foundation for these agents through their reasoning and language understanding capabilities, the agent architecture provides the framework for sustained, goal-directed behavior. This combination enables AI systems to tackle complex tasks that require multiple steps, tool usage, and adaptation to changing circumstances.
Detailed Discussion of AI Agent Systems
Single AI Agent Systems: Definition, Properties, and Constituents
A single AI agent is an autonomous computational entity that perceives its environment, makes decisions, and takes actions to achieve specific goals. Unlike traditional AI systems that simply map inputs to outputs, agents maintain an ongoing interaction with their environment, adapting their behavior based on feedback and changing conditions.
The core properties that define a single AI agent include autonomy, reactivity, proactivity, and social ability. Autonomy refers to the agent's ability to operate without direct human intervention, making decisions based on its internal state and perceptions. Reactivity is the ability to perceive the environment and respond to changes in a timely manner. Proactivity enables the agent to take initiative and exhibit goal-directed behavior. Social ability allows the agent to interact with other agents or humans through some kind of communication language or protocol.
The fundamental constituents of a single AI agent system include several interconnected components:
The perception module is responsible for gathering and processing information from the environment. This can include text input from users, data from APIs, sensor readings, or any other form of external information. In language-based agents, this often involves natural language understanding to interpret user requests and extract relevant information.
The memory system maintains the agent's internal state across interactions. Short-term memory holds immediate context for the current task, while long-term memory stores information that might be relevant across different tasks or sessions. Episodic memory captures sequences of interactions that can inform future decisions. Working memory serves as a computational workspace where the agent can reason about its current task.
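To make these distinctions concrete, the sketch below shows one minimal way such a memory system might be organized. It is an illustrative assumption, not the API of any particular framework; the class and field names (AgentMemory, short_term, episodic, and so on) are hypothetical.

from collections import deque
from dataclasses import dataclass, field
from typing import Any, Dict, List

@dataclass
class AgentMemory:
    """Illustrative memory layout for a single agent (names are hypothetical)."""
    short_term: deque = field(default_factory=lambda: deque(maxlen=20))  # recent turns for the current task
    long_term: Dict[str, Any] = field(default_factory=dict)              # facts kept across sessions
    episodic: List[List[Dict[str, Any]]] = field(default_factory=list)   # past interaction sequences
    working: Dict[str, Any] = field(default_factory=dict)                # scratch space for the current task

    def remember_turn(self, role: str, content: str) -> None:
        self.short_term.append({"role": role, "content": content})

    def end_episode(self) -> None:
        # Archive the current interaction sequence and clear task-scoped state
        self.episodic.append(list(self.short_term))
        self.short_term.clear()
        self.working.clear()

memory = AgentMemory()
memory.remember_turn("user", "Book a flight to Berlin next Tuesday.")
memory.working["candidate_flights"] = []
memory.long_term["home_airport"] = "SFO"
memory.end_episode()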
The reasoning and planning module enables the agent to make decisions and formulate plans to achieve its goals. This component leverages the underlying language model's capabilities but adds structure to ensure coherent and effective planning. Techniques like chain-of-thought reasoning help improve the quality of planning by explicitly working through the thinking process. Tree-of-thought approaches allow the agent to explore multiple potential solution paths before committing to a course of action.
The action selection mechanism determines which actions to take based on the agent's current state, goals, and plans. This involves evaluating different options, considering their potential outcomes, and selecting the most promising action. In tool-using agents, this includes deciding which tools to use and how to use them.
The execution module carries out the selected actions, whether that involves generating text responses, calling external APIs, manipulating data, or controlling physical systems. This component translates high-level plans into concrete actions and handles any errors or unexpected outcomes that may arise during execution.
The learning component enables the agent to improve its performance over time based on experience and feedback. This can involve updating internal models, adjusting action selection strategies, or refining planning approaches. Learning can occur through various mechanisms, including reinforcement learning, imitation learning, or explicit feedback from users.
The integration of these components creates a cohesive agent system capable of sustained, goal-directed behavior. The specific implementation of each component can vary depending on the agent's purpose, the underlying technologies, and the design choices made by the developers.
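A bare-bones perceive-reason-act loop suggests how these components fit together in practice. Everything here is a stub standing in for the modules described above; the function names and the stopping condition are assumptions made for the sketch.

from typing import Any, Dict, List, Optional

def run_agent(goal: str, max_steps: int = 5) -> List[str]:
    """Minimal sketch of a single-agent control loop (all components are stubs)."""
    memory: Dict[str, Any] = {"goal": goal, "observations": [], "actions": []}
    transcript: List[str] = []
    for _ in range(max_steps):
        observation = perceive(memory)              # perception module
        memory["observations"].append(observation)
        action = decide(memory)                     # reasoning / action selection
        if action is None:                          # goal reached or nothing left to do
            break
        result = execute(action)                    # execution module
        memory["actions"].append((action, result))  # feed results back into memory
        transcript.append(f"{action} -> {result}")
    return transcript

def perceive(memory: Dict[str, Any]) -> str:
    return f"{len(memory['actions'])} actions taken so far"

def decide(memory: Dict[str, Any]) -> Optional[str]:
    return "gather_information" if len(memory["actions"]) < 2 else None

def execute(action: str) -> str:
    return f"completed {action}"

print(run_agent("summarize the weekly report"))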
Multi-Agent Systems: Definition, Properties, and Constituents
Multi-agent systems consist of multiple autonomous agents that interact with each other to solve problems that might be difficult for a single agent to handle. These systems leverage the collective capabilities of specialized agents, enabling more complex behaviors through collaboration, competition, or coordination.
The distinctive properties of multi-agent systems include distribution, heterogeneity, coordination, and emergence. Distribution refers to the decentralized nature of multi-agent systems, where computation and decision-making are spread across multiple agents. Heterogeneity allows agents to have different capabilities, knowledge, or roles, enabling specialization and division of labor. Coordination mechanisms enable agents to work together effectively, whether through explicit communication or implicit coordination. Emergence describes how complex system-level behaviors can arise from the interactions of simpler agents, often producing capabilities that no single agent possesses.
The constituents of multi-agent systems include several components beyond those found in single agents:
The communication infrastructure enables agents to exchange information, requests, and responses. This includes message formats, communication protocols, and mechanisms for addressing and routing messages between agents. Standardized protocols like the Agent Communication Language (ACL) or more recent ones like Google's A2A Protocol provide structured ways for agents to communicate.
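As a rough illustration, a message envelope loosely modeled on ACL-style fields might look like the sketch below. The field names (performative, conversation_id, and so on) are illustrative and not taken verbatim from ACL or A2A.

import json
import uuid
from dataclasses import dataclass, asdict, field

@dataclass
class AgentMessage:
    """Illustrative ACL-style message envelope (field names are hypothetical)."""
    sender: str
    receiver: str
    performative: str            # e.g. "request", "inform", "propose"
    content: dict
    conversation_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def to_json(self) -> str:
        return json.dumps(asdict(self))

msg = AgentMessage(
    sender="researcher-1",
    receiver="writer-1",
    performative="request",
    content={"task": "summarize", "topic": "quarterly sales"},
)
print(msg.to_json())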
Coordination mechanisms manage the interactions between agents to ensure coherent collective behavior. These can include centralized coordination through manager agents, decentralized coordination through peer-to-peer interactions, or hybrid approaches that combine elements of both. Coordination mechanisms help resolve conflicts, allocate resources, and ensure that agents work together effectively.
Role assignment and specialization enable agents to focus on specific tasks or domains where they have particular expertise. This division of labor allows the system to handle complex problems by breaking them down into more manageable components that can be addressed by specialized agents.
Collective decision-making processes allow agents to reach agreements on actions, plans, or beliefs. These can include voting mechanisms, negotiation protocols, or consensus algorithms that enable agents to reconcile different perspectives or preferences.
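A simple weighted-majority vote gives a feel for one such mechanism. The weighting scheme and the majority threshold are assumptions chosen for the example, not a standard protocol.

from collections import Counter
from typing import Dict, Optional, Tuple

def weighted_vote(votes: Dict[str, str], weights: Dict[str, float]) -> Tuple[Optional[str], float]:
    """Return the winning option and its share of total weight; None if no majority."""
    tally: Counter = Counter()
    for agent, option in votes.items():
        tally[option] += weights.get(agent, 1.0)
    total = sum(tally.values())
    option, score = tally.most_common(1)[0]
    share = score / total if total else 0.0
    return (option, share) if share > 0.5 else (None, share)

votes = {"planner": "route_a", "researcher": "route_a", "critic": "route_b"}
weights = {"planner": 1.0, "researcher": 1.0, "critic": 0.5}
print(weighted_vote(votes, weights))  # ('route_a', 0.8)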
Shared knowledge representations provide common ground for agents to understand each other and reason about shared tasks. These can include ontologies, knowledge graphs, or other structured representations that facilitate communication and coordination.
The integration of these components creates a multi-agent system capable of complex collaborative behaviors. The specific implementation can vary widely depending on the application domain, the types of agents involved, and the coordination patterns employed.
Agentic AI Systems: Definition, Properties, and Constituents
As defined in the introduction, agentic AI describes systems that possess agency: the capacity to act independently, make decisions, and take goal-directed actions. Rather than performing simple input-output mappings, an agentic system incorporates planning, reasoning, and autonomous action; it decomposes complex tasks into manageable steps, reasons about which tools or approaches to use, and executes actions in a coherent sequence to achieve an objective.
The defining properties of agentic AI systems include intentionality, adaptability, persistence, and tool use. Intentionality refers to the system's ability to form and pursue goals, maintaining focus on objectives even across multiple interactions. Adaptability enables the system to adjust its strategies based on feedback, changing conditions, or new information. Persistence allows the system to maintain state and continue working toward goals over extended periods. Tool use empowers the system to leverage external capabilities, expanding its range of possible actions beyond what it can do directly.
The constituents of agentic AI systems include several interconnected components:
The goal management system maintains the agent's objectives and ensures that actions are aligned with these goals. This includes mechanisms for goal decomposition, priority management, and progress tracking. The goal system enables the agent to maintain focus on its objectives even as it works through multiple steps or encounters obstacles.
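One way to picture such a component is a priority queue of goals with simple progress tracking, as in the hypothetical sketch below; the Goal and GoalManager names are made up for illustration.

import heapq
from dataclasses import dataclass, field
from typing import List, Tuple

@dataclass
class Goal:
    description: str
    priority: int = 1            # lower number = more urgent
    subtasks: List[str] = field(default_factory=list)
    completed: List[str] = field(default_factory=list)

    @property
    def progress(self) -> float:
        return len(self.completed) / len(self.subtasks) if self.subtasks else 0.0

class GoalManager:
    """Illustrative goal priority queue (not tied to any framework)."""
    def __init__(self) -> None:
        self._queue: List[Tuple[int, int, Goal]] = []
        self._counter = 0        # tie-breaker so heapq never compares Goal objects

    def add(self, goal: Goal) -> None:
        heapq.heappush(self._queue, (goal.priority, self._counter, goal))
        self._counter += 1

    def next_goal(self) -> Goal:
        return self._queue[0][2]

manager = GoalManager()
manager.add(Goal("write status report", priority=2, subtasks=["gather data", "draft", "review"]))
manager.add(Goal("answer urgent ticket", priority=1, subtasks=["read ticket", "reply"]))
top = manager.next_goal()
top.completed.append("read ticket")
print(top.description, f"{top.progress:.0%}")   # answer urgent ticket 50%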
The strategic planning component enables the agent to develop high-level plans for achieving its goals. This involves breaking down complex objectives into manageable steps, considering alternative approaches, and selecting promising strategies. Strategic planning operates at a higher level than tactical planning, focusing on overall approaches rather than specific actions.
The tactical planning component translates strategic plans into concrete action sequences. This involves determining the specific steps needed to implement a strategy, including which tools to use, what information to gather, and how to sequence actions. Tactical planning is more detailed and immediate than strategic planning, focusing on the specific actions needed in the near term.
The tool integration framework allows the agent to use external tools and services to expand its capabilities. This includes mechanisms for tool discovery, selection, invocation, and result interpretation. The tool framework enables the agent to leverage specialized capabilities that are not built into its core functionality.
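A minimal tool registry conveys the idea: tools are described, selected by name, and invoked through a common interface. The registry class and the registered tools below are hypothetical, intended only as a sketch of the pattern.

from typing import Any, Callable, Dict

class ToolRegistry:
    """Illustrative tool discovery and invocation layer (names are hypothetical)."""
    def __init__(self) -> None:
        self._tools: Dict[str, Dict[str, Any]] = {}

    def register(self, name: str, description: str, fn: Callable[..., Any]) -> None:
        self._tools[name] = {"description": description, "fn": fn}

    def describe(self) -> Dict[str, str]:
        # What an agent would expose to the model when deciding which tool to call
        return {name: meta["description"] for name, meta in self._tools.items()}

    def invoke(self, name: str, **kwargs: Any) -> Any:
        if name not in self._tools:
            raise KeyError(f"Unknown tool: {name}")
        return self._tools[name]["fn"](**kwargs)

registry = ToolRegistry()
registry.register("add", "Add two numbers.", lambda a, b: a + b)
registry.register("upper", "Uppercase a string.", lambda text: text.upper())
print(registry.describe())
print(registry.invoke("add", a=2, b=3))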
The feedback processing system enables the agent to learn from the results of its actions and adjust its behavior accordingly. This includes mechanisms for evaluating outcomes, identifying successes and failures, and updating internal models or strategies. Feedback processing is essential for adaptive behavior and continuous improvement.
The integration of these components creates an agentic AI system capable of sustained, goal-directed behavior across complex tasks and extended interactions. The specific implementation can vary depending on the system's purpose, the underlying technologies, and the design choices made by the developers.
Architectural Styles for AI Agent Systems
Single AI Agent Architectures
Several architectural styles have emerged for implementing single AI agents, each with its own strengths, weaknesses, and appropriate use cases.
The reactive architecture focuses on mapping perceptions directly to actions without maintaining complex internal state or engaging in deliberative planning. Reactive agents use condition-action rules or similar mechanisms to determine their behavior based on current inputs. This architecture is simple, responsive, and robust to changes in the environment. However, it struggles with tasks that require memory, planning, or reasoning about the future. Reactive architectures are appropriate for applications where quick responses are more important than optimal decision-making, such as simple chatbots or real-time control systems.
The deliberative architecture incorporates explicit reasoning and planning processes. Deliberative agents maintain internal models of the world, reason about possible actions and their consequences, and develop plans to achieve their goals. This architecture enables more sophisticated behavior but can be computationally intensive and slower to respond. Deliberative architectures are appropriate for complex problem-solving tasks where optimal decision-making is more important than immediate responses, such as strategic planning or complex information retrieval.
The hybrid reactive-deliberative architecture combines elements of both reactive and deliberative approaches. These agents use reactive components for immediate responses and deliberative components for longer-term planning and reasoning. This architecture balances responsiveness with sophistication, enabling quick reactions to urgent situations while still supporting complex goal-directed behavior. Hybrid architectures are appropriate for applications that require both quick responses and sophisticated reasoning, such as personal assistants or customer service agents.
The BDI (Belief-Desire-Intention) architecture organizes the agent's internal state into beliefs about the world, desires representing goals, and intentions representing committed plans. BDI agents continuously update their beliefs based on perceptions, select desires to pursue based on their current beliefs, and formulate intentions to achieve those desires. This architecture provides a clear conceptual framework for reasoning about goals and actions but can be complex to implement. BDI architectures are appropriate for applications where the agent needs to balance multiple competing goals and adapt to changing circumstances, such as autonomous decision-making systems or complex workflow assistants.
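The sketch below shows the skeleton of a BDI deliberation cycle under heavily simplified assumptions: beliefs are a plain dictionary, desires are candidate goals, and intentions are the plan the agent has committed to. The domain (commuting in the rain) is invented purely to keep the example small.

from typing import Dict, List, Optional

class SimpleBDIAgent:
    """Minimal belief-desire-intention cycle (illustrative only)."""
    def __init__(self) -> None:
        self.beliefs: Dict[str, bool] = {}
        self.desires: List[str] = ["stay_dry", "reach_office"]
        self.intentions: List[str] = []

    def update_beliefs(self, percept: Dict[str, bool]) -> None:
        self.beliefs.update(percept)

    def deliberate(self) -> Optional[str]:
        # Pick the first desire whose preconditions hold given current beliefs
        if "stay_dry" in self.desires and self.beliefs.get("raining"):
            return "stay_dry"
        if "reach_office" in self.desires:
            return "reach_office"
        return None

    def plan(self, goal: str) -> List[str]:
        plans = {
            "stay_dry": ["take_umbrella", "walk_to_office"],
            "reach_office": ["walk_to_office"],
        }
        return plans.get(goal, [])

    def step(self, percept: Dict[str, bool]) -> List[str]:
        self.update_beliefs(percept)
        goal = self.deliberate()
        self.intentions = self.plan(goal) if goal else []
        return self.intentions

agent = SimpleBDIAgent()
print(agent.step({"raining": True}))   # ['take_umbrella', 'walk_to_office']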
The subsumption architecture organizes behavior into layers, with higher layers subsuming or overriding the behavior of lower layers when appropriate. Each layer implements a specific competence, such as obstacle avoidance, exploration, or goal pursuit. This architecture enables incremental development and graceful degradation but can be difficult to design for complex interactions between layers. Subsumption architectures are appropriate for applications where behaviors have clear priority relationships and the agent needs to maintain basic functionality even when higher-level capabilities fail.
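A toy layered controller illustrates subsumption: each layer proposes an action for the current percept, and the highest-priority layer that responds overrides the ones below it. The layers and percept keys here are invented for the example.

from typing import Callable, Dict, List, Optional, Tuple

Percept = Dict[str, bool]
Layer = Callable[[Percept], Optional[str]]

def avoid_obstacles(p: Percept) -> Optional[str]:
    return "turn_away" if p.get("obstacle_ahead") else None

def pursue_goal(p: Percept) -> Optional[str]:
    return "move_toward_goal" if p.get("goal_visible") else None

def explore(p: Percept) -> Optional[str]:
    return "wander" if not p.get("goal_visible") else None

# Highest-priority layer first; it subsumes (overrides) the layers below it
LAYERS: List[Tuple[str, Layer]] = [
    ("avoid_obstacles", avoid_obstacles),
    ("pursue_goal", pursue_goal),
    ("explore", explore),
]

def select_action(percept: Percept) -> str:
    for name, layer in LAYERS:
        action = layer(percept)
        if action is not None:
            return f"{name}: {action}"
    return "idle"

print(select_action({"obstacle_ahead": True, "goal_visible": True}))  # avoid_obstacles: turn_away
print(select_action({"goal_visible": True}))                          # pursue_goal: move_toward_goal
print(select_action({}))                                              # explore: wander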
Here's an example of implementing a hybrid reactive-deliberative agent using LangChain and FastAPI for REST integration:
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
import uvicorn
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from langchain_community.tools import DuckDuckGoSearchRun
app = FastAPI(title="Hybrid Agent API")
# Define models for API requests and responses
class AgentRequest(BaseModel):
user_input: str
session_id: str
context: Optional[List[Dict[str, str]]] = []
class AgentResponse(BaseModel):
response: str
actions_taken: List[str]
session_id: str
# Session storage (in a production system, use a proper database)
sessions = {}
# Define tools
search_tool = DuckDuckGoSearchRun()
@tool
def calculator(expression: str) -> float:
"""Evaluate a mathematical expression."""
    # NOTE: eval() is unsafe for untrusted input; use a proper expression parser in production.
    return eval(expression)
# Reactive component - handles immediate responses
def reactive_component(user_input: str) -> Optional[str]:
"""Simple pattern matching for immediate responses."""
greetings = ["hello", "hi", "hey", "greetings"]
farewells = ["bye", "goodbye", "see you", "farewell"]
thanks = ["thank", "thanks", "appreciate"]
lower_input = user_input.lower()
if any(greeting in lower_input for greeting in greetings):
return "Hello! How can I assist you today?"
elif any(farewell in lower_input for farewell in farewells):
return "Goodbye! Have a great day!"
elif any(thank in lower_input for thank in thanks):
return "You're welcome! Is there anything else I can help with?"
# No reactive response, return None to indicate deliberative processing needed
return None
# Deliberative component - handles complex reasoning
def deliberative_component(user_input: str, context: List[Dict[str, str]]) -> Dict[str, Any]:
"""Complex reasoning and planning for non-immediate responses."""
llm = ChatOpenAI(model="gpt-4", temperature=0)
# First, determine if tools are needed
    # Build the message list directly and pass it to the model; routing raw user
    # content through a prompt template would treat braces as template variables.
    tool_selection_messages = [
        ("system", """Analyze the user's request and determine if any tools are needed:
        - 'search' for information lookup
        - 'calculator' for mathematical operations
        - 'none' if no tools are needed
        Return just the tool name."""),
        *[(msg["role"], msg["content"]) for msg in context],
        ("user", user_input)
    ]
    tool_decision = llm.invoke(tool_selection_messages).content.strip().lower()
actions_taken = []
tool_result = None
# Use the appropriate tool if needed
if "search" in tool_decision:
try:
tool_result = search_tool.invoke(user_input)
actions_taken.append(f"Performed search: {user_input}")
except Exception as e:
tool_result = f"Error performing search: {str(e)}"
actions_taken.append("Search attempt failed")
elif "calculator" in tool_decision or "math" in tool_decision:
# Extract the expression (simplified approach)
try:
# This is a simplistic extraction - in a real system, use more robust parsing
expression = "".join(c for c in user_input if c.isdigit() or c in "+-*/().^ ")
tool_result = calculator.invoke(expression)
actions_taken.append(f"Calculated: {expression} = {tool_result}")
except Exception as e:
tool_result = f"Error performing calculation: {str(e)}"
actions_taken.append("Calculation attempt failed")
# Generate the final response
    response_messages = [
        ("system", """You are a helpful assistant. Provide a clear and concise response to the user's request.
        If tool results are provided, incorporate them into your response."""),
        *[(msg["role"], msg["content"]) for msg in context],
        ("user", user_input + (f"\n\nTool result: {tool_result}" if tool_result else ""))
    ]
    response = llm.invoke(response_messages).content
return {
"response": response,
"actions_taken": actions_taken
}
@app.post("/agent/interact", response_model=AgentResponse)
async def interact_with_agent(request: AgentRequest):
# Get or create session
session_id = request.session_id
if session_id not in sessions:
sessions[session_id] = []
# Update context with the new user message
context = request.context + [{"role": "user", "content": request.user_input}]
# Try reactive component first
reactive_response = reactive_component(request.user_input)
if reactive_response:
# If reactive component handled it, use that response
response = reactive_response
actions_taken = ["Used reactive response"]
else:
# Otherwise, use deliberative component
result = deliberative_component(request.user_input, context)
response = result["response"]
actions_taken = result["actions_taken"]
# Update session with the interaction
sessions[session_id] = context + [{"role": "assistant", "content": response}]
return AgentResponse(
response=response,
actions_taken=actions_taken,
session_id=session_id
)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
This example demonstrates a hybrid reactive-deliberative architecture implemented as a REST API. The reactive component provides immediate responses to simple inputs like greetings or farewells. The deliberative component handles more complex requests that require reasoning, tool use, or context understanding. The API allows clients to maintain session context across multiple interactions, enabling more coherent conversations.
Multi-Agent Architectures
Several architectural styles have emerged for implementing multi-agent systems, each with its own strengths, weaknesses, and appropriate use cases.
The centralized architecture uses a manager or coordinator agent to oversee the activities of worker agents. The manager breaks down tasks, assigns them to appropriate workers, monitors progress, and integrates results. This architecture provides clear control flow and simplifies coordination but can create bottlenecks at the manager and single points of failure. Centralized architectures are appropriate for applications where task decomposition and result integration are complex, such as project management assistants or workflow orchestration systems.
The decentralized architecture allows agents to interact directly with each other without central coordination. Agents discover each other's capabilities, negotiate tasks, and collaborate through peer-to-peer interactions. This architecture is more flexible and resilient but can be more complex to design and debug. Decentralized architectures are appropriate for applications where agents need to form dynamic coalitions or where the system must continue functioning even if some agents fail, such as distributed problem-solving or resilient service networks.
The hierarchical architecture organizes agents into a tree-like structure, with higher-level agents managing groups of lower-level agents. Each level in the hierarchy handles a different level of abstraction or scope of responsibility. This architecture balances centralized control with distributed execution but can be rigid and slow to adapt to changing circumstances. Hierarchical architectures are appropriate for applications with natural organizational structures or where different levels of abstraction are needed, such as enterprise automation or complex system management.
The holonic architecture organizes agents into "holons" - entities that are both wholes and parts of larger wholes. Each holon can act autonomously within its scope but also participates in larger holons. This architecture is flexible and scalable but can be complex to implement. Holonic architectures are appropriate for applications where the system needs to reconfigure itself dynamically or where different organizational views are needed, such as manufacturing control or adaptive service networks.
The blackboard architecture uses a shared data structure (the blackboard) that all agents can read from and write to. Agents monitor the blackboard for relevant information, contribute their expertise when appropriate, and collaborate indirectly through the shared data. This architecture facilitates information sharing but requires careful management of concurrent access. Blackboard architectures are appropriate for applications where multiple specialists need to contribute to a common problem without direct communication, such as collaborative design or diagnostic systems.
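A small in-memory blackboard illustrates the pattern: specialists watch the shared store and contribute when the data they need appears. This is a sketch under simplifying assumptions; a production system would also need locking or transactional updates for concurrent access, which is omitted here, and the knowledge sources are hypothetical.

from typing import Any, Callable, Dict, List

class Blackboard:
    """Shared store plus a simple control loop over knowledge sources (illustrative)."""
    def __init__(self) -> None:
        self.data: Dict[str, Any] = {}
        self.sources: List[Callable[[Dict[str, Any]], bool]] = []

    def register(self, source: Callable[[Dict[str, Any]], bool]) -> None:
        self.sources.append(source)

    def run(self, max_cycles: int = 10) -> Dict[str, Any]:
        for _ in range(max_cycles):
            if not any(source(self.data) for source in self.sources):
                break   # no specialist had anything to contribute this cycle
        return self.data

def collector(data: Dict[str, Any]) -> bool:
    if "raw_text" not in data:
        data["raw_text"] = "sales rose 12% in Q3"
        return True
    return False

def analyst(data: Dict[str, Any]) -> bool:
    if "raw_text" in data and "finding" not in data:
        data["finding"] = "growth detected"
        return True
    return False

board = Blackboard()
board.register(collector)
board.register(analyst)
print(board.run())   # {'raw_text': 'sales rose 12% in Q3', 'finding': 'growth detected'}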
Here's an example of implementing a centralized multi-agent system using LangChain, LangGraph, and FastAPI:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Any, Optional, Union, TypedDict
import uvicorn
import uuid
import json
import datetime
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from langgraph.graph import StateGraph, END
app = FastAPI(title="Multi-Agent System API")
# Define models for API requests and responses
class TaskRequest(BaseModel):
task_description: str
priority: Optional[int] = 1
deadline: Optional[str] = None
additional_context: Optional[Dict[str, Any]] = None
class TaskResponse(BaseModel):
task_id: str
status: str
assigned_agents: List[str]
estimated_completion: Optional[str] = None
class TaskStatusRequest(BaseModel):
task_id: str
class TaskStatusResponse(BaseModel):
task_id: str
status: str
progress: float
results: Optional[Dict[str, Any]] = None
agent_activities: List[Dict[str, Any]]
# Task storage (in a production system, use a proper database)
tasks = {}
# Define agent types and their capabilities
agent_capabilities = {
"researcher": ["information_gathering", "data_analysis", "fact_checking"],
"writer": ["content_creation", "summarization", "editing"],
"coder": ["code_generation", "debugging", "optimization"],
"designer": ["visual_design", "layout", "ui_creation"]
}
# Initialize LLM
llm = ChatOpenAI(model="gpt-4", temperature=0)
# Manager Agent
class ManagerAgent:
def __init__(self):
self.llm = llm
def analyze_task(self, task_description: str, additional_context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Analyze a task and break it down into subtasks with agent assignments."""
context_str = json.dumps(additional_context) if additional_context else "No additional context provided"
        # Build the messages directly rather than via a prompt template, since the
        # interpolated JSON contains braces that a template would treat as variables.
        messages = [
            ("system", """You are a task manager agent. Analyze the given task and:
            1. Break it down into 2-5 subtasks
            2. For each subtask, assign the most appropriate agent type from: researcher, writer, coder, designer
            3. Estimate the completion percentage for each subtask
            Return your analysis as a JSON object with:
            - 'subtasks': array of objects with 'id', 'description', 'agent_type', 'estimated_completion_percentage'
            - 'dependencies': array of objects with 'id', 'depends_on' (array of subtask ids)"""),
            ("user", f"""Task Description: {task_description}
            Additional Context: {context_str}
            Available Agent Types and Their Capabilities:
            - researcher: information gathering, data analysis, fact checking
            - writer: content creation, summarization, editing
            - coder: code generation, debugging, optimization
            - designer: visual design, layout, UI creation""")
        ]
        response = self.llm.invoke(messages)
# Parse the response to extract the task breakdown
try:
task_breakdown = json.loads(response.content)
return task_breakdown
except:
# Fallback if JSON parsing fails
return {
"subtasks": [
{"id": "subtask_1", "description": task_description, "agent_type": "researcher", "estimated_completion_percentage": 100}
],
"dependencies": []
}
def integrate_results(self, task_id: str, subtask_results: Dict[str, Any]) -> Dict[str, Any]:
"""Integrate the results from multiple subtasks into a coherent final result."""
task = tasks.get(task_id)
if not task:
return {"error": "Task not found"}
results_str = json.dumps(subtask_results)
        messages = [
            ("system", """You are a result integration agent. Combine the results from multiple subtasks into a coherent final result.
            Ensure that the final result addresses the original task completely and consistently."""),
            ("user", f"""Original Task: {task['description']}
            Subtask Results: {results_str}
            Please integrate these results into a coherent final result.""")
        ]
        response = self.llm.invoke(messages)
return {
"integrated_result": response.content,
"subtask_results": subtask_results
}
# Worker Agent
class WorkerAgent:
def __init__(self, agent_type: str):
self.agent_type = agent_type
self.capabilities = agent_capabilities.get(agent_type, [])
self.llm = llm
def execute_subtask(self, subtask_description: str, context: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a subtask based on the agent's capabilities."""
capabilities_str = ", ".join(self.capabilities)
context_str = json.dumps(context)
        messages = [
            ("system", f"""You are a {self.agent_type} agent with capabilities in {capabilities_str}.
            Execute the assigned subtask to the best of your abilities.
            Provide a detailed result that can be integrated with other subtasks."""),
            ("user", f"""Subtask: {subtask_description}
            Context: {context_str}
            Please execute this subtask and provide your result.""")
        ]
        response = self.llm.invoke(messages)
return {
"agent_type": self.agent_type,
"subtask_description": subtask_description,
"result": response.content
}
# Initialize agents
manager_agent = ManagerAgent()
worker_agents = {
agent_type: WorkerAgent(agent_type)
for agent_type in agent_capabilities.keys()
}
# Define the multi-agent workflow
class MultiAgentState(TypedDict):
    task_id: str
    task_description: str
    subtasks: List[Dict[str, Any]]
    dependencies: List[Dict[str, Any]]
    results: Dict[str, Any]
    completed_subtasks: List[str]
    current_subtasks: List[str]
    status: str
    final_result: Optional[Dict[str, Any]]
def initialize_workflow(state: MultiAgentState) -> MultiAgentState:
"""Initialize the workflow by analyzing the task and creating subtasks."""
task_description = state["task_description"]
task_id = state["task_id"]
# Analyze the task
task_breakdown = manager_agent.analyze_task(task_description)
# Find subtasks with no dependencies (can be started immediately)
dependent_subtasks = set()
for dep in task_breakdown.get("dependencies", []):
for subtask_id in dep.get("depends_on", []):
dependent_subtasks.add(subtask_id)
current_subtasks = []
for subtask in task_breakdown.get("subtasks", []):
if subtask["id"] not in dependent_subtasks:
current_subtasks.append(subtask["id"])
return {
"task_id": task_id,
"task_description": task_description,
"subtasks": task_breakdown.get("subtasks", []),
"dependencies": task_breakdown.get("dependencies", []),
"results": {},
"completed_subtasks": [],
"current_subtasks": current_subtasks,
"status": "in_progress"
}
def execute_subtasks(state: MultiAgentState) -> MultiAgentState:
"""Execute the current subtasks using appropriate worker agents."""
current_subtasks = state["current_subtasks"]
subtasks = state["subtasks"]
results = state["results"].copy()
completed_subtasks = state["completed_subtasks"].copy()
# Find the subtask details for each current subtask
for subtask_id in current_subtasks:
subtask = next((s for s in subtasks if s["id"] == subtask_id), None)
if subtask:
# Execute the subtask with the appropriate agent
agent_type = subtask.get("agent_type", "researcher")
agent = worker_agents.get(agent_type)
if agent:
result = agent.execute_subtask(subtask["description"], {"task_description": state["task_description"]})
results[subtask_id] = result
completed_subtasks.append(subtask_id)
# Find new subtasks that can be started
dependent_subtasks = {}
for dep in state["dependencies"]:
subtask_id = dep.get("id")
depends_on = set(dep.get("depends_on", []))
dependent_subtasks[subtask_id] = depends_on
new_current_subtasks = []
for subtask in subtasks:
subtask_id = subtask["id"]
# If the subtask is not completed and all its dependencies are completed
if subtask_id not in completed_subtasks and subtask_id not in current_subtasks:
dependencies = dependent_subtasks.get(subtask_id, set())
if all(dep in completed_subtasks for dep in dependencies):
new_current_subtasks.append(subtask_id)
# Determine if all subtasks are completed
all_completed = len(completed_subtasks) == len(subtasks)
status = "completed" if all_completed else "in_progress"
return {
**state,
"results": results,
"completed_subtasks": completed_subtasks,
"current_subtasks": new_current_subtasks,
"status": status
}
def integrate_results(state: MultiAgentState) -> MultiAgentState:
"""Integrate the results from all subtasks if the task is completed."""
if state["status"] == "completed":
integrated_result = manager_agent.integrate_results(state["task_id"], state["results"])
return {
**state,
"final_result": integrated_result,
"status": "completed"
}
return state
# Build the multi-agent workflow graph
workflow = StateGraph(MultiAgentState)
workflow.add_node("initialize", initialize_workflow)
workflow.add_node("execute", execute_subtasks)
workflow.add_node("integrate", integrate_results)
# Add edges
workflow.add_edge("initialize", "execute")
workflow.add_conditional_edges(
"execute",
lambda state: "integrate" if state["status"] == "completed" else "execute",
{
"integrate": "integrate",
"execute": "execute"
}
)
workflow.add_conditional_edges(
"integrate",
lambda state: "end",
{
"end": END
}
)
workflow.set_entry_point("initialize")
multi_agent_workflow = workflow.compile()
# API endpoints
@app.post("/tasks", response_model=TaskResponse)
async def create_task(request: TaskRequest):
task_id = str(uuid.uuid4())
# Store the task
tasks[task_id] = {
"id": task_id,
"description": request.task_description,
"priority": request.priority,
"deadline": request.deadline,
"additional_context": request.additional_context,
"status": "pending",
"created_at": datetime.datetime.now().isoformat()
}
# Initialize the workflow
workflow_state = multi_agent_workflow.invoke({
"task_id": task_id,
"task_description": request.task_description,
"subtasks": [],
"dependencies": [],
"results": {},
"completed_subtasks": [],
"current_subtasks": [],
"status": "pending"
})
# Update the task with workflow results
    tasks[task_id].update({
        "status": workflow_state["status"],
        "subtasks": workflow_state["subtasks"],
        "completed_subtasks": workflow_state["completed_subtasks"],
        "results": workflow_state["results"],
        "final_result": workflow_state.get("final_result", {}),
        "completed_at": datetime.datetime.now().isoformat() if workflow_state["status"] == "completed" else None
    })
# Prepare the response
assigned_agents = list(set(subtask["agent_type"] for subtask in workflow_state["subtasks"]))
return TaskResponse(
task_id=task_id,
status=tasks[task_id]["status"],
assigned_agents=assigned_agents,
estimated_completion="1 hour" # Simplified estimate
)
@app.get("/tasks/{task_id}", response_model=TaskStatusResponse)
async def get_task_status(task_id: str):
if task_id not in tasks:
raise HTTPException(status_code=404, detail="Task not found")
task = tasks[task_id]
# Calculate progress
subtasks = task.get("subtasks", [])
completed_subtasks = task.get("completed_subtasks", [])
progress = len(completed_subtasks) / len(subtasks) if subtasks else 0
# Prepare agent activities
agent_activities = []
for subtask in subtasks:
subtask_id = subtask["id"]
result = task.get("results", {}).get(subtask_id, {})
agent_activities.append({
"agent_type": subtask.get("agent_type", "unknown"),
"subtask": subtask.get("description", ""),
"status": "completed" if subtask_id in completed_subtasks else "pending",
"result": result.get("result", "") if subtask_id in completed_subtasks else ""
})
return TaskStatusResponse(
task_id=task_id,
status=task["status"],
progress=progress,
results=task.get("results", {}),
agent_activities=agent_activities
)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
This example demonstrates a centralized multi-agent architecture implemented as a REST API. The manager agent analyzes tasks, breaks them down into subtasks, and assigns them to appropriate worker agents. The worker agents execute their assigned subtasks based on their specialized capabilities. The workflow graph manages the execution flow, ensuring that subtasks are executed in the correct order based on dependencies. The API allows clients to create tasks and monitor their progress.
Agentic AI Architectures
Several architectural styles have emerged for implementing agentic AI systems, each with its own strengths, weaknesses, and appropriate use cases.
The goal-based architecture organizes the agent's behavior around explicit goals. The agent maintains a goal stack or priority queue, selects goals to pursue based on priorities and conditions, and generates plans to achieve those goals. This architecture provides clear focus and purpose but can be rigid when goals need to be adjusted dynamically. Goal-based architectures are appropriate for applications where objectives are well-defined and relatively stable, such as task-oriented assistants or automated workflow systems.
The utility-based architecture has the agent evaluate actions based on their expected utility - a measure of desirability or value. The agent maintains a utility function that maps states or outcomes to numerical values and selects actions that maximize expected utility. This architecture enables flexible decision-making in uncertain environments but requires careful design of utility functions. Utility-based architectures are appropriate for applications where trade-offs between multiple factors need to be considered, such as resource allocation or recommendation systems.
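A toy utility function makes the trade-off explicit: each candidate action is scored on weighted factors and the agent picks the highest expected utility. The factors, scores, and weights below are assumptions chosen for the example.

from typing import Dict, List

# Candidate actions scored on illustrative factors (0.0 to 1.0)
ACTIONS: List[Dict[str, object]] = [
    {"name": "answer_from_cache", "speed": 0.9, "accuracy": 0.6, "cost": 0.1},
    {"name": "call_search_api", "speed": 0.4, "accuracy": 0.9, "cost": 0.5},
    {"name": "ask_user", "speed": 0.2, "accuracy": 0.95, "cost": 0.0},
]

WEIGHTS = {"speed": 0.3, "accuracy": 0.6, "cost": -0.2}   # cost reduces utility

def utility(action: Dict[str, object]) -> float:
    return sum(WEIGHTS[factor] * float(action[factor]) for factor in WEIGHTS)

best = max(ACTIONS, key=utility)
print(best["name"], round(utility(best), 3))   # ask_user scores highest here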
The learning-based architecture emphasizes the agent's ability to improve through experience. The agent maintains models of its environment, actions, and their consequences, and updates these models based on feedback and observations. This architecture enables adaptation and improvement over time but may perform poorly initially before sufficient learning has occurred. Learning-based architectures are appropriate for applications where the environment or requirements change over time, such as personalized assistants or adaptive control systems.
The cognitive architecture models the agent's internal processes after human cognitive functions. The agent maintains multiple specialized modules for perception, memory, reasoning, planning, and learning, integrated through a unified cognitive framework. This architecture enables sophisticated human-like behavior but can be complex and computationally intensive. Cognitive architectures are appropriate for applications where human-like reasoning and behavior are important, such as advanced virtual assistants or social robots.
The tool-based architecture focuses on the agent's ability to use external tools and services. The agent maintains a toolkit of available tools, selects appropriate tools based on the current task, and composes tool usage into effective sequences. This architecture extends the agent's capabilities beyond its built-in functions but requires effective tool selection and result interpretation. Tool-based architectures are appropriate for applications where diverse specialized capabilities are needed, such as research assistants or creative tools.
Here's an example of implementing a goal-based agentic AI system using LangChain, LangGraph, and FastAPI:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Any, Optional, Union, TypedDict
import uvicorn
import uuid
import json
import datetime
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from langchain_community.tools import DuckDuckGoSearchRun
from langgraph.graph import StateGraph, END
app = FastAPI(title="Agentic AI System API")
# Define models for API requests and responses
class GoalRequest(BaseModel):
goal_description: str
user_id: str
context: Optional[Dict[str, Any]] = None
constraints: Optional[List[str]] = None
priority: Optional[int] = 1
class GoalResponse(BaseModel):
goal_id: str
status: str
estimated_steps: int
class GoalStatusRequest(BaseModel):
goal_id: str
class GoalStatusResponse(BaseModel):
goal_id: str
status: str
progress: float
current_step: Optional[str] = None
completed_steps: List[str]
results: Optional[Dict[str, Any]] = None
next_actions: List[str]
# Goal storage (in a production system, use a proper database)
goals = {}
# Define tools
search_tool = DuckDuckGoSearchRun()
@tool
def calculator(expression: str) -> float:
"""Evaluate a mathematical expression."""
    # NOTE: eval() is unsafe for untrusted input; use a proper expression parser in production.
    return eval(expression)
@tool
def weather_api(location: str) -> Dict[str, Any]:
"""Get weather information for a location."""
# Simulated weather API
return {
"location": location,
"temperature": 72,
"conditions": "partly cloudy",
"forecast": ["sunny", "rainy", "cloudy"]
}
@tool
def calendar_api(action: str, date: str = None, event: str = None) -> Dict[str, Any]:
"""Interact with a calendar API."""
# Simulated calendar API
if action == "check":
return {
"date": date,
"events": ["Meeting at 10 AM", "Lunch at 1 PM"]
}
elif action == "add":
return {
"status": "success",
"message": f"Added event: {event} on {date}"
}
else:
return {
"status": "error",
"message": "Invalid action"
}
# Initialize LLM
llm = ChatOpenAI(model="gpt-4", temperature=0)
# Define the agentic AI system
class AgenticState(TypedDict):
goal_id: str
goal_description: str
user_id: str
context: Dict[str, Any]
constraints: List[str]
plan: List[Dict[str, Any]]
current_step_index: int
completed_steps: List[int]
step_results: Dict[int, Any]
status: str
final_result: Optional[Dict[str, Any]]
def goal_analyzer(state: AgenticState) -> AgenticState:
"""Analyze the goal and create a plan to achieve it."""
goal_description = state["goal_description"]
context = state["context"] or {}
constraints = state["constraints"] or []
context_str = json.dumps(context)
constraints_str = "\n".join([f"- {constraint}" for constraint in constraints])
    # Build the messages directly; the interpolated context contains JSON braces
    # that a prompt template would misread as template variables.
    messages = [
        ("system", """You are a goal planning agent. Given a goal description, context, and constraints:
        1. Break down the goal into 3-7 concrete, actionable steps
        2. For each step, specify:
        - A clear description of what needs to be done
        - Which tools might be needed (search, calculator, weather_api, calendar_api, or none)
        - Dependencies on previous steps (if any)
        Return your plan as a JSON array of step objects, each with:
        - 'description': detailed description of the step
        - 'tools': array of tool names that might be needed
        - 'dependencies': array of step indices (0-based) that must be completed first"""),
        ("user", f"""Goal: {goal_description}
        Context: {context_str}
        Constraints:
        {constraints_str if constraints else "No specific constraints provided."}
        Available tools:
        - search: Look up information on the web
        - calculator: Perform mathematical calculations
        - weather_api: Get weather information for a location
        - calendar_api: Check or modify calendar events""")
    ]
    response = llm.invoke(messages)
# Parse the response to extract the plan
try:
plan = json.loads(response.content)
except:
# Fallback if JSON parsing fails
plan = [{"description": "Achieve the goal", "tools": [], "dependencies": []}]
# Find steps with no dependencies (can be started immediately)
current_step_index = next((i for i, step in enumerate(plan) if not step.get("dependencies")), 0)
return {
**state,
"plan": plan,
"current_step_index": current_step_index,
"completed_steps": [],
"step_results": {},
"status": "in_progress"
}
def step_executor(state: AgenticState) -> AgenticState:
"""Execute the current step in the plan."""
plan = state["plan"]
current_step_index = state["current_step_index"]
completed_steps = state["completed_steps"].copy()
step_results = state["step_results"].copy()
# Check if we've completed all steps
if current_step_index >= len(plan):
return {
**state,
"status": "completed"
}
current_step = plan[current_step_index]
step_description = current_step["description"]
potential_tools = current_step.get("tools", [])
# Execute the step
    messages = [
        ("system", """You are a step execution agent. Execute the current step in the plan.
        If tools are needed, specify exactly which tool to use and with what parameters.
        Return your result as a JSON object with:
        - 'result': the outcome or finding from executing the step
        - 'tool_calls': array of objects with 'tool' (name) and 'parameters' (object)
        - 'next_actions': array of suggested next actions"""),
        ("user", f"""Step to execute: {step_description}
        Goal context: {state['goal_description']}
        Previous step results: {json.dumps(step_results)}
        Potential tools: {json.dumps(potential_tools)}""")
    ]
    response = llm.invoke(messages)
# Parse the response
try:
execution_result = json.loads(response.content)
except:
# Fallback if JSON parsing fails
execution_result = {
"result": response.content,
"tool_calls": [],
"next_actions": []
}
# Execute any tool calls
tool_results = []
for tool_call in execution_result.get("tool_calls", []):
tool_name = tool_call.get("tool")
parameters = tool_call.get("parameters", {})
if tool_name == "search":
query = parameters.get("query", "")
try:
result = search_tool.invoke(query)
tool_results.append({"tool": "search", "query": query, "result": result})
except Exception as e:
tool_results.append({"tool": "search", "query": query, "error": str(e)})
elif tool_name == "calculator":
expression = parameters.get("expression", "")
try:
result = calculator.invoke(expression)
tool_results.append({"tool": "calculator", "expression": expression, "result": result})
except Exception as e:
tool_results.append({"tool": "calculator", "expression": expression, "error": str(e)})
elif tool_name == "weather_api":
location = parameters.get("location", "")
try:
result = weather_api.invoke(location)
tool_results.append({"tool": "weather_api", "location": location, "result": result})
except Exception as e:
tool_results.append({"tool": "weather_api", "location": location, "error": str(e)})
elif tool_name == "calendar_api":
action = parameters.get("action", "")
date = parameters.get("date", "")
event = parameters.get("event", "")
try:
                result = calendar_api.invoke({"action": action, "date": date, "event": event})
tool_results.append({"tool": "calendar_api", "action": action, "result": result})
except Exception as e:
tool_results.append({"tool": "calendar_api", "action": action, "error": str(e)})
# If tools were used, interpret the results
if tool_results:
        messages = [
            ("system", """You are a tool result interpreter. Given the results from tool calls,
            interpret them in the context of the current step and update the step result."""),
            ("user", f"""Step: {step_description}
            Original execution result: {json.dumps(execution_result)}
            Tool results: {json.dumps(tool_results)}
            Please interpret these results and provide an updated step result.""")
        ]
        response = llm.invoke(messages)
try:
updated_result = json.loads(response.content)
execution_result.update(updated_result)
except:
execution_result["tool_interpretation"] = response.content
# Store the step result
step_results[current_step_index] = execution_result
completed_steps.append(current_step_index)
# Find the next step to execute
next_step_index = current_step_index + 1
while next_step_index < len(plan):
next_step = plan[next_step_index]
dependencies = next_step.get("dependencies", [])
if all(dep in completed_steps for dep in dependencies):
break
next_step_index += 1
# Check if we've completed all steps
if next_step_index >= len(plan):
status = "ready_for_completion"
else:
status = "in_progress"
return {
**state,
"current_step_index": next_step_index,
"completed_steps": completed_steps,
"step_results": step_results,
"status": status
}
def goal_completer(state: AgenticState) -> AgenticState:
"""Complete the goal by integrating all step results."""
goal_description = state["goal_description"]
plan = state["plan"]
step_results = state["step_results"]
# Prepare a summary of all steps and their results
steps_summary = []
for i, step in enumerate(plan):
result = step_results.get(i, {}).get("result", "No result recorded")
steps_summary.append(f"Step {i}: {step['description']}\nResult: {result}")
steps_str = "\n\n".join(steps_summary)
    messages = [
        ("system", """You are a goal completion agent. Given the original goal and the results of all steps taken,
        create a comprehensive final result that achieves the goal.
        Your response should be thorough, well-structured, and directly address the original goal."""),
        ("user", f"""Original Goal: {goal_description}
        Steps Taken and Results:
        {steps_str}
        Please provide a comprehensive final result that achieves the original goal.""")
    ]
    response = llm.invoke(messages)
return {
**state,
"final_result": {
"summary": response.content,
"step_results": step_results
},
"status": "completed"
}
# Build the agentic AI workflow graph
workflow = StateGraph(AgenticState)
workflow.add_node("goal_analyzer", goal_analyzer)
workflow.add_node("step_executor", step_executor)
workflow.add_node("goal_completer", goal_completer)
# Add edges
workflow.add_edge("goal_analyzer", "step_executor")
workflow.add_conditional_edges(
"step_executor",
lambda state: state["status"],
{
"in_progress": "step_executor",
"ready_for_completion": "goal_completer",
"completed": "goal_completer"
}
)
workflow.add_conditional_edges(
"goal_completer",
lambda state: "end",
{
"end": END
}
)
workflow.set_entry_point("goal_analyzer")
agentic_workflow = workflow.compile()
# API endpoints
@app.post("/goals", response_model=GoalResponse)
async def create_goal(request: GoalRequest):
goal_id = str(uuid.uuid4())
# Store the goal
goals[goal_id] = {
"id": goal_id,
"description": request.goal_description,
"user_id": request.user_id,
"context": request.context or {},
"constraints": request.constraints or [],
"priority": request.priority,
"status": "pending",
"created_at": datetime.datetime.now().isoformat()
}
# Initialize the workflow
workflow_state = agentic_workflow.invoke({
"goal_id": goal_id,
"goal_description": request.goal_description,
"user_id": request.user_id,
"context": request.context or {},
"constraints": request.constraints or [],
"plan": [],
"current_step_index": 0,
"completed_steps": [],
"step_results": {},
"status": "pending",
"final_result": None
})
# Update the goal with workflow results
goals[goal_id].update({
"status": workflow_state["status"],
"plan": workflow_state["plan"],
"current_step_index": workflow_state["current_step_index"],
"completed_steps": workflow_state["completed_steps"],
"step_results": workflow_state["step_results"],
"final_result": workflow_state.get("final_result"),
"completed_at": datetime.datetime.now().isoformat() if workflow_state["status"] == "completed" else None
})
return GoalResponse(
goal_id=goal_id,
status=goals[goal_id]["status"],
estimated_steps=len(workflow_state["plan"])
)
@app.get("/goals/{goal_id}", response_model=GoalStatusResponse)
async def get_goal_status(goal_id: str):
if goal_id not in goals:
raise HTTPException(status_code=404, detail="Goal not found")
goal = goals[goal_id]
plan = goal.get("plan", [])
# Calculate progress
completed_steps = goal.get("completed_steps", [])
progress = len(completed_steps) / len(plan) if plan else 0
# Get current step description
current_step_index = goal.get("current_step_index", 0)
current_step = plan[current_step_index]["description"] if current_step_index < len(plan) else None
# Get completed step descriptions
completed_step_descriptions = [
plan[i]["description"] for i in completed_steps if i < len(plan)
]
# Determine next actions
next_actions = []
if goal["status"] != "completed" and current_step_index < len(plan):
current_step_result = goal.get("step_results", {}).get(current_step_index, {})
next_actions = current_step_result.get("next_actions", ["Proceed to next step"])
return GoalStatusResponse(
goal_id=goal_id,
status=goal["status"],
progress=progress,
current_step=current_step,
completed_steps=completed_step_descriptions,
results=goal.get("final_result"),
next_actions=next_actions
)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
This example demonstrates a goal-based agentic AI architecture implemented as a REST API. The system takes high-level goals from users, breaks them down into concrete steps, executes each step using appropriate tools, and integrates the results to achieve the original goal. The workflow graph manages the execution flow, ensuring that steps are executed in the correct order based on dependencies. The API allows clients to create goals and monitor their progress toward completion.
When to Use Different Agent Architectures
Single AI Agents: Use Cases and Limitations
Single AI agents are most appropriate when:
The task domain is well-defined and relatively narrow. Single agents excel when focused on specific domains where the required knowledge and capabilities are clearly bounded. Examples include specialized assistants for customer service, technical support, or domain-specific information retrieval.
Real-time responsiveness is critical. Single agents typically have simpler decision-making processes than multi-agent systems, enabling faster responses. This makes them suitable for applications like chatbots, voice assistants, or interactive tools where users expect immediate feedback.
Resource constraints limit the deployment of multiple agents. Single agents generally require fewer computational resources than multi-agent systems, making them appropriate for deployment in resource-constrained environments like mobile devices or edge computing scenarios.
The interaction pattern is primarily dyadic (one-to-one). Single agents work well when the primary interaction is between one user and one agent, without the need for complex coordination with other agents or systems.
However, single AI agents have important limitations:
They may struggle with tasks that require diverse expertise across multiple domains. A single agent typically has a bounded set of capabilities and knowledge, making it difficult to handle tasks that span widely different domains.
They can become bottlenecks when handling multiple complex tasks simultaneously. Without the ability to distribute work across multiple specialized agents, a single agent may become overwhelmed when trying to juggle multiple complex tasks.
They often lack the benefit of diverse perspectives that can emerge from multi-agent collaboration. A single agent represents one approach to problem-solving, potentially missing alternative viewpoints or solutions that might emerge from multiple agents with different approaches.
They typically have fixed capabilities that cannot be easily extended or reconfigured dynamically. Adding new capabilities to a single agent often requires retraining or redesigning the entire agent, rather than simply adding a new specialized agent to a team.
Multi-Agent Systems: Use Cases and Limitations
Multi-agent systems are most appropriate when:
The task requires diverse expertise across multiple domains. Multi-agent systems can incorporate specialized agents for different domains, enabling them to handle complex tasks that span multiple areas of expertise. Examples include comprehensive research assistants, enterprise automation systems, or creative collaboration tools.
Scalability and load distribution are important. By distributing tasks across multiple agents, multi-agent systems can handle larger workloads and scale more effectively than single agents. This makes them suitable for applications with high throughput requirements or variable demand.
Robustness and fault tolerance are critical. Multi-agent systems can continue functioning even if some agents fail, providing greater resilience than single-agent approaches. This is valuable in mission-critical applications or environments where continuous operation is essential.
The task benefits from diverse perspectives or approaches. By incorporating multiple agents with different capabilities or approaches, multi-agent systems can explore alternative solutions and potentially find better outcomes than a single agent. This is valuable in creative tasks, complex problem-solving, or scenarios with multiple valid approaches.
However, multi-agent systems have important limitations:
They introduce coordination overhead and complexity. Managing communication and coordination between multiple agents adds complexity to the system design and can introduce latency. This overhead may not be justified for simpler tasks.
They can suffer from communication inefficiencies or misalignments. When agents have different internal representations or communication styles, information may be lost or misinterpreted during inter-agent communication, potentially leading to suboptimal outcomes.
They typically require more computational resources than single-agent approaches. Running multiple agents simultaneously demands more computational power and memory than a single agent, potentially limiting deployment options.
They can be more difficult to design, debug, and maintain. The interactions between multiple agents can create emergent behaviors that are difficult to predict or debug, making system development and maintenance more challenging.
Agentic AI: Use Cases and Limitations
Agentic AI systems are most appropriate when:
Long-term, goal-directed behavior is required. Agentic systems excel at maintaining focus on objectives across extended interactions and multiple steps, making them suitable for complex tasks that require sustained effort. Examples include project management assistants, research agents, or personal productivity tools.
Autonomous decision-making and initiative are valuable. Agentic systems can take initiative, identify opportunities, and make decisions without constant user guidance, making them appropriate for applications where user attention is limited or intermittent.
Tool use and environment interaction are essential. Agentic systems are designed to leverage external tools and interact with their environment, making them suitable for tasks that require integration with various services, APIs, or data sources.
Adaptability to changing conditions or requirements is important. Agentic systems can adjust their strategies based on feedback and changing circumstances, making them valuable in dynamic environments where conditions may change unexpectedly.
However, agentic AI systems have important limitations:
They can be difficult to control or constrain effectively. The autonomy that makes agentic systems powerful also makes them challenging to constrain, potentially leading to unexpected or undesired behaviors if goals or constraints are not carefully specified.
They may pursue goals in ways that don't align with user intentions. Without careful design and alignment mechanisms, agentic systems might optimize for their stated goals in ways that don't match the user's actual intentions or values.
They typically require more sophisticated safety and oversight mechanisms. The autonomy and initiative of agentic systems create greater potential for harm if they malfunction or are misused, necessitating more robust safety measures.
They often have higher latency than simpler reactive systems. The planning and reasoning processes that enable goal-directed behavior can introduce latency, making agentic systems potentially less suitable for applications where millisecond-level responsiveness is critical.
Anthropic MCP: Client and Server Implementation
MCP Client Implementation
The Model Context Protocol (MCP) defines a structured approach for communication between clients and AI models. The client below illustrates that structure by wrapping Anthropic's Messages API (the /v1/messages endpoint), covering message formatting, system prompts, tool definitions, and tool-use handling:
import requests
import json
from typing import Dict, List, Any, Optional, Union
import uuid
class MCPClient:
def __init__(self, api_key: str, base_url: str = "https://api.anthropic.com"):
self.api_key = api_key
self.base_url = base_url
self.default_headers = {
"x-api-key": api_key,
"anthropic-version": "2023-06-01",
"content-type": "application/json"
}
def create_message(
self,
messages: List[Dict[str, str]],
system_prompt: Optional[str] = None,
tools: Optional[List[Dict[str, Any]]] = None,
max_tokens: int = 1024,
temperature: float = 0.0,
model: str = "claude-3-opus-20240229"
) -> Dict[str, Any]:
"""
Send a message to Claude using the Model Context Protocol.
Args:
messages: List of message objects with 'role' and 'content'
system_prompt: Optional system prompt to guide the model's behavior
tools: Optional list of tool definitions
max_tokens: Maximum number of tokens to generate
temperature: Sampling temperature (0.0 to 1.0)
model: Model identifier
Returns:
The model's response
"""
endpoint = f"{self.base_url}/v1/messages"
# Format the request according to MCP
request_body = {
"model": model,
"max_tokens": max_tokens,
"temperature": temperature,
"messages": messages
}
# Add system prompt if provided
if system_prompt:
request_body["system"] = system_prompt
# Add tools if provided
if tools:
request_body["tools"] = tools
# Send the request
response = requests.post(
endpoint,
headers=self.default_headers,
json=request_body
)
if response.status_code != 200:
raise Exception(f"API request failed with status {response.status_code}: {response.text}")
return response.json()
    def handle_tool_use(
        self,
        messages: List[Dict[str, Any]],
        response: Dict[str, Any],
        tool_results: Dict[str, Any],
        system_prompt: Optional[str] = None,
        model: str = "claude-3-opus-20240229"
    ) -> Dict[str, Any]:
        """
        Handle tool use in the model's response by sending tool results back.
        Args:
            messages: Conversation history up to (but not including) the tool-use reply
            response: The original model response containing tool use
            tool_results: Results from executing the tools
            system_prompt: Optional system prompt to keep the follow-up consistent
            model: Model identifier
        Returns:
            The model's updated response after receiving tool results
        """
        if "tool_use" not in response:
            return response
        # Append the assistant's tool-use turn and a user turn carrying the tool result,
        # then resend the whole conversation to the messages endpoint.
        followup_messages = messages + [
            {"role": "assistant", "content": response.get("content", "")},
            {
                "role": "user",
                "content": [
                    {
                        "type": "tool_result",
                        "tool_use_id": response["tool_use"]["id"],
                        "result": json.dumps(tool_results)
                    }
                ]
            }
        ]
        return self.create_message(
            messages=followup_messages,
            system_prompt=system_prompt,
            model=model
        )
def create_conversation(
self,
initial_message: str,
system_prompt: Optional[str] = None,
tools: Optional[List[Dict[str, Any]]] = None,
model: str = "claude-3-opus-20240229"
) -> Dict[str, Any]:
"""
Create a new conversation with Claude.
Args:
initial_message: The first user message
system_prompt: Optional system prompt to guide the model's behavior
tools: Optional list of tool definitions
model: Model identifier
Returns:
Conversation object with initial exchange
"""
messages = [{"role": "user", "content": initial_message}]
response = self.create_message(
messages=messages,
system_prompt=system_prompt,
tools=tools,
model=model
)
conversation = {
"id": response.get("conversation_id", str(uuid.uuid4())),
"messages": messages + [{"role": "assistant", "content": response.get("content", "")}],
"system_prompt": system_prompt,
"tools": tools,
"model": model
}
return conversation
def continue_conversation(
self,
conversation: Dict[str, Any],
user_message: str
) -> Dict[str, Any]:
"""
Continue an existing conversation with a new user message.
Args:
conversation: Existing conversation object
user_message: New user message
Returns:
Updated conversation object
"""
# Add the new user message
conversation["messages"].append({"role": "user", "content": user_message})
# Send the updated conversation to the model
response = self.create_message(
messages=conversation["messages"],
system_prompt=conversation.get("system_prompt"),
tools=conversation.get("tools"),
model=conversation.get("model", "claude-3-opus-20240229")
)
# Add the model's response to the conversation
conversation["messages"].append({"role": "assistant", "content": response.get("content", "")})
        # Handle tool use if present
        if "tool_use" in response:
            # In a real implementation, this would execute the actual tools
            # For this example, we'll simulate tool results
            tool_name = response["tool_use"]["name"]
            tool_input = response["tool_use"]["input"]
            # Simulate tool execution
            tool_results = {
                "result": f"Simulated result for {tool_name} with input {json.dumps(tool_input)}"
            }
            # Send tool results back, replaying the history up to the tool-use reply
            updated_response = self.handle_tool_use(
                conversation["messages"][:-1],
                response,
                tool_results,
                system_prompt=conversation.get("system_prompt"),
                model=conversation.get("model", "claude-3-opus-20240229")
            )
            # Update the conversation with the new response
            conversation["messages"][-1] = {"role": "assistant", "content": updated_response.get("content", "")}
return conversation
This client implementation handles the core aspects of the MCP:
- Message formatting according to the protocol specifications
- System prompts for guiding model behavior
- Tool definitions and handling tool use responses
- Conversation management across multiple turns
- Error handling and response processing
The client can be used to create new conversations, continue existing ones, and handle tool use when the model requests a tool.
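To make the flow concrete, here is a minimal usage sketch of the client above. It assumes a valid ANTHROPIC_API_KEY environment variable; the weather tool definition and the example messages are purely illustrative.

import os

# Hypothetical tool definition; the schema mirrors the tool format used by the client above.
weather_tool = {
    "name": "get_weather",
    "description": "Look up the current weather for a city",
    "input_schema": {
        "type": "object",
        "properties": {"city": {"type": "string", "description": "City name"}},
        "required": ["city"]
    }
}

client = MCPClient(api_key=os.environ["ANTHROPIC_API_KEY"])
conversation = client.create_conversation(
    initial_message="What's the weather like in Berlin?",
    system_prompt="You are a concise assistant.",
    tools=[weather_tool]
)
conversation = client.continue_conversation(conversation, "And how does it compare to Paris?")
print(conversation["messages"][-1]["content"])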
MCP Server Implementation
While most developers will interact with Anthropic's hosted API as clients, it is still useful to see the server side of MCP, whether for building custom model-serving solutions or simply for understanding how the protocol works internally. Here's a simplified implementation of an MCP server using FastAPI:
from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional, Union
import uuid
import json
import datetime
import asyncio
from fastapi.security import APIKeyHeader
app = FastAPI(title="MCP Server Implementation")
# Security scheme for API key authentication
api_key_header = APIKeyHeader(name="x-api-key")
# Define models for API requests and responses
class MessageContent(BaseModel):
type: str = "text"
text: Optional[str] = None
tool_use_id: Optional[str] = None
result: Optional[str] = None
class Message(BaseModel):
role: str
content: Union[str, List[MessageContent]]
class ToolParameter(BaseModel):
type: str
description: Optional[str] = None
enum: Optional[List[str]] = None
required: Optional[bool] = None
class ToolParameters(BaseModel):
type: str = "object"
properties: Dict[str, ToolParameter]
required: Optional[List[str]] = None
class Tool(BaseModel):
name: str
description: str
input_schema: ToolParameters
class ToolUse(BaseModel):
id: str = Field(default_factory=lambda: f"tu_{uuid.uuid4()}")
name: str
input: Dict[str, Any]
class MessageRequest(BaseModel):
model: str
messages: List[Message]
system: Optional[str] = None
max_tokens: Optional[int] = 1024
temperature: Optional[float] = 0.0
tools: Optional[List[Tool]] = None
class MessageResponse(BaseModel):
id: str = Field(default_factory=lambda: f"msg_{uuid.uuid4()}")
type: str = "message"
role: str = "assistant"
content: str
model: str
stop_reason: str = "end_turn"
stop_sequence: Optional[str] = None
usage: Dict[str, int] = Field(default_factory=lambda: {
"input_tokens": 0,
"output_tokens": 0
})
tool_use: Optional[ToolUse] = None
# Simulated model and tool execution
async def simulate_model_response(request: MessageRequest) -> MessageResponse:
"""Simulate a model generating a response based on the input."""
# Extract the last user message
last_message = next((m for m in reversed(request.messages) if m.role == "user"), None)
if not last_message:
return MessageResponse(
content="I don't see a user message to respond to.",
model=request.model
)
# Get the content text
if isinstance(last_message.content, str):
user_content = last_message.content
else:
# Handle structured content
text_parts = [c.text for c in last_message.content if c.type == "text" and c.text]
user_content = " ".join(text_parts) if text_parts else "Empty message"
    # Check if this is a tool result (structured content parses into MessageContent objects)
    tool_results = (
        [c for c in last_message.content if c.type == "tool_result"]
        if isinstance(last_message.content, list)
        else []
    )
    if tool_results:
        # This is a follow-up after tool use
        return MessageResponse(
            content=f"I've processed the tool results: {tool_results[0].result or 'No result'}",
            model=request.model
        )
# Check if we should use a tool
tools = request.tools or []
should_use_tool = any(tool_keyword in user_content.lower() for tool_keyword in ["weather", "calculator", "search"])
if should_use_tool and tools:
# Select a tool to use
selected_tool = tools[0] # Simplified selection
# Create a tool use response
tool_use = ToolUse(
name=selected_tool.name,
input={"query": user_content} # Simplified input
)
return MessageResponse(
content=f"I need to use the {selected_tool.name} tool to answer your question.",
model=request.model,
tool_use=tool_use
)
# Generate a simple response
await asyncio.sleep(1) # Simulate processing time
# Include system prompt in response if provided
system_context = f" As instructed: '{request.system}'" if request.system else ""
return MessageResponse(
content=f"This is a simulated response to: '{user_content}'.{system_context}",
model=request.model,
usage={
"input_tokens": len(user_content.split()),
"output_tokens": 20 # Simplified token counting
}
)
# Validate API key
async def validate_api_key(api_key: str = Depends(api_key_header)):
if api_key != "test_api_key": # In a real system, check against a secure store
raise HTTPException(status_code=401, detail="Invalid API key")
return api_key
# Endpoints
@app.post("/v1/messages", response_model=MessageResponse)
async def create_message(
request: MessageRequest,
api_key: str = Depends(validate_api_key)
):
"""
Create a new message using the Model Context Protocol.
"""
try:
response = await simulate_model_response(request)
return response
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error processing message: {str(e)}")
# Run the server
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
This server implementation demonstrates the key components of the MCP from the server side:
- Structured message format for both requests and responses
- Support for system prompts to guide model behavior
- Tool definition and tool use handling
- API key authentication for security
- Token usage tracking
The server simulates model responses, including deciding when to use tools based on the input. In a real implementation, this would connect to an actual language model.
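As a quick way to exercise the protocol end to end, the simulated server can be run locally and the client from the previous section pointed at it. This sketch assumes the server code is saved as mcp_server.py (an illustrative filename) and uses the placeholder test_api_key accepted by validate_api_key:

# Start the server in a separate terminal:
#   python mcp_server.py
# Then reuse the MCPClient from the client section, pointed at the local server.
local_client = MCPClient(api_key="test_api_key", base_url="http://localhost:8000")
response = local_client.create_message(
    messages=[{"role": "user", "content": "Summarize the Model Context Protocol in one sentence."}]
)
print(response.get("content"))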
Implementation Patterns for AI Agent Systems
Patterns for Single AI Agents
Several design patterns have emerged for implementing effective single AI agents:
The Chain of Thought pattern makes the agent's reasoning process explicit by breaking down complex tasks into step-by-step thinking. The agent articulates its reasoning, considers alternatives, and explains its decisions, leading to more reliable outcomes. This pattern is implemented by prompting the agent to "think step by step" or by structuring the reasoning process into explicit stages.
def chain_of_thought_reasoning(question: str) -> str:
"""Implement chain of thought reasoning for a question."""
llm = ChatOpenAI(model="gpt-4", temperature=0)
prompt = ChatPromptTemplate.from_messages([
("system", """You are a reasoning agent that solves problems step by step.
For each problem:
1. Break down the problem into smaller components
2. Reason about each component separately
3. Combine your reasoning to reach a final answer
4. Verify your answer by checking it against the original problem
Show your complete reasoning process before giving the final answer."""),
("user", question)
])
response = llm.invoke(prompt)
return response.content
The ReAct (Reasoning + Acting) pattern combines reasoning with action taking. The agent alternates between reasoning about the current state and taking actions based on that reasoning. This creates a feedback loop where each action provides new information that informs subsequent reasoning. ReAct is particularly effective for tasks that require both thinking and doing.
def react_pattern(task: str, tools: List[Dict[str, Any]]) -> str:
"""Implement the ReAct pattern for a task with available tools."""
llm = ChatOpenAI(model="gpt-4", temperature=0)
prompt = ChatPromptTemplate.from_messages([
("system", """You solve tasks through a cycle of Reasoning and Acting.
For each step:
1. REASONING: Think about the current state, what you know, and what you need to find out
2. ACTION: Decide on the next action to take (using available tools)
3. OBSERVATION: Review the result of your action
Continue this cycle until you can provide a final answer.
Available tools:
{tools}""".format(tools=json.dumps(tools, indent=2))),
("user", task)
])
# In a real implementation, this would be a loop that executes actions
# and feeds observations back to the agent
response = llm.invoke(prompt)
return response.content
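The comment in react_pattern above glosses over the actual control loop. The sketch below shows one minimal way to run that loop, assuming a caller-supplied execute_tool(name, tool_input) dispatcher (hypothetical) and a plain-text transcript convention; it illustrates the Reason/Act/Observe cycle rather than a hardened implementation.

import json
from typing import Any, Callable, Dict, List
from langchain_openai import ChatOpenAI

def react_loop(
    task: str,
    tools: List[Dict[str, Any]],
    execute_tool: Callable[[str, str], str],  # hypothetical dispatcher: (tool_name, tool_input) -> observation
    max_steps: int = 5
) -> str:
    """Alternate reasoning and acting until the model emits a final answer."""
    llm = ChatOpenAI(model="gpt-4", temperature=0)
    transcript = f"Task: {task}\nAvailable tools: {json.dumps(tools)}\n"
    for _ in range(max_steps):
        step = llm.invoke(
            "Continue this ReAct transcript. Reply with either\n"
            "ACTION: <tool name> | <tool input>\n"
            "or\n"
            "FINAL ANSWER: <answer>\n\n" + transcript
        ).content
        transcript += step + "\n"
        if "FINAL ANSWER:" in step:
            return step.split("FINAL ANSWER:")[-1].strip()
        if "ACTION:" in step:
            action = step.split("ACTION:")[-1].strip()
            tool_name, _, tool_input = action.partition("|")
            observation = execute_tool(tool_name.strip(), tool_input.strip())
            transcript += f"OBSERVATION: {observation}\n"  # feed the result back into the next reasoning step
    return "No final answer reached within the step limit."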
The Reflection pattern has the agent evaluate and critique its own outputs before finalizing them. The agent generates an initial response, reviews it for errors or improvements, and then refines it based on that reflection. This pattern improves output quality by catching errors and addressing weaknesses before presenting the final result.
def reflection_pattern(task: str) -> str:
"""Implement the Reflection pattern for a task."""
llm = ChatOpenAI(model="gpt-4", temperature=0)
# Generate initial response
initial_prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant. Provide a response to the user's request."),
("user", task)
])
initial_response = llm.invoke(initial_prompt)
# Reflect on the response
reflection_prompt = ChatPromptTemplate.from_messages([
("system", """You are a critical reviewer. Evaluate the following response for:
1. Factual accuracy
2. Completeness
3. Clarity
4. Potential improvements
Provide specific suggestions for improvement."""),
("user", f"Task: {task}\n\nResponse: {initial_response.content}")
])
reflection = llm.invoke(reflection_prompt)
# Generate improved response
improved_prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant. Provide an improved response based on the critique."),
("user", f"""Original Task: {task}
Initial Response: {initial_response.content}
Critique: {reflection.content}
Please provide an improved response that addresses the critique.""")
])
improved_response = llm.invoke(improved_prompt)
return improved_response.content
The Retrieval-Augmented Generation (RAG) pattern enhances the agent's responses by retrieving relevant information from external knowledge sources. The agent analyzes the input, retrieves pertinent information, and incorporates it into the response generation process. This pattern improves accuracy and relevance, especially for domain-specific or factual questions.
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
def rag_pattern(question: str, documents: List[str]) -> str:
"""Implement the RAG pattern for a question with a document collection."""
# Process documents
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
texts = text_splitter.create_documents(documents)
# Create vector store
embeddings = OpenAIEmbeddings()
vector_store = FAISS.from_documents(texts, embeddings)
# Retrieve relevant documents
retriever = vector_store.as_retriever(search_kwargs={"k": 3})
retrieved_docs = retriever.get_relevant_documents(question)
context = "\n\n".join([doc.page_content for doc in retrieved_docs])
# Generate response with retrieved context
llm = ChatOpenAI(model="gpt-4", temperature=0)
prompt = ChatPromptTemplate.from_messages([
("system", """You are a knowledge-based assistant. Use the provided context to answer the question.
If the context doesn't contain the information needed, acknowledge the limitations of your knowledge."""),
("user", f"""Context:
{context}
Question: {question}""")
])
response = llm.invoke(prompt)
return response.content
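A brief usage sketch follows, assuming an OPENAI_API_KEY in the environment (used by both the embeddings and the chat model); the two documents are toy examples:

documents = [
    "Retrieval-augmented generation retrieves supporting passages and conditions the model's answer on them.",
    "Vector stores index document embeddings so that semantically similar passages can be found quickly."
]
answer = rag_pattern("How does retrieval-augmented generation improve factual accuracy?", documents)
print(answer)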
The Self-Consistency pattern improves reliability by generating multiple independent solutions to the same problem and selecting the most consistent one. The agent solves the problem several times with different approaches or starting points, then compares the results to identify the most reliable answer. This pattern is particularly valuable for complex reasoning or problem-solving tasks.
def self_consistency_pattern(problem: str, num_samples: int = 5) -> str:
"""Implement the Self-Consistency pattern for a problem."""
llm = ChatOpenAI(model="gpt-4", temperature=0.7) # Higher temperature for diversity
# Generate multiple solutions
solutions = []
for i in range(num_samples):
prompt = ChatPromptTemplate.from_messages([
("system", f"""You are solving a problem. This is attempt {i+1} of {num_samples}.
Solve the problem step by step, showing your reasoning clearly.
Provide your final answer in the format: "Final Answer: [your answer]" """),
("user", problem)
])
response = llm.invoke(prompt)
solutions.append(response.content)
# Extract final answers
final_answers = []
for solution in solutions:
# Simple extraction - in a real system, use more robust parsing
if "Final Answer:" in solution:
answer = solution.split("Final Answer:")[-1].strip()
final_answers.append(answer)
    # Find the most common answer
    from collections import Counter
    if not final_answers:
        return "No final answers could be extracted from the sampled solutions."
    answer_counts = Counter(final_answers)
    most_common_answer, count = answer_counts.most_common(1)[0]
# Calculate consistency score
consistency_score = count / num_samples
return f"Most consistent answer: {most_common_answer} (Consistency: {consistency_score:.2f})"
Patterns for Multi-Agent Systems
Several design patterns have emerged for implementing effective multi-agent systems:
The Manager-Worker pattern organizes agents into a hierarchical structure where a manager agent coordinates the activities of specialized worker agents. The manager breaks down complex tasks, assigns them to appropriate workers, monitors progress, and integrates results. This pattern provides clear control flow and accountability but can create bottlenecks at the manager.
class ManagerWorkerSystem:
def __init__(self, worker_types: List[str]):
self.llm = ChatOpenAI(model="gpt-4", temperature=0)
self.worker_agents = {
worker_type: self.create_worker_agent(worker_type)
for worker_type in worker_types
}
def create_worker_agent(self, worker_type: str) -> Callable:
"""Create a worker agent with specialized capabilities."""
def worker_agent(task: str, context: Dict[str, Any]) -> Dict[str, Any]:
prompt = ChatPromptTemplate.from_messages([
("system", f"""You are a specialized {worker_type} agent.
Execute the assigned task to the best of your abilities.
Focus on your area of expertise: {worker_type}."""),
("user", f"""Task: {task}
Context: {json.dumps(context)}
Please complete this task and provide your result.""")
])
response = self.llm.invoke(prompt)
return {
"worker_type": worker_type,
"task": task,
"result": response.content
}
return worker_agent
def execute_task(self, task: str) -> Dict[str, Any]:
"""Execute a complex task using the manager-worker pattern."""
# Manager breaks down the task
manager_prompt = ChatPromptTemplate.from_messages([
("system", f"""You are a manager agent. Break down the given task into subtasks
that can be assigned to specialized worker agents.
Available worker types: {", ".join(self.worker_agents.keys())}
For each subtask, specify:
1. A clear description of what needs to be done
2. Which worker type should handle it
3. The order or dependencies between subtasks
Return your breakdown as a JSON array of subtask objects."""),
("user", task)
])
manager_response = self.llm.invoke(manager_prompt)
try:
subtasks = json.loads(manager_response.content)
        except json.JSONDecodeError:
# Fallback if JSON parsing fails
subtasks = [{"description": task, "worker_type": list(self.worker_agents.keys())[0], "order": 1}]
# Execute subtasks in order
results = {}
for subtask in sorted(subtasks, key=lambda x: x.get("order", 0)):
worker_type = subtask.get("worker_type")
if worker_type in self.worker_agents:
worker_result = self.worker_agents[worker_type](
subtask.get("description", ""),
{"original_task": task, "previous_results": results}
)
results[f"subtask_{len(results) + 1}"] = worker_result
# Manager integrates results
integration_prompt = ChatPromptTemplate.from_messages([
("system", """You are a manager agent. Integrate the results from multiple workers
into a coherent final result that addresses the original task."""),
("user", f"""Original Task: {task}
Worker Results: {json.dumps(results, indent=2)}
Please provide an integrated final result.""")
])
integration_response = self.llm.invoke(integration_prompt)
return {
"original_task": task,
"subtasks": subtasks,
"worker_results": results,
"integrated_result": integration_response.content
}
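A usage sketch for the class above; the worker type names and the task are illustrative, and an OPENAI_API_KEY is assumed to be configured:

system = ManagerWorkerSystem(worker_types=["research", "analysis", "writing"])
outcome = system.execute_task(
    "Produce a short overview of open-source vector databases for an internal memo."
)
print(outcome["integrated_result"])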
The Peer-to-Peer pattern enables direct communication between agents without central coordination. Agents discover each other's capabilities, negotiate tasks, and collaborate through direct interactions. This pattern is more flexible and resilient than hierarchical approaches but requires more sophisticated coordination mechanisms.
class PeerToPeerSystem:
def __init__(self, agent_capabilities: Dict[str, List[str]]):
self.llm = ChatOpenAI(model="gpt-4", temperature=0)
self.agents = {
agent_id: {"capabilities": capabilities}
for agent_id, capabilities in agent_capabilities.items()
}
def agent_function(self, agent_id: str, message: Dict[str, Any]) -> Dict[str, Any]:
"""Agent function that processes incoming messages and generates responses."""
capabilities = self.agents[agent_id]["capabilities"]
prompt = ChatPromptTemplate.from_messages([
("system", f"""You are agent {agent_id} with capabilities in: {', '.join(capabilities)}.
Process the incoming message and generate an appropriate response.
If the task matches your capabilities, execute it.
If not, suggest which agent might be better suited."""),
("user", f"""Incoming message:
{json.dumps(message, indent=2)}
Your capabilities: {capabilities}
Known agents: {list(self.agents.keys())}
Please process this message and respond appropriately.""")
])
response = self.llm.invoke(prompt)
return {
"agent_id": agent_id,
"in_response_to": message.get("message_id"),
"message_id": str(uuid.uuid4()),
"content": response.content,
"timestamp": datetime.datetime.now().isoformat()
}
def execute_task(self, task: str, initiating_agent: str) -> Dict[str, Any]:
"""Execute a task using peer-to-peer communication."""
# Initialize message history
message_history = []
# Create initial message
initial_message = {
"message_id": str(uuid.uuid4()),
"from_agent": "user",
"to_agent": initiating_agent,
"content": task,
"timestamp": datetime.datetime.now().isoformat()
}
message_history.append(initial_message)
# Maximum number of message exchanges to prevent infinite loops
max_exchanges = 10
current_exchanges = 0
current_message = initial_message
current_recipient = initiating_agent
while current_exchanges < max_exchanges:
# Process the current message
agent_response = self.agent_function(current_recipient, current_message)
message_history.append(agent_response)
# Determine if we need to forward to another agent
response_content = agent_response["content"].lower()
# Check if task is complete
if "task complete" in response_content or "final result" in response_content:
break
# Check if we need to forward to another agent
next_agent = None
for agent_id in self.agents:
if f"forward to {agent_id}" in response_content or f"ask {agent_id}" in response_content:
next_agent = agent_id
break
if next_agent:
# Create forwarding message
forward_message = {
"message_id": str(uuid.uuid4()),
"from_agent": current_recipient,
"to_agent": next_agent,
"content": agent_response["content"],
"original_task": task,
"timestamp": datetime.datetime.now().isoformat()
}
message_history.append(forward_message)
current_message = forward_message
current_recipient = next_agent
else:
# No forwarding needed, respond to the original requester
final_message = {
"message_id": str(uuid.uuid4()),
"from_agent": current_recipient,
"to_agent": "user",
"content": agent_response["content"],
"timestamp": datetime.datetime.now().isoformat()
}
message_history.append(final_message)
break
current_exchanges += 1
return {
"original_task": task,
"message_history": message_history,
"final_result": message_history[-1]["content"]
}
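A usage sketch, with illustrative agent identifiers and capability lists:

p2p = PeerToPeerSystem(agent_capabilities={
    "researcher": ["web research", "summarization"],
    "analyst": ["data analysis", "comparison"],
    "writer": ["drafting", "editing"]
})
outcome = p2p.execute_task(
    "Draft a one-paragraph comparison of two open-source task queue libraries.",
    initiating_agent="researcher"
)
print(outcome["final_result"])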
The Blackboard pattern uses a shared data structure (the blackboard) that all agents can read from and write to. Agents monitor the blackboard for relevant information, contribute their expertise when appropriate, and collaborate indirectly through the shared data. This pattern facilitates information sharing without requiring direct agent-to-agent communication.
class BlackboardSystem:
def __init__(self, agent_specialties: Dict[str, str]):
self.llm = ChatOpenAI(model="gpt-4", temperature=0)
self.agents = agent_specialties
self.blackboard = {
"problem": "",
"status": "unsolved",
"contributions": [],
"solution": None
}
def agent_function(self, agent_id: str) -> Dict[str, Any]:
"""Agent function that reads the blackboard and makes contributions."""
specialty = self.agents[agent_id]
prompt = ChatPromptTemplate.from_messages([
("system", f"""You are agent {agent_id}, specializing in {specialty}.
Examine the current state of the blackboard and make a contribution based on your expertise.
If you see that the problem is solved or you have nothing to contribute, indicate that."""),
("user", f"""Current Blackboard State:
Problem: {self.blackboard['problem']}
Status: {self.blackboard['status']}
Previous Contributions:
{json.dumps(self.blackboard['contributions'], indent=2)}
Based on your specialty in {specialty}, what would you contribute to help solve this problem?
If you believe the problem is solved or you have nothing to contribute, state so clearly.""")
])
response = self.llm.invoke(prompt)
contribution = {
"agent_id": agent_id,
"specialty": specialty,
"content": response.content,
"timestamp": datetime.datetime.now().isoformat()
}
return contribution
def controller_function(self) -> str:
"""Controller function that decides which agent should contribute next."""
prompt = ChatPromptTemplate.from_messages([
("system", """You are the controller of a blackboard system.
Based on the current state of the blackboard and previous contributions,
determine which agent should contribute next, or if the problem is solved."""),
("user", f"""Current Blackboard State:
Problem: {self.blackboard['problem']}
Status: {self.blackboard['status']}
Previous Contributions:
{json.dumps(self.blackboard['contributions'], indent=2)}
Available Agents:
{json.dumps(self.agents, indent=2)}
Which agent should contribute next? If you believe the problem is solved,
respond with "SOLVED" and provide a final solution.""")
])
response = self.llm.invoke(prompt)
return response.content
def solve_problem(self, problem: str, max_iterations: int = 5) -> Dict[str, Any]:
"""Solve a problem using the blackboard pattern."""
# Initialize the blackboard
self.blackboard = {
"problem": problem,
"status": "unsolved",
"contributions": [],
"solution": None
}
for iteration in range(max_iterations):
# Get the next agent to contribute
controller_decision = self.controller_function()
# Check if the problem is solved
if "SOLVED" in controller_decision:
self.blackboard["status"] = "solved"
self.blackboard["solution"] = controller_decision.replace("SOLVED", "").strip()
break
# Determine which agent should contribute next
next_agent = None
for agent_id in self.agents:
if agent_id.lower() in controller_decision.lower():
next_agent = agent_id
break
if not next_agent and self.agents:
# Default to the first agent if none is specified
next_agent = list(self.agents.keys())[0]
if next_agent:
# Get contribution from the selected agent
contribution = self.agent_function(next_agent)
self.blackboard["contributions"].append(contribution)
# Check if the agent thinks the problem is solved
if "solved" in contribution["content"].lower() or "solution" in contribution["content"].lower():
# Generate final solution
solution_prompt = ChatPromptTemplate.from_messages([
("system", """You are the final solution synthesizer.
Based on all contributions to the blackboard, provide a comprehensive solution to the problem."""),
("user", f"""Problem: {self.blackboard['problem']}
All Contributions:
{json.dumps(self.blackboard['contributions'], indent=2)}
Please provide a comprehensive final solution.""")
])
solution_response = self.llm.invoke(solution_prompt)
self.blackboard["status"] = "solved"
self.blackboard["solution"] = solution_response.content
break
return self.blackboard
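A usage sketch with illustrative specialties; note that solve_problem returns the full blackboard, including the contribution history:

blackboard = BlackboardSystem(agent_specialties={
    "agent_quant": "quantitative modeling",
    "agent_ops": "logistics operations",
    "agent_writer": "clear technical writing"
})
result = blackboard.solve_problem(
    "Propose a way to cut average delivery time by 15% without adding vehicles.",
    max_iterations=4
)
print(result["status"])
print(result["solution"])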
The Competitive Teams pattern organizes agents into teams that compete to solve the same problem. Each team approaches the problem differently, and their solutions are evaluated against predefined criteria. This pattern leverages diversity of thought and approach to find better solutions than any single team might discover.
class CompetitiveTeamsSystem:
def __init__(self, team_configurations: List[Dict[str, Any]]):
self.llm = ChatOpenAI(model="gpt-4", temperature=0)
self.teams = team_configurations
def team_function(self, team_config: Dict[str, Any], problem: str) -> Dict[str, Any]:
"""Team function that generates a solution approach based on team configuration."""
team_name = team_config["name"]
approach = team_config["approach"]
members = team_config["members"]
members_str = "\n".join([f"- {member['role']}: {member['specialty']}" for member in members])
prompt = ChatPromptTemplate.from_messages([
("system", f"""You are team {team_name}, using a {approach} approach to problem-solving.
Your team consists of:
{members_str}
Work together as a team to solve the given problem.
Show the contributions of each team member and how they build on each other's ideas."""),
("user", f"""Problem to solve: {problem}
Please provide your team's solution, showing how each member contributed.""")
])
response = self.llm.invoke(prompt)
return {
"team_name": team_name,
"approach": approach,
"solution": response.content
}
def evaluator_function(self, problem: str, solutions: List[Dict[str, Any]], criteria: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Evaluator function that assesses solutions based on predefined criteria."""
solutions_str = "\n\n".join([
f"Team: {solution['team_name']} ({solution['approach']})\n{solution['solution']}"
for solution in solutions
])
criteria_str = "\n".join([
f"- {criterion['name']} (weight: {criterion['weight']}): {criterion['description']}"
for criterion in criteria
])
prompt = ChatPromptTemplate.from_messages([
("system", """You are an impartial evaluator. Assess the solutions provided by different teams
based on the specified criteria. Provide scores and justifications for each criterion.
Determine the winning solution and explain why it's superior."""),
("user", f"""Problem: {problem}
Evaluation Criteria:
{criteria_str}
Solutions to Evaluate:
{solutions_str}
Please provide your evaluation, including scores for each team on each criterion,
and determine the overall winner.""")
])
response = self.llm.invoke(prompt)
return {
"problem": problem,
"evaluator_assessment": response.content
}
def solve_problem(self, problem: str, evaluation_criteria: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Solve a problem using competitive teams."""
# Generate solutions from each team
team_solutions = []
for team_config in self.teams:
solution = self.team_function(team_config, problem)
team_solutions.append(solution)
# Evaluate the solutions
evaluation = self.evaluator_function(problem, team_solutions, evaluation_criteria)
return {
"problem": problem,
"team_solutions": team_solutions,
"evaluation": evaluation
}
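The expected shape of the team configurations and evaluation criteria is implicit in the code above, so a usage sketch is worth spelling out; all names, weights, and the problem statement are illustrative:

teams = [
    {
        "name": "Pragmatists",
        "approach": "incremental, risk-averse",
        "members": [
            {"role": "engineer", "specialty": "backend systems"},
            {"role": "product manager", "specialty": "prioritization"}
        ]
    },
    {
        "name": "Explorers",
        "approach": "first-principles, experimental",
        "members": [
            {"role": "researcher", "specialty": "machine learning"},
            {"role": "designer", "specialty": "user experience"}
        ]
    }
]
criteria = [
    {"name": "feasibility", "weight": 0.5, "description": "Can it be built with current resources?"},
    {"name": "impact", "weight": 0.5, "description": "How much does it improve the user experience?"}
]
contest = CompetitiveTeamsSystem(teams)
outcome = contest.solve_problem("Reduce onboarding drop-off in a mobile banking app.", criteria)
print(outcome["evaluation"]["evaluator_assessment"])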
The Consensus Building pattern has agents work together to reach agreement on complex decisions. Agents share their perspectives, critique each other's views, and iteratively refine their collective understanding until they reach consensus. This pattern is valuable for decisions that benefit from diverse viewpoints and careful deliberation.
class ConsensusSystem:
def __init__(self, agent_perspectives: Dict[str, str]):
self.llm = ChatOpenAI(model="gpt-4", temperature=0)
self.agents = agent_perspectives
def agent_perspective(self, agent_id: str, issue: str, previous_perspectives: List[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Generate an agent's perspective on an issue."""
perspective_type = self.agents[agent_id]
previous = ""
if previous_perspectives:
previous = "Previous perspectives:\n" + "\n".join([
f"- {p['agent_id']} ({p['perspective_type']}): {p['summary']}"
for p in previous_perspectives
])
prompt = ChatPromptTemplate.from_messages([
("system", f"""You represent the {perspective_type} perspective on issues.
Provide your view on the given issue, emphasizing aspects that would be important
from a {perspective_type} standpoint."""),
("user", f"""Issue to consider: {issue}
{previous}
Please provide your perspective as {agent_id} ({perspective_type}).""")
])
response = self.llm.invoke(prompt)
# Extract a summary (first paragraph or first 100 characters)
content = response.content
summary = content.split("\n\n")[0] if "\n\n" in content else content[:100] + "..."
return {
"agent_id": agent_id,
"perspective_type": perspective_type,
"content": content,
"summary": summary
}
def critique_perspectives(self, issue: str, perspectives: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Generate critiques of each perspective."""
critiques = []
for perspective in perspectives:
agent_id = perspective["agent_id"]
# Combine all other perspectives
other_perspectives = "\n\n".join([
f"{p['agent_id']} ({p['perspective_type']}): {p['content']}"
for p in perspectives if p['agent_id'] != agent_id
])
prompt = ChatPromptTemplate.from_messages([
("system", """You are a critical evaluator. Review the given perspective in light of other perspectives.
Identify strengths, weaknesses, blind spots, and areas where this perspective might be enhanced
by incorporating insights from others."""),
("user", f"""Issue: {issue}
Perspective to critique:
{agent_id} ({perspective['perspective_type']}): {perspective['content']}
Other perspectives:
{other_perspectives}
Please provide a balanced critique of the {agent_id} perspective.""")
])
response = self.llm.invoke(prompt)
critiques.append({
"target_agent": agent_id,
"content": response.content
})
return critiques
def synthesize_consensus(self, issue: str, perspectives: List[Dict[str, Any]], critiques: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Synthesize a consensus view from multiple perspectives and critiques."""
perspectives_str = "\n\n".join([
f"{p['agent_id']} ({p['perspective_type']}): {p['content']}"
for p in perspectives
])
critiques_str = "\n\n".join([
f"Critique of {c['target_agent']}: {c['content']}"
for c in critiques
])
prompt = ChatPromptTemplate.from_messages([
("system", """You are a consensus builder. Synthesize multiple perspectives and their critiques
into a balanced, comprehensive consensus view that incorporates the strengths of each perspective
while addressing their weaknesses."""),
("user", f"""Issue: {issue}
Perspectives:
{perspectives_str}
Critiques:
{critiques_str}
Please synthesize a consensus view that represents the best integration of these perspectives.""")
])
response = self.llm.invoke(prompt)
return {
"issue": issue,
"consensus": response.content
}
def build_consensus(self, issue: str) -> Dict[str, Any]:
"""Build consensus on an issue through multiple rounds of perspective sharing and critique."""
# Round 1: Initial perspectives
perspectives = []
for agent_id in self.agents:
perspective = self.agent_perspective(agent_id, issue)
perspectives.append(perspective)
# Round 2: Critiques
critiques = self.critique_perspectives(issue, perspectives)
# Round 3: Refined perspectives
refined_perspectives = []
for agent_id in self.agents:
# Find critiques targeting this agent
agent_critiques = [c for c in critiques if c["target_agent"] == agent_id]
critiques_str = "\n\n".join([c["content"] for c in agent_critiques])
prompt = ChatPromptTemplate.from_messages([
("system", f"""You represent the {self.agents[agent_id]} perspective on issues.
Review the critiques of your initial perspective and provide a refined view that
addresses these critiques while maintaining your core values and insights."""),
("user", f"""Issue: {issue}
Your initial perspective:
{next(p['content'] for p in perspectives if p['agent_id'] == agent_id)}
Critiques of your perspective:
{critiques_str}
Please provide your refined perspective.""")
])
response = self.llm.invoke(prompt)
refined_perspectives.append({
"agent_id": agent_id,
"perspective_type": self.agents[agent_id],
"content": response.content,
"summary": response.content.split("\n\n")[0] if "\n\n" in response.content else response.content[:100] + "..."
})
# Final consensus
consensus = self.synthesize_consensus(issue, refined_perspectives, critiques)
return {
"issue": issue,
"initial_perspectives": perspectives,
"critiques": critiques,
"refined_perspectives": refined_perspectives,
"consensus": consensus["consensus"]
}
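A usage sketch with illustrative perspective labels:

consensus_system = ConsensusSystem(agent_perspectives={
    "agent_security": "security and risk management",
    "agent_ux": "user experience",
    "agent_cost": "cost and operations"
})
outcome = consensus_system.build_consensus(
    "Should the product require hardware security keys for all logins?"
)
print(outcome["consensus"])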
Patterns for Agentic AI
Several design patterns have emerged for implementing effective agentic AI systems:
The Goal Decomposition pattern breaks down high-level goals into hierarchical subgoals that are easier to plan for and execute. The agent maintains a goal tree, refines goals into more specific subgoals, and works on them in an appropriate order. This pattern makes complex goals more manageable and enables more effective planning.
class GoalDecompositionAgent:
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4", temperature=0)
def decompose_goal(self, goal: str) -> Dict[str, Any]:
"""Decompose a high-level goal into a hierarchical structure of subgoals."""
prompt = ChatPromptTemplate.from_messages([
("system", """You are a goal decomposition specialist. Break down high-level goals into
hierarchical structures of subgoals that are specific, measurable, and actionable.
For each subgoal, specify:
1. A clear description
2. Success criteria
3. Dependencies on other subgoals
Return your decomposition as a JSON object with:
- 'main_goal': the original goal
- 'subgoals': array of first-level subgoals
- Each subgoal has 'description', 'success_criteria', 'dependencies', and optional 'children'"""),
("user", f"Goal to decompose: {goal}")
])
response = self.llm.invoke(prompt)
try:
goal_structure = json.loads(response.content)
return goal_structure
        except json.JSONDecodeError:
# Fallback if JSON parsing fails
return {
"main_goal": goal,
"subgoals": [{"description": goal, "success_criteria": "Goal achieved", "dependencies": []}]
}
def plan_for_subgoal(self, subgoal: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
"""Create a plan to achieve a specific subgoal."""
prompt = ChatPromptTemplate.from_messages([
("system", """You are a planning specialist. Create a detailed plan to achieve the given subgoal.
Your plan should include:
1. Specific steps to take
2. Resources needed
3. Potential obstacles and how to overcome them
4. How to verify success
Consider the context and dependencies in your planning."""),
("user", f"""Subgoal: {subgoal['description']}
Success Criteria: {subgoal['success_criteria']}
Dependencies: {subgoal.get('dependencies', [])}
Context: {json.dumps(context)}
Please create a detailed plan for achieving this subgoal.""")
])
response = self.llm.invoke(prompt)
return {
"subgoal": subgoal,
"plan": response.content
}
def execute_plan(self, plan: Dict[str, Any], available_tools: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Execute a plan for a subgoal using available tools."""
subgoal = plan["subgoal"]
plan_details = plan["plan"]
tools_str = "\n".join([f"- {tool['name']}: {tool['description']}" for tool in available_tools])
prompt = ChatPromptTemplate.from_messages([
("system", """You are a plan execution specialist. Execute the given plan using available tools.
For each step in the plan:
1. Determine if tools are needed
2. Select the appropriate tool and parameters
3. Interpret the results and decide on next steps
Document your execution process and results."""),
("user", f"""Subgoal: {subgoal['description']}
Plan:
{plan_details}
Available Tools:
{tools_str}
Please execute this plan and report your results.""")
])
response = self.llm.invoke(prompt)
return {
"subgoal": subgoal,
"plan": plan_details,
"execution_results": response.content,
"status": "completed" # In a real system, this would be determined based on actual execution
}
def achieve_goal(self, goal: str, context: Dict[str, Any], available_tools: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Achieve a goal through decomposition, planning, and execution."""
# Decompose the goal
goal_structure = self.decompose_goal(goal)
# Track results for each subgoal
subgoal_results = {}
# Process subgoals in dependency order (simplified approach)
for subgoal in goal_structure.get("subgoals", []):
# Check if dependencies are met
dependencies = subgoal.get("dependencies", [])
dependencies_met = all(dep in subgoal_results and subgoal_results[dep]["status"] == "completed"
for dep in dependencies)
if dependencies_met:
# Create plan for the subgoal
plan = self.plan_for_subgoal(subgoal, {
**context,
"completed_subgoals": {
sg: subgoal_results[sg]["execution_results"]
for sg in subgoal_results
}
})
# Execute the plan
result = self.execute_plan(plan, available_tools)
# Store the result
subgoal_results[subgoal["description"]] = result
# Synthesize overall result
synthesis_prompt = ChatPromptTemplate.from_messages([
("system", """You are a goal achievement synthesizer. Create a comprehensive summary of the
goal achievement process, including the decomposition, planning, execution, and results."""),
("user", f"""Original Goal: {goal}
Goal Structure:
{json.dumps(goal_structure, indent=2)}
Subgoal Results:
{json.dumps(subgoal_results, indent=2)}
Please provide a comprehensive summary of the goal achievement process.""")
])
synthesis_response = self.llm.invoke(synthesis_prompt)
return {
"original_goal": goal,
"goal_structure": goal_structure,
"subgoal_results": subgoal_results,
"overall_summary": synthesis_response.content
}
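A usage sketch; the goal, context fields, and tool entries are illustrative placeholders:

agent = GoalDecompositionAgent()
tools = [
    {"name": "web_search", "description": "Search the web for recent information"},
    {"name": "spreadsheet", "description": "Create and edit tabular summaries"}
]
outcome = agent.achieve_goal(
    goal="Prepare a one-page competitive analysis of note-taking apps",
    context={"audience": "product team", "deadline": "end of week"},
    available_tools=tools
)
print(outcome["overall_summary"])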
The Tool Selection pattern helps agents choose the most appropriate tools for specific tasks. The agent analyzes the task requirements, evaluates available tools based on their capabilities and constraints, and selects the best tool or combination of tools. This pattern improves the effectiveness of tool use by ensuring that the right tools are used for each task.
class ToolSelectionAgent:
def __init__(self, available_tools: List[Dict[str, Any]]):
self.llm = ChatOpenAI(model="gpt-4", temperature=0)
self.available_tools = available_tools
def analyze_task(self, task: str) -> Dict[str, Any]:
"""Analyze a task to determine its requirements and constraints."""
prompt = ChatPromptTemplate.from_messages([
("system", """You are a task analysis specialist. Analyze the given task to determine:
1. Key requirements
2. Constraints or limitations
3. Types of information or capabilities needed
4. Success criteria
Return your analysis as a structured assessment."""),
("user", f"Task to analyze: {task}")
])
response = self.llm.invoke(prompt)
return {
"task": task,
"analysis": response.content
}
def evaluate_tools(self, task_analysis: Dict[str, Any]) -> Dict[str, Any]:
"""Evaluate available tools based on task requirements."""
task = task_analysis["task"]
analysis = task_analysis["analysis"]
tools_str = "\n".join([
f"- {tool['name']}: {tool['description']}\n Capabilities: {', '.join(tool.get('capabilities', []))}\n Limitations: {', '.join(tool.get('limitations', []))}"
for tool in self.available_tools
])
prompt = ChatPromptTemplate.from_messages([
("system", """You are a tool evaluation specialist. Evaluate the available tools based on the task requirements.
For each tool, assess:
1. Relevance to the task
2. Strengths for this specific task
3. Limitations or drawbacks for this task
4. Overall suitability score (0-10)
Return your evaluation as a structured assessment of each tool."""),
("user", f"""Task: {task}
Task Analysis:
{analysis}
Available Tools:
{tools_str}
Please evaluate the suitability of each tool for this task.""")
])
response = self.llm.invoke(prompt)
return {
"task": task,
"analysis": analysis,
"tool_evaluation": response.content
}
def select_tools(self, evaluation: Dict[str, Any]) -> Dict[str, Any]:
"""Select the most appropriate tool(s) based on the evaluation."""
task = evaluation["task"]
analysis = evaluation["analysis"]
tool_evaluation = evaluation["tool_evaluation"]
prompt = ChatPromptTemplate.from_messages([
("system", """You are a tool selection specialist. Based on the task requirements and tool evaluations,
select the most appropriate tool or combination of tools.
Your selection should include:
1. Primary tool(s) to use
2. Justification for your selection
3. How to use the selected tool(s) effectively
4. Any complementary tools that might be needed
Make your selection based on the best match between task requirements and tool capabilities."""),
("user", f"""Task: {task}
Task Analysis:
{analysis}
Tool Evaluation:
{tool_evaluation}
Available Tools:
{json.dumps(self.available_tools, indent=2)}
Please select the most appropriate tool(s) for this task.""")
])
response = self.llm.invoke(prompt)
return {
"task": task,
"selected_tools": response.content
}
def execute_with_tools(self, selection: Dict[str, Any]) -> Dict[str, Any]:
"""Execute the task using the selected tools."""
task = selection["task"]
selected_tools = selection["selected_tools"]
prompt = ChatPromptTemplate.from_messages([
("system", """You are a tool execution specialist. Execute the given task using the selected tools.
Document your process, including:
1. How you used each tool
2. The results obtained
3. Any adjustments made during execution
4. The final outcome
Be thorough in your execution and documentation."""),
("user", f"""Task: {task}
Selected Tools:
{selected_tools}
Please execute this task using the selected tools and report your results.""")
])
response = self.llm.invoke(prompt)
return {
"task": task,
"selected_tools": selected_tools,
"execution_results": response.content
}
def complete_task(self, task: str) -> Dict[str, Any]:
"""Complete a task through tool selection and execution."""
# Analyze the task
task_analysis = self.analyze_task(task)
# Evaluate available tools
tool_evaluation = self.evaluate_tools(task_analysis)
# Select appropriate tools
tool_selection = self.select_tools(tool_evaluation)
# Execute with selected tools
execution_results = self.execute_with_tools(tool_selection)
return {
"task": task,
"task_analysis": task_analysis["analysis"],
"tool_evaluation": tool_evaluation["tool_evaluation"],
"tool_selection": tool_selection["selected_tools"],
"execution_results": execution_results["execution_results"]
}
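Because evaluate_tools reads optional capabilities and limitations fields from each tool entry, a usage sketch helps show the expected structure; the tools and task are illustrative:

tools = [
    {
        "name": "sql_runner",
        "description": "Run read-only SQL queries against the analytics warehouse",
        "capabilities": ["aggregation", "filtering", "joins"],
        "limitations": ["no external data", "read-only access"]
    },
    {
        "name": "web_search",
        "description": "Search the public web",
        "capabilities": ["recent information", "broad coverage"],
        "limitations": ["unverified sources"]
    }
]
selector = ToolSelectionAgent(available_tools=tools)
outcome = selector.complete_task("Report last quarter's signup trend and compare it with public industry benchmarks.")
print(outcome["tool_selection"])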
The Feedback Integration pattern enables agents to learn from experience by incorporating feedback into their decision-making processes. The agent collects feedback on its actions, analyzes patterns and trends, and adjusts its strategies accordingly. This pattern improves performance over time through continuous learning and adaptation.
class FeedbackIntegrationAgent:
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4", temperature=0)
self.feedback_history = []
self.strategy_adjustments = []
def execute_task(self, task: str, current_strategy: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a task using the current strategy."""
strategy_str = json.dumps(current_strategy, indent=2)
prompt = ChatPromptTemplate.from_messages([
("system", """You are a task execution specialist. Execute the given task using the specified strategy.
Document your process, including:
1. Steps taken
2. Decisions made
3. Results obtained
Be thorough in your execution and documentation."""),
("user", f"""Task: {task}
Current Strategy:
{strategy_str}
Please execute this task and report your results.""")
])
response = self.llm.invoke(prompt)
return {
"task": task,
"strategy": current_strategy,
"execution_results": response.content,
"timestamp": datetime.datetime.now().isoformat()
}
def collect_feedback(self, execution_result: Dict[str, Any], feedback_source: str, feedback: str) -> Dict[str, Any]:
"""Collect feedback on a task execution."""
feedback_entry = {
"task": execution_result["task"],
"strategy": execution_result["strategy"],
"execution_results": execution_result["execution_results"],
"feedback_source": feedback_source,
"feedback": feedback,
"timestamp": datetime.datetime.now().isoformat()
}
self.feedback_history.append(feedback_entry)
return feedback_entry
def analyze_feedback(self) -> Dict[str, Any]:
"""Analyze collected feedback to identify patterns and trends."""
if not self.feedback_history:
return {"analysis": "No feedback available for analysis."}
feedback_str = "\n\n".join([
f"Task: {entry['task']}\nStrategy: {json.dumps(entry['strategy'])}\nFeedback ({entry['feedback_source']}): {entry['feedback']}"
for entry in self.feedback_history
])
prompt = ChatPromptTemplate.from_messages([
("system", """You are a feedback analysis specialist. Analyze the collected feedback to identify:
1. Common patterns or themes
2. Strengths in current strategies
3. Areas for improvement
4. Specific recommendations for strategy adjustments
Provide a comprehensive analysis that can guide strategy refinement."""),
("user", f"""Feedback History:
{feedback_str}
Please analyze this feedback and provide insights for strategy improvement.""")
])
response = self.llm.invoke(prompt)
analysis = {
"feedback_count": len(self.feedback_history),
"analysis": response.content,
"timestamp": datetime.datetime.now().isoformat()
}
return analysis
def adjust_strategy(self, current_strategy: Dict[str, Any], feedback_analysis: Dict[str, Any]) -> Dict[str, Any]:
"""Adjust the current strategy based on feedback analysis."""
strategy_str = json.dumps(current_strategy, indent=2)
analysis = feedback_analysis["analysis"]
prompt = ChatPromptTemplate.from_messages([
("system", """You are a strategy adjustment specialist. Refine the current strategy based on feedback analysis.
Your adjusted strategy should:
1. Address identified weaknesses
2. Build on recognized strengths
3. Incorporate specific recommendations from the analysis
4. Maintain coherence and effectiveness
Provide a clear explanation of your adjustments and their rationale."""),
("user", f"""Current Strategy:
{strategy_str}
Feedback Analysis:
{analysis}
Please provide an adjusted strategy based on this feedback.""")
])
response = self.llm.invoke(prompt)
# Try to parse the response as JSON if it appears to be in that format
adjusted_strategy = None
if response.content.strip().startswith("{") and response.content.strip().endswith("}"):
try:
adjusted_strategy = json.loads(response.content)
            except json.JSONDecodeError:
adjusted_strategy = None
if not adjusted_strategy:
# If parsing failed or response wasn't JSON, use the full text
adjusted_strategy = {
"description": "Adjusted strategy based on feedback",
"details": response.content
}
adjustment = {
"previous_strategy": current_strategy,
"feedback_analysis": feedback_analysis,
"adjusted_strategy": adjusted_strategy,
"timestamp": datetime.datetime.now().isoformat()
}
self.strategy_adjustments.append(adjustment)
return adjusted_strategy
def execute_with_learning(self, task: str, initial_strategy: Dict[str, Any], feedback_sources: List[str], iterations: int = 3) -> Dict[str, Any]:
"""Execute a task multiple times with learning from feedback."""
current_strategy = initial_strategy
execution_history = []
for i in range(iterations):
# Execute the task
execution_result = self.execute_task(task, current_strategy)
execution_history.append(execution_result)
# Collect simulated feedback
for source in feedback_sources:
feedback_prompt = ChatPromptTemplate.from_messages([
("system", f"""You are a {source} providing feedback on task execution.
Evaluate the execution results and provide constructive feedback that can help improve future performance.
Be specific about strengths, weaknesses, and suggestions for improvement."""),
("user", f"""Task: {task}
Strategy Used:
{json.dumps(current_strategy, indent=2)}
Execution Results:
{execution_result['execution_results']}
Please provide your feedback as {source}.""")
])
feedback_response = self.llm.invoke(feedback_prompt)
self.collect_feedback(execution_result, source, feedback_response.content)
# Analyze feedback and adjust strategy
if i < iterations - 1: # Don't adjust after the final iteration
feedback_analysis = self.analyze_feedback()
current_strategy = self.adjust_strategy(current_strategy, feedback_analysis)
# Final summary
summary_prompt = ChatPromptTemplate.from_messages([
("system", """You are a learning process summarizer. Create a comprehensive summary of the
task execution process across multiple iterations, highlighting how the strategy evolved
based on feedback and how performance changed as a result."""),
("user", f"""Task: {task}
Initial Strategy:
{json.dumps(initial_strategy, indent=2)}
Execution History:
{json.dumps(execution_history, indent=2)}
Strategy Adjustments:
{json.dumps(self.strategy_adjustments, indent=2)}
Please provide a comprehensive summary of the learning process.""")
])
summary_response = self.llm.invoke(summary_prompt)
return {
"task": task,
"initial_strategy": initial_strategy,
"final_strategy": current_strategy,
"execution_history": execution_history,
"feedback_history": self.feedback_history,
"strategy_adjustments": self.strategy_adjustments,
"learning_summary": summary_response.content
}
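A usage sketch; the strategy fields and feedback sources are illustrative, and two iterations keep the simulated feedback loop short:

agent = FeedbackIntegrationAgent()
initial_strategy = {
    "tone": "formal",
    "structure": "executive summary followed by detail",
    "evidence": "cite at least two sources per claim"
}
outcome = agent.execute_with_learning(
    task="Write a weekly status update for the data platform team",
    initial_strategy=initial_strategy,
    feedback_sources=["engineering manager", "stakeholder"],
    iterations=2
)
print(outcome["learning_summary"])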
The Ethical Guardrails pattern ensures that agent behavior aligns with ethical principles and user values. The agent evaluates potential actions against ethical guidelines, identifies potential harms or risks, and adjusts its behavior accordingly. This pattern helps prevent harmful outputs and ensures that the agent acts in accordance with user intentions and values.
class EthicalGuardrailsAgent:
def __init__(self, ethical_principles: List[Dict[str, str]]):
self.llm = ChatOpenAI(model="gpt-4", temperature=0)
self.ethical_principles = ethical_principles
def evaluate_task(self, task: str) -> Dict[str, Any]:
"""Evaluate a task for potential ethical concerns."""
principles_str = "\n".join([f"- {p['name']}: {p['description']}" for p in self.ethical_principles])
prompt = ChatPromptTemplate.from_messages([
("system", """You are an ethical evaluation specialist. Evaluate the given task for potential ethical concerns.
Consider:
1. Potential harms or risks
2. Alignment with ethical principles
3. Possible unintended consequences
4. Stakeholders who might be affected
Provide a thorough ethical assessment."""),
("user", f"""Task: {task}
Ethical Principles to Consider:
{principles_str}
Please evaluate this task for ethical concerns.""")
])
response = self.llm.invoke(prompt)
return {
"task": task,
"ethical_evaluation": response.content
}
def design_guardrails(self, task: str, evaluation: Dict[str, Any]) -> Dict[str, Any]:
"""Design ethical guardrails based on the evaluation."""
ethical_evaluation = evaluation["ethical_evaluation"]
prompt = ChatPromptTemplate.from_messages([
("system", """You are a guardrail design specialist. Based on the ethical evaluation,
design specific guardrails to ensure the task is executed ethically.
Your guardrails should include:
1. Specific constraints on actions or outputs
2. Monitoring mechanisms to detect potential issues
3. Intervention protocols if problems arise
4. Transparency requirements
Make your guardrails specific, actionable, and effective."""),
("user", f"""Task: {task}
Ethical Evaluation:
{ethical_evaluation}
Please design ethical guardrails for this task.""")
])
response = self.llm.invoke(prompt)
return {
"task": task,
"ethical_evaluation": ethical_evaluation,
"guardrails": response.content
}
def apply_guardrails(self, task: str, plan: str, guardrails: Dict[str, Any]) -> Dict[str, Any]:
"""Apply ethical guardrails to a task execution plan."""
guardrails_content = guardrails["guardrails"]
prompt = ChatPromptTemplate.from_messages([
("system", """You are a guardrail application specialist. Review the task execution plan
and apply the ethical guardrails to ensure ethical execution.
Your review should:
1. Identify any aspects of the plan that might violate guardrails
2. Suggest specific modifications to address concerns
3. Add monitoring and transparency elements
4. Ensure the modified plan still achieves the core objectives
Provide a revised plan that incorporates the guardrails."""),
("user", f"""Task: {task}
Original Execution Plan:
{plan}
Ethical Guardrails:
{guardrails_content}
Please review and revise this plan to incorporate the ethical guardrails.""")
])
response = self.llm.invoke(prompt)
return {
"task": task,
"original_plan": plan,
"guardrails": guardrails_content,
"revised_plan": response.content
}
def execute_with_guardrails(self, task: str) -> Dict[str, Any]:
"""Execute a task with ethical guardrails."""
# Initial plan creation
plan_prompt = ChatPromptTemplate.from_messages([
("system", """You are a planning specialist. Create a detailed plan to execute the given task.
Your plan should include specific steps, methods, and expected outcomes."""),
("user", f"Task to plan: {task}")
])
plan_response = self.llm.invoke(plan_prompt)
original_plan = plan_response.content
# Ethical evaluation
evaluation = self.evaluate_task(task)
# Design guardrails
guardrails = self.design_guardrails(task, evaluation)
# Apply guardrails to plan
revised_plan = self.apply_guardrails(task, original_plan, guardrails)
# Execute with guardrails
execution_prompt = ChatPromptTemplate.from_messages([
("system", """You are a task execution specialist with strong ethical awareness.
Execute the given task according to the ethically revised plan.
During execution:
1. Continuously monitor for ethical concerns
2. Apply the specified guardrails
3. Document any ethical considerations that arise
4. Adjust your approach if needed to maintain ethical alignment
Provide a detailed account of your execution process and results."""),
("user", f"""Task: {task}
Ethically Revised Plan:
{revised_plan['revised_plan']}
Ethical Guardrails:
{guardrails['guardrails']}
Please execute this task according to the ethically revised plan.""")
])
execution_response = self.llm.invoke(execution_prompt)
return {
"task": task,
"ethical_evaluation": evaluation["ethical_evaluation"],
"guardrails": guardrails["guardrails"],
"original_plan": original_plan,
"revised_plan": revised_plan["revised_plan"],
"execution_results": execution_response.content
}
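For orientation, here is a minimal usage sketch of the EthicalGuardrailsAgent defined above. The principle names, the sample task, and the assumption that an OpenAI API key is available in the environment are illustrative choices, not part of the original implementation.
# Minimal usage sketch (assumes OPENAI_API_KEY is set in the environment).
principles = [
    {"name": "Non-maleficence", "description": "Avoid actions that could harm users or third parties."},
    {"name": "Privacy", "description": "Do not expose or infer personal data beyond what the task requires."},
    {"name": "Transparency", "description": "Disclose assumptions, limitations, and data sources."}
]
agent = EthicalGuardrailsAgent(ethical_principles=principles)
result = agent.execute_with_guardrails(
    "Draft a customer-facing email campaign based on last quarter's purchase history."
)
print(result["guardrails"])         # the guardrails designed for this task
print(result["revised_plan"])       # the plan after guardrails were applied
print(result["execution_results"])  # the guarded execution output
Separating evaluation, guardrail design, and guardrail application into distinct calls, as the class does, makes each stage inspectable on its own before the guarded execution runs.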
The Metacognitive Monitoring pattern enables an agent to reflect on its own reasoning process and adjust course based on self-assessment. The agent tracks its progress, detects when it is stuck or making errors, and adapts its strategy accordingly, which improves robustness by allowing it to recover from failures rather than compounding them.
class MetacognitiveAgent:
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4", temperature=0)
def set_task_parameters(self, task: str) -> Dict[str, Any]:
"""Set initial parameters for task execution based on task analysis."""
prompt = ChatPromptTemplate.from_messages([
("system", """You are a task analysis specialist. Analyze the given task to determine:
1. Key success criteria
2. Potential challenges or pitfalls
3. Appropriate monitoring metrics
4. Stopping conditions
Return your analysis as a structured assessment."""),
("user", f"Task to analyze: {task}")
])
response = self.llm.invoke(prompt)
return {
"task": task,
"parameters": response.content
}
def execute_step(self, task: str, parameters: str, previous_steps: List[Dict[str, Any]], current_state: str) -> Dict[str, Any]:
"""Execute a single step with metacognitive monitoring."""
previous_steps_str = "\n\n".join([
f"Step {i+1}:\nAction: {step['action']}\nResult: {step['result']}\nReflection: {step['reflection']}"
for i, step in enumerate(previous_steps)
]) if previous_steps else "No previous steps."
prompt = ChatPromptTemplate.from_messages([
("system", """You are an agent with metacognitive monitoring capabilities.
Execute the next step toward completing the task, while monitoring your own thinking process.
Your response should include:
1. Action: What specific action you're taking for this step
2. Result: The outcome of your action
3. Reflection: Your assessment of the result, including:
- How effective was this step?
- Are you making progress toward the goal?
- Do you need to adjust your approach?
- Are there any errors or misconceptions to correct?
4. Next State: The updated state after this step
Be thorough in your metacognitive monitoring."""),
("user", f"""Task: {task}
Parameters:
{parameters}
Previous Steps:
{previous_steps_str}
Current State:
{current_state}
Please execute the next step with metacognitive monitoring.""")
])
response = self.llm.invoke(prompt)
# Parse the response to extract components (simplified parsing)
content = response.content
action = ""
result = ""
reflection = ""
next_state = current_state # Default if parsing fails
if "Action:" in content:
action_parts = content.split("Action:")[1].split("Result:")[0] if "Result:" in content else content.split("Action:")[1]
action = action_parts.strip()
if "Result:" in content:
result_parts = content.split("Result:")[1].split("Reflection:")[0] if "Reflection:" in content else content.split("Result:")[1]
result = result_parts.strip()
if "Reflection:" in content:
reflection_parts = content.split("Reflection:")[1].split("Next State:")[0] if "Next State:" in content else content.split("Reflection:")[1]
reflection = reflection_parts.strip()
if "Next State:" in content:
next_state_parts = content.split("Next State:")[1]
next_state = next_state_parts.strip()
return {
"action": action,
"result": result,
"reflection": reflection,
"next_state": next_state,
"full_response": content
}
def evaluate_progress(self, task: str, parameters: str, steps: List[Dict[str, Any]], current_state: str) -> Dict[str, Any]:
"""Evaluate overall progress and decide whether to continue, adjust, or complete."""
steps_str = "\n\n".join([
f"Step {i+1}:\nAction: {step['action']}\nResult: {step['result']}\nReflection: {step['reflection']}"
for i, step in enumerate(steps)
])
prompt = ChatPromptTemplate.from_messages([
("system", """You are a metacognitive evaluation specialist. Assess the overall progress toward the task goal.
Your evaluation should determine:
1. Current progress level (percentage toward completion)
2. Whether the current approach is effective
3. Whether adjustments are needed
4. Whether the task is complete
Provide a decision on how to proceed: 'continue' with current approach, 'adjust' the approach, or 'complete' the task."""),
("user", f"""Task: {task}
Parameters:
{parameters}
Steps Taken:
{steps_str}
Current State:
{current_state}
Please evaluate progress and provide a decision.""")
])
response = self.llm.invoke(prompt)
# Simple decision extraction (in a real system, use more robust parsing)
decision = "continue" # Default
if "adjust" in response.content.lower():
decision = "adjust"
elif "complete" in response.content.lower() and not "not complete" in response.content.lower():
decision = "complete"
return {
"evaluation": response.content,
"decision": decision
}
def adjust_approach(self, task: str, parameters: str, steps: List[Dict[str, Any]], current_state: str, evaluation: Dict[str, Any]) -> Dict[str, Any]:
"""Adjust the approach based on metacognitive evaluation."""
steps_str = "\n\n".join([
f"Step {i+1}:\nAction: {step['action']}\nResult: {step['result']}\nReflection: {step['reflection']}"
for i, step in enumerate(steps)
])
prompt = ChatPromptTemplate.from_messages([
("system", """You are an approach adjustment specialist. Based on the metacognitive evaluation,
design an adjusted approach to the task.
Your adjustment should:
1. Address identified issues or inefficiencies
2. Build on what has worked well
3. Provide clear direction for next steps
4. Include updated parameters if needed
Provide a comprehensive adjustment plan."""),
("user", f"""Task: {task}
Current Parameters:
{parameters}
Steps Taken:
{steps_str}
Current State:
{current_state}
Evaluation:
{evaluation['evaluation']}
Please provide an adjusted approach.""")
])
response = self.llm.invoke(prompt)
return {
"adjusted_approach": response.content,
"original_parameters": parameters
}
def complete_task(self, task: str, steps: List[Dict[str, Any]], current_state: str) -> Dict[str, Any]:
"""Generate final output and metacognitive summary."""
steps_str = "\n\n".join([
f"Step {i+1}:\nAction: {step['action']}\nResult: {step['result']}\nReflection: {step['reflection']}"
for i, step in enumerate(steps)
])
prompt = ChatPromptTemplate.from_messages([
("system", """You are a task completion specialist with metacognitive awareness.
Provide a final result for the task and a metacognitive summary of the process.
Your response should include:
1. Final Result: The complete answer or solution to the task
2. Metacognitive Summary: Analysis of the problem-solving process, including:
- Key insights or realizations
- Effective strategies
- Challenges and how they were overcome
- Lessons for future tasks
Be thorough and insightful in your metacognitive summary."""),
("user", f"""Task: {task}
Steps Taken:
{steps_str}
Current State:
{current_state}
Please provide the final result and metacognitive summary.""")
])
response = self.llm.invoke(prompt)
return {
"task": task,
"final_result": response.content,
"steps_taken": len(steps)
}
def execute_with_metacognition(self, task: str, max_steps: int = 10) -> Dict[str, Any]:
"""Execute a task with metacognitive monitoring."""
# Set initial parameters
parameters = self.set_task_parameters(task)
steps = []
current_state = "Initial state. No actions taken yet."
for i in range(max_steps):
# Execute a step
step_result = self.execute_step(task, parameters["parameters"], steps, current_state)
steps.append(step_result)
current_state = step_result["next_state"]
# Evaluate progress
evaluation = self.evaluate_progress(task, parameters["parameters"], steps, current_state)
# Decide how to proceed
if evaluation["decision"] == "complete":
break
elif evaluation["decision"] == "adjust":
adjustment = self.adjust_approach(task, parameters["parameters"], steps, current_state, evaluation)
parameters["parameters"] = adjustment["adjusted_approach"]
# Complete the task
completion = self.complete_task(task, steps, current_state)
return {
"task": task,
"parameters": parameters["parameters"],
"steps": steps,
"final_state": current_state,
"final_result": completion["final_result"]
}
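As a quick illustration, the MetacognitiveAgent above could be driven as follows. The example task and the max_steps value are arbitrary, and an OpenAI API key is assumed to be available in the environment.
# Minimal usage sketch (assumes OPENAI_API_KEY is set in the environment).
agent = MetacognitiveAgent()
outcome = agent.execute_with_metacognition(
    "Compare rule-based and LLM-based chatbots and recommend one for a small support team.",
    max_steps=5
)
print(f"Steps taken: {len(outcome['steps'])}")
for i, step in enumerate(outcome["steps"], start=1):
    print(f"Step {i} reflection: {step['reflection']}")
print(outcome["final_result"])
The per-step reflections are often the most useful artifact here: they show where the agent judged itself to be off track and what triggered an approach adjustment.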
Conclusion
AI agents represent a significant evolution in artificial intelligence, moving from passive, query-based systems to proactive, goal-directed entities that can take autonomous actions. This article has explored the implementation of both single and multi-agent systems using LangChain and LangGraph, along with communication protocols like Anthropic's MCP and Google's A2A Protocol.
The detailed discussion of agent architectures, implementation patterns, and practical examples demonstrates the rich design space available for creating specialized agents that solve complex problems through structured workflows, tool usage, and reasoning. Each architectural style and pattern offers unique advantages and trade-offs, making them suitable for different applications and requirements.
As the field continues to evolve, the integration of more sophisticated planning, memory, and reasoning capabilities will enable even more powerful and flexible agents. The development of standardized protocols for agent communication and interoperability will facilitate the creation of complex multi-agent systems that can collaborate effectively on challenging tasks.
While significant challenges remain in areas like safety, alignment, and control, the rapid progress in agentic AI suggests a future where AI systems can serve as increasingly capable and trusted partners in a wide range of domains. By understanding the architectural principles, implementation patterns, and communication protocols discussed in this article, developers can begin building the next generation of AI agents that combine the reasoning capabilities of large language models with the structured workflows and tool integration needed for effective real-world applications.