INTRODUCTION
Welcome to the fascinating world of Large Language Models and the HuggingFace Transformers library. If you are reading this, you are about to embark on a transformative journey that will take you from someone who has never interacted with an LLM to a developer capable of building sophisticated AI-powered applications. This tutorial is meticulously designed for software engineers who understand general programming concepts but have absolutely zero experience with machine learning, natural language processing, or artificial intelligence.
The HuggingFace Transformers library represents a revolutionary shift in how developers interact with advanced language models. Before this library existed, working with models like GPT, BERT, LLaMA, or any other transformer-based architecture required deep expertise in machine learning frameworks, comprehensive understanding of complex neural network architectures, and the ability to deal with low-level tensor operations and mathematical computations. HuggingFace fundamentally changed this landscape by providing a unified, intuitive, and remarkably simple interface to thousands of pre-trained models.
Throughout this tutorial, we will construct five progressively sophisticated versions of an LLM chatbot application. Each version builds upon the previous one, systematically adding new capabilities while maintaining complete backward compatibility. By the time you reach the end of this tutorial, you will have created a production-ready, web-based chatbot with Retrieval-Augmented Generation capabilities that can run efficiently on any hardware configuration, from basic CPUs to high-end GPUs from different manufacturers.
WHAT EXACTLY IS A LARGE LANGUAGE MODEL
Before we dive into code, we must establish a foundational understanding of what a Large Language Model actually is. An LLM is a type of artificial neural network that has been trained on enormous amounts of text data from the internet, books, articles, and other written sources. The training process teaches the model to predict what word or token should come next in a sequence, given all the previous words.
Think of it like an incredibly sophisticated autocomplete system. When you type on your smartphone and it suggests the next word, that is a simple version of what LLMs do. However, LLMs are vastly more powerful because they have learned patterns, facts, reasoning capabilities, and language structures from billions of examples. They can generate coherent paragraphs, answer questions, write code, translate languages, and perform countless other language-related tasks.
The term "transformer" refers to the specific neural network architecture these models use. Transformers were introduced in a 2017 research paper titled "Attention Is All You Need" and revolutionized natural language processing. The key innovation is the attention mechanism, which allows the model to focus on relevant parts of the input when generating each output token. We will not dive into the mathematical details of transformers, but understanding that they process sequences of tokens and use attention to capture relationships between words is sufficient for our purposes.
WHAT YOU WILL BUILD THROUGHOUT THIS TUTORIAL
Our journey consists of five major milestones, each representing a complete, functional application that builds upon the previous version.
First, we create a basic console chatbot that can load any compatible LLM model and engage in conversations with users through the command line. This foundation teaches you about model loading, tokenization, text generation, hardware acceleration, and the fundamental components of the Transformers library.
Second, we add conversation memory so the chatbot remembers previous exchanges within a session. This introduces you to the stateless nature of LLMs, which is a critical concept. Unlike traditional applications that maintain state, LLMs have no memory between calls. Every time you send a prompt, you must include all relevant conversation history. We will implement this properly.
Third, we implement configurable generation parameters, giving users fine-grained control over how the model generates text. Parameters like temperature control randomness, max tokens limit output length, top-p and top-k control sampling strategies, and repetition penalty prevents the model from repeating itself. Understanding these parameters is essential for getting quality outputs.
Fourth, we integrate Retrieval-Augmented Generation, commonly abbreviated as RAG. This powerful technique allows the chatbot to reference external documents and provide grounded, factual responses based on your own data rather than just the model's training data. We will cover document loading, text chunking strategies, embeddings, vector databases, similarity search, and reranking.
Finally, we transform everything into a web application with a modern interface that users can access through their browsers. This involves integrating a web framework, handling asynchronous operations, implementing streaming responses for better user experience, and deploying the complete system.
Let us begin this exciting journey by understanding the absolute fundamentals.
STEP ONE: BUILDING THE FOUNDATION - A BASIC CONSOLE CHATBOT
Understanding What We Are Actually Building
A chatbot, at its most fundamental level, is a program that accepts text input from a user, processes that text through a language model, and returns a generated response. The simplest possible implementation involves several distinct steps. First, we load a pre-trained model into computer memory. Second, we convert user text into numerical tokens that the model can understand. Third, we run those tokens through the model's neural network to generate output tokens. Fourth, we convert the output tokens back into human-readable text and display it to the user.
However, beneath this simple description lie several layers of complexity that we must understand and handle properly. Different models have different architectures and were trained with different conversation formats. Some models expect messages wrapped in special tokens, while others use different conventions. Some models are instruction-tuned specifically for chat interactions, while others are base models better suited for text completion tasks. Our chatbot must handle these variations gracefully and automatically.
Additionally, modern computers have incredibly diverse hardware configurations. Some machines have NVIDIA GPUs with CUDA support, others have AMD GPUs with ROCm drivers, some have Apple Silicon processors with Metal Performance Shaders, and many have only CPUs without any GPU acceleration. Our application must intelligently detect available hardware and automatically use the fastest option without requiring manual configuration from users.
Installing Required Dependencies and Understanding Each One
Before writing any code, we need to install the necessary Python packages. The HuggingFace ecosystem consists of several libraries that work together synergistically. Each library has a specific purpose and understanding these purposes is crucial.
The transformers library is the core package that provides model architectures, tokenizers, training utilities, and high-level APIs for working with transformer models. This is the main library we will use throughout this tutorial.
The torch library, also known as PyTorch, is the underlying deep learning framework. PyTorch provides the fundamental building blocks for neural networks including tensors, which are multi-dimensional arrays of numbers, automatic differentiation for computing gradients, and GPU acceleration. The Transformers library is built on top of PyTorch and uses it for all low-level operations.
The accelerate library is a HuggingFace utility that simplifies device management, distributed training, and mixed precision training. It helps us write code that works seamlessly across different hardware configurations without manual device management.
The sentencepiece library is a tokenization library developed by Google. Some models, particularly those from the LLaMA family and many multilingual models, use SentencePiece tokenization. This library must be installed for those tokenizers to work.
The tokenizers library is a fast tokenization library written in Rust with Python bindings. It provides extremely efficient implementations of various tokenization algorithms and is used by many modern models for speed.
Open your terminal and execute the following installation command:
pip install transformers torch accelerate sentencepiece tokenizers
This command installs all core dependencies with their latest stable versions. Depending on your specific hardware configuration, you might need additional packages. For NVIDIA GPUs, you need to ensure you have CUDA-compatible PyTorch installed. The PyTorch website provides a configuration tool to generate the correct installation command for your system. For AMD GPUs, you need ROCm-compatible PyTorch, which requires following AMD's specific installation instructions. For Apple Silicon, the standard PyTorch installation includes Metal Performance Shaders support automatically, so no additional steps are needed.
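To confirm that everything installed correctly and to see which accelerators PyTorch can reach on your machine, you can run a short sanity check like the following (the versions printed will of course differ on your system):
import torch
import transformers
# Report library versions and which accelerators PyTorch can see
print("transformers:", transformers.__version__)
print("torch:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())
print("MPS available:", torch.backends.mps.is_available())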
Understanding Tokenization: The Critical Bridge Between Text and Numbers
Neural networks, at their core, are mathematical functions that operate on numerical data. They cannot process text directly. They work with tensors, which are multi-dimensional arrays of numbers similar to matrices in linear algebra but potentially with more dimensions. Tokenization is the fundamental process of converting human-readable text into numerical representations that neural networks can process.
This conversion happens in multiple stages, and understanding each stage is essential. First, text is split into tokens. A token is not necessarily a word. Depending on the tokenization algorithm, tokens can be complete words, parts of words called subwords, individual characters, or even bytes. The choice of tokenization strategy significantly impacts model performance and vocabulary size.
Second, each token is mapped to a unique integer identifier called a token ID. This mapping is defined by a vocabulary, which is a dictionary that associates each possible token with a specific number. The vocabulary is created during the model's training process and remains fixed afterward.
Third, these token IDs are converted into tensors that can be fed into the neural network. Additional processing may include adding special tokens, creating attention masks, and organizing the data into batches.
Different models use fundamentally different tokenization strategies. Byte-Pair Encoding, abbreviated as BPE, is one popular approach. BPE starts with individual characters and iteratively merges the most frequently occurring pairs of tokens to create a vocabulary of subword units. This allows the model to handle any word, even ones it has never seen, by breaking them into known subword pieces.
WordPiece is another algorithm, used by models like BERT. It is similar to BPE but uses a different merging criterion based on likelihood maximization.
SentencePiece is yet another approach that treats the input as a raw stream of Unicode characters and learns subword units directly from that stream without requiring pre-tokenization into words.
The absolutely critical insight here is that tokenization is not arbitrary or interchangeable. Each model has a specific tokenizer that was trained alongside it using the same data. You must use the exact correct tokenizer for each model. Using the wrong tokenizer will produce nonsensical results because the token IDs will not correspond to what the model expects.
Let us examine a concrete tokenization example to make this concept crystal clear:
from transformers import AutoTokenizer
# Load a tokenizer for a specific model
# AutoTokenizer automatically determines the correct tokenizer type
tokenizer = AutoTokenizer.from_pretrained("gpt2")
# Convert text to tokens (the actual string pieces)
text = "Hello, how are you today?"
tokens = tokenizer.tokenize(text)
print("Tokens:", tokens)
# Output might be: ['Hello', ',', 'Ġhow', 'Ġare', 'Ġyou', 'Ġtoday', '?']
# Note: 'Ġ' represents a space in GPT-2 style tokenization
# Convert text directly to token IDs (the numerical representation)
token_ids = tokenizer.encode(text)
print("Token IDs:", token_ids)
# Output might be: [15496, 11, 703, 389, 345, 1909, 30]
# Convert token IDs back to text (decoding)
decoded_text = tokenizer.decode(token_ids)
print("Decoded:", decoded_text)
# Output: "Hello, how are you today?"
In this example, we use the AutoTokenizer class, which is a factory class that automatically loads the correct tokenizer type for any given model. The from_pretrained method is incredibly powerful. It takes a model identifier, which can be a model name from the HuggingFace Hub or a path to a local directory, and it downloads the tokenizer configuration and vocabulary files if they are not already cached locally.
The tokenize method shows the actual token strings, which helps us understand how the text is being split. Notice in the example output that some tokens have a special character 'Ġ' prefix. This is how GPT-2 style tokenizers represent spaces. Different tokenizers use different conventions.
The encode method converts text directly to integer IDs in a single step, which is more convenient and efficient than tokenizing and then converting separately. For tokenizers that define them, encode also adds the special tokens the model expects; BERT-style tokenizers, for example, wrap the input in classification and separator tokens, while GPT-2's tokenizer adds no special tokens by default.
The decode method reverses the entire process, converting token IDs back to human-readable text. This is what we use to convert the model's numerical output into text we can display to users.
The AutoTokenizer class is incredibly convenient because it completely abstracts away the complexity of different tokenizer types. You do not need to know whether a model uses BertTokenizer, GPT2Tokenizer, LlamaTokenizer, or any other specific tokenizer class. Simply provide the model name, and AutoTokenizer handles all the details automatically.
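Beyond encode and decode, the tokenizer object itself is callable, and calling it directly is the most common pattern in practice because it returns both the token IDs and an attention mask, the tensor of ones and zeros that marks which positions hold real tokens rather than padding. A short illustrative sketch:
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("gpt2")
# Calling the tokenizer returns a dictionary-like object with input_ids and attention_mask
encoded = tokenizer("Hello, how are you today?", return_tensors="pt")
print(encoded["input_ids"])       # token IDs as a tensor of shape (1, sequence_length)
print(encoded["attention_mask"])  # ones marking real (non-padding) positions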
The AutoModel Family: Loading Pre-Trained Models
HuggingFace provides several Auto classes for loading models, and understanding the distinctions between them is important. The most important class for our chatbot purposes is AutoModelForCausalLM, which loads models specifically designed for causal language modeling, also known as autoregressive text generation.
Causal language modeling means the model predicts the next token in a sequence based only on previous tokens. The model cannot look ahead at future tokens when making predictions. This is exactly the behavior we need for a chatbot because we want the model to generate responses one token at a time, building up a coherent answer sequentially.
Other Auto classes exist for different tasks. AutoModelForSequenceClassification loads models for text classification tasks like sentiment analysis. AutoModelForQuestionAnswering loads models specifically fine-tuned for extractive question answering. AutoModelForMaskedLM loads models like BERT that predict masked tokens in the middle of sequences. For our chatbot, we exclusively use AutoModelForCausalLM.
The AutoModelForCausalLM class works similarly to AutoTokenizer. You provide a model identifier, and it downloads the model weights, loads the correct architecture, and returns a ready-to-use model object. Here is a basic example with detailed explanations:
from transformers import AutoModelForCausalLM
import torch
# Load a model with specific configuration options
model = AutoModelForCausalLM.from_pretrained(
"gpt2",
torch_dtype=torch.float16, # Use half precision for efficiency
low_cpu_mem_usage=True # Optimize memory usage during loading
)
print(f"Model loaded: {model.__class__.__name__}")
print(f"Number of parameters: {model.num_parameters():,}")
The from_pretrained method is the standard way to load any pre-trained model in the Transformers library. It accepts numerous parameters that control how the model is loaded and configured. Let us examine the most important ones.
The first argument is the model identifier. This can be a model name from the HuggingFace Hub like "gpt2", or it can be a local directory path containing model files. When you provide a Hub name, the method automatically downloads the model files to a cache directory on your computer. Subsequent loads use the cached version, so you only download once.
The torch_dtype parameter controls the numerical precision of the model's parameters. By default, models load in torch.float32, which means each number is stored as a 32-bit floating point value. This provides high precision but uses significant memory. Setting torch_dtype to torch.float16 stores each parameter as a 16-bit value instead, roughly halving the memory needed for the weights with minimal impact on output quality for most models. This is called half precision. (Mixed precision is a related but different idea: it combines 16-bit and 32-bit arithmetic, mainly during training.)
For very large models, you might even use torch.bfloat16, which is a special 16-bit format that maintains the same range as float32 but with less precision. This is particularly useful for models with billions of parameters.
The low_cpu_mem_usage parameter optimizes how the model is loaded into memory. When set to True, the model is loaded more efficiently, preventing memory spikes during initialization. This is especially important for large models that might otherwise cause out-of-memory errors during loading.
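As a rough sanity check on memory requirements, you can estimate weight storage yourself: the parameter count times the bytes per parameter, which is 4 for float32 and 2 for float16 or bfloat16. A small sketch, reusing the model variable loaded above:
num_params = model.num_parameters()
# Rough estimate of weight memory in gigabytes: parameters x bytes per parameter
print(f"float32 estimate: ~{num_params * 4 / 1e9:.2f} GB")
print(f"float16 estimate: ~{num_params * 2 / 1e9:.2f} GB")
# Transformers can also report the footprint of the model as actually loaded
print(f"Loaded footprint: ~{model.get_memory_footprint() / 1e9:.2f} GB")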
The model object returned is a PyTorch neural network module, specifically an instance of a class like GPT2LMHeadModel, LlamaForCausalLM, or another architecture-specific class. It contains all the learned weights, which are the numerical parameters that were optimized during training, and the forward pass logic, which defines how inputs are transformed into outputs through the neural network layers.
However, we rarely call the model's forward method directly. Instead, we use higher-level generation utilities provided by the Transformers library that handle the complexities of text generation, including sampling strategies, stopping criteria, and output formatting.
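To make that distinction concrete, here is a minimal sketch of what a single forward pass returns compared with what generate does for us. One forward pass produces a score (logit) for every vocabulary token at every input position; generate simply runs this forward pass in a loop, appending one chosen token at a time:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
inputs = tokenizer("The capital of France is", return_tensors="pt")
with torch.no_grad():
    outputs = model(**inputs)  # a single forward pass, no generation loop
print(outputs.logits.shape)    # (batch_size, sequence_length, vocabulary_size)
# The logits at the last position score every possible next token
next_token_id = outputs.logits[0, -1].argmax().item()
print(tokenizer.decode([next_token_id]))  # the single most likely continuation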
Device Management: Leveraging Available Hardware Acceleration
Modern deep learning frameworks can run computations on various types of hardware devices. CPUs are universal and always available, but they are relatively slow for neural network operations. GPUs, which are Graphics Processing Units originally designed for rendering graphics, are vastly more efficient for the parallel matrix operations that neural networks require. Different GPU manufacturers use different software stacks. NVIDIA GPUs use CUDA, AMD GPUs use ROCm, and Apple Silicon uses Metal Performance Shaders abbreviated as MPS.
Running models on GPUs is typically ten to one hundred times faster than CPUs, depending on the model size and GPU capability. Therefore, we want to use GPU acceleration whenever possible. However, we must write our code to work on any available hardware without requiring users to manually configure device settings.
PyTorch provides a device abstraction that lets us write hardware-agnostic code. A device is represented by a torch.device object that specifies where tensors should be stored and where operations should be executed. Here is a robust device detection function with comprehensive explanations:
import torch
def get_optimal_device():
"""
Detects and returns the best available device for model inference.
This function checks for available hardware in order of performance:
1. CUDA (NVIDIA GPUs) - fastest for most models
2. MPS (Apple Silicon) - fast on Mac computers with M1/M2/M3 chips
3. CPU - universal fallback, slower but always available
Returns:
torch.device: The optimal device object for tensor operations
"""
if torch.cuda.is_available():
# NVIDIA GPU with CUDA support detected
device = torch.device("cuda")
# Get the name of the GPU for informational purposes
gpu_name = torch.cuda.get_device_name(0)
# Get total GPU memory in gigabytes
gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1e9
print(f"Using CUDA GPU: {gpu_name}")
print(f"Available GPU memory: {gpu_memory:.2f} GB")
return device
elif torch.backends.mps.is_available():
# Apple Silicon with Metal Performance Shaders detected
device = torch.device("mps")
print("Using Apple Metal Performance Shaders (MPS)")
return device
else:
# No GPU acceleration available, falling back to CPU
device = torch.device("cpu")
print("Using CPU (no GPU acceleration available)")
print("Warning: CPU inference will be significantly slower than GPU")
return device
This function implements a priority-based device selection strategy. It first checks if CUDA is available using torch.cuda.is_available(), which returns True if an NVIDIA GPU is detected and CUDA drivers are properly installed. If CUDA is available, we create a device object with torch.device("cuda"), which represents the first NVIDIA GPU in the system. For systems with multiple GPUs, you could specify "cuda:0", "cuda:1", etc., but for our chatbot, using the default first GPU is sufficient.
When a CUDA device is selected, we also retrieve and display diagnostic information. The torch.cuda.get_device_name function returns the commercial name of the GPU, like "NVIDIA GeForce RTX 3080". The torch.cuda.get_device_properties function returns a structure containing various properties, including total_memory, which tells us how much VRAM the GPU has. This information helps users understand what hardware is being used and diagnose potential memory issues.
If CUDA is not available, the function checks for MPS using torch.backends.mps.is_available(). MPS is Apple's GPU acceleration framework for their M-series chips. If detected, we create an MPS device. MPS provides significant speedups on Apple Silicon compared to CPU, though typically not quite as fast as high-end NVIDIA GPUs.
If neither CUDA nor MPS is available, we fall back to CPU. The function prints a warning because CPU inference is substantially slower. For small models, CPU inference might be acceptable, but for large models with billions of parameters, it can be painfully slow.
Once we have determined the optimal device, we need to move both the model and input tensors to that device. Moving the model is a one-time operation performed after loading:
device = get_optimal_device()
model = model.to(device)
The to method is a PyTorch method available on all neural network modules. It moves all model parameters and buffers to the specified device. This operation can take a few seconds for large models because it involves copying gigabytes of data to GPU memory. After this operation, the model resides on the GPU and all subsequent operations will execute there.
Input tensors must also be moved to the same device before processing. We will handle this in our generation code by ensuring that tokenized inputs are moved to the device before being passed to the model. If the model is on a GPU but the input is on CPU, PyTorch will raise an error because operations cannot mix devices.
Understanding Chat Templates and Conversation Formatting
One of the most confusing and frustrating aspects of working with different LLMs is that they expect inputs formatted in specific, often completely different ways. This is not a technical limitation but rather a consequence of how models are trained. During fine-tuning for chat applications, models learn to associate specific formatting patterns with conversational structure.
However, many models, especially base models and older conversational models, do not have standardized chat templates. The GPT-2 model we use in our examples is one such model: it is a plain base model with no chat template at all. DialoGPT, a variant of GPT-2 fine-tuned on conversational data, likewise predates the modern chat template format and instead expects user and bot turns to be concatenated, separated by the end-of-sequence token. We will borrow that simple convention for our GPT-2 chatbot.
To handle this diversity, we need a flexible conversation formatting system that works with both models that have chat templates and those that do not. Here is a robust implementation:
def format_conversation(tokenizer, conversation_history, add_generation_prompt=True):
"""
Formats conversation with or without chat template support.
Falls back to simple concatenation if chat template not available.
Args:
tokenizer: The tokenizer object
conversation_history: List of message dictionaries with 'role' and 'content'
add_generation_prompt: Whether to add the assistant prefix
Returns:
str: Formatted conversation string
"""
# Check if tokenizer has chat template support
if hasattr(tokenizer, 'chat_template') and tokenizer.chat_template is not None:
try:
# Use official chat template
formatted_prompt = tokenizer.apply_chat_template(
conversation_history,
tokenize=False,
add_generation_prompt=add_generation_prompt
)
return formatted_prompt
except Exception as e:
print(f"Chat template failed: {e}, using fallback")
# Fallback for models without chat templates
formatted_parts = []
for message in conversation_history:
role = message["role"]
content = message["content"]
if role == "system":
# System messages provide instructions
formatted_parts.append(content)
elif role == "user":
# User messages
formatted_parts.append(content)
elif role == "assistant":
# Assistant responses
formatted_parts.append(content)
# Join all parts
result = tokenizer.eos_token.join(formatted_parts) if hasattr(tokenizer, 'eos_token') and tokenizer.eos_token else "\n".join(formatted_parts)
# Add generation prompt if requested
if add_generation_prompt:
if hasattr(tokenizer, 'eos_token') and tokenizer.eos_token:
result += tokenizer.eos_token
return result
This function first attempts to use the tokenizer's built-in chat template if available. If the tokenizer does not have a chat template or if the template fails, it falls back to a simple concatenation strategy that works with most conversational models.
The conversation_history parameter is a list of dictionaries, where each dictionary represents one message in the conversation. The role field specifies who is speaking and can be "system", "user", or "assistant". The system role is used for instructions that guide the model's behavior. The user role represents messages from the human user. The assistant role represents messages from the AI assistant.
The content field contains the actual text of the message. This structure is standardized across the Transformers library, so you always use the same format regardless of which model you are working with.
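For a model that does ship with a chat template (GPT-2 does not), this same list of role and content dictionaries is passed straight to apply_chat_template, which renders it into whatever string format that particular model was trained on. A sketch, assuming tokenizer here was loaded from a chat-tuned model:
conversation_history = [
    {"role": "system", "content": "You are a helpful and friendly AI assistant."},
    {"role": "user", "content": "What is the capital of France?"},
]
# Renders the messages using the model's own template; GPT-2 has none and
# would fall back to the simple concatenation strategy shown above
prompt = tokenizer.apply_chat_template(
    conversation_history,
    tokenize=False,             # return a formatted string rather than token IDs
    add_generation_prompt=True  # append the assistant prefix so the model replies next
)
print(prompt)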
The Generate Method: Fine-Grained Control Over Text Generation
The generate method is the core function for producing text with causal language models. It is a method on the model object itself and provides extensive control over the generation process through numerous parameters. Understanding these parameters is essential for getting quality outputs.
Here is a basic example of using the generate method:
# Prepare input by tokenizing
input_text = "Hello, how are you?"
input_ids = tokenizer.encode(input_text, return_tensors="pt")
# Move input to the same device as the model
input_ids = input_ids.to(device)
# Generate output tokens
with torch.no_grad():
output_ids = model.generate(
input_ids,
max_new_tokens=50,
do_sample=True,
temperature=0.7,
pad_token_id=tokenizer.eos_token_id
)
# Decode output tokens to text
# Only decode the newly generated tokens, not the input
output_text = tokenizer.decode(
output_ids[0][input_ids.shape[1]:],
skip_special_tokens=True
)
print(output_text)
Let us dissect each part of this code in detail. First, we tokenize the input text using the tokenizer's encode method. The return_tensors parameter specifies the format of the returned data. Setting it to "pt" returns PyTorch tensors instead of plain Python lists. This is necessary because the model expects tensor inputs.
The encode method returns a tensor of token IDs with shape (1, sequence_length), where 1 is the batch dimension. Even though we are processing a single input, PyTorch models expect batched inputs, so the tokenizer adds a batch dimension automatically when return_tensors is specified.
Next, we move the input tensor to the same device as the model using the to method. This is crucial because PyTorch requires all tensors involved in an operation to be on the same device. If the model is on a GPU but the input is on CPU, the operation will fail.
The generate method is where the actual text generation happens. It takes the input token IDs and produces output token IDs by repeatedly predicting the next token and appending it to the sequence. Let us examine each parameter:
The max_new_tokens parameter specifies the maximum number of new tokens to generate, not counting the input tokens. This is different from max_length, which specifies the total length including input. Using max_new_tokens is generally better because it gives you predictable output lengths regardless of input length.
The do_sample parameter controls whether to use sampling or greedy decoding. When set to False, the model always selects the highest probability token at each step. This is called greedy decoding and produces deterministic outputs. When set to True, the model samples from the probability distribution, introducing randomness and variety in outputs.
The temperature parameter controls the randomness of sampling. It is a positive number that scales the logits, which are the raw model outputs, before converting them to probabilities. A temperature of 1.0 uses the model's original probabilities. Lower temperatures like 0.7 make the distribution more peaked, favoring high-probability tokens and producing more focused, conservative outputs. Higher temperatures like 1.5 flatten the distribution, giving lower-probability tokens more chance and producing more creative, diverse outputs.
The pad_token_id parameter specifies which token ID to use for padding. Padding is necessary when processing batches of sequences with different lengths. All sequences in a batch must have the same length, so shorter sequences are padded. Some tokenizers have a dedicated padding token, but many do not. Setting pad_token_id to tokenizer.eos_token_id uses the end-of-sequence token for padding, which is a common practice.
The torch.no_grad() context manager is a performance optimization. It tells PyTorch not to compute gradients, which are only needed for training. This reduces memory usage and speeds up inference.
The generate method returns a tensor of token IDs with shape (batch_size, sequence_length). For our single-input example, this is (1, total_length) where total_length includes both the input and generated tokens.
The critical improvement in our decoding step is that we only decode the newly generated tokens, not the entire sequence including the input. We do this by slicing the output tensor: output_ids[0][input_ids.shape[1]:]. This takes the first batch element (index 0) and slices from the end of the input (input_ids.shape[1]) to the end. This gives us only the new tokens.
The decode method converts token IDs back to text. The skip_special_tokens parameter, when set to True, removes special tokens like padding tokens, end-of-sequence tokens, and other model-specific markers from the output. This produces cleaner text for display to users.
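To see what skip_special_tokens actually removes, you can decode the same token IDs both ways. With GPT-2, the end-of-sequence marker is the literal string <|endoftext|>, so the difference is easy to spot (output shown is indicative):
ids = tokenizer.encode("Hello") + [tokenizer.eos_token_id]
print(tokenizer.decode(ids, skip_special_tokens=False))  # "Hello<|endoftext|>"
print(tokenizer.decode(ids, skip_special_tokens=True))   # "Hello"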
Building the Basic Console Chatbot: Putting It All Together
Now that we understand all the fundamental components, we can build our first complete chatbot. This version will be a simple console application that loads a model, accepts user input, generates responses, and displays them. It will not yet have conversation memory, but it will demonstrate all the core concepts.
Let us examine the complete code with extensive inline comments:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
def get_optimal_device():
"""
Detects and returns the best available device for model inference.
Checks for CUDA, MPS, and falls back to CPU if neither is available.
"""
if torch.cuda.is_available():
device = torch.device("cuda")
print(f"Using CUDA GPU: {torch.cuda.get_device_name(0)}")
print(f"Available GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
return device
elif torch.backends.mps.is_available():
device = torch.device("mps")
print("Using Apple Metal Performance Shaders (MPS)")
return device
else:
device = torch.device("cpu")
print("Using CPU (no GPU acceleration available)")
return device
def load_model_and_tokenizer(model_name):
"""
Loads a pre-trained model and its associated tokenizer.
Args:
model_name: HuggingFace model identifier or local path
Returns:
tuple: (model, tokenizer, device)
"""
print(f"Loading model: {model_name}")
print("This may take a few minutes on first run...")
# Load tokenizer
# AutoTokenizer automatically selects the correct tokenizer class
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Fix missing padding token issue
# Many models don't have a padding token defined
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
# Load model with optimized settings
# torch.float16 reduces memory usage by half
# low_cpu_mem_usage prevents memory spikes during loading
model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16,
low_cpu_mem_usage=True
)
# Detect optimal device and move model to it
device = get_optimal_device()
model = model.to(device)
# Set model to evaluation mode
# This disables dropout and other training-specific behaviors
model.eval()
print(f"Model loaded successfully!")
print(f"Model has {model.num_parameters():,} parameters")
return model, tokenizer, device
def generate_response(model, tokenizer, device, prompt, max_new_tokens=100):
"""
Generates a response to a given prompt using the loaded model.
Args:
model: The loaded language model
tokenizer: The associated tokenizer
device: The device where the model resides
prompt: Input text to respond to
max_new_tokens: Maximum number of tokens to generate
Returns:
str: Generated response text
"""
# Tokenize input and convert to tensor
# return_tensors="pt" returns PyTorch tensors
input_ids = tokenizer.encode(prompt, return_tensors="pt")
# Move input to the same device as the model
input_ids = input_ids.to(device)
# Generate response tokens
# torch.no_grad() disables gradient computation for efficiency
with torch.no_grad():
output_ids = model.generate(
input_ids,
max_new_tokens=max_new_tokens,
do_sample=True, # Use sampling for varied responses
temperature=0.7, # Control randomness (lower = more focused)
top_p=0.9, # Nucleus sampling parameter
pad_token_id=tokenizer.eos_token_id # Use EOS token for padding
)
# Decode only the newly generated tokens
# This avoids including the input prompt in the response
response = tokenizer.decode(
output_ids[0][input_ids.shape[1]:],
skip_special_tokens=True
)
return response.strip()
def main():
"""
Main function that runs the console chatbot.
"""
print("=" * 60)
print("CONSOLE CHATBOT - Version 1.0")
print("=" * 60)
# Specify the model to use
# GPT-2 is a good starting model - small, fast, and widely compatible
model_name = "gpt2"
# Load model and tokenizer
model, tokenizer, device = load_model_and_tokenizer(model_name)
print("\nChatbot ready! Type 'quit' to exit.")
print("-" * 60)
# Main conversation loop
while True:
# Get user input
user_input = input("\nYou: ").strip()
# Check for exit command
if user_input.lower() in ['quit', 'exit', 'bye']:
print("Goodbye!")
break
# Skip empty inputs
if not user_input:
continue
# Generate and display response
print("Assistant: ", end="", flush=True)
response = generate_response(
model,
tokenizer,
device,
user_input,
max_new_tokens=100
)
print(response)
if __name__ == "__main__":
main()
This complete program demonstrates all the concepts we have discussed. The get_optimal_device function detects available hardware. The load_model_and_tokenizer function loads the model and tokenizer with optimal settings, including the critical fix for missing padding tokens. The generate_response function handles the complete generation workflow, properly extracting only the new tokens. The main function orchestrates everything and provides a simple console interface.
When you run this program, it first loads the model, which may take a minute or two depending on your internet connection and hardware. Once loaded, you can type messages and receive responses. The chatbot does not yet remember previous messages, so each interaction is independent.
The model.eval() call is important. PyTorch models have two modes: training mode and evaluation mode. Training mode enables certain behaviors like dropout, which randomly deactivates neurons during training to prevent overfitting. Evaluation mode disables these behaviors for consistent inference. Always call eval() before using a model for generation.
As noted earlier, the torch.no_grad() context manager disables gradient computation, which reduces memory usage and speeds up inference.
The top_p parameter we added to generate is another sampling strategy called nucleus sampling. It keeps only the most probable tokens whose cumulative probability exceeds p. This provides a dynamic vocabulary size that adapts to the certainty of the model's predictions.
STEP TWO: ADDING CONVERSATION MEMORY
Understanding the Stateless Nature of Language Models
This is one of the most important concepts to understand about Large Language Models: they are completely stateless. Every time you call the generate method, the model has absolutely no memory of previous calls. It only sees the input tokens you provide in that specific call. This is fundamentally different from traditional applications where objects maintain state between method calls.
Think of it like calling a pure mathematical function. If you call f(x) twice with the same x, you get the same result both times; the function does not remember that you called it before. Language models behave the same way: the forward pass is a pure function of its input, and any variation you see between two runs of the same prompt comes from the randomness of sampling, not from any memory of the earlier call.
This statelessness is actually a design feature, not a limitation. It makes models simpler, more predictable, and easier to scale. However, it means that to have a conversation with memory, we must manually manage the conversation history and include it in every prompt.
The solution is to maintain a list of all previous messages and include them in each generation call. When the user sends a new message, we append it to the history, generate a response using the complete history, append the response to the history, and repeat. This way, the model always sees the full context of the conversation.
Implementing Conversation History Management
To implement conversation memory, we need to modify our chatbot to maintain a conversation history and format it properly for the model. Since we are using GPT-2, which does not have a chat template, we will use a simple concatenation strategy with the EOS token as a separator.
Here is how we structure the conversation history:
# Initialize conversation history
# This list will grow as the conversation progresses
conversation_history = []
# Add a system message to guide the model's behavior
# For models without chat templates, we include this as context
system_message = "You are a helpful and friendly AI assistant."
conversation_history.append(system_message)
The conversation history starts with a system message that provides instructions. For models with chat templates, we would use the role-based format, but for GPT-2, we simply include it as part of the context.
When the user sends a message, we add it to the history:
# User sends a message
user_message = "What is the capital of France?"
# Add user message to history
conversation_history.append(user_message)
Now we need to format the entire conversation history and generate a response:
# Format conversation by joining with EOS token
formatted_prompt = tokenizer.eos_token.join(conversation_history) + tokenizer.eos_token
# Tokenize the formatted prompt
input_ids = tokenizer.encode(formatted_prompt, return_tensors="pt")
input_ids = input_ids.to(device)
# Generate response
with torch.no_grad():
output_ids = model.generate(
input_ids,
max_new_tokens=100,
do_sample=True,
temperature=0.7,
top_p=0.9,
pad_token_id=tokenizer.eos_token_id
)
# Decode only the new tokens
assistant_response = tokenizer.decode(
output_ids[0][input_ids.shape[1]:],
skip_special_tokens=True
)
# Add assistant response to history
conversation_history.append(assistant_response.strip())
The critical insight here is that we format the entire conversation history, not just the latest message. We join all messages with the EOS token, which helps the model distinguish between different turns in the conversation. This ensures the model sees the full context.
After generation, we extract only the new response by slicing the output tensor to exclude the input tokens. We then add the assistant's response to the conversation history so it will be included in the next turn.
Managing Context Window Limitations
There is one major complication with this approach: models have maximum context lengths. A context length, also called a context window, is the maximum number of tokens a model can process in a single call. For example, GPT-2 has a context window of 1024 tokens. If your conversation history grows beyond this limit, the model cannot process it.
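Rather than hard-coding this number, you can ask the tokenizer or the model configuration for it. The exact attribute name varies by architecture, so treat the following as a sketch that happens to work for GPT-2:
# Maximum sequence length the tokenizer was configured for
print(tokenizer.model_max_length)            # 1024 for GPT-2
# The model config exposes the same limit; GPT-2 stores it as n_positions,
# which the config also maps to max_position_embeddings
print(model.config.max_position_embeddings)  # 1024 for GPT-2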
We need to implement context window management to prevent this issue. There are several strategies. The simplest is to truncate old messages when the history gets too long. A more sophisticated approach is to summarize old messages. For our chatbot, we will implement a sliding window that keeps only the most recent messages.
Here is a function that manages the context window:
def manage_context_window(conversation_history, tokenizer, max_tokens=800):
"""
Ensures conversation history fits within the model's context window.
Removes oldest messages if necessary, always keeping the system message.
Args:
conversation_history: List of conversation messages
tokenizer: The tokenizer to measure token counts
max_tokens: Maximum tokens to keep (should be less than model's limit)
Returns:
list: Potentially truncated conversation history
"""
# Always keep the system message if it exists
system_message = None
messages = conversation_history.copy()
if messages and len(messages) > 0:
# Assume first message is system message
system_message = messages[0]
messages = messages[1:]
# Calculate total tokens in current history
formatted = tokenizer.eos_token.join(messages) if messages else ""
token_count = len(tokenizer.encode(formatted))
# Remove oldest messages until we fit in the limit
while token_count > max_tokens and len(messages) > 1:
# Remove the oldest message (index 0)
messages.pop(0)
# Recalculate token count
formatted = tokenizer.eos_token.join(messages)
token_count = len(tokenizer.encode(formatted))
# Reconstruct history with system message
result = []
if system_message:
result.append(system_message)
result.extend(messages)
return result
This function preserves the system message while removing old user and assistant messages from the beginning of the conversation. It calculates the total token count by actually encoding the formatted history, which gives an accurate measurement. The max_tokens parameter should be set lower than the model's actual limit to leave room for the response.
We call this function before each generation:
# Manage context window before generation
conversation_history = manage_context_window(
conversation_history,
tokenizer,
max_tokens=800
)
This ensures we never exceed the model's context limit, even in very long conversations.
Complete Chatbot with Conversation Memory
Let us now examine the complete second version of our chatbot with conversation memory fully implemented:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
def get_optimal_device():
"""Detects and returns the best available device."""
if torch.cuda.is_available():
device = torch.device("cuda")
print(f"Using CUDA GPU: {torch.cuda.get_device_name(0)}")
return device
elif torch.backends.mps.is_available():
device = torch.device("mps")
print("Using Apple MPS")
return device
else:
device = torch.device("cpu")
print("Using CPU")
return device
def load_model_and_tokenizer(model_name):
"""Loads model and tokenizer with optimal settings."""
print(f"Loading model: {model_name}")
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Fix missing padding token
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16,
low_cpu_mem_usage=True
)
device = get_optimal_device()
model = model.to(device)
model.eval()
print(f"Model loaded with {model.num_parameters():,} parameters")
return model, tokenizer, device
def manage_context_window(conversation_history, tokenizer, max_tokens=800):
"""
Ensures conversation fits within context window by removing old messages.
Always preserves the system message if present.
"""
system_message = None
messages = conversation_history.copy()
# Extract and preserve system message
if messages and len(messages) > 0:
system_message = messages[0]
messages = messages[1:]
# Calculate current token count
if messages:
formatted = tokenizer.eos_token.join(messages)
token_count = len(tokenizer.encode(formatted))
# Remove oldest messages until we fit
while token_count > max_tokens and len(messages) > 1:
messages.pop(0)
formatted = tokenizer.eos_token.join(messages)
token_count = len(tokenizer.encode(formatted))
# Restore system message
result = []
if system_message:
result.append(system_message)
result.extend(messages)
return result
def generate_response(model, tokenizer, device, conversation_history, max_new_tokens=150):
"""
Generates a response based on complete conversation history.
Args:
model: The language model
tokenizer: Associated tokenizer
device: Device where model resides
conversation_history: List of conversation messages
max_new_tokens: Maximum tokens to generate
Returns:
str: Generated assistant response
"""
# Format conversation by joining with EOS token
formatted_prompt = tokenizer.eos_token.join(conversation_history) + tokenizer.eos_token
# Tokenize and move to device
input_ids = tokenizer.encode(formatted_prompt, return_tensors="pt")
input_ids = input_ids.to(device)
# Generate response
with torch.no_grad():
output_ids = model.generate(
input_ids,
max_new_tokens=max_new_tokens,
do_sample=True,
temperature=0.7,
top_p=0.9,
pad_token_id=tokenizer.eos_token_id
)
# Decode only the new tokens
assistant_response = tokenizer.decode(
output_ids[0][input_ids.shape[1]:],
skip_special_tokens=True
)
return assistant_response.strip()
def main():
"""Main chatbot function with conversation memory."""
print("=" * 60)
print("CONSOLE CHATBOT - Version 2.0 (With Memory)")
print("=" * 60)
model_name = "gpt2"
model, tokenizer, device = load_model_and_tokenizer(model_name)
# Initialize conversation history with system message
conversation_history = [
"You are a helpful and friendly AI assistant."
]
print("\nChatbot ready! Type 'quit' to exit, 'reset' to clear history.")
print("-" * 60)
while True:
user_input = input("\nYou: ").strip()
if user_input.lower() in ['quit', 'exit']:
print("Goodbye!")
break
if user_input.lower() == 'reset':
conversation_history = [conversation_history[0]] # Keep system message
print("Conversation history cleared.")
continue
if not user_input:
continue
# Add user message to history
conversation_history.append(user_input)
# Manage context window
conversation_history = manage_context_window(
conversation_history,
tokenizer,
max_tokens=800
)
# Generate response
print("Assistant: ", end="", flush=True)
response = generate_response(
model,
tokenizer,
device,
conversation_history,
max_new_tokens=150
)
print(response)
# Add assistant response to history
conversation_history.append(response)
if __name__ == "__main__":
main()
This version maintains full conversation context, allowing the model to reference previous messages and maintain coherent multi-turn conversations. The reset command lets users start fresh without restarting the program. The context window management prevents errors from exceeding token limits.
STEP THREE: CONFIGURABLE GENERATION PARAMETERS
Understanding Generation Parameters and Their Effects
The quality, style, and characteristics of generated text are controlled by numerous parameters passed to the generate method. Understanding these parameters is crucial for getting the outputs you want. Each parameter affects the generation process in different ways, and finding the right combination often requires experimentation.
Let us examine each major generation parameter in detail, explaining what it does, why it matters, and how to use it effectively.
Temperature: Controlling Randomness and Creativity
Temperature is perhaps the most important generation parameter. It controls the randomness of the model's predictions by scaling the logits before converting them to probabilities. Logits are the raw numerical outputs from the model's final layer before any normalization.
Mathematically, temperature works like this. The model outputs a vector of logits, one for each token in the vocabulary. These logits are converted to probabilities using the softmax function. Temperature divides the logits before applying softmax. Lower temperatures make the distribution more peaked, higher temperatures make it flatter.
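The effect is easy to see numerically. The following small sketch uses made-up logits for an imaginary three-token vocabulary and shows how dividing by the temperature reshapes the resulting probabilities:
import torch
logits = torch.tensor([2.0, 1.0, 0.1])  # made-up scores for three tokens
for t in (0.5, 1.0, 1.5):
    probs = torch.softmax(logits / t, dim=-1)
    print(f"temperature={t}: {[round(p, 3) for p in probs.tolist()]}")
# Lower temperature concentrates probability on the top token;
# higher temperature spreads it across the alternatives.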
A temperature of 1.0 uses the model's original probability distribution without modification. This is the baseline.
A temperature below 1.0, such as 0.7 or 0.5, makes the model more conservative and focused. High-probability tokens become even more likely, while low-probability tokens become less likely. This produces more predictable, coherent, and factual outputs. Use lower temperatures when you want consistent, reliable responses.
A temperature above 1.0, such as 1.2 or 1.5, makes the model more creative and diverse. It gives lower-probability tokens more chance of being selected. This produces more varied, creative, and sometimes surprising outputs. However, very high temperatures can produce incoherent or nonsensical text. Use higher temperatures for creative writing or when you want diverse outputs.
A temperature of 0.0 is often described as a special case equivalent to greedy decoding, and some APIs accept it with that meaning. In the Transformers library, however, the temperature must be strictly positive when do_sample is True. To get completely deterministic output, set do_sample=False instead, which is greedy decoding: the model always selects the highest-probability token.
Here is how to use temperature in code:
output_ids = model.generate(
input_ids,
max_new_tokens=100,
do_sample=True, # Must be True to use temperature
temperature=0.7 # Lower = more focused, higher = more creative
)
Note that temperature only has an effect when do_sample is True. With greedy decoding, temperature is ignored.
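For comparison, here is what a purely greedy call looks like. Because no sampling is involved, running it twice on the same input produces the same output (a minimal sketch, reusing the input_ids and tokenizer from earlier):
with torch.no_grad():
    output_ids = model.generate(
        input_ids,
        max_new_tokens=100,
        do_sample=False,  # greedy decoding: always pick the highest-probability token
        pad_token_id=tokenizer.eos_token_id
    )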
Top-k Sampling: Limiting the Vocabulary
Top-k sampling is a technique that restricts the model to considering only the k most probable tokens at each generation step. All other tokens are given zero probability. This prevents the model from selecting very unlikely tokens that might produce nonsensical outputs.
For example, with top_k=50, the model considers only the 50 most probable tokens at each step. Even if temperature makes the distribution flatter, tokens outside the top 50 are completely excluded.
Top-k sampling helps prevent the model from occasionally selecting bizarre, out-of-context tokens. However, it has a limitation: it uses a fixed k regardless of how certain the model is. Sometimes the model is very certain and only a few tokens make sense. Other times the model is uncertain and many tokens are plausible. A fixed k does not adapt to this.
Here is how to use top-k:
output_ids = model.generate(
input_ids,
max_new_tokens=100,
do_sample=True,
temperature=0.7,
top_k=50 # Consider only top 50 tokens
)
Typical values for top_k range from 20 to 100. Lower values are more restrictive, higher values are more permissive.
Top-p Sampling (Nucleus Sampling): Dynamic Vocabulary
Top-p sampling, also called nucleus sampling, is a more sophisticated alternative to top-k. Instead of using a fixed number of tokens, it dynamically selects the smallest set of tokens whose cumulative probability exceeds p.
For example, with top_p=0.9, the model considers the most probable tokens until their cumulative probability reaches 90 percent. The number of tokens included varies depending on the model's certainty. When the model is very certain, only a few tokens might be needed to reach 90 percent. When the model is uncertain, many tokens might be included.
This adaptive behavior makes top-p generally superior to top-k. It prevents unlikely tokens while adapting to the model's confidence. Top-p values typically range from 0.8 to 0.95. Lower values are more restrictive, higher values are more permissive.
You can use both top-k and top-p together. The model first applies top-k to limit the vocabulary, then applies top-p within that subset:
output_ids = model.generate(
input_ids,
max_new_tokens=100,
do_sample=True,
temperature=0.7,
top_k=50,
top_p=0.9 # Nucleus sampling
)
However, using top-p alone is often sufficient and simpler.
Repetition Penalty: Preventing Redundancy
Language models sometimes get stuck in repetitive loops, generating the same phrases or words over and over. The repetition penalty parameter discourages this by reducing the probability of tokens that have already been generated.
The repetition penalty rescales the logits of tokens that have already appeared in the sequence. A value of 1.0 means no penalty, and values above 1.0 penalize repetition. For example, with repetition_penalty=1.2, a previously seen token has its logit divided by 1.2 if the logit is positive (and multiplied by 1.2 if it is negative), in both cases making that token less likely to be selected again.
Higher values produce more diverse outputs but can sometimes make the text feel unnatural if the model is forced to avoid common words. Typical values range from 1.0 to 1.5:
output_ids = model.generate(
input_ids,
max_new_tokens=100,
do_sample=True,
temperature=0.7,
top_p=0.9,
repetition_penalty=1.2 # Discourage repetition
)
Use repetition penalty when you notice the model repeating itself excessively.
Length Penalties and Constraints
Several parameters control the length of generated outputs. We have already seen max_new_tokens, which sets a hard limit on generation length. Other parameters provide more nuanced control.
The min_new_tokens parameter sets a minimum length, forcing the model to generate at least that many tokens even if it wants to stop earlier:
output_ids = model.generate(
input_ids,
min_new_tokens=20, # Generate at least 20 tokens
max_new_tokens=100
)
The length_penalty parameter affects beam search, an alternative to sampling that we discuss next. It is an exponent applied to the sequence length when beam scores are normalized; because those scores are log-probabilities and therefore negative, values above 0.0 favor longer outputs and values below 0.0 favor shorter ones, with 1.0 as the default.
Beam Search: An Alternative to Sampling
So far, we have focused on sampling-based generation where the model randomly selects tokens according to their probabilities. Beam search is a deterministic alternative that maintains multiple candidate sequences simultaneously and selects the one with the highest overall probability.
With beam search, the num_beams parameter specifies how many candidates to maintain. For example, with num_beams=5, the algorithm keeps track of the 5 most promising sequences at each step. At the end, it returns the sequence with the highest total probability.
Beam search typically produces higher-quality, more coherent outputs than sampling because it considers multiple possibilities and selects the best overall sequence. However, it is slower and can produce less diverse outputs:
output_ids = model.generate(
input_ids,
max_new_tokens=100,
num_beams=5, # Use beam search with 5 beams
early_stopping=True # Stop when all beams reach EOS
)
When using plain beam search, leave do_sample at its default of False. (The library also offers a beam-search variant that samples within each beam, but we will not use it in this tutorial.)
The early_stopping parameter controls whether to stop when all beams have generated an end-of-sequence token. Setting it to True can speed up generation.
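Because length_penalty only takes effect during beam search, the two are used together. A minimal sketch, reusing the input_ids and tokenizer from earlier:
with torch.no_grad():
    output_ids = model.generate(
        input_ids,
        max_new_tokens=100,
        num_beams=5,
        length_penalty=1.5,  # values above 0 nudge beam scoring toward longer outputs
        early_stopping=True,
        pad_token_id=tokenizer.eos_token_id
    )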
Implementing User-Configurable Parameters
For our third chatbot version, we will allow users to configure all these parameters interactively. We will create a configuration system that stores parameter values and a menu interface for changing them.
Here is a configuration class that manages generation parameters:
class GenerationConfig:
"""
Manages generation parameters for the chatbot.
Provides defaults and validation for all parameters.
"""
def __init__(self):
"""Initialize with sensible default values."""
self.max_new_tokens = 150
self.temperature = 0.7
self.top_p = 0.9
self.top_k = 50
self.repetition_penalty = 1.1
self.do_sample = True
self.num_beams = 1 # 1 means no beam search
def to_dict(self):
"""
Converts configuration to a dictionary suitable for model.generate().
Returns:
dict: Parameter dictionary
"""
config = {
"max_new_tokens": self.max_new_tokens,
"do_sample": self.do_sample,
"pad_token_id": None # Will be set at generation time
}
if self.do_sample:
# Sampling parameters only apply when do_sample is True
config["temperature"] = self.temperature
config["top_p"] = self.top_p
config["top_k"] = self.top_k
config["repetition_penalty"] = self.repetition_penalty
else:
# Beam search parameters
if self.num_beams > 1:
config["num_beams"] = self.num_beams
config["early_stopping"] = True
return config
def display(self):
"""Prints current configuration in a readable format."""
print("\nCurrent Generation Configuration:")
print(f" Max New Tokens: {self.max_new_tokens}")
print(f" Sampling Mode: {'Enabled' if self.do_sample else 'Disabled (Greedy/Beam)'}")
if self.do_sample:
print(f" Temperature: {self.temperature}")
print(f" Top-p: {self.top_p}")
print(f" Top-k: {self.top_k}")
print(f" Repetition Penalty: {self.repetition_penalty}")
else:
print(f" Num Beams: {self.num_beams}")
def configure_interactive(self):
"""
Interactive menu for changing configuration parameters.
Provides explanations and validation for each parameter.
"""
while True:
print("\n" + "=" * 60)
print("GENERATION PARAMETER CONFIGURATION")
print("=" * 60)
self.display()
print("\nOptions:")
print(" 1. Max New Tokens")
print(" 2. Temperature")
print(" 3. Top-p")
print(" 4. Top-k")
print(" 5. Repetition Penalty")
print(" 6. Toggle Sampling Mode")
print(" 7. Num Beams")
print(" 0. Done")
choice = input("\nSelect option: ").strip()
if choice == "0":
break
elif choice == "1":
try:
value = int(input("Enter max new tokens (50-500): "))
if 1 <= value <= 2000:
self.max_new_tokens = value
else:
print("Value out of range")
except ValueError:
print("Invalid number")
elif choice == "2":
try:
value = float(input("Enter temperature (0.1-2.0): "))
if 0.0 < value <= 2.0:
self.temperature = value
else:
print("Value out of range")
except ValueError:
print("Invalid number")
elif choice == "3":
try:
value = float(input("Enter top-p (0.1-1.0): "))
if 0.0 < value <= 1.0:
self.top_p = value
else:
print("Value out of range")
except ValueError:
print("Invalid number")
elif choice == "4":
try:
value = int(input("Enter top-k (1-200): "))
if value >= 1:
self.top_k = value
else:
print("Value must be at least 1")
except ValueError:
print("Invalid number")
elif choice == "5":
try:
value = float(input("Enter repetition penalty (1.0-2.0): "))
if value >= 1.0:
self.repetition_penalty = value
else:
print("Value must be at least 1.0")
except ValueError:
print("Invalid number")
elif choice == "6":
self.do_sample = not self.do_sample
print(f"Sampling mode: {'Enabled' if self.do_sample else 'Disabled'}")
elif choice == "7":
try:
value = int(input("Enter num beams (1-10): "))
if value >= 1:
self.num_beams = value
if value > 1:
self.do_sample = False
print("Sampling disabled (beam search active)")
else:
print("Value must be at least 1")
except ValueError:
print("Invalid number")
else:
print("Invalid option")
This configuration class encapsulates all generation parameters and provides an interactive menu for changing them. The to_dict method converts the configuration to a dictionary that can be unpacked into the generate method call.
Now we modify our generate_response function to use this configuration:
def generate_response(model, tokenizer, device, conversation_history, gen_config):
"""
Generates response using the provided configuration.
Args:
model: The language model
tokenizer: Associated tokenizer
device: Device where model resides
conversation_history: List of conversation messages
gen_config: GenerationConfig object
Returns:
str: Generated assistant response
"""
# Format conversation
formatted_prompt = tokenizer.eos_token.join(conversation_history) + tokenizer.eos_token
input_ids = tokenizer.encode(formatted_prompt, return_tensors="pt")
input_ids = input_ids.to(device)
# Get configuration as dictionary
config_dict = gen_config.to_dict()
config_dict["pad_token_id"] = tokenizer.eos_token_id
# Generate with configuration
with torch.no_grad():
output_ids = model.generate(
input_ids,
**config_dict # Unpack configuration dictionary
)
# Decode only new tokens
response = tokenizer.decode(
output_ids[0][input_ids.shape[1]:],
skip_special_tokens=True
)
return response.strip()
The key change is that we now pass a GenerationConfig object and unpack its dictionary into the generate call using the double asterisk operator. This makes the function flexible and allows easy parameter experimentation.
Complete Chatbot with Configurable Parameters
Here is the complete third version with full parameter configuration:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
class GenerationConfig:
"""Manages generation parameters with defaults and validation."""
def __init__(self):
self.max_new_tokens = 150
self.temperature = 0.7
self.top_p = 0.9
self.top_k = 50
self.repetition_penalty = 1.1
self.do_sample = True
self.num_beams = 1
def to_dict(self):
config = {
"max_new_tokens": self.max_new_tokens,
"do_sample": self.do_sample,
"pad_token_id": None
}
if self.do_sample:
config["temperature"] = self.temperature
config["top_p"] = self.top_p
config["top_k"] = self.top_k
config["repetition_penalty"] = self.repetition_penalty
else:
if self.num_beams > 1:
config["num_beams"] = self.num_beams
config["early_stopping"] = True
return config
def display(self):
print("\nCurrent Configuration:")
print(f" Max New Tokens: {self.max_new_tokens}")
print(f" Mode: {'Sampling' if self.do_sample else 'Greedy/Beam'}")
if self.do_sample:
print(f" Temperature: {self.temperature}")
print(f" Top-p: {self.top_p}")
print(f" Top-k: {self.top_k}")
print(f" Repetition Penalty: {self.repetition_penalty}")
else:
print(f" Num Beams: {self.num_beams}")
def configure_interactive(self):
while True:
print("\n" + "=" * 60)
print("PARAMETER CONFIGURATION")
print("=" * 60)
self.display()
print("\n1. Max Tokens 2. Temperature 3. Top-p 4. Top-k")
print("5. Repetition Penalty 6. Toggle Sampling 7. Num Beams")
print("0. Done")
choice = input("\nSelect: ").strip()
if choice == "0":
break
elif choice == "1":
try:
self.max_new_tokens = int(input("Max tokens: "))
except ValueError:
print("Invalid number")
elif choice == "2":
try:
self.temperature = float(input("Temperature: "))
except ValueError:
print("Invalid number")
elif choice == "3":
try:
self.top_p = float(input("Top-p: "))
except ValueError:
print("Invalid number")
elif choice == "4":
try:
self.top_k = int(input("Top-k: "))
except ValueError:
print("Invalid number")
elif choice == "5":
try:
self.repetition_penalty = float(input("Repetition penalty: "))
except ValueError:
print("Invalid number")
elif choice == "6":
self.do_sample = not self.do_sample
elif choice == "7":
try:
self.num_beams = int(input("Num beams: "))
if self.num_beams > 1:
self.do_sample = False
except ValueError:
print("Invalid number")
def get_optimal_device():
if torch.cuda.is_available():
device = torch.device("cuda")
print(f"Using CUDA: {torch.cuda.get_device_name(0)}")
return device
elif torch.backends.mps.is_available():
device = torch.device("mps")
print("Using MPS")
return device
else:
device = torch.device("cpu")
print("Using CPU")
return device
def load_model_and_tokenizer(model_name):
print(f"Loading {model_name}...")
tokenizer = AutoTokenizer.from_pretrained(model_name)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16,
low_cpu_mem_usage=True
)
device = get_optimal_device()
model = model.to(device)
model.eval()
print(f"Loaded {model.num_parameters():,} parameters")
return model, tokenizer, device
def manage_context_window(conversation_history, tokenizer, max_tokens=800):
system_message = None
messages = conversation_history.copy()
if messages and len(messages) > 0:
system_message = messages[0]
messages = messages[1:]
if messages:
formatted = tokenizer.eos_token.join(messages)
token_count = len(tokenizer.encode(formatted))
while token_count > max_tokens and len(messages) > 1:
messages.pop(0)
formatted = tokenizer.eos_token.join(messages)
token_count = len(tokenizer.encode(formatted))
result = []
if system_message:
result.append(system_message)
result.extend(messages)
return result
def generate_response(model, tokenizer, device, conversation_history, gen_config):
formatted_prompt = tokenizer.eos_token.join(conversation_history) + tokenizer.eos_token
input_ids = tokenizer.encode(formatted_prompt, return_tensors="pt")
input_ids = input_ids.to(device)
config_dict = gen_config.to_dict()
config_dict["pad_token_id"] = tokenizer.eos_token_id
with torch.no_grad():
output_ids = model.generate(input_ids, **config_dict)
response = tokenizer.decode(
output_ids[0][input_ids.shape[1]:],
skip_special_tokens=True
)
return response.strip()
def main():
print("=" * 60)
print("CONSOLE CHATBOT - Version 3.0 (Configurable Parameters)")
print("=" * 60)
model_name = "gpt2"
model, tokenizer, device = load_model_and_tokenizer(model_name)
gen_config = GenerationConfig()
conversation_history = [
"You are a helpful AI assistant."
]
print("\nCommands: 'quit', 'reset', 'config'")
print("-" * 60)
while True:
user_input = input("\nYou: ").strip()
if user_input.lower() in ['quit', 'exit']:
print("Goodbye!")
break
if user_input.lower() == 'reset':
conversation_history = [conversation_history[0]]
print("History cleared")
continue
if user_input.lower() == 'config':
gen_config.configure_interactive()
continue
if not user_input:
continue
conversation_history.append(user_input)
conversation_history = manage_context_window(
conversation_history,
tokenizer
)
print("Assistant: ", end="", flush=True)
response = generate_response(
model,
tokenizer,
device,
conversation_history,
gen_config
)
print(response)
conversation_history.append(response)
if __name__ == "__main__":
main()
This version allows users to experiment with all generation parameters interactively. The config command opens the configuration menu where parameters can be adjusted. This is invaluable for understanding how different parameters affect output quality and style.
STEP FOUR: ADDING RETRIEVAL-AUGMENTED GENERATION
Understanding the Rationale for RAG
Retrieval-Augmented Generation, commonly abbreviated as RAG, is a technique that combines language models with external knowledge retrieval. The fundamental problem RAG solves is that language models only know what was in their training data, which has several limitations.
First, training data has a cutoff date. A model trained in 2023 knows nothing about events from 2024. Second, models cannot access private or proprietary information that was not in their training data. Third, models sometimes hallucinate, generating plausible-sounding but factually incorrect information.
RAG addresses these limitations by giving the model access to external documents. When a user asks a question, the system first retrieves relevant passages from a document collection, then provides those passages to the model as context. The model can then generate responses grounded in the retrieved information rather than relying solely on its training data.
The RAG workflow consists of several stages. First, documents are loaded and split into chunks. Second, chunks are converted into numerical embeddings that capture their semantic meaning. Third, embeddings are stored in a vector database that enables efficient similarity search. Fourth, when a user asks a question, it is converted to an embedding and used to retrieve the most relevant chunks. Fifth, retrieved chunks are provided to the language model as context. Sixth, the model generates a response based on the retrieved information.
Let us examine each stage in detail.
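To make the shape of the pipeline concrete before we build each piece, here is a minimal sketch of the whole workflow. The helper names (load_documents, chunk_documents, EmbeddingModel, VectorStore) are the ones we implement in the rest of this section, and the file path and question are placeholders:
# High-level sketch of the RAG pipeline built step by step below.
# "documents/manual.txt" and the question are illustrative placeholders.
documents = load_documents(["documents/manual.txt"])                # 1. load
chunks = chunk_documents(documents)                                 # 2. chunk
embedder = EmbeddingModel()                                         # 3. embed
store = VectorStore(embedder.embedding_dim)
store.add_chunks(chunks, embedder.embed_batch([c['content'] for c in chunks]))
query_vec = embedder.embed_text("How do I reset the device?")       # 4. embed the question
results = store.search(query_vec, top_k=3)                          #    and retrieve chunks
# 5 and 6: format the retrieved chunks as context and generate the answer,
# implemented later as format_rag_prompt and generate_rag_response.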
Document Loading: Ingesting External Knowledge
The first step in RAG is loading documents into a format we can process. Documents can come from many sources: PDF files, Word documents, web pages, databases, APIs, and more. For our chatbot, we will focus on plain text files and PDFs as they are most common.
We need functionality that can handle various document formats. Here are simple but effective loaders:
def load_text_file(file_path):
"""
Loads a plain text file and returns its contents.
Args:
file_path: Path to the text file
Returns:
str: File contents
"""
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
def load_pdf_file(file_path):
"""
Loads a PDF file and extracts its text content.
Requires PyPDF2 library: pip install PyPDF2
Args:
file_path: Path to the PDF file
Returns:
str: Extracted text from all pages
"""
try:
import PyPDF2
except ImportError:
raise ImportError("PyPDF2 required for PDF loading. Install with: pip install PyPDF2")
text = ""
with open(file_path, 'rb') as f:
pdf_reader = PyPDF2.PdfReader(f)
for page in pdf_reader.pages:
text += page.extract_text() + "\n"
return text
def load_documents(file_paths):
"""
Loads multiple documents from a list of file paths.
Automatically detects file type based on extension.
Args:
file_paths: List of file paths to load
Returns:
list: List of dictionaries with 'content' and 'source' keys
"""
documents = []
for file_path in file_paths:
print(f"Loading {file_path}...")
try:
if file_path.endswith('.txt'):
content = load_text_file(file_path)
elif file_path.endswith('.pdf'):
content = load_pdf_file(file_path)
else:
print(f"Unsupported file type: {file_path}")
continue
documents.append({
'content': content,
'source': file_path
})
except Exception as e:
print(f"Error loading {file_path}: {e}")
print(f"Loaded {len(documents)} documents")
return documents
These functions load documents and return them as dictionaries containing the text content and source file path. The source path is important for attribution, allowing us to tell users where information came from.
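As a quick usage sketch (the file names here are placeholders for whatever documents you actually have on disk), the loader can be called like this:
# Hypothetical file names used only for illustration
docs = load_documents(["notes.txt", "report.pdf", "photo.png"])
# photo.png is skipped with an "Unsupported file type" message;
# the other files come back as dictionaries with content and source.
for doc in docs:
    print(doc['source'], len(doc['content']), "characters")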
Text Chunking: Breaking Documents into Manageable Pieces
Raw documents are often too long to process efficiently. A single document might contain thousands of words, but we only need to retrieve the specific sections relevant to a query. Additionally, embedding models and language models have maximum input lengths. We must split documents into smaller chunks.
Chunking strategies vary in sophistication. The simplest approach is fixed-size chunking, where we split text every N characters or tokens. A better approach is semantic chunking, where we split at natural boundaries like paragraphs or sentences. The best approach depends on your documents and use case.
For our chatbot, we will implement a sentence-aware chunking strategy that splits text into chunks of approximately equal size while respecting sentence boundaries:
def split_into_sentences(text):
"""
Splits text into sentences using simple heuristics.
This is a basic implementation; more sophisticated approaches
use NLP libraries like spaCy or NLTK.
Args:
text: Input text
Returns:
list: List of sentences
"""
# Simple sentence splitting based on punctuation
# This handles most cases but is not perfect
import re
# Split on sentence-ending punctuation followed by whitespace
sentences = re.split(r'(?<=[.!?])\s+', text)
# Filter out empty sentences
sentences = [s.strip() for s in sentences if s.strip()]
return sentences
def chunk_text(text, chunk_size=500, overlap=50):
"""
Splits text into overlapping chunks while respecting sentence boundaries.
Overlapping chunks ensure that information spanning chunk boundaries
is not lost. Each chunk includes some context from the previous chunk.
Args:
text: Input text to chunk
chunk_size: Target size of each chunk in characters
overlap: Number of characters to overlap between chunks
Returns:
list: List of text chunks
"""
sentences = split_into_sentences(text)
chunks = []
current_chunk = []
current_length = 0
for sentence in sentences:
sentence_length = len(sentence)
# If adding this sentence exceeds chunk size, save current chunk
if current_length + sentence_length > chunk_size and current_chunk:
chunk_text = ' '.join(current_chunk)
chunks.append(chunk_text)
# Start new chunk with overlap from previous chunk
# Calculate how many sentences to keep for overlap
overlap_text = chunk_text[-overlap:] if len(chunk_text) > overlap else chunk_text
overlap_sentences = split_into_sentences(overlap_text)
current_chunk = overlap_sentences
current_length = len(' '.join(current_chunk))
# Add sentence to current chunk
current_chunk.append(sentence)
current_length += sentence_length + 1 # +1 for space
# Add final chunk
if current_chunk:
chunks.append(' '.join(current_chunk))
return chunks
def chunk_documents(documents, chunk_size=500, overlap=50):
"""
Chunks multiple documents and maintains source attribution.
Args:
documents: List of document dictionaries
chunk_size: Target chunk size
overlap: Overlap between chunks
Returns:
list: List of chunk dictionaries with content, source, and chunk_id
"""
all_chunks = []
for doc in documents:
chunks = chunk_text(doc['content'], chunk_size, overlap)
for i, chunk in enumerate(chunks):
all_chunks.append({
'content': chunk,
'source': doc['source'],
'chunk_id': i
})
print(f"Created {len(all_chunks)} chunks from {len(documents)} documents")
return all_chunks
The chunk_text function implements overlapping chunking. Overlap is important because it ensures that information spanning chunk boundaries is not lost. If a relevant sentence is split across two chunks, the overlap ensures it appears complete in at least one chunk.
The chunk_size parameter controls the target size of each chunk. Smaller chunks are more precise but may lack context. Larger chunks provide more context but are less precise. Typical values range from 300 to 1000 characters.
The overlap parameter controls how much text is shared between consecutive chunks. Typical values are 10-20 percent of chunk size.
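To get a feel for how chunk_size and overlap interact, here is a small illustrative run on a made-up string (a real document would of course be much longer):
# Made-up sample text; chunk_size is deliberately tiny to force several chunks
sample = ("Transformers use attention. Attention relates tokens to each other. "
          "This lets models capture long-range context. Chunking keeps pieces small.")
for chunk in chunk_text(sample, chunk_size=80, overlap=20):
    print(repr(chunk))
# Consecutive chunks share roughly 20 characters of text,
# so content near a boundary appears in both neighboring chunks.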
Understanding Embeddings: Converting Text to Numbers
Embeddings are dense vector representations of text that capture semantic meaning. Unlike simple word counts or TF-IDF vectors, embeddings place semantically similar texts close together in vector space. For example, the embeddings for "dog" and "puppy" would be very close, while "dog" and "car" would be far apart.
Embedding models are neural networks trained to convert text into fixed-size vectors. These models learn to encode meaning, context, and relationships into the numerical representation. The Transformers library provides access to many embedding models through the sentence-transformers library.
For RAG, we use embeddings to enable semantic search. We convert all document chunks into embeddings and store them. When a user asks a question, we convert the question into an embedding and find the chunks with the most similar embeddings. This retrieves semantically relevant information even if the exact words differ.
Let us implement an embedding system using the sentence-transformers library:
First, install the library:
pip install sentence-transformers
Now implement the embedding wrapper:
from sentence_transformers import SentenceTransformer
import numpy as np
class EmbeddingModel:
"""
Wrapper for sentence embedding models.
Provides a simple interface for encoding text into vectors.
"""
def __init__(self, model_name="all-MiniLM-L6-v2"):
"""
Initializes the embedding model.
Args:
model_name: Name of the sentence-transformers model to use
Default is a fast, efficient model good for most uses
"""
print(f"Loading embedding model: {model_name}")
self.model = SentenceTransformer(model_name)
self.embedding_dim = self.model.get_sentence_embedding_dimension()
print(f"Embedding dimension: {self.embedding_dim}")
def embed_text(self, text):
"""
Converts a single text string into an embedding vector.
Args:
text: Input text
Returns:
numpy.ndarray: Embedding vector
"""
return self.model.encode(text, convert_to_numpy=True)
def embed_batch(self, texts):
"""
Converts multiple texts into embedding vectors efficiently.
Batch processing is much faster than encoding texts individually.
Args:
texts: List of text strings
Returns:
numpy.ndarray: Matrix of embedding vectors (num_texts x embedding_dim)
"""
return self.model.encode(texts, convert_to_numpy=True, show_progress_bar=True)
The SentenceTransformer class from the sentence-transformers library handles all the complexity of embedding generation. It loads a pre-trained model, tokenizes input text, runs it through the model, and returns the embedding vector.
The default model "all-MiniLM-L6-v2" is a good general-purpose choice. It is fast, produces 384-dimensional embeddings, and works well for most applications. Other models offer different tradeoffs between speed, embedding quality, and dimensionality.
The embed_batch method is important for efficiency. Encoding texts one at a time is slow because each call involves overhead. Batch encoding processes multiple texts together, leveraging GPU parallelism and reducing overhead.
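To see the semantic behavior described earlier (the dog, puppy, and car example) in action, here is a small sketch that compares embeddings with cosine similarity using the wrapper class above:
import numpy as np

embedder = EmbeddingModel()
vecs = embedder.embed_batch(["dog", "puppy", "car"])

def cosine(a, b):
    # Cosine similarity: normalized dot product of the two vectors
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

print("dog vs puppy:", cosine(vecs[0], vecs[1]))  # expected to be relatively high
print("dog vs car:  ", cosine(vecs[0], vecs[2]))  # expected to be noticeably lower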
Vector Databases: Efficient Similarity Search
Once we have embeddings for all chunks, we need a way to quickly find the most similar chunks to a query embedding. This is called similarity search or nearest neighbor search. A naive approach would compute the similarity between the query and every chunk, but this is too slow for large document collections.
Vector databases are specialized systems optimized for similarity search. They use data structures like HNSW graphs or IVF indexes to find similar vectors quickly. For our chatbot, we will implement a simple in-memory vector store using FAISS, a library from Facebook AI Research (now Meta AI).
First, install FAISS:
pip install faiss-cpu
For GPU support, use faiss-gpu instead.
Now implement the vector store:
import faiss
import numpy as np
class VectorStore:
"""
Simple vector database for storing and searching embeddings.
Uses FAISS for efficient similarity search.
"""
def __init__(self, embedding_dim):
"""
Initializes an empty vector store.
Args:
embedding_dim: Dimensionality of embedding vectors
"""
self.embedding_dim = embedding_dim
# Create a FAISS index for cosine similarity
# L2 normalization + inner product = cosine similarity
self.index = faiss.IndexFlatIP(embedding_dim)
# Store chunk metadata (content, source, etc.)
self.chunks = []
def add_chunks(self, chunks, embeddings):
"""
Adds chunks and their embeddings to the store.
Args:
chunks: List of chunk dictionaries
embeddings: Numpy array of embedding vectors (num_chunks x embedding_dim)
"""
# Normalize embeddings for cosine similarity
faiss.normalize_L2(embeddings)
# Add to FAISS index
self.index.add(embeddings)
# Store chunk metadata
self.chunks.extend(chunks)
print(f"Added {len(chunks)} chunks to vector store")
print(f"Total chunks: {len(self.chunks)}")
def search(self, query_embedding, top_k=5):
"""
Searches for the most similar chunks to a query embedding.
Args:
query_embedding: Query embedding vector
top_k: Number of results to return
Returns:
list: List of (chunk, score) tuples, sorted by similarity
"""
# Normalize query embedding
query_embedding = query_embedding.reshape(1, -1)
faiss.normalize_L2(query_embedding)
# Search for nearest neighbors
# Returns distances (similarities) and indices
distances, indices = self.index.search(query_embedding, top_k)
# Retrieve chunks and scores
results = []
for i, idx in enumerate(indices[0]):
if idx < len(self.chunks): # Valid index
chunk = self.chunks[idx]
score = float(distances[0][i])
results.append((chunk, score))
return results
The VectorStore class wraps FAISS functionality in a simple interface. The IndexFlatIP index performs exact search using inner product similarity. For very large collections, you might use approximate indexes like IndexIVFFlat for faster search at the cost of some accuracy.
Cosine similarity measures the angle between vectors and is ideal for text embeddings. FAISS does not have a direct cosine similarity index, but we can achieve it by normalizing vectors to unit length and using inner product, which is mathematically equivalent.
The search method returns the top-k most similar chunks along with their similarity scores. Higher scores indicate greater similarity.
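Putting EmbeddingModel and VectorStore together, a minimal end-to-end sketch looks like the following; the two chunks and the query are made up purely for illustration:
# Illustrative wiring of the embedding model and vector store
embedder = EmbeddingModel()
store = VectorStore(embedder.embedding_dim)

chunks = [
    {'content': 'The warranty covers parts for two years.', 'source': 'warranty.txt', 'chunk_id': 0},
    {'content': 'To reset the router, hold the button for ten seconds.', 'source': 'manual.txt', 'chunk_id': 0},
]
embeddings = embedder.embed_batch([c['content'] for c in chunks])
store.add_chunks(chunks, embeddings)

for chunk, score in store.search(embedder.embed_text("How long is the warranty?"), top_k=1):
    print(f"{score:.3f} {chunk['source']}: {chunk['content']}")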
Similarity Metrics and Search Strategies
We have been using cosine similarity, but other similarity metrics exist. Understanding when to use each is important.
Cosine similarity measures the angle between vectors, ignoring magnitude. It ranges from negative one to one, where one means identical direction. This is ideal for text because we care about semantic similarity regardless of text length.
Euclidean distance measures the straight-line distance between vectors in space. Smaller distances indicate greater similarity. This is sensitive to vector magnitude, which can be problematic for text of varying lengths.
Dot product similarity is the inner product of vectors. It combines both direction and magnitude. This can be useful when magnitude carries meaning, but for text embeddings, cosine similarity is generally better.
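The following toy example makes the differences concrete; the vectors are chosen by hand so that the three metrics disagree in an obvious way:
import numpy as np

a = np.array([1.0, 2.0, 3.0])
b = np.array([2.0, 4.0, 6.0])   # same direction as a, twice the magnitude

cosine = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
euclidean = np.linalg.norm(a - b)
dot = np.dot(a, b)

print(cosine)     # 1.0   -> identical direction, magnitude ignored
print(euclidean)  # ~3.74 -> nonzero even though the directions match
print(dot)        # 28.0  -> grows with vector magnitude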
Integrating RAG into the Chatbot
Now we integrate all RAG components into our chatbot. The workflow is as follows. When the user asks a question, we embed the question, search the vector store for relevant chunks, format the retrieved chunks as context, and provide them to the language model along with the question.
Here is the complete RAG integration:
def format_rag_prompt(query, retrieved_chunks, conversation_history):
"""
Formats a prompt that includes retrieved context.
Args:
query: User's question
retrieved_chunks: List of (chunk, score) tuples
conversation_history: Existing conversation history
Returns:
list: Updated conversation history with RAG context
"""
# Build context from retrieved chunks
context_parts = []
for i, (chunk, score) in enumerate(retrieved_chunks, 1):
context_parts.append(f"[Document {i} from {chunk['source']}]")
context_parts.append(chunk['content'])
context_parts.append("") # Blank line
context = "\n".join(context_parts)
# Create RAG-enhanced message
rag_message = f"""Based on the following context, please answer the question.
Context:
{context}
Question: {query}
Please provide a detailed answer based on the context provided."""
# Add to conversation history
updated_history = conversation_history.copy()
updated_history.append(rag_message)
return updated_history
def generate_rag_response(model, tokenizer, device, query, conversation_history,
vector_store, embedding_model, gen_config, top_k=3):
"""
Generates a response using RAG.
Args:
model: Language model
tokenizer: Tokenizer
device: Device
query: User query
conversation_history: Conversation history
vector_store: VectorStore instance
embedding_model: EmbeddingModel instance
gen_config: GenerationConfig instance
top_k: Number of chunks to retrieve
Returns:
tuple: (response_text, retrieved_chunks)
"""
# Embed query
query_embedding = embedding_model.embed_text(query)
# Retrieve relevant chunks
retrieved_chunks = vector_store.search(query_embedding, top_k)
# Format prompt with retrieved context
rag_history = format_rag_prompt(query, retrieved_chunks, conversation_history)
# Generate response
formatted_prompt = tokenizer.eos_token.join(rag_history) + tokenizer.eos_token
input_ids = tokenizer.encode(formatted_prompt, return_tensors="pt")
input_ids = input_ids.to(device)
config_dict = gen_config.to_dict()
config_dict["pad_token_id"] = tokenizer.eos_token_id
with torch.no_grad():
output_ids = model.generate(input_ids, **config_dict)
response = tokenizer.decode(
output_ids[0][input_ids.shape[1]:],
skip_special_tokens=True
)
return response.strip(), retrieved_chunks
The format_rag_prompt function creates a prompt that includes retrieved context. It lists each retrieved chunk with its source, then presents the user's question. This gives the model all the information it needs to generate a grounded response.
The generate_rag_response function orchestrates the entire RAG workflow. It retrieves relevant chunks, formats the prompt, and generates a response. It also returns the retrieved chunks so we can show users which sources were used.
Building the RAG-Enabled Chatbot
Let us now create the complete fourth version of our chatbot with full RAG capabilities:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import os
# [Previous classes: GenerationConfig, EmbeddingModel, VectorStore]
class RAGChatbot:
"""
Complete RAG-enabled chatbot system.
"""
def __init__(self, model_name, embedding_model_name="all-MiniLM-L6-v2"):
"""
Initializes the RAG chatbot.
Args:
model_name: HuggingFace model name for generation
embedding_model_name: Sentence-transformers model for embeddings
"""
# Load generation model
print("Loading generation model...")
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16,
low_cpu_mem_usage=True
)
self.device = self.get_optimal_device()
self.model = self.model.to(self.device)
self.model.eval()
# Load embedding model
self.embedding_model = EmbeddingModel(embedding_model_name)
# Initialize vector store
self.vector_store = VectorStore(self.embedding_model.embedding_dim)
# Generation configuration
self.gen_config = GenerationConfig()
# Conversation history
self.conversation_history = [
"You are a helpful AI assistant."
]
# Document chunks
self.chunks = []
def get_optimal_device(self):
if torch.cuda.is_available():
return torch.device("cuda")
elif torch.backends.mps.is_available():
return torch.device("mps")
else:
return torch.device("cpu")
def load_documents(self, file_paths, chunk_size=500, overlap=50):
"""
Loads and indexes documents for RAG.
Args:
file_paths: List of document file paths
chunk_size: Size of text chunks
overlap: Overlap between chunks
"""
print("Loading documents...")
documents = load_documents(file_paths)
print("Chunking documents...")
self.chunks = chunk_documents(documents, chunk_size, overlap)
print("Generating embeddings...")
chunk_texts = [chunk['content'] for chunk in self.chunks]
embeddings = self.embedding_model.embed_batch(chunk_texts)
print("Building vector index...")
self.vector_store.add_chunks(self.chunks, embeddings)
print("Document indexing complete!")
def chat(self, user_input, use_rag=True, top_k=3):
"""
Processes a user message and generates a response.
Args:
user_input: User's message
use_rag: Whether to use RAG
top_k: Number of chunks to retrieve
Returns:
tuple: (response, sources)
"""
if use_rag and self.chunks:
# RAG-enabled response
response, retrieved_chunks = generate_rag_response(
self.model,
self.tokenizer,
self.device,
user_input,
self.conversation_history,
self.vector_store,
self.embedding_model,
self.gen_config,
top_k
)
# Extract sources
sources = [chunk['source'] for chunk, _ in retrieved_chunks]
# Update conversation history with original query and response
self.conversation_history.append(user_input)
self.conversation_history.append(response)
return response, sources
else:
# Regular response without RAG
self.conversation_history.append(user_input)
formatted_prompt = self.tokenizer.eos_token.join(self.conversation_history) + self.tokenizer.eos_token
input_ids = self.tokenizer.encode(formatted_prompt, return_tensors="pt")
input_ids = input_ids.to(self.device)
config_dict = self.gen_config.to_dict()
config_dict["pad_token_id"] = self.tokenizer.eos_token_id
with torch.no_grad():
output_ids = self.model.generate(input_ids, **config_dict)
response = self.tokenizer.decode(
output_ids[0][input_ids.shape[1]:],
skip_special_tokens=True
)
self.conversation_history.append(response.strip())
return response.strip(), []
def reset_conversation(self):
"""Clears conversation history."""
self.conversation_history = [self.conversation_history[0]]
def main():
print("=" * 60)
print("RAG-ENABLED CHATBOT - Version 4.0")
print("=" * 60)
# Initialize chatbot
chatbot = RAGChatbot("gpt2")
# Load documents if available
doc_dir = "documents"
if os.path.exists(doc_dir):
doc_files = [os.path.join(doc_dir, f) for f in os.listdir(doc_dir)
if f.endswith(('.txt', '.pdf'))]
if doc_files:
chatbot.load_documents(doc_files)
print("\nCommands: 'quit', 'reset', 'config', 'rag on/off'")
print("-" * 60)
use_rag = True
while True:
user_input = input("\nYou: ").strip()
if user_input.lower() in ['quit', 'exit']:
print("Goodbye!")
break
if user_input.lower() == 'reset':
chatbot.reset_conversation()
print("Conversation reset")
continue
if user_input.lower() == 'config':
chatbot.gen_config.configure_interactive()
continue
if user_input.lower() == 'rag on':
use_rag = True
print("RAG enabled")
continue
if user_input.lower() == 'rag off':
use_rag = False
print("RAG disabled")
continue
if not user_input:
continue
print("Assistant: ", end="", flush=True)
response, sources = chatbot.chat(user_input, use_rag)
print(response)
if sources:
print(f"\nSources: {', '.join(set(sources))}")
if __name__ == "__main__":
main()
This complete RAG-enabled chatbot can load documents, index them, retrieve relevant information, and generate grounded responses. Users can toggle RAG on or off to compare different modes.
STEP FIVE: WEB-BASED CHATBOT INTERFACE
Transitioning from Console to Web
Our final step is transforming the console chatbot into a web application. This makes it accessible to users who are not comfortable with command-line interfaces and enables features like rich formatting, file uploads, and concurrent users.
We will use Flask, a lightweight Python web framework, to build our web application. Flask provides routing, request handling, and template rendering. We will also implement streaming responses using Server-Sent Events, which allows the chatbot to display text as it is generated rather than waiting for the complete response.
Understanding Flask Basics
Flask is a micro web framework that makes building web applications simple. A Flask application consists of routes, which are functions that handle HTTP requests for specific URLs. Here is a minimal Flask application:
from flask import Flask, render_template, request, jsonify
app = Flask(__name__)
@app.route('/')
def home():
"""Renders the home page."""
return render_template('index.html')
@app.route('/api/chat', methods=['POST'])
def chat():
"""Handles chat requests."""
data = request.get_json()
user_message = data.get('message', '')
# Process message and generate response
response = f"You said: {user_message}"
return jsonify({'response': response})
if __name__ == '__main__':
app.run(debug=True, port=5000)
The @app.route decorator defines URL routes. The home function renders an HTML template for the main page. The chat function handles POST requests to the /api/chat endpoint, processes the message, and returns a JSON response.
Flask templates use Jinja2 syntax, which allows embedding Python expressions in HTML. Templates are stored in a templates directory relative to the Flask application file.
Implementing Streaming Responses
Streaming responses improve user experience by showing text as it is generated rather than waiting for the complete response. This is especially important for long responses that might take many seconds to generate.
The Transformers library supports streaming through the TextIteratorStreamer class. This class receives tokens as they are generated and makes them available through an iterator:
from transformers import TextIteratorStreamer
from threading import Thread
def generate_streaming_response(model, tokenizer, device, conversation_history, gen_config):
"""
Generates a streaming response.
Yields:
str: Generated text chunks
"""
formatted_prompt = tokenizer.eos_token.join(conversation_history) + tokenizer.eos_token
input_ids = tokenizer.encode(formatted_prompt, return_tensors="pt")
input_ids = input_ids.to(device)
config_dict = gen_config.to_dict()
config_dict["pad_token_id"] = tokenizer.eos_token_id
# Create streamer
streamer = TextIteratorStreamer(
tokenizer,
skip_prompt=True, # Don't include the input prompt in output
skip_special_tokens=True # Skip EOS and padding tokens
)
# Generate in a separate thread
# This allows the streamer to yield tokens while generation continues
generation_kwargs = {
"input_ids": input_ids,
"streamer": streamer,
**config_dict
}
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
# Yield tokens as they become available
for text_chunk in streamer:
yield text_chunk
thread.join()
The TextIteratorStreamer is passed to the generate method, which sends tokens to it as they are produced. We run generation in a separate thread so the main thread can iterate over the streamer and yield chunks to the web client.
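Before wiring streaming into Flask, you can exercise the generator directly in a console loop; this sketch assumes the model, tokenizer, device, conversation_history, and gen_config objects from the earlier chatbot versions are already in scope:
# Print tokens to the terminal as the streamer yields them
print("Assistant: ", end="", flush=True)
for chunk in generate_streaming_response(model, tokenizer, device,
                                         conversation_history, gen_config):
    print(chunk, end="", flush=True)
print()  # final newline once generation completes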
Server-Sent Events for Real-Time Updates
Server-Sent Events, abbreviated as SSE, is a web standard for pushing updates from server to client. Unlike WebSockets, which provide a full bidirectional channel, SSE is unidirectional (server to client only) and simpler to implement.
Flask can serve SSE responses using a generator function:
from flask import Response
@app.route('/api/chat/stream', methods=['POST'])
def chat_stream():
"""Streams chat response using Server-Sent Events."""
data = request.get_json()
user_message = data.get('message', '')
def generate():
# Add user message to history
chatbot.conversation_history.append(user_message)
# Stream response
full_response = ""
for chunk in generate_streaming_response(
chatbot.model,
chatbot.tokenizer,
chatbot.device,
chatbot.conversation_history,
chatbot.gen_config
):
full_response += chunk
# Format as SSE
yield f"data: {chunk}\n\n"
# Add complete response to history
chatbot.conversation_history.append(full_response)
# Send completion signal
yield "data: [DONE]\n\n"
return Response(generate(), mimetype='text/event-stream')
The generate function is a generator that yields SSE-formatted messages. Each message is prefixed with "data: " and followed by two newlines. The client receives these messages in real-time and can update the UI incrementally.
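You can test such an endpoint outside the browser with a small Python client. This sketch assumes the requests library is installed and the Flask app from this section is running locally on port 5000:
import requests

with requests.post("http://localhost:5000/api/chat/stream",
                   json={"message": "Hello"}, stream=True) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        # SSE messages arrive as lines beginning with "data: "
        if line and line.startswith("data: "):
            data = line[len("data: "):]
            if data == "[DONE]":
                break
            print(data, end="", flush=True)
print()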
Building the Web Interface
The web interface consists of HTML, CSS, and JavaScript. We create a chat interface with a message list and input form. JavaScript handles sending messages and receiving streaming responses.
Create a directory named templates in the same directory as your Flask application, and save the following HTML file as templates/index.html:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>RAG Chatbot</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: Arial, sans-serif;
background: #f5f5f5;
height: 100vh;
display: flex;
flex-direction: column;
}
.header {
background: #2c3e50;
color: white;
padding: 20px;
text-align: center;
}
.chat-container {
flex: 1;
display: flex;
flex-direction: column;
max-width: 900px;
width: 100%;
margin: 0 auto;
background: white;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
.messages {
flex: 1;
overflow-y: auto;
padding: 20px;
}
.message {
margin-bottom: 15px;
padding: 10px 15px;
border-radius: 8px;
max-width: 70%;
}
.user-message {
background: #3498db;
color: white;
margin-left: auto;
}
.assistant-message {
background: #ecf0f1;
color: #2c3e50;
}
.input-container {
padding: 20px;
border-top: 1px solid #ddd;
display: flex;
gap: 10px;
}
#message-input {
flex: 1;
padding: 10px;
border: 1px solid #ddd;
border-radius: 4px;
font-size: 14px;
}
#send-button {
padding: 10px 20px;
background: #3498db;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 14px;
}
#send-button:hover {
background: #2980b9;
}
#send-button:disabled {
background: #95a5a6;
cursor: not-allowed;
}
.typing-indicator {
display: none;
padding: 10px 15px;
background: #ecf0f1;
border-radius: 8px;
max-width: 70%;
color: #7f8c8d;
}
.typing-indicator.active {
display: block;
}
</style>
</head>
<body>
<div class="header">
<h1>RAG-Enabled Chatbot</h1>
<p>Powered by HuggingFace Transformers</p>
</div>
<div class="chat-container">
<div class="messages" id="messages">
<div class="message assistant-message">
Hello! I'm your AI assistant. How can I help you today?
</div>
</div>
<div class="typing-indicator" id="typing-indicator">
Assistant is typing...
</div>
<div class="input-container">
<input
type="text"
id="message-input"
placeholder="Type your message here..."
autocomplete="off"
>
<button id="send-button">Send</button>
</div>
</div>
<script>
const messagesContainer = document.getElementById('messages');
const messageInput = document.getElementById('message-input');
const sendButton = document.getElementById('send-button');
const typingIndicator = document.getElementById('typing-indicator');
function addMessage(content, isUser) {
const messageDiv = document.createElement('div');
messageDiv.className = `message ${isUser ? 'user-message' : 'assistant-message'}`;
messageDiv.textContent = content;
messagesContainer.appendChild(messageDiv);
messagesContainer.scrollTop = messagesContainer.scrollHeight;
return messageDiv;
}
async function sendMessage() {
const message = messageInput.value.trim();
if (!message) return;
// Add user message
addMessage(message, true);
messageInput.value = '';
// Disable input
messageInput.disabled = true;
sendButton.disabled = true;
typingIndicator.classList.add('active');
// Create assistant message div
const assistantMessageDiv = document.createElement('div');
assistantMessageDiv.className = 'message assistant-message';
messagesContainer.appendChild(assistantMessageDiv);
try {
// Send request and stream response
const response = await fetch('/api/chat/stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ message: message })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.substring(6);
if (data === '[DONE]') {
break;
}
assistantMessageDiv.textContent += data;
messagesContainer.scrollTop = messagesContainer.scrollHeight;
}
}
}
} catch (error) {
assistantMessageDiv.textContent = 'Error: Failed to get response';
console.error('Error:', error);
} finally {
messageInput.disabled = false;
sendButton.disabled = false;
typingIndicator.classList.remove('active');
}
}
sendButton.addEventListener('click', sendMessage);
messageInput.addEventListener('keypress', function(e) {
if (e.key === 'Enter') {
sendMessage();
}
});
</script>
</body>
</html>
This interface provides a clean chat experience with user messages on the right in blue and assistant messages on the left in gray. The JavaScript code handles sending messages and receiving streaming responses using the Fetch API with ReadableStream.
Complete Web Application
Here is the complete Flask application integrating all components. This should be saved as a separate Python file, for example app.py:
from flask import Flask, render_template, request, jsonify, Response
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
from threading import Thread, Lock
import os
# Import all previous classes
# [Include GenerationConfig, EmbeddingModel, VectorStore, RAGChatbot classes here]
app = Flask(__name__)
# Global chatbot instance with thread safety
chatbot = None
chatbot_lock = Lock()
def initialize_chatbot():
"""Initializes the chatbot on application startup."""
global chatbot
print("Initializing chatbot...")
chatbot = RAGChatbot("gpt2")
# Load documents if available
doc_dir = "documents"
if os.path.exists(doc_dir):
doc_files = [os.path.join(doc_dir, f) for f in os.listdir(doc_dir)
if f.endswith(('.txt', '.pdf'))]
if doc_files:
chatbot.load_documents(doc_files)
print("Chatbot ready!")
@app.route('/')
def home():
"""Renders the main chat interface."""
return render_template('index.html')
@app.route('/api/chat', methods=['POST'])
def chat():
"""Handles non-streaming chat requests."""
data = request.get_json()
user_message = data.get('message', '')
use_rag = data.get('use_rag', True)
if not user_message:
return jsonify({'error': 'No message provided'}), 400
try:
with chatbot_lock:
response, sources = chatbot.chat(user_message, use_rag=use_rag)
return jsonify({
'response': response,
'sources': sources
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/chat/stream', methods=['POST'])
def chat_stream():
"""Handles streaming chat requests using Server-Sent Events."""
data = request.get_json()
user_message = data.get('message', '')
if not user_message:
return jsonify({'error': 'No message provided'}), 400
def generate():
try:
with chatbot_lock:
# Add user message to history
chatbot.conversation_history.append(user_message)
# Format prompt
formatted_prompt = chatbot.tokenizer.eos_token.join(
chatbot.conversation_history
) + chatbot.tokenizer.eos_token
input_ids = chatbot.tokenizer.encode(formatted_prompt, return_tensors="pt")
input_ids = input_ids.to(chatbot.device)
config_dict = chatbot.gen_config.to_dict()
config_dict["pad_token_id"] = chatbot.tokenizer.eos_token_id
# Create streamer
streamer = TextIteratorStreamer(
chatbot.tokenizer,
skip_prompt=True,
skip_special_tokens=True
)
# Generate in thread
generation_kwargs = {
"input_ids": input_ids,
"streamer": streamer,
**config_dict
}
thread = Thread(target=chatbot.model.generate, kwargs=generation_kwargs)
thread.start()
# Stream tokens
full_response = ""
for text_chunk in streamer:
full_response += text_chunk
yield f"data: {text_chunk}\n\n"
thread.join()
# Add response to history
chatbot.conversation_history.append(full_response)
yield "data: [DONE]\n\n"
except Exception as e:
yield f"data: Error: {str(e)}\n\n"
yield "data: [DONE]\n\n"
return Response(generate(), mimetype='text/event-stream')
@app.route('/api/reset', methods=['POST'])
def reset():
"""Resets the conversation history."""
with chatbot_lock:
chatbot.reset_conversation()
return jsonify({'status': 'success'})
@app.route('/api/config', methods=['GET', 'POST'])
def config():
"""Gets or updates generation configuration."""
if request.method == 'GET':
with chatbot_lock:
return jsonify({
'max_new_tokens': chatbot.gen_config.max_new_tokens,
'temperature': chatbot.gen_config.temperature,
'top_p': chatbot.gen_config.top_p,
'top_k': chatbot.gen_config.top_k,
'repetition_penalty': chatbot.gen_config.repetition_penalty
})
else:
data = request.get_json()
with chatbot_lock:
if 'max_new_tokens' in data:
chatbot.gen_config.max_new_tokens = data['max_new_tokens']
if 'temperature' in data:
chatbot.gen_config.temperature = data['temperature']
if 'top_p' in data:
chatbot.gen_config.top_p = data['top_p']
if 'top_k' in data:
chatbot.gen_config.top_k = data['top_k']
if 'repetition_penalty' in data:
chatbot.gen_config.repetition_penalty = data['repetition_penalty']
return jsonify({'status': 'success'})
if __name__ == '__main__':
initialize_chatbot()
app.run(debug=True, host='0.0.0.0', port=5000)
This complete web application provides a production-ready chatbot interface with streaming responses, configurable parameters, and RAG capabilities. The thread lock serializes access to the shared chatbot so concurrent requests cannot interleave generation or corrupt the conversation history. For real deployments, disable Flask's debug mode and serve the app with a WSGI server such as Gunicorn rather than the built-in development server.
COMPLETE RUNNING EXAMPLE: PRODUCTION-READY RAG CHATBOT
The following is a complete, production-ready implementation that combines all five versions of our chatbot. This code includes error handling, logging, configuration management, and all features discussed throughout the tutorial. Save this as chatbot_complete.py:
#!/usr/bin/env python3
"""
Complete RAG-Enabled Chatbot System
Supports console and web interfaces with full RAG capabilities
"""
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import os
import sys
from threading import Thread, Lock
from flask import Flask, render_template, request, jsonify, Response
import logging
import re
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class GenerationConfig:
"""Manages generation parameters with validation."""
def __init__(self):
self.max_new_tokens = 150
self.temperature = 0.7
self.top_p = 0.9
self.top_k = 50
self.repetition_penalty = 1.1
self.do_sample = True
self.num_beams = 1
def to_dict(self):
"""Converts configuration to dictionary for model.generate()."""
config = {
"max_new_tokens": self.max_new_tokens,
"do_sample": self.do_sample,
"pad_token_id": None
}
if self.do_sample:
config["temperature"] = self.temperature
config["top_p"] = self.top_p
config["top_k"] = self.top_k
config["repetition_penalty"] = self.repetition_penalty
else:
if self.num_beams > 1:
config["num_beams"] = self.num_beams
config["early_stopping"] = True
return config
def display(self):
"""Displays current configuration."""
print("\nGeneration Configuration:")
print(f" Max New Tokens: {self.max_new_tokens}")
print(f" Mode: {'Sampling' if self.do_sample else 'Greedy/Beam'}")
if self.do_sample:
print(f" Temperature: {self.temperature}")
print(f" Top-p: {self.top_p}")
print(f" Top-k: {self.top_k}")
print(f" Repetition Penalty: {self.repetition_penalty}")
else:
print(f" Num Beams: {self.num_beams}")
def configure_interactive(self):
"""Interactive configuration menu."""
while True:
print("\n" + "=" * 60)
print("PARAMETER CONFIGURATION")
print("=" * 60)
self.display()
print("\n1. Max Tokens 2. Temperature 3. Top-p 4. Top-k")
print("5. Repetition Penalty 6. Toggle Sampling 7. Num Beams")
print("0. Done")
choice = input("\nSelect: ").strip()
if choice == "0":
break
elif choice == "1":
try:
self.max_new_tokens = int(input("Max tokens: "))
except ValueError:
print("Invalid number")
elif choice == "2":
try:
self.temperature = float(input("Temperature: "))
except ValueError:
print("Invalid number")
elif choice == "3":
try:
self.top_p = float(input("Top-p: "))
except ValueError:
print("Invalid number")
elif choice == "4":
try:
self.top_k = int(input("Top-k: "))
except ValueError:
print("Invalid number")
elif choice == "5":
try:
self.repetition_penalty = float(input("Repetition penalty: "))
except ValueError:
print("Invalid number")
elif choice == "6":
self.do_sample = not self.do_sample
elif choice == "7":
try:
self.num_beams = int(input("Num beams: "))
if self.num_beams > 1:
self.do_sample = False
except ValueError:
print("Invalid number")
class EmbeddingModel:
"""Wrapper for sentence embedding models."""
def __init__(self, model_name="all-MiniLM-L6-v2"):
logger.info(f"Loading embedding model: {model_name}")
self.model = SentenceTransformer(model_name)
self.embedding_dim = self.model.get_sentence_embedding_dimension()
logger.info(f"Embedding dimension: {self.embedding_dim}")
def embed_text(self, text):
"""Embeds a single text string."""
return self.model.encode(text, convert_to_numpy=True)
def embed_batch(self, texts):
"""Embeds multiple texts efficiently."""
return self.model.encode(texts, convert_to_numpy=True, show_progress_bar=True)
class VectorStore:
"""Vector database for similarity search."""
def __init__(self, embedding_dim):
self.embedding_dim = embedding_dim
self.index = faiss.IndexFlatIP(embedding_dim)
self.chunks = []
def add_chunks(self, chunks, embeddings):
"""Adds chunks and embeddings to the store."""
faiss.normalize_L2(embeddings)
self.index.add(embeddings)
self.chunks.extend(chunks)
logger.info(f"Added {len(chunks)} chunks (total: {len(self.chunks)})")
def search(self, query_embedding, top_k=5):
"""Searches for similar chunks."""
query_embedding = query_embedding.reshape(1, -1)
faiss.normalize_L2(query_embedding)
distances, indices = self.index.search(query_embedding, top_k)
results = []
for i, idx in enumerate(indices[0]):
if idx < len(self.chunks):
chunk = self.chunks[idx]
score = float(distances[0][i])
results.append((chunk, score))
return results
def load_text_file(file_path):
"""Loads a plain text file."""
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
def load_pdf_file(file_path):
"""Loads a PDF file and extracts text."""
try:
import PyPDF2
except ImportError:
raise ImportError("PyPDF2 required. Install with: pip install PyPDF2")
text = ""
with open(file_path, 'rb') as f:
pdf_reader = PyPDF2.PdfReader(f)
for page in pdf_reader.pages:
text += page.extract_text() + "\n"
return text
def load_documents(file_paths):
"""Loads multiple documents."""
documents = []
for file_path in file_paths:
logger.info(f"Loading {file_path}")
try:
if file_path.endswith('.txt'):
content = load_text_file(file_path)
elif file_path.endswith('.pdf'):
content = load_pdf_file(file_path)
else:
logger.warning(f"Unsupported file type: {file_path}")
continue
documents.append({
'content': content,
'source': file_path
})
except Exception as e:
logger.error(f"Error loading {file_path}: {e}")
logger.info(f"Loaded {len(documents)} documents")
return documents
def split_into_sentences(text):
"""Splits text into sentences."""
sentences = re.split(r'(?<=[.!?])\s+', text)
sentences = [s.strip() for s in sentences if s.strip()]
return sentences
def chunk_text(text, chunk_size=500, overlap=50):
"""Chunks text with overlap."""
sentences = split_into_sentences(text)
chunks = []
current_chunk = []
current_length = 0
for sentence in sentences:
sentence_length = len(sentence)
if current_length + sentence_length > chunk_size and current_chunk:
chunk_text = ' '.join(current_chunk)
chunks.append(chunk_text)
overlap_text = chunk_text[-overlap:] if len(chunk_text) > overlap else chunk_text
overlap_sentences = split_into_sentences(overlap_text)
current_chunk = overlap_sentences
current_length = len(' '.join(current_chunk))
current_chunk.append(sentence)
current_length += sentence_length + 1
if current_chunk:
chunks.append(' '.join(current_chunk))
return chunks
def chunk_documents(documents, chunk_size=500, overlap=50):
"""Chunks multiple documents."""
all_chunks = []
for doc in documents:
chunks = chunk_text(doc['content'], chunk_size, overlap)
for i, chunk in enumerate(chunks):
all_chunks.append({
'content': chunk,
'source': doc['source'],
'chunk_id': i
})
logger.info(f"Created {len(all_chunks)} chunks")
return all_chunks
def get_optimal_device():
"""Detects optimal device."""
if torch.cuda.is_available():
device = torch.device("cuda")
logger.info(f"Using CUDA: {torch.cuda.get_device_name(0)}")
return device
elif torch.backends.mps.is_available():
device = torch.device("mps")
logger.info("Using Apple MPS")
return device
else:
device = torch.device("cpu")
logger.info("Using CPU")
return device
def manage_context_window(conversation_history, tokenizer, max_tokens=800):
"""Manages context window by removing old messages."""
system_message = None
messages = conversation_history.copy()
if messages and len(messages) > 0:
system_message = messages[0]
messages = messages[1:]
if messages:
formatted = tokenizer.eos_token.join(messages)
token_count = len(tokenizer.encode(formatted))
while token_count > max_tokens and len(messages) > 1:
messages.pop(0)
formatted = tokenizer.eos_token.join(messages)
token_count = len(tokenizer.encode(formatted))
result = []
if system_message:
result.append(system_message)
result.extend(messages)
return result
def format_rag_prompt(query, retrieved_chunks, conversation_history):
"""Formats a prompt with retrieved context."""
context_parts = []
for i, (chunk, score) in enumerate(retrieved_chunks, 1):
context_parts.append(f"[Document {i} from {chunk['source']}]")
context_parts.append(chunk['content'])
context_parts.append("")
context = "\n".join(context_parts)
rag_message = f"""Based on the following context, please answer the question.
Context:
{context}
Question: {query}
Please provide a detailed answer based on the context provided."""
updated_history = conversation_history.copy()
updated_history.append(rag_message)
return updated_history
class RAGChatbot:
"""Complete RAG-enabled chatbot system."""
def __init__(self, model_name, embedding_model_name="all-MiniLM-L6-v2"):
logger.info("Initializing RAG chatbot")
# Load generation model
logger.info(f"Loading model: {model_name}")
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16,
low_cpu_mem_usage=True
)
self.device = get_optimal_device()
self.model = self.model.to(self.device)
self.model.eval()
# Load embedding model
self.embedding_model = EmbeddingModel(embedding_model_name)
# Initialize vector store
self.vector_store = VectorStore(self.embedding_model.embedding_dim)
# Generation configuration
self.gen_config = GenerationConfig()
# Conversation history
self.conversation_history = [
"You are a helpful AI assistant."
]
# Document chunks
self.chunks = []
logger.info("Chatbot initialized successfully")
def load_documents(self, file_paths, chunk_size=500, overlap=50):
"""Loads and indexes documents."""
logger.info("Loading documents for RAG")
documents = load_documents(file_paths)
logger.info("Chunking documents")
self.chunks = chunk_documents(documents, chunk_size, overlap)
logger.info("Generating embeddings")
chunk_texts = [chunk['content'] for chunk in self.chunks]
embeddings = self.embedding_model.embed_batch(chunk_texts)
logger.info("Building vector index")
self.vector_store.add_chunks(self.chunks, embeddings)
logger.info("Document indexing complete")
def chat(self, user_input, use_rag=True, top_k=3):
"""Processes user message and generates response."""
try:
if use_rag and self.chunks:
return self._chat_with_rag(user_input, top_k)
else:
return self._chat_without_rag(user_input)
except Exception as e:
logger.error(f"Error in chat: {e}")
return f"Error: {str(e)}", []
def _chat_with_rag(self, user_input, top_k):
"""Generates RAG-enabled response."""
# Retrieve relevant chunks
query_embedding = self.embedding_model.embed_text(user_input)
retrieved_chunks = self.vector_store.search(query_embedding, top_k)
# Format RAG prompt
rag_history = format_rag_prompt(user_input, retrieved_chunks, self.conversation_history)
# Generate response
response = self._generate(rag_history)
# Update history with original query
self.conversation_history.append(user_input)
self.conversation_history.append(response)
sources = [chunk['source'] for chunk, _ in retrieved_chunks]
return response, sources
def _chat_without_rag(self, user_input):
"""Generates regular response."""
self.conversation_history.append(user_input)
response = self._generate(self.conversation_history)
self.conversation_history.append(response)
return response, []
    def _generate(self, conversation_history):
        """Generates a response from the conversation history."""
        # Trim old messages first so the prompt stays within the context budget
        conversation_history = manage_context_window(conversation_history, self.tokenizer)
        formatted_prompt = self.tokenizer.eos_token.join(conversation_history) + self.tokenizer.eos_token
input_ids = self.tokenizer.encode(formatted_prompt, return_tensors="pt")
input_ids = input_ids.to(self.device)
config_dict = self.gen_config.to_dict()
config_dict["pad_token_id"] = self.tokenizer.eos_token_id
with torch.no_grad():
output_ids = self.model.generate(input_ids, **config_dict)
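        # Decode only the newly generated tokens, slicing off the prompt portion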
response = self.tokenizer.decode(
output_ids[0][input_ids.shape[1]:],
skip_special_tokens=True
)
return response.strip()
def generate_streaming(self, conversation_history):
"""Generates streaming response."""
formatted_prompt = self.tokenizer.eos_token.join(conversation_history) + self.tokenizer.eos_token
input_ids = self.tokenizer.encode(formatted_prompt, return_tensors="pt")
input_ids = input_ids.to(self.device)
config_dict = self.gen_config.to_dict()
config_dict["pad_token_id"] = self.tokenizer.eos_token_id
streamer = TextIteratorStreamer(
self.tokenizer,
skip_prompt=True,
skip_special_tokens=True
)
generation_kwargs = {
"input_ids": input_ids,
"streamer": streamer,
**config_dict
}
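        # Run generation on a background thread; the streamer yields text chunks as they arrive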
thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
thread.start()
for text_chunk in streamer:
yield text_chunk
thread.join()
def reset_conversation(self):
"""Resets conversation history."""
self.conversation_history = [self.conversation_history[0]]
logger.info("Conversation reset")
def run_console_chatbot():
"""Runs console version of chatbot."""
print("=" * 60)
print("RAG-ENABLED CONSOLE CHATBOT")
print("=" * 60)
model_name = "gpt2"
chatbot = RAGChatbot(model_name)
# Load documents if available
doc_dir = "documents"
if os.path.exists(doc_dir):
doc_files = [os.path.join(doc_dir, f) for f in os.listdir(doc_dir)
if f.endswith(('.txt', '.pdf'))]
if doc_files:
chatbot.load_documents(doc_files)
print("\nCommands: 'quit', 'reset', 'config', 'rag on/off'")
print("-" * 60)
use_rag = True
while True:
user_input = input("\nYou: ").strip()
if user_input.lower() in ['quit', 'exit']:
print("Goodbye!")
break
if user_input.lower() == 'reset':
chatbot.reset_conversation()
print("Conversation reset")
continue
if user_input.lower() == 'config':
chatbot.gen_config.configure_interactive()
continue
if user_input.lower() == 'rag on':
use_rag = True
print("RAG enabled")
continue
if user_input.lower() == 'rag off':
use_rag = False
print("RAG disabled")
continue
if not user_input:
continue
print("Assistant: ", end="", flush=True)
response, sources = chatbot.chat(user_input, use_rag)
print(response)
if sources:
print(f"\nSources: {', '.join(set(sources))}")
def create_web_app(chatbot):
"""Creates Flask web application."""
app = Flask(__name__)
chatbot_lock = Lock()
@app.route('/')
def home():
return render_template('index.html')
@app.route('/api/chat', methods=['POST'])
def chat():
data = request.get_json()
user_message = data.get('message', '')
use_rag = data.get('use_rag', True)
if not user_message:
return jsonify({'error': 'No message provided'}), 400
try:
with chatbot_lock:
response, sources = chatbot.chat(user_message, use_rag=use_rag)
return jsonify({'response': response, 'sources': sources})
except Exception as e:
logger.error(f"Chat error: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/chat/stream', methods=['POST'])
def chat_stream():
data = request.get_json()
user_message = data.get('message', '')
if not user_message:
return jsonify({'error': 'No message provided'}), 400
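        # Stream the reply as Server-Sent Events; each text chunk is sent as a "data:" line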
def generate():
try:
with chatbot_lock:
chatbot.conversation_history.append(user_message)
full_response = ""
for chunk in chatbot.generate_streaming(chatbot.conversation_history):
full_response += chunk
yield f"data: {chunk}\n\n"
chatbot.conversation_history.append(full_response)
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"Streaming error: {e}")
yield f"data: Error: {str(e)}\n\n"
yield "data: [DONE]\n\n"
return Response(generate(), mimetype='text/event-stream')
@app.route('/api/reset', methods=['POST'])
def reset():
with chatbot_lock:
chatbot.reset_conversation()
return jsonify({'status': 'success'})
return app
def run_web_chatbot():
"""Runs web version of chatbot."""
logger.info("Starting web chatbot")
model_name = "gpt2"
chatbot = RAGChatbot(model_name)
# Load documents
doc_dir = "documents"
if os.path.exists(doc_dir):
doc_files = [os.path.join(doc_dir, f) for f in os.listdir(doc_dir)
if f.endswith(('.txt', '.pdf'))]
if doc_files:
chatbot.load_documents(doc_files)
app = create_web_app(chatbot)
app.run(debug=True, host='0.0.0.0', port=5000)
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == 'web':
run_web_chatbot()
else:
run_console_chatbot()
This complete implementation provides a fully functional RAG-enabled chatbot with both console and web interfaces. It includes all features discussed throughout the tutorial: conversation memory, configurable generation parameters, document loading and chunking, embedding generation, vector search, and streaming responses.
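The RAGChatbot class can also be driven programmatically from your own scripts, independent of the console and web entry points. Here is a minimal sketch, assuming the code above is saved as chatbot_complete.py and that a notes.txt file exists (both names are illustrative):
from chatbot_complete import RAGChatbot  # module name taken from the run commands below

bot = RAGChatbot("gpt2")
bot.load_documents(["notes.txt"])  # illustrative document path
answer, sources = bot.chat("What do my notes say about deadlines?", use_rag=True)
print(answer)
print("Sources:", sources)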
To use the console version, run:
python chatbot_complete.py
To use the web version, run:
python chatbot_complete.py web
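Once the server is up, the JSON endpoint can be exercised from any HTTP client. The sketch below uses the third-party requests package (not otherwise required by the chatbot) against the default host and port configured above:
import requests  # pip install requests

resp = requests.post(
    "http://localhost:5000/api/chat",
    json={"message": "What documents do you know about?", "use_rag": True},
)
data = resp.json()
print(data.get("response"))
print("Sources:", data.get("sources"))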
The chatbot automatically detects the available hardware, using CUDA or Apple MPS acceleration when present and falling back to the CPU otherwise. Any .txt or .pdf files placed in a documents directory next to the script are loaded, chunked, and indexed for RAG at startup.
This represents a production-ready system that can be deployed and used in real applications. Because document loading, chunking, embedding, retrieval, and generation are implemented as separate components, the design is easy to extend and customize for specific use cases.
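As a concrete example of such customization, nothing ties the class to gpt2 or to the default chunking parameters; the values below are chosen purely for illustration:
bot = RAGChatbot(
    "distilgpt2",  # any causal language model from the Hub can be substituted here
    embedding_model_name="all-MiniLM-L6-v2",
)
bot.load_documents(["manual.pdf"], chunk_size=300, overlap=30)  # smaller, denser chunks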