In today's fast-paced environment, staying informed and efficient is paramount. This article outlines the architecture and implementation of a powerful Agentic AI system designed to be your personal knowledge navigator and assistant. This system will not only answer your specific questions by leveraging a continuously updated knowledge base but also proactively expand its understanding of critical subjects relevant to your work.
Imagine an AI assistant that understands the nuances of your projects, keeps itself updated on the latest industry trends, and helps you sift through vast amounts of information with ease. That's precisely what we're building here. This Agentic AI system is engineered to handle user prompts on predefined subjects, drawing insights from a rich, dynamically updated knowledge base. It employs sophisticated techniques like Retrieval-Augmented Generation (RAG) with semantic chunking and advanced reranking to deliver accurate and highly relevant information. Furthermore, it intelligently manages its knowledge, automatically seeking out new information from the web and seamlessly integrating documents you provide.
Let's dive into the fascinating world of Agentic AI and explore how to construct such a robust and intelligent system.
1. Core Components of Our Agentic AI System
Our Agentic AI system is a symphony of interconnected modules, each playing a crucial role in its overall intelligence and functionality. Understanding these components is key to appreciating the system's capabilities.
- Configuration Management: This foundational component defines the subjects the agent specializes in and specifies which Large Language Model (LLM), be it local or remote, the system should utilize for its reasoning and generation tasks. It acts as the central control panel for the agent's operational parameters.
- Document Ingestion and Processing Pipeline: This vital pipeline is responsible for taking raw information from various sources, such as PDF files, HTML pages, and Markdown documents, and transforming it into a format suitable for retrieval. It involves crucial steps like parsing, semantic chunking, and deduplication to ensure data quality and efficiency.
- Vector Database and Embeddings: At the heart of the system's knowledge storage lies the vector database. This component stores numerical representations, known as embeddings, of the processed document chunks. These embeddings allow for rapid and semantically meaningful similarity searches, enabling the system to find the most relevant pieces of information for any given query.
- Retrieval-Augmented Generation (RAG) with Reranking: This is the brain of our question-answering mechanism. When a user asks a question, the RAG system first retrieves relevant information from the vector database. Crucially, it then employs sophisticated reranking strategies to refine these retrieved results, ensuring that only the most pertinent and accurate information is passed to the LLM for generating a comprehensive and contextually rich answer.
- Agent Orchestration: This module acts as the conductor of our AI orchestra. It manages the flow of information, directs user prompts to the appropriate tools (like the RAG system or web search), and integrates the responses from the LLM to formulate a coherent and helpful answer for the user. It embodies the "agentic" nature by making decisions on how to best fulfill a user's request.
- Knowledge Base Update Mechanisms: To remain effective, our agent must continuously learn and adapt. This component encompasses two key mechanisms for updating its knowledge.
- Web Search for Relevant News and Papers: The agent can autonomously search the internet for the latest information on its configured subjects, ensuring its knowledge is always current.
- User-Added Documents: Users can easily contribute their own documents by simply placing them in a designated local folder, and the system will automatically process and integrate them.
- Scheduling for Automated Updates: Beyond user-initiated updates, the system incorporates a scheduler that periodically triggers the knowledge base update process. This ensures that the agent's understanding of its subjects remains fresh and comprehensive, even without explicit user intervention.
2. Deep Dive into Each Component: Implementation Details
Let's explore each component in detail, including practical examples and code snippets to illustrate their functionality. We will use "Sustainable Manufacturing Practices" as our running example subject throughout this section.
2.1. Configuration Management
The configuration file is the central hub for defining the agent's behavior. It specifies the subjects the agent will focus on, the LLM it should use, and various paths for data storage. This separation of configuration from code allows for easy modification and scalability.
We will use a simple Python file, `config.py`, to store these settings.
Example: `config.py`
# config.py
import os
# Define the base directory for all data
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(BASE_DIR, "data")
DOCUMENTS_DIR = os.path.join(DATA_DIR, "documents")
VECTOR_DB_DIR = os.path.join(DATA_DIR, "vector_db")
PROCESSED_DOCS_METADATA_PATH = os.path.join(DATA_DIR, "processed_docs_metadata.json")
# Ensure directories exist
os.makedirs(DOCUMENTS_DIR, exist_ok=True)
os.makedirs(VECTOR_DB_DIR, exist_ok=True)
# Define the subjects the Agentic AI should be knowledgeable about.
# These subjects will guide web searches and contextual understanding.
AGENT_SUBJECTS = [
"Sustainable Manufacturing Practices",
"Industry 4.0 Technologies in Production",
"Circular Economy in Engineering",
"Energy Efficiency in Industrial Processes"
]
# Large Language Model (LLM) Configuration
# You can specify a local LLM (e.g., via Ollama) or a remote one (e.g., OpenAI).
# For local LLMs, ensure your Ollama server is running.
LLM_CONFIG = {
"provider": "ollama", # Options: "ollama", "openai", "huggingface_local"
"model_name": "llama3", # e.g., "llama3", "gpt-3.5-turbo", "mistral"
"api_base": "http://localhost:11434", # Only for "ollama" or self-hosted APIs
"api_key": "YOUR_OPENAI_API_KEY" # Only for "openai"
}
# Embedding Model Configuration
# Used to convert text into numerical vectors for the vector database.
EMBEDDING_MODEL_CONFIG = {
"provider": "huggingface_local", # Options: "huggingface_local", "openai"
"model_name": "sentence-transformers/all-MiniLM-L6-v2", # Local model path or name
"api_key": "YOUR_OPENAI_API_KEY" # Only for "openai"
}
# Reranker Model Configuration
# Used to re-rank retrieved documents for higher relevance.
RERANKER_MODEL_CONFIG = {
"provider": "huggingface_local", # Options: "huggingface_local", "cohere"
"model_name": "cross-encoder/ms-marco-MiniLM-L-6-v2", # Local cross-encoder model
"api_key": "YOUR_COHERE_API_KEY" # Only for "cohere"
}
# Web Scraping Configuration
WEB_SCRAPE_CONFIG = {
"search_engine_api_key": "YOUR_SERPER_API_KEY", # e.g., Serper, SerpApi, or custom
"search_engine_base_url": "https://google.serper.dev/search",
"max_search_results": 5,
"scrape_timeout": 10 # seconds
}
# Knowledge Base Update Schedule
# Interval in hours for automated web-based knowledge updates.
AUTO_UPDATE_INTERVAL_HOURS = 24
# File Watcher Configuration
# Directory where users can place documents for automatic processing.
USER_UPLOAD_FOLDER = os.path.join(DATA_DIR, "user_uploads")
os.makedirs(USER_UPLOAD_FOLDER, exist_ok=True)
In this configuration, we establish paths for storing documents and the vector database, define the core subjects of interest, and specify the details for the LLM, embedding model, and reranker. We also include settings for web scraping and the automated update schedule.
2.2. Document Ingestion and Processing Pipeline
This pipeline is crucial for transforming raw documents into a usable format for the RAG system. It involves loading various file types, semantically chunking their content, and generating embeddings.
The system needs to handle diverse document formats, so we'll create a specialized loader for each type. For robustness, we'll use libraries like `pypdf` for PDFs, `BeautifulSoup` for HTML, and `markdown-it-py` for Markdown files.
Example: `document_loaders.py` (snippet for PDF loading)
# document_loaders.py (snippet)
import os
from pypdf import PdfReader
class PDFLoader:
"""
A class responsible for loading text content from PDF files.
"""
def load(self, file_path: str) -> str:
"""
Loads and extracts all text content from a specified PDF file.
Args:
file_path (str): The full path to the PDF file.
Returns:
str: The concatenated text content from all pages of the PDF.
Returns an empty string if the file cannot be read or
contains no text.
"""
if not os.path.exists(file_path):
print(f"Warning: PDF file not found at {file_path}")
return ""
try:
reader = PdfReader(file_path)
text_content = ""
for page in reader.pages:
text_content += page.extract_text() + "\n"
return text_content
except Exception as e:
print(f"Error loading PDF {file_path}: {e}")
return ""
# Similar loaders would be implemented for HTML and Markdown.
# For HTML, you'd use BeautifulSoup to parse and extract visible text.
# For Markdown, you'd use markdown-it-py to render to HTML and then extract the plain text.
2.2.2. Semantic Chunking
Traditional text splitting often breaks documents at arbitrary character counts, potentially separating related ideas. Semantic chunking aims to keep semantically related sentences or paragraphs together, even if it means varying chunk sizes, which significantly improves retrieval quality. A common approach involves splitting into smaller units (like sentences), embedding them, and then grouping adjacent units whose embeddings are sufficiently similar. For simplicity in this example, we will use a `RecursiveCharacterTextSplitter` from LangChain, which is a good starting point for maintaining context, and note how to enhance it with semantic similarity; a sketch of that enhancement appears at the end of this subsection.
Example: `document_processor.py` (snippet for chunking)
# document_processor.py (snippet)
from typing import List, Dict, Any
from langchain.text_splitter import RecursiveCharacterTextSplitter
class DocumentProcessor:
"""
Handles the chunking and embedding generation for document content.
"""
def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
"""
Initializes the DocumentProcessor with chunking parameters.
Args:
chunk_size (int): The maximum number of characters in a chunk.
chunk_overlap (int): The number of characters to overlap between
consecutive chunks to maintain context.
"""
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
is_separator_regex=False,
)
def chunk_document(self, doc_content: str, metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Splits a document's content into smaller, semantically coherent chunks.
Args:
doc_content (str): The full text content of the document.
metadata (Dict[str, Any]): Metadata associated with the document
(e.g., source, title, file_path).
Returns:
List[Dict[str, Any]]: A list of dictionaries, where each dictionary
represents a chunk with its content and
associated metadata.
"""
# LangChain's text splitter returns Document objects, we convert them
# to our desired dictionary format.
chunks = self.text_splitter.create_documents([doc_content])
processed_chunks = []
for i, chunk in enumerate(chunks):
chunk_metadata = metadata.copy()
chunk_metadata["chunk_id"] = f"{metadata.get('doc_id', 'unknown')}_chunk_{i}"
chunk_metadata["text"] = chunk.page_content
processed_chunks.append(chunk_metadata)
return processed_chunks
# For true semantic chunking, one would typically:
# 1. Split text into smaller units (e.g., sentences).
# 2. Generate embeddings for each unit.
# 3. Group adjacent units whose embeddings are sufficiently similar,
# or use a sliding window approach with embedding similarity checks.
# Libraries like LlamaIndex offer more advanced semantic chunking strategies
# that could be integrated here for even better performance.
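# --- Illustrative sketch: embedding-based semantic chunking, along the lines
# --- described in the comments above. This is an assumption-laden example, not
# --- part of the pipeline: the naive sentence split, the 0.6 threshold, and the
# --- helper name `semantic_chunk` are hypothetical choices you would tune.
from typing import List
from sentence_transformers import SentenceTransformer, util

def semantic_chunk(text: str, threshold: float = 0.6, max_sentences: int = 12) -> List[str]:
    """Groups adjacent sentences into a chunk while their embeddings stay similar."""
    sentences = [s.strip() for s in text.split(". ") if s.strip()]  # naive sentence split
    if not sentences:
        return []
    model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    embeddings = model.encode(sentences, convert_to_tensor=True)
    chunks, current = [], [sentences[0]]
    for i in range(1, len(sentences)):
        # Start a new chunk when the topic appears to shift or the chunk grows too long.
        similarity = util.cos_sim(embeddings[i - 1], embeddings[i]).item()
        if similarity < threshold or len(current) >= max_sentences:
            chunks.append(". ".join(current))
            current = []
        current.append(sentences[i])
    chunks.append(". ".join(current))
    return chunks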
2.2.3. Deduplication Strategies
Deduplication is essential to prevent redundant information from cluttering the vector database and skewing retrieval results. We can apply deduplication at two levels:
- Document Level: Prevent processing the same document multiple times. This can be done by storing a hash of the document's content or its unique file path and modification timestamp.
- Chunk Level: Identify and remove identical or near-identical chunks, which can arise from overlapping splits or highly repetitive content. Hashing chunk content or comparing chunk embeddings can achieve this.
Example: `document_processor.py` (snippet for deduplication logic)
# document_processor.py (snippet)
import os
import hashlib
import json
from config import PROCESSED_DOCS_METADATA_PATH
class DocumentProcessor:
# ... (previous __init__ and chunk_document methods)
def _generate_content_hash(self, content: str) -> str:
"""Generates an SHA256 hash for the given string content."""
return hashlib.sha256(content.encode('utf-8')).hexdigest()
def _load_processed_metadata(self) -> Dict[str, str]:
"""Loads metadata of already processed documents."""
if os.path.exists(PROCESSED_DOCS_METADATA_PATH):
with open(PROCESSED_DOCS_METADATA_PATH, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
def _save_processed_metadata(self, metadata: Dict[str, str]):
"""Saves metadata of processed documents."""
with open(PROCESSED_DOCS_METADATA_PATH, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=4)
def is_document_new_or_modified(self, file_path: str, current_content_hash: str) -> bool:
"""
Checks if a document is new or has been modified since last processing.
Args:
file_path (str): The path to the document.
current_content_hash (str): The SHA256 hash of the document's
current content.
Returns:
bool: True if the document is new or its content hash has changed,
False otherwise.
"""
processed_metadata = self._load_processed_metadata()
stored_hash = processed_metadata.get(file_path)
return stored_hash != current_content_hash
def mark_document_as_processed(self, file_path: str, content_hash: str):
"""
Marks a document as processed by storing its content hash.
"""
processed_metadata = self._load_processed_metadata()
processed_metadata[file_path] = content_hash
self._save_processed_metadata(processed_metadata)
def deduplicate_chunks(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Removes duplicate chunks based on their text content.
Args:
chunks (List[Dict[str, Any]]): A list of chunk dictionaries.
Returns:
List[Dict[str, Any]]: A list of unique chunk dictionaries.
"""
seen_hashes = set()
unique_chunks = []
for chunk in chunks:
chunk_hash = self._generate_content_hash(chunk["text"])
if chunk_hash not in seen_hashes:
unique_chunks.append(chunk)
seen_hashes.add(chunk_hash)
return unique_chunks
2.2.4. Embedding Generation
Embeddings are numerical vector representations of text that capture semantic meaning. Texts with similar meanings will have embeddings that are close to each other in the vector space. We will use a pre-trained sentence transformer model for this purpose.
Example: `embedding_generator.py`
# embedding_generator.py
from typing import List, Dict
from sentence_transformers import SentenceTransformer
import numpy as np
from config import EMBEDDING_MODEL_CONFIG
class EmbeddingGenerator:
"""
Generates embeddings for text chunks using a specified sentence transformer model.
"""
def __init__(self):
"""
Initializes the EmbeddingGenerator by loading the specified model.
"""
model_name = EMBEDDING_MODEL_CONFIG["model_name"]
provider = EMBEDDING_MODEL_CONFIG["provider"]
if provider == "huggingface_local":
print(f"Loading local embedding model: {model_name}")
self.model = SentenceTransformer(model_name)
elif provider == "openai":
# Placeholder for OpenAI embeddings. Requires openai client.
# from openai import OpenAI
# self.client = OpenAI(api_key=EMBEDDING_MODEL_CONFIG["api_key"])
# self.model_name = model_name
raise NotImplementedError("OpenAI embedding integration not shown in snippet.")
else:
raise ValueError(f"Unsupported embedding provider: {provider}")
def generate_embeddings(self, texts: List[str]) -> List[List[float]]:
"""
Generates embeddings for a list of text strings.
Args:
texts (List[str]): A list of text strings (e.g., document chunks).
Returns:
List[List[float]]: A list of embedding vectors, where each vector
is a list of floats.
"""
if not texts:
return []
if EMBEDDING_MODEL_CONFIG["provider"] == "huggingface_local":
embeddings = self.model.encode(texts, convert_to_numpy=True).tolist()
elif EMBEDDING_MODEL_CONFIG["provider"] == "openai":
# response = self.client.embeddings.create(input=texts, model=self.model_name)
# embeddings = [d.embedding for d in response.data]
raise NotImplementedError("OpenAI embedding integration not shown in snippet.")
else:
embeddings = [] # Should not reach here due to ValueError in __init__
return embeddings
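# --- Quick usage check (illustrative only, not part of the pipeline):
# --- all-MiniLM-L6-v2 returns one 384-dimensional vector per input text.
if __name__ == "__main__":
    generator = EmbeddingGenerator()
    vectors = generator.generate_embeddings(["Sustainable manufacturing reduces waste."])
    print(len(vectors), len(vectors[0]))  # expected: 1 384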
2.3. Vector Database and Document Storage
The vector database is where our processed chunks and their embeddings reside, enabling efficient similarity searches. We'll use ChromaDB for its ease of setup and local persistence, which aligns with our requirement for local storage. All raw documents will also be stored in a designated local folder.
Example: `vector_db.py`
# vector_db.py
from typing import List, Dict, Any
from chromadb import PersistentClient
from chromadb.utils import embedding_functions
import os
import shutil
from config import VECTOR_DB_DIR, EMBEDDING_MODEL_CONFIG
class VectorDatabase:
"""
Manages the interaction with the ChromaDB vector database.
Stores and retrieves document chunks and their embeddings.
"""
def __init__(self, collection_name: str = "agentic_ai_knowledge"):
"""
Initializes the VectorDatabase client and collection.
Args:
collection_name (str): The name of the collection to use in ChromaDB.
"""
self.client = PersistentClient(path=VECTOR_DB_DIR)
self.collection_name = collection_name
# ChromaDB requires an embedding function. We'll use a wrapper
# around our SentenceTransformer model.
# Note: For production, consider using Chroma's built-in SentenceTransformer
# embedding function or a custom one that matches your EmbeddingGenerator.
# For simplicity, we'll assume embeddings are provided externally.
self.collection = self.client.get_or_create_collection(
name=self.collection_name,
# If you want ChromaDB to handle embeddings directly:
# embedding_function=embedding_functions.SentenceTransformerEmbeddingFunction(
# model_name=EMBEDDING_MODEL_CONFIG["model_name"]
# )
)
print(f"Initialized ChromaDB at {VECTOR_DB_DIR} with collection '{collection_name}'")
def add_chunks(self, chunks: List[Dict[str, Any]], embeddings: List[List[float]]):
"""
Adds a list of document chunks and their embeddings to the vector database.
Args:
chunks (List[Dict[str, Any]]): A list of chunk dictionaries, each
containing 'text' and 'metadata'.
embeddings (List[List[float]]): A list of embedding vectors,
corresponding to the chunks.
"""
if not chunks or not embeddings:
print("No chunks or embeddings to add.")
return
ids = [chunk["chunk_id"] for chunk in chunks]
documents = [chunk["text"] for chunk in chunks]
metadatas = [{k: v for k, v in chunk.items() if k not in ["text", "chunk_id"]} for chunk in chunks]
# ChromaDB's add method requires IDs, embeddings, metadatas, and documents.
# We ensure no duplicate IDs are added.
existing_ids = set(self.collection.get(ids=ids, include=[])["ids"])
new_ids, new_documents, new_metadatas, new_embeddings = [], [], [], []
for i, chunk_id in enumerate(ids):
if chunk_id not in existing_ids:
new_ids.append(chunk_id)
new_documents.append(documents[i])
new_metadatas.append(metadatas[i])
new_embeddings.append(embeddings[i])
else:
print(f"Skipping existing chunk ID: {chunk_id}")
if new_ids:
self.collection.add(
ids=new_ids,
embeddings=new_embeddings,
metadatas=new_metadatas,
documents=new_documents
)
print(f"Added {len(new_ids)} new chunks to ChromaDB collection '{self.collection_name}'.")
else:
print("No new unique chunks to add after checking for existing IDs.")
def query_chunks(self, query_embedding: List[float], n_results: int = 10) -> List[Dict[str, Any]]:
"""
Queries the vector database for the most similar chunks.
Args:
query_embedding (List[float]): The embedding of the user's query.
n_results (int): The number of top similar chunks to retrieve.
Returns:
List[Dict[str, Any]]: A list of dictionaries, each representing a
retrieved chunk with its content and metadata.
"""
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=n_results,
include=['documents', 'metadatas', 'distances']
)
retrieved_chunks = []
if results and results['documents']:
for i in range(len(results['documents'][0])):
chunk = {
"text": results['documents'][0][i],
"metadata": results['metadatas'][0][i],
"distance": results['distances'][0][i]
}
retrieved_chunks.append(chunk)
return retrieved_chunks
def reset_database(self):
"""
Completely resets (deletes and recreates) the vector database.
Use with caution!
"""
print(f"WARNING: Resetting ChromaDB at {VECTOR_DB_DIR}...")
try:
self.client.delete_collection(name=self.collection_name)
self.collection = self.client.create_collection(name=self.collection_name)
print("ChromaDB collection reset successfully.")
except Exception as e:
print(f"Error resetting ChromaDB collection: {e}")
# If collection deletion fails, try recreating client and collection
self.client = PersistentClient(path=VECTOR_DB_DIR)
self.collection = self.client.get_or_create_collection(name=self.collection_name)
def get_collection_count(self) -> int:
"""Returns the number of items in the collection."""
return self.collection.count()
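# --- Illustrative round trip (an assumed smoke test, not part of the pipeline):
# --- store two demo chunks with real embeddings, then query them back.
if __name__ == "__main__":
    from embedding_generator import EmbeddingGenerator
    db = VectorDatabase(collection_name="smoke_test")
    embedder = EmbeddingGenerator()
    demo_chunks = [
        {"chunk_id": "demo_0", "text": "Heat recovery cuts furnace energy use.",
         "subject": "Energy Efficiency in Industrial Processes"},
        {"chunk_id": "demo_1", "text": "Closed-loop recycling keeps materials in use.",
         "subject": "Circular Economy in Engineering"},
    ]
    db.add_chunks(demo_chunks, embedder.generate_embeddings([c["text"] for c in demo_chunks]))
    query_vector = embedder.generate_embeddings(["How can factories save energy?"])[0]
    for hit in db.query_chunks(query_vector, n_results=1):
        print(hit["text"], round(hit["distance"], 3))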
2.4. Retrieval-Augmented Generation (RAG) with Reranking
The RAG system is the core mechanism for answering user queries. It combines the power of information retrieval with the generative capabilities of an LLM.
2.4.1. Initial Retrieval (Similarity Search)
When a user submits a query, the first step is to convert that query into an embedding using the same model used for document chunks. This query embedding is then used to perform a similarity search in the vector database, retrieving the top `N` most similar chunks.
Example: `retriever.py` (snippet for initial retrieval)
# retriever.py (snippet)
from typing import List, Dict, Any
from embedding_generator import EmbeddingGenerator
from vector_db import VectorDatabase
class Retriever:
"""
Handles the initial retrieval of relevant document chunks from the
vector database based on a user query.
"""
def __init__(self, vector_db: VectorDatabase, embedding_generator: EmbeddingGenerator):
"""
Initializes the Retriever with a VectorDatabase instance and an
EmbeddingGenerator.
"""
self.vector_db = vector_db
self.embedding_generator = embedding_generator
def retrieve_chunks(self, query: str, k: int = 10) -> List[Dict[str, Any]]:
"""
Retrieves the top-k most relevant document chunks for a given query.
Args:
query (str): The user's query string.
k (int): The number of chunks to retrieve initially.
Returns:
List[Dict[str, Any]]: A list of dictionaries, each representing a
retrieved chunk with its content and metadata.
"""
# Generate embedding for the query
query_embedding = self.embedding_generator.generate_embeddings([query])[0]
# Query the vector database
retrieved_chunks = self.vector_db.query_chunks(query_embedding, n_results=k)
print(f"Initial retrieval found {len(retrieved_chunks)} chunks.")
return retrieved_chunks
2.4.2. Sophisticated Reranking
Initial retrieval often returns chunks that are semantically similar but might not be directly relevant to the specific intent of the query. Sophisticated reranking, typically using a cross-encoder model, re-evaluates these initially retrieved chunks based on their relevance to the query. Cross-encoders take both the query and a document chunk as input and output a relevance score, providing a more granular and accurate assessment than pure semantic similarity.
Example: `reranker.py`
# reranker.py
from typing import List, Dict, Any
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
from config import RERANKER_MODEL_CONFIG
class Reranker:
"""
Reranks a list of retrieved document chunks based on their relevance to a query
using a cross-encoder model.
"""
def __init__(self):
"""
Initializes the Reranker by loading the specified cross-encoder model
and tokenizer.
"""
model_name = RERANKER_MODEL_CONFIG["model_name"]
provider = RERANKER_MODEL_CONFIG["provider"]
if provider == "huggingface_local":
print(f"Loading local reranker model: {model_name}")
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.model.to(self.device)
self.model.eval() # Set model to evaluation mode
elif provider == "cohere":
# Placeholder for Cohere reranking. Requires cohere client.
# import cohere
# self.client = cohere.Client(RERANKER_MODEL_CONFIG["api_key"])
raise NotImplementedError("Cohere reranking integration not shown in snippet.")
else:
raise ValueError(f"Unsupported reranker provider: {provider}")
def rerank_chunks(self, query: str, chunks: List[Dict[str, Any]], top_n: int = 5) -> List[Dict[str, Any]]:
"""
Reranks a list of document chunks based on their relevance to the query.
Args:
query (str): The user's query string.
chunks (List[Dict[str, Any]]): A list of initially retrieved chunk
dictionaries. Each dict must have a 'text' key.
top_n (int): The number of top-ranked chunks to return.
Returns:
List[Dict[str, Any]]: A list of reranked chunk dictionaries, sorted
by relevance score in descending order.
"""
if not chunks:
return []
if RERANKER_MODEL_CONFIG["provider"] == "huggingface_local":
sentences = [(query, chunk["text"]) for chunk in chunks]
with torch.no_grad():
inputs = self.tokenizer(sentences, padding=True, truncation=True,
return_tensors="pt").to(self.device)
scores = self.model(**inputs).logits.squeeze(-1).cpu().numpy()  # squeeze(-1) keeps a 1-D array even when only one chunk is scored
# Pair chunks with their scores
scored_chunks = []
for i, chunk in enumerate(chunks):
chunk_copy = chunk.copy()
chunk_copy["relevance_score"] = float(scores[i]) # Convert numpy float to Python float
scored_chunks.append(chunk_copy)
# Sort by relevance score in descending order
reranked_chunks = sorted(scored_chunks, key=lambda x: x["relevance_score"], reverse=True)
print(f"Reranked {len(chunks)} chunks, returning top {min(top_n, len(reranked_chunks))}.")
return reranked_chunks[:top_n]
elif RERANKER_MODEL_CONFIG["provider"] == "cohere":
# response = self.client.rerank(
# query=query,
# documents=[chunk["text"] for chunk in chunks],
# top_n=top_n,
# model=RERANKER_MODEL_CONFIG["model_name"]
# )
# reranked_results = []
# for rank in response.results:
# original_chunk = chunks[rank.index]
# original_chunk["relevance_score"] = rank.relevance_score
# reranked_results.append(original_chunk)
# return reranked_results
raise NotImplementedError("Cohere reranking integration not shown in snippet.")
else:
return [] # Should not reach here
2.5. Agent Orchestration
The agent orchestration layer is where the intelligence truly comes alive. It interprets the user's intent, decides which tools to use (e.g., RAG, web search), and constructs a coherent response using the LLM. For this, we'll use a simple agentic loop, demonstrating how the LLM can be prompted to act as a coordinator.
Example: `agent.py` (snippet for LLM interaction)
# agent.py (snippet)
from typing import List, Dict, Any
from langchain_community.llms import Ollama
from langchain_openai import OpenAI
from config import LLM_CONFIG
class AgentLLM:
"""
Manages the interaction with the Large Language Model (LLM), handling
different providers based on configuration.
"""
def __init__(self):
"""
Initializes the LLM client based on the configuration.
"""
provider = LLM_CONFIG["provider"]
model_name = LLM_CONFIG["model_name"]
if provider == "ollama":
print(f"Initializing Ollama LLM with model: {model_name} at {LLM_CONFIG['api_base']}")
self.llm = Ollama(model=model_name, base_url=LLM_CONFIG["api_base"])
elif provider == "openai":
print(f"Initializing OpenAI LLM with model: {model_name}")
self.llm = OpenAI(model_name=model_name, openai_api_key=LLM_CONFIG["api_key"])
elif provider == "huggingface_local":
# For local HuggingFace models, you might use transformers pipeline
# from langchain_community.llms import HuggingFacePipeline
# from transformers import pipeline
# self.llm = HuggingFacePipeline(pipeline=pipeline("text-generation", model=model_name))
raise NotImplementedError("HuggingFace local LLM integration not shown in snippet.")
else:
raise ValueError(f"Unsupported LLM provider: {provider}")
def generate_response(self, prompt: str) -> str:
"""
Generates a text response from the LLM based on the given prompt.
Args:
prompt (str): The input prompt for the LLM.
Returns:
str: The LLM's generated response.
"""
try:
response = self.llm.invoke(prompt)
return response
except Exception as e:
print(f"Error generating LLM response: {e}")
return "I apologize, but I encountered an error while trying to generate a response."
# The full agent logic would combine this LLM with the RAG system.
# It would involve constructing a prompt that includes the user's query
# and the retrieved, reranked document chunks.
# A typical prompt structure might look like:
# "You are an expert on [Subject]. Answer the following question based
# on the provided context. If the context does not contain the answer,
# state that you don't have enough information.
# Question: [User Query]
# Context: [Concatenated Reranked Chunks]"
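# --- A minimal sketch of that agentic loop. The names KnowledgeAgent and
# --- answer_question are assumptions for illustration, not the article's final
# --- main.py; the sketch simply wires together the Retriever, Reranker, and
# --- AgentLLM defined above.
from retriever import Retriever
from reranker import Reranker

class KnowledgeAgent:
    """Retrieve -> rerank -> prompt the LLM with the top-ranked context."""
    def __init__(self, retriever: Retriever, reranker: Reranker, llm: AgentLLM):
        self.retriever = retriever
        self.reranker = reranker
        self.llm = llm

    def answer_question(self, query: str, subject: str) -> str:
        candidates = self.retriever.retrieve_chunks(query, k=10)              # broad recall
        top_chunks = self.reranker.rerank_chunks(query, candidates, top_n=5)  # precision
        context = "\n\n".join(chunk["text"] for chunk in top_chunks)
        prompt = (
            f"You are an expert on {subject}. Answer the following question based "
            f"only on the provided context. If the context does not contain the "
            f"answer, state that you don't have enough information.\n\n"
            f"Question: {query}\n\nContext:\n{context}"
        )
        return self.llm.generate_response(prompt)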
2.6. Knowledge Base Update Mechanisms
Maintaining a current and comprehensive knowledge base is paramount. Our system supports two primary methods for updating its knowledge.
2.6.1. Web Search for Relevant News and Papers
When a user requests an update for a specific topic or during an automated refresh, the agent will search the web for new, relevant information. This involves using a search engine API, scraping content from search results, and then processing it through our ingestion pipeline.
Example: `web_scraper.py`
# web_scraper.py
import requests
from bs4 import BeautifulSoup
from typing import List, Dict, Any
import time
from config import WEB_SCRAPE_CONFIG, AGENT_SUBJECTS
class WebScraper:
"""
Handles web searching and scraping to gather new information.
"""
def __init__(self):
"""
Initializes the WebScraper with search engine configuration.
"""
self.search_api_key = WEB_SCRAPE_CONFIG["search_engine_api_key"]
self.search_base_url = WEB_SCRAPE_CONFIG["search_engine_base_url"]
self.max_results = WEB_SCRAPE_CONFIG["max_search_results"]
self.scrape_timeout = WEB_SCRAPE_CONFIG["scrape_timeout"]
if not self.search_api_key:
print("Warning: SERPER_API_KEY is not set. Web search will be limited or fail.")
def _perform_search(self, query: str) -> List[Dict[str, Any]]:
"""
Performs a web search using a search engine API (e.g., Serper.dev).
Args:
query (str): The search query string.
Returns:
List[Dict[str, Any]]: A list of search results, each containing
'title', 'link', and 'snippet'.
"""
headers = {
'X-API-KEY': self.search_api_key,
'Content-Type': 'application/json'
}
payload = {
'q': query,
'num': self.max_results
}
try:
# The Serper.dev search endpoint expects a POST request with a JSON body.
response = requests.post(self.search_base_url, headers=headers, json=payload, timeout=self.scrape_timeout)
response.raise_for_status() # Raise an HTTPError for bad responses (4xx or 5xx)
data = response.json()
organic_results = data.get('organic', [])
return [{'title': r.get('title'), 'link': r.get('link'), 'snippet': r.get('snippet')}
for r in organic_results if r.get('link')]
except requests.exceptions.RequestException as e:
print(f"Error during web search for '{query}': {e}")
return []
def scrape_page_content(self, url: str) -> str:
"""
Scrapes the main text content from a given URL.
Args:
url (str): The URL of the page to scrape.
Returns:
str: The extracted text content, or an empty string if scraping fails.
"""
try:
response = requests.get(url, timeout=self.scrape_timeout)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# Remove script, style, and other non-content elements
for script_or_style in soup(["script", "style", "header", "footer", "nav", "form"]):
script_or_style.decompose()
# Get text and clean it
text = soup.get_text()
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split("  "))  # split on double spaces so words stay together
text = '\n'.join(chunk for chunk in chunks if chunk)
return text
except requests.exceptions.RequestException as e:
print(f"Error scraping {url}: {e}")
return ""
except Exception as e:
print(f"General error scraping {url}: {e}")
return ""
def search_and_scrape_for_topic(self, topic: str) -> List[Dict[str, Any]]:
"""
Performs a web search for a topic and scrapes content from the results.
Args:
topic (str): The subject topic to search for.
Returns:
List[Dict[str, Any]]: A list of dictionaries, each containing
'content', 'source_url', 'title', 'timestamp'.
"""
print(f"Searching web for new information on '{topic}'...")
search_query = f"{topic} latest news OR research papers"
results = self._perform_search(search_query)
scraped_documents = []
for i, result in enumerate(results):
print(f" Scraping result {i+1}/{len(results)}: {result['link']}")
content = self.scrape_page_content(result['link'])
if content:
scraped_documents.append({
"content": content,
"source_url": result['link'],
"title": result['title'],
"timestamp": time.time(), # Record when it was scraped
"subject": topic
})
time.sleep(1) # Be polite to websites
print(f"Finished web scraping for '{topic}'. Found {len(scraped_documents)} new documents.")
return scraped_documents
2.6.2. User-Added Documents
Users can contribute documents by simply dropping them into a designated folder. A file system watcher continuously monitors this folder and triggers the processing pipeline for any *new* documents. This ensures that only newly added files are processed, avoiding redundant work.
Example: `file_watcher.py`
# file_watcher.py
import time
import os
import hashlib
import json
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from typing import Callable, Dict, Any
from config import USER_UPLOAD_FOLDER, PROCESSED_DOCS_METADATA_PATH
class UserUploadHandler(FileSystemEventHandler):
"""
Event handler for file system events in the user upload folder.
Triggers processing for new files.
"""
def __init__(self, process_new_document_callback: Callable[[str], None]):
"""
Initializes the handler with a callback function to process new documents.
Args:
process_new_document_callback (Callable[[str], None]): A function
that takes a
file path
and processes it.
"""
self.process_new_document = process_new_document_callback
self.processed_file_hashes = self._load_processed_hashes()
print(f"Initialized UserUploadHandler. Monitoring: {USER_UPLOAD_FOLDER}")
def _load_processed_hashes(self) -> Dict[str, str]:
"""Loads the hashes of already processed files from metadata."""
if os.path.exists(PROCESSED_DOCS_METADATA_PATH):
with open(PROCESSED_DOCS_METADATA_PATH, 'r', encoding='utf-8') as f:
# Filter for files that were user-uploaded
return {path: hash_val for path, hash_val in json.load(f).items()
if path.startswith(USER_UPLOAD_FOLDER)}
return {}
def _save_processed_hash(self, file_path: str, content_hash: str):
"""Saves the hash of a newly processed file."""
self.processed_file_hashes[file_path] = content_hash
# Update the main metadata file
all_metadata = {}
if os.path.exists(PROCESSED_DOCS_METADATA_PATH):
with open(PROCESSED_DOCS_METADATA_PATH, 'r', encoding='utf-8') as f:
all_metadata = json.load(f)
all_metadata[file_path] = content_hash
with open(PROCESSED_DOCS_METADATA_PATH, 'w', encoding='utf-8') as f:
json.dump(all_metadata, f, indent=4)
def _get_file_hash(self, file_path: str) -> str:
"""Generates a hash for the content of a file."""
hasher = hashlib.sha256()
try:
with open(file_path, 'rb') as f:
while chunk := f.read(8192):
hasher.update(chunk)
return hasher.hexdigest()
except Exception as e:
print(f"Error hashing file {file_path}: {e}")
return ""
def on_created(self, event):
"""
Called when a file or directory is created.
"""
if not event.is_directory:
print(f"Detected new file: {event.src_path}")
self._check_and_process_file(event.src_path)
def on_modified(self, event):
"""
Called when a file or directory is modified.
This can also indicate a new file being fully written.
"""
if not event.is_directory:
# Sometimes 'created' isn't fired immediately or reliably for large files.
# 'modified' catches the completion of file writes.
print(f"Detected modification to file: {event.src_path}")
self._check_and_process_file(event.src_path)
def _check_and_process_file(self, file_path: str):
"""
Checks if a file is new or modified and processes it if needed.
"""
# Ensure the file is fully written before processing
time.sleep(1) # Give the OS a moment to finish writing
current_hash = self._get_file_hash(file_path)
if not current_hash:
print(f"Could not get hash for {file_path}, skipping.")
return
if file_path not in self.processed_file_hashes or \
self.processed_file_hashes.get(file_path) != current_hash:
print(f"Processing new or modified user-uploaded document: {file_path}")
self.process_new_document(file_path)
self._save_processed_hash(file_path, current_hash)
else:
print(f"Document {file_path} already processed and unchanged. Skipping.")
class FileWatcher:
"""
Sets up and starts an observer to watch for file system events.
"""
def __init__(self, folder_to_watch: str, process_new_document_callback: Callable[[str], None]):
"""
Initializes the FileWatcher.
Args:
folder_to_watch (str): The path to the folder to monitor.
process_new_document_callback (Callable[[str], None]): Callback for new docs.
"""
self.folder_to_watch = folder_to_watch
self.event_handler = UserUploadHandler(process_new_document_callback)
self.observer = Observer()
def start(self):
"""Starts the file system observer."""
print(f"Starting file watcher for folder: {self.folder_to_watch}")
self.observer.schedule(self.event_handler, self.folder_to_watch, recursive=True)
self.observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
self.stop()
def stop(self):
"""Stops the file system observer."""
print("Stopping file watcher.")
self.observer.stop()
self.observer.join()
2.7. Scheduling for Automated Updates
To keep the knowledge base perpetually fresh, the system includes a scheduler that periodically triggers the web-based knowledge update process for all configured subjects. This ensures that the agent automatically learns about new developments.
Example: `scheduler.py`
# scheduler.py
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
from typing import Callable
from config import AUTO_UPDATE_INTERVAL_HOURS
class KnowledgeBaseScheduler:
"""
Schedules automated tasks, such as periodically updating the knowledge base.
"""
def __init__(self, update_function: Callable[[], None]):
"""
Initializes the scheduler with the function to call for updates.
Args:
update_function (Callable[[], None]): The function to execute
periodically for updates.
"""
self.scheduler = BackgroundScheduler()
self.update_function = update_function
self.interval_hours = AUTO_UPDATE_INTERVAL_HOURS
def start(self):
"""
Starts the scheduler and adds the update job.
"""
if self.interval_hours > 0:
self.scheduler.add_job(self.update_function, 'interval', hours=self.interval_hours,
next_run_time=datetime.now() + timedelta(minutes=1)) # APScheduler expects a datetime; start 1 min after launch
print(f"Scheduled automated knowledge base updates every {self.interval_hours} hours.")
self.scheduler.start()
else:
print("Automated updates are disabled (AUTO_UPDATE_INTERVAL_HOURS is 0 or less).")
def stop(self):
"""
Stops the scheduler.
"""
if self.scheduler.running:
print("Stopping knowledge base scheduler.")
self.scheduler.shutdown()
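# --- Illustrative usage (assumed wiring, not the article's main.py): the real
# --- update function would run the web-update workflow for each configured subject.
if __name__ == "__main__":
    import time
    from config import AGENT_SUBJECTS

    def refresh_all_subjects():
        for subject in AGENT_SUBJECTS:
            print(f"(sketch) refreshing knowledge base for: {subject}")

    kb_scheduler = KnowledgeBaseScheduler(update_function=refresh_all_subjects)
    kb_scheduler.start()
    try:
        while True:  # keep the process alive so background jobs can fire
            time.sleep(1)
    except (KeyboardInterrupt, SystemExit):
        kb_scheduler.stop()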
3. Putting It All Together: The Agentic Workflow
Now that we've explored the individual components, let's see how they collaborate to form a cohesive Agentic AI system.
3.1. User Prompt Workflow
When a user sends a prompt to the Agentic AI:
- Intent Recognition: The agent (orchestration layer) first analyzes the user's prompt to understand its intent. Is it a question? A request for information? An instruction to update the knowledge base?
- Query Embedding: If it's a question requiring knowledge retrieval, the user's query is converted into a numerical embedding using the `EmbeddingGenerator`.
- Initial Retrieval: The `Retriever` uses this query embedding to perform a similarity search against the `VectorDatabase`, fetching an initial set of potentially relevant document chunks.
- Sophisticated Reranking: The `Reranker` then takes these initial chunks and the original query. It employs a cross-encoder model to re-evaluate and score each chunk based on its direct relevance to the query, presenting the most pertinent information at the top.
- Contextualized Prompt Generation: The agent constructs a comprehensive prompt for the `AgentLLM`. This prompt includes the original user query, clear instructions for the LLM (e.g., "answer based only on the provided context"), and the text content of the top-ranked chunks.
- LLM Response Generation: The `AgentLLM` processes this contextualized prompt and generates a well-informed, coherent, and accurate response, drawing directly from the provided context.
- Response to User: The agent delivers the LLM's response back to the user.
3.2. Knowledge Base Update Workflow (User-Initiated and Automated)
The knowledge base is designed to be dynamic and continuously updated.
3.2.1. User-Initiated Web Update (e.g., "Update knowledge on <topic>")
- Topic Identification: The agent identifies the specific topic the user wants to update (e.g., "Sustainable Manufacturing Practices").
- Web Search and Scraping: The `WebScraper` is invoked with the identified topic. It performs a web search and scrapes content from the most relevant results (HTML, PDFs if linked and handled, etc.).
- Document Storage: The raw content of newly scraped documents is saved to the `DOCUMENTS_DIR` local folder.
- Document Processing: Each new document's content is passed to the `DocumentProcessor`, which checks for duplicates (has this exact document been processed before?), performs semantic chunking to break the document into smaller, meaningful pieces, and generates embeddings for each chunk via the `EmbeddingGenerator`.
- Vector Database Update: The processed chunks and their embeddings are added to the `VectorDatabase`, making them available for future retrieval. Metadata about the source, title, and subject is also stored.
- Confirmation: The agent informs the user that the knowledge base for the specified topic has been updated. A minimal glue sketch of this end-to-end flow follows below.
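To make the workflow concrete, here is a minimal glue sketch that wires together the components from Section 2. The function name `update_knowledge_for_topic` and its signature are assumptions for illustration rather than code from the article's `main.py`.
# update_workflow_sketch.py (hypothetical glue code)
from web_scraper import WebScraper
from document_processor import DocumentProcessor
from embedding_generator import EmbeddingGenerator
from vector_db import VectorDatabase

def update_knowledge_for_topic(topic: str, scraper: WebScraper,
                               processor: DocumentProcessor,
                               embedder: EmbeddingGenerator,
                               vector_db: VectorDatabase) -> int:
    """Searches the web for a topic and ingests the results; returns chunks added."""
    added = 0
    for doc in scraper.search_and_scrape_for_topic(topic):
        # Chunk and deduplicate the scraped content (unchanged documents are skipped).
        chunks = processor.process_web_document(
            content=doc["content"], url=doc["source_url"], title=doc["title"],
            subject=doc["subject"], timestamp=doc["timestamp"],
        )
        if chunks:
            embeddings = embedder.generate_embeddings([c["text"] for c in chunks])
            vector_db.add_chunks(chunks, embeddings)
            added += len(chunks)
    return added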
3.2.2. Automated Web Update
This workflow is identical to the user-initiated web update but is triggered periodically by the `KnowledgeBaseScheduler` for all subjects defined in `AGENT_SUBJECTS` in the `config.py` file. This ensures proactive knowledge maintenance.
3.2.3. User-Added Documents (Local Folder Monitoring)
- File System Monitoring: The `FileWatcher` continuously monitors the `USER_UPLOAD_FOLDER` for any new files or modifications.
- New Document Detection: When a new file (e.g., a PDF report, an internal Markdown document) is detected, the `FileWatcher` verifies if it's truly new or modified by comparing its content hash against previously processed documents.
- Document Processing: If the document is new or modified, its content is loaded by the appropriate `DocumentLoader` (PDF, HTML, Markdown), then passed to the `DocumentProcessor` for chunking, deduplication, and embedding generation. The raw document is also moved or copied to the `DOCUMENTS_DIR` for central storage.
- Vector Database Update: The processed chunks and their embeddings are added to the `VectorDatabase`.
- Internal Notification: While not directly visible to the user unless explicitly requested, the system internally logs that new documents have been processed and integrated. A sketch of the processing callback follows below.
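To make the callback concrete, the sketch below shows a hypothetical `process_new_user_document` helper; the name and wiring are assumptions. Because the `FileWatcher` expects a one-argument `Callable[[str], None]`, the shared components are bound with `functools.partial`.
# user_upload_callback_sketch.py (hypothetical)
from functools import partial
from document_loaders import DocumentLoaderFactory
from document_processor import DocumentProcessor
from embedding_generator import EmbeddingGenerator
from vector_db import VectorDatabase
from file_watcher import FileWatcher
from config import USER_UPLOAD_FOLDER

def process_new_user_document(file_path: str, processor: DocumentProcessor,
                              embedder: EmbeddingGenerator,
                              vector_db: VectorDatabase) -> None:
    """Loads, chunks, embeds, and stores a document dropped into the upload folder."""
    loader = DocumentLoaderFactory().get_loader(file_path)
    if loader is None:
        return  # unsupported file type; nothing to do
    chunks = processor.process_document_file(file_path, loader)
    if chunks:
        embeddings = embedder.generate_embeddings([c["text"] for c in chunks])
        vector_db.add_chunks(chunks, embeddings)

if __name__ == "__main__":
    callback = partial(process_new_user_document, processor=DocumentProcessor(),
                       embedder=EmbeddingGenerator(), vector_db=VectorDatabase())
    FileWatcher(USER_UPLOAD_FOLDER, callback).start()  # blocks until interrupted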
4. Benefits and Future Enhancements
This Agentic AI system offers numerous benefits:
- Enhanced Productivity: Quickly retrieve precise information, reducing time spent searching through documents and web pages.
- Always Up-to-Date Knowledge: Automated and user-driven updates ensure the knowledge base remains current with the latest industry trends and internal documentation.
- Personalized Expertise: By focusing on configured subjects, the agent becomes a specialized expert tailored to specific team or project needs.
- Improved Decision Making: Access to accurate, context-rich information supports better-informed decisions.
- Reduced Information Overload: The agent sifts through noise, presenting only the most relevant content.
Future enhancements could include:
- Advanced Agentic Reasoning: Implement more complex planning and tool-use capabilities, allowing the agent to break down complex queries into sub-tasks and utilize multiple tools sequentially.
- Feedback Loop: Allow users to provide feedback on answers, which can be used to fine-tune retrieval, reranking, or LLM generation.
- Multi-Modal Support: Extend document processing to include images, videos, or audio transcripts.
- User Interface: Develop a user-friendly web or desktop interface for easier interaction, document upload, and configuration management.
- Security and Access Control: Implement robust security measures and access controls for sensitive information, especially when handling internal documents.
5. Conclusion
Building an Agentic AI system as described provides a powerful tool to navigate and leverage vast amounts of information. By combining robust document processing, intelligent RAG with reranking, and proactive knowledge base management, this system transforms how we access and utilize information, ultimately fostering greater efficiency and innovation. We encourage you to explore these concepts and adapt them to your specific project requirements, empowering your teams with a truly intelligent assistant.
ADDENDUM: Full Running Example Code
This section provides a complete, runnable example of the Agentic AI system, integrating all the components discussed. To run this, you will need Python 3.9+, `pip`, and potentially `ollama` running locally for the LLM.
Prerequisites and Setup
1. Install Python Libraries:
pip install pypdf beautifulsoup4 markdown-it-py sentence-transformers \
chromadb langchain langchain-community langchain-openai transformers \
torch apscheduler watchdog requests
Note: `torch` is a large dependency for `transformers`. If you don't have a
GPU, `pip install torch --index-url https://download.pytorch.org/whl/cpu`
might be faster.
2. Ollama (for local LLM):
Download and install Ollama from `ollama.com`.
Run `ollama run llama3` (or your preferred model) once in your terminal to
download the model; make sure the Ollama server is running in the background
(the desktop app starts it automatically, or run `ollama serve`).
3. Serper API Key (for web search):
Sign up at `serper.dev` to get a free API key for web search.
Replace `"YOUR_SERPER_API_KEY"` in `config.py` with your actual key.
(Optional: If you don't want web search, leave the API key placeholder unset
or comment out the web scraper calls.)
4. Create Project Structure:
Create a folder named `agentic_ai_system`.
Inside it, create an empty `data` folder.
Inside `data`, create an empty `user_uploads` folder.
Place all the following Python files (`config.py`, `document_loaders.py`,
`document_processor.py`, `embedding_generator.py`, `vector_db.py`,
`retriever.py`, `reranker.py`, `agent.py`, `web_scraper.py`,
`file_watcher.py`, `scheduler.py`, `main.py`) directly inside the
`agentic_ai_system` folder.
`config.py`
# config.py
import os
# Define the base directory for all data
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(BASE_DIR, "data")
DOCUMENTS_DIR = os.path.join(DATA_DIR, "documents")
VECTOR_DB_DIR = os.path.join(DATA_DIR, "vector_db")
PROCESSED_DOCS_METADATA_PATH = os.path.join(DATA_DIR, "processed_docs_metadata.json")
# Ensure directories exist
os.makedirs(DOCUMENTS_DIR, exist_ok=True)
os.makedirs(VECTOR_DB_DIR, exist_ok=True)
# Define the subjects the Agentic AI should be knowledgeable about.
# These subjects will guide web searches and contextual understanding.
AGENT_SUBJECTS = [
"Sustainable Manufacturing Practices",
"Industry 4.0 Technologies in Production",
"Circular Economy in Engineering",
"Energy Efficiency in Industrial Processes"
]
# Large Language Model (LLM) Configuration
# You can specify a local LLM (e.g., via Ollama) or a remote one (e.g., OpenAI).
# For local LLMs, ensure your Ollama server is running.
LLM_CONFIG = {
"provider": "ollama", # Options: "ollama", "openai", "huggingface_local"
"model_name": "llama3", # e.g., "llama3", "gpt-3.5-turbo", "mistral"
"api_base": "http://localhost:11434", # Only for "ollama" or self-hosted APIs
"api_key": "YOUR_OPENAI_API_KEY" # Only for "openai"
}
# Embedding Model Configuration
# Used to convert text into numerical vectors for the vector database.
EMBEDDING_MODEL_CONFIG = {
"provider": "huggingface_local", # Options: "huggingface_local", "openai"
"model_name": "sentence-transformers/all-MiniLM-L6-v2", # Local model path or name
"api_key": "YOUR_OPENAI_API_KEY" # Only for "openai"
}
# Reranker Model Configuration
# Used to re-rank retrieved documents for higher relevance.
RERANKER_MODEL_CONFIG = {
"provider": "huggingface_local", # Options: "huggingface_local", "cohere"
"model_name": "cross-encoder/ms-marco-MiniLM-L-6-v2", # Local cross-encoder model
"api_key": "YOUR_COHERE_API_KEY" # Only for "cohere"
}
# Web Scraping Configuration
WEB_SCRAPE_CONFIG = {
"search_engine_api_key": "YOUR_SERPER_API_KEY", # e.g., Serper, SerpApi, or custom
"search_engine_base_url": "https://google.serper.dev/search",
"max_search_results": 3, # Reduced for example to limit API calls
"scrape_timeout": 10 # seconds
}
# Knowledge Base Update Schedule
# Interval in hours for automated web-based knowledge updates.
# Set to a higher value (e.g., 24) for production, 0 to disable.
AUTO_UPDATE_INTERVAL_HOURS = 0.1 # For testing, every 6 minutes
# File Watcher Configuration
# Directory where users can place documents for automatic processing.
USER_UPLOAD_FOLDER = os.path.join(DATA_DIR, "user_uploads")
os.makedirs(USER_UPLOAD_FOLDER, exist_ok=True)
`document_loaders.py`
# document_loaders.py
import os
from pypdf import PdfReader
from bs4 import BeautifulSoup
from markdown_it import MarkdownIt
class PDFLoader:
"""
A class responsible for loading text content from PDF files.
"""
def load(self, file_path: str) -> str:
"""
Loads and extracts all text content from a specified PDF file.
Args:
file_path (str): The full path to the PDF file.
Returns:
str: The concatenated text content from all pages of the PDF.
Returns an empty string if the file cannot be read or
contains no text.
"""
if not os.path.exists(file_path):
print(f"Warning: PDF file not found at {file_path}")
return ""
try:
reader = PdfReader(file_path)
text_content = ""
for page in reader.pages:
text_content += page.extract_text() + "\n"
return text_content
except Exception as e:
print(f"Error loading PDF {file_path}: {e}")
return ""
class HTMLLoader:
"""
A class responsible for loading and extracting visible text from HTML files.
"""
def load(self, file_path: str) -> str:
"""
Loads and extracts visible text content from a specified HTML file.
Args:
file_path (str): The full path to the HTML file.
Returns:
str: The extracted plain text content.
Returns an empty string if the file cannot be read or parsed.
"""
if not os.path.exists(file_path):
print(f"Warning: HTML file not found at {file_path}")
return ""
try:
with open(file_path, 'r', encoding='utf-8') as f:
html_content = f.read()
soup = BeautifulSoup(html_content, 'html.parser')
# Remove script, style, and other non-content elements
for script_or_style in soup(["script", "style", "header", "footer", "nav", "form"]):
script_or_style.decompose()
text = soup.get_text()
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split("  "))  # split on double spaces so words stay together
text = '\n'.join(chunk for chunk in chunks if chunk)
return text
except Exception as e:
print(f"Error loading HTML {file_path}: {e}")
return ""
class MarkdownLoader:
"""
A class responsible for loading and converting Markdown files to plain text.
"""
def load(self, file_path: str) -> str:
"""
Loads a Markdown file and converts its content to plain text.
Args:
file_path (str): The full path to the Markdown file.
Returns:
str: The plain text content of the Markdown file.
Returns an empty string if the file cannot be read.
"""
if not os.path.exists(file_path):
print(f"Warning: Markdown file not found at {file_path}")
return ""
try:
with open(file_path, 'r', encoding='utf-8') as f:
md_content = f.read()
# Use markdown-it to convert markdown to HTML, then BeautifulSoup to get text
md = MarkdownIt()
html = md.render(md_content)
soup = BeautifulSoup(html, 'html.parser')
return soup.get_text()
except Exception as e:
print(f"Error loading Markdown {file_path}: {e}")
return ""
class DocumentLoaderFactory:
"""
A factory to get the appropriate document loader based on file extension.
"""
def get_loader(self, file_path: str):
"""
Returns an instance of the correct loader for the given file type.
Args:
file_path (str): The path to the document file.
Returns:
Union[PDFLoader, HTMLLoader, MarkdownLoader, None]: An instance
of the
appropriate
loader, or
None if
unsupported.
"""
ext = os.path.splitext(file_path)[1].lower()
if ext == ".pdf":
return PDFLoader()
elif ext in [".html", ".htm"]:
return HTMLLoader()
elif ext in [".md", ".markdown"]:
return MarkdownLoader()
else:
print(f"Unsupported file type for loading: {ext}")
return None
`document_processor.py`
# document_processor.py
import os
import hashlib
import json
import shutil
from typing import List, Dict, Any
from langchain.text_splitter import RecursiveCharacterTextSplitter
from config import PROCESSED_DOCS_METADATA_PATH, DOCUMENTS_DIR
class DocumentProcessor:
"""
Handles the chunking and embedding generation for document content,
along with deduplication logic.
"""
def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
"""
Initializes the DocumentProcessor with chunking parameters.
Args:
chunk_size (int): The maximum number of characters in a chunk.
chunk_overlap (int): The number of characters to overlap between
consecutive chunks to maintain context.
"""
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
is_separator_regex=False,
)
# Load existing metadata on initialization
self.processed_metadata = self._load_processed_metadata()
def _generate_content_hash(self, content: str) -> str:
"""Generates an SHA256 hash for the given string content."""
return hashlib.sha256(content.encode('utf-8')).hexdigest()
def _load_processed_metadata(self) -> Dict[str, str]:
"""Loads metadata of already processed documents."""
if os.path.exists(PROCESSED_DOCS_METADATA_PATH):
with open(PROCESSED_DOCS_METADATA_PATH, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
def _save_processed_metadata(self):
"""Saves current state of processed documents metadata."""
with open(PROCESSED_DOCS_METADATA_PATH, 'w', encoding='utf-8') as f:
json.dump(self.processed_metadata, f, indent=4)
def is_document_new_or_modified(self, doc_id: str, current_content_hash: str) -> bool:
"""
Checks if a document is new or has been modified since last processing.
Args:
doc_id (str): A unique identifier for the document (e.g., file_path or URL).
current_content_hash (str): The SHA256 hash of the document's
current content.
Returns:
bool: True if the document is new or its content hash has changed,
False otherwise.
"""
stored_hash = self.processed_metadata.get(doc_id)
return stored_hash != current_content_hash
def mark_document_as_processed(self, doc_id: str, content_hash: str):
"""
Marks a document as processed by storing its content hash.
"""
self.processed_metadata[doc_id] = content_hash
self._save_processed_metadata()
def chunk_document(self, doc_content: str, metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Splits a document's content into smaller, semantically coherent chunks.
Args:
doc_content (str): The full text content of the document.
metadata (Dict[str, Any]): Metadata associated with the document
(e.g., source, title, file_path).
Returns:
List[Dict[str, Any]]: A list of dictionaries, where each dictionary
represents a chunk with its content and
associated metadata.
"""
if not doc_content.strip():
print(f"Warning: Document content for {metadata.get('doc_id', 'unknown')} is empty, skipping chunking.")
return []
chunks = self.text_splitter.create_documents([doc_content])
processed_chunks = []
for i, chunk in enumerate(chunks):
chunk_metadata = metadata.copy()
# Ensure doc_id is present for chunk_id generation
doc_id = metadata.get('doc_id', f"unknown_doc_{hashlib.sha256(doc_content.encode()).hexdigest()[:8]}")
chunk_metadata["chunk_id"] = f"{doc_id}_chunk_{i}"
chunk_metadata["text"] = chunk.page_content
processed_chunks.append(chunk_metadata)
return processed_chunks
def deduplicate_chunks(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Removes duplicate chunks based on their text content.
Args:
chunks (List[Dict[str, Any]]): A list of chunk dictionaries.
Returns:
List[Dict[str, Any]]: A list of unique chunk dictionaries.
"""
seen_hashes = set()
unique_chunks = []
for chunk in chunks:
chunk_hash = self._generate_content_hash(chunk["text"])
if chunk_hash not in seen_hashes:
unique_chunks.append(chunk)
seen_hashes.add(chunk_hash)
return unique_chunks
def process_document_file(self, file_path: str, loader, subject: str = "general") -> List[Dict[str, Any]]:
"""
Loads, chunks, and prepares a document from a file for embedding and storage.
Args:
file_path (str): The path to the document file.
loader: An instance of a document loader (e.g., PDFLoader).
subject (str): The subject associated with this document.
Returns:
List[Dict[str, Any]]: A list of processed chunk dictionaries.
"""
print(f"Processing file: {file_path}")
doc_content = loader.load(file_path)
if not doc_content:
print(f"Could not load content from {file_path}, skipping.")
return []
content_hash = self._generate_content_hash(doc_content)
# Use file_path as doc_id for user-uploaded files
doc_id = file_path
if not self.is_document_new_or_modified(doc_id, content_hash):
print(f"Document {file_path} already processed and unchanged. Skipping.")
return []
        # Store the raw document in the DOCUMENTS_DIR. We compare the raw bytes of the
        # source and destination files here; the extracted-text hash computed above would
        # never match the raw bytes of a binary format such as PDF.
        destination_path = os.path.join(DOCUMENTS_DIR, os.path.basename(file_path))
        needs_copy = True
        if os.path.exists(destination_path):
            with open(file_path, 'rb') as src, open(destination_path, 'rb') as dst:
                needs_copy = hashlib.sha256(src.read()).hexdigest() != hashlib.sha256(dst.read()).hexdigest()
        if needs_copy:
            shutil.copy(file_path, destination_path)
            print(f"Copied {file_path} to {destination_path}")
        else:
            print(f"Raw document {file_path} already exists in {DOCUMENTS_DIR}.")
metadata = {
"doc_id": doc_id,
"source": f"local_file:{os.path.basename(file_path)}",
"file_path": destination_path,
"title": os.path.basename(file_path),
"subject": subject,
"timestamp": os.path.getmtime(file_path) # Last modified time
}
chunks = self.chunk_document(doc_content, metadata)
unique_chunks = self.deduplicate_chunks(chunks)
self.mark_document_as_processed(doc_id, content_hash)
print(f"Successfully processed {file_path}. Generated {len(unique_chunks)} unique chunks.")
return unique_chunks
def process_web_document(self, content: str, url: str, title: str, subject: str, timestamp: float) -> List[Dict[str, Any]]:
"""
Chunks and prepares a document from web-scraped content for embedding and storage.
Args:
content (str): The text content scraped from the web.
url (str): The source URL of the content.
title (str): The title of the web page.
subject (str): The subject associated with this document.
timestamp (float): Unix timestamp of when the content was scraped.
Returns:
List[Dict[str, Any]]: A list of processed chunk dictionaries.
"""
if not content.strip():
print(f"Warning: Web content from {url} is empty, skipping processing.")
return []
content_hash = self._generate_content_hash(content)
doc_id = url # Use URL as doc_id for web documents
if not self.is_document_new_or_modified(doc_id, content_hash):
print(f"Web document from {url} already processed and unchanged. Skipping.")
return []
# Store raw web content locally (optional, but good for debugging/reprocessing)
# Create a sanitized filename from the title or URL
sanitized_title = "".join(c for c in title if c.isalnum() or c in (' ', '.', '_')).rstrip()
if not sanitized_title:
sanitized_title = hashlib.sha256(url.encode()).hexdigest()[:10]
file_name = f"{sanitized_title}.txt"
destination_path = os.path.join(DOCUMENTS_DIR, file_name)
with open(destination_path, 'w', encoding='utf-8') as f:
f.write(content)
print(f"Stored raw web content from {url} to {destination_path}")
metadata = {
"doc_id": doc_id,
"source": f"web_url:{url}",
"file_path": destination_path,
"title": title,
"subject": subject,
"timestamp": timestamp
}
chunks = self.chunk_document(content, metadata)
unique_chunks = self.deduplicate_chunks(chunks)
self.mark_document_as_processed(doc_id, content_hash)
print(f"Successfully processed web document from {url}. Generated {len(unique_chunks)} unique chunks.")
return unique_chunks
```
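
If you want to exercise the chunking and deduplication logic on its own, without touching the vector database, a minimal sketch looks like the following (the sample text, chunk sizes, and `doc_id` are purely illustrative; it assumes `config.py` defines `PROCESSED_DOCS_METADATA_PATH` and `DOCUMENTS_DIR` as shown earlier):

```python
# Minimal, illustrative exercise of DocumentProcessor (no embeddings, no vector DB).
from document_processor import DocumentProcessor

processor = DocumentProcessor(chunk_size=500, chunk_overlap=50)

# A deliberately repetitive sample so deduplication has something to remove.
sample_text = ("Circular economy principles aim to keep materials in productive use "
               "for as long as possible. ") * 40
metadata = {
    "doc_id": "demo-doc",
    "source": "inline_example",
    "title": "Demo document",
    "subject": "Sustainable Manufacturing Practices",
}

chunks = processor.chunk_document(sample_text, metadata)
unique_chunks = processor.deduplicate_chunks(chunks)
print(f"{len(chunks)} chunks before deduplication, {len(unique_chunks)} after.")
```

Because `mark_document_as_processed` is never called here, the sketch leaves the processed-documents metadata file untouched.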
`embedding_generator.py`
```python
# embedding_generator.py
from typing import List, Dict
from sentence_transformers import SentenceTransformer
import numpy as np
from config import EMBEDDING_MODEL_CONFIG
class EmbeddingGenerator:
"""
Generates embeddings for text chunks using a specified sentence transformer model.
"""
def __init__(self):
"""
Initializes the EmbeddingGenerator by loading the specified model.
"""
model_name = EMBEDDING_MODEL_CONFIG["model_name"]
provider = EMBEDDING_MODEL_CONFIG["provider"]
if provider == "huggingface_local":
print(f"Loading local embedding model: {model_name}")
self.model = SentenceTransformer(model_name)
elif provider == "openai":
# For OpenAI, you would typically initialize the OpenAI client here.
# from openai import OpenAI
# self.client = OpenAI(api_key=EMBEDDING_MODEL_CONFIG["api_key"])
# self.model_name = model_name
raise NotImplementedError("OpenAI embedding integration not fully implemented in snippet.")
else:
raise ValueError(f"Unsupported embedding provider: {provider}")
def generate_embeddings(self, texts: List[str]) -> List[List[float]]:
"""
Generates embeddings for a list of text strings.
Args:
texts (List[str]): A list of text strings (e.g., document chunks).
Returns:
List[List[float]]: A list of embedding vectors, where each vector
is a list of floats.
"""
if not texts:
return []
if EMBEDDING_MODEL_CONFIG["provider"] == "huggingface_local":
embeddings = self.model.encode(texts, convert_to_numpy=True).tolist()
elif EMBEDDING_MODEL_CONFIG["provider"] == "openai":
# response = self.client.embeddings.create(input=texts, model=self.model_name)
# embeddings = [d.embedding for d in response.data]
raise NotImplementedError("OpenAI embedding integration not fully implemented in snippet.")
else:
embeddings = [] # Should not reach here due to ValueError in __init__
return embeddings
```

`vector_db.py`
```python
# vector_db.py
from typing import List, Dict, Any
from chromadb import PersistentClient
import os
import shutil
from config import VECTOR_DB_DIR
class VectorDatabase:
"""
Manages the interaction with the ChromaDB vector database.
Stores and retrieves document chunks and their embeddings.
"""
def __init__(self, collection_name: str = "agentic_ai_knowledge"):
"""
Initializes the VectorDatabase client and collection.
Args:
collection_name (str): The name of the collection to use in ChromaDB.
"""
self.client = PersistentClient(path=VECTOR_DB_DIR)
self.collection_name = collection_name
# For this example, we assume embeddings are provided externally.
# ChromaDB will store them as-is.
self.collection = self.client.get_or_create_collection(
name=self.collection_name
)
print(f"Initialized ChromaDB at {VECTOR_DB_DIR} with collection '{collection_name}'")
print(f"Current items in collection: {self.collection.count()}")
def add_chunks(self, chunks: List[Dict[str, Any]], embeddings: List[List[float]]):
"""
Adds a list of document chunks and their embeddings to the vector database.
Args:
chunks (List[Dict[str, Any]]): A list of chunk dictionaries, each
containing 'text' and 'metadata'.
embeddings (List[List[float]]): A list of embedding vectors,
corresponding to the chunks.
"""
if not chunks or not embeddings:
print("No chunks or embeddings to add.")
return
ids = [chunk["chunk_id"] for chunk in chunks]
documents = [chunk["text"] for chunk in chunks]
# Filter metadata to ensure it's JSON serializable
metadatas = []
for chunk in chunks:
filtered_metadata = {k: v for k, v in chunk.items() if k not in ["text", "chunk_id"]}
            # Normalize metadata values: ChromaDB accepts str/int/float/bool, and we
            # store float timestamps as strings for consistent handling across sources.
for k, v in filtered_metadata.items():
if isinstance(v, float):
filtered_metadata[k] = str(v) # Convert float timestamp to string
metadatas.append(filtered_metadata)
# ChromaDB's add method requires IDs, embeddings, metadatas, and documents.
# We ensure no duplicate IDs are added by checking existing IDs.
# Note: A more robust approach for updates might involve `upsert` if available
# or explicit deletion before adding. For simplicity, we skip existing IDs.
existing_ids_in_db = set(self.collection.get(ids=ids, include=[])["ids"])
new_ids, new_documents, new_metadatas, new_embeddings = [], [], [], []
for i, chunk_id in enumerate(ids):
if chunk_id not in existing_ids_in_db:
new_ids.append(chunk_id)
new_documents.append(documents[i])
new_metadatas.append(metadatas[i])
new_embeddings.append(embeddings[i])
# else:
# print(f"Skipping existing chunk ID: {chunk_id}") # Too verbose
if new_ids:
self.collection.add(
ids=new_ids,
embeddings=new_embeddings,
metadatas=new_metadatas,
documents=new_documents
)
print(f"Added {len(new_ids)} new chunks to ChromaDB collection '{self.collection_name}'. Total items: {self.collection.count()}.")
else:
print("No new unique chunks to add after checking for existing IDs.")
def query_chunks(self, query_embedding: List[float], n_results: int = 10) -> List[Dict[str, Any]]:
"""
Queries the vector database for the most similar chunks.
Args:
query_embedding (List[float]): The embedding of the user's query.
n_results (int): The number of top similar chunks to retrieve.
Returns:
List[Dict[str, Any]]: A list of dictionaries, each representing a
retrieved chunk with its content and metadata.
"""
if not query_embedding:
print("Query embedding is empty, cannot perform query.")
return []
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=n_results,
include=['documents', 'metadatas', 'distances']
)
retrieved_chunks = []
if results and results['documents'] and results['documents'][0]:
for i in range(len(results['documents'][0])):
chunk = {
"text": results['documents'][0][i],
"metadata": results['metadatas'][0][i],
"distance": results['distances'][0][i]
}
retrieved_chunks.append(chunk)
return retrieved_chunks
def reset_database(self):
"""
Completely resets (deletes and recreates) the vector database.
Use with caution!
"""
print(f"WARNING: Resetting ChromaDB at {VECTOR_DB_DIR}...")
try:
# Delete the entire directory for a clean reset
if os.path.exists(VECTOR_DB_DIR):
shutil.rmtree(VECTOR_DB_DIR)
os.makedirs(VECTOR_DB_DIR, exist_ok=True)
# Re-initialize client and collection
self.client = PersistentClient(path=VECTOR_DB_DIR)
self.collection = self.client.get_or_create_collection(name=self.collection_name)
print("ChromaDB collection and directory reset successfully.")
except Exception as e:
print(f"Error resetting ChromaDB collection: {e}")
# Fallback if directory deletion fails for some reason
self.client = PersistentClient(path=VECTOR_DB_DIR)
self.collection = self.client.get_or_create_collection(name=self.collection_name)
def get_collection_count(self) -> int:
"""Returns the number of items in the collection."""
return self.collection.count()
```
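
A quick way to see the add/query round trip is to point `VectorDatabase` at a throwaway collection and feed it hand-rolled embeddings. This is only a sketch: the 4-dimensional vectors below are illustrative, and mixing them into your real collection would conflict with the dimensionality of your embedding model, which is why a separate collection name is used here.

```python
# Illustrative round trip against a throwaway ChromaDB collection.
from vector_db import VectorDatabase

db = VectorDatabase(collection_name="demo_collection")

chunks = [
    {"chunk_id": "demo_chunk_0", "doc_id": "demo-doc", "source": "inline_example",
     "subject": "Sustainable Manufacturing Practices",
     "text": "Additive manufacturing can reduce material waste."},
    {"chunk_id": "demo_chunk_1", "doc_id": "demo-doc", "source": "inline_example",
     "subject": "Sustainable Manufacturing Practices",
     "text": "Closed-loop recycling keeps metals in circulation."},
]
toy_embeddings = [[0.1, 0.2, 0.3, 0.4], [0.9, 0.1, 0.0, 0.2]]

db.add_chunks(chunks, toy_embeddings)
for result in db.query_chunks(query_embedding=[0.1, 0.2, 0.3, 0.4], n_results=2):
    print(round(result["distance"], 3), result["text"])
```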
`retriever.py`
```python
# retriever.py
from typing import List, Dict, Any
from embedding_generator import EmbeddingGenerator
from vector_db import VectorDatabase
class Retriever:
"""
Handles the initial retrieval of relevant document chunks from the
vector database based on a user query.
"""
def __init__(self, vector_db: VectorDatabase, embedding_generator: EmbeddingGenerator):
"""
Initializes the Retriever with a VectorDatabase instance and an
EmbeddingGenerator.
"""
self.vector_db = vector_db
self.embedding_generator = embedding_generator
def retrieve_chunks(self, query: str, k: int = 10) -> List[Dict[str, Any]]:
"""
Retrieves the top-k most relevant document chunks for a given query.
Args:
query (str): The user's query string.
k (int): The number of chunks to retrieve initially.
Returns:
List[Dict[str, Any]]: A list of dictionaries, each representing a
retrieved chunk with its content and metadata.
"""
if not query:
print("Query is empty, skipping retrieval.")
return []
# Generate embedding for the query
query_embedding = self.embedding_generator.generate_embeddings([query])[0]
# Query the vector database
retrieved_chunks = self.vector_db.query_chunks(query_embedding, n_results=k)
print(f"Initial retrieval found {len(retrieved_chunks)} chunks.")
return retrieved_chunks
```

`reranker.py`
```python
# reranker.py
from typing import List, Dict, Any
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
from config import RERANKER_MODEL_CONFIG
class Reranker:
"""
Reranks a list of retrieved document chunks based on their relevance to a query
using a cross-encoder model.
"""
def __init__(self):
"""
Initializes the Reranker by loading the specified cross-encoder model
and tokenizer.
"""
model_name = RERANKER_MODEL_CONFIG["model_name"]
provider = RERANKER_MODEL_CONFIG["provider"]
if provider == "huggingface_local":
print(f"Loading local reranker model: {model_name}")
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.model.to(self.device)
self.model.eval() # Set model to evaluation mode
elif provider == "cohere":
# For Cohere, you would typically initialize the Cohere client here.
# import cohere
# self.client = cohere.Client(RERANKER_MODEL_CONFIG["api_key"])
raise NotImplementedError("Cohere reranking integration not fully implemented in snippet.")
else:
raise ValueError(f"Unsupported reranker provider: {provider}")
def rerank_chunks(self, query: str, chunks: List[Dict[str, Any]], top_n: int = 5) -> List[Dict[str, Any]]:
"""
Reranks a list of document chunks based on their relevance to the query.
Args:
query (str): The user's query string.
chunks (List[Dict[str, Any]]): A list of initially retrieved chunk
dictionaries. Each dict must have a 'text' key.
top_n (int): The number of top-ranked chunks to return.
Returns:
List[Dict[str, Any]]: A list of reranked chunk dictionaries, sorted
by relevance score in descending order.
"""
if not chunks:
return []
if RERANKER_MODEL_CONFIG["provider"] == "huggingface_local":
sentences = [(query, chunk["text"]) for chunk in chunks]
with torch.no_grad():
inputs = self.tokenizer(sentences, padding=True, truncation=True,
return_tensors="pt").to(self.device)
                scores = self.model(**inputs).logits.squeeze(-1).cpu().numpy()  # squeeze only the label dimension so a single chunk still yields a 1-D array
# Pair chunks with their scores
scored_chunks = []
for i, chunk in enumerate(chunks):
chunk_copy = chunk.copy()
chunk_copy["relevance_score"] = float(scores[i]) # Convert numpy float to Python float
scored_chunks.append(chunk_copy)
# Sort by relevance score in descending order
reranked_chunks = sorted(scored_chunks, key=lambda x: x["relevance_score"], reverse=True)
print(f"Reranked {len(chunks)} chunks, returning top {min(top_n, len(reranked_chunks))}.")
return reranked_chunks[:top_n]
elif RERANKER_MODEL_CONFIG["provider"] == "cohere":
# response = self.client.rerank(
# query=query,
# documents=[chunk["text"] for chunk in chunks],
# top_n=top_n,
# model=RERANKER_MODEL_CONFIG["model_name"]
# )
# reranked_results = []
# for rank in response.results:
# original_chunk = chunks[rank.index]
# original_chunk["relevance_score"] = rank.relevance_score
# reranked_results.append(original_chunk)
# return reranked_results
raise NotImplementedError("Cohere reranking integration not fully implemented in snippet.")
else:
return [] # Should not reach here
```

`agent.py`
```python
# agent.py
from typing import List, Dict, Any
from langchain_community.llms import Ollama
from langchain_openai import OpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from config import LLM_CONFIG, AGENT_SUBJECTS
from retriever import Retriever
from reranker import Reranker
from embedding_generator import EmbeddingGenerator
from vector_db import VectorDatabase
from document_processor import DocumentProcessor
from document_loaders import DocumentLoaderFactory
from web_scraper import WebScraper
import os
import time
class AgentLLM:
"""
Manages the interaction with the Large Language Model (LLM), handling
different providers based on configuration.
"""
def __init__(self):
"""
Initializes the LLM client based on the configuration.
"""
provider = LLM_CONFIG["provider"]
model_name = LLM_CONFIG["model_name"]
if provider == "ollama":
print(f"Initializing Ollama LLM with model: {model_name} at {LLM_CONFIG['api_base']}")
self.llm = Ollama(model=model_name, base_url=LLM_CONFIG["api_base"])
elif provider == "openai":
print(f"Initializing OpenAI LLM with model: {model_name}")
self.llm = OpenAI(model_name=model_name, openai_api_key=LLM_CONFIG["api_key"])
elif provider == "huggingface_local":
# For local HuggingFace models, you might use transformers pipeline
# from langchain_community.llms import HuggingFacePipeline
# from transformers import pipeline
# self.llm = HuggingFacePipeline(pipeline=pipeline("text-generation", model=model_name))
raise NotImplementedError("HuggingFace local LLM integration not fully implemented in snippet.")
else:
raise ValueError(f"Unsupported LLM provider: {provider}")
# Define a simple prompt template for RAG
self.rag_prompt = ChatPromptTemplate.from_messages([
("system", "You are an expert assistant, specializing in subjects like "
f"{', '.join(AGENT_SUBJECTS)}. Your goal is to provide concise, accurate, and helpful "
"answers based *only* on the provided context. If the context does not contain "
"enough information to answer the question, state that you don't have enough "
"information and avoid making up answers."),
("human", "Context:\n{context}\n\nQuestion: {question}")
])
self.output_parser = StrOutputParser()
self.rag_chain = self.rag_prompt | self.llm | self.output_parser
def generate_response(self, prompt_text: str) -> str:
"""
Generates a text response from the LLM based on the given prompt.
This is for direct LLM calls without RAG.
Args:
prompt_text (str): The input prompt for the LLM.
Returns:
str: The LLM's generated response.
"""
try:
response = self.llm.invoke(prompt_text)
return response
except Exception as e:
print(f"Error generating LLM response: {e}")
return "I apologize, but I encountered an error while trying to generate a response."
def generate_rag_response(self, question: str, context: str) -> str:
"""
Generates a text response from the LLM using the RAG chain.
Args:
question (str): The user's question.
context (str): The concatenated, relevant document chunks.
Returns:
str: The LLM's generated response based on the context.
"""
try:
response = self.rag_chain.invoke({"context": context, "question": question})
return response
except Exception as e:
print(f"Error generating RAG response: {e}")
return "I apologize, but I encountered an error while trying to generate a response using the provided context."
class AgenticAI:
"""
The main orchestrator for the Agentic AI system, managing interactions
between components and handling user requests.
"""
def __init__(self):
"""
Initializes all core components of the Agentic AI.
"""
self.embedding_generator = EmbeddingGenerator()
self.vector_db = VectorDatabase()
self.retriever = Retriever(self.vector_db, self.embedding_generator)
self.reranker = Reranker()
self.llm_agent = AgentLLM()
self.doc_processor = DocumentProcessor()
self.doc_loader_factory = DocumentLoaderFactory()
self.web_scraper = WebScraper()
print("Agentic AI system initialized.")
def _process_and_add_chunks(self, chunks: List[Dict[str, Any]]):
"""Helper to generate embeddings and add chunks to DB."""
if not chunks:
return
texts_to_embed = [chunk["text"] for chunk in chunks]
embeddings = self.embedding_generator.generate_embeddings(texts_to_embed)
if embeddings:
self.vector_db.add_chunks(chunks, embeddings)
else:
print("No embeddings generated, skipping adding to DB.")
def update_knowledge_from_web(self, topic: str):
"""
Searches the web for new information on a given topic, processes it,
and adds it to the knowledge base.
"""
print(f"\n--- Initiating web knowledge update for topic: '{topic}' ---")
scraped_docs = self.web_scraper.search_and_scrape_for_topic(topic)
total_new_chunks = []
for doc in scraped_docs:
processed_chunks = self.doc_processor.process_web_document(
content=doc["content"],
url=doc["source_url"],
title=doc["title"],
subject=doc["subject"],
timestamp=doc["timestamp"]
)
total_new_chunks.extend(processed_chunks)
self._process_and_add_chunks(total_new_chunks)
print(f"--- Web knowledge update for '{topic}' completed. Total new chunks added: {len(total_new_chunks)} ---")
def process_user_uploaded_document(self, file_path: str):
"""
Processes a single user-uploaded document and adds it to the knowledge base.
"""
print(f"\n--- Processing user-uploaded document: {file_path} ---")
loader = self.doc_loader_factory.get_loader(file_path)
if not loader:
print(f"Skipping {file_path}: Unsupported file type.")
return
# For user-uploaded documents, we can assign them to a general subject
# or prompt the user for a subject if a UI were present.
# For this example, we'll assign to the first configured subject.
subject = AGENT_SUBJECTS[0] if AGENT_SUBJECTS else "general"
processed_chunks = self.doc_processor.process_document_file(file_path, loader, subject)
self._process_and_add_chunks(processed_chunks)
print(f"--- User document processing for {file_path} completed. Total new chunks added: {len(processed_chunks)} ---")
def handle_user_prompt(self, prompt: str) -> str:
"""
Handles a user's prompt, orchestrating retrieval, reranking, and LLM generation.
Also detects update commands.
"""
prompt_lower = prompt.lower()
# Check for update commands
if "update knowledge base" in prompt_lower:
for subject in AGENT_SUBJECTS:
if subject.lower() in prompt_lower:
self.update_knowledge_from_web(subject)
return f"Initiated knowledge base update for '{subject}'. This may take a moment."
return "Please specify a subject to update, e.g., 'Update knowledge base on Sustainable Manufacturing Practices'."
# Regular RAG query
print(f"\n--- Handling user query: '{prompt}' ---")
retrieved_chunks = self.retriever.retrieve_chunks(prompt, k=20) # Retrieve more for reranking
if not retrieved_chunks:
print("No relevant chunks found in the knowledge base.")
return self.llm_agent.generate_response(f"I don't have enough information in my knowledge base to answer: {prompt}")
reranked_chunks = self.reranker.rerank_chunks(prompt, retrieved_chunks, top_n=5) # Select top 5 after reranking
if not reranked_chunks:
print("No chunks remained after reranking.")
return self.llm_agent.generate_response(f"I found some information, but it wasn't relevant enough to answer: {prompt}")
context = "\n\n".join([chunk["text"] for chunk in reranked_chunks])
print(f"--- Generated RAG context from {len(reranked_chunks)} reranked chunks. ---")
response = self.llm_agent.generate_rag_response(prompt, context)
return response
def get_knowledge_base_status(self) -> str:
"""Provides a status report on the knowledge base."""
num_chunks = self.vector_db.get_collection_count()
num_processed_docs = len(self.doc_processor.processed_metadata)
return f"Knowledge Base Status: {num_chunks} chunks from {num_processed_docs} unique documents."
```

`web_scraper.py`
```python
# web_scraper.py
import requests
from bs4 import BeautifulSoup
from typing import List, Dict, Any
import time
from config import WEB_SCRAPE_CONFIG, AGENT_SUBJECTS
import os
class WebScraper:
"""
Handles web searching and scraping to gather new information.
"""
def __init__(self):
"""
Initializes the WebScraper with search engine configuration.
"""
self.search_api_key = WEB_SCRAPE_CONFIG["search_engine_api_key"]
self.search_base_url = WEB_SCRAPE_CONFIG["search_engine_base_url"]
self.max_results = WEB_SCRAPE_CONFIG["max_search_results"]
self.scrape_timeout = WEB_SCRAPE_CONFIG["scrape_timeout"]
if not self.search_api_key or self.search_api_key == "YOUR_SERPER_API_KEY":
print("Warning: SERPER_API_KEY is not set or is default. Web search will be limited or fail.")
def _perform_search(self, query: str) -> List[Dict[str, Any]]:
"""
Performs a web search using a search engine API (e.g., Serper.dev).
Args:
query (str): The search query string.
Returns:
List[Dict[str, Any]]: A list of search results, each containing
'title', 'link', and 'snippet'.
"""
if not self.search_api_key or self.search_api_key == "YOUR_SERPER_API_KEY":
print("Skipping web search: SERPER_API_KEY not configured.")
return []
if self.max_results <= 0:
print("Web search disabled: max_search_results is 0.")
return []
headers = {
'X-API-KEY': self.search_api_key,
'Content-Type': 'application/json'
}
        payload = {
            'q': query,
            'num': self.max_results
        }
        try:
            # Serper.dev expects a POST request with a JSON body rather than GET query parameters.
            response = requests.post(self.search_base_url, headers=headers, json=payload, timeout=self.scrape_timeout)
response.raise_for_status() # Raise an HTTPError for bad responses (4xx or 5xx)
data = response.json()
organic_results = data.get('organic', [])
return [{'title': r.get('title'), 'link': r.get('link'), 'snippet': r.get('snippet')}
for r in organic_results if r.get('link')]
except requests.exceptions.RequestException as e:
print(f"Error during web search for '{query}': {e}")
return []
except Exception as e:
print(f"An unexpected error occurred during web search for '{query}': {e}")
return []
def scrape_page_content(self, url: str) -> str:
"""
Scrapes the main text content from a given URL.
Args:
url (str): The URL of the page to scrape.
Returns:
str: The extracted text content, or an empty string if scraping fails.
"""
try:
response = requests.get(url, timeout=self.scrape_timeout)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# Remove script, style, and other non-content elements
for script_or_style in soup(["script", "style", "header", "footer", "nav", "form"]):
script_or_style.decompose()
# Get text and clean it
text = soup.get_text()
lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))  # split on double spaces so long lines are broken into phrases
text = '\n'.join(chunk for chunk in chunks if chunk)
return text
except requests.exceptions.RequestException as e:
print(f"Error scraping {url}: {e}")
return ""
except Exception as e:
print(f"General error scraping {url}: {e}")
return ""
def search_and_scrape_for_topic(self, topic: str) -> List[Dict[str, Any]]:
"""
Performs a web search for a topic and scrapes content from the results.
Args:
topic (str): The subject topic to search for.
Returns:
List[Dict[str, Any]]: A list of dictionaries, each containing
'content', 'source_url', 'title', 'timestamp'.
"""
print(f"Searching web for new information on '{topic}'...")
search_query = f"{topic} latest news OR research papers"
results = self._perform_search(search_query)
scraped_documents = []
for i, result in enumerate(results):
print(f" Scraping result {i+1}/{len(results)}: {result['link']}")
content = self.scrape_page_content(result['link'])
if content:
scraped_documents.append({
"content": content,
"source_url": result['link'],
"title": result['title'],
"timestamp": time.time(), # Record when it was scraped
"subject": topic
})
time.sleep(1) # Be polite to websites
print(f"Finished web scraping for '{topic}'. Found {len(scraped_documents)} new documents.")
return scraped_documents
```
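
To try the scraper on its own, the simplest entry point is `scrape_page_content`, which needs no API key; `search_and_scrape_for_topic` additionally requires a valid key in `WEB_SCRAPE_CONFIG`. The URL below is just a placeholder.

```python
# Standalone scrape of a single page (no search API involved).
from web_scraper import WebScraper

scraper = WebScraper()
text = scraper.scrape_page_content("https://example.com")  # placeholder URL
print(f"Extracted {len(text)} characters of cleaned text.")
print(text[:300])

# With a configured SERPER_API_KEY you could go further, for example:
# docs = scraper.search_and_scrape_for_topic("Sustainable Manufacturing Practices")
```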
`file_watcher.py`
```python
# file_watcher.py
import time
import os
import hashlib
import json
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from typing import Callable, Dict, Any
from config import USER_UPLOAD_FOLDER, PROCESSED_DOCS_METADATA_PATH
class UserUploadHandler(FileSystemEventHandler):
"""
Event handler for file system events in the user upload folder.
Triggers processing for new files.
"""
def __init__(self, process_new_document_callback: Callable[[str], None]):
"""
Initializes the handler with a callback function to process new documents.
Args:
            process_new_document_callback (Callable[[str], None]): A function that
                takes a file path and processes it.
"""
self.process_new_document = process_new_document_callback
# Load all processed hashes, the DocumentProcessor will handle if it's new/modified
self.processed_file_hashes = self._load_all_processed_hashes()
print(f"Initialized UserUploadHandler. Monitoring: {USER_UPLOAD_FOLDER}")
def _load_all_processed_hashes(self) -> Dict[str, str]:
"""Loads all metadata of already processed documents."""
if os.path.exists(PROCESSED_DOCS_METADATA_PATH):
with open(PROCESSED_DOCS_METADATA_PATH, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
def _get_file_hash(self, file_path: str) -> str:
"""Generates a hash for the content of a file."""
hasher = hashlib.sha256()
try:
with open(file_path, 'rb') as f:
while chunk := f.read(8192):
hasher.update(chunk)
return hasher.hexdigest()
except Exception as e:
print(f"Error hashing file {file_path}: {e}")
return ""
def on_created(self, event):
"""
Called when a file or directory is created.
"""
if not event.is_directory:
print(f"Detected new file creation event: {event.src_path}")
self._check_and_process_file(event.src_path)
def on_modified(self, event):
"""
Called when a file or directory is modified.
This can also indicate a new file being fully written.
"""
if not event.is_directory:
# Sometimes 'created' isn't fired immediately or reliably for large files.
# 'modified' catches the completion of file writes.
print(f"Detected file modification event: {event.src_path}")
self._check_and_process_file(event.src_path)
def _check_and_process_file(self, file_path: str):
"""
Checks if a file is new or modified and processes it if needed.
"""
# Ensure the file is fully written before processing
# This is a heuristic; more robust solutions might involve locking or retry logic.
time.sleep(2) # Give the OS a moment to finish writing
if not os.path.exists(file_path):
print(f"File {file_path} no longer exists, skipping processing.")
return
current_hash = self._get_file_hash(file_path)
if not current_hash:
print(f"Could not get hash for {file_path}, skipping.")
return
# The actual check for new/modified is handled by DocumentProcessor
# We just call the callback, and it will decide if processing is needed.
print(f"Triggering processing for potential new/modified document: {file_path}")
self.process_new_document(file_path) # The callback handles the deduplication logic
class FileWatcher:
"""
Sets up and starts an observer to watch for file system events.
"""
def __init__(self, folder_to_watch: str, process_new_document_callback: Callable[[str], None]):
"""
Initializes the FileWatcher.
Args:
folder_to_watch (str): The path to the folder to monitor.
process_new_document_callback (Callable[[str], None]): Callback for new docs.
"""
self.folder_to_watch = folder_to_watch
self.event_handler = UserUploadHandler(process_new_document_callback)
self.observer = Observer()
def start(self):
"""Starts the file system observer."""
print(f"Starting file watcher for folder: {self.folder_to_watch}")
self.observer.schedule(self.event_handler, self.folder_to_watch, recursive=False) # Only watch top level
self.observer.start()
# The main loop will keep the program alive
# try:
# while True:
# time.sleep(1)
# except KeyboardInterrupt:
# self.stop()
def stop(self):
"""Stops the file system observer."""
if self.observer.is_alive():
print("Stopping file watcher.")
self.observer.stop()
self.observer.join()
```

`scheduler.py`
```python
# scheduler.py
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
from typing import Callable
from config import AUTO_UPDATE_INTERVAL_HOURS
class KnowledgeBaseScheduler:
"""
Schedules automated tasks, such as periodically updating the knowledge base.
"""
def __init__(self, update_function: Callable[[], None]):
"""
Initializes the scheduler with the function to call for updates.
Args:
update_function (Callable[[], None]): The function to execute
periodically for updates.
"""
self.scheduler = BackgroundScheduler()
self.update_function = update_function
self.interval_hours = AUTO_UPDATE_INTERVAL_HOURS
def start(self):
"""
Starts the scheduler and adds the update job.
"""
if self.interval_hours > 0:
# Schedule the job to run every 'interval_hours'
# Start it 1 minute after launch to allow other components to initialize
            self.scheduler.add_job(self.update_function, 'interval', hours=self.interval_hours,
                                   next_run_time=datetime.now() + timedelta(minutes=1))
print(f"Scheduled automated knowledge base updates every {self.interval_hours} hours.")
self.scheduler.start()
else:
print("Automated updates are disabled (AUTO_UPDATE_INTERVAL_HOURS is 0 or less).")
def stop(self):
"""
Stops the scheduler.
"""
if self.scheduler.running:
print("Stopping knowledge base scheduler.")
self.scheduler.shutdown()
```

`main.py`
```python
# main.py
import time
import os
import sys
from agent import AgenticAI
from file_watcher import FileWatcher
from scheduler import KnowledgeBaseScheduler
from config import USER_UPLOAD_FOLDER, AGENT_SUBJECTS
def main():
"""
Main function to initialize and run the Agentic AI system.
"""
print("==================================================")
print(" Initializing Agentic AI System ")
print("==================================================")
agent = AgenticAI()
# --- Initialize File Watcher for User Uploads ---
# The callback for the file watcher is agent.process_user_uploaded_document
file_watcher = FileWatcher(USER_UPLOAD_FOLDER, agent.process_user_uploaded_document)
file_watcher.start()
# --- Initialize Scheduler for Automated Web Updates ---
# The callback for the scheduler needs to update all configured subjects.
def automated_web_update_all_subjects():
print("\n--- Automated knowledge base update triggered ---")
for subject in AGENT_SUBJECTS:
agent.update_knowledge_from_web(subject)
print("--- Automated knowledge base update finished ---")
kb_scheduler = KnowledgeBaseScheduler(automated_web_update_all_subjects)
kb_scheduler.start()
print("\nAgentic AI system is ready!")
print(f"Monitoring '{USER_UPLOAD_FOLDER}' for new documents.")
print("Type your queries or commands below. Type 'exit' to quit.")
print("Try: 'What are sustainable manufacturing practices?'")
print("Or: 'Update knowledge base on Sustainable Manufacturing Practices'")
print("Or: 'status' to check knowledge base status.")
print("--------------------------------------------------")
try:
while True:
user_input = input("\nYour prompt > ").strip()
if user_input.lower() == 'exit':
print("Exiting Agentic AI System. Goodbye!")
break
elif user_input.lower() == 'status':
print(agent.get_knowledge_base_status())
else:
response = agent.handle_user_prompt(user_input)
print("\nAgentic AI Response:")
print(response)
time.sleep(0.1) # Small delay to prevent busy-waiting
except KeyboardInterrupt:
print("\nKeyboard interrupt detected. Shutting down...")
finally:
file_watcher.stop()
kb_scheduler.stop()
print("All background processes stopped.")
print("==================================================")
print(" Agentic AI System Shut Down ")
print("==================================================")
if __name__ == "__main__":
main()
```

How to Run the Example
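Before running anything, install the third-party packages the modules above import. Exact package names can drift between releases, and your document loaders may need their own parsers (for example, a PDF library), so treat this as a starting point rather than a pinned requirements file:

```bash
pip install langchain langchain-community langchain-openai chromadb \
    sentence-transformers transformers torch requests beautifulsoup4 \
    watchdog apscheduler
```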
1. Save all files: Save each code block into its respective `.py` file in the `agentic_ai_system` directory.
2. Configure `config.py`:
   - Replace `"YOUR_SERPER_API_KEY"` with your actual Serper API key.
   - Ensure `LLM_CONFIG` points to your running Ollama instance or OpenAI (if you configure it).
3. Start Ollama (if using local LLM): Open a terminal and run `ollama run llama3` (or your chosen model).
4. Run `main.py`: Open a *new* terminal in the `agentic_ai_system` directory and execute:
```bash
python main.py
```
The system will initialize and start both the file watcher and the scheduler. You can then interact with it by typing prompts in the console.
- To add documents: Place a `.pdf`, `.html`, or `.md` file into the `data/user_uploads` folder. The file watcher will detect it, process it, and add its content to the knowledge base.
- To update knowledge from web: Type a command like `Update knowledge base on Sustainable Manufacturing Practices`.
- To query: Ask questions related to the configured subjects, e.g., `What are the key principles of circular economy in engineering?` or `Tell me about the latest innovations in sustainable manufacturing.`
This running example provides a solid foundation for your Agentic AI system, ready for further expansion and customization!