INTRODUCTION
The exponential growth of information on the internet presents both opportunities and challenges for professionals seeking to stay current in their fields. Whether you are tracking developments in software engineering, artificial intelligence, robotics, generative AI, integrated development environments, 3D printing technologies, lasers, or astronomy, the sheer volume of data makes manual trend identification increasingly impractical. This article presents a comprehensive guide to building an LLM-powered agent that automatically discovers, analyzes, and categorizes emerging trends in any given topic area.
Our trend discovery agent combines the reasoning capabilities of large language models with real-time internet search functionality to identify and classify trends according to established frameworks from trend research. The system leverages GPU acceleration through NVIDIA CUDA or Apple Metal Performance Shaders to ensure optimal performance, supports both local and remote LLM deployments, and provides detailed analysis including trend classification, impact assessment, and curated resources for further exploration.
UNDERSTANDING THE PROBLEM DOMAIN
Before diving into implementation details, we must establish a clear understanding of what constitutes a trend and how trend research methodologies can inform our agent's design. In trend research, professionals distinguish between several categories of trends based on their scope, duration, and impact. A fad is a short-lived phenomenon with limited lasting impact. A micro trend affects a specific niche or subculture for roughly one to three years. A trend typically spans several years and affects specific industries or domains. A macro trend spans a decade or two and cuts across multiple industries simultaneously. A megatrend encompasses decades-long shifts that fundamentally reshape society, technology, and markets across multiple sectors.
Our agent must not only identify emerging patterns but also classify them appropriately, assess their potential impact on technology, science, and products, and provide substantive analysis that goes beyond simple keyword matching. This requires integrating multiple capabilities including web search, content analysis, pattern recognition, and structured reasoning about trend characteristics.
ARCHITECTURAL OVERVIEW
The trend discovery agent architecture consists of several interconnected components working in harmony. At the foundation lies the LLM interface layer, which abstracts the differences between local and remote language models while ensuring optimal GPU utilization. Above this sits the search orchestration layer, responsible for formulating effective search queries, retrieving relevant content, and managing the information gathering process. The analysis engine processes retrieved information to identify patterns, extract key insights, and classify trends according to established frameworks. Finally, the presentation layer structures the findings into coherent reports with proper citations and recommendations for further reading.
The system follows clean architecture principles by separating concerns into distinct layers with well-defined interfaces. This separation ensures that we can swap implementations, for example replacing one LLM provider with another, without affecting the rest of the system. The architecture also emphasizes testability, maintainability, and extensibility to accommodate future enhancements.
STEP ONE: ESTABLISHING THE LLM FOUNDATION
The first step in building our trend discovery agent involves creating a robust abstraction layer for language model interactions. This layer must handle both local models running on consumer hardware and remote API-based services while optimizing for available GPU resources.
We begin by defining a base interface that all LLM implementations must satisfy. This interface specifies methods for generating completions, managing conversation context, and configuring generation parameters.
from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
@dataclass
class GenerationConfig:
"""Configuration parameters for text generation."""
temperature: float = 0.7
max_tokens: int = 2048
top_p: float = 0.9
frequency_penalty: float = 0.0
presence_penalty: float = 0.0
stop_sequences: Optional[List[str]] = None
@dataclass
class Message:
"""Represents a single message in a conversation."""
role: str # 'system', 'user', or 'assistant'
content: str
class LLMInterface(ABC):
"""Abstract base class for all LLM implementations."""
@abstractmethod
def generate(self, messages: List[Message], config: GenerationConfig) -> str:
"""
Generate a response based on the conversation history.
Args:
messages: List of conversation messages
config: Generation configuration parameters
Returns:
Generated text response
"""
pass
@abstractmethod
def get_device_info(self) -> Dict[str, Any]:
"""
Retrieve information about the compute device being used.
Returns:
Dictionary containing device type, name, and capabilities
"""
pass
This interface provides the contract that all concrete implementations must fulfill. The Message dataclass encapsulates individual conversation turns, while GenerationConfig allows fine-grained control over the generation process. The get_device_info method enables monitoring and debugging of GPU utilization.
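Before wiring in a real model, the contract can be exercised with plain data objects. The snippet below repeats the two dataclasses so that it runs standalone, and shows how a caller assembles a conversation and a generation configuration:

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class GenerationConfig:
    """Configuration parameters for text generation."""
    temperature: float = 0.7
    max_tokens: int = 2048
    top_p: float = 0.9
    frequency_penalty: float = 0.0
    presence_penalty: float = 0.0
    stop_sequences: Optional[List[str]] = None

@dataclass
class Message:
    """Represents a single message in a conversation."""
    role: str  # 'system', 'user', or 'assistant'
    content: str

# A low-temperature config suits analytical tasks where we want
# focused, reproducible output rather than creative variation.
messages = [
    Message(role='system', content='You are a trend analyst.'),
    Message(role='user', content='Summarize emerging trends in robotics.'),
]
config = GenerationConfig(temperature=0.3, max_tokens=1024)

print(config.temperature)   # 0.3
print(messages[0].role)     # system
```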
Now we implement a local LLM provider that leverages GPU acceleration through PyTorch. This implementation automatically detects available hardware and configures itself accordingly.
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from typing import List, Dict, Any, Optional
class LocalLLMProvider(LLMInterface):
"""
Local LLM implementation with automatic GPU acceleration.
Supports NVIDIA CUDA and Apple Metal Performance Shaders.
"""
def __init__(self, model_name: str, device: Optional[str] = None):
"""
Initialize the local LLM provider.
Args:
model_name: HuggingFace model identifier
device: Target device ('cuda', 'mps', 'cpu', or None for auto-detect)
"""
self.model_name = model_name
self.device = self._determine_device(device)
# Load tokenizer and model with appropriate device mapping
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
# Configure model loading based on available hardware
if self.device == 'cuda':
# Use CUDA with automatic mixed precision for optimal performance
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16,
device_map='auto'
)
elif self.device == 'mps':
# Apple Silicon optimization
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16
).to('mps')
else:
# CPU fallback
self.model = AutoModelForCausalLM.from_pretrained(model_name)
self.model.to('cpu')
def _determine_device(self, preferred_device: Optional[str]) -> str:
"""
Determine the optimal compute device.
Args:
preferred_device: User-specified device preference
Returns:
Device string ('cuda', 'mps', or 'cpu')
"""
if preferred_device:
return preferred_device
# Auto-detect best available device
if torch.cuda.is_available():
return 'cuda'
elif torch.backends.mps.is_available():
return 'mps'
else:
return 'cpu'
def generate(self, messages: List[Message], config: GenerationConfig) -> str:
"""Generate response using the local model."""
# Format messages into a prompt string
prompt = self._format_messages(messages)
# Tokenize input
inputs = self.tokenizer(prompt, return_tensors='pt').to(self.device)
# Configure generation parameters
gen_kwargs = {
'max_new_tokens': config.max_tokens,
'temperature': config.temperature,
'top_p': config.top_p,
'do_sample': True,
'pad_token_id': self.tokenizer.eos_token_id
}
# Generate response
with torch.no_grad():
outputs = self.model.generate(**inputs, **gen_kwargs)
# Decode only the newly generated tokens by slicing off the prompt
# tokens; this is more robust than re-decoding and string-slicing,
# which can fail when the tokenizer does not round-trip the prompt exactly
prompt_length = inputs['input_ids'].shape[1]
response = self.tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
return response.strip()
def _format_messages(self, messages: List[Message]) -> str:
"""
Format conversation messages into a prompt string.
Args:
messages: List of conversation messages
Returns:
Formatted prompt string
"""
formatted_parts = []
for msg in messages:
if msg.role == 'system':
formatted_parts.append(f"System: {msg.content}")
elif msg.role == 'user':
formatted_parts.append(f"User: {msg.content}")
elif msg.role == 'assistant':
formatted_parts.append(f"Assistant: {msg.content}")
formatted_parts.append("Assistant:")
return "\n\n".join(formatted_parts)
def get_device_info(self) -> Dict[str, Any]:
"""Retrieve information about the compute device."""
info = {
'device_type': self.device,
'model_name': self.model_name
}
if self.device == 'cuda':
info['gpu_name'] = torch.cuda.get_device_name(0)
info['gpu_memory_total'] = torch.cuda.get_device_properties(0).total_memory
info['gpu_memory_allocated'] = torch.cuda.memory_allocated(0)
elif self.device == 'mps':
info['gpu_name'] = 'Apple Silicon'
return info
The LocalLLMProvider class demonstrates several important design decisions. First, it automatically detects the best available hardware and configures PyTorch accordingly. When NVIDIA CUDA is available, it uses half-precision floating point arithmetic to maximize throughput and minimize memory consumption. For Apple Silicon devices, it leverages the Metal Performance Shaders backend. The implementation falls back gracefully to CPU execution when no GPU acceleration is available.
The message formatting logic converts our structured conversation history into a text prompt suitable for causal language models. This approach maintains conversation context while remaining compatible with various model architectures.
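The device-selection order is easy to get subtly wrong, so it helps to isolate it as a pure function that can be tested without any GPU present. This is a sketch mirroring the logic of _determine_device; the boolean flags stand in for torch.cuda.is_available() and torch.backends.mps.is_available():

```python
from typing import Optional

def pick_device(preferred: Optional[str], cuda_ok: bool, mps_ok: bool) -> str:
    """Mirror of the auto-detection order: an explicit user choice
    wins, then CUDA, then Apple MPS, then the CPU fallback."""
    if preferred:
        return preferred
    if cuda_ok:
        return 'cuda'
    if mps_ok:
        return 'mps'
    return 'cpu'

# At runtime the flags would come from torch.cuda.is_available()
# and torch.backends.mps.is_available().
print(pick_device(None, cuda_ok=True, mps_ok=False))   # cuda
print(pick_device(None, cuda_ok=False, mps_ok=True))   # mps
print(pick_device('cpu', cuda_ok=True, mps_ok=True))   # cpu
```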
Next, we implement a remote LLM provider that interfaces with API-based services such as OpenAI, Anthropic, or other providers. This implementation shares the same interface, allowing seamless substitution.
import requests
from typing import List, Dict, Any
class RemoteLLMProvider(LLMInterface):
"""
Remote LLM implementation for API-based services.
Supports OpenAI-compatible endpoints.
"""
def __init__(self, api_key: str, model_name: str, base_url: str = "https://api.openai.com/v1"):
"""
Initialize the remote LLM provider.
Args:
api_key: API authentication key
model_name: Model identifier for the remote service
base_url: Base URL for the API endpoint
"""
self.api_key = api_key
self.model_name = model_name
self.base_url = base_url
self.headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
def generate(self, messages: List[Message], config: GenerationConfig) -> str:
"""Generate response using the remote API."""
# Convert messages to API format
api_messages = [
{'role': msg.role, 'content': msg.content}
for msg in messages
]
# Prepare request payload
payload = {
'model': self.model_name,
'messages': api_messages,
'temperature': config.temperature,
'max_tokens': config.max_tokens,
'top_p': config.top_p,
'frequency_penalty': config.frequency_penalty,
'presence_penalty': config.presence_penalty
}
if config.stop_sequences:
payload['stop'] = config.stop_sequences
# Make API request
response = requests.post(
f'{self.base_url}/chat/completions',
headers=self.headers,
json=payload,
timeout=120
)
response.raise_for_status()
result = response.json()
return result['choices'][0]['message']['content']
def get_device_info(self) -> Dict[str, Any]:
"""Retrieve information about the remote service."""
return {
'device_type': 'remote',
'model_name': self.model_name,
'base_url': self.base_url
}
The RemoteLLMProvider handles communication with external API services, managing authentication, request formatting, and error handling. By implementing the same LLMInterface, we ensure that the rest of our system remains agnostic to whether it is using a local or remote model.
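With both providers conforming to LLMInterface, choosing between them can be reduced to a small policy function. The sketch below is illustrative rather than part of the system above (the function name and environment variable are assumptions); the returned kind would then be mapped to LocalLLMProvider or RemoteLLMProvider:

```python
import os
from typing import Optional

def choose_provider_kind(api_key: Optional[str], force_local: bool = False) -> str:
    """Hypothetical selection policy: prefer the remote API when a
    key is available, unless the caller explicitly forces local
    inference (e.g., for privacy or offline use)."""
    if force_local or not api_key:
        return 'local'
    return 'remote'

# Typical wiring: read the key from the environment, then construct
# the matching provider class based on the returned kind.
kind = choose_provider_kind(os.environ.get('OPENAI_API_KEY'))
print(kind)
```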
STEP TWO: IMPLEMENTING WEB SEARCH CAPABILITIES
With our LLM foundation established, we now turn our attention to web search functionality. The trend discovery agent must be able to formulate effective search queries, retrieve relevant content from the internet, and extract meaningful information from web pages.
We begin by creating a search interface that abstracts different search providers. This allows us to support multiple search engines or services while maintaining a consistent interface.
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
@dataclass
class SearchResult:
"""Represents a single search result."""
title: str
url: str
snippet: str
published_date: Optional[datetime] = None
source: Optional[str] = None
class SearchInterface(ABC):
"""Abstract base class for search providers."""
@abstractmethod
def search(self, query: str, num_results: int = 10, time_filter: Optional[str] = None) -> List[SearchResult]:
"""
Execute a search query and return results.
Args:
query: Search query string
num_results: Maximum number of results to return
time_filter: Optional time filter ('d' for day, 'w' for week, 'm' for month, 'y' for year)
Returns:
List of search results
"""
pass
Now we implement a concrete search provider using the DuckDuckGo search engine, which provides a free API without requiring authentication. This makes it ideal for our trend discovery agent.
from duckduckgo_search import DDGS
from typing import List, Optional
class DuckDuckGoSearchProvider(SearchInterface):
"""
Search provider implementation using DuckDuckGo.
Provides free, privacy-focused search without API keys.
"""
def __init__(self):
"""Initialize the DuckDuckGo search provider."""
self.ddgs = DDGS()
def search(self, query: str, num_results: int = 10, time_filter: Optional[str] = None) -> List[SearchResult]:
"""
Execute a search query using DuckDuckGo.
Args:
query: Search query string
num_results: Maximum number of results to return
time_filter: Optional time filter ('d' for day, 'w' for week, 'm' for month, 'y' for year)
Returns:
List of search results
"""
try:
# Execute search with optional time filter
search_params = {'max_results': num_results}
if time_filter:
search_params['timelimit'] = time_filter
results = list(self.ddgs.text(query, **search_params))
# Convert to our SearchResult format
search_results = []
for result in results:
search_result = SearchResult(
title=result.get('title', ''),
url=result.get('href', ''),
snippet=result.get('body', ''),
source=result.get('source', None)
)
search_results.append(search_result)
return search_results
except Exception as e:
print(f"Search error: {str(e)}")
return []
The DuckDuckGoSearchProvider wraps the DuckDuckGo search API and converts results into our standardized SearchResult format. This abstraction allows us to easily swap search providers if needed without affecting the rest of the system.
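Because the engine later runs several overlapping queries, the same URL often appears in multiple result sets. A small deduplication pass, sketched below with a trimmed-down SearchResult, keeps each source from being fetched and analyzed twice (the helper is illustrative, not part of the provider above):

```python
from dataclasses import dataclass
from typing import List

@dataclass
class SearchResult:
    # Trimmed to the fields relevant for deduplication
    title: str
    url: str
    snippet: str

def dedupe_by_url(results: List[SearchResult]) -> List[SearchResult]:
    """Drop repeated URLs while preserving first-seen order, so
    results returned by overlapping queries are processed once."""
    seen = set()
    unique = []
    for r in results:
        if r.url not in seen:
            seen.add(r.url)
            unique.append(r)
    return unique

hits = [
    SearchResult('A', 'https://a.example', 's1'),
    SearchResult('A again', 'https://a.example', 's2'),
    SearchResult('B', 'https://b.example', 's3'),
]
print([r.title for r in dedupe_by_url(hits)])  # ['A', 'B']
```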
To extract meaningful content from web pages, we need a robust web scraping component that can handle various page structures and extract the main textual content while filtering out navigation, advertisements, and other non-essential elements.
import requests
from bs4 import BeautifulSoup
from typing import Optional
import re
class WebContentExtractor:
"""
Extracts main textual content from web pages.
Filters out navigation, ads, and other non-essential elements.
"""
def __init__(self, timeout: int = 10):
"""
Initialize the web content extractor.
Args:
timeout: Request timeout in seconds
"""
self.timeout = timeout
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
def extract_content(self, url: str) -> Optional[str]:
"""
Extract main textual content from a web page.
Args:
url: URL of the web page to extract content from
Returns:
Extracted text content or None if extraction fails
"""
try:
# Fetch the web page
response = requests.get(url, headers=self.headers, timeout=self.timeout)
response.raise_for_status()
# Parse HTML
soup = BeautifulSoup(response.content, 'html.parser')
# Remove script and style elements
for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):
element.decompose()
# Extract text from main content areas
main_content = soup.find('main') or soup.find('article') or soup.find('body')
if not main_content:
return None
# Get text and clean it
text = main_content.get_text(separator='\n', strip=True)
# Remove excessive whitespace
text = re.sub(r'\n\s*\n', '\n\n', text)
text = re.sub(r' +', ' ', text)
return text
except Exception as e:
print(f"Content extraction error for {url}: {str(e)}")
return None
def extract_summary(self, url: str, max_length: int = 1000) -> Optional[str]:
"""
Extract a summary of the web page content.
Args:
url: URL of the web page
max_length: Maximum length of the summary in characters
Returns:
Summarized content or None if extraction fails
"""
content = self.extract_content(url)
if not content:
return None
# Take the first max_length characters, breaking at sentence boundaries
if len(content) <= max_length:
return content
truncated = content[:max_length]
last_period = truncated.rfind('.')
if last_period > max_length * 0.7:
return truncated[:last_period + 1]
else:
return truncated + '...'
The WebContentExtractor class provides methods for retrieving and cleaning web page content. It removes non-essential elements like scripts, styles, and navigation components, focusing on the main textual content. The extract_summary method provides a convenient way to get a condensed version of the content, which is useful when we need to process multiple sources efficiently.
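The truncation heuristic deserves a closer look, since cutting mid-sentence degrades the quality of LLM input. Isolated as a pure function, the same logic reads:

```python
def truncate_at_sentence(text: str, max_length: int) -> str:
    """Truncate to max_length characters, preferring to cut at the
    last period when it falls within the final 30% of the window,
    the same heuristic extract_summary applies."""
    if len(text) <= max_length:
        return text
    truncated = text[:max_length]
    last_period = truncated.rfind('.')
    if last_period > max_length * 0.7:
        # A sentence boundary is close enough to the limit: cut there
        return truncated[:last_period + 1]
    # No usable boundary: hard-truncate and signal the cut
    return truncated + '...'

sample = "First sentence. Second sentence. Third sentence runs long."
print(truncate_at_sentence(sample, 40))  # First sentence. Second sentence.
```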
STEP THREE: BUILDING THE TREND ANALYSIS ENGINE
With our LLM and search capabilities in place, we now construct the core trend analysis engine. This component orchestrates the entire trend discovery process, from query formulation through result synthesis.
The trend analysis engine must perform several sophisticated tasks. First, it generates effective search queries based on the user's topic area. Second, it retrieves and processes relevant web content. Third, it analyzes the collected information to identify patterns and emerging trends. Fourth, it classifies trends according to established frameworks. Finally, it synthesizes findings into a comprehensive report.
import time
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
class TrendCategory(Enum):
"""Classification categories for identified trends."""
FAD = "fad"
MICRO_TREND = "micro_trend"
TREND = "trend"
MACRO_TREND = "macro_trend"
MEGA_TREND = "mega_trend"
@dataclass
class TrendAnalysis:
"""Represents a complete trend analysis."""
trend_name: str
category: TrendCategory
summary: str
technology_impact: str
science_impact: str
product_impact: str
key_indicators: List[str]
time_horizon: str
confidence_level: float
sources: List[SearchResult]
recommended_urls: List[str]
class TrendAnalysisEngine:
"""
Core engine for discovering and analyzing trends.
Orchestrates search, content extraction, and LLM-based analysis.
"""
def __init__(self, llm: LLMInterface, search_provider: SearchInterface, content_extractor: WebContentExtractor):
"""
Initialize the trend analysis engine.
Args:
llm: Language model interface for analysis
search_provider: Search interface for finding relevant content
content_extractor: Web content extraction utility
"""
self.llm = llm
self.search_provider = search_provider
self.content_extractor = content_extractor
# System prompt for trend analysis
self.system_prompt = """You are an expert trend researcher and analyst with deep knowledge of trend research methodologies. Your task is to analyze information about emerging patterns in various domains and classify them according to established trend research frameworks.
When analyzing trends, consider the following classification criteria:
A FAD is a short-lived phenomenon, typically lasting less than a year, with limited impact beyond a specific niche or community. Fads generate temporary excitement but lack the substance for long-term adoption.
A MICRO TREND affects a specific subculture or niche market, lasting one to three years. These trends have limited geographic or demographic reach but can be significant within their specific context.
A TREND represents a significant pattern of change lasting three to ten years, affecting entire industries or substantial market segments. Trends reshape business practices, consumer behavior, or technological approaches within specific domains.
A MACRO TREND spans ten to twenty years and affects multiple industries or sectors simultaneously. These trends represent fundamental shifts in how people work, live, or interact with technology.
A MEGA TREND encompasses twenty years or more and represents transformational changes that reshape society, economy, and technology on a global scale. Mega trends affect virtually all aspects of human activity.
Your analysis should be evidence-based, drawing on concrete indicators such as investment patterns, adoption rates, research activity, media coverage, and expert commentary. Always distinguish between hype and substance, and provide balanced assessments of both opportunities and challenges."""
def analyze_topic(self, topic_area: str, num_trends: int = 5) -> List[TrendAnalysis]:
"""
Analyze a topic area and identify emerging trends.
Args:
topic_area: Domain, discipline, market, or technical subject to analyze
num_trends: Number of trends to identify and analyze
Returns:
List of trend analyses
"""
print(f"Analyzing trends in: {topic_area}")
# Step 1: Generate search queries
search_queries = self._generate_search_queries(topic_area)
print(f"Generated {len(search_queries)} search queries")
# Step 2: Execute searches and collect results
all_results = []
for query in search_queries:
results = self.search_provider.search(query, num_results=10, time_filter='m')
all_results.extend(results)
time.sleep(1) # Rate limiting
print(f"Collected {len(all_results)} search results")
# Step 3: Extract content from top results
content_samples = self._extract_content_samples(all_results, max_samples=20)
print(f"Extracted content from {len(content_samples)} sources")
# Step 4: Identify potential trends
potential_trends = self._identify_trends(topic_area, content_samples)
print(f"Identified {len(potential_trends)} potential trends")
# Step 5: Analyze each trend in detail
trend_analyses = []
for trend_name in potential_trends[:num_trends]:
analysis = self._analyze_single_trend(topic_area, trend_name, all_results)
if analysis:
trend_analyses.append(analysis)
return trend_analyses
def _generate_search_queries(self, topic_area: str) -> List[str]:
"""
Generate effective search queries for the topic area.
Args:
topic_area: Topic to generate queries for
Returns:
List of search query strings
"""
messages = [
Message(role='system', content=self.system_prompt),
Message(role='user', content=f"""Generate 5 effective search queries to discover emerging trends in {topic_area}.
The queries should target:
1. Recent developments and innovations
2. Industry reports and forecasts
3. Research publications and breakthroughs
4. Market analysis and adoption patterns
5. Expert commentary and thought leadership
Return only the search queries, one per line, without numbering or additional explanation.""")
]
config = GenerationConfig(temperature=0.7, max_tokens=500)
response = self.llm.generate(messages, config)
# Parse queries from response
queries = [q.strip() for q in response.strip().split('\n') if q.strip()]
return queries
def _extract_content_samples(self, results: List[SearchResult], max_samples: int = 20) -> List[Dict[str, str]]:
"""
Extract content from search results.
Args:
results: List of search results
max_samples: Maximum number of content samples to extract
Returns:
List of dictionaries containing URL and extracted content
"""
content_samples = []
for result in results[:max_samples]:
content = self.content_extractor.extract_summary(result.url, max_length=2000)
if content:
content_samples.append({
'url': result.url,
'title': result.title,
'content': content
})
return content_samples
def _identify_trends(self, topic_area: str, content_samples: List[Dict[str, str]]) -> List[str]:
"""
Identify potential trends from content samples.
Args:
topic_area: Topic area being analyzed
content_samples: Extracted content from web sources
Returns:
List of trend names
"""
# Compile content summaries
content_summary = "\n\n".join([
f"Source: {sample['title']}\n{sample['content'][:500]}"
for sample in content_samples[:10]
])
messages = [
Message(role='system', content=self.system_prompt),
Message(role='user', content=f"""Based on the following content about {topic_area}, identify 5-7 distinct emerging trends or patterns.
Content samples:
{content_summary}
List the trend names only, one per line. Each trend name should be concise (2-5 words) and descriptive.""")
]
config = GenerationConfig(temperature=0.7, max_tokens=500)
response = self.llm.generate(messages, config)
# Parse trend names
trends = [t.strip() for t in response.strip().split('\n') if t.strip()]
return trends
def _analyze_single_trend(self, topic_area: str, trend_name: str, all_results: List[SearchResult]) -> Optional[TrendAnalysis]:
"""
Perform detailed analysis of a single trend.
Args:
topic_area: Topic area being analyzed
trend_name: Name of the trend to analyze
all_results: All search results for reference
Returns:
TrendAnalysis object or None if analysis fails
"""
# Find relevant sources for this specific trend
relevant_sources = self._find_relevant_sources(trend_name, all_results)
# Extract detailed content
detailed_content = []
for source in relevant_sources[:5]:
content = self.content_extractor.extract_summary(source.url, max_length=1500)
if content:
detailed_content.append({
'url': source.url,
'title': source.title,
'content': content
})
if not detailed_content:
return None
# Compile context for analysis
context = "\n\n".join([
f"Source: {item['title']}\nURL: {item['url']}\n{item['content']}"
for item in detailed_content
])
# Request comprehensive analysis
messages = [
Message(role='system', content=self.system_prompt),
Message(role='user', content=f"""Analyze the trend "{trend_name}" in the context of {topic_area}.
Based on the following sources, provide a comprehensive analysis:
{context}
Your analysis must include:
1. TREND CLASSIFICATION: Classify this as a fad, micro trend, trend, macro trend, or mega trend based on the criteria provided in your instructions.
2. SUMMARY: A concise 2-3 sentence summary of what this trend represents.
3. TECHNOLOGY IMPACT: How this trend affects or will affect technology development, including specific technologies, platforms, or approaches.
4. SCIENCE IMPACT: How this trend influences scientific research, methodologies, or understanding in relevant fields.
5. PRODUCT IMPACT: How this trend affects or will affect products, services, and market offerings.
6. KEY INDICATORS: List 3-5 specific, observable indicators that demonstrate this is a genuine trend rather than speculation.
7. TIME HORIZON: Estimated timeframe for significant impact (e.g., "1-2 years", "5-10 years").
8. CONFIDENCE LEVEL: Your confidence in this analysis on a scale of 0.0 to 1.0, with justification.
Format your response as follows:
CLASSIFICATION: [category]
SUMMARY: [summary text]
TECHNOLOGY_IMPACT: [impact description]
SCIENCE_IMPACT: [impact description]
PRODUCT_IMPACT: [impact description]
KEY_INDICATORS: [indicator 1] | [indicator 2] | [indicator 3]
TIME_HORIZON: [timeframe]
CONFIDENCE: [0.0-1.0]""")
]
config = GenerationConfig(temperature=0.3, max_tokens=2000)
response = self.llm.generate(messages, config)
# Parse the structured response
analysis_dict = self._parse_analysis_response(response)
if not analysis_dict:
return None
# Select recommended URLs
recommended_urls = [item['url'] for item in detailed_content[:3]]
# Create TrendAnalysis object
return TrendAnalysis(
trend_name=trend_name,
category=self._parse_category(analysis_dict.get('CLASSIFICATION', 'trend')),
summary=analysis_dict.get('SUMMARY', ''),
technology_impact=analysis_dict.get('TECHNOLOGY_IMPACT', ''),
science_impact=analysis_dict.get('SCIENCE_IMPACT', ''),
product_impact=analysis_dict.get('PRODUCT_IMPACT', ''),
key_indicators=[s.strip() for s in analysis_dict.get('KEY_INDICATORS', '').split('|') if s.strip()],
time_horizon=analysis_dict.get('TIME_HORIZON', ''),
confidence_level=float(analysis_dict.get('CONFIDENCE', '0.5')),
sources=relevant_sources[:5],
recommended_urls=recommended_urls
)
def _find_relevant_sources(self, trend_name: str, all_results: List[SearchResult]) -> List[SearchResult]:
"""
Find search results most relevant to a specific trend.
Args:
trend_name: Name of the trend
all_results: All available search results
Returns:
Filtered and sorted list of relevant results
"""
# Simple relevance scoring based on keyword matching
scored_results = []
trend_keywords = set(trend_name.lower().split())
for result in all_results:
text = f"{result.title} {result.snippet}".lower()
score = sum(1 for keyword in trend_keywords if keyword in text)
if score > 0:
scored_results.append((score, result))
# Sort by relevance score
scored_results.sort(reverse=True, key=lambda x: x[0])
return [result for score, result in scored_results]
def _parse_analysis_response(self, response: str) -> Dict[str, str]:
"""
Parse structured analysis response from LLM.
Args:
response: LLM response text
Returns:
Dictionary of parsed fields
"""
result = {}
current_field = None
current_value = []
for line in response.split('\n'):
line = line.strip()
if not line:
continue
# Check if this is a known field header; restricting to the expected
# field names prevents colons inside continuation text (e.g. URLs)
# from being mistaken for new fields
known_fields = ('CLASSIFICATION', 'SUMMARY', 'TECHNOLOGY_IMPACT', 'SCIENCE_IMPACT', 'PRODUCT_IMPACT', 'KEY_INDICATORS', 'TIME_HORIZON', 'CONFIDENCE')
if ':' in line and line.split(':', 1)[0].strip().upper() in known_fields:
parts = line.split(':', 1)
field_name = parts[0].strip().upper()
# Save previous field if exists
if current_field:
result[current_field] = ' '.join(current_value).strip()
# Start new field
current_field = field_name
current_value = [parts[1].strip()] if len(parts) > 1 else []
elif current_field:
# Continue current field
current_value.append(line)
# Save last field
if current_field:
result[current_field] = ' '.join(current_value).strip()
return result
def _parse_category(self, category_str: str) -> TrendCategory:
"""
Parse trend category from string.
Args:
category_str: Category string from analysis
Returns:
TrendCategory enum value
"""
category_lower = category_str.lower().replace(' ', '_').replace('-', '_')
for category in TrendCategory:
if category.value in category_lower or category_lower in category.value:
return category
return TrendCategory.TREND # Default fallback
The TrendAnalysisEngine represents the heart of our system. It orchestrates the entire trend discovery workflow, from generating search queries through producing comprehensive trend analyses. The engine breaks down the complex task into manageable steps, each with a specific responsibility.
The query generation phase leverages the LLM to create targeted search queries that explore different facets of the topic area. Rather than using generic searches, the system generates queries designed to uncover recent developments, industry reports, research publications, market analyses, and expert commentary.
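In practice, models frequently ignore the instruction to omit numbering, so the raw response benefits from a defensive cleanup pass before the lines are used as queries. The helper below is a suggested hardening step, not part of the engine as written:

```python
import re
from typing import List

def clean_query_lines(response: str) -> List[str]:
    """Strip leading enumeration ('1.', '2)') and bullet markers
    ('-', '* ') that LLMs often prepend despite instructions,
    without touching digits that start real queries (e.g. '3D')."""
    queries = []
    for line in response.splitlines():
        line = re.sub(r'^\s*(?:\d+[\.\)]\s*|[-*]\s+)', '', line).strip()
        if line:
            queries.append(line)
    return queries

raw = """1. emerging robotics trends 2025
2) humanoid robot market forecast
- robotics research breakthroughs"""
print(clean_query_lines(raw))
```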
The content extraction phase retrieves and processes information from web sources, filtering and summarizing content to make it suitable for analysis. This step is crucial because raw web content often contains noise that can confuse the analysis process.
The trend identification phase analyzes the collected content to identify distinct patterns and emerging phenomena. The LLM examines the information holistically, looking for recurring themes, novel developments, and significant shifts in the domain.
Finally, the detailed analysis phase performs deep dives into each identified trend, classifying it according to trend research frameworks, assessing its impact across multiple dimensions, and providing evidence-based justifications for the classification.
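Because each analysis comes back as plain text in a fixed FIELD: value layout, parsing robustness matters. The standalone sketch below (the function name is illustrative) shows the core line-oriented idea behind _parse_analysis_response: a line containing a colon starts a new field, and colon-free lines continue the current one:

```python
from typing import Dict

def parse_structured(response: str) -> Dict[str, str]:
    """Line-oriented parse of 'FIELD: value' blocks; continuation
    lines without a colon are appended to the current field."""
    result, field, buf = {}, None, []
    for line in response.splitlines():
        line = line.strip()
        if not line:
            continue
        if ':' in line:
            # Flush the previous field before starting a new one
            if field:
                result[field] = ' '.join(buf).strip()
            head, _, rest = line.partition(':')
            field, buf = head.strip().upper(), [rest.strip()]
        elif field:
            buf.append(line)
    if field:
        result[field] = ' '.join(buf).strip()
    return result

sample = """CLASSIFICATION: trend
SUMMARY: Edge inference is moving
into consumer devices.
CONFIDENCE: 0.8"""
parsed = parse_structured(sample)
print(parsed['CLASSIFICATION'])  # trend
print(parsed['SUMMARY'])         # Edge inference is moving into consumer devices.
```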
STEP FOUR: CREATING THE USER INTERFACE AND ORCHESTRATION LAYER
With our core components in place, we need to create a user-facing interface that makes the trend discovery agent accessible and easy to use. This layer handles user input, manages the analysis workflow, and presents results in a clear, actionable format.
from typing import List, Optional
import json
class TrendDiscoveryAgent:
"""
Main interface for the trend discovery system.
Orchestrates all components and provides user-facing functionality.
"""
def __init__(self, llm: LLMInterface, search_provider: SearchInterface):
"""
Initialize the trend discovery agent.
Args:
llm: Language model interface
search_provider: Search provider interface
"""
self.llm = llm
self.search_provider = search_provider
self.content_extractor = WebContentExtractor()
self.analysis_engine = TrendAnalysisEngine(llm, search_provider, self.content_extractor)
def discover_trends(self, topic_area: str, num_trends: int = 5) -> str:
"""
Discover and analyze trends in a given topic area.
Args:
topic_area: Domain, discipline, market, or technical subject
num_trends: Number of trends to identify and analyze
Returns:
Formatted report of trend analyses
"""
print(f"\n{'='*80}")
print(f"TREND DISCOVERY AGENT")
print(f"Topic Area: {topic_area}")
print(f"{'='*80}\n")
# Display device information
device_info = self.llm.get_device_info()
if device_info['device_type'] == 'remote':
    print(f"Using remote model: {device_info['model_name']}")
else:
    print(f"Using {device_info['device_type'].upper()} acceleration")
if 'gpu_name' in device_info:
    print(f"GPU: {device_info['gpu_name']}")
print()
# Execute trend analysis
trend_analyses = self.analysis_engine.analyze_topic(topic_area, num_trends)
# Format and return report
report = self._format_report(topic_area, trend_analyses)
return report
def _format_report(self, topic_area: str, analyses: List[TrendAnalysis]) -> str:
"""
Format trend analyses into a comprehensive report.
Args:
topic_area: Topic area analyzed
analyses: List of trend analyses
Returns:
Formatted report string
"""
report_lines = []
report_lines.append(f"\n{'='*80}")
report_lines.append(f"TREND ANALYSIS REPORT: {topic_area.upper()}")
report_lines.append(f"{'='*80}\n")
report_lines.append(f"Total Trends Identified: {len(analyses)}\n")
# Summary table
report_lines.append("TREND OVERVIEW")
report_lines.append("-" * 80)
for i, analysis in enumerate(analyses, 1):
report_lines.append(f"{i}. {analysis.trend_name}")
report_lines.append(f" Category: {analysis.category.value.replace('_', ' ').title()}")
report_lines.append(f" Confidence: {analysis.confidence_level:.2f}")
report_lines.append(f" Time Horizon: {analysis.time_horizon}")
report_lines.append("")
# Detailed analyses
for i, analysis in enumerate(analyses, 1):
report_lines.append(f"\n{'='*80}")
report_lines.append(f"TREND {i}: {analysis.trend_name.upper()}")
report_lines.append(f"{'='*80}\n")
report_lines.append(f"Classification: {analysis.category.value.replace('_', ' ').title()}")
report_lines.append(f"Confidence Level: {analysis.confidence_level:.2f}")
report_lines.append(f"Time Horizon: {analysis.time_horizon}\n")
report_lines.append("SUMMARY")
report_lines.append("-" * 80)
report_lines.append(self._wrap_text(analysis.summary, 80))
report_lines.append("")
report_lines.append("TECHNOLOGY IMPACT")
report_lines.append("-" * 80)
report_lines.append(self._wrap_text(analysis.technology_impact, 80))
report_lines.append("")
report_lines.append("SCIENCE IMPACT")
report_lines.append("-" * 80)
report_lines.append(self._wrap_text(analysis.science_impact, 80))
report_lines.append("")
report_lines.append("PRODUCT IMPACT")
report_lines.append("-" * 80)
report_lines.append(self._wrap_text(analysis.product_impact, 80))
report_lines.append("")
report_lines.append("KEY INDICATORS")
report_lines.append("-" * 80)
for indicator in analysis.key_indicators:
if indicator.strip():
report_lines.append(f" - {indicator.strip()}")
report_lines.append("")
report_lines.append("RECOMMENDED READING")
report_lines.append("-" * 80)
for url in analysis.recommended_urls:
report_lines.append(f" {url}")
report_lines.append("")
return "\n".join(report_lines)
def _wrap_text(self, text: str, width: int = 80) -> str:
"""
Wrap text to specified width while preserving words.
Args:
text: Text to wrap
width: Maximum line width
Returns:
Wrapped text
"""
words = text.split()
lines = []
current_line = []
current_length = 0
for word in words:
if current_length + len(word) + 1 <= width:
current_line.append(word)
current_length += len(word) + 1
else:
if current_line:
lines.append(' '.join(current_line))
current_line = [word]
current_length = len(word)
if current_line:
lines.append(' '.join(current_line))
return '\n'.join(lines)
def save_report(self, report: str, filename: str):
"""
Save trend report to a file.
Args:
report: Report text to save
filename: Output filename
"""
with open(filename, 'w', encoding='utf-8') as f:
f.write(report)
print(f"\nReport saved to: {filename}")
def export_json(self, analyses: List[TrendAnalysis], filename: str):
"""
Export trend analyses to JSON format.
Args:
analyses: List of trend analyses
filename: Output filename
"""
data = {
'trends': [
{
'name': a.trend_name,
'category': a.category.value,
'summary': a.summary,
'technology_impact': a.technology_impact,
'science_impact': a.science_impact,
'product_impact': a.product_impact,
'key_indicators': a.key_indicators,
'time_horizon': a.time_horizon,
'confidence_level': a.confidence_level,
'recommended_urls': a.recommended_urls
}
for a in analyses
]
}
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2)
print(f"\nJSON export saved to: {filename}")
The TrendDiscoveryAgent class provides the primary interface for users to interact with the system. It encapsulates all the complexity of the underlying components and presents a simple, intuitive API. Users can discover trends with a single method call, and the system handles all the orchestration automatically.
The report formatting functionality creates human-readable output that presents trend analyses in a structured, easy-to-digest format. The report includes both a high-level overview and detailed analyses for each trend, making it suitable for both quick scanning and in-depth review.
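The hand-rolled word-wrapping helper mirrors what Python's standard textwrap module already provides. A minimal equivalent, for readers who prefer the standard library (the `wrap_text` name here is just a local stand-in for the agent's `_wrap_text` method):

```python
import textwrap

def wrap_text(text: str, width: int = 80) -> str:
    # Standard-library equivalent of the hand-rolled helper: wraps on
    # word boundaries and never splits a word across lines
    return textwrap.fill(text, width=width,
                         break_long_words=False, break_on_hyphens=False)

sample = "the quick brown fox jumps over the lazy dog " * 4
print(wrap_text(sample.strip(), width=40))
```

`break_long_words=False` matches the original's behavior of leaving an over-length word on its own line rather than hyphenating it.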
STEP FIVE: ENHANCING TREND CLASSIFICATION WITH RESEARCH METHODOLOGIES
To ensure our trend classifications are rigorous and defensible, we need to incorporate established methodologies from trend research. This involves implementing scoring mechanisms that evaluate trends across multiple dimensions and apply objective criteria for classification.
from typing import Dict, List, Tuple
from dataclasses import dataclass
@dataclass
class TrendMetrics:
"""Quantitative metrics for trend evaluation."""
adoption_velocity: float # Rate of adoption (0.0 to 1.0)
market_breadth: float # Geographic and demographic reach (0.0 to 1.0)
investment_level: float # Financial investment and resources (0.0 to 1.0)
innovation_intensity: float # Degree of novelty and disruption (0.0 to 1.0)
media_attention: float # Level of media coverage and discussion (0.0 to 1.0)
expert_consensus: float # Agreement among domain experts (0.0 to 1.0)
sustainability: float # Long-term viability indicators (0.0 to 1.0)
class TrendClassifier:
"""
Advanced trend classification using multi-dimensional analysis.
Implements methodologies from academic trend research.
"""
def __init__(self, llm: LLMInterface):
"""
Initialize the trend classifier.
Args:
llm: Language model interface for metric extraction
"""
self.llm = llm
def extract_metrics(self, trend_name: str, content_samples: List[Dict[str, str]]) -> TrendMetrics:
"""
Extract quantitative metrics from content about a trend.
Args:
trend_name: Name of the trend
content_samples: Content samples discussing the trend
Returns:
TrendMetrics object with scored dimensions
"""
# Compile content for analysis
context = "\n\n".join([
f"{sample['title']}\n{sample['content'][:800]}"
for sample in content_samples[:5]
])
prompt = f"""Analyze the following content about the trend "{trend_name}" and score it across seven dimensions on a scale from 0.0 to 1.0.
Content:
{context}
Provide scores for each dimension based on evidence in the content:
1. ADOPTION_VELOCITY: How quickly is this trend being adopted? (0.0 = very slow/stagnant, 1.0 = explosive growth)
2. MARKET_BREADTH: How broad is the geographic and demographic reach? (0.0 = very narrow niche, 1.0 = global and cross-demographic)
3. INVESTMENT_LEVEL: What is the level of financial investment and resource allocation? (0.0 = minimal investment, 1.0 = massive investment from multiple sources)
4. INNOVATION_INTENSITY: How novel and disruptive is this trend? (0.0 = incremental improvement, 1.0 = paradigm-shifting innovation)
5. MEDIA_ATTENTION: What is the level of media coverage and public discussion? (0.0 = minimal coverage, 1.0 = extensive mainstream coverage)
6. EXPERT_CONSENSUS: What is the level of agreement among domain experts? (0.0 = highly controversial/disputed, 1.0 = strong expert consensus)
7. SUSTAINABILITY: What are the indicators of long-term viability? (0.0 = likely to fade quickly, 1.0 = strong fundamentals for longevity)
Format your response as:
ADOPTION_VELOCITY: [score]
MARKET_BREADTH: [score]
INVESTMENT_LEVEL: [score]
INNOVATION_INTENSITY: [score]
MEDIA_ATTENTION: [score]
EXPERT_CONSENSUS: [score]
SUSTAINABILITY: [score]"""
messages = [
Message(role='user', content=prompt)
]
config = GenerationConfig(temperature=0.2, max_tokens=500)
response = self.llm.generate(messages, config)
# Parse scores
scores = self._parse_metric_scores(response)
return TrendMetrics(
adoption_velocity=scores.get('ADOPTION_VELOCITY', 0.5),
market_breadth=scores.get('MARKET_BREADTH', 0.5),
investment_level=scores.get('INVESTMENT_LEVEL', 0.5),
innovation_intensity=scores.get('INNOVATION_INTENSITY', 0.5),
media_attention=scores.get('MEDIA_ATTENTION', 0.5),
expert_consensus=scores.get('EXPERT_CONSENSUS', 0.5),
sustainability=scores.get('SUSTAINABILITY', 0.5)
)
def _parse_metric_scores(self, response: str) -> Dict[str, float]:
"""
Parse metric scores from LLM response.
Args:
response: LLM response text
Returns:
Dictionary mapping metric names to scores
"""
scores = {}
for line in response.split('\n'):
line = line.strip()
if ':' in line:
parts = line.split(':', 1)
metric_name = parts[0].strip().upper()
try:
score_str = parts[1].strip()
score = float(score_str)
scores[metric_name] = max(0.0, min(1.0, score))
except ValueError:
continue
return scores
def classify_from_metrics(self, metrics: TrendMetrics) -> Tuple[TrendCategory, float]:
"""
Classify a trend based on its metrics.
Args:
metrics: TrendMetrics object
Returns:
Tuple of (TrendCategory, confidence_score)
"""
# Calculate composite scores for different aspects
reach_score = (metrics.market_breadth + metrics.adoption_velocity) / 2
impact_score = (metrics.innovation_intensity + metrics.investment_level) / 2
longevity_score = (metrics.sustainability + metrics.expert_consensus) / 2
# Overall trend strength
overall_strength = (reach_score + impact_score + longevity_score) / 3
# Classification logic based on research frameworks
if longevity_score < 0.3 or metrics.sustainability < 0.25:
category = TrendCategory.FAD
confidence = 0.7 + (0.3 * (1.0 - longevity_score))
elif reach_score < 0.4 and metrics.market_breadth < 0.35:
category = TrendCategory.MICRO_TREND
confidence = 0.6 + (0.3 * metrics.expert_consensus)
elif overall_strength >= 0.75 and longevity_score >= 0.7 and reach_score >= 0.7:
if metrics.market_breadth >= 0.8 and impact_score >= 0.75:
category = TrendCategory.MEGA_TREND
confidence = 0.65 + (0.35 * overall_strength)
else:
category = TrendCategory.MACRO_TREND
confidence = 0.7 + (0.25 * overall_strength)
elif overall_strength >= 0.5:
category = TrendCategory.TREND
confidence = 0.6 + (0.35 * overall_strength)
else:
category = TrendCategory.MICRO_TREND
confidence = 0.55 + (0.3 * overall_strength)
return category, confidence
The TrendClassifier implements a sophisticated multi-dimensional evaluation framework. Rather than relying solely on the LLM's judgment, it extracts quantitative metrics across seven key dimensions and applies objective classification rules based on these metrics. This approach combines the LLM's ability to understand nuanced content with rigorous analytical frameworks from trend research.
The seven dimensions capture different aspects of trend significance. Adoption velocity measures how quickly the trend is spreading. Market breadth assesses geographic and demographic reach. Investment level indicates the financial resources being allocated. Innovation intensity evaluates the degree of novelty and disruption. Media attention reflects public awareness and discussion. Expert consensus measures agreement among domain specialists. Sustainability assesses long-term viability indicators.
By scoring trends across these dimensions and applying classification rules, we ensure that our trend categorizations are defensible and grounded in observable evidence rather than subjective impressions.
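To see how the rules resolve in practice, here is a condensed, standalone version of the same classification logic, applied to two hypothetical metric profiles (category names as plain strings rather than the TrendCategory enum, and confidence omitted for brevity):

```python
def classify(breadth, adoption, innovation, investment,
             sustainability, consensus):
    # Composite scores, as defined in classify_from_metrics
    reach = (breadth + adoption) / 2
    impact = (innovation + investment) / 2
    longevity = (sustainability + consensus) / 2
    overall = (reach + impact + longevity) / 3
    if longevity < 0.3 or sustainability < 0.25:
        return "fad"
    if reach < 0.4 and breadth < 0.35:
        return "micro_trend"
    if overall >= 0.75 and longevity >= 0.7 and reach >= 0.7:
        if breadth >= 0.8 and impact >= 0.75:
            return "mega_trend"
        return "macro_trend"
    if overall >= 0.5:
        return "trend"
    return "micro_trend"

# Broad, well-funded, durable pattern -> mega trend
print(classify(0.9, 0.8, 0.8, 0.8, 0.85, 0.8))   # mega_trend
# Narrow niche with middling fundamentals -> micro trend
print(classify(0.3, 0.4, 0.5, 0.4, 0.6, 0.5))    # micro_trend
```

Note that sustainability acts as a hard gate: no matter how strong the other dimensions score, a sustainability value below 0.25 forces the fad classification.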
STEP SIX: IMPLEMENTING CACHING AND OPTIMIZATION
To improve performance and reduce redundant computations, we implement caching mechanisms that store intermediate results and enable efficient reuse of previously analyzed content.
import hashlib
import pickle
import os
from pathlib import Path
from typing import Optional, Any
from datetime import datetime, timedelta
class CacheManager:
"""
Manages caching of search results, extracted content, and analyses.
Improves performance by avoiding redundant operations.
"""
def __init__(self, cache_dir: str = ".trend_cache", ttl_hours: int = 24):
"""
Initialize the cache manager.
Args:
cache_dir: Directory for cache storage
ttl_hours: Time-to-live for cached items in hours
"""
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
self.ttl = timedelta(hours=ttl_hours)
def _get_cache_key(self, key_data: str) -> str:
"""
Generate a cache key from input data.
Args:
key_data: Data to generate key from
Returns:
Cache key string
"""
return hashlib.md5(key_data.encode()).hexdigest()
def get(self, key: str) -> Optional[Any]:
"""
Retrieve an item from cache if it exists and is not expired.
Args:
key: Cache key
Returns:
Cached item or None if not found or expired
"""
cache_key = self._get_cache_key(key)
cache_file = self.cache_dir / f"{cache_key}.pkl"
if not cache_file.exists():
return None
# Check if cache has expired
file_time = datetime.fromtimestamp(cache_file.stat().st_mtime)
if datetime.now() - file_time > self.ttl:
cache_file.unlink()
return None
# Load cached data
try:
with open(cache_file, 'rb') as f:
return pickle.load(f)
except Exception as e:
print(f"Cache read error: {e}")
return None
def set(self, key: str, value: Any):
"""
Store an item in cache.
Args:
key: Cache key
value: Value to cache
"""
cache_key = self._get_cache_key(key)
cache_file = self.cache_dir / f"{cache_key}.pkl"
try:
with open(cache_file, 'wb') as f:
pickle.dump(value, f)
except Exception as e:
print(f"Cache write error: {e}")
def clear(self):
"""Clear all cached items."""
for cache_file in self.cache_dir.glob("*.pkl"):
cache_file.unlink()
The CacheManager provides a simple but effective caching layer that stores search results, extracted content, and intermediate analyses. By caching these expensive operations, we significantly reduce the time required for subsequent analyses of the same or similar topics. The time-to-live mechanism ensures that cached data remains reasonably fresh while still providing performance benefits.
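The same pattern, hashed key, pickled payload, mtime-based expiry, can be exercised in a few lines independent of the CacheManager class (the `put`/`get` helpers and the temp directory are illustrative only):

```python
import hashlib
import pickle
import tempfile
from datetime import datetime, timedelta
from pathlib import Path

cache_dir = Path(tempfile.mkdtemp())
ttl = timedelta(hours=24)

def cache_path(key: str) -> Path:
    # Same key scheme as CacheManager: MD5 digest of the logical key
    return cache_dir / f"{hashlib.md5(key.encode()).hexdigest()}.pkl"

def put(key: str, value) -> None:
    cache_path(key).write_bytes(pickle.dumps(value))

def get(key: str):
    path = cache_path(key)
    if not path.exists():
        return None
    # Expire entries older than the TTL, judged by file mtime
    if datetime.now() - datetime.fromtimestamp(path.stat().st_mtime) > ttl:
        path.unlink()
        return None
    return pickle.loads(path.read_bytes())

put("analysis_AI_5", ["trend one", "trend two"])
print(get("analysis_AI_5"))   # cache hit within the TTL window
print(get("missing-key"))     # None: never stored
```

MD5 is fine here because the digest is only a filename, not a security boundary; pickle, however, should only ever load files your own process wrote, since unpickling untrusted data can execute arbitrary code.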
PRODUCTION-READY COMPLETE IMPLEMENTATION
The following complete implementation integrates all components into a production-ready system. This code represents a fully functional trend discovery agent that can be deployed and used immediately.
#!/usr/bin/env python3
"""
Trend Discovery Agent - Production Implementation
A comprehensive LLM-powered system for discovering and analyzing emerging trends
in any topic area. Supports both local and remote LLMs with GPU acceleration.
Usage:
python trend_agent.py --topic "Artificial Intelligence" --num-trends 5
"""
import argparse
import sys
import time
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Dict, Optional, Any, Tuple
from datetime import datetime, timedelta
from enum import Enum
import hashlib
import pickle
from pathlib import Path
# Third-party imports
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import requests
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
import re
# ============================================================================
# DATA STRUCTURES
# ============================================================================
@dataclass
class GenerationConfig:
"""Configuration parameters for text generation."""
temperature: float = 0.7
max_tokens: int = 2048
top_p: float = 0.9
frequency_penalty: float = 0.0
presence_penalty: float = 0.0
stop_sequences: Optional[List[str]] = None
@dataclass
class Message:
"""Represents a single message in a conversation."""
role: str
content: str
@dataclass
class SearchResult:
"""Represents a single search result."""
title: str
url: str
snippet: str
published_date: Optional[datetime] = None
source: Optional[str] = None
class TrendCategory(Enum):
"""Classification categories for identified trends."""
FAD = "fad"
MICRO_TREND = "micro_trend"
TREND = "trend"
MACRO_TREND = "macro_trend"
MEGA_TREND = "mega_trend"
@dataclass
class TrendMetrics:
"""Quantitative metrics for trend evaluation."""
adoption_velocity: float
market_breadth: float
investment_level: float
innovation_intensity: float
media_attention: float
expert_consensus: float
sustainability: float
@dataclass
class TrendAnalysis:
"""Represents a complete trend analysis."""
trend_name: str
category: TrendCategory
summary: str
technology_impact: str
science_impact: str
product_impact: str
key_indicators: List[str]
time_horizon: str
confidence_level: float
sources: List[SearchResult]
recommended_urls: List[str]
metrics: Optional[TrendMetrics] = None
# ============================================================================
# LLM INTERFACE LAYER
# ============================================================================
class LLMInterface(ABC):
"""Abstract base class for all LLM implementations."""
@abstractmethod
def generate(self, messages: List[Message], config: GenerationConfig) -> str:
"""Generate a response based on the conversation history."""
pass
@abstractmethod
def get_device_info(self) -> Dict[str, Any]:
"""Retrieve information about the compute device being used."""
pass
class LocalLLMProvider(LLMInterface):
"""Local LLM implementation with automatic GPU acceleration."""
def __init__(self, model_name: str, device: Optional[str] = None):
"""Initialize the local LLM provider."""
self.model_name = model_name
self.device = self._determine_device(device)
print(f"Loading model {model_name} on {self.device}...")
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
if self.device == 'cuda':
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16,
device_map='auto'
)
elif self.device == 'mps':
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float16
).to('mps')
else:
self.model = AutoModelForCausalLM.from_pretrained(model_name)
self.model.to('cpu')
print(f"Model loaded successfully on {self.device}")
def _determine_device(self, preferred_device: Optional[str]) -> str:
"""Determine the optimal compute device."""
if preferred_device:
return preferred_device
if torch.cuda.is_available():
return 'cuda'
elif torch.backends.mps.is_available():
return 'mps'
else:
return 'cpu'
def generate(self, messages: List[Message], config: GenerationConfig) -> str:
"""Generate response using the local model."""
prompt = self._format_messages(messages)
inputs = self.tokenizer(prompt, return_tensors='pt').to(self.device)
gen_kwargs = {
'max_new_tokens': config.max_tokens,
'temperature': config.temperature,
'top_p': config.top_p,
'do_sample': True,
'pad_token_id': self.tokenizer.eos_token_id
}
with torch.no_grad():
outputs = self.model.generate(**inputs, **gen_kwargs)
# Decode only the newly generated tokens; decoding the full sequence and
# slicing by len(prompt) can misalign when detokenization does not
# round-trip the prompt exactly
new_tokens = outputs[0][inputs['input_ids'].shape[1]:]
response = self.tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
return response
def _format_messages(self, messages: List[Message]) -> str:
"""Format conversation messages into a prompt string."""
formatted_parts = []
for msg in messages:
if msg.role == 'system':
formatted_parts.append(f"System: {msg.content}")
elif msg.role == 'user':
formatted_parts.append(f"User: {msg.content}")
elif msg.role == 'assistant':
formatted_parts.append(f"Assistant: {msg.content}")
formatted_parts.append("Assistant:")
return "\n\n".join(formatted_parts)
def get_device_info(self) -> Dict[str, Any]:
"""Retrieve information about the compute device."""
info = {
'device_type': self.device,
'model_name': self.model_name
}
if self.device == 'cuda':
info['gpu_name'] = torch.cuda.get_device_name(0)
info['gpu_memory_total'] = torch.cuda.get_device_properties(0).total_memory
info['gpu_memory_allocated'] = torch.cuda.memory_allocated(0)
elif self.device == 'mps':
info['gpu_name'] = 'Apple Silicon'
return info
class RemoteLLMProvider(LLMInterface):
"""Remote LLM implementation for API-based services."""
def __init__(self, api_key: str, model_name: str, base_url: str = "https://api.openai.com/v1"):
"""Initialize the remote LLM provider."""
self.api_key = api_key
self.model_name = model_name
self.base_url = base_url
self.headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
def generate(self, messages: List[Message], config: GenerationConfig) -> str:
"""Generate response using the remote API."""
api_messages = [
{'role': msg.role, 'content': msg.content}
for msg in messages
]
payload = {
'model': self.model_name,
'messages': api_messages,
'temperature': config.temperature,
'max_tokens': config.max_tokens,
'top_p': config.top_p,
'frequency_penalty': config.frequency_penalty,
'presence_penalty': config.presence_penalty
}
if config.stop_sequences:
payload['stop'] = config.stop_sequences
response = requests.post(
f'{self.base_url}/chat/completions',
headers=self.headers,
json=payload,
timeout=120
)
response.raise_for_status()
result = response.json()
return result['choices'][0]['message']['content']
def get_device_info(self) -> Dict[str, Any]:
"""Retrieve information about the remote service."""
return {
'device_type': 'remote',
'model_name': self.model_name,
'base_url': self.base_url
}
# ============================================================================
# SEARCH INTERFACE LAYER
# ============================================================================
class SearchInterface(ABC):
"""Abstract base class for search providers."""
@abstractmethod
def search(self, query: str, num_results: int = 10, time_filter: Optional[str] = None) -> List[SearchResult]:
"""Execute a search query and return results."""
pass
class DuckDuckGoSearchProvider(SearchInterface):
"""Search provider implementation using DuckDuckGo."""
def __init__(self):
"""Initialize the DuckDuckGo search provider."""
self.ddgs = DDGS()
def search(self, query: str, num_results: int = 10, time_filter: Optional[str] = None) -> List[SearchResult]:
"""Execute a search query using DuckDuckGo."""
try:
search_params = {'max_results': num_results}
if time_filter:
search_params['timelimit'] = time_filter
results = list(self.ddgs.text(query, **search_params))
search_results = []
for result in results:
search_result = SearchResult(
title=result.get('title', ''),
url=result.get('href', ''),
snippet=result.get('body', ''),
source=result.get('source', None)
)
search_results.append(search_result)
return search_results
except Exception as e:
print(f"Search error: {str(e)}")
return []
# ============================================================================
# WEB CONTENT EXTRACTION
# ============================================================================
class WebContentExtractor:
"""Extracts main textual content from web pages."""
def __init__(self, timeout: int = 10):
"""Initialize the web content extractor."""
self.timeout = timeout
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
def extract_content(self, url: str) -> Optional[str]:
"""Extract main textual content from a web page."""
try:
response = requests.get(url, headers=self.headers, timeout=self.timeout)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):
element.decompose()
main_content = soup.find('main') or soup.find('article') or soup.find('body')
if not main_content:
return None
text = main_content.get_text(separator='\n', strip=True)
text = re.sub(r'\n\s*\n', '\n\n', text)
text = re.sub(r' +', ' ', text)
return text
except Exception as e:
print(f"Content extraction error for {url}: {str(e)}")
return None
def extract_summary(self, url: str, max_length: int = 1000) -> Optional[str]:
"""Extract a summary of the web page content."""
content = self.extract_content(url)
if not content:
return None
if len(content) <= max_length:
return content
truncated = content[:max_length]
last_period = truncated.rfind('.')
if last_period > max_length * 0.7:
return truncated[:last_period + 1]
else:
return truncated + '...'
# ============================================================================
# CACHE MANAGEMENT
# ============================================================================
class CacheManager:
"""Manages caching of search results and analyses."""
def __init__(self, cache_dir: str = ".trend_cache", ttl_hours: int = 24):
"""Initialize the cache manager."""
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
self.ttl = timedelta(hours=ttl_hours)
def _get_cache_key(self, key_data: str) -> str:
"""Generate a cache key from input data."""
return hashlib.md5(key_data.encode()).hexdigest()
def get(self, key: str) -> Optional[Any]:
"""Retrieve an item from cache if it exists and is not expired."""
cache_key = self._get_cache_key(key)
cache_file = self.cache_dir / f"{cache_key}.pkl"
if not cache_file.exists():
return None
file_time = datetime.fromtimestamp(cache_file.stat().st_mtime)
if datetime.now() - file_time > self.ttl:
cache_file.unlink()
return None
try:
with open(cache_file, 'rb') as f:
return pickle.load(f)
except Exception:
return None
def set(self, key: str, value: Any):
"""Store an item in cache."""
cache_key = self._get_cache_key(key)
cache_file = self.cache_dir / f"{cache_key}.pkl"
try:
with open(cache_file, 'wb') as f:
pickle.dump(value, f)
except Exception as e:
print(f"Cache write error: {e}")
def clear(self):
"""Clear all cached items."""
for cache_file in self.cache_dir.glob("*.pkl"):
cache_file.unlink()
# ============================================================================
# TREND CLASSIFICATION
# ============================================================================
class TrendClassifier:
"""Advanced trend classification using multi-dimensional analysis."""
def __init__(self, llm: LLMInterface):
"""Initialize the trend classifier."""
self.llm = llm
def extract_metrics(self, trend_name: str, content_samples: List[Dict[str, str]]) -> TrendMetrics:
"""Extract quantitative metrics from content about a trend."""
context = "\n\n".join([
f"{sample['title']}\n{sample['content'][:800]}"
for sample in content_samples[:5]
])
prompt = f"""Analyze the following content about the trend "{trend_name}" and score it across seven dimensions on a scale from 0.0 to 1.0.
Content:
{context}
Provide scores for each dimension based on evidence in the content:
1. ADOPTION_VELOCITY: How quickly is this trend being adopted? (0.0 = very slow/stagnant, 1.0 = explosive growth)
2. MARKET_BREADTH: How broad is the geographic and demographic reach? (0.0 = very narrow niche, 1.0 = global and cross-demographic)
3. INVESTMENT_LEVEL: What is the level of financial investment and resource allocation? (0.0 = minimal investment, 1.0 = massive investment from multiple sources)
4. INNOVATION_INTENSITY: How novel and disruptive is this trend? (0.0 = incremental improvement, 1.0 = paradigm-shifting innovation)
5. MEDIA_ATTENTION: What is the level of media coverage and public discussion? (0.0 = minimal coverage, 1.0 = extensive mainstream coverage)
6. EXPERT_CONSENSUS: What is the level of agreement among domain experts? (0.0 = highly controversial/disputed, 1.0 = strong expert consensus)
7. SUSTAINABILITY: What are the indicators of long-term viability? (0.0 = likely to fade quickly, 1.0 = strong fundamentals for longevity)
Format your response as:
ADOPTION_VELOCITY: [score]
MARKET_BREADTH: [score]
INVESTMENT_LEVEL: [score]
INNOVATION_INTENSITY: [score]
MEDIA_ATTENTION: [score]
EXPERT_CONSENSUS: [score]
SUSTAINABILITY: [score]"""
messages = [Message(role='user', content=prompt)]
config = GenerationConfig(temperature=0.2, max_tokens=500)
response = self.llm.generate(messages, config)
scores = self._parse_metric_scores(response)
return TrendMetrics(
adoption_velocity=scores.get('ADOPTION_VELOCITY', 0.5),
market_breadth=scores.get('MARKET_BREADTH', 0.5),
investment_level=scores.get('INVESTMENT_LEVEL', 0.5),
innovation_intensity=scores.get('INNOVATION_INTENSITY', 0.5),
media_attention=scores.get('MEDIA_ATTENTION', 0.5),
expert_consensus=scores.get('EXPERT_CONSENSUS', 0.5),
sustainability=scores.get('SUSTAINABILITY', 0.5)
)
def _parse_metric_scores(self, response: str) -> Dict[str, float]:
"""Parse metric scores from LLM response."""
scores = {}
for line in response.split('\n'):
line = line.strip()
if ':' in line:
parts = line.split(':', 1)
metric_name = parts[0].strip().upper()
try:
score_str = parts[1].strip()
score = float(score_str)
scores[metric_name] = max(0.0, min(1.0, score))
except ValueError:
continue
return scores
def classify_from_metrics(self, metrics: TrendMetrics) -> Tuple[TrendCategory, float]:
"""Classify a trend based on its metrics."""
reach_score = (metrics.market_breadth + metrics.adoption_velocity) / 2
impact_score = (metrics.innovation_intensity + metrics.investment_level) / 2
longevity_score = (metrics.sustainability + metrics.expert_consensus) / 2
overall_strength = (reach_score + impact_score + longevity_score) / 3
if longevity_score < 0.3 or metrics.sustainability < 0.25:
category = TrendCategory.FAD
confidence = 0.7 + (0.3 * (1.0 - longevity_score))
elif reach_score < 0.4 and metrics.market_breadth < 0.35:
category = TrendCategory.MICRO_TREND
confidence = 0.6 + (0.3 * metrics.expert_consensus)
elif overall_strength >= 0.75 and longevity_score >= 0.7 and reach_score >= 0.7:
if metrics.market_breadth >= 0.8 and impact_score >= 0.75:
category = TrendCategory.MEGA_TREND
confidence = 0.65 + (0.35 * overall_strength)
else:
category = TrendCategory.MACRO_TREND
confidence = 0.7 + (0.25 * overall_strength)
elif overall_strength >= 0.5:
category = TrendCategory.TREND
confidence = 0.6 + (0.35 * overall_strength)
else:
category = TrendCategory.MICRO_TREND
confidence = 0.55 + (0.3 * overall_strength)
return category, confidence
# ============================================================================
# TREND ANALYSIS ENGINE
# ============================================================================
class TrendAnalysisEngine:
    """Core engine for discovering and analyzing trends."""

    def __init__(self, llm: LLMInterface, search_provider: SearchInterface,
                 content_extractor: WebContentExtractor, cache_manager: CacheManager):
        """Initialize the trend analysis engine."""
        self.llm = llm
        self.search_provider = search_provider
        self.content_extractor = content_extractor
        self.cache_manager = cache_manager
        self.classifier = TrendClassifier(llm)
        self.system_prompt = """You are an expert trend researcher and analyst with deep knowledge of trend research methodologies. Your task is to analyze information about emerging patterns in various domains and classify them according to established trend research frameworks.
When analyzing trends, consider the following classification criteria:
A FAD is a short-lived phenomenon, typically lasting less than a year, with limited impact beyond a specific niche or community. Fads generate temporary excitement but lack the substance for long-term adoption.
A MICRO TREND affects a specific subculture or niche market, lasting one to three years. These trends have limited geographic or demographic reach but can be significant within their specific context.
A TREND represents a significant pattern of change lasting three to ten years, affecting entire industries or substantial market segments. Trends reshape business practices, consumer behavior, or technological approaches within specific domains.
A MACRO TREND spans ten to twenty years and affects multiple industries or sectors simultaneously. These trends represent fundamental shifts in how people work, live, or interact with technology.
A MEGA TREND encompasses twenty years or more and represents transformational changes that reshape society, economy, and technology on a global scale. Mega trends affect virtually all aspects of human activity.
Your analysis should be evidence-based, drawing on concrete indicators such as investment patterns, adoption rates, research activity, media coverage, and expert commentary. Always distinguish between hype and substance, and provide balanced assessments of both opportunities and challenges."""

    def analyze_topic(self, topic_area: str, num_trends: int = 5) -> List[TrendAnalysis]:
        """Analyze a topic area and identify emerging trends."""
        print(f"Analyzing trends in: {topic_area}")
        cache_key = f"analysis_{topic_area}_{num_trends}"
        cached_result = self.cache_manager.get(cache_key)
        if cached_result:
            print("Using cached analysis results")
            return cached_result
        search_queries = self._generate_search_queries(topic_area)
        print(f"Generated {len(search_queries)} search queries")
        # Deduplicate by URL: the same page often surfaces for several queries,
        # and duplicates would skew relevance scoring and waste extraction calls.
        seen_urls = set()
        all_results = []
        for query in search_queries:
            results = self.search_provider.search(query, num_results=10, time_filter='m')
            for result in results:
                if result.url not in seen_urls:
                    seen_urls.add(result.url)
                    all_results.append(result)
            time.sleep(1)  # polite delay between search requests
        print(f"Collected {len(all_results)} unique search results")
        content_samples = self._extract_content_samples(all_results, max_samples=20)
        print(f"Extracted content from {len(content_samples)} sources")
        potential_trends = self._identify_trends(topic_area, content_samples)
        print(f"Identified {len(potential_trends)} potential trends")
        trend_analyses = []
        for trend_name in potential_trends[:num_trends]:
            analysis = self._analyze_single_trend(topic_area, trend_name, all_results, content_samples)
            if analysis:
                trend_analyses.append(analysis)
        self.cache_manager.set(cache_key, trend_analyses)
        return trend_analyses
    def _generate_search_queries(self, topic_area: str) -> List[str]:
        """Generate effective search queries for the topic area."""
        cache_key = f"queries_{topic_area}"
        cached_queries = self.cache_manager.get(cache_key)
        if cached_queries:
            return cached_queries
        messages = [
            Message(role='system', content=self.system_prompt),
            Message(role='user', content=f"""Generate 5 effective search queries to discover emerging trends in {topic_area}.
The queries should target:
1. Recent developments and innovations
2. Industry reports and forecasts
3. Research publications and breakthroughs
4. Market analysis and adoption patterns
5. Expert commentary and thought leadership
Return only the search queries, one per line, without numbering or additional explanation.""")
        ]
        config = GenerationConfig(temperature=0.7, max_tokens=500)
        response = self.llm.generate(messages, config)
        queries = [q.strip() for q in response.strip().split('\n') if q.strip()]
        self.cache_manager.set(cache_key, queries)
        return queries
    def _extract_content_samples(self, results: List[SearchResult], max_samples: int = 20) -> List[Dict[str, str]]:
        """Extract content from search results."""
        content_samples = []
        for result in results[:max_samples]:
            cache_key = f"content_{result.url}"
            cached_content = self.cache_manager.get(cache_key)
            if cached_content:
                content = cached_content
            else:
                content = self.content_extractor.extract_summary(result.url, max_length=2000)
                if content:
                    self.cache_manager.set(cache_key, content)
            if content:
                content_samples.append({
                    'url': result.url,
                    'title': result.title,
                    'content': content
                })
        return content_samples
    def _identify_trends(self, topic_area: str, content_samples: List[Dict[str, str]]) -> List[str]:
        """Identify potential trends from content samples."""
        content_summary = "\n\n".join([
            f"Source: {sample['title']}\n{sample['content'][:500]}"
            for sample in content_samples[:10]
        ])
        messages = [
            Message(role='system', content=self.system_prompt),
            Message(role='user', content=f"""Based on the following content about {topic_area}, identify 5-7 distinct emerging trends or patterns.
Content samples:
{content_summary}
List the trend names only, one per line. Each trend name should be concise (2-5 words) and descriptive.""")
        ]
        config = GenerationConfig(temperature=0.7, max_tokens=500)
        response = self.llm.generate(messages, config)
        # Models sometimes number or bullet their output despite instructions,
        # so strip "1." / "2)" / "-" prefixes without mangling names like "3D Printing".
        trends = []
        for line in response.strip().split('\n'):
            name = line.strip().lstrip('-* ').strip()
            head, _, rest = name.partition(' ')
            if rest and head.rstrip('.)').isdigit():
                name = rest.strip()
            if name:
                trends.append(name)
        return trends
    def _analyze_single_trend(self, topic_area: str, trend_name: str,
                              all_results: List[SearchResult],
                              content_samples: List[Dict[str, str]]) -> Optional[TrendAnalysis]:
        """Perform detailed analysis of a single trend."""
        relevant_sources = self._find_relevant_sources(trend_name, all_results)
        detailed_content = []
        for source in relevant_sources[:5]:
            cache_key = f"content_{source.url}"
            cached_content = self.cache_manager.get(cache_key)
            if cached_content:
                content = cached_content
            else:
                content = self.content_extractor.extract_summary(source.url, max_length=1500)
                if content:
                    self.cache_manager.set(cache_key, content)
            if content:
                detailed_content.append({
                    'url': source.url,
                    'title': source.title,
                    'content': content
                })
        if not detailed_content:
            return None
        metrics = self.classifier.extract_metrics(trend_name, detailed_content)
        category, confidence = self.classifier.classify_from_metrics(metrics)
        context = "\n\n".join([
            f"Source: {item['title']}\nURL: {item['url']}\n{item['content']}"
            for item in detailed_content
        ])
        messages = [
            Message(role='system', content=self.system_prompt),
            Message(role='user', content=f"""Analyze the trend "{trend_name}" in the context of {topic_area}.
Based on the following sources, provide a comprehensive analysis:
{context}
Your analysis must include:
1. SUMMARY: A concise 2-3 sentence summary of what this trend represents.
2. TECHNOLOGY_IMPACT: How this trend affects or will affect technology development, including specific technologies, platforms, or approaches.
3. SCIENCE_IMPACT: How this trend influences scientific research, methodologies, or understanding in relevant fields.
4. PRODUCT_IMPACT: How this trend affects or will affect products, services, and market offerings.
5. KEY_INDICATORS: List 3-5 specific, observable indicators that demonstrate this is a genuine trend rather than speculation.
6. TIME_HORIZON: Estimated timeframe for significant impact (e.g., "1-2 years", "5-10 years").
Format your response as follows:
SUMMARY: [summary text]
TECHNOLOGY_IMPACT: [impact description]
SCIENCE_IMPACT: [impact description]
PRODUCT_IMPACT: [impact description]
KEY_INDICATORS: [indicator 1] | [indicator 2] | [indicator 3]
TIME_HORIZON: [timeframe]""")
        ]
        config = GenerationConfig(temperature=0.3, max_tokens=2000)
        response = self.llm.generate(messages, config)
        analysis_dict = self._parse_analysis_response(response)
        if not analysis_dict:
            return None
        recommended_urls = [item['url'] for item in detailed_content[:3]]
        return TrendAnalysis(
            trend_name=trend_name,
            category=category,
            summary=analysis_dict.get('SUMMARY', ''),
            technology_impact=analysis_dict.get('TECHNOLOGY_IMPACT', ''),
            science_impact=analysis_dict.get('SCIENCE_IMPACT', ''),
            product_impact=analysis_dict.get('PRODUCT_IMPACT', ''),
            key_indicators=[k.strip() for k in analysis_dict.get('KEY_INDICATORS', '').split('|') if k.strip()],
            time_horizon=analysis_dict.get('TIME_HORIZON', ''),
            confidence_level=confidence,
            sources=relevant_sources[:5],
            recommended_urls=recommended_urls,
            metrics=metrics
        )
    def _find_relevant_sources(self, trend_name: str, all_results: List[SearchResult]) -> List[SearchResult]:
        """Find search results most relevant to a specific trend."""
        scored_results = []
        trend_keywords = set(trend_name.lower().split())
        for result in all_results:
            text = f"{result.title} {result.snippet}".lower()
            score = sum(1 for keyword in trend_keywords if keyword in text)
            if score > 0:
                scored_results.append((score, result))
        scored_results.sort(reverse=True, key=lambda x: x[0])
        return [result for score, result in scored_results]
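The relevance filter is deliberately simple: bag-of-words overlap between the trend name and each result's title and snippet, with zero-overlap results dropped. A standalone sketch, using a minimal stand-in for the SearchResult type, shows the ranking behavior:

```python
from dataclasses import dataclass

@dataclass
class Result:
    """Minimal stand-in for SearchResult, for illustration only."""
    title: str
    snippet: str

def rank(trend_name: str, results):
    # Score each result by how many trend keywords appear in its text
    keywords = set(trend_name.lower().split())
    scored = []
    for r in results:
        text = f"{r.title} {r.snippet}".lower()
        score = sum(1 for kw in keywords if kw in text)
        if score > 0:  # results with no keyword overlap are dropped
            scored.append((score, r))
    scored.sort(reverse=True, key=lambda x: x[0])
    return [r for _, r in scored]

hits = rank("edge computing", [
    Result("Edge computing hits factories", "edge adoption grows"),
    Result("Cloud pricing update", "vendor margins"),
    Result("Computing news roundup", "chips and more"),
])
# Two-keyword match ranks first; the cloud article is filtered out entirely
```

Substring matching is crude (the keyword "edge" also matches "hedge"), so for production use you may want token-level matching or embedding similarity; the overlap count is a cheap, dependency-free first pass.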
    def _parse_analysis_response(self, response: str) -> Dict[str, str]:
        """Parse structured analysis response from LLM."""
        known_fields = {'SUMMARY', 'TECHNOLOGY_IMPACT', 'SCIENCE_IMPACT',
                        'PRODUCT_IMPACT', 'KEY_INDICATORS', 'TIME_HORIZON'}
        result = {}
        current_field = None
        current_value = []
        for line in response.split('\n'):
            line = line.strip()
            if not line:
                continue
            parts = line.split(':', 1)
            field_name = parts[0].strip().upper()
            # Only open a new section for an expected field label; otherwise stray
            # colons (URLs, "e.g.:") inside the prose would split fields apart.
            if len(parts) == 2 and field_name in known_fields:
                if current_field:
                    result[current_field] = ' '.join(current_value).strip()
                current_field = field_name
                current_value = [parts[1].strip()]
            elif current_field:
                current_value.append(line)
        if current_field:
            result[current_field] = ' '.join(current_value).strip()
        return result
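The parsing technique — a labeled line opens a section, subsequent unlabeled lines accumulate into it — is worth seeing in isolation, since it tolerates the multi-line answers LLMs commonly produce. A self-contained sketch of the same accumulation logic (function and sample text are illustrative, not part of the agent):

```python
def parse_fields(response: str, fields: set) -> dict:
    """'LABEL: text' opens a section; later plain lines append to it."""
    result, current, value = {}, None, []
    for line in response.splitlines():
        line = line.strip()
        if not line:
            continue
        head, sep, rest = line.partition(':')
        label = head.strip().upper()
        if sep and label in fields:
            if current:  # flush the previous section before starting a new one
                result[current] = ' '.join(value).strip()
            current, value = label, [rest.strip()]
        elif current:
            value.append(line)
    if current:  # flush the final section
        result[current] = ' '.join(value).strip()
    return result

sample = "SUMMARY: Edge AI moves inference\nto devices.\nTIME_HORIZON: 1-2 years"
parsed = parse_fields(sample, {'SUMMARY', 'TIME_HORIZON'})
```

Here the continuation line "to devices." is folded into SUMMARY rather than discarded, which is exactly the behavior the agent relies on when a model wraps its answer across lines.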
# ============================================================================
# MAIN AGENT INTERFACE
# ============================================================================
class TrendDiscoveryAgent:
    """Main interface for the trend discovery system."""

    def __init__(self, llm: LLMInterface, search_provider: SearchInterface, cache_manager: CacheManager):
        """Initialize the trend discovery agent."""
        self.llm = llm
        self.search_provider = search_provider
        self.cache_manager = cache_manager
        self.content_extractor = WebContentExtractor()
        self.analysis_engine = TrendAnalysisEngine(llm, search_provider, self.content_extractor, cache_manager)

    def discover_trends(self, topic_area: str, num_trends: int = 5) -> str:
        """Discover and analyze trends in a given topic area."""
        print(f"\n{'='*80}")
        print("TREND DISCOVERY AGENT")
        print(f"Topic Area: {topic_area}")
        print(f"{'='*80}\n")
        device_info = self.llm.get_device_info()
        print(f"Using {device_info['device_type'].upper()} acceleration")
        if 'gpu_name' in device_info:
            print(f"GPU: {device_info['gpu_name']}")
        print()
        trend_analyses = self.analysis_engine.analyze_topic(topic_area, num_trends)
        report = self._format_report(topic_area, trend_analyses)
        return report
    def _format_report(self, topic_area: str, analyses: List[TrendAnalysis]) -> str:
        """Format trend analyses into a comprehensive report."""
        report_lines = []
        report_lines.append(f"\n{'='*80}")
        report_lines.append(f"TREND ANALYSIS REPORT: {topic_area.upper()}")
        report_lines.append(f"{'='*80}\n")
        report_lines.append(f"Total Trends Identified: {len(analyses)}\n")
        report_lines.append("TREND OVERVIEW")
        report_lines.append("-" * 80)
        for i, analysis in enumerate(analyses, 1):
            report_lines.append(f"{i}. {analysis.trend_name}")
            report_lines.append(f"   Category: {analysis.category.value.replace('_', ' ').title()}")
            report_lines.append(f"   Confidence: {analysis.confidence_level:.2f}")
            report_lines.append(f"   Time Horizon: {analysis.time_horizon}")
            report_lines.append("")
        for i, analysis in enumerate(analyses, 1):
            report_lines.append(f"\n{'='*80}")
            report_lines.append(f"TREND {i}: {analysis.trend_name.upper()}")
            report_lines.append(f"{'='*80}\n")
            report_lines.append(f"Classification: {analysis.category.value.replace('_', ' ').title()}")
            report_lines.append(f"Confidence Level: {analysis.confidence_level:.2f}")
            report_lines.append(f"Time Horizon: {analysis.time_horizon}\n")
            if analysis.metrics:
                report_lines.append("TREND METRICS")
                report_lines.append("-" * 80)
                report_lines.append(f"Adoption Velocity: {analysis.metrics.adoption_velocity:.2f}")
                report_lines.append(f"Market Breadth: {analysis.metrics.market_breadth:.2f}")
                report_lines.append(f"Investment Level: {analysis.metrics.investment_level:.2f}")
                report_lines.append(f"Innovation Intensity: {analysis.metrics.innovation_intensity:.2f}")
                report_lines.append(f"Media Attention: {analysis.metrics.media_attention:.2f}")
                report_lines.append(f"Expert Consensus: {analysis.metrics.expert_consensus:.2f}")
                report_lines.append(f"Sustainability: {analysis.metrics.sustainability:.2f}")
                report_lines.append("")
            report_lines.append("SUMMARY")
            report_lines.append("-" * 80)
            report_lines.append(self._wrap_text(analysis.summary, 80))
            report_lines.append("")
            report_lines.append("TECHNOLOGY IMPACT")
            report_lines.append("-" * 80)
            report_lines.append(self._wrap_text(analysis.technology_impact, 80))
            report_lines.append("")
            report_lines.append("SCIENCE IMPACT")
            report_lines.append("-" * 80)
            report_lines.append(self._wrap_text(analysis.science_impact, 80))
            report_lines.append("")
            report_lines.append("PRODUCT IMPACT")
            report_lines.append("-" * 80)
            report_lines.append(self._wrap_text(analysis.product_impact, 80))
            report_lines.append("")
            report_lines.append("KEY INDICATORS")
            report_lines.append("-" * 80)
            for indicator in analysis.key_indicators:
                if indicator.strip():
                    report_lines.append(f"  - {indicator.strip()}")
            report_lines.append("")
            report_lines.append("RECOMMENDED READING")
            report_lines.append("-" * 80)
            for url in analysis.recommended_urls:
                report_lines.append(f"  {url}")
            report_lines.append("")
        return "\n".join(report_lines)
    def _wrap_text(self, text: str, width: int = 80) -> str:
        """Wrap text to specified width while preserving words."""
        words = text.split()
        lines = []
        current_line = []
        current_length = 0
        for word in words:
            if current_length + len(word) + 1 <= width:
                current_line.append(word)
                current_length += len(word) + 1
            else:
                if current_line:
                    lines.append(' '.join(current_line))
                current_line = [word]
                current_length = len(word)
        if current_line:
            lines.append(' '.join(current_line))
        return '\n'.join(lines)
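The hand-rolled wrapper above implements a standard greedy word wrap. Unless you need custom behavior, Python's standard library already provides this; a minimal sketch of the equivalent call:

```python
import textwrap

text = "The quick brown fox jumps over the lazy dog near the riverbank at dawn."
# textwrap.fill performs greedy word wrapping and returns a single
# newline-joined string, matching what _wrap_text produces.
wrapped = textwrap.fill(text, width=30)
print(wrapped)
```

One difference worth knowing: textwrap also breaks words longer than the width by default (controlled by `break_long_words`), whereas the hand-rolled version leaves an over-long word on its own line.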
# ============================================================================
# COMMAND LINE INTERFACE
# ============================================================================
def main():
    """Main entry point for the trend discovery agent."""
    parser = argparse.ArgumentParser(
        description='Discover and analyze emerging trends in any topic area'
    )
    parser.add_argument(
        '--topic',
        type=str,
        required=True,
        help='Topic area to analyze (e.g., "Artificial Intelligence", "3D Printing")'
    )
    parser.add_argument(
        '--num-trends',
        type=int,
        default=5,
        help='Number of trends to identify (default: 5)'
    )
    parser.add_argument(
        '--llm-type',
        type=str,
        choices=['local', 'remote'],
        default='remote',
        help='Type of LLM to use (default: remote)'
    )
    parser.add_argument(
        '--model',
        type=str,
        default='gpt-4',
        help='Model name (default: gpt-4 for remote, or specify local model)'
    )
    parser.add_argument(
        '--api-key',
        type=str,
        help='API key for remote LLM (can also use OPENAI_API_KEY env var)'
    )
    parser.add_argument(
        '--device',
        type=str,
        choices=['cuda', 'mps', 'cpu'],
        help='Device for local LLM (auto-detect if not specified)'
    )
    parser.add_argument(
        '--output',
        type=str,
        help='Output file for the report (optional)'
    )
    parser.add_argument(
        '--clear-cache',
        action='store_true',
        help='Clear the cache before running'
    )
    args = parser.parse_args()
    cache_manager = CacheManager()
    if args.clear_cache:
        print("Clearing cache...")
        cache_manager.clear()
    if args.llm_type == 'local':
        if not args.model or args.model == 'gpt-4':
            print("Error: Please specify a local model name with --model")
            sys.exit(1)
        llm = LocalLLMProvider(args.model, args.device)
    else:
        api_key = args.api_key or os.environ.get('OPENAI_API_KEY')
        if not api_key:
            print("Error: API key required for remote LLM. Use --api-key or set OPENAI_API_KEY env var")
            sys.exit(1)
        llm = RemoteLLMProvider(api_key, args.model)
    search_provider = DuckDuckGoSearchProvider()
    agent = TrendDiscoveryAgent(llm, search_provider, cache_manager)
    try:
        report = agent.discover_trends(args.topic, args.num_trends)
        print(report)
        if args.output:
            with open(args.output, 'w', encoding='utf-8') as f:
                f.write(report)
            print(f"\nReport saved to: {args.output}")
    except KeyboardInterrupt:
        print("\n\nAnalysis interrupted by user")
        sys.exit(0)
    except Exception as e:
        print(f"\nError during analysis: {str(e)}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
if __name__ == '__main__':
    main()
This complete implementation provides a production-ready trend discovery agent that can be deployed immediately. The system supports both local and remote LLMs, automatically optimizes GPU usage, implements comprehensive caching, and produces detailed trend analyses based on rigorous methodologies from trend research.
To use the system, save the code to a file named trend_agent.py and install the required dependencies using pip install torch transformers requests beautifulsoup4 duckduckgo-search. Then run the agent with a command like python trend_agent.py --topic "Generative AI" --num-trends 5 --llm-type remote --api-key YOUR_API_KEY.
The system will automatically discover emerging trends, classify them according to established frameworks, assess their impact across technology, science, and products, and provide curated resources for further exploration. The multi-dimensional analysis ensures that trend classifications are defensible and evidence-based, while the caching system improves performance for repeated analyses.