Sunday, December 14, 2025

THE COMPLETE GUIDE TO BUILDING LLM APPLICATIONS WITH HUGGINGFACE TRANSFORMERS

INTRODUCTION

Welcome to the fascinating world of Large Language Models and the HuggingFace Transformers library. If you are reading this, you are about to embark on a transformative journey that will take you from someone who has never interacted with an LLM to a developer capable of building sophisticated AI-powered applications. This tutorial is meticulously designed for software engineers who understand general programming concepts but have absolutely zero experience with machine learning, natural language processing, or artificial intelligence.

The HuggingFace Transformers library represents a revolutionary shift in how developers interact with advanced language models. Before this library existed, working with models like GPT, BERT, LLaMA, or any other transformer-based architecture required deep expertise in machine learning frameworks, comprehensive understanding of complex neural network architectures, and the ability to deal with low-level tensor operations and mathematical computations. HuggingFace fundamentally changed this landscape by providing a unified, intuitive, and remarkably simple interface to thousands of pre-trained models.

Throughout this tutorial, we will construct five progressively sophisticated versions of an LLM chatbot application. Each version builds upon the previous one, systematically adding new capabilities while preserving everything built in the earlier versions. By the time you reach the end of this tutorial, you will have created a production-ready, web-based chatbot with Retrieval-Augmented Generation capabilities that can run efficiently on any hardware configuration, from basic CPUs to high-end GPUs from different manufacturers.

WHAT EXACTLY IS A LARGE LANGUAGE MODEL

Before we dive into code, we must establish a foundational understanding of what a Large Language Model actually is. An LLM is a type of artificial neural network that has been trained on enormous amounts of text data from the internet, books, articles, and other written sources. The training process teaches the model to predict what word or token should come next in a sequence, given all the previous words.

Think of it like an incredibly sophisticated autocomplete system. When you type on your smartphone and it suggests the next word, that is a simple version of what LLMs do. However, LLMs are vastly more powerful because they have learned patterns, facts, reasoning capabilities, and language structures from billions of examples. They can generate coherent paragraphs, answer questions, write code, translate languages, and perform countless other language-related tasks.

The term "transformer" refers to the specific neural network architecture these models use. Transformers were introduced in a 2017 research paper titled "Attention Is All You Need" and revolutionized natural language processing. The key innovation is the attention mechanism, which allows the model to focus on relevant parts of the input when generating each output token. We will not dive into the mathematical details of transformers, but understanding that they process sequences of tokens and use attention to capture relationships between words is sufficient for our purposes.

WHAT YOU WILL BUILD THROUGHOUT THIS TUTORIAL

Our journey consists of five major milestones, each representing a complete, functional application that builds upon the previous version.

First, we create a basic console chatbot that can load any compatible LLM model and engage in conversations with users through the command line. This foundation teaches you about model loading, tokenization, text generation, hardware acceleration, and the fundamental components of the Transformers library.

Second, we add conversation memory so the chatbot remembers previous exchanges within a session. This introduces you to the stateless nature of LLMs, which is a critical concept. Unlike traditional applications that maintain state, LLMs have no memory between calls. Every time you send a prompt, you must include all relevant conversation history. We will implement this properly.

Third, we implement configurable generation parameters, giving users fine-grained control over how the model generates text. Parameters like temperature control randomness, max tokens limit output length, top-p and top-k control sampling strategies, and repetition penalty prevents the model from repeating itself. Understanding these parameters is essential for getting quality outputs.

Fourth, we integrate Retrieval-Augmented Generation, commonly abbreviated as RAG. This powerful technique allows the chatbot to reference external documents and provide grounded, factual responses based on your own data rather than just the model's training data. We will cover document loading, text chunking strategies, embeddings, vector databases, similarity search, and reranking.

Finally, we transform everything into a web application with a modern interface that users can access through their browsers. This involves integrating a web framework, handling asynchronous operations, implementing streaming responses for better user experience, and deploying the complete system.

Let us begin this exciting journey by understanding the absolute fundamentals.

STEP ONE: BUILDING THE FOUNDATION - A BASIC CONSOLE CHATBOT

Understanding What We Are Actually Building

A chatbot, at its most fundamental level, is a program that accepts text input from a user, processes that text through a language model, and returns a generated response. The simplest possible implementation involves several distinct steps. First, we load a pre-trained model into computer memory. Second, we convert user text into numerical tokens that the model can understand. Third, we run those tokens through the model's neural network to generate output tokens. Fourth, we convert the output tokens back into human-readable text and display it to the user.

However, beneath this simple description lie several layers of complexity that we must understand and handle properly. Different models have different architectures and were trained with different conversation formats. Some models expect messages wrapped in special tokens, while others use different conventions. Some models are instruction-tuned specifically for chat interactions, while others are base models better suited for text completion tasks. Our chatbot must handle these variations gracefully and automatically.

Additionally, modern computers have incredibly diverse hardware configurations. Some machines have NVIDIA GPUs with CUDA support, others have AMD GPUs with ROCm drivers, some have Apple Silicon processors with Metal Performance Shaders, and many have only CPUs without any GPU acceleration. Our application must intelligently detect available hardware and automatically use the fastest option without requiring manual configuration from users.

Installing Required Dependencies and Understanding Each One

Before writing any code, we need to install the necessary Python packages. The HuggingFace ecosystem consists of several libraries that work together synergistically. Each library has a specific purpose and understanding these purposes is crucial.

The transformers library is the core package that provides model architectures, tokenizers, training utilities, and high-level APIs for working with transformer models. This is the main library we will use throughout this tutorial.

The torch library, also known as PyTorch, is the underlying deep learning framework. PyTorch provides the fundamental building blocks for neural networks including tensors, which are multi-dimensional arrays of numbers, automatic differentiation for computing gradients, and GPU acceleration. The Transformers library is built on top of PyTorch and uses it for all low-level operations.

The accelerate library is a HuggingFace utility that simplifies device management, distributed training, and mixed precision training. It helps us write code that works seamlessly across different hardware configurations without manual device management.

The sentencepiece library is a tokenization library developed by Google. Some models, particularly those from the LLaMA family and many multilingual models, use SentencePiece tokenization. This library must be installed for those tokenizers to work.

The tokenizers library is a fast tokenization library written in Rust with Python bindings. It provides extremely efficient implementations of various tokenization algorithms and is used by many modern models for speed.

Open your terminal and execute the following installation command:

pip install transformers torch accelerate sentencepiece tokenizers

This command installs all core dependencies with their latest stable versions. Depending on your specific hardware configuration, you might need additional packages. For NVIDIA GPUs, you need to ensure you have CUDA-compatible PyTorch installed. The PyTorch website provides a configuration tool to generate the correct installation command for your system. For AMD GPUs, you need ROCm-compatible PyTorch, which requires following AMD's specific installation instructions. For Apple Silicon, the standard PyTorch installation includes Metal Performance Shaders support automatically, so no additional steps are needed.
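
For illustration only, a GPU-specific installation usually points pip at the matching PyTorch wheel index. The index URLs and version tags below are examples and change over time, so copy the exact command generated by the selector on the PyTorch website rather than these:

# NVIDIA GPU (CUDA build) - example index, verify the current tag on pytorch.org
pip install torch --index-url https://download.pytorch.org/whl/cu121

# AMD GPU (ROCm build) - example index, verify the current tag on pytorch.org
pip install torch --index-url https://download.pytorch.org/whl/rocm6.0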

Understanding Tokenization: The Critical Bridge Between Text and Numbers

Neural networks, at their core, are mathematical functions that operate on numerical data. They cannot process text directly. They work with tensors, which are multi-dimensional arrays of numbers similar to matrices in linear algebra but potentially with more dimensions. Tokenization is the fundamental process of converting human-readable text into numerical representations that neural networks can process.

This conversion happens in multiple stages, and understanding each stage is essential. First, text is split into tokens. A token is not necessarily a word. Depending on the tokenization algorithm, tokens can be complete words, parts of words called subwords, individual characters, or even bytes. The choice of tokenization strategy significantly impacts model performance and vocabulary size.

Second, each token is mapped to a unique integer identifier called a token ID. This mapping is defined by a vocabulary, which is a dictionary that associates each possible token with a specific number. The vocabulary is created during the model's training process and remains fixed afterward.

Third, these token IDs are converted into tensors that can be fed into the neural network. Additional processing may include adding special tokens, creating attention masks, and organizing the data into batches.

Different models use fundamentally different tokenization strategies. Byte-Pair Encoding, abbreviated as BPE, is one popular approach. BPE starts with individual characters and iteratively merges the most frequently occurring pairs of tokens to create a vocabulary of subword units. This allows the model to handle any word, even ones it has never seen, by breaking them into known subword pieces.
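
To see this behavior concretely, here is a small illustration using GPT-2's byte-level BPE tokenizer. The exact split depends on the learned merges, so treat the output shown in the comment as indicative rather than exact:

from transformers import AutoTokenizer

# Split a word the tokenizer is unlikely to store as a single vocabulary entry
bpe_tokenizer = AutoTokenizer.from_pretrained("gpt2")
print(bpe_tokenizer.tokenize("unbelievability"))
# Output: a handful of subword pieces, for example ['un', 'bel', 'iev', 'ability']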

WordPiece is another algorithm, used by models like BERT. It is similar to BPE but uses a different merging criterion based on likelihood maximization. 

SentencePiece is yet another approach that treats the input as a raw stream of Unicode characters and learns subword units directly from that stream without requiring pre-tokenization into words.

The absolutely critical insight here is that tokenization is not arbitrary or interchangeable. Each model has a specific tokenizer that was trained alongside it using the same data. You must use the exact correct tokenizer for each model. Using the wrong tokenizer will produce nonsensical results because the token IDs will not correspond to what the model expects.

Let us examine a concrete tokenization example to make this concept crystal clear:

from transformers import AutoTokenizer

# Load a tokenizer for a specific model
# AutoTokenizer automatically determines the correct tokenizer type
tokenizer = AutoTokenizer.from_pretrained("gpt2")

# Convert text to tokens (the actual string pieces)
text = "Hello, how are you today?"
tokens = tokenizer.tokenize(text)
print("Tokens:", tokens)
# Output might be: ['Hello', ',', 'Ġhow', 'Ġare', 'Ġyou', 'Ġtoday', '?']
# Note: 'Ġ' represents a space in GPT-2 style tokenization

# Convert text directly to token IDs (the numerical representation)
token_ids = tokenizer.encode(text)
print("Token IDs:", token_ids)
# Output might be: [15496, 11, 703, 389, 345, 1909, 30]

# Convert token IDs back to text (decoding)
decoded_text = tokenizer.decode(token_ids)
print("Decoded:", decoded_text)
# Output: "Hello, how are you today?"

In this example, we use the AutoTokenizer class, which is a factory class that automatically loads the correct tokenizer type for any given model. The from_pretrained method is incredibly powerful. It takes a model identifier, which can be a model name from the HuggingFace Hub or a path to a local directory, and it downloads the tokenizer configuration and vocabulary files if they are not already cached locally.

The tokenize method shows the actual token strings, which helps us understand how the text is being split. Notice in the example output that some tokens have a special character 'Ġ' prefix. This is how GPT-2 style tokenizers represent spaces. Different tokenizers use different conventions.

The encode method converts text directly to integer IDs in a single step. This is the method you will use most frequently because it is more efficient than tokenizing and then converting separately. The encode method also handles adding special tokens that the model expects, such as beginning-of-sequence and end-of-sequence markers.

The decode method reverses the entire process, converting token IDs back to human-readable text. This is what we use to convert the model's numerical output into text we can display to users.

The AutoTokenizer class is incredibly convenient because it completely abstracts away the complexity of different tokenizer types. You do not need to know whether a model uses BertTokenizer, GPT2Tokenizer, LlamaTokenizer, or any other specific tokenizer class. Simply provide the model name, and AutoTokenizer handles all the details automatically.
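
To make the earlier warning about mismatched tokenizers concrete, here is a short demonstration. The two models are arbitrary choices for illustration; encoding text with one model's tokenizer and then decoding the resulting IDs with another's produces unrelated text, because the two vocabularies map the same IDs to different tokens:

from transformers import AutoTokenizer

gpt2_tokenizer = AutoTokenizer.from_pretrained("gpt2")
bert_tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Encode with GPT-2's vocabulary...
ids = gpt2_tokenizer.encode("Hello, how are you today?")

# ...then (incorrectly) decode with BERT's vocabulary
print(bert_tokenizer.decode(ids))
# The result is a string of unrelated tokens, not the original sentence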

The AutoModel Family: Loading Pre-Trained Models

HuggingFace provides several Auto classes for loading models, and understanding the distinctions between them is important. The most important class for our chatbot purposes is AutoModelForCausalLM, which loads models specifically designed for causal language modeling, also known as autoregressive text generation.

Causal language modeling means the model predicts the next token in a sequence based only on previous tokens. The model cannot look ahead at future tokens when making predictions. This is exactly the behavior we need for a chatbot because we want the model to generate responses one token at a time, building up a coherent answer sequentially.
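
As a brief sketch of what "predicting the next token" looks like at the lowest level (GPT-2 is used purely for illustration), we can run a single forward pass and inspect the model's scores for the next position:

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
model.eval()

# Score every vocabulary token as a candidate continuation of the prompt
input_ids = tokenizer.encode("The capital of France is", return_tensors="pt")
with torch.no_grad():
    logits = model(input_ids).logits  # shape: (1, sequence_length, vocabulary_size)

# The scores at the last position rank the possible next tokens
next_token_id = logits[0, -1].argmax().item()
print(tokenizer.decode([next_token_id]))  # very likely " Paris"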

Other Auto classes exist for different tasks. AutoModelForSequenceClassification loads models for text classification tasks like sentiment analysis. AutoModelForQuestionAnswering loads models specifically fine-tuned for extractive question answering. AutoModelForMaskedLM loads models like BERT that predict masked tokens in the middle of sequences. For our chatbot, we exclusively use AutoModelForCausalLM.

The AutoModelForCausalLM class works similarly to AutoTokenizer. You provide a model identifier, and it downloads the model weights, loads the correct architecture, and returns a ready-to-use model object. Here is a basic example with detailed explanations:

from transformers import AutoModelForCausalLM
import torch

# Load a model with specific configuration options
model = AutoModelForCausalLM.from_pretrained(
    "gpt2",
    torch_dtype=torch.float16,  # Use half precision for efficiency
    low_cpu_mem_usage=True      # Optimize memory usage during loading
)

print(f"Model loaded: {model.__class__.__name__}")
print(f"Number of parameters: {model.num_parameters():,}")

The from_pretrained method is the standard way to load any pre-trained model in the Transformers library. It accepts numerous parameters that control how the model is loaded and configured. Let us examine the most important ones.

The first argument is the model identifier. This can be a model name from the HuggingFace Hub like "gpt2", or it can be a local directory path containing model files. When you provide a Hub name, the method automatically downloads the model files to a cache directory on your computer. Subsequent loads use the cached version, so you only download once.

The torch_dtype parameter controls the numerical precision of the model's parameters. By default, models use torch.float32, which means each number is stored as a 32-bit floating point value. This provides high precision but uses significant memory. Setting torch_dtype to torch.float16 uses 16-bit floating point values instead, which cuts the memory needed for the weights roughly in half with minimal impact on output quality for most models. This is called half precision. Note that half precision is mainly useful on GPUs; some CPU operations do not support float16, so when running on CPU it is safer to keep the default float32.

For very large models, you might even use torch.bfloat16, which is a special 16-bit format that maintains the same range as float32 but with less precision. This is particularly useful for models with billions of parameters.

The low_cpu_mem_usage parameter optimizes how the model is loaded into memory. When set to True, the model is loaded more efficiently, preventing memory spikes during initialization. This is especially important for large models that might otherwise cause out-of-memory errors during loading.

The model object returned is a PyTorch neural network module, specifically an instance of a class like GPT2LMHeadModel, LlamaForCausalLM, or another architecture-specific class. It contains all the learned weights, which are the numerical parameters that were optimized during training, and the forward pass logic, which defines how inputs are transformed into outputs through the neural network layers.

However, we rarely call the model's forward method directly. Instead, we use higher-level generation utilities provided by the Transformers library that handle the complexities of text generation, including sampling strategies, stopping criteria, and output formatting.

Device Management: Leveraging Available Hardware Acceleration

Modern deep learning frameworks can run computations on various types of hardware devices. CPUs are universal and always available, but they are relatively slow for neural network operations. GPUs, which are Graphics Processing Units originally designed for rendering graphics, are vastly more efficient for the parallel matrix operations that neural networks require. Different GPU manufacturers use different software stacks. NVIDIA GPUs use CUDA, AMD GPUs use ROCm, and Apple Silicon uses Metal Performance Shaders abbreviated as MPS.

Running models on GPUs is typically ten to one hundred times faster than CPUs, depending on the model size and GPU capability. Therefore, we want to use GPU acceleration whenever possible. However, we must write our code to work on any available hardware without requiring users to manually configure device settings.

PyTorch provides a device abstraction that lets us write hardware-agnostic code. A device is represented by a torch.device object that specifies where tensors should be stored and where operations should be executed. Here is a robust device detection function with comprehensive explanations:

import torch

def get_optimal_device():
    """
    Detects and returns the best available device for model inference.
    
    This function checks for available hardware in order of performance:
    1. CUDA (NVIDIA GPUs) - fastest for most models
    2. MPS (Apple Silicon) - fast on Mac computers with M1/M2/M3 chips
    3. CPU - universal fallback, slower but always available
    
    Returns:
        torch.device: The optimal device object for tensor operations
    """
    if torch.cuda.is_available():
        # NVIDIA GPU with CUDA support detected
        device = torch.device("cuda")
        # Get the name of the GPU for informational purposes
        gpu_name = torch.cuda.get_device_name(0)
        # Get total GPU memory in gigabytes
        gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1e9
        print(f"Using CUDA GPU: {gpu_name}")
        print(f"Available GPU memory: {gpu_memory:.2f} GB")
        return device
    elif torch.backends.mps.is_available():
        # Apple Silicon with Metal Performance Shaders detected
        device = torch.device("mps")
        print("Using Apple Metal Performance Shaders (MPS)")
        return device
    else:
        # No GPU acceleration available, falling back to CPU
        device = torch.device("cpu")
        print("Using CPU (no GPU acceleration available)")
        print("Warning: CPU inference will be significantly slower than GPU")
        return device

This function implements a priority-based device selection strategy. It first checks if CUDA is available using torch.cuda.is_available(), which returns True if an NVIDIA GPU is detected and CUDA drivers are properly installed. If CUDA is available, we create a device object with torch.device("cuda"), which represents the first NVIDIA GPU in the system. For systems with multiple GPUs, you could specify "cuda:0", "cuda:1", etc., but for our chatbot, using the default first GPU is sufficient.

When a CUDA device is selected, we also retrieve and display diagnostic information. The torch.cuda.get_device_name function returns the commercial name of the GPU, like "NVIDIA GeForce RTX 3080". The torch.cuda.get_device_properties function returns a structure containing various properties, including total_memory, which tells us how much VRAM the GPU has. This information helps users understand what hardware is being used and diagnose potential memory issues.

If CUDA is not available, the function checks for MPS using torch.backends.mps.is_available(). MPS is Apple's GPU acceleration framework for their M-series chips. If detected, we create an MPS device. MPS provides significant speedups on Apple Silicon compared to CPU, though typically not quite as fast as high-end NVIDIA GPUs.

If neither CUDA nor MPS is available, we fall back to CPU. The function prints a warning because CPU inference is substantially slower. For small models, CPU inference might be acceptable, but for large models with billions of parameters, it can be painfully slow.

Once we have determined the optimal device, we need to move both the model and input tensors to that device. Moving the model is a one-time operation performed after loading:

device = get_optimal_device()
model = model.to(device)

The to method is a PyTorch method available on all neural network modules. It moves all model parameters and buffers to the specified device. This operation can take a few seconds for large models because it involves copying gigabytes of data to GPU memory. After this operation, the model resides on the GPU and all subsequent operations will execute there.

Input tensors must also be moved to the same device before processing. We will handle this in our generation code by ensuring that tokenized inputs are moved to the device before being passed to the model. If the model is on a GPU but the input is on CPU, PyTorch will raise an error because operations cannot mix devices.

Understanding Chat Templates and Conversation Formatting

One of the most confusing and frustrating aspects of working with different LLMs is that they expect inputs formatted in specific, often completely different ways. This is not a technical limitation but rather a consequence of how models are trained. During fine-tuning for chat applications, models learn to associate specific formatting patterns with conversational structure.

However, many models, especially base models and older conversational models, do not have standardized chat templates. The GPT-2 model we use in our examples is one such model: it defines no chat template at all. Its conversational derivative DialoGPT, which was fine-tuned on dialogue data, likewise predates the modern chat template format and simply expects user and bot messages concatenated one after another, separated by the end-of-sequence token.

To handle this diversity, we need a flexible conversation formatting system that works with both models that have chat templates and those that do not. Here is a robust implementation:

def format_conversation(tokenizer, conversation_history, add_generation_prompt=True):
    """
    Formats conversation with or without chat template support.
    Falls back to simple concatenation if chat template not available.
    
    Args:
        tokenizer: The tokenizer object
        conversation_history: List of message dictionaries with 'role' and 'content'
        add_generation_prompt: Whether to add the assistant prefix
        
    Returns:
        str: Formatted conversation string
    """
    # Check if tokenizer has chat template support
    if hasattr(tokenizer, 'chat_template') and tokenizer.chat_template is not None:
        try:
            # Use official chat template
            formatted_prompt = tokenizer.apply_chat_template(
                conversation_history,
                tokenize=False,
                add_generation_prompt=add_generation_prompt
            )
            return formatted_prompt
        except Exception as e:
            print(f"Chat template failed: {e}, using fallback")
    
    # Fallback for models without chat templates: ignore the role labels and
    # simply concatenate the message contents in order. System, user, and
    # assistant messages are all treated as plain turns.
    formatted_parts = [message["content"] for message in conversation_history]
    
    # Join all parts, using the EOS token as a turn separator when available
    separator = tokenizer.eos_token if getattr(tokenizer, "eos_token", None) else "\n"
    result = separator.join(formatted_parts)
    
    # Add generation prompt if requested
    if add_generation_prompt:
        if hasattr(tokenizer, 'eos_token') and tokenizer.eos_token:
            result += tokenizer.eos_token
    
    return result

This function first attempts to use the tokenizer's built-in chat template if available. If the tokenizer does not have a chat template or if the template fails, it falls back to a simple concatenation strategy that works with most conversational models.

The conversation_history parameter is a list of dictionaries, where each dictionary represents one message in the conversation. The role field specifies who is speaking and can be "system", "user", or "assistant". The system role is used for instructions that guide the model's behavior. The user role represents messages from the human user. The assistant role represents messages from the AI assistant.

The content field contains the actual text of the message. This structure is standardized across the Transformers library, so you always use the same format regardless of which model you are working with.
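
For reference, a short conversation in this standardized format looks like the following (the content is just an example), and it is what we would pass to the format_conversation function defined above:

conversation_history = [
    {"role": "system", "content": "You are a helpful and friendly AI assistant."},
    {"role": "user", "content": "What is the capital of France?"},
    {"role": "assistant", "content": "The capital of France is Paris."},
    {"role": "user", "content": "Roughly how many people live there?"},
]

# Render the conversation for the model: uses the chat template if one
# exists, otherwise the concatenation fallback
prompt = format_conversation(tokenizer, conversation_history)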

The Generate Method: Fine-Grained Control Over Text Generation

The generate method is the core function for producing text with causal language models. It is a method on the model object itself and provides extensive control over the generation process through numerous parameters. Understanding these parameters is essential for getting quality outputs.

Here is a basic example of using the generate method:

# Prepare input by tokenizing
input_text = "Hello, how are you?"
input_ids = tokenizer.encode(input_text, return_tensors="pt")

# Move input to the same device as the model
input_ids = input_ids.to(device)

# Generate output tokens
with torch.no_grad():
    output_ids = model.generate(
        input_ids,
        max_new_tokens=50,
        do_sample=True,
        temperature=0.7,
        pad_token_id=tokenizer.eos_token_id
    )

# Decode output tokens to text
# Only decode the newly generated tokens, not the input
output_text = tokenizer.decode(
    output_ids[0][input_ids.shape[1]:], 
    skip_special_tokens=True
)
print(output_text)

Let us dissect each part of this code in detail. First, we tokenize the input text using the tokenizer's encode method. The return_tensors parameter specifies the format of the returned data. Setting it to "pt" returns PyTorch tensors instead of plain Python lists. This is necessary because the model expects tensor inputs.

The encode method returns a tensor of token IDs with shape (1, sequence_length), where 1 is the batch dimension. Even though we are processing a single input, PyTorch models expect batched inputs, so the tokenizer adds a batch dimension automatically when return_tensors is specified.

Next, we move the input tensor to the same device as the model using the to method. This is crucial because PyTorch requires all tensors involved in an operation to be on the same device. If the model is on a GPU but the input is on CPU, the operation will fail.

The generate method is where the actual text generation happens. It takes the input token IDs and produces output token IDs by repeatedly predicting the next token and appending it to the sequence. Let us examine each parameter:

The max_new_tokens parameter specifies the maximum number of new tokens to generate, not counting the input tokens. This is different from max_length, which specifies the total length including input. Using max_new_tokens is generally better because it gives you predictable output lengths regardless of input length.

The do_sample parameter controls whether to use sampling or greedy decoding. When set to False, the model always selects the highest probability token at each step. This is called greedy decoding and produces deterministic outputs. When set to True, the model samples from the probability distribution, introducing randomness and variety in outputs.

The temperature parameter controls the randomness of sampling. It is a positive number that scales the logits, which are the raw model outputs, before converting them to probabilities. A temperature of 1.0 uses the model's original probabilities. Lower temperatures like 0.7 make the distribution more peaked, favoring high-probability tokens and producing more focused, conservative outputs. Higher temperatures like 1.5 flatten the distribution, giving lower-probability tokens more chance and producing more creative, diverse outputs.

The pad_token_id parameter specifies which token ID to use for padding. Padding is necessary when processing batches of sequences with different lengths. All sequences in a batch must have the same length, so shorter sequences are padded. Some tokenizers have a dedicated padding token, but many do not. Setting pad_token_id to tokenizer.eos_token_id uses the end-of-sequence token for padding, which is a common practice.

The torch.no_grad() context manager is a performance optimization. It tells PyTorch not to compute gradients, which are only needed for training. This reduces memory usage and speeds up inference.

The generate method returns a tensor of token IDs with shape (batch_size, sequence_length). For our single-input example, this is (1, total_length) where total_length includes both the input and generated tokens. 

The critical improvement in our decoding step is that we only decode the newly generated tokens, not the entire sequence including the input. We do this by slicing the output tensor: output_ids[0][input_ids.shape[1]:]. This takes the first batch element (index 0) and slices from the end of the input (input_ids.shape[1]) to the end. This gives us only the new tokens.

The decode method converts token IDs back to text. The skip_special_tokens parameter, when set to True, removes special tokens like padding tokens, end-of-sequence tokens, and other model-specific markers from the output. This produces cleaner text for display to users.

Building the Basic Console Chatbot: Putting It All Together

Now that we understand all the fundamental components, we can build our first complete chatbot. This version will be a simple console application that loads a model, accepts user input, generates responses, and displays them. It will not yet have conversation memory, but it will demonstrate all the core concepts.

Let us examine the complete code with extensive inline comments:

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

def get_optimal_device():
    """
    Detects and returns the best available device for model inference.
    Checks for CUDA, MPS, and falls back to CPU if neither is available.
    """
    if torch.cuda.is_available():
        device = torch.device("cuda")
        print(f"Using CUDA GPU: {torch.cuda.get_device_name(0)}")
        print(f"Available GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
        return device
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
        print("Using Apple Metal Performance Shaders (MPS)")
        return device
    else:
        device = torch.device("cpu")
        print("Using CPU (no GPU acceleration available)")
        return device

def load_model_and_tokenizer(model_name):
    """
    Loads a pre-trained model and its associated tokenizer.
    
    Args:
        model_name: HuggingFace model identifier or local path
        
    Returns:
        tuple: (model, tokenizer, device)
    """
    print(f"Loading model: {model_name}")
    print("This may take a few minutes on first run...")
    
    # Detect the optimal device first so we can choose a suitable dtype
    device = get_optimal_device()
    
    # Load tokenizer
    # AutoTokenizer automatically selects the correct tokenizer class
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    
    # Fix missing padding token issue
    # Many models don't have a padding token defined
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    
    # Load model with optimized settings
    # Half precision (float16) halves memory usage but is only reliably
    # supported on GPU backends; fall back to float32 on CPU
    # low_cpu_mem_usage prevents memory spikes during loading
    dtype = torch.float16 if device.type in ("cuda", "mps") else torch.float32
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=dtype,
        low_cpu_mem_usage=True
    )
    
    # Move model to the selected device
    model = model.to(device)
    
    # Set model to evaluation mode
    # This disables dropout and other training-specific behaviors
    model.eval()
    
    print(f"Model loaded successfully!")
    print(f"Model has {model.num_parameters():,} parameters")
    
    return model, tokenizer, device

def generate_response(model, tokenizer, device, prompt, max_new_tokens=100):
    """
    Generates a response to a given prompt using the loaded model.
    
    Args:
        model: The loaded language model
        tokenizer: The associated tokenizer
        device: The device where the model resides
        prompt: Input text to respond to
        max_new_tokens: Maximum number of tokens to generate
        
    Returns:
        str: Generated response text
    """
    # Tokenize input and convert to tensor
    # return_tensors="pt" returns PyTorch tensors
    input_ids = tokenizer.encode(prompt, return_tensors="pt")
    
    # Move input to the same device as the model
    input_ids = input_ids.to(device)
    
    # Generate response tokens
    # torch.no_grad() disables gradient computation for efficiency
    with torch.no_grad():
        output_ids = model.generate(
            input_ids,
            max_new_tokens=max_new_tokens,
            do_sample=True,  # Use sampling for varied responses
            temperature=0.7,  # Control randomness (lower = more focused)
            top_p=0.9,  # Nucleus sampling parameter
            pad_token_id=tokenizer.eos_token_id  # Use EOS token for padding
        )
    
    # Decode only the newly generated tokens
    # This avoids including the input prompt in the response
    response = tokenizer.decode(
        output_ids[0][input_ids.shape[1]:], 
        skip_special_tokens=True
    )
    
    return response.strip()

def main():
    """
    Main function that runs the console chatbot.
    """
    print("=" * 60)
    print("CONSOLE CHATBOT - Version 1.0")
    print("=" * 60)
    
    # Specify the model to use
    # GPT-2 is a good starting model - small, fast, and widely compatible
    model_name = "gpt2"
    
    # Load model and tokenizer
    model, tokenizer, device = load_model_and_tokenizer(model_name)
    
    print("\nChatbot ready! Type 'quit' to exit.")
    print("-" * 60)
    
    # Main conversation loop
    while True:
        # Get user input
        user_input = input("\nYou: ").strip()
        
        # Check for exit command
        if user_input.lower() in ['quit', 'exit', 'bye']:
            print("Goodbye!")
            break
        
        # Skip empty inputs
        if not user_input:
            continue
        
        # Generate and display response
        print("Assistant: ", end="", flush=True)
        response = generate_response(
            model, 
            tokenizer, 
            device, 
            user_input,
            max_new_tokens=100
        )
        print(response)

if __name__ == "__main__":
    main()

This complete program demonstrates all the concepts we have discussed. The get_optimal_device function detects available hardware. The load_model_and_tokenizer function loads the model and tokenizer with optimal settings, including the critical fix for missing padding tokens. The generate_response function handles the complete generation workflow, properly extracting only the new tokens. The main function orchestrates everything and provides a simple console interface.

When you run this program, it first loads the model, which may take a minute or two depending on your internet connection and hardware. Once loaded, you can type messages and receive responses. The chatbot does not yet remember previous messages, so each interaction is independent.

The model.eval() call is important. PyTorch models have two modes: training mode and evaluation mode. Training mode enables certain behaviors like dropout, which randomly deactivates neurons during training to prevent overfitting. Evaluation mode disables these behaviors for consistent inference. Always call eval() before using a model for generation.

As noted earlier, the torch.no_grad() context manager disables gradient computation, which is only needed for training, reducing memory usage and speeding up inference.

The top_p parameter we added to generate is another sampling strategy called nucleus sampling. It keeps only the most probable tokens whose cumulative probability exceeds p. This provides a dynamic vocabulary size that adapts to the certainty of the model's predictions.

STEP TWO: ADDING CONVERSATION MEMORY

Understanding the Stateless Nature of Language Models

This is one of the most important concepts to understand about Large Language Models: they are completely stateless. Every time you call the generate method, the model has absolutely no memory of previous calls. It only sees the input tokens you provide in that specific call. This is fundamentally different from traditional applications where objects maintain state between method calls.

Think of it like calling a pure mathematical function. If you call f(x) twice with the same x, you get the same result both times. The function does not remember that you called it before. Language models work exactly the same way. If you send the same prompt twice, you get similar outputs both times (with some variation due to sampling randomness), but the model does not "remember" the first call.

This statelessness is actually a design feature, not a limitation. It makes models simpler, more predictable, and easier to scale. However, it means that to have a conversation with memory, we must manually manage the conversation history and include it in every prompt.

The solution is to maintain a list of all previous messages and include them in each generation call. When the user sends a new message, we append it to the history, generate a response using the complete history, append the response to the history, and repeat. This way, the model always sees the full context of the conversation.

Implementing Conversation History Management

To implement conversation memory, we need to modify our chatbot to maintain a conversation history and format it properly for the model. Since we are using GPT-2, which does not have a chat template, we will use a simple concatenation strategy with the EOS token as a separator.

Here is how we structure the conversation history:

# Initialize conversation history
# This list will grow as the conversation progresses
conversation_history = []

# Add a system message to guide the model's behavior
# For models without chat templates, we include this as context
system_message = "You are a helpful and friendly AI assistant."
conversation_history.append(system_message)

The conversation history starts with a system message that provides instructions. For models with chat templates, we would use the role-based format, but for GPT-2, we simply include it as part of the context.

When the user sends a message, we add it to the history:

# User sends a message
user_message = "What is the capital of France?"

# Add user message to history
conversation_history.append(user_message)

Now we need to format the entire conversation history and generate a response:

# Format conversation by joining with EOS token
formatted_prompt = tokenizer.eos_token.join(conversation_history) + tokenizer.eos_token

# Tokenize the formatted prompt
input_ids = tokenizer.encode(formatted_prompt, return_tensors="pt")
input_ids = input_ids.to(device)

# Generate response
with torch.no_grad():
    output_ids = model.generate(
        input_ids,
        max_new_tokens=100,
        do_sample=True,
        temperature=0.7,
        top_p=0.9,
        pad_token_id=tokenizer.eos_token_id
    )

# Decode only the new tokens
assistant_response = tokenizer.decode(
    output_ids[0][input_ids.shape[1]:], 
    skip_special_tokens=True
)

# Add assistant response to history
conversation_history.append(assistant_response.strip())

The critical insight here is that we format the entire conversation history, not just the latest message. We join all messages with the EOS token, which helps the model distinguish between different turns in the conversation. This ensures the model sees the full context.

After generation, we extract only the new response by slicing the output tensor to exclude the input tokens. We then add the assistant's response to the conversation history so it will be included in the next turn.

Managing Context Window Limitations

There is one major complication with this approach: models have maximum context lengths. A context length, also called a context window, is the maximum number of tokens a model can process in a single call. For example, GPT-2 has a context window of 1024 tokens. If your conversation history grows beyond this limit, the model cannot process it.
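
You can read this limit from the loaded model's configuration. The attribute name varies by architecture, so the snippet below checks the two most common names (GPT-2 exposes n_positions, while many newer models use max_position_embeddings):

# Inspect the model's maximum context length
config = model.config
context_limit = getattr(config, "n_positions", None) or getattr(config, "max_position_embeddings", None)
print(f"Maximum context length: {context_limit} tokens")  # 1024 for GPT-2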

We need to implement context window management to prevent this issue. There are several strategies. The simplest is to truncate old messages when the history gets too long. A more sophisticated approach is to summarize old messages. For our chatbot, we will implement a sliding window that keeps only the most recent messages.

Here is a function that manages the context window:

def manage_context_window(conversation_history, tokenizer, max_tokens=800):
    """
    Ensures conversation history fits within the model's context window.
    Removes oldest messages if necessary, always keeping the system message.
    
    Args:
        conversation_history: List of conversation messages
        tokenizer: The tokenizer to measure token counts
        max_tokens: Maximum tokens to keep (should be less than model's limit)
        
    Returns:
        list: Potentially truncated conversation history
    """
    # Always keep the system message if it exists
    system_message = None
    messages = conversation_history.copy()
    
    if messages and len(messages) > 0:
        # Assume first message is system message
        system_message = messages[0]
        messages = messages[1:]
    
    # Calculate total tokens in current history
    formatted = tokenizer.eos_token.join(messages) if messages else ""
    token_count = len(tokenizer.encode(formatted))
    
    # Remove oldest messages until we fit in the limit
    while token_count > max_tokens and len(messages) > 1:
        # Remove the oldest message (index 0)
        messages.pop(0)
        
        # Recalculate token count
        formatted = tokenizer.eos_token.join(messages)
        token_count = len(tokenizer.encode(formatted))
    
    # Reconstruct history with system message
    result = []
    if system_message:
        result.append(system_message)
    result.extend(messages)
    
    return result

This function preserves the system message while removing old user and assistant messages from the beginning of the conversation. It calculates the total token count by actually encoding the formatted history, which gives an accurate measurement. The max_tokens parameter should be set lower than the model's actual limit to leave room for the response.

We call this function before each generation:

# Manage context window before generation
conversation_history = manage_context_window(
    conversation_history,
    tokenizer,
    max_tokens=800
)

This ensures we never exceed the model's context limit, even in very long conversations.

Complete Chatbot with Conversation Memory

Let us now examine the complete second version of our chatbot with conversation memory fully implemented:

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

def get_optimal_device():
    """Detects and returns the best available device."""
    if torch.cuda.is_available():
        device = torch.device("cuda")
        print(f"Using CUDA GPU: {torch.cuda.get_device_name(0)}")
        return device
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
        print("Using Apple MPS")
        return device
    else:
        device = torch.device("cpu")
        print("Using CPU")
        return device

def load_model_and_tokenizer(model_name):
    """Loads model and tokenizer with optimal settings."""
    print(f"Loading model: {model_name}")
    
    # Detect the device first so we can choose an appropriate dtype
    device = get_optimal_device()
    
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    
    # Fix missing padding token
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    
    # Half precision on GPU backends, full precision on CPU for compatibility
    dtype = torch.float16 if device.type in ("cuda", "mps") else torch.float32
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=dtype,
        low_cpu_mem_usage=True
    )
    
    model = model.to(device)
    model.eval()
    
    print(f"Model loaded with {model.num_parameters():,} parameters")
    return model, tokenizer, device

def manage_context_window(conversation_history, tokenizer, max_tokens=800):
    """
    Ensures conversation fits within context window by removing old messages.
    Always preserves the system message if present.
    """
    system_message = None
    messages = conversation_history.copy()
    
    # Extract and preserve system message
    if messages and len(messages) > 0:
        system_message = messages[0]
        messages = messages[1:]
    
    # Calculate current token count
    if messages:
        formatted = tokenizer.eos_token.join(messages)
        token_count = len(tokenizer.encode(formatted))
        
        # Remove oldest messages until we fit
        while token_count > max_tokens and len(messages) > 1:
            messages.pop(0)
            formatted = tokenizer.eos_token.join(messages)
            token_count = len(tokenizer.encode(formatted))
    
    # Restore system message
    result = []
    if system_message:
        result.append(system_message)
    result.extend(messages)
    
    return result

def generate_response(model, tokenizer, device, conversation_history, max_new_tokens=150):
    """
    Generates a response based on complete conversation history.
    
    Args:
        model: The language model
        tokenizer: Associated tokenizer
        device: Device where model resides
        conversation_history: List of conversation messages
        max_new_tokens: Maximum tokens to generate
        
    Returns:
        str: Generated assistant response
    """
    # Format conversation by joining with EOS token
    formatted_prompt = tokenizer.eos_token.join(conversation_history) + tokenizer.eos_token
    
    # Tokenize and move to device
    input_ids = tokenizer.encode(formatted_prompt, return_tensors="pt")
    input_ids = input_ids.to(device)
    
    # Generate response
    with torch.no_grad():
        output_ids = model.generate(
            input_ids,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            temperature=0.7,
            top_p=0.9,
            pad_token_id=tokenizer.eos_token_id
        )
    
    # Decode only the new tokens
    assistant_response = tokenizer.decode(
        output_ids[0][input_ids.shape[1]:], 
        skip_special_tokens=True
    )
    
    return assistant_response.strip()

def main():
    """Main chatbot function with conversation memory."""
    print("=" * 60)
    print("CONSOLE CHATBOT - Version 2.0 (With Memory)")
    print("=" * 60)
    
    model_name = "gpt2"
    model, tokenizer, device = load_model_and_tokenizer(model_name)
    
    # Initialize conversation history with system message
    conversation_history = [
        "You are a helpful and friendly AI assistant."
    ]
    
    print("\nChatbot ready! Type 'quit' to exit, 'reset' to clear history.")
    print("-" * 60)
    
    while True:
        user_input = input("\nYou: ").strip()
        
        if user_input.lower() in ['quit', 'exit']:
            print("Goodbye!")
            break
        
        if user_input.lower() == 'reset':
            conversation_history = [conversation_history[0]]  # Keep system message
            print("Conversation history cleared.")
            continue
        
        if not user_input:
            continue
        
        # Add user message to history
        conversation_history.append(user_input)
        
        # Manage context window
        conversation_history = manage_context_window(
            conversation_history,
            tokenizer,
            max_tokens=800
        )
        
        # Generate response
        print("Assistant: ", end="", flush=True)
        response = generate_response(
            model,
            tokenizer,
            device,
            conversation_history,
            max_new_tokens=150
        )
        print(response)
        
        # Add assistant response to history
        conversation_history.append(response)

if __name__ == "__main__":
    main()

This version maintains full conversation context, allowing the model to reference previous messages and maintain coherent multi-turn conversations. The reset command lets users start fresh without restarting the program. The context window management prevents errors from exceeding token limits.

STEP THREE: CONFIGURABLE GENERATION PARAMETERS

Understanding Generation Parameters and Their Effects

The quality, style, and characteristics of generated text are controlled by numerous parameters passed to the generate method. Understanding these parameters is crucial for getting the outputs you want. Each parameter affects the generation process in different ways, and finding the right combination often requires experimentation.

Let us examine each major generation parameter in detail, explaining what it does, why it matters, and how to use it effectively.

Temperature: Controlling Randomness and Creativity

Temperature is perhaps the most important generation parameter. It controls the randomness of the model's predictions by scaling the logits before converting them to probabilities. Logits are the raw numerical outputs from the model's final layer before any normalization.

Mathematically, temperature works like this. The model outputs a vector of logits, one for each token in the vocabulary. These logits are converted to probabilities using the softmax function. Temperature divides the logits before applying softmax. Lower temperatures make the distribution more peaked, higher temperatures make it flatter.
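
The following stand-alone sketch, using made-up logit values for a toy four-token vocabulary, shows this numerically: dividing the logits by the temperature before the softmax sharpens or flattens the resulting probabilities.

import torch

# Made-up logits for a toy four-token vocabulary
logits = torch.tensor([4.0, 2.0, 1.0, 0.5])

for temperature in (0.5, 1.0, 1.5):
    probs = torch.softmax(logits / temperature, dim=-1)
    print(temperature, [round(p, 3) for p in probs.tolist()])
# At 0.5 nearly all of the probability sits on the top token;
# at 1.5 the distribution is noticeably flatter.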

A temperature of 1.0 uses the model's original probability distribution without modification. This is the baseline.

A temperature below 1.0, such as 0.7 or 0.5, makes the model more conservative and focused. High-probability tokens become even more likely, while low-probability tokens become less likely. This produces more predictable, coherent, and factual outputs. Use lower temperatures when you want consistent, reliable responses.

A temperature above 1.0, such as 1.2 or 1.5, makes the model more creative and diverse. It gives lower-probability tokens more chance of being selected. This produces more varied, creative, and sometimes surprising outputs. However, very high temperatures can produce incoherent or nonsensical text. Use higher temperatures for creative writing or when you want diverse outputs.

As the temperature approaches 0.0, sampling approaches greedy decoding, where the model always selects the highest probability token. In practice, to get completely deterministic outputs with the Transformers library, set do_sample=False rather than passing a temperature of zero, since the library requires a strictly positive temperature.

Here is how to use temperature in code:

output_ids = model.generate(
    input_ids,
    max_new_tokens=100,
    do_sample=True,  # Must be True to use temperature
    temperature=0.7  # Lower = more focused, higher = more creative
)

Note that temperature only has an effect when do_sample is True. With greedy decoding, temperature is ignored.

Top-k Sampling: Limiting the Vocabulary

Top-k sampling is a technique that restricts the model to considering only the k most probable tokens at each generation step. All other tokens are given zero probability. This prevents the model from selecting very unlikely tokens that might produce nonsensical outputs.

For example, with top_k=50, the model considers only the 50 most probable tokens at each step. Even if temperature makes the distribution flatter, tokens outside the top 50 are completely excluded.

Top-k sampling helps prevent the model from occasionally selecting bizarre, out-of-context tokens. However, it has a limitation: it uses a fixed k regardless of how certain the model is. Sometimes the model is very certain and only a few tokens make sense. Other times the model is uncertain and many tokens are plausible. A fixed k does not adapt to this.

Here is how to use top-k:

output_ids = model.generate(
    input_ids,
    max_new_tokens=100,
    do_sample=True,
    temperature=0.7,
    top_k=50  # Consider only top 50 tokens
)

Typical values for top_k range from 20 to 100. Lower values are more restrictive, higher values are more permissive.

Top-p Sampling (Nucleus Sampling): Dynamic Vocabulary

Top-p sampling, also called nucleus sampling, is a more sophisticated alternative to top-k. Instead of using a fixed number of tokens, it dynamically selects the smallest set of tokens whose cumulative probability exceeds p.

For example, with top_p=0.9, the model considers the most probable tokens until their cumulative probability reaches 90 percent. The number of tokens included varies depending on the model's certainty. When the model is very certain, only a few tokens might be needed to reach 90 percent. When the model is uncertain, many tokens might be included.

This adaptive behavior makes top-p generally superior to top-k. It prevents unlikely tokens while adapting to the model's confidence. Top-p values typically range from 0.8 to 0.95. Lower values are more restrictive, higher values are more permissive.
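
Here is a small, self-contained sketch of the selection step using made-up probabilities. The real implementation inside the library operates on logits and handles batching, but the idea is the same:

import torch

# Made-up next-token probabilities, already normalized
probs = torch.tensor([0.45, 0.25, 0.15, 0.08, 0.04, 0.03])
p = 0.9

# Sort by probability and keep the smallest prefix whose cumulative
# probability reaches p; sampling then happens only within this "nucleus"
sorted_probs, sorted_indices = probs.sort(descending=True)
cumulative = sorted_probs.cumsum(dim=0)
cutoff = int((cumulative >= p).nonzero()[0].item()) + 1
nucleus = sorted_indices[:cutoff]
print(nucleus.tolist())  # with these numbers, the four most probable tokens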

You can use both top-k and top-p together. The model first applies top-k to limit the vocabulary, then applies top-p within that subset:

output_ids = model.generate(
    input_ids,
    max_new_tokens=100,
    do_sample=True,
    temperature=0.7,
    top_k=50,
    top_p=0.9  # Nucleus sampling
)

However, using top-p alone is often sufficient and simpler.

Repetition Penalty: Preventing Redundancy

Language models sometimes get stuck in repetitive loops, generating the same phrases or words over and over. The repetition penalty parameter discourages this by reducing the probability of tokens that have already been generated.

The repetition penalty is applied to the logits of tokens that have already been generated. A value of 1.0 means no penalty, and values above 1.0 penalize repetition. For example, with repetition_penalty=1.2, a previously generated token has its positive logit divided by 1.2 (or its negative logit multiplied by 1.2), making it less likely to be selected again.

Higher values produce more diverse outputs but can sometimes make the text feel unnatural if the model is forced to avoid common words. Typical values range from 1.0 to 1.5:

output_ids = model.generate(
    input_ids,
    max_new_tokens=100,
    do_sample=True,
    temperature=0.7,
    top_p=0.9,
    repetition_penalty=1.2  # Discourage repetition
)

Use repetition penalty when you notice the model repeating itself excessively.

Length Penalties and Constraints

Several parameters control the length of generated outputs. We have already seen max_new_tokens, which sets a hard limit on generation length. Other parameters provide more nuanced control.

The min_new_tokens parameter sets a minimum length, forcing the model to generate at least that many tokens even if it wants to stop earlier:

output_ids = model.generate(
    input_ids,
    min_new_tokens=20,  # Generate at least 20 tokens
    max_new_tokens=100
)

The length_penalty parameter affects beam search, which is an alternative to sampling that we will discuss shortly. It encourages or discourages longer outputs. Values above 1.0 encourage longer outputs, values below 1.0 encourage shorter outputs.
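
As a brief, hedged illustration of how length_penalty might be combined with beam search (discussed next), consider the following sketch; the specific values are only examples:

output_ids = model.generate(
    input_ids,
    max_new_tokens=100,
    num_beams=4,
    length_penalty=1.5,  # Above 1.0 favors longer sequences in beam scoring
    early_stopping=True
)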

Beam Search: An Alternative to Sampling

So far, we have focused on sampling-based generation where the model randomly selects tokens according to their probabilities. Beam search is a deterministic alternative that maintains multiple candidate sequences simultaneously and selects the one with the highest overall probability.

With beam search, the num_beams parameter specifies how many candidates to maintain. For example, with num_beams=5, the algorithm keeps track of the 5 most promising sequences at each step. At the end, it returns the sequence with the highest total probability.

Beam search often produces more coherent outputs than sampling on tasks with a well-defined target, such as translation or summarization, because it tracks multiple candidates and keeps the sequence with the best overall score. However, it is slower, and for open-ended conversation it can produce less diverse, sometimes repetitive, text:

output_ids = model.generate(
    input_ids,
    max_new_tokens=100,
    num_beams=5,  # Use beam search with 5 beams
    early_stopping=True  # Stop when all beams reach EOS
)

When using plain beam search, do_sample should be False (the default). The Transformers library does also support a beam-sample mode that combines beam search with sampling (num_beams greater than 1 together with do_sample=True), but in this tutorial we keep the two strategies separate for clarity.

The early_stopping parameter controls whether to stop when all beams have generated an end-of-sequence token. Setting it to True can speed up generation.

Implementing User-Configurable Parameters

For our third chatbot version, we will allow users to configure all these parameters interactively. We will create a configuration system that stores parameter values and a menu interface for changing them.

Here is a configuration class that manages generation parameters:

class GenerationConfig:
    """
    Manages generation parameters for the chatbot.
    Provides defaults and validation for all parameters.
    """
    
    def __init__(self):
        """Initialize with sensible default values."""
        self.max_new_tokens = 150
        self.temperature = 0.7
        self.top_p = 0.9
        self.top_k = 50
        self.repetition_penalty = 1.1
        self.do_sample = True
        self.num_beams = 1  # 1 means no beam search
    
    def to_dict(self):
        """
        Converts configuration to a dictionary suitable for model.generate().
        
        Returns:
            dict: Parameter dictionary
        """
        config = {
            "max_new_tokens": self.max_new_tokens,
            "do_sample": self.do_sample,
            "pad_token_id": None  # Will be set at generation time
        }
        
        if self.do_sample:
            # Sampling parameters only apply when do_sample is True
            config["temperature"] = self.temperature
            config["top_p"] = self.top_p
            config["top_k"] = self.top_k
            config["repetition_penalty"] = self.repetition_penalty
        else:
            # Beam search parameters
            if self.num_beams > 1:
                config["num_beams"] = self.num_beams
                config["early_stopping"] = True
        
        return config
    
    def display(self):
        """Prints current configuration in a readable format."""
        print("\nCurrent Generation Configuration:")
        print(f"  Max New Tokens: {self.max_new_tokens}")
        print(f"  Sampling Mode: {'Enabled' if self.do_sample else 'Disabled (Greedy/Beam)'}")
        
        if self.do_sample:
            print(f"  Temperature: {self.temperature}")
            print(f"  Top-p: {self.top_p}")
            print(f"  Top-k: {self.top_k}")
            print(f"  Repetition Penalty: {self.repetition_penalty}")
        else:
            print(f"  Num Beams: {self.num_beams}")
    
    def configure_interactive(self):
        """
        Interactive menu for changing configuration parameters.
        Provides explanations and validation for each parameter.
        """
        while True:
            print("\n" + "=" * 60)
            print("GENERATION PARAMETER CONFIGURATION")
            print("=" * 60)
            self.display()
            print("\nOptions:")
            print("  1. Max New Tokens")
            print("  2. Temperature")
            print("  3. Top-p")
            print("  4. Top-k")
            print("  5. Repetition Penalty")
            print("  6. Toggle Sampling Mode")
            print("  7. Num Beams")
            print("  0. Done")
            
            choice = input("\nSelect option: ").strip()
            
            if choice == "0":
                break
            elif choice == "1":
                try:
                    value = int(input("Enter max new tokens (1-2000): "))
                    if 1 <= value <= 2000:
                        self.max_new_tokens = value
                    else:
                        print("Value out of range")
                except ValueError:
                    print("Invalid number")
            elif choice == "2":
                try:
                    value = float(input("Enter temperature (0.1-2.0): "))
                    if 0.0 < value <= 2.0:
                        self.temperature = value
                    else:
                        print("Value out of range")
                except ValueError:
                    print("Invalid number")
            elif choice == "3":
                try:
                    value = float(input("Enter top-p (0.1-1.0): "))
                    if 0.0 < value <= 1.0:
                        self.top_p = value
                    else:
                        print("Value out of range")
                except ValueError:
                    print("Invalid number")
            elif choice == "4":
                try:
                    value = int(input("Enter top-k (1-200): "))
                    if value >= 1:
                        self.top_k = value
                    else:
                        print("Value must be at least 1")
                except ValueError:
                    print("Invalid number")
            elif choice == "5":
                try:
                    value = float(input("Enter repetition penalty (1.0-2.0): "))
                    if value >= 1.0:
                        self.repetition_penalty = value
                    else:
                        print("Value must be at least 1.0")
                except ValueError:
                    print("Invalid number")
            elif choice == "6":
                self.do_sample = not self.do_sample
                print(f"Sampling mode: {'Enabled' if self.do_sample else 'Disabled'}")
            elif choice == "7":
                try:
                    value = int(input("Enter num beams (1-10): "))
                    if value >= 1:
                        self.num_beams = value
                        if value > 1:
                            self.do_sample = False
                            print("Sampling disabled (beam search active)")
                    else:
                        print("Value must be at least 1")
                except ValueError:
                    print("Invalid number")
            else:
                print("Invalid option")

This configuration class encapsulates all generation parameters and provides an interactive menu for changing them. The to_dict method converts the configuration to a dictionary that can be unpacked into the generate method call.

Now we modify our generate_response function to use this configuration:

def generate_response(model, tokenizer, device, conversation_history, gen_config):
    """
    Generates response using the provided configuration.
    
    Args:
        model: The language model
        tokenizer: Associated tokenizer
        device: Device where model resides
        conversation_history: List of conversation messages
        gen_config: GenerationConfig object
        
    Returns:
        str: Generated assistant response
    """
    # Format conversation
    formatted_prompt = tokenizer.eos_token.join(conversation_history) + tokenizer.eos_token
    
    input_ids = tokenizer.encode(formatted_prompt, return_tensors="pt")
    input_ids = input_ids.to(device)
    
    # Get configuration as dictionary
    config_dict = gen_config.to_dict()
    config_dict["pad_token_id"] = tokenizer.eos_token_id
    
    # Generate with configuration
    with torch.no_grad():
        output_ids = model.generate(
            input_ids,
            **config_dict  # Unpack configuration dictionary
        )
    
    # Decode only new tokens
    response = tokenizer.decode(
        output_ids[0][input_ids.shape[1]:], 
        skip_special_tokens=True
    )
    
    return response.strip()

The key change is that we now pass a GenerationConfig object and unpack its dictionary into the generate call using the double asterisk operator. This makes the function flexible and allows easy parameter experimentation.
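
For instance, a minimal usage sketch, assuming the model, tokenizer, device, and conversation_history from the earlier sections are already in place:

gen_config = GenerationConfig()
gen_config.temperature = 0.9      # Slightly more creative than the 0.7 default
gen_config.max_new_tokens = 200

reply = generate_response(model, tokenizer, device, conversation_history, gen_config)
print(reply)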

Complete Chatbot with Configurable Parameters

Here is the complete third version with full parameter configuration:

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

class GenerationConfig:
    """Manages generation parameters with defaults and validation."""
    
    def __init__(self):
        self.max_new_tokens = 150
        self.temperature = 0.7
        self.top_p = 0.9
        self.top_k = 50
        self.repetition_penalty = 1.1
        self.do_sample = True
        self.num_beams = 1
    
    def to_dict(self):
        config = {
            "max_new_tokens": self.max_new_tokens,
            "do_sample": self.do_sample,
            "pad_token_id": None
        }
        
        if self.do_sample:
            config["temperature"] = self.temperature
            config["top_p"] = self.top_p
            config["top_k"] = self.top_k
            config["repetition_penalty"] = self.repetition_penalty
        else:
            if self.num_beams > 1:
                config["num_beams"] = self.num_beams
                config["early_stopping"] = True
        
        return config
    
    def display(self):
        print("\nCurrent Configuration:")
        print(f"  Max New Tokens: {self.max_new_tokens}")
        print(f"  Mode: {'Sampling' if self.do_sample else 'Greedy/Beam'}")
        if self.do_sample:
            print(f"  Temperature: {self.temperature}")
            print(f"  Top-p: {self.top_p}")
            print(f"  Top-k: {self.top_k}")
            print(f"  Repetition Penalty: {self.repetition_penalty}")
        else:
            print(f"  Num Beams: {self.num_beams}")
    
    def configure_interactive(self):
        while True:
            print("\n" + "=" * 60)
            print("PARAMETER CONFIGURATION")
            print("=" * 60)
            self.display()
            print("\n1. Max Tokens  2. Temperature  3. Top-p  4. Top-k")
            print("5. Repetition Penalty  6. Toggle Sampling  7. Num Beams")
            print("0. Done")
            
            choice = input("\nSelect: ").strip()
            
            if choice == "0":
                break
            elif choice == "1":
                try:
                    self.max_new_tokens = int(input("Max tokens: "))
                except ValueError:
                    print("Invalid number")
            elif choice == "2":
                try:
                    self.temperature = float(input("Temperature: "))
                except ValueError:
                    print("Invalid number")
            elif choice == "3":
                try:
                    self.top_p = float(input("Top-p: "))
                except ValueError:
                    print("Invalid number")
            elif choice == "4":
                try:
                    self.top_k = int(input("Top-k: "))
                except ValueError:
                    print("Invalid number")
            elif choice == "5":
                try:
                    self.repetition_penalty = float(input("Repetition penalty: "))
                except ValueError:
                    print("Invalid number")
            elif choice == "6":
                self.do_sample = not self.do_sample
            elif choice == "7":
                try:
                    self.num_beams = int(input("Num beams: "))
                    if self.num_beams > 1:
                        self.do_sample = False
                except ValueError:
                    print("Invalid number")

def get_optimal_device():
    if torch.cuda.is_available():
        device = torch.device("cuda")
        print(f"Using CUDA: {torch.cuda.get_device_name(0)}")
        return device
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
        print("Using MPS")
        return device
    else:
        device = torch.device("cpu")
        print("Using CPU")
        return device

def load_model_and_tokenizer(model_name):
    print(f"Loading {model_name}...")
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.float16,
        low_cpu_mem_usage=True
    )
    device = get_optimal_device()
    model = model.to(device)
    model.eval()
    print(f"Loaded {model.num_parameters():,} parameters")
    return model, tokenizer, device

def manage_context_window(conversation_history, tokenizer, max_tokens=800):
    system_message = None
    messages = conversation_history.copy()
    
    if messages:
        system_message = messages[0]
        messages = messages[1:]
    
    if messages:
        formatted = tokenizer.eos_token.join(messages)
        token_count = len(tokenizer.encode(formatted))
        
        while token_count > max_tokens and len(messages) > 1:
            messages.pop(0)
            formatted = tokenizer.eos_token.join(messages)
            token_count = len(tokenizer.encode(formatted))
    
    result = []
    if system_message:
        result.append(system_message)
    result.extend(messages)
    
    return result

def generate_response(model, tokenizer, device, conversation_history, gen_config):
    formatted_prompt = tokenizer.eos_token.join(conversation_history) + tokenizer.eos_token
    
    input_ids = tokenizer.encode(formatted_prompt, return_tensors="pt")
    input_ids = input_ids.to(device)
    
    config_dict = gen_config.to_dict()
    config_dict["pad_token_id"] = tokenizer.eos_token_id
    
    with torch.no_grad():
        output_ids = model.generate(input_ids, **config_dict)
    
    response = tokenizer.decode(
        output_ids[0][input_ids.shape[1]:], 
        skip_special_tokens=True
    )
    
    return response.strip()

def main():
    print("=" * 60)
    print("CONSOLE CHATBOT - Version 3.0 (Configurable Parameters)")
    print("=" * 60)
    
    model_name = "gpt2"
    model, tokenizer, device = load_model_and_tokenizer(model_name)
    
    gen_config = GenerationConfig()
    
    conversation_history = [
        "You are a helpful AI assistant."
    ]
    
    print("\nCommands: 'quit', 'reset', 'config'")
    print("-" * 60)
    
    while True:
        user_input = input("\nYou: ").strip()
        
        if user_input.lower() in ['quit', 'exit']:
            print("Goodbye!")
            break
        
        if user_input.lower() == 'reset':
            conversation_history = [conversation_history[0]]
            print("History cleared")
            continue
        
        if user_input.lower() == 'config':
            gen_config.configure_interactive()
            continue
        
        if not user_input:
            continue
        
        conversation_history.append(user_input)
        
        conversation_history = manage_context_window(
            conversation_history,
            tokenizer
        )
        
        print("Assistant: ", end="", flush=True)
        response = generate_response(
            model,
            tokenizer,
            device,
            conversation_history,
            gen_config
        )
        print(response)
        
        conversation_history.append(response)

if __name__ == "__main__":
    main()

This version allows users to experiment with all generation parameters interactively. The config command opens the configuration menu where parameters can be adjusted. This is invaluable for understanding how different parameters affect output quality and style.

STEP FOUR: ADDING RETRIEVAL-AUGMENTED GENERATION

Understanding the Rationale for RAG

Retrieval-Augmented Generation, commonly abbreviated as RAG, is a technique that combines language models with external knowledge retrieval. The fundamental problem RAG solves is that language models only know what was in their training data, which has several limitations.

First, training data has a cutoff date. A model trained in 2023 knows nothing about events from 2024. Second, models cannot access private or proprietary information that was not in their training data. Third, models sometimes hallucinate, generating plausible-sounding but factually incorrect information.

RAG addresses these limitations by giving the model access to external documents. When a user asks a question, the system first retrieves relevant passages from a document collection, then provides those passages to the model as context. The model can then generate responses grounded in the retrieved information rather than relying solely on its training data.

The RAG workflow consists of several stages. First, documents are loaded and split into chunks. Second, chunks are converted into numerical embeddings that capture their semantic meaning. Third, embeddings are stored in a vector database that enables efficient similarity search. Fourth, when a user asks a question, it is converted to an embedding and used to retrieve the most relevant chunks. Fifth, retrieved chunks are provided to the language model as context. Sixth, the model generates a response based on the retrieved information.

Let us examine each stage in detail.

Document Loading: Ingesting External Knowledge

The first step in RAG is loading documents into a format we can process. Documents can come from many sources: PDF files, Word documents, web pages, databases, APIs, and more. For our chatbot, we will focus on plain text files and PDFs as they are most common.

We need functionality that can handle various document formats. Here are simple but effective loaders:

def load_text_file(file_path):
    """
    Loads a plain text file and returns its contents.
    
    Args:
        file_path: Path to the text file
        
    Returns:
        str: File contents
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read()

def load_pdf_file(file_path):
    """
    Loads a PDF file and extracts its text content.
    Requires PyPDF2 library: pip install PyPDF2
    
    Args:
        file_path: Path to the PDF file
        
    Returns:
        str: Extracted text from all pages
    """
    try:
        import PyPDF2
    except ImportError:
        raise ImportError("PyPDF2 required for PDF loading. Install with: pip install PyPDF2")
    
    text = ""
    with open(file_path, 'rb') as f:
        pdf_reader = PyPDF2.PdfReader(f)
        for page in pdf_reader.pages:
            text += page.extract_text() + "\n"
    
    return text

def load_documents(file_paths):
    """
    Loads multiple documents from a list of file paths.
    Automatically detects file type based on extension.
    
    Args:
        file_paths: List of file paths to load
        
    Returns:
        list: List of dictionaries with 'content' and 'source' keys
    """
    documents = []
    
    for file_path in file_paths:
        print(f"Loading {file_path}...")
        
        try:
            if file_path.endswith('.txt'):
                content = load_text_file(file_path)
            elif file_path.endswith('.pdf'):
                content = load_pdf_file(file_path)
            else:
                print(f"Unsupported file type: {file_path}")
                continue
            
            documents.append({
                'content': content,
                'source': file_path
            })
        except Exception as e:
            print(f"Error loading {file_path}: {e}")
    
    print(f"Loaded {len(documents)} documents")
    return documents

These functions load documents and return them as dictionaries containing the text content and source file path. The source path is important for attribution, allowing us to tell users where information came from.
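
A small usage sketch follows; the file names are hypothetical placeholders:

docs = load_documents(["notes.txt", "user_manual.pdf"])
for doc in docs:
    print(f"{doc['source']}: {len(doc['content'])} characters")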

Text Chunking: Breaking Documents into Manageable Pieces

Raw documents are often too long to process efficiently. A single document might contain thousands of words, but we only need to retrieve the specific sections relevant to a query. Additionally, embedding models and language models have maximum input lengths. We must split documents into smaller chunks.

Chunking strategies vary in sophistication. The simplest approach is fixed-size chunking, where we split text every N characters or tokens. A better approach is semantic chunking, where we split at natural boundaries like paragraphs or sentences. The best approach depends on your documents and use case.

For our chatbot, we will implement a sentence-aware chunking strategy that splits text into chunks of approximately equal size while respecting sentence boundaries:

def split_into_sentences(text):
    """
    Splits text into sentences using simple heuristics.
    This is a basic implementation; more sophisticated approaches
    use NLP libraries like spaCy or NLTK.
    
    Args:
        text: Input text
        
    Returns:
        list: List of sentences
    """
    # Simple sentence splitting based on punctuation
    # This handles most cases but is not perfect
    import re
    
    # Split on sentence-ending punctuation followed by whitespace
    sentences = re.split(r'(?<=[.!?])\s+', text)
    
    # Filter out empty sentences
    sentences = [s.strip() for s in sentences if s.strip()]
    
    return sentences

def chunk_text(text, chunk_size=500, overlap=50):
    """
    Splits text into overlapping chunks while respecting sentence boundaries.
    
    Overlapping chunks ensure that information spanning chunk boundaries
    is not lost. Each chunk includes some context from the previous chunk.
    
    Args:
        text: Input text to chunk
        chunk_size: Target size of each chunk in characters
        overlap: Number of characters to overlap between chunks
        
    Returns:
        list: List of text chunks
    """
    sentences = split_into_sentences(text)
    chunks = []
    current_chunk = []
    current_length = 0
    
    for sentence in sentences:
        sentence_length = len(sentence)
        
        # If adding this sentence exceeds chunk size, save current chunk
        if current_length + sentence_length > chunk_size and current_chunk:
            # Use a name that does not shadow the chunk_text function itself
            completed_chunk = ' '.join(current_chunk)
            chunks.append(completed_chunk)
            
            # Start new chunk with overlap from previous chunk
            # Calculate how many sentences to keep for overlap
            overlap_text = completed_chunk[-overlap:] if len(completed_chunk) > overlap else completed_chunk
            overlap_sentences = split_into_sentences(overlap_text)
            
            current_chunk = overlap_sentences
            current_length = len(' '.join(current_chunk))
        
        # Add sentence to current chunk
        current_chunk.append(sentence)
        current_length += sentence_length + 1  # +1 for space
    
    # Add final chunk
    if current_chunk:
        chunks.append(' '.join(current_chunk))
    
    return chunks

def chunk_documents(documents, chunk_size=500, overlap=50):
    """
    Chunks multiple documents and maintains source attribution.
    
    Args:
        documents: List of document dictionaries
        chunk_size: Target chunk size
        overlap: Overlap between chunks
        
    Returns:
        list: List of chunk dictionaries with content, source, and chunk_id
    """
    all_chunks = []
    
    for doc in documents:
        chunks = chunk_text(doc['content'], chunk_size, overlap)
        
        for i, chunk in enumerate(chunks):
            all_chunks.append({
                'content': chunk,
                'source': doc['source'],
                'chunk_id': i
            })
    
    print(f"Created {len(all_chunks)} chunks from {len(documents)} documents")
    return all_chunks

The chunk_text function implements overlapping chunking. Overlap is important because it ensures that information spanning chunk boundaries is not lost. If a relevant sentence is split across two chunks, the overlap ensures it appears complete in at least one chunk.

The chunk_size parameter controls the target size of each chunk. Smaller chunks are more precise but may lack context. Larger chunks provide more context but are less precise. Typical values range from 300 to 1000 characters.

The overlap parameter controls how much text is shared between consecutive chunks. Typical values are 10-20 percent of chunk size.
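
Here is a quick, hedged sketch of the chunking pipeline in action; the sample document is purely illustrative:

sample_docs = [{
    'content': "The first sentence explains the setup. The second sentence adds detail. " * 20,
    'source': 'sample.txt'
}]
chunks = chunk_documents(sample_docs, chunk_size=300, overlap=30)
print(f"First chunk ({len(chunks[0]['content'])} characters):")
print(chunks[0]['content'][:120])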

Understanding Embeddings: Converting Text to Numbers

Embeddings are dense vector representations of text that capture semantic meaning. Unlike simple word counts or TF-IDF vectors, embeddings place semantically similar texts close together in vector space. For example, the embeddings for "dog" and "puppy" would be very close, while "dog" and "car" would be far apart.

Embedding models are neural networks trained to convert text into fixed-size vectors. These models learn to encode meaning, context, and relationships into the numerical representation. The sentence-transformers library, which is built on top of Transformers, provides a convenient interface to many such embedding models.

For RAG, we use embeddings to enable semantic search. We convert all document chunks into embeddings and store them. When a user asks a question, we convert the question into an embedding and find the chunks with the most similar embeddings. This retrieves semantically relevant information even if the exact words differ.

Let us implement an embedding system using the sentence-transformers library:

First, install the library:

pip install sentence-transformers

Now implement the embedding wrapper:

from sentence_transformers import SentenceTransformer
import numpy as np

class EmbeddingModel:
    """
    Wrapper for sentence embedding models.
    Provides a simple interface for encoding text into vectors.
    """
    
    def __init__(self, model_name="all-MiniLM-L6-v2"):
        """
        Initializes the embedding model.
        
        Args:
            model_name: Name of the sentence-transformers model to use
                       Default is a fast, efficient model good for most uses
        """
        print(f"Loading embedding model: {model_name}")
        self.model = SentenceTransformer(model_name)
        self.embedding_dim = self.model.get_sentence_embedding_dimension()
        print(f"Embedding dimension: {self.embedding_dim}")
    
    def embed_text(self, text):
        """
        Converts a single text string into an embedding vector.
        
        Args:
            text: Input text
            
        Returns:
            numpy.ndarray: Embedding vector
        """
        return self.model.encode(text, convert_to_numpy=True)
    
    def embed_batch(self, texts):
        """
        Converts multiple texts into embedding vectors efficiently.
        Batch processing is much faster than encoding texts individually.
        
        Args:
            texts: List of text strings
            
        Returns:
            numpy.ndarray: Matrix of embedding vectors (num_texts x embedding_dim)
        """
        return self.model.encode(texts, convert_to_numpy=True, show_progress_bar=True)

The SentenceTransformer class from the sentence-transformers library handles all the complexity of embedding generation. It loads a pre-trained model, tokenizes input text, runs it through the model, and returns the embedding vector.

The default model "all-MiniLM-L6-v2" is a good general-purpose choice. It is fast, produces 384-dimensional embeddings, and works well for most applications. Other models offer different tradeoffs between speed, embedding quality, and dimensionality.

The embed_batch method is important for efficiency. Encoding texts one at a time is slow because each call involves overhead. Batch encoding processes multiple texts together, leveraging GPU parallelism and reducing overhead.
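
To see the semantic behavior described earlier, here is a small demonstration; the exact scores will vary by model, but "dog" and "puppy" should come out noticeably more similar than "dog" and "car":

import numpy as np

embedder = EmbeddingModel()  # all-MiniLM-L6-v2 by default
vectors = embedder.embed_batch(["dog", "puppy", "car"])

def cosine(a, b):
    # Cosine similarity between two vectors
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

print("dog vs puppy:", round(cosine(vectors[0], vectors[1]), 3))
print("dog vs car:  ", round(cosine(vectors[0], vectors[2]), 3))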

Vector Databases: Efficient Similarity Search

Once we have embeddings for all chunks, we need a way to quickly find the most similar chunks to a query embedding. This is called similarity search or nearest neighbor search. A naive approach would compute the similarity between the query and every chunk, but this is too slow for large document collections.

Vector databases are specialized systems optimized for similarity search. They use data structures like HNSW graphs or IVF indexes to find similar vectors quickly. For our chatbot, we will implement a simple in-memory vector store using FAISS, a library from Facebook Research.

First, install FAISS:

pip install faiss-cpu

For GPU support, use faiss-gpu instead.

Now implement the vector store:

import faiss
import numpy as np

class VectorStore:
    """
    Simple vector database for storing and searching embeddings.
    Uses FAISS for efficient similarity search.
    """
    
    def __init__(self, embedding_dim):
        """
        Initializes an empty vector store.
        
        Args:
            embedding_dim: Dimensionality of embedding vectors
        """
        self.embedding_dim = embedding_dim
        
        # Create a FAISS index for cosine similarity
        # L2 normalization + inner product = cosine similarity
        self.index = faiss.IndexFlatIP(embedding_dim)
        
        # Store chunk metadata (content, source, etc.)
        self.chunks = []
    
    def add_chunks(self, chunks, embeddings):
        """
        Adds chunks and their embeddings to the store.
        
        Args:
            chunks: List of chunk dictionaries
            embeddings: Numpy array of embedding vectors (num_chunks x embedding_dim)
        """
        # Normalize embeddings for cosine similarity
        faiss.normalize_L2(embeddings)
        
        # Add to FAISS index
        self.index.add(embeddings)
        
        # Store chunk metadata
        self.chunks.extend(chunks)
        
        print(f"Added {len(chunks)} chunks to vector store")
        print(f"Total chunks: {len(self.chunks)}")
    
    def search(self, query_embedding, top_k=5):
        """
        Searches for the most similar chunks to a query embedding.
        
        Args:
            query_embedding: Query embedding vector
            top_k: Number of results to return
            
        Returns:
            list: List of (chunk, score) tuples, sorted by similarity
        """
        # Normalize query embedding
        query_embedding = query_embedding.reshape(1, -1)
        faiss.normalize_L2(query_embedding)
        
        # Search for nearest neighbors
        # Returns distances (similarities) and indices
        distances, indices = self.index.search(query_embedding, top_k)
        
        # Retrieve chunks and scores
        results = []
        for i, idx in enumerate(indices[0]):
            if 0 <= idx < len(self.chunks):  # FAISS returns -1 for slots it cannot fill
                chunk = self.chunks[idx]
                score = float(distances[0][i])
                results.append((chunk, score))
        
        return results

The VectorStore class wraps FAISS functionality in a simple interface. The IndexFlatIP index performs exact search using inner product similarity. For very large collections, you might use approximate indexes like IndexIVFFlat for faster search at the cost of some accuracy.

Cosine similarity measures the angle between vectors and is ideal for text embeddings. FAISS does not have a direct cosine similarity index, but we can achieve it by normalizing vectors to unit length and using inner product, which is mathematically equivalent.

The search method returns the top-k most similar chunks along with their similarity scores. Higher scores indicate greater similarity.
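
A minimal end-to-end sketch of the store, assuming the EmbeddingModel class above and a list of chunk dictionaries produced by chunk_documents; the query string is only an example:

embedder = EmbeddingModel()
store = VectorStore(embedder.embedding_dim)

chunk_texts = [chunk['content'] for chunk in chunks]
store.add_chunks(chunks, embedder.embed_batch(chunk_texts))

query_vec = embedder.embed_text("How do I install the software?")
for chunk, score in store.search(query_vec, top_k=3):
    print(f"{score:.3f}  {chunk['source']}  (chunk {chunk['chunk_id']})")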

Similarity Metrics and Search Strategies

We have been using cosine similarity, but other similarity metrics exist. Understanding when to use each is important.

Cosine similarity measures the angle between vectors, ignoring magnitude. It ranges from negative one to one, where one means identical direction. This is ideal for text because we care about semantic similarity regardless of text length.

Euclidean distance measures the straight-line distance between vectors in space. Smaller distances indicate greater similarity. This is sensitive to vector magnitude, which can be problematic for text of varying lengths.

Dot product similarity is the inner product of vectors. It combines both direction and magnitude. This can be useful when magnitude carries meaning, but for text embeddings, cosine similarity is generally better.
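
The following toy example shows how the three metrics are computed with NumPy on two made-up vectors that point in the same direction but differ in magnitude:

import numpy as np

a = np.array([1.0, 2.0, 3.0])
b = np.array([2.0, 4.0, 6.0])  # Same direction as a, twice the magnitude

cosine = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
euclidean = np.linalg.norm(a - b)
dot = np.dot(a, b)

print(f"Cosine similarity: {cosine:.3f}")      # 1.000, direction is identical
print(f"Euclidean distance: {euclidean:.3f}")  # Nonzero, magnitudes differ
print(f"Dot product: {dot:.3f}")               # Grows with magnitude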

Integrating RAG into the Chatbot

Now we integrate all RAG components into our chatbot. The workflow is as follows. When the user asks a question, we embed the question, search the vector store for relevant chunks, format the retrieved chunks as context, and provide them to the language model along with the question.

Here is the complete RAG integration:

def format_rag_prompt(query, retrieved_chunks, conversation_history):
    """
    Formats a prompt that includes retrieved context.
    
    Args:
        query: User's question
        retrieved_chunks: List of (chunk, score) tuples
        conversation_history: Existing conversation history
        
    Returns:
        list: Updated conversation history with RAG context
    """
    # Build context from retrieved chunks
    context_parts = []
    for i, (chunk, score) in enumerate(retrieved_chunks, 1):
        context_parts.append(f"[Document {i} from {chunk['source']}]")
        context_parts.append(chunk['content'])
        context_parts.append("")  # Blank line
    
    context = "\n".join(context_parts)
    
    # Create RAG-enhanced message
    rag_message = f"""Based on the following context, please answer the question.

Context:
{context}

Question: {query}

Please provide a detailed answer based on the context provided."""
    
    # Add to conversation history
    updated_history = conversation_history.copy()
    updated_history.append(rag_message)
    
    return updated_history

def generate_rag_response(model, tokenizer, device, query, conversation_history, 
                         vector_store, embedding_model, gen_config, top_k=3):
    """
    Generates a response using RAG.
    
    Args:
        model: Language model
        tokenizer: Tokenizer
        device: Device
        query: User query
        conversation_history: Conversation history
        vector_store: VectorStore instance
        embedding_model: EmbeddingModel instance
        gen_config: GenerationConfig instance
        top_k: Number of chunks to retrieve
        
    Returns:
        tuple: (response_text, retrieved_chunks)
    """
    # Embed query
    query_embedding = embedding_model.embed_text(query)
    
    # Retrieve relevant chunks
    retrieved_chunks = vector_store.search(query_embedding, top_k)
    
    # Format prompt with retrieved context
    rag_history = format_rag_prompt(query, retrieved_chunks, conversation_history)
    
    # Generate response
    formatted_prompt = tokenizer.eos_token.join(rag_history) + tokenizer.eos_token
    
    input_ids = tokenizer.encode(formatted_prompt, return_tensors="pt")
    input_ids = input_ids.to(device)
    
    config_dict = gen_config.to_dict()
    config_dict["pad_token_id"] = tokenizer.eos_token_id
    
    with torch.no_grad():
        output_ids = model.generate(input_ids, **config_dict)
    
    response = tokenizer.decode(
        output_ids[0][input_ids.shape[1]:], 
        skip_special_tokens=True
    )
    
    return response.strip(), retrieved_chunks

The format_rag_prompt function creates a prompt that includes retrieved context. It lists each retrieved chunk with its source, then presents the user's question. This gives the model all the information it needs to generate a grounded response.

The generate_rag_response function orchestrates the entire RAG workflow. It retrieves relevant chunks, formats the prompt, and generates a response. It also returns the retrieved chunks so we can show users which sources were used.
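
A rough usage sketch, assuming every object is initialized as in the previous sections; the query text is only an example:

query = "What does the documentation say about installation?"
response, retrieved = generate_rag_response(
    model, tokenizer, device, query,
    conversation_history, vector_store, embedding_model, gen_config, top_k=3
)
print(response)
for chunk, score in retrieved:
    print(f"  source: {chunk['source']} (similarity {score:.2f})")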

Building the RAG-Enabled Chatbot

Let us now create the complete fourth version of our chatbot with full RAG capabilities:

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import os

# [Previous classes: GenerationConfig, EmbeddingModel, VectorStore]

class RAGChatbot:
    """
    Complete RAG-enabled chatbot system.
    """
    
    def __init__(self, model_name, embedding_model_name="all-MiniLM-L6-v2"):
        """
        Initializes the RAG chatbot.
        
        Args:
            model_name: HuggingFace model name for generation
            embedding_model_name: Sentence-transformers model for embeddings
        """
        # Load generation model
        print("Loading generation model...")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16,
            low_cpu_mem_usage=True
        )
        self.device = self.get_optimal_device()
        self.model = self.model.to(self.device)
        self.model.eval()
        
        # Load embedding model
        self.embedding_model = EmbeddingModel(embedding_model_name)
        
        # Initialize vector store
        self.vector_store = VectorStore(self.embedding_model.embedding_dim)
        
        # Generation configuration
        self.gen_config = GenerationConfig()
        
        # Conversation history
        self.conversation_history = [
            "You are a helpful AI assistant."
        ]
        
        # Document chunks
        self.chunks = []
    
    def get_optimal_device(self):
        if torch.cuda.is_available():
            return torch.device("cuda")
        elif torch.backends.mps.is_available():
            return torch.device("mps")
        else:
            return torch.device("cpu")
    
    def load_documents(self, file_paths, chunk_size=500, overlap=50):
        """
        Loads and indexes documents for RAG.
        
        Args:
            file_paths: List of document file paths
            chunk_size: Size of text chunks
            overlap: Overlap between chunks
        """
        print("Loading documents...")
        documents = load_documents(file_paths)
        
        print("Chunking documents...")
        self.chunks = chunk_documents(documents, chunk_size, overlap)
        
        print("Generating embeddings...")
        chunk_texts = [chunk['content'] for chunk in self.chunks]
        embeddings = self.embedding_model.embed_batch(chunk_texts)
        
        print("Building vector index...")
        self.vector_store.add_chunks(self.chunks, embeddings)
        
        print("Document indexing complete!")
    
    def chat(self, user_input, use_rag=True, top_k=3):
        """
        Processes a user message and generates a response.
        
        Args:
            user_input: User's message
            use_rag: Whether to use RAG
            top_k: Number of chunks to retrieve
            
        Returns:
            tuple: (response, sources)
        """
        if use_rag and self.chunks:
            # RAG-enabled response
            response, retrieved_chunks = generate_rag_response(
                self.model,
                self.tokenizer,
                self.device,
                user_input,
                self.conversation_history,
                self.vector_store,
                self.embedding_model,
                self.gen_config,
                top_k
            )
            
            # Extract sources
            sources = [chunk['source'] for chunk, _ in retrieved_chunks]
            
            # Update conversation history with original query and response
            self.conversation_history.append(user_input)
            self.conversation_history.append(response)
            
            return response, sources
        else:
            # Regular response without RAG
            self.conversation_history.append(user_input)
            
            formatted_prompt = self.tokenizer.eos_token.join(self.conversation_history) + self.tokenizer.eos_token
            
            input_ids = self.tokenizer.encode(formatted_prompt, return_tensors="pt")
            input_ids = input_ids.to(self.device)
            
            config_dict = self.gen_config.to_dict()
            config_dict["pad_token_id"] = self.tokenizer.eos_token_id
            
            with torch.no_grad():
                output_ids = self.model.generate(input_ids, **config_dict)
            
            response = self.tokenizer.decode(
                output_ids[0][input_ids.shape[1]:], 
                skip_special_tokens=True
            )
            
            self.conversation_history.append(response.strip())
            
            return response.strip(), []
    
    def reset_conversation(self):
        """Clears conversation history."""
        self.conversation_history = [self.conversation_history[0]]

def main():
    print("=" * 60)
    print("RAG-ENABLED CHATBOT - Version 4.0")
    print("=" * 60)
    
    # Initialize chatbot
    chatbot = RAGChatbot("gpt2")
    
    # Load documents if available
    doc_dir = "documents"
    if os.path.exists(doc_dir):
        doc_files = [os.path.join(doc_dir, f) for f in os.listdir(doc_dir) 
                     if f.endswith(('.txt', '.pdf'))]
        if doc_files:
            chatbot.load_documents(doc_files)
    
    print("\nCommands: 'quit', 'reset', 'config', 'rag on/off'")
    print("-" * 60)
    
    use_rag = True
    
    while True:
        user_input = input("\nYou: ").strip()
        
        if user_input.lower() in ['quit', 'exit']:
            print("Goodbye!")
            break
        
        if user_input.lower() == 'reset':
            chatbot.reset_conversation()
            print("Conversation reset")
            continue
        
        if user_input.lower() == 'config':
            chatbot.gen_config.configure_interactive()
            continue
        
        if user_input.lower() == 'rag on':
            use_rag = True
            print("RAG enabled")
            continue
        
        if user_input.lower() == 'rag off':
            use_rag = False
            print("RAG disabled")
            continue
        
        if not user_input:
            continue
        
        print("Assistant: ", end="", flush=True)
        response, sources = chatbot.chat(user_input, use_rag)
        print(response)
        
        if sources:
            print(f"\nSources: {', '.join(set(sources))}")

if __name__ == "__main__":
    main()

This complete RAG-enabled chatbot can load documents, index them, retrieve relevant information, and generate grounded responses. Users can toggle RAG on or off to compare different modes.

STEP FIVE: WEB-BASED CHATBOT INTERFACE

Transitioning from Console to Web

Our final step is transforming the console chatbot into a web application. This makes it accessible to users who are not comfortable with command-line interfaces and enables features like rich formatting, file uploads, and concurrent users.

We will use Flask, a lightweight Python web framework, to build our web application. Flask provides routing, request handling, and template rendering. We will also implement streaming responses using Server-Sent Events, which allows the chatbot to display text as it is generated rather than waiting for the complete response.

Understanding Flask Basics

Flask is a micro web framework that makes building web applications simple. A Flask application consists of routes, which are functions that handle HTTP requests for specific URLs. Here is a minimal Flask application:

from flask import Flask, render_template, request, jsonify

app = Flask(__name__)

@app.route('/')
def home():
    """Renders the home page."""
    return render_template('index.html')

@app.route('/api/chat', methods=['POST'])
def chat():
    """Handles chat requests."""
    data = request.get_json()
    user_message = data.get('message', '')
    
    # Process message and generate response
    response = f"You said: {user_message}"
    
    return jsonify({'response': response})

if __name__ == '__main__':
    app.run(debug=True, port=5000)

The @app.route decorator defines URL routes. The home function renders an HTML template for the main page. The chat function handles POST requests to the /api/chat endpoint, processes the message, and returns a JSON response.

Flask templates use Jinja2 syntax, which allows embedding Python expressions in HTML. Templates are stored in a templates directory relative to the Flask application file.
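
As a hedged illustration of the Jinja2 syntax, here is a tiny standalone app that uses Flask's render_template_string so the template can live inline; in the real application the template is kept in templates/index.html as described below, and the port number here is arbitrary:

from flask import Flask, render_template_string

demo_app = Flask(__name__)

@demo_app.route('/')
def hello():
    # Jinja2 replaces {{ name }} with the value passed as a keyword argument
    return render_template_string("<h1>Hello, {{ name }}!</h1>", name="world")

if __name__ == '__main__':
    demo_app.run(debug=True, port=5001)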

Implementing Streaming Responses

Streaming responses improve user experience by showing text as it is generated rather than waiting for the complete response. This is especially important for long responses that might take many seconds to generate.

The Transformers library supports streaming through the TextIteratorStreamer class. This class receives tokens as they are generated and makes them available through an iterator:

from transformers import TextIteratorStreamer
from threading import Thread

def generate_streaming_response(model, tokenizer, device, conversation_history, gen_config):
    """
    Generates a streaming response.
    
    Yields:
        str: Generated text chunks
    """
    formatted_prompt = tokenizer.eos_token.join(conversation_history) + tokenizer.eos_token
    
    input_ids = tokenizer.encode(formatted_prompt, return_tensors="pt")
    input_ids = input_ids.to(device)
    
    config_dict = gen_config.to_dict()
    config_dict["pad_token_id"] = tokenizer.eos_token_id
    
    # Create streamer
    streamer = TextIteratorStreamer(
        tokenizer,
        skip_prompt=True,  # Don't include the input prompt in output
        skip_special_tokens=True  # Skip EOS and padding tokens
    )
    
    # Generate in a separate thread
    # This allows the streamer to yield tokens while generation continues
    generation_kwargs = {
        "input_ids": input_ids,
        "streamer": streamer,
        **config_dict
    }
    
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()
    
    # Yield tokens as they become available
    for text_chunk in streamer:
        yield text_chunk
    
    thread.join()

The TextIteratorStreamer is passed to the generate method, which sends tokens to it as they are produced. We run generation in a separate thread so the main thread can iterate over the streamer and yield chunks to the web client.
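
Before wiring the streamer into Flask, it can be sanity-checked from a plain console loop; this sketch assumes the model, tokenizer, device, conversation_history, and GenerationConfig from the earlier versions:

print("Assistant: ", end="", flush=True)
full_response = ""
for chunk in generate_streaming_response(model, tokenizer, device,
                                         conversation_history, gen_config):
    print(chunk, end="", flush=True)  # Display text as soon as it arrives
    full_response += chunk
print()
conversation_history.append(full_response.strip())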

Server-Sent Events for Real-Time Updates

Server-Sent Events, abbreviated as SSE, is a web standard for pushing updates from server to client. Unlike WebSockets, which require bidirectional communication, SSE is unidirectional and simpler to implement.

Flask can serve SSE responses using a generator function:

from flask import Response

@app.route('/api/chat/stream', methods=['POST'])
def chat_stream():
    """Streams chat response using Server-Sent Events."""
    data = request.get_json()
    user_message = data.get('message', '')
    
    def generate():
        # Add user message to history
        chatbot.conversation_history.append(user_message)
        
        # Stream response
        full_response = ""
        for chunk in generate_streaming_response(
            chatbot.model,
            chatbot.tokenizer,
            chatbot.device,
            chatbot.conversation_history,
            chatbot.gen_config
        ):
            full_response += chunk
            # Format as SSE
            yield f"data: {chunk}\n\n"
        
        # Add complete response to history
        chatbot.conversation_history.append(full_response)
        
        # Send completion signal
        yield "data: [DONE]\n\n"
    
    return Response(generate(), mimetype='text/event-stream')

The generate function is a generator that yields SSE-formatted messages. Each message is prefixed with "data: " and followed by two newlines. The client receives these messages in real-time and can update the UI incrementally.

Building the Web Interface

The web interface consists of HTML, CSS, and JavaScript. We create a chat interface with a message list and input form. JavaScript handles sending messages and receiving streaming responses.

Create a directory named templates in the same directory as your Flask application, and save the following HTML file as templates/index.html:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>RAG Chatbot</title>
    <style>
        * {
            margin: 0;
            padding: 0;
            box-sizing: border-box;
        }
        
        body {
            font-family: Arial, sans-serif;
            background: #f5f5f5;
            height: 100vh;
            display: flex;
            flex-direction: column;
        }
        
        .header {
            background: #2c3e50;
            color: white;
            padding: 20px;
            text-align: center;
        }
        
        .chat-container {
            flex: 1;
            display: flex;
            flex-direction: column;
            max-width: 900px;
            width: 100%;
            margin: 0 auto;
            background: white;
            box-shadow: 0 2px 10px rgba(0,0,0,0.1);
        }
        
        .messages {
            flex: 1;
            overflow-y: auto;
            padding: 20px;
        }
        
        .message {
            margin-bottom: 15px;
            padding: 10px 15px;
            border-radius: 8px;
            max-width: 70%;
        }
        
        .user-message {
            background: #3498db;
            color: white;
            margin-left: auto;
        }
        
        .assistant-message {
            background: #ecf0f1;
            color: #2c3e50;
        }
        
        .input-container {
            padding: 20px;
            border-top: 1px solid #ddd;
            display: flex;
            gap: 10px;
        }
        
        #message-input {
            flex: 1;
            padding: 10px;
            border: 1px solid #ddd;
            border-radius: 4px;
            font-size: 14px;
        }
        
        #send-button {
            padding: 10px 20px;
            background: #3498db;
            color: white;
            border: none;
            border-radius: 4px;
            cursor: pointer;
            font-size: 14px;
        }
        
        #send-button:hover {
            background: #2980b9;
        }
        
        #send-button:disabled {
            background: #95a5a6;
            cursor: not-allowed;
        }
        
        .typing-indicator {
            display: none;
            padding: 10px 15px;
            background: #ecf0f1;
            border-radius: 8px;
            max-width: 70%;
            color: #7f8c8d;
        }
        
        .typing-indicator.active {
            display: block;
        }
    </style>
</head>
<body>
    <div class="header">
        <h1>RAG-Enabled Chatbot</h1>
        <p>Powered by HuggingFace Transformers</p>
    </div>
    
    <div class="chat-container">
        <div class="messages" id="messages">
            <div class="message assistant-message">
                Hello! I'm your AI assistant. How can I help you today?
            </div>
        </div>
        
        <div class="typing-indicator" id="typing-indicator">
            Assistant is typing...
        </div>
        
        <div class="input-container">
            <input 
                type="text" 
                id="message-input" 
                placeholder="Type your message here..."
                autocomplete="off"
            >
            <button id="send-button">Send</button>
        </div>
    </div>
    
    <script>
        const messagesContainer = document.getElementById('messages');
        const messageInput = document.getElementById('message-input');
        const sendButton = document.getElementById('send-button');
        const typingIndicator = document.getElementById('typing-indicator');
        
        function addMessage(content, isUser) {
            const messageDiv = document.createElement('div');
            messageDiv.className = `message ${isUser ? 'user-message' : 'assistant-message'}`;
            messageDiv.textContent = content;
            messagesContainer.appendChild(messageDiv);
            messagesContainer.scrollTop = messagesContainer.scrollHeight;
            return messageDiv;
        }
        
        async function sendMessage() {
            const message = messageInput.value.trim();
            if (!message) return;
            
            // Add user message
            addMessage(message, true);
            messageInput.value = '';
            
            // Disable input
            messageInput.disabled = true;
            sendButton.disabled = true;
            typingIndicator.classList.add('active');
            
            // Create assistant message div
            const assistantMessageDiv = document.createElement('div');
            assistantMessageDiv.className = 'message assistant-message';
            messagesContainer.appendChild(assistantMessageDiv);
            
            try {
                // Send request and stream response
                const response = await fetch('/api/chat/stream', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json'
                    },
                    body: JSON.stringify({ message: message })
                });
                
                if (!response.ok) {
                    throw new Error(`Server error: ${response.status}`);
                }
                
                const reader = response.body.getReader();
                const decoder = new TextDecoder();
                let finished = false;
                
                while (!finished) {
                    const { done, value } = await reader.read();
                    if (done) break;
                    
                    // stream: true keeps multi-byte characters intact across chunk boundaries
                    const chunk = decoder.decode(value, { stream: true });
                    const lines = chunk.split('\n');
                    
                    for (const line of lines) {
                        if (line.startsWith('data: ')) {
                            const data = line.substring(6);
                            if (data === '[DONE]') {
                                finished = true;
                                break;
                            }
                            assistantMessageDiv.textContent += data;
                            messagesContainer.scrollTop = messagesContainer.scrollHeight;
                        }
                    }
                }
            } catch (error) {
                assistantMessageDiv.textContent = 'Error: Failed to get response';
                console.error('Error:', error);
            } finally {
                messageInput.disabled = false;
                sendButton.disabled = false;
                typingIndicator.classList.remove('active');
            }
        }
        
        sendButton.addEventListener('click', sendMessage);
        messageInput.addEventListener('keypress', function(e) {
            if (e.key === 'Enter') {
                sendMessage();
            }
        });
    </script>
</body>
</html>

This interface provides a clean chat experience, with user messages on the right in blue and assistant messages on the left in gray. The JavaScript sends each message to the streaming endpoint with the Fetch API, reads the Server-Sent Events response through a ReadableStream, and appends tokens to the assistant's message bubble as they arrive.
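Because the endpoint speaks plain HTTP with Server-Sent Events, you can also exercise it outside the browser. Below is a minimal sketch of a Python test client, assuming the Flask application from the next section is running on localhost:5000 and that the requests package is installed (an extra dependency used only for this test):

import requests

def stream_chat(message, url="http://localhost:5000/api/chat/stream"):
    """Prints tokens from the server-sent event stream as they arrive."""
    with requests.post(url, json={"message": message}, stream=True) as response:
        response.raise_for_status()
        for line in response.iter_lines(decode_unicode=True):
            if not line or not line.startswith("data: "):
                continue
            data = line[len("data: "):]
            if data == "[DONE]":
                break
            print(data, end="", flush=True)
    print()

if __name__ == "__main__":
    stream_chat("Hello! What can you do?")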

Complete Web Application

Here is the complete Flask application integrating all components. This should be saved as a separate Python file, for example app.py:

from flask import Flask, render_template, request, jsonify, Response
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
from threading import Thread, Lock
import os

# Import all previous classes
# [Include GenerationConfig, EmbeddingModel, VectorStore, RAGChatbot classes here]

app = Flask(__name__)

# Global chatbot instance with thread safety
chatbot = None
chatbot_lock = Lock()

def initialize_chatbot():
    """Initializes the chatbot on application startup."""
    global chatbot
    print("Initializing chatbot...")
    chatbot = RAGChatbot("gpt2")
    
    # Load documents if available
    doc_dir = "documents"
    if os.path.exists(doc_dir):
        doc_files = [os.path.join(doc_dir, f) for f in os.listdir(doc_dir) 
                     if f.endswith(('.txt', '.pdf'))]
        if doc_files:
            chatbot.load_documents(doc_files)
    
    print("Chatbot ready!")

@app.route('/')
def home():
    """Renders the main chat interface."""
    return render_template('index.html')

@app.route('/api/chat', methods=['POST'])
def chat():
    """Handles non-streaming chat requests."""
    data = request.get_json()
    user_message = data.get('message', '')
    use_rag = data.get('use_rag', True)
    
    if not user_message:
        return jsonify({'error': 'No message provided'}), 400
    
    try:
        with chatbot_lock:
            response, sources = chatbot.chat(user_message, use_rag=use_rag)
        return jsonify({
            'response': response,
            'sources': sources
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/api/chat/stream', methods=['POST'])
def chat_stream():
    """Handles streaming chat requests using Server-Sent Events."""
    data = request.get_json()
    user_message = data.get('message', '')
    
    if not user_message:
        return jsonify({'error': 'No message provided'}), 400
    
    def generate():
        try:
            with chatbot_lock:
                # Add user message to history
                chatbot.conversation_history.append(user_message)
                
                # Format prompt
                formatted_prompt = chatbot.tokenizer.eos_token.join(
                    chatbot.conversation_history
                ) + chatbot.tokenizer.eos_token
                
                input_ids = chatbot.tokenizer.encode(formatted_prompt, return_tensors="pt")
                input_ids = input_ids.to(chatbot.device)
                
                config_dict = chatbot.gen_config.to_dict()
                config_dict["pad_token_id"] = chatbot.tokenizer.eos_token_id
                
                # Create streamer
                streamer = TextIteratorStreamer(
                    chatbot.tokenizer,
                    skip_prompt=True,
                    skip_special_tokens=True
                )
                
                # Generate in thread
                generation_kwargs = {
                    "input_ids": input_ids,
                    "streamer": streamer,
                    **config_dict
                }
                
                thread = Thread(target=chatbot.model.generate, kwargs=generation_kwargs)
                thread.start()
                
                # Stream tokens
                full_response = ""
                for text_chunk in streamer:
                    full_response += text_chunk
                    yield f"data: {text_chunk}\n\n"
                
                thread.join()
                
                # Add response to history
                chatbot.conversation_history.append(full_response)
                
                yield "data: [DONE]\n\n"
                
        except Exception as e:
            yield f"data: Error: {str(e)}\n\n"
            yield "data: [DONE]\n\n"
    
    return Response(generate(), mimetype='text/event-stream')

@app.route('/api/reset', methods=['POST'])
def reset():
    """Resets the conversation history."""
    with chatbot_lock:
        chatbot.reset_conversation()
    return jsonify({'status': 'success'})

@app.route('/api/config', methods=['GET', 'POST'])
def config():
    """Gets or updates generation configuration."""
    if request.method == 'GET':
        with chatbot_lock:
            return jsonify({
                'max_new_tokens': chatbot.gen_config.max_new_tokens,
                'temperature': chatbot.gen_config.temperature,
                'top_p': chatbot.gen_config.top_p,
                'top_k': chatbot.gen_config.top_k,
                'repetition_penalty': chatbot.gen_config.repetition_penalty
            })
    else:
        data = request.get_json()
        with chatbot_lock:
            if 'max_new_tokens' in data:
                chatbot.gen_config.max_new_tokens = data['max_new_tokens']
            if 'temperature' in data:
                chatbot.gen_config.temperature = data['temperature']
            if 'top_p' in data:
                chatbot.gen_config.top_p = data['top_p']
            if 'top_k' in data:
                chatbot.gen_config.top_k = data['top_k']
            if 'repetition_penalty' in data:
                chatbot.gen_config.repetition_penalty = data['repetition_penalty']
        
        return jsonify({'status': 'success'})

if __name__ == '__main__':
    initialize_chatbot()
    app.run(debug=True, host='0.0.0.0', port=5000)

This web application provides a complete chatbot interface with streaming responses, configurable parameters, and RAG capabilities. Because all requests share a single model instance, the lock serializes access to it: concurrent requests are handled one at a time instead of interleaving their generation or corrupting the shared conversation history.
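The configuration and reset endpoints can likewise be driven from any HTTP client. The following sketch adjusts generation parameters at runtime and then sends a non-streaming question, again assuming the server above is running on localhost:5000 and that the requests package is available:

import requests

BASE = "http://localhost:5000"

# Read the current generation settings
print(requests.get(f"{BASE}/api/config").json())

# Make answers longer and less random, then clear the conversation history
requests.post(f"{BASE}/api/config", json={"max_new_tokens": 300, "temperature": 0.3})
requests.post(f"{BASE}/api/reset")

# Ask a question without streaming; the answer and its RAG sources come back as JSON
reply = requests.post(f"{BASE}/api/chat",
                      json={"message": "Summarize the indexed documents.", "use_rag": True}).json()
print(reply["response"])
print(reply["sources"])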

COMPLETE RUNNING EXAMPLE: PRODUCTION-READY RAG CHATBOT

The following is a complete, production-ready implementation that combines all five versions of our chatbot. This code includes error handling, logging, configuration management, and all features discussed throughout the tutorial. Save this as chatbot_complete.py:

#!/usr/bin/env python3
"""
Complete RAG-Enabled Chatbot System
Supports console and web interfaces with full RAG capabilities
"""

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import os
import sys
from threading import Thread, Lock
from flask import Flask, render_template, request, jsonify, Response
import logging
import re

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class GenerationConfig:
    """Manages generation parameters with validation."""
    
    def __init__(self):
        self.max_new_tokens = 150
        self.temperature = 0.7
        self.top_p = 0.9
        self.top_k = 50
        self.repetition_penalty = 1.1
        self.do_sample = True
        self.num_beams = 1
    
    def to_dict(self):
        """Converts configuration to dictionary for model.generate()."""
        config = {
            "max_new_tokens": self.max_new_tokens,
            "do_sample": self.do_sample,
            "pad_token_id": None
        }
        
        if self.do_sample:
            config["temperature"] = self.temperature
            config["top_p"] = self.top_p
            config["top_k"] = self.top_k
            config["repetition_penalty"] = self.repetition_penalty
        else:
            if self.num_beams > 1:
                config["num_beams"] = self.num_beams
                config["early_stopping"] = True
        
        return config
    
    def display(self):
        """Displays current configuration."""
        print("\nGeneration Configuration:")
        print(f"  Max New Tokens: {self.max_new_tokens}")
        print(f"  Mode: {'Sampling' if self.do_sample else 'Greedy/Beam'}")
        if self.do_sample:
            print(f"  Temperature: {self.temperature}")
            print(f"  Top-p: {self.top_p}")
            print(f"  Top-k: {self.top_k}")
            print(f"  Repetition Penalty: {self.repetition_penalty}")
        else:
            print(f"  Num Beams: {self.num_beams}")
    
    def configure_interactive(self):
        """Interactive configuration menu."""
        while True:
            print("\n" + "=" * 60)
            print("PARAMETER CONFIGURATION")
            print("=" * 60)
            self.display()
            print("\n1. Max Tokens  2. Temperature  3. Top-p  4. Top-k")
            print("5. Repetition Penalty  6. Toggle Sampling  7. Num Beams")
            print("0. Done")
            
            choice = input("\nSelect: ").strip()
            
            if choice == "0":
                break
            elif choice == "1":
                try:
                    self.max_new_tokens = int(input("Max tokens: "))
                except ValueError:
                    print("Invalid number")
            elif choice == "2":
                try:
                    self.temperature = float(input("Temperature: "))
                except ValueError:
                    print("Invalid number")
            elif choice == "3":
                try:
                    self.top_p = float(input("Top-p: "))
                except ValueError:
                    print("Invalid number")
            elif choice == "4":
                try:
                    self.top_k = int(input("Top-k: "))
                except ValueError:
                    print("Invalid number")
            elif choice == "5":
                try:
                    self.repetition_penalty = float(input("Repetition penalty: "))
                except ValueError:
                    print("Invalid number")
            elif choice == "6":
                self.do_sample = not self.do_sample
            elif choice == "7":
                try:
                    self.num_beams = int(input("Num beams: "))
                    if self.num_beams > 1:
                        self.do_sample = False
                except ValueError:
                    print("Invalid number")


class EmbeddingModel:
    """Wrapper for sentence embedding models."""
    
    def __init__(self, model_name="all-MiniLM-L6-v2"):
        logger.info(f"Loading embedding model: {model_name}")
        self.model = SentenceTransformer(model_name)
        self.embedding_dim = self.model.get_sentence_embedding_dimension()
        logger.info(f"Embedding dimension: {self.embedding_dim}")
    
    def embed_text(self, text):
        """Embeds a single text string."""
        return self.model.encode(text, convert_to_numpy=True)
    
    def embed_batch(self, texts):
        """Embeds multiple texts efficiently."""
        return self.model.encode(texts, convert_to_numpy=True, show_progress_bar=True)


class VectorStore:
    """Vector database for similarity search."""
    
    def __init__(self, embedding_dim):
        self.embedding_dim = embedding_dim
        self.index = faiss.IndexFlatIP(embedding_dim)
        self.chunks = []
    
    def add_chunks(self, chunks, embeddings):
        """Adds chunks and embeddings to the store."""
        faiss.normalize_L2(embeddings)
        self.index.add(embeddings)
        self.chunks.extend(chunks)
        logger.info(f"Added {len(chunks)} chunks (total: {len(self.chunks)})")
    
    def search(self, query_embedding, top_k=5):
        """Searches for similar chunks."""
        query_embedding = query_embedding.reshape(1, -1)
        faiss.normalize_L2(query_embedding)
        distances, indices = self.index.search(query_embedding, top_k)
        
        results = []
        for i, idx in enumerate(indices[0]):
            if idx < len(self.chunks):
                chunk = self.chunks[idx]
                score = float(distances[0][i])
                results.append((chunk, score))
        
        return results


def load_text_file(file_path):
    """Loads a plain text file."""
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read()


def load_pdf_file(file_path):
    """Loads a PDF file and extracts text."""
    try:
        import PyPDF2
    except ImportError:
        raise ImportError("PyPDF2 required. Install with: pip install PyPDF2")
    
    text = ""
    with open(file_path, 'rb') as f:
        pdf_reader = PyPDF2.PdfReader(f)
        for page in pdf_reader.pages:
            text += page.extract_text() + "\n"
    
    return text


def load_documents(file_paths):
    """Loads multiple documents."""
    documents = []
    
    for file_path in file_paths:
        logger.info(f"Loading {file_path}")
        
        try:
            if file_path.endswith('.txt'):
                content = load_text_file(file_path)
            elif file_path.endswith('.pdf'):
                content = load_pdf_file(file_path)
            else:
                logger.warning(f"Unsupported file type: {file_path}")
                continue
            
            documents.append({
                'content': content,
                'source': file_path
            })
        except Exception as e:
            logger.error(f"Error loading {file_path}: {e}")
    
    logger.info(f"Loaded {len(documents)} documents")
    return documents


def split_into_sentences(text):
    """Splits text into sentences."""
    sentences = re.split(r'(?<=[.!?])\s+', text)
    sentences = [s.strip() for s in sentences if s.strip()]
    return sentences


def chunk_text(text, chunk_size=500, overlap=50):
    """Chunks text with overlap."""
    sentences = split_into_sentences(text)
    chunks = []
    current_chunk = []
    current_length = 0
    
    for sentence in sentences:
        sentence_length = len(sentence)
        
        if current_length + sentence_length > chunk_size and current_chunk:
            completed_chunk = ' '.join(current_chunk)
            chunks.append(completed_chunk)
            
            overlap_text = completed_chunk[-overlap:] if len(completed_chunk) > overlap else completed_chunk
            overlap_sentences = split_into_sentences(overlap_text)
            
            current_chunk = overlap_sentences
            current_length = len(' '.join(current_chunk))
        
        current_chunk.append(sentence)
        current_length += sentence_length + 1
    
    if current_chunk:
        chunks.append(' '.join(current_chunk))
    
    return chunks


def chunk_documents(documents, chunk_size=500, overlap=50):
    """Chunks multiple documents."""
    all_chunks = []
    
    for doc in documents:
        chunks = chunk_text(doc['content'], chunk_size, overlap)
        
        for i, chunk in enumerate(chunks):
            all_chunks.append({
                'content': chunk,
                'source': doc['source'],
                'chunk_id': i
            })
    
    logger.info(f"Created {len(all_chunks)} chunks")
    return all_chunks


def get_optimal_device():
    """Detects optimal device."""
    if torch.cuda.is_available():
        device = torch.device("cuda")
        logger.info(f"Using CUDA: {torch.cuda.get_device_name(0)}")
        return device
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
        logger.info("Using Apple MPS")
        return device
    else:
        device = torch.device("cpu")
        logger.info("Using CPU")
        return device


def manage_context_window(conversation_history, tokenizer, max_tokens=800):
    """Manages context window by removing old messages."""
    system_message = None
    messages = conversation_history.copy()
    
    if messages:
        system_message = messages[0]
        messages = messages[1:]
    
    if messages:
        formatted = tokenizer.eos_token.join(messages)
        token_count = len(tokenizer.encode(formatted))
        
        while token_count > max_tokens and len(messages) > 1:
            messages.pop(0)
            formatted = tokenizer.eos_token.join(messages)
            token_count = len(tokenizer.encode(formatted))
    
    result = []
    if system_message:
        result.append(system_message)
    result.extend(messages)
    
    return result


def format_rag_prompt(query, retrieved_chunks, conversation_history):
    """Formats a prompt with retrieved context."""
    context_parts = []
    for i, (chunk, score) in enumerate(retrieved_chunks, 1):
        context_parts.append(f"[Document {i} from {chunk['source']}]")
        context_parts.append(chunk['content'])
        context_parts.append("")
    
    context = "\n".join(context_parts)
    
    rag_message = f"""Based on the following context, please answer the question.

Context:
{context}

Question: {query}

Please provide a detailed answer based on the context provided."""
    
    updated_history = conversation_history.copy()
    updated_history.append(rag_message)
    
    return updated_history


class RAGChatbot:
    """Complete RAG-enabled chatbot system."""
    
    def __init__(self, model_name, embedding_model_name="all-MiniLM-L6-v2"):
        logger.info("Initializing RAG chatbot")
        
        # Load generation model
        logger.info(f"Loading model: {model_name}")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        
        self.device = get_optimal_device()
        
        # float16 halves memory on GPU/MPS; fall back to float32 on CPU,
        # where half-precision inference is poorly supported
        model_dtype = torch.float16 if self.device.type != "cpu" else torch.float32
        
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=model_dtype,
            low_cpu_mem_usage=True
        )
        self.model = self.model.to(self.device)
        self.model.eval()
        
        # Load embedding model
        self.embedding_model = EmbeddingModel(embedding_model_name)
        
        # Initialize vector store
        self.vector_store = VectorStore(self.embedding_model.embedding_dim)
        
        # Generation configuration
        self.gen_config = GenerationConfig()
        
        # Conversation history
        self.conversation_history = [
            "You are a helpful AI assistant."
        ]
        
        # Document chunks
        self.chunks = []
        
        logger.info("Chatbot initialized successfully")
    
    def load_documents(self, file_paths, chunk_size=500, overlap=50):
        """Loads and indexes documents."""
        logger.info("Loading documents for RAG")
        documents = load_documents(file_paths)
        
        logger.info("Chunking documents")
        self.chunks = chunk_documents(documents, chunk_size, overlap)
        
        logger.info("Generating embeddings")
        chunk_texts = [chunk['content'] for chunk in self.chunks]
        embeddings = self.embedding_model.embed_batch(chunk_texts)
        
        logger.info("Building vector index")
        self.vector_store.add_chunks(self.chunks, embeddings)
        
        logger.info("Document indexing complete")
    
    def chat(self, user_input, use_rag=True, top_k=3):
        """Processes user message and generates response."""
        try:
            if use_rag and self.chunks:
                return self._chat_with_rag(user_input, top_k)
            else:
                return self._chat_without_rag(user_input)
        except Exception as e:
            logger.error(f"Error in chat: {e}")
            return f"Error: {str(e)}", []
    
    def _chat_with_rag(self, user_input, top_k):
        """Generates RAG-enabled response."""
        # Retrieve relevant chunks
        query_embedding = self.embedding_model.embed_text(user_input)
        retrieved_chunks = self.vector_store.search(query_embedding, top_k)
        
        # Format RAG prompt
        rag_history = format_rag_prompt(user_input, retrieved_chunks, self.conversation_history)
        
        # Generate response
        response = self._generate(rag_history)
        
        # Update history with original query
        self.conversation_history.append(user_input)
        self.conversation_history.append(response)
        
        sources = [chunk['source'] for chunk, _ in retrieved_chunks]
        return response, sources
    
    def _chat_without_rag(self, user_input):
        """Generates regular response."""
        self.conversation_history.append(user_input)
        response = self._generate(self.conversation_history)
        self.conversation_history.append(response)
        return response, []
    
    def _generate(self, conversation_history):
        """Generates response from conversation history."""
        formatted_prompt = self.tokenizer.eos_token.join(conversation_history) + self.tokenizer.eos_token
        
        input_ids = self.tokenizer.encode(formatted_prompt, return_tensors="pt")
        input_ids = input_ids.to(self.device)
        
        config_dict = self.gen_config.to_dict()
        config_dict["pad_token_id"] = self.tokenizer.eos_token_id
        
        with torch.no_grad():
            output_ids = self.model.generate(input_ids, **config_dict)
        
        response = self.tokenizer.decode(
            output_ids[0][input_ids.shape[1]:], 
            skip_special_tokens=True
        )
        
        return response.strip()
    
    def generate_streaming(self, conversation_history):
        """Generates streaming response."""
        formatted_prompt = self.tokenizer.eos_token.join(conversation_history) + self.tokenizer.eos_token
        
        input_ids = self.tokenizer.encode(formatted_prompt, return_tensors="pt")
        input_ids = input_ids.to(self.device)
        
        config_dict = self.gen_config.to_dict()
        config_dict["pad_token_id"] = self.tokenizer.eos_token_id
        
        streamer = TextIteratorStreamer(
            self.tokenizer,
            skip_prompt=True,
            skip_special_tokens=True
        )
        
        generation_kwargs = {
            "input_ids": input_ids,
            "streamer": streamer,
            **config_dict
        }
        
        thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
        thread.start()
        
        for text_chunk in streamer:
            yield text_chunk
        
        thread.join()
    
    def reset_conversation(self):
        """Resets conversation history."""
        self.conversation_history = [self.conversation_history[0]]
        logger.info("Conversation reset")


def run_console_chatbot():
    """Runs console version of chatbot."""
    print("=" * 60)
    print("RAG-ENABLED CONSOLE CHATBOT")
    print("=" * 60)
    
    model_name = "gpt2"
    chatbot = RAGChatbot(model_name)
    
    # Load documents if available
    doc_dir = "documents"
    if os.path.exists(doc_dir):
        doc_files = [os.path.join(doc_dir, f) for f in os.listdir(doc_dir) 
                     if f.endswith(('.txt', '.pdf'))]
        if doc_files:
            chatbot.load_documents(doc_files)
    
    print("\nCommands: 'quit', 'reset', 'config', 'rag on/off'")
    print("-" * 60)
    
    use_rag = True
    
    while True:
        user_input = input("\nYou: ").strip()
        
        if user_input.lower() in ['quit', 'exit']:
            print("Goodbye!")
            break
        
        if user_input.lower() == 'reset':
            chatbot.reset_conversation()
            print("Conversation reset")
            continue
        
        if user_input.lower() == 'config':
            chatbot.gen_config.configure_interactive()
            continue
        
        if user_input.lower() == 'rag on':
            use_rag = True
            print("RAG enabled")
            continue
        
        if user_input.lower() == 'rag off':
            use_rag = False
            print("RAG disabled")
            continue
        
        if not user_input:
            continue
        
        print("Assistant: ", end="", flush=True)
        response, sources = chatbot.chat(user_input, use_rag)
        print(response)
        
        if sources:
            print(f"\nSources: {', '.join(set(sources))}")


def create_web_app(chatbot):
    """Creates Flask web application."""
    app = Flask(__name__)
    chatbot_lock = Lock()
    
    @app.route('/')
    def home():
        return render_template('index.html')
    
    @app.route('/api/chat', methods=['POST'])
    def chat():
        data = request.get_json()
        user_message = data.get('message', '')
        use_rag = data.get('use_rag', True)
        
        if not user_message:
            return jsonify({'error': 'No message provided'}), 400
        
        try:
            with chatbot_lock:
                response, sources = chatbot.chat(user_message, use_rag=use_rag)
            return jsonify({'response': response, 'sources': sources})
        except Exception as e:
            logger.error(f"Chat error: {e}")
            return jsonify({'error': str(e)}), 500
    
    @app.route('/api/chat/stream', methods=['POST'])
    def chat_stream():
        data = request.get_json()
        user_message = data.get('message', '')
        
        if not user_message:
            return jsonify({'error': 'No message provided'}), 400
        
        def generate():
            try:
                with chatbot_lock:
                    chatbot.conversation_history.append(user_message)
                    
                    full_response = ""
                    for chunk in chatbot.generate_streaming(chatbot.conversation_history):
                        full_response += chunk
                        yield f"data: {chunk}\n\n"
                    
                    chatbot.conversation_history.append(full_response)
                    
                    yield "data: [DONE]\n\n"
            except Exception as e:
                logger.error(f"Streaming error: {e}")
                yield f"data: Error: {str(e)}\n\n"
                yield "data: [DONE]\n\n"
        
        return Response(generate(), mimetype='text/event-stream')
    
    @app.route('/api/reset', methods=['POST'])
    def reset():
        with chatbot_lock:
            chatbot.reset_conversation()
        return jsonify({'status': 'success'})
    
    return app


def run_web_chatbot():
    """Runs web version of chatbot."""
    logger.info("Starting web chatbot")
    
    model_name = "gpt2"
    chatbot = RAGChatbot(model_name)
    
    # Load documents
    doc_dir = "documents"
    if os.path.exists(doc_dir):
        doc_files = [os.path.join(doc_dir, f) for f in os.listdir(doc_dir) 
                     if f.endswith(('.txt', '.pdf'))]
        if doc_files:
            chatbot.load_documents(doc_files)
    
    app = create_web_app(chatbot)
    app.run(debug=True, host='0.0.0.0', port=5000)


if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == 'web':
        run_web_chatbot()
    else:
        run_console_chatbot()

This complete implementation provides a fully functional RAG-enabled chatbot with both console and web interfaces. It includes all features discussed throughout the tutorial: conversation memory, configurable generation parameters, document loading and chunking, embedding generation, vector search, and streaming responses.

To use the console version, run:

python chatbot_complete.py

To use the web version, run:

python chatbot_complete.py web
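The classes in chatbot_complete.py can also be imported and driven directly from other Python code or a notebook, bypassing both interfaces. A minimal sketch, assuming the listing above is saved as chatbot_complete.py in the current directory and that notes.txt is a stand-in for one of your own documents:

from chatbot_complete import RAGChatbot

bot = RAGChatbot("gpt2")

# Index a document explicitly instead of relying on the documents/ directory
bot.load_documents(["notes.txt"])  # placeholder path

# Ask with retrieval, then without it
answer, sources = bot.chat("What do my notes say about deadlines?", use_rag=True)
print(answer)
print("Sources:", sources)

answer, _ = bot.chat("Now answer from memory only: what did I just ask?", use_rag=False)
print(answer)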

The chatbot automatically detects the available hardware and uses GPU acceleration when possible. Any .txt or .pdf files placed in a directory named documents next to the script are loaded, chunked, and indexed for RAG at startup.
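For example, the snippet below creates that directory and drops a small sample file into it; the content is made up purely for illustration, and the next run of chatbot_complete.py will chunk, embed, and index it automatically:

import os

# Create the documents/ directory the chatbot scans at startup
os.makedirs("documents", exist_ok=True)

# Write a sample text file (hypothetical content for demonstration)
with open(os.path.join("documents", "sample.txt"), "w", encoding="utf-8") as f:
    f.write("Our refund policy allows returns within 30 days of purchase.\n")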

This is a complete system that can be deployed and used for real applications; for production traffic, run it behind a proper WSGI server such as Gunicorn and disable Flask's debug mode rather than relying on the built-in development server. The modular design makes it straightforward to extend and customize for specific use cases.