Wednesday, January 21, 2026

Building an Intelligent Agentic AI System: Your Personalized Knowledge Navigator and Assistant


 

In today's fast-paced environment, staying informed and efficient is paramount. This article outlines the architecture and implementation of a powerful Agentic AI system designed to be your personal knowledge navigator and assistant. This system will not only answer your specific questions by leveraging a continuously updated knowledge base but also proactively expand its understanding of critical subjects relevant to your work.


Imagine an AI assistant that understands the nuances of your projects, keeps itself updated on the latest industry trends, and helps you sift through vast amounts of information with ease. That's precisely what we're building here. This Agentic AI system is engineered to handle user prompts on predefined subjects, drawing insights from a rich, dynamically updated knowledge base. It employs sophisticated techniques like Retrieval-Augmented Generation (RAG) with semantic chunking and advanced reranking to deliver accurate and highly relevant information. Furthermore, it intelligently manages its knowledge, automatically seeking out new information from the web and seamlessly integrating documents you provide.


Let's dive into the fascinating world of Agentic AI and explore how to construct such a robust and intelligent system.


1. Core Components of Our Agentic AI System


Our Agentic AI system is a symphony of interconnected modules, each playing a crucial role in its overall intelligence and functionality. Understanding these components is key to appreciating the system's capabilities.


  • Configuration Management: This foundational component defines the subjects the agent specializes in and specifies which Large Language Model (LLM), be it local or remote, the system should utilize for its reasoning and generation tasks. It acts as the central control panel for the agent's operational parameters.
  • Document Ingestion and Processing Pipeline: This vital pipeline is responsible for taking raw information from various sources, such as PDF files, HTML pages, and Markdown documents, and transforming it into a format suitable for retrieval. It involves crucial steps like parsing, semantic chunking, and deduplication to ensure data quality and efficiency.
  • Vector Database and Embeddings: At the heart of the system's knowledge storage lies the vector database. This component stores numerical representations, known as embeddings, of the processed document chunks. These embeddings allow for rapid and semantically meaningful similarity searches, enabling the system to find the most relevant pieces of information for any given query.
  • Retrieval-Augmented Generation (RAG) with Reranking: This is the brain of our question-answering mechanism. When a user asks a question, the RAG system first retrieves relevant information from the vector database. Crucially, it then employs sophisticated reranking strategies to refine these retrieved results, ensuring that only the most pertinent and accurate information is passed to the LLM for generating a comprehensive and contextually rich answer.
  • Agent Orchestration: This module acts as the conductor of our AI orchestra. It manages the flow of information, directs user prompts to the appropriate tools (like the RAG system or web search), and integrates the responses from the LLM to formulate a coherent and helpful answer for the user. It embodies the "agentic" nature by making decisions on how to best fulfill a user's request.
  • Knowledge Base Update Mechanisms: To remain effective, our agent must continuously learn and adapt. This component encompasses two key mechanisms for updating its knowledge:
      ◦ Web Search for Relevant News and Papers: The agent can autonomously search the internet for the latest information on its configured subjects, ensuring its knowledge is always current.
      ◦ User-Added Documents: Users can contribute their own documents by simply placing them in a designated local folder, and the system will automatically process and integrate them.
  • Scheduling for Automated Updates: Beyond user-initiated updates, the system incorporates a scheduler that periodically triggers the knowledge base update process. This ensures that the agent's understanding of its subjects remains fresh and comprehensive, even without explicit user intervention.


2. Deep Dive into Each Component: Implementation Details

Let's explore each component in detail, including practical examples and code snippets to illustrate their functionality. We will use "Sustainable Manufacturing Practices" as our running example subject throughout this section.

2.1. Configuration Management


The configuration file is the central hub for defining the agent's behavior. It specifies the subjects the agent will focus on, the LLM it should use, and various paths for data storage. This separation of configuration from code allows for easy modification and scalability.


We will use a simple Python file, `config.py`, to store these settings.


Example: `config.py`


# config.py


import os


# Define the base directory for all data

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

DATA_DIR = os.path.join(BASE_DIR, "data")

DOCUMENTS_DIR = os.path.join(DATA_DIR, "documents")

VECTOR_DB_DIR = os.path.join(DATA_DIR, "vector_db")

PROCESSED_DOCS_METADATA_PATH = os.path.join(DATA_DIR, "processed_docs_metadata.json")


# Ensure directories exist

os.makedirs(DOCUMENTS_DIR, exist_ok=True)

os.makedirs(VECTOR_DB_DIR, exist_ok=True)


# Define the subjects the Agentic AI should be knowledgeable about.

# These subjects will guide web searches and contextual understanding.

AGENT_SUBJECTS = [

    "Sustainable Manufacturing Practices",

    "Industry 4.0 Technologies in Production",

    "Circular Economy in Engineering",

    "Energy Efficiency in Industrial Processes"

]


# Large Language Model (LLM) Configuration

# You can specify a local LLM (e.g., via Ollama) or a remote one (e.g., OpenAI).

# For local LLMs, ensure your Ollama server is running.

LLM_CONFIG = {

    "provider": "ollama",  # Options: "ollama", "openai", "huggingface_local"

    "model_name": "llama3", # e.g., "llama3", "gpt-3.5-turbo", "mistral"

    "api_base": "http://localhost:11434", # Only for "ollama" or self-hosted APIs

    "api_key": "YOUR_OPENAI_API_KEY" # Only for "openai"

}


# Embedding Model Configuration

# Used to convert text into numerical vectors for the vector database.

EMBEDDING_MODEL_CONFIG = {

    "provider": "huggingface_local", # Options: "huggingface_local", "openai"

    "model_name": "sentence-transformers/all-MiniLM-L6-v2", # Local model path or name

    "api_key": "YOUR_OPENAI_API_KEY" # Only for "openai"

}


# Reranker Model Configuration

# Used to re-rank retrieved documents for higher relevance.

RERANKER_MODEL_CONFIG = {

    "provider": "huggingface_local", # Options: "huggingface_local", "cohere"

    "model_name": "cross-encoder/ms-marco-MiniLM-L-6-v2", # Local cross-encoder model

    "api_key": "YOUR_COHERE_API_KEY" # Only for "cohere"

}


# Web Scraping Configuration

WEB_SCRAPE_CONFIG = {

    "search_engine_api_key": "YOUR_SERPER_API_KEY", # e.g., Serper, SerpApi, or custom

    "search_engine_base_url": "https://google.serper.dev/search",

    "max_search_results": 5,

    "scrape_timeout": 10 # seconds

}


# Knowledge Base Update Schedule

# Interval in hours for automated web-based knowledge updates.

AUTO_UPDATE_INTERVAL_HOURS = 24


# File Watcher Configuration

# Directory where users can place documents for automatic processing.

USER_UPLOAD_FOLDER = os.path.join(DATA_DIR, "user_uploads")

os.makedirs(USER_UPLOAD_FOLDER, exist_ok=True)


In this configuration, we establish paths for storing documents and the vector database, define the core subjects of interest, and specify the details for the LLM, embedding model, and reranker. We also include settings for web scraping and the automated update schedule.


2.2. Document Ingestion and Processing Pipeline


This pipeline is crucial for transforming raw documents into a usable format for the RAG system. It involves loading various file types, semantically chunking their content, and generating embeddings.


2.2.1. Document Loaders (PDF, HTML, Markdown)


The system needs to handle diverse document formats, so we'll create a specialized loader for each type. For robustness, we'll use libraries like `pypdf` for PDFs, `BeautifulSoup` for HTML, and `markdown-it-py` for Markdown files.


Example: `document_loaders.py` (snippet for PDF loading)


# document_loaders.py (snippet)

import os

from pypdf import PdfReader


class PDFLoader:

    """

    A class responsible for loading text content from PDF files.

    """

    def load(self, file_path: str) -> str:

        """

        Loads and extracts all text content from a specified PDF file.


        Args:

            file_path (str): The full path to the PDF file.


        Returns:

            str: The concatenated text content from all pages of the PDF.

                 Returns an empty string if the file cannot be read or

                 contains no text.

        """

        if not os.path.exists(file_path):

            print(f"Warning: PDF file not found at {file_path}")

            return ""

        try:

            reader = PdfReader(file_path)

            text_content = ""

            for page in reader.pages:

                text_content += page.extract_text() + "\n"

            return text_content

        except Exception as e:

            print(f"Error loading PDF {file_path}: {e}")

            return ""


# Similar loaders would be implemented for HTML and Markdown.

# For HTML, you'd use BeautifulSoup to parse and extract visible text.

# For Markdown, you'd render it to HTML with markdown-it-py and then extract the text.
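

For completeness, here is a minimal sketch of the remaining two loaders. It assumes `beautifulsoup4` and `markdown-it-py` are installed (both appear in the prerequisites); the exact cleanup steps are illustrative, and the class names simply mirror the `PDFLoader` above, following the same pattern as the loaders in the addendum's `document_loaders.py`.


# document_loaders.py (snippet, continued)
from bs4 import BeautifulSoup
from markdown_it import MarkdownIt


class HTMLLoader:
    """Loads and extracts visible text content from HTML files."""

    def load(self, file_path: str) -> str:
        if not os.path.exists(file_path):
            print(f"Warning: HTML file not found at {file_path}")
            return ""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                soup = BeautifulSoup(f.read(), 'html.parser')
            # Drop non-content elements before extracting the text.
            for tag in soup(["script", "style", "header", "footer", "nav"]):
                tag.decompose()
            return soup.get_text(separator="\n", strip=True)
        except Exception as e:
            print(f"Error loading HTML {file_path}: {e}")
            return ""


class MarkdownLoader:
    """Loads Markdown files by rendering them to HTML and stripping the tags."""

    def load(self, file_path: str) -> str:
        if not os.path.exists(file_path):
            print(f"Warning: Markdown file not found at {file_path}")
            return ""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                html = MarkdownIt().render(f.read())
            return BeautifulSoup(html, 'html.parser').get_text(separator="\n", strip=True)
        except Exception as e:
            print(f"Error loading Markdown {file_path}: {e}")
            return ""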



2.2.2. Semantic Chunking


Traditional text splitting often breaks documents at arbitrary character counts, potentially separating related ideas. Semantic chunking instead keeps semantically related sentences or paragraphs together, even if that means chunk sizes vary, which significantly improves retrieval quality. A common approach is to split the text into smaller units (such as sentences), embed them, and then group adjacent units whose embeddings are sufficiently similar. For simplicity, this example uses LangChain's `RecursiveCharacterTextSplitter`, which is a good starting point for preserving context; a sketch of the sentence-level approach follows the code below.


Example: `document_processor.py` (snippet for chunking)


# document_processor.py (snippet)

from typing import List, Dict, Any

from langchain.text_splitter import RecursiveCharacterTextSplitter


class DocumentProcessor:

    """

    Handles the chunking and embedding generation for document content.

    """

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):

        """

        Initializes the DocumentProcessor with chunking parameters.


        Args:

            chunk_size (int): The maximum number of characters in a chunk.

            chunk_overlap (int): The number of characters to overlap between

                                 consecutive chunks to maintain context.

        """

        self.text_splitter = RecursiveCharacterTextSplitter(

            chunk_size=chunk_size,

            chunk_overlap=chunk_overlap,

            length_function=len,

            is_separator_regex=False,

        )


    def chunk_document(self, doc_content: str, metadata: Dict[str, Any]) -> List[Dict[str, Any]]:

        """

        Splits a document's content into smaller, semantically coherent chunks.


        Args:

            doc_content (str): The full text content of the document.

            metadata (Dict[str, Any]): Metadata associated with the document

                                       (e.g., source, title, file_path).


        Returns:

            List[Dict[str, Any]]: A list of dictionaries, where each dictionary

                                  represents a chunk with its content and

                                  associated metadata.

        """

        # LangChain's text splitter returns Document objects, we convert them

        # to our desired dictionary format.

        chunks = self.text_splitter.create_documents([doc_content])

        processed_chunks = []

        for i, chunk in enumerate(chunks):

            chunk_metadata = metadata.copy()

            chunk_metadata["chunk_id"] = f"{metadata.get('doc_id', 'unknown')}_chunk_{i}"

            chunk_metadata["text"] = chunk.page_content

            processed_chunks.append(chunk_metadata)

        return processed_chunks


# For true semantic chunking, one would typically:

# 1. Split text into smaller units (e.g., sentences).

# 2. Generate embeddings for each unit.

# 3. Group adjacent units whose embeddings are sufficiently similar,

#    or use a sliding window approach with embedding similarity checks.

# Libraries like LlamaIndex offer more advanced semantic chunking strategies

# that could be integrated here for even better performance.
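

To make these comments concrete, here is a standalone sketch of sentence-level semantic chunking. It is illustrative only and is not wired into the `DocumentProcessor` above; the naive sentence splitting and the 0.6 similarity threshold are assumptions you would replace with a proper sentence splitter (e.g., nltk or spaCy) and a tuned threshold.


# semantic_chunker.py (illustrative sketch, not used elsewhere in this article)
from typing import List

import numpy as np
from sentence_transformers import SentenceTransformer


def semantic_chunk(text: str,
                   model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
                   similarity_threshold: float = 0.6) -> List[str]:
    """
    Groups adjacent sentences into chunks for as long as consecutive sentences
    stay semantically similar (cosine similarity above the threshold).
    """
    # Naive sentence splitting; a real implementation would use nltk or spaCy.
    sentences = [s.strip() for s in text.replace("\n", " ").split(". ") if s.strip()]
    if not sentences:
        return []

    model = SentenceTransformer(model_name)
    embeddings = model.encode(sentences, convert_to_numpy=True)
    # Normalize so that a dot product equals cosine similarity.
    embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)

    chunks, current = [], [sentences[0]]
    for i in range(1, len(sentences)):
        similarity = float(np.dot(embeddings[i - 1], embeddings[i]))
        if similarity >= similarity_threshold:
            current.append(sentences[i])       # Same topic: keep extending the chunk.
        else:
            chunks.append(". ".join(current))  # Topic shift: close the current chunk.
            current = [sentences[i]]
    chunks.append(". ".join(current))
    return chunks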



2.2.3. Deduplication Strategies


Deduplication is essential to prevent redundant information from cluttering the vector database and skewing retrieval results. We can apply deduplication at two levels:

  1. Document Level: Prevent processing the same document multiple times. This can be done by storing a hash of the document's content or its unique file path and modification timestamp.
  2. Chunk Level: Identify and remove identical or near-identical chunks, which can arise from overlapping splits or highly repetitive content. Hashing chunk content or comparing chunk embeddings can achieve this.


Example: `document_processor.py` (snippet for deduplication logic)


# document_processor.py (snippet)

import os

import hashlib

import json

from config import PROCESSED_DOCS_METADATA_PATH


class DocumentProcessor:

    # ... (previous __init__ and chunk_document methods)


    def _generate_content_hash(self, content: str) -> str:

        """Generates an SHA256 hash for the given string content."""

        return hashlib.sha256(content.encode('utf-8')).hexdigest()


    def _load_processed_metadata(self) -> Dict[str, str]:

        """Loads metadata of already processed documents."""

        if os.path.exists(PROCESSED_DOCS_METADATA_PATH):

            with open(PROCESSED_DOCS_METADATA_PATH, 'r', encoding='utf-8') as f:

                return json.load(f)

        return {}


    def _save_processed_metadata(self, metadata: Dict[str, str]):

        """Saves metadata of processed documents."""

        with open(PROCESSED_DOCS_METADATA_PATH, 'w', encoding='utf-8') as f:

            json.dump(metadata, f, indent=4)


    def is_document_new_or_modified(self, file_path: str, current_content_hash: str) -> bool:

        """

        Checks if a document is new or has been modified since last processing.


        Args:

            file_path (str): The path to the document.

            current_content_hash (str): The SHA256 hash of the document's

                                        current content.


        Returns:

            bool: True if the document is new or its content hash has changed,

                  False otherwise.

        """

        processed_metadata = self._load_processed_metadata()

        stored_hash = processed_metadata.get(file_path)

        return stored_hash != current_content_hash


    def mark_document_as_processed(self, file_path: str, content_hash: str):

        """

        Marks a document as processed by storing its content hash.

        """

        processed_metadata = self._load_processed_metadata()

        processed_metadata[file_path] = content_hash

        self._save_processed_metadata(processed_metadata)


    def deduplicate_chunks(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:

        """

        Removes duplicate chunks based on their text content.


        Args:

            chunks (List[Dict[str, Any]]): A list of chunk dictionaries.


        Returns:

            List[Dict[str, Any]]: A list of unique chunk dictionaries.

        """

        seen_hashes = set()

        unique_chunks = []

        for chunk in chunks:

            chunk_hash = self._generate_content_hash(chunk["text"])

            if chunk_hash not in seen_hashes:

                unique_chunks.append(chunk)

                seen_hashes.add(chunk_hash)

        return unique_chunks



2.2.4. Embedding Generation


Embeddings are numerical vector representations of text that capture semantic meaning. Texts with similar meanings will have embeddings that are close to each other in the vector space. We will use a pre-trained sentence transformer model for this purpose.


Example: `embedding_generator.py`


# embedding_generator.py

from typing import List, Dict

from sentence_transformers import SentenceTransformer

import numpy as np

from config import EMBEDDING_MODEL_CONFIG


class EmbeddingGenerator:

    """

    Generates embeddings for text chunks using a specified sentence transformer model.

    """

    def __init__(self):

        """

        Initializes the EmbeddingGenerator by loading the specified model.

        """

        model_name = EMBEDDING_MODEL_CONFIG["model_name"]

        provider = EMBEDDING_MODEL_CONFIG["provider"]


        if provider == "huggingface_local":

            print(f"Loading local embedding model: {model_name}")

            self.model = SentenceTransformer(model_name)

        elif provider == "openai":

            # Placeholder for OpenAI embeddings. Requires openai client.

            # from openai import OpenAI

            # self.client = OpenAI(api_key=EMBEDDING_MODEL_CONFIG["api_key"])

            # self.model_name = model_name

            raise NotImplementedError("OpenAI embedding integration not shown in snippet.")

        else:

            raise ValueError(f"Unsupported embedding provider: {provider}")


    def generate_embeddings(self, texts: List[str]) -> List[List[float]]:

        """

        Generates embeddings for a list of text strings.


        Args:

            texts (List[str]): A list of text strings (e.g., document chunks).


        Returns:

            List[List[float]]: A list of embedding vectors, where each vector

                               is a list of floats.

        """

        if not texts:

            return []


        if EMBEDDING_MODEL_CONFIG["provider"] == "huggingface_local":

            embeddings = self.model.encode(texts, convert_to_numpy=True).tolist()

        elif EMBEDDING_MODEL_CONFIG["provider"] == "openai":

            # response = self.client.embeddings.create(input=texts, model=self.model_name)

            # embeddings = [d.embedding for d in response.data]

            raise NotImplementedError("OpenAI embedding integration not shown in snippet.")

        else:

            embeddings = [] # Should not reach here due to ValueError in __init__


        return embeddings



2.3. Vector Database and Document Storage


The vector database is where our processed chunks and their embeddings reside, enabling efficient similarity searches. We'll use ChromaDB for its ease of setup and local persistence, which aligns with our requirement for local storage. All raw documents will also be stored in a designated local folder.


Example: `vector_db.py`


# vector_db.py

from typing import List, Dict, Any

from chromadb import PersistentClient

from chromadb.utils import embedding_functions

import os

import shutil


from config import VECTOR_DB_DIR, EMBEDDING_MODEL_CONFIG


class VectorDatabase:

    """

    Manages the interaction with the ChromaDB vector database.

    Stores and retrieves document chunks and their embeddings.

    """

    def __init__(self, collection_name: str = "agentic_ai_knowledge"):

        """

        Initializes the VectorDatabase client and collection.


        Args:

            collection_name (str): The name of the collection to use in ChromaDB.

        """

        self.client = PersistentClient(path=VECTOR_DB_DIR)

        self.collection_name = collection_name


        # ChromaDB requires an embedding function. We'll use a wrapper

        # around our SentenceTransformer model.

        # Note: For production, consider using Chroma's built-in SentenceTransformer

        # embedding function or a custom one that matches your EmbeddingGenerator.

        # For simplicity, we'll assume embeddings are provided externally.

        self.collection = self.client.get_or_create_collection(

            name=self.collection_name,

            # If you want ChromaDB to handle embeddings directly:

            # embedding_function=embedding_functions.SentenceTransformerEmbeddingFunction(

            #     model_name=EMBEDDING_MODEL_CONFIG["model_name"]

            # )

        )

        print(f"Initialized ChromaDB at {VECTOR_DB_DIR} with collection '{collection_name}'")


    def add_chunks(self, chunks: List[Dict[str, Any]], embeddings: List[List[float]]):

        """

        Adds a list of document chunks and their embeddings to the vector database.


        Args:

            chunks (List[Dict[str, Any]]): A list of chunk dictionaries, each

                                           containing 'text' and 'metadata'.

            embeddings (List[List[float]]): A list of embedding vectors,

                                            corresponding to the chunks.

        """

        if not chunks or not embeddings:

            print("No chunks or embeddings to add.")

            return


        ids = [chunk["chunk_id"] for chunk in chunks]

        documents = [chunk["text"] for chunk in chunks]

        metadatas = [{k: v for k, v in chunk.items() if k not in ["text", "chunk_id"]} for chunk in chunks]


        # ChromaDB's add method requires IDs, embeddings, metadatas, and documents.

        # We ensure no duplicate IDs are added.

        existing_ids = set(self.collection.get(ids=ids, include=[])["ids"])

        new_ids, new_documents, new_metadatas, new_embeddings = [], [], [], []


        for i, chunk_id in enumerate(ids):

            if chunk_id not in existing_ids:

                new_ids.append(chunk_id)

                new_documents.append(documents[i])

                new_metadatas.append(metadatas[i])

                new_embeddings.append(embeddings[i])

            else:

                print(f"Skipping existing chunk ID: {chunk_id}")


        if new_ids:

            self.collection.add(

                ids=new_ids,

                embeddings=new_embeddings,

                metadatas=new_metadatas,

                documents=new_documents

            )

            print(f"Added {len(new_ids)} new chunks to ChromaDB collection '{self.collection_name}'.")

        else:

            print("No new unique chunks to add after checking for existing IDs.")


    def query_chunks(self, query_embedding: List[float], n_results: int = 10) -> List[Dict[str, Any]]:

        """

        Queries the vector database for the most similar chunks.


        Args:

            query_embedding (List[float]): The embedding of the user's query.

            n_results (int): The number of top similar chunks to retrieve.


        Returns:

            List[Dict[str, Any]]: A list of dictionaries, each representing a

                                  retrieved chunk with its content and metadata.

        """

        results = self.collection.query(

            query_embeddings=[query_embedding],

            n_results=n_results,

            include=['documents', 'metadatas', 'distances']

        )


        retrieved_chunks = []

        if results and results['documents']:

            for i in range(len(results['documents'][0])):

                chunk = {

                    "text": results['documents'][0][i],

                    "metadata": results['metadatas'][0][i],

                    "distance": results['distances'][0][i]

                }

                retrieved_chunks.append(chunk)

        return retrieved_chunks


    def reset_database(self):

        """

        Completely resets (deletes and recreates) the vector database.

        Use with caution!

        """

        print(f"WARNING: Resetting ChromaDB at {VECTOR_DB_DIR}...")

        try:

            self.client.delete_collection(name=self.collection_name)

            self.collection = self.client.create_collection(name=self.collection_name)

            print("ChromaDB collection reset successfully.")

        except Exception as e:

            print(f"Error resetting ChromaDB collection: {e}")

            # If collection deletion fails, try recreating client and collection

            self.client = PersistentClient(path=VECTOR_DB_DIR)

            self.collection = self.client.get_or_create_collection(name=self.collection_name)


    def get_collection_count(self) -> int:

        """Returns the number of items in the collection."""

        return self.collection.count()




2.4. Retrieval-Augmented Generation (RAG) with Reranking



The RAG system is the core mechanism for answering user queries. It combines the power of information retrieval with the generative capabilities of an LLM.


2.4.1. Initial Retrieval (Similarity Search)


When a user submits a query, the first step is to convert that query into an embedding using the same model used for document chunks. This query embedding is then used to perform a similarity search in the vector database, retrieving the top `N` most similar chunks.


Example: `retriever.py` (snippet for initial retrieval)


# retriever.py (snippet)

from typing import List, Dict, Any

from embedding_generator import EmbeddingGenerator

from vector_db import VectorDatabase


class Retriever:

    """

    Handles the initial retrieval of relevant document chunks from the

    vector database based on a user query.

    """

    def __init__(self, vector_db: VectorDatabase, embedding_generator: EmbeddingGenerator):

        """

        Initializes the Retriever with a VectorDatabase instance and an

        EmbeddingGenerator.

        """

        self.vector_db = vector_db

        self.embedding_generator = embedding_generator


    def retrieve_chunks(self, query: str, k: int = 10) -> List[Dict[str, Any]]:

        """

        Retrieves the top-k most relevant document chunks for a given query.


        Args:

            query (str): The user's query string.

            k (int): The number of chunks to retrieve initially.


        Returns:

            List[Dict[str, Any]]: A list of dictionaries, each representing a

                                  retrieved chunk with its content and metadata.

        """

        # Generate embedding for the query

        query_embedding = self.embedding_generator.generate_embeddings([query])[0]


        # Query the vector database

        retrieved_chunks = self.vector_db.query_chunks(query_embedding, n_results=k)

        print(f"Initial retrieval found {len(retrieved_chunks)} chunks.")

        return retrieved_chunks



2.4.2. Sophisticated Reranking



Initial retrieval often returns chunks that are semantically similar but might not be directly relevant to the specific intent of the query. Sophisticated reranking, typically using a cross-encoder model, re-evaluates these initially retrieved chunks based on their relevance to the query. Cross-encoders take both the query and a document chunk as input and output a relevance score, providing a more granular and accurate assessment than pure semantic similarity.


Example: `reranker.py`


# reranker.py

from typing import List, Dict, Any

from transformers import AutoTokenizer, AutoModelForSequenceClassification

import torch

from config import RERANKER_MODEL_CONFIG


class Reranker:

    """

    Reranks a list of retrieved document chunks based on their relevance to a query

    using a cross-encoder model.

    """

    def __init__(self):

        """

        Initializes the Reranker by loading the specified cross-encoder model

        and tokenizer.

        """

        model_name = RERANKER_MODEL_CONFIG["model_name"]

        provider = RERANKER_MODEL_CONFIG["provider"]


        if provider == "huggingface_local":

            print(f"Loading local reranker model: {model_name}")

            self.tokenizer = AutoTokenizer.from_pretrained(model_name)

            self.model = AutoModelForSequenceClassification.from_pretrained(model_name)

            self.device = "cuda" if torch.cuda.is_available() else "cpu"

            self.model.to(self.device)

            self.model.eval() # Set model to evaluation mode

        elif provider == "cohere":

            # Placeholder for Cohere reranking. Requires cohere client.

            # import cohere

            # self.client = cohere.Client(RERANKER_MODEL_CONFIG["api_key"])

            raise NotImplementedError("Cohere reranking integration not shown in snippet.")

        else:

            raise ValueError(f"Unsupported reranker provider: {provider}")


    def rerank_chunks(self, query: str, chunks: List[Dict[str, Any]], top_n: int = 5) -> List[Dict[str, Any]]:

        """

        Reranks a list of document chunks based on their relevance to the query.


        Args:

            query (str): The user's query string.

            chunks (List[Dict[str, Any]]): A list of initially retrieved chunk

                                           dictionaries. Each dict must have a 'text' key.

            top_n (int): The number of top-ranked chunks to return.


        Returns:

            List[Dict[str, Any]]: A list of reranked chunk dictionaries, sorted

                                  by relevance score in descending order.

        """

        if not chunks:

            return []


        if RERANKER_MODEL_CONFIG["provider"] == "huggingface_local":

            sentences = [(query, chunk["text"]) for chunk in chunks]

            with torch.no_grad():

                inputs = self.tokenizer(sentences, padding=True, truncation=True,

                                        return_tensors="pt").to(self.device)

                # squeeze(-1) keeps the scores 1-D even when only a single chunk is scored.

                scores = self.model(**inputs).logits.squeeze(-1).cpu().numpy()


            # Pair chunks with their scores

            scored_chunks = []

            for i, chunk in enumerate(chunks):

                chunk_copy = chunk.copy()

                chunk_copy["relevance_score"] = float(scores[i]) # Convert numpy float to Python float

                scored_chunks.append(chunk_copy)


            # Sort by relevance score in descending order

            reranked_chunks = sorted(scored_chunks, key=lambda x: x["relevance_score"], reverse=True)

            print(f"Reranked {len(chunks)} chunks, returning top {min(top_n, len(reranked_chunks))}.")

            return reranked_chunks[:top_n]


        elif RERANKER_MODEL_CONFIG["provider"] == "cohere":

            # response = self.client.rerank(

            #     query=query,

            #     documents=[chunk["text"] for chunk in chunks],

            #     top_n=top_n,

            #     model=RERANKER_MODEL_CONFIG["model_name"]

            # )

            # reranked_results = []

            # for rank in response.results:

            #     original_chunk = chunks[rank.index]

            #     original_chunk["relevance_score"] = rank.relevance_score

            #     reranked_results.append(original_chunk)

            # return reranked_results

            raise NotImplementedError("Cohere reranking integration not shown in snippet.")

        else:

            return [] # Should not reach here



2.5. Agent Orchestration



The agent orchestration layer is where the intelligence truly comes alive. It interprets the user's intent, decides which tools to use (e.g., RAG, web search), and constructs a coherent response using the LLM. For this, we'll use a simple agentic loop, demonstrating how the LLM can be prompted to act as a coordinator.


Example: `agent.py` (snippet for LLM interaction)


# agent.py (snippet)

from typing import List, Dict, Any

from langchain_community.llms import Ollama

from langchain_openai import OpenAI

from config import LLM_CONFIG


class AgentLLM:

    """

    Manages the interaction with the Large Language Model (LLM), handling

    different providers based on configuration.

    """

    def __init__(self):

        """

        Initializes the LLM client based on the configuration.

        """

        provider = LLM_CONFIG["provider"]

        model_name = LLM_CONFIG["model_name"]


        if provider == "ollama":

            print(f"Initializing Ollama LLM with model: {model_name} at {LLM_CONFIG['api_base']}")

            self.llm = Ollama(model=model_name, base_url=LLM_CONFIG["api_base"])

        elif provider == "openai":

            print(f"Initializing OpenAI LLM with model: {model_name}")

            self.llm = OpenAI(model_name=model_name, openai_api_key=LLM_CONFIG["api_key"])

        elif provider == "huggingface_local":

            # For local HuggingFace models, you might use transformers pipeline

            # from langchain_community.llms import HuggingFacePipeline

            # from transformers import pipeline

            # self.llm = HuggingFacePipeline(pipeline=pipeline("text-generation", model=model_name))

            raise NotImplementedError("HuggingFace local LLM integration not shown in snippet.")

        else:

            raise ValueError(f"Unsupported LLM provider: {provider}")


    def generate_response(self, prompt: str) -> str:

        """

        Generates a text response from the LLM based on the given prompt.


        Args:

            prompt (str): The input prompt for the LLM.


        Returns:

            str: The LLM's generated response.

        """

        try:

            response = self.llm.invoke(prompt)

            return response

        except Exception as e:

            print(f"Error generating LLM response: {e}")

            return "I apologize, but I encountered an error while trying to generate a response."


# The full agent logic would combine this LLM with the RAG system.

# It would involve constructing a prompt that includes the user's query

# and the retrieved, reranked document chunks.

# A typical prompt structure might look like:

# "You are an expert on [Subject]. Answer the following question based

 on the provided context. If the context does not contain the answer,

 state that you don't have enough information.

 Question: [User Query]

 Context: [Concatenated Reranked Chunks]"
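

To illustrate the orchestration this comment describes, here is a minimal sketch that wires the `Retriever`, `Reranker`, and `AgentLLM` together. The `Agent` class name, the default `k` and `top_n` values, and the exact prompt wording are illustrative choices rather than a fixed part of the system.


# agent.py (snippet, continued): a minimal orchestration sketch
from retriever import Retriever
from reranker import Reranker


class Agent:
    """Coordinates retrieval, reranking, and LLM generation for a single query."""

    def __init__(self, retriever: Retriever, reranker: Reranker, llm: AgentLLM):
        # AgentLLM is the class defined earlier in this file.
        self.retriever = retriever
        self.reranker = reranker
        self.llm = llm

    def answer_question(self, query: str, subject: str, k: int = 10, top_n: int = 5) -> str:
        # 1. Initial similarity search in the vector database.
        candidates = self.retriever.retrieve_chunks(query, k=k)
        # 2. Cross-encoder reranking for a finer-grained relevance ordering.
        best_chunks = self.reranker.rerank_chunks(query, candidates, top_n=top_n)
        # 3. Build a contextualized prompt from the top-ranked chunks.
        context = "\n\n".join(chunk["text"] for chunk in best_chunks)
        prompt = (
            f"You are an expert on {subject}. Answer the following question "
            f"based only on the provided context. If the context does not "
            f"contain the answer, say you don't have enough information.\n\n"
            f"Question: {query}\n\nContext:\n{context}"
        )
        # 4. Let the LLM generate the final answer.
        return self.llm.generate_response(prompt)


In a fuller deployment, this orchestration layer is also where the agent would decide whether to call the web-search tool instead of, or in addition to, the RAG pipeline.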



2.6. Knowledge Base Update Mechanisms


Maintaining a current and comprehensive knowledge base is paramount. Our system supports two primary methods for updating its knowledge.


2.6.1. Web Search for Relevant News and Papers


When a user requests an update for a specific topic or during an automated refresh, the agent will search the web for new, relevant information. This involves using a search engine API, scraping content from search results, and then processing it through our ingestion pipeline.


Example: `web_scraper.py`


# web_scraper.py

import requests

from bs4 import BeautifulSoup

from typing import List, Dict, Any

import time

from config import WEB_SCRAPE_CONFIG, AGENT_SUBJECTS


class WebScraper:

    """

    Handles web searching and scraping to gather new information.

    """

    def __init__(self):

        """

        Initializes the WebScraper with search engine configuration.

        """

        self.search_api_key = WEB_SCRAPE_CONFIG["search_engine_api_key"]

        self.search_base_url = WEB_SCRAPE_CONFIG["search_engine_base_url"]

        self.max_results = WEB_SCRAPE_CONFIG["max_search_results"]

        self.scrape_timeout = WEB_SCRAPE_CONFIG["scrape_timeout"]


        if not self.search_api_key:

            print("Warning: SERPER_API_KEY is not set. Web search will be limited or fail.")


    def _perform_search(self, query: str) -> List[Dict[str, Any]]:

        """

        Performs a web search using a search engine API (e.g., Serper.dev).


        Args:

            query (str): The search query string.


        Returns:

            List[Dict[str, Any]]: A list of search results, each containing

                                  'title', 'link', and 'snippet'.

        """

        headers = {

            'X-API-KEY': self.search_api_key,

            'Content-Type': 'application/json'

        }

        payload = {

            'q': query,

            'num': self.max_results

        }

        try:

            # The Serper API expects a POST request with a JSON payload.

            response = requests.post(self.search_base_url, headers=headers, json=payload, timeout=self.scrape_timeout)

            response.raise_for_status() # Raise an HTTPError for bad responses (4xx or 5xx)

            data = response.json()

            organic_results = data.get('organic', [])

            return [{'title': r.get('title'), 'link': r.get('link'), 'snippet': r.get('snippet')}

                    for r in organic_results if r.get('link')]

        except requests.exceptions.RequestException as e:

            print(f"Error during web search for '{query}': {e}")

            return []


    def scrape_page_content(self, url: str) -> str:

        """

        Scrapes the main text content from a given URL.


        Args:

            url (str): The URL of the page to scrape.


        Returns:

            str: The extracted text content, or an empty string if scraping fails.

        """

        try:

            response = requests.get(url, timeout=self.scrape_timeout)

            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')


            # Remove script, style, and other non-content elements

            for script_or_style in soup(["script", "style", "header", "footer", "nav", "form"]):

                script_or_style.decompose()


            # Get text and clean it

            text = soup.get_text()

            lines = (line.strip() for line in text.splitlines())

            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))

            text = '\n'.join(chunk for chunk in chunks if chunk)

            return text

        except requests.exceptions.RequestException as e:

            print(f"Error scraping {url}: {e}")

            return ""

        except Exception as e:

            print(f"General error scraping {url}: {e}")

            return ""


    def search_and_scrape_for_topic(self, topic: str) -> List[Dict[str, Any]]:

        """

        Performs a web search for a topic and scrapes content from the results.


        Args:

            topic (str): The subject topic to search for.


        Returns:

            List[Dict[str, Any]]: A list of dictionaries, each containing

                                  'content', 'source_url', 'title', 'timestamp'.

        """

        print(f"Searching web for new information on '{topic}'...")

        search_query = f"{topic} latest news OR research papers"

        results = self._perform_search(search_query)


        scraped_documents = []

        for i, result in enumerate(results):

            print(f"  Scraping result {i+1}/{len(results)}: {result['link']}")

            content = self.scrape_page_content(result['link'])

            if content:

                scraped_documents.append({

                    "content": content,

                    "source_url": result['link'],

                    "title": result['title'],

                    "timestamp": time.time(), # Record when it was scraped

                    "subject": topic

                })

            time.sleep(1) # Be polite to websites

        print(f"Finished web scraping for '{topic}'. Found {len(scraped_documents)} new documents.")

        return scraped_documents



2.6.2. User-Added Documents


Users can contribute documents by simply dropping them into a designated folder. A file system watcher continuously monitors this folder and triggers the processing pipeline for any *new* documents. This ensures that only newly added files are processed, avoiding redundant work.


Example: `file_watcher.py`


# file_watcher.py

import time

import os

import hashlib

import json

from watchdog.observers import Observer

from watchdog.events import FileSystemEventHandler

from typing import Callable, Dict, Any


from config import USER_UPLOAD_FOLDER, PROCESSED_DOCS_METADATA_PATH


class UserUploadHandler(FileSystemEventHandler):

    """

    Event handler for file system events in the user upload folder.

    Triggers processing for new files.

    """

    def __init__(self, process_new_document_callback: Callable[[str], None]):

        """

        Initializes the handler with a callback function to process new documents.


        Args:

            process_new_document_callback (Callable[[str], None]): A function

                                                                  that takes a

                                                                  file path

                                                                  and processes it.

        """

        self.process_new_document = process_new_document_callback

        self.processed_file_hashes = self._load_processed_hashes()

        print(f"Initialized UserUploadHandler. Monitoring: {USER_UPLOAD_FOLDER}")


    def _load_processed_hashes(self) -> Dict[str, str]:

        """Loads the hashes of already processed files from metadata."""

        if os.path.exists(PROCESSED_DOCS_METADATA_PATH):

            with open(PROCESSED_DOCS_METADATA_PATH, 'r', encoding='utf-8') as f:

                # Filter for files that were user-uploaded

                return {path: hash_val for path, hash_val in json.load(f).items()

                        if path.startswith(USER_UPLOAD_FOLDER)}

        return {}


    def _save_processed_hash(self, file_path: str, content_hash: str):

        """Saves the hash of a newly processed file."""

        self.processed_file_hashes[file_path] = content_hash

        # Update the main metadata file

        all_metadata = {}

        if os.path.exists(PROCESSED_DOCS_METADATA_PATH):

            with open(PROCESSED_DOCS_METADATA_PATH, 'r', encoding='utf-8') as f:

                all_metadata = json.load(f)

        all_metadata[file_path] = content_hash

        with open(PROCESSED_DOCS_METADATA_PATH, 'w', encoding='utf-8') as f:

            json.dump(all_metadata, f, indent=4)



    def _get_file_hash(self, file_path: str) -> str:

        """Generates a hash for the content of a file."""

        hasher = hashlib.sha256()

        try:

            with open(file_path, 'rb') as f:

                while chunk := f.read(8192):

                    hasher.update(chunk)

            return hasher.hexdigest()

        except Exception as e:

            print(f"Error hashing file {file_path}: {e}")

            return ""


    def on_created(self, event):

        """

        Called when a file or directory is created.

        """

        if not event.is_directory:

            print(f"Detected new file: {event.src_path}")

            self._check_and_process_file(event.src_path)


    def on_modified(self, event):

        """

        Called when a file or directory is modified.

        This can also indicate a new file being fully written.

        """

        if not event.is_directory:

            # Sometimes 'created' isn't fired immediately or reliably for large files.

            # 'modified' catches the completion of file writes.

            print(f"Detected modification to file: {event.src_path}")

            self._check_and_process_file(event.src_path)


    def _check_and_process_file(self, file_path: str):

        """

        Checks if a file is new or modified and processes it if needed.

        """

        # Ensure the file is fully written before processing

        time.sleep(1) # Give the OS a moment to finish writing


        current_hash = self._get_file_hash(file_path)

        if not current_hash:

            print(f"Could not get hash for {file_path}, skipping.")

            return


        if file_path not in self.processed_file_hashes or \

           self.processed_file_hashes.get(file_path) != current_hash:

            print(f"Processing new or modified user-uploaded document: {file_path}")

            self.process_new_document(file_path)

            self._save_processed_hash(file_path, current_hash)

        else:

            print(f"Document {file_path} already processed and unchanged. Skipping.")


class FileWatcher:

    """

    Sets up and starts an observer to watch for file system events.

    """

    def __init__(self, folder_to_watch: str, process_new_document_callback: Callable[[str], None]):

        """

        Initializes the FileWatcher.


        Args:

            folder_to_watch (str): The path to the folder to monitor.

            process_new_document_callback (Callable[[str], None]): Callback for new docs.

        """

        self.folder_to_watch = folder_to_watch

        self.event_handler = UserUploadHandler(process_new_document_callback)

        self.observer = Observer()


    def start(self):

        """Starts the file system observer."""

        print(f"Starting file watcher for folder: {self.folder_to_watch}")

        self.observer.schedule(self.event_handler, self.folder_to_watch, recursive=True)

        self.observer.start()

        try:

            while True:

                time.sleep(1)

        except KeyboardInterrupt:

            self.stop()


    def stop(self):

        """Stops the file system observer."""

        print("Stopping file watcher.")

        self.observer.stop()

        self.observer.join()



2.7. Scheduling for Automated Updates



To keep the knowledge base perpetually fresh, the system includes a scheduler that periodically triggers the web-based knowledge update process for all configured subjects. This ensures that the agent automatically learns about new developments.


Example: `scheduler.py`


# scheduler.py

from apscheduler.schedulers.background import BackgroundScheduler

from datetime import datetime, timedelta

from typing import Callable


from config import AUTO_UPDATE_INTERVAL_HOURS


class KnowledgeBaseScheduler:

    """

    Schedules automated tasks, such as periodically updating the knowledge base.

    """

    def __init__(self, update_function: Callable[[], None]):

        """

        Initializes the scheduler with the function to call for updates.


        Args:

            update_function (Callable[[], None]): The function to execute

                                                  periodically for updates.

        """

        self.scheduler = BackgroundScheduler()

        self.update_function = update_function

        self.interval_hours = AUTO_UPDATE_INTERVAL_HOURS


    def start(self):

        """

        Starts the scheduler and adds the update job.

        """

        if self.interval_hours > 0:

            self.scheduler.add_job(self.update_function, 'interval', hours=self.interval_hours,

                                   next_run_time=datetime.now() + timedelta(minutes=1)) # Start 1 min after launch

            print(f"Scheduled automated knowledge base updates every {self.interval_hours} hours.")

            self.scheduler.start()

        else:

            print("Automated updates are disabled (AUTO_UPDATE_INTERVAL_HOURS is 0 or less).")


    def stop(self):

        """

        Stops the scheduler.

        """

        if self.scheduler.running:

            print("Stopping knowledge base scheduler.")

            self.scheduler.shutdown()




3. Putting It All Together: The Agentic Workflow



Now that we've explored the individual components, let's see how they collaborate to form a cohesive Agentic AI system.


3.1. User Prompt Workflow



When a user sends a prompt to the Agentic AI:


  1. Intent Recognition: The agent (orchestration layer) first analyzes the user's prompt to understand its intent. Is it a question? A request for information? An instruction to update the knowledge base?
  2. Query Embedding: If it's a question requiring knowledge retrieval, the user's query is converted into a numerical embedding using the `EmbeddingGenerator`.
  3. Initial Retrieval: The `Retriever` uses this query embedding to perform a similarity search against the `VectorDatabase`, fetching an initial set of potentially relevant document chunks.
  4. Sophisticated Reranking: The `Reranker` then takes these initial chunks and the original query. It employs a cross-encoder model to re-evaluate and score each chunk based on its direct relevance to the query, presenting the most pertinent information at the top.
  5. Contextualized Prompt Generation: The agent constructs a comprehensive prompt for the `AgentLLM`. This prompt includes the original user query, clear instructions for the LLM (e.g., "answer based only on the provided context"), and the text content of the top-ranked chunks.
  6. LLM Response Generation: The `AgentLLM` processes this contextualized prompt and generates a well-informed, coherent, and accurate response, drawing directly from the provided context.
  7. Response to User: The agent delivers the LLM's response back to the user.


3.2. Knowledge Base Update Workflow (User-Initiated and Automated)



The knowledge base is designed to be dynamic and continuously updated.


3.2.1. User-Initiated Web Update (e.g., "Update knowledge on <topic>")


  1. Topic Identification: The agent identifies the specific topic the user wants to update (e.g., "Sustainable Manufacturing Practices").
  2. Web Search and Scraping: The `WebScraper` is invoked with the identified topic. It performs a web search and scrapes content from the most relevant results (HTML, PDFs if linked and handled, etc.).
  3. Document Storage: The raw content of newly scraped documents is saved to the `DOCUMENTS_DIR` local folder.
  4. Document Processing: Each new document's content is passed to the `DocumentProcessor`.  It checks for deduplication (has this exact document been processed before?).  It performs semantic chunking, breaking the document into smaller, meaningful pieces.  It generates embeddings for each chunk using the `EmbeddingGenerator`.
  5. Vector Database Update: The processed chunks and their embeddings are added to the `VectorDatabase`, making them available for future retrieval. Metadata about the source, title, and subject is also stored.
  6. Confirmation: The agent informs the user that the knowledge base for the specified topic has been updated.
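

These steps map almost one-to-one onto the components from Section 2. The sketch below shows one way to glue them together; the `update_topic_from_web` function, the hash-based file names, and the module name `update_workflow` are illustrative assumptions, not code shown earlier.


# update_workflow.py (illustrative sketch of the user-initiated web update)
import hashlib
import os

from config import DOCUMENTS_DIR
from web_scraper import WebScraper
from document_processor import DocumentProcessor
from embedding_generator import EmbeddingGenerator
from vector_db import VectorDatabase


def update_topic_from_web(topic: str, scraper: WebScraper, processor: DocumentProcessor,
                          embedder: EmbeddingGenerator, vector_db: VectorDatabase) -> int:
    """Searches the web for a topic, processes the results, and stores them."""
    added_chunks = 0
    for doc in scraper.search_and_scrape_for_topic(topic):
        # Document-level deduplication via a content hash (see Section 2.2.3).
        content_hash = hashlib.sha256(doc["content"].encode("utf-8")).hexdigest()
        file_path = os.path.join(DOCUMENTS_DIR, f"web_{content_hash[:12]}.txt")
        if not processor.is_document_new_or_modified(file_path, content_hash):
            continue  # This exact content has already been ingested.

        # Persist the raw text locally for traceability.
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(doc["content"])

        # Chunk, deduplicate at chunk level, and embed the new content.
        metadata = {"doc_id": f"web_{content_hash[:12]}", "source": doc["source_url"],
                    "title": doc["title"], "subject": doc["subject"]}
        chunks = processor.deduplicate_chunks(processor.chunk_document(doc["content"], metadata))
        embeddings = embedder.generate_embeddings([c["text"] for c in chunks])

        # Store chunks with their embeddings, then record the document as processed.
        vector_db.add_chunks(chunks, embeddings)
        processor.mark_document_as_processed(file_path, content_hash)
        added_chunks += len(chunks)
    return added_chunks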

3.2.2. Automated Web Update



This workflow is identical to the user-initiated web update but is triggered periodically by the `KnowledgeBaseScheduler` for all subjects defined in `AGENT_SUBJECTS` in the `config.py` file. This ensures proactive knowledge maintenance.
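

Wiring this into the scheduler from Section 2.7 then takes only a few lines. The sketch below assumes the hypothetical `update_topic_from_web` helper from Section 3.2.1, living in a module named `update_workflow`.


# main.py (snippet, illustrative): hook the automated update into the scheduler
from config import AGENT_SUBJECTS
from scheduler import KnowledgeBaseScheduler
from web_scraper import WebScraper
from document_processor import DocumentProcessor
from embedding_generator import EmbeddingGenerator
from vector_db import VectorDatabase
from update_workflow import update_topic_from_web

# Shared instances reused across scheduled runs.
scraper = WebScraper()
processor = DocumentProcessor()
embedder = EmbeddingGenerator()
vector_db = VectorDatabase()


def refresh_all_subjects():
    """Runs the web-based update for every configured subject."""
    for subject in AGENT_SUBJECTS:
        update_topic_from_web(subject, scraper, processor, embedder, vector_db)


kb_scheduler = KnowledgeBaseScheduler(refresh_all_subjects)
kb_scheduler.start()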


3.2.3. User-Added Documents (Local Folder Monitoring)


  1. File System Monitoring: The `FileWatcher` continuously monitors the `USER_UPLOAD_FOLDER` for any new files or modifications.
  2. New Document Detection: When a new file (e.g., a PDF report, an internal Markdown document) is detected, the `FileWatcher` verifies if it's truly new or modified by comparing its content hash against previously processed documents.
  3. Document Processing: If the document is new or modified, its content is loaded by the appropriate `DocumentLoader` (PDF, HTML, Markdown), then passed to the `DocumentProcessor` for chunking, deduplication, and embedding generation. The raw document is also moved or copied to the `DOCUMENTS_DIR` for central storage.
  4. Vector Database Update: The processed chunks and their embeddings are added to the `VectorDatabase`.
  5. Internal Notification: While not directly visible to the user unless explicitly requested, the system internally logs that new documents have been processed and integrated.
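

The `FileWatcher` from Section 2.6.2 only needs a processing callback to complete this workflow. Below is a minimal sketch of such a callback; the `process_user_document` name, the extension-to-loader mapping, and the Markdown loader (sketched in Section 2.2.1) are illustrative assumptions.


# upload_callback.py (illustrative sketch of the user-upload workflow)
import os
import shutil

from config import DOCUMENTS_DIR
from document_loaders import PDFLoader, HTMLLoader, MarkdownLoader
from document_processor import DocumentProcessor
from embedding_generator import EmbeddingGenerator
from vector_db import VectorDatabase

# Map file extensions to the loaders from Section 2.2.1.
LOADERS = {".pdf": PDFLoader(), ".html": HTMLLoader(), ".htm": HTMLLoader(), ".md": MarkdownLoader()}

processor = DocumentProcessor()
embedder = EmbeddingGenerator()
vector_db = VectorDatabase()


def process_user_document(file_path: str) -> None:
    """Callback for FileWatcher: loads, chunks, embeds, and stores one file."""
    # The FileWatcher has already confirmed the file is new or modified.
    loader = LOADERS.get(os.path.splitext(file_path)[1].lower())
    if loader is None:
        print(f"Unsupported file type, skipping: {file_path}")
        return

    content = loader.load(file_path)
    if not content.strip():
        print(f"No text extracted from {file_path}, skipping.")
        return

    # Keep a copy of the raw document in the central documents folder.
    shutil.copy2(file_path, os.path.join(DOCUMENTS_DIR, os.path.basename(file_path)))

    metadata = {"doc_id": os.path.basename(file_path), "source": file_path,
                "title": os.path.basename(file_path)}
    chunks = processor.deduplicate_chunks(processor.chunk_document(content, metadata))
    embeddings = embedder.generate_embeddings([c["text"] for c in chunks])
    vector_db.add_chunks(chunks, embeddings)
    print(f"Integrated {len(chunks)} chunks from {file_path}.")


# Usage with the watcher from Section 2.6.2:
# FileWatcher(USER_UPLOAD_FOLDER, process_user_document).start()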



4. Benefits and Future Enhancements

This Agentic AI system offers numerous benefits:


  • Enhanced Productivity: Quickly retrieve precise information, reducing time spent searching through documents and web pages.
  • Always Up-to-Date Knowledge: Automated and user-driven updates ensure the knowledge base remains current with the latest industry trends and internal documentation.
  • Personalized Expertise: By focusing on configured subjects, the agent becomes a specialized expert tailored to specific team or project needs.
  • Improved Decision Making: Access to accurate, context-rich information supports better-informed decisions.
  • Reduced Information Overload: The agent sifts through noise, presenting only the most relevant content.


Future enhancements could include:


  • Advanced Agentic Reasoning: Implement more complex planning and tool-use capabilities, allowing the agent to break down complex queries into sub-tasks and utilize multiple tools sequentially.
  • Feedback Loop: Allow users to provide feedback on answers, which can be used to fine-tune retrieval, reranking, or LLM generation.
  • Multi-Modal Support: Extend document processing to include images, videos, or audio transcripts.
  • User Interface: Develop a user-friendly web or desktop interface for easier interaction, document upload, and configuration management.
  • Security and Access Control: Implement robust security measures and access controls for sensitive information, especially when handling internal documents.



5. Conclusion



Building an Agentic AI system as described provides a powerful tool to navigate and leverage vast amounts of information. By combining robust document processing, intelligent RAG with reranking, and proactive knowledge base management, this system transforms how we access and utilize information, ultimately fostering greater efficiency and innovation. We encourage you to explore these concepts and adapt them to your specific project requirements, empowering your teams with a truly intelligent assistant.




  ADDENDUM: Full Running Example Code



This section provides a complete, runnable example of the Agentic AI system, integrating all the components discussed. To run this, you will need Python 3.9+, `pip`, and potentially `ollama` running locally for the LLM.


  Prerequisites and Setup



1. Install Python Libraries:

    pip install pypdf beautifulsoup4 markdown-it-py sentence-transformers \

                chromadb langchain-community langchain-openai transformers \

                torch apscheduler watchdog requests

    Note: `torch` is a large dependency for `transformers`. If you don't have a
    GPU, `pip install torch --index-url https://download.pytorch.org/whl/cpu`
    might be faster.


2.  Ollama (for local LLM):

    Download and install Ollama from `ollama.com`.

    Run `ollama run llama3` (or your preferred model) in your terminal to

    download and start the LLM server.


3.  Serper API Key (for web search):

Sign up at `serper.dev` to get a free API key for web search.

Replace `"YOUR_SERPER_API_KEY"` in `config.py` with your actual key.

(Optional: If you don't want web search, you can set `max_search_results`

 to 0 in `config.py` or comment out the web scraper calls.)


4.  Create Project Structure:

Create a folder named `agentic_ai_system`.

Inside it, create an empty `data` folder.

Inside `data`, create an empty `user_uploads` folder.

    Place all the following Python files (`config.py`, `document_loaders.py`,

    `document_processor.py`, `embedding_generator.py`, `vector_db.py`,

    `retriever.py`, `reranker.py`, `agent.py`, `web_scraper.py`,

    `file_watcher.py`, `scheduler.py`, `main.py`) directly inside the

    `agentic_ai_system` folder.




  `config.py`



# config.py


import os


# Define the base directory for all data

BASE_DIR = os.path.dirname(os.path.abspath(__file__))

DATA_DIR = os.path.join(BASE_DIR, "data")

DOCUMENTS_DIR = os.path.join(DATA_DIR, "documents")

VECTOR_DB_DIR = os.path.join(DATA_DIR, "vector_db")

PROCESSED_DOCS_METADATA_PATH = os.path.join(DATA_DIR, "processed_docs_metadata.json")


# Ensure directories exist

os.makedirs(DOCUMENTS_DIR, exist_ok=True)

os.makedirs(VECTOR_DB_DIR, exist_ok=True)


# Define the subjects the Agentic AI should be knowledgeable about.

# These subjects will guide web searches and contextual understanding.

AGENT_SUBJECTS = [

    "Sustainable Manufacturing Practices",

    "Industry 4.0 Technologies in Production",

    "Circular Economy in Engineering",

    "Energy Efficiency in Industrial Processes"

]


# Large Language Model (LLM) Configuration

# You can specify a local LLM (e.g., via Ollama) or a remote one (e.g., OpenAI).

# For local LLMs, ensure your Ollama server is running.

LLM_CONFIG = {

    "provider": "ollama",  # Options: "ollama", "openai", "huggingface_local"

    "model_name": "llama3", # e.g., "llama3", "gpt-3.5-turbo", "mistral"

    "api_base": "http://localhost:11434", # Only for "ollama" or self-hosted APIs

    "api_key": "YOUR_OPENAI_API_KEY" # Only for "openai"

}


# Embedding Model Configuration

# Used to convert text into numerical vectors for the vector database.

EMBEDDING_MODEL_CONFIG = {

    "provider": "huggingface_local", # Options: "huggingface_local", "openai"

    "model_name": "sentence-transformers/all-MiniLM-L6-v2", # Local model path or name

    "api_key": "YOUR_OPENAI_API_KEY" # Only for "openai"

}


# Reranker Model Configuration

# Used to re-rank retrieved documents for higher relevance.

RERANKER_MODEL_CONFIG = {

    "provider": "huggingface_local", # Options: "huggingface_local", "cohere"

    "model_name": "cross-encoder/ms-marco-MiniLM-L-6-v2", # Local cross-encoder model

    "api_key": "YOUR_COHERE_API_KEY" # Only for "cohere"

}


# Web Scraping Configuration

WEB_SCRAPE_CONFIG = {

    "search_engine_api_key": "YOUR_SERPER_API_KEY", # e.g., Serper, SerpApi, or custom

    "search_engine_base_url": "https://google.serper.dev/search",

    "max_search_results": 3, # Reduced for example to limit API calls

    "scrape_timeout": 10 # seconds

}


# Knowledge Base Update Schedule

# Interval in hours for automated web-based knowledge updates.

# Set to a higher value (e.g., 24) for production, 0 to disable.

AUTO_UPDATE_INTERVAL_HOURS = 0.1 # For testing, every 6 minutes


# File Watcher Configuration

# Directory where users can place documents for automatic processing.

USER_UPLOAD_FOLDER = os.path.join(DATA_DIR, "user_uploads")

os.makedirs(USER_UPLOAD_FOLDER, exist_ok=True)
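
If you prefer not to hard-code secrets, a small optional variation is to read the API keys from environment variables at the bottom of `config.py`. This is a minimal sketch; the variable names `SERPER_API_KEY` and `OPENAI_API_KEY` are simply suggestions, nothing in the rest of the code depends on them:

# Optional: override the placeholder keys from environment variables if they are set.
WEB_SCRAPE_CONFIG["search_engine_api_key"] = os.environ.get(
    "SERPER_API_KEY", WEB_SCRAPE_CONFIG["search_engine_api_key"])
LLM_CONFIG["api_key"] = os.environ.get("OPENAI_API_KEY", LLM_CONFIG["api_key"])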



  `document_loaders.py`


# document_loaders.py

import os

from pypdf import PdfReader

from bs4 import BeautifulSoup

from markdown_it import MarkdownIt


class PDFLoader:

    """

    A class responsible for loading text content from PDF files.

    """

    def load(self, file_path: str) -> str:

        """

        Loads and extracts all text content from a specified PDF file.


        Args:

            file_path (str): The full path to the PDF file.


        Returns:

            str: The concatenated text content from all pages of the PDF.

                 Returns an empty string if the file cannot be read or

                 contains no text.

        """

        if not os.path.exists(file_path):

            print(f"Warning: PDF file not found at {file_path}")

            return ""

        try:

            reader = PdfReader(file_path)

            text_content = ""

            for page in reader.pages:

                text_content += page.extract_text() + "\n"

            return text_content

        except Exception as e:

            print(f"Error loading PDF {file_path}: {e}")

            return ""


class HTMLLoader:

    """

    A class responsible for loading and extracting visible text from HTML files.

    """

    def load(self, file_path: str) -> str:

        """

        Loads and extracts visible text content from a specified HTML file.


        Args:

            file_path (str): The full path to the HTML file.


        Returns:

            str: The extracted plain text content.

                 Returns an empty string if the file cannot be read or parsed.

        """

        if not os.path.exists(file_path):

            print(f"Warning: HTML file not found at {file_path}")

            return ""

        try:

            with open(file_path, 'r', encoding='utf-8') as f:

                html_content = f.read()

            soup = BeautifulSoup(html_content, 'html.parser')


            # Remove script, style, and other non-content elements

            for script_or_style in soup(["script", "style", "header", "footer", "nav", "form"]):

                script_or_style.decompose()


            text = soup.get_text()

            lines = (line.strip() for line in text.splitlines())

            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))

            text = '\n'.join(chunk for chunk in chunks if chunk)

            return text

        except Exception as e:

            print(f"Error loading HTML {file_path}: {e}")

            return ""


class MarkdownLoader:

    """

    A class responsible for loading and converting Markdown files to plain text.

    """

    def load(self, file_path: str) -> str:

        """

        Loads a Markdown file and converts its content to plain text.


        Args:

            file_path (str): The full path to the Markdown file.


        Returns:

            str: The plain text content of the Markdown file.

                 Returns an empty string if the file cannot be read.

        """

        if not os.path.exists(file_path):

            print(f"Warning: Markdown file not found at {file_path}")

            return ""

        try:

            with open(file_path, 'r', encoding='utf-8') as f:

                md_content = f.read()

            # Use markdown-it to convert markdown to HTML, then BeautifulSoup to get text

            md = MarkdownIt()

            html = md.render(md_content)

            soup = BeautifulSoup(html, 'html.parser')

            return soup.get_text()

        except Exception as e:

            print(f"Error loading Markdown {file_path}: {e}")

            return ""


class DocumentLoaderFactory:

    """

    A factory to get the appropriate document loader based on file extension.

    """

    def get_loader(self, file_path: str):

        """

        Returns an instance of the correct loader for the given file type.


        Args:

            file_path (str): The path to the document file.


        Returns:

            Union[PDFLoader, HTMLLoader, MarkdownLoader, None]: An instance

                                                                of the

                                                                appropriate

                                                                loader, or

                                                                None if

                                                                unsupported.

        """

        ext = os.path.splitext(file_path)[1].lower()

        if ext == ".pdf":

            return PDFLoader()

        elif ext in [".html", ".htm"]:

            return HTMLLoader()

        elif ext in [".md", ".markdown"]:

            return MarkdownLoader()

        else:

            print(f"Unsupported file type for loading: {ext}")

            return None
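
Before wiring the loaders into the rest of the pipeline, you can exercise the factory on its own. A minimal sketch, assuming a file such as `data/user_uploads/example.pdf` exists (the path is just a placeholder):

# Quick standalone check of the loader factory.
from document_loaders import DocumentLoaderFactory

factory = DocumentLoaderFactory()
file_path = "data/user_uploads/example.pdf"  # placeholder path
loader = factory.get_loader(file_path)
if loader:
    text = loader.load(file_path)
    print(f"Extracted {len(text)} characters from {file_path}.")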



  `document_processor.py`



# document_processor.py

import os

import hashlib

import json

import shutil

from typing import List, Dict, Any

from langchain.text_splitter import RecursiveCharacterTextSplitter


from config import PROCESSED_DOCS_METADATA_PATH, DOCUMENTS_DIR


class DocumentProcessor:

    """

    Handles the chunking and embedding generation for document content,

    along with deduplication logic.

    """

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):

        """

        Initializes the DocumentProcessor with chunking parameters.


        Args:

            chunk_size (int): The maximum number of characters in a chunk.

            chunk_overlap (int): The number of characters to overlap between

                                 consecutive chunks to maintain context.

        """

        self.text_splitter = RecursiveCharacterTextSplitter(

            chunk_size=chunk_size,

            chunk_overlap=chunk_overlap,

            length_function=len,

            is_separator_regex=False,

        )

        # Load existing metadata on initialization

        self.processed_metadata = self._load_processed_metadata()


    def _generate_content_hash(self, content: str) -> str:

        """Generates an SHA256 hash for the given string content."""

        return hashlib.sha256(content.encode('utf-8')).hexdigest()


    def _load_processed_metadata(self) -> Dict[str, str]:

        """Loads metadata of already processed documents."""

        if os.path.exists(PROCESSED_DOCS_METADATA_PATH):

            with open(PROCESSED_DOCS_METADATA_PATH, 'r', encoding='utf-8') as f:

                return json.load(f)

        return {}


    def _save_processed_metadata(self):

        """Saves current state of processed documents metadata."""

        with open(PROCESSED_DOCS_METADATA_PATH, 'w', encoding='utf-8') as f:

            json.dump(self.processed_metadata, f, indent=4)


    def is_document_new_or_modified(self, doc_id: str, current_content_hash: str) -> bool:

        """

        Checks if a document is new or has been modified since last processing.


        Args:

            doc_id (str): A unique identifier for the document (e.g., file_path or URL).

            current_content_hash (str): The SHA256 hash of the document's

                                        current content.


        Returns:

            bool: True if the document is new or its content hash has changed,

                  False otherwise.

        """

        stored_hash = self.processed_metadata.get(doc_id)

        return stored_hash != current_content_hash


    def mark_document_as_processed(self, doc_id: str, content_hash: str):

        """

        Marks a document as processed by storing its content hash.

        """

        self.processed_metadata[doc_id] = content_hash

        self._save_processed_metadata()


    def chunk_document(self, doc_content: str, metadata: Dict[str, Any]) -> List[Dict[str, Any]]:

        """

        Splits a document's content into smaller, semantically coherent chunks.


        Args:

            doc_content (str): The full text content of the document.

            metadata (Dict[str, Any]): Metadata associated with the document

                                       (e.g., source, title, file_path).


        Returns:

            List[Dict[str, Any]]: A list of dictionaries, where each dictionary

                                  represents a chunk with its content and

                                  associated metadata.

        """

        if not doc_content.strip():

            print(f"Warning: Document content for {metadata.get('doc_id', 'unknown')} is empty, skipping chunking.")

            return []


        chunks = self.text_splitter.create_documents([doc_content])

        processed_chunks = []

        for i, chunk in enumerate(chunks):

            chunk_metadata = metadata.copy()

            # Ensure doc_id is present for chunk_id generation

            doc_id = metadata.get('doc_id', f"unknown_doc_{hashlib.sha256(doc_content.encode()).hexdigest()[:8]}")

            chunk_metadata["chunk_id"] = f"{doc_id}_chunk_{i}"

            chunk_metadata["text"] = chunk.page_content

            processed_chunks.append(chunk_metadata)

        return processed_chunks


    def deduplicate_chunks(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:

        """

        Removes duplicate chunks based on their text content.


        Args:

            chunks (List[Dict[str, Any]]): A list of chunk dictionaries.


        Returns:

            List[Dict[str, Any]]: A list of unique chunk dictionaries.

        """

        seen_hashes = set()

        unique_chunks = []

        for chunk in chunks:

            chunk_hash = self._generate_content_hash(chunk["text"])

            if chunk_hash not in seen_hashes:

                unique_chunks.append(chunk)

                seen_hashes.add(chunk_hash)

        return unique_chunks


    def process_document_file(self, file_path: str, loader, subject: str = "general") -> List[Dict[str, Any]]:

        """

        Loads, chunks, and prepares a document from a file for embedding and storage.


        Args:

            file_path (str): The path to the document file.

            loader: An instance of a document loader (e.g., PDFLoader).

            subject (str): The subject associated with this document.


        Returns:

            List[Dict[str, Any]]: A list of processed chunk dictionaries.

        """

        print(f"Processing file: {file_path}")

        doc_content = loader.load(file_path)

        if not doc_content:

            print(f"Could not load content from {file_path}, skipping.")

            return []


        content_hash = self._generate_content_hash(doc_content)

        # Use file_path as doc_id for user-uploaded files

        doc_id = file_path


        if not self.is_document_new_or_modified(doc_id, content_hash):

            print(f"Document {file_path} already processed and unchanged. Skipping.")

            return []


        # Store a copy of the raw document in DOCUMENTS_DIR. At this point the
        # document is known to be new or modified, so refresh the stored copy
        # (guarding against copying a file onto itself, which shutil.copy rejects).

        destination_path = os.path.join(DOCUMENTS_DIR, os.path.basename(file_path))

        if os.path.abspath(file_path) != os.path.abspath(destination_path):

            shutil.copy(file_path, destination_path)

            print(f"Copied {file_path} to {destination_path}")



        metadata = {

            "doc_id": doc_id,

            "source": f"local_file:{os.path.basename(file_path)}",

            "file_path": destination_path,

            "title": os.path.basename(file_path),

            "subject": subject,

            "timestamp": os.path.getmtime(file_path) # Last modified time

        }


        chunks = self.chunk_document(doc_content, metadata)

        unique_chunks = self.deduplicate_chunks(chunks)


        self.mark_document_as_processed(doc_id, content_hash)

        print(f"Successfully processed {file_path}. Generated {len(unique_chunks)} unique chunks.")

        return unique_chunks


    def process_web_document(self, content: str, url: str, title: str, subject: str, timestamp: float) -> List[Dict[str, Any]]:

        """

        Chunks and prepares a document from web-scraped content for embedding and storage.


        Args:

            content (str): The text content scraped from the web.

            url (str): The source URL of the content.

            title (str): The title of the web page.

            subject (str): The subject associated with this document.

            timestamp (float): Unix timestamp of when the content was scraped.


        Returns:

            List[Dict[str, Any]]: A list of processed chunk dictionaries.

        """

        if not content.strip():

            print(f"Warning: Web content from {url} is empty, skipping processing.")

            return []


        content_hash = self._generate_content_hash(content)

        doc_id = url # Use URL as doc_id for web documents


        if not self.is_document_new_or_modified(doc_id, content_hash):

            print(f"Web document from {url} already processed and unchanged. Skipping.")

            return []


        # Store raw web content locally (optional, but good for debugging/reprocessing)

        # Create a sanitized filename from the title or URL

        sanitized_title = "".join(c for c in title if c.isalnum() or c in (' ', '.', '_')).rstrip()

        if not sanitized_title:

            sanitized_title = hashlib.sha256(url.encode()).hexdigest()[:10]

        file_name = f"{sanitized_title}.txt"

        destination_path = os.path.join(DOCUMENTS_DIR, file_name)

        with open(destination_path, 'w', encoding='utf-8') as f:

            f.write(content)

        print(f"Stored raw web content from {url} to {destination_path}")


        metadata = {

            "doc_id": doc_id,

            "source": f"web_url:{url}",

            "file_path": destination_path,

            "title": title,

            "subject": subject,

            "timestamp": timestamp

        }


        chunks = self.chunk_document(content, metadata)

        unique_chunks = self.deduplicate_chunks(chunks)


        self.mark_document_as_processed(doc_id, content_hash)

        print(f"Successfully processed web document from {url}. Generated {len(unique_chunks)} unique chunks.")

        return unique_chunks
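
The chunking and deduplication logic can be tried in isolation as well. A minimal sketch using a small in-memory document (the metadata values are placeholders):

# Standalone sketch: chunk a repetitive in-memory document and deduplicate the result.
from document_processor import DocumentProcessor

processor = DocumentProcessor(chunk_size=200, chunk_overlap=20)
sample_text = "Sustainable manufacturing reduces waste and energy use. " * 20
metadata = {"doc_id": "demo_doc", "source": "inline_sample", "subject": "general"}

chunks = processor.chunk_document(sample_text, metadata)
unique_chunks = processor.deduplicate_chunks(chunks)
print(f"{len(chunks)} chunks produced, {len(unique_chunks)} left after deduplication.")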



 `embedding_generator.py`



# embedding_generator.py

from typing import List, Dict

from sentence_transformers import SentenceTransformer

import numpy as np

from config import EMBEDDING_MODEL_CONFIG


class EmbeddingGenerator:

    """

    Generates embeddings for text chunks using a specified sentence transformer model.

    """

    def __init__(self):

        """

        Initializes the EmbeddingGenerator by loading the specified model.

        """

        model_name = EMBEDDING_MODEL_CONFIG["model_name"]

        provider = EMBEDDING_MODEL_CONFIG["provider"]


        if provider == "huggingface_local":

            print(f"Loading local embedding model: {model_name}")

            self.model = SentenceTransformer(model_name)

        elif provider == "openai":

            # For OpenAI, you would typically initialize the OpenAI client here.

            # from openai import OpenAI

            # self.client = OpenAI(api_key=EMBEDDING_MODEL_CONFIG["api_key"])

            # self.model_name = model_name

            raise NotImplementedError("OpenAI embedding integration not fully implemented in snippet.")

        else:

            raise ValueError(f"Unsupported embedding provider: {provider}")


    def generate_embeddings(self, texts: List[str]) -> List[List[float]]:

        """

        Generates embeddings for a list of text strings.


        Args:

            texts (List[str]): A list of text strings (e.g., document chunks).


        Returns:

            List[List[float]]: A list of embedding vectors, where each vector

                               is a list of floats.

        """

        if not texts:

            return []


        if EMBEDDING_MODEL_CONFIG["provider"] == "huggingface_local":

            embeddings = self.model.encode(texts, convert_to_numpy=True).tolist()

        elif EMBEDDING_MODEL_CONFIG["provider"] == "openai":

            # response = self.client.embeddings.create(input=texts, model=self.model_name)

            # embeddings = [d.embedding for d in response.data]

            raise NotImplementedError("OpenAI embedding integration not fully implemented in snippet.")

        else:

            embeddings = [] # Should not reach here due to ValueError in __init__


        return embeddings
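
As a quick sanity check, you can embed a couple of strings directly; with the default `sentence-transformers/all-MiniLM-L6-v2` model each vector should have 384 dimensions:

# Standalone sketch: embed two short texts and inspect the vector size.
from embedding_generator import EmbeddingGenerator

generator = EmbeddingGenerator()
vectors = generator.generate_embeddings([
    "Energy efficiency in industrial processes",
    "Circular economy in engineering",
])
print(f"{len(vectors)} vectors of dimension {len(vectors[0])}")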



  `vector_db.py`



from typing import List, Dict, Any

from chromadb import PersistentClient

import os

import shutil


from config import VECTOR_DB_DIR


class VectorDatabase:

    """

    Manages the interaction with the ChromaDB vector database.

    Stores and retrieves document chunks and their embeddings.

    """

    def __init__(self, collection_name: str = "agentic_ai_knowledge"):

        """

        Initializes the VectorDatabase client and collection.


        Args:

            collection_name (str): The name of the collection to use in ChromaDB.

        """

        self.client = PersistentClient(path=VECTOR_DB_DIR)

        self.collection_name = collection_name


        # For this example, we assume embeddings are provided externally.

        # ChromaDB will store them as-is.

        self.collection = self.client.get_or_create_collection(

            name=self.collection_name

        )

        print(f"Initialized ChromaDB at {VECTOR_DB_DIR} with collection '{collection_name}'")

        print(f"Current items in collection: {self.collection.count()}")


    def add_chunks(self, chunks: List[Dict[str, Any]], embeddings: List[List[float]]):

        """

        Adds a list of document chunks and their embeddings to the vector database.


        Args:

            chunks (List[Dict[str, Any]]): A list of chunk dictionaries, each

                                           containing 'text' and 'metadata'.

            embeddings (List[List[float]]): A list of embedding vectors,

                                            corresponding to the chunks.

        """

        if not chunks or not embeddings:

            print("No chunks or embeddings to add.")

            return


        ids = [chunk["chunk_id"] for chunk in chunks]

        documents = [chunk["text"] for chunk in chunks]

        

        # Filter metadata to ensure it's JSON serializable

        metadatas = []

        for chunk in chunks:

            filtered_metadata = {k: v for k, v in chunk.items() if k not in ["text", "chunk_id"]}

            # Convert non-serializable types if necessary, e.g., float timestamps to int/str

            for k, v in filtered_metadata.items():

                if isinstance(v, float):

                    filtered_metadata[k] = str(v) # Convert float timestamp to string

            metadatas.append(filtered_metadata)


        # ChromaDB's add method requires IDs, embeddings, metadatas, and documents.

        # We ensure no duplicate IDs are added by checking existing IDs.

        # Note: A more robust approach for updates might involve `upsert` if available

        # or explicit deletion before adding. For simplicity, we skip existing IDs.

        existing_ids_in_db = set(self.collection.get(ids=ids, include=[])["ids"])

        

        new_ids, new_documents, new_metadatas, new_embeddings = [], [], [], []


        for i, chunk_id in enumerate(ids):

            if chunk_id not in existing_ids_in_db:

                new_ids.append(chunk_id)

                new_documents.append(documents[i])

                new_metadatas.append(metadatas[i])

                new_embeddings.append(embeddings[i])

            # else:

            #     print(f"Skipping existing chunk ID: {chunk_id}") # Too verbose


        if new_ids:

            self.collection.add(

                ids=new_ids,

                embeddings=new_embeddings,

                metadatas=new_metadatas,

                documents=new_documents

            )

            print(f"Added {len(new_ids)} new chunks to ChromaDB collection '{self.collection_name}'. Total items: {self.collection.count()}.")

        else:

            print("No new unique chunks to add after checking for existing IDs.")


    def query_chunks(self, query_embedding: List[float], n_results: int = 10) -> List[Dict[str, Any]]:

        """

        Queries the vector database for the most similar chunks.


        Args:

            query_embedding (List[float]): The embedding of the user's query.

            n_results (int): The number of top similar chunks to retrieve.


        Returns:

            List[Dict[str, Any]]: A list of dictionaries, each representing a

                                  retrieved chunk with its content and metadata.

        """

        if not query_embedding:

            print("Query embedding is empty, cannot perform query.")

            return []


        results = self.collection.query(

            query_embeddings=[query_embedding],

            n_results=n_results,

            include=['documents', 'metadatas', 'distances']

        )


        retrieved_chunks = []

        if results and results['documents'] and results['documents'][0]:

            for i in range(len(results['documents'][0])):

                chunk = {

                    "text": results['documents'][0][i],

                    "metadata": results['metadatas'][0][i],

                    "distance": results['distances'][0][i]

                }

                retrieved_chunks.append(chunk)

        return retrieved_chunks


    def reset_database(self):

        """

        Completely resets (deletes and recreates) the vector database.

        Use with caution!

        """

        print(f"WARNING: Resetting ChromaDB at {VECTOR_DB_DIR}...")

        try:

            # Delete the entire directory for a clean reset

            if os.path.exists(VECTOR_DB_DIR):

                shutil.rmtree(VECTOR_DB_DIR)

                os.makedirs(VECTOR_DB_DIR, exist_ok=True)

            

            # Re-initialize client and collection

            self.client = PersistentClient(path=VECTOR_DB_DIR)

            self.collection = self.client.get_or_create_collection(name=self.collection_name)

            print("ChromaDB collection and directory reset successfully.")

        except Exception as e:

            print(f"Error resetting ChromaDB collection: {e}")

            # Fallback if directory deletion fails for some reason

            self.client = PersistentClient(path=VECTOR_DB_DIR)

            self.collection = self.client.get_or_create_collection(name=self.collection_name)


    def get_collection_count(self) -> int:

        """Returns the number of items in the collection."""

        return self.collection.count()
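
The database and the embedding generator compose naturally. A minimal sketch that stores one hand-made chunk and queries it back (the chunk text, IDs, and collection name are placeholders):

# Standalone sketch: add a single chunk and retrieve it by semantic similarity.
from embedding_generator import EmbeddingGenerator
from vector_db import VectorDatabase

generator = EmbeddingGenerator()
db = VectorDatabase(collection_name="demo_collection")

chunk = {
    "chunk_id": "demo_doc_chunk_0",
    "text": "Industry 4.0 technologies connect machines, sensors, and analytics.",
    "doc_id": "demo_doc",
    "source": "inline_sample",
    "subject": "Industry 4.0 Technologies in Production",
}
embeddings = generator.generate_embeddings([chunk["text"]])
db.add_chunks([chunk], embeddings)

query_embedding = generator.generate_embeddings(["How do sensors fit into Industry 4.0?"])[0]
for result in db.query_chunks(query_embedding, n_results=1):
    print(result["distance"], result["text"])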




  `retriever.py`



# retriever.py

from typing import List, Dict, Any

from embedding_generator import EmbeddingGenerator

from vector_db import VectorDatabase


class Retriever:

    """

    Handles the initial retrieval of relevant document chunks from the

    vector database based on a user query.

    """

    def __init__(self, vector_db: VectorDatabase, embedding_generator: EmbeddingGenerator):

        """

        Initializes the Retriever with a VectorDatabase instance and an

        EmbeddingGenerator.

        """

        self.vector_db = vector_db

        self.embedding_generator = embedding_generator


    def retrieve_chunks(self, query: str, k: int = 10) -> List[Dict[str, Any]]:

        """

        Retrieves the top-k most relevant document chunks for a given query.


        Args:

            query (str): The user's query string.

            k (int): The number of chunks to retrieve initially.


        Returns:

            List[Dict[str, Any]]: A list of dictionaries, each representing a

                                  retrieved chunk with its content and metadata.

        """

        if not query:

            print("Query is empty, skipping retrieval.")

            return []


        # Generate embedding for the query

        query_embedding = self.embedding_generator.generate_embeddings([query])[0]


        # Query the vector database

        retrieved_chunks = self.vector_db.query_chunks(query_embedding, n_results=k)

        print(f"Initial retrieval found {len(retrieved_chunks)} chunks.")

        return retrieved_chunks



  `reranker.py`



# reranker.py

from typing import List, Dict, Any

from transformers import AutoTokenizer, AutoModelForSequenceClassification

import torch

from config import RERANKER_MODEL_CONFIG


class Reranker:

    """

    Reranks a list of retrieved document chunks based on their relevance to a query

    using a cross-encoder model.

    """

    def __init__(self):

        """

        Initializes the Reranker by loading the specified cross-encoder model

        and tokenizer.

        """

        model_name = RERANKER_MODEL_CONFIG["model_name"]

        provider = RERANKER_MODEL_CONFIG["provider"]


        if provider == "huggingface_local":

            print(f"Loading local reranker model: {model_name}")

            self.tokenizer = AutoTokenizer.from_pretrained(model_name)

            self.model = AutoModelForSequenceClassification.from_pretrained(model_name)

            self.device = "cuda" if torch.cuda.is_available() else "cpu"

            self.model.to(self.device)

            self.model.eval() # Set model to evaluation mode

        elif provider == "cohere":

            # For Cohere, you would typically initialize the Cohere client here.

            # import cohere

            # self.client = cohere.Client(RERANKER_MODEL_CONFIG["api_key"])

            raise NotImplementedError("Cohere reranking integration not fully implemented in snippet.")

        else:

            raise ValueError(f"Unsupported reranker provider: {provider}")


    def rerank_chunks(self, query: str, chunks: List[Dict[str, Any]], top_n: int = 5) -> List[Dict[str, Any]]:

        """

        Reranks a list of document chunks based on their relevance to the query.


        Args:

            query (str): The user's query string.

            chunks (List[Dict[str, Any]]): A list of initially retrieved chunk

                                           dictionaries. Each dict must have a 'text' key.

            top_n (int): The number of top-ranked chunks to return.


        Returns:

            List[Dict[str, Any]]: A list of reranked chunk dictionaries, sorted

                                  by relevance score in descending order.

        """

        if not chunks:

            return []


        if RERANKER_MODEL_CONFIG["provider"] == "huggingface_local":

            sentences = [(query, chunk["text"]) for chunk in chunks]

            with torch.no_grad():

                inputs = self.tokenizer(sentences, padding=True, truncation=True,

                                        return_tensors="pt").to(self.device)

                scores = self.model(**inputs).logits.squeeze(-1).cpu().numpy()  # squeeze(-1) keeps a 1-D array even when only one chunk is scored


            # Pair chunks with their scores

            scored_chunks = []

            for i, chunk in enumerate(chunks):

                chunk_copy = chunk.copy()

                chunk_copy["relevance_score"] = float(scores[i]) # Convert numpy float to Python float

                scored_chunks.append(chunk_copy)


            # Sort by relevance score in descending order

            reranked_chunks = sorted(scored_chunks, key=lambda x: x["relevance_score"], reverse=True)

            print(f"Reranked {len(chunks)} chunks, returning top {min(top_n, len(reranked_chunks))}.")

            return reranked_chunks[:top_n]


        elif RERANKER_MODEL_CONFIG["provider"] == "cohere":

            # response = self.client.rerank(

            #     query=query,

            #     documents=[chunk["text"] for chunk in chunks],

            #     top_n=top_n,

            #     model=RERANKER_MODEL_CONFIG["model_name"]

            # )

            # reranked_results = []

            # for rank in response.results:

            #     original_chunk = chunks[rank.index]

            #     original_chunk["relevance_score"] = rank.relevance_score

            #     reranked_results.append(original_chunk)

            # return reranked_results

            raise NotImplementedError("Cohere reranking integration not fully implemented in snippet.")

        else:

            return [] # Should not reach here
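
The Retriever and Reranker are meant to be chained: fetch a generous candidate set with fast vector search, then let the slower cross-encoder narrow it down. A minimal sketch of that two-stage flow (it assumes the knowledge base already contains some chunks):

# Standalone sketch: two-stage retrieval (vector search, then cross-encoder reranking).
from embedding_generator import EmbeddingGenerator
from vector_db import VectorDatabase
from retriever import Retriever
from reranker import Reranker

generator = EmbeddingGenerator()
db = VectorDatabase()
retriever = Retriever(db, generator)
reranker = Reranker()

query = "How can factories reduce energy consumption?"
candidates = retriever.retrieve_chunks(query, k=20)               # broad first pass
top_chunks = reranker.rerank_chunks(query, candidates, top_n=5)   # precise second pass
for chunk in top_chunks:
    print(round(chunk["relevance_score"], 3), chunk["text"][:80])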



  `agent.py`



# agent.py

from typing import List, Dict, Any

from langchain_community.llms import Ollama

from langchain_openai import OpenAI

from langchain_core.prompts import ChatPromptTemplate

from langchain_core.output_parsers import StrOutputParser


from config import LLM_CONFIG, AGENT_SUBJECTS

from retriever import Retriever

from reranker import Reranker

from embedding_generator import EmbeddingGenerator

from vector_db import VectorDatabase

from document_processor import DocumentProcessor

from document_loaders import DocumentLoaderFactory

from web_scraper import WebScraper

import os

import time


class AgentLLM:

    """

    Manages the interaction with the Large Language Model (LLM), handling

    different providers based on configuration.

    """

    def __init__(self):

        """

        Initializes the LLM client based on the configuration.

        """

        provider = LLM_CONFIG["provider"]

        model_name = LLM_CONFIG["model_name"]


        if provider == "ollama":

            print(f"Initializing Ollama LLM with model: {model_name} at {LLM_CONFIG['api_base']}")

            self.llm = Ollama(model=model_name, base_url=LLM_CONFIG["api_base"])

        elif provider == "openai":

            print(f"Initializing OpenAI LLM with model: {model_name}")

            self.llm = OpenAI(model_name=model_name, openai_api_key=LLM_CONFIG["api_key"])

        elif provider == "huggingface_local":

            # For local HuggingFace models, you might use transformers pipeline

            # from langchain_community.llms import HuggingFacePipeline

            # from transformers import pipeline

            # self.llm = HuggingFacePipeline(pipeline=pipeline("text-generation", model=model_name))

            raise NotImplementedError("HuggingFace local LLM integration not fully implemented in snippet.")

        else:

            raise ValueError(f"Unsupported LLM provider: {provider}")


        # Define a simple prompt template for RAG

        self.rag_prompt = ChatPromptTemplate.from_messages([

            ("system", "You are an expert assistant, specializing in subjects like "

                       f"{', '.join(AGENT_SUBJECTS)}. Your goal is to provide concise, accurate, and helpful "

                       "answers based *only* on the provided context. If the context does not contain "

                       "enough information to answer the question, state that you don't have enough "

                       "information and avoid making up answers."),

            ("human", "Context:\n{context}\n\nQuestion: {question}")

        ])

        self.output_parser = StrOutputParser()

        self.rag_chain = self.rag_prompt | self.llm | self.output_parser



    def generate_response(self, prompt_text: str) -> str:

        """

        Generates a text response from the LLM based on the given prompt.

        This is for direct LLM calls without RAG.


        Args:

            prompt_text (str): The input prompt for the LLM.


        Returns:

            str: The LLM's generated response.

        """

        try:

            response = self.llm.invoke(prompt_text)

            return response

        except Exception as e:

            print(f"Error generating LLM response: {e}")

            return "I apologize, but I encountered an error while trying to generate a response."


    def generate_rag_response(self, question: str, context: str) -> str:

        """

        Generates a text response from the LLM using the RAG chain.


        Args:

            question (str): The user's question.

            context (str): The concatenated, relevant document chunks.


        Returns:

            str: The LLM's generated response based on the context.

        """

        try:

            response = self.rag_chain.invoke({"context": context, "question": question})

            return response

        except Exception as e:

            print(f"Error generating RAG response: {e}")

            return "I apologize, but I encountered an error while trying to generate a response using the provided context."


class AgenticAI:

    """

    The main orchestrator for the Agentic AI system, managing interactions

    between components and handling user requests.

    """

    def __init__(self):

        """

        Initializes all core components of the Agentic AI.

        """

        self.embedding_generator = EmbeddingGenerator()

        self.vector_db = VectorDatabase()

        self.retriever = Retriever(self.vector_db, self.embedding_generator)

        self.reranker = Reranker()

        self.llm_agent = AgentLLM()

        self.doc_processor = DocumentProcessor()

        self.doc_loader_factory = DocumentLoaderFactory()

        self.web_scraper = WebScraper()


        print("Agentic AI system initialized.")


    def _process_and_add_chunks(self, chunks: List[Dict[str, Any]]):

        """Helper to generate embeddings and add chunks to DB."""

        if not chunks:

            return


        texts_to_embed = [chunk["text"] for chunk in chunks]

        embeddings = self.embedding_generator.generate_embeddings(texts_to_embed)

        

        if embeddings:

            self.vector_db.add_chunks(chunks, embeddings)

        else:

            print("No embeddings generated, skipping adding to DB.")


    def update_knowledge_from_web(self, topic: str):

        """

        Searches the web for new information on a given topic, processes it,

        and adds it to the knowledge base.

        """

        print(f"\n--- Initiating web knowledge update for topic: '{topic}' ---")

        scraped_docs = self.web_scraper.search_and_scrape_for_topic(topic)

        

        total_new_chunks = []

        for doc in scraped_docs:

            processed_chunks = self.doc_processor.process_web_document(

                content=doc["content"],

                url=doc["source_url"],

                title=doc["title"],

                subject=doc["subject"],

                timestamp=doc["timestamp"]

            )

            total_new_chunks.extend(processed_chunks)


        self._process_and_add_chunks(total_new_chunks)

        print(f"--- Web knowledge update for '{topic}' completed. Total new chunks added: {len(total_new_chunks)} ---")


    def process_user_uploaded_document(self, file_path: str):

        """

        Processes a single user-uploaded document and adds it to the knowledge base.

        """

        print(f"\n--- Processing user-uploaded document: {file_path} ---")

        loader = self.doc_loader_factory.get_loader(file_path)

        if not loader:

            print(f"Skipping {file_path}: Unsupported file type.")

            return


        # For user-uploaded documents, we can assign them to a general subject

        # or prompt the user for a subject if a UI were present.

        # For this example, we'll assign to the first configured subject.

        subject = AGENT_SUBJECTS[0] if AGENT_SUBJECTS else "general"


        processed_chunks = self.doc_processor.process_document_file(file_path, loader, subject)

        self._process_and_add_chunks(processed_chunks)

        print(f"--- User document processing for {file_path} completed. Total new chunks added: {len(processed_chunks)} ---")


    def handle_user_prompt(self, prompt: str) -> str:

        """

        Handles a user's prompt, orchestrating retrieval, reranking, and LLM generation.

        Also detects update commands.

        """

        prompt_lower = prompt.lower()

        

        # Check for update commands

        if "update knowledge base" in prompt_lower:

            for subject in AGENT_SUBJECTS:

                if subject.lower() in prompt_lower:

                    self.update_knowledge_from_web(subject)

                    return f"Initiated knowledge base update for '{subject}'. This may take a moment."

            return "Please specify a subject to update, e.g., 'Update knowledge base on Sustainable Manufacturing Practices'."


        # Regular RAG query

        print(f"\n--- Handling user query: '{prompt}' ---")

        retrieved_chunks = self.retriever.retrieve_chunks(prompt, k=20) # Retrieve more for reranking

        

        if not retrieved_chunks:

            print("No relevant chunks found in the knowledge base.")

            return self.llm_agent.generate_response(f"I don't have enough information in my knowledge base to answer: {prompt}")


        reranked_chunks = self.reranker.rerank_chunks(prompt, retrieved_chunks, top_n=5) # Select top 5 after reranking


        if not reranked_chunks:

            print("No chunks remained after reranking.")

            return self.llm_agent.generate_response(f"I found some information, but it wasn't relevant enough to answer: {prompt}")


        context = "\n\n".join([chunk["text"] for chunk in reranked_chunks])

        

        print(f"--- Generated RAG context from {len(reranked_chunks)} reranked chunks. ---")

        response = self.llm_agent.generate_rag_response(prompt, context)

        return response


    def get_knowledge_base_status(self) -> str:

        """Provides a status report on the knowledge base."""

        num_chunks = self.vector_db.get_collection_count()

        num_processed_docs = len(self.doc_processor.processed_metadata)

        return f"Knowledge Base Status: {num_chunks} chunks from {num_processed_docs} unique documents."



  `web_scraper.py`



# web_scraper.py

import requests

from bs4 import BeautifulSoup

from typing import List, Dict, Any

import time

from config import WEB_SCRAPE_CONFIG, AGENT_SUBJECTS

import os


class WebScraper:

    """

    Handles web searching and scraping to gather new information.

    """

    def __init__(self):

        """

        Initializes the WebScraper with search engine configuration.

        """

        self.search_api_key = WEB_SCRAPE_CONFIG["search_engine_api_key"]

        self.search_base_url = WEB_SCRAPE_CONFIG["search_engine_base_url"]

        self.max_results = WEB_SCRAPE_CONFIG["max_search_results"]

        self.scrape_timeout = WEB_SCRAPE_CONFIG["scrape_timeout"]


        if not self.search_api_key or self.search_api_key == "YOUR_SERPER_API_KEY":

            print("Warning: SERPER_API_KEY is not set or is default. Web search will be limited or fail.")


    def _perform_search(self, query: str) -> List[Dict[str, Any]]:

        """

        Performs a web search using a search engine API (e.g., Serper.dev).


        Args:

            query (str): The search query string.


        Returns:

            List[Dict[str, Any]]: A list of search results, each containing

                                  'title', 'link', and 'snippet'.

        """

        if not self.search_api_key or self.search_api_key == "YOUR_SERPER_API_KEY":

            print("Skipping web search: SERPER_API_KEY not configured.")

            return []

        if self.max_results <= 0:

            print("Web search disabled: max_search_results is 0.")

            return []


        headers = {

            'X-API-KEY': self.search_api_key,

            'Content-Type': 'application/json'

        }

        payload = {

            'q': query,

            'num': self.max_results

        }

        try:

            # Serper's search endpoint expects a POST request with a JSON body.

            response = requests.post(self.search_base_url, headers=headers, json=payload, timeout=self.scrape_timeout)

            response.raise_for_status() # Raise an HTTPError for bad responses (4xx or 5xx)

            data = response.json()

            organic_results = data.get('organic', [])

            return [{'title': r.get('title'), 'link': r.get('link'), 'snippet': r.get('snippet')}

                    for r in organic_results if r.get('link')]

        except requests.exceptions.RequestException as e:

            print(f"Error during web search for '{query}': {e}")

            return []

        except Exception as e:

            print(f"An unexpected error occurred during web search for '{query}': {e}")

            return []


    def scrape_page_content(self, url: str) -> str:

        """

        Scrapes the main text content from a given URL.


        Args:

            url (str): The URL of the page to scrape.


        Returns:

            str: The extracted text content, or an empty string if scraping fails.

        """

        try:

            response = requests.get(url, timeout=self.scrape_timeout)

            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')


            # Remove script, style, and other non-content elements

            for script_or_style in soup(["script", "style", "header", "footer", "nav", "form"]):

                script_or_style.decompose()


            # Get text and clean it

            text = soup.get_text()

            lines = (line.strip() for line in text.splitlines())

            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))

            text = '\n'.join(chunk for chunk in chunks if chunk)

            return text

        except requests.exceptions.RequestException as e:

            print(f"Error scraping {url}: {e}")

            return ""

        except Exception as e:

            print(f"General error scraping {url}: {e}")

            return ""


    def search_and_scrape_for_topic(self, topic: str) -> List[Dict[str, Any]]:

        """

        Performs a web search for a topic and scrapes content from the results.


        Args:

            topic (str): The subject topic to search for.


        Returns:

            List[Dict[str, Any]]: A list of dictionaries, each containing

                                  'content', 'source_url', 'title', 'timestamp'.

        """

        print(f"Searching web for new information on '{topic}'...")

        search_query = f"{topic} latest news OR research papers"

        results = self._perform_search(search_query)


        scraped_documents = []

        for i, result in enumerate(results):

            print(f"  Scraping result {i+1}/{len(results)}: {result['link']}")

            content = self.scrape_page_content(result['link'])

            if content:

                scraped_documents.append({

                    "content": content,

                    "source_url": result['link'],

                    "title": result['title'],

                    "timestamp": time.time(), # Record when it was scraped

                    "subject": topic

                })

            time.sleep(1) # Be polite to websites

        print(f"Finished web scraping for '{topic}'. Found {len(scraped_documents)} new documents.")

        return scraped_documents
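
The scraper can also be run on its own to preview what an update cycle would pull in (this requires a valid Serper API key in `config.py`):

# Standalone sketch: fetch and inspect fresh web content for one configured subject.
from web_scraper import WebScraper

scraper = WebScraper()
documents = scraper.search_and_scrape_for_topic("Sustainable Manufacturing Practices")
for doc in documents:
    print(f"{doc['title']} - {doc['source_url']} ({len(doc['content'])} characters)")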



  `file_watcher.py`



# file_watcher.py

import time

import os

import hashlib

import json

from watchdog.observers import Observer

from watchdog.events import FileSystemEventHandler

from typing import Callable, Dict, Any


from config import USER_UPLOAD_FOLDER, PROCESSED_DOCS_METADATA_PATH


class UserUploadHandler(FileSystemEventHandler):

    """

    Event handler for file system events in the user upload folder.

    Triggers processing for new files.

    """

    def __init__(self, process_new_document_callback: Callable[[str], None]):

        """

        Initializes the handler with a callback function to process new documents.


        Args:

            process_new_document_callback (Callable[[str], None]): A function

                                                                  that takes a

                                                                  file path

                                                                  and processes it.

        """

        self.process_new_document = process_new_document_callback

        # Load all processed hashes, the DocumentProcessor will handle if it's new/modified

        self.processed_file_hashes = self._load_all_processed_hashes()

        print(f"Initialized UserUploadHandler. Monitoring: {USER_UPLOAD_FOLDER}")


    def _load_all_processed_hashes(self) -> Dict[str, str]:

        """Loads all metadata of already processed documents."""

        if os.path.exists(PROCESSED_DOCS_METADATA_PATH):

            with open(PROCESSED_DOCS_METADATA_PATH, 'r', encoding='utf-8') as f:

                return json.load(f)

        return {}


    def _get_file_hash(self, file_path: str) -> str:

        """Generates a hash for the content of a file."""

        hasher = hashlib.sha256()

        try:

            with open(file_path, 'rb') as f:

                while chunk := f.read(8192):

                    hasher.update(chunk)

            return hasher.hexdigest()

        except Exception as e:

            print(f"Error hashing file {file_path}: {e}")

            return ""


    def on_created(self, event):

        """

        Called when a file or directory is created.

        """

        if not event.is_directory:

            print(f"Detected new file creation event: {event.src_path}")

            self._check_and_process_file(event.src_path)


    def on_modified(self, event):

        """

        Called when a file or directory is modified.

        This can also indicate a new file being fully written.

        """

        if not event.is_directory:

            # Sometimes 'created' isn't fired immediately or reliably for large files.

            # 'modified' catches the completion of file writes.

            print(f"Detected file modification event: {event.src_path}")

            self._check_and_process_file(event.src_path)


    def _check_and_process_file(self, file_path: str):

        """

        Checks if a file is new or modified and processes it if needed.

        """

        # Ensure the file is fully written before processing

        # This is a heuristic; more robust solutions might involve locking or retry logic.

        time.sleep(2) # Give the OS a moment to finish writing


        if not os.path.exists(file_path):

            print(f"File {file_path} no longer exists, skipping processing.")

            return


        current_hash = self._get_file_hash(file_path)

        if not current_hash:

            print(f"Could not get hash for {file_path}, skipping.")

            return


        # The actual check for new/modified is handled by DocumentProcessor

        # We just call the callback, and it will decide if processing is needed.

        print(f"Triggering processing for potential new/modified document: {file_path}")

        self.process_new_document(file_path) # The callback handles the deduplication logic


class FileWatcher:

    """

    Sets up and starts an observer to watch for file system events.

    """

    def __init__(self, folder_to_watch: str, process_new_document_callback: Callable[[str], None]):

        """

        Initializes the FileWatcher.


        Args:

            folder_to_watch (str): The path to the folder to monitor.

            process_new_document_callback (Callable[[str], None]): Callback for new docs.

        """

        self.folder_to_watch = folder_to_watch

        self.event_handler = UserUploadHandler(process_new_document_callback)

        self.observer = Observer()


    def start(self):

        """Starts the file system observer."""

        print(f"Starting file watcher for folder: {self.folder_to_watch}")

        self.observer.schedule(self.event_handler, self.folder_to_watch, recursive=False) # Only watch top level

        self.observer.start()

        # The main loop will keep the program alive

        # try:

        #     while True:

        #         time.sleep(1)

        # except KeyboardInterrupt:

        #     self.stop()


    def stop(self):

        """Stops the file system observer."""

        if self.observer.is_alive():

            print("Stopping file watcher.")

            self.observer.stop()

            self.observer.join()
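
Because the watcher is decoupled from the rest of the system through its callback, it can be tested with something as simple as a print function:

# Standalone sketch: watch the upload folder and just print the paths of detected files.
import time
from file_watcher import FileWatcher
from config import USER_UPLOAD_FOLDER

watcher = FileWatcher(USER_UPLOAD_FOLDER, lambda path: print(f"New document detected: {path}"))
watcher.start()
try:
    while True:          # keep the process alive so the observer thread can run
        time.sleep(1)
except KeyboardInterrupt:
    watcher.stop()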



  `scheduler.py`



# scheduler.py

from apscheduler.schedulers.background import BackgroundScheduler

from datetime import datetime, timedelta

from typing import Callable


from config import AUTO_UPDATE_INTERVAL_HOURS


class KnowledgeBaseScheduler:

    """

    Schedules automated tasks, such as periodically updating the knowledge base.

    """

    def __init__(self, update_function: Callable[[], None]):

        """

        Initializes the scheduler with the function to call for updates.


        Args:

            update_function (Callable[[], None]): The function to execute

                                                  periodically for updates.

        """

        self.scheduler = BackgroundScheduler()

        self.update_function = update_function

        self.interval_hours = AUTO_UPDATE_INTERVAL_HOURS


    def start(self):

        """

        Starts the scheduler and adds the update job.

        """

        if self.interval_hours > 0:

            # Schedule the job to run every 'interval_hours'

            # Start it 1 minute after launch to allow other components to initialize

            self.scheduler.add_job(self.update_function, 'interval', hours=self.interval_hours,

                                   next_run_time=datetime.now() + timedelta(minutes=1))

            print(f"Scheduled automated knowledge base updates every {self.interval_hours} hours.")

            self.scheduler.start()

        else:

            print("Automated updates are disabled (AUTO_UPDATE_INTERVAL_HOURS is 0 or less).")


    def stop(self):

        """

        Stops the scheduler.

        """

        if self.scheduler.running:

            print("Stopping knowledge base scheduler.")

            self.scheduler.shutdown()
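
The same decoupling applies to the scheduler: any zero-argument callable can be scheduled. A minimal sketch with a dummy update function (note that `AUTO_UPDATE_INTERVAL_HOURS` in `config.py` must be greater than 0 for the job to be registered):

# Standalone sketch: schedule a dummy update function and let it fire in the background.
import time
from scheduler import KnowledgeBaseScheduler

scheduler = KnowledgeBaseScheduler(lambda: print("Pretending to refresh the knowledge base..."))
scheduler.start()
try:
    time.sleep(300)      # keep the main thread alive long enough for the first run
except KeyboardInterrupt:
    pass
finally:
    scheduler.stop()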



  `main.py`



# main.py

import time

import os

import sys


from agent import AgenticAI

from file_watcher import FileWatcher

from scheduler import KnowledgeBaseScheduler

from config import USER_UPLOAD_FOLDER, AGENT_SUBJECTS


def main():

    """

    Main function to initialize and run the Agentic AI system.

    """

    print("==================================================")

    print("  Initializing Agentic AI System    ")

    print("==================================================")


    agent = AgenticAI()


    # --- Initialize File Watcher for User Uploads ---

    # The callback for the file watcher is agent.process_user_uploaded_document

    file_watcher = FileWatcher(USER_UPLOAD_FOLDER, agent.process_user_uploaded_document)

    file_watcher.start()


    # --- Initialize Scheduler for Automated Web Updates ---

    # The callback for the scheduler needs to update all configured subjects.

    def automated_web_update_all_subjects():

        print("\n--- Automated knowledge base update triggered ---")

        for subject in AGENT_SUBJECTS:

            agent.update_knowledge_from_web(subject)

        print("--- Automated knowledge base update finished ---")


    kb_scheduler = KnowledgeBaseScheduler(automated_web_update_all_subjects)

    kb_scheduler.start()


    print("\nAgentic AI system is ready!")

    print(f"Monitoring '{USER_UPLOAD_FOLDER}' for new documents.")

    print("Type your queries or commands below. Type 'exit' to quit.")

    print("Try: 'What are sustainable manufacturing practices?'")

    print("Or: 'Update knowledge base on Sustainable Manufacturing Practices'")

    print("Or: 'status' to check knowledge base status.")

    print("--------------------------------------------------")


    try:

        while True:

            user_input = input("\nYour prompt > ").strip()

            if user_input.lower() == 'exit':

                print("Exiting Agentic AI System. Goodbye!")

                break

            elif user_input.lower() == 'status':

                print(agent.get_knowledge_base_status())

            else:

                response = agent.handle_user_prompt(user_input)

                print("\nAgentic AI Response:")

                print(response)


            time.sleep(0.1) # Small delay to prevent busy-waiting


    except KeyboardInterrupt:

        print("\nKeyboard interrupt detected. Shutting down...")

    finally:

        file_watcher.stop()

        kb_scheduler.stop()

        print("All background processes stopped.")

        print("==================================================")

        print("  Agentic AI System Shut Down                     ")

        print("==================================================")


if __name__ == "__main__":

    main()



  How to Run the Example



1. Save all files: Save each code block into its respective `.py` file in the `agentic_ai_system` directory.

2.  Configure `config.py`:

Replace `"YOUR_SERPER_API_KEY"` with your actual Serper API key.

Ensure `LLM_CONFIG` points to your running Ollama instance or OpenAI (if you configure it).

3.  Start Ollama (if using local LLM): Open a terminal and run `ollama run llama3` (or your chosen model).

4.  Run `main.py`: Open a *new* terminal in the `agentic_ai_system` directory and execute:



    python main.py

    


The system will initialize, start the file watcher, and the scheduler. You can then interact with it by typing prompts in the console.


  • To add documents: Place a `.pdf`, `.html`, or `.md` file into the `data/user_uploads` folder. The file watcher will detect it, process it, and add its content to the knowledge base.
  • To update knowledge from web: Type a command like `Update knowledge base on Sustainable Manufacturing Practices`.
  • To query: Ask questions related to the configured subjects, e.g., `What are the key principles of circular economy in engineering?` or `Tell me about the latest innovations in sustainable manufacturing.`


This running example provides a solid foundation for your Agentic AI system, ready for further expansion and customization!