Friday, April 10, 2026

LEVERAGING LARGE LANGUAGE MODELS FOR YOUR OWN IMPLEMENTATIONS: A COMPLETE GUIDE FOR DUMMIES




INTRODUCTION

Large Language Models, commonly known as LLMs, represent one of the most significant breakthroughs in artificial intelligence. These models are neural networks trained on vast amounts of text data, enabling them to understand and generate human-like text responses. Think of an LLM as an extremely well-read assistant that has absorbed millions of books, articles, and conversations, allowing it to engage in meaningful dialogue on virtually any topic.


The beauty of modern LLMs lies in their versatility. Unlike traditional software that requires explicit programming for each task, LLMs can adapt to new challenges through natural language instructions called prompts. This flexibility has opened doors to countless applications, from simple chatbots to complex reasoning systems.


In this comprehensive guide, we will explore six increasingly sophisticated implementations of LLM-based systems. We start with a basic chatbot and progressively build more advanced architectures including Retrieval-Augmented Generation (RAG), Graph-based RAG, Multi-Agent Systems, Agentic AI, and the Model Context Protocol.


Before diving into implementations, let us establish the fundamental concepts. An LLM processes text input and generates text output based on patterns learned during training. The quality of responses depends heavily on prompt engineering, the art of crafting effective instructions for the model. Context windows define how much previous conversation the model can attend to, typically ranging from a few thousand to hundreds of thousands of tokens.


Hardware considerations play a crucial role in LLM deployment. Graphics Processing Units (GPUs) dramatically accelerate inference speed compared to Central Processing Units (CPUs). NVIDIA GPUs use CUDA cores, AMD GPUs leverage ROCm technology, and Apple Silicon employs Metal Performance Shaders (MPS). When GPU resources are unavailable, CPU-only execution remains viable, albeit slower.


CHAPTER A: IMPLEMENTING A BASIC LLM CHATBOT


A basic LLM chatbot represents the simplest form of human-computer interaction using large language models. This implementation focuses on establishing a conversation loop where users input messages and receive AI-generated responses. The fundamental architecture consists of three components: input processing, model inference, and output formatting.


The rationale behind starting with a basic chatbot lies in understanding core concepts without additional complexity. This foundation enables developers to grasp essential patterns like prompt construction, response handling, and conversation state management before advancing to more sophisticated architectures.


Setting up the development environment requires several key dependencies. Python serves as our primary programming language due to its extensive machine learning ecosystem. The transformers library from Hugging Face provides pre-trained models and inference utilities. PyTorch handles the underlying neural network operations, while additional libraries manage GPU acceleration and text processing.


Installation begins with creating a virtual environment to isolate dependencies:


python -m venv llm_chatbot_env

source llm_chatbot_env/bin/activate  

# On Windows: llm_chatbot_env\Scripts\activate

pip install torch transformers accelerate


The torch installation varies based on your hardware configuration. For NVIDIA GPUs with CUDA support, install the CUDA-enabled version. AMD GPU users should install the ROCm variant, while Apple Silicon users benefit from MPS optimization. CPU-only installations use the standard PyTorch distribution.
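As a sketch of what these hardware-specific installs look like (the CUDA and ROCm version tags below are examples only and change over time, so check the official PyTorch installation selector for the current commands):

```shell
# NVIDIA GPU: CUDA-enabled build (the cu121 tag is an example; match your driver)
pip install torch --index-url https://download.pytorch.org/whl/cu121

# AMD GPU: ROCm build, Linux only (the rocm6.0 tag is likewise an example)
pip install torch --index-url https://download.pytorch.org/whl/rocm6.0

# Apple Silicon and CPU-only: the default PyPI wheel includes MPS and CPU support
pip install torch
```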


Model selection significantly impacts performance and resource requirements. Smaller models like GPT-2 or DistilGPT-2 run efficiently on modest hardware but provide limited capabilities. Medium-sized models such as Llama-2-7B offer better performance while remaining accessible to consumer hardware. Large proprietary models like GPT-3.5 or Claude are accessed through hosted APIs rather than run locally.
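A rough rule of thumb helps when sizing hardware for a model: weight memory is approximately the parameter count times the bytes per parameter. The sketch below is an estimate only; it ignores activations, the KV cache, and framework overhead, which add substantially on top.

```python
def model_memory_gb(num_params_billions, bytes_per_param=2):
    """Rough weight-memory estimate: parameters x bytes per parameter.

    fp16/bf16 weights use 2 bytes per parameter; fp32 uses 4.
    Ignores activations and KV cache, so treat the result as a floor.
    """
    return num_params_billions * 1e9 * bytes_per_param / (1024 ** 3)

# A 7B model in fp16 needs roughly 13 GB just for the weights
print(round(model_memory_gb(7), 1))
```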


For local deployment, we will use Microsoft's DialoGPT, a conversational model optimized for dialogue generation. This model balances performance with resource requirements, making it suitable for learning purposes.


The core chatbot implementation begins with importing necessary libraries and initializing the model:


from transformers import AutoModelForCausalLM, AutoTokenizer

import torch


class BasicChatbot:

    def __init__(self, model_name="microsoft/DialoGPT-medium"):

        self.device = self._get_optimal_device()

        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

        self.model = AutoModelForCausalLM.from_pretrained(model_name)

        self.model.to(self.device)

        self.chat_history_ids = None

        

    def _get_optimal_device(self):

        if torch.cuda.is_available():

            return torch.device("cuda")

        elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():

            return torch.device("mps")

        else:

            return torch.device("cpu")


The device selection logic automatically detects available hardware acceleration. CUDA takes priority for NVIDIA GPUs, followed by MPS for Apple Silicon, with CPU as the fallback option. This ensures optimal performance across different hardware configurations.


Conversation handling requires maintaining context between exchanges. The chat history stores previous interactions, enabling the model to generate contextually appropriate responses:


def generate_response(self, user_input):

    # Encode user input and append to chat history

    new_user_input_ids = self.tokenizer.encode(

        user_input + self.tokenizer.eos_token, 

        return_tensors='pt'

    ).to(self.device)

    

    # Append to existing chat history or initialize

    if self.chat_history_ids is not None:

        bot_input_ids = torch.cat([self.chat_history_ids, new_user_input_ids], dim=-1)

    else:

        bot_input_ids = new_user_input_ids

    

    # Generate response with controlled parameters

    self.chat_history_ids = self.model.generate(

        bot_input_ids,

        max_length=min(bot_input_ids.shape[-1] + 100, 1000),

        num_beams=5,

        no_repeat_ngram_size=3,

        do_sample=True,

        temperature=0.7,

        pad_token_id=self.tokenizer.eos_token_id

    )

    

    # Extract and decode the bot's response

    response = self.tokenizer.decode(

        self.chat_history_ids[:, bot_input_ids.shape[-1]:][0], 

        skip_special_tokens=True

    )

    

    return response


The generation parameters control response quality and creativity. Temperature affects randomness: lower values produce more focused responses, while higher values increase variety. Setting num_beams together with do_sample enables beam-search multinomial sampling, which samples within each beam; many applications choose one strategy or the other instead of combining them. The no_repeat_ngram_size parameter prevents the model from repeating any n-gram of the given size.
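The effect of temperature can be sketched with plain NumPy: dividing the logits by the temperature before the softmax sharpens the distribution when the temperature is below 1 and flattens it above 1. The logit values here are made up purely for illustration.

```python
import numpy as np

def softmax_with_temperature(logits, temperature):
    """Scale logits by 1/temperature, then apply a numerically stable softmax."""
    scaled = np.asarray(logits, dtype=float) / temperature
    scaled -= scaled.max()          # subtract max for numerical stability
    exp = np.exp(scaled)
    return exp / exp.sum()

logits = [2.0, 1.0, 0.5]            # hypothetical next-token scores

cold = softmax_with_temperature(logits, 0.3)   # low temperature: sharper, more focused
warm = softmax_with_temperature(logits, 1.5)   # high temperature: flatter, more varied

print(cold.round(3), warm.round(3))
```

The top token's probability rises as temperature falls, which is why low-temperature generations feel more deterministic.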


Memory management becomes crucial for extended conversations. Long chat histories consume increasing amounts of memory and processing time. Implementing a sliding window approach maintains recent context while discarding older exchanges:


def manage_context_window(self, max_length=800):

    if self.chat_history_ids is not None and self.chat_history_ids.shape[-1] > max_length:

        # Keep only the most recent tokens

        self.chat_history_ids = self.chat_history_ids[:, -max_length:]


Error handling ensures robust operation under various conditions. Network interruptions, memory limitations, and invalid inputs require graceful handling to maintain user experience:


def safe_generate_response(self, user_input):

    try:

        if not user_input.strip():

            return "I didn't receive any input. Could you please say something?"

        

        response = self.generate_response(user_input)

        self.manage_context_window()

        

        if not response.strip():

            return "I'm having trouble generating a response. Could you try rephrasing?"

        

        return response

        

    except Exception as e:

        print(f"Error generating response: {e}")

        return "I encountered an error. Please try again."


The main conversation loop provides a user interface for interacting with the chatbot. This implementation includes conversation history, graceful exit handling, and user feedback:


def run_conversation(self):

    print("Chatbot initialized. Type 'quit' to exit.")

    print("=" * 50)

    

    while True:

        user_input = input("You: ").strip()

        

        if user_input.lower() in ['quit', 'exit', 'bye']:

            print("Chatbot: Goodbye! Thanks for chatting.")

            break

        

        if not user_input:

            continue

        

        print("Chatbot: ", end="", flush=True)

        response = self.safe_generate_response(user_input)

        print(response)

        print("-" * 30)


COMPLETE RUNNING EXAMPLE FOR BASIC CHATBOT:


from transformers import AutoModelForCausalLM, AutoTokenizer

import torch

import warnings

warnings.filterwarnings("ignore")


class BasicChatbot:

    """

    A basic LLM-powered chatbot implementation using DialoGPT.

    

    This chatbot maintains conversation context and provides natural

    language responses using a pre-trained conversational model.

    """

    

    def __init__(self, model_name="microsoft/DialoGPT-medium", max_context_length=800):

        """

        Initialize the chatbot with specified model and configuration.

        

        Args:

            model_name (str): Hugging Face model identifier

            max_context_length (int): Maximum tokens to maintain in context

        """

        print("Initializing chatbot...")

        self.max_context_length = max_context_length

        self.device = self._get_optimal_device()

        print(f"Using device: {self.device}")

        

        # Load tokenizer and model

        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

        self.model = AutoModelForCausalLM.from_pretrained(model_name)

        

        # Set pad token if not present

        if self.tokenizer.pad_token is None:

            self.tokenizer.pad_token = self.tokenizer.eos_token

        

        # Move model to optimal device

        self.model.to(self.device)

        self.model.eval()  # Set to evaluation mode

        

        # Initialize conversation state

        self.chat_history_ids = None

        self.conversation_count = 0

        

        print("Chatbot ready!")

    

    def _get_optimal_device(self):

        """

        Determine the best available device for model inference.

        

        Returns:

            torch.device: Optimal device (CUDA, MPS, or CPU)

        """

        if torch.cuda.is_available():

            return torch.device("cuda")

        elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():

            return torch.device("mps")

        else:

            return torch.device("cpu")

    

    def _manage_context_window(self):

        """

        Manage conversation context to prevent memory overflow.

        

        Trims older conversation history while preserving recent context.

        """

        if (self.chat_history_ids is not None and 

            self.chat_history_ids.shape[-1] > self.max_context_length):

            

            # Keep only the most recent tokens

            self.chat_history_ids = self.chat_history_ids[:, -self.max_context_length:]

    

    def generate_response(self, user_input):

        """

        Generate a response to user input using the LLM.

        

        Args:

            user_input (str): User's message

            

        Returns:

            str: Generated response from the chatbot

        """

        try:

            # Encode user input

            new_user_input_ids = self.tokenizer.encode(

                user_input + self.tokenizer.eos_token,

                return_tensors='pt'

            ).to(self.device)

            

            # Append to existing conversation or start new one

            if self.chat_history_ids is not None:

                bot_input_ids = torch.cat([self.chat_history_ids, new_user_input_ids], dim=-1)

            else:

                bot_input_ids = new_user_input_ids

            

            # Generate response with optimized parameters

            with torch.no_grad():

                self.chat_history_ids = self.model.generate(

                    bot_input_ids,

                    max_length=min(bot_input_ids.shape[-1] + 100, 1000),

                    num_beams=3,

                    no_repeat_ngram_size=3,

                    do_sample=True,

                    temperature=0.7,

                    top_p=0.9,

                    pad_token_id=self.tokenizer.eos_token_id,

                    early_stopping=True

                )

            

            # Extract bot response

            response = self.tokenizer.decode(

                self.chat_history_ids[:, bot_input_ids.shape[-1]:][0],

                skip_special_tokens=True

            )

            

            # Manage context window

            self._manage_context_window()

            self.conversation_count += 1

            

            return response.strip()

            

        except Exception as e:

            print(f"Error in response generation: {e}")

            return "I apologize, but I encountered an error processing your message."

    

    def reset_conversation(self):

        """Reset the conversation history to start fresh."""

        self.chat_history_ids = None

        self.conversation_count = 0

        print("Conversation history cleared.")

    

    def get_conversation_stats(self):

        """

        Get statistics about the current conversation.

        

        Returns:

            dict: Conversation statistics

        """

        context_length = 0

        if self.chat_history_ids is not None:

            context_length = self.chat_history_ids.shape[-1]

        

        return {

            'exchanges': self.conversation_count,

            'context_tokens': context_length,

            'device': str(self.device),

            'max_context': self.max_context_length

        }

    

    def run_interactive_session(self):

        """

        Run an interactive chat session with the user.

        

        Provides a command-line interface for chatting with the bot.

        """

        print("\n" + "=" * 60)

        print("BASIC LLM CHATBOT - Interactive Session")

        print("=" * 60)

        print("Commands:")

        print("  'quit' or 'exit' - End the conversation")

        print("  'reset' - Clear conversation history")

        print("  'stats' - Show conversation statistics")

        print("=" * 60)

        

        while True:

            try:

                user_input = input("\nYou: ").strip()

                

                # Handle special commands

                if user_input.lower() in ['quit', 'exit', 'bye']:

                    print("\nChatbot: Thank you for chatting! Goodbye!")

                    break

                

                elif user_input.lower() == 'reset':

                    self.reset_conversation()

                    continue

                

                elif user_input.lower() == 'stats':

                    stats = self.get_conversation_stats()

                    print(f"\nConversation Statistics:")

                    print(f"  Exchanges: {stats['exchanges']}")

                    print(f"  Context tokens: {stats['context_tokens']}")

                    print(f"  Device: {stats['device']}")

                    print(f"  Max context: {stats['max_context']}")

                    continue

                

                # Skip empty input

                if not user_input:

                    print("Please enter a message.")

                    continue

                

                # Generate and display response

                print("Chatbot: ", end="", flush=True)

                response = self.generate_response(user_input)

                print(response)

                

            except KeyboardInterrupt:

                print("\n\nChatbot: Session interrupted. Goodbye!")

                break

            except Exception as e:

                print(f"\nError: {e}")

                print("Please try again.")


def main():

    """

    Main function to demonstrate the basic chatbot functionality.

    """

    try:

        # Initialize chatbot

        chatbot = BasicChatbot()

        

        # Run interactive session

        chatbot.run_interactive_session()

        

    except Exception as e:

        print(f"Failed to initialize chatbot: {e}")

        print("Please check your installation and try again.")


if __name__ == "__main__":

    main()


This complete implementation provides a runnable basic chatbot with error handling, context-window management, and a simple command-line interface. The code keeps a clear separation of concerns and is documented with docstrings throughout.


CHAPTER B: IMPLEMENTING RAG (RETRIEVAL-AUGMENTED GENERATION) CHATBOT


Retrieval-Augmented Generation represents a significant advancement over basic chatbots by combining the generative capabilities of LLMs with external knowledge retrieval. RAG addresses a fundamental limitation of standalone LLMs: their knowledge cutoff and inability to access real-time or domain-specific information not present in their training data.


The RAG architecture consists of two primary components working in tandem. The retrieval component searches through a knowledge base to find relevant information based on user queries. The generation component then uses this retrieved context alongside the original query to produce informed, accurate responses. This approach dramatically improves factual accuracy and enables chatbots to work with specialized knowledge domains.


The rationale for implementing RAG stems from practical limitations of basic chatbots. While LLMs possess broad knowledge, they cannot access current events, proprietary documentation, or specialized databases. RAG bridges this gap by dynamically incorporating relevant external information into the generation process, creating more knowledgeable and useful conversational agents.


Setting up a RAG system requires additional dependencies beyond basic chatbot requirements. Vector databases store and search through document embeddings, while embedding models convert text into numerical representations suitable for similarity search. Popular choices include FAISS for vector operations, sentence-transformers for embedding generation, and various document processing libraries.


Installation extends our previous environment with RAG-specific packages:


pip install faiss-cpu sentence-transformers langchain pypdf python-docx


The choice between faiss-cpu and faiss-gpu depends on available hardware. GPU-accelerated FAISS significantly improves search performance for large knowledge bases, while CPU versions remain suitable for smaller datasets or development purposes.


Document processing forms the foundation of any RAG system. The knowledge base requires preprocessing to extract text, chunk documents into manageable segments, and generate embeddings for similarity search. Different document types require specialized handling approaches.
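As an illustrative sketch of per-format handling, a loader that dispatches on file extension might look like the following. It uses the pypdf and python-docx packages installed earlier, imports them lazily so plain-text loading works without them, and omits real-world concerns such as encrypted PDFs or scanned pages.

```python
import os

def load_document_text(path):
    """Extract raw text from a file, dispatching on its extension.

    pypdf and python-docx are imported lazily inside their branches,
    so loading plain text does not require either package.
    """
    ext = os.path.splitext(path)[1].lower()
    if ext == ".pdf":
        from pypdf import PdfReader            # pip install pypdf
        reader = PdfReader(path)
        # extract_text() can return None for image-only pages
        return "\n".join(page.extract_text() or "" for page in reader.pages)
    if ext == ".docx":
        from docx import Document              # pip install python-docx
        return "\n".join(p.text for p in Document(path).paragraphs)
    # Fall back to plain text for anything else
    with open(path, encoding="utf-8") as f:
        return f.read()
```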


Text chunking strategy significantly impacts retrieval quality. Chunks must be large enough to contain meaningful context but small enough to avoid diluting relevant information. Overlapping chunks ensure important information spanning chunk boundaries remains accessible:


from langchain.text_splitter import RecursiveCharacterTextSplitter

from sentence_transformers import SentenceTransformer

import faiss

import numpy as np

import pickle

import os


class DocumentProcessor:

    def __init__(self, chunk_size=500, chunk_overlap=50):

        self.chunk_size = chunk_size

        self.chunk_overlap = chunk_overlap

        self.text_splitter = RecursiveCharacterTextSplitter(

            chunk_size=chunk_size,

            chunk_overlap=chunk_overlap,

            separators=["\n\n", "\n", " ", ""]

        )

    

    def process_text(self, text, source="unknown"):

        chunks = self.text_splitter.split_text(text)

        return [{"content": chunk, "source": source, "chunk_id": i} 

                for i, chunk in enumerate(chunks)]


The RecursiveCharacterTextSplitter intelligently splits text at natural boundaries like paragraphs and sentences, preserving semantic coherence within chunks. The separator hierarchy ensures optimal splitting points while maintaining readability.
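The overlap idea itself can be shown with a minimal fixed-size chunker, a simplified stand-in for the splitter above that ignores natural boundaries: each chunk starts chunk_overlap characters before the previous chunk ended, so boundary-spanning content appears in both.

```python
def chunk_with_overlap(text, chunk_size=500, chunk_overlap=50):
    """Fixed-size chunking: consecutive chunks share chunk_overlap characters."""
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

chunks = chunk_with_overlap("x" * 1200)
print(len(chunks), [len(c) for c in chunks])   # 3 chunks: 500, 500, 300 characters
```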


Embedding generation converts text chunks into vector representations suitable for similarity search. Modern embedding models like sentence-transformers provide high-quality representations that capture semantic meaning beyond simple keyword matching:


class EmbeddingManager:

    def __init__(self, model_name="all-MiniLM-L6-v2"):

        self.model = SentenceTransformer(model_name)

        self.dimension = self.model.get_sentence_embedding_dimension()

    

    def generate_embeddings(self, texts):

        embeddings = self.model.encode(texts, show_progress_bar=True)

        return embeddings.astype('float32')

    

    def encode_query(self, query):

        return self.model.encode([query]).astype('float32')


The all-MiniLM-L6-v2 model provides an excellent balance between quality and performance for most applications. Larger models like all-mpnet-base-v2 offer improved accuracy at the cost of increased computational requirements.


Vector database implementation enables efficient similarity search across large document collections. FAISS provides optimized algorithms for nearest neighbor search, supporting both exact and approximate search methods:


class VectorDatabase:

    def __init__(self, dimension):

        self.dimension = dimension

        self.index = faiss.IndexFlatIP(dimension)  # Inner product for cosine similarity

        self.documents = []

        self.embeddings = None

    

    def add_documents(self, documents, embeddings):

        # Normalize embeddings for cosine similarity

        faiss.normalize_L2(embeddings)

        

        self.index.add(embeddings)

        self.documents.extend(documents)

        

        if self.embeddings is None:

            self.embeddings = embeddings

        else:

            self.embeddings = np.vstack([self.embeddings, embeddings])

    

    def search(self, query_embedding, k=5):

        faiss.normalize_L2(query_embedding)

        scores, indices = self.index.search(query_embedding, k)

        

        results = []

        for score, idx in zip(scores[0], indices[0]):

            if 0 <= idx < len(self.documents):  # FAISS pads with -1 when fewer than k results exist

                results.append({

                    "document": self.documents[idx],

                    "score": float(score),

                    "embedding": self.embeddings[idx]

                })

        

        return results


The IndexFlatIP performs exact search using inner-product similarity, which equals cosine similarity once the embeddings are L2-normalized. For larger datasets, approximate indices like IndexIVFFlat offer faster search at the cost of a slight reduction in accuracy.
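Why L2 normalization lets an inner-product index behave as a cosine index can be checked with plain NumPy, independently of FAISS: after normalization, both vectors have unit length, so their dot product is exactly the cosine of the angle between them.

```python
import numpy as np

def cosine_via_inner_product(a, b):
    """After L2-normalizing both vectors, the inner product IS the cosine similarity."""
    a = a / np.linalg.norm(a)
    b = b / np.linalg.norm(b)
    return float(np.dot(a, b))

u = np.array([1.0, 2.0, 3.0])
v = np.array([2.0, 4.0, 6.0])    # same direction as u, different magnitude
w = np.array([3.0, -1.0, 0.0])   # a different direction

print(cosine_via_inner_product(u, v))   # parallel vectors: similarity 1.0
print(cosine_via_inner_product(u, w))   # less similar: well below 1.0
```

This is exactly what faiss.normalize_L2 does before indexing and before each query in the code above.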


Knowledge base construction combines document processing, embedding generation, and vector database operations into a cohesive workflow. This process typically runs offline to prepare the retrieval system:


class KnowledgeBase:

    def __init__(self, embedding_model="all-MiniLM-L6-v2"):

        self.processor = DocumentProcessor()

        self.embedding_manager = EmbeddingManager(embedding_model)

        self.vector_db = VectorDatabase(self.embedding_manager.dimension)

        self.is_built = False

    

    def add_text_documents(self, texts, sources=None):

        if sources is None:

            sources = [f"document_{i}" for i in range(len(texts))]

        

        all_chunks = []

        for text, source in zip(texts, sources):

            chunks = self.processor.process_text(text, source)

            all_chunks.extend(chunks)

        

        if all_chunks:

            chunk_texts = [chunk["content"] for chunk in all_chunks]

            embeddings = self.embedding_manager.generate_embeddings(chunk_texts)

            self.vector_db.add_documents(all_chunks, embeddings)

            self.is_built = True

    

    def search_relevant_context(self, query, k=3):

        if not self.is_built:

            return []

        

        query_embedding = self.embedding_manager.encode_query(query)

        results = self.vector_db.search(query_embedding, k)

        

        return [result["document"]["content"] for result in results]


The RAG chatbot integrates retrieval and generation components to produce contextually informed responses. The retrieval step finds relevant documents, while the generation step incorporates this context into the LLM prompt:


from transformers import AutoModelForCausalLM, AutoTokenizer

import torch


class RAGChatbot:

    def __init__(self, llm_model="microsoft/DialoGPT-medium", embedding_model="all-MiniLM-L6-v2"):

        # Initialize LLM components

        self.device = self._get_optimal_device()

        self.tokenizer = AutoTokenizer.from_pretrained(llm_model)

        self.model = AutoModelForCausalLM.from_pretrained(llm_model)

        self.model.to(self.device)

        

        if self.tokenizer.pad_token is None:

            self.tokenizer.pad_token = self.tokenizer.eos_token

        

        # Initialize RAG components

        self.knowledge_base = KnowledgeBase(embedding_model)

        self.chat_history = []

        self.max_context_length = 800

    

    def _get_optimal_device(self):

        if torch.cuda.is_available():

            return torch.device("cuda")

        elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():

            return torch.device("mps")

        else:

            return torch.device("cpu")

    

    def build_knowledge_base(self, documents, sources=None):

        print("Building knowledge base...")

        self.knowledge_base.add_text_documents(documents, sources)

        print("Knowledge base ready!")

    

    def _construct_rag_prompt(self, user_input, retrieved_context):

        context_text = "\n".join(retrieved_context)

        

        prompt = f"""Based on the following context information, please provide a helpful response to the user's question.


Context:

{context_text}


User Question: {user_input}


Response:"""

        return prompt

    

    def generate_response(self, user_input, use_rag=True):

        try:

            if use_rag and self.knowledge_base.is_built:

                # Retrieve relevant context

                retrieved_context = self.knowledge_base.search_relevant_context(user_input, k=3)

                

                if retrieved_context:

                    # Construct RAG prompt

                    prompt = self._construct_rag_prompt(user_input, retrieved_context)

                else:

                    prompt = user_input

            else:

                prompt = user_input

            

            # Generate response

            inputs = self.tokenizer.encode(prompt, return_tensors='pt', truncation=True, max_length=512)

            inputs = inputs.to(self.device)

            

            with torch.no_grad():

                outputs = self.model.generate(

                    inputs,

                    max_length=inputs.shape[1] + 150,

                    num_beams=3,

                    no_repeat_ngram_size=3,

                    do_sample=True,

                    temperature=0.7,

                    top_p=0.9,

                    pad_token_id=self.tokenizer.eos_token_id,

                    early_stopping=True

                )

            

            response = self.tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True)

            return response.strip()

            

        except Exception as e:

            print(f"Error generating response: {e}")

            return "I apologize, but I encountered an error processing your request."


COMPLETE RUNNING EXAMPLE FOR RAG CHATBOT:


import os

import pickle

import numpy as np

import torch

import faiss

from sentence_transformers import SentenceTransformer

from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

from langchain.text_splitter import RecursiveCharacterTextSplitter

import warnings

warnings.filterwarnings("ignore")


class DocumentProcessor:

    """

    Handles document processing and text chunking for RAG systems.

    

    This class splits documents into manageable chunks while preserving

    semantic coherence and maintaining source attribution.

    """

    

    def __init__(self, chunk_size=500, chunk_overlap=50):

        """

        Initialize document processor with chunking parameters.

        

        Args:

            chunk_size (int): Maximum characters per chunk

            chunk_overlap (int): Overlap between consecutive chunks

        """

        self.chunk_size = chunk_size

        self.chunk_overlap = chunk_overlap

        self.text_splitter = RecursiveCharacterTextSplitter(

            chunk_size=chunk_size,

            chunk_overlap=chunk_overlap,

            separators=["\n\n", "\n", ". ", " ", ""]

        )

    

    def process_text(self, text, source="unknown"):

        """

        Process text into chunks with metadata.

        

        Args:

            text (str): Input text to process

            source (str): Source identifier for the text

            

        Returns:

            list: List of chunk dictionaries with content and metadata

        """

        if not text or not text.strip():

            return []

        

        chunks = self.text_splitter.split_text(text.strip())

        return [

            {

                "content": chunk.strip(),

                "source": source,

                "chunk_id": i,

                "length": len(chunk)

            }

            for i, chunk in enumerate(chunks) if chunk.strip()

        ]

    

    def process_multiple_documents(self, documents):

        """

        Process multiple documents with automatic source naming.

        

        Args:

            documents (list): List of text documents or (text, source) tuples

            

        Returns:

            list: Combined list of all chunks from all documents

        """

        all_chunks = []

        

        for i, doc in enumerate(documents):

            if isinstance(doc, tuple):

                text, source = doc

            else:

                text = doc

                source = f"document_{i}"

            

            chunks = self.process_text(text, source)

            all_chunks.extend(chunks)

        

        return all_chunks


class EmbeddingManager:

    """

    Manages text embedding generation using sentence transformers.

    

    This class handles the conversion of text into vector representations

    suitable for similarity search and retrieval operations.

    """

    

    def __init__(self, model_name="all-MiniLM-L6-v2"):

        """

        Initialize embedding manager with specified model.

        

        Args:

            model_name (str): Hugging Face model identifier for embeddings

        """

        print(f"Loading embedding model: {model_name}")

        self.model = SentenceTransformer(model_name)

        self.dimension = self.model.get_sentence_embedding_dimension()

        self.model_name = model_name

        print(f"Embedding dimension: {self.dimension}")

    

    def generate_embeddings(self, texts, batch_size=32):

        """

        Generate embeddings for a list of texts.

        

        Args:

            texts (list): List of text strings to embed

            batch_size (int): Batch size for processing

            

        Returns:

            numpy.ndarray: Array of embeddings

        """

        if not texts:

            return np.array([]).reshape(0, self.dimension)

        

        embeddings = self.model.encode(

            texts,

            batch_size=batch_size,

            show_progress_bar=len(texts) > 10,

            convert_to_numpy=True

        )

        return embeddings.astype('float32')

    

    def encode_query(self, query):

        """

        Encode a single query string.

        

        Args:

            query (str): Query text to encode

            

        Returns:

            numpy.ndarray: Query embedding

        """

        return self.model.encode([query], convert_to_numpy=True).astype('float32')


class VectorDatabase:

    """

    Vector database for efficient similarity search using FAISS.

    

    This class provides storage and retrieval capabilities for document

    embeddings with support for various similarity metrics.

    """

    

    def __init__(self, dimension, metric="cosine"):

        """

        Initialize vector database with specified parameters.

        

        Args:

            dimension (int): Embedding dimension

            metric (str): Similarity metric ("cosine" or "euclidean")

        """

        self.dimension = dimension

        self.metric = metric

        

        # Choose appropriate FAISS index based on metric

        if metric == "cosine":

            self.index = faiss.IndexFlatIP(dimension)  # Inner product for cosine similarity

        else:

            self.index = faiss.IndexFlatL2(dimension)  # L2 distance for euclidean

        

        self.documents = []

        self.embeddings = None

        self.total_documents = 0

    

    def add_documents(self, documents, embeddings):

        """

        Add documents and their embeddings to the database.

        

        Args:

            documents (list): List of document dictionaries

            embeddings (numpy.ndarray): Corresponding embeddings

        """

        if len(documents) != len(embeddings):

            raise ValueError("Number of documents must match number of embeddings")

        

        # Normalize embeddings in place so inner product equals cosine similarity
        # (note: faiss.normalize_L2 mutates the caller's array)

        if self.metric == "cosine":

            faiss.normalize_L2(embeddings)

        

        # Add to FAISS index

        self.index.add(embeddings)

        

        # Store documents and embeddings

        self.documents.extend(documents)

        

        if self.embeddings is None:

            self.embeddings = embeddings.copy()

        else:

            self.embeddings = np.vstack([self.embeddings, embeddings])

        

        self.total_documents += len(documents)

        print(f"Added {len(documents)} documents. Total: {self.total_documents}")

    

    def search(self, query_embedding, k=5, score_threshold=0.0):

        """

        Search for similar documents.

        

        Args:

            query_embedding (numpy.ndarray): Query embedding

            k (int): Number of results to return

            score_threshold (float): Minimum similarity score

            

        Returns:

            list: List of search results with documents and scores

        """

        if self.total_documents == 0:

            return []

        

        # Normalize query for cosine similarity

        if self.metric == "cosine":

            faiss.normalize_L2(query_embedding)

        

        # Perform search

        scores, indices = self.index.search(query_embedding, min(k, self.total_documents))

        

        results = []

        for score, idx in zip(scores[0], indices[0]):

            if idx >= 0 and idx < len(self.documents) and score >= score_threshold:

                results.append({

                    "document": self.documents[idx],

                    "score": float(score),

                    "index": int(idx)

                })

        

        return results

    

    def save(self, filepath):

        """Save the vector database to disk."""

        data = {

            'documents': self.documents,

            'embeddings': self.embeddings,

            'dimension': self.dimension,

            'metric': self.metric,

            'total_documents': self.total_documents

        }

        

        with open(filepath, 'wb') as f:

            pickle.dump(data, f)

        

        # Save FAISS index separately

        faiss.write_index(self.index, filepath + '.faiss')

    

    def load(self, filepath):

        """Load the vector database from disk."""

        with open(filepath, 'rb') as f:

            data = pickle.load(f)

        

        self.documents = data['documents']

        self.embeddings = data['embeddings']

        self.dimension = data['dimension']

        self.metric = data['metric']

        self.total_documents = data['total_documents']

        

        # Load FAISS index

        self.index = faiss.read_index(filepath + '.faiss')


class KnowledgeBase:

    """

    Complete knowledge base implementation for RAG systems.

    

    This class combines document processing, embedding generation,

    and vector search into a unified interface.

    """

    

    def __init__(self, embedding_model="all-MiniLM-L6-v2", chunk_size=500):

        """

        Initialize knowledge base with specified parameters.

        

        Args:

            embedding_model (str): Model name for embeddings

            chunk_size (int): Size of text chunks

        """

        self.processor = DocumentProcessor(chunk_size=chunk_size)

        self.embedding_manager = EmbeddingManager(embedding_model)

        self.vector_db = VectorDatabase(self.embedding_manager.dimension)

        self.is_built = False

        self.document_count = 0

    

    def add_documents(self, documents, sources=None):

        """

        Add documents to the knowledge base.

        

        Args:

            documents (list): List of text documents

            sources (list): Optional list of source identifiers

        """

        if not documents:

            print("No documents provided")

            return

        

        print(f"Processing {len(documents)} documents...")

        

        # Prepare documents with sources

        doc_list = []

        for i, doc in enumerate(documents):

            if sources and i < len(sources):

                source = sources[i]

            else:

                source = f"document_{self.document_count + i}"

            doc_list.append((doc, source))

        

        # Process documents into chunks

        all_chunks = self.processor.process_multiple_documents(doc_list)

        

        if not all_chunks:

            print("No valid chunks generated from documents")

            return

        

        print(f"Generated {len(all_chunks)} chunks")

        

        # Generate embeddings

        chunk_texts = [chunk["content"] for chunk in all_chunks]

        embeddings = self.embedding_manager.generate_embeddings(chunk_texts)

        

        # Add to vector database

        self.vector_db.add_documents(all_chunks, embeddings)

        

        self.document_count += len(documents)

        self.is_built = True

        print("Documents added successfully!")

    

    def search_relevant_context(self, query, k=3, score_threshold=0.1):

        """

        Search for relevant context given a query.

        

        Args:

            query (str): Search query

            k (int): Number of results to return

            score_threshold (float): Minimum similarity score

            

        Returns:

            list: List of relevant text chunks

        """

        if not self.is_built:

            return []

        

        query_embedding = self.embedding_manager.encode_query(query)

        results = self.vector_db.search(query_embedding, k, score_threshold)

        

        return [

            {

                "content": result["document"]["content"],

                "source": result["document"]["source"],

                "score": result["score"]

            }

            for result in results

        ]

    

    def get_stats(self):

        """Get knowledge base statistics."""

        return {

            "total_documents": self.document_count,

            "total_chunks": self.vector_db.total_documents,

            "embedding_dimension": self.embedding_manager.dimension,

            "is_built": self.is_built

        }

    

    def save(self, filepath):

        """Save knowledge base to disk."""

        if self.is_built:

            self.vector_db.save(filepath)

            print(f"Knowledge base saved to {filepath}")

        else:

            print("Knowledge base not built yet")

    

    def load(self, filepath):

        """Load knowledge base from disk."""

        try:

            self.vector_db.load(filepath)

            self.is_built = True

            print(f"Knowledge base loaded from {filepath}")

        except Exception as e:

            print(f"Failed to load knowledge base: {e}")


class RAGChatbot:

    """

    Complete RAG chatbot implementation combining retrieval and generation.

    

    This chatbot uses retrieval-augmented generation to provide informed

    responses based on a knowledge base of documents.

    """

    

    def __init__(self, llm_model="microsoft/DialoGPT-medium", embedding_model="all-MiniLM-L6-v2"):

        """

        Initialize RAG chatbot with specified models.

        

        Args:

            llm_model (str): Language model for generation

            embedding_model (str): Model for embeddings

        """

        print("Initializing RAG Chatbot...")

        

        # Initialize device

        self.device = self._get_optimal_device()

        print(f"Using device: {self.device}")

        

        # Initialize LLM components

        print(f"Loading language model: {llm_model}")

        self.tokenizer = AutoTokenizer.from_pretrained(llm_model)

        self.model = AutoModelForCausalLM.from_pretrained(llm_model)

        

        if self.tokenizer.pad_token is None:

            self.tokenizer.pad_token = self.tokenizer.eos_token

        

        self.model.to(self.device)

        self.model.eval()

        

        # Initialize knowledge base

        self.knowledge_base = KnowledgeBase(embedding_model)

        

        # Conversation state

        self.conversation_history = []

        self.max_context_length = 800

        

        print("RAG Chatbot ready!")

    

    def _get_optimal_device(self):

        """Determine optimal device for inference."""

        if torch.cuda.is_available():

            return torch.device("cuda")

        elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():

            return torch.device("mps")

        else:

            return torch.device("cpu")

    

    def build_knowledge_base(self, documents, sources=None):

        """

        Build the knowledge base from documents.

        

        Args:

            documents (list): List of text documents

            sources (list): Optional source identifiers

        """

        self.knowledge_base.add_documents(documents, sources)

    

    def _construct_rag_prompt(self, user_input, retrieved_context):

        """

        Construct a prompt incorporating retrieved context.

        

        Args:

            user_input (str): User's question

            retrieved_context (list): Retrieved context chunks

            

        Returns:

            str: Formatted prompt for the LLM

        """

        if not retrieved_context:

            return user_input

        

        context_text = "\n\n".join([

            f"Source: {ctx['source']}\nContent: {ctx['content']}"

            for ctx in retrieved_context

        ])

        

        prompt = f"""Context Information:

{context_text}


Based on the above context, please answer the following question. If the context doesn't contain relevant information, please say so.


Question: {user_input}


Answer:"""

        

        return prompt

    

    def generate_response(self, user_input, use_rag=True, k=3):

        """

        Generate response using RAG or fallback to basic generation.

        

        Args:

            user_input (str): User's input

            use_rag (bool): Whether to use RAG

            k (int): Number of context chunks to retrieve

            

        Returns:

            dict: Response with content and metadata

        """

        try:

            retrieved_context = []

            

            if use_rag and self.knowledge_base.is_built:

                # Retrieve relevant context

                context_results = self.knowledge_base.search_relevant_context(user_input, k)

                

                if context_results:

                    retrieved_context = context_results

                    prompt = self._construct_rag_prompt(user_input, retrieved_context)

                else:

                    prompt = user_input

            else:

                prompt = user_input

            

            # Generate response

            encoded = self.tokenizer(

                prompt,

                return_tensors='pt',

                truncation=True,

                max_length=512

            ).to(self.device)

            

            with torch.no_grad():

                outputs = self.model.generate(

                    encoded["input_ids"],

                    attention_mask=encoded["attention_mask"],

                    max_new_tokens=150,

                    no_repeat_ngram_size=3,

                    do_sample=True,

                    temperature=0.7,

                    top_p=0.9,

                    pad_token_id=self.tokenizer.eos_token_id

                )

            

            # Decode only the newly generated tokens

            response = self.tokenizer.decode(

                outputs[0][encoded["input_ids"].shape[1]:],

                skip_special_tokens=True

            ).strip()

            

            # Store conversation

            self.conversation_history.append({

                "user": user_input,

                "assistant": response,

                "used_rag": use_rag and bool(retrieved_context),

                "context_sources": [ctx["source"] for ctx in retrieved_context] if retrieved_context else []

            })

            

            return {

                "response": response,

                "used_rag": use_rag and bool(retrieved_context),

                "retrieved_context": retrieved_context,

                "sources": list(set([ctx["source"] for ctx in retrieved_context])) if retrieved_context else []

            }

            

        except Exception as e:

            print(f"Error generating response: {e}")

            return {

                "response": "I apologize, but I encountered an error processing your request.",

                "used_rag": False,

                "retrieved_context": [],

                "sources": []

            }

    

    def get_conversation_stats(self):

        """Get conversation statistics."""

        kb_stats = self.knowledge_base.get_stats()

        rag_usage = sum(1 for conv in self.conversation_history if conv.get("used_rag", False))

        

        return {

            "total_exchanges": len(self.conversation_history),

            "rag_responses": rag_usage,

            "knowledge_base": kb_stats,

            "device": str(self.device)

        }

    

    def reset_conversation(self):

        """Reset conversation history."""

        self.conversation_history = []

        print("Conversation history cleared.")

    

    def run_interactive_session(self):

        """Run interactive chat session."""

        print("\n" + "=" * 70)

        print("RAG CHATBOT - Interactive Session")

        print("=" * 70)

        print("Commands:")

        print("  'quit' or 'exit' - End conversation")

        print("  'reset' - Clear conversation history")

        print("  'stats' - Show conversation statistics")

        print("  'toggle_rag' - Toggle RAG on/off")

        print("  'sources' - Show sources from last response")

        print("=" * 70)

        

        use_rag = True

        last_response_data = None

        

        while True:

            try:

                user_input = input("\nYou: ").strip()

                

                if user_input.lower() in ['quit', 'exit', 'bye']:

                    print("\nChatbot: Thank you for using RAG Chatbot! Goodbye!")

                    break

                

                elif user_input.lower() == 'reset':

                    self.reset_conversation()

                    continue

                

                elif user_input.lower() == 'stats':

                    stats = self.get_conversation_stats()

                    print("\nConversation Statistics:")

                    print(f"  Total exchanges: {stats['total_exchanges']}")

                    print(f"  RAG responses: {stats['rag_responses']}")

                    print(f"  Knowledge base documents: {stats['knowledge_base']['total_documents']}")

                    print(f"  Knowledge base chunks: {stats['knowledge_base']['total_chunks']}")

                    print(f"  Device: {stats['device']}")

                    continue

                

                elif user_input.lower() == 'toggle_rag':

                    use_rag = not use_rag

                    status = "enabled" if use_rag else "disabled"

                    print(f"\nRAG {status}")

                    continue

                

                elif user_input.lower() == 'sources':

                    if last_response_data and last_response_data.get("sources"):

                        print("\nSources from last response:")

                        for source in last_response_data["sources"]:

                            print(f"  - {source}")

                    else:

                        print("\nNo sources available from last response")

                    continue

                

                if not user_input:

                    continue

                

                # Generate response

                rag_indicator = "[RAG]" if use_rag and self.knowledge_base.is_built else "[Direct]"

                print(f"Chatbot {rag_indicator}: ", end="", flush=True)

                

                last_response_data = self.generate_response(user_input, use_rag)

                print(last_response_data["response"])

                

                # Show sources if RAG was used

                if last_response_data["used_rag"] and last_response_data["sources"]:

                    print(f"Sources: {', '.join(last_response_data['sources'])}")

                

            except KeyboardInterrupt:

                print("\n\nSession interrupted. Goodbye!")

                break

            except Exception as e:

                print(f"\nError: {e}")


def create_sample_knowledge_base():

    """Create a sample knowledge base for demonstration."""

    sample_documents = [

        """

        Artificial Intelligence (AI) is a branch of computer science that aims to create 

        intelligent machines that work and react like humans. AI systems can perform tasks 

        that typically require human intelligence, such as visual perception, speech 

        recognition, decision-making, and language translation. Machine learning is a 

        subset of AI that enables computers to learn and improve from experience without 

        being explicitly programmed.

        """,

        """

        Machine Learning (ML) is a method of data analysis that automates analytical model 

        building. It is based on the idea that systems can learn from data, identify patterns, 

        and make decisions with minimal human intervention. There are three main types of 

        machine learning: supervised learning, unsupervised learning, and reinforcement learning. 

        Supervised learning uses labeled training data, unsupervised learning finds hidden 

        patterns in data without labels, and reinforcement learning learns through interaction 

        with an environment.

        """,

        """

        Deep Learning is a subset of machine learning that uses artificial neural networks 

        with multiple layers (hence "deep") to model and understand complex patterns in data. 

        These neural networks are inspired by the structure and function of the human brain. 

        Deep learning has been particularly successful in areas such as image recognition, 

        natural language processing, and speech recognition. Popular deep learning frameworks 

        include TensorFlow, PyTorch, and Keras.

        """,

        """

        Natural Language Processing (NLP) is a field of AI that focuses on the interaction 

        between computers and human language. NLP combines computational linguistics with 

        statistical, machine learning, and deep learning models to enable computers to process 

        and analyze large amounts of natural language data. Applications of NLP include 

        language translation, sentiment analysis, chatbots, and text summarization.

        """,

        """

        Computer Vision is a field of AI that trains computers to interpret and understand 

        the visual world. Using digital images from cameras and videos and deep learning 

        models, machines can accurately identify and classify objects and then react to 

        what they "see." Computer vision applications include facial recognition, medical 

        image analysis, autonomous vehicles, and quality control in manufacturing.

        """

    ]

    

    sources = [

        "AI_Overview",

        "Machine_Learning_Guide",

        "Deep_Learning_Fundamentals",

        "NLP_Introduction",

        "Computer_Vision_Basics"

    ]

    

    return sample_documents, sources


def main():

    """

    Main function demonstrating RAG chatbot functionality.

    """

    try:

        # Create RAG chatbot

        chatbot = RAGChatbot()

        

        # Create sample knowledge base

        print("\nCreating sample knowledge base...")

        documents, sources = create_sample_knowledge_base()

        chatbot.build_knowledge_base(documents, sources)

        

        # Show knowledge base stats

        stats = chatbot.knowledge_base.get_stats()

        print(f"Knowledge base created with {stats['total_documents']} documents and {stats['total_chunks']} chunks")

        

        # Run interactive session

        chatbot.run_interactive_session()

        

    except Exception as e:

        print(f"Failed to initialize RAG chatbot: {e}")

        print("Please check your installation and try again.")


if __name__ == "__main__":

    main()


This RAG implementation provides end-to-end functionality with error handling, disk persistence, and a complete interactive interface. The system combines document processing, embedding generation, vector search, and language model generation into a cohesive conversational AI pipeline.
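
One detail worth making explicit: the VectorDatabase above uses FAISS's inner-product index (IndexFlatIP) for its "cosine" metric, which is only correct because vectors are L2-normalized first — the inner product of unit vectors equals their cosine similarity. A minimal stdlib check of that identity (the vector values are arbitrary):

```python
import math

def normalize(v):
    """L2-normalize a vector (what faiss.normalize_L2 does in place)."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def cosine(a, b):
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))

a, b = [3.0, 4.0], [1.0, 2.0]
# Inner product of unit vectors equals their cosine similarity
assert abs(dot(normalize(a), normalize(b)) - cosine(a, b)) < 1e-9
```

This is why both the stored embeddings and the query embedding are normalized before touching the index; skipping either side silently turns the scores into plain dot products.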


CHAPTER C: IMPLEMENTING GRAPHRAG CHATBOT


GraphRAG represents an advanced evolution of traditional RAG systems by incorporating graph-based knowledge representation and reasoning capabilities. While standard RAG retrieves relevant documents based on semantic similarity, GraphRAG leverages the structural relationships between entities, concepts, and facts to provide more comprehensive and contextually aware responses.


The fundamental innovation of GraphRAG lies in its ability to understand and utilize the interconnected nature of knowledge. Traditional RAG systems treat documents as isolated chunks of information, potentially missing important relationships that span multiple documents or require multi-hop reasoning. GraphRAG addresses this limitation by constructing knowledge graphs that explicitly model entities, relationships, and hierarchical structures within the knowledge base.


The rationale for implementing GraphRAG stems from real-world information needs that require understanding complex relationships and dependencies. Consider queries about organizational structures, causal relationships, or multi-step processes. GraphRAG excels in these scenarios by traversing graph structures to gather comprehensive context that includes not just directly relevant information, but also related entities and their interconnections.
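
The traversal idea can be made concrete with a bounded breadth-first search, the simplest way to gather an entity's multi-hop neighborhood. A pure-stdlib sketch over a toy adjacency map (the entity names and edges are illustrative, not from any real dataset):

```python
from collections import deque

# Toy knowledge graph as an adjacency map (entities are illustrative)
graph = {
    "Alice": ["Acme Corp"],
    "Acme Corp": ["Berlin", "Bob"],
    "Bob": ["Project X"],
    "Berlin": [],
    "Project X": [],
}

def neighborhood(start, max_hops=2):
    """Collect all entities reachable from start within max_hops edges."""
    seen = {start}
    frontier = deque([(start, 0)])
    while frontier:
        node, depth = frontier.popleft()
        if depth == max_hops:
            continue  # don't expand beyond the hop budget
        for neighbor in graph.get(node, []):
            if neighbor not in seen:
                seen.add(neighbor)
                frontier.append((neighbor, depth + 1))
    return seen

# Two hops from "Alice" reach her employer and its connections
print(sorted(neighborhood("Alice")))  # → ['Acme Corp', 'Alice', 'Berlin', 'Bob']
```

A production traversal would also carry edge labels and confidence scores along with the nodes, but the shape of the operation — expand outward from query entities under a hop budget — is the same.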


GraphRAG architecture combines several sophisticated components working in harmony. Entity extraction identifies and normalizes entities within documents. Relationship extraction discovers connections between entities. Graph construction builds a structured representation of knowledge. Graph traversal algorithms explore relevant subgraphs based on queries. Finally, context aggregation combines graph-derived information with traditional retrieval methods for response generation.


Setting up GraphRAG requires additional dependencies beyond standard RAG implementations. Named Entity Recognition (NER) models identify entities in text. Relation extraction models discover relationships between entities. Graph databases store and query the knowledge graph efficiently. Popular choices include spaCy for NER, specialized transformer models for relation extraction, and NetworkX or Neo4j for graph operations.


Installation extends our environment with graph-specific packages:


pip install spacy networkx matplotlib plotly pandas

python -m spacy download en_core_web_sm


For production deployments, consider more robust graph databases like Neo4j or Amazon Neptune, which provide advanced querying capabilities and better performance for large-scale knowledge graphs.


Entity extraction forms the foundation of knowledge graph construction. Modern NER models can identify various entity types including persons, organizations, locations, dates, and domain-specific entities. The quality of entity extraction directly impacts the richness and utility of the resulting knowledge graph:


import spacy

import networkx as nx

from collections import defaultdict

import re

from typing import List, Dict, Tuple, Set


class EntityExtractor:

    def __init__(self, model_name="en_core_web_sm"):

        self.nlp = spacy.load(model_name)

        # Add custom entity types if needed

        self.entity_types = {

            "PERSON", "ORG", "GPE", "PRODUCT", "EVENT", 

            "WORK_OF_ART", "LAW", "LANGUAGE", "DATE", "TIME",

            "PERCENT", "MONEY", "QUANTITY", "ORDINAL", "CARDINAL"

        }

    

    def extract_entities(self, text, source="unknown"):

        doc = self.nlp(text)

        entities = []

        

        for ent in doc.ents:

            if ent.label_ in self.entity_types:

                entities.append({

                    "text": ent.text.strip(),

                    "label": ent.label_,

                    "start": ent.start_char,

                    "end": ent.end_char,

                    "source": source,

                    "normalized": self._normalize_entity(ent.text, ent.label_)

                })

        

        return entities

    

    def _normalize_entity(self, text, label):

        # Basic normalization - can be enhanced with more sophisticated methods

        normalized = text.strip().lower()

        

        if label in ["PERSON", "ORG", "GPE"]:

            # Keep original case for proper nouns

            normalized = text.strip()

        elif label in ["DATE", "TIME"]:

            # Normalize dates and times

            normalized = re.sub(r'\s+', ' ', normalized)

        

        return normalized


Relationship extraction discovers connections between entities within text. This process can range from simple co-occurrence patterns to sophisticated neural models trained specifically for relation extraction. For our implementation, we will use a combination of dependency parsing and pattern matching:


class RelationExtractor:

    def __init__(self, nlp_model):

        self.nlp = nlp_model

        # Define relationship patterns

        self.relation_patterns = {

            "WORKS_FOR": ["works for", "employed by", "employee of"],

            "LOCATED_IN": ["located in", "based in", "situated in"],

            "PART_OF": ["part of", "member of", "belongs to"],

            "FOUNDED": ["founded", "established", "created"],

            "ACQUIRED": ["acquired", "bought", "purchased"],

            "COLLABORATES_WITH": ["collaborates with", "partners with", "works with"]

        }

    

    def extract_relations(self, text, entities, source="unknown"):

        doc = self.nlp(text)

        relations = []

        

        # Extract relations using dependency parsing

        relations.extend(self._extract_dependency_relations(doc, entities, source))

        

        # Extract relations using pattern matching

        relations.extend(self._extract_pattern_relations(text, entities, source))

        

        return relations

    

    def _extract_dependency_relations(self, doc, entities, source):

        relations = []

        entity_map = {ent["normalized"]: ent for ent in entities}

        

        for token in doc:

            if token.dep_ in ["nsubj", "dobj", "pobj"]:

                head = token.head

                

                # Look for entities in subject and object positions

                subj_ent = self._find_entity_for_token(token, entity_map)

                obj_ent = self._find_entity_for_token(head, entity_map)

                

                if subj_ent and obj_ent and subj_ent != obj_ent:

                    relations.append({

                        "subject": subj_ent["normalized"],

                        "predicate": head.lemma_,

                        "object": obj_ent["normalized"],

                        "confidence": 0.7,

                        "source": source,

                        "extraction_method": "dependency"

                    })

        

        return relations

    

    def _extract_pattern_relations(self, text, entities, source):

        relations = []

        text_lower = text.lower()

        

        for relation_type, patterns in self.relation_patterns.items():

            for pattern in patterns:

                for match in re.finditer(re.escape(pattern), text_lower):

                    start, end = match.span()

                    

                    # Find entities before and after the pattern

                    before_entities = [e for e in entities if e["end"] < start]

                    after_entities = [e for e in entities if e["start"] > end]

                    

                    if before_entities and after_entities:

                        subj = before_entities[-1]  # Closest entity before

                        obj = after_entities[0]     # Closest entity after

                        

                        relations.append({

                            "subject": subj["normalized"],

                            "predicate": relation_type,

                            "object": obj["normalized"],

                            "confidence": 0.8,

                            "source": source,

                            "extraction_method": "pattern"

                        })

        

        return relations

    

    def _find_entity_for_token(self, token, entity_map):

        # Simple matching - can be improved with better alignment

        for entity_text, entity in entity_map.items():

            if token.text.lower() in entity_text.lower():

                return entity

        return None
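Before moving on, the pattern-based branch is worth seeing in isolation. The sketch below reproduces the closest-entity-before/closest-entity-after heuristic with plain regular expressions on toy data — the sentence, entity names, and character offsets are purely illustrative:

```python
import re

# Toy entities with character spans, shaped like the extractor's output
entities = [
    {"normalized": "Alice", "start": 0, "end": 5},
    {"normalized": "Acme Corp", "start": 16, "end": 25},
]
text = "Alice works for Acme Corp."
patterns = {"WORKS_FOR": ["works for", "employed by"]}

relations = []
for rel_type, pats in patterns.items():
    for pat in pats:
        for m in re.finditer(re.escape(pat), text.lower()):
            start, end = m.span()
            # Closest entity before the pattern is the subject,
            # closest entity after it is the object
            before = [e for e in entities if e["end"] <= start]
            after = [e for e in entities if e["start"] >= end]
            if before and after:
                relations.append(
                    (before[-1]["normalized"], rel_type, after[0]["normalized"])
                )

print(relations)  # [('Alice', 'WORKS_FOR', 'Acme Corp')]
```

The heuristic is deliberately simple: it will misfire when several entities flank a pattern, which is why the extracted relations carry a confidence score below 1.0.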


Knowledge graph construction combines extracted entities and relationships into a structured representation suitable for traversal and querying. NetworkX provides an excellent foundation for graph operations while remaining lightweight and easy to use:


class KnowledgeGraph:

    def __init__(self):

        self.graph = nx.MultiDiGraph()

        self.entity_index = {}

        self.relation_index = defaultdict(list)

        self.source_mapping = {}

    

    def add_entities(self, entities):

        for entity in entities:

            entity_id = entity["normalized"]

            

            if entity_id not in self.graph:

                self.graph.add_node(entity_id, **entity)

                self.entity_index[entity_id] = entity

            else:

                # Merge entity information

                existing = self.graph.nodes[entity_id]

                if "sources" not in existing:

                    existing["sources"] = set()

                existing["sources"].add(entity["source"])

    

    def add_relations(self, relations):

        for relation in relations:

            subj = relation["subject"]

            obj = relation["object"]

            pred = relation["predicate"]

            

            # Ensure entities exist in graph

            if subj not in self.graph:

                self.graph.add_node(subj)

            if obj not in self.graph:

                self.graph.add_node(obj)

            

            # Add edge with relation information

            self.graph.add_edge(subj, obj, **relation)

            

            # Update relation index

            self.relation_index[pred].append((subj, obj, relation))

    

    def find_related_entities(self, entity, max_hops=2, relation_types=None):

        if entity not in self.graph:

            return []

        

        related = set()

        visited = {entity}

        current_level = {entity}

        for hop in range(max_hops):

            next_level = set()

            for node in current_level:

                # Get neighbors in both directions

                neighbors = set(self.graph.neighbors(node))

                neighbors.update(self.graph.predecessors(node))

                # Filter by relation types if specified

                if relation_types:

                    filtered_neighbors = set()

                    for neighbor in neighbors:

                        # Check edges in both directions; the neighbor may only be a predecessor

                        edge_datas = list((self.graph.get_edge_data(node, neighbor) or {}).values())

                        edge_datas += list((self.graph.get_edge_data(neighbor, node) or {}).values())

                        if any(data.get("predicate") in relation_types for data in edge_datas):

                            filtered_neighbors.add(neighbor)

                    neighbors = filtered_neighbors

                next_level.update(neighbors)

            # Advance only through unvisited nodes so traversal can progress beyond one hop

            next_level -= visited

            related.update(next_level)

            visited.update(next_level)

            current_level = next_level

            if not current_level:

                break

        

        return list(related)

    

    def get_entity_context(self, entity, include_relations=True):

        if entity not in self.graph:

            return {}

        

        context = {

            "entity": self.graph.nodes[entity],

            "direct_relations": [],

            "related_entities": []

        }

        

        if include_relations:

            # Get outgoing relations

            for neighbor in self.graph.neighbors(entity):

                for edge_data in self.graph[entity][neighbor].values():

                    context["direct_relations"].append({

                        "type": "outgoing",

                        "target": neighbor,

                        "relation": edge_data

                    })

            

            # Get incoming relations

            for predecessor in self.graph.predecessors(entity):

                for edge_data in self.graph[predecessor][entity].values():

                    context["direct_relations"].append({

                        "type": "incoming",

                        "source": predecessor,

                        "relation": edge_data

                    })

            

            # Get related entities

            context["related_entities"] = self.find_related_entities(entity, max_hops=2)

        

        return context

    

    def query_subgraph(self, entities, max_hops=1):

        if not entities:

            return nx.MultiDiGraph()

        

        # Find all related entities

        all_entities = set(entities)

        for entity in entities:

            if entity in self.graph:

                related = self.find_related_entities(entity, max_hops)

                all_entities.update(related)

        

        # Create subgraph

        subgraph = self.graph.subgraph(all_entities).copy()

        return subgraph

    

    def get_statistics(self):

        return {

            "nodes": self.graph.number_of_nodes(),

            "edges": self.graph.number_of_edges(),

            "connected_components": nx.number_weakly_connected_components(self.graph),

            "average_degree": sum(dict(self.graph.degree()).values()) / self.graph.number_of_nodes() if self.graph.number_of_nodes() > 0 else 0

        }
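The multi-hop traversal at the heart of `find_related_entities` is just a breadth-first expansion over stored triples, following edges in both directions while tracking visited nodes. The dependency-free sketch below shows that logic on a toy graph (the entity names are illustrative):

```python
# Edges stored as (subject, predicate, object) triples,
# mirroring what add_relations records
triples = [
    ("Alice", "WORKS_FOR", "Acme Corp"),
    ("Acme Corp", "LOCATED_IN", "Berlin"),
]

def find_related(entity, max_hops=2):
    related, frontier, visited = set(), {entity}, {entity}
    for _ in range(max_hops):
        nxt = set()
        for subj, _, obj in triples:
            # Follow edges in both directions
            if subj in frontier:
                nxt.add(obj)
            if obj in frontier:
                nxt.add(subj)
        nxt -= visited          # never re-expand a node
        related |= nxt
        visited |= nxt
        frontier = nxt
        if not frontier:
            break
    return related

print(sorted(find_related("Alice")))  # ['Acme Corp', 'Berlin']
```

With `max_hops=1` only "Acme Corp" is reached; the second hop pulls in "Berlin" through the company node, which is exactly the kind of indirect connection GraphRAG adds over plain semantic search.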


GraphRAG query processing combines traditional semantic search with graph traversal to gather comprehensive context. This approach ensures that responses incorporate not just directly relevant information, but also related entities and their interconnections:


class GraphRAGRetriever:

    def __init__(self, knowledge_base, knowledge_graph):

        self.knowledge_base = knowledge_base

        self.knowledge_graph = knowledge_graph

        self.entity_extractor = EntityExtractor()

    

    def retrieve_context(self, query, k=3, graph_hops=2):

        # Extract entities from query

        query_entities = self.entity_extractor.extract_entities(query)

        entity_names = [ent["normalized"] for ent in query_entities]

        

        # Traditional RAG retrieval

        traditional_context = self.knowledge_base.search_relevant_context(query, k)

        

        # Graph-based retrieval

        graph_context = []

        if entity_names:

            # Find related entities through graph traversal

            all_related_entities = set()

            for entity in entity_names:

                if entity in self.knowledge_graph.graph:

                    related = self.knowledge_graph.find_related_entities(entity, graph_hops)

                    all_related_entities.update(related)

                    all_related_entities.add(entity)

            

            # Get context for related entities

            for entity in all_related_entities:

                entity_context = self.knowledge_graph.get_entity_context(entity)

                if entity_context:

                    graph_context.append(entity_context)

        

        return {

            "traditional_context": traditional_context,

            "graph_context": graph_context,

            "query_entities": entity_names,

            "related_entities": list(all_related_entities) if entity_names else []

        }

    

    def format_context_for_llm(self, context_data):

        formatted_context = []

        

        # Add traditional context

        if context_data["traditional_context"]:

            formatted_context.append("DOCUMENT CONTEXT:")

            for i, ctx in enumerate(context_data["traditional_context"]):

                formatted_context.append(f"{i+1}. {ctx['content']}")

        

        # Add graph context

        if context_data["graph_context"]:

            formatted_context.append("\nRELATED ENTITIES AND RELATIONSHIPS:")

            for entity_ctx in context_data["graph_context"]:

                entity_name = entity_ctx["entity"].get("normalized", "Unknown")

                formatted_context.append(f"\nEntity: {entity_name}")

                

                if entity_ctx["direct_relations"]:

                    formatted_context.append("Relationships:")

                    for rel in entity_ctx["direct_relations"][:3]:  # Limit to avoid overwhelming

                        if rel["type"] == "outgoing":

                            formatted_context.append(f"  - {entity_name} {rel['relation']['predicate']} {rel['target']}")

                        else:

                            formatted_context.append(f"  - {rel['source']} {rel['relation']['predicate']} {entity_name}")

        

        return "\n".join(formatted_context)
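To make the formatting step concrete, here is a self-contained sketch of how one entity context from `get_entity_context` is flattened into LLM-readable triples. The context dictionary below is hand-built toy data in the shape the graph produces (the names are illustrative):

```python
# Toy graph context, shaped like get_entity_context output
entity_ctx = {
    "entity": {"normalized": "Acme Corp"},
    "direct_relations": [
        {"type": "outgoing", "target": "Berlin",
         "relation": {"predicate": "LOCATED_IN"}},
        {"type": "incoming", "source": "Alice",
         "relation": {"predicate": "WORKS_FOR"}},
    ],
}

name = entity_ctx["entity"]["normalized"]
lines = [f"Entity: {name}"]
for rel in entity_ctx["direct_relations"][:3]:  # cap to avoid overwhelming the prompt
    if rel["type"] == "outgoing":
        lines.append(f"  - {name} {rel['relation']['predicate']} {rel['target']}")
    else:
        lines.append(f"  - {rel['source']} {rel['relation']['predicate']} {name}")

print("\n".join(lines))
# Entity: Acme Corp
#   - Acme Corp LOCATED_IN Berlin
#   - Alice WORKS_FOR Acme Corp
```

Rendering relations as subject-predicate-object lines keeps the prompt compact and lets the LLM cite specific relationships when composing its answer.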


COMPLETE RUNNING EXAMPLE FOR GRAPHRAG CHATBOT:


import spacy

import networkx as nx

from collections import defaultdict

import re

import torch

import numpy as np

from sentence_transformers import SentenceTransformer

from transformers import AutoModelForCausalLM, AutoTokenizer

from langchain.text_splitter import RecursiveCharacterTextSplitter

import warnings

warnings.filterwarnings("ignore")


class EntityExtractor:

    """

    Advanced entity extraction using spaCy with custom normalization.

    

    This class identifies and normalizes entities from text, providing

    the foundation for knowledge graph construction.

    """

    

    def __init__(self, model_name="en_core_web_sm"):

        """

        Initialize entity extractor with spaCy model.

        

        Args:

            model_name (str): spaCy model name

        """

        try:

            self.nlp = spacy.load(model_name)

        except OSError:

            print(f"spaCy model {model_name} not found. Please install it with:")

            print(f"python -m spacy download {model_name}")

            raise

        

        # Define entity types to extract

        self.entity_types = {

            "PERSON", "ORG", "GPE", "PRODUCT", "EVENT", 

            "WORK_OF_ART", "LAW", "LANGUAGE", "DATE", "TIME",

            "PERCENT", "MONEY", "QUANTITY", "ORDINAL", "CARDINAL"

        }

        

        # Entity normalization patterns

        self.normalization_patterns = {

            "PERSON": lambda x: x.strip().title(),

            "ORG": lambda x: x.strip().title(),

            "GPE": lambda x: x.strip().title(),

            "PRODUCT": lambda x: x.strip().title(),

        }

    

    def extract_entities(self, text, source="unknown", min_length=2):

        """

        Extract entities from text with metadata.

        

        Args:

            text (str): Input text

            source (str): Source identifier

            min_length (int): Minimum entity length

            

        Returns:

            list: List of entity dictionaries

        """

        if not text or not text.strip():

            return []

        

        doc = self.nlp(text)

        entities = []

        seen_entities = set()

        

        for ent in doc.ents:

            if (ent.label_ in self.entity_types and 

                len(ent.text.strip()) >= min_length and

                not ent.text.strip().isdigit()):

                

                normalized = self._normalize_entity(ent.text, ent.label_)

                

                # Avoid duplicates

                if normalized.lower() not in seen_entities:

                    entities.append({

                        "text": ent.text.strip(),

                        "label": ent.label_,

                        "start": ent.start_char,

                        "end": ent.end_char,

                        "source": source,

                        "normalized": normalized,

                        "confidence": 1.0  # spaCy doesn't provide confidence scores

                    })

                    seen_entities.add(normalized.lower())

        

        return entities

    

    def _normalize_entity(self, text, label):

        """

        Normalize entity text based on its type.

        

        Args:

            text (str): Entity text

            label (str): Entity label

            

        Returns:

            str: Normalized entity text

        """

        text = text.strip()

        

        if label in self.normalization_patterns:

            return self.normalization_patterns[label](text)

        else:

            # Default normalization

            return re.sub(r'\s+', ' ', text).strip()


class RelationExtractor:

    """

    Relation extraction using dependency parsing and pattern matching.

    

    This class discovers relationships between entities using both

    linguistic patterns and dependency parse trees.

    """

    

    def __init__(self, nlp_model):

        """

        Initialize relation extractor.

        

        Args:

            nlp_model: spaCy language model

        """

        self.nlp = nlp_model

        

        # Define relationship patterns with confidence scores

        self.relation_patterns = {

            "WORKS_FOR": {

                "patterns": ["works for", "employed by", "employee of", "works at"],

                "confidence": 0.8

            },

            "LOCATED_IN": {

                "patterns": ["located in", "based in", "situated in", "headquarters in"],

                "confidence": 0.8

            },

            "PART_OF": {

                "patterns": ["part of", "member of", "belongs to", "division of"],

                "confidence": 0.7

            },

            "FOUNDED": {

                "patterns": ["founded", "established", "created", "started"],

                "confidence": 0.9

            },

            "ACQUIRED": {

                "patterns": ["acquired", "bought", "purchased", "merged with"],

                "confidence": 0.9

            },

            "COLLABORATES_WITH": {

                "patterns": ["collaborates with", "partners with", "works with", "alliance with"],

                "confidence": 0.7

            },

            "CEO_OF": {

                "patterns": ["CEO of", "chief executive of", "president of"],

                "confidence": 0.9

            }

        }

    

    def extract_relations(self, text, entities, source="unknown"):

        """

        Extract relations from text given entities.

        

        Args:

            text (str): Input text

            entities (list): List of entities

            source (str): Source identifier

            

        Returns:

            list: List of relation dictionaries

        """

        if not text or not entities:

            return []

        

        doc = self.nlp(text)

        relations = []

        

        # Extract relations using pattern matching

        relations.extend(self._extract_pattern_relations(text, entities, source))

        

        # Extract relations using dependency parsing

        relations.extend(self._extract_dependency_relations(doc, entities, source))

        

        # Remove duplicates

        unique_relations = []

        seen_relations = set()

        

        for rel in relations:

            rel_key = (rel["subject"], rel["predicate"], rel["object"])

            if rel_key not in seen_relations:

                unique_relations.append(rel)

                seen_relations.add(rel_key)

        

        return unique_relations

    

    def _extract_pattern_relations(self, text, entities, source):

        """Extract relations using predefined patterns."""

        relations = []

        text_lower = text.lower()

        

        # Create entity position mapping

        entity_positions = []

        for entity in entities:

            entity_positions.append({

                "start": entity["start"],

                "end": entity["end"],

                "entity": entity

            })

        entity_positions.sort(key=lambda x: x["start"])

        

        for relation_type, relation_info in self.relation_patterns.items():

            patterns = relation_info["patterns"]

            confidence = relation_info["confidence"]

            

            for pattern in patterns:

                for match in re.finditer(re.escape(pattern), text_lower):

                    start, end = match.span()

                    

                    # Find entities before and after the pattern

                    before_entities = [e for e in entity_positions if e["end"] <= start]

                    after_entities = [e for e in entity_positions if e["start"] >= end]

                    

                    if before_entities and after_entities:

                        # Take the closest entities

                        subj_entity = before_entities[-1]["entity"]

                        obj_entity = after_entities[0]["entity"]

                        

                        if subj_entity["normalized"] != obj_entity["normalized"]:

                            relations.append({

                                "subject": subj_entity["normalized"],

                                "predicate": relation_type,

                                "object": obj_entity["normalized"],

                                "confidence": confidence,

                                "source": source,

                                "extraction_method": "pattern",

                                "pattern_used": pattern

                            })

        

        return relations

    

    def _extract_dependency_relations(self, doc, entities, source):

        """Extract relations using dependency parsing."""

        relations = []

        

        # Create mapping from tokens to entities

        token_to_entity = {}

        for entity in entities:

            # Find tokens that overlap with entity span

            for token in doc:

                if (token.idx >= entity["start"] and 

                    token.idx + len(token.text) <= entity["end"]):

                    token_to_entity[token.i] = entity

        

        # Look for subject-verb-object patterns

        for token in doc:

            if token.pos_ == "VERB":

                # Find subject

                subjects = [child for child in token.children if child.dep_ == "nsubj"]

                # Find objects

                objects = [child for child in token.children if child.dep_ in ["dobj", "pobj"]]

                

                for subj in subjects:

                    for obj in objects:

                        subj_entity = self._find_entity_for_token(subj, token_to_entity, entities)

                        obj_entity = self._find_entity_for_token(obj, token_to_entity, entities)

                        

                        if (subj_entity and obj_entity and 

                            subj_entity["normalized"] != obj_entity["normalized"]):

                            relations.append({

                                "subject": subj_entity["normalized"],

                                "predicate": token.lemma_.upper(),

                                "object": obj_entity["normalized"],

                                "confidence": 0.6,

                                "source": source,

                                "extraction_method": "dependency",

                                "verb": token.text

                            })

        

        return relations

    

    def _find_entity_for_token(self, token, token_to_entity, entities):

        """Find entity associated with a token."""

        # Direct mapping

        if token.i in token_to_entity:

            return token_to_entity[token.i]

        

        # Check if token is part of any entity by text matching

        token_text = token.text.lower()

        for entity in entities:

            if token_text in entity["normalized"].lower():

                return entity

        

        return None


class KnowledgeGraph:

    """

    Knowledge graph implementation using NetworkX.

    

    This class provides storage and querying capabilities for entities

    and their relationships in a graph structure.

    """

    

    def __init__(self):

        """Initialize empty knowledge graph."""

        self.graph = nx.MultiDiGraph()

        self.entity_index = {}

        self.relation_index = defaultdict(list)

        self.source_mapping = defaultdict(set)

        self.statistics = {"entities_added": 0, "relations_added": 0}

    

    def add_entities(self, entities):

        """

        Add entities to the knowledge graph.

        

        Args:

            entities (list): List of entity dictionaries

        """

        for entity in entities:

            entity_id = entity["normalized"]

            

            if entity_id not in self.graph:

                # Add new entity

                self.graph.add_node(entity_id, **entity)

                self.entity_index[entity_id] = entity

                self.statistics["entities_added"] += 1

            else:

                # Update existing entity

                existing = self.graph.nodes[entity_id]

                if "sources" not in existing:

                    existing["sources"] = set()

                existing["sources"].add(entity["source"])

                

                # Merge additional attributes

                for key, value in entity.items():

                    if key not in existing:

                        existing[key] = value

            

            # Update source mapping

            self.source_mapping[entity["source"]].add(entity_id)

    

    def add_relations(self, relations):

        """

        Add relations to the knowledge graph.

        

        Args:

            relations (list): List of relation dictionaries

        """

        for relation in relations:

            subj = relation["subject"]

            obj = relation["object"]

            pred = relation["predicate"]

            

            # Ensure entities exist in graph

            if subj not in self.graph:

                self.graph.add_node(subj, normalized=subj)

            if obj not in self.graph:

                self.graph.add_node(obj, normalized=obj)

            

            # Add edge with relation information

            self.graph.add_edge(subj, obj, **relation)

            

            # Update relation index

            self.relation_index[pred].append((subj, obj, relation))

            self.statistics["relations_added"] += 1

    

    def find_related_entities(self, entity, max_hops=2, relation_types=None):

        """

        Find entities related to a given entity through graph traversal.

        

        Args:

            entity (str): Starting entity

            max_hops (int): Maximum number of hops

            relation_types (list): Filter by relation types

            

        Returns:

            list: List of related entities

        """

        if entity not in self.graph:

            return []

        

        related = set()

        current_level = {entity}

        visited = {entity}

        

        for hop in range(max_hops):

            next_level = set()

            

            for node in current_level:

                # Get outgoing neighbors

                for neighbor in self.graph.neighbors(node):

                    if neighbor not in visited:

                        # Check relation type filter

                        if relation_types:

                            for edge_data in self.graph[node][neighbor].values():

                                if edge_data.get("predicate") in relation_types:

                                    next_level.add(neighbor)

                                    break

                        else:

                            next_level.add(neighbor)

                

                # Get incoming neighbors

                for predecessor in self.graph.predecessors(node):

                    if predecessor not in visited:

                        # Check relation type filter

                        if relation_types:

                            for edge_data in self.graph[predecessor][node].values():

                                if edge_data.get("predicate") in relation_types:

                                    next_level.add(predecessor)

                                    break

                        else:

                            next_level.add(predecessor)

            

            related.update(next_level)

            visited.update(next_level)

            current_level = next_level

            

            if not current_level:

                break

        

        return list(related)

    

    def get_entity_context(self, entity, include_relations=True, max_relations=10):

        """

        Get comprehensive context for an entity.

        

        Args:

            entity (str): Entity to get context for

            include_relations (bool): Include relationship information

            max_relations (int): Maximum relations to include

            

        Returns:

            dict: Entity context information

        """

        if entity not in self.graph:

            return {}

        

        context = {

            "entity": dict(self.graph.nodes[entity]),

            "direct_relations": [],

            "related_entities": []

        }

        

        if include_relations:

            relation_count = 0

            

            # Get outgoing relations

            for neighbor in self.graph.neighbors(entity):

                if relation_count >= max_relations:

                    break

                for edge_data in self.graph[entity][neighbor].values():

                    context["direct_relations"].append({

                        "type": "outgoing",

                        "target": neighbor,

                        "relation": dict(edge_data)

                    })

                    relation_count += 1

                    if relation_count >= max_relations:

                        break

            

            # Get incoming relations

            for predecessor in self.graph.predecessors(entity):

                if relation_count >= max_relations:

                    break

                for edge_data in self.graph[predecessor][entity].values():

                    context["direct_relations"].append({

                        "type": "incoming",

                        "source": predecessor,

                        "relation": dict(edge_data)

                    })

                    relation_count += 1

                    if relation_count >= max_relations:

                        break

            

            # Get related entities (1-hop)

            context["related_entities"] = self.find_related_entities(entity, max_hops=1)

        

        return context

    

    def query_subgraph(self, entities, max_hops=1):

        """

        Extract subgraph around specified entities.

        

        Args:

            entities (list): List of entities

            max_hops (int): Maximum hops from seed entities

            

        Returns:

            networkx.MultiDiGraph: Subgraph

        """

        if not entities:

            return nx.MultiDiGraph()

        

        # Find all related entities

        all_entities = set()

        for entity in entities:

            if entity in self.graph:

                all_entities.add(entity)

                related = self.find_related_entities(entity, max_hops)

                all_entities.update(related)

        

        # Create subgraph

        if all_entities:

            subgraph = self.graph.subgraph(all_entities).copy()

            return subgraph

        else:

            return nx.MultiDiGraph()

    

    def get_statistics(self):

        """Get knowledge graph statistics."""

        stats = {

            "nodes": self.graph.number_of_nodes(),

            "edges": self.graph.number_of_edges(),

            "entities_added": self.statistics["entities_added"],

            "relations_added": self.statistics["relations_added"],

            "sources": len(self.source_mapping)

        }

        

        if self.graph.number_of_nodes() > 0:

            stats["average_degree"] = sum(dict(self.graph.degree()).values()) / self.graph.number_of_nodes()

            stats["connected_components"] = nx.number_weakly_connected_components(self.graph)

        else:

            stats["average_degree"] = 0

            stats["connected_components"] = 0

        

        return stats

    

    def get_top_entities(self, n=10, metric="degree"):

        """

        Get top entities by specified metric.

        

        Args:

            n (int): Number of entities to return

            metric (str): Metric to use ("degree", "betweenness", "pagerank")

            

        Returns:

            list: List of (entity, score) tuples

        """

        if self.graph.number_of_nodes() == 0:

            return []

        

        if metric == "degree":

            scores = dict(self.graph.degree())

        elif metric == "betweenness":

            scores = nx.betweenness_centrality(self.graph)

        elif metric == "pagerank":

            scores = nx.pagerank(self.graph)

        else:

            scores = dict(self.graph.degree())

        

        return sorted(scores.items(), key=lambda x: x[1], reverse=True)[:n]


class GraphRAGRetriever:

    """

    Advanced retriever combining traditional RAG with graph-based retrieval.

    

    This class provides comprehensive context by combining semantic search

    with graph traversal and entity relationship analysis.

    """

    

    def __init__(self, knowledge_base, knowledge_graph):

        """

        Initialize GraphRAG retriever.

        

        Args:

            knowledge_base: Traditional RAG knowledge base

            knowledge_graph: Knowledge graph instance

        """

        self.knowledge_base = knowledge_base

        self.knowledge_graph = knowledge_graph

        self.entity_extractor = EntityExtractor()

    

    def retrieve_context(self, query, k=3, graph_hops=2, max_graph_entities=5):

        """

        Retrieve comprehensive context using both traditional and graph methods.

        

        Args:

            query (str): User query

            k (int): Number of traditional RAG results

            graph_hops (int): Maximum graph traversal hops

            max_graph_entities (int): Maximum entities to include from graph

            

        Returns:

            dict: Comprehensive context information

        """

        # Extract entities from query

        query_entities = self.entity_extractor.extract_entities(query)

        entity_names = [ent["normalized"] for ent in query_entities]

        

        # Traditional RAG retrieval

        traditional_context = []

        if hasattr(self.knowledge_base, 'search_relevant_context'):

            traditional_context = self.knowledge_base.search_relevant_context(query, k)

        

        # Graph-based retrieval

        graph_context = []

        related_entities = set()

        

        if entity_names:

            # Find related entities through graph traversal

            for entity in entity_names:

                if entity in self.knowledge_graph.graph:

                    related = self.knowledge_graph.find_related_entities(entity, graph_hops)

                    related_entities.update(related[:max_graph_entities])

                    related_entities.add(entity)

            

            # Get context for entities

            for entity in list(related_entities)[:max_graph_entities]:

                entity_context = self.knowledge_graph.get_entity_context(entity)

                if entity_context and entity_context.get("direct_relations"):

                    graph_context.append(entity_context)

        

        return {

            "traditional_context": traditional_context,

            "graph_context": graph_context,

            "query_entities": entity_names,

            "related_entities": list(related_entities),

            "context_sources": self._get_context_sources(traditional_context, graph_context)

        }

    

    def _get_context_sources(self, traditional_context, graph_context):

        """Extract unique sources from context."""

        sources = set()

        

        # Traditional context sources

        for ctx in traditional_context:

            if isinstance(ctx, dict) and "source" in ctx:

                sources.add(ctx["source"])

        

        # Graph context sources

        for ctx in graph_context:

            if "entity" in ctx and "source" in ctx["entity"]:

                sources.add(ctx["entity"]["source"])

            for rel in ctx.get("direct_relations", []):

                if "source" in rel.get("relation", {}):

                    sources.add(rel["relation"]["source"])

        

        return list(sources)

    

    def format_context_for_llm(self, context_data, max_traditional=3, max_graph=5):

        """

        Format retrieved context for LLM consumption.

        

        Args:

            context_data (dict): Context data from retrieve_context

            max_traditional (int): Maximum traditional context items

            max_graph (int): Maximum graph context items

            

        Returns:

            str: Formatted context string

        """

        formatted_parts = []

        

        # Add traditional context

        if context_data["traditional_context"]:

            formatted_parts.append("RELEVANT DOCUMENTS:")

            for i, ctx in enumerate(context_data["traditional_context"][:max_traditional]):

                if isinstance(ctx, dict):

                    content = ctx.get("content", str(ctx))

                    source = ctx.get("source", "Unknown")

                    formatted_parts.append(f"\nDocument {i+1} (Source: {source}):")

                    formatted_parts.append(content)

                else:

                    formatted_parts.append(f"\nDocument {i+1}:")

                    formatted_parts.append(str(ctx))

        

        # Add graph context

        if context_data["graph_context"]:

            formatted_parts.append("\n\nENTITY RELATIONSHIPS:")

            for i, entity_ctx in enumerate(context_data["graph_context"][:max_graph]):

                entity_name = entity_ctx["entity"].get("normalized", "Unknown")

                entity_type = entity_ctx["entity"].get("label", "")

                

                formatted_parts.append(f"\nEntity: {entity_name}")

                if entity_type:

                    formatted_parts.append(f"Type: {entity_type}")

                

                # Add relationships

                relations = entity_ctx.get("direct_relations", [])

                if relations:

                    formatted_parts.append("Relationships:")

                    for rel in relations[:3]:  # Limit to avoid overwhelming

                        rel_info = rel.get("relation", {})

                        predicate = rel_info.get("predicate", "RELATED_TO")

                        

                        if rel["type"] == "outgoing":

                            target = rel.get("target", "Unknown")

                            formatted_parts.append(f"  • {entity_name} {predicate} {target}")

                        else:

                            source = rel.get("source", "Unknown")

                            formatted_parts.append(f"  • {source} {predicate} {entity_name}")

        

        # Add query entity information

        if context_data["query_entities"]:

            formatted_parts.append(f"\nQuery mentions entities: {', '.join(context_data['query_entities'])}")

        

        return "\n".join(formatted_parts)


class GraphRAGChatbot:

    """

    Complete GraphRAG chatbot implementation.

    

    This chatbot combines traditional RAG with knowledge graph reasoning

    to provide comprehensive, relationship-aware responses.

    """

    

    def __init__(self, llm_model="microsoft/DialoGPT-medium", embedding_model="all-MiniLM-L6-v2"):

        """

        Initialize GraphRAG chatbot.

        

        Args:

            llm_model (str): Language model for generation

            embedding_model (str): Model for embeddings

        """

        print("Initializing GraphRAG Chatbot...")

        

        # Initialize device

        self.device = self._get_optimal_device()

        print(f"Using device: {self.device}")

        

        # Initialize LLM components

        print(f"Loading language model: {llm_model}")

        self.tokenizer = AutoTokenizer.from_pretrained(llm_model)

        self.model = AutoModelForCausalLM.from_pretrained(llm_model)

        

        if self.tokenizer.pad_token is None:

            self.tokenizer.pad_token = self.tokenizer.eos_token

        

        self.model.to(self.device)

        self.model.eval()

        

        # Initialize GraphRAG components

        from chapter_b_rag import KnowledgeBase  # Reuse from previous chapter

        self.knowledge_base = KnowledgeBase(embedding_model)

        self.knowledge_graph = KnowledgeGraph()

        self.entity_extractor = EntityExtractor()

        self.relation_extractor = RelationExtractor(self.entity_extractor.nlp)

        self.retriever = GraphRAGRetriever(self.knowledge_base, self.knowledge_graph)

        

        # Conversation state

        self.conversation_history = []

        self.max_context_length = 1000

        

        print("GraphRAG Chatbot ready!")

    

    def _get_optimal_device(self):

        """Determine optimal device for inference."""

        if torch.cuda.is_available():

            return torch.device("cuda")

        elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():

            return torch.device("mps")

        else:

            return torch.device("cpu")

    

    def build_knowledge_base(self, documents, sources=None):

        """

        Build both traditional knowledge base and knowledge graph.

        

        Args:

            documents (list): List of text documents

            sources (list): Optional source identifiers

        """

        print("Building knowledge base and knowledge graph...")

        

        # Build traditional knowledge base

        self.knowledge_base.add_documents(documents, sources)

        

        # Build knowledge graph

        if sources is None:

            sources = [f"document_{i}" for i in range(len(documents))]

        

        all_entities = []

        all_relations = []

        

        for doc, source in zip(documents, sources):

            print(f"Processing {source} for knowledge graph...")

            

            # Extract entities

            entities = self.entity_extractor.extract_entities(doc, source)

            all_entities.extend(entities)

            

            # Extract relations

            relations = self.relation_extractor.extract_relations(doc, entities, source)

            all_relations.extend(relations)

        

        # Add to knowledge graph

        if all_entities:

            self.knowledge_graph.add_entities(all_entities)

            print(f"Added {len(all_entities)} entities to knowledge graph")

        

        if all_relations:

            self.knowledge_graph.add_relations(all_relations)

            print(f"Added {len(all_relations)} relations to knowledge graph")

        

        # Print statistics

        stats = self.knowledge_graph.get_statistics()

        print(f"Knowledge graph: {stats['nodes']} nodes, {stats['edges']} edges")

    

    def generate_response(self, user_input, use_graph=True, k=3):

        """

        Generate response using GraphRAG.

        

        Args:

            user_input (str): User's input

            use_graph (bool): Whether to use graph reasoning

            k (int): Number of context chunks to retrieve

            

        Returns:

            dict: Response with metadata

        """

        try:

            # Retrieve context

            if use_graph:

                context_data = self.retriever.retrieve_context(user_input, k)

                formatted_context = self.retriever.format_context_for_llm(context_data)

            else:

                # Fallback to traditional RAG

                traditional_context = self.knowledge_base.search_relevant_context(user_input, k)

                formatted_context = "\n".join([

                    ctx.get("content", str(ctx)) if isinstance(ctx, dict) else str(ctx)

                    for ctx in traditional_context

                ])

                context_data = {"traditional_context": traditional_context, "graph_context": []}

            

            # Construct prompt

            if formatted_context.strip():

                prompt = f"""Context Information:

{formatted_context}


Based on the above context, please provide a comprehensive answer to the following question. Consider both the document information and entity relationships when formulating your response.


Question: {user_input}


Answer:"""

            else:

                prompt = f"Question: {user_input}\n\nAnswer:"

            

            # Generate response

            inputs = self.tokenizer.encode(

                prompt,

                return_tensors='pt',

                truncation=True,

                max_length=self.max_context_length

            )

            inputs = inputs.to(self.device)

            

            with torch.no_grad():

                outputs = self.model.generate(

                    inputs,

                    max_new_tokens=200,

                    no_repeat_ngram_size=3,

                    do_sample=True,

                    temperature=0.7,

                    top_p=0.9,

                    pad_token_id=self.tokenizer.eos_token_id

                )

            

            response = self.tokenizer.decode(

                outputs[0][inputs.shape[1]:],

                skip_special_tokens=True

            ).strip()

            

            # Store conversation

            self.conversation_history.append({

                "user": user_input,

                "assistant": response,

                "used_graph": use_graph,

                "context_data": context_data

            })

            

            return {

                "response": response,

                "used_graph": use_graph,

                "context_data": context_data,

                "sources": context_data.get("context_sources", [])

            }

            

        except Exception as e:

            print(f"Error generating response: {e}")

            return {

                "response": "I apologize, but I encountered an error processing your request.",

                "used_graph": False,

                "context_data": {},

                "sources": []

            }

    

    def get_knowledge_graph_stats(self):

        """Get knowledge graph statistics."""

        return self.knowledge_graph.get_statistics()

    

    def get_top_entities(self, n=10):

        """Get top entities by degree centrality."""

        return self.knowledge_graph.get_top_entities(n)

    

    def explore_entity(self, entity_name):

        """Explore a specific entity in the knowledge graph."""

        return self.knowledge_graph.get_entity_context(entity_name)

    

    def run_interactive_session(self):

        """Run interactive GraphRAG session."""

        print("\n" + "=" * 70)

        print("GRAPHRAG CHATBOT - Interactive Session")

        print("=" * 70)

        print("Commands:")

        print("  'quit' or 'exit' - End conversation")

        print("  'stats' - Show knowledge graph statistics")

        print("  'top_entities' - Show most connected entities")

        print("  'explore <entity>' - Explore specific entity")

        print("  'toggle_graph' - Toggle graph reasoning on/off")

        print("=" * 70)

        

        use_graph = True

        

        while True:

            try:

                user_input = input("\nYou: ").strip()

                

                if user_input.lower() in ['quit', 'exit', 'bye']:

                    print("\nChatbot: Thank you for using GraphRAG Chatbot! Goodbye!")

                    break

                

                elif user_input.lower() == 'stats':

                    stats = self.get_knowledge_graph_stats()

                    kb_stats = self.knowledge_base.get_stats()

                    print(f"\nKnowledge Graph Statistics:")

                    print(f"  Entities: {stats['nodes']}")

                    print(f"  Relations: {stats['edges']}")

                    print(f"  Average degree: {stats['average_degree']:.2f}")

                    print(f"  Connected components: {stats['connected_components']}")

                    print(f"  Document chunks: {kb_stats['total_chunks']}")

                    continue

                

                elif user_input.lower() == 'top_entities':

                    top_entities = self.get_top_entities(10)

                    print(f"\nTop Entities by Connections:")

                    for i, (entity, degree) in enumerate(top_entities, 1):

                        print(f"  {i}. {entity} ({degree} connections)")

                    continue

                

                elif user_input.lower().startswith('explore '):

                    entity_name = user_input[8:].strip()

                    context = self.explore_entity(entity_name)

                    if context:

                        print(f"\nEntity: {entity_name}")

                        print(f"Type: {context['entity'].get('label', 'Unknown')}")

                        print(f"Relations: {len(context['direct_relations'])}")

                        for rel in context['direct_relations'][:5]:

                            rel_info = rel['relation']

                            if rel['type'] == 'outgoing':

                                print(f"  → {rel_info['predicate']} {rel['target']}")

                            else:

                                print(f"  ← {rel['source']} {rel_info['predicate']}")

                    else:

                        print(f"\nEntity '{entity_name}' not found in knowledge graph")

                    continue

                

                elif user_input.lower() == 'toggle_graph':

                    use_graph = not use_graph

                    status = "enabled" if use_graph else "disabled"

                    print(f"\nGraph reasoning {status}")

                    continue

                

                if not user_input:

                    continue

                

                # Generate response

                mode_indicator = "[GraphRAG]" if use_graph else "[RAG]"

                print(f"Chatbot {mode_indicator}: ", end="", flush=True)

                

                result = self.generate_response(user_input, use_graph)

                print(result["response"])

                

                # Show sources and entities if available

                if result.get("context_data"):

                    entities = result["context_data"].get("query_entities", [])

                    if entities:

                        print(f"Entities: {', '.join(entities)}")

                    

                    sources = result.get("sources", [])

                    if sources:

                        print(f"Sources: {', '.join(sources)}")

                

            except KeyboardInterrupt:

                print("\n\nSession interrupted. Goodbye!")

                break

            except Exception as e:

                print(f"\nError: {e}")


def create_sample_knowledge_base_with_entities():

    """Create a sample knowledge base rich in entities and relationships."""

    sample_documents = [

        """

        Microsoft Corporation is a multinational technology company founded by Bill Gates and Paul Allen in 1975. 

        The company is headquartered in Redmond, Washington. Satya Nadella serves as the current CEO of Microsoft, 

        having taken over from Steve Ballmer in 2014. Microsoft is known for developing the Windows operating system, 

        Microsoft Office suite, and Azure cloud computing platform. The company acquired LinkedIn in 2016 for 

        $26.2 billion and GitHub in 2018 for $7.5 billion.

        """,

        """

        Apple Inc. is a technology company founded by Steve Jobs, Steve Wozniak, and Ronald Wayne in 1976. 

        The company is based in Cupertino, California. Tim Cook is the current CEO of Apple, succeeding 

        Steve Jobs in 2011. Apple is famous for products like the iPhone, iPad, Mac computers, and Apple Watch. 

        The company's services include the App Store, iCloud, and Apple Music. Apple became the first company 

        to reach a $1 trillion market valuation in 2018.

        """,

        """

        Google LLC, a subsidiary of Alphabet Inc., was founded by Larry Page and Sergey Brin in 1998 while 

        they were PhD students at Stanford University. The company is headquartered in Mountain View, California. 

        Sundar Pichai serves as the CEO of Google. The company is best known for its search engine, but also 

        develops Android operating system, Chrome browser, and Google Cloud platform. Google acquired YouTube 

        in 2006 for $1.65 billion and Android Inc. in 2005.

        """,

        """

        Amazon.com Inc. was founded by Jeff Bezos in 1994 in Bellevue, Washington. The company started as an 

        online bookstore but has expanded to become one of the world's largest e-commerce and cloud computing 

        companies. Andy Jassy became CEO in 2021, taking over from Jeff Bezos. Amazon Web Services (AWS) is 

        the company's cloud computing division. Amazon acquired Whole Foods Market in 2017 for $13.7 billion.

        """,

        """

        Tesla Inc. is an electric vehicle and clean energy company founded by Martin Eberhard and Marc Tarpenning 

        in 2003. Elon Musk joined as chairman and later became CEO. The company is headquartered in Austin, Texas, 

        having moved from Palo Alto, California. Tesla produces electric vehicles including the Model S, Model 3, 

        Model X, and Model Y. The company also develops energy storage systems and solar panels through its 

        acquisition of SolarCity in 2016.

        """

    ]

    

    sources = [

        "Microsoft_Profile",

        "Apple_Profile", 

        "Google_Profile",

        "Amazon_Profile",

        "Tesla_Profile"

    ]

    

    return sample_documents, sources


def main():

    """

    Main function demonstrating GraphRAG chatbot functionality.

    """

    try:

        # Create GraphRAG chatbot

        chatbot = GraphRAGChatbot()

        

        # Create sample knowledge base

        print("\nCreating sample knowledge base with entities and relationships...")

        documents, sources = create_sample_knowledge_base_with_entities()

        chatbot.build_knowledge_base(documents, sources)

        

        # Show statistics

        stats = chatbot.get_knowledge_graph_stats()

        kb_stats = chatbot.knowledge_base.get_stats()

        print(f"\nKnowledge base created:")

        print(f"  Documents: {kb_stats['total_documents']}")

        print(f"  Text chunks: {kb_stats['total_chunks']}")

        print(f"  Entities: {stats['nodes']}")

        print(f"  Relations: {stats['edges']}")

        

        # Show top entities

        top_entities = chatbot.get_top_entities(5)

        print(f"\nTop entities by connections:")

        for entity, degree in top_entities:

            print(f"  {entity}: {degree} connections")

        

        # Run interactive session

        chatbot.run_interactive_session()

        

    except Exception as e:

        print(f"Failed to initialize GraphRAG chatbot: {e}")

        print("Please check your installation and try again.")


if __name__ == "__main__":

    main()


This comprehensive GraphRAG implementation demonstrates the power of combining traditional retrieval with graph-based reasoning. The system can understand entity relationships, perform multi-hop reasoning, and provide more contextually aware responses by leveraging the structured knowledge representation in the graph.
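To see the dual-retrieval idea in isolation, the sketch below strips it down to pure Python: a keyword-overlap score stands in for embedding similarity, and a plain dict adjacency list stands in for the networkx knowledge graph. All names here (DOCS, GRAPH, retrieve, and so on) are illustrative stand-ins, not part of the implementation above.

```python
import re
from collections import deque

# Toy stand-ins: DOCS replaces the vector store, GRAPH the knowledge graph.
DOCS = {
    "Microsoft_Profile": "Microsoft was founded by Bill Gates. Satya Nadella is CEO.",
    "Apple_Profile": "Apple was founded by Steve Jobs. Tim Cook is CEO.",
}
GRAPH = {
    "Microsoft": ["Bill Gates", "Satya Nadella", "LinkedIn"],
    "Bill Gates": ["Microsoft"],
    "Satya Nadella": ["Microsoft"],
    "LinkedIn": ["Microsoft"],
}

def keyword_score(query, text):
    """Crude proxy for embedding similarity: count shared lowercase tokens."""
    return len(set(re.findall(r"\w+", query.lower())) &
               set(re.findall(r"\w+", text.lower())))

def graph_neighborhood(entity, hops=2):
    """Breadth-first expansion up to `hops` edges away from an entity."""
    seen, frontier = {entity}, deque([(entity, 0)])
    while frontier:
        node, depth = frontier.popleft()
        if depth == hops:
            continue
        for neighbor in GRAPH.get(node, []):
            if neighbor not in seen:
                seen.add(neighbor)
                frontier.append((neighbor, depth + 1))
    return seen - {entity}

def retrieve(query, query_entities, k=1, hops=2):
    """Merge top-k document hits with the graph neighborhoods of query entities."""
    ranked = sorted(DOCS, key=lambda src: keyword_score(query, DOCS[src]), reverse=True)
    related = set()
    for entity in query_entities:
        related |= graph_neighborhood(entity, hops)
    return {"documents": ranked[:k], "related_entities": sorted(related)}

print(retrieve("Who is the CEO of Microsoft?", ["Microsoft"]))
```

The real retriever replaces keyword_score with cosine similarity over embeddings and graph_neighborhood with networkx traversal, but the merge step has the same shape: rank documents, expand entities, and return both in one context dict.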


CHAPTER D: IMPLEMENTING MULTI-AGENT SYSTEMS


Multi-Agent Systems represent a paradigm shift from single-model architectures to collaborative networks of specialized AI agents working together to solve complex problems. Unlike monolithic chatbots that attempt to handle all tasks with a single model, multi-agent systems decompose problems into manageable components, assigning each to agents with specific expertise and capabilities.


The fundamental principle underlying multi-agent systems lies in the division of labor and specialization. Just as human organizations benefit from having experts in different domains collaborate on complex projects, AI systems can achieve superior performance by employing multiple agents with distinct roles, knowledge bases, and reasoning capabilities. This approach enables more sophisticated problem-solving, better error handling, and improved scalability.


The rationale for implementing multi-agent systems stems from the limitations of single-agent approaches when dealing with complex, multi-faceted problems. Consider a user query that requires research, analysis, synthesis, and presentation. A single agent might struggle to excel at all these tasks simultaneously, whereas a multi-agent system can assign research to a specialized retrieval agent, analysis to a reasoning agent, synthesis to an integration agent, and presentation to a formatting agent.


Multi-agent architecture consists of several key components working in coordination. Agent definitions specify individual agent capabilities, roles, and interfaces. Communication protocols enable agents to exchange information and coordinate actions. Task orchestration manages the flow of work between agents. Shared memory or message passing facilitates information sharing. Finally, result aggregation combines outputs from multiple agents into coherent responses.


The implementation of multi-agent systems requires careful consideration of agent design, communication patterns, and coordination mechanisms. Agents can be implemented as separate model instances, specialized prompts within the same model, or hybrid approaches combining both strategies. Communication can be synchronous or asynchronous, direct or mediated through a central coordinator.
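The mediated variant is worth seeing in miniature before the full framework. In this sketch agents never address each other directly: every message goes onto a shared bus, and a coordinator delivers it to the recipient's private inbox. The function and queue names (agent, coordinator, bus) are illustrative assumptions, not part of the framework implemented in this chapter.

```python
import asyncio

async def agent(inbox, bus, handler):
    """Generic agent loop: read a message, handle it, post any reply to the bus."""
    while True:
        msg = await inbox.get()
        if msg is None:          # shutdown sentinel
            break
        reply = handler(msg)
        if reply is not None:
            await bus.put(reply)

async def coordinator(bus, inboxes, n_messages):
    """Mediated routing: deliver each bus message to the recipient's inbox."""
    for _ in range(n_messages):
        msg = await bus.get()
        await inboxes[msg["recipient"]].put(msg)

async def main():
    bus = asyncio.Queue()
    inboxes = {"researcher": asyncio.Queue(), "synthesizer": asyncio.Queue()}
    results = []

    def research(msg):
        # Pretend to look something up, then hand the findings onward.
        return {"sender": "researcher", "recipient": "synthesizer",
                "content": f"facts about {msg['content']}"}

    def synthesize(msg):
        results.append(f"answer built from {msg['content']}")

    tasks = [
        asyncio.create_task(agent(inboxes["researcher"], bus, research)),
        asyncio.create_task(agent(inboxes["synthesizer"], bus, synthesize)),
    ]
    await bus.put({"sender": "user", "recipient": "researcher", "content": "Tesla"})
    await coordinator(bus, inboxes, n_messages=2)  # user message + researcher reply
    for inbox in inboxes.values():
        await inbox.put(None)                      # stop both agent loops
    await asyncio.gather(*tasks)
    return results

print(asyncio.run(main()))
```

Because all routing flows through the coordinator, adding logging, retries, or access control means touching one function rather than every agent, which is the main argument for mediated over direct communication.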


Setting up a multi-agent system requires extending our existing infrastructure with agent management capabilities. We will implement a flexible framework that supports different agent types, communication patterns, and coordination strategies:


import asyncio

from abc import ABC, abstractmethod

from typing import Dict, List, Any, Optional, Callable

from dataclasses import dataclass, field

from enum import Enum

import json

import time

import uuid

import torch


class AgentRole(Enum):

    COORDINATOR = "coordinator"

    RESEARCHER = "researcher"

    ANALYZER = "analyzer"

    SYNTHESIZER = "synthesizer"

    CRITIC = "critic"

    FORMATTER = "formatter"


@dataclass

class Message:

    id: str = field(default_factory=lambda: str(uuid.uuid4()))

    sender: str = ""

    recipient: str = ""

    content: str = ""

    message_type: str = "text"

    metadata: Dict[str, Any] = field(default_factory=dict)

    timestamp: float = field(default_factory=time.time)


@dataclass

class AgentCapability:

    name: str

    description: str

    input_types: List[str]

    output_types: List[str]

    parameters: Dict[str, Any] = field(default_factory=dict)


class Agent(ABC):

    def __init__(self, agent_id: str, role: AgentRole, capabilities: List[AgentCapability]):

        self.agent_id = agent_id

        self.role = role

        self.capabilities = capabilities

        self.message_history: List[Message] = []

        self.state: Dict[str, Any] = {}

        self.is_active = True

    

    @abstractmethod

    async def process_message(self, message: Message) -> List[Message]:

        """Process incoming message and return response messages."""

        pass

    

    @abstractmethod

    def can_handle(self, task_type: str) -> bool:

        """Check if agent can handle a specific task type."""

        pass

    

    def add_message_to_history(self, message: Message):

        """Add message to agent's history."""

        self.message_history.append(message)

    

    def get_recent_context(self, n: int = 5) -> List[Message]:

        """Get recent messages for context."""

        return self.message_history[-n:]


The base Agent class provides a foundation for implementing specialized agents. Each agent has a unique identifier, role, capabilities, and maintains its own message history and state. The abstract methods ensure that concrete agent implementations provide the necessary functionality for message processing and task handling.
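To show what a concrete subclass of this abstract interface looks like at its smallest, here is a self-contained smoke test. The Message and Agent definitions are re-declared in trimmed form so the sketch runs standalone, and EchoAgent itself is purely illustrative rather than part of the framework.

```python
import asyncio
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List

@dataclass
class Message:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    sender: str = ""
    recipient: str = ""
    content: str = ""
    message_type: str = "text"
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: float = field(default_factory=time.time)

class Agent(ABC):
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.message_history: List[Message] = []

    @abstractmethod
    async def process_message(self, message: Message) -> List[Message]: ...

    @abstractmethod
    def can_handle(self, task_type: str) -> bool: ...

class EchoAgent(Agent):
    """Smallest possible concrete agent: answers every text message with an echo."""

    async def process_message(self, message: Message) -> List[Message]:
        self.message_history.append(message)
        if message.message_type != "text":
            return []
        return [Message(sender=self.agent_id, recipient=message.sender,
                        content=f"echo: {message.content}", message_type="text")]

    def can_handle(self, task_type: str) -> bool:
        return task_type == "echo"

async def demo():
    echo = EchoAgent("echo-1")
    replies = await echo.process_message(
        Message(sender="user", recipient="echo-1", content="hello"))
    return replies[0].content

print(asyncio.run(demo()))
```

The specialized agents that follow (ResearchAgent, AnalyzerAgent, SynthesizerAgent) fill in the same two abstract methods, just with real retrieval and generation work in place of the echo.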


Agent specialization enables different agents to excel at specific tasks. We will implement several specialized agent types, each optimized for particular aspects of the problem-solving process:


class ResearchAgent(Agent):

    def __init__(self, agent_id: str, knowledge_base, embedding_model):

        capabilities = [

            AgentCapability(

                name="document_search",

                description="Search knowledge base for relevant information",

                input_types=["query"],

                output_types=["documents", "facts"]

            ),

            AgentCapability(

                name="fact_extraction",

                description="Extract specific facts from documents",

                input_types=["documents", "questions"],

                output_types=["facts", "evidence"]

            )

        ]

        super().__init__(agent_id, AgentRole.RESEARCHER, capabilities)

        self.knowledge_base = knowledge_base

        self.embedding_model = embedding_model

    

    async def process_message(self, message: Message) -> List[Message]:

        self.add_message_to_history(message)

        

        if message.message_type == "research_request":

            query = message.content

            results = await self._conduct_research(query)

            

            response = Message(

                sender=self.agent_id,

                recipient=message.sender,

                content=json.dumps(results),

                message_type="research_results",

                metadata={"query": query, "result_count": len(results)}

            )

            return [response]

        

        return []

    

    async def _conduct_research(self, query: str) -> List[Dict[str, Any]]:

        # Simulate async research operation

        await asyncio.sleep(0.1)

        

        if hasattr(self.knowledge_base, 'search_relevant_context'):

            context_results = self.knowledge_base.search_relevant_context(query, k=5)

            

            research_results = []

            for i, ctx in enumerate(context_results):

                if isinstance(ctx, dict):

                    research_results.append({

                        "id": f"result_{i}",

                        "content": ctx.get("content", ""),

                        "source": ctx.get("source", "unknown"),

                        "relevance_score": ctx.get("score", 0.0),

                        "type": "document_chunk"

                    })

                else:

                    research_results.append({

                        "id": f"result_{i}",

                        "content": str(ctx),

                        "source": "unknown",

                        "relevance_score": 0.5,

                        "type": "document_chunk"

                    })

            

            return research_results

        

        return []

    

    def can_handle(self, task_type: str) -> bool:

        return task_type in ["research", "search", "fact_finding", "information_retrieval"]


class AnalyzerAgent(Agent):

    def __init__(self, agent_id: str, llm_model, tokenizer, device):

        capabilities = [

            AgentCapability(

                name="content_analysis",

                description="Analyze content for patterns, insights, and relationships",

                input_types=["documents", "facts"],

                output_types=["analysis", "insights"]

            ),

            AgentCapability(

                name="logical_reasoning",

                description="Perform logical reasoning and inference",

                input_types=["facts", "rules"],

                output_types=["conclusions", "inferences"]

            )

        ]

        super().__init__(agent_id, AgentRole.ANALYZER, capabilities)

        self.llm_model = llm_model

        self.tokenizer = tokenizer

        self.device = device

    

    async def process_message(self, message: Message) -> List[Message]:

        self.add_message_to_history(message)

        

        if message.message_type == "analysis_request":

            data = json.loads(message.content)

            analysis = await self._analyze_content(data)

            

            response = Message(

                sender=self.agent_id,

                recipient=message.sender,

                content=json.dumps(analysis),

                message_type="analysis_results",

                metadata={"analysis_type": data.get("type", "general")}

            )

            return [response]

        

        return []

    

    async def _analyze_content(self, data: Dict[str, Any]) -> Dict[str, Any]:

        # Simulate async analysis

        await asyncio.sleep(0.2)

        

        content_items = data.get("content", [])

        analysis_type = data.get("type", "general")

        

        # Construct analysis prompt

        content_text = "\n".join([

            item.get("content", "") if isinstance(item, dict) else str(item)

            for item in content_items

        ])

        

        prompt = f"""Analyze the following content and provide insights:


Content:

{content_text}


Analysis Type: {analysis_type}


Please provide:

1. Key themes and patterns

2. Important relationships

3. Logical conclusions

4. Potential implications


Analysis:"""

        

        # Generate analysis using LLM

        try:

            inputs = self.tokenizer.encode(prompt, return_tensors='pt', truncation=True, max_length=800)

            inputs = inputs.to(self.device)

            

            with torch.no_grad():

                outputs = self.llm_model.generate(

                    inputs,

                    max_new_tokens=200,

                    temperature=0.7,

                    do_sample=True,

                    pad_token_id=self.tokenizer.eos_token_id

                )

            

            analysis_text = self.tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True)

            

            return {

                "analysis": analysis_text.strip(),

                "content_summary": f"Analyzed {len(content_items)} items",

                "analysis_type": analysis_type,

                "confidence": 0.8

            }

        

        except Exception as e:

            return {

                "analysis": f"Analysis failed: {str(e)}",

                "content_summary": f"Failed to analyze {len(content_items)} items",

                "analysis_type": analysis_type,

                "confidence": 0.0

            }

    

    def can_handle(self, task_type: str) -> bool:

        return task_type in ["analysis", "reasoning", "pattern_recognition", "inference"]


class SynthesizerAgent(Agent):

    def __init__(self, agent_id: str, llm_model, tokenizer, device):

        capabilities = [

            AgentCapability(

                name="content_synthesis",

                description="Synthesize multiple sources into coherent response",

                input_types=["research_results", "analysis", "facts"],

                output_types=["synthesis", "summary"]

            ),

            AgentCapability(

                name="response_generation",

                description="Generate final responses based on synthesized information",

                input_types=["synthesis", "query"],

                output_types=["response"]

            )

        ]

        super().__init__(agent_id, AgentRole.SYNTHESIZER, capabilities)

        self.llm_model = llm_model

        self.tokenizer = tokenizer

        self.device = device

    

    async def process_message(self, message: Message) -> List[Message]:

        self.add_message_to_history(message)

        

        if message.message_type == "synthesis_request":

            data = json.loads(message.content)

            synthesis = await self._synthesize_information(data)

            

            response = Message(

                sender=self.agent_id,

                recipient=message.sender,

                content=synthesis,

                message_type="synthesis_results",

                metadata={"sources_count": len(data.get("sources", []))}

            )

            return [response]

        

        return []

    

    async def _synthesize_information(self, data: Dict[str, Any]) -> str:

        # Simulate async synthesis

        await asyncio.sleep(0.15)

        

        query = data.get("query", "")

        research_results = data.get("research_results", [])

        analysis_results = data.get("analysis_results", {})

        

        # Construct synthesis prompt

        research_text = ""

        if research_results:

            research_text = "Research Findings:\n"

            for i, result in enumerate(research_results[:5]):

                if isinstance(result, dict):

                    research_text += f"{i+1}. {result.get('content', '')}\n"

                else:

                    research_text += f"{i+1}. {str(result)}\n"

        

        analysis_text = ""

        if analysis_results:

            analysis_text = f"Analysis:\n{analysis_results.get('analysis', '')}\n"

        

        prompt = f"""Based on the following research and analysis, provide a comprehensive answer to the user's question.


User Question: {query}


{research_text}


{analysis_text}


Please synthesize this information into a clear, well-structured response that directly addresses the user's question:"""

        

        # Generate synthesis using LLM

        try:

            inputs = self.tokenizer.encode(prompt, return_tensors='pt', truncation=True, max_length=900)

            inputs = inputs.to(self.device)

            

            with torch.no_grad():

                outputs = self.llm_model.generate(

                    inputs,

                    max_length=inputs.shape[1] + 250,

                    num_beams=3,

                    temperature=0.7,

                    do_sample=True,

                    pad_token_id=self.tokenizer.eos_token_id

                )

            

            synthesis = self.tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True)

            return synthesis.strip()

        

        except Exception as e:

            return f"I apologize, but I encountered an error synthesizing the information: {str(e)}"

    

    def can_handle(self, task_type: str) -> bool:

        return task_type in ["synthesis", "summarization", "response_generation", "integration"]


The coordination mechanism manages the flow of work between agents, ensuring that tasks are properly distributed and results are aggregated. The coordinator agent serves as the central orchestrator, deciding which agents to involve and in what sequence:


class CoordinatorAgent(Agent):

    def __init__(self, agent_id: str, agents: Dict[str, Agent]):

        capabilities = [

            AgentCapability(

                name="task_orchestration",

                description="Coordinate tasks between multiple agents",

                input_types=["user_query"],

                output_types=["final_response"]

            ),

            AgentCapability(

                name="agent_management",

                description="Manage agent interactions and workflow",

                input_types=["agent_responses"],

                output_types=["coordination_decisions"]

            )

        ]

        super().__init__(agent_id, AgentRole.COORDINATOR, capabilities)

        self.agents = agents

        self.active_workflows: Dict[str, Dict[str, Any]] = {}

    

    async def process_message(self, message: Message) -> List[Message]:

        self.add_message_to_history(message)

        

        if message.message_type == "user_query":

            response = await self._coordinate_response(message.content, message.id)

            

            return [Message(

                sender=self.agent_id,

                recipient=message.sender,

                content=response,

                message_type="final_response",

                metadata={"workflow_id": message.id}

            )]

        

        return []

    

    async def _coordinate_response(self, query: str, workflow_id: str) -> str:

        """Coordinate multi-agent response to user query."""

        try:

            # Initialize workflow

            self.active_workflows[workflow_id] = {

                "query": query,

                "status": "active",

                "results": {},

                "start_time": time.time()

            }

            

            # Step 1: Research

            research_results = await self._request_research(query)

            self.active_workflows[workflow_id]["results"]["research"] = research_results

            

            # Step 2: Analysis

            analysis_results = await self._request_analysis(research_results, query)

            self.active_workflows[workflow_id]["results"]["analysis"] = analysis_results

            

            # Step 3: Synthesis

            final_response = await self._request_synthesis(query, research_results, analysis_results)

            self.active_workflows[workflow_id]["results"]["synthesis"] = final_response

            

            # Mark workflow complete

            self.active_workflows[workflow_id]["status"] = "complete"

            self.active_workflows[workflow_id]["end_time"] = time.time()

            

            return final_response

        

        except Exception as e:

            return f"I apologize, but I encountered an error coordinating the response: {str(e)}"

    

    async def _request_research(self, query: str) -> List[Dict[str, Any]]:

        """Request research from research agent."""

        researcher = self._find_agent_by_role(AgentRole.RESEARCHER)

        if not researcher:

            return []

        

        research_message = Message(

            sender=self.agent_id,

            recipient=researcher.agent_id,

            content=query,

            message_type="research_request"

        )

        

        responses = await researcher.process_message(research_message)

        if responses and responses[0].message_type == "research_results":

            return json.loads(responses[0].content)

        return []

    

    async def _request_analysis(self, research_results: List[Dict[str, Any]], query: str) -> Dict[str, Any]:

        """Request analysis from analyzer agent."""

        analyzer = self._find_agent_by_role(AgentRole.ANALYZER)

        if not analyzer:

            return {}

        

        analysis_data = {

            "content": research_results,

            "type": "research_analysis",

            "query": query

        }

        

        analysis_message = Message(

            sender=self.agent_id,

            recipient=analyzer.agent_id,

            content=json.dumps(analysis_data),

            message_type="analysis_request"

        )

        

        responses = await analyzer.process_message(analysis_message)

        if responses and responses[0].message_type == "analysis_results":

            return json.loads(responses[0].content)

        return {}

    

    async def _request_synthesis(self, query: str, research_results: List[Dict[str, Any]], 

                                analysis_results: Dict[str, Any]) -> str:

        """Request synthesis from synthesizer agent."""

        synthesizer = self._find_agent_by_role(AgentRole.SYNTHESIZER)

        if not synthesizer:

            return "Unable to synthesize response - synthesizer agent not available."

        

        synthesis_data = {

            "query": query,

            "research_results": research_results,

            "analysis_results": analysis_results

        }

        

        synthesis_message = Message(

            sender=self.agent_id,

            recipient=synthesizer.agent_id,

            content=json.dumps(synthesis_data),

            message_type="synthesis_request"

        )

        

        responses = await synthesizer.process_message(synthesis_message)

        if responses and responses[0].message_type == "synthesis_results":

            return responses[0].content

        return "Unable to generate synthesis."

    

    def _find_agent_by_role(self, role: AgentRole) -> Optional[Agent]:

        """Find agent by role."""

        for agent in self.agents.values():

            if agent.role == role:

                return agent

        return None

    

    def can_handle(self, task_type: str) -> bool:

        return task_type in ["coordination", "orchestration", "workflow_management"]

    

    def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:

        """Get status of a specific workflow."""

        return self.active_workflows.get(workflow_id, {})
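
The sequential research-to-analysis-to-synthesis workflow that the coordinator runs can be seen in miniature with plain coroutines. The stub functions below are hypothetical stand-ins for the agent message exchanges, not the document's classes; only the control flow mirrors _coordinate_response:

```python
import asyncio

# Minimal sketch of the coordinator's three-step pipeline.
# The stub coroutines below are hypothetical stand-ins for the
# research/analysis/synthesis message exchanges between agents.

async def research(query: str) -> list:
    # Stand-in for the research_request -> research_results exchange
    return [{"content": f"fact about {query}", "relevance_score": 0.9}]

async def analyze(results: list) -> dict:
    # Stand-in for the analysis_request -> analysis_results exchange
    return {"analysis": f"{len(results)} findings reviewed", "confidence": 0.8}

async def synthesize(query: str, results: list, analysis: dict) -> str:
    # Stand-in for the synthesis_request -> synthesis_results exchange
    return f"Answer to '{query}' based on {len(results)} findings ({analysis['analysis']})"

async def coordinate(query: str) -> str:
    # Same sequential workflow as CoordinatorAgent._coordinate_response:
    # research first, then analysis of the findings, then synthesis.
    results = await research(query)
    analysis = await analyze(results)
    return await synthesize(query, results, analysis)

answer = asyncio.run(coordinate("What is RAG?"))
print(answer)
```

Because each step awaits the previous one, the pipeline is strictly sequential; a coordinator could instead fan out independent requests with asyncio.gather when steps do not depend on each other.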


COMPLETE RUNNING EXAMPLE FOR MULTI-AGENT SYSTEM:


import asyncio

import json

import time

import uuid

from abc import ABC, abstractmethod

from dataclasses import dataclass, field

from enum import Enum

from typing import Dict, List, Any, Optional

import torch

from transformers import AutoModelForCausalLM, AutoTokenizer

import warnings

warnings.filterwarnings("ignore")


class AgentRole(Enum):

    """Enumeration of agent roles in the multi-agent system."""

    COORDINATOR = "coordinator"

    RESEARCHER = "researcher"

    ANALYZER = "analyzer"

    SYNTHESIZER = "synthesizer"

    CRITIC = "critic"

    FORMATTER = "formatter"


@dataclass

class Message:

    """Message structure for inter-agent communication."""

    id: str = field(default_factory=lambda: str(uuid.uuid4()))

    sender: str = ""

    recipient: str = ""

    content: str = ""

    message_type: str = "text"

    metadata: Dict[str, Any] = field(default_factory=dict)

    timestamp: float = field(default_factory=time.time)
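
The default_factory calls in Message matter: each instance gets its own freshly generated id and timestamp, whereas a plain default value would be evaluated once and shared. A self-contained check, using a small dataclass (`Msg` is a stand-in mirroring the two fields above):

```python
import time
import uuid
from dataclasses import dataclass, field

# Mirrors the id/timestamp fields of Message to show that
# default_factory produces a fresh value per instance.

@dataclass
class Msg:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)

a, b = Msg(), Msg()
assert a.id != b.id              # each message gets a unique id
assert a.timestamp <= b.timestamp  # timestamps reflect creation order
```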


@dataclass

class AgentCapability:

    """Definition of agent capabilities."""

    name: str

    description: str

    input_types: List[str]

    output_types: List[str]

    parameters: Dict[str, Any] = field(default_factory=dict)


class Agent(ABC):

    """

    Abstract base class for all agents in the multi-agent system.

    

    This class defines the common interface and functionality that all

    agents must implement, including message processing and capability checking.

    """

    

    def __init__(self, agent_id: str, role: AgentRole, capabilities: List[AgentCapability]):

        """

        Initialize agent with basic properties.

        

        Args:

            agent_id (str): Unique identifier for the agent

            role (AgentRole): Role of the agent in the system

            capabilities (List[AgentCapability]): List of agent capabilities

        """

        self.agent_id = agent_id

        self.role = role

        self.capabilities = capabilities

        self.message_history: List[Message] = []

        self.state: Dict[str, Any] = {}

        self.is_active = True

        self.performance_metrics = {

            "messages_processed": 0,

            "average_response_time": 0.0,

            "success_rate": 1.0

        }

    

    @abstractmethod

    async def process_message(self, message: Message) -> List[Message]:

        """

        Process incoming message and return response messages.

        

        Args:

            message (Message): Incoming message to process

            

        Returns:

            List[Message]: List of response messages

        """

        pass

    

    @abstractmethod

    def can_handle(self, task_type: str) -> bool:

        """

        Check if agent can handle a specific task type.

        

        Args:

            task_type (str): Type of task to check

            

        Returns:

            bool: True if agent can handle the task

        """

        pass

    

    def add_message_to_history(self, message: Message):

        """Add message to agent's history and update metrics."""

        self.message_history.append(message)

        self.performance_metrics["messages_processed"] += 1

    

    def get_recent_context(self, n: int = 5) -> List[Message]:

        """Get recent messages for context."""

        return self.message_history[-n:]

    

    def get_performance_metrics(self) -> Dict[str, Any]:

        """Get agent performance metrics."""

        return self.performance_metrics.copy()


class ResearchAgent(Agent):

    """

    Research agent specialized in information retrieval and fact-finding.

    

    This agent searches knowledge bases, extracts relevant information,

    and provides research results to other agents in the system.

    """

    

    def __init__(self, agent_id: str, knowledge_base=None):

        """

        Initialize research agent.

        

        Args:

            agent_id (str): Unique agent identifier

            knowledge_base: Knowledge base for information retrieval

        """

        capabilities = [

            AgentCapability(

                name="document_search",

                description="Search knowledge base for relevant information",

                input_types=["query"],

                output_types=["documents", "facts"]

            ),

            AgentCapability(

                name="fact_extraction",

                description="Extract specific facts from documents",

                input_types=["documents", "questions"],

                output_types=["facts", "evidence"]

            ),

            AgentCapability(

                name="source_verification",

                description="Verify and rank information sources",

                input_types=["sources"],

                output_types=["verified_sources"]

            )

        ]

        super().__init__(agent_id, AgentRole.RESEARCHER, capabilities)

        self.knowledge_base = knowledge_base

        self.search_cache = {}

    

    async def process_message(self, message: Message) -> List[Message]:

        """Process research requests and return findings."""

        start_time = time.time()

        self.add_message_to_history(message)

        

        try:

            if message.message_type == "research_request":

                query = message.content

                results = await self._conduct_research(query)

                

                response = Message(

                    sender=self.agent_id,

                    recipient=message.sender,

                    content=json.dumps(results),

                    message_type="research_results",

                    metadata={

                        "query": query,

                        "result_count": len(results),

                        "search_time": time.time() - start_time

                    }

                )

                

                # Update performance metrics

                response_time = time.time() - start_time

                self._update_response_time(response_time)

                

                return [response]

            

            elif message.message_type == "fact_check_request":

                data = json.loads(message.content)

                verification = await self._verify_facts(data)

                

                response = Message(

                    sender=self.agent_id,

                    recipient=message.sender,

                    content=json.dumps(verification),

                    message_type="fact_check_results",

                    metadata={"verification_time": time.time() - start_time}

                )

                

                return [response]

        

        except Exception as e:

            error_response = Message(

                sender=self.agent_id,

                recipient=message.sender,

                content=json.dumps({"error": str(e), "status": "failed"}),

                message_type="error_response",

                metadata={"error_time": time.time() - start_time}

            )

            self.performance_metrics["success_rate"] *= 0.95  # Decrease success rate

            return [error_response]

        

        return []

    

    async def _conduct_research(self, query: str) -> List[Dict[str, Any]]:

        """Conduct research using available knowledge base."""

        # Check cache first

        if query in self.search_cache:

            return self.search_cache[query]

        

        # Simulate async research operation

        await asyncio.sleep(0.1)

        

        research_results = []

        

        if self.knowledge_base and hasattr(self.knowledge_base, 'search_relevant_context'):

            try:

                context_results = self.knowledge_base.search_relevant_context(query, k=5)

                

                for i, ctx in enumerate(context_results):

                    if isinstance(ctx, dict):

                        research_results.append({

                            "id": f"research_{i}",

                            "content": ctx.get("content", ""),

                            "source": ctx.get("source", "unknown"),

                            "relevance_score": ctx.get("score", 0.0),

                            "type": "document_chunk",

                            "metadata": {

                                "extraction_method": "semantic_search",

                                "confidence": min(ctx.get("score", 0.0) * 1.2, 1.0)

                            }

                        })

                    else:

                        research_results.append({

                            "id": f"research_{i}",

                            "content": str(ctx),

                            "source": "unknown",

                            "relevance_score": 0.5,

                            "type": "document_chunk",

                            "metadata": {

                                "extraction_method": "text_search",

                                "confidence": 0.5

                            }

                        })

            

            except Exception as e:

                research_results.append({

                    "id": "error_result",

                    "content": f"Research error: {str(e)}",

                    "source": "system",

                    "relevance_score": 0.0,

                    "type": "error",

                    "metadata": {"error": True}

                })

        

        else:

            # Fallback: simulated research results

            research_results = [

                {

                    "id": "simulated_result",

                    "content": f"Simulated research result for query: {query}",

                    "source": "simulation",

                    "relevance_score": 0.7,

                    "type": "simulated",

                    "metadata": {"simulated": True}

                }

            ]

        

        # Cache results

        self.search_cache[query] = research_results

        

        return research_results

    

    async def _verify_facts(self, data: Dict[str, Any]) -> Dict[str, Any]:

        """Verify facts against knowledge base."""

        await asyncio.sleep(0.05)

        

        facts = data.get("facts", [])

        verification_results = []

        

        for fact in facts:

            # Simple fact verification simulation

            verification_results.append({

                "fact": fact,

                "verified": True,  # Simplified verification

                "confidence": 0.8,

                "sources": ["knowledge_base"],

                "verification_method": "knowledge_base_lookup"

            })

        

        return {

            "verification_results": verification_results,

            "overall_confidence": 0.8,

            "verification_timestamp": time.time()

        }

    

    def _update_response_time(self, response_time: float):

        """Update average response time metric."""

        current_avg = self.performance_metrics["average_response_time"]

        message_count = self.performance_metrics["messages_processed"]

        

        if message_count == 1:

            self.performance_metrics["average_response_time"] = response_time

        else:

            self.performance_metrics["average_response_time"] = (

                (current_avg * (message_count - 1) + response_time) / message_count

            )

    

    def can_handle(self, task_type: str) -> bool:

        """Check if agent can handle specific task types."""

        return task_type in [

            "research", "search", "fact_finding", "information_retrieval",

            "fact_checking", "source_verification", "document_analysis"

        ]
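
The _update_response_time method above maintains a running mean without storing every sample. A quick standalone check that folding samples in one at a time matches averaging the whole list at once:

```python
# Incremental-average update, as used in _update_response_time:
# new_avg = (old_avg * (n - 1) + x) / n

samples = [0.12, 0.30, 0.21, 0.45]

avg, n = 0.0, 0
for x in samples:
    n += 1
    # First sample initializes the average; later samples fold in.
    avg = x if n == 1 else (avg * (n - 1) + x) / n

# The incremental result equals the batch mean.
assert abs(avg - sum(samples) / len(samples)) < 1e-9
```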


class AnalyzerAgent(Agent):

    """

    Analyzer agent specialized in content analysis and logical reasoning.

    

    This agent analyzes information, identifies patterns, performs logical

    reasoning, and provides insights based on available data.

    """

    

    def __init__(self, agent_id: str, llm_model=None, tokenizer=None, device=None):

        """

        Initialize analyzer agent.

        

        Args:

            agent_id (str): Unique agent identifier

            llm_model: Language model for analysis

            tokenizer: Tokenizer for the language model

            device: Device for model inference

        """

        capabilities = [

            AgentCapability(

                name="content_analysis",

                description="Analyze content for patterns, insights, and relationships",

                input_types=["documents", "facts", "data"],

                output_types=["analysis", "insights", "patterns"]

            ),

            AgentCapability(

                name="logical_reasoning",

                description="Perform logical reasoning and inference",

                input_types=["facts", "rules", "premises"],

                output_types=["conclusions", "inferences", "logical_chains"]

            ),

            AgentCapability(

                name="pattern_recognition",

                description="Identify patterns and trends in data",

                input_types=["data", "sequences"],

                output_types=["patterns", "trends", "anomalies"]

            )

        ]

        super().__init__(agent_id, AgentRole.ANALYZER, capabilities)

        self.llm_model = llm_model

        self.tokenizer = tokenizer

        self.device = device

        self.analysis_cache = {}

    

    async def process_message(self, message: Message) -> List[Message]:

        """Process analysis requests and return insights."""

        start_time = time.time()

        self.add_message_to_history(message)

        

        try:

            if message.message_type == "analysis_request":

                data = json.loads(message.content)

                analysis = await self._analyze_content(data)

                

                response = Message(

                    sender=self.agent_id,

                    recipient=message.sender,

                    content=json.dumps(analysis),

                    message_type="analysis_results",

                    metadata={

                        "analysis_type": data.get("type", "general"),

                        "analysis_time": time.time() - start_time

                    }

                )

                

                self._update_response_time(time.time() - start_time)

                return [response]

            

            elif message.message_type == "reasoning_request":

                data = json.loads(message.content)

                reasoning = await self._perform_reasoning(data)

                

                response = Message(

                    sender=self.agent_id,

                    recipient=message.sender,

                    content=json.dumps(reasoning),

                    message_type="reasoning_results",

                    metadata={"reasoning_time": time.time() - start_time}

                )

                

                return [response]

        

        except Exception as e:

            error_response = Message(

                sender=self.agent_id,

                recipient=message.sender,

                content=json.dumps({"error": str(e), "status": "failed"}),

                message_type="error_response",

                metadata={"error_time": time.time() - start_time}

            )

            self.performance_metrics["success_rate"] *= 0.95

            return [error_response]

        

        return []

    

    async def _analyze_content(self, data: Dict[str, Any]) -> Dict[str, Any]:

        """Analyze content and provide insights."""

        # Check cache

        cache_key = str(hash(json.dumps(data, sort_keys=True)))

        if cache_key in self.analysis_cache:

            return self.analysis_cache[cache_key]

        

        await asyncio.sleep(0.2)  # Simulate processing time

        

        content_items = data.get("content", [])

        analysis_type = data.get("type", "general")

        query = data.get("query", "")

        

        # Prepare content for analysis

        content_text = ""

        for item in content_items:

            if isinstance(item, dict):

                content_text += item.get("content", "") + "\n"

            else:

                content_text += str(item) + "\n"

        

        analysis_result = {

            "analysis_type": analysis_type,

            "content_summary": f"Analyzed {len(content_items)} content items",

            "key_themes": [],

            "patterns": [],

            "insights": [],

            "confidence": 0.8

        }

        

        if self.llm_model and self.tokenizer and content_text.strip():

            try:

                # Construct analysis prompt (content truncated to 1000 characters)

                prompt = f"""Analyze the following content and provide detailed insights:


Content to analyze:

{content_text[:1000]}


Analysis focus: {analysis_type}

Related query: {query}


Please provide:

1. Key themes and main topics

2. Important patterns or relationships

3. Logical insights and implications

4. Potential conclusions


Analysis:"""

                

                # Generate analysis using LLM

                inputs = self.tokenizer.encode(

                    prompt,

                    return_tensors='pt',

                    truncation=True,

                    max_length=800

                )

                inputs = inputs.to(self.device)

                

                with torch.no_grad():

                    outputs = self.llm_model.generate(

                        inputs,

                        max_length=inputs.shape[1] + 200,

                        num_beams=3,

                        temperature=0.7,

                        do_sample=True,

                        pad_token_id=self.tokenizer.eos_token_id,

                        early_stopping=True

                    )

                

                analysis_text = self.tokenizer.decode(

                    outputs[0][inputs.shape[1]:],

                    skip_special_tokens=True

                ).strip()

                

                # Parse analysis into structured format

                analysis_result.update({

                    "detailed_analysis": analysis_text,

                    "key_themes": self._extract_themes(analysis_text),

                    "patterns": self._extract_patterns(analysis_text),

                    "insights": self._extract_insights(analysis_text),

                    "confidence": 0.85

                })

            

            except Exception as e:

                analysis_result.update({

                    "detailed_analysis": f"Analysis generation failed: {str(e)}",

                    "confidence": 0.1,

                    "error": str(e)

                })

        

        else:

            # Fallback analysis

            analysis_result.update({

                "detailed_analysis": f"Basic analysis of {len(content_items)} items for {analysis_type}",

                "key_themes": ["Content analysis", "Information processing"],

                "patterns": ["Data structure patterns"],

                "insights": ["Multiple content sources analyzed"],

                "confidence": 0.6

            })

        

        # Cache result

        self.analysis_cache[cache_key] = analysis_result

        

        return analysis_result

    

    async def _perform_reasoning(self, data: Dict[str, Any]) -> Dict[str, Any]:

        """Perform logical reasoning on provided premises."""

        await asyncio.sleep(0.15)

        

        premises = data.get("premises", [])

        reasoning_type = data.get("type", "deductive")

        

        reasoning_result = {

            "reasoning_type": reasoning_type,

            "premises_count": len(premises),

            "conclusions": [],

            "logical_chain": [],

            "confidence": 0.7

        }

        

        # Simple logical reasoning simulation

        if premises:

            reasoning_result.update({

                "conclusions": [f"Conclusion based on {len(premises)} premises"],

                "logical_chain": [f"Step 1: Analyze premises", f"Step 2: Apply {reasoning_type} reasoning"],

                "confidence": 0.8

            })

        

        return reasoning_result

    

    def _extract_themes(self, text: str) -> List[str]:

        """Extract key themes from analysis text."""

        # Simple theme extraction

        themes = []

        if "technology" in text.lower():

            themes.append("Technology")

        if "business" in text.lower():

            themes.append("Business")

        if "innovation" in text.lower():

            themes.append("Innovation")

        return themes[:3]

    

    def _extract_patterns(self, text: str) -> List[str]:

        """Extract patterns from analysis text."""

        patterns = []

        if "relationship" in text.lower():

            patterns.append("Relationship patterns")

        if "trend" in text.lower():

            patterns.append("Trend patterns")

        return patterns[:3]

    

    def _extract_insights(self, text: str) -> List[str]:

        """Extract insights from analysis text."""

        insights = []

        sentences = text.split('.')

        for sentence in sentences[:3]:

            if len(sentence.strip()) > 20:

                insights.append(sentence.strip())

        return insights

    

    def _update_response_time(self, response_time: float):

        """Update average response time metric."""

        current_avg = self.performance_metrics["average_response_time"]

        message_count = self.performance_metrics["messages_processed"]

        

        if message_count <= 1:  # first sample; also guards against division by zero

            self.performance_metrics["average_response_time"] = response_time

        else:

            self.performance_metrics["average_response_time"] = (

                (current_avg * (message_count - 1) + response_time) / message_count

            )

    

    def can_handle(self, task_type: str) -> bool:

        """Check if agent can handle specific task types."""

        return task_type in [

            "analysis", "reasoning", "pattern_recognition", "inference",

            "logical_analysis", "content_analysis", "insight_generation"

        ]
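The `_update_response_time` helper above folds each new latency sample into a running mean instead of storing the full history. The same update rule can be checked in isolation; this standalone sketch uses illustrative names, not the agent's attributes:

```python
def update_running_mean(current_avg: float, count: int, new_sample: float) -> float:
    """Fold one new sample into a running mean of `count` samples (the new one included)."""
    if count <= 1:  # first sample; also guards against division by zero
        return new_sample
    return (current_avg * (count - 1) + new_sample) / count

# Feeding samples one at a time reproduces the ordinary mean:
avg = 0.0
samples = [0.2, 0.4, 0.6]
for i, s in enumerate(samples, start=1):
    avg = update_running_mean(avg, i, s)
print(round(avg, 10))  # 0.4
```

Because only the previous average and the sample count are kept, the metric costs constant memory no matter how many messages an agent has processed.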


class SynthesizerAgent(Agent):

    """

    Synthesizer agent specialized in information integration and response generation.

    

    This agent combines information from multiple sources, synthesizes insights,

    and generates coherent, comprehensive responses.

    """

    

    def __init__(self, agent_id: str, llm_model=None, tokenizer=None, device=None):

        """

        Initialize synthesizer agent.

        

        Args:

            agent_id (str): Unique agent identifier

            llm_model: Language model for synthesis

            tokenizer: Tokenizer for the language model

            device: Device for model inference

        """

        capabilities = [

            AgentCapability(

                name="content_synthesis",

                description="Synthesize multiple sources into coherent response",

                input_types=["research_results", "analysis", "facts"],

                output_types=["synthesis", "summary", "integrated_response"]

            ),

            AgentCapability(

                name="response_generation",

                description="Generate final responses based on synthesized information",

                input_types=["synthesis", "query"],

                output_types=["response", "formatted_answer"]

            ),

            AgentCapability(

                name="information_integration",

                description="Integrate information from multiple agents",

                input_types=["multiple_sources"],

                output_types=["integrated_information"]

            )

        ]

        super().__init__(agent_id, AgentRole.SYNTHESIZER, capabilities)

        self.llm_model = llm_model

        self.tokenizer = tokenizer

        self.device = device

        self.synthesis_cache = {}

    

    async def process_message(self, message: Message) -> List[Message]:

        """Process synthesis requests and return integrated responses."""

        start_time = time.time()

        self.add_message_to_history(message)

        

        try:

            if message.message_type == "synthesis_request":

                data = json.loads(message.content)

                synthesis = await self._synthesize_information(data)

                

                response = Message(

                    sender=self.agent_id,

                    recipient=message.sender,

                    content=synthesis,

                    message_type="synthesis_results",

                    metadata={

                        "sources_count": len(data.get("research_results", [])),

                        "synthesis_time": time.time() - start_time

                    }

                )

                

                self._update_response_time(time.time() - start_time)

                return [response]

            

            elif message.message_type == "integration_request":

                data = json.loads(message.content)

                integration = await self._integrate_sources(data)

                

                response = Message(

                    sender=self.agent_id,

                    recipient=message.sender,

                    content=json.dumps(integration),

                    message_type="integration_results",

                    metadata={"integration_time": time.time() - start_time}

                )

                

                return [response]

        

        except Exception as e:

            error_response = Message(

                sender=self.agent_id,

                recipient=message.sender,

                content=f"Synthesis error: {str(e)}",

                message_type="error_response",

                metadata={"error_time": time.time() - start_time}

            )

            self.performance_metrics["success_rate"] *= 0.95

            return [error_response]

        

        return []

    

    async def _synthesize_information(self, data: Dict[str, Any]) -> str:

        """Synthesize information from multiple sources."""

        # Check cache

        cache_key = str(hash(json.dumps(data, sort_keys=True)))

        if cache_key in self.synthesis_cache:

            return self.synthesis_cache[cache_key]

        

        await asyncio.sleep(0.15)  # Simulate processing time

        

        query = data.get("query", "")

        research_results = data.get("research_results", [])

        analysis_results = data.get("analysis_results", {})

        

        # Prepare synthesis content

        research_text = ""

        if research_results:

            research_text = "Research Findings:\n"

            for i, result in enumerate(research_results[:5]):

                if isinstance(result, dict):

                    content = result.get("content", "")

                    source = result.get("source", "unknown")

                    research_text += f"{i+1}. {content} (Source: {source})\n"

                else:

                    research_text += f"{i+1}. {str(result)}\n"

        

        analysis_text = ""

        if analysis_results and isinstance(analysis_results, dict):

            analysis_content = analysis_results.get("detailed_analysis", "")

            if analysis_content:

                analysis_text = f"Analysis Insights:\n{analysis_content}\n"

        

        synthesis_result = ""

        

        if self.llm_model and self.tokenizer:

            try:

                # Construct synthesis prompt

                prompt = f"""Based on the following research and analysis, provide a comprehensive and well-structured answer to the user's question.


User Question: {query}


{research_text}


{analysis_text}


Please synthesize this information into a clear, informative response that:

1. Directly addresses the user's question

2. Integrates key findings from research and analysis

3. Provides a logical flow of information

4. Includes relevant details and context


Synthesized Response:"""

                

                # Generate synthesis using LLM

                inputs = self.tokenizer.encode(

                    prompt,

                    return_tensors='pt',

                    truncation=True,

                    max_length=900

                )

                inputs = inputs.to(self.device)

                

                with torch.no_grad():

                    outputs = self.llm_model.generate(

                        inputs,

                        max_new_tokens=250,

                        num_beams=3,

                        temperature=0.7,

                        do_sample=True,

                        pad_token_id=self.tokenizer.eos_token_id,

                        early_stopping=True

                    )

                

                synthesis_result = self.tokenizer.decode(

                    outputs[0][inputs.shape[1]:],

                    skip_special_tokens=True

                ).strip()

            

            except Exception as e:

                synthesis_result = f"I apologize, but I encountered an error synthesizing the information: {str(e)}"

        

        else:

            # Fallback synthesis

            synthesis_result = f"""Based on the available research and analysis for your question "{query}":


{research_text}


{analysis_text}


This information provides insights into your query, though a more detailed synthesis would require additional processing capabilities."""

        

        # Cache result

        self.synthesis_cache[cache_key] = synthesis_result

        

        return synthesis_result

    

    async def _integrate_sources(self, data: Dict[str, Any]) -> Dict[str, Any]:

        """Integrate information from multiple sources."""

        await asyncio.sleep(0.1)

        

        sources = data.get("sources", [])

        integration_type = data.get("type", "comprehensive")

        

        integration_result = {

            "integration_type": integration_type,

            "sources_processed": len(sources),

            "integrated_content": "",

            "source_mapping": {},

            "confidence": 0.8

        }

        

        # Simple integration logic

        integrated_content = []

        for i, source in enumerate(sources):

            if isinstance(source, dict):

                content = source.get("content", "")

                source_id = source.get("id", f"source_{i}")

                integrated_content.append(f"From {source_id}: {content}")

                integration_result["source_mapping"][source_id] = i

            else:

                integrated_content.append(f"Source {i}: {str(source)}")

        

        integration_result["integrated_content"] = "\n".join(integrated_content)

        

        return integration_result

    

    def _update_response_time(self, response_time: float):

        """Update average response time metric."""

        current_avg = self.performance_metrics["average_response_time"]

        message_count = self.performance_metrics["messages_processed"]

        

        if message_count <= 1:  # first sample; also guards against division by zero

            self.performance_metrics["average_response_time"] = response_time

        else:

            self.performance_metrics["average_response_time"] = (

                (current_avg * (message_count - 1) + response_time) / message_count

            )

    

    def can_handle(self, task_type: str) -> bool:

        """Check if agent can handle specific task types."""

        return task_type in [

            "synthesis", "summarization", "response_generation", "integration",

            "information_fusion", "content_combination", "final_response"

        ]
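One caveat on the caching above: `_synthesize_information` builds its key with Python's built-in `hash()`, which is salted per process, so keys are only stable within a single run. That is fine for this in-memory cache, but a persisted cache would need a content hash instead. A minimal sketch of a stable alternative using `hashlib` (an assumption for illustration, not the code above):

```python
import hashlib
import json

def stable_cache_key(data: dict) -> str:
    """Deterministic key for a JSON-serializable dict, stable across processes."""
    canonical = json.dumps(data, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

key_a = stable_cache_key({"query": "What is AI?", "sources": [1, 2]})
key_b = stable_cache_key({"sources": [1, 2], "query": "What is AI?"})
print(key_a == key_b)  # True: key order does not matter
```

`sort_keys=True` canonicalizes the dictionary before hashing, so logically identical requests always map to the same cache entry.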


class CoordinatorAgent(Agent):

    """

    Coordinator agent that orchestrates the multi-agent workflow.

    

    This agent manages task distribution, coordinates agent interactions,

    and ensures proper workflow execution across the system.

    """

    

    def __init__(self, agent_id: str, agents: Dict[str, Agent]):

        """

        Initialize coordinator agent.

        

        Args:

            agent_id (str): Unique agent identifier

            agents (Dict[str, Agent]): Dictionary of available agents

        """

        capabilities = [

            AgentCapability(

                name="task_orchestration",

                description="Coordinate tasks between multiple agents",

                input_types=["user_query"],

                output_types=["final_response"]

            ),

            AgentCapability(

                name="agent_management",

                description="Manage agent interactions and workflow",

                input_types=["agent_responses"],

                output_types=["coordination_decisions"]

            ),

            AgentCapability(

                name="workflow_optimization",

                description="Optimize workflow based on task requirements",

                input_types=["task_analysis"],

                output_types=["optimized_workflow"]

            )

        ]

        super().__init__(agent_id, AgentRole.COORDINATOR, capabilities)

        self.agents = agents

        self.active_workflows: Dict[str, Dict[str, Any]] = {}

        self.workflow_templates = self._initialize_workflow_templates()

    

    def _initialize_workflow_templates(self) -> Dict[str, List[str]]:

        """Initialize workflow templates for different query types."""

        return {

            "research_query": ["research", "analysis", "synthesis"],

            "factual_query": ["research", "fact_check", "synthesis"],

            "analytical_query": ["research", "analysis", "reasoning", "synthesis"],

            "general_query": ["research", "synthesis"],

            "complex_query": ["research", "analysis", "reasoning", "synthesis", "review"]

        }

    

    async def process_message(self, message: Message) -> List[Message]:

        """Process user queries and coordinate multi-agent responses."""

        start_time = time.time()

        self.add_message_to_history(message)

        

        try:

            if message.message_type == "user_query":

                response = await self._coordinate_response(message.content, message.id)

                

                return [Message(

                    sender=self.agent_id,

                    recipient=message.sender,

                    content=response,

                    message_type="final_response",

                    metadata={

                        "workflow_id": message.id,

                        "total_time": time.time() - start_time,

                        "agents_involved": self._get_agents_involved(message.id)

                    }

                )]

            

            elif message.message_type == "workflow_status_request":

                workflow_id = message.content

                status = self.get_workflow_status(workflow_id)

                

                return [Message(

                    sender=self.agent_id,

                    recipient=message.sender,

                    content=json.dumps(status),

                    message_type="workflow_status_response"

                )]

        

        except Exception as e:

            error_response = Message(

                sender=self.agent_id,

                recipient=message.sender,

                content=f"Coordination error: {str(e)}",

                message_type="error_response",

                metadata={"error_time": time.time() - start_time}

            )

            self.performance_metrics["success_rate"] *= 0.9

            return [error_response]

        

        return []

    

    async def _coordinate_response(self, query: str, workflow_id: str) -> str:

        """Coordinate multi-agent response to user query."""

        try:

            # Initialize workflow

            self.active_workflows[workflow_id] = {

                "query": query,

                "status": "active",

                "results": {},

                "start_time": time.time(),

                "steps_completed": [],

                "current_step": "initialization"

            }

            

            # Determine workflow type

            workflow_type = self._determine_workflow_type(query)

            workflow_steps = self.workflow_templates.get(workflow_type, ["research", "synthesis"])

            

            self.active_workflows[workflow_id]["workflow_type"] = workflow_type

            self.active_workflows[workflow_id]["planned_steps"] = workflow_steps

            

            # Execute workflow steps

            research_results = []

            analysis_results = {}

            

            if "research" in workflow_steps:

                self.active_workflows[workflow_id]["current_step"] = "research"

                research_results = await self._request_research(query)

                self.active_workflows[workflow_id]["results"]["research"] = research_results

                self.active_workflows[workflow_id]["steps_completed"].append("research")

            

            if "analysis" in workflow_steps and research_results:

                self.active_workflows[workflow_id]["current_step"] = "analysis"

                analysis_results = await self._request_analysis(research_results, query)

                self.active_workflows[workflow_id]["results"]["analysis"] = analysis_results

                self.active_workflows[workflow_id]["steps_completed"].append("analysis")

            

            if "reasoning" in workflow_steps:

                self.active_workflows[workflow_id]["current_step"] = "reasoning"

                # Additional reasoning step could be implemented here

                self.active_workflows[workflow_id]["steps_completed"].append("reasoning")

            

            if "synthesis" in workflow_steps:

                self.active_workflows[workflow_id]["current_step"] = "synthesis"

                final_response = await self._request_synthesis(query, research_results, analysis_results)

                self.active_workflows[workflow_id]["results"]["synthesis"] = final_response

                self.active_workflows[workflow_id]["steps_completed"].append("synthesis")

            else:

                final_response = "Unable to generate synthesis - synthesizer not available."

            

            # Mark workflow complete

            self.active_workflows[workflow_id]["status"] = "complete"

            self.active_workflows[workflow_id]["end_time"] = time.time()

            self.active_workflows[workflow_id]["current_step"] = "completed"

            

            return final_response

        

        except Exception as e:

            if workflow_id in self.active_workflows:

                self.active_workflows[workflow_id]["status"] = "error"

                self.active_workflows[workflow_id]["error"] = str(e)

            return f"I apologize, but I encountered an error coordinating the response: {str(e)}"

    

    def _determine_workflow_type(self, query: str) -> str:

        """Determine appropriate workflow type based on query characteristics."""

        query_lower = query.lower()

        

        # Simple heuristics for workflow type determination

        if any(word in query_lower for word in ["analyze", "analysis", "why", "how", "explain"]):

            return "analytical_query"

        elif any(word in query_lower for word in ["fact", "when", "where", "who", "what"]):

            return "factual_query"

        elif len(query.split()) > 15 or "?" in query:

            return "complex_query"

        elif any(word in query_lower for word in ["research", "find", "search", "information"]):

            return "research_query"

        else:

            return "general_query"

    

    async def _request_research(self, query: str) -> List[Dict[str, Any]]:

        """Request research from research agent."""

        researcher = self._find_agent_by_role(AgentRole.RESEARCHER)

        if not researcher:

            return []

        

        research_message = Message(

            sender=self.agent_id,

            recipient=researcher.agent_id,

            content=query,

            message_type="research_request"

        )

        

        responses = await researcher.process_message(research_message)

        if responses and responses[0].message_type != "error_response":

            try:

                return json.loads(responses[0].content)

            except json.JSONDecodeError:

                return []

        return []

    

    async def _request_analysis(self, research_results: List[Dict[str, Any]], query: str) -> Dict[str, Any]:

        """Request analysis from analyzer agent."""

        analyzer = self._find_agent_by_role(AgentRole.ANALYZER)

        if not analyzer:

            return {}

        

        analysis_data = {

            "content": research_results,

            "type": "research_analysis",

            "query": query

        }

        

        analysis_message = Message(

            sender=self.agent_id,

            recipient=analyzer.agent_id,

            content=json.dumps(analysis_data),

            message_type="analysis_request"

        )

        

        responses = await analyzer.process_message(analysis_message)

        if responses and responses[0].message_type != "error_response":

            try:

                return json.loads(responses[0].content)

            except json.JSONDecodeError:

                return {}

        return {}

    

    async def _request_synthesis(self, query: str, research_results: List[Dict[str, Any]], 

                                analysis_results: Dict[str, Any]) -> str:

        """Request synthesis from synthesizer agent."""

        synthesizer = self._find_agent_by_role(AgentRole.SYNTHESIZER)

        if not synthesizer:

            return "Unable to synthesize response - synthesizer agent not available."

        

        synthesis_data = {

            "query": query,

            "research_results": research_results,

            "analysis_results": analysis_results

        }

        

        synthesis_message = Message(

            sender=self.agent_id,

            recipient=synthesizer.agent_id,

            content=json.dumps(synthesis_data),

            message_type="synthesis_request"

        )

        

        responses = await synthesizer.process_message(synthesis_message)

        if responses and responses[0].message_type != "error_response":

            return responses[0].content

        return "Unable to generate synthesis."

    

    def _find_agent_by_role(self, role: AgentRole) -> Optional[Agent]:

        """Find agent by role."""

        for agent in self.agents.values():

            if agent.role == role:

                return agent

        return None

    

    def _get_agents_involved(self, workflow_id: str) -> List[str]:

        """Get list of agents involved in a workflow."""

        if workflow_id not in self.active_workflows:

            return []

        

        workflow = self.active_workflows[workflow_id]

        steps_completed = workflow.get("steps_completed", [])

        

        agents_involved = []

        if "research" in steps_completed:

            agents_involved.append("researcher")

        if "analysis" in steps_completed:

            agents_involved.append("analyzer")

        if "synthesis" in steps_completed:

            agents_involved.append("synthesizer")

        

        return agents_involved

    

    def can_handle(self, task_type: str) -> bool:

        """Check if coordinator can handle specific task types."""

        return task_type in [

            "coordination", "orchestration", "workflow_management",

            "task_distribution", "agent_management"

        ]

    

    def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:

        """Get status of a specific workflow."""

        if workflow_id not in self.active_workflows:

            return {"status": "not_found"}

        

        workflow = self.active_workflows[workflow_id]

        return {

            "workflow_id": workflow_id,

            "status": workflow.get("status", "unknown"),

            "current_step": workflow.get("current_step", "unknown"),

            "steps_completed": workflow.get("steps_completed", []),

            "planned_steps": workflow.get("planned_steps", []),

            "start_time": workflow.get("start_time"),

            "end_time": workflow.get("end_time"),

            "results_available": list(workflow.get("results", {}).keys())

        }

    

    def get_system_status(self) -> Dict[str, Any]:

        """Get overall system status."""

        agent_status = {}

        for agent_id, agent in self.agents.items():

            agent_status[agent_id] = {

                "role": agent.role.value,

                "is_active": agent.is_active,

                "performance": agent.get_performance_metrics()

            }

        

        return {

            "total_agents": len(self.agents),

            "active_workflows": len([w for w in self.active_workflows.values() if w.get("status") == "active"]),

            "completed_workflows": len([w for w in self.active_workflows.values() if w.get("status") == "complete"]),

            "agent_status": agent_status,

            "coordinator_performance": self.get_performance_metrics()

        }
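The coordinator's `_determine_workflow_type` is a plain keyword router, and its behavior is easiest to see in isolation. This standalone copy mirrors the method's heuristics for illustration; note that any query containing "?" that misses the first two keyword lists falls through to `complex_query`:

```python
def determine_workflow_type(query: str) -> str:
    """Route a query to a workflow template using the same keyword heuristics as the coordinator."""
    q = query.lower()
    if any(w in q for w in ["analyze", "analysis", "why", "how", "explain"]):
        return "analytical_query"
    if any(w in q for w in ["fact", "when", "where", "who", "what"]):
        return "factual_query"
    if len(query.split()) > 15 or "?" in query:
        return "complex_query"
    if any(w in q for w in ["research", "find", "search", "information"]):
        return "research_query"
    return "general_query"

print(determine_workflow_type("Why is the sky blue?"))       # analytical_query
print(determine_workflow_type("When was Python released?"))  # factual_query
print(determine_workflow_type("Find papers on RAG"))         # research_query
```

Order matters here: the analytical check runs first, so "Why...?" never reaches the question-mark rule, while a bare "Is this true?" does and gets the fuller complex workflow.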


class MultiAgentSystem:

    """

    Complete multi-agent system implementation.

    

    This class orchestrates multiple specialized agents to provide

    comprehensive responses through collaborative problem-solving.

    """

    

    def __init__(self, llm_model=None, tokenizer=None, device=None, knowledge_base=None):

        """

        Initialize multi-agent system.

        

        Args:

            llm_model: Language model for agents

            tokenizer: Tokenizer for the language model

            device: Device for model inference

            knowledge_base: Knowledge base for research

        """

        print("Initializing Multi-Agent System...")

        

        self.llm_model = llm_model

        self.tokenizer = tokenizer

        self.device = device

        self.knowledge_base = knowledge_base

        

        # Initialize agents

        self.agents = {}

        self._initialize_agents()

        

        # Initialize coordinator

        self.coordinator = CoordinatorAgent("coordinator_001", self.agents)

        self.agents["coordinator"] = self.coordinator

        

        print(f"Multi-Agent System initialized with {len(self.agents)} agents")

    

    def _initialize_agents(self):

        """Initialize all specialized agents."""

        # Research Agent

        self.agents["researcher"] = ResearchAgent(

            "researcher_001",

            self.knowledge_base

        )

        

        # Analyzer Agent

        self.agents["analyzer"] = AnalyzerAgent(

            "analyzer_001",

            self.llm_model,

            self.tokenizer,

            self.device

        )

        

        # Synthesizer Agent

        self.agents["synthesizer"] = SynthesizerAgent(

            "synthesizer_001",

            self.llm_model,

            self.tokenizer,

            self.device

        )

    

    async def process_query(self, query: str) -> Dict[str, Any]:

        """

        Process user query through multi-agent system.

        

        Args:

            query (str): User query

            

        Returns:

            Dict[str, Any]: Response with metadata

        """

        start_time = time.time()

        

        # Create user message

        user_message = Message(

            sender="user",

            recipient="coordinator",

            content=query,

            message_type="user_query"

        )

        

        # Process through coordinator

        responses = await self.coordinator.process_message(user_message)

        

        if responses:

            response = responses[0]

            workflow_status = self.coordinator.get_workflow_status(user_message.id)

            

            return {

                "response": response.content,

                "workflow_id": user_message.id,

                "total_time": time.time() - start_time,

                "workflow_status": workflow_status,

                "agents_involved": workflow_status.get("results_available", []),

                "success": response.message_type != "error_response"

            }

        

        return {

            "response": "No response generated",

            "workflow_id": user_message.id,

            "total_time": time.time() - start_time,

            "workflow_status": {},

            "agents_involved": [],

            "success": False

        }

    

    def get_system_status(self) -> Dict[str, Any]:

        """Get comprehensive system status."""

        return self.coordinator.get_system_status()

    

    def get_agent_performance(self) -> Dict[str, Dict[str, Any]]:

        """Get performance metrics for all agents."""

        performance = {}

        for agent_id, agent in self.agents.items():

            performance[agent_id] = {

                "role": agent.role.value,

                "metrics": agent.get_performance_metrics(),

                "capabilities": [cap.name for cap in agent.capabilities]

            }

        return performance

    

    def run_interactive_session(self):

        """Run interactive multi-agent session."""

        print("\n" + "=" * 70)

        print("MULTI-AGENT SYSTEM - Interactive Session")

        print("=" * 70)

        print("Commands:")

        print("  'quit' or 'exit' - End session")

        print("  'status' - Show system status")

        print("  'performance' - Show agent performance")

        print("  'agents' - List all agents")

        print("=" * 70)

        

        async def run_session():

            while True:

                try:

                    user_input = input(f"\nYou: ").strip()

                    

                    if user_input.lower() in ['quit', 'exit', 'bye']:

                        print("\nMulti-Agent System: Thank you for using the system! Goodbye!")

                        break

                    

                    elif user_input.lower() == 'status':

                        status = self.get_system_status()

                        print(f"\nSystem Status:")

                        print(f"  Total agents: {status['total_agents']}")

                        print(f"  Active workflows: {status['active_workflows']}")

                        print(f"  Completed workflows: {status['completed_workflows']}")

                        continue

                    

                    elif user_input.lower() == 'performance':

                        performance = self.get_agent_performance()

                        print(f"\nAgent Performance:")

                        for agent_id, perf in performance.items():

                            metrics = perf['metrics']

                            print(f"  {agent_id} ({perf['role']}):")

                            print(f"    Messages processed: {metrics['messages_processed']}")

                            print(f"    Avg response time: {metrics['average_response_time']:.3f}s")

                            print(f"    Success rate: {metrics['success_rate']:.2%}")

                        continue

                    

                    elif user_input.lower() == 'agents':

                        print(f"\nAvailable Agents:")

                        for agent_id, agent in self.agents.items():

                            print(f"  {agent_id}: {agent.role.value}")

                            for cap in agent.capabilities:

                                print(f"    - {cap.name}: {cap.description}")

                        continue

                    

                    if not user_input:

                        continue

                    

                    # Process query

                    print("Multi-Agent System: Processing your request...", flush=True)

                    

                    result = await self.process_query(user_input)

                    

                    print(f"\nResponse: {result['response']}")

                    

                    # Show metadata

                    if result['success']:

                        print(f"\nProcessing Details:")

                        print(f"  Workflow ID: {result['workflow_id']}")

                        print(f"  Total time: {result['total_time']:.2f}s")

                        print(f"  Agents involved: {', '.join(result['agents_involved'])}")

                    

                except KeyboardInterrupt:

                    print("\n\nSession interrupted. Goodbye!")

                    break

                except Exception as e:

                    print(f"\nError: {e}")

        

        # Run async session

        asyncio.run(run_session())


def create_sample_knowledge_base_for_mas():

    """Create sample knowledge base for multi-agent system demonstration."""

    sample_documents = [

        """

        Artificial Intelligence has revolutionized various industries through machine learning, 

        natural language processing, and computer vision. Companies like Google, Microsoft, 

        and OpenAI are leading the development of AI technologies. Machine learning algorithms 

        can analyze vast amounts of data to identify patterns and make predictions. Deep learning, 

        a subset of machine learning, uses neural networks with multiple layers to process 

        complex data structures.

        """,

        """

        Climate change represents one of the most significant challenges of our time. Rising 

        global temperatures are causing ice caps to melt, sea levels to rise, and weather 

        patterns to become more extreme. Renewable energy sources like solar, wind, and 

        hydroelectric power offer sustainable alternatives to fossil fuels. Many countries 

        are implementing carbon reduction policies and investing in green technologies to 

        combat climate change.

        """,

        """

        Space exploration has entered a new era with private companies like SpaceX, Blue Origin, 

        and Virgin Galactic joining traditional space agencies. The International Space Station 

        serves as a platform for scientific research and international cooperation. Mars 

        exploration missions are planned by NASA, ESA, and other space agencies. The James 

        Webb Space Telescope is providing unprecedented views of distant galaxies and 

        exoplanets.

        """,

        """

        Quantum computing represents a paradigm shift in computational power. Unlike classical 

        computers that use bits, quantum computers use quantum bits (qubits) that can exist 

        in multiple states simultaneously. Companies like IBM, Google, and Rigetti are 

        developing quantum processors. Quantum computers could revolutionize cryptography, 

        drug discovery, and optimization problems that are intractable for classical computers.

        """,

        """

        Biotechnology and genetic engineering are transforming medicine and agriculture. 

        CRISPR-Cas9 gene editing technology allows precise modifications to DNA sequences. 

        Personalized medicine uses genetic information to tailor treatments to individual 

        patients. Synthetic biology combines engineering principles with biological systems 

        to create new organisms and biological functions. These advances raise important 

        ethical considerations about genetic modification and privacy.

        """

    ]

    

    sources = [

        "AI_Technology_Overview",

        "Climate_Change_Report",

        "Space_Exploration_Update",

        "Quantum_Computing_Guide",

        "Biotechnology_Advances"

    ]

    

    return sample_documents, sources


def main():

    """

    Main function demonstrating multi-agent system functionality.

    """

    try:

        # Initialize components

        print("Setting up Multi-Agent System...")

        

        # Initialize device

        if torch.cuda.is_available():

            device = torch.device("cuda")

        elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():

            device = torch.device("mps")

        else:

            device = torch.device("cpu")

        

        print(f"Using device: {device}")

        

        # Initialize LLM

        print("Loading language model...")

        tokenizer = AutoTokenizer.from_pretrained("microsoft/DialoGPT-medium")

        model = AutoModelForCausalLM.from_pretrained("microsoft/DialoGPT-medium")

        

        if tokenizer.pad_token is None:

            tokenizer.pad_token = tokenizer.eos_token

        

        model.to(device)

        model.eval()

        

        # Initialize knowledge base

        print("Creating knowledge base...")

        from chapter_b_rag import KnowledgeBase  # Reuse from Chapter B

        knowledge_base = KnowledgeBase()

        

        documents, sources = create_sample_knowledge_base_for_mas()

        knowledge_base.add_documents(documents, sources)

        

        # Initialize multi-agent system

        mas = MultiAgentSystem(

            llm_model=model,

            tokenizer=tokenizer,

            device=device,

            knowledge_base=knowledge_base

        )

        

        # Show system status

        status = mas.get_system_status()

        print(f"\nSystem ready with {status['total_agents']} agents:")

        for agent_id, agent_info in status['agent_status'].items():

            print(f"  {agent_id}: {agent_info['role']}")

        

        # Run interactive session

        mas.run_interactive_session()

        

    except Exception as e:

        print(f"Failed to initialize Multi-Agent System: {e}")

        print("Please check your installation and try again.")


if __name__ == "__main__":

    main()


This multi-agent system implementation demonstrates the power of collaborative AI: specialized agents each contribute their particular strengths, while a central orchestrator keeps their work coordinated, yielding problem-solving capabilities that no single agent could match.


CHAPTER E: IMPLEMENTING AGENTIC AI


Agentic AI represents the pinnacle of autonomous artificial intelligence systems, characterized by agents that can independently plan, execute, and adapt their actions to achieve complex goals. Unlike traditional reactive systems that respond to specific inputs, agentic AI systems demonstrate proactive behavior, goal-oriented reasoning, and the ability to learn and improve from experience.


The fundamental distinction of agentic AI lies in its autonomous decision-making capabilities. These systems can formulate plans, execute actions, monitor progress, and adapt strategies based on changing circumstances. Agentic AI combines elements of planning algorithms, reinforcement learning, and multi-step reasoning to create intelligent agents capable of operating independently in complex environments.


The rationale for implementing agentic AI stems from the need for systems that can handle open-ended, long-term tasks without constant human supervision. Traditional chatbots and even multi-agent systems typically require explicit instructions for each interaction. Agentic AI systems can pursue high-level objectives through autonomous planning and execution, making them suitable for complex workflows, research tasks, and creative problem-solving.


Agentic AI architecture incorporates several sophisticated components working in harmony. Goal management systems define and prioritize objectives. Planning engines decompose complex goals into actionable steps. Execution engines carry out planned actions while monitoring progress. Learning mechanisms enable agents to improve performance over time. Memory systems maintain context and experience across interactions.
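The interplay of these components can be sketched as a minimal control loop. Everything below (SimpleGoal, agentic_loop, and the stub plan/execute functions) is an illustrative simplification, not part of the implementation developed in this chapter:

```python
from dataclasses import dataclass
from typing import List

@dataclass
class SimpleGoal:
    """A stripped-down goal: just a description and a completion flag."""
    description: str
    done: bool = False

def plan(goal: SimpleGoal) -> List[str]:
    # Planning engine stub: decompose the goal into ordered steps.
    return [f"research {goal.description}", f"summarize {goal.description}"]

def execute(step: str) -> bool:
    # Execution engine stub: a real system would dispatch to an agent or tool.
    return True

def agentic_loop(goal: SimpleGoal, max_attempts: int = 3) -> SimpleGoal:
    for _ in range(max_attempts):                 # adaptation: retry with a fresh plan
        steps = plan(goal)                        # planning engine
        if all(execute(step) for step in steps):  # execution + monitoring
            goal.done = True                      # goal management: mark complete
            return goal
    return goal                                   # gave up: goal stays incomplete

result = agentic_loop(SimpleGoal("quantum computing trends"))
print(result.done)  # True
```

The real system replaces each stub with a dedicated component, but the plan-execute-monitor-adapt cycle stays the same.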


The implementation of agentic AI requires advanced frameworks that support autonomous reasoning, planning, and execution. We will build upon our previous multi-agent foundation while adding goal-oriented behavior, planning capabilities, and autonomous execution mechanisms.


Setting up agentic AI extends our existing infrastructure with planning and goal management capabilities:


import asyncio

from abc import ABC, abstractmethod

from dataclasses import dataclass, field

from enum import Enum

from typing import Dict, List, Any, Optional, Callable, Union

import json

import time

import uuid

from collections import deque

import heapq


class GoalStatus(Enum):

    PENDING = "pending"

    IN_PROGRESS = "in_progress"

    COMPLETED = "completed"

    FAILED = "failed"

    PAUSED = "paused"

    CANCELLED = "cancelled"


class ActionType(Enum):

    RESEARCH = "research"

    ANALYZE = "analyze"

    SYNTHESIZE = "synthesize"

    COMMUNICATE = "communicate"

    PLAN = "plan"

    EXECUTE = "execute"

    MONITOR = "monitor"

    LEARN = "learn"


@dataclass

class Goal:

    id: str = field(default_factory=lambda: str(uuid.uuid4()))

    description: str = ""

    priority: int = 1  # 1 = highest priority

    status: GoalStatus = GoalStatus.PENDING

    parent_goal_id: Optional[str] = None

    sub_goals: List[str] = field(default_factory=list)

    required_capabilities: List[str] = field(default_factory=list)

    success_criteria: Dict[str, Any] = field(default_factory=dict)

    deadline: Optional[float] = None

    created_at: float = field(default_factory=time.time)

    updated_at: float = field(default_factory=time.time)

    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass

class Action:

    id: str = field(default_factory=lambda: str(uuid.uuid4()))

    type: ActionType = ActionType.EXECUTE

    description: str = ""

    parameters: Dict[str, Any] = field(default_factory=dict)

    prerequisites: List[str] = field(default_factory=list)

    expected_outcome: str = ""

    estimated_duration: float = 0.0

    actual_duration: Optional[float] = None

    status: str = "pending"

    result: Optional[Dict[str, Any]] = None

    created_at: float = field(default_factory=time.time)


@dataclass

class Plan:

    id: str = field(default_factory=lambda: str(uuid.uuid4()))

    goal_id: str = ""

    actions: List[Action] = field(default_factory=list)

    execution_order: List[str] = field(default_factory=list)

    status: str = "draft"

    confidence: float = 0.0

    estimated_total_time: float = 0.0

    created_at: float = field(default_factory=time.time)

    metadata: Dict[str, Any] = field(default_factory=dict)


The goal management system provides the foundation for autonomous behavior by defining objectives, tracking progress, and managing goal hierarchies:


class GoalManager:

    def __init__(self):

        self.goals: Dict[str, Goal] = {}

        self.goal_hierarchy: Dict[str, List[str]] = {}

        self.active_goals: List[str] = []

        self.completed_goals: List[str] = []

        self.goal_priorities = []  # Min-heap for priority queue

    

    def create_goal(self, description: str, priority: int = 1, 

                   parent_goal_id: Optional[str] = None,

                   required_capabilities: Optional[List[str]] = None,

                   success_criteria: Optional[Dict[str, Any]] = None,

                   deadline: Optional[float] = None) -> Goal:

        """Create a new goal with specified parameters."""

        goal = Goal(

            description=description,

            priority=priority,

            parent_goal_id=parent_goal_id,

            required_capabilities=required_capabilities or [],

            success_criteria=success_criteria or {},

            deadline=deadline

        )

        

        self.goals[goal.id] = goal

        

        # Update goal hierarchy

        if parent_goal_id:

            if parent_goal_id not in self.goal_hierarchy:

                self.goal_hierarchy[parent_goal_id] = []

            self.goal_hierarchy[parent_goal_id].append(goal.id)

            

            # Add to parent's sub_goals

            if parent_goal_id in self.goals:

                self.goals[parent_goal_id].sub_goals.append(goal.id)

        

        # Add to priority queue

        heapq.heappush(self.goal_priorities, (priority, time.time(), goal.id))

        

        return goal

    

    def update_goal_status(self, goal_id: str, status: GoalStatus, 

                          metadata: Optional[Dict[str, Any]] = None):

        """Update goal status and metadata."""

        if goal_id in self.goals:

            goal = self.goals[goal_id]

            goal.status = status

            goal.updated_at = time.time()

            

            if metadata:

                goal.metadata.update(metadata)

            

            # Update tracking lists

            if status == GoalStatus.IN_PROGRESS and goal_id not in self.active_goals:

                self.active_goals.append(goal_id)

            elif status == GoalStatus.COMPLETED:

                if goal_id in self.active_goals:

                    self.active_goals.remove(goal_id)

                if goal_id not in self.completed_goals:

                    self.completed_goals.append(goal_id)

    

    def get_next_goal(self) -> Optional[Goal]:

        """Get the next highest priority goal to work on."""

        while self.goal_priorities:

            priority, timestamp, goal_id = heapq.heappop(self.goal_priorities)

            

            if goal_id in self.goals:

                goal = self.goals[goal_id]

                if goal.status == GoalStatus.PENDING:

                    return goal

        

        return None

    

    def decompose_goal(self, goal_id: str, max_sub_goals: int = 5) -> List[Goal]:

        """Decompose a complex goal into smaller sub-goals."""

        if goal_id not in self.goals:

            return []

        

        goal = self.goals[goal_id]

        sub_goals = []

        

        # Simple goal decomposition logic

        if "research" in goal.description.lower():

            sub_goals.extend([

                self.create_goal(

                    f"Gather information for: {goal.description}",

                    priority=goal.priority + 1,

                    parent_goal_id=goal_id,

                    required_capabilities=["research"]

                ),

                self.create_goal(

                    f"Analyze findings for: {goal.description}",

                    priority=goal.priority + 1,

                    parent_goal_id=goal_id,

                    required_capabilities=["analysis"]

                )

            ])

        

        if "analyze" in goal.description.lower():

            sub_goals.append(

                self.create_goal(

                    f"Synthesize analysis for: {goal.description}",

                    priority=goal.priority + 1,

                    parent_goal_id=goal_id,

                    required_capabilities=["synthesis"]

                )

            )

        

        return sub_goals[:max_sub_goals]

    

    def get_goal_progress(self, goal_id: str) -> Dict[str, Any]:

        """Get progress information for a goal."""

        if goal_id not in self.goals:

            return {}

        

        goal = self.goals[goal_id]

        sub_goal_ids = self.goal_hierarchy.get(goal_id, [])

        

        if not sub_goal_ids:

            return {

                "goal_id": goal_id,

                "status": goal.status.value,

                "progress": 1.0 if goal.status == GoalStatus.COMPLETED else 0.0,

                "sub_goals": 0

            }

        

        completed_sub_goals = sum(

            1 for sub_id in sub_goal_ids 

            if sub_id in self.goals and self.goals[sub_id].status == GoalStatus.COMPLETED

        )

        

        progress = completed_sub_goals / len(sub_goal_ids) if sub_goal_ids else 0.0

        

        return {

            "goal_id": goal_id,

            "status": goal.status.value,

            "progress": progress,

            "sub_goals": len(sub_goal_ids),

            "completed_sub_goals": completed_sub_goals

        }
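GoalManager schedules work through Python's heapq module, pushing (priority, timestamp, id) tuples so that lower numbers pop first and ties fall back to creation order. The pattern in isolation, as a standalone sketch (using itertools.count as the tiebreaker instead of time.time(), which can return identical values for rapid calls):

```python
import heapq
import itertools

counter = itertools.count()  # monotonic tiebreaker (stands in for time.time())
queue = []

# Lower number = higher priority, matching GoalManager's convention.
heapq.heappush(queue, (2, next(counter), "write report"))
heapq.heappush(queue, (1, next(counter), "gather sources"))
heapq.heappush(queue, (1, next(counter), "analyze sources"))

# Pop order: priority first, then insertion order among equals.
order = [heapq.heappop(queue)[2] for _ in range(len(queue))]
print(order)  # ['gather sources', 'analyze sources', 'write report']
```

Because tuples compare element by element, the timestamp (or counter) keeps equal-priority goals in first-in-first-out order without ever comparing goal ids.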


The planning engine creates detailed execution plans for achieving goals, incorporating action sequencing, resource allocation, and contingency planning:


class PlanningEngine:

    def __init__(self, available_capabilities: List[str]):

        self.available_capabilities = available_capabilities

        self.planning_strategies = {

            "sequential": self._create_sequential_plan,

            "parallel": self._create_parallel_plan,

            "adaptive": self._create_adaptive_plan

        }

        self.action_templates = self._initialize_action_templates()

    

    def _initialize_action_templates(self) -> Dict[str, Dict[str, Any]]:

        """Initialize templates for different action types."""

        return {

            "research": {

                "type": ActionType.RESEARCH,

                "estimated_duration": 30.0,

                "required_capabilities": ["research", "information_retrieval"]

            },

            "analyze": {

                "type": ActionType.ANALYZE,

                "estimated_duration": 45.0,

                "required_capabilities": ["analysis", "reasoning"]

            },

            "synthesize": {

                "type": ActionType.SYNTHESIZE,

                "estimated_duration": 20.0,

                "required_capabilities": ["synthesis", "integration"]

            },

            "communicate": {

                "type": ActionType.COMMUNICATE,

                "estimated_duration": 10.0,

                "required_capabilities": ["communication", "formatting"]

            }

        }

    

    def create_plan(self, goal: Goal, strategy: str = "adaptive") -> Plan:

        """Create an execution plan for a given goal."""

        if strategy not in self.planning_strategies:

            strategy = "adaptive"

        

        planning_function = self.planning_strategies[strategy]

        return planning_function(goal)

    

    def _create_sequential_plan(self, goal: Goal) -> Plan:

        """Create a sequential execution plan."""

        actions = []

        execution_order = []

        

        # Determine required actions based on goal

        required_actions = self._analyze_goal_requirements(goal)

        

        for i, action_type in enumerate(required_actions):

            action = self._create_action(action_type, goal, f"step_{i+1}")

            actions.append(action)

            execution_order.append(action.id)

        

        # Set prerequisites for sequential execution

        for i in range(1, len(actions)):

            actions[i].prerequisites = [actions[i-1].id]

        

        plan = Plan(

            goal_id=goal.id,

            actions=actions,

            execution_order=execution_order,

            status="ready",

            confidence=0.8,

            estimated_total_time=sum(action.estimated_duration for action in actions)

        )

        

        return plan

    

    def _create_parallel_plan(self, goal: Goal) -> Plan:

        """Create a parallel execution plan where possible."""

        actions = []

        execution_order = []

        

        required_actions = self._analyze_goal_requirements(goal)

        

        # Group actions that can run in parallel

        parallel_groups = self._group_parallel_actions(required_actions)

        

        for group_index, action_group in enumerate(parallel_groups):

            group_actions = []

            for action_type in action_group:

                action = self._create_action(action_type, goal, f"group_{group_index}")

                actions.append(action)

                group_actions.append(action.id)

            

            # Actions in the same group can run in parallel

            execution_order.extend(group_actions)

            

            # Set prerequisites between groups

            if group_index > 0:

                prev_group_actions = [a.id for a in actions if f"group_{group_index-1}" in a.description]

                for action in group_actions:

                    action_obj = next(a for a in actions if a.id == action)

                    action_obj.prerequisites = prev_group_actions

        

        plan = Plan(

            goal_id=goal.id,

            actions=actions,

            execution_order=execution_order,

            status="ready",

            confidence=0.7,

            # Note: parallel_groups holds action-type names, so durations come

            # from the templates; groups run sequentially and actions within a

            # group run in parallel, so the estimate sums each group's longest action.

            estimated_total_time=sum(

                max(

                    self.action_templates.get(t, {}).get("estimated_duration", 30.0)

                    for t in group

                )

                for group in parallel_groups

            ) if parallel_groups else 0.0

        )

        

        return plan

    

    def _create_adaptive_plan(self, goal: Goal) -> Plan:

        """Create an adaptive plan that can be modified during execution."""

        # Start with sequential plan

        base_plan = self._create_sequential_plan(goal)

        

        # Add monitoring and adaptation actions

        monitor_action = Action(

            type=ActionType.MONITOR,

            description=f"Monitor progress for goal: {goal.description}",

            parameters={"goal_id": goal.id, "check_interval": 30.0},

            estimated_duration=5.0

        )

        

        adapt_action = Action(

            type=ActionType.PLAN,

            description=f"Adapt plan if needed for goal: {goal.description}",

            parameters={"goal_id": goal.id, "adaptation_threshold": 0.3},

            estimated_duration=15.0

        )

        

        base_plan.actions.extend([monitor_action, adapt_action])

        base_plan.confidence = 0.9

        base_plan.metadata["adaptive"] = True

        

        return base_plan

    

    def _analyze_goal_requirements(self, goal: Goal) -> List[str]:

        """Analyze goal to determine required actions."""

        required_actions = []

        description_lower = goal.description.lower()

        

        # Simple rule-based action determination

        if any(word in description_lower for word in ["research", "find", "search", "investigate"]):

            required_actions.append("research")

        

        if any(word in description_lower for word in ["analyze", "examine", "study", "evaluate"]):

            required_actions.append("analyze")

        

        if any(word in description_lower for word in ["synthesize", "combine", "integrate", "summarize"]):

            required_actions.append("synthesize")

        

        if any(word in description_lower for word in ["communicate", "report", "present", "explain"]):

            required_actions.append("communicate")

        

        # Default actions if none detected

        if not required_actions:

            required_actions = ["research", "analyze", "synthesize"]

        

        return required_actions

    

    def _group_parallel_actions(self, actions: List[str]) -> List[List[str]]:

        """Group actions that can be executed in parallel."""

        # Simple grouping logic - research can be parallel, others sequential

        groups = []

        current_group = []

        

        for action in actions:

            if action == "research" and not current_group:

                current_group.append(action)

            elif action == "research" and current_group and all(a == "research" for a in current_group):

                current_group.append(action)

            else:

                if current_group:

                    groups.append(current_group)

                current_group = [action]

        

        if current_group:

            groups.append(current_group)

        

        return groups

    

    def _create_action(self, action_type: str, goal: Goal, context: str) -> Action:

        """Create an action based on type and context."""

        template = self.action_templates.get(action_type, {})

        

        action = Action(

            type=template.get("type", ActionType.EXECUTE),

            description=f"{action_type.title()} action for goal: {goal.description} ({context})",

            parameters={

                "goal_id": goal.id,

                "action_type": action_type,

                "context": context

            },

            expected_outcome=f"Completed {action_type} for {goal.description}",

            estimated_duration=template.get("estimated_duration", 30.0)

        )

        

        return action
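The keyword-to-action mapping performed by _analyze_goal_requirements can be tried out on its own. This standalone sketch condenses the same rule-based idea into a lookup table (ACTION_KEYWORDS and required_actions are illustrative names, not part of the chapter's code):

```python
from typing import List

ACTION_KEYWORDS = {
    "research": ["research", "find", "search", "investigate"],
    "analyze": ["analyze", "examine", "study", "evaluate"],
    "synthesize": ["synthesize", "combine", "integrate", "summarize"],
    "communicate": ["communicate", "report", "present", "explain"],
}

def required_actions(description: str) -> List[str]:
    """Map a goal description to action types via keyword matching."""
    text = description.lower()
    actions = [name for name, words in ACTION_KEYWORDS.items()
               if any(w in text for w in words)]
    # Fall back to a default pipeline when nothing matches.
    return actions or ["research", "analyze", "synthesize"]

print(required_actions("Investigate and summarize recent LLM papers"))
# ['research', 'synthesize']
print(required_actions("Say hello"))
# ['research', 'analyze', 'synthesize']
```

Keyword rules like these are a deliberately simple placeholder; a production planner would typically ask the LLM itself to classify the goal rather than rely on substring matching.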


The autonomous execution engine carries out planned actions while monitoring progress and adapting to changing circumstances:


class AutonomousExecutor:

    def __init__(self, agents: Dict[str, Any], goal_manager: GoalManager, 

                 planning_engine: PlanningEngine):

        self.agents = agents

        self.goal_manager = goal_manager

        self.planning_engine = planning_engine

        self.active_plans: Dict[str, Plan] = {}

        self.execution_history: List[Dict[str, Any]] = []

        self.learning_data: Dict[str, List[float]] = {}

    

    async def execute_goal(self, goal_id: str) -> Dict[str, Any]:

        """Execute a goal autonomously through planning and action execution."""

        if goal_id not in self.goal_manager.goals:

            return {"success": False, "error": "Goal not found"}

        

        goal = self.goal_manager.goals[goal_id]

        

        try:

            # Update goal status

            self.goal_manager.update_goal_status(goal_id, GoalStatus.IN_PROGRESS)

            

            # Create execution plan

            plan = self.planning_engine.create_plan(goal)

            self.active_plans[goal_id] = plan

            

            # Execute plan

            execution_result = await self._execute_plan(plan)

            

            # Update goal status based on execution result

            if execution_result["success"]:

                self.goal_manager.update_goal_status(

                    goal_id, 

                    GoalStatus.COMPLETED,

                    {"completion_time": time.time(), "execution_result": execution_result}

                )

            else:

                self.goal_manager.update_goal_status(

                    goal_id,

                    GoalStatus.FAILED,

                    {"failure_time": time.time(), "failure_reason": execution_result.get("error")}

                )

            

            # Record execution for learning

            self._record_execution(goal, plan, execution_result)

            

            return execution_result

        

        except Exception as e:

            self.goal_manager.update_goal_status(

                goal_id,

                GoalStatus.FAILED,

                {"failure_time": time.time(), "error": str(e)}

            )

            return {"success": False, "error": str(e)}

    

    async def _execute_plan(self, plan: Plan) -> Dict[str, Any]:

        """Execute a plan by running its actions in order."""

        execution_start = time.time()

        executed_actions = []

        results = {}

        

        try:

            for action_id in plan.execution_order:

                action = next((a for a in plan.actions if a.id == action_id), None)

                if not action:

                    continue

                

                # Check prerequisites

                if not await self._check_prerequisites(action, executed_actions):

                    return {

                        "success": False,

                        "error": f"Prerequisites not met for action {action_id}",

                        "executed_actions": executed_actions

                    }

                

                # Execute action

                action_result = await self._execute_action(action)

                action.result = action_result

                action.status = "completed" if action_result.get("success") else "failed"

                

                executed_actions.append(action_id)

                results[action_id] = action_result

                

                # Check if action failed and plan should be aborted

                if not action_result.get("success") and not plan.metadata.get("adaptive"):

                    return {

                        "success": False,

                        "error": f"Action {action_id} failed: {action_result.get('error')}",

                        "executed_actions": executed_actions,

                        "results": results

                    }

                

                # For adaptive plans, try to recover from failures

                if not action_result.get("success") and plan.metadata.get("adaptive"):

                    recovery_result = await self._attempt_recovery(action, plan)

                    if recovery_result.get("success"):

                        action.result = recovery_result

                        action.status = "recovered"

                        results[action_id] = recovery_result

            

            execution_time = time.time() - execution_start

            

            return {

                "success": True,

                "execution_time": execution_time,

                "executed_actions": executed_actions,

                "results": results,

                "plan_id": plan.id

            }

        

        except Exception as e:

            return {

                "success": False,

                "error": str(e),

                "executed_actions": executed_actions,

                "results": results

            }

    

    async def _execute_action(self, action: Action) -> Dict[str, Any]:

        """Execute a single action using appropriate agent."""

        action_start = time.time()

        

        try:

            # Determine which agent to use based on action type

            agent = self._select_agent_for_action(action)

            if not agent:

                return {

                    "success": False,

                    "error": f"No suitable agent found for action type {action.type.value}"

                }

            

            # Prepare action parameters

            action_params = self._prepare_action_parameters(action)

            

            # Execute action through agent

            if action.type == ActionType.RESEARCH:

                result = await self._execute_research_action(agent, action_params)

            elif action.type == ActionType.ANALYZE:

                result = await self._execute_analysis_action(agent, action_params)

            elif action.type == ActionType.SYNTHESIZE:

                result = await self._execute_synthesis_action(agent, action_params)

            elif action.type == ActionType.COMMUNICATE:

                result = await self._execute_communication_action(agent, action_params)

            else:

                result = {"success": False, "error": f"Unknown action type: {action.type.value}"}

            

            # Record actual duration

            action.actual_duration = time.time() - action_start

            

            return result

        

        except Exception as e:

            action.actual_duration = time.time() - action_start

            return {"success": False, "error": str(e)}

    

    def _select_agent_for_action(self, action: Action) -> Optional[Any]:

        """Select the most appropriate agent for an action."""

        action_type = action.type

        

        if action_type == ActionType.RESEARCH and "researcher" in self.agents:

            return self.agents["researcher"]

        elif action_type == ActionType.ANALYZE and "analyzer" in self.agents:

            return self.agents["analyzer"]

        elif action_type == ActionType.SYNTHESIZE and "synthesizer" in self.agents:

            return self.agents["synthesizer"]

        elif "coordinator" in self.agents:

            return self.agents["coordinator"]  # Fallback to coordinator

        

        return None

    

    def _prepare_action_parameters(self, action: Action) -> Dict[str, Any]:

        """Prepare parameters for action execution."""

        base_params = action.parameters.copy()

        base_params.update({

            "action_id": action.id,

            "action_description": action.description,

            "expected_outcome": action.expected_outcome

        })

        return base_params

    

    async def _execute_research_action(self, agent: Any, params: Dict[str, Any]) -> Dict[str, Any]:

        """Execute research action through research agent."""

        try:

            from chapter_d_multiagent import Message  # Import from previous chapter

            

            research_message = Message(

                sender="executor",

                recipient=agent.agent_id,

                content=params.get("action_description", ""),

                message_type="research_request"

            )

            

            responses = await agent.process_message(research_message)

            

            if responses and responses[0].message_type != "error_response":

                return {

                    "success": True,

                    "data": json.loads(responses[0].content),

                    "agent_used": agent.agent_id

                }

            else:

                return {

                    "success": False,

                    "error": "Research agent failed to provide results"

                }

        

        except Exception as e:

            return {"success": False, "error": str(e)}

    

    async def _execute_analysis_action(self, agent: Any, params: Dict[str, Any]) -> Dict[str, Any]:

        """Execute analysis action through analyzer agent."""

        try:

            from chapter_d_multiagent import Message

            

            analysis_data = {

                "content": params.get("research_data", []),

                "type": "goal_analysis",

                "query": params.get("action_description", "")

            }

            

            analysis_message = Message(

                sender="executor",

                recipient=agent.agent_id,

                content=json.dumps(analysis_data),

                message_type="analysis_request"

            )

            

            responses = await agent.process_message(analysis_message)

            

            if responses and responses[0].message_type != "error_response":

                return {

                    "success": True,

                    "data": json.loads(responses[0].content),

                    "agent_used": agent.agent_id

                }

            else:

                return {

                    "success": False,

                    "error": "Analysis agent failed to provide results"

                }

        

        except Exception as e:

            return {"success": False, "error": str(e)}

    

    async def _execute_synthesis_action(self, agent: Any, params: Dict[str, Any]) -> Dict[str, Any]:

        """Execute synthesis action through synthesizer agent."""

        try:

            from chapter_d_multiagent import Message

            

            synthesis_data = {

                "query": params.get("action_description", ""),

                "research_results": params.get("research_data", []),

                "analysis_results": params.get("analysis_data", {})

            }

            

            synthesis_message = Message(

                sender="executor",

                recipient=agent.agent_id,

                content=json.dumps(synthesis_data),

                message_type="synthesis_request"

            )

            

            responses = await agent.process_message(synthesis_message)

            

            if responses and responses[0].message_type != "error_response":

                return {

                    "success": True,

                    "data": responses[0].content,

                    "agent_used": agent.agent_id

                }

            else:

                return {

                    "success": False,

                    "error": "Synthesis agent failed to provide results"

                }

        

        except Exception as e:

            return {"success": False, "error": str(e)}

    

    async def _execute_communication_action(self, agent: Any, params: Dict[str, Any]) -> Dict[str, Any]:

        """Execute communication action."""

        try:

            # Simple communication action - format and present results

            content = params.get("synthesis_data", "")

            formatted_content = f"Communication Result: {content}"

            

            return {

                "success": True,

                "data": formatted_content,

                "agent_used": "communication_handler"

            }

        

        except Exception as e:

            return {"success": False, "error": str(e)}

    

    async def _check_prerequisites(self, action: Action, executed_actions: List[str]) -> bool:

        """Check if action prerequisites are satisfied."""

        return all(prereq in executed_actions for prereq in action.prerequisites)

    

    async def _attempt_recovery(self, failed_action: Action, plan: Plan) -> Dict[str, Any]:

        """Attempt to recover from action failure."""

        # Simple recovery strategy - retry with modified parameters

        try:

            modified_params = failed_action.parameters.copy()

            modified_params["retry"] = True

            modified_params["recovery_attempt"] = True

            

            # Create a simplified version of the action

            recovery_action = Action(

                type=failed_action.type,

                description=f"Recovery attempt for: {failed_action.description}",

                parameters=modified_params,

                estimated_duration=failed_action.estimated_duration * 0.5

            )

            

            return await self._execute_action(recovery_action)

        

        except Exception as e:

            return {"success": False, "error": f"Recovery failed: {str(e)}"}

    

    def _record_execution(self, goal: Goal, plan: Plan, execution_result: Dict[str, Any]):

        """Record execution data for learning and improvement."""

        execution_record = {

            "goal_id": goal.id,

            "goal_description": goal.description,

            "plan_id": plan.id,

            "execution_time": execution_result.get("execution_time", 0.0),

            "success": execution_result.get("success", False),

            "actions_count": len(plan.actions),

            "timestamp": time.time()

        }

        

        self.execution_history.append(execution_record)

        

        # Update learning data

        goal_type = self._classify_goal_type(goal.description)

        if goal_type not in self.learning_data:

            self.learning_data[goal_type] = []

        

        self.learning_data[goal_type].append(execution_result.get("execution_time", 0.0))

    

    def _classify_goal_type(self, description: str) -> str:

        """Classify goal type for learning purposes."""

        description_lower = description.lower()

        

        if "research" in description_lower:

            return "research_goal"

        elif "analyze" in description_lower:

            return "analysis_goal"

        elif "synthesize" in description_lower:

            return "synthesis_goal"

        else:

            return "general_goal"

    

    def get_execution_statistics(self) -> Dict[str, Any]:

        """Get execution statistics for performance monitoring."""

        if not self.execution_history:

            return {"total_executions": 0}

        

        total_executions = len(self.execution_history)

        successful_executions = sum(1 for record in self.execution_history if record["success"])

        

        avg_execution_time = sum(record["execution_time"] for record in self.execution_history) / total_executions

        

        return {

            "total_executions": total_executions,

            "successful_executions": successful_executions,

            "success_rate": successful_executions / total_executions,

            "average_execution_time": avg_execution_time,

            "learning_data_points": sum(len(data) for data in self.learning_data.values())

        }
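
The aggregation in get_execution_statistics is easy to sanity-check on its own. A minimal standalone sketch of the same success-rate and timing math, with synthetic history records standing in for real execution data:

```python
# Standalone sketch of the aggregation used in get_execution_statistics();
# the history records below are synthetic illustrations, not real runs.
history = [
    {"success": True, "execution_time": 12.0},
    {"success": False, "execution_time": 30.0},
    {"success": True, "execution_time": 18.0},
]

total = len(history)
successful = sum(1 for record in history if record["success"])
avg_time = sum(record["execution_time"] for record in history) / total

stats = {
    "total_executions": total,
    "successful_executions": successful,
    "success_rate": successful / total,          # 2 of 3 succeeded
    "average_execution_time": avg_time,          # (12 + 30 + 18) / 3 = 20.0
}
print(stats)
```

Guarding the division behind the early `{"total_executions": 0}` return, as the method above does, is what keeps this from raising ZeroDivisionError on an empty history.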


COMPLETE RUNNING EXAMPLE FOR AGENTIC AI:


import asyncio

import json

import time

import uuid

import heapq

from abc import ABC, abstractmethod

from dataclasses import dataclass, field

from enum import Enum

from typing import Dict, List, Any, Optional, Union

from collections import deque

import torch

from transformers import AutoModelForCausalLM, AutoTokenizer

import warnings

warnings.filterwarnings("ignore")


# Import classes from previous chapters

# Note: In a real implementation, these would be proper imports

# For this example, we'll include simplified versions


class GoalStatus(Enum):

    """Enumeration of possible goal statuses."""

    PENDING = "pending"

    IN_PROGRESS = "in_progress"

    COMPLETED = "completed"

    FAILED = "failed"

    PAUSED = "paused"

    CANCELLED = "cancelled"


class ActionType(Enum):

    """Enumeration of action types in the agentic system."""

    RESEARCH = "research"

    ANALYZE = "analyze"

    SYNTHESIZE = "synthesize"

    COMMUNICATE = "communicate"

    PLAN = "plan"

    EXECUTE = "execute"

    MONITOR = "monitor"

    LEARN = "learn"


@dataclass

class Goal:

    """

    Represents a goal in the agentic AI system.

    

    Goals can be hierarchical with parent-child relationships and

    include success criteria, deadlines, and priority levels.

    """

    id: str = field(default_factory=lambda: str(uuid.uuid4()))

    description: str = ""

    priority: int = 1  # 1 = highest priority

    status: GoalStatus = GoalStatus.PENDING

    parent_goal_id: Optional[str] = None

    sub_goals: List[str] = field(default_factory=list)

    required_capabilities: List[str] = field(default_factory=list)

    success_criteria: Dict[str, Any] = field(default_factory=dict)

    deadline: Optional[float] = None

    created_at: float = field(default_factory=time.time)

    updated_at: float = field(default_factory=time.time)

    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass

class Action:

    """

    Represents an action that can be executed by the agentic system.

    

    Actions have types, parameters, prerequisites, and expected outcomes.

    """

    id: str = field(default_factory=lambda: str(uuid.uuid4()))

    type: ActionType = ActionType.EXECUTE

    description: str = ""

    parameters: Dict[str, Any] = field(default_factory=dict)

    prerequisites: List[str] = field(default_factory=list)

    expected_outcome: str = ""

    estimated_duration: float = 0.0

    actual_duration: Optional[float] = None

    status: str = "pending"

    result: Optional[Dict[str, Any]] = None

    created_at: float = field(default_factory=time.time)


@dataclass

class Plan:

    """

    Represents an execution plan for achieving a goal.

    

    Plans contain sequences of actions with execution order and metadata.

    """

    id: str = field(default_factory=lambda: str(uuid.uuid4()))

    goal_id: str = ""

    actions: List[Action] = field(default_factory=list)

    execution_order: List[str] = field(default_factory=list)

    status: str = "draft"

    confidence: float = 0.0

    estimated_total_time: float = 0.0

    created_at: float = field(default_factory=time.time)

    metadata: Dict[str, Any] = field(default_factory=dict)
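
Before moving on to the managers, it helps to see how these dataclasses compose. A minimal sketch using trimmed-down re-declarations (only the fields needed for the demo survive), wiring a two-action Plan whose analysis step lists the research step as a prerequisite:

```python
# Trimmed re-declarations of the Action/Plan dataclasses above, kept only
# to show how prerequisites and execution_order fit together.
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import List

class ActionType(Enum):
    RESEARCH = "research"
    ANALYZE = "analyze"

@dataclass
class Action:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    type: ActionType = ActionType.RESEARCH
    description: str = ""
    prerequisites: List[str] = field(default_factory=list)
    estimated_duration: float = 0.0

@dataclass
class Plan:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    goal_id: str = ""
    actions: List[Action] = field(default_factory=list)
    execution_order: List[str] = field(default_factory=list)
    estimated_total_time: float = 0.0

# Two-step plan: the analysis action cannot start until research is done.
research = Action(type=ActionType.RESEARCH, description="Gather sources",
                  estimated_duration=30.0)
analyze = Action(type=ActionType.ANALYZE, description="Study sources",
                 prerequisites=[research.id], estimated_duration=45.0)

plan = Plan(goal_id="demo-goal",
            actions=[research, analyze],
            execution_order=[research.id, analyze.id],
            estimated_total_time=research.estimated_duration
                                 + analyze.estimated_duration)
print(plan.estimated_total_time)  # 75.0
```

Note that prerequisites reference actions by id rather than by object, which is what lets a plan be serialized or partially re-ordered without dangling references.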


class GoalManager:

    """

    Manages goals in the agentic AI system.

    

    This class handles goal creation, status updates, hierarchical relationships,

    and priority-based goal selection for autonomous execution.

    """

    

    def __init__(self):

        """Initialize the goal manager."""

        self.goals: Dict[str, Goal] = {}

        self.goal_hierarchy: Dict[str, List[str]] = {}

        self.active_goals: List[str] = []

        self.completed_goals: List[str] = []

        self.goal_priorities = []  # Min-heap for priority queue

        self.goal_statistics = {

            "total_created": 0,

            "total_completed": 0,

            "total_failed": 0

        }

    

    def create_goal(self, description: str, priority: int = 1, 

                   parent_goal_id: Optional[str] = None,

                   required_capabilities: List[str] = None,

                   success_criteria: Dict[str, Any] = None,

                   deadline: Optional[float] = None) -> Goal:

        """

        Create a new goal with specified parameters.

        

        Args:

            description (str): Goal description

            priority (int): Priority level (1 = highest)

            parent_goal_id (str): Parent goal ID for hierarchical goals

            required_capabilities (List[str]): Required capabilities

            success_criteria (Dict[str, Any]): Success criteria

            deadline (float): Deadline timestamp

            

        Returns:

            Goal: Created goal object

        """

        goal = Goal(

            description=description,

            priority=priority,

            parent_goal_id=parent_goal_id,

            required_capabilities=required_capabilities or [],

            success_criteria=success_criteria or {},

            deadline=deadline

        )

        

        self.goals[goal.id] = goal

        self.goal_statistics["total_created"] += 1

        

        # Update goal hierarchy

        if parent_goal_id:

            if parent_goal_id not in self.goal_hierarchy:

                self.goal_hierarchy[parent_goal_id] = []

            self.goal_hierarchy[parent_goal_id].append(goal.id)

            

            # Add to parent's sub_goals

            if parent_goal_id in self.goals:

                self.goals[parent_goal_id].sub_goals.append(goal.id)

        

        # Add to priority queue

        heapq.heappush(self.goal_priorities, (priority, time.time(), goal.id))

        

        print(f"Created goal: {goal.description[:50]}...")

        return goal

    

    def update_goal_status(self, goal_id: str, status: GoalStatus, 

                          metadata: Dict[str, Any] = None):

        """

        Update goal status and associated metadata.

        

        Args:

            goal_id (str): Goal identifier

            status (GoalStatus): New status

            metadata (Dict[str, Any]): Additional metadata

        """

        if goal_id in self.goals:

            goal = self.goals[goal_id]

            old_status = goal.status

            goal.status = status

            goal.updated_at = time.time()

            

            if metadata:

                goal.metadata.update(metadata)

            

            # Update tracking lists

            if status == GoalStatus.IN_PROGRESS and goal_id not in self.active_goals:

                self.active_goals.append(goal_id)

            elif status == GoalStatus.COMPLETED:

                if goal_id in self.active_goals:

                    self.active_goals.remove(goal_id)

                if goal_id not in self.completed_goals:

                    self.completed_goals.append(goal_id)

                    self.goal_statistics["total_completed"] += 1

            elif status == GoalStatus.FAILED:

                if goal_id in self.active_goals:

                    self.active_goals.remove(goal_id)

                self.goal_statistics["total_failed"] += 1

            

            print(f"Goal status updated: {old_status.value} -> {status.value}")

    

    def get_next_goal(self) -> Optional[Goal]:

        """

        Get the next highest priority goal to work on.

        

        Returns:

            Optional[Goal]: Next goal to execute or None if no pending goals

        """

        while self.goal_priorities:

            priority, timestamp, goal_id = heapq.heappop(self.goal_priorities)

            

            if goal_id in self.goals:

                goal = self.goals[goal_id]

                if goal.status == GoalStatus.PENDING:

                    return goal

        

        return None

    

    def decompose_goal(self, goal_id: str, max_sub_goals: int = 5) -> List[Goal]:

        """

        Decompose a complex goal into smaller sub-goals.

        

        Args:

            goal_id (str): Goal to decompose

            max_sub_goals (int): Maximum number of sub-goals to create

            

        Returns:

            List[Goal]: List of created sub-goals

        """

        if goal_id not in self.goals:

            return []

        

        goal = self.goals[goal_id]

        sub_goals = []

        

        print(f"Decomposing goal: {goal.description}")

        

        # Intelligent goal decomposition based on description

        description_lower = goal.description.lower()

        

        if "research" in description_lower or "investigate" in description_lower:

            sub_goals.append(

                self.create_goal(

                    f"Gather information about: {goal.description}",

                    priority=goal.priority + 1,

                    parent_goal_id=goal_id,

                    required_capabilities=["research", "information_retrieval"]

                )

            )

            

            sub_goals.append(

                self.create_goal(

                    f"Analyze gathered information for: {goal.description}",

                    priority=goal.priority + 1,

                    parent_goal_id=goal_id,

                    required_capabilities=["analysis", "reasoning"]

                )

            )

        

        if "analyze" in description_lower or "study" in description_lower:

            sub_goals.append(

                self.create_goal(

                    f"Perform detailed analysis for: {goal.description}",

                    priority=goal.priority + 1,

                    parent_goal_id=goal_id,

                    required_capabilities=["analysis", "pattern_recognition"]

                )

            )

            

            sub_goals.append(

                self.create_goal(

                    f"Synthesize analysis results for: {goal.description}",

                    priority=goal.priority + 1,

                    parent_goal_id=goal_id,

                    required_capabilities=["synthesis", "integration"]

                )

            )

        

        if "create" in description_lower or "generate" in description_lower:

            sub_goals.append(

                self.create_goal(

                    f"Plan creation process for: {goal.description}",

                    priority=goal.priority + 1,

                    parent_goal_id=goal_id,

                    required_capabilities=["planning", "design"]

                )

            )

            

            sub_goals.append(

                self.create_goal(

                    f"Execute creation for: {goal.description}",

                    priority=goal.priority + 1,

                    parent_goal_id=goal_id,

                    required_capabilities=["execution", "implementation"]

                )

            )

        

        # Default decomposition if no specific patterns found

        if not sub_goals:

            sub_goals.extend([

                self.create_goal(

                    f"Research phase for: {goal.description}",

                    priority=goal.priority + 1,

                    parent_goal_id=goal_id,

                    required_capabilities=["research"]

                ),

                self.create_goal(

                    f"Analysis phase for: {goal.description}",

                    priority=goal.priority + 1,

                    parent_goal_id=goal_id,

                    required_capabilities=["analysis"]

                ),

                self.create_goal(

                    f"Synthesis phase for: {goal.description}",

                    priority=goal.priority + 1,

                    parent_goal_id=goal_id,

                    required_capabilities=["synthesis"]

                )

            ])

        

        print(f"Created {len(sub_goals)} sub-goals")

        # Note: any sub-goals beyond max_sub_goals have already been registered

        # in self.goals and the priority queue; only the returned slice is capped

        return sub_goals[:max_sub_goals]

    

    def get_goal_progress(self, goal_id: str) -> Dict[str, Any]:

        """

        Get progress information for a goal.

        

        Args:

            goal_id (str): Goal identifier

            

        Returns:

            Dict[str, Any]: Progress information

        """

        if goal_id not in self.goals:

            return {}

        

        goal = self.goals[goal_id]

        sub_goal_ids = self.goal_hierarchy.get(goal_id, [])

        

        if not sub_goal_ids:

            return {

                "goal_id": goal_id,

                "status": goal.status.value,

                "progress": 1.0 if goal.status == GoalStatus.COMPLETED else 0.0,

                "sub_goals": 0,

                "description": goal.description

            }

        

        completed_sub_goals = sum(

            1 for sub_id in sub_goal_ids 

            if sub_id in self.goals and self.goals[sub_id].status == GoalStatus.COMPLETED

        )

        

        progress = completed_sub_goals / len(sub_goal_ids) if sub_goal_ids else 0.0

        

        return {

            "goal_id": goal_id,

            "status": goal.status.value,

            "progress": progress,

            "sub_goals": len(sub_goal_ids),

            "completed_sub_goals": completed_sub_goals,

            "description": goal.description

        }

    

    def get_statistics(self) -> Dict[str, Any]:

        """Get goal management statistics."""

        return {

            "total_goals": len(self.goals),

            "active_goals": len(self.active_goals),

            "completed_goals": len(self.completed_goals),

            "pending_goals": len([g for g in self.goals.values() if g.status == GoalStatus.PENDING]),

            "statistics": self.goal_statistics

        }
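
GoalManager's get_next_goal relies on a classic heapq pattern: push (priority, timestamp, id) tuples so that ties on priority fall back to creation order, and skip stale entries lazily when popping. A standalone miniature of that pattern (itertools.count stands in for time.time() as the tiebreaker):

```python
# Miniature of GoalManager's priority queue: (priority, tiebreaker, goal_id)
# tuples in a min-heap, with stale entries skipped lazily on pop.
import heapq
import itertools

heap = []
counter = itertools.count()  # monotonic tiebreaker in place of time.time()
status = {}                  # goal_id -> "pending" | "completed"

def push(goal_id, priority):
    status[goal_id] = "pending"
    heapq.heappush(heap, (priority, next(counter), goal_id))

def pop_next():
    # Pop until a still-pending goal surfaces (lazy deletion).
    while heap:
        _, _, goal_id = heapq.heappop(heap)
        if status.get(goal_id) == "pending":
            return goal_id
    return None

push("low-priority", 5)
push("urgent", 1)
push("done-already", 1)
status["done-already"] = "completed"  # finished elsewhere; heap entry is now stale

first = pop_next()
print(first)  # "urgent" — lower priority number wins
```

The middle element of the tuple matters: without a tiebreaker, two goals with equal priority would force heapq to compare goal id strings, which gives arbitrary (though not crashing) ordering; with dataclass payloads instead of strings it would raise TypeError.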


class PlanningEngine:

    """

    Advanced planning engine for agentic AI systems.

    

    This engine creates detailed execution plans for goals, incorporating

    action sequencing, resource allocation, and adaptive planning strategies.

    """

    

    def __init__(self, available_capabilities: List[str]):

        """

        Initialize the planning engine.

        

        Args:

            available_capabilities (List[str]): Available system capabilities

        """

        self.available_capabilities = available_capabilities

        self.planning_strategies = {

            "sequential": self._create_sequential_plan,

            "parallel": self._create_parallel_plan,

            "adaptive": self._create_adaptive_plan

        }

        self.action_templates = self._initialize_action_templates()

        self.planning_history = []

    

    def _initialize_action_templates(self) -> Dict[str, Dict[str, Any]]:

        """Initialize templates for different action types."""

        return {

            "research": {

                "type": ActionType.RESEARCH,

                "estimated_duration": 30.0,

                "required_capabilities": ["research", "information_retrieval"],

                "success_indicators": ["data_retrieved", "sources_found"]

            },

            "analyze": {

                "type": ActionType.ANALYZE,

                "estimated_duration": 45.0,

                "required_capabilities": ["analysis", "reasoning"],

                "success_indicators": ["patterns_identified", "insights_generated"]

            },

            "synthesize": {

                "type": ActionType.SYNTHESIZE,

                "estimated_duration": 20.0,

                "required_capabilities": ["synthesis", "integration"],

                "success_indicators": ["information_integrated", "coherent_output"]

            },

            "communicate": {

                "type": ActionType.COMMUNICATE,

                "estimated_duration": 10.0,

                "required_capabilities": ["communication", "formatting"],

                "success_indicators": ["message_formatted", "information_conveyed"]

            },

            "monitor": {

                "type": ActionType.MONITOR,

                "estimated_duration": 5.0,

                "required_capabilities": ["monitoring", "evaluation"],

                "success_indicators": ["progress_tracked", "status_updated"]

            }

        }

    

    def create_plan(self, goal: Goal, strategy: str = "adaptive") -> Plan:

        """

        Create an execution plan for a given goal.

        

        Args:

            goal (Goal): Goal to create plan for

            strategy (str): Planning strategy to use

            

        Returns:

            Plan: Created execution plan

        """

        print(f"Creating {strategy} plan for goal: {goal.description[:50]}...")

        

        if strategy not in self.planning_strategies:

            print(f"Unknown strategy {strategy}, using adaptive")

            strategy = "adaptive"

        

        planning_function = self.planning_strategies[strategy]

        plan = planning_function(goal)

        

        # Record planning history

        self.planning_history.append({

            "goal_id": goal.id,

            "strategy": strategy,

            "actions_count": len(plan.actions),

            "estimated_time": plan.estimated_total_time,

            "timestamp": time.time()

        })

        

        print(f"Plan created with {len(plan.actions)} actions, estimated time: {plan.estimated_total_time:.1f}s")

        return plan

    

    def _create_sequential_plan(self, goal: Goal) -> Plan:

        """Create a sequential execution plan."""

        actions = []

        execution_order = []

        

        # Determine required actions based on goal

        required_actions = self._analyze_goal_requirements(goal)

        

        for i, action_type in enumerate(required_actions):

            action = self._create_action(action_type, goal, f"step_{i+1}")

            actions.append(action)

            execution_order.append(action.id)

        

        # Set prerequisites for sequential execution

        for i in range(1, len(actions)):

            actions[i].prerequisites = [actions[i-1].id]

        

        plan = Plan(

            goal_id=goal.id,

            actions=actions,

            execution_order=execution_order,

            status="ready",

            confidence=0.8,

            estimated_total_time=sum(action.estimated_duration for action in actions),

            metadata={"strategy": "sequential", "parallelizable": False}

        )

        

        return plan

    

    def _create_parallel_plan(self, goal: Goal) -> Plan:

        """Create a parallel execution plan where possible."""

        actions = []

        execution_order = []

        

        required_actions = self._analyze_goal_requirements(goal)

        

        # Group actions that can run in parallel

        parallel_groups = self._group_parallel_actions(required_actions)

        

        prev_group_ids: List[str] = []

        for group_index, action_group in enumerate(parallel_groups):

            group_actions = []

            for action_type in action_group:

                action = self._create_action(action_type, goal, f"group_{group_index}")

                actions.append(action)

                group_actions.append(action.id)

            

            # Actions in the same group can run in parallel

            execution_order.extend(group_actions)

            

            # Each group depends on every action of the previous group, tracked

            # by id (substring-matching descriptions would wrongly match

            # "group_1" inside "group_10")

            if prev_group_ids:

                for action_id in group_actions:

                    action_obj = next(a for a in actions if a.id == action_id)

                    action_obj.prerequisites = prev_group_ids

            

            prev_group_ids = group_actions

        

        # Calculate estimated time for parallel execution

        group_times = []

        for group in parallel_groups:

            group_time = max(

                self.action_templates.get(action_type, {}).get("estimated_duration", 30.0)

                for action_type in group

            )

            group_times.append(group_time)

        

        plan = Plan(

            goal_id=goal.id,

            actions=actions,

            execution_order=execution_order,

            status="ready",

            confidence=0.7,

            estimated_total_time=sum(group_times),

            metadata={"strategy": "parallel", "parallelizable": True, "groups": len(parallel_groups)}

        )

        

        return plan

    

    def _create_adaptive_plan(self, goal: Goal) -> Plan:

        """Create an adaptive plan that can be modified during execution."""

        # Start with sequential plan as base

        base_plan = self._create_sequential_plan(goal)

        

        # Add monitoring and adaptation actions

        monitor_action = Action(

            type=ActionType.MONITOR,

            description=f"Monitor progress for goal: {goal.description}",

            parameters={

                "goal_id": goal.id,

                "check_interval": 30.0,

                "adaptation_triggers": ["failure", "delay", "opportunity"]

            },

            estimated_duration=5.0

        )

        

        adapt_action = Action(

            type=ActionType.PLAN,

            description=f"Adapt plan if needed for goal: {goal.description}",

            parameters={

                "goal_id": goal.id,

                "adaptation_threshold": 0.3,

                "replan_strategies": ["retry", "alternative", "decompose"]

            },

            estimated_duration=15.0

        )

        

        # Insert monitoring actions between main actions

        enhanced_actions = []

        enhanced_order = []

        

        for i, action in enumerate(base_plan.actions):

            enhanced_actions.append(action)

            enhanced_order.append(action.id)

            

            # Add monitoring after each major action

            if i < len(base_plan.actions) - 1:

                monitor_copy = Action(

                    type=ActionType.MONITOR,

                    description=f"Monitor after {action.description}",

                    parameters=monitor_action.parameters.copy(),

                    estimated_duration=2.0

                )

                enhanced_actions.append(monitor_copy)

                enhanced_order.append(monitor_copy.id)

        

        # Add final adaptation action

        enhanced_actions.append(adapt_action)

        enhanced_order.append(adapt_action.id)

        

        base_plan.actions = enhanced_actions

        base_plan.execution_order = enhanced_order

        base_plan.confidence = 0.9

        base_plan.estimated_total_time += len(enhanced_actions) * 3.0  # Account for monitoring overhead

        base_plan.metadata.update({

            "strategy": "adaptive",

            "monitoring_enabled": True,

            "adaptation_enabled": True

        })

        

        return base_plan

    

    def _analyze_goal_requirements(self, goal: Goal) -> List[str]:

        """

        Analyze goal to determine required actions.

        

        Args:

            goal (Goal): Goal to analyze

            

        Returns:

            List[str]: List of required action types

        """

        required_actions = []

        description_lower = goal.description.lower()

        

        # Intelligent action determination based on goal description

        if any(word in description_lower for word in ["research", "find", "search", "investigate", "discover"]):

            required_actions.append("research")

        

        if any(word in description_lower for word in ["analyze", "examine", "study", "evaluate", "assess"]):

            required_actions.append("analyze")

        

        if any(word in description_lower for word in ["synthesize", "combine", "integrate", "summarize", "merge"]):

            required_actions.append("synthesize")

        

        if any(word in description_lower for word in ["communicate", "report", "present", "explain", "share"]):

            required_actions.append("communicate")

        

        if any(word in description_lower for word in ["monitor", "track", "watch", "observe"]):

            required_actions.append("monitor")

        

        # Default actions if none detected

        if not required_actions:

            required_actions = ["research", "analyze", "synthesize"]

        

        # Ensure logical flow

        if "synthesize" in required_actions and "analyze" not in required_actions:

            required_actions.insert(-1, "analyze")

        

        if "analyze" in required_actions and "research" not in required_actions:

            required_actions.insert(0, "research")

        

        return required_actions

    

    def _group_parallel_actions(self, actions: List[str]) -> List[List[str]]:

        """

        Group actions that can be executed in parallel.

        

        Args:

            actions (List[str]): List of action types

            

        Returns:

            List[List[str]]: Groups of actions that can run in parallel

        """

        groups = []

        current_group = []

        

        # Simple grouping logic based on dependencies

        for action in actions:

            if action == "research":

                # Research actions can often be parallelized

                if not current_group or all(a == "research" for a in current_group):

                    current_group.append(action)

                else:

                    # current_group is guaranteed non-empty here, so close it out

                    groups.append(current_group)

                    current_group = [action]

            elif action == "monitor":

                # Monitoring can be parallel with other actions

                current_group.append(action)

            else:

                # Other actions typically need to be sequential

                if current_group:

                    groups.append(current_group)

                current_group = [action]

        

        if current_group:

            groups.append(current_group)

        

        return groups

    

    def _create_action(self, action_type: str, goal: Goal, context: str) -> Action:

        """

        Create an action based on type and context.

        

        Args:

            action_type (str): Type of action to create

            goal (Goal): Associated goal

            context (str): Context information

            

        Returns:

            Action: Created action

        """

        template = self.action_templates.get(action_type, {})

        

        action = Action(

            type=template.get("type", ActionType.EXECUTE),

            description=f"{action_type.title()} action for goal: {goal.description} ({context})",

            parameters={

                "goal_id": goal.id,

                "action_type": action_type,

                "context": context,

                "goal_description": goal.description,

                "required_capabilities": template.get("required_capabilities", [])

            },

            expected_outcome=f"Completed {action_type} for {goal.description}",

            estimated_duration=template.get("estimated_duration", 30.0)

        )

        

        return action

    

    def get_planning_statistics(self) -> Dict[str, Any]:

        """Get planning engine statistics."""

        if not self.planning_history:

            return {"total_plans": 0}

        

        total_plans = len(self.planning_history)

        avg_actions = sum(p["actions_count"] for p in self.planning_history) / total_plans

        avg_time = sum(p["estimated_time"] for p in self.planning_history) / total_plans

        

        strategies_used = {}

        for plan in self.planning_history:

            strategy = plan["strategy"]

            strategies_used[strategy] = strategies_used.get(strategy, 0) + 1

        

        return {

            "total_plans": total_plans,

            "average_actions_per_plan": avg_actions,

            "average_estimated_time": avg_time,

            "strategies_used": strategies_used,

            "available_capabilities": len(self.available_capabilities)

        }
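The keyword heuristic in `_analyze_goal_requirements` and the grouping logic in `_group_parallel_actions` can be tried in isolation. Below is a minimal standalone sketch of those two heuristics; the names `required_actions` and `group_parallel` are illustrative and not part of the PlanningEngine API:

```python
# Standalone sketch of the two planning heuristics above (illustrative names).

RESEARCH_WORDS = {"research", "find", "search", "investigate", "discover"}
ANALYZE_WORDS = {"analyze", "examine", "study", "evaluate", "assess"}
SYNTH_WORDS = {"synthesize", "combine", "integrate", "summarize", "merge"}

def required_actions(description: str) -> list:
    """Keyword-driven action selection, mirroring _analyze_goal_requirements."""
    text = description.lower()
    actions = []
    if any(w in text for w in RESEARCH_WORDS):
        actions.append("research")
    if any(w in text for w in ANALYZE_WORDS):
        actions.append("analyze")
    if any(w in text for w in SYNTH_WORDS):
        actions.append("synthesize")
    if not actions:  # default pipeline when nothing matches
        actions = ["research", "analyze", "synthesize"]
    # Enforce logical flow: synthesis implies analysis, analysis implies research
    if "synthesize" in actions and "analyze" not in actions:
        actions.insert(-1, "analyze")
    if "analyze" in actions and "research" not in actions:
        actions.insert(0, "research")
    return actions

def group_parallel(actions: list) -> list:
    """Dependency-aware grouping, mirroring _group_parallel_actions."""
    groups, current = [], []
    for action in actions:
        if action == "research" and (not current or all(a == "research" for a in current)):
            current.append(action)       # research steps can run side by side
        elif action == "monitor":
            current.append(action)       # monitoring rides along with anything
        else:
            if current:
                groups.append(current)   # close out the previous group
            current = [action]
    if current:
        groups.append(current)
    return groups

print(required_actions("Investigate and summarize GPU pricing"))
# → ['research', 'analyze', 'synthesize']
print(group_parallel(["research", "research", "analyze", "monitor", "synthesize"]))
# → [['research', 'research'], ['analyze', 'monitor'], ['synthesize']]
```

Note how the "analyze" step is inserted even though the goal never mentions analysis: the flow-ordering rules guarantee that synthesis is always preceded by analysis, and analysis by research.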


class AutonomousExecutor:

    """

    Autonomous execution engine for agentic AI systems.

    

    This engine executes plans autonomously, monitors progress,

    adapts to changing circumstances, and learns from experience.

    """

    

    def __init__(self, agents: Dict[str, Any], goal_manager: GoalManager, 

                 planning_engine: PlanningEngine):

        """

        Initialize the autonomous executor.

        

        Args:

            agents (Dict[str, Any]): Available agents for task execution

            goal_manager (GoalManager): Goal management system

            planning_engine (PlanningEngine): Planning engine

        """

        self.agents = agents

        self.goal_manager = goal_manager

        self.planning_engine = planning_engine

        self.active_plans: Dict[str, Plan] = {}

        self.execution_history: List[Dict[str, Any]] = []

        self.learning_data: Dict[str, List[float]] = {}

        self.adaptation_strategies = ["retry", "alternative", "decompose", "skip"]

        self.performance_metrics = {

            "total_executions": 0,

            "successful_executions": 0,

            "failed_executions": 0,

            "adaptations_made": 0

        }

    

    async def execute_goal(self, goal_id: str) -> Dict[str, Any]:

        """

        Execute a goal autonomously through planning and action execution.

        

        Args:

            goal_id (str): Goal identifier to execute

            

        Returns:

            Dict[str, Any]: Execution result with metadata

        """

        if goal_id not in self.goal_manager.goals:

            return {"success": False, "error": "Goal not found"}

        

        goal = self.goal_manager.goals[goal_id]

        execution_start = time.time()

        

        print(f"Starting autonomous execution of goal: {goal.description}")

        

        try:

            # Update goal status

            self.goal_manager.update_goal_status(goal_id, GoalStatus.IN_PROGRESS)

            

            # Create execution plan

            plan = self.planning_engine.create_plan(goal, strategy="adaptive")

            self.active_plans[goal_id] = plan

            

            # Execute plan

            execution_result = await self._execute_plan(plan)

            

            # Update goal status based on execution result

            if execution_result["success"]:

                self.goal_manager.update_goal_status(

                    goal_id, 

                    GoalStatus.COMPLETED,

                    {

                        "completion_time": time.time(),

                        "execution_result": execution_result,

                        "total_execution_time": time.time() - execution_start

                    }

                )

                self.performance_metrics["successful_executions"] += 1

                print(f"Goal completed successfully: {goal.description}")

            else:

                self.goal_manager.update_goal_status(

                    goal_id,

                    GoalStatus.FAILED,

                    {

                        "failure_time": time.time(),

                        "failure_reason": execution_result.get("error"),

                        "total_execution_time": time.time() - execution_start

                    }

                )

                self.performance_metrics["failed_executions"] += 1

                print(f"Goal execution failed: {execution_result.get('error')}")

            

            # Record execution for learning

            self._record_execution(goal, plan, execution_result)

            self.performance_metrics["total_executions"] += 1

            

            return execution_result

        

        except Exception as e:

            self.goal_manager.update_goal_status(

                goal_id,

                GoalStatus.FAILED,

                {

                    "failure_time": time.time(),

                    "error": str(e),

                    "total_execution_time": time.time() - execution_start

                }

            )

            self.performance_metrics["failed_executions"] += 1

            self.performance_metrics["total_executions"] += 1

            print(f"Goal execution error: {str(e)}")

            return {"success": False, "error": str(e)}

    

    async def _execute_plan(self, plan: Plan) -> Dict[str, Any]:

        """

        Execute a plan by running its actions in order.

        

        Args:

            plan (Plan): Plan to execute

            

        Returns:

            Dict[str, Any]: Execution result

        """

        execution_start = time.time()

        executed_actions = []

        results = {}

        adaptation_count = 0

        

        print(f"Executing plan with {len(plan.actions)} actions")

        

        try:

            for i, action_id in enumerate(plan.execution_order):

                action = next((a for a in plan.actions if a.id == action_id), None)

                if not action:

                    continue

                

                print(f"Executing action {i+1}/{len(plan.execution_order)}: {action.description[:50]}...")

                

                # Check prerequisites

                if not await self._check_prerequisites(action, executed_actions):

                    print(f"Prerequisites not met for action {action_id}")

                    

                    # Attempt adaptation

                    if plan.metadata.get("adaptation_enabled"):

                        adaptation_result = await self._adapt_plan(plan, action, "prerequisites_not_met")

                        if adaptation_result["success"]:

                            adaptation_count += 1

                            self.performance_metrics["adaptations_made"] += 1

                            continue

                    

                    return {

                        "success": False,

                        "error": f"Prerequisites not met for action {action_id}",

                        "executed_actions": executed_actions,

                        "adaptations_made": adaptation_count

                    }

                

                # Execute action

                action_result = await self._execute_action(action)

                action.result = action_result

                action.status = "completed" if action_result.get("success") else "failed"

                

                executed_actions.append(action_id)

                results[action_id] = action_result

                

                print(f"Action result: {'Success' if action_result.get('success') else 'Failed'}")

                

                # Check if action failed and plan should be adapted

                if not action_result.get("success"):

                    if plan.metadata.get("adaptation_enabled"):

                        adaptation_result = await self._adapt_plan(plan, action, "action_failed")

                        if adaptation_result["success"]:

                            adaptation_count += 1

                            self.performance_metrics["adaptations_made"] += 1

                            # Update action result with recovery

                            action.result = adaptation_result

                            action.status = "recovered"

                            results[action_id] = adaptation_result

                        else:

                            return {

                                "success": False,

                                "error": f"Action {action_id} failed and adaptation unsuccessful: {action_result.get('error')}",

                                "executed_actions": executed_actions,

                                "results": results,

                                "adaptations_made": adaptation_count

                            }

                    else:

                        return {

                            "success": False,

                            "error": f"Action {action_id} failed: {action_result.get('error')}",

                            "executed_actions": executed_actions,

                            "results": results,

                            "adaptations_made": adaptation_count

                        }

            

            execution_time = time.time() - execution_start

            

            print(f"Plan execution completed in {execution_time:.2f}s with {adaptation_count} adaptations")

            

            return {

                "success": True,

                "execution_time": execution_time,

                "executed_actions": executed_actions,

                "results": results,

                "plan_id": plan.id,

                "adaptations_made": adaptation_count

            }

        

        except Exception as e:

            return {

                "success": False,

                "error": str(e),

                "executed_actions": executed_actions,

                "results": results,

                "adaptations_made": adaptation_count

            }

    

    async def _execute_action(self, action: Action) -> Dict[str, Any]:

        """

        Execute a single action using an appropriate agent.

        

        Args:

            action (Action): Action to execute

            

        Returns:

            Dict[str, Any]: Action execution result

        """

        action_start = time.time()

        

        try:

            # Determine which agent to use based on action type

            agent = self._select_agent_for_action(action)

            if not agent:

                return {

                    "success": False,

                    "error": f"No suitable agent found for action type {action.type.value}"

                }

            

            # Prepare action parameters

            action_params = self._prepare_action_parameters(action)

            

            # Execute action through agent based on type

            if action.type == ActionType.RESEARCH:

                result = await self._execute_research_action(agent, action_params)

            elif action.type == ActionType.ANALYZE:

                result = await self._execute_analysis_action(agent, action_params)

            elif action.type == ActionType.SYNTHESIZE:

                result = await self._execute_synthesis_action(agent, action_params)

            elif action.type == ActionType.COMMUNICATE:

                result = await self._execute_communication_action(agent, action_params)

            elif action.type == ActionType.MONITOR:

                result = await self._execute_monitoring_action(action_params)

            elif action.type == ActionType.PLAN:

                result = await self._execute_planning_action(action_params)

            else:

                result = {"success": False, "error": f"Unknown action type: {action.type.value}"}

            

            # Record actual duration

            action.actual_duration = time.time() - action_start

            

            return result

        

        except Exception as e:

            action.actual_duration = time.time() - action_start

            return {"success": False, "error": str(e)}

    

    def _select_agent_for_action(self, action: Action) -> Optional[Any]:

        """

        Select the most appropriate agent for an action.

        

        Args:

            action (Action): Action requiring agent selection

            

        Returns:

            Optional[Any]: Selected agent or None if no suitable agent

        """

        action_type = action.type

        

        # Agent selection based on action type and capabilities

        if action_type == ActionType.RESEARCH:

            # Prefer research agent, fallback to coordinator

            if "researcher" in self.agents:

                return self.agents["researcher"]

            elif "coordinator" in self.agents:

                return self.agents["coordinator"]

        

        elif action_type == ActionType.ANALYZE:

            # Prefer analyzer agent, fallback to coordinator

            if "analyzer" in self.agents:

                return self.agents["analyzer"]

            elif "coordinator" in self.agents:

                return self.agents["coordinator"]

        

        elif action_type == ActionType.SYNTHESIZE:

            # Prefer synthesizer agent, fallback to coordinator

            if "synthesizer" in self.agents:

                return self.agents["synthesizer"]

            elif "coordinator" in self.agents:

                return self.agents["coordinator"]

        

        elif action_type in [ActionType.COMMUNICATE, ActionType.MONITOR, ActionType.PLAN]:

            # These can be handled by coordinator or any available agent

            if "coordinator" in self.agents:

                return self.agents["coordinator"]

            elif self.agents:

                return list(self.agents.values())[0]

        

        # Fallback to any available agent

        if self.agents:

            return list(self.agents.values())[0]

        

        return None

    

    def _prepare_action_parameters(self, action: Action) -> Dict[str, Any]:

        """

        Prepare parameters for action execution.

        

        Args:

            action (Action): Action to prepare parameters for

            

        Returns:

            Dict[str, Any]: Prepared parameters

        """

        base_params = action.parameters.copy()

        base_params.update({

            "action_id": action.id,

            "action_type": action.type.value,

            "action_description": action.description,

            "expected_outcome": action.expected_outcome,

            "estimated_duration": action.estimated_duration

        })

        return base_params

    

    async def _execute_research_action(self, agent: Any, params: Dict[str, Any]) -> Dict[str, Any]:

        """

        Execute research action through research agent.

        

        Args:

            agent: Agent to execute the action

            params (Dict[str, Any]): Action parameters

            

        Returns:

            Dict[str, Any]: Research results

        """

        try:

            # Create a simplified message structure for compatibility

            # (redefined locally in each action helper so each stays self-contained)

            class SimpleMessage:

                def __init__(self, sender, recipient, content, message_type):

                    self.sender = sender

                    self.recipient = recipient

                    self.content = content

                    self.message_type = message_type

                    self.id = str(uuid.uuid4())

                    self.metadata = {}

                    self.timestamp = time.time()

            

            research_query = params.get("goal_description", params.get("action_description", ""))

            

            research_message = SimpleMessage(

                sender="executor",

                recipient=getattr(agent, 'agent_id', 'agent'),

                content=research_query,

                message_type="research_request"

            )

            

            # Execute research

            responses = await agent.process_message(research_message)

            

            if responses and responses[0].message_type != "error_response":

                try:

                    research_data = json.loads(responses[0].content)

                    return {

                        "success": True,

                        "data": research_data,

                        "agent_used": getattr(agent, 'agent_id', 'research_agent'),

                        "action_type": "research"

                    }

                except json.JSONDecodeError:

                    return {

                        "success": True,

                        "data": responses[0].content,

                        "agent_used": getattr(agent, 'agent_id', 'research_agent'),

                        "action_type": "research"

                    }

            else:

                return {

                    "success": False,

                    "error": "Research agent failed to provide results",

                    "action_type": "research"

                }

        

        except Exception as e:

            return {

                "success": False,

                "error": f"Research action failed: {str(e)}",

                "action_type": "research"

            }

    

    async def _execute_analysis_action(self, agent: Any, params: Dict[str, Any]) -> Dict[str, Any]:

        """

        Execute analysis action through analyzer agent.

        

        Args:

            agent: Agent to execute the action

            params (Dict[str, Any]): Action parameters

            

        Returns:

            Dict[str, Any]: Analysis results

        """

        try:

            class SimpleMessage:

                def __init__(self, sender, recipient, content, message_type):

                    self.sender = sender

                    self.recipient = recipient

                    self.content = content

                    self.message_type = message_type

                    self.id = str(uuid.uuid4())

                    self.metadata = {}

                    self.timestamp = time.time()

            

            # Prepare analysis data

            analysis_data = {

                "content": params.get("research_data", []),

                "type": "goal_analysis",

                "query": params.get("goal_description", params.get("action_description", ""))

            }

            

            analysis_message = SimpleMessage(

                sender="executor",

                recipient=getattr(agent, 'agent_id', 'agent'),

                content=json.dumps(analysis_data),

                message_type="analysis_request"

            )

            

            # Execute analysis

            responses = await agent.process_message(analysis_message)

            

            if responses and responses[0].message_type != "error_response":

                try:

                    analysis_result = json.loads(responses[0].content)

                    return {

                        "success": True,

                        "data": analysis_result,

                        "agent_used": getattr(agent, 'agent_id', 'analysis_agent'),

                        "action_type": "analysis"

                    }

                except json.JSONDecodeError:

                    return {

                        "success": True,

                        "data": {"analysis": responses[0].content},

                        "agent_used": getattr(agent, 'agent_id', 'analysis_agent'),

                        "action_type": "analysis"

                    }

            else:

                return {

                    "success": False,

                    "error": "Analysis agent failed to provide results",

                    "action_type": "analysis"

                }

        

        except Exception as e:

            return {

                "success": False,

                "error": f"Analysis action failed: {str(e)}",

                "action_type": "analysis"

            }

    

    async def _execute_synthesis_action(self, agent: Any, params: Dict[str, Any]) -> Dict[str, Any]:

        """

        Execute synthesis action through synthesizer agent.

        

        Args:

            agent: Agent to execute the action

            params (Dict[str, Any]): Action parameters

            

        Returns:

            Dict[str, Any]: Synthesis results

        """

        try:

            class SimpleMessage:

                def __init__(self, sender, recipient, content, message_type):

                    self.sender = sender

                    self.recipient = recipient

                    self.content = content

                    self.message_type = message_type

                    self.id = str(uuid.uuid4())

                    self.metadata = {}

                    self.timestamp = time.time()

            

            # Prepare synthesis data

            synthesis_data = {

                "query": params.get("goal_description", params.get("action_description", "")),

                "research_results": params.get("research_data", []),

                "analysis_results": params.get("analysis_data", {})

            }

            

            synthesis_message = SimpleMessage(

                sender="executor",

                recipient=getattr(agent, 'agent_id', 'agent'),

                content=json.dumps(synthesis_data),

                message_type="synthesis_request"

            )

            

            # Execute synthesis

            responses = await agent.process_message(synthesis_message)

            

            if responses and responses[0].message_type != "error_response":

                return {

                    "success": True,

                    "data": responses[0].content,

                    "agent_used": getattr(agent, 'agent_id', 'synthesis_agent'),

                    "action_type": "synthesis"

                }

            else:

                return {

                    "success": False,

                    "error": "Synthesis agent failed to provide results",

                    "action_type": "synthesis"

                }

        

        except Exception as e:

            return {

                "success": False,

                "error": f"Synthesis action failed: {str(e)}",

                "action_type": "synthesis"

            }

    

    async def _execute_communication_action(self, agent: Any, params: Dict[str, Any]) -> Dict[str, Any]:

        """

        Execute communication action.

        

        Args:

            agent: Agent to execute the action

            params (Dict[str, Any]): Action parameters

            

        Returns:

            Dict[str, Any]: Communication results

        """

        try:

            # Simple communication action - format and present results

            content = params.get("synthesis_data", params.get("data", ""))

            goal_description = params.get("goal_description", "")

            

            if isinstance(content, dict):

                content = json.dumps(content, indent=2)

            

            formatted_content = f"""

Communication Result for: {goal_description}


Content:

{content}


Status: Communication completed successfully

Timestamp: {time.time()}

"""

            

            return {

                "success": True,

                "data": formatted_content,

                "agent_used": "communication_handler",

                "action_type": "communication"

            }

        

        except Exception as e:

            return {

                "success": False,

                "error": f"Communication action failed: {str(e)}",

                "action_type": "communication"

            }

    

    async def _execute_monitoring_action(self, params: Dict[str, Any]) -> Dict[str, Any]:

        """

        Execute monitoring action.

        

        Args:

            params (Dict[str, Any]): Action parameters

            

        Returns:

            Dict[str, Any]: Monitoring results

        """

        try:

            goal_id = params.get("goal_id")

            check_interval = params.get("check_interval", 30.0)

            

            # Simulate monitoring delay

            await asyncio.sleep(0.1)

            

            # Get goal progress

            if goal_id and goal_id in self.goal_manager.goals:

                progress = self.goal_manager.get_goal_progress(goal_id)

                

                monitoring_result = {

                    "goal_id": goal_id,

                    "progress": progress,

                    "check_time": time.time(),

                    "status": "monitoring_completed",

                    "recommendations": []

                }

                

                # Add recommendations based on progress

                if progress.get("progress", 0) < 0.5:

                    monitoring_result["recommendations"].append("Consider increasing resource allocation")

                

                if progress.get("status") == "failed":

                    monitoring_result["recommendations"].append("Goal requires intervention or replanning")

                

                return {

                    "success": True,

                    "data": monitoring_result,

                    "action_type": "monitoring"

                }

            else:

                return {

                    "success": False,

                    "error": "Goal not found for monitoring",

                    "action_type": "monitoring"

                }

        

        except Exception as e:

            return {

                "success": False,

                "error": f"Monitoring action failed: {str(e)}",

                "action_type": "monitoring"

            }

    

    async def _execute_planning_action(self, params: Dict[str, Any]) -> Dict[str, Any]:

        """

        Execute planning action (replanning/adaptation).

        

        Args:

            params (Dict[str, Any]): Action parameters

            

        Returns:

            Dict[str, Any]: Planning results

        """

        try:

            goal_id = params.get("goal_id")

            adaptation_threshold = params.get("adaptation_threshold", 0.3)

            

            # Simulate planning delay

            await asyncio.sleep(0.2)

            

            if goal_id and goal_id in self.goal_manager.goals:

                goal = self.goal_manager.goals[goal_id]

                

                # Create alternative plan

                alternative_plan = self.planning_engine.create_plan(goal, strategy="sequential")

                

                planning_result = {

                    "goal_id": goal_id,

                    "new_plan_id": alternative_plan.id,

                    "actions_count": len(alternative_plan.actions),

                    "estimated_time": alternative_plan.estimated_total_time,

                    "planning_time": time.time(),

                    "status": "replanning_completed"

                }

                

                return {

                    "success": True,

                    "data": planning_result,

                    "action_type": "planning"

                }

            else:

                return {

                    "success": False,

                    "error": "Goal not found for replanning",

                    "action_type": "planning"

                }

        

        except Exception as e:

            return {

                "success": False,

                "error": f"Planning action failed: {str(e)}",

                "action_type": "planning"

            }

    

    async def _check_prerequisites(self, action: Action, executed_actions: List[str]) -> bool:

        """

        Check if action prerequisites are satisfied.

        

        Args:

            action (Action): Action to check

            executed_actions (List[str]): List of executed action IDs

            

        Returns:

            bool: True if prerequisites are satisfied

        """

        return all(prereq in executed_actions for prereq in action.prerequisites)

    

    async def _adapt_plan(self, plan: Plan, failed_action: Action, failure_reason: str) -> Dict[str, Any]:

        """

        Adapt plan in response to failure or changing circumstances.

        

        Args:

            plan (Plan): Current plan

            failed_action (Action): Action that failed

            failure_reason (str): Reason for failure

            

        Returns:

            Dict[str, Any]: Adaptation result

        """

        try:

            print(f"Adapting plan due to: {failure_reason}")

            

            # Select adaptation strategy based on failure reason

            if failure_reason == "action_failed":

                # Try retry strategy first

                adaptation_result = await self._retry_action(failed_action)

                if adaptation_result["success"]:

                    return adaptation_result

                

                # If retry fails, try alternative approach

                return await self._create_alternative_action(failed_action)

            

            elif failure_reason == "prerequisites_not_met":

                # Try to resolve prerequisites

                return await self._resolve_prerequisites(failed_action, plan)

            

            else:

                # Generic adaptation

                return await self._generic_adaptation(plan, failed_action)

        

        except Exception as e:

            return {

                "success": False,

                "error": f"Adaptation failed: {str(e)}",

                "adaptation_type": "error"

            }

    

    async def _retry_action(self, action: Action) -> Dict[str, Any]:

        """

        Retry a failed action with modified parameters.

        

        Args:

            action (Action): Action to retry

            

        Returns:

            Dict[str, Any]: Retry result

        """

        try:

            print(f"Retrying action: {action.description[:50]}...")

            

            # Create modified version of the action

            retry_action = Action(

                type=action.type,

                description=f"RETRY: {action.description}",

                parameters=action.parameters.copy(),

                estimated_duration=action.estimated_duration * 0.8  # Tighter time budget for the retry

            )

            

            # Add retry flag

            retry_action.parameters["retry"] = True

            retry_action.parameters["original_action_id"] = action.id

            

            # Execute retry

            retry_result = await self._execute_action(retry_action)

            

            if retry_result["success"]:

                print("Retry successful")

                return {

                    "success": True,

                    "data": retry_result["data"],

                    "adaptation_type": "retry",

                    "original_action_id": action.id

                }

            else:

                print("Retry failed")

                return {

                    "success": False,

                    "error": f"Retry failed: {retry_result.get('error')}",

                    "adaptation_type": "retry"

                }

        

        except Exception as e:

            return {

                "success": False,

                "error": f"Retry attempt failed: {str(e)}",

                "adaptation_type": "retry"

            }

    

    async def _create_alternative_action(self, action: Action) -> Dict[str, Any]:

        """

        Create alternative approach for a failed action.

        

        Args:

            action (Action): Failed action

            

        Returns:

            Dict[str, Any]: Alternative action result

        """

        try:

            print(f"Creating alternative for action: {action.description[:50]}...")

            

            # Create alternative based on action type

            if action.type == ActionType.RESEARCH:

                # Alternative research approach

                alt_action = Action(

                    type=ActionType.RESEARCH,

                    description=f"ALTERNATIVE RESEARCH: {action.description}",

                    parameters={

                        **action.parameters,

                        "alternative_approach": True,

                        "simplified_query": True

                    },

                    estimated_duration=action.estimated_duration * 0.6

                )

            

            elif action.type == ActionType.ANALYZE:

                # Alternative analysis approach

                alt_action = Action(

                    type=ActionType.ANALYZE,

                    description=f"SIMPLIFIED ANALYSIS: {action.description}",

                    parameters={

                        **action.parameters,

                        "analysis_depth": "basic",

                        "alternative_method": True

                    },

                    estimated_duration=action.estimated_duration * 0.7

                )

            

            else:

                # Generic alternative

                alt_action = Action(

                    type=action.type,

                    description=f"ALTERNATIVE: {action.description}",

                    parameters={

                        **action.parameters,

                        "alternative": True,

                        "simplified": True

                    },

                    estimated_duration=action.estimated_duration * 0.5

                )

            

            # Execute alternative

            alt_result = await self._execute_action(alt_action)

            

            if alt_result["success"]:

                print("Alternative approach successful")

                return {

                    "success": True,

                    "data": alt_result["data"],

                    "adaptation_type": "alternative",

                    "original_action_id": action.id

                }

            else:

                print("Alternative approach failed")

                return {

                    "success": False,

                    "error": f"Alternative failed: {alt_result.get('error')}",

                    "adaptation_type": "alternative"

                }

        

        except Exception as e:

            return {

                "success": False,

                "error": f"Alternative creation failed: {str(e)}",

                "adaptation_type": "alternative"

            }

    

    async def _resolve_prerequisites(self, action: Action, plan: Plan) -> Dict[str, Any]:

        """

        Attempt to resolve missing prerequisites.

        

        Args:

            action (Action): Action with unmet prerequisites

            plan (Plan): Current plan

            

        Returns:

            Dict[str, Any]: Resolution result

        """

        try:

            print(f"Resolving prerequisites for: {action.description[:50]}...")

            

            # Simple resolution: skip prerequisites if possible

            resolved_action = Action(

                type=action.type,

                description=f"PREREQUISITE-RESOLVED: {action.description}",

                parameters={

                    **action.parameters,

                    "prerequisites_skipped": True,

                    "reduced_dependencies": True

                },

                prerequisites=[],  # Remove prerequisites

                estimated_duration=action.estimated_duration

            )

            

            # Execute resolved action

            resolved_result = await self._execute_action(resolved_action)

            

            if resolved_result["success"]:

                print("Prerequisites resolved successfully")

                return {

                    "success": True,

                    "data": resolved_result["data"],

                    "adaptation_type": "prerequisite_resolution",

                    "original_action_id": action.id

                }

            else:

                return {

                    "success": False,

                    "error": f"Prerequisite resolution failed: {resolved_result.get('error')}",

                    "adaptation_type": "prerequisite_resolution"

                }

        

        except Exception as e:

            return {

                "success": False,

                "error": f"Prerequisite resolution failed: {str(e)}",

                "adaptation_type": "prerequisite_resolution"

            }

    

    async def _generic_adaptation(self, plan: Plan, action: Action) -> Dict[str, Any]:

        """

        Generic adaptation strategy.

        

        Args:

            plan (Plan): Current plan

            action (Action): Problematic action

            

        Returns:

            Dict[str, Any]: Adaptation result

        """

        try:

            print(f"Applying generic adaptation for: {action.description[:50]}...")

            

            # Simple generic adaptation: create a basic fallback

            fallback_action = Action(

                type=ActionType.COMMUNICATE,

                description=f"FALLBACK: Partial completion of {action.description}",

                parameters={

                    "fallback": True,

                    "original_action": action.description,

                    "partial_completion": True

                },

                estimated_duration=5.0

            )

            

            # Execute fallback

            fallback_result = await self._execute_action(fallback_action)

            

            return {

                "success": True,

                "data": fallback_result.get("data", "Fallback completed"),

                "adaptation_type": "generic_fallback",

                "original_action_id": action.id

            }

        

        except Exception as e:

            return {

                "success": False,

                "error": f"Generic adaptation failed: {str(e)}",

                "adaptation_type": "generic_fallback"

            }

    

    def _record_execution(self, goal: Goal, plan: Plan, execution_result: Dict[str, Any]):

        """

        Record execution data for learning and improvement.

        

        Args:

            goal (Goal): Executed goal

            plan (Plan): Execution plan

            execution_result (Dict[str, Any]): Execution result

        """

        execution_record = {

            "goal_id": goal.id,

            "goal_description": goal.description,

            "goal_priority": goal.priority,

            "plan_id": plan.id,

            "plan_strategy": plan.metadata.get("strategy", "unknown"),

            "execution_time": execution_result.get("execution_time", 0.0),

            "success": execution_result.get("success", False),

            "actions_count": len(plan.actions),

            "adaptations_made": execution_result.get("adaptations_made", 0),

            "timestamp": time.time()

        }

        

        self.execution_history.append(execution_record)

        

        # Update learning data for different goal types

        goal_type = self._classify_goal_type(goal.description)

        if goal_type not in self.learning_data:

            self.learning_data[goal_type] = []

        

        # Record execution time for learning

        self.learning_data[goal_type].append(execution_result.get("execution_time", 0.0))

        

        # Keep only recent learning data (last 100 entries per type)

        if len(self.learning_data[goal_type]) > 100:

            self.learning_data[goal_type] = self.learning_data[goal_type][-100:]

    

    def _classify_goal_type(self, description: str) -> str:

        """

        Classify goal type for learning purposes.

        

        Args:

            description (str): Goal description

            

        Returns:

            str: Goal type classification

        """

        description_lower = description.lower()

        

        if any(word in description_lower for word in ["research", "investigate", "find", "search"]):

            return "research_goal"

        elif any(word in description_lower for word in ["analyze", "study", "examine", "evaluate"]):

            return "analysis_goal"

        elif any(word in description_lower for word in ["synthesize", "combine", "integrate", "summarize"]):

            return "synthesis_goal"

        elif any(word in description_lower for word in ["create", "generate", "build", "develop"]):

            return "creation_goal"

        elif any(word in description_lower for word in ["communicate", "report", "present", "explain"]):

            return "communication_goal"

        else:

            return "general_goal"

    

    def get_execution_statistics(self) -> Dict[str, Any]:

        """

        Get comprehensive execution statistics for performance monitoring.

        

        Returns:

            Dict[str, Any]: Execution statistics

        """

        if not self.execution_history:

            return {

                "total_executions": 0,

                "performance_metrics": self.performance_metrics

            }

        

        total_executions = len(self.execution_history)

        successful_executions = sum(1 for record in self.execution_history if record["success"])

        

        avg_execution_time = sum(record["execution_time"] for record in self.execution_history) / total_executions

        avg_adaptations = sum(record["adaptations_made"] for record in self.execution_history) / total_executions

        

        # Goal type statistics

        goal_type_stats = {}

        for record in self.execution_history:

            goal_type = self._classify_goal_type(record["goal_description"])

            if goal_type not in goal_type_stats:

                goal_type_stats[goal_type] = {"count": 0, "success": 0}

            goal_type_stats[goal_type]["count"] += 1

            if record["success"]:

                goal_type_stats[goal_type]["success"] += 1

        

        # Calculate success rates by goal type

        for goal_type in goal_type_stats:

            stats = goal_type_stats[goal_type]

            stats["success_rate"] = stats["success"] / stats["count"] if stats["count"] > 0 else 0.0

        

        return {

            "total_executions": total_executions,

            "successful_executions": successful_executions,

            "success_rate": successful_executions / total_executions,

            "average_execution_time": avg_execution_time,

            "average_adaptations_per_execution": avg_adaptations,

            "goal_type_statistics": goal_type_stats,

            "learning_data_points": sum(len(data) for data in self.learning_data.values()),

            "performance_metrics": self.performance_metrics

        }

    

    def get_learning_insights(self) -> Dict[str, Any]:

        """

        Get insights from learning data for system improvement.

        

        Returns:

            Dict[str, Any]: Learning insights

        """

        insights = {

            "goal_type_performance": {},

            "adaptation_effectiveness": 0.0,

            "improvement_trends": {},

            "recommendations": []

        }

        

        # Analyze performance by goal type

        for goal_type, execution_times in self.learning_data.items():

            if execution_times:

                insights["goal_type_performance"][goal_type] = {

                    "average_time": sum(execution_times) / len(execution_times),

                    "min_time": min(execution_times),

                    "max_time": max(execution_times),

                    "sample_count": len(execution_times)

                }

        

        # Calculate adaptation effectiveness

        if self.execution_history:

            adapted_executions = [r for r in self.execution_history if r["adaptations_made"] > 0]

            if adapted_executions:

                adapted_success_rate = sum(1 for r in adapted_executions if r["success"]) / len(adapted_executions)

                insights["adaptation_effectiveness"] = adapted_success_rate

        

        # Generate recommendations

        if self.performance_metrics["success_rate"] < 0.8:

            insights["recommendations"].append("Consider improving action reliability or adaptation strategies")

        

        if self.performance_metrics["adaptations_made"] > self.performance_metrics["total_executions"] * 0.5:

            insights["recommendations"].append("High adaptation rate suggests need for better initial planning")

        

        return insights
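
Before moving on to the orchestrating class, note how the executor buckets goals purely by keyword matching in `_classify_goal_type`. The sketch below is a hypothetical standalone mirror of that logic (not part of the system itself), shown as a free function so the keyword buckets can be tested in isolation:

```python
# Illustrative sketch: the same keyword-based goal classification used by
# AutonomousExecutor._classify_goal_type, as a standalone function.
def classify_goal_type_standalone(description: str) -> str:
    """Classify a goal description into a coarse goal type (sketch)."""
    # Insertion order matters: the first matching bucket wins,
    # mirroring the if/elif chain in the class method.
    keyword_map = {
        "research_goal": ["research", "investigate", "find", "search"],
        "analysis_goal": ["analyze", "study", "examine", "evaluate"],
        "synthesis_goal": ["synthesize", "combine", "integrate", "summarize"],
        "creation_goal": ["create", "generate", "build", "develop"],
        "communication_goal": ["communicate", "report", "present", "explain"],
    }
    description_lower = description.lower()
    for goal_type, keywords in keyword_map.items():
        # Substring matching, exactly as in the original elif chain.
        if any(word in description_lower for word in keywords):
            return goal_type
    return "general_goal"
```

Because matching is by substring, descriptions containing none of the keywords fall through to `"general_goal"`, which keeps the learning data from fragmenting across unrecognized phrasings.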


class AgenticAI:

    """

    Complete Agentic AI system implementation.

    

    This class orchestrates autonomous goal-oriented behavior through

    goal management, planning, and adaptive execution.

    """

    

    def __init__(self, llm_model=None, tokenizer=None, device=None, knowledge_base=None):

        """

        Initialize the Agentic AI system.

        

        Args:

            llm_model: Language model for agents

            tokenizer: Tokenizer for the language model

            device: Device for model inference

            knowledge_base: Knowledge base for research

        """

        print("Initializing Agentic AI System...")

        

        self.llm_model = llm_model

        self.tokenizer = tokenizer

        self.device = device

        self.knowledge_base = knowledge_base

        

        # Initialize core components

        self.goal_manager = GoalManager()

        

        available_capabilities = [

            "research", "analysis", "synthesis", "communication",

            "planning", "monitoring", "adaptation", "learning"

        ]

        self.planning_engine = PlanningEngine(available_capabilities)

        

        # Initialize agents (simplified versions for this example)

        self.agents = self._initialize_agents()

        

        # Initialize autonomous executor

        self.executor = AutonomousExecutor(

            self.agents,

            self.goal_manager,

            self.planning_engine

        )

        

        # System state

        self.is_running = False

        self.autonomous_mode = False

        self.system_statistics = {

            "goals_created": 0,

            "goals_completed": 0,

            "autonomous_cycles": 0

        }

        

        print(f"Agentic AI System initialized with {len(self.agents)} agents")

    

    def _initialize_agents(self) -> Dict[str, Any]:

        """Initialize simplified agents for the agentic system."""

        agents = {}

        

        # Import agent classes from previous chapters

        try:

            # Simplified agent implementations for this example

            class SimpleAgent:

                def __init__(self, agent_id, role, capabilities):

                    self.agent_id = agent_id

                    self.role = role

                    self.capabilities = capabilities

                    self.message_history = []

                    self.performance_metrics = {

                        "messages_processed": 0,

                        "average_response_time": 0.0,

                        "success_rate": 1.0

                    }

                

                async def process_message(self, message):

                    # Simplified message processing

                    self.message_history.append(message)

                    self.performance_metrics["messages_processed"] += 1

                    

                    # Simulate processing delay

                    await asyncio.sleep(0.1)

                    

                    # Create response based on message type

                    if message.message_type == "research_request":

                        response_content = json.dumps([{

                            "id": "simulated_result",

                            "content": f"Research result for: {message.content}",

                            "source": "simulated_research",

                            "relevance_score": 0.8

                        }])

                    elif message.message_type == "analysis_request":

                        response_content = json.dumps({

                            "analysis": f"Analysis of: {message.content}",

                            "confidence": 0.8,

                            "key_themes": ["simulated analysis"]

                        })

                    elif message.message_type == "synthesis_request":

                        try:

                            data = json.loads(message.content) if message.content.startswith('{') else {}

                        except json.JSONDecodeError:

                            data = {}

                        response_content = f"Synthesis result for: {data.get('query', 'unknown query')}"

                    else:

                        response_content = f"Processed: {message.content}"

                    

                    class SimpleResponse:

                        def __init__(self, sender, recipient, content, message_type):

                            self.sender = sender

                            self.recipient = recipient

                            self.content = content

                            self.message_type = message_type

                            self.id = str(uuid.uuid4())

                            self.timestamp = time.time()

                    

                    return [SimpleResponse(

                        sender=self.agent_id,

                        recipient=message.sender,

                        content=response_content,

                        message_type=message.message_type.replace('_request', '_results')

                    )]

            

            # Create simplified agents

            agents["researcher"] = SimpleAgent("researcher_001", "researcher", ["research"])

            agents["analyzer"] = SimpleAgent("analyzer_001", "analyzer", ["analysis"])

            agents["synthesizer"] = SimpleAgent("synthesizer_001", "synthesizer", ["synthesis"])

            agents["coordinator"] = SimpleAgent("coordinator_001", "coordinator", ["coordination"])

            

        except Exception as e:

            print(f"Warning: Could not initialize full agents, using simplified versions: {e}")

            # Fallback to basic agent structure. process_message is defined as
            # a coroutine so callers can await it, just like SimpleAgent's.

            async def _noop_process(self, msg):

                return []

            agents = {

                "researcher": type('Agent', (), {

                    'agent_id': 'researcher_001',

                    'role': 'researcher',

                    'process_message': _noop_process

                })(),

                "analyzer": type('Agent', (), {

                    'agent_id': 'analyzer_001',

                    'role': 'analyzer',

                    'process_message': _noop_process

                })(),

                "synthesizer": type('Agent', (), {

                    'agent_id': 'synthesizer_001',

                    'role': 'synthesizer',

                    'process_message': _noop_process

                })(),

                "coordinator": type('Agent', (), {

                    'agent_id': 'coordinator_001',

                    'role': 'coordinator',

                    'process_message': _noop_process

                })()

            }

        

        return agents

    

    async def create_and_execute_goal(self, description: str, priority: int = 1,

                                     auto_decompose: bool = True) -> Dict[str, Any]:

        """

        Create a goal and execute it autonomously.

        

        Args:

            description (str): Goal description

            priority (int): Goal priority

            auto_decompose (bool): Whether to auto-decompose complex goals

            

        Returns:

            Dict[str, Any]: Execution result

        """

        print(f"Creating and executing goal: {description}")

        

        # Create goal

        goal = self.goal_manager.create_goal(

            description=description,

            priority=priority,

            required_capabilities=self._analyze_required_capabilities(description)

        )

        

        self.system_statistics["goals_created"] += 1

        

        # Auto-decompose if requested and goal is complex

        if auto_decompose and self._is_complex_goal(description):

            print("Goal appears complex, decomposing into sub-goals...")

            sub_goals = self.goal_manager.decompose_goal(goal.id)

            

            if sub_goals:

                print(f"Created {len(sub_goals)} sub-goals")

                # Execute sub-goals first

                for sub_goal in sub_goals:

                    sub_result = await self.executor.execute_goal(sub_goal.id)

                    if not sub_result["success"]:

                        print(f"Sub-goal failed: {sub_goal.description}")

        

        # Execute main goal

        result = await self.executor.execute_goal(goal.id)

        

        if result["success"]:

            self.system_statistics["goals_completed"] += 1

        

        return {

            "goal_id": goal.id,

            "goal_description": description,

            "execution_result": result,

            "goal_progress": self.goal_manager.get_goal_progress(goal.id)

        }

    

    def _analyze_required_capabilities(self, description: str) -> List[str]:

        """Analyze goal description to determine required capabilities."""

        capabilities = []

        description_lower = description.lower()

        

        if any(word in description_lower for word in ["research", "find", "search", "investigate"]):

            capabilities.append("research")

        

        if any(word in description_lower for word in ["analyze", "study", "examine", "evaluate"]):

            capabilities.append("analysis")

        

        if any(word in description_lower for word in ["synthesize", "combine", "integrate", "summarize"]):

            capabilities.append("synthesis")

        

        if any(word in description_lower for word in ["communicate", "report", "present", "explain"]):

            capabilities.append("communication")

        

        return capabilities if capabilities else ["research", "analysis", "synthesis"]

    

    def _is_complex_goal(self, description: str) -> bool:

        """Determine if a goal is complex and should be decomposed."""

        # Simple heuristics for complexity

        word_count = len(description.split())

        has_multiple_actions = any(word in ("and", "then", "also", "additionally")

                                   for word in description.lower().split())

        

        return word_count > 10 or has_multiple_actions

    

    async def run_autonomous_cycle(self, max_goals: int = 5) -> Dict[str, Any]:

        """

        Run one cycle of autonomous operation.

        

        Args:

            max_goals (int): Maximum goals to process in this cycle

            

        Returns:

            Dict[str, Any]: Cycle results

        """

        cycle_start = time.time()

        processed_goals = 0

        results = []

        

        print(f"Starting autonomous cycle (max {max_goals} goals)")

        

        while processed_goals < max_goals:

            # Get next goal

            next_goal = self.goal_manager.get_next_goal()

            

            if not next_goal:

                print("No pending goals found")

                break

            

            print(f"Processing goal {processed_goals + 1}: {next_goal.description[:50]}...")

            

            # Execute goal

            result = await self.executor.execute_goal(next_goal.id)

            results.append({

                "goal_id": next_goal.id,

                "description": next_goal.description,

                "result": result

            })

            

            processed_goals += 1

            

            if result["success"]:

                self.system_statistics["goals_completed"] += 1

        

        cycle_time = time.time() - cycle_start

        self.system_statistics["autonomous_cycles"] += 1

        

        cycle_result = {

            "cycle_number": self.system_statistics["autonomous_cycles"],

            "goals_processed": processed_goals,

            "cycle_time": cycle_time,

            "results": results,

            "success_rate": sum(1 for r in results if r["result"]["success"]) / len(results) if results else 0.0

        }

        

        print(f"Autonomous cycle completed: {processed_goals} goals in {cycle_time:.2f}s")

        return cycle_result

    

    def get_system_status(self) -> Dict[str, Any]:

        """Get comprehensive system status."""

        goal_stats = self.goal_manager.get_statistics()

        execution_stats = self.executor.get_execution_statistics()

        planning_stats = self.planning_engine.get_planning_statistics()

        

        return {

            "system_statistics": self.system_statistics,

            "goal_management": goal_stats,

            "execution_performance": execution_stats,

            "planning_performance": planning_stats,

            "autonomous_mode": self.autonomous_mode,

            "is_running": self.is_running,

            "agents_available": len(self.agents)

        }

    

    def get_learning_insights(self) -> Dict[str, Any]:

        """Get learning insights from the system."""

        return self.executor.get_learning_insights()

    

    def run_interactive_session(self):

        """Run interactive agentic AI session."""

        print("\n" + "=" * 70)

        print("AGENTIC AI SYSTEM - Interactive Session")

        print("=" * 70)

        print("Commands:")

        print("  'create <goal>' - Create and execute a goal")

        print("  'auto_cycle' - Run autonomous cycle")

        print("  'status' - Show system status")

        print("  'insights' - Show learning insights")

        print("  'goals' - List all goals")

        print("  'quit' or 'exit' - End session")

        print("=" * 70)

        

        async def run_session():

            while True:

                try:

                    user_input = input("\nAgentic AI> ").strip()

                    

                    if user_input.lower() in ['quit', 'exit', 'bye']:

                        print("\nAgentic AI: Thank you for using the system! Goodbye!")

                        break

                    

                    elif user_input.lower().startswith('create '):

                        goal_description = user_input[7:].strip()

                        if goal_description:

                            print(f"Creating and executing goal: {goal_description}")

                            result = await self.create_and_execute_goal(goal_description)

                            

                            print("\nGoal Execution Result:")

                            print(f"  Goal ID: {result['goal_id']}")

                            print(f"  Success: {result['execution_result']['success']}")

                            if result['execution_result']['success']:

                                print(f"  Execution Time: {result['execution_result'].get('execution_time', 0):.2f}s")

                                print(f"  Adaptations Made: {result['execution_result'].get('adaptations_made', 0)}")

                            else:

                                print(f"  Error: {result['execution_result'].get('error', 'Unknown error')}")

                        else:

                            print("Please provide a goal description")

                        continue

                    

                    elif user_input.lower() == 'auto_cycle':

                        print("Running autonomous cycle...")

                        cycle_result = await self.run_autonomous_cycle()

                        

                        print(f"\nAutonomous Cycle Results:")

                        print(f"  Cycle Number: {cycle_result['cycle_number']}")

                        print(f"  Goals Processed: {cycle_result['goals_processed']}")

                        print(f"  Cycle Time: {cycle_result['cycle_time']:.2f}s")

                        print(f"  Success Rate: {cycle_result['success_rate']:.1%}")

                        continue

                    

                    elif user_input.lower() == 'status':

                        status = self.get_system_status()

                        print(f"\nSystem Status:")

                        print(f"  Goals Created: {status['system_statistics']['goals_created']}")

                        print(f"  Goals Completed: {status['system_statistics']['goals_completed']}")

                        print(f"  Autonomous Cycles: {status['system_statistics']['autonomous_cycles']}")

                        print(f"  Active Goals: {status['goal_management']['active_goals']}")

                        print(f"  Execution Success Rate: {status['execution_performance'].get('success_rate', 0):.1%}")

                        print(f"  Available Agents: {status['agents_available']}")

                        continue

                    

                    elif user_input.lower() == 'insights':

                        insights = self.get_learning_insights()

                        print(f"\nLearning Insights:")

                        print(f"  Adaptation Effectiveness: {insights['adaptation_effectiveness']:.1%}")

                        

                        if insights['goal_type_performance']:

                            print(f"  Goal Type Performance:")

                            for goal_type, perf in insights['goal_type_performance'].items():

                                print(f"    {goal_type}: {perf['average_time']:.1f}s avg ({perf['sample_count']} samples)")

                        

                        if insights['recommendations']:

                            print(f"  Recommendations:")

                            for rec in insights['recommendations']:

                                print(f"    - {rec}")

                        continue

                    

                    elif user_input.lower() == 'goals':

                        stats = self.goal_manager.get_statistics()

                        print(f"\nGoal Overview:")

                        print(f"  Total Goals: {stats['total_goals']}")

                        print(f"  Active Goals: {stats['active_goals']}")

                        print(f"  Completed Goals: {stats['completed_goals']}")

                        print(f"  Pending Goals: {stats['pending_goals']}")

                        

                        # Show recent goals

                        recent_goals = list(self.goal_manager.goals.values())[-5:]

                        if recent_goals:

                            print(f"\nRecent Goals:")

                            for goal in recent_goals:

                                print(f"    {goal.status.value}: {goal.description[:50]}...")

                        continue

                    

                    elif not user_input:

                        continue

                    

                    else:

                        print("Unknown command. Type 'quit' to exit or use one of the available commands.")

                

                except KeyboardInterrupt:

                    print("\n\nSession interrupted. Goodbye!")

                    break

                except Exception as e:

                    print(f"\nError: {e}")

        

        # Run async session

        asyncio.run(run_session())


def create_sample_goals_for_agentic_ai():

    """Create sample goals for agentic AI demonstration."""

    return [

        "Research the latest developments in artificial intelligence and machine learning",

        "Analyze the impact of climate change on global agriculture",

        "Synthesize information about renewable energy technologies and their effectiveness",

        "Investigate the role of quantum computing in cryptography and security",

        "Create a comprehensive report on space exploration missions planned for the next decade"

    ]


def main():

    """

    Main function demonstrating Agentic AI functionality.

    """

    try:

        # Initialize components

        print("Setting up Agentic AI System...")

        

        # Initialize device

        if torch.cuda.is_available():

            device = torch.device("cuda")

        elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():

            device = torch.device("mps")

        else:

            device = torch.device("cpu")

        

        print(f"Using device: {device}")

        

        # Initialize LLM (optional for this demo)

        try:

            print("Loading language model...")

            tokenizer = AutoTokenizer.from_pretrained("microsoft/DialoGPT-medium")

            model = AutoModelForCausalLM.from_pretrained("microsoft/DialoGPT-medium")

            

            if tokenizer.pad_token is None:

                tokenizer.pad_token = tokenizer.eos_token

            

            model.to(device)

            model.eval()

        except Exception as e:

            print(f"Warning: Could not load LLM: {e}")

            model = None

            tokenizer = None

        

        # Initialize knowledge base (optional)

        knowledge_base = None

        try:

            print("Creating knowledge base...")

            # This would import from previous chapters in a real implementation

            # For now, we'll use a placeholder

            knowledge_base = type('KnowledgeBase', (), {

                'search_relevant_context': lambda self, query, k=3: [

                    {"content": f"Sample context for: {query}", "source": "sample", "score": 0.8}

                ]

            })()

        except Exception as e:

            print(f"Warning: Could not create knowledge base: {e}")

        

        # Initialize Agentic AI system

        agentic_ai = AgenticAI(

            llm_model=model,

            tokenizer=tokenizer,

            device=device,

            knowledge_base=knowledge_base

        )

        

        # Show system status

        status = agentic_ai.get_system_status()

        print(f"\nAgentic AI System ready:")

        print(f"  Available agents: {status['agents_available']}")

        print(f"  System initialized successfully")

        

        # Demonstrate with sample goals

        print(f"\nDemonstrating with sample goals...")

        sample_goals = create_sample_goals_for_agentic_ai()

        

        async def demo():

            for i, goal_desc in enumerate(sample_goals[:2]):  # Demo with first 2 goals

                print(f"\n--- Demo Goal {i+1} ---")

                result = await agentic_ai.create_and_execute_goal(goal_desc)

                print(f"Result: {'Success' if result['execution_result']['success'] else 'Failed'}")

        

        # Run demo

        asyncio.run(demo())

        

        # Run interactive session

        agentic_ai.run_interactive_session()

        

    except Exception as e:

        print(f"Failed to initialize Agentic AI System: {e}")

        print("Please check your installation and try again.")


if __name__ == "__main__":

    main()



This comprehensive Agentic AI implementation demonstrates autonomous goal-oriented behavior through sophisticated planning, adaptive execution, and learning mechanisms. The system can independently pursue complex objectives, adapt to failures, and improve performance over time through experience.


CHAPTER F: IMPLEMENTING MODEL CONTEXT PROTOCOL (MCP)


The Model Context Protocol represents a standardized framework for enabling seamless communication between AI models and external tools, services, and data sources. Developed by Anthropic, MCP addresses the fundamental challenge of providing AI systems with controlled, secure access to real-time information and computational resources beyond their training data.


The rationale for implementing MCP stems from the limitations of isolated AI models that cannot access current information, execute code, or interact with external systems. MCP bridges this gap by establishing a standardized protocol for AI models to communicate with external resources through well-defined interfaces, enabling more capable and practical AI applications.


MCP architecture consists of several key components working together to enable secure and efficient model-tool communication. MCP servers expose specific capabilities and resources to AI models. MCP clients facilitate communication between models and servers. Protocol definitions specify message formats, authentication mechanisms, and capability discovery procedures. Resource management ensures secure access control and proper resource allocation.
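To make the protocol concrete: MCP messages are ordinary JSON-RPC 2.0. A hypothetical `tools/call` request and its matching response might look like the following (the tool name, arguments, and result text are illustrative only, not part of any real server):

```python
import json

# Hypothetical JSON-RPC 2.0 request an MCP client might send to invoke a tool.
# The method name "tools/call" follows the MCP specification; the tool name
# and arguments are made up for illustration.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {"name": "get_weather", "arguments": {"city": "Berlin"}},
}

# A matching success response carries the same id and a "result" payload.
response = {
    "jsonrpc": "2.0",
    "id": 1,
    "result": {"content": [{"type": "text", "text": "12 degrees C, cloudy"}]},
}

# On the wire, both sides exchange the serialized JSON form.
wire_request = json.dumps(request)
print(wire_request)
```

The `id` field is what lets a client match each response to the request that produced it, which is exactly the correlation mechanism the client implementation below relies on.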


The implementation of MCP requires understanding both client and server-side components, protocol specifications, and security considerations. We will build a complete MCP implementation that demonstrates both server and client functionality while maintaining compatibility with the official MCP specification.


Setting up an MCP development environment requires a few dependencies, most notably the websockets package used as the transport below, together with a working understanding of the protocol.


import asyncio

import json

import uuid

import time

from abc import ABC, abstractmethod

from dataclasses import dataclass, field

from enum import Enum

from typing import Dict, List, Any, Optional, Union, Callable

import websockets

import logging

from pathlib import Path


# MCP Protocol Version

MCP_VERSION = "2024-11-05"


class MCPMessageType(Enum):

    """MCP and JSON-RPC message types handled by this implementation."""

    INITIALIZE = "initialize"

    INITIALIZED = "initialized"

    PING = "ping"

    PONG = "pong"

    LIST_RESOURCES = "resources/list"

    LIST_TOOLS = "tools/list"

    CALL_TOOL = "tools/call"

    READ_RESOURCE = "resources/read"

    SUBSCRIBE = "resources/subscribe"

    UNSUBSCRIBE = "resources/unsubscribe"

    NOTIFICATION = "notification"

    ERROR = "error"


@dataclass

class MCPMessage:

    """Base MCP message structure."""

    jsonrpc: str = "2.0"

    id: Optional[Union[str, int]] = None

    method: Optional[str] = None

    params: Optional[Dict[str, Any]] = None

    result: Optional[Any] = None

    error: Optional[Dict[str, Any]] = None

    

    def to_dict(self) -> Dict[str, Any]:

        """Convert message to dictionary for JSON serialization."""

        message = {"jsonrpc": self.jsonrpc}

        

        if self.id is not None:

            message["id"] = self.id

        if self.method is not None:

            message["method"] = self.method

        if self.params is not None:

            message["params"] = self.params

        if self.result is not None:

            message["result"] = self.result

        if self.error is not None:

            message["error"] = self.error

            

        return message

    

    @classmethod

    def from_dict(cls, data: Dict[str, Any]) -> 'MCPMessage':

        """Create message from dictionary."""

        return cls(

            jsonrpc=data.get("jsonrpc", "2.0"),

            id=data.get("id"),

            method=data.get("method"),

            params=data.get("params"),

            result=data.get("result"),

            error=data.get("error")

        )
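One detail worth noting in `to_dict` is that unset fields are omitted entirely rather than serialized as null, which keeps the wire format minimal. A condensed copy of the dataclass (trimmed to the request-side fields, purely for demonstration) shows the round trip:

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Union

# Condensed copy of the MCPMessage dataclass above, reduced to the
# request-side fields, just enough to demonstrate the serialization
# round trip and the omission of unset fields.
@dataclass
class MCPMessage:
    jsonrpc: str = "2.0"
    id: Optional[Union[str, int]] = None
    method: Optional[str] = None
    params: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        out = {"jsonrpc": self.jsonrpc}
        for key in ("id", "method", "params"):
            value = getattr(self, key)
            if value is not None:  # unset fields never reach the wire
                out[key] = value
        return out

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MCPMessage":
        return cls(jsonrpc=data.get("jsonrpc", "2.0"), id=data.get("id"),
                   method=data.get("method"), params=data.get("params"))

# A ping request survives a dict round trip unchanged.
msg = MCPMessage(id=7, method="ping")
restored = MCPMessage.from_dict(msg.to_dict())
```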


@dataclass

class MCPResource:

    """MCP resource definition."""

    uri: str

    name: str

    description: Optional[str] = None

    mimeType: Optional[str] = None

    metadata: Optional[Dict[str, Any]] = None


@dataclass

class MCPTool:

    """MCP tool definition."""

    name: str

    description: str

    inputSchema: Dict[str, Any]

    metadata: Optional[Dict[str, Any]] = None


class MCPError(Exception):

    """MCP-specific error with error codes."""

    

    # Standard JSON-RPC error codes

    PARSE_ERROR = -32700

    INVALID_REQUEST = -32600

    METHOD_NOT_FOUND = -32601

    INVALID_PARAMS = -32602

    INTERNAL_ERROR = -32603

    

    # MCP-specific error codes

    RESOURCE_NOT_FOUND = -32001

    TOOL_NOT_FOUND = -32002

    ACCESS_DENIED = -32003

    

    def __init__(self, code: int, message: str, data: Any = None):

        self.code = code

        self.message = message

        self.data = data

        super().__init__(f"MCP Error {code}: {message}")

    

    def to_dict(self) -> Dict[str, Any]:

        """Convert error to dictionary format."""

        error = {"code": self.code, "message": self.message}

        if self.data is not None:

            error["data"] = self.data

        return error
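When an `MCPError` propagates back to a client, `to_dict` produces the standard JSON-RPC error object, embedded in a response envelope. A hypothetical wire-format example (the id and message text are illustrative):

```python
# Hypothetical JSON-RPC error envelope of the kind MCPError.to_dict()
# produces. Codes follow JSON-RPC 2.0 conventions (-32700 to -32603 are
# reserved by the spec); MCP-style codes such as -32002 signal
# domain-specific failures like a missing tool.
error_response = {
    "jsonrpc": "2.0",
    "id": 42,
    "error": {"code": -32002, "message": "Tool not found: get_weather"},
}
```

A response carries either `result` or `error`, never both, which is how `handle_message` above distinguishes success from failure.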



The MCP server implementation provides the foundation for exposing tools and resources to AI models. Servers must handle protocol negotiation, capability discovery, and secure resource access:



class MCPServer:

    """

    MCP Server implementation for exposing tools and resources.

    

    This server handles MCP protocol communication, manages resources

    and tools, and provides secure access control mechanisms.

    """

    

    def __init__(self, name: str, version: str = "1.0.0"):

        """

        Initialize MCP server.

        

        Args:

            name (str): Server name

            version (str): Server version

        """

        self.name = name

        self.version = version

        self.capabilities = {

            "resources": {},

            "tools": {},

            "logging": {}

        }

        

        # Resource and tool registries

        self.resources: Dict[str, MCPResource] = {}

        self.tools: Dict[str, MCPTool] = {}

        self.resource_handlers: Dict[str, Callable] = {}

        self.tool_handlers: Dict[str, Callable] = {}

        

        # Client connections

        self.clients: Dict[str, Dict[str, Any]] = {}

        

        # Server state

        self.is_initialized = False

        self.logger = logging.getLogger(f"mcp.server.{name}")

        

        # Register built-in handlers

        self._register_builtin_handlers()

    

    def _register_builtin_handlers(self):

        """Register built-in protocol handlers."""

        self.protocol_handlers = {

            MCPMessageType.INITIALIZE.value: self._handle_initialize,

            MCPMessageType.PING.value: self._handle_ping,

            MCPMessageType.LIST_RESOURCES.value: self._handle_list_resources,

            MCPMessageType.LIST_TOOLS.value: self._handle_list_tools,

            MCPMessageType.READ_RESOURCE.value: self._handle_read_resource,

            MCPMessageType.CALL_TOOL.value: self._handle_call_tool

        }

    

    def register_resource(self, resource: MCPResource, handler: Callable):

        """

        Register a resource with its handler.

        

        Args:

            resource (MCPResource): Resource definition

            handler (Callable): Function to handle resource requests

        """

        self.resources[resource.uri] = resource

        self.resource_handlers[resource.uri] = handler

        self.logger.info(f"Registered resource: {resource.uri}")

    

    def register_tool(self, tool: MCPTool, handler: Callable):

        """

        Register a tool with its handler.

        

        Args:

            tool (MCPTool): Tool definition

            handler (Callable): Function to handle tool calls

        """

        self.tools[tool.name] = tool

        self.tool_handlers[tool.name] = handler

        self.logger.info(f"Registered tool: {tool.name}")

    

    async def handle_message(self, message_data: str, client_id: str) -> Optional[str]:

        """

        Handle incoming MCP message.

        

        Args:

            message_data (str): JSON message data

            client_id (str): Client identifier

            

        Returns:

            Optional[str]: Response message or None

        """

        try:

            # Parse message

            data = json.loads(message_data)

            message = MCPMessage.from_dict(data)

            

            # Validate JSON-RPC format

            if message.jsonrpc != "2.0":

                raise MCPError(

                    MCPError.INVALID_REQUEST,

                    "Invalid JSON-RPC version"

                )

            

            # Handle different message types

            if message.method:

                # Request message

                return await self._handle_request(message, client_id)

            elif message.result is not None or message.error is not None:

                # Response message

                await self._handle_response(message, client_id)

                return None

            else:

                raise MCPError(

                    MCPError.INVALID_REQUEST,

                    "Invalid message format"

                )

        

        except json.JSONDecodeError:

            error_response = MCPMessage(

                id=None,

                error=MCPError(MCPError.PARSE_ERROR, "Parse error").to_dict()

            )

            return json.dumps(error_response.to_dict())

        

        except MCPError as e:

            error_response = MCPMessage(

                id=getattr(message, 'id', None) if 'message' in locals() else None,

                error=e.to_dict()

            )

            return json.dumps(error_response.to_dict())

        

        except Exception as e:

            self.logger.error(f"Unexpected error handling message: {e}")

            error_response = MCPMessage(

                id=getattr(message, 'id', None) if 'message' in locals() else None,

                error=MCPError(MCPError.INTERNAL_ERROR, str(e)).to_dict()

            )

            return json.dumps(error_response.to_dict())

    

    async def _handle_request(self, message: MCPMessage, client_id: str) -> str:

        """Handle request message."""

        method = message.method

        

        if method not in self.protocol_handlers:

            raise MCPError(

                MCPError.METHOD_NOT_FOUND,

                f"Method not found: {method}"

            )

        

        handler = self.protocol_handlers[method]

        

        try:

            result = await handler(message.params or {}, client_id)

            

            response = MCPMessage(

                id=message.id,

                result=result

            )

            

            return json.dumps(response.to_dict())

        

        except Exception as e:

            if isinstance(e, MCPError):

                raise

            else:

                raise MCPError(

                    MCPError.INTERNAL_ERROR,

                    f"Handler error: {str(e)}"

                )

    

    async def _handle_response(self, message: MCPMessage, client_id: str):

        """Handle response message."""

        # Store response for pending requests if needed

        if client_id in self.clients:

            self.clients[client_id]["last_response"] = message

    

    async def _handle_initialize(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:

        """Handle initialization request."""

        protocol_version = params.get("protocolVersion")

        client_info = params.get("clientInfo", {})

        

        # Validate protocol version

        if protocol_version != MCP_VERSION:

            self.logger.warning(f"Protocol version mismatch: {protocol_version} != {MCP_VERSION}")

        

        # Register client

        self.clients[client_id] = {

            "info": client_info,

            "initialized_at": time.time(),

            "protocol_version": protocol_version

        }

        

        self.is_initialized = True

        

        return {

            "protocolVersion": MCP_VERSION,

            "capabilities": self.capabilities,

            "serverInfo": {

                "name": self.name,

                "version": self.version

            }

        }

    

    async def _handle_ping(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:

        """Handle ping request."""

        return {}  # Pong response

    

    async def _handle_list_resources(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:

        """Handle list resources request."""

        resources = []

        

        for resource in self.resources.values():

            resource_dict = {

                "uri": resource.uri,

                "name": resource.name

            }

            

            if resource.description:

                resource_dict["description"] = resource.description

            if resource.mimeType:

                resource_dict["mimeType"] = resource.mimeType

            if resource.metadata:

                resource_dict["metadata"] = resource.metadata

            

            resources.append(resource_dict)

        

        return {"resources": resources}

    

    async def _handle_list_tools(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:

        """Handle list tools request."""

        tools = []

        

        for tool in self.tools.values():

            tool_dict = {

                "name": tool.name,

                "description": tool.description,

                "inputSchema": tool.inputSchema

            }

            

            if tool.metadata:

                tool_dict["metadata"] = tool.metadata

            

            tools.append(tool_dict)

        

        return {"tools": tools}

    

    async def _handle_read_resource(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:

        """Handle read resource request."""

        uri = params.get("uri")

        

        if not uri:

            raise MCPError(

                MCPError.INVALID_PARAMS,

                "Missing required parameter: uri"

            )

        

        if uri not in self.resources:

            raise MCPError(

                MCPError.RESOURCE_NOT_FOUND,

                f"Resource not found: {uri}"

            )

        

        if uri not in self.resource_handlers:

            raise MCPError(

                MCPError.INTERNAL_ERROR,

                f"No handler for resource: {uri}"

            )

        

        handler = self.resource_handlers[uri]

        resource = self.resources[uri]

        

        try:

            content = await handler(params)

            

            return {

                "contents": [{

                    "uri": uri,

                    "mimeType": resource.mimeType or "text/plain",

                    "text": content if isinstance(content, str) else json.dumps(content)

                }]

            }

        

        except Exception as e:

            raise MCPError(

                MCPError.INTERNAL_ERROR,

                f"Resource handler error: {str(e)}"

            )

    

    async def _handle_call_tool(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:

        """Handle tool call request."""

        name = params.get("name")

        arguments = params.get("arguments", {})

        

        if not name:

            raise MCPError(

                MCPError.INVALID_PARAMS,

                "Missing required parameter: name"

            )

        

        if name not in self.tools:

            raise MCPError(

                MCPError.TOOL_NOT_FOUND,

                f"Tool not found: {name}"

            )

        

        if name not in self.tool_handlers:

            raise MCPError(

                MCPError.INTERNAL_ERROR,

                f"No handler for tool: {name}"

            )

        

        handler = self.tool_handlers[name]

        

        try:

            result = await handler(arguments)

            

            return {

                "content": [{

                    "type": "text",

                    "text": result if isinstance(result, str) else json.dumps(result)

                }]

            }

        

        except Exception as e:

            raise MCPError(

                MCPError.INTERNAL_ERROR,

                f"Tool handler error: {str(e)}"

            )

    

    async def start_websocket_server(self, host: str = "localhost", port: int = 8765):

        """

        Start WebSocket server for MCP communication.

        

        Args:

            host (str): Server host

            port (int): Server port

        """

        # Note: websockets >= 13 calls the handler with only the connection
        # object; the default argument keeps this compatible with older
        # versions that also pass the request path.
        async def handle_websocket(websocket, path=None):

            client_id = str(uuid.uuid4())

            self.logger.info(f"Client connected: {client_id}")

            

            try:

                async for message in websocket:

                    response = await self.handle_message(message, client_id)

                    if response:

                        await websocket.send(response)

            

            except websockets.exceptions.ConnectionClosed:

                self.logger.info(f"Client disconnected: {client_id}")

            except Exception as e:

                self.logger.error(f"WebSocket error: {e}")

            finally:

                if client_id in self.clients:

                    del self.clients[client_id]

        

        self.logger.info(f"Starting MCP server on {host}:{port}")

        

        async with websockets.serve(handle_websocket, host, port):

            await asyncio.Future()  # Run forever
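To make tool registration concrete, here is a hypothetical tool definition and async handler of the shape `register_tool` expects. The tool name, schema, and handler logic are illustrative; in a real server you would wrap the definition in `MCPTool` and call `server.register_tool(...)` before starting the WebSocket server:

```python
import asyncio

# Hypothetical tool definition of the kind an MCP server exposes. The
# inputSchema is JSON Schema: it tells clients which arguments to send
# in a tools/call request.
add_tool = {
    "name": "add_numbers",
    "description": "Add two numbers and return the sum.",
    "inputSchema": {
        "type": "object",
        "properties": {
            "a": {"type": "number"},
            "b": {"type": "number"},
        },
        "required": ["a", "b"],
    },
}

async def add_numbers_handler(arguments):
    # Tool handlers receive the "arguments" object from a tools/call
    # request and return a JSON-serializable result.
    return {"sum": arguments["a"] + arguments["b"]}

# Invoking the handler directly, as _handle_call_tool would.
result = asyncio.run(add_numbers_handler({"a": 2, "b": 3}))
```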



The MCP client implementation enables AI models and applications to communicate with MCP servers, discover capabilities, and invoke tools or access resources:



class MCPClient:

    """

    MCP Client implementation for connecting to MCP servers.

    

    This client handles protocol communication, capability discovery,

    and provides high-level interfaces for tool and resource access.

    """

    

    def __init__(self, client_name: str, client_version: str = "1.0.0"):

        """

        Initialize MCP client.

        

        Args:

            client_name (str): Client name

            client_version (str): Client version

        """

        self.client_name = client_name

        self.client_version = client_version

        

        # Connection state

        self.websocket = None

        self.is_connected = False

        self.is_initialized = False

        

        # Server capabilities

        self.server_info = {}

        self.server_capabilities = {}

        self.available_resources = []

        self.available_tools = []

        

        # Request tracking

        self.pending_requests = {}

        self.next_request_id = 1

        

        self.logger = logging.getLogger(f"mcp.client.{client_name}")

    

    async def connect(self, uri: str):

        """

        Connect to MCP server.

        

        Args:

            uri (str): Server WebSocket URI

        """

        try:

            self.websocket = await websockets.connect(uri)

            self.is_connected = True

            self.logger.info(f"Connected to MCP server: {uri}")

            

            # Start message handler

            asyncio.create_task(self._message_handler())

            

        except Exception as e:

            self.logger.error(f"Failed to connect to server: {e}")

            raise

    

    async def disconnect(self):

        """Disconnect from MCP server."""

        if self.websocket:

            await self.websocket.close()

            self.is_connected = False

            self.is_initialized = False

            self.logger.info("Disconnected from MCP server")

    

    async def initialize(self) -> Dict[str, Any]:

        """

        Initialize connection with MCP server.

        

        Returns:

            Dict[str, Any]: Server initialization response

        """

        if not self.is_connected:

            raise RuntimeError("Not connected to server")

        

        params = {

            "protocolVersion": MCP_VERSION,

            "clientInfo": {

                "name": self.client_name,

                "version": self.client_version

            }

        }

        

        response = await self._send_request(MCPMessageType.INITIALIZE.value, params)

        

        self.server_info = response.get("serverInfo", {})

        self.server_capabilities = response.get("capabilities", {})

        self.is_initialized = True

        

        self.logger.info(f"Initialized with server: {self.server_info.get('name', 'unknown')}")

        

        # Discover available resources and tools

        await self._discover_capabilities()

        

        return response

    

    async def _discover_capabilities(self):

        """Discover server capabilities."""

        try:

            # List available resources

            if "resources" in self.server_capabilities:

                resources_response = await self._send_request(MCPMessageType.LIST_RESOURCES.value)

                self.available_resources = resources_response.get("resources", [])

                self.logger.info(f"Discovered {len(self.available_resources)} resources")

            

            # List available tools

            if "tools" in self.server_capabilities:

                tools_response = await self._send_request(MCPMessageType.LIST_TOOLS.value)

                self.available_tools = tools_response.get("tools", [])

                self.logger.info(f"Discovered {len(self.available_tools)} tools")

        

        except Exception as e:

            self.logger.warning(f"Failed to discover capabilities: {e}")

    

    async def _send_request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Any:

        """

        Send request to server and wait for response.

        

        Args:

            method (str): Method name

            params (Optional[Dict[str, Any]]): Request parameters

            

        Returns:

            Any: Response result

        """

        if not self.is_connected:

            raise RuntimeError("Not connected to server")

        

        request_id = self.next_request_id

        self.next_request_id += 1

        

        message = MCPMessage(

            id=request_id,

            method=method,

            params=params

        )

        

        # Create future for response

        response_future = asyncio.Future()

        self.pending_requests[request_id] = response_future

        

        try:

            # Send request

            await self.websocket.send(json.dumps(message.to_dict()))

            

            # Wait for response

            response = await asyncio.wait_for(response_future, timeout=30.0)

            

            if "error" in response:

                error = response["error"]

                raise MCPError(

                    error.get("code", MCPError.INTERNAL_ERROR),

                    error.get("message", "Unknown error"),

                    error.get("data")

                )

            

            return response.get("result")

        

        finally:

            # Clean up pending request

            if request_id in self.pending_requests:

                del self.pending_requests[request_id]

    

    async def _message_handler(self):

        """Handle incoming messages from server."""

        try:

            async for message_data in self.websocket:

                try:

                    data = json.loads(message_data)

                    message = MCPMessage.from_dict(data)

                    

                    # Handle response messages

                    if message.id is not None and message.id in self.pending_requests:

                        future = self.pending_requests[message.id]

                        if not future.done():

                            future.set_result(data)

                    

                    # Handle notification messages

                    elif message.method:

                        await self._handle_notification(message)

                

                except json.JSONDecodeError:

                    self.logger.error("Received invalid JSON message")

                except Exception as e:

                    self.logger.error(f"Error handling message: {e}")

        

        except websockets.exceptions.ConnectionClosed:

            self.logger.info("Server connection closed")

            self.is_connected = False

            self.is_initialized = False

        except Exception as e:

            self.logger.error(f"Message handler error: {e}")

    

    async def _handle_notification(self, message: MCPMessage):

        """Handle notification messages from server."""

        self.logger.info(f"Received notification: {message.method}")

    

    async def ping(self) -> bool:

        """

        Send ping to server.

        

        Returns:

            bool: True if server responded

        """

        try:

            await self._send_request(MCPMessageType.PING.value)

            return True

        except Exception:

            return False

    

    async def list_resources(self) -> List[Dict[str, Any]]:

        """

        List available resources.

        

        Returns:

            List[Dict[str, Any]]: List of available resources

        """

        if not self.is_initialized:

            raise RuntimeError("Client not initialized")

        

        response = await self._send_request(MCPMessageType.LIST_RESOURCES.value)

        return response.get("resources", [])

    

    async def list_tools(self) -> List[Dict[str, Any]]:

        """

        List available tools.

        

        Returns:

            List[Dict[str, Any]]: List of available tools

        """

        if not self.is_initialized:

            raise RuntimeError("Client not initialized")

        

        response = await self._send_request(MCPMessageType.LIST_TOOLS.value)

        return response.get("tools", [])

    

    async def read_resource(self, uri: str) -> str:

        """

        Read resource content.

        

        Args:

            uri (str): Resource URI

            

        Returns:

            str: Resource content

        """

        if not self.is_initialized:

            raise RuntimeError("Client not initialized")

        

        params = {"uri": uri}

        response = await self._send_request(MCPMessageType.READ_RESOURCE.value, params)

        

        contents = response.get("contents", [])

        if not contents:

            raise ValueError("No content returned")

        

        return contents[0].get("text", "")

    

    async def call_tool(self, name: str, arguments: Dict[str, Any] = None) -> str:

        """

        Call a tool.

        

        Args:

            name (str): Tool name

            arguments (Dict[str, Any]): Tool arguments

            

        Returns:

            str: Tool result

        """

        if not self.is_initialized:

            raise RuntimeError("Client not initialized")

        

        params = {

            "name": name,

            "arguments": arguments or {}

        }

        

        response = await self._send_request(MCPMessageType.CALL_TOOL.value, params)

        

        content = response.get("content", [])

        if not content:

            raise ValueError("No content returned")

        

        return content[0].get("text", "")

    

    def get_server_info(self) -> Dict[str, Any]:

        """Get server information."""

        return self.server_info.copy()

    

    def get_capabilities(self) -> Dict[str, Any]:

        """Get server capabilities."""

        return self.server_capabilities.copy()

    

    def get_available_resources(self) -> List[Dict[str, Any]]:

        """Get list of available resources."""

        return self.available_resources.copy()

    

    def get_available_tools(self) -> List[Dict[str, Any]]:

        """Get list of available tools."""

        return self.available_tools.copy()
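
Every helper in the client above ultimately reduces to a single JSON-RPC 2.0 frame sent over the WebSocket. Here is a minimal, self-contained sketch of that wire format, independent of the classes in this guide; the client name "demo-client" is a made-up placeholder, while the method name and params shape follow the initialize flow described above:

```python
import json

# Hedged sketch of the JSON-RPC 2.0 frame that an initialize() call puts on
# the wire. "demo-client" is a placeholder, not part of the guide's code.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": {
        "protocolVersion": "2024-11-05",
        "clientInfo": {"name": "demo-client", "version": "1.0.0"},
    },
}

wire = json.dumps(request)    # what websocket.send() transmits
echoed = json.loads(wire)     # what the server-side parser recovers

print(echoed["method"])
```

Because both sides serialize and parse the same dictionary shape, the round trip is lossless, which is what lets the client match responses back to pending requests by `id`.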



COMPLETE RUNNING EXAMPLE FOR MCP IMPLEMENTATION

import asyncio

import json

import uuid

import time

import logging

from abc import ABC, abstractmethod

from dataclasses import dataclass, field

from enum import Enum

from typing import Dict, List, Any, Optional, Union, Callable

import websockets

from pathlib import Path

import warnings

warnings.filterwarnings("ignore")


# Configure logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


# MCP Protocol Version

MCP_VERSION = "2024-11-05"


class MCPMessageType(Enum):

    """MCP message types as defined in the protocol specification."""

    INITIALIZE = "initialize"

    INITIALIZED = "initialized"

    PING = "ping"

    PONG = "pong"

    LIST_RESOURCES = "resources/list"

    LIST_TOOLS = "tools/list"

    CALL_TOOL = "tools/call"

    READ_RESOURCE = "resources/read"

    SUBSCRIBE = "resources/subscribe"

    UNSUBSCRIBE = "resources/unsubscribe"

    NOTIFICATION = "notification"

    ERROR = "error"


@dataclass

class MCPMessage:

    """

    Base MCP message structure following JSON-RPC 2.0 specification.

    

    This class represents the fundamental message format used in MCP

    communication between clients and servers.

    """

    jsonrpc: str = "2.0"

    id: Optional[Union[str, int]] = None

    method: Optional[str] = None

    params: Optional[Dict[str, Any]] = None

    result: Optional[Any] = None

    error: Optional[Dict[str, Any]] = None

    

    def to_dict(self) -> Dict[str, Any]:

        """Convert message to dictionary for JSON serialization."""

        message = {"jsonrpc": self.jsonrpc}

        

        if self.id is not None:

            message["id"] = self.id

        if self.method is not None:

            message["method"] = self.method

        if self.params is not None:

            message["params"] = self.params

        if self.result is not None:

            message["result"] = self.result

        if self.error is not None:

            message["error"] = self.error

            

        return message

    

    @classmethod

    def from_dict(cls, data: Dict[str, Any]) -> 'MCPMessage':

        """Create message from dictionary."""

        return cls(

            jsonrpc=data.get("jsonrpc", "2.0"),

            id=data.get("id"),

            method=data.get("method"),

            params=data.get("params"),

            result=data.get("result"),

            error=data.get("error")

        )


@dataclass

class MCPResource:

    """

    MCP resource definition.

    

    Resources represent data or content that can be accessed by clients

    through the MCP protocol.

    """

    uri: str

    name: str

    description: Optional[str] = None

    mimeType: Optional[str] = None

    metadata: Optional[Dict[str, Any]] = None


@dataclass

class MCPTool:

    """

    MCP tool definition.

    

    Tools represent executable functions that can be called by clients

    with specific input schemas and expected outputs.

    """

    name: str

    description: str

    inputSchema: Dict[str, Any]

    metadata: Optional[Dict[str, Any]] = None


class MCPError(Exception):

    """

    MCP-specific error with standardized error codes.

    

    This class provides structured error handling following JSON-RPC

    error code conventions with MCP-specific extensions.

    """

    

    # Standard JSON-RPC error codes

    PARSE_ERROR = -32700

    INVALID_REQUEST = -32600

    METHOD_NOT_FOUND = -32601

    INVALID_PARAMS = -32602

    INTERNAL_ERROR = -32603

    

    # MCP-specific error codes

    RESOURCE_NOT_FOUND = -32001

    TOOL_NOT_FOUND = -32002

    ACCESS_DENIED = -32003

    

    def __init__(self, code: int, message: str, data: Any = None):

        """

        Initialize MCP error.

        

        Args:

            code (int): Error code

            message (str): Error message

            data (Any): Additional error data

        """

        self.code = code

        self.message = message

        self.data = data

        super().__init__(f"MCP Error {code}: {message}")

    

    def to_dict(self) -> Dict[str, Any]:

        """Convert error to dictionary format."""

        error = {"code": self.code, "message": self.message}

        if self.data is not None:

            error["data"] = self.data

        return error


class MCPServer:

    """

    Complete MCP Server implementation.

    

    This server provides a full implementation of the Model Context Protocol,

    enabling AI models to access tools and resources through standardized

    communication patterns.

    """

    

    def __init__(self, name: str, version: str = "1.0.0"):

        """

        Initialize MCP server.

        

        Args:

            name (str): Server name

            version (str): Server version

        """

        self.name = name

        self.version = version

        self.capabilities = {

            "resources": {

                "subscribe": False,

                "listChanged": False

            },

            "tools": {

                "listChanged": False

            },

            "logging": {}

        }

        

        # Resource and tool registries

        self.resources: Dict[str, MCPResource] = {}

        self.tools: Dict[str, MCPTool] = {}

        self.resource_handlers: Dict[str, Callable] = {}

        self.tool_handlers: Dict[str, Callable] = {}

        

        # Client connections

        self.clients: Dict[str, Dict[str, Any]] = {}

        

        # Server state

        self.is_initialized = False

        self.logger = logging.getLogger(f"mcp.server.{name}")

        

        # Register built-in handlers

        self._register_builtin_handlers()

        

        # Register sample tools and resources

        self._register_sample_capabilities()

    

    def _register_builtin_handlers(self):

        """Register built-in protocol handlers."""

        self.protocol_handlers = {

            MCPMessageType.INITIALIZE.value: self._handle_initialize,

            MCPMessageType.PING.value: self._handle_ping,

            MCPMessageType.LIST_RESOURCES.value: self._handle_list_resources,

            MCPMessageType.LIST_TOOLS.value: self._handle_list_tools,

            MCPMessageType.READ_RESOURCE.value: self._handle_read_resource,

            MCPMessageType.CALL_TOOL.value: self._handle_call_tool

        }

    

    def _register_sample_capabilities(self):

        """Register sample tools and resources for demonstration."""

        # Sample calculator tool

        calculator_tool = MCPTool(

            name="calculator",

            description="Perform basic mathematical calculations",

            inputSchema={

                "type": "object",

                "properties": {

                    "expression": {

                        "type": "string",

                        "description": "Mathematical expression to evaluate"

                    }

                },

                "required": ["expression"]

            }

        )

        self.register_tool(calculator_tool, self._calculator_handler)

        

        # Sample text analyzer tool

        text_analyzer_tool = MCPTool(

            name="text_analyzer",

            description="Analyze text for various metrics",

            inputSchema={

                "type": "object",

                "properties": {

                    "text": {

                        "type": "string",

                        "description": "Text to analyze"

                    },

                    "metrics": {

                        "type": "array",

                        "items": {"type": "string"},

                        "description": "Metrics to calculate (word_count, char_count, sentiment)"

                    }

                },

                "required": ["text"]

            }

        )

        self.register_tool(text_analyzer_tool, self._text_analyzer_handler)

        

        # Sample system info resource

        system_info_resource = MCPResource(

            uri="system://info",

            name="System Information",

            description="Current system information and status",

            mimeType="application/json"

        )

        self.register_resource(system_info_resource, self._system_info_handler)

        

        # Sample time resource

        time_resource = MCPResource(

            uri="system://time",

            name="Current Time",

            description="Current system time in various formats",

            mimeType="application/json"

        )

        self.register_resource(time_resource, self._time_handler)

    

    async def _calculator_handler(self, arguments: Dict[str, Any]) -> str:

        """Handle calculator tool calls."""

        expression = arguments.get("expression", "")

        

        try:

            # Simple expression evaluation (be careful in production!)

            # This is a simplified example - use a proper math parser in production

            allowed_chars = set("0123456789+-*/().")

            if not all(c in allowed_chars or c.isspace() for c in expression):

                return "Error: Invalid characters in expression"

            

            result = eval(expression)

            return f"Result: {expression} = {result}"

        

        except Exception as e:

            return f"Error: {str(e)}"

    

    async def _text_analyzer_handler(self, arguments: Dict[str, Any]) -> str:

        """Handle text analyzer tool calls."""

        text = arguments.get("text", "")

        metrics = arguments.get("metrics", ["word_count", "char_count"])

        

        analysis = {}

        

        if "word_count" in metrics:

            analysis["word_count"] = len(text.split())

        

        if "char_count" in metrics:

            analysis["char_count"] = len(text)

        

        if "sentence_count" in metrics:

            analysis["sentence_count"] = len([s for s in text.split('.') if s.strip()])

        

        if "sentiment" in metrics:

            # Simple sentiment analysis (placeholder)

            positive_words = ["good", "great", "excellent", "amazing", "wonderful", "fantastic"]

            negative_words = ["bad", "terrible", "awful", "horrible", "disappointing"]

            

            text_lower = text.lower()

            positive_count = sum(1 for word in positive_words if word in text_lower)

            negative_count = sum(1 for word in negative_words if word in text_lower)

            

            if positive_count > negative_count:

                sentiment = "positive"

            elif negative_count > positive_count:

                sentiment = "negative"

            else:

                sentiment = "neutral"

            

            analysis["sentiment"] = sentiment

            analysis["positive_indicators"] = positive_count

            analysis["negative_indicators"] = negative_count

        

        return json.dumps(analysis, indent=2)

    

    async def _system_info_handler(self, params: Dict[str, Any]) -> str:

        """Handle system info resource requests."""

        import platform

        try:

            import psutil  # import inside the try block so the ImportError fallback below is reachable

            system_info = {

                "platform": platform.platform(),

                "system": platform.system(),

                "processor": platform.processor(),

                "python_version": platform.python_version(),

                "cpu_count": psutil.cpu_count(),

                "memory_total": psutil.virtual_memory().total,

                "memory_available": psutil.virtual_memory().available,

                "disk_usage": {

                    "total": psutil.disk_usage('/').total,

                    "used": psutil.disk_usage('/').used,

                    "free": psutil.disk_usage('/').free

                },

                "timestamp": time.time()

            }

        except ImportError:

            # Fallback if psutil is not available

            system_info = {

                "platform": platform.platform(),

                "system": platform.system(),

                "python_version": platform.python_version(),

                "timestamp": time.time(),

                "note": "Limited system info (psutil not available)"

            }

        

        return json.dumps(system_info, indent=2)

    

    async def _time_handler(self, params: Dict[str, Any]) -> str:

        """Handle time resource requests."""

        import datetime

        

        now = datetime.datetime.now()

        utc_now = datetime.datetime.now(datetime.timezone.utc)  # utcnow() is deprecated since Python 3.12

        

        time_info = {

            "local_time": now.isoformat(),

            "utc_time": utc_now.isoformat(),

            "timestamp": time.time(),

            "timezone": str(now.astimezone().tzinfo),

            "formatted": {

                "date": now.strftime("%Y-%m-%d"),

                "time": now.strftime("%H:%M:%S"),

                "datetime": now.strftime("%Y-%m-%d %H:%M:%S"),

                "iso": now.isoformat()

            }

        }

        

        return json.dumps(time_info, indent=2)

    

    def register_resource(self, resource: MCPResource, handler: Callable):

        """

        Register a resource with its handler.

        

        Args:

            resource (MCPResource): Resource definition

            handler (Callable): Function to handle resource requests

        """

        self.resources[resource.uri] = resource

        self.resource_handlers[resource.uri] = handler

        self.logger.info(f"Registered resource: {resource.uri}")

    

    def register_tool(self, tool: MCPTool, handler: Callable):

        """

        Register a tool with its handler.

        

        Args:

            tool (MCPTool): Tool definition

            handler (Callable): Function to handle tool calls

        """

        self.tools[tool.name] = tool

        self.tool_handlers[tool.name] = handler

        self.logger.info(f"Registered tool: {tool.name}")

    

    async def handle_message(self, message_data: str, client_id: str) -> Optional[str]:

        """

        Handle incoming MCP message.

        

        Args:

            message_data (str): JSON message data

            client_id (str): Client identifier

            

        Returns:

            Optional[str]: Response message or None

        """

        try:

            # Parse message

            data = json.loads(message_data)

            message = MCPMessage.from_dict(data)

            

            # Validate JSON-RPC format

            if message.jsonrpc != "2.0":

                raise MCPError(

                    MCPError.INVALID_REQUEST,

                    "Invalid JSON-RPC version"

                )

            

            # Handle different message types

            if message.method:

                # Request message

                return await self._handle_request(message, client_id)

            elif message.result is not None or message.error is not None:

                # Response message

                await self._handle_response(message, client_id)

                return None

            else:

                raise MCPError(

                    MCPError.INVALID_REQUEST,

                    "Invalid message format"

                )

        

        except json.JSONDecodeError:

            error_response = MCPMessage(

                id=None,

                error=MCPError(MCPError.PARSE_ERROR, "Parse error").to_dict()

            )

            return json.dumps(error_response.to_dict())

        

        except MCPError as e:

            error_response = MCPMessage(

                id=getattr(message, 'id', None) if 'message' in locals() else None,

                error=e.to_dict()

            )

            return json.dumps(error_response.to_dict())

        

        except Exception as e:

            self.logger.error(f"Unexpected error handling message: {e}")

            error_response = MCPMessage(

                id=getattr(message, 'id', None) if 'message' in locals() else None,

                error=MCPError(MCPError.INTERNAL_ERROR, str(e)).to_dict()

            )

            return json.dumps(error_response.to_dict())

    

    async def _handle_request(self, message: MCPMessage, client_id: str) -> str:

        """Handle request message."""

        method = message.method

        

        if method not in self.protocol_handlers:

            raise MCPError(

                MCPError.METHOD_NOT_FOUND,

                f"Method not found: {method}"

            )

        

        handler = self.protocol_handlers[method]

        

        try:

            result = await handler(message.params or {}, client_id)

            

            response = MCPMessage(

                id=message.id,

                result=result

            )

            

            return json.dumps(response.to_dict())

        

        except Exception as e:

            if isinstance(e, MCPError):

                raise

            else:

                raise MCPError(

                    MCPError.INTERNAL_ERROR,

                    f"Handler error: {str(e)}"

                )

    

    async def _handle_response(self, message: MCPMessage, client_id: str):

        """Handle response message."""

        # Store response for pending requests if needed

        if client_id in self.clients:

            self.clients[client_id]["last_response"] = message

    

    async def _handle_initialize(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:

        """Handle initialization request."""

        protocol_version = params.get("protocolVersion")

        client_info = params.get("clientInfo", {})

        

        # Validate protocol version

        if protocol_version != MCP_VERSION:

            self.logger.warning(f"Protocol version mismatch: {protocol_version} != {MCP_VERSION}")

        

        # Register client

        self.clients[client_id] = {

            "info": client_info,

            "initialized_at": time.time(),

            "protocol_version": protocol_version

        }

        

        self.is_initialized = True

        self.logger.info(f"Client initialized: {client_info.get('name', 'unknown')}")

        

        return {

            "protocolVersion": MCP_VERSION,

            "capabilities": self.capabilities,

            "serverInfo": {

                "name": self.name,

                "version": self.version

            }

        }

    

    async def _handle_ping(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:

        """Handle ping request."""

        return {}  # Pong response

    

    async def _handle_list_resources(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:

        """Handle list resources request."""

        resources = []

        

        for resource in self.resources.values():

            resource_dict = {

                "uri": resource.uri,

                "name": resource.name

            }

            

            if resource.description:

                resource_dict["description"] = resource.description

            if resource.mimeType:

                resource_dict["mimeType"] = resource.mimeType

            if resource.metadata:

                resource_dict["metadata"] = resource.metadata

            

            resources.append(resource_dict)

        

        return {"resources": resources}

    

    async def _handle_list_tools(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:

        """Handle list tools request."""

        tools = []

        

        for tool in self.tools.values():

            tool_dict = {

                "name": tool.name,

                "description": tool.description,

                "inputSchema": tool.inputSchema

            }

            

            if tool.metadata:

                tool_dict["metadata"] = tool.metadata

            

            tools.append(tool_dict)

        

        return {"tools": tools}

    

    async def _handle_read_resource(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:

        """Handle read resource request."""

        uri = params.get("uri")

        

        if not uri:

            raise MCPError(

                MCPError.INVALID_PARAMS,

                "Missing required parameter: uri"

            )

        

        if uri not in self.resources:

            raise MCPError(

                MCPError.RESOURCE_NOT_FOUND,

                f"Resource not found: {uri}"

            )

        

        if uri not in self.resource_handlers:

            raise MCPError(

                MCPError.INTERNAL_ERROR,

                f"No handler for resource: {uri}"

            )

        

        handler = self.resource_handlers[uri]

        resource = self.resources[uri]

        

        try:

            content = await handler(params)

            

            return {

                "contents": [{

                    "uri": uri,

                    "mimeType": resource.mimeType or "text/plain",

                    "text": content if isinstance(content, str) else json.dumps(content)

                }]

            }

        

        except Exception as e:

            raise MCPError(

                MCPError.INTERNAL_ERROR,

                f"Resource handler error: {str(e)}"

            )

    

    async def _handle_call_tool(self, params: Dict[str, Any], client_id: str) -> Dict[str, Any]:

        """Handle tool call request."""

        name = params.get("name")

        arguments = params.get("arguments", {})

        

        if not name:

            raise MCPError(

                MCPError.INVALID_PARAMS,

                "Missing required parameter: name"

            )

        

        if name not in self.tools:

            raise MCPError(

                MCPError.TOOL_NOT_FOUND,

                f"Tool not found: {name}"

            )

        

        if name not in self.tool_handlers:

            raise MCPError(

                MCPError.INTERNAL_ERROR,

                f"No handler for tool: {name}"

            )

        

        handler = self.tool_handlers[name]

        

        try:

            result = await handler(arguments)

            

            return {

                "content": [{

                    "type": "text",

                    "text": result if isinstance(result, str) else json.dumps(result)

                }]

            }

        

        except Exception as e:

            raise MCPError(

                MCPError.INTERNAL_ERROR,

                f"Tool handler error: {str(e)}"

            )

    

    async def start_websocket_server(self, host: str = "localhost", port: int = 8765):

        """

        Start WebSocket server for MCP communication.

        

        Args:

            host (str): Server host

            port (int): Server port

        """

        async def handle_websocket(websocket):  # websockets >= 11 passes only the connection, no path argument

            client_id = str(uuid.uuid4())

            self.logger.info(f"Client connected: {client_id}")

            

            try:

                async for message in websocket:

                    response = await self.handle_message(message, client_id)

                    if response:

                        await websocket.send(response)

            

            except websockets.exceptions.ConnectionClosed:

                self.logger.info(f"Client disconnected: {client_id}")

            except Exception as e:

                self.logger.error(f"WebSocket error: {e}")

            finally:

                if client_id in self.clients:

                    del self.clients[client_id]

        

        self.logger.info(f"Starting MCP server on {host}:{port}")

        print(f"MCP Server '{self.name}' starting on ws://{host}:{port}")

        print(f"Available tools: {list(self.tools.keys())}")

        print(f"Available resources: {list(self.resources.keys())}")

        

        async with websockets.serve(handle_websocket, host, port):

            await asyncio.Future()  # Run forever


class MCPClient:

    """

    Complete MCP Client implementation.

    

    This client provides a full implementation of the Model Context Protocol

    for connecting to MCP servers, discovering capabilities, and invoking

    tools or accessing resources.

    """

    

    def __init__(self, client_name: str, client_version: str = "1.0.0"):

        """

        Initialize MCP client.

        

        Args:

            client_name (str): Client name

            client_version (str): Client version

        """

        self.client_name = client_name

        self.client_version = client_version

        

        # Connection state

        self.websocket = None

        self.is_connected = False

        self.is_initialized = False

        

        # Server capabilities

        self.server_info = {}

        self.server_capabilities = {}

        self.available_resources = []

        self.available_tools = []

        

        # Request tracking

        self.pending_requests = {}

        self.next_request_id = 1

        

        self.logger = logging.getLogger(f"mcp.client.{client_name}")

    

    async def connect(self, uri: str):

        """

        Connect to MCP server.

        

        Args:

            uri (str): Server WebSocket URI

        """

        try:

            self.websocket = await websockets.connect(uri)

            self.is_connected = True

            self.logger.info(f"Connected to MCP server: {uri}")

            

            # Start message handler

            self._handler_task = asyncio.create_task(self._message_handler())  # keep a reference so the task is not garbage-collected

            

        except Exception as e:

            self.logger.error(f"Failed to connect to server: {e}")

            raise

    

    async def disconnect(self):

        """Disconnect from MCP server."""

        if self.websocket:

            await self.websocket.close()

            self.is_connected = False

            self.is_initialized = False

            self.logger.info("Disconnected from MCP server")

    

    async def initialize(self) -> Dict[str, Any]:

        """

        Initialize connection with MCP server.

        

        Returns:

            Dict[str, Any]: Server initialization response

        """

        if not self.is_connected:

            raise RuntimeError("Not connected to server")

        

        params = {

            "protocolVersion": MCP_VERSION,

            "clientInfo": {

                "name": self.client_name,

                "version": self.client_version

            }

        }

        

        response = await self._send_request(MCPMessageType.INITIALIZE.value, params)

        

        self.server_info = response.get("serverInfo", {})

        self.server_capabilities = response.get("capabilities", {})

        self.is_initialized = True

        

        self.logger.info(f"Initialized with server: {self.server_info.get('name', 'unknown')}")

        

        # Discover available resources and tools

        await self._discover_capabilities()

        

        return response

    

    async def _discover_capabilities(self):

        """Discover server capabilities."""

        try:

            # List available resources

            if "resources" in self.server_capabilities:

                resources_response = await self._send_request(MCPMessageType.LIST_RESOURCES.value)

                self.available_resources = resources_response.get("resources", [])

                self.logger.info(f"Discovered {len(self.available_resources)} resources")

            

            # List available tools

            if "tools" in self.server_capabilities:

                tools_response = await self._send_request(MCPMessageType.LIST_TOOLS.value)

                self.available_tools = tools_response.get("tools", [])

                self.logger.info(f"Discovered {len(self.available_tools)} tools")

        

        except Exception as e:

            self.logger.warning(f"Failed to discover capabilities: {e}")

    

    async def _send_request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Any:

        """

        Send request to server and wait for response.

        

        Args:

            method (str): Method name

            params (Optional[Dict[str, Any]]): Request parameters

            

        Returns:

            Any: Response result

        """

        if not self.is_connected:

            raise RuntimeError("Not connected to server")

        

        request_id = self.next_request_id

        self.next_request_id += 1

        

        message = MCPMessage(

            id=request_id,

            method=method,

            params=params

        )

        

        # Create future for response

        response_future = asyncio.Future()

        self.pending_requests[request_id] = response_future

        

        try:

            # Send request

            await self.websocket.send(json.dumps(message.to_dict()))

            

            # Wait for response

            response = await asyncio.wait_for(response_future, timeout=30.0)

            

            if "error" in response:

                error = response["error"]

                raise MCPError(

                    error.get("code", MCPError.INTERNAL_ERROR),

                    error.get("message", "Unknown error"),

                    error.get("data")

                )

            

            return response.get("result")

        

        finally:

            # Clean up pending request

            if request_id in self.pending_requests:

                del self.pending_requests[request_id]

    

    async def _message_handler(self):

        """Handle incoming messages from server."""

        try:

            async for message_data in self.websocket:

                try:

                    data = json.loads(message_data)

                    message = MCPMessage.from_dict(data)

                    

                    # Handle response messages

                    if message.id is not None and message.id in self.pending_requests:

                        future = self.pending_requests[message.id]

                        if not future.done():

                            future.set_result(data)

                    

                    # Handle notification messages

                    elif message.method:

                        await self._handle_notification(message)

                

                except json.JSONDecodeError:

                    self.logger.error("Received invalid JSON message")

                except Exception as e:

                    self.logger.error(f"Error handling message: {e}")

        

        except websockets.exceptions.ConnectionClosed:

            self.logger.info("Server connection closed")

            self.is_connected = False

            self.is_initialized = False

        except Exception as e:

            self.logger.error(f"Message handler error: {e}")

    

    async def _handle_notification(self, message: MCPMessage):

        """Handle notification messages from server."""

        self.logger.info(f"Received notification: {message.method}")

    

    async def ping(self) -> bool:

        """

        Send ping to server.

        

        Returns:

            bool: True if server responded

        """

        try:

            await self._send_request(MCPMessageType.PING.value)

            return True

        except Exception:

            return False

    

    async def list_resources(self) -> List[Dict[str, Any]]:

        """

        List available resources.

        

        Returns:

            List[Dict[str, Any]]: List of available resources

        """

        if not self.is_initialized:

            raise RuntimeError("Client not initialized")

        

        response = await self._send_request(MCPMessageType.LIST_RESOURCES.value)

        return response.get("resources", [])

    

    async def list_tools(self) -> List[Dict[str, Any]]:

        """

        List available tools.

        

        Returns:

            List[Dict[str, Any]]: List of available tools

        """

        if not self.is_initialized:

            raise RuntimeError("Client not initialized")

        

        response = await self._send_request(MCPMessageType.LIST_TOOLS.value)

        return response.get("tools", [])

    

    async def read_resource(self, uri: str) -> str:

        """

        Read resource content.

        

        Args:

            uri (str): Resource URI

            

        Returns:

            str: Resource content

        """

        if not self.is_initialized:

            raise RuntimeError("Client not initialized")

        

        params = {"uri": uri}

        response = await self._send_request(MCPMessageType.READ_RESOURCE.value, params)

        

        contents = response.get("contents", [])

        if not contents:

            raise ValueError("No content returned")

        

        return contents[0].get("text", "")

    

    async def call_tool(self, name: str, arguments: Optional[Dict[str, Any]] = None) -> str:

        """

        Call a tool.

        

        Args:

            name (str): Tool name

            arguments (Dict[str, Any]): Tool arguments

            

        Returns:

            str: Tool result

        """

        if not self.is_initialized:

            raise RuntimeError("Client not initialized")

        

        params = {

            "name": name,

            "arguments": arguments or {}

        }

        

        response = await self._send_request(MCPMessageType.CALL_TOOL.value, params)

        

        content = response.get("content", [])

        if not content:

            raise ValueError("No content returned")

        

        return content[0].get("text", "")

    

    def get_server_info(self) -> Dict[str, Any]:

        """Get server information."""

        return self.server_info.copy()

    

    def get_capabilities(self) -> Dict[str, Any]:

        """Get server capabilities."""

        return self.server_capabilities.copy()

    

    def get_available_resources(self) -> List[Dict[str, Any]]:

        """Get list of available resources."""

        return self.available_resources.copy()

    

    def get_available_tools(self) -> List[Dict[str, Any]]:

        """Get list of available tools."""

        return self.available_tools.copy()

    

    async def interactive_session(self):

        """Run interactive session with MCP server."""

        print(f"\n{'='*60}")

        print("MCP CLIENT INTERACTIVE SESSION")

        print(f"{'='*60}")

        print(f"Connected to: {self.server_info.get('name', 'Unknown Server')}")

        print(f"Server version: {self.server_info.get('version', 'Unknown')}")

        print(f"Available tools: {len(self.available_tools)}")

        print(f"Available resources: {len(self.available_resources)}")

        print("\nCommands:")

        print("  'tools' - List available tools")

        print("  'resources' - List available resources")

        print("  'call <tool_name> <args>' - Call a tool")

        print("  'read <resource_uri>' - Read a resource")

        print("  'ping' - Ping server")

        print("  'info' - Show server info")

        print("  'quit' - Exit session")

        print(f"{'='*60}")

        

        while True:

            try:

                # Run blocking input() in a worker thread so the event loop
                # can keep servicing incoming WebSocket messages meanwhile
                command = (await asyncio.to_thread(input, "\nMCP> ")).strip()

                

                if command.lower() in ['quit', 'exit']:

                    print("Goodbye!")

                    break

                

                elif command == 'tools':

                    tools = await self.list_tools()

                    print(f"\nAvailable Tools ({len(tools)}):")

                    for tool in tools:

                        print(f"  - {tool['name']}: {tool['description']}")

                

                elif command == 'resources':

                    resources = await self.list_resources()

                    print(f"\nAvailable Resources ({len(resources)}):")

                    for resource in resources:

                        print(f"  - {resource['uri']}: {resource['name']}")

                        if 'description' in resource:

                            print(f"    {resource['description']}")

                

                elif command.startswith('call '):

                    parts = command[5:].split(' ', 1)

                    tool_name = parts[0]

                    

                    if len(parts) > 1:

                        try:

                            args = json.loads(parts[1])

                        except json.JSONDecodeError:

                            # Simple argument parsing

                            args = {"expression": parts[1]} if tool_name == "calculator" else {"text": parts[1]}

                    else:

                        args = {}

                    

                    try:

                        result = await self.call_tool(tool_name, args)

                        print(f"\nTool Result:\n{result}")

                    except Exception as e:

                        print(f"Error calling tool: {e}")

                

                elif command.startswith('read '):

                    uri = command[5:].strip()

                    try:

                        content = await self.read_resource(uri)

                        print(f"\nResource Content:\n{content}")

                    except Exception as e:

                        print(f"Error reading resource: {e}")

                

                elif command == 'ping':

                    if await self.ping():

                        print("Pong! Server is responsive.")

                    else:

                        print("Ping failed - server may be unresponsive.")

                

                elif command == 'info':

                    print(f"\nServer Information:")

                    print(f"  Name: {self.server_info.get('name', 'Unknown')}")

                    print(f"  Version: {self.server_info.get('version', 'Unknown')}")

                    print(f"  Protocol Version: {MCP_VERSION}")

                    print(f"  Capabilities: {list(self.server_capabilities.keys())}")

                

                elif command == '':

                    continue

                

                else:

                    print("Unknown command. Type 'quit' to exit.")

            

            except KeyboardInterrupt:

                print("\nGoodbye!")

                break

            except Exception as e:

                print(f"Error: {e}")


class MCPDemo:

    """

    Demonstration class for MCP server and client functionality.

    

    This class provides a complete demonstration of MCP capabilities

    including server setup, client connection, and interactive usage.

    """

    

    def __init__(self):

        """Initialize MCP demo."""

        self.server = None

        self.client = None

        self.server_task = None

    

    async def start_server(self, host: str = "localhost", port: int = 8765):

        """Start MCP server."""

        self.server = MCPServer("Demo MCP Server", "1.0.0")

        

        # Start server in background

        self.server_task = asyncio.create_task(

            self.server.start_websocket_server(host, port)

        )

        

        # Give server time to start

        await asyncio.sleep(1)

        

        return f"ws://{host}:{port}"

    

    async def start_client(self, server_uri: str):

        """Start MCP client and connect to server."""

        self.client = MCPClient("Demo MCP Client", "1.0.0")

        

        await self.client.connect(server_uri)

        await self.client.initialize()

        

        return self.client

    

    async def demo_basic_functionality(self):

        """Demonstrate basic MCP functionality."""

        print("\n" + "="*60)

        print("MCP BASIC FUNCTIONALITY DEMO")

        print("="*60)

        

        # Test ping

        print("\n1. Testing server connectivity...")

        ping_result = await self.client.ping()

        print(f"   Ping result: {'Success' if ping_result else 'Failed'}")

        

        # List tools

        print("\n2. Listing available tools...")

        tools = await self.client.list_tools()

        for tool in tools:

            print(f"   - {tool['name']}: {tool['description']}")

        

        # List resources

        print("\n3. Listing available resources...")

        resources = await self.client.list_resources()

        for resource in resources:

            print(f"   - {resource['uri']}: {resource['name']}")

        

        # Test calculator tool

        print("\n4. Testing calculator tool...")

        calc_result = await self.client.call_tool("calculator", {"expression": "2 + 3 * 4"})

        print(f"   Calculator result: {calc_result}")

        

        # Test text analyzer tool

        print("\n5. Testing text analyzer tool...")

        text_result = await self.client.call_tool("text_analyzer", {

            "text": "This is a wonderful example of text analysis!",

            "metrics": ["word_count", "char_count", "sentiment"]

        })

        print(f"   Text analysis result:\n{text_result}")

        

        # Test system info resource

        print("\n6. Reading system info resource...")

        system_info = await self.client.read_resource("system://info")

        print(f"   System info (first 200 chars):\n{system_info[:200]}...")

        

        # Test time resource

        print("\n7. Reading time resource...")

        time_info = await self.client.read_resource("system://time")

        print(f"   Time info:\n{time_info}")

        

        print("\n" + "="*60)

        print("DEMO COMPLETED SUCCESSFULLY")

        print("="*60)

    

    async def run_interactive_demo(self):

        """Run interactive demo."""

        try:

            # Start server

            print("Starting MCP server...")

            server_uri = await self.start_server()

            

            # Start client

            print("Starting MCP client...")

            await self.start_client(server_uri)

            

            # Run basic demo

            await self.demo_basic_functionality()

            

            # Run interactive session

            await self.client.interactive_session()

        

        except KeyboardInterrupt:

            print("\nDemo interrupted by user")

        except Exception as e:

            print(f"Demo error: {e}")

        finally:

            await self.cleanup()

    

    async def cleanup(self):

        """Clean up resources."""

        if self.client:

            await self.client.disconnect()

        

        if self.server_task:

            self.server_task.cancel()

            try:

                await self.server_task

            except asyncio.CancelledError:

                pass


def main():

    """

    Main function demonstrating MCP functionality.

    """

    print("Model Context Protocol (MCP) Implementation Demo")

    print("This demo shows both MCP server and client functionality")

    

    # Check if websockets is available

    try:

        import websockets

    except ImportError:

        print("\nError: websockets library is required for MCP demo")

        print("Please install it with: pip install websockets")

        return

    

    # Run demo

    demo = MCPDemo()

    

    try:

        asyncio.run(demo.run_interactive_demo())

    except KeyboardInterrupt:

        print("\nDemo terminated by user")

    except Exception as e:

        print(f"Demo failed: {e}")


if __name__ == "__main__":

    main()



This MCP implementation provides a complete, working foundation for building Model Context Protocol servers and clients. The system demonstrates the key MCP concepts, including protocol negotiation, capability discovery, tool invocation, resource access, and error handling. One caveat: the official MCP specification defines stdio and HTTP-based transports, so the WebSocket transport used here is best treated as a demonstration-friendly simplification rather than a guarantee of drop-in compatibility with other MCP implementations.
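One pattern worth highlighting is how `_send_request` and `_message_handler` correlate responses with requests over a single socket: each outgoing id is mapped to an `asyncio.Future` that the reader side resolves when the matching response arrives. Here is a minimal, self-contained sketch of that pattern, with the server response simulated in-process:

```python
import asyncio

# Outgoing request id -> Future awaiting the matching response
pending = {}

async def send_request(request_id):
    # Register a Future, then block until the reader side resolves it
    fut = asyncio.get_running_loop().create_future()
    pending[request_id] = fut
    try:
        return await asyncio.wait_for(fut, timeout=5.0)
    finally:
        pending.pop(request_id, None)  # always clean up, even on timeout

def on_message(data):
    # Reader side: match the response to its pending request by id
    fut = pending.get(data.get("id"))
    if fut is not None and not fut.done():
        fut.set_result(data)

async def demo():
    task = asyncio.create_task(send_request(1))
    await asyncio.sleep(0)               # let send_request register its Future
    on_message({"id": 1, "result": 42})  # simulated server response
    return await task

print(asyncio.run(demo()))  # {'id': 1, 'result': 42}
```

The `finally` cleanup mirrors the implementation above: a timed-out or failed request must not leave a stale entry in the pending map.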


CONCLUSIONS


This comprehensive guide has taken you through the complete journey of implementing Large Language Model-based systems, from basic chatbots to sophisticated agentic AI and standardized protocol implementations. Each chapter built upon the previous foundations, demonstrating how modern AI systems can evolve from simple question-answering interfaces to autonomous, goal-oriented agents capable of complex reasoning and external tool integration.


TECHNICAL ACHIEVEMENTS AND LEARNING OUTCOMES


Throughout this tutorial, we have accomplished several significant technical milestones. We began with a fundamental understanding of LLM integration, learning how to properly initialize models, manage GPU acceleration across different hardware platforms (NVIDIA CUDA, AMD ROCm, Apple MPS), and implement robust conversation handling with memory management. This foundation proved essential for all subsequent implementations.


The progression from basic chatbots to Retrieval-Augmented Generation systems demonstrated the critical importance of external knowledge integration. RAG systems address one of the most significant limitations of standalone LLMs: their inability to access current information or domain-specific knowledge beyond their training data. Our implementation showed how semantic search, embedding generation, and vector databases work together to provide contextually relevant information to language models.
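The retrieval step at the heart of RAG can be sketched in a few lines. In this toy example, bag-of-words word counts stand in for a real embedding model, cosine similarity ranks documents, and the document names and contents are invented:

```python
import math
from collections import Counter

def embed(text):
    # Toy stand-in for a real embedding model: bag-of-words term counts
    return Counter(text.lower().split())

def cosine(a, b):
    dot = sum(a[t] * b[t] for t in a)
    norm = (math.sqrt(sum(v * v for v in a.values()))
            * math.sqrt(sum(v * v for v in b.values())))
    return dot / norm if norm else 0.0

docs = {
    "gpu.txt": "cuda kernels run on nvidia gpus for fast inference",
    "rag.txt": "retrieval augmented generation retrieves relevant context documents",
}
# The "vector store": precomputed embeddings keyed by document name
index = {name: embed(text) for name, text in docs.items()}

def retrieve(query, k=1):
    q = embed(query)
    ranked = sorted(index, key=lambda n: cosine(q, index[n]), reverse=True)
    return ranked[:k]

print(retrieve("retrieve relevant context documents for generation"))  # ['rag.txt']
```

A production system would swap in a learned embedding model and an approximate nearest-neighbor index, but the retrieve-then-augment flow is the same.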


GraphRAG represented a significant advancement in knowledge representation and reasoning capabilities. By incorporating graph-based knowledge structures, we learned how to model complex relationships between entities and enable multi-hop reasoning that traditional RAG systems cannot achieve. This approach proves particularly valuable for queries requiring understanding of interconnected information and complex relationship patterns.
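The kind of multi-hop query GraphRAG enables can be illustrated with a toy knowledge graph and a breadth-first path search; the entities and relations below are chosen purely for illustration:

```python
from collections import deque

# Tiny knowledge graph: entity -> list of (relation, neighbor) edges
graph = {
    "Marie Curie": [("won", "Nobel Prize in Physics"), ("worked_at", "Sorbonne")],
    "Nobel Prize in Physics": [("awarded_by", "Royal Swedish Academy")],
    "Sorbonne": [("located_in", "Paris")],
}

def multi_hop(start, target):
    """Breadth-first search returning the relation path linking two entities."""
    queue = deque([(start, [])])
    seen = {start}
    while queue:
        node, path = queue.popleft()
        if node == target:
            return path
        for rel, nxt in graph.get(node, []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append((nxt, path + [(node, rel, nxt)]))
    return None

print(multi_hop("Marie Curie", "Paris"))
# [('Marie Curie', 'worked_at', 'Sorbonne'), ('Sorbonne', 'located_in', 'Paris')]
```

A flat RAG index could retrieve documents mentioning either entity, but only the graph traversal surfaces the two-hop chain connecting them.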


The multi-agent systems chapter introduced collaborative AI architectures where specialized agents work together to solve complex problems. This paradigm shift from monolithic models to collaborative networks demonstrates how task decomposition and specialization can lead to superior performance and more maintainable systems. The coordination mechanisms, message passing protocols, and result aggregation strategies provide a foundation for building scalable AI systems.
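A minimal message-passing sketch captures the coordination pattern: a coordinator posts subtasks on one `asyncio.Queue`, worker "agents" consume them and report on another, and the coordinator aggregates the results. The agent names and jobs here are invented:

```python
import asyncio

async def worker(name, tasks, results):
    # Each worker consumes subtasks until it receives the shutdown sentinel
    while True:
        job = await tasks.get()
        if job is None:
            break
        await results.put(f"{name} did {job}")

async def coordinate(jobs, n_workers=2):
    tasks, results = asyncio.Queue(), asyncio.Queue()
    workers = [asyncio.create_task(worker(f"agent{i}", tasks, results))
               for i in range(n_workers)]
    for job in jobs:                     # task decomposition / dispatch
        await tasks.put(job)
    collected = [await results.get() for _ in jobs]  # result aggregation
    for _ in workers:                    # one sentinel per worker
        await tasks.put(None)
    await asyncio.gather(*workers)
    return sorted(collected)

print(asyncio.run(coordinate(["research", "summarize"])))
```

Which worker picks up which job is nondeterministic; the queues decouple the coordinator from the workers, which is exactly what makes the pattern scale out.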


Agentic AI pushed the boundaries further by implementing autonomous goal-oriented behavior. The combination of goal management, planning engines, and adaptive execution demonstrated how AI systems can operate independently while learning from experience and adapting to changing circumstances. This represents a significant step toward truly autonomous AI assistants capable of pursuing long-term objectives.
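The goal-oriented control loop can be reduced to a toy sketch: select the highest-priority unmet goal, execute its plan steps, and record outcomes. The goal names and steps below are invented for illustration:

```python
from dataclasses import dataclass

@dataclass
class Goal:
    name: str
    priority: int
    steps: list
    done: bool = False

def run_agent(goals):
    """Execute goals in priority order, logging each plan step taken."""
    log = []
    for goal in sorted(goals, key=lambda g: -g.priority):
        for step in goal.steps:
            log.append(f"{goal.name}:{step}")  # a real agent would act here
        goal.done = True
    return log

goals = [Goal("tidy-notes", 1, ["collect", "summarize"]),
         Goal("ship-report", 2, ["draft", "review"])]
print(run_agent(goals))
# ['ship-report:draft', 'ship-report:review', 'tidy-notes:collect', 'tidy-notes:summarize']
```

A full agentic system replaces the fixed step lists with an LLM-driven planner and feeds execution outcomes back into goal selection; the loop structure stays the same.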


Finally, the Model Context Protocol implementation provided a standardized framework for AI-tool integration. MCP addresses the critical need for secure, controlled access to external resources while maintaining compatibility across different AI systems and tool providers. This standardization enables the creation of robust AI ecosystems where models can safely interact with external services.


ARCHITECTURAL INSIGHTS AND DESIGN PATTERNS


Several key architectural patterns emerged throughout our implementations that prove essential for building robust LLM-based systems. The modular design approach, consistently applied across all chapters, enables maintainability, testability, and extensibility. Each component maintains clear interfaces and separation of concerns, allowing for independent development and testing.


Error handling and graceful degradation proved crucial for production-ready systems. Our implementations demonstrate comprehensive error handling strategies, from network failures and model errors to resource limitations and invalid inputs. The fallback mechanisms ensure that systems remain functional even when individual components fail.
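The fallback idea can be sketched as a tiny decorator: try the primary path, and degrade to a secondary one on failure. The function names here are hypothetical:

```python
def with_fallback(primary, fallback):
    """Wrap primary so that any exception degrades gracefully to fallback."""
    def wrapped(*args):
        try:
            return primary(*args)
        except Exception:
            return fallback(*args)
    return wrapped

def flaky(query):
    # Stands in for a model or network call that can fail
    raise ConnectionError("model endpoint down")

answer = with_fallback(flaky, lambda q: f"cached answer for {q}")
print(answer("ping"))  # cached answer for ping
```

In practice the fallback might be a smaller local model, a cached response, or a polite error message, but the structure is the same.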


Memory management and context window handling represent critical considerations for LLM applications. Our implementations show various strategies for managing conversation history, from simple sliding windows to sophisticated context compression techniques. These approaches ensure optimal performance while maintaining relevant context for meaningful interactions.
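A minimal sliding-window trimmer illustrates the simplest of these strategies. Whitespace word counts stand in for a real tokenizer, and messages use the usual role/content dict shape:

```python
def trim_history(messages, max_tokens,
                 count_tokens=lambda m: len(m["content"].split())):
    """Keep the system prompt plus the most recent turns that fit the budget."""
    system, turns = messages[0], messages[1:]
    kept, used = [], count_tokens(system)
    for msg in reversed(turns):          # walk newest -> oldest
        cost = count_tokens(msg)
        if used + cost > max_tokens:
            break
        kept.append(msg)
        used += cost
    return [system] + list(reversed(kept))

history = [
    {"role": "system", "content": "be concise"},
    {"role": "user", "content": "one two three four five"},
    {"role": "user", "content": "six seven"},
]
print(trim_history(history, max_tokens=5))
```

Walking newest-to-oldest guarantees the most recent exchange always survives trimming; context compression would instead summarize the dropped turns rather than discard them.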


The asynchronous programming patterns used throughout enable efficient resource utilization and responsive user experiences. The async/await paradigms, particularly evident in the multi-agent and MCP implementations, demonstrate how to build scalable systems that can handle multiple concurrent operations without blocking.
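The payoff of this style is easy to demonstrate: awaiting several I/O-bound calls concurrently takes roughly as long as the slowest one, not their sum. In this sketch, `asyncio.sleep` stands in for network-bound model calls:

```python
import asyncio
import time

async def fake_llm_call(name, delay):
    await asyncio.sleep(delay)   # stands in for waiting on a model endpoint
    return name

async def run_concurrent():
    start = time.monotonic()
    # gather runs all three awaitables concurrently and preserves order
    results = await asyncio.gather(*(fake_llm_call(f"req{i}", 0.1)
                                     for i in range(3)))
    return results, time.monotonic() - start

results, elapsed = asyncio.run(run_concurrent())
print(results, round(elapsed, 1))  # three 0.1 s calls finish in roughly 0.1 s
```

Sequential awaits would take about 0.3 s here; `gather` overlaps the waits, which is why the multi-agent and MCP code above leans on it so heavily.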


PERFORMANCE CONSIDERATIONS AND OPTIMIZATION


Performance optimization emerged as a recurring theme across all implementations. Hardware acceleration through proper GPU utilization significantly impacts inference speed and system responsiveness. Our implementations demonstrate how to detect and utilize available hardware acceleration while providing CPU fallbacks for broader compatibility.


Caching strategies prove essential for production deployments. From embedding caches in RAG systems to response caches in multi-agent architectures, intelligent caching reduces computational overhead and improves response times. The trade-offs between memory usage and computational efficiency require careful consideration based on specific deployment requirements.
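A minimal embedding cache shows the idea: key each chunk by a content hash so repeated chunks are embedded only once. The `embed_fn` below is a toy stand-in for a real model call:

```python
import hashlib

class EmbeddingCache:
    """Memoizes embeddings by content hash so repeated chunks compute once."""
    def __init__(self, embed_fn):
        self.embed_fn = embed_fn
        self.store = {}
        self.hits = 0

    def get(self, text):
        key = hashlib.sha256(text.encode()).hexdigest()
        if key in self.store:
            self.hits += 1           # served from cache, no model call
        else:
            self.store[key] = self.embed_fn(text)
        return self.store[key]

calls = []
cache = EmbeddingCache(lambda t: (calls.append(t), [float(len(t))])[1])
cache.get("hello"); cache.get("hello"); cache.get("world")
print(len(calls), cache.hits)  # 2 1
```

Hashing by content (rather than by document id) means identical chunks across different documents also share one cached embedding, at the cost of holding every vector in memory until an eviction policy is added.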


Batch processing and parallel execution, particularly evident in the multi-agent and agentic AI implementations, demonstrate how to maximize throughput while maintaining system responsiveness. The planning engines show how to identify parallelizable operations and optimize execution order for maximum efficiency.


SECURITY AND ETHICAL CONSIDERATIONS


Security considerations permeate all implementations, from input validation and sanitization to secure resource access in MCP servers. The principle of least privilege guides access control mechanisms, ensuring that AI systems can only access resources necessary for their intended functions.
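The least-privilege idea can be sketched as an allow-list guard checked before any tool invocation; the roles and tool names below are hypothetical:

```python
# Each agent role may invoke only the tools it was explicitly granted
ALLOWED_TOOLS = {
    "reader": {"read_resource"},
    "analyst": {"read_resource", "calculator"},
}

def authorize(role, tool):
    """Raise PermissionError unless the role was granted access to the tool."""
    allowed = ALLOWED_TOOLS.get(role, set())  # unknown roles get nothing
    if tool not in allowed:
        raise PermissionError(f"role {role!r} may not call {tool!r}")
    return True

assert authorize("analyst", "calculator")
try:
    authorize("reader", "calculator")
except PermissionError as e:
    print(e)
```

Defaulting unknown roles to an empty set is the important detail: access must be granted explicitly, never assumed.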


The MCP implementation particularly emphasizes security through standardized authentication, controlled resource access, and comprehensive error handling. These patterns provide templates for building secure AI-tool integrations that protect both user data and system resources.


Ethical considerations around AI autonomy become increasingly important as systems become more sophisticated. The agentic AI implementation includes monitoring and intervention mechanisms that allow human oversight of autonomous operations. These safeguards ensure that autonomous systems remain aligned with human intentions and values.


SCALABILITY AND PRODUCTION READINESS


All implementations include considerations for production deployment and scalability. The modular architectures enable horizontal scaling through distributed deployments. The multi-agent systems naturally support distributed execution across multiple machines or containers.


Monitoring and observability features, including performance metrics, execution statistics, and learning insights, provide the visibility necessary for production operations. These capabilities enable proactive maintenance, performance optimization, and system improvement over time.


The configuration management and deployment patterns demonstrated throughout the guide provide foundations for robust production deployments. Environment-specific configurations, resource management, and graceful shutdown procedures ensure reliable operation in production environments.


FUTURE DIRECTIONS AND EXTENSIBILITY


The implementations provide extensible foundations for future enhancements and research directions. The modular architectures enable easy integration of new models, tools, and capabilities as they become available. The standardized interfaces facilitate interoperability between different system components and external services.


The learning mechanisms embedded in the agentic AI implementation demonstrate how systems can improve over time through experience. These foundations enable more sophisticated learning algorithms and adaptation strategies as the field advances.


The MCP framework provides a pathway for ecosystem development where different AI systems, tools, and services can interoperate through standardized protocols. This standardization enables the creation of rich AI ecosystems that extend far beyond individual implementations.


PRACTICAL IMPACT AND REAL-WORLD APPLICATIONS


The systems demonstrated in this guide have immediate practical applications across numerous domains. RAG systems enable AI assistants with access to current information and domain-specific knowledge bases. Multi-agent systems provide frameworks for complex workflow automation and collaborative problem-solving.


Agentic AI systems offer the potential for truly autonomous assistants capable of pursuing long-term goals with minimal human intervention. The MCP framework enables secure integration with existing tools and services, making AI systems more practical and useful in real-world environments.


The educational value of these implementations extends beyond their immediate functionality. The step-by-step approach, comprehensive explanations, and production-ready code provide templates and patterns that developers can adapt for their specific needs and requirements.


FINAL REFLECTIONS


This comprehensive exploration of LLM-based system implementation demonstrates the rapid evolution and immense potential of modern AI technologies. From basic conversational interfaces to autonomous agents and standardized protocols, we have witnessed how thoughtful architecture and implementation can create powerful, practical AI systems.


The journey from simple chatbots to sophisticated agentic AI illustrates the importance of incremental development and solid foundations. Each chapter built upon previous concepts while introducing new capabilities, demonstrating how complex systems emerge from well-designed components working in harmony.


The emphasis on production-ready code, comprehensive error handling, and real-world considerations ensures that these implementations serve not just as educational examples but as practical foundations for building robust AI systems. The clean architecture principles, extensive documentation, and modular design enable developers to understand, modify, and extend these systems for their specific needs.


As the field of AI continues to evolve rapidly, the patterns, principles, and implementations presented in this guide provide a solid foundation for future development. The modular architectures, standardized interfaces, and extensible designs ensure that these systems can adapt and grow with advancing technology.


The future of AI lies not in isolated models but in collaborative, autonomous systems that can work together to solve complex problems while maintaining human oversight and alignment. The implementations in this guide provide stepping stones toward that future, demonstrating how thoughtful engineering can harness the power of large language models to create truly useful and practical AI systems.


Whether you are building simple chatbots or complex autonomous agents, the principles, patterns, and implementations presented here provide the foundation for creating robust, scalable, and maintainable AI systems that can make a real difference in the world. The journey from basic LLM integration to sophisticated agentic AI represents not just technological advancement but a pathway toward more capable, useful, and trustworthy artificial intelligence.