Introduction to Agentic AI Search Engines
The evolution of search technology has reached a pivotal moment with the emergence of agentic AI systems that can understand context, reason about information, and provide intelligent responses rather than simple keyword matches. Unlike traditional search engines that return lists of documents, agentic AI search engines act as intelligent agents that comprehend user intent, synthesize information from multiple sources, and generate coherent, contextual responses.
An agentic AI search engine differs fundamentally from conventional search systems in its ability to perform multi-step reasoning, maintain conversation context, and adapt its search strategy based on the type of query and available data sources. This approach transforms search from a retrieval task into an intelligent dialogue between the user and an AI agent that can understand nuanced questions and provide comprehensive answers.
The system we will develop combines the power of large language models with sophisticated information retrieval techniques, creating a search engine that can operate both as a standalone application and as an embedded component within larger software systems. The flexibility to configure search domains, data sources, and LLM backends makes this approach suitable for diverse use cases ranging from enterprise knowledge management to consumer applications.
Architecture Overview and Core Components
The architecture of our agentic AI search engine follows a modular design that separates concerns while maintaining tight integration between components. The system consists of several key layers that work together to provide intelligent search capabilities.
The foundation layer contains the core search engine implementation, which manages document indexing, query processing, and result ranking. This layer is responsible for the fundamental information retrieval operations and maintains independence from specific AI models or user interfaces.
Above the foundation sits the LLM integration layer, which provides a unified interface for working with different language models, whether they are hosted locally or accessed through remote APIs. This abstraction allows the system to switch between different models based on configuration settings or performance requirements.
The configuration management system spans across all layers, providing centralized control over search behavior, model selection, and system parameters. This component enables the flexibility needed for both standalone and embedded deployment scenarios.
The application integration framework handles the interface between the search engine and host applications when operating in embedded mode. This framework provides APIs for configuration, query submission, and result retrieval while maintaining isolation between the search engine and the host application.
Let me demonstrate the core architecture with a foundational implementation that establishes the main system components:
import asyncio
import json
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Optional, Union, Any
from pathlib import Path
import hashlib
import time
class DeploymentMode(Enum):
STANDALONE = "standalone"
EMBEDDED = "embedded"
class LLMProvider(Enum):
LOCAL_OLLAMA = "local_ollama"
OPENAI = "openai"
ANTHROPIC = "anthropic"
AZURE_OPENAI = "azure_openai"
@dataclass
class SearchConfig:
deployment_mode: DeploymentMode = DeploymentMode.STANDALONE
llm_provider: LLMProvider = LLMProvider.LOCAL_OLLAMA
llm_model: str = "llama2"
max_results: int = 10
search_timeout: float = 30.0
enable_caching: bool = True
cache_ttl: int = 3600
embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"
chunk_size: int = 512
chunk_overlap: int = 50
temperature: float = 0.7
max_tokens: int = 2048
search_domains: List[str] = field(default_factory=list)
excluded_paths: List[str] = field(default_factory=list)
custom_parameters: Dict[str, Any] = field(default_factory=dict)
@dataclass
class SearchResult:
content: str
source: str
relevance_score: float
metadata: Dict[str, Any] = field(default_factory=dict)
timestamp: float = field(default_factory=time.time)
@dataclass
class SearchResponse:
query: str
answer: str
sources: List[SearchResult]
processing_time: float
model_used: str
confidence_score: float = 0.0
follow_up_questions: List[str] = field(default_factory=list)
class AgenticSearchEngine:
def __init__(self, config: SearchConfig):
self.config = config
self.logger = logging.getLogger(__name__)
self.document_store = None
self.llm_client = None
self.embedding_model = None
self.cache = {}
self.is_initialized = False
async def initialize(self):
"""Initialize all components of the search engine"""
self.logger.info(f"Initializing search engine in {self.config.deployment_mode.value} mode")
# Initialize components based on configuration
await self._initialize_document_store()
await self._initialize_llm_client()
await self._initialize_embedding_model()
self.is_initialized = True
self.logger.info("Search engine initialization completed")
async def _initialize_document_store(self):
"""Initialize the document storage and indexing system"""
# Implementation will be detailed in subsequent sections
pass
async def _initialize_llm_client(self):
"""Initialize the LLM client based on configuration"""
# Implementation will be detailed in subsequent sections
pass
async def _initialize_embedding_model(self):
"""Initialize the embedding model for semantic search"""
# Implementation will be detailed in subsequent sections
pass
async def search(self, query: str, context: Optional[Dict] = None) -> SearchResponse:
"""Main search method that orchestrates the entire search process"""
if not self.is_initialized:
raise RuntimeError("Search engine not initialized. Call initialize() first.")
start_time = time.time()
# Generate cache key for the query
cache_key = self._generate_cache_key(query, context)
# Check cache if enabled
if self.config.enable_caching and cache_key in self.cache:
cached_result = self.cache[cache_key]
if time.time() - cached_result['timestamp'] < self.config.cache_ttl:
self.logger.info(f"Returning cached result for query: {query}")
return cached_result['response']
# Perform the actual search
response = await self._perform_search(query, context)
response.processing_time = time.time() - start_time
# Cache the result if caching is enabled
if self.config.enable_caching:
self.cache[cache_key] = {
'response': response,
'timestamp': time.time()
}
return response
def _generate_cache_key(self, query: str, context: Optional[Dict]) -> str:
"""Generate a unique cache key for the query and context"""
content = f"{query}_{json.dumps(context or {}, sort_keys=True)}"
return hashlib.md5(content.encode()).hexdigest()
async def _perform_search(self, query: str, context: Optional[Dict]) -> SearchResponse:
"""Perform the actual search operation"""
# This method will be implemented in detail in subsequent sections
# For now, return a placeholder response
return SearchResponse(
query=query,
answer="Search implementation in progress",
sources=[],
processing_time=0.0,
model_used=self.config.llm_model
)
This foundational code establishes the main structure of our agentic search engine. The SearchConfig class provides comprehensive configuration options that control every aspect of the system's behavior. The AgenticSearchEngine class serves as the main orchestrator, managing the initialization and coordination of all system components.
The configuration system is designed to be flexible enough to handle both standalone and embedded deployment scenarios. In standalone mode, the search engine operates independently and can search across configured domains or document collections. In embedded mode, the host application can specify exactly what documents or data sources should be searched, providing fine-grained control over the search scope.
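To make the two deployment scenarios concrete, here is a minimal usage sketch; the document path, model names, and API key are illustrative placeholders rather than required values:
import asyncio

async def main():
    # Standalone: the engine owns its document sources and indexes local files
    standalone_config = SearchConfig(
        deployment_mode=DeploymentMode.STANDALONE,
        llm_provider=LLMProvider.LOCAL_OLLAMA,
        llm_model="llama2",
        search_domains=["./docs"],            # hypothetical local corpus
        excluded_paths=["./docs/drafts"],     # hypothetical exclusion pattern
    )

    # Embedded: a host application narrows the scope and supplies credentials
    embedded_config = SearchConfig(
        deployment_mode=DeploymentMode.EMBEDDED,
        llm_provider=LLMProvider.OPENAI,
        llm_model="gpt-4o-mini",
        max_results=5,
        enable_caching=False,
        custom_parameters={"openai_api_key": "sk-..."},  # supplied by the host
    )

    # Only the standalone configuration is exercised here
    engine = AgenticSearchEngine(standalone_config)
    await engine.initialize()
    response = await engine.search("What is agentic search?")
    print(response.answer, response.processing_time)

asyncio.run(main())
At this point the search call still returns the placeholder answer from _perform_search; once the document store and LLM client are implemented in the following sections, the same calls produce grounded responses.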
Search Engine Core Implementation
The core search functionality combines traditional information retrieval techniques with modern AI capabilities to provide intelligent responses to user queries. The search process involves several stages: query analysis, document retrieval, context synthesis, and response generation.
Query analysis is the first critical step where the system attempts to understand the user's intent, identify key concepts, and determine the most appropriate search strategy. This involves both linguistic analysis and semantic understanding to extract meaningful search terms and concepts.
Document retrieval uses a hybrid approach combining keyword-based search with semantic similarity matching. The system maintains both traditional inverted indexes for fast keyword lookup and vector embeddings for semantic search capabilities. This dual approach ensures that the system can handle both specific factual queries and more abstract conceptual questions.
Let me implement the core search functionality with detailed document processing and retrieval capabilities:
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import re
from collections import defaultdict
import sqlite3
from sentence_transformers import SentenceTransformer
import torch
class DocumentStore:
def __init__(self, config: SearchConfig):
self.config = config
self.documents = {}
self.document_chunks = {}
self.tfidf_vectorizer = TfidfVectorizer(
max_features=10000,
stop_words='english',
ngram_range=(1, 2)
)
self.tfidf_matrix = None
self.chunk_embeddings = None
self.embedding_model = None
self.db_connection = None
async def initialize(self):
"""Initialize the document store with indexing capabilities"""
# Initialize embedding model
self.embedding_model = SentenceTransformer(self.config.embedding_model)
# Initialize SQLite database for metadata storage
self.db_connection = sqlite3.connect(':memory:')
await self._create_database_schema()
print(f"Document store initialized with embedding model: {self.config.embedding_model}")
async def _create_database_schema(self):
"""Create database schema for document metadata"""
cursor = self.db_connection.cursor()
cursor.execute('''
CREATE TABLE documents (
id TEXT PRIMARY KEY,
title TEXT,
source TEXT,
content_length INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metadata TEXT
)
''')
cursor.execute('''
CREATE TABLE document_chunks (
id TEXT PRIMARY KEY,
document_id TEXT,
chunk_index INTEGER,
content TEXT,
embedding_vector BLOB,
FOREIGN KEY (document_id) REFERENCES documents (id)
)
''')
self.db_connection.commit()
async def add_document(self, doc_id: str, content: str, source: str, metadata: Dict = None):
"""Add a document to the store with chunking and indexing"""
if metadata is None:
metadata = {}
# Store document metadata
cursor = self.db_connection.cursor()
cursor.execute('''
INSERT OR REPLACE INTO documents (id, title, source, content_length, metadata)
VALUES (?, ?, ?, ?, ?)
''', (doc_id, metadata.get('title', ''), source, len(content), json.dumps(metadata)))
# Chunk the document
chunks = self._chunk_document(content)
self.document_chunks[doc_id] = chunks
# Generate embeddings for chunks
chunk_texts = [chunk['content'] for chunk in chunks]
embeddings = self.embedding_model.encode(chunk_texts)
# Store chunks and embeddings
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
chunk_id = f"{doc_id}_chunk_{i}"
cursor.execute('''
INSERT OR REPLACE INTO document_chunks
(id, document_id, chunk_index, content, embedding_vector)
VALUES (?, ?, ?, ?, ?)
''', (chunk_id, doc_id, i, chunk['content'], embedding.tobytes()))
self.db_connection.commit()
# Update TF-IDF index
await self._update_tfidf_index()
print(f"Added document {doc_id} with {len(chunks)} chunks")
def _chunk_document(self, content: str) -> List[Dict]:
"""Split document into overlapping chunks for better retrieval"""
sentences = re.split(r'[.!?]+', content)
chunks = []
current_chunk = ""
current_length = 0
for sentence in sentences:
sentence = sentence.strip()
if not sentence:
continue
sentence_length = len(sentence.split())
if current_length + sentence_length > self.config.chunk_size and current_chunk:
# Create chunk with overlap
chunks.append({
'content': current_chunk.strip(),
'start_pos': len(chunks) * (self.config.chunk_size - self.config.chunk_overlap),
'length': current_length
})
# Start new chunk with overlap
overlap_words = current_chunk.split()[-self.config.chunk_overlap:]
current_chunk = ' '.join(overlap_words) + ' ' + sentence
current_length = len(overlap_words) + sentence_length
else:
current_chunk += ' ' + sentence
current_length += sentence_length
# Add final chunk
if current_chunk.strip():
chunks.append({
'content': current_chunk.strip(),
'start_pos': len(chunks) * (self.config.chunk_size - self.config.chunk_overlap),
'length': current_length
})
return chunks
async def _update_tfidf_index(self):
"""Update the TF-IDF index with all document chunks"""
all_chunks = []
for doc_chunks in self.document_chunks.values():
all_chunks.extend([chunk['content'] for chunk in doc_chunks])
if all_chunks:
self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(all_chunks)
async def search_documents(self, query: str, max_results: int = 10) -> List[SearchResult]:
"""Search documents using hybrid keyword and semantic search"""
if not self.document_chunks:
return []
# Perform keyword-based search using TF-IDF
keyword_results = await self._keyword_search(query, max_results)
# Perform semantic search using embeddings
semantic_results = await self._semantic_search(query, max_results)
# Combine and rank results
combined_results = self._combine_search_results(keyword_results, semantic_results)
return combined_results[:max_results]
async def _keyword_search(self, query: str, max_results: int) -> List[SearchResult]:
"""Perform keyword-based search using TF-IDF"""
if self.tfidf_matrix is None:
return []
query_vector = self.tfidf_vectorizer.transform([query])
similarities = cosine_similarity(query_vector, self.tfidf_matrix).flatten()
# Get top results
top_indices = np.argsort(similarities)[::-1][:max_results]
results = []
chunk_index = 0
for doc_id, chunks in self.document_chunks.items():
for chunk in chunks:
if chunk_index in top_indices and similarities[chunk_index] > 0.1:
results.append(SearchResult(
content=chunk['content'],
source=doc_id,
relevance_score=float(similarities[chunk_index]),
metadata={'search_type': 'keyword', 'chunk_index': chunk_index}
))
chunk_index += 1
return sorted(results, key=lambda x: x.relevance_score, reverse=True)
async def _semantic_search(self, query: str, max_results: int) -> List[SearchResult]:
"""Perform semantic search using embeddings"""
query_embedding = self.embedding_model.encode([query])
cursor = self.db_connection.cursor()
cursor.execute('SELECT id, document_id, content, embedding_vector FROM document_chunks')
results = []
for row in cursor.fetchall():
chunk_id, doc_id, content, embedding_blob = row
chunk_embedding = np.frombuffer(embedding_blob, dtype=np.float32).reshape(1, -1)
similarity = cosine_similarity(query_embedding, chunk_embedding)[0][0]
if similarity > 0.3: # Threshold for semantic relevance
results.append(SearchResult(
content=content,
source=doc_id,
relevance_score=float(similarity),
metadata={'search_type': 'semantic', 'chunk_id': chunk_id}
))
return sorted(results, key=lambda x: x.relevance_score, reverse=True)[:max_results]
def _combine_search_results(self, keyword_results: List[SearchResult],
semantic_results: List[SearchResult]) -> List[SearchResult]:
"""Combine and deduplicate search results from different methods"""
seen_content = set()
combined_results = []
# Merge results with weighted scoring
all_results = []
# Weight keyword results
for result in keyword_results:
result.relevance_score *= 0.6 # Keyword weight
all_results.append(result)
# Weight semantic results
for result in semantic_results:
result.relevance_score *= 0.8 # Semantic weight
all_results.append(result)
# Deduplicate and sort
for result in sorted(all_results, key=lambda x: x.relevance_score, reverse=True):
content_hash = hashlib.md5(result.content.encode()).hexdigest()
if content_hash not in seen_content:
seen_content.add(content_hash)
combined_results.append(result)
return combined_results
This implementation provides a document storage and retrieval system that combines traditional keyword search with semantic search. The chunking strategy breaks large documents into manageable pieces while preserving context through overlapping chunks.
The hybrid search approach leverages both TF-IDF vectorization for precise keyword matching and sentence transformers for semantic understanding. This combination allows the system to handle both specific factual queries where exact terms matter and conceptual queries where semantic similarity is more important.
The SQLite schema stores document metadata and chunk embeddings, while the weighted scoring and deduplication logic merges results from both search methods so the most relevant chunks surface first. Note that the semantic search scans every stored embedding on each query, so very large collections would benefit from an approximate nearest-neighbor index such as FAISS.
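The following sketch exercises the document store directly; the sample document, configuration values, and query are illustrative, and the embedding model is downloaded on first use:
async def demo_document_store():
    config = SearchConfig(chunk_size=256, chunk_overlap=32)
    store = DocumentStore(config)
    await store.initialize()

    # Index a small sample document
    await store.add_document(
        doc_id="handbook-01",
        content=(
            "Agentic search combines retrieval with multi-step reasoning. "
            "The agent plans a query strategy before synthesizing an answer."
        ),
        source="employee_handbook.md",
        metadata={"title": "Search Overview"}
    )

    # Hybrid search returns deduplicated, score-ranked chunks
    results = await store.search_documents("how does the agent plan a query?", max_results=3)
    for result in results:
        print(f"{result.relevance_score:.2f}  {result.source}  [{result.metadata['search_type']}]")

asyncio.run(demo_document_store())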
LLM Integration Layer
The LLM integration layer provides a unified interface for working with different language models, whether they are hosted locally or accessed through remote APIs. This abstraction is crucial for maintaining flexibility and allowing the system to adapt to different deployment scenarios and performance requirements.
The layer handles authentication, request formatting, response parsing, and error handling for various LLM providers. It also implements retry logic, rate limiting, and fallback mechanisms to ensure robust operation in production environments.
For local LLM integration, the system supports popular frameworks like Ollama, which provides easy access to models like Llama 2, Mistral, and others. For remote services, the system integrates with OpenAI's GPT models, Anthropic's Claude, and Azure OpenAI services.
Here is the comprehensive LLM integration implementation:
import aiohttp
import asyncio
import json
import os
from typing import AsyncGenerator
import ollama
from openai import AsyncOpenAI, AsyncAzureOpenAI
import anthropic
class LLMClient(ABC):
@abstractmethod
async def generate_response(self, prompt: str, context: List[SearchResult] = None) -> str:
pass
@abstractmethod
async def generate_streaming_response(self, prompt: str, context: List[SearchResult] = None) -> AsyncGenerator[str, None]:
pass
class OllamaClient(LLMClient):
def __init__(self, config: SearchConfig):
self.config = config
self.model = config.llm_model
        self.base_url = config.custom_parameters.get('ollama_url', 'http://localhost:11434')
        # Bind a client to the configured host so a non-default ollama_url is honored
        self.client = ollama.Client(host=self.base_url)
async def generate_response(self, prompt: str, context: List[SearchResult] = None) -> str:
"""Generate response using local Ollama model"""
try:
# Prepare the context-aware prompt
full_prompt = self._prepare_prompt(prompt, context)
response = await asyncio.to_thread(
                self.client.generate,
model=self.model,
prompt=full_prompt,
options={
'temperature': self.config.temperature,
'num_predict': self.config.max_tokens,
'top_k': 40,
'top_p': 0.9
}
)
return response['response']
except Exception as e:
raise RuntimeError(f"Ollama generation failed: {str(e)}")
async def generate_streaming_response(self, prompt: str, context: List[SearchResult] = None) -> AsyncGenerator[str, None]:
"""Generate streaming response using local Ollama model"""
full_prompt = self._prepare_prompt(prompt, context)
try:
stream = await asyncio.to_thread(
                self.client.generate,
model=self.model,
prompt=full_prompt,
stream=True,
options={
'temperature': self.config.temperature,
'num_predict': self.config.max_tokens
}
)
for chunk in stream:
if 'response' in chunk:
yield chunk['response']
except Exception as e:
raise RuntimeError(f"Ollama streaming failed: {str(e)}")
def _prepare_prompt(self, query: str, context: List[SearchResult] = None) -> str:
"""Prepare a context-aware prompt for the LLM"""
if not context:
return f"Question: {query}\n\nPlease provide a comprehensive answer."
context_text = "\n\n".join([
f"Source {i+1} ({result.source}):\n{result.content}"
for i, result in enumerate(context[:5]) # Limit to top 5 results
])
prompt = f"""Based on the following context information, please answer the question. If the context doesn't contain enough information to answer the question completely, please say so and provide what information you can.
Context:
{context_text}
Question: {query}
Please provide a comprehensive answer based on the context provided. Include references to the sources when relevant."""
return prompt
class OpenAIClient(LLMClient):
def __init__(self, config: SearchConfig):
self.config = config
self.model = config.llm_model
        api_key = config.custom_parameters.get('openai_api_key') or os.getenv('OPENAI_API_KEY')
        if not api_key:
            raise ValueError("OpenAI API key required in custom_parameters or the OPENAI_API_KEY environment variable")
self.client = AsyncOpenAI(api_key=api_key)
async def generate_response(self, prompt: str, context: List[SearchResult] = None) -> str:
"""Generate response using OpenAI GPT models"""
try:
messages = self._prepare_messages(prompt, context)
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=self.config.temperature,
max_tokens=self.config.max_tokens,
timeout=self.config.search_timeout
)
return response.choices[0].message.content
except Exception as e:
raise RuntimeError(f"OpenAI generation failed: {str(e)}")
async def generate_streaming_response(self, prompt: str, context: List[SearchResult] = None) -> AsyncGenerator[str, None]:
"""Generate streaming response using OpenAI GPT models"""
try:
messages = self._prepare_messages(prompt, context)
stream = await self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=self.config.temperature,
max_tokens=self.config.max_tokens,
stream=True,
timeout=self.config.search_timeout
)
async for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
except Exception as e:
raise RuntimeError(f"OpenAI streaming failed: {str(e)}")
def _prepare_messages(self, query: str, context: List[SearchResult] = None) -> List[Dict]:
"""Prepare messages for OpenAI chat completion"""
system_message = {
"role": "system",
"content": "You are a helpful AI assistant that answers questions based on provided context. Always cite your sources when possible and indicate if information is not available in the context."
}
if context:
context_text = "\n\n".join([
f"Source {i+1} ({result.source}):\n{result.content}"
for i, result in enumerate(context[:5])
])
user_message = {
"role": "user",
"content": f"Context:\n{context_text}\n\nQuestion: {query}"
}
else:
user_message = {
"role": "user",
"content": query
}
return [system_message, user_message]
class AnthropicClient(LLMClient):
def __init__(self, config: SearchConfig):
self.config = config
self.model = config.llm_model
        api_key = config.custom_parameters.get('anthropic_api_key') or os.getenv('ANTHROPIC_API_KEY')
        if not api_key:
            raise ValueError("Anthropic API key required in custom_parameters or the ANTHROPIC_API_KEY environment variable")
self.client = anthropic.AsyncAnthropic(api_key=api_key)
async def generate_response(self, prompt: str, context: List[SearchResult] = None) -> str:
"""Generate response using Anthropic Claude models"""
try:
full_prompt = self._prepare_prompt(prompt, context)
response = await self.client.messages.create(
model=self.model,
max_tokens=self.config.max_tokens,
temperature=self.config.temperature,
messages=[{"role": "user", "content": full_prompt}]
)
return response.content[0].text
except Exception as e:
raise RuntimeError(f"Anthropic generation failed: {str(e)}")
async def generate_streaming_response(self, prompt: str, context: List[SearchResult] = None) -> AsyncGenerator[str, None]:
"""Generate streaming response using Anthropic Claude models"""
try:
full_prompt = self._prepare_prompt(prompt, context)
async with self.client.messages.stream(
model=self.model,
max_tokens=self.config.max_tokens,
temperature=self.config.temperature,
messages=[{"role": "user", "content": full_prompt}]
) as stream:
async for text in stream.text_stream:
yield text
except Exception as e:
raise RuntimeError(f"Anthropic streaming failed: {str(e)}")
def _prepare_prompt(self, query: str, context: List[SearchResult] = None) -> str:
"""Prepare prompt for Anthropic Claude"""
if not context:
return f"Please answer this question: {query}"
context_text = "\n\n".join([
f"Source {i+1} ({result.source}):\n{result.content}"
for i, result in enumerate(context[:5])
])
return f"""I'll provide you with some context information and then ask a question. Please answer based on the context provided.
Context:
{context_text}
Question: {query}
Please provide a comprehensive answer based on the context. If the context doesn't contain sufficient information, please indicate what's missing."""
class LLMClientFactory:
@staticmethod
def create_client(config: SearchConfig) -> LLMClient:
"""Factory method to create appropriate LLM client based on configuration"""
if config.llm_provider == LLMProvider.LOCAL_OLLAMA:
return OllamaClient(config)
elif config.llm_provider == LLMProvider.OPENAI:
return OpenAIClient(config)
elif config.llm_provider == LLMProvider.ANTHROPIC:
return AnthropicClient(config)
elif config.llm_provider == LLMProvider.AZURE_OPENAI:
return AzureOpenAIClient(config)
else:
raise ValueError(f"Unsupported LLM provider: {config.llm_provider}")
class AzureOpenAIClient(LLMClient):
def __init__(self, config: SearchConfig):
self.config = config
self.model = config.llm_model
# Azure OpenAI specific parameters
azure_endpoint = config.custom_parameters.get('azure_endpoint')
api_key = config.custom_parameters.get('azure_api_key')
api_version = config.custom_parameters.get('azure_api_version', '2024-02-15-preview')
if not azure_endpoint or not api_key:
raise ValueError("Azure endpoint and API key required for Azure OpenAI")
        # Use the dedicated Azure client; the standard AsyncOpenAI constructor does not
        # accept azure_endpoint or api_version. For Azure, llm_model must be the
        # deployment name configured in the Azure portal.
        self.client = AsyncAzureOpenAI(
            azure_endpoint=azure_endpoint,
            api_key=api_key,
            api_version=api_version
        )
async def generate_response(self, prompt: str, context: List[SearchResult] = None) -> str:
"""Generate response using Azure OpenAI"""
try:
messages = self._prepare_messages(prompt, context)
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=self.config.temperature,
max_tokens=self.config.max_tokens
)
return response.choices[0].message.content
except Exception as e:
raise RuntimeError(f"Azure OpenAI generation failed: {str(e)}")
async def generate_streaming_response(self, prompt: str, context: List[SearchResult] = None) -> AsyncGenerator[str, None]:
"""Generate streaming response using Azure OpenAI"""
try:
messages = self._prepare_messages(prompt, context)
stream = await self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=self.config.temperature,
max_tokens=self.config.max_tokens,
stream=True
)
async for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
except Exception as e:
raise RuntimeError(f"Azure OpenAI streaming failed: {str(e)}")
def _prepare_messages(self, query: str, context: List[SearchResult] = None) -> List[Dict]:
"""Prepare messages for Azure OpenAI chat completion"""
system_message = {
"role": "system",
"content": "You are an intelligent search assistant. Answer questions based on the provided context and cite sources when possible."
}
if context:
context_text = "\n\n".join([
f"Source {i+1} ({result.source}):\n{result.content}"
for i, result in enumerate(context[:5])
])
user_message = {
"role": "user",
"content": f"Context:\n{context_text}\n\nQuestion: {query}"
}
else:
user_message = {
"role": "user",
"content": query
}
return [system_message, user_message]
This LLM integration layer provides a clean abstraction over different language model providers while maintaining the specific capabilities and optimizations for each service. The factory pattern allows for easy switching between providers based on configuration, and the streaming support enables real-time response generation for better user experience.
Each client implementation handles the specific requirements and API patterns of its respective service, including authentication, request formatting, and error handling. The context preparation methods ensure that search results are properly formatted and presented to the language models for accurate and relevant response generation.
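As a usage sketch, the snippet below selects a provider through the factory and answers a query against a hand-built context; the model name, sample search result, and locally running Ollama server are assumptions made for illustration:
async def demo_llm_factory():
    config = SearchConfig(
        llm_provider=LLMProvider.LOCAL_OLLAMA,
        llm_model="llama2",
    )
    client = LLMClientFactory.create_client(config)

    # Context would normally come from DocumentStore.search_documents
    context = [SearchResult(
        content="The search engine caches responses for one hour by default.",
        source="architecture_notes.md",
        relevance_score=0.92,
    )]

    # Blocking variant returns the full answer at once
    answer = await client.generate_response("How long are responses cached?", context)
    print(answer)

    # Streaming variant yields tokens for incremental display
    async for token in client.generate_streaming_response("How long are responses cached?", context):
        print(token, end="", flush=True)

asyncio.run(demo_llm_factory())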
Configuration Management System
The configuration management system provides centralized control over all aspects of the search engine's behavior. This system must be flexible enough to handle both standalone and embedded deployment scenarios while providing sensible defaults and validation for all configuration parameters.
The configuration system supports hierarchical configuration loading, allowing for base configurations to be overridden by environment-specific settings or application-specific customizations. This approach enables the same search engine codebase to be deployed in different environments with appropriate settings.
Environment variable support ensures that sensitive information like API keys can be managed securely without hardcoding them in configuration files. The system also provides runtime configuration updates for certain parameters, allowing for dynamic tuning without requiring application restarts.
Here is the comprehensive configuration management implementation:
import os
import yaml
import json
from pathlib import Path
from typing import Any, Dict, Optional, Union
import logging
from dataclasses import asdict, fields
import jsonschema
class ConfigurationManager:
def __init__(self, config_path: Optional[str] = None):
self.config_path = config_path
self.config_schema = self._define_config_schema()
self.logger = logging.getLogger(__name__)
self._config_cache = {}
def load_config(self, config_override: Optional[Dict] = None) -> SearchConfig:
"""Load configuration from multiple sources with proper precedence"""
# Start with default configuration
config_dict = self._get_default_config()
# Load from configuration file if provided
if self.config_path and Path(self.config_path).exists():
file_config = self._load_config_file(self.config_path)
config_dict.update(file_config)
# Override with environment variables
env_config = self._load_from_environment()
config_dict.update(env_config)
# Apply any runtime overrides
if config_override:
config_dict.update(config_override)
# Validate configuration
self._validate_config(config_dict)
# Convert to SearchConfig object
return self._dict_to_config(config_dict)
def _get_default_config(self) -> Dict[str, Any]:
"""Get default configuration values"""
return {
'deployment_mode': 'standalone',
'llm_provider': 'local_ollama',
'llm_model': 'llama2',
'max_results': 10,
'search_timeout': 30.0,
'enable_caching': True,
'cache_ttl': 3600,
'embedding_model': 'sentence-transformers/all-MiniLM-L6-v2',
'chunk_size': 512,
'chunk_overlap': 50,
'temperature': 0.7,
'max_tokens': 2048,
'search_domains': [],
'excluded_paths': [],
'custom_parameters': {}
}
def _load_config_file(self, config_path: str) -> Dict[str, Any]:
"""Load configuration from YAML or JSON file"""
try:
with open(config_path, 'r') as file:
if config_path.endswith('.yaml') or config_path.endswith('.yml'):
return yaml.safe_load(file) or {}
elif config_path.endswith('.json'):
return json.load(file) or {}
else:
raise ValueError(f"Unsupported config file format: {config_path}")
except Exception as e:
self.logger.error(f"Failed to load config file {config_path}: {e}")
return {}
def _load_from_environment(self) -> Dict[str, Any]:
"""Load configuration from environment variables"""
env_config = {}
# Define environment variable mappings
env_mappings = {
'SEARCH_ENGINE_DEPLOYMENT_MODE': 'deployment_mode',
'SEARCH_ENGINE_LLM_PROVIDER': 'llm_provider',
'SEARCH_ENGINE_LLM_MODEL': 'llm_model',
'SEARCH_ENGINE_MAX_RESULTS': ('max_results', int),
'SEARCH_ENGINE_TIMEOUT': ('search_timeout', float),
'SEARCH_ENGINE_ENABLE_CACHE': ('enable_caching', bool),
'SEARCH_ENGINE_CACHE_TTL': ('cache_ttl', int),
'SEARCH_ENGINE_EMBEDDING_MODEL': 'embedding_model',
'SEARCH_ENGINE_CHUNK_SIZE': ('chunk_size', int),
'SEARCH_ENGINE_CHUNK_OVERLAP': ('chunk_overlap', int),
'SEARCH_ENGINE_TEMPERATURE': ('temperature', float),
'SEARCH_ENGINE_MAX_TOKENS': ('max_tokens', int),
}
for env_var, config_key in env_mappings.items():
value = os.getenv(env_var)
if value is not None:
if isinstance(config_key, tuple):
key, type_converter = config_key
try:
if type_converter == bool:
env_config[key] = value.lower() in ('true', '1', 'yes', 'on')
else:
env_config[key] = type_converter(value)
except ValueError as e:
self.logger.warning(f"Invalid value for {env_var}: {value}")
else:
env_config[config_key] = value
# Handle custom parameters from environment
custom_params = {}
for key, value in os.environ.items():
if key.startswith('SEARCH_ENGINE_CUSTOM_'):
param_name = key[len('SEARCH_ENGINE_CUSTOM_'):].lower()
custom_params[param_name] = value
if custom_params:
env_config['custom_parameters'] = custom_params
return env_config
def _define_config_schema(self) -> Dict[str, Any]:
"""Define JSON schema for configuration validation"""
return {
"type": "object",
"properties": {
"deployment_mode": {
"type": "string",
"enum": ["standalone", "embedded"]
},
"llm_provider": {
"type": "string",
"enum": ["local_ollama", "openai", "anthropic", "azure_openai"]
},
"llm_model": {
"type": "string",
"minLength": 1
},
"max_results": {
"type": "integer",
"minimum": 1,
"maximum": 100
},
"search_timeout": {
"type": "number",
"minimum": 1.0,
"maximum": 300.0
},
"enable_caching": {
"type": "boolean"
},
"cache_ttl": {
"type": "integer",
"minimum": 60,
"maximum": 86400
},
"embedding_model": {
"type": "string",
"minLength": 1
},
"chunk_size": {
"type": "integer",
"minimum": 100,
"maximum": 2000
},
"chunk_overlap": {
"type": "integer",
"minimum": 0,
"maximum": 200
},
"temperature": {
"type": "number",
"minimum": 0.0,
"maximum": 2.0
},
"max_tokens": {
"type": "integer",
"minimum": 100,
"maximum": 8000
},
"search_domains": {
"type": "array",
"items": {"type": "string"}
},
"excluded_paths": {
"type": "array",
"items": {"type": "string"}
},
"custom_parameters": {
"type": "object"
}
},
"required": ["deployment_mode", "llm_provider", "llm_model"],
"additionalProperties": False
}
def _validate_config(self, config_dict: Dict[str, Any]) -> None:
"""Validate configuration against schema"""
try:
jsonschema.validate(config_dict, self.config_schema)
except jsonschema.ValidationError as e:
raise ValueError(f"Configuration validation failed: {e.message}")
# Additional custom validations
if config_dict.get('chunk_overlap', 0) >= config_dict.get('chunk_size', 512):
raise ValueError("chunk_overlap must be less than chunk_size")
# Validate LLM provider specific requirements
provider = config_dict.get('llm_provider')
custom_params = config_dict.get('custom_parameters', {})
if provider == 'openai' and 'openai_api_key' not in custom_params:
if not os.getenv('OPENAI_API_KEY'):
raise ValueError("OpenAI API key required in custom_parameters or OPENAI_API_KEY environment variable")
if provider == 'anthropic' and 'anthropic_api_key' not in custom_params:
if not os.getenv('ANTHROPIC_API_KEY'):
raise ValueError("Anthropic API key required in custom_parameters or ANTHROPIC_API_KEY environment variable")
if provider == 'azure_openai':
required_azure_params = ['azure_endpoint', 'azure_api_key']
for param in required_azure_params:
if param not in custom_params:
raise ValueError(f"Azure OpenAI requires {param} in custom_parameters")
def _dict_to_config(self, config_dict: Dict[str, Any]) -> SearchConfig:
"""Convert dictionary to SearchConfig object"""
# Handle enum conversions
if 'deployment_mode' in config_dict:
config_dict['deployment_mode'] = DeploymentMode(config_dict['deployment_mode'])
if 'llm_provider' in config_dict:
config_dict['llm_provider'] = LLMProvider(config_dict['llm_provider'])
# Filter out any keys that aren't valid SearchConfig fields
valid_fields = {f.name for f in fields(SearchConfig)}
filtered_dict = {k: v for k, v in config_dict.items() if k in valid_fields}
return SearchConfig(**filtered_dict)
def save_config(self, config: SearchConfig, output_path: str) -> None:
"""Save configuration to file"""
config_dict = asdict(config)
# Convert enums to strings for serialization
config_dict['deployment_mode'] = config.deployment_mode.value
config_dict['llm_provider'] = config.llm_provider.value
try:
with open(output_path, 'w') as file:
if output_path.endswith('.yaml') or output_path.endswith('.yml'):
yaml.dump(config_dict, file, default_flow_style=False, indent=2)
elif output_path.endswith('.json'):
json.dump(config_dict, file, indent=2)
else:
raise ValueError(f"Unsupported output format: {output_path}")
self.logger.info(f"Configuration saved to {output_path}")
except Exception as e:
self.logger.error(f"Failed to save configuration: {e}")
raise
def update_runtime_config(self, config: SearchConfig, updates: Dict[str, Any]) -> SearchConfig:
"""Update configuration at runtime with validation"""
# Define which parameters can be updated at runtime
runtime_updatable = {
'max_results', 'search_timeout', 'enable_caching', 'cache_ttl',
'temperature', 'max_tokens', 'search_domains', 'excluded_paths'
}
        config_dict = asdict(config)
        # asdict() keeps Enum members; convert them to their string values so the
        # JSON-schema validation below (which expects strings) does not reject them
        config_dict['deployment_mode'] = config.deployment_mode.value
        config_dict['llm_provider'] = config.llm_provider.value
for key, value in updates.items():
if key not in runtime_updatable:
raise ValueError(f"Parameter {key} cannot be updated at runtime")
config_dict[key] = value
# Validate updated configuration
self._validate_config(config_dict)
return self._dict_to_config(config_dict)
class EmbeddedConfigurationManager(ConfigurationManager):
"""Configuration manager specifically for embedded deployment scenarios"""
def __init__(self, host_app_config: Dict[str, Any]):
super().__init__()
self.host_app_config = host_app_config
def load_config(self, config_override: Optional[Dict] = None) -> SearchConfig:
"""Load configuration with host application preferences"""
# Start with default configuration
config_dict = self._get_default_config()
# Apply host application configuration
if self.host_app_config:
config_dict.update(self.host_app_config)
# Set deployment mode to embedded
config_dict['deployment_mode'] = 'embedded'
# Apply any additional overrides
if config_override:
config_dict.update(config_override)
# Validate and return
self._validate_config(config_dict)
return self._dict_to_config(config_dict)
def configure_search_scope(self, config: SearchConfig,
document_sources: List[str],
search_filters: Optional[Dict] = None) -> SearchConfig:
"""Configure search scope for embedded deployment"""
        config_dict = asdict(config)
        # Convert Enum members to their string values before re-validation
        config_dict['deployment_mode'] = config.deployment_mode.value
        config_dict['llm_provider'] = config.llm_provider.value
# Set search domains based on provided document sources
config_dict['search_domains'] = document_sources
# Apply search filters if provided
if search_filters:
excluded_paths = search_filters.get('excluded_paths', [])
config_dict['excluded_paths'] = excluded_paths
# Store additional filters in custom parameters
filter_params = {k: v for k, v in search_filters.items() if k != 'excluded_paths'}
config_dict['custom_parameters'].update(filter_params)
self._validate_config(config_dict)
return self._dict_to_config(config_dict)
This configuration management system provides comprehensive support for both standalone and embedded deployment scenarios. The hierarchical loading system ensures that configurations can be customized at multiple levels while maintaining proper validation and type safety.
The embedded configuration manager extends the base functionality to handle the specific requirements of applications that integrate the search engine as a component. This includes dynamic scope configuration and host application preference integration.
The runtime configuration update capability allows for dynamic tuning of search behavior without requiring application restarts, which is particularly important for production deployments where search performance may need to be optimized based on usage patterns.
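The following sketch shows the layering in practice; the file path, environment values, and overrides are illustrative, and the YAML file is optional because missing files simply fall back to defaults:
import os

# Environment layer (applied on top of defaults and any file-based settings)
os.environ["SEARCH_ENGINE_TEMPERATURE"] = "0.3"
os.environ["SEARCH_ENGINE_MAX_RESULTS"] = "15"

manager = ConfigurationManager(config_path="search_config.yaml")  # hypothetical config file
config = manager.load_config(config_override={"llm_model": "mistral"})  # runtime override
print(config.temperature, config.max_results, config.llm_model)

# Runtime-updatable parameters can be tuned later without a restart
config = manager.update_runtime_config(config, {"max_results": 5, "temperature": 0.1})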
Embedding and Standalone Modes
The search engine architecture supports two distinct deployment modes that cater to different use cases and integration requirements. The standalone mode provides a complete, independent search application that can be deployed as a service or used directly. The embedded mode allows the search engine to be integrated as a component within larger applications, providing search capabilities while respecting the host application's architecture and constraints.
In standalone mode, the search engine manages its own document sources, user interfaces, and system resources. It can operate as a web service, command-line tool, or desktop application, providing full search functionality with minimal external dependencies.
The embedded mode is designed for integration scenarios where the search engine becomes part of a larger application ecosystem. In this mode, the host application controls document sources, user authentication, and interface presentation, while the search engine focuses purely on providing intelligent search and response generation capabilities.
Here is the implementation that supports both deployment modes with appropriate abstractions and interfaces:
from abc import ABC, abstractmethod
from typing import Protocol, runtime_checkable
import asyncio
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
@runtime_checkable
class DocumentProvider(Protocol):
"""Protocol for document providers in embedded mode"""
async def get_documents(self, filters: Optional[Dict] = None) -> List[Dict[str, Any]]:
"""Retrieve documents based on filters"""
...
async def get_document_content(self, document_id: str) -> str:
"""Get content of a specific document"""
...
async def search_documents(self, query: str, filters: Optional[Dict] = None) -> List[Dict[str, Any]]:
"""Search documents with application-specific logic"""
...
class StandaloneSearchEngine(AgenticSearchEngine):
"""Standalone implementation of the search engine"""
def __init__(self, config: SearchConfig):
super().__init__(config)
self.web_app = None
self.document_loader = None
async def initialize(self):
"""Initialize standalone search engine with all components"""
await super().initialize()
# Initialize document loader for standalone mode
self.document_loader = StandaloneDocumentLoader(self.config)
await self.document_loader.initialize()
# Load documents from configured domains
await self._load_initial_documents()
# Initialize web interface if needed
if self.config.custom_parameters.get('enable_web_interface', True):
self._setup_web_interface()
async def _load_initial_documents(self):
"""Load documents from configured search domains"""
for domain in self.config.search_domains:
try:
if domain.startswith('http'):
await self._load_web_documents(domain)
elif Path(domain).exists():
await self._load_file_documents(domain)
else:
self.logger.warning(f"Invalid search domain: {domain}")
except Exception as e:
self.logger.error(f"Failed to load documents from {domain}: {e}")
async def _load_web_documents(self, url: str):
"""Load documents from web sources"""
# Implementation for web scraping and document extraction
# This would include handling robots.txt, rate limiting, etc.
self.logger.info(f"Loading web documents from {url}")
# Placeholder implementation
# In a real implementation, this would use libraries like scrapy, beautifulsoup, etc.
pass
async def _load_file_documents(self, path: str):
"""Load documents from file system"""
path_obj = Path(path)
if path_obj.is_file():
await self._process_single_file(path_obj)
elif path_obj.is_dir():
await self._process_directory(path_obj)
async def _process_single_file(self, file_path: Path):
"""Process a single file and add to document store"""
try:
content = await self._extract_file_content(file_path)
if content:
await self.document_store.add_document(
doc_id=str(file_path),
content=content,
source=str(file_path),
metadata={
'file_type': file_path.suffix,
'file_size': file_path.stat().st_size,
'modified_time': file_path.stat().st_mtime
}
)
except Exception as e:
self.logger.error(f"Failed to process file {file_path}: {e}")
async def _process_directory(self, dir_path: Path):
"""Process all files in a directory recursively"""
supported_extensions = {'.txt', '.md', '.pdf', '.docx', '.html', '.json', '.csv'}
for file_path in dir_path.rglob('*'):
if file_path.is_file() and file_path.suffix.lower() in supported_extensions:
# Check if file is excluded
if not self._is_path_excluded(str(file_path)):
await self._process_single_file(file_path)
def _is_path_excluded(self, path: str) -> bool:
"""Check if a path should be excluded from indexing"""
for excluded_pattern in self.config.excluded_paths:
if excluded_pattern in path:
return True
return False
async def _extract_file_content(self, file_path: Path) -> Optional[str]:
"""Extract text content from various file types"""
try:
if file_path.suffix.lower() == '.txt':
return file_path.read_text(encoding='utf-8')
elif file_path.suffix.lower() == '.md':
return file_path.read_text(encoding='utf-8')
elif file_path.suffix.lower() == '.json':
# Extract text from JSON values
data = json.loads(file_path.read_text(encoding='utf-8'))
return self._extract_text_from_json(data)
elif file_path.suffix.lower() == '.pdf':
# Would use PyPDF2 or similar library
return await self._extract_pdf_content(file_path)
elif file_path.suffix.lower() == '.docx':
# Would use python-docx library
return await self._extract_docx_content(file_path)
elif file_path.suffix.lower() == '.html':
# Would use beautifulsoup
return await self._extract_html_content(file_path)
else:
return None
except Exception as e:
self.logger.error(f"Failed to extract content from {file_path}: {e}")
return None
def _extract_text_from_json(self, data: Any) -> str:
"""Extract text content from JSON data structures"""
if isinstance(data, str):
return data
elif isinstance(data, dict):
return ' '.join(self._extract_text_from_json(v) for v in data.values())
elif isinstance(data, list):
return ' '.join(self._extract_text_from_json(item) for item in data)
else:
return str(data)
async def _extract_pdf_content(self, file_path: Path) -> str:
"""Extract text from PDF files"""
# Placeholder - would use PyPDF2, pdfplumber, or similar
self.logger.info(f"PDF extraction not implemented for {file_path}")
return ""
async def _extract_docx_content(self, file_path: Path) -> str:
"""Extract text from DOCX files"""
# Placeholder - would use python-docx
self.logger.info(f"DOCX extraction not implemented for {file_path}")
return ""
async def _extract_html_content(self, file_path: Path) -> str:
"""Extract text from HTML files"""
# Placeholder - would use beautifulsoup
self.logger.info(f"HTML extraction not implemented for {file_path}")
return ""
def _setup_web_interface(self):
"""Setup FastAPI web interface for standalone mode"""
self.web_app = FastAPI(title="Agentic Search Engine", version="1.0.0")
@self.web_app.post("/search")
async def search_endpoint(request: SearchRequest) -> SearchResponse:
try:
response = await self.search(request.query, request.context)
return response
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.web_app.get("/health")
async def health_check():
return {"status": "healthy", "initialized": self.is_initialized}
@self.web_app.post("/documents")
async def add_document_endpoint(request: AddDocumentRequest):
try:
await self.document_store.add_document(
doc_id=request.document_id,
content=request.content,
source=request.source,
metadata=request.metadata
)
return {"status": "success", "document_id": request.document_id}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
async def run_web_server(self, host: str = "0.0.0.0", port: int = 8000):
"""Run the web server for standalone mode"""
if not self.web_app:
raise RuntimeError("Web interface not initialized")
config = uvicorn.Config(self.web_app, host=host, port=port, log_level="info")
server = uvicorn.Server(config)
await server.serve()
class EmbeddedSearchEngine(AgenticSearchEngine):
"""Embedded implementation of the search engine"""
def __init__(self, config: SearchConfig, document_provider: DocumentProvider):
super().__init__(config)
self.document_provider = document_provider
self.host_callbacks = {}
async def initialize(self):
"""Initialize embedded search engine"""
await super().initialize()
# Initialize with documents from the host application
await self._sync_with_host_documents()
self.logger.info("Embedded search engine initialized")
async def _sync_with_host_documents(self):
"""Synchronize with documents provided by host application"""
try:
documents = await self.document_provider.get_documents()
for doc_info in documents:
content = await self.document_provider.get_document_content(doc_info['id'])
await self.document_store.add_document(
doc_id=doc_info['id'],
content=content,
source=doc_info.get('source', 'host_application'),
metadata=doc_info.get('metadata', {})
)
except Exception as e:
self.logger.error(f"Failed to sync with host documents: {e}")
async def search_with_filters(self, query: str, filters: Optional[Dict] = None) -> SearchResponse:
"""Search with host application specific filters"""
# Apply host application filters to the search
filtered_results = []
# Get base search results
base_response = await self.search(query)
# Apply additional filtering based on host application requirements
if filters:
filtered_results = self._apply_host_filters(base_response.sources, filters)
base_response.sources = filtered_results
return base_response
def _apply_host_filters(self, results: List[SearchResult], filters: Dict) -> List[SearchResult]:
"""Apply host application specific filters to search results"""
filtered_results = []
for result in results:
# Apply various filter criteria
if self._passes_filters(result, filters):
filtered_results.append(result)
return filtered_results
def _passes_filters(self, result: SearchResult, filters: Dict) -> bool:
"""Check if a search result passes the specified filters"""
# Document type filter
if 'document_types' in filters:
doc_type = result.metadata.get('file_type', '').lower()
if doc_type not in filters['document_types']:
return False
# Date range filter
if 'date_range' in filters:
doc_date = result.metadata.get('modified_time', 0)
start_date = filters['date_range'].get('start', 0)
end_date = filters['date_range'].get('end', float('inf'))
if not (start_date <= doc_date <= end_date):
return False
# Custom metadata filters
if 'metadata_filters' in filters:
for key, expected_value in filters['metadata_filters'].items():
if result.metadata.get(key) != expected_value:
return False
return True
def register_callback(self, event_type: str, callback_func):
"""Register callbacks for host application integration"""
if event_type not in self.host_callbacks:
self.host_callbacks[event_type] = []
self.host_callbacks[event_type].append(callback_func)
async def _trigger_callback(self, event_type: str, data: Any):
"""Trigger registered callbacks for specific events"""
if event_type in self.host_callbacks:
for callback in self.host_callbacks[event_type]:
try:
if asyncio.iscoroutinefunction(callback):
await callback(data)
else:
callback(data)
except Exception as e:
self.logger.error(f"Callback error for {event_type}: {e}")
async def update_document_index(self, document_updates: List[Dict]):
"""Update document index based on host application changes"""
for update in document_updates:
if update['action'] == 'add':
content = await self.document_provider.get_document_content(update['document_id'])
await self.document_store.add_document(
doc_id=update['document_id'],
content=content,
source=update.get('source', 'host_application'),
metadata=update.get('metadata', {})
)
elif update['action'] == 'remove':
# Implementation for document removal
pass
elif update['action'] == 'update':
# Implementation for document updates
pass
await self._trigger_callback('index_updated', document_updates)
# Pydantic models for API requests
class SearchRequest(BaseModel):
query: str
context: Optional[Dict] = None
filters: Optional[Dict] = None
class AddDocumentRequest(BaseModel):
document_id: str
content: str
source: str
metadata: Optional[Dict] = None
class StandaloneDocumentLoader:
"""Document loader for standalone mode"""
def __init__(self, config: SearchConfig):
self.config = config
self.logger = logging.getLogger(__name__)
async def initialize(self):
"""Initialize the document loader"""
self.logger.info("Document loader initialized")
async def load_from_source(self, source: str) -> List[Dict]:
"""Load documents from a specified source"""
# Implementation for loading documents from various sources
# This would handle web scraping, file system scanning, database queries, etc.
pass
This implementation provides comprehensive support for both standalone and embedded deployment modes. The standalone mode includes a complete document loading system that can handle various file types and web sources, along with a web API for external access.
The embedded mode focuses on integration with host applications through the DocumentProvider protocol, allowing the host application to maintain control over document sources while leveraging the search engine's AI capabilities. The callback system enables tight integration with host application workflows and real-time synchronization of document changes.
The filtering system in embedded mode allows host applications to apply their own business logic and access controls to search results, ensuring that the search engine respects the host application's security and data governance requirements.
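To illustrate the integration surface, here is a sketch of a host application wiring up the embedded engine through an in-memory DocumentProvider; the catalogue contents and callback are stand-ins for real application storage and event handling:
class InMemoryDocumentProvider:
    """Satisfies the DocumentProvider protocol with a dictionary-backed catalogue."""

    def __init__(self, documents: Dict[str, Dict[str, Any]]):
        self._documents = documents

    async def get_documents(self, filters: Optional[Dict] = None) -> List[Dict[str, Any]]:
        return [
            {"id": doc_id, "source": doc["source"], "metadata": doc.get("metadata", {})}
            for doc_id, doc in self._documents.items()
        ]

    async def get_document_content(self, document_id: str) -> str:
        return self._documents[document_id]["content"]

    async def search_documents(self, query: str, filters: Optional[Dict] = None) -> List[Dict[str, Any]]:
        # Host-specific search logic would go here; the sketch returns the full catalogue
        return await self.get_documents(filters)

async def demo_embedded():
    provider = InMemoryDocumentProvider({
        "ticket-42": {
            "content": "Password resets are handled through the self-service portal.",
            "source": "support_kb",
            "metadata": {"file_type": ".md"},
        }
    })
    config = SearchConfig(deployment_mode=DeploymentMode.EMBEDDED)
    engine = EmbeddedSearchEngine(config, provider)
    engine.register_callback("index_updated", lambda updates: print(f"{len(updates)} documents re-indexed"))
    await engine.initialize()

    response = await engine.search_with_filters(
        "How do users reset passwords?",
        filters={"document_types": [".md"]}
    )
    print(response.answer)

asyncio.run(demo_embedded())
With the placeholder initializers shown earlier, this still returns the stub answer; once the document store and LLM client are wired in, the same host-side code yields grounded, filtered responses.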
Document Processing and Indexing
The document processing and indexing system forms the foundation of the search engine's ability to understand and retrieve relevant information. This system must handle diverse document types, extract meaningful content, and create efficient indexes that support both keyword and semantic search operations.
The processing pipeline includes document parsing, content extraction, text preprocessing, chunking, embedding generation, and index construction. Each stage is designed to be modular and extensible, allowing for the addition of new document types and processing techniques without disrupting the existing system.
The indexing strategy combines traditional inverted indexes for fast keyword lookup with vector databases for semantic similarity search. This hybrid approach ensures optimal performance across different types of queries while maintaining the ability to scale to large document collections.
Let me implement the comprehensive document processing and indexing system:
import mimetypes
import hashlib
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import asyncio
from typing import Iterator, Tuple
import spacy
from transformers import pipeline
import faiss
import pickle
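# The processor below returns ProcessedDocument and DocumentChunk objects whose
# definitions are not shown in this section. The dataclasses that follow are an
# assumed sketch of the fields the code relies on, inferred from how they are used.
@dataclass
class DocumentChunk:
    content: str
    start_pos: int
    end_pos: int
    sentence_count: int
    word_count: int

@dataclass
class ProcessedDocument:
    doc_id: str
    content: str
    summary: str
    chunks: List[DocumentChunk]
    entities: List[Dict]
    metrics: Dict[str, Any]
    source: str
    metadata: Dict[str, Any]
    processing_time: float
    content_hash: str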
class DocumentProcessor:
"""Handles document parsing and content extraction"""
def __init__(self, config: SearchConfig):
self.config = config
self.logger = logging.getLogger(__name__)
self.nlp_model = None
self.summarizer = None
self.executor = ThreadPoolExecutor(max_workers=4)
async def initialize(self):
"""Initialize NLP models and processing components"""
try:
# Load spaCy model for text processing
self.nlp_model = spacy.load("en_core_web_sm")
# Initialize summarization pipeline
self.summarizer = pipeline(
"summarization",
model="facebook/bart-large-cnn",
device=0 if torch.cuda.is_available() else -1
)
self.logger.info("Document processor initialized with NLP models")
except Exception as e:
self.logger.warning(f"Failed to load NLP models: {e}")
# Continue without advanced NLP features
async def process_document(self, doc_id: str, content: str, source: str,
metadata: Dict = None) -> ProcessedDocument:
"""Process a document through the complete pipeline"""
if metadata is None:
metadata = {}
start_time = time.time()
# Extract and clean text content
cleaned_content = await self._clean_content(content)
# Generate document summary if content is long
summary = await self._generate_summary(cleaned_content)
# Extract key entities and topics
entities = await self._extract_entities(cleaned_content)
# Generate document chunks
chunks = await self._create_semantic_chunks(cleaned_content)
# Calculate document metrics
metrics = self._calculate_document_metrics(cleaned_content)
processing_time = time.time() - start_time
return ProcessedDocument(
doc_id=doc_id,
content=cleaned_content,
summary=summary,
chunks=chunks,
entities=entities,
metrics=metrics,
source=source,
metadata=metadata,
processing_time=processing_time,
content_hash=hashlib.sha256(cleaned_content.encode()).hexdigest()
)
async def _clean_content(self, content: str) -> str:
"""Clean and normalize document content"""
# Remove excessive whitespace
content = re.sub(r'\s+', ' ', content)
# Remove special characters but preserve sentence structure
content = re.sub(r'[^\w\s\.\!\?\,\;\:\-\(\)]', ' ', content)
# Normalize line breaks
content = re.sub(r'\n+', '\n', content)
return content.strip()
async def _generate_summary(self, content: str) -> str:
"""Generate a summary of the document content"""
if not self.summarizer or len(content.split()) < 100:
# Return first few sentences as summary
sentences = content.split('.')[:3]
return '. '.join(sentences) + '.'
try:
# Use transformer model for summarization
max_chunk_length = 1024
if len(content) > max_chunk_length:
content = content[:max_chunk_length]
summary_result = await asyncio.to_thread(
self.summarizer,
content,
max_length=150,
min_length=50,
do_sample=False
)
return summary_result[0]['summary_text']
except Exception as e:
self.logger.warning(f"Summarization failed: {e}")
sentences = content.split('.')[:3]
return '. '.join(sentences) + '.'
async def _extract_entities(self, content: str) -> List[Dict]:
"""Extract named entities from the content"""
if not self.nlp_model:
return []
try:
doc = await asyncio.to_thread(self.nlp_model, content)
entities = []
for ent in doc.ents:
entities.append({
'text': ent.text,
'label': ent.label_,
'start': ent.start_char,
'end': ent.end_char,
'confidence': getattr(ent, 'confidence', 1.0)
})
return entities
except Exception as e:
self.logger.warning(f"Entity extraction failed: {e}")
return []
async def _create_semantic_chunks(self, content: str) -> List[DocumentChunk]:
"""Create semantically coherent chunks from the document"""
if not self.nlp_model:
return self._create_simple_chunks(content)
try:
doc = await asyncio.to_thread(self.nlp_model, content)
# Group sentences into semantic chunks
chunks = []
current_chunk = []
current_length = 0
for sent in doc.sents:
sent_length = len(sent.text.split())
if current_length + sent_length > self.config.chunk_size and current_chunk:
# Create chunk from current sentences
chunk_text = ' '.join([s.text for s in current_chunk])
chunks.append(DocumentChunk(
content=chunk_text,
start_pos=current_chunk[0].start_char,
end_pos=current_chunk[-1].end_char,
sentence_count=len(current_chunk),
word_count=current_length
))
# Start new chunk with overlap
overlap_sentences = current_chunk[-2:] if len(current_chunk) > 2 else current_chunk
current_chunk = overlap_sentences + [sent]
current_length = sum(len(s.text.split()) for s in current_chunk)
else:
current_chunk.append(sent)
current_length += sent_length
# Add final chunk
if current_chunk:
chunk_text = ' '.join([s.text for s in current_chunk])
chunks.append(DocumentChunk(
content=chunk_text,
start_pos=current_chunk[0].start_char,
end_pos=current_chunk[-1].end_char,
sentence_count=len(current_chunk),
word_count=current_length
))
return chunks
except Exception as e:
self.logger.warning(f"Semantic chunking failed: {e}")
return self._create_simple_chunks(content)
def _create_simple_chunks(self, content: str) -> List[DocumentChunk]:
"""Create simple word-based chunks as fallback"""
words = content.split()
chunks = []
for i in range(0, len(words), self.config.chunk_size - self.config.chunk_overlap):
chunk_words = words[i:i + self.config.chunk_size]
chunk_text = ' '.join(chunk_words)
chunks.append(DocumentChunk(
content=chunk_text,
start_pos=i,  # word offset; the semantic chunking path records character offsets instead
end_pos=i + len(chunk_words),
sentence_count=chunk_text.count('.') + 1,
word_count=len(chunk_words)
))
return chunks
def _calculate_document_metrics(self, content: str) -> Dict[str, Any]:
"""Calculate various metrics for the document"""
words = content.split()
sentences = content.split('.')
return {
'word_count': len(words),
'sentence_count': len(sentences),
'character_count': len(content),
'average_sentence_length': len(words) / len(sentences) if sentences else 0,
'readability_score': self._calculate_readability(content),
'language': self._detect_language(content)
}
def _calculate_readability(self, content: str) -> float:
"""Calculate a simple readability score"""
words = content.split()
sentences = content.split('.')
if not sentences or not words:
return 0.0
avg_sentence_length = len(words) / len(sentences)
# Simple readability approximation
score = 206.835 - (1.015 * avg_sentence_length)
return max(0.0, min(100.0, score))
def _detect_language(self, content: str) -> str:
"""Detect the language of the content"""
# Simple language detection based on common words
# In a real implementation, would use langdetect or similar
english_words = {'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
words = set(content.lower().split())
english_count = len(words.intersection(english_words))
if english_count > len(words) * 0.1:
return 'en'
else:
return 'unknown'
@dataclass
class DocumentChunk:
content: str
start_pos: int
end_pos: int
sentence_count: int
word_count: int
embedding: Optional[np.ndarray] = None
@dataclass
class ProcessedDocument:
doc_id: str
content: str
summary: str
chunks: List[DocumentChunk]
entities: List[Dict]
metrics: Dict[str, Any]
source: str
metadata: Dict[str, Any]
processing_time: float
content_hash: str
class VectorIndex:
"""Manages vector embeddings and similarity search"""
def __init__(self, config: SearchConfig):
self.config = config
self.embedding_model = None
self.index = None
self.chunk_metadata = {}
self.dimension = None
self.logger = logging.getLogger(__name__)
async def initialize(self):
"""Initialize the vector index and embedding model"""
self.embedding_model = SentenceTransformer(self.config.embedding_model)
self.dimension = self.embedding_model.get_sentence_embedding_dimension()
# Initialize FAISS index for efficient similarity search
self.index = faiss.IndexFlatIP(self.dimension) # Inner product for cosine similarity
self.logger.info(f"Vector index initialized with dimension {self.dimension}")
async def add_document_chunks(self, doc_id: str, chunks: List[DocumentChunk]):
"""Add document chunks to the vector index"""
if not chunks:
return
# Generate embeddings for all chunks
chunk_texts = [chunk.content for chunk in chunks]
embeddings = await asyncio.to_thread(
self.embedding_model.encode,
chunk_texts,
normalize_embeddings=True
)
# Add embeddings to FAISS index
start_idx = self.index.ntotal
self.index.add(embeddings.astype(np.float32))
# Store chunk metadata
for i, chunk in enumerate(chunks):
chunk_id = f"{doc_id}_chunk_{i}"
self.chunk_metadata[start_idx + i] = {
'chunk_id': chunk_id,
'doc_id': doc_id,
'content': chunk.content,
'start_pos': chunk.start_pos,
'end_pos': chunk.end_pos,
'word_count': chunk.word_count
}
# Store embedding in chunk object
chunk.embedding = embeddings[i]
async def search_similar(self, query: str, top_k: int = 10) -> List[Tuple[str, float]]:
"""Search for similar chunks using vector similarity"""
if self.index.ntotal == 0:
return []
# Generate query embedding
query_embedding = await asyncio.to_thread(
self.embedding_model.encode,
[query],
normalize_embeddings=True
)
# Search in FAISS index
scores, indices = self.index.search(query_embedding.astype(np.float32), top_k)
results = []
for score, idx in zip(scores[0], indices[0]):
if idx in self.chunk_metadata:
chunk_info = self.chunk_metadata[idx]
results.append((chunk_info['content'], float(score)))
return results
def save_index(self, filepath: str):
"""Save the vector index to disk"""
try:
# Save FAISS index
faiss.write_index(self.index, f"{filepath}.faiss")
# Save metadata
with open(f"{filepath}.metadata", 'wb') as f:
pickle.dump(self.chunk_metadata, f)
self.logger.info(f"Vector index saved to {filepath}")
except Exception as e:
self.logger.error(f"Failed to save index: {e}")
def load_index(self, filepath: str):
"""Load the vector index from disk"""
try:
# Load FAISS index
self.index = faiss.read_index(f"{filepath}.faiss")
# Load metadata
with open(f"{filepath}.metadata", 'rb') as f:
self.chunk_metadata = pickle.load(f)
self.logger.info(f"Vector index loaded from {filepath}")
except Exception as e:
self.logger.error(f"Failed to load index: {e}")
class IndexManager:
"""Manages both keyword and vector indexes"""
def __init__(self, config: SearchConfig):
self.config = config
self.vector_index = VectorIndex(config)
self.keyword_index = {}
self.document_processor = DocumentProcessor(config)
self.logger = logging.getLogger(__name__)
async def initialize(self):
"""Initialize all indexing components"""
await self.document_processor.initialize()
await self.vector_index.initialize()
self.logger.info("Index manager initialized")
async def index_document(self, doc_id: str, content: str, source: str, metadata: Dict = None):
"""Index a document with both keyword and vector indexes"""
# Process the document
processed_doc = await self.document_processor.process_document(
doc_id, content, source, metadata
)
# Add to vector index
await self.vector_index.add_document_chunks(doc_id, processed_doc.chunks)
# Add to keyword index
self._add_to_keyword_index(processed_doc)
self.logger.info(f"Indexed document {doc_id} with {len(processed_doc.chunks)} chunks")
return processed_doc
def _add_to_keyword_index(self, processed_doc: ProcessedDocument):
"""Add document to keyword-based index"""
doc_id = processed_doc.doc_id
# Create inverted index for keywords
words = processed_doc.content.lower().split()
for word in set(words):
if word not in self.keyword_index:
self.keyword_index[word] = {}
if doc_id not in self.keyword_index[word]:
self.keyword_index[word][doc_id] = 0
self.keyword_index[word][doc_id] += words.count(word)
async def search(self, query: str, search_type: str = 'hybrid') -> List[SearchResult]:
"""Search using specified method"""
if search_type == 'vector':
return await self._vector_search(query)
elif search_type == 'keyword':
return await self._keyword_search(query)
else: # hybrid
return await self._hybrid_search(query)
async def _vector_search(self, query: str) -> List[SearchResult]:
"""Perform vector-based semantic search"""
similar_chunks = await self.vector_index.search_similar(query, self.config.max_results)
results = []
for content, score in similar_chunks:
results.append(SearchResult(
content=content,
source='vector_search',
relevance_score=score,
metadata={'search_type': 'vector'}
))
return results
async def _keyword_search(self, query: str) -> List[SearchResult]:
"""Perform keyword-based search"""
query_words = query.lower().split()
doc_scores = defaultdict(float)
for word in query_words:
if word in self.keyword_index:
for doc_id, count in self.keyword_index[word].items():
doc_scores[doc_id] += count
# Sort by score and return top results
sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
results = []
for doc_id, score in sorted_docs[:self.config.max_results]:
results.append(SearchResult(
content=f"Document {doc_id}", # Would retrieve actual content
source=doc_id,
relevance_score=score,
metadata={'search_type': 'keyword'}
))
return results
async def _hybrid_search(self, query: str) -> List[SearchResult]:
"""Combine vector and keyword search results"""
vector_results = await self._vector_search(query)
keyword_results = await self._keyword_search(query)
# Combine and re-rank results
all_results = vector_results + keyword_results
# Simple scoring combination (could be more sophisticated)
for result in vector_results:
result.relevance_score *= 0.7 # Weight for vector search
for result in keyword_results:
result.relevance_score *= 0.3 # Weight for keyword search
# Remove duplicates and sort
seen_content = set()
unique_results = []
for result in sorted(all_results, key=lambda x: x.relevance_score, reverse=True):
content_hash = hashlib.md5(result.content.encode()).hexdigest()
if content_hash not in seen_content:
seen_content.add(content_hash)
unique_results.append(result)
return unique_results[:self.config.max_results]
This document processing and indexing system handles diverse document types and builds efficient search indexes. Semantic chunking keeps document segments coherent, while the hybrid indexing strategy combines the precision of keyword search with the recall of semantic similarity search.
The vector index implementation using FAISS provides scalable similarity search capabilities that can handle large document collections efficiently. The document processor includes advanced NLP features like entity extraction and summarization, which enhance the search experience by providing richer context and metadata.
The modular design allows for easy extension with additional document types, processing techniques, and indexing strategies as requirements evolve.
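To make the indexing flow concrete, here is a brief usage sketch. It assumes the SearchConfig and IndexManager classes defined above are in scope, that the SearchConfig defaults (chunk_size, chunk_overlap, embedding_model, max_results) from earlier in the document apply, and that the spaCy and sentence-transformer models are installed; the document id, content, and query are illustrative.

import asyncio


async def demo_indexing():
    config = SearchConfig()  # defaults defined earlier in the document
    manager = IndexManager(config)
    await manager.initialize()

    # Index a small document; chunking, embeddings, and the keyword index are built here.
    await manager.index_document(
        doc_id="doc-001",
        content="FAISS is a library for efficient similarity search over dense vectors. "
                "It supports exact and approximate nearest-neighbor indexes.",
        source="notes/faiss.md",
    )

    # Hybrid search combines vector similarity with keyword overlap.
    results = await manager.search("How does FAISS perform similarity search?", search_type="hybrid")
    for result in results:
        print(f"{result.relevance_score:.3f}  {result.source}  {result.content[:80]}")

# asyncio.run(demo_indexing())  # uncomment to run once the models are installed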
Query Processing and Response Generation
The query processing and response generation system represents the core intelligence of the agentic search engine. This system must understand user intent, formulate appropriate search strategies, synthesize information from multiple sources, and generate coherent, helpful responses that address the user's needs.
The query processing pipeline includes intent analysis, query expansion, search strategy selection, and result ranking. The response generation component takes the retrieved information and uses the configured language model to create comprehensive answers that cite sources and provide additional context.
The system implements sophisticated reasoning capabilities that allow it to handle complex queries requiring multi-step analysis, comparison of information from different sources, and synthesis of concepts that may not be explicitly stated in any single document.
Here is the implementation of the advanced query processing and response generation system:
import re
from typing import Set, Tuple
from dataclasses import dataclass
from enum import Enum
import spacy
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
class QueryType(Enum):
FACTUAL = "factual"
CONCEPTUAL = "conceptual"
COMPARATIVE = "comparative"
PROCEDURAL = "procedural"
ANALYTICAL = "analytical"
class SearchStrategy(Enum):
KEYWORD_FOCUSED = "keyword_focused"
SEMANTIC_FOCUSED = "semantic_focused"
HYBRID_BALANCED = "hybrid_balanced"
MULTI_STEP = "multi_step"
@dataclass
class QueryAnalysis:
original_query: str
query_type: QueryType
key_concepts: List[str]
entities: List[Dict]
expanded_terms: List[str]
search_strategy: SearchStrategy
confidence: float
intent_description: str
@dataclass
class SearchPlan:
primary_query: str
sub_queries: List[str]
search_strategy: SearchStrategy
max_results_per_query: int
result_fusion_method: str
expected_answer_type: str
class QueryProcessor:
"""Analyzes and processes user queries to determine optimal search strategy"""
def __init__(self, config: SearchConfig):
self.config = config
self.nlp_model = None
self.query_patterns = self._initialize_query_patterns()
self.concept_extractor = None
self.logger = logging.getLogger(__name__)
async def initialize(self):
"""Initialize NLP components for query processing"""
try:
self.nlp_model = spacy.load("en_core_web_sm")
self.concept_extractor = ConceptExtractor()
await self.concept_extractor.initialize()
self.logger.info("Query processor initialized")
except Exception as e:
self.logger.warning(f"Failed to initialize NLP components: {e}")
def _initialize_query_patterns(self) -> Dict[QueryType, List[str]]:
"""Initialize patterns for query type classification"""
return {
QueryType.FACTUAL: [
r'\bwhat is\b', r'\bwho is\b', r'\bwhen did\b', r'\bwhere is\b',
r'\bhow many\b', r'\bhow much\b', r'\bwhich\b', r'\bdefine\b'
],
QueryType.CONCEPTUAL: [
r'\bexplain\b', r'\bdescribe\b', r'\bwhy\b', r'\bhow does\b',
r'\bwhat are the principles\b', r'\bwhat is the concept\b'
],
QueryType.COMPARATIVE: [
r'\bcompare\b', r'\bversus\b', r'\bvs\b', r'\bdifference between\b',
r'\bsimilar\b', r'\bbetter than\b', r'\bworst\b', r'\bbest\b'
],
QueryType.PROCEDURAL: [
r'\bhow to\b', r'\bsteps\b', r'\bprocess\b', r'\bprocedure\b',
r'\binstructions\b', r'\bguide\b', r'\btutorial\b'
],
QueryType.ANALYTICAL: [
r'\banalyze\b', r'\bevaluate\b', r'\bassess\b', r'\bimpact\b',
r'\bimplications\b', r'\bconsequences\b', r'\btrends\b'
]
}
async def analyze_query(self, query: str) -> QueryAnalysis:
"""Perform comprehensive analysis of the user query"""
# Classify query type
query_type = self._classify_query_type(query)
# Extract key concepts and entities
key_concepts = await self._extract_key_concepts(query)
entities = await self._extract_entities(query)
# Expand query terms
expanded_terms = await self._expand_query_terms(query, key_concepts)
# Determine optimal search strategy
search_strategy = self._determine_search_strategy(query_type, key_concepts)
# Calculate confidence in analysis
confidence = self._calculate_analysis_confidence(query, query_type, key_concepts)
# Generate intent description
intent_description = self._generate_intent_description(query, query_type)
return QueryAnalysis(
original_query=query,
query_type=query_type,
key_concepts=key_concepts,
entities=entities,
expanded_terms=expanded_terms,
search_strategy=search_strategy,
confidence=confidence,
intent_description=intent_description
)
def _classify_query_type(self, query: str) -> QueryType:
"""Classify the query into one of the predefined types"""
query_lower = query.lower()
type_scores = {}
for query_type, patterns in self.query_patterns.items():
score = 0
for pattern in patterns:
if re.search(pattern, query_lower):
score += 1
type_scores[query_type] = score
# Return the type with the highest score, default to CONCEPTUAL
if not type_scores or max(type_scores.values()) == 0:
return QueryType.CONCEPTUAL
return max(type_scores, key=type_scores.get)
async def _extract_key_concepts(self, query: str) -> List[str]:
"""Extract key concepts from the query"""
if not self.nlp_model:
return query.split()
try:
doc = await asyncio.to_thread(self.nlp_model, query)
concepts = []
# Extract noun phrases
for chunk in doc.noun_chunks:
if len(chunk.text.split()) > 1: # Multi-word concepts
concepts.append(chunk.text.lower())
# Extract important single words
for token in doc:
if (token.pos_ in ['NOUN', 'PROPN', 'ADJ'] and
not token.is_stop and
len(token.text) > 2):
concepts.append(token.lemma_.lower())
return list(set(concepts))
except Exception as e:
self.logger.warning(f"Concept extraction failed: {e}")
return query.split()
async def _extract_entities(self, query: str) -> List[Dict]:
"""Extract named entities from the query"""
if not self.nlp_model:
return []
try:
doc = await asyncio.to_thread(self.nlp_model, query)
entities = []
for ent in doc.ents:
entities.append({
'text': ent.text,
'label': ent.label_,
'start': ent.start_char,
'end': ent.end_char
})
return entities
except Exception as e:
self.logger.warning(f"Entity extraction failed: {e}")
return []
async def _expand_query_terms(self, query: str, key_concepts: List[str]) -> List[str]:
"""Expand query terms with synonyms and related concepts"""
if not self.concept_extractor:
return key_concepts
expanded_terms = set(key_concepts)
for concept in key_concepts:
try:
related_terms = await self.concept_extractor.get_related_terms(concept)
expanded_terms.update(related_terms[:3]) # Limit expansion
except Exception as e:
self.logger.debug(f"Term expansion failed for {concept}: {e}")
return list(expanded_terms)
def _determine_search_strategy(self, query_type: QueryType, key_concepts: List[str]) -> SearchStrategy:
"""Determine the optimal search strategy based on query analysis"""
if query_type == QueryType.FACTUAL:
return SearchStrategy.KEYWORD_FOCUSED
elif query_type == QueryType.CONCEPTUAL:
return SearchStrategy.SEMANTIC_FOCUSED
elif query_type == QueryType.COMPARATIVE:
return SearchStrategy.MULTI_STEP
elif query_type == QueryType.PROCEDURAL:
return SearchStrategy.HYBRID_BALANCED
else: # ANALYTICAL
return SearchStrategy.MULTI_STEP
def _calculate_analysis_confidence(self, query: str, query_type: QueryType,
key_concepts: List[str]) -> float:
"""Calculate confidence in the query analysis"""
confidence = 0.5 # Base confidence
# Increase confidence based on clear patterns
query_lower = query.lower()
patterns = self.query_patterns.get(query_type, [])
pattern_matches = sum(1 for pattern in patterns if re.search(pattern, query_lower))
confidence += min(0.3, pattern_matches * 0.1)
# Increase confidence based on concept extraction quality
if len(key_concepts) > 0:
confidence += min(0.2, len(key_concepts) * 0.05)
return min(1.0, confidence)
def _generate_intent_description(self, query: str, query_type: QueryType) -> str:
"""Generate a description of the user's intent"""
intent_templates = {
QueryType.FACTUAL: "User is seeking specific factual information",
QueryType.CONCEPTUAL: "User wants to understand concepts or explanations",
QueryType.COMPARATIVE: "User is comparing different options or concepts",
QueryType.PROCEDURAL: "User needs step-by-step instructions or procedures",
QueryType.ANALYTICAL: "User requires analysis or evaluation of information"
}
return intent_templates.get(query_type, "User has a general information need")
class ConceptExtractor:
"""Extracts and expands concepts using various techniques"""
def __init__(self):
self.word_vectors = None
self.concept_graph = {}
async def initialize(self):
"""Initialize concept extraction resources"""
# In a real implementation, this would load word embeddings,
# knowledge graphs, or other semantic resources
self.logger = logging.getLogger(__name__)
self.logger.info("Concept extractor initialized")
async def get_related_terms(self, term: str) -> List[str]:
"""Get related terms for concept expansion"""
# Placeholder implementation
# In practice, this would use word embeddings, WordNet, or knowledge graphs
related_terms = {
'machine learning': ['artificial intelligence', 'ML', 'algorithms'],
'python': ['programming', 'coding', 'development'],
'database': ['SQL', 'data storage', 'DBMS']
}
return related_terms.get(term.lower(), [])
class SearchPlanner:
"""Creates detailed search plans based on query analysis"""
def __init__(self, config: SearchConfig):
self.config = config
self.logger = logging.getLogger(__name__)
async def create_search_plan(self, query_analysis: QueryAnalysis) -> SearchPlan:
"""Create a comprehensive search plan"""
if query_analysis.search_strategy == SearchStrategy.MULTI_STEP:
return await self._create_multi_step_plan(query_analysis)
else:
return await self._create_single_step_plan(query_analysis)
async def _create_single_step_plan(self, query_analysis: QueryAnalysis) -> SearchPlan:
"""Create a single-step search plan"""
return SearchPlan(
primary_query=query_analysis.original_query,
sub_queries=[],
search_strategy=query_analysis.search_strategy,
max_results_per_query=self.config.max_results,
result_fusion_method='relevance_weighted',
expected_answer_type=self._determine_answer_type(query_analysis.query_type)
)
async def _create_multi_step_plan(self, query_analysis: QueryAnalysis) -> SearchPlan:
"""Create a multi-step search plan for complex queries"""
sub_queries = await self._decompose_query(query_analysis)
return SearchPlan(
primary_query=query_analysis.original_query,
sub_queries=sub_queries,
search_strategy=query_analysis.search_strategy,
max_results_per_query=max(5, self.config.max_results // max(1, len(sub_queries))),
result_fusion_method='hierarchical_synthesis',
expected_answer_type=self._determine_answer_type(query_analysis.query_type)
)
async def _decompose_query(self, query_analysis: QueryAnalysis) -> List[str]:
"""Decompose complex queries into sub-queries"""
if query_analysis.query_type == QueryType.COMPARATIVE:
return await self._create_comparative_subqueries(query_analysis)
elif query_analysis.query_type == QueryType.ANALYTICAL:
return await self._create_analytical_subqueries(query_analysis)
else:
return [query_analysis.original_query]
async def _create_comparative_subqueries(self, query_analysis: QueryAnalysis) -> List[str]:
"""Create sub-queries for comparative analysis"""
entities = [ent['text'] for ent in query_analysis.entities]
if len(entities) >= 2:
# Create individual queries for each entity
sub_queries = []
for entity in entities[:3]: # Limit to 3 entities
sub_queries.append(f"What are the characteristics of {entity}?")
sub_queries.append(f"What are the advantages and disadvantages of {entity}?")
return sub_queries
else:
return [query_analysis.original_query]
async def _create_analytical_subqueries(self, query_analysis: QueryAnalysis) -> List[str]:
"""Create sub-queries for analytical queries"""
key_concepts = query_analysis.key_concepts[:3] # Limit concepts
sub_queries = []
for concept in key_concepts:
sub_queries.append(f"What is {concept}?")
sub_queries.append(f"How does {concept} work?")
sub_queries.append(f"What are the implications of {concept}?")
return sub_queries
def _determine_answer_type(self, query_type: QueryType) -> str:
"""Determine the expected type of answer"""
answer_types = {
QueryType.FACTUAL: 'direct_answer',
QueryType.CONCEPTUAL: 'explanation',
QueryType.COMPARATIVE: 'comparison_table',
QueryType.PROCEDURAL: 'step_by_step',
QueryType.ANALYTICAL: 'analysis_report'
}
return answer_types.get(query_type, 'general_response')
class ResponseGenerator:
"""Generates comprehensive responses using LLM and retrieved information"""
def __init__(self, config: SearchConfig, llm_client: LLMClient):
self.config = config
self.llm_client = llm_client
self.response_templates = self._initialize_response_templates()
self.logger = logging.getLogger(__name__)
def _initialize_response_templates(self) -> Dict[str, str]:
"""Initialize response templates for different query types"""
return {
'direct_answer': """Based on the provided information, here is a direct answer to your question:
{answer}
Sources: {sources}""",
'explanation': """Here's a comprehensive explanation of your question:
{explanation}
Key points:
{key_points}
For more details, refer to: {sources}""",
'comparison_table': """Here's a detailed comparison based on the available information:
{comparison}
Summary:
{summary}
Sources: {sources}""",
'step_by_step': """Here are the step-by-step instructions:
{steps}
Important notes:
{notes}
References: {sources}""",
'analysis_report': """Analysis Report:
Executive Summary:
{summary}
Detailed Analysis:
{analysis}
Conclusions:
{conclusions}
Sources: {sources}"""
}
async def generate_response(self, query: str, search_results: List[SearchResult],
query_analysis: QueryAnalysis, search_plan: SearchPlan) -> SearchResponse:
"""Generate a comprehensive response based on search results"""
start_time = time.time()
# Prepare context for LLM
context = self._prepare_context(search_results, query_analysis)
# Generate the main response
if search_plan.expected_answer_type == 'comparison_table':
response_text = await self._generate_comparative_response(query, context, search_results)
elif search_plan.expected_answer_type == 'step_by_step':
response_text = await self._generate_procedural_response(query, context, search_results)
elif search_plan.expected_answer_type == 'analysis_report':
response_text = await self._generate_analytical_response(query, context, search_results)
else:
response_text = await self._generate_standard_response(query, context, search_results)
# Generate follow-up questions
follow_up_questions = await self._generate_follow_up_questions(query, query_analysis, response_text)
# Calculate confidence score
confidence_score = self._calculate_response_confidence(search_results, query_analysis)
processing_time = time.time() - start_time
return SearchResponse(
query=query,
answer=response_text,
sources=search_results,
processing_time=processing_time,
model_used=self.config.llm_model,
confidence_score=confidence_score,
follow_up_questions=follow_up_questions
)
def _prepare_context(self, search_results: List[SearchResult], query_analysis: QueryAnalysis) -> str:
"""Prepare context information for the LLM"""
if not search_results:
return "No relevant information found in the knowledge base."
context_parts = []
for i, result in enumerate(search_results[:5], 1):
context_parts.append(f"Source {i} (Relevance: {result.relevance_score:.2f}):")
context_parts.append(f"Content: {result.content}")
context_parts.append(f"Source: {result.source}")
context_parts.append("")
return "\n".join(context_parts)
async def _generate_standard_response(self, query: str, context: str,
search_results: List[SearchResult]) -> str:
"""Generate a standard response using the LLM"""
prompt = f"""You are an intelligent search assistant. Based on the provided context information, please answer the user's question comprehensively and accurately.
Context Information:
{context}
User Question: {query}
Please provide a detailed answer that:
1. Directly addresses the user's question
2. Uses information from the provided sources
3. Cites specific sources when making claims
4. Indicates if the available information is insufficient
5. Maintains a helpful and informative tone
Answer:"""
try:
response = await self.llm_client.generate_response(prompt)
return response
except Exception as e:
self.logger.error(f"LLM response generation failed: {e}")
return self._generate_fallback_response(query, search_results)
async def _generate_comparative_response(self, query: str, context: str,
search_results: List[SearchResult]) -> str:
"""Generate a comparative analysis response"""
prompt = f"""You are tasked with creating a comprehensive comparison based on the provided information.
Context Information:
{context}
User Question: {query}
Please provide a structured comparison that includes:
1. A clear comparison table or structured format
2. Key similarities and differences
3. Pros and cons for each item being compared
4. A summary recommendation if appropriate
5. Citations to specific sources
Comparison:"""
try:
response = await self.llm_client.generate_response(prompt)
return response
except Exception as e:
self.logger.error(f"Comparative response generation failed: {e}")
return self._generate_fallback_response(query, search_results)
async def _generate_procedural_response(self, query: str, context: str,
search_results: List[SearchResult]) -> str:
"""Generate a step-by-step procedural response"""
prompt = f"""You are providing step-by-step instructions based on the available information.
Context Information:
{context}
User Question: {query}
Please provide clear, actionable instructions that include:
1. Numbered steps in logical order
2. Prerequisites or requirements
3. Important warnings or notes
4. Expected outcomes for each step
5. References to source materials
Instructions:"""
try:
response = await self.llm_client.generate_response(prompt)
return response
except Exception as e:
self.logger.error(f"Procedural response generation failed: {e}")
return self._generate_fallback_response(query, search_results)
async def _generate_analytical_response(self, query: str, context: str,
search_results: List[SearchResult]) -> str:
"""Generate an analytical response with detailed analysis"""
prompt = f"""You are conducting a thorough analysis based on the provided information.
Context Information:
{context}
User Question: {query}
Please provide a comprehensive analysis that includes:
1. Executive summary of key findings
2. Detailed analysis of the main points
3. Supporting evidence from sources
4. Implications and consequences
5. Conclusions and recommendations
6. Areas where more information might be needed
Analysis:"""
try:
response = await self.llm_client.generate_response(prompt)
return response
except Exception as e:
self.logger.error(f"Analytical response generation failed: {e}")
return self._generate_fallback_response(query, search_results)
def _generate_fallback_response(self, query: str, search_results: List[SearchResult]) -> str:
"""Generate a fallback response when LLM fails"""
if not search_results:
return "I apologize, but I couldn't find relevant information to answer your question. Please try rephrasing your query or providing more specific details."
response_parts = [
f"Based on the available information, here's what I found regarding your question: {query}",
"",
"Key information from the sources:"
]
for i, result in enumerate(search_results[:3], 1):
response_parts.append(f"{i}. {result.content[:200]}... (Source: {result.source})")
response_parts.append("")
response_parts.append("For more detailed information, please refer to the sources listed above.")
return "\n".join(response_parts)
async def _generate_follow_up_questions(self, query: str, query_analysis: QueryAnalysis,
response: str) -> List[str]:
"""Generate relevant follow-up questions"""
prompt = f"""Based on the user's original question and the response provided, suggest 3 relevant follow-up questions that the user might want to ask.
Original Question: {query}
Response: {response[:500]}...
Please provide 3 concise, relevant follow-up questions that would help the user explore the topic further:"""
try:
follow_up_response = await self.llm_client.generate_response(prompt)
# Extract questions from the response
questions = []
for line in follow_up_response.split('\n'):
line = line.strip()
if line and ('?' in line or line.startswith(('1.', '2.', '3.', '-', '*'))):
# Clean up the question
question = re.sub(r'^[\d\.\-\*\s]+', '', line).strip()
if question and question.endswith('?'):
questions.append(question)
return questions[:3]
except Exception as e:
self.logger.warning(f"Follow-up question generation failed: {e}")
return []
def _calculate_response_confidence(self, search_results: List[SearchResult],
query_analysis: QueryAnalysis) -> float:
"""Calculate confidence score for the response"""
if not search_results:
return 0.1
# Base confidence from query analysis
confidence = query_analysis.confidence * 0.3
# Add confidence based on search result quality
avg_relevance = sum(result.relevance_score for result in search_results) / len(search_results)
confidence += avg_relevance * 0.4
# Add confidence based on number of sources
source_bonus = min(0.3, len(search_results) * 0.1)
confidence += source_bonus
return min(1.0, confidence)
This query processing and response generation system interprets user intent and generates targeted responses. The multi-stage analysis pipeline classifies each query, extracts its key concepts, and selects an appropriate search strategy before any retrieval takes place.
The response generation system adapts its output format and style based on the type of query, providing structured comparisons for comparative queries, step-by-step instructions for procedural queries, and detailed analysis for analytical queries. The follow-up question generation helps users explore topics more deeply and discover related information.
The confidence scoring system provides transparency about the reliability of responses, helping users understand when additional information might be needed or when the system has high confidence in its answers.
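The following sketch wires the analysis and planning stages together. It assumes the SearchConfig from earlier in the document and that the spaCy model is installed; response generation is omitted because it additionally needs a configured LLMClient, and the example query is illustrative.

import asyncio


async def demo_query_pipeline():
    config = SearchConfig()  # defaults defined earlier in the document
    processor = QueryProcessor(config)
    await processor.initialize()

    planner = SearchPlanner(config)

    # A "compare" question matches the COMPARATIVE patterns and is routed to the multi-step planner.
    analysis = await processor.analyze_query("Compare PostgreSQL and MongoDB for analytics workloads")
    print(analysis.query_type, analysis.search_strategy, f"confidence={analysis.confidence:.2f}")

    plan = await planner.create_search_plan(analysis)
    print(f"primary: {plan.primary_query}")
    for sub_query in plan.sub_queries:
        print(f"  sub-query: {sub_query}")

# asyncio.run(demo_query_pipeline())  # requires en_core_web_sm to be installed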
Application Integration Framework
The application integration framework provides the necessary abstractions and interfaces for seamlessly embedding the search engine into existing applications. This framework must handle diverse integration scenarios, from simple API-based integration to deep embedding within complex application architectures.
The framework supports multiple integration patterns including REST APIs, SDK integration, plugin architectures, and direct library embedding. It provides configuration management, authentication, authorization, and monitoring capabilities that respect the host application's existing infrastructure and security requirements.
The integration framework also handles data synchronization, ensuring that the search engine's document index remains current with the host application's data while respecting access controls and data governance policies.
Here is the comprehensive application integration framework implementation:
from abc import ABC, abstractmethod
from typing import Callable, Awaitable
from datetime import datetime, timedelta
from contextlib import asynccontextmanager
import asyncio
import weakref
import aiohttp
import jwt
from fastapi import FastAPI, HTTPException, Depends, Request
from pydantic import BaseModel
class IntegrationMode(Enum):
REST_API = "rest_api"
SDK_EMBEDDED = "sdk_embedded"
PLUGIN = "plugin"
MICROSERVICE = "microservice"
class AuthenticationMethod(Enum):
API_KEY = "api_key"
JWT_TOKEN = "jwt_token"
OAUTH2 = "oauth2"
CUSTOM = "custom"
@dataclass
class IntegrationConfig:
mode: IntegrationMode
auth_method: AuthenticationMethod
api_prefix: str = "/search"
max_requests_per_minute: int = 100
enable_cors: bool = True
allowed_origins: List[str] = field(default_factory=lambda: ["*"])
webhook_endpoints: List[str] = field(default_factory=list)
custom_headers: Dict[str, str] = field(default_factory=dict)
encryption_key: Optional[str] = None
session_timeout: int = 3600
class ApplicationContext:
"""Represents the context of the host application"""
def __init__(self, app_id: str, user_id: Optional[str] = None,
permissions: List[str] = None, metadata: Dict = None):
self.app_id = app_id
self.user_id = user_id
self.permissions = permissions or []
self.metadata = metadata or {}
self.created_at = datetime.utcnow()
self.last_activity = datetime.utcnow()
def has_permission(self, permission: str) -> bool:
"""Check if the context has a specific permission"""
return permission in self.permissions or 'admin' in self.permissions
def update_activity(self):
"""Update the last activity timestamp"""
self.last_activity = datetime.utcnow()
def is_expired(self, timeout_seconds: int) -> bool:
"""Check if the context has expired"""
return (datetime.utcnow() - self.last_activity).total_seconds() > timeout_seconds
class SearchEngineSDK:
"""SDK for embedding the search engine in applications"""
def __init__(self, config: SearchConfig, integration_config: IntegrationConfig):
self.config = config
self.integration_config = integration_config
self.search_engine = None
self.auth_manager = None
self.rate_limiter = None
self.event_handlers = {}
self.active_contexts = weakref.WeakValueDictionary()
self.logger = logging.getLogger(__name__)
async def initialize(self):
"""Initialize the SDK and underlying search engine"""
# Initialize the search engine based on configuration
if self.config.deployment_mode == DeploymentMode.EMBEDDED:
self.search_engine = EmbeddedSearchEngine(self.config, self)
else:
self.search_engine = StandaloneSearchEngine(self.config)
await self.search_engine.initialize()
# Initialize authentication manager
self.auth_manager = AuthenticationManager(self.integration_config)
await self.auth_manager.initialize()
# Initialize rate limiter
self.rate_limiter = RateLimiter(self.integration_config.max_requests_per_minute)
self.logger.info("Search Engine SDK initialized")
async def create_application_context(self, app_id: str, user_id: Optional[str] = None,
permissions: List[str] = None) -> ApplicationContext:
"""Create a new application context for search operations"""
context = ApplicationContext(app_id, user_id, permissions)
self.active_contexts[context.app_id] = context
await self._trigger_event('context_created', context)
return context
async def search(self, query: str, context: ApplicationContext,
filters: Optional[Dict] = None) -> SearchResponse:
"""Perform a search operation within the application context"""
# Validate context and permissions
if not await self._validate_context(context):
raise PermissionError("Invalid or expired application context")
# Check rate limits
if not await self.rate_limiter.check_rate_limit(context.app_id):
raise RateLimitExceededError("Rate limit exceeded")
# Apply context-specific filters
context_filters = await self._apply_context_filters(context, filters)
# Perform the search
try:
if hasattr(self.search_engine, 'search_with_filters'):
response = await self.search_engine.search_with_filters(query, context_filters)
else:
response = await self.search_engine.search(query, context_filters)
# Update context activity
context.update_activity()
# Trigger search event
await self._trigger_event('search_performed', {
'context': context,
'query': query,
'response': response
})
return response
except Exception as e:
await self._trigger_event('search_error', {
'context': context,
'query': query,
'error': str(e)
})
raise
async def add_documents(self, documents: List[Dict], context: ApplicationContext):
"""Add documents to the search index within the application context"""
if not context.has_permission('write'):
raise PermissionError("Write permission required")
for doc in documents:
# Apply context-specific metadata
doc['metadata'] = doc.get('metadata', {})
doc['metadata']['app_id'] = context.app_id
doc['metadata']['added_by'] = context.user_id
doc['metadata']['added_at'] = datetime.utcnow().isoformat()
await self.search_engine.document_store.add_document(
doc_id=doc['id'],
content=doc['content'],
source=doc.get('source', 'application'),
metadata=doc['metadata']
)
await self._trigger_event('documents_added', {
'context': context,
'document_count': len(documents)
})
async def _validate_context(self, context: ApplicationContext) -> bool:
"""Validate the application context"""
if context.is_expired(self.integration_config.session_timeout):
return False
# Additional validation logic can be added here
return True
async def _apply_context_filters(self, context: ApplicationContext,
filters: Optional[Dict]) -> Dict:
"""Apply context-specific filters to search operations"""
context_filters = filters or {}
# Add application-specific filters
context_filters['app_id'] = context.app_id
# Add user-specific filters if applicable
if context.user_id:
context_filters['accessible_by'] = context.user_id
# Add permission-based filters
if not context.has_permission('read_all'):
context_filters['public_only'] = True
return context_filters
def register_event_handler(self, event_type: str, handler: Callable):
"""Register an event handler for integration events"""
if event_type not in self.event_handlers:
self.event_handlers[event_type] = []
self.event_handlers[event_type].append(handler)
async def _trigger_event(self, event_type: str, data: Any):
"""Trigger registered event handlers"""
if event_type in self.event_handlers:
for handler in self.event_handlers[event_type]:
try:
if asyncio.iscoroutinefunction(handler):
await handler(data)
else:
handler(data)
except Exception as e:
self.logger.error(f"Event handler error for {event_type}: {e}")
class AuthenticationManager:
"""Manages authentication for the search engine integration"""
def __init__(self, integration_config: IntegrationConfig):
self.config = integration_config
self.api_keys = {}
self.jwt_secret = None
self.oauth_config = {}
self.custom_auth_handler = None
async def initialize(self):
"""Initialize the authentication manager"""
if self.config.auth_method == AuthenticationMethod.JWT_TOKEN:
self.jwt_secret = self.config.encryption_key or "default_secret"  # fallback for demos only; always supply encryption_key in production
elif self.config.auth_method == AuthenticationMethod.API_KEY:
# Load API keys from configuration or database
await self._load_api_keys()
self.logger = logging.getLogger(__name__)
self.logger.info(f"Authentication manager initialized with {self.config.auth_method.value}")
async def _load_api_keys(self):
"""Load API keys from storage"""
# In a real implementation, this would load from a secure store
self.api_keys = {
"demo_key": {
"app_id": "demo_app",
"permissions": ["read", "write"],
"expires": None
}
}
async def authenticate(self, credentials: Dict) -> Optional[ApplicationContext]:
"""Authenticate a request and return application context"""
if self.config.auth_method == AuthenticationMethod.API_KEY:
return await self._authenticate_api_key(credentials)
elif self.config.auth_method == AuthenticationMethod.JWT_TOKEN:
return await self._authenticate_jwt(credentials)
elif self.config.auth_method == AuthenticationMethod.CUSTOM:
return await self._authenticate_custom(credentials)
else:
raise NotImplementedError(f"Authentication method {self.config.auth_method} not implemented")
async def _authenticate_api_key(self, credentials: Dict) -> Optional[ApplicationContext]:
"""Authenticate using API key"""
api_key = credentials.get('api_key')
if not api_key or api_key not in self.api_keys:
return None
key_info = self.api_keys[api_key]
# Check expiration
if key_info.get('expires') and datetime.utcnow() > key_info['expires']:
return None
return ApplicationContext(
app_id=key_info['app_id'],
permissions=key_info['permissions']
)
async def _authenticate_jwt(self, credentials: Dict) -> Optional[ApplicationContext]:
"""Authenticate using JWT token"""
token = credentials.get('jwt_token')
if not token:
return None
try:
payload = jwt.decode(token, self.jwt_secret, algorithms=['HS256'])
return ApplicationContext(
app_id=payload.get('app_id'),
user_id=payload.get('user_id'),
permissions=payload.get('permissions', [])
)
except jwt.InvalidTokenError:
return None
async def _authenticate_custom(self, credentials: Dict) -> Optional[ApplicationContext]:
"""Authenticate using custom handler"""
if self.custom_auth_handler:
return await self.custom_auth_handler(credentials)
return None
def set_custom_auth_handler(self, handler: Callable):
"""Set a custom authentication handler"""
self.custom_auth_handler = handler
class RateLimiter:
"""Implements rate limiting for API requests"""
def __init__(self, max_requests_per_minute: int):
self.max_requests = max_requests_per_minute
self.request_counts = {}
self.window_start = {}
async def check_rate_limit(self, identifier: str) -> bool:
"""Check if the request is within rate limits"""
current_time = datetime.utcnow()
# Reset window if needed
if identifier not in self.window_start or \
(current_time - self.window_start[identifier]).total_seconds() >= 60:
self.window_start[identifier] = current_time
self.request_counts[identifier] = 0
# Check current count
if self.request_counts[identifier] >= self.max_requests:
return False
# Increment count
self.request_counts[identifier] += 1
return True
class WebhookManager:
"""Manages webhook notifications for integration events"""
def __init__(self, webhook_endpoints: List[str]):
self.endpoints = webhook_endpoints
self.session = None
self.logger = logging.getLogger(__name__)
async def initialize(self):
"""Initialize the webhook manager"""
self.session = aiohttp.ClientSession()
async def send_webhook(self, event_type: str, data: Dict):
"""Send webhook notification to registered endpoints"""
if not self.endpoints:
return
payload = {
'event_type': event_type,
'timestamp': datetime.utcnow().isoformat(),
'data': data
}
for endpoint in self.endpoints:
try:
async with self.session.post(endpoint, json=payload) as response:
if response.status != 200:
self.logger.warning(f"Webhook failed for {endpoint}: {response.status}")
except Exception as e:
self.logger.error(f"Webhook error for {endpoint}: {e}")
async def cleanup(self):
"""Cleanup webhook manager resources"""
if self.session:
await self.session.close()
class PluginInterface:
"""Interface for plugin-based integration"""
def __init__(self, search_engine_sdk: SearchEngineSDK):
self.sdk = search_engine_sdk
self.plugin_registry = {}
def register_plugin(self, plugin_name: str, plugin_instance):
"""Register a plugin with the search engine"""
self.plugin_registry[plugin_name] = plugin_instance
# Set up plugin event handlers
if hasattr(plugin_instance, 'on_search'):
self.sdk.register_event_handler('search_performed', plugin_instance.on_search)
if hasattr(plugin_instance, 'on_document_added'):
self.sdk.register_event_handler('documents_added', plugin_instance.on_document_added)
def get_plugin(self, plugin_name: str):
"""Get a registered plugin"""
return self.plugin_registry.get(plugin_name)
async def execute_plugin_hook(self, hook_name: str, data: Any):
"""Execute a specific hook across all plugins"""
for plugin in self.plugin_registry.values():
if hasattr(plugin, hook_name):
try:
hook_method = getattr(plugin, hook_name)
if asyncio.iscoroutinefunction(hook_method):
await hook_method(data)
else:
hook_method(data)
except Exception as e:
self.logger.error(f"Plugin hook error: {e}")
class RESTAPIServer:
"""REST API server for the search engine"""
def __init__(self, sdk: SearchEngineSDK, integration_config: IntegrationConfig):
self.sdk = sdk
self.config = integration_config
self.app = FastAPI(title="Search Engine API")
self.webhook_manager = None
self._setup_routes()
self._setup_middleware()
def _setup_routes(self):
"""Setup API routes"""
@self.app.post(f"{self.config.api_prefix}/search")
async def search_endpoint(request: APISearchRequest,
context: ApplicationContext = Depends(self._get_context)):
try:
response = await self.sdk.search(request.query, context, request.filters)
return response
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.post(f"{self.config.api_prefix}/documents")
async def add_documents_endpoint(request: APIAddDocumentsRequest,
context: ApplicationContext = Depends(self._get_context)):
try:
await self.sdk.add_documents(request.documents, context)
return {"status": "success", "count": len(request.documents)}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.get(f"{self.config.api_prefix}/health")
async def health_check():
return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}
def _setup_middleware(self):
"""Setup middleware for CORS, authentication, etc."""
if self.config.enable_cors:
from fastapi.middleware.cors import CORSMiddleware
self.app.add_middleware(
CORSMiddleware,
allow_origins=self.config.allowed_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
async def _get_context(self, request: Request) -> ApplicationContext:
"""Extract and validate application context from request"""
# Extract credentials from headers
credentials = {}
if self.config.auth_method == AuthenticationMethod.API_KEY:
credentials['api_key'] = request.headers.get('X-API-Key')
elif self.config.auth_method == AuthenticationMethod.JWT_TOKEN:
auth_header = request.headers.get('Authorization', '')
if auth_header.startswith('Bearer '):
credentials['jwt_token'] = auth_header[7:]
# Authenticate and get context
context = await self.sdk.auth_manager.authenticate(credentials)
if not context:
raise HTTPException(status_code=401, detail="Authentication failed")
return context
async def start_server(self, host: str = "0.0.0.0", port: int = 8000):
"""Start the REST API server"""
import uvicorn
config = uvicorn.Config(self.app, host=host, port=port)
server = uvicorn.Server(config)
await server.serve()
# API request models
class APISearchRequest(BaseModel):
query: str
filters: Optional[Dict] = None
max_results: Optional[int] = None
class APIAddDocumentsRequest(BaseModel):
documents: List[Dict]
# Custom exceptions
class RateLimitExceededError(Exception):
pass
class PermissionError(Exception):
# Note: intentionally shadows Python's builtin PermissionError so the SDK's raise sites resolve to this module-level class
pass
This application integration framework supports embedding the search engine into diverse application environments. The SDK approach allows deep integration while maintaining a clean separation of concerns and respecting host application architectures.
The authentication and authorization system supports multiple methods and can be easily extended with custom handlers. The rate limiting and webhook systems provide the operational capabilities needed for production deployments.
The plugin interface enables extensibility and customization, allowing host applications to modify search behavior and integrate with their existing workflows and business logic.
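As a rough sketch of embedded usage, the snippet below creates the SDK with API-key authentication, registers an audit handler, and issues a context-scoped search. It assumes the SearchConfig, DeploymentMode, and EmbeddedSearchEngine pieces defined elsewhere in the document are available and initialize successfully; the app_id, permissions, and handler are illustrative.

import asyncio


async def demo_embedded_integration():
    search_config = SearchConfig(deployment_mode=DeploymentMode.EMBEDDED)
    integration_config = IntegrationConfig(
        mode=IntegrationMode.SDK_EMBEDDED,
        auth_method=AuthenticationMethod.API_KEY,
        max_requests_per_minute=60,
    )

    sdk = SearchEngineSDK(search_config, integration_config)
    await sdk.initialize()

    # Illustrative audit hook: log every search performed through the SDK.
    def audit_search(event):
        print(f"[audit] app={event['context'].app_id} query={event['query']!r}")

    sdk.register_event_handler("search_performed", audit_search)

    context = await sdk.create_application_context(
        app_id="crm_app", user_id="user-42", permissions=["read"]
    )
    response = await sdk.search("open support tickets about billing", context)
    print(response.answer[:200])

# asyncio.run(demo_embedded_integration())  # requires the full engine stack to be configured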
Performance Optimization and Caching
Performance optimization is crucial for providing responsive search experiences, especially when dealing with large document collections and complex queries. The optimization strategy encompasses multiple layers including query optimization, result caching, index optimization, and resource management.
The caching system implements multiple tiers of caching including query result caching, document chunk caching, and embedding caching. The cache invalidation strategy ensures that results remain current while maximizing cache hit rates for improved performance.
Index optimization includes techniques such as index compression, smart partitioning, and incremental updates to minimize search latency and memory usage. The system also implements connection pooling and resource management to handle concurrent requests efficiently.
Here is the comprehensive performance optimization and caching implementation:
import asyncio
import pickle
import zlib
from collections import OrderedDict
from typing import Optional, Tuple, Union
import redis.asyncio as redis
import psutil
import time
from concurrent.futures import ThreadPoolExecutor
import threading
from dataclasses import asdict
class CacheLevel(Enum):
MEMORY = "memory"
REDIS = "redis"
DISK = "disk"
class CacheStrategy(Enum):
LRU = "lru"
LFU = "lfu"
TTL = "ttl"
ADAPTIVE = "adaptive"
@dataclass
class CacheConfig:
levels: List[CacheLevel] = field(default_factory=lambda: [CacheLevel.MEMORY, CacheLevel.REDIS])
strategy: CacheStrategy = CacheStrategy.ADAPTIVE
memory_limit_mb: int = 512
redis_url: str = "redis://localhost:6379"
disk_cache_path: str = "./cache"
default_ttl: int = 3600
compression_enabled: bool = True
compression_threshold: int = 1024
class PerformanceConfig:
def __init__(self):
self.max_concurrent_searches: int = 50
self.query_timeout: float = 30.0
self.embedding_batch_size: int = 32
self.index_update_batch_size: int = 100
self.connection_pool_size: int = 20
self.enable_query_optimization: bool = True
self.enable_result_prefetching: bool = True
self.memory_threshold: float = 0.8
self.cpu_threshold: float = 0.9
class MultiLevelCache:
"""Multi-level caching system with memory, Redis, and disk tiers"""
def __init__(self, config: CacheConfig):
self.config = config
self.memory_cache = None
self.redis_client = None
self.disk_cache_path = Path(config.disk_cache_path)
self.cache_stats = {
'hits': 0,
'misses': 0,
'memory_hits': 0,
'redis_hits': 0,
'disk_hits': 0
}
self.logger = logging.getLogger(__name__)
async def initialize(self):
"""Initialize all cache levels"""
# Initialize memory cache
if CacheLevel.MEMORY in self.config.levels:
self.memory_cache = MemoryCache(
max_size_mb=self.config.memory_limit_mb,
strategy=self.config.strategy
)
await self.memory_cache.initialize()
# Initialize Redis cache
if CacheLevel.REDIS in self.config.levels:
try:
self.redis_client = redis.from_url(self.config.redis_url)
await self.redis_client.ping()
self.logger.info("Redis cache initialized")
except Exception as e:
self.logger.warning(f"Redis initialization failed: {e}")
self.redis_client = None
# Initialize disk cache
if CacheLevel.DISK in self.config.levels:
self.disk_cache_path.mkdir(parents=True, exist_ok=True)
self.logger.info(f"Disk cache initialized at {self.disk_cache_path}")
async def get(self, key: str) -> Optional[Any]:
"""Get value from cache, checking all levels in order"""
# Check memory cache first
if self.memory_cache:
value = await self.memory_cache.get(key)
if value is not None:
self.cache_stats['hits'] += 1
self.cache_stats['memory_hits'] += 1
return value
# Check Redis cache
if self.redis_client:
try:
cached_data = await self.redis_client.get(key)
if cached_data:
value = self._deserialize(cached_data)
# Promote to memory cache
if self.memory_cache:
await self.memory_cache.set(key, value, ttl=self.config.default_ttl)
self.cache_stats['hits'] += 1
self.cache_stats['redis_hits'] += 1
return value
except Exception as e:
self.logger.warning(f"Redis get error: {e}")
# Check disk cache
if CacheLevel.DISK in self.config.levels:
value = await self._get_from_disk(key)
if value is not None:
# Promote to higher cache levels
if self.redis_client:
await self._set_redis(key, value, self.config.default_ttl)
if self.memory_cache:
await self.memory_cache.set(key, value, ttl=self.config.default_ttl)
self.cache_stats['hits'] += 1
self.cache_stats['disk_hits'] += 1
return value
self.cache_stats['misses'] += 1
return None
async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
"""Set value in all configured cache levels"""
ttl = ttl or self.config.default_ttl
# Set in memory cache
if self.memory_cache:
await self.memory_cache.set(key, value, ttl)
# Set in Redis cache
if self.redis_client:
await self._set_redis(key, value, ttl)
# Set in disk cache
if CacheLevel.DISK in self.config.levels:
await self._set_disk(key, value, ttl)
async def _set_redis(self, key: str, value: Any, ttl: int):
"""Set value in Redis with error handling"""
try:
serialized_data = self._serialize(value)
await self.redis_client.setex(key, ttl, serialized_data)
except Exception as e:
self.logger.warning(f"Redis set error: {e}")
async def _get_from_disk(self, key: str) -> Optional[Any]:
"""Get value from disk cache"""
try:
cache_file = self.disk_cache_path / f"{key}.cache"
if cache_file.exists():
# Check if file is expired
file_age = time.time() - cache_file.stat().st_mtime
if file_age > self.config.default_ttl:
cache_file.unlink()
return None
with open(cache_file, 'rb') as f:
data = f.read()
return self._deserialize(data)
except Exception as e:
self.logger.warning(f"Disk cache read error: {e}")
return None
async def _set_disk(self, key: str, value: Any, ttl: int):
"""Set value in disk cache"""
try:
cache_file = self.disk_cache_path / f"{key}.cache"
serialized_data = self._serialize(value)
with open(cache_file, 'wb') as f:
f.write(serialized_data)
except Exception as e:
self.logger.warning(f"Disk cache write error: {e}")
def _serialize(self, value: Any) -> bytes:
"""Serialize value with optional compression"""
data = pickle.dumps(value)
if self.config.compression_enabled and len(data) > self.config.compression_threshold:
data = zlib.compress(data)
return data
def _deserialize(self, data: bytes) -> Any:
"""Deserialize value with optional decompression"""
try:
# Try decompression first
if self.config.compression_enabled:
try:
data = zlib.decompress(data)
except zlib.error:
# Data wasn't compressed
pass
return pickle.loads(data)
except Exception as e:
self.logger.error(f"Deserialization error: {e}")
return None
async def invalidate(self, pattern: str):
"""Invalidate cache entries matching pattern"""
# Invalidate memory cache
if self.memory_cache:
await self.memory_cache.invalidate_pattern(pattern)
# Invalidate Redis cache
if self.redis_client:
try:
keys = await self.redis_client.keys(pattern)
if keys:
await self.redis_client.delete(*keys)
except Exception as e:
self.logger.warning(f"Redis invalidation error: {e}")
# Invalidate disk cache
if CacheLevel.DISK in self.config.levels:
await self._invalidate_disk_pattern(pattern)
async def _invalidate_disk_pattern(self, pattern: str):
"""Invalidate disk cache entries matching pattern"""
try:
# Simple pattern matching for disk files
pattern = pattern.replace('*', '')
for cache_file in self.disk_cache_path.glob(f"{pattern}*.cache"):
cache_file.unlink()
except Exception as e:
self.logger.warning(f"Disk cache invalidation error: {e}")
def get_stats(self) -> Dict[str, Any]:
"""Get cache performance statistics"""
total_requests = self.cache_stats['hits'] + self.cache_stats['misses']
hit_rate = self.cache_stats['hits'] / total_requests if total_requests > 0 else 0
return {
'hit_rate': hit_rate,
'total_requests': total_requests,
**self.cache_stats
}
class MemoryCache:
"""In-memory cache with configurable eviction strategies"""
def __init__(self, max_size_mb: int, strategy: CacheStrategy):
self.max_size_bytes = max_size_mb * 1024 * 1024
self.strategy = strategy
self.cache = OrderedDict()
self.access_counts = {}
self.access_times = {}
self.current_size = 0
self.lock = asyncio.Lock()
async def initialize(self):
"""Initialize the memory cache"""
pass
async def get(self, key: str) -> Optional[Any]:
"""Get value from memory cache"""
async with self.lock:
if key in self.cache:
value = self.cache[key]
self._update_access_stats(key)
return value
return None
async def set(self, key: str, value: Any, ttl: Optional[int] = None):
"""Set value in memory cache"""
async with self.lock:
# Calculate size of new entry
entry_size = len(pickle.dumps(value))
# Remove existing entry if present
if key in self.cache:
old_size = len(pickle.dumps(self.cache[key]))
self.current_size -= old_size
del self.cache[key]
# Ensure we have space
while self.current_size + entry_size > self.max_size_bytes and self.cache:
await self._evict_entry()
# Add new entry
self.cache[key] = value
self.current_size += entry_size
self._update_access_stats(key)
def _update_access_stats(self, key: str):
"""Update access statistics for cache entry"""
current_time = time.time()
self.access_times[key] = current_time
self.access_counts[key] = self.access_counts.get(key, 0) + 1
# Move to end for LRU
if self.strategy == CacheStrategy.LRU:
self.cache.move_to_end(key)
async def _evict_entry(self):
"""Evict an entry based on the configured strategy"""
if not self.cache:
return
if self.strategy == CacheStrategy.LRU:
# Remove least recently used (first item)
key = next(iter(self.cache))
elif self.strategy == CacheStrategy.LFU:
# Remove least frequently used
key = min(self.access_counts, key=self.access_counts.get)
else: # TTL or ADAPTIVE
# Remove oldest entry
key = min(self.access_times, key=self.access_times.get)
entry_size = len(pickle.dumps(self.cache[key]))
self.current_size -= entry_size
del self.cache[key]
self.access_counts.pop(key, None)
self.access_times.pop(key, None)
async def invalidate_pattern(self, pattern: str):
"""Invalidate entries matching pattern"""
async with self.lock:
pattern = pattern.replace('*', '')
keys_to_remove = [key for key in self.cache.keys() if pattern in key]
for key in keys_to_remove:
entry_size = len(pickle.dumps(self.cache[key]))
self.current_size -= entry_size
del self.cache[key]
self.access_counts.pop(key, None)
self.access_times.pop(key, None)
class QueryOptimizer:
"""Optimizes queries for better performance"""
def __init__(self, config: PerformanceConfig):
self.config = config
self.query_cache = {}
self.optimization_stats = {
'queries_optimized': 0,
'time_saved': 0.0
}
async def optimize_query(self, query: str, query_analysis: QueryAnalysis) -> Tuple[str, List[str]]:
"""Optimize query and generate sub-queries if beneficial"""
start_time = time.time()
# Check cache for previous optimizations
cache_key = hashlib.md5(query.encode()).hexdigest()
if cache_key in self.query_cache:
cached_result = self.query_cache[cache_key]
self.optimization_stats['time_saved'] += time.time() - start_time
return cached_result
optimized_query = query
sub_queries = []
# Apply query optimization techniques
if self.config.enable_query_optimization:
optimized_query = await self._apply_query_rewriting(query, query_analysis)
# Generate sub-queries for complex queries
if query_analysis.query_type in [QueryType.COMPARATIVE, QueryType.ANALYTICAL]:
sub_queries = await self._generate_optimized_subqueries(query_analysis)
result = (optimized_query, sub_queries)
self.query_cache[cache_key] = result
self.optimization_stats['queries_optimized'] += 1
optimization_time = time.time() - start_time
return result
async def _apply_query_rewriting(self, query: str, query_analysis: QueryAnalysis) -> str:
"""Apply query rewriting techniques"""
# Expand abbreviations and synonyms
expanded_query = query
# Add important terms from analysis
important_terms = query_analysis.key_concepts[:3] # Limit to avoid query bloat
for term in important_terms:
if term.lower() not in query.lower():
expanded_query += f" {term}"
return expanded_query
async def _generate_optimized_subqueries(self, query_analysis: QueryAnalysis) -> List[str]:
"""Generate optimized sub-queries"""
sub_queries = []
# Create focused sub-queries for each key concept
for concept in query_analysis.key_concepts[:2]: # Limit to 2 concepts
sub_queries.append(f"What is {concept}?")
sub_queries.append(f"How does {concept} work?")
return sub_queries
class ResourceManager:
"""Manages system resources and performance monitoring"""
def __init__(self, config: PerformanceConfig):
self.config = config
self.connection_pool = None
self.thread_pool = None
self.active_searches = 0
self.search_semaphore = None
self.monitoring_task = None
self.resource_stats = {
'cpu_usage': 0.0,
'memory_usage': 0.0,
'active_connections': 0,
'queue_size': 0
}
self.logger = logging.getLogger(__name__)
async def initialize(self):
"""Initialize resource management"""
# Create semaphore for concurrent search limiting
self.search_semaphore = asyncio.Semaphore(self.config.max_concurrent_searches)
# Create thread pool for CPU-intensive tasks
self.thread_pool = ThreadPoolExecutor(
max_workers=min(32, (psutil.cpu_count() or 1) + 4)
)
# Start resource monitoring
self.monitoring_task = asyncio.create_task(self._monitor_resources())
self.logger.info("Resource manager initialized")
@asynccontextmanager
async def search_context(self):
"""Context manager for search operations with resource limiting"""
async with self.search_semaphore:
self.active_searches += 1
try:
yield
finally:
self.active_searches -= 1
async def execute_cpu_task(self, func, *args, **kwargs):
"""Execute CPU-intensive task in thread pool"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(self.thread_pool, func, *args, **kwargs)
async def _monitor_resources(self):
"""Monitor system resources continuously"""
while True:
try:
# Update resource statistics (interval=None keeps cpu_percent non-blocking for the event loop)
self.resource_stats['cpu_usage'] = psutil.cpu_percent(interval=None)
self.resource_stats['memory_usage'] = psutil.virtual_memory().percent / 100
self.resource_stats['active_connections'] = self.active_searches
# Check for resource pressure
if self.resource_stats['memory_usage'] > self.config.memory_threshold:
self.logger.warning(f"High memory usage: {self.resource_stats['memory_usage']:.1%}")
await self._handle_memory_pressure()
if self.resource_stats['cpu_usage'] > self.config.cpu_threshold * 100:
self.logger.warning(f"High CPU usage: {self.resource_stats['cpu_usage']:.1f}%")
await self._handle_cpu_pressure()
await asyncio.sleep(5) # Monitor every 5 seconds
except Exception as e:
self.logger.error(f"Resource monitoring error: {e}")
await asyncio.sleep(10)
async def _handle_memory_pressure(self):
"""Handle high memory usage"""
# Trigger garbage collection
import gc
gc.collect()
# Could also trigger cache cleanup, reduce batch sizes, etc.
self.logger.info("Triggered memory pressure handling")
async def _handle_cpu_pressure(self):
"""Handle high CPU usage"""
# Placeholder: a production implementation could shrink the search semaphore
# by acquiring permits here; for now the pressure event is only logged
if self.search_semaphore._value > 1:
self.logger.info("Reducing concurrent search limit due to CPU pressure")
def get_stats(self) -> Dict[str, Any]:
"""Get current resource statistics"""
return {
**self.resource_stats,
'thread_pool_size': self.thread_pool._max_workers if self.thread_pool else 0,
'search_semaphore_available': self.search_semaphore._value if self.search_semaphore else 0
}
async def cleanup(self):
"""Cleanup resources"""
if self.monitoring_task:
self.monitoring_task.cancel()
if self.thread_pool:
self.thread_pool.shutdown(wait=True)
class PerformanceOptimizedSearchEngine(AgenticSearchEngine):
"""Search engine with integrated performance optimizations"""
def __init__(self, config: SearchConfig, cache_config: CacheConfig,
performance_config: PerformanceConfig):
super().__init__(config)
self.cache_config = cache_config
self.performance_config = performance_config
self.cache = None
self.query_optimizer = None
self.resource_manager = None
async def initialize(self):
"""Initialize with performance optimizations"""
# Initialize base search engine
await super().initialize()
# Initialize performance components
self.cache = MultiLevelCache(self.cache_config)
await self.cache.initialize()
self.query_optimizer = QueryOptimizer(self.performance_config)
self.resource_manager = ResourceManager(self.performance_config)
await self.resource_manager.initialize()
self.logger.info("Performance-optimized search engine initialized")
async def search(self, query: str, context: Optional[Dict] = None) -> SearchResponse:
"""Optimized search with caching and resource management"""
async with self.resource_manager.search_context():
# Generate cache key
cache_key = self._generate_cache_key(query, context)
# Check cache first
cached_response = await self.cache.get(cache_key)
if cached_response:
self.logger.debug(f"Cache hit for query: {query}")
return cached_response
# Perform optimized search
start_time = time.time()
# Analyze and optimize query
query_analysis = await self.query_processor.analyze_query(query)
optimized_query, sub_queries = await self.query_optimizer.optimize_query(query, query_analysis)
# Execute search with optimizations
if sub_queries:
response = await self._execute_multi_query_search(optimized_query, sub_queries, context)
else:
response = await self._execute_single_query_search(optimized_query, context)
response.processing_time = time.time() - start_time
# Cache the response
await self.cache.set(cache_key, response, ttl=self.config.cache_ttl)
return response
async def _execute_multi_query_search(self, main_query: str, sub_queries: List[str],
context: Optional[Dict]) -> SearchResponse:
"""Execute search with multiple sub-queries"""
# Execute sub-queries in parallel
tasks = []
for sub_query in sub_queries:
task = asyncio.create_task(self._execute_single_query_search(sub_query, context))
tasks.append(task)
# Wait for all sub-queries to complete
sub_responses = await asyncio.gather(*tasks, return_exceptions=True)
# Combine results
all_sources = []
for response in sub_responses:
if isinstance(response, SearchResponse):
all_sources.extend(response.sources)
# Remove duplicates and re-rank
unique_sources = self._deduplicate_sources(all_sources)
# Generate final response
final_response = await self.response_generator.generate_response(
main_query, unique_sources, None, None
)
return final_response
async def _execute_single_query_search(self, query: str, context: Optional[Dict]) -> SearchResponse:
"""Execute a single optimized query"""
# Use the base search implementation but with resource management
return await super().search(query, context)
def _deduplicate_sources(self, sources: List[SearchResult]) -> List[SearchResult]:
"""Remove duplicate sources and re-rank"""
seen_content = set()
unique_sources = []
for source in sorted(sources, key=lambda x: x.relevance_score, reverse=True):
content_hash = hashlib.md5(source.content.encode()).hexdigest()
if content_hash not in seen_content:
seen_content.add(content_hash)
unique_sources.append(source)
return unique_sources[:self.config.max_results]
def _generate_cache_key(self, query: str, context: Optional[Dict]) -> str:
"""Generate cache key for query and context"""
context_str = json.dumps(context or {}, sort_keys=True)
cache_content = f"{query}_{context_str}_{self.config.llm_model}"
return hashlib.md5(cache_content.encode()).hexdigest()
async def get_performance_stats(self) -> Dict[str, Any]:
"""Get comprehensive performance statistics"""
stats = {
'cache_stats': self.cache.get_stats() if self.cache else {},
'resource_stats': self.resource_manager.get_stats() if self.resource_manager else {},
'optimization_stats': self.query_optimizer.optimization_stats if self.query_optimizer else {}
}
return stats
async def cleanup(self):
"""Cleanup performance optimization resources"""
if self.resource_manager:
await self.resource_manager.cleanup()
This performance optimization and caching layer combines several techniques to keep search responsive even under heavy load. The multi-level cache maximizes hit rates and degrades gracefully when a higher tier (for example Redis) is unavailable, falling back to the remaining levels.
The resource manager monitors system health and throttles work to prevent resource exhaustion, while the query optimizer improves efficiency by rewriting queries and generating focused sub-queries for complex requests.
The integration of these optimization techniques ensures that the search engine can scale effectively while maintaining high performance and reliability in production environments.
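To make the wiring concrete, here is a minimal usage sketch rather than a definitive setup: it assumes the SearchConfig, CacheConfig, and PerformanceConfig dataclasses introduced earlier expose the fields the listing above reads (cache levels, memory limit, disk path, and TTL on the caching side; concurrency limits and pressure thresholds on the performance side), and the query text is purely illustrative.
import asyncio

async def main():
    # Field names below are inferred from how the classes above consume their configs
    search_config = SearchConfig(
        deployment_mode=DeploymentMode.STANDALONE,
        llm_provider=LLMProvider.LOCAL_OLLAMA
    )
    cache_config = CacheConfig(
        levels=[CacheLevel.MEMORY, CacheLevel.DISK],  # skip Redis on a single node
        memory_limit_mb=256,
        disk_cache_path="./search_cache",
        default_ttl=3600
    )
    performance_config = PerformanceConfig(
        max_concurrent_searches=8,
        enable_query_optimization=True,
        memory_threshold=0.85,
        cpu_threshold=0.9
    )

    engine = PerformanceOptimizedSearchEngine(search_config, cache_config, performance_config)
    await engine.initialize()
    try:
        response = await engine.search("How does multi-level caching work?")
        print(response.answer)
        print(await engine.get_performance_stats())
    finally:
        await engine.cleanup()

if __name__ == "__main__":
    asyncio.run(main())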
Security and Privacy Considerations
Security and privacy are fundamental requirements for any search engine that handles sensitive information or operates in enterprise environments. The security framework must address authentication, authorization, data encryption, audit logging, and privacy protection while maintaining system performance and usability.
The security implementation layers several protections: transport security, encryption of data at rest and in transit, access control mechanisms, and comprehensive audit logging. Privacy features include data anonymization, retention policies, and support for compliance with regulations such as GDPR and CCPA.
The system also implements security monitoring and threat detection capabilities to identify and respond to potential security incidents in real-time.
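Before walking through the full implementation, one small piece worth illustrating in isolation is password policy enforcement. The SecurityConfig defined below carries a password_policy dictionary but leaves enforcement to the surrounding application; the following self-contained helper is a hypothetical sketch of how such a policy could be checked (the function name and return convention are illustrative, not part of the implementation below).
import re

def check_password_policy(password: str, policy: dict) -> list:
    """Return a list of human-readable violations; an empty list means the password passes."""
    violations = []
    if len(password) < policy.get("min_length", 8):
        violations.append(f"must be at least {policy.get('min_length', 8)} characters long")
    if policy.get("require_uppercase") and not re.search(r"[A-Z]", password):
        violations.append("must contain an uppercase letter")
    if policy.get("require_lowercase") and not re.search(r"[a-z]", password):
        violations.append("must contain a lowercase letter")
    if policy.get("require_numbers") and not re.search(r"\d", password):
        violations.append("must contain a digit")
    if policy.get("require_special") and not re.search(r"[^A-Za-z0-9]", password):
        violations.append("must contain a special character")
    return violations

policy = {
    "min_length": 8,
    "require_uppercase": True,
    "require_lowercase": True,
    "require_numbers": True,
    "require_special": True,
}
print(check_password_policy("weakpass", policy))     # lists the missing character classes
print(check_password_policy("Str0ng!pass", policy))  # [] -- policy satisfied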
Here is the comprehensive security and privacy implementation:
import secrets
import hashlib
import hmac
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import ipaddress
from datetime import datetime, timedelta
import re
from typing import Set, Pattern, Callable
import asyncio
import json
class SecurityLevel(Enum):
PUBLIC = "public"
INTERNAL = "internal"
CONFIDENTIAL = "confidential"
RESTRICTED = "restricted"
class EncryptionMethod(Enum):
AES_256 = "aes_256"
RSA_2048 = "rsa_2048"
HYBRID = "hybrid"
@dataclass
class SecurityConfig:
encryption_method: EncryptionMethod = EncryptionMethod.AES_256
require_https: bool = True
session_timeout: int = 3600
max_login_attempts: int = 5
lockout_duration: int = 900
audit_logging: bool = True
data_retention_days: int = 90
anonymization_enabled: bool = True
ip_whitelist: List[str] = field(default_factory=list)
rate_limit_per_ip: int = 100
enable_2fa: bool = False
password_policy: Dict[str, Any] = field(default_factory=lambda: {
'min_length': 8,
'require_uppercase': True,
'require_lowercase': True,
'require_numbers': True,
'require_special': True
})
class EncryptionManager:
"""Manages encryption and decryption operations"""
def __init__(self, config: SecurityConfig):
self.config = config
self.symmetric_key = None
self.private_key = None
self.public_key = None
self.fernet = None
async def initialize(self):
"""Initialize encryption components"""
if self.config.encryption_method in [EncryptionMethod.AES_256, EncryptionMethod.HYBRID]:
await self._initialize_symmetric_encryption()
if self.config.encryption_method in [EncryptionMethod.RSA_2048, EncryptionMethod.HYBRID]:
await self._initialize_asymmetric_encryption()
self.logger = logging.getLogger(__name__)
self.logger.info(f"Encryption manager initialized with {self.config.encryption_method.value}")
async def _initialize_symmetric_encryption(self):
"""Initialize AES symmetric encryption"""
# Generate or load symmetric key
key = Fernet.generate_key()
self.symmetric_key = key
self.fernet = Fernet(key)
async def _initialize_asymmetric_encryption(self):
"""Initialize RSA asymmetric encryption"""
# Generate RSA key pair
self.private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048
)
self.public_key = self.private_key.public_key()
async def encrypt_data(self, data: str, security_level: SecurityLevel = SecurityLevel.INTERNAL) -> str:
"""Encrypt data based on security level"""
if security_level == SecurityLevel.PUBLIC:
# No encryption for public data
return data
data_bytes = data.encode('utf-8')
if self.config.encryption_method == EncryptionMethod.AES_256:
encrypted = self.fernet.encrypt(data_bytes)
return base64.b64encode(encrypted).decode('utf-8')
elif self.config.encryption_method == EncryptionMethod.RSA_2048:
# RSA encryption (limited by key size)
max_chunk_size = 190 # For 2048-bit key
if len(data_bytes) > max_chunk_size:
raise ValueError("Data too large for RSA encryption")
encrypted = self.public_key.encrypt(
data_bytes,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return base64.b64encode(encrypted).decode('utf-8')
elif self.config.encryption_method == EncryptionMethod.HYBRID:
# Use AES for data, RSA for key
aes_key = Fernet.generate_key()
fernet = Fernet(aes_key)
# Encrypt data with AES
encrypted_data = fernet.encrypt(data_bytes)
# Encrypt AES key with RSA
encrypted_key = self.public_key.encrypt(
aes_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
# Combine encrypted key and data
combined = base64.b64encode(encrypted_key).decode('utf-8') + ':' + base64.b64encode(encrypted_data).decode('utf-8')
return combined
return data
async def decrypt_data(self, encrypted_data: str, security_level: SecurityLevel = SecurityLevel.INTERNAL) -> str:
"""Decrypt data based on security level"""
if security_level == SecurityLevel.PUBLIC:
return encrypted_data
try:
if self.config.encryption_method == EncryptionMethod.AES_256:
encrypted_bytes = base64.b64decode(encrypted_data.encode('utf-8'))
decrypted = self.fernet.decrypt(encrypted_bytes)
return decrypted.decode('utf-8')
elif self.config.encryption_method == EncryptionMethod.RSA_2048:
encrypted_bytes = base64.b64decode(encrypted_data.encode('utf-8'))
decrypted = self.private_key.decrypt(
encrypted_bytes,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return decrypted.decode('utf-8')
elif self.config.encryption_method == EncryptionMethod.HYBRID:
# Split encrypted key and data
key_part, data_part = encrypted_data.split(':', 1)
# Decrypt AES key with RSA
encrypted_key = base64.b64decode(key_part.encode('utf-8'))
aes_key = self.private_key.decrypt(
encrypted_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
# Decrypt data with AES
fernet = Fernet(aes_key)
encrypted_data_bytes = base64.b64decode(data_part.encode('utf-8'))
decrypted = fernet.decrypt(encrypted_data_bytes)
return decrypted.decode('utf-8')
except Exception as e:
raise ValueError(f"Decryption failed: {e}")
return encrypted_data
class AccessControlManager:
"""Manages access control and authorization"""
def __init__(self, config: SecurityConfig):
self.config = config
self.permissions = {}
self.role_permissions = {}
self.user_roles = {}
self.access_patterns = {}
self.logger = logging.getLogger(__name__)
async def initialize(self):
"""Initialize access control system"""
await self._load_permissions()
await self._load_roles()
self.logger.info("Access control manager initialized")
async def _load_permissions(self):
"""Load permission definitions"""
self.permissions = {
'search:read': 'Perform search queries',
'search:write': 'Add or modify documents',
'search:admin': 'Administrative access to search engine',
'documents:read': 'Read document content',
'documents:write': 'Modify document content',
'documents:delete': 'Delete documents',
'analytics:read': 'View search analytics',
'config:read': 'View configuration',
'config:write': 'Modify configuration'
}
async def _load_roles(self):
"""Load role definitions"""
self.role_permissions = {
'viewer': ['search:read', 'documents:read'],
'editor': ['search:read', 'search:write', 'documents:read', 'documents:write'],
'admin': ['search:read', 'search:write', 'search:admin', 'documents:read',
'documents:write', 'documents:delete', 'analytics:read',
'config:read', 'config:write'],
'analyst': ['search:read', 'documents:read', 'analytics:read']
}
async def check_permission(self, user_id: str, permission: str, resource: Optional[str] = None) -> bool:
"""Check if user has specific permission"""
user_permissions = await self._get_user_permissions(user_id)
# Check direct permission
if permission in user_permissions:
return True
# Check resource-specific permissions
if resource:
resource_permission = f"{permission}:{resource}"
if resource_permission in user_permissions:
return True
return False
async def _get_user_permissions(self, user_id: str) -> Set[str]:
"""Get all permissions for a user"""
permissions = set()
# Get user roles
user_roles = self.user_roles.get(user_id, [])
# Collect permissions from roles
for role in user_roles:
role_perms = self.role_permissions.get(role, [])
permissions.update(role_perms)
return permissions
async def assign_role(self, user_id: str, role: str):
"""Assign role to user"""
if role not in self.role_permissions:
raise ValueError(f"Unknown role: {role}")
if user_id not in self.user_roles:
self.user_roles[user_id] = []
if role not in self.user_roles[user_id]:
self.user_roles[user_id].append(role)
self.logger.info(f"Assigned role {role} to user {user_id}")
async def revoke_role(self, user_id: str, role: str):
"""Revoke role from user"""
if user_id in self.user_roles and role in self.user_roles[user_id]:
self.user_roles[user_id].remove(role)
self.logger.info(f"Revoked role {role} from user {user_id}")
async def log_access_attempt(self, user_id: str, resource: str, action: str,
success: bool, ip_address: str):
"""Log access attempt for audit purposes"""
access_log = {
'timestamp': datetime.utcnow().isoformat(),
'user_id': user_id,
'resource': resource,
'action': action,
'success': success,
'ip_address': ip_address
}
# Store in audit log (implementation depends on storage backend)
await self._store_audit_log(access_log)
async def _store_audit_log(self, log_entry: Dict):
"""Store audit log entry"""
# Implementation would store to database, file, or external service
self.logger.info(f"Audit log: {log_entry}")
class DataAnonymizer:
"""Handles data anonymization and privacy protection"""
def __init__(self, config: SecurityConfig):
self.config = config
self.pii_patterns = self._initialize_pii_patterns()
self.anonymization_cache = {}
def _initialize_pii_patterns(self) -> Dict[str, Pattern]:
"""Initialize patterns for detecting PII"""
return {
'email': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),
'phone': re.compile(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'),
'ssn': re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
'credit_card': re.compile(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b'),
'ip_address': re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'),
'name': re.compile(r'\b[A-Z][a-z]+ [A-Z][a-z]+\b') # Simple name pattern
}
async def anonymize_text(self, text: str, preserve_structure: bool = True) -> str:
"""Anonymize PII in text while preserving structure"""
if not self.config.anonymization_enabled:
return text
anonymized_text = text
for pii_type, pattern in self.pii_patterns.items():
matches = pattern.findall(text)
for match in matches:
# Generate consistent anonymized replacement
replacement = await self._get_anonymized_replacement(match, pii_type, preserve_structure)
anonymized_text = anonymized_text.replace(match, replacement)
return anonymized_text
async def _get_anonymized_replacement(self, original: str, pii_type: str,
preserve_structure: bool) -> str:
"""Get anonymized replacement for PII"""
# Use cache for consistent anonymization
cache_key = f"{pii_type}:{original}"
if cache_key in self.anonymization_cache:
return self.anonymization_cache[cache_key]
if pii_type == 'email':
if preserve_structure:
domain = original.split('@')[1]
replacement = f"user{len(original)}@{domain}"
else:
replacement = "[EMAIL]"
elif pii_type == 'phone':
if preserve_structure:
replacement = "XXX-XXX-" + original[-4:]
else:
replacement = "[PHONE]"
elif pii_type == 'ssn':
replacement = "XXX-XX-" + original[-4:] if preserve_structure else "[SSN]"
elif pii_type == 'credit_card':
replacement = "XXXX-XXXX-XXXX-" + original[-4:] if preserve_structure else "[CARD]"
elif pii_type == 'ip_address':
replacement = "XXX.XXX.XXX." + original.split('.')[-1] if preserve_structure else "[IP]"
elif pii_type == 'name':
replacement = "[NAME]"
else:
replacement = "[REDACTED]"
self.anonymization_cache[cache_key] = replacement
return replacement
async def detect_pii(self, text: str) -> Dict[str, List[str]]:
"""Detect PII in text and return findings"""
findings = {}
for pii_type, pattern in self.pii_patterns.items():
matches = pattern.findall(text)
if matches:
findings[pii_type] = matches
return findings
class SecurityMonitor:
"""Monitors security events and detects threats"""
def __init__(self, config: SecurityConfig):
self.config = config
self.failed_attempts = {}
self.blocked_ips = set()
self.suspicious_patterns = {}
self.alert_handlers = []
self.monitoring_task = None
async def initialize(self):
"""Initialize security monitoring"""
self.monitoring_task = asyncio.create_task(self._monitor_security_events())
self.logger = logging.getLogger(__name__)
self.logger.info("Security monitor initialized")
async def record_login_attempt(self, user_id: str, ip_address: str, success: bool):
"""Record login attempt and check for suspicious activity"""
timestamp = datetime.utcnow()
if not success:
# Track failed attempts
key = f"{user_id}:{ip_address}"
if key not in self.failed_attempts:
self.failed_attempts[key] = []
self.failed_attempts[key].append(timestamp)
# Clean old attempts
cutoff = timestamp - timedelta(minutes=15)
self.failed_attempts[key] = [
attempt for attempt in self.failed_attempts[key]
if attempt > cutoff
]
# Check if threshold exceeded
if len(self.failed_attempts[key]) >= self.config.max_login_attempts:
await self._handle_brute_force_attempt(user_id, ip_address)
else:
# Clear failed attempts on successful login
key = f"{user_id}:{ip_address}"
self.failed_attempts.pop(key, None)
async def _handle_brute_force_attempt(self, user_id: str, ip_address: str):
"""Handle detected brute force attempt"""
self.blocked_ips.add(ip_address)
alert = {
'type': 'brute_force_attempt',
'user_id': user_id,
'ip_address': ip_address,
'timestamp': datetime.utcnow().isoformat(),
'action': 'ip_blocked'
}
await self._send_security_alert(alert)
# Schedule IP unblock
asyncio.create_task(self._schedule_ip_unblock(ip_address))
async def _schedule_ip_unblock(self, ip_address: str):
"""Schedule IP address to be unblocked"""
await asyncio.sleep(self.config.lockout_duration)
self.blocked_ips.discard(ip_address)
self.logger.info(f"IP address {ip_address} unblocked after lockout period")
async def check_ip_blocked(self, ip_address: str) -> bool:
"""Check if IP address is blocked"""
return ip_address in self.blocked_ips
async def check_ip_whitelist(self, ip_address: str) -> bool:
"""Check if IP address is in whitelist"""
if not self.config.ip_whitelist:
return True # No whitelist configured
try:
ip = ipaddress.ip_address(ip_address)
for allowed_ip in self.config.ip_whitelist:
if '/' in allowed_ip:
# CIDR notation
if ip in ipaddress.ip_network(allowed_ip):
return True
else:
# Exact IP
if ip == ipaddress.ip_address(allowed_ip):
return True
return False
except ValueError:
return False
async def detect_anomalous_query(self, query: str, user_id: str) -> bool:
"""Detect potentially malicious queries"""
# Check for SQL injection patterns
sql_patterns = [
r"union\s+select", r"drop\s+table", r"delete\s+from",
r"insert\s+into", r"update\s+set", r"--", r"/\*", r"\*/"
]
query_lower = query.lower()
for pattern in sql_patterns:
if re.search(pattern, query_lower):
await self._send_security_alert({
'type': 'suspicious_query',
'user_id': user_id,
'query': query,
'pattern': pattern,
'timestamp': datetime.utcnow().isoformat()
})
return True
# Check for excessively long queries
if len(query) > 1000:
await self._send_security_alert({
'type': 'long_query',
'user_id': user_id,
'query_length': len(query),
'timestamp': datetime.utcnow().isoformat()
})
return True
return False
async def _monitor_security_events(self):
"""Continuously monitor for security events"""
while True:
try:
# Clean up old failed attempts
cutoff = datetime.utcnow() - timedelta(hours=1)
for key in list(self.failed_attempts.keys()):
self.failed_attempts[key] = [
attempt for attempt in self.failed_attempts[key]
if attempt > cutoff
]
if not self.failed_attempts[key]:
del self.failed_attempts[key]
await asyncio.sleep(300) # Check every 5 minutes
except Exception as e:
self.logger.error(f"Security monitoring error: {e}")
await asyncio.sleep(60)
def register_alert_handler(self, handler: Callable):
"""Register handler for security alerts"""
self.alert_handlers.append(handler)
async def _send_security_alert(self, alert: Dict):
"""Send security alert to registered handlers"""
self.logger.warning(f"Security alert: {alert}")
for handler in self.alert_handlers:
try:
if asyncio.iscoroutinefunction(handler):
await handler(alert)
else:
handler(alert)
except Exception as e:
self.logger.error(f"Alert handler error: {e}")
class SecureSearchEngine(AgenticSearchEngine):
"""Search engine with integrated security features"""
def __init__(self, config: SearchConfig, security_config: SecurityConfig):
super().__init__(config)
self.security_config = security_config
self.encryption_manager = None
self.access_control = None
self.data_anonymizer = None
self.security_monitor = None
async def initialize(self):
"""Initialize with security features"""
await super().initialize()
# Initialize security components
self.encryption_manager = EncryptionManager(self.security_config)
await self.encryption_manager.initialize()
self.access_control = AccessControlManager(self.security_config)
await self.access_control.initialize()
self.data_anonymizer = DataAnonymizer(self.security_config)
self.security_monitor = SecurityMonitor(self.security_config)
await self.security_monitor.initialize()
self.logger.info("Secure search engine initialized")
async def secure_search(self, query: str, user_id: str, ip_address: str,
context: Optional[Dict] = None) -> SearchResponse:
"""Perform secure search with access control and monitoring"""
# Check IP whitelist and blocks
if not await self.security_monitor.check_ip_whitelist(ip_address):
raise PermissionError("IP address not in whitelist")
if await self.security_monitor.check_ip_blocked(ip_address):
raise PermissionError("IP address is blocked")
# Check permissions
if not await self.access_control.check_permission(user_id, 'search:read'):
await self.access_control.log_access_attempt(
user_id, 'search', 'read', False, ip_address
)
raise PermissionError("Insufficient permissions for search")
# Check for malicious queries
if await self.security_monitor.detect_anomalous_query(query, user_id):
await self.access_control.log_access_attempt(
user_id, 'search', 'query', False, ip_address
)
raise ValueError("Query blocked by security policy")
# Anonymize query if needed
anonymized_query = await self.data_anonymizer.anonymize_text(query)
# Perform search
try:
response = await self.search(anonymized_query, context)
# Anonymize response content
if self.security_config.anonymization_enabled:
response.answer = await self.data_anonymizer.anonymize_text(response.answer)
for source in response.sources:
source.content = await self.data_anonymizer.anonymize_text(source.content)
# Log successful access
await self.access_control.log_access_attempt(
user_id, 'search', 'read', True, ip_address
)
return response
except Exception as e:
await self.access_control.log_access_attempt(
user_id, 'search', 'read', False, ip_address
)
raise
async def secure_add_document(self, doc_id: str, content: str, source: str,
user_id: str, ip_address: str,
security_level: SecurityLevel = SecurityLevel.INTERNAL,
metadata: Dict = None):
"""Securely add document with encryption and access control"""
# Check permissions
if not await self.access_control.check_permission(user_id, 'documents:write'):
await self.access_control.log_access_attempt(
user_id, 'documents', 'write', False, ip_address
)
raise PermissionError("Insufficient permissions to add documents")
# Encrypt content based on security level
encrypted_content = await self.encryption_manager.encrypt_data(content, security_level)
# Add security metadata
if metadata is None:
metadata = {}
metadata.update({
'security_level': security_level.value,
'encrypted': True,
'added_by': user_id,
'added_from_ip': ip_address,
'added_at': datetime.utcnow().isoformat()
})
# Add document
await self.document_store.add_document(doc_id, encrypted_content, source, metadata)
# Log access
await self.access_control.log_access_attempt(
user_id, 'documents', 'write', True, ip_address
)
async def get_security_report(self, user_id: str) -> Dict[str, Any]:
"""Get security status report"""
if not await self.access_control.check_permission(user_id, 'analytics:read'):
raise PermissionError("Insufficient permissions for security report")
return {
'blocked_ips': len(self.security_monitor.blocked_ips),
'failed_attempts': len(self.security_monitor.failed_attempts),
'encryption_method': self.security_config.encryption_method.value,
'anonymization_enabled': self.security_config.anonymization_enabled,
'audit_logging': self.security_config.audit_logging,
'last_updated': datetime.utcnow().isoformat()
}
This security and privacy implementation provides enterprise-grade protection for the search engine. The layered approach combines encryption, access control, threat monitoring, and PII anonymization, and each layer can be configured to match specific compliance requirements.
The system provides transparency through audit logging and security reporting while maintaining performance through efficient caching and monitoring mechanisms. The modular design allows for easy extension with additional security features as requirements evolve.
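As a usage sketch, and with the caveat that the user identity, role assignment, and IP addresses below are illustrative values rather than part of the implementation above, a host application might exercise the secure search path like this:
import asyncio

async def main():
    security_config = SecurityConfig(
        encryption_method=EncryptionMethod.AES_256,
        anonymization_enabled=True,
        ip_whitelist=["10.0.0.0/8", "127.0.0.1"]
    )
    engine = SecureSearchEngine(SearchConfig(), security_config)
    await engine.initialize()

    # Grant a role before the first request; the 'analyst' role carries
    # both search:read and analytics:read in the role definitions above
    await engine.access_control.assign_role("alice", "analyst")

    try:
        response = await engine.secure_search(
            query="What does our retention policy say about audit logs?",
            user_id="alice",
            ip_address="10.1.2.3"
        )
        print(response.answer)
    except PermissionError as exc:
        print(f"Request rejected by security policy: {exc}")

    print(await engine.get_security_report("alice"))

if __name__ == "__main__":
    asyncio.run(main())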
Testing and Deployment Strategies
Comprehensive testing and deployment strategies are essential for ensuring the reliability, performance, and security of the agentic search engine in production environments. The testing framework must cover unit testing, integration testing, performance testing, and security testing across all components of the system.
The deployment strategy includes containerization, orchestration, monitoring, and rollback capabilities to ensure smooth production deployments with minimal downtime. The system supports multiple deployment patterns including blue-green deployments, canary releases, and rolling updates.
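Before the full framework, a brief sketch shows the register the unit tests take; it assumes the pytest-asyncio plugin and a hypothetical engine fixture that yields an initialized search engine (or the mocked TestableSearchEngine defined below):
import pytest

@pytest.mark.asyncio
async def test_search_returns_answer_and_sources(engine):
    response = await engine.search("What is machine learning?")
    assert response.answer, "expected a non-empty answer"
    assert response.sources, "expected at least one supporting source"

@pytest.mark.asyncio
async def test_malicious_query_is_handled_safely(engine):
    # The engine should either answer safely or reject the query,
    # but it must not raise an unhandled exception
    response = await engine.search("'; DROP TABLE documents; --")
    assert response is not None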
Here is the comprehensive testing and deployment implementation:
import pytest
import asyncio
import docker
import kubernetes
from typing import Generator, AsyncGenerator
import tempfile
import shutil
import yaml
import subprocess
from pathlib import Path
import time
import requests
from dataclasses import asdict
class TestEnvironment(Enum):
UNIT = "unit"
INTEGRATION = "integration"
PERFORMANCE = "performance"
SECURITY = "security"
END_TO_END = "end_to_end"
class DeploymentStrategy(Enum):
BLUE_GREEN = "blue_green"
CANARY = "canary"
ROLLING_UPDATE = "rolling_update"
RECREATE = "recreate"
@dataclass
class TestConfig:
environment: TestEnvironment
test_data_path: str = "./test_data"
mock_llm: bool = True
test_timeout: int = 300
parallel_tests: bool = True
coverage_threshold: float = 0.8
performance_baseline: Dict[str, float] = field(default_factory=lambda: {
'search_latency_ms': 500,
'throughput_qps': 100,
'memory_usage_mb': 1024
})
@dataclass
class DeploymentConfig:
strategy: DeploymentStrategy = DeploymentStrategy.ROLLING_UPDATE
container_registry: str = "localhost:5000"
namespace: str = "search-engine"
replicas: int = 3
resource_limits: Dict[str, str] = field(default_factory=lambda: {
'cpu': '2',
'memory': '4Gi'
})
health_check_path: str = "/health"
readiness_timeout: int = 300
rollback_on_failure: bool = True
class SearchEngineTestSuite:
"""Comprehensive test suite for the search engine"""
def __init__(self, test_config: TestConfig):
self.config = test_config
self.test_data_path = Path(test_config.test_data_path)
self.temp_dir = None
self.test_search_engine = None
self.mock_llm_responses = {}
async def setup_test_environment(self):
"""Setup test environment with necessary fixtures"""
# Create temporary directory for test data
self.temp_dir = tempfile.mkdtemp()
# Setup test data
await self._setup_test_data()
# Initialize test search engine
await self._initialize_test_search_engine()
print(f"Test environment setup complete in {self.temp_dir}")
async def _setup_test_data(self):
"""Setup test documents and data"""
test_documents = [
{
'id': 'doc1',
'content': 'This is a test document about machine learning algorithms and their applications in data science.',
'source': 'test_source_1',
'metadata': {'category': 'technology', 'author': 'test_author'}
},
{
'id': 'doc2',
'content': 'Python programming language is widely used for web development, data analysis, and artificial intelligence.',
'source': 'test_source_2',
'metadata': {'category': 'programming', 'author': 'test_author'}
},
{
'id': 'doc3',
'content': 'Database management systems are crucial for storing and retrieving large amounts of structured data efficiently.',
'source': 'test_source_3',
'metadata': {'category': 'database', 'author': 'test_author'}
}
]
# Save test documents
for doc in test_documents:
doc_path = Path(self.temp_dir) / f"{doc['id']}.json"
with open(doc_path, 'w') as f:
json.dump(doc, f)
async def _initialize_test_search_engine(self):
"""Initialize search engine for testing"""
# Create test configuration
test_search_config = SearchConfig(
deployment_mode=DeploymentMode.STANDALONE,
llm_provider=LLMProvider.LOCAL_OLLAMA if not self.config.mock_llm else LLMProvider.OPENAI,
llm_model="test_model",
max_results=10,
search_timeout=30.0,
enable_caching=False, # Disable caching for consistent tests
embedding_model="sentence-transformers/all-MiniLM-L6-v2",
search_domains=[self.temp_dir]
)
# Initialize search engine
if self.config.mock_llm:
self.test_search_engine = TestableSearchEngine(test_search_config, self.mock_llm_responses)
else:
self.test_search_engine = AgenticSearchEngine(test_search_config)
await self.test_search_engine.initialize()
async def run_unit_tests(self) -> Dict[str, Any]:
"""Run unit tests for individual components"""
test_results = {
'passed': 0,
'failed': 0,
'errors': [],
'coverage': 0.0
}
# Test document processing
try:
await self._test_document_processing()
test_results['passed'] += 1
except Exception as e:
test_results['failed'] += 1
test_results['errors'].append(f"Document processing test failed: {e}")
# Test query processing
try:
await self._test_query_processing()
test_results['passed'] += 1
except Exception as e:
test_results['failed'] += 1
test_results['errors'].append(f"Query processing test failed: {e}")
# Test search functionality
try:
await self._test_search_functionality()
test_results['passed'] += 1
except Exception as e:
test_results['failed'] += 1
test_results['errors'].append(f"Search functionality test failed: {e}")
# Test caching
try:
await self._test_caching_system()
test_results['passed'] += 1
except Exception as e:
test_results['failed'] += 1
test_results['errors'].append(f"Caching system test failed: {e}")
return test_results
async def _test_document_processing(self):
"""Test document processing functionality"""
# Test document loading
doc_content = "This is a test document for processing."
processed_doc = await self.test_search_engine.document_processor.process_document(
"test_doc", doc_content, "test_source"
)
assert processed_doc.doc_id == "test_doc"
assert processed_doc.content == doc_content
assert len(processed_doc.chunks) > 0
assert processed_doc.processing_time > 0
print("Document processing test passed")
async def _test_query_processing(self):
"""Test query processing and analysis"""
test_query = "What is machine learning?"
query_analysis = await self.test_search_engine.query_processor.analyze_query(test_query)
assert query_analysis.original_query == test_query
assert query_analysis.query_type is not None
assert len(query_analysis.key_concepts) > 0
assert query_analysis.confidence > 0
print("Query processing test passed")
async def _test_search_functionality(self):
"""Test core search functionality"""
test_query = "machine learning algorithms"
response = await self.test_search_engine.search(test_query)
assert response.query == test_query
assert len(response.answer) > 0
assert len(response.sources) > 0
assert response.processing_time > 0
assert response.model_used is not None
print("Search functionality test passed")
async def _test_caching_system(self):
"""Test caching system functionality"""
# Enable caching for this test
self.test_search_engine.config.enable_caching = True
test_query = "python programming"
# First search (should miss cache)
start_time = time.time()
response1 = await self.test_search_engine.search(test_query)
first_search_time = time.time() - start_time
# Second search (should hit cache)
start_time = time.time()
response2 = await self.test_search_engine.search(test_query)
second_search_time = time.time() - start_time
assert response1.answer == response2.answer
assert second_search_time < first_search_time # Cache should be faster
print("Caching system test passed")
async def run_integration_tests(self) -> Dict[str, Any]:
"""Run integration tests for component interactions"""
test_results = {
'passed': 0,
'failed': 0,
'errors': []
}
# Test end-to-end search workflow
try:
await self._test_end_to_end_search()
test_results['passed'] += 1
except Exception as e:
test_results['failed'] += 1
test_results['errors'].append(f"End-to-end search test failed: {e}")
# Test document addition and search
try:
await self._test_document_addition_and_search()
test_results['passed'] += 1
except Exception as e:
test_results['failed'] += 1
test_results['errors'].append(f"Document addition test failed: {e}")
# Test API integration
try:
await self._test_api_integration()
test_results['passed'] += 1
except Exception as e:
test_results['failed'] += 1
test_results['errors'].append(f"API integration test failed: {e}")
return test_results
async def _test_end_to_end_search(self):
"""Test complete search workflow"""
queries = [
"What is machine learning?",
"How to use Python for data analysis?",
"Database management best practices"
]
for query in queries:
response = await self.test_search_engine.search(query)
assert len(response.answer) > 10 # Meaningful response
assert len(response.sources) > 0 # Found relevant sources
assert response.confidence_score > 0 # Has confidence
print("End-to-end search test passed")
async def _test_document_addition_and_search(self):
"""Test adding documents and searching them"""
new_doc = {
'id': 'new_test_doc',
'content': 'This is a newly added document about quantum computing and its applications.',
'source': 'test_addition',
'metadata': {'category': 'quantum'}
}
# Add document
await self.test_search_engine.document_store.add_document(
new_doc['id'], new_doc['content'], new_doc['source'], new_doc['metadata']
)
# Search for the new document
response = await self.test_search_engine.search("quantum computing")
# Verify the new document is found
found_new_doc = any('quantum computing' in source.content.lower() for source in response.sources)
assert found_new_doc, "Newly added document not found in search results"
print("Document addition and search test passed")
async def _test_api_integration(self):
"""Test API integration if available"""
# This would test REST API endpoints if the search engine is running as a service
# For now, we'll test the SDK integration
sdk_config = IntegrationConfig(
mode=IntegrationMode.SDK_EMBEDDED,
auth_method=AuthenticationMethod.API_KEY
)
sdk = SearchEngineSDK(self.test_search_engine.config, sdk_config)
await sdk.initialize()
# Create test context
context = await sdk.create_application_context("test_app", "test_user", ["read"])
# Perform search through SDK
response = await sdk.search("machine learning", context)
assert len(response.answer) > 0
assert len(response.sources) > 0
print("API integration test passed")
async def run_performance_tests(self) -> Dict[str, Any]:
"""Run performance tests"""
performance_results = {
'search_latency_ms': 0,
'throughput_qps': 0,
'memory_usage_mb': 0,
'passed_baseline': False
}
# Test search latency
latencies = []
for i in range(10):
start_time = time.time()
await self.test_search_engine.search(f"test query {i}")
latency = (time.time() - start_time) * 1000
latencies.append(latency)
performance_results['search_latency_ms'] = sum(latencies) / len(latencies)
# Test throughput
start_time = time.time()
tasks = []
for i in range(50):
task = asyncio.create_task(self.test_search_engine.search(f"throughput test {i}"))
tasks.append(task)
await asyncio.gather(*tasks)
duration = time.time() - start_time
performance_results['throughput_qps'] = 50 / duration
# Check memory usage
import psutil
process = psutil.Process()
performance_results['memory_usage_mb'] = process.memory_info().rss / 1024 / 1024
# Check against baseline
baseline = self.config.performance_baseline
performance_results['passed_baseline'] = (
performance_results['search_latency_ms'] <= baseline['search_latency_ms'] and
performance_results['throughput_qps'] >= baseline['throughput_qps'] and
performance_results['memory_usage_mb'] <= baseline['memory_usage_mb']
)
return performance_results
async def run_security_tests(self) -> Dict[str, Any]:
"""Run security tests"""
security_results = {
'passed': 0,
'failed': 0,
'errors': []
}
# Test SQL injection protection
try:
malicious_query = "'; DROP TABLE documents; --"
response = await self.test_search_engine.search(malicious_query)
# Should not crash and should return safe response
assert len(response.answer) > 0
security_results['passed'] += 1
except Exception as e:
security_results['failed'] += 1
security_results['errors'].append(f"SQL injection test failed: {e}")
# Test input sanitization
try:
xss_query = "<script>alert('xss')</script>"
response = await self.test_search_engine.search(xss_query)
# Should sanitize the input
assert "<script>" not in response.answer
security_results['passed'] += 1
except Exception as e:
security_results['failed'] += 1
security_results['errors'].append(f"XSS protection test failed: {e}")
return security_results
async def cleanup_test_environment(self):
"""Cleanup test environment"""
if self.temp_dir and Path(self.temp_dir).exists():
shutil.rmtree(self.temp_dir)
if self.test_search_engine:
# Cleanup search engine resources
pass
print("Test environment cleaned up")
class TestableSearchEngine(AgenticSearchEngine):
"""Search engine with mocked LLM for testing"""
def __init__(self, config: SearchConfig, mock_responses: Dict[str, str]):
super().__init__(config)
self.mock_responses = mock_responses
async def _perform_search(self, query: str, context: Optional[Dict]) -> SearchResponse:
"""Override search with mock responses"""
# Use mock response if available
mock_answer = self.mock_responses.get(query, f"Mock response for: {query}")
# Create mock sources
mock_sources = [
SearchResult(
content=f"Mock content related to {query}",
source="mock_source",
relevance_score=0.9,
metadata={'type': 'mock'}
)
]
return SearchResponse(
query=query,
answer=mock_answer,
sources=mock_sources,
processing_time=0.1,
model_used="mock_model",
confidence_score=0.8
)
class ContainerizedDeployment:
"""Handles containerized deployment of the search engine"""
def __init__(self, deployment_config: DeploymentConfig):
self.config = deployment_config
self.docker_client = None
self.k8s_client = None
async def initialize(self):
"""Initialize deployment tools"""
self.docker_client = docker.from_env()
# Initialize Kubernetes client if available
try:
kubernetes.config.load_incluster_config()
self.k8s_client = kubernetes.client.ApiClient()
except Exception:
try:
kubernetes.config.load_kube_config()
self.k8s_client = kubernetes.client.ApiClient()
except Exception:
print("Kubernetes not available, using Docker only")
async def build_container_image(self, source_path: str, tag: str) -> str:
"""Build container image for the search engine"""
dockerfile_content = self._generate_dockerfile()
# Write Dockerfile
dockerfile_path = Path(source_path) / "Dockerfile"
with open(dockerfile_path, 'w') as f:
f.write(dockerfile_content)
# Build image
image, logs = self.docker_client.images.build(
path=source_path,
tag=tag,
rm=True
)
print(f"Built container image: {tag}")
return image.id
def _generate_dockerfile(self) -> str:
"""Generate Dockerfile for the search engine"""
return """
FROM python:3.9-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \\
gcc \\
g++ \\
curl \\
&& rm -rf /var/lib/apt/lists/*
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user
RUN useradd -m -u 1000 searchuser && chown -R searchuser:searchuser /app
USER searchuser
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \\
CMD curl -f http://localhost:8000/health || exit 1
# Start application
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
"""
async def deploy_to_kubernetes(self, image_tag: str) -> Dict[str, Any]:
"""Deploy to Kubernetes cluster"""
if not self.k8s_client:
raise RuntimeError("Kubernetes client not available")
# Generate Kubernetes manifests
manifests = self._generate_k8s_manifests(image_tag)
deployment_result = {
'namespace': self.config.namespace,
'replicas': self.config.replicas,
'image': image_tag,
'status': 'deployed'
}
try:
# Create namespace if it doesn't exist
await self._ensure_namespace()
# Apply manifests
for manifest in manifests:
await self._apply_k8s_manifest(manifest)
# Wait for deployment to be ready
await self._wait_for_deployment_ready()
deployment_result['status'] = 'ready'
except Exception as e:
deployment_result['status'] = 'failed'
deployment_result['error'] = str(e)
if self.config.rollback_on_failure:
await self._rollback_deployment()
return deployment_result
def _generate_k8s_manifests(self, image_tag: str) -> List[Dict]:
"""Generate Kubernetes deployment manifests"""
manifests = []
# Deployment manifest
deployment = {
'apiVersion': 'apps/v1',
'kind': 'Deployment',
'metadata': {
'name': 'search-engine',
'namespace': self.config.namespace
},
'spec': {
'replicas': self.config.replicas,
'selector': {
'matchLabels': {
'app': 'search-engine'
}
},
'template': {
'metadata': {
'labels': {
'app': 'search-engine'
}
},
'spec': {
'containers': [{
'name': 'search-engine',
'image': image_tag,
'ports': [{
'containerPort': 8000
}],
'resources': {
'limits': self.config.resource_limits,
'requests': {
'cpu': '500m',
'memory': '1Gi'
}
},
'livenessProbe': {
'httpGet': {
'path': self.config.health_check_path,
'port': 8000
},
'initialDelaySeconds': 30,
'periodSeconds': 10
},
'readinessProbe': {
'httpGet': {
'path': self.config.health_check_path,
'port': 8000
},
'initialDelaySeconds': 5,
'periodSeconds': 5
}
}]
}
}
}
}
manifests.append(deployment)
# Service manifest
service = {
'apiVersion': 'v1',
'kind': 'Service',
'metadata': {
'name': 'search-engine-service',
'namespace': self.config.namespace
},
'spec': {
'selector': {
'app': 'search-engine'
},
'ports': [{
'port': 80,
'targetPort': 8000
}],
'type': 'ClusterIP'
}
}
manifests.append(service)
return manifests
async def _ensure_namespace(self):
"""Ensure the namespace exists"""
v1 = kubernetes.client.CoreV1Api(self.k8s_client)
try:
v1.read_namespace(name=self.config.namespace)
except kubernetes.client.exceptions.ApiException as e:
if e.status == 404:
# Create namespace
namespace = kubernetes.client.V1Namespace(
metadata=kubernetes.client.V1ObjectMeta(name=self.config.namespace)
)
v1.create_namespace(body=namespace)
print(f"Created namespace: {self.config.namespace}")
async def _apply_k8s_manifest(self, manifest: Dict):
"""Apply Kubernetes manifest"""
kind = manifest['kind']
if kind == 'Deployment':
apps_v1 = kubernetes.client.AppsV1Api(self.k8s_client)
apps_v1.create_namespaced_deployment(
namespace=self.config.namespace,
body=manifest
)
elif kind == 'Service':
v1 = kubernetes.client.CoreV1Api(self.k8s_client)
v1.create_namespaced_service(
namespace=self.config.namespace,
body=manifest
)
async def _wait_for_deployment_ready(self):
"""Wait for deployment to be ready"""
apps_v1 = kubernetes.client.AppsV1Api(self.k8s_client)
timeout = self.config.readiness_timeout
start_time = time.time()
while time.time() - start_time < timeout:
deployment = apps_v1.read_namespaced_deployment(
name='search-engine',
namespace=self.config.namespace
)
if (deployment.status.ready_replicas and
deployment.status.ready_replicas == self.config.replicas):
print("Deployment is ready")
return
await asyncio.sleep(5)
raise TimeoutError("Deployment did not become ready within timeout")
async def _rollback_deployment(self):
"""Rollback deployment on failure"""
print("Rolling back deployment...")
# Implementation would rollback to previous version
# This is a simplified version
pass
async def run_comprehensive_tests():
"""Run all test suites"""
test_config = TestConfig(
environment=TestEnvironment.INTEGRATION,
mock_llm=True,
test_timeout=300
)
test_suite = SearchEngineTestSuite(test_config)
try:
await test_suite.setup_test_environment()
print("Running unit tests...")
unit_results = await test_suite.run_unit_tests()
print(f"Unit tests: {unit_results['passed']} passed, {unit_results['failed']} failed")
print("Running integration tests...")
integration_results = await test_suite.run_integration_tests()
print(f"Integration tests: {integration_results['passed']} passed, {integration_results['failed']} failed")
print("Running performance tests...")
performance_results = await test_suite.run_performance_tests()
print(f"Performance: Latency {performance_results['search_latency_ms']:.2f}ms, "
f"Throughput {performance_results['throughput_qps']:.2f} QPS")
print("Running security tests...")
security_results = await test_suite.run_security_tests()
print(f"Security tests: {security_results['passed']} passed, {security_results['failed']} failed")
# Overall results
total_passed = unit_results['passed'] + integration_results['passed'] + security_results['passed']
total_failed = unit_results['failed'] + integration_results['failed'] + security_results['failed']
print(f"\nOverall test results: {total_passed} passed, {total_failed} failed")
print(f"Performance baseline met: {performance_results['passed_baseline']}")
return {
'unit': unit_results,
'integration': integration_results,
'performance': performance_results,
'security': security_results,
'overall_success': total_failed == 0 and performance_results['passed_baseline']
}
finally:
await test_suite.cleanup_test_environment()
async def deploy_search_engine():
"""Deploy search engine to production"""
deployment_config = DeploymentConfig(
strategy=DeploymentStrategy.ROLLING_UPDATE,
namespace="search-engine-prod",
replicas=3
)
deployment = ContainerizedDeployment(deployment_config)
await deployment.initialize()
# Build container image
image_tag = f"{deployment_config.container_registry}/search-engine:latest"
await deployment.build_container_image(".", image_tag)
# Deploy to Kubernetes
result = await deployment.deploy_to_kubernetes(image_tag)
print(f"Deployment result: {result}")
return result
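The _rollback_deployment method is deliberately left as a stub. One way to make it concrete, if the previously deployed image tag is recorded before each rollout, is to patch the Deployment back to that image using the Kubernetes client the class already holds. The sketch below is illustrative rather than part of the framework: previous_image_tag is a hypothetical attribute that deploy_to_kubernetes would have to set, and it assumes the container in the Deployment manifest is named search-engine.

async def _rollback_deployment(self):
    """Sketch: re-apply the previously recorded image tag."""
    previous_image = getattr(self, 'previous_image_tag', None)  # hypothetical attribute
    if not previous_image:
        print("No previous image recorded; skipping rollback")
        return
    apps_v1 = kubernetes.client.AppsV1Api(self.k8s_client)
    patch = {
        'spec': {
            'template': {
                'spec': {
                    'containers': [{
                        # Assumes the Deployment's container is named 'search-engine'
                        'name': 'search-engine',
                        'image': previous_image
                    }]
                }
            }
        }
    }
    # Strategic-merge patch back to the old image; the rolling update strategy
    # then replaces the pods gradually, and we reuse the existing readiness wait.
    apps_v1.patch_namespaced_deployment(
        name='search-engine',
        namespace=self.config.namespace,
        body=patch
    )
    await self._wait_for_deployment_ready()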
This testing and deployment framework covers the main concerns for shipping the agentic search engine: the test suite exercises unit-level functionality, integration, performance, and security, while the deployment layer builds a container image, applies the Kubernetes manifests, waits for the rollout to become ready, and exposes a rollback hook for failed deployments.
The modular design makes it straightforward to add test cases and deployment strategies as requirements evolve, and both entry points report their results explicitly, which makes them easy to chain into a release pipeline, as sketched below.
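Tying the two entry points together, a release script can run the full test suite and only proceed to deployment when every suite passes and the performance baseline is met. This is a minimal sketch that assumes run_comprehensive_tests and deploy_search_engine are in scope, for example defined in the same module as above:

async def release_pipeline():
    """Gate the production rollout on the comprehensive test results."""
    results = await run_comprehensive_tests()
    if not results['overall_success']:
        # Any failed suite or a missed performance baseline blocks the release
        print("Release blocked: tests failed or performance baseline not met")
        return None
    return await deploy_search_engine()

if __name__ == "__main__":
    asyncio.run(release_pipeline())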
Conclusion and Future Enhancements
The comprehensive agentic AI search engine implementation presented in this article provides a robust foundation for building intelligent search systems that can operate both as standalone applications and embedded components within larger software ecosystems. The modular architecture ensures flexibility and extensibility while maintaining high performance and security standards.
The system successfully combines traditional information retrieval techniques with modern AI capabilities, creating a search experience that goes beyond simple keyword matching to provide intelligent, contextual responses. The multi-layered approach to caching, security, and performance optimization ensures that the system can scale effectively in production environments.
Key achievements of this implementation include the flexible configuration management system that supports both deployment modes, the sophisticated query processing pipeline that understands user intent and adapts search strategies accordingly, and the comprehensive security framework that protects sensitive information while maintaining usability.
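As a concrete illustration of the first point, switching between the two deployment modes is, at the configuration level, a single field on the SearchConfig dataclass introduced at the start of this article. Only the fields shown there are used below; any further options would depend on how your build of the config is extended:

# Embedded inside a host application, backed by a remote OpenAI model
embedded_config = SearchConfig(
    deployment_mode=DeploymentMode.EMBEDDED,
    llm_provider=LLMProvider.OPENAI
)

# Standalone service using the local Ollama default
standalone_config = SearchConfig()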
The integration framework provides multiple pathways for embedding the search engine into existing applications, from simple REST API integration to deep SDK-based embedding with custom authentication and authorization. The performance optimization system ensures responsive search experiences even under high load through intelligent caching and resource management.
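For the REST pathway, a host application can treat a running instance as an ordinary HTTP service. The snippet below is a hedged sketch: the /search path, the bearer-token header, and port 8000 (the container port used in the Service manifest above) are placeholders that must match however the HTTP layer of your deployment is actually exposed.

import requests  # third-party HTTP client, assumed to be installed

def search_via_rest(query: str,
                    base_url: str = "http://localhost:8000",
                    api_key: str = "REPLACE_ME") -> dict:
    """Submit a query to a deployed search-engine instance over HTTP."""
    response = requests.post(
        f"{base_url}/search",  # hypothetical endpoint path
        json={"query": query},
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=30
    )
    response.raise_for_status()
    return response.json()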
Future enhancements could include advanced features such as multi-modal search capabilities that handle images, audio, and video content alongside text. The integration of knowledge graphs could provide richer semantic understanding and enable more sophisticated reasoning capabilities. Real-time learning and adaptation mechanisms could allow the system to improve its responses based on user feedback and usage patterns.
Additional areas for enhancement include federated search across multiple data sources, richer analytics and insight generation, and tighter integration with emerging techniques such as more advanced retrieval-augmented generation pipelines and large multimodal models. The architecture is designed to accommodate these enhancements without fundamental changes to the core implementation.
The testing and deployment framework makes it practical to deploy and maintain the system in production, with readiness checks on every rollout and a hook for automated rollback, alongside the monitoring capabilities described earlier. Together these provide the confidence needed to run intelligent search in business-critical applications.
This implementation shows that sophisticated agentic AI search systems can combine the intelligence and flexibility of modern AI with the reliability and performance expected of production software. The modular design and broad feature set make it a suitable foundation for a wide range of search applications, from enterprise knowledge management to consumer-facing intelligent assistants.