Introduction
The integration of Large Language Models (LLMs) into research workflows has changed how we approach information discovery and synthesis. An LLM-based research agent is an application that can understand a user's query, autonomously gather relevant information from diverse sources, critically evaluate that information, and present it coherently with proper citations. This article provides a detailed guide for software engineers on how to build such a system, covering both theoretical concepts and practical implementation details.
A research agent differs from a standard chatbot in its ability to act autonomously on behalf of the user, making decisions about what information to retrieve, how to validate it, and how to present findings in a useful way. The agentic nature of these systems allows them to perform complex sequences of actions without constant user intervention, making them particularly valuable for research tasks that require exploring multiple sources and synthesizing diverse information.
System Architecture Overview
The architecture of an LLM-based research agent consists of several interconnected components that work together to process user queries, retrieve information, validate sources, and generate responses. At its core, the system uses one or more LLMs to understand user intent, generate search queries, evaluate information relevance, and synthesize findings.
The research agent operates in a cycle that begins with receiving a user query, proceeds through information gathering and evaluation, and concludes with presenting findings to the user. The user can then refine their query based on the initial results, initiating another cycle of research. This iterative approach allows for progressive refinement of the research focus and ensures that the information provided aligns with the user's needs.
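To make this cycle concrete, here is a minimal sketch of the control loop that the rest of this article fleshes out; the agent object, input function, and presentation function are placeholders for the components introduced below.
def research_loop(agent, get_user_input, present_findings):
    # Minimal sketch of the research cycle; `agent`, `get_user_input`, and
    # `present_findings` stand in for components built later in this article.
    query = get_user_input("Research question: ")
    while query:
        findings = agent.process_query(query)  # gather, validate, and synthesize
        present_findings(findings)             # show findings with citations
        # The user can refine the question based on the results, or stop.
        query = get_user_input("Refined question (leave blank to finish): ")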
Core Components
User Interface
The user interface serves as the entry point for interactions with the research agent. We can implement this interface in two primary ways: as a text-based chatbot or as a web application. The text-based interface is simpler to implement and can be integrated into existing command-line tools or messaging platforms. The web-based interface offers more flexibility in terms of displaying information, managing interactions, and integrating with other web services.
For a text-based interface, we might use a simple command-line application that processes user input and displays responses. This approach is straightforward but limits the ways in which information can be presented. A web-based interface, on the other hand, can leverage HTML, CSS, and JavaScript to create a more interactive and visually appealing experience. This might include features such as collapsible sections for different sources, interactive citations, and the ability to save or export research findings.
Let's look at a simple example of how we might implement a basic text-based interface using Python:
import readline # For better input handling with command history
class ResearchAgentCLI:
def __init__(self, research_agent):
self.research_agent = research_agent
self.history = []
def start_session(self):
print("Welcome to the Research Agent. Enter your research question or type 'exit' to quit.")
while True:
user_input = input("\nResearch Question: ")
if user_input.lower() == 'exit':
print("Thank you for using the Research Agent. Goodbye!")
break
# Process the user query through the research agent
response = self.research_agent.process_query(user_input)
# Display the response
print("\n" + "="*80)
print("RESEARCH FINDINGS:")
print(response['findings'])
print("\nSOURCES:")
for i, source in enumerate(response['sources'], 1):
print(f"{i}. {source['title']} - {source['authors']} ({source['year']})")
print(f" URL: {source['url']}")
print("="*80)
# Store the interaction in history
self.history.append({
'query': user_input,
'response': response
})
print("\nWould you like to refine your query based on these findings? (yes/no)")
refine = input().lower()
if refine == 'yes':
print("Please enter your refined query:")
else:
print("You can ask a new research question or type 'exit' to quit.")
# Usage example
if __name__ == "__main__":
from research_agent import ResearchAgent
agent = ResearchAgent()
cli = ResearchAgentCLI(agent)
cli.start_session()
This code creates a simple command-line interface for interacting with the research agent. It initializes a session, takes user input, processes it through the research agent, and displays the findings and sources. It also provides an option for the user to refine their query based on the initial results.
For a web-based interface, we might use a framework like Flask or FastAPI to create a backend API that communicates with the research agent, and a frontend built with HTML, CSS, and JavaScript (possibly using a framework like React or Vue.js) to handle user interactions. Here's a simplified example of how we might implement a Flask-based API for our research agent:
from flask import Flask, request, jsonify
from research_agent import ResearchAgent
app = Flask(__name__)
research_agent = ResearchAgent()
@app.route('/api/research', methods=['POST'])
def conduct_research():
data = request.json
user_query = data.get('query', '')
if not user_query:
return jsonify({'error': 'No query provided'}), 400
# Process the query through the research agent
response = research_agent.process_query(user_query)
return jsonify(response)
@app.route('/api/refine', methods=['POST'])
def refine_research():
data = request.json
original_query = data.get('original_query', '')
refined_query = data.get('refined_query', '')
previous_findings = data.get('previous_findings', {})
if not refined_query:
return jsonify({'error': 'No refined query provided'}), 400
# Process the refined query, potentially using previous findings
response = research_agent.refine_query(original_query, refined_query, previous_findings)
return jsonify(response)
if __name__ == '__main__':
app.run(debug=True)
This Flask application creates two API endpoints: one for conducting initial research based on a user query, and another for refining research based on previous findings. The frontend would make HTTP requests to these endpoints and display the results to the user.
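As a sketch of how a frontend or any other HTTP client might call these endpoints, the following uses Python's requests library against the /api/research route; the localhost URL and the 'findings'/'sources' keys simply mirror the Flask example above and would need adjusting to your deployment.
import requests

def run_research(query: str, base_url: str = "http://localhost:5000"):
    # Hypothetical client for the Flask API above; assumes the server is
    # running locally on Flask's default port.
    resp = requests.post(f"{base_url}/api/research", json={"query": query})
    resp.raise_for_status()
    data = resp.json()
    print(data.get("findings", ""))
    for i, source in enumerate(data.get("sources", []), 1):
        print(f"{i}. {source.get('title', '')} - {source.get('url', '')}")
    return data

if __name__ == "__main__":
    run_research("What are the main approaches to retrieval-augmented generation?")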
Query Processing
The query processing component is responsible for understanding the user's research question and transforming it into a format that can be used for information retrieval. This involves several steps, including parsing the user's input, identifying key concepts and terms, and generating search queries for different information sources.
LLMs are particularly well-suited for this task because they can understand natural language queries and extract relevant information. By prompting the LLM with the user's query and instructions on how to process it, we can generate effective search queries that capture the essence of the research question.
Here's an example of how we might implement query processing using an LLM:
import openai
import json
import os
from typing import List, Dict, Any
class QueryProcessor:
def __init__(self, api_key=None):
# Use provided API key or get from environment variable
self.api_key = api_key or os.getenv("OPENAI_API_KEY")
openai.api_key = self.api_key
def process_query(self, user_query: str) -> Dict[str, Any]:
"""
Process a user query to extract key concepts and generate search queries.
Args:
user_query: The research question or description from the user
Returns:
A dictionary containing processed query information
"""
# Construct a prompt for the LLM
prompt = f"""
I need to conduct research on the following topic:
"{user_query}"
Please help me by:
1. Identifying the main research question
2. Extracting key concepts and terms
3. Generating 3-5 effective search queries for academic databases
4. Suggesting specific academic databases or sources that might be relevant
Format your response as a structured JSON object.
"""
# Call the LLM API
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are a research assistant helping to process research queries."},
{"role": "user", "content": prompt}
],
response_format={"type": "json_object"}
)
# Parse the response
try:
result = json.loads(response.choices[0].message.content)
return result
except json.JSONDecodeError:
# Fallback if the response is not valid JSON
return {
"main_question": user_query,
"key_concepts": [],
"search_queries": [user_query],
"suggested_sources": ["Google Scholar", "PubMed", "arXiv"]
}
def refine_query(self, original_query: str, refined_query: str, previous_results: Dict[str, Any]) -> Dict[str, Any]:
"""
Refine a query based on user feedback and previous results.
Args:
original_query: The original research question
refined_query: The user's refined question or feedback
previous_results: Results from the previous search
Returns:
A dictionary containing the refined query information
"""
# Construct a prompt for the LLM
prompt = f"""
I previously researched:
"{original_query}"
Based on the initial findings, I want to refine my research with this follow-up:
"{refined_query}"
Previous findings summary:
{previous_results.get('summary', 'No summary available')}
Please help me refine my research by:
1. Identifying the new focus or direction
2. Extracting additional key concepts and terms
3. Generating 3-5 refined search queries
4. Suggesting specific sources to focus on or exclude
Format your response as a structured JSON object.
"""
# Call the LLM API
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are a research assistant helping to refine research queries."},
{"role": "user", "content": prompt}
],
response_format={"type": "json_object"}
)
# Parse the response
try:
result = json.loads(response.choices[0].message.content)
return result
except json.JSONDecodeError:
# Fallback if the response is not valid JSON
return {
"new_focus": refined_query,
"additional_concepts": [],
"refined_queries": [refined_query],
"source_recommendations": []
}
This QueryProcessor class uses the OpenAI API to process user queries and generate structured information that can be used for information retrieval. It includes methods for processing initial queries and refining queries based on user feedback and previous results. The LLM is prompted to identify key concepts, generate search queries, and suggest relevant sources, with the results returned in a structured format that can be used by other components of the system.
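A short usage sketch, assuming OPENAI_API_KEY is set in the environment, might look like this; the exact keys depend on how the LLM structures its JSON, and the names shown match the fallback format in the class.
# Usage sketch for QueryProcessor; assumes OPENAI_API_KEY is set in the environment.
processor = QueryProcessor()
processed = processor.process_query("How do transformer models handle long-range dependencies?")
print(processed.get("main_question"))
for search_query in processed.get("search_queries", []):
    print("Search query:", search_query)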
Information Retrieval System
The information retrieval component is responsible for gathering relevant information from various sources based on the processed query. This includes searching the internet, academic databases, and other repositories for documents, papers, and articles that might contain information relevant to the user's research question.
To implement this component, we need to integrate with various APIs and services that provide access to different information sources. This might include general search engines like Google, academic databases like PubMed or arXiv, and specialized repositories for specific domains.
Here's an example of how we might implement a modular information retrieval system that can search multiple sources:
import requests
import json
import os
import time
from typing import List, Dict, Any
from bs4 import BeautifulSoup
import arxiv
from scholarly import scholarly  # the scholarly package exposes its API via this submodule
class InformationRetriever:
def __init__(self, api_keys=None):
"""
Initialize the information retriever with API keys for various services.
Args:
api_keys: Dictionary of API keys for different services
"""
self.api_keys = api_keys or {}
self.serp_api_key = self.api_keys.get('serp_api') or os.getenv("SERP_API_KEY")
def search_web(self, query: str, num_results: int = 10) -> List[Dict[str, Any]]:
"""
Search the web using SerpAPI (Google Search API wrapper).
Args:
query: The search query
num_results: Number of results to return
Returns:
A list of search results with title, snippet, and URL
"""
if not self.serp_api_key:
print("Warning: No SerpAPI key provided. Web search disabled.")
return []
url = "https://serpapi.com/search"
params = {
"q": query,
"api_key": self.serp_api_key,
"engine": "google",
"num": num_results
}
try:
response = requests.get(url, params=params)
data = response.json()
results = []
for item in data.get('organic_results', []):
results.append({
'title': item.get('title', ''),
'snippet': item.get('snippet', ''),
'url': item.get('link', ''),
'source_type': 'web'
})
return results
except Exception as e:
print(f"Error searching the web: {e}")
return []
def search_arxiv(self, query: str, max_results: int = 10) -> List[Dict[str, Any]]:
"""
Search arXiv for academic papers.
Args:
query: The search query
max_results: Maximum number of results to return
Returns:
A list of papers with title, authors, abstract, and URL
"""
try:
search = arxiv.Search(
query=query,
max_results=max_results,
sort_by=arxiv.SortCriterion.Relevance
)
results = []
for paper in search.results():
results.append({
'title': paper.title,
'authors': [author.name for author in paper.authors],  # arxiv returns Author objects; .name gives the string
'abstract': paper.summary,
'url': paper.pdf_url,
'published': paper.published.strftime('%Y-%m-%d'),
'source_type': 'arxiv'
})
return results
except Exception as e:
print(f"Error searching arXiv: {e}")
return []
def search_google_scholar(self, query: str, max_results: int = 10) -> List[Dict[str, Any]]:
"""
Search Google Scholar for academic papers.
Args:
query: The search query
max_results: Maximum number of results to return
Returns:
A list of papers with title, authors, abstract, and URL
"""
try:
search_query = scholarly.search_pubs(query)
results = []
count = 0
for paper in search_query:
if count >= max_results:
break
# Get detailed information
try:
# Recent versions of scholarly return plain dictionaries rather than objects
detailed_paper = scholarly.fill(paper)
bib = detailed_paper.get('bib', {})
results.append({
    'title': bib.get('title', ''),
    'authors': bib.get('author', []),
    'abstract': bib.get('abstract', ''),
    'url': detailed_paper.get('pub_url', ''),
    'year': bib.get('pub_year', ''),
    'citations': detailed_paper.get('num_citations', 0),
    'source_type': 'google_scholar'
})
count += 1
except Exception as inner_e:
print(f"Error retrieving details for a Google Scholar paper: {inner_e}")
# Avoid rate limiting
time.sleep(1)
return results
except Exception as e:
print(f"Error searching Google Scholar: {e}")
return []
def retrieve_information(self, queries: List[str], sources: List[str] = None) -> Dict[str, List[Dict[str, Any]]]:
"""
Retrieve information from multiple sources based on a list of queries.
Args:
queries: List of search queries
sources: List of sources to search (defaults to all available)
Returns:
A dictionary mapping source names to lists of results
"""
if sources is None:
sources = ['web', 'arxiv', 'google_scholar']
results = {source: [] for source in sources}
for query in queries:
if 'web' in sources:
web_results = self.search_web(query)
results['web'].extend(web_results)
if 'arxiv' in sources:
arxiv_results = self.search_arxiv(query)
results['arxiv'].extend(arxiv_results)
if 'google_scholar' in sources:
scholar_results = self.search_google_scholar(query)
results['google_scholar'].extend(scholar_results)
# Remove duplicates (based on URL)
for source in results:
unique_urls = set()
unique_results = []
for result in results[source]:
url = result.get('url', '')
if url and url not in unique_urls:
unique_urls.add(url)
unique_results.append(result)
results[source] = unique_results
return results
This InformationRetriever class provides methods for searching different sources of information, including the web (using SerpAPI), arXiv, and Google Scholar. It includes a method for retrieving information from multiple sources based on a list of queries, with results organized by source. The class handles error cases and rate limiting to ensure reliable operation.
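A usage sketch might look like the following; a SerpAPI key is only needed for the web search, while the arXiv and Google Scholar searches run without one.
# Usage sketch for InformationRetriever; replace the placeholder key with a real one
# or omit it to skip web search.
retriever = InformationRetriever(api_keys={"serp_api": "YOUR_SERP_API_KEY"})
results = retriever.retrieve_information(
    queries=["retrieval augmented generation survey"],
    sources=["arxiv", "web"]
)
for source_name, items in results.items():
    print(f"{source_name}: {len(items)} results")
    for item in items[:3]:
        print("  -", item.get("title", ""))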
Source Validation and Reference Management
The source validation and reference management component is responsible for evaluating the credibility and relevance of the information retrieved, and for managing citations and references. This is a critical component for ensuring that the research agent provides reliable and verifiable information.
To implement this component, we need to develop methods for assessing the quality of sources, extracting citation information, and formatting references according to standard citation styles. We can use LLMs to assist with these tasks, particularly for evaluating the relevance and credibility of sources.
Here's an example of how we might implement this component:
import openai
import os
import json
from typing import List, Dict, Any
from datetime import datetime
class SourceValidator:
def __init__(self, api_key=None):
"""
Initialize the source validator with an API key for the LLM service.
Args:
api_key: API key for the LLM service
"""
self.api_key = api_key or os.getenv("OPENAI_API_KEY")
openai.api_key = self.api_key
def validate_sources(self, sources: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]:
"""
Validate sources for relevance and credibility.
Args:
sources: List of sources to validate
query: The original research query
Returns:
A list of validated sources with additional metadata
"""
validated_sources = []
for source in sources:
# Skip if there's not enough information to validate
if not source.get('title') or not source.get('url'):
continue
# Prepare source information for validation
source_info = {
'title': source.get('title', ''),
'authors': source.get('authors', []),
'abstract': source.get('abstract', source.get('snippet', '')),
'url': source.get('url', ''),
'published_date': source.get('published', source.get('year', '')),
'source_type': source.get('source_type', 'unknown')
}
# Validate the source using the LLM
validation_result = self._validate_with_llm(source_info, query)
# Combine original source info with validation results
validated_source = {**source, **validation_result}
validated_sources.append(validated_source)
# Sort sources by relevance score
validated_sources.sort(key=lambda x: x.get('relevance_score', 0), reverse=True)
return validated_sources
def _validate_with_llm(self, source_info: Dict[str, Any], query: str) -> Dict[str, Any]:
"""
Use an LLM to validate a source for relevance and credibility.
Args:
source_info: Information about the source
query: The original research query
Returns:
A dictionary with validation results
"""
# Construct a prompt for the LLM
prompt = f"""
I'm researching: "{query}"
Please evaluate the following source for relevance and credibility:
Title: {source_info['title']}
Authors: {', '.join(source_info['authors']) if isinstance(source_info['authors'], list) else source_info['authors']}
Abstract/Snippet: {source_info['abstract']}
URL: {source_info['url']}
Published: {source_info['published_date']}
Source Type: {source_info['source_type']}
Please provide:
1. A relevance score (0-100) indicating how relevant this source is to my research query
2. A credibility score (0-100) based on the source type, authors, publication venue, etc.
3. A brief explanation of your evaluation
4. Key points from this source that are relevant to my research
Format your response as a structured JSON object.
"""
try:
# Call the LLM API
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are a research assistant evaluating sources for relevance and credibility."},
{"role": "user", "content": prompt}
],
response_format={"type": "json_object"}
)
# Parse the response
result = json.loads(response.choices[0].message.content)
# Add a timestamp for the validation
result['validated_at'] = datetime.now().isoformat()
return result
except Exception as e:
print(f"Error validating source with LLM: {e}")
# Return default values if validation fails
return {
'relevance_score': 50,
'credibility_score': 50,
'evaluation': "Automated validation failed. Please evaluate this source manually.",
'key_points': [],
'validated_at': datetime.now().isoformat()
}
class ReferenceManager:
def __init__(self):
"""Initialize the reference manager."""
pass
def format_reference(self, source: Dict[str, Any], style: str = 'apa') -> str:
"""
Format a source as a reference according to the specified citation style.
Args:
source: Source information
style: Citation style (apa, mla, chicago, etc.)
Returns:
A formatted reference string
"""
if style.lower() == 'apa':
return self._format_apa(source)
elif style.lower() == 'mla':
return self._format_mla(source)
elif style.lower() == 'chicago':
return self._format_chicago(source)
else:
return self._format_apa(source) # Default to APA
def _format_apa(self, source: Dict[str, Any]) -> str:
"""Format a reference in APA style."""
authors = source.get('authors', [])
if isinstance(authors, list):
if len(authors) == 0:
author_str = "No author"
elif len(authors) == 1:
author_str = authors[0]
elif len(authors) < 8:
author_str = ", ".join(authors[:-1]) + ", & " + authors[-1]
else:
author_str = ", ".join(authors[:6]) + ", ... " + authors[-1]
else:
author_str = authors
year = source.get('year', source.get('published', 'n.d.'))
if isinstance(year, str) and len(year) >= 4:
year = year[:4] # Extract just the year
title = source.get('title', 'Untitled')
url = source.get('url', '')
# Format based on source type
source_type = source.get('source_type', '').lower()
if source_type == 'arxiv' or source_type == 'google_scholar':
# Format as a journal article or preprint
return f"{author_str}. ({year}). {title}. Retrieved from {url}"
else:
# Format as a web page
return f"{author_str}. ({year}). {title}. Retrieved from {url}"
def _format_mla(self, source: Dict[str, Any]) -> str:
"""Format a reference in MLA style."""
# Implementation similar to _format_apa but following MLA guidelines
# This is a simplified version
authors = source.get('authors', [])
if isinstance(authors, list):
if len(authors) == 0:
author_str = "No author"
elif len(authors) == 1:
author_str = authors[0]
elif len(authors) == 2:
author_str = authors[0] + " and " + authors[1]
else:
author_str = authors[0] + ", et al."
else:
author_str = authors
title = source.get('title', 'Untitled')
url = source.get('url', '')
# Format based on source type
return f"{author_str}. \"{title}.\" {url}. Accessed {datetime.now().strftime('%d %b. %Y')}."
def _format_chicago(self, source: Dict[str, Any]) -> str:
"""Format a reference in Chicago style."""
# Implementation similar to _format_apa but following Chicago guidelines
# This is a simplified version
authors = source.get('authors', [])
if isinstance(authors, list):
if len(authors) == 0:
author_str = "No author"
elif len(authors) == 1:
author_str = authors[0]
else:
author_str = authors[0] + ", et al."
else:
author_str = authors
year = source.get('year', source.get('published', 'n.d.'))
if isinstance(year, str) and len(year) >= 4:
year = year[:4] # Extract just the year
title = source.get('title', 'Untitled')
url = source.get('url', '')
return f"{author_str}. {year}. \"{title}.\" {url}."
This code includes two classes: SourceValidator and ReferenceManager. The SourceValidator class uses an LLM to evaluate sources for relevance and credibility, providing scores and explanations for each source. The ReferenceManager class formats references according to different citation styles, including APA, MLA, and Chicago. Together, these classes ensure that the research agent provides reliable information with proper citations.
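A brief usage sketch, assuming OPENAI_API_KEY is set and that `results` is the dictionary returned by the InformationRetriever above, might look like this:
# Usage sketch: validate retrieved sources, then format the top ones as APA references.
validator = SourceValidator()
ref_manager = ReferenceManager()

all_sources = [s for items in results.values() for s in items]
validated = validator.validate_sources(all_sources, query="retrieval augmented generation")

for source in validated[:5]:
    print(source.get("relevance_score"), "-", source.get("title"))
    print("   ", ref_manager.format_reference(source, style="apa"))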
Response Generation
The response generation component is responsible for synthesizing the information retrieved and validated into a coherent response that addresses the user's research question. This involves summarizing the findings, highlighting key points, and presenting the information in a clear and organized manner.
LLMs are particularly well-suited for this task because they can understand and generate natural language text. By providing the LLM with the validated information and instructions on how to synthesize it, we can generate comprehensive and coherent responses.
Here's an example of how we might implement this component:
import openai
import os
import json
from datetime import datetime
from typing import List, Dict, Any
class ResponseGenerator:
def __init__(self, api_key=None):
"""
Initialize the response generator with an API key for the LLM service.
Args:
api_key: API key for the LLM service
"""
self.api_key = api_key or os.getenv("OPENAI_API_KEY")
openai.api_key = self.api_key
def generate_response(self, query: str, validated_sources: List[Dict[str, Any]],
reference_style: str = 'apa') -> Dict[str, Any]:
"""
Generate a comprehensive response based on validated sources.
Args:
query: The original research query
validated_sources: List of validated sources
reference_style: Citation style for references
Returns:
A dictionary containing the response and metadata
"""
# Filter sources by relevance
relevant_sources = [s for s in validated_sources if s.get('relevance_score', 0) > 50]
# If no relevant sources, return a message indicating this
if not relevant_sources:
return {
'findings': "I couldn't find any relevant information for your query. Please try refining your question or exploring different keywords.",
'sources': [],
'summary': "No relevant information found."
}
# Prepare source information for the LLM
source_info = []
for i, source in enumerate(relevant_sources[:10]): # Limit to top 10 sources
source_info.append({
'id': i + 1,
'title': source.get('title', ''),
'authors': source.get('authors', []),
'key_points': source.get('key_points', []),
'relevance_score': source.get('relevance_score', 0),
'credibility_score': source.get('credibility_score', 0),
'evaluation': source.get('evaluation', '')
})
# Generate a response using the LLM
response = self._synthesize_with_llm(query, source_info)
# Format references for the sources
formatted_sources = []
for i, source in enumerate(relevant_sources[:10]):
reference_id = f"[{i+1}]"
formatted_reference = self._format_reference(source, reference_style)
formatted_sources.append({
'id': reference_id,
'reference': formatted_reference,
'title': source.get('title', ''),
'authors': source.get('authors', []),
'url': source.get('url', ''),
'year': source.get('year', source.get('published', '')),
'relevance_score': source.get('relevance_score', 0),
'credibility_score': source.get('credibility_score', 0)
})
# Return the complete response
return {
'findings': response['findings'],
'sources': formatted_sources,
'summary': response['summary'],
'limitations': response.get('limitations', 'No specific limitations noted.')
}
def _synthesize_with_llm(self, query: str, source_info: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Use an LLM to synthesize information from multiple sources.
Args:
query: The original research query
source_info: Information about relevant sources
Returns:
A dictionary with the synthesized findings and metadata
"""
# Construct a prompt for the LLM
sources_text = "\n\n".join([
f"Source {s['id']}:\nTitle: {s['title']}\nAuthors: {', '.join(s['authors']) if isinstance(s['authors'], list) else s['authors']}\nKey Points: {'; '.join(s['key_points'])}\nRelevance: {s['relevance_score']}/100, Credibility: {s['credibility_score']}/100"
for s in source_info
])
prompt = f"""
I'm researching: "{query}"
I've found the following relevant sources:
{sources_text}
Please synthesize this information into a comprehensive response that addresses my research question. Your response should:
1. Provide a clear and detailed answer to my question based on the sources
2. Highlight key findings and insights
3. Note areas of consensus and disagreement among sources
4. Identify any limitations or gaps in the available information
5. Include in-text citations using the format [Source ID] (e.g., [1], [2], etc.)
Also provide:
- A brief summary (2-3 sentences) of the overall findings
- Any limitations of the current research
Format your response as a structured JSON object with 'findings', 'summary', and 'limitations' fields.
"""
try:
# Call the LLM API
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are a research assistant synthesizing information from multiple sources."},
{"role": "user", "content": prompt}
],
response_format={"type": "json_object"}
)
# Parse the response
result = json.loads(response.choices[0].message.content)
return result
except Exception as e:
print(f"Error synthesizing response with LLM: {e}")
# Return a default response if synthesis fails
return {
'findings': "I encountered an error while synthesizing the information. Here are the key points from the most relevant sources:\n\n" +
"\n\n".join([f"Source {s['id']}: {s['title']}\n" +
"Key Points: " + "; ".join(s['key_points']) for s in source_info]),
'summary': "Synthesis failed. Please review the key points from each source.",
'limitations': "The automated synthesis process encountered an error."
}
def _format_reference(self, source: Dict[str, Any], style: str) -> str:
"""
Format a reference according to the specified citation style.
Args:
source: Source information
style: Citation style
Returns:
A formatted reference string
"""
# This is a simplified version - in a real implementation, you would use a more robust
# reference formatting system or library
authors = source.get('authors', [])
if isinstance(authors, list):
if len(authors) == 0:
author_str = "No author"
elif len(authors) == 1:
author_str = authors[0]
elif len(authors) < 8:
author_str = ", ".join(authors[:-1]) + ", & " + authors[-1]
else:
author_str = ", ".join(authors[:6]) + ", ... " + authors[-1]
else:
author_str = authors
year = source.get('year', source.get('published', 'n.d.'))
if isinstance(year, str) and len(year) >= 4:
year = year[:4] # Extract just the year
title = source.get('title', 'Untitled')
url = source.get('url', '')
if style.lower() == 'apa':
return f"{author_str}. ({year}). {title}. Retrieved from {url}"
elif style.lower() == 'mla':
return f"{author_str}. \"{title}.\" {url}. Accessed {datetime.now().strftime('%d %b. %Y')}."
elif style.lower() == 'chicago':
return f"{author_str}. {year}. \"{title}.\" {url}."
else:
return f"{author_str}. ({year}). {title}. {url}"
This ResponseGenerator class uses an LLM to synthesize information from multiple validated sources into a coherent response. It includes methods for generating the response and formatting references according to different citation styles. The response includes the synthesized findings, a summary, and information about any limitations or gaps in the available information.
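A usage sketch, assuming OPENAI_API_KEY is set and that `validated` is the list produced by the SourceValidator above, might look like this:
# Usage sketch: synthesize a cited answer from the validated sources.
generator = ResponseGenerator()
report = generator.generate_response(
    query="What are the main approaches to retrieval-augmented generation?",
    validated_sources=validated,
    reference_style="apa"
)
print(report["summary"])
print(report["findings"])
for src in report["sources"]:
    print(src["id"], src["reference"])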
Iterative Refinement Mechanism
The iterative refinement mechanism allows users to refine their research questions based on the initial findings. This is a critical feature for a research agent because it enables a more interactive and dynamic research process, where the user can progressively narrow down their focus or explore different aspects of their topic.
To implement this mechanism, we need to develop methods for processing refined queries and incorporating previous findings into the new search. This involves maintaining a history of interactions and using this history to inform subsequent searches.
Here's an example of how we might implement this component:
import os
import json
import openai
from typing import List, Dict, Any
from datetime import datetime
class ResearchSession:
def __init__(self, session_id=None):
"""
Initialize a research session.
Args:
session_id: Unique identifier for the session
"""
self.session_id = session_id or datetime.now().strftime("%Y%m%d%H%M%S")
self.interactions = []
self.current_query = None
self.current_findings = None
def add_interaction(self, query: str, findings: Dict[str, Any]) -> None:
"""
Add an interaction to the session history.
Args:
query: The research query
findings: The findings returned for the query
"""
interaction = {
'timestamp': datetime.now().isoformat(),
'query': query,
'findings': findings
}
self.interactions.append(interaction)
self.current_query = query
self.current_findings = findings
def get_session_history(self) -> List[Dict[str, Any]]:
"""
Get the session history.
Returns:
A list of interactions in the session
"""
return self.interactions
def get_latest_interaction(self) -> Dict[str, Any]:
"""
Get the latest interaction in the session.
Returns:
The latest interaction or None if there are no interactions
"""
if self.interactions:
return self.interactions[-1]
return None
def save_session(self, file_path: str) -> None:
"""
Save the session to a file.
Args:
file_path: Path to the file
"""
session_data = {
'session_id': self.session_id,
'interactions': self.interactions,
'created_at': self.interactions[0]['timestamp'] if self.interactions else datetime.now().isoformat(),
'last_updated': datetime.now().isoformat()
}
with open(file_path, 'w') as f:
json.dump(session_data, f, indent=2)
@classmethod
def load_session(cls, file_path: str) -> 'ResearchSession':
"""
Load a session from a file.
Args:
file_path: Path to the file
Returns:
A ResearchSession object
"""
with open(file_path, 'r') as f:
session_data = json.load(f)
session = cls(session_id=session_data.get('session_id'))
session.interactions = session_data.get('interactions', [])
if session.interactions:
session.current_query = session.interactions[-1]['query']
session.current_findings = session.interactions[-1]['findings']
return session
class RefinementManager:
def __init__(self, api_key=None):
"""
Initialize the refinement manager with an API key for the LLM service.
Args:
api_key: API key for the LLM service
"""
self.api_key = api_key or os.getenv("OPENAI_API_KEY")
openai.api_key = self.api_key
def process_refinement(self, session: ResearchSession, refined_query: str) -> Dict[str, Any]:
"""
Process a refined query based on the session history.
Args:
session: The research session
refined_query: The refined query from the user
Returns:
A dictionary with information for the refined search
"""
# Get the latest interaction
latest = session.get_latest_interaction()
if not latest:
# If there's no previous interaction, treat this as a new query
return {
'query': refined_query,
'search_queries': [refined_query],
'suggested_sources': ["Google Scholar", "PubMed", "arXiv"],
'is_refinement': False
}
# Use the LLM to analyze the refinement
refinement_info = self._analyze_refinement(latest['query'], refined_query, latest['findings'])
return {
'original_query': latest['query'],
'refined_query': refined_query,
'search_queries': refinement_info.get('search_queries', [refined_query]),
'suggested_sources': refinement_info.get('suggested_sources', []),
'focus_areas': refinement_info.get('focus_areas', []),
'exclude_areas': refinement_info.get('exclude_areas', []),
'is_refinement': True
}
def _analyze_refinement(self, original_query: str, refined_query: str,
previous_findings: Dict[str, Any]) -> Dict[str, Any]:
"""
Use an LLM to analyze how a query has been refined.
Args:
original_query: The original research query
refined_query: The refined query from the user
previous_findings: Findings from the previous search
Returns:
A dictionary with analysis of the refinement
"""
# Construct a prompt for the LLM
prompt = f"""
Original Research Question: "{original_query}"
Refined Research Question: "{refined_query}"
Previous Findings Summary:
{previous_findings.get('summary', 'No summary available')}
Please analyze how the research question has been refined and provide:
1. 3-5 specific search queries that would help address the refined question
2. Suggested sources that would be most relevant for this refinement
3. Specific areas or aspects to focus on
4. Areas or aspects from the original search that should be excluded
Format your response as a structured JSON object.
"""
try:
# Call the LLM API
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are a research assistant helping to refine research queries."},
{"role": "user", "content": prompt}
],
response_format={"type": "json_object"}
)
# Parse the response
result = json.loads(response.choices[0].message.content)
return result
except Exception as e:
print(f"Error analyzing refinement with LLM: {e}")
# Return default values if analysis fails
return {
'search_queries': [refined_query],
'suggested_sources': ["Google Scholar", "PubMed", "arXiv"],
'focus_areas': [],
'exclude_areas': []
}
This code includes two classes: ResearchSession and RefinementManager. The ResearchSession class manages the history of interactions in a research session, including queries and findings. The RefinementManager class processes refined queries based on the session history, using an LLM to analyze how the query has been refined and to generate appropriate search queries and suggestions for the refined search.
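A usage sketch, assuming OPENAI_API_KEY is set and that `report` is the response dictionary from the ResponseGenerator above, might look like this:
# Usage sketch: record an interaction, persist the session, and analyze a follow-up query.
session = ResearchSession()
session.add_interaction("What are the main approaches to retrieval-augmented generation?", report)
session.save_session("rag_session.json")

refiner = RefinementManager()
plan = refiner.process_refinement(session, "How are retrieval-augmented systems evaluated in practice?")
print(plan["is_refinement"], plan.get("search_queries"))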
Technical Implementation
LLM Selection and Integration
The choice of LLM is a critical decision in building a research agent. There are two main categories to consider: commercial LLMs provided as services (like OpenAI's GPT-4, Anthropic's Claude, or Google's Gemini) and local LLMs that can be run on your own hardware (like Llama 2, Mistral, or Falcon).
Commercial LLMs typically offer better performance and are easier to integrate, but they come with costs, potential privacy concerns, and dependency on external services. Local LLMs provide more control, privacy, and potentially lower costs for high-volume usage, but they require more technical expertise to set up and may have lower performance, especially on consumer hardware.
For a research agent that needs to process and synthesize complex information, a more capable model is generally preferable. However, the choice depends on your specific requirements, budget, and technical constraints.
Here's an example of how we might implement a modular LLM interface that can work with different models:
import os
import json
from typing import List, Dict, Any, Union, Optional
import openai
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
class LLMInterface:
def __init__(self, model_type: str = 'openai', model_name: str = 'gpt-4',
api_key: str = None, device: str = None):
"""
Initialize the LLM interface.
Args:
model_type: Type of LLM ('openai', 'local', etc.)
model_name: Name of the model
api_key: API key for commercial LLMs
device: Device to run local models on ('cuda', 'mps', or 'cpu'; ROCm builds of PyTorch use 'cuda')
"""
self.model_type = model_type.lower()
self.model_name = model_name
# Set up based on model type
if self.model_type == 'openai':
self.api_key = api_key or os.getenv("OPENAI_API_KEY")
openai.api_key = self.api_key
self.model = None
self.tokenizer = None
elif self.model_type == 'local':
# Determine device for local models
if device:
    # Normalize: PyTorch addresses ROCm GPUs through the 'cuda' device string
    self.device = 'cuda' if device == 'rocm' else device
else:
if torch.cuda.is_available():
    # ROCm builds of PyTorch also report through torch.cuda,
    # so this branch covers both NVIDIA and AMD GPUs
    self.device = 'cuda'
elif torch.backends.mps.is_available():
    self.device = 'mps'
else:
    self.device = 'cpu'
print(f"Loading local model {model_name} on {self.device}...")
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16 if self.device != 'cpu' else torch.float32,
device_map=self.device
)
else:
raise ValueError(f"Unsupported model type: {model_type}")
def generate(self, prompt: str, system_message: str = None,
max_tokens: int = 1000, temperature: float = 0.7,
json_format: bool = False) -> Dict[str, Any]:
"""
Generate a response from the LLM.
Args:
prompt: The prompt to send to the LLM
system_message: System message for models that support it
max_tokens: Maximum number of tokens to generate
temperature: Temperature for generation
json_format: Whether to request JSON format output
Returns:
A dictionary with the generated text and metadata
"""
if self.model_type == 'openai':
return self._generate_openai(prompt, system_message, max_tokens, temperature, json_format)
elif self.model_type == 'local':
return self._generate_local(prompt, system_message, max_tokens, temperature)
else:
raise ValueError(f"Unsupported model type: {self.model_type}")
def _generate_openai(self, prompt: str, system_message: str = None,
max_tokens: int = 1000, temperature: float = 0.7,
json_format: bool = False) -> Dict[str, Any]:
"""Generate text using OpenAI API."""
messages = []
if system_message:
messages.append({"role": "system", "content": system_message})
messages.append({"role": "user", "content": prompt})
kwargs = {
"model": self.model_name,
"messages": messages,
"max_tokens": max_tokens,
"temperature": temperature
}
if json_format:
kwargs["response_format"] = {"type": "json_object"}
try:
response = openai.ChatCompletion.create(**kwargs)
return {
'text': response.choices[0].message.content,
'model': self.model_name,
'finish_reason': response.choices[0].finish_reason,
'usage': {
'prompt_tokens': response.usage.prompt_tokens,
'completion_tokens': response.usage.completion_tokens,
'total_tokens': response.usage.total_tokens
}
}
except Exception as e:
print(f"Error generating text with OpenAI: {e}")
return {
'text': f"Error generating response: {str(e)}",
'model': self.model_name,
'error': str(e)
}
def _generate_local(self, prompt: str, system_message: str = None,
max_tokens: int = 1000, temperature: float = 0.7) -> Dict[str, Any]:
"""Generate text using a local model."""
try:
# Combine system message and prompt if provided
full_prompt = ""
if system_message:
full_prompt = f"{system_message}\n\n"
full_prompt += prompt
# Tokenize the input
inputs = self.tokenizer(full_prompt, return_tensors="pt").to(self.device)
# Generate response
with torch.no_grad():
outputs = self.model.generate(
inputs.input_ids,
max_new_tokens=max_tokens,
temperature=temperature,
do_sample=temperature > 0,
pad_token_id=self.tokenizer.eos_token_id
)
# Decode the response
generated_text = self.tokenizer.decode(outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=True)
return {
'text': generated_text,
'model': self.model_name,
'finish_reason': 'stop',
'usage': {
'prompt_tokens': inputs.input_ids.shape[1],
'completion_tokens': outputs.shape[1] - inputs.input_ids.shape[1],
'total_tokens': outputs.shape[1]
}
}
except Exception as e:
print(f"Error generating text with local model: {e}")
return {
'text': f"Error generating response: {str(e)}",
'model': self.model_name,
'error': str(e)
}
This LLMInterface class provides a unified interface for interacting with different types of LLMs, including commercial APIs like OpenAI and local models using the Hugging Face Transformers library. It handles device selection for local models, supporting NVIDIA CUDA, AMD ROCm, and Apple MPS for GPU acceleration. The class includes methods for generating text with different parameters and formats, with appropriate error handling.
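A usage sketch might look like the following; the hosted call assumes OPENAI_API_KEY is set, and the local model name is just an example of a Hugging Face checkpoint you might choose.
# Usage sketch for LLMInterface; the same call shape works for both backends.
hosted = LLMInterface(model_type="openai", model_name="gpt-4")
result = hosted.generate(
    prompt="Summarize the key idea of retrieval-augmented generation in two sentences.",
    system_message="You are a concise research assistant.",
    max_tokens=120
)
print(result["text"])

# Example local model (any causal LM checkpoint from the Hugging Face Hub would work):
# local = LLMInterface(model_type="local", model_name="mistralai/Mistral-7B-Instruct-v0.2")
# print(local.generate("Explain dense passage retrieval briefly.")["text"])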
GPU Acceleration
GPU acceleration is essential for running large language models efficiently, especially when using local models. Different hardware platforms require different approaches to GPU acceleration:
1. NVIDIA CUDA: The most widely supported platform for deep learning, used with NVIDIA GPUs.
2. AMD ROCm: AMD's open-source platform for GPU computing, used with AMD GPUs.
3. Apple MPS (Metal Performance Shaders): Apple's framework for GPU computing on Mac devices with Apple Silicon or compatible AMD GPUs.
To support these different platforms, we need to configure our environment appropriately and ensure that our code can detect and use the available hardware.
Here's an example of how we might implement a utility class for managing GPU resources:
import os
import torch
import platform
import subprocess
from typing import Dict, Any, List, Optional
class GPUManager:
def __init__(self):
"""Initialize the GPU manager."""
self.device = self._detect_device()
self.device_info = self._get_device_info()
def _detect_device(self) -> str:
"""
Detect the available GPU device.
Returns:
Device string ('cuda', 'rocm', 'mps', or 'cpu')
"""
if torch.cuda.is_available():
    # ROCm builds of PyTorch report through torch.cuda and set torch.version.hip
    if getattr(torch.version, 'hip', None):
        return 'rocm'
    return 'cuda'
elif torch.backends.mps.is_available():
    return 'mps'
else:
    return 'cpu'
def _get_device_info(self) -> Dict[str, Any]:
"""
Get information about the detected device.
Returns:
A dictionary with device information
"""
info = {
'device_type': self.device,
'platform': platform.system()
}
if self.device == 'cuda':
info['device_count'] = torch.cuda.device_count()
info['current_device'] = torch.cuda.current_device()
info['device_name'] = torch.cuda.get_device_name(info['current_device'])
info['memory_allocated'] = torch.cuda.memory_allocated(info['current_device'])
info['memory_reserved'] = torch.cuda.memory_reserved(info['current_device'])
try:
# Try to get more detailed information using nvidia-smi
result = subprocess.run(['nvidia-smi', '--query-gpu=name,memory.total,memory.free,memory.used,temperature.gpu', '--format=csv,noheader'],
capture_output=True, text=True)
if result.returncode == 0:
parts = result.stdout.strip().split(', ')
if len(parts) >= 5:
info['detailed'] = {
'name': parts[0],
'memory_total': parts[1],
'memory_free': parts[2],
'memory_used': parts[3],
'temperature': parts[4]
}
except Exception:
pass
elif self.device == 'rocm':
info['device_count'] = torch.cuda.device_count() # ROCm uses CUDA API
info['current_device'] = torch.cuda.current_device()
info['device_name'] = torch.cuda.get_device_name(info['current_device'])
info['memory_allocated'] = torch.cuda.memory_allocated(info['current_device'])
info['memory_reserved'] = torch.cuda.memory_reserved(info['current_device'])
try:
# Try to get more detailed information using rocm-smi
result = subprocess.run(['rocm-smi', '--showmeminfo', 'vram'],
capture_output=True, text=True)
if result.returncode == 0:
info['detailed'] = {'rocm_smi_output': result.stdout.strip()}
except Exception:
pass
elif self.device == 'mps':
# Limited information available for MPS
info['device_name'] = 'Apple Silicon GPU'
else:
info['device_name'] = 'CPU'
info['cpu_count'] = os.cpu_count()
return info
def optimize_for_inference(self, model) -> Any:
"""
Optimize a model for inference on the detected device.
Args:
model: The model to optimize
Returns:
The optimized model
"""
if self.device == 'cuda':
# Convert to half precision for NVIDIA GPUs
model = model.half().to(self.device)
# Could add CUDA-specific optimizations here
elif self.device == 'rocm':
    # ROCm tensors still use the 'cuda' device string in PyTorch
    model = model.half().to('cuda')
elif self.device == 'mps':
# MPS optimizations
model = model.to(self.device)
else:
# CPU optimizations
model = model.float().to(self.device)
return model
def get_recommended_batch_size(self) -> int:
"""
Get a recommended batch size based on the device.
Returns:
Recommended batch size
"""
if self.device == 'cpu':
return 1
elif self.device == 'mps':
return 4 # Conservative default for Apple Silicon
else:
# For CUDA and ROCm, try to estimate based on available memory
try:
if self.device == 'cuda' or self.device == 'rocm':
free_memory = torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated(0)
# Very rough estimate: assume roughly 1 GiB of GPU memory per batch item for a large model
batch_size = max(1, int(free_memory / (1024**3)))
return min(8, batch_size)  # Cap at 8 to be safe
except Exception:
pass
# Default if estimation fails
return 2
def print_device_info(self) -> None:
"""Print information about the detected device."""
print(f"Device: {self.device_info['device_type']}")
print(f"Device Name: {self.device_info['device_name']}")
if self.device == 'cuda' or self.device == 'rocm':
print(f"Device Count: {self.device_info['device_count']}")
print(f"Current Device: {self.device_info['current_device']}")
print(f"Memory Allocated: {self.device_info['memory_allocated'] / 1024**2:.2f} MB")
print(f"Memory Reserved: {self.device_info['memory_reserved'] / 1024**2:.2f} MB")
if 'detailed' in self.device_info:
print("\nDetailed Information:")
for key, value in self.device_info['detailed'].items():
print(f" {key}: {value}")
elif self.device == 'cpu':
print(f"CPU Count: {self.device_info['cpu_count']}")
This GPUManager class provides utilities for detecting and managing GPU resources. It can identify the type of GPU available (NVIDIA CUDA, AMD ROCm, or Apple MPS), gather information about the device, optimize models for inference on the detected device, and recommend appropriate batch sizes based on available memory. This helps ensure that the research agent can run efficiently on different hardware platforms.
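A usage sketch might look like this:
# Usage sketch for GPUManager: inspect the detected device and pick a batch size.
gpu = GPUManager()
gpu.print_device_info()
print("Recommended batch size:", gpu.get_recommended_batch_size())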
Tool Integration for Agentic Capabilities
An agentic AI system can use various tools to extend its capabilities beyond what the base LLM can do. For a research agent, these tools might include web browsers, search engines, PDF readers, citation managers, and other utilities that help with gathering and processing information.
To implement tool integration, we need to create a framework that allows the LLM to select and use appropriate tools based on the user's query and the current state of the research process. This involves defining a set of tools with clear interfaces, implementing a mechanism for the LLM to select tools, and handling the results of tool usage.
Here's an example of how we might implement a tool integration framework:
import json
import requests
import subprocess
import os
from typing import List, Dict, Any, Callable, Optional
from abc import ABC, abstractmethod
class Tool(ABC):
"""Abstract base class for tools."""
@property
@abstractmethod
def name(self) -> str:
"""Get the name of the tool."""
pass
@property
@abstractmethod
def description(self) -> str:
"""Get the description of the tool."""
pass
@abstractmethod
def run(self, input_data: Any) -> Dict[str, Any]:
"""
Run the tool with the given input.
Args:
input_data: Input data for the tool
Returns:
A dictionary with the results
"""
pass
class WebSearchTool(Tool):
"""Tool for searching the web."""
def __init__(self, api_key: str = None):
"""
Initialize the web search tool.
Args:
api_key: API key for the search service
"""
self.api_key = api_key or os.getenv("SERP_API_KEY")
@property
def name(self) -> str:
return "web_search"
@property
def description(self) -> str:
return "Search the web for information on a topic"
def run(self, input_data: str) -> Dict[str, Any]:
"""
Search the web for the given query.
Args:
input_data: Search query
Returns:
A dictionary with search results
"""
if not self.api_key:
return {"error": "No API key provided for web search"}
url = "https://serpapi.com/search"
params = {
"q": input_data,
"api_key": self.api_key,
"engine": "google"
}
try:
response = requests.get(url, params=params)
data = response.json()
results = []
for item in data.get('organic_results', [])[:5]:
results.append({
'title': item.get('title', ''),
'snippet': item.get('snippet', ''),
'link': item.get('link', '')
})
return {
"query": input_data,
"results": results
}
except Exception as e:
return {"error": f"Error searching the web: {str(e)}"}
class ArxivSearchTool(Tool):
"""Tool for searching arXiv."""
@property
def name(self) -> str:
return "arxiv_search"
@property
def description(self) -> str:
return "Search arXiv for academic papers on a topic"
def run(self, input_data: str) -> Dict[str, Any]:
"""
Search arXiv for the given query.
Args:
input_data: Search query
Returns:
A dictionary with search results
"""
try:
import arxiv
search = arxiv.Search(
query=input_data,
max_results=5,
sort_by=arxiv.SortCriterion.Relevance
)
results = []
for paper in search.results():
results.append({
'title': paper.title,
'authors': [author.name for author in paper.authors],  # arxiv returns Author objects; .name gives the string
'abstract': paper.summary,
'url': paper.pdf_url,
'published': paper.published.strftime('%Y-%m-%d')
})
return {
"query": input_data,
"results": results
}
except Exception as e:
return {"error": f"Error searching arXiv: {str(e)}"}
class PDFReaderTool(Tool):
"""Tool for reading PDF documents."""
@property
def name(self) -> str:
return "pdf_reader"
@property
def description(self) -> str:
return "Extract text from a PDF document"
def run(self, input_data: str) -> Dict[str, Any]:
"""
Extract text from a PDF document.
Args:
input_data: URL or path to the PDF document
Returns:
A dictionary with the extracted text
"""
try:
import PyPDF2
import io
# Check if input is a URL or a local path
if input_data.startswith(('http://', 'https://')):
# Download the PDF
response = requests.get(input_data)
pdf_file = io.BytesIO(response.content)
else:
# Open local file
pdf_file = open(input_data, 'rb')
# Extract text
reader = PyPDF2.PdfReader(pdf_file)
text = ""
for page_num in range(min(5, len(reader.pages))): # Limit to first 5 pages
page = reader.pages[page_num]
text += page.extract_text() + "\n\n"
# Close the file if it's a local file
if not input_data.startswith(('http://', 'https://')):
pdf_file.close()
return {
"source": input_data,
"text": text[:10000], # Limit text length
"pages_read": min(5, len(reader.pages)),
"total_pages": len(reader.pages)
}
except Exception as e:
return {"error": f"Error reading PDF: {str(e)}"}
class ToolManager:
"""Manager for tools used by the research agent."""
def __init__(self, llm_interface):
"""
Initialize the tool manager.
Args:
llm_interface: Interface to the LLM
"""
self.llm_interface = llm_interface
self.tools = {}
# Register default tools
self.register_tool(WebSearchTool())
self.register_tool(ArxivSearchTool())
self.register_tool(PDFReaderTool())
def register_tool(self, tool: Tool) -> None:
"""
Register a tool with the manager.
Args:
tool: The tool to register
"""
self.tools[tool.name] = tool
def get_tool(self, tool_name: str) -> Optional[Tool]:
"""
Get a tool by name.
Args:
tool_name: Name of the tool
Returns:
The tool or None if not found
"""
return self.tools.get(tool_name)
def list_tools(self) -> List[Dict[str, str]]:
"""
List all available tools.
Returns:
A list of dictionaries with tool information
"""
return [{"name": tool.name, "description": tool.description} for tool in self.tools.values()]
def select_tool(self, query: str) -> Dict[str, Any]:
"""
Use the LLM to select an appropriate tool for a query.
Args:
query: The user's query
Returns:
A dictionary with the selected tool and parameters
"""
tools_list = self.list_tools()
tools_json = json.dumps(tools_list)
prompt = f"""
I need to select the most appropriate tool to help with this research query:
"{query}"
Available tools:
{tools_json}
Please select the most appropriate tool and provide any parameters needed.
Format your response as a JSON object with 'tool_name' and 'parameters' fields.
"""
system_message = "You are a research assistant selecting tools to help with research queries."
response = self.llm_interface.generate(
prompt=prompt,
system_message=system_message,
json_format=True
)
try:
result = json.loads(response['text'])
return result
except json.JSONDecodeError:
# Fallback if the response is not valid JSON
return {
"tool_name": "web_search",
"parameters": query
}
def use_tool(self, tool_name: str, parameters: Any) -> Dict[str, Any]:
"""
Use a tool with the given parameters.
Args:
tool_name: Name of the tool to use
parameters: Parameters for the tool
Returns:
The results from the tool
"""
tool = self.get_tool(tool_name)
if not tool:
return {"error": f"Tool not found: {tool_name}"}
try:
return tool.run(parameters)
except Exception as e:
return {"error": f"Error using tool {tool_name}: {str(e)}"}
def process_with_tools(self, query: str, max_tools: int = 3) -> Dict[str, Any]:
"""
Process a query using appropriate tools.
Args:
query: The user's query
max_tools: Maximum number of tools to use
Returns:
A dictionary with the results
"""
results = []
for i in range(max_tools):
# Select a tool
tool_selection = self.select_tool(query)
tool_name = tool_selection.get('tool_name')
parameters = tool_selection.get('parameters')
# Use the tool
tool_result = self.use_tool(tool_name, parameters)
# Add to results
results.append({
"tool": tool_name,
"parameters": parameters,
"result": tool_result
})
# Check if we need to continue
if i < max_tools - 1:
# Ask the LLM if we need more information
prompt = f"""
I'm researching: "{query}"
So far, I've used these tools and found:
{json.dumps(results, indent=2)}
Do I need to use additional tools to answer the query effectively?
Respond with a JSON object with 'need_more_tools' (true/false) and 'next_tool_suggestion' fields.
"""
system_message = "You are a research assistant deciding if more information is needed."
response = self.llm_interface.generate(
prompt=prompt,
system_message=system_message,
json_format=True
)
try:
continuation = json.loads(response['text'])
if not continuation.get('need_more_tools', True):
break
except Exception:
# If parsing fails, continue with the loop
pass
return {
"query": query,
"tool_results": results
}
This code includes several classes for implementing tool integration in the research agent. The Tool abstract base class defines the interface for all tools, with concrete implementations for web search, arXiv search, and PDF reading. The ToolManager class manages the available tools, selects appropriate tools for a given query using the LLM, and processes queries using multiple tools in sequence. This framework allows the research agent to leverage external capabilities to gather and process information more effectively.
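As a usage sketch, the following registers a hypothetical extra tool alongside the defaults and runs a query through the selection loop; the Wikipedia summary endpoint is a public REST API, and `llm_interface` is assumed to be an LLMInterface instance from the previous section.
class WikipediaSummaryTool(Tool):
    """Hypothetical extra tool that fetches a short encyclopedic summary."""
    @property
    def name(self) -> str:
        return "wikipedia_summary"

    @property
    def description(self) -> str:
        return "Fetch a short encyclopedic summary of a topic from Wikipedia"

    def run(self, input_data: str) -> Dict[str, Any]:
        url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{input_data.replace(' ', '_')}"
        try:
            resp = requests.get(url, timeout=10)
            resp.raise_for_status()
            return {"query": input_data, "summary": resp.json().get("extract", "")}
        except Exception as e:
            return {"error": f"Error fetching Wikipedia summary: {e}"}

tool_manager = ToolManager(llm_interface)
tool_manager.register_tool(WikipediaSummaryTool())
output = tool_manager.process_with_tools("recent advances in retrieval-augmented generation")
print(json.dumps(output, indent=2))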
Putting It All Together: The Research Agent
Now that we've implemented all the core components of our research agent, we need to integrate them into a cohesive system. This involves creating a main class that orchestrates the various components and manages the overall research process.
Here's an example of how we might implement the main ResearchAgent class:
import os
import json
from typing import Dict, Any, List, Optional
from datetime import datetime
class ResearchAgent:
def __init__(self, config: Dict[str, Any] = None):
"""
Initialize the research agent with the specified configuration.
Args:
config: Configuration dictionary
"""
self.config = config or {}
# Set up LLM interface
model_type = self.config.get('model_type', 'openai')
model_name = self.config.get('model_name', 'gpt-4')
api_key = self.config.get('api_key')
# Initialize GPU manager
self.gpu_manager = GPUManager()
device = self.gpu_manager.device
print(f"Initializing Research Agent with {model_type} model {model_name} on {device}")
self.gpu_manager.print_device_info()
# Initialize LLM interface
self.llm_interface = LLMInterface(
model_type=model_type,
model_name=model_name,
api_key=api_key,
device=device
)
# Initialize components
self.query_processor = QueryProcessor(api_key=api_key)
self.tool_manager = ToolManager(self.llm_interface)
self.information_retriever = InformationRetriever(api_keys=self.config.get('api_keys', {}))
self.source_validator = SourceValidator(api_key=api_key)
self.reference_manager = ReferenceManager()
self.response_generator = ResponseGenerator(api_key=api_key)
self.refinement_manager = RefinementManager(api_key=api_key)
# Initialize session
self.session = ResearchSession()
def process_query(self, query: str) -> Dict[str, Any]:
"""
Process a research query.
Args:
query: The research query
Returns:
A dictionary with the research findings
"""
print(f"Processing query: {query}")
# Process the query
processed_query = self.query_processor.process_query(query)
print(f"Processed query: {json.dumps(processed_query, indent=2)}")
# Use tools to gather initial information
tool_results = self.tool_manager.process_with_tools(query)
print(f"Tool results gathered from {len(tool_results['tool_results'])} tools")
# Retrieve information from various sources
search_queries = processed_query.get('search_queries', [query])
suggested_sources = processed_query.get('suggested_sources', [])
sources_to_search = ['web', 'arxiv', 'google_scholar']
if suggested_sources:
# Map suggested sources to actual source names
source_mapping = {
'google scholar': 'google_scholar',
'arxiv': 'arxiv',
'web': 'web',
'internet': 'web',
'pubmed': 'pubmed'
}
sources_to_search = [source_mapping.get(s.lower(), s.lower()) for s in suggested_sources]
sources_to_search = [s for s in sources_to_search if s in ['web', 'arxiv', 'google_scholar', 'pubmed']]
retrieved_info = self.information_retriever.retrieve_information(search_queries, sources_to_search)
# Combine all sources
all_sources = []
# Add sources from tool results
for tool_result in tool_results['tool_results']:
if 'result' in tool_result and 'results' in tool_result['result']:
for result in tool_result['result']['results']:
all_sources.append({
'title': result.get('title', ''),
'authors': result.get('authors', []),
'abstract': result.get('abstract', result.get('snippet', '')),
'url': result.get('link', result.get('url', '')),
'published': result.get('published', ''),
'source_type': tool_result['tool']
})
# Add sources from information retrieval
for source_type, sources in retrieved_info.items():
all_sources.extend(sources)
print(f"Total sources gathered: {len(all_sources)}")
# Validate sources
validated_sources = self.source_validator.validate_sources(all_sources, query)
print(f"Sources validated: {len(validated_sources)}")
# Generate response
reference_style = self.config.get('reference_style', 'apa')
response = self.response_generator.generate_response(query, validated_sources, reference_style)
# Add the interaction to the session
self.session.add_interaction(query, response)
return response
def refine_query(self, refined_query: str) -> Dict[str, Any]:
"""
Process a refined query based on previous research.
Args:
refined_query: The refined research query
Returns:
A dictionary with the research findings
"""
# Derive refinement context from the previous session
# (refinement_info is not used further in this simplified flow)
refinement_info = self.refinement_manager.process_refinement(self.session, refined_query)
# Process the refined query as a fresh research cycle
return self.process_query(refined_query)
def save_session(self, file_path: str) -> None:
"""
Save the current session to a file.
Args:
file_path: Path to save the session
"""
self.session.save_session(file_path)
def load_session(self, file_path: str) -> None:
"""
Load a session from a file.
Args:
file_path: Path to the session file
"""
self.session = ResearchSession.load_session(file_path)
This ResearchAgent class integrates all the components we've developed into a cohesive system. It initializes the necessary components, processes research queries by coordinating the various components, and manages research sessions. The process_query method orchestrates the entire research process, from processing the query to generating the final response. The refine_query method handles query refinement based on previous research.
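One component used above but not shown in this section is ResearchSession. A minimal sketch that supports the attributes and methods ResearchAgent relies on (session_id, interactions, current_query, add_interaction, save_session, and load_session) might look like the following; the exact fields stored per interaction are an assumption.
import json
import uuid
from datetime import datetime
from typing import Any, Dict, List

class ResearchSession:
    """Tracks the queries and responses exchanged in one research session."""

    def __init__(self):
        self.session_id = uuid.uuid4().hex
        self.created_at = datetime.now().isoformat()
        self.interactions: List[Dict[str, Any]] = []
        self.current_query: str = ""

    def add_interaction(self, query: str, response: Dict[str, Any]) -> None:
        self.current_query = query
        self.interactions.append({
            "timestamp": datetime.now().isoformat(),
            "query": query,
            "response": response,
        })

    def save_session(self, file_path: str) -> None:
        with open(file_path, "w") as f:
            json.dump(self.__dict__, f, indent=2)

    @classmethod
    def load_session(cls, file_path: str) -> "ResearchSession":
        with open(file_path, "r") as f:
            data = json.load(f)
        session = cls()
        session.__dict__.update(data)
        return session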
Running the Research Agent
To run the research agent, we need to create a script that initializes the agent and provides an interface for users to interact with it. Here's an example of how we might implement a simple command-line interface:
import argparse
import json
import os
from research_agent import ResearchAgent
def main():
# Parse command-line arguments
parser = argparse.ArgumentParser(description="LLM-based Research Agent")
parser.add_argument("--config", type=str, default="config.json", help="Path to configuration file")
parser.add_argument("--model", type=str, help="Model type (openai, local)")
parser.add_argument("--model_name", type=str, help="Model name")
parser.add_argument("--api_key", type=str, help="API key for commercial LLMs")
parser.add_argument("--session", type=str, help="Path to session file to load")
args = parser.parse_args()
# Load configuration
config = {}
if os.path.exists(args.config):
with open(args.config, 'r') as f:
config = json.load(f)
# Override config with command-line arguments
if args.model:
config['model_type'] = args.model
if args.model_name:
config['model_name'] = args.model_name
if args.api_key:
config['api_key'] = args.api_key
# Initialize the research agent
agent = ResearchAgent(config)
# Load session if specified
if args.session and os.path.exists(args.session):
agent.load_session(args.session)
print(f"Loaded session from {args.session}")
# Start the CLI
print("Welcome to the Research Agent CLI")
print("Enter your research question or type 'exit' to quit")
print("Type 'refine' to refine your previous query")
print("Type 'save <filename>' to save the current session")
while True:
user_input = input("\nResearch Question: ")
if user_input.lower() == 'exit':
print("Thank you for using the Research Agent. Goodbye!")
break
elif user_input.lower() == 'refine':
if not agent.session.interactions:
print("No previous query to refine. Please enter a research question first.")
continue
print(f"Previous query: {agent.session.current_query}")
refined_query = input("Refined Query: ")
response = agent.refine_query(refined_query)
print("\n" + "="*80)
print("RESEARCH FINDINGS:")
print(response['findings'])
print("\nSOURCES:")
for i, source in enumerate(response['sources'], 1):
print(f"{source['id']} {source['reference']}")
print("="*80)
elif user_input.lower().startswith('save '):
filename = user_input[5:].strip()
if not filename:
filename = f"session_{agent.session.session_id}.json"
agent.save_session(filename)
print(f"Session saved to {filename}")
else:
response = agent.process_query(user_input)
print("\n" + "="*80)
print("RESEARCH FINDINGS:")
print(response['findings'])
print("\nSOURCES:")
for i, source in enumerate(response['sources'], 1):
print(f"{source['id']} {source['reference']}")
print("="*80)
if __name__ == "__main__":
main()
This script provides a simple command-line interface for interacting with the research agent. It parses command-line arguments, loads configuration from a file, initializes the research agent, and provides commands for conducting research, refining queries, and saving sessions.
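The file passed via --config is plain JSON. A hypothetical config.json covering the keys the agent reads above (model_type, model_name, api_key, api_keys, reference_style) might look like this; the key names match the code, but all values, and the specific entries under api_keys, are placeholders.
{
  "model_type": "openai",
  "model_name": "gpt-4",
  "api_key": "YOUR_LLM_API_KEY",
  "api_keys": {
    "web_search": "YOUR_SEARCH_API_KEY"
  },
  "reference_style": "apa"
}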
For a web-based interface, we would need to create a web server using a framework like Flask or FastAPI, and a frontend using HTML, CSS, and JavaScript. Here's a simplified example of how we might implement a Flask-based web server:
from flask import Flask, request, jsonify, render_template
import json
import os
from research_agent import ResearchAgent
app = Flask(__name__)
# Initialize the research agent
config_path = os.environ.get('CONFIG_PATH', 'config.json')
config = {}
if os.path.exists(config_path):
with open(config_path, 'r') as f:
config = json.load(f)
research_agent = ResearchAgent(config)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/api/research', methods=['POST'])
def conduct_research():
data = request.json
query = data.get('query', '')
if not query:
return jsonify({'error': 'No query provided'}), 400
response = research_agent.process_query(query)
return jsonify(response)
@app.route('/api/refine', methods=['POST'])
def refine_research():
data = request.json
refined_query = data.get('refined_query', '')
if not refined_query:
return jsonify({'error': 'No refined query provided'}), 400
response = research_agent.refine_query(refined_query)
return jsonify(response)
@app.route('/api/save_session', methods=['POST'])
def save_session():
data = request.json
filename = data.get('filename', f"session_{research_agent.session.session_id}.json")
research_agent.save_session(filename)
return jsonify({'message': f'Session saved to {filename}'})
@app.route('/api/load_session', methods=['POST'])
def load_session():
data = request.json
filename = data.get('filename', '')
if not filename or not os.path.exists(filename):
return jsonify({'error': 'Invalid or non-existent session file'}), 400
research_agent.load_session(filename)
return jsonify({'message': f'Session loaded from {filename}'})
if __name__ == '__main__':
app.run(debug=True)
This Flask application provides API endpoints for conducting research, refining queries, and saving/loading sessions. It would need to be complemented with HTML, CSS, and JavaScript files to create a complete web interface.
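Before building a frontend, the endpoints can be exercised directly. The following sketch uses the requests library against a locally running development server; the default Flask address is assumed, and the example queries and filename are placeholders.
import requests

BASE_URL = "http://127.0.0.1:5000"  # default Flask development server address

# Submit a research query
resp = requests.post(f"{BASE_URL}/api/research",
                     json={"query": "What are recent advances in retrieval-augmented generation?"})
result = resp.json()
print(result.get("findings", result))

# Refine the query based on the first round of findings
resp = requests.post(f"{BASE_URL}/api/refine",
                     json={"refined_query": "Focus on retrieval-augmented generation for scientific literature"})
print(resp.json().get("findings"))

# Persist the session on the server
requests.post(f"{BASE_URL}/api/save_session", json={"filename": "rag_session.json"})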
Evaluation and Optimization
To ensure that our research agent performs effectively, we need to evaluate its output and optimize its components. This involves assessing the quality of the retrieved information, the relevance and credibility of the sources, the accuracy of the synthesized findings, and the overall user experience.
There are several approaches to evaluation and optimization:
1. User feedback: Collect feedback from users on the quality and usefulness of the research findings. This can be done through explicit feedback mechanisms (like ratings or comments) or implicit feedback (like user engagement metrics).
2. Expert evaluation: Have domain experts evaluate the quality of the research findings and the accuracy of the information provided. This can help identify areas where the agent is performing well or poorly.
3. Automated metrics: Develop automated metrics to evaluate different aspects of the agent's performance, such as the relevance of retrieved sources, the diversity of information, and the coherence of the synthesized findings.
4. A/B testing: Compare different configurations or components of the agent to identify which ones perform better. This can help optimize the system over time.
Here's an example of how we might implement a simple evaluation framework:
import json
from typing import Dict, Any, List
from datetime import datetime
class EvaluationMetrics:
"""Class for calculating evaluation metrics for the research agent."""
@staticmethod
def calculate_metrics(query: str, response: Dict[str, Any], feedback: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Calculate evaluation metrics for a research response.
Args:
query: The research query
response: The response from the research agent
feedback: Optional user feedback
Returns:
A dictionary with evaluation metrics
"""
metrics = {}
# Source metrics
sources = response.get('sources', [])
metrics['num_sources'] = len(sources)
if sources:
relevance_scores = [s.get('relevance_score', 0) for s in sources if 'relevance_score' in s]
credibility_scores = [s.get('credibility_score', 0) for s in sources if 'credibility_score' in s]
if relevance_scores:
metrics['avg_relevance_score'] = sum(relevance_scores) / len(relevance_scores)
metrics['max_relevance_score'] = max(relevance_scores)
metrics['min_relevance_score'] = min(relevance_scores)
if credibility_scores:
metrics['avg_credibility_score'] = sum(credibility_scores) / len(credibility_scores)
metrics['max_credibility_score'] = max(credibility_scores)
metrics['min_credibility_score'] = min(credibility_scores)
# Source diversity
source_types = [s.get('source_type', 'unknown') for s in sources]
unique_source_types = set(source_types)
metrics['source_diversity'] = len(unique_source_types)
metrics['source_type_distribution'] = {t: source_types.count(t) for t in unique_source_types}
# Response metrics
findings = response.get('findings', '')
metrics['response_length'] = len(findings)
metrics['response_word_count'] = len(findings.split())
# Citation metrics
citation_count = findings.count('[') # Simple approximation
metrics['citation_count'] = citation_count
metrics['citation_density'] = citation_count / metrics['response_word_count'] if metrics['response_word_count'] > 0 else 0
# User feedback metrics
if feedback:
metrics['user_feedback'] = feedback
return metrics
class EvaluationManager:
"""Manager for evaluating the research agent."""
def __init__(self, log_file: str = None):
"""
Initialize the evaluation manager.
Args:
log_file: Path to the evaluation log file
"""
self.log_file = log_file or f"evaluation_log_{datetime.now().strftime('%Y%m%d%H%M%S')}.jsonl"
def log_evaluation(self, query: str, response: Dict[str, Any], feedback: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Log an evaluation entry.
Args:
query: The research query
response: The response from the research agent
feedback: Optional user feedback
Returns:
The evaluation metrics
"""
# Calculate metrics
metrics = EvaluationMetrics.calculate_metrics(query, response, feedback)
# Create log entry
log_entry = {
'timestamp': datetime.now().isoformat(),
'query': query,
'metrics': metrics
}
# Write to log file
with open(self.log_file, 'a') as f:
f.write(json.dumps(log_entry) + '\n')
return metrics
def analyze_logs(self, log_file: str = None) -> Dict[str, Any]:
"""
Analyze evaluation logs.
Args:
log_file: Path to the log file to analyze (defaults to the instance's log file)
Returns:
A dictionary with analysis results
"""
file_to_analyze = log_file or self.log_file
entries = []
with open(file_to_analyze, 'r') as f:
for line in f:
entries.append(json.loads(line))
if not entries:
return {'error': 'No log entries found'}
# Aggregate metrics
aggregated = {
'num_entries': len(entries),
'avg_num_sources': sum(e['metrics'].get('num_sources', 0) for e in entries) / len(entries),
'avg_response_word_count': sum(e['metrics'].get('response_word_count', 0) for e in entries) / len(entries),
'avg_citation_density': sum(e['metrics'].get('citation_density', 0) for e in entries) / len(entries)
}
# User feedback if available
feedback_entries = [e for e in entries if 'user_feedback' in e['metrics']]
if feedback_entries:
aggregated['avg_user_rating'] = sum(e['metrics']['user_feedback'].get('rating', 0) for e in feedback_entries) / len(feedback_entries)
return aggregated
This code includes two classes for evaluating the research agent: EvaluationMetrics and EvaluationManager. The EvaluationMetrics class calculates various metrics for a research response, including source metrics, response metrics, citation metrics, and user feedback metrics. The EvaluationManager class logs evaluation entries and provides methods for analyzing the logs.
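To put the framework to work, we can wrap the agent's process_query calls and log each response. The snippet below is a sketch: it assumes the ResearchAgent and EvaluationManager classes defined above are available in scope, that the API key value is a placeholder, and that user feedback is a simple dictionary with a 'rating' field (the field analyze_logs averages).
import json

agent = ResearchAgent(config={"model_type": "openai", "model_name": "gpt-4", "api_key": "YOUR_LLM_API_KEY"})
evaluator = EvaluationManager(log_file="evaluation_log.jsonl")

queries = [
    "What are the main approaches to defending against prompt injection?",
    "How does retrieval-augmented generation reduce hallucinations?",
]

for query in queries:
    response = agent.process_query(query)
    # Hypothetical user feedback collected after showing the findings
    feedback = {"rating": 4, "comment": "Relevant sources, slightly verbose."}
    metrics = evaluator.log_evaluation(query, response, feedback)
    print(f"{query[:40]}... -> {metrics['num_sources']} sources, "
          f"citation density {metrics['citation_density']:.3f}")

# Aggregate the log into summary statistics
summary = evaluator.analyze_logs()
print(json.dumps(summary, indent=2))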
Conclusion
In this article, we've explored how to build an LLM-based research agent that can help users find, evaluate, and synthesize information from various sources. We've covered the key components of such a system, including the user interface, query processing, information retrieval, source validation, response generation, and iterative refinement. We've also discussed technical implementation details, such as LLM selection, GPU acceleration, and tool integration.
The research agent we've designed is capable of understanding natural language queries, searching for relevant information from multiple sources, evaluating the credibility and relevance of sources, synthesizing findings into coherent responses, and supporting iterative refinement of research questions. It leverages the power of large language models to understand and generate text, while also integrating with external tools to extend its capabilities.
Building an effective research agent requires careful attention to various aspects, including the quality and relevance of information retrieved, the credibility of sources, the coherence and accuracy of synthesized findings, and the overall user experience. By evaluating and optimizing these aspects, we can create a research agent that provides valuable assistance to users in their research endeavors.
As large language models and related technologies continue to evolve, the capabilities of research agents will likely improve, enabling more sophisticated and effective research assistance. Future directions might include more advanced source evaluation, better handling of conflicting information, more sophisticated tool integration, and improved personalization based on user preferences and research history.