INTRODUCTION
Large Language Models have revolutionized how we interact with technology. These powerful artificial intelligence systems can understand natural language, generate human-like responses, and assist users in countless ways. Integrating an LLM into your website can transform a static page into an intelligent, interactive experience. Whether you want to add a chatbot that answers questions about your products, create an AI assistant that helps users navigate your content, or build a smart search system that understands context, this guide will walk you through every step of the process.
This tutorial assumes you have never integrated an LLM before. We will cover both JavaScript-based and Python-based implementations, explain how to work with local models running on your own hardware as well as remote models accessed through APIs, and demonstrate how to use Retrieval-Augmented Generation to make your LLM aware of your website's specific content. By the end of this guide, you will have a complete understanding of how to build production-ready LLM-powered features for your web applications.
UNDERSTANDING THE FUNDAMENTALS
Before diving into code, we need to understand what we are working with. A Large Language Model is a neural network trained on vast amounts of text data. It learns patterns in language and can generate coherent, contextually appropriate responses to prompts. When you integrate an LLM into your website, you are essentially creating a bridge between your users and this AI system.
There are two primary ways to access LLMs. The first approach uses remote models hosted by providers like OpenAI, Anthropic, or Cohere. You send requests to their servers through an API, and they return responses. This method requires an internet connection and typically involves usage fees, but it eliminates the need for powerful hardware on your end. The second approach runs models locally on your own servers or even in the user's browser. This gives you complete control and privacy but requires sufficient computational resources.
Retrieval-Augmented Generation is a technique that enhances LLM responses by first retrieving relevant information from a knowledge base. Instead of relying solely on the model's training data, RAG systems search through your documents, find pertinent passages, and include them in the prompt sent to the LLM. This allows the model to provide accurate, up-to-date answers based on your specific content rather than generic knowledge.
SETTING UP YOUR DEVELOPMENT ENVIRONMENT
For Python-based implementations, you will need Python version 3.8 or higher installed on your system. Create a new project directory and set up a virtual environment to keep dependencies isolated. Open your terminal and navigate to your project folder, then execute the commands to create and activate a virtual environment. On Windows, the activation command differs slightly from Unix-based systems.
For JavaScript implementations, you will need Node.js version 14 or higher. Modern web development typically uses npm or yarn for package management. Initialize a new Node.js project in your directory by running the initialization command and following the prompts.
IMPLEMENTING A REMOTE LLM INTEGRATION IN PYTHON
Let us begin with a Python implementation using a remote LLM service. We will use OpenAI's API as our example, but the concepts apply to any provider. First, install the necessary packages using pip. You will need the OpenAI library for API access, Flask for creating a web server, and python-dotenv for managing environment variables securely.
# Install required packages
# pip install openai flask python-dotenv requests
Create a file named config.py to store configuration settings. This separates concerns and makes your code more maintainable. Never hardcode API keys directly in your source code. Instead, use environment variables that you load from a .env file.
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
OPENAI_MODEL = os.getenv('OPENAI_MODEL', 'gpt-3.5-turbo')
FLASK_SECRET_KEY = os.getenv('FLASK_SECRET_KEY', 'dev-secret-key')
MAX_TOKENS = int(os.getenv('MAX_TOKENS', '500'))
TEMPERATURE = float(os.getenv('TEMPERATURE', '0.7'))
The Config class loads environment variables with sensible defaults. The OPENAI_API_KEY must be set in your .env file. The model defaults to GPT-3.5 Turbo, which balances performance and cost. MAX_TOKENS limits response length, and TEMPERATURE controls randomness in responses. Lower temperatures produce more focused, deterministic outputs, while higher values increase creativity.
Now create the main application file, app.py. This file will contain your Flask web server and the logic for communicating with the LLM.
from flask import Flask, request, jsonify, render_template
from openai import OpenAI
from config import Config
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__)
app.config.from_object(Config)
client = OpenAI(api_key=Config.OPENAI_API_KEY)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/api/chat', methods=['POST'])
def chat():
try:
data = request.get_json()
user_message = data.get('message', '')
if not user_message:
return jsonify({'error': 'No message provided'}), 400
logger.info(f"Received message: {user_message}")
response = client.chat.completions.create(
model=Config.OPENAI_MODEL,
messages=[
{"role": "system", "content": "You are a helpful assistant for our website."},
{"role": "user", "content": user_message}
],
max_tokens=Config.MAX_TOKENS,
temperature=Config.TEMPERATURE
)
assistant_message = response.choices[0].message.content
logger.info(f"Generated response: {assistant_message}")
return jsonify({
'response': assistant_message,
'model': Config.OPENAI_MODEL
})
except Exception as e:
logger.error(f"Error in chat endpoint: {str(e)}")
return jsonify({'error': 'Internal server error'}), 500
if __name__ == '__main__':
app.run(debug=True, port=5000)
This application creates two routes. The root route serves an HTML page where users interact with the chatbot. The chat route handles POST requests containing user messages. When a message arrives, the code validates it, sends it to OpenAI's API along with a system message that defines the assistant's behavior, and returns the response as JSON. Error handling ensures that problems are logged and users receive appropriate error messages rather than seeing the application crash.
The system message in the messages array is crucial. It sets the context and personality for the LLM. You can customize this to make the assistant behave differently. For example, if your website sells gardening supplies, you might use a system message like "You are a knowledgeable gardening expert helping customers choose the right plants and tools."
CREATING THE FRONTEND INTERFACE
The frontend provides the user interface for your chatbot. Create a templates directory in your project folder and add an index.html file. This file contains the HTML structure, styling, and JavaScript needed to communicate with your backend.
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AI Assistant Chat</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: Arial, sans-serif;
background-color: #f5f5f5;
display: flex;
justify-content: center;
align-items: center;
min-height: 100vh;
padding: 20px;
}
.chat-container {
width: 100%;
max-width: 600px;
background: white;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
display: flex;
flex-direction: column;
height: 600px;
}
.chat-header {
background: #007bff;
color: white;
padding: 20px;
border-radius: 10px 10px 0 0;
text-align: center;
}
.chat-messages {
flex: 1;
overflow-y: auto;
padding: 20px;
display: flex;
flex-direction: column;
gap: 10px;
}
.message {
padding: 10px 15px;
border-radius: 8px;
max-width: 80%;
word-wrap: break-word;
}
.user-message {
background: #007bff;
color: white;
align-self: flex-end;
}
.assistant-message {
background: #e9ecef;
color: #333;
align-self: flex-start;
}
.chat-input-container {
padding: 20px;
border-top: 1px solid #ddd;
display: flex;
gap: 10px;
}
.chat-input {
flex: 1;
padding: 10px;
border: 1px solid #ddd;
border-radius: 5px;
font-size: 14px;
}
.send-button {
padding: 10px 20px;
background: #007bff;
color: white;
border: none;
border-radius: 5px;
cursor: pointer;
font-size: 14px;
}
.send-button:hover {
background: #0056b3;
}
.send-button:disabled {
background: #ccc;
cursor: not-allowed;
}
.loading {
color: #666;
font-style: italic;
align-self: flex-start;
}
</style>
</head>
<body>
<div class="chat-container">
<div class="chat-header">
<h2>AI Assistant</h2>
<p>Ask me anything!</p>
</div>
<div class="chat-messages" id="chatMessages"></div>
<div class="chat-input-container">
<input
type="text"
class="chat-input"
id="messageInput"
placeholder="Type your message..."
onkeypress="handleKeyPress(event)"
>
<button class="send-button" id="sendButton" onclick="sendMessage()">Send</button>
</div>
</div>
<script>
const chatMessages = document.getElementById('chatMessages');
const messageInput = document.getElementById('messageInput');
const sendButton = document.getElementById('sendButton');
function addMessage(content, isUser) {
const messageDiv = document.createElement('div');
messageDiv.className = isUser ? 'message user-message' : 'message assistant-message';
messageDiv.textContent = content;
chatMessages.appendChild(messageDiv);
chatMessages.scrollTop = chatMessages.scrollHeight;
}
function showLoading() {
const loadingDiv = document.createElement('div');
loadingDiv.className = 'loading';
loadingDiv.id = 'loadingIndicator';
loadingDiv.textContent = 'Thinking...';
chatMessages.appendChild(loadingDiv);
chatMessages.scrollTop = chatMessages.scrollHeight;
}
function hideLoading() {
const loadingDiv = document.getElementById('loadingIndicator');
if (loadingDiv) {
loadingDiv.remove();
}
}
async function sendMessage() {
const message = messageInput.value.trim();
if (!message) return;
addMessage(message, true);
messageInput.value = '';
sendButton.disabled = true;
showLoading();
try {
const response = await fetch('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ message: message })
});
const data = await response.json();
hideLoading();
if (response.ok) {
addMessage(data.response, false);
} else {
addMessage('Sorry, there was an error processing your request.', false);
}
} catch (error) {
hideLoading();
addMessage('Sorry, could not connect to the server.', false);
} finally {
sendButton.disabled = false;
messageInput.focus();
}
}
function handleKeyPress(event) {
if (event.key === 'Enter') {
sendMessage();
}
}
messageInput.focus();
</script>
</body>
</html>
This HTML file creates a complete chat interface. The styling uses flexbox to create a responsive layout that works on different screen sizes. The JavaScript handles user interactions, sending messages to the backend via fetch API calls, and displaying responses. The loading indicator provides feedback while waiting for the LLM to respond. Error handling ensures that network failures or server errors are communicated to the user gracefully.
IMPLEMENTING A LOCAL LLM WITH OLLAMA
Running models locally gives you complete control and eliminates API costs. Ollama is an excellent tool for running open-source LLMs on your own hardware. It supports models like Llama, Mistral, and many others. First, install Ollama from their official website. Once installed, pull a model using the command line.
# Run in terminal: ollama pull llama2
Now modify your Python backend to use Ollama instead of OpenAI. Create a new file called llm_service.py to abstract the LLM interaction.
import requests
import json
from typing import List, Dict
from config import Config
import logging
logger = logging.getLogger(__name__)
class LLMService:
def __init__(self, use_local=True):
self.use_local = use_local
self.ollama_url = "http://localhost:11434/api/generate"
def generate_response(self, messages: List[Dict[str, str]]) -> str:
if self.use_local:
return self._generate_local(messages)
else:
return self._generate_remote(messages)
def _generate_local(self, messages: List[Dict[str, str]]) -> str:
try:
prompt = self._format_messages(messages)
payload = {
"model": "llama2",
"prompt": prompt,
"stream": False,
"options": {
"temperature": Config.TEMPERATURE,
"num_predict": Config.MAX_TOKENS
}
}
response = requests.post(self.ollama_url, json=payload)
response.raise_for_status()
result = response.json()
return result.get('response', '')
except Exception as e:
logger.error(f"Error generating local response: {str(e)}")
raise
def _generate_remote(self, messages: List[Dict[str, str]]) -> str:
from openai import OpenAI
client = OpenAI(api_key=Config.OPENAI_API_KEY)
try:
response = client.chat.completions.create(
model=Config.OPENAI_MODEL,
messages=messages,
max_tokens=Config.MAX_TOKENS,
temperature=Config.TEMPERATURE
)
return response.choices[0].message.content
except Exception as e:
logger.error(f"Error generating remote response: {str(e)}")
raise
def _format_messages(self, messages: List[Dict[str, str]]) -> str:
formatted = ""
for msg in messages:
role = msg.get('role', '')
content = msg.get('content', '')
if role == 'system':
formatted += f"System: {content}\n\n"
elif role == 'user':
formatted += f"User: {content}\n\n"
elif role == 'assistant':
formatted += f"Assistant: {content}\n\n"
formatted += "Assistant: "
return formatted
The LLMService class provides a unified interface for both local and remote models. The generate_response method routes requests to the appropriate backend. For local models, it formats the conversation into a single prompt string because Ollama's generate endpoint expects a text prompt rather than a structured message array. The remote implementation uses the OpenAI client as before. This abstraction makes it easy to switch between providers or even support multiple providers simultaneously.
Update your app.py to use the new service.
from flask import Flask, request, jsonify, render_template
from llm_service import LLMService
from config import Config
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__)
app.config.from_object(Config)
llm_service = LLMService(use_local=True)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/api/chat', methods=['POST'])
def chat():
try:
data = request.get_json()
user_message = data.get('message', '')
conversation_history = data.get('history', [])
if not user_message:
return jsonify({'error': 'No message provided'}), 400
messages = [
{"role": "system", "content": "You are a helpful assistant for our website."}
]
messages.extend(conversation_history)
messages.append({"role": "user", "content": user_message})
logger.info(f"Processing message with {len(messages)} total messages")
response = llm_service.generate_response(messages)
return jsonify({
'response': response,
'model': 'llama2' if llm_service.use_local else Config.OPENAI_MODEL
})
except Exception as e:
logger.error(f"Error in chat endpoint: {str(e)}")
return jsonify({'error': 'Internal server error'}), 500
if __name__ == '__main__':
app.run(debug=True, port=5000)
This updated version accepts conversation history from the frontend, allowing the LLM to maintain context across multiple exchanges. The frontend needs a small modification to track and send this history.
IMPLEMENTING RAG FOR CONTEXT-AWARE RESPONSES
Retrieval-Augmented Generation transforms your chatbot from a general assistant into a knowledgeable expert on your specific content. The process involves three main steps. First, you extract and chunk your documents into manageable pieces. Second, you convert these chunks into vector embeddings, which are numerical representations that capture semantic meaning. Third, when a user asks a question, you search for relevant chunks and include them in the prompt sent to the LLM.
Install the required packages for RAG functionality. You will need libraries for PDF processing, text splitting, vector storage, and embeddings.
# pip install pypdf langchain langchain-community sentence-transformers chromadb
Create a new file called document_processor.py to handle document ingestion and chunking.
import os
from typing import List
from pypdf import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
import logging
logger = logging.getLogger(__name__)
class DocumentProcessor:
def __init__(self, chunk_size=1000, chunk_overlap=200):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
separators=["\n\n", "\n", " ", ""]
)
def process_pdf(self, pdf_path: str) -> List[str]:
try:
reader = PdfReader(pdf_path)
text = ""
for page in reader.pages:
text += page.extract_text() + "\n"
chunks = self.text_splitter.split_text(text)
logger.info(f"Processed {pdf_path}: {len(chunks)} chunks created")
return chunks
except Exception as e:
logger.error(f"Error processing PDF {pdf_path}: {str(e)}")
raise
def process_html(self, html_content: str) -> List[str]:
try:
from bs4 import BeautifulSoup
soup = BeautifulSoup(html_content, 'html.parser')
for script in soup(["script", "style"]):
script.decompose()
text = soup.get_text()
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = '\n'.join(chunk for chunk in chunks if chunk)
chunks = self.text_splitter.split_text(text)
logger.info(f"Processed HTML: {len(chunks)} chunks created")
return chunks
except Exception as e:
logger.error(f"Error processing HTML: {str(e)}")
raise
def process_directory(self, directory_path: str) -> List[dict]:
all_chunks = []
for filename in os.listdir(directory_path):
file_path = os.path.join(directory_path, filename)
if filename.endswith('.pdf'):
chunks = self.process_pdf(file_path)
for chunk in chunks:
all_chunks.append({
'content': chunk,
'source': filename,
'type': 'pdf'
})
elif filename.endswith('.html'):
with open(file_path, 'r', encoding='utf-8') as f:
html_content = f.read()
chunks = self.process_html(html_content)
for chunk in chunks:
all_chunks.append({
'content': chunk,
'source': filename,
'type': 'html'
})
logger.info(f"Processed directory {directory_path}: {len(all_chunks)} total chunks")
return all_chunks
The DocumentProcessor class handles different document types. The chunk_size parameter determines how many characters each piece contains, while chunk_overlap ensures that context is not lost at chunk boundaries. The RecursiveCharacterTextSplitter tries to split at natural boundaries like paragraphs and sentences rather than cutting words in half. For PDFs, it extracts text from each page and combines them. For HTML, it uses BeautifulSoup to remove scripts and styling, leaving only the meaningful content.
Now create a vector_store.py file to handle embeddings and similarity search.
from typing import List, Dict
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
import logging
logger = logging.getLogger(__name__)
class VectorStore:
def __init__(self, collection_name="documents", persist_directory="./chroma_db"):
self.client = chromadb.Client(Settings(
persist_directory=persist_directory,
anonymized_telemetry=False
))
self.collection = self.client.get_or_create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"}
)
self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
logger.info(f"Initialized VectorStore with collection: {collection_name}")
def add_documents(self, documents: List[Dict]):
texts = [doc['content'] for doc in documents]
metadatas = [{'source': doc['source'], 'type': doc['type']} for doc in documents]
ids = [f"doc_{i}" for i in range(len(documents))]
embeddings = self.embedding_model.encode(texts).tolist()
self.collection.add(
embeddings=embeddings,
documents=texts,
metadatas=metadatas,
ids=ids
)
logger.info(f"Added {len(documents)} documents to vector store")
def search(self, query: str, n_results=3) -> List[Dict]:
query_embedding = self.embedding_model.encode([query]).tolist()
results = self.collection.query(
query_embeddings=query_embedding,
n_results=n_results
)
formatted_results = []
if results['documents']:
for i, doc in enumerate(results['documents'][0]):
formatted_results.append({
'content': doc,
'metadata': results['metadatas'][0][i] if results['metadatas'] else {},
'distance': results['distances'][0][i] if results['distances'] else 0
})
logger.info(f"Search for '{query}' returned {len(formatted_results)} results")
return formatted_results
def clear(self):
self.client.delete_collection(self.collection.name)
self.collection = self.client.create_collection(
name=self.collection.name,
metadata={"hnsw:space": "cosine"}
)
logger.info("Cleared vector store")
The VectorStore class uses ChromaDB for efficient similarity search and SentenceTransformers for creating embeddings. The all-MiniLM-L6-v2 model is lightweight and fast while still producing quality embeddings. When you add documents, the class converts each text chunk into a vector embedding and stores it along with metadata about the source. The search method takes a query, converts it to an embedding, and finds the most similar document chunks using cosine similarity.
Create a rag_service.py file to tie everything together.
from typing import List, Dict
from document_processor import DocumentProcessor
from vector_store import VectorStore
from llm_service import LLMService
import logging
logger = logging.getLogger(__name__)
class RAGService:
def __init__(self, use_local_llm=True):
self.document_processor = DocumentProcessor()
self.vector_store = VectorStore()
self.llm_service = LLMService(use_local=use_local_llm)
def ingest_documents(self, directory_path: str):
logger.info(f"Starting document ingestion from {directory_path}")
documents = self.document_processor.process_directory(directory_path)
if documents:
self.vector_store.add_documents(documents)
logger.info(f"Successfully ingested {len(documents)} document chunks")
else:
logger.warning("No documents found to ingest")
def generate_response(self, query: str, conversation_history: List[Dict] = None) -> Dict:
if conversation_history is None:
conversation_history = []
relevant_docs = self.vector_store.search(query, n_results=3)
context = self._build_context(relevant_docs)
system_message = self._create_system_message(context)
messages = [{"role": "system", "content": system_message}]
messages.extend(conversation_history)
messages.append({"role": "user", "content": query})
response = self.llm_service.generate_response(messages)
return {
'response': response,
'sources': [doc['metadata'] for doc in relevant_docs],
'context_used': len(relevant_docs) > 0
}
def _build_context(self, documents: List[Dict]) -> str:
if not documents:
return ""
context_parts = ["Here is relevant information from our documents:\n"]
for i, doc in enumerate(documents, 1):
source = doc['metadata'].get('source', 'Unknown')
content = doc['content']
context_parts.append(f"\nDocument {i} (from {source}):\n{content}\n")
return "\n".join(context_parts)
def _create_system_message(self, context: str) -> str:
base_message = "You are a helpful assistant for our website. "
if context:
return (
f"{base_message}Use the following information from our documents "
f"to provide accurate and helpful answers. If the information is not "
f"in the provided context, you can use your general knowledge but "
f"indicate that you're doing so.\n\n{context}"
)
else:
return f"{base_message}Answer questions to the best of your ability."
The RAGService orchestrates the entire RAG pipeline. The ingest_documents method processes all documents in a directory and stores them in the vector database. The generate_response method performs retrieval and generation. It searches for relevant documents, builds a context string from the results, creates an enhanced system message that includes this context, and sends everything to the LLM. The response includes not just the generated text but also information about which sources were used, allowing you to display citations to users.
Update your Flask application to use the RAG service.
from flask import Flask, request, jsonify, render_template
from rag_service import RAGService
from config import Config
import logging
import os
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__)
app.config.from_object(Config)
rag_service = RAGService(use_local_llm=True)
DOCUMENTS_DIR = os.path.join(os.path.dirname(__file__), 'documents')
if os.path.exists(DOCUMENTS_DIR):
rag_service.ingest_documents(DOCUMENTS_DIR)
else:
logger.warning(f"Documents directory not found: {DOCUMENTS_DIR}")
@app.route('/')
def index():
return render_template('index.html')
@app.route('/api/chat', methods=['POST'])
def chat():
try:
data = request.get_json()
user_message = data.get('message', '')
conversation_history = data.get('history', [])
if not user_message:
return jsonify({'error': 'No message provided'}), 400
logger.info(f"Processing RAG query: {user_message}")
result = rag_service.generate_response(user_message, conversation_history)
return jsonify({
'response': result['response'],
'sources': result['sources'],
'context_used': result['context_used']
})
except Exception as e:
logger.error(f"Error in chat endpoint: {str(e)}")
return jsonify({'error': 'Internal server error'}), 500
@app.route('/api/ingest', methods=['POST'])
def ingest():
try:
data = request.get_json()
directory = data.get('directory', DOCUMENTS_DIR)
if not os.path.exists(directory):
return jsonify({'error': 'Directory not found'}), 404
rag_service.ingest_documents(directory)
return jsonify({'message': 'Documents ingested successfully'})
except Exception as e:
logger.error(f"Error in ingest endpoint: {str(e)}")
return jsonify({'error': 'Internal server error'}), 500
if __name__ == '__main__':
app.run(debug=True, port=5000)
This application automatically ingests documents from a documents directory when it starts. You can also trigger ingestion manually through the ingest endpoint. Create a documents folder in your project directory and add PDF or HTML files. The system will process them and make their content available for retrieval.
IMPLEMENTING A JAVASCRIPT-BASED SOLUTION
JavaScript implementations allow you to create entirely client-side AI experiences or build Node.js backends. Let us explore both approaches. For a Node.js backend similar to our Python implementation, start by installing the necessary packages.
// Install with: npm install express openai dotenv pdf-parse cheerio
Create a config.js file for configuration management.
require('dotenv').config();
module.exports = {
OPENAI_API_KEY: process.env.OPENAI_API_KEY,
OPENAI_MODEL: process.env.OPENAI_MODEL || 'gpt-3.5-turbo',
PORT: process.env.PORT || 3000,
MAX_TOKENS: parseInt(process.env.MAX_TOKENS) || 500,
TEMPERATURE: parseFloat(process.env.TEMPERATURE) || 0.7
};
Create a server.js file for your Express application.
const express = require('express');
const OpenAI = require('openai');
const config = require('./config');
const path = require('path');
const app = express();
const openai = new OpenAI({ apiKey: config.OPENAI_API_KEY });
app.use(express.json());
app.use(express.static('public'));
app.get('/', (req, res) => {
res.sendFile(path.join(__dirname, 'public', 'index.html'));
});
app.post('/api/chat', async (req, res) => {
try {
const { message, history = [] } = req.body;
if (!message) {
return res.status(400).json({ error: 'No message provided' });
}
console.log(`Received message: ${message}`);
const messages = [
{ role: 'system', content: 'You are a helpful assistant for our website.' },
...history,
{ role: 'user', content: message }
];
const completion = await openai.chat.completions.create({
model: config.OPENAI_MODEL,
messages: messages,
max_tokens: config.MAX_TOKENS,
temperature: config.TEMPERATURE
});
const response = completion.choices[0].message.content;
console.log(`Generated response: ${response}`);
res.json({
response: response,
model: config.OPENAI_MODEL
});
} catch (error) {
console.error('Error in chat endpoint:', error);
res.status(500).json({ error: 'Internal server error' });
}
});
app.listen(config.PORT, () => {
console.log(`Server running on port ${config.PORT}`);
});
This Node.js implementation mirrors the Python version. Express handles routing, the OpenAI library manages API communication, and the structure follows the same patterns. The async/await syntax makes asynchronous operations clean and readable.
For RAG functionality in Node.js, you need additional libraries for document processing and vector storage. While the ecosystem is less mature than Python's, viable options exist.
// Install with: npm install @xenova/transformers pdf-parse cheerio
Create a documentProcessor.js file.
const fs = require('fs').promises;
const path = require('path');
const pdfParse = require('pdf-parse');
const cheerio = require('cheerio');
class DocumentProcessor {
constructor(chunkSize = 1000, chunkOverlap = 200) {
this.chunkSize = chunkSize;
this.chunkOverlap = chunkOverlap;
}
async processPDF(filePath) {
try {
const dataBuffer = await fs.readFile(filePath);
const data = await pdfParse(dataBuffer);
const text = data.text;
const chunks = this.splitText(text);
console.log(`Processed ${filePath}: ${chunks.length} chunks created`);
return chunks;
} catch (error) {
console.error(`Error processing PDF ${filePath}:`, error);
throw error;
}
}
async processHTML(htmlContent) {
try {
const $ = cheerio.load(htmlContent);
$('script, style').remove();
const text = $('body').text();
const cleanText = text.replace(/\s+/g, ' ').trim();
const chunks = this.splitText(cleanText);
console.log(`Processed HTML: ${chunks.length} chunks created`);
return chunks;
} catch (error) {
console.error('Error processing HTML:', error);
throw error;
}
}
async processDirectory(directoryPath) {
const allChunks = [];
const files = await fs.readdir(directoryPath);
for (const filename of files) {
const filePath = path.join(directoryPath, filename);
if (filename.endsWith('.pdf')) {
const chunks = await this.processPDF(filePath);
chunks.forEach(chunk => {
allChunks.push({
content: chunk,
source: filename,
type: 'pdf'
});
});
} else if (filename.endsWith('.html')) {
const htmlContent = await fs.readFile(filePath, 'utf-8');
const chunks = await this.processHTML(htmlContent);
chunks.forEach(chunk => {
allChunks.push({
content: chunk,
source: filename,
type: 'html'
});
});
}
}
console.log(`Processed directory ${directoryPath}: ${allChunks.length} total chunks`);
return allChunks;
}
splitText(text) {
const chunks = [];
let start = 0;
while (start < text.length) {
let end = start + this.chunkSize;
if (end < text.length) {
const lastPeriod = text.lastIndexOf('.', end);
const lastNewline = text.lastIndexOf('\n', end);
const lastSpace = text.lastIndexOf(' ', end);
const breakPoint = Math.max(lastPeriod, lastNewline, lastSpace);
if (breakPoint > start) {
end = breakPoint + 1;
}
}
chunks.push(text.slice(start, end).trim());
start = end - this.chunkOverlap;
}
return chunks.filter(chunk => chunk.length > 0);
}
}
module.exports = DocumentProcessor;
The JavaScript version implements similar chunking logic. The splitText method tries to break at sentence boundaries to maintain coherence. The async/await pattern handles file I/O cleanly.
For embeddings and vector search in JavaScript, you can use the Transformers.js library, which runs models directly in Node.js.
const { pipeline } = require('@xenova/transformers');
class VectorStore {
constructor() {
this.documents = [];
this.embeddings = [];
this.embeddingPipeline = null;
}
async initialize() {
this.embeddingPipeline = await pipeline(
'feature-extraction',
'Xenova/all-MiniLM-L6-v2'
);
console.log('VectorStore initialized');
}
async addDocuments(documents) {
for (const doc of documents) {
const embedding = await this.embed(doc.content);
this.documents.push(doc);
this.embeddings.push(embedding);
}
console.log(`Added ${documents.length} documents to vector store`);
}
async embed(text) {
const output = await this.embeddingPipeline(text, {
pooling: 'mean',
normalize: true
});
return Array.from(output.data);
}
cosineSimilarity(a, b) {
let dotProduct = 0;
let normA = 0;
let normB = 0;
for (let i = 0; i < a.length; i++) {
dotProduct += a[i] * b[i];
normA += a[i] * a[i];
normB += b[i] * b[i];
}
return dotProduct / (Math.sqrt(normA) * Math.sqrt(normB));
}
async search(query, nResults = 3) {
const queryEmbedding = await this.embed(query);
const similarities = this.embeddings.map((embedding, index) => ({
index: index,
similarity: this.cosineSimilarity(queryEmbedding, embedding)
}));
similarities.sort((a, b) => b.similarity - a.similarity);
const results = similarities.slice(0, nResults).map(item => ({
content: this.documents[item.index].content,
metadata: {
source: this.documents[item.index].source,
type: this.documents[item.index].type
},
similarity: item.similarity
}));
console.log(`Search for '${query}' returned ${results.length} results`);
return results;
}
clear() {
this.documents = [];
this.embeddings = [];
console.log('Cleared vector store');
}
}
module.exports = VectorStore;
This JavaScript implementation stores embeddings in memory. For production use with large document sets, you would want to use a proper vector database like Pinecone or Weaviate. The cosineSimilarity method implements the mathematical formula for comparing vectors.
BROWSER-BASED LLM INTEGRATION
Modern browsers can run smaller LLMs directly using WebAssembly and WebGPU. This approach eliminates server costs and provides instant responses. The Transformers.js library supports browser environments.
Create an HTML file that runs an LLM entirely in the browser.
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Browser-Based AI Chat</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: Arial, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
display: flex;
justify-content: center;
align-items: center;
padding: 20px;
}
.container {
background: white;
border-radius: 15px;
box-shadow: 0 10px 40px rgba(0,0,0,0.2);
width: 100%;
max-width: 700px;
padding: 30px;
}
h1 {
color: #333;
margin-bottom: 10px;
}
.status {
color: #666;
margin-bottom: 20px;
font-size: 14px;
}
.chat-area {
border: 1px solid #ddd;
border-radius: 8px;
height: 400px;
overflow-y: auto;
padding: 15px;
margin-bottom: 20px;
background: #f9f9f9;
}
.message {
margin-bottom: 15px;
padding: 10px 15px;
border-radius: 8px;
max-width: 80%;
}
.user-message {
background: #667eea;
color: white;
margin-left: auto;
}
.bot-message {
background: white;
border: 1px solid #ddd;
}
.input-area {
display: flex;
gap: 10px;
}
input {
flex: 1;
padding: 12px;
border: 1px solid #ddd;
border-radius: 8px;
font-size: 14px;
}
button {
padding: 12px 24px;
background: #667eea;
color: white;
border: none;
border-radius: 8px;
cursor: pointer;
font-size: 14px;
font-weight: bold;
}
button:hover {
background: #5568d3;
}
button:disabled {
background: #ccc;
cursor: not-allowed;
}
.loading {
color: #666;
font-style: italic;
}
</style>
</head>
<body>
<div class="container">
<h1>Browser-Based AI Assistant</h1>
<div class="status" id="status">Initializing AI model...</div>
<div class="chat-area" id="chatArea"></div>
<div class="input-area">
<input
type="text"
id="userInput"
placeholder="Type your message..."
disabled
>
<button id="sendButton" disabled>Send</button>
</div>
</div>
<script type="module">
import { pipeline, env } from 'https://cdn.jsdelivr.net/npm/@xenova/transformers@2.6.0';
env.allowLocalModels = false;
const statusEl = document.getElementById('status');
const chatArea = document.getElementById('chatArea');
const userInput = document.getElementById('userInput');
const sendButton = document.getElementById('sendButton');
let generator;
async function initializeModel() {
try {
statusEl.textContent = 'Loading AI model (this may take a minute)...';
generator = await pipeline(
'text-generation',
'Xenova/gpt2'
);
statusEl.textContent = 'AI model ready! Start chatting below.';
userInput.disabled = false;
sendButton.disabled = false;
userInput.focus();
} catch (error) {
statusEl.textContent = 'Error loading model. Please refresh the page.';
console.error('Model initialization error:', error);
}
}
function addMessage(content, isUser) {
const messageDiv = document.createElement('div');
messageDiv.className = `message ${isUser ? 'user-message' : 'bot-message'}`;
messageDiv.textContent = content;
chatArea.appendChild(messageDiv);
chatArea.scrollTop = chatArea.scrollHeight;
}
function showLoading() {
const loadingDiv = document.createElement('div');
loadingDiv.className = 'loading';
loadingDiv.id = 'loadingIndicator';
loadingDiv.textContent = 'AI is thinking...';
chatArea.appendChild(loadingDiv);
chatArea.scrollTop = chatArea.scrollHeight;
}
function hideLoading() {
const loadingDiv = document.getElementById('loadingIndicator');
if (loadingDiv) {
loadingDiv.remove();
}
}
async function sendMessage() {
const message = userInput.value.trim();
if (!message) return;
addMessage(message, true);
userInput.value = '';
sendButton.disabled = true;
showLoading();
try {
const result = await generator(message, {
max_new_tokens: 50,
temperature: 0.7,
do_sample: true
});
hideLoading();
const response = result[0].generated_text;
const cleanResponse = response.replace(message, '').trim();
addMessage(cleanResponse || 'I understand. How can I help you further?', false);
} catch (error) {
hideLoading();
addMessage('Sorry, I encountered an error. Please try again.', false);
console.error('Generation error:', error);
} finally {
sendButton.disabled = false;
userInput.focus();
}
}
sendButton.addEventListener('click', sendMessage);
userInput.addEventListener('keypress', (e) => {
if (e.key === 'Enter') {
sendMessage();
}
});
initializeModel();
</script>
</body>
</html>
This browser-based implementation downloads and runs a GPT-2 model entirely in the user's browser. The first load takes time as the model downloads, but subsequent interactions are instant. This approach works best for smaller models. Larger, more capable models require too much memory and processing power for most browsers.
PRODUCTION CONSIDERATIONS AND BEST PRACTICES
When deploying LLM-powered features to production, several important considerations arise. Security is paramount. Never expose API keys in client-side code. Always proxy requests through your backend server. Implement rate limiting to prevent abuse and control costs. The following code shows a simple rate limiter for Flask.
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
limiter = Limiter(
app=app,
key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"]
)
@app.route('/api/chat', methods=['POST'])
@limiter.limit("10 per minute")
def chat():
# Your existing chat logic
pass
For Node.js, use the express-rate-limit package.
const rateLimit = require('express-rate-limit');
const chatLimiter = rateLimit({
windowMs: 60 * 1000,
max: 10,
message: 'Too many requests, please try again later.'
});
app.post('/api/chat', chatLimiter, async (req, res) => {
// Your existing chat logic
});
Implement proper error handling and logging. Use structured logging to track usage patterns, errors, and performance metrics. Monitor your costs carefully, especially with pay-per-token services. Set up alerts for unusual usage patterns.
Caching can significantly reduce costs and improve response times. For frequently asked questions, cache responses and serve them directly without calling the LLM. Here is a simple Redis-based cache for Python.
import redis
import json
import hashlib
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_cache_key(message):
return hashlib.md5(message.encode()).hexdigest()
def get_cached_response(message):
key = get_cache_key(message)
cached = redis_client.get(key)
if cached:
return json.loads(cached)
return None
def cache_response(message, response):
key = get_cache_key(message)
redis_client.setex(key, 3600, json.dumps(response))
@app.route('/api/chat', methods=['POST'])
def chat():
data = request.get_json()
user_message = data.get('message', '')
cached = get_cached_response(user_message)
if cached:
return jsonify(cached)
# Generate response using LLM
response = generate_llm_response(user_message)
cache_response(user_message, response)
return jsonify(response)
For RAG systems, keep your vector database updated. Implement a scheduled job that re-ingests documents periodically to capture updates. Monitor the quality of retrieved documents and adjust chunk sizes or retrieval parameters if needed.
User privacy is critical. If your application processes sensitive information, ensure that you comply with relevant regulations like GDPR or HIPAA. Consider running local models for sensitive use cases to avoid sending data to third-party services. Implement proper data retention policies and allow users to delete their conversation history.
Performance optimization matters for user experience. For remote APIs, implement streaming responses so users see text appear progressively rather than waiting for the complete response. Here is how to implement streaming with OpenAI's API in Python.
from flask import Response, stream_with_context
@app.route('/api/chat/stream', methods=['POST'])
def chat_stream():
data = request.get_json()
user_message = data.get('message', '')
def generate():
stream = client.chat.completions.create(
model=Config.OPENAI_MODEL,
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": user_message}
],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
yield f"data: {json.dumps({'content': chunk.choices[0].delta.content})}\n\n"
yield "data: [DONE]\n\n"
return Response(
stream_with_context(generate()),
mimetype='text/event-stream'
)
The frontend needs to handle Server-Sent Events to display streaming responses.
async function sendMessageStreaming(message) {
const eventSource = new EventSource(`/api/chat/stream?message=${encodeURIComponent(message)}`);
let fullResponse = '';
eventSource.onmessage = (event) => {
if (event.data === '[DONE]') {
eventSource.close();
return;
}
const data = JSON.parse(event.data);
fullResponse += data.content;
updateMessageDisplay(fullResponse);
};
eventSource.onerror = (error) => {
console.error('Streaming error:', error);
eventSource.close();
};
}
COMPLETE PRODUCTION-READY EXAMPLE
The following complete example integrates everything we have discussed into a production-ready application. This implementation includes a Python Flask backend with RAG capabilities, proper error handling, rate limiting, caching, and a polished frontend interface.
# app.py - Main application file
import os
import sys
import logging
from datetime import datetime
from flask import Flask, request, jsonify, render_template, Response, stream_with_context
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_cors import CORS
import redis
import json
import hashlib
from typing import List, Dict, Optional
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('app.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# Import custom modules
from config import Config
from document_processor import DocumentProcessor
from vector_store import VectorStore
from llm_service import LLMService
# Initialize Flask application
app = Flask(__name__)
app.config.from_object(Config)
CORS(app)
# Initialize rate limiter
limiter = Limiter(
app=app,
key_func=get_remote_address,
default_limits=["1000 per day", "100 per hour"],
storage_uri="memory://"
)
# Initialize Redis for caching
try:
redis_client = redis.Redis(
host=Config.REDIS_HOST,
port=Config.REDIS_PORT,
db=0,
decode_responses=True
)
redis_client.ping()
logger.info("Redis connection established")
except Exception as e:
logger.warning(f"Redis connection failed: {e}. Caching disabled.")
redis_client = None
# Initialize services
document_processor = DocumentProcessor(
chunk_size=Config.CHUNK_SIZE,
chunk_overlap=Config.CHUNK_OVERLAP
)
vector_store = VectorStore(
collection_name=Config.COLLECTION_NAME,
persist_directory=Config.VECTOR_DB_PATH
)
llm_service = LLMService(
use_local=Config.USE_LOCAL_LLM,
model_name=Config.LLM_MODEL
)
# Cache utilities
def get_cache_key(message: str, use_rag: bool = True) -> str:
content = f"{message}:{use_rag}"
return f"chat:{hashlib.md5(content.encode()).hexdigest()}"
def get_cached_response(message: str, use_rag: bool = True) -> Optional[Dict]:
if not redis_client:
return None
try:
key = get_cache_key(message, use_rag)
cached = redis_client.get(key)
if cached:
logger.info(f"Cache hit for message: {message[:50]}...")
return json.loads(cached)
except Exception as e:
logger.error(f"Cache retrieval error: {e}")
return None
def cache_response(message: str, response: Dict, use_rag: bool = True, ttl: int = 3600):
if not redis_client:
return
try:
key = get_cache_key(message, use_rag)
redis_client.setex(key, ttl, json.dumps(response))
logger.info(f"Cached response for message: {message[:50]}...")
except Exception as e:
logger.error(f"Cache storage error: {e}")
# RAG Service
class RAGService:
def __init__(self):
self.document_processor = document_processor
self.vector_store = vector_store
self.llm_service = llm_service
def ingest_documents(self, directory_path: str) -> Dict:
try:
logger.info(f"Starting document ingestion from {directory_path}")
if not os.path.exists(directory_path):
raise ValueError(f"Directory not found: {directory_path}")
documents = self.document_processor.process_directory(directory_path)
if not documents:
logger.warning("No documents found to ingest")
return {"status": "warning", "message": "No documents found", "count": 0}
self.vector_store.add_documents(documents)
logger.info(f"Successfully ingested {len(documents)} document chunks")
return {
"status": "success",
"message": f"Ingested {len(documents)} document chunks",
"count": len(documents)
}
except Exception as e:
logger.error(f"Document ingestion error: {e}")
raise
def generate_response(
self,
query: str,
conversation_history: List[Dict] = None,
use_rag: bool = True
) -> Dict:
try:
if conversation_history is None:
conversation_history = []
context = ""
sources = []
if use_rag:
relevant_docs = self.vector_store.search(query, n_results=Config.RAG_TOP_K)
if relevant_docs:
context = self._build_context(relevant_docs)
sources = [
{
"source": doc['metadata'].get('source', 'Unknown'),
"type": doc['metadata'].get('type', 'Unknown'),
"relevance": doc.get('distance', 0)
}
for doc in relevant_docs
]
system_message = self._create_system_message(context)
messages = [{"role": "system", "content": system_message}]
messages.extend(conversation_history[-Config.MAX_HISTORY:])
messages.append({"role": "user", "content": query})
response = self.llm_service.generate_response(messages)
return {
"response": response,
"sources": sources,
"context_used": len(sources) > 0,
"model": self.llm_service.model_name,
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Response generation error: {e}")
raise
def _build_context(self, documents: List[Dict]) -> str:
if not documents:
return ""
context_parts = ["Here is relevant information from our documents:\n"]
for i, doc in enumerate(documents, 1):
source = doc['metadata'].get('source', 'Unknown')
content = doc['content']
context_parts.append(f"\n[Document {i} from {source}]:\n{content}\n")
return "\n".join(context_parts)
def _create_system_message(self, context: str) -> str:
base_message = Config.SYSTEM_MESSAGE
if context:
return (
f"{base_message}\n\n"
f"Use the following information from our documents to provide accurate answers. "
f"If the information is not in the provided context, you can use your general "
f"knowledge but clearly indicate that you're doing so.\n\n{context}"
)
else:
return base_message
# Initialize RAG service
rag_service = RAGService()
# Ingest documents on startup
DOCUMENTS_DIR = Config.DOCUMENTS_DIR
if os.path.exists(DOCUMENTS_DIR):
try:
result = rag_service.ingest_documents(DOCUMENTS_DIR)
logger.info(f"Initial document ingestion: {result}")
except Exception as e:
logger.error(f"Initial document ingestion failed: {e}")
else:
logger.warning(f"Documents directory not found: {DOCUMENTS_DIR}")
os.makedirs(DOCUMENTS_DIR, exist_ok=True)
# Routes
@app.route('/')
def index():
return render_template('index.html')
@app.route('/api/health', methods=['GET'])
def health_check():
return jsonify({
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"services": {
"llm": "operational",
"vector_store": "operational",
"cache": "operational" if redis_client else "disabled"
}
})
@app.route('/api/chat', methods=['POST'])
@limiter.limit("20 per minute")
def chat():
try:
data = request.get_json()
if not data:
return jsonify({"error": "No data provided"}), 400
user_message = data.get('message', '').strip()
conversation_history = data.get('history', [])
use_rag = data.get('use_rag', True)
if not user_message:
return jsonify({"error": "No message provided"}), 400
if len(user_message) > Config.MAX_MESSAGE_LENGTH:
return jsonify({"error": "Message too long"}), 400
logger.info(f"Processing chat request: {user_message[:100]}...")
# Check cache
cached_response = get_cached_response(user_message, use_rag)
if cached_response:
return jsonify(cached_response)
# Generate response
result = rag_service.generate_response(
query=user_message,
conversation_history=conversation_history,
use_rag=use_rag
)
# Cache response
cache_response(user_message, result, use_rag)
return jsonify(result)
except Exception as e:
logger.error(f"Chat endpoint error: {e}", exc_info=True)
return jsonify({"error": "Internal server error"}), 500
@app.route('/api/chat/stream', methods=['POST'])
@limiter.limit("10 per minute")
def chat_stream():
try:
data = request.get_json()
user_message = data.get('message', '').strip()
conversation_history = data.get('history', [])
use_rag = data.get('use_rag', True)
if not user_message:
return jsonify({"error": "No message provided"}), 400
logger.info(f"Processing streaming chat request: {user_message[:100]}...")
def generate():
try:
context = ""
sources = []
if use_rag:
relevant_docs = vector_store.search(user_message, n_results=Config.RAG_TOP_K)
if relevant_docs:
context = rag_service._build_context(relevant_docs)
sources = [doc['metadata'] for doc in relevant_docs]
system_message = rag_service._create_system_message(context)
messages = [{"role": "system", "content": system_message}]
messages.extend(conversation_history[-Config.MAX_HISTORY:])
messages.append({"role": "user", "content": user_message})
# Send sources first
yield f"data: {json.dumps({'type': 'sources', 'data': sources})}\n\n"
# Stream response
for chunk in llm_service.generate_response_stream(messages):
yield f"data: {json.dumps({'type': 'content', 'data': chunk})}\n\n"
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"Streaming error: {e}")
yield f"data: {json.dumps({'type': 'error', 'data': str(e)})}\n\n"
return Response(
stream_with_context(generate()),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no'
}
)
except Exception as e:
logger.error(f"Stream endpoint error: {e}")
return jsonify({"error": "Internal server error"}), 500
@app.route('/api/documents/ingest', methods=['POST'])
@limiter.limit("5 per hour")
def ingest_documents():
try:
data = request.get_json()
directory = data.get('directory', DOCUMENTS_DIR)
if not os.path.exists(directory):
return jsonify({"error": "Directory not found"}), 404
result = rag_service.ingest_documents(directory)
# Clear cache after ingestion
if redis_client:
try:
redis_client.flushdb()
logger.info("Cache cleared after document ingestion")
except Exception as e:
logger.error(f"Cache clear error: {e}")
return jsonify(result)
except Exception as e:
logger.error(f"Ingest endpoint error: {e}")
return jsonify({"error": "Internal server error"}), 500
@app.route('/api/documents/list', methods=['GET'])
def list_documents():
try:
if not os.path.exists(DOCUMENTS_DIR):
return jsonify({"documents": []})
documents = []
for filename in os.listdir(DOCUMENTS_DIR):
file_path = os.path.join(DOCUMENTS_DIR, filename)
if os.path.isfile(file_path):
documents.append({
"name": filename,
"size": os.path.getsize(file_path),
"modified": datetime.fromtimestamp(
os.path.getmtime(file_path)
).isoformat()
})
return jsonify({"documents": documents})
except Exception as e:
logger.error(f"List documents error: {e}")
return jsonify({"error": "Internal server error"}), 500
@app.errorhandler(429)
def ratelimit_handler(e):
return jsonify({"error": "Rate limit exceeded. Please try again later."}), 429
@app.errorhandler(500)
def internal_error_handler(e):
logger.error(f"Internal server error: {e}")
return jsonify({"error": "Internal server error"}), 500
if __name__ == '__main__':
app.run(
host=Config.HOST,
port=Config.PORT,
debug=Config.DEBUG
)
—-
# config.py - Configuration management
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
# Flask configuration
SECRET_KEY = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production')
HOST = os.getenv('HOST', '0.0.0.0')
PORT = int(os.getenv('PORT', '5000'))
DEBUG = os.getenv('DEBUG', 'False').lower() == 'true'
# LLM configuration
USE_LOCAL_LLM = os.getenv('USE_LOCAL_LLM', 'True').lower() == 'true'
LLM_MODEL = os.getenv('LLM_MODEL', 'llama2')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY', '')
OPENAI_MODEL = os.getenv('OPENAI_MODEL', 'gpt-3.5-turbo')
# Ollama configuration
OLLAMA_URL = os.getenv('OLLAMA_URL', 'http://localhost:11434')
# Generation parameters
MAX_TOKENS = int(os.getenv('MAX_TOKENS', '500'))
TEMPERATURE = float(os.getenv('TEMPERATURE', '0.7'))
MAX_MESSAGE_LENGTH = int(os.getenv('MAX_MESSAGE_LENGTH', '2000'))
MAX_HISTORY = int(os.getenv('MAX_HISTORY', '10'))
# RAG configuration
CHUNK_SIZE = int(os.getenv('CHUNK_SIZE', '1000'))
CHUNK_OVERLAP = int(os.getenv('CHUNK_OVERLAP', '200'))
RAG_TOP_K = int(os.getenv('RAG_TOP_K', '3'))
# Vector database configuration
VECTOR_DB_PATH = os.getenv('VECTOR_DB_PATH', './chroma_db')
COLLECTION_NAME = os.getenv('COLLECTION_NAME', 'documents')
# Documents directory
DOCUMENTS_DIR = os.getenv('DOCUMENTS_DIR', './documents')
# Redis configuration
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', '6379'))
# System message
SYSTEM_MESSAGE = os.getenv(
'SYSTEM_MESSAGE',
'You are a helpful, knowledgeable assistant. Provide clear, accurate, '
'and concise answers. When you use information from provided documents, '
'be specific about what you found. If you are unsure or the information '
'is not available, say so honestly.'
)
# document_processor.py - Document processing utilities
import os
from typing import List, Dict
from pypdf import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from bs4 import BeautifulSoup
import logging
logger = logging.getLogger(__name__)
class DocumentProcessor:
def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
separators=["\n\n", "\n", ". ", " ", ""]
)
def process_pdf(self, pdf_path: str) -> List[str]:
try:
reader = PdfReader(pdf_path)
text = ""
for page_num, page in enumerate(reader.pages):
page_text = page.extract_text()
if page_text:
text += f"\n--- Page {page_num + 1} ---\n{page_text}"
if not text.strip():
logger.warning(f"No text extracted from PDF: {pdf_path}")
return []
chunks = self.text_splitter.split_text(text)
logger.info(f"Processed PDF {pdf_path}: {len(chunks)} chunks created")
return chunks
except Exception as e:
logger.error(f"Error processing PDF {pdf_path}: {e}")
raise
def process_html(self, html_content: str) -> List[str]:
try:
soup = BeautifulSoup(html_content, 'html.parser')
for element in soup(['script', 'style', 'nav', 'footer', 'header']):
element.decompose()
text = soup.get_text(separator='\n', strip=True)
if not text.strip():
logger.warning("No text extracted from HTML")
return []
chunks = self.text_splitter.split_text(text)
logger.info(f"Processed HTML: {len(chunks)} chunks created")
return chunks
except Exception as e:
logger.error(f"Error processing HTML: {e}")
raise
def process_text(self, text_content: str) -> List[str]:
try:
if not text_content.strip():
logger.warning("Empty text content provided")
return []
chunks = self.text_splitter.split_text(text_content)
logger.info(f"Processed text: {len(chunks)} chunks created")
return chunks
except Exception as e:
logger.error(f"Error processing text: {e}")
raise
def process_file(self, file_path: str) -> List[Dict]:
filename = os.path.basename(file_path)
file_ext = os.path.splitext(filename)[1].lower()
chunks = []
try:
if file_ext == '.pdf':
chunks = self.process_pdf(file_path)
file_type = 'pdf'
elif file_ext in ['.html', '.htm']:
with open(file_path, 'r', encoding='utf-8') as f:
html_content = f.read()
chunks = self.process_html(html_content)
file_type = 'html'
elif file_ext == '.txt':
with open(file_path, 'r', encoding='utf-8') as f:
text_content = f.read()
chunks = self.process_text(text_content)
file_type = 'text'
else:
logger.warning(f"Unsupported file type: {file_ext}")
return []
return [
{
'content': chunk,
'source': filename,
'type': file_type,
'chunk_index': i
}
for i, chunk in enumerate(chunks)
]
except Exception as e:
logger.error(f"Error processing file {file_path}: {e}")
return []
def process_directory(self, directory_path: str) -> List[Dict]:
all_chunks = []
if not os.path.exists(directory_path):
logger.error(f"Directory not found: {directory_path}")
return []
for filename in os.listdir(directory_path):
file_path = os.path.join(directory_path, filename)
if os.path.isfile(file_path):
file_chunks = self.process_file(file_path)
all_chunks.extend(file_chunks)
logger.info(
f"Processed directory {directory_path}: "
f"{len(all_chunks)} total chunks from {len(set(c['source'] for c in all_chunks))} files"
)
return all_chunks
# vector_store.py - Vector storage and retrieval
from typing import List, Dict
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
import logging
logger = logging.getLogger(__name__)
class VectorStore:
def __init__(self, collection_name: str = "documents", persist_directory: str = "./chroma_db"):
self.persist_directory = persist_directory
self.collection_name = collection_name
self.client = chromadb.Client(Settings(
persist_directory=persist_directory,
anonymized_telemetry=False
))
self.collection = self.client.get_or_create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"}
)
self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
logger.info(f"Initialized VectorStore with collection: {collection_name}")
def add_documents(self, documents: List[Dict]):
if not documents:
logger.warning("No documents to add")
return
try:
texts = [doc['content'] for doc in documents]
metadatas = [
{
'source': doc.get('source', 'Unknown'),
'type': doc.get('type', 'Unknown'),
'chunk_index': doc.get('chunk_index', 0)
}
for doc in documents
]
current_count = self.collection.count()
ids = [f"doc_{current_count + i}" for i in range(len(documents))]
embeddings = self.embedding_model.encode(
texts,
show_progress_bar=True,
batch_size=32
).tolist()
batch_size = 100
for i in range(0, len(documents), batch_size):
batch_end = min(i + batch_size, len(documents))
self.collection.add(
embeddings=embeddings[i:batch_end],
documents=texts[i:batch_end],
metadatas=metadatas[i:batch_end],
ids=ids[i:batch_end]
)
logger.info(f"Added {len(documents)} documents to vector store")
except Exception as e:
logger.error(f"Error adding documents to vector store: {e}")
raise
def search(self, query: str, n_results: int = 3) -> List[Dict]:
try:
if self.collection.count() == 0:
logger.warning("Vector store is empty")
return []
query_embedding = self.embedding_model.encode([query]).tolist()
results = self.collection.query(
query_embeddings=query_embedding,
n_results=min(n_results, self.collection.count())
)
formatted_results = []
if results['documents'] and results['documents'][0]:
for i in range(len(results['documents'][0])):
formatted_results.append({
'content': results['documents'][0][i],
'metadata': results['metadatas'][0][i] if results['metadatas'] else {},
'distance': results['distances'][0][i] if results['distances'] else 1.0
})
logger.info(f"Search for '{query[:50]}...' returned {len(formatted_results)} results")
return formatted_results
except Exception as e:
logger.error(f"Error searching vector store: {e}")
return []
def get_stats(self) -> Dict:
try:
count = self.collection.count()
return {
"total_documents": count,
"collection_name": self.collection_name,
"persist_directory": self.persist_directory
}
except Exception as e:
logger.error(f"Error getting stats: {e}")
return {}
def clear(self):
try:
self.client.delete_collection(self.collection.name)
self.collection = self.client.create_collection(
name=self.collection_name,
metadata={"hnsw:space": "cosine"}
)
logger.info("Cleared vector store")
except Exception as e:
logger.error(f"Error clearing vector store: {e}")
raise
# llm_service.py - LLM interaction service
import requests
import json
from typing import List, Dict, Generator
from config import Config
import logging
logger = logging.getLogger(__name__)
class LLMService:
def __init__(self, use_local: bool = True, model_name: str = "llama2"):
self.use_local = use_local
self.model_name = model_name
self.ollama_url = f"{Config.OLLAMA_URL}/api/generate"
self.ollama_chat_url = f"{Config.OLLAMA_URL}/api/chat"
if not use_local:
from openai import OpenAI
self.openai_client = OpenAI(api_key=Config.OPENAI_API_KEY)
def generate_response(self, messages: List[Dict[str, str]]) -> str:
if self.use_local:
return self._generate_local(messages)
else:
return self._generate_remote(messages)
def generate_response_stream(self, messages: List[Dict[str, str]]) -> Generator[str, None, None]:
if self.use_local:
yield from self._generate_local_stream(messages)
else:
yield from self._generate_remote_stream(messages)
def _generate_local(self, messages: List[Dict[str, str]]) -> str:
try:
payload = {
"model": self.model_name,
"messages": messages,
"stream": False,
"options": {
"temperature": Config.TEMPERATURE,
"num_predict": Config.MAX_TOKENS
}
}
response = requests.post(
self.ollama_chat_url,
json=payload,
timeout=120
)
response.raise_for_status()
result = response.json()
return result.get('message', {}).get('content', '')
except requests.exceptions.RequestException as e:
logger.error(f"Ollama request error: {e}")
raise Exception("Failed to connect to local LLM service")
except Exception as e:
logger.error(f"Local generation error: {e}")
raise
def _generate_local_stream(self, messages: List[Dict[str, str]]) -> Generator[str, None, None]:
try:
payload = {
"model": self.model_name,
"messages": messages,
"stream": True,
"options": {
"temperature": Config.TEMPERATURE,
"num_predict": Config.MAX_TOKENS
}
}
response = requests.post(
self.ollama_chat_url,
json=payload,
stream=True,
timeout=120
)
response.raise_for_status()
for line in response.iter_lines():
if line:
chunk = json.loads(line)
if 'message' in chunk and 'content' in chunk['message']:
yield chunk['message']['content']
except Exception as e:
logger.error(f"Local streaming error: {e}")
raise
def _generate_remote(self, messages: List[Dict[str, str]]) -> str:
try:
response = self.openai_client.chat.completions.create(
model=Config.OPENAI_MODEL,
messages=messages,
max_tokens=Config.MAX_TOKENS,
temperature=Config.TEMPERATURE
)
return response.choices[0].message.content
except Exception as e:
logger.error(f"OpenAI generation error: {e}")
raise
def _generate_remote_stream(self, messages: List[Dict[str, str]]) -> Generator[str, None, None]:
try:
stream = self.openai_client.chat.completions.create(
model=Config.OPENAI_MODEL,
messages=messages,
max_tokens=Config.MAX_TOKENS,
temperature=Config.TEMPERATURE,
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
except Exception as e:
logger.error(f"OpenAI streaming error: {e}")
raise
<!-- templates/index.html - Frontend interface -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>AI-Powered Assistant</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
display: flex;
justify-content: center;
align-items: center;
padding: 20px;
}
.container {
width: 100%;
max-width: 900px;
background: white;
border-radius: 20px;
box-shadow: 0 20px 60px rgba(0,0,0,0.3);
overflow: hidden;
display: flex;
flex-direction: column;
height: 90vh;
max-height: 800px;
}
.header {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 25px 30px;
display: flex;
justify-content: space-between;
align-items: center;
}
.header h1 {
font-size: 24px;
font-weight: 600;
}
.settings-button {
background: rgba(255,255,255,0.2);
border: none;
color: white;
padding: 8px 16px;
border-radius: 8px;
cursor: pointer;
font-size: 14px;
}
.settings-button:hover {
background: rgba(255,255,255,0.3);
}
.chat-container {
flex: 1;
overflow-y: auto;
padding: 30px;
background: #f8f9fa;
}
.message {
margin-bottom: 20px;
display: flex;
align-items: flex-start;
animation: slideIn 0.3s ease;
}
@keyframes slideIn {
from {
opacity: 0;
transform: translateY(10px);
}
to {
opacity: 1;
transform: translateY(0);
}
}
.message.user {
justify-content: flex-end;
}
.message-content {
max-width: 70%;
padding: 15px 20px;
border-radius: 18px;
line-height: 1.5;
}
.message.user .message-content {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
}
.message.assistant .message-content {
background: white;
color: #333;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
}
.message-sources {
margin-top: 10px;
padding: 10px;
background: #f0f0f0;
border-radius: 8px;
font-size: 12px;
}
.source-item {
margin: 5px 0;
color: #666;
}
.input-container {
padding: 20px 30px;
background: white;
border-top: 1px solid #e0e0e0;
}
.input-wrapper {
display: flex;
gap: 15px;
align-items: center;
}
.input-field {
flex: 1;
padding: 15px 20px;
border: 2px solid #e0e0e0;
border-radius: 12px;
font-size: 15px;
transition: border-color 0.3s;
}
.input-field:focus {
outline: none;
border-color: #667eea;
}
.send-button {
padding: 15px 30px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border: none;
border-radius: 12px;
cursor: pointer;
font-size: 15px;
font-weight: 600;
transition: transform 0.2s, box-shadow 0.2s;
}
.send-button:hover:not(:disabled) {
transform: translateY(-2px);
box-shadow: 0 5px 15px rgba(102, 126, 234, 0.4);
}
.send-button:disabled {
opacity: 0.5;
cursor: not-allowed;
}
.loading {
display: flex;
align-items: center;
gap: 8px;
color: #666;
font-style: italic;
}
.loading-dots {
display: flex;
gap: 4px;
}
.loading-dot {
width: 8px;
height: 8px;
background: #667eea;
border-radius: 50%;
animation: bounce 1.4s infinite ease-in-out;
}
.loading-dot:nth-child(1) {
animation-delay: -0.32s;
}
.loading-dot:nth-child(2) {
animation-delay: -0.16s;
}
@keyframes bounce {
0%, 80%, 100% {
transform: scale(0);
}
40% {
transform: scale(1);
}
}
.settings-panel {
display: none;
position: fixed;
top: 0;
left: 0;
right: 0;
bottom: 0;
background: rgba(0,0,0,0.5);
z-index: 1000;
justify-content: center;
align-items: center;
}
.settings-panel.active {
display: flex;
}
.settings-content {
background: white;
padding: 30px;
border-radius: 15px;
max-width: 500px;
width: 90%;
}
.settings-content h2 {
margin-bottom: 20px;
}
.setting-item {
margin-bottom: 15px;
}
.setting-item label {
display: block;
margin-bottom: 5px;
font-weight: 500;
}
.setting-item input[type="checkbox"] {
margin-right: 10px;
}
.close-button {
margin-top: 20px;
padding: 10px 20px;
background: #667eea;
color: white;
border: none;
border-radius: 8px;
cursor: pointer;
}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>AI-Powered Assistant</h1>
<button class="settings-button" onclick="toggleSettings()">Settings</button>
</div>
<div class="chat-container" id="chatContainer"></div>
<div class="input-container">
<div class="input-wrapper">
<input
type="text"
class="input-field"
id="messageInput"
placeholder="Type your message..."
onkeypress="handleKeyPress(event)"
>
<button class="send-button" id="sendButton" onclick="sendMessage()">Send</button>
</div>
</div>
</div>
<div class="settings-panel" id="settingsPanel">
<div class="settings-content">
<h2>Settings</h2>
<div class="setting-item">
<label>
<input type="checkbox" id="useRagCheckbox" checked>
Use document context (RAG)
</label>
</div>
<div class="setting-item">
<label>
<input type="checkbox" id="showSourcesCheckbox" checked>
Show sources
</label>
</div>
<button class="close-button" onclick="toggleSettings()">Close</button>
</div>
</div>
<script>
const chatContainer = document.getElementById('chatContainer');
const messageInput = document.getElementById('messageInput');
const sendButton = document.getElementById('sendButton');
const settingsPanel = document.getElementById('settingsPanel');
const useRagCheckbox = document.getElementById('useRagCheckbox');
const showSourcesCheckbox = document.getElementById('showSourcesCheckbox');
let conversationHistory = [];
function toggleSettings() {
settingsPanel.classList.toggle('active');
}
function addMessage(content, isUser, sources = null) {
const messageDiv = document.createElement('div');
messageDiv.className = `message ${isUser ? 'user' : 'assistant'}`;
const contentDiv = document.createElement('div');
contentDiv.className = 'message-content';
contentDiv.textContent = content;
messageDiv.appendChild(contentDiv);
if (!isUser && sources && sources.length > 0 && showSourcesCheckbox.checked) {
const sourcesDiv = document.createElement('div');
sourcesDiv.className = 'message-sources';
sourcesDiv.innerHTML = '<strong>Sources:</strong>';
sources.forEach(source => {
const sourceItem = document.createElement('div');
sourceItem.className = 'source-item';
sourceItem.textContent = `📄 ${source.source} (${source.type})`;
sourcesDiv.appendChild(sourceItem);
});
messageDiv.appendChild(sourcesDiv);
}
chatContainer.appendChild(messageDiv);
chatContainer.scrollTop = chatContainer.scrollHeight;
}
function showLoading() {
const loadingDiv = document.createElement('div');
loadingDiv.className = 'message assistant';
loadingDiv.id = 'loadingIndicator';
const contentDiv = document.createElement('div');
contentDiv.className = 'message-content loading';
contentDiv.innerHTML = `
<span>Thinking</span>
<div class="loading-dots">
<div class="loading-dot"></div>
<div class="loading-dot"></div>
<div class="loading-dot"></div>
</div>
`;
loadingDiv.appendChild(contentDiv);
chatContainer.appendChild(loadingDiv);
chatContainer.scrollTop = chatContainer.scrollHeight;
}
function hideLoading() {
const loadingDiv = document.getElementById('loadingIndicator');
if (loadingDiv) {
loadingDiv.remove();
}
}
async function sendMessage() {
const message = messageInput.value.trim();
if (!message) return;
addMessage(message, true);
conversationHistory.push({ role: 'user', content: message });
messageInput.value = '';
sendButton.disabled = true;
showLoading();
try {
const response = await fetch('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
message: message,
history: conversationHistory.slice(-10),
use_rag: useRagCheckbox.checked
})
});
hideLoading();
if (response.ok) {
const data = await response.json();
addMessage(data.response, false, data.sources);
conversationHistory.push({ role: 'assistant', content: data.response });
} else {
const error = await response.json();
addMessage(`Error: ${error.error || 'Something went wrong'}`, false);
}
} catch (error) {
hideLoading();
addMessage('Error: Could not connect to the server', false);
console.error('Error:', error);
} finally {
sendButton.disabled = false;
messageInput.focus();
}
}
function handleKeyPress(event) {
if (event.key === 'Enter' && !event.shiftKey) {
event.preventDefault();
sendMessage();
}
}
window.addEventListener('click', (event) => {
if (event.target === settingsPanel) {
toggleSettings();
}
});
messageInput.focus();
addMessage('Hello! I am your AI assistant. How can I help you today?', false);
</script>
</body>
</html>
This complete production-ready example includes all the components needed for a fully functional LLM-powered web application with RAG capabilities. The system handles document ingestion, vector storage, similarity search, conversation management, caching, rate limiting, and provides a polished user interface. You can deploy this to production by setting up the required environment variables, installing dependencies, and running the Flask application behind a production WSGI server like Gunicorn.
No comments:
Post a Comment