Introduction: The Power of Hybrid Knowledge Representation
Imagine you are exploring a vast library where every book is not only cataloged by its content but also connected to related concepts, authors, and themes through an intricate web of relationships. Now imagine that you can ask questions in natural language and receive answers that draw upon both the structured relationships between concepts and the semantic meaning of the text itself. This is precisely what becomes possible when we combine knowledge graphs with vector databases in Large Language Model applications. This is what I called "RAG^2" (RAG Squared) in previous blog articles.
Traditional document retrieval systems face a fundamental limitation. They either excel at understanding semantic similarity through embeddings or at representing explicit relationships through graphs, but rarely both. A pure vector database approach might find documents with similar content but miss important structural relationships. Conversely, a pure knowledge graph approach captures relationships beautifully but struggles with nuanced semantic queries. The hybrid approach we will explore in this article transcends these limitations by leveraging the strengths of both paradigms.
The system we will build allows users to upload documents, automatically extracts entities and relationships to populate a knowledge graph, creates semantic embeddings for retrieval, and establishes bidirectional links between graph nodes and vector embeddings. Users can then navigate through the knowledge graph to discover related concepts or ask natural language questions that leverage both structural and semantic information.
Understanding the Architecture: A Symphony of Components
Before diving into implementation, we need to understand how the different components work together. Our system consists of five primary layers that interact in sophisticated ways.
The first layer is the document ingestion pipeline. This component reads various document types from a source directory, extracts text content, and prepares it for processing. It handles different file formats including plain text, code files, PDFs, and structured documents.
The second layer is the knowledge extraction engine. This sophisticated component analyzes documents to identify entities such as people, organizations, concepts, and technical terms. It also detects relationships between these entities, creating the foundation for our knowledge graph. For code documents, it might extract function names, class definitions, and their dependencies. For text documents, it identifies key concepts and their associations.
The third layer is the knowledge graph database. We will use Neo4j, a powerful graph database that stores nodes representing entities and edges representing relationships. Each node contains properties such as entity type, name, and crucially, a reference to related vector embeddings. The graph structure enables traversal queries that reveal how concepts relate to each other.
The fourth layer is the vector database. We will use ChromaDB in this tutorial (alternatives such as Pinecone or Weaviate work similarly) to store embeddings generated from document chunks. Each embedding captures the semantic meaning of a text segment and includes metadata pointing back to the corresponding knowledge graph nodes. This bidirectional linking is the secret sauce that makes our system powerful.
The fifth layer is the LLM-powered query interface. This chatbot component accepts natural language questions, determines whether to query the knowledge graph, the vector database, or both, and synthesizes responses using a Large Language Model. It can follow graph relationships to provide context and retrieve semantically similar content to answer specific questions.
The architecture diagrams are provided in the addendum to this article.
Knowledge Graphs: Representing the Structure of Knowledge
A knowledge graph is fundamentally a network of entities and their relationships. Unlike traditional databases that store information in tables, knowledge graphs represent information as nodes connected by edges. Each node represents an entity, and each edge represents a relationship between entities.
Consider a simple example from a software documentation context. We might have nodes representing a Python class named "DataProcessor", a method named "transform_data", and a concept named "data_validation". The edges might indicate that "DataProcessor" contains "transform_data" and that "transform_data" implements "data_validation". This graph structure makes it trivial to answer questions like "What methods implement data validation?" or "What classes contain the transform_data method?"
The power of knowledge graphs becomes apparent when we consider traversal operations. If a user wants to understand all components related to data processing, we can start at the "DataProcessor" node and traverse outward, following edges to discover related methods, dependent classes, and associated concepts. This traversal capability provides context that pure keyword search cannot match.
In our implementation, we will use Neo4j because it provides a mature query language called Cypher that makes graph operations intuitive. Neo4j also offers excellent performance for graph traversal operations and supports rich property types on both nodes and edges.
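To make this concrete, here is the documentation example above sketched in Cypher. The labels and relationship types are illustrative only; the schema we actually build later in this article uses a single generic Entity label.

// Illustrative sketch of the DataProcessor example, not the final schema
CREATE (dp:Class {name: "DataProcessor"})
CREATE (td:Method {name: "transform_data"})
CREATE (dv:Concept {name: "data_validation"})
CREATE (dp)-[:CONTAINS]->(td)
CREATE (td)-[:IMPLEMENTS]->(dv)

// "What methods implement data validation?"
MATCH (m:Method)-[:IMPLEMENTS]->(c:Concept {name: "data_validation"})
RETURN m.name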
Vector Databases: Capturing Semantic Meaning
While knowledge graphs excel at representing explicit relationships, vector databases excel at capturing semantic similarity. When we convert text into embeddings using models like OpenAI's text-embedding-ada-002 or open-source alternatives like sentence-transformers, we create high-dimensional vectors that encode meaning. Texts with similar meanings produce vectors that are close together in this high-dimensional space.
The beauty of embeddings is that they capture nuances that keyword matching misses. A query about "error handling" will retrieve documents discussing "exception management" or "fault tolerance" even if those exact words do not appear in the query. The semantic similarity is encoded in the vector representation.
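We can observe this directly with one of the open-source models mentioned above. The following sketch uses sentence-transformers; the model name is just a common default, not a requirement.

from sentence_transformers import SentenceTransformer, util

# Any sentence embedding model behaves similarly; all-MiniLM-L6-v2 is an assumption.
model = SentenceTransformer("all-MiniLM-L6-v2")

query = "error handling"
documents = ["exception management in Python", "baking sourdough bread"]

query_vec = model.encode(query, convert_to_tensor=True)
doc_vecs = model.encode(documents, convert_to_tensor=True)

# Cosine similarity: the semantically related document scores far higher
# even though it shares no keywords with the query.
print(util.cos_sim(query_vec, doc_vecs))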
Vector databases like ChromaDB, Pinecone, or Weaviate are optimized for storing and querying these high-dimensional vectors efficiently. They use techniques like approximate nearest neighbor search to quickly find the most similar vectors to a query vector, even when dealing with millions of embeddings.
In our hybrid system, each chunk of text from our documents gets converted into an embedding and stored in the vector database along with metadata. This metadata includes the source document, the chunk's position, and critically, references to related knowledge graph nodes. This metadata enables us to bridge between the semantic and structural representations.
The Integration Strategy: Bridging Two Worlds
The key innovation in our system is the bidirectional linking between knowledge graph nodes and vector database entries. When we process a document, we simultaneously populate both the knowledge graph and the vector database, creating explicit links between them.
Here is how this works in practice. When we extract an entity like a class name from a code file, we create a node in the knowledge graph. We also create embeddings for the code surrounding that class definition and store those embeddings in the vector database. The knowledge graph node includes a property containing the IDs of related embeddings, and each embedding's metadata includes the ID of the corresponding knowledge graph node.
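Concretely, the paired records might look like the following hypothetical sketch; the exact field names used in our implementation appear later in the article.

# Illustrative only: the shapes of the two cross-linked records.
graph_node = {
    "name": "data_loader.DataProcessor",
    "type": "class",
    # IDs of vector-store entries whose text mentions this entity
    "embedding_ids": ["doc_42_chunk_3", "doc_42_chunk_4"],
}

vector_entry = {
    "id": "doc_42_chunk_3",
    "document": "class DataProcessor:\n    ...",
    "metadata": {
        "document_id": "doc_42",
        "chunk_index": 3,
        # Names of graph entities mentioned in this chunk
        "related_entities": "data_loader.DataProcessor",
    },
}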
This bidirectional linking enables powerful query patterns. A user might start by exploring the knowledge graph, clicking through related concepts. When they find an interesting node, they can immediately access the most relevant text chunks by following the links to the vector database. Conversely, a semantic search might return relevant embeddings, and the user can then explore the knowledge graph to understand how those concepts relate to the broader system.
The integration strategy also affects how we chunk documents. Rather than using fixed-size chunks, we can use semantically meaningful boundaries aligned with knowledge graph entities. For example, when processing code, we might chunk at the function or class level. This alignment ensures that embeddings correspond to coherent conceptual units that have natural representations in the knowledge graph.
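As a sketch of what entity-aligned chunking can look like for Python code, using only the standard library (this is not the chunker we build later):

import ast

def chunk_python_by_definition(source: str) -> list[str]:
    """Split Python source into one chunk per top-level class or function.

    A minimal sketch of entity-aligned chunking; real code would also keep
    module-level statements and handle async and nested definitions.
    """
    tree = ast.parse(source)
    lines = source.splitlines()
    chunks = []
    for node in tree.body:
        if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
            # end_lineno is populated by ast.parse on Python 3.8+
            chunks.append("\n".join(lines[node.lineno - 1:node.end_lineno]))
    return chunks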
Step-by-Step Implementation: Building the System
Now we will walk through the actual implementation of this hybrid system. We will build it incrementally, starting with basic components and progressively adding sophistication.
Setting Up the Development Environment
Before writing any code, we need to set up our development environment with the necessary dependencies. We will use Python as our primary language because it has excellent libraries for working with LLMs, embeddings, and both types of databases.
First, create a new Python project directory and set up a virtual environment. This isolation ensures that our dependencies do not conflict with other projects. Navigate to your project directory and execute the following commands in your terminal.
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
Now we need to install the required packages. Create a file named requirements.txt with the following content:
neo4j==5.14.0
chromadb==0.4.18
openai==1.3.0
langchain==0.1.0
sentence-transformers==2.2.2
pypdf2==3.0.1
python-docx==1.1.0
tiktoken==0.5.1
numpy==1.24.3
Install these dependencies using pip:
pip install -r requirements.txt
Each of these packages serves a specific purpose. The neo4j package provides the Python driver for connecting to our Neo4j graph database. ChromaDB gives us a lightweight vector database that runs locally without requiring separate infrastructure. The openai package allows us to use OpenAI's embedding and language models. LangChain provides useful abstractions for building LLM applications. Sentence-transformers offers open-source embedding models as an alternative to OpenAI. PyPDF2 and python-docx enable us to read PDF and Word documents respectively. Tiktoken helps us count tokens for managing context windows. NumPy supports numerical operations on vectors.
Designing the Core Data Structures
Before implementing the processing pipeline, we need to design the core data structures that will represent our documents, entities, and relationships. Clean architecture principles suggest that we should define clear abstractions that separate concerns.
Let us create a file named models.py that defines our core data classes:
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set
from enum import Enum
class EntityType(Enum):
"""Enumeration of entity types that can be extracted from documents."""
CONCEPT = "concept"
PERSON = "person"
ORGANIZATION = "organization"
FUNCTION = "function"
CLASS = "class"
MODULE = "module"
VARIABLE = "variable"
TECHNOLOGY = "technology"
class RelationType(Enum):
"""Enumeration of relationship types between entities."""
CONTAINS = "contains"
USES = "uses"
IMPLEMENTS = "implements"
DEPENDS_ON = "depends_on"
RELATED_TO = "related_to"
AUTHORED_BY = "authored_by"
MENTIONS = "mentions"
@dataclass
class Entity:
"""
Represents an entity extracted from a document.
Attributes:
name: The name or identifier of the entity
entity_type: The type of entity (concept, person, class, etc.)
properties: Additional properties associated with the entity
embedding_ids: List of vector database IDs for related embeddings
graph_id: The ID of this entity in the knowledge graph
"""
name: str
entity_type: EntityType
    properties: Dict[str, Any] = field(default_factory=dict)
embedding_ids: List[str] = field(default_factory=list)
graph_id: Optional[str] = None
def add_embedding_reference(self, embedding_id: str) -> None:
"""Add a reference to a related embedding in the vector database."""
if embedding_id not in self.embedding_ids:
self.embedding_ids.append(embedding_id)
@dataclass
class Relationship:
"""
Represents a relationship between two entities.
Attributes:
source: The source entity name
target: The target entity name
relation_type: The type of relationship
properties: Additional properties of the relationship
"""
source: str
target: str
relation_type: RelationType
    properties: Dict[str, Any] = field(default_factory=dict)
@dataclass
class DocumentChunk:
"""
Represents a chunk of text from a document with associated metadata.
Attributes:
content: The text content of the chunk
document_id: Identifier of the source document
chunk_index: Position of this chunk within the document
metadata: Additional metadata about the chunk
related_entities: Set of entity names related to this chunk
embedding_id: ID of this chunk's embedding in the vector database
"""
content: str
document_id: str
chunk_index: int
    metadata: Dict[str, Any] = field(default_factory=dict)
related_entities: Set[str] = field(default_factory=set)
embedding_id: Optional[str] = None
def add_entity_reference(self, entity_name: str) -> None:
"""Add a reference to an entity mentioned in this chunk."""
self.related_entities.add(entity_name)
@dataclass
class ProcessedDocument:
"""
Represents a fully processed document with extracted entities and chunks.
Attributes:
document_id: Unique identifier for the document
file_path: Path to the source file
entities: List of entities extracted from the document
relationships: List of relationships between entities
chunks: List of document chunks with embeddings
metadata: Additional document metadata
"""
document_id: str
file_path: str
entities: List[Entity] = field(default_factory=list)
relationships: List[Relationship] = field(default_factory=list)
chunks: List[DocumentChunk] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
These data classes provide a clean foundation for our system. The Entity class represents nodes in our knowledge graph and maintains references to related embeddings. The Relationship class represents edges in the graph. The DocumentChunk class represents text segments that will be embedded and stored in the vector database, maintaining references back to related entities. The ProcessedDocument class ties everything together, representing the complete output of our document processing pipeline.
Notice how we use type hints throughout. This improves code clarity and enables better IDE support and static type checking. The dataclass decorator automatically generates initialization methods and other useful functionality, reducing boilerplate code.
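Here is a brief, hypothetical example of how these classes fit together before any databases are involved:

from models import DocumentChunk, Entity, EntityType

# Hypothetical values; no database involved yet.
entity = Entity(name="data_loader.DataProcessor", entity_type=EntityType.CLASS)
chunk = DocumentChunk(
    content="class DataProcessor: ...",
    document_id="doc_42",
    chunk_index=0,
)

# Cross-link the two representations.
chunk.add_entity_reference(entity.name)
entity.add_embedding_reference("doc_42_chunk_0")

print(entity.embedding_ids)    # ['doc_42_chunk_0']
print(chunk.related_entities)  # {'data_loader.DataProcessor'}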
Implementing the Document Ingestion Pipeline
Now we will implement the document ingestion pipeline that reads files from a source directory and extracts their text content. We need to handle multiple file formats gracefully.
Create a file named document_loader.py:
import os
from pathlib import Path
from typing import List, Optional
import PyPDF2
import docx
class DocumentLoader:
"""
Handles loading and text extraction from various document formats.
This class provides a unified interface for reading different file types
and extracting their text content. It supports plain text, Python code,
PDF documents, and Word documents.
"""
def __init__(self, source_directory: str):
"""
Initialize the document loader.
Args:
source_directory: Path to the directory containing documents
"""
self.source_directory = Path(source_directory)
if not self.source_directory.exists():
raise ValueError(f"Source directory does not exist: {source_directory}")
def load_all_documents(self) -> List[tuple[str, str]]:
"""
Load all supported documents from the source directory.
Returns:
List of tuples containing (file_path, text_content)
"""
documents = []
# Walk through all files in the directory and subdirectories
for file_path in self.source_directory.rglob('*'):
if file_path.is_file():
content = self._load_single_document(file_path)
if content is not None:
documents.append((str(file_path), content))
return documents
def _load_single_document(self, file_path: Path) -> Optional[str]:
"""
Load a single document based on its file extension.
Args:
file_path: Path to the document file
Returns:
The text content of the document, or None if unsupported
"""
extension = file_path.suffix.lower()
try:
if extension in ['.txt', '.md', '.py', '.java', '.cpp', '.js', '.html', '.css']:
return self._load_text_file(file_path)
elif extension == '.pdf':
return self._load_pdf_file(file_path)
            elif extension == '.docx':
                # python-docx reads .docx only; legacy .doc files are not supported
return self._load_word_file(file_path)
else:
# Unsupported file type
return None
except Exception as e:
print(f"Error loading {file_path}: {str(e)}")
return None
def _load_text_file(self, file_path: Path) -> str:
"""
Load a plain text or code file.
Args:
file_path: Path to the text file
Returns:
The file content as a string
"""
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
return f.read()
def _load_pdf_file(self, file_path: Path) -> str:
"""
Extract text from a PDF file.
Args:
file_path: Path to the PDF file
Returns:
The extracted text content
"""
text_content = []
with open(file_path, 'rb') as f:
pdf_reader = PyPDF2.PdfReader(f)
# Extract text from each page
for page in pdf_reader.pages:
text_content.append(page.extract_text())
return '\n'.join(text_content)
def _load_word_file(self, file_path: Path) -> str:
"""
Extract text from a Word document.
Args:
file_path: Path to the Word file
Returns:
The extracted text content
"""
doc = docx.Document(file_path)
# Extract text from all paragraphs
text_content = [paragraph.text for paragraph in doc.paragraphs]
return '\n'.join(text_content)
This DocumentLoader class provides a clean abstraction for reading various file types. The load_all_documents method recursively scans the source directory and processes each supported file. The implementation uses private methods for each file type, making it easy to add support for additional formats in the future.
The error handling is important here. If a file cannot be read, we print an error message but continue processing other files. This robustness ensures that one corrupted file does not crash the entire ingestion pipeline.
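A minimal smoke test might look like this, assuming a ./documents directory exists:

from document_loader import DocumentLoader

loader = DocumentLoader("./documents")
for file_path, content in loader.load_all_documents():
    print(f"{file_path}: {len(content)} characters")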
Implementing Entity Extraction
The next critical component is entity extraction. This is where we analyze document content to identify entities and relationships. For code files, we will use abstract syntax tree parsing. For text documents, we will use natural language processing techniques.
Create a file named entity_extractor.py:
import ast
import re
from pathlib import Path
from typing import List, Set, Tuple
from models import Entity, Relationship, EntityType, RelationType
class CodeEntityExtractor:
"""
Extracts entities and relationships from source code files.
This extractor uses abstract syntax tree parsing to identify
classes, functions, and their relationships in Python code.
"""
def extract_from_python(self, code: str, file_path: str) -> Tuple[List[Entity], List[Relationship]]:
"""
Extract entities and relationships from Python code.
Args:
code: The Python source code as a string
file_path: Path to the source file
Returns:
Tuple of (entities, relationships)
"""
entities = []
relationships = []
try:
tree = ast.parse(code)
except SyntaxError:
# If code cannot be parsed, return empty results
return entities, relationships
# Extract module-level information
module_name = self._get_module_name(file_path)
module_entity = Entity(
name=module_name,
entity_type=EntityType.MODULE,
properties={'file_path': file_path}
)
entities.append(module_entity)
# Walk through the AST to find classes and functions
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
class_entity, class_relationships = self._extract_class(node, module_name)
entities.append(class_entity)
relationships.extend(class_relationships)
elif isinstance(node, ast.FunctionDef):
# Only process module-level functions
if self._is_module_level(node, tree):
func_entity, func_relationships = self._extract_function(node, module_name)
entities.append(func_entity)
relationships.extend(func_relationships)
return entities, relationships
def _get_module_name(self, file_path: str) -> str:
"""Extract module name from file path."""
        return Path(file_path).stem
def _is_module_level(self, node: ast.AST, tree: ast.AST) -> bool:
"""Check if a node is at module level (not nested in a class)."""
for parent_node in ast.walk(tree):
if isinstance(parent_node, ast.ClassDef):
if node in ast.walk(parent_node):
return False
return True
def _extract_class(self, node: ast.ClassDef, module_name: str) -> Tuple[Entity, List[Relationship]]:
"""
Extract entity and relationships for a class definition.
Args:
node: The AST node representing the class
module_name: Name of the containing module
Returns:
Tuple of (class_entity, relationships)
"""
class_name = node.name
relationships = []
# Create entity for the class
class_entity = Entity(
name=f"{module_name}.{class_name}",
entity_type=EntityType.CLASS,
properties={
'docstring': ast.get_docstring(node) or '',
'base_classes': [base.id for base in node.bases if isinstance(base, ast.Name)]
}
)
# Create relationship to module
relationships.append(Relationship(
source=module_name,
target=f"{module_name}.{class_name}",
relation_type=RelationType.CONTAINS
))
# Extract methods
for item in node.body:
if isinstance(item, ast.FunctionDef):
method_name = f"{module_name}.{class_name}.{item.name}"
relationships.append(Relationship(
source=f"{module_name}.{class_name}",
target=method_name,
relation_type=RelationType.CONTAINS
))
return class_entity, relationships
def _extract_function(self, node: ast.FunctionDef, module_name: str) -> Tuple[Entity, List[Relationship]]:
"""
Extract entity and relationships for a function definition.
Args:
node: The AST node representing the function
module_name: Name of the containing module
Returns:
Tuple of (function_entity, relationships)
"""
func_name = node.name
relationships = []
# Create entity for the function
func_entity = Entity(
name=f"{module_name}.{func_name}",
entity_type=EntityType.FUNCTION,
properties={
'docstring': ast.get_docstring(node) or '',
'parameters': [arg.arg for arg in node.args.args]
}
)
# Create relationship to module
relationships.append(Relationship(
source=module_name,
target=f"{module_name}.{func_name}",
relation_type=RelationType.CONTAINS
))
return func_entity, relationships
class TextEntityExtractor:
"""
Extracts entities and relationships from natural language text.
This extractor uses pattern matching and keyword extraction to identify
concepts and their relationships in text documents.
"""
def __init__(self):
"""Initialize the text entity extractor with common patterns."""
# Common technical terms that should be extracted as entities
self.technical_terms = {
'api', 'database', 'server', 'client', 'framework', 'library',
'algorithm', 'data structure', 'interface', 'protocol', 'service'
}
def extract_from_text(self, text: str, document_id: str) -> Tuple[List[Entity], List[Relationship]]:
"""
Extract entities and relationships from natural language text.
Args:
text: The text content to analyze
document_id: Identifier for the source document
Returns:
Tuple of (entities, relationships)
"""
entities = []
relationships = []
# Extract capitalized terms as potential entities
capitalized_terms = self._extract_capitalized_terms(text)
# Extract technical terms
technical_entities = self._extract_technical_terms(text)
# Combine and deduplicate entities
all_entity_names = set(capitalized_terms) | set(technical_entities)
for entity_name in all_entity_names:
entity = Entity(
name=entity_name,
entity_type=EntityType.CONCEPT,
properties={'document_id': document_id}
)
entities.append(entity)
        # Extract simple co-occurrence relationships
        entity_list = sorted(all_entity_names)
        for i, entity1 in enumerate(entity_list):
            for entity2 in entity_list[i + 1:]:
if self._entities_cooccur(text, entity1, entity2):
relationships.append(Relationship(
source=entity1,
target=entity2,
relation_type=RelationType.RELATED_TO
))
return entities, relationships
def _extract_capitalized_terms(self, text: str) -> Set[str]:
"""Extract capitalized terms that might be proper nouns or concepts."""
# Find sequences of capitalized words
pattern = r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b'
matches = re.findall(pattern, text)
        # Filter out very short terms
        filtered = {match for match in matches if len(match) > 3}
return filtered
def _extract_technical_terms(self, text: str) -> Set[str]:
"""Extract technical terms from the text."""
text_lower = text.lower()
found_terms = set()
for term in self.technical_terms:
if term in text_lower:
found_terms.add(term)
return found_terms
def _entities_cooccur(self, text: str, entity1: str, entity2: str, window: int = 100) -> bool:
"""
Check if two entities co-occur within a text window.
Args:
text: The text to search
entity1: First entity name
entity2: Second entity name
window: Character window size for co-occurrence
Returns:
True if entities co-occur within the window
"""
text_lower = text.lower()
entity1_lower = entity1.lower()
entity2_lower = entity2.lower()
# Find all positions of entity1
pos1_list = [m.start() for m in re.finditer(re.escape(entity1_lower), text_lower)]
pos2_list = [m.start() for m in re.finditer(re.escape(entity2_lower), text_lower)]
# Check if any positions are within the window
for pos1 in pos1_list:
for pos2 in pos2_list:
if abs(pos1 - pos2) <= window:
return True
return False
This entity extraction implementation provides separate extractors for code and text. The CodeEntityExtractor uses Python's ast module to parse code and extract classes, functions, and their relationships. This approach is much more reliable than regex-based parsing because it understands the actual structure of the code.
The TextEntityExtractor uses simpler heuristics. It identifies capitalized terms that might be proper nouns or important concepts, and it looks for known technical terms. It also detects co-occurrence relationships, where entities that appear near each other in the text are likely related.
These extractors are intentionally simple to keep the tutorial focused. In a production system, you would likely use more sophisticated natural language processing techniques, possibly including named entity recognition models or even LLM-based extraction.
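To see the code extractor in action, we can feed it a small inline snippet:

from entity_extractor import CodeEntityExtractor

sample = '''
class Greeter:
    """Says hello."""

    def greet(self, name):
        return f"Hello, {name}"
'''

extractor = CodeEntityExtractor()
entities, relationships = extractor.extract_from_python(sample, "greeter.py")
for entity in entities:
    print(entity.entity_type.value, entity.name)   # module greeter, class greeter.Greeter
for rel in relationships:
    print(f"{rel.source} -[{rel.relation_type.value}]-> {rel.target}")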
Implementing Text Chunking
Before we can create embeddings, we need to chunk our documents into appropriately sized segments. The chunking strategy significantly impacts retrieval quality.
Create a file named text_chunker.py:
from typing import List
import tiktoken
from models import DocumentChunk
class TextChunker:
"""
Chunks documents into segments suitable for embedding.
This chunker uses token-based splitting with overlap to ensure
that context is preserved across chunk boundaries.
"""
def __init__(self, chunk_size: int = 512, overlap: int = 50):
"""
Initialize the text chunker.
Args:
chunk_size: Maximum number of tokens per chunk
overlap: Number of tokens to overlap between chunks
"""
self.chunk_size = chunk_size
self.overlap = overlap
self.encoding = tiktoken.get_encoding("cl100k_base")
def chunk_document(self, text: str, document_id: str, metadata: dict = None) -> List[DocumentChunk]:
"""
Split a document into chunks with overlap.
Args:
text: The text content to chunk
document_id: Identifier for the source document
metadata: Optional metadata to attach to chunks
Returns:
List of DocumentChunk objects
"""
if metadata is None:
metadata = {}
# Tokenize the entire text
tokens = self.encoding.encode(text)
chunks = []
chunk_index = 0
start_pos = 0
while start_pos < len(tokens):
# Calculate end position for this chunk
end_pos = min(start_pos + self.chunk_size, len(tokens))
# Extract tokens for this chunk
chunk_tokens = tokens[start_pos:end_pos]
# Decode back to text
chunk_text = self.encoding.decode(chunk_tokens)
# Create DocumentChunk object
chunk = DocumentChunk(
content=chunk_text,
document_id=document_id,
chunk_index=chunk_index,
metadata={
**metadata,
'start_token': start_pos,
'end_token': end_pos,
'token_count': len(chunk_tokens)
}
)
chunks.append(chunk)
# Move start position forward, accounting for overlap
start_pos += self.chunk_size - self.overlap
chunk_index += 1
return chunks
def chunk_by_sections(self, text: str, document_id: str, section_delimiter: str = "\n\n") -> List[DocumentChunk]:
"""
Chunk a document by sections rather than fixed token counts.
This method is useful for documents with clear section boundaries
like markdown files or structured text.
Args:
text: The text content to chunk
document_id: Identifier for the source document
section_delimiter: String that delimits sections
Returns:
List of DocumentChunk objects
"""
sections = text.split(section_delimiter)
chunks = []
for index, section in enumerate(sections):
# Skip empty sections
if not section.strip():
continue
# Check if section is too large and needs further splitting
tokens = self.encoding.encode(section)
if len(tokens) <= self.chunk_size:
# Section fits in one chunk
chunk = DocumentChunk(
content=section,
document_id=document_id,
chunk_index=index,
metadata={'section_index': index}
)
chunks.append(chunk)
else:
# Section needs to be split further
sub_chunks = self.chunk_document(section, document_id, {'section_index': index})
chunks.extend(sub_chunks)
return chunks
The TextChunker class provides two chunking strategies. The chunk_document method uses fixed-size token windows with overlap. The overlap is crucial because it ensures that information spanning chunk boundaries is not lost. If a key concept is split across two chunks, the overlap ensures that at least one chunk contains the complete context.
The chunk_by_sections method respects natural document boundaries like paragraphs or sections. This is particularly useful for structured documents where breaking in the middle of a section would lose important context. If a section is too large, it falls back to token-based chunking.
Using token-based chunking rather than character-based chunking is important because embedding models have token limits, and different texts can have very different token-to-character ratios depending on the language and content.
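A quick check confirms that overlapping windows are produced as expected:

from text_chunker import TextChunker

chunker = TextChunker(chunk_size=512, overlap=50)
long_text = "Knowledge graphs and embeddings are complementary. " * 400
chunks = chunker.chunk_document(long_text, document_id="doc_demo")

for chunk in chunks:
    print(chunk.chunk_index, chunk.metadata["start_token"], chunk.metadata["token_count"])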
Implementing the Vector Database Interface
Now we will implement the interface to our vector database. We will use ChromaDB for this tutorial because it is lightweight and does not require separate infrastructure.
Create a file named vector_store.py:
import chromadb
from chromadb.config import Settings
from typing import List, Dict, Optional
import openai
from models import DocumentChunk
class VectorStore:
"""
Manages storage and retrieval of document embeddings.
This class provides a clean interface to the vector database,
handling embedding generation and similarity search.
"""
def __init__(self, collection_name: str = "documents", persist_directory: str = "./chroma_db"):
"""
Initialize the vector store.
Args:
collection_name: Name of the collection to use
persist_directory: Directory where the database will be persisted
"""
        self.client = chromadb.PersistentClient(
            path=persist_directory,
            settings=Settings(anonymized_telemetry=False)
        )
# Get or create collection
self.collection = self.client.get_or_create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"}
)
# Initialize OpenAI client for embeddings
self.openai_client = openai.OpenAI()
def add_chunks(self, chunks: List[DocumentChunk]) -> List[str]:
"""
Add document chunks to the vector store.
Args:
chunks: List of DocumentChunk objects to add
Returns:
List of embedding IDs assigned to the chunks
"""
if not chunks:
return []
# Generate embeddings for all chunks
texts = [chunk.content for chunk in chunks]
embeddings = self._generate_embeddings(texts)
# Prepare metadata for each chunk
metadatas = []
for chunk in chunks:
metadata = {
'document_id': chunk.document_id,
'chunk_index': chunk.chunk_index,
**chunk.metadata
}
# Add entity references to metadata
if chunk.related_entities:
metadata['related_entities'] = ','.join(chunk.related_entities)
metadatas.append(metadata)
# Generate IDs for the chunks
ids = [f"{chunk.document_id}_chunk_{chunk.chunk_index}" for chunk in chunks]
# Add to collection
self.collection.add(
embeddings=embeddings,
documents=texts,
metadatas=metadatas,
ids=ids
)
# Update chunk objects with their IDs
for chunk, chunk_id in zip(chunks, ids):
chunk.embedding_id = chunk_id
return ids
def _generate_embeddings(self, texts: List[str]) -> List[List[float]]:
"""
Generate embeddings for a list of texts using OpenAI.
Args:
texts: List of text strings to embed
Returns:
List of embedding vectors
"""
response = self.openai_client.embeddings.create(
model="text-embedding-ada-002",
input=texts
)
embeddings = [item.embedding for item in response.data]
return embeddings
def search(self, query: str, n_results: int = 5, filter_dict: Optional[Dict] = None) -> List[Dict]:
"""
Search for similar chunks using a text query.
Args:
query: The search query text
n_results: Number of results to return
filter_dict: Optional metadata filters
Returns:
List of result dictionaries containing content and metadata
"""
# Generate embedding for the query
query_embedding = self._generate_embeddings([query])[0]
# Perform similarity search
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=n_results,
where=filter_dict
)
# Format results
formatted_results = []
for i in range(len(results['ids'][0])):
result = {
'id': results['ids'][0][i],
'content': results['documents'][0][i],
'metadata': results['metadatas'][0][i],
                'distance': results['distances'][0][i] if results.get('distances') else None
}
formatted_results.append(result)
return formatted_results
def get_by_ids(self, ids: List[str]) -> List[Dict]:
"""
Retrieve chunks by their IDs.
Args:
ids: List of chunk IDs to retrieve
Returns:
List of result dictionaries
"""
results = self.collection.get(ids=ids)
formatted_results = []
for i in range(len(results['ids'])):
result = {
'id': results['ids'][i],
'content': results['documents'][i],
'metadata': results['metadatas'][i]
}
formatted_results.append(result)
return formatted_results
The VectorStore class encapsulates all interactions with ChromaDB. The add_chunks method takes a list of DocumentChunk objects, generates embeddings using OpenAI's API, and stores them in the collection along with metadata. The metadata includes the document ID, chunk index, and importantly, references to related entities from the knowledge graph.
The search method performs semantic similarity search. It generates an embedding for the query text and finds the most similar chunks in the database. The optional filter_dict parameter allows filtering results based on metadata, which is useful for restricting searches to specific documents or entity types.
The get_by_ids method enables direct retrieval of chunks when we have their IDs. This is crucial for the bidirectional linking with the knowledge graph. When a user explores a node in the graph, we can use this method to retrieve the associated text chunks.
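A short usage sketch follows; it assumes the OPENAI_API_KEY environment variable is set, since embeddings are generated through the OpenAI API:

from vector_store import VectorStore
from models import DocumentChunk

store = VectorStore(collection_name="demo", persist_directory="./chroma_db")
chunk = DocumentChunk(
    content="The DataProcessor class validates incoming records.",
    document_id="doc_demo",
    chunk_index=0,
)
store.add_chunks([chunk])

for result in store.search("how is input data validated?", n_results=1):
    print(result["id"], result["content"])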
Implementing the Knowledge Graph Interface
Now we will implement the interface to Neo4j for managing our knowledge graph.
Create a file named knowledge_graph.py:
from neo4j import GraphDatabase
from typing import List, Dict, Optional
from models import Entity, Relationship
class KnowledgeGraph:
"""
Manages the knowledge graph stored in Neo4j.
This class provides methods for adding entities and relationships,
querying the graph, and maintaining bidirectional links with the vector store.
"""
def __init__(self, uri: str = "bolt://localhost:7687", user: str = "neo4j", password: str = "password"):
"""
Initialize the knowledge graph connection.
Args:
uri: Neo4j connection URI
user: Database username
password: Database password
"""
self.driver = GraphDatabase.driver(uri, auth=(user, password))
def close(self):
"""Close the database connection."""
self.driver.close()
def add_entity(self, entity: Entity) -> str:
"""
Add an entity to the knowledge graph.
Args:
entity: The Entity object to add
Returns:
The graph ID of the created node
"""
with self.driver.session() as session:
result = session.execute_write(self._create_entity_node, entity)
entity.graph_id = result
return result
@staticmethod
def _create_entity_node(tx, entity: Entity) -> str:
"""
Transaction function to create an entity node.
Args:
tx: Neo4j transaction
entity: The Entity object to create
Returns:
The ID of the created node
"""
query = """
CREATE (e:Entity {
name: $name,
type: $type,
properties: $properties,
embedding_ids: $embedding_ids
})
RETURN id(e) as node_id
"""
result = tx.run(
query,
name=entity.name,
type=entity.entity_type.value,
properties=str(entity.properties),
embedding_ids=entity.embedding_ids
)
record = result.single()
return str(record["node_id"])
def add_relationship(self, relationship: Relationship) -> None:
"""
Add a relationship between two entities.
Args:
relationship: The Relationship object to add
"""
with self.driver.session() as session:
session.execute_write(self._create_relationship, relationship)
@staticmethod
def _create_relationship(tx, relationship: Relationship) -> None:
"""
Transaction function to create a relationship.
Args:
tx: Neo4j transaction
relationship: The Relationship object to create
"""
query = """
MATCH (source:Entity {name: $source_name})
MATCH (target:Entity {name: $target_name})
CREATE (source)-[r:RELATES {
type: $rel_type,
properties: $properties
}]->(target)
"""
tx.run(
query,
source_name=relationship.source,
target_name=relationship.target,
rel_type=relationship.relation_type.value,
properties=str(relationship.properties)
)
def update_entity_embeddings(self, entity_name: str, embedding_ids: List[str]) -> None:
"""
Update the embedding references for an entity.
Args:
entity_name: Name of the entity to update
embedding_ids: List of embedding IDs to associate with the entity
"""
with self.driver.session() as session:
session.execute_write(self._update_embeddings, entity_name, embedding_ids)
@staticmethod
def _update_embeddings(tx, entity_name: str, embedding_ids: List[str]) -> None:
"""
Transaction function to update embedding references.
Args:
tx: Neo4j transaction
entity_name: Name of the entity
embedding_ids: List of embedding IDs
"""
query = """
MATCH (e:Entity {name: $name})
SET e.embedding_ids = $embedding_ids
"""
tx.run(query, name=entity_name, embedding_ids=embedding_ids)
def get_entity(self, entity_name: str) -> Optional[Dict]:
"""
Retrieve an entity by name.
Args:
entity_name: Name of the entity to retrieve
Returns:
Dictionary containing entity data, or None if not found
"""
with self.driver.session() as session:
result = session.execute_read(self._find_entity, entity_name)
return result
@staticmethod
def _find_entity(tx, entity_name: str) -> Optional[Dict]:
"""
Transaction function to find an entity.
Args:
tx: Neo4j transaction
entity_name: Name of the entity to find
Returns:
Dictionary with entity data or None
"""
query = """
MATCH (e:Entity {name: $name})
RETURN e.name as name, e.type as type,
e.properties as properties, e.embedding_ids as embedding_ids
"""
result = tx.run(query, name=entity_name)
record = result.single()
if record:
return {
'name': record['name'],
'type': record['type'],
'properties': record['properties'],
'embedding_ids': record['embedding_ids']
}
return None
def get_related_entities(self, entity_name: str, max_depth: int = 2) -> List[Dict]:
"""
Get entities related to a given entity within a certain depth.
Args:
entity_name: Name of the starting entity
max_depth: Maximum relationship depth to traverse
Returns:
List of related entity dictionaries
"""
with self.driver.session() as session:
result = session.execute_read(self._find_related, entity_name, max_depth)
return result
@staticmethod
def _find_related(tx, entity_name: str, max_depth: int) -> List[Dict]:
"""
Transaction function to find related entities.
Args:
tx: Neo4j transaction
entity_name: Name of the starting entity
max_depth: Maximum depth to traverse
Returns:
List of related entity dictionaries
"""
query = """
MATCH path = (start:Entity {name: $name})-[*1..%d]-(related:Entity)
RETURN DISTINCT related.name as name, related.type as type,
related.embedding_ids as embedding_ids,
length(path) as distance
ORDER BY distance
""" % max_depth
result = tx.run(query, name=entity_name)
related_entities = []
for record in result:
related_entities.append({
'name': record['name'],
'type': record['type'],
'embedding_ids': record['embedding_ids'],
'distance': record['distance']
})
return related_entities
def search_entities_by_type(self, entity_type: str) -> List[Dict]:
"""
Search for all entities of a specific type.
Args:
entity_type: The type of entities to find
Returns:
List of entity dictionaries
"""
with self.driver.session() as session:
result = session.execute_read(self._find_by_type, entity_type)
return result
@staticmethod
def _find_by_type(tx, entity_type: str) -> List[Dict]:
"""
Transaction function to find entities by type.
Args:
tx: Neo4j transaction
entity_type: Type to search for
Returns:
List of entity dictionaries
"""
query = """
MATCH (e:Entity {type: $type})
RETURN e.name as name, e.type as type,
e.properties as properties, e.embedding_ids as embedding_ids
"""
result = tx.run(query, type=entity_type)
entities = []
for record in result:
entities.append({
'name': record['name'],
'type': record['type'],
'properties': record['properties'],
'embedding_ids': record['embedding_ids']
})
return entities
The KnowledgeGraph class provides a clean interface to Neo4j. It handles entity creation, relationship creation, and various query operations. The implementation uses Neo4j's transaction functions to ensure data consistency.
The add_entity method creates a node in the graph (using MERGE so that repeated mentions of the same entity across documents collapse into a single node) with properties including the entity name, type, and, crucially, a list of embedding IDs that link to the vector database. The update_entity_embeddings method lets us attach these links after the embeddings have been created.
The get_related_entities method is particularly powerful. It uses Cypher's path matching capabilities to find all entities connected to a given entity within a specified depth. This enables exploration of the knowledge graph, discovering how concepts relate to each other.
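The following sketch assumes a Neo4j instance is running locally with the credentials shown (for example via Docker: docker run -p 7474:7474 -p 7687:7687 -e NEO4J_AUTH=neo4j/password neo4j:5):

from knowledge_graph import KnowledgeGraph
from models import Entity, Relationship, EntityType, RelationType

graph = KnowledgeGraph(uri="bolt://localhost:7687", user="neo4j", password="password")
graph.add_entity(Entity(name="demo", entity_type=EntityType.MODULE))
graph.add_entity(Entity(name="demo.DataProcessor", entity_type=EntityType.CLASS))
graph.add_relationship(Relationship(
    source="demo",
    target="demo.DataProcessor",
    relation_type=RelationType.CONTAINS,
))

print(graph.get_related_entities("demo", max_depth=1))
graph.close()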
Implementing the Document Processing Pipeline
Now we will tie everything together in a document processing pipeline that coordinates all the components we have built.
Create a file named document_processor.py:
from typing import List, Dict
from pathlib import Path
from models import ProcessedDocument, DocumentChunk
from document_loader import DocumentLoader
from entity_extractor import CodeEntityExtractor, TextEntityExtractor
from text_chunker import TextChunker
from vector_store import VectorStore
from knowledge_graph import KnowledgeGraph
class DocumentProcessor:
"""
Orchestrates the document processing pipeline.
This class coordinates loading documents, extracting entities,
creating embeddings, and populating both the knowledge graph
and vector database with bidirectional links.
"""
def __init__(self,
source_directory: str,
vector_store: VectorStore,
knowledge_graph: KnowledgeGraph):
"""
Initialize the document processor.
Args:
source_directory: Directory containing documents to process
vector_store: VectorStore instance for embeddings
knowledge_graph: KnowledgeGraph instance for entities
"""
self.loader = DocumentLoader(source_directory)
self.code_extractor = CodeEntityExtractor()
self.text_extractor = TextEntityExtractor()
self.chunker = TextChunker(chunk_size=512, overlap=50)
self.vector_store = vector_store
self.knowledge_graph = knowledge_graph
def process_all_documents(self) -> List[ProcessedDocument]:
"""
Process all documents in the source directory.
Returns:
List of ProcessedDocument objects
"""
# Load all documents
documents = self.loader.load_all_documents()
print(f"Loaded {len(documents)} documents")
processed_docs = []
for file_path, content in documents:
print(f"Processing: {file_path}")
processed_doc = self.process_single_document(file_path, content)
processed_docs.append(processed_doc)
return processed_docs
def process_single_document(self, file_path: str, content: str) -> ProcessedDocument:
"""
Process a single document through the entire pipeline.
Args:
file_path: Path to the document file
content: Text content of the document
Returns:
ProcessedDocument object with all extracted information
"""
# Generate document ID from file path
document_id = self._generate_document_id(file_path)
# Extract entities and relationships
entities, relationships = self._extract_entities(file_path, content)
# Create chunks
chunks = self.chunker.chunk_document(
content,
document_id,
metadata={'file_path': file_path}
)
# Link chunks to entities
self._link_chunks_to_entities(chunks, entities, content)
# Add chunks to vector store
embedding_ids = self.vector_store.add_chunks(chunks)
# Add entities to knowledge graph
for entity in entities:
self.knowledge_graph.add_entity(entity)
# Add relationships to knowledge graph
for relationship in relationships:
self.knowledge_graph.add_relationship(relationship)
# Update entities with embedding references
self._update_entity_embeddings(entities, chunks)
# Create ProcessedDocument object
processed_doc = ProcessedDocument(
document_id=document_id,
file_path=file_path,
entities=entities,
relationships=relationships,
chunks=chunks,
metadata={'content_length': len(content)}
)
return processed_doc
def _generate_document_id(self, file_path: str) -> str:
"""Generate a unique document ID from the file path."""
path_obj = Path(file_path)
# Use relative path components to create ID
return str(path_obj).replace('/', '_').replace('\\', '_')
def _extract_entities(self, file_path: str, content: str):
"""
Extract entities and relationships based on file type.
Args:
file_path: Path to the file
content: File content
Returns:
Tuple of (entities, relationships)
"""
# Determine file type
if file_path.endswith('.py'):
return self.code_extractor.extract_from_python(content, file_path)
else:
document_id = self._generate_document_id(file_path)
return self.text_extractor.extract_from_text(content, document_id)
def _link_chunks_to_entities(self, chunks: List[DocumentChunk], entities: List, content: str) -> None:
"""
Link document chunks to the entities they mention.
Args:
chunks: List of document chunks
entities: List of extracted entities
content: Full document content
"""
# For each chunk, find which entities it mentions
for chunk in chunks:
chunk_lower = chunk.content.lower()
for entity in entities:
entity_name_lower = entity.name.lower()
# Check if entity name appears in chunk
if entity_name_lower in chunk_lower:
chunk.add_entity_reference(entity.name)
def _update_entity_embeddings(self, entities: List, chunks: List[DocumentChunk]) -> None:
"""
Update entities with references to related embeddings.
Args:
entities: List of entities to update
chunks: List of document chunks with embedding IDs
"""
# Create mapping from entity names to embedding IDs
for entity in entities:
related_embedding_ids = []
for chunk in chunks:
if entity.name in chunk.related_entities:
if chunk.embedding_id:
related_embedding_ids.append(chunk.embedding_id)
if related_embedding_ids:
                for embedding_id in related_embedding_ids:
                    entity.add_embedding_reference(embedding_id)
self.knowledge_graph.update_entity_embeddings(
entity.name,
related_embedding_ids
)
The DocumentProcessor class orchestrates the entire pipeline. The process_all_documents method loads all documents from the source directory and processes each one. The process_single_document method handles a single document through all stages: entity extraction, chunking, embedding generation, and storage in both databases.
The critical step is the bidirectional linking. The _link_chunks_to_entities method identifies which entities are mentioned in each chunk. Then _update_entity_embeddings creates the reverse links, updating entity nodes in the knowledge graph with references to the embedding IDs of chunks that mention them.
This bidirectional linking is what makes the hybrid system powerful. Users can start from either entry point and navigate to the other.
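Wiring the pipeline together takes only a few lines (connection details are placeholders):

from vector_store import VectorStore
from knowledge_graph import KnowledgeGraph
from document_processor import DocumentProcessor

processor = DocumentProcessor(
    source_directory="./documents",
    vector_store=VectorStore(),
    knowledge_graph=KnowledgeGraph(password="your_password_here"),
)
processed = processor.process_all_documents()
print(f"Processed {len(processed)} documents")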
Implementing the Query Interface
Now we will implement the query interface that allows users to ask questions and navigate the hybrid knowledge system.
Create a file named query_interface.py:
from typing import List, Dict, Optional
from vector_store import VectorStore
from knowledge_graph import KnowledgeGraph
import openai
class QueryInterface:
"""
Provides a unified interface for querying the hybrid knowledge system.
This class handles both semantic search through the vector database
and structural queries through the knowledge graph, combining results
to provide comprehensive answers.
"""
def __init__(self, vector_store: VectorStore, knowledge_graph: KnowledgeGraph):
"""
Initialize the query interface.
Args:
vector_store: VectorStore instance for semantic search
knowledge_graph: KnowledgeGraph instance for structural queries
"""
self.vector_store = vector_store
self.knowledge_graph = knowledge_graph
self.openai_client = openai.OpenAI()
def semantic_search(self, query: str, n_results: int = 5) -> List[Dict]:
"""
Perform semantic search using the vector database.
Args:
query: The search query
n_results: Number of results to return
Returns:
List of search results with content and metadata
"""
results = self.vector_store.search(query, n_results)
# Enhance results with knowledge graph information
for result in results:
# Get related entities from metadata
if 'related_entities' in result['metadata']:
entity_names = result['metadata']['related_entities'].split(',')
result['entities'] = entity_names
else:
result['entities'] = []
return results
def explore_entity(self, entity_name: str) -> Dict:
"""
Explore an entity and its related information.
Args:
entity_name: Name of the entity to explore
Returns:
Dictionary containing entity information and related content
"""
# Get entity from knowledge graph
entity = self.knowledge_graph.get_entity(entity_name)
if not entity:
return {'error': f'Entity {entity_name} not found'}
# Get related entities
related = self.knowledge_graph.get_related_entities(entity_name, max_depth=2)
# Get associated text chunks from vector store
embedding_ids = entity.get('embedding_ids', [])
chunks = []
if embedding_ids:
chunks = self.vector_store.get_by_ids(embedding_ids)
return {
'entity': entity,
'related_entities': related,
'text_chunks': chunks
}
def hybrid_search(self, query: str, n_results: int = 5) -> Dict:
"""
Perform hybrid search combining semantic and structural information.
Args:
query: The search query
n_results: Number of results to return
Returns:
Dictionary with semantic results and related graph information
"""
# First, perform semantic search
semantic_results = self.semantic_search(query, n_results)
# Collect all entities mentioned in results
all_entities = set()
for result in semantic_results:
all_entities.update(result.get('entities', []))
# Get graph information for these entities
graph_context = {}
for entity_name in all_entities:
related = self.knowledge_graph.get_related_entities(entity_name, max_depth=1)
graph_context[entity_name] = related
return {
'semantic_results': semantic_results,
'graph_context': graph_context
}
def answer_question(self, question: str, n_context: int = 5) -> str:
"""
Answer a question using the hybrid knowledge system and LLM.
Args:
question: The question to answer
n_context: Number of context chunks to retrieve
Returns:
The generated answer
"""
# Perform hybrid search to get context
search_results = self.hybrid_search(question, n_context)
# Build context from search results
context_parts = []
# Add semantic search results
for result in search_results['semantic_results']:
context_parts.append(f"Content: {result['content']}")
if result.get('entities'):
context_parts.append(f"Related entities: {', '.join(result['entities'])}")
# Add graph context
for entity, related in search_results['graph_context'].items():
if related:
related_names = [r['name'] for r in related[:3]]
context_parts.append(
f"Entity '{entity}' is related to: {', '.join(related_names)}"
)
context = '\n\n'.join(context_parts)
# Generate answer using LLM
messages = [
{
'role': 'system',
'content': 'You are a helpful assistant that answers questions based on the provided context. Use both the text content and the entity relationships to provide comprehensive answers.'
},
{
'role': 'user',
'content': f"Context:\n{context}\n\nQuestion: {question}\n\nPlease provide a detailed answer based on the context above."
}
]
response = self.openai_client.chat.completions.create(
model='gpt-4',
messages=messages,
temperature=0.7,
max_tokens=500
)
return response.choices[0].message.content
The QueryInterface class provides multiple ways to interact with the hybrid system. The semantic_search method performs pure vector similarity search. The explore_entity method starts from a knowledge graph node and retrieves both related entities and associated text chunks. The hybrid_search method combines both approaches, performing semantic search and then enriching results with graph context.
The answer_question method demonstrates the full power of the hybrid approach. It performs hybrid search to gather context, then uses an LLM to synthesize an answer. The LLM receives both the semantically relevant text chunks and information about entity relationships, enabling it to provide more comprehensive and contextually aware answers than either approach alone could provide.
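Once both stores are populated, asking a question is a two-liner:

from knowledge_graph import KnowledgeGraph
from query_interface import QueryInterface
from vector_store import VectorStore

interface = QueryInterface(VectorStore(), KnowledgeGraph(password="your_password_here"))
print(interface.answer_question("Which classes handle data validation?"))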
Creating the Main Application
Finally, we will create the main application that ties everything together and provides a command-line interface for users.
Create a file named main.py:
import os
import sys
from vector_store import VectorStore
from knowledge_graph import KnowledgeGraph
from document_processor import DocumentProcessor
from query_interface import QueryInterface
class DocumentChatbot:
"""
Main application class for the document chatbot.
This class provides a command-line interface for processing documents
and querying the hybrid knowledge system.
"""
def __init__(self, source_directory: str):
"""
Initialize the chatbot application.
Args:
source_directory: Directory containing documents to process
"""
self.source_directory = source_directory
# Initialize components
print("Initializing vector store...")
self.vector_store = VectorStore(
collection_name="documents",
persist_directory="./chroma_db"
)
print("Initializing knowledge graph...")
self.knowledge_graph = KnowledgeGraph(
uri="bolt://localhost:7687",
user="neo4j",
password="your_password_here"
)
print("Initializing document processor...")
self.processor = DocumentProcessor(
source_directory=source_directory,
vector_store=self.vector_store,
knowledge_graph=self.knowledge_graph
)
print("Initializing query interface...")
self.query_interface = QueryInterface(
vector_store=self.vector_store,
knowledge_graph=self.knowledge_graph
)
def process_documents(self):
"""Process all documents in the source directory."""
print("\nProcessing documents...")
processed_docs = self.processor.process_all_documents()
print(f"\nSuccessfully processed {len(processed_docs)} documents")
# Print summary statistics
total_entities = sum(len(doc.entities) for doc in processed_docs)
total_relationships = sum(len(doc.relationships) for doc in processed_docs)
total_chunks = sum(len(doc.chunks) for doc in processed_docs)
print(f"Total entities extracted: {total_entities}")
print(f"Total relationships extracted: {total_relationships}")
print(f"Total text chunks created: {total_chunks}")
def interactive_query_loop(self):
"""Run an interactive query loop for users to ask questions."""
print("\n" + "="*60)
print("Document Chatbot - Interactive Query Mode")
print("="*60)
print("\nCommands:")
print(" ask <question> - Ask a question about the documents")
print(" search <query> - Perform semantic search")
print(" explore <entity> - Explore an entity in the knowledge graph")
print(" entities <type> - List entities of a specific type")
print(" quit - Exit the application")
print("\n" + "="*60 + "\n")
while True:
try:
user_input = input("\nEnter command: ").strip()
if not user_input:
continue
if user_input.lower() == 'quit':
print("Goodbye!")
break
# Parse command
parts = user_input.split(maxsplit=1)
command = parts[0].lower()
if command == 'ask' and len(parts) > 1:
question = parts[1]
print("\nGenerating answer...")
answer = self.query_interface.answer_question(question)
print(f"\nAnswer:\n{answer}")
elif command == 'search' and len(parts) > 1:
query = parts[1]
print("\nSearching...")
results = self.query_interface.semantic_search(query, n_results=3)
print(f"\nFound {len(results)} results:\n")
for i, result in enumerate(results, 1):
print(f"Result {i}:")
print(f"Content: {result['content'][:200]}...")
if result.get('entities'):
print(f"Related entities: {', '.join(result['entities'])}")
print()
elif command == 'explore' and len(parts) > 1:
entity_name = parts[1]
print(f"\nExploring entity: {entity_name}")
info = self.query_interface.explore_entity(entity_name)
if 'error' in info:
print(info['error'])
else:
print(f"\nEntity type: {info['entity']['type']}")
print(f"\nRelated entities ({len(info['related_entities'])}):")
for related in info['related_entities'][:5]:
print(f" - {related['name']} (distance: {related['distance']})")
print(f"\nAssociated text chunks ({len(info['text_chunks'])}):")
for chunk in info['text_chunks'][:2]:
print(f" {chunk['content'][:150]}...")
elif command == 'entities' and len(parts) > 1:
entity_type = parts[1]
print(f"\nSearching for entities of type: {entity_type}")
entities = self.knowledge_graph.search_entities_by_type(entity_type)
print(f"\nFound {len(entities)} entities:")
for entity in entities[:10]:
print(f" - {entity['name']}")
else:
print("Invalid command. Type 'quit' to exit or use one of the available commands.")
except KeyboardInterrupt:
print("\n\nGoodbye!")
break
except Exception as e:
print(f"\nError: {str(e)}")
def cleanup(self):
"""Clean up resources."""
self.knowledge_graph.close()
def main():
"""Main entry point for the application."""
# Check if source directory is provided
import sys
if len(sys.argv) < 2:
print("Usage: python main.py <source_directory>")
print("\nExample: python main.py ./documents")
return
source_directory = sys.argv[1]
if not os.path.exists(source_directory):
print(f"Error: Directory '{source_directory}' does not exist")
return
# Create and run chatbot
chatbot = DocumentChatbot(source_directory)
try:
# Process documents
chatbot.process_documents()
# Run interactive query loop
chatbot.interactive_query_loop()
finally:
# Clean up
chatbot.cleanup()
if __name__ == "__main__":
main()
The DocumentChatbot class is the main application controller. It initializes all components, processes documents, and provides an interactive command-line interface. Users can ask questions, perform searches, explore entities, and list entities by type.
The interactive_query_loop method implements a simple command parser that routes user input to the appropriate query interface methods. This provides a practical way to interact with the hybrid knowledge system.
Advanced Features and Optimizations
The system we have built provides a solid foundation, but there are many ways to enhance it further. Here are some advanced features you might consider implementing.
One important enhancement is incremental updates. The current system processes all documents at once, but in a production environment, you would want to handle document additions, updates, and deletions incrementally. This requires tracking document versions and updating only the affected entities and embeddings.
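As a minimal sketch of the change-detection half of that idea, you could hash every file and compare against a stored manifest. The manifest path, helper names, and JSON format here are illustrative choices, not part of the system built above:

import hashlib
import json
import os

def file_fingerprint(path: str) -> str:
    """Return a stable content hash for one file."""
    with open(path, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()

def detect_changes(source_directory: str, manifest_path: str = "manifest.json"):
    """Compare current files against a stored manifest of content hashes.

    Returns (new_or_modified, deleted) lists of file paths.
    """
    previous = {}
    if os.path.exists(manifest_path):
        with open(manifest_path) as f:
            previous = json.load(f)

    current = {}
    for root, _, files in os.walk(source_directory):
        for name in files:
            path = os.path.join(root, name)
            current[path] = file_fingerprint(path)

    changed = [p for p, digest in current.items() if previous.get(p) != digest]
    deleted = [p for p in previous if p not in current]

    # Persist the new manifest for the next run
    with open(manifest_path, "w") as f:
        json.dump(current, f, indent=2)

    return changed, deleted

Files reported as changed would then be re-processed, while entities and embeddings derived from deleted files would be removed from both stores.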
Another valuable feature is query result caching. Frequently asked questions could be cached along with their answers to reduce latency and API costs. You could implement a simple cache using Python's functools.lru_cache decorator or a more sophisticated solution using Redis.
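A minimal in-process version might look like the following sketch. The wrapper functions are hypothetical, and a DocumentChatbot instance is assumed to exist as in main.py:

from functools import lru_cache

from main import DocumentChatbot

# Assumes the application was set up as shown above
chatbot = DocumentChatbot("./documents")

@lru_cache(maxsize=256)
def _answer_cached(question: str) -> str:
    """Call the LLM only on a cache miss for this exact question string."""
    return chatbot.query_interface.answer_question(question)

def cached_answer(question: str) -> str:
    """Normalize case and whitespace so trivially different phrasings share an entry."""
    return _answer_cached(" ".join(question.lower().split()))

A Redis-backed cache keyed on the normalized question would serve the same role across multiple processes and application restarts.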
The entity extraction could be significantly improved using more sophisticated techniques. Instead of simple pattern matching for text documents, you could use named entity recognition models from libraries like spaCy or use an LLM to extract entities and relationships. For code, you could extend the AST-based extraction to support multiple programming languages.
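For instance, a spaCy-based extractor could replace the regex patterns for text documents. This sketch assumes the small English model has been installed (python -m spacy download en_core_web_sm); mapping spaCy's labels onto the system's EntityType enum is left out:

import spacy

nlp = spacy.load("en_core_web_sm")

def extract_entities_spacy(text: str):
    """Extract named entities using spaCy's statistical NER model."""
    doc = nlp(text)
    # ent.label_ values include PERSON, ORG, GPE, PRODUCT, and so on
    return [(ent.text, ent.label_) for ent in doc.ents]

print(extract_entities_spacy(
    "Neo4j was founded in Sweden and is often used alongside OpenAI models."
))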
The chunking strategy could also be enhanced. Rather than using fixed-size chunks with overlap, you could implement semantic chunking that uses embeddings to identify natural boundaries where topic shifts occur. This would create more coherent chunks that better represent distinct concepts.
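Here is a rough sketch of semantic chunking using the sentence-transformers library already listed in the technology stack. Sentence splitting is assumed to happen upstream, and the 0.5 similarity threshold is an arbitrary starting point to tune:

from typing import List

from sentence_transformers import SentenceTransformer
from sentence_transformers.util import cos_sim

model = SentenceTransformer("all-MiniLM-L6-v2")

def semantic_chunks(sentences: List[str], threshold: float = 0.5) -> List[str]:
    """Group consecutive sentences, splitting where adjacent similarity drops."""
    embeddings = model.encode(sentences)
    chunks, current = [], [sentences[0]]
    for i in range(1, len(sentences)):
        similarity = float(cos_sim(embeddings[i - 1], embeddings[i]))
        if similarity < threshold:  # likely topic shift
            chunks.append(" ".join(current))
            current = []
        current.append(sentences[i])
    chunks.append(" ".join(current))
    return chunks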
Graph visualization would make the system much more user-friendly. You could add a web interface that displays the knowledge graph visually, allowing users to click on nodes to explore relationships and view associated text chunks. Libraries like D3.js or Cytoscape.js work well for graph visualization.
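For a quick Python-side prototype before committing to a JavaScript front end, the pyvis library can render a small subgraph to a standalone HTML page. The nodes and edge below are hypothetical examples mirroring the knowledge graph schema:

from pyvis.network import Network

net = Network(height="600px", width="100%", directed=True)

# Hypothetical nodes and edge for illustration
net.add_node("DataProcessor", label="DataProcessor", title="Class")
net.add_node("transform_data", label="transform_data", title="Function")
net.add_edge("DataProcessor", "transform_data", label="CONTAINS")

net.save_graph("knowledge_graph.html")  # open the file in a browser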
Query routing intelligence would improve answer quality. The system could analyze the question to determine whether it requires primarily semantic search, graph traversal, or a combination. Questions about relationships between concepts would prioritize graph queries, while questions about specific details would prioritize semantic search.
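Even a crude keyword heuristic captures the idea; a production router might instead ask the LLM itself to classify the question. The cue lists below are illustrative:

def route_query(question: str) -> str:
    """Return 'graph', 'vector', or 'hybrid' based on simple keyword cues."""
    q = question.lower()
    relational_cues = ("related to", "depends on", "connected",
                       "relationship", "between", "calls", "contains")
    detail_cues = ("what is", "how does", "explain", "describe", "example")

    wants_graph = any(cue in q for cue in relational_cues)
    wants_vector = any(cue in q for cue in detail_cues)

    if wants_graph and not wants_vector:
        return "graph"
    if wants_vector and not wants_graph:
        return "vector"
    return "hybrid"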
Multi-modal support would extend the system to handle images, diagrams, and other non-text content. You could use vision-language models to generate descriptions of images and include them in the knowledge graph and vector database.
Conclusion: The Power of Hybrid Knowledge Systems
The hybrid knowledge system we have built demonstrates the power of combining complementary approaches to knowledge representation. The knowledge graph provides explicit, queryable relationships between entities, enabling users to understand how concepts connect. The vector database provides semantic search capabilities, finding relevant information based on meaning rather than exact keyword matches. Together, they create a system that is greater than the sum of its parts.
The bidirectional linking between the two systems is the key innovation. Users can start from either entry point and seamlessly navigate to the other. A semantic search might lead to discovering unexpected relationships in the knowledge graph. Exploring the graph might reveal relevant text chunks that provide detailed explanations.
This architecture is particularly powerful for technical documentation, code repositories, and knowledge bases where both structure and semantics matter. A developer exploring a codebase can ask natural language questions about functionality while also traversing the dependency graph to understand how components relate. A researcher exploring scientific literature can find semantically similar papers while also discovering citation networks and author relationships.
The system is also extensible and adaptable. The modular architecture makes it easy to swap components, add new entity extractors for different document types, or integrate different vector databases and graph databases. The clean separation of concerns means that improvements to one component do not require changes to others.
As Large Language Models continue to advance, hybrid systems like this will become increasingly important. Pure LLM-based systems struggle with factual accuracy and hallucination. Grounding LLM responses in a hybrid knowledge system that combines structured relationships with semantic search provides both accuracy and flexibility. The LLM can synthesize information from multiple sources while the underlying knowledge system ensures that the information is accurate and traceable to source documents.
Building such systems requires careful attention to data modeling, efficient indexing, and thoughtful user interface design. But the result is a powerful tool that transforms how users interact with large document collections, making knowledge more accessible and discoverable than ever before.
ADDENDUM: ARCHITECTURE DIAGRAMS
System Architecture Diagram
+==============================================================================+
| DOCUMENT CHATBOT HYBRID KNOWLEDGE SYSTEM |
+==============================================================================+
USER INTERFACE LAYER
+------------------------------------------------------------------------------+
| |
| +------------------------------------------------------------------------+ |
| | DocumentChatbot (main.py) | |
| | | |
| | - Interactive Command Loop | |
| | - Document Processing Orchestration | |
| | - Component Initialization and Lifecycle Management | |
| +------------------------------------------------------------------------+ |
| | |
+---------------------------------------|--------------------------------------+
|
v
QUERY INTERFACE LAYER
+------------------------------------------------------------------------------+
| |
| +------------------------------------------------------------------------+ |
| | QueryInterface (query_interface.py) | |
| | | |
| | Methods: | |
| | - semantic_search() : Vector-based similarity search | |
| | - explore_entity() : Graph-based entity exploration | |
| | - hybrid_search() : Combined vector + graph search | |
| | - answer_question() : LLM-powered Q&A with context | |
| +------------------------------------------------------------------------+ |
| | | |
+--------------------------|------------------------------|--------------------+
| |
+-----------------+ +------------------+
| |
v v
+---------------------------+ +---------------------------+
| VECTOR STORE LAYER | | KNOWLEDGE GRAPH LAYER |
+---------------------------+ +---------------------------+
| | | |
| +----------------------+ | | +----------------------+ |
| | VectorStore | | | | KnowledgeGraph | |
| | (vector_store.py) | | | | (knowledge_graph.py) | |
| | | | | | | |
| | Methods: | | | | Methods: | |
| | - add_chunks() | | | | - add_entity() | |
| | - search() | | | | - add_relationship() | |
| | - get_by_ids() | | | | - get_entity() | |
| | - _generate_ | | | | - get_related_ | |
| | embeddings() | | | | entities() | |
| +----------------------+ | | | - search_entities_ | |
| | | | | by_type() | |
| v | | +----------------------+ |
| +----------------------+ | | | |
| | ChromaDB | | | v |
| | (External Service) | | | +----------------------+ |
| | | | | | Neo4j Database | |
| | - Stores embeddings | | | | (External Service) | |
| | - Cosine similarity | | | | | |
| | - Metadata indexing | | | | - Stores nodes/edges | |
| +----------------------+ | | | - Cypher queries | |
| ^ | | | - Graph traversal | |
| | | | +----------------------+ |
+-----------|---------------+ | ^ |
| +-----------|---------------+
| |
| |
+--------------------+ +-------------------+
| |
| BIDIRECTIONAL |
| LINKING |
| |
| - Embeddings |
| reference |
| graph nodes |
| |
| - Graph nodes |
| reference |
| embedding IDs |
| |
+-------------------+
^
|
PROCESSING PIPELINE LAYER
+------------------------------------------------------------------------------+
| |
| +------------------------------------------------------------------------+ |
| | DocumentProcessor (document_processor.py) | |
| | | |
| | Orchestrates: | |
| | - Document loading and parsing | |
| | - Entity extraction from code and text | |
| | - Text chunking with overlap | |
| | - Embedding generation and storage | |
| | - Knowledge graph population | |
| | - Bidirectional link creation | |
| +------------------------------------------------------------------------+ |
| | | | | |
+---------|--------------|----------------|----------------|-------------------+
| | | |
v v v v
+----------------+ +----------------+ +----------------+ +----------------+
| DocumentLoader | | Entity | | TextChunker | | Data Models |
| (document_ | | Extractors | | (text_ | | (models.py) |
| loader.py) | | (entity_ | | chunker.py) | | |
| | | extractor.py) | | | | Classes: |
| Methods: | | | | Methods: | | - Entity |
| - load_all_ | | Classes: | | - chunk_ | | - Relationship |
| documents() | | - CodeEntity | | document() | | - DocumentChunk|
| - _load_text_ | | Extractor | | - chunk_by_ | | - Processed |
| file() | | - TextEntity | | sections() | | Document |
| - _load_pdf_ | | Extractor | | - _generate_ | | |
| file() | | | | embeddings() | | Enums: |
| - _load_word_ | | Methods: | | | | - EntityType |
| file() | | - extract_from_| | Token-based | | - RelationType |
| | | python() | | chunking with | | |
| Supports: | | - extract_from_| | overlap for | | Provides type |
| - .txt, .md | | text() | | context | | safety and |
| - .py, .java | | - AST parsing | | preservation | | structure for |
| - .pdf, .docx | | - Pattern | | | | all data |
| - Code files | | matching | | | | objects |
+----------------+ +----------------+ +----------------+ +----------------+
| | | |
+--------------|----------------|----------------+
| |
v v
+---------------------------+
| EXTERNAL DEPENDENCIES |
+---------------------------+
| |
| - OpenAI API |
| * text-embedding-ada-002|
| * GPT-4 for Q&A |
| |
| - Python Libraries |
| * ast (code parsing) |
| * PyPDF2 (PDF reading) |
| * python-docx (Word) |
| * tiktoken (tokenizing) |
| * re (regex patterns) |
+---------------------------+
DATA FLOW DIAGRAM
=================
Document Ingestion Flow:
------------------------
Source Directory
|
| (1) Read files
v
DocumentLoader
|
| (2) Extract text content
v
DocumentProcessor
|
+---> (3a) Extract entities/relationships
| |
| v
| Entity Extractors
| |
| | (4a) Create Entity/Relationship objects
| v
| KnowledgeGraph.add_entity()
| KnowledgeGraph.add_relationship()
| |
| | (5a) Store in Neo4j
| v
| [Neo4j Database]
|
+---> (3b) Chunk text
| |
| v
| TextChunker
| |
| | (4b) Create DocumentChunk objects
| v
| VectorStore.add_chunks()
| |
| | (5b) Generate embeddings via OpenAI
| | (6b) Store in ChromaDB
| v
| [ChromaDB]
|
+---> (7) Link chunks to entities
| |
| v
| Update Entity.embedding_ids
| Update DocumentChunk.related_entities
| |
| | (8) Update bidirectional references
| v
| KnowledgeGraph.update_entity_embeddings()
Query Processing Flow:
---------------------
User Question
|
| (1) Submit query
v
QueryInterface.answer_question()
|
| (2) Perform hybrid search
v
QueryInterface.hybrid_search()
|
+---> (3a) Semantic search
| |
| v
| VectorStore.search()
| |
| | (4a) Generate query embedding
| | (5a) Find similar vectors
| v
| [ChromaDB returns chunks with metadata]
| |
| | (6a) Extract entity references
| v
| Entity names from chunk metadata
|
+---> (3b) Graph context retrieval
| |
| v
| KnowledgeGraph.get_related_entities()
| |
| | (4b) Traverse graph relationships
| v
| [Neo4j returns related entities]
|
| (7) Combine results
v
Build context from chunks + graph relationships
|
| (8) Generate answer
v
OpenAI GPT-4 API
|
| (9) Return synthesized answer
v
User receives answer
Entity Exploration Flow:
-----------------------
User selects entity
|
| (1) Request entity details
v
QueryInterface.explore_entity()
|
+---> (2a) Get entity from graph
| |
| v
| KnowledgeGraph.get_entity()
| |
| v
| [Neo4j returns entity node]
| |
| | (3a) Extract embedding_ids
| v
| Entity with embedding references
|
+---> (2b) Get related entities
| |
| v
| KnowledgeGraph.get_related_entities()
| |
| | (3b) Graph traversal (depth 2)
| v
| [Neo4j returns connected entities]
|
+---> (2c) Get associated text chunks
|
v
VectorStore.get_by_ids()
|
| (3c) Retrieve by embedding IDs
v
[ChromaDB returns text chunks]
|
| (4) Combine all information
v
Return entity details + related entities + text chunks
|
v
User sees comprehensive entity view
COMPONENT INTERACTION MATRIX
=============================
                   Document  Entity     Text     Vector  Knowledge  Query
                   Loader    Extractor  Chunker  Store   Graph      Interface
                   --------  ---------  -------  ------  ---------  ---------
DocumentLoader     [SELF]    -          -        -       -          -
EntityExtractor    USES      [SELF]     -        -       -          -
TextChunker        USES      -          [SELF]   -       -          -
VectorStore        -         -          USES     [SELF]  -          USES
KnowledgeGraph     -         USES       -        -       [SELF]     USES
QueryInterface     -         -          -        USES    USES       [SELF]
DocumentProcessor  USES      USES       USES     USES    USES       -
STORAGE SCHEMA OVERVIEW
=======================
Neo4j Knowledge Graph Schema:
-----------------------------
Node: Entity
Properties:
- name: string (unique identifier)
- type: string (EntityType enum value)
- properties: string (serialized dict)
- embedding_ids: list of strings (references to vector DB)
Relationship: RELATES
Properties:
- type: string (RelationType enum value)
- properties: string (serialized dict)
Example Graph Structure:
(Module: "data_processor")
|
| [CONTAINS]
v
(Class: "DataProcessor")
|
| [CONTAINS]
v
(Function: "transform_data")
|
| [IMPLEMENTS]
v
(Concept: "data_validation")
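To make the schema concrete, a traversal over that structure might look like this through the Python driver. The entity name is hypothetical, and the connection details match the development setup used earlier:

from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687",
                              auth=("neo4j", "your_password_here"))

with driver.session() as session:
    # Find everything within two RELATES hops of a hypothetical entity
    result = session.run(
        "MATCH (e:Entity {name: $name})-[:RELATES*1..2]-(related:Entity) "
        "RETURN DISTINCT related.name AS name, related.type AS type",
        name="DataProcessor",
    )
    for record in result:
        print(f"{record['name']} ({record['type']})")

driver.close()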
ChromaDB Vector Store Schema:
-----------------------------
Collection: documents
Metadata per embedding:
- document_id: string
- chunk_index: integer
- file_path: string
- start_token: integer
- end_token: integer
- token_count: integer
- related_entities: comma-separated string
- section_index: integer (optional)
Vector: 1536-dimensional embedding (OpenAI ada-002)
Document: Original text chunk content
ID: "{document_id}_chunk_{chunk_index}"
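Given that ID convention, individual chunks can be fetched straight from the persisted collection. A minimal sketch with hypothetical IDs:

import chromadb

# Reopen the collection persisted by the application
client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_or_create_collection("documents")

chunks = collection.get(ids=["doc_42_chunk_0", "doc_42_chunk_1"])
for content, metadata in zip(chunks["documents"], chunks["metadatas"]):
    print(metadata["file_path"], "->", content[:80])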
TECHNOLOGY STACK
================
Programming Language:
- Python 3.8+
Graph Database:
- Neo4j 5.x
- neo4j-driver 5.14.0
- Cypher query language
Vector Database:
- ChromaDB 0.4.18
- HNSW indexing algorithm
- Cosine similarity metric
LLM Services:
- OpenAI API
- text-embedding-ada-002 (embeddings)
- GPT-4 (question answering)
Document Processing:
- PyPDF2 3.0.1 (PDF parsing)
- python-docx 1.1.0 (Word documents)
- ast (Python code parsing)
- tiktoken 0.5.1 (tokenization)
Application Framework:
- LangChain 0.1.0 (LLM abstractions)
- sentence-transformers 2.2.2 (alternative embeddings)
DEPLOYMENT ARCHITECTURE
=======================
Development Environment:
+------------------+
| Local Machine |
| |
| - Python App |
| - ChromaDB |
| (embedded) |
+------------------+
|
| Network
v
+------------------+
| Neo4j Server |
| (localhost:7687) |
+------------------+
|
| Network
v
+------------------+
| OpenAI API |
| (cloud service) |
+------------------+
Production Environment:
+------------------+ +------------------+ +------------------+
| Application | | Neo4j Cluster | | Vector DB |
| Server(s) |----->| (High Avail.) | | Service |
| | | | | (Pinecone/ |
| - Load Balancer | | - Primary | | Weaviate) |
| - Multiple | | - Replicas | | |
| Instances | | - Backup | | - Distributed |
+------------------+ +------------------+ +------------------+
| | |
+-------------------------+-------------------------+
|
v
+------------------+
| OpenAI API |
| (cloud service) |
+------------------+
These diagrams illustrate the complete system structure: the major components, their relationships, data flows, and interactions. The system follows a layered architecture with clear separation of concerns between the user interface, query processing, storage layers, and document processing pipeline. The bidirectional linking between the vector store and knowledge graph is the central innovation that enables powerful hybrid queries combining semantic search with structural graph traversal.