INTRODUCTION: WHY AGENT007 MATTERS IN THE AGE OF AI
Artificial intelligence has become an indispensable tool for productivity and innovation. Large Language Models, commonly known as LLMs, have demonstrated remarkable capabilities in understanding and generating human-like text. However, most people interact with these models through cloud-based services, which raises concerns about data privacy, internet dependency, and recurring subscription costs.
Agent007 represents a different approach. It is a sophisticated yet accessible chatbot system that runs entirely on your local machine, giving you complete control over your data and conversations. The name Agent007 evokes the image of a capable, versatile agent that works for you, handling various tasks with intelligence and discretion.
What makes Agent007 particularly powerful is its dual nature. When you simply want to have a conversation or get help with a task, Agent007 functions as a straightforward chatbot, responding to your queries using its language understanding capabilities. But when you need to work with your own documents, whether they are research papers, company reports, or personal notes, Agent007 transforms into a Retrieval-Augmented Generation system. This means it can search through your documents, find relevant information, and provide answers grounded in your specific content.
The system is designed to work with whatever hardware you have available. Whether you own an Apple computer with Metal Performance Shaders, a gaming PC with an NVIDIA graphics card, a workstation with AMD hardware, or even an Intel-based system, Agent007 automatically detects and optimizes for your specific GPU. This hardware acceleration dramatically speeds up the AI processing, making the experience smooth and responsive.
Furthermore, Agent007 offers flexibility in how you interact with it. If you prefer working in a terminal environment, the console application provides a clean, distraction-free interface. For those who appreciate visual feedback and richer interactions, the web-based graphical interface offers dynamic animations, adjustable lighting themes, and comprehensive controls for fine-tuning the AI's behavior.
This article will guide you through building Agent007 from the ground up. We will explore each component in detail, understand how they work together, and ultimately create a production-ready system that you can customize and extend for your specific needs.
UNDERSTANDING THE ARCHITECTURE: HOW AGENT007 WORKS
Before diving into code, it is essential to understand how Agent007 is structured. The system follows a modular architecture where each component has a specific responsibility, and these components communicate through well-defined interfaces.
At the foundation lies the GPU detection and optimization layer. When Agent007 starts, it examines your system to determine what hardware acceleration is available. This layer abstracts away the complexity of different GPU frameworks, allowing the rest of the system to work uniformly regardless of whether you are using Apple's Metal, NVIDIA's CUDA, AMD's ROCm, or Intel's acceleration technologies.
The next layer is the LLM management system. This component is responsible for loading the language model into memory, managing its lifecycle, and providing a consistent interface for generating responses. It uses the HuggingFace Transformers library, which has become the de facto standard for working with language models. The LLM manager handles model initialization, tokenization, and generation parameters.
When documents are involved, the RAG system comes into play. This system consists of several interconnected parts. First, there is the document processor, which can read various file formats and extract their textual content. The extracted text is then split into manageable chunks that preserve semantic meaning. Each chunk is converted into a mathematical representation called an embedding using a specialized model. These embeddings are stored in a vector database, which allows for efficient similarity searches.
When you ask Agent007 a question in RAG mode, the system converts your question into an embedding, searches the vector database for the most relevant document chunks, and then provides these chunks as context to the language model. The model then generates a response that is informed by your specific documents rather than just its general training.
The conversation management layer keeps track of your interactions with Agent007. It maintains the history of messages, allowing the model to understand context and provide coherent responses across multiple turns. This layer also handles saving conversations to disk and retrieving them later.
The template system provides a way to store and reuse common patterns. System messages, which define the AI's behavior and personality, can be saved as templates. Similarly, frequently used prompts or question patterns can be templated. This makes it easy to switch between different AI personas or use cases.
Finally, there are two interface layers: the console interface and the web interface. The console interface is straightforward, presenting a text-based interaction model. The web interface is more sophisticated, consisting of a backend server that handles API requests and a frontend application that provides the visual experience with animations, controls, and history management.
SETTING UP YOUR DEVELOPMENT ENVIRONMENT
Creating Agent007 requires setting up a Python environment with several specialized libraries. Python has become the lingua franca of machine learning and AI development due to its extensive ecosystem and ease of use.
First, you need Python version 3.9 or higher installed on your system. You can verify your Python version by opening a terminal and typing "python --version" or "python3 --version" depending on your operating system.
Next, it is highly recommended to create a virtual environment for this project. A virtual environment is an isolated Python environment that keeps the dependencies for different projects separate. This prevents version conflicts and makes your project more reproducible. You can create a virtual environment by navigating to your project directory and running "python -m venv agent007_env" on Windows or "python3 -m venv agent007_env" on macOS and Linux. Activate it with "agent007_env\Scripts\activate" on Windows or "source agent007_env/bin/activate" on Unix-like systems.
With your virtual environment active, you need to install the core dependencies. The PyTorch library provides the fundamental tensor operations and neural network capabilities. Importantly, you must install the version of PyTorch that matches your hardware. For NVIDIA GPUs, you need the CUDA-enabled version. For AMD GPUs, you need the ROCm version. For Apple Silicon Macs, the standard PyTorch installation includes MPS support. For Intel GPUs, you need the Intel Extension for PyTorch.
The HuggingFace Transformers library provides access to thousands of pre-trained language models and the tools to work with them. The Accelerate library from HuggingFace handles device placement and mixed precision, and is required for the device_map="auto" loading used later in this article. For embeddings and vector operations, you need the Sentence-Transformers library.
LangChain is a framework that simplifies building applications with language models. It provides abstractions for prompts, chains of operations, and integrations with various tools. LangGraph extends LangChain with graph-based workflows, allowing for more complex agent behaviors. LlamaIndex, formerly known as GPT Index, specializes in connecting language models with external data sources.
For the RAG functionality, you need a vector database. FAISS, developed by Facebook AI Research, is an excellent choice for local deployments. It is fast, efficient, and does not require a separate server process. ChromaDB is another option that provides a more feature-rich vector store.
For document processing, you need libraries that can handle different file formats. Common choices are pypdf (the maintained successor to PyPDF2) or pdfplumber for PDF files, python-docx or docx2txt for Word documents, and openpyxl for Excel files.
The web interface requires a web framework. FastAPI is an excellent modern choice that provides automatic API documentation, type checking, and high performance. For the frontend, you will use standard HTML, CSS, and JavaScript; the browser's built-in Fetch API is enough for the HTTP requests shown later, though a library like Axios or a framework such as Vue.js or React could be added for more complex interactions.
Here is an example requirements.txt file that captures these dependencies:
torch>=2.0.0
transformers>=4.30.0
accelerate>=0.20.0
sentence-transformers>=2.2.0
langchain>=0.1.0
langchain-community>=0.0.10
langgraph>=0.0.20
llama-index>=0.9.0
faiss-cpu>=1.7.4
chromadb>=0.4.0
fastapi>=0.100.0
uvicorn>=0.23.0
pydantic>=2.0.0
python-multipart>=0.0.6
pypdf>=3.0.0
python-docx>=0.8.11
docx2txt>=0.8
openpyxl>=3.1.0
aiofiles>=23.0.0
Note that for GPU-accelerated FAISS, you would replace "faiss-cpu" with "faiss-gpu" if using CUDA. The pypdf and docx2txt packages are listed because LangChain's PyPDFLoader and Docx2txtLoader, used later in this article, depend on them. The specific PyTorch installation command depends on your hardware and can be found on the PyTorch website.
DETECTING AND OPTIMIZING FOR YOUR GPU
One of Agent007's key strengths is its ability to automatically detect and utilize whatever GPU hardware you have available. This capability ensures that the system runs as efficiently as possible on your specific machine without requiring manual configuration.
The GPU detection process begins by checking for the availability of different acceleration frameworks. PyTorch provides built-in functions to query for CUDA availability, which indicates an NVIDIA GPU. For Apple Silicon Macs, PyTorch checks for MPS backend availability. AMD ROCm support is detected through CUDA compatibility layers, and Intel GPU support is checked through the Intel Extension for PyTorch.
The detection logic follows a priority order. First, it checks for CUDA, as NVIDIA GPUs are the most common in machine learning workloads; ROCm builds of PyTorch report their GPUs through the same CUDA interface, so AMD hardware is caught by this check as well. If CUDA is not available, it checks for Apple's MPS, then for Intel acceleration. Finally, if no GPU acceleration is detected, the system falls back to CPU processing.
Here is a code example showing how this detection works:
import torch

class GPUDetector:
    """Detects and configures the optimal compute device for the system."""

    def __init__(self):
        self.device = None
        self.device_name = None
        self.device_type = None

    def detect_device(self):
        """
        Detects the best available compute device.
        Returns a tuple of (device, device_name, device_type).
        """
        # Check for NVIDIA CUDA
        if torch.cuda.is_available():
            self.device = torch.device("cuda")
            self.device_name = torch.cuda.get_device_name(0)
            self.device_type = "CUDA"
            print(f"Detected NVIDIA GPU: {self.device_name}")
            return self.device, self.device_name, self.device_type
        # Check for Apple Metal Performance Shaders
        if hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
            self.device = torch.device("mps")
            self.device_name = "Apple Silicon GPU"
            self.device_type = "MPS"
            print("Detected Apple Metal GPU")
            return self.device, self.device_name, self.device_type
        # Check for Intel GPU support
        try:
            import intel_extension_for_pytorch as ipex
            if ipex.xpu.is_available():
                self.device = torch.device("xpu")
                self.device_name = "Intel GPU"
                self.device_type = "XPU"
                print("Detected Intel GPU")
                return self.device, self.device_name, self.device_type
        except ImportError:
            pass
        # Fallback to CPU
        self.device = torch.device("cpu")
        self.device_name = "CPU"
        self.device_type = "CPU"
        print("No GPU detected, using CPU")
        return self.device, self.device_name, self.device_type

    def get_optimal_dtype(self):
        """Returns the optimal data type for the detected device."""
        if self.device_type == "CUDA":
            # NVIDIA GPUs benefit from mixed precision
            return torch.float16
        elif self.device_type == "MPS":
            # Apple Silicon works well with float16
            return torch.float16
        elif self.device_type == "XPU":
            # Intel GPUs prefer bfloat16
            return torch.bfloat16
        else:
            # CPU uses full precision
            return torch.float32
This GPUDetector class encapsulates all the logic for determining the optimal compute device. When you create an instance and call detect_device, it returns information about the best available hardware. The get_optimal_dtype method is particularly important because different GPUs perform better with different numerical precision levels. Lower precision like float16 uses less memory and computes faster, but may have slightly reduced accuracy. For most language model applications, this trade-off is worthwhile.
The device information is then used throughout the system when loading models and processing data. Every tensor operation can be directed to the appropriate device, ensuring that computation happens on the GPU rather than the CPU.
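As a quick illustration, here is a minimal usage sketch. It assumes the GPUDetector class above is defined in the same script; the tensor created at the end exists only to show how the detected device and dtype get applied:

import torch

detector = GPUDetector()
device, device_name, device_type = detector.detect_device()
dtype = detector.get_optimal_dtype()

# Create a tensor in default precision, then move it to the detected
# device with the recommended dtype for that hardware.
x = torch.randn(4, 4).to(device=device, dtype=dtype)
print(f"Running on {device_name} ({device_type}) with dtype {x.dtype}")

The same pattern, detect once and pass the device and dtype everywhere, is what the rest of Agent007 follows.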
LOADING AND MANAGING THE LANGUAGE MODEL
The heart of Agent007 is the language model itself. This component is responsible for understanding your input and generating intelligent responses. The process of loading and managing a language model involves several important considerations.
First, you must choose which model to use. The HuggingFace Model Hub hosts thousands of models with varying capabilities and sizes. Smaller models like GPT-2 or smaller variants of LLaMA can run on modest hardware but have limited capabilities. Larger models like LLaMA-2-7B or Mistral-7B offer significantly better performance but require more memory and computational power. For Agent007, we will design the system to work with any HuggingFace-compatible model, allowing users to choose based on their hardware and needs.
When loading a model, we need to consider memory constraints. Language models can be quite large, sometimes requiring tens of gigabytes of RAM or VRAM. The HuggingFace Transformers library provides several techniques to manage this. Model quantization reduces the precision of model weights, significantly decreasing memory usage with minimal impact on quality. The 8-bit and 4-bit quantization techniques can reduce memory requirements by factors of two and four respectively.
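For even tighter memory budgets, the same configuration mechanism supports 4-bit loading. The sketch below is illustrative rather than a tuned recommendation; it assumes an NVIDIA GPU and the bitsandbytes package, which is not in the requirements list above:

import torch
from transformers import AutoModelForCausalLM, BitsAndBytesConfig

bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,                      # store weights in 4-bit NormalFloat
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,   # run the actual matmuls in half precision
    bnb_4bit_use_double_quant=True,         # also quantize the quantization constants
)

model = AutoModelForCausalLM.from_pretrained(
    "mistralai/Mistral-7B-Instruct-v0.2",   # any causal LM from the Hub
    quantization_config=bnb_config,
    device_map="auto",
)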
Another important aspect is the tokenizer. The tokenizer converts text into numerical tokens that the model can process. Each model has a specific tokenizer that must be used with it. The tokenizer also handles special tokens that mark the beginning and end of sequences, separate different parts of a conversation, and perform other structural functions.
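To make the tokenizer's role concrete, here is a small self-contained example. The GPT-2 tokenizer is used only because it downloads quickly; any model's tokenizer behaves analogously:

from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("gpt2")

encoded = tokenizer("Agent007 reporting for duty.")
print(encoded["input_ids"])                                   # integer token IDs
print(tokenizer.convert_ids_to_tokens(encoded["input_ids"]))  # the subword pieces
print(tokenizer.decode(encoded["input_ids"]))                 # round-trips back to text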
Here is an implementation of the LLM manager:
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
import torch
class LLMManager:
"""Manages the loading and inference of the language model."""
def __init__(self, model_name, device, dtype, use_quantization=True):
"""
Initializes the LLM manager.
Args:
model_name: HuggingFace model identifier (e.g., "meta-llama/Llama-2-7b-chat-hf")
device: PyTorch device to load the model on
dtype: Data type for model weights
use_quantization: Whether to use 8-bit quantization to save memory
"""
self.model_name = model_name
self.device = device
self.dtype = dtype
self.use_quantization = use_quantization
self.model = None
self.tokenizer = None
def load_model(self):
"""Loads the model and tokenizer from HuggingFace."""
print(f"Loading model {self.model_name}...")
# Load tokenizer
self.tokenizer = AutoTokenizer.from_pretrained(
self.model_name,
trust_remote_code=True
)
# Ensure the tokenizer has a padding token
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
# Configure quantization if requested
if self.use_quantization and self.device.type == "cuda":
quantization_config = BitsAndBytesConfig(
load_in_8bit=True,
llm_int8_threshold=6.0,
llm_int8_has_fp16_weight=False
)
self.model = AutoModelForCausalLM.from_pretrained(
self.model_name,
quantization_config=quantization_config,
device_map="auto",
trust_remote_code=True
)
else:
# Load without quantization
self.model = AutoModelForCausalLM.from_pretrained(
self.model_name,
torch_dtype=self.dtype,
trust_remote_code=True
)
self.model.to(self.device)
self.model.eval() # Set to evaluation mode
print("Model loaded successfully")
def generate_response(self, prompt, max_new_tokens=512, temperature=0.7,
top_p=0.9, top_k=50, repetition_penalty=1.1):
"""
Generates a response to the given prompt.
Args:
prompt: The input text to respond to
max_new_tokens: Maximum number of tokens to generate
temperature: Controls randomness (higher = more random)
top_p: Nucleus sampling parameter
top_k: Top-k sampling parameter
repetition_penalty: Penalty for repeating tokens
Returns:
Generated text response
"""
# Tokenize the input
inputs = self.tokenizer(prompt, return_tensors="pt", padding=True)
inputs = {k: v.to(self.device) for k, v in inputs.items()}
# Generate response
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_new_tokens=max_new_tokens,
temperature=temperature,
top_p=top_p,
top_k=top_k,
repetition_penalty=repetition_penalty,
do_sample=True,
pad_token_id=self.tokenizer.pad_token_id,
eos_token_id=self.tokenizer.eos_token_id
)
# Decode only the tokens generated beyond the prompt; slicing the decoded string
# by len(prompt) can break when the decoded text differs slightly from the input
input_length = inputs["input_ids"].shape[1]
response = self.tokenizer.decode(outputs[0][input_length:], skip_special_tokens=True)
return response.strip()
This LLMManager class handles all aspects of model management. The load_model method loads both the tokenizer and the model itself. Notice how it checks whether quantization is requested and whether CUDA is available before applying quantization, as quantization is currently best supported on NVIDIA GPUs.
The generate_response method is where the actual text generation happens. It accepts various parameters that control the generation process. Temperature affects randomness, with higher values producing more creative but potentially less coherent outputs. The top_p and top_k parameters implement nucleus and top-k sampling respectively, which are techniques to improve generation quality. Repetition penalty discourages the model from repeating the same phrases.
The method uses the torch.no_grad() context manager to disable gradient computation, which saves memory and speeds up inference since we are not training the model. After generation, it decodes only the newly generated tokens back into text, so the returned string contains the model's reply without the echoed prompt.
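A short usage sketch shows how the sampling parameters change behavior in practice. It assumes the GPUDetector and LLMManager classes above are in scope; the model name is only an example and should fit your hardware:

detector = GPUDetector()
device, _, _ = detector.detect_device()

llm = LLMManager(
    model_name="mistralai/Mistral-7B-Instruct-v0.2",
    device=device,
    dtype=detector.get_optimal_dtype(),
    use_quantization=True,
)
llm.load_model()

# Low temperature: focused, deterministic-leaning answers
print(llm.generate_response("Explain what RAG is in one sentence.", temperature=0.2))

# Higher temperature: more varied, creative phrasing
print(llm.generate_response("Explain what RAG is in one sentence.", temperature=1.2))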
BUILDING THE RAG SYSTEM: CONNECTING DOCUMENTS TO THE LLM
Retrieval-Augmented Generation is what transforms Agent007 from a simple chatbot into a powerful document assistant. The RAG system allows the language model to access and reason about information from your specific documents, rather than relying solely on its training data.
The RAG process consists of several stages. First, documents must be loaded and processed. This involves reading files from disk, extracting their text content, and cleaning it. Different file formats require different processing approaches. PDF files might contain text in various encodings and layouts. Word documents have their own structure. Plain text files are simplest but might still need encoding detection.
Once text is extracted, it must be split into chunks. This chunking is crucial because language models have a limited context window, meaning they can only process a certain amount of text at once. Additionally, smaller chunks allow for more precise retrieval. If an entire document is treated as one chunk, the system cannot distinguish between different topics within that document. The chunking strategy must balance size with semantic coherence. Splitting mid-sentence would lose meaning, so chunks are typically split at sentence or paragraph boundaries.
After chunking, each piece of text is converted into an embedding. An embedding is a high-dimensional vector that represents the semantic meaning of the text. Similar texts have similar embeddings. This mathematical representation allows computers to understand and compare meanings. The embedding model is a specialized neural network trained to produce these representations.
The embeddings are stored in a vector database, which is optimized for similarity searches. When you ask a question, your question is also converted into an embedding, and the vector database finds the chunks whose embeddings are most similar to your question's embedding. These relevant chunks are then provided to the language model as context.
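The claim that similar texts have similar embeddings is easy to verify with a few lines of Sentence-Transformers, using the same embedding model the vector store below defaults to:

from sentence_transformers import SentenceTransformer, util

model = SentenceTransformer("all-MiniLM-L6-v2")

sentences = [
    "The quarterly revenue grew by 12 percent.",
    "Sales increased by twelve percent last quarter.",
    "The cat slept on the windowsill all afternoon.",
]
embeddings = model.encode(sentences, convert_to_tensor=True)

# The first two sentences should score far higher with each other
# than either does with the unrelated third sentence.
print(util.cos_sim(embeddings[0], embeddings[1]))
print(util.cos_sim(embeddings[0], embeddings[2]))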
Here is an implementation of the document processor:
import os
from typing import List
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader, TextLoader
class DocumentProcessor:
"""Handles loading and processing of documents for RAG."""
def __init__(self, chunk_size=1000, chunk_overlap=200):
"""
Initializes the document processor.
Args:
chunk_size: Target size for text chunks in characters
chunk_overlap: Number of overlapping characters between chunks
"""
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
separators=["\n\n", "\n", ". ", " ", ""]
)
def load_document(self, file_path):
"""
Loads a document from the given file path.
Args:
file_path: Path to the document file
Returns:
List of document chunks
"""
file_extension = os.path.splitext(file_path)[1].lower()
try:
if file_extension == '.pdf':
loader = PyPDFLoader(file_path)
elif file_extension in ['.docx', '.doc']:
loader = Docx2txtLoader(file_path)
elif file_extension == '.txt':
loader = TextLoader(file_path)
else:
raise ValueError(f"Unsupported file format: {file_extension}")
documents = loader.load()
print(f"Loaded {len(documents)} pages from {file_path}")
return documents
except Exception as e:
print(f"Error loading document {file_path}: {str(e)}")
return []
def process_documents(self, file_paths):
"""
Processes multiple documents into chunks.
Args:
file_paths: List of paths to document files
Returns:
List of text chunks with metadata
"""
all_documents = []
for file_path in file_paths:
documents = self.load_document(file_path)
all_documents.extend(documents)
# Split documents into chunks
chunks = self.text_splitter.split_documents(all_documents)
print(f"Created {len(chunks)} chunks from {len(file_paths)} documents")
return chunks
The DocumentProcessor class uses LangChain's document loaders to handle different file formats. The RecursiveCharacterTextSplitter is particularly intelligent about how it splits text. It tries to split at paragraph boundaries first, then sentences, then words, and only splits mid-word as a last resort. This preserves semantic coherence.
The chunk_overlap parameter is important because it ensures that information at chunk boundaries is not lost. If a relevant piece of information spans two chunks, the overlap ensures both chunks contain it.
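Used on its own, the processor might be exercised like this; the file paths are placeholders for whatever documents you actually have on disk:

processor = DocumentProcessor(chunk_size=1000, chunk_overlap=200)

chunks = processor.process_documents([
    "reports/annual_report.pdf",   # placeholder paths
    "notes/meeting_notes.txt",
])

for chunk in chunks[:3]:
    # Each chunk is a LangChain Document carrying text plus source metadata
    print(chunk.metadata.get("source"), "->", chunk.page_content[:80], "...")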
Now we need to create embeddings and store them in a vector database:
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import pickle
class VectorStore:
"""Manages embeddings and similarity search using FAISS."""
def __init__(self, embedding_model_name="all-MiniLM-L6-v2", device="cpu"):
"""
Initializes the vector store.
Args:
embedding_model_name: Name of the sentence transformer model
device: Device to run the embedding model on
"""
self.embedding_model = SentenceTransformer(embedding_model_name)
self.embedding_model.to(device)
self.index = None
self.chunks = []
self.dimension = self.embedding_model.get_sentence_embedding_dimension()
def create_embeddings(self, chunks):
"""
Creates embeddings for the given text chunks.
Args:
chunks: List of document chunks
Returns:
Numpy array of embeddings
"""
texts = [chunk.page_content for chunk in chunks]
self.chunks = chunks
print(f"Creating embeddings for {len(texts)} chunks...")
embeddings = self.embedding_model.encode(
texts,
show_progress_bar=True,
convert_to_numpy=True
)
return embeddings
def build_index(self, embeddings):
"""
Builds a FAISS index from the embeddings.
Args:
embeddings: Numpy array of embeddings
"""
# Normalize embeddings for cosine similarity
faiss.normalize_L2(embeddings)
# Create FAISS index
self.index = faiss.IndexFlatIP(self.dimension) # Inner product for cosine similarity
self.index.add(embeddings)
print(f"Built FAISS index with {self.index.ntotal} vectors")
def search(self, query, k=5):
"""
Searches for the most similar chunks to the query.
Args:
query: Query text
k: Number of results to return
Returns:
List of tuples (chunk, similarity_score)
"""
# Create query embedding
query_embedding = self.embedding_model.encode([query], convert_to_numpy=True)
faiss.normalize_L2(query_embedding)
# Search the index
scores, indices = self.index.search(query_embedding, k)
# Retrieve the corresponding chunks
results = []
for idx, score in zip(indices[0], scores[0]):
if idx < len(self.chunks):
results.append((self.chunks[idx], float(score)))
return results
def save(self, path):
"""Saves the vector store to disk."""
faiss.write_index(self.index, f"{path}/faiss.index")
with open(f"{path}/chunks.pkl", "wb") as f:
pickle.dump(self.chunks, f)
print(f"Vector store saved to {path}")
def load(self, path):
"""Loads the vector store from disk."""
self.index = faiss.read_index(f"{path}/faiss.index")
with open(f"{path}/chunks.pkl", "rb") as f:
self.chunks = pickle.load(f)
print(f"Vector store loaded from {path}")
The VectorStore class uses the Sentence-Transformers library to create embeddings. The all-MiniLM-L6-v2 model is a good default choice because it is fast and produces high-quality embeddings for general purposes. The embeddings are normalized and stored in a FAISS index using inner product similarity, which is equivalent to cosine similarity for normalized vectors.
The search method takes a query, converts it to an embedding, and finds the k most similar chunks. The results include both the chunks and their similarity scores, which can be useful for filtering or ranking.
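Putting the processor and the vector store together, indexing and querying a small document set might look like the following sketch; the path and the query are placeholders:

import os

processor = DocumentProcessor()
store = VectorStore(embedding_model_name="all-MiniLM-L6-v2", device="cpu")

chunks = processor.process_documents(["docs/handbook.pdf"])   # placeholder path
embeddings = store.create_embeddings(chunks)
store.build_index(embeddings)

for chunk, score in store.search("What is the vacation policy?", k=3):
    print(f"score={score:.3f}  {chunk.page_content[:80]}...")

# Persist the index; the target directory must exist before saving
os.makedirs("vector_store", exist_ok=True)
store.save("vector_store")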
Now we can tie everything together in a RAG system:
class RAGSystem:
"""Combines document processing, vector search, and LLM for RAG."""
def __init__(self, llm_manager, vector_store, document_processor):
"""
Initializes the RAG system.
Args:
llm_manager: LLMManager instance
vector_store: VectorStore instance
document_processor: DocumentProcessor instance
"""
self.llm_manager = llm_manager
self.vector_store = vector_store
self.document_processor = document_processor
self.is_rag_enabled = False
def index_documents(self, file_paths):
"""
Indexes documents for RAG.
Args:
file_paths: List of document file paths
"""
# Process documents into chunks
chunks = self.document_processor.process_documents(file_paths)
if not chunks:
print("No chunks created from documents")
return
# Create embeddings and build index
embeddings = self.vector_store.create_embeddings(chunks)
self.vector_store.build_index(embeddings)
self.is_rag_enabled = True
print("RAG system ready")
def generate_response(self, query, system_message="", num_context_chunks=3, **generation_params):
"""
Generates a response using RAG if enabled, otherwise uses LLM directly.
Args:
query: User query
system_message: System message to set AI behavior
num_context_chunks: Number of document chunks to use as context
**generation_params: Additional parameters for text generation
Returns:
Generated response
"""
if self.is_rag_enabled:
# Retrieve relevant chunks
results = self.vector_store.search(query, k=num_context_chunks)
# Build context from retrieved chunks
context = "\n\n".join([chunk.page_content for chunk, score in results])
# Construct prompt with context
prompt = f"""{system_message}
Context information from documents:
{context}
User question: {query}
Answer based on the context provided above:"""
else:
# No RAG, just use the query directly
prompt = f"{system_message}\n\nUser: {query}\n\nAssistant:"
# Generate response
response = self.llm_manager.generate_response(prompt, **generation_params)
return response
The RAGSystem class orchestrates the entire RAG workflow. When documents are indexed, it processes them, creates embeddings, and builds the search index. When generating a response, it checks whether RAG is enabled. If so, it retrieves relevant chunks and includes them in the prompt. If not, it simply passes the query to the language model.
This design allows Agent007 to seamlessly switch between pure chatbot mode and RAG mode based on whether documents have been indexed.
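Put together, a minimal end-to-end sketch looks like this. It assumes the classes defined so far are in scope; the model name and file path are illustrative:

detector = GPUDetector()
device, _, _ = detector.detect_device()

llm = LLMManager("mistralai/Mistral-7B-Instruct-v0.2", device, detector.get_optimal_dtype())
llm.load_model()

rag = RAGSystem(
    llm_manager=llm,
    vector_store=VectorStore(device="cpu"),
    document_processor=DocumentProcessor(),
)

# Plain chatbot mode: nothing has been indexed yet
print(rag.generate_response("What is retrieval-augmented generation?"))

# RAG mode: answers are now grounded in the indexed documents
rag.index_documents(["docs/product_manual.pdf"])   # placeholder path
print(rag.generate_response("How do I reset the device to factory settings?"))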
MANAGING CONVERSATIONS AND CONTEXT
A key feature of any chatbot is the ability to maintain context across multiple turns of conversation. When you ask a follow-up question, the system should remember what you were talking about. This requires careful management of conversation history.
The conversation manager keeps track of all messages in the current conversation, formats them appropriately for the language model, and can save and load conversations for later reference.
import json
from datetime import datetime
from pathlib import Path
class ConversationManager:
"""Manages conversation history and persistence."""
def __init__(self, storage_dir="conversations"):
"""
Initializes the conversation manager.
Args:
storage_dir: Directory to store conversation files
"""
self.storage_dir = Path(storage_dir)
self.storage_dir.mkdir(exist_ok=True)
self.current_conversation = []
self.conversation_id = None
self.system_message = ""
def start_new_conversation(self, system_message=""):
"""
Starts a new conversation.
Args:
system_message: System message defining AI behavior
"""
self.current_conversation = []
self.system_message = system_message
self.conversation_id = datetime.now().strftime("%Y%m%d_%H%M%S")
print(f"Started new conversation: {self.conversation_id}")
def add_message(self, role, content):
"""
Adds a message to the current conversation.
Args:
role: Either 'user' or 'assistant'
content: Message content
"""
message = {
"role": role,
"content": content,
"timestamp": datetime.now().isoformat()
}
self.current_conversation.append(message)
def get_conversation_history(self, max_messages=None):
"""
Returns the conversation history.
Args:
max_messages: Maximum number of recent messages to return
Returns:
List of messages
"""
if max_messages is None:
return self.current_conversation
else:
return self.current_conversation[-max_messages:]
def format_conversation_for_llm(self, max_messages=10):
"""
Formats the conversation history for the LLM.
Args:
max_messages: Maximum number of recent messages to include
Returns:
Formatted prompt string
"""
messages = self.get_conversation_history(max_messages)
prompt_parts = []
if self.system_message:
prompt_parts.append(f"System: {self.system_message}\n")
for msg in messages:
role = msg["role"].capitalize()
content = msg["content"]
prompt_parts.append(f"{role}: {content}\n")
prompt_parts.append("Assistant:")
return "\n".join(prompt_parts)
def save_conversation(self):
"""Saves the current conversation to disk."""
if not self.conversation_id:
print("No active conversation to save")
return
conversation_data = {
"id": self.conversation_id,
"system_message": self.system_message,
"messages": self.current_conversation,
"created_at": self.current_conversation[0]["timestamp"] if self.current_conversation else datetime.now().isoformat()
}
file_path = self.storage_dir / f"{self.conversation_id}.json"
with open(file_path, "w") as f:
json.dump(conversation_data, f, indent=2)
print(f"Conversation saved to {file_path}")
def load_conversation(self, conversation_id):
"""
Loads a conversation from disk.
Args:
conversation_id: ID of the conversation to load
"""
file_path = self.storage_dir / f"{conversation_id}.json"
if not file_path.exists():
print(f"Conversation {conversation_id} not found")
return False
with open(file_path, "r") as f:
conversation_data = json.load(f)
self.conversation_id = conversation_data["id"]
self.system_message = conversation_data["system_message"]
self.current_conversation = conversation_data["messages"]
print(f"Loaded conversation {conversation_id}")
return True
def list_conversations(self):
"""
Lists all saved conversations.
Returns:
List of conversation metadata
"""
conversations = []
for file_path in self.storage_dir.glob("*.json"):
with open(file_path, "r") as f:
data = json.load(f)
conversations.append({
"id": data["id"],
"created_at": data["created_at"],
"message_count": len(data["messages"])
})
return sorted(conversations, key=lambda x: x["created_at"], reverse=True)
def search_conversations(self, search_term):
"""
Searches conversations for a term.
Args:
search_term: Term to search for
Returns:
List of matching conversations with context
"""
results = []
for file_path in self.storage_dir.glob("*.json"):
with open(file_path, "r") as f:
data = json.load(f)
for msg in data["messages"]:
if search_term.lower() in msg["content"].lower():
results.append({
"conversation_id": data["id"],
"message": msg,
"created_at": data["created_at"]
})
return results
The ConversationManager class provides comprehensive conversation handling. It stores messages with timestamps, formats them for the language model, and persists them to JSON files. The search functionality allows users to find past conversations containing specific terms, which is valuable when you want to recall previous discussions.
The format_conversation_for_llm method is particularly important. It takes the conversation history and formats it in a way the language model can understand, with clear role labels and the system message at the beginning. The max_messages parameter prevents the context from growing too large, which would exceed the model's context window.
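A brief sketch of the manager used on its own:

manager = ConversationManager(storage_dir="conversations")

manager.start_new_conversation("You are a concise technical assistant.")
manager.add_message("user", "What does RAG stand for?")
manager.add_message("assistant", "Retrieval-Augmented Generation.")
manager.add_message("user", "And why is chunk overlap useful?")

# The formatted prompt contains the system message, the recent turns,
# and a trailing "Assistant:" cue for the model to continue from.
print(manager.format_conversation_for_llm(max_messages=10))

manager.save_conversation()
print(manager.list_conversations())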
IMPLEMENTING THE TEMPLATE SYSTEM
Templates allow users to save and reuse common patterns for system messages and prompts. This is especially useful when you want to switch between different AI personas or use cases.
class TemplateManager:
"""Manages system message and prompt templates."""
def __init__(self, template_dir="templates"):
"""
Initializes the template manager.
Args:
template_dir: Directory to store template files
"""
self.template_dir = Path(template_dir)
self.template_dir.mkdir(exist_ok=True)
# Create subdirectories for different template types
self.system_template_dir = self.template_dir / "system"
self.prompt_template_dir = self.template_dir / "prompts"
self.system_template_dir.mkdir(exist_ok=True)
self.prompt_template_dir.mkdir(exist_ok=True)
def save_system_template(self, name, content, description=""):
"""
Saves a system message template.
Args:
name: Template name
content: System message content
description: Optional description of the template
"""
template_data = {
"name": name,
"content": content,
"description": description,
"created_at": datetime.now().isoformat()
}
file_path = self.system_template_dir / f"{name}.json"
with open(file_path, "w") as f:
json.dump(template_data, f, indent=2)
print(f"System template '{name}' saved")
def load_system_template(self, name):
"""
Loads a system message template.
Args:
name: Template name
Returns:
Template content or None if not found
"""
file_path = self.system_template_dir / f"{name}.json"
if not file_path.exists():
print(f"System template '{name}' not found")
return None
with open(file_path, "r") as f:
template_data = json.load(f)
return template_data["content"]
def list_system_templates(self):
"""
Lists all system message templates.
Returns:
List of template metadata
"""
templates = []
for file_path in self.system_template_dir.glob("*.json"):
with open(file_path, "r") as f:
data = json.load(f)
templates.append({
"name": data["name"],
"description": data["description"],
"created_at": data["created_at"]
})
return templates
def save_prompt_template(self, name, content, variables=None, description=""):
"""
Saves a prompt template with optional variables.
Args:
name: Template name
content: Prompt content with {variable} placeholders
variables: List of variable names
description: Optional description
"""
template_data = {
"name": name,
"content": content,
"variables": variables or [],
"description": description,
"created_at": datetime.now().isoformat()
}
file_path = self.prompt_template_dir / f"{name}.json"
with open(file_path, "w") as f:
json.dump(template_data, f, indent=2)
print(f"Prompt template '{name}' saved")
def load_prompt_template(self, name, **kwargs):
"""
Loads and formats a prompt template.
Args:
name: Template name
**kwargs: Variable values to substitute
Returns:
Formatted prompt or None if not found
"""
file_path = self.prompt_template_dir / f"{name}.json"
if not file_path.exists():
print(f"Prompt template '{name}' not found")
return None
with open(file_path, "r") as f:
template_data = json.load(f)
content = template_data["content"]
# Substitute variables
try:
formatted_content = content.format(**kwargs)
return formatted_content
except KeyError as e:
print(f"Missing variable for template: {e}")
return None
The TemplateManager separates system templates from prompt templates. System templates define the AI's behavior and personality. Prompt templates are reusable question or instruction patterns that can include variables. For example, you might have a template like "Summarize the following text in {num_sentences} sentences: {text}" where num_sentences and text are variables that get filled in when the template is used.
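For instance, saving and reusing the summarization template just described might look like this:

templates = TemplateManager(template_dir="templates")

templates.save_prompt_template(
    name="summarize",
    content="Summarize the following text in {num_sentences} sentences: {text}",
    variables=["num_sentences", "text"],
    description="Generic summarization prompt",
)

prompt = templates.load_prompt_template(
    "summarize",
    num_sentences=3,
    text="Agent007 is a local chatbot that can also answer questions about your documents.",
)
print(prompt)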
BUILDING THE CONSOLE APPLICATION
The console application provides a straightforward text-based interface to Agent007. It is ideal for users who prefer working in a terminal or want a lightweight, distraction-free experience.
class ConsoleInterface:
"""Console-based interface for Agent007."""
def __init__(self, rag_system, conversation_manager, template_manager):
"""
Initializes the console interface.
Args:
rag_system: RAGSystem instance
conversation_manager: ConversationManager instance
template_manager: TemplateManager instance
"""
self.rag_system = rag_system
self.conversation_manager = conversation_manager
self.template_manager = template_manager
self.running = True
def print_welcome(self):
"""Prints welcome message."""
print("=" * 70)
print(" AGENT007")
print(" Your Local AI Assistant with RAG Support")
print("=" * 70)
print()
print("Commands:")
print(" /new - Start a new conversation")
print(" /load - Load a saved conversation")
print(" /save - Save current conversation")
print(" /index - Index documents for RAG")
print(" /system - Set system message")
print(" /template - Load a system template")
print(" /params - Adjust generation parameters")
print(" /history - Show conversation history")
print(" /search - Search past conversations")
print(" /quit - Exit Agent007")
print()
def handle_command(self, command):
"""
Handles special commands.
Args:
command: Command string starting with /
Returns:
True if command was handled, False otherwise
"""
if command == "/quit":
self.running = False
print("Goodbye!")
return True
elif command == "/new":
system_msg = input("Enter system message (or press Enter for default): ")
self.conversation_manager.start_new_conversation(system_msg)
return True
elif command == "/save":
self.conversation_manager.save_conversation()
return True
elif command == "/load":
conversations = self.conversation_manager.list_conversations()
if not conversations:
print("No saved conversations found")
return True
print("\nSaved conversations:")
for i, conv in enumerate(conversations):
print(f"{i+1}. {conv['id']} - {conv['message_count']} messages")
choice = input("Enter number to load: ")
try:
idx = int(choice) - 1
if 0 <= idx < len(conversations):
self.conversation_manager.load_conversation(conversations[idx]['id'])
except ValueError:
print("Invalid choice")
return True
elif command == "/index":
doc_path = input("Enter document directory or file path: ")
if os.path.isdir(doc_path):
files = []
for ext in ['.pdf', '.txt', '.docx']:
files.extend(Path(doc_path).glob(f"**/*{ext}"))
file_paths = [str(f) for f in files]
else:
file_paths = [doc_path]
if file_paths:
self.rag_system.index_documents(file_paths)
else:
print("No documents found")
return True
elif command == "/system":
system_msg = input("Enter new system message: ")
self.conversation_manager.system_message = system_msg
print("System message updated")
return True
elif command == "/template":
templates = self.template_manager.list_system_templates()
if not templates:
print("No templates found")
return True
print("\nAvailable templates:")
for i, tmpl in enumerate(templates):
print(f"{i+1}. {tmpl['name']} - {tmpl['description']}")
choice = input("Enter number to load: ")
try:
idx = int(choice) - 1
if 0 <= idx < len(templates):
content = self.template_manager.load_system_template(templates[idx]['name'])
if content:
self.conversation_manager.system_message = content
print(f"Loaded template: {templates[idx]['name']}")
except ValueError:
print("Invalid choice")
return True
elif command == "/history":
history = self.conversation_manager.get_conversation_history()
print("\nConversation History:")
print("-" * 70)
for msg in history:
print(f"{msg['role'].upper()}: {msg['content']}")
print("-" * 70)
return True
elif command == "/search":
term = input("Enter search term: ")
results = self.conversation_manager.search_conversations(term)
if not results:
print("No results found")
else:
print(f"\nFound {len(results)} results:")
for result in results[:10]: # Show first 10
print(f"\nConversation: {result['conversation_id']}")
print(f"{result['message']['role'].upper()}: {result['message']['content'][:100]}...")
return True
return False
def run(self):
"""Runs the console interface main loop."""
self.print_welcome()
# Start with a new conversation
self.conversation_manager.start_new_conversation()
while self.running:
try:
user_input = input("\nYou: ").strip()
if not user_input:
continue
# Check if it's a command
if user_input.startswith("/"):
self.handle_command(user_input)
continue
# Add user message to history
self.conversation_manager.add_message("user", user_input)
# Get conversation context
prompt = self.conversation_manager.format_conversation_for_llm()
# Generate response
print("\nAgent007: ", end="", flush=True)
response = self.rag_system.llm_manager.generate_response(prompt)
print(response)
# Add assistant response to history
self.conversation_manager.add_message("assistant", response)
except KeyboardInterrupt:
print("\n\nInterrupted. Use /quit to exit.")
except Exception as e:
print(f"\nError: {str(e)}")
The ConsoleInterface class provides a complete command-line experience. It handles various slash commands for managing conversations, indexing documents, and adjusting settings. The main loop reads user input, processes commands, and generates responses using the RAG system.
The interface maintains conversation context by using the ConversationManager to format the history appropriately. This ensures that the language model has access to previous messages when generating responses.
CREATING THE WEB INTERFACE
The web interface provides a richer, more visual experience. It consists of a FastAPI backend that handles API requests and serves the frontend, and an HTML/CSS/JavaScript frontend that provides the user interface.
First, let us create the FastAPI backend:
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, StreamingResponse
from pydantic import BaseModel
from typing import List, Optional
import asyncio
class ChatRequest(BaseModel):
"""Request model for chat endpoint."""
message: str
conversation_id: Optional[str] = None
system_message: Optional[str] = ""
max_tokens: int = 512
temperature: float = 0.7
top_p: float = 0.9
class ChatResponse(BaseModel):
"""Response model for chat endpoint."""
response: str
conversation_id: str
class WebAPI:
"""FastAPI-based web interface for Agent007."""
def __init__(self, rag_system, conversation_manager, template_manager):
"""
Initializes the web API.
Args:
rag_system: RAGSystem instance
conversation_manager: ConversationManager instance
template_manager: TemplateManager instance
"""
self.app = FastAPI(title="Agent007 API")
self.rag_system = rag_system
self.conversation_manager = conversation_manager
self.template_manager = template_manager
self.setup_routes()
def setup_routes(self):
"""Sets up API routes."""
@self.app.post("/api/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
"""Handles chat requests."""
try:
# Load or create conversation
if request.conversation_id:
self.conversation_manager.load_conversation(request.conversation_id)
else:
self.conversation_manager.start_new_conversation(request.system_message)
# Add user message
self.conversation_manager.add_message("user", request.message)
# Generate response
prompt = self.conversation_manager.format_conversation_for_llm()
response = self.rag_system.llm_manager.generate_response(
prompt,
max_new_tokens=request.max_tokens,
temperature=request.temperature,
top_p=request.top_p
)
# Add assistant response
self.conversation_manager.add_message("assistant", response)
# Save conversation
self.conversation_manager.save_conversation()
return ChatResponse(
response=response,
conversation_id=self.conversation_manager.conversation_id
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.get("/api/conversations")
async def list_conversations():
"""Lists all saved conversations."""
try:
conversations = self.conversation_manager.list_conversations()
return {"conversations": conversations}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.get("/api/conversation/{conversation_id}")
async def get_conversation(conversation_id: str):
"""Retrieves a specific conversation."""
try:
if self.conversation_manager.load_conversation(conversation_id):
return {
"conversation_id": conversation_id,
"messages": self.conversation_manager.current_conversation,
"system_message": self.conversation_manager.system_message
}
else:
raise HTTPException(status_code=404, detail="Conversation not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.post("/api/index")
async def index_documents(files: List[UploadFile] = File(...)):
"""Indexes uploaded documents for RAG."""
try:
# Save uploaded files temporarily
temp_dir = Path("temp_uploads")
temp_dir.mkdir(exist_ok=True)
file_paths = []
for file in files:
file_path = temp_dir / file.filename
with open(file_path, "wb") as f:
content = await file.read()
f.write(content)
file_paths.append(str(file_path))
# Index documents
self.rag_system.index_documents(file_paths)
# Clean up temp files
for file_path in file_paths:
Path(file_path).unlink()
return {"status": "success", "indexed_files": len(file_paths)}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.get("/api/templates/system")
async def list_system_templates():
"""Lists system message templates."""
try:
templates = self.template_manager.list_system_templates()
return {"templates": templates}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.get("/api/template/system/{name}")
async def get_system_template(name: str):
"""Retrieves a system template."""
try:
content = self.template_manager.load_system_template(name)
if content:
return {"name": name, "content": content}
else:
raise HTTPException(status_code=404, detail="Template not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.post("/api/search")
async def search_conversations(query: dict):
"""Searches conversations."""
try:
search_term = query.get("term", "")
results = self.conversation_manager.search_conversations(search_term)
return {"results": results}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# Serve static files for frontend
self.app.mount("/", StaticFiles(directory="static", html=True), name="static")
def run(self, host="0.0.0.0", port=8000):
"""Runs the web server."""
import uvicorn
uvicorn.run(self.app, host=host, port=port)
The WebAPI class creates a FastAPI application with endpoints for chatting, managing conversations, indexing documents, and working with templates. Each endpoint is asynchronous, allowing the server to handle multiple requests concurrently.
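Once the server is running, any HTTP client can drive these endpoints. Here is a small sketch using the requests library, which is not in the requirements list above, so install it separately if you want to try this:

import requests

BASE_URL = "http://localhost:8000"

# Send a message; conversation_id is None for a brand-new conversation
payload = {
    "message": "Summarize the indexed documents in two sentences.",
    "conversation_id": None,
    "system_message": "You are a helpful AI assistant.",
    "max_tokens": 256,
    "temperature": 0.7,
    "top_p": 0.9,
}
reply = requests.post(f"{BASE_URL}/api/chat", json=payload).json()
print(reply["response"])

# Continue the same conversation using the returned ID
payload["message"] = "Now list the three most important points."
payload["conversation_id"] = reply["conversation_id"]
print(requests.post(f"{BASE_URL}/api/chat", json=payload).json()["response"])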
Now we need to create the frontend. This will be an HTML file with embedded CSS and JavaScript. The frontend will feature smooth animations, dynamic lighting effects, and comprehensive controls.
Here is a simplified version of the frontend structure:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Agent007 - AI Assistant</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
background: linear-gradient(135deg, #1e3c72 0%, #2a5298 100%);
color: #ffffff;
min-height: 100vh;
transition: background 0.5s ease;
}
body.light-mode {
background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
color: #333333;
}
.container {
max-width: 1200px;
margin: 0 auto;
padding: 20px;
}
.header {
text-align: center;
padding: 30px 0;
animation: fadeInDown 0.8s ease;
}
.header h1 {
font-size: 3em;
text-shadow: 2px 2px 4px rgba(0,0,0,0.3);
animation: glow 2s ease-in-out infinite alternate;
}
@keyframes glow {
from {
text-shadow: 0 0 10px #fff, 0 0 20px #fff, 0 0 30px #00d4ff;
}
to {
text-shadow: 0 0 20px #fff, 0 0 30px #00d4ff, 0 0 40px #00d4ff;
}
}
@keyframes fadeInDown {
from {
opacity: 0;
transform: translateY(-20px);
}
to {
opacity: 1;
transform: translateY(0);
}
}
.chat-container {
background: rgba(255, 255, 255, 0.1);
backdrop-filter: blur(10px);
border-radius: 15px;
padding: 20px;
margin: 20px 0;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.3);
animation: fadeIn 1s ease;
}
@keyframes fadeIn {
from { opacity: 0; }
to { opacity: 1; }
}
.messages {
height: 500px;
overflow-y: auto;
padding: 15px;
margin-bottom: 20px;
}
.message {
margin: 15px 0;
padding: 12px 18px;
border-radius: 12px;
animation: slideIn 0.3s ease;
max-width: 80%;
}
@keyframes slideIn {
from {
opacity: 0;
transform: translateX(-20px);
}
to {
opacity: 1;
transform: translateX(0);
}
}
.message.user {
background: rgba(0, 123, 255, 0.7);
margin-left: auto;
text-align: right;
}
.message.assistant {
background: rgba(40, 167, 69, 0.7);
}
.input-area {
display: flex;
gap: 10px;
}
.input-area input {
flex: 1;
padding: 15px;
border: none;
border-radius: 25px;
background: rgba(255, 255, 255, 0.2);
color: #ffffff;
font-size: 16px;
transition: all 0.3s ease;
}
.input-area input:focus {
outline: none;
background: rgba(255, 255, 255, 0.3);
box-shadow: 0 0 15px rgba(0, 212, 255, 0.5);
}
.input-area button {
padding: 15px 30px;
border: none;
border-radius: 25px;
background: linear-gradient(135deg, #00d4ff 0%, #0099cc 100%);
color: #ffffff;
font-size: 16px;
cursor: pointer;
transition: all 0.3s ease;
}
.input-area button:hover {
transform: scale(1.05);
box-shadow: 0 5px 15px rgba(0, 212, 255, 0.4);
}
.controls {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
gap: 15px;
margin: 20px 0;
}
.control-group {
background: rgba(255, 255, 255, 0.1);
padding: 15px;
border-radius: 10px;
}
.control-group label {
display: block;
margin-bottom: 8px;
font-weight: bold;
}
.control-group input[type="range"] {
width: 100%;
}
.sidebar {
position: fixed;
right: -300px;
top: 0;
width: 300px;
height: 100vh;
background: rgba(0, 0, 0, 0.9);
padding: 20px;
transition: right 0.3s ease;
overflow-y: auto;
}
.sidebar.open {
right: 0;
}
.sidebar-toggle {
position: fixed;
right: 20px;
top: 20px;
padding: 10px 20px;
background: rgba(0, 212, 255, 0.8);
border: none;
border-radius: 5px;
color: white;
cursor: pointer;
z-index: 1000;
}
</style>
</head>
<body>
<button class="sidebar-toggle" onclick="toggleSidebar()">History</button>
<div class="sidebar" id="sidebar">
<h2>Conversation History</h2>
<div id="conversation-list"></div>
</div>
<div class="container">
<div class="header">
<h1>AGENT007</h1>
<p>Your Local AI Assistant with RAG Support</p>
</div>
<div class="controls">
<div class="control-group">
<label>Temperature: <span id="temp-value">0.7</span></label>
<input type="range" id="temperature" min="0" max="2" step="0.1" value="0.7"
oninput="updateValue('temp-value', this.value)">
</div>
<div class="control-group">
<label>Max Tokens: <span id="tokens-value">512</span></label>
<input type="range" id="max-tokens" min="50" max="2048" step="50" value="512"
oninput="updateValue('tokens-value', this.value)">
</div>
<div class="control-group">
<label>Top P: <span id="topp-value">0.9</span></label>
<input type="range" id="top-p" min="0" max="1" step="0.05" value="0.9"
oninput="updateValue('topp-value', this.value)">
</div>
</div>
<div class="chat-container">
<div class="messages" id="messages"></div>
<div class="input-area">
<input type="text" id="user-input" placeholder="Type your message..."
onkeypress="handleKeyPress(event)">
<button onclick="sendMessage()">Send</button>
</div>
</div>
</div>
<script>
let currentConversationId = null;
function updateValue(elementId, value) {
document.getElementById(elementId).textContent = value;
}
function toggleSidebar() {
document.getElementById('sidebar').classList.toggle('open');
loadConversationList();
}
async function loadConversationList() {
try {
const response = await fetch('/api/conversations');
const data = await response.json();
const listElement = document.getElementById('conversation-list');
listElement.innerHTML = '';
data.conversations.forEach(conv => {
const item = document.createElement('div');
item.className = 'conversation-item';
item.textContent = `${conv.id} (${conv.message_count} messages)`;
item.onclick = () => loadConversation(conv.id);
listElement.appendChild(item);
});
} catch (error) {
console.error('Error loading conversations:', error);
}
}
async function loadConversation(conversationId) {
try {
const response = await fetch(`/api/conversation/${conversationId}`);
const data = await response.json();
currentConversationId = conversationId;
const messagesDiv = document.getElementById('messages');
messagesDiv.innerHTML = '';
data.messages.forEach(msg => {
addMessageToUI(msg.role, msg.content);
});
toggleSidebar();
} catch (error) {
console.error('Error loading conversation:', error);
}
}
function addMessageToUI(role, content) {
const messagesDiv = document.getElementById('messages');
const messageDiv = document.createElement('div');
messageDiv.className = `message ${role}`;
messageDiv.textContent = content;
messagesDiv.appendChild(messageDiv);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
}
async function sendMessage() {
const input = document.getElementById('user-input');
const message = input.value.trim();
if (!message) return;
addMessageToUI('user', message);
input.value = '';
const temperature = parseFloat(document.getElementById('temperature').value);
const maxTokens = parseInt(document.getElementById('max-tokens').value);
const topP = parseFloat(document.getElementById('top-p').value);
try {
const response = await fetch('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
message: message,
conversation_id: currentConversationId,
temperature: temperature,
max_tokens: maxTokens,
top_p: topP
})
});
const data = await response.json();
currentConversationId = data.conversation_id;
addMessageToUI('assistant', data.response);
} catch (error) {
console.error('Error sending message:', error);
addMessageToUI('assistant', 'Error: Could not get response');
}
}
function handleKeyPress(event) {
if (event.key === 'Enter') {
sendMessage();
}
}
</script>
</body>
</html>
This frontend provides a beautiful, animated interface with gradient backgrounds, glowing effects, and smooth transitions. The controls allow users to adjust generation parameters in real-time. The sidebar shows conversation history and allows loading previous conversations.
PUTTING IT ALL TOGETHER: CONFIGURATION AND INITIALIZATION
Now we need to create a main application that initializes all components and allows the user to choose between console and web interfaces.
import argparse
import sys
class Agent007:
"""Main application class for Agent007."""
def __init__(self, model_name="mistralai/Mistral-7B-Instruct-v0.2",
use_quantization=True):
"""
Initializes Agent007.
Args:
model_name: HuggingFace model to use
use_quantization: Whether to use quantization
"""
print("Initializing Agent007...")
# Detect GPU
self.gpu_detector = GPUDetector()
device, device_name, device_type = self.gpu_detector.detect_device()
dtype = self.gpu_detector.get_optimal_dtype()
# Initialize LLM manager
self.llm_manager = LLMManager(
model_name=model_name,
device=device,
dtype=dtype,
use_quantization=use_quantization
)
self.llm_manager.load_model()
# Initialize vector store
self.vector_store = VectorStore(device=device)
# Initialize document processor
self.document_processor = DocumentProcessor()
# Initialize RAG system
self.rag_system = RAGSystem(
llm_manager=self.llm_manager,
vector_store=self.vector_store,
document_processor=self.document_processor
)
# Initialize conversation manager
self.conversation_manager = ConversationManager()
# Initialize template manager
self.template_manager = TemplateManager()
# Create default templates
self.create_default_templates()
print("Agent007 initialized successfully!")
def create_default_templates(self):
"""Creates some default system message templates."""
default_templates = {
"helpful_assistant": {
"content": "You are a helpful AI assistant. Provide clear, accurate, and concise responses.",
"description": "General helpful assistant"
},
"technical_expert": {
"content": "You are a technical expert. Provide detailed technical explanations with examples.",
"description": "Technical expert persona"
},
"creative_writer": {
"content": "You are a creative writer. Provide imaginative and engaging responses.",
"description": "Creative writing assistant"
}
}
for name, data in default_templates.items():
try:
self.template_manager.save_system_template(
name=name,
content=data["content"],
description=data["description"]
)
            except Exception:
pass # Template might already exist
def run_console(self):
"""Runs the console interface."""
console = ConsoleInterface(
rag_system=self.rag_system,
conversation_manager=self.conversation_manager,
template_manager=self.template_manager
)
console.run()
def run_web(self, host="0.0.0.0", port=8000):
"""Runs the web interface."""
web_api = WebAPI(
rag_system=self.rag_system,
conversation_manager=self.conversation_manager,
template_manager=self.template_manager
)
print(f"Starting web server at http://{host}:{port}")
web_api.run(host=host, port=port)
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(description="Agent007 - Local AI Assistant with RAG")
parser.add_argument("--mode", choices=["console", "web"], default="console",
help="Interface mode")
parser.add_argument("--model", type=str, default="mistralai/Mistral-7B-Instruct-v0.2",
help="HuggingFace model name")
parser.add_argument("--no-quantization", action="store_true",
help="Disable quantization")
parser.add_argument("--host", type=str, default="0.0.0.0",
help="Web server host")
parser.add_argument("--port", type=int, default=8000,
help="Web server port")
args = parser.parse_args()
try:
agent = Agent007(
model_name=args.model,
use_quantization=not args.no_quantization
)
if args.mode == "console":
agent.run_console()
else:
agent.run_web(host=args.host, port=args.port)
except KeyboardInterrupt:
print("\nShutting down Agent007...")
sys.exit(0)
except Exception as e:
print(f"Error: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()
This main application provides a command-line interface for starting Agent007 in either console or web mode. It handles initialization of all components and provides sensible defaults.
RUNNING AGENT007: PRACTICAL USAGE EXAMPLES
To run Agent007 in console mode, you would execute:
python agent007.py --mode console --model mistralai/Mistral-7B-Instruct-v0.2
This starts the console interface with the Mistral 7B model. Once it is running, you can chat with it naturally. To enable RAG, use the /index command and point it at your documents, as in the example below.
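A session that grounds answers in a folder of reports might look roughly like this; the directory path and the question are placeholders, and the indexing output is abbreviated:
You: /index
Enter document directory or file path: ./my_reports
...
RAG system ready
You: What were the key findings in the quarterly summary?
Anything typed without a leading slash is sent to the model as a normal message.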
For the web interface:
python agent007.py --mode web --port 8000
Then open your browser to http://localhost:8000 to access the graphical interface.
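The graphical interface is only one client of a small JSON API, so you can also drive Agent007 from your own scripts. Below is a minimal sketch using the requests library; it assumes the server is already running on localhost:8000, that requests is installed, and that the message text is just an example.
import requests

# Same fields the web UI sends to the /api/chat endpoint
payload = {
    "message": "Summarize the indexed documents in two sentences.",
    "conversation_id": None,   # None starts a new conversation
    "temperature": 0.7,
    "max_tokens": 512,
    "top_p": 0.9
}
reply = requests.post("http://localhost:8000/api/chat", json=payload).json()
print(reply["response"])           # the assistant's answer
print(reply["conversation_id"])    # pass this back to continue the conversation
Sending the returned conversation_id in the next request continues the same conversation, which is saved under the conversations directory after every exchange.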
The system automatically detects your GPU and optimizes accordingly: on an Apple Silicon Mac it uses Metal Performance Shaders, and on a system with an NVIDIA GPU it uses CUDA. On CUDA devices, 8-bit quantization substantially reduces memory usage, allowing larger models to run on consumer hardware.
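If you are not sure what Agent007 will find on your machine, you can check ahead of time with the same PyTorch calls the GPUDetector class uses:
import torch

print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("GPU:", torch.cuda.get_device_name(0))
print("MPS available:", hasattr(torch.backends, "mps") and torch.backends.mps.is_available())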
CONCLUSION: THE POWER OF LOCAL AI
Agent007 demonstrates that sophisticated AI capabilities do not require cloud services or expensive subscriptions. By running entirely on your local machine, it provides privacy, independence, and customization that cloud-based solutions cannot match.
The modular architecture makes it easy to extend and customize. You can swap out different language models, use different embedding models, or add new features. The RAG system allows you to create domain-specific assistants that understand your particular documents and context.
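Swapping the language model, for example, is just a matter of passing a different HuggingFace identifier to the Agent007 constructor or the --model flag; the name below is a placeholder rather than a recommendation:
# Placeholder model name; substitute any instruct-tuned model you have access to
agent = Agent007(model_name="your-org/your-instruct-model", use_quantization=True)
agent.run_console()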
The dual interface approach means you can use Agent007 in whatever way suits your workflow. The console interface is perfect for quick interactions and scripting. The web interface provides a rich visual experience with animations and comprehensive controls.
Most importantly, Agent007 is designed to be accessible. Even if you are not an AI expert, you can run and use this system. The automatic GPU detection and optimization mean you do not need to understand the intricacies of different hardware platforms. The template system makes it easy to switch between different use cases.
As language models continue to improve and become more efficient, systems like Agent007 will become increasingly powerful. The future of AI is not just in massive cloud data centers, but also in local, personal AI assistants that work for you, on your terms, with your data.
COMPLETE PRODUCTION-READY CODE
Below is the complete, production-ready implementation of Agent007. Everything lives in a single file: save the listing that follows as agent007.py and install the required dependencies. The web frontend (static/index.html) is generated automatically when the web interface starts.
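The imports at the top of the file imply roughly the following packages. This is one plausible install command rather than a pinned, tested requirements list; pick a GPU build of FAISS instead of faiss-cpu if you want GPU-accelerated search:
pip install torch transformers accelerate bitsandbytes sentence-transformers faiss-cpu numpy langchain langchain-community pypdf docx2txt fastapi uvicorn python-multipart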
FILE: agent007.py
import torch
import argparse
import sys
import os
from pathlib import Path
from typing import List, Optional, Tuple
from datetime import datetime
import json
import pickle
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader, TextLoader
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
import uvicorn
class GPUDetector:
"""Detects and configures the optimal compute device for the system."""
def __init__(self):
self.device = None
self.device_name = None
self.device_type = None
def detect_device(self) -> Tuple[torch.device, str, str]:
"""
Detects the best available compute device.
Returns a tuple of (device, device_name, device_type).
"""
if torch.cuda.is_available():
self.device = torch.device("cuda")
self.device_name = torch.cuda.get_device_name(0)
self.device_type = "CUDA"
print(f"Detected NVIDIA GPU: {self.device_name}")
return self.device, self.device_name, self.device_type
if hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
self.device = torch.device("mps")
self.device_name = "Apple Silicon GPU"
self.device_type = "MPS"
print(f"Detected Apple Metal GPU")
return self.device, self.device_name, self.device_type
try:
import intel_extension_for_pytorch as ipex
if ipex.xpu.is_available():
self.device = torch.device("xpu")
self.device_name = "Intel GPU"
self.device_type = "XPU"
print(f"Detected Intel GPU")
return self.device, self.device_name, self.device_type
except ImportError:
pass
self.device = torch.device("cpu")
self.device_name = "CPU"
self.device_type = "CPU"
print("No GPU detected, using CPU")
return self.device, self.device_name, self.device_type
def get_optimal_dtype(self) -> torch.dtype:
"""Returns the optimal data type for the detected device."""
if self.device_type == "CUDA":
return torch.float16
elif self.device_type == "MPS":
return torch.float16
elif self.device_type == "XPU":
return torch.bfloat16
else:
return torch.float32
class LLMManager:
"""Manages the loading and inference of the language model."""
def __init__(self, model_name: str, device: torch.device,
dtype: torch.dtype, use_quantization: bool = True):
"""
Initializes the LLM manager.
Args:
model_name: HuggingFace model identifier
device: PyTorch device to load the model on
dtype: Data type for model weights
use_quantization: Whether to use 8-bit quantization
"""
self.model_name = model_name
self.device = device
self.dtype = dtype
self.use_quantization = use_quantization
self.model = None
self.tokenizer = None
def load_model(self):
"""Loads the model and tokenizer from HuggingFace."""
print(f"Loading model {self.model_name}...")
self.tokenizer = AutoTokenizer.from_pretrained(
self.model_name,
trust_remote_code=True
)
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
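        # 8-bit quantization via bitsandbytes is only applied when running on CUDA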
if self.use_quantization and self.device.type == "cuda":
quantization_config = BitsAndBytesConfig(
load_in_8bit=True,
llm_int8_threshold=6.0,
llm_int8_has_fp16_weight=False
)
self.model = AutoModelForCausalLM.from_pretrained(
self.model_name,
quantization_config=quantization_config,
device_map="auto",
trust_remote_code=True
)
else:
self.model = AutoModelForCausalLM.from_pretrained(
self.model_name,
torch_dtype=self.dtype,
trust_remote_code=True
)
self.model.to(self.device)
self.model.eval()
print("Model loaded successfully")
def generate_response(self, prompt: str, max_new_tokens: int = 512,
temperature: float = 0.7, top_p: float = 0.9,
top_k: int = 50, repetition_penalty: float = 1.1) -> str:
"""
Generates a response to the given prompt.
Args:
prompt: The input text to respond to
max_new_tokens: Maximum number of tokens to generate
temperature: Controls randomness
top_p: Nucleus sampling parameter
top_k: Top-k sampling parameter
repetition_penalty: Penalty for repeating tokens
Returns:
Generated text response
"""
inputs = self.tokenizer(prompt, return_tensors="pt", padding=True)
inputs = {k: v.to(self.device) for k, v in inputs.items()}
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_new_tokens=max_new_tokens,
temperature=temperature,
top_p=top_p,
top_k=top_k,
repetition_penalty=repetition_penalty,
do_sample=True,
pad_token_id=self.tokenizer.pad_token_id,
eos_token_id=self.tokenizer.eos_token_id
)
        # Decode only the newly generated tokens so the prompt is not echoed back in the reply
        new_tokens = outputs[0][inputs["input_ids"].shape[1]:]
        response = self.tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
return response
class DocumentProcessor:
"""Handles loading and processing of documents for RAG."""
def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
"""
Initializes the document processor.
Args:
chunk_size: Target size for text chunks in characters
chunk_overlap: Number of overlapping characters between chunks
"""
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
separators=["\n\n", "\n", ". ", " ", ""]
)
def load_document(self, file_path: str):
"""
Loads a document from the given file path.
Args:
file_path: Path to the document file
Returns:
List of document chunks
"""
file_extension = os.path.splitext(file_path)[1].lower()
try:
if file_extension == '.pdf':
loader = PyPDFLoader(file_path)
elif file_extension in ['.docx', '.doc']:
loader = Docx2txtLoader(file_path)
elif file_extension == '.txt':
loader = TextLoader(file_path)
else:
raise ValueError(f"Unsupported file format: {file_extension}")
documents = loader.load()
print(f"Loaded {len(documents)} pages from {file_path}")
return documents
except Exception as e:
print(f"Error loading document {file_path}: {str(e)}")
return []
def process_documents(self, file_paths: List[str]):
"""
Processes multiple documents into chunks.
Args:
file_paths: List of paths to document files
Returns:
List of text chunks with metadata
"""
all_documents = []
for file_path in file_paths:
documents = self.load_document(file_path)
all_documents.extend(documents)
chunks = self.text_splitter.split_documents(all_documents)
print(f"Created {len(chunks)} chunks from {len(file_paths)} documents")
return chunks
class VectorStore:
"""Manages embeddings and similarity search using FAISS."""
def __init__(self, embedding_model_name: str = "all-MiniLM-L6-v2",
device: str = "cpu"):
"""
Initializes the vector store.
Args:
embedding_model_name: Name of the sentence transformer model
device: Device to run the embedding model on
"""
self.embedding_model = SentenceTransformer(embedding_model_name)
self.embedding_model.to(device)
self.index = None
self.chunks = []
self.dimension = self.embedding_model.get_sentence_embedding_dimension()
def create_embeddings(self, chunks):
"""
Creates embeddings for the given text chunks.
Args:
chunks: List of document chunks
Returns:
Numpy array of embeddings
"""
texts = [chunk.page_content for chunk in chunks]
self.chunks = chunks
print(f"Creating embeddings for {len(texts)} chunks...")
embeddings = self.embedding_model.encode(
texts,
show_progress_bar=True,
convert_to_numpy=True
)
return embeddings
def build_index(self, embeddings):
"""
Builds a FAISS index from the embeddings.
Args:
embeddings: Numpy array of embeddings
"""
faiss.normalize_L2(embeddings)
self.index = faiss.IndexFlatIP(self.dimension)
self.index.add(embeddings)
print(f"Built FAISS index with {self.index.ntotal} vectors")
def search(self, query: str, k: int = 5):
"""
Searches for the most similar chunks to the query.
Args:
query: Query text
k: Number of results to return
Returns:
List of tuples (chunk, similarity_score)
"""
query_embedding = self.embedding_model.encode([query], convert_to_numpy=True)
faiss.normalize_L2(query_embedding)
scores, indices = self.index.search(query_embedding, k)
results = []
for idx, score in zip(indices[0], scores[0]):
if idx < len(self.chunks):
results.append((self.chunks[idx], float(score)))
return results
def save(self, path: str):
"""Saves the vector store to disk."""
os.makedirs(path, exist_ok=True)
faiss.write_index(self.index, f"{path}/faiss.index")
with open(f"{path}/chunks.pkl", "wb") as f:
pickle.dump(self.chunks, f)
print(f"Vector store saved to {path}")
def load(self, path: str):
"""Loads the vector store from disk."""
self.index = faiss.read_index(f"{path}/faiss.index")
with open(f"{path}/chunks.pkl", "rb") as f:
self.chunks = pickle.load(f)
print(f"Vector store loaded from {path}")
class RAGSystem:
"""Combines document processing, vector search, and LLM for RAG."""
def __init__(self, llm_manager: LLMManager, vector_store: VectorStore,
document_processor: DocumentProcessor):
"""
Initializes the RAG system.
Args:
llm_manager: LLMManager instance
vector_store: VectorStore instance
document_processor: DocumentProcessor instance
"""
self.llm_manager = llm_manager
self.vector_store = vector_store
self.document_processor = document_processor
self.is_rag_enabled = False
def index_documents(self, file_paths: List[str]):
"""
Indexes documents for RAG.
Args:
file_paths: List of document file paths
"""
chunks = self.document_processor.process_documents(file_paths)
if not chunks:
print("No chunks created from documents")
return
embeddings = self.vector_store.create_embeddings(chunks)
self.vector_store.build_index(embeddings)
self.is_rag_enabled = True
print("RAG system ready")
def generate_response(self, query: str, system_message: str = "",
num_context_chunks: int = 3, **generation_params) -> str:
"""
Generates a response using RAG if enabled, otherwise uses LLM directly.
Args:
query: User query
system_message: System message to set AI behavior
num_context_chunks: Number of document chunks to use as context
**generation_params: Additional parameters for text generation
Returns:
Generated response
"""
if self.is_rag_enabled:
results = self.vector_store.search(query, k=num_context_chunks)
context = "\n\n".join([chunk.page_content for chunk, score in results])
prompt = f"""{system_message}
Context information from documents:
{context}
User question: {query}
Answer based on the context provided above:"""
else:
prompt = f"{system_message}\n\nUser: {query}\n\nAssistant:"
response = self.llm_manager.generate_response(prompt, **generation_params)
return response
class ConversationManager:
"""Manages conversation history and persistence."""
def __init__(self, storage_dir: str = "conversations"):
"""
Initializes the conversation manager.
Args:
storage_dir: Directory to store conversation files
"""
self.storage_dir = Path(storage_dir)
self.storage_dir.mkdir(exist_ok=True)
self.current_conversation = []
self.conversation_id = None
self.system_message = ""
def start_new_conversation(self, system_message: str = ""):
"""
Starts a new conversation.
Args:
system_message: System message defining AI behavior
"""
self.current_conversation = []
self.system_message = system_message
self.conversation_id = datetime.now().strftime("%Y%m%d_%H%M%S")
print(f"Started new conversation: {self.conversation_id}")
def add_message(self, role: str, content: str):
"""
Adds a message to the current conversation.
Args:
role: Either 'user' or 'assistant'
content: Message content
"""
message = {
"role": role,
"content": content,
"timestamp": datetime.now().isoformat()
}
self.current_conversation.append(message)
def get_conversation_history(self, max_messages: Optional[int] = None):
"""
Returns the conversation history.
Args:
max_messages: Maximum number of recent messages to return
Returns:
List of messages
"""
if max_messages is None:
return self.current_conversation
else:
return self.current_conversation[-max_messages:]
def format_conversation_for_llm(self, max_messages: int = 10) -> str:
"""
Formats the conversation history for the LLM.
Args:
max_messages: Maximum number of recent messages to include
Returns:
Formatted prompt string
"""
messages = self.get_conversation_history(max_messages)
prompt_parts = []
if self.system_message:
prompt_parts.append(f"System: {self.system_message}\n")
for msg in messages:
role = msg["role"].capitalize()
content = msg["content"]
prompt_parts.append(f"{role}: {content}\n")
prompt_parts.append("Assistant:")
return "\n".join(prompt_parts)
def save_conversation(self):
"""Saves the current conversation to disk."""
if not self.conversation_id:
print("No active conversation to save")
return
conversation_data = {
"id": self.conversation_id,
"system_message": self.system_message,
"messages": self.current_conversation,
"created_at": self.current_conversation[0]["timestamp"] if self.current_conversation else datetime.now().isoformat()
}
file_path = self.storage_dir / f"{self.conversation_id}.json"
with open(file_path, "w") as f:
json.dump(conversation_data, f, indent=2)
print(f"Conversation saved to {file_path}")
def load_conversation(self, conversation_id: str) -> bool:
"""
Loads a conversation from disk.
Args:
conversation_id: ID of the conversation to load
Returns:
True if successful, False otherwise
"""
file_path = self.storage_dir / f"{conversation_id}.json"
if not file_path.exists():
print(f"Conversation {conversation_id} not found")
return False
with open(file_path, "r") as f:
conversation_data = json.load(f)
self.conversation_id = conversation_data["id"]
self.system_message = conversation_data["system_message"]
self.current_conversation = conversation_data["messages"]
print(f"Loaded conversation {conversation_id}")
return True
def list_conversations(self):
"""
Lists all saved conversations.
Returns:
List of conversation metadata
"""
conversations = []
for file_path in self.storage_dir.glob("*.json"):
with open(file_path, "r") as f:
data = json.load(f)
conversations.append({
"id": data["id"],
"created_at": data["created_at"],
"message_count": len(data["messages"])
})
return sorted(conversations, key=lambda x: x["created_at"], reverse=True)
def search_conversations(self, search_term: str):
"""
Searches conversations for a term.
Args:
search_term: Term to search for
Returns:
List of matching conversations with context
"""
results = []
for file_path in self.storage_dir.glob("*.json"):
with open(file_path, "r") as f:
data = json.load(f)
for msg in data["messages"]:
if search_term.lower() in msg["content"].lower():
results.append({
"conversation_id": data["id"],
"message": msg,
"created_at": data["created_at"]
})
return results
class TemplateManager:
"""Manages system message and prompt templates."""
def __init__(self, template_dir: str = "templates"):
"""
Initializes the template manager.
Args:
template_dir: Directory to store template files
"""
self.template_dir = Path(template_dir)
self.template_dir.mkdir(exist_ok=True)
self.system_template_dir = self.template_dir / "system"
self.prompt_template_dir = self.template_dir / "prompts"
self.system_template_dir.mkdir(exist_ok=True)
self.prompt_template_dir.mkdir(exist_ok=True)
def save_system_template(self, name: str, content: str, description: str = ""):
"""
Saves a system message template.
Args:
name: Template name
content: System message content
description: Optional description of the template
"""
template_data = {
"name": name,
"content": content,
"description": description,
"created_at": datetime.now().isoformat()
}
file_path = self.system_template_dir / f"{name}.json"
with open(file_path, "w") as f:
json.dump(template_data, f, indent=2)
print(f"System template '{name}' saved")
def load_system_template(self, name: str) -> Optional[str]:
"""
Loads a system message template.
Args:
name: Template name
Returns:
Template content or None if not found
"""
file_path = self.system_template_dir / f"{name}.json"
if not file_path.exists():
print(f"System template '{name}' not found")
return None
with open(file_path, "r") as f:
template_data = json.load(f)
return template_data["content"]
def list_system_templates(self):
"""
Lists all system message templates.
Returns:
List of template metadata
"""
templates = []
for file_path in self.system_template_dir.glob("*.json"):
with open(file_path, "r") as f:
data = json.load(f)
templates.append({
"name": data["name"],
"description": data["description"],
"created_at": data["created_at"]
})
return templates
def save_prompt_template(self, name: str, content: str,
variables: Optional[List[str]] = None,
description: str = ""):
"""
Saves a prompt template with optional variables.
Args:
name: Template name
content: Prompt content with {variable} placeholders
variables: List of variable names
description: Optional description
"""
template_data = {
"name": name,
"content": content,
"variables": variables or [],
"description": description,
"created_at": datetime.now().isoformat()
}
file_path = self.prompt_template_dir / f"{name}.json"
with open(file_path, "w") as f:
json.dump(template_data, f, indent=2)
print(f"Prompt template '{name}' saved")
def load_prompt_template(self, name: str, **kwargs) -> Optional[str]:
"""
Loads and formats a prompt template.
Args:
name: Template name
**kwargs: Variable values to substitute
Returns:
Formatted prompt or None if not found
"""
file_path = self.prompt_template_dir / f"{name}.json"
if not file_path.exists():
print(f"Prompt template '{name}' not found")
return None
with open(file_path, "r") as f:
template_data = json.load(f)
content = template_data["content"]
try:
formatted_content = content.format(**kwargs)
return formatted_content
except KeyError as e:
print(f"Missing variable for template: {e}")
return None
class ConsoleInterface:
"""Console-based interface for Agent007."""
def __init__(self, rag_system: RAGSystem, conversation_manager: ConversationManager,
template_manager: TemplateManager):
"""
Initializes the console interface.
Args:
rag_system: RAGSystem instance
conversation_manager: ConversationManager instance
template_manager: TemplateManager instance
"""
self.rag_system = rag_system
self.conversation_manager = conversation_manager
self.template_manager = template_manager
self.running = True
def print_welcome(self):
"""Prints welcome message."""
print("=" * 70)
print(" AGENT007")
print(" Your Local AI Assistant with RAG Support")
print("=" * 70)
print()
print("Commands:")
print(" /new - Start a new conversation")
print(" /load - Load a saved conversation")
print(" /save - Save current conversation")
print(" /index - Index documents for RAG")
print(" /system - Set system message")
print(" /template - Load a system template")
print(" /history - Show conversation history")
print(" /search - Search past conversations")
print(" /quit - Exit Agent007")
print()
def handle_command(self, command: str) -> bool:
"""
Handles special commands.
Args:
command: Command string starting with /
Returns:
True if command was handled, False otherwise
"""
if command == "/quit":
self.running = False
print("Goodbye!")
return True
elif command == "/new":
system_msg = input("Enter system message (or press Enter for default): ")
self.conversation_manager.start_new_conversation(system_msg)
return True
elif command == "/save":
self.conversation_manager.save_conversation()
return True
elif command == "/load":
conversations = self.conversation_manager.list_conversations()
if not conversations:
print("No saved conversations found")
return True
print("\nSaved conversations:")
for i, conv in enumerate(conversations):
print(f"{i+1}. {conv['id']} - {conv['message_count']} messages")
choice = input("Enter number to load: ")
try:
idx = int(choice) - 1
if 0 <= idx < len(conversations):
self.conversation_manager.load_conversation(conversations[idx]['id'])
except ValueError:
print("Invalid choice")
return True
elif command == "/index":
doc_path = input("Enter document directory or file path: ")
if os.path.isdir(doc_path):
files = []
for ext in ['.pdf', '.txt', '.docx']:
files.extend(Path(doc_path).glob(f"**/*{ext}"))
file_paths = [str(f) for f in files]
else:
file_paths = [doc_path]
if file_paths:
self.rag_system.index_documents(file_paths)
else:
print("No documents found")
return True
elif command == "/system":
system_msg = input("Enter new system message: ")
self.conversation_manager.system_message = system_msg
print("System message updated")
return True
elif command == "/template":
templates = self.template_manager.list_system_templates()
if not templates:
print("No templates found")
return True
print("\nAvailable templates:")
for i, tmpl in enumerate(templates):
print(f"{i+1}. {tmpl['name']} - {tmpl['description']}")
choice = input("Enter number to load: ")
try:
idx = int(choice) - 1
if 0 <= idx < len(templates):
content = self.template_manager.load_system_template(templates[idx]['name'])
if content:
self.conversation_manager.system_message = content
print(f"Loaded template: {templates[idx]['name']}")
except ValueError:
print("Invalid choice")
return True
elif command == "/history":
history = self.conversation_manager.get_conversation_history()
print("\nConversation History:")
print("-" * 70)
for msg in history:
print(f"{msg['role'].upper()}: {msg['content']}")
print("-" * 70)
return True
elif command == "/search":
term = input("Enter search term: ")
results = self.conversation_manager.search_conversations(term)
if not results:
print("No results found")
else:
print(f"\nFound {len(results)} results:")
for result in results[:10]:
print(f"\nConversation: {result['conversation_id']}")
print(f"{result['message']['role'].upper()}: {result['message']['content'][:100]}...")
return True
return False
def run(self):
"""Runs the console interface main loop."""
self.print_welcome()
self.conversation_manager.start_new_conversation()
while self.running:
try:
user_input = input("\nYou: ").strip()
if not user_input:
continue
                if user_input.startswith("/"):
                    if not self.handle_command(user_input):
                        print("Unknown command")
                    continue
self.conversation_manager.add_message("user", user_input)
prompt = self.conversation_manager.format_conversation_for_llm()
print("\nAgent007: ", end="", flush=True)
                if self.rag_system.is_rag_enabled:
                    # Ground the reply in the indexed documents once /index has been run
                    response = self.rag_system.generate_response(
                        user_input,
                        system_message=self.conversation_manager.system_message
                    )
                else:
                    response = self.rag_system.llm_manager.generate_response(prompt)
print(response)
self.conversation_manager.add_message("assistant", response)
except KeyboardInterrupt:
print("\n\nInterrupted. Use /quit to exit.")
except Exception as e:
print(f"\nError: {str(e)}")
class ChatRequest(BaseModel):
"""Request model for chat endpoint."""
message: str
conversation_id: Optional[str] = None
system_message: Optional[str] = ""
max_tokens: int = 512
temperature: float = 0.7
top_p: float = 0.9
class ChatResponse(BaseModel):
"""Response model for chat endpoint."""
response: str
conversation_id: str
class WebAPI:
"""FastAPI-based web interface for Agent007."""
def __init__(self, rag_system: RAGSystem, conversation_manager: ConversationManager,
template_manager: TemplateManager):
"""
Initializes the web API.
Args:
rag_system: RAGSystem instance
conversation_manager: ConversationManager instance
template_manager: TemplateManager instance
"""
self.app = FastAPI(title="Agent007 API")
self.rag_system = rag_system
self.conversation_manager = conversation_manager
self.template_manager = template_manager
self.setup_routes()
self.create_static_files()
def create_static_files(self):
"""Creates the static directory and index.html."""
static_dir = Path("static")
static_dir.mkdir(exist_ok=True)
html_content = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Agent007 - AI Assistant</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
background: linear-gradient(135deg, #1e3c72 0%, #2a5298 100%);
color: #ffffff;
min-height: 100vh;
transition: background 0.5s ease;
}
.container {
max-width: 1200px;
margin: 0 auto;
padding: 20px;
}
.header {
text-align: center;
padding: 30px 0;
animation: fadeInDown 0.8s ease;
}
.header h1 {
font-size: 3em;
text-shadow: 2px 2px 4px rgba(0,0,0,0.3);
animation: glow 2s ease-in-out infinite alternate;
}
@keyframes glow {
from {
text-shadow: 0 0 10px #fff, 0 0 20px #fff, 0 0 30px #00d4ff;
}
to {
text-shadow: 0 0 20px #fff, 0 0 30px #00d4ff, 0 0 40px #00d4ff;
}
}
@keyframes fadeInDown {
from {
opacity: 0;
transform: translateY(-20px);
}
to {
opacity: 1;
transform: translateY(0);
}
}
.chat-container {
background: rgba(255, 255, 255, 0.1);
backdrop-filter: blur(10px);
border-radius: 15px;
padding: 20px;
margin: 20px 0;
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.3);
animation: fadeIn 1s ease;
}
@keyframes fadeIn {
from { opacity: 0; }
to { opacity: 1; }
}
.messages {
height: 500px;
overflow-y: auto;
padding: 15px;
margin-bottom: 20px;
}
.message {
margin: 15px 0;
padding: 12px 18px;
border-radius: 12px;
animation: slideIn 0.3s ease;
max-width: 80%;
}
@keyframes slideIn {
from {
opacity: 0;
transform: translateX(-20px);
}
to {
opacity: 1;
transform: translateX(0);
}
}
.message.user {
background: rgba(0, 123, 255, 0.7);
margin-left: auto;
text-align: right;
}
.message.assistant {
background: rgba(40, 167, 69, 0.7);
}
.input-area {
display: flex;
gap: 10px;
}
.input-area input {
flex: 1;
padding: 15px;
border: none;
border-radius: 25px;
background: rgba(255, 255, 255, 0.2);
color: #ffffff;
font-size: 16px;
transition: all 0.3s ease;
}
.input-area input:focus {
outline: none;
background: rgba(255, 255, 255, 0.3);
box-shadow: 0 0 15px rgba(0, 212, 255, 0.5);
}
.input-area button {
padding: 15px 30px;
border: none;
border-radius: 25px;
background: linear-gradient(135deg, #00d4ff 0%, #0099cc 100%);
color: #ffffff;
font-size: 16px;
cursor: pointer;
transition: all 0.3s ease;
}
.input-area button:hover {
transform: scale(1.05);
box-shadow: 0 5px 15px rgba(0, 212, 255, 0.4);
}
.controls {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
gap: 15px;
margin: 20px 0;
}
.control-group {
background: rgba(255, 255, 255, 0.1);
padding: 15px;
border-radius: 10px;
}
.control-group label {
display: block;
margin-bottom: 8px;
font-weight: bold;
}
.control-group input[type="range"] {
width: 100%;
}
.sidebar {
position: fixed;
right: -300px;
top: 0;
width: 300px;
height: 100vh;
background: rgba(0, 0, 0, 0.9);
padding: 20px;
transition: right 0.3s ease;
overflow-y: auto;
z-index: 999;
}
.sidebar.open {
right: 0;
}
.sidebar-toggle {
position: fixed;
right: 20px;
top: 20px;
padding: 10px 20px;
background: rgba(0, 212, 255, 0.8);
border: none;
border-radius: 5px;
color: white;
cursor: pointer;
z-index: 1000;
}
.conversation-item {
padding: 10px;
margin: 5px 0;
background: rgba(255, 255, 255, 0.1);
border-radius: 5px;
cursor: pointer;
transition: background 0.3s ease;
}
.conversation-item:hover {
background: rgba(255, 255, 255, 0.2);
}
</style>
</head>
<body>
<button class="sidebar-toggle" onclick="toggleSidebar()">History</button>
<div class="sidebar" id="sidebar">
<h2>Conversation History</h2>
<div id="conversation-list"></div>
</div>
<div class="container">
<div class="header">
<h1>AGENT007</h1>
<p>Your Local AI Assistant with RAG Support</p>
</div>
<div class="controls">
<div class="control-group">
<label>Temperature: <span id="temp-value">0.7</span></label>
<input type="range" id="temperature" min="0" max="2" step="0.1" value="0.7"
oninput="updateValue('temp-value', this.value)">
</div>
<div class="control-group">
<label>Max Tokens: <span id="tokens-value">512</span></label>
<input type="range" id="max-tokens" min="50" max="2048" step="50" value="512"
oninput="updateValue('tokens-value', this.value)">
</div>
<div class="control-group">
<label>Top P: <span id="topp-value">0.9</span></label>
<input type="range" id="top-p" min="0" max="1" step="0.05" value="0.9"
oninput="updateValue('topp-value', this.value)">
</div>
</div>
<div class="chat-container">
<div class="messages" id="messages"></div>
<div class="input-area">
<input type="text" id="user-input" placeholder="Type your message..."
onkeypress="handleKeyPress(event)">
<button onclick="sendMessage()">Send</button>
</div>
</div>
</div>
<script>
let currentConversationId = null;
function updateValue(elementId, value) {
document.getElementById(elementId).textContent = value;
}
function toggleSidebar() {
document.getElementById('sidebar').classList.toggle('open');
loadConversationList();
}
async function loadConversationList() {
try {
const response = await fetch('/api/conversations');
const data = await response.json();
const listElement = document.getElementById('conversation-list');
listElement.innerHTML = '';
data.conversations.forEach(conv => {
const item = document.createElement('div');
item.className = 'conversation-item';
item.textContent = conv.id + ' (' + conv.message_count + ' messages)';
item.onclick = () => loadConversation(conv.id);
listElement.appendChild(item);
});
} catch (error) {
console.error('Error loading conversations:', error);
}
}
async function loadConversation(conversationId) {
try {
const response = await fetch('/api/conversation/' + conversationId);
const data = await response.json();
currentConversationId = conversationId;
const messagesDiv = document.getElementById('messages');
messagesDiv.innerHTML = '';
data.messages.forEach(msg => {
addMessageToUI(msg.role, msg.content);
});
toggleSidebar();
} catch (error) {
console.error('Error loading conversation:', error);
}
}
function addMessageToUI(role, content) {
const messagesDiv = document.getElementById('messages');
const messageDiv = document.createElement('div');
messageDiv.className = 'message ' + role;
messageDiv.textContent = content;
messagesDiv.appendChild(messageDiv);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
}
async function sendMessage() {
const input = document.getElementById('user-input');
const message = input.value.trim();
if (!message) return;
addMessageToUI('user', message);
input.value = '';
const temperature = parseFloat(document.getElementById('temperature').value);
const maxTokens = parseInt(document.getElementById('max-tokens').value);
const topP = parseFloat(document.getElementById('top-p').value);
try {
const response = await fetch('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
message: message,
conversation_id: currentConversationId,
temperature: temperature,
max_tokens: maxTokens,
top_p: topP
})
});
const data = await response.json();
currentConversationId = data.conversation_id;
addMessageToUI('assistant', data.response);
} catch (error) {
console.error('Error sending message:', error);
addMessageToUI('assistant', 'Error: Could not get response');
}
}
function handleKeyPress(event) {
if (event.key === 'Enter') {
sendMessage();
}
}
</script>
</body>
</html>"""
with open(static_dir / "index.html", "w") as f:
f.write(html_content)
def setup_routes(self):
"""Sets up API routes."""
@self.app.post("/api/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
"""Handles chat requests."""
try:
if request.conversation_id:
self.conversation_manager.load_conversation(request.conversation_id)
else:
self.conversation_manager.start_new_conversation(request.system_message)
self.conversation_manager.add_message("user", request.message)
prompt = self.conversation_manager.format_conversation_for_llm()
                if self.rag_system.is_rag_enabled:
                    # Answer from the indexed documents when RAG has been set up via /api/index
                    response = self.rag_system.generate_response(
                        request.message,
                        system_message=self.conversation_manager.system_message,
                        max_new_tokens=request.max_tokens,
                        temperature=request.temperature,
                        top_p=request.top_p
                    )
                else:
                    response = self.rag_system.llm_manager.generate_response(
                        prompt,
                        max_new_tokens=request.max_tokens,
                        temperature=request.temperature,
                        top_p=request.top_p
                    )
self.conversation_manager.add_message("assistant", response)
self.conversation_manager.save_conversation()
return ChatResponse(
response=response,
conversation_id=self.conversation_manager.conversation_id
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.get("/api/conversations")
async def list_conversations():
"""Lists all saved conversations."""
try:
conversations = self.conversation_manager.list_conversations()
return {"conversations": conversations}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.get("/api/conversation/{conversation_id}")
async def get_conversation(conversation_id: str):
"""Retrieves a specific conversation."""
try:
if self.conversation_manager.load_conversation(conversation_id):
return {
"conversation_id": conversation_id,
"messages": self.conversation_manager.current_conversation,
"system_message": self.conversation_manager.system_message
}
else:
raise HTTPException(status_code=404, detail="Conversation not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.post("/api/index")
async def index_documents(files: List[UploadFile] = File(...)):
"""Indexes uploaded documents for RAG."""
try:
temp_dir = Path("temp_uploads")
temp_dir.mkdir(exist_ok=True)
file_paths = []
for file in files:
file_path = temp_dir / file.filename
with open(file_path, "wb") as f:
content = await file.read()
f.write(content)
file_paths.append(str(file_path))
self.rag_system.index_documents(file_paths)
for file_path in file_paths:
Path(file_path).unlink()
return {"status": "success", "indexed_files": len(file_paths)}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.get("/api/templates/system")
async def list_system_templates():
"""Lists system message templates."""
try:
templates = self.template_manager.list_system_templates()
return {"templates": templates}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.get("/api/template/system/{name}")
async def get_system_template(name: str):
"""Retrieves a system template."""
try:
content = self.template_manager.load_system_template(name)
if content:
return {"name": name, "content": content}
else:
raise HTTPException(status_code=404, detail="Template not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.post("/api/search")
async def search_conversations(query: dict):
"""Searches conversations."""
try:
search_term = query.get("term", "")
results = self.conversation_manager.search_conversations(search_term)
return {"results": results}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
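        # Mount the static frontend last so the /api routes registered above take precedence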
self.app.mount("/", StaticFiles(directory="static", html=True), name="static")
def run(self, host: str = "0.0.0.0", port: int = 8000):
"""Runs the web server."""
uvicorn.run(self.app, host=host, port=port)
class Agent007:
"""Main application class for Agent007."""
def __init__(self, model_name: str = "mistralai/Mistral-7B-Instruct-v0.2",
use_quantization: bool = True):
"""
Initializes Agent007.
Args:
model_name: HuggingFace model to use
use_quantization: Whether to use quantization
"""
print("Initializing Agent007...")
self.gpu_detector = GPUDetector()
device, device_name, device_type = self.gpu_detector.detect_device()
dtype = self.gpu_detector.get_optimal_dtype()
self.llm_manager = LLMManager(
model_name=model_name,
device=device,
dtype=dtype,
use_quantization=use_quantization
)
self.llm_manager.load_model()
self.vector_store = VectorStore(device=device)
self.document_processor = DocumentProcessor()
self.rag_system = RAGSystem(
llm_manager=self.llm_manager,
vector_store=self.vector_store,
document_processor=self.document_processor
)
self.conversation_manager = ConversationManager()
self.template_manager = TemplateManager()
self.create_default_templates()
print("Agent007 initialized successfully!")
def create_default_templates(self):
"""Creates some default system message templates."""
default_templates = {
"helpful_assistant": {
"content": "You are a helpful AI assistant. Provide clear, accurate, and concise responses.",
"description": "General helpful assistant"
},
"technical_expert": {
"content": "You are a technical expert. Provide detailed technical explanations with examples.",
"description": "Technical expert persona"
},
"creative_writer": {
"content": "You are a creative writer. Provide imaginative and engaging responses.",
"description": "Creative writing assistant"
}
}
for name, data in default_templates.items():
try:
self.template_manager.save_system_template(
name=name,
content=data["content"],
description=data["description"]
)
            except Exception:
                pass  # Template may already exist
def run_console(self):
"""Runs the console interface."""
console = ConsoleInterface(
rag_system=self.rag_system,
conversation_manager=self.conversation_manager,
template_manager=self.template_manager
)
console.run()
def run_web(self, host: str = "0.0.0.0", port: int = 8000):
"""Runs the web interface."""
web_api = WebAPI(
rag_system=self.rag_system,
conversation_manager=self.conversation_manager,
template_manager=self.template_manager
)
print(f"Starting web server at http://{host}:{port}")
web_api.run(host=host, port=port)
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(description="Agent007 - Local AI Assistant with RAG")
parser.add_argument("--mode", choices=["console", "web"], default="console",
help="Interface mode")
parser.add_argument("--model", type=str, default="mistralai/Mistral-7B-Instruct-v0.2",
help="HuggingFace model name")
parser.add_argument("--no-quantization", action="store_true",
help="Disable quantization")
parser.add_argument("--host", type=str, default="0.0.0.0",
help="Web server host")
parser.add_argument("--port", type=int, default=8000,
help="Web server port")
args = parser.parse_args()
try:
agent = Agent007(
model_name=args.model,
use_quantization=not args.no_quantization
)
if args.mode == "console":
agent.run_console()
else:
agent.run_web(host=args.host, port=args.port)
except KeyboardInterrupt:
print("\nShutting down Agent007...")
sys.exit(0)
except Exception as e:
print(f"Error: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()