INTRODUCTION
Large Language Models, commonly known as LLMs, represent one of the most significant breakthroughs in artificial intelligence. These models are neural networks trained on vast amounts of text data, enabling them to understand and generate human-like text responses. Think of an LLM as an extremely well-read assistant that has absorbed millions of books, articles, and conversations, allowing it to engage in meaningful dialogue on virtually any topic.
The beauty of modern LLMs lies in their versatility. Unlike traditional software that requires explicit programming for each task, LLMs can adapt to new challenges through natural language instructions called prompts. This flexibility has opened doors to countless applications, from simple chatbots to complex reasoning systems.
In this comprehensive guide, we will explore six increasingly sophisticated implementations of LLM-based systems. We start with a basic chatbot and progressively build more advanced architectures including Retrieval-Augmented Generation (RAG), Graph-based RAG, Multi-Agent Systems, Agentic AI, and the Model Context Protocol.
Before diving into implementations, let us establish the fundamental concepts. An LLM processes text input and generates text output based on patterns learned during training. The quality of responses depends heavily on prompt engineering, the art of crafting effective instructions for the model. Context windows define how much previous conversation the model can remember, typically ranging from a few thousand to hundreds of thousands of tokens.
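A context window can be pictured as a sliding buffer of token IDs: once the conversation exceeds the limit, the oldest tokens fall off the front. A minimal sketch, with plain integers standing in for real tokenizer output:

```python
# Minimal illustration of a sliding context window over token IDs.
# Real tokenizers map text to integer IDs; plain integers stand in here.

def trim_context(token_ids, max_tokens):
    """Keep only the most recent max_tokens tokens."""
    if len(token_ids) <= max_tokens:
        return token_ids
    return token_ids[-max_tokens:]

history = list(range(1200))           # pretend 1200 tokens of conversation
trimmed = trim_context(history, 1000)
print(len(trimmed))   # 1000
print(trimmed[0])     # 200 (the oldest 200 tokens were dropped)
```

The same idea reappears later in this chapter as the chatbot's context-window management.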
Hardware considerations play a crucial role in LLM deployment. Graphics Processing Units (GPUs) dramatically accelerate inference speed compared to Central Processing Units (CPUs). NVIDIA GPUs use CUDA cores, AMD GPUs leverage ROCm technology, and Apple Silicon employs Metal Performance Shaders (MPS). When GPU resources are unavailable, CPU-only execution remains viable, albeit slower.
CHAPTER A: IMPLEMENTING A BASIC LLM CHATBOT
A basic LLM chatbot represents the simplest form of human-computer interaction using large language models. This implementation focuses on establishing a conversation loop where users input messages and receive AI-generated responses. The fundamental architecture consists of three components: input processing, model inference, and output formatting.
The rationale behind starting with a basic chatbot lies in understanding core concepts without additional complexity. This foundation enables developers to grasp essential patterns like prompt construction, response handling, and conversation state management before advancing to more sophisticated architectures.
Setting up the development environment requires several key dependencies. Python serves as our primary programming language due to its extensive machine learning ecosystem. The transformers library from Hugging Face provides pre-trained models and inference utilities. PyTorch handles the underlying neural network operations, while additional libraries manage GPU acceleration and text processing.
Installation begins with creating a virtual environment to isolate dependencies:
python -m venv llm_chatbot_env
source llm_chatbot_env/bin/activate
# On Windows: llm_chatbot_env\Scripts\activate
pip install torch transformers accelerate
The torch installation varies based on your hardware configuration. For NVIDIA GPUs with CUDA support, install the CUDA-enabled version. AMD GPU users should install the ROCm variant, while Apple Silicon users benefit from MPS optimization. CPU-only installations use the standard PyTorch distribution.
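The hardware-specific variants look roughly like the commands below. The index URLs are examples from recent releases and change over time, so verify the current ones with the selector at pytorch.org/get-started before installing:

```shell
# NVIDIA GPU (CUDA build) -- example index URL; check pytorch.org for the current one
pip install torch --index-url https://download.pytorch.org/whl/cu121

# AMD GPU (ROCm build, Linux only) -- example index URL
pip install torch --index-url https://download.pytorch.org/whl/rocm6.0

# Apple Silicon and CPU-only: the default PyPI wheel includes MPS and CPU support
pip install torch
```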
Model selection significantly impacts performance and resource requirements. Smaller models like GPT-2 or DistilGPT-2 run efficiently on modest hardware but provide limited capabilities. Medium-sized models such as Llama-2-7B offer better performance while remaining accessible on consumer hardware. Large models like GPT-3.5 or Claude require substantial computational resources and are typically reached through hosted APIs.
For local deployment, we will use Microsoft's DialoGPT, a conversational model optimized for dialogue generation. This model balances performance with resource requirements, making it suitable for learning purposes.
The core chatbot implementation begins with importing necessary libraries and initializing the model:
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

class BasicChatbot:
    def __init__(self, model_name="microsoft/DialoGPT-medium"):
        self.device = self._get_optimal_device()
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(model_name)
        self.model.to(self.device)
        self.chat_history_ids = None

    def _get_optimal_device(self):
        if torch.cuda.is_available():
            return torch.device("cuda")
        elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
            return torch.device("mps")
        else:
            return torch.device("cpu")
The device selection logic automatically detects available hardware acceleration. CUDA takes priority for NVIDIA GPUs, followed by MPS for Apple Silicon, with CPU as the fallback option. This ensures optimal performance across different hardware configurations.
Conversation handling requires maintaining context between exchanges. The chat history stores previous interactions, enabling the model to generate contextually appropriate responses:
    def generate_response(self, user_input):
        # Encode user input and append to chat history
        new_user_input_ids = self.tokenizer.encode(
            user_input + self.tokenizer.eos_token,
            return_tensors='pt'
        ).to(self.device)

        # Append to existing chat history or initialize
        if self.chat_history_ids is not None:
            bot_input_ids = torch.cat([self.chat_history_ids, new_user_input_ids], dim=-1)
        else:
            bot_input_ids = new_user_input_ids

        # Generate response with controlled parameters
        self.chat_history_ids = self.model.generate(
            bot_input_ids,
            max_length=1000,
            num_beams=5,
            no_repeat_ngram_size=3,
            do_sample=True,
            temperature=0.7,
            pad_token_id=self.tokenizer.eos_token_id
        )

        # Extract and decode the bot's response
        response = self.tokenizer.decode(
            self.chat_history_ids[:, bot_input_ids.shape[-1]:][0],
            skip_special_tokens=True
        )
        return response
The generation parameters control response quality and creativity. Temperature affects randomness, with lower values producing more focused responses and higher values increasing creativity. Beam search explores multiple response possibilities, selecting the most probable sequence. The no_repeat_ngram_size parameter prevents repetitive text generation.
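Temperature is simply a divisor applied to the logits before the softmax. A NumPy sketch with toy logits, not tied to any particular model, makes the effect visible:

```python
import numpy as np

def softmax_with_temperature(logits, temperature):
    """Scale logits by 1/temperature, then apply a numerically stable softmax."""
    scaled = np.asarray(logits, dtype=np.float64) / temperature
    exp = np.exp(scaled - scaled.max())   # subtract max for numerical stability
    return exp / exp.sum()

logits = [2.0, 1.0, 0.5]                          # toy next-token scores
cool = softmax_with_temperature(logits, 0.5)      # sharper: favors the top token
warm = softmax_with_temperature(logits, 1.5)      # flatter: more diverse sampling
print(cool.round(3))
print(warm.round(3))
```

With temperature 0.5 the top token takes most of the probability mass; at 1.5 the distribution flattens, so sampling picks lower-ranked tokens more often.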
Memory management becomes crucial for extended conversations. Long chat histories consume increasing amounts of memory and processing time. Implementing a sliding window approach maintains recent context while discarding older exchanges:
    def manage_context_window(self, max_length=800):
        if self.chat_history_ids is not None and self.chat_history_ids.shape[-1] > max_length:
            # Keep only the most recent tokens
            self.chat_history_ids = self.chat_history_ids[:, -max_length:]
Error handling ensures robust operation under various conditions. Network interruptions, memory limitations, and invalid inputs require graceful handling to maintain user experience:
    def safe_generate_response(self, user_input):
        try:
            if not user_input.strip():
                return "I didn't receive any input. Could you please say something?"
            response = self.generate_response(user_input)
            self.manage_context_window()
            if not response.strip():
                return "I'm having trouble generating a response. Could you try rephrasing?"
            return response
        except Exception as e:
            print(f"Error generating response: {e}")
            return "I encountered an error. Please try again."
The main conversation loop provides a user interface for interacting with the chatbot. This implementation includes conversation history, graceful exit handling, and user feedback:
    def run_conversation(self):
        print("Chatbot initialized. Type 'quit' to exit.")
        print("=" * 50)
        while True:
            user_input = input("You: ").strip()
            if user_input.lower() in ['quit', 'exit', 'bye']:
                print("Chatbot: Goodbye! Thanks for chatting.")
                break
            if not user_input:
                continue
            print("Chatbot: ", end="", flush=True)
            response = self.safe_generate_response(user_input)
            print(response)
            print("-" * 30)
COMPLETE RUNNING EXAMPLE FOR BASIC CHATBOT:
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
import warnings

warnings.filterwarnings("ignore")

class BasicChatbot:
    """
    A basic LLM-powered chatbot implementation using DialoGPT.

    This chatbot maintains conversation context and provides natural
    language responses using a pre-trained conversational model.
    """

    def __init__(self, model_name="microsoft/DialoGPT-medium", max_context_length=800):
        """
        Initialize the chatbot with specified model and configuration.

        Args:
            model_name (str): Hugging Face model identifier
            max_context_length (int): Maximum tokens to maintain in context
        """
        print("Initializing chatbot...")
        self.max_context_length = max_context_length
        self.device = self._get_optimal_device()
        print(f"Using device: {self.device}")

        # Load tokenizer and model
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(model_name)

        # Set pad token if not present
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        # Move model to optimal device
        self.model.to(self.device)
        self.model.eval()  # Set to evaluation mode

        # Initialize conversation state
        self.chat_history_ids = None
        self.conversation_count = 0
        print("Chatbot ready!")
    def _get_optimal_device(self):
        """
        Determine the best available device for model inference.

        Returns:
            torch.device: Optimal device (CUDA, MPS, or CPU)
        """
        if torch.cuda.is_available():
            return torch.device("cuda")
        elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
            return torch.device("mps")
        else:
            return torch.device("cpu")

    def _manage_context_window(self):
        """
        Manage conversation context to prevent memory overflow.

        Trims older conversation history while preserving recent context.
        """
        if (self.chat_history_ids is not None and
                self.chat_history_ids.shape[-1] > self.max_context_length):
            # Keep only the most recent tokens
            self.chat_history_ids = self.chat_history_ids[:, -self.max_context_length:]
    def generate_response(self, user_input):
        """
        Generate a response to user input using the LLM.

        Args:
            user_input (str): User's message

        Returns:
            str: Generated response from the chatbot
        """
        try:
            # Encode user input
            new_user_input_ids = self.tokenizer.encode(
                user_input + self.tokenizer.eos_token,
                return_tensors='pt'
            ).to(self.device)

            # Append to existing conversation or start new one
            if self.chat_history_ids is not None:
                bot_input_ids = torch.cat([self.chat_history_ids, new_user_input_ids], dim=-1)
            else:
                bot_input_ids = new_user_input_ids

            # Generate response with optimized parameters
            with torch.no_grad():
                self.chat_history_ids = self.model.generate(
                    bot_input_ids,
                    max_length=min(bot_input_ids.shape[-1] + 100, 1000),
                    num_beams=3,
                    no_repeat_ngram_size=3,
                    do_sample=True,
                    temperature=0.7,
                    top_p=0.9,
                    pad_token_id=self.tokenizer.eos_token_id,
                    early_stopping=True
                )

            # Extract bot response
            response = self.tokenizer.decode(
                self.chat_history_ids[:, bot_input_ids.shape[-1]:][0],
                skip_special_tokens=True
            )

            # Manage context window
            self._manage_context_window()
            self.conversation_count += 1
            return response.strip()
        except Exception as e:
            print(f"Error in response generation: {e}")
            return "I apologize, but I encountered an error processing your message."
    def reset_conversation(self):
        """Reset the conversation history to start fresh."""
        self.chat_history_ids = None
        self.conversation_count = 0
        print("Conversation history cleared.")

    def get_conversation_stats(self):
        """
        Get statistics about the current conversation.

        Returns:
            dict: Conversation statistics
        """
        context_length = 0
        if self.chat_history_ids is not None:
            context_length = self.chat_history_ids.shape[-1]
        return {
            'exchanges': self.conversation_count,
            'context_tokens': context_length,
            'device': str(self.device),
            'max_context': self.max_context_length
        }
    def run_interactive_session(self):
        """
        Run an interactive chat session with the user.

        Provides a command-line interface for chatting with the bot.
        """
        print("\n" + "=" * 60)
        print("BASIC LLM CHATBOT - Interactive Session")
        print("=" * 60)
        print("Commands:")
        print("  'quit' or 'exit' - End the conversation")
        print("  'reset' - Clear conversation history")
        print("  'stats' - Show conversation statistics")
        print("=" * 60)
        while True:
            try:
                user_input = input("\nYou: ").strip()

                # Handle special commands
                if user_input.lower() in ['quit', 'exit', 'bye']:
                    print("\nChatbot: Thank you for chatting! Goodbye!")
                    break
                elif user_input.lower() == 'reset':
                    self.reset_conversation()
                    continue
                elif user_input.lower() == 'stats':
                    stats = self.get_conversation_stats()
                    print("\nConversation Statistics:")
                    print(f"  Exchanges: {stats['exchanges']}")
                    print(f"  Context tokens: {stats['context_tokens']}")
                    print(f"  Device: {stats['device']}")
                    print(f"  Max context: {stats['max_context']}")
                    continue

                # Skip empty input
                if not user_input:
                    print("Please enter a message.")
                    continue

                # Generate and display response
                print("Chatbot: ", end="", flush=True)
                response = self.generate_response(user_input)
                print(response)
            except KeyboardInterrupt:
                print("\n\nChatbot: Session interrupted. Goodbye!")
                break
            except Exception as e:
                print(f"\nError: {e}")
                print("Please try again.")
def main():
    """
    Main function to demonstrate the basic chatbot functionality.
    """
    try:
        # Initialize chatbot
        chatbot = BasicChatbot()
        # Run interactive session
        chatbot.run_interactive_session()
    except Exception as e:
        print(f"Failed to initialize chatbot: {e}")
        print("Please check your installation and try again.")

if __name__ == "__main__":
    main()
This complete implementation provides a robust basic chatbot with error handling, memory management, and a simple command-line interface. The code follows clean architecture principles with clear separation of concerns and comprehensive documentation.
CHAPTER B: IMPLEMENTING RAG (RETRIEVAL-AUGMENTED GENERATION) CHATBOT
Retrieval-Augmented Generation represents a significant advancement over basic chatbots by combining the generative capabilities of LLMs with external knowledge retrieval. RAG addresses a fundamental limitation of standalone LLMs: their knowledge cutoff and inability to access real-time or domain-specific information not present in their training data.
The RAG architecture consists of two primary components working in tandem. The retrieval component searches through a knowledge base to find relevant information based on user queries. The generation component then uses this retrieved context alongside the original query to produce informed, accurate responses. This approach dramatically improves factual accuracy and enables chatbots to work with specialized knowledge domains.
The rationale for implementing RAG stems from practical limitations of basic chatbots. While LLMs possess broad knowledge, they cannot access current events, proprietary documentation, or specialized databases. RAG bridges this gap by dynamically incorporating relevant external information into the generation process, creating more knowledgeable and useful conversational agents.
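The retrieve-then-generate flow can be sketched end to end with a toy keyword retriever and a stub generator; both are hypothetical stand-ins for the embedding search and LLM call built later in this chapter:

```python
def toy_retrieve(query, docs, k=2):
    """Rank documents by the count of lowercase words shared with the query."""
    q_words = set(query.lower().split())
    scored = sorted(docs, key=lambda d: -len(q_words & set(d.lower().split())))
    return scored[:k]

def toy_generate(query, context):
    """Stub generator: a real system would send this prompt to an LLM."""
    prompt = f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"
    return prompt  # returned so the prompt structure is visible

docs = [
    "The warranty covers parts for two years.",
    "Shipping takes five business days.",
    "Returns are accepted within thirty days.",
]
hits = toy_retrieve("how long is the warranty", docs, k=1)
print(toy_generate("how long is the warranty", "\n".join(hits)))
```

The real pipeline replaces word overlap with embedding similarity and the stub with a model call, but the shape of the flow, retrieve then augment then generate, is exactly this.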
Setting up a RAG system requires additional dependencies beyond basic chatbot requirements. Vector databases store and search through document embeddings, while embedding models convert text into numerical representations suitable for similarity search. Popular choices include FAISS for vector operations, sentence-transformers for embedding generation, and various document processing libraries.
Installation extends our previous environment with RAG-specific packages:
pip install faiss-cpu sentence-transformers langchain pypdf python-docx
The choice between faiss-cpu and faiss-gpu depends on available hardware. GPU-accelerated FAISS significantly improves search performance for large knowledge bases, while CPU versions remain suitable for smaller datasets or development purposes.
Document processing forms the foundation of any RAG system. The knowledge base requires preprocessing to extract text, chunk documents into manageable segments, and generate embeddings for similarity search. Different document types require specialized handling approaches.
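A common pattern for mixed document types is to dispatch on file extension. The sketch below handles only plain text and marks PDF and Word handling (via pypdf and python-docx) as placeholders; the helper name is illustrative, not from any library:

```python
import os
import tempfile

def load_document(path):
    """Dispatch on file extension; only .txt is implemented in this sketch.
    A fuller version would route .pdf to pypdf and .docx to python-docx."""
    ext = os.path.splitext(path)[1].lower()
    if ext == ".txt":
        with open(path, encoding="utf-8") as f:
            return f.read()
    raise ValueError(f"No handler for {ext} files")

# Demonstrate with a throwaway text file
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False, encoding="utf-8") as f:
    f.write("Sample knowledge base entry.")
    path = f.name
print(load_document(path))   # Sample knowledge base entry.
os.remove(path)
```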
Text chunking strategy significantly impacts retrieval quality. Chunks must be large enough to contain meaningful context but small enough to avoid diluting relevant information. Overlapping chunks ensure important information spanning chunk boundaries remains accessible:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import pickle
import os

class DocumentProcessor:
    def __init__(self, chunk_size=500, chunk_overlap=50):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", " ", ""]
        )

    def process_text(self, text, source="unknown"):
        chunks = self.text_splitter.split_text(text)
        return [{"content": chunk, "source": source, "chunk_id": i}
                for i, chunk in enumerate(chunks)]
The RecursiveCharacterTextSplitter intelligently splits text at natural boundaries like paragraphs and sentences, preserving semantic coherence within chunks. The separator hierarchy ensures optimal splitting points while maintaining readability.
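The effect of overlap is easiest to see with a bare-bones character chunker, a deliberately simplified stand-in for RecursiveCharacterTextSplitter (which additionally prefers paragraph and sentence boundaries):

```python
def chunk_with_overlap(text, chunk_size=20, overlap=5):
    """Fixed-size character chunks; each chunk repeats the last
    `overlap` characters of the previous one."""
    chunks = []
    step = chunk_size - overlap
    for start in range(0, len(text), step):
        chunk = text[start:start + chunk_size]
        if chunk:
            chunks.append(chunk)
    return chunks

text = "abcdefghijklmnopqrstuvwxyz"
for c in chunk_with_overlap(text, chunk_size=10, overlap=3):
    print(c)
```

Each chunk starts with the final three characters of its predecessor, so a fact straddling a chunk boundary still appears intact in at least one chunk.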
Embedding generation converts text chunks into vector representations suitable for similarity search. Modern embedding models like sentence-transformers provide high-quality representations that capture semantic meaning beyond simple keyword matching:
class EmbeddingManager:
    def __init__(self, model_name="all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)
        self.dimension = self.model.get_sentence_embedding_dimension()

    def generate_embeddings(self, texts):
        embeddings = self.model.encode(texts, show_progress_bar=True)
        return embeddings.astype('float32')

    def encode_query(self, query):
        return self.model.encode([query]).astype('float32')
The all-MiniLM-L6-v2 model provides an excellent balance between quality and performance for most applications. Larger models like all-mpnet-base-v2 offer improved accuracy at the cost of increased computational requirements.
Vector database implementation enables efficient similarity search across large document collections. FAISS provides optimized algorithms for nearest neighbor search, supporting both exact and approximate search methods:
class VectorDatabase:
    def __init__(self, dimension):
        self.dimension = dimension
        self.index = faiss.IndexFlatIP(dimension)  # Inner product for cosine similarity
        self.documents = []
        self.embeddings = None

    def add_documents(self, documents, embeddings):
        # Normalize embeddings for cosine similarity
        faiss.normalize_L2(embeddings)
        self.index.add(embeddings)
        self.documents.extend(documents)
        if self.embeddings is None:
            self.embeddings = embeddings
        else:
            self.embeddings = np.vstack([self.embeddings, embeddings])

    def search(self, query_embedding, k=5):
        faiss.normalize_L2(query_embedding)
        scores, indices = self.index.search(query_embedding, k)
        results = []
        for score, idx in zip(scores[0], indices[0]):
            # FAISS pads with -1 when fewer than k vectors exist, so check idx >= 0
            if 0 <= idx < len(self.documents):
                results.append({
                    "document": self.documents[idx],
                    "score": float(score),
                    "embedding": self.embeddings[idx]
                })
        return results
The IndexFlatIP provides exact search results using inner product similarity, which approximates cosine similarity when embeddings are normalized. For larger datasets, approximate indices like IndexIVFFlat offer faster search at the cost of slight accuracy reduction.
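That the inner product of normalized vectors equals cosine similarity can be checked directly with NumPy, independent of FAISS (the 384 dimension mirrors all-MiniLM-L6-v2, but any size works):

```python
import numpy as np

rng = np.random.default_rng(0)
a = rng.normal(size=384).astype("float32")
b = rng.normal(size=384).astype("float32")

# Cosine similarity computed directly
cosine = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

# L2-normalize first (what faiss.normalize_L2 does in place), then inner product
a_unit = a / np.linalg.norm(a)
b_unit = b / np.linalg.norm(b)
inner = np.dot(a_unit, b_unit)

print(abs(cosine - inner) < 1e-5)   # True: identical up to float rounding
```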
Knowledge base construction combines document processing, embedding generation, and vector database operations into a cohesive workflow. This process typically runs offline to prepare the retrieval system:
class KnowledgeBase:
    def __init__(self, embedding_model="all-MiniLM-L6-v2"):
        self.processor = DocumentProcessor()
        self.embedding_manager = EmbeddingManager(embedding_model)
        self.vector_db = VectorDatabase(self.embedding_manager.dimension)
        self.is_built = False

    def add_text_documents(self, texts, sources=None):
        if sources is None:
            sources = [f"document_{i}" for i in range(len(texts))]
        all_chunks = []
        for text, source in zip(texts, sources):
            chunks = self.processor.process_text(text, source)
            all_chunks.extend(chunks)
        if all_chunks:
            chunk_texts = [chunk["content"] for chunk in all_chunks]
            embeddings = self.embedding_manager.generate_embeddings(chunk_texts)
            self.vector_db.add_documents(all_chunks, embeddings)
            self.is_built = True

    def search_relevant_context(self, query, k=3):
        if not self.is_built:
            return []
        query_embedding = self.embedding_manager.encode_query(query)
        results = self.vector_db.search(query_embedding, k)
        return [result["document"]["content"] for result in results]
The RAG chatbot integrates retrieval and generation components to produce contextually informed responses. The retrieval step finds relevant documents, while the generation step incorporates this context into the LLM prompt:
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

class RAGChatbot:
    def __init__(self, llm_model="microsoft/DialoGPT-medium", embedding_model="all-MiniLM-L6-v2"):
        # Initialize LLM components
        self.device = self._get_optimal_device()
        self.tokenizer = AutoTokenizer.from_pretrained(llm_model)
        self.model = AutoModelForCausalLM.from_pretrained(llm_model)
        self.model.to(self.device)
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        # Initialize RAG components
        self.knowledge_base = KnowledgeBase(embedding_model)
        self.chat_history = []
        self.max_context_length = 800

    def _get_optimal_device(self):
        if torch.cuda.is_available():
            return torch.device("cuda")
        elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
            return torch.device("mps")
        else:
            return torch.device("cpu")

    def build_knowledge_base(self, documents, sources=None):
        print("Building knowledge base...")
        self.knowledge_base.add_text_documents(documents, sources)
        print("Knowledge base ready!")

    def _construct_rag_prompt(self, user_input, retrieved_context):
        context_text = "\n".join(retrieved_context)
        prompt = f"""Based on the following context information, please provide a helpful response to the user's question.

Context:
{context_text}

User Question: {user_input}

Response:"""
        return prompt

    def generate_response(self, user_input, use_rag=True):
        try:
            if use_rag and self.knowledge_base.is_built:
                # Retrieve relevant context
                retrieved_context = self.knowledge_base.search_relevant_context(user_input, k=3)
                if retrieved_context:
                    # Construct RAG prompt
                    prompt = self._construct_rag_prompt(user_input, retrieved_context)
                else:
                    prompt = user_input
            else:
                prompt = user_input

            # Tokenize, truncating overly long prompts to the model's limit
            inputs = self.tokenizer.encode(prompt, return_tensors='pt', truncation=True, max_length=512)
            inputs = inputs.to(self.device)
            with torch.no_grad():
                outputs = self.model.generate(
                    inputs,
                    max_length=inputs.shape[1] + 150,
                    num_beams=3,
                    no_repeat_ngram_size=3,
                    do_sample=True,
                    temperature=0.7,
                    top_p=0.9,
                    pad_token_id=self.tokenizer.eos_token_id,
                    early_stopping=True
                )
            response = self.tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True)
            return response.strip()
        except Exception as e:
            print(f"Error generating response: {e}")
            return "I apologize, but I encountered an error processing your request."
COMPLETE RUNNING EXAMPLE FOR RAG CHATBOT:
import os
import pickle
import numpy as np
import torch
import faiss
from sentence_transformers import SentenceTransformer
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from langchain.text_splitter import RecursiveCharacterTextSplitter
import warnings
warnings.filterwarnings("ignore")
class DocumentProcessor:
    """
    Handles document processing and text chunking for RAG systems.

    This class splits documents into manageable chunks while preserving
    semantic coherence and maintaining source attribution.
    """

    def __init__(self, chunk_size=500, chunk_overlap=50):
        """
        Initialize document processor with chunking parameters.

        Args:
            chunk_size (int): Maximum characters per chunk
            chunk_overlap (int): Overlap between consecutive chunks
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", ". ", " ", ""]
        )

    def process_text(self, text, source="unknown"):
        """
        Process text into chunks with metadata.

        Args:
            text (str): Input text to process
            source (str): Source identifier for the text

        Returns:
            list: List of chunk dictionaries with content and metadata
        """
        if not text or not text.strip():
            return []
        chunks = self.text_splitter.split_text(text.strip())
        return [
            {
                "content": chunk.strip(),
                "source": source,
                "chunk_id": i,
                "length": len(chunk)
            }
            for i, chunk in enumerate(chunks) if chunk.strip()
        ]

    def process_multiple_documents(self, documents):
        """
        Process multiple documents with automatic source naming.

        Args:
            documents (list): List of text documents or (text, source) tuples

        Returns:
            list: Combined list of all chunks from all documents
        """
        all_chunks = []
        for i, doc in enumerate(documents):
            if isinstance(doc, tuple):
                text, source = doc
            else:
                text = doc
                source = f"document_{i}"
            chunks = self.process_text(text, source)
            all_chunks.extend(chunks)
        return all_chunks
class EmbeddingManager:
    """
    Manages text embedding generation using sentence transformers.

    This class handles the conversion of text into vector representations
    suitable for similarity search and retrieval operations.
    """

    def __init__(self, model_name="all-MiniLM-L6-v2"):
        """
        Initialize embedding manager with specified model.

        Args:
            model_name (str): Hugging Face model identifier for embeddings
        """
        print(f"Loading embedding model: {model_name}")
        self.model = SentenceTransformer(model_name)
        self.dimension = self.model.get_sentence_embedding_dimension()
        self.model_name = model_name
        print(f"Embedding dimension: {self.dimension}")

    def generate_embeddings(self, texts, batch_size=32):
        """
        Generate embeddings for a list of texts.

        Args:
            texts (list): List of text strings to embed
            batch_size (int): Batch size for processing

        Returns:
            numpy.ndarray: Array of embeddings
        """
        if not texts:
            return np.array([]).reshape(0, self.dimension)
        embeddings = self.model.encode(
            texts,
            batch_size=batch_size,
            show_progress_bar=len(texts) > 10,
            convert_to_numpy=True
        )
        return embeddings.astype('float32')

    def encode_query(self, query):
        """
        Encode a single query string.

        Args:
            query (str): Query text to encode

        Returns:
            numpy.ndarray: Query embedding
        """
        return self.model.encode([query], convert_to_numpy=True).astype('float32')
class VectorDatabase:
    """
    Vector database for efficient similarity search using FAISS.

    This class provides storage and retrieval capabilities for document
    embeddings with support for various similarity metrics.
    """

    def __init__(self, dimension, metric="cosine"):
        """
        Initialize vector database with specified parameters.

        Args:
            dimension (int): Embedding dimension
            metric (str): Similarity metric ("cosine" or "euclidean")
        """
        self.dimension = dimension
        self.metric = metric
        # Choose appropriate FAISS index based on metric
        if metric == "cosine":
            self.index = faiss.IndexFlatIP(dimension)  # Inner product for cosine similarity
        else:
            self.index = faiss.IndexFlatL2(dimension)  # L2 distance for euclidean
        self.documents = []
        self.embeddings = None
        self.total_documents = 0

    def add_documents(self, documents, embeddings):
        """
        Add documents and their embeddings to the database.

        Args:
            documents (list): List of document dictionaries
            embeddings (numpy.ndarray): Corresponding embeddings
        """
        if len(documents) != len(embeddings):
            raise ValueError("Number of documents must match number of embeddings")
        # Normalize embeddings for cosine similarity
        if self.metric == "cosine":
            faiss.normalize_L2(embeddings)
        # Add to FAISS index
        self.index.add(embeddings)
        # Store documents and embeddings
        self.documents.extend(documents)
        if self.embeddings is None:
            self.embeddings = embeddings.copy()
        else:
            self.embeddings = np.vstack([self.embeddings, embeddings])
        self.total_documents += len(documents)
        print(f"Added {len(documents)} documents. Total: {self.total_documents}")
    def search(self, query_embedding, k=5, score_threshold=0.0):
        """
        Search for similar documents.

        Args:
            query_embedding (numpy.ndarray): Query embedding
            k (int): Number of results to return
            score_threshold (float): Minimum similarity score

        Returns:
            list: List of search results with documents and scores
        """
        if self.total_documents == 0:
            return []
        # Normalize query for cosine similarity
        if self.metric == "cosine":
            faiss.normalize_L2(query_embedding)
        # Perform search
        scores, indices = self.index.search(query_embedding, min(k, self.total_documents))
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if 0 <= idx < len(self.documents) and score >= score_threshold:
                results.append({
                    "document": self.documents[idx],
                    "score": float(score),
                    "index": int(idx)
                })
        return results
    def save(self, filepath):
        """Save the vector database to disk."""
        data = {
            'documents': self.documents,
            'embeddings': self.embeddings,
            'dimension': self.dimension,
            'metric': self.metric,
            'total_documents': self.total_documents
        }
        with open(filepath, 'wb') as f:
            pickle.dump(data, f)
        # Save FAISS index separately
        faiss.write_index(self.index, filepath + '.faiss')

    def load(self, filepath):
        """Load the vector database from disk."""
        with open(filepath, 'rb') as f:
            data = pickle.load(f)
        self.documents = data['documents']
        self.embeddings = data['embeddings']
        self.dimension = data['dimension']
        self.metric = data['metric']
        self.total_documents = data['total_documents']
        # Load FAISS index
        self.index = faiss.read_index(filepath + '.faiss')
class KnowledgeBase:
    """
    Complete knowledge base implementation for RAG systems.

    This class combines document processing, embedding generation,
    and vector search into a unified interface.
    """

    def __init__(self, embedding_model="all-MiniLM-L6-v2", chunk_size=500):
        """
        Initialize knowledge base with specified parameters.

        Args:
            embedding_model (str): Model name for embeddings
            chunk_size (int): Size of text chunks
        """
        self.processor = DocumentProcessor(chunk_size=chunk_size)
        self.embedding_manager = EmbeddingManager(embedding_model)
        self.vector_db = VectorDatabase(self.embedding_manager.dimension)
        self.is_built = False
        self.document_count = 0

    def add_documents(self, documents, sources=None):
        """
        Add documents to the knowledge base.

        Args:
            documents (list): List of text documents
            sources (list): Optional list of source identifiers
        """
        if not documents:
            print("No documents provided")
            return
        print(f"Processing {len(documents)} documents...")
        # Prepare documents with sources
        doc_list = []
        for i, doc in enumerate(documents):
            if sources and i < len(sources):
                source = sources[i]
            else:
                source = f"document_{self.document_count + i}"
            doc_list.append((doc, source))
        # Process documents into chunks
        all_chunks = self.processor.process_multiple_documents(doc_list)
        if not all_chunks:
            print("No valid chunks generated from documents")
            return
        print(f"Generated {len(all_chunks)} chunks")
        # Generate embeddings
        chunk_texts = [chunk["content"] for chunk in all_chunks]
        embeddings = self.embedding_manager.generate_embeddings(chunk_texts)
        # Add to vector database
        self.vector_db.add_documents(all_chunks, embeddings)
        self.document_count += len(documents)
        self.is_built = True
        print("Documents added successfully!")
    def search_relevant_context(self, query, k=3, score_threshold=0.1):
        """
        Search for relevant context given a query.

        Args:
            query (str): Search query
            k (int): Number of results to return
            score_threshold (float): Minimum similarity score

        Returns:
            list: List of relevant text chunks
        """
        if not self.is_built:
            return []
        query_embedding = self.embedding_manager.encode_query(query)
        results = self.vector_db.search(query_embedding, k, score_threshold)
        return [
            {
                "content": result["document"]["content"],
                "source": result["document"]["source"],
                "score": result["score"]
            }
            for result in results
        ]

    def get_stats(self):
        """Get knowledge base statistics."""
        return {
            "total_documents": self.document_count,
            "total_chunks": self.vector_db.total_documents,
            "embedding_dimension": self.embedding_manager.dimension,
            "is_built": self.is_built
        }

    def save(self, filepath):
        """Save knowledge base to disk."""
        if self.is_built:
            self.vector_db.save(filepath)
            print(f"Knowledge base saved to {filepath}")
        else:
            print("Knowledge base not built yet")

    def load(self, filepath):
        """Load knowledge base from disk."""
        try:
            self.vector_db.load(filepath)
            self.is_built = True
            print(f"Knowledge base loaded from {filepath}")
        except Exception as e:
            print(f"Failed to load knowledge base: {e}")
class RAGChatbot:
"""
Complete RAG chatbot implementation combining retrieval and generation.
This chatbot uses retrieval-augmented generation to provide informed
responses based on a knowledge base of documents.
"""
def __init__(self, llm_model="microsoft/DialoGPT-medium", embedding_model="all-MiniLM-L6-v2"):
"""
Initialize RAG chatbot with specified models.
Args:
llm_model (str): Language model for generation
embedding_model (str): Model for embeddings
"""
print("Initializing RAG Chatbot...")
# Initialize device
self.device = self._get_optimal_device()
print(f"Using device: {self.device}")
# Initialize LLM components
print(f"Loading language model: {llm_model}")
self.tokenizer = AutoTokenizer.from_pretrained(llm_model)
self.model = AutoModelForCausalLM.from_pretrained(llm_model)
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
self.model.to(self.device)
self.model.eval()
# Initialize knowledge base
self.knowledge_base = KnowledgeBase(embedding_model)
# Conversation state
self.conversation_history = []
self.max_context_length = 800
print("RAG Chatbot ready!")
def _get_optimal_device(self):
"""Determine optimal device for inference."""
if torch.cuda.is_available():
return torch.device("cuda")
elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
return torch.device("mps")
else:
return torch.device("cpu")
def build_knowledge_base(self, documents, sources=None):
"""
Build the knowledge base from documents.
Args:
documents (list): List of text documents
sources (list): Optional source identifiers
"""
self.knowledge_base.add_documents(documents, sources)
def _construct_rag_prompt(self, user_input, retrieved_context):
"""
Construct a prompt incorporating retrieved context.
Args:
user_input (str): User's question
retrieved_context (list): Retrieved context chunks
Returns:
str: Formatted prompt for the LLM
"""
if not retrieved_context:
return user_input
context_text = "\n\n".join([
f"Source: {ctx['source']}\nContent: {ctx['content']}"
for ctx in retrieved_context
])
prompt = f"""Context Information:
{context_text}
Based on the above context, please answer the following question. If the context doesn't contain relevant information, please say so.
Question: {user_input}
Answer:"""
return prompt
def generate_response(self, user_input, use_rag=True, k=3):
"""
Generate response using RAG or fallback to basic generation.
Args:
user_input (str): User's input
use_rag (bool): Whether to use RAG
k (int): Number of context chunks to retrieve
Returns:
dict: Response with content and metadata
"""
try:
retrieved_context = []
if use_rag and self.knowledge_base.is_built:
# Retrieve relevant context
context_results = self.knowledge_base.search_relevant_context(user_input, k)
if context_results:
retrieved_context = context_results
prompt = self._construct_rag_prompt(user_input, retrieved_context)
else:
prompt = user_input
else:
prompt = user_input
# Generate response
inputs = self.tokenizer.encode(
prompt,
return_tensors='pt',
truncation=True,
max_length=512
)
inputs = inputs.to(self.device)
with torch.no_grad():
outputs = self.model.generate(
inputs,
max_length=inputs.shape[1] + 150,
num_beams=3,
no_repeat_ngram_size=3,
do_sample=True,
temperature=0.7,
top_p=0.9,
pad_token_id=self.tokenizer.eos_token_id,
early_stopping=True
)
response = self.tokenizer.decode(
outputs[0][inputs.shape[1]:],
skip_special_tokens=True
).strip()
# Store conversation
self.conversation_history.append({
"user": user_input,
"assistant": response,
"used_rag": use_rag and bool(retrieved_context),
"context_sources": [ctx["source"] for ctx in retrieved_context] if retrieved_context else []
})
return {
"response": response,
"used_rag": use_rag and bool(retrieved_context),
"retrieved_context": retrieved_context,
"sources": list(set([ctx["source"] for ctx in retrieved_context])) if retrieved_context else []
}
except Exception as e:
print(f"Error generating response: {e}")
return {
"response": "I apologize, but I encountered an error processing your request.",
"used_rag": False,
"retrieved_context": [],
"sources": []
}
def get_conversation_stats(self):
"""Get conversation statistics."""
kb_stats = self.knowledge_base.get_stats()
rag_usage = sum(1 for conv in self.conversation_history if conv.get("used_rag", False))
return {
"total_exchanges": len(self.conversation_history),
"rag_responses": rag_usage,
"knowledge_base": kb_stats,
"device": str(self.device)
}
def reset_conversation(self):
"""Reset conversation history."""
self.conversation_history = []
print("Conversation history cleared.")
def run_interactive_session(self):
"""Run interactive chat session."""
print("\n" + "=" * 70)
print("RAG CHATBOT - Interactive Session")
print("=" * 70)
print("Commands:")
print(" 'quit' or 'exit' - End conversation")
print(" 'reset' - Clear conversation history")
print(" 'stats' - Show conversation statistics")
print(" 'toggle_rag' - Toggle RAG on/off")
print(" 'sources' - Show sources from last response")
print("=" * 70)
use_rag = True
last_response_data = None
while True:
try:
user_input = input(f"\nYou: ").strip()
if user_input.lower() in ['quit', 'exit', 'bye']:
print("\nChatbot: Thank you for using RAG Chatbot! Goodbye!")
break
elif user_input.lower() == 'reset':
self.reset_conversation()
continue
elif user_input.lower() == 'stats':
stats = self.get_conversation_stats()
print(f"\nConversation Statistics:")
print(f" Total exchanges: {stats['total_exchanges']}")
print(f" RAG responses: {stats['rag_responses']}")
print(f" Knowledge base documents: {stats['knowledge_base']['total_documents']}")
print(f" Knowledge base chunks: {stats['knowledge_base']['total_chunks']}")
print(f" Device: {stats['device']}")
continue
elif user_input.lower() == 'toggle_rag':
use_rag = not use_rag
status = "enabled" if use_rag else "disabled"
print(f"\nRAG {status}")
continue
elif user_input.lower() == 'sources':
if last_response_data and last_response_data.get("sources"):
print(f"\nSources from last response:")
for source in last_response_data["sources"]:
print(f" - {source}")
else:
print("\nNo sources available from last response")
continue
if not user_input:
continue
# Generate response
rag_indicator = "[RAG]" if use_rag and self.knowledge_base.is_built else "[Direct]"
print(f"Chatbot {rag_indicator}: ", end="", flush=True)
last_response_data = self.generate_response(user_input, use_rag)
print(last_response_data["response"])
# Show sources if RAG was used
if last_response_data["used_rag"] and last_response_data["sources"]:
print(f"Sources: {', '.join(last_response_data['sources'])}")
except KeyboardInterrupt:
print("\n\nSession interrupted. Goodbye!")
break
except Exception as e:
print(f"\nError: {e}")
def create_sample_knowledge_base():
"""Create a sample knowledge base for demonstration."""
sample_documents = [
"""
Artificial Intelligence (AI) is a branch of computer science that aims to create
intelligent machines that work and react like humans. AI systems can perform tasks
that typically require human intelligence, such as visual perception, speech
recognition, decision-making, and language translation. Machine learning is a
subset of AI that enables computers to learn and improve from experience without
being explicitly programmed.
""",
"""
Machine Learning (ML) is a method of data analysis that automates analytical model
building. It is based on the idea that systems can learn from data, identify patterns,
and make decisions with minimal human intervention. There are three main types of
machine learning: supervised learning, unsupervised learning, and reinforcement learning.
Supervised learning uses labeled training data, unsupervised learning finds hidden
patterns in data without labels, and reinforcement learning learns through interaction
with an environment.
""",
"""
Deep Learning is a subset of machine learning that uses artificial neural networks
with multiple layers (hence "deep") to model and understand complex patterns in data.
These neural networks are inspired by the structure and function of the human brain.
Deep learning has been particularly successful in areas such as image recognition,
natural language processing, and speech recognition. Popular deep learning frameworks
include TensorFlow, PyTorch, and Keras.
""",
"""
Natural Language Processing (NLP) is a field of AI that focuses on the interaction
between computers and human language. NLP combines computational linguistics with
statistical, machine learning, and deep learning models to enable computers to process
and analyze large amounts of natural language data. Applications of NLP include
language translation, sentiment analysis, chatbots, and text summarization.
""",
"""
Computer Vision is a field of AI that trains computers to interpret and understand
the visual world. Using digital images from cameras and videos and deep learning
models, machines can accurately identify and classify objects and then react to
what they "see." Computer vision applications include facial recognition, medical
image analysis, autonomous vehicles, and quality control in manufacturing.
"""
]
sources = [
"AI_Overview",
"Machine_Learning_Guide",
"Deep_Learning_Fundamentals",
"NLP_Introduction",
"Computer_Vision_Basics"
]
return sample_documents, sources
def main():
"""
Main function demonstrating RAG chatbot functionality.
"""
try:
# Create RAG chatbot
chatbot = RAGChatbot()
# Create sample knowledge base
print("\nCreating sample knowledge base...")
documents, sources = create_sample_knowledge_base()
chatbot.build_knowledge_base(documents, sources)
# Show knowledge base stats
stats = chatbot.knowledge_base.get_stats()
print(f"Knowledge base created with {stats['total_documents']} documents and {stats['total_chunks']} chunks")
# Run interactive session
chatbot.run_interactive_session()
except Exception as e:
print(f"Failed to initialize RAG chatbot: {e}")
print("Please check your installation and try again.")
if __name__ == "__main__":
main()
This RAG implementation provides complete, runnable functionality with error handling, device-aware inference, and an interactive command-line interface. The system combines document processing, embedding generation, vector search, and language model generation into a cohesive conversational AI system.
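To see the core prompt-assembly step in isolation, here is a minimal, self-contained sketch of the Context/Question/Answer format used by the chatbot's prompt construction. The `build_rag_prompt` helper and its sample chunk are illustrative only, not part of the implementation above:

```python
def build_rag_prompt(question, chunks):
    """Assemble a grounded prompt from retrieved chunks.
    Mirrors the Context/Question/Answer layout used above."""
    if not chunks:
        # No retrieved context: fall back to the raw question
        return question
    context = "\n\n".join(
        f"Source: {c['source']}\nContent: {c['content']}" for c in chunks
    )
    return (
        f"Context Information:\n{context}\n\n"
        "Based on the above context, please answer the following question. "
        "If the context doesn't contain relevant information, please say so.\n"
        f"Question: {question}\nAnswer:"
    )

# Hypothetical retrieved chunk for demonstration
chunks = [{"source": "AI_Overview", "content": "AI is a branch of computer science."}]
prompt = build_rag_prompt("What is AI?", chunks)
```

With no chunks, the function degrades gracefully to plain generation, exactly as the full chatbot does when the knowledge base is empty.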
CHAPTER C: IMPLEMENTING GRAPHRAG CHATBOT
GraphRAG represents an advanced evolution of traditional RAG systems by incorporating graph-based knowledge representation and reasoning capabilities. While standard RAG retrieves relevant documents based on semantic similarity, GraphRAG leverages the structural relationships between entities, concepts, and facts to provide more comprehensive and contextually aware responses.
The fundamental innovation of GraphRAG lies in its ability to understand and utilize the interconnected nature of knowledge. Traditional RAG systems treat documents as isolated chunks of information, potentially missing important relationships that span multiple documents or require multi-hop reasoning. GraphRAG addresses this limitation by constructing knowledge graphs that explicitly model entities, relationships, and hierarchical structures within the knowledge base.
The rationale for implementing GraphRAG stems from real-world information needs that require understanding complex relationships and dependencies. Consider queries about organizational structures, causal relationships, or multi-step processes. GraphRAG excels in these scenarios by traversing graph structures to gather comprehensive context that includes not just directly relevant information, but also related entities and their interconnections.
GraphRAG architecture combines several sophisticated components working in harmony. Entity extraction identifies and normalizes entities within documents. Relationship extraction discovers connections between entities. Graph construction builds a structured representation of knowledge. Graph traversal algorithms explore relevant subgraphs based on queries. Finally, context aggregation combines graph-derived information with traditional retrieval methods for response generation.
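The five components described above can be sketched end to end with plain Python on a toy sentence. Everything here (the capitalized-word "NER", the two surface patterns, the adjacency-map graph) is a deliberately simplified stand-in for the real models introduced later in this chapter:

```python
import re
from collections import defaultdict, deque

# Stage 1: entity extraction (capitalized words stand in for a real NER model)
text = "Alice works for Acme. Acme is located in Berlin."
entities = sorted(set(re.findall(r"[A-Z][a-z]+", text)))

# Stage 2: relationship extraction via surface patterns
patterns = {"WORKS_FOR": "works for", "LOCATED_IN": "is located in"}
relations = []
for pred, pat in patterns.items():
    for m in re.finditer(rf"(\w+) {pat} (\w+)", text):
        relations.append((m.group(1), pred, m.group(2)))

# Stage 3: graph construction as an adjacency map
graph = defaultdict(list)
for subj, pred, obj in relations:
    graph[subj].append((pred, obj))
    graph[obj].append((pred, subj))  # undirected view for traversal

# Stage 4: graph traversal (breadth-first, bounded by max_hops)
def related(start, max_hops=2):
    seen, frontier = {start}, deque([(start, 0)])
    while frontier:
        node, depth = frontier.popleft()
        if depth == max_hops:
            continue
        for _, nbr in graph[node]:
            if nbr not in seen:
                seen.add(nbr)
                frontier.append((nbr, depth + 1))
    return seen - {start}

# Stage 5: context aggregation for the LLM prompt
context = [f"{s} {p} {o}" for s, p, o in relations]
```

Note how two hops from "Alice" reach "Berlin" even though no single sentence connects them; this multi-hop reach is precisely what plain semantic retrieval misses.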
Setting up GraphRAG requires additional dependencies beyond standard RAG implementations. Named Entity Recognition (NER) models identify entities in text. Relation extraction models discover relationships between entities. Graph databases store and query the knowledge graph efficiently. Popular choices include spaCy for NER, specialized transformer models for relation extraction, and NetworkX or Neo4j for graph operations.
Installation extends our environment with graph-specific packages:
pip install spacy networkx matplotlib plotly pandas
python -m spacy download en_core_web_sm
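Before proceeding, it can help to verify that the graph-specific packages are importable. This small check uses only the standard library; the `check_graphrag_deps` helper is an illustrative convenience, not part of the chatbot code:

```python
import importlib.util

def have(package):
    """Return True if a top-level package is importable, without importing it."""
    return importlib.util.find_spec(package) is not None

def check_graphrag_deps():
    """Report which GraphRAG dependencies are missing from the environment."""
    missing = [p for p in ("spacy", "networkx") if not have(p)]
    if missing:
        print("Missing packages:", ", ".join(missing))
    return not missing
```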
For production deployments, consider more robust graph databases like Neo4j or Amazon Neptune, which provide advanced querying capabilities and better performance for large-scale knowledge graphs.
Entity extraction forms the foundation of knowledge graph construction. Modern NER models can identify various entity types including persons, organizations, locations, dates, and domain-specific entities. The quality of entity extraction directly impacts the richness and utility of the resulting knowledge graph:
import spacy
import networkx as nx
from collections import defaultdict
import re
from typing import List, Dict, Tuple, Set
class EntityExtractor:
def __init__(self, model_name="en_core_web_sm"):
self.nlp = spacy.load(model_name)
# Add custom entity types if needed
self.entity_types = {
"PERSON", "ORG", "GPE", "PRODUCT", "EVENT",
"WORK_OF_ART", "LAW", "LANGUAGE", "DATE", "TIME",
"PERCENT", "MONEY", "QUANTITY", "ORDINAL", "CARDINAL"
}
def extract_entities(self, text, source="unknown"):
doc = self.nlp(text)
entities = []
for ent in doc.ents:
if ent.label_ in self.entity_types:
entities.append({
"text": ent.text.strip(),
"label": ent.label_,
"start": ent.start_char,
"end": ent.end_char,
"source": source,
"normalized": self._normalize_entity(ent.text, ent.label_)
})
return entities
def _normalize_entity(self, text, label):
# Basic normalization - can be enhanced with more sophisticated methods
normalized = text.strip().lower()
if label in ["PERSON", "ORG", "GPE"]:
# Keep original case for proper nouns
normalized = text.strip()
elif label in ["DATE", "TIME"]:
# Normalize dates and times
normalized = re.sub(r'\s+', ' ', normalized)
return normalized
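When spaCy or its model download is unavailable, a crude heuristic can stand in for NER during prototyping. The following fallback treats runs of capitalized words as candidate entities and returns dictionaries shaped like those produced by `extract_entities` above; it is a sketch for experimentation, not a substitute for a trained NER model:

```python
import re

def extract_entities_fallback(text, source="unknown"):
    """Heuristic stand-in for spaCy NER: runs of capitalized words
    become candidate entities, with whitespace-normalized names."""
    entities = []
    for m in re.finditer(r"\b(?:[A-Z][a-zA-Z]+)(?:\s+[A-Z][a-zA-Z]+)*\b", text):
        surface = m.group(0)
        entities.append({
            "text": surface,
            "label": "UNKNOWN",  # no type information without a real model
            "start": m.start(),
            "end": m.end(),
            "source": source,
            "normalized": re.sub(r"\s+", " ", surface).strip(),
        })
    return entities

ents = extract_entities_fallback("Ada Lovelace joined Acme Corp.")
```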
Relationship extraction discovers connections between entities within text. This process can range from simple co-occurrence patterns to sophisticated neural models trained specifically for relation extraction. For our implementation, we will use a combination of dependency parsing and pattern matching:
class RelationExtractor:
def __init__(self, nlp_model):
self.nlp = nlp_model
# Define relationship patterns
self.relation_patterns = {
"WORKS_FOR": ["works for", "employed by", "employee of"],
"LOCATED_IN": ["located in", "based in", "situated in"],
"PART_OF": ["part of", "member of", "belongs to"],
"FOUNDED": ["founded", "established", "created"],
"ACQUIRED": ["acquired", "bought", "purchased"],
"COLLABORATES_WITH": ["collaborates with", "partners with", "works with"]
}
def extract_relations(self, text, entities, source="unknown"):
doc = self.nlp(text)
relations = []
# Extract relations using dependency parsing
relations.extend(self._extract_dependency_relations(doc, entities, source))
# Extract relations using pattern matching
relations.extend(self._extract_pattern_relations(text, entities, source))
return relations
def _extract_dependency_relations(self, doc, entities, source):
relations = []
entity_map = {ent["normalized"]: ent for ent in entities}
for token in doc:
if token.dep_ in ["nsubj", "dobj", "pobj"]:
head = token.head
# Look for entities in subject and object positions
subj_ent = self._find_entity_for_token(token, entity_map)
obj_ent = self._find_entity_for_token(head, entity_map)
if subj_ent and obj_ent and subj_ent != obj_ent:
relations.append({
"subject": subj_ent["normalized"],
"predicate": head.lemma_,
"object": obj_ent["normalized"],
"confidence": 0.7,
"source": source,
"extraction_method": "dependency"
})
return relations
def _extract_pattern_relations(self, text, entities, source):
relations = []
text_lower = text.lower()
for relation_type, patterns in self.relation_patterns.items():
for pattern in patterns:
for match in re.finditer(re.escape(pattern), text_lower):
start, end = match.span()
# Find entities before and after the pattern
before_entities = [e for e in entities if e["end"] < start]
after_entities = [e for e in entities if e["start"] > end]
if before_entities and after_entities:
subj = before_entities[-1] # Closest entity before
obj = after_entities[0] # Closest entity after
relations.append({
"subject": subj["normalized"],
"predicate": relation_type,
"object": obj["normalized"],
"confidence": 0.8,
"source": source,
"extraction_method": "pattern"
})
return relations
def _find_entity_for_token(self, token, entity_map):
# Simple matching - can be improved with better alignment
for entity_text, entity in entity_map.items():
if token.text.lower() in entity_text.lower():
return entity
return None
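As noted above, relation extraction can be as simple as co-occurrence patterns. A minimal sentence-level version, shown here as a self-contained sketch separate from the `RelationExtractor` class, assigns a low-confidence RELATED_TO edge to every pair of known entities that share a sentence:

```python
import re
from itertools import combinations

def cooccurrence_relations(text, entity_names):
    """Propose low-confidence RELATED_TO edges between every pair of
    known entities appearing in the same sentence."""
    relations = []
    for sentence in re.split(r"(?<=[.!?])\s+", text):
        present = [e for e in entity_names if e in sentence]
        for subj, obj in combinations(present, 2):
            relations.append({
                "subject": subj,
                "predicate": "RELATED_TO",
                "object": obj,
                "confidence": 0.3,  # weak signal: mere co-occurrence
                "extraction_method": "cooccurrence",
            })
    return relations

rels = cooccurrence_relations(
    "Alice founded Acme. Acme is based in Berlin.",
    ["Alice", "Acme", "Berlin"],
)
```

Co-occurrence edges are noisy, so in practice they are best kept as a recall-boosting complement to the pattern and dependency methods, not a replacement.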
Knowledge graph construction combines extracted entities and relationships into a structured representation suitable for traversal and querying. NetworkX provides an excellent foundation for graph operations while remaining lightweight and easy to use:
class KnowledgeGraph:
def __init__(self):
self.graph = nx.MultiDiGraph()
self.entity_index = {}
self.relation_index = defaultdict(list)
self.source_mapping = {}
def add_entities(self, entities):
for entity in entities:
entity_id = entity["normalized"]
if entity_id not in self.graph:
self.graph.add_node(entity_id, **entity)
self.entity_index[entity_id] = entity
else:
# Merge entity information
existing = self.graph.nodes[entity_id]
if "sources" not in existing:
existing["sources"] = set()
existing["sources"].add(entity["source"])
def add_relations(self, relations):
for relation in relations:
subj = relation["subject"]
obj = relation["object"]
pred = relation["predicate"]
# Ensure entities exist in graph
if subj not in self.graph:
self.graph.add_node(subj)
if obj not in self.graph:
self.graph.add_node(obj)
# Add edge with relation information
self.graph.add_edge(subj, obj, **relation)
# Update relation index
self.relation_index[pred].append((subj, obj, relation))
def find_related_entities(self, entity, max_hops=2, relation_types=None):
if entity not in self.graph:
return []
related = set()
current_level = {entity}
for hop in range(max_hops):
next_level = set()
for node in current_level:
# Get neighbors
neighbors = set(self.graph.neighbors(node))
neighbors.update(self.graph.predecessors(node))
# Filter by relation types if specified
if relation_types:
filtered_neighbors = set()
for neighbor in neighbors:
# Check edges in both directions, since neighbors also
# contains predecessors (graph[node][neighbor] alone would
# raise KeyError for incoming-only connections)
edge_dicts = list((self.graph.get_edge_data(node, neighbor) or {}).values())
edge_dicts += list((self.graph.get_edge_data(neighbor, node) or {}).values())
if any(e.get("predicate") in relation_types for e in edge_dicts):
filtered_neighbors.add(neighbor)
neighbors = filtered_neighbors
next_level.update(neighbors)
related.update(neighbors)
current_level = next_level - related
if not current_level:
break
return list(related)
def get_entity_context(self, entity, include_relations=True):
if entity not in self.graph:
return {}
context = {
"entity": self.graph.nodes[entity],
"direct_relations": [],
"related_entities": []
}
if include_relations:
# Get outgoing relations
for neighbor in self.graph.neighbors(entity):
for edge_data in self.graph[entity][neighbor].values():
context["direct_relations"].append({
"type": "outgoing",
"target": neighbor,
"relation": edge_data
})
# Get incoming relations
for predecessor in self.graph.predecessors(entity):
for edge_data in self.graph[predecessor][entity].values():
context["direct_relations"].append({
"type": "incoming",
"source": predecessor,
"relation": edge_data
})
# Get related entities
context["related_entities"] = self.find_related_entities(entity, max_hops=2)
return context
def query_subgraph(self, entities, max_hops=1):
if not entities:
return nx.MultiDiGraph()
# Find all related entities
all_entities = set(entities)
for entity in entities:
if entity in self.graph:
related = self.find_related_entities(entity, max_hops)
all_entities.update(related)
# Create subgraph
subgraph = self.graph.subgraph(all_entities).copy()
return subgraph
def get_statistics(self):
return {
"nodes": self.graph.number_of_nodes(),
"edges": self.graph.number_of_edges(),
"connected_components": nx.number_weakly_connected_components(self.graph),
"average_degree": sum(dict(self.graph.degree()).values()) / self.graph.number_of_nodes() if self.graph.number_of_nodes() > 0 else 0
}
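The traversal logic in `find_related_entities` can also be expressed with NetworkX's built-in shortest-path utilities. The mini-graph below is hypothetical sample data; `related_entities` ignores edge direction, matching the class's treatment of both successors and predecessors:

```python
import networkx as nx

# Toy knowledge graph for demonstration
g = nx.MultiDiGraph()
g.add_edge("Alice", "Acme", predicate="WORKS_FOR")
g.add_edge("Acme", "Berlin", predicate="LOCATED_IN")
g.add_edge("Acme", "Initech", predicate="ACQUIRED")

def related_entities(graph, entity, max_hops=2):
    """Entities reachable within max_hops, ignoring edge direction
    (a compact equivalent of find_related_entities above)."""
    if entity not in graph:
        return []
    lengths = nx.single_source_shortest_path_length(
        graph.to_undirected(as_view=True), entity, cutoff=max_hops
    )
    return [n for n in lengths if n != entity]
```

Two hops from "Alice" reach both "Berlin" and "Initech" through "Acme", even though "Alice" has only a single outgoing edge.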
GraphRAG query processing combines traditional semantic search with graph traversal to gather comprehensive context. This approach ensures that responses incorporate not just directly relevant information, but also related entities and their interconnections:
class GraphRAGRetriever:
def __init__(self, knowledge_base, knowledge_graph):
self.knowledge_base = knowledge_base
self.knowledge_graph = knowledge_graph
self.entity_extractor = EntityExtractor()
def retrieve_context(self, query, k=3, graph_hops=2):
# Extract entities from query
query_entities = self.entity_extractor.extract_entities(query)
entity_names = [ent["normalized"] for ent in query_entities]
# Traditional RAG retrieval
traditional_context = self.knowledge_base.search_relevant_context(query, k)
# Graph-based retrieval
graph_context = []
if entity_names:
# Find related entities through graph traversal
all_related_entities = set()
for entity in entity_names:
if entity in self.knowledge_graph.graph:
related = self.knowledge_graph.find_related_entities(entity, graph_hops)
all_related_entities.update(related)
all_related_entities.add(entity)
# Get context for related entities
for entity in all_related_entities:
entity_context = self.knowledge_graph.get_entity_context(entity)
if entity_context:
graph_context.append(entity_context)
return {
"traditional_context": traditional_context,
"graph_context": graph_context,
"query_entities": entity_names,
"related_entities": list(all_related_entities) if entity_names else []
}
def format_context_for_llm(self, context_data):
formatted_context = []
# Add traditional context
if context_data["traditional_context"]:
formatted_context.append("DOCUMENT CONTEXT:")
for i, ctx in enumerate(context_data["traditional_context"]):
formatted_context.append(f"{i+1}. {ctx['content']}")
# Add graph context
if context_data["graph_context"]:
formatted_context.append("\nRELATED ENTITIES AND RELATIONSHIPS:")
for entity_ctx in context_data["graph_context"]:
entity_name = entity_ctx["entity"].get("normalized", "Unknown")
formatted_context.append(f"\nEntity: {entity_name}")
if entity_ctx["direct_relations"]:
formatted_context.append("Relationships:")
for rel in entity_ctx["direct_relations"][:3]: # Limit to avoid overwhelming
if rel["type"] == "outgoing":
formatted_context.append(f" - {entity_name} {rel['relation']['predicate']} {rel['target']}")
else:
formatted_context.append(f" - {rel['source']} {rel['relation']['predicate']} {entity_name}")
return "\n".join(formatted_context)
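Merging the two context streams is ultimately string assembly. A condensed, self-contained version of `format_context_for_llm`, using hypothetical sample inputs, looks like this:

```python
def merge_context(doc_chunks, triples):
    """Combine vector-retrieved text chunks and graph triples into one
    prompt section (simplified sketch of format_context_for_llm)."""
    lines = []
    if doc_chunks:
        lines.append("DOCUMENT CONTEXT:")
        lines += [f"{i}. {c}" for i, c in enumerate(doc_chunks, 1)]
    if triples:
        lines.append("RELATED ENTITIES AND RELATIONSHIPS:")
        lines += [f"- {s} {p} {o}" for s, p, o in triples]
    return "\n".join(lines)

block = merge_context(
    ["AI is a branch of computer science."],
    [("Alice", "WORKS_FOR", "Acme")],
)
```

Keeping the two sections visually distinct in the prompt helps the model attribute facts to documents versus graph structure when composing its answer.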
COMPLETE RUNNING EXAMPLE FOR GRAPHRAG CHATBOT:
import spacy
import networkx as nx
from collections import defaultdict
import re
import torch
import numpy as np
from sentence_transformers import SentenceTransformer
from transformers import AutoModelForCausalLM, AutoTokenizer
from langchain.text_splitter import RecursiveCharacterTextSplitter
import warnings
warnings.filterwarnings("ignore")
class EntityExtractor:
"""
Advanced entity extraction using spaCy with custom normalization.
This class identifies and normalizes entities from text, providing
the foundation for knowledge graph construction.
"""
def __init__(self, model_name="en_core_web_sm"):
"""
Initialize entity extractor with spaCy model.
Args:
model_name (str): spaCy model name
"""
try:
self.nlp = spacy.load(model_name)
except OSError:
print(f"spaCy model {model_name} not found. Please install it with:")
print(f"python -m spacy download {model_name}")
raise
# Define entity types to extract
self.entity_types = {
"PERSON", "ORG", "GPE", "PRODUCT", "EVENT",
"WORK_OF_ART", "LAW", "LANGUAGE", "DATE", "TIME",
"PERCENT", "MONEY", "QUANTITY", "ORDINAL", "CARDINAL"
}
# Entity normalization patterns
self.normalization_patterns = {
"PERSON": lambda x: x.strip().title(),
"ORG": lambda x: x.strip().title(),
"GPE": lambda x: x.strip().title(),
"PRODUCT": lambda x: x.strip().title(),
}
def extract_entities(self, text, source="unknown", min_length=2):
"""
Extract entities from text with metadata.
Args:
text (str): Input text
source (str): Source identifier
min_length (int): Minimum entity length
Returns:
list: List of entity dictionaries
"""
if not text or not text.strip():
return []
doc = self.nlp(text)
entities = []
seen_entities = set()
for ent in doc.ents:
if (ent.label_ in self.entity_types and
len(ent.text.strip()) >= min_length and
not ent.text.strip().isdigit()):
normalized = self._normalize_entity(ent.text, ent.label_)
# Avoid duplicates
if normalized.lower() not in seen_entities:
entities.append({
"text": ent.text.strip(),
"label": ent.label_,
"start": ent.start_char,
"end": ent.end_char,
"source": source,
"normalized": normalized,
"confidence": 1.0 # spaCy doesn't provide confidence scores
})
seen_entities.add(normalized.lower())
return entities
def _normalize_entity(self, text, label):
"""
Normalize entity text based on its type.
Args:
text (str): Entity text
label (str): Entity label
Returns:
str: Normalized entity text
"""
text = text.strip()
if label in self.normalization_patterns:
return self.normalization_patterns[label](text)
else:
# Default normalization
return re.sub(r'\s+', ' ', text).strip()
class RelationExtractor:
"""
Relation extraction using dependency parsing and pattern matching.
This class discovers relationships between entities using both
linguistic patterns and dependency parse trees.
"""
def __init__(self, nlp_model):
"""
Initialize relation extractor.
Args:
nlp_model: spaCy language model
"""
self.nlp = nlp_model
# Define relationship patterns with confidence scores
self.relation_patterns = {
"WORKS_FOR": {
"patterns": ["works for", "employed by", "employee of", "works at"],
"confidence": 0.8
},
"LOCATED_IN": {
"patterns": ["located in", "based in", "situated in", "headquarters in"],
"confidence": 0.8
},
"PART_OF": {
"patterns": ["part of", "member of", "belongs to", "division of"],
"confidence": 0.7
},
"FOUNDED": {
"patterns": ["founded", "established", "created", "started"],
"confidence": 0.9
},
"ACQUIRED": {
"patterns": ["acquired", "bought", "purchased", "merged with"],
"confidence": 0.9
},
"COLLABORATES_WITH": {
"patterns": ["collaborates with", "partners with", "works with", "alliance with"],
"confidence": 0.7
},
"CEO_OF": {
"patterns": ["CEO of", "chief executive of", "president of"],
"confidence": 0.9
}
}
def extract_relations(self, text, entities, source="unknown"):
"""
Extract relations from text given entities.
Args:
text (str): Input text
entities (list): List of entities
source (str): Source identifier
Returns:
list: List of relation dictionaries
"""
if not text or not entities:
return []
doc = self.nlp(text)
relations = []
# Extract relations using pattern matching
relations.extend(self._extract_pattern_relations(text, entities, source))
# Extract relations using dependency parsing
relations.extend(self._extract_dependency_relations(doc, entities, source))
# Remove duplicates
unique_relations = []
seen_relations = set()
for rel in relations:
rel_key = (rel["subject"], rel["predicate"], rel["object"])
if rel_key not in seen_relations:
unique_relations.append(rel)
seen_relations.add(rel_key)
return unique_relations
def _extract_pattern_relations(self, text, entities, source):
"""Extract relations using predefined patterns."""
relations = []
text_lower = text.lower()
# Create entity position mapping
entity_positions = []
for entity in entities:
entity_positions.append({
"start": entity["start"],
"end": entity["end"],
"entity": entity
})
entity_positions.sort(key=lambda x: x["start"])
for relation_type, relation_info in self.relation_patterns.items():
patterns = relation_info["patterns"]
confidence = relation_info["confidence"]
for pattern in patterns:
for match in re.finditer(re.escape(pattern), text_lower):
start, end = match.span()
# Find entities before and after the pattern
before_entities = [e for e in entity_positions if e["end"] <= start]
after_entities = [e for e in entity_positions if e["start"] >= end]
if before_entities and after_entities:
# Take the closest entities
subj_entity = before_entities[-1]["entity"]
obj_entity = after_entities[0]["entity"]
if subj_entity["normalized"] != obj_entity["normalized"]:
relations.append({
"subject": subj_entity["normalized"],
"predicate": relation_type,
"object": obj_entity["normalized"],
"confidence": confidence,
"source": source,
"extraction_method": "pattern",
"pattern_used": pattern
})
return relations
def _extract_dependency_relations(self, doc, entities, source):
"""Extract relations using dependency parsing."""
relations = []
# Create mapping from tokens to entities
token_to_entity = {}
for entity in entities:
# Find tokens that overlap with entity span
for token in doc:
if (token.idx >= entity["start"] and
token.idx + len(token.text) <= entity["end"]):
token_to_entity[token.i] = entity
# Look for subject-verb-object patterns
for token in doc:
if token.pos_ == "VERB":
# Find subject
subjects = [child for child in token.children if child.dep_ == "nsubj"]
# Find objects
objects = [child for child in token.children if child.dep_ in ["dobj", "pobj"]]
for subj in subjects:
for obj in objects:
subj_entity = self._find_entity_for_token(subj, token_to_entity, entities)
obj_entity = self._find_entity_for_token(obj, token_to_entity, entities)
if (subj_entity and obj_entity and
subj_entity["normalized"] != obj_entity["normalized"]):
relations.append({
"subject": subj_entity["normalized"],
"predicate": token.lemma_.upper(),
"object": obj_entity["normalized"],
"confidence": 0.6,
"source": source,
"extraction_method": "dependency",
"verb": token.text
})
return relations
def _find_entity_for_token(self, token, token_to_entity, entities):
"""Find entity associated with a token."""
# Direct mapping
if token.i in token_to_entity:
return token_to_entity[token.i]
# Check if token is part of any entity by text matching
token_text = token.text.lower()
for entity in entities:
if token_text in entity["normalized"].lower():
return entity
return None
class KnowledgeGraph:
    """
    Knowledge graph implementation using NetworkX.
    This class provides storage and querying capabilities for entities
    and their relationships in a graph structure.
    """

    def __init__(self):
        """Initialize empty knowledge graph."""
        self.graph = nx.MultiDiGraph()
        self.entity_index = {}
        self.relation_index = defaultdict(list)
        self.source_mapping = defaultdict(set)
        self.statistics = {"entities_added": 0, "relations_added": 0}

    def add_entities(self, entities):
        """
        Add entities to the knowledge graph.

        Args:
            entities (list): List of entity dictionaries
        """
        for entity in entities:
            entity_id = entity["normalized"]
            if entity_id not in self.graph:
                # Add new entity
                self.graph.add_node(entity_id, **entity)
                self.entity_index[entity_id] = entity
                self.statistics["entities_added"] += 1
            else:
                # Update existing entity
                existing = self.graph.nodes[entity_id]
                if "sources" not in existing:
                    existing["sources"] = set()
                existing["sources"].add(entity["source"])
                # Merge additional attributes
                for key, value in entity.items():
                    if key not in existing:
                        existing[key] = value
            # Update source mapping
            self.source_mapping[entity["source"]].add(entity_id)

    def add_relations(self, relations):
        """
        Add relations to the knowledge graph.

        Args:
            relations (list): List of relation dictionaries
        """
        for relation in relations:
            subj = relation["subject"]
            obj = relation["object"]
            pred = relation["predicate"]
            # Ensure entities exist in graph
            if subj not in self.graph:
                self.graph.add_node(subj, normalized=subj)
            if obj not in self.graph:
                self.graph.add_node(obj, normalized=obj)
            # Add edge with relation information
            self.graph.add_edge(subj, obj, **relation)
            # Update relation index
            self.relation_index[pred].append((subj, obj, relation))
            self.statistics["relations_added"] += 1

    def find_related_entities(self, entity, max_hops=2, relation_types=None):
        """
        Find entities related to a given entity through graph traversal.

        Args:
            entity (str): Starting entity
            max_hops (int): Maximum number of hops
            relation_types (list): Filter by relation types

        Returns:
            list: List of related entities
        """
        if entity not in self.graph:
            return []
        related = set()
        current_level = {entity}
        visited = {entity}
        for hop in range(max_hops):
            next_level = set()
            for node in current_level:
                # Get outgoing neighbors
                for neighbor in self.graph.neighbors(node):
                    if neighbor not in visited:
                        # Check relation type filter
                        if relation_types:
                            for edge_data in self.graph[node][neighbor].values():
                                if edge_data.get("predicate") in relation_types:
                                    next_level.add(neighbor)
                                    break
                        else:
                            next_level.add(neighbor)
                # Get incoming neighbors
                for predecessor in self.graph.predecessors(node):
                    if predecessor not in visited:
                        # Check relation type filter
                        if relation_types:
                            for edge_data in self.graph[predecessor][node].values():
                                if edge_data.get("predicate") in relation_types:
                                    next_level.add(predecessor)
                                    break
                        else:
                            next_level.add(predecessor)
            related.update(next_level)
            visited.update(next_level)
            current_level = next_level
            if not current_level:
                break
        return list(related)

    def get_entity_context(self, entity, include_relations=True, max_relations=10):
        """
        Get comprehensive context for an entity.

        Args:
            entity (str): Entity to get context for
            include_relations (bool): Include relationship information
            max_relations (int): Maximum relations to include

        Returns:
            dict: Entity context information
        """
        if entity not in self.graph:
            return {}
        context = {
            "entity": dict(self.graph.nodes[entity]),
            "direct_relations": [],
            "related_entities": []
        }
        if include_relations:
            relation_count = 0
            # Get outgoing relations
            for neighbor in self.graph.neighbors(entity):
                if relation_count >= max_relations:
                    break
                for edge_data in self.graph[entity][neighbor].values():
                    context["direct_relations"].append({
                        "type": "outgoing",
                        "target": neighbor,
                        "relation": dict(edge_data)
                    })
                    relation_count += 1
                    if relation_count >= max_relations:
                        break
            # Get incoming relations
            for predecessor in self.graph.predecessors(entity):
                if relation_count >= max_relations:
                    break
                for edge_data in self.graph[predecessor][entity].values():
                    context["direct_relations"].append({
                        "type": "incoming",
                        "source": predecessor,
                        "relation": dict(edge_data)
                    })
                    relation_count += 1
                    if relation_count >= max_relations:
                        break
        # Get related entities (1-hop)
        context["related_entities"] = self.find_related_entities(entity, max_hops=1)
        return context

    def query_subgraph(self, entities, max_hops=1):
        """
        Extract subgraph around specified entities.

        Args:
            entities (list): List of entities
            max_hops (int): Maximum hops from seed entities

        Returns:
            networkx.MultiDiGraph: Subgraph
        """
        if not entities:
            return nx.MultiDiGraph()
        # Find all related entities
        all_entities = set()
        for entity in entities:
            if entity in self.graph:
                all_entities.add(entity)
                related = self.find_related_entities(entity, max_hops)
                all_entities.update(related)
        # Create subgraph
        if all_entities:
            subgraph = self.graph.subgraph(all_entities).copy()
            return subgraph
        else:
            return nx.MultiDiGraph()

    def get_statistics(self):
        """Get knowledge graph statistics."""
        stats = {
            "nodes": self.graph.number_of_nodes(),
            "edges": self.graph.number_of_edges(),
            "entities_added": self.statistics["entities_added"],
            "relations_added": self.statistics["relations_added"],
            "sources": len(self.source_mapping)
        }
        if self.graph.number_of_nodes() > 0:
            stats["average_degree"] = sum(dict(self.graph.degree()).values()) / self.graph.number_of_nodes()
            stats["connected_components"] = nx.number_weakly_connected_components(self.graph)
        else:
            stats["average_degree"] = 0
            stats["connected_components"] = 0
        return stats

    def get_top_entities(self, n=10, metric="degree"):
        """
        Get top entities by specified metric.

        Args:
            n (int): Number of entities to return
            metric (str): Metric to use ("degree", "betweenness", "pagerank")

        Returns:
            list: List of (entity, score) tuples
        """
        if self.graph.number_of_nodes() == 0:
            return []
        if metric == "degree":
            scores = dict(self.graph.degree())
        elif metric == "betweenness":
            scores = nx.betweenness_centrality(self.graph)
        elif metric == "pagerank":
            scores = nx.pagerank(self.graph)
        else:
            scores = dict(self.graph.degree())
        return sorted(scores.items(), key=lambda x: x[1], reverse=True)[:n]
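At its core, find_related_entities is a breadth-first search that expands outward from a seed entity, following edges in both directions, one hop at a time. The same idea can be illustrated with a standard-library-only sketch that operates directly on (subject, predicate, object) triples instead of a NetworkX graph; the helper name and sample triples here are illustrative, not part of the chapter's API:

```python
from collections import defaultdict

def related_entities(edges, start, max_hops=2):
    """Breadth-first traversal over (subject, predicate, object) triples,
    treating edges as bidirectional, mirroring find_related_entities."""
    neighbors = defaultdict(set)
    for subj, _pred, obj in edges:
        neighbors[subj].add(obj)
        neighbors[obj].add(subj)  # include incoming edges too
    visited = {start}
    current = {start}
    related = set()
    for _ in range(max_hops):
        # Expand one hop, skipping anything already visited
        nxt = {n for node in current for n in neighbors[node]} - visited
        if not nxt:
            break
        related |= nxt
        visited |= nxt
        current = nxt
    return related

edges = [
    ("Microsoft", "FOUNDED_BY", "Bill Gates"),
    ("Microsoft", "ACQUIRED", "GitHub"),
    ("Bill Gates", "FOUNDED", "Gates Foundation"),
]
print(sorted(related_entities(edges, "Microsoft", max_hops=1)))
# → ['Bill Gates', 'GitHub']
print(sorted(related_entities(edges, "Microsoft", max_hops=2)))
# → ['Bill Gates', 'Gates Foundation', 'GitHub']
```

The second query shows why hop count matters: "Gates Foundation" is only reachable through "Bill Gates", so it appears at two hops but not one. The KnowledgeGraph class above adds relation-type filtering and direction tracking on top of this basic traversal.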
class GraphRAGRetriever:
    """
    Advanced retriever combining traditional RAG with graph-based retrieval.
    This class provides comprehensive context by combining semantic search
    with graph traversal and entity relationship analysis.
    """

    def __init__(self, knowledge_base, knowledge_graph):
        """
        Initialize GraphRAG retriever.

        Args:
            knowledge_base: Traditional RAG knowledge base
            knowledge_graph: Knowledge graph instance
        """
        self.knowledge_base = knowledge_base
        self.knowledge_graph = knowledge_graph
        self.entity_extractor = EntityExtractor()

    def retrieve_context(self, query, k=3, graph_hops=2, max_graph_entities=5):
        """
        Retrieve comprehensive context using both traditional and graph methods.

        Args:
            query (str): User query
            k (int): Number of traditional RAG results
            graph_hops (int): Maximum graph traversal hops
            max_graph_entities (int): Maximum entities to include from graph

        Returns:
            dict: Comprehensive context information
        """
        # Extract entities from query
        query_entities = self.entity_extractor.extract_entities(query)
        entity_names = [ent["normalized"] for ent in query_entities]
        # Traditional RAG retrieval
        traditional_context = []
        if hasattr(self.knowledge_base, 'search_relevant_context'):
            traditional_context = self.knowledge_base.search_relevant_context(query, k)
        # Graph-based retrieval
        graph_context = []
        related_entities = set()
        if entity_names:
            # Find related entities through graph traversal
            for entity in entity_names:
                if entity in self.knowledge_graph.graph:
                    related = self.knowledge_graph.find_related_entities(entity, graph_hops)
                    related_entities.update(related[:max_graph_entities])
                    related_entities.add(entity)
            # Get context for entities
            for entity in list(related_entities)[:max_graph_entities]:
                entity_context = self.knowledge_graph.get_entity_context(entity)
                if entity_context and entity_context.get("direct_relations"):
                    graph_context.append(entity_context)
        return {
            "traditional_context": traditional_context,
            "graph_context": graph_context,
            "query_entities": entity_names,
            "related_entities": list(related_entities),
            "context_sources": self._get_context_sources(traditional_context, graph_context)
        }

    def _get_context_sources(self, traditional_context, graph_context):
        """Extract unique sources from context."""
        sources = set()
        # Traditional context sources
        for ctx in traditional_context:
            if isinstance(ctx, dict) and "source" in ctx:
                sources.add(ctx["source"])
        # Graph context sources
        for ctx in graph_context:
            if "entity" in ctx and "source" in ctx["entity"]:
                sources.add(ctx["entity"]["source"])
            for rel in ctx.get("direct_relations", []):
                if "source" in rel.get("relation", {}):
                    sources.add(rel["relation"]["source"])
        return list(sources)

    def format_context_for_llm(self, context_data, max_traditional=3, max_graph=5):
        """
        Format retrieved context for LLM consumption.

        Args:
            context_data (dict): Context data from retrieve_context
            max_traditional (int): Maximum traditional context items
            max_graph (int): Maximum graph context items

        Returns:
            str: Formatted context string
        """
        formatted_parts = []
        # Add traditional context
        if context_data["traditional_context"]:
            formatted_parts.append("RELEVANT DOCUMENTS:")
            for i, ctx in enumerate(context_data["traditional_context"][:max_traditional]):
                if isinstance(ctx, dict):
                    content = ctx.get("content", str(ctx))
                    source = ctx.get("source", "Unknown")
                    formatted_parts.append(f"\nDocument {i+1} (Source: {source}):")
                    formatted_parts.append(content)
                else:
                    formatted_parts.append(f"\nDocument {i+1}:")
                    formatted_parts.append(str(ctx))
        # Add graph context
        if context_data["graph_context"]:
            formatted_parts.append("\n\nENTITY RELATIONSHIPS:")
            for i, entity_ctx in enumerate(context_data["graph_context"][:max_graph]):
                entity_name = entity_ctx["entity"].get("normalized", "Unknown")
                entity_type = entity_ctx["entity"].get("label", "")
                formatted_parts.append(f"\nEntity: {entity_name}")
                if entity_type:
                    formatted_parts.append(f"Type: {entity_type}")
                # Add relationships
                relations = entity_ctx.get("direct_relations", [])
                if relations:
                    formatted_parts.append("Relationships:")
                    for rel in relations[:3]:  # Limit to avoid overwhelming
                        rel_info = rel.get("relation", {})
                        predicate = rel_info.get("predicate", "RELATED_TO")
                        if rel["type"] == "outgoing":
                            target = rel.get("target", "Unknown")
                            formatted_parts.append(f"  • {entity_name} {predicate} {target}")
                        else:
                            source = rel.get("source", "Unknown")
                            formatted_parts.append(f"  • {source} {predicate} {entity_name}")
        # Add query entity information
        if context_data["query_entities"]:
            formatted_parts.append(f"\nQuery mentions entities: {', '.join(context_data['query_entities'])}")
        return "\n".join(formatted_parts)
class GraphRAGChatbot:
    """
    Complete GraphRAG chatbot implementation.
    This chatbot combines traditional RAG with knowledge graph reasoning
    to provide comprehensive, relationship-aware responses.
    """

    def __init__(self, llm_model="microsoft/DialoGPT-medium", embedding_model="all-MiniLM-L6-v2"):
        """
        Initialize GraphRAG chatbot.

        Args:
            llm_model (str): Language model for generation
            embedding_model (str): Model for embeddings
        """
        print("Initializing GraphRAG Chatbot...")
        # Initialize device
        self.device = self._get_optimal_device()
        print(f"Using device: {self.device}")
        # Initialize LLM components
        print(f"Loading language model: {llm_model}")
        self.tokenizer = AutoTokenizer.from_pretrained(llm_model)
        self.model = AutoModelForCausalLM.from_pretrained(llm_model)
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        self.model.to(self.device)
        self.model.eval()
        # Initialize GraphRAG components
        from chapter_b_rag import KnowledgeBase  # Reuse from previous chapter
        self.knowledge_base = KnowledgeBase(embedding_model)
        self.knowledge_graph = KnowledgeGraph()
        self.entity_extractor = EntityExtractor()
        self.relation_extractor = RelationExtractor(self.entity_extractor.nlp)
        self.retriever = GraphRAGRetriever(self.knowledge_base, self.knowledge_graph)
        # Conversation state
        self.conversation_history = []
        self.max_context_length = 1000
        print("GraphRAG Chatbot ready!")

    def _get_optimal_device(self):
        """Determine optimal device for inference."""
        if torch.cuda.is_available():
            return torch.device("cuda")
        elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
            return torch.device("mps")
        else:
            return torch.device("cpu")

    def build_knowledge_base(self, documents, sources=None):
        """
        Build both traditional knowledge base and knowledge graph.

        Args:
            documents (list): List of text documents
            sources (list): Optional source identifiers
        """
        print("Building knowledge base and knowledge graph...")
        # Build traditional knowledge base
        self.knowledge_base.add_documents(documents, sources)
        # Build knowledge graph
        if sources is None:
            sources = [f"document_{i}" for i in range(len(documents))]
        all_entities = []
        all_relations = []
        for doc, source in zip(documents, sources):
            print(f"Processing {source} for knowledge graph...")
            # Extract entities
            entities = self.entity_extractor.extract_entities(doc, source)
            all_entities.extend(entities)
            # Extract relations
            relations = self.relation_extractor.extract_relations(doc, entities, source)
            all_relations.extend(relations)
        # Add to knowledge graph
        if all_entities:
            self.knowledge_graph.add_entities(all_entities)
            print(f"Added {len(all_entities)} entities to knowledge graph")
        if all_relations:
            self.knowledge_graph.add_relations(all_relations)
            print(f"Added {len(all_relations)} relations to knowledge graph")
        # Print statistics
        stats = self.knowledge_graph.get_statistics()
        print(f"Knowledge graph: {stats['nodes']} nodes, {stats['edges']} edges")

    def generate_response(self, user_input, use_graph=True, k=3):
        """
        Generate response using GraphRAG.

        Args:
            user_input (str): User's input
            use_graph (bool): Whether to use graph reasoning
            k (int): Number of context chunks to retrieve

        Returns:
            dict: Response with metadata
        """
        try:
            # Retrieve context
            if use_graph:
                context_data = self.retriever.retrieve_context(user_input, k)
                formatted_context = self.retriever.format_context_for_llm(context_data)
            else:
                # Fallback to traditional RAG
                traditional_context = self.knowledge_base.search_relevant_context(user_input, k)
                formatted_context = "\n".join([
                    ctx.get("content", str(ctx)) if isinstance(ctx, dict) else str(ctx)
                    for ctx in traditional_context
                ])
                context_data = {"traditional_context": traditional_context, "graph_context": []}
            # Construct prompt
            if formatted_context.strip():
                prompt = f"""Context Information:
{formatted_context}
Based on the above context, please provide a comprehensive answer to the following question. Consider both the document information and entity relationships when formulating your response.
Question: {user_input}
Answer:"""
            else:
                prompt = f"Question: {user_input}\n\nAnswer:"
            # Generate response
            inputs = self.tokenizer.encode(
                prompt,
                return_tensors='pt',
                truncation=True,
                max_length=self.max_context_length
            )
            inputs = inputs.to(self.device)
            with torch.no_grad():
                outputs = self.model.generate(
                    inputs,
                    max_length=inputs.shape[1] + 200,
                    num_beams=3,
                    no_repeat_ngram_size=3,
                    do_sample=True,
                    temperature=0.7,
                    top_p=0.9,
                    pad_token_id=self.tokenizer.eos_token_id,
                    early_stopping=True
                )
            response = self.tokenizer.decode(
                outputs[0][inputs.shape[1]:],
                skip_special_tokens=True
            ).strip()
            # Store conversation
            self.conversation_history.append({
                "user": user_input,
                "assistant": response,
                "used_graph": use_graph,
                "context_data": context_data
            })
            return {
                "response": response,
                "used_graph": use_graph,
                "context_data": context_data,
                "sources": context_data.get("context_sources", [])
            }
        except Exception as e:
            print(f"Error generating response: {e}")
            return {
                "response": "I apologize, but I encountered an error processing your request.",
                "used_graph": False,
                "context_data": {},
                "sources": []
            }

    def get_knowledge_graph_stats(self):
        """Get knowledge graph statistics."""
        return self.knowledge_graph.get_statistics()

    def get_top_entities(self, n=10):
        """Get top entities by degree centrality."""
        return self.knowledge_graph.get_top_entities(n)

    def explore_entity(self, entity_name):
        """Explore a specific entity in the knowledge graph."""
        return self.knowledge_graph.get_entity_context(entity_name)

    def run_interactive_session(self):
        """Run interactive GraphRAG session."""
        print("\n" + "=" * 70)
        print("GRAPHRAG CHATBOT - Interactive Session")
        print("=" * 70)
        print("Commands:")
        print("  'quit' or 'exit' - End conversation")
        print("  'stats' - Show knowledge graph statistics")
        print("  'top_entities' - Show most connected entities")
        print("  'explore <entity>' - Explore specific entity")
        print("  'toggle_graph' - Toggle graph reasoning on/off")
        print("=" * 70)
        use_graph = True
        while True:
            try:
                user_input = input("\nYou: ").strip()
                if user_input.lower() in ['quit', 'exit', 'bye']:
                    print("\nChatbot: Thank you for using GraphRAG Chatbot! Goodbye!")
                    break
                elif user_input.lower() == 'stats':
                    stats = self.get_knowledge_graph_stats()
                    kb_stats = self.knowledge_base.get_stats()
                    print("\nKnowledge Graph Statistics:")
                    print(f"  Entities: {stats['nodes']}")
                    print(f"  Relations: {stats['edges']}")
                    print(f"  Average degree: {stats['average_degree']:.2f}")
                    print(f"  Connected components: {stats['connected_components']}")
                    print(f"  Document chunks: {kb_stats['total_chunks']}")
                    continue
                elif user_input.lower() == 'top_entities':
                    top_entities = self.get_top_entities(10)
                    print("\nTop Entities by Connections:")
                    for i, (entity, degree) in enumerate(top_entities, 1):
                        print(f"  {i}. {entity} ({degree} connections)")
                    continue
                elif user_input.lower().startswith('explore '):
                    entity_name = user_input[8:].strip()
                    context = self.explore_entity(entity_name)
                    if context:
                        print(f"\nEntity: {entity_name}")
                        print(f"Type: {context['entity'].get('label', 'Unknown')}")
                        print(f"Relations: {len(context['direct_relations'])}")
                        for rel in context['direct_relations'][:5]:
                            rel_info = rel['relation']
                            if rel['type'] == 'outgoing':
                                print(f"  → {rel_info['predicate']} {rel['target']}")
                            else:
                                print(f"  ← {rel['source']} {rel_info['predicate']}")
                    else:
                        print(f"\nEntity '{entity_name}' not found in knowledge graph")
                    continue
                elif user_input.lower() == 'toggle_graph':
                    use_graph = not use_graph
                    status = "enabled" if use_graph else "disabled"
                    print(f"\nGraph reasoning {status}")
                    continue
                if not user_input:
                    continue
                # Generate response
                mode_indicator = "[GraphRAG]" if use_graph else "[RAG]"
                print(f"Chatbot {mode_indicator}: ", end="", flush=True)
                result = self.generate_response(user_input, use_graph)
                print(result["response"])
                # Show sources and entities if available
                if result.get("context_data"):
                    entities = result["context_data"].get("query_entities", [])
                    if entities:
                        print(f"Entities: {', '.join(entities)}")
                sources = result.get("sources", [])
                if sources:
                    print(f"Sources: {', '.join(sources)}")
            except KeyboardInterrupt:
                print("\n\nSession interrupted. Goodbye!")
                break
            except Exception as e:
                print(f"\nError: {e}")
def create_sample_knowledge_base_with_entities():
    """Create a sample knowledge base rich in entities and relationships."""
    sample_documents = [
        """
        Microsoft Corporation is a multinational technology company founded by Bill Gates and Paul Allen in 1975.
        The company is headquartered in Redmond, Washington. Satya Nadella serves as the current CEO of Microsoft,
        having taken over from Steve Ballmer in 2014. Microsoft is known for developing the Windows operating system,
        Microsoft Office suite, and Azure cloud computing platform. The company acquired LinkedIn in 2016 for
        $26.2 billion and GitHub in 2018 for $7.5 billion.
        """,
        """
        Apple Inc. is a technology company founded by Steve Jobs, Steve Wozniak, and Ronald Wayne in 1976.
        The company is based in Cupertino, California. Tim Cook is the current CEO of Apple, succeeding
        Steve Jobs in 2011. Apple is famous for products like the iPhone, iPad, Mac computers, and Apple Watch.
        The company's services include the App Store, iCloud, and Apple Music. Apple became the first company
        to reach a $1 trillion market valuation in 2018.
        """,
        """
        Google LLC, a subsidiary of Alphabet Inc., was founded by Larry Page and Sergey Brin in 1998 while
        they were PhD students at Stanford University. The company is headquartered in Mountain View, California.
        Sundar Pichai serves as the CEO of Google. The company is best known for its search engine, but also
        develops Android operating system, Chrome browser, and Google Cloud platform. Google acquired YouTube
        in 2006 for $1.65 billion and Android Inc. in 2005.
        """,
        """
        Amazon.com Inc. was founded by Jeff Bezos in 1994 in Bellevue, Washington. The company started as an
        online bookstore but has expanded to become one of the world's largest e-commerce and cloud computing
        companies. Andy Jassy became CEO in 2021, taking over from Jeff Bezos. Amazon Web Services (AWS) is
        the company's cloud computing division. Amazon acquired Whole Foods Market in 2017 for $13.7 billion.
        """,
        """
        Tesla Inc. is an electric vehicle and clean energy company founded by Martin Eberhard and Marc Tarpenning
        in 2003. Elon Musk joined as chairman and later became CEO. The company is headquartered in Austin, Texas,
        having moved from Palo Alto, California. Tesla produces electric vehicles including the Model S, Model 3,
        Model X, and Model Y. The company also develops energy storage systems and solar panels through its
        acquisition of SolarCity in 2016.
        """
    ]
    sources = [
        "Microsoft_Profile",
        "Apple_Profile",
        "Google_Profile",
        "Amazon_Profile",
        "Tesla_Profile"
    ]
    return sample_documents, sources
def main():
    """
    Main function demonstrating GraphRAG chatbot functionality.
    """
    try:
        # Create GraphRAG chatbot
        chatbot = GraphRAGChatbot()
        # Create sample knowledge base
        print("\nCreating sample knowledge base with entities and relationships...")
        documents, sources = create_sample_knowledge_base_with_entities()
        chatbot.build_knowledge_base(documents, sources)
        # Show statistics
        stats = chatbot.get_knowledge_graph_stats()
        kb_stats = chatbot.knowledge_base.get_stats()
        print("\nKnowledge base created:")
        print(f"  Documents: {kb_stats['total_documents']}")
        print(f"  Text chunks: {kb_stats['total_chunks']}")
        print(f"  Entities: {stats['nodes']}")
        print(f"  Relations: {stats['edges']}")
        # Show top entities
        top_entities = chatbot.get_top_entities(5)
        print("\nTop entities by connections:")
        for entity, degree in top_entities:
            print(f"  {entity}: {degree} connections")
        # Run interactive session
        chatbot.run_interactive_session()
    except Exception as e:
        print(f"Failed to initialize GraphRAG chatbot: {e}")
        print("Please check your installation and try again.")

if __name__ == "__main__":
    main()
This comprehensive GraphRAG implementation demonstrates the power of combining traditional retrieval with graph-based reasoning. The system can understand entity relationships, perform multi-hop reasoning, and provide more contextually aware responses by leveraging the structured knowledge representation in the graph.
CHAPTER D: IMPLEMENTING MULTI-AGENT SYSTEMS
Multi-Agent Systems represent a paradigm shift from single-model architectures to collaborative networks of specialized AI agents working together to solve complex problems. Unlike monolithic chatbots that attempt to handle all tasks with a single model, multi-agent systems decompose problems into manageable components, assigning each to agents with specific expertise and capabilities.
The fundamental principle underlying multi-agent systems lies in the division of labor and specialization. Just as human organizations benefit from having experts in different domains collaborate on complex projects, AI systems can achieve superior performance by employing multiple agents with distinct roles, knowledge bases, and reasoning capabilities. This approach enables more sophisticated problem-solving, better error handling, and improved scalability.
The rationale for implementing multi-agent systems stems from the limitations of single-agent approaches when dealing with complex, multi-faceted problems. Consider a user query that requires research, analysis, synthesis, and presentation. A single agent might struggle to excel at all these tasks simultaneously, whereas a multi-agent system can assign research to a specialized retrieval agent, analysis to a reasoning agent, synthesis to an integration agent, and presentation to a formatting agent.
Multi-agent architecture consists of several key components working in coordination. Agent definitions specify individual agent capabilities, roles, and interfaces. Communication protocols enable agents to exchange information and coordinate actions. Task orchestration manages the flow of work between agents. Shared memory or message passing facilitates information sharing. Finally, result aggregation combines outputs from multiple agents into coherent responses.
The implementation of multi-agent systems requires careful consideration of agent design, communication patterns, and coordination mechanisms. Agents can be implemented as separate model instances, specialized prompts within the same model, or hybrid approaches combining both strategies. Communication can be synchronous or asynchronous, direct or mediated through a central coordinator.
Setting up a multi-agent system requires extending our existing infrastructure with agent management capabilities. We will implement a flexible framework that supports different agent types, communication patterns, and coordination strategies:
import asyncio
import json
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Any, Optional, Callable

import torch  # reused by the LLM-backed agents defined later in this chapter

class AgentRole(Enum):
    COORDINATOR = "coordinator"
    RESEARCHER = "researcher"
    ANALYZER = "analyzer"
    SYNTHESIZER = "synthesizer"
    CRITIC = "critic"
    FORMATTER = "formatter"

@dataclass
class Message:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    sender: str = ""
    recipient: str = ""
    content: str = ""
    message_type: str = "text"
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: float = field(default_factory=time.time)

@dataclass
class AgentCapability:
    name: str
    description: str
    input_types: List[str]
    output_types: List[str]
    parameters: Dict[str, Any] = field(default_factory=dict)

class Agent(ABC):
    def __init__(self, agent_id: str, role: AgentRole, capabilities: List[AgentCapability]):
        self.agent_id = agent_id
        self.role = role
        self.capabilities = capabilities
        self.message_history: List[Message] = []
        self.state: Dict[str, Any] = {}
        self.is_active = True

    @abstractmethod
    async def process_message(self, message: Message) -> List[Message]:
        """Process incoming message and return response messages."""
        pass

    @abstractmethod
    def can_handle(self, task_type: str) -> bool:
        """Check if agent can handle a specific task type."""
        pass

    def add_message_to_history(self, message: Message):
        """Add message to agent's history."""
        self.message_history.append(message)

    def get_recent_context(self, n: int = 5) -> List[Message]:
        """Get recent messages for context."""
        return self.message_history[-n:]
The base Agent class provides a foundation for implementing specialized agents. Each agent has a unique identifier, role, capabilities, and maintains its own message history and state. The abstract methods ensure that concrete agent implementations provide the necessary functionality for message processing and task handling.
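To make the contract concrete before moving to the full specialized agents, here is a minimal, self-contained sketch of how a subclass satisfies the two abstract methods. The EchoAgent is a hypothetical toy agent invented for illustration, and the framework classes are stripped to their essentials (no roles or capabilities):

```python
import asyncio
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field

@dataclass
class Message:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    sender: str = ""
    recipient: str = ""
    content: str = ""
    message_type: str = "text"
    timestamp: float = field(default_factory=time.time)

class Agent(ABC):
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.message_history: list[Message] = []

    @abstractmethod
    async def process_message(self, message: Message) -> list[Message]: ...

    @abstractmethod
    def can_handle(self, task_type: str) -> bool: ...

class EchoAgent(Agent):
    """Toy agent: acknowledges any message it receives."""

    async def process_message(self, message: Message) -> list[Message]:
        self.message_history.append(message)
        # Reply to the sender with an acknowledgement
        return [Message(sender=self.agent_id, recipient=message.sender,
                        content=f"ack: {message.content}")]

    def can_handle(self, task_type: str) -> bool:
        return task_type == "echo"

async def demo():
    agent = EchoAgent("echo-1")
    replies = await agent.process_message(
        Message(sender="user", recipient="echo-1", content="hello"))
    print(replies[0].content)  # → ack: hello

asyncio.run(demo())
```

Because process_message is a coroutine, many such agents can await model calls or I/O concurrently under one event loop, which is exactly the property the specialized agents below rely on.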
Agent specialization enables different agents to excel at specific tasks. We will implement several specialized agent types, each optimized for particular aspects of the problem-solving process:
class ResearchAgent(Agent):
    def __init__(self, agent_id: str, knowledge_base, embedding_model):
        capabilities = [
            AgentCapability(
                name="document_search",
                description="Search knowledge base for relevant information",
                input_types=["query"],
                output_types=["documents", "facts"]
            ),
            AgentCapability(
                name="fact_extraction",
                description="Extract specific facts from documents",
                input_types=["documents", "questions"],
                output_types=["facts", "evidence"]
            )
        ]
        super().__init__(agent_id, AgentRole.RESEARCHER, capabilities)
        self.knowledge_base = knowledge_base
        self.embedding_model = embedding_model

    async def process_message(self, message: Message) -> List[Message]:
        self.add_message_to_history(message)
        if message.message_type == "research_request":
            query = message.content
            results = await self._conduct_research(query)
            response = Message(
                sender=self.agent_id,
                recipient=message.sender,
                content=json.dumps(results),
                message_type="research_results",
                metadata={"query": query, "result_count": len(results)}
            )
            return [response]
        return []

    async def _conduct_research(self, query: str) -> List[Dict[str, Any]]:
        # Simulate async research operation
        await asyncio.sleep(0.1)
        if hasattr(self.knowledge_base, 'search_relevant_context'):
            context_results = self.knowledge_base.search_relevant_context(query, k=5)
            research_results = []
            for i, ctx in enumerate(context_results):
                if isinstance(ctx, dict):
                    research_results.append({
                        "id": f"result_{i}",
                        "content": ctx.get("content", ""),
                        "source": ctx.get("source", "unknown"),
                        "relevance_score": ctx.get("score", 0.0),
                        "type": "document_chunk"
                    })
                else:
                    research_results.append({
                        "id": f"result_{i}",
                        "content": str(ctx),
                        "source": "unknown",
                        "relevance_score": 0.5,
                        "type": "document_chunk"
                    })
            return research_results
        return []

    def can_handle(self, task_type: str) -> bool:
        return task_type in ["research", "search", "fact_finding", "information_retrieval"]
class AnalyzerAgent(Agent):
    def __init__(self, agent_id: str, llm_model, tokenizer, device):
        capabilities = [
            AgentCapability(
                name="content_analysis",
                description="Analyze content for patterns, insights, and relationships",
                input_types=["documents", "facts"],
                output_types=["analysis", "insights"]
            ),
            AgentCapability(
                name="logical_reasoning",
                description="Perform logical reasoning and inference",
                input_types=["facts", "rules"],
                output_types=["conclusions", "inferences"]
            )
        ]
        super().__init__(agent_id, AgentRole.ANALYZER, capabilities)
        self.llm_model = llm_model
        self.tokenizer = tokenizer
        self.device = device

    async def process_message(self, message: Message) -> List[Message]:
        self.add_message_to_history(message)
        if message.message_type == "analysis_request":
            data = json.loads(message.content)
            analysis = await self._analyze_content(data)
            response = Message(
                sender=self.agent_id,
                recipient=message.sender,
                content=json.dumps(analysis),
                message_type="analysis_results",
                metadata={"analysis_type": data.get("type", "general")}
            )
            return [response]
        return []

    async def _analyze_content(self, data: Dict[str, Any]) -> Dict[str, Any]:
        # Simulate async analysis
        await asyncio.sleep(0.2)
        content_items = data.get("content", [])
        analysis_type = data.get("type", "general")
        # Construct analysis prompt
        content_text = "\n".join([
            item.get("content", "") if isinstance(item, dict) else str(item)
            for item in content_items
        ])
        prompt = f"""Analyze the following content and provide insights:
Content:
{content_text}
Analysis Type: {analysis_type}
Please provide:
1. Key themes and patterns
2. Important relationships
3. Logical conclusions
4. Potential implications
Analysis:"""
        # Generate analysis using LLM
        try:
            inputs = self.tokenizer.encode(prompt, return_tensors='pt', truncation=True, max_length=800)
            inputs = inputs.to(self.device)
            with torch.no_grad():
                outputs = self.llm_model.generate(
                    inputs,
                    max_length=inputs.shape[1] + 200,
                    num_beams=3,
                    temperature=0.7,
                    do_sample=True,
                    pad_token_id=self.tokenizer.eos_token_id
                )
            analysis_text = self.tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True)
            return {
                "analysis": analysis_text.strip(),
                "content_summary": f"Analyzed {len(content_items)} items",
                "analysis_type": analysis_type,
                "confidence": 0.8
            }
        except Exception as e:
            return {
                "analysis": f"Analysis failed: {str(e)}",
                "content_summary": f"Failed to analyze {len(content_items)} items",
                "analysis_type": analysis_type,
                "confidence": 0.0
            }

    def can_handle(self, task_type: str) -> bool:
        return task_type in ["analysis", "reasoning", "pattern_recognition", "inference"]
class SynthesizerAgent(Agent):
def __init__(self, agent_id: str, llm_model, tokenizer, device):
capabilities = [
AgentCapability(
name="content_synthesis",
description="Synthesize multiple sources into coherent response",
input_types=["research_results", "analysis", "facts"],
output_types=["synthesis", "summary"]
),
AgentCapability(
name="response_generation",
description="Generate final responses based on synthesized information",
input_types=["synthesis", "query"],
output_types=["response"]
)
]
super().__init__(agent_id, AgentRole.SYNTHESIZER, capabilities)
self.llm_model = llm_model
self.tokenizer = tokenizer
self.device = device
async def process_message(self, message: Message) -> List[Message]:
self.add_message_to_history(message)
if message.message_type == "synthesis_request":
data = json.loads(message.content)
synthesis = await self._synthesize_information(data)
response = Message(
sender=self.agent_id,
recipient=message.sender,
content=synthesis,
message_type="synthesis_results",
metadata={"sources_count": len(data.get("research_results", []))}
)
return [response]
return []
async def _synthesize_information(self, data: Dict[str, Any]) -> str:
# Simulate async synthesis
await asyncio.sleep(0.15)
query = data.get("query", "")
research_results = data.get("research_results", [])
analysis_results = data.get("analysis_results", {})
# Construct synthesis prompt
research_text = ""
if research_results:
research_text = "Research Findings:\n"
for i, result in enumerate(research_results[:5]):
if isinstance(result, dict):
research_text += f"{i+1}. {result.get('content', '')}\n"
else:
research_text += f"{i+1}. {str(result)}\n"
analysis_text = ""
if analysis_results:
analysis_text = f"Analysis:\n{analysis_results.get('analysis', '')}\n"
prompt = f"""Based on the following research and analysis, provide a comprehensive answer to the user's question.
User Question: {query}
{research_text}
{analysis_text}
Please synthesize this information into a clear, well-structured response that directly addresses the user's question:"""
# Generate synthesis using LLM
try:
inputs = self.tokenizer.encode(prompt, return_tensors='pt', truncation=True, max_length=900)
inputs = inputs.to(self.device)
with torch.no_grad():
outputs = self.llm_model.generate(
inputs,
max_length=inputs.shape[1] + 250,
num_beams=3,
temperature=0.7,
do_sample=True,
pad_token_id=self.tokenizer.eos_token_id
)
synthesis = self.tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True)
return synthesis.strip()
except Exception as e:
return f"I apologize, but I encountered an error synthesizing the information: {str(e)}"
def can_handle(self, task_type: str) -> bool:
return task_type in ["synthesis", "summarization", "response_generation", "integration"]
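The prompt-assembly step inside _synthesize_information can be exercised without loading a model. The following minimal sketch (the sample research and analysis entries are hypothetical) mirrors how dict and plain-string results are flattened into the synthesis prompt:

```python
# Sketch: flatten research/analysis results into a synthesis prompt,
# mirroring _synthesize_information (sample data is hypothetical).
research_results = [
    {"content": "Transformers use self-attention."},
    "Plain-string result",
]
analysis_results = {"analysis": "Attention enables long-range context."}

# Dict results contribute their "content" field; anything else is stringified.
research_text = "Research Findings:\n"
for i, result in enumerate(research_results[:5]):
    content = result.get("content", "") if isinstance(result, dict) else str(result)
    research_text += f"{i+1}. {content}\n"

analysis_text = f"Analysis:\n{analysis_results.get('analysis', '')}\n"

prompt = (
    "Based on the following research and analysis, answer the question.\n"
    "User Question: What is self-attention?\n"
    f"{research_text}{analysis_text}"
)
print(prompt)
```

Keeping the prompt construction separate from generation like this makes it easy to unit-test the formatting logic on its own.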
The coordination mechanism manages the flow of work between agents, ensuring that tasks are properly distributed and results are aggregated. The coordinator agent serves as the central orchestrator, deciding which agents to involve and in what sequence:
class CoordinatorAgent(Agent):
def __init__(self, agent_id: str, agents: Dict[str, Agent]):
capabilities = [
AgentCapability(
name="task_orchestration",
description="Coordinate tasks between multiple agents",
input_types=["user_query"],
output_types=["final_response"]
),
AgentCapability(
name="agent_management",
description="Manage agent interactions and workflow",
input_types=["agent_responses"],
output_types=["coordination_decisions"]
)
]
super().__init__(agent_id, AgentRole.COORDINATOR, capabilities)
self.agents = agents
self.active_workflows: Dict[str, Dict[str, Any]] = {}
async def process_message(self, message: Message) -> List[Message]:
self.add_message_to_history(message)
if message.message_type == "user_query":
response = await self._coordinate_response(message.content, message.id)
return [Message(
sender=self.agent_id,
recipient=message.sender,
content=response,
message_type="final_response",
metadata={"workflow_id": message.id}
)]
return []
async def _coordinate_response(self, query: str, workflow_id: str) -> str:
"""Coordinate multi-agent response to user query."""
try:
# Initialize workflow
self.active_workflows[workflow_id] = {
"query": query,
"status": "active",
"results": {},
"start_time": time.time()
}
# Step 1: Research
research_results = await self._request_research(query)
self.active_workflows[workflow_id]["results"]["research"] = research_results
# Step 2: Analysis
analysis_results = await self._request_analysis(research_results, query)
self.active_workflows[workflow_id]["results"]["analysis"] = analysis_results
# Step 3: Synthesis
final_response = await self._request_synthesis(query, research_results, analysis_results)
self.active_workflows[workflow_id]["results"]["synthesis"] = final_response
# Mark workflow complete
self.active_workflows[workflow_id]["status"] = "complete"
self.active_workflows[workflow_id]["end_time"] = time.time()
return final_response
except Exception as e:
return f"I apologize, but I encountered an error coordinating the response: {str(e)}"
async def _request_research(self, query: str) -> List[Dict[str, Any]]:
"""Request research from research agent."""
researcher = self._find_agent_by_role(AgentRole.RESEARCHER)
if not researcher:
return []
research_message = Message(
sender=self.agent_id,
recipient=researcher.agent_id,
content=query,
message_type="research_request"
)
responses = await researcher.process_message(research_message)
if responses:
return json.loads(responses[0].content)
return []
async def _request_analysis(self, research_results: List[Dict[str, Any]], query: str) -> Dict[str, Any]:
"""Request analysis from analyzer agent."""
analyzer = self._find_agent_by_role(AgentRole.ANALYZER)
if not analyzer:
return {}
analysis_data = {
"content": research_results,
"type": "research_analysis",
"query": query
}
analysis_message = Message(
sender=self.agent_id,
recipient=analyzer.agent_id,
content=json.dumps(analysis_data),
message_type="analysis_request"
)
responses = await analyzer.process_message(analysis_message)
if responses:
return json.loads(responses[0].content)
return {}
async def _request_synthesis(self, query: str, research_results: List[Dict[str, Any]],
analysis_results: Dict[str, Any]) -> str:
"""Request synthesis from synthesizer agent."""
synthesizer = self._find_agent_by_role(AgentRole.SYNTHESIZER)
if not synthesizer:
return "Unable to synthesize response - synthesizer agent not available."
synthesis_data = {
"query": query,
"research_results": research_results,
"analysis_results": analysis_results
}
synthesis_message = Message(
sender=self.agent_id,
recipient=synthesizer.agent_id,
content=json.dumps(synthesis_data),
message_type="synthesis_request"
)
responses = await synthesizer.process_message(synthesis_message)
if responses:
return responses[0].content
return "Unable to generate synthesis."
def _find_agent_by_role(self, role: AgentRole) -> Optional[Agent]:
"""Find agent by role."""
for agent in self.agents.values():
if agent.role == role:
return agent
return None
def can_handle(self, task_type: str) -> bool:
return task_type in ["coordination", "orchestration", "workflow_management"]
def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:
"""Get status of a specific workflow."""
return self.active_workflows.get(workflow_id, {})
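Before the complete listing, it helps to see the coordinator's research-analysis-synthesis pipeline in isolation. The following self-contained sketch uses trivial stand-in agents (the Echo* classes are hypothetical simplifications, not part of the system above) to show the sequential await chain that _coordinate_response implements:

```python
# Minimal sketch of the coordinator's sequential workflow.
# The Echo* classes are hypothetical stand-ins; no LLM is required.
import asyncio

class EchoResearcher:
    async def research(self, query: str) -> list:
        await asyncio.sleep(0)  # stand-in for async I/O (e.g., a KB search)
        return [f"fact about {query}"]

class EchoAnalyzer:
    async def analyze(self, facts: list) -> str:
        await asyncio.sleep(0)
        return f"analysis of {len(facts)} fact(s)"

class EchoSynthesizer:
    async def synthesize(self, query: str, facts: list, analysis: str) -> str:
        await asyncio.sleep(0)
        return f"Answer to '{query}': {analysis}; sources: {facts}"

async def coordinate(query: str) -> str:
    # Research -> Analysis -> Synthesis, mirroring _coordinate_response.
    facts = await EchoResearcher().research(query)
    analysis = await EchoAnalyzer().analyze(facts)
    return await EchoSynthesizer().synthesize(query, facts, analysis)

print(asyncio.run(coordinate("quantum computing")))
```

Each stage awaits the previous one, so the pipeline is strictly sequential; the full implementation below adds message envelopes, workflow bookkeeping, and error handling around this same skeleton.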
COMPLETE RUNNING EXAMPLE FOR MULTI-AGENT SYSTEM:
import asyncio
import json
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Any, Optional
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import warnings
warnings.filterwarnings("ignore")
class AgentRole(Enum):
"""Enumeration of agent roles in the multi-agent system."""
COORDINATOR = "coordinator"
RESEARCHER = "researcher"
ANALYZER = "analyzer"
SYNTHESIZER = "synthesizer"
CRITIC = "critic"
FORMATTER = "formatter"
@dataclass
class Message:
"""Message structure for inter-agent communication."""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
sender: str = ""
recipient: str = ""
content: str = ""
message_type: str = "text"
metadata: Dict[str, Any] = field(default_factory=dict)
timestamp: float = field(default_factory=time.time)
@dataclass
class AgentCapability:
"""Definition of agent capabilities."""
name: str
description: str
input_types: List[str]
output_types: List[str]
parameters: Dict[str, Any] = field(default_factory=dict)
class Agent(ABC):
"""
Abstract base class for all agents in the multi-agent system.
This class defines the common interface and functionality that all
agents must implement, including message processing and capability checking.
"""
def __init__(self, agent_id: str, role: AgentRole, capabilities: List[AgentCapability]):
"""
Initialize agent with basic properties.
Args:
agent_id (str): Unique identifier for the agent
role (AgentRole): Role of the agent in the system
capabilities (List[AgentCapability]): List of agent capabilities
"""
self.agent_id = agent_id
self.role = role
self.capabilities = capabilities
self.message_history: List[Message] = []
self.state: Dict[str, Any] = {}
self.is_active = True
self.performance_metrics = {
"messages_processed": 0,
"average_response_time": 0.0,
"success_rate": 1.0
}
@abstractmethod
async def process_message(self, message: Message) -> List[Message]:
"""
Process incoming message and return response messages.
Args:
message (Message): Incoming message to process
Returns:
List[Message]: List of response messages
"""
pass
@abstractmethod
def can_handle(self, task_type: str) -> bool:
"""
Check if agent can handle a specific task type.
Args:
task_type (str): Type of task to check
Returns:
bool: True if agent can handle the task
"""
pass
def add_message_to_history(self, message: Message):
"""Add message to agent's history and update metrics."""
self.message_history.append(message)
self.performance_metrics["messages_processed"] += 1
def get_recent_context(self, n: int = 5) -> List[Message]:
"""Get recent messages for context."""
return self.message_history[-n:]
def get_performance_metrics(self) -> Dict[str, Any]:
"""Get agent performance metrics."""
return self.performance_metrics.copy()
class ResearchAgent(Agent):
"""
Research agent specialized in information retrieval and fact-finding.
This agent searches knowledge bases, extracts relevant information,
and provides research results to other agents in the system.
"""
def __init__(self, agent_id: str, knowledge_base=None):
"""
Initialize research agent.
Args:
agent_id (str): Unique agent identifier
knowledge_base: Knowledge base for information retrieval
"""
capabilities = [
AgentCapability(
name="document_search",
description="Search knowledge base for relevant information",
input_types=["query"],
output_types=["documents", "facts"]
),
AgentCapability(
name="fact_extraction",
description="Extract specific facts from documents",
input_types=["documents", "questions"],
output_types=["facts", "evidence"]
),
AgentCapability(
name="source_verification",
description="Verify and rank information sources",
input_types=["sources"],
output_types=["verified_sources"]
)
]
super().__init__(agent_id, AgentRole.RESEARCHER, capabilities)
self.knowledge_base = knowledge_base
self.search_cache = {}
async def process_message(self, message: Message) -> List[Message]:
"""Process research requests and return findings."""
start_time = time.time()
self.add_message_to_history(message)
try:
if message.message_type == "research_request":
query = message.content
results = await self._conduct_research(query)
response = Message(
sender=self.agent_id,
recipient=message.sender,
content=json.dumps(results),
message_type="research_results",
metadata={
"query": query,
"result_count": len(results),
"search_time": time.time() - start_time
}
)
# Update performance metrics
response_time = time.time() - start_time
self._update_response_time(response_time)
return [response]
elif message.message_type == "fact_check_request":
data = json.loads(message.content)
verification = await self._verify_facts(data)
response = Message(
sender=self.agent_id,
recipient=message.sender,
content=json.dumps(verification),
message_type="fact_check_results",
metadata={"verification_time": time.time() - start_time}
)
return [response]
except Exception as e:
error_response = Message(
sender=self.agent_id,
recipient=message.sender,
content=json.dumps({"error": str(e), "status": "failed"}),
message_type="error_response",
metadata={"error_time": time.time() - start_time}
)
self.performance_metrics["success_rate"] *= 0.95 # Decrease success rate
return [error_response]
return []
async def _conduct_research(self, query: str) -> List[Dict[str, Any]]:
"""Conduct research using available knowledge base."""
# Check cache first
if query in self.search_cache:
return self.search_cache[query]
# Simulate async research operation
await asyncio.sleep(0.1)
research_results = []
if self.knowledge_base and hasattr(self.knowledge_base, 'search_relevant_context'):
try:
context_results = self.knowledge_base.search_relevant_context(query, k=5)
for i, ctx in enumerate(context_results):
if isinstance(ctx, dict):
research_results.append({
"id": f"research_{i}",
"content": ctx.get("content", ""),
"source": ctx.get("source", "unknown"),
"relevance_score": ctx.get("score", 0.0),
"type": "document_chunk",
"metadata": {
"extraction_method": "semantic_search",
"confidence": min(ctx.get("score", 0.0) * 1.2, 1.0)
}
})
else:
research_results.append({
"id": f"research_{i}",
"content": str(ctx),
"source": "unknown",
"relevance_score": 0.5,
"type": "document_chunk",
"metadata": {
"extraction_method": "text_search",
"confidence": 0.5
}
})
except Exception as e:
research_results.append({
"id": "error_result",
"content": f"Research error: {str(e)}",
"source": "system",
"relevance_score": 0.0,
"type": "error",
"metadata": {"error": True}
})
else:
# Fallback: simulated research results
research_results = [
{
"id": "simulated_result",
"content": f"Simulated research result for query: {query}",
"source": "simulation",
"relevance_score": 0.7,
"type": "simulated",
"metadata": {"simulated": True}
}
]
# Cache results
self.search_cache[query] = research_results
return research_results
async def _verify_facts(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Verify facts against knowledge base."""
await asyncio.sleep(0.05)
facts = data.get("facts", [])
verification_results = []
for fact in facts:
# Simple fact verification simulation
verification_results.append({
"fact": fact,
"verified": True, # Simplified verification
"confidence": 0.8,
"sources": ["knowledge_base"],
"verification_method": "knowledge_base_lookup"
})
return {
"verification_results": verification_results,
"overall_confidence": 0.8,
"verification_timestamp": time.time()
}
def _update_response_time(self, response_time: float):
"""Update average response time metric."""
current_avg = self.performance_metrics["average_response_time"]
message_count = self.performance_metrics["messages_processed"]
if message_count == 1:
self.performance_metrics["average_response_time"] = response_time
else:
self.performance_metrics["average_response_time"] = (
(current_avg * (message_count - 1) + response_time) / message_count
)
def can_handle(self, task_type: str) -> bool:
"""Check if agent can handle specific task types."""
return task_type in [
"research", "search", "fact_finding", "information_retrieval",
"fact_checking", "source_verification", "document_analysis"
]
class AnalyzerAgent(Agent):
"""
Analyzer agent specialized in content analysis and logical reasoning.
This agent analyzes information, identifies patterns, performs logical
reasoning, and provides insights based on available data.
"""
def __init__(self, agent_id: str, llm_model=None, tokenizer=None, device=None):
"""
Initialize analyzer agent.
Args:
agent_id (str): Unique agent identifier
llm_model: Language model for analysis
tokenizer: Tokenizer for the language model
device: Device for model inference
"""
capabilities = [
AgentCapability(
name="content_analysis",
description="Analyze content for patterns, insights, and relationships",
input_types=["documents", "facts", "data"],
output_types=["analysis", "insights", "patterns"]
),
AgentCapability(
name="logical_reasoning",
description="Perform logical reasoning and inference",
input_types=["facts", "rules", "premises"],
output_types=["conclusions", "inferences", "logical_chains"]
),
AgentCapability(
name="pattern_recognition",
description="Identify patterns and trends in data",
input_types=["data", "sequences"],
output_types=["patterns", "trends", "anomalies"]
)
]
super().__init__(agent_id, AgentRole.ANALYZER, capabilities)
self.llm_model = llm_model
self.tokenizer = tokenizer
self.device = device
self.analysis_cache = {}
async def process_message(self, message: Message) -> List[Message]:
"""Process analysis requests and return insights."""
start_time = time.time()
self.add_message_to_history(message)
try:
if message.message_type == "analysis_request":
data = json.loads(message.content)
analysis = await self._analyze_content(data)
response = Message(
sender=self.agent_id,
recipient=message.sender,
content=json.dumps(analysis),
message_type="analysis_results",
metadata={
"analysis_type": data.get("type", "general"),
"analysis_time": time.time() - start_time
}
)
self._update_response_time(time.time() - start_time)
return [response]
elif message.message_type == "reasoning_request":
data = json.loads(message.content)
reasoning = await self._perform_reasoning(data)
response = Message(
sender=self.agent_id,
recipient=message.sender,
content=json.dumps(reasoning),
message_type="reasoning_results",
metadata={"reasoning_time": time.time() - start_time}
)
return [response]
except Exception as e:
error_response = Message(
sender=self.agent_id,
recipient=message.sender,
content=json.dumps({"error": str(e), "status": "failed"}),
message_type="error_response",
metadata={"error_time": time.time() - start_time}
)
self.performance_metrics["success_rate"] *= 0.95
return [error_response]
return []
async def _analyze_content(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze content and provide insights."""
# Check cache
cache_key = str(hash(json.dumps(data, sort_keys=True)))
if cache_key in self.analysis_cache:
return self.analysis_cache[cache_key]
await asyncio.sleep(0.2) # Simulate processing time
content_items = data.get("content", [])
analysis_type = data.get("type", "general")
query = data.get("query", "")
# Prepare content for analysis
content_text = ""
for item in content_items:
if isinstance(item, dict):
content_text += item.get("content", "") + "\n"
else:
content_text += str(item) + "\n"
analysis_result = {
"analysis_type": analysis_type,
"content_summary": f"Analyzed {len(content_items)} content items",
"key_themes": [],
"patterns": [],
"insights": [],
"confidence": 0.8
}
if self.llm_model and self.tokenizer and content_text.strip():
try:
# Construct analysis prompt
prompt = f"""Analyze the following content and provide detailed insights:
Content to analyze:
{content_text[:1000]}
Analysis focus: {analysis_type}
Related query: {query}
Please provide:
1. Key themes and main topics
2. Important patterns or relationships
3. Logical insights and implications
4. Potential conclusions
Analysis:"""
# Generate analysis using LLM
inputs = self.tokenizer.encode(
prompt,
return_tensors='pt',
truncation=True,
max_length=800
)
inputs = inputs.to(self.device)
with torch.no_grad():
outputs = self.llm_model.generate(
inputs,
max_length=inputs.shape[1] + 200,
num_beams=3,
temperature=0.7,
do_sample=True,
pad_token_id=self.tokenizer.eos_token_id,
early_stopping=True
)
analysis_text = self.tokenizer.decode(
outputs[0][inputs.shape[1]:],
skip_special_tokens=True
).strip()
# Parse analysis into structured format
analysis_result.update({
"detailed_analysis": analysis_text,
"key_themes": self._extract_themes(analysis_text),
"patterns": self._extract_patterns(analysis_text),
"insights": self._extract_insights(analysis_text),
"confidence": 0.85
})
except Exception as e:
analysis_result.update({
"detailed_analysis": f"Analysis generation failed: {str(e)}",
"confidence": 0.1,
"error": str(e)
})
else:
# Fallback analysis
analysis_result.update({
"detailed_analysis": f"Basic analysis of {len(content_items)} items for {analysis_type}",
"key_themes": ["Content analysis", "Information processing"],
"patterns": ["Data structure patterns"],
"insights": ["Multiple content sources analyzed"],
"confidence": 0.6
})
# Cache result
self.analysis_cache[cache_key] = analysis_result
return analysis_result
async def _perform_reasoning(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Perform logical reasoning on provided premises."""
await asyncio.sleep(0.15)
premises = data.get("premises", [])
reasoning_type = data.get("type", "deductive")
reasoning_result = {
"reasoning_type": reasoning_type,
"premises_count": len(premises),
"conclusions": [],
"logical_chain": [],
"confidence": 0.7
}
# Simple logical reasoning simulation
if premises:
reasoning_result.update({
"conclusions": [f"Conclusion based on {len(premises)} premises"],
"logical_chain": ["Step 1: Analyze premises", f"Step 2: Apply {reasoning_type} reasoning"],
"confidence": 0.8
})
return reasoning_result
def _extract_themes(self, text: str) -> List[str]:
"""Extract key themes from analysis text."""
# Simple theme extraction
themes = []
if "technology" in text.lower():
themes.append("Technology")
if "business" in text.lower():
themes.append("Business")
if "innovation" in text.lower():
themes.append("Innovation")
return themes[:3]
def _extract_patterns(self, text: str) -> List[str]:
"""Extract patterns from analysis text."""
patterns = []
if "relationship" in text.lower():
patterns.append("Relationship patterns")
if "trend" in text.lower():
patterns.append("Trend patterns")
return patterns[:3]
def _extract_insights(self, text: str) -> List[str]:
"""Extract insights from analysis text."""
insights = []
sentences = text.split('.')
for sentence in sentences[:3]:
if len(sentence.strip()) > 20:
insights.append(sentence.strip())
return insights
def _update_response_time(self, response_time: float):
"""Update average response time metric."""
current_avg = self.performance_metrics["average_response_time"]
message_count = self.performance_metrics["messages_processed"]
if message_count == 1:
self.performance_metrics["average_response_time"] = response_time
else:
self.performance_metrics["average_response_time"] = (
(current_avg * (message_count - 1) + response_time) / message_count
)
def can_handle(self, task_type: str) -> bool:
"""Check if agent can handle specific task types."""
return task_type in [
"analysis", "reasoning", "pattern_recognition", "inference",
"logical_analysis", "content_analysis", "insight_generation"
]
class SynthesizerAgent(Agent):
"""
Synthesizer agent specialized in information integration and response generation.
This agent combines information from multiple sources, synthesizes insights,
and generates coherent, comprehensive responses.
"""
def __init__(self, agent_id: str, llm_model=None, tokenizer=None, device=None):
"""
Initialize synthesizer agent.
Args:
agent_id (str): Unique agent identifier
llm_model: Language model for synthesis
tokenizer: Tokenizer for the language model
device: Device for model inference
"""
capabilities = [
AgentCapability(
name="content_synthesis",
description="Synthesize multiple sources into coherent response",
input_types=["research_results", "analysis", "facts"],
output_types=["synthesis", "summary", "integrated_response"]
),
AgentCapability(
name="response_generation",
description="Generate final responses based on synthesized information",
input_types=["synthesis", "query"],
output_types=["response", "formatted_answer"]
),
AgentCapability(
name="information_integration",
description="Integrate information from multiple agents",
input_types=["multiple_sources"],
output_types=["integrated_information"]
)
]
super().__init__(agent_id, AgentRole.SYNTHESIZER, capabilities)
self.llm_model = llm_model
self.tokenizer = tokenizer
self.device = device
self.synthesis_cache = {}
async def process_message(self, message: Message) -> List[Message]:
"""Process synthesis requests and return integrated responses."""
start_time = time.time()
self.add_message_to_history(message)
try:
if message.message_type == "synthesis_request":
data = json.loads(message.content)
synthesis = await self._synthesize_information(data)
response = Message(
sender=self.agent_id,
recipient=message.sender,
content=synthesis,
message_type="synthesis_results",
metadata={
"sources_count": len(data.get("research_results", [])),
"synthesis_time": time.time() - start_time
}
)
self._update_response_time(time.time() - start_time)
return [response]
elif message.message_type == "integration_request":
data = json.loads(message.content)
integration = await self._integrate_sources(data)
response = Message(
sender=self.agent_id,
recipient=message.sender,
content=json.dumps(integration),
message_type="integration_results",
metadata={"integration_time": time.time() - start_time}
)
return [response]
except Exception as e:
error_response = Message(
sender=self.agent_id,
recipient=message.sender,
content=f"Synthesis error: {str(e)}",
message_type="error_response",
metadata={"error_time": time.time() - start_time}
)
self.performance_metrics["success_rate"] *= 0.95
return [error_response]
return []
async def _synthesize_information(self, data: Dict[str, Any]) -> str:
"""Synthesize information from multiple sources."""
# Check cache
cache_key = str(hash(json.dumps(data, sort_keys=True)))
if cache_key in self.synthesis_cache:
return self.synthesis_cache[cache_key]
await asyncio.sleep(0.15) # Simulate processing time
query = data.get("query", "")
research_results = data.get("research_results", [])
analysis_results = data.get("analysis_results", {})
# Prepare synthesis content
research_text = ""
if research_results:
research_text = "Research Findings:\n"
for i, result in enumerate(research_results[:5]):
if isinstance(result, dict):
content = result.get("content", "")
source = result.get("source", "unknown")
research_text += f"{i+1}. {content} (Source: {source})\n"
else:
research_text += f"{i+1}. {str(result)}\n"
analysis_text = ""
if analysis_results and isinstance(analysis_results, dict):
analysis_content = analysis_results.get("detailed_analysis", "")
if analysis_content:
analysis_text = f"Analysis Insights:\n{analysis_content}\n"
synthesis_result = ""
if self.llm_model and self.tokenizer:
try:
# Construct synthesis prompt
prompt = f"""Based on the following research and analysis, provide a comprehensive and well-structured answer to the user's question.
User Question: {query}
{research_text}
{analysis_text}
Please synthesize this information into a clear, informative response that:
1. Directly addresses the user's question
2. Integrates key findings from research and analysis
3. Provides a logical flow of information
4. Includes relevant details and context
Synthesized Response:"""
# Generate synthesis using LLM
inputs = self.tokenizer.encode(
prompt,
return_tensors='pt',
truncation=True,
max_length=900
)
inputs = inputs.to(self.device)
with torch.no_grad():
outputs = self.llm_model.generate(
inputs,
max_length=inputs.shape[1] + 250,
num_beams=3,
temperature=0.7,
do_sample=True,
pad_token_id=self.tokenizer.eos_token_id,
early_stopping=True
)
synthesis_result = self.tokenizer.decode(
outputs[0][inputs.shape[1]:],
skip_special_tokens=True
).strip()
except Exception as e:
synthesis_result = f"I apologize, but I encountered an error synthesizing the information: {str(e)}"
else:
# Fallback synthesis
synthesis_result = f"""Based on the available research and analysis for your question "{query}":
{research_text}
{analysis_text}
This information provides insights into your query, though a more detailed synthesis would require additional processing capabilities."""
# Cache result
self.synthesis_cache[cache_key] = synthesis_result
return synthesis_result
async def _integrate_sources(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Integrate information from multiple sources."""
await asyncio.sleep(0.1)
sources = data.get("sources", [])
integration_type = data.get("type", "comprehensive")
integration_result = {
"integration_type": integration_type,
"sources_processed": len(sources),
"integrated_content": "",
"source_mapping": {},
"confidence": 0.8
}
# Simple integration logic
integrated_content = []
for i, source in enumerate(sources):
if isinstance(source, dict):
content = source.get("content", "")
source_id = source.get("id", f"source_{i}")
integrated_content.append(f"From {source_id}: {content}")
integration_result["source_mapping"][source_id] = i
else:
integrated_content.append(f"Source {i}: {str(source)}")
integration_result["integrated_content"] = "\n".join(integrated_content)
return integration_result
def _update_response_time(self, response_time: float):
"""Update average response time metric."""
current_avg = self.performance_metrics["average_response_time"]
message_count = self.performance_metrics["messages_processed"]
if message_count == 1:
self.performance_metrics["average_response_time"] = response_time
else:
self.performance_metrics["average_response_time"] = (
(current_avg * (message_count - 1) + response_time) / message_count
)
def can_handle(self, task_type: str) -> bool:
"""Check if agent can handle specific task types."""
return task_type in [
"synthesis", "summarization", "response_generation", "integration",
"information_fusion", "content_combination", "final_response"
]
class CoordinatorAgent(Agent):
"""
Coordinator agent that orchestrates the multi-agent workflow.
This agent manages task distribution, coordinates agent interactions,
and ensures proper workflow execution across the system.
"""
def __init__(self, agent_id: str, agents: Dict[str, Agent]):
"""
Initialize coordinator agent.
Args:
agent_id (str): Unique agent identifier
agents (Dict[str, Agent]): Dictionary of available agents
"""
capabilities = [
AgentCapability(
name="task_orchestration",
description="Coordinate tasks between multiple agents",
input_types=["user_query"],
output_types=["final_response"]
),
AgentCapability(
name="agent_management",
description="Manage agent interactions and workflow",
input_types=["agent_responses"],
output_types=["coordination_decisions"]
),
AgentCapability(
name="workflow_optimization",
description="Optimize workflow based on task requirements",
input_types=["task_analysis"],
output_types=["optimized_workflow"]
)
]
super().__init__(agent_id, AgentRole.COORDINATOR, capabilities)
self.agents = agents
self.active_workflows: Dict[str, Dict[str, Any]] = {}
self.workflow_templates = self._initialize_workflow_templates()
def _initialize_workflow_templates(self) -> Dict[str, List[str]]:
"""Initialize workflow templates for different query types."""
return {
"research_query": ["research", "analysis", "synthesis"],
"factual_query": ["research", "fact_check", "synthesis"],
"analytical_query": ["research", "analysis", "reasoning", "synthesis"],
"general_query": ["research", "synthesis"],
"complex_query": ["research", "analysis", "reasoning", "synthesis", "review"]
}
async def process_message(self, message: Message) -> List[Message]:
"""Process user queries and coordinate multi-agent responses."""
start_time = time.time()
self.add_message_to_history(message)
try:
if message.message_type == "user_query":
response = await self._coordinate_response(message.content, message.id)
return [Message(
sender=self.agent_id,
recipient=message.sender,
content=response,
message_type="final_response",
metadata={
"workflow_id": message.id,
"total_time": time.time() - start_time,
"agents_involved": self._get_agents_involved(message.id)
}
)]
elif message.message_type == "workflow_status_request":
workflow_id = message.content
status = self.get_workflow_status(workflow_id)
return [Message(
sender=self.agent_id,
recipient=message.sender,
content=json.dumps(status),
message_type="workflow_status_response"
)]
except Exception as e:
error_response = Message(
sender=self.agent_id,
recipient=message.sender,
content=f"Coordination error: {str(e)}",
message_type="error_response",
metadata={"error_time": time.time() - start_time}
)
self.performance_metrics["success_rate"] *= 0.9
return [error_response]
return []
async def _coordinate_response(self, query: str, workflow_id: str) -> str:
"""Coordinate multi-agent response to user query."""
try:
# Initialize workflow
self.active_workflows[workflow_id] = {
"query": query,
"status": "active",
"results": {},
"start_time": time.time(),
"steps_completed": [],
"current_step": "initialization"
}
# Determine workflow type
workflow_type = self._determine_workflow_type(query)
workflow_steps = self.workflow_templates.get(workflow_type, ["research", "synthesis"])
self.active_workflows[workflow_id]["workflow_type"] = workflow_type
self.active_workflows[workflow_id]["planned_steps"] = workflow_steps
# Execute workflow steps
research_results = []
analysis_results = {}
if "research" in workflow_steps:
self.active_workflows[workflow_id]["current_step"] = "research"
research_results = await self._request_research(query)
self.active_workflows[workflow_id]["results"]["research"] = research_results
self.active_workflows[workflow_id]["steps_completed"].append("research")
if "analysis" in workflow_steps and research_results:
self.active_workflows[workflow_id]["current_step"] = "analysis"
analysis_results = await self._request_analysis(research_results, query)
self.active_workflows[workflow_id]["results"]["analysis"] = analysis_results
self.active_workflows[workflow_id]["steps_completed"].append("analysis")
if "reasoning" in workflow_steps:
self.active_workflows[workflow_id]["current_step"] = "reasoning"
# Additional reasoning step could be implemented here
self.active_workflows[workflow_id]["steps_completed"].append("reasoning")
if "synthesis" in workflow_steps:
self.active_workflows[workflow_id]["current_step"] = "synthesis"
final_response = await self._request_synthesis(query, research_results, analysis_results)
self.active_workflows[workflow_id]["results"]["synthesis"] = final_response
self.active_workflows[workflow_id]["steps_completed"].append("synthesis")
            else:
                final_response = "Unable to generate a response - this workflow did not include a synthesis step."
# Mark workflow complete
self.active_workflows[workflow_id]["status"] = "complete"
self.active_workflows[workflow_id]["end_time"] = time.time()
self.active_workflows[workflow_id]["current_step"] = "completed"
return final_response
except Exception as e:
if workflow_id in self.active_workflows:
self.active_workflows[workflow_id]["status"] = "error"
self.active_workflows[workflow_id]["error"] = str(e)
return f"I apologize, but I encountered an error coordinating the response: {str(e)}"
def _determine_workflow_type(self, query: str) -> str:
"""Determine appropriate workflow type based on query characteristics."""
query_lower = query.lower()
# Simple heuristics for workflow type determination
if any(word in query_lower for word in ["analyze", "analysis", "why", "how", "explain"]):
return "analytical_query"
elif any(word in query_lower for word in ["fact", "when", "where", "who", "what"]):
return "factual_query"
        elif len(query.split()) > 15:
            # Only length signals complexity here; checking for "?" would match
            # nearly every user query and starve the branches below
            return "complex_query"
elif any(word in query_lower for word in ["research", "find", "search", "information"]):
return "research_query"
else:
return "general_query"
async def _request_research(self, query: str) -> List[Dict[str, Any]]:
"""Request research from research agent."""
researcher = self._find_agent_by_role(AgentRole.RESEARCHER)
if not researcher:
return []
research_message = Message(
sender=self.agent_id,
recipient=researcher.agent_id,
content=query,
message_type="research_request"
)
responses = await researcher.process_message(research_message)
if responses and responses[0].message_type != "error_response":
try:
return json.loads(responses[0].content)
except json.JSONDecodeError:
return []
return []
async def _request_analysis(self, research_results: List[Dict[str, Any]], query: str) -> Dict[str, Any]:
"""Request analysis from analyzer agent."""
analyzer = self._find_agent_by_role(AgentRole.ANALYZER)
if not analyzer:
return {}
analysis_data = {
"content": research_results,
"type": "research_analysis",
"query": query
}
analysis_message = Message(
sender=self.agent_id,
recipient=analyzer.agent_id,
content=json.dumps(analysis_data),
message_type="analysis_request"
)
responses = await analyzer.process_message(analysis_message)
if responses and responses[0].message_type != "error_response":
try:
return json.loads(responses[0].content)
except json.JSONDecodeError:
return {}
return {}
async def _request_synthesis(self, query: str, research_results: List[Dict[str, Any]],
analysis_results: Dict[str, Any]) -> str:
"""Request synthesis from synthesizer agent."""
synthesizer = self._find_agent_by_role(AgentRole.SYNTHESIZER)
if not synthesizer:
return "Unable to synthesize response - synthesizer agent not available."
synthesis_data = {
"query": query,
"research_results": research_results,
"analysis_results": analysis_results
}
synthesis_message = Message(
sender=self.agent_id,
recipient=synthesizer.agent_id,
content=json.dumps(synthesis_data),
message_type="synthesis_request"
)
responses = await synthesizer.process_message(synthesis_message)
if responses and responses[0].message_type != "error_response":
return responses[0].content
return "Unable to generate synthesis."
def _find_agent_by_role(self, role: AgentRole) -> Optional[Agent]:
"""Find agent by role."""
for agent in self.agents.values():
if agent.role == role:
return agent
return None
def _get_agents_involved(self, workflow_id: str) -> List[str]:
"""Get list of agents involved in a workflow."""
if workflow_id not in self.active_workflows:
return []
workflow = self.active_workflows[workflow_id]
steps_completed = workflow.get("steps_completed", [])
agents_involved = []
if "research" in steps_completed:
agents_involved.append("researcher")
if "analysis" in steps_completed:
agents_involved.append("analyzer")
if "synthesis" in steps_completed:
agents_involved.append("synthesizer")
return agents_involved
def can_handle(self, task_type: str) -> bool:
"""Check if coordinator can handle specific task types."""
return task_type in [
"coordination", "orchestration", "workflow_management",
"task_distribution", "agent_management"
]
def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:
"""Get status of a specific workflow."""
if workflow_id not in self.active_workflows:
return {"status": "not_found"}
workflow = self.active_workflows[workflow_id]
return {
"workflow_id": workflow_id,
"status": workflow.get("status", "unknown"),
"current_step": workflow.get("current_step", "unknown"),
"steps_completed": workflow.get("steps_completed", []),
"planned_steps": workflow.get("planned_steps", []),
"start_time": workflow.get("start_time"),
"end_time": workflow.get("end_time"),
"results_available": list(workflow.get("results", {}).keys())
}
def get_system_status(self) -> Dict[str, Any]:
"""Get overall system status."""
agent_status = {}
for agent_id, agent in self.agents.items():
agent_status[agent_id] = {
"role": agent.role.value,
"is_active": agent.is_active,
"performance": agent.get_performance_metrics()
}
return {
"total_agents": len(self.agents),
"active_workflows": len([w for w in self.active_workflows.values() if w.get("status") == "active"]),
"completed_workflows": len([w for w in self.active_workflows.values() if w.get("status") == "complete"]),
"agent_status": agent_status,
"coordinator_performance": self.get_performance_metrics()
}
class MultiAgentSystem:
"""
Complete multi-agent system implementation.
This class orchestrates multiple specialized agents to provide
comprehensive responses through collaborative problem-solving.
"""
def __init__(self, llm_model=None, tokenizer=None, device=None, knowledge_base=None):
"""
Initialize multi-agent system.
Args:
llm_model: Language model for agents
tokenizer: Tokenizer for the language model
device: Device for model inference
knowledge_base: Knowledge base for research
"""
print("Initializing Multi-Agent System...")
self.llm_model = llm_model
self.tokenizer = tokenizer
self.device = device
self.knowledge_base = knowledge_base
# Initialize agents
self.agents = {}
self._initialize_agents()
# Initialize coordinator
self.coordinator = CoordinatorAgent("coordinator_001", self.agents)
self.agents["coordinator"] = self.coordinator
print(f"Multi-Agent System initialized with {len(self.agents)} agents")
def _initialize_agents(self):
"""Initialize all specialized agents."""
# Research Agent
self.agents["researcher"] = ResearchAgent(
"researcher_001",
self.knowledge_base
)
# Analyzer Agent
self.agents["analyzer"] = AnalyzerAgent(
"analyzer_001",
self.llm_model,
self.tokenizer,
self.device
)
# Synthesizer Agent
self.agents["synthesizer"] = SynthesizerAgent(
"synthesizer_001",
self.llm_model,
self.tokenizer,
self.device
)
async def process_query(self, query: str) -> Dict[str, Any]:
"""
Process user query through multi-agent system.
Args:
query (str): User query
Returns:
Dict[str, Any]: Response with metadata
"""
start_time = time.time()
# Create user message
user_message = Message(
sender="user",
recipient="coordinator",
content=query,
message_type="user_query"
)
# Process through coordinator
responses = await self.coordinator.process_message(user_message)
if responses:
response = responses[0]
workflow_status = self.coordinator.get_workflow_status(user_message.id)
return {
"response": response.content,
"workflow_id": user_message.id,
"total_time": time.time() - start_time,
"workflow_status": workflow_status,
"agents_involved": workflow_status.get("results_available", []),
"success": response.message_type != "error_response"
}
return {
"response": "No response generated",
"workflow_id": user_message.id,
"total_time": time.time() - start_time,
"workflow_status": {},
"agents_involved": [],
"success": False
}
def get_system_status(self) -> Dict[str, Any]:
"""Get comprehensive system status."""
return self.coordinator.get_system_status()
def get_agent_performance(self) -> Dict[str, Dict[str, Any]]:
"""Get performance metrics for all agents."""
performance = {}
for agent_id, agent in self.agents.items():
performance[agent_id] = {
"role": agent.role.value,
"metrics": agent.get_performance_metrics(),
"capabilities": [cap.name for cap in agent.capabilities]
}
return performance
def run_interactive_session(self):
"""Run interactive multi-agent session."""
print("\n" + "=" * 70)
print("MULTI-AGENT SYSTEM - Interactive Session")
print("=" * 70)
print("Commands:")
print(" 'quit' or 'exit' - End session")
print(" 'status' - Show system status")
print(" 'performance' - Show agent performance")
print(" 'agents' - List all agents")
print("=" * 70)
async def run_session():
while True:
try:
user_input = input(f"\nYou: ").strip()
if user_input.lower() in ['quit', 'exit', 'bye']:
print("\nMulti-Agent System: Thank you for using the system! Goodbye!")
break
elif user_input.lower() == 'status':
status = self.get_system_status()
print(f"\nSystem Status:")
print(f" Total agents: {status['total_agents']}")
print(f" Active workflows: {status['active_workflows']}")
print(f" Completed workflows: {status['completed_workflows']}")
continue
elif user_input.lower() == 'performance':
performance = self.get_agent_performance()
print(f"\nAgent Performance:")
for agent_id, perf in performance.items():
metrics = perf['metrics']
print(f" {agent_id} ({perf['role']}):")
print(f" Messages processed: {metrics['messages_processed']}")
print(f" Avg response time: {metrics['average_response_time']:.3f}s")
print(f" Success rate: {metrics['success_rate']:.2%}")
continue
elif user_input.lower() == 'agents':
print(f"\nAvailable Agents:")
for agent_id, agent in self.agents.items():
print(f" {agent_id}: {agent.role.value}")
for cap in agent.capabilities:
print(f" - {cap.name}: {cap.description}")
continue
if not user_input:
continue
# Process query
print("Multi-Agent System: Processing your request...", flush=True)
result = await self.process_query(user_input)
print(f"\nResponse: {result['response']}")
# Show metadata
if result['success']:
print(f"\nProcessing Details:")
print(f" Workflow ID: {result['workflow_id']}")
print(f" Total time: {result['total_time']:.2f}s")
print(f" Agents involved: {', '.join(result['agents_involved'])}")
except KeyboardInterrupt:
print("\n\nSession interrupted. Goodbye!")
break
except Exception as e:
print(f"\nError: {e}")
# Run async session
asyncio.run(run_session())
def create_sample_knowledge_base_for_mas():
"""Create sample knowledge base for multi-agent system demonstration."""
sample_documents = [
"""
Artificial Intelligence has revolutionized various industries through machine learning,
natural language processing, and computer vision. Companies like Google, Microsoft,
and OpenAI are leading the development of AI technologies. Machine learning algorithms
can analyze vast amounts of data to identify patterns and make predictions. Deep learning,
a subset of machine learning, uses neural networks with multiple layers to process
complex data structures.
""",
"""
Climate change represents one of the most significant challenges of our time. Rising
global temperatures are causing ice caps to melt, sea levels to rise, and weather
patterns to become more extreme. Renewable energy sources like solar, wind, and
hydroelectric power offer sustainable alternatives to fossil fuels. Many countries
are implementing carbon reduction policies and investing in green technologies to
combat climate change.
""",
"""
Space exploration has entered a new era with private companies like SpaceX, Blue Origin,
and Virgin Galactic joining traditional space agencies. The International Space Station
serves as a platform for scientific research and international cooperation. Mars
exploration missions are planned by NASA, ESA, and other space agencies. The James
Webb Space Telescope is providing unprecedented views of distant galaxies and
exoplanets.
""",
"""
Quantum computing represents a paradigm shift in computational power. Unlike classical
computers that use bits, quantum computers use quantum bits (qubits) that can exist
in multiple states simultaneously. Companies like IBM, Google, and Rigetti are
developing quantum processors. Quantum computers could revolutionize cryptography,
drug discovery, and optimization problems that are intractable for classical computers.
""",
"""
Biotechnology and genetic engineering are transforming medicine and agriculture.
CRISPR-Cas9 gene editing technology allows precise modifications to DNA sequences.
Personalized medicine uses genetic information to tailor treatments to individual
patients. Synthetic biology combines engineering principles with biological systems
to create new organisms and biological functions. These advances raise important
ethical considerations about genetic modification and privacy.
"""
]
sources = [
"AI_Technology_Overview",
"Climate_Change_Report",
"Space_Exploration_Update",
"Quantum_Computing_Guide",
"Biotechnology_Advances"
]
return sample_documents, sources
def main():
"""
Main function demonstrating multi-agent system functionality.
"""
try:
# Initialize components
print("Setting up Multi-Agent System...")
# Initialize device
if torch.cuda.is_available():
device = torch.device("cuda")
elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")
print(f"Using device: {device}")
# Initialize LLM
print("Loading language model...")
tokenizer = AutoTokenizer.from_pretrained("microsoft/DialoGPT-medium")
model = AutoModelForCausalLM.from_pretrained("microsoft/DialoGPT-medium")
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
model.to(device)
model.eval()
# Initialize knowledge base
print("Creating knowledge base...")
from chapter_b_rag import KnowledgeBase # Reuse from previous chapter
knowledge_base = KnowledgeBase()
documents, sources = create_sample_knowledge_base_for_mas()
knowledge_base.add_documents(documents, sources)
# Initialize multi-agent system
mas = MultiAgentSystem(
llm_model=model,
tokenizer=tokenizer,
device=device,
knowledge_base=knowledge_base
)
# Show system status
status = mas.get_system_status()
print(f"\nSystem ready with {status['total_agents']} agents:")
for agent_id, agent_info in status['agent_status'].items():
print(f" {agent_id}: {agent_info['role']}")
# Run interactive session
mas.run_interactive_session()
except Exception as e:
print(f"Failed to initialize Multi-Agent System: {e}")
print("Please check your installation and try again.")
if __name__ == "__main__":
main()
This multi-agent implementation demonstrates collaborative AI in practice: specialized agents contribute their individual strengths while a central coordinator keeps the workflow coherent, yielding stronger problem-solving than any single agent could achieve alone.
CHAPTER E: IMPLEMENTING AGENTIC AI
Agentic AI represents the pinnacle of autonomous artificial intelligence systems, characterized by agents that can independently plan, execute, and adapt their actions to achieve complex goals. Unlike traditional reactive systems that respond to specific inputs, agentic AI systems demonstrate proactive behavior, goal-oriented reasoning, and the ability to learn and improve from experience.
The fundamental distinction of agentic AI lies in its autonomous decision-making capabilities. These systems can formulate plans, execute actions, monitor progress, and adapt strategies based on changing circumstances. Agentic AI combines elements of planning algorithms, reinforcement learning, and multi-step reasoning to create intelligent agents capable of operating independently in complex environments.
The rationale for implementing agentic AI stems from the need for systems that can handle open-ended, long-term tasks without constant human supervision. Traditional chatbots and even multi-agent systems typically require explicit instructions for each interaction. Agentic AI systems can pursue high-level objectives through autonomous planning and execution, making them suitable for complex workflows, research tasks, and creative problem-solving.
Agentic AI architecture incorporates several sophisticated components working in harmony. Goal management systems define and prioritize objectives. Planning engines decompose complex goals into actionable steps. Execution engines carry out planned actions while monitoring progress. Learning mechanisms enable agents to improve performance over time. Memory systems maintain context and experience across interactions.
The implementation of agentic AI requires advanced frameworks that support autonomous reasoning, planning, and execution. We will build upon our previous multi-agent foundation while adding goal-oriented behavior, planning capabilities, and autonomous execution mechanisms.
Setting up agentic AI extends our existing infrastructure with planning and goal management capabilities:
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Any, Optional, Callable, Union
import json
import time
import uuid
from collections import deque
import heapq
class GoalStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
PAUSED = "paused"
CANCELLED = "cancelled"
class ActionType(Enum):
RESEARCH = "research"
ANALYZE = "analyze"
SYNTHESIZE = "synthesize"
COMMUNICATE = "communicate"
PLAN = "plan"
EXECUTE = "execute"
MONITOR = "monitor"
LEARN = "learn"
@dataclass
class Goal:
id: str = field(default_factory=lambda: str(uuid.uuid4()))
description: str = ""
priority: int = 1 # 1 = highest priority
status: GoalStatus = GoalStatus.PENDING
parent_goal_id: Optional[str] = None
sub_goals: List[str] = field(default_factory=list)
required_capabilities: List[str] = field(default_factory=list)
success_criteria: Dict[str, Any] = field(default_factory=dict)
deadline: Optional[float] = None
created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time)
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class Action:
id: str = field(default_factory=lambda: str(uuid.uuid4()))
type: ActionType = ActionType.EXECUTE
description: str = ""
parameters: Dict[str, Any] = field(default_factory=dict)
prerequisites: List[str] = field(default_factory=list)
expected_outcome: str = ""
estimated_duration: float = 0.0
actual_duration: Optional[float] = None
status: str = "pending"
result: Optional[Dict[str, Any]] = None
created_at: float = field(default_factory=time.time)
@dataclass
class Plan:
id: str = field(default_factory=lambda: str(uuid.uuid4()))
goal_id: str = ""
actions: List[Action] = field(default_factory=list)
execution_order: List[str] = field(default_factory=list)
status: str = "draft"
confidence: float = 0.0
estimated_total_time: float = 0.0
created_at: float = field(default_factory=time.time)
metadata: Dict[str, Any] = field(default_factory=dict)
The goal management system provides the foundation for autonomous behavior by defining objectives, tracking progress, and managing goal hierarchies:
class GoalManager:
def __init__(self):
self.goals: Dict[str, Goal] = {}
self.goal_hierarchy: Dict[str, List[str]] = {}
self.active_goals: List[str] = []
self.completed_goals: List[str] = []
self.goal_priorities = [] # Min-heap for priority queue
def create_goal(self, description: str, priority: int = 1,
parent_goal_id: Optional[str] = None,
required_capabilities: List[str] = None,
success_criteria: Dict[str, Any] = None,
deadline: Optional[float] = None) -> Goal:
"""Create a new goal with specified parameters."""
goal = Goal(
description=description,
priority=priority,
parent_goal_id=parent_goal_id,
required_capabilities=required_capabilities or [],
success_criteria=success_criteria or {},
deadline=deadline
)
self.goals[goal.id] = goal
# Update goal hierarchy
if parent_goal_id:
if parent_goal_id not in self.goal_hierarchy:
self.goal_hierarchy[parent_goal_id] = []
self.goal_hierarchy[parent_goal_id].append(goal.id)
# Add to parent's sub_goals
if parent_goal_id in self.goals:
self.goals[parent_goal_id].sub_goals.append(goal.id)
# Add to priority queue
heapq.heappush(self.goal_priorities, (priority, time.time(), goal.id))
return goal
def update_goal_status(self, goal_id: str, status: GoalStatus,
metadata: Dict[str, Any] = None):
"""Update goal status and metadata."""
if goal_id in self.goals:
goal = self.goals[goal_id]
goal.status = status
goal.updated_at = time.time()
if metadata:
goal.metadata.update(metadata)
# Update tracking lists
if status == GoalStatus.IN_PROGRESS and goal_id not in self.active_goals:
self.active_goals.append(goal_id)
elif status == GoalStatus.COMPLETED:
if goal_id in self.active_goals:
self.active_goals.remove(goal_id)
if goal_id not in self.completed_goals:
self.completed_goals.append(goal_id)
def get_next_goal(self) -> Optional[Goal]:
"""Get the next highest priority goal to work on."""
while self.goal_priorities:
priority, timestamp, goal_id = heapq.heappop(self.goal_priorities)
if goal_id in self.goals:
goal = self.goals[goal_id]
if goal.status == GoalStatus.PENDING:
return goal
return None
def decompose_goal(self, goal_id: str, max_sub_goals: int = 5) -> List[Goal]:
"""Decompose a complex goal into smaller sub-goals."""
if goal_id not in self.goals:
return []
goal = self.goals[goal_id]
sub_goals = []
# Simple goal decomposition logic
if "research" in goal.description.lower():
sub_goals.extend([
self.create_goal(
f"Gather information for: {goal.description}",
priority=goal.priority + 1,
parent_goal_id=goal_id,
required_capabilities=["research"]
),
self.create_goal(
f"Analyze findings for: {goal.description}",
priority=goal.priority + 1,
parent_goal_id=goal_id,
required_capabilities=["analysis"]
)
])
if "analyze" in goal.description.lower():
sub_goals.append(
self.create_goal(
f"Synthesize analysis for: {goal.description}",
priority=goal.priority + 1,
parent_goal_id=goal_id,
required_capabilities=["synthesis"]
)
)
return sub_goals[:max_sub_goals]
def get_goal_progress(self, goal_id: str) -> Dict[str, Any]:
"""Get progress information for a goal."""
if goal_id not in self.goals:
return {}
goal = self.goals[goal_id]
sub_goal_ids = self.goal_hierarchy.get(goal_id, [])
if not sub_goal_ids:
return {
"goal_id": goal_id,
"status": goal.status.value,
"progress": 1.0 if goal.status == GoalStatus.COMPLETED else 0.0,
"sub_goals": 0
}
completed_sub_goals = sum(
1 for sub_id in sub_goal_ids
if sub_id in self.goals and self.goals[sub_id].status == GoalStatus.COMPLETED
)
progress = completed_sub_goals / len(sub_goal_ids) if sub_goal_ids else 0.0
return {
"goal_id": goal_id,
"status": goal.status.value,
"progress": progress,
"sub_goals": len(sub_goal_ids),
"completed_sub_goals": completed_sub_goals
}
The planning engine creates detailed execution plans for achieving goals, incorporating action sequencing, resource allocation, and contingency planning:
class PlanningEngine:
def __init__(self, available_capabilities: List[str]):
self.available_capabilities = available_capabilities
self.planning_strategies = {
"sequential": self._create_sequential_plan,
"parallel": self._create_parallel_plan,
"adaptive": self._create_adaptive_plan
}
self.action_templates = self._initialize_action_templates()
def _initialize_action_templates(self) -> Dict[str, Dict[str, Any]]:
"""Initialize templates for different action types."""
return {
"research": {
"type": ActionType.RESEARCH,
"estimated_duration": 30.0,
"required_capabilities": ["research", "information_retrieval"]
},
"analyze": {
"type": ActionType.ANALYZE,
"estimated_duration": 45.0,
"required_capabilities": ["analysis", "reasoning"]
},
"synthesize": {
"type": ActionType.SYNTHESIZE,
"estimated_duration": 20.0,
"required_capabilities": ["synthesis", "integration"]
},
"communicate": {
"type": ActionType.COMMUNICATE,
"estimated_duration": 10.0,
"required_capabilities": ["communication", "formatting"]
}
}
def create_plan(self, goal: Goal, strategy: str = "adaptive") -> Plan:
"""Create an execution plan for a given goal."""
if strategy not in self.planning_strategies:
strategy = "adaptive"
planning_function = self.planning_strategies[strategy]
return planning_function(goal)
def _create_sequential_plan(self, goal: Goal) -> Plan:
"""Create a sequential execution plan."""
actions = []
execution_order = []
# Determine required actions based on goal
required_actions = self._analyze_goal_requirements(goal)
for i, action_type in enumerate(required_actions):
action = self._create_action(action_type, goal, f"step_{i+1}")
actions.append(action)
execution_order.append(action.id)
# Set prerequisites for sequential execution
for i in range(1, len(actions)):
actions[i].prerequisites = [actions[i-1].id]
plan = Plan(
goal_id=goal.id,
actions=actions,
execution_order=execution_order,
status="ready",
confidence=0.8,
estimated_total_time=sum(action.estimated_duration for action in actions)
)
return plan
def _create_parallel_plan(self, goal: Goal) -> Plan:
"""Create a parallel execution plan where possible."""
actions = []
execution_order = []
required_actions = self._analyze_goal_requirements(goal)
# Group actions that can run in parallel
parallel_groups = self._group_parallel_actions(required_actions)
for group_index, action_group in enumerate(parallel_groups):
group_actions = []
for action_type in action_group:
action = self._create_action(action_type, goal, f"group_{group_index}")
actions.append(action)
group_actions.append(action.id)
# Actions in the same group can run in parallel
execution_order.extend(group_actions)
# Set prerequisites between groups
if group_index > 0:
                prev_group_actions = [a.id for a in actions
                                      if a.parameters.get("context") == f"group_{group_index-1}"]
for action in group_actions:
action_obj = next(a for a in actions if a.id == action)
action_obj.prerequisites = prev_group_actions
plan = Plan(
goal_id=goal.id,
actions=actions,
execution_order=execution_order,
status="ready",
confidence=0.7,
            estimated_total_time=sum(
                max(self.action_templates.get(a, {}).get("estimated_duration", 30.0)
                    for a in group)
                for group in parallel_groups
            ) if parallel_groups else 0.0
)
return plan
def _create_adaptive_plan(self, goal: Goal) -> Plan:
"""Create an adaptive plan that can be modified during execution."""
# Start with sequential plan
base_plan = self._create_sequential_plan(goal)
# Add monitoring and adaptation actions
monitor_action = Action(
type=ActionType.MONITOR,
description=f"Monitor progress for goal: {goal.description}",
parameters={"goal_id": goal.id, "check_interval": 30.0},
estimated_duration=5.0
)
adapt_action = Action(
type=ActionType.PLAN,
description=f"Adapt plan if needed for goal: {goal.description}",
parameters={"goal_id": goal.id, "adaptation_threshold": 0.3},
estimated_duration=15.0
)
        base_plan.actions.extend([monitor_action, adapt_action])
        base_plan.execution_order.extend([monitor_action.id, adapt_action.id])
base_plan.confidence = 0.9
base_plan.metadata["adaptive"] = True
return base_plan
def _analyze_goal_requirements(self, goal: Goal) -> List[str]:
"""Analyze goal to determine required actions."""
required_actions = []
description_lower = goal.description.lower()
# Simple rule-based action determination
if any(word in description_lower for word in ["research", "find", "search", "investigate"]):
required_actions.append("research")
if any(word in description_lower for word in ["analyze", "examine", "study", "evaluate"]):
required_actions.append("analyze")
if any(word in description_lower for word in ["synthesize", "combine", "integrate", "summarize"]):
required_actions.append("synthesize")
if any(word in description_lower for word in ["communicate", "report", "present", "explain"]):
required_actions.append("communicate")
# Default actions if none detected
if not required_actions:
required_actions = ["research", "analyze", "synthesize"]
return required_actions
def _group_parallel_actions(self, actions: List[str]) -> List[List[str]]:
"""Group actions that can be executed in parallel."""
# Simple grouping logic - research can be parallel, others sequential
groups = []
current_group = []
for action in actions:
if action == "research" and not current_group:
current_group.append(action)
elif action == "research" and current_group and all(a == "research" for a in current_group):
current_group.append(action)
else:
if current_group:
groups.append(current_group)
current_group = [action]
if current_group:
groups.append(current_group)
return groups
def _create_action(self, action_type: str, goal: Goal, context: str) -> Action:
"""Create an action based on type and context."""
template = self.action_templates.get(action_type, {})
action = Action(
type=template.get("type", ActionType.EXECUTE),
description=f"{action_type.title()} action for goal: {goal.description} ({context})",
parameters={
"goal_id": goal.id,
"action_type": action_type,
"context": context
},
expected_outcome=f"Completed {action_type} for {goal.description}",
estimated_duration=template.get("estimated_duration", 30.0)
)
return action
The autonomous execution engine carries out planned actions while monitoring progress and adapting to changing circumstances:
class AutonomousExecutor:
    def __init__(self, agents: Dict[str, Any], goal_manager: GoalManager,
                 planning_engine: PlanningEngine):
        self.agents = agents
        self.goal_manager = goal_manager
        self.planning_engine = planning_engine
        self.active_plans: Dict[str, Plan] = {}
        self.execution_history: List[Dict[str, Any]] = []
        self.learning_data: Dict[str, List[float]] = {}

    async def execute_goal(self, goal_id: str) -> Dict[str, Any]:
        """Execute a goal autonomously through planning and action execution."""
        if goal_id not in self.goal_manager.goals:
            return {"success": False, "error": "Goal not found"}
        goal = self.goal_manager.goals[goal_id]
        try:
            # Update goal status
            self.goal_manager.update_goal_status(goal_id, GoalStatus.IN_PROGRESS)
            # Create execution plan
            plan = self.planning_engine.create_plan(goal)
            self.active_plans[goal_id] = plan
            # Execute plan
            execution_result = await self._execute_plan(plan)
            # Update goal status based on execution result
            if execution_result["success"]:
                self.goal_manager.update_goal_status(
                    goal_id,
                    GoalStatus.COMPLETED,
                    {"completion_time": time.time(), "execution_result": execution_result}
                )
            else:
                self.goal_manager.update_goal_status(
                    goal_id,
                    GoalStatus.FAILED,
                    {"failure_time": time.time(), "failure_reason": execution_result.get("error")}
                )
            # Record execution for learning
            self._record_execution(goal, plan, execution_result)
            return execution_result
        except Exception as e:
            self.goal_manager.update_goal_status(
                goal_id,
                GoalStatus.FAILED,
                {"failure_time": time.time(), "error": str(e)}
            )
            return {"success": False, "error": str(e)}

    async def _execute_plan(self, plan: Plan) -> Dict[str, Any]:
        """Execute a plan by running its actions in order."""
        execution_start = time.time()
        executed_actions = []
        results = {}
        try:
            for action_id in plan.execution_order:
                action = next((a for a in plan.actions if a.id == action_id), None)
                if not action:
                    continue
                # Check prerequisites
                if not await self._check_prerequisites(action, executed_actions):
                    return {
                        "success": False,
                        "error": f"Prerequisites not met for action {action_id}",
                        "executed_actions": executed_actions
                    }
                # Execute action
                action_result = await self._execute_action(action)
                action.result = action_result
                action.status = "completed" if action_result.get("success") else "failed"
                executed_actions.append(action_id)
                results[action_id] = action_result
                # Check if action failed and plan should be aborted
                if not action_result.get("success") and not plan.metadata.get("adaptive"):
                    return {
                        "success": False,
                        "error": f"Action {action_id} failed: {action_result.get('error')}",
                        "executed_actions": executed_actions,
                        "results": results
                    }
                # For adaptive plans, try to recover from failures
                if not action_result.get("success") and plan.metadata.get("adaptive"):
                    recovery_result = await self._attempt_recovery(action, plan)
                    if recovery_result.get("success"):
                        action.result = recovery_result
                        action.status = "recovered"
                        results[action_id] = recovery_result
            execution_time = time.time() - execution_start
            return {
                "success": True,
                "execution_time": execution_time,
                "executed_actions": executed_actions,
                "results": results,
                "plan_id": plan.id
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "executed_actions": executed_actions,
                "results": results
            }

    async def _execute_action(self, action: Action) -> Dict[str, Any]:
        """Execute a single action using the appropriate agent."""
        action_start = time.time()
        try:
            # Determine which agent to use based on action type
            agent = self._select_agent_for_action(action)
            if not agent:
                return {
                    "success": False,
                    "error": f"No suitable agent found for action type {action.type.value}"
                }
            # Prepare action parameters
            action_params = self._prepare_action_parameters(action)
            # Execute action through agent
            if action.type == ActionType.RESEARCH:
                result = await self._execute_research_action(agent, action_params)
            elif action.type == ActionType.ANALYZE:
                result = await self._execute_analysis_action(agent, action_params)
            elif action.type == ActionType.SYNTHESIZE:
                result = await self._execute_synthesis_action(agent, action_params)
            elif action.type == ActionType.COMMUNICATE:
                result = await self._execute_communication_action(agent, action_params)
            else:
                result = {"success": False, "error": f"Unknown action type: {action.type.value}"}
            # Record actual duration
            action.actual_duration = time.time() - action_start
            return result
        except Exception as e:
            action.actual_duration = time.time() - action_start
            return {"success": False, "error": str(e)}

    def _select_agent_for_action(self, action: Action) -> Optional[Any]:
        """Select the most appropriate agent for an action."""
        action_type = action.type
        if action_type == ActionType.RESEARCH and "researcher" in self.agents:
            return self.agents["researcher"]
        elif action_type == ActionType.ANALYZE and "analyzer" in self.agents:
            return self.agents["analyzer"]
        elif action_type == ActionType.SYNTHESIZE and "synthesizer" in self.agents:
            return self.agents["synthesizer"]
        elif "coordinator" in self.agents:
            return self.agents["coordinator"]  # Fallback to coordinator
        return None

    def _prepare_action_parameters(self, action: Action) -> Dict[str, Any]:
        """Prepare parameters for action execution."""
        base_params = action.parameters.copy()
        base_params.update({
            "action_id": action.id,
            "action_description": action.description,
            "expected_outcome": action.expected_outcome
        })
        return base_params

    async def _execute_research_action(self, agent: Any, params: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a research action through the research agent."""
        try:
            from chapter_d_multiagent import Message  # Import from previous chapter
            research_message = Message(
                sender="executor",
                recipient=agent.agent_id,
                content=params.get("action_description", ""),
                message_type="research_request"
            )
            responses = await agent.process_message(research_message)
            if responses and responses[0].message_type != "error_response":
                return {
                    "success": True,
                    "data": json.loads(responses[0].content),
                    "agent_used": agent.agent_id
                }
            else:
                return {
                    "success": False,
                    "error": "Research agent failed to provide results"
                }
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def _execute_analysis_action(self, agent: Any, params: Dict[str, Any]) -> Dict[str, Any]:
        """Execute an analysis action through the analyzer agent."""
        try:
            from chapter_d_multiagent import Message
            analysis_data = {
                "content": params.get("research_data", []),
                "type": "goal_analysis",
                "query": params.get("action_description", "")
            }
            analysis_message = Message(
                sender="executor",
                recipient=agent.agent_id,
                content=json.dumps(analysis_data),
                message_type="analysis_request"
            )
            responses = await agent.process_message(analysis_message)
            if responses and responses[0].message_type != "error_response":
                return {
                    "success": True,
                    "data": json.loads(responses[0].content),
                    "agent_used": agent.agent_id
                }
            else:
                return {
                    "success": False,
                    "error": "Analysis agent failed to provide results"
                }
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def _execute_synthesis_action(self, agent: Any, params: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a synthesis action through the synthesizer agent."""
        try:
            from chapter_d_multiagent import Message
            synthesis_data = {
                "query": params.get("action_description", ""),
                "research_results": params.get("research_data", []),
                "analysis_results": params.get("analysis_data", {})
            }
            synthesis_message = Message(
                sender="executor",
                recipient=agent.agent_id,
                content=json.dumps(synthesis_data),
                message_type="synthesis_request"
            )
            responses = await agent.process_message(synthesis_message)
            if responses and responses[0].message_type != "error_response":
                return {
                    "success": True,
                    "data": responses[0].content,
                    "agent_used": agent.agent_id
                }
            else:
                return {
                    "success": False,
                    "error": "Synthesis agent failed to provide results"
                }
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def _execute_communication_action(self, agent: Any, params: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a communication action."""
        try:
            # Simple communication action - format and present results
            content = params.get("synthesis_data", "")
            formatted_content = f"Communication Result: {content}"
            return {
                "success": True,
                "data": formatted_content,
                "agent_used": "communication_handler"
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def _check_prerequisites(self, action: Action, executed_actions: List[str]) -> bool:
        """Check whether an action's prerequisites are satisfied."""
        return all(prereq in executed_actions for prereq in action.prerequisites)

    async def _attempt_recovery(self, failed_action: Action, plan: Plan) -> Dict[str, Any]:
        """Attempt to recover from an action failure."""
        # Simple recovery strategy - retry with modified parameters
        try:
            modified_params = failed_action.parameters.copy()
            modified_params["retry"] = True
            modified_params["recovery_attempt"] = True
            # Create a simplified version of the action
            recovery_action = Action(
                type=failed_action.type,
                description=f"Recovery attempt for: {failed_action.description}",
                parameters=modified_params,
                estimated_duration=failed_action.estimated_duration * 0.5
            )
            return await self._execute_action(recovery_action)
        except Exception as e:
            return {"success": False, "error": f"Recovery failed: {str(e)}"}

    def _record_execution(self, goal: Goal, plan: Plan, execution_result: Dict[str, Any]):
        """Record execution data for learning and improvement."""
        execution_record = {
            "goal_id": goal.id,
            "goal_description": goal.description,
            "plan_id": plan.id,
            "execution_time": execution_result.get("execution_time", 0.0),
            "success": execution_result.get("success", False),
            "actions_count": len(plan.actions),
            "timestamp": time.time()
        }
        self.execution_history.append(execution_record)
        # Update learning data
        goal_type = self._classify_goal_type(goal.description)
        if goal_type not in self.learning_data:
            self.learning_data[goal_type] = []
        self.learning_data[goal_type].append(execution_result.get("execution_time", 0.0))

    def _classify_goal_type(self, description: str) -> str:
        """Classify the goal type for learning purposes."""
        description_lower = description.lower()
        if "research" in description_lower:
            return "research_goal"
        elif "analyze" in description_lower:
            return "analysis_goal"
        elif "synthesize" in description_lower:
            return "synthesis_goal"
        else:
            return "general_goal"

    def get_execution_statistics(self) -> Dict[str, Any]:
        """Get execution statistics for performance monitoring."""
        if not self.execution_history:
            return {"total_executions": 0}
        total_executions = len(self.execution_history)
        successful_executions = sum(1 for record in self.execution_history if record["success"])
        avg_execution_time = sum(record["execution_time"] for record in self.execution_history) / total_executions
        return {
            "total_executions": total_executions,
            "successful_executions": successful_executions,
            "success_rate": successful_executions / total_executions,
            "average_execution_time": avg_execution_time,
            "learning_data_points": sum(len(data) for data in self.learning_data.values())
        }
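Stripped of agent dispatch and recovery, the executor's core loop reduces to a few lines: walk the execution order, refuse to run any action whose prerequisites have not completed, and report what was executed. The sketch below illustrates that pattern only, with plain dicts standing in for `Action` objects and `asyncio.sleep` standing in for real agent work.

```python
import asyncio
from typing import Any, Dict, List

async def run_plan(actions: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Run actions in order, gating each on its prerequisites (illustrative only)."""
    executed: List[str] = []
    for action in actions:
        # An action may only run once all of its prerequisites have executed
        if not all(p in executed for p in action.get("prerequisites", [])):
            return {"success": False, "error": f"Prerequisites not met for {action['id']}"}
        await asyncio.sleep(0)  # stand-in for dispatching to an agent
        executed.append(action["id"])
    return {"success": True, "executed_actions": executed}

plan = [
    {"id": "research"},
    {"id": "analyze", "prerequisites": ["research"]},
    {"id": "synthesize", "prerequisites": ["analyze"]},
]
print(asyncio.run(run_plan(plan)))
```

The real `_execute_plan` layers agent selection, result recording, and adaptive recovery on top of exactly this skeleton.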
COMPLETE RUNNING EXAMPLE FOR AGENTIC AI:
import asyncio
import json
import time
import uuid
import heapq
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Any, Optional, Union
from collections import deque
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import warnings
warnings.filterwarnings("ignore")
# Import classes from previous chapters
# Note: In a real implementation, these would be proper imports
# For this example, we'll include simplified versions
class GoalStatus(Enum):
    """Enumeration of possible goal statuses."""
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    PAUSED = "paused"
    CANCELLED = "cancelled"

class ActionType(Enum):
    """Enumeration of action types in the agentic system."""
    RESEARCH = "research"
    ANALYZE = "analyze"
    SYNTHESIZE = "synthesize"
    COMMUNICATE = "communicate"
    PLAN = "plan"
    EXECUTE = "execute"
    MONITOR = "monitor"
    LEARN = "learn"

@dataclass
class Goal:
    """
    Represents a goal in the agentic AI system.

    Goals can be hierarchical with parent-child relationships and
    include success criteria, deadlines, and priority levels.
    """
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    description: str = ""
    priority: int = 1  # 1 = highest priority
    status: GoalStatus = GoalStatus.PENDING
    parent_goal_id: Optional[str] = None
    sub_goals: List[str] = field(default_factory=list)
    required_capabilities: List[str] = field(default_factory=list)
    success_criteria: Dict[str, Any] = field(default_factory=dict)
    deadline: Optional[float] = None
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class Action:
    """
    Represents an action that can be executed by the agentic system.

    Actions have types, parameters, prerequisites, and expected outcomes.
    """
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    type: ActionType = ActionType.EXECUTE
    description: str = ""
    parameters: Dict[str, Any] = field(default_factory=dict)
    prerequisites: List[str] = field(default_factory=list)
    expected_outcome: str = ""
    estimated_duration: float = 0.0
    actual_duration: Optional[float] = None
    status: str = "pending"
    result: Optional[Dict[str, Any]] = None
    created_at: float = field(default_factory=time.time)

@dataclass
class Plan:
    """
    Represents an execution plan for achieving a goal.

    Plans contain sequences of actions with execution order and metadata.
    """
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    goal_id: str = ""
    actions: List[Action] = field(default_factory=list)
    execution_order: List[str] = field(default_factory=list)
    status: str = "draft"
    confidence: float = 0.0
    estimated_total_time: float = 0.0
    created_at: float = field(default_factory=time.time)
    metadata: Dict[str, Any] = field(default_factory=dict)
class GoalManager:
    """
    Manages goals in the agentic AI system.

    This class handles goal creation, status updates, hierarchical relationships,
    and priority-based goal selection for autonomous execution.
    """
    def __init__(self):
        """Initialize the goal manager."""
        self.goals: Dict[str, Goal] = {}
        self.goal_hierarchy: Dict[str, List[str]] = {}
        self.active_goals: List[str] = []
        self.completed_goals: List[str] = []
        self.goal_priorities = []  # Min-heap for priority queue
        self.goal_statistics = {
            "total_created": 0,
            "total_completed": 0,
            "total_failed": 0
        }

    def create_goal(self, description: str, priority: int = 1,
                    parent_goal_id: Optional[str] = None,
                    required_capabilities: List[str] = None,
                    success_criteria: Dict[str, Any] = None,
                    deadline: Optional[float] = None) -> Goal:
        """
        Create a new goal with specified parameters.

        Args:
            description (str): Goal description
            priority (int): Priority level (1 = highest)
            parent_goal_id (str): Parent goal ID for hierarchical goals
            required_capabilities (List[str]): Required capabilities
            success_criteria (Dict[str, Any]): Success criteria
            deadline (float): Deadline timestamp

        Returns:
            Goal: Created goal object
        """
        goal = Goal(
            description=description,
            priority=priority,
            parent_goal_id=parent_goal_id,
            required_capabilities=required_capabilities or [],
            success_criteria=success_criteria or {},
            deadline=deadline
        )
        self.goals[goal.id] = goal
        self.goal_statistics["total_created"] += 1
        # Update goal hierarchy
        if parent_goal_id:
            if parent_goal_id not in self.goal_hierarchy:
                self.goal_hierarchy[parent_goal_id] = []
            self.goal_hierarchy[parent_goal_id].append(goal.id)
            # Add to parent's sub_goals
            if parent_goal_id in self.goals:
                self.goals[parent_goal_id].sub_goals.append(goal.id)
        # Add to priority queue
        heapq.heappush(self.goal_priorities, (priority, time.time(), goal.id))
        print(f"Created goal: {goal.description[:50]}...")
        return goal

    def update_goal_status(self, goal_id: str, status: GoalStatus,
                           metadata: Dict[str, Any] = None):
        """
        Update goal status and associated metadata.

        Args:
            goal_id (str): Goal identifier
            status (GoalStatus): New status
            metadata (Dict[str, Any]): Additional metadata
        """
        if goal_id in self.goals:
            goal = self.goals[goal_id]
            old_status = goal.status
            goal.status = status
            goal.updated_at = time.time()
            if metadata:
                goal.metadata.update(metadata)
            # Update tracking lists
            if status == GoalStatus.IN_PROGRESS and goal_id not in self.active_goals:
                self.active_goals.append(goal_id)
            elif status == GoalStatus.COMPLETED:
                if goal_id in self.active_goals:
                    self.active_goals.remove(goal_id)
                if goal_id not in self.completed_goals:
                    self.completed_goals.append(goal_id)
                self.goal_statistics["total_completed"] += 1
            elif status == GoalStatus.FAILED:
                if goal_id in self.active_goals:
                    self.active_goals.remove(goal_id)
                self.goal_statistics["total_failed"] += 1
            print(f"Goal status updated: {old_status.value} -> {status.value}")

    def get_next_goal(self) -> Optional[Goal]:
        """
        Get the next highest priority goal to work on.

        Returns:
            Optional[Goal]: Next goal to execute or None if no pending goals
        """
        while self.goal_priorities:
            priority, timestamp, goal_id = heapq.heappop(self.goal_priorities)
            if goal_id in self.goals:
                goal = self.goals[goal_id]
                if goal.status == GoalStatus.PENDING:
                    return goal
        return None

    def decompose_goal(self, goal_id: str, max_sub_goals: int = 5) -> List[Goal]:
        """
        Decompose a complex goal into smaller sub-goals.

        Args:
            goal_id (str): Goal to decompose
            max_sub_goals (int): Maximum number of sub-goals to create

        Returns:
            List[Goal]: List of created sub-goals
        """
        if goal_id not in self.goals:
            return []
        goal = self.goals[goal_id]
        sub_goals = []
        print(f"Decomposing goal: {goal.description}")
        # Intelligent goal decomposition based on description
        description_lower = goal.description.lower()
        if "research" in description_lower or "investigate" in description_lower:
            sub_goals.append(
                self.create_goal(
                    f"Gather information about: {goal.description}",
                    priority=goal.priority + 1,
                    parent_goal_id=goal_id,
                    required_capabilities=["research", "information_retrieval"]
                )
            )
            sub_goals.append(
                self.create_goal(
                    f"Analyze gathered information for: {goal.description}",
                    priority=goal.priority + 1,
                    parent_goal_id=goal_id,
                    required_capabilities=["analysis", "reasoning"]
                )
            )
        if "analyze" in description_lower or "study" in description_lower:
            sub_goals.append(
                self.create_goal(
                    f"Perform detailed analysis for: {goal.description}",
                    priority=goal.priority + 1,
                    parent_goal_id=goal_id,
                    required_capabilities=["analysis", "pattern_recognition"]
                )
            )
            sub_goals.append(
                self.create_goal(
                    f"Synthesize analysis results for: {goal.description}",
                    priority=goal.priority + 1,
                    parent_goal_id=goal_id,
                    required_capabilities=["synthesis", "integration"]
                )
            )
        if "create" in description_lower or "generate" in description_lower:
            sub_goals.append(
                self.create_goal(
                    f"Plan creation process for: {goal.description}",
                    priority=goal.priority + 1,
                    parent_goal_id=goal_id,
                    required_capabilities=["planning", "design"]
                )
            )
            sub_goals.append(
                self.create_goal(
                    f"Execute creation for: {goal.description}",
                    priority=goal.priority + 1,
                    parent_goal_id=goal_id,
                    required_capabilities=["execution", "implementation"]
                )
            )
        # Default decomposition if no specific patterns found
        if not sub_goals:
            sub_goals.extend([
                self.create_goal(
                    f"Research phase for: {goal.description}",
                    priority=goal.priority + 1,
                    parent_goal_id=goal_id,
                    required_capabilities=["research"]
                ),
                self.create_goal(
                    f"Analysis phase for: {goal.description}",
                    priority=goal.priority + 1,
                    parent_goal_id=goal_id,
                    required_capabilities=["analysis"]
                ),
                self.create_goal(
                    f"Synthesis phase for: {goal.description}",
                    priority=goal.priority + 1,
                    parent_goal_id=goal_id,
                    required_capabilities=["synthesis"]
                )
            ])
        print(f"Created {len(sub_goals)} sub-goals")
        return sub_goals[:max_sub_goals]

    def get_goal_progress(self, goal_id: str) -> Dict[str, Any]:
        """
        Get progress information for a goal.

        Args:
            goal_id (str): Goal identifier

        Returns:
            Dict[str, Any]: Progress information
        """
        if goal_id not in self.goals:
            return {}
        goal = self.goals[goal_id]
        sub_goal_ids = self.goal_hierarchy.get(goal_id, [])
        if not sub_goal_ids:
            return {
                "goal_id": goal_id,
                "status": goal.status.value,
                "progress": 1.0 if goal.status == GoalStatus.COMPLETED else 0.0,
                "sub_goals": 0,
                "description": goal.description
            }
        completed_sub_goals = sum(
            1 for sub_id in sub_goal_ids
            if sub_id in self.goals and self.goals[sub_id].status == GoalStatus.COMPLETED
        )
        progress = completed_sub_goals / len(sub_goal_ids) if sub_goal_ids else 0.0
        return {
            "goal_id": goal_id,
            "status": goal.status.value,
            "progress": progress,
            "sub_goals": len(sub_goal_ids),
            "completed_sub_goals": completed_sub_goals,
            "description": goal.description
        }

    def get_statistics(self) -> Dict[str, Any]:
        """Get goal management statistics."""
        return {
            "total_goals": len(self.goals),
            "active_goals": len(self.active_goals),
            "completed_goals": len(self.completed_goals),
            "pending_goals": len([g for g in self.goals.values() if g.status == GoalStatus.PENDING]),
            "statistics": self.goal_statistics
        }
class PlanningEngine:
    """
    Advanced planning engine for agentic AI systems.

    This engine creates detailed execution plans for goals, incorporating
    action sequencing, resource allocation, and adaptive planning strategies.
    """
    def __init__(self, available_capabilities: List[str]):
        """
        Initialize the planning engine.

        Args:
            available_capabilities (List[str]): Available system capabilities
        """
        self.available_capabilities = available_capabilities
        self.planning_strategies = {
            "sequential": self._create_sequential_plan,
            "parallel": self._create_parallel_plan,
            "adaptive": self._create_adaptive_plan
        }
        self.action_templates = self._initialize_action_templates()
        self.planning_history = []

    def _initialize_action_templates(self) -> Dict[str, Dict[str, Any]]:
        """Initialize templates for different action types."""
        return {
            "research": {
                "type": ActionType.RESEARCH,
                "estimated_duration": 30.0,
                "required_capabilities": ["research", "information_retrieval"],
                "success_indicators": ["data_retrieved", "sources_found"]
            },
            "analyze": {
                "type": ActionType.ANALYZE,
                "estimated_duration": 45.0,
                "required_capabilities": ["analysis", "reasoning"],
                "success_indicators": ["patterns_identified", "insights_generated"]
            },
            "synthesize": {
                "type": ActionType.SYNTHESIZE,
                "estimated_duration": 20.0,
                "required_capabilities": ["synthesis", "integration"],
                "success_indicators": ["information_integrated", "coherent_output"]
            },
            "communicate": {
                "type": ActionType.COMMUNICATE,
                "estimated_duration": 10.0,
                "required_capabilities": ["communication", "formatting"],
                "success_indicators": ["message_formatted", "information_conveyed"]
            },
            "monitor": {
                "type": ActionType.MONITOR,
                "estimated_duration": 5.0,
                "required_capabilities": ["monitoring", "evaluation"],
                "success_indicators": ["progress_tracked", "status_updated"]
            }
        }

    def create_plan(self, goal: Goal, strategy: str = "adaptive") -> Plan:
        """
        Create an execution plan for a given goal.

        Args:
            goal (Goal): Goal to create plan for
            strategy (str): Planning strategy to use

        Returns:
            Plan: Created execution plan
        """
        print(f"Creating {strategy} plan for goal: {goal.description[:50]}...")
        if strategy not in self.planning_strategies:
            print(f"Unknown strategy {strategy}, using adaptive")
            strategy = "adaptive"
        planning_function = self.planning_strategies[strategy]
        plan = planning_function(goal)
        # Record planning history
        self.planning_history.append({
            "goal_id": goal.id,
            "strategy": strategy,
            "actions_count": len(plan.actions),
            "estimated_time": plan.estimated_total_time,
            "timestamp": time.time()
        })
        print(f"Plan created with {len(plan.actions)} actions, estimated time: {plan.estimated_total_time:.1f}s")
        return plan

    def _create_sequential_plan(self, goal: Goal) -> Plan:
        """Create a sequential execution plan."""
        actions = []
        execution_order = []
        # Determine required actions based on goal
        required_actions = self._analyze_goal_requirements(goal)
        for i, action_type in enumerate(required_actions):
            action = self._create_action(action_type, goal, f"step_{i+1}")
            actions.append(action)
            execution_order.append(action.id)
        # Set prerequisites for sequential execution
        for i in range(1, len(actions)):
            actions[i].prerequisites = [actions[i-1].id]
        plan = Plan(
            goal_id=goal.id,
            actions=actions,
            execution_order=execution_order,
            status="ready",
            confidence=0.8,
            estimated_total_time=sum(action.estimated_duration for action in actions),
            metadata={"strategy": "sequential", "parallelizable": False}
        )
        return plan

    def _create_parallel_plan(self, goal: Goal) -> Plan:
        """Create a parallel execution plan where possible."""
        actions = []
        execution_order = []
        required_actions = self._analyze_goal_requirements(goal)
        # Group actions that can run in parallel
        parallel_groups = self._group_parallel_actions(required_actions)
        for group_index, action_group in enumerate(parallel_groups):
            group_actions = []
            for action_type in action_group:
                action = self._create_action(action_type, goal, f"group_{group_index}")
                actions.append(action)
                group_actions.append(action.id)
            # Actions in the same group can run in parallel
            execution_order.extend(group_actions)
            # Set prerequisites between groups
            if group_index > 0:
                prev_group_actions = [
                    a.id for a in actions
                    if f"group_{group_index-1}" in a.description
                ]
                for action_id in group_actions:
                    action_obj = next(a for a in actions if a.id == action_id)
                    action_obj.prerequisites = prev_group_actions
        # Calculate estimated time for parallel execution
        group_times = []
        for group in parallel_groups:
            group_time = max(
                self.action_templates.get(action_type, {}).get("estimated_duration", 30.0)
                for action_type in group
            )
            group_times.append(group_time)
        plan = Plan(
            goal_id=goal.id,
            actions=actions,
            execution_order=execution_order,
            status="ready",
            confidence=0.7,
            estimated_total_time=sum(group_times),
            metadata={"strategy": "parallel", "parallelizable": True, "groups": len(parallel_groups)}
        )
        return plan

    def _create_adaptive_plan(self, goal: Goal) -> Plan:
        """Create an adaptive plan that can be modified during execution."""
        # Start with sequential plan as base
        base_plan = self._create_sequential_plan(goal)
        # Add monitoring and adaptation actions
        monitor_action = Action(
            type=ActionType.MONITOR,
            description=f"Monitor progress for goal: {goal.description}",
            parameters={
                "goal_id": goal.id,
                "check_interval": 30.0,
                "adaptation_triggers": ["failure", "delay", "opportunity"]
            },
            estimated_duration=5.0
        )
        adapt_action = Action(
            type=ActionType.PLAN,
            description=f"Adapt plan if needed for goal: {goal.description}",
            parameters={
                "goal_id": goal.id,
                "adaptation_threshold": 0.3,
                "replan_strategies": ["retry", "alternative", "decompose"]
            },
            estimated_duration=15.0
        )
        # Insert monitoring actions between main actions
        enhanced_actions = []
        enhanced_order = []
        for i, action in enumerate(base_plan.actions):
            enhanced_actions.append(action)
            enhanced_order.append(action.id)
            # Add monitoring after each major action
            if i < len(base_plan.actions) - 1:
                monitor_copy = Action(
                    type=ActionType.MONITOR,
                    description=f"Monitor after {action.description}",
                    parameters=monitor_action.parameters.copy(),
                    estimated_duration=2.0
                )
                enhanced_actions.append(monitor_copy)
                enhanced_order.append(monitor_copy.id)
        # Add final adaptation action
        enhanced_actions.append(adapt_action)
        enhanced_order.append(adapt_action.id)
        base_plan.actions = enhanced_actions
        base_plan.execution_order = enhanced_order
        base_plan.confidence = 0.9
        base_plan.estimated_total_time += len(enhanced_actions) * 3.0  # Account for monitoring overhead
        base_plan.metadata.update({
            "strategy": "adaptive",
            "monitoring_enabled": True,
            "adaptation_enabled": True
        })
        return base_plan

    def _analyze_goal_requirements(self, goal: Goal) -> List[str]:
        """
        Analyze a goal to determine required actions.

        Args:
            goal (Goal): Goal to analyze

        Returns:
            List[str]: List of required action types
        """
        required_actions = []
        description_lower = goal.description.lower()
        # Intelligent action determination based on goal description
        if any(word in description_lower for word in ["research", "find", "search", "investigate", "discover"]):
            required_actions.append("research")
        if any(word in description_lower for word in ["analyze", "examine", "study", "evaluate", "assess"]):
            required_actions.append("analyze")
        if any(word in description_lower for word in ["synthesize", "combine", "integrate", "summarize", "merge"]):
            required_actions.append("synthesize")
        if any(word in description_lower for word in ["communicate", "report", "present", "explain", "share"]):
            required_actions.append("communicate")
        if any(word in description_lower for word in ["monitor", "track", "watch", "observe"]):
            required_actions.append("monitor")
        # Default actions if none detected
        if not required_actions:
            required_actions = ["research", "analyze", "synthesize"]
        # Ensure logical flow
        if "synthesize" in required_actions and "analyze" not in required_actions:
            required_actions.insert(-1, "analyze")
        if "analyze" in required_actions and "research" not in required_actions:
            required_actions.insert(0, "research")
        return required_actions

    def _group_parallel_actions(self, actions: List[str]) -> List[List[str]]:
        """
        Group actions that can be executed in parallel.

        Args:
            actions (List[str]): List of action types

        Returns:
            List[List[str]]: Groups of actions that can run in parallel
        """
        groups = []
        current_group = []
        # Simple grouping logic based on dependencies
        for action in actions:
            if action == "research":
                # Research actions can often be parallelized
                if not current_group or all(a == "research" for a in current_group):
                    current_group.append(action)
                else:
                    if current_group:
                        groups.append(current_group)
                    current_group = [action]
            elif action == "monitor":
                # Monitoring can be parallel with other actions
                current_group.append(action)
            else:
                # Other actions typically need to be sequential
                if current_group:
                    groups.append(current_group)
                current_group = [action]
        if current_group:
            groups.append(current_group)
        return groups

    def _create_action(self, action_type: str, goal: Goal, context: str) -> Action:
        """
        Create an action based on type and context.

        Args:
            action_type (str): Type of action to create
            goal (Goal): Associated goal
            context (str): Context information

        Returns:
            Action: Created action
        """
        template = self.action_templates.get(action_type, {})
        action = Action(
            type=template.get("type", ActionType.EXECUTE),
            description=f"{action_type.title()} action for goal: {goal.description} ({context})",
            parameters={
                "goal_id": goal.id,
                "action_type": action_type,
                "context": context,
                "goal_description": goal.description,
                "required_capabilities": template.get("required_capabilities", [])
            },
            expected_outcome=f"Completed {action_type} for {goal.description}",
            estimated_duration=template.get("estimated_duration", 30.0)
        )
        return action

    def get_planning_statistics(self) -> Dict[str, Any]:
        """Get planning engine statistics."""
        if not self.planning_history:
            return {"total_plans": 0}
        total_plans = len(self.planning_history)
        avg_actions = sum(p["actions_count"] for p in self.planning_history) / total_plans
        avg_time = sum(p["estimated_time"] for p in self.planning_history) / total_plans
        strategies_used = {}
        for plan in self.planning_history:
            strategy = plan["strategy"]
            strategies_used[strategy] = strategies_used.get(strategy, 0) + 1
        return {
            "total_plans": total_plans,
            "average_actions_per_plan": avg_actions,
            "average_estimated_time": avg_time,
            "strategies_used": strategies_used,
            "available_capabilities": len(self.available_capabilities)
        }
class AutonomousExecutor:
    """
    Autonomous execution engine for agentic AI systems.

    This engine executes plans autonomously, monitors progress,
    adapts to changing circumstances, and learns from experience.
    """
    def __init__(self, agents: Dict[str, Any], goal_manager: GoalManager,
                 planning_engine: PlanningEngine):
        """
        Initialize the autonomous executor.

        Args:
            agents (Dict[str, Any]): Available agents for task execution
            goal_manager (GoalManager): Goal management system
            planning_engine (PlanningEngine): Planning engine
        """
        self.agents = agents
        self.goal_manager = goal_manager
        self.planning_engine = planning_engine
        self.active_plans: Dict[str, Plan] = {}
        self.execution_history: List[Dict[str, Any]] = []
        self.learning_data: Dict[str, List[float]] = {}
        self.adaptation_strategies = ["retry", "alternative", "decompose", "skip"]
        self.performance_metrics = {
            "total_executions": 0,
            "successful_executions": 0,
            "failed_executions": 0,
            "adaptations_made": 0
        }

    async def execute_goal(self, goal_id: str) -> Dict[str, Any]:
        """
        Execute a goal autonomously through planning and action execution.

        Args:
            goal_id (str): Goal identifier to execute

        Returns:
            Dict[str, Any]: Execution result with metadata
        """
        if goal_id not in self.goal_manager.goals:
            return {"success": False, "error": "Goal not found"}
        goal = self.goal_manager.goals[goal_id]
        execution_start = time.time()
        print(f"Starting autonomous execution of goal: {goal.description}")
        try:
            # Update goal status
            self.goal_manager.update_goal_status(goal_id, GoalStatus.IN_PROGRESS)
            # Create execution plan
            plan = self.planning_engine.create_plan(goal, strategy="adaptive")
            self.active_plans[goal_id] = plan
            # Execute plan
            execution_result = await self._execute_plan(plan)
            # Update goal status based on execution result
            if execution_result["success"]:
                self.goal_manager.update_goal_status(
                    goal_id,
                    GoalStatus.COMPLETED,
                    {
                        "completion_time": time.time(),
                        "execution_result": execution_result,
                        "total_execution_time": time.time() - execution_start
                    }
                )
                self.performance_metrics["successful_executions"] += 1
                print(f"Goal completed successfully: {goal.description}")
            else:
                self.goal_manager.update_goal_status(
                    goal_id,
                    GoalStatus.FAILED,
                    {
                        "failure_time": time.time(),
                        "failure_reason": execution_result.get("error"),
                        "total_execution_time": time.time() - execution_start
                    }
                )
                self.performance_metrics["failed_executions"] += 1
                print(f"Goal execution failed: {execution_result.get('error')}")
            # Record execution for learning
            self._record_execution(goal, plan, execution_result)
            self.performance_metrics["total_executions"] += 1
            return execution_result
        except Exception as e:
            self.goal_manager.update_goal_status(
                goal_id,
                GoalStatus.FAILED,
                {
                    "failure_time": time.time(),
                    "error": str(e),
                    "total_execution_time": time.time() - execution_start
                }
            )
            self.performance_metrics["failed_executions"] += 1
            self.performance_metrics["total_executions"] += 1
            print(f"Goal execution error: {str(e)}")
            return {"success": False, "error": str(e)}

    async def _execute_plan(self, plan: Plan) -> Dict[str, Any]:
        """
        Execute a plan by running its actions in order.

        Args:
            plan (Plan): Plan to execute

        Returns:
            Dict[str, Any]: Execution result
        """
        execution_start = time.time()
        executed_actions = []
        results = {}
        adaptation_count = 0
        print(f"Executing plan with {len(plan.actions)} actions")
        try:
            for i, action_id in enumerate(plan.execution_order):
                action = next((a for a in plan.actions if a.id == action_id), None)
                if not action:
                    continue
                print(f"Executing action {i+1}/{len(plan.execution_order)}: {action.description[:50]}...")
                # Check prerequisites
                if not await self._check_prerequisites(action, executed_actions):
                    print(f"Prerequisites not met for action {action_id}")
                    # Attempt adaptation
                    if plan.metadata.get("adaptation_enabled"):
                        adaptation_result = await self._adapt_plan(plan, action, "prerequisites_not_met")
                        if adaptation_result["success"]:
                            adaptation_count += 1
                            self.performance_metrics["adaptations_made"] += 1
                            continue
                    return {
                        "success": False,
                        "error": f"Prerequisites not met for action {action_id}",
"executed_actions": executed_actions,
"adaptations_made": adaptation_count
}
# Execute action
action_result = await self._execute_action(action)
action.result = action_result
action.status = "completed" if action_result.get("success") else "failed"
executed_actions.append(action_id)
results[action_id] = action_result
print(f"Action result: {'Success' if action_result.get('success') else 'Failed'}")
# Check if action failed and plan should be adapted
if not action_result.get("success"):
if plan.metadata.get("adaptation_enabled"):
adaptation_result = await self._adapt_plan(plan, action, "action_failed")
if adaptation_result["success"]:
adaptation_count += 1
self.performance_metrics["adaptations_made"] += 1
# Update action result with recovery
action.result = adaptation_result
action.status = "recovered"
results[action_id] = adaptation_result
else:
return {
"success": False,
"error": f"Action {action_id} failed and adaptation unsuccessful: {action_result.get('error')}",
"executed_actions": executed_actions,
"results": results,
"adaptations_made": adaptation_count
}
else:
return {
"success": False,
"error": f"Action {action_id} failed: {action_result.get('error')}",
"executed_actions": executed_actions,
"results": results,
"adaptations_made": adaptation_count
}
execution_time = time.time() - execution_start
print(f"Plan execution completed in {execution_time:.2f}s with {adaptation_count} adaptations")
return {
"success": True,
"execution_time": execution_time,
"executed_actions": executed_actions,
"results": results,
"plan_id": plan.id,
"adaptations_made": adaptation_count
}
except Exception as e:
return {
"success": False,
"error": str(e),
"executed_actions": executed_actions,
"results": results,
"adaptations_made": adaptation_count
}
async def _execute_action(self, action: Action) -> Dict[str, Any]:
"""
Execute a single action using appropriate agent.
Args:
action (Action): Action to execute
Returns:
Dict[str, Any]: Action execution result
"""
action_start = time.time()
try:
# Determine which agent to use based on action type
agent = self._select_agent_for_action(action)
if not agent:
return {
"success": False,
"error": f"No suitable agent found for action type {action.type.value}"
}
# Prepare action parameters
action_params = self._prepare_action_parameters(action)
# Execute action through agent based on type
if action.type == ActionType.RESEARCH:
result = await self._execute_research_action(agent, action_params)
elif action.type == ActionType.ANALYZE:
result = await self._execute_analysis_action(agent, action_params)
elif action.type == ActionType.SYNTHESIZE:
result = await self._execute_synthesis_action(agent, action_params)
elif action.type == ActionType.COMMUNICATE:
result = await self._execute_communication_action(agent, action_params)
elif action.type == ActionType.MONITOR:
result = await self._execute_monitoring_action(action_params)
elif action.type == ActionType.PLAN:
result = await self._execute_planning_action(action_params)
else:
result = {"success": False, "error": f"Unknown action type: {action.type.value}"}
# Record actual duration
action.actual_duration = time.time() - action_start
return result
except Exception as e:
action.actual_duration = time.time() - action_start
return {"success": False, "error": str(e)}
def _select_agent_for_action(self, action: Action) -> Optional[Any]:
"""
Select the most appropriate agent for an action.
Args:
action (Action): Action requiring agent selection
Returns:
Optional[Any]: Selected agent or None if no suitable agent
"""
action_type = action.type
# Agent selection based on action type and capabilities
if action_type == ActionType.RESEARCH:
# Prefer research agent, fallback to coordinator
if "researcher" in self.agents:
return self.agents["researcher"]
elif "coordinator" in self.agents:
return self.agents["coordinator"]
elif action_type == ActionType.ANALYZE:
# Prefer analyzer agent, fallback to coordinator
if "analyzer" in self.agents:
return self.agents["analyzer"]
elif "coordinator" in self.agents:
return self.agents["coordinator"]
elif action_type == ActionType.SYNTHESIZE:
# Prefer synthesizer agent, fallback to coordinator
if "synthesizer" in self.agents:
return self.agents["synthesizer"]
elif "coordinator" in self.agents:
return self.agents["coordinator"]
elif action_type in [ActionType.COMMUNICATE, ActionType.MONITOR, ActionType.PLAN]:
# These can be handled by coordinator or any available agent
if "coordinator" in self.agents:
return self.agents["coordinator"]
elif self.agents:
return list(self.agents.values())[0]
# Fallback to any available agent
if self.agents:
return list(self.agents.values())[0]
return None
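The if/elif ladder above encodes a per-type preference order with a shared fallback. The same policy can be written as a data-driven lookup table, which is easier to audit and extend. A minimal standalone sketch (the table contents mirror the method; `select_agent` and the string keys are illustrative, not part of the original code):

```python
# Preference order per action type; earlier names win.
PREFERENCES = {
    "research": ["researcher", "coordinator"],
    "analyze": ["analyzer", "coordinator"],
    "synthesize": ["synthesizer", "coordinator"],
    "communicate": ["coordinator"],
    "monitor": ["coordinator"],
    "plan": ["coordinator"],
}

def select_agent(action_type, agents):
    """Return the first available preferred agent, else any agent, else None."""
    for name in PREFERENCES.get(action_type, []):
        if name in agents:
            return agents[name]
    return next(iter(agents.values()), None)  # fallback: any available agent
```

Adding a new action type then becomes a one-line change to the table rather than another branch in the method.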
def _prepare_action_parameters(self, action: Action) -> Dict[str, Any]:
"""
Prepare parameters for action execution.
Args:
action (Action): Action to prepare parameters for
Returns:
Dict[str, Any]: Prepared parameters
"""
base_params = action.parameters.copy()
base_params.update({
"action_id": action.id,
"action_type": action.type.value,
"action_description": action.description,
"expected_outcome": action.expected_outcome,
"estimated_duration": action.estimated_duration
})
return base_params
async def _execute_research_action(self, agent: Any, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute research action through research agent.
Args:
agent: Agent to execute the action
params (Dict[str, Any]): Action parameters
Returns:
Dict[str, Any]: Research results
"""
try:
# Create a simplified message structure for compatibility
class SimpleMessage:
def __init__(self, sender, recipient, content, message_type):
self.sender = sender
self.recipient = recipient
self.content = content
self.message_type = message_type
self.id = str(uuid.uuid4())
self.metadata = {}
self.timestamp = time.time()
research_query = params.get("goal_description", params.get("action_description", ""))
research_message = SimpleMessage(
sender="executor",
recipient=getattr(agent, 'agent_id', 'agent'),
content=research_query,
message_type="research_request"
)
# Execute research
responses = await agent.process_message(research_message)
if responses and responses[0].message_type != "error_response":
try:
research_data = json.loads(responses[0].content)
return {
"success": True,
"data": research_data,
"agent_used": getattr(agent, 'agent_id', 'research_agent'),
"action_type": "research"
}
except json.JSONDecodeError:
return {
"success": True,
"data": responses[0].content,
"agent_used": getattr(agent, 'agent_id', 'research_agent'),
"action_type": "research"
}
else:
return {
"success": False,
"error": "Research agent failed to provide results",
"action_type": "research"
}
except Exception as e:
return {
"success": False,
"error": f"Research action failed: {str(e)}",
"action_type": "research"
}
async def _execute_analysis_action(self, agent: Any, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute analysis action through analyzer agent.
Args:
agent: Agent to execute the action
params (Dict[str, Any]): Action parameters
Returns:
Dict[str, Any]: Analysis results
"""
try:
class SimpleMessage:
def __init__(self, sender, recipient, content, message_type):
self.sender = sender
self.recipient = recipient
self.content = content
self.message_type = message_type
self.id = str(uuid.uuid4())
self.metadata = {}
self.timestamp = time.time()
# Prepare analysis data
analysis_data = {
"content": params.get("research_data", []),
"type": "goal_analysis",
"query": params.get("goal_description", params.get("action_description", ""))
}
analysis_message = SimpleMessage(
sender="executor",
recipient=getattr(agent, 'agent_id', 'agent'),
content=json.dumps(analysis_data),
message_type="analysis_request"
)
# Execute analysis
responses = await agent.process_message(analysis_message)
if responses and responses[0].message_type != "error_response":
try:
analysis_result = json.loads(responses[0].content)
return {
"success": True,
"data": analysis_result,
"agent_used": getattr(agent, 'agent_id', 'analysis_agent'),
"action_type": "analysis"
}
except json.JSONDecodeError:
return {
"success": True,
"data": {"analysis": responses[0].content},
"agent_used": getattr(agent, 'agent_id', 'analysis_agent'),
"action_type": "analysis"
}
else:
return {
"success": False,
"error": "Analysis agent failed to provide results",
"action_type": "analysis"
}
except Exception as e:
return {
"success": False,
"error": f"Analysis action failed: {str(e)}",
"action_type": "analysis"
}
async def _execute_synthesis_action(self, agent: Any, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute synthesis action through synthesizer agent.
Args:
agent: Agent to execute the action
params (Dict[str, Any]): Action parameters
Returns:
Dict[str, Any]: Synthesis results
"""
try:
class SimpleMessage:
def __init__(self, sender, recipient, content, message_type):
self.sender = sender
self.recipient = recipient
self.content = content
self.message_type = message_type
self.id = str(uuid.uuid4())
self.metadata = {}
self.timestamp = time.time()
# Prepare synthesis data
synthesis_data = {
"query": params.get("goal_description", params.get("action_description", "")),
"research_results": params.get("research_data", []),
"analysis_results": params.get("analysis_data", {})
}
synthesis_message = SimpleMessage(
sender="executor",
recipient=getattr(agent, 'agent_id', 'agent'),
content=json.dumps(synthesis_data),
message_type="synthesis_request"
)
# Execute synthesis
responses = await agent.process_message(synthesis_message)
if responses and responses[0].message_type != "error_response":
return {
"success": True,
"data": responses[0].content,
"agent_used": getattr(agent, 'agent_id', 'synthesis_agent'),
"action_type": "synthesis"
}
else:
return {
"success": False,
"error": "Synthesis agent failed to provide results",
"action_type": "synthesis"
}
except Exception as e:
return {
"success": False,
"error": f"Synthesis action failed: {str(e)}",
"action_type": "synthesis"
}
async def _execute_communication_action(self, agent: Any, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute communication action.
Args:
agent: Agent to execute the action
params (Dict[str, Any]): Action parameters
Returns:
Dict[str, Any]: Communication results
"""
try:
# Simple communication action - format and present results
content = params.get("synthesis_data", params.get("data", ""))
goal_description = params.get("goal_description", "")
if isinstance(content, dict):
content = json.dumps(content, indent=2)
formatted_content = f"""
Communication Result for: {goal_description}
Content:
{content}
Status: Communication completed successfully
Timestamp: {time.time()}
"""
return {
"success": True,
"data": formatted_content,
"agent_used": "communication_handler",
"action_type": "communication"
}
except Exception as e:
return {
"success": False,
"error": f"Communication action failed: {str(e)}",
"action_type": "communication"
}
async def _execute_monitoring_action(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute monitoring action.
Args:
params (Dict[str, Any]): Action parameters
Returns:
Dict[str, Any]: Monitoring results
"""
try:
goal_id = params.get("goal_id")
check_interval = params.get("check_interval", 30.0)
# Simulate monitoring delay
await asyncio.sleep(0.1)
# Get goal progress
if goal_id and goal_id in self.goal_manager.goals:
progress = self.goal_manager.get_goal_progress(goal_id)
monitoring_result = {
"goal_id": goal_id,
"progress": progress,
"check_time": time.time(),
"status": "monitoring_completed",
"recommendations": []
}
# Add recommendations based on progress
if progress.get("progress", 0) < 0.5:
monitoring_result["recommendations"].append("Consider increasing resource allocation")
if progress.get("status") == "failed":
monitoring_result["recommendations"].append("Goal requires intervention or replanning")
return {
"success": True,
"data": monitoring_result,
"action_type": "monitoring"
}
else:
return {
"success": False,
"error": "Goal not found for monitoring",
"action_type": "monitoring"
}
except Exception as e:
return {
"success": False,
"error": f"Monitoring action failed: {str(e)}",
"action_type": "monitoring"
}
async def _execute_planning_action(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute planning action (replanning/adaptation).
Args:
params (Dict[str, Any]): Action parameters
Returns:
Dict[str, Any]: Planning results
"""
try:
goal_id = params.get("goal_id")
adaptation_threshold = params.get("adaptation_threshold", 0.3)
# Simulate planning delay
await asyncio.sleep(0.2)
if goal_id and goal_id in self.goal_manager.goals:
goal = self.goal_manager.goals[goal_id]
# Create alternative plan
alternative_plan = self.planning_engine.create_plan(goal, strategy="sequential")
planning_result = {
"goal_id": goal_id,
"new_plan_id": alternative_plan.id,
"actions_count": len(alternative_plan.actions),
"estimated_time": alternative_plan.estimated_total_time,
"planning_time": time.time(),
"status": "replanning_completed"
}
return {
"success": True,
"data": planning_result,
"action_type": "planning"
}
else:
return {
"success": False,
"error": "Goal not found for replanning",
"action_type": "planning"
}
except Exception as e:
return {
"success": False,
"error": f"Planning action failed: {str(e)}",
"action_type": "planning"
}
async def _check_prerequisites(self, action: Action, executed_actions: List[str]) -> bool:
"""
Check if action prerequisites are satisfied.
Args:
action (Action): Action to check
executed_actions (List[str]): List of executed action IDs
Returns:
bool: True if prerequisites are satisfied
"""
return all(prereq in executed_actions for prereq in action.prerequisites)
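The prerequisite check is plain set containment: every prerequisite ID must already be in the executed list. The same invariant can be verified for an entire execution order up front, which catches ordering bugs before any action runs. A standalone sketch (the function name and the id-to-prerequisites mapping are illustrative):

```python
def order_respects_prerequisites(execution_order, prereqs):
    """prereqs maps an action id to the ids that must run before it.
    Returns True if every action appears after all of its prerequisites."""
    executed = set()
    for action_id in execution_order:
        if not all(p in executed for p in prereqs.get(action_id, [])):
            return False
        executed.add(action_id)
    return True
```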
async def _adapt_plan(self, plan: Plan, failed_action: Action, failure_reason: str) -> Dict[str, Any]:
"""
Adapt plan in response to failure or changing circumstances.
Args:
plan (Plan): Current plan
failed_action (Action): Action that failed
failure_reason (str): Reason for failure
Returns:
Dict[str, Any]: Adaptation result
"""
try:
print(f"Adapting plan due to: {failure_reason}")
# Select adaptation strategy based on failure reason
if failure_reason == "action_failed":
# Try retry strategy first
adaptation_result = await self._retry_action(failed_action)
if adaptation_result["success"]:
return adaptation_result
# If retry fails, try alternative approach
return await self._create_alternative_action(failed_action)
elif failure_reason == "prerequisites_not_met":
# Try to resolve prerequisites
return await self._resolve_prerequisites(failed_action, plan)
else:
# Generic adaptation
return await self._generic_adaptation(plan, failed_action)
except Exception as e:
return {
"success": False,
"error": f"Adaptation failed: {str(e)}",
"adaptation_type": "error"
}
async def _retry_action(self, action: Action) -> Dict[str, Any]:
"""
Retry a failed action with modified parameters.
Args:
action (Action): Action to retry
Returns:
Dict[str, Any]: Retry result
"""
try:
print(f"Retrying action: {action.description[:50]}...")
# Create modified version of the action
retry_action = Action(
type=action.type,
description=f"RETRY: {action.description}",
parameters=action.parameters.copy(),
estimated_duration=action.estimated_duration * 0.8  # Tighter time budget for the retry
)
# Add retry flag
retry_action.parameters["retry"] = True
retry_action.parameters["original_action_id"] = action.id
# Execute retry
retry_result = await self._execute_action(retry_action)
if retry_result["success"]:
print("Retry successful")
return {
"success": True,
"data": retry_result["data"],
"adaptation_type": "retry",
"original_action_id": action.id
}
else:
print("Retry failed")
return {
"success": False,
"error": f"Retry failed: {retry_result.get('error')}",
"adaptation_type": "retry"
}
except Exception as e:
return {
"success": False,
"error": f"Retry attempt failed: {str(e)}",
"adaptation_type": "retry"
}
async def _create_alternative_action(self, action: Action) -> Dict[str, Any]:
"""
Create alternative approach for a failed action.
Args:
action (Action): Failed action
Returns:
Dict[str, Any]: Alternative action result
"""
try:
print(f"Creating alternative for action: {action.description[:50]}...")
# Create alternative based on action type
if action.type == ActionType.RESEARCH:
# Alternative research approach
alt_action = Action(
type=ActionType.RESEARCH,
description=f"ALTERNATIVE RESEARCH: {action.description}",
parameters={
**action.parameters,
"alternative_approach": True,
"simplified_query": True
},
estimated_duration=action.estimated_duration * 0.6
)
elif action.type == ActionType.ANALYZE:
# Alternative analysis approach
alt_action = Action(
type=ActionType.ANALYZE,
description=f"SIMPLIFIED ANALYSIS: {action.description}",
parameters={
**action.parameters,
"analysis_depth": "basic",
"alternative_method": True
},
estimated_duration=action.estimated_duration * 0.7
)
else:
# Generic alternative
alt_action = Action(
type=action.type,
description=f"ALTERNATIVE: {action.description}",
parameters={
**action.parameters,
"alternative": True,
"simplified": True
},
estimated_duration=action.estimated_duration * 0.5
)
# Execute alternative
alt_result = await self._execute_action(alt_action)
if alt_result["success"]:
print("Alternative approach successful")
return {
"success": True,
"data": alt_result["data"],
"adaptation_type": "alternative",
"original_action_id": action.id
}
else:
print("Alternative approach failed")
return {
"success": False,
"error": f"Alternative failed: {alt_result.get('error')}",
"adaptation_type": "alternative"
}
except Exception as e:
return {
"success": False,
"error": f"Alternative creation failed: {str(e)}",
"adaptation_type": "alternative"
}
async def _resolve_prerequisites(self, action: Action, plan: Plan) -> Dict[str, Any]:
"""
Attempt to resolve missing prerequisites.
Args:
action (Action): Action with unmet prerequisites
plan (Plan): Current plan
Returns:
Dict[str, Any]: Resolution result
"""
try:
print(f"Resolving prerequisites for: {action.description[:50]}...")
# Simple resolution: skip prerequisites if possible
resolved_action = Action(
type=action.type,
description=f"PREREQUISITE-RESOLVED: {action.description}",
parameters={
**action.parameters,
"prerequisites_skipped": True,
"reduced_dependencies": True
},
prerequisites=[], # Remove prerequisites
estimated_duration=action.estimated_duration
)
# Execute resolved action
resolved_result = await self._execute_action(resolved_action)
if resolved_result["success"]:
print("Prerequisites resolved successfully")
return {
"success": True,
"data": resolved_result["data"],
"adaptation_type": "prerequisite_resolution",
"original_action_id": action.id
}
else:
return {
"success": False,
"error": f"Prerequisite resolution failed: {resolved_result.get('error')}",
"adaptation_type": "prerequisite_resolution"
}
except Exception as e:
return {
"success": False,
"error": f"Prerequisite resolution failed: {str(e)}",
"adaptation_type": "prerequisite_resolution"
}
async def _generic_adaptation(self, plan: Plan, action: Action) -> Dict[str, Any]:
"""
Generic adaptation strategy.
Args:
plan (Plan): Current plan
action (Action): Problematic action
Returns:
Dict[str, Any]: Adaptation result
"""
try:
print(f"Applying generic adaptation for: {action.description[:50]}...")
# Simple generic adaptation: create a basic fallback
fallback_action = Action(
type=ActionType.COMMUNICATE,
description=f"FALLBACK: Partial completion of {action.description}",
parameters={
"fallback": True,
"original_action": action.description,
"partial_completion": True
},
estimated_duration=5.0
)
# Execute fallback
fallback_result = await self._execute_action(fallback_action)
return {
"success": True,
"data": fallback_result.get("data", "Fallback completed"),
"adaptation_type": "generic_fallback",
"original_action_id": action.id
}
except Exception as e:
return {
"success": False,
"error": f"Generic adaptation failed: {str(e)}",
"adaptation_type": "generic_fallback"
}
def _record_execution(self, goal: Goal, plan: Plan, execution_result: Dict[str, Any]):
"""
Record execution data for learning and improvement.
Args:
goal (Goal): Executed goal
plan (Plan): Execution plan
execution_result (Dict[str, Any]): Execution result
"""
execution_record = {
"goal_id": goal.id,
"goal_description": goal.description,
"goal_priority": goal.priority,
"plan_id": plan.id,
"plan_strategy": plan.metadata.get("strategy", "unknown"),
"execution_time": execution_result.get("execution_time", 0.0),
"success": execution_result.get("success", False),
"actions_count": len(plan.actions),
"adaptations_made": execution_result.get("adaptations_made", 0),
"timestamp": time.time()
}
self.execution_history.append(execution_record)
# Update learning data for different goal types
goal_type = self._classify_goal_type(goal.description)
if goal_type not in self.learning_data:
self.learning_data[goal_type] = []
# Record execution time for learning
self.learning_data[goal_type].append(execution_result.get("execution_time", 0.0))
# Keep only recent learning data (last 100 entries per type)
if len(self.learning_data[goal_type]) > 100:
self.learning_data[goal_type] = self.learning_data[goal_type][-100:]
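The manual trim to the last 100 samples can be delegated to a bounded deque, which discards the oldest entries automatically. A sketch of that alternative (it assumes, as above, that only recent samples matter for learning):

```python
from collections import defaultdict, deque

# One bounded buffer per goal type; entries beyond 100 fall off the front.
learning_data = defaultdict(lambda: deque(maxlen=100))

def record_sample(goal_type, execution_time):
    learning_data[goal_type].append(execution_time)
```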
def _classify_goal_type(self, description: str) -> str:
"""
Classify goal type for learning purposes.
Args:
description (str): Goal description
Returns:
str: Goal type classification
"""
description_lower = description.lower()
if any(word in description_lower for word in ["research", "investigate", "find", "search"]):
return "research_goal"
elif any(word in description_lower for word in ["analyze", "study", "examine", "evaluate"]):
return "analysis_goal"
elif any(word in description_lower for word in ["synthesize", "combine", "integrate", "summarize"]):
return "synthesis_goal"
elif any(word in description_lower for word in ["create", "generate", "build", "develop"]):
return "creation_goal"
elif any(word in description_lower for word in ["communicate", "report", "present", "explain"]):
return "communication_goal"
else:
return "general_goal"
def get_execution_statistics(self) -> Dict[str, Any]:
"""
Get comprehensive execution statistics for performance monitoring.
Returns:
Dict[str, Any]: Execution statistics
"""
if not self.execution_history:
return {
"total_executions": 0,
"performance_metrics": self.performance_metrics
}
total_executions = len(self.execution_history)
successful_executions = sum(1 for record in self.execution_history if record["success"])
avg_execution_time = sum(record["execution_time"] for record in self.execution_history) / total_executions
avg_adaptations = sum(record["adaptations_made"] for record in self.execution_history) / total_executions
# Goal type statistics
goal_type_stats = {}
for record in self.execution_history:
goal_type = self._classify_goal_type(record["goal_description"])
if goal_type not in goal_type_stats:
goal_type_stats[goal_type] = {"count": 0, "success": 0}
goal_type_stats[goal_type]["count"] += 1
if record["success"]:
goal_type_stats[goal_type]["success"] += 1
# Calculate success rates by goal type
for goal_type in goal_type_stats:
stats = goal_type_stats[goal_type]
stats["success_rate"] = stats["success"] / stats["count"] if stats["count"] > 0 else 0.0
return {
"total_executions": total_executions,
"successful_executions": successful_executions,
"success_rate": successful_executions / total_executions,
"average_execution_time": avg_execution_time,
"average_adaptations_per_execution": avg_adaptations,
"goal_type_statistics": goal_type_stats,
"learning_data_points": sum(len(data) for data in self.learning_data.values()),
"performance_metrics": self.performance_metrics
}
def get_learning_insights(self) -> Dict[str, Any]:
"""
Get insights from learning data for system improvement.
Returns:
Dict[str, Any]: Learning insights
"""
insights = {
"goal_type_performance": {},
"adaptation_effectiveness": 0.0,
"improvement_trends": {},
"recommendations": []
}
# Analyze performance by goal type
for goal_type, execution_times in self.learning_data.items():
if execution_times:
insights["goal_type_performance"][goal_type] = {
"average_time": sum(execution_times) / len(execution_times),
"min_time": min(execution_times),
"max_time": max(execution_times),
"sample_count": len(execution_times)
}
# Calculate adaptation effectiveness
if self.execution_history:
adapted_executions = [r for r in self.execution_history if r["adaptations_made"] > 0]
if adapted_executions:
adapted_success_rate = sum(1 for r in adapted_executions if r["success"]) / len(adapted_executions)
insights["adaptation_effectiveness"] = adapted_success_rate
# Generate recommendations; the metrics dict stores raw counters,
# so the success rate must be derived rather than read directly
total = self.performance_metrics["total_executions"]
if total > 0:
    success_rate = self.performance_metrics["successful_executions"] / total
    if success_rate < 0.8:
        insights["recommendations"].append("Consider improving action reliability or adaptation strategies")
    if self.performance_metrics["adaptations_made"] > total * 0.5:
        insights["recommendations"].append("High adaptation rate suggests need for better initial planning")
return insights
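Both reporting methods above reduce to simple aggregations over the execution records written by `_record_execution`. A condensed standalone sketch of those aggregations (the record dicts use the same keys as the executor; `summarize` is an illustrative name):

```python
def summarize(history):
    """Aggregate success rate, mean time, and adaptation effectiveness."""
    total = len(history)
    if total == 0:
        return {"total": 0}
    adapted = [r for r in history if r["adaptations_made"] > 0]
    return {
        "total": total,
        "success_rate": sum(1 for r in history if r["success"]) / total,
        "avg_time": sum(r["execution_time"] for r in history) / total,
        "adaptation_effectiveness": (
            sum(1 for r in adapted if r["success"]) / len(adapted) if adapted else 0.0
        ),
    }
```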
class AgenticAI:
"""
Complete Agentic AI system implementation.
This class orchestrates autonomous goal-oriented behavior through
goal management, planning, and adaptive execution.
"""
def __init__(self, llm_model=None, tokenizer=None, device=None, knowledge_base=None):
"""
Initialize the Agentic AI system.
Args:
llm_model: Language model for agents
tokenizer: Tokenizer for the language model
device: Device for model inference
knowledge_base: Knowledge base for research
"""
print("Initializing Agentic AI System...")
self.llm_model = llm_model
self.tokenizer = tokenizer
self.device = device
self.knowledge_base = knowledge_base
# Initialize core components
self.goal_manager = GoalManager()
available_capabilities = [
"research", "analysis", "synthesis", "communication",
"planning", "monitoring", "adaptation", "learning"
]
self.planning_engine = PlanningEngine(available_capabilities)
# Initialize agents (simplified versions for this example)
self.agents = self._initialize_agents()
# Initialize autonomous executor
self.executor = AutonomousExecutor(
self.agents,
self.goal_manager,
self.planning_engine
)
# System state
self.is_running = False
self.autonomous_mode = False
self.system_statistics = {
"goals_created": 0,
"goals_completed": 0,
"autonomous_cycles": 0
}
print(f"Agentic AI System initialized with {len(self.agents)} agents")
def _initialize_agents(self) -> Dict[str, Any]:
"""Initialize simplified agents for the agentic system."""
agents = {}
# Import agent classes from previous chapters
try:
# Simplified agent implementations for this example
class SimpleAgent:
def __init__(self, agent_id, role, capabilities):
self.agent_id = agent_id
self.role = role
self.capabilities = capabilities
self.message_history = []
self.performance_metrics = {
"messages_processed": 0,
"average_response_time": 0.0,
"success_rate": 1.0
}
async def process_message(self, message):
# Simplified message processing
self.message_history.append(message)
self.performance_metrics["messages_processed"] += 1
# Simulate processing delay
await asyncio.sleep(0.1)
# Create response based on message type
if message.message_type == "research_request":
response_content = json.dumps([{
"id": "simulated_result",
"content": f"Research result for: {message.content}",
"source": "simulated_research",
"relevance_score": 0.8
}])
elif message.message_type == "analysis_request":
response_content = json.dumps({
"analysis": f"Analysis of: {message.content}",
"confidence": 0.8,
"key_themes": ["simulated analysis"]
})
elif message.message_type == "synthesis_request":
data = json.loads(message.content) if message.content.startswith('{') else {}
response_content = f"Synthesis result for: {data.get('query', 'unknown query')}"
else:
response_content = f"Processed: {message.content}"
class SimpleResponse:
def __init__(self, sender, recipient, content, message_type):
self.sender = sender
self.recipient = recipient
self.content = content
self.message_type = message_type
self.id = str(uuid.uuid4())
self.timestamp = time.time()
return [SimpleResponse(
sender=self.agent_id,
recipient=message.sender,
content=response_content,
message_type=f"{message.message_type.replace('_request', '_results')}"
)]
# Create simplified agents
agents["researcher"] = SimpleAgent("researcher_001", "researcher", ["research"])
agents["analyzer"] = SimpleAgent("analyzer_001", "analyzer", ["analysis"])
agents["synthesizer"] = SimpleAgent("synthesizer_001", "synthesizer", ["synthesis"])
agents["coordinator"] = SimpleAgent("coordinator_001", "coordinator", ["coordination"])
except Exception as e:
print(f"Warning: Could not initialize full agents, using simplified versions: {e}")
# Fallback to basic agent structure with an awaitable no-op handler,
# so `await agent.process_message(...)` still works on the stubs
async def _noop_process(self, msg):
    return []
def _stub(agent_id, role):
    return type('Agent', (), {
        'agent_id': agent_id,
        'role': role,
        'process_message': _noop_process
    })()
agents = {
    "researcher": _stub("researcher_001", "researcher"),
    "analyzer": _stub("analyzer_001", "analyzer"),
    "synthesizer": _stub("synthesizer_001", "synthesizer"),
    "coordinator": _stub("coordinator_001", "coordinator")
}
return agents
async def create_and_execute_goal(self, description: str, priority: int = 1,
auto_decompose: bool = True) -> Dict[str, Any]:
"""
Create a goal and execute it autonomously.
Args:
description (str): Goal description
priority (int): Goal priority
auto_decompose (bool): Whether to auto-decompose complex goals
Returns:
Dict[str, Any]: Execution result
"""
print(f"Creating and executing goal: {description}")
# Create goal
goal = self.goal_manager.create_goal(
description=description,
priority=priority,
required_capabilities=self._analyze_required_capabilities(description)
)
self.system_statistics["goals_created"] += 1
# Auto-decompose if requested and goal is complex
if auto_decompose and self._is_complex_goal(description):
print("Goal appears complex, decomposing into sub-goals...")
sub_goals = self.goal_manager.decompose_goal(goal.id)
if sub_goals:
print(f"Created {len(sub_goals)} sub-goals")
# Execute sub-goals first
for sub_goal in sub_goals:
sub_result = await self.executor.execute_goal(sub_goal.id)
if not sub_result["success"]:
print(f"Sub-goal failed: {sub_goal.description}")
# Execute main goal
result = await self.executor.execute_goal(goal.id)
if result["success"]:
self.system_statistics["goals_completed"] += 1
return {
"goal_id": goal.id,
"goal_description": description,
"execution_result": result,
"goal_progress": self.goal_manager.get_goal_progress(goal.id)
}
def _analyze_required_capabilities(self, description: str) -> List[str]:
"""Analyze goal description to determine required capabilities."""
capabilities = []
description_lower = description.lower()
if any(word in description_lower for word in ["research", "find", "search", "investigate"]):
capabilities.append("research")
if any(word in description_lower for word in ["analyze", "study", "examine", "evaluate"]):
capabilities.append("analysis")
if any(word in description_lower for word in ["synthesize", "combine", "integrate", "summarize"]):
capabilities.append("synthesis")
if any(word in description_lower for word in ["communicate", "report", "present", "explain"]):
capabilities.append("communication")
return capabilities if capabilities else ["research", "analysis", "synthesis"]
def _is_complex_goal(self, description: str) -> bool:
"""Determine if a goal is complex and should be decomposed."""
# Simple heuristics for complexity
word_count = len(description.split())
has_multiple_actions = len([word for word in description.lower().split()
if word in ["and", "then", "also", "additionally"]]) > 0
return word_count > 10 or has_multiple_actions
async def run_autonomous_cycle(self, max_goals: int = 5) -> Dict[str, Any]:
"""
Run one cycle of autonomous operation.
Args:
max_goals (int): Maximum goals to process in this cycle
Returns:
Dict[str, Any]: Cycle results
"""
cycle_start = time.time()
processed_goals = 0
results = []
print(f"Starting autonomous cycle (max {max_goals} goals)")
while processed_goals < max_goals:
# Get next goal
next_goal = self.goal_manager.get_next_goal()
if not next_goal:
print("No pending goals found")
break
print(f"Processing goal {processed_goals + 1}: {next_goal.description[:50]}...")
# Execute goal
result = await self.executor.execute_goal(next_goal.id)
results.append({
"goal_id": next_goal.id,
"description": next_goal.description,
"result": result
})
processed_goals += 1
if result["success"]:
self.system_statistics["goals_completed"] += 1
cycle_time = time.time() - cycle_start
self.system_statistics["autonomous_cycles"] += 1
cycle_result = {
"cycle_number": self.system_statistics["autonomous_cycles"],
"goals_processed": processed_goals,
"cycle_time": cycle_time,
"results": results,
"success_rate": sum(1 for r in results if r["result"]["success"]) / len(results) if results else 0.0
}
print(f"Autonomous cycle completed: {processed_goals} goals in {cycle_time:.2f}s")
return cycle_result
def get_system_status(self) -> Dict[str, Any]:
"""Get comprehensive system status."""
goal_stats = self.goal_manager.get_statistics()
execution_stats = self.executor.get_execution_statistics()
planning_stats = self.planning_engine.get_planning_statistics()
return {
"system_statistics": self.system_statistics,
"goal_management": goal_stats,
"execution_performance": execution_stats,
"planning_performance": planning_stats,
"autonomous_mode": self.autonomous_mode,
"is_running": self.is_running,
"agents_available": len(self.agents)
}
def get_learning_insights(self) -> Dict[str, Any]:
"""Get learning insights from the system."""
return self.executor.get_learning_insights()
def run_interactive_session(self):
"""Run interactive agentic AI session."""
print("\n" + "=" * 70)
print("AGENTIC AI SYSTEM - Interactive Session")
print("=" * 70)
print("Commands:")
print(" 'create <goal>' - Create and execute a goal")
print(" 'auto_cycle' - Run autonomous cycle")
print(" 'status' - Show system status")
print(" 'insights' - Show learning insights")
print(" 'goals' - List all goals")
print(" 'quit' or 'exit' - End session")
print("=" * 70)
async def run_session():
while True:
try:
user_input = input("\nAgentic AI> ").strip()
if user_input.lower() in ['quit', 'exit', 'bye']:
print("\nAgentic AI: Thank you for using the system! Goodbye!")
break
elif user_input.lower().startswith('create '):
goal_description = user_input[7:].strip()
if goal_description:
print(f"Creating and executing goal: {goal_description}")
result = await self.create_and_execute_goal(goal_description)
print(f"\nGoal Execution Result:")
print(f" Goal ID: {result['goal_id']}")
print(f" Success: {result['execution_result']['success']}")
if result['execution_result']['success']:
print(f" Execution Time: {result['execution_result'].get('execution_time', 0):.2f}s")
print(f" Adaptations Made: {result['execution_result'].get('adaptations_made', 0)}")
else:
print(f" Error: {result['execution_result'].get('error', 'Unknown error')}")
else:
print("Please provide a goal description")
continue
elif user_input.lower() == 'auto_cycle':
print("Running autonomous cycle...")
cycle_result = await self.run_autonomous_cycle()
print(f"\nAutonomous Cycle Results:")
print(f" Cycle Number: {cycle_result['cycle_number']}")
print(f" Goals Processed: {cycle_result['goals_processed']}")
print(f" Cycle Time: {cycle_result['cycle_time']:.2f}s")
print(f" Success Rate: {cycle_result['success_rate']:.1%}")
continue
elif user_input.lower() == 'status':
status = self.get_system_status()
print(f"\nSystem Status:")
print(f" Goals Created: {status['system_statistics']['goals_created']}")
print(f" Goals Completed: {status['system_statistics']['goals_completed']}")
print(f" Autonomous Cycles: {status['system_statistics']['autonomous_cycles']}")
print(f" Active Goals: {status['goal_management']['active_goals']}")
print(f" Execution Success Rate: {status['execution_performance'].get('success_rate', 0):.1%}")
print(f" Available Agents: {status['agents_available']}")
continue
elif user_input.lower() == 'insights':
insights = self.get_learning_insights()
print(f"\nLearning Insights:")
print(f" Adaptation Effectiveness: {insights['adaptation_effectiveness']:.1%}")
if insights['goal_type_performance']:
print(f" Goal Type Performance:")
for goal_type, perf in insights['goal_type_performance'].items():
print(f" {goal_type}: {perf['average_time']:.1f}s avg ({perf['sample_count']} samples)")
if insights['recommendations']:
print(f" Recommendations:")
for rec in insights['recommendations']:
print(f" - {rec}")
continue
elif user_input.lower() == 'goals':
stats = self.goal_manager.get_statistics()
print(f"\nGoal Overview:")
print(f" Total Goals: {stats['total_goals']}")
print(f" Active Goals: {stats['active_goals']}")
print(f" Completed Goals: {stats['completed_goals']}")
print(f" Pending Goals: {stats['pending_goals']}")
# Show recent goals
recent_goals = list(self.goal_manager.goals.values())[-5:]
if recent_goals:
print(f"\nRecent Goals:")
for goal in recent_goals:
print(f" {goal.status.value}: {goal.description[:50]}...")
continue
elif not user_input:
continue
else:
print("Unknown command. Type 'quit' to exit or use one of the available commands.")
except KeyboardInterrupt:
print("\n\nSession interrupted. Goodbye!")
break
except Exception as e:
print(f"\nError: {e}")
# Run async session
asyncio.run(run_session())
def create_sample_goals_for_agentic_ai():
"""Create sample goals for agentic AI demonstration."""
return [
"Research the latest developments in artificial intelligence and machine learning",
"Analyze the impact of climate change on global agriculture",
"Synthesize information about renewable energy technologies and their effectiveness",
"Investigate the role of quantum computing in cryptography and security",
"Create a comprehensive report on space exploration missions planned for the next decade"
]
def main():
"""
Main function demonstrating Agentic AI functionality.
"""
try:
# Initialize components
print("Setting up Agentic AI System...")
# Initialize device
if torch.cuda.is_available():
device = torch.device("cuda")
elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")
print(f"Using device: {device}")
# Initialize LLM (optional for this demo)
try:
print("Loading language model...")
tokenizer = AutoTokenizer.from_pretrained("microsoft/DialoGPT-medium")
model = AutoModelForCausalLM.from_pretrained("microsoft/DialoGPT-medium")
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
model.to(device)
model.eval()
except Exception as e:
print(f"Warning: Could not load LLM: {e}")
model = None
tokenizer = None
# Initialize knowledge base (optional)
knowledge_base = None
try:
print("Creating knowledge base...")
# This would import from previous chapters in a real implementation
# For now, we'll use a placeholder
knowledge_base = type('KnowledgeBase', (), {
'search_relevant_context': lambda self, query, k=3: [
{"content": f"Sample context for: {query}", "source": "sample", "score": 0.8}
]
})()
except Exception as e:
print(f"Warning: Could not create knowledge base: {e}")
# Initialize Agentic AI system
agentic_ai = AgenticAI(
llm_model=model,
tokenizer=tokenizer,
device=device,
knowledge_base=knowledge_base
)
# Show system status
status = agentic_ai.get_system_status()
print(f"\nAgentic AI System ready:")
print(f" Available agents: {status['agents_available']}")
print(f" System initialized successfully")
# Demonstrate with sample goals
print(f"\nDemonstrating with sample goals...")
sample_goals = create_sample_goals_for_agentic_ai()
async def demo():
for i, goal_desc in enumerate(sample_goals[:2]): # Demo with first 2 goals
print(f"\n--- Demo Goal {i+1} ---")
result = await agentic_ai.create_and_execute_goal(goal_desc)
print(f"Result: {'Success' if result['execution_result']['success'] else 'Failed'}")
# Run demo
asyncio.run(demo())
# Run interactive session
agentic_ai.run_interactive_session()
except Exception as e:
print(f"Failed to initialize Agentic AI System: {e}")
print("Please check your installation and try again.")
if __name__ == "__main__":
main()
This comprehensive Agentic AI implementation demonstrates autonomous goal-oriented behavior through sophisticated planning, adaptive execution, and learning mechanisms. The system can independently pursue complex objectives, adapt to failures, and improve performance over time through experience.
CHAPTER F: IMPLEMENTING MODEL CONTEXT PROTOCOL (MCP)
The Model Context Protocol represents a standardized framework for enabling seamless communication between AI models and external tools, services, and data sources. Developed by Anthropic, MCP addresses the fundamental challenge of providing AI systems with controlled, secure access to real-time information and computational resources beyond their training data.
The rationale for implementing MCP stems from the limitations of isolated AI models that cannot access current information, execute code, or interact with external systems. MCP bridges this gap by establishing a standardized protocol for AI models to communicate with external resources through well-defined interfaces, enabling more capable and practical AI applications.
MCP architecture consists of several key components working together to enable secure and efficient model-tool communication. MCP servers expose specific capabilities and resources to AI models. MCP clients facilitate communication between models and servers. Protocol definitions specify message formats, authentication mechanisms, and capability discovery procedures. Resource management ensures secure access control and proper resource allocation.
The implementation of MCP requires understanding both client and server-side components, protocol specifications, and security considerations. We will build a complete MCP implementation that demonstrates both server and client functionality while maintaining compatibility with the official MCP specification.
Setting up an MCP development environment requires specific dependencies and a working understanding of the protocol.
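Of the modules imported below, only websockets is a third-party package (install it with pip install websockets); asyncio, json, uuid, logging, and the rest ship with the standard library. A quick sanity check along these lines can confirm the environment before running the examples:

```python
# Verify that the modules this chapter relies on are importable.
# Only "websockets" is third-party; the rest are standard library.
import importlib.util

for module in ("asyncio", "json", "uuid", "logging", "websockets"):
    if importlib.util.find_spec(module) is None:
        print(f"Missing dependency: {module} -- install it before continuing")
```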
import asyncio
import json
import uuid
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Any, Optional, Union, Callable
import websockets
import logging
from pathlib import Path
# MCP Protocol Version
MCP_VERSION = "2024-11-05"
class MCPMessageType(Enum):
"""MCP message types as defined in the protocol specification."""
INITIALIZE = "initialize"
INITIALIZED = "initialized"
PING = "ping"
PONG = "pong"
LIST_RESOURCES = "resources/list"
LIST_TOOLS = "tools/list"
CALL_TOOL = "tools/call"
READ_RESOURCE = "resources/read"
SUBSCRIBE = "resources/subscribe"
UNSUBSCRIBE = "resources/unsubscribe"
NOTIFICATION = "notification"
ERROR = "error"
@dataclass
class MCPMessage:
"""Base MCP message structure."""
jsonrpc: str = "2.0"
id: Optional[Union[str, int]] = None
method: Optional[str] = None
params: Optional[Dict[str, Any]] = None
result: Optional[Any] = None
error: Optional[Dict[str, Any]] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert message to dictionary for JSON serialization."""
message = {"jsonrpc": self.jsonrpc}
if self.id is not None:
message["id"] = self.id
if self.method is not None:
message["method"] = self.method
if self.params is not None:
message["params"] = self.params
if self.result is not None:
message["result"] = self.result
if self.error is not None:
message["error"] = self.error
return message
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'MCPMessage':
"""Create message from dictionary."""
return cls(
jsonrpc=data.get("jsonrpc", "2.0"),
id=data.get("id"),
method=data.get("method"),
params=data.get("params"),
result=data.get("result"),
error=data.get("error")
)
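Concretely, a serialized MCP request and its matching response are plain JSON-RPC 2.0 objects. The sketch below builds both by hand to show the wire format; the tool name and arguments are illustrative, not part of the specification:

```python
import json

# A tools/call request as it travels over the wire:
# jsonrpc version, request id, method, and params.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {"name": "get_time", "arguments": {"timezone": "UTC"}},
}

# The matching success response carries the same id and a result,
# but no method or params.
response = {
    "jsonrpc": "2.0",
    "id": 1,
    "result": {"content": [{"type": "text", "text": "(tool output here)"}]},
}

wire = json.dumps(request)
decoded = json.loads(wire)
assert decoded["id"] == response["id"]  # responses are paired with requests by id
assert "method" not in response         # responses never carry a method
```

This is exactly the structure MCPMessage.to_dict produces: optional fields are simply absent rather than null, which keeps the payload compatible with strict JSON-RPC parsers.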
@dataclass
class MCPResource:
"""MCP resource definition."""
uri: str
name: str
description: Optional[str] = None
mimeType: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
@dataclass
class MCPTool:
"""MCP tool definition."""
name: str
description: str
inputSchema: Dict[str, Any]
metadata: Optional[Dict[str, Any]] = None
class MCPError(Exception):
"""MCP-specific error with error codes."""
# Standard JSON-RPC error codes
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603
# MCP-specific error codes
RESOURCE_NOT_FOUND = -32001
TOOL_NOT_FOUND = -32002
ACCESS_DENIED = -32003
def __init__(self, code: int, message: str, data: Any = None):
self.code = code
self.message = message
self.data = data
super().__init__(f"MCP Error {code}: {message}")
def to_dict(self) -> Dict[str, Any]:
"""Convert error to dictionary format."""
error = {"code": self.code, "message": self.message}
if self.data is not None:
error["data"] = self.data
return error
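When a handler raises MCPError, the server wraps it in a JSON-RPC error response via to_dict. The resulting shape, built here by hand for illustration (the id value is arbitrary):

```python
import json

# Error code mirrored from the MCPError class above.
METHOD_NOT_FOUND = -32601

error_response = {
    "jsonrpc": "2.0",
    "id": 7,
    "error": {
        "code": METHOD_NOT_FOUND,
        "message": "Method not found: tools/unknown",
    },
}

# An error response carries "error" and never "result";
# clients branch on which key is present.
wire = json.dumps(error_response)
assert "result" not in json.loads(wire)
assert json.loads(wire)["error"]["code"] == -32601
```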
The MCP server implementation provides the foundation for exposing tools and resources to AI models. Servers must handle protocol negotiation, capability discovery, and secure resource access:
class MCPServer:
"""
MCP Server implementation for exposing tools and resources.
This server handles MCP protocol communication, manages resources
and tools, and provides secure access control mechanisms.
"""
def __init__(self, name: str, version: str = "1.0.0"):
"""
Initialize MCP server.
Args:
name (str): Server name
version (str): Server version
"""
self.name = name
self.version = version
self.capabilities = {
"resources": {},
"tools": {},
"logging": {}
}
# Resource and tool registries
self.resources: Dict[str, MCPResource] = {}
self.tools: Dict[str, MCPTool] = {}
self.resource_handlers: Dict[str, Callable] = {}
self.tool_handlers: Dict[str, Callable] = {}
# Client connections
self.clients: Dict[str, Dict[str, Any]] = {}
# Server state
self.is_initialized = False
self.logger = logging.getLogger(f"mcp.server.{name}")
# Register built-in handlers
self._register_builtin_handlers()
def _register_builtin_handlers(self):
"""Register built-in protocol handlers."""
self.protocol_handlers = {
MCPMessageType.INITIALIZE.value: self._handle_initialize,
MCPMessageType.PING.value: self._handle_ping,
MCPMessageType.LIST_RESOURCES.value: self._handle_list_resources,
MCPMessageType.LIST_TOOLS.value: self._handle_list_tools,
MCPMessageType.READ_RESOURCE.value: self._handle_read_resource,
MCPMessageType.CALL_TOOL.value: self._handle_call_tool
}
def register_resource(self, resource: MCPResource, handler: Callable):
"""
Register a resource with its handler.
Args:
resource (MCPResource): Resource definition
handler (Callable): Function to handle resource requests
"""
self.resources[resource.uri] = resource
self.resource_handlers[resource.uri] = handler
self.logger.info(f"Registered resource: {resource.uri}")
def register_tool(self, tool: MCPTool, handler: Callable):
"""
Register a tool with its handler.
Args:
tool (MCPTool): Tool definition
handler (Callable): Function to handle tool calls
"""
self.tools[tool.name] = tool
self.tool_handlers[tool.name] = handler
self.logger.info(f"Registered tool: {tool.name}")
async def handle_message(self, message_data: str, client_id: str) -> Optional[str]:
"""
Handle incoming MCP message.
Args:
message_data (str): JSON message data
client_id (str): Client identifier
Returns:
Optional[str]: Response message or None
"""
try:
# Parse message
data = json.loads(message_data)
message = MCPMessage.from_dict(data)
# Validate JSON-RPC format
if message.jsonrpc != "2.0":
raise MCPError(
MCPError.INVALID_REQUEST,
"Invalid JSON-RPC version"
)
# Handle different message types
if message.method:
# Request message
return await self._handle_request(message, client_id)
elif message.result is not None or message.error is not None:
# Response message
await self._handle_response(message, client_id)
return None
else:
raise MCPError(
MCPError.INVALID_REQUEST,
"Invalid message format"
)
except json.JSONDecodeError:
error_response = MCPMessage(
id=None,
error=MCPError(MCPError.PARSE_ERROR, "Parse error").to_dict()
)
return json.dumps(error_response.to_dict())
except MCPError as e:
error_response = MCPMessage(
id=getattr(message, 'id', None) if 'message' in locals() else None,
error=e.to_dict()
)
return json.dumps(error_response.to_dict())
except Exception as e:
self.logger.error(f"Unexpected error handling message: {e}")
error_response = MCPMessage(
id=getattr(message, 'id', None) if 'message' in locals() else None,
error=MCPError(MCPError.INTERNAL_ERROR, str(e)).to_dict()
)
return json.dumps(error_response.to_dict())
async def _handle_request(self, message: MCPMessage, client_id: str) -> str:
"""Handle request message."""
method = message.method
if method not in self.protocol_handlers:
raise MCPError(
MCPError.METHOD_NOT_FOUND,
f"Method not found: {method}"
)
handler = self.protocol_handlers[method]
try:
result = await handler(message.params or {}, client_id)
response = MCPMessage(
id=message.id,
result=result
)
return json.dumps(response.to_dict())
except Exception as e:
if isinstance(e, MCPError):
raise
else:
raise MCPError(
MCPError.INTERNAL_ERROR,
f"Handler error: {str(e)}"
)
async def _handle_response(self, message: MCPMessage, client_id: str):
"""Handle response message."""
# Store response for pending requests if needed
if client_id in self.clients:
self.clients[client_id]["last_response"] = message
async def _handle_initialize(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:
"""Handle initialization request."""
protocol_version = params.get("protocolVersion")
client_info = params.get("clientInfo", {})
# Validate protocol version
if protocol_version != MCP_VERSION:
self.logger.warning(f"Protocol version mismatch: {protocol_version} != {MCP_VERSION}")
# Register client
self.clients[client_id] = {
"info": client_info,
"initialized_at": time.time(),
"protocol_version": protocol_version
}
self.is_initialized = True
return {
"protocolVersion": MCP_VERSION,
"capabilities": self.capabilities,
"serverInfo": {
"name": self.name,
"version": self.version
}
}
async def _handle_ping(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:
"""Handle ping request."""
return {} # Pong response
async def _handle_list_resources(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:
"""Handle list resources request."""
resources = []
for resource in self.resources.values():
resource_dict = {
"uri": resource.uri,
"name": resource.name
}
if resource.description:
resource_dict["description"] = resource.description
if resource.mimeType:
resource_dict["mimeType"] = resource.mimeType
if resource.metadata:
resource_dict["metadata"] = resource.metadata
resources.append(resource_dict)
return {"resources": resources}
async def _handle_list_tools(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:
"""Handle list tools request."""
tools = []
for tool in self.tools.values():
tool_dict = {
"name": tool.name,
"description": tool.description,
"inputSchema": tool.inputSchema
}
if tool.metadata:
tool_dict["metadata"] = tool.metadata
tools.append(tool_dict)
return {"tools": tools}
async def _handle_read_resource(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:
"""Handle read resource request."""
uri = params.get("uri")
if not uri:
raise MCPError(
MCPError.INVALID_PARAMS,
"Missing required parameter: uri"
)
if uri not in self.resources:
raise MCPError(
MCPError.RESOURCE_NOT_FOUND,
f"Resource not found: {uri}"
)
if uri not in self.resource_handlers:
raise MCPError(
MCPError.INTERNAL_ERROR,
f"No handler for resource: {uri}"
)
handler = self.resource_handlers[uri]
resource = self.resources[uri]
try:
content = await handler(params)
return {
"contents": [{
"uri": uri,
"mimeType": resource.mimeType or "text/plain",
"text": content if isinstance(content, str) else json.dumps(content)
}]
}
except Exception as e:
raise MCPError(
MCPError.INTERNAL_ERROR,
f"Resource handler error: {str(e)}"
)
async def _handle_call_tool(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:
"""Handle tool call request."""
name = params.get("name")
arguments = params.get("arguments", {})
if not name:
raise MCPError(
MCPError.INVALID_PARAMS,
"Missing required parameter: name"
)
if name not in self.tools:
raise MCPError(
MCPError.TOOL_NOT_FOUND,
f"Tool not found: {name}"
)
if name not in self.tool_handlers:
raise MCPError(
MCPError.INTERNAL_ERROR,
f"No handler for tool: {name}"
)
handler = self.tool_handlers[name]
try:
result = await handler(arguments)
return {
"content": [{
"type": "text",
"text": result if isinstance(result, str) else json.dumps(result)
}]
}
except Exception as e:
raise MCPError(
MCPError.INTERNAL_ERROR,
f"Tool handler error: {str(e)}"
)
async def start_websocket_server(self, host: str = "localhost", port: int = 8765):
"""
Start WebSocket server for MCP communication.
Args:
host (str): Server host
port (int): Server port
"""
# Note: websockets 11+ calls the handler with only the connection object;
# older releases also passed a second `path` argument.
async def handle_websocket(websocket):
client_id = str(uuid.uuid4())
self.logger.info(f"Client connected: {client_id}")
try:
async for message in websocket:
response = await self.handle_message(message, client_id)
if response:
await websocket.send(response)
except websockets.exceptions.ConnectionClosed:
self.logger.info(f"Client disconnected: {client_id}")
except Exception as e:
self.logger.error(f"WebSocket error: {e}")
finally:
if client_id in self.clients:
del self.clients[client_id]
self.logger.info(f"Starting MCP server on {host}:{port}")
async with websockets.serve(handle_websocket, host, port):
await asyncio.Future() # Run forever
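Putting the handlers above together, the first exchange on any new connection is the initialize handshake. The sketch below constructs both messages as plain dicts to show what travels over the WebSocket; the client and server names are placeholders:

```python
MCP_VERSION = "2024-11-05"

# Client -> server: the first request on a new connection.
initialize_request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": {
        "protocolVersion": MCP_VERSION,
        "clientInfo": {"name": "example-client", "version": "1.0.0"},
    },
}

# Server -> client: _handle_initialize echoes the protocol version and
# advertises its capabilities plus serverInfo.
initialize_response = {
    "jsonrpc": "2.0",
    "id": 1,
    "result": {
        "protocolVersion": MCP_VERSION,
        "capabilities": {"resources": {}, "tools": {}, "logging": {}},
        "serverInfo": {"name": "example-server", "version": "1.0.0"},
    },
}

# The response id must match the request id so the client can pair them,
# and only after this exchange may the client list or call tools.
assert initialize_response["id"] == initialize_request["id"]
assert initialize_response["result"]["protocolVersion"] == MCP_VERSION
```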
The MCP client implementation enables AI models and applications to communicate with MCP servers, discover capabilities, and invoke tools or access resources:
class MCPClient:
"""
MCP Client implementation for connecting to MCP servers.
This client handles protocol communication, capability discovery,
and provides high-level interfaces for tool and resource access.
"""
def __init__(self, client_name: str, client_version: str = "1.0.0"):
"""
Initialize MCP client.
Args:
client_name (str): Client name
client_version (str): Client version
"""
self.client_name = client_name
self.client_version = client_version
# Connection state
self.websocket = None
self.is_connected = False
self.is_initialized = False
# Server capabilities
self.server_info = {}
self.server_capabilities = {}
self.available_resources = []
self.available_tools = []
# Request tracking
self.pending_requests = {}
self.next_request_id = 1
self.logger = logging.getLogger(f"mcp.client.{client_name}")
async def connect(self, uri: str):
"""
Connect to MCP server.
Args:
uri (str): Server WebSocket URI
"""
try:
self.websocket = await websockets.connect(uri)
self.is_connected = True
self.logger.info(f"Connected to MCP server: {uri}")
# Start message handler
asyncio.create_task(self._message_handler())
except Exception as e:
self.logger.error(f"Failed to connect to server: {e}")
raise
async def disconnect(self):
"""Disconnect from MCP server."""
if self.websocket:
await self.websocket.close()
self.is_connected = False
self.is_initialized = False
self.logger.info("Disconnected from MCP server")
async def initialize(self) -> Dict[str, Any]:
"""
Initialize connection with MCP server.
Returns:
Dict[str, Any]: Server initialization response
"""
if not self.is_connected:
raise RuntimeError("Not connected to server")
params = {
"protocolVersion": MCP_VERSION,
"clientInfo": {
"name": self.client_name,
"version": self.client_version
}
}
response = await self._send_request(MCPMessageType.INITIALIZE.value, params)
self.server_info = response.get("serverInfo", {})
self.server_capabilities = response.get("capabilities", {})
self.is_initialized = True
self.logger.info(f"Initialized with server: {self.server_info.get('name', 'unknown')}")
# Discover available resources and tools
await self._discover_capabilities()
return response
async def _discover_capabilities(self):
"""Discover server capabilities."""
try:
# List available resources
if "resources" in self.server_capabilities:
resources_response = await self._send_request(MCPMessageType.LIST_RESOURCES.value)
self.available_resources = resources_response.get("resources", [])
self.logger.info(f"Discovered {len(self.available_resources)} resources")
# List available tools
if "tools" in self.server_capabilities:
tools_response = await self._send_request(MCPMessageType.LIST_TOOLS.value)
self.available_tools = tools_response.get("tools", [])
self.logger.info(f"Discovered {len(self.available_tools)} tools")
except Exception as e:
self.logger.warning(f"Failed to discover capabilities: {e}")
async def _send_request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Any:
"""
Send request to server and wait for response.
Args:
method (str): Method name
params (Optional[Dict[str, Any]]): Request parameters
Returns:
Any: Response result
"""
if not self.is_connected:
raise RuntimeError("Not connected to server")
request_id = self.next_request_id
self.next_request_id += 1
message = MCPMessage(
id=request_id,
method=method,
params=params
)
# Create future for response
response_future = asyncio.Future()
self.pending_requests[request_id] = response_future
try:
# Send request
await self.websocket.send(json.dumps(message.to_dict()))
# Wait for response
response = await asyncio.wait_for(response_future, timeout=30.0)
if "error" in response:
error = response["error"]
raise MCPError(
error.get("code", MCPError.INTERNAL_ERROR),
error.get("message", "Unknown error"),
error.get("data")
)
return response.get("result")
finally:
# Clean up pending request
if request_id in self.pending_requests:
del self.pending_requests[request_id]
async def _message_handler(self):
"""Handle incoming messages from server."""
try:
async for message_data in self.websocket:
try:
data = json.loads(message_data)
message = MCPMessage.from_dict(data)
# Handle response messages
if message.id is not None and message.id in self.pending_requests:
future = self.pending_requests[message.id]
if not future.done():
future.set_result(data)
# Handle notification messages
elif message.method:
await self._handle_notification(message)
except json.JSONDecodeError:
self.logger.error("Received invalid JSON message")
except Exception as e:
self.logger.error(f"Error handling message: {e}")
except websockets.exceptions.ConnectionClosed:
self.logger.info("Server connection closed")
self.is_connected = False
self.is_initialized = False
except Exception as e:
self.logger.error(f"Message handler error: {e}")
async def _handle_notification(self, message: MCPMessage):
"""Handle notification messages from server."""
self.logger.info(f"Received notification: {message.method}")
async def ping(self) -> bool:
"""
Send ping to server.
Returns:
bool: True if server responded
"""
try:
await self._send_request(MCPMessageType.PING.value)
return True
except Exception:
return False
async def list_resources(self) -> List[Dict[str, Any]]:
"""
List available resources.
Returns:
List[Dict[str, Any]]: List of available resources
"""
if not self.is_initialized:
raise RuntimeError("Client not initialized")
response = await self._send_request(MCPMessageType.LIST_RESOURCES.value)
return response.get("resources", [])
async def list_tools(self) -> List[Dict[str, Any]]:
"""
List available tools.
Returns:
List[Dict[str, Any]]: List of available tools
"""
if not self.is_initialized:
raise RuntimeError("Client not initialized")
response = await self._send_request(MCPMessageType.LIST_TOOLS.value)
return response.get("tools", [])
async def read_resource(self, uri: str) -> str:
"""
Read resource content.
Args:
uri (str): Resource URI
Returns:
str: Resource content
"""
if not self.is_initialized:
raise RuntimeError("Client not initialized")
params = {"uri": uri}
response = await self._send_request(MCPMessageType.READ_RESOURCE.value, params)
contents = response.get("contents", [])
if not contents:
raise ValueError("No content returned")
return contents[0].get("text", "")
async def call_tool(self, name: str, arguments: Optional[Dict[str, Any]] = None) -> str:
"""
Call a tool.
Args:
name (str): Tool name
arguments (Dict[str, Any]): Tool arguments
Returns:
str: Tool result
"""
if not self.is_initialized:
raise RuntimeError("Client not initialized")
params = {
"name": name,
"arguments": arguments or {}
}
response = await self._send_request(MCPMessageType.CALL_TOOL.value, params)
content = response.get("content", [])
if not content:
raise ValueError("No content returned")
return content[0].get("text", "")
def get_server_info(self) -> Dict[str, Any]:
"""Get server information."""
return self.server_info.copy()
def get_capabilities(self) -> Dict[str, Any]:
"""Get server capabilities."""
return self.server_capabilities.copy()
def get_available_resources(self) -> List[Dict[str, Any]]:
"""Get list of available resources."""
return self.available_resources.copy()
def get_available_tools(self) -> List[Dict[str, Any]]:
"""Get list of available tools."""
return self.available_tools.copy()
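The request/response pairing inside _send_request and _message_handler rests on one pattern: park an asyncio.Future keyed by the request id, and resolve it when a message carrying that id arrives. A self-contained sketch of that pattern, with the server's reply simulated locally:

```python
import asyncio

async def demo() -> str:
    pending: dict = {}  # request id -> Future awaiting the response

    # "Send" a request: allocate an id and park a future for its response.
    request_id = 1
    loop = asyncio.get_running_loop()
    pending[request_id] = loop.create_future()

    # The message handler resolves the future when a matching id arrives.
    def on_message(message: dict) -> None:
        fut = pending.get(message.get("id"))
        if fut is not None and not fut.done():
            fut.set_result(message)

    # Simulate the server's reply arriving asynchronously on the event loop.
    loop.call_soon(on_message, {"jsonrpc": "2.0", "id": 1, "result": "pong"})

    # The caller simply awaits its future, with a timeout for safety.
    response = await asyncio.wait_for(pending[request_id], timeout=5.0)
    del pending[request_id]
    return response["result"]

result = asyncio.run(demo())
assert result == "pong"
```

This decouples senders from the single receive loop: any number of outstanding requests can await their own futures while _message_handler dispatches responses by id.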
COMPLETE RUNNING EXAMPLE FOR MCP IMPLEMENTATION
import asyncio
import json
import uuid
import time
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Any, Optional, Union, Callable
import websockets
from pathlib import Path
import warnings
warnings.filterwarnings("ignore")
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# MCP Protocol Version
MCP_VERSION = "2024-11-05"
class MCPMessageType(Enum):
"""MCP message types as defined in the protocol specification."""
INITIALIZE = "initialize"
INITIALIZED = "initialized"
PING = "ping"
PONG = "pong"
LIST_RESOURCES = "resources/list"
LIST_TOOLS = "tools/list"
CALL_TOOL = "tools/call"
READ_RESOURCE = "resources/read"
SUBSCRIBE = "resources/subscribe"
UNSUBSCRIBE = "resources/unsubscribe"
NOTIFICATION = "notification"
ERROR = "error"
@dataclass
class MCPMessage:
"""
Base MCP message structure following JSON-RPC 2.0 specification.
This class represents the fundamental message format used in MCP
communication between clients and servers.
"""
jsonrpc: str = "2.0"
id: Optional[Union[str, int]] = None
method: Optional[str] = None
params: Optional[Dict[str, Any]] = None
result: Optional[Any] = None
error: Optional[Dict[str, Any]] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert message to dictionary for JSON serialization."""
message = {"jsonrpc": self.jsonrpc}
if self.id is not None:
message["id"] = self.id
if self.method is not None:
message["method"] = self.method
if self.params is not None:
message["params"] = self.params
if self.result is not None:
message["result"] = self.result
if self.error is not None:
message["error"] = self.error
return message
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'MCPMessage':
"""Create message from dictionary."""
return cls(
jsonrpc=data.get("jsonrpc", "2.0"),
id=data.get("id"),
method=data.get("method"),
params=data.get("params"),
result=data.get("result"),
error=data.get("error")
)
@dataclass
class MCPResource:
"""
MCP resource definition.
Resources represent data or content that can be accessed by clients
through the MCP protocol.
"""
uri: str
name: str
description: Optional[str] = None
mimeType: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
@dataclass
class MCPTool:
"""
MCP tool definition.
Tools represent executable functions that can be called by clients
with specific input schemas and expected outputs.
"""
name: str
description: str
inputSchema: Dict[str, Any]
metadata: Optional[Dict[str, Any]] = None
class MCPError(Exception):
"""
MCP-specific error with standardized error codes.
This class provides structured error handling following JSON-RPC
error code conventions with MCP-specific extensions.
"""
# Standard JSON-RPC error codes
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603
# MCP-specific error codes
RESOURCE_NOT_FOUND = -32001
TOOL_NOT_FOUND = -32002
ACCESS_DENIED = -32003
def __init__(self, code: int, message: str, data: Any = None):
"""
Initialize MCP error.
Args:
code (int): Error code
message (str): Error message
data (Any): Additional error data
"""
self.code = code
self.message = message
self.data = data
super().__init__(f"MCP Error {code}: {message}")
def to_dict(self) -> Dict[str, Any]:
"""Convert error to dictionary format."""
error = {"code": self.code, "message": self.message}
if self.data is not None:
error["data"] = self.data
return error
class MCPServer:
"""
Complete MCP Server implementation.
This server provides a full implementation of the Model Context Protocol,
enabling AI models to access tools and resources through standardized
communication patterns.
"""
def __init__(self, name: str, version: str = "1.0.0"):
"""
Initialize MCP server.
Args:
name (str): Server name
version (str): Server version
"""
self.name = name
self.version = version
self.capabilities = {
"resources": {
"subscribe": False,
"listChanged": False
},
"tools": {
"listChanged": False
},
"logging": {}
}
# Resource and tool registries
self.resources: Dict[str, MCPResource] = {}
self.tools: Dict[str, MCPTool] = {}
self.resource_handlers: Dict[str, Callable] = {}
self.tool_handlers: Dict[str, Callable] = {}
# Client connections
self.clients: Dict[str, Dict[str, Any]] = {}
# Server state
self.is_initialized = False
self.logger = logging.getLogger(f"mcp.server.{name}")
# Register built-in handlers
self._register_builtin_handlers()
# Register sample tools and resources
self._register_sample_capabilities()
def _register_builtin_handlers(self):
"""Register built-in protocol handlers."""
self.protocol_handlers = {
MCPMessageType.INITIALIZE.value: self._handle_initialize,
MCPMessageType.PING.value: self._handle_ping,
MCPMessageType.LIST_RESOURCES.value: self._handle_list_resources,
MCPMessageType.LIST_TOOLS.value: self._handle_list_tools,
MCPMessageType.READ_RESOURCE.value: self._handle_read_resource,
MCPMessageType.CALL_TOOL.value: self._handle_call_tool
}
def _register_sample_capabilities(self):
"""Register sample tools and resources for demonstration."""
# Sample calculator tool
calculator_tool = MCPTool(
name="calculator",
description="Perform basic mathematical calculations",
inputSchema={
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "Mathematical expression to evaluate"
}
},
"required": ["expression"]
}
)
self.register_tool(calculator_tool, self._calculator_handler)
# Sample text analyzer tool
text_analyzer_tool = MCPTool(
name="text_analyzer",
description="Analyze text for various metrics",
inputSchema={
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "Text to analyze"
},
"metrics": {
"type": "array",
"items": {"type": "string"},
"description": "Metrics to calculate (word_count, char_count, sentiment)"
}
},
"required": ["text"]
}
)
self.register_tool(text_analyzer_tool, self._text_analyzer_handler)
# Sample system info resource
system_info_resource = MCPResource(
uri="system://info",
name="System Information",
description="Current system information and status",
mimeType="application/json"
)
self.register_resource(system_info_resource, self._system_info_handler)
# Sample time resource
time_resource = MCPResource(
uri="system://time",
name="Current Time",
description="Current system time in various formats",
mimeType="application/json"
)
self.register_resource(time_resource, self._time_handler)
async def _calculator_handler(self, arguments: Dict[str, Any]) -> str:
"""Handle calculator tool calls."""
expression = arguments.get("expression", "")
try:
# Simple expression evaluation (be careful in production!)
# This is a simplified example - use a proper math parser in production
allowed_chars = set("0123456789+-*/().")
if not all(c in allowed_chars or c.isspace() for c in expression):
return "Error: Invalid characters in expression"
result = eval(expression)
return f"Result: {expression} = {result}"
except Exception as e:
return f"Error: {str(e)}"
async def _text_analyzer_handler(self, arguments: Dict[str, Any]) -> str:
"""Handle text analyzer tool calls."""
text = arguments.get("text", "")
metrics = arguments.get("metrics", ["word_count", "char_count"])
analysis = {}
if "word_count" in metrics:
analysis["word_count"] = len(text.split())
if "char_count" in metrics:
analysis["char_count"] = len(text)
if "sentence_count" in metrics:
analysis["sentence_count"] = len([s for s in text.split('.') if s.strip()])
if "sentiment" in metrics:
# Simple sentiment analysis (placeholder)
positive_words = ["good", "great", "excellent", "amazing", "wonderful", "fantastic"]
negative_words = ["bad", "terrible", "awful", "horrible", "disappointing"]
text_lower = text.lower()
positive_count = sum(1 for word in positive_words if word in text_lower)
negative_count = sum(1 for word in negative_words if word in text_lower)
if positive_count > negative_count:
sentiment = "positive"
elif negative_count > positive_count:
sentiment = "negative"
else:
sentiment = "neutral"
analysis["sentiment"] = sentiment
analysis["positive_indicators"] = positive_count
analysis["negative_indicators"] = negative_count
return json.dumps(analysis, indent=2)
async def _system_info_handler(self, params: Dict[str, Any]) -> str:
"""Handle system info resource requests."""
import platform
import psutil
try:
system_info = {
"platform": platform.platform(),
"system": platform.system(),
"processor": platform.processor(),
"python_version": platform.python_version(),
"cpu_count": psutil.cpu_count(),
"memory_total": psutil.virtual_memory().total,
"memory_available": psutil.virtual_memory().available,
"disk_usage": {
"total": psutil.disk_usage('/').total,
"used": psutil.disk_usage('/').used,
"free": psutil.disk_usage('/').free
},
"timestamp": time.time()
}
except ImportError:
# Fallback if psutil is not available
system_info = {
"platform": platform.platform(),
"system": platform.system(),
"python_version": platform.python_version(),
"timestamp": time.time(),
"note": "Limited system info (psutil not available)"
}
return json.dumps(system_info, indent=2)
async def _time_handler(self, params: Dict[str, Any]) -> str:
"""Handle time resource requests."""
import datetime
now = datetime.datetime.now()
utc_now = datetime.datetime.now(datetime.timezone.utc)  # utcnow() is deprecated since Python 3.12
time_info = {
"local_time": now.isoformat(),
"utc_time": utc_now.isoformat(),
"timestamp": time.time(),
"timezone": str(now.astimezone().tzinfo),
"formatted": {
"date": now.strftime("%Y-%m-%d"),
"time": now.strftime("%H:%M:%S"),
"datetime": now.strftime("%Y-%m-%d %H:%M:%S"),
"iso": now.isoformat()
}
}
return json.dumps(time_info, indent=2)
def register_resource(self, resource: MCPResource, handler: Callable):
"""
Register a resource with its handler.
Args:
resource (MCPResource): Resource definition
handler (Callable): Function to handle resource requests
"""
self.resources[resource.uri] = resource
self.resource_handlers[resource.uri] = handler
self.logger.info(f"Registered resource: {resource.uri}")
def register_tool(self, tool: MCPTool, handler: Callable):
"""
Register a tool with its handler.
Args:
tool (MCPTool): Tool definition
handler (Callable): Function to handle tool calls
"""
self.tools[tool.name] = tool
self.tool_handlers[tool.name] = handler
self.logger.info(f"Registered tool: {tool.name}")
async def handle_message(self, message_data: str, client_id: str) -> Optional[str]:
"""
Handle incoming MCP message.
Args:
message_data (str): JSON message data
client_id (str): Client identifier
Returns:
Optional[str]: Response message or None
"""
try:
# Parse message
data = json.loads(message_data)
message = MCPMessage.from_dict(data)
# Validate JSON-RPC format
if message.jsonrpc != "2.0":
raise MCPError(
MCPError.INVALID_REQUEST,
"Invalid JSON-RPC version"
)
# Handle different message types
if message.method:
# Request message
return await self._handle_request(message, client_id)
elif message.result is not None or message.error is not None:
# Response message
await self._handle_response(message, client_id)
return None
else:
raise MCPError(
MCPError.INVALID_REQUEST,
"Invalid message format"
)
except json.JSONDecodeError:
error_response = MCPMessage(
id=None,
error=MCPError(MCPError.PARSE_ERROR, "Parse error").to_dict()
)
return json.dumps(error_response.to_dict())
except MCPError as e:
error_response = MCPMessage(
id=getattr(message, 'id', None) if 'message' in locals() else None,
error=e.to_dict()
)
return json.dumps(error_response.to_dict())
except Exception as e:
self.logger.error(f"Unexpected error handling message: {e}")
error_response = MCPMessage(
id=getattr(message, 'id', None) if 'message' in locals() else None,
error=MCPError(MCPError.INTERNAL_ERROR, str(e)).to_dict()
)
return json.dumps(error_response.to_dict())
async def _handle_request(self, message: MCPMessage, client_id: str) -> str:
"""Handle request message."""
method = message.method
if method not in self.protocol_handlers:
raise MCPError(
MCPError.METHOD_NOT_FOUND,
f"Method not found: {method}"
)
handler = self.protocol_handlers[method]
try:
result = await handler(message.params or {}, client_id)
response = MCPMessage(
id=message.id,
result=result
)
return json.dumps(response.to_dict())
except Exception as e:
if isinstance(e, MCPError):
raise
else:
raise MCPError(
MCPError.INTERNAL_ERROR,
f"Handler error: {str(e)}"
)
async def _handle_response(self, message: MCPMessage, client_id: str):
"""Handle response message."""
# Store response for pending requests if needed
if client_id in self.clients:
self.clients[client_id]["last_response"] = message
async def _handle_initialize(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:
"""Handle initialization request."""
protocol_version = params.get("protocolVersion")
client_info = params.get("clientInfo", {})
# Validate protocol version
if protocol_version != MCP_VERSION:
self.logger.warning(f"Protocol version mismatch: {protocol_version} != {MCP_VERSION}")
# Register client
self.clients[client_id] = {
"info": client_info,
"initialized_at": time.time(),
"protocol_version": protocol_version
}
self.is_initialized = True
self.logger.info(f"Client initialized: {client_info.get('name', 'unknown')}")
return {
"protocolVersion": MCP_VERSION,
"capabilities": self.capabilities,
"serverInfo": {
"name": self.name,
"version": self.version
}
}
async def _handle_ping(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:
"""Handle ping request."""
return {} # Pong response
async def _handle_list_resources(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:
"""Handle list resources request."""
resources = []
for resource in self.resources.values():
resource_dict = {
"uri": resource.uri,
"name": resource.name
}
if resource.description:
resource_dict["description"] = resource.description
if resource.mimeType:
resource_dict["mimeType"] = resource.mimeType
if resource.metadata:
resource_dict["metadata"] = resource.metadata
resources.append(resource_dict)
return {"resources": resources}
async def _handle_list_tools(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:
"""Handle list tools request."""
tools = []
for tool in self.tools.values():
tool_dict = {
"name": tool.name,
"description": tool.description,
"inputSchema": tool.inputSchema
}
if tool.metadata:
tool_dict["metadata"] = tool.metadata
tools.append(tool_dict)
return {"tools": tools}
async def _handle_read_resource(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:
"""Handle read resource request."""
uri = params.get("uri")
if not uri:
raise MCPError(
MCPError.INVALID_PARAMS,
"Missing required parameter: uri"
)
if uri not in self.resources:
raise MCPError(
MCPError.RESOURCE_NOT_FOUND,
f"Resource not found: {uri}"
)
if uri not in self.resource_handlers:
raise MCPError(
MCPError.INTERNAL_ERROR,
f"No handler for resource: {uri}"
)
handler = self.resource_handlers[uri]
resource = self.resources[uri]
try:
content = await handler(params)
return {
"contents": [{
"uri": uri,
"mimeType": resource.mimeType or "text/plain",
"text": content if isinstance(content, str) else json.dumps(content)
}]
}
except Exception as e:
raise MCPError(
MCPError.INTERNAL_ERROR,
f"Resource handler error: {str(e)}"
)
async def _handle_call_tool(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:
"""Handle tool call request."""
name = params.get("name")
arguments = params.get("arguments", {})
if not name:
raise MCPError(
MCPError.INVALID_PARAMS,
"Missing required parameter: name"
)
if name not in self.tools:
raise MCPError(
MCPError.TOOL_NOT_FOUND,
f"Tool not found: {name}"
)
if name not in self.tool_handlers:
raise MCPError(
MCPError.INTERNAL_ERROR,
f"No handler for tool: {name}"
)
handler = self.tool_handlers[name]
try:
result = await handler(arguments)
return {
"content": [{
"type": "text",
"text": result if isinstance(result, str) else json.dumps(result)
}]
}
except Exception as e:
raise MCPError(
MCPError.INTERNAL_ERROR,
f"Tool handler error: {str(e)}"
)
async def start_websocket_server(self, host: str = "localhost", port: int = 8765):
"""
Start WebSocket server for MCP communication.
Args:
host (str): Server host
port (int): Server port
"""
async def handle_websocket(websocket):  # websockets >= 10.1 passes only the connection; older versions also passed a path argument
client_id = str(uuid.uuid4())
self.logger.info(f"Client connected: {client_id}")
try:
async for message in websocket:
response = await self.handle_message(message, client_id)
if response:
await websocket.send(response)
except websockets.exceptions.ConnectionClosed:
self.logger.info(f"Client disconnected: {client_id}")
except Exception as e:
self.logger.error(f"WebSocket error: {e}")
finally:
if client_id in self.clients:
del self.clients[client_id]
self.logger.info(f"Starting MCP server on {host}:{port}")
print(f"MCP Server '{self.name}' starting on ws://{host}:{port}")
print(f"Available tools: {list(self.tools.keys())}")
print(f"Available resources: {list(self.resources.keys())}")
async with websockets.serve(handle_websocket, host, port):
await asyncio.Future() # Run forever
class MCPClient:
"""
Complete MCP Client implementation.
This client provides a full implementation of the Model Context Protocol
for connecting to MCP servers, discovering capabilities, and invoking
tools or accessing resources.
"""
def __init__(self, client_name: str, client_version: str = "1.0.0"):
"""
Initialize MCP client.
Args:
client_name (str): Client name
client_version (str): Client version
"""
self.client_name = client_name
self.client_version = client_version
# Connection state
self.websocket = None
self.is_connected = False
self.is_initialized = False
# Server capabilities
self.server_info = {}
self.server_capabilities = {}
self.available_resources = []
self.available_tools = []
# Request tracking
self.pending_requests = {}
self.next_request_id = 1
self.logger = logging.getLogger(f"mcp.client.{client_name}")
async def connect(self, uri: str):
"""
Connect to MCP server.
Args:
uri (str): Server WebSocket URI
"""
try:
self.websocket = await websockets.connect(uri)
self.is_connected = True
self.logger.info(f"Connected to MCP server: {uri}")
# Start message handler
asyncio.create_task(self._message_handler())
except Exception as e:
self.logger.error(f"Failed to connect to server: {e}")
raise
async def disconnect(self):
"""Disconnect from MCP server."""
if self.websocket:
await self.websocket.close()
self.is_connected = False
self.is_initialized = False
self.logger.info("Disconnected from MCP server")
async def initialize(self) -> Dict[str, Any]:
"""
Initialize connection with MCP server.
Returns:
Dict[str, Any]: Server initialization response
"""
if not self.is_connected:
raise RuntimeError("Not connected to server")
params = {
"protocolVersion": MCP_VERSION,
"clientInfo": {
"name": self.client_name,
"version": self.client_version
}
}
response = await self._send_request(MCPMessageType.INITIALIZE.value, params)
self.server_info = response.get("serverInfo", {})
self.server_capabilities = response.get("capabilities", {})
self.is_initialized = True
self.logger.info(f"Initialized with server: {self.server_info.get('name', 'unknown')}")
# Discover available resources and tools
await self._discover_capabilities()
return response
async def _discover_capabilities(self):
"""Discover server capabilities."""
try:
# List available resources
if "resources" in self.server_capabilities:
resources_response = await self._send_request(MCPMessageType.LIST_RESOURCES.value)
self.available_resources = resources_response.get("resources", [])
self.logger.info(f"Discovered {len(self.available_resources)} resources")
# List available tools
if "tools" in self.server_capabilities:
tools_response = await self._send_request(MCPMessageType.LIST_TOOLS.value)
self.available_tools = tools_response.get("tools", [])
self.logger.info(f"Discovered {len(self.available_tools)} tools")
except Exception as e:
self.logger.warning(f"Failed to discover capabilities: {e}")
async def _send_request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Any:
"""
Send request to server and wait for response.
Args:
method (str): Method name
params (Optional[Dict[str, Any]]): Request parameters
Returns:
Any: Response result
"""
if not self.is_connected:
raise RuntimeError("Not connected to server")
request_id = self.next_request_id
self.next_request_id += 1
message = MCPMessage(
id=request_id,
method=method,
params=params
)
# Create future for response
response_future = asyncio.Future()
self.pending_requests[request_id] = response_future
try:
# Send request
await self.websocket.send(json.dumps(message.to_dict()))
# Wait for response
response = await asyncio.wait_for(response_future, timeout=30.0)
if "error" in response:
error = response["error"]
raise MCPError(
error.get("code", MCPError.INTERNAL_ERROR),
error.get("message", "Unknown error"),
error.get("data")
)
return response.get("result")
finally:
# Clean up pending request
if request_id in self.pending_requests:
del self.pending_requests[request_id]
async def _message_handler(self):
"""Handle incoming messages from server."""
try:
async for message_data in self.websocket:
try:
data = json.loads(message_data)
message = MCPMessage.from_dict(data)
# Handle response messages
if message.id is not None and message.id in self.pending_requests:
future = self.pending_requests[message.id]
if not future.done():
future.set_result(data)
# Handle notification messages
elif message.method:
await self._handle_notification(message)
except json.JSONDecodeError:
self.logger.error("Received invalid JSON message")
except Exception as e:
self.logger.error(f"Error handling message: {e}")
except websockets.exceptions.ConnectionClosed:
self.logger.info("Server connection closed")
self.is_connected = False
self.is_initialized = False
except Exception as e:
self.logger.error(f"Message handler error: {e}")
async def _handle_notification(self, message: MCPMessage):
"""Handle notification messages from server."""
self.logger.info(f"Received notification: {message.method}")
async def ping(self) -> bool:
"""
Send ping to server.
Returns:
bool: True if server responded
"""
try:
await self._send_request(MCPMessageType.PING.value)
return True
except Exception:
return False
async def list_resources(self) -> List[Dict[str, Any]]:
"""
List available resources.
Returns:
List[Dict[str, Any]]: List of available resources
"""
if not self.is_initialized:
raise RuntimeError("Client not initialized")
response = await self._send_request(MCPMessageType.LIST_RESOURCES.value)
return response.get("resources", [])
async def list_tools(self) -> List[Dict[str, Any]]:
"""
List available tools.
Returns:
List[Dict[str, Any]]: List of available tools
"""
if not self.is_initialized:
raise RuntimeError("Client not initialized")
response = await self._send_request(MCPMessageType.LIST_TOOLS.value)
return response.get("tools", [])
async def read_resource(self, uri: str) -> str:
"""
Read resource content.
Args:
uri (str): Resource URI
Returns:
str: Resource content
"""
if not self.is_initialized:
raise RuntimeError("Client not initialized")
params = {"uri": uri}
response = await self._send_request(MCPMessageType.READ_RESOURCE.value, params)
contents = response.get("contents", [])
if not contents:
raise ValueError("No content returned")
return contents[0].get("text", "")
async def call_tool(self, name: str, arguments: Optional[Dict[str, Any]] = None) -> str:
"""
Call a tool.
Args:
name (str): Tool name
arguments (Dict[str, Any]): Tool arguments
Returns:
str: Tool result
"""
if not self.is_initialized:
raise RuntimeError("Client not initialized")
params = {
"name": name,
"arguments": arguments or {}
}
response = await self._send_request(MCPMessageType.CALL_TOOL.value, params)
content = response.get("content", [])
if not content:
raise ValueError("No content returned")
return content[0].get("text", "")
def get_server_info(self) -> Dict[str, Any]:
"""Get server information."""
return self.server_info.copy()
def get_capabilities(self) -> Dict[str, Any]:
"""Get server capabilities."""
return self.server_capabilities.copy()
def get_available_resources(self) -> List[Dict[str, Any]]:
"""Get list of available resources."""
return self.available_resources.copy()
def get_available_tools(self) -> List[Dict[str, Any]]:
"""Get list of available tools."""
return self.available_tools.copy()
async def interactive_session(self):
"""Run interactive session with MCP server."""
print(f"\n{'='*60}")
print(f"MCP CLIENT INTERACTIVE SESSION")
print(f"{'='*60}")
print(f"Connected to: {self.server_info.get('name', 'Unknown Server')}")
print(f"Server version: {self.server_info.get('version', 'Unknown')}")
print(f"Available tools: {len(self.available_tools)}")
print(f"Available resources: {len(self.available_resources)}")
print(f"\nCommands:")
print(f" 'tools' - List available tools")
print(f" 'resources' - List available resources")
print(f" 'call <tool_name> <args>' - Call a tool")
print(f" 'read <resource_uri>' - Read a resource")
print(f" 'ping' - Ping server")
print(f" 'info' - Show server info")
print(f" 'quit' - Exit session")
print(f"{'='*60}")
while True:
try:
command = input(f"\nMCP> ").strip()
if command.lower() in ['quit', 'exit']:
print("Goodbye!")
break
elif command == 'tools':
tools = await self.list_tools()
print(f"\nAvailable Tools ({len(tools)}):")
for tool in tools:
print(f" - {tool['name']}: {tool['description']}")
elif command == 'resources':
resources = await self.list_resources()
print(f"\nAvailable Resources ({len(resources)}):")
for resource in resources:
print(f" - {resource['uri']}: {resource['name']}")
if 'description' in resource:
print(f" {resource['description']}")
elif command.startswith('call '):
parts = command[5:].split(' ', 1)
tool_name = parts[0]
if len(parts) > 1:
try:
args = json.loads(parts[1])
except json.JSONDecodeError:
# Simple argument parsing
args = {"expression": parts[1]} if tool_name == "calculator" else {"text": parts[1]}
else:
args = {}
try:
result = await self.call_tool(tool_name, args)
print(f"\nTool Result:\n{result}")
except Exception as e:
print(f"Error calling tool: {e}")
elif command.startswith('read '):
uri = command[5:].strip()
try:
content = await self.read_resource(uri)
print(f"\nResource Content:\n{content}")
except Exception as e:
print(f"Error reading resource: {e}")
elif command == 'ping':
if await self.ping():
print("Pong! Server is responsive.")
else:
print("Ping failed - server may be unresponsive.")
elif command == 'info':
print(f"\nServer Information:")
print(f" Name: {self.server_info.get('name', 'Unknown')}")
print(f" Version: {self.server_info.get('version', 'Unknown')}")
print(f" Protocol Version: {MCP_VERSION}")
print(f" Capabilities: {list(self.server_capabilities.keys())}")
elif command == '':
continue
else:
print("Unknown command. Type 'quit' to exit.")
except KeyboardInterrupt:
print("\nGoodbye!")
break
except Exception as e:
print(f"Error: {e}")
class MCPDemo:
"""
Demonstration class for MCP server and client functionality.
This class provides a complete demonstration of MCP capabilities
including server setup, client connection, and interactive usage.
"""
def __init__(self):
"""Initialize MCP demo."""
self.server = None
self.client = None
self.server_task = None
async def start_server(self, host: str = "localhost", port: int = 8765):
"""Start MCP server."""
self.server = MCPServer("Demo MCP Server", "1.0.0")
# Start server in background
self.server_task = asyncio.create_task(
self.server.start_websocket_server(host, port)
)
# Give server time to start
await asyncio.sleep(1)
return f"ws://{host}:{port}"
async def start_client(self, server_uri: str):
"""Start MCP client and connect to server."""
self.client = MCPClient("Demo MCP Client", "1.0.0")
await self.client.connect(server_uri)
await self.client.initialize()
return self.client
async def demo_basic_functionality(self):
"""Demonstrate basic MCP functionality."""
print("\n" + "="*60)
print("MCP BASIC FUNCTIONALITY DEMO")
print("="*60)
# Test ping
print("\n1. Testing server connectivity...")
ping_result = await self.client.ping()
print(f" Ping result: {'Success' if ping_result else 'Failed'}")
# List tools
print("\n2. Listing available tools...")
tools = await self.client.list_tools()
for tool in tools:
print(f" - {tool['name']}: {tool['description']}")
# List resources
print("\n3. Listing available resources...")
resources = await self.client.list_resources()
for resource in resources:
print(f" - {resource['uri']}: {resource['name']}")
# Test calculator tool
print("\n4. Testing calculator tool...")
calc_result = await self.client.call_tool("calculator", {"expression": "2 + 3 * 4"})
print(f" Calculator result: {calc_result}")
# Test text analyzer tool
print("\n5. Testing text analyzer tool...")
text_result = await self.client.call_tool("text_analyzer", {
"text": "This is a wonderful example of text analysis!",
"metrics": ["word_count", "char_count", "sentiment"]
})
print(f" Text analysis result:\n{text_result}")
# Test system info resource
print("\n6. Reading system info resource...")
system_info = await self.client.read_resource("system://info")
print(f" System info (first 200 chars):\n{system_info[:200]}...")
# Test time resource
print("\n7. Reading time resource...")
time_info = await self.client.read_resource("system://time")
print(f" Time info:\n{time_info}")
print("\n" + "="*60)
print("DEMO COMPLETED SUCCESSFULLY")
print("="*60)
async def run_interactive_demo(self):
"""Run interactive demo."""
try:
# Start server
print("Starting MCP server...")
server_uri = await self.start_server()
# Start client
print("Starting MCP client...")
await self.start_client(server_uri)
# Run basic demo
await self.demo_basic_functionality()
# Run interactive session
await self.client.interactive_session()
except KeyboardInterrupt:
print("\nDemo interrupted by user")
except Exception as e:
print(f"Demo error: {e}")
finally:
await self.cleanup()
async def cleanup(self):
"""Clean up resources."""
if self.client:
await self.client.disconnect()
if self.server_task:
self.server_task.cancel()
try:
await self.server_task
except asyncio.CancelledError:
pass
def main():
"""
Main function demonstrating MCP functionality.
"""
print("Model Context Protocol (MCP) Implementation Demo")
print("This demo shows both MCP server and client functionality")
# Check if websockets is available
try:
import websockets
except ImportError:
print("\nError: websockets library is required for MCP demo")
print("Please install it with: pip install websockets")
return
# Run demo
demo = MCPDemo()
try:
asyncio.run(demo.run_interactive_demo())
except KeyboardInterrupt:
print("\nDemo terminated by user")
except Exception as e:
print(f"Demo failed: {e}")
if __name__ == "__main__":
main()
This MCP implementation provides a complete, working foundation for building Model Context Protocol servers and clients. It demonstrates the key MCP concepts, including protocol negotiation, capability discovery, tool invocation, resource access, and error handling, while following the message formats of the MCP specification. Note that some components, such as the eval-based calculator tool, are deliberately simplified and would need hardening before production use.
CONCLUSIONS
This comprehensive guide has taken you through the complete journey of implementing Large Language Model-based systems, from basic chatbots to sophisticated agentic AI and standardized protocol implementations. Each chapter built upon the previous foundations, demonstrating how modern AI systems can evolve from simple question-answering interfaces to autonomous, goal-oriented agents capable of complex reasoning and external tool integration.
TECHNICAL ACHIEVEMENTS AND LEARNING OUTCOMES
Throughout this tutorial, we have accomplished several significant technical milestones. We began with a fundamental understanding of LLM integration, learning how to properly initialize models, manage GPU acceleration across different hardware platforms (NVIDIA CUDA, AMD ROCm, Apple MPS), and implement robust conversation handling with memory management. This foundation proved essential for all subsequent implementations.
The progression from basic chatbots to Retrieval-Augmented Generation systems demonstrated the critical importance of external knowledge integration. RAG systems address one of the most significant limitations of standalone LLMs: their inability to access current information or domain-specific knowledge beyond their training data. Our implementation showed how semantic search, embedding generation, and vector databases work together to provide contextually relevant information to language models.
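The retrieval pipeline described above can be reduced to a few lines. The bigram-hash embedding below is a toy stand-in, not a real model; a production system would use a sentence-embedding model and a vector database, but the cosine-similarity ranking and context assembly work the same way:

```python
import math
import zlib

def embed(text: str, dim: int = 64) -> list:
    """Toy deterministic embedding: hash character bigrams into a
    normalized bag-of-features vector. A real system would use a
    sentence-embedding model instead."""
    vec = [0.0] * dim
    lowered = text.lower()
    for i in range(len(lowered) - 1):
        vec[zlib.crc32(lowered[i:i + 2].encode()) % dim] += 1.0
    norm = math.sqrt(sum(v * v for v in vec))
    return [v / norm for v in vec] if norm else vec

def retrieve(query: str, documents: list, top_k: int = 2) -> list:
    """Rank documents by cosine similarity to the query embedding."""
    q = embed(query)
    score = lambda d: sum(a * b for a, b in zip(q, embed(d)))
    return sorted(documents, key=score, reverse=True)[:top_k]

docs = [
    "GPUs accelerate LLM inference.",
    "Vector databases store embeddings for semantic search.",
    "Bananas are rich in potassium.",
]
context = retrieve("How does semantic search work?", docs)
# The retrieved passages are prepended to the prompt as grounding context.
prompt = "Context:\n" + "\n".join(context) + "\n\nQuestion: How does semantic search work?"
```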
GraphRAG represented a significant advancement in knowledge representation and reasoning capabilities. By incorporating graph-based knowledge structures, we learned how to model complex relationships between entities and enable multi-hop reasoning that traditional RAG systems cannot achieve. This approach proves particularly valuable for queries requiring understanding of interconnected information and complex relationship patterns.
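Multi-hop traversal can be illustrated with a hypothetical hand-built knowledge graph (the entities and relations below are invented for the example); a breadth-first walk surfaces second-hop facts that a flat similarity search over isolated passages would miss:

```python
from collections import deque

# Hypothetical hand-built knowledge graph: entity -> (relation, entity) edges.
GRAPH = {
    "Marie Curie": [("won", "Nobel Prize in Physics"), ("spouse", "Pierre Curie")],
    "Pierre Curie": [("won", "Nobel Prize in Physics")],
    "Nobel Prize in Physics": [("awarded_in", "Stockholm")],
}

def multi_hop(start: str, max_hops: int = 2) -> list:
    """Collect facts reachable within max_hops of the start entity (BFS).
    A GraphRAG retriever performs this kind of traversal, then linearizes
    the collected facts into context for the LLM."""
    facts, seen = [], {start}
    queue = deque([(start, 0)])
    while queue:
        node, depth = queue.popleft()
        if depth == max_hops:
            continue
        for relation, target in GRAPH.get(node, []):
            facts.append(f"{node} --{relation}--> {target}")
            if target not in seen:
                seen.add(target)
                queue.append((target, depth + 1))
    return facts

facts = multi_hop("Marie Curie")
# The two-hop walk reaches the Stockholm fact via the Nobel Prize node.
```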
The multi-agent systems chapter introduced collaborative AI architectures where specialized agents work together to solve complex problems. This paradigm shift from monolithic models to collaborative networks demonstrates how task decomposition and specialization can lead to superior performance and more maintainable systems. The coordination mechanisms, message passing protocols, and result aggregation strategies provide a foundation for building scalable AI systems.
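The decomposition, fan-out, and aggregation pattern can be sketched with plain asyncio; the two "specialists" below are placeholder coroutines standing in for real LLM-backed agents:

```python
import asyncio

async def research_agent(task: str) -> str:
    await asyncio.sleep(0)  # stand-in for an LLM or tool call
    return f"[research] findings on {task}"

async def summary_agent(task: str) -> str:
    await asyncio.sleep(0)
    return f"[summary] condensed view of {task}"

async def coordinator(task: str) -> str:
    """Fan the task out to specialist agents concurrently, then aggregate."""
    results = await asyncio.gather(research_agent(task), summary_agent(task))
    return "\n".join(results)

report = asyncio.run(coordinator("vector databases"))
```

Real coordinators add routing (choosing which specialists a task needs) and richer aggregation, but the concurrent fan-out/fan-in core is the same.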
Agentic AI pushed the boundaries further by implementing autonomous goal-oriented behavior. The combination of goal management, planning engines, and adaptive execution demonstrated how AI systems can operate independently while learning from experience and adapting to changing circumstances. This represents a significant step toward truly autonomous AI assistants capable of pursuing long-term objectives.
Finally, the Model Context Protocol implementation provided a standardized framework for AI-tool integration. MCP addresses the critical need for secure, controlled access to external resources while maintaining compatibility across different AI systems and tool providers. This standardization enables the creation of robust AI ecosystems where models can safely interact with external services.
ARCHITECTURAL INSIGHTS AND DESIGN PATTERNS
Error handling and graceful degradation proved crucial for production-ready systems. Our implementations demonstrate comprehensive error handling strategies, from network failures and model errors to resource limitations and invalid inputs. The fallback mechanisms ensure that systems remain functional even when individual components fail.
Memory management and context window handling represent critical considerations for LLM applications. Our implementations show various strategies for managing conversation history, from simple sliding windows to sophisticated context compression techniques. These approaches ensure optimal performance while maintaining relevant context for meaningful interactions.
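The simplest of those strategies, a sliding window over recent turns, can be sketched with a bounded deque. The class and method names here are illustrative rather than taken from the earlier chapters:

```python
from collections import deque

class SlidingWindowMemory:
    """Keep only the most recent conversation turns within a fixed budget."""

    def __init__(self, max_turns: int = 4):
        # deque with maxlen evicts the oldest entry automatically
        self.turns = deque(maxlen=max_turns)

    def add(self, role: str, text: str) -> None:
        self.turns.append((role, text))

    def as_prompt(self) -> str:
        """Render the retained history as a prompt prefix for the model."""
        return "\n".join(f"{role}: {text}" for role, text in self.turns)

memory = SlidingWindowMemory(max_turns=2)
memory.add("user", "Hello")
memory.add("assistant", "Hi! How can I help?")
memory.add("user", "Tell me about RAG")   # evicts the oldest turn
print(memory.as_prompt())
```

A production system would typically bound the window by token count rather than turn count, or compress older turns into a summary, but the eviction principle is the same.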
The asynchronous programming patterns used throughout enable efficient resource utilization and responsive user experiences. The async/await paradigms, particularly evident in the multi-agent and MCP implementations, demonstrate how to build scalable systems that can handle multiple concurrent operations without blocking.
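The essence of that pattern is dispatching independent agent calls concurrently and collecting their results, for example with asyncio.gather. The agent names and delays below are illustrative stand-ins for real model or tool calls:

```python
import asyncio

async def run_agent(name: str, delay: float) -> str:
    """Stand-in for an agent doing I/O-bound work (a model call, a tool call)."""
    await asyncio.sleep(delay)  # simulates waiting on a network or model response
    return f"{name}: done"

async def coordinate() -> list[str]:
    # All three agents run concurrently; total wall time is roughly the
    # slowest single agent, not the sum of all delays.
    return await asyncio.gather(
        run_agent("researcher", 0.02),
        run_agent("analyst", 0.01),
        run_agent("writer", 0.03),
    )

results = asyncio.run(coordinate())
print(results)  # results arrive in call order regardless of completion order
```

Because gather preserves argument order, downstream aggregation code can rely on a stable mapping from agent to result even when completion order varies.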
PERFORMANCE CONSIDERATIONS AND OPTIMIZATION
Performance optimization emerged as a recurring theme across all implementations. Hardware acceleration through proper GPU utilization significantly impacts inference speed and system responsiveness. Our implementations demonstrate how to detect and utilize available hardware acceleration while providing CPU fallbacks for broader compatibility.
Caching strategies prove essential for production deployments. From embedding caches in RAG systems to response caches in multi-agent architectures, intelligent caching reduces computational overhead and improves response times. The trade-offs between memory usage and computational efficiency require careful consideration based on specific deployment requirements.
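A minimal embedding cache, for instance, keys computed vectors by the input text so identical passages are embedded only once. The embed function here is a toy stand-in for a real model call:

```python
class EmbeddingCache:
    """Memoize embeddings so identical texts are computed only once."""

    def __init__(self, embed_fn):
        self.embed_fn = embed_fn
        self.cache: dict[str, list[float]] = {}
        self.misses = 0  # counts actual embedding computations

    def get(self, text: str) -> list[float]:
        if text not in self.cache:
            self.misses += 1
            self.cache[text] = self.embed_fn(text)
        return self.cache[text]

# Toy "embedding": sum of character codes; a real system would call a model.
cache = EmbeddingCache(lambda t: [float(sum(map(ord, t)))])
cache.get("hello")
cache.get("hello")   # served from the cache, no recomputation
cache.get("world")
print(cache.misses)  # 2
```

The memory/compute trade-off mentioned above shows up directly here: the cache dictionary grows with every distinct input, so production deployments usually add an eviction policy (LRU, TTL) on top of this basic scheme.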
Batch processing and parallel execution, particularly evident in the multi-agent and agentic AI implementations, demonstrate how to maximize throughput while maintaining system responsiveness. The planning engines show how to identify parallelizable operations and optimize execution order for maximum efficiency.
SECURITY AND ETHICAL CONSIDERATIONS
Security considerations permeate all implementations, from input validation and sanitization to secure resource access in MCP servers. The principle of least privilege guides access control mechanisms, ensuring that AI systems can only access resources necessary for their intended functions.
The MCP implementation particularly emphasizes security through standardized authentication, controlled resource access, and comprehensive error handling. These patterns provide templates for building secure AI-tool integrations that protect both user data and system resources.
Ethical considerations around AI autonomy become increasingly important as systems become more sophisticated. The agentic AI implementation includes monitoring and intervention mechanisms that allow human oversight of autonomous operations. These safeguards ensure that autonomous systems remain aligned with human intentions and values.
SCALABILITY AND PRODUCTION READINESS
All implementations include considerations for production deployment and scalability. The modular architectures enable horizontal scaling through distributed deployments. The multi-agent systems naturally support distributed execution across multiple machines or containers.
Monitoring and observability features, including performance metrics, execution statistics, and learning insights, provide the visibility necessary for production operations. These capabilities enable proactive maintenance, performance optimization, and system improvement over time.
The configuration management and deployment patterns demonstrated throughout the guide provide foundations for robust production deployments. Environment-specific configurations, resource management, and graceful shutdown procedures ensure reliable operation in production environments.
FUTURE DIRECTIONS AND EXTENSIBILITY
The implementations provide extensible foundations for future enhancements and research directions. The modular architectures enable easy integration of new models, tools, and capabilities as they become available. The standardized interfaces facilitate interoperability between different system components and external services.
The learning mechanisms embedded in the agentic AI implementation demonstrate how systems can improve over time through experience. These foundations enable more sophisticated learning algorithms and adaptation strategies as the field advances.
The MCP framework provides a pathway for ecosystem development where different AI systems, tools, and services can interoperate through standardized protocols. This standardization enables the creation of rich AI ecosystems that extend far beyond individual implementations.
PRACTICAL IMPACT AND REAL-WORLD APPLICATIONS
The systems demonstrated in this guide have immediate practical applications across numerous domains. RAG systems enable AI assistants with access to current information and domain-specific knowledge bases. Multi-agent systems provide frameworks for complex workflow automation and collaborative problem-solving.
Agentic AI systems offer the potential for truly autonomous assistants capable of pursuing long-term goals with minimal human intervention. The MCP framework enables secure integration with existing tools and services, making AI systems more practical and useful in real-world environments.
The educational value of these implementations extends beyond their immediate functionality. The step-by-step approach, comprehensive explanations, and production-ready code provide templates and patterns that developers can adapt for their specific needs and requirements.
FINAL REFLECTIONS
This comprehensive exploration of LLM-based system implementation demonstrates the rapid evolution and immense potential of modern AI technologies. From basic conversational interfaces to autonomous agents and standardized protocols, we have witnessed how thoughtful architecture and implementation can create powerful, practical AI systems.
The journey from simple chatbots to sophisticated agentic AI illustrates the importance of incremental development and solid foundations. Each chapter built upon previous concepts while introducing new capabilities, demonstrating how complex systems emerge from well-designed components working in harmony.
The emphasis on production-ready code, comprehensive error handling, and real-world considerations ensures that these implementations serve not just as educational examples but as practical foundations for building robust AI systems. The clean architecture principles, extensive documentation, and modular design enable developers to understand, modify, and extend these systems for their specific needs.
As the field of AI continues to evolve rapidly, the patterns, principles, and implementations presented in this guide provide a solid foundation for future development. The modular architectures, standardized interfaces, and extensible designs ensure that these systems can adapt and grow with advancing technology.
The future of AI lies not in isolated models but in collaborative, autonomous systems that can work together to solve complex problems while maintaining human oversight and alignment. The implementations in this guide provide stepping stones toward that future, demonstrating how thoughtful engineering can harness the power of large language models to create truly useful and practical AI systems.
Whether you are building simple chatbots or complex autonomous agents, the principles, patterns, and implementations presented here provide the foundation for creating robust, scalable, and maintainable AI systems that can make a real difference in the world. The journey from basic LLM integration to sophisticated agentic AI represents not just technological advancement but a pathway toward more capable, useful, and trustworthy artificial intelligence.