OVERVIEW
This guide demonstrates how to build an autonomous Research Assistant Agent that can plan research tasks, execute them with a set of tools, and synthesize the results into a thorough analysis. The agent uses a locally hosted Mistral LLM and demonstrates key agentic AI capabilities, including autonomous planning, tool orchestration, and self-reflection.
This document contains the following parts:
- Description of the sample use case, its architecture, and its implementation
- User's Guide
- Setup Guide
EXAMPLE USE CASE: RESEARCH ASSISTANT AGENT
The Research Assistant Agent we'll build can:
- Accept high-level research requests like "Analyze renewable energy adoption"
- Plan a research strategy autonomously without human intervention
- Use multiple tools to gather data (web search, PDF analysis, visualization)
- Synthesize findings into comprehensive reports
- Iterate and refine research based on initial findings
- Learn from past research sessions to improve future performance
ARCHITECTURE OVERVIEW
COMPREHENSIVE ARCHITECTURE EXPLANATION - AGENTIC AI RESEARCH ASSISTANT SYSTEM
INTRODUCTION: WHAT MAKES THIS SYSTEM "AGENTIC"
The Research Assistant system is an implementation of agentic artificial intelligence: a paradigm in which AI systems operate with genuine autonomy, making independent decisions, adapting to circumstances, and pursuing goals without constant human intervention. Unlike traditional AI systems that follow predetermined scripts or respond reactively to inputs, this architecture embodies the core principles of agency through several fundamental capabilities.
The system demonstrates autonomous goal pursuit by receiving high-level objectives and independently determining how to achieve them, breaking complex goals into manageable tasks and executing them strategically. This capability allows users to specify broad research intentions without needing to prescribe specific methodologies or detailed execution steps. Adaptive planning represents another core agentic capability where the system continuously evaluates its progress and adjusts its approach based on intermediate findings and changing circumstances, rather than following rigid procedures. When new information suggests better research directions or when initial approaches prove less fruitful than expected, the agent autonomously modifies its strategy to optimize outcomes.
Tool orchestration demonstrates the system's ability to intelligently select and coordinate multiple tools and data sources, understanding when and how to use each capability to maximum effect. The agent doesn't simply execute tools in predetermined sequences but makes contextual decisions about which capabilities to employ based on the specific requirements of each research goal. Self-reflection represents perhaps the most sophisticated agentic capability, where the agent analyzes its own performance, learns from experience, and continuously improves its strategies and methodologies. This metacognitive ability enables the system to become more effective over time without external programming or manual optimization.
Contextual memory provides the agent with both short-term working memory for current tasks and long-term semantic memory that allows it to build upon previous research and avoid redundant work. This memory architecture enables the system to maintain coherence across extended research sessions while accumulating knowledge that improves future performance. This architecture achieves agency through a carefully orchestrated interplay of specialized subsystems, each contributing essential capabilities while maintaining clear separation of concerns and robust error handling. The resulting system exhibits emergent intelligent behavior that arises from the interaction of these subsystems rather than being explicitly programmed.
ARCHITECTURAL PHILOSOPHY AND DESIGN PRINCIPLES
The system architecture is founded on several key design principles that enable both sophisticated autonomous behavior and practical production deployment. These principles guide every aspect of the system design and implementation, ensuring that the architecture remains maintainable, extensible, and reliable while supporting complex autonomous behaviors.
LAYERED ABSTRACTION ARCHITECTURE
The system employs a multi-layered architecture where each layer provides specific abstractions and services to the layers above it. This design enables complexity management while maintaining clear interfaces between different system concerns.
- The presentation layer handles REST API endpoints and user interfaces, providing clean abstractions for external interaction with the system.
- The application layer manages main agent orchestration and session management, coordinating overall system behavior and maintaining state across research sessions.
- The domain layer implements core agentic capabilities including planning, execution, and reflection, encapsulating the intelligent behaviors that define the system's autonomous nature.
- The infrastructure layer provides tools, memory systems, and LLM integration, offering the fundamental capabilities that enable intelligent behavior.
- Finally, the hardware layer manages local compute resources, storage, and networking, abstracting the physical infrastructure from higher-level concerns.
AUTONOMOUS CONTROL LOOPS
The architecture implements multiple autonomous control loops that operate at different time scales and abstraction levels, creating a hierarchy of autonomous behaviors that work together to achieve sophisticated goals.
- The planning loop handles strategic goal decomposition and task sequencing, operating at the highest level to transform abstract objectives into concrete action plans.
- The execution loop manages individual task execution with real-time adaptation, making tactical decisions about tool usage and parameter optimization during task execution.
- The reflection loop conducts performance analysis and methodology improvement, operating at a longer timescale to identify patterns and optimize future behavior.
- The monitoring loop maintains system health and resource management, ensuring that the system operates within acceptable performance and resource boundaries.
Each loop operates independently while sharing information through well-defined interfaces, creating emergent intelligent behavior through their interaction. This design enables the system to simultaneously pursue long-term strategic goals while adapting to immediate tactical challenges and continuously improving its overall capabilities.
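As a minimal illustration of this loop hierarchy, the sketch below runs planning, execution, reflection, and monitoring loops concurrently with asyncio, each on its own timescale, sharing information through a common state object. The module name, intervals, and loop bodies are illustrative assumptions, not part of the reference implementation.
# control_loops_sketch.py - illustrative only; loop bodies are placeholders
import asyncio

class SharedState:
    """Minimal shared state exchanged between the loops."""
    def __init__(self):
        self.pending_tasks = []
        self.completed_tasks = []
        self.metrics = {}

async def planning_loop(state: SharedState):
    while True:
        # Strategic: decompose goals into tasks (placeholder)
        await asyncio.sleep(30)

async def execution_loop(state: SharedState):
    while True:
        # Tactical: execute the next pending task (placeholder)
        if state.pending_tasks:
            state.completed_tasks.append(state.pending_tasks.pop(0))
        await asyncio.sleep(1)

async def reflection_loop(state: SharedState):
    while True:
        # Long timescale: analyze completed work and update strategies (placeholder)
        state.metrics["completed"] = len(state.completed_tasks)
        await asyncio.sleep(300)

async def monitoring_loop(state: SharedState):
    while True:
        # Health and resource checks (placeholder)
        await asyncio.sleep(5)

async def main():
    state = SharedState()
    await asyncio.gather(
        planning_loop(state),
        execution_loop(state),
        reflection_loop(state),
        monitoring_loop(state),
    )

# asyncio.run(main())  # runs indefinitely; shown here only to illustrate the structure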
MICROSERVICES-INSPIRED MODULARITY
While deployed as a single application for simplicity, the architecture follows microservices principles with loosely coupled, highly cohesive components. Each major subsystem could theoretically be deployed separately, enabling future scaling and evolution of the system architecture.
- The Tool Registry functions as a service discovery mechanism, enabling dynamic registration and discovery of available capabilities.
- The Memory System operates as a knowledge management service, providing sophisticated storage and retrieval of research findings and contextual information.
- The Planning System serves as a strategic reasoning service, transforming high-level goals into executable plans through sophisticated reasoning processes.
- The Reflection System acts as a performance optimization service, analyzing system behavior and generating improvement recommendations.
This modular design ensures that individual components can be enhanced, replaced, or scaled independently while maintaining overall system coherence. The clean interfaces between components also facilitate testing, debugging, and maintenance of individual subsystems without affecting the entire system.
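A minimal sketch of the registry pattern described above, assuming a simple register/get interface (the class and method names here are illustrative, not the implementation's actual API):
# tool_registry_sketch.py - service-discovery style registry for tools
from typing import Any, Callable, Dict

class ToolRegistry:
    """Tools register themselves with a name, description, and parameter schema;
    the planner queries the registry to learn what capabilities are available."""

    def __init__(self):
        self._tools: Dict[str, Dict[str, Any]] = {}

    def register(self, name: str, func: Callable, description: str, parameters: Dict[str, str]):
        """Dynamically register a capability."""
        self._tools[name] = {"func": func, "description": description, "parameters": parameters}

    def get(self, name: str) -> Callable:
        """Look up an executable tool by name."""
        return self._tools[name]["func"]

    def describe_all(self) -> str:
        """Produce the capability listing used in planning prompts."""
        return "\n".join(
            f"- {name}: {meta['description']} (parameters: {', '.join(meta['parameters'])})"
            for name, meta in self._tools.items()
        )

# Usage
registry = ToolRegistry()
registry.register(
    "web_search",
    lambda query, num_results=5: [],  # placeholder implementation
    "Search the internet for current information and sources",
    {"query": "string", "num_results": "int"},
)
print(registry.describe_all())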
EVENT-DRIVEN COMMUNICATION
Components communicate through an event-driven pattern where state changes trigger appropriate responses throughout the system. This design enables loose coupling between components while ensuring that relevant information flows efficiently throughout the system. When tasks complete, events notify relevant subsystems about new information, completed work, or changed circumstances. When plans are updated, events inform execution components about new priorities or modified strategies. This approach makes the system highly extensible and maintainable while supporting the complex coordination required for autonomous behavior.
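The sketch below shows one way such an event pattern can be wired up, using a simple in-process publish/subscribe bus. The event names and handlers are illustrative assumptions.
# event_bus_sketch.py - minimal in-process pub/sub with illustrative event names
from collections import defaultdict
from typing import Any, Callable, Dict, List

class EventBus:
    def __init__(self):
        self._subscribers: Dict[str, List[Callable[[Any], None]]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Callable[[Any], None]):
        self._subscribers[event_type].append(handler)

    def publish(self, event_type: str, payload: Any):
        for handler in self._subscribers[event_type]:
            handler(payload)

bus = EventBus()
# The memory system reacts to completed tasks; the executor reacts to plan updates.
bus.subscribe("task_completed", lambda task: print(f"Memory: storing result of {task['id']}"))
bus.subscribe("plan_updated", lambda plan: print(f"Executor: received {len(plan)} new tasks"))

bus.publish("task_completed", {"id": "task_001", "result": "..."})
bus.publish("plan_updated", [{"id": "task_002"}])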
CORE ARCHITECTURAL COMPONENTS
THE RESEARCH AGENT CORE: ORCHESTRATING INTELLIGENCE
At the heart of the system lies the ResearchAgent class, which serves as the primary orchestrator of all agentic behavior. This component embodies the "executive function" of artificial intelligence, making high-level decisions about resource allocation, task prioritization, and strategic direction. The Agent Core represents the central coordination point where all autonomous behaviors converge and where the system's overall intelligence emerges from the interaction of specialized subsystems.
The Agent Core maintains the complete state of ongoing research through the AgentState data structure, which provides a comprehensive view of current system status and context. The current goal represents the high-level research objective driving all activities, serving as the primary organizing principle for all system behavior. The task queue contains a dynamically managed list of planned tasks awaiting execution, with priorities and dependencies that guide execution order. The completed tasks section maintains a historical record of accomplished work with results, enabling the system to build upon previous accomplishments and avoid redundant effort. Working memory stores immediate context and intermediate findings that influence ongoing decisions and maintain coherence across related tasks. Session context preserves environmental information and constraints that shape how the agent approaches its current research objectives.
The core implements a sophisticated execution loop that demonstrates true autonomous behavior through several key phases of operation. During goal analysis, upon receiving a research objective, the agent doesn't simply execute predefined procedures but instead analyzes the goal's complexity, identifies knowledge gaps, and determines the most effective research strategy based on available resources and contextual factors. The dynamic planning phase sees the agent creating initial plans but continuously adapting them based on discoveries, such as when initial web searches reveal unexpected information sources that prompt the agent to autonomously adjust its strategy to pursue more promising directions.
Resource management demonstrates the agent's ability to intelligently manage computational resources, balancing thoroughness with efficiency by tracking token usage in the LLM, monitoring tool execution times, and adjusting its approach to stay within operational constraints. Throughout execution, quality assessment enables the agent to evaluate the quality and relevance of information it discovers, making autonomous decisions about when to continue pursuing a particular line of inquiry versus when to pivot to alternative approaches based on the value and reliability of discovered information.
THE PLANNING SYSTEM: STRATEGIC INTELLIGENCE
The PlanningSystem represents the "strategic reasoning" capability of the agent, transforming abstract research goals into concrete, executable action plans. This component demonstrates sophisticated reasoning capabilities that go far beyond simple task decomposition, incorporating contextual awareness, resource optimization, and adaptive strategy formulation into its planning processes.
The planning system operates through a multi-level planning architecture that addresses different aspects of research strategy simultaneously. At the strategic level, the system determines overall research approach and methodology selection, making high-level decisions about how to approach complex research objectives based on goal characteristics, available resources, and historical performance data. The tactical level focuses on specific tool selection and parameter optimization, choosing which capabilities to employ and how to configure them for maximum effectiveness in the current context. At the operational level, the system handles detailed task sequencing and dependency management, ensuring that individual tasks execute in logical order while maximizing parallel execution opportunities.
Rather than generating generic plans, the system creates contextually appropriate strategies that adapt to available capabilities and environmental constraints. A research goal about "renewable energy adoption" will generate different plans depending on whether the agent has access to academic databases, government statistics, or only web search capabilities, demonstrating the system's ability to adapt its approach based on available resources and constraints.
The planning system implements sophisticated adaptive replanning mechanisms that continuously monitor execution progress and trigger replanning when circumstances warrant modification of the original strategy. Replanning may occur when initial searches reveal better data sources than anticipated, requiring strategy adjustment to take advantage of superior information sources. Discovered information might suggest more promising research directions that weren't apparent during initial planning, prompting strategic pivots to pursue more valuable lines of inquiry. Resource constraints may require strategy optimization to maintain progress within available computational or time limitations. Quality assessment might indicate gaps in the current approach that necessitate additional tasks or alternative methodologies to ensure comprehensive coverage of the research objective.
The planner demonstrates dependency-aware scheduling by understanding task dependencies and optimally sequencing work to minimize waiting time while ensuring logical progression of research activities. For example, the system ensures that data gathering tasks complete before analysis tasks that depend on their results, while also identifying opportunities for parallel execution of independent tasks to maximize overall efficiency.
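A sketch of what dependency-aware scheduling can look like in practice: tasks carry a list of dependency ids (as in the planner's JSON output shown later), and a simple topological grouping yields batches whose members can run in parallel. This is an illustration under that assumption, not the planner's actual scheduling code.
# scheduling_sketch.py - topological batching of tasks by their dependencies
from typing import Dict, List

def schedule_batches(tasks: List[dict]) -> List[List[str]]:
    """Group task ids into batches: every task in a batch only depends on
    tasks from earlier batches, so tasks within a batch can run in parallel."""
    remaining: Dict[str, set] = {t["id"]: set(t.get("dependencies", [])) for t in tasks}
    done: set = set()
    batches: List[List[str]] = []
    while remaining:
        ready = [tid for tid, deps in remaining.items() if deps <= done]
        if not ready:
            raise ValueError("Circular dependency detected")
        batches.append(ready)
        done.update(ready)
        for tid in ready:
            del remaining[tid]
    return batches

plan = [
    {"id": "search_sources", "dependencies": []},
    {"id": "analyze_pdf", "dependencies": ["search_sources"]},
    {"id": "scrape_statistics", "dependencies": ["search_sources"]},
    {"id": "synthesize_report", "dependencies": ["analyze_pdf", "scrape_statistics"]},
]
print(schedule_batches(plan))
# [['search_sources'], ['analyze_pdf', 'scrape_statistics'], ['synthesize_report']]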
THE TOOL ORCHESTRATION SYSTEM: CAPABILITY INTEGRATION
The ToolRegistry and associated tool implementations provide the agent with concrete capabilities for interacting with the external world. This subsystem demonstrates sophisticated capability composition and resource management, enabling the agent to leverage diverse tools and data sources in coordinated fashion to achieve research objectives that would be impossible with any single capability.
The system implements dynamic tool selection processes where the agent doesn't simply use tools in predetermined sequences but instead intelligently selects tools based on multiple contextual factors. The specific requirements of each research goal influence tool selection, with different objectives requiring different combinations of search, analysis, and visualization capabilities. The current context and available information shape tool choices, as the agent considers what information it already possesses and what additional data would be most valuable. Historical performance data for different tool combinations informs selection decisions, enabling the agent to learn which tool sequences tend to produce the best results for different types of research objectives. Resource constraints and efficiency considerations also influence tool selection, ensuring that the agent chooses approaches that balance thoroughness with practical limitations.
For each tool invocation, the agent demonstrates intelligent parameter optimization based on the specific context and objectives of the current task. Web search queries are crafted to maximize relevant results by incorporating context from previous research findings and optimizing search terms for the specific information being sought. Visualization parameters are selected to best represent the data characteristics and research objectives, choosing chart types, scales, and formatting that most effectively communicate the insights being discovered. Document analysis focuses on the most relevant sections based on research goals and current context, optimizing processing time while ensuring comprehensive coverage of pertinent information.
The tool system implements sophisticated error recovery mechanisms that enable resilient operation even when individual tools fail or produce suboptimal results. When tools fail, the agent automatically retries with adjusted parameters, attempting to resolve issues through parameter modification or alternative configuration options. If direct retry approaches fail, the system falls back to alternative tools that can accomplish similar objectives, maintaining research progress even when preferred tools are unavailable. The agent adjusts its overall strategy to work around unavailable capabilities, modifying research approaches to achieve objectives through alternative means when specific tools cannot be utilized. Most importantly, the system learns from failures to avoid similar issues in future sessions, incorporating failure experiences into its decision-making processes to improve future tool selection and configuration.
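A hedged sketch of the retry-and-fallback pattern described above. The helper, tool names, and retry policy are illustrative assumptions layered on top of a registry that returns awaitable tool functions.
# resilience_sketch.py - retry with adjusted parameters, then fall back to another tool
async def run_with_recovery(registry, primary: str, fallback: str, params: dict, max_retries: int = 2):
    """Try the primary tool, loosening parameters on retry; fall back if it keeps failing."""
    attempt_params = dict(params)
    for attempt in range(max_retries + 1):
        try:
            return await registry.get(primary)(**attempt_params)
        except Exception as exc:
            print(f"{primary} failed on attempt {attempt + 1}: {exc}")
            # Adjust parameters before retrying, e.g. ask for fewer results
            if "num_results" in attempt_params:
                attempt_params["num_results"] = max(1, attempt_params["num_results"] // 2)
    print(f"Falling back from {primary} to {fallback}")
    return await registry.get(fallback)(**params)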
The system continuously monitors tool performance and optimizes usage patterns through systematic performance optimization processes. Execution times are tracked across different tools and configurations, enabling the agent to choose approaches that balance thoroughness with efficiency. Success rates are monitored for different tool combinations and contexts, allowing the system to identify which approaches are most reliable for specific types of research tasks. Result quality metrics are collected and analyzed to ensure that efficiency optimizations don't compromise the effectiveness of research outcomes, maintaining a balance between speed and quality that serves user objectives.
THE MEMORY AND CONTEXT SYSTEM: COGNITIVE ARCHITECTURE
The memory system provides the agent with sophisticated cognitive capabilities that enable learning, context maintenance, and knowledge accumulation over time. This cognitive architecture represents one of the most advanced aspects of the system, implementing memory patterns inspired by human cognition while leveraging modern computational capabilities to achieve sophisticated information storage and retrieval.
The system implements a dual memory architecture inspired by human cognition that addresses both immediate operational needs and long-term learning requirements. Working Memory, managed by the ContextManager, maintains immediate context and intermediate results during active research sessions, enabling coherent reasoning across multiple tasks within a single research effort and ensuring that discoveries in one task inform decision-making in subsequent tasks. Long-Term Memory, implemented through the MemorySystem, stores semantic knowledge from past research sessions using vector embeddings for semantic retrieval, enabling the agent to build upon previous work and avoid redundant research while accessing relevant insights from historical research activities.
Rather than storing simple key-value pairs, the memory system uses vector embeddings to represent knowledge semantically, enabling sophisticated queries and retrieval operations that go beyond exact text matching. This semantic knowledge representation allows the agent to find conceptually related information even when different terminology is used, creating a more flexible and powerful knowledge management system that can adapt to varied expression of similar concepts.
The memory system implements context-aware retrieval mechanisms that don't just store information but intelligently retrieve relevant context based on current goals and activities. When researching "solar power efficiency," the system automatically surfaces relevant past research on "photovoltaic technology" or "renewable energy systems," demonstrating the semantic understanding that enables effective knowledge reuse and prevents redundant research efforts.
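The dependency list already includes sentence-transformers and chromadb, so a minimal sketch of this kind of semantic storage and retrieval could look like the following. The collection name, embedding model, and helper functions are illustrative choices, not prescribed by the implementation.
# memory_sketch.py - semantic storage/retrieval with sentence-transformers + chromadb
import chromadb
from sentence_transformers import SentenceTransformer

embedder = SentenceTransformer("all-MiniLM-L6-v2")  # small, widely used embedding model
client = chromadb.Client()                          # in-memory instance; persistence is configurable
collection = client.create_collection("research_memory")

def store_finding(finding_id: str, text: str, metadata: dict):
    """Embed a research finding and store it for later semantic retrieval."""
    embedding = embedder.encode(text).tolist()
    collection.add(ids=[finding_id], embeddings=[embedding], documents=[text], metadatas=[metadata])

def recall(query: str, n_results: int = 1):
    """Retrieve conceptually related findings, even if terminology differs."""
    query_embedding = embedder.encode(query).tolist()
    return collection.query(query_embeddings=[query_embedding], n_results=n_results)

store_finding("f1", "Photovoltaic module efficiency improved to 22% in recent panels.", {"topic": "solar"})
results = recall("solar power efficiency")
print(results["documents"])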
Like biological memory systems, the architecture implements selective retention through memory consolidation and forgetting processes where more important and frequently accessed information is preserved while less relevant details are allowed to fade. This approach prevents memory systems from becoming cluttered with irrelevant information while ensuring that valuable insights and frequently needed knowledge remain easily accessible for future research activities.
THE REFLECTION AND IMPROVEMENT SYSTEM: METACOGNITIVE CAPABILITIES
The ReflectionSystem represents perhaps the most sophisticated aspect of the architecture, implementing the agent's ability to analyze its own performance and continuously improve its capabilities. This metacognitive capability distinguishes truly agentic systems from simpler automated tools by enabling self-directed learning and optimization that doesn't require external programming or manual tuning.
The reflection system evaluates performance across multiple dimensions to provide comprehensive assessment of system effectiveness and identify specific areas for improvement. Research quality assessment examines the comprehensiveness, accuracy, and depth of analysis produced by research sessions, evaluating whether the agent is meeting its primary objective of producing valuable and reliable research outputs. Efficiency analysis focuses on resource utilization and time management, ensuring that the agent achieves its objectives within reasonable resource constraints while identifying opportunities for optimization. Tool effectiveness evaluation examines success rates and optimal usage patterns for different tools and configurations, enabling the system to learn which approaches work best in different contexts. Strategic soundness assessment evaluates the quality of planning and adaptation decisions, analyzing whether the agent's high-level strategic choices lead to effective research outcomes.
The system goes beyond collecting performance metrics to actively learning from experience through sophisticated analysis and pattern recognition. The reflection system identifies patterns in successful research sessions and incorporates those insights into future planning and execution strategies, enabling the agent to replicate successful approaches while avoiding strategies that have proven ineffective. This learning process operates continuously, with each research session contributing to the agent's understanding of effective research methodologies and optimal resource utilization.
Based on comprehensive performance analysis, the system generates specific, actionable improvement recommendations that can be implemented in future research sessions. These automated improvement recommendations might include adjusting tool selection criteria based on observed performance patterns, modifying planning strategies to better sequence research activities, or optimizing resource allocation to improve efficiency while maintaining quality. The recommendations are specific and actionable rather than generic, enabling meaningful system improvements over time.
The reflection system tracks performance trends over time and provides predictive insights about when certain approaches are likely to be effective and when alternative strategies should be considered. This trend analysis enables the agent to adapt its behavior based on longer-term patterns rather than just immediate feedback, creating more stable and reliable improvements in system performance.
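As an illustration of how such multi-dimensional assessment might be fed back to the LLM, the sketch below computes a few simple session metrics and asks the model for concrete recommendations. The metric definitions and the prompt wording are assumptions made for illustration.
# reflection_sketch.py - turn simple session metrics into improvement recommendations
from typing import List

def session_metrics(completed_tasks: List[dict]) -> dict:
    """Very simple metrics; a real system would track much richer signals."""
    total = len(completed_tasks)
    failed = sum(1 for t in completed_tasks if t.get("status") == "failed")
    avg_seconds = sum(t.get("duration", 0) for t in completed_tasks) / max(total, 1)
    return {"tasks": total, "failure_rate": failed / max(total, 1), "avg_task_seconds": avg_seconds}

async def reflect(llm, goal: str, completed_tasks: List[dict]) -> str:
    metrics = session_metrics(completed_tasks)
    prompt = f"""
    You are reviewing a finished research session.
    GOAL: {goal}
    METRICS: {metrics}
    TASKS: {[t.get('description') for t in completed_tasks]}

    List 3 specific, actionable changes that would improve quality or efficiency
    in a similar future session (tool choice, task ordering, parameters).
    """
    return await llm.generate(prompt)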
LOCAL LLM INTEGRATION: AUTONOMOUS REASONING ENGINE
The MistralProvider component represents the reasoning engine that enables the agent's sophisticated language understanding and generation capabilities while maintaining complete local control over all reasoning processes. This local deployment approach ensures privacy, reduces latency, and eliminates dependencies on external services while providing the sophisticated language capabilities necessary for autonomous research and analysis.
The system implements numerous optimizations for local LLM deployment that enable effective operation on standard hardware configurations. Memory optimization uses 16-bit precision and quantization techniques to reduce memory requirements while maintaining quality, making it possible to run sophisticated language models on systems with limited GPU memory. Device management automatically distributes model components across available hardware resources including both CPU and GPU, optimizing performance based on the specific hardware configuration available. Caching strategies implement intelligent caching to avoid redundant generations for repeated queries, improving both performance and resource efficiency by reusing previous reasoning results when appropriate.
The LLM integration goes beyond simple text generation to implement sophisticated context management that enables complex reasoning and planning tasks. Structured output generation ensures reliable JSON generation for tool parameters and planning decisions, enabling the LLM to interface effectively with other system components through well-defined data structures. Context window management implements intelligent truncation and summarization to work within model limitations while preserving essential information needed for effective reasoning. Dynamic temperature adjustment modifies generation parameters based on the type of reasoning required, using lower temperatures for analytical tasks that require precision and higher temperatures for creative or exploratory reasoning tasks.
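A small sketch of two of these techniques: choosing a generation temperature by reasoning type, and extracting a JSON object from free-form LLM output. Both helpers are hypothetical additions layered on top of whatever generate() call the provider exposes.
# llm_helpers_sketch.py - dynamic temperature choice and tolerant JSON extraction
import json
import re

TEMPERATURE_BY_TASK = {
    "planning": 0.3,       # analytical: keep output precise and parseable
    "analysis": 0.2,
    "brainstorming": 0.9,  # exploratory: allow more variation
}

def temperature_for(task_type: str) -> float:
    return TEMPERATURE_BY_TASK.get(task_type, 0.7)

def extract_json(text: str) -> dict:
    """Pull the first {...} block out of model output and parse it.
    Local models often wrap JSON in prose, so don't assume clean output."""
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if not match:
        raise ValueError("No JSON object found in model output")
    return json.loads(match.group(0))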
By using local LLM deployment, the system ensures that sensitive research data never leaves the user's environment, providing essential privacy and security protections. This approach is crucial for proprietary research, confidential analysis, or work with sensitive data sources where external service dependencies would create unacceptable security risks or privacy concerns.
DATA FLOW AND COMMUNICATION PATTERNS
INFORMATION FLOW ARCHITECTURE
The system implements sophisticated information flow patterns that enable coherent autonomous behavior while maintaining clear separation of concerns between different subsystems. These patterns ensure that information flows efficiently throughout the system while maintaining data integrity and enabling the complex coordination required for autonomous research activities.
When a research request enters the system, it follows a well-defined request processing pipeline that ensures consistent handling and appropriate resource allocation. Input validation and normalization processes research goals, validating them for completeness and clarity while normalizing terminology and extracting key requirements that will guide subsequent processing. Context enrichment incorporates relevant historical information and environmental constraints that influence how the research should be conducted, drawing from the memory system to provide background context that improves research quality. Goal decomposition transforms abstract research objectives into concrete, measurable sub-goals that can be addressed through specific research activities, creating a clear roadmap for achieving the overall objective. Resource allocation determines appropriate computational resources and time constraints for the research session, ensuring that the approach matches available capabilities and user expectations.
INTER-COMPONENT COMMUNICATION
Components throughout the system communicate through standardized interfaces and protocols that enable loose coupling while ensuring reliable information exchange. The Planning System communicates with the Tool Registry to understand available capabilities and their characteristics, enabling informed decisions about which tools to incorporate into research plans. The Execution Engine coordinates with individual tools through the Tool Registry, passing parameters and receiving results in standardized formats that enable consistent processing regardless of the specific tool being utilized.
The Memory System receives information from multiple sources throughout the system, including task results from the Execution Engine, performance metrics from the Monitoring System, and reflection insights from the Reflection System. This comprehensive information gathering enables the memory system to maintain a complete picture of system activity and provide relevant context for future research sessions.
The Reflection System monitors activity across all other components, collecting performance data, analyzing patterns, and generating insights that inform future system behavior. This monitoring occurs continuously and non-intrusively, ensuring that reflection processes don't interfere with primary research activities while gathering the information necessary for meaningful performance analysis.
ERROR HANDLING AND RESILIENCE PATTERNS
The architecture implements comprehensive error handling and resilience patterns that enable continued operation even when individual components experience failures or suboptimal performance. These patterns are essential for autonomous operation, as the system must be able to recover from failures and adapt to unexpected circumstances without human intervention.
Component-level error handling ensures that failures in individual tools or subsystems don't cascade to affect the entire system. When tools fail, the system implements automatic retry mechanisms with adjusted parameters, attempting to resolve transient issues through configuration changes or alternative approaches. If retry mechanisms are unsuccessful, fallback strategies activate alternative tools or methodologies that can accomplish similar objectives, ensuring that research progress continues even when preferred approaches are unavailable.
System-level resilience mechanisms monitor overall system health and implement corrective actions when performance degrades or resource constraints are encountered. Resource monitoring tracks memory usage, CPU utilization, and other system resources, triggering optimization measures or graceful degradation when resources become constrained. Performance monitoring identifies when system components are operating outside normal parameters and initiates diagnostic and corrective processes to restore optimal operation.
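A sketch of the kind of resource check that could feed such monitoring, using psutil (an optional dependency that is not in the setup list above); the thresholds are illustrative.
# resource_monitor_sketch.py - simple thresholds; assumes the optional psutil package
import psutil

MEMORY_LIMIT_PERCENT = 85
CPU_LIMIT_PERCENT = 90

def resource_status() -> dict:
    memory = psutil.virtual_memory()
    cpu = psutil.cpu_percent(interval=0.5)
    return {
        "memory_percent": memory.percent,
        "cpu_percent": cpu,
        "degrade": memory.percent > MEMORY_LIMIT_PERCENT or cpu > CPU_LIMIT_PERCENT,
    }

status = resource_status()
if status["degrade"]:
    print("Resources constrained - consider smaller batches or fewer parallel tasks")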
The error handling system learns from failures to improve future resilience and performance. Failure patterns are analyzed to identify common causes and potential preventive measures, enabling the system to avoid similar issues in future operations. Recovery strategies are evaluated for effectiveness and optimized based on actual performance during failure scenarios, creating a continuously improving resilience capability.
SCALABILITY AND PERFORMANCE ARCHITECTURE
COMPUTATIONAL RESOURCE MANAGEMENT
The system implements sophisticated computational resource management that enables effective operation across a wide range of hardware configurations while optimizing performance based on available capabilities. This resource management approach ensures that the system can scale appropriately from development environments to production deployments while maintaining consistent behavior and performance characteristics.
Memory management strategies optimize usage of both system RAM and GPU memory to enable effective operation on standard hardware configurations. The system monitors memory usage continuously and implements intelligent caching and garbage collection strategies that minimize memory overhead while maintaining performance. Model loading and optimization techniques ensure that the local LLM operates efficiently within available memory constraints while providing the reasoning capabilities necessary for autonomous operation.
CPU and GPU utilization optimization balances computational workloads across available processing resources to maximize throughput while preventing resource contention that could degrade performance. The system automatically detects available hardware capabilities and configures processing strategies accordingly, ensuring optimal performance regardless of the specific hardware configuration.
Concurrent processing capabilities enable parallel execution of independent tasks while maintaining coordination and resource sharing for dependent activities. The system identifies opportunities for parallel execution within research plans and coordinates multiple concurrent activities without creating resource conflicts or coordination problems.
HORIZONTAL AND VERTICAL SCALING
The architecture supports both horizontal and vertical scaling approaches to accommodate different deployment requirements and growth patterns. Vertical scaling capabilities enable the system to take advantage of more powerful hardware by automatically utilizing additional CPU cores, GPU resources, and memory capacity when available. The system detects hardware capabilities at startup and configures processing strategies to utilize available resources effectively.
Horizontal scaling support enables deployment across multiple systems or containers for high-throughput research applications. The modular architecture facilitates distribution of different components across multiple systems, with clear interfaces that enable network-based communication between distributed components. Load balancing capabilities distribute research requests across multiple agent instances while maintaining session consistency and result quality.
The stateless design of many components facilitates scaling by enabling multiple instances to operate independently while sharing common resources like the memory system and tool registry. This approach enables linear scaling of processing capacity while maintaining the coordination and context sharing necessary for effective autonomous operation.
PERFORMANCE MONITORING AND OPTIMIZATION
Comprehensive performance monitoring capabilities track system behavior across multiple dimensions and provide insights for ongoing optimization efforts. Response time monitoring tracks the duration of different types of research activities and identifies bottlenecks or performance degradation that might require attention. Throughput analysis measures the system's capacity to handle multiple concurrent research requests while maintaining quality and responsiveness.
Resource utilization monitoring provides detailed insights into how computational resources are being used and identifies opportunities for optimization. Memory usage patterns are analyzed to identify potential memory leaks or inefficient allocation strategies that could be improved. CPU and GPU utilization patterns help identify whether processing resources are being used effectively or whether workload distribution could be optimized.
Quality metrics monitoring ensures that performance optimizations don't compromise the effectiveness of research outputs. Research quality scores are tracked over time to ensure that efficiency improvements maintain or improve the value of research results. User satisfaction metrics provide feedback about whether system performance meets user expectations and requirements.
The performance monitoring system generates automated optimization recommendations based on observed patterns and performance characteristics. These recommendations might include configuration adjustments, resource allocation modifications, or architectural changes that could improve system performance while maintaining reliability and quality.
SECURITY AND PRIVACY ARCHITECTURE
DATA PROTECTION AND PRIVACY
The architecture implements comprehensive data protection and privacy measures that ensure sensitive research information remains secure throughout all system operations. Local processing capabilities ensure that research data never leaves the user's environment, eliminating the privacy risks associated with cloud-based processing or external service dependencies.
Data encryption mechanisms protect sensitive information both at rest and during processing, ensuring that research data remains secure even if storage media or memory dumps are compromised. The system implements strong encryption standards for all persistent storage and uses secure memory handling practices to minimize the risk of sensitive information exposure.
Access control mechanisms ensure that research data and system capabilities are only accessible to authorized users and processes. Authentication and authorization systems verify user identity and enforce appropriate access restrictions based on user roles and permissions. Session management ensures that research activities remain isolated between different users and sessions.
Data retention policies automatically manage the lifecycle of research data and system logs, ensuring that sensitive information is retained only as long as necessary for system operation and user requirements. Secure deletion mechanisms ensure that sensitive data is completely removed from system storage when it is no longer needed.
SYSTEM SECURITY AND HARDENING
The system implements multiple layers of security controls that protect against various types of attacks and security threats. Input validation mechanisms ensure that all user inputs and external data sources are properly sanitized and validated before processing, preventing injection attacks and other input-based security vulnerabilities.
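Since pydantic is already part of the dependency list, input validation for research requests could be sketched as follows (pydantic v2 style; the field names and limits are illustrative assumptions):
# request_validation_sketch.py - validate and sanitize incoming research requests
from pydantic import BaseModel, Field, field_validator

class ResearchRequest(BaseModel):
    goal: str = Field(min_length=10, max_length=500)
    max_tasks: int = Field(default=10, ge=1, le=50)

    @field_validator("goal")
    @classmethod
    def no_control_characters(cls, value: str) -> str:
        if any(ord(ch) < 32 for ch in value if ch not in "\n\t"):
            raise ValueError("goal contains control characters")
        return value.strip()

request = ResearchRequest(goal="Analyze renewable energy adoption in Europe")
print(request.goal)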
Sandbox isolation mechanisms ensure that tool execution and external data processing occur in controlled environments that limit the potential impact of malicious or corrupted data. The system implements appropriate isolation between different research sessions and between research activities and core system functions.
Network security controls limit and monitor external network communications to ensure that only necessary and authorized network activities occur. The system implements appropriate firewall rules and network monitoring to detect and prevent unauthorized network access or data exfiltration attempts.
Regular security updates and vulnerability management processes ensure that the system remains protected against newly discovered security threats. The system implements automated update mechanisms for security-critical components while maintaining stability and compatibility of research capabilities.
AUDIT AND COMPLIANCE CAPABILITIES
Comprehensive audit logging capabilities track all significant system activities and provide detailed records for compliance and security monitoring purposes. Research activity logs record details about research requests, execution steps, and results in a format that enables compliance verification and security analysis. System access logs track user authentication, authorization decisions, and system administration activities to enable security monitoring and incident investigation.
Data handling audit trails record how sensitive information is processed, stored, and transmitted throughout the system, enabling compliance verification for data protection regulations and organizational policies. Tool execution logs provide detailed records of external data access and processing activities to enable security analysis and compliance verification.
Compliance reporting capabilities generate standardized reports that demonstrate adherence to relevant regulations and organizational policies. The system can generate reports for data protection compliance, security compliance, and operational compliance requirements based on collected audit data.
Incident response capabilities enable rapid detection and response to security incidents or compliance violations. The system implements automated monitoring and alerting mechanisms that detect suspicious activities or policy violations and initiate appropriate response procedures to minimize impact and ensure proper incident handling.
DEPLOYMENT AND OPERATIONS ARCHITECTURE
CONTAINERIZATION AND ORCHESTRATION
The system leverages containerization technologies to ensure consistent deployment across different environments while simplifying operations and maintenance activities. Docker containerization encapsulates all system components and dependencies in portable containers that can be deployed consistently across development, testing, and production environments. This approach eliminates environment-specific configuration issues and ensures that the system behaves consistently regardless of the underlying infrastructure.
Kubernetes orchestration capabilities enable sophisticated deployment strategies including rolling updates, automatic scaling, and failure recovery mechanisms. The system implements appropriate health checks and readiness probes that enable Kubernetes to monitor system health and automatically restart or replace failed components. Service discovery mechanisms enable dynamic configuration and communication between system components even as they are moved or scaled across different nodes in a cluster.
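For the health and readiness probes mentioned above, a minimal pair of endpoints can be exposed with FastAPI (already in the dependency list). The route paths and readiness condition are illustrative assumptions.
# health_endpoints_sketch.py - liveness/readiness probes for container orchestration
from fastapi import FastAPI, Response, status

app = FastAPI()
model_loaded = False  # set to True once the Mistral model has finished loading

@app.get("/healthz")
def liveness():
    # Liveness: the process is up and able to answer requests
    return {"status": "ok"}

@app.get("/readyz")
def readiness(response: Response):
    # Readiness: only report ready once the model is loaded and tools are registered
    if not model_loaded:
        response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE
        return {"status": "loading"}
    return {"status": "ready"}

# Run with: uvicorn health_endpoints_sketch:app --host 0.0.0.0 --port 8000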
Configuration management approaches enable environment-specific customization while maintaining consistent core behavior across different deployments. The system uses environment variables and configuration files to customize behavior for different deployment scenarios while ensuring that core functionality remains consistent and reliable.
MONITORING AND OBSERVABILITY
Comprehensive monitoring and observability capabilities provide detailed insights into system behavior and enable proactive identification and resolution of performance or reliability issues. Application performance monitoring tracks response times, throughput, and error rates across all system components, enabling identification of performance bottlenecks or degradation that might require attention.
Infrastructure monitoring tracks the health and performance of underlying hardware and system resources including CPU utilization, memory usage, disk I/O, and network performance. This monitoring enables identification of resource constraints or hardware issues that might affect system performance or reliability.
Distributed tracing capabilities track research requests as they flow through different system components, enabling detailed analysis of request processing paths and identification of performance bottlenecks or failure points. This capability is essential for debugging complex issues and optimizing system performance in distributed deployment scenarios.
Log aggregation and analysis systems collect and analyze log data from all system components to enable centralized monitoring and troubleshooting. The system implements structured logging practices that facilitate automated analysis and alerting based on log content and patterns.
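One way to get the structured logs described here is a JSON formatter on top of Python's standard logging module; a minimal sketch with an assumed logger name:
# structured_logging_sketch.py - JSON-formatted logs for aggregation and alerting
import json
import logging

class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "time": self.formatTime(record),
            "level": record.levelname,
            "component": record.name,
            "message": record.getMessage(),
        }
        return json.dumps(entry)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("research_agent")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("Research task completed: task_001")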
BACKUP AND DISASTER RECOVERY
Comprehensive backup and disaster recovery capabilities ensure that research data and system configuration can be recovered in the event of hardware failures, software issues, or other catastrophic events. Automated backup mechanisms regularly capture research data, system configuration, and learned knowledge from the memory system to enable complete system recovery if necessary.
Data replication strategies ensure that critical information is stored in multiple locations to prevent data loss in the event of storage failures. The system implements appropriate replication mechanisms for different types of data based on their criticality and recovery requirements.
Disaster recovery procedures provide step-by-step guidance for restoring system operation after various types of failures or disasters. These procedures are tested regularly to ensure that they remain effective and can be executed successfully when needed.
Business continuity planning ensures that research capabilities can be maintained even during system outages or maintenance activities. The system implements appropriate redundancy and failover mechanisms that enable continued operation during planned maintenance or unexpected failures.
CONCLUSION: EMERGENT INTELLIGENCE THROUGH ARCHITECTURAL SOPHISTICATION
The Research Assistant system demonstrates that sophisticated autonomous behavior can emerge from the careful orchestration of well-designed architectural components working in concert. The system achieves genuine agency not through any single "intelligent" component but through the complex interactions between specialized subsystems that each contribute essential capabilities to the overall intelligent behavior.
The layered architecture enables sophisticated autonomous behavior while maintaining clear separation of concerns and manageable complexity. Each layer provides appropriate abstractions that enable higher-level reasoning while hiding implementation details that would otherwise complicate decision-making processes. This approach enables the system to operate at multiple levels of abstraction simultaneously, from high-level strategic planning to low-level resource management, while maintaining coherent overall behavior.
The autonomous control loops create a hierarchy of intelligent behavior that operates at different timescales and addresses different aspects of research execution. The interaction between these loops generates emergent intelligence that adapts to circumstances, learns from experience, and continuously improves performance without external programming or manual optimization.
The sophisticated memory and context management systems enable the agent to maintain coherence across extended research sessions while accumulating knowledge that improves future performance. This cognitive architecture represents one of the most advanced aspects of the system, implementing memory patterns that enable both immediate operational effectiveness and long-term learning and improvement.
The comprehensive error handling and resilience mechanisms ensure that the system can operate autonomously even in the face of failures, unexpected circumstances, or resource constraints. This resilience is essential for practical autonomous operation and distinguishes the system from simpler automated tools that require constant supervision and intervention.
The result is a truly agentic system that can receive high-level objectives and autonomously determine how to achieve them, adapting to circumstances, learning from experience, and continuously improving its capabilities. The architecture demonstrates that sophisticated autonomous behavior can be achieved through careful design and implementation of interconnected components that work together to create intelligence that emerges from their interaction rather than being explicitly programmed into any single component.
HOW TO IMPLEMENT THIS ARCHITECTURE
STEP 1: ENVIRONMENT SETUP
PREREQUISITES AND DEPENDENCIES
The following Python packages are required for the complete system:
install_dependencies.sh:
------------------------
pip install transformers torch accelerate
pip install langchain langchain-community
pip install requests beautifulsoup4 arxiv
pip install matplotlib plotly pandas
pip install sentence-transformers chromadb
pip install pydantic fastapi uvicorn
Code Explanation:
- transformers: Provides access to Hugging Face transformer models like Mistral
- torch: PyTorch framework for running neural networks locally
- langchain: Framework for building LLM applications with tool integration
- requests/beautifulsoup4: Web scraping and HTTP request handling
- matplotlib/plotly: Data visualization capabilities
- sentence-transformers: For creating embeddings for the memory system
- chromadb: Vector database for storing and retrieving agent memories
- pydantic/fastapi: For building REST APIs and data validation
LOCAL MISTRAL SETUP
# install_mistral.py
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

def setup_mistral():
    """
    Initialize and load the Mistral 7B Instruct model locally.
    This function handles model download, tokenizer setup, and memory optimization.
    """
    model_name = "mistralai/Mistral-7B-Instruct-v0.2"

    # Load tokenizer - handles text encoding/decoding for the model
    print("Loading Mistral tokenizer...")
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    # Load model with optimization settings
    print("Loading Mistral model (this may take several minutes)...")
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.float16,  # Use half precision to reduce memory usage
        device_map="auto"           # Automatically distribute across available GPUs/CPU
    )
    return tokenizer, model

# Test the setup to ensure everything works
if __name__ == "__main__":
    try:
        tokenizer, model = setup_mistral()
        print("SUCCESS: Mistral model loaded successfully!")
        print(f"Model loaded on: {model.device}")
        if torch.cuda.is_available():
            print(f"GPU memory usage: ~{torch.cuda.memory_allocated() / 1e9:.2f} GB")
    except Exception as e:
        print(f"ERROR: Failed to load Mistral model: {e}")
Code Explanation:
- We use Mistral-7B-Instruct-v0.2, which is optimized for instruction following
- torch.float16 reduces memory usage from ~14GB to ~7GB without major quality loss
- device_map="auto" automatically handles GPU/CPU placement based on available hardware
- The tokenizer converts text to tokens the model can understand
- Error handling ensures we can diagnose loading issues
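The planning and reflection code in the following steps calls an awaitable self.llm.generate(...). A minimal provider wrapping the model loaded above could look like the sketch below. The class name matches the MistralProvider mentioned in the architecture, but the file name, generation parameters, and prompt wrapping are illustrative assumptions.
# mistral_provider_sketch.py - async wrapper around the locally loaded model
import asyncio
import torch
from install_mistral import setup_mistral

class MistralProvider:
    def __init__(self):
        self.tokenizer, self.model = setup_mistral()

    def _generate_sync(self, prompt: str, max_new_tokens: int = 1024, temperature: float = 0.7) -> str:
        # Mistral-Instruct expects the [INST] ... [/INST] chat format
        wrapped = f"[INST] {prompt} [/INST]"
        inputs = self.tokenizer(wrapped, return_tensors="pt").to(self.model.device)
        with torch.no_grad():
            output = self.model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                temperature=temperature,
                do_sample=temperature > 0,
                pad_token_id=self.tokenizer.eos_token_id,
            )
        # Return only the newly generated tokens, not the echoed prompt
        new_tokens = output[0][inputs["input_ids"].shape[1]:]
        return self.tokenizer.decode(new_tokens, skip_special_tokens=True)

    async def generate(self, prompt: str, **kwargs) -> str:
        # Run the blocking generation call off the event loop
        return await asyncio.to_thread(self._generate_sync, prompt, **kwargs)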
STEP 2: CORE AGENT ARCHITECTURE
# agent_core.py
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
import json
import uuid
from datetime import datetime

class TaskStatus(Enum):
    """
    Enumeration defining the possible states of agent tasks.
    This helps track task progress and enables proper error handling.
    """
    PENDING = "pending"          # Task created but not started
    IN_PROGRESS = "in_progress"  # Task currently being executed
    COMPLETED = "completed"      # Task finished successfully
    FAILED = "failed"            # Task encountered an error

@dataclass
class Task:
    """
    Represents a single executable task within the agent's plan.
    Each task encapsulates what tool to use, with what parameters,
    and tracks its execution status and results.
    """
    id: str                     # Unique identifier for tracking
    description: str            # Human-readable task description
    tool_name: str              # Which tool should execute this task
    parameters: Dict[str, Any]  # Tool-specific parameters
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[Any] = None            # Stores execution results
    created_at: Optional[datetime] = None   # Timestamp for debugging

    def __post_init__(self):
        """Automatically set creation timestamp if not provided"""
        if self.created_at is None:
            self.created_at = datetime.now()

@dataclass
class AgentState:
    """
    Maintains the complete state of the agent during execution.
    This is the agent's "working memory" containing current goals,
    task queues, completed work, and accumulated context.
    """
    current_goal: str            # The high-level objective
    task_queue: List[Task]       # Tasks waiting to be executed
    completed_tasks: List[Task]  # Successfully finished tasks
    memory: Dict[str, Any]       # Persistent knowledge storage
    context: str                 # Current situational context

class ResearchAgent:
    """
    Main agent class that orchestrates the research process.
    Combines planning, execution, and reflection capabilities
    to autonomously achieve research objectives.
    """

    def __init__(self, llm_provider, tool_registry):
        """
        Initialize the agent with required components.

        Args:
            llm_provider: Interface to the local Mistral model
            tool_registry: Collection of available tools
        """
        self.llm = llm_provider
        self.tools = tool_registry
        self.state = AgentState(
            current_goal="",
            task_queue=[],
            completed_tasks=[],
            memory={},
            context=""
        )

    async def execute_goal(self, goal: str) -> str:
        """
        Main execution loop for achieving a research goal.
        This method orchestrates the entire research process from
        planning through execution to final synthesis.

        Args:
            goal: High-level research objective (e.g., "Analyze renewable energy trends")

        Returns:
            Comprehensive research report as a string
        """
        print(f"Agent received goal: {goal}")
        self.state.current_goal = goal

        # PHASE 1: Strategic Planning
        print("Phase 1: Creating research plan...")
        self.state.task_queue = await self.create_plan(goal)
        print(f"Generated {len(self.state.task_queue)} tasks")

        # PHASE 2: Autonomous Execution
        print("Phase 2: Executing research tasks...")
        task_count = 0
        while self.state.task_queue:
            task = self.state.task_queue.pop(0)
            task_count += 1
            print(f"  Executing task {task_count}: {task.description}")
            await self.execute_task(task)

            # PHASE 3: Adaptive Replanning
            # After every few tasks, evaluate if we need to adjust our approach
            if task_count % 3 == 0 and await self.should_replan():
                print("  Adaptive replanning triggered...")
                new_tasks = await self.replan()
                self.state.task_queue.extend(new_tasks)

        # PHASE 4: Synthesis and Reporting
        print("Phase 4: Synthesizing final results...")
        return await self.synthesize_results()
Code Explanation:
- The Task dataclass encapsulates all information needed to execute a single action
- TaskStatus enum provides clear state tracking and enables robust error handling
- AgentState maintains all context needed for autonomous operation
- The execute_goal method implements a four-phase research process:
1. Strategic planning using the LLM
2. Autonomous task execution using tools
3. Adaptive replanning based on intermediate results
4. Final synthesis into a comprehensive report
- The architecture is designed to be extensible - new tools and capabilities can be added without modifying the core agent logic
STEP 3: PLANNING SYSTEM
STRATEGIC PLANNING MODULE
# planning.py
class PlanningSystem:
"""
Handles strategic planning for research objectives.
Uses the local LLM to break down high-level goals into
concrete, executable tasks with proper sequencing.
"""
def __init__(self, llm_provider):
self.llm = llm_provider
async def create_plan(self, goal: str, context: str = "") -> List[Task]:
"""
Generate a strategic research plan for achieving the given goal.
This method prompts the LLM to think strategically about the research
process and create a well-sequenced set of tasks.
Args:
goal: The research objective to plan for
context: Additional context from previous research or user input
Returns:
List of Task objects representing the complete research plan
"""
# Construct a detailed planning prompt that guides the LLM
# to think systematically about the research process
planning_prompt = f"""
You are an expert research planning AI. Your job is to create a comprehensive, well-sequenced plan for achieving a research objective.
RESEARCH GOAL: {goal}
ADDITIONAL CONTEXT: {context}
Available tools and their capabilities:
- web_search: Search the internet for current information and sources
Parameters: query (string), num_results (int)
- pdf_analyzer: Download and analyze research papers, reports, and documents
Parameters: url (string), focus_area (string)
- data_visualizer: Create charts, graphs, and visual representations
Parameters: data (list), chart_type (string), title (string)
- web_scraper: Extract structured data from specific websites
Parameters: url (string), data_fields (list)
- synthesis: Combine and analyze information from multiple sources
Parameters: sources (list), analysis_type (string)
PLANNING GUIDELINES:
1. Start with broad information gathering before diving into specifics
2. Sequence tasks logically - gather data before analyzing it
3. Include validation steps to verify important findings
4. Plan for synthesis and visualization of key insights
5. Consider multiple perspectives and sources
OUTPUT FORMAT (JSON):
{{
"strategy": "Brief description of the overall research approach",
"tasks": [
{{
"id": "task_001",
"description": "Clear description of what this task accomplishes",
"tool_name": "name_of_tool_to_use",
"parameters": {{"param1": "value1", "param2": "value2"}},
"priority": 1,
"dependencies": ["list_of_task_ids_that_must_complete_first"]
}}
],
"success_criteria": [
"Specific measurable criteria for successful completion",
"Additional criteria that define research quality"
]
}}
Create a detailed research plan:
"""
try:
# Generate the plan using the local LLM
print("Generating research plan with Mistral...")
response = await self.llm.generate(planning_prompt)
# Parse the JSON response
plan_data = json.loads(response)
print(f"Strategy: {plan_data['strategy']}")
print(f"Success criteria: {plan_data['success_criteria']}")
# Convert the plan data into Task objects
tasks = []
for task_data in plan_data["tasks"]:
task = Task(
id=task_data["id"],
description=task_data["description"],
tool_name=task_data["tool_name"],
parameters=task_data["parameters"]
)
tasks.append(task)
print(f" Planned task: {task.description}")
return tasks
except json.JSONDecodeError as e:
print(f"Error parsing LLM response as JSON: {e}")
# Fallback to a simple default plan
return self._create_fallback_plan(goal)
except Exception as e:
print(f"Error in planning: {e}")
return self._create_fallback_plan(goal)
def _create_fallback_plan(self, goal: str) -> List[Task]:
"""
Create a basic research plan when LLM planning fails.
This ensures the agent can still operate even if planning encounters issues.
"""
print("Using fallback planning strategy...")
return [
Task(
id="fallback_001",
description=f"Search for general information about: {goal}",
tool_name="web_search",
parameters={"query": goal, "num_results": 5}
),
Task(
id="fallback_002",
description="Synthesize findings from search results",
tool_name="synthesis",
parameters={"sources": ["search_results"], "analysis_type": "summary"}
)
]
async def should_replan(self, agent_state: AgentState) -> bool:
"""
Determine if replanning is needed based on current research progress.
This enables adaptive behavior - the agent can change course if
initial findings suggest better approaches.
Args:
agent_state: Current state of the agent including completed tasks
Returns:
Boolean indicating whether replanning should occur
"""
# Don't replan if we haven't completed enough tasks to assess progress
if len(agent_state.completed_tasks) < 2:
return False
# Construct a prompt to evaluate research progress
replan_prompt = f"""
Analyze the current research progress and determine if we should adjust our approach.
ORIGINAL RESEARCH GOAL: {agent_state.current_goal}
PROGRESS SUMMARY:
- Tasks completed: {len(agent_state.completed_tasks)}
- Tasks remaining: {len(agent_state.task_queue)}
RECENT TASK RESULTS:
{self._format_recent_results(agent_state.completed_tasks[-3:])}
REMAINING PLANNED TASKS:
{self._format_remaining_tasks(agent_state.task_queue[:3])}
EVALUATION CRITERIA:
1. Are we making meaningful progress toward the research goal?
2. Do recent results suggest more promising research directions?
3. Are there significant gaps in our current approach?
4. Would additional or different tasks improve the final output?
Based on this analysis, should we replan our approach?
Answer with: YES or NO
If YES, provide a brief reason: [explanation]
If NO, provide a brief reason: [explanation]
Decision:
"""
try:
response = await self.llm.generate(replan_prompt)
decision = "YES" in response.upper()
if decision:
print(f"Replanning triggered. Reason: {response}")
else:
print("Continuing with current plan")
return decision
except Exception as e:
print(f"Error in replan evaluation: {e}")
return False # Default to not replanning on errors
def _format_recent_results(self, recent_tasks: List[Task]) -> str:
"""Format recent task results for LLM analysis"""
if not recent_tasks:
return "No completed tasks yet"
formatted = []
for task in recent_tasks:
result_summary = str(task.result)[:200] + "..." if len(str(task.result)) > 200 else str(task.result)
formatted.append(f"- {task.description}: {result_summary}")
return "\n".join(formatted)
def _format_remaining_tasks(self, remaining_tasks: List[Task]) -> str:
"""Format remaining tasks for LLM analysis"""
if not remaining_tasks:
return "No remaining tasks"
formatted = []
for task in remaining_tasks:
formatted.append(f"- {task.description} (using {task.tool_name})")
return "\n".join(formatted)
Code Explanation:
- The PlanningSystem uses structured prompting to guide the LLM through strategic thinking
- JSON output format ensures machine-readable plans that can be parsed reliably
- The planning prompt includes detailed tool descriptions so the LLM understands capabilities
- Error handling with fallback planning ensures the agent continues operating even if the LLM produces invalid JSON or encounters other issues
- The should_replan method enables adaptive behavior by evaluating progress and adjusting the research approach based on intermediate findings
- Task formatting methods prepare context for LLM evaluation in a readable format
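One practical hardening step, shown as a sketch below: local models frequently wrap their JSON in prose or markdown fences, which would trip the bare json.loads call in create_plan and push every run onto the fallback plan. The helper below (parse_json_block is a hypothetical name, not part of the code above) extracts the outermost JSON object before parsing; inside create_plan, plan_data = parse_json_block(response) could replace json.loads(response), with the existing fallback still covering responses that contain no recoverable JSON.
import json
import re

def parse_json_block(llm_response: str) -> dict:
    """Extract and parse the first JSON object found in an LLM response."""
    # Remove common markdown code fences before searching for the JSON payload
    cleaned = re.sub(r"```(?:json)?", "", llm_response).strip()
    # Take the span from the first '{' to the last '}' and attempt to parse it
    start, end = cleaned.find("{"), cleaned.rfind("}")
    if start == -1 or end <= start:
        raise json.JSONDecodeError("No JSON object found in response", cleaned, 0)
    return json.loads(cleaned[start:end + 1])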
STEP 4: TOOL SYSTEM
TOOL REGISTRY AND IMPLEMENTATION
# tools.py
from abc import ABC, abstractmethod
from typing import Dict, List, Any
from datetime import datetime
import requests
from bs4 import BeautifulSoup
import matplotlib.pyplot as plt
import pandas as pd
import uuid
import os
class Tool(ABC):
"""
Abstract base class for all agent tools.
Defines the interface that all tools must implement,
ensuring consistent integration with the agent system.
"""
@abstractmethod
async def execute(self, **kwargs) -> Dict[str, Any]:
"""
Execute the tool with given parameters.
All tools must implement this method and return structured results.
Args:
**kwargs: Tool-specific parameters
Returns:
Dictionary containing execution results and metadata
"""
pass
class WebSearchTool(Tool):
"""
Tool for searching the web and retrieving structured information.
Integrates with search APIs to gather current information for research.
"""
def __init__(self, api_key: str):
"""
Initialize the web search tool with API credentials.
Args:
api_key: API key for the search service
"""
self.api_key = api_key
self.base_url = "https://api.searchengine.com/search" # Example URL
async def execute(self, query: str, num_results: int = 5) -> Dict[str, Any]:
"""
Search the web and return structured, relevant results.
Args:
query: Search query string
num_results: Maximum number of results to return
Returns:
Dictionary containing search results and metadata
"""
print(f"Searching web for: '{query}'")
try:
# Prepare search parameters
params = {
"q": query,
"key": self.api_key,
"num": num_results,
"safe": "active" # Enable safe search
}
# Make the search request
response = requests.get(self.base_url, params=params, timeout=10)
response.raise_for_status()
search_data = response.json()
# Process and structure the results
structured_results = []
for item in search_data.get("items", []):
structured_results.append({
"title": item.get("title", "No title"),
"snippet": item.get("snippet", "No description available"),
"url": item.get("link", ""),
"source": item.get("displayLink", "Unknown source"),
"relevance_score": self._calculate_relevance(query, item)
})
# Sort by relevance score
structured_results.sort(key=lambda x: x["relevance_score"], reverse=True)
result = {
"query": query,
"results": structured_results,
"total_found": len(structured_results),
"search_time": response.elapsed.total_seconds(),
"status": "success"
}
print(f"Found {len(structured_results)} relevant results")
return result
except requests.RequestException as e:
print(f"Search request failed: {e}")
return {
"query": query,
"results": [],
"total_found": 0,
"error": str(e),
"status": "failed"
}
def _calculate_relevance(self, query: str, item: Dict) -> float:
"""
Calculate a relevance score for search results.
This helps prioritize the most useful results for the research goal.
"""
query_terms = query.lower().split()
title = item.get("title", "").lower()
snippet = item.get("snippet", "").lower()
# Count term matches in title (weighted higher) and snippet
title_matches = sum(1 for term in query_terms if term in title)
snippet_matches = sum(1 for term in query_terms if term in snippet)
# Calculate relevance score (0-1 scale)
relevance = (title_matches * 0.7 + snippet_matches * 0.3) / len(query_terms)
return min(relevance, 1.0)
class PDFAnalyzerTool(Tool):
"""
Tool for downloading and analyzing PDF documents.
Extracts text content and provides focused analysis
based on research requirements.
"""
async def execute(self, url: str, focus_area: str = "") -> Dict[str, Any]:
"""
Download, process, and analyze a PDF document.
Args:
url: URL of the PDF document to analyze
focus_area: Specific aspect to focus analysis on
Returns:
Dictionary containing analysis results and metadata
"""
print(f"Analyzing PDF from: {url}")
print(f"Focus area: {focus_area}")
try:
# Download the PDF with proper headers
headers = {
'User-Agent': 'Mozilla/5.0 (Research Agent/1.0)',
'Accept': 'application/pdf'
}
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
if 'application/pdf' not in response.headers.get('content-type', ''):
raise ValueError("URL does not point to a PDF document")
# Extract text content from PDF
# Note: In production, use PyPDF2, pdfplumber, or similar libraries
text_content = self._extract_pdf_text(response.content)
if not text_content.strip():
raise ValueError("Could not extract text from PDF")
# Perform focused analysis using LLM
analysis = await self._analyze_document(text_content, focus_area, url)
result = {
"source_url": url,
"focus_area": focus_area,
"analysis": analysis,
"word_count": len(text_content.split()),
"char_count": len(text_content),
"status": "success"
}
print(f"Successfully analyzed {result['word_count']} words")
return result
except Exception as e:
print(f"PDF analysis failed: {e}")
return {
"source_url": url,
"focus_area": focus_area,
"error": str(e),
"status": "failed"
}
def _extract_pdf_text(self, pdf_content: bytes) -> str:
"""
Extract text from PDF content.
This is a simplified implementation - in production,
use specialized PDF processing libraries.
"""
# Placeholder implementation
# In reality, you would use:
# import PyPDF2 or import pdfplumber
# and implement proper PDF text extraction
# For demonstration, return a simulated extraction
return "Extracted PDF content would appear here..."
async def _analyze_document(self, text_content: str, focus_area: str, source_url: str) -> Dict[str, Any]:
"""
Analyze document content with focus on specific research areas.
Uses the LLM to extract relevant insights and information.
"""
# Truncate content to fit in LLM context window
max_content_length = 2000
content_preview = text_content[:max_content_length]
if len(text_content) > max_content_length:
content_preview += "\n[Content truncated for analysis...]"
analysis_prompt = f"""
Analyze this research document with specific focus on: {focus_area}
DOCUMENT SOURCE: {source_url}
DOCUMENT CONTENT:
{content_preview}
Please extract and analyze:
1. KEY FINDINGS: What are the main findings relevant to "{focus_area}"?
2. METHODOLOGY: What research methods or data sources were used?
3. DATA SOURCES: What data sources or evidence are mentioned?
4. CONCLUSIONS: What conclusions are drawn about "{focus_area}"?
5. LIMITATIONS: What limitations or caveats are noted?
6. RELEVANCE: How relevant is this document to "{focus_area}" (1-10 scale)?
Format your response as structured JSON:
{{
"key_findings": ["finding1", "finding2", ...],
"methodology": "description of methods used",
"data_sources": ["source1", "source2", ...],
"conclusions": ["conclusion1", "conclusion2", ...],
"limitations": ["limitation1", "limitation2", ...],
"relevance_score": 8,
"summary": "Brief summary of the document's contribution to the research topic"
}}
Analysis:
"""
# This would use your LLM provider to analyze the content
# For this example, we'll simulate the analysis
simulated_analysis = {
"key_findings": [
f"Document provides insights into {focus_area}",
"Research methodology appears robust",
"Multiple data sources referenced"
],
"methodology": "Mixed methods research approach with quantitative and qualitative components",
"data_sources": ["Government statistics", "Survey data", "Expert interviews"],
"conclusions": [
f"Significant trends identified in {focus_area}",
"Further research recommended"
],
"limitations": ["Sample size constraints", "Temporal limitations"],
"relevance_score": 7,
"summary": f"Relevant research document providing valuable insights into {focus_area}"
}
return simulated_analysis
class DataVisualizerTool(Tool):
"""
Tool for creating charts, graphs, and visual representations of data.
Helps transform research data into clear, communicative visualizations.
"""
def __init__(self):
"""Initialize the data visualizer with necessary directories."""
self.output_dir = "charts"
os.makedirs(self.output_dir, exist_ok=True)
async def execute(self, data: List[Dict], chart_type: str, title: str, **kwargs) -> Dict[str, Any]:
"""
Create a visualization from structured data.
Args:
data: List of dictionaries containing the data to visualize
chart_type: Type of chart to create ('bar', 'line', 'pie', 'scatter')
title: Title for the chart
**kwargs: Additional chart formatting options
Returns:
Dictionary containing chart information and file path
"""
print(f"Creating {chart_type} chart: '{title}'")
try:
# Convert data to pandas DataFrame for easier manipulation
df = pd.DataFrame(data)
if df.empty:
raise ValueError("No data provided for visualization")
# Create the chart based on type
chart_path = await self._create_chart(df, chart_type, title, **kwargs)
result = {
"chart_path": chart_path,
"chart_type": chart_type,
"title": title,
"data_points": len(data),
"columns": list(df.columns),
"status": "success"
}
print(f"Chart saved to: {chart_path}")
return result
except Exception as e:
print(f"Visualization failed: {e}")
return {
"chart_type": chart_type,
"title": title,
"error": str(e),
"status": "failed"
}
async def _create_chart(self, df: pd.DataFrame, chart_type: str, title: str, **kwargs) -> str:
"""
Create the actual chart using matplotlib.
Handles different chart types and formatting options.
"""
# Set up the plot
plt.figure(figsize=(12, 8))
plt.style.use('seaborn-v0_8') # Use a clean, professional style
# Create chart based on type
if chart_type == "bar":
plt.bar(df.iloc[:, 0], df.iloc[:, 1], color='skyblue', alpha=0.8)
plt.xlabel(df.columns[0])
plt.ylabel(df.columns[1])
plt.xticks(rotation=45, ha='right')
elif chart_type == "line":
plt.plot(df.iloc[:, 0], df.iloc[:, 1], marker='o', linewidth=2, markersize=6)
plt.xlabel(df.columns[0])
plt.ylabel(df.columns[1])
plt.grid(True, alpha=0.3)
elif chart_type == "pie":
plt.pie(df.iloc[:, 1], labels=df.iloc[:, 0], autopct='%1.1f%%', startangle=90)
plt.axis('equal')
elif chart_type == "scatter":
plt.scatter(df.iloc[:, 0], df.iloc[:, 1], alpha=0.7, s=60)
plt.xlabel(df.columns[0])
plt.ylabel(df.columns[1])
plt.grid(True, alpha=0.3)
else:
raise ValueError(f"Unsupported chart type: {chart_type}")
# Add title and formatting
plt.title(title, fontsize=16, fontweight='bold', pad=20)
plt.tight_layout()
# Save the chart
chart_filename = f"{chart_type}_{uuid.uuid4().hex[:8]}.png"
chart_path = os.path.join(self.output_dir, chart_filename)
plt.savefig(chart_path, dpi=300, bbox_inches='tight', facecolor='white')
plt.close() # Close the figure to free memory
return chart_path
class ToolRegistry:
"""
Central registry for managing and executing agent tools.
Provides a unified interface for tool discovery and execution.
"""
def __init__(self):
"""Initialize an empty tool registry."""
self.tools = {}
self.execution_history = []
def register(self, name: str, tool: Tool):
"""
Register a tool with the given name.
Args:
name: Unique identifier for the tool
tool: Tool instance implementing the Tool interface
"""
if not isinstance(tool, Tool):
raise ValueError(f"Tool must implement the Tool interface")
self.tools[name] = tool
print(f"Registered tool: {name}")
async def execute_tool(self, name: str, **kwargs) -> Dict[str, Any]:
"""
Execute a registered tool with the given parameters.
Args:
name: Name of the tool to execute
**kwargs: Parameters to pass to the tool
Returns:
Dictionary containing tool execution results
"""
if name not in self.tools:
available_tools = ", ".join(self.tools.keys())
raise ValueError(f"Tool '{name}' not found. Available tools: {available_tools}")
# Record execution start
execution_start = datetime.now()
try:
# Execute the tool
result = await self.tools[name].execute(**kwargs)
# Record successful execution
execution_record = {
"tool_name": name,
"parameters": kwargs,
"start_time": execution_start,
"end_time": datetime.now(),
"status": "success",
"result_summary": str(result)[:100] + "..." if len(str(result)) > 100 else str(result)
}
self.execution_history.append(execution_record)
return result
except Exception as e:
# Record failed execution
execution_record = {
"tool_name": name,
"parameters": kwargs,
"start_time": execution_start,
"end_time": datetime.now(),
"status": "failed",
"error": str(e)
}
self.execution_history.append(execution_record)
raise e
def get_available_tools(self) -> List[str]:
"""Return a list of available tool names."""
return list(self.tools.keys())
def get_execution_history(self) -> List[Dict[str, Any]]:
"""Return the complete tool execution history."""
return self.execution_history.copy()
Code Explanation:
- The Tool abstract base class ensures all tools implement a consistent interface
- WebSearchTool integrates with search APIs and includes relevance scoring to prioritize results
- PDFAnalyzerTool handles document download, text extraction, and focused analysis
- DataVisualizerTool creates professional-quality charts using matplotlib with proper styling
- ToolRegistry provides centralized tool management with execution tracking and error handling
- All tools include comprehensive error handling and return structured results
- Execution history tracking enables performance analysis and debugging
- The modular design makes it easy to add new tools without modifying existing code
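To make the extensibility claim concrete, here is a minimal sketch of adding a new capability without touching the agent core. The LocalNotesTool class, its note store, and the "local_notes" tool name are illustrative assumptions; only the Tool interface and ToolRegistry come from tools.py above.
from typing import Dict, Any

from tools import Tool, ToolRegistry

class LocalNotesTool(Tool):
    """Example tool that searches a small in-memory note store."""

    def __init__(self, notes: Dict[str, str]):
        self.notes = notes

    async def execute(self, keyword: str) -> Dict[str, Any]:
        # Case-insensitive substring match over the stored notes
        matches = {title: text for title, text in self.notes.items()
                   if keyword.lower() in text.lower()}
        return {"keyword": keyword, "matches": matches,
                "total_found": len(matches), "status": "success"}

# Registration and execution mirror the built-in tools (execute_tool must be awaited
# from async code, e.g. inside the agent's task loop):
#   registry = ToolRegistry()
#   registry.register("local_notes", LocalNotesTool({"energy": "Notes on solar adoption..."}))
#   result = await registry.execute_tool("local_notes", keyword="energy")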
STEP 5: MEMORY AND CONTEXT MANAGEMENT
MEMORY SYSTEM IMPLEMENTATION
# memory.py
import chromadb
from sentence_transformers import SentenceTransformer
from typing import Dict, List, Any, Optional
from datetime import datetime
import json
import numpy as np
class MemorySystem:
"""
Advanced memory system for storing and retrieving research context.
Uses vector embeddings to enable semantic search across past research,
allowing the agent to build on previous work and avoid repetition.
"""
def __init__(self, collection_name: str = "agent_memory"):
"""
Initialize the memory system with vector database and embedding model.
Args:
collection_name: Name of the ChromaDB collection for this agent
"""
print("Initializing memory system...")
# Initialize ChromaDB for vector storage
self.client = chromadb.Client()
try:
# Try to get existing collection
self.collection = self.client.get_collection(collection_name)
print(f"Loaded existing memory collection: {collection_name}")
except Exception:
# Create new collection if it doesn't exist
self.collection = self.client.create_collection(collection_name)
print(f"Created new memory collection: {collection_name}")
# Initialize sentence transformer for creating embeddings
print("Loading sentence transformer model...")
self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
print("Memory system ready")
def store_result(self, task_id: str, content: str, metadata: Dict[str, Any]):
"""
Store a task result in vector memory for future retrieval.
This allows the agent to learn from past research and build context.
Args:
task_id: Unique identifier for the task
content: Text content to store (findings, analysis, etc.)
metadata: Additional structured information about the result
"""
print(f"Storing memory for task: {task_id}")
try:
# Create vector embedding for semantic search
embedding = self.encoder.encode([content])[0]
# Prepare metadata with timestamp and task info
enriched_metadata = {
"task_id": task_id,
"timestamp": datetime.now().isoformat(),
"content_length": len(content),
"content_type": metadata.get("type", "research_result"),
**metadata
}
# Store in ChromaDB
self.collection.add(
embeddings=[embedding.tolist()],
documents=[content],
metadatas=[enriched_metadata],
ids=[task_id]
)
print(f"Memory stored successfully for task {task_id}")
except Exception as e:
print(f"Error storing memory: {e}")
def retrieve_relevant(self, query: str, k: int = 5,
time_filter: Optional[str] = None) -> List[Dict[str, Any]]:
"""
Retrieve relevant memories based on semantic similarity to a query.
This enables the agent to access relevant past research when working
on related topics.
Args:
query: Text query to search for relevant memories
k: Number of results to return
time_filter: Optional time filter ("recent", "week", "month")
Returns:
List of relevant memory entries with similarity scores
"""
print(f"Retrieving relevant memories for: '{query}'")
try:
# Create query embedding
query_embedding = self.encoder.encode([query])[0]
# Prepare filter conditions
where_conditions = {}
if time_filter:
cutoff_time = self._get_time_cutoff(time_filter)
where_conditions["timestamp"] = {"$gte": cutoff_time}
# Query the vector database
query_params = {
"query_embeddings": [query_embedding.tolist()],
"n_results": k
}
if where_conditions:
query_params["where"] = where_conditions
results = self.collection.query(**query_params)
# Format results with similarity scores
relevant_memories = []
for i, (doc, meta, distance) in enumerate(zip(
results["documents"][0],
results["metadatas"][0],
results["distances"][0]
)):
similarity_score = 1 - distance # Convert distance to similarity
memory_entry = {
"content": doc,
"metadata": meta,
"similarity_score": similarity_score,
"rank": i + 1
}
relevant_memories.append(memory_entry)
print(f"Retrieved {len(relevant_memories)} relevant memories")
return relevant_memories
except Exception as e:
print(f"Error retrieving memories: {e}")
return []
def get_research_context(self, current_goal: str, max_context_length: int = 1000) -> str:
"""
Generate a research context summary based on relevant past work.
This provides the agent with background information for better decision-making.
Args:
current_goal: The current research objective
max_context_length: Maximum length of context to return
Returns:
Formatted context string summarizing relevant past research
"""
print("Generating research context...")
# Retrieve relevant memories
relevant_memories = self.retrieve_relevant(current_goal, k=10)
if not relevant_memories:
return "No relevant past research found."
# Format context from most relevant memories
context_parts = []
current_length = 0
for memory in relevant_memories:
# Skip if similarity is too low
if memory["similarity_score"] < 0.3:
continue
# Format memory entry
timestamp = memory["metadata"].get("timestamp", "Unknown time")
content_preview = memory["content"][:200] + "..." if len(memory["content"]) > 200 else memory["content"]
memory_summary = f"[{timestamp[:10]}] {content_preview}"
# Check if adding this memory would exceed length limit
if current_length + len(memory_summary) > max_context_length:
break
context_parts.append(memory_summary)
current_length += len(memory_summary)
if not context_parts:
return "No sufficiently relevant past research found."
context = "RELEVANT PAST RESEARCH:\n" + "\n\n".join(context_parts)
print(f"Generated context with {len(context_parts)} relevant memories")
return context
def _get_time_cutoff(self, time_filter: str) -> str:
"""
Calculate time cutoff for filtering memories.
Args:
time_filter: Time filter type ("recent", "week", "month")
Returns:
ISO format timestamp for filtering
"""
from datetime import timedelta
now = datetime.now()
if time_filter == "recent":
cutoff = now - timedelta(days=1)
elif time_filter == "week":
cutoff = now - timedelta(weeks=1)
elif time_filter == "month":
cutoff = now - timedelta(days=30)
else:
cutoff = now - timedelta(days=365) # Default to one year
return cutoff.isoformat()
def get_memory_stats(self) -> Dict[str, Any]:
"""
Get statistics about the memory system.
Useful for monitoring and debugging memory usage.
Returns:
Dictionary containing memory system statistics
"""
try:
collection_info = self.collection.get()
total_memories = len(collection_info["ids"])
# Calculate memory types distribution
memory_types = {}
for metadata in collection_info["metadatas"]:
content_type = metadata.get("content_type", "unknown")
memory_types[content_type] = memory_types.get(content_type, 0) + 1
# Calculate time distribution
timestamps = [meta.get("timestamp") for meta in collection_info["metadatas"] if meta.get("timestamp")]
stats = {
"total_memories": total_memories,
"memory_types": memory_types,
"oldest_memory": min(timestamps) if timestamps else None,
"newest_memory": max(timestamps) if timestamps else None,
"collection_name": self.collection.name
}
return stats
except Exception as e:
return {"error": str(e)}
class ContextManager:
"""
Manages the current context and working memory for agent operations.
Maintains the immediate context needed for coherent task execution.
"""
def __init__(self, memory_system: MemorySystem):
"""
Initialize context manager with reference to long-term memory.
Args:
memory_system: The long-term memory system for context retrieval
"""
self.memory_system = memory_system
self.current_context = {
"goal": "",
"progress_summary": "",
"key_findings": [],
"active_hypotheses": [],
"next_priorities": []
}
self.context_history = []
def update_context(self, new_information: Dict[str, Any]):
"""
Update the current working context with new information.
This maintains coherent context throughout the research process.
Args:
new_information: Dictionary containing new context information
"""
print("Updating research context...")
# Store previous context in history
self.context_history.append(self.current_context.copy())
# Update current context
for key, value in new_information.items():
if key in self.current_context:
if isinstance(self.current_context[key], list):
if isinstance(value, list):
self.current_context[key].extend(value)
else:
self.current_context[key].append(value)
else:
self.current_context[key] = value
# Trim context to prevent excessive growth
self._trim_context()
print("Context updated successfully")
def get_current_context(self) -> str:
"""
Get formatted current context for LLM consumption.
Returns:
Formatted context string
"""
context_parts = []
if self.current_context["goal"]:
context_parts.append(f"CURRENT GOAL: {self.current_context['goal']}")
if self.current_context["progress_summary"]:
context_parts.append(f"PROGRESS: {self.current_context['progress_summary']}")
if self.current_context["key_findings"]:
findings = "\n - ".join(self.current_context["key_findings"][-5:]) # Last 5 findings
context_parts.append(f"KEY FINDINGS:\n - {findings}")
if self.current_context["active_hypotheses"]:
hypotheses = "\n - ".join(self.current_context["active_hypotheses"])
context_parts.append(f"ACTIVE HYPOTHESES:\n - {hypotheses}")
return "\n\n".join(context_parts) if context_parts else "No active context"
def _trim_context(self):
"""Trim context lists to prevent excessive memory usage."""
max_items = 10
for key in ["key_findings", "active_hypotheses", "next_priorities"]:
if isinstance(self.current_context[key], list):
self.current_context[key] = self.current_context[key][-max_items:]
Code Explanation:
- The MemorySystem uses ChromaDB for vector storage and SentenceTransformers for embeddings
- Vector embeddings enable semantic search - the agent can find relevant past research even when different terminology is used
- store_result saves task outcomes with rich metadata for future retrieval
- retrieve_relevant finds semantically similar past research with configurable filtering
- get_research_context creates formatted summaries of relevant background information
- The ContextManager maintains short-term working memory during task execution
- Context trimming prevents memory usage from growing unboundedly over time
- Statistics tracking enables monitoring of memory system performance and usage patterns
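A short usage sketch of the memory API defined above. The collection name, task id, and stored text are placeholders for illustration; in the running system these calls are made by ResearchAgentApp in Step 7.
from memory import MemorySystem

memory = MemorySystem(collection_name="research_agent_demo")

# Store one task result with metadata for later semantic retrieval
memory.store_result(
    task_id="demo_task_001",
    content="Placeholder finding: solar capacity additions in Europe grew over the last decade.",
    metadata={"type": "task_result", "tool_used": "web_search"}
)

# Retrieval works on meaning, not exact wording
hits = memory.retrieve_relevant("photovoltaic expansion in the EU", k=3)
for hit in hits:
    print(f"{hit['similarity_score']:.2f}  {hit['content'][:80]}")

# Build a compact context block for prompting
print(memory.get_research_context("European renewable energy trends", max_context_length=500))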
STEP 6: LLM INTEGRATION
MISTRAL LLM PROVIDER IMPLEMENTATION
# llm_provider.py
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
from typing import List, Dict, Any, Optional
import time
import logging
class MistralProvider:
"""
Local Mistral LLM provider for agent operations.
Handles model loading, prompt formatting, generation, and optimization
for the specific requirements of agentic AI applications.
"""
def __init__(self, model_name: str = "mistralai/Mistral-7B-Instruct-v0.2"):
"""
Initialize the Mistral model provider.
Args:
model_name: Hugging Face model identifier for Mistral
"""
self.model_name = model_name
self.tokenizer = None
self.model = None
self.device = None
# Generation parameters optimized for agent tasks
self.default_params = {
"max_length": 2048,
"temperature": 0.7, # Balance between creativity and consistency
"top_p": 0.9, # Nucleus sampling for quality
"do_sample": True, # Enable sampling for varied responses
"repetition_penalty": 1.1 # Reduce repetitive output
}
# Initialize the model
self._load_model()
def _load_model(self):
"""
Load the Mistral model and tokenizer with optimization.
Handles device placement, memory optimization, and error recovery.
"""
print(f"Loading Mistral model: {self.model_name}")
print("This may take several minutes on first run...")
try:
# Load tokenizer first
print("Loading tokenizer...")
self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
# Set pad token if not present (common issue with some models)
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
# Determine optimal device placement
if torch.cuda.is_available():
self.device = "cuda"
print(f"CUDA available: {torch.cuda.get_device_name()}")
print(f"CUDA memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
else:
self.device = "cpu"
print("Using CPU (GPU not available or not configured)")
# Load model with optimizations
print("Loading model...")
self.model = AutoModelForCausalLM.from_pretrained(
self.model_name,
torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
device_map="auto", # Automatically distribute across available devices
trust_remote_code=True # Required for some Mistral variants
)
# Set model to evaluation mode
self.model.eval()
# Display memory usage
if self.device == "cuda":
memory_gb = torch.cuda.memory_allocated() / 1e9
print(f"Model loaded successfully, using {memory_gb:.2f} GB GPU memory")
else:
print("Model loaded successfully on CPU")
# Test the model with a simple generation
self._test_model()
except Exception as e:
print(f"Error loading model: {e}")
print("Attempting fallback configuration...")
self._load_fallback_model()
def _load_fallback_model(self):
"""
Load model with reduced memory requirements if standard loading fails.
Uses 8-bit quantization to reduce memory usage.
"""
try:
print("Loading model with 8-bit quantization...")
self.model = AutoModelForCausalLM.from_pretrained(
self.model_name,
torch_dtype=torch.float16,
device_map="auto",
load_in_8bit=True, # 8-bit quantization to reduce memory
trust_remote_code=True
)
self.model.eval()
print("Fallback model loaded successfully with quantization")
except Exception as e:
print(f"Fallback loading also failed: {e}")
raise RuntimeError("Could not load Mistral model with any configuration")
def _test_model(self):
"""
Test the model with a simple generation to ensure it's working properly.
"""
try:
test_prompt = "Hello, I am a research assistant."
result = self.generate_sync(test_prompt, max_length=50)
print("Model test successful")
except Exception as e:
print(f"Model test failed: {e}")
async def generate(self, prompt: str, **kwargs) -> str:
"""
Generate text using the Mistral model (async version).
Args:
prompt: Input prompt for generation
**kwargs: Override default generation parameters
Returns:
Generated text response
"""
# Run synchronous generation in a thread pool to keep the async interface non-blocking.
# A lambda is used so keyword arguments are forwarded correctly
# (run_in_executor only accepts positional arguments).
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, lambda: self.generate_sync(prompt, **kwargs))
def generate_sync(self, prompt: str, **kwargs) -> str:
"""
Generate text using the Mistral model (synchronous version).
Args:
prompt: Input prompt for generation
**kwargs: Override default generation parameters
Returns:
Generated text response
"""
if self.model is None or self.tokenizer is None:
raise RuntimeError("Model not loaded. Call _load_model() first.")
# Merge provided parameters with defaults
generation_params = {**self.default_params, **kwargs}
# Format prompt for Mistral instruction format
formatted_prompt = self._format_prompt(prompt)
# Log generation start
start_time = time.time()
try:
# Tokenize input
inputs = self.tokenizer(
formatted_prompt,
return_tensors="pt",
truncation=True,
max_length=generation_params.get("max_input_length", 1500)
)
# Move inputs to appropriate device
inputs = {k: v.to(self.device) for k, v in inputs.items()}
# Generate response
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_length=generation_params["max_length"],
temperature=generation_params["temperature"],
top_p=generation_params["top_p"],
do_sample=generation_params["do_sample"],
repetition_penalty=generation_params["repetition_penalty"],
pad_token_id=self.tokenizer.eos_token_id,
eos_token_id=self.tokenizer.eos_token_id
)
# Decode the response
response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
# Extract only the generated part (remove input prompt)
response = self._extract_response(response, formatted_prompt)
# Log generation statistics
generation_time = time.time() - start_time
tokens_generated = len(outputs[0]) - len(inputs["input_ids"][0])
print(f"Generated {tokens_generated} tokens in {generation_time:.2f}s")
return response.strip()
except Exception as e:
print(f"Generation error: {e}")
return f"Error generating response: {str(e)}"
def _format_prompt(self, prompt: str) -> str:
"""
Format prompt for Mistral's instruction format.
Mistral uses specific formatting for optimal performance.
Args:
prompt: Raw prompt text
Returns:
Properly formatted prompt for Mistral
"""
# Mistral instruction format
return f"<s>[INST] {prompt} [/INST]"
def _extract_response(self, full_text: str, formatted_prompt: str) -> str:
"""
Extract the generated response from the full model output.
Removes the input prompt and formatting tokens.
Args:
full_text: Complete model output including prompt
formatted_prompt: The original formatted prompt
Returns:
Clean generated response
"""
# Find the end of the instruction formatting
inst_end = "[/INST]"
if inst_end in full_text:
response = full_text.split(inst_end, 1)[-1]
else:
# Fallback: remove the original prompt
response = full_text.replace(formatted_prompt, "")
return response.strip()
def generate_structured(self, prompt: str, output_format: str = "json") -> Dict[str, Any]:
"""
Generate structured output (JSON, YAML, etc.) with validation.
Useful for agent planning and tool parameter generation.
Args:
prompt: Prompt requesting structured output
output_format: Expected output format ("json", "yaml")
Returns:
Parsed structured data
"""
# Add format instructions to prompt
format_instruction = f"\nPlease respond with valid {output_format.upper()} only:"
structured_prompt = prompt + format_instruction
# Generate response
response = self.generate_sync(structured_prompt, temperature=0.3) # Lower temperature for consistency
try:
if output_format.lower() == "json":
import json
return json.loads(response)
elif output_format.lower() == "yaml":
import yaml
return yaml.safe_load(response)
else:
return {"raw_response": response}
except Exception as e:
print(f"Error parsing {output_format}: {e}")
return {"error": str(e), "raw_response": response}
def get_model_info(self) -> Dict[str, Any]:
"""
Get information about the loaded model.
Returns:
Dictionary containing model information
"""
if self.model is None:
return {"status": "not_loaded"}
info = {
"model_name": self.model_name,
"device": self.device,
"model_type": type(self.model).__name__,
"parameter_count": sum(p.numel() for p in self.model.parameters()),
"memory_usage_gb": torch.cuda.memory_allocated() / 1e9 if self.device == "cuda" else "N/A"
}
return info
class LLMCache:
"""
Simple caching system for LLM responses to avoid redundant generations.
Particularly useful for planning and analysis tasks that might be repeated.
"""
def __init__(self, max_cache_size: int = 100):
"""
Initialize the cache with maximum size limit.
Args:
max_cache_size: Maximum number of cached responses
"""
self.cache = {}
self.max_size = max_cache_size
self.access_order = []
def get(self, prompt: str) -> Optional[str]:
"""
Get cached response for a prompt.
Args:
prompt: The prompt to look up
Returns:
Cached response or None if not found
"""
prompt_hash = hash(prompt)
if prompt_hash in self.cache:
# Update access order for LRU eviction
self.access_order.remove(prompt_hash)
self.access_order.append(prompt_hash)
return self.cache[prompt_hash]
return None
def put(self, prompt: str, response: str):
"""
Cache a response for a prompt.
Args:
prompt: The prompt
response: The generated response
"""
prompt_hash = hash(prompt)
# Evict oldest if cache is full
if len(self.cache) >= self.max_size and prompt_hash not in self.cache:
oldest = self.access_order.pop(0)
del self.cache[oldest]
# Add to cache
self.cache[prompt_hash] = response
if prompt_hash not in self.access_order:
self.access_order.append(prompt_hash)
Code Explanation:
- MistralProvider handles all aspects of local LLM integration including loading, optimization, and generation
- The class automatically handles device placement (GPU/CPU) and memory optimization using float16 and quantization
- Mistral-specific prompt formatting ensures optimal model performance with the [INST] tag structure
- Error handling and fallback loading with quantization ensures the system works on various hardware configurations
- generate_structured method enables reliable JSON/YAML output for agent planning and tool parameters
- Caching system prevents redundant generations for repeated prompts, improving efficiency
- Comprehensive logging and monitoring helps debug performance issues and track resource usage
- The async/sync interface allows integration with both synchronous and asynchronous codebases
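The LLMCache above is defined but not yet wired into the provider. One light-touch way to use it, sketched below, is a thin wrapper around generate_sync(). The CachedMistralProvider class is an assumption for illustration, not part of the provider itself, and the cache key is the prompt alone, so it should only wrap calls that use consistent generation parameters.
from llm_provider import MistralProvider, LLMCache

class CachedMistralProvider:
    """Wraps MistralProvider so identical prompts reuse a previous generation."""

    def __init__(self, provider: MistralProvider, cache_size: int = 100):
        self.provider = provider
        self.cache = LLMCache(max_cache_size=cache_size)

    def generate_sync(self, prompt: str, **kwargs) -> str:
        cached = self.cache.get(prompt)
        if cached is not None:
            return cached                              # cache hit: skip generation entirely
        response = self.provider.generate_sync(prompt, **kwargs)
        self.cache.put(prompt, response)
        return response

    async def generate(self, prompt: str, **kwargs) -> str:
        # Reuse the cached synchronous path in a thread to keep the async interface
        import asyncio
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, lambda: self.generate_sync(prompt, **kwargs))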
STEP 7: MAIN APPLICATION
COMPLETE AGENT IMPLEMENTATION
# main_agent.py
import asyncio
from typing import List, Dict, Any, Optional
import logging
import uuid
import json
from datetime import datetime
import traceback
# Components built in the previous steps (file names as introduced above):
from llm_provider import MistralProvider
from memory import MemorySystem, ContextManager
from tools import ToolRegistry, WebSearchTool, PDFAnalyzerTool, DataVisualizerTool
from planning import PlanningSystem
# ResearchAgent, Task, and TaskStatus come from the agent core module defined in an earlier step.
class ResearchAgentApp:
"""
Main application class that orchestrates all components of the research agent.
Provides high-level interfaces for research tasks and manages the complete
agent lifecycle from initialization through execution to cleanup.
"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
"""
Initialize the complete research agent application.
Args:
config: Optional configuration dictionary for customizing behavior
"""
# Set up logging
self._setup_logging()
self.logger = logging.getLogger(__name__)
# Load configuration
self.config = self._load_config(config)
# Initialize core components
self.logger.info("Initializing Research Agent components...")
try:
# Initialize LLM provider
self.logger.info("Loading Mistral LLM...")
self.llm = MistralProvider(self.config.get("model_name", "mistralai/Mistral-7B-Instruct-v0.2"))
# Initialize memory system
self.logger.info("Setting up memory system...")
self.memory = MemorySystem(self.config.get("memory_collection", "research_agent"))
# Initialize context manager
self.context_manager = ContextManager(self.memory)
# Initialize and register tools
self.logger.info("Setting up tool registry...")
self.tools = self._setup_tools()
# Initialize planning system
self.planner = PlanningSystem(self.llm)
# Initialize performance tracking
self.performance_stats = {
"total_research_sessions": 0,
"successful_completions": 0,
"total_tasks_executed": 0,
"average_completion_time": 0.0
}
self.logger.info("Research Agent initialization complete!")
except Exception as e:
self.logger.error(f"Failed to initialize Research Agent: {e}")
raise RuntimeError(f"Agent initialization failed: {e}")
def _setup_logging(self):
"""Configure logging for the application."""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('research_agent.log'),
logging.StreamHandler()
]
)
def _load_config(self, config: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""
Load configuration with defaults.
Args:
config: User-provided configuration
Returns:
Complete configuration dictionary with defaults
"""
default_config = {
"model_name": "mistralai/Mistral-7B-Instruct-v0.2",
"memory_collection": "research_agent",
"max_tasks_per_session": 20,
"search_api_key": "your_search_api_key_here",
"enable_caching": True,
"output_directory": "./research_outputs"
}
if config:
default_config.update(config)
return default_config
def _setup_tools(self) -> ToolRegistry:
"""
Initialize and register all available tools.
Returns:
Configured tool registry with all tools registered
"""
registry = ToolRegistry()
try:
# Register web search tool
if self.config.get("search_api_key"):
web_search = WebSearchTool(self.config["search_api_key"])
registry.register("web_search", web_search)
self.logger.info("Registered web_search tool")
else:
self.logger.warning("No search API key provided - web_search tool disabled")
# Register PDF analyzer
pdf_analyzer = PDFAnalyzerTool()
registry.register("pdf_analyzer", pdf_analyzer)
self.logger.info("Registered pdf_analyzer tool")
# Register data visualizer
data_visualizer = DataVisualizerTool()
registry.register("data_visualizer", data_visualizer)
self.logger.info("Registered data_visualizer tool")
# Log available tools
available_tools = registry.get_available_tools()
self.logger.info(f"Tool registry initialized with {len(available_tools)} tools: {available_tools}")
except Exception as e:
self.logger.error(f"Error setting up tools: {e}")
raise
return registry
async def research(self, goal: str, context: str = "") -> str:
"""
Execute a complete research session for the given goal.
This is the main entry point for research tasks.
Args:
goal: High-level research objective
context: Additional context or constraints for the research
Returns:
Comprehensive research report
"""
session_id = str(uuid.uuid4())
start_time = datetime.now()
self.logger.info(f"Starting research session {session_id}")
self.logger.info(f"Research goal: {goal}")
try:
# Update performance stats
self.performance_stats["total_research_sessions"] += 1
# Retrieve relevant past research for context
self.logger.info("Retrieving relevant context from memory...")
past_context = self.memory.get_research_context(goal)
# Update context manager
self.context_manager.update_context({
"goal": goal,
"session_id": session_id,
"past_context": past_context
})
# Initialize agent for this session
agent = ResearchAgent(self.llm, self.tools)
agent.state.current_goal = goal
# Add context to agent if provided
if context or past_context:
combined_context = f"{context}\n\n{past_context}".strip()
agent.state.context = combined_context
self.logger.info("Added context to agent state")
# Execute the research
self.logger.info("Beginning research execution...")
result = await self._execute_research_session(agent, goal, session_id)
# Calculate session duration
end_time = datetime.now()
session_duration = (end_time - start_time).total_seconds()
# Update performance statistics
self.performance_stats["successful_completions"] += 1
self.performance_stats["total_tasks_executed"] += len(agent.state.completed_tasks)
# Calculate average completion time
total_sessions = self.performance_stats["successful_completions"]
current_avg = self.performance_stats["average_completion_time"]
self.performance_stats["average_completion_time"] = (
(current_avg * (total_sessions - 1) + session_duration) / total_sessions
)
# Store final result in long-term memory
self.memory.store_result(
task_id=f"research_session_{session_id}",
content=result,
metadata={
"goal": goal,
"session_duration": session_duration,
"tasks_completed": len(agent.state.completed_tasks),
"type": "final_research_report"
}
)
# Update context manager with final results
self.context_manager.update_context({
"progress_summary": f"Completed research on: {goal}",
"key_findings": [f"Session {session_id} completed successfully"]
})
self.logger.info(f"Research session {session_id} completed successfully in {session_duration:.2f}s")
return result
except Exception as e:
self.logger.error(f"Research session {session_id} failed: {e}")
self.logger.error(f"Traceback: {traceback.format_exc()}")
# Store failure information
error_report = f"Research session failed: {str(e)}"
self.memory.store_result(
task_id=f"research_session_{session_id}_error",
content=error_report,
metadata={
"goal": goal,
"error": str(e),
"type": "research_error"
}
)
return f"Research failed: {str(e)}. Please check the logs for more details."
async def _execute_research_session(self, agent: ResearchAgent, goal: str, session_id: str) -> str:
"""
Execute the core research logic with comprehensive error handling.
Args:
agent: The research agent instance
goal: Research objective
session_id: Unique session identifier
Returns:
Research results
"""
try:
# PHASE 1: Strategic Planning
self.logger.info("Phase 1: Creating strategic research plan...")
tasks = await self.planner.create_plan(goal, agent.state.context)
agent.state.task_queue.extend(tasks)
if not agent.state.task_queue:
raise ValueError("No tasks generated for research goal")
self.logger.info(f"Generated {len(agent.state.task_queue)} tasks for execution")
# PHASE 2: Task Execution with Adaptive Replanning
self.logger.info("Phase 2: Executing research tasks...")
task_counter = 0
max_tasks = self.config.get("max_tasks_per_session", 20)
while agent.state.task_queue and task_counter < max_tasks:
task = agent.state.task_queue.pop(0)
task_counter += 1
self.logger.info(f"Executing task {task_counter}: {task.description}")
try:
# Execute the task
task.status = TaskStatus.IN_PROGRESS
result = await self.tools.execute_tool(task.tool_name, **task.parameters)
# Store task result
task.result = result
task.status = TaskStatus.COMPLETED
agent.state.completed_tasks.append(task)
# Store in memory for future reference
self.memory.store_result(
task_id=task.id,
content=str(result),
metadata={
"task_description": task.description,
"tool_used": task.tool_name,
"session_id": session_id,
"type": "task_result"
}
)
self.logger.info(f"Task {task_counter} completed successfully")
# Adaptive replanning every 3 tasks
if task_counter % 3 == 0 and agent.state.task_queue:
should_replan = await self.planner.should_replan(agent.state)
if should_replan:
self.logger.info("Initiating adaptive replanning...")
new_tasks = await self._generate_additional_tasks(agent, goal)
agent.state.task_queue.extend(new_tasks)
self.logger.info(f"Added {len(new_tasks)} new tasks from replanning")
except Exception as task_error:
self.logger.error(f"Task {task_counter} failed: {task_error}")
task.status = TaskStatus.FAILED
task.result = {"error": str(task_error)}
agent.state.completed_tasks.append(task)
# Continue with other tasks despite failure
continue
if task_counter >= max_tasks:
self.logger.warning(f"Reached maximum task limit ({max_tasks}) for session")
# PHASE 3: Results Synthesis
self.logger.info("Phase 3: Synthesizing research results...")
final_report = await self._synthesize_final_report(agent, goal)
return final_report
except Exception as e:
self.logger.error(f"Error in research execution: {e}")
raise
async def _generate_additional_tasks(self, agent: ResearchAgent, goal: str) -> List[Task]:
"""
Generate additional tasks based on current research progress.
Args:
agent: Current agent state
goal: Original research goal
Returns:
List of additional tasks to execute
"""
replan_prompt = f"""
Based on the research progress so far, generate additional tasks to improve the research quality.
ORIGINAL GOAL: {goal}
COMPLETED TASKS SUMMARY:
{self._format_completed_tasks(agent.state.completed_tasks[-5:])}
CURRENT FINDINGS:
{self._extract_key_findings(agent.state.completed_tasks)}
Generate 1-3 additional tasks that would:
1. Fill important gaps in the research
2. Validate key findings
3. Provide additional perspectives
4. Enhance the final analysis
Format as JSON:
{{
"additional_tasks": [
{{
"id": "additional_001",
"description": "Task description",
"tool_name": "tool_to_use",
"parameters": {{"param": "value"}}
}}
]
}}
Additional tasks:
"""
try:
response = await self.llm.generate(replan_prompt)
task_data = json.loads(response)
additional_tasks = []
for task_info in task_data.get("additional_tasks", []):
task = Task(
id=task_info["id"],
description=task_info["description"],
tool_name=task_info["tool_name"],
parameters=task_info["parameters"]
)
additional_tasks.append(task)
return additional_tasks
except Exception as e:
self.logger.error(f"Error generating additional tasks: {e}")
return []
async def _synthesize_final_report(self, agent: ResearchAgent, goal: str) -> str:
"""
Create a comprehensive final report from all research results.
Args:
agent: Agent with completed research
goal: Original research goal
Returns:
Formatted final research report
"""
synthesis_prompt = f"""
Create a comprehensive research report based on the completed analysis.
RESEARCH GOAL: {goal}
COMPLETED RESEARCH SUMMARY:
{self._format_all_results(agent.state.completed_tasks)}
Create a well-structured research report with:
1. EXECUTIVE SUMMARY: Key findings and conclusions (2-3 paragraphs)
2. METHODOLOGY: Research approach and data sources used
3. KEY FINDINGS: Detailed analysis of the most important discoveries
- Use specific data and evidence from the research
- Organize findings thematically
- Include relevant statistics or trends
4. ANALYSIS: Deeper interpretation of the findings
- Identify patterns and relationships
- Discuss implications and significance
- Address potential limitations
5. CONCLUSIONS: Summary of main insights and their implications
6. RECOMMENDATIONS: Actionable next steps or further research directions
Make the report professional, evidence-based, and actionable. Use clear headings and organize information logically.
RESEARCH REPORT:
"""
try:
final_report = await self.llm.generate(synthesis_prompt, max_length=3000)
# Add metadata header to report
report_header = f"""
RESEARCH REPORT
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Research Goal: {goal}
Tasks Completed: {len(agent.state.completed_tasks)}
Successful Tasks: {len([t for t in agent.state.completed_tasks if t.status == TaskStatus.COMPLETED])}
{'='*80}
"""
return report_header + final_report
except Exception as e:
self.logger.error(f"Error synthesizing final report: {e}")
return self._create_fallback_report(agent, goal)
def _create_fallback_report(self, agent: ResearchAgent, goal: str) -> str:
"""Create a basic report when synthesis fails."""
report = f"""
RESEARCH REPORT (Fallback)
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Research Goal: {goal}
SUMMARY:
Research was conducted using {len(agent.state.completed_tasks)} tasks.
{len([t for t in agent.state.completed_tasks if t.status == TaskStatus.COMPLETED])} tasks completed successfully.
TASK RESULTS:
"""
for i, task in enumerate(agent.state.completed_tasks, 1):
status_icon = "✓" if task.status == TaskStatus.COMPLETED else "✗"
report += f"\n{i}. {status_icon} {task.description}"
if task.result and task.status == TaskStatus.COMPLETED:
result_summary = str(task.result)[:200] + "..." if len(str(task.result)) > 200 else str(task.result)
report += f"\n Result: {result_summary}"
return report
def _format_completed_tasks(self, tasks: List[Task]) -> str:
"""Format completed tasks for LLM prompts."""
if not tasks:
return "No completed tasks"
formatted = []
for task in tasks:
status = "✓" if task.status == TaskStatus.COMPLETED else "✗"
result_preview = str(task.result)[:100] + "..." if len(str(task.result)) > 100 else str(task.result)
formatted.append(f"{status} {task.description}: {result_preview}")
return "\n".join(formatted)
def _extract_key_findings(self, tasks: List[Task]) -> str:
"""Extract key findings from completed tasks."""
findings = []
for task in tasks:
if task.status == TaskStatus.COMPLETED and task.result:
# Extract meaningful information from task results
result_str = str(task.result)
if len(result_str) > 50: # Only include substantial results
finding = result_str[:200] + "..." if len(result_str) > 200 else result_str
findings.append(f"- {finding}")
return "\n".join(findings) if findings else "No significant findings extracted"
def _format_all_results(self, tasks: List[Task]) -> str:
"""Format all task results for final synthesis."""
if not tasks:
return "No tasks completed"
formatted = []
for i, task in enumerate(tasks, 1):
formatted.append(f"\nTASK {i}: {task.description}")
formatted.append(f"Status: {task.status.value}")
if task.result:
result_str = str(task.result)
formatted.append(f"Result: {result_str}")
formatted.append("-" * 40)
return "\n".join(formatted)
async def interactive_mode(self):
"""
Run the agent in interactive mode for user interaction.
Provides a command-line interface for research requests.
"""
print("\n" + "="*60)
print("🤖 RESEARCH ASSISTANT AGENT - Interactive Mode")
print("="*60)
print("Ask me to research any topic and I'll provide comprehensive analysis!")
print("\nCommands:")
print(" • Type any research question or goal")
print(" • 'stats' - Show performance statistics")
print(" • 'memory' - Show memory system status")
print(" • 'help' - Show detailed help")
print(" • 'quit' - Exit the application")
print("-"*60)
while True:
try:
user_input = input("\n📝 Research Goal: ").strip()
if user_input.lower() == 'quit':
print("👋 Thank you for using Research Assistant Agent!")
break
elif user_input.lower() == 'help':
self._show_detailed_help()
continue
elif user_input.lower() == 'stats':
self._show_performance_stats()
continue
elif user_input.lower() == 'memory':
self._show_memory_stats()
continue
elif not user_input:
print("Please enter a research goal or command.")
continue
print("\n🔍 Starting research...")
print("This may take several minutes depending on the complexity...")
# Execute research
result = await self.research(user_input)
print("\n📊 Research Complete!")
print("="*80)
print(result)
print("="*80)
except KeyboardInterrupt:
print("\n👋 Research interrupted. Goodbye!")
break
except Exception as e:
self.logger.error(f"Interactive mode error: {e}")
print(f"❌ Error: {str(e)}")
print("Please try again or type 'quit' to exit.")
def _show_detailed_help(self):
"""Show detailed help information."""
help_text = """
RESEARCH ASSISTANT AGENT - HELP
WHAT I CAN DO:
• Conduct comprehensive research on any topic
• Search the web for current information
• Analyze documents and research papers
• Create data visualizations and charts
• Synthesize findings into detailed reports
• Learn from past research to improve future work
EXAMPLE RESEARCH GOALS:
• "Analyze the current state of renewable energy adoption in Europe"
• "Compare electric vehicle market trends across different countries"
• "Research the latest developments in artificial intelligence"
• "Investigate the economic impact of remote work policies"
• "Study consumer preferences in sustainable packaging"
HOW IT WORKS:
1. You provide a research goal
2. I create a strategic research plan
3. I execute tasks using various tools (web search, document analysis, etc.)
4. I adapt the plan based on findings
5. I synthesize everything into a comprehensive report
TIPS FOR BETTER RESULTS:
• Be specific about what aspects interest you most
• Mention if you want to focus on particular regions, time periods, or perspectives
• Ask follow-up questions to dive deeper into specific findings
• Use 'stats' to see how much research I've conducted
COMMANDS:
• Type any research question
• 'stats' - Performance statistics
• 'memory' - Memory system status
• 'help' - This help message
• 'quit' - Exit application
"""
print(help_text)
def _show_performance_stats(self):
"""Display performance statistics."""
stats = self.performance_stats
memory_stats = self.memory.get_memory_stats()
print("\n📈 PERFORMANCE STATISTICS")
print("-" * 40)
print(f"Total Research Sessions: {stats['total_research_sessions']}")
print(f"Successful Completions: {stats['successful_completions']}")
print(f"Success Rate: {stats['successful_completions']/max(stats['total_research_sessions'], 1)*100:.1f}%")
print(f"Total Tasks Executed: {stats['total_tasks_executed']}")
print(f"Average Completion Time: {stats['average_completion_time']:.1f} seconds")
print(f"Memories Stored: {memory_stats.get('total_memories', 0)}")
# Tool execution stats
tool_history = self.tools.get_execution_history()
if tool_history:
successful_tools = len([h for h in tool_history if h['status'] == 'success'])
print(f"Tool Executions: {len(tool_history)} (Success: {successful_tools})")
def _show_memory_stats(self):
"""Display memory system statistics."""
stats = self.memory.get_memory_stats()
print("\n🧠 MEMORY SYSTEM STATUS")
print("-" * 40)
print(f"Total Memories: {stats.get('total_memories', 0)}")
print(f"Collection: {stats.get('collection_name', 'Unknown')}")
if stats.get('memory_types'):
print("Memory Types:")
for mem_type, count in stats['memory_types'].items():
print(f" • {mem_type}: {count}")
if stats.get('oldest_memory'):
print(f"Oldest Memory: {stats['oldest_memory'][:10]}")
if stats.get('newest_memory'):
print(f"Newest Memory: {stats['newest_memory'][:10]}")
# Example usage and testing
async def main():
"""
Main function demonstrating how to use the Research Agent.
Provides examples of both programmatic and interactive usage.
"""
try:
print("Initializing Research Agent...")
# Configure the agent (replace with your actual API keys)
config = {
"search_api_key": "your_search_api_key_here", # Replace with real API key
"max_tasks_per_session": 15,
"enable_caching": True
}
# Initialize the agent
agent = ResearchAgentApp(config)
# Example 1: Programmatic research
print("\n" + "="*60)
print("EXAMPLE: Programmatic Research")
print("="*60)
research_goal = "Analyze the impact of artificial intelligence on job markets in 2024"
result = await agent.research(research_goal)
print("\nRESEARCH RESULT:")
print("-" * 40)
print(result)
# Example 2: Interactive mode (uncomment to try)
# print("\nSwitching to interactive mode...")
# await agent.interactive_mode()
except Exception as e:
print(f"Error in main: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(main())
Code Explanation:
- ResearchAgentApp is the main orchestrator that combines all components into a unified system
- The initialization process loads and configures the LLM, memory system, tools, and planner
- Configuration management allows customization of model, API keys, and operational parameters (a batch-usage configuration sketch follows this list)
- The research method implements a complete research workflow with error handling and performance tracking
- Adaptive replanning allows the agent to adjust its approach based on intermediate findings
- Comprehensive logging and statistics tracking enable monitoring and debugging
- Interactive mode provides a user-friendly interface for research requests
- Fallback mechanisms ensure the system continues operating even when components fail
- Performance statistics help optimize the agent's behavior over time
- The modular design makes it easy to extend with new capabilities and tools
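Before moving on to the advanced features, here is a minimal batch-usage sketch (not part of the files above) that drives ResearchAgentApp programmatically with a custom configuration. It reuses only the config keys shown in main(); any additional keys would depend on your own implementation, and the API key is a placeholder.
# batch_research.py - illustrative sketch, assumes main_agent.py from the previous step
import asyncio
from main_agent import ResearchAgentApp

async def run_batch():
    config = {
        "search_api_key": "your_search_api_key_here",  # placeholder - replace with a real key
        "max_tasks_per_session": 10,
        "enable_caching": True
    }
    agent = ResearchAgentApp(config)

    goals = [
        "Compare electric vehicle market trends across different countries",
        "Investigate the economic impact of remote work policies"
    ]
    for goal in goals:
        report = await agent.research(goal)
        print(f"\n=== {goal} ===")
        print(report[:500] + "...")  # show only the first 500 characters of each report

if __name__ == "__main__":
    asyncio.run(run_batch())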
STEP 8: ADVANCED FEATURES
SELF-REFLECTION AND IMPROVEMENT SYSTEM
# reflection.py
from typing import Dict, List, Any, Optional
from datetime import datetime
import json
from agent_core import Task, TaskStatus  # Task and TaskStatus are defined earlier in agent_core.py
class ReflectionSystem:
"""
Advanced self-reflection system that enables the agent to analyze
its own performance and continuously improve its research capabilities.
This is a key component of truly agentic behavior.
"""
def __init__(self, llm_provider, memory_system):
"""
Initialize the reflection system.
Args:
llm_provider: LLM for generating reflections
memory_system: Access to historical performance data
"""
self.llm = llm_provider
self.memory = memory_system
self.reflection_history = []
self.improvement_metrics = {
"research_quality_trend": [],
"efficiency_trend": [],
"user_satisfaction_trend": []
}
async def reflect_on_performance(self, completed_tasks: List[Task],
original_goal: str,
final_result: str,
session_duration: float) -> Dict[str, Any]:
"""
Conduct comprehensive performance analysis of a research session.
This enables the agent to learn from experience and improve future performance.
Args:
completed_tasks: All tasks executed in the session
original_goal: The original research objective
final_result: Final research report generated
session_duration: Time taken to complete the research
Returns:
Detailed reflection analysis with improvement recommendations
"""
print("Conducting performance reflection...")
# Analyze task execution patterns
task_analysis = self._analyze_task_execution(completed_tasks)
# Evaluate research quality
quality_assessment = await self._assess_research_quality(
original_goal, final_result, completed_tasks
)
# Analyze efficiency metrics
efficiency_analysis = self._analyze_efficiency(completed_tasks, session_duration)
# Generate improvement recommendations
recommendations = await self._generate_improvement_recommendations(
task_analysis, quality_assessment, efficiency_analysis, original_goal
)
# Create comprehensive reflection
reflection = {
"timestamp": datetime.now().isoformat(),
"session_summary": {
"goal": original_goal,
"duration_seconds": session_duration,
"tasks_completed": len(completed_tasks),
"success_rate": task_analysis["success_rate"]
},
"task_analysis": task_analysis,
"quality_assessment": quality_assessment,
"efficiency_analysis": efficiency_analysis,
"recommendations": recommendations,
"overall_score": self._calculate_overall_score(quality_assessment, efficiency_analysis)
}
# Store reflection in history
self.reflection_history.append(reflection)
# Update improvement metrics
self._update_improvement_trends(reflection)
print(f"Reflection complete. Overall score: {reflection['overall_score']:.2f}/10")
return reflection
def _analyze_task_execution(self, tasks: List[Task]) -> Dict[str, Any]:
"""
Analyze patterns in task execution to identify strengths and weaknesses.
Args:
tasks: List of executed tasks
Returns:
Dictionary containing task execution analysis
"""
if not tasks:
return {"error": "No tasks to analyze"}
# Calculate basic metrics
total_tasks = len(tasks)
successful_tasks = len([t for t in tasks if t.status == TaskStatus.COMPLETED])
failed_tasks = len([t for t in tasks if t.status == TaskStatus.FAILED])
# Analyze tool usage patterns
tool_usage = {}
tool_success_rates = {}
for task in tasks:
tool_name = task.tool_name
tool_usage[tool_name] = tool_usage.get(tool_name, 0) + 1
if tool_name not in tool_success_rates:
tool_success_rates[tool_name] = {"successful": 0, "total": 0}
tool_success_rates[tool_name]["total"] += 1
if task.status == TaskStatus.COMPLETED:
tool_success_rates[tool_name]["successful"] += 1
# Calculate success rates per tool
for tool in tool_success_rates:
tool_data = tool_success_rates[tool]
tool_data["success_rate"] = tool_data["successful"] / tool_data["total"]
# Identify task sequence patterns
task_sequence = [task.tool_name for task in tasks]
sequence_effectiveness = self._analyze_task_sequence(tasks)
return {
"total_tasks": total_tasks,
"successful_tasks": successful_tasks,
"failed_tasks": failed_tasks,
"success_rate": successful_tasks / total_tasks,
"tool_usage": tool_usage,
"tool_success_rates": tool_success_rates,
"task_sequence": task_sequence,
"sequence_effectiveness": sequence_effectiveness
}
def _analyze_task_sequence(self, tasks: List[Task]) -> Dict[str, Any]:
"""
Analyze the effectiveness of task sequencing.
Good sequencing can significantly improve research quality.
"""
if len(tasks) < 2:
return {"insufficient_data": True}
# Analyze common beneficial patterns
beneficial_patterns = [
["web_search", "pdf_analyzer"], # Search then deep dive
["web_search", "data_visualizer"], # Search then visualize
["pdf_analyzer", "synthesis"] # Analyze then synthesize
]
sequence = [task.tool_name for task in tasks]
pattern_matches = 0
for i in range(len(sequence) - 1):
current_pair = [sequence[i], sequence[i + 1]]
if current_pair in beneficial_patterns:
pattern_matches += 1
# Calculate sequence coherence score
coherence_score = pattern_matches / max(len(sequence) - 1, 1)
return {
"pattern_matches": pattern_matches,
"coherence_score": coherence_score,
"sequence_length": len(sequence),
"follows_best_practices": coherence_score > 0.3
}
async def _assess_research_quality(self, goal: str, result: str, tasks: List[Task]) -> Dict[str, Any]:
"""
Use the LLM to assess the quality of research conducted.
This provides qualitative analysis of research effectiveness.
"""
quality_prompt = f"""
Assess the quality of this research session on a scale of 1-10.
RESEARCH GOAL: {goal}
RESEARCH APPROACH:
{self._format_research_approach(tasks)}
FINAL RESULT LENGTH: {len(result)} characters
EVALUATION CRITERIA:
1. Completeness: Does the research address all aspects of the goal?
2. Accuracy: Are the findings well-supported and credible?
3. Depth: Does the analysis go beyond surface-level information?
4. Coherence: Is the research logically structured and well-organized?
5. Actionability: Are the conclusions useful and practical?
Rate each criterion (1-10) and provide overall assessment:
{{
"completeness_score": 8,
"accuracy_score": 7,
"depth_score": 6,
"coherence_score": 9,
"actionability_score": 7,
"overall_quality_score": 7.4,
"strengths": ["strength1", "strength2"],
"weaknesses": ["weakness1", "weakness2"],
"improvement_areas": ["area1", "area2"]
}}
Assessment:
"""
try:
response = await self.llm.generate(quality_prompt, temperature=0.3)
assessment = json.loads(response)
# Validate and ensure all required fields
required_fields = ["completeness_score", "accuracy_score", "depth_score",
"coherence_score", "actionability_score", "overall_quality_score"]
for field in required_fields:
if field not in assessment:
assessment[field] = 5.0 # Default middle score
return assessment
except Exception as e:
print(f"Error in quality assessment: {e}")
return {
"overall_quality_score": 5.0,
"error": str(e),
"strengths": [],
"weaknesses": ["Assessment failed"],
"improvement_areas": ["Fix quality assessment system"]
}
def _format_research_approach(self, tasks: List[Task]) -> str:
"""Format the research approach for quality assessment."""
if not tasks:
return "No tasks executed"
approach_summary = []
for i, task in enumerate(tasks, 1):
status_icon = "✓" if task.status == TaskStatus.COMPLETED else "✗"
approach_summary.append(f"{i}. {status_icon} {task.description} (using {task.tool_name})")
return "\n".join(approach_summary)
def _analyze_efficiency(self, tasks: List[Task], session_duration: float) -> Dict[str, Any]:
"""
Analyze the efficiency of the research session.
Efficiency metrics help optimize future research approaches.
"""
if not tasks:
return {"error": "No tasks to analyze"}
# Calculate time per task (estimated)
avg_time_per_task = session_duration / len(tasks)
# Analyze tool efficiency
tool_usage = {}
for task in tasks:
tool_name = task.tool_name
if tool_name not in tool_usage:
tool_usage[tool_name] = {"count": 0, "success_count": 0}
tool_usage[tool_name]["count"] += 1
if task.status == TaskStatus.COMPLETED:
tool_usage[tool_name]["success_count"] += 1
# Calculate efficiency score based on success rate and time
total_tasks = len(tasks)
successful_tasks = len([t for t in tasks if t.status == TaskStatus.COMPLETED])
success_rate = successful_tasks / total_tasks
# Efficiency score considers both success rate and reasonable time usage
time_efficiency = min(1.0, 300 / max(avg_time_per_task, 1)) # 5 minutes is ideal per task
overall_efficiency = (success_rate * 0.7) + (time_efficiency * 0.3)
return {
"session_duration": session_duration,
"total_tasks": total_tasks,
"successful_tasks": successful_tasks,
"avg_time_per_task": avg_time_per_task,
"success_rate": success_rate,
"time_efficiency": time_efficiency,
"overall_efficiency": overall_efficiency,
"tool_usage": tool_usage
}
async def _generate_improvement_recommendations(self, task_analysis: Dict,
quality_assessment: Dict,
efficiency_analysis: Dict,
goal: str) -> List[str]:
"""
Generate specific recommendations for improving future research sessions.
These recommendations enable continuous learning and adaptation.
"""
recommendations = []
# Analyze success rate
success_rate = task_analysis.get("success_rate", 0)
if success_rate < 0.8:
recommendations.append(
f"Improve task success rate (currently {success_rate:.1%}). "
"Consider adding error recovery mechanisms or tool validation."
)
# Analyze tool usage patterns
tool_success_rates = task_analysis.get("tool_success_rates", {})
for tool, data in tool_success_rates.items():
if data["success_rate"] < 0.7 and data["total"] > 1:
recommendations.append(
f"Tool '{tool}' has low success rate ({data['success_rate']:.1%}). "
"Review tool implementation or usage patterns."
)
# Analyze research quality
quality_score = quality_assessment.get("overall_quality_score", 5)
if quality_score < 7:
recommendations.append(
f"Research quality below target (score: {quality_score}/10). "
"Consider adding more diverse sources or deeper analysis steps."
)
# Analyze efficiency
efficiency = efficiency_analysis.get("overall_efficiency", 0.5)
if efficiency < 0.6:
recommendations.append(
f"Research efficiency could be improved (score: {efficiency:.1%}). "
"Optimize task sequencing and reduce redundant operations."
)
# Analyze task sequence effectiveness
sequence_data = task_analysis.get("sequence_effectiveness", {})
if not sequence_data.get("follows_best_practices", False):
recommendations.append(
"Task sequencing could be improved. Consider starting with broad searches "
"before diving into specific analysis, and end with synthesis."
)
# Generate LLM-based recommendations
llm_recommendations = await self._generate_llm_recommendations(
task_analysis, quality_assessment, efficiency_analysis, goal
)
recommendations.extend(llm_recommendations)
return recommendations[:10] # Limit to top 10 recommendations
async def _generate_llm_recommendations(self, task_analysis: Dict,
quality_assessment: Dict,
efficiency_analysis: Dict,
goal: str) -> List[str]:
"""Generate LLM-based improvement recommendations."""
recommendation_prompt = f"""
Based on this research session analysis, provide 3-5 specific improvement recommendations.
RESEARCH GOAL: {goal}
SUCCESS RATE: {task_analysis.get('success_rate', 0):.1%}
QUALITY SCORE: {quality_assessment.get('overall_quality_score', 5)}/10
EFFICIENCY SCORE: {efficiency_analysis.get('overall_efficiency', 0.5):.1%}
IDENTIFIED ISSUES:
- Quality weaknesses: {quality_assessment.get('weaknesses', [])}
- Efficiency concerns: Low success rate or long duration
- Tool performance: Some tools may have failed frequently
Provide specific, actionable recommendations for improvement:
1. [First recommendation]
2. [Second recommendation]
3. [Third recommendation]
4. [Fourth recommendation]
5. [Fifth recommendation]
Focus on concrete steps that can be implemented to improve future research sessions.
Recommendations:
"""
try:
response = await self.llm.generate(recommendation_prompt, temperature=0.5)
# Extract numbered recommendations
lines = response.strip().split('\n')
recommendations = []
for line in lines:
line = line.strip()
if line and (line[0].isdigit() or line.startswith('-')):
# Clean up the recommendation text
clean_rec = line.split('.', 1)[-1].strip()
if clean_rec:
recommendations.append(clean_rec)
return recommendations[:5] # Return max 5 recommendations
except Exception as e:
print(f"Error generating LLM recommendations: {e}")
return ["Review and optimize research methodology based on session performance"]
def _calculate_overall_score(self, quality_assessment: Dict, efficiency_analysis: Dict) -> float:
"""Calculate overall performance score combining quality and efficiency."""
quality_score = quality_assessment.get("overall_quality_score", 5) / 10 # Normalize to 0-1
efficiency_score = efficiency_analysis.get("overall_efficiency", 0.5)
# Weight quality higher than efficiency (70/30 split)
overall_score = (quality_score * 0.7) + (efficiency_score * 0.3)
return round(overall_score * 10, 2) # Convert back to 0-10 scale
def _update_improvement_trends(self, reflection: Dict):
"""Update improvement metrics to track progress over time."""
quality_score = reflection["quality_assessment"].get("overall_quality_score", 5)
efficiency_score = reflection["efficiency_analysis"].get("overall_efficiency", 0.5) * 10
overall_score = reflection["overall_score"]
# Add to trends (keep only last 20 sessions)
self.improvement_metrics["research_quality_trend"].append(quality_score)
self.improvement_metrics["efficiency_trend"].append(efficiency_score)
# Trim to keep only recent history
for trend in self.improvement_metrics.values():
if len(trend) > 20:
trend.pop(0)
def get_improvement_summary(self) -> Dict[str, Any]:
"""Get a summary of improvement trends over time."""
if not self.reflection_history:
return {"message": "No reflection history available"}
recent_reflections = self.reflection_history[-10:] # Last 10 sessions
# Calculate trends
quality_trend = self.improvement_metrics["research_quality_trend"]
efficiency_trend = self.improvement_metrics["efficiency_trend"]
def calculate_trend(values):
if len(values) < 4:
    return "insufficient_data"  # need at least one earlier value to compare against
recent_avg = sum(values[-3:]) / 3
earlier_values = values[-6:-3] if len(values) >= 6 else values[:-3]
earlier_avg = sum(earlier_values) / len(earlier_values)
if recent_avg > earlier_avg * 1.1:
return "improving"
elif recent_avg < earlier_avg * 0.9:
return "declining"
else:
return "stable"
return {
"total_sessions_analyzed": len(self.reflection_history),
"quality_trend": calculate_trend(quality_trend),
"efficiency_trend": calculate_trend(efficiency_trend),
"average_quality_score": sum(quality_trend) / len(quality_trend) if quality_trend else 0,
"average_efficiency_score": sum(efficiency_trend) / len(efficiency_trend) if efficiency_trend else 0,
"most_common_recommendations": self._get_common_recommendations(),
"improvement_areas": self._identify_improvement_areas()
}
def _get_common_recommendations(self) -> List[str]:
"""Identify the most common recommendations across sessions."""
all_recommendations = []
for reflection in self.reflection_history:
all_recommendations.extend(reflection.get("recommendations", []))
# Simple frequency analysis (in production, use more sophisticated NLP)
recommendation_keywords = {}
for rec in all_recommendations:
words = rec.lower().split()
for word in words:
if len(word) > 4: # Only meaningful words
recommendation_keywords[word] = recommendation_keywords.get(word, 0) + 1
# Return top keywords that appear in recommendations
sorted_keywords = sorted(recommendation_keywords.items(), key=lambda x: x[1], reverse=True)
return [f"Focus on {word}" for word, count in sorted_keywords[:5] if count > 1]
def _identify_improvement_areas(self) -> List[str]:
"""Identify persistent improvement areas across sessions."""
if len(self.reflection_history) < 3:
return ["Need more sessions to identify patterns"]
areas = []
# Check for persistent quality issues
recent_quality = [r["quality_assessment"].get("overall_quality_score", 5)
for r in self.reflection_history[-5:]]
if sum(recent_quality) / len(recent_quality) < 7:
areas.append("Research quality consistency")
# Check for persistent efficiency issues
recent_efficiency = [r["efficiency_analysis"].get("overall_efficiency", 0.5)
for r in self.reflection_history[-5:]]
if sum(recent_efficiency) / len(recent_efficiency) < 0.7:
areas.append("Task execution efficiency")
# Check for recurring tool failures
tool_failures = {}
for reflection in self.reflection_history[-5:]:
task_analysis = reflection.get("task_analysis", {})
tool_success_rates = task_analysis.get("tool_success_rates", {})
for tool, data in tool_success_rates.items():
if data["success_rate"] < 0.8:
tool_failures[tool] = tool_failures.get(tool, 0) + 1
for tool, failure_count in tool_failures.items():
if failure_count >= 3:
areas.append(f"Tool reliability: {tool}")
return areas if areas else ["Overall performance is good"]
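The following is a minimal usage sketch for the ReflectionSystem, not part of reflection.py itself. It assumes the MistralProvider, MemorySystem, Task, and TaskStatus classes from the earlier steps; the constructor arguments and the sample tasks are illustrative, so adapt them to your setup (loading the full Mistral model just for a demo is heavyweight).
# reflection_demo.py - illustrative only
import asyncio
from agent_core import Task, TaskStatus
from llm_provider import MistralProvider
from memory import MemorySystem
from reflection import ReflectionSystem

async def demo():
    llm = MistralProvider()                 # assumed default constructor; see the LLM provider step
    memory = MemorySystem("demo_memory")
    reflector = ReflectionSystem(llm, memory)

    # Pretend these two tasks were just executed by the agent
    tasks = [
        Task("task_001", "Search for EV market data", "web_search", {"query": "EV market 2024"}),
        Task("task_002", "Visualize adoption trends", "data_visualizer", {"chart_type": "line"})
    ]
    for task in tasks:
        task.status = TaskStatus.COMPLETED

    reflection = await reflector.reflect_on_performance(
        completed_tasks=tasks,
        original_goal="Compare electric vehicle market trends",
        final_result="...final report text...",
        session_duration=420.0
    )
    print(f"Overall score: {reflection['overall_score']}/10")
    print("Top recommendations:", reflection["recommendations"][:3])
    print(reflector.get_improvement_summary())

if __name__ == "__main__":
    asyncio.run(demo())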
PERFORMANCE MONITORING SYSTEM
# monitoring.py
import time
import psutil
import threading
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import json
@dataclass
class PerformanceMetrics:
"""
Data class for storing comprehensive performance metrics.
Tracks both system resources and agent-specific performance indicators.
"""
session_id: str
start_time: float
end_time: Optional[float] = None
tasks_completed: int = 0
tasks_failed: int = 0
tools_used: List[str] = field(default_factory=list)
memory_usage_mb: float = 0.0
cpu_usage_percent: float = 0.0
llm_generation_time: float = 0.0
tool_execution_time: float = 0.0
@property
def duration(self) -> float:
"""Calculate session duration in seconds."""
if self.end_time is None:
return time.time() - self.start_time
return self.end_time - self.start_time
@property
def success_rate(self) -> float:
"""Calculate task success rate."""
total_tasks = self.tasks_completed + self.tasks_failed
return self.tasks_completed / total_tasks if total_tasks > 0 else 0.0
def to_dict(self) -> Dict[str, Any]:
"""Convert metrics to dictionary for storage/analysis."""
return {
"session_id": self.session_id,
"start_time": self.start_time,
"end_time": self.end_time,
"duration": self.duration,
"tasks_completed": self.tasks_completed,
"tasks_failed": self.tasks_failed,
"success_rate": self.success_rate,
"tools_used": self.tools_used,
"memory_usage_mb": self.memory_usage_mb,
"cpu_usage_percent": self.cpu_usage_percent,
"llm_generation_time": self.llm_generation_time,
"tool_execution_time": self.tool_execution_time
}
class PerformanceMonitor:
"""
Comprehensive performance monitoring system for the research agent.
Tracks system resources, execution times, and agent-specific metrics
to enable optimization and debugging.
"""
def __init__(self, monitoring_interval: float = 5.0):
"""
Initialize the performance monitor.
Args:
monitoring_interval: Seconds between system resource measurements
"""
self.monitoring_interval = monitoring_interval
self.active_sessions = {}
self.completed_sessions = []
self.system_metrics_history = []
self.monitoring_thread = None
self.monitoring_active = False
# Performance thresholds for alerts
self.thresholds = {
"max_memory_mb": 8000, # 8GB memory threshold
"max_cpu_percent": 80, # 80% CPU threshold
"max_session_duration": 1800, # 30 minutes max session
"min_success_rate": 0.7 # 70% minimum success rate
}
def start_session(self, session_id: str) -> PerformanceMetrics:
"""
Start monitoring a new research session.
Args:
session_id: Unique identifier for the session
Returns:
PerformanceMetrics object for tracking this session
"""
print(f"Starting performance monitoring for session: {session_id}")
metrics = PerformanceMetrics(
session_id=session_id,
start_time=time.time()
)
self.active_sessions[session_id] = metrics
# Start system monitoring if not already running
if not self.monitoring_active:
self._start_system_monitoring()
return metrics
def end_session(self, session_id: str) -> Optional[PerformanceMetrics]:
"""
End monitoring for a research session and finalize metrics.
Args:
session_id: Session identifier to end
Returns:
Final performance metrics for the session
"""
if session_id not in self.active_sessions:
print(f"Warning: Session {session_id} not found in active sessions")
return None
metrics = self.active_sessions[session_id]
metrics.end_time = time.time()
# Capture final system state
metrics.memory_usage_mb = psutil.virtual_memory().used / 1024 / 1024
metrics.cpu_usage_percent = psutil.cpu_percent()
# Move to completed sessions
self.completed_sessions.append(metrics)
del self.active_sessions[session_id]
# Stop system monitoring if no active sessions
if not self.active_sessions and self.monitoring_active:
self._stop_system_monitoring()
# Check for performance alerts
self._check_performance_alerts(metrics)
print(f"Session {session_id} completed in {metrics.duration:.2f}s")
print(f"Success rate: {metrics.success_rate:.1%}")
return metrics
def record_task_completion(self, session_id: str, success: bool, tool_name: str):
"""
Record the completion of a task within a session.
Args:
session_id: Session identifier
success: Whether the task completed successfully
tool_name: Name of the tool that was used
"""
if session_id not in self.active_sessions:
return
metrics = self.active_sessions[session_id]
if success:
metrics.tasks_completed += 1
else:
metrics.tasks_failed += 1
if tool_name not in metrics.tools_used:
metrics.tools_used.append(tool_name)
def record_llm_generation(self, session_id: str, generation_time: float):
"""
Record LLM generation time for performance analysis.
Args:
session_id: Session identifier
generation_time: Time taken for LLM generation in seconds
"""
if session_id in self.active_sessions:
self.active_sessions[session_id].llm_generation_time += generation_time
def record_tool_execution(self, session_id: str, execution_time: float):
"""
Record tool execution time for performance analysis.
Args:
session_id: Session identifier
execution_time: Time taken for tool execution in seconds
"""
if session_id in self.active_sessions:
self.active_sessions[session_id].tool_execution_time += execution_time
def _start_system_monitoring(self):
"""Start background thread for system resource monitoring."""
self.monitoring_active = True
self.monitoring_thread = threading.Thread(target=self._monitor_system_resources)
self.monitoring_thread.daemon = True
self.monitoring_thread.start()
print("System resource monitoring started")
def _stop_system_monitoring(self):
"""Stop background system resource monitoring."""
self.monitoring_active = False
if self.monitoring_thread:
self.monitoring_thread.join(timeout=1)
print("System resource monitoring stopped")
def _monitor_system_resources(self):
"""Background monitoring of system resources."""
while self.monitoring_active:
try:
# Collect system metrics
memory_info = psutil.virtual_memory()
cpu_percent = psutil.cpu_percent(interval=1)
# Get GPU memory if available
gpu_memory_mb = 0
try:
import torch
if torch.cuda.is_available():
gpu_memory_mb = torch.cuda.memory_allocated() / 1024 / 1024
except Exception:  # torch not installed or GPU not available
pass
system_metrics = {
"timestamp": time.time(),
"memory_used_mb": memory_info.used / 1024 / 1024,
"memory_percent": memory_info.percent,
"cpu_percent": cpu_percent,
"gpu_memory_mb": gpu_memory_mb,
"active_sessions": len(self.active_sessions)
}
self.system_metrics_history.append(system_metrics)
# Keep only last hour of metrics
cutoff_time = time.time() - 3600
self.system_metrics_history = [
m for m in self.system_metrics_history
if m["timestamp"] > cutoff_time
]
# Check for system resource alerts
self._check_system_alerts(system_metrics)
except Exception as e:
print(f"Error in system monitoring: {e}")
time.sleep(self.monitoring_interval)
def _check_performance_alerts(self, metrics: PerformanceMetrics):
"""Check if session metrics exceed performance thresholds."""
alerts = []
if metrics.duration > self.thresholds["max_session_duration"]:
alerts.append(f"Session duration exceeded threshold: {metrics.duration:.0f}s")
if metrics.success_rate < self.thresholds["min_success_rate"]:
alerts.append(f"Success rate below threshold: {metrics.success_rate:.1%}")
if metrics.memory_usage_mb > self.thresholds["max_memory_mb"]:
alerts.append(f"Memory usage high: {metrics.memory_usage_mb:.0f}MB")
if alerts:
print("PERFORMANCE ALERTS:")
for alert in alerts:
print(f" • {alert}")
def _check_system_alerts(self, system_metrics: Dict[str, Any]):
"""Check if system metrics exceed thresholds."""
if system_metrics["memory_percent"] > 90:
print(f"WARNING: System memory usage at {system_metrics['memory_percent']:.1f}%")
if system_metrics["cpu_percent"] > self.thresholds["max_cpu_percent"]:
print(f"WARNING: CPU usage at {system_metrics['cpu_percent']:.1f}%")
def get_performance_summary(self, hours: int = 24) -> Dict[str, Any]:
"""
Get a comprehensive performance summary for the specified time period.
Args:
hours: Number of hours to include in the summary
Returns:
Dictionary containing performance summary
"""
cutoff_time = time.time() - (hours * 3600)
# Filter recent sessions
recent_sessions = [
s for s in self.completed_sessions
if s.start_time > cutoff_time
]
if not recent_sessions:
return {"message": f"No sessions in the last {hours} hours"}
# Calculate aggregate metrics
total_sessions = len(recent_sessions)
total_duration = sum(s.duration for s in recent_sessions)
total_tasks = sum(s.tasks_completed + s.tasks_failed for s in recent_sessions)
successful_tasks = sum(s.tasks_completed for s in recent_sessions)
# Calculate averages
avg_duration = total_duration / total_sessions
avg_success_rate = successful_tasks / total_tasks if total_tasks > 0 else 0
avg_memory_usage = sum(s.memory_usage_mb for s in recent_sessions) / total_sessions
# Tool usage analysis
all_tools = []
for session in recent_sessions:
all_tools.extend(session.tools_used)
tool_usage = {}
for tool in all_tools:
tool_usage[tool] = tool_usage.get(tool, 0) + 1
# System resource trends
recent_system_metrics = [
m for m in self.system_metrics_history
if m["timestamp"] > cutoff_time
]
if recent_system_metrics:
avg_cpu = sum(m["cpu_percent"] for m in recent_system_metrics) / len(recent_system_metrics)
peak_memory = max(m["memory_used_mb"] for m in recent_system_metrics)
else:
avg_cpu = 0
peak_memory = 0
return {
"time_period_hours": hours,
"total_sessions": total_sessions,
"total_duration_minutes": total_duration / 60,
"average_session_duration": avg_duration,
"total_tasks_executed": total_tasks,
"overall_success_rate": avg_success_rate,
"average_memory_usage_mb": avg_memory_usage,
"tool_usage_frequency": tool_usage,
"system_performance": {
"average_cpu_percent": avg_cpu,
"peak_memory_usage_mb": peak_memory,
"monitoring_data_points": len(recent_system_metrics)
},
"performance_trends": self._calculate_performance_trends(recent_sessions)
}
def _calculate_performance_trends(self, sessions: List[PerformanceMetrics]) -> Dict[str, str]:
"""Calculate performance trends over recent sessions."""
if len(sessions) < 4:
return {"trend_analysis": "Insufficient data for trend analysis"}
# Split sessions into two halves for comparison
mid_point = len(sessions) // 2
early_sessions = sessions[:mid_point]
recent_sessions = sessions[mid_point:]
# Calculate averages for each half
early_success_rate = sum(s.success_rate for s in early_sessions) / len(early_sessions)
recent_success_rate = sum(s.success_rate for s in recent_sessions) / len(recent_sessions)
early_duration = sum(s.duration for s in early_sessions) / len(early_sessions)
recent_duration = sum(s.duration for s in recent_sessions) / len(recent_sessions)
# Determine trends
success_trend = "improving" if recent_success_rate > early_success_rate * 1.05 else \
"declining" if recent_success_rate < early_success_rate * 0.95 else "stable"
duration_trend = "improving" if recent_duration < early_duration * 0.95 else \
"declining" if recent_duration > early_duration * 1.05 else "stable"
return {
"success_rate_trend": success_trend,
"duration_trend": duration_trend,
"early_success_rate": f"{early_success_rate:.1%}",
"recent_success_rate": f"{recent_success_rate:.1%}",
"early_avg_duration": f"{early_duration:.1f}s",
"recent_avg_duration": f"{recent_duration:.1f}s"
}
def export_metrics(self, filename: Optional[str] = None) -> str:
"""
Export performance metrics to JSON file.
Args:
filename: Optional custom filename
Returns:
Path to the exported file
"""
if filename is None:
filename = f"performance_metrics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
export_data = {
"export_timestamp": datetime.now().isoformat(),
"completed_sessions": [s.to_dict() for s in self.completed_sessions],
"system_metrics_history": self.system_metrics_history,
"performance_summary": self.get_performance_summary(24),
"thresholds": self.thresholds
}
try:
with open(filename, 'w') as f:
json.dump(export_data, f, indent=2)
print(f"Performance metrics exported to: {filename}")
return filename
except Exception as e:
print(f"Error exporting metrics: {e}")
return ""
Code Explanation for Advanced Features:
- The ReflectionSystem enables the agent to analyze its own performance and continuously improve
- Quality assessment uses the LLM to evaluate research completeness, accuracy, and actionability
- Efficiency analysis tracks time usage, success rates, and resource optimization
- Improvement recommendations are generated both programmatically and using LLM analysis
- Performance trends are tracked over time to identify long-term improvement patterns
- The PerformanceMonitor provides comprehensive system and agent-specific monitoring (see the usage sketch after this list)
- Resource usage tracking helps optimize hardware requirements and identify bottlenecks
- Alert systems warn about performance issues before they become critical
- Metrics export enables detailed analysis and reporting for system optimization
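A minimal usage sketch for the PerformanceMonitor follows. The session identifier, tool names, and timings are illustrative; in the real system these calls would be made from inside the research loop rather than at module level.
# monitoring_demo.py - illustrative only
import time
from monitoring import PerformanceMonitor

monitor = PerformanceMonitor(monitoring_interval=5.0)
metrics = monitor.start_session("session_001")

# In the real system, the research loop would wrap each task execution like this:
start = time.time()
time.sleep(0.1)                                              # stand-in for a web_search call
monitor.record_tool_execution("session_001", time.time() - start)
monitor.record_task_completion("session_001", success=True, tool_name="web_search")

monitor.record_llm_generation("session_001", 2.4)            # seconds spent generating with the LLM
monitor.record_task_completion("session_001", success=False, tool_name="pdf_analyzer")

final_metrics = monitor.end_session("session_001")
print(f"Duration: {final_metrics.duration:.1f}s, success rate: {final_metrics.success_rate:.1%}")

print(monitor.get_performance_summary(hours=24))
monitor.export_metrics()                                     # writes performance_metrics_<timestamp>.json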
STEP 9: DEPLOYMENT OPTIONS
API SERVER DEPLOYMENT
# api_server.py
from fastapi import FastAPI, BackgroundTasks, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import Dict, List, Any, Optional
import asyncio
import uuid
from datetime import datetime
import logging
from main_agent import ResearchAgentApp  # the agent application defined earlier in main_agent.py
# Pydantic models for API requests and responses
class ResearchRequest(BaseModel):
"""Model for research request data."""
goal: str = Field(..., description="Research objective or question", min_length=10)
context: str = Field("", description="Additional context for the research")
priority: int = Field(1, description="Priority level (1-5)", ge=1, le=5)
max_tasks: int = Field(15, description="Maximum number of tasks to execute", ge=1, le=50)
tools_to_use: List[str] = Field([], description="Specific tools to use (empty for auto-select)")
class ResearchResponse(BaseModel):
"""Model for research response data."""
task_id: str = Field(..., description="Unique task identifier")
status: str = Field(..., description="Current status of the research")
goal: str = Field(..., description="Research goal")
created_at: str = Field(..., description="Timestamp when research was started")
result: Optional[str] = Field(None, description="Research results (when completed)")
progress: Optional[Dict[str, Any]] = Field(None, description="Progress information")
error: Optional[str] = Field(None, description="Error message if failed")
class HealthResponse(BaseModel):
"""Model for health check responses."""
status: str
agent_status: str
uptime_seconds: float
total_requests: int
active_tasks: int
system_info: Dict[str, Any]
# Global variables for the API server
app = FastAPI(
title="Research Agent API",
description="RESTful API for the Agentic AI Research Assistant",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
# Add CORS middleware for web applications
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, specify exact origins
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Global state management
research_agent = None
active_tasks = {}
completed_tasks = {}
server_start_time = datetime.now()
request_counter = 0
class APIServer:
"""
FastAPI server wrapper for the Research Agent.
Provides RESTful endpoints for research requests and monitoring.
"""
def __init__(self, agent_config: Dict[str, Any] = None):
"""
Initialize the API server with a research agent instance.
Args:
agent_config: Configuration for the research agent
"""
global research_agent
self.logger = logging.getLogger(__name__)
try:
# Initialize the research agent
self.logger.info("Initializing Research Agent for API server...")
research_agent = ResearchAgentApp(agent_config or {})
self.logger.info("Research Agent initialized successfully")
except Exception as e:
self.logger.error(f"Failed to initialize Research Agent: {e}")
raise RuntimeError(f"Could not start API server: {e}")
async def get_agent():
"""Dependency to get the research agent instance."""
if research_agent is None:
raise HTTPException(status_code=503, detail="Research Agent not initialized")
return research_agent
@app.on_event("startup")
async def startup_event():
"""Initialize the research agent when the server starts."""
global research_agent
print("Starting Research Agent API Server...")
try:
# You would initialize your agent configuration here
config = {
"search_api_key": "your_search_api_key_here",
"max_tasks_per_session": 20,
"enable_caching": True
}
server = APIServer(config)
print("✅ Research Agent API Server ready!")
except Exception as e:
print(f"❌ Failed to start server: {e}")
raise
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""
Health check endpoint to monitor server status.
Returns system information and agent status.
"""
global request_counter, active_tasks, server_start_time
uptime = (datetime.now() - server_start_time).total_seconds()
# Get system information
import psutil
system_info = {
"cpu_usage_percent": psutil.cpu_percent(),
"memory_usage_percent": psutil.virtual_memory().percent,
"disk_usage_percent": psutil.disk_usage('/').percent
}
# Check if GPU is available
try:
import torch
if torch.cuda.is_available():
system_info["gpu_available"] = True
system_info["gpu_memory_gb"] = torch.cuda.memory_allocated() / 1e9
else:
system_info["gpu_available"] = False
except Exception:  # torch not installed or GPU not available
system_info["gpu_available"] = False
agent_status = "ready" if research_agent else "not_initialized"
return HealthResponse(
status="healthy",
agent_status=agent_status,
uptime_seconds=uptime,
total_requests=request_counter,
active_tasks=len(active_tasks),
system_info=system_info
)
@app.post("/research", response_model=ResearchResponse)
async def start_research(
request: ResearchRequest,
background_tasks: BackgroundTasks,
agent = Depends(get_agent)
):
"""
Start a new research task.
Research runs in the background and can be monitored via the status endpoint.
"""
global request_counter, active_tasks
task_id = str(uuid.uuid4())
request_counter += 1
# Store task information
task_info = {
"task_id": task_id,
"goal": request.goal,
"context": request.context,
"priority": request.priority,
"status": "started",
"created_at": datetime.now().isoformat(),
"progress": {"phase": "initialization", "tasks_completed": 0}
}
active_tasks[task_id] = task_info
# Start research in background
background_tasks.add_task(
execute_research_task,
task_id,
request.goal,
request.context,
request.max_tasks,
request.tools_to_use
)
return ResearchResponse(
task_id=task_id,
status="started",
goal=request.goal,
created_at=task_info["created_at"],
progress=task_info["progress"]
)
@app.get("/research/{task_id}", response_model=ResearchResponse)
async def get_research_status(task_id: str):
"""
Get the status of a research task.
Returns current progress, results if completed, or error information.
"""
# Check active tasks first
if task_id in active_tasks:
task_info = active_tasks[task_id]
return ResearchResponse(
task_id=task_id,
status=task_info["status"],
goal=task_info["goal"],
created_at=task_info["created_at"],
progress=task_info.get("progress")
)
# Check completed tasks
elif task_id in completed_tasks:
task_info = completed_tasks[task_id]
return ResearchResponse(
task_id=task_id,
status=task_info["status"],
goal=task_info["goal"],
created_at=task_info["created_at"],
result=task_info.get("result"),
error=task_info.get("error")
)
else:
raise HTTPException(status_code=404, detail="Task not found")
@app.get("/research", response_model=List[ResearchResponse])
async def list_research_tasks(limit: int = 20, status_filter: Optional[str] = None):
"""
List recent research tasks with optional status filtering.
Args:
limit: Maximum number of tasks to return
status_filter: Filter by status (started, running, completed, failed)
"""
all_tasks = []
# Add active tasks
for task_info in active_tasks.values():
if status_filter is None or task_info["status"] == status_filter:
all_tasks.append(ResearchResponse(
task_id=task_info["task_id"],
status=task_info["status"],
goal=task_info["goal"],
created_at=task_info["created_at"],
progress=task_info.get("progress")
))
# Add completed tasks
for task_info in completed_tasks.values():
if status_filter is None or task_info["status"] == status_filter:
all_tasks.append(ResearchResponse(
task_id=task_info["task_id"],
status=task_info["status"],
goal=task_info["goal"],
created_at=task_info["created_at"],
result=task_info.get("result"),
error=task_info.get("error")
))
# Sort by creation time (newest first)
all_tasks.sort(key=lambda x: x.created_at, reverse=True)
return all_tasks[:limit]
@app.delete("/research/{task_id}")
async def cancel_research_task(task_id: str):
"""
Cancel an active research task.
Note: This is a simple implementation - in production you'd need
proper task cancellation mechanisms.
"""
if task_id not in active_tasks:
raise HTTPException(status_code=404, detail="Task not found or already completed")
task_info = active_tasks[task_id]
task_info["status"] = "cancelled"
task_info["error"] = "Task cancelled by user request"
# Move to completed tasks
completed_tasks[task_id] = task_info
del active_tasks[task_id]
return {"message": f"Task {task_id} cancelled successfully"}
@app.get("/stats")
async def get_server_stats():
"""Get comprehensive server statistics."""
uptime = (datetime.now() - server_start_time).total_seconds()
# Calculate status distribution
status_counts = {}
for task in active_tasks.values():
status = task["status"]
status_counts[status] = status_counts.get(status, 0) + 1
for task in completed_tasks.values():
status = task["status"]
status_counts[status] = status_counts.get(status, 0) + 1
# Calculate average completion time for completed tasks
completed_task_list = list(completed_tasks.values())
avg_completion_time = 0
if completed_task_list:
total_time = 0
count = 0
for task in completed_task_list:
if "completion_time" in task:
total_time += task["completion_time"]
count += 1
avg_completion_time = total_time / count if count > 0 else 0
return {
"server_uptime_seconds": uptime,
"total_requests": request_counter,
"active_tasks": len(active_tasks),
"completed_tasks": len(completed_tasks),
"status_distribution": status_counts,
"average_completion_time_seconds": avg_completion_time,
"agent_status": "ready" if research_agent else "not_initialized"
}
async def execute_research_task(task_id: str, goal: str, context: str,
max_tasks: int, tools_to_use: List[str]):
"""
Background task that executes the actual research.
Updates task status and stores results.
"""
global active_tasks, completed_tasks
start_time = datetime.now()
try:
# Update status to running
if task_id in active_tasks:
active_tasks[task_id]["status"] = "running"
active_tasks[task_id]["progress"] = {
"phase": "research_execution",
"tasks_completed": 0
}
# Execute the research using the global agent
result = await research_agent.research(goal, context)
# Calculate completion time
completion_time = (datetime.now() - start_time).total_seconds()
# Move to completed tasks
completed_task_info = {
"task_id": task_id,
"goal": goal,
"status": "completed",
"created_at": active_tasks[task_id]["created_at"],
"completed_at": datetime.now().isoformat(),
"completion_time": completion_time,
"result": result
}
completed_tasks[task_id] = completed_task_info
if task_id in active_tasks:
del active_tasks[task_id]
print(f"Research task {task_id} completed successfully")
except Exception as e:
# Handle research failure
error_message = str(e)
completion_time = (datetime.now() - start_time).total_seconds()
completed_task_info = {
"task_id": task_id,
"goal": goal,
"status": "failed",
"created_at": active_tasks[task_id]["created_at"] if task_id in active_tasks else start_time.isoformat(),
"completed_at": datetime.now().isoformat(),
"completion_time": completion_time,
"error": error_message
}
completed_tasks[task_id] = completed_task_info
if task_id in active_tasks:
del active_tasks[task_id]
print(f"Research task {task_id} failed: {error_message}")
# Server startup script
def run_server(host: str = "0.0.0.0", port: int = 8000, reload: bool = False):
"""
Run the API server.
Args:
host: Host address to bind to
port: Port number to use
reload: Enable auto-reload for development
"""
import uvicorn
print(f"Starting Research Agent API Server on {host}:{port}")
print(f"API Documentation: http://{host}:{port}/docs")
print(f"Alternative Docs: http://{host}:{port}/redoc")
uvicorn.run(
"api_server:app",
host=host,
port=port,
reload=reload,
log_level="info"
)
if __name__ == "__main__":
run_server(reload=True)
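To exercise the API from another process, a minimal client sketch is shown below. It assumes the server is running locally on port 8000 (the run_server defaults) and uses only the endpoints and fields defined above; the polling interval is arbitrary.
# client_example.py - illustrative only; requires the requests package
import time
import requests

BASE_URL = "http://localhost:8000"

# Start a research task
response = requests.post(f"{BASE_URL}/research", json={
    "goal": "Analyze the current state of renewable energy adoption in Europe",
    "context": "Focus on the last five years",
    "priority": 2,
    "max_tasks": 10,
    "tools_to_use": []
})
task_id = response.json()["task_id"]
print(f"Started research task: {task_id}")

# Poll the status endpoint until the task reaches a terminal state
while True:
    status = requests.get(f"{BASE_URL}/research/{task_id}").json()
    if status["status"] in ("completed", "failed", "cancelled"):
        break
    print(f"Status: {status['status']} ...")
    time.sleep(10)

if status["status"] == "completed":
    print(status["result"])
else:
    print(f"Research did not complete: {status.get('error')}")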
DOCKER DEPLOYMENT
# Dockerfile
FROM python:3.9-slim
# Set working directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
build-essential \
curl \
software-properties-common \
git \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements file
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create directories for outputs
RUN mkdir -p /app/charts /app/research_outputs
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=60s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Run the application
CMD ["python", "api_server.py"]
# docker-compose.yml
version: '3.8'
services:
research-agent:
build: .
ports:
- "8000:8000"
environment:
- SEARCH_API_KEY=${SEARCH_API_KEY}
- MODEL_NAME=mistralai/Mistral-7B-Instruct-v0.2
- MAX_TASKS_PER_SESSION=20
volumes:
- ./research_outputs:/app/research_outputs
- ./charts:/app/charts
- ./logs:/app/logs
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
# Optional: Add Redis for caching (production)
redis:
image: redis:alpine
ports:
- "6379:6379"
restart: unless-stopped
# Optional: Add monitoring with Prometheus
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
restart: unless-stopped
# requirements.txt for deployment
transformers==4.35.0
torch==2.1.0
accelerate==0.24.0
langchain==0.1.0
langchain-community==0.0.10
requests==2.31.0
beautifulsoup4==4.12.2
arxiv==1.4.8
matplotlib==3.8.0
plotly==5.17.0
pandas==2.1.0
sentence-transformers==2.2.2
chromadb==0.4.15
pydantic==2.5.0
fastapi==0.104.0
uvicorn==0.24.0
psutil==5.9.6
KUBERNETES DEPLOYMENT
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: research-agent
labels:
app: research-agent
spec:
replicas: 2
selector:
matchLabels:
app: research-agent
template:
metadata:
labels:
app: research-agent
spec:
containers:
- name: research-agent
image: research-agent:latest
ports:
- containerPort: 8000
env:
- name: SEARCH_API_KEY
valueFrom:
secretKeyRef:
name: research-agent-secrets
key: search-api-key
- name: MODEL_NAME
value: "mistralai/Mistral-7B-Instruct-v0.2"
- name: MAX_TASKS_PER_SESSION
value: "20"
resources:
requests:
memory: "4Gi"
cpu: "1000m"
limits:
memory: "8Gi"
cpu: "2000m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 60
periodSeconds: 30
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
volumeMounts:
- name: research-outputs
mountPath: /app/research_outputs
- name: charts
mountPath: /app/charts
volumes:
- name: research-outputs
persistentVolumeClaim:
claimName: research-outputs-pvc
- name: charts
persistentVolumeClaim:
claimName: charts-pvc
---
apiVersion: v1
kind: Service
metadata:
name: research-agent-service
spec:
selector:
app: research-agent
ports:
- protocol: TCP
port: 80
targetPort: 8000
type: LoadBalancer
---
apiVersion: v1
kind: Secret
metadata:
name: research-agent-secrets
type: Opaque
data:
search-api-key: <base64-encoded-api-key>
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: research-outputs-pvc
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: charts-pvc
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 5Gi
Code Explanation for Deployment:
- FastAPI server provides RESTful endpoints for research requests and monitoring
- Background task execution allows non-blocking research operations
- Comprehensive health checks and status monitoring enable production deployment
- Docker containerization ensures consistent deployment across environments
- Kubernetes deployment provides scalability, health checks, and persistent storage
- Environment variable configuration allows easy customization per deployment (see the sketch after this list)
- Resource limits prevent any single task from consuming excessive system resources
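Both the docker-compose and Kubernetes files pass configuration through environment variables, while ResearchAgentApp (as shown earlier) takes a plain configuration dictionary. The sketch below shows one possible way to bridge the two; the model_name key and the default values are assumptions, so align the key names with your own ResearchAgentApp implementation.
# config_from_env.py - illustrative only
import os

def load_config_from_env() -> dict:
    """Map deployment environment variables onto the agent's configuration dictionary."""
    return {
        "search_api_key": os.environ.get("SEARCH_API_KEY", ""),
        "model_name": os.environ.get("MODEL_NAME", "mistralai/Mistral-7B-Instruct-v0.2"),  # key name assumed
        "max_tasks_per_session": int(os.environ.get("MAX_TASKS_PER_SESSION", "15")),
        "enable_caching": True
    }

# Example: agent = ResearchAgentApp(load_config_from_env())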
STEP 10: TESTING AND VALIDATION
UNIT TESTS
# test_agent.py
import pytest
import asyncio
import tempfile
import json
from datetime import datetime
from unittest.mock import Mock, patch, AsyncMock
# Import the classes we want to test
from agent_core import Task, TaskStatus, AgentState, ResearchAgent
from planning import PlanningSystem
from tools import ToolRegistry, WebSearchTool, DataVisualizerTool
from memory import MemorySystem
from llm_provider import MistralProvider
from main_agent import ResearchAgentApp
class TestTask:
"""Test the Task data class functionality."""
def test_task_creation(self):
"""Test basic task creation and initialization."""
task = Task(
id="test_001",
description="Test task description",
tool_name="web_search",
parameters={"query": "test query", "num_results": 5}
)
assert task.id == "test_001"
assert task.description == "Test task description"
assert task.tool_name == "web_search"
assert task.parameters["query"] == "test query"
assert task.status == TaskStatus.PENDING
assert task.result is None
assert task.created_at is not None
def test_task_status_updates(self):
"""Test task status transitions."""
task = Task("test_001", "Test task", "web_search", {})
# Test status progression
assert task.status == TaskStatus.PENDING
task.status = TaskStatus.IN_PROGRESS
assert task.status == TaskStatus.IN_PROGRESS
task.status = TaskStatus.COMPLETED
task.result = {"success": True, "data": "test result"}
assert task.status == TaskStatus.COMPLETED
assert task.result["success"] is True
class TestAgentState:
"""Test the AgentState data class."""
def test_agent_state_initialization(self):
"""Test agent state initialization."""
state = AgentState(
current_goal="Test research goal",
task_queue=[],
completed_tasks=[],
memory={},
context=""
)
assert state.current_goal == "Test research goal"
assert len(state.task_queue) == 0
assert len(state.completed_tasks) == 0
assert state.memory == {}
assert state.context == ""
class TestToolRegistry:
"""Test the tool registry functionality."""
def test_tool_registration(self):
"""Test registering tools in the registry."""
registry = ToolRegistry()
# Create a mock tool
mock_tool = Mock()
mock_tool.execute = AsyncMock(return_value={"status": "success"})
registry.register("test_tool", mock_tool)
assert "test_tool" in registry.tools
assert registry.tools["test_tool"] == mock_tool
def test_tool_not_found_error(self):
"""Test error handling for non-existent tools."""
registry = ToolRegistry()
with pytest.raises(ValueError) as exc_info:
asyncio.run(registry.execute_tool("nonexistent_tool", param="value"))
assert "Tool 'nonexistent_tool' not found" in str(exc_info.value)
@pytest.mark.asyncio
async def test_tool_execution(self):
"""Test successful tool execution."""
registry = ToolRegistry()
# Create and register a mock tool
mock_tool = Mock()
mock_tool.execute = AsyncMock(return_value={"result": "test data", "status": "success"})
registry.register("test_tool", mock_tool)
# Execute the tool
result = await registry.execute_tool("test_tool", query="test")
assert result["result"] == "test data"
assert result["status"] == "success"
mock_tool.execute.assert_called_once_with(query="test")
@pytest.mark.asyncio
async def test_tool_execution_error_handling(self):
"""Test tool execution error handling."""
registry = ToolRegistry()
# Create a tool that raises an exception
mock_tool = Mock()
mock_tool.execute = AsyncMock(side_effect=Exception("Tool execution failed"))
registry.register("failing_tool", mock_tool)
# Test that the exception is propagated
with pytest.raises(Exception) as exc_info:
await registry.execute_tool("failing_tool", param="value")
assert "Tool execution failed" in str(exc_info.value)
class TestMemorySystem:
"""Test the memory system functionality."""
def setUp(self):
"""Set up test environment with temporary memory."""
# Use in-memory ChromaDB for testing
self.memory = MemorySystem("test_collection")
def test_memory_storage(self):
"""Test storing and retrieving memories."""
memory = MemorySystem("test_memory")
# Store a test result
task_id = "test_task_001"
content = "This is test research content about renewable energy"
metadata = {
"type": "research_result",
"goal": "renewable energy research"
}
memory.store_result(task_id, content, metadata)
# Retrieve relevant memories
relevant = memory.retrieve_relevant("renewable energy", k=1)
assert len(relevant) > 0
assert "renewable energy" in relevant[0]["content"]
def test_memory_relevance_filtering(self):
"""Test memory relevance scoring and filtering."""
memory = MemorySystem("test_relevance")
# Store multiple memories with different relevance
memory.store_result("task1", "Solar energy research findings", {"type": "research"})
memory.store_result("task2", "Wind energy statistics and data", {"type": "research"})
memory.store_result("task3", "Cooking recipes for dinner", {"type": "personal"})
# Query for energy-related content
relevant = memory.retrieve_relevant("solar power energy", k=5)
# Should return energy-related content with higher relevance
assert len(relevant) >= 2
# Solar energy should be most relevant
assert "solar" in relevant[0]["content"].lower()
class TestPlanningSystem:
"""Test the planning system functionality."""
def setUp(self):
"""Set up mock LLM for testing."""
self.mock_llm = Mock()
@pytest.mark.asyncio
async def test_plan_creation(self):
"""Test creation of research plans."""
mock_llm = Mock()
# Mock LLM response with valid JSON
mock_plan_response = '''
{
"strategy": "Comprehensive renewable energy analysis",
"tasks": [
{
"id": "task_001",
"description": "Search for renewable energy statistics",
"tool_name": "web_search",
"parameters": {"query": "renewable energy statistics 2024", "num_results": 5},
"priority": 1,
"dependencies": []
},
{
"id": "task_002",
"description": "Analyze renewable energy trends",
"tool_name": "data_visualizer",
"parameters": {"data": [], "chart_type": "line", "title": "Renewable Energy Trends"},
"priority": 2,
"dependencies": ["task_001"]
}
],
"success_criteria": [
"Gather current renewable energy adoption statistics",
"Identify key trends and patterns"
]
}
'''
mock_llm.generate = AsyncMock(return_value=mock_plan_response)
planner = PlanningSystem(mock_llm)
# Test plan creation
tasks = await planner.create_plan("Analyze renewable energy adoption trends")
assert len(tasks) == 2
assert tasks[0].id == "task_001"
assert tasks[0].tool_name == "web_search"
assert tasks[1].id == "task_002"
assert tasks[1].tool_name == "data_visualizer"
@pytest.mark.asyncio
async def test_plan_creation_with_invalid_json(self):
"""Test plan creation fallback when LLM returns invalid JSON."""
mock_llm = Mock()
mock_llm.generate = AsyncMock(return_value="Invalid JSON response")
planner = PlanningSystem(mock_llm)
# Should fall back to default plan
tasks = await planner.create_plan("Test research goal")
assert len(tasks) >= 1
assert tasks[0].tool_name in ["web_search", "synthesis"]
class TestDataVisualizerTool:
"""Test the data visualization tool."""
@pytest.mark.asyncio
async def test_bar_chart_creation(self):
"""Test creating a bar chart."""
with tempfile.TemporaryDirectory() as temp_dir:
# Create visualizer with temporary output directory
visualizer = DataVisualizerTool()
visualizer.output_dir = temp_dir
# Test data
data = [
{"Country": "Germany", "Capacity": 150},
{"Country": "France", "Capacity": 120},
{"Country": "Spain", "Capacity": 100}
]
result = await visualizer.execute(
data=data,
chart_type="bar",
title="Renewable Energy Capacity by Country"
)
assert result["status"] == "success"
assert result["chart_type"] == "bar"
assert result["data_points"] == 3
assert "chart_path" in result
@pytest.mark.asyncio
async def test_empty_data_error_handling(self):
"""Test error handling for empty data."""
visualizer = DataVisualizerTool()
result = await visualizer.execute(
data=[],
chart_type="bar",
title="Empty Data Test"
)
assert result["status"] == "failed"
assert "error" in result
class TestResearchAgentApp:
"""Integration tests for the complete research agent application."""
@pytest.fixture
def mock_config(self):
"""Provide test configuration."""
return {
"search_api_key": "test_api_key",
"max_tasks_per_session": 5,
"enable_caching": False
}
@patch('main_agent.MistralProvider')
@patch('main_agent.MemorySystem')
@patch('main_agent.ToolRegistry')
def test_agent_initialization(self, mock_tool_registry, mock_memory, mock_llm, mock_config):
"""Test agent initialization with mocked components."""
# Configure mocks
mock_llm_instance = Mock()
mock_llm.return_value = mock_llm_instance
mock_memory_instance = Mock()
mock_memory.return_value = mock_memory_instance
mock_registry_instance = Mock()
mock_tool_registry.return_value = mock_registry_instance
# Initialize agent
agent = ResearchAgentApp(mock_config)
assert agent.config["max_tasks_per_session"] == 5
assert agent.config["enable_caching"] is False
assert agent.llm == mock_llm_instance
assert agent.memory == mock_memory_instance
@pytest.mark.asyncio
@patch('main_agent.MistralProvider')
@patch('main_agent.MemorySystem')
@patch('main_agent.ToolRegistry')
async def test_research_execution_flow(self, mock_tool_registry, mock_memory, mock_llm, mock_config):
"""Test the complete research execution flow."""
# Set up mocks
mock_llm_instance = Mock()
mock_llm_instance.generate = AsyncMock(return_value="Mock research result")
mock_llm.return_value = mock_llm_instance
mock_memory_instance = Mock()
mock_memory_instance.get_research_context = Mock(return_value="Mock context")
mock_memory_instance.store_result = Mock()
mock_memory.return_value = mock_memory_instance
mock_registry_instance = Mock()
mock_registry_instance.execute_tool = AsyncMock(return_value={"status": "success", "data": "mock data"})
mock_tool_registry.return_value = mock_registry_instance
# Mock planning system
with patch('main_agent.PlanningSystem') as mock_planning:
mock_planner = Mock()
mock_task = Task("test_001", "Mock task", "web_search", {"query": "test"})
mock_planner.create_plan = AsyncMock(return_value=[mock_task])
mock_planner.should_replan = AsyncMock(return_value=False)
mock_planning.return_value = mock_planner
# Initialize and run research
agent = ResearchAgentApp(mock_config)
result = await agent.research("Test research goal")
# Verify execution
assert isinstance(result, str)
assert len(result) > 0
mock_planner.create_plan.assert_called()
mock_registry_instance.execute_tool.assert_called()
INTEGRATION TESTS
# test_integration.py
import pytest
import asyncio
import tempfile
import os
from datetime import datetime
from unittest.mock import Mock, patch
# Project classes under test (adjust the module paths to your layout):
# from main_agent import ResearchAgentApp
# from tools import ToolRegistry, WebSearchTool, DataVisualizerTool
class TestEndToEndIntegration:
"""
End-to-end integration tests that use real components
but with test data and configurations.
"""
@pytest.mark.asyncio
async def test_complete_research_workflow(self):
"""
Test a complete research workflow with minimal real components.
This test uses actual classes but with mock external dependencies.
"""
# Create temporary directories for test outputs
with tempfile.TemporaryDirectory() as temp_dir:
# Configure test agent with minimal real setup
test_config = {
"search_api_key": "test_key", # Will use mock search
"max_tasks_per_session": 3,
"output_directory": temp_dir,
"enable_caching": False
}
# Mock external dependencies
with patch('tools.requests.get') as mock_requests, \
patch('llm_provider.AutoModelForCausalLM') as mock_model, \
patch('llm_provider.AutoTokenizer') as mock_tokenizer:
# Configure HTTP mock
mock_response = Mock()
mock_response.json.return_value = {
"items": [
{
"title": "Renewable Energy Report 2024",
"snippet": "Latest trends in renewable energy adoption",
"link": "https://example.com/report",
"displayLink": "example.com"
}
]
}
mock_response.raise_for_status.return_value = None
mock_requests.return_value = mock_response
# Configure LLM mocks
mock_tokenizer_instance = Mock()
mock_tokenizer_instance.encode.return_value = [1, 2, 3, 4, 5]
mock_tokenizer_instance.decode.return_value = "Mock LLM response"
mock_tokenizer_instance.eos_token_id = 2
mock_tokenizer_instance.pad_token = None
mock_tokenizer.from_pretrained.return_value = mock_tokenizer_instance
mock_model_instance = Mock()
mock_model_instance.generate.return_value = [[1, 2, 3, 4, 5, 6, 7]]
mock_model_instance.eval.return_value = None
mock_model.from_pretrained.return_value = mock_model_instance
# Initialize and test the agent
try:
agent = ResearchAgentApp(test_config)
# Execute a research task
result = await agent.research("Test renewable energy trends")
# Verify results
assert isinstance(result, str)
assert len(result) > 50 # Should be a substantial response
assert "renewable energy" in result.lower() or "research" in result.lower()
except Exception as e:
pytest.skip(f"Integration test skipped due to setup issues: {e}")
@pytest.mark.asyncio
async def test_tool_integration(self):
"""Test integration between different tools."""
# Test tool registry with multiple tools
registry = ToolRegistry()
# Add real tools but with mocked external dependencies
with patch('tools.requests.get') as mock_get:
# Mock successful web search
mock_response = Mock()
mock_response.json.return_value = {"items": [{"title": "Test", "snippet": "Test result"}]}
mock_response.raise_for_status.return_value = None
mock_response.elapsed.total_seconds.return_value = 0.5
mock_get.return_value = mock_response
# Register tools
web_search = WebSearchTool("test_key")
registry.register("web_search", web_search)
data_viz = DataVisualizerTool()
registry.register("data_visualizer", data_viz)
# Test tool execution
search_result = await registry.execute_tool("web_search", query="test", num_results=1)
assert search_result["status"] == "success"
# Test visualization with sample data
viz_data = [{"x": 1, "y": 10}, {"x": 2, "y": 20}]
viz_result = await registry.execute_tool(
"data_visualizer",
data=viz_data,
chart_type="line",
title="Test Chart"
)
assert viz_result["status"] == "success"
# test_performance.py
import pytest
import asyncio
import time
import psutil
import threading
from concurrent.futures import ThreadPoolExecutor
from unittest.mock import Mock, AsyncMock, patch
# Project classes under test (adjust the module paths to your layout):
# from main_agent import ResearchAgentApp
# from agent_core import Task, TaskStatus
class TestPerformance:
"""Performance and load testing for the research agent."""
@pytest.mark.asyncio
async def test_memory_usage_stability(self):
"""Test that memory usage remains stable during operation."""
# Monitor memory usage
initial_memory = psutil.Process().memory_info().rss / 1024 / 1024 # MB
# Simulate multiple research sessions
with patch('main_agent.MistralProvider') as mock_llm, \
patch('main_agent.MemorySystem') as mock_memory, \
patch('main_agent.ToolRegistry') as mock_tools:
# Configure mocks for minimal resource usage
mock_llm_instance = Mock()
mock_llm_instance.generate = AsyncMock(return_value="Mock result")
mock_llm.return_value = mock_llm_instance
mock_memory_instance = Mock()
mock_memory_instance.get_research_context = Mock(return_value="")
mock_memory_instance.store_result = Mock()
mock_memory.return_value = mock_memory_instance
mock_tools_instance = Mock()
mock_tools_instance.execute_tool = AsyncMock(return_value={"status": "success"})
mock_tools.return_value = mock_tools_instance
# Run multiple sessions
config = {"max_tasks_per_session": 2}
agent = ResearchAgentApp(config)
for i in range(5):
await agent.research(f"Test research goal {i}")
# Check memory usage
current_memory = psutil.Process().memory_info().rss / 1024 / 1024
memory_increase = current_memory - initial_memory
# Memory increase should be reasonable (less than 100MB per session)
assert memory_increase < 100 * (i + 1), f"Excessive memory usage: {memory_increase}MB"
@pytest.mark.asyncio
async def test_concurrent_requests(self):
"""Test handling of concurrent research requests."""
async def mock_research_task(agent, goal):
"""Mock research task that takes some time."""
await asyncio.sleep(0.1) # Simulate work
return f"Research result for: {goal}"
# Mock the research method
with patch('main_agent.ResearchAgentApp.research', side_effect=mock_research_task):
agent = Mock()
# Submit concurrent requests
tasks = []
for i in range(10):
task = asyncio.create_task(mock_research_task(agent, f"Goal {i}"))
tasks.append(task)
start_time = time.time()
results = await asyncio.gather(*tasks)
end_time = time.time()
# All tasks should complete
assert len(results) == 10
# Should complete in reasonable time (concurrent execution)
assert end_time - start_time < 1.0 # Less than 1 second for 10 concurrent 0.1s tasks
def test_task_execution_performance(self):
"""Test performance of individual task components."""
# Test task creation performance
start_time = time.time()
tasks = []
for i in range(1000):
task = Task(f"task_{i}", f"Description {i}", "web_search", {"query": f"test {i}"})
tasks.append(task)
creation_time = time.time() - start_time
assert creation_time < 1.0 # Should create 1000 tasks in under 1 second
assert len(tasks) == 1000
# Test task status updates
start_time = time.time()
for task in tasks[:100]: # Test first 100
task.status = TaskStatus.COMPLETED
task.result = {"data": "test result"}
update_time = time.time() - start_time
assert update_time < 0.1 # Should update 100 tasks in under 0.1 seconds
VALIDATION TESTS
# test_validation.py
import pytest
import json
from datetime import datetime
from unittest.mock import Mock, AsyncMock
# Project classes under test (adjust the module paths to your layout):
# from agent_core import Task
# from tools import ToolRegistry
# from planning import PlanningSystem
# from memory import MemorySystem
class TestDataValidation:
"""Test data validation and error handling throughout the system."""
def test_task_parameter_validation(self):
"""Test validation of task parameters."""
# Valid task creation
valid_task = Task(
id="valid_001",
description="Valid task description",
tool_name="web_search",
parameters={"query": "test", "num_results": 5}
)
assert valid_task.id == "valid_001"
# Test with invalid parameters
with pytest.raises(TypeError):
Task(
id=None, # Should require string ID
description="Test",
tool_name="web_search",
parameters={}
)
def test_research_goal_validation(self):
"""Test validation of research goals."""
# Valid goals
valid_goals = [
"Analyze renewable energy trends in Europe",
"Research artificial intelligence applications in healthcare",
"Study consumer behavior in e-commerce"
]
for goal in valid_goals:
assert len(goal) >= 10 # Reasonable minimum length
assert any(keyword in goal.lower() for keyword in ['analyze', 'research', 'study', 'investigate'])
# Invalid goals
invalid_goals = [
"", # Empty
"Hi", # Too short
" ", # Whitespace only
]
for goal in invalid_goals:
assert len(goal.strip()) < 10 # Should be rejected
def test_json_response_validation(self):
"""Test validation of JSON responses from LLM."""
# Valid JSON response
valid_json = '''
{
"strategy": "Test strategy",
"tasks": [
{
"id": "task_001",
"description": "Test task",
"tool_name": "web_search",
"parameters": {"query": "test"},
"priority": 1,
"dependencies": []
}
],
"success_criteria": ["Test criterion"]
}
'''
try:
parsed = json.loads(valid_json)
assert "strategy" in parsed
assert "tasks" in parsed
assert len(parsed["tasks"]) > 0
assert "id" in parsed["tasks"][0]
except json.JSONDecodeError:
pytest.fail("Valid JSON should parse successfully")
# Invalid JSON responses
invalid_json_responses = [
'{"incomplete": true', # Malformed JSON
'Not JSON at all', # Not JSON
'{}', # Empty object
'{"tasks": []}', # Missing required fields
]
for invalid_json in invalid_json_responses:
try:
parsed = json.loads(invalid_json)
# Check for required fields
required_fields = ["strategy", "tasks", "success_criteria"]
missing_fields = [field for field in required_fields if field not in parsed]
if missing_fields:
assert len(missing_fields) > 0 # Should be missing required fields
except json.JSONDecodeError:
assert True # Expected for malformed JSON
class TestErrorRecovery:
"""Test error recovery and resilience mechanisms."""
@pytest.mark.asyncio
async def test_tool_failure_recovery(self):
"""Test recovery when tools fail."""
registry = ToolRegistry()
# Create a tool that always fails
failing_tool = Mock()
failing_tool.execute = AsyncMock(side_effect=Exception("Simulated tool failure"))
registry.register("failing_tool", failing_tool)
# Test that failure is properly handled
with pytest.raises(Exception) as exc_info:
await registry.execute_tool("failing_tool", param="test")
assert "Simulated tool failure" in str(exc_info.value)
# Check that execution history records the failure
history = registry.get_execution_history()
assert len(history) > 0
assert history[-1]["status"] == "failed"
assert "Simulated tool failure" in history[-1]["error"]
@pytest.mark.asyncio
async def test_llm_failure_recovery(self):
"""Test recovery when LLM generation fails."""
# Mock LLM that fails
failing_llm = Mock()
failing_llm.generate = AsyncMock(side_effect=Exception("LLM generation failed"))
planner = PlanningSystem(failing_llm)
# Should fall back to default plan when LLM fails
tasks = await planner.create_plan("Test research goal")
# Should return fallback tasks
assert len(tasks) >= 1
assert tasks[0].id.startswith("fallback_")
def test_memory_system_resilience(self):
"""Test memory system resilience to corrupted data."""
memory = MemorySystem("test_resilience")
# Test storing various types of data
test_data = [
("normal_task", "Normal research content", {"type": "research"}),
("empty_task", "", {"type": "empty"}),
("unicode_task", "Unicode content: 中文 español émoji 🔬", {"type": "unicode"}),
("large_task", "x" * 10000, {"type": "large"}), # Large content
]
for task_id, content, metadata in test_data:
try:
memory.store_result(task_id, content, metadata)
# If storage succeeds, retrieval should work
relevant = memory.retrieve_relevant(content[:50] if content else "test", k=1)
assert isinstance(relevant, list)
except Exception as e:
# Log but don't fail - some edge cases may be expected to fail
print(f"Memory storage failed for {task_id}: {e}")
# Test runner configuration
# pytest.ini
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
-v
--tb=short
--strict-markers
--disable-warnings
markers =
asyncio: marks tests as async (deselect with '-m "not asyncio"')
integration: marks tests as integration tests
performance: marks tests as performance tests
slow: marks tests as slow running
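Alongside pytest.ini, a small tests/conftest.py can hold fixtures that several test files share, so each file does not repeat its own mock setup. This is an optional sketch that reuses the mock_config shape from the unit tests above; adjust names and values to your layout.
# tests/conftest.py - shared fixtures for the test suite (illustrative sketch)
import pytest
from unittest.mock import Mock, AsyncMock

@pytest.fixture
def mock_config():
    """Minimal configuration dictionary reused across unit and integration tests."""
    return {
        "search_api_key": "test_api_key",
        "max_tasks_per_session": 5,
        "enable_caching": False,
    }

@pytest.fixture
def mock_llm():
    """LLM double exposing the async generate() method the planner expects."""
    llm = Mock()
    llm.generate = AsyncMock(return_value="Mock LLM response")
    return llm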
# Test execution script
# run_tests.sh
#!/bin/bash
echo "Running Research Agent Test Suite"
echo "================================="
# Run unit tests
echo "1. Running unit tests..."
python -m pytest test_agent.py -v
# Run integration tests
echo "2. Running integration tests..."
python -m pytest test_integration.py -v -m integration
# Run performance tests (optional, slower)
echo "3. Running performance tests..."
python -m pytest test_performance.py -v -m performance
# Run validation tests
echo "4. Running validation tests..."
python -m pytest test_validation.py -v
# Generate coverage report
echo "5. Generating coverage report..."
python -m pytest --cov=. --cov-report=html --cov-report=term
echo "Test suite completed!"
Code Explanation for Testing:
- Comprehensive unit tests cover all major components with proper mocking
- Integration tests verify component interactions with controlled dependencies
- Performance tests ensure memory stability and concurrent request handling
- Validation tests verify data integrity and error handling throughout the system
- Test configuration includes proper async test support and clear organization
- Error recovery tests ensure the system gracefully handles component failures
- The test suite provides confidence in system reliability and performance
- Coverage reporting helps identify untested code paths for improvement
USER GUIDE - AGENTIC AI RESEARCH ASSISTANT
GETTING STARTED
QUICK START (Interactive Mode)
1. Open your terminal/command prompt
2. Navigate to the installation directory
3. Run: python main_agent.py
4. When you see the prompt, type your research goal
5. Press Enter and wait for results
Example:
📝 Research Goal: Analyze renewable energy trends in Europe
🔍 Starting research... (this may take 2-10 minutes)
📊 Research Complete! [Results displayed]
QUICK START (Web Interface)
1. Start the web server: python api_server.py
2. Open your browser to: http://localhost:8000/docs
3. Use the /research endpoint to submit requests
4. Check results with the /research/{task_id} endpoint
HOW TO ASK FOR RESEARCH
BASIC FORMAT
Simply describe what you want to research in natural language.
✅ GOOD EXAMPLES:
• "Analyze the impact of remote work on productivity"
• "Compare electric vehicle adoption rates across different countries"
• "Study consumer behavior changes during economic inflation"
• "Investigate the effectiveness of meditation for anxiety treatment"
❌ AVOID THESE:
• Too vague: "cars" or "technology stuff"
• Too narrow: "What did John Smith say about EVs on Twitter yesterday?"
• Impossible requests: "Predict the exact stock price of Tesla next week"
ADDING CONTEXT (OPTIONAL)
For better results, you can add specific requirements:
Format: [Main Goal] + [Additional Context]
Examples:
• Goal: "Analyze social media impact on teenagers"
Context: "Focus on mental health effects, use studies from 2020-2024"
• Goal: "Research sustainable packaging trends"
Context: "Need data for business presentation, focus on food industry"
• Goal: "Study cryptocurrency regulation"
Context: "Compare approaches in US, EU, and Asia"
UNDERSTANDING THE RESEARCH PROCESS
WHAT THE AGENT DOES AUTOMATICALLY
The agent will independently:
• Break your goal into specific research tasks
• Search multiple sources for information
• Analyze documents and data
• Create visualizations when helpful
• Synthesize findings into a comprehensive report
• Adapt its approach based on what it discovers
TYPICAL TIMELINE
• Simple topics: 2-5 minutes
• Complex topics: 5-15 minutes
• Very comprehensive research: 15-30 minutes
You'll see progress updates like:
Phase 1: Creating research plan...
Phase 2: Executing research tasks...
Executing task 1: Search for renewable energy statistics...
Executing task 2: Analyze EU energy policy documents...
Phase 3: Synthesizing results...
WHAT YOU'LL RECEIVE
RESEARCH REPORT FORMAT
Your results will include:
1. EXECUTIVE SUMMARY
Key findings and main conclusions
2. METHODOLOGY
How the research was conducted
3. KEY FINDINGS
Detailed discoveries organized by theme
4. ANALYSIS
Interpretation of the data and trends
5. CONCLUSIONS
Summary of insights and implications
6. RECOMMENDATIONS
Actionable next steps (when applicable)
ADDITIONAL OUTPUTS
Depending on your topic, you may also get:
• Charts and graphs (saved as PNG files)
• Links to important sources discovered
• Suggestions for follow-up research
TIPS FOR BETTER RESULTS
BE SPECIFIC ABOUT WHAT YOU NEED
Instead of: "Research AI"
Try: "Analyze how AI is changing job markets in healthcare"
Instead of: "Look into climate change"
Try: "Study the effectiveness of carbon pricing policies in reducing emissions"
SPECIFY YOUR PERSPECTIVE
• "From a business strategy perspective..."
• "For academic research purposes..."
• "To inform policy decisions..."
• "For investment analysis..."
SET BOUNDARIES WHEN NEEDED
• Time period: "...trends since 2020"
• Geography: "...focusing on European markets"
• Industry: "...in the automotive sector"
• Data type: "...with quantitative evidence"
WORKING WITH RESULTS
FOLLOW-UP RESEARCH
If you want to explore specific aspects deeper:
"The previous research mentioned battery storage challenges.
Can you investigate battery technology developments in more detail?"
"I'd like more information about the policy recommendations
from the renewable energy research."
SAVING YOUR WORK
• Reports are automatically saved with timestamps
• Charts are saved in the /charts directory
• You can copy/paste text results into your own documents
COMMON COMMANDS
INTERACTIVE MODE COMMANDS
• help - Show available commands
• stats - Display performance statistics
• memory - Show system memory status
• quit - Exit the application
WEB API ENDPOINTS
• POST /research - Start new research
• GET /research/{id} - Check research status
• GET /research - List recent research tasks
• GET /health - Check system status
• GET /stats - View system statistics
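For example, a minimal client can start a research task and then poll for the result with the requests library. The request and response fields shown here (goal, task_id, status) are assumptions about how api_server.py defines its models, so treat this as a sketch rather than the canonical client:
# research_client.py - illustrative client for the web API
import time
import requests

BASE_URL = "http://localhost:8000"

# Submit a research goal (field name "goal" is an assumption about the request model)
resp = requests.post(f"{BASE_URL}/research", json={"goal": "Analyze renewable energy trends in Europe"})
resp.raise_for_status()
task_id = resp.json()["task_id"]  # assumed response field

# Poll until the task reports a terminal status
while True:
    status = requests.get(f"{BASE_URL}/research/{task_id}").json()
    if status.get("status") in ("completed", "failed"):
        print(status)
        break
    time.sleep(5)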
TROUBLESHOOTING
COMMON ISSUES AND SOLUTIONS
"Research is taking too long"
→ Complex topics need more time. Check the progress messages.
→ You can cancel with Ctrl+C and try a more focused topic.
"Results seem incomplete"
→ Try being more specific about what you're looking for.
→ Add context about the type of information you need.
"Agent says it can't find information"
→ Topic might be too recent or too specific.
→ Try broadening the scope or using different terminology.
"Getting errors about tools failing"
→ Check your internet connection.
→ Verify API keys are configured correctly.
→ Some external sources might be temporarily unavailable.
OPTIMIZING PERFORMANCE
For faster results:
• Be specific rather than asking broad questions
• Limit scope when possible ("...in the past 2 years")
• Avoid requests that require real-time data
For more comprehensive results:
• Allow more time for complex topics
• Ask follow-up questions to dive deeper
• Request specific types of analysis or visualization
EXAMPLE WORKFLOWS
BUSINESS RESEARCH WORKFLOW
1. "Analyze the market opportunity for sustainable packaging in food industry"
2. Review results, identify promising areas
3. "Research specific sustainable packaging technologies mentioned in the previous analysis"
4. "Compare costs and adoption barriers for biodegradable vs recyclable packaging"
ACADEMIC RESEARCH WORKFLOW
1. "Study the relationship between social media use and academic performance in college students"
2. Examine methodology and findings
3. "Find additional peer-reviewed studies on social media and student focus/attention"
4. "Analyze criticisms and limitations of existing research in this area"
INVESTMENT RESEARCH WORKFLOW
1. "Analyze growth trends in the electric vehicle charging infrastructure market"
2. Review market data and projections
3. "Research key companies and competitive landscape in EV charging"
4. "Investigate regulatory factors affecting EV charging investments"
BEST PRACTICES
DO'S
✅ Start with clear, specific goals
✅ Provide context when you have specific needs
✅ Allow adequate time for comprehensive research
✅ Ask follow-up questions to explore interesting findings
✅ Save important results for future reference
DON'TS
❌ Don't expect instant results for complex topics
❌ Don't ask for personal/private information about individuals
❌ Don't request illegal or unethical research
❌ Don't assume the agent has access to proprietary databases
❌ Don't interrupt the research process unless necessary
GETTING HELP
If you need assistance:
• Type 'help' in interactive mode for commands
• Check the system logs for error details
• Review this user guide for common solutions
• Ensure all requirements are properly installed
For technical issues:
• Verify Python dependencies are installed
• Check that you have sufficient disk space and memory
• Ensure internet connectivity for web searches
• Confirm API keys are valid (if using external services)
Remember: The AI Research Assistant is designed to be intuitive and helpful.
Don't hesitate to experiment with different types of research requests to
discover what works best for your needs!
CONCLUSION
This completes the comprehensive guide for building an agentic AI research
assistant. The system includes:
CORE CAPABILITIES:
- Autonomous planning and task execution
- Local LLM integration with Mistral
- Advanced memory and context management
- Modular tool system for extensibility
- Self-reflection and continuous improvement
- Performance monitoring and optimization
DEPLOYMENT OPTIONS:
- FastAPI REST server for web integration
- Docker containerization for consistent deployment
- Kubernetes support for scalable production deployment
- Comprehensive health monitoring and metrics
QUALITY ASSURANCE:
- Complete test suite covering unit, integration, and performance testing
- Error recovery and resilience mechanisms
- Data validation and input sanitization
- Comprehensive logging and debugging capabilities
The architecture is designed to be production-ready while remaining
extensible for additional capabilities and use cases. The agent can operate
autonomously while providing transparency and control over its operations.
Key success factors:
1. Modular design enables easy extension and maintenance
2. Comprehensive error handling ensures robust operation
3. Performance monitoring enables optimization and scaling
4. Test coverage provides confidence in system reliability
5. Documentation supports adoption and contribution
This foundation provides everything needed to build sophisticated agentic AI
applications that can operate autonomously while maintaining human oversight
and control.
COMPLETE SETUP GUIDE - AGENTIC AI RESEARCH ASSISTANT
OVERVIEW
========
This guide will walk you through setting up the Research Assistant from scratch.
Total setup time: 2-6 hours depending on your experience level and hardware.
PREREQUISITES AND SYSTEM REQUIREMENTS
=====================================
HARDWARE REQUIREMENTS
---------------------
MINIMUM (Basic functionality):
• CPU: Modern 4-core processor (Intel i5/i7, AMD Ryzen 5/7)
• RAM: 16GB (12GB for model + 4GB for system)
• Storage: 25GB free space (20GB for models + 5GB for data)
• Internet: Stable broadband connection for initial setup
RECOMMENDED (Optimal performance):
• CPU: 8+ core processor
• RAM: 32GB or more
• GPU: NVIDIA GPU with 8GB+ VRAM (RTX 3070, RTX 4060, or better)
• Storage: 50GB+ on SSD
• Internet: Fast broadband for model downloads
SUPPORTED OPERATING SYSTEMS
---------------------------
• Windows 10/11 (64-bit)
• macOS 10.15+ (Intel or Apple Silicon)
• Linux (Ubuntu 20.04+, CentOS 8+, or equivalent)
SOFTWARE PREREQUISITES
----------------------
• Python 3.9, 3.10, or 3.11 (Python 3.12 may have compatibility issues)
• Git (for cloning repositories)
• Text editor or IDE (VS Code, PyCharm, etc.)
STEP 1: ENVIRONMENT SETUP
=========================
INSTALL PYTHON
--------------
Windows:
1. Download Python from https://python.org/downloads/
2. During installation, CHECK "Add Python to PATH"
3. Verify: Open Command Prompt, type: python --version
macOS:
1. Install Homebrew: /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
2. Install Python: brew install python@3.10
3. Verify: python3 --version
Linux (Ubuntu/Debian):
sudo apt update
sudo apt install python3.10 python3-pip python3.10-venv
python3.10 --version
CREATE PROJECT DIRECTORY
------------------------
mkdir agentic-research-assistant
cd agentic-research-assistant
CREATE VIRTUAL ENVIRONMENT
--------------------------
# Create virtual environment
python -m venv research_env
# Activate it
# Windows:
research_env\Scripts\activate
# macOS/Linux:
source research_env/bin/activate
# You should see (research_env) in your terminal prompt
STEP 2: CODE SETUP
==================
CREATE PROJECT STRUCTURE
------------------------
mkdir -p src tools tests data charts research_outputs logs config
Your directory should look like:
agentic-research-assistant/
├── src/ # Main application code
├── tools/ # Tool implementations
├── tests/ # Test files
├── data/ # Data storage
├── charts/ # Generated visualizations
├── research_outputs/ # Research results
├── logs/ # Application logs
├── config/ # Configuration files
├── requirements.txt # Python dependencies
└── README.md # Documentation
CREATE MAIN FILES
-----------------
Create the following files in your project directory:
requirements.txt:
transformers==4.35.0
torch==2.1.0
accelerate==0.24.0
sentence-transformers==2.2.2
chromadb==0.4.15
pydantic==2.5.0
fastapi==0.104.0
uvicorn==0.24.0
requests==2.31.0
beautifulsoup4==4.12.2
matplotlib==3.8.0
plotly==5.17.0
pandas==2.1.0
psutil==5.9.6
pytest==7.4.0
pytest-asyncio==0.21.1
pytest-cov==4.1.0
python-dotenv==1.0.0
.env (for environment variables):
# API Keys (get these from respective providers)
SEARCH_API_KEY=your_search_api_key_here
OPENAI_API_KEY=your_openai_key_here
# Model Configuration
MODEL_NAME=mistralai/Mistral-7B-Instruct-v0.2
MAX_TASKS_PER_SESSION=20
ENABLE_CACHING=true
# Directories
OUTPUT_DIRECTORY=./research_outputs
CHARTS_DIRECTORY=./charts
LOGS_DIRECTORY=./logs
config/config.py:
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
# API Keys
SEARCH_API_KEY = os.getenv('SEARCH_API_KEY', '')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY', '')
# Model Settings
MODEL_NAME = os.getenv('MODEL_NAME', 'mistralai/Mistral-7B-Instruct-v0.2')
MAX_TASKS_PER_SESSION = int(os.getenv('MAX_TASKS_PER_SESSION', '20'))
ENABLE_CACHING = os.getenv('ENABLE_CACHING', 'true').lower() == 'true'
# Directories
OUTPUT_DIRECTORY = os.getenv('OUTPUT_DIRECTORY', './research_outputs')
CHARTS_DIRECTORY = os.getenv('CHARTS_DIRECTORY', './charts')
LOGS_DIRECTORY = os.getenv('LOGS_DIRECTORY', './logs')
# Ensure directories exist
os.makedirs(OUTPUT_DIRECTORY, exist_ok=True)
os.makedirs(CHARTS_DIRECTORY, exist_ok=True)
os.makedirs(LOGS_DIRECTORY, exist_ok=True)
STEP 3: INSTALL DEPENDENCIES
============================
INSTALL PYTHON PACKAGES
-----------------------
# Make sure virtual environment is activated
pip install --upgrade pip
# Install PyTorch (choose based on your system)
# For CPU only:
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
# For NVIDIA GPU (CUDA 11.8):
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
# For NVIDIA GPU (CUDA 12.1):
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
# Install remaining dependencies
pip install -r requirements.txt
VERIFY INSTALLATION
------------------
python -c "import torch; print(f'PyTorch version: {torch.__version__}')"
python -c "import torch; print(f'CUDA available: {torch.cuda.is_available()}')"
python -c "import transformers; print(f'Transformers version: {transformers.__version__}')"
STEP 4: MODEL SETUP
===================
DOWNLOAD MISTRAL MODEL
---------------------
Create download_model.py:
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
import os
def download_mistral_model():
model_name = "mistralai/Mistral-7B-Instruct-v0.2"
cache_dir = "./models"
print(f"Downloading {model_name}...")
print("This will take 30-60 minutes and use ~7GB of storage.")
try:
# Download tokenizer
print("Downloading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(
model_name,
cache_dir=cache_dir
)
# Download model
print("Downloading model...")
model = AutoModelForCausalLM.from_pretrained(
model_name,
cache_dir=cache_dir,
torch_dtype=torch.float16,
low_cpu_mem_usage=True
)
print("✅ Model downloaded successfully!")
print(f"📁 Cached in: {cache_dir}")
# Test the model
print("🧪 Testing model...")
inputs = tokenizer("Hello, I am", return_tensors="pt")
outputs = model.generate(**inputs, max_length=20)
response = tokenizer.decode(outputs[0], skip_special_tokens=True)
print(f"✅ Model test successful: {response}")
except Exception as e:
print(f"❌ Error downloading model: {e}")
print("💡 Try running with more RAM or check internet connection")
if __name__ == "__main__":
download_mistral_model()
Run the download:
python download_model.py
ALTERNATIVE: USE SMALLER MODEL (If you have limited RAM)
-------------------------------------------------------
Edit your .env file:
MODEL_NAME=microsoft/DialoGPT-small
Or for even smaller:
MODEL_NAME=distilgpt2
STEP 5: API CONFIGURATION
=========================
GET SEARCH API KEY
-----------------
You need a web search API. Choose one:
Option 1 - Google Custom Search (Recommended):
1. Go to https://developers.google.com/custom-search/v1/introduction
2. Create a project and enable Custom Search API
3. Create credentials (API key)
4. Create a Custom Search Engine at https://cse.google.com/
5. Get your Search Engine ID
Option 2 - SerpAPI:
1. Go to https://serpapi.com/
2. Sign up for free account (100 searches/month)
3. Get your API key from dashboard
Option 3 - Bing Search API:
1. Go to https://azure.microsoft.com/en-us/services/cognitive-services/bing-web-search-api/
2. Create Azure account
3. Create Bing Search resource
4. Get API key
UPDATE CONFIGURATION
-------------------
Edit your .env file:
SEARCH_API_KEY=your_actual_api_key_here
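If you chose Google Custom Search, you can confirm the key works before running the agent. The script below queries the Custom Search JSON API directly; the Search Engine ID placeholder is something you fill in from https://cse.google.com/, and the other providers use different endpoints:
# check_search_key.py - quick sanity check for a Google Custom Search key
import os
import requests
from dotenv import load_dotenv

load_dotenv()
API_KEY = os.getenv("SEARCH_API_KEY", "")
SEARCH_ENGINE_ID = "your_search_engine_id_here"  # from https://cse.google.com/

resp = requests.get(
    "https://www.googleapis.com/customsearch/v1",
    params={"key": API_KEY, "cx": SEARCH_ENGINE_ID, "q": "renewable energy adoption"},
    timeout=10,
)
resp.raise_for_status()
for item in resp.json().get("items", []):
    print(item["title"], "-", item["link"])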
STEP 6: CREATE MAIN APPLICATION FILES
=====================================
Copy the code files from the previous guide into appropriate directories:
src/agent_core.py # Core agent classes
src/planning.py # Planning system
src/tools.py # Tool implementations
src/memory.py # Memory system
src/llm_provider.py # LLM integration
src/main_agent.py # Main application
src/api_server.py # Web API (optional)
Make sure to update import statements in each file:
from config.config import Config
from src.agent_core import Task, TaskStatus, AgentState
STEP 7: CREATE STARTUP SCRIPT
=============================
Create run.py in the root directory:
#!/usr/bin/env python3
import asyncio
import sys
import os
# Add src directory to path
sys.path.append(os.path.join(os.path.dirname(__file__), 'src'))
from main_agent import ResearchAgentApp
from config.config import Config
async def main():
"""Main entry point for the research assistant."""
print("🤖 Initializing Agentic AI Research Assistant...")
print("=" * 60)
# Check configuration
if not Config.SEARCH_API_KEY:
print("⚠️ WARNING: No search API key configured")
print(" Web search functionality will be limited")
print(" Add SEARCH_API_KEY to your .env file")
try:
# Initialize the agent
config = {
"search_api_key": Config.SEARCH_API_KEY,
"model_name": Config.MODEL_NAME,
"max_tasks_per_session": Config.MAX_TASKS_PER_SESSION,
"output_directory": Config.OUTPUT_DIRECTORY
}
agent = ResearchAgentApp(config)
print("✅ Agent initialized successfully!")
print("📚 Ready for research requests")
print("=" * 60)
# Run in interactive mode
await agent.interactive_mode()
except Exception as e:
print(f"❌ Error starting agent: {e}")
print("\nTroubleshooting tips:")
print("• Check that all dependencies are installed")
print("• Verify your API keys are correct")
print("• Ensure you have enough RAM/storage")
print("• Check the logs directory for detailed errors")
if __name__ == "__main__":
asyncio.run(main())
STEP 8: INITIAL TESTING
=======================
TEST BASIC SETUP
----------------
# Test Python environment
python -c "print('Python environment: OK')"
# Test dependencies
python -c "import torch, transformers, chromadb; print('Dependencies: OK')"
# Test configuration
python -c "from config.config import Config; print(f'Config loaded: {Config.MODEL_NAME}')"
RUN YOUR FIRST RESEARCH
-----------------------
python run.py
You should see:
🤖 Initializing Agentic AI Research Assistant...
================================================================
✅ Agent initialized successfully!
📚 Ready for research requests
📝 Research Goal: _
Try a simple test:
📝 Research Goal: Tell me about renewable energy
STEP 9: TROUBLESHOOTING COMMON ISSUES
=====================================
MEMORY ISSUES
------------
Error: "RuntimeError: CUDA out of memory" or "Unable to allocate memory"
Solutions:
1. Use smaller model:
MODEL_NAME=distilgpt2
2. Use CPU instead of GPU:
In llm_provider.py, force CPU by loading the model with device_map="cpu"
(a sketch follows after this list)
3. Increase virtual memory (Windows):
Control Panel > System > Advanced > Performance Settings > Virtual Memory
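A minimal sketch of what the CPU-only load might look like in llm_provider.py (exact variable names will differ in your implementation):
# llm_provider.py (excerpt) - forcing CPU execution, illustrative only
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "mistralai/Mistral-7B-Instruct-v0.2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="cpu",           # keep all weights on the CPU
    torch_dtype=torch.float32,  # float16 is poorly supported on CPU
    low_cpu_mem_usage=True,
)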
MODEL DOWNLOAD ISSUES
--------------------
Error: "ConnectionError" or "403 Forbidden"
Solutions:
1. Check internet connection
2. Try downloading manually:
python -c "from transformers import AutoTokenizer; AutoTokenizer.from_pretrained('mistralai/Mistral-7B-Instruct-v0.2')"
3. Use Hugging Face login:
pip install huggingface_hub
huggingface-cli login
API KEY ISSUES
-------------
Error: "Invalid API key" or "Quota exceeded"
Solutions:
1. Verify API key in .env file
2. Check API key permissions on provider website
3. Monitor usage quotas
4. Use alternative search provider
IMPORT ERRORS
------------
Error: "ModuleNotFoundError"
Solutions:
1. Activate virtual environment:
source research_env/bin/activate # Linux/Mac
research_env\Scripts\activate # Windows
2. Reinstall dependencies:
pip install -r requirements.txt
3. Check Python path:
python -c "import sys; print(sys.path)"
STEP 10: OPTIONAL ENHANCEMENTS
==============================
WEB INTERFACE SETUP
-------------------
To run the web API:
python src/api_server.py
Then visit: http://localhost:8000/docs
DOCKER SETUP (Advanced)
-----------------------
Create Dockerfile:
FROM python:3.10-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y git curl
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy application
COPY . .
# Download model during build (optional)
RUN python download_model.py
EXPOSE 8000
CMD ["python", "run.py"]
Build and run:
docker build -t research-agent .
docker run -p 8000:8000 research-agent
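To let the container read your API keys and persist outputs on the host, pass the .env file and mount the output directories at run time (paths follow the Step 2 layout; -it is needed because run.py starts the interactive prompt):
docker run -it --env-file .env \
  -v "$(pwd)/research_outputs:/app/research_outputs" \
  -v "$(pwd)/charts:/app/charts" \
  -p 8000:8000 research-agent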
MONITORING SETUP
---------------
Install monitoring tools:
pip install prometheus_client grafana-api
Add to your .env:
ENABLE_MONITORING=true
PROMETHEUS_PORT=9090
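How those settings are consumed is up to you; one common pattern with prometheus_client is to expose a /metrics endpoint and count research sessions. The metric and function names below are illustrative, not part of the existing codebase:
# monitoring.py - illustrative Prometheus instrumentation sketch
import os
from prometheus_client import Counter, Histogram, start_http_server

RESEARCH_SESSIONS = Counter(
    "research_sessions_total", "Number of research sessions started"
)
RESEARCH_DURATION = Histogram(
    "research_session_seconds", "Duration of research sessions in seconds"
)

def start_metrics_server():
    """Expose /metrics if monitoring is enabled in the environment."""
    if os.getenv("ENABLE_MONITORING", "false").lower() == "true":
        start_http_server(int(os.getenv("PROMETHEUS_PORT", "9090")))
In the agent you would call start_metrics_server() once at startup, increment RESEARCH_SESSIONS when a session begins, and wrap each research call in RESEARCH_DURATION.time().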
PERFORMANCE OPTIMIZATION
-----------------------
For better performance:
1. Use SSD storage for model cache
2. Increase swap file size
3. Close other applications while running
4. Use GPU if available
5. Consider model quantization:
pip install bitsandbytes
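With bitsandbytes installed (an NVIDIA GPU is required), the model can be loaded in 4-bit precision, which roughly quarters the VRAM it needs. A sketch of how the from_pretrained call changes:
# llm_provider.py (excerpt) - 4-bit quantized load, illustrative only
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

model_name = "mistralai/Mistral-7B-Instruct-v0.2"
quant_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=quant_config,
    device_map="auto",  # place layers on the available GPU(s)
)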
BACKUP AND SECURITY
------------------
1. Backup your .env file (without committing to git)
2. Regularly backup research_outputs directory
3. Use strong API keys and rotate them periodically
4. Monitor API usage to avoid unexpected charges
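A small .gitignore helps with point 1; something like the following keeps secrets, the model cache, and generated outputs out of version control (directory names follow the Step 2 layout):
# .gitignore
.env
models/
research_env/
research_outputs/
charts/
logs/
__pycache__/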
STEP 11: VERIFICATION CHECKLIST
===============================
Before considering setup complete, verify:
□ Python 3.9+ installed and virtual environment active
□ All dependencies installed without errors
□ Model downloaded and cached locally
□ API keys configured and working
□ Directory structure created correctly
□ Basic "Hello World" research query works
□ Charts and outputs are generated correctly
□ No critical errors in logs
□ System performance is acceptable
NEXT STEPS
==========
Once setup is complete:
1. Read the User Guide for effective usage
2. Try different types of research queries
3. Experiment with the API interface
4. Set up monitoring and backups
5. Consider customizing tools for your specific needs
GETTING HELP
============
If you encounter issues:
1. Check the logs directory for detailed error messages
2. Review this setup guide for missed steps
3. Search online for specific error messages
4. Consider using cloud-based alternatives if local setup is problematic
5. Start with a simpler configuration and gradually add complexity
Remember: This is a complex system with many dependencies. Don't be discouraged if setup takes longer than expected - this is normal for advanced AI applications!