Friday, May 08, 2026

BUILDING AN INTELLIGENT TREND DISCOVERY AGENT: A FULL GUIDE TO CREATING AN LLM-POWERED RESEARCH SYSTEM

 



INTRODUCTION


The exponential growth of information on the internet presents both opportunities and challenges for professionals seeking to stay current in their fields. Whether you are tracking developments in software engineering, artificial intelligence, robotics, generative AI, integrated development environments, 3D printing technologies, lasers, or astronomy, the sheer volume of data makes manual trend identification increasingly impractical. This article presents a comprehensive guide to building an LLM-powered agent that automatically discovers, analyzes, and categorizes emerging trends in any given topic area.

Our trend discovery agent combines the reasoning capabilities of large language models with live internet search to identify trends and classify them according to established frameworks from trend research. The system supports both local and remote LLM deployments. Local models run with GPU acceleration through NVIDIA CUDA or Apple's Metal Performance Shaders (MPS) backend when one is available, and fall back to the CPU otherwise. For each trend, the agent provides a classification, an impact assessment, and curated resources for further exploration.



UNDERSTANDING THE PROBLEM DOMAIN


Before diving into implementation details, we must establish a clear understanding of what constitutes a trend and how trend research methodologies can inform our agent's design. In trend research, professionals distinguish between several categories of trends based on their scope, duration, and impact. A fad represents a short-lived phenomenon with limited lasting impact. A trend typically spans several years and affects specific industries or domains. A megatrend encompasses decades-long shifts that fundamentally reshape society, technology, and markets across multiple sectors.
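To make the duration criterion concrete, here is a minimal sketch that maps an expected lifespan to a category. The year thresholds are assumptions for illustration, borrowed from the finer five-level scale that the agent's system prompt uses later in this guide, and the function name classify_by_duration is hypothetical.

```python
from enum import Enum


class TrendCategory(Enum):
    """Five-level scale, from shortest-lived to longest-lived."""
    FAD = "fad"
    MICRO_TREND = "micro_trend"
    TREND = "trend"
    MACRO_TREND = "macro_trend"
    MEGA_TREND = "mega_trend"


def classify_by_duration(expected_years: float) -> TrendCategory:
    """Map an expected lifespan in years to a category (illustrative thresholds)."""
    if expected_years < 0:
        raise ValueError("expected_years must be non-negative")
    if expected_years < 1:
        return TrendCategory.FAD
    if expected_years < 3:
        return TrendCategory.MICRO_TREND
    if expected_years < 10:
        return TrendCategory.TREND
    if expected_years < 20:
        return TrendCategory.MACRO_TREND
    return TrendCategory.MEGA_TREND


if __name__ == "__main__":
    for years in (0.5, 2, 7, 15, 30):
        print(years, classify_by_duration(years).value)
```

Duration is only one of the three criteria named above. Scope and impact also matter, which is why the agent delegates the final classification to the language model rather than to a fixed rule like this one.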

Our agent must not only identify emerging patterns but also classify them appropriately, assess their potential impact on technology, science, and products, and provide substantive analysis that goes beyond simple keyword matching. This requires integrating multiple capabilities including web search, content analysis, pattern recognition, and structured reasoning about trend characteristics.



ARCHITECTURAL OVERVIEW


The trend discovery agent architecture consists of four layers. At the foundation lies the LLM interface layer, which abstracts the differences between local and remote language models and selects the appropriate compute device for local ones. Above this sits the search orchestration layer, responsible for formulating effective search queries, retrieving relevant content, and managing the information gathering process. The analysis engine processes retrieved information to identify patterns, extract key insights, and classify trends according to established frameworks. Finally, the presentation layer structures the findings into coherent reports with proper citations and recommendations for further reading.

The system follows clean architecture principles by separating concerns into distinct layers with well-defined interfaces. This separation ensures that we can swap implementations, for example replacing one LLM provider with another, without affecting the rest of the system. The architecture also emphasizes testability, maintainability, and extensibility to accommodate future enhancements.
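The following sketch illustrates what swapping implementations means in practice. It is a simplified stand-in rather than the agent's real classes: TextGenerator, CannedGenerator, UppercaseGenerator, and QueryPlanner are hypothetical names, and the contract is reduced to a single method so the example stays short.

```python
from typing import List, Protocol


class TextGenerator(Protocol):
    """Hypothetical minimal contract standing in for the LLM layer."""

    def generate(self, prompt: str) -> str: ...


class CannedGenerator:
    """Returns a fixed reply; handy in unit tests."""

    def __init__(self, reply: str) -> None:
        self.reply = reply

    def generate(self, prompt: str) -> str:
        return self.reply


class UppercaseGenerator:
    """A second, unrelated implementation of the same contract."""

    def generate(self, prompt: str) -> str:
        return prompt.upper()


class QueryPlanner:
    """Higher layer: depends only on the contract, not on a concrete provider."""

    def __init__(self, generator: TextGenerator) -> None:
        self.generator = generator

    def plan(self, topic: str) -> List[str]:
        raw = self.generator.generate(f"queries for {topic}")
        # One query per non-empty line
        return [line.strip() for line in raw.splitlines() if line.strip()]


if __name__ == "__main__":
    print(QueryPlanner(CannedGenerator("lidar trends\nlaser safety news")).plan("lasers"))
    print(QueryPlanner(UppercaseGenerator()).plan("lasers"))
```

QueryPlanner never changes when the generator does, which is the property the layered design is meant to guarantee for the real components.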



STEP ONE: ESTABLISHING THE LLM FOUNDATION


The first step in building our trend discovery agent involves creating a robust abstraction layer for language model interactions. This layer must handle both local models running on consumer hardware and remote API-based services while optimizing for available GPU resources.

We begin by defining a base interface that all LLM implementations must satisfy. This interface specifies methods for generating completions, managing conversation context, and configuring generation parameters.


from abc import ABC, abstractmethod

from typing import List, Dict, Optional, Any

from dataclasses import dataclass

import torch



@dataclass

class GenerationConfig:

    """Configuration parameters for text generation."""

    temperature: float = 0.7

    max_tokens: int = 2048

    top_p: float = 0.9

    frequency_penalty: float = 0.0

    presence_penalty: float = 0.0

    stop_sequences: Optional[List[str]] = None



@dataclass

class Message:

    """Represents a single message in a conversation."""

    role: str  # 'system', 'user', or 'assistant'

    content: str



class LLMInterface(ABC):

    """Abstract base class for all LLM implementations."""

    

    @abstractmethod

    def generate(self, messages: List[Message], config: GenerationConfig) -> str:

        """

        Generate a response based on the conversation history.

        

        Args:

            messages: List of conversation messages

            config: Generation configuration parameters

            

        Returns:

            Generated text response

        """

        pass

    

    @abstractmethod

    def get_device_info(self) -> Dict[str, Any]:

        """

        Retrieve information about the compute device being used.

        

        Returns:

            Dictionary containing device type, name, and capabilities

        """

        pass



This interface provides the contract that all concrete implementations must fulfill. The Message dataclass encapsulates individual conversation turns, while GenerationConfig allows fine-grained control over the generation process. The get_device_info method enables monitoring and debugging of GPU utilization.
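As a usage illustration, the sketch below builds a short conversation and derives a second configuration from a first one. The two dataclasses are repeated so the snippet runs on its own. The parameter values mirror the ones the analysis engine uses later in this guide (a temperature of 0.7 for query generation and 0.3 for detailed analysis), while the message texts are made up for the example.

```python
from dataclasses import dataclass, replace
from typing import List, Optional


@dataclass
class GenerationConfig:
    """Same fields as the GenerationConfig defined above."""
    temperature: float = 0.7
    max_tokens: int = 2048
    top_p: float = 0.9
    frequency_penalty: float = 0.0
    presence_penalty: float = 0.0
    stop_sequences: Optional[List[str]] = None


@dataclass
class Message:
    """Same fields as the Message defined above."""
    role: str
    content: str


# Exploratory tasks such as brainstorming queries tolerate more randomness
brainstorm = GenerationConfig(temperature=0.7, max_tokens=500)

# replace() copies the dataclass and overrides only the named fields
analysis = replace(brainstorm, temperature=0.3, max_tokens=2000)

conversation = [
    Message(role="system", content="You are a trend researcher."),
    Message(role="user", content="List emerging trends in 3D printing."),
]

print(brainstorm)
print(analysis)
```

Deriving configurations with replace() keeps shared settings such as top_p in one place, so a change to the base configuration carries over to every variant.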

Now we implement a local LLM provider that leverages GPU acceleration through PyTorch. This implementation automatically detects available hardware and configures itself accordingly.


import torch

from transformers import AutoModelForCausalLM, AutoTokenizer

from typing import List, Dict, Any, Optional



class LocalLLMProvider(LLMInterface):

    """

    Local LLM implementation with automatic GPU acceleration.

    Supports NVIDIA CUDA and Apple Metal Performance Shaders.

    """

    

    def __init__(self, model_name: str, device: Optional[str] = None):

        """

        Initialize the local LLM provider.

        

        Args:

            model_name: HuggingFace model identifier

            device: Target device ('cuda', 'mps', 'cpu', or None for auto-detect)

        """

        self.model_name = model_name

        self.device = self._determine_device(device)

        

        # Load tokenizer and model with appropriate device mapping

        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

        

        # Configure model loading based on available hardware

        if self.device == 'cuda':

            # Load weights in half precision (float16) to reduce GPU memory use

            self.model = AutoModelForCausalLM.from_pretrained(

                model_name,

                torch_dtype=torch.float16,

                device_map='auto'

            )

        elif self.device == 'mps':

            # Apple Silicon optimization

            self.model = AutoModelForCausalLM.from_pretrained(

                model_name,

                torch_dtype=torch.float16

            ).to('mps')

        else:

            # CPU fallback

            self.model = AutoModelForCausalLM.from_pretrained(model_name)

            self.model.to('cpu')

    

    def _determine_device(self, preferred_device: Optional[str]) -> str:

        """

        Determine the optimal compute device.

        

        Args:

            preferred_device: User-specified device preference

            

        Returns:

            Device string ('cuda', 'mps', or 'cpu')

        """

        if preferred_device:

            return preferred_device

        

        # Auto-detect best available device

        if torch.cuda.is_available():

            return 'cuda'

        elif torch.backends.mps.is_available():

            return 'mps'

        else:

            return 'cpu'

    

    def generate(self, messages: List[Message], config: GenerationConfig) -> str:

        """Generate response using the local model."""

        # Format messages into a prompt string

        prompt = self._format_messages(messages)

        

        # Tokenize input

        inputs = self.tokenizer(prompt, return_tensors='pt').to(self.device)

        

        # Configure generation parameters

        gen_kwargs = {

            'max_new_tokens': config.max_tokens,

            'temperature': config.temperature,

            'top_p': config.top_p,

            'do_sample': True,

            'pad_token_id': self.tokenizer.eos_token_id

        }

        

        # Generate response

        with torch.no_grad():

            outputs = self.model.generate(**inputs, **gen_kwargs)

        

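        # Decode only the newly generated tokens. Slicing the decoded string by

        # len(prompt) is fragile because decoding may not reproduce the prompt exactly.

        prompt_length = inputs['input_ids'].shape[1]

        new_tokens = outputs[0][prompt_length:]

        response = self.tokenizer.decode(new_tokens, skip_special_tokens=True).strip()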

        

        return response

    

    def _format_messages(self, messages: List[Message]) -> str:

        """

        Format conversation messages into a prompt string.

        

        Args:

            messages: List of conversation messages

            

        Returns:

            Formatted prompt string

        """

        formatted_parts = []

        for msg in messages:

            if msg.role == 'system':

                formatted_parts.append(f"System: {msg.content}")

            elif msg.role == 'user':

                formatted_parts.append(f"User: {msg.content}")

            elif msg.role == 'assistant':

                formatted_parts.append(f"Assistant: {msg.content}")

        

        formatted_parts.append("Assistant:")

        return "\n\n".join(formatted_parts)

    

    def get_device_info(self) -> Dict[str, Any]:

        """Retrieve information about the compute device."""

        info = {

            'device_type': self.device,

            'model_name': self.model_name

        }

        

        if self.device == 'cuda':

            info['gpu_name'] = torch.cuda.get_device_name(0)

            info['gpu_memory_total'] = torch.cuda.get_device_properties(0).total_memory

            info['gpu_memory_allocated'] = torch.cuda.memory_allocated(0)

        elif self.device == 'mps':

            info['gpu_name'] = 'Apple Silicon'

        

        return info


The LocalLLMProvider class demonstrates several important design decisions. It automatically detects the best available hardware and configures PyTorch accordingly. When NVIDIA CUDA is available, it loads the model in half precision (float16) with device_map='auto', which requires the accelerate package, to reduce memory consumption and improve throughput. On Apple Silicon, it also loads the model in float16 and moves it to the Metal Performance Shaders (MPS) backend. When no GPU is available, the implementation falls back to CPU execution in the model's default precision.

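The message formatting logic flattens our structured conversation history into a plain-text prompt with System:, User:, and Assistant: labels, ending with an open Assistant: line for the model to complete. This generic format works with any causal language model, but instruction-tuned models usually respond better to the chat template they were trained with, which recent versions of transformers expose through tokenizer.apply_chat_template.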
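One side effect of a label-based prompt is that a model may keep writing past its own answer and invent the next "User:" turn. The local provider as written does not apply GenerationConfig.stop_sequences, so one option is to trim the decoded text afterwards. The helper below is a sketch of that idea and not part of the original provider; truncate_at_stop is a hypothetical name.

```python
from typing import Iterable, Optional


def truncate_at_stop(text: str, stop_sequences: Optional[Iterable[str]]) -> str:
    """Cut text at the earliest occurrence of any stop sequence."""
    if not stop_sequences:
        return text.strip()

    cut = len(text)
    for stop in stop_sequences:
        if not stop:
            continue  # an empty string would match at index 0
        index = text.find(stop)
        if index != -1:
            cut = min(cut, index)

    return text[:cut].strip()


if __name__ == "__main__":
    raw = "Solid-state batteries are gaining traction.\n\nUser: tell me more"
    print(truncate_at_stop(raw, ["\n\nUser:", "\n\nSystem:"]))
```

Trimming after the fact still pays for the extra tokens the model generated. It only keeps them out of the returned response.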

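Next, we implement a remote LLM provider that talks to API-based services through an OpenAI-compatible chat completions endpoint. This covers OpenAI itself and the providers and local servers that expose the same API shape. Services with a different native API, such as Anthropic's, need either a compatibility endpoint or their own LLMInterface implementation. The remote provider shares the same interface as the local one, so either can be substituted for the other.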


import requests

from typing import List, Dict, Any

import os



class RemoteLLMProvider(LLMInterface):

    """

    Remote LLM implementation for API-based services.

    Supports OpenAI-compatible endpoints.

    """

    

    def __init__(self, api_key: str, model_name: str, base_url: str = "https://api.openai.com/v1"):

        """

        Initialize the remote LLM provider.

        

        Args:

            api_key: API authentication key

            model_name: Model identifier for the remote service

            base_url: Base URL for the API endpoint

        """

        self.api_key = api_key

        self.model_name = model_name

        self.base_url = base_url

        self.headers = {

            'Authorization': f'Bearer {api_key}',

            'Content-Type': 'application/json'

        }

    

    def generate(self, messages: List[Message], config: GenerationConfig) -> str:

        """Generate response using the remote API."""

        # Convert messages to API format

        api_messages = [

            {'role': msg.role, 'content': msg.content}

            for msg in messages

        ]

        

        # Prepare request payload

        payload = {

            'model': self.model_name,

            'messages': api_messages,

            'temperature': config.temperature,

            'max_tokens': config.max_tokens,

            'top_p': config.top_p,

            'frequency_penalty': config.frequency_penalty,

            'presence_penalty': config.presence_penalty

        }

        

        if config.stop_sequences:

            payload['stop'] = config.stop_sequences

        

        # Make API request

        response = requests.post(

            f'{self.base_url}/chat/completions',

            headers=self.headers,

            json=payload,

            timeout=120

        )

        

        response.raise_for_status()

        result = response.json()

        

        return result['choices'][0]['message']['content']

    

    def get_device_info(self) -> Dict[str, Any]:

        """Retrieve information about the remote service."""

        return {

            'device_type': 'remote',

            'model_name': self.model_name,

            'base_url': self.base_url

        }


The RemoteLLMProvider handles communication with external API services, managing authentication and request formatting. Its error handling is deliberately minimal: a 120-second timeout and a call to raise_for_status(), which turns HTTP error responses into exceptions. By implementing the same LLMInterface, we ensure that the rest of our system remains agnostic to whether it is using a local or remote model.
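Because generate() raises on any HTTP error or timeout, transient failures such as rate limiting or a brief outage surface directly in the caller. A retry wrapper with exponential backoff is one common way to absorb them. The sketch below is an illustration under assumed defaults (three attempts, delays doubling from one second) and is not part of the provider above. The name call_with_retries is hypothetical.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    operation: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run operation, retrying with exponential backoff on the given exceptions."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Delay doubles each time: base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))

    raise AssertionError("unreachable")
```

With the provider above, a call could look like call_with_retries(lambda: provider.generate(messages, config), retry_on=(requests.exceptions.RequestException,)). Note that this would also retry permanent failures such as an invalid API key, so a production version would likely inspect the status code before retrying.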



STEP TWO: IMPLEMENTING WEB SEARCH CAPABILITIES


With our LLM foundation established, we now turn our attention to web search functionality. The trend discovery agent must be able to formulate effective search queries, retrieve relevant content from the internet, and extract meaningful information from web pages.

We begin by creating a search interface that abstracts different search providers. This allows us to support multiple search engines or services while maintaining a consistent interface.


from abc import ABC, abstractmethod

from dataclasses import dataclass

from typing import List, Optional

from datetime import datetime



@dataclass

class SearchResult:

    """Represents a single search result."""

    title: str

    url: str

    snippet: str

    published_date: Optional[datetime] = None

    source: Optional[str] = None



class SearchInterface(ABC):

    """Abstract base class for search providers."""

    

    @abstractmethod

    def search(self, query: str, num_results: int = 10, time_filter: Optional[str] = None) -> List[SearchResult]:

        """

        Execute a search query and return results.

        

        Args:

            query: Search query string

            num_results: Maximum number of results to return

            time_filter: Optional time filter ('d' for day, 'w' for week, 'm' for month, 'y' for year)

            

        Returns:

            List of search results

        """

        pass


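Now we implement a concrete search provider on top of the duckduckgo_search package, an unofficial Python client that queries DuckDuckGo without an API key. That makes it convenient for prototyping our trend discovery agent. Unauthenticated access can be rate limited, so the provider below catches errors and returns an empty list rather than aborting the run.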


from duckduckgo_search import DDGS

from typing import List, Optional

from datetime import datetime

import time



class DuckDuckGoSearchProvider(SearchInterface):

    """

    Search provider implementation using DuckDuckGo.

    Provides free, privacy-focused search without API keys.

    """

    

    def __init__(self):

        """Initialize the DuckDuckGo search provider."""

        self.ddgs = DDGS()

    

    def search(self, query: str, num_results: int = 10, time_filter: Optional[str] = None) -> List[SearchResult]:

        """

        Execute a search query using DuckDuckGo.

        

        Args:

            query: Search query string

            num_results: Maximum number of results to return

            time_filter: Optional time filter ('d' for day, 'w' for week, 'm' for month, 'y' for year)

            

        Returns:

            List of search results

        """

        try:

            # Execute search with optional time filter

            search_params = {'max_results': num_results}

            if time_filter:

                search_params['timelimit'] = time_filter

            

            results = list(self.ddgs.text(query, **search_params))

            

            # Convert to our SearchResult format

            search_results = []

            for result in results:

                search_result = SearchResult(

                    title=result.get('title', ''),

                    url=result.get('href', ''),

                    snippet=result.get('body', ''),

                    source=result.get('source', None)

                )

                search_results.append(search_result)

            

            return search_results

            

        except Exception as e:

            print(f"Search error: {str(e)}")

            return []


The DuckDuckGoSearchProvider wraps the duckduckgo_search client and converts its result dictionaries (title, href, body) into our standardized SearchResult format. Because the rest of the system depends only on SearchInterface, we can swap search providers without touching other components.
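When several related queries are run against the same provider, the same page tends to appear more than once. A small deduplication step on the standardized results avoids fetching it repeatedly. The sketch below is an optional addition, not part of the provider above: normalize_url and deduplicate are hypothetical helpers, and the normalization rules (ignore scheme, a leading "www.", the fragment, and a trailing slash) are assumptions that may need tuning.

```python
from dataclasses import dataclass
from typing import List, Optional
from urllib.parse import urlsplit


@dataclass
class SearchResult:
    """Stand-in with the same core fields as the SearchResult defined above."""
    title: str
    url: str
    snippet: str
    source: Optional[str] = None


def normalize_url(url: str) -> str:
    """Build a comparison key that ignores scheme, 'www.', fragment, and trailing slash."""
    parts = urlsplit(url.strip())
    host = parts.netloc.lower()
    if host.startswith("www."):
        host = host[4:]
    path = parts.path.rstrip("/")
    query = f"?{parts.query}" if parts.query else ""
    return f"{host}{path}{query}"


def deduplicate(results: List[SearchResult]) -> List[SearchResult]:
    """Keep the first result for each normalized URL, preserving order."""
    seen = set()
    unique = []
    for result in results:
        key = normalize_url(result.url)
        if not key or key in seen:
            continue  # skip empty URLs and repeats
        seen.add(key)
        unique.append(result)
    return unique
```

The query string is kept in the key on purpose, since on many sites it selects the actual page. The cost is that tracking parameters can make two links to the same article look distinct.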

To extract meaningful content from web pages, we need a robust web scraping component that can handle various page structures and extract the main textual content while filtering out navigation, advertisements, and other non-essential elements.


import requests


from bs4 import BeautifulSoup

from typing import Optional

import re



class WebContentExtractor:

    """

    Extracts main textual content from web pages.

    Filters out navigation, ads, and other non-essential elements.

    """

    

    def __init__(self, timeout: int = 10):

        """

        Initialize the web content extractor.

        

        Args:

            timeout: Request timeout in seconds

        """

        self.timeout = timeout

        self.headers = {

            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'

        }

    

    def extract_content(self, url: str) -> Optional[str]:

        """

        Extract main textual content from a web page.

        

        Args:

            url: URL of the web page to extract content from

            

        Returns:

            Extracted text content or None if extraction fails

        """

        try:

            # Fetch the web page

            response = requests.get(url, headers=self.headers, timeout=self.timeout)

            response.raise_for_status()

            

            # Parse HTML

            soup = BeautifulSoup(response.content, 'html.parser')

            

            # Remove scripts, styles, and page chrome (navigation, header, footer, sidebars)

            for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):

                element.decompose()

            

            # Extract text from main content areas

            main_content = soup.find('main') or soup.find('article') or soup.find('body')

            

            if not main_content:

                return None

            

            # Get text and clean it

            text = main_content.get_text(separator='\n', strip=True)

            

            # Remove excessive whitespace

            text = re.sub(r'\n\s*\n', '\n\n', text)

            text = re.sub(r' +', ' ', text)

            

            return text

            

        except Exception as e:

            print(f"Content extraction error for {url}: {str(e)}")

            return None

    

    def extract_summary(self, url: str, max_length: int = 1000) -> Optional[str]:

        """

        Extract a summary of the web page content.

        

        Args:

            url: URL of the web page

            max_length: Maximum length of the summary in characters

            

        Returns:

            Summarized content or None if extraction fails

        """

        content = self.extract_content(url)

        

        if not content:

            return None

        

        # Take the first max_length characters, breaking at sentence boundaries

        if len(content) <= max_length:

            return content

        

        truncated = content[:max_length]

        last_period = truncated.rfind('.')

        

        if last_period > max_length * 0.7:

            return truncated[:last_period + 1]

        else:

            return truncated + '...'


The WebContentExtractor class provides methods for retrieving and cleaning web page content. It removes non-essential elements like scripts, styles, and navigation components, focusing on the main textual content. The extract_summary method provides a convenient way to get a condensed version of the content, which is useful when we need to process multiple sources efficiently.
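To see how the truncation rule in extract_summary behaves, the sketch below isolates it as a pure function and runs it on two synthetic strings. The logic mirrors the method above. The function name truncate_at_sentence and the sample inputs are made up for the illustration.

```python
def truncate_at_sentence(text: str, max_length: int = 1000) -> str:
    """Same rule as extract_summary, applied to text that is already extracted."""
    if len(text) <= max_length:
        return text

    truncated = text[:max_length]
    last_period = truncated.rfind('.')

    # Cut at the last period only if it lies in the final 30% of the window
    if last_period > max_length * 0.7:
        return truncated[:last_period + 1]
    return truncated + '...'


if __name__ == "__main__":
    # The period sits at index 80 of a 100-character window, so the text is cut there
    late_period = "A" * 80 + ". " + "B" * 50
    print(len(truncate_at_sentence(late_period, max_length=100)))

    # The only period is at index 5, too early to use, so an ellipsis is appended
    early_period = "Short. " + "x" * 200
    print(truncate_at_sentence(early_period, max_length=100)[-5:])
```

Two properties are worth knowing. The ellipsis branch returns up to max_length + 3 characters, and any period counts as a boundary, including those in decimals or abbreviations.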



STEP THREE: BUILDING THE TREND ANALYSIS ENGINE


With our LLM and search capabilities in place, we now construct the core trend analysis engine. This component orchestrates the entire trend discovery process, from query formulation through result synthesis.

The trend analysis engine must perform several sophisticated tasks. First, it generates effective search queries based on the user's topic area. Second, it retrieves and processes relevant web content. Third, it analyzes the collected information to identify patterns and emerging trends. Fourth, it classifies trends according to established frameworks. Finally, it synthesizes findings into a comprehensive report.
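Two of these steps ask the model for a plain list, one item per line, and models do not always follow that format exactly: they may add numbering, bullets, or quotation marks. The engine below splits the response on newlines as is. The sketch here shows one more tolerant alternative. It is an illustration, not the article's parser, and parse_list_response is a hypothetical name.

```python
import re
from typing import List, Optional

# Leading list markers such as "1.", "2)", "-", "*", or a bullet character,
# followed by whitespace
_LIST_MARKER = re.compile(r"^\s*(?:\d+[.)]|[-*\u2022])\s+")


def parse_list_response(response: str, limit: Optional[int] = None) -> List[str]:
    """Split an LLM response into clean, unique, non-empty items."""
    items: List[str] = []
    for line in response.splitlines():
        cleaned = _LIST_MARKER.sub("", line).strip().strip('"').strip()
        if cleaned and cleaned not in items:
            items.append(cleaned)
    return items[:limit] if limit is not None else items


if __name__ == "__main__":
    reply = (
        "1. solid-state battery breakthroughs\n"
        "\n"
        '2) "EV charging market forecast"\n'
        "- battery recycling research\n"
        "* battery recycling research\n"
    )
    print(parse_list_response(reply))
```

The marker pattern requires whitespace after the number, so items such as "3D printing trends" or "2.5D packaging" are left intact. Introductory lines like "Here are the queries:" would still be kept and need separate handling.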


import time

from typing import List, Dict, Any, Optional

from dataclasses import dataclass

from enum import Enum



class TrendCategory(Enum):

    """Classification categories for identified trends."""

    FAD = "fad"

    MICRO_TREND = "micro_trend"

    TREND = "trend"

    MACRO_TREND = "macro_trend"

    MEGA_TREND = "mega_trend"



@dataclass

class TrendAnalysis:

    """Represents a complete trend analysis."""

    trend_name: str

    category: TrendCategory

    summary: str

    technology_impact: str

    science_impact: str

    product_impact: str

    key_indicators: List[str]

    time_horizon: str

    confidence_level: float

    sources: List[SearchResult]

    recommended_urls: List[str]



class TrendAnalysisEngine:

    """

    Core engine for discovering and analyzing trends.

    Orchestrates search, content extraction, and LLM-based analysis.

    """

    

    def __init__(self, llm: LLMInterface, search_provider: SearchInterface, content_extractor: WebContentExtractor):

        """

        Initialize the trend analysis engine.

        

        Args:

            llm: Language model interface for analysis

            search_provider: Search interface for finding relevant content

            content_extractor: Web content extraction utility

        """

        self.llm = llm

        self.search_provider = search_provider

        self.content_extractor = content_extractor

        

        # System prompt for trend analysis

        self.system_prompt = """You are an expert trend researcher and analyst with deep knowledge of trend research methodologies. Your task is to analyze information about emerging patterns in various domains and classify them according to established trend research frameworks.


When analyzing trends, consider the following classification criteria:


A FAD is a short-lived phenomenon, typically lasting less than a year, with limited impact beyond a specific niche or community. Fads generate temporary excitement but lack the substance for long-term adoption.


A MICRO TREND affects a specific subculture or niche market, lasting one to three years. These trends have limited geographic or demographic reach but can be significant within their specific context.


A TREND represents a significant pattern of change lasting three to ten years, affecting entire industries or substantial market segments. Trends reshape business practices, consumer behavior, or technological approaches within specific domains.


A MACRO TREND spans ten to twenty years and affects multiple industries or sectors simultaneously. These trends represent fundamental shifts in how people work, live, or interact with technology.


A MEGA TREND encompasses twenty years or more and represents transformational changes that reshape society, economy, and technology on a global scale. Mega trends affect virtually all aspects of human activity.


Your analysis should be evidence-based, drawing on concrete indicators such as investment patterns, adoption rates, research activity, media coverage, and expert commentary. Always distinguish between hype and substance, and provide balanced assessments of both opportunities and challenges."""

    

    def analyze_topic(self, topic_area: str, num_trends: int = 5) -> List[TrendAnalysis]:

        """

        Analyze a topic area and identify emerging trends.

        

        Args:

            topic_area: Domain, discipline, market, or technical subject to analyze

            num_trends: Number of trends to identify and analyze

            

        Returns:

            List of trend analyses

        """

        print(f"Analyzing trends in: {topic_area}")

        

        # Step 1: Generate search queries

        search_queries = self._generate_search_queries(topic_area)

        print(f"Generated {len(search_queries)} search queries")

        

        # Step 2: Execute searches and collect results

        all_results = []

        for query in search_queries:

            results = self.search_provider.search(query, num_results=10, time_filter='m')

            all_results.extend(results)

            time.sleep(1)  # Rate limiting

        

        print(f"Collected {len(all_results)} search results")

        

        # Step 3: Extract content from top results

        content_samples = self._extract_content_samples(all_results, max_samples=20)

        print(f"Extracted content from {len(content_samples)} sources")

        

        # Step 4: Identify potential trends

        potential_trends = self._identify_trends(topic_area, content_samples)

        print(f"Identified {len(potential_trends)} potential trends")

        

        # Step 5: Analyze each trend in detail

        trend_analyses = []

        for trend_name in potential_trends[:num_trends]:

            analysis = self._analyze_single_trend(topic_area, trend_name, all_results)

            if analysis:

                trend_analyses.append(analysis)

        

        return trend_analyses

    

    def _generate_search_queries(self, topic_area: str) -> List[str]:

        """

        Generate effective search queries for the topic area.

        

        Args:

            topic_area: Topic to generate queries for

            

        Returns:

            List of search query strings

        """

        messages = [

            Message(role='system', content=self.system_prompt),

            Message(role='user', content=f"""Generate 5 effective search queries to discover emerging trends in {topic_area}. 


The queries should target:

1. Recent developments and innovations

2. Industry reports and forecasts

3. Research publications and breakthroughs

4. Market analysis and adoption patterns

5. Expert commentary and thought leadership


Return only the search queries, one per line, without numbering or additional explanation.""")

        ]

        

        config = GenerationConfig(temperature=0.7, max_tokens=500)

        response = self.llm.generate(messages, config)

        

        # Parse queries from response

        queries = [q.strip() for q in response.strip().split('\n') if q.strip()]

        return queries

    

    def _extract_content_samples(self, results: List[SearchResult], max_samples: int = 20) -> List[Dict[str, str]]:

        """

        Extract content from search results.

        

        Args:

            results: List of search results

            max_samples: Maximum number of content samples to extract

            

        Returns:

            List of dictionaries containing URL and extracted content

        """

        content_samples = []

        

        for result in results[:max_samples]:

            content = self.content_extractor.extract_summary(result.url, max_length=2000)

            if content:

                content_samples.append({

                    'url': result.url,

                    'title': result.title,

                    'content': content

                })

        

        return content_samples

    

    def _identify_trends(self, topic_area: str, content_samples: List[Dict[str, str]]) -> List[str]:

        """

        Identify potential trends from content samples.

        

        Args:

            topic_area: Topic area being analyzed

            content_samples: Extracted content from web sources

            

        Returns:

            List of trend names

        """

        # Compile content summaries

        content_summary = "\n\n".join([

            f"Source: {sample['title']}\n{sample['content'][:500]}"

            for sample in content_samples[:10]

        ])

        

        messages = [

            Message(role='system', content=self.system_prompt),

            Message(role='user', content=f"""Based on the following content about {topic_area}, identify 5-7 distinct emerging trends or patterns.


Content samples:

{content_summary}


List the trend names only, one per line. Each trend name should be concise (2-5 words) and descriptive.""")

        ]

        

        config = GenerationConfig(temperature=0.7, max_tokens=500)

        response = self.llm.generate(messages, config)

        

        # Parse trend names

        trends = [t.strip() for t in response.strip().split('\n') if t.strip()]

        return trends

    

    def _analyze_single_trend(self, topic_area: str, trend_name: str, all_results: List[SearchResult]) -> Optional[TrendAnalysis]:

        """

        Perform detailed analysis of a single trend.

        

        Args:

            topic_area: Topic area being analyzed

            trend_name: Name of the trend to analyze

            all_results: All search results for reference

            

        Returns:

            TrendAnalysis object or None if analysis fails

        """

        # Find relevant sources for this specific trend

        relevant_sources = self._find_relevant_sources(trend_name, all_results)

        

        # Extract detailed content

        detailed_content = []

        for source in relevant_sources[:5]:

            content = self.content_extractor.extract_summary(source.url, max_length=1500)

            if content:

                detailed_content.append({

                    'url': source.url,

                    'title': source.title,

                    'content': content

                })

        

        if not detailed_content:

            return None

        

        # Compile context for analysis

        context = "\n\n".join([

            f"Source: {item['title']}\nURL: {item['url']}\n{item['content']}"

            for item in detailed_content

        ])

        

        # Request comprehensive analysis

        messages = [

            Message(role='system', content=self.system_prompt),

            Message(role='user', content=f"""Analyze the trend "{trend_name}" in the context of {topic_area}.


Based on the following sources, provide a comprehensive analysis:


{context}


Your analysis must include:


1. TREND CLASSIFICATION: Classify this as a fad, micro trend, trend, macro trend, or mega trend based on the criteria provided in your instructions.


2. SUMMARY: A concise 2-3 sentence summary of what this trend represents.


3. TECHNOLOGY IMPACT: How this trend affects or will affect technology development, including specific technologies, platforms, or approaches.


4. SCIENCE IMPACT: How this trend influences scientific research, methodologies, or understanding in relevant fields.


5. PRODUCT IMPACT: How this trend affects or will affect products, services, and market offerings.


6. KEY INDICATORS: List 3-5 specific, observable indicators that demonstrate this is a genuine trend rather than speculation.


7. TIME HORIZON: Estimated timeframe for significant impact (e.g., "1-2 years", "5-10 years").


8. CONFIDENCE LEVEL: Your confidence in this analysis on a scale of 0.0 to 1.0, with justification.


Format your response as follows:

CLASSIFICATION: [category]

SUMMARY: [summary text]

TECHNOLOGY_IMPACT: [impact description]

SCIENCE_IMPACT: [impact description]

PRODUCT_IMPACT: [impact description]

KEY_INDICATORS: [indicator 1] | [indicator 2] | [indicator 3]

TIME_HORIZON: [timeframe]

CONFIDENCE: [0.0-1.0]""")

        ]

        

        config = GenerationConfig(temperature=0.3, max_tokens=2000)

        response = self.llm.generate(messages, config)

        

        # Parse the structured response

        analysis_dict = self._parse_analysis_response(response)

        

        if not analysis_dict:

            return None

        

        # Select recommended URLs

        recommended_urls = [item['url'] for item in detailed_content[:3]]

        

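        # Parse the confidence defensively: the prompt asks for a justification,

        # so the model may append text after the number or omit the field

        try:

            confidence = float(analysis_dict.get('CONFIDENCE', '0.5').split()[0].rstrip('.,;'))

        except (ValueError, IndexError):

            confidence = 0.5

        confidence = max(0.0, min(1.0, confidence))



        # Split indicators on the '|' delimiter and drop empty entries

        raw_indicators = analysis_dict.get('KEY_INDICATORS', '').split('|')

        key_indicators = [item.strip() for item in raw_indicators if item.strip()]



        # Create TrendAnalysis object

        return TrendAnalysis(

            trend_name=trend_name,

            category=self._parse_category(analysis_dict.get('CLASSIFICATION', 'trend')),

            summary=analysis_dict.get('SUMMARY', ''),

            technology_impact=analysis_dict.get('TECHNOLOGY_IMPACT', ''),

            science_impact=analysis_dict.get('SCIENCE_IMPACT', ''),

            product_impact=analysis_dict.get('PRODUCT_IMPACT', ''),

            key_indicators=key_indicators,

            time_horizon=analysis_dict.get('TIME_HORIZON', ''),

            confidence_level=confidence,

            sources=relevant_sources[:5],

            recommended_urls=recommended_urls

        )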

    

    def _find_relevant_sources(self, trend_name: str, all_results: List[SearchResult]) -> List[SearchResult]:

        """

        Find search results most relevant to a specific trend.

        

        Args:

            trend_name: Name of the trend

            all_results: All available search results

            

        Returns:

            Filtered and sorted list of relevant results

        """

        # Simple relevance scoring based on keyword matching

        scored_results = []

        trend_keywords = set(trend_name.lower().split())

        

        for result in all_results:

            text = f"{result.title} {result.snippet}".lower()

            score = sum(1 for keyword in trend_keywords if keyword in text)

            if score > 0:

                scored_results.append((score, result))

        

        # Sort by relevance score

        scored_results.sort(reverse=True, key=lambda x: x[0])

        

        return [result for score, result in scored_results]

    

    def _parse_analysis_response(self, response: str) -> Dict[str, str]:

        """

        Parse structured analysis response from LLM.

        

        Args:

            response: LLM response text

            

        Returns:

            Dictionary of parsed fields

        """

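        # Only the labels requested in the prompt may start a new field

        expected_fields = {

            'CLASSIFICATION', 'SUMMARY', 'TECHNOLOGY_IMPACT', 'SCIENCE_IMPACT',

            'PRODUCT_IMPACT', 'KEY_INDICATORS', 'TIME_HORIZON', 'CONFIDENCE'

        }

        result = {}

        current_field = None

        current_value = []



        for line in response.split('\n'):

            line = line.strip()

            if not line:

                continue



            # Normalize a possible label: drop list numbering and Markdown

            # emphasis, then unify case and separators

            label, sep, rest = line.partition(':')

            field_name = label.strip(' *#0123456789.)').upper().replace(' ', '_')



            if sep and field_name in expected_fields:

                # Save previous field if exists

                if current_field:

                    result[current_field] = ' '.join(current_value).strip()



                # Start new field

                current_field = field_name

                current_value = [rest.strip(' *')]

            elif current_field:

                # Continuation line; it may itself contain a colon (a URL, "Note: ...")

                current_value.append(line)



        # Save last field

        if current_field:

            result[current_field] = ' '.join(current_value).strip()



        return result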

    

    def _parse_category(self, category_str: str) -> TrendCategory:

        """

        Parse trend category from string.

        

        Args:

            category_str: Category string from analysis

            

        Returns:

            TrendCategory enum value

        """

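        # Normalize away case and separators so "Mega Trend", "mega-trend",

        # and "megatrend" all compare equal

        normalized = category_str.lower().replace(' ', '').replace('-', '').replace('_', '')



        # Test longer names first: "trend" is a substring of every "*_trend"

        # value, so checking it early would shadow macro and mega trends

        for category in sorted(TrendCategory, key=lambda c: len(c.value), reverse=True):

            if category.value.replace('_', '') in normalized:

                return category



        return TrendCategory.TREND  # Default fallback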


The TrendAnalysisEngine is the core of the system. It orchestrates the entire trend discovery workflow, from generating search queries to producing the final trend analyses, and splits that workflow into four phases with one responsibility each.

The query generation phase leverages the LLM to create targeted search queries that explore different facets of the topic area. Rather than using generic searches, the system generates queries designed to uncover recent developments, industry reports, research publications, market analyses, and expert commentary.

The content extraction phase retrieves pages from the web and reduces each one to a short summary suitable for analysis. This step matters because raw web pages carry noise, such as navigation menus, advertisements, and boilerplate text, that consumes prompt space and can mislead the model.

The trend identification phase analyzes the collected content to identify distinct patterns and emerging phenomena. The LLM examines the information holistically, looking for recurring themes, novel developments, and significant shifts in the domain.
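
The model's answer in this phase is free text, and models often add numbering or bullets even when asked for names only. As a minimal sketch (the helper name clean_trend_names and its max_trends parameter are illustrative assumptions, not part of the engine above), the returned lines could be normalized like this before the detailed analysis begins:

```python
import re
from typing import List


def clean_trend_names(response: str, max_trends: int = 5) -> List[str]:
    """Normalize a newline-separated list of trend names returned by an LLM.

    Hypothetical helper: strips list markers, drops duplicates, caps the count.
    """
    names: List[str] = []
    seen = set()
    for line in response.splitlines():
        # Remove a leading bullet ("-", "*", "•") or numbering ("1.", "2)")
        name = re.sub(r'^\s*(?:[-*•]+|\d+[.)])\s+', '', line).strip()
        # Remove surrounding quotes or Markdown emphasis
        name = name.strip('"\'*_ ')
        key = name.lower()
        if not name or key in seen:
            continue
        seen.add(key)
        names.append(name)
    return names[:max_trends]


sample = """1. Agentic Coding Assistants
2) Small Language Models
- On-Device Inference
* small language models
3D Printed Electronics"""
cleaned = clean_trend_names(sample)
print(cleaned)
```

The pattern requires a period or parenthesis after a leading number, so a name such as "3D Printed Electronics" keeps its first character, and the case-insensitive duplicate is dropped.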

Finally, the detailed analysis phase performs deep dives into each identified trend, classifying it according to trend research frameworks, assessing its impact across multiple dimensions, and providing evidence-based justifications for the classification.
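
Source selection for the detailed analysis relies on keyword overlap between the trend name and each result's title and snippet. Plain substring checks can over-match (the keyword "ai" also occurs inside "maintain"), so one possible refinement compares whole words only. The sketch below assumes results are simple (title, snippet) pairs rather than the SearchResult objects used above; the function names and the stop-word list are illustrative.

```python
import re
from typing import List, Set, Tuple

# Illustrative stop words; a real list would be longer
STOP_WORDS = {"the", "a", "an", "of", "and", "in", "for", "to"}


def tokenize(text: str) -> Set[str]:
    """Lowercase the text and split it into alphanumeric word tokens."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def rank_by_keyword_overlap(trend_name: str,
                            results: List[Tuple[str, str]]) -> List[Tuple[int, str]]:
    """Return (score, title) pairs sorted by whole-word overlap, best first."""
    keywords = tokenize(trend_name) - STOP_WORDS
    scored = []
    for title, snippet in results:
        words = tokenize(f"{title} {snippet}")
        score = len(keywords & words)
        if score > 0:
            scored.append((score, title))
    # The sort is stable, so ties keep their original search order
    return sorted(scored, key=lambda pair: pair[0], reverse=True)


results = [
    ("How to maintain a legacy codebase", "Tips for long-lived software"),
    ("AI agents in software testing", "Agents generate and run tests"),
    ("The rise of AI coding agents", "AI pair programmers and agents"),
]
ranking = rank_by_keyword_overlap("AI Agents", results)
print(ranking)
```

Here the first result is excluded even though "maintain" contains the letters "ai", while the other two match both keywords and keep their search order.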



STEP FOUR: CREATING THE USER INTERFACE AND ORCHESTRATION LAYER


With our core components in place, we need to create a user-facing interface that makes the trend discovery agent accessible and easy to use. This layer handles user input, manages the analysis workflow, and presents results in a clear, actionable format.


from typing import List, Optional

import json



class TrendDiscoveryAgent:

    """

    Main interface for the trend discovery system.

    Orchestrates all components and provides user-facing functionality.

    """

    

    def __init__(self, llm: LLMInterface, search_provider: SearchInterface):

        """

        Initialize the trend discovery agent.

        

        Args:

            llm: Language model interface

            search_provider: Search provider interface

        """

        self.llm = llm

        self.search_provider = search_provider

        self.content_extractor = WebContentExtractor()

        self.analysis_engine = TrendAnalysisEngine(llm, search_provider, self.content_extractor)

    

    def discover_trends(self, topic_area: str, num_trends: int = 5) -> str:

        """

        Discover and analyze trends in a given topic area.

        

        Args:

            topic_area: Domain, discipline, market, or technical subject

            num_trends: Number of trends to identify and analyze

            

        Returns:

            Formatted report of trend analyses

        """

        print(f"\n{'='*80}")

        print(f"TREND DISCOVERY AGENT")

        print(f"Topic Area: {topic_area}")

        print(f"{'='*80}\n")

        

        # Display device information

        device_info = self.llm.get_device_info()

        print(f"Using {device_info['device_type'].upper()} acceleration")

        if 'gpu_name' in device_info:

            print(f"GPU: {device_info['gpu_name']}")

        print()

        

        # Execute trend analysis

        trend_analyses = self.analysis_engine.analyze_topic(topic_area, num_trends)

        

        # Format and return report

        report = self._format_report(topic_area, trend_analyses)

        return report

    

    def _format_report(self, topic_area: str, analyses: List[TrendAnalysis]) -> str:

        """

        Format trend analyses into a comprehensive report.

        

        Args:

            topic_area: Topic area analyzed

            analyses: List of trend analyses

            

        Returns:

            Formatted report string

        """

        report_lines = []

        

        report_lines.append(f"\n{'='*80}")

        report_lines.append(f"TREND ANALYSIS REPORT: {topic_area.upper()}")

        report_lines.append(f"{'='*80}\n")

        

        report_lines.append(f"Total Trends Identified: {len(analyses)}\n")

        

        # Summary table

        report_lines.append("TREND OVERVIEW")

        report_lines.append("-" * 80)

        for i, analysis in enumerate(analyses, 1):

            report_lines.append(f"{i}. {analysis.trend_name}")

            report_lines.append(f"   Category: {analysis.category.value.replace('_', ' ').title()}")

            report_lines.append(f"   Confidence: {analysis.confidence_level:.2f}")

            report_lines.append(f"   Time Horizon: {analysis.time_horizon}")

            report_lines.append("")

        

        # Detailed analyses

        for i, analysis in enumerate(analyses, 1):

            report_lines.append(f"\n{'='*80}")

            report_lines.append(f"TREND {i}: {analysis.trend_name.upper()}")

            report_lines.append(f"{'='*80}\n")

            

            report_lines.append(f"Classification: {analysis.category.value.replace('_', ' ').title()}")

            report_lines.append(f"Confidence Level: {analysis.confidence_level:.2f}")

            report_lines.append(f"Time Horizon: {analysis.time_horizon}\n")

            

            report_lines.append("SUMMARY")

            report_lines.append("-" * 80)

            report_lines.append(self._wrap_text(analysis.summary, 80))

            report_lines.append("")

            

            report_lines.append("TECHNOLOGY IMPACT")

            report_lines.append("-" * 80)

            report_lines.append(self._wrap_text(analysis.technology_impact, 80))

            report_lines.append("")

            

            report_lines.append("SCIENCE IMPACT")

            report_lines.append("-" * 80)

            report_lines.append(self._wrap_text(analysis.science_impact, 80))

            report_lines.append("")

            

            report_lines.append("PRODUCT IMPACT")

            report_lines.append("-" * 80)

            report_lines.append(self._wrap_text(analysis.product_impact, 80))

            report_lines.append("")

            

            report_lines.append("KEY INDICATORS")

            report_lines.append("-" * 80)

            for indicator in analysis.key_indicators:

                if indicator.strip():

                    report_lines.append(f"  - {indicator.strip()}")

            report_lines.append("")

            

            report_lines.append("RECOMMENDED READING")

            report_lines.append("-" * 80)

            for url in analysis.recommended_urls:

                report_lines.append(f"  {url}")

            report_lines.append("")

        

        return "\n".join(report_lines)

    

    def _wrap_text(self, text: str, width: int = 80) -> str:

        """

        Wrap text to specified width while preserving words.

        

        Args:

            text: Text to wrap

            width: Maximum line width

            

        Returns:

            Wrapped text

        """

        words = text.split()

        lines = []

        current_line = []

        current_length = 0

        

        for word in words:

            if current_length + len(word) + 1 <= width:

                current_line.append(word)

                current_length += len(word) + 1

            else:

                if current_line:

                    lines.append(' '.join(current_line))

                current_line = [word]

                current_length = len(word)

        

        if current_line:

            lines.append(' '.join(current_line))

        

        return '\n'.join(lines)

    

    def save_report(self, report: str, filename: str):

        """

        Save trend report to a file.

        

        Args:

            report: Report text to save

            filename: Output filename

        """

        with open(filename, 'w', encoding='utf-8') as f:

            f.write(report)

        print(f"\nReport saved to: {filename}")

    

    def export_json(self, analyses: List[TrendAnalysis], filename: str):

        """

        Export trend analyses to JSON format.

        

        Args:

            analyses: List of trend analyses

            filename: Output filename

        """

        data = {

            'trends': [

                {

                    'name': a.trend_name,

                    'category': a.category.value,

                    'summary': a.summary,

                    'technology_impact': a.technology_impact,

                    'science_impact': a.science_impact,

                    'product_impact': a.product_impact,

                    'key_indicators': a.key_indicators,

                    'time_horizon': a.time_horizon,

                    'confidence_level': a.confidence_level,

                    'recommended_urls': a.recommended_urls

                }

                for a in analyses

            ]

        }

        

        with open(filename, 'w', encoding='utf-8') as f:

            json.dump(data, f, indent=2)

        

        print(f"\nJSON export saved to: {filename}")


The TrendDiscoveryAgent class is the primary interface to the system. It wires together the content extractor and the analysis engine, so a single call to discover_trends runs the whole workflow. Note that discover_trends returns the formatted report as a string, whereas export_json expects the underlying list of TrendAnalysis objects; to export JSON, obtain that list from analysis_engine.analyze_topic directly.

The report opens with an overview that lists each trend's category, confidence, and time horizon, followed by a detailed section per trend covering the summary, the three impact assessments, the key indicators, and recommended reading. This layout supports both quick scanning and in-depth review.
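
Python's standard library already covers the word-wrapping step. As an alternative sketch to the hand-written _wrap_text helper (the function name wrap_section and the sample text are assumptions for illustration), textwrap.fill wraps on word boundaries and can also indent continuation lines, which is convenient for bulleted indicators:

```python
import textwrap


def wrap_section(text: str, width: int = 80, bullet: str = "") -> str:
    """Wrap text to `width` columns; continuation lines align under the bullet."""
    return textwrap.fill(
        " ".join(text.split()),          # collapse runs of whitespace first
        width=width,
        initial_indent=bullet,
        subsequent_indent=" " * len(bullet),
        break_long_words=False,          # keep URLs and long identifiers intact
        break_on_hyphens=False,
    )


summary = ("Edge inference moves model execution from data centers to phones "
           "and embedded boards, cutting latency and keeping data local.")
wrapped = wrap_section(summary, width=40)
bulleted = wrap_section(summary, width=40, bullet="  - ")
print(wrapped)
print(bulleted)
```

Because break_long_words is disabled, a single token longer than the width (a long URL, for example) is placed on its own line rather than split.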



STEP FIVE: ENHANCING TREND CLASSIFICATION WITH RESEARCH METHODOLOGIES


To ensure our trend classifications are rigorous and defensible, we need to incorporate established methodologies from trend research. This involves implementing scoring mechanisms that evaluate trends across multiple dimensions and apply objective criteria for classification.


from typing import Dict, List, Tuple

from dataclasses import dataclass



@dataclass

class TrendMetrics:

    """Quantitative metrics for trend evaluation."""

    adoption_velocity: float  # Rate of adoption (0.0 to 1.0)

    market_breadth: float  # Geographic and demographic reach (0.0 to 1.0)

    investment_level: float  # Financial investment and resources (0.0 to 1.0)

    innovation_intensity: float  # Degree of novelty and disruption (0.0 to 1.0)

    media_attention: float  # Level of media coverage and discussion (0.0 to 1.0)

    expert_consensus: float  # Agreement among domain experts (0.0 to 1.0)

    sustainability: float  # Long-term viability indicators (0.0 to 1.0)



class TrendClassifier:

    """

    Advanced trend classification using multi-dimensional analysis.

    Implements methodologies from academic trend research.

    """

    

    def __init__(self, llm: LLMInterface):

        """

        Initialize the trend classifier.

        

        Args:

            llm: Language model interface for metric extraction

        """

        self.llm = llm

    

    def extract_metrics(self, trend_name: str, content_samples: List[Dict[str, str]]) -> TrendMetrics:

        """

        Extract quantitative metrics from content about a trend.

        

        Args:

            trend_name: Name of the trend

            content_samples: Content samples discussing the trend

            

        Returns:

            TrendMetrics object with scored dimensions

        """

        # Compile content for analysis

        context = "\n\n".join([

            f"{sample['title']}\n{sample['content'][:800]}"

            for sample in content_samples[:5]

        ])

        

        prompt = f"""Analyze the following content about the trend "{trend_name}" and score it across seven dimensions on a scale from 0.0 to 1.0.


Content:

{context}


Provide scores for each dimension based on evidence in the content:


1. ADOPTION_VELOCITY: How quickly is this trend being adopted? (0.0 = very slow/stagnant, 1.0 = explosive growth)


2. MARKET_BREADTH: How broad is the geographic and demographic reach? (0.0 = very narrow niche, 1.0 = global and cross-demographic)


3. INVESTMENT_LEVEL: What is the level of financial investment and resource allocation? (0.0 = minimal investment, 1.0 = massive investment from multiple sources)


4. INNOVATION_INTENSITY: How novel and disruptive is this trend? (0.0 = incremental improvement, 1.0 = paradigm-shifting innovation)


5. MEDIA_ATTENTION: What is the level of media coverage and public discussion? (0.0 = minimal coverage, 1.0 = extensive mainstream coverage)


6. EXPERT_CONSENSUS: What is the level of agreement among domain experts? (0.0 = highly controversial/disputed, 1.0 = strong expert consensus)


7. SUSTAINABILITY: What are the indicators of long-term viability? (0.0 = likely to fade quickly, 1.0 = strong fundamentals for longevity)


Format your response as:

ADOPTION_VELOCITY: [score]

MARKET_BREADTH: [score]

INVESTMENT_LEVEL: [score]

INNOVATION_INTENSITY: [score]

MEDIA_ATTENTION: [score]

EXPERT_CONSENSUS: [score]

SUSTAINABILITY: [score]"""

        

        messages = [

            Message(role='user', content=prompt)

        ]

        

        config = GenerationConfig(temperature=0.2, max_tokens=500)

        response = self.llm.generate(messages, config)

        

        # Parse scores

        scores = self._parse_metric_scores(response)

        

        return TrendMetrics(

            adoption_velocity=scores.get('ADOPTION_VELOCITY', 0.5),

            market_breadth=scores.get('MARKET_BREADTH', 0.5),

            investment_level=scores.get('INVESTMENT_LEVEL', 0.5),

            innovation_intensity=scores.get('INNOVATION_INTENSITY', 0.5),

            media_attention=scores.get('MEDIA_ATTENTION', 0.5),

            expert_consensus=scores.get('EXPERT_CONSENSUS', 0.5),

            sustainability=scores.get('SUSTAINABILITY', 0.5)

        )

    

    def _parse_metric_scores(self, response: str) -> Dict[str, float]:

        """

        Parse metric scores from LLM response.

        

        Args:

            response: LLM response text

            

        Returns:

            Dictionary mapping metric names to scores

        """

        scores = {}

        

        for line in response.split('\n'):

            line = line.strip()

            if ':' in line:

                parts = line.split(':', 1)

                metric_name = parts[0].strip().upper()

                try:

                    # Use only the first token so trailing commentary such as

                    # "0.8 (strong evidence)" still parses

                    score = float(parts[1].split()[0])

                except (ValueError, IndexError):

                    continue

                scores[metric_name] = max(0.0, min(1.0, score))

        

        return scores

    

    def classify_from_metrics(self, metrics: TrendMetrics) -> Tuple[TrendCategory, float]:

        """

        Classify a trend based on its metrics.

        

        Args:

            metrics: TrendMetrics object

            

        Returns:

            Tuple of (TrendCategory, confidence_score)

        """

        # Calculate composite scores for different aspects

        reach_score = (metrics.market_breadth + metrics.adoption_velocity) / 2

        impact_score = (metrics.innovation_intensity + metrics.investment_level) / 2

        longevity_score = (metrics.sustainability + metrics.expert_consensus) / 2

        

        # Overall trend strength

        overall_strength = (reach_score + impact_score + longevity_score) / 3

        

        # Classification logic based on research frameworks

        if longevity_score < 0.3 or metrics.sustainability < 0.25:

            category = TrendCategory.FAD

            confidence = 0.7 + (0.3 * (1.0 - longevity_score))

        

        elif reach_score < 0.4 and metrics.market_breadth < 0.35:

            category = TrendCategory.MICRO_TREND

            confidence = 0.6 + (0.3 * metrics.expert_consensus)

        

        elif overall_strength >= 0.75 and longevity_score >= 0.7 and reach_score >= 0.7:

            if metrics.market_breadth >= 0.8 and impact_score >= 0.75:

                category = TrendCategory.MEGA_TREND

                confidence = 0.65 + (0.35 * overall_strength)

            else:

                category = TrendCategory.MACRO_TREND

                confidence = 0.7 + (0.25 * overall_strength)

        

        elif overall_strength >= 0.5:

            category = TrendCategory.TREND

            confidence = 0.6 + (0.35 * overall_strength)

        

        else:

            category = TrendCategory.MICRO_TREND

            confidence = 0.55 + (0.3 * overall_strength)

        

        return category, confidence


The TrendClassifier adds a second, rule-based opinion to the LLM's free-form classification. Rather than relying solely on the LLM's judgment, it asks the model to score a trend on seven dimensions and then applies fixed threshold rules to those scores. Treat the thresholds in classify_from_metrics as starting points and tune them against trends whose category is already known. Note also that the rules use only six of the seven scores: media_attention is extracted but influences neither the category nor the confidence.

The seven dimensions capture different aspects of trend significance. Adoption velocity measures how quickly the trend is spreading. Market breadth assesses geographic and demographic reach. Investment level indicates the financial resources being allocated. Innovation intensity evaluates the degree of novelty and disruption. Media attention reflects public awareness and discussion. Expert consensus measures agreement among domain specialists. Sustainability assesses long-term viability indicators.

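Scoring trends across these dimensions and applying explicit rules makes each categorization reproducible and easy to audit: given the same scores, the rules always produce the same category. The scores themselves are still LLM estimates based on at most five content samples of 800 characters each, so they are best treated as evidence-informed judgments rather than measurements.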
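
To make the rules concrete, the following sketch re-implements the composite scores and thresholds from classify_from_metrics as a standalone function and walks three score sets through them. Categories are plain strings here instead of TrendCategory members, and the sample scores are invented for illustration.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass
class Metrics:
    adoption_velocity: float
    market_breadth: float
    investment_level: float
    innovation_intensity: float
    media_attention: float   # scored, but unused by the rules below
    expert_consensus: float
    sustainability: float


def classify(m: Metrics) -> Tuple[str, float]:
    """Mirror of the threshold rules shown above; returns (category, confidence)."""
    reach = (m.market_breadth + m.adoption_velocity) / 2
    impact = (m.innovation_intensity + m.investment_level) / 2
    longevity = (m.sustainability + m.expert_consensus) / 2
    overall = (reach + impact + longevity) / 3

    if longevity < 0.3 or m.sustainability < 0.25:
        return "fad", 0.7 + 0.3 * (1.0 - longevity)
    if reach < 0.4 and m.market_breadth < 0.35:
        return "micro_trend", 0.6 + 0.3 * m.expert_consensus
    if overall >= 0.75 and longevity >= 0.7 and reach >= 0.7:
        if m.market_breadth >= 0.8 and impact >= 0.75:
            return "mega_trend", 0.65 + 0.35 * overall
        return "macro_trend", 0.7 + 0.25 * overall
    if overall >= 0.5:
        return "trend", 0.6 + 0.35 * overall
    return "micro_trend", 0.55 + 0.3 * overall


# Broad, well-funded, durable: every composite score clears the top thresholds
broad = Metrics(0.8, 0.9, 0.8, 0.8, 0.7, 0.8, 0.8)
# Fast-spreading but with weak sustainability: caught by the fad rule
hype = Metrics(0.9, 0.6, 0.5, 0.6, 0.9, 0.4, 0.2)
# All scores at the 0.5 fallback value
defaults = Metrics(0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5)

for label, metrics in [("broad", broad), ("hype", hype), ("defaults", defaults)]:
    category, confidence = classify(metrics)
    print(f"{label}: {category} (confidence {confidence:.3f})")
```

The last case is worth noticing. The value 0.5 is also the default that extract_metrics substitutes when a score cannot be parsed, so a response that fails to parse entirely is classified as a trend with a confidence of 0.6 + 0.35 × 0.5 = 0.775. Callers may want to detect missing scores and lower the confidence in that situation.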



STEP SIX: IMPLEMENTING CACHING AND OPTIMIZATION


To improve performance and reduce redundant computations, we implement caching mechanisms that store intermediate results and enable efficient reuse of previously analyzed content.


import hashlib

import pickle

import os

from pathlib import Path

from typing import Optional, Any

from datetime import datetime, timedelta



class CacheManager:

    """

    Manages caching of search results, extracted content, and analyses.

    Improves performance by avoiding redundant operations.

    """

    

    def __init__(self, cache_dir: str = ".trend_cache", ttl_hours: int = 24):

        """

        Initialize the cache manager.

        

        Args:

            cache_dir: Directory for cache storage

            ttl_hours: Time-to-live for cached items in hours

        """

        self.cache_dir = Path(cache_dir)

        self.cache_dir.mkdir(parents=True, exist_ok=True)

        self.ttl = timedelta(hours=ttl_hours)

    

    def _get_cache_key(self, key_data: str) -> str:

        """

        Generate a cache key from input data.

        

        Args:

            key_data: Data to generate key from

            

        Returns:

            Cache key string

        """

        return hashlib.md5(key_data.encode()).hexdigest()

    

    def get(self, key: str) -> Optional[Any]:

        """

        Retrieve an item from cache if it exists and is not expired.

        

        Args:

            key: Cache key

            

        Returns:

            Cached item or None if not found or expired

        """

        cache_key = self._get_cache_key(key)

        cache_file = self.cache_dir / f"{cache_key}.pkl"

        

        if not cache_file.exists():

            return None

        

        # Check if cache has expired

        file_time = datetime.fromtimestamp(cache_file.stat().st_mtime)

        if datetime.now() - file_time > self.ttl:

            cache_file.unlink()

            return None

        

        # Load cached data

        try:

            with open(cache_file, 'rb') as f:

                return pickle.load(f)

        except Exception as e:

            print(f"Cache read error: {e}")

            return None

    

    def set(self, key: str, value: Any):

        """

        Store an item in cache.

        

        Args:

            key: Cache key

            value: Value to cache

        """

        cache_key = self._get_cache_key(key)

        cache_file = self.cache_dir / f"{cache_key}.pkl"

        

        try:

            with open(cache_file, 'wb') as f:

                pickle.dump(value, f)

        except Exception as e:

            print(f"Cache write error: {e}")

    

    def clear(self):

        """Clear all cached items."""

        for cache_file in self.cache_dir.glob("*.pkl"):

            cache_file.unlink()


The CacheManager is a small file-based cache for search results, extracted content, and intermediate analyses. Each entry is pickled to a file named after the MD5 hash of its key, and an entry older than the time-to-live (24 hours by default) is deleted on the next read. Repeated analyses of the same topic within that window can then reuse stored results instead of repeating network fetches and LLM calls. Because entries are stored with pickle, keep the cache directory private: loading a pickle file written by an untrusted party can execute arbitrary code.
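
A typical way to use such a cache is the get-or-compute pattern: look the key up, and only on a miss run the expensive operation and store its result. The sketch below is self-contained and therefore uses a small JSON-based stand-in (JsonFileCache, a hypothetical class with the same get/set shape as CacheManager) and a fake search function; the "search:<query>" key format is likewise an assumption.

```python
import hashlib
import json
import tempfile
import time
from pathlib import Path
from typing import Any, Callable


class JsonFileCache:
    """Hypothetical stand-in exposing the same get/set shape as CacheManager."""

    def __init__(self, cache_dir: str, ttl_seconds: float = 24 * 3600):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.ttl_seconds = ttl_seconds

    def _path(self, key: str) -> Path:
        digest = hashlib.sha256(key.encode("utf-8")).hexdigest()
        return self.cache_dir / f"{digest}.json"

    def get(self, key: str) -> Any:
        path = self._path(key)
        if not path.exists():
            return None
        # Expire entries whose file is older than the time-to-live
        if time.time() - path.stat().st_mtime > self.ttl_seconds:
            path.unlink()
            return None
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def set(self, key: str, value: Any) -> None:
        with open(self._path(key), "w", encoding="utf-8") as f:
            json.dump(value, f)


def get_or_compute(cache: Any, key: str, compute: Callable[[], Any]) -> Any:
    """Return the cached value for key, computing and storing it on a miss.

    A cached value of None is indistinguishable from a miss and is recomputed.
    """
    cached = cache.get(key)
    if cached is not None:
        return cached
    value = compute()
    cache.set(key, value)
    return value


calls = []


def fake_search() -> list:
    """Stands in for an expensive web search; records each invocation."""
    calls.append(1)
    return [{"title": "Example result", "url": "https://example.com"}]


with tempfile.TemporaryDirectory() as tmp:
    cache = JsonFileCache(tmp)
    first = get_or_compute(cache, "search:solid-state batteries", fake_search)
    second = get_or_compute(cache, "search:solid-state batteries", fake_search)

print(f"search executed {len(calls)} time(s)")
```

The helper only needs get and set, so it should work with the CacheManager above as well, with the lambda or function argument wrapping whichever search or extraction call is being cached. JSON is used in the stand-in because it avoids the code-execution risk of pickle, at the cost of supporting only JSON-serializable values.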



PRODUCTION-READY COMPLETE IMPLEMENTATION


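The following complete implementation integrates all components into a single script intended for production use. Before running it, install the third-party packages imported at the top (torch, transformers, requests, bs4, and duckduckgo_search); the local provider also downloads the selected model's weights the first time that model is loaded.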



#!/usr/bin/env python3

"""

Trend Discovery Agent - Production Implementation


A comprehensive LLM-powered system for discovering and analyzing emerging trends

in any topic area. Supports both local and remote LLMs with GPU acceleration.


Usage:

    python trend_agent.py --topic "Artificial Intelligence" --num-trends 5

"""


import argparse

import hashlib

import os

import pickle

import re

import sys

import time

from abc import ABC, abstractmethod

from dataclasses import dataclass

from datetime import datetime, timedelta

from enum import Enum

from pathlib import Path

from typing import List, Dict, Optional, Any, Tuple


# Third-party imports

import requests

import torch

from bs4 import BeautifulSoup

from duckduckgo_search import DDGS

from transformers import AutoModelForCausalLM, AutoTokenizer



# ============================================================================

# DATA STRUCTURES

# ============================================================================


@dataclass

class GenerationConfig:

    """Configuration parameters for text generation."""

    temperature: float = 0.7

    max_tokens: int = 2048

    top_p: float = 0.9

    frequency_penalty: float = 0.0

    presence_penalty: float = 0.0

    stop_sequences: Optional[List[str]] = None



@dataclass

class Message:

    """Represents a single message in a conversation."""

    role: str

    content: str



@dataclass

class SearchResult:

    """Represents a single search result."""

    title: str

    url: str

    snippet: str

    published_date: Optional[datetime] = None

    source: Optional[str] = None



class TrendCategory(Enum):

    """Classification categories for identified trends."""

    FAD = "fad"

    MICRO_TREND = "micro_trend"

    TREND = "trend"

    MACRO_TREND = "macro_trend"

    MEGA_TREND = "mega_trend"



@dataclass

class TrendMetrics:

    """Quantitative metrics for trend evaluation."""

    adoption_velocity: float

    market_breadth: float

    investment_level: float

    innovation_intensity: float

    media_attention: float

    expert_consensus: float

    sustainability: float



@dataclass

class TrendAnalysis:

    """Represents a complete trend analysis."""

    trend_name: str

    category: TrendCategory

    summary: str

    technology_impact: str

    science_impact: str

    product_impact: str

    key_indicators: List[str]

    time_horizon: str

    confidence_level: float

    sources: List[SearchResult]

    recommended_urls: List[str]

    metrics: Optional[TrendMetrics] = None



# ============================================================================

# LLM INTERFACE LAYER

# ============================================================================


class LLMInterface(ABC):

    """Abstract base class for all LLM implementations."""

    

    @abstractmethod

    def generate(self, messages: List[Message], config: GenerationConfig) -> str:

        """Generate a response based on the conversation history."""

        pass

    

    @abstractmethod

    def get_device_info(self) -> Dict[str, Any]:

        """Retrieve information about the compute device being used."""

        pass



class LocalLLMProvider(LLMInterface):

    """Local LLM implementation with automatic GPU acceleration."""

    

    def __init__(self, model_name: str, device: Optional[str] = None):

        """Initialize the local LLM provider."""

        self.model_name = model_name

        self.device = self._determine_device(device)

        

        print(f"Loading model {model_name} on {self.device}...")

        

        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

        

        if self.device == 'cuda':

            self.model = AutoModelForCausalLM.from_pretrained(

                model_name,

                torch_dtype=torch.float16,

                device_map='auto'

            )

        elif self.device == 'mps':

            self.model = AutoModelForCausalLM.from_pretrained(

                model_name,

                torch_dtype=torch.float16

            ).to('mps')

        else:

            self.model = AutoModelForCausalLM.from_pretrained(model_name)

            self.model.to('cpu')

        

        print(f"Model loaded successfully on {self.device}")

    

    def _determine_device(self, preferred_device: Optional[str]) -> str:

        """Determine the optimal compute device."""

        if preferred_device:

            return preferred_device

        

        if torch.cuda.is_available():

            return 'cuda'

        elif torch.backends.mps.is_available():

            return 'mps'

        else:

            return 'cpu'

    

    def generate(self, messages: List[Message], config: GenerationConfig) -> str:

        """Generate response using the local model."""

        prompt = self._format_messages(messages)

        inputs = self.tokenizer(prompt, return_tensors='pt').to(self.device)

        

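        # Sampling parameters are only valid when temperature is positive;

        # fall back to greedy decoding otherwise

        do_sample = config.temperature > 0

        gen_kwargs = {

            'max_new_tokens': config.max_tokens,

            'do_sample': do_sample,

            'pad_token_id': self.tokenizer.eos_token_id

        }

        if do_sample:

            gen_kwargs['temperature'] = config.temperature

            gen_kwargs['top_p'] = config.top_p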

        

        with torch.no_grad():

            outputs = self.model.generate(**inputs, **gen_kwargs)

        

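        # Decode only the newly generated tokens; slicing the decoded text by

        # len(prompt) is unreliable because decoding may not reproduce the prompt exactly

        prompt_length = inputs['input_ids'].shape[-1]

        response = self.tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True).strip()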

        

        return response

    

    def _format_messages(self, messages: List[Message]) -> str:

        """Format conversation messages into a prompt string."""

        formatted_parts = []

        for msg in messages:

            if msg.role == 'system':

                formatted_parts.append(f"System: {msg.content}")

            elif msg.role == 'user':

                formatted_parts.append(f"User: {msg.content}")

            elif msg.role == 'assistant':

                formatted_parts.append(f"Assistant: {msg.content}")

        

        formatted_parts.append("Assistant:")

        return "\n\n".join(formatted_parts)

    

    def get_device_info(self) -> Dict[str, Any]:

        """Retrieve information about the compute device."""

        info = {

            'device_type': self.device,

            'model_name': self.model_name

        }

        

        if self.device == 'cuda':

            info['gpu_name'] = torch.cuda.get_device_name(0)

            info['gpu_memory_total'] = torch.cuda.get_device_properties(0).total_memory

            info['gpu_memory_allocated'] = torch.cuda.memory_allocated(0)

        elif self.device == 'mps':

            info['gpu_name'] = 'Apple Silicon'

        

        return info



class RemoteLLMProvider(LLMInterface):

    """Remote LLM implementation for API-based services."""

    

    def __init__(self, api_key: str, model_name: str, base_url: str = "https://api.openai.com/v1"):

        """Initialize the remote LLM provider."""

        self.api_key = api_key

        self.model_name = model_name

        self.base_url = base_url

        self.headers = {

            'Authorization': f'Bearer {api_key}',

            'Content-Type': 'application/json'

        }

    

    def generate(self, messages: List[Message], config: GenerationConfig) -> str:

        """Generate response using the remote API."""

        api_messages = [

            {'role': msg.role, 'content': msg.content}

            for msg in messages

        ]

        

        payload = {

            'model': self.model_name,

            'messages': api_messages,

            'temperature': config.temperature,

            'max_tokens': config.max_tokens,

            'top_p': config.top_p,

            'frequency_penalty': config.frequency_penalty,

            'presence_penalty': config.presence_penalty

        }

        

        if config.stop_sequences:

            payload['stop'] = config.stop_sequences

        

        response = requests.post(

            f'{self.base_url}/chat/completions',

            headers=self.headers,

            json=payload,

            timeout=120

        )

        

        response.raise_for_status()

        result = response.json()

        

        return result['choices'][0]['message']['content']

    

    def get_device_info(self) -> Dict[str, Any]:

        """Retrieve information about the remote service."""

        return {

            'device_type': 'remote',

            'model_name': self.model_name,

            'base_url': self.base_url

        }



# ============================================================================

# SEARCH INTERFACE LAYER

# ============================================================================


class SearchInterface(ABC):

    """Abstract base class for search providers."""

    

    @abstractmethod

    def search(self, query: str, num_results: int = 10, time_filter: Optional[str] = None) -> List[SearchResult]:

        """Execute a search query and return results."""

        pass



class DuckDuckGoSearchProvider(SearchInterface):

    """Search provider implementation using DuckDuckGo."""

    

    def __init__(self):

        """Initialize the DuckDuckGo search provider."""

        self.ddgs = DDGS()

    

    def search(self, query: str, num_results: int = 10, time_filter: Optional[str] = None) -> List[SearchResult]:

        """Execute a search query using DuckDuckGo."""

        try:

            search_params = {'max_results': num_results}

            if time_filter:

                search_params['timelimit'] = time_filter

            

            results = list(self.ddgs.text(query, **search_params))

            

            search_results = []

            for result in results:

                search_result = SearchResult(

                    title=result.get('title', ''),

                    url=result.get('href', ''),

                    snippet=result.get('body', ''),

                    source=result.get('source', None)

                )

                search_results.append(search_result)

            

            return search_results

            

        except Exception as e:

            print(f"Search error: {str(e)}")

            return []



# ============================================================================

# WEB CONTENT EXTRACTION

# ============================================================================


class WebContentExtractor:

    """Extracts main textual content from web pages."""

    

    def __init__(self, timeout: int = 10):

        """Initialize the web content extractor."""

        self.timeout = timeout

        self.headers = {

            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'

        }

    

    def extract_content(self, url: str) -> Optional[str]:

        """Extract main textual content from a web page."""

        try:

            response = requests.get(url, headers=self.headers, timeout=self.timeout)

            response.raise_for_status()

            

            soup = BeautifulSoup(response.content, 'html.parser')

            

            for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):

                element.decompose()

            

            main_content = soup.find('main') or soup.find('article') or soup.find('body')

            

            if not main_content:

                return None

            

            text = main_content.get_text(separator='\n', strip=True)

            text = re.sub(r'\n\s*\n', '\n\n', text)

            text = re.sub(r' +', ' ', text)

            

            return text

            

        except Exception as e:

            print(f"Content extraction error for {url}: {str(e)}")

            return None

    

    def extract_summary(self, url: str, max_length: int = 1000) -> Optional[str]:

        """Extract a summary of the web page content."""

        content = self.extract_content(url)

        

        if not content:

            return None

        

        if len(content) <= max_length:

            return content

        

        truncated = content[:max_length]

        last_period = truncated.rfind('.')

        

        if last_period > max_length * 0.7:

            return truncated[:last_period + 1]

        else:

            return truncated + '...'



# ============================================================================

# CACHE MANAGEMENT

# ============================================================================


class CacheManager:

    """Manages caching of search results and analyses."""

    

    def __init__(self, cache_dir: str = ".trend_cache", ttl_hours: int = 24):

        """Initialize the cache manager."""

        self.cache_dir = Path(cache_dir)

        self.cache_dir.mkdir(parents=True, exist_ok=True)

        self.ttl = timedelta(hours=ttl_hours)

    

    def _get_cache_key(self, key_data: str) -> str:

        """Generate a cache key from input data."""

        return hashlib.md5(key_data.encode()).hexdigest()

    

    def get(self, key: str) -> Optional[Any]:

        """Retrieve an item from cache if it exists and is not expired."""

        cache_key = self._get_cache_key(key)

        cache_file = self.cache_dir / f"{cache_key}.pkl"

        

        if not cache_file.exists():

            return None

        

        file_time = datetime.fromtimestamp(cache_file.stat().st_mtime)

        if datetime.now() - file_time > self.ttl:

            cache_file.unlink()

            return None

        

        try:

            with open(cache_file, 'rb') as f:

                return pickle.load(f)

        except Exception:

            return None

    

    def set(self, key: str, value: Any):

        """Store an item in cache."""

        cache_key = self._get_cache_key(key)

        cache_file = self.cache_dir / f"{cache_key}.pkl"

        

        try:

            with open(cache_file, 'wb') as f:

                pickle.dump(value, f)

        except Exception as e:

            print(f"Cache write error: {e}")

    

    def clear(self):

        """Clear all cached items."""

        for cache_file in self.cache_dir.glob("*.pkl"):

            cache_file.unlink()



# ============================================================================

# TREND CLASSIFICATION

# ============================================================================


class TrendClassifier:

    """Advanced trend classification using multi-dimensional analysis."""

    

    def __init__(self, llm: LLMInterface):

        """Initialize the trend classifier."""

        self.llm = llm

    

    def extract_metrics(self, trend_name: str, content_samples: List[Dict[str, str]]) -> TrendMetrics:

        """Extract quantitative metrics from content about a trend."""

        context = "\n\n".join([

            f"{sample['title']}\n{sample['content'][:800]}"

            for sample in content_samples[:5]

        ])

        

        prompt = f"""Analyze the following content about the trend "{trend_name}" and score it across seven dimensions on a scale from 0.0 to 1.0.


Content:

{context}


Provide scores for each dimension based on evidence in the content:


1. ADOPTION_VELOCITY: How quickly is this trend being adopted? (0.0 = very slow/stagnant, 1.0 = explosive growth)

2. MARKET_BREADTH: How broad is the geographic and demographic reach? (0.0 = very narrow niche, 1.0 = global and cross-demographic)

3. INVESTMENT_LEVEL: What is the level of financial investment and resource allocation? (0.0 = minimal investment, 1.0 = massive investment from multiple sources)

4. INNOVATION_INTENSITY: How novel and disruptive is this trend? (0.0 = incremental improvement, 1.0 = paradigm-shifting innovation)

5. MEDIA_ATTENTION: What is the level of media coverage and public discussion? (0.0 = minimal coverage, 1.0 = extensive mainstream coverage)

6. EXPERT_CONSENSUS: What is the level of agreement among domain experts? (0.0 = highly controversial/disputed, 1.0 = strong expert consensus)

7. SUSTAINABILITY: What are the indicators of long-term viability? (0.0 = likely to fade quickly, 1.0 = strong fundamentals for longevity)


Format your response as:

ADOPTION_VELOCITY: [score]

MARKET_BREADTH: [score]

INVESTMENT_LEVEL: [score]

INNOVATION_INTENSITY: [score]

MEDIA_ATTENTION: [score]

EXPERT_CONSENSUS: [score]

SUSTAINABILITY: [score]"""

        

        messages = [Message(role='user', content=prompt)]

        config = GenerationConfig(temperature=0.2, max_tokens=500)

        response = self.llm.generate(messages, config)

        

        scores = self._parse_metric_scores(response)

        

        return TrendMetrics(

            adoption_velocity=scores.get('ADOPTION_VELOCITY', 0.5),

            market_breadth=scores.get('MARKET_BREADTH', 0.5),

            investment_level=scores.get('INVESTMENT_LEVEL', 0.5),

            innovation_intensity=scores.get('INNOVATION_INTENSITY', 0.5),

            media_attention=scores.get('MEDIA_ATTENTION', 0.5),

            expert_consensus=scores.get('EXPERT_CONSENSUS', 0.5),

            sustainability=scores.get('SUSTAINABILITY', 0.5)

        )

    

    def _parse_metric_scores(self, response: str) -> Dict[str, float]:

        """Parse metric scores from LLM response."""

        scores = {}

        

        # Accept lines such as "ADOPTION_VELOCITY: 0.8", "1. MARKET_BREADTH: [0.6]",

        # or "SUSTAINABILITY: 0.7 (strong fundamentals)"

        pattern = re.compile(r'([A-Z_]+)\**\s*:\s*\**\s*\[?\s*(\d*\.?\d+)')

        for line in response.split('\n'):

            match = pattern.search(line.upper())

            if match:

                # Clamp to the 0.0-1.0 range requested in the prompt

                scores[match.group(1)] = max(0.0, min(1.0, float(match.group(2))))

        

        return scores

    

    def classify_from_metrics(self, metrics: TrendMetrics) -> Tuple[TrendCategory, float]:

        """Classify a trend based on its metrics."""

        reach_score = (metrics.market_breadth + metrics.adoption_velocity) / 2

        impact_score = (metrics.innovation_intensity + metrics.investment_level) / 2

        longevity_score = (metrics.sustainability + metrics.expert_consensus) / 2

        overall_strength = (reach_score + impact_score + longevity_score) / 3

        

        if longevity_score < 0.3 or metrics.sustainability < 0.25:

            category = TrendCategory.FAD

            confidence = 0.7 + (0.3 * (1.0 - longevity_score))

        elif reach_score < 0.4 and metrics.market_breadth < 0.35:

            category = TrendCategory.MICRO_TREND

            confidence = 0.6 + (0.3 * metrics.expert_consensus)

        elif overall_strength >= 0.75 and longevity_score >= 0.7 and reach_score >= 0.7:

            if metrics.market_breadth >= 0.8 and impact_score >= 0.75:

                category = TrendCategory.MEGA_TREND

                confidence = 0.65 + (0.35 * overall_strength)

            else:

                category = TrendCategory.MACRO_TREND

                confidence = 0.7 + (0.25 * overall_strength)

        elif overall_strength >= 0.5:

            category = TrendCategory.TREND

            confidence = 0.6 + (0.35 * overall_strength)

        else:

            category = TrendCategory.MICRO_TREND

            confidence = 0.55 + (0.3 * overall_strength)

        

        return category, confidence



# ============================================================================

# TREND ANALYSIS ENGINE

# ============================================================================


class TrendAnalysisEngine:

    """Core engine for discovering and analyzing trends."""

    

    def __init__(self, llm: LLMInterface, search_provider: SearchInterface, 

                 content_extractor: WebContentExtractor, cache_manager: CacheManager):

        """Initialize the trend analysis engine."""

        self.llm = llm

        self.search_provider = search_provider

        self.content_extractor = content_extractor

        self.cache_manager = cache_manager

        self.classifier = TrendClassifier(llm)

        

        self.system_prompt = """You are an expert trend researcher and analyst with deep knowledge of trend research methodologies. Your task is to analyze information about emerging patterns in various domains and classify them according to established trend research frameworks.


When analyzing trends, consider the following classification criteria:


A FAD is a short-lived phenomenon, typically lasting less than a year, with limited impact beyond a specific niche or community. Fads generate temporary excitement but lack the substance for long-term adoption.


A MICRO TREND affects a specific subculture or niche market, lasting one to three years. These trends have limited geographic or demographic reach but can be significant within their specific context.


A TREND represents a significant pattern of change lasting three to ten years, affecting entire industries or substantial market segments. Trends reshape business practices, consumer behavior, or technological approaches within specific domains.


A MACRO TREND spans ten to twenty years and affects multiple industries or sectors simultaneously. These trends represent fundamental shifts in how people work, live, or interact with technology.


A MEGA TREND encompasses twenty years or more and represents transformational changes that reshape society, economy, and technology on a global scale. Mega trends affect virtually all aspects of human activity.


Your analysis should be evidence-based, drawing on concrete indicators such as investment patterns, adoption rates, research activity, media coverage, and expert commentary. Always distinguish between hype and substance, and provide balanced assessments of both opportunities and challenges."""

    

    def analyze_topic(self, topic_area: str, num_trends: int = 5) -> List[TrendAnalysis]:

        """Analyze a topic area and identify emerging trends."""

        print(f"Analyzing trends in: {topic_area}")

        

        cache_key = f"analysis_{topic_area}_{num_trends}"

        cached_result = self.cache_manager.get(cache_key)

        if cached_result:

            print("Using cached analysis results")

            return cached_result

        

        search_queries = self._generate_search_queries(topic_area)

        print(f"Generated {len(search_queries)} search queries")

        

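        all_results = []

        seen_urls = set()

        for query in search_queries:

            results = self.search_provider.search(query, num_results=10, time_filter='m')

            # Skip results already returned by an earlier query

            for result in results:

                if result.url not in seen_urls:

                    seen_urls.add(result.url)

                    all_results.append(result)

            time.sleep(1)  # pause between queries to avoid rate limiting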

        

        print(f"Collected {len(all_results)} search results")

        

        content_samples = self._extract_content_samples(all_results, max_samples=20)

        print(f"Extracted content from {len(content_samples)} sources")

        

        potential_trends = self._identify_trends(topic_area, content_samples)

        print(f"Identified {len(potential_trends)} potential trends")

        

        trend_analyses = []

        for trend_name in potential_trends[:num_trends]:

            analysis = self._analyze_single_trend(topic_area, trend_name, all_results, content_samples)

            if analysis:

                trend_analyses.append(analysis)

        

        self.cache_manager.set(cache_key, trend_analyses)

        

        return trend_analyses

    

    def _generate_search_queries(self, topic_area: str) -> List[str]:

        """Generate effective search queries for the topic area."""

        cache_key = f"queries_{topic_area}"

        cached_queries = self.cache_manager.get(cache_key)

        if cached_queries:

            return cached_queries

        

        messages = [

            Message(role='system', content=self.system_prompt),

            Message(role='user', content=f"""Generate 5 effective search queries to discover emerging trends in {topic_area}. 


The queries should target:

1. Recent developments and innovations

2. Industry reports and forecasts

3. Research publications and breakthroughs

4. Market analysis and adoption patterns

5. Expert commentary and thought leadership


Return only the search queries, one per line, without numbering or additional explanation.""")

        ]

        

        config = GenerationConfig(temperature=0.7, max_tokens=500)

        response = self.llm.generate(messages, config)

        

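        # Strip bullets or numbering the model may add despite the instructions

        queries = [re.sub(r'^\s*(?:[-*•]|\d+[.)])\s*', '', q).strip() for q in response.split('\n')]

        queries = [q for q in queries if q]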

        

        self.cache_manager.set(cache_key, queries)

        

        return queries

    

    def _extract_content_samples(self, results: List[SearchResult], max_samples: int = 20) -> List[Dict[str, str]]:

        """Extract content from search results."""

        content_samples = []

        

        for result in results[:max_samples]:

            cache_key = f"content_{result.url}"

            cached_content = self.cache_manager.get(cache_key)

            

            if cached_content:

                content = cached_content

            else:

                content = self.content_extractor.extract_summary(result.url, max_length=2000)

                if content:

                    self.cache_manager.set(cache_key, content)

            

            if content:

                content_samples.append({

                    'url': result.url,

                    'title': result.title,

                    'content': content

                })

        

        return content_samples

    

    def _identify_trends(self, topic_area: str, content_samples: List[Dict[str, str]]) -> List[str]:

        """Identify potential trends from content samples."""

        content_summary = "\n\n".join([

            f"Source: {sample['title']}\n{sample['content'][:500]}"

            for sample in content_samples[:10]

        ])

        

        messages = [

            Message(role='system', content=self.system_prompt),

            Message(role='user', content=f"""Based on the following content about {topic_area}, identify 5-7 distinct emerging trends or patterns.


Content samples:

{content_summary}


List the trend names only, one per line. Each trend name should be concise (2-5 words) and descriptive.""")

        ]

        

        config = GenerationConfig(temperature=0.7, max_tokens=500)

        response = self.llm.generate(messages, config)

        

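        # Strip bullets, numbering, and bold markers the model may add to trend names

        trends = [re.sub(r'^\s*(?:[-*•]|\d+[.)])\s*', '', t).strip().strip('*').strip() for t in response.split('\n')]

        trends = [t for t in trends if t]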

        return trends

    

    def _analyze_single_trend(self, topic_area: str, trend_name: str, 

                             all_results: List[SearchResult], 

                             content_samples: List[Dict[str, str]]) -> Optional[TrendAnalysis]:

        """Perform detailed analysis of a single trend."""

        relevant_sources = self._find_relevant_sources(trend_name, all_results)

        

        detailed_content = []

        for source in relevant_sources[:5]:

            cache_key = f"content_{source.url}"

            cached_content = self.cache_manager.get(cache_key)

            

            if cached_content:

                content = cached_content

            else:

                content = self.content_extractor.extract_summary(source.url, max_length=1500)

                if content:

                    self.cache_manager.set(cache_key, content)

            

            if content:

                detailed_content.append({

                    'url': source.url,

                    'title': source.title,

                    'content': content

                })

        

        if not detailed_content:

            return None

        

        metrics = self.classifier.extract_metrics(trend_name, detailed_content)

        category, confidence = self.classifier.classify_from_metrics(metrics)

        

        context = "\n\n".join([

            f"Source: {item['title']}\nURL: {item['url']}\n{item['content']}"

            for item in detailed_content

        ])

        

        messages = [

            Message(role='system', content=self.system_prompt),

            Message(role='user', content=f"""Analyze the trend "{trend_name}" in the context of {topic_area}.


Based on the following sources, provide a comprehensive analysis:


{context}


Your analysis must include:


1. SUMMARY: A concise 2-3 sentence summary of what this trend represents.


2. TECHNOLOGY_IMPACT: How this trend affects or will affect technology development, including specific technologies, platforms, or approaches.


3. SCIENCE_IMPACT: How this trend influences scientific research, methodologies, or understanding in relevant fields.


4. PRODUCT_IMPACT: How this trend affects or will affect products, services, and market offerings.


5. KEY_INDICATORS: List 3-5 specific, observable indicators that demonstrate this is a genuine trend rather than speculation.


6. TIME_HORIZON: Estimated timeframe for significant impact (e.g., "1-2 years", "5-10 years").


Format your response as follows:

SUMMARY: [summary text]

TECHNOLOGY_IMPACT: [impact description]

SCIENCE_IMPACT: [impact description]

PRODUCT_IMPACT: [impact description]

KEY_INDICATORS: [indicator 1] | [indicator 2] | [indicator 3]

TIME_HORIZON: [timeframe]""")

        ]

        

        config = GenerationConfig(temperature=0.3, max_tokens=2000)

        response = self.llm.generate(messages, config)

        

        analysis_dict = self._parse_analysis_response(response)

        

        if not analysis_dict:

            return None

        

        recommended_urls = [item['url'] for item in detailed_content[:3]]

        

        return TrendAnalysis(

            trend_name=trend_name,

            category=category,

            summary=analysis_dict.get('SUMMARY', ''),

            technology_impact=analysis_dict.get('TECHNOLOGY_IMPACT', ''),

            science_impact=analysis_dict.get('SCIENCE_IMPACT', ''),

            product_impact=analysis_dict.get('PRODUCT_IMPACT', ''),

            key_indicators=[k.strip() for k in analysis_dict.get('KEY_INDICATORS', '').split('|') if k.strip()],

            time_horizon=analysis_dict.get('TIME_HORIZON', ''),

            confidence_level=confidence,

            sources=relevant_sources[:5],

            recommended_urls=recommended_urls,

            metrics=metrics

        )

    

    def _find_relevant_sources(self, trend_name: str, all_results: List[SearchResult]) -> List[SearchResult]:

        """Find search results most relevant to a specific trend."""

        scored_results = []

        trend_keywords = set(trend_name.lower().split())

        

        for result in all_results:

            text = f"{result.title} {result.snippet}".lower()

            score = sum(1 for keyword in trend_keywords if keyword in text)

            if score > 0:

                scored_results.append((score, result))

        

        scored_results.sort(reverse=True, key=lambda x: x[0])

        

        return [result for score, result in scored_results]

    

    def _parse_analysis_response(self, response: str) -> Dict[str, str]:

        """Parse structured analysis response from LLM."""

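        # Only the field labels requested in the prompt start a new field; other

        # lines containing a colon (URLs, "for example:") belong to the current field

        field_pattern = re.compile(

            r'^[\s\d.)*#-]*(SUMMARY|TECHNOLOGY_IMPACT|SCIENCE_IMPACT|PRODUCT_IMPACT|'

            r'KEY_INDICATORS|TIME_HORIZON)\**\s*:\s*\**\s*(.*)$',

            re.IGNORECASE

        )

        result = {}

        current_field = None

        current_value = []

        for line in response.split('\n'):

            line = line.strip()

            if not line:

                continue

            match = field_pattern.match(line)

            if match:

                # Store the previous field before starting a new one

                if current_field:

                    result[current_field] = ' '.join(current_value).strip()

                current_field = match.group(1).upper()

                current_value = [match.group(2).strip()]

            elif current_field:

                current_value.append(line)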

        

        if current_field:

            result[current_field] = ' '.join(current_value).strip()

        

        return result



# ============================================================================

# MAIN AGENT INTERFACE

# ============================================================================


class TrendDiscoveryAgent:

    """Main interface for the trend discovery system."""

    

    def __init__(self, llm: LLMInterface, search_provider: SearchInterface, cache_manager: CacheManager):

        """Initialize the trend discovery agent."""

        self.llm = llm

        self.search_provider = search_provider

        self.cache_manager = cache_manager

        self.content_extractor = WebContentExtractor()

        self.analysis_engine = TrendAnalysisEngine(llm, search_provider, self.content_extractor, cache_manager)

    

    def discover_trends(self, topic_area: str, num_trends: int = 5) -> str:

        """Discover and analyze trends in a given topic area."""

        print(f"\n{'='*80}")

        print("TREND DISCOVERY AGENT")

        print(f"Topic Area: {topic_area}")

        print(f"{'='*80}\n")

        

        device_info = self.llm.get_device_info()

        print(f"Compute device: {device_info['device_type'].upper()}")

        if 'gpu_name' in device_info:

            print(f"GPU: {device_info['gpu_name']}")

        print()

        

        trend_analyses = self.analysis_engine.analyze_topic(topic_area, num_trends)

        

        report = self._format_report(topic_area, trend_analyses)

        return report

    

    def _format_report(self, topic_area: str, analyses: List[TrendAnalysis]) -> str:

        """Format trend analyses into a comprehensive report."""

        report_lines = []

        

        report_lines.append(f"\n{'='*80}")

        report_lines.append(f"TREND ANALYSIS REPORT: {topic_area.upper()}")

        report_lines.append(f"{'='*80}\n")

        

        report_lines.append(f"Total Trends Identified: {len(analyses)}\n")

        

        report_lines.append("TREND OVERVIEW")

        report_lines.append("-" * 80)

        for i, analysis in enumerate(analyses, 1):

            report_lines.append(f"{i}. {analysis.trend_name}")

            report_lines.append(f"   Category: {analysis.category.value.replace('_', ' ').title()}")

            report_lines.append(f"   Confidence: {analysis.confidence_level:.2f}")

            report_lines.append(f"   Time Horizon: {analysis.time_horizon}")

            report_lines.append("")

        

        for i, analysis in enumerate(analyses, 1):

            report_lines.append(f"\n{'='*80}")

            report_lines.append(f"TREND {i}: {analysis.trend_name.upper()}")

            report_lines.append(f"{'='*80}\n")

            

            report_lines.append(f"Classification: {analysis.category.value.replace('_', ' ').title()}")

            report_lines.append(f"Confidence Level: {analysis.confidence_level:.2f}")

            report_lines.append(f"Time Horizon: {analysis.time_horizon}\n")

            

            if analysis.metrics:

                report_lines.append("TREND METRICS")

                report_lines.append("-" * 80)

                report_lines.append(f"Adoption Velocity: {analysis.metrics.adoption_velocity:.2f}")

                report_lines.append(f"Market Breadth: {analysis.metrics.market_breadth:.2f}")

                report_lines.append(f"Investment Level: {analysis.metrics.investment_level:.2f}")

                report_lines.append(f"Innovation Intensity: {analysis.metrics.innovation_intensity:.2f}")

                report_lines.append(f"Media Attention: {analysis.metrics.media_attention:.2f}")

                report_lines.append(f"Expert Consensus: {analysis.metrics.expert_consensus:.2f}")

                report_lines.append(f"Sustainability: {analysis.metrics.sustainability:.2f}")

                report_lines.append("")

            

            report_lines.append("SUMMARY")

            report_lines.append("-" * 80)

            report_lines.append(self._wrap_text(analysis.summary, 80))

            report_lines.append("")

            

            report_lines.append("TECHNOLOGY IMPACT")

            report_lines.append("-" * 80)

            report_lines.append(self._wrap_text(analysis.technology_impact, 80))

            report_lines.append("")

            

            report_lines.append("SCIENCE IMPACT")

            report_lines.append("-" * 80)

            report_lines.append(self._wrap_text(analysis.science_impact, 80))

            report_lines.append("")

            

            report_lines.append("PRODUCT IMPACT")

            report_lines.append("-" * 80)

            report_lines.append(self._wrap_text(analysis.product_impact, 80))

            report_lines.append("")

            

            report_lines.append("KEY INDICATORS")

            report_lines.append("-" * 80)

            for indicator in analysis.key_indicators:

                if indicator.strip():

                    report_lines.append(f"  - {indicator.strip()}")

            report_lines.append("")

            

            report_lines.append("RECOMMENDED READING")

            report_lines.append("-" * 80)

            for url in analysis.recommended_urls:

                report_lines.append(f"  {url}")

            report_lines.append("")

        

        return "\n".join(report_lines)

    

    def _wrap_text(self, text: str, width: int = 80) -> str:

        """Wrap text to specified width while preserving words."""

        words = text.split()

        lines = []

        current_line = []

        current_length = 0

        

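        for word in words:

            # A separating space is only needed once the line already holds a word.

            needed = len(word) + (1 if current_line else 0)

            if current_length + needed <= width:

                current_line.append(word)

                current_length += needed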

            else:

                if current_line:

                    lines.append(' '.join(current_line))

                current_line = [word]

                current_length = len(word)

        

        if current_line:

            lines.append(' '.join(current_line))

        

        return '\n'.join(lines)



# ============================================================================

# COMMAND LINE INTERFACE

# ============================================================================


def main():

    """Main entry point for the trend discovery agent."""

    parser = argparse.ArgumentParser(

        description='Discover and analyze emerging trends in any topic area'

    )

    

    parser.add_argument(

        '--topic',

        type=str,

        required=True,

        help='Topic area to analyze (e.g., "Artificial Intelligence", "3D Printing")'

    )

    

    parser.add_argument(

        '--num-trends',

        type=int,

        default=5,

        help='Number of trends to identify (default: 5)'

    )

    

    parser.add_argument(

        '--llm-type',

        type=str,

        choices=['local', 'remote'],

        default='remote',

        help='Type of LLM to use (default: remote)'

    )

    

    parser.add_argument(

        '--model',

        type=str,

        default='gpt-4',

        help='Model name (default: gpt-4 for remote, or specify local model)'

    )

    

    parser.add_argument(

        '--api-key',

        type=str,

        help='API key for remote LLM (can also use OPENAI_API_KEY env var)'

    )

    

    parser.add_argument(

        '--device',

        type=str,

        choices=['cuda', 'mps', 'cpu'],

        help='Device for local LLM (auto-detect if not specified)'

    )

    

    parser.add_argument(

        '--output',

        type=str,

        help='Output file for the report (optional)'

    )

    

    parser.add_argument(

        '--clear-cache',

        action='store_true',

        help='Clear the cache before running'

    )

    

    args = parser.parse_args()

    

    cache_manager = CacheManager()

    

    if args.clear_cache:

        print("Clearing cache...")

        cache_manager.clear()

    

    if args.llm_type == 'local':

        if not args.model or args.model == 'gpt-4':

            print("Error: Please specify a local model name with --model", file=sys.stderr)

            sys.exit(1)

        

        llm = LocalLLMProvider(args.model, args.device)

    else:

        api_key = args.api_key or os.environ.get('OPENAI_API_KEY')

        if not api_key:

            print("Error: API key required for remote LLM. Use --api-key or set OPENAI_API_KEY env var", file=sys.stderr)

            sys.exit(1)

        

        llm = RemoteLLMProvider(api_key, args.model)

    

    search_provider = DuckDuckGoSearchProvider()

    

    agent = TrendDiscoveryAgent(llm, search_provider, cache_manager)

    

    try:

        report = agent.discover_trends(args.topic, args.num_trends)

        

        print(report)

        

        if args.output:

            with open(args.output, 'w', encoding='utf-8') as f:

                f.write(report)

            print(f"\nReport saved to: {args.output}")

    

    except KeyboardInterrupt:

        print("\n\nAnalysis interrupted by user")

        sys.exit(130)  # conventional exit status after Ctrl+C (128 + SIGINT)

    except Exception as e:

        print(f"\nError during analysis: {e}", file=sys.stderr)

        import traceback

        traceback.print_exc()

        sys.exit(1)



if __name__ == '__main__':

    main()



This completes the implementation of the trend discovery agent, which can be run as-is from the command line. The system supports both local and remote LLMs, auto-detects the device for a local model (CUDA, MPS, or CPU) when --device is not given, caches results through CacheManager, and produces a report for each trend covering its classification, confidence level, time horizon, metrics, impact assessments, key indicators, and recommended reading.

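To use the system, save the code to a file named trend_agent.py and install the required dependencies:

    pip install torch transformers requests beautifulsoup4 duckduckgo-search

Then run the agent, for example:

    python trend_agent.py --topic "Generative AI" --num-trends 5 --llm-type remote --api-key YOUR_API_KEY

For a remote LLM, the key can instead be supplied through the OPENAI_API_KEY environment variable, which keeps it out of the shell history. For a local model, pass --llm-type local together with --model; the default model name gpt-4 is rejected in local mode. Add --output to save the report to a file.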
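Before the first run, it can help to confirm that the dependencies are actually importable. The following is a minimal sketch of such a preflight check, not part of trend_agent.py. The function name missing_packages and the REQUIRED_MODULES mapping are assumptions made for illustration; the mapping reflects the fact that some pip package names differ from their import names (beautifulsoup4 is imported as bs4, duckduckgo-search as duckduckgo_search). Which packages are strictly needed depends on whether a local or a remote LLM is used.

```python
import importlib.util

# Assumed mapping from pip package name to importable module name.
REQUIRED_MODULES = {
    "torch": "torch",
    "transformers": "transformers",
    "requests": "requests",
    "beautifulsoup4": "bs4",
    "duckduckgo-search": "duckduckgo_search",
}


def missing_packages(required=REQUIRED_MODULES):
    """Return the pip names of packages whose module cannot be located."""
    return [
        pip_name
        for pip_name, module_name in required.items()
        if importlib.util.find_spec(module_name) is None
    ]


if __name__ == "__main__":
    missing = missing_packages()
    if missing:
        print("Missing packages; install with: pip install " + " ".join(missing))
    else:
        print("All required packages are importable.")
```

The check only locates the modules and does not import them, so it stays fast even when torch is installed.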

The system discovers emerging trends, classifies each one (for example as a fad, trend, or megatrend), assesses its impact on technology, science, and products, and lists recommended reading for further exploration. Because each classification is backed by a confidence level and seven scored metrics, from adoption velocity to sustainability, the reasoning behind it can be inspected and challenged. The cache avoids repeating work when the same analysis is run again; pass --clear-cache to force a fresh run.
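To make the benefit of caching for repeated analyses concrete, here is a minimal sketch of a file-based cache with a time-to-live. It is an illustration only and is not the article's CacheManager: the class FileCache, the helper cached_search, and the one-hour default TTL are all hypothetical names and values chosen for this example.

```python
import hashlib
import json
import tempfile
import time
from pathlib import Path


class FileCache:
    """Hypothetical JSON file cache with a TTL (not the article's CacheManager)."""

    def __init__(self, directory, ttl_seconds=3600.0):
        self.directory = Path(directory)
        self.directory.mkdir(parents=True, exist_ok=True)
        self.ttl_seconds = ttl_seconds

    def _path(self, key):
        # Hash the key so any query string maps to a safe file name.
        digest = hashlib.sha256(key.encode("utf-8")).hexdigest()
        return self.directory / f"{digest}.json"

    def get(self, key):
        """Return the cached value, or None if absent or older than the TTL."""
        path = self._path(key)
        if not path.exists():
            return None
        entry = json.loads(path.read_text(encoding="utf-8"))
        if time.time() - entry["stored_at"] > self.ttl_seconds:
            path.unlink()  # expired entries are removed lazily on read
            return None
        return entry["value"]

    def set(self, key, value):
        """Store a JSON-serializable value together with its timestamp."""
        entry = {"stored_at": time.time(), "value": value}
        self._path(key).write_text(json.dumps(entry), encoding="utf-8")

    def clear(self):
        """Delete every cache entry in the directory."""
        for path in self.directory.glob("*.json"):
            path.unlink()


def cached_search(cache, query, search_fn):
    """Return cached results for query, calling search_fn only on a miss."""
    results = cache.get(query)
    if results is None:
        results = search_fn(query)
        cache.set(query, results)
    return results


if __name__ == "__main__":
    def fake_search(query):
        # Stand-in for a real (slow) web search call.
        return [f"result for {query}"]

    with tempfile.TemporaryDirectory() as tmp:
        cache = FileCache(tmp, ttl_seconds=60)
        print(cached_search(cache, "generative ai", fake_search))  # miss
        print(cached_search(cache, "generative ai", fake_search))  # hit
```

The TTL matters for trend research in particular: search results go stale, so a cache that never expires would eventually report yesterday's trends as current.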
