Monday, June 02, 2025

COMPLETE GUIDE: BUILDING AN AGENTIC AI RESEARCH ASSISTANT APPLICATION

     

OVERVIEW


This comprehensive guide demonstrates how to build an autonomous Research Assistant Agent capable of planning research tasks, executing them using various tools, and providing comprehensive analysis. The agent uses a local Mistral LLM and demonstrates key agentic AI capabilities including autonomous planning, tool orchestration, and self-reflection.


This document contains the following parts:

  • Description of Sample Use Case, its Architecture, and its Implementation
  • User's Guide
  • Setup Guide


EXAMPLE USE CASE: RESEARCH ASSISTANT AGENT


The Research Assistant Agent we'll build can:

  • Accept high-level research requests like "Analyze renewable energy adoption"
  • Plan a research strategy autonomously without human intervention
  • Use multiple tools to gather data (web search, PDF analysis, visualization)
  • Synthesize findings into comprehensive reports
  • Iterate and refine research based on initial findings
  • Learn from past research sessions to improve future performance


ARCHITECTURE OVERVIEW

 




COMPREHENSIVE ARCHITECTURE EXPLANATION: AGENTIC AI RESEARCH ASSISTANT SYSTEM

INTRODUCTION: WHAT MAKES THIS SYSTEM "AGENTIC"


The Research Assistant system is an implementation of agentic artificial intelligence: a paradigm in which AI systems operate with genuine autonomy, making independent decisions, adapting to circumstances, and pursuing goals without constant human intervention. Unlike traditional AI systems that follow predetermined scripts or respond reactively to inputs, this architecture embodies the core principles of agency through several fundamental capabilities.


The system demonstrates autonomous goal pursuit by receiving high-level objectives and independently determining how to achieve them, breaking complex goals into manageable tasks and executing them strategically. This capability allows users to specify broad research intentions without needing to prescribe specific methodologies or detailed execution steps. Adaptive planning represents another core agentic capability where the system continuously evaluates its progress and adjusts its approach based on intermediate findings and changing circumstances, rather than following rigid procedures. When new information suggests better research directions or when initial approaches prove less fruitful than expected, the agent autonomously modifies its strategy to optimize outcomes.


Tool orchestration demonstrates the system's ability to intelligently select and coordinate multiple tools and data sources, understanding when and how to use each capability to maximum effect. The agent doesn't simply execute tools in predetermined sequences but makes contextual decisions about which capabilities to employ based on the specific requirements of each research goal. Self-reflection represents perhaps the most sophisticated agentic capability, where the agent analyzes its own performance, learns from experience, and continuously improves its strategies and methodologies. This metacognitive ability enables the system to become more effective over time without external programming or manual optimization.


Contextual memory provides the agent with both short-term working memory for current tasks and long-term semantic memory that allows it to build upon previous research and avoid redundant work. This memory architecture enables the system to maintain coherence across extended research sessions while accumulating knowledge that improves future performance. This architecture achieves agency through a carefully orchestrated interplay of specialized subsystems, each contributing essential capabilities while maintaining clear separation of concerns and robust error handling. The resulting system exhibits emergent intelligent behavior that arises from the interaction of these subsystems rather than being explicitly programmed.


ARCHITECTURAL PHILOSOPHY AND DESIGN PRINCIPLES


The system architecture is founded on several key design principles that enable both sophisticated autonomous behavior and practical production deployment. These principles guide every aspect of the system design and implementation, ensuring that the architecture remains maintainable, extensible, and reliable while supporting complex autonomous behaviors.


LAYERED ABSTRACTION ARCHITECTURE


The system employs a multi-layered architecture where each layer provides specific abstractions and services to the layers above it. This design enables complexity management while maintaining clear interfaces between different system concerns; a minimal code sketch of the layering follows the list below.

  • The presentation layer handles REST API endpoints and user interfaces, providing clean abstractions for external interaction with the system. 
  • The application layer manages main agent orchestration and session management, coordinating overall system behavior and maintaining state across research sessions. 
  • The domain layer implements core agentic capabilities including planning, execution, and reflection, encapsulating the intelligent behaviors that define the system's autonomous nature. 
  • The infrastructure layer provides tools, memory systems, and LLM integration, offering the fundamental capabilities that enable intelligent behavior. 
  • Finally, the hardware layer manages local compute resources, storage, and networking, abstracting the physical infrastructure from higher-level concerns.
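
The following self-contained sketch illustrates how these layer boundaries can be separated in code. All class names and placeholder behaviors here are illustrative assumptions for this guide, not the actual implementation built in the later steps.

# layers_sketch.py - illustrative only; classes and behaviors are placeholders

class InfrastructureLayer:
    """Tools, memory, and LLM access; wraps hardware concerns."""

    def run_tool(self, name: str, **params) -> str:
        # A real implementation would dispatch to web search, PDF analysis, etc.
        return f"result of {name}({params})"


class DomainLayer:
    """Core agentic behavior: planning, execution, and (omitted here) reflection."""

    def __init__(self, infra: InfrastructureLayer):
        self.infra = infra

    def research(self, goal: str) -> str:
        plan = [f"broad search on {goal}", "synthesize findings"]                    # planning
        results = [self.infra.run_tool("web_search", query=step) for step in plan]  # execution
        return " | ".join(results)


class ApplicationLayer:
    """Session management and orchestration, called by the presentation layer."""

    def __init__(self):
        self.domain = DomainLayer(InfrastructureLayer())

    def handle_request(self, goal: str) -> dict:
        return {"goal": goal, "report": self.domain.research(goal)}


# The presentation layer (e.g. a REST endpoint) would simply delegate:
print(ApplicationLayer().handle_request("renewable energy adoption"))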


AUTONOMOUS CONTROL LOOPS


The architecture implements multiple autonomous control loops that operate at different time scales and abstraction levels, creating a hierarchy of autonomous behaviors that work together to achieve sophisticated goals. 

  • The planning loop handles strategic goal decomposition and task sequencing, operating at the highest level to transform abstract objectives into concrete action plans. 
  • The execution loop manages individual task execution with real-time adaptation, making tactical decisions about tool usage and parameter optimization during task execution. 
  • The reflection loop conducts performance analysis and methodology improvement, operating at a longer timescale to identify patterns and optimize future behavior. 
  • The monitoring loop maintains system health and resource management, ensuring that the system operates within acceptable performance and resource boundaries.


Each loop operates independently while sharing information through well-defined interfaces, creating emergent intelligent behavior through their interaction. This design enables the system to simultaneously pursue long-term strategic goals while adapting to immediate tactical challenges and continuously improving its overall capabilities.
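
As a rough illustration of loops operating at different timescales, the sketch below runs three placeholder loops with asyncio; the intervals and loop bodies are assumptions chosen only to show the pattern, not the system's real planning, execution, and reflection logic.

# control_loops_sketch.py - placeholder loops running at different timescales
import asyncio


async def periodic(name: str, interval_s: float, counters: dict) -> None:
    while True:
        counters[name] += 1          # stands in for planning / execution / reflection work
        await asyncio.sleep(interval_s)


async def main() -> None:
    counters = {"planning": 0, "execution": 0, "reflection": 0}
    loops = [
        asyncio.create_task(periodic("planning", 3.0, counters)),    # strategic, slow
        asyncio.create_task(periodic("execution", 0.5, counters)),   # tactical, fast
        asyncio.create_task(periodic("reflection", 6.0, counters)),  # metacognitive, slowest
    ]
    await asyncio.sleep(10)          # let the loops interleave for a while
    for loop in loops:
        loop.cancel()
    await asyncio.gather(*loops, return_exceptions=True)
    print(counters)


asyncio.run(main())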


MICROSERVICES-INSPIRED MODULARITY


While deployed as a single application for simplicity, the architecture follows microservices principles with loosely coupled, highly cohesive components. Each major subsystem could theoretically be deployed separately, enabling future scaling and evolution of the system architecture. 

  • The Tool Registry functions as a service discovery mechanism, enabling dynamic registration and discovery of available capabilities. 
  • The Memory System operates as a knowledge management service, providing sophisticated storage and retrieval of research findings and contextual information. 
  • The Planning System serves as a strategic reasoning service, transforming high-level goals into executable plans through sophisticated reasoning processes. 
  • The Reflection System acts as a performance optimization service, analyzing system behavior and generating improvement recommendations.


This modular design ensures that individual components can be enhanced, replaced, or scaled independently while maintaining overall system coherence. The clean interfaces between components also facilitate testing, debugging, and maintenance of individual subsystems without affecting the entire system.


EVENT-DRIVEN COMMUNICATION


Components communicate through an event-driven pattern where state changes trigger appropriate responses throughout the system. This design enables loose coupling between components while ensuring that relevant information flows efficiently throughout the system. When tasks complete, events notify relevant subsystems about new information, completed work, or changed circumstances. When plans are updated, events inform execution components about new priorities or modified strategies. This approach makes the system highly extensible and maintainable while supporting the complex coordination required for autonomous behavior.
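
A minimal publish/subscribe sketch of this pattern is shown below; the event names and handlers are illustrative and do not correspond to the exact events used in the implementation.

# event_bus_sketch.py - a minimal publish/subscribe mechanism; event names are illustrative
from collections import defaultdict
from typing import Any, Callable, Dict, List


class EventBus:
    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callable[[Any], None]]] = defaultdict(list)

    def subscribe(self, event: str, handler: Callable[[Any], None]) -> None:
        self._subscribers[event].append(handler)

    def publish(self, event: str, payload: Any) -> None:
        for handler in self._subscribers[event]:
            handler(payload)


bus = EventBus()
# The memory system could listen for completed tasks; the executor for plan updates.
bus.subscribe("task_completed", lambda result: print(f"memory: storing {result}"))
bus.subscribe("plan_updated", lambda plan: print(f"executor: new priorities {plan}"))

bus.publish("task_completed", {"task": "web_search", "summary": "3 sources found"})
bus.publish("plan_updated", ["pdf_analyzer", "data_visualizer"])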


CORE ARCHITECTURAL COMPONENTS


THE RESEARCH AGENT CORE: ORCHESTRATING INTELLIGENCE


At the heart of the system lies the ResearchAgent class, which serves as the primary orchestrator of all agentic behavior. This component embodies the "executive function" of artificial intelligence, making high-level decisions about resource allocation, task prioritization, and strategic direction. The Agent Core represents the central coordination point where all autonomous behaviors converge and where the system's overall intelligence emerges from the interaction of specialized subsystems.


The Agent Core maintains the complete state of ongoing research through the AgentState data structure, which provides a comprehensive view of current system status and context. The current goal represents the high-level research objective driving all activities, serving as the primary organizing principle for all system behavior. The task queue contains a dynamically managed list of planned tasks awaiting execution, with priorities and dependencies that guide execution order. The completed tasks section maintains a historical record of accomplished work with results, enabling the system to build upon previous accomplishments and avoid redundant effort. Working memory stores immediate context and intermediate findings that influence ongoing decisions and maintain coherence across related tasks. Session context preserves environmental information and constraints that shape how the agent approaches its current research objectives.


The core implements a sophisticated execution loop that demonstrates true autonomous behavior through several key phases of operation. During goal analysis, upon receiving a research objective, the agent doesn't simply execute predefined procedures but instead analyzes the goal's complexity, identifies knowledge gaps, and determines the most effective research strategy based on available resources and contextual factors. The dynamic planning phase sees the agent creating initial plans but continuously adapting them based on discoveries, such as when initial web searches reveal unexpected information sources that prompt the agent to autonomously adjust its strategy to pursue more promising directions.


Resource management demonstrates the agent's ability to intelligently manage computational resources, balancing thoroughness with efficiency by tracking token usage in the LLM, monitoring tool execution times, and adjusting its approach to stay within operational constraints. Throughout execution, quality assessment enables the agent to evaluate the quality and relevance of information it discovers, making autonomous decisions about when to continue pursuing a particular line of inquiry versus when to pivot to alternative approaches based on the value and reliability of discovered information.


THE PLANNING SYSTEM: STRATEGIC INTELLIGENCE


The PlanningSystem represents the "strategic reasoning" capability of the agent, transforming abstract research goals into concrete, executable action plans. This component demonstrates sophisticated reasoning capabilities that go far beyond simple task decomposition, incorporating contextual awareness, resource optimization, and adaptive strategy formulation into its planning processes.


The planning system operates through a multi-level planning architecture that addresses different aspects of research strategy simultaneously. At the strategic level, the system determines overall research approach and methodology selection, making high-level decisions about how to approach complex research objectives based on goal characteristics, available resources, and historical performance data. The tactical level focuses on specific tool selection and parameter optimization, choosing which capabilities to employ and how to configure them for maximum effectiveness in the current context. At the operational level, the system handles detailed task sequencing and dependency management, ensuring that individual tasks execute in logical order while maximizing parallel execution opportunities.


Rather than generating generic plans, the system creates contextually appropriate strategies that adapt to available capabilities and environmental constraints. A research goal about "renewable energy adoption" will generate different plans depending on whether the agent has access to academic databases, government statistics, or only web search capabilities, demonstrating the system's ability to adapt its approach based on available resources and constraints.


The planning system implements sophisticated adaptive replanning mechanisms that continuously monitor execution progress and trigger replanning when circumstances warrant modification of the original strategy. Replanning may occur when initial searches reveal better data sources than anticipated, requiring strategy adjustment to take advantage of superior information sources. Discovered information might suggest more promising research directions that weren't apparent during initial planning, prompting strategic pivots to pursue more valuable lines of inquiry. Resource constraints may require strategy optimization to maintain progress within available computational or time limitations. Quality assessment might indicate gaps in the current approach that necessitate additional tasks or alternative methodologies to ensure comprehensive coverage of the research objective.


The planner demonstrates dependency-aware scheduling by understanding task dependencies and optimally sequencing work to minimize waiting time while ensuring logical progression of research activities. For example, the system ensures that data gathering tasks complete before analysis tasks that depend on their results, while also identifying opportunities for parallel execution of independent tasks to maximize overall efficiency.
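
One way to implement this kind of dependency-aware ordering is a topological sort over the task graph, as in the sketch below; the task names and dependencies are illustrative, not the planner's actual output.

# scheduling_sketch.py - dependency-aware ordering via topological sort (Python 3.9+)
from graphlib import TopologicalSorter

# Each task maps to the set of tasks that must finish before it can start.
dependencies = {
    "search_sources": set(),
    "download_reports": {"search_sources"},
    "extract_statistics": {"download_reports"},
    "visualize_trends": {"extract_statistics"},
    "write_summary": {"extract_statistics", "visualize_trends"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()
while sorter.is_active():
    ready = sorter.get_ready()            # tasks whose dependencies are all satisfied
    print("can run in parallel:", ready)  # independent tasks could be dispatched together
    sorter.done(*ready)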


THE TOOL ORCHESTRATION SYSTEM: CAPABILITY INTEGRATION


The ToolRegistry and associated tool implementations provide the agent with concrete capabilities for interacting with the external world. This subsystem demonstrates sophisticated capability composition and resource management, enabling the agent to leverage diverse tools and data sources in coordinated fashion to achieve research objectives that would be impossible with any single capability.


The system implements dynamic tool selection processes where the agent doesn't simply use tools in predetermined sequences but instead intelligently selects tools based on multiple contextual factors. The specific requirements of each research goal influence tool selection, with different objectives requiring different combinations of search, analysis, and visualization capabilities. The current context and available information shape tool choices, as the agent considers what information it already possesses and what additional data would be most valuable. Historical performance data for different tool combinations informs selection decisions, enabling the agent to learn which tool sequences tend to produce the best results for different types of research objectives. Resource constraints and efficiency considerations also influence tool selection, ensuring that the agent chooses approaches that balance thoroughness with practical limitations.
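
The sketch below shows one possible scoring heuristic for contextual tool selection; the tool metadata, keywords, and weights are assumptions used only to demonstrate the idea of combining relevance, historical success, and cost.

# tool_selection_sketch.py - a toy scoring heuristic; metadata and weights are illustrative
from typing import Dict

TOOLS = {
    "web_search": {"keywords": {"current", "news", "trends"}, "avg_success": 0.9, "avg_seconds": 3},
    "pdf_analyzer": {"keywords": {"paper", "report", "study"}, "avg_success": 0.8, "avg_seconds": 12},
    "data_visualizer": {"keywords": {"chart", "visualize", "plot"}, "avg_success": 0.95, "avg_seconds": 5},
}


def score_tool(tool: Dict, goal: str, time_budget_s: float) -> float:
    relevance = len(tool["keywords"] & set(goal.lower().split()))   # crude contextual match
    efficiency = 1.0 if tool["avg_seconds"] <= time_budget_s else 0.5
    return relevance * tool["avg_success"] * efficiency             # combine context, history, cost


goal = "Summarize current trends in renewable energy adoption"
best = max(TOOLS, key=lambda name: score_tool(TOOLS[name], goal, time_budget_s=10))
print("selected tool:", best)   # web_search, given this goal and budget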


For each tool invocation, the agent demonstrates intelligent parameter optimization based on the specific context and objectives of the current task. Web search queries are crafted to maximize relevant results by incorporating context from previous research findings and optimizing search terms for the specific information being sought. Visualization parameters are selected to best represent the data characteristics and research objectives, choosing chart types, scales, and formatting that most effectively communicate the insights being discovered. Document analysis focuses on the most relevant sections based on research goals and current context, optimizing processing time while ensuring comprehensive coverage of pertinent information.


The tool system implements sophisticated error recovery mechanisms that enable resilient operation even when individual tools fail or produce suboptimal results. When tools fail, the agent automatically retries with adjusted parameters, attempting to resolve issues through parameter modification or alternative configuration options. If direct retry approaches fail, the system falls back to alternative tools that can accomplish similar objectives, maintaining research progress even when preferred tools are unavailable. The agent adjusts its overall strategy to work around unavailable capabilities, modifying research approaches to achieve objectives through alternative means when specific tools cannot be utilized. Most importantly, the system learns from failures to avoid similar issues in future sessions, incorporating failure experiences into its decision-making processes to improve future tool selection and configuration.
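
A minimal retry-then-fallback pattern along these lines might look like the sketch below; the fallback mapping, the timeout adjustment, and the example tool functions are hypothetical.

# error_recovery_sketch.py - retry with adjusted parameters, then fall back to another tool
import time

FALLBACKS = {"pdf_analyzer": "web_search"}   # alternative capability if a tool keeps failing


def run_with_recovery(tools: dict, name: str, params: dict, max_retries: int = 2):
    for attempt in range(max_retries + 1):
        try:
            return tools[name](**params)
        except Exception as exc:
            print(f"{name} failed (attempt {attempt + 1}): {exc}")
            params = {**params, "timeout": params.get("timeout", 10) * 2}  # adjust parameters
            time.sleep(1)
    fallback = FALLBACKS.get(name)
    if fallback is not None:
        print(f"falling back to {fallback}")
        return tools[fallback](**params)
    raise RuntimeError(f"{name} failed and no fallback is available")


def flaky_pdf_analyzer(**kwargs):
    raise IOError("download failed")          # simulates a persistently failing tool


def web_search(**kwargs):
    return f"search results for {kwargs}"


tools = {"pdf_analyzer": flaky_pdf_analyzer, "web_search": web_search}
print(run_with_recovery(tools, "pdf_analyzer", {"url": "https://example.org/report.pdf"}))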


The system continuously monitors tool performance and optimizes usage patterns through systematic performance optimization processes. Execution times are tracked across different tools and configurations, enabling the agent to choose approaches that balance thoroughness with efficiency. Success rates are monitored for different tool combinations and contexts, allowing the system to identify which approaches are most reliable for specific types of research tasks. Result quality metrics are collected and analyzed to ensure that efficiency optimizations don't compromise the effectiveness of research outcomes, maintaining a balance between speed and quality that serves user objectives.


THE MEMORY AND CONTEXT SYSTEM: COGNITIVE ARCHITECTURE


The memory system provides the agent with sophisticated cognitive capabilities that enable learning, context maintenance, and knowledge accumulation over time. This cognitive architecture represents one of the most advanced aspects of the system, implementing memory patterns inspired by human cognition while leveraging modern computational capabilities to achieve sophisticated information storage and retrieval.


The system implements a dual memory architecture inspired by human cognition that addresses both immediate operational needs and long-term learning requirements. Working Memory, managed by the ContextManager, maintains immediate context and intermediate results during active research sessions, enabling coherent reasoning across multiple tasks within a single research effort and ensuring that discoveries in one task inform decision-making in subsequent tasks. Long-Term Memory, implemented through the MemorySystem, stores semantic knowledge from past research sessions using vector embeddings for semantic retrieval, enabling the agent to build upon previous work and avoid redundant research while accessing relevant insights from historical research activities.


Rather than storing simple key-value pairs, the memory system uses vector embeddings to represent knowledge semantically, enabling sophisticated queries and retrieval operations that go beyond exact text matching. This semantic knowledge representation allows the agent to find conceptually related information even when different terminology is used, creating a more flexible and powerful knowledge management system that can adapt to varied expression of similar concepts.


The memory system implements context-aware retrieval mechanisms that don't just store information but intelligently retrieve relevant context based on current goals and activities. When researching "solar power efficiency," the system automatically surfaces relevant past research on "photovoltaic technology" or "renewable energy systems," demonstrating the semantic understanding that enables effective knowledge reuse and prevents redundant research efforts.
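
A minimal sketch of semantic storage and retrieval using chromadb (one of the dependencies installed in Step 1) is shown below; the stored findings and the query are illustrative, and chromadb's default embedding function is assumed.

# memory_sketch.py - semantic retrieval with chromadb's default embedding function
import chromadb

client = chromadb.Client()                     # in-memory store for this sketch
memory = client.create_collection("research_memory")

memory.add(
    ids=["m1", "m2"],
    documents=[
        "Photovoltaic cell efficiency improved to 24% in recent lab tests.",
        "Offshore wind capacity grew 15% year over year in Europe.",
    ],
)

# Query by meaning rather than exact keywords: "solar power efficiency" should
# surface the photovoltaic finding even though the wording differs.
hits = memory.query(query_texts=["solar power efficiency"], n_results=1)
print(hits["documents"][0])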


Like biological memory systems, the architecture implements selective retention through memory consolidation and forgetting processes where more important and frequently accessed information is preserved while less relevant details are allowed to fade. This approach prevents memory systems from becoming cluttered with irrelevant information while ensuring that valuable insights and frequently needed knowledge remain easily accessible for future research activities.


THE REFLECTION AND IMPROVEMENT SYSTEM: METACOGNITIVE CAPABILITIES


The ReflectionSystem represents perhaps the most sophisticated aspect of the architecture, implementing the agent's ability to analyze its own performance and continuously improve its capabilities. This metacognitive capability distinguishes truly agentic systems from simpler automated tools by enabling self-directed learning and optimization that doesn't require external programming or manual tuning.


The reflection system evaluates performance across multiple dimensions to provide comprehensive assessment of system effectiveness and identify specific areas for improvement. Research quality assessment examines the comprehensiveness, accuracy, and depth of analysis produced by research sessions, evaluating whether the agent is meeting its primary objective of producing valuable and reliable research outputs. Efficiency analysis focuses on resource utilization and time management, ensuring that the agent achieves its objectives within reasonable resource constraints while identifying opportunities for optimization. Tool effectiveness evaluation examines success rates and optimal usage patterns for different tools and configurations, enabling the system to learn which approaches work best in different contexts. Strategic soundness assessment evaluates the quality of planning and adaptation decisions, analyzing whether the agent's high-level strategic choices lead to effective research outcomes.
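
The sketch below shows one possible way to combine such dimensions into an overall score and derive recommendations; the metric names, weights, and thresholds are assumptions, not the system's actual reflection logic.

# reflection_sketch.py - toy multi-dimensional session scoring; weights are illustrative
from typing import Dict, List


def evaluate_session(metrics: Dict[str, float]) -> Dict[str, object]:
    weights = {"quality": 0.4, "efficiency": 0.2, "tool_success": 0.2, "plan_adherence": 0.2}
    overall = sum(weights[k] * metrics[k] for k in weights)
    recommendations: List[str] = []
    if metrics["tool_success"] < 0.7:
        recommendations.append("Prefer tools with higher historical success for this goal type.")
    if metrics["efficiency"] < 0.5:
        recommendations.append("Reduce redundant searches by checking long-term memory first.")
    return {"overall": round(overall, 2), "recommendations": recommendations}


print(evaluate_session({"quality": 0.85, "efficiency": 0.4, "tool_success": 0.9, "plan_adherence": 0.8}))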


The system goes beyond collecting performance metrics to actively learning from experience through sophisticated analysis and pattern recognition. The reflection system identifies patterns in successful research sessions and incorporates those insights into future planning and execution strategies, enabling the agent to replicate successful approaches while avoiding strategies that have proven ineffective. This learning process operates continuously, with each research session contributing to the agent's understanding of effective research methodologies and optimal resource utilization.


Based on comprehensive performance analysis, the system generates specific, actionable improvement recommendations that can be implemented in future research sessions. These automated improvement recommendations might include adjusting tool selection criteria based on observed performance patterns, modifying planning strategies to better sequence research activities, or optimizing resource allocation to improve efficiency while maintaining quality. The recommendations are specific and actionable rather than generic, enabling meaningful system improvements over time.


The reflection system tracks performance trends over time and provides predictive insights about when certain approaches are likely to be effective and when alternative strategies should be considered. This trend analysis enables the agent to adapt its behavior based on longer-term patterns rather than just immediate feedback, creating more stable and reliable improvements in system performance.


LOCAL LLM INTEGRATION: AUTONOMOUS REASONING ENGINE


The MistralProvider component represents the reasoning engine that enables the agent's sophisticated language understanding and generation capabilities while maintaining complete local control over all reasoning processes. This local deployment approach ensures privacy, reduces latency, and eliminates dependencies on external services while providing the sophisticated language capabilities necessary for autonomous research and analysis.


The system implements numerous optimizations for local LLM deployment that enable effective operation on standard hardware configurations. Memory optimization uses 16-bit precision and quantization techniques to reduce memory requirements while maintaining quality, making it possible to run sophisticated language models on systems with limited GPU memory. Device management automatically distributes model components across available hardware resources including both CPU and GPU, optimizing performance based on the specific hardware configuration available. Caching strategies implement intelligent caching to avoid redundant generations for repeated queries, improving both performance and resource efficiency by reusing previous reasoning results when appropriate.


The LLM integration goes beyond simple text generation to implement sophisticated context management that enables complex reasoning and planning tasks. Structured output generation ensures reliable JSON generation for tool parameters and planning decisions, enabling the LLM to interface effectively with other system components through well-defined data structures. Context window management implements intelligent truncation and summarization to work within model limitations while preserving essential information needed for effective reasoning. Dynamic temperature adjustment modifies generation parameters based on the type of reasoning required, using lower temperatures for analytical tasks that require precision and higher temperatures for creative or exploratory reasoning tasks.
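
The sketch below illustrates dynamic temperature selection and defensive JSON extraction around a generic generate function; the temperature table and the stand-in generator are assumptions rather than the MistralProvider's actual interface.

# llm_provider_sketch.py - dynamic temperature plus structured-output handling
import json
import re
from typing import Callable

TEMPERATURES = {"planning": 0.2, "analysis": 0.3, "brainstorming": 0.9}  # lower = more deterministic


def generate_structured(generate_fn: Callable[[str, float], str], prompt: str, task_type: str) -> dict:
    temperature = TEMPERATURES.get(task_type, 0.7)
    raw = generate_fn(prompt, temperature)
    # Models often wrap JSON in extra prose; extract the first {...} block defensively.
    match = re.search(r"\{.*\}", raw, re.DOTALL)
    if match is None:
        raise ValueError("no JSON object found in model output")
    return json.loads(match.group(0))


# Stand-in for a real MistralProvider.generate call:
fake_generate = lambda prompt, temp: 'Here is the plan: {"strategy": "broad search first", "tasks": []}'
print(generate_structured(fake_generate, "Plan research on renewable energy", "planning"))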


By using local LLM deployment, the system ensures that sensitive research data never leaves the user's environment, providing essential privacy and security protections. This approach is crucial for proprietary research, confidential analysis, or work with sensitive data sources where external service dependencies would create unacceptable security risks or privacy concerns.


DATA FLOW AND COMMUNICATION PATTERNS


INFORMATION FLOW ARCHITECTURE


The system implements sophisticated information flow patterns that enable coherent autonomous behavior while maintaining clear separation of concerns between different subsystems. These patterns ensure that information flows efficiently throughout the system while maintaining data integrity and enabling the complex coordination required for autonomous research activities.


When a research request enters the system, it follows a well-defined request processing pipeline that ensures consistent handling and appropriate resource allocation. Input validation and normalization processes research goals, validating them for completeness and clarity while normalizing terminology and extracting key requirements that will guide subsequent processing. Context enrichment incorporates relevant historical information and environmental constraints that influence how the research should be conducted, drawing from the memory system to provide background context that improves research quality. Goal decomposition transforms abstract research objectives into concrete, measurable sub-goals that can be addressed through specific research activities, creating a clear roadmap for achieving the overall objective. Resource allocation determines appropriate computational resources and time constraints for the research session, ensuring that the approach matches available capabilities and user expectations.
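
As one illustration of the validation and normalization stage, the sketch below uses pydantic (installed in Step 1); the field names and limits are assumptions for demonstration purposes.

# request_pipeline_sketch.py - input validation and normalization with pydantic
from pydantic import BaseModel, Field


class ResearchRequest(BaseModel):
    goal: str = Field(..., min_length=10, description="High-level research objective")
    max_tasks: int = Field(10, ge=1, le=50)          # resource allocation hint
    context: str = ""                                # optional background from the caller


def normalize(request: ResearchRequest) -> ResearchRequest:
    # Tidy whitespace and trailing punctuation before the goal reaches the planner.
    request.goal = " ".join(request.goal.split()).strip().rstrip(".")
    return request


req = normalize(ResearchRequest(goal="  Analyze renewable   energy adoption. ", max_tasks=8))
print(req.goal, req.max_tasks)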


INTER-COMPONENT COMMUNICATION


Components throughout the system communicate through standardized interfaces and protocols that enable loose coupling while ensuring reliable information exchange. The Planning System communicates with the Tool Registry to understand available capabilities and their characteristics, enabling informed decisions about which tools to incorporate into research plans. The Execution Engine coordinates with individual tools through the Tool Registry, passing parameters and receiving results in standardized formats that enable consistent processing regardless of the specific tool being utilized.


The Memory System receives information from multiple sources throughout the system, including task results from the Execution Engine, performance metrics from the Monitoring System, and reflection insights from the Reflection System. This comprehensive information gathering enables the memory system to maintain a complete picture of system activity and provide relevant context for future research sessions.


The Reflection System monitors activity across all other components, collecting performance data, analyzing patterns, and generating insights that inform future system behavior. This monitoring occurs continuously and non-intrusively, ensuring that reflection processes don't interfere with primary research activities while gathering the information necessary for meaningful performance analysis.


ERROR HANDLING AND RESILIENCE PATTERNS


The architecture implements comprehensive error handling and resilience patterns that enable continued operation even when individual components experience failures or suboptimal performance. These patterns are essential for autonomous operation, as the system must be able to recover from failures and adapt to unexpected circumstances without human intervention.


Component-level error handling ensures that failures in individual tools or subsystems don't cascade to affect the entire system. When tools fail, the system implements automatic retry mechanisms with adjusted parameters, attempting to resolve transient issues through configuration changes or alternative approaches. If retry mechanisms are unsuccessful, fallback strategies activate alternative tools or methodologies that can accomplish similar objectives, ensuring that research progress continues even when preferred approaches are unavailable.


System-level resilience mechanisms monitor overall system health and implement corrective actions when performance degrades or resource constraints are encountered. Resource monitoring tracks memory usage, CPU utilization, and other system resources, triggering optimization measures or graceful degradation when resources become constrained. Performance monitoring identifies when system components are operating outside normal parameters and initiates diagnostic and corrective processes to restore optimal operation.


The error handling system learns from failures to improve future resilience and performance. Failure patterns are analyzed to identify common causes and potential preventive measures, enabling the system to avoid similar issues in future operations. Recovery strategies are evaluated for effectiveness and optimized based on actual performance during failure scenarios, creating a continuously improving resilience capability.


SCALABILITY AND PERFORMANCE ARCHITECTURE


COMPUTATIONAL RESOURCE MANAGEMENT


The system implements sophisticated computational resource management that enables effective operation across a wide range of hardware configurations while optimizing performance based on available capabilities. This resource management approach ensures that the system can scale appropriately from development environments to production deployments while maintaining consistent behavior and performance characteristics.


Memory management strategies optimize usage of both system RAM and GPU memory to enable effective operation on standard hardware configurations. The system monitors memory usage continuously and implements intelligent caching and garbage collection strategies that minimize memory overhead while maintaining performance. Model loading and optimization techniques ensure that the local LLM operates efficiently within available memory constraints while providing the reasoning capabilities necessary for autonomous operation.


CPU and GPU utilization optimization balances computational workloads across available processing resources to maximize throughput while preventing resource contention that could degrade performance. The system automatically detects available hardware capabilities and configures processing strategies accordingly, ensuring optimal performance regardless of the specific hardware configuration.


Concurrent processing capabilities enable parallel execution of independent tasks while maintaining coordination and resource sharing for dependent activities. The system identifies opportunities for parallel execution within research plans and coordinates multiple concurrent activities without creating resource conflicts or coordination problems.
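
The sketch below shows the basic pattern of running independent tool calls concurrently with asyncio while keeping dependent work sequential; the task names and durations are placeholders.

# concurrency_sketch.py - parallel independent tasks, sequential dependent work
import asyncio


async def run_tool(name: str, seconds: float) -> str:
    await asyncio.sleep(seconds)                 # stands in for real tool I/O
    return f"{name} done"


async def main() -> None:
    # Independent data-gathering tasks run concurrently...
    gathered = await asyncio.gather(
        run_tool("web_search", 1.0),
        run_tool("pdf_analyzer", 1.5),
        run_tool("web_scraper", 0.5),
    )
    # ...while the dependent synthesis step waits for all of them.
    synthesis = await run_tool("synthesis", 0.2)
    print(gathered, synthesis)


asyncio.run(main())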


HORIZONTAL AND VERTICAL SCALING


The architecture supports both horizontal and vertical scaling approaches to accommodate different deployment requirements and growth patterns. Vertical scaling capabilities enable the system to take advantage of more powerful hardware by automatically utilizing additional CPU cores, GPU resources, and memory capacity when available. The system detects hardware capabilities at startup and configures processing strategies to utilize available resources effectively.


Horizontal scaling support enables deployment across multiple systems or containers for high-throughput research applications. The modular architecture facilitates distribution of different components across multiple systems, with clear interfaces that enable network-based communication between distributed components. Load balancing capabilities distribute research requests across multiple agent instances while maintaining session consistency and result quality.


The stateless design of many components facilitates scaling by enabling multiple instances to operate independently while sharing common resources like the memory system and tool registry. This approach enables linear scaling of processing capacity while maintaining the coordination and context sharing necessary for effective autonomous operation.


PERFORMANCE MONITORING AND OPTIMIZATION


Comprehensive performance monitoring capabilities track system behavior across multiple dimensions and provide insights for ongoing optimization efforts. Response time monitoring tracks the duration of different types of research activities and identifies bottlenecks or performance degradation that might require attention. Throughput analysis measures the system's capacity to handle multiple concurrent research requests while maintaining quality and responsiveness.


Resource utilization monitoring provides detailed insights into how computational resources are being used and identifies opportunities for optimization. Memory usage patterns are analyzed to identify potential memory leaks or inefficient allocation strategies that could be improved. CPU and GPU utilization patterns help identify whether processing resources are being used effectively or whether workload distribution could be optimized.


Quality metrics monitoring ensures that performance optimizations don't compromise the effectiveness of research outputs. Research quality scores are tracked over time to ensure that efficiency improvements maintain or improve the value of research results. User satisfaction metrics provide feedback about whether system performance meets user expectations and requirements.


The performance monitoring system generates automated optimization recommendations based on observed patterns and performance characteristics. These recommendations might include configuration adjustments, resource allocation modifications, or architectural changes that could improve system performance while maintaining reliability and quality.


SECURITY AND PRIVACY ARCHITECTURE


DATA PROTECTION AND PRIVACY


The architecture implements comprehensive data protection and privacy measures that ensure sensitive research information remains secure throughout all system operations. Local processing capabilities ensure that research data never leaves the user's environment, eliminating the privacy risks associated with cloud-based processing or external service dependencies.


Data encryption mechanisms protect sensitive information both at rest and during processing, ensuring that research data remains secure even if storage media or memory dumps are compromised. The system implements strong encryption standards for all persistent storage and uses secure memory handling practices to minimize the risk of sensitive information exposure.


Access control mechanisms ensure that research data and system capabilities are only accessible to authorized users and processes. Authentication and authorization systems verify user identity and enforce appropriate access restrictions based on user roles and permissions. Session management ensures that research activities remain isolated between different users and sessions.


Data retention policies automatically manage the lifecycle of research data and system logs, ensuring that sensitive information is retained only as long as necessary for system operation and user requirements. Secure deletion mechanisms ensure that sensitive data is completely removed from system storage when it is no longer needed.


SYSTEM SECURITY AND HARDENING


The system implements multiple layers of security controls that protect against various types of attacks and security threats. Input validation mechanisms ensure that all user inputs and external data sources are properly sanitized and validated before processing, preventing injection attacks and other input-based security vulnerabilities.


Sandbox isolation mechanisms ensure that tool execution and external data processing occur in controlled environments that limit the potential impact of malicious or corrupted data. The system implements appropriate isolation between different research sessions and between research activities and core system functions.


Network security controls limit and monitor external network communications to ensure that only necessary and authorized network activities occur. The system implements appropriate firewall rules and network monitoring to detect and prevent unauthorized network access or data exfiltration attempts.


Regular security updates and vulnerability management processes ensure that the system remains protected against newly discovered security threats. The system implements automated update mechanisms for security-critical components while maintaining stability and compatibility of research capabilities.


AUDIT AND COMPLIANCE CAPABILITIES


Comprehensive audit logging capabilities track all significant system activities and provide detailed records for compliance and security monitoring purposes. Research activity logs record details about research requests, execution steps, and results in a format that enables compliance verification and security analysis. System access logs track user authentication, authorization decisions, and system administration activities to enable security monitoring and incident investigation.


Data handling audit trails record how sensitive information is processed, stored, and transmitted throughout the system, enabling compliance verification for data protection regulations and organizational policies. Tool execution logs provide detailed records of external data access and processing activities to enable security analysis and compliance verification.


Compliance reporting capabilities generate standardized reports that demonstrate adherence to relevant regulations and organizational policies. The system can generate reports for data protection compliance, security compliance, and operational compliance requirements based on collected audit data.


Incident response capabilities enable rapid detection and response to security incidents or compliance violations. The system implements automated monitoring and alerting mechanisms that detect suspicious activities or policy violations and initiate appropriate response procedures to minimize impact and ensure proper incident handling.


DEPLOYMENT AND OPERATIONS ARCHITECTURE


CONTAINERIZATION AND ORCHESTRATION


The system leverages containerization technologies to ensure consistent deployment across different environments while simplifying operations and maintenance activities. Docker containerization encapsulates all system components and dependencies in portable containers that can be deployed consistently across development, testing, and production environments. This approach eliminates environment-specific configuration issues and ensures that the system behaves consistently regardless of the underlying infrastructure.


Kubernetes orchestration capabilities enable sophisticated deployment strategies including rolling updates, automatic scaling, and failure recovery mechanisms. The system implements appropriate health checks and readiness probes that enable Kubernetes to monitor system health and automatically restart or replace failed components. Service discovery mechanisms enable dynamic configuration and communication between system components even as they are moved or scaled across different nodes in a cluster.
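
A minimal liveness/readiness endpoint pair that Kubernetes probes could target might look like the FastAPI sketch below; the endpoint paths and the readiness condition are assumptions, not part of the reference deployment.

# health_endpoints_sketch.py - liveness and readiness probes with FastAPI
from fastapi import FastAPI, Response

app = FastAPI()
model_loaded = False   # would be set True once the local LLM finishes loading


@app.get("/healthz")
def liveness() -> dict:
    return {"status": "alive"}          # liveness probe target


@app.get("/readyz")
def readiness(response: Response) -> dict:
    if not model_loaded:
        response.status_code = 503      # not ready: keep the pod out of the service
        return {"status": "loading model"}
    return {"status": "ready"}

# Run with: uvicorn health_endpoints_sketch:app --port 8000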


Configuration management approaches enable environment-specific customization while maintaining consistent core behavior across different deployments. The system uses environment variables and configuration files to customize behavior for different deployment scenarios while ensuring that core functionality remains consistent and reliable.


MONITORING AND OBSERVABILITY


Comprehensive monitoring and observability capabilities provide detailed insights into system behavior and enable proactive identification and resolution of performance or reliability issues. Application performance monitoring tracks response times, throughput, and error rates across all system components, enabling identification of performance bottlenecks or degradation that might require attention.


Infrastructure monitoring tracks the health and performance of underlying hardware and system resources including CPU utilization, memory usage, disk I/O, and network performance. This monitoring enables identification of resource constraints or hardware issues that might affect system performance or reliability.


Distributed tracing capabilities track research requests as they flow through different system components, enabling detailed analysis of request processing paths and identification of performance bottlenecks or failure points. This capability is essential for debugging complex issues and optimizing system performance in distributed deployment scenarios.


Log aggregation and analysis systems collect and analyze log data from all system components to enable centralized monitoring and troubleshooting. The system implements structured logging practices that facilitate automated analysis and alerting based on log content and patterns.


BACKUP AND DISASTER RECOVERY


Comprehensive backup and disaster recovery capabilities ensure that research data and system configuration can be recovered in the event of hardware failures, software issues, or other catastrophic events. Automated backup mechanisms regularly capture research data, system configuration, and learned knowledge from the memory system to enable complete system recovery if necessary.


Data replication strategies ensure that critical information is stored in multiple locations to prevent data loss in the event of storage failures. The system implements appropriate replication mechanisms for different types of data based on their criticality and recovery requirements.


Disaster recovery procedures provide step-by-step guidance for restoring system operation after various types of failures or disasters. These procedures are tested regularly to ensure that they remain effective and can be executed successfully when needed.


Business continuity planning ensures that research capabilities can be maintained even during system outages or maintenance activities. The system implements appropriate redundancy and failover mechanisms that enable continued operation during planned maintenance or unexpected failures.


CONCLUSION: EMERGENT INTELLIGENCE THROUGH ARCHITECTURAL SOPHISTICATION


The Research Assistant system demonstrates that sophisticated autonomous behavior can emerge from the careful orchestration of well-designed architectural components working in concert. The system achieves genuine agency not through any single "intelligent" component but through the complex interactions between specialized subsystems that each contribute essential capabilities to the overall intelligent behavior.


The layered architecture enables sophisticated autonomous behavior while maintaining clear separation of concerns and manageable complexity. Each layer provides appropriate abstractions that enable higher-level reasoning while hiding implementation details that would otherwise complicate decision-making processes. This approach enables the system to operate at multiple levels of abstraction simultaneously, from high-level strategic planning to low-level resource management, while maintaining coherent overall behavior.


The autonomous control loops create a hierarchy of intelligent behavior that operates at different timescales and addresses different aspects of research execution. The interaction between these loops generates emergent intelligence that adapts to circumstances, learns from experience, and continuously improves performance without external programming or manual optimization.


The sophisticated memory and context management systems enable the agent to maintain coherence across extended research sessions while accumulating knowledge that improves future performance. This cognitive architecture represents one of the most advanced aspects of the system, implementing memory patterns that enable both immediate operational effectiveness and long-term learning and improvement.


The comprehensive error handling and resilience mechanisms ensure that the system can operate autonomously even in the face of failures, unexpected circumstances, or resource constraints. This resilience is essential for practical autonomous operation and distinguishes the system from simpler automated tools that require constant supervision and intervention.


The result is a truly agentic system that can receive high-level objectives and autonomously determine how to achieve them, adapting to circumstances, learning from experience, and continuously improving its capabilities. The architecture demonstrates that sophisticated autonomous behavior can be achieved through careful design and implementation of interconnected components that work together to create intelligence that emerges from their interaction rather than being explicitly programmed into any single component.


HOW TO IMPLEMENT THIS ARCHITECTURE

To implement this architecture, I'll walk through ten steps, starting with setup and ending with testing. After the code for each step you'll find sections that explain the code.


STEP 1: ENVIRONMENT SETUP


PREREQUISITES AND DEPENDENCIES


The following Python packages are required for the complete system:


install_dependencies.sh:

------------------------

pip install transformers torch accelerate

pip install langchain langchain-community  

pip install requests beautifulsoup4 arxiv

pip install matplotlib plotly pandas

pip install sentence-transformers chromadb

pip install pydantic fastapi uvicorn


Code Explanation:

  • transformers: Provides access to Hugging Face transformer models like Mistral
  • torch: PyTorch framework for running neural networks locally
  • langchain: Framework for building LLM applications with tool integration
  • requests/beautifulsoup4: Web scraping and HTTP request handling
  • matplotlib/plotly: Data visualization capabilities
  • sentence-transformers: For creating embeddings for the memory system
  • chromadb: Vector database for storing and retrieving agent memories
  • pydantic/fastapi: For building REST APIs and data validation


LOCAL MISTRAL SETUP


# install_mistral.py

from transformers import AutoTokenizer, AutoModelForCausalLM

import torch


def setup_mistral():

    """

    Initialize and load the Mistral 7B Instruct model locally.

    This function handles model download, tokenizer setup, and memory optimization.

    """

    model_name = "mistralai/Mistral-7B-Instruct-v0.2"

    

    # Load tokenizer - handles text encoding/decoding for the model

    print("Loading Mistral tokenizer...")

    tokenizer = AutoTokenizer.from_pretrained(model_name)

    

    # Load model with optimization settings

    print("Loading Mistral model (this may take several minutes)...")

    model = AutoModelForCausalLM.from_pretrained(

        model_name,

        torch_dtype=torch.float16,  # Use half precision to reduce memory usage

        device_map="auto"           # Automatically distribute across available GPUs/CPU

    )

    

    return tokenizer, model


# Test the setup to ensure everything works

if __name__ == "__main__":

    try:

        tokenizer, model = setup_mistral()

        print("SUCCESS: Mistral model loaded successfully!")

        print(f"Model loaded on: {model.device}")

        print(f"Memory usage: ~{torch.cuda.memory_allocated() / 1e9:.2f} GB")

    except Exception as e:

        print(f"ERROR: Failed to load Mistral model: {e}")


Code Explanation:

  • We use Mistral-7B-Instruct-v0.2, which is optimized for instruction following
  • torch.float16 reduces memory usage from ~14 GB to ~7 GB without major quality loss
  • device_map="auto" automatically handles GPU/CPU placement based on available hardware
  • The tokenizer converts text to tokens the model can understand
  • Error handling ensures we can diagnose loading issues; a short generation test follows below
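
A short generation test, assuming setup_mistral() above loaded successfully, might look like this; the prompt and generation settings are illustrative.

# quick_generation_test.py - smoke test for the loaded model (assumes install_mistral.py is importable)
import torch

from install_mistral import setup_mistral

tokenizer, model = setup_mistral()

messages = [{"role": "user", "content": "List three factors driving renewable energy adoption."}]
inputs = tokenizer.apply_chat_template(messages, return_tensors="pt").to(model.device)

with torch.no_grad():
    output = model.generate(inputs, max_new_tokens=200, do_sample=True, temperature=0.7)

# Decode only the newly generated tokens, skipping the prompt.
print(tokenizer.decode(output[0][inputs.shape[-1]:], skip_special_tokens=True))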


STEP 2: CORE AGENT ARCHITECTURE  


# agent_core.py

from typing import List, Dict, Any, Optional

from dataclasses import dataclass

from enum import Enum

import json

import uuid

from datetime import datetime


class TaskStatus(Enum):

    """

    Enumeration defining the possible states of agent tasks.

    This helps track task progress and enables proper error handling.

    """

    PENDING = "pending"        # Task created but not started

    IN_PROGRESS = "in_progress"  # Task currently being executed

    COMPLETED = "completed"    # Task finished successfully  

    FAILED = "failed"         # Task encountered an error


@dataclass

class Task:

    """

    Represents a single executable task within the agent's plan.

    Each task encapsulates what tool to use, with what parameters,

    and tracks its execution status and results.

    """

    id: str                    # Unique identifier for tracking

    description: str           # Human-readable task description

    tool_name: str            # Which tool should execute this task

    parameters: Dict[str, Any] # Tool-specific parameters

    status: TaskStatus = TaskStatus.PENDING

    result: Optional[Any] = None      # Stores execution results

    created_at: Optional[datetime] = None  # Timestamp for debugging

    

    def __post_init__(self):

        """Automatically set creation timestamp if not provided"""

        if self.created_at is None:

            self.created_at = datetime.now()


@dataclass  

class AgentState:

    """

    Maintains the complete state of the agent during execution.

    This is the agent's "working memory" containing current goals,

    task queues, completed work, and accumulated context.

    """

    current_goal: str               # The high-level objective

    task_queue: List[Task]          # Tasks waiting to be executed

    completed_tasks: List[Task]     # Successfully finished tasks

    memory: Dict[str, Any]          # Persistent knowledge storage

    context: str                    # Current situational context


class ResearchAgent:

    """

    Main agent class that orchestrates the research process.

    Combines planning, execution, and reflection capabilities

    to autonomously achieve research objectives.

    """

    

    def __init__(self, llm_provider, tool_registry):

        """

        Initialize the agent with required components.

        

        Args:

            llm_provider: Interface to the local Mistral model

            tool_registry: Collection of available tools

        """

        self.llm = llm_provider

        self.tools = tool_registry

        self.state = AgentState(

            current_goal="",

            task_queue=[],

            completed_tasks=[],

            memory={},

            context=""

        )

    

    async def execute_goal(self, goal: str) -> str:

        """

        Main execution loop for achieving a research goal.

        This method orchestrates the entire research process from

        planning through execution to final synthesis.

        

        Args:

            goal: High-level research objective (e.g., "Analyze renewable energy trends")

            

        Returns:

            Comprehensive research report as a string

        """

        print(f"Agent received goal: {goal}")

        self.state.current_goal = goal

        

        # PHASE 1: Strategic Planning

        print("Phase 1: Creating research plan...")

        self.state.task_queue = await self.create_plan(goal)  # create_plan returns the list of planned tasks

        print(f"Generated {len(self.state.task_queue)} tasks")

        

        # PHASE 2: Autonomous Execution  

        print("Phase 2: Executing research tasks...")

        task_count = 0

        while self.state.task_queue:

            task = self.state.task_queue.pop(0)

            task_count += 1

            

            print(f"  Executing task {task_count}: {task.description}")

            await self.execute_task(task)

            

            # PHASE 3: Adaptive Replanning

            # After every few tasks, evaluate if we need to adjust our approach

            if task_count % 3 == 0 and await self.should_replan():

                print("  Adaptive replanning triggered...")

                new_tasks = await self.replan()

                self.state.task_queue.extend(new_tasks)

        

        # PHASE 4: Synthesis and Reporting

        print("Phase 4: Synthesizing final results...")

        return await self.synthesize_results()


Code Explanation:

  • The Task dataclass encapsulates all information needed to execute a single action
  • TaskStatus enum provides clear state tracking and enables robust error handling
  • AgentState maintains all context needed for autonomous operation
  • The execute_goal method implements a four-phase research process:

  1. Strategic planning using the LLM

  2. Autonomous task execution using tools

  3. Adaptive replanning based on intermediate results  

  4. Final synthesis into a comprehensive report

  • The architecture is designed to be extensible: new tools and capabilities can be added without modifying the core agent logic


STEP 3: PLANNING SYSTEM


STRATEGIC PLANNING MODULE


# planning.py

class PlanningSystem:

    """

    Handles strategic planning for research objectives.

    Uses the local LLM to break down high-level goals into

    concrete, executable tasks with proper sequencing.

    """

    

    def __init__(self, llm_provider):

        self.llm = llm_provider

    

    async def create_plan(self, goal: str, context: str = "") -> List[Task]:

        """

        Generate a strategic research plan for achieving the given goal.

        This method prompts the LLM to think strategically about the research

        process and create a well-sequenced set of tasks.

        

        Args:

            goal: The research objective to plan for

            context: Additional context from previous research or user input

            

        Returns:

            List of Task objects representing the complete research plan

        """

        

        # Construct a detailed planning prompt that guides the LLM

        # to think systematically about the research process

        planning_prompt = f"""

You are an expert research planning AI. Your job is to create a comprehensive, well-sequenced plan for achieving a research objective.


RESEARCH GOAL: {goal}

ADDITIONAL CONTEXT: {context}


Available tools and their capabilities:

- web_search: Search the internet for current information and sources

  Parameters: query (string), num_results (int)

  

- pdf_analyzer: Download and analyze research papers, reports, and documents

  Parameters: url (string), focus_area (string)

  

- data_visualizer: Create charts, graphs, and visual representations  

  Parameters: data (list), chart_type (string), title (string)

  

- web_scraper: Extract structured data from specific websites

  Parameters: url (string), data_fields (list)

  

- synthesis: Combine and analyze information from multiple sources

  Parameters: sources (list), analysis_type (string)


PLANNING GUIDELINES:

1. Start with broad information gathering before diving into specifics

2. Sequence tasks logically - gather data before analyzing it

3. Include validation steps to verify important findings

4. Plan for synthesis and visualization of key insights

5. Consider multiple perspectives and sources


OUTPUT FORMAT (JSON):

{{

  "strategy": "Brief description of the overall research approach",

  "tasks": [

    {{

      "id": "task_001", 

      "description": "Clear description of what this task accomplishes",

      "tool_name": "name_of_tool_to_use",

      "parameters": {{"param1": "value1", "param2": "value2"}},

      "priority": 1,

      "dependencies": ["list_of_task_ids_that_must_complete_first"]

    }}

  ],

  "success_criteria": [

    "Specific measurable criteria for successful completion",

    "Additional criteria that define research quality"

  ]

}}


Create a detailed research plan:

"""

        

        try:

            # Generate the plan using the local LLM

            print("Generating research plan with Mistral...")

            response = await self.llm.generate(planning_prompt)

            

            # Parse the JSON response

            plan_data = json.loads(response)

            

            print(f"Strategy: {plan_data['strategy']}")

            print(f"Success criteria: {plan_data['success_criteria']}")

            

            # Convert the plan data into Task objects

            tasks = []

            for task_data in plan_data["tasks"]:

                task = Task(

                    id=task_data["id"],

                    description=task_data["description"], 

                    tool_name=task_data["tool_name"],

                    parameters=task_data["parameters"]

                )

                tasks.append(task)

                print(f"  Planned task: {task.description}")

            

            return tasks

            

        except json.JSONDecodeError as e:

            print(f"Error parsing LLM response as JSON: {e}")

            # Fallback to a simple default plan

            return self._create_fallback_plan(goal)

        except Exception as e:

            print(f"Error in planning: {e}")

            return self._create_fallback_plan(goal)

    

    def _create_fallback_plan(self, goal: str) -> List[Task]:

        """

        Create a basic research plan when LLM planning fails.

        This ensures the agent can still operate even if planning encounters issues.

        """

        print("Using fallback planning strategy...")

        

        return [

            Task(

                id="fallback_001",

                description=f"Search for general information about: {goal}",

                tool_name="web_search", 

                parameters={"query": goal, "num_results": 5}

            ),

            Task(

                id="fallback_002",

                description="Synthesize findings from search results",

                tool_name="synthesis",

                parameters={"sources": ["search_results"], "analysis_type": "summary"}

            )

        ]

    

    async def should_replan(self, agent_state: AgentState) -> bool:

        """

        Determine if replanning is needed based on current research progress.

        This enables adaptive behavior - the agent can change course if

        initial findings suggest better approaches.

        

        Args:

            agent_state: Current state of the agent including completed tasks

            

        Returns:

            Boolean indicating whether replanning should occur

        """

        

        # Don't replan if we haven't completed enough tasks to assess progress

        if len(agent_state.completed_tasks) < 2:

            return False

        

        # Construct a prompt to evaluate research progress    

        replan_prompt = f"""

Analyze the current research progress and determine if we should adjust our approach.


ORIGINAL RESEARCH GOAL: {agent_state.current_goal}


PROGRESS SUMMARY:

- Tasks completed: {len(agent_state.completed_tasks)}

- Tasks remaining: {len(agent_state.task_queue)}


RECENT TASK RESULTS:

{self._format_recent_results(agent_state.completed_tasks[-3:])}


REMAINING PLANNED TASKS:

{self._format_remaining_tasks(agent_state.task_queue[:3])}


EVALUATION CRITERIA:

1. Are we making meaningful progress toward the research goal?

2. Do recent results suggest more promising research directions?

3. Are there significant gaps in our current approach?

4. Would additional or different tasks improve the final output?


Based on this analysis, should we replan our approach?


Answer with: YES or NO

If YES, provide a brief reason: [explanation]

If NO, provide a brief reason: [explanation]


Decision:

"""

        

        try:

            response = await self.llm.generate(replan_prompt)

            decision = "YES" in response.upper()

            

            if decision:

                print(f"Replanning triggered. Reason: {response}")

            else:

                print("Continuing with current plan")

                

            return decision

            

        except Exception as e:

            print(f"Error in replan evaluation: {e}")

            return False  # Default to not replanning on errors

    

    def _format_recent_results(self, recent_tasks: List[Task]) -> str:

        """Format recent task results for LLM analysis"""

        if not recent_tasks:

            return "No completed tasks yet"

        

        formatted = []

        for task in recent_tasks:

            result_summary = str(task.result)[:200] + "..." if len(str(task.result)) > 200 else str(task.result)

            formatted.append(f"- {task.description}: {result_summary}")

        

        return "\n".join(formatted)

    

    def _format_remaining_tasks(self, remaining_tasks: List[Task]) -> str:

        """Format remaining tasks for LLM analysis"""

        if not remaining_tasks:

            return "No remaining tasks"

        

        formatted = []

        for task in remaining_tasks:

            formatted.append(f"- {task.description} (using {task.tool_name})")

        

        return "\n".join(formatted)


Code Explanation:

  • The PlanningSystem uses structured prompting to guide the LLM through strategic thinking
  • JSON output format ensures machine-readable plans that can be parsed reliably (a hedged parsing helper is sketched after this list)
  • The planning prompt includes detailed tool descriptions so the LLM understands capabilities
  • Error handling with fallback planning ensures the agent continues operating even if the LLM produces invalid JSON or encounters other issues
  • The should_replan method enables adaptive behavior by evaluating progress and adjusting the research approach based on intermediate findings
  • Task formatting methods prepare context for LLM evaluation in a readable format
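
Because local models sometimes wrap the plan in markdown fences or add commentary around the JSON, a small pre-processing helper can make the json.loads call above more forgiving. The sketch below is an optional addition, not part of the PlanningSystem API, and the name extract_json_block is ours:

# plan_parsing_sketch.py - optional helper to run before json.loads()
import json
from typing import Any, Dict


def extract_json_block(raw: str) -> Dict[str, Any]:
    """Strip markdown fences and surrounding prose, then parse the first JSON object."""
    text = raw.strip()

    # Remove ```json ... ``` fences if the model added them
    if text.startswith("```"):
        text = text.strip("`")
        if text.lower().startswith("json"):
            text = text[4:]

    # Slice from the first '{' to the last '}' to drop leading/trailing prose
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end == -1:
        raise json.JSONDecodeError("No JSON object found", text, 0)

    return json.loads(text[start:end + 1])


if __name__ == "__main__":
    noisy = 'Here is the plan:\n```json\n{"strategy": "broad search first", "tasks": []}\n```'
    print(extract_json_block(noisy)["strategy"])   # -> broad search first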


STEP 4: TOOL SYSTEM


TOOL REGISTRY AND IMPLEMENTATION


# tools.py

from abc import ABC, abstractmethod

import requests

from bs4 import BeautifulSoup

import matplotlib.pyplot as plt

import pandas as pd

import uuid

import os

from datetime import datetime

from typing import Dict, List, Any


class Tool(ABC):

    """

    Abstract base class for all agent tools.

    Defines the interface that all tools must implement,

    ensuring consistent integration with the agent system.

    """

    

    @abstractmethod

    async def execute(self, **kwargs) -> Dict[str, Any]:

        """

        Execute the tool with given parameters.

        All tools must implement this method and return structured results.

        

        Args:

            **kwargs: Tool-specific parameters

            

        Returns:

            Dictionary containing execution results and metadata

        """

        pass


class WebSearchTool(Tool):

    """

    Tool for searching the web and retrieving structured information.

    Integrates with search APIs to gather current information for research.

    """

    

    def __init__(self, api_key: str):

        """

        Initialize the web search tool with API credentials.

        

        Args:

            api_key: API key for the search service

        """

        self.api_key = api_key

        self.base_url = "https://api.searchengine.com/search"  # Example URL

    

    async def execute(self, query: str, num_results: int = 5) -> Dict[str, Any]:

        """

        Search the web and return structured, relevant results.

        

        Args:

            query: Search query string

            num_results: Maximum number of results to return

            

        Returns:

            Dictionary containing search results and metadata

        """

        print(f"Searching web for: '{query}'")

        

        try:

            # Prepare search parameters

            params = {

                "q": query,

                "key": self.api_key,

                "num": num_results,

                "safe": "active"  # Enable safe search

            }

            

            # Make the search request

            response = requests.get(self.base_url, params=params, timeout=10)

            response.raise_for_status()

            

            search_data = response.json()

            

            # Process and structure the results

            structured_results = []

            for item in search_data.get("items", []):

                structured_results.append({

                    "title": item.get("title", "No title"),

                    "snippet": item.get("snippet", "No description available"),

                    "url": item.get("link", ""),

                    "source": item.get("displayLink", "Unknown source"),

                    "relevance_score": self._calculate_relevance(query, item)

                })

            

            # Sort by relevance score

            structured_results.sort(key=lambda x: x["relevance_score"], reverse=True)

            

            result = {

                "query": query,

                "results": structured_results,

                "total_found": len(structured_results),

                "search_time": response.elapsed.total_seconds(),

                "status": "success"

            }

            

            print(f"Found {len(structured_results)} relevant results")

            return result

            

        except requests.RequestException as e:

            print(f"Search request failed: {e}")

            return {

                "query": query,

                "results": [],

                "total_found": 0,

                "error": str(e),

                "status": "failed"

            }

    

    def _calculate_relevance(self, query: str, item: Dict) -> float:

        """

        Calculate a relevance score for search results.

        This helps prioritize the most useful results for the research goal.

        """

        query_terms = query.lower().split()

        title = item.get("title", "").lower()

        snippet = item.get("snippet", "").lower()

        

        # Count term matches in title (weighted higher) and snippet

        title_matches = sum(1 for term in query_terms if term in title)

        snippet_matches = sum(1 for term in query_terms if term in snippet)

        

        # Calculate relevance score (0-1 scale)

        relevance = (title_matches * 0.7 + snippet_matches * 0.3) / len(query_terms)

        return min(relevance, 1.0)


class PDFAnalyzerTool(Tool):

    """

    Tool for downloading and analyzing PDF documents.

    Extracts text content and provides focused analysis

    based on research requirements.

    """

    

    async def execute(self, url: str, focus_area: str = "") -> Dict[str, Any]:

        """

        Download, process, and analyze a PDF document.

        

        Args:

            url: URL of the PDF document to analyze

            focus_area: Specific aspect to focus analysis on

            

        Returns:

            Dictionary containing analysis results and metadata

        """

        print(f"Analyzing PDF from: {url}")

        print(f"Focus area: {focus_area}")

        

        try:

            # Download the PDF with proper headers

            headers = {

                'User-Agent': 'Mozilla/5.0 (Research Agent/1.0)',

                'Accept': 'application/pdf'

            }

            

            response = requests.get(url, headers=headers, timeout=30)

            response.raise_for_status()

            

            if 'application/pdf' not in response.headers.get('content-type', ''):

                raise ValueError("URL does not point to a PDF document")

            

            # Extract text content from PDF

            # Note: In production, use PyPDF2, pdfplumber, or similar libraries

            text_content = self._extract_pdf_text(response.content)

            

            if not text_content.strip():

                raise ValueError("Could not extract text from PDF")

            

            # Perform focused analysis using LLM

            analysis = await self._analyze_document(text_content, focus_area, url)

            

            result = {

                "source_url": url,

                "focus_area": focus_area,

                "analysis": analysis,

                "word_count": len(text_content.split()),

                "char_count": len(text_content),

                "status": "success"

            }

            

            print(f"Successfully analyzed {result['word_count']} words")

            return result

            

        except Exception as e:

            print(f"PDF analysis failed: {e}")

            return {

                "source_url": url,

                "focus_area": focus_area,

                "error": str(e),

                "status": "failed"

            }

    

    def _extract_pdf_text(self, pdf_content: bytes) -> str:

        """

        Extract text from PDF content.

        This is a simplified implementation - in production,

        use specialized PDF processing libraries.

        """

        # Placeholder implementation

        # In reality, you would use:

        # import PyPDF2 or import pdfplumber

        # and implement proper PDF text extraction

        

        # For demonstration, return a simulated extraction
        # (a hedged pdfplumber sketch follows this method)

        return "Extracted PDF content would appear here..."

    

    async def _analyze_document(self, text_content: str, focus_area: str, source_url: str) -> Dict[str, Any]:

        """

        Analyze document content with focus on specific research areas.

        Uses the LLM to extract relevant insights and information.

        """

        

        # Truncate content to fit in LLM context window

        max_content_length = 2000

        content_preview = text_content[:max_content_length]

        if len(text_content) > max_content_length:

            content_preview += "\n[Content truncated for analysis...]"

        

        analysis_prompt = f"""

Analyze this research document with specific focus on: {focus_area}


DOCUMENT SOURCE: {source_url}

DOCUMENT CONTENT:

{content_preview}


Please extract and analyze:


1. KEY FINDINGS: What are the main findings relevant to "{focus_area}"?

2. METHODOLOGY: What research methods or data sources were used?

3. DATA SOURCES: What data sources or evidence are mentioned?

4. CONCLUSIONS: What conclusions are drawn about "{focus_area}"?

5. LIMITATIONS: What limitations or caveats are noted?

6. RELEVANCE: How relevant is this document to "{focus_area}" (1-10 scale)?


Format your response as structured JSON:

{{

  "key_findings": ["finding1", "finding2", ...],

  "methodology": "description of methods used",

  "data_sources": ["source1", "source2", ...],

  "conclusions": ["conclusion1", "conclusion2", ...],

  "limitations": ["limitation1", "limitation2", ...],

  "relevance_score": 8,

  "summary": "Brief summary of the document's contribution to the research topic"

}}


Analysis:

"""

        

        # This would use your LLM provider to analyze the content

        # For this example, we'll simulate the analysis

        simulated_analysis = {

            "key_findings": [

                f"Document provides insights into {focus_area}",

                "Research methodology appears robust",

                "Multiple data sources referenced"

            ],

            "methodology": "Mixed methods research approach with quantitative and qualitative components",

            "data_sources": ["Government statistics", "Survey data", "Expert interviews"],

            "conclusions": [

                f"Significant trends identified in {focus_area}",

                "Further research recommended"

            ],

            "limitations": ["Sample size constraints", "Temporal limitations"],

            "relevance_score": 7,

            "summary": f"Relevant research document providing valuable insights into {focus_area}"

        }

        

        return simulated_analysis


class DataVisualizerTool(Tool):

    """

    Tool for creating charts, graphs, and visual representations of data.

    Helps transform research data into clear, communicative visualizations.

    """

    

    def __init__(self):

        """Initialize the data visualizer with necessary directories."""

        self.output_dir = "charts"

        os.makedirs(self.output_dir, exist_ok=True)

    

    async def execute(self, data: List[Dict], chart_type: str, title: str, **kwargs) -> Dict[str, Any]:

        """

        Create a visualization from structured data.

        

        Args:

            data: List of dictionaries containing the data to visualize

            chart_type: Type of chart to create ('bar', 'line', 'pie', 'scatter')

            title: Title for the chart

            **kwargs: Additional chart formatting options

            

        Returns:

            Dictionary containing chart information and file path

        """

        print(f"Creating {chart_type} chart: '{title}'")

        

        try:

            # Convert data to pandas DataFrame for easier manipulation

            df = pd.DataFrame(data)

            

            if df.empty:

                raise ValueError("No data provided for visualization")

            

            # Create the chart based on type

            chart_path = await self._create_chart(df, chart_type, title, **kwargs)

            

            result = {

                "chart_path": chart_path,

                "chart_type": chart_type,

                "title": title,

                "data_points": len(data),

                "columns": list(df.columns),

                "status": "success"

            }

            

            print(f"Chart saved to: {chart_path}")

            return result

            

        except Exception as e:

            print(f"Visualization failed: {e}")

            return {

                "chart_type": chart_type,

                "title": title,

                "error": str(e),

                "status": "failed"

            }

    

    async def _create_chart(self, df: pd.DataFrame, chart_type: str, title: str, **kwargs) -> str:

        """

        Create the actual chart using matplotlib.

        Handles different chart types and formatting options.

        """

        # Set up the plot

        plt.figure(figsize=(12, 8))

        plt.style.use('seaborn-v0_8')  # Use a clean, professional style

        

        # Create chart based on type

        if chart_type == "bar":

            plt.bar(df.iloc[:, 0], df.iloc[:, 1], color='skyblue', alpha=0.8)

            plt.xlabel(df.columns[0])

            plt.ylabel(df.columns[1])

            plt.xticks(rotation=45, ha='right')

            

        elif chart_type == "line":

            plt.plot(df.iloc[:, 0], df.iloc[:, 1], marker='o', linewidth=2, markersize=6)

            plt.xlabel(df.columns[0])

            plt.ylabel(df.columns[1])

            plt.grid(True, alpha=0.3)

            

        elif chart_type == "pie":

            plt.pie(df.iloc[:, 1], labels=df.iloc[:, 0], autopct='%1.1f%%', startangle=90)

            plt.axis('equal')

            

        elif chart_type == "scatter":

            plt.scatter(df.iloc[:, 0], df.iloc[:, 1], alpha=0.7, s=60)

            plt.xlabel(df.columns[0])

            plt.ylabel(df.columns[1])

            plt.grid(True, alpha=0.3)

            

        else:

            raise ValueError(f"Unsupported chart type: {chart_type}")

        

        # Add title and formatting

        plt.title(title, fontsize=16, fontweight='bold', pad=20)

        plt.tight_layout()

        

        # Save the chart

        chart_filename = f"{chart_type}_{uuid.uuid4().hex[:8]}.png"

        chart_path = os.path.join(self.output_dir, chart_filename)

        plt.savefig(chart_path, dpi=300, bbox_inches='tight', facecolor='white')

        plt.close()  # Close the figure to free memory

        

        return chart_path


class ToolRegistry:

    """

    Central registry for managing and executing agent tools.

    Provides a unified interface for tool discovery and execution.

    """

    

    def __init__(self):

        """Initialize an empty tool registry."""

        self.tools = {}

        self.execution_history = []

    

    def register(self, name: str, tool: Tool):

        """

        Register a tool with the given name.

        

        Args:

            name: Unique identifier for the tool

            tool: Tool instance implementing the Tool interface

        """

        if not isinstance(tool, Tool):

            raise ValueError(f"Tool must implement the Tool interface")

        

        self.tools[name] = tool

        print(f"Registered tool: {name}")

    

    async def execute_tool(self, name: str, **kwargs) -> Dict[str, Any]:

        """

        Execute a registered tool with the given parameters.

        

        Args:

            name: Name of the tool to execute

            **kwargs: Parameters to pass to the tool

            

        Returns:

            Dictionary containing tool execution results

        """

        if name not in self.tools:

            available_tools = ", ".join(self.tools.keys())

            raise ValueError(f"Tool '{name}' not found. Available tools: {available_tools}")

        

        # Record execution start

        execution_start = datetime.now()

        

        try:

            # Execute the tool

            result = await self.tools[name].execute(**kwargs)

            

            # Record successful execution

            execution_record = {

                "tool_name": name,

                "parameters": kwargs,

                "start_time": execution_start,

                "end_time": datetime.now(),

                "status": "success",

                "result_summary": str(result)[:100] + "..." if len(str(result)) > 100 else str(result)

            }

            

            self.execution_history.append(execution_record)

            return result

            

        except Exception as e:

            # Record failed execution

            execution_record = {

                "tool_name": name,

                "parameters": kwargs,

                "start_time": execution_start,

                "end_time": datetime.now(),

                "status": "failed",

                "error": str(e)

            }

            

            self.execution_history.append(execution_record)

            raise e

    

    def get_available_tools(self) -> List[str]:

        """Return a list of available tool names."""

        return list(self.tools.keys())

    

    def get_execution_history(self) -> List[Dict[str, Any]]:

        """Return the complete tool execution history."""

        return self.execution_history.copy()


Code Explanation:

  • The Tool abstract base class ensures all tools implement a consistent interface
  • WebSearchTool integrates with search APIs and includes relevance scoring to prioritize results
  • PDFAnalyzerTool handles document download, text extraction, and focused analysis
  • DataVisualizerTool creates professional-quality charts using matplotlib with proper styling
  • ToolRegistry provides centralized tool management with execution tracking and error handling (see the usage sketch after this list)
  • All tools include comprehensive error handling and return structured results
  • Execution history tracking enables performance analysis and debugging
  • The modular design makes it easy to add new tools without modifying existing code
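
The following short sketch shows one way to exercise the registry directly, using the DataVisualizerTool with a couple of made-up data points. It assumes the classes above live in tools.py; the sample data is purely illustrative:

# tools_usage_sketch.py - illustrative registry usage, sample data only
import asyncio

from tools import ToolRegistry, DataVisualizerTool


async def main() -> None:
    registry = ToolRegistry()
    registry.register("data_visualizer", DataVisualizerTool())

    # Execute a registered tool through the unified interface
    result = await registry.execute_tool(
        "data_visualizer",
        data=[{"year": 2022, "capacity_gw": 250}, {"year": 2023, "capacity_gw": 310}],
        chart_type="bar",
        title="Installed capacity by year (sample data)",
    )
    print(result["status"], result.get("chart_path"))

    # The registry records every call, which helps with debugging
    for record in registry.get_execution_history():
        print(record["tool_name"], record["status"])


if __name__ == "__main__":
    asyncio.run(main())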


STEP 5: MEMORY AND CONTEXT MANAGEMENT


MEMORY SYSTEM IMPLEMENTATION


# memory.py

import chromadb

from sentence_transformers import SentenceTransformer

from typing import Dict, List, Any, Optional

from datetime import datetime

import json

import numpy as np


class MemorySystem:

    """

    Advanced memory system for storing and retrieving research context.

    Uses vector embeddings to enable semantic search across past research,

    allowing the agent to build on previous work and avoid repetition.

    """

    

    def __init__(self, collection_name: str = "agent_memory"):

        """

        Initialize the memory system with vector database and embedding model.

        

        Args:

            collection_name: Name of the ChromaDB collection for this agent

        """

        print("Initializing memory system...")

        

        # Initialize ChromaDB for vector storage

        self.client = chromadb.Client()

        

        try:

            # Try to get existing collection

            self.collection = self.client.get_collection(collection_name)

            print(f"Loaded existing memory collection: {collection_name}")

        except Exception:

            # Create new collection if it doesn't exist

            self.collection = self.client.create_collection(collection_name)

            print(f"Created new memory collection: {collection_name}")

        

        # Initialize sentence transformer for creating embeddings

        print("Loading sentence transformer model...")

        self.encoder = SentenceTransformer('all-MiniLM-L6-v2')

        print("Memory system ready")

    

    def store_result(self, task_id: str, content: str, metadata: Dict[str, Any]):

        """

        Store a task result in vector memory for future retrieval.

        This allows the agent to learn from past research and build context.

        

        Args:

            task_id: Unique identifier for the task

            content: Text content to store (findings, analysis, etc.)

            metadata: Additional structured information about the result

        """

        print(f"Storing memory for task: {task_id}")

        

        try:

            # Create vector embedding for semantic search

            embedding = self.encoder.encode([content])[0]

            

            # Prepare metadata with timestamp and task info

            enriched_metadata = {

                "task_id": task_id,

                "timestamp": datetime.now().isoformat(),

                "content_length": len(content),

                "content_type": metadata.get("type", "research_result"),

                **metadata

            }

            

            # Store in ChromaDB

            self.collection.add(

                embeddings=[embedding.tolist()],

                documents=[content],

                metadatas=[enriched_metadata],

                ids=[task_id]

            )

            

            print(f"Memory stored successfully for task {task_id}")

            

        except Exception as e:

            print(f"Error storing memory: {e}")

    

    def retrieve_relevant(self, query: str, k: int = 5, 

                         time_filter: Optional[str] = None) -> List[Dict[str, Any]]:

        """

        Retrieve relevant memories based on semantic similarity to a query.

        This enables the agent to access relevant past research when working

        on related topics.

        

        Args:

            query: Text query to search for relevant memories

            k: Number of results to return

            time_filter: Optional time filter ("recent", "week", "month")

            

        Returns:

            List of relevant memory entries with similarity scores

        """

        print(f"Retrieving relevant memories for: '{query}'")

        

        try:

            # Create query embedding

            query_embedding = self.encoder.encode([query])[0]

            

            # Prepare filter conditions

            where_conditions = {}

            if time_filter:

                cutoff_time = self._get_time_cutoff(time_filter)

                where_conditions["timestamp"] = {"$gte": cutoff_time}

            

            # Query the vector database

            query_params = {

                "query_embeddings": [query_embedding.tolist()],

                "n_results": k

            }

            

            if where_conditions:

                query_params["where"] = where_conditions

            

            results = self.collection.query(**query_params)

            

            # Format results with similarity scores

            relevant_memories = []

            for i, (doc, meta, distance) in enumerate(zip(

                results["documents"][0],

                results["metadatas"][0], 

                results["distances"][0]

            )):

                similarity_score = 1 - distance  # Convert distance to similarity

                

                memory_entry = {

                    "content": doc,

                    "metadata": meta,

                    "similarity_score": similarity_score,

                    "rank": i + 1

                }

                

                relevant_memories.append(memory_entry)

            

            print(f"Retrieved {len(relevant_memories)} relevant memories")

            return relevant_memories

            

        except Exception as e:

            print(f"Error retrieving memories: {e}")

            return []

    

    def get_research_context(self, current_goal: str, max_context_length: int = 1000) -> str:

        """

        Generate a research context summary based on relevant past work.

        This provides the agent with background information for better decision-making.

        

        Args:

            current_goal: The current research objective

            max_context_length: Maximum length of context to return

            

        Returns:

            Formatted context string summarizing relevant past research

        """

        print("Generating research context...")

        

        # Retrieve relevant memories

        relevant_memories = self.retrieve_relevant(current_goal, k=10)

        

        if not relevant_memories:

            return "No relevant past research found."

        

        # Format context from most relevant memories

        context_parts = []

        current_length = 0

        

        for memory in relevant_memories:

            # Skip if similarity is too low

            if memory["similarity_score"] < 0.3:

                continue

            

            # Format memory entry

            timestamp = memory["metadata"].get("timestamp", "Unknown time")

            content_preview = memory["content"][:200] + "..." if len(memory["content"]) > 200 else memory["content"]

            

            memory_summary = f"[{timestamp[:10]}] {content_preview}"

            

            # Check if adding this memory would exceed length limit

            if current_length + len(memory_summary) > max_context_length:

                break

            

            context_parts.append(memory_summary)

            current_length += len(memory_summary)

        

        if not context_parts:

            return "No sufficiently relevant past research found."

        

        context = "RELEVANT PAST RESEARCH:\n" + "\n\n".join(context_parts)

        print(f"Generated context with {len(context_parts)} relevant memories")

        

        return context

    

    def _get_time_cutoff(self, time_filter: str) -> str:

        """

        Calculate time cutoff for filtering memories.

        

        Args:

            time_filter: Time filter type ("recent", "week", "month")

            

        Returns:

            ISO format timestamp for filtering

        """

        from datetime import timedelta

        

        now = datetime.now()

        

        if time_filter == "recent":

            cutoff = now - timedelta(days=1)

        elif time_filter == "week":

            cutoff = now - timedelta(weeks=1)

        elif time_filter == "month":

            cutoff = now - timedelta(days=30)

        else:

            cutoff = now - timedelta(days=365)  # Default to one year

        

        return cutoff.isoformat()

    

    def get_memory_stats(self) -> Dict[str, Any]:

        """

        Get statistics about the memory system.

        Useful for monitoring and debugging memory usage.

        

        Returns:

            Dictionary containing memory system statistics

        """

        try:

            collection_info = self.collection.get()

            

            total_memories = len(collection_info["ids"])

            

            # Calculate memory types distribution

            memory_types = {}

            for metadata in collection_info["metadatas"]:

                content_type = metadata.get("content_type", "unknown")

                memory_types[content_type] = memory_types.get(content_type, 0) + 1

            

            # Calculate time distribution

            timestamps = [meta.get("timestamp") for meta in collection_info["metadatas"] if meta.get("timestamp")]

            

            stats = {

                "total_memories": total_memories,

                "memory_types": memory_types,

                "oldest_memory": min(timestamps) if timestamps else None,

                "newest_memory": max(timestamps) if timestamps else None,

                "collection_name": self.collection.name

            }

            

            return stats

            

        except Exception as e:

            return {"error": str(e)}


class ContextManager:

    """

    Manages the current context and working memory for agent operations.

    Maintains the immediate context needed for coherent task execution.

    """

    

    def __init__(self, memory_system: MemorySystem):

        """

        Initialize context manager with reference to long-term memory.

        

        Args:

            memory_system: The long-term memory system for context retrieval

        """

        self.memory_system = memory_system

        self.current_context = {

            "goal": "",

            "progress_summary": "",

            "key_findings": [],

            "active_hypotheses": [],

            "next_priorities": []

        }

        self.context_history = []

    

    def update_context(self, new_information: Dict[str, Any]):

        """

        Update the current working context with new information.

        This maintains coherent context throughout the research process.

        

        Args:

            new_information: Dictionary containing new context information

        """

        print("Updating research context...")

        

        # Store previous context in history

        self.context_history.append(self.current_context.copy())

        

        # Update current context

        for key, value in new_information.items():

            if key in self.current_context:

                if isinstance(self.current_context[key], list):

                    if isinstance(value, list):

                        self.current_context[key].extend(value)

                    else:

                        self.current_context[key].append(value)

                else:

                    self.current_context[key] = value

        

        # Trim context to prevent excessive growth

        self._trim_context()

        

        print("Context updated successfully")

    

    def get_current_context(self) -> str:

        """

        Get formatted current context for LLM consumption.

        

        Returns:

            Formatted context string

        """

        context_parts = []

        

        if self.current_context["goal"]:

            context_parts.append(f"CURRENT GOAL: {self.current_context['goal']}")

        

        if self.current_context["progress_summary"]:

            context_parts.append(f"PROGRESS: {self.current_context['progress_summary']}")

        

        if self.current_context["key_findings"]:

            findings = "\n  - ".join(self.current_context["key_findings"][-5:])  # Last 5 findings

            context_parts.append(f"KEY FINDINGS:\n  - {findings}")

        

        if self.current_context["active_hypotheses"]:

            hypotheses = "\n  - ".join(self.current_context["active_hypotheses"])

            context_parts.append(f"ACTIVE HYPOTHESES:\n  - {hypotheses}")

        

        return "\n\n".join(context_parts) if context_parts else "No active context"

    

    def _trim_context(self):

        """Trim context lists to prevent excessive memory usage."""

        max_items = 10

        

        for key in ["key_findings", "active_hypotheses", "next_priorities"]:

            if isinstance(self.current_context[key], list):

                self.current_context[key] = self.current_context[key][-max_items:]


Code Explanation:

  • The MemorySystem uses ChromaDB for vector storage and SentenceTransformers for embeddings
  • Vector embeddings enable semantic search - the agent can find relevant past research even when different terminology is used
  • store_result saves task outcomes with rich metadata for future retrieval
  • retrieve_relevant finds semantically similar past research with configurable filtering (a short usage sketch follows this list)
  • get_research_context creates formatted summaries of relevant background information
  • The ContextManager maintains short-term working memory during task execution
  • Context trimming prevents memory usage from growing unboundedly over time
  • Statistics tracking enables monitoring of memory system performance and usage patterns
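
A brief, hedged usage sketch of the memory layer is shown below. It assumes the classes above live in memory.py and that chromadb and sentence-transformers are installed; the stored content is sample text for illustration only:

# memory_usage_sketch.py - illustrative only; sample content
from memory import MemorySystem, ContextManager

memory = MemorySystem(collection_name="demo_memory")

# Store one finding with minimal metadata
memory.store_result(
    task_id="task_001",
    content="Global solar capacity grew strongly in 2023, led by utility-scale projects.",
    metadata={"type": "research_result", "topic": "renewable energy"},
)

# Semantic retrieval: different wording, same topic
for hit in memory.retrieve_relevant("photovoltaic deployment trends", k=1):
    print(f"{hit['similarity_score']:.2f}  {hit['content'][:60]}")

# Short-term working memory for the current session
context = ContextManager(memory)
context.update_context({
    "goal": "Analyze renewable energy adoption",
    "key_findings": ["Solar leads 2023 capacity additions"],
})
print(context.get_current_context())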


STEP 6: LLM INTEGRATION


MISTRAL LLM PROVIDER IMPLEMENTATION


# llm_provider.py

from transformers import AutoTokenizer, AutoModelForCausalLM

import torch

from typing import List, Dict, Any, Optional

import time

import logging


class MistralProvider:

    """

    Local Mistral LLM provider for agent operations.

    Handles model loading, prompt formatting, generation, and optimization

    for the specific requirements of agentic AI applications.

    """

    

    def __init__(self, model_name: str = "mistralai/Mistral-7B-Instruct-v0.2"):

        """

        Initialize the Mistral model provider.

        

        Args:

            model_name: Hugging Face model identifier for Mistral

        """

        self.model_name = model_name

        self.tokenizer = None

        self.model = None

        self.device = None

        

        # Generation parameters optimized for agent tasks

        self.default_params = {

            "max_length": 2048,

            "temperature": 0.7,      # Balance between creativity and consistency

            "top_p": 0.9,           # Nucleus sampling for quality

            "do_sample": True,       # Enable sampling for varied responses

            "repetition_penalty": 1.1  # Reduce repetitive output

        }

        

        # Initialize the model

        self._load_model()

    

    def _load_model(self):

        """

        Load the Mistral model and tokenizer with optimization.

        Handles device placement, memory optimization, and error recovery.

        """

        print(f"Loading Mistral model: {self.model_name}")

        print("This may take several minutes on first run...")

        

        try:

            # Load tokenizer first

            print("Loading tokenizer...")

            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)

            

            # Set pad token if not present (common issue with some models)

            if self.tokenizer.pad_token is None:

                self.tokenizer.pad_token = self.tokenizer.eos_token

            

            # Determine optimal device placement

            if torch.cuda.is_available():

                self.device = "cuda"

                print(f"CUDA available: {torch.cuda.get_device_name()}")

                print(f"CUDA memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")

            else:

                self.device = "cpu"

                print("Using CPU (GPU not available or not configured)")

            

            # Load model with optimizations

            print("Loading model...")

            self.model = AutoModelForCausalLM.from_pretrained(

                self.model_name,

                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,

                device_map="auto",  # Automatically distribute across available devices

                trust_remote_code=True  # Required for some Mistral variants

            )

            

            # Set model to evaluation mode

            self.model.eval()

            

            # Display memory usage

            if self.device == "cuda":

                memory_gb = torch.cuda.memory_allocated() / 1e9

                print(f"Model loaded successfully, using {memory_gb:.2f} GB GPU memory")

            else:

                print("Model loaded successfully on CPU")

                

            # Test the model with a simple generation

            self._test_model()

            

        except Exception as e:

            print(f"Error loading model: {e}")

            print("Attempting fallback configuration...")

            self._load_fallback_model()

    

    def _load_fallback_model(self):

        """

        Load model with reduced memory requirements if standard loading fails.

        Uses 8-bit quantization to reduce memory usage.

        """

        try:

            print("Loading model with 8-bit quantization...")

            

            self.model = AutoModelForCausalLM.from_pretrained(

                self.model_name,

                torch_dtype=torch.float16,

                device_map="auto",

                load_in_8bit=True,  # 8-bit quantization to reduce memory

                trust_remote_code=True

            )

            

            self.model.eval()

            print("Fallback model loaded successfully with quantization")

            

        except Exception as e:

            print(f"Fallback loading also failed: {e}")

            raise RuntimeError("Could not load Mistral model with any configuration")

    

    def _test_model(self):

        """

        Test the model with a simple generation to ensure it's working properly.

        """

        try:

            test_prompt = "Hello, I am a research assistant."

            result = self.generate_sync(test_prompt, max_length=50)

            print("Model test successful")

        except Exception as e:

            print(f"Model test failed: {e}")

    

    async def generate(self, prompt: str, **kwargs) -> str:

        """

        Generate text using the Mistral model (async version).

        

        Args:

            prompt: Input prompt for generation

            **kwargs: Override default generation parameters

            

        Returns:

            Generated text response

        """

        # Run synchronous generation in a thread pool to keep the async interface.

        # functools.partial is needed because run_in_executor only forwards

        # positional arguments, so keyword overrides must be bound up front.

        import asyncio

        import functools

        loop = asyncio.get_running_loop()

        return await loop.run_in_executor(None, functools.partial(self.generate_sync, prompt, **kwargs))

    

    def generate_sync(self, prompt: str, **kwargs) -> str:

        """

        Generate text using the Mistral model (synchronous version).

        

        Args:

            prompt: Input prompt for generation

            **kwargs: Override default generation parameters

            

        Returns:

            Generated text response

        """

        if self.model is None or self.tokenizer is None:

            raise RuntimeError("Model not loaded. Call _load_model() first.")

        

        # Merge provided parameters with defaults

        generation_params = {**self.default_params, **kwargs}

        

        # Format prompt for Mistral instruction format

        formatted_prompt = self._format_prompt(prompt)

        

        # Log generation start

        start_time = time.time()

        

        try:

            # Tokenize input

            inputs = self.tokenizer(

                formatted_prompt, 

                return_tensors="pt", 

                truncation=True,

                max_length=generation_params.get("max_input_length", 1500)

            )

            

            # Move inputs to appropriate device

            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            

            # Generate response

            with torch.no_grad():

                outputs = self.model.generate(

                    **inputs,

                    max_length=generation_params["max_length"],

                    temperature=generation_params["temperature"],

                    top_p=generation_params["top_p"],

                    do_sample=generation_params["do_sample"],

                    repetition_penalty=generation_params["repetition_penalty"],

                    pad_token_id=self.tokenizer.eos_token_id,

                    eos_token_id=self.tokenizer.eos_token_id

                )

            

            # Decode the response

            response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

            

            # Extract only the generated part (remove input prompt)

            response = self._extract_response(response, formatted_prompt)

            

            # Log generation statistics

            generation_time = time.time() - start_time

            tokens_generated = len(outputs[0]) - len(inputs["input_ids"][0])

            

            print(f"Generated {tokens_generated} tokens in {generation_time:.2f}s")

            

            return response.strip()

            

        except Exception as e:

            print(f"Generation error: {e}")

            return f"Error generating response: {str(e)}"

    

    def _format_prompt(self, prompt: str) -> str:

        """

        Format prompt for Mistral's instruction format.

        Mistral uses specific formatting for optimal performance.

        

        Args:

            prompt: Raw prompt text

            

        Returns:

            Properly formatted prompt for Mistral

        """

        # Mistral instruction format

        return f"<s>[INST] {prompt} [/INST]"

    

    def _extract_response(self, full_text: str, formatted_prompt: str) -> str:

        """

        Extract the generated response from the full model output.

        Removes the input prompt and formatting tokens.

        

        Args:

            full_text: Complete model output including prompt

            formatted_prompt: The original formatted prompt

            

        Returns:

            Clean generated response

        """

        # Find the end of the instruction formatting

        inst_end = "[/INST]"

        if inst_end in full_text:

            response = full_text.split(inst_end, 1)[-1]

        else:

            # Fallback: remove the original prompt

            response = full_text.replace(formatted_prompt, "")

        

        return response.strip()

    

    def generate_structured(self, prompt: str, output_format: str = "json") -> Dict[str, Any]:

        """

        Generate structured output (JSON, YAML, etc.) with validation.

        Useful for agent planning and tool parameter generation.

        

        Args:

            prompt: Prompt requesting structured output

            output_format: Expected output format ("json", "yaml")

            

        Returns:

            Parsed structured data

        """

        # Add format instructions to prompt

        format_instruction = f"\nPlease respond with valid {output_format.upper()} only:"

        structured_prompt = prompt + format_instruction

        

        # Generate response

        response = self.generate_sync(structured_prompt, temperature=0.3)  # Lower temperature for consistency

        

        try:

            if output_format.lower() == "json":

                import json

                return json.loads(response)

            elif output_format.lower() == "yaml":

                import yaml

                return yaml.safe_load(response)

            else:

                return {"raw_response": response}

                

        except Exception as e:

            print(f"Error parsing {output_format}: {e}")

            return {"error": str(e), "raw_response": response}

    

    def get_model_info(self) -> Dict[str, Any]:

        """

        Get information about the loaded model.

        

        Returns:

            Dictionary containing model information

        """

        if self.model is None:

            return {"status": "not_loaded"}

        

        info = {

            "model_name": self.model_name,

            "device": self.device,

            "model_type": type(self.model).__name__,

            "parameter_count": sum(p.numel() for p in self.model.parameters()),

            "memory_usage_gb": torch.cuda.memory_allocated() / 1e9 if self.device == "cuda" else "N/A"

        }

        

        return info


class LLMCache:

    """

    Simple caching system for LLM responses to avoid redundant generations.

    Particularly useful for planning and analysis tasks that might be repeated.

    """

    

    def __init__(self, max_cache_size: int = 100):

        """

        Initialize the cache with maximum size limit.

        

        Args:

            max_cache_size: Maximum number of cached responses

        """

        self.cache = {}

        self.max_size = max_cache_size

        self.access_order = []

    

    def get(self, prompt: str) -> Optional[str]:

        """

        Get cached response for a prompt.

        

        Args:

            prompt: The prompt to look up

            

        Returns:

            Cached response or None if not found

        """

        prompt_hash = hash(prompt)

        

        if prompt_hash in self.cache:

            # Update access order for LRU eviction

            self.access_order.remove(prompt_hash)

            self.access_order.append(prompt_hash)

            return self.cache[prompt_hash]

        

        return None

    

    def put(self, prompt: str, response: str):

        """

        Cache a response for a prompt.

        

        Args:

            prompt: The prompt

            response: The generated response

        """

        prompt_hash = hash(prompt)

        

        # Evict oldest if cache is full

        if len(self.cache) >= self.max_size and prompt_hash not in self.cache:

            oldest = self.access_order.pop(0)

            del self.cache[oldest]

        

        # Add to cache

        self.cache[prompt_hash] = response

        

        if prompt_hash not in self.access_order:

            self.access_order.append(prompt_hash)


Code Explanation:

  • MistralProvider handles all aspects of local LLM integration including loading, optimization, and generation
  • The class automatically handles device placement (GPU/CPU) and memory optimization using float16 and quantization
  • Mistral-specific prompt formatting ensures optimal model performance with the [INST] tag structure
  • Error handling and fallback loading with quantization ensures the system works on various hardware configurations
  • generate_structured method enables reliable JSON/YAML output for agent planning and tool parameters
  • Caching system prevents redundant generations for repeated prompts, improving efficiency (a wiring sketch follows this list)
  • Comprehensive logging and monitoring helps debug performance issues and track resource usage
  • The async/sync interface allows integration with both synchronous and asynchronous codebases
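
The LLMCache class is defined above but not yet wired into MistralProvider. One reasonable way to combine them is the thin wrapper sketched below; the CachedMistralProvider name is ours, and caching by exact prompt is a simple illustrative policy rather than a requirement:

# llm_cache_sketch.py - illustrative wiring of LLMCache around MistralProvider
from llm_provider import MistralProvider, LLMCache


class CachedMistralProvider(MistralProvider):
    """MistralProvider that checks an in-memory LRU cache before generating."""

    def __init__(self, *args, max_cache_size: int = 100, **kwargs):
        super().__init__(*args, **kwargs)
        self.cache = LLMCache(max_cache_size=max_cache_size)

    def generate_sync(self, prompt: str, **kwargs) -> str:
        # Identical prompts are served from the cache; sampled output would
        # otherwise vary from call to call, so this trades variety for speed.
        cached = self.cache.get(prompt)
        if cached is not None:
            return cached

        response = super().generate_sync(prompt, **kwargs)
        self.cache.put(prompt, response)
        return response


if __name__ == "__main__":
    llm = CachedMistralProvider()           # loads the model once
    print(llm.generate_sync("List three renewable energy sources."))
    print(llm.generate_sync("List three renewable energy sources."))  # cache hit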


STEP 7: MAIN APPLICATION


COMPLETE AGENT IMPLEMENTATION


# main_agent.py

import asyncio

from typing import List, Dict, Any, Optional

import logging

import uuid

import json

from datetime import datetime

import traceback


class ResearchAgentApp:

    """

    Main application class that orchestrates all components of the research agent.

    Provides high-level interfaces for research tasks and manages the complete

    agent lifecycle from initialization through execution to cleanup.

    """

    

    def __init__(self, config: Optional[Dict[str, Any]] = None):

        """

        Initialize the complete research agent application.

        

        Args:

            config: Optional configuration dictionary for customizing behavior

        """

        # Set up logging

        self._setup_logging()

        self.logger = logging.getLogger(__name__)

        

        # Load configuration

        self.config = self._load_config(config)

        

        # Initialize core components

        self.logger.info("Initializing Research Agent components...")

        

        try:

            # Initialize LLM provider

            self.logger.info("Loading Mistral LLM...")

            self.llm = MistralProvider(self.config.get("model_name", "mistralai/Mistral-7B-Instruct-v0.2"))

            

            # Initialize memory system

            self.logger.info("Setting up memory system...")

            self.memory = MemorySystem(self.config.get("memory_collection", "research_agent"))

            

            # Initialize context manager

            self.context_manager = ContextManager(self.memory)

            

            # Initialize and register tools

            self.logger.info("Setting up tool registry...")

            self.tools = self._setup_tools()

            

            # Initialize planning system

            self.planner = PlanningSystem(self.llm)

            

            # Initialize performance tracking

            self.performance_stats = {

                "total_research_sessions": 0,

                "successful_completions": 0,

                "total_tasks_executed": 0,

                "average_completion_time": 0.0

            }

            

            self.logger.info("Research Agent initialization complete!")

            

        except Exception as e:

            self.logger.error(f"Failed to initialize Research Agent: {e}")

            raise RuntimeError(f"Agent initialization failed: {e}")

    

    def _setup_logging(self):

        """Configure logging for the application."""

        logging.basicConfig(

            level=logging.INFO,

            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',

            handlers=[

                logging.FileHandler('research_agent.log'),

                logging.StreamHandler()

            ]

        )

    

    def _load_config(self, config: Optional[Dict[str, Any]]) -> Dict[str, Any]:

        """

        Load configuration with defaults.

        

        Args:

            config: User-provided configuration

            

        Returns:

            Complete configuration dictionary with defaults

        """

        default_config = {

            "model_name": "mistralai/Mistral-7B-Instruct-v0.2",

            "memory_collection": "research_agent",

            "max_tasks_per_session": 20,

            "search_api_key": "your_search_api_key_here",

            "enable_caching": True,

            "output_directory": "./research_outputs"

        }

        

        if config:

            default_config.update(config)

        

        return default_config

    

    def _setup_tools(self) -> ToolRegistry:

        """

        Initialize and register all available tools.

        

        Returns:

            Configured tool registry with all tools registered

        """

        registry = ToolRegistry()

        

        try:

            # Register web search tool

            if self.config.get("search_api_key"):

                web_search = WebSearchTool(self.config["search_api_key"])

                registry.register("web_search", web_search)

                self.logger.info("Registered web_search tool")

            else:

                self.logger.warning("No search API key provided - web_search tool disabled")

            

            # Register PDF analyzer

            pdf_analyzer = PDFAnalyzerTool()

            registry.register("pdf_analyzer", pdf_analyzer)

            self.logger.info("Registered pdf_analyzer tool")

            

            # Register data visualizer

            data_visualizer = DataVisualizerTool()

            registry.register("data_visualizer", data_visualizer)

            self.logger.info("Registered data_visualizer tool")

            

            # Log available tools

            available_tools = registry.get_available_tools()

            self.logger.info(f"Tool registry initialized with {len(available_tools)} tools: {available_tools}")

            

        except Exception as e:

            self.logger.error(f"Error setting up tools: {e}")

            raise

        

        return registry

    

    async def research(self, goal: str, context: str = "") -> str:

        """

        Execute a complete research session for the given goal.

        This is the main entry point for research tasks.

        

        Args:

            goal: High-level research objective

            context: Additional context or constraints for the research

            

        Returns:

            Comprehensive research report

        """

        session_id = str(uuid.uuid4())

        start_time = datetime.now()

        

        self.logger.info(f"Starting research session {session_id}")

        self.logger.info(f"Research goal: {goal}")

        

        try:

            # Update performance stats

            self.performance_stats["total_research_sessions"] += 1

            

            # Retrieve relevant past research for context

            self.logger.info("Retrieving relevant context from memory...")

            past_context = self.memory.get_research_context(goal)

            

            # Update context manager

            self.context_manager.update_context({

                "goal": goal,

                "session_id": session_id,

                "past_context": past_context

            })

            

            # Initialize agent for this session

            agent = ResearchAgent(self.llm, self.tools)

            agent.state.current_goal = goal

            

            # Add context to agent if provided

            if context or past_context:

                combined_context = f"{context}\n\n{past_context}".strip()

                agent.state.context = combined_context

                self.logger.info("Added context to agent state")

            

            # Execute the research

            self.logger.info("Beginning research execution...")

            result = await self._execute_research_session(agent, goal, session_id)

            

            # Calculate session duration

            end_time = datetime.now()

            session_duration = (end_time - start_time).total_seconds()

            

            # Update performance statistics

            self.performance_stats["successful_completions"] += 1

            self.performance_stats["total_tasks_executed"] += len(agent.state.completed_tasks)

            

            # Update the running average completion time (incremental mean over successful sessions)

            total_sessions = self.performance_stats["successful_completions"]

            current_avg = self.performance_stats["average_completion_time"]

            self.performance_stats["average_completion_time"] = (

                (current_avg * (total_sessions - 1) + session_duration) / total_sessions

            )

            

            # Store final result in long-term memory

            self.memory.store_result(

                task_id=f"research_session_{session_id}",

                content=result,

                metadata={

                    "goal": goal,

                    "session_duration": session_duration,

                    "tasks_completed": len(agent.state.completed_tasks),

                    "type": "final_research_report"

                }

            )

            

            # Update context manager with final results

            self.context_manager.update_context({

                "progress_summary": f"Completed research on: {goal}",

                "key_findings": [f"Session {session_id} completed successfully"]

            })

            

            self.logger.info(f"Research session {session_id} completed successfully in {session_duration:.2f}s")

            return result

            

        except Exception as e:

            self.logger.error(f"Research session {session_id} failed: {e}")

            self.logger.error(f"Traceback: {traceback.format_exc()}")

            

            # Store failure information

            error_report = f"Research session failed: {str(e)}"

            self.memory.store_result(

                task_id=f"research_session_{session_id}_error",

                content=error_report,

                metadata={

                    "goal": goal,

                    "error": str(e),

                    "type": "research_error"

                }

            )

            

            return f"Research failed: {str(e)}. Please check the logs for more details."

    

    async def _execute_research_session(self, agent: ResearchAgent, goal: str, session_id: str) -> str:

        """

        Execute the core research logic with comprehensive error handling.

        

        Args:

            agent: The research agent instance

            goal: Research objective

            session_id: Unique session identifier

            

        Returns:

            Research results

        """

        try:

            # PHASE 1: Strategic Planning

            self.logger.info("Phase 1: Creating strategic research plan...")

            tasks = await self.planner.create_plan(goal, agent.state.context)

            agent.state.task_queue.extend(tasks)

            

            if not agent.state.task_queue:

                raise ValueError("No tasks generated for research goal")

            

            self.logger.info(f"Generated {len(agent.state.task_queue)} tasks for execution")

            

            # PHASE 2: Task Execution with Adaptive Replanning

            self.logger.info("Phase 2: Executing research tasks...")

            

            task_counter = 0

            max_tasks = self.config.get("max_tasks_per_session", 20)

            

            while agent.state.task_queue and task_counter < max_tasks:

                task = agent.state.task_queue.pop(0)

                task_counter += 1

                

                self.logger.info(f"Executing task {task_counter}: {task.description}")

                

                try:

                    # Execute the task

                    task.status = TaskStatus.IN_PROGRESS

                    result = await self.tools.execute_tool(task.tool_name, **task.parameters)

                    

                    # Store task result

                    task.result = result

                    task.status = TaskStatus.COMPLETED

                    agent.state.completed_tasks.append(task)

                    

                    # Store in memory for future reference

                    self.memory.store_result(

                        task_id=task.id,

                        content=str(result),

                        metadata={

                            "task_description": task.description,

                            "tool_used": task.tool_name,

                            "session_id": session_id,

                            "type": "task_result"

                        }

                    )

                    

                    self.logger.info(f"Task {task_counter} completed successfully")

                    

                    # Adaptive replanning every 3 tasks

                    if task_counter % 3 == 0 and agent.state.task_queue:

                        should_replan = await self.planner.should_replan(agent.state)

                        

                        if should_replan:

                            self.logger.info("Initiating adaptive replanning...")

                            new_tasks = await self._generate_additional_tasks(agent, goal)

                            agent.state.task_queue.extend(new_tasks)

                            self.logger.info(f"Added {len(new_tasks)} new tasks from replanning")

                    

                except Exception as task_error:

                    self.logger.error(f"Task {task_counter} failed: {task_error}")

                    task.status = TaskStatus.FAILED

                    task.result = {"error": str(task_error)}

                    agent.state.completed_tasks.append(task)

                    

                    # Continue with other tasks despite failure

                    continue

            

            if task_counter >= max_tasks:

                self.logger.warning(f"Reached maximum task limit ({max_tasks}) for session")

            

            # PHASE 3: Results Synthesis

            self.logger.info("Phase 3: Synthesizing research results...")

            final_report = await self._synthesize_final_report(agent, goal)

            

            return final_report

            

        except Exception as e:

            self.logger.error(f"Error in research execution: {e}")

            raise

    

    async def _generate_additional_tasks(self, agent: ResearchAgent, goal: str) -> List[Task]:

        """

        Generate additional tasks based on current research progress.

        

        Args:

            agent: Current agent state

            goal: Original research goal

            

        Returns:

            List of additional tasks to execute

        """

        replan_prompt = f"""

Based on the research progress so far, generate additional tasks to improve the research quality.


ORIGINAL GOAL: {goal}


COMPLETED TASKS SUMMARY:

{self._format_completed_tasks(agent.state.completed_tasks[-5:])}


CURRENT FINDINGS:

{self._extract_key_findings(agent.state.completed_tasks)}


Generate 1-3 additional tasks that would:

1. Fill important gaps in the research

2. Validate key findings

3. Provide additional perspectives

4. Enhance the final analysis


Format as JSON:

{{

  "additional_tasks": [

    {{

      "id": "additional_001",

      "description": "Task description",

      "tool_name": "tool_to_use", 

      "parameters": {{"param": "value"}}

    }}

  ]

}}


Additional tasks:

"""

        

        try:

            response = await self.llm.generate(replan_prompt)

            task_data = json.loads(response)

            

            additional_tasks = []

            for task_info in task_data.get("additional_tasks", []):

                task = Task(

                    id=task_info["id"],

                    description=task_info["description"],

                    tool_name=task_info["tool_name"],

                    parameters=task_info["parameters"]

                )

                additional_tasks.append(task)

            

            return additional_tasks

            

        except Exception as e:

            self.logger.error(f"Error generating additional tasks: {e}")

            return []

    

    async def _synthesize_final_report(self, agent: ResearchAgent, goal: str) -> str:

        """

        Create a comprehensive final report from all research results.

        

        Args:

            agent: Agent with completed research

            goal: Original research goal

            

        Returns:

            Formatted final research report

        """

        synthesis_prompt = f"""

Create a comprehensive research report based on the completed analysis.


RESEARCH GOAL: {goal}


COMPLETED RESEARCH SUMMARY:

{self._format_all_results(agent.state.completed_tasks)}


Create a well-structured research report with:


1. EXECUTIVE SUMMARY: Key findings and conclusions (2-3 paragraphs)


2. METHODOLOGY: Research approach and data sources used


3. KEY FINDINGS: Detailed analysis of the most important discoveries

   - Use specific data and evidence from the research

   - Organize findings thematically

   - Include relevant statistics or trends


4. ANALYSIS: Deeper interpretation of the findings

   - Identify patterns and relationships

   - Discuss implications and significance

   - Address potential limitations


5. CONCLUSIONS: Summary of main insights and their implications


6. RECOMMENDATIONS: Actionable next steps or further research directions


Make the report professional, evidence-based, and actionable. Use clear headings and organize information logically.


RESEARCH REPORT:

"""

        

        try:

            final_report = await self.llm.generate(synthesis_prompt, max_length=3000)

            

            # Add metadata header to report

            report_header = f"""

RESEARCH REPORT

Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

Research Goal: {goal}

Tasks Completed: {len(agent.state.completed_tasks)}

Successful Tasks: {len([t for t in agent.state.completed_tasks if t.status == TaskStatus.COMPLETED])}


{'='*80}


"""

            

            return report_header + final_report

            

        except Exception as e:

            self.logger.error(f"Error synthesizing final report: {e}")

            return self._create_fallback_report(agent, goal)

    

    def _create_fallback_report(self, agent: ResearchAgent, goal: str) -> str:

        """Create a basic report when synthesis fails."""

        report = f"""

RESEARCH REPORT (Fallback)

Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

Research Goal: {goal}


SUMMARY:

Research was conducted using {len(agent.state.completed_tasks)} tasks.

{len([t for t in agent.state.completed_tasks if t.status == TaskStatus.COMPLETED])} tasks completed successfully.


TASK RESULTS:

"""

        

        for i, task in enumerate(agent.state.completed_tasks, 1):

            status_icon = "✓" if task.status == TaskStatus.COMPLETED else "✗"

            report += f"\n{i}. {status_icon} {task.description}"

            if task.result and task.status == TaskStatus.COMPLETED:

                result_summary = str(task.result)[:200] + "..." if len(str(task.result)) > 200 else str(task.result)

                report += f"\n   Result: {result_summary}"

        

        return report

    

    def _format_completed_tasks(self, tasks: List[Task]) -> str:

        """Format completed tasks for LLM prompts."""

        if not tasks:

            return "No completed tasks"

        

        formatted = []

        for task in tasks:

            status = "✓" if task.status == TaskStatus.COMPLETED else "✗"

            result_preview = str(task.result)[:100] + "..." if len(str(task.result)) > 100 else str(task.result)

            formatted.append(f"{status} {task.description}: {result_preview}")

        

        return "\n".join(formatted)

    

    def _extract_key_findings(self, tasks: List[Task]) -> str:

        """Extract key findings from completed tasks."""

        findings = []

        

        for task in tasks:

            if task.status == TaskStatus.COMPLETED and task.result:

                # Extract meaningful information from task results

                result_str = str(task.result)

                if len(result_str) > 50:  # Only include substantial results

                    finding = result_str[:200] + "..." if len(result_str) > 200 else result_str

                    findings.append(f"- {finding}")

        

        return "\n".join(findings) if findings else "No significant findings extracted"

    

    def _format_all_results(self, tasks: List[Task]) -> str:

        """Format all task results for final synthesis."""

        if not tasks:

            return "No tasks completed"

        

        formatted = []

        for i, task in enumerate(tasks, 1):

            formatted.append(f"\nTASK {i}: {task.description}")

            formatted.append(f"Status: {task.status.value}")

            

            if task.result:

                result_str = str(task.result)

                formatted.append(f"Result: {result_str}")

            

            formatted.append("-" * 40)

        

        return "\n".join(formatted)

    

    async def interactive_mode(self):

        """

        Run the agent in interactive mode for user interaction.

        Provides a command-line interface for research requests.

        """

        print("\n" + "="*60)

        print("🤖 RESEARCH ASSISTANT AGENT - Interactive Mode")

        print("="*60)

        print("Ask me to research any topic and I'll provide comprehensive analysis!")

        print("\nCommands:")

        print("  • Type any research question or goal")

        print("  • 'stats' - Show performance statistics")

        print("  • 'memory' - Show memory system status")

        print("  • 'help' - Show detailed help")

        print("  • 'quit' - Exit the application")

        print("-"*60)

        

        while True:

            try:

                user_input = input("\n📝 Research Goal: ").strip()

                

                if user_input.lower() == 'quit':

                    print("👋 Thank you for using Research Assistant Agent!")

                    break

                    

                elif user_input.lower() == 'help':

                    self._show_detailed_help()

                    continue

                    

                elif user_input.lower() == 'stats':

                    self._show_performance_stats()

                    continue

                    

                elif user_input.lower() == 'memory':

                    self._show_memory_stats()

                    continue

                    

                elif not user_input:

                    print("Please enter a research goal or command.")

                    continue

                

                print("\n🔍 Starting research...")

                print("This may take several minutes depending on the complexity...")

                

                # Execute research

                result = await self.research(user_input)

                

                print("\n📊 Research Complete!")

                print("="*80)

                print(result)

                print("="*80)

                

            except KeyboardInterrupt:

                print("\n👋 Research interrupted. Goodbye!")

                break

            except Exception as e:

                self.logger.error(f"Interactive mode error: {e}")

                print(f"❌ Error: {str(e)}")

                print("Please try again or type 'quit' to exit.")

    

    def _show_detailed_help(self):

        """Show detailed help information."""

        help_text = """

RESEARCH ASSISTANT AGENT - HELP


WHAT I CAN DO:

• Conduct comprehensive research on any topic

• Search the web for current information

• Analyze documents and research papers

• Create data visualizations and charts

• Synthesize findings into detailed reports

• Learn from past research to improve future work


EXAMPLE RESEARCH GOALS:

• "Analyze the current state of renewable energy adoption in Europe"

• "Compare electric vehicle market trends across different countries" 

• "Research the latest developments in artificial intelligence"

• "Investigate the economic impact of remote work policies"

• "Study consumer preferences in sustainable packaging"


HOW IT WORKS:

1. You provide a research goal

2. I create a strategic research plan

3. I execute tasks using various tools (web search, document analysis, etc.)

4. I adapt the plan based on findings

5. I synthesize everything into a comprehensive report


TIPS FOR BETTER RESULTS:

• Be specific about what aspects interest you most

• Mention if you want to focus on particular regions, time periods, or perspectives

• Ask follow-up questions to dive deeper into specific findings

• Use 'stats' to see how much research I've conducted


COMMANDS:

• Type any research question

• 'stats' - Performance statistics

• 'memory' - Memory system status  

• 'help' - This help message

• 'quit' - Exit application

"""

        print(help_text)

    

    def _show_performance_stats(self):

        """Display performance statistics."""

        stats = self.performance_stats

        memory_stats = self.memory.get_memory_stats()

        

        print("\n📈 PERFORMANCE STATISTICS")

        print("-" * 40)

        print(f"Total Research Sessions: {stats['total_research_sessions']}")

        print(f"Successful Completions: {stats['successful_completions']}")

        print(f"Success Rate: {stats['successful_completions']/max(stats['total_research_sessions'], 1)*100:.1f}%")

        print(f"Total Tasks Executed: {stats['total_tasks_executed']}")

        print(f"Average Completion Time: {stats['average_completion_time']:.1f} seconds")

        print(f"Memories Stored: {memory_stats.get('total_memories', 0)}")

        

        # Tool execution stats

        tool_history = self.tools.get_execution_history()

        if tool_history:

            successful_tools = len([h for h in tool_history if h['status'] == 'success'])

            print(f"Tool Executions: {len(tool_history)} (Success: {successful_tools})")

    

    def _show_memory_stats(self):

        """Display memory system statistics."""

        stats = self.memory.get_memory_stats()

        

        print("\n🧠 MEMORY SYSTEM STATUS")

        print("-" * 40)

        print(f"Total Memories: {stats.get('total_memories', 0)}")

        print(f"Collection: {stats.get('collection_name', 'Unknown')}")

        

        if stats.get('memory_types'):

            print("Memory Types:")

            for mem_type, count in stats['memory_types'].items():

                print(f"  • {mem_type}: {count}")

        

        if stats.get('oldest_memory'):

            print(f"Oldest Memory: {stats['oldest_memory'][:10]}")

        if stats.get('newest_memory'):

            print(f"Newest Memory: {stats['newest_memory'][:10]}")


# Example usage and testing

async def main():

    """

    Main function demonstrating how to use the Research Agent.

    Provides examples of both programmatic and interactive usage.

    """

    try:

        print("Initializing Research Agent...")

        

        # Configure the agent (replace with your actual API keys)

        config = {

            "search_api_key": "your_search_api_key_here",  # Replace with real API key

            "max_tasks_per_session": 15,

            "enable_caching": True

        }

        

        # Initialize the agent

        agent = ResearchAgentApp(config)

        

        # Example 1: Programmatic research

        print("\n" + "="*60)

        print("EXAMPLE: Programmatic Research")

        print("="*60)

        

        research_goal = "Analyze the impact of artificial intelligence on job markets in 2024"

        result = await agent.research(research_goal)

        

        print("\nRESEARCH RESULT:")

        print("-" * 40)

        print(result)

        

        # Example 2: Interactive mode (uncomment to try)

        # print("\nSwitching to interactive mode...")

        # await agent.interactive_mode()

        

    except Exception as e:

        print(f"Error in main: {e}")

        traceback.print_exc()


if __name__ == "__main__":

    asyncio.run(main())


Code Explanation:

  • ResearchAgentApp is the main orchestrator that combines all components into a unified system
  • The initialization process loads and configures the LLM, memory system, tools, and planner
  • Configuration management allows customization of model, API keys, and operational parameters
  • The research method implements a complete research workflow with error handling and performance tracking
  • Adaptive replanning allows the agent to adjust its approach based on intermediate findings
  • Comprehensive logging and statistics tracking enable monitoring and debugging
  • Interactive mode provides a user-friendly interface for research requests
  • Fallback mechanisms ensure the system continues operating even when components fail
  • Performance statistics help optimize the agent's behavior over time
  • The modular design makes it easy to extend with new capabilities and tools
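
The modularity mentioned in the last bullet can be illustrated with a short sketch. The snippet below is a hypothetical extension, not part of the implementation above: CurrencyConverterTool is an invented placeholder, and its execute() method is only a guess at the tool interface used elsewhere in this guide. Only ToolRegistry.register() and overriding ResearchAgentApp._setup_tools() are taken from the code above.

# extension_sketch.py (illustrative only)
class CurrencyConverterTool:
    """Hypothetical tool used purely to demonstrate the extension pattern."""

    async def execute(self, amount: float, rate: float) -> dict:
        # NOTE: method name and signature are assumed; follow the tool
        # interface defined earlier in this guide. A real tool would call
        # an external exchange-rate API here.
        return {"converted_amount": amount * rate}


class ExtendedResearchAgentApp(ResearchAgentApp):
    """ResearchAgentApp with one additional tool registered at startup."""

    def _setup_tools(self) -> ToolRegistry:
        registry = super()._setup_tools()  # keep web_search, pdf_analyzer, data_visualizer
        registry.register("currency_converter", CurrencyConverterTool())
        self.logger.info("Registered currency_converter tool")
        return registry

Because tasks reference tools by name (task.tool_name) and the registry dispatches on that name, a newly registered tool becomes available to planned tasks without changing any other component.
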

STEP 8: ADVANCED FEATURES


SELF-REFLECTION AND IMPROVEMENT SYSTEM


# reflection.py

from typing import Dict, List, Any, Optional

from datetime import datetime, timedelta

import json


class ReflectionSystem:

    """

    Advanced self-reflection system that enables the agent to analyze

    its own performance and continuously improve its research capabilities.

    This is a key component of truly agentic behavior.

    """

    

    def __init__(self, llm_provider, memory_system):

        """

        Initialize the reflection system.

        

        Args:

            llm_provider: LLM for generating reflections

            memory_system: Access to historical performance data

        """

        self.llm = llm_provider

        self.memory = memory_system

        self.reflection_history = []

        self.improvement_metrics = {

            "research_quality_trend": [],

            "efficiency_trend": [],

            "user_satisfaction_trend": []

        }

    

    async def reflect_on_performance(self, completed_tasks: List[Task], 

                                   original_goal: str, 

                                   final_result: str,

                                   session_duration: float) -> Dict[str, Any]:

        """

        Conduct comprehensive performance analysis of a research session.

        This enables the agent to learn from experience and improve future performance.

        

        Args:

            completed_tasks: All tasks executed in the session

            original_goal: The original research objective

            final_result: Final research report generated

            session_duration: Time taken to complete the research

            

        Returns:

            Detailed reflection analysis with improvement recommendations

        """

        print("Conducting performance reflection...")

        

        # Analyze task execution patterns

        task_analysis = self._analyze_task_execution(completed_tasks)

        

        # Evaluate research quality

        quality_assessment = await self._assess_research_quality(

            original_goal, final_result, completed_tasks

        )

        

        # Analyze efficiency metrics

        efficiency_analysis = self._analyze_efficiency(completed_tasks, session_duration)

        

        # Generate improvement recommendations

        recommendations = await self._generate_improvement_recommendations(

            task_analysis, quality_assessment, efficiency_analysis, original_goal

        )

        

        # Create comprehensive reflection

        reflection = {

            "timestamp": datetime.now().isoformat(),

            "session_summary": {

                "goal": original_goal,

                "duration_seconds": session_duration,

                "tasks_completed": len(completed_tasks),

                "success_rate": task_analysis["success_rate"]

            },

            "task_analysis": task_analysis,

            "quality_assessment": quality_assessment,

            "efficiency_analysis": efficiency_analysis,

            "recommendations": recommendations,

            "overall_score": self._calculate_overall_score(quality_assessment, efficiency_analysis)

        }

        

        # Store reflection in history

        self.reflection_history.append(reflection)

        

        # Update improvement metrics

        self._update_improvement_trends(reflection)

        

        print(f"Reflection complete. Overall score: {reflection['overall_score']:.2f}/10")

        return reflection

    

    def _analyze_task_execution(self, tasks: List[Task]) -> Dict[str, Any]:

        """

        Analyze patterns in task execution to identify strengths and weaknesses.

        

        Args:

            tasks: List of executed tasks

            

        Returns:

            Dictionary containing task execution analysis

        """

        if not tasks:

            return {"error": "No tasks to analyze"}

        

        # Calculate basic metrics

        total_tasks = len(tasks)

        successful_tasks = len([t for t in tasks if t.status == TaskStatus.COMPLETED])

        failed_tasks = len([t for t in tasks if t.status == TaskStatus.FAILED])

        

        # Analyze tool usage patterns

        tool_usage = {}

        tool_success_rates = {}

        

        for task in tasks:

            tool_name = task.tool_name

            tool_usage[tool_name] = tool_usage.get(tool_name, 0) + 1

            

            if tool_name not in tool_success_rates:

                tool_success_rates[tool_name] = {"successful": 0, "total": 0}

            

            tool_success_rates[tool_name]["total"] += 1

            if task.status == TaskStatus.COMPLETED:

                tool_success_rates[tool_name]["successful"] += 1

        

        # Calculate success rates per tool

        for tool in tool_success_rates:

            tool_data = tool_success_rates[tool]

            tool_data["success_rate"] = tool_data["successful"] / tool_data["total"]

        

        # Identify task sequence patterns

        task_sequence = [task.tool_name for task in tasks]

        sequence_effectiveness = self._analyze_task_sequence(tasks)

        

        return {

            "total_tasks": total_tasks,

            "successful_tasks": successful_tasks,

            "failed_tasks": failed_tasks,

            "success_rate": successful_tasks / total_tasks,

            "tool_usage": tool_usage,

            "tool_success_rates": tool_success_rates,

            "task_sequence": task_sequence,

            "sequence_effectiveness": sequence_effectiveness

        }

    

    def _analyze_task_sequence(self, tasks: List[Task]) -> Dict[str, Any]:

        """

        Analyze the effectiveness of task sequencing.

        Good sequencing can significantly improve research quality.

        """

        if len(tasks) < 2:

            return {"insufficient_data": True}

        

        # Analyze common beneficial patterns

        beneficial_patterns = [

            ["web_search", "pdf_analyzer"],  # Search then deep dive

            ["web_search", "data_visualizer"],  # Search then visualize

            ["pdf_analyzer", "synthesis"]  # Analyze then synthesize

        ]

        

        sequence = [task.tool_name for task in tasks]

        pattern_matches = 0

        

        for i in range(len(sequence) - 1):

            current_pair = [sequence[i], sequence[i + 1]]

            if current_pair in beneficial_patterns:

                pattern_matches += 1

        

        # Calculate sequence coherence score

        coherence_score = pattern_matches / max(len(sequence) - 1, 1)

        

        return {

            "pattern_matches": pattern_matches,

            "coherence_score": coherence_score,

            "sequence_length": len(sequence),

            "follows_best_practices": coherence_score > 0.3

        }

    

    async def _assess_research_quality(self, goal: str, result: str, tasks: List[Task]) -> Dict[str, Any]:

        """

        Use the LLM to assess the quality of research conducted.

        This provides qualitative analysis of research effectiveness.

        """

        quality_prompt = f"""

Assess the quality of this research session on a scale of 1-10.


RESEARCH GOAL: {goal}


RESEARCH APPROACH:

{self._format_research_approach(tasks)}


FINAL RESULT LENGTH: {len(result)} characters


EVALUATION CRITERIA:

1. Completeness: Does the research address all aspects of the goal?

2. Accuracy: Are the findings well-supported and credible?

3. Depth: Does the analysis go beyond surface-level information?

4. Coherence: Is the research logically structured and well-organized?

5. Actionability: Are the conclusions useful and practical?


Rate each criterion (1-10) and provide overall assessment:


{{

  "completeness_score": 8,

  "accuracy_score": 7,

  "depth_score": 6,

  "coherence_score": 9,

  "actionability_score": 7,

  "overall_quality_score": 7.4,

  "strengths": ["strength1", "strength2"],

  "weaknesses": ["weakness1", "weakness2"],

  "improvement_areas": ["area1", "area2"]

}}


Assessment:

"""

        

        try:

            response = await self.llm.generate(quality_prompt, temperature=0.3)

            assessment = json.loads(response)

            

            # Validate and ensure all required fields

            required_fields = ["completeness_score", "accuracy_score", "depth_score", 

                             "coherence_score", "actionability_score", "overall_quality_score"]

            

            for field in required_fields:

                if field not in assessment:

                    assessment[field] = 5.0  # Default middle score

            

            return assessment

            

        except Exception as e:

            print(f"Error in quality assessment: {e}")

            return {

                "overall_quality_score": 5.0,

                "error": str(e),

                "strengths": [],

                "weaknesses": ["Assessment failed"],

                "improvement_areas": ["Fix quality assessment system"]

            }

    

    def _format_research_approach(self, tasks: List[Task]) -> str:

        """Format the research approach for quality assessment."""

        if not tasks:

            return "No tasks executed"

        

        approach_summary = []

        for i, task in enumerate(tasks, 1):

            status_icon = "✓" if task.status == TaskStatus.COMPLETED else "✗"

            approach_summary.append(f"{i}. {status_icon} {task.description} (using {task.tool_name})")

        

        return "\n".join(approach_summary)

    

    def _analyze_efficiency(self, tasks: List[Task], session_duration: float) -> Dict[str, Any]:

        """

        Analyze the efficiency of the research session.

        Efficiency metrics help optimize future research approaches.

        """

        if not tasks:

            return {"error": "No tasks to analyze"}

        

        # Calculate time per task (estimated)

        avg_time_per_task = session_duration / len(tasks)

        

        # Analyze tool efficiency

        tool_usage = {}

        for task in tasks:

            tool_name = task.tool_name

            if tool_name not in tool_usage:

                tool_usage[tool_name] = {"count": 0, "success_count": 0}

            

            tool_usage[tool_name]["count"] += 1

            if task.status == TaskStatus.COMPLETED:

                tool_usage[tool_name]["success_count"] += 1

        

        # Calculate efficiency score based on success rate and time

        total_tasks = len(tasks)

        successful_tasks = len([t for t in tasks if t.status == TaskStatus.COMPLETED])

        success_rate = successful_tasks / total_tasks

        

        # Efficiency score considers both success rate and reasonable time usage

        time_efficiency = min(1.0, 300 / max(avg_time_per_task, 1))  # 5 minutes is ideal per task

        overall_efficiency = (success_rate * 0.7) + (time_efficiency * 0.3)

        

        return {

            "session_duration": session_duration,

            "total_tasks": total_tasks,

            "successful_tasks": successful_tasks,

            "avg_time_per_task": avg_time_per_task,

            "success_rate": success_rate,

            "time_efficiency": time_efficiency,

            "overall_efficiency": overall_efficiency,

            "tool_usage": tool_usage

        }

    

    async def _generate_improvement_recommendations(self, task_analysis: Dict, 

                                                   quality_assessment: Dict,

                                                   efficiency_analysis: Dict,

                                                   goal: str) -> List[str]:

        """

        Generate specific recommendations for improving future research sessions.

        These recommendations enable continuous learning and adaptation.

        """

        recommendations = []

        

        # Analyze success rate

        success_rate = task_analysis.get("success_rate", 0)

        if success_rate < 0.8:

            recommendations.append(

                f"Improve task success rate (currently {success_rate:.1%}). "

                "Consider adding error recovery mechanisms or tool validation."

            )

        

        # Analyze tool usage patterns

        tool_success_rates = task_analysis.get("tool_success_rates", {})

        for tool, data in tool_success_rates.items():

            if data["success_rate"] < 0.7 and data["total"] > 1:

                recommendations.append(

                    f"Tool '{tool}' has low success rate ({data['success_rate']:.1%}). "

                    "Review tool implementation or usage patterns."

                )

        

        # Analyze research quality

        quality_score = quality_assessment.get("overall_quality_score", 5)

        if quality_score < 7:

            recommendations.append(

                f"Research quality below target (score: {quality_score}/10). "

                "Consider adding more diverse sources or deeper analysis steps."

            )

        

        # Analyze efficiency

        efficiency = efficiency_analysis.get("overall_efficiency", 0.5)

        if efficiency < 0.6:

            recommendations.append(

                f"Research efficiency could be improved (score: {efficiency:.1%}). "

                "Optimize task sequencing and reduce redundant operations."

            )

        

        # Analyze task sequence effectiveness

        sequence_data = task_analysis.get("sequence_effectiveness", {})

        if not sequence_data.get("follows_best_practices", False):

            recommendations.append(

                "Task sequencing could be improved. Consider starting with broad searches "

                "before diving into specific analysis, and end with synthesis."

            )

        

        # Generate LLM-based recommendations

        llm_recommendations = await self._generate_llm_recommendations(

            task_analysis, quality_assessment, efficiency_analysis, goal

        )

        recommendations.extend(llm_recommendations)

        

        return recommendations[:10]  # Limit to top 10 recommendations

    

    async def _generate_llm_recommendations(self, task_analysis: Dict,

                                          quality_assessment: Dict,

                                          efficiency_analysis: Dict,

                                          goal: str) -> List[str]:

        """Generate LLM-based improvement recommendations."""

        

        recommendation_prompt = f"""

Based on this research session analysis, provide 3-5 specific improvement recommendations.


RESEARCH GOAL: {goal}

SUCCESS RATE: {task_analysis.get('success_rate', 0):.1%}

QUALITY SCORE: {quality_assessment.get('overall_quality_score', 5)}/10

EFFICIENCY SCORE: {efficiency_analysis.get('overall_efficiency', 0.5):.1%}


IDENTIFIED ISSUES:

- Quality weaknesses: {quality_assessment.get('weaknesses', [])}

- Efficiency concerns: Low success rate or long duration

- Tool performance: Some tools may have failed frequently


Provide specific, actionable recommendations for improvement:


1. [First recommendation]

2. [Second recommendation]  

3. [Third recommendation]

4. [Fourth recommendation]

5. [Fifth recommendation]


Focus on concrete steps that can be implemented to improve future research sessions.


Recommendations:

"""

        

        try:

            response = await self.llm.generate(recommendation_prompt, temperature=0.5)

            

            # Extract numbered recommendations

            lines = response.strip().split('\n')

            recommendations = []

            

            for line in lines:

                line = line.strip()

                if line and (line[0].isdigit() or line.startswith('-')):

                    # Clean up the recommendation text

                    clean_rec = line.split('.', 1)[-1].strip()

                    if clean_rec:

                        recommendations.append(clean_rec)

            

            return recommendations[:5]  # Return max 5 recommendations

            

        except Exception as e:

            print(f"Error generating LLM recommendations: {e}")

            return ["Review and optimize research methodology based on session performance"]

    

    def _calculate_overall_score(self, quality_assessment: Dict, efficiency_analysis: Dict) -> float:

        """Calculate overall performance score combining quality and efficiency."""

        quality_score = quality_assessment.get("overall_quality_score", 5) / 10  # Normalize to 0-1

        efficiency_score = efficiency_analysis.get("overall_efficiency", 0.5)

        

        # Weight quality higher than efficiency (70/30 split)

        overall_score = (quality_score * 0.7) + (efficiency_score * 0.3)

        return round(overall_score * 10, 2)  # Convert back to 0-10 scale

    

    def _update_improvement_trends(self, reflection: Dict):

        """Update improvement metrics to track progress over time."""

        quality_score = reflection["quality_assessment"].get("overall_quality_score", 5)

        efficiency_score = reflection["efficiency_analysis"].get("overall_efficiency", 0.5) * 10

        overall_score = reflection["overall_score"]

        

        # Add to trends (keep only last 20 sessions)

        self.improvement_metrics["research_quality_trend"].append(quality_score)

        self.improvement_metrics["efficiency_trend"].append(efficiency_score)

        

        # Trim to keep only recent history

        for trend in self.improvement_metrics.values():

            if len(trend) > 20:

                trend.pop(0)

    

    def get_improvement_summary(self) -> Dict[str, Any]:

        """Get a summary of improvement trends over time."""

        if not self.reflection_history:

            return {"message": "No reflection history available"}

        

        recent_reflections = self.reflection_history[-10:]  # Last 10 sessions

        

        # Calculate trends

        quality_trend = self.improvement_metrics["research_quality_trend"]

        efficiency_trend = self.improvement_metrics["efficiency_trend"]

        

        def calculate_trend(values):

            if len(values) < 4:

                return "insufficient_data"

            recent_avg = sum(values[-3:]) / 3

            # Compare the three most recent sessions against the preceding window (guarding against an empty slice)

            earlier = values[-6:-3] if len(values) >= 6 else values[:-3]

            earlier_avg = sum(earlier) / len(earlier)

            

            if recent_avg > earlier_avg * 1.1:

                return "improving"

            elif recent_avg < earlier_avg * 0.9:

                return "declining"

            else:

                return "stable"

        

        return {

            "total_sessions_analyzed": len(self.reflection_history),

            "quality_trend": calculate_trend(quality_trend),

            "efficiency_trend": calculate_trend(efficiency_trend),

            "average_quality_score": sum(quality_trend) / len(quality_trend) if quality_trend else 0,

            "average_efficiency_score": sum(efficiency_trend) / len(efficiency_trend) if efficiency_trend else 0,

            "most_common_recommendations": self._get_common_recommendations(),

            "improvement_areas": self._identify_improvement_areas()

        }

    

    def _get_common_recommendations(self) -> List[str]:

        """Identify the most common recommendations across sessions."""

        all_recommendations = []

        for reflection in self.reflection_history:

            all_recommendations.extend(reflection.get("recommendations", []))

        

        # Simple frequency analysis (in production, use more sophisticated NLP)

        recommendation_keywords = {}

        for rec in all_recommendations:

            words = rec.lower().split()

            for word in words:

                if len(word) > 4:  # Only meaningful words

                    recommendation_keywords[word] = recommendation_keywords.get(word, 0) + 1

        

        # Return top keywords that appear in recommendations

        sorted_keywords = sorted(recommendation_keywords.items(), key=lambda x: x[1], reverse=True)

        return [f"Focus on {word}" for word, count in sorted_keywords[:5] if count > 1]

    

    def _identify_improvement_areas(self) -> List[str]:

        """Identify persistent improvement areas across sessions."""

        if len(self.reflection_history) < 3:

            return ["Need more sessions to identify patterns"]

        

        areas = []

        

        # Check for persistent quality issues

        recent_quality = [r["quality_assessment"].get("overall_quality_score", 5) 

                         for r in self.reflection_history[-5:]]

        if sum(recent_quality) / len(recent_quality) < 7:

            areas.append("Research quality consistency")

        

        # Check for persistent efficiency issues

        recent_efficiency = [r["efficiency_analysis"].get("overall_efficiency", 0.5) 

                           for r in self.reflection_history[-5:]]

        if sum(recent_efficiency) / len(recent_efficiency) < 0.7:

            areas.append("Task execution efficiency")

        

        # Check for recurring tool failures

        tool_failures = {}

        for reflection in self.reflection_history[-5:]:

            task_analysis = reflection.get("task_analysis", {})

            tool_success_rates = task_analysis.get("tool_success_rates", {})

            for tool, data in tool_success_rates.items():

                if data["success_rate"] < 0.8:

                    tool_failures[tool] = tool_failures.get(tool, 0) + 1

        

        for tool, failure_count in tool_failures.items():

            if failure_count >= 3:

                areas.append(f"Tool reliability: {tool}")

        

        return areas if areas else ["Overall performance is good"]
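
Usage note: ReflectionSystem is intended to be invoked once per session, after the final report exists. The minimal wiring sketch below assumes the reflection system is created in ResearchAgentApp.__init__ (e.g. self.reflection = ReflectionSystem(self.llm, self.memory)) and called near the end of research(), where agent, goal, result and session_duration are already in scope; the attribute name self.reflection is an assumption, not part of the earlier code.

# Inside ResearchAgentApp.research(), just before returning the result:
reflection = await self.reflection.reflect_on_performance(
    completed_tasks=agent.state.completed_tasks,
    original_goal=goal,
    final_result=result,
    session_duration=session_duration
)
self.logger.info(f"Session reflection score: {reflection['overall_score']}/10")

# Periodically inspect longer-term learning trends:
summary = self.reflection.get_improvement_summary()
self.logger.info(f"Quality trend: {summary.get('quality_trend')}")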


PERFORMANCE MONITORING SYSTEM


# monitoring.py

import time

import psutil

import threading

from dataclasses import dataclass, field

from typing import Dict, List, Any, Optional

from datetime import datetime, timedelta

import json


@dataclass

class PerformanceMetrics:

    """

    Data class for storing comprehensive performance metrics.

    Tracks both system resources and agent-specific performance indicators.

    """

    session_id: str

    start_time: float

    end_time: Optional[float] = None

    tasks_completed: int = 0

    tasks_failed: int = 0

    tools_used: List[str] = field(default_factory=list)

    memory_usage_mb: float = 0.0

    cpu_usage_percent: float = 0.0

    llm_generation_time: float = 0.0

    tool_execution_time: float = 0.0

    

    @property

    def duration(self) -> float:

        """Calculate session duration in seconds."""

        if self.end_time is None:

            return time.time() - self.start_time

        return self.end_time - self.start_time

    

    @property

    def success_rate(self) -> float:

        """Calculate task success rate."""

        total_tasks = self.tasks_completed + self.tasks_failed

        return self.tasks_completed / total_tasks if total_tasks > 0 else 0.0

    

    def to_dict(self) -> Dict[str, Any]:

        """Convert metrics to dictionary for storage/analysis."""

        return {

            "session_id": self.session_id,

            "start_time": self.start_time,

            "end_time": self.end_time,

            "duration": self.duration,

            "tasks_completed": self.tasks_completed,

            "tasks_failed": self.tasks_failed,

            "success_rate": self.success_rate,

            "tools_used": self.tools_used,

            "memory_usage_mb": self.memory_usage_mb,

            "cpu_usage_percent": self.cpu_usage_percent,

            "llm_generation_time": self.llm_generation_time,

            "tool_execution_time": self.tool_execution_time

        }


class PerformanceMonitor:

    """

    Comprehensive performance monitoring system for the research agent.

    Tracks system resources, execution times, and agent-specific metrics

    to enable optimization and debugging.

    """

    

    def __init__(self, monitoring_interval: float = 5.0):

        """

        Initialize the performance monitor.

        

        Args:

            monitoring_interval: Seconds between system resource measurements

        """

        self.monitoring_interval = monitoring_interval

        self.active_sessions = {}

        self.completed_sessions = []

        self.system_metrics_history = []

        self.monitoring_thread = None

        self.monitoring_active = False

        

        # Performance thresholds for alerts

        self.thresholds = {

            "max_memory_mb": 8000,  # 8GB memory threshold

            "max_cpu_percent": 80,   # 80% CPU threshold

            "max_session_duration": 1800,  # 30 minutes max session

            "min_success_rate": 0.7  # 70% minimum success rate

        }

    

    def start_session(self, session_id: str) -> PerformanceMetrics:

        """

        Start monitoring a new research session.

        

        Args:

            session_id: Unique identifier for the session

            

        Returns:

            PerformanceMetrics object for tracking this session

        """

        print(f"Starting performance monitoring for session: {session_id}")

        

        metrics = PerformanceMetrics(

            session_id=session_id,

            start_time=time.time()

        )

        

        self.active_sessions[session_id] = metrics

        

        # Start system monitoring if not already running

        if not self.monitoring_active:

            self._start_system_monitoring()

        

        return metrics

    

    def end_session(self, session_id: str) -> Optional[PerformanceMetrics]:

        """

        End monitoring for a research session and finalize metrics.

        

        Args:

            session_id: Session identifier to end

            

        Returns:

            Final performance metrics for the session

        """

        if session_id not in self.active_sessions:

            print(f"Warning: Session {session_id} not found in active sessions")

            return None

        

        metrics = self.active_sessions[session_id]

        metrics.end_time = time.time()

        

        # Capture final system state

        metrics.memory_usage_mb = psutil.virtual_memory().used / 1024 / 1024

        metrics.cpu_usage_percent = psutil.cpu_percent()

        

        # Move to completed sessions

        self.completed_sessions.append(metrics)

        del self.active_sessions[session_id]

        

        # Stop system monitoring if no active sessions

        if not self.active_sessions and self.monitoring_active:

            self._stop_system_monitoring()

        

        # Check for performance alerts

        self._check_performance_alerts(metrics)

        

        print(f"Session {session_id} completed in {metrics.duration:.2f}s")

        print(f"Success rate: {metrics.success_rate:.1%}")

        

        return metrics

    

    def record_task_completion(self, session_id: str, success: bool, tool_name: str):

        """

        Record the completion of a task within a session.

        

        Args:

            session_id: Session identifier

            success: Whether the task completed successfully

            tool_name: Name of the tool that was used

        """

        if session_id not in self.active_sessions:

            return

        

        metrics = self.active_sessions[session_id]

        

        if success:

            metrics.tasks_completed += 1

        else:

            metrics.tasks_failed += 1

        

        if tool_name not in metrics.tools_used:

            metrics.tools_used.append(tool_name)

    

    def record_llm_generation(self, session_id: str, generation_time: float):

        """

        Record LLM generation time for performance analysis.

        

        Args:

            session_id: Session identifier

            generation_time: Time taken for LLM generation in seconds

        """

        if session_id in self.active_sessions:

            self.active_sessions[session_id].llm_generation_time += generation_time

    

    def record_tool_execution(self, session_id: str, execution_time: float):

        """

        Record tool execution time for performance analysis.

        

        Args:

            session_id: Session identifier

            execution_time: Time taken for tool execution in seconds

        """

        if session_id in self.active_sessions:

            self.active_sessions[session_id].tool_execution_time += execution_time

    

    def _start_system_monitoring(self):

        """Start background thread for system resource monitoring."""

        self.monitoring_active = True

        self.monitoring_thread = threading.Thread(target=self._monitor_system_resources)

        self.monitoring_thread.daemon = True

        self.monitoring_thread.start()

        print("System resource monitoring started")

    

    def _stop_system_monitoring(self):

        """Stop background system resource monitoring."""

        self.monitoring_active = False

        if self.monitoring_thread:

            self.monitoring_thread.join(timeout=1)

        print("System resource monitoring stopped")

    

    def _monitor_system_resources(self):

        """Background monitoring of system resources."""

        while self.monitoring_active:

            try:

                # Collect system metrics

                memory_info = psutil.virtual_memory()

                cpu_percent = psutil.cpu_percent(interval=1)

                

                # Get GPU memory if available

                gpu_memory_mb = 0

                try:

                    import torch

                    if torch.cuda.is_available():

                        gpu_memory_mb = torch.cuda.memory_allocated() / 1024 / 1024

                except Exception:  # torch not installed or no CUDA device available

                    pass

                

                system_metrics = {

                    "timestamp": time.time(),

                    "memory_used_mb": memory_info.used / 1024 / 1024,

                    "memory_percent": memory_info.percent,

                    "cpu_percent": cpu_percent,

                    "gpu_memory_mb": gpu_memory_mb,

                    "active_sessions": len(self.active_sessions)

                }

                

                self.system_metrics_history.append(system_metrics)

                

                # Keep only last hour of metrics

                cutoff_time = time.time() - 3600

                self.system_metrics_history = [

                    m for m in self.system_metrics_history 

                    if m["timestamp"] > cutoff_time

                ]

                

                # Check for system resource alerts

                self._check_system_alerts(system_metrics)

                

            except Exception as e:

                print(f"Error in system monitoring: {e}")

            

            time.sleep(self.monitoring_interval)

    

    def _check_performance_alerts(self, metrics: PerformanceMetrics):

        """Check if session metrics exceed performance thresholds."""

        alerts = []

        

        if metrics.duration > self.thresholds["max_session_duration"]:

            alerts.append(f"Session duration exceeded threshold: {metrics.duration:.0f}s")

        

        if metrics.success_rate < self.thresholds["min_success_rate"]:

            alerts.append(f"Success rate below threshold: {metrics.success_rate:.1%}")

        

        if metrics.memory_usage_mb > self.thresholds["max_memory_mb"]:

            alerts.append(f"Memory usage high: {metrics.memory_usage_mb:.0f}MB")

        

        if alerts:

            print("PERFORMANCE ALERTS:")

            for alert in alerts:

                print(f"  • {alert}")

    

    def _check_system_alerts(self, system_metrics: Dict[str, Any]):

        """Check if system metrics exceed thresholds."""

        if system_metrics["memory_percent"] > 90:

            print(f"WARNING: System memory usage at {system_metrics['memory_percent']:.1f}%")

        

        if system_metrics["cpu_percent"] > self.thresholds["max_cpu_percent"]:

            print(f"WARNING: CPU usage at {system_metrics['cpu_percent']:.1f}%")

    

    def get_performance_summary(self, hours: int = 24) -> Dict[str, Any]:

        """

        Get a comprehensive performance summary for the specified time period.

        

        Args:

            hours: Number of hours to include in the summary

            

        Returns:

            Dictionary containing performance summary

        """

        cutoff_time = time.time() - (hours * 3600)

        

        # Filter recent sessions

        recent_sessions = [

            s for s in self.completed_sessions 

            if s.start_time > cutoff_time

        ]

        

        if not recent_sessions:

            return {"message": f"No sessions in the last {hours} hours"}

        

        # Calculate aggregate metrics

        total_sessions = len(recent_sessions)

        total_duration = sum(s.duration for s in recent_sessions)

        total_tasks = sum(s.tasks_completed + s.tasks_failed for s in recent_sessions)

        successful_tasks = sum(s.tasks_completed for s in recent_sessions)

        

        # Calculate averages

        avg_duration = total_duration / total_sessions

        avg_success_rate = successful_tasks / total_tasks if total_tasks > 0 else 0

        avg_memory_usage = sum(s.memory_usage_mb for s in recent_sessions) / total_sessions

        

        # Tool usage analysis

        all_tools = []

        for session in recent_sessions:

            all_tools.extend(session.tools_used)

        

        tool_usage = {}

        for tool in all_tools:

            tool_usage[tool] = tool_usage.get(tool, 0) + 1

        

        # System resource trends

        recent_system_metrics = [

            m for m in self.system_metrics_history 

            if m["timestamp"] > cutoff_time

        ]

        

        if recent_system_metrics:

            avg_cpu = sum(m["cpu_percent"] for m in recent_system_metrics) / len(recent_system_metrics)

            peak_memory = max(m["memory_used_mb"] for m in recent_system_metrics)

        else:

            avg_cpu = 0

            peak_memory = 0

        

        return {

            "time_period_hours": hours,

            "total_sessions": total_sessions,

            "total_duration_minutes": total_duration / 60,

            "average_session_duration": avg_duration,

            "total_tasks_executed": total_tasks,

            "overall_success_rate": avg_success_rate,

            "average_memory_usage_mb": avg_memory_usage,

            "tool_usage_frequency": tool_usage,

            "system_performance": {

                "average_cpu_percent": avg_cpu,

                "peak_memory_usage_mb": peak_memory,

                "monitoring_data_points": len(recent_system_metrics)

            },

            "performance_trends": self._calculate_performance_trends(recent_sessions)

        }

    

    def _calculate_performance_trends(self, sessions: List[PerformanceMetrics]) -> Dict[str, str]:

        """Calculate performance trends over recent sessions."""

        if len(sessions) < 4:

            return {"trend_analysis": "Insufficient data for trend analysis"}

        

        # Split sessions into two halves for comparison

        mid_point = len(sessions) // 2

        early_sessions = sessions[:mid_point]

        recent_sessions = sessions[mid_point:]

        

        # Calculate averages for each half

        early_success_rate = sum(s.success_rate for s in early_sessions) / len(early_sessions)

        recent_success_rate = sum(s.success_rate for s in recent_sessions) / len(recent_sessions)

        

        early_duration = sum(s.duration for s in early_sessions) / len(early_sessions)

        recent_duration = sum(s.duration for s in recent_sessions) / len(recent_sessions)

        

        # Determine trends

        success_trend = "improving" if recent_success_rate > early_success_rate * 1.05 else \

                       "declining" if recent_success_rate < early_success_rate * 0.95 else "stable"

        

        duration_trend = "improving" if recent_duration < early_duration * 0.95 else \

                        "declining" if recent_duration > early_duration * 1.05 else "stable"

        

        return {

            "success_rate_trend": success_trend,

            "duration_trend": duration_trend,

            "early_success_rate": f"{early_success_rate:.1%}",

            "recent_success_rate": f"{recent_success_rate:.1%}",

            "early_avg_duration": f"{early_duration:.1f}s",

            "recent_avg_duration": f"{recent_duration:.1f}s"

        }

    

    def export_metrics(self, filename: str = None) -> str:

        """

        Export performance metrics to JSON file.

        

        Args:

            filename: Optional custom filename

            

        Returns:

            Path to the exported file

        """

        if filename is None:

            filename = f"performance_metrics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

        

        export_data = {

            "export_timestamp": datetime.now().isoformat(),

            "completed_sessions": [s.to_dict() for s in self.completed_sessions],

            "system_metrics_history": self.system_metrics_history,

            "performance_summary": self.get_performance_summary(24),

            "thresholds": self.thresholds

        }

        

        try:

            with open(filename, 'w') as f:

                json.dump(export_data, f, indent=2)

            

            print(f"Performance metrics exported to: {filename}")

            return filename

            

        except Exception as e:

            print(f"Error exporting metrics: {e}")

            return ""


Code Explanation for Advanced Features:

  • The ReflectionSystem enables the agent to analyze its own performance and continuously improve
  • Quality assessment uses the LLM to evaluate research completeness, accuracy, and actionability
  • Efficiency analysis tracks time usage, success rates, and resource optimization
  • Improvement recommendations are generated both programmatically and using LLM analysis
  • Performance trends are tracked over time to identify long-term improvement patterns
  • The PerformanceMonitor provides comprehensive system and agent-specific monitoring
  • Resource usage tracking helps optimize hardware requirements and identify bottlenecks
  • Alert systems warn about performance issues before they become critical
  • Metrics export enables detailed analysis and reporting for system optimization (a brief inspection sketch follows this list)
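
The exported metrics file can also be inspected offline, without the agent running. The following is a minimal sketch rather than part of the system above; it only assumes a performance_metrics_*.json file produced by export_metrics() in the current directory, and the inspect_metrics.py filename is illustrative:

# inspect_metrics.py
import glob
import json

# Find the newest export (filenames embed a sortable timestamp)
files = sorted(glob.glob("performance_metrics_*.json"))
if not files:
    raise SystemExit("No exported metrics files found")

with open(files[-1]) as f:
    data = json.load(f)

summary = data.get("performance_summary", {})
print(f"Exported at:     {data.get('export_timestamp')}")
print(f"Sessions (24h):  {summary.get('total_sessions', 0)}")
print(f"Success rate:    {summary.get('overall_success_rate', 0):.1%}")

# Tool usage frequency, most-used first
for tool, count in sorted(summary.get("tool_usage_frequency", {}).items(),
                          key=lambda kv: kv[1], reverse=True):
    print(f"  {tool}: {count} uses")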


STEP 9: DEPLOYMENT OPTIONS


API SERVER DEPLOYMENT


# api_server.py

from fastapi import FastAPI, BackgroundTasks, HTTPException, Depends

from fastapi.middleware.cors import CORSMiddleware

from fastapi.responses import JSONResponse

from pydantic import BaseModel, Field

from typing import Dict, List, Any, Optional

import asyncio

import os

import uuid

from datetime import datetime

import logging


# Research agent application (module name matches the rest of the guide)

from main_agent import ResearchAgentApp


# Pydantic models for API requests and responses

class ResearchRequest(BaseModel):

    """Model for research request data."""

    goal: str = Field(..., description="Research objective or question", min_length=10)

    context: str = Field("", description="Additional context for the research")

    priority: int = Field(1, description="Priority level (1-5)", ge=1, le=5)

    max_tasks: int = Field(15, description="Maximum number of tasks to execute", ge=1, le=50)

    tools_to_use: List[str] = Field([], description="Specific tools to use (empty for auto-select)")


class ResearchResponse(BaseModel):

    """Model for research response data."""

    task_id: str = Field(..., description="Unique task identifier")

    status: str = Field(..., description="Current status of the research")

    goal: str = Field(..., description="Research goal")

    created_at: str = Field(..., description="Timestamp when research was started")

    result: Optional[str] = Field(None, description="Research results (when completed)")

    progress: Optional[Dict[str, Any]] = Field(None, description="Progress information")

    error: Optional[str] = Field(None, description="Error message if failed")


class HealthResponse(BaseModel):

    """Model for health check responses."""

    status: str

    agent_status: str

    uptime_seconds: float

    total_requests: int

    active_tasks: int

    system_info: Dict[str, Any]


# Global variables for the API server

app = FastAPI(

    title="Research Agent API",

    description="RESTful API for the Agentic AI Research Assistant",

    version="1.0.0",

    docs_url="/docs",

    redoc_url="/redoc"

)


# Add CORS middleware for web applications

app.add_middleware(

    CORSMiddleware,

    allow_origins=["*"],  # In production, specify exact origins

    allow_credentials=True,

    allow_methods=["*"],

    allow_headers=["*"],

)


# Global state management

research_agent = None

active_tasks = {}

completed_tasks = {}

server_start_time = datetime.now()

request_counter = 0


class APIServer:

    """

    FastAPI server wrapper for the Research Agent.

    Provides RESTful endpoints for research requests and monitoring.

    """

    

    def __init__(self, agent_config: Dict[str, Any] = None):

        """

        Initialize the API server with a research agent instance.

        

        Args:

            agent_config: Configuration for the research agent

        """

        global research_agent

        

        self.logger = logging.getLogger(__name__)

        

        try:

            # Initialize the research agent

            self.logger.info("Initializing Research Agent for API server...")

            research_agent = ResearchAgentApp(agent_config or {})

            self.logger.info("Research Agent initialized successfully")

            

        except Exception as e:

            self.logger.error(f"Failed to initialize Research Agent: {e}")

            raise RuntimeError(f"Could not start API server: {e}")


async def get_agent():

    """Dependency to get the research agent instance."""

    if research_agent is None:

        raise HTTPException(status_code=503, detail="Research Agent not initialized")

    return research_agent


@app.on_event("startup")

async def startup_event():

    """Initialize the research agent when the server starts."""

    global research_agent

    

    print("Starting Research Agent API Server...")

    

    try:

        # Build the agent configuration; the search API key is read from the
        # SEARCH_API_KEY environment variable used by the deployment configs below

        config = {

            "search_api_key": os.getenv("SEARCH_API_KEY", ""),

            "max_tasks_per_session": 20,

            "enable_caching": True

        }

        

        server = APIServer(config)

        print("✅ Research Agent API Server ready!")

        

    except Exception as e:

        print(f"❌ Failed to start server: {e}")

        raise


@app.get("/health", response_model=HealthResponse)

async def health_check():

    """

    Health check endpoint to monitor server status.

    Returns system information and agent status.

    """

    global request_counter, active_tasks, server_start_time

    

    uptime = (datetime.now() - server_start_time).total_seconds()

    

    # Get system information

    import psutil

    system_info = {

        "cpu_usage_percent": psutil.cpu_percent(),

        "memory_usage_percent": psutil.virtual_memory().percent,

        "disk_usage_percent": psutil.disk_usage('/').percent

    }

    

    # Check if GPU is available

    try:

        import torch

        if torch.cuda.is_available():

            system_info["gpu_available"] = True

            system_info["gpu_memory_gb"] = torch.cuda.memory_allocated() / 1e9

        else:

            system_info["gpu_available"] = False

    except Exception:

        system_info["gpu_available"] = False

    

    agent_status = "ready" if research_agent else "not_initialized"

    

    return HealthResponse(

        status="healthy",

        agent_status=agent_status,

        uptime_seconds=uptime,

        total_requests=request_counter,

        active_tasks=len(active_tasks),

        system_info=system_info

    )


@app.post("/research", response_model=ResearchResponse)

async def start_research(

    request: ResearchRequest, 

    background_tasks: BackgroundTasks,

    agent = Depends(get_agent)

):

    """

    Start a new research task.

    Research runs in the background and can be monitored via the status endpoint.

    """

    global request_counter, active_tasks

    

    task_id = str(uuid.uuid4())

    request_counter += 1

    

    # Store task information

    task_info = {

        "task_id": task_id,

        "goal": request.goal,

        "context": request.context,

        "priority": request.priority,

        "status": "started",

        "created_at": datetime.now().isoformat(),

        "progress": {"phase": "initialization", "tasks_completed": 0}

    }

    

    active_tasks[task_id] = task_info

    

    # Start research in background

    background_tasks.add_task(

        execute_research_task, 

        task_id, 

        request.goal, 

        request.context,

        request.max_tasks,

        request.tools_to_use

    )

    

    return ResearchResponse(

        task_id=task_id,

        status="started",

        goal=request.goal,

        created_at=task_info["created_at"],

        progress=task_info["progress"]

    )


@app.get("/research/{task_id}", response_model=ResearchResponse)

async def get_research_status(task_id: str):

    """

    Get the status of a research task.

    Returns current progress, results if completed, or error information.

    """

    # Check active tasks first

    if task_id in active_tasks:

        task_info = active_tasks[task_id]

        return ResearchResponse(

            task_id=task_id,

            status=task_info["status"],

            goal=task_info["goal"],

            created_at=task_info["created_at"],

            progress=task_info.get("progress")

        )

    

    # Check completed tasks

    elif task_id in completed_tasks:

        task_info = completed_tasks[task_id]

        return ResearchResponse(

            task_id=task_id,

            status=task_info["status"],

            goal=task_info["goal"],

            created_at=task_info["created_at"],

            result=task_info.get("result"),

            error=task_info.get("error")

        )

    

    else:

        raise HTTPException(status_code=404, detail="Task not found")


@app.get("/research", response_model=List[ResearchResponse])

async def list_research_tasks(limit: int = 20, status_filter: Optional[str] = None):

    """

    List recent research tasks with optional status filtering.

    

    Args:

        limit: Maximum number of tasks to return

        status_filter: Filter by status (started, running, completed, failed)

    """

    all_tasks = []

    

    # Add active tasks

    for task_info in active_tasks.values():

        if status_filter is None or task_info["status"] == status_filter:

            all_tasks.append(ResearchResponse(

                task_id=task_info["task_id"],

                status=task_info["status"],

                goal=task_info["goal"],

                created_at=task_info["created_at"],

                progress=task_info.get("progress")

            ))

    

    # Add completed tasks

    for task_info in completed_tasks.values():

        if status_filter is None or task_info["status"] == status_filter:

            all_tasks.append(ResearchResponse(

                task_id=task_info["task_id"],

                status=task_info["status"],

                goal=task_info["goal"],

                created_at=task_info["created_at"],

                result=task_info.get("result"),

                error=task_info.get("error")

            ))

    

    # Sort by creation time (newest first)

    all_tasks.sort(key=lambda x: x.created_at, reverse=True)

    

    return all_tasks[:limit]


@app.delete("/research/{task_id}")

async def cancel_research_task(task_id: str):

    """

    Cancel an active research task.

    Note: This is a simple implementation - in production you'd need

    proper task cancellation mechanisms.

    """

    if task_id not in active_tasks:

        raise HTTPException(status_code=404, detail="Task not found or already completed")

    

    task_info = active_tasks[task_id]

    task_info["status"] = "cancelled"

    task_info["error"] = "Task cancelled by user request"

    

    # Move to completed tasks

    completed_tasks[task_id] = task_info

    del active_tasks[task_id]

    

    return {"message": f"Task {task_id} cancelled successfully"}


@app.get("/stats")

async def get_server_stats():

    """Get comprehensive server statistics."""

    uptime = (datetime.now() - server_start_time).total_seconds()

    

    # Calculate status distribution

    status_counts = {}

    for task in active_tasks.values():

        status = task["status"]

        status_counts[status] = status_counts.get(status, 0) + 1

    

    for task in completed_tasks.values():

        status = task["status"]

        status_counts[status] = status_counts.get(status, 0) + 1

    

    # Calculate average completion time for completed tasks

    completed_task_list = list(completed_tasks.values())

    avg_completion_time = 0

    if completed_task_list:

        total_time = 0

        count = 0

        for task in completed_task_list:

            if "completion_time" in task:

                total_time += task["completion_time"]

                count += 1

        avg_completion_time = total_time / count if count > 0 else 0

    

    return {

        "server_uptime_seconds": uptime,

        "total_requests": request_counter,

        "active_tasks": len(active_tasks),

        "completed_tasks": len(completed_tasks),

        "status_distribution": status_counts,

        "average_completion_time_seconds": avg_completion_time,

        "agent_status": "ready" if research_agent else "not_initialized"

    }


async def execute_research_task(task_id: str, goal: str, context: str, 

                               max_tasks: int, tools_to_use: List[str]):

    """

    Background task that executes the actual research.

    Updates task status and stores results.

    """

    global active_tasks, completed_tasks

    

    start_time = datetime.now()

    

    try:

        # Update status to running

        if task_id in active_tasks:

            active_tasks[task_id]["status"] = "running"

            active_tasks[task_id]["progress"] = {

                "phase": "research_execution", 

                "tasks_completed": 0

            }

        

        # Execute the research using the global agent

        result = await research_agent.research(goal, context)

        

        # Calculate completion time

        completion_time = (datetime.now() - start_time).total_seconds()

        

        # Move to completed tasks

        completed_task_info = {

            "task_id": task_id,

            "goal": goal,

            "status": "completed",

            "created_at": active_tasks[task_id]["created_at"],

            "completed_at": datetime.now().isoformat(),

            "completion_time": completion_time,

            "result": result

        }

        

        completed_tasks[task_id] = completed_task_info

        

        if task_id in active_tasks:

            del active_tasks[task_id]

            

        print(f"Research task {task_id} completed successfully")

        

    except Exception as e:

        # Handle research failure

        error_message = str(e)

        completion_time = (datetime.now() - start_time).total_seconds()

        

        completed_task_info = {

            "task_id": task_id,

            "goal": goal,

            "status": "failed",

            "created_at": active_tasks[task_id]["created_at"] if task_id in active_tasks else start_time.isoformat(),

            "completed_at": datetime.now().isoformat(),

            "completion_time": completion_time,

            "error": error_message

        }

        

        completed_tasks[task_id] = completed_task_info

        

        if task_id in active_tasks:

            del active_tasks[task_id]

            

        print(f"Research task {task_id} failed: {error_message}")


# Server startup script

def run_server(host: str = "0.0.0.0", port: int = 8000, reload: bool = False):

    """

    Run the API server.

    

    Args:

        host: Host address to bind to

        port: Port number to use

        reload: Enable auto-reload for development

    """

    import uvicorn

    

    print(f"Starting Research Agent API Server on {host}:{port}")

    print(f"API Documentation: http://{host}:{port}/docs")

    print(f"Alternative Docs: http://{host}:{port}/redoc")

    

    uvicorn.run(

        "api_server:app",

        host=host,

        port=port,

        reload=reload,

        log_level="info"

    )


if __name__ == "__main__":

    run_server(reload=True)
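
Once the server is running, research requests can be submitted and monitored from any HTTP client. Below is a minimal client sketch against the endpoints defined above; the localhost URL, the ten-second polling interval, and the client_example.py filename are illustrative assumptions:

# client_example.py
import time
import requests

BASE_URL = "http://localhost:8000"  # assumes a local server on the default port

# Submit a research request (fields match the ResearchRequest model)
response = requests.post(f"{BASE_URL}/research", json={
    "goal": "Analyze renewable energy adoption trends in Europe",
    "context": "Focus on the last five years",
    "priority": 2,
    "max_tasks": 10
})
response.raise_for_status()
task_id = response.json()["task_id"]
print(f"Started research task {task_id}")

# Poll the status endpoint until the task reaches a terminal state
while True:
    status = requests.get(f"{BASE_URL}/research/{task_id}").json()
    if status["status"] in ("completed", "failed", "cancelled"):
        break
    print(f"  current status: {status['status']}")
    time.sleep(10)

if status["status"] == "completed":
    print(status["result"])
else:
    print(f"Research ended with status '{status['status']}': {status.get('error')}")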



DOCKER DEPLOYMENT 


# Dockerfile

FROM python:3.9-slim


# Set working directory

WORKDIR /app


# Install system dependencies

RUN apt-get update && apt-get install -y \

    build-essential \

    curl \

    software-properties-common \

    git \

    && rm -rf /var/lib/apt/lists/*


# Copy requirements file

COPY requirements.txt .


# Install Python dependencies

RUN pip install --no-cache-dir -r requirements.txt


# Copy application code

COPY . .


# Create directories for outputs

RUN mkdir -p /app/charts /app/research_outputs


# Expose port

EXPOSE 8000


# Health check

HEALTHCHECK --interval=30s --timeout=30s --start-period=60s --retries=3 \

    CMD curl -f http://localhost:8000/health || exit 1


# Run the application with uvicorn (production mode, no auto-reload)

CMD ["uvicorn", "api_server:app", "--host", "0.0.0.0", "--port", "8000"]


# docker-compose.yml

version: '3.8'


services:

  research-agent:

    build: .

    ports:

      - "8000:8000"

    environment:

      - SEARCH_API_KEY=${SEARCH_API_KEY}

      - MODEL_NAME=mistralai/Mistral-7B-Instruct-v0.2

      - MAX_TASKS_PER_SESSION=20

    volumes:

      - ./research_outputs:/app/research_outputs

      - ./charts:/app/charts

      - ./logs:/app/logs

    restart: unless-stopped

    healthcheck:

      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]

      interval: 30s

      timeout: 10s

      retries: 3

      start_period: 60s


  # Optional: Add Redis for caching (production)

  redis:

    image: redis:alpine

    ports:

      - "6379:6379"

    restart: unless-stopped


  # Optional: Add monitoring with Prometheus

  prometheus:

    image: prom/prometheus

    ports:

      - "9090:9090"

    volumes:

      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml

    restart: unless-stopped


# requirements.txt for deployment

transformers==4.35.0

torch==2.1.0

accelerate==0.24.0

langchain==0.1.0

langchain-community==0.0.10

requests==2.31.0

beautifulsoup4==4.12.2

arxiv==1.4.8

matplotlib==3.8.0

plotly==5.17.0

pandas==2.1.0

sentence-transformers==2.2.2

chromadb==0.4.15

pydantic==2.5.0

fastapi==0.104.0

uvicorn==0.24.0

psutil==5.9.6


KUBERNETES DEPLOYMENT


# k8s-deployment.yaml

apiVersion: apps/v1

kind: Deployment

metadata:

  name: research-agent

  labels:

    app: research-agent

spec:

  replicas: 2

  selector:

    matchLabels:

      app: research-agent

  template:

    metadata:

      labels:

        app: research-agent

    spec:

      containers:

      - name: research-agent

        image: research-agent:latest

        ports:

        - containerPort: 8000

        env:

        - name: SEARCH_API_KEY

          valueFrom:

            secretKeyRef:

              name: research-agent-secrets

              key: search-api-key

        - name: MODEL_NAME

          value: "mistralai/Mistral-7B-Instruct-v0.2"

        - name: MAX_TASKS_PER_SESSION

          value: "20"

        resources:

          requests:

            memory: "4Gi"

            cpu: "1000m"

          limits:

            memory: "8Gi"

            cpu: "2000m"

        livenessProbe:

          httpGet:

            path: /health

            port: 8000

          initialDelaySeconds: 60

          periodSeconds: 30

        readinessProbe:

          httpGet:

            path: /health

            port: 8000

          initialDelaySeconds: 30

          periodSeconds: 10

        volumeMounts:

        - name: research-outputs

          mountPath: /app/research_outputs

        - name: charts

          mountPath: /app/charts

      volumes:

      - name: research-outputs

        persistentVolumeClaim:

          claimName: research-outputs-pvc

      - name: charts

        persistentVolumeClaim:

          claimName: charts-pvc


---

apiVersion: v1

kind: Service

metadata:

  name: research-agent-service

spec:

  selector:

    app: research-agent

  ports:

  - protocol: TCP

    port: 80

    targetPort: 8000

  type: LoadBalancer


---

apiVersion: v1

kind: Secret

metadata:

  name: research-agent-secrets

type: Opaque

data:

  search-api-key: <base64-encoded-api-key>


---

apiVersion: v1

kind: PersistentVolumeClaim

metadata:

  name: research-outputs-pvc

spec:

  accessModes:

    - ReadWriteOnce

  resources:

    requests:

      storage: 10Gi


---

apiVersion: v1

kind: PersistentVolumeClaim

metadata:

  name: charts-pvc

spec:

  accessModes:

    - ReadWriteOnce

  resources:

    requests:

      storage: 5Gi


Code Explanation for Deployment:

  • FastAPI server provides RESTful endpoints for research requests and monitoring
  • Background task execution allows non-blocking research operations
  • Comprehensive health checks and status monitoring enable production deployment (a smoke-test sketch follows this list)
  • Docker containerization ensures consistent deployment across environments
  • Kubernetes deployment provides scalability, health checks, and persistent storage
  • Environment variable configuration allows easy customization per deployment
  • Resource limits prevent any single task from consuming excessive system resources
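
After a container or pod comes up, a quick smoke test against the /health and /stats endpoints confirms that the agent initialized correctly. The sketch below is illustrative; the smoke_test.py filename and the BASE_URL are assumptions (point it at the container or LoadBalancer address in your environment):

# smoke_test.py
import sys
import requests

BASE_URL = "http://localhost:8000"  # replace with the deployed service address

try:
    health = requests.get(f"{BASE_URL}/health", timeout=10).json()
    stats = requests.get(f"{BASE_URL}/stats", timeout=10).json()
except requests.RequestException as exc:
    sys.exit(f"Deployment check failed: {exc}")

print(f"Server status : {health['status']}")
print(f"Agent status  : {health['agent_status']}")
print(f"Uptime        : {health['uptime_seconds']:.0f}s")
print(f"Active tasks  : {stats['active_tasks']}")

# Exit non-zero if the agent never initialized, so CI/CD pipelines can fail fast
if health["agent_status"] != "ready":
    sys.exit("Agent is not ready - check container logs")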


STEP 10: TESTING AND VALIDATION



UNIT TESTS


# test_agent.py

import pytest

import asyncio

import tempfile

import json

from datetime import datetime

from unittest.mock import Mock, patch, AsyncMock


# Import the classes we want to test

from agent_core import Task, TaskStatus, AgentState, ResearchAgent

from planning import PlanningSystem

from tools import ToolRegistry, WebSearchTool, DataVisualizerTool

from memory import MemorySystem

from llm_provider import MistralProvider

from main_agent import ResearchAgentApp


class TestTask:

    """Test the Task data class functionality."""

    

    def test_task_creation(self):

        """Test basic task creation and initialization."""

        task = Task(

            id="test_001",

            description="Test task description",

            tool_name="web_search",

            parameters={"query": "test query", "num_results": 5}

        )

        

        assert task.id == "test_001"

        assert task.description == "Test task description"

        assert task.tool_name == "web_search"

        assert task.parameters["query"] == "test query"

        assert task.status == TaskStatus.PENDING

        assert task.result is None

        assert task.created_at is not None

    

    def test_task_status_updates(self):

        """Test task status transitions."""

        task = Task("test_001", "Test task", "web_search", {})

        

        # Test status progression

        assert task.status == TaskStatus.PENDING

        

        task.status = TaskStatus.IN_PROGRESS

        assert task.status == TaskStatus.IN_PROGRESS

        

        task.status = TaskStatus.COMPLETED

        task.result = {"success": True, "data": "test result"}

        assert task.status == TaskStatus.COMPLETED

        assert task.result["success"] is True


class TestAgentState:

    """Test the AgentState data class."""

    

    def test_agent_state_initialization(self):

        """Test agent state initialization."""

        state = AgentState(

            current_goal="Test research goal",

            task_queue=[],

            completed_tasks=[],

            memory={},

            context=""

        )

        

        assert state.current_goal == "Test research goal"

        assert len(state.task_queue) == 0

        assert len(state.completed_tasks) == 0

        assert state.memory == {}

        assert state.context == ""


class TestToolRegistry:

    """Test the tool registry functionality."""

    

    def test_tool_registration(self):

        """Test registering tools in the registry."""

        registry = ToolRegistry()

        

        # Create a mock tool

        mock_tool = Mock()

        mock_tool.execute = AsyncMock(return_value={"status": "success"})

        

        registry.register("test_tool", mock_tool)

        

        assert "test_tool" in registry.tools

        assert registry.tools["test_tool"] == mock_tool

    

    def test_tool_not_found_error(self):

        """Test error handling for non-existent tools."""

        registry = ToolRegistry()

        

        with pytest.raises(ValueError) as exc_info:

            asyncio.run(registry.execute_tool("nonexistent_tool", param="value"))

        

        assert "Tool 'nonexistent_tool' not found" in str(exc_info.value)

    

    @pytest.mark.asyncio

    async def test_tool_execution(self):

        """Test successful tool execution."""

        registry = ToolRegistry()

        

        # Create and register a mock tool

        mock_tool = Mock()

        mock_tool.execute = AsyncMock(return_value={"result": "test data", "status": "success"})

        

        registry.register("test_tool", mock_tool)

        

        # Execute the tool

        result = await registry.execute_tool("test_tool", query="test")

        

        assert result["result"] == "test data"

        assert result["status"] == "success"

        mock_tool.execute.assert_called_once_with(query="test")

    

    @pytest.mark.asyncio

    async def test_tool_execution_error_handling(self):

        """Test tool execution error handling."""

        registry = ToolRegistry()

        

        # Create a tool that raises an exception

        mock_tool = Mock()

        mock_tool.execute = AsyncMock(side_effect=Exception("Tool execution failed"))

        

        registry.register("failing_tool", mock_tool)

        

        # Test that the exception is propagated

        with pytest.raises(Exception) as exc_info:

            await registry.execute_tool("failing_tool", param="value")

        

        assert "Tool execution failed" in str(exc_info.value)


class TestMemorySystem:

    """Test the memory system functionality."""

    

    def setup_method(self, method):

        """Set up test environment with a temporary memory collection (pytest hook)."""

        # Use in-memory ChromaDB for testing

        self.memory = MemorySystem("test_collection")

    

    def test_memory_storage(self):

        """Test storing and retrieving memories."""

        memory = MemorySystem("test_memory")

        

        # Store a test result

        task_id = "test_task_001"

        content = "This is test research content about renewable energy"

        metadata = {

            "type": "research_result",

            "goal": "renewable energy research"

        }

        

        memory.store_result(task_id, content, metadata)

        

        # Retrieve relevant memories

        relevant = memory.retrieve_relevant("renewable energy", k=1)

        

        assert len(relevant) > 0

        assert "renewable energy" in relevant[0]["content"]

    

    def test_memory_relevance_filtering(self):

        """Test memory relevance scoring and filtering."""

        memory = MemorySystem("test_relevance")

        

        # Store multiple memories with different relevance

        memory.store_result("task1", "Solar energy research findings", {"type": "research"})

        memory.store_result("task2", "Wind energy statistics and data", {"type": "research"})

        memory.store_result("task3", "Cooking recipes for dinner", {"type": "personal"})

        

        # Query for energy-related content

        relevant = memory.retrieve_relevant("solar power energy", k=5)

        

        # Should return energy-related content with higher relevance

        assert len(relevant) >= 2

        # Solar energy should be most relevant

        assert "solar" in relevant[0]["content"].lower()


class TestPlanningSystem:

    """Test the planning system functionality."""

    

    def setup_method(self, method):

        """Set up a mock LLM for testing (pytest hook)."""

        self.mock_llm = Mock()

    

    @pytest.mark.asyncio

    async def test_plan_creation(self):

        """Test creation of research plans."""

        mock_llm = Mock()

        

        # Mock LLM response with valid JSON

        mock_plan_response = '''

        {

          "strategy": "Comprehensive renewable energy analysis",

          "tasks": [

            {

              "id": "task_001",

              "description": "Search for renewable energy statistics",

              "tool_name": "web_search",

              "parameters": {"query": "renewable energy statistics 2024", "num_results": 5},

              "priority": 1,

              "dependencies": []

            },

            {

              "id": "task_002", 

              "description": "Analyze renewable energy trends",

              "tool_name": "data_visualizer",

              "parameters": {"data": [], "chart_type": "line", "title": "Renewable Energy Trends"},

              "priority": 2,

              "dependencies": ["task_001"]

            }

          ],

          "success_criteria": [

            "Gather current renewable energy adoption statistics",

            "Identify key trends and patterns"

          ]

        }

        '''

        

        mock_llm.generate = AsyncMock(return_value=mock_plan_response)

        

        planner = PlanningSystem(mock_llm)

        

        # Test plan creation

        tasks = await planner.create_plan("Analyze renewable energy adoption trends")

        

        assert len(tasks) == 2

        assert tasks[0].id == "task_001"

        assert tasks[0].tool_name == "web_search"

        assert tasks[1].id == "task_002"

        assert tasks[1].tool_name == "data_visualizer"

    

    @pytest.mark.asyncio

    async def test_plan_creation_with_invalid_json(self):

        """Test plan creation fallback when LLM returns invalid JSON."""

        mock_llm = Mock()

        mock_llm.generate = AsyncMock(return_value="Invalid JSON response")

        

        planner = PlanningSystem(mock_llm)

        

        # Should fall back to default plan

        tasks = await planner.create_plan("Test research goal")

        

        assert len(tasks) >= 1

        assert tasks[0].tool_name in ["web_search", "synthesis"]


class TestDataVisualizerTool:

    """Test the data visualization tool."""

    

    @pytest.mark.asyncio

    async def test_bar_chart_creation(self):

        """Test creating a bar chart."""

        with tempfile.TemporaryDirectory() as temp_dir:

            # Create visualizer with temporary output directory

            visualizer = DataVisualizerTool()

            visualizer.output_dir = temp_dir

            

            # Test data

            data = [

                {"Country": "Germany", "Capacity": 150},

                {"Country": "France", "Capacity": 120},

                {"Country": "Spain", "Capacity": 100}

            ]

            

            result = await visualizer.execute(

                data=data,

                chart_type="bar", 

                title="Renewable Energy Capacity by Country"

            )

            

            assert result["status"] == "success"

            assert result["chart_type"] == "bar"

            assert result["data_points"] == 3

            assert "chart_path" in result

    

    @pytest.mark.asyncio

    async def test_empty_data_error_handling(self):

        """Test error handling for empty data."""

        visualizer = DataVisualizerTool()

        

        result = await visualizer.execute(

            data=[],

            chart_type="bar",

            title="Empty Data Test"

        )

        

        assert result["status"] == "failed"

        assert "error" in result


class TestResearchAgentApp:

    """Integration tests for the complete research agent application."""

    

    @pytest.fixture

    def mock_config(self):

        """Provide test configuration."""

        return {

            "search_api_key": "test_api_key",

            "max_tasks_per_session": 5,

            "enable_caching": False

        }

    

    @patch('main_agent.MistralProvider')

    @patch('main_agent.MemorySystem')

    @patch('main_agent.ToolRegistry')

    def test_agent_initialization(self, mock_tool_registry, mock_memory, mock_llm, mock_config):

        """Test agent initialization with mocked components."""

        

        # Configure mocks

        mock_llm_instance = Mock()

        mock_llm.return_value = mock_llm_instance

        

        mock_memory_instance = Mock()

        mock_memory.return_value = mock_memory_instance

        

        mock_registry_instance = Mock()

        mock_tool_registry.return_value = mock_registry_instance

        

        # Initialize agent

        agent = ResearchAgentApp(mock_config)

        

        assert agent.config["max_tasks_per_session"] == 5

        assert agent.config["enable_caching"] is False

        assert agent.llm == mock_llm_instance

        assert agent.memory == mock_memory_instance

    

    @pytest.mark.asyncio

    @patch('main_agent.MistralProvider')

    @patch('main_agent.MemorySystem') 

    @patch('main_agent.ToolRegistry')

    async def test_research_execution_flow(self, mock_tool_registry, mock_memory, mock_llm, mock_config):

        """Test the complete research execution flow."""

        

        # Set up mocks

        mock_llm_instance = Mock()

        mock_llm_instance.generate = AsyncMock(return_value="Mock research result")

        mock_llm.return_value = mock_llm_instance

        

        mock_memory_instance = Mock()

        mock_memory_instance.get_research_context = Mock(return_value="Mock context")

        mock_memory_instance.store_result = Mock()

        mock_memory.return_value = mock_memory_instance

        

        mock_registry_instance = Mock()

        mock_registry_instance.execute_tool = AsyncMock(return_value={"status": "success", "data": "mock data"})

        mock_tool_registry.return_value = mock_registry_instance

        

        # Mock planning system

        with patch('main_agent.PlanningSystem') as mock_planning:

            mock_planner = Mock()

            mock_task = Task("test_001", "Mock task", "web_search", {"query": "test"})

            mock_planner.create_plan = AsyncMock(return_value=[mock_task])

            mock_planner.should_replan = AsyncMock(return_value=False)

            mock_planning.return_value = mock_planner

            

            # Initialize and run research

            agent = ResearchAgentApp(mock_config)

            result = await agent.research("Test research goal")

            

            # Verify execution

            assert isinstance(result, str)

            assert len(result) > 0

            mock_planner.create_plan.assert_called()

            mock_registry_instance.execute_tool.assert_called()


INTEGRATION TESTS


# test_integration.py

import pytest

import asyncio

import tempfile

import os

from datetime import datetime

from unittest.mock import Mock, patch


# Components under test (module names match the unit tests above)

from main_agent import ResearchAgentApp

from tools import ToolRegistry, WebSearchTool, DataVisualizerTool


class TestEndToEndIntegration:

    """

    End-to-end integration tests that use real components

    but with test data and configurations.

    """

    

    @pytest.mark.asyncio

    async def test_complete_research_workflow(self):

        """

        Test a complete research workflow with minimal real components.

        This test uses actual classes but with mock external dependencies.

        """

        

        # Create temporary directories for test outputs

        with tempfile.TemporaryDirectory() as temp_dir:

            

            # Configure test agent with minimal real setup

            test_config = {

                "search_api_key": "test_key",  # Will use mock search

                "max_tasks_per_session": 3,

                "output_directory": temp_dir,

                "enable_caching": False

            }

            

            # Mock external dependencies

            with patch('tools.requests.get') as mock_requests, \

                 patch('llm_provider.AutoModelForCausalLM') as mock_model, \

                 patch('llm_provider.AutoTokenizer') as mock_tokenizer:

                

                # Configure HTTP mock

                mock_response = Mock()

                mock_response.json.return_value = {

                    "items": [

                        {

                            "title": "Renewable Energy Report 2024",

                            "snippet": "Latest trends in renewable energy adoption",

                            "link": "https://example.com/report",

                            "displayLink": "example.com"

                        }

                    ]

                }

                mock_response.raise_for_status.return_value = None

                mock_requests.return_value = mock_response

                

                # Configure LLM mocks

                mock_tokenizer_instance = Mock()

                mock_tokenizer_instance.encode.return_value = [1, 2, 3, 4, 5]

                mock_tokenizer_instance.decode.return_value = "Mock LLM response"

                mock_tokenizer_instance.eos_token_id = 2

                mock_tokenizer_instance.pad_token = None

                mock_tokenizer.from_pretrained.return_value = mock_tokenizer_instance

                

                mock_model_instance = Mock()

                mock_model_instance.generate.return_value = [[1, 2, 3, 4, 5, 6, 7]]

                mock_model_instance.eval.return_value = None

                mock_model.from_pretrained.return_value = mock_model_instance

                

                # Initialize and test the agent

                try:

                    agent = ResearchAgentApp(test_config)

                    

                    # Execute a research task

                    result = await agent.research("Test renewable energy trends")

                    

                    # Verify results

                    assert isinstance(result, str)

                    assert len(result) > 50  # Should be a substantial response

                    assert "renewable energy" in result.lower() or "research" in result.lower()

                    

                except Exception as e:

                    pytest.skip(f"Integration test skipped due to setup issues: {e}")

    

    @pytest.mark.asyncio

    async def test_tool_integration(self):

        """Test integration between different tools."""

        

        # Test tool registry with multiple tools

        registry = ToolRegistry()

        

        # Add real tools but with mocked external dependencies

        with patch('tools.requests.get') as mock_get:

            

            # Mock successful web search

            mock_response = Mock()

            mock_response.json.return_value = {"items": [{"title": "Test", "snippet": "Test result"}]}

            mock_response.raise_for_status.return_value = None

            mock_response.elapsed.total_seconds.return_value = 0.5

            mock_get.return_value = mock_response

            

            # Register tools

            web_search = WebSearchTool("test_key")

            registry.register("web_search", web_search)

            

            data_viz = DataVisualizerTool()

            registry.register("data_visualizer", data_viz)

            

            # Test tool execution

            search_result = await registry.execute_tool("web_search", query="test", num_results=1)

            assert search_result["status"] == "success"

            

            # Test visualization with sample data

            viz_data = [{"x": 1, "y": 10}, {"x": 2, "y": 20}]

            viz_result = await registry.execute_tool(

                "data_visualizer", 

                data=viz_data, 

                chart_type="line", 

                title="Test Chart"

            )

            assert viz_result["status"] == "success"


PERFORMANCE TESTS


# test_performance.py

import pytest

import asyncio

import time

import psutil

import threading

from concurrent.futures import ThreadPoolExecutor

from unittest.mock import Mock, patch, AsyncMock


# Components under test (module names match the unit tests above)

from agent_core import Task, TaskStatus

from main_agent import ResearchAgentApp


class TestPerformance:

    """Performance and load testing for the research agent."""

    

    @pytest.mark.asyncio

    async def test_memory_usage_stability(self):

        """Test that memory usage remains stable during operation."""

        

        # Monitor memory usage

        initial_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB

        

        # Simulate multiple research sessions

        with patch('main_agent.MistralProvider') as mock_llm, \

             patch('main_agent.MemorySystem') as mock_memory, \

             patch('main_agent.ToolRegistry') as mock_tools:

            

            # Configure mocks for minimal resource usage

            mock_llm_instance = Mock()

            mock_llm_instance.generate = AsyncMock(return_value="Mock result")

            mock_llm.return_value = mock_llm_instance

            

            mock_memory_instance = Mock()

            mock_memory_instance.get_research_context = Mock(return_value="")

            mock_memory_instance.store_result = Mock()

            mock_memory.return_value = mock_memory_instance

            

            mock_tools_instance = Mock()

            mock_tools_instance.execute_tool = AsyncMock(return_value={"status": "success"})

            mock_tools.return_value = mock_tools_instance

            

            # Run multiple sessions

            config = {"max_tasks_per_session": 2}

            agent = ResearchAgentApp(config)

            

            for i in range(5):

                await agent.research(f"Test research goal {i}")

                

                # Check memory usage

                current_memory = psutil.Process().memory_info().rss / 1024 / 1024

                memory_increase = current_memory - initial_memory

                

                # Memory increase should be reasonable (less than 100MB per session)

                assert memory_increase < 100 * (i + 1), f"Excessive memory usage: {memory_increase}MB"

    

    @pytest.mark.asyncio

    async def test_concurrent_requests(self):

        """Test handling of concurrent research requests."""

        

        async def mock_research_task(agent, goal):

            """Mock research task that takes some time."""

            await asyncio.sleep(0.1)  # Simulate work

            return f"Research result for: {goal}"

        

        # Mock the research method

        with patch('main_agent.ResearchAgentApp.research', side_effect=mock_research_task):

            agent = Mock()

            

            # Submit concurrent requests

            tasks = []

            for i in range(10):

                task = asyncio.create_task(mock_research_task(agent, f"Goal {i}"))

                tasks.append(task)

            

            start_time = time.time()

            results = await asyncio.gather(*tasks)

            end_time = time.time()

            

            # All tasks should complete

            assert len(results) == 10

            

            # Should complete in reasonable time (concurrent execution)

            assert end_time - start_time < 1.0  # Less than 1 second for 10 concurrent 0.1s tasks

    

    def test_task_execution_performance(self):

        """Test performance of individual task components."""

        

        # Test task creation performance

        start_time = time.time()

        tasks = []

        for i in range(1000):

            task = Task(f"task_{i}", f"Description {i}", "web_search", {"query": f"test {i}"})

            tasks.append(task)

        creation_time = time.time() - start_time

        

        assert creation_time < 1.0  # Should create 1000 tasks in under 1 second

        assert len(tasks) == 1000

        

        # Test task status updates

        start_time = time.time()

        for task in tasks[:100]:  # Test first 100

            task.status = TaskStatus.COMPLETED

            task.result = {"data": "test result"}

        update_time = time.time() - start_time

        

        assert update_time < 0.1  # Should update 100 tasks in under 0.1 seconds



VALIDATION TESTS


# test_validation.py

import pytest

import json

from datetime import datetime

from unittest.mock import Mock, AsyncMock


# Components under test (module names match the unit tests above)

from agent_core import Task

from tools import ToolRegistry

from planning import PlanningSystem

from memory import MemorySystem


class TestDataValidation:

    """Test data validation and error handling throughout the system."""

    

    def test_task_parameter_validation(self):

        """Test validation of task parameters."""

        

        # Valid task creation

        valid_task = Task(

            id="valid_001",

            description="Valid task description",

            tool_name="web_search",

            parameters={"query": "test", "num_results": 5}

        )

        assert valid_task.id == "valid_001"

        

        # Test with invalid parameters

        with pytest.raises(TypeError):

            Task(

                id=None,  # Should require string ID

                description="Test",

                tool_name="web_search",

                parameters={}

            )

    

    def test_research_goal_validation(self):

        """Test validation of research goals."""

        

        # Valid goals

        valid_goals = [

            "Analyze renewable energy trends in Europe",

            "Research artificial intelligence applications in healthcare",

            "Study consumer behavior in e-commerce"

        ]

        

        for goal in valid_goals:

            assert len(goal) >= 10  # Reasonable minimum length

            assert any(keyword in goal.lower() for keyword in ['analyze', 'research', 'study', 'investigate'])

        

        # Invalid goals

        invalid_goals = [

            "",  # Empty

            "Hi",  # Too short

            "   ",  # Whitespace only

        ]

        

        for goal in invalid_goals:

            assert len(goal.strip()) < 10  # Should be rejected

    

    def test_json_response_validation(self):

        """Test validation of JSON responses from LLM."""

        

        # Valid JSON response

        valid_json = '''

        {

          "strategy": "Test strategy",

          "tasks": [

            {

              "id": "task_001",

              "description": "Test task",

              "tool_name": "web_search",

              "parameters": {"query": "test"},

              "priority": 1,

              "dependencies": []

            }

          ],

          "success_criteria": ["Test criterion"]

        }

        '''

        

        try:

            parsed = json.loads(valid_json)

            assert "strategy" in parsed

            assert "tasks" in parsed

            assert len(parsed["tasks"]) > 0

            assert "id" in parsed["tasks"][0]

        except json.JSONDecodeError:

            pytest.fail("Valid JSON should parse successfully")

        

        # Invalid JSON responses

        invalid_json_responses = [

            '{"incomplete": true',  # Malformed JSON

            'Not JSON at all',  # Not JSON

            '{}',  # Empty object

            '{"tasks": []}',  # Missing required fields

        ]

        

        for invalid_json in invalid_json_responses:

            try:

                parsed = json.loads(invalid_json)

                # Check for required fields

                required_fields = ["strategy", "tasks", "success_criteria"]

                missing_fields = [field for field in required_fields if field not in parsed]

                if missing_fields:

                    assert len(missing_fields) > 0  # Should be missing required fields

            except json.JSONDecodeError:

                assert True  # Expected for malformed JSON


class TestErrorRecovery:

    """Test error recovery and resilience mechanisms."""

    

    @pytest.mark.asyncio

    async def test_tool_failure_recovery(self):

        """Test recovery when tools fail."""

        

        registry = ToolRegistry()

        

        # Create a tool that always fails

        failing_tool = Mock()

        failing_tool.execute = AsyncMock(side_effect=Exception("Simulated tool failure"))

        

        registry.register("failing_tool", failing_tool)

        

        # Test that failure is properly handled

        with pytest.raises(Exception) as exc_info:

            await registry.execute_tool("failing_tool", param="test")

        

        assert "Simulated tool failure" in str(exc_info.value)

        

        # Check that execution history records the failure

        history = registry.get_execution_history()

        assert len(history) > 0

        assert history[-1]["status"] == "failed"

        assert "Simulated tool failure" in history[-1]["error"]

    

    @pytest.mark.asyncio

    async def test_llm_failure_recovery(self):

        """Test recovery when LLM generation fails."""

        

        # Mock LLM that fails

        failing_llm = Mock()

        failing_llm.generate = AsyncMock(side_effect=Exception("LLM generation failed"))

        

        planner = PlanningSystem(failing_llm)

        

        # Should fall back to default plan when LLM fails

        tasks = await planner.create_plan("Test research goal")

        

        # Should return fallback tasks

        assert len(tasks) >= 1

        assert tasks[0].id.startswith("fallback_")

    

    def test_memory_system_resilience(self):

        """Test memory system resilience to corrupted data."""

        

        memory = MemorySystem("test_resilience")

        

        # Test storing various types of data

        test_data = [

            ("normal_task", "Normal research content", {"type": "research"}),

            ("empty_task", "", {"type": "empty"}),

            ("unicode_task", "Unicode content: 中文 español émoji 🔬", {"type": "unicode"}),

            ("large_task", "x" * 10000, {"type": "large"}),  # Large content

        ]

        

        for task_id, content, metadata in test_data:

            try:

                memory.store_result(task_id, content, metadata)

                # If storage succeeds, retrieval should work

                relevant = memory.retrieve_relevant(content[:50] if content else "test", k=1)

                assert isinstance(relevant, list)

            except Exception as e:

                # Log but don't fail - some edge cases may be expected to fail

                print(f"Memory storage failed for {task_id}: {e}")


# Test runner configuration

# pytest.ini

[pytest]

testpaths = tests

python_files = test_*.py

python_classes = Test*

python_functions = test_*

addopts = 

    -v

    --tb=short

    --strict-markers

    --disable-warnings

markers =

    asyncio: marks tests as async (deselect with '-m "not asyncio"')

    integration: marks tests as integration tests

    performance: marks tests as performance tests

    slow: marks tests as slow running


# Test execution script

# run_tests.sh

#!/bin/bash


echo "Running Research Agent Test Suite"

echo "================================="


# Run unit tests

echo "1. Running unit tests..."

python -m pytest test_agent.py -v


# Run integration tests

echo "2. Running integration tests..."

python -m pytest test_integration.py -v -m integration


# Run performance tests (optional, slower)

echo "3. Running performance tests..."

python -m pytest test_performance.py -v -m performance


# Run validation tests

echo "4. Running validation tests..."

python -m pytest test_validation.py -v


# Generate coverage report

echo "5. Generating coverage report..."

python -m pytest --cov=. --cov-report=html --cov-report=term


echo "Test suite completed!"


Code Explanation for Testing:

  • Comprehensive unit tests cover all major components with proper mocking (see the fixture sketch after this list)
  • Integration tests verify component interactions with controlled dependencies  
  • Performance tests ensure memory stability and concurrent request handling
  • Validation tests verify data integrity and error handling throughout the system
  • Test configuration includes proper async test support and clear organization
  • Error recovery tests ensure the system gracefully handles component failures
  • The test suite provides confidence in system reliability and performance
  • Coverage reporting helps identify untested code paths for improvement
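
To keep this mocking consistent across the test modules, the shared stubs can live in a conftest.py next to the tests. The sketch below is illustrative only: the fixture names and the src.tools import path are assumptions, not part of the original suite.

# conftest.py -- shared test fixtures (illustrative; names and import path are assumptions)
import pytest
from unittest.mock import Mock, AsyncMock

from src.tools import ToolRegistry  # adjust to your project layout


@pytest.fixture
def mock_llm():
    """LLM stub that returns a minimal, well-formed research plan."""
    llm = Mock()
    llm.generate = AsyncMock(return_value=(
        '{"strategy": "test strategy", "tasks": [], "success_criteria": []}'
    ))
    return llm


@pytest.fixture
def tool_registry():
    """Registry pre-loaded with one always-succeeding mock tool."""
    registry = ToolRegistry()
    ok_tool = Mock()
    ok_tool.execute = AsyncMock(return_value={"status": "ok"})
    registry.register("ok_tool", ok_tool)
    return registry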


                         USER GUIDE - AGENTIC AI RESEARCH ASSISTANT

GETTING STARTED


QUICK START (Interactive Mode)


1. Open your terminal/command prompt

2. Navigate to the installation directory

3. Run: python main_agent.py

4. When you see the prompt, type your research goal

5. Press Enter and wait for results


Example:

📝 Research Goal: Analyze renewable energy trends in Europe

🔍 Starting research... (this may take 2-10 minutes)

📊 Research Complete! [Results displayed]


QUICK START (Web Interface)


1. Start the web server: python api_server.py

2. Open your browser to: http://localhost:8000/docs

3. Use the /research endpoint to submit requests

4. Check results with the /research/{task_id} endpoint
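
If you prefer to script against the API rather than use the docs page, a minimal client sketch with the requests library is shown below. The request body field ("goal") and the response fields ("task_id", "status", "report") are assumptions about the API schema; adjust them to match your api_server.py.

# api_client_example.py -- minimal sketch; field names are assumptions, match them to api_server.py
import time
import requests

BASE_URL = "http://localhost:8000"

# Start a research task (assumed request schema: {"goal": "..."})
resp = requests.post(f"{BASE_URL}/research",
                     json={"goal": "Analyze renewable energy trends in Europe"})
resp.raise_for_status()
task_id = resp.json()["task_id"]  # assumed response field

# Poll until the task finishes
while True:
    status = requests.get(f"{BASE_URL}/research/{task_id}").json()
    if status.get("status") in ("completed", "failed"):
        break
    time.sleep(10)

print(status.get("report", status))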


HOW TO ASK FOR RESEARCH

BASIC FORMAT

Simply describe what you want to research in natural language.


✅ GOOD EXAMPLES:

• "Analyze the impact of remote work on productivity"

• "Compare electric vehicle adoption rates across different countries"  

• "Study consumer behavior changes during economic inflation"

• "Investigate the effectiveness of meditation for anxiety treatment"


❌ AVOID THESE:

• Too vague: "cars" or "technology stuff"

• Too narrow: "What did John Smith say about EVs on Twitter yesterday?"

• Impossible requests: "Predict the exact stock price of Tesla next week"


ADDING CONTEXT (OPTIONAL)

For better results, you can add specific requirements:


Format: [Main Goal] + [Additional Context]


Examples:

• Goal: "Analyze social media impact on teenagers"

  Context: "Focus on mental health effects, use studies from 2020-2024"


• Goal: "Research sustainable packaging trends"  

  Context: "Need data for business presentation, focus on food industry"


• Goal: "Study cryptocurrency regulation"

  Context: "Compare approaches in US, EU, and Asia"


UNDERSTANDING THE RESEARCH PROCESS

WHAT THE AGENT DOES AUTOMATICALLY

The agent will independently:

• Break your goal into specific research tasks

• Search multiple sources for information

• Analyze documents and data

• Create visualizations when helpful

• Synthesize findings into a comprehensive report

• Adapt its approach based on what it discovers


TYPICAL TIMELINE

• Simple topics: 2-5 minutes

• Complex topics: 5-15 minutes  

• Very comprehensive research: 15-30 minutes


You'll see progress updates like:

Phase 1: Creating research plan...

Phase 2: Executing research tasks...

  Executing task 1: Search for renewable energy statistics...

  Executing task 2: Analyze EU energy policy documents...

Phase 3: Synthesizing results...


WHAT YOU'LL RECEIVE

RESEARCH REPORT FORMAT

Your results will include:


1. EXECUTIVE SUMMARY

   Key findings and main conclusions


2. METHODOLOGY  

   How the research was conducted


3. KEY FINDINGS

   Detailed discoveries organized by theme

   

4. ANALYSIS

   Interpretation of the data and trends

   

5. CONCLUSIONS

   Summary of insights and implications

   

6. RECOMMENDATIONS

   Actionable next steps (when applicable)


ADDITIONAL OUTPUTS

Depending on your topic, you may also get:

• Charts and graphs (saved as PNG files)

• Links to important sources discovered

• Suggestions for follow-up research


TIPS FOR BETTER RESULTS

BE SPECIFIC ABOUT WHAT YOU NEED

Instead of: "Research AI"

Try: "Analyze how AI is changing job markets in healthcare"


Instead of: "Look into climate change"  

Try: "Study the effectiveness of carbon pricing policies in reducing emissions"


SPECIFY YOUR PERSPECTIVE

• "From a business strategy perspective..."

• "For academic research purposes..."  

• "To inform policy decisions..."

• "For investment analysis..."


SET BOUNDARIES WHEN NEEDED

• Time period: "...trends since 2020"

• Geography: "...focusing on European markets"  

• Industry: "...in the automotive sector"

• Data type: "...with quantitative evidence"


WORKING WITH RESULTS

FOLLOW-UP RESEARCH

If you want to explore specific aspects deeper:


"The previous research mentioned battery storage challenges. 

Can you investigate battery technology developments in more detail?"


"I'd like more information about the policy recommendations 

from the renewable energy research."


SAVING YOUR WORK

• Reports are automatically saved with timestamps

• Charts are saved in the /charts directory

• You can copy/paste text results into your own documents


COMMON COMMANDS

INTERACTIVE MODE COMMANDS

• help - Show available commands

• stats - Display performance statistics  

• memory - Show system memory status

• quit - Exit the application


WEB API ENDPOINTS

• POST /research - Start new research

• GET /research/{id} - Check research status

• GET /research - List recent research tasks

• GET /health - Check system status

• GET /stats - View system statistics


TROUBLESHOOTING


COMMON ISSUES AND SOLUTIONS


"Research is taking too long"

→ Complex topics need more time. Check the progress messages.

→ You can cancel with Ctrl+C and try a more focused topic.


"Results seem incomplete"  

→ Try being more specific about what you're looking for.

→ Add context about the type of information you need.


"Agent says it can't find information"

→ Topic might be too recent or too specific.

→ Try broadening the scope or using different terminology.


"Getting errors about tools failing"

→ Check your internet connection.

→ Verify API keys are configured correctly.

→ Some external sources might be temporarily unavailable.


OPTIMIZING PERFORMANCE


For faster results:

• Be specific rather than asking broad questions

• Limit scope when possible ("...in the past 2 years")

• Avoid requests that require real-time data


For more comprehensive results:

• Allow more time for complex topics

• Ask follow-up questions to dive deeper

• Request specific types of analysis or visualization


EXAMPLE WORKFLOWS

BUSINESS RESEARCH WORKFLOW

1. "Analyze the market opportunity for sustainable packaging in food industry"

2. Review results, identify promising areas

3. "Research specific sustainable packaging technologies mentioned in the previous analysis"

4. "Compare costs and adoption barriers for biodegradable vs recyclable packaging"


ACADEMIC RESEARCH WORKFLOW

1. "Study the relationship between social media use and academic performance in college students"

2. Examine methodology and findings

3. "Find additional peer-reviewed studies on social media and student focus/attention"

4. "Analyze criticisms and limitations of existing research in this area"


INVESTMENT RESEARCH WORKFLOW

1. "Analyze growth trends in the electric vehicle charging infrastructure market"

2. Review market data and projections

3. "Research key companies and competitive landscape in EV charging"

4. "Investigate regulatory factors affecting EV charging investments"


BEST PRACTICES


DO'S

✅ Start with clear, specific goals

✅ Provide context when you have specific needs

✅ Allow adequate time for comprehensive research

✅ Ask follow-up questions to explore interesting findings

✅ Save important results for future reference


DON'TS  

❌ Don't expect instant results for complex topics

❌ Don't ask for personal/private information about individuals

❌ Don't request illegal or unethical research

❌ Don't assume the agent has access to proprietary databases

❌ Don't interrupt the research process unless necessary


GETTING HELP

If you need assistance:

• Type 'help' in interactive mode for commands

• Check the system logs for error details

• Review this user guide for common solutions

• Ensure all requirements are properly installed


For technical issues:

• Verify Python dependencies are installed

• Check that you have sufficient disk space and memory

• Ensure internet connectivity for web searches

• Confirm API keys are valid (if using external services)


Remember: The AI Research Assistant is designed to be intuitive and helpful. 

Don't hesitate to experiment with different types of research requests to 

discover what works best for your needs!


CONCLUSION

This completes the comprehensive guide for building an agentic AI research 

assistant. The system includes:


CORE CAPABILITIES:

  • Autonomous planning and task execution
  • Local LLM integration with Mistral
  • Advanced memory and context management
  • Modular tool system for extensibility
  • Self-reflection and continuous improvement
  • Performance monitoring and optimization


DEPLOYMENT OPTIONS:

  • FastAPI REST server for web integration
  • Docker containerization for consistent deployment
  • Kubernetes support for scalable production deployment
  • Comprehensive health monitoring and metrics


QUALITY ASSURANCE:

  • Complete test suite covering unit, integration, and performance testing
  • Error recovery and resilience mechanisms
  • Data validation and input sanitization
  • Comprehensive logging and debugging capabilities


The architecture is designed to be production-ready while remaining 

extensible for additional capabilities and use cases. The agent can operate

autonomously while providing transparency and control over its operations.


Key success factors:

1. Modular design enables easy extension and maintenance

2. Comprehensive error handling ensures robust operation

3. Performance monitoring enables optimization and scaling

4. Test coverage provides confidence in system reliability

5. Documentation supports adoption and contribution


This foundation provides everything needed to build sophisticated agentic AI

applications that can operate autonomously while maintaining human oversight

and control.


                         

COMPLETE SETUP GUIDE - AGENTIC AI RESEARCH ASSISTANT



OVERVIEW

========


This guide will walk you through setting up the Research Assistant from scratch.

Total setup time: 2-6 hours depending on your experience level and hardware.


PREREQUISITES AND SYSTEM REQUIREMENTS

=====================================


HARDWARE REQUIREMENTS

---------------------

MINIMUM (Basic functionality):

• CPU: Modern 4-core processor (Intel i5/i7, AMD Ryzen 5/7)

• RAM: 16GB (12GB for model + 4GB for system)

• Storage: 25GB free space (20GB for models + 5GB for data)

• Internet: Stable broadband connection for initial setup


RECOMMENDED (Optimal performance):

• CPU: 8+ core processor 

• RAM: 32GB or more

• GPU: NVIDIA GPU with 8GB+ VRAM (RTX 3070, RTX 4060, or better)

• Storage: 50GB+ on SSD

• Internet: Fast broadband for model downloads


SUPPORTED OPERATING SYSTEMS

---------------------------

• Windows 10/11 (64-bit)

• macOS 10.15+ (Intel or Apple Silicon)

• Linux (Ubuntu 20.04+, CentOS 8+, or equivalent)


SOFTWARE PREREQUISITES

----------------------

• Python 3.9, 3.10, or 3.11 (Python 3.12 may have compatibility issues)

• Git (for cloning repositories)

• Text editor or IDE (VS Code, PyCharm, etc.)


STEP 1: ENVIRONMENT SETUP

=========================


INSTALL PYTHON

--------------

Windows:

1. Download Python from https://python.org/downloads/

2. During installation, CHECK "Add Python to PATH"

3. Verify: Open Command Prompt, type: python --version


macOS:

1. Install Homebrew: /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"

2. Install Python: brew install python@3.10

3. Verify: python3 --version


Linux (Ubuntu/Debian):

sudo apt update

sudo apt install python3.10 python3.10-venv python3-pip

python3.10 --version


CREATE PROJECT DIRECTORY

------------------------

mkdir agentic-research-assistant

cd agentic-research-assistant


CREATE VIRTUAL ENVIRONMENT

--------------------------

# Create virtual environment

python -m venv research_env


# Activate it

# Windows:

research_env\Scripts\activate


# macOS/Linux:

source research_env/bin/activate


# You should see (research_env) in your terminal prompt


STEP 2: CODE SETUP

==================


CREATE PROJECT STRUCTURE

------------------------

mkdir -p src tools tests data charts research_outputs logs config


Your directory should look like:

agentic-research-assistant/

├── src/                    # Main application code

├── tools/                  # Tool implementations  

├── tests/                  # Test files

├── data/                   # Data storage

├── charts/                 # Generated visualizations

├── research_outputs/       # Research results

├── logs/                   # Application logs

├── config/                 # Configuration files

├── requirements.txt        # Python dependencies

└── README.md              # Documentation


CREATE MAIN FILES

-----------------

Create the following files in your project directory:


requirements.txt:

transformers==4.35.0

torch==2.1.0

accelerate==0.24.0

sentence-transformers==2.2.2

chromadb==0.4.15

pydantic==2.5.0

fastapi==0.104.0

uvicorn==0.24.0

requests==2.31.0

beautifulsoup4==4.12.2

matplotlib==3.8.0

plotly==5.17.0

pandas==2.1.0

psutil==5.9.6

pytest==7.4.0

pytest-asyncio==0.21.1  # needed for the @pytest.mark.asyncio tests

python-dotenv==1.0.0


.env (for environment variables):

# API Keys (get these from respective providers)

SEARCH_API_KEY=your_search_api_key_here

OPENAI_API_KEY=your_openai_key_here


# Model Configuration

MODEL_NAME=mistralai/Mistral-7B-Instruct-v0.2

MAX_TASKS_PER_SESSION=20

ENABLE_CACHING=true


# Directories

OUTPUT_DIRECTORY=./research_outputs

CHARTS_DIRECTORY=./charts

LOGS_DIRECTORY=./logs


config/config.py:

import os

from dotenv import load_dotenv


load_dotenv()


class Config:

    # API Keys

    SEARCH_API_KEY = os.getenv('SEARCH_API_KEY', '')

    OPENAI_API_KEY = os.getenv('OPENAI_API_KEY', '')

    

    # Model Settings

    MODEL_NAME = os.getenv('MODEL_NAME', 'mistralai/Mistral-7B-Instruct-v0.2')

    MAX_TASKS_PER_SESSION = int(os.getenv('MAX_TASKS_PER_SESSION', '20'))

    ENABLE_CACHING = os.getenv('ENABLE_CACHING', 'true').lower() == 'true'

    

    # Directories

    OUTPUT_DIRECTORY = os.getenv('OUTPUT_DIRECTORY', './research_outputs')

    CHARTS_DIRECTORY = os.getenv('CHARTS_DIRECTORY', './charts')

    LOGS_DIRECTORY = os.getenv('LOGS_DIRECTORY', './logs')

    

    # Ensure directories exist

    os.makedirs(OUTPUT_DIRECTORY, exist_ok=True)

    os.makedirs(CHARTS_DIRECTORY, exist_ok=True)

    os.makedirs(LOGS_DIRECTORY, exist_ok=True)


STEP 3: INSTALL DEPENDENCIES

============================


INSTALL PYTHON PACKAGES

-----------------------

# Make sure virtual environment is activated

pip install --upgrade pip


# Install PyTorch (choose based on your system)

# For CPU only:

pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu


# For NVIDIA GPU (CUDA 11.8):

pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118


# For NVIDIA GPU (CUDA 12.1):

pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121


# Install remaining dependencies

pip install -r requirements.txt


VERIFY INSTALLATION

------------------

python -c "import torch; print(f'PyTorch version: {torch.__version__}')"

python -c "import torch; print(f'CUDA available: {torch.cuda.is_available()}')"

python -c "import transformers; print(f'Transformers version: {transformers.__version__}')"


STEP 4: MODEL SETUP

===================


DOWNLOAD MISTRAL MODEL

---------------------

Create download_model.py:


from transformers import AutoTokenizer, AutoModelForCausalLM

import torch

import os


def download_mistral_model():

    model_name = "mistralai/Mistral-7B-Instruct-v0.2"

    cache_dir = "./models"

    

    print(f"Downloading {model_name}...")

    print("This will take 30-60 minutes and use ~7GB of storage.")

    

    try:

        # Download tokenizer

        print("Downloading tokenizer...")

        tokenizer = AutoTokenizer.from_pretrained(

            model_name,

            cache_dir=cache_dir

        )

        

        # Download model

        print("Downloading model...")

        model = AutoModelForCausalLM.from_pretrained(

            model_name,

            cache_dir=cache_dir,

            torch_dtype=torch.float16,

            low_cpu_mem_usage=True

        )

        

        print("✅ Model downloaded successfully!")

        print(f"📁 Cached in: {cache_dir}")

        

        # Test the model

        print("🧪 Testing model...")

        inputs = tokenizer("Hello, I am", return_tensors="pt")

        outputs = model.generate(**inputs, max_length=20)

        response = tokenizer.decode(outputs[0], skip_special_tokens=True)

        print(f"✅ Model test successful: {response}")

        

    except Exception as e:

        print(f"❌ Error downloading model: {e}")

        print("💡 Try running with more RAM or check internet connection")


if __name__ == "__main__":

    download_mistral_model()


Run the download:

python download_model.py


ALTERNATIVE: USE SMALLER MODEL (If you have limited RAM)

-------------------------------------------------------

Edit your .env file:

MODEL_NAME=microsoft/DialoGPT-small


Or for even smaller:

MODEL_NAME=distilgpt2


STEP 5: API CONFIGURATION

=========================


GET SEARCH API KEY

-----------------

You need a web search API. Choose one:


Option 1 - Google Custom Search (Recommended):

1. Go to https://developers.google.com/custom-search/v1/introduction

2. Create a project and enable Custom Search API

3. Create credentials (API key)

4. Create a Custom Search Engine at https://cse.google.com/

5. Get your Search Engine ID


Option 2 - SerpAPI:

1. Go to https://serpapi.com/

2. Sign up for free account (100 searches/month)

3. Get your API key from dashboard


Option 3 - Bing Search API:

1. Go to https://azure.microsoft.com/en-us/services/cognitive-services/bing-web-search-api/

2. Create Azure account

3. Create Bing Search resource

4. Get API key


UPDATE CONFIGURATION

-------------------

Edit your .env file:

SEARCH_API_KEY=your_actual_api_key_here
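
To confirm the key works before wiring it into the agent, a quick sanity check against the Google Custom Search JSON API (Option 1 above) might look like the sketch below. It assumes you also store your Search Engine ID, e.g. in a SEARCH_ENGINE_ID entry, which is not part of the original .env template.

# check_search_key.py -- sanity check for Google Custom Search (Option 1)
# SEARCH_ENGINE_ID is an assumed extra .env entry; add it if you use this check.
import os
import requests
from dotenv import load_dotenv

load_dotenv()

params = {
    "key": os.getenv("SEARCH_API_KEY"),
    "cx": os.getenv("SEARCH_ENGINE_ID"),  # your Custom Search Engine ID
    "q": "renewable energy adoption",
    "num": 3,
}
resp = requests.get("https://www.googleapis.com/customsearch/v1", params=params)
resp.raise_for_status()

for item in resp.json().get("items", []):
    print(item["title"], "-", item["link"])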


STEP 6: CREATE MAIN APPLICATION FILES

=====================================


Copy the code files from the previous guide into appropriate directories:


src/agent_core.py          # Core agent classes

src/planning.py            # Planning system

src/tools.py              # Tool implementations

src/memory.py             # Memory system

src/llm_provider.py       # LLM integration

src/main_agent.py         # Main application

src/api_server.py         # Web API (optional)


Make sure to update import statements in each file:

from config.config import Config

from src.agent_core import Task, TaskStatus, AgentState


STEP 7: CREATE STARTUP SCRIPT

=============================


Create run.py in the root directory:


#!/usr/bin/env python3

import asyncio

import sys

import os


# Add src directory to path

sys.path.append(os.path.join(os.path.dirname(__file__), 'src'))


from main_agent import ResearchAgentApp

from config.config import Config


async def main():

    """Main entry point for the research assistant."""

    

    print("🤖 Initializing Agentic AI Research Assistant...")

    print("=" * 60)

    

    # Check configuration

    if not Config.SEARCH_API_KEY:

        print("⚠️  WARNING: No search API key configured")

        print("   Web search functionality will be limited")

        print("   Add SEARCH_API_KEY to your .env file")

    

    try:

        # Initialize the agent

        config = {

            "search_api_key": Config.SEARCH_API_KEY,

            "model_name": Config.MODEL_NAME,

            "max_tasks_per_session": Config.MAX_TASKS_PER_SESSION,

            "output_directory": Config.OUTPUT_DIRECTORY

        }

        

        agent = ResearchAgentApp(config)

        

        print("✅ Agent initialized successfully!")

        print("📚 Ready for research requests")

        print("=" * 60)

        

        # Run in interactive mode

        await agent.interactive_mode()

        

    except Exception as e:

        print(f"❌ Error starting agent: {e}")

        print("\nTroubleshooting tips:")

        print("• Check that all dependencies are installed")

        print("• Verify your API keys are correct") 

        print("• Ensure you have enough RAM/storage")

        print("• Check the logs directory for detailed errors")


if __name__ == "__main__":

    asyncio.run(main())


STEP 8: INITIAL TESTING

=======================


TEST BASIC SETUP

----------------

# Test Python environment

python -c "print('Python environment: OK')"


# Test dependencies

python -c "import torch, transformers, chromadb; print('Dependencies: OK')"


# Test configuration

python -c "from config.config import Config; print(f'Config loaded: {Config.MODEL_NAME}')"


RUN YOUR FIRST RESEARCH

-----------------------

python run.py


You should see:

🤖 Initializing Agentic AI Research Assistant...

================================================================

✅ Agent initialized successfully!

📚 Ready for research requests


📝 Research Goal: _


Try a simple test:

📝 Research Goal: Tell me about renewable energy


STEP 9: TROUBLESHOOTING COMMON ISSUES

=====================================


MEMORY ISSUES

------------

Error: "RuntimeError: CUDA out of memory" or "Unable to allocate memory"


Solutions:

1. Use smaller model:

   MODEL_NAME=distilgpt2


2. Use CPU instead of GPU:

   In llm_provider.py, force CPU (see the sketch after this list):

   device_map="cpu"


3. Increase virtual memory (Windows):

   Control Panel > System > Advanced > Performance Settings > Virtual Memory
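
For reference, a CPU-only load with Hugging Face transformers could look like the sketch below. It is illustrative and assumes your llm_provider.py builds the model with AutoModelForCausalLM; adapt it to the actual loading code.

# Illustrative CPU-only load; adapt to how llm_provider.py constructs the model
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

model_name = "mistralai/Mistral-7B-Instruct-v0.2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="cpu",            # keep all weights in system RAM
    torch_dtype=torch.float32,   # float16 is slow or unsupported on many CPUs
    low_cpu_mem_usage=True,
)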


MODEL DOWNLOAD ISSUES

--------------------

Error: "ConnectionError" or "403 Forbidden"


Solutions:

1. Check internet connection

2. Try downloading manually:

   python -c "from transformers import AutoTokenizer; AutoTokenizer.from_pretrained('mistralai/Mistral-7B-Instruct-v0.2')"


3. Use Hugging Face login:

   pip install huggingface_hub

   huggingface-cli login


API KEY ISSUES

-------------

Error: "Invalid API key" or "Quota exceeded"


Solutions:

1. Verify API key in .env file

2. Check API key permissions on provider website

3. Monitor usage quotas

4. Use alternative search provider


IMPORT ERRORS

------------

Error: "ModuleNotFoundError"


Solutions:

1. Activate virtual environment:

   source research_env/bin/activate  # Linux/Mac

   research_env\Scripts\activate     # Windows


2. Reinstall dependencies:

   pip install -r requirements.txt


3. Check Python path:

   python -c "import sys; print(sys.path)"


STEP 10: OPTIONAL ENHANCEMENTS

==============================


WEB INTERFACE SETUP

-------------------

To run the web API:

python src/api_server.py


Then visit: http://localhost:8000/docs


DOCKER SETUP (Advanced)

-----------------------

Create Dockerfile:


FROM python:3.10-slim


WORKDIR /app


# Install system dependencies

RUN apt-get update && apt-get install -y git curl


# Copy requirements and install Python dependencies

COPY requirements.txt .

RUN pip install -r requirements.txt


# Copy application

COPY . .


# Download model during build (optional)

RUN python download_model.py


EXPOSE 8000


# Runs interactive mode by default; for the web API use: CMD ["python", "src/api_server.py"]

CMD ["python", "run.py"]


Build and run:

docker build -t research-agent .

docker run -p 8000:8000 research-agent


MONITORING SETUP

---------------

Install monitoring tools:

pip install prometheus_client grafana-api


Add to your .env:

ENABLE_MONITORING=true

PROMETHEUS_PORT=9090
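
How these settings are used depends on your own wiring; a minimal sketch with prometheus_client is shown below. The metric names and the module name metrics.py are assumptions, not part of the original code.

# metrics.py -- minimal Prometheus exposition sketch (metric names are assumptions)
import os
from prometheus_client import Counter, Histogram, start_http_server

RESEARCH_REQUESTS = Counter(
    "research_requests_total", "Number of research requests received"
)
RESEARCH_DURATION = Histogram(
    "research_duration_seconds", "Time spent completing a research goal"
)

def start_metrics_server():
    """Expose /metrics on PROMETHEUS_PORT if monitoring is enabled in .env."""
    if os.getenv("ENABLE_MONITORING", "false").lower() == "true":
        start_http_server(int(os.getenv("PROMETHEUS_PORT", "9090")))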


PERFORMANCE OPTIMIZATION

-----------------------

For better performance:


1. Use SSD storage for model cache

2. Increase swap file size

3. Close other applications while running

4. Use GPU if available

5. Consider model quantization:

   pip install bitsandbytes
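
A 4-bit quantized load with transformers and bitsandbytes could look like the sketch below. It assumes an NVIDIA GPU and is illustrative only; it is not part of the original llm_provider.py.

# Illustrative 4-bit quantized load (requires an NVIDIA GPU and bitsandbytes)
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

model_name = "mistralai/Mistral-7B-Instruct-v0.2"

bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
)

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=bnb_config,
    device_map="auto",  # place quantized layers on the available GPU(s)
)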


BACKUP AND SECURITY

------------------

1. Backup your .env file (without committing to git)

2. Regularly backup research_outputs directory

3. Use strong API keys and rotate them periodically

4. Monitor API usage to avoid unexpected charges


STEP 11: VERIFICATION CHECKLIST

===============================


Before considering setup complete, verify:


□ Python 3.9+ installed and virtual environment active

□ All dependencies installed without errors

□ Model downloaded and cached locally

□ API keys configured and working

□ Directory structure created correctly

□ Basic "Hello World" research query works

□ Charts and outputs are generated correctly

□ No critical errors in logs

□ System performance is acceptable


NEXT STEPS

==========


Once setup is complete:


1. Read the User Guide for effective usage

2. Try different types of research queries

3. Experiment with the API interface

4. Set up monitoring and backups

5. Consider customizing tools for your specific needs


GETTING HELP

============


If you encounter issues:


1. Check the logs directory for detailed error messages

2. Review this setup guide for missed steps

3. Search online for specific error messages

4. Consider using cloud-based alternatives if local setup is problematic

5. Start with a simpler configuration and gradually add complexity


Remember: This is a complex system with many dependencies. Don't be discouraged if setup takes longer than expected - this is normal for advanced AI applications!
