Monday, January 26, 2026

Building an Agentic AI for Real-Time Stock Market Analysis and Investment Decision Making



Introduction and Problem Definition


The financial markets represent one of the most complex and dynamic environments where artificial intelligence can provide significant value. Building an agentic AI system capable of monitoring live market data, analyzing historical trends, tracking political and economic developments, and making investment recommendations requires a sophisticated architecture that combines multiple data sources, analytical capabilities, and decision-making frameworks enhanced by Large Language Models (LLMs).


An agentic AI differs from traditional algorithmic trading systems in its autonomous nature and ability to reason about complex, multi-faceted information. Rather than following predetermined rules, such a system must be capable of adapting to new information, learning from market patterns, and making nuanced decisions based on incomplete or conflicting data. The integration of LLMs adds a crucial layer of natural language understanding and reasoning that enables the system to process unstructured information, generate human-readable explanations, and adapt its decision-making based on contextual understanding.


The core challenge lies in creating a system that can process vast amounts of real-time data from disparate sources, identify relevant patterns and correlations, and translate this analysis into actionable investment advice while managing the risk and uncertainty inherent in financial markets. The addition of LLM capabilities allows the system to interpret complex financial documents, understand nuanced market commentary, and explain the reasoning behind its recommendations.


System Architecture Overview with LLM Integration


The agentic AI system requires a multi-layered architecture that separates concerns while enabling seamless data flow and decision coordination. The foundation consists of data ingestion layers that connect to various market data feeds, news sources, and economic indicators. Above this sits the analytical engine that processes and correlates information, followed by the LLM-enhanced reasoning layer that interprets complex information and generates insights, and finally the decision-making layer that synthesizes all inputs into investment recommendations.
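
The layering described above can be pictured as a chain of stages, each consuming the previous stage's output. The following is a minimal sketch under that assumption. The names `MarketSnapshot`, `analyze`, `reason`, `decide`, and `run_pipeline` are hypothetical and do not appear in the classes later in this article, and the reasoning stage is a stub standing in for an LLM call.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class MarketSnapshot:
    """Data handed from one layer to the next (hypothetical structure)."""
    symbol: str
    prices: List[float]
    features: Dict[str, float] = field(default_factory=dict)
    insight: str = ""
    recommendation: str = ""


def analyze(snapshot: MarketSnapshot) -> MarketSnapshot:
    """Analytical layer: derive a simple numeric feature from raw prices."""
    first, last = snapshot.prices[0], snapshot.prices[-1]
    snapshot.features["return_pct"] = (last - first) / first * 100
    return snapshot


def reason(snapshot: MarketSnapshot) -> MarketSnapshot:
    """Reasoning layer: stand-in for an LLM call that explains the features."""
    direction = "rising" if snapshot.features["return_pct"] > 0 else "not rising"
    snapshot.insight = f"{snapshot.symbol} is {direction} over the window"
    return snapshot


def decide(snapshot: MarketSnapshot) -> MarketSnapshot:
    """Decision layer: map features to a recommendation (toy thresholds)."""
    change = snapshot.features["return_pct"]
    snapshot.recommendation = "BUY" if change > 1 else "SELL" if change < -1 else "HOLD"
    return snapshot


def run_pipeline(snapshot: MarketSnapshot,
                 stages: List[Callable[[MarketSnapshot], MarketSnapshot]]) -> MarketSnapshot:
    """Pass the snapshot through each layer in order."""
    for stage in stages:
        snapshot = stage(snapshot)
    return snapshot


result = run_pipeline(MarketSnapshot("AAPL", [100.0, 101.0, 102.5]),
                      [analyze, reason, decide])
print(result.recommendation, "-", result.insight)
```

Because each stage only depends on the snapshot it receives, a stage can be replaced or scaled independently, which is the property the modular architecture is meant to provide.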


The LLM integration occurs at multiple levels within the architecture. At the data processing level, LLMs help interpret unstructured text data from news articles, analyst reports, and regulatory filings. At the analysis level, LLMs provide contextual understanding of market events and their potential implications. At the decision level, LLMs generate human-readable explanations for recommendations and can engage in sophisticated reasoning about market conditions.


The architecture must be designed for scalability and real-time processing, as financial markets generate enormous volumes of data that require immediate analysis. Additionally, the system needs robust error handling and fallback mechanisms, as any downtime or incorrect analysis could result in significant financial losses. The LLM components must be carefully designed to provide fast inference while maintaining accuracy and reliability.
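
One way to combine a latency budget with a fallback mechanism is to wrap the slow call in a timeout and return a cheaper result when the budget is exceeded. The sketch below illustrates the idea with `asyncio.wait_for`. The helper `with_fallback` and the two stand-in functions are hypothetical, and the 50 ms budget is an arbitrary value chosen for the demonstration.

```python
import asyncio
from typing import Awaitable, Callable


async def with_fallback(primary: Callable[[], Awaitable[str]],
                        fallback: Callable[[], str],
                        timeout_s: float) -> str:
    """Return the primary result, or the fallback on timeout or error."""
    try:
        return await asyncio.wait_for(primary(), timeout=timeout_s)
    except Exception:
        # A timeout and any error raised by the primary call both land here
        return fallback()


async def slow_llm_call() -> str:
    """Stand-in for a remote LLM request that takes too long."""
    await asyncio.sleep(1.0)
    return "detailed LLM analysis"


def rule_based_summary() -> str:
    """Cheap deterministic fallback."""
    return "rule-based summary"


result = asyncio.run(with_fallback(slow_llm_call, rule_based_summary, timeout_s=0.05))
print(result)
```

In a production system the broad `except` would likely be narrowed and the failure logged, so that repeated fallbacks become visible to operators.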


A critical aspect of the architecture is the separation between data collection, analysis, LLM reasoning, and decision-making components. This modular approach allows for independent scaling and updating of different system components while maintaining overall system integrity. The LLM layer serves as both a processing component and an interface layer that can communicate findings in natural language.


Data Sources and Integration Framework


The effectiveness of an agentic AI for stock market analysis depends heavily on the quality and comprehensiveness of its data sources. The system must integrate multiple types of data feeds to build a complete picture of market conditions and influencing factors. The LLM component enhances this integration by providing sophisticated text processing capabilities for unstructured data sources.


Real-time market data forms the foundation of the system. This includes current prices, trading volumes, bid-ask spreads, and order book information for the constituents of major indices such as the Dow Jones Industrial Average and the German DAX, along with the index levels themselves. The system must connect to a reliable financial data provider such as Bloomberg, Reuters, or Alpha Vantage to ensure data accuracy and minimal latency; the example that follows connects to Finnhub's WebSocket feed.



import asyncio

import websockets  # asyncio WebSocket client; provides the awaitable connect() used below

import json

import numpy as np  # used by calculate_analysis_confidence

import openai

from datetime import datetime

from typing import Dict, List, Optional

from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM


class LLMEnhancedMarketDataCollector:

    def __init__(self, api_key: str, symbols: List[str], llm_model: str = "gpt-4",

                 llm_api_key: Optional[str] = None):

        self.api_key = api_key  # market data feed token

        self.symbols = symbols

        self.current_prices = {}

        self.price_history = {}

        self.connection = None



        # Initialize LLM components

        # llm_api_key is an added, optional parameter that keeps the LLM credential

        # separate from the feed token; it falls back to api_key as in the original

        self.llm_client = openai.OpenAI(api_key=llm_api_key or api_key)

        self.llm_model = llm_model

        self.sentiment_analyzer = pipeline("sentiment-analysis", 

                                         model="ProsusAI/finbert")



        # Initialize local LLM for fast processing

        self.local_tokenizer = AutoTokenizer.from_pretrained("microsoft/DialoGPT-medium")

        self.local_model = AutoModelForCausalLM.from_pretrained("microsoft/DialoGPT-medium")



    async def connect_to_feed(self):

        """Establish connection to real-time market data feed"""

        try:

            self.connection = await websockets.connect(

                f"wss://ws.finnhub.io?token={self.api_key}"

            )

            await self.subscribe_to_symbols()

        except Exception as e:

            print(f"Connection failed: {e}")

            await self.handle_connection_error()

    

    async def subscribe_to_symbols(self):

        """Subscribe to real-time updates for specified symbols"""

        for symbol in self.symbols:

            subscribe_msg = {

                "type": "subscribe",

                "symbol": symbol

            }

            await self.connection.send(json.dumps(subscribe_msg))

    

    async def process_market_data_with_llm(self, message: Dict):

        """Process incoming market data with LLM enhancement"""

        if message.get('type') == 'trade':

            symbol = message.get('s')

            price = message.get('p')

            volume = message.get('v')

            timestamp = message.get('t')

            

            # Store basic market data

            self.current_prices[symbol] = {

                'price': price,

                'volume': volume,

                'timestamp': timestamp,

                'last_updated': datetime.now()

            }

            

            # Store historical data for trend analysis

            if symbol not in self.price_history:

                self.price_history[symbol] = []

            

            self.price_history[symbol].append({

                'price': price,

                'volume': volume,

                'timestamp': timestamp

            })

            

            # LLM-enhanced analysis of price movement

            price_analysis = await self.analyze_price_movement_with_llm(symbol, price, volume)

            

            # Store LLM insights

            self.current_prices[symbol]['llm_analysis'] = price_analysis

    

    async def analyze_price_movement_with_llm(self, symbol: str, price: float, volume: int) -> Dict:

        """Use LLM to analyze price movement patterns and generate insights"""

        

        # Get recent price history for context

        recent_prices = self.price_history.get(symbol, [])[-20:]

        

        if len(recent_prices) < 5:

            return {"analysis": "Insufficient data for analysis", "confidence": 0.0}

        

        # Prepare context for LLM

        price_context = self.prepare_price_context(recent_prices, price, volume)

        

        # Use LLM to analyze the price movement

        prompt = f"""

        Analyze the following price movement data for stock symbol {symbol}:

        

        {price_context}

        

        Current price: ${price}

        Current volume: {volume}

        

        Provide a brief analysis of:

        1. The price trend (bullish, bearish, or sideways)

        2. Volume significance

        3. Any notable patterns

        4. Potential support/resistance levels

        

        Keep the response concise and focused on actionable insights.

        """

        

        try:

            response = await self.get_llm_response(prompt)

            

            # Extract sentiment from the analysis

            sentiment_result = self.sentiment_analyzer(response[:512])

            

            return {

                "analysis": response,

                "sentiment": sentiment_result[0]['label'],

                "sentiment_score": sentiment_result[0]['score'],

                "confidence": self.calculate_analysis_confidence(recent_prices, price, volume),

                "timestamp": datetime.now()

            }

            

        except Exception as e:

            print(f"LLM analysis error: {e}")

            return {"analysis": "Analysis unavailable", "confidence": 0.0}

    

    async def get_llm_response(self, prompt: str, use_local: bool = False) -> str:

        """Get response from LLM (local or cloud-based)"""

        if use_local:

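            # Use local model for faster response

            inputs = self.local_tokenizer.encode(prompt, return_tensors='pt')

            # max_new_tokens bounds only the generated text (max_length also counts the prompt)

            # and temperature only takes effect when sampling is enabled

            outputs = self.local_model.generate(inputs, max_new_tokens=200,

                                              num_return_sequences=1,

                                              do_sample=True,

                                              temperature=0.7,

                                              pad_token_id=self.local_tokenizer.eos_token_id)

            # Decode only the newly generated tokens rather than slicing the decoded string by len(prompt)

            new_tokens = outputs[0][inputs.shape[-1]:]

            return self.local_tokenizer.decode(new_tokens, skip_special_tokens=True).strip()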

        else:

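            # Use cloud-based LLM for more sophisticated analysis

            # The client call is synchronous, so run it in a worker thread (Python 3.9+) to keep the event loop free

            response = await asyncio.to_thread(

                self.llm_client.chat.completions.create,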

                model=self.llm_model,

                messages=[

                    {"role": "system", "content": "You are a financial analyst AI assistant specializing in real-time market analysis."},

                    {"role": "user", "content": prompt}

                ],

                max_tokens=300,

                temperature=0.3

            )

            return response.choices[0].message.content

    

    def prepare_price_context(self, recent_prices: List[Dict], current_price: float, current_volume: int) -> str:

        """Prepare price context for LLM analysis"""

        if not recent_prices:

            return "No recent price data available."

        

        context_lines = []

        context_lines.append("Recent price history (most recent 10 of the last 20 data points):")

        

        for i, price_point in enumerate(recent_prices[-10:]):  # Last 10 points for brevity

            timestamp = datetime.fromtimestamp(price_point['timestamp']/1000)

            context_lines.append(f"{timestamp.strftime('%H:%M:%S')}: ${price_point['price']:.2f} (Vol: {price_point['volume']})")

        

        # Calculate basic statistics

        prices = [p['price'] for p in recent_prices]

        volumes = [p['volume'] for p in recent_prices]

        

        avg_price = sum(prices) / len(prices)

        avg_volume = sum(volumes) / len(volumes)

        price_change = ((current_price - prices[0]) / prices[0]) * 100

        

        context_lines.append(f"\nStatistics:")

        context_lines.append(f"Average price: ${avg_price:.2f}")

        context_lines.append(f"Average volume: {avg_volume:,.0f}")

        context_lines.append(f"Price change from start: {price_change:.2f}%")

        context_lines.append(f"Volume vs average: {(current_volume/avg_volume)*100:.1f}%")

        

        return "\n".join(context_lines)

    

    def calculate_analysis_confidence(self, recent_prices: List[Dict], current_price: float, current_volume: int) -> float:

        """Calculate confidence score for the analysis based on data quality"""

        if len(recent_prices) < 5:

            return 0.2

        

        # Factors affecting confidence

        data_points = min(1.0, len(recent_prices) / 20.0)  # More data = higher confidence

        

        # Price volatility (lower volatility = higher confidence for trend analysis)

        prices = [p['price'] for p in recent_prices]

        price_std = np.std(prices) / np.mean(prices) if prices else 1.0

        volatility_factor = max(0.1, 1.0 - price_std)

        

        # Volume consistency

        volumes = [p['volume'] for p in recent_prices]

        volume_consistency = 1.0 - (np.std(volumes) / np.mean(volumes)) if volumes else 0.5

        volume_consistency = max(0.1, min(1.0, volume_consistency))

        

        # Combine factors

        confidence = (data_points * 0.4 + volatility_factor * 0.3 + volume_consistency * 0.3)

        return min(1.0, max(0.1, confidence))
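
Finnhub's WebSocket documentation describes trade updates as batches, with the individual trades listed under a `data` key, whereas `process_market_data_with_llm` reads `s`, `p`, `v`, and `t` from the top level of the message. Assuming the feed follows the batched shape, a small unpacking step is needed before each trade is handed to that method. The helper `iter_trades` below is a hypothetical sketch of that step, and the sample payload is made up for illustration.

```python
import json
from typing import Dict, Iterator


def iter_trades(raw_message: str) -> Iterator[Dict]:
    """Yield one flat trade dict per entry in a batched trade message."""
    message = json.loads(raw_message)
    if message.get("type") != "trade":
        return  # pings and other control messages carry no trades
    for trade in message.get("data", []):
        yield {
            "type": "trade",
            "s": trade["s"],  # symbol
            "p": trade["p"],  # last price
            "v": trade["v"],  # volume
            "t": trade["t"],  # timestamp in milliseconds
        }


sample = json.dumps({
    "type": "trade",
    "data": [
        {"s": "AAPL", "p": 189.5, "v": 100, "t": 1700000000000},
        {"s": "AAPL", "p": 189.6, "v": 50, "t": 1700000000500},
    ],
})
trades = list(iter_trades(sample))
print(len(trades), trades[0]["p"])
```

Each yielded dict has the flat shape the collector's processing method expects, so a receive loop could call it once per trade.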



Historical market data provides the context necessary for trend analysis and pattern recognition. The system must maintain comprehensive historical databases that include daily, hourly, and minute-level price data, trading volumes, and market indicators. This historical context enables the AI to identify seasonal patterns, support and resistance levels, and long-term trends that inform investment decisions. The LLM component can analyze this historical data to identify narrative patterns and contextual relationships that traditional statistical methods might miss.
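
Minute-level and hourly history is usually derived by aggregating individual trades into open/high/low/close/volume bars. The sketch below shows one way to do that with the standard library only. It assumes ticks shaped like the entries the collector stores in `price_history` (keys `price`, `volume`, and a millisecond `timestamp`); the function name `ticks_to_bars` is hypothetical.

```python
from typing import Dict, List


def ticks_to_bars(ticks: List[Dict], bar_ms: int = 60_000) -> List[Dict]:
    """Aggregate trades into OHLCV bars that each span bar_ms milliseconds."""
    bars: Dict[int, Dict] = {}
    for tick in sorted(ticks, key=lambda t: t["timestamp"]):
        # Round the timestamp down to the start of its bar
        start = tick["timestamp"] - tick["timestamp"] % bar_ms
        bar = bars.get(start)
        if bar is None:
            bars[start] = {
                "start": start,
                "open": tick["price"],
                "high": tick["price"],
                "low": tick["price"],
                "close": tick["price"],
                "volume": tick["volume"],
            }
        else:
            bar["high"] = max(bar["high"], tick["price"])
            bar["low"] = min(bar["low"], tick["price"])
            bar["close"] = tick["price"]
            bar["volume"] += tick["volume"]
    return list(bars.values())


ticks = [
    {"price": 100.0, "volume": 10, "timestamp": 0},
    {"price": 101.0, "volume": 5, "timestamp": 30_000},
    {"price": 99.5, "volume": 20, "timestamp": 59_999},
    {"price": 100.5, "volume": 7, "timestamp": 60_000},
]
for bar in ticks_to_bars(ticks):
    print(bar)
```

Passing `bar_ms=3_600_000` produces hourly bars from the same input.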


News and sentiment data represent another crucial information source where LLMs provide exceptional value. The system must monitor financial news feeds, social media sentiment, and analyst reports to gauge market sentiment and identify potential catalysts for price movements. LLMs excel at processing unstructured text sources, extracting relevant information, understanding context and nuance, and identifying subtle sentiment indicators that traditional keyword-based approaches might miss.
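
Per-article sentiment still has to be combined into a single signal. The following sketch shows one possible aggregation, assuming each article has already been classified with a `label` (`positive`, `negative`, or `neutral`, the label set FinBERT produces) and a confidence `score`, plus an `age_hours` field. The function name, the 12-hour half-life, and the ±0.15 label thresholds are all illustrative assumptions rather than values from this article.

```python
from typing import Dict, List

LABEL_SIGN = {"positive": 1.0, "negative": -1.0, "neutral": 0.0}


def aggregate_sentiment(results: List[Dict], half_life_hours: float = 12.0) -> Dict:
    """Combine per-article sentiment into one recency-weighted score in [-1, 1]."""
    weighted_sum = 0.0
    total_weight = 0.0
    for item in results:
        # An article's weight halves every half_life_hours
        weight = 0.5 ** (item["age_hours"] / half_life_hours)
        weighted_sum += weight * LABEL_SIGN[item["label"].lower()] * item["score"]
        total_weight += weight
    score = weighted_sum / total_weight if total_weight else 0.0
    label = "positive" if score > 0.15 else "negative" if score < -0.15 else "neutral"
    return {"score": score, "label": label}


articles = [
    {"label": "positive", "score": 0.9, "age_hours": 0},
    {"label": "negative", "score": 0.8, "age_hours": 12},
]
print(aggregate_sentiment(articles))
```

The fresh positive article outweighs the older negative one, so the combined signal stays positive but is pulled toward neutral.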


Economic indicators and political developments require specialized data feeds that track government announcements, central bank decisions, economic reports, and geopolitical events. These macro-level factors often drive broad market movements and must be incorporated into the analysis framework. LLMs can interpret complex economic reports, understand policy implications, and reason about potential market impacts in ways that traditional rule-based systems cannot.


Real-Time Data Processing Engine with LLM Enhancement


The data processing engine serves as the central nervous system of the agentic AI, responsible for ingesting, cleaning, and preparing data for analysis. This component must handle high-frequency data streams while maintaining data quality and consistency across different sources. The integration of LLMs enhances this processing by adding sophisticated text understanding and contextual analysis capabilities.


The processing engine employs a streaming architecture that can handle thousands of data points per second without introducing significant latency. Apache Kafka or similar message queuing systems provide the backbone for data distribution, ensuring that all system components receive timely updates. The LLM components are designed to operate in parallel with traditional processing pipelines, providing enhanced insights without blocking critical data flows.
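
Keeping LLM work off the critical data path can be sketched with an in-process queue: the price handler records the update and enqueues an analysis request without waiting, while a background worker drains the queue. This is a minimal single-process illustration using `asyncio.Queue`, not a Kafka setup. The function names are hypothetical and `fake_llm_analysis` stands in for a real model call.

```python
import asyncio
from typing import Dict


async def fake_llm_analysis(symbol: str, price: float) -> str:
    """Stand-in for a slow LLM request."""
    await asyncio.sleep(0.05)
    return f"{symbol} analysed at {price}"


async def llm_worker(queue: asyncio.Queue, results: Dict[str, str]) -> None:
    """Drain analysis requests in the background."""
    while True:
        symbol, price = await queue.get()
        try:
            results[symbol] = await fake_llm_analysis(symbol, price)
        finally:
            queue.task_done()


def on_price_update(queue: asyncio.Queue, latest: Dict[str, float],
                    symbol: str, price: float) -> None:
    """Fast path: record the price, then enqueue analysis without waiting."""
    latest[symbol] = price
    try:
        queue.put_nowait((symbol, price))
    except asyncio.QueueFull:
        pass  # drop the analysis request rather than stall the data path


async def main() -> Dict[str, str]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    latest: Dict[str, float] = {}
    results: Dict[str, str] = {}
    worker = asyncio.create_task(llm_worker(queue, results))
    for symbol, price in [("AAPL", 189.5), ("MSFT", 410.2), ("AAPL", 189.7)]:
        on_price_update(queue, latest, symbol, price)
    await queue.join()  # wait here only so the demo can print the results
    worker.cancel()
    return results


results = asyncio.run(main())
print(results)
```

The bounded queue makes the trade-off explicit: when analysis falls behind, requests are dropped instead of delaying price handling.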



import pandas as pd

import numpy as np

import asyncio

from typing import Dict, List, Tuple

from datetime import datetime, timedelta

from transformers import pipeline

import openai


class LLMEnhancedDataProcessor:

    def __init__(self, llm_api_key: str):

        self.price_buffers = {}

        self.technical_indicators = {}

        self.anomaly_detector = AnomalyDetector()

        self.news_buffer = {}

        

        # Initialize LLM components

        self.llm_client = openai.OpenAI(api_key=llm_api_key)

        self.sentiment_analyzer = pipeline("sentiment-analysis", 

                                         model="ProsusAI/finbert")

        self.summarizer = pipeline("summarization", 

                                 model="facebook/bart-large-cnn")

        self.ner_pipeline = pipeline("ner", 

                                   model="dbmdz/bert-large-cased-finetuned-conll03-english")

        

        # LLM processing queue for async operations

        self.llm_queue = asyncio.Queue()

        self.llm_results = {}

        

    async def process_price_update_with_llm(self, symbol: str, price_data: Dict) -> Dict:

        """Process new price data with LLM-enhanced analysis"""

        if symbol not in self.price_buffers:

            self.price_buffers[symbol] = []

        

        # Add new price point to buffer

        self.price_buffers[symbol].append(price_data)

        

        # Maintain rolling window of recent prices

        if len(self.price_buffers[symbol]) > 1000:

            self.price_buffers[symbol] = self.price_buffers[symbol][-1000:]

        

        # Calculate traditional technical indicators

        indicators = self.calculate_technical_indicators(symbol)

        

        # Detect anomalies in price movement

        anomaly_score = self.anomaly_detector.detect_anomaly(

            symbol, price_data['price']

        )

        

        # LLM-enhanced pattern analysis

        llm_analysis = await self.analyze_patterns_with_llm(symbol, indicators, anomaly_score)

        

        # Combine traditional and LLM analysis

        enhanced_analysis = {

            'symbol': symbol,

            'current_price': price_data['price'],

            'technical_indicators': indicators,

            'anomaly_score': anomaly_score,

            'llm_insights': llm_analysis,

            'timestamp': datetime.now()

        }

        

        return enhanced_analysis

    

    async def analyze_patterns_with_llm(self, symbol: str, indicators: Dict, anomaly_score: float) -> Dict:

        """Use LLM to analyze technical patterns and provide insights"""

        

        # Prepare technical analysis context

        technical_context = self.prepare_technical_context(symbol, indicators, anomaly_score)

        

        # Create prompt for LLM analysis

        prompt = f"""

        Analyze the following technical indicators for {symbol}:

        

        {technical_context}

        

        Anomaly Score: {anomaly_score:.3f} (0=normal, 1=highly unusual)

        

        Please provide:

        1. Overall technical outlook (bullish/bearish/neutral)

        2. Key support and resistance levels

        3. Notable pattern formations

        4. Risk factors to monitor

        5. Confidence level in your analysis (0-100%)

        

        Keep response concise and actionable for trading decisions.

        """

        

        try:

            # Get LLM analysis

            llm_response = await self.get_llm_analysis(prompt)

            

            # Extract structured insights from LLM response

            structured_insights = self.extract_structured_insights(llm_response)

            

            # Combine with sentiment analysis

            sentiment_result = self.sentiment_analyzer(llm_response[:512])

            

            return {

                'raw_analysis': llm_response,

                'structured_insights': structured_insights,

                'technical_sentiment': sentiment_result[0]['label'],

                'technical_sentiment_score': sentiment_result[0]['score'],

                'analysis_timestamp': datetime.now()

            }

            

        except Exception as e:

            print(f"LLM pattern analysis error: {e}")

            return {

                'raw_analysis': 'Analysis unavailable due to processing error',

                'structured_insights': {},

                'technical_sentiment': 'neutral',  # lowercase, matching the labels FinBERT returns

                'technical_sentiment_score': 0.5,

                'analysis_timestamp': datetime.now()

            }

    

    async def process_news_with_llm(self, news_articles: List[Dict], symbol: str) -> Dict:

        """Process news articles with LLM for enhanced understanding"""

        

        if not news_articles:

            return {'summary': 'No recent news available', 'sentiment': 'neutral', 'key_themes': []}

        

        # Combine recent articles for analysis

        combined_text = self.combine_news_articles(news_articles)

        

        # Generate summary using LLM

        news_summary = await self.generate_news_summary(combined_text, symbol)

        

        # Extract key entities and themes

        entities = self.extract_entities_from_news(combined_text)

        key_themes = await self.extract_key_themes(combined_text, symbol)

        

        # Analyze overall sentiment

        overall_sentiment = self.analyze_news_sentiment(news_articles)

        

        # Assess market impact potential

        impact_assessment = await self.assess_news_impact(news_summary, symbol)

        

        return {

            'summary': news_summary,

            'sentiment': overall_sentiment,

            'key_themes': key_themes,

            'entities': entities,

            'impact_assessment': impact_assessment,

            'article_count': len(news_articles),

            'analysis_timestamp': datetime.now()

        }

    

    async def generate_news_summary(self, combined_text: str, symbol: str) -> str:

        """Generate comprehensive news summary using LLM"""

        

        # If text is too long, condense it first with the BART summarization pipeline

        # (note that only the first 2000 characters are passed to the summarizer)

        if len(combined_text) > 2000:

            extractive_summary = self.summarizer(combined_text[:2000], 

                                                max_length=300, 

                                                min_length=100, 

                                                do_sample=False)[0]['summary_text']

        else:

            extractive_summary = combined_text

        

        # Use LLM for abstractive summarization with financial focus

        prompt = f"""

        Summarize the following news content related to {symbol}, focusing on:

        1. Key financial developments

        2. Market-moving events

        3. Potential impact on stock price

        4. Important dates or deadlines

        

        News content:

        {extractive_summary}

        

        Provide a concise summary highlighting the most important information for investment decisions.

        """

        

        try:

            summary = await self.get_llm_analysis(prompt)

            return summary

        except Exception as e:

            print(f"News summarization error: {e}")

            return extractive_summary[:500] + "..." if len(extractive_summary) > 500 else extractive_summary

    

    async def extract_key_themes(self, text: str, symbol: str) -> List[str]:

        """Extract key themes from news using LLM"""

        

        prompt = f"""

        Extract the main themes and topics from the following news content about {symbol}.

        Focus on themes that could impact stock performance such as:

        - Financial performance

        - Product launches

        - Regulatory issues

        - Market expansion

        - Management changes

        - Industry trends

        

        News content:

        {text[:1500]}

        

        Return a list of 3-7 key themes, each as a brief phrase.

        """

        

        try:

            themes_response = await self.get_llm_analysis(prompt)

            # Parse themes from response

            themes = self.parse_themes_from_response(themes_response)

            return themes

        except Exception as e:

            print(f"Theme extraction error: {e}")

            return ['General market news']

    

    async def assess_news_impact(self, news_summary: str, symbol: str) -> Dict:

        """Assess potential market impact of news using LLM"""

        

        prompt = f"""

        Assess the potential market impact of the following news summary for {symbol}:

        

        {news_summary}

        

        Provide assessment on:

        1. Impact magnitude (Low/Medium/High)

        2. Impact direction (Positive/Negative/Neutral)

        3. Time horizon (Immediate/Short-term/Long-term)

        4. Confidence level (0-100%)

        5. Key risk factors

        

        Format as structured analysis suitable for algorithmic processing.

        """

        

        try:

            impact_response = await self.get_llm_analysis(prompt)

            structured_impact = self.parse_impact_assessment(impact_response)

            return structured_impact

        except Exception as e:

            print(f"Impact assessment error: {e}")

            return {

                'magnitude': 'Medium',

                'direction': 'Neutral',

                'time_horizon': 'Short-term',

                'confidence': 50,

                'risk_factors': ['Analysis unavailable']

            }

    

    async def get_llm_analysis(self, prompt: str) -> str:

        """Get analysis from LLM with error handling and retries"""

        max_retries = 3

        for attempt in range(max_retries):

            try:

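                # The client call is synchronous, so run it in a worker thread (Python 3.9+) to keep the event loop free

                response = await asyncio.to_thread(

                    self.llm_client.chat.completions.create,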

                    model="gpt-4",

                    messages=[

                        {"role": "system", "content": "You are a senior financial analyst with expertise in technical analysis and market interpretation."},

                        {"role": "user", "content": prompt}

                    ],

                    max_tokens=500,

                    temperature=0.3

                )

                return response.choices[0].message.content

            except Exception as e:

                if attempt == max_retries - 1:

                    raise  # re-raise the last error with its original traceback

                await asyncio.sleep(2 ** attempt)  # Exponential backoff

    

    def calculate_technical_indicators(self, symbol: str) -> Dict:

        """Calculate various technical indicators for the symbol"""

        if len(self.price_buffers[symbol]) < 20:

            return {}

        

        prices = [point['price'] for point in self.price_buffers[symbol]]

        volumes = [point['volume'] for point in self.price_buffers[symbol]]

        

        # Simple Moving Averages

        sma_20 = np.mean(prices[-20:])

        sma_50 = np.mean(prices[-50:]) if len(prices) >= 50 else None

        

        # Exponential Moving Average

        ema_12 = self.calculate_ema(prices, 12)

        ema_26 = self.calculate_ema(prices, 26)

        

        # MACD

        macd = ema_12 - ema_26 if ema_12 is not None and ema_26 is not None else None

        

        # Relative Strength Index

        rsi = self.calculate_rsi(prices)

        

        # Bollinger Bands

        bb_upper, bb_lower = self.calculate_bollinger_bands(prices)

        

        return {

            'sma_20': sma_20,

            'sma_50': sma_50,

            'ema_12': ema_12,

            'ema_26': ema_26,

            'macd': macd,

            'rsi': rsi,

            'bollinger_upper': bb_upper,

            'bollinger_lower': bb_lower,

            'volume_avg': np.mean(volumes[-20:])

        }

    

    def prepare_technical_context(self, symbol: str, indicators: Dict, anomaly_score: float) -> str:

        """Prepare technical analysis context for LLM"""

        context_lines = []

        

        if not indicators:

            return "Insufficient data for technical analysis."

        

        context_lines.append(f"Technical Indicators for {symbol}:")

        

        # Moving averages

        if indicators.get('sma_20') and indicators.get('sma_50'):

            ma_trend = "Bullish" if indicators['sma_20'] > indicators['sma_50'] else "Bearish"

            context_lines.append(f"Moving Average Trend: {ma_trend}")

            context_lines.append(f"SMA 20: ${indicators['sma_20']:.2f}")

            context_lines.append(f"SMA 50: ${indicators['sma_50']:.2f}")

        

        # RSI

        if indicators.get('rsi') is not None:  # an RSI of 0 is a valid reading

            rsi = indicators['rsi']

            rsi_condition = "Overbought" if rsi > 70 else "Oversold" if rsi < 30 else "Neutral"

            context_lines.append(f"RSI: {rsi:.1f} ({rsi_condition})")

        

        # MACD

        if indicators.get('macd') is not None:  # a MACD of exactly 0 is a valid reading

            macd_signal = "Bullish" if indicators['macd'] > 0 else "Bearish"

            context_lines.append(f"MACD: {indicators['macd']:.3f} ({macd_signal})")

        

        # Bollinger Bands

        if indicators.get('bollinger_upper') and indicators.get('bollinger_lower'):

            current_price = self.price_buffers[symbol][-1]['price']

            band_width = indicators['bollinger_upper'] - indicators['bollinger_lower']

            # Skip the position when the bands have zero width (flat prices) to avoid dividing by zero

            if band_width > 0:

                bb_position = ((current_price - indicators['bollinger_lower']) / band_width) * 100

                context_lines.append(f"Bollinger Band Position: {bb_position:.1f}%")

        

        # Volume analysis

        if indicators.get('volume_avg'):

            recent_volume = self.price_buffers[symbol][-1]['volume']

            volume_ratio = recent_volume / indicators['volume_avg']

            volume_description = "High" if volume_ratio > 1.5 else "Low" if volume_ratio < 0.5 else "Normal"

            context_lines.append(f"Volume: {volume_description} ({volume_ratio:.1f}x average)")

        

        return "\n".join(context_lines)
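
`calculate_technical_indicators` calls `calculate_ema`, `calculate_rsi`, and `calculate_bollinger_bands`, which are not shown in this section. The sketch below gives one plausible set of implementations as standalone functions, inferred only from how they are called above (an EMA that may return `None`, and Bollinger Bands returned as an upper/lower pair). The default periods and the choice of a simple-average RSI instead of Wilder's smoothing are assumptions.

```python
import statistics
from typing import List, Optional, Tuple


def calculate_ema(prices: List[float], period: int) -> Optional[float]:
    """Exponential moving average, seeded with the SMA of the first `period` prices."""
    if len(prices) < period:
        return None
    alpha = 2.0 / (period + 1)
    ema = sum(prices[:period]) / period
    for price in prices[period:]:
        ema = alpha * price + (1 - alpha) * ema
    return ema


def calculate_rsi(prices: List[float], period: int = 14) -> Optional[float]:
    """RSI from simple averages of gains and losses over the last `period` changes."""
    if len(prices) < period + 1:
        return None
    changes = [b - a for a, b in zip(prices[-period - 1:-1], prices[-period:])]
    avg_gain = sum(c for c in changes if c > 0) / period
    avg_loss = sum(-c for c in changes if c < 0) / period
    if avg_loss == 0:
        # No losses in the window: fully overbought, or neutral if prices were flat
        return 100.0 if avg_gain > 0 else 50.0
    rs = avg_gain / avg_loss
    return 100.0 - 100.0 / (1.0 + rs)


def calculate_bollinger_bands(prices: List[float], period: int = 20,
                              num_std: float = 2.0) -> Tuple[Optional[float], Optional[float]]:
    """Upper and lower bands: SMA plus/minus num_std population standard deviations."""
    if len(prices) < period:
        return None, None
    window = prices[-period:]
    middle = sum(window) / period
    spread = num_std * statistics.pstdev(window)
    return middle + spread, middle - spread


prices = [100.0 + (i % 5) for i in range(30)]
print(calculate_ema(prices, 12), calculate_rsi(prices), calculate_bollinger_bands(prices))
```

Inside the class these would be methods taking `self`; they are written as plain functions here so the sketch runs on its own.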



Data quality assurance represents a critical function within the processing engine. Financial data can contain errors, gaps, or inconsistencies that could lead to incorrect analysis. The system implements multiple validation layers that check for data completeness, detect outliers, and cross-reference information across multiple sources to ensure accuracy. LLMs can assist in this process by identifying inconsistencies in textual data and flagging potential data quality issues based on contextual understanding.
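
The three validation layers mentioned above (completeness, outlier detection, and cross-source comparison) can be sketched with the standard library. The function names and thresholds below are hypothetical. The outlier test uses the median absolute deviation, which is one common robust choice and not necessarily what a production system would use.

```python
import statistics
from typing import Dict, List


def is_outlier(price: float, recent: List[float], threshold: float = 5.0) -> bool:
    """Flag a price that lies far from the recent median, measured in MAD units."""
    if len(recent) < 5:
        return False  # too little history to judge
    median = statistics.median(recent)
    mad = statistics.median([abs(p - median) for p in recent])
    if mad == 0:
        return False  # flat history gives no scale to judge against
    return abs(price - median) / mad > threshold


def sources_agree(quotes: Dict[str, float], tolerance_pct: float = 0.5) -> bool:
    """Check that quotes from several providers lie within tolerance_pct of their median."""
    median = statistics.median(quotes.values())
    return all(abs(q - median) / median * 100 <= tolerance_pct for q in quotes.values())


def validate_tick(tick: Dict, recent: List[float]) -> List[str]:
    """Return a list of problems; an empty list means the tick passed."""
    problems = []
    for key in ("price", "volume", "timestamp"):
        if tick.get(key) is None:
            problems.append(f"missing {key}")
    if tick.get("price") is not None:
        if tick["price"] <= 0:
            problems.append("non-positive price")
        elif is_outlier(tick["price"], recent):
            problems.append("price outlier")
    return problems


recent = [100.0, 100.2, 99.9, 100.1, 100.3, 99.8]
print(validate_tick({"price": 150.0, "volume": 10, "timestamp": 1}, recent))
print(sources_agree({"feed_a": 100.0, "feed_b": 100.2, "feed_c": 100.1}))
```

Returning a list of problems rather than a boolean lets the caller decide whether to drop, repair, or merely log a suspicious tick.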


The processing engine also handles data normalization and standardization across different sources. Market data from various providers may use different formats, time zones, or conventions. The system must reconcile these differences to create a unified data model that supports consistent analysis. LLMs help in this process by understanding and standardizing textual data formats and extracting structured information from unstructured sources.
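
As an illustration of reconciling provider conventions, the sketch below maps two made-up record formats onto one unified shape with UTC timestamps. Both provider formats and all field names are hypothetical; the first resembles the short-key, epoch-millisecond style used by the WebSocket feed earlier in this article.

```python
from datetime import datetime, timezone
from typing import Dict


def normalize_epoch_ms(record: Dict) -> Dict:
    """Provider A (hypothetical): epoch milliseconds and short field names."""
    return {
        "symbol": record["s"].upper(),
        "price": float(record["p"]),
        "volume": float(record["v"]),
        "time_utc": datetime.fromtimestamp(record["t"] / 1000, tz=timezone.utc),
    }


def normalize_iso(record: Dict) -> Dict:
    """Provider B (hypothetical): ISO 8601 local time with a UTC offset, numbers as text."""
    local_time = datetime.fromisoformat(record["time"])
    return {
        "symbol": record["ticker"].upper(),
        "price": float(record["last"]),
        "volume": float(record["size"]),
        "time_utc": local_time.astimezone(timezone.utc),
    }


a = normalize_epoch_ms({"s": "aapl", "p": 189.5, "v": 100, "t": 1700000000000})
b = normalize_iso({"ticker": "AAPL", "last": "189.50", "size": "100",
                   "time": "2023-11-14T17:13:20-05:00"})
print(a)
print(a == b)
```

Once both feeds produce the same record shape, downstream analysis can compare or merge them without knowing which provider a tick came from.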


Historical Data Analysis and Pattern Recognition with LLM Enhancement


Historical data analysis forms the foundation for understanding market behavior and identifying patterns that can inform future predictions. The agentic AI must be capable of analyzing years or decades of historical data to identify recurring patterns, seasonal trends, and long-term market cycles. The integration of LLMs enhances this analysis by providing sophisticated pattern interpretation and the ability to understand historical context and narrative patterns.


Pattern recognition algorithms scan historical price data to identify technical patterns such as head-and-shoulders formations, triangles, flags, and other chart patterns that traders commonly use for decision-making. The system employs machine learning techniques to detect these patterns automatically and to assess their reliability based on how they performed historically. LLMs add value by interpreting these patterns in context and providing nuanced analysis of their significance.
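
Before any classifier is involved, many chart patterns can be described in terms of swing highs and swing lows. The sketch below is a simple rule-based heuristic for locating those points and testing for a double top. It is an illustrative alternative, not the classifier-based approach used in the code that follows, and the function names, the `span` of 2, and the 1% tolerance are assumptions.

```python
from typing import List, Tuple


def find_swing_points(prices: List[float], span: int = 2) -> Tuple[List[int], List[int]]:
    """Return the indices of local highs and local lows.

    A point is a swing high when it is strictly greater than the `span`
    prices on either side; swing lows are the mirror image.
    """
    highs, lows = [], []
    for i in range(span, len(prices) - span):
        neighbours = prices[i - span:i] + prices[i + 1:i + span + 1]
        if all(prices[i] > p for p in neighbours):
            highs.append(i)
        elif all(prices[i] < p for p in neighbours):
            lows.append(i)
    return highs, lows


def looks_like_double_top(prices: List[float], span: int = 2,
                          tolerance_pct: float = 1.0) -> bool:
    """True if the last two swing highs are at similar levels with a swing low between them."""
    highs, lows = find_swing_points(prices, span)
    if len(highs) < 2:
        return False
    first, second = highs[-2], highs[-1]
    similar = abs(prices[first] - prices[second]) / prices[first] * 100 <= tolerance_pct
    has_trough = any(first < low < second for low in lows)
    return similar and has_trough


prices = [100.0, 102.0, 105.0, 103.0, 101.0, 99.0, 101.0, 103.0, 105.5, 103.0, 100.0]
print(find_swing_points(prices))
print(looks_like_double_top(prices))
```

Swing points found this way could also serve as additional input features for a learned classifier, or as structured context in an LLM prompt.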



import numpy as np

from sklearn.ensemble import RandomForestClassifier

from sklearn.preprocessing import StandardScaler

from typing import List, Dict, Tuple

import openai

import asyncio


class LLMEnhancedPatternRecognizer:

    def __init__(self, llm_api_key: str):

        self.pattern_classifier = RandomForestClassifier(n_estimators=100)

        self.scaler = StandardScaler()

        self.trained = False

        

        # Initialize LLM components

        self.llm_client = openai.OpenAI(api_key=llm_api_key)

        self.pattern_descriptions = {}

        self.historical_context = {}

        

    def extract_features(self, price_series: List[float], window_size: int = 20) -> np.ndarray:

        """Extract features from price series for pattern recognition"""

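        # The fixed 5/10/20-period lookbacks below need at least 20 points,

        # regardless of the window_size passed in

        if len(price_series) < max(window_size, 20):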

            return np.array([])

        

        features = []

        

        # Price-based features

        current_price = price_series[-1]

        price_change = (current_price - price_series[-2]) / price_series[-2]

        

        # Moving averages

        sma_5 = np.mean(price_series[-5:])

        sma_10 = np.mean(price_series[-10:])

        sma_20 = np.mean(price_series[-20:])

        

        # Volatility measures

        returns = [(price_series[i] - price_series[i-1]) / price_series[i-1] 

                  for i in range(1, len(price_series))]

        volatility = np.std(returns[-20:])

        

        # Momentum indicators

        momentum_5 = (current_price - price_series[-6]) / price_series[-6]

        momentum_10 = (current_price - price_series[-11]) / price_series[-11]

        

        # Support and resistance levels

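        recent_high = max(price_series[-20:])

        recent_low = min(price_series[-20:])

        high_low_range = recent_high - recent_low

        # A flat window has no range; report mid-range instead of dividing by zero

        price_position = ((current_price - recent_low) / high_low_range

                          if high_low_range > 0 else 0.5)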

        

        features.extend([

            price_change, sma_5/current_price, sma_10/current_price, 

            sma_20/current_price, volatility, momentum_5, momentum_10,

            price_position

        ])

        

        return np.array(features)

    

    async def identify_chart_patterns_with_llm(self, symbol: str, price_series: List[float], 

                                             timestamps: List[datetime] = None) -> Dict[str, Dict]:

        """Identify common chart patterns with LLM interpretation"""

        patterns = {}

        

        if len(price_series) < 50:

            return patterns

        

        # Traditional pattern detection

        traditional_patterns = self.identify_traditional_patterns(price_series)

        

        # LLM-enhanced pattern analysis

        for pattern_name, confidence in traditional_patterns.items():

            if confidence > 0.3:  # Only analyze patterns with reasonable confidence

                llm_analysis = await self.analyze_pattern_with_llm(

                    symbol, pattern_name, price_series, confidence, timestamps

                )

                

                patterns[pattern_name] = {

                    'confidence': confidence,

                    'llm_analysis': llm_analysis,

                    'trading_implications': llm_analysis.get('trading_implications', {}),

                    'historical_performance': await self.get_historical_pattern_performance(

                        symbol, pattern_name

                    )

                }

        

        return patterns

    

    def identify_traditional_patterns(self, prices: List[float]) -> Dict[str, float]:

        """Identify traditional chart patterns with confidence scores"""

        patterns = {}

        

        # Head and Shoulders pattern detection

        patterns['head_shoulders'] = self.detect_head_shoulders(prices)

        

        # Double top/bottom detection

        patterns['double_top'] = self.detect_double_top(prices)

        patterns['double_bottom'] = self.detect_double_bottom(prices)

        

        # Triangle patterns

        patterns['ascending_triangle'] = self.detect_ascending_triangle(prices)

        patterns['descending_triangle'] = self.detect_descending_triangle(prices)

        

        # Flag and pennant patterns

        patterns['bull_flag'] = self.detect_bull_flag(prices)

        patterns['bear_flag'] = self.detect_bear_flag(prices)

        

        # Cup and handle

        patterns['cup_and_handle'] = self.detect_cup_and_handle(prices)

        

        # Wedge patterns

        patterns['rising_wedge'] = self.detect_rising_wedge(prices)

        patterns['falling_wedge'] = self.detect_falling_wedge(prices)

        

        return patterns

    

    async def analyze_pattern_with_llm(self, symbol: str, pattern_name: str, 

                                     price_series: List[float], confidence: float,

                                     timestamps: List[datetime] = None) -> Dict:

        """Use LLM to provide detailed pattern analysis and interpretation"""

        

        # Prepare pattern context

        pattern_context = self.prepare_pattern_context(

            symbol, pattern_name, price_series, confidence, timestamps

        )

        

        # Create comprehensive analysis prompt

        prompt = f"""

        Analyze the {pattern_name} pattern detected in {symbol} with {confidence:.1%} confidence.

        

        Pattern Context:

        {pattern_context}

        

        Please provide detailed analysis including:

        

        1. Pattern Validity Assessment:

           - Is this a textbook example of the pattern?

           - What factors support or weaken the pattern formation?

           - Reliability score (0-100%)

        

        2. Trading Implications:

           - Expected price direction and magnitude

           - Key breakout/breakdown levels

           - Suggested entry and exit points

           - Stop-loss recommendations

        

        3. Risk Assessment:

           - Probability of pattern failure

           - Alternative scenarios to consider

           - Risk management considerations

        

        4. Market Context:

           - How does current market environment affect pattern reliability?

           - Sector-specific considerations for {symbol}

           - Volume confirmation requirements

        

        5. Historical Performance:

           - How has this pattern performed historically for similar stocks?

           - Success rate expectations

           - Average price targets achieved

        

        Provide actionable insights for trading decisions.

        """

        

        try:

            llm_response = await self.get_llm_pattern_analysis(prompt)

            structured_analysis = self.parse_pattern_analysis(llm_response)

            

            return {

                'raw_analysis': llm_response,

                'validity_assessment': structured_analysis.get('validity', {}),

                'trading_implications': structured_analysis.get('trading', {}),

                'risk_assessment': structured_analysis.get('risk', {}),

                'market_context': structured_analysis.get('context', {}),

                'historical_performance': structured_analysis.get('historical', {}),

                'analysis_timestamp': datetime.now()

            }

            

        except Exception as e:

            print(f"LLM pattern analysis error for {pattern_name}: {e}")

            return {

                'raw_analysis': f'Analysis unavailable for {pattern_name}',

                'validity_assessment': {'reliability_score': confidence * 100},

                'trading_implications': {'direction': 'uncertain'},

                'risk_assessment': {'failure_probability': 'unknown'},

                'analysis_timestamp': datetime.now()

            }

    

    async def analyze_seasonal_patterns_with_llm(self, symbol: str, 

                                               historical_data: Dict) -> Dict:

        """Analyze seasonal patterns with LLM interpretation"""

        

        seasonal_analysis = self.calculate_seasonal_statistics(historical_data)

        

        # Prepare seasonal context for LLM

        seasonal_context = self.prepare_seasonal_context(symbol, seasonal_analysis)

        

        prompt = f"""

        Analyze the seasonal patterns for {symbol} based on historical data:

        

        {seasonal_context}

        

        Provide insights on:

        1. Strongest seasonal trends and their likely causes

        2. Best and worst months/quarters for performance

        3. Seasonal trading strategies

        4. Risk factors that could disrupt seasonal patterns

        5. Current seasonal positioning and recommendations

        

        Consider industry-specific factors that might drive these patterns.

        """

        

        try:

            llm_analysis = await self.get_llm_pattern_analysis(prompt)

            

            return {

                'seasonal_statistics': seasonal_analysis,

                'llm_interpretation': llm_analysis,

                'trading_calendar': self.extract_trading_calendar(llm_analysis),

                'risk_factors': self.extract_seasonal_risks(llm_analysis),

                'current_recommendation': self.extract_current_seasonal_recommendation(llm_analysis)

            }

            

        except Exception as e:

            print(f"Seasonal analysis error: {e}")

            return {

                'seasonal_statistics': seasonal_analysis,

                'llm_interpretation': 'Analysis unavailable',

                'analysis_timestamp': datetime.now()

            }

    

    async def get_historical_pattern_performance(self, symbol: str, pattern_name: str) -> Dict:

        """Get historical performance data for specific pattern"""

        

        # A production system would query a historical database of detected patterns.

        # For demonstration, the LLM supplies general background on the pattern; treat

        # its figures as qualitative context, not as measured statistics.

        

        prompt = f"""

        Provide historical performance statistics for {pattern_name} patterns in stocks similar to {symbol}.

        

        Include:

        1. Success rate (percentage of patterns that achieve target)

        2. Average price movement magnitude

        3. Typical timeframe for pattern completion

        4. Common failure modes

        5. Market conditions that favor this pattern

        

        Base your response on established technical analysis research and market observations.

        """

        

        try:

            performance_analysis = await self.get_llm_pattern_analysis(prompt)

            structured_performance = self.parse_performance_analysis(performance_analysis)

            

            return {

                'success_rate': structured_performance.get('success_rate', 'Unknown'),

                'average_movement': structured_performance.get('average_movement', 'Unknown'),

                'timeframe': structured_performance.get('timeframe', 'Unknown'),

                'failure_modes': structured_performance.get('failure_modes', []),

                'favorable_conditions': structured_performance.get('favorable_conditions', []),

                'raw_analysis': performance_analysis

            }

            

        except Exception as e:

            print(f"Historical performance analysis error: {e}")

            return {'analysis': 'Historical data unavailable'}

    

    def prepare_pattern_context(self, symbol: str, pattern_name: str, 

                              price_series: List[float], confidence: float,

                              timestamps: List[datetime] = None) -> str:

        """Prepare detailed context for pattern analysis"""

        

        context_lines = []

        context_lines.append(f"Symbol: {symbol}")

        context_lines.append(f"Pattern: {pattern_name}")

        context_lines.append(f"Detection Confidence: {confidence:.1%}")

        context_lines.append("")

        

        # Price statistics

        current_price = price_series[-1]

        price_range = max(price_series) - min(price_series)

        volatility = np.std(price_series) / np.mean(price_series)

        

        context_lines.append("Price Statistics:")

        context_lines.append(f"Current Price: ${current_price:.2f}")

        context_lines.append(f"Range: ${price_range:.2f}")

        context_lines.append(f"Volatility: {volatility:.1%}")

        context_lines.append("")

        

        # Recent price action (last 20 points)

        context_lines.append("Recent Price Action:")

        recent_prices = price_series[-20:]

        # Use timestamps only when they align one-to-one with the price series

        recent_times = (timestamps[-20:]

                        if timestamps and len(timestamps) == len(price_series) else None)

        for i in range(0, len(recent_prices), 5):  # Every 5th point to keep it concise

            if recent_times:

                timestamp_str = recent_times[i].strftime("%Y-%m-%d")

                context_lines.append(f"{timestamp_str}: ${recent_prices[i]:.2f}")

            else:

                context_lines.append(f"Point {i}: ${recent_prices[i]:.2f}")

        

        # Key levels

        support_level = min(price_series[-50:]) if len(price_series) >= 50 else min(price_series)

        resistance_level = max(price_series[-50:]) if len(price_series) >= 50 else max(price_series)

        

        context_lines.append("")

        context_lines.append("Key Levels:")

        context_lines.append(f"Support: ${support_level:.2f}")

        context_lines.append(f"Resistance: ${resistance_level:.2f}")

        

        return "\n".join(context_lines)

    

    async def get_llm_pattern_analysis(self, prompt: str) -> str:

        """Get pattern analysis from LLM with specialized system prompt"""

        

        system_prompt = """You are a senior technical analyst with 20+ years of experience in pattern recognition and trading. 

        You have deep expertise in chart patterns, market psychology, and risk management. 

        Provide detailed, actionable analysis based on established technical analysis principles. 

        Be specific about price levels, timeframes, and risk parameters."""

        

        # The synchronous OpenAI client blocks, so run the call in a worker thread

        # to keep the event loop responsive; errors propagate to the caller.

        response = await asyncio.to_thread(

            self.llm_client.chat.completions.create,

            model="gpt-4",

            messages=[

                {"role": "system", "content": system_prompt},

                {"role": "user", "content": prompt}

            ],

            max_tokens=800,

            temperature=0.2  # Lower temperature for more consistent analysis

        )

        return response.choices[0].message.content

    

    def detect_head_shoulders(self, prices: List[float]) -> float:

        """Detect head and shoulders pattern with confidence score"""

        if len(prices) < 30:

            return 0.0

        

        # Find local maxima

        peaks = []

        for i in range(2, len(prices) - 2):

            if (prices[i] > prices[i-1] and prices[i] > prices[i+1] and

                prices[i] > prices[i-2] and prices[i] > prices[i+2]):

                peaks.append((i, prices[i]))

        

        if len(peaks) < 3:

            return 0.0

        

        # Check for head and shoulders formation

        for i in range(len(peaks) - 2):

            left_shoulder = peaks[i]

            head = peaks[i + 1]

            right_shoulder = peaks[i + 2]

            

            # Head should be higher than both shoulders

            if (head[1] > left_shoulder[1] and head[1] > right_shoulder[1]):

                # Shoulders should be approximately equal

                shoulder_ratio = min(left_shoulder[1], right_shoulder[1]) / max(left_shoulder[1], right_shoulder[1])

                

                if shoulder_ratio > 0.95:  # Within 5% of each other

                    return min(1.0, shoulder_ratio + 0.05)

        

        return 0.0
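
The listing above calls detector methods such as `detect_double_top` that are not shown. As a rough idea of what one could look like, here is a standalone sketch that reuses the same five-point peak test as `detect_head_shoulders`. The `tolerance` and `min_dip` parameters and the confidence formula are assumptions made for illustration, not the author's implementation.

```python
from typing import List


def detect_double_top(prices: List[float], tolerance: float = 0.03,
                      min_dip: float = 0.03) -> float:
    """Return a confidence in [0, 1] that the series contains a double top.

    Two consecutive peaks qualify when their heights differ by at most
    `tolerance` and the trough between them sits at least `min_dip` below
    the higher peak (both expressed as fractions of that peak).
    """
    if tolerance <= 0:
        raise ValueError("tolerance must be positive")
    if len(prices) < 30:
        return 0.0

    # Local maxima: strictly higher than the two neighbours on each side
    peaks = [i for i in range(2, len(prices) - 2)
             if prices[i] > max(prices[i - 2], prices[i - 1],
                                prices[i + 1], prices[i + 2])]

    best = 0.0
    for left, right in zip(peaks, peaks[1:]):
        top = max(prices[left], prices[right])
        height_gap = abs(prices[left] - prices[right]) / top
        dip = (top - min(prices[left:right + 1])) / top
        if height_gap <= tolerance and dip >= min_dip:
            # 1.0 for identical peaks, falling to 0.5 at the tolerance limit
            best = max(best, 1.0 - 0.5 * height_gap / tolerance)
    return best


if __name__ == "__main__":
    rise = [100.0 + i for i in range(11)]   # 100 .. 110
    fall = [109.0 - i for i in range(10)]   # 109 .. 100
    again = [101.0 + i for i in range(10)]  # 101 .. 110
    print(detect_double_top(rise + fall + again + fall))
```

A fuller detector would also confirm the break below the trough (the neckline) and check volume, which this sketch leaves out.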



Seasonal analysis examines how stocks and market indices perform during different times of the year, months, or even days of the week. Many stocks exhibit seasonal patterns due to factors such as earnings seasons, holiday shopping periods, or tax-related selling. The AI system identifies these patterns and incorporates them into its predictive models. LLMs enhance this analysis by understanding the underlying business and economic reasons for seasonal patterns and providing contextual interpretation.
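
The statistical half of this analysis can be sketched as a per-month summary of historical returns. The function below is a hypothetical stand-in for a helper like `calculate_seasonal_statistics`; it assumes one closing price per month-end, supplied oldest first.

```python
from collections import defaultdict
from datetime import date
from statistics import mean
from typing import Dict, List, Tuple


def monthly_seasonality(closes: List[Tuple[date, float]]) -> Dict[int, Dict[str, float]]:
    """Average return and win rate per calendar month.

    `closes` holds (date, closing price) pairs, one per month-end, oldest
    first. Each return is attributed to the month in which it ends.
    """
    by_month: Dict[int, List[float]] = defaultdict(list)
    for (_, prev), (day, curr) in zip(closes, closes[1:]):
        by_month[day.month].append((curr - prev) / prev)

    return {
        month: {
            "mean_return": mean(rets),
            "win_rate": sum(r > 0 for r in rets) / len(rets),
            "observations": len(rets),
        }
        for month, rets in sorted(by_month.items())
    }


if __name__ == "__main__":
    history = [(date(2024, 1, 31), 100.0),
               (date(2024, 2, 29), 110.0),
               (date(2024, 3, 29), 99.0)]
    for month, stats in monthly_seasonality(history).items():
        print(month, stats)
```

The `observations` count matters when these numbers are handed to an LLM for interpretation: a month with only a handful of samples should not be presented as a reliable seasonal effect.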


Correlation analysis examines relationships between different stocks, sectors, and market indices. Understanding these correlations helps the system predict how movements in one asset might affect others and identify diversification opportunities or risk concentrations in investment portfolios. LLMs can interpret correlation patterns in the context of business relationships, supply chains, and market dynamics.
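
A minimal sketch of the numerical side: compute Pearson correlations on simple returns and report the strongly related pairs. The function names, the example tickers, and the 0.8 threshold are illustrative assumptions; correlations are taken on returns rather than price levels because trending prices tend to look correlated even when the assets are unrelated.

```python
from itertools import combinations
from math import sqrt
from typing import Dict, List, Tuple


def simple_returns(prices: List[float]) -> List[float]:
    """Period-over-period percentage changes."""
    return [(b - a) / a for a, b in zip(prices, prices[1:])]


def pearson(x: List[float], y: List[float]) -> float:
    """Pearson correlation of two equal-length, non-constant series."""
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    cov = sum((a - mx) * (b - my) for a, b in zip(x, y))
    var_x = sum((a - mx) ** 2 for a in x)
    var_y = sum((b - my) ** 2 for b in y)
    return cov / sqrt(var_x * var_y)


def correlated_pairs(prices: Dict[str, List[float]],
                     threshold: float = 0.8) -> List[Tuple[str, str, float]]:
    """Pairs whose return correlation has magnitude >= threshold.

    Strong positive values point to risk concentration; strong negative
    values point to potential hedges.
    """
    returns = {s: simple_returns(p) for s, p in prices.items()}
    pairs = []
    for a, b in combinations(sorted(returns), 2):
        rho = pearson(returns[a], returns[b])
        if abs(rho) >= threshold:
            pairs.append((a, b, rho))
    return pairs


if __name__ == "__main__":
    data = {
        "AAA": [100, 102, 101, 105, 107],
        "BBB": [50, 51, 50.5, 52.5, 53.5],
        "CCC": [100, 98, 99, 95, 93.2],
    }
    for a, b, rho in correlated_pairs(data):
        print(f"{a}-{b}: {rho:+.3f}")
```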


The system also analyzes market regime changes, identifying periods of bull markets, bear markets, high volatility, and low volatility. Different market regimes require different investment strategies, and the AI must be capable of recognizing when market conditions have shifted and adjusting its recommendations accordingly. LLMs provide sophisticated interpretation of market regime changes by understanding the underlying economic and psychological factors driving these shifts.
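
One simple way to label regimes, shown here only as a sketch, is to combine the trend and the return volatility over a trailing window. The window length and both thresholds are assumed values for illustration; a production system would calibrate them or use a statistical regime model.

```python
from statistics import pstdev
from typing import List


def classify_regime(prices: List[float], window: int = 20,
                    trend_threshold: float = 0.05,
                    vol_threshold: float = 0.02) -> str:
    """Label the trailing window as '<bull|bear|sideways>/<high_vol|low_vol>'."""
    if len(prices) < window + 1:
        raise ValueError("need at least window + 1 prices")

    recent = prices[-(window + 1):]
    returns = [(b - a) / a for a, b in zip(recent, recent[1:])]
    trend = (recent[-1] - recent[0]) / recent[0]  # total move over the window
    vol = pstdev(returns)                         # per-period return volatility

    if trend > trend_threshold:
        direction = "bull"
    elif trend < -trend_threshold:
        direction = "bear"
    else:
        direction = "sideways"
    volatility = "high_vol" if vol > vol_threshold else "low_vol"
    return f"{direction}/{volatility}"


if __name__ == "__main__":
    steady_rise = [100 * 1.01 ** i for i in range(30)]
    print(classify_regime(steady_rise))
```

A change in the returned label between consecutive evaluations is the kind of event the system could pass to the LLM for interpretation.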

News and Sentiment Analysis Integration with Advanced LLM Processing


News and sentiment analysis provides crucial context for understanding market movements that cannot be explained by technical analysis alone. The agentic AI must be capable of processing vast amounts of textual information from news articles, social media posts, analyst reports, and other sources to gauge market sentiment and identify potential catalysts for price movements. The integration of LLMs significantly enhances this capability by providing sophisticated natural language understanding, context awareness, and reasoning abilities.


The system employs advanced LLM techniques to extract relevant information from unstructured text sources. This includes named entity recognition to identify companies, people, and locations mentioned in news articles, as well as sophisticated sentiment analysis that can understand nuance, sarcasm, and context-dependent meaning that traditional keyword-based approaches might miss.
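
LLM-derived sentiment is only useful to downstream components if it arrives in a machine-readable form. The sketch below assumes the model has been asked to reply with a JSON object containing `sentiment`, `strength`, and `confidence` fields (an assumed schema), and shows one defensive way to parse such a reply, falling back to neutral defaults when the output is malformed.

```python
import json
import re
from typing import Dict

DEFAULTS = {"sentiment": "neutral", "strength": 50, "confidence": 0}
ALLOWED_SENTIMENTS = {"positive", "negative", "neutral"}


def parse_sentiment_json(response: str) -> Dict:
    """Extract and validate a JSON object embedded in an LLM reply.

    Unknown labels and non-numeric scores keep their defaults; numeric
    scores are clamped to the 0-100 range.
    """
    result = dict(DEFAULTS)
    match = re.search(r"\{.*\}", response, re.DOTALL)  # first '{' to last '}'
    if not match:
        return result
    try:
        data = json.loads(match.group(0))
    except json.JSONDecodeError:
        return result

    sentiment = str(data.get("sentiment", "")).lower()
    if sentiment in ALLOWED_SENTIMENTS:
        result["sentiment"] = sentiment
    for key in ("strength", "confidence"):
        value = data.get(key)
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            result[key] = max(0, min(100, value))
    return result


if __name__ == "__main__":
    reply = 'Analysis:\n{"sentiment": "Positive", "strength": 130, "confidence": 80}'
    print(parse_sentiment_json(reply))
```

Defaulting `confidence` to 0 on a parse failure lets an aggregation step down-weight or discard unusable replies instead of treating them as genuine neutral readings.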



import requests

import nltk

from textblob import TextBlob

from transformers import pipeline

import re

import openai

import asyncio

from datetime import datetime

from typing import List, Dict, Optional


class AdvancedLLMNewsAnalyzer:

    def __init__(self, api_keys: Dict[str, str]):

        self.api_keys = api_keys

        self.llm_client = openai.OpenAI(api_key=api_keys['openai'])

        

        # Initialize specialized models

        self.sentiment_analyzer = pipeline("sentiment-analysis", 

                                         model="ProsusAI/finbert")

        self.ner_pipeline = pipeline("ner", 

                                   model="dbmdz/bert-large-cased-finetuned-conll03-english")

        self.summarizer = pipeline("summarization", 

                                 model="facebook/bart-large-cnn")

        

        # Cache for processed articles and analysis

        self.news_cache = {}

        self.sentiment_cache = {}

        self.entity_cache = {}

        

    async def comprehensive_news_analysis(self, symbol: str, days_back: int = 7) -> Dict:

        """Perform comprehensive news analysis using advanced LLM techniques"""

        

        # Fetch news articles from multiple sources

        articles = await self.fetch_comprehensive_news(symbol, days_back)

        

        if not articles:

            return self.create_empty_analysis(symbol)

        

        # Parallel processing of different analysis components

        analysis_tasks = [

            self.analyze_sentiment_with_llm(articles, symbol),

            self.extract_market_events(articles, symbol),

            self.analyze_competitive_landscape(articles, symbol),

            self.assess_regulatory_impact(articles, symbol),

            self.evaluate_financial_metrics_mentions(articles, symbol),

            self.predict_market_impact(articles, symbol)

        ]

        

        analysis_results = await asyncio.gather(*analysis_tasks, return_exceptions=True)

        

        # Combine all analysis components

        comprehensive_analysis = {

            'symbol': symbol,

            'analysis_period_days': days_back,

            'article_count': len(articles),

            'sentiment_analysis': analysis_results[0] if not isinstance(analysis_results[0], Exception) else {},

            'market_events': analysis_results[1] if not isinstance(analysis_results[1], Exception) else {},

            'competitive_analysis': analysis_results[2] if not isinstance(analysis_results[2], Exception) else {},

            'regulatory_analysis': analysis_results[3] if not isinstance(analysis_results[3], Exception) else {},

            'financial_metrics': analysis_results[4] if not isinstance(analysis_results[4], Exception) else {},

            'market_impact_prediction': analysis_results[5] if not isinstance(analysis_results[5], Exception) else {},

            'analysis_timestamp': datetime.now()

        }

        

        # Generate executive summary using LLM

        executive_summary = await self.generate_executive_summary(comprehensive_analysis, symbol)

        comprehensive_analysis['executive_summary'] = executive_summary

        

        return comprehensive_analysis

    

    async def analyze_sentiment_with_llm(self, articles: List[Dict], symbol: str) -> Dict:

        """Advanced sentiment analysis using LLM with context understanding"""

        

        if not articles:

            return {'overall_sentiment': 'neutral', 'confidence': 0.0}

        

        # Process articles in batches for efficiency

        batch_size = 5

        sentiment_results = []

        

        for i in range(0, len(articles), batch_size):

            batch = articles[i:i + batch_size]

            batch_analysis = await self.analyze_article_batch_sentiment(batch, symbol)

            sentiment_results.extend(batch_analysis)

        

        # Aggregate sentiment with LLM interpretation

        aggregated_sentiment = await self.aggregate_sentiment_with_llm(sentiment_results, symbol)

        

        return aggregated_sentiment

    

    async def analyze_article_batch_sentiment(self, articles: List[Dict], symbol: str) -> List[Dict]:

        """Analyze sentiment for a batch of articles using LLM"""

        

        batch_results = []

        

        for article in articles:

            # Combine title and content

            full_text = f"{article.get('title', '')}. {article.get('content', '')}"

            

            # LLM-enhanced sentiment analysis

            sentiment_analysis = await self.llm_sentiment_analysis(full_text, symbol)

            

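            # Traditional sentiment analysis for comparison; let the tokenizer truncate

            # to FinBERT's 512-token limit instead of slicing to 512 characters

            traditional_sentiment = self.sentiment_analyzer(

                full_text, truncation=True, max_length=512

            )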

            

            # Entity extraction for relevance scoring

            entities = self.extract_entities(full_text)

            relevance_score = self.calculate_relevance_score(entities, symbol)

            

            batch_results.append({

                'article_id': article.get('id', ''),

                'title': article.get('title', ''),

                'llm_sentiment': sentiment_analysis,

                'traditional_sentiment': traditional_sentiment[0],

                'entities': entities,

                'relevance_score': relevance_score,

                'publication_date': article.get('published_at', ''),

                'source': article.get('source', '')

            })

        

        return batch_results

    

    async def llm_sentiment_analysis(self, text: str, symbol: str) -> Dict:

        """Perform sophisticated sentiment analysis using LLM"""

        

        prompt = f"""

        Analyze the sentiment of the following financial news text regarding {symbol}.

        Consider:

        1. Overall sentiment (positive, negative, neutral)

        2. Sentiment strength (0-100)

        3. Market impact potential (low, medium, high)

        4. Key sentiment drivers

        5. Confidence in assessment (0-100)

        6. Temporal aspects (immediate vs long-term sentiment)

        

        Text to analyze:

        {text[:2000]}

        

        Provide structured analysis focusing on investment implications.

        """

        

        try:

            response = await self.get_llm_response(prompt, temperature=0.2)

            structured_sentiment = self.parse_sentiment_response(response)

            

            return {

                'overall_sentiment': structured_sentiment.get('sentiment', 'neutral'),

                'strength': structured_sentiment.get('strength', 50),

                'market_impact': structured_sentiment.get('market_impact', 'medium'),

                'key_drivers': structured_sentiment.get('drivers', []),

                'confidence': structured_sentiment.get('confidence', 50),

                'temporal_aspect': structured_sentiment.get('temporal', 'immediate'),

                'raw_analysis': response

            }

            

        except Exception as e:

            print(f"LLM sentiment analysis error: {e}")

            return {

                'overall_sentiment': 'neutral',

                'strength': 50,

                'confidence': 0,

                'error': str(e)

            }

    

    async def extract_market_events(self, articles: List[Dict], symbol: str) -> Dict:

        """Extract and categorize market-relevant events using LLM"""

        

        # Combine articles for comprehensive event extraction

        combined_content = self.combine_article_content(articles)

        

        prompt = f"""

        Extract and categorize market-relevant events for {symbol} from the following news content:

        

        {combined_content[:3000]}

        

        Categorize events into:

        1. Earnings and Financial Results

        2. Product Launches and Innovations

        3. Management Changes

        4. Regulatory and Legal Issues

        5. Partnerships and Acquisitions

        6. Market Expansion

        7. Competitive Developments

        8. Industry Trends

        

        For each event, provide:

        - Event description

        - Category

        - Potential market impact (positive/negative/neutral)

        - Impact magnitude (low/medium/high)

        - Timeframe (immediate/short-term/long-term)

        

        Focus on events that could significantly impact stock price.

        """

        

        try:

            events_response = await self.get_llm_response(prompt, temperature=0.3)

            structured_events = self.parse_events_response(events_response)

            

            # Prioritize events by impact potential

            prioritized_events = self.prioritize_events(structured_events)

            

            return {

                'total_events': len(structured_events),

                'high_impact_events': [e for e in prioritized_events if e.get('impact_magnitude') == 'high'],

                'categorized_events': self.categorize_events(structured_events),

                'event_timeline': self.create_event_timeline(structured_events),

                'raw_analysis': events_response

            }

            

        except Exception as e:

            print(f"Event extraction error: {e}")

            return {'total_events': 0, 'error': str(e)}

    

    async def analyze_competitive_landscape(self, articles: List[Dict], symbol: str) -> Dict:

        """Analyze competitive landscape mentions using LLM"""

        

        combined_content = self.combine_article_content(articles)

        

        prompt = f"""

        Analyze competitive landscape information for {symbol} from the following content:

        

        {combined_content[:2500]}

        

        Identify:

        1. Direct competitors mentioned

        2. Competitive advantages/disadvantages discussed

        3. Market share implications

        4. Competitive threats or opportunities

        5. Industry positioning changes

        6. Competitive response strategies

        

        Assess how competitive developments might impact {symbol}'s market position and stock performance.

        """

        

        try:

            competitive_response = await self.get_llm_response(prompt, temperature=0.3)

            competitive_analysis = self.parse_competitive_analysis(competitive_response)

            

            return {

                'competitors_mentioned': competitive_analysis.get('competitors', []),

                'competitive_position': competitive_analysis.get('position', 'neutral'),

                'threats': competitive_analysis.get('threats', []),

                'opportunities': competitive_analysis.get('opportunities', []),

                'market_share_impact': competitive_analysis.get('market_share_impact', 'neutral'),

                'raw_analysis': competitive_response

            }

            

        except Exception as e:

            print(f"Competitive analysis error: {e}")

            return {'competitive_position': 'unknown', 'error': str(e)}

    

    async def assess_regulatory_impact(self, articles: List[Dict], symbol: str) -> Dict:

        """Assess regulatory and legal impact using LLM"""

        

        combined_content = self.combine_article_content(articles)

        

        prompt = f"""

        Analyze regulatory and legal developments affecting {symbol}:

        

        {combined_content[:2500]}

        

        Identify:

        1. Regulatory changes or proposals

        2. Legal proceedings or settlements

        3. Compliance issues

        4. Government policy impacts

        5. Industry regulation changes

        6. International regulatory considerations

        

        Assess potential financial and operational impacts on {symbol}.

        """

        

        try:

            regulatory_response = await self.get_llm_response(prompt, temperature=0.2)

            regulatory_analysis = self.parse_regulatory_analysis(regulatory_response)

            

            return {

                'regulatory_developments': regulatory_analysis.get('developments', []),

                'impact_assessment': regulatory_analysis.get('impact', 'neutral'),

                'compliance_issues': regulatory_analysis.get('compliance', []),

                'policy_implications': regulatory_analysis.get('policy', []),

                'risk_level': regulatory_analysis.get('risk_level', 'medium'),

                'raw_analysis': regulatory_response

            }

            

        except Exception as e:

            print(f"Regulatory analysis error: {e}")

            return {'impact_assessment': 'unknown', 'error': str(e)}

    

    async def predict_market_impact(self, articles: List[Dict], symbol: str) -> Dict:

        """Predict overall market impact using comprehensive LLM analysis"""

        

        # Synthesize all available information

        synthesis_content = self.prepare_synthesis_content(articles, symbol)

        

        prompt = f"""

        Based on comprehensive news analysis for {symbol}, predict the likely market impact:

        

        {synthesis_content}

        

        Provide predictions for:

        1. Short-term price impact (1-5 days)

        2. Medium-term impact (1-4 weeks)

        3. Long-term impact (1-6 months)

        4. Volatility expectations

        5. Trading volume implications

        6. Sector spillover effects

        

        Include:

        - Direction (positive/negative/neutral)

        - Magnitude (low/medium/high)

        - Confidence level (0-100%)

        - Key risk factors

        - Catalysts to watch

        

        Base predictions on historical patterns and market behavior.

        """

        

        try:

            impact_response = await self.get_llm_response(prompt, temperature=0.3)

            impact_prediction = self.parse_impact_prediction(impact_response)

            

            return {

                'short_term_impact': impact_prediction.get('short_term', {}),

                'medium_term_impact': impact_prediction.get('medium_term', {}),

                'long_term_impact': impact_prediction.get('long_term', {}),

                'volatility_forecast': impact_prediction.get('volatility', 'medium'),

                'volume_implications': impact_prediction.get('volume', 'normal'),

                'sector_effects': impact_prediction.get('sector_effects', []),

                'key_catalysts': impact_prediction.get('catalysts', []),

                'risk_factors': impact_prediction.get('risks', []),

                'overall_confidence': impact_prediction.get('confidence', 50),

                'raw_analysis': impact_response

            }

            

        except Exception as e:

            print(f"Market impact prediction error: {e}")

            return {'overall_confidence': 0, 'error': str(e)}

    

    async def generate_executive_summary(self, comprehensive_analysis: Dict, symbol: str) -> str:

        """Generate executive summary of all analysis components"""

        

        # Prepare summary of all analysis components

        summary_input = self.prepare_executive_summary_input(comprehensive_analysis, symbol)

        

        prompt = f"""

        Create an executive summary of the comprehensive news analysis for {symbol}:

        

        {summary_input}

        

        The summary should:

        1. Highlight the most important findings

        2. Provide clear investment implications

        3. Identify key risks and opportunities

        4. Give actionable recommendations

        5. Be concise but comprehensive (200-300 words)

        

        Focus on information that would be most valuable for investment decision-making.

        """

        

        try:

            summary = await self.get_llm_response(prompt, temperature=0.3)

            return summary

        except Exception as e:

            print(f"Executive summary generation error: {e}")

            return f"Executive summary unavailable due to processing error: {str(e)}"

    

    async def get_llm_response(self, prompt: str, temperature: float = 0.3) -> str:

        """Get response from LLM; API errors propagate to the caller"""

        

        # The client call below is synchronous, so it is run in a worker thread

        # to avoid blocking the event loop while the request is in flight.

        # Requires `import asyncio` and Python 3.9 or later.

        response = await asyncio.to_thread(

            self.llm_client.chat.completions.create,

            model="gpt-4",

            messages=[

                {"role": "system", "content": "You are a senior financial analyst specializing in news analysis and market impact assessment. Provide detailed, actionable insights based on financial news content."},

                {"role": "user", "content": prompt}

            ],

            max_tokens=1000,

            temperature=temperature

        )

        return response.choices[0].message.content
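LLM requests can fail for transient reasons such as rate limits or network timeouts, and every analysis method above depends on them. The following is a minimal sketch of a retry helper with exponential backoff. The name `call_with_retry` and its parameters are hypothetical and not part of the original class; it assumes the wrapped callable returns an awaitable.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def call_with_retry(
    make_call: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
) -> T:
    """Await make_call(), retrying with exponential backoff and jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await make_call()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Wait 1x, 2x, 4x ... base_delay, plus up to 10% random jitter
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay * (1 + 0.1 * random.random()))
    raise AssertionError("unreachable")
```

Inside the analyzer, a call might then look like `await call_with_retry(lambda: self.get_llm_response(prompt))`. In practice it may be preferable to retry only on specific exception types rather than on every `Exception`.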



Real-time sentiment tracking lets the AI detect sudden shifts in market sentiment that may precede significant price movements. The system monitors social media platforms, financial forums, and news feeds for unusual spikes in positive or negative sentiment that could signal an emerging market trend. Because LLMs take context, tone, and implied meaning into account, they can detect sentiment more accurately than approaches that score individual words.
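One simple way to flag an "unusual spike" is to compare each new sentiment reading with a rolling baseline. The sketch below is illustrative only: the class name `SentimentSpikeDetector`, the window size, and the z-score threshold are assumptions, and the scores are assumed to be already produced upstream on a scale from -1 to 1.

```python
from collections import deque
from statistics import mean, stdev


class SentimentSpikeDetector:
    """Flag sentiment readings that deviate sharply from the recent baseline."""

    def __init__(self, window: int = 20, z_threshold: float = 3.0):
        self.history = deque(maxlen=window)  # most recent scores only
        self.z_threshold = z_threshold

    def update(self, score: float) -> bool:
        """Record a score in [-1, 1]; return True if it is an outlier."""
        is_spike = False
        if len(self.history) >= 5:  # need a few points for a baseline
            baseline = mean(self.history)
            spread = stdev(self.history)
            if spread > 0:
                z_score = abs(score - baseline) / spread
                is_spike = z_score >= self.z_threshold
        self.history.append(score)
        return is_spike
```

A flagged spike would be a natural trigger for the more expensive LLM analysis, so that the model is asked to explain the shift only when something unusual happens.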


Integrating news analysis with technical analysis gives a more complete view of market conditions. For example, a technical breakout accompanied by positive news sentiment supports a bullish recommendation with more conviction than the technical signal alone. LLMs can reason about how such factors interact and explain when they reinforce or contradict each other.
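The idea that agreement between signals should raise conviction can be sketched numerically. The function below is a hypothetical illustration, not the system's actual scoring rule: the 0.6 weight and the halving penalty for disagreement are arbitrary assumptions, and both inputs are assumed to be normalized to the range -1 to 1.

```python
def combined_conviction(technical: float, sentiment: float,
                        technical_weight: float = 0.6) -> float:
    """Blend two signals in [-1, 1] into one conviction score in [-1, 1].

    When the signals point the same way the blended score is kept as is;
    when they disagree (or one is flat) it is halved to reflect lower conviction.
    """
    blended = technical_weight * technical + (1 - technical_weight) * sentiment
    signals_agree = technical * sentiment > 0
    return blended if signals_agree else 0.5 * blended
```

With these assumed weights, a breakout score of 0.8 combined with sentiment of 0.6 gives roughly 0.72, while the same breakout against sentiment of -0.6 gives roughly 0.12.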


The system must also distinguish between types of news and their likely market impact. Earnings announcements, regulatory changes, mergers and acquisitions, and management changes typically move stock prices more than general industry news or analyst opinions. LLMs are well suited to drawing these distinctions and to assessing how important and relevant a given story is for a particular symbol.
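A rough way to encode this ordering is a weight per news category, combined with a relevance score for the symbol. The weights below are illustrative placeholders rather than calibrated values, and the `category` and `relevance` fields are assumed to have been assigned upstream, for example by an LLM classification step.

```python
# Illustrative weights only; real values would need to be calibrated.
NEWS_TYPE_WEIGHTS = {
    "earnings": 1.0,
    "regulatory": 0.9,
    "merger_acquisition": 0.9,
    "management_change": 0.7,
    "analyst_opinion": 0.4,
    "industry_news": 0.3,
}


def rank_articles(articles):
    """Sort articles by category weight times relevance (0 to 1), highest first."""
    def score(article):
        weight = NEWS_TYPE_WEIGHTS.get(article["category"], 0.2)
        return weight * article.get("relevance", 1.0)
    return sorted(articles, key=score, reverse=True)
```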


Economic and Political Event Monitoring with LLM Enhancement


Economic and political events are major drivers of market movements and often override technical patterns and short-term sentiment. The agentic AI must monitor a wide range of economic indicators, government announcements, and geopolitical developments to anticipate their impact on financial markets. LLMs strengthen this capability by interpreting complex economic and political information that does not fit neatly into structured data fields.


Economic indicators such as GDP growth, inflation rates, unemployment figures, and central bank interest rate decisions affect market sectors in systematic ways. The system maintains a database of historical relationships between economic announcements and market reactions, which it uses to estimate the likely impact of upcoming events. LLMs complement this by interpreting the data in context, such as how a release compares with the forecast and the previous value.
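To make the historical lookup concrete, the sketch below expresses a release as a surprise relative to its forecast and then averages past market reactions to surprises of the same sign. The function names, the tuple layout of `history`, and the use of a next-day return are all assumptions made for illustration.

```python
from statistics import mean


def standardized_surprise(actual: float, forecast: float,
                          forecast_std: float) -> float:
    """Surprise expressed in units of the typical forecast error."""
    if forecast_std <= 0:
        raise ValueError("forecast_std must be positive")
    return (actual - forecast) / forecast_std


def average_reaction(history, surprise: float) -> float:
    """Mean past return (%) for releases whose surprise had the same sign.

    history: list of (past_surprise, next_day_return_pct) tuples.
    Returns 0.0 when no comparable release exists.
    """
    same_sign = [ret for past, ret in history if past * surprise > 0]
    return mean(same_sign) if same_sign else 0.0
```

For instance, an inflation print of 3.4 against a forecast of 3.1 with a typical forecast error of 0.2 is a surprise of about 1.5, and the lookup would then average the returns that followed earlier upside surprises.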



import requests

import pandas as pd

import openai

import asyncio

from datetime import datetime, timedelta

from typing import Dict, List, Optional, Tuple


class LLMEnhancedEconomicEventMonitor:

    def __init__(self, api_keys: Dict[str, str]):

        self.api_keys = api_keys

        self.llm_client = openai.OpenAI(api_key=api_keys['openai'])

        self.economic_calendar = {}

        self.historical_impacts = {}

        self.event_weights = self.initialize_event_weights()

        # PolicyTracker is not defined in this listing; it is assumed to be provided elsewhere

        self.policy_tracker = PolicyTracker(api_keys)

        

    def initialize_event_weights(self) -> Dict[str, float]:

        """Initialize weights for different types of economic events"""

        return {

            'interest_rate_decision': 1.0,

            'gdp_announcement': 0.9,

            'inflation_data': 0.8,

            'employment_data': 0.8,

            'trade_balance': 0.6,

            'consumer_confidence': 0.5,

            'manufacturing_pmi': 0.7,

            'retail_sales': 0.6,

            'central_bank_speech': 0.4,

            'political_election': 0.8,

            'trade_war_development': 0.9,

            'geopolitical_crisis': 0.7,

            'regulatory_announcement': 0.6,

            'fiscal_policy_change': 0.8

        }

    

    async def comprehensive_economic_analysis(self, symbols: List[str], 

                                            forecast_days: int = 30) -> Dict:

        """Perform comprehensive economic and political analysis"""

        

        # Fetch upcoming economic events

        upcoming_events = await self.fetch_economic_calendar_with_llm(forecast_days)

        

        # Analyze current economic environment

        economic_environment = await self.analyze_economic_environment()

        

        # Monitor political developments

        political_analysis = await self.analyze_political_developments()

        

        # Central bank communication analysis

        central_bank_analysis = await self.analyze_central_bank_communications()

        

        # Geopolitical risk assessment

        geopolitical_risks = await self.assess_geopolitical_risks()

        

        # Generate symbol-specific impact forecasts

        symbol_forecasts = {}

        for symbol in symbols:

            symbol_forecasts[symbol] = await self.create_symbol_impact_forecast(

                symbol, upcoming_events, economic_environment, political_analysis

            )

        

        # Synthesize overall market outlook

        market_outlook = await self.synthesize_market_outlook(

            economic_environment, political_analysis, central_bank_analysis, geopolitical_risks

        )

        

        return {

            'upcoming_events': upcoming_events,

            'economic_environment': economic_environment,

            'political_analysis': political_analysis,

            'central_bank_analysis': central_bank_analysis,

            'geopolitical_risks': geopolitical_risks,

            'symbol_forecasts': symbol_forecasts,

            'market_outlook': market_outlook,

            'analysis_timestamp': datetime.now()

        }

    

    async def fetch_economic_calendar_with_llm(self, days_ahead: int = 30) -> List[Dict]:

        """Fetch and analyze upcoming economic events with LLM interpretation"""

        

        # Fetch raw economic calendar data

        raw_events = await self.fetch_raw_economic_calendar(days_ahead)

        

        # Enhance each event with LLM analysis

        enhanced_events = []

        for event in raw_events:

            enhanced_event = await self.enhance_event_with_llm(event)

            enhanced_events.append(enhanced_event)

        

        # Prioritize events by market impact potential

        prioritized_events = await self.prioritize_events_with_llm(enhanced_events)

        

        return prioritized_events

    

    async def enhance_event_with_llm(self, event: Dict) -> Dict:

        """Enhance economic event with LLM analysis and interpretation"""

        

        event_description = f"""

        Economic Event: {event.get('name', 'Unknown')}

        Date: {event.get('date', 'Unknown')}

        Country: {event.get('country', 'Unknown')}

        Previous Value: {event.get('previous', 'N/A')}

        Forecast: {event.get('forecast', 'N/A')}

        Importance: {event.get('importance', 'Medium')}

        """

        

        prompt = f"""

        Analyze the following economic event and its potential market impact:

        

        {event_description}

        

        Provide analysis on:

        1. Market significance and why this event matters

        2. Potential impact on different asset classes (stocks, bonds, currencies)

        3. Sector-specific implications

        4. Historical context and typical market reactions

        5. Key scenarios and their probabilities

        6. Risk factors that could amplify or dampen the impact

        7. Recommended monitoring approach

        

        Focus on actionable insights for investment decision-making.

        """

        

        try:

            llm_analysis = await self.get_llm_economic_analysis(prompt)

            enhanced_analysis = self.parse_economic_event_analysis(llm_analysis)

            

            # Add LLM insights to the original event

            event['llm_analysis'] = enhanced_analysis

            event['market_significance'] = enhanced_analysis.get('significance', 'medium')

            event['sector_impacts'] = enhanced_analysis.get('sector_impacts', {})

            event['risk_factors'] = enhanced_analysis.get('risk_factors', [])

            event['monitoring_priority'] = enhanced_analysis.get('monitoring_priority', 'medium')

            

            return event

            

        except Exception as e:

            print(f"Event enhancement error: {e}")

            event['llm_analysis'] = {'error': str(e)}

            return event

    

    async def analyze_economic_environment(self) -> Dict:

        """Analyze current economic environment using LLM"""

        

        # Gather recent economic data

        recent_data = await self.gather_recent_economic_data()

        

        prompt = f"""

        Analyze the current economic environment based on recent data:

        

        {recent_data}

        

        Provide comprehensive analysis including:

        1. Overall economic health assessment

        2. Key economic trends and their sustainability

        3. Inflation outlook and monetary policy implications

        4. Employment market conditions

        5. Consumer and business sentiment

        6. International economic relationships

        7. Risk factors and potential economic scenarios

        8. Investment implications by sector

        

        Focus on factors most relevant to equity market performance.

        """

        

        try:

            environment_analysis = await self.get_llm_economic_analysis(prompt)

            structured_analysis = self.parse_environment_analysis(environment_analysis)

            

            return {

                'overall_assessment': structured_analysis.get('assessment', 'neutral'),

                'key_trends': structured_analysis.get('trends', []),

                'inflation_outlook': structured_analysis.get('inflation', {}),

                'employment_conditions': structured_analysis.get('employment', {}),

                'sentiment_indicators': structured_analysis.get('sentiment', {}),

                'risk_scenarios': structured_analysis.get('scenarios', []),

                'sector_implications': structured_analysis.get('sector_implications', {}),

                'raw_analysis': environment_analysis,

                'confidence_level': structured_analysis.get('confidence', 70)

            }

            

        except Exception as e:

            print(f"Economic environment analysis error: {e}")

            return {'overall_assessment': 'unknown', 'error': str(e)}

    

    async def analyze_political_developments(self, region: str = 'global') -> Dict:

        """Analyze political developments with LLM interpretation"""

        

        # Gather recent political news and developments

        political_data = await self.gather_political_developments(region)

        

        prompt = f"""

        Analyze recent political developments and their market implications:

        

        {political_data}

        

        Focus on:

        1. Policy changes affecting business and markets

        2. Regulatory developments and their industry impacts

        3. Trade relationships and international agreements

        4. Political stability and governance issues

        5. Upcoming elections and their potential outcomes

        6. Geopolitical tensions and their economic implications

        7. Fiscal policy directions

        8. Market-relevant political risks and opportunities

        

        Assess both immediate and longer-term market implications.

        """

        

        try:

            political_analysis = await self.get_llm_economic_analysis(prompt)

            structured_political = self.parse_political_analysis(political_analysis)

            

            return {

                'policy_changes': structured_political.get('policy_changes', []),

                'regulatory_developments': structured_political.get('regulatory', []),

                'trade_implications': structured_political.get('trade', {}),

                'political_stability': structured_political.get('stability', 'stable'),

                'election_impacts': structured_political.get('elections', {}),

                'geopolitical_factors': structured_political.get('geopolitical', []),

                'fiscal_policy': structured_political.get('fiscal', {}),

                'market_risks': structured_political.get('risks', []),

                'market_opportunities': structured_political.get('opportunities', []),

                'raw_analysis': political_analysis

            }

            

        except Exception as e:

            print(f"Political analysis error: {e}")

            return {'political_stability': 'unknown', 'error': str(e)}

    

    async def analyze_central_bank_communications(self) -> Dict:

        """Analyze central bank communications using LLM"""

        

        # Gather recent central bank communications

        cb_communications = await self.gather_central_bank_data()

        

        prompt = f"""

        Analyze recent central bank communications and their market implications:

        

        {cb_communications}

        

        Extract insights on:

        1. Monetary policy direction and timeline

        2. Interest rate expectations

        3. Inflation targeting and concerns

        4. Economic growth outlook from central bank perspective

        5. Financial stability considerations

        6. Forward guidance interpretation

        7. Policy divergence between major central banks

        8. Market implications of policy signals

        

        Focus on actionable insights for investment strategy.

        """

        

        try:

            cb_analysis = await self.get_llm_economic_analysis(prompt)

            structured_cb = self.parse_central_bank_analysis(cb_analysis)

            

            return {

                'policy_direction': structured_cb.get('policy_direction', 'neutral'),

                'rate_expectations': structured_cb.get('rate_expectations', {}),

                'inflation_stance': structured_cb.get('inflation_stance', {}),

                'growth_outlook': structured_cb.get('growth_outlook', {}),

                'forward_guidance': structured_cb.get('forward_guidance', ''),

                'policy_divergence': structured_cb.get('policy_divergence', {}),

                'market_implications': structured_cb.get('market_implications', []),

                'key_dates': structured_cb.get('key_dates', []),

                'raw_analysis': cb_analysis

            }

            

        except Exception as e:

            print(f"Central bank analysis error: {e}")

            return {'policy_direction': 'unknown', 'error': str(e)}

    

    async def create_symbol_impact_forecast(self, symbol: str, upcoming_events: List[Dict],

                                          economic_environment: Dict, political_analysis: Dict) -> Dict:

        """Create symbol-specific impact forecast using LLM"""

        

        # Prepare comprehensive context for symbol analysis

        symbol_context = await self.prepare_symbol_context(symbol, upcoming_events, 

                                                          economic_environment, political_analysis)

        

        prompt = f"""

        Create a comprehensive impact forecast for {symbol} based on economic and political analysis:

        

        {symbol_context}

        

        Provide forecast including:

        1. Short-term impact (1-4 weeks)

        2. Medium-term impact (1-3 months)

        3. Long-term impact (3-12 months)

        4. Key event sensitivities

        5. Sector-specific considerations

        6. Risk factors and mitigation strategies

        7. Opportunity identification

        8. Recommended monitoring approach

        

        Include specific price impact expectations and confidence levels.

        """

        

        try:

            forecast_analysis = await self.get_llm_economic_analysis(prompt)

            structured_forecast = self.parse_symbol_forecast(forecast_analysis)

            

            return {

                'symbol': symbol,

                'short_term_forecast': structured_forecast.get('short_term', {}),

                'medium_term_forecast': structured_forecast.get('medium_term', {}),

                'long_term_forecast': structured_forecast.get('long_term', {}),

                'event_sensitivities': structured_forecast.get('sensitivities', []),

                'sector_considerations': structured_forecast.get('sector', {}),

                'risk_factors': structured_forecast.get('risks', []),

                'opportunities': structured_forecast.get('opportunities', []),

                'monitoring_priorities': structured_forecast.get('monitoring', []),

                'confidence_level': structured_forecast.get('confidence', 60),

                'raw_analysis': forecast_analysis

            }

            

        except Exception as e:

            print(f"Symbol forecast error for {symbol}: {e}")

            return {'symbol': symbol, 'error': str(e)}

    

    async def synthesize_market_outlook(self, economic_environment: Dict, 

                                      political_analysis: Dict, central_bank_analysis: Dict,

                                      geopolitical_risks: Dict) -> Dict:

        """Synthesize overall market outlook using LLM"""

        

        # Prepare comprehensive market context

        market_context = self.prepare_market_synthesis_context(

            economic_environment, political_analysis, central_bank_analysis, geopolitical_risks

        )

        

        prompt = f"""

        Synthesize an overall market outlook based on comprehensive analysis:

        

        {market_context}

        

        Provide market outlook including:

        1. Overall market direction and conviction level

        2. Preferred sectors and investment themes

        3. Risk-on vs risk-off environment assessment

        4. Volatility expectations

        5. Key catalysts and inflection points

        6. Tail risks and black swan scenarios

        7. Portfolio positioning recommendations

        8. Timing considerations for major decisions

        

        Focus on actionable investment strategy implications.

        """

        

        try:

            outlook_analysis = await self.get_llm_economic_analysis(prompt)

            structured_outlook = self.parse_market_outlook(outlook_analysis)

            

            return {

                'market_direction': structured_outlook.get('direction', 'neutral'),

                'conviction_level': structured_outlook.get('conviction', 50),

                'preferred_sectors': structured_outlook.get('sectors', []),

                'investment_themes': structured_outlook.get('themes', []),

                'risk_environment': structured_outlook.get('risk_environment', 'balanced'),

                'volatility_outlook': structured_outlook.get('volatility', 'medium'),

                'key_catalysts': structured_outlook.get('catalysts', []),

                'tail_risks': structured_outlook.get('tail_risks', []),

                'positioning_recommendations': structured_outlook.get('positioning', []),

                'timing_considerations': structured_outlook.get('timing', []),

                'raw_analysis': outlook_analysis

            }

            

        except Exception as e:

            print(f"Market outlook synthesis error: {e}")

            return {'market_direction': 'unknown', 'error': str(e)}

    

    async def get_llm_economic_analysis(self, prompt: str) -> str:

        """Get economic analysis from LLM with specialized system prompt"""

        

        system_prompt = """You are a senior macroeconomic analyst and strategist with expertise in:

        - Central bank policy and monetary economics

        - Political economy and policy analysis

        - Market impact assessment of economic events

        - Cross-asset and sector analysis

        - Risk assessment and scenario planning

        

        Provide detailed, nuanced analysis that considers multiple perspectives and scenarios.

        Focus on actionable insights for investment decision-making."""

        

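        # The OpenAI client created in __init__ is synchronous, so the request is

        # run in a worker thread to avoid blocking the event loop. API errors

        # propagate to the caller, which handles them.

        response = await asyncio.to_thread(

            self.llm_client.chat.completions.create,

            model="gpt-4",

            messages=[

                {"role": "system", "content": system_prompt},

                {"role": "user", "content": prompt}

            ],

            max_tokens=1200,

            temperature=0.3

        )

        return response.choices[0].message.content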
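The class above relies on helpers such as `parse_economic_event_analysis` and `parse_market_outlook`, which are not shown. One possible approach, sketched below, is to ask the model to answer in JSON and parse the reply defensively. This is an assumption: the prompts shown above do not request JSON, and the helper name `parse_llm_json` is hypothetical.

```python
import json
import re
from typing import Any, Dict


def parse_llm_json(response_text: str) -> Dict[str, Any]:
    """Extract a JSON object from an LLM reply; return {} on any failure."""
    # Models often wrap JSON in prose, so take the span from the first
    # opening brace to the last closing brace.
    match = re.search(r"\{.*\}", response_text, re.DOTALL)
    if match is None:
        return {}
    try:
        parsed = json.loads(match.group(0))
    except json.JSONDecodeError:
        return {}
    return parsed if isinstance(parsed, dict) else {}
```

Returning an empty dictionary on failure fits the calling code, which already falls back to defaults through `.get(key, default)`.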



Political event monitoring requires tracking elections, policy announcements, trade negotiations, and geopolitical tensions. The system must understand how different political outcomes might affect individual market sectors and stocks. For example, healthcare stocks may react to an election result very differently from defense contractors or renewable energy companies. LLMs can capture political nuance and reason through the policy implications of each outcome.
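One way to turn scenario reasoning into numbers is a probability-weighted impact per sector. The sketch below assumes that scenario probabilities and per-sector impact scores (from -1 to 1) have already been estimated, for example by the LLM. The function name and data layout are hypothetical.

```python
def expected_sector_impact(scenarios):
    """Probability-weighted impact per sector across political scenarios.

    scenarios: list of dicts with 'probability' (summing to 1) and
    'sector_impacts' mapping sector name -> impact score in [-1, 1].
    """
    total = sum(s["probability"] for s in scenarios)
    if abs(total - 1.0) > 1e-6:
        raise ValueError("scenario probabilities must sum to 1")
    expected = {}
    for scenario in scenarios:
        for sector, impact in scenario["sector_impacts"].items():
            weighted = scenario["probability"] * impact
            expected[sector] = expected.get(sector, 0.0) + weighted
    return expected
```

A sector that is missing from a scenario is treated as unaffected in that scenario, which is a simplifying assumption.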


The AI system maintains models that correlate historical economic announcements with subsequent market movements. These models help predict not only the direction of a market reaction but also its magnitude and duration. The system updates them with each new economic release so that predictive accuracy can improve over time. LLMs complement the models by interpreting economic data in context and by flagging factors that might cause a market reaction to deviate from historical patterns.
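As a minimal sketch of a model that learns from each release, the class below keeps running sums for a one-variable least-squares fit of market reaction against announcement surprise. It is a deliberately simple stand-in for whatever models the system actually uses, and the class name and inputs are assumptions.

```python
class SurpriseReactionModel:
    """Running least-squares fit of market reaction on announcement surprise."""

    def __init__(self):
        self.n = 0
        self.sum_x = self.sum_y = self.sum_xx = self.sum_xy = 0.0

    def update(self, surprise: float, reaction: float) -> None:
        """Fold one observed (surprise, reaction) pair into the running sums."""
        self.n += 1
        self.sum_x += surprise
        self.sum_y += reaction
        self.sum_xx += surprise * surprise
        self.sum_xy += surprise * reaction

    def predict(self, surprise: float) -> float:
        """Predicted reaction; 0.0 if the fit is not yet determined."""
        denom = self.n * self.sum_xx - self.sum_x ** 2
        if self.n < 2 or denom == 0:
            return 0.0
        slope = (self.n * self.sum_xy - self.sum_x * self.sum_y) / denom
        intercept = (self.sum_y - slope * self.sum_x) / self.n
        return intercept + slope * surprise
```

Because only five running totals are stored, each new release updates the fit in constant time without reprocessing the history.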


Central bank communication analysis is an area where LLMs are particularly valuable. Central bank officials often communicate through carefully worded statements whose policy intentions must be inferred. LLMs can analyze subtle changes in wording between communications and identify shifts in policy stance that traditional keyword analysis might miss.
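A practical preprocessing step is to isolate exactly what changed between two consecutive statements, so that the LLM is asked to interpret only the altered wording. The sketch below uses the standard library's `difflib` for a word-level comparison. The function name is hypothetical, and the step is a possible complement to the LLM analysis rather than something taken from the original design.

```python
import difflib


def wording_changes(previous: str, current: str):
    """Return (removed, added) phrases between two statements, word by word."""
    prev_words, curr_words = previous.split(), current.split()
    matcher = difflib.SequenceMatcher(a=prev_words, b=curr_words, autojunk=False)
    removed, added = [], []
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag in ("replace", "delete"):
            removed.append(" ".join(prev_words[i1:i2]))
        if tag in ("replace", "insert"):
            added.append(" ".join(curr_words[j1:j2]))
    return removed, added
```

The removed and added phrases can then be placed side by side in the prompt, which keeps the request short and focuses the model on the shift in language.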


Machine Learning Models for Prediction with LLM Integration


The predictive capabilities of the agentic AI rely on machine learning models that process multiple types of input data and generate probabilistic forecasts of future price movements. These models must handle the non-linear, time-varying relationships that characterize financial markets. LLMs extend these capabilities through feature engineering, model interpretation, and ensemble reasoning.


The system employs ensemble methods that combine multiple machine learning approaches to improve prediction accuracy and robustness. Different models may excel in different market conditions, and an ensemble lets the system adapt as market dynamics change. LLMs contribute by reasoning about model outputs at a high level and by helping to weight the individual predictions according to the current market context.
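Before any LLM adjustment, a common baseline for combining models is to weight each one by the inverse of its recent validation error. The sketch below shows that baseline only. The function names are hypothetical, and the error values are assumed to come from a prior validation step such as time series cross-validation.

```python
def inverse_error_weights(validation_errors):
    """Weight each model by 1/error, normalized so the weights sum to 1."""
    eps = 1e-9  # guards against division by zero for a perfect score
    inverse = {name: 1.0 / (err + eps) for name, err in validation_errors.items()}
    total = sum(inverse.values())
    return {name: value / total for name, value in inverse.items()}


def ensemble_prediction(predictions, weights):
    """Weighted average of per-model predictions."""
    return sum(weights[name] * pred for name, pred in predictions.items())
```

An LLM-derived adjustment could then scale these baseline weights up or down for the current market regime, with the result renormalized so the weights still sum to 1.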



import numpy as np

import pandas as pd

from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor

from sklearn.neural_network import MLPRegressor

from sklearn.preprocessing import StandardScaler, MinMaxScaler

from sklearn.model_selection import TimeSeriesSplit

import tensorflow as tf

from tensorflow.keras.models import Sequential

from tensorflow.keras.layers import LSTM, Dense, Dropout

import openai

import asyncio

from typing import Dict, List, Tuple, Optional

from datetime import datetime


class LLMEnhancedPredictionEnsemble:

    def __init__(self, symbol: str, llm_api_key: str):

        self.symbol = symbol

        self.models = {}

        self.scalers = {}

        self.feature_importance = {}

        self.model_weights = {}

        self.trained = False

        

        # Initialize LLM components

        self.llm_client = openai.OpenAI(api_key=llm_api_key)

        self.feature_interpreter = LLMFeatureInterpreter(llm_api_key)

        self.model_explainer = LLMModelExplainer(llm_api_key)

        

        # Initialize different model types

        self.initialize_models()

        

    def initialize_models(self):

        """Initialize different types of prediction models"""

        # Random Forest for feature importance and non-linear relationships

        self.models['random_forest'] = RandomForestRegressor(

            n_estimators=200,

            max_depth=15,

            min_samples_split=10,

            random_state=42

        )

        

        # Gradient Boosting for sequential learning

        self.models['gradient_boosting'] = GradientBoostingRegressor(

            n_estimators=150,

            learning_rate=0.1,

            max_depth=8,

            random_state=42

        )

        

        # Neural Network for complex pattern recognition

        self.models['neural_network'] = MLPRegressor(

            hidden_layer_sizes=(100, 50, 25),

            activation='relu',

            solver='adam',

            max_iter=500,

            random_state=42

        )

        

        # LSTM for time series patterns

        self.models['lstm'] = None  # Will be built dynamically

        

        # LLM-enhanced ensemble model

        self.models['llm_ensemble'] = LLMEnsembleModel(self.llm_client)

        

        # Initialize scalers for each model

        for model_name in self.models.keys():

            if model_name != 'llm_ensemble':

                self.scalers[model_name] = StandardScaler()

    

    async def prepare_features_with_llm(self, market_data: Dict, news_sentiment: Dict, 

                                      economic_events: List[Dict], lookback_days: int = 60) -> Dict:

        """Prepare feature matrix with LLM-enhanced feature engineering"""

        

        # Traditional feature extraction

        traditional_features = self.extract_traditional_features(market_data, news_sentiment, 

                                                               economic_events, lookback_days)

        

        # LLM-enhanced feature engineering

        llm_features = await self.extract_llm_features(market_data, news_sentiment, economic_events)

        

        # Combine and validate features

        combined_features = self.combine_feature_sets(traditional_features, llm_features)

        

        # LLM-based feature selection and weighting

        feature_analysis = await self.analyze_features_with_llm(combined_features, market_data)

        

        return {

            'features': combined_features,

            'feature_names': self.get_feature_names(),

            'feature_importance': feature_analysis.get('importance', {}),

            'feature_interactions': feature_analysis.get('interactions', []),

            'llm_insights': feature_analysis.get('insights', ''),

            'confidence_factors': feature_analysis.get('confidence_factors', {})

        }

    

    async def extract_llm_features(self, market_data: Dict, news_sentiment: Dict, 

                                 economic_events: List[Dict]) -> Dict:

        """Extract features using LLM analysis"""

        

        # Prepare context for LLM feature extraction

        context = self.prepare_llm_feature_context(market_data, news_sentiment, economic_events)

        

        prompt = f"""

        Analyze the following market context for {self.symbol} and extract relevant features for price prediction:

        

        {context}

        

        Extract and quantify the following types of features:

        1. Market sentiment indicators (0-100 scale)

        2. News impact scores (-100 to +100)

        3. Economic sensitivity factors (0-100)

        4. Technical pattern strength (0-100)

        5. Risk factors (0-100)

        6. Momentum indicators (0-100)

        7. Volatility expectations (0-100)

        8. Sector rotation signals (-100 to +100)

        

        Provide numerical scores and brief explanations for each feature.

        """

        

        try:

            llm_response = await self.get_llm_analysis(prompt)

            llm_features = self.parse_llm_features(llm_response)

            

            return llm_features

            

        except Exception as e:

            print(f"LLM feature extraction error: {e}")

            return {}

    

    async def analyze_features_with_llm(self, features: np.ndarray, market_data: Dict) -> Dict:

        """Analyze feature importance and interactions using LLM"""

        

        # Prepare feature analysis context

        feature_context = self.prepare_feature_analysis_context(features, market_data)

        

        prompt = f"""

        Analyze the feature set for {self.symbol} price prediction:

        

        {feature_context}

        

        Provide analysis on:

        1. Most important features for current market conditions

        2. Feature interactions and dependencies

        3. Potential feature redundancies

        4. Market regime considerations

        5. Feature reliability assessment

        6. Confidence factors affecting predictions

        

        Focus on actionable insights for model weighting and interpretation.

        """

        

        try:

            analysis_response = await self.get_llm_analysis(prompt)

            feature_analysis = self.parse_feature_analysis(analysis_response)

            

            return feature_analysis

            

        except Exception as e:

            print(f"Feature analysis error: {e}")

            return {}

    

    async def train_ensemble_with_llm(self, training_data: pd.DataFrame, target_column: str) -> Dict:

        """Train ensemble models with LLM-enhanced analysis"""

        

        training_results = {}

        

        # Prepare features and targets

        feature_columns = [col for col in training_data.columns if col != target_column]

        X = training_data[feature_columns].values

        y = training_data[target_column].values

        

        # Time series cross-validation

        tscv = TimeSeriesSplit(n_splits=5)

        

        # Train traditional models

        for model_name, model in self.models.items():

            if model_name == 'llm_ensemble':

                continue

                

            if model_name == 'lstm':

                # Handle LSTM separately due to different input format

                lstm_results = await self.train_lstm_model_with_llm(X, y, tscv)

                training_results[model_name] = lstm_results

                continue

            

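            # Scale features for current model.

            # Caution: fitting the scaler on all of X before cross-validation lets

            # validation-fold statistics leak into each training fold; fitting it

            # inside each fold gives a cleaner performance estimate.

            X_scaled = self.scalers[model_name].fit_transform(X)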

            

            # Cross-validation with LLM analysis

            cv_results = await self.cross_validate_with_llm(model, X_scaled, y, tscv, model_name)

            training_results[model_name] = cv_results

            

            # Final training on full dataset

            model.fit(X_scaled, y)

        

        # Train LLM ensemble model

        llm_ensemble_results = await self.train_llm_ensemble(training_data, target_column)

        training_results['llm_ensemble'] = llm_ensemble_results

        

        # Calculate model weights with LLM input

        model_weights = await self.calculate_model_weights_with_llm(training_results)

        self.model_weights = model_weights

        

        # Generate training summary with LLM

        training_summary = await self.generate_training_summary(training_results)

        

        self.trained = True

        

        return {

            'individual_results': training_results,

            'model_weights': model_weights,

            'training_summary': training_summary,

            'feature_importance': self.calculate_ensemble_feature_importance()

        }

    

    async def cross_validate_with_llm(self, model, X: np.ndarray, y: np.ndarray, 

                                    tscv, model_name: str) -> Dict:

        """Perform cross-validation with LLM analysis of results"""

        

        cv_scores = []

        prediction_analyses = []

        

        for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):

            X_train, X_val = X[train_idx], X[val_idx]

            y_train, y_val = y[train_idx], y[val_idx]

            

            # Train model

            model.fit(X_train, y_train)

            

            # Validate

            val_predictions = model.predict(X_val)

            val_score = self.calculate_prediction_accuracy(y_val, val_predictions)

            cv_scores.append(val_score)

            

            # LLM analysis of predictions

            prediction_analysis = await self.analyze_predictions_with_llm(

                y_val, val_predictions, fold, model_name

            )

            prediction_analyses.append(prediction_analysis)

        

        # Aggregate results

        avg_score = np.mean(cv_scores)

        score_std = np.std(cv_scores)

        

        # LLM summary of cross-validation performance

        cv_summary = await self.summarize_cv_performance(cv_scores, prediction_analyses, model_name)

        

        return {

            'cv_score_mean': avg_score,

            'cv_score_std': score_std,

            'cv_scores': cv_scores,

            'prediction_analyses': prediction_analyses,

            'llm_summary': cv_summary,

            'model_reliability': cv_summary.get('reliability_score', 0.5)

        }

    

    async def predict_with_llm_ensemble(self, current_data: Dict, 

                                      prediction_horizon_days: int = 1) -> Dict:

        """Generate ensemble prediction with LLM reasoning"""

        

        if not self.trained:

            raise ValueError("Models must be trained before making predictions")

        

        # Prepare features with LLM enhancement

        feature_data = await self.prepare_features_with_llm(

            current_data['market_data'],

            current_data['sentiment'],

            current_data['economic_events']

        )

        

        features = feature_data['features']

        

        # Get predictions from all models

        model_predictions = {}

        model_confidences = {}

        

        for model_name, model in self.models.items():

            if model is None:

                continue

                

            try:

                if model_name == 'llm_ensemble':

                    pred, conf = await self.predict_with_llm_model(current_data, prediction_horizon_days)

                elif model_name == 'lstm':

                    pred, conf = await self.predict_with_lstm(features, prediction_horizon_days)

                else:

                    # Scale features

                    features_scaled = self.scalers[model_name].transform(features.reshape(1, -1))

                    pred = model.predict(features_scaled)[0]

                    conf = self.calculate_prediction_confidence(model, features_scaled)

                

                model_predictions[model_name] = pred

                model_confidences[model_name] = conf

                

            except Exception as e:

                print(f"Error in {model_name} prediction: {e}")

                continue

        

        # LLM-enhanced ensemble combination

        ensemble_result = await self.combine_predictions_with_llm(

            model_predictions, model_confidences, current_data, prediction_horizon_days

        )

        

        # Generate prediction explanation

        prediction_explanation = await self.explain_prediction_with_llm(

            ensemble_result, model_predictions, current_data

        )

        

        return {

            'ensemble_prediction': ensemble_result['prediction'],

            'confidence': ensemble_result['confidence'],

            'individual_predictions': model_predictions,

            'model_confidences': model_confidences,

            'model_weights_used': ensemble_result['weights_used'],

            'prediction_explanation': prediction_explanation,

            'risk_assessment': ensemble_result.get('risk_assessment', {}),

            'prediction_horizon_days': prediction_horizon_days,

            'timestamp': datetime.now()

        }

    

    async def combine_predictions_with_llm(self, predictions: Dict, confidences: Dict,

                                         current_data: Dict, horizon_days: int) -> Dict:

        """Combine model predictions using LLM reasoning"""

        

        # Prepare prediction context

        prediction_context = self.prepare_prediction_context(predictions, confidences, current_data)

        

        prompt = f"""

        Combine the following model predictions for {self.symbol} using intelligent weighting:

        

        {prediction_context}

        

        Prediction horizon: {horizon_days} days

        

        Consider:

        1. Model reliability in current market conditions

        2. Prediction consistency across models

        3. Market regime appropriateness

        4. Confidence levels and uncertainty

        5. Risk factors that might affect accuracy

        

        Provide:

        1. Final ensemble prediction

        2. Confidence level (0-100)

        3. Model weights used

        4. Risk assessment

        5. Key uncertainty factors

        

        Focus on robust prediction that accounts for model limitations.

        """

        

        try:

            combination_response = await self.get_llm_analysis(prompt)

            combination_result = self.parse_prediction_combination(combination_response)

            

            return combination_result

            

        except Exception as e:

            print(f"LLM prediction combination error: {e}")

            # Fallback to simple weighted average

            return self.fallback_prediction_combination(predictions, confidences)

    

    async def explain_prediction_with_llm(self, ensemble_result: Dict, 

                                        model_predictions: Dict, current_data: Dict) -> str:

        """Generate human-readable explanation of the prediction"""

        

        explanation_context = self.prepare_explanation_context(

            ensemble_result, model_predictions, current_data

        )

        

        prompt = f"""

        Explain the price prediction for {self.symbol} in clear, actionable terms:

        

        {explanation_context}

        

        Provide explanation covering:

        1. Key factors driving the prediction

        2. Confidence rationale

        3. Main risks to the prediction

        4. Market conditions considered

        5. Recommended actions based on prediction

        6. Monitoring points for prediction validation

        

        Write for investment professionals who need to understand and act on the prediction.

        """

        

        try:

            explanation = await self.get_llm_analysis(prompt)

            return explanation

        except Exception as e:

            print(f"Prediction explanation error: {e}")

            return f"Prediction explanation unavailable: {str(e)}"

    

    async def get_llm_analysis(self, prompt: str) -> str:

        """Get analysis from LLM with specialized system prompt for predictions"""

        

        system_prompt = """You are a senior quantitative analyst and portfolio manager with expertise in:

        - Machine learning model interpretation

        - Financial market prediction

        - Risk assessment and management

        - Model ensemble techniques

        - Market regime analysis

        

        Provide detailed, actionable analysis that helps in investment decision-making.

        Consider model limitations and market uncertainties in your analysis."""

        

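        # Note: this is a synchronous client call, so it blocks the event loop

        # while the request is in flight. Exceptions propagate to the caller,

        # which is responsible for handling them.

        response = self.llm_client.chat.completions.create(

            model="gpt-4",

            messages=[

                {"role": "system", "content": system_prompt},

                {"role": "user", "content": prompt}

            ],

            max_tokens=1000,

            temperature=0.3

        )

        return response.choices[0].message.content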
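
The listing above relies on helpers such as parse_llm_features that are not shown. The following is a minimal sketch of one way such a parser could work, not the author's implementation. It assumes the prompt is extended to request a JSON object keyed by feature name (the prompt above asks only for scores and brief explanations), and the feature keys and FEATURE_RANGES are hypothetical names chosen for illustration. Scores are clamped to the ranges stated in the prompt, and anything unreadable is dropped.

```python
import json
import math
import re
from typing import Dict, Tuple

# Hypothetical feature keys, paired with the score ranges stated in the prompt.
FEATURE_RANGES: Dict[str, Tuple[float, float]] = {
    "market_sentiment": (0.0, 100.0),
    "news_impact": (-100.0, 100.0),
    "economic_sensitivity": (0.0, 100.0),
    "technical_pattern_strength": (0.0, 100.0),
    "risk": (0.0, 100.0),
    "momentum": (0.0, 100.0),
    "volatility_expectation": (0.0, 100.0),
    "sector_rotation": (-100.0, 100.0),
}


def parse_llm_features(response_text: str) -> Dict[str, float]:
    """Extract numeric feature scores from an LLM reply containing a JSON object."""
    # Take the outermost {...} span, since models often wrap JSON in prose.
    match = re.search(r"\{.*\}", response_text, flags=re.DOTALL)
    if match is None:
        return {}
    try:
        payload = json.loads(match.group(0))
    except json.JSONDecodeError:
        return {}

    features: Dict[str, float] = {}
    for name, (low, high) in FEATURE_RANGES.items():
        value = payload.get(name)
        # Accept either a bare number or {"score": number, "explanation": "..."}.
        if isinstance(value, dict):
            value = value.get("score")
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            continue
        if not math.isfinite(value):
            continue
        # Clamp to the range the prompt asked for.
        features[name] = min(max(float(value), low), high)
    return features
```

Returning an empty dictionary on malformed output matches the error path of extract_llm_features, which also falls back to an empty result.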



Deep learning models, particularly Long Short-Term Memory (LSTM) networks, excel at capturing temporal dependencies in financial time series data. These models can learn complex patterns in price movements, trading volumes, and other market variables that traditional statistical methods might miss. LLMs enhance these models by providing contextual interpretation of the patterns they identify and helping to understand when these patterns might break down.
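
To make the temporal input format concrete, here is a small sketch of how a flat feature matrix might be reshaped into overlapping windows before it reaches an LSTM. The function name make_lstm_windows and the lookback parameter are illustrative assumptions, and the sketch assumes targets[t] is already aligned so that it is the value to predict from information up to and including row t.

```python
import numpy as np
from typing import Tuple


def make_lstm_windows(features: np.ndarray, targets: np.ndarray,
                      lookback: int) -> Tuple[np.ndarray, np.ndarray]:
    """Turn a (T, F) feature matrix into (N, lookback, F) windows.

    Window i covers rows i .. i + lookback - 1 and is paired with the target
    at row i + lookback - 1, so N = T - lookback + 1.
    """
    if features.ndim != 2 or len(features) != len(targets):
        raise ValueError("features must be (T, F) and match targets in length")
    if not 1 <= lookback <= len(features):
        raise ValueError("lookback must be between 1 and T")
    n_windows = len(features) - lookback + 1
    # Stack consecutive slices into a 3-D array of overlapping windows.
    X = np.stack([features[i:i + lookback] for i in range(n_windows)])
    y = targets[lookback - 1:]
    return X, y
```

The result has the (samples, timesteps, features) layout that recurrent layers in Keras, and in PyTorch with batch_first=True, expect. Because each window only looks backward, the windows can be split chronologically without leaking future rows into earlier samples.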


The system continuously evaluates model performance and adjusts the ensemble weights based on recent prediction accuracy. Models that perform well in current market conditions receive higher weights, while underperforming models have their influence reduced. LLMs contribute by reasoning about why certain models perform better or worse in specific market conditions.
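
One simple way to turn recent accuracy into ensemble weights is a softmax over negative errors. The sketch below is an illustration of that idea, not the weighting scheme used by calculate_model_weights_with_llm, which is not shown. The function names, the use of mean absolute error, and the temperature parameter are all assumptions.

```python
import numpy as np
from typing import Dict


def weights_from_recent_errors(recent_errors: Dict[str, float],
                               temperature: float = 1.0) -> Dict[str, float]:
    """Map each model's recent mean absolute error to a normalized weight.

    Lower error gives a higher weight via a softmax over negative errors;
    `temperature` controls how sharply the best model is favoured.
    """
    if not recent_errors:
        return {}
    if temperature <= 0:
        raise ValueError("temperature must be positive")
    names = list(recent_errors)
    errors = np.array([recent_errors[n] for n in names], dtype=float)
    scores = -errors / temperature
    scores -= scores.max()  # shift for numerical stability
    raw = np.exp(scores)
    weights = raw / raw.sum()
    return dict(zip(names, weights.tolist()))


def combine_predictions(predictions: Dict[str, float],
                        weights: Dict[str, float]) -> float:
    """Weighted average over the models that have both a prediction and a weight."""
    shared = [n for n in predictions if n in weights]
    total = sum(weights[n] for n in shared)
    if not shared or total <= 0:
        raise ValueError("no overlapping models with positive weight")
    return sum(predictions[n] * weights[n] for n in shared) / total
```

Because the softmax depends on the scale of the errors, the temperature would need tuning to the units of the target. A weighted average of this kind is also a natural candidate for the non-LLM fallback path when the LLM-based combination fails.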


Feature engineering plays a crucial role in model performance. The system automatically generates hundreds of potential features from raw market data, news sentiment, and economic indicators, then uses feature selection techniques to identify the most predictive variables for each model. LLMs enhance this process by suggesting novel features based on their understanding of market dynamics and by providing interpretation of feature importance.
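
The text does not say which feature selection techniques are used, so the following sketch substitutes a deliberately simple filter method: ranking candidate features by the absolute value of their Pearson correlation with the target. The function name select_top_features and its parameters are assumptions for illustration.

```python
import numpy as np
from typing import List, Sequence


def select_top_features(X: np.ndarray, y: np.ndarray,
                        names: Sequence[str], k: int) -> List[str]:
    """Rank columns of X by |Pearson correlation| with y and keep the top k."""
    if X.shape[1] != len(names) or X.shape[0] != len(y):
        raise ValueError("shape mismatch between X, y and names")
    Xc = X - X.mean(axis=0)
    yc = y - y.mean()
    denom = np.sqrt((Xc ** 2).sum(axis=0) * (yc ** 2).sum())
    # Constant columns have zero variance; assign them a correlation of 0.
    with np.errstate(divide="ignore", invalid="ignore"):
        corr = np.where(denom > 0, (Xc * yc[:, None]).sum(axis=0) / denom, 0.0)
    # Stable sort keeps the original column order among ties.
    order = np.argsort(-np.abs(corr), kind="stable")
    return [names[i] for i in order[:k]]
```

A univariate filter like this ignores interactions between features and only captures linear relationships, so it is best treated as a first pass. For time series, the ranking should be computed on the training portion only, otherwise the selection itself leaks information from the validation period.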


Decision Making Framework with LLM-Enhanced Reasoning


The decision-making framework represents the core intelligence of the agentic AI system, responsible for synthesizing all available information into actionable investment recommendations. This framework must balance multiple competing objectives, including maximizing returns, minimizing risk, and maintaining portfolio diversification. The integration of LLMs significantly enhances this framework by providing sophisticated reasoning capabilities, contextual understanding, and the ability to generate human-readable explanations for decisions.


The decision engine employs a hierarchical approach that first assesses overall market conditions, then evaluates individual stock opportunities within that context. Market regime detection helps determine whether the current environment favors growth stocks, value stocks, defensive positions, or cash holdings. LLMs enhance this process by providing nuanced interpretation of market conditions and reasoning about how different factors interact.
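
Before any LLM interpretation is layered on, the first stage of this hierarchy can be approximated with plain statistics. The sketch below labels a window of prices by its total return and annualized volatility. The thresholds, the function name classify_regime, and the assumption of daily bars with 252 trading days per year are illustrative choices. The returned labels reuse the string values of the MarketRegime enum in the listing that follows, except that this sketch never returns "transition".

```python
import numpy as np


def classify_regime(prices: np.ndarray, trend_threshold: float = 0.05,
                    high_vol: float = 0.30, low_vol: float = 0.10) -> str:
    """Label a price window using its total return and annualized volatility."""
    prices = np.asarray(prices, dtype=float)
    if prices.size < 3 or np.any(prices <= 0):
        raise ValueError("need at least three strictly positive prices")
    log_returns = np.diff(np.log(prices))
    total_return = prices[-1] / prices[0] - 1.0
    # Assumes daily bars and 252 trading days per year.
    annual_vol = log_returns.std(ddof=1) * np.sqrt(252)

    # Volatility is checked first so turbulent periods are not mistaken for trends.
    if annual_vol >= high_vol:
        return "high_volatility"
    if total_return >= trend_threshold:
        return "bull_market"
    if total_return <= -trend_threshold:
        return "bear_market"
    if annual_vol <= low_vol:
        return "low_volatility"
    return "sideways_market"
```

A rule-based label like this gives the LLM a concrete starting point to confirm, qualify, or challenge, rather than asking it to infer the regime from raw numbers alone.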



import numpy as np

from typing import Dict, List, Optional, Tuple

from enum import Enum

from dataclasses import dataclass

from datetime import datetime, timedelta

import openai

import asyncio


class MarketRegime(Enum):

    BULL_MARKET = "bull_market"

    BEAR_MARKET = "bear_market"

    SIDEWAYS_MARKET = "sideways_market"

    HIGH_VOLATILITY = "high_volatility"

    LOW_VOLATILITY = "low_volatility"

    TRANSITION = "transition"


class InvestmentAction(Enum):

    STRONG_BUY = "strong_buy"

    BUY = "buy"

    HOLD = "hold"

    SELL = "sell"

    STRONG_SELL = "strong_sell"


@dataclass

class LLMEnhancedInvestmentRecommendation:

    symbol: str

    action: InvestmentAction

    confidence: float

    target_price: Optional[float]

    stop_loss: Optional[float]

    time_horizon: str

    reasoning: List[str]

    llm_analysis: str

    risk_score: float

    expected_return: float

    position_size_recommendation: float

    key_catalysts: List[str]

    risk_factors: List[str]

    monitoring_points: List[str]

    alternative_scenarios: List[Dict]

    timestamp: datetime


class LLMEnhancedDecisionEngine:

    def __init__(self, risk_tolerance: float = 0.5, llm_api_key: Optional[str] = None):

        self.risk_tolerance = risk_tolerance  # 0 = very conservative, 1 = very aggressive

        self.llm_client = openai.OpenAI(api_key=llm_api_key)

        self.market_regime_detector = LLMEnhancedMarketRegimeDetector(llm_api_key)

        self.risk_manager = LLMEnhancedRiskManager(llm_api_key)

        self.portfolio_optimizer = LLMEnhancedPortfolioOptimizer(llm_api_key)

        self.decision_history = []

        

    async def make_comprehensive_investment_decision(self, symbol: str, 

                                                   analysis_data: Dict) -> LLMEnhancedInvestmentRecommendation:

        """Make comprehensive investment decision with LLM-enhanced reasoning"""

        

        # Detect current market regime with LLM analysis

        market_regime_analysis = await self.market_regime_detector.detect_regime_with_llm(

            analysis_data['market_data']

        )

        

        # Comprehensive stock assessment with LLM insights

        stock_assessment = await self.assess_stock_opportunity_with_llm(

            symbol, analysis_data, market_regime_analysis

        )

        

        # Advanced risk analysis with LLM interpretation

        risk_analysis = await self.risk_manager.comprehensive_risk_analysis(

            symbol, analysis_data, market_regime_analysis

        )

        

        # LLM-enhanced position sizing

        position_analysis = await self.calculate_optimal_position_size(

            stock_assessment, risk_analysis, market_regime_analysis

        )

        

        # Generate comprehensive recommendation with LLM reasoning

        recommendation = await self.generate_comprehensive_recommendation(

            symbol, stock_assessment, risk_analysis, position_analysis, 

            market_regime_analysis, analysis_data

        )

        

        # Store decision for learning and improvement

        self.decision_history.append({

            'recommendation': recommendation,

            'analysis_data': analysis_data,

            'timestamp': datetime.now()

        })

        

        return recommendation

    

    async def assess_stock_opportunity_with_llm(self, symbol: str, analysis_data: Dict, 

                                              market_regime: Dict) -> Dict:

        """Comprehensive stock assessment with LLM reasoning"""

        

        # Traditional quantitative assessment

        quantitative_scores = self.calculate_quantitative_scores(symbol, analysis_data)

        

        # LLM-enhanced qualitative assessment

        qualitative_analysis = await self.perform_qualitative_assessment(

            symbol, analysis_data, market_regime

        )

        

        # Combine quantitative and qualitative insights

        comprehensive_assessment = await self.synthesize_stock_assessment(

            symbol, quantitative_scores, qualitative_analysis, market_regime

        )

        

        return comprehensive_assessment

    

    async def perform_qualitative_assessment(self, symbol: str, analysis_data: Dict,

                                           market_regime: Dict) -> Dict:

        """Perform qualitative assessment using LLM analysis"""

        

        # Prepare comprehensive context for LLM analysis

        assessment_context = self.prepare_qualitative_context(symbol, analysis_data, market_regime)

        

        prompt = f"""

        Perform comprehensive qualitative assessment for {symbol} investment opportunity:

        

        {assessment_context}

        

        Analyze:

        1. Business fundamentals and competitive position

        2. Management quality and strategic direction

        3. Industry dynamics and growth prospects

        4. Valuation attractiveness in current context

        5. Catalyst potential and timing

        6. ESG considerations and sustainability

        7. Regulatory and political risks

        8. Market sentiment and positioning

        

        Provide detailed assessment with specific reasoning for each factor.

        Include both bullish and bearish perspectives.

        """

        

        try:

            qualitative_response = await self.get_llm_decision_analysis(prompt)

            structured_assessment = self.parse_qualitative_assessment(qualitative_response)

            

            return {

                'business_fundamentals': structured_assessment.get('fundamentals', {}),

                'management_quality': structured_assessment.get('management', {}),

                'industry_dynamics': structured_assessment.get('industry', {}),

                'valuation_assessment': structured_assessment.get('valuation', {}),

                'catalyst_analysis': structured_assessment.get('catalysts', {}),

                'esg_considerations': structured_assessment.get('esg', {}),

                'risk_factors': structured_assessment.get('risks', []),

                'market_sentiment': structured_assessment.get('sentiment', {}),

                'overall_quality_score': structured_assessment.get('overall_score', 50),

                'raw_analysis': qualitative_response

            }

            

        except Exception as e:

            print(f"Qualitative assessment error: {e}")

            return {'overall_quality_score': 50, 'error': str(e)}

    

    async def synthesize_stock_assessment(self, symbol: str, quantitative_scores: Dict,

                                        qualitative_analysis: Dict, market_regime: Dict) -> Dict:

        """Synthesize quantitative and qualitative assessments using LLM"""

        

        synthesis_context = self.prepare_synthesis_context(

            symbol, quantitative_scores, qualitative_analysis, market_regime

        )

        

        prompt = f"""

        Synthesize comprehensive investment assessment for {symbol}:

        

        {synthesis_context}

        

        Provide integrated analysis considering:

        1. Alignment between quantitative metrics and qualitative factors

        2. Market regime appropriateness

        3. Risk-adjusted opportunity assessment

        4. Timing considerations

        5. Position in investment cycle

        6. Relative attractiveness vs alternatives

        7. Scenario analysis and probability weighting

        8. Key decision factors and trade-offs

        

        Generate overall investment thesis with supporting evidence.

        """

        

        try:

            synthesis_response = await self.get_llm_decision_analysis(prompt)

            synthesis_result = self.parse_synthesis_analysis(synthesis_response)

            

            return {

                # Carry the symbol through so downstream prompts can name the stock

                'symbol': symbol,

                'investment_thesis': synthesis_result.get('thesis', ''),

                'overall_score': synthesis_result.get('overall_score', 50),

                'conviction_level': synthesis_result.get('conviction', 50),

                'key_strengths': synthesis_result.get('strengths', []),

                'key_weaknesses': synthesis_result.get('weaknesses', []),

                'scenario_analysis': synthesis_result.get('scenarios', []),

                'timing_assessment': synthesis_result.get('timing', {}),

                'relative_attractiveness': synthesis_result.get('relative_attractiveness', 50),

                'quantitative_scores': quantitative_scores,

                'qualitative_analysis': qualitative_analysis,

                'raw_synthesis': synthesis_response

            }

            

        except Exception as e:

            print(f"Assessment synthesis error: {e}")

            return {'symbol': symbol, 'overall_score': 50, 'error': str(e)}

    

    async def calculate_optimal_position_size(self, stock_assessment: Dict, 

                                            risk_analysis: Dict, market_regime: Dict) -> Dict:

        """Calculate optimal position size with LLM-enhanced reasoning"""

        

        # Traditional position sizing calculations

        traditional_sizing = self.calculate_traditional_position_size(

            stock_assessment, risk_analysis

        )

        

        # LLM-enhanced position sizing analysis

        position_context = self.prepare_position_sizing_context(

            stock_assessment, risk_analysis, market_regime, traditional_sizing

        )

        

        prompt = f"""

        Determine optimal position sizing for {stock_assessment.get('symbol', 'Unknown')}:

        

        {position_context}

        

        Consider:

        1. Risk-adjusted expected return

        2. Portfolio correlation and diversification

        3. Market regime appropriateness

        4. Liquidity considerations

        5. Volatility and drawdown expectations

        6. Opportunity cost vs alternatives

        7. Implementation timing and scaling

        8. Risk management requirements

        

        Provide position size recommendation with detailed reasoning.

        Include scenarios for different market conditions.

        """

        

        try:

            sizing_response = await self.get_llm_decision_analysis(prompt)

            sizing_analysis = self.parse_position_sizing(sizing_response)

            

            return {

                'recommended_position_size': sizing_analysis.get('position_size', 0.0),

                'sizing_rationale': sizing_analysis.get('rationale', ''),

                'implementation_strategy': sizing_analysis.get('implementation', {}),

                'scaling_plan': sizing_analysis.get('scaling', []),

                'risk_limits': sizing_analysis.get('risk_limits', {}),

                'scenario_adjustments': sizing_analysis.get('scenarios', {}),

                'traditional_sizing': traditional_sizing,

                'raw_analysis': sizing_response

            }

            

        except Exception as e:

            print(f"Position sizing error: {e}")

            return {'recommended_position_size': 0.0, 'error': str(e)}

    

    async def generate_comprehensive_recommendation(self, symbol: str, stock_assessment: Dict,

                                                  risk_analysis: Dict, position_analysis: Dict,

                                                  market_regime: Dict, analysis_data: Dict) -> LLMEnhancedInvestmentRecommendation:

        """Generate comprehensive investment recommendation with LLM reasoning"""

        

        # Prepare comprehensive recommendation context

        recommendation_context = self.prepare_recommendation_context(

            symbol, stock_assessment, risk_analysis, position_analysis, market_regime

        )

        

        prompt = f"""

        Generate comprehensive investment recommendation for {symbol}:

        

        {recommendation_context}

        

        Provide detailed recommendation including:

        1. Investment action (Strong Buy/Buy/Hold/Sell/Strong Sell)

        2. Confidence level and reasoning

        3. Target price with methodology

        4. Stop-loss levels and risk management

        5. Time horizon and milestone expectations

        6. Key catalysts and monitoring points

        7. Alternative scenarios and contingencies

        8. Position sizing and implementation guidance

        

        Structure as actionable investment decision with clear rationale.

        """

        

        try:

            recommendation_response = await self.get_llm_decision_analysis(prompt)

            recommendation_data = self.parse_comprehensive_recommendation(recommendation_response)

            

            # Determine investment action

            action = self.determine_investment_action(

                stock_assessment.get('overall_score', 50),

                risk_analysis.get('risk_score', 0.5),

                market_regime['regime']

            )

            

            # Calculate target price and stop loss

            current_price = analysis_data['market_data'].get('current_price', 0)

            target_price = self.calculate_target_price(

                current_price, stock_assessment, recommendation_data

            )

            stop_loss = self.calculate_stop_loss(

                current_price, action, risk_analysis, recommendation_data

            )

            

            # Generate reasoning list

            reasoning = self.compile_reasoning(

                stock_assessment, risk_analysis, position_analysis, recommendation_data

            )

            

            return LLMEnhancedInvestmentRecommendation(

                symbol=symbol,

                action=action,

                confidence=recommendation_data.get('confidence', 50) / 100.0,

                target_price=target_price,

                stop_loss=stop_loss,

                time_horizon=recommendation_data.get('time_horizon', '3-6 months'),

                reasoning=reasoning,

                llm_analysis=recommendation_response,

                risk_score=risk_analysis.get('risk_score', 0.5),

                expected_return=recommendation_data.get('expected_return', 0.0),

                position_size_recommendation=position_analysis.get('recommended_position_size', 0.0),

                key_catalysts=recommendation_data.get('catalysts', []),

                risk_factors=recommendation_data.get('risk_factors', []),

                monitoring_points=recommendation_data.get('monitoring_points', []),

                alternative_scenarios=recommendation_data.get('scenarios', []),

                timestamp=datetime.now()

            )

            

        except Exception as e:

            print(f"Recommendation generation error: {e}")

            # Return basic recommendation as fallback

            return self.generate_fallback_recommendation(symbol, stock_assessment, risk_analysis)

    

    async def generate_investment_report(self, recommendation: LLMEnhancedInvestmentRecommendation,

                                       analysis_data: Dict) -> str:

        """Generate comprehensive investment report using LLM"""

        

        report_context = self.prepare_report_context(recommendation, analysis_data)

        

        prompt = f"""

        Generate a comprehensive investment research report for {recommendation.symbol}:

        

        {report_context}

        

        Structure the report with:

        1. Executive Summary

        2. Investment Thesis

        3. Fundamental Analysis

        4. Technical Analysis

        5. Risk Assessment

        6. Valuation Analysis

        7. Catalysts and Timeline

        8. Recommendation and Price Targets

        9. Risk Management

        10. Monitoring Framework

        

        Write in professional investment research format suitable for institutional investors.

        Include specific data points, analysis methodology, and clear reasoning.

        """

        

        try:

            report = await self.get_llm_decision_analysis(prompt, max_tokens=2000)

            return report

        except Exception as e:

            print(f"Report generation error: {e}")

            return f"Investment report generation failed: {str(e)}"

    

    async def explain_decision_reasoning(self, recommendation: LLMEnhancedInvestmentRecommendation,

                                       user_question: Optional[str] = None) -> str:

        """Provide interactive explanation of decision reasoning"""

        

        if user_question:

            explanation_prompt = f"""

            Explain the investment decision for {recommendation.symbol} in response to this question:

            "{user_question}"

            

            Recommendation: {recommendation.action.value}

            Confidence: {recommendation.confidence:.1%}

            Target Price: ${recommendation.target_price}

            

            LLM Analysis: {recommendation.llm_analysis[:1000]}...

            

            Provide detailed explanation addressing the specific question while covering the key decision factors.

            """

        else:

            explanation_prompt = f"""

            Provide a comprehensive explanation of the investment decision for {recommendation.symbol}:

            

            Recommendation: {recommendation.action.value}

            Confidence: {recommendation.confidence:.1%}

            Target Price: ${recommendation.target_price}

            

            Key factors considered:

            - Reasoning: {'; '.join(recommendation.reasoning[:5])}

            - Key Catalysts: {'; '.join(recommendation.key_catalysts[:3])}

            - Risk Factors: {'; '.join(recommendation.risk_factors[:3])}

            

            Explain the decision-making process, key factors, and how they led to this recommendation.

            Address potential concerns and alternative viewpoints.

            """

        

        try:

            explanation = await self.get_llm_decision_analysis(explanation_prompt)

            return explanation

        except Exception as e:

            print(f"Decision explanation error: {e}")

            return f"Decision explanation unavailable: {str(e)}"

    

    async def get_llm_decision_analysis(self, prompt: str, max_tokens: int = 1200) -> str:

        """Get decision analysis from LLM with specialized system prompt"""

        

        system_prompt = """You are a senior portfolio manager and investment strategist with 20+ years of experience. 

        You have expertise in:

        - Fundamental and technical analysis

        - Risk management and portfolio construction

        - Market regime analysis and timing

        - Behavioral finance and market psychology

        - Quantitative modeling and valuation

        

        Provide detailed, actionable investment analysis with clear reasoning.

        Consider multiple perspectives and scenarios in your analysis.

        Focus on practical implementation and risk management."""

        

        # The OpenAI client call is synchronous, so run it in a worker thread

        # to avoid blocking the event loop while waiting for the response.

        response = await asyncio.to_thread(

            self.llm_client.chat.completions.create,

            model="gpt-4",

            messages=[

                {"role": "system", "content": system_prompt},

                {"role": "user", "content": prompt}

            ],

            max_tokens=max_tokens,

            temperature=0.3

        )

        return response.choices[0].message.content

    

    def determine_investment_action(self, overall_score: float, risk_score: float, 

                                  market_regime: str) -> InvestmentAction:

        """Determine investment action based on comprehensive analysis"""

        

        # Adjust thresholds based on risk tolerance and market regime

        conservative_factor = 1 - self.risk_tolerance

        

        # Base thresholds: a more conservative investor needs a higher score

        # to buy and sells at a less extreme score

        strong_buy_threshold = 80 + (conservative_factor * 10)

        buy_threshold = 65 + (conservative_factor * 5)

        sell_threshold = 35 + (conservative_factor * 5)

        strong_sell_threshold = 20 + (conservative_factor * 10)

        

        # Adjust for market regime

        if market_regime in ['bear_market', 'high_volatility']:

            strong_buy_threshold += 10

            buy_threshold += 5

        elif market_regime == 'bull_market':

            strong_buy_threshold -= 5

            buy_threshold -= 3

        

        # Consider risk score

        if risk_score > 0.7:  # High risk

            strong_buy_threshold += 10

            buy_threshold += 5

        

        # Determine action

        if overall_score >= strong_buy_threshold and risk_score <= 0.6:

            return InvestmentAction.STRONG_BUY

        elif overall_score >= buy_threshold and risk_score <= 0.7:

            return InvestmentAction.BUY

        elif overall_score <= strong_sell_threshold or risk_score >= 0.9:

            return InvestmentAction.STRONG_SELL

        elif overall_score <= sell_threshold or risk_score >= 0.8:

            return InvestmentAction.SELL

        else:

            return InvestmentAction.HOLD



The decision framework's risk management goes beyond traditional volatility measures: it also considers correlation, liquidity, concentration, and tail risks when making investment recommendations. LLMs complement these quantitative measures by supplying context about individual risk factors and how they might interact.
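Two of the risk types named above can be made concrete with a small sketch. It is only an illustration under stated assumptions: the function names are hypothetical, concentration is measured with the Herfindahl-Hirschman index of portfolio weights, and tail risk with a historical expected shortfall. The article does not say which measures the decision engine actually uses.

```python
from typing import Dict, List


def concentration_hhi(weights: Dict[str, float]) -> float:
    """Herfindahl-Hirschman index of portfolio weights.

    Returns 1/n for n equally weighted positions and 1.0 for a single position.
    (Hypothetical helper, not part of the article's classes.)
    """
    total = sum(abs(w) for w in weights.values())
    if total == 0:
        return 0.0
    return sum((abs(w) / total) ** 2 for w in weights.values())


def historical_expected_shortfall(returns: List[float],
                                  tail_fraction: float = 0.05) -> float:
    """Average loss over the worst `tail_fraction` of observed returns.

    The result is reported as a positive number when the tail is a loss.
    """
    if not returns:
        return 0.0
    ordered = sorted(returns)  # worst returns first
    tail_size = max(1, int(len(ordered) * tail_fraction))
    tail = ordered[:tail_size]
    return -sum(tail) / tail_size


if __name__ == "__main__":
    print(concentration_hhi({"AAPL": 0.6, "MSFT": 0.3, "XOM": 0.1}))
    print(historical_expected_shortfall([-0.10, -0.05] + [0.01] * 18, 0.10))
```

How such measures would be combined into the 0-to-1 risk score used by the decision engine is left open here, since the article does not define that mapping.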


Position sizing is a critical component of the decision framework. The system must determine not only whether to buy or sell a stock but also how much capital to allocate to each position. This decision depends on the expected return, the risk level, the correlation with existing positions, and the overall portfolio objectives. LLMs contribute by reasoning about position size across multiple factors and scenarios.
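One possible way to turn those inputs into a number is sketched below. It assumes a fractional Kelly rule (expected excess return divided by variance, scaled down), a haircut for correlation with existing holdings, and a hard cap per position. The function name, the parameters, and the default values are illustrative assumptions, not the article's method.

```python
def suggest_position_weight(expected_return: float,
                            volatility: float,
                            avg_correlation: float,
                            confidence: float,
                            max_weight: float = 0.10,
                            kelly_fraction: float = 0.25) -> float:
    """Return a suggested portfolio weight between 0 and max_weight.

    expected_return: expected excess return over the holding period
    volatility:      standard deviation of returns over the same period
    avg_correlation: average correlation with existing positions
    confidence:      model confidence between 0 and 1
    """
    if volatility <= 0 or expected_return <= 0:
        return 0.0  # no long position without a positive expected edge

    # Single-asset Kelly weight: expected excess return / variance.
    kelly_weight = expected_return / (volatility ** 2)

    # Fractional Kelly, further scaled by confidence clamped to [0, 1].
    weight = kelly_weight * kelly_fraction * max(0.0, min(confidence, 1.0))

    # Penalise positive correlation with what the portfolio already holds.
    weight *= 1.0 - max(0.0, min(avg_correlation, 1.0))

    # Never exceed the per-position cap.
    return min(weight, max_weight)


if __name__ == "__main__":
    print(suggest_position_weight(0.02, 0.40, avg_correlation=0.5, confidence=0.8))
```

The cap and the Kelly fraction are the natural places to plug in the investor's risk tolerance; a more conservative profile would lower both.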


The framework generates a detailed explanation for each investment decision, covering the key factors behind the recommendation, the confidence level, and the potential risks. These explanations are crucial for building trust with users and for letting them understand and validate the AI's reasoning. LLMs are well suited to producing such explanations in natural language that is both comprehensive and accessible.


Report Generation and User Interface with LLM Enhancement


The final component of the agentic AI system is the report generation and user interface layer, which presents analysis results and recommendations in a clear, actionable format. It must generate different types of reports for different audiences, from executive summaries for senior management to detailed technical analysis for portfolio managers. LLMs strengthen this capability through natural language generation and the ability to tailor reports to user preferences and requirements.


The system generates multiple types of reports, including real-time market alerts, daily market summaries, individual stock analysis reports, portfolio performance reviews, and comprehensive investment research reports. Each report type serves a different purpose and audience, requiring different levels of detail and different presentation formats.



import asyncio

import json

from datetime import datetime, timedelta

from typing import Dict, List, Optional, Any

import openai

from dataclasses import asdict


class LLMEnhancedReportGenerator:

    def __init__(self, llm_api_key: str):

        self.llm_client = openai.OpenAI(api_key=llm_api_key)

        self.report_templates = self.initialize_report_templates()

        self.user_preferences = {}

        

    def initialize_report_templates(self) -> Dict[str, Dict]:

        """Initialize report templates for different report types"""

        return {

            'executive_summary': {

                'sections': ['key_highlights', 'market_outlook', 'top_recommendations', 'risk_alerts'],

                'max_length': 500,

                'tone': 'executive',

                'focus': 'strategic'

            },

            'detailed_analysis': {

                'sections': ['market_analysis', 'technical_analysis', 'fundamental_analysis', 

                           'risk_assessment', 'recommendations', 'implementation'],

                'max_length': 2000,

                'tone': 'analytical',

                'focus': 'comprehensive'

            },

            'daily_briefing': {

                'sections': ['market_summary', 'key_events', 'stock_alerts', 'economic_calendar'],

                'max_length': 800,

                'tone': 'informative',

                'focus': 'current'

            },

            'research_report': {

                'sections': ['investment_thesis', 'company_analysis', 'valuation', 'risks', 

                           'catalysts', 'recommendation'],

                'max_length': 3000,

                'tone': 'professional',

                'focus': 'detailed'

            }

        }

    

    async def generate_comprehensive_stock_report(self, symbol: str, 

                                                recommendation: LLMEnhancedInvestmentRecommendation,

                                                analysis_data: Dict,

                                                report_type: str = 'detailed_analysis') -> Dict:

        """Generate comprehensive stock analysis report"""

        

        # Prepare report context

        report_context = self.prepare_stock_report_context(symbol, recommendation, analysis_data)

        

        # Generate different sections based on report type

        report_sections = await self.generate_report_sections(

            symbol, report_context, report_type

        )

        

        # Create executive summary

        executive_summary = await self.generate_executive_summary(

            symbol, recommendation, report_sections

        )

        

        # Generate visual elements descriptions

        visual_elements = await self.generate_visual_elements_description(

            symbol, analysis_data

        )

        

        # Compile final report

        final_report = {

            'symbol': symbol,

            'report_type': report_type,

            'executive_summary': executive_summary,

            'sections': report_sections,

            'recommendation': asdict(recommendation),

            'visual_elements': visual_elements,

            'metadata': {

                'generated_at': datetime.now(),

                'data_as_of': analysis_data.get('timestamp', datetime.now()),

                'confidence_level': recommendation.confidence,

                'report_version': '1.0'

            }

        }

        

        return final_report

    

    async def generate_report_sections(self, symbol: str, report_context: Dict, 

                                     report_type: str) -> Dict[str, str]:

        """Generate individual report sections using LLM"""

        

        template = self.report_templates.get(report_type, self.report_templates['detailed_analysis'])

        sections = {}

        

        # Generate each section

        for section_name in template['sections']:

            section_content = await self.generate_section_content(

                symbol, section_name, report_context, template

            )

            sections[section_name] = section_content

        

        return sections

    

    async def generate_section_content(self, symbol: str, section_name: str, 

                                     report_context: Dict, template: Dict) -> str:

        """Generate content for a specific report section"""

        

        section_prompts = {

            'market_analysis': f"""

            Analyze the current market environment for {symbol}:

            

            {report_context.get('market_data', '')}

            

            Cover:

            - Overall market conditions and trends

            - Sector performance and positioning

            - Market sentiment and investor behavior

            - Volatility and risk environment

            - Comparative analysis with benchmarks

            

            Tone: {template['tone']}

            Focus: {template['focus']}

            """,

            

            'technical_analysis': f"""

            Provide technical analysis for {symbol}:

            

            {report_context.get('technical_data', '')}

            

            Include:

            - Chart pattern analysis

            - Support and resistance levels

            - Momentum indicators

            - Volume analysis

            - Technical outlook and price targets

            

            Tone: {template['tone']}

            """,

            

            'fundamental_analysis': f"""

            Analyze fundamental factors for {symbol}:

            

            {report_context.get('fundamental_data', '')}

            

            Cover:

            - Business model and competitive position

            - Financial performance and metrics

            - Management quality and strategy

            - Industry dynamics and growth prospects

            - Valuation assessment

            

            Tone: {template['tone']}

            """,

            

            'risk_assessment': f"""

            Assess risks for {symbol} investment:

            

            {report_context.get('risk_data', '')}

            

            Analyze:

            - Company-specific risks

            - Market and sector risks

            - Economic and political risks

            - Liquidity and operational risks

            - Risk mitigation strategies

            

            Tone: {template['tone']}

            """,

            

            'recommendations': f"""

            Provide investment recommendations for {symbol}:

            

            {report_context.get('recommendation_data', '')}

            

            Include:

            - Investment action and rationale

            - Price targets and timeline

            - Position sizing guidance

            - Risk management approach

            - Monitoring framework

            

            Tone: {template['tone']}

            """

        }

        

        prompt = section_prompts.get(section_name, f"Analyze {section_name} for {symbol}")

        

        try:

            section_content = await self.get_llm_report_content(prompt, template)

            return section_content

        except Exception as e:

            print(f"Section generation error for {section_name}: {e}")

            return f"Section content unavailable: {str(e)}"

    

    async def generate_portfolio_report(self, portfolio_data: Dict, 

                                      performance_data: Dict,

                                      recommendations: List[LLMEnhancedInvestmentRecommendation]) -> Dict:

        """Generate comprehensive portfolio analysis report"""

        

        # Prepare portfolio context

        portfolio_context = self.prepare_portfolio_context(

            portfolio_data, performance_data, recommendations

        )

        

        prompt = f"""

        Generate comprehensive portfolio analysis report:

        

        {portfolio_context}

        

        Structure the report with:

        1. Portfolio Performance Summary

        2. Asset Allocation Analysis

        3. Risk Assessment and Metrics

        4. Individual Position Analysis

        5. Sector and Geographic Exposure

        6. Recent Trading Activity

        7. Current Recommendations

        8. Risk Management Status

        9. Performance Attribution

        10. Outlook and Strategy

        

        Provide actionable insights for portfolio optimization and risk management.

        """

        

        try:

            portfolio_report = await self.get_llm_report_content(prompt, 

                                                               {'tone': 'professional', 'max_length': 2500})

            

            # Generate portfolio metrics summary

            metrics_summary = await self.generate_portfolio_metrics_summary(

                portfolio_data, performance_data

            )

            

            # Generate recommendations summary

            recommendations_summary = await self.generate_recommendations_summary(

                recommendations

            )

            

            return {

                'report_type': 'portfolio_analysis',

                'main_report': portfolio_report,

                'metrics_summary': metrics_summary,

                'recommendations_summary': recommendations_summary,

                'portfolio_data': portfolio_data,

                'performance_data': performance_data,

                'generated_at': datetime.now()

            }

            

        except Exception as e:

            print(f"Portfolio report generation error: {e}")

            return {'error': str(e)}

    

    async def generate_market_briefing(self, market_data: Dict, economic_events: List[Dict],

                                     news_analysis: Dict, top_movers: List[Dict]) -> Dict:

        """Generate daily market briefing"""

        

        briefing_context = self.prepare_market_briefing_context(

            market_data, economic_events, news_analysis, top_movers

        )

        

        prompt = f"""

        Generate daily market briefing:

        

        {briefing_context}

        

        Include:

        1. Market Overview (major indices, key moves)

        2. Top Stories and Market Drivers

        3. Economic Events and Data

        4. Sector Performance Highlights

        5. Notable Stock Movements

        6. Currency and Commodity Updates

        7. Looking Ahead (key events, earnings)

        8. Risk Factors to Monitor

        

        Keep concise but comprehensive. Focus on actionable information for traders and investors.

        """

        

        try:

            market_briefing = await self.get_llm_report_content(prompt, 

                                                              {'tone': 'informative', 'max_length': 1200})

            

            return {

                'report_type': 'market_briefing',

                'briefing_content': market_briefing,

                'market_data': market_data,

                'key_events': economic_events[:5],  # Top 5 events

                'top_movers': top_movers,

                'generated_at': datetime.now()

            }

            

        except Exception as e:

            print(f"Market briefing generation error: {e}")

            return {'error': str(e)}

    

    async def generate_interactive_qa_response(self, user_question: str, 

                                             context_data: Dict) -> str:

        """Generate interactive Q&A response about analysis or recommendations"""

        

        qa_prompt = f"""

        Answer the following question about market analysis or investment recommendations:

        

        Question: {user_question}

        

        Context:

        {json.dumps(context_data, indent=2, default=str)[:2000]}

        

        Provide a comprehensive, accurate answer based on the available data and analysis.

        If the question cannot be fully answered with available data, explain what information would be needed.

        Use specific data points and analysis when possible.

        """

        

        try:

            qa_response = await self.get_llm_report_content(qa_prompt, 

                                                          {'tone': 'conversational', 'max_length': 800})

            return qa_response

        except Exception as e:

            print(f"Q&A response generation error: {e}")

            return f"I apologize, but I'm unable to process your question at the moment: {str(e)}"

    

    async def generate_custom_report(self, user_requirements: Dict, 

                                   analysis_data: Dict) -> Dict:

        """Generate custom report based on user specifications"""

        

        custom_prompt = f"""

        Generate custom investment report based on user requirements:

        

        User Requirements:

        {json.dumps(user_requirements, indent=2, default=str)}

        

        Available Data:

        {json.dumps(analysis_data, indent=2, default=str)[:3000]}

        

        Create report that meets the specified requirements while maintaining professional standards.

        Include relevant analysis, data, and insights based on user preferences.

        """

        

        try:

            custom_report = await self.get_llm_report_content(custom_prompt, 

                                                            {'tone': user_requirements.get('tone', 'professional'),

                                                             'max_length': user_requirements.get('length', 1500)})

            

            return {

                'report_type': 'custom',

                'user_requirements': user_requirements,

                'report_content': custom_report,

                'data_sources': list(analysis_data.keys()),

                'generated_at': datetime.now()

            }

            

        except Exception as e:

            print(f"Custom report generation error: {e}")

            return {'error': str(e)}

    

    async def generate_alert_notifications(self, alerts: List[Dict]) -> List[Dict]:

        """Generate formatted alert notifications"""

        

        formatted_alerts = []

        

        for alert in alerts:

            alert_prompt = f"""

            Format the following market alert for notification:

            

            Alert Type: {alert.get('type', 'Unknown')}

            Symbol: {alert.get('symbol', 'N/A')}

            Trigger: {alert.get('trigger', 'N/A')}

            Current Value: {alert.get('current_value', 'N/A')}

            Threshold: {alert.get('threshold', 'N/A')}

            

            Additional Context:

            {alert.get('context', 'No additional context')}

            

            Create concise, actionable alert message (max 200 characters for mobile notification).

            Include urgency level and recommended action if applicable.

            """

            

            try:

                formatted_alert = await self.get_llm_report_content(alert_prompt, 

                                                                  {'tone': 'urgent', 'max_length': 40})  # max_length is in words (about 200 characters)

                

                formatted_alerts.append({

                    'original_alert': alert,

                    'formatted_message': formatted_alert,

                    'urgency': alert.get('urgency', 'medium'),

                    'timestamp': datetime.now()

                })

                

            except Exception as e:

                print(f"Alert formatting error: {e}")

                formatted_alerts.append({

                    'original_alert': alert,

                    'formatted_message': f"Alert: {alert.get('symbol', 'N/A')} - {alert.get('trigger', 'N/A')}",

                    'urgency': alert.get('urgency', 'medium'),

                    'timestamp': datetime.now()

                })

        

        return formatted_alerts

    

    async def get_llm_report_content(self, prompt: str, template: Dict) -> str:

        """Get report content from LLM with appropriate formatting"""

        

        system_prompt = f"""You are a professional financial report writer with expertise in investment research and analysis.

        

        Writing Guidelines:

        - Tone: {template.get('tone', 'professional')}

        - Maximum length: {template.get('max_length', 1000)} words

        - Use clear, concise language appropriate for financial professionals

        - Include specific data points and metrics when available

        - Maintain objectivity while providing actionable insights

        - Structure content with clear headings and bullet points when appropriate

        

        Focus on providing valuable, actionable information for investment decision-making."""

        

        # The OpenAI client call is synchronous, so run it in a worker thread

        # to avoid blocking the event loop while waiting for the response.

        response = await asyncio.to_thread(

            self.llm_client.chat.completions.create,

            model="gpt-4",

            messages=[

                {"role": "system", "content": system_prompt},

                {"role": "user", "content": prompt}

            ],

            max_tokens=min(template.get('max_length', 1000) * 2, 2000),

            temperature=0.3

        )

        return response.choices[0].message.content

    

    def prepare_stock_report_context(self, symbol: str, 

                                   recommendation: LLMEnhancedInvestmentRecommendation,

                                   analysis_data: Dict) -> Dict:

        """Prepare context for stock report generation"""

        

        return {

            'symbol': symbol,

            'recommendation_summary': {

                'action': recommendation.action.value,

                'confidence': recommendation.confidence,

                'target_price': recommendation.target_price,

                'time_horizon': recommendation.time_horizon

            },

            'market_data': analysis_data.get('market_data', {}),

            'technical_data': analysis_data.get('technical_analysis', {}),

            'fundamental_data': analysis_data.get('fundamental_analysis', {}),

            'news_sentiment': analysis_data.get('news_analysis', {}),

            'risk_data': {

                'risk_score': recommendation.risk_score,

                'risk_factors': recommendation.risk_factors

            },

            'recommendation_data': {

                'reasoning': recommendation.reasoning,

                'catalysts': recommendation.key_catalysts,

                'monitoring_points': recommendation.monitoring_points

            }

        }
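The listing above calls several helpers that it does not show, among them prepare_portfolio_context, prepare_market_briefing_context, generate_executive_summary, and generate_visual_elements_description. As a rough idea of what one of them might look like, here is a minimal standalone sketch of prepare_portfolio_context. The "weights" key in portfolio_data and the flat layout of performance_data are assumptions made for illustration; only the symbol, action, and confidence attributes of a recommendation come from the article's code.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


def prepare_portfolio_context(portfolio_data: Dict[str, Any],
                              performance_data: Dict[str, Any],
                              recommendations: List[Any]) -> str:
    """Flatten portfolio inputs into plain text suitable for an LLM prompt.

    Hypothetical sketch: assumes portfolio_data["weights"] maps symbols to
    fractional weights and performance_data maps metric names to values.
    """
    lines = ["Holdings:"]
    for symbol, weight in sorted(portfolio_data.get("weights", {}).items()):
        lines.append(f"- {symbol}: {weight:.1%}")

    lines.append("Performance:")
    for metric, value in performance_data.items():
        lines.append(f"- {metric}: {value}")

    lines.append("Current recommendations:")
    for rec in recommendations:
        # Accept either an enum member (with .value) or a plain string.
        action = getattr(rec.action, "value", rec.action)
        lines.append(f"- {rec.symbol}: {action} (confidence {rec.confidence:.0%})")

    return "\n".join(lines)


if __name__ == "__main__":
    # Stand-in for a recommendation object, used only for this demo.
    demo_rec = SimpleNamespace(symbol="AAPL", action="buy", confidence=0.8)
    print(prepare_portfolio_context(
        {"weights": {"AAPL": 0.25, "MSFT": 0.15}},
        {"ytd_return": 0.07},
        [demo_rec],
    ))
```

Inside the class this would be a method taking self as its first parameter; it is written as a free function here so it can be run on its own.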



The report generation system must be capable of adapting to different user preferences and requirements. Some users may prefer concise executive summaries, while others need detailed technical analysis. The system maintains user profiles that specify preferred report formats, level of detail, and areas of focus. LLMs enable this customization by understanding user preferences and generating content that matches their specific needs.
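A simple way to apply a stored user profile is to overlay it on one of the report templates before generating content. The sketch below assumes a profile is a plain dictionary with optional tone, focus, max_length, and exclude_sections keys. The exclude_sections key and the function name are hypothetical; the template keys mirror those in initialize_report_templates.

```python
from typing import Any, Dict


def apply_user_preferences(template: Dict[str, Any],
                           preferences: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `template` adjusted by a user's stored preferences.

    The original template is left untouched so it can be reused for other users.
    """
    merged = dict(template)

    # Scalar settings simply override the template defaults when present.
    for key in ("tone", "focus", "max_length"):
        if key in preferences:
            merged[key] = preferences[key]

    # Drop sections the user does not want, preserving the template order.
    excluded = set(preferences.get("exclude_sections", []))
    merged["sections"] = [s for s in template["sections"] if s not in excluded]
    return merged


if __name__ == "__main__":
    detailed = {
        "sections": ["market_analysis", "technical_analysis", "risk_assessment"],
        "max_length": 2000,
        "tone": "analytical",
        "focus": "comprehensive",
    }
    profile = {"tone": "executive", "exclude_sections": ["technical_analysis"]}
    print(apply_user_preferences(detailed, profile))
```

The merged dictionary could then be passed wherever the listing passes a template, for example to generate_section_content.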


Interactive capabilities represent an important aspect of the user interface. Users should be able to ask questions about the analysis, request clarification on recommendations, or explore alternative scenarios. The system provides a conversational interface that can answer questions about the analysis methodology, explain the reasoning behind recommendations, and provide additional insights based on user queries.


Real-time alerts and notifications keep users informed of important market developments and changes in recommendations. The system monitors market conditions continuously and generates alerts when significant events occur or when investment theses change. These alerts are customized based on user portfolios and preferences, ensuring that users receive relevant and timely information.
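The portfolio-based filtering described above could be sketched as follows. The function builds alert dictionaries with the keys that generate_alert_notifications reads (type, symbol, trigger, current_value, threshold, urgency). The 5% default threshold, the "price_move" type, and the rule that a move of twice the threshold counts as high urgency are illustrative assumptions.

```python
from typing import Dict, List, Set


def build_price_alerts(price_changes: Dict[str, float],
                       holdings: Set[str],
                       threshold: float = 0.05) -> List[Dict]:
    """Create alerts for held symbols whose move meets or exceeds `threshold`.

    price_changes maps symbol -> fractional change (0.03 means +3%).
    """
    alerts = []
    for symbol, change in sorted(price_changes.items()):
        # Skip symbols the user does not hold and moves below the threshold.
        if symbol not in holdings or abs(change) < threshold:
            continue
        alerts.append({
            "type": "price_move",
            "symbol": symbol,
            "trigger": f"Daily move of {change:+.1%}",
            "current_value": change,
            "threshold": threshold,
            "urgency": "high" if abs(change) >= 2 * threshold else "medium",
        })
    return alerts


if __name__ == "__main__":
    changes = {"AAPL": -0.12, "MSFT": 0.02, "TSLA": 0.08}
    for alert in build_price_alerts(changes, holdings={"AAPL", "MSFT"}):
        print(alert)
```

The resulting list can be handed to generate_alert_notifications, which falls back to a plain "Alert: symbol - trigger" message if the LLM call fails.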


Complete Running Example


The following example shows how the system components fit together in a single agentic AI pipeline for stock market analysis. Market data is simulated in collect_market_data; a real deployment would connect to live data feeds instead:



import asyncio

import numpy as np

import pandas as pd

import openai

from datetime import datetime, timedelta

from typing import Dict, List, Optional

import json


class ComprehensiveAgenticAI:

    def __init__(self, api_keys: Dict[str, str], symbols: List[str], risk_tolerance: float = 0.5):

        self.api_keys = api_keys

        self.symbols = symbols

        self.risk_tolerance = risk_tolerance

        

        # Initialize all system components

        self.data_collector = LLMEnhancedMarketDataCollector(

            api_keys['market_data'], symbols, api_keys['openai']

        )

        self.data_processor = LLMEnhancedDataProcessor(api_keys['openai'])

        self.pattern_recognizer = LLMEnhancedPatternRecognizer(api_keys['openai'])

        self.news_analyzer = AdvancedLLMNewsAnalyzer(api_keys)

        self.economic_monitor = LLMEnhancedEconomicEventMonitor(api_keys)

        self.prediction_ensemble = LLMEnhancedPredictionEnsemble('AAPL', api_keys['openai'])

        self.decision_engine = LLMEnhancedDecisionEngine(risk_tolerance, api_keys['openai'])

        self.report_generator = LLMEnhancedReportGenerator(api_keys['openai'])

        

        # System state

        self.current_analysis = {}

        self.recommendations = {}

        self.system_status = 'initialized'

        

    async def run_comprehensive_analysis(self, symbol: str) -> Dict:

        """Run complete analysis pipeline for a symbol"""

        

        print(f"Starting comprehensive analysis for {symbol}...")

        

        try:

            # Step 1: Data Collection

            print("Collecting market data...")

            await self.data_collector.connect_to_feed()

            market_data = await self.collect_market_data(symbol)

            

            # Step 2: News and Sentiment Analysis

            print("Analyzing news and sentiment...")

            news_analysis = await self.news_analyzer.comprehensive_news_analysis(symbol, 7)

            

            # Step 3: Economic Event Analysis

            print("Monitoring economic events...")

            economic_analysis = await self.economic_monitor.comprehensive_economic_analysis([symbol], 30)

            

            # Step 4: Technical Pattern Recognition

            print("Recognizing technical patterns...")

            if 'price_history' in market_data and len(market_data['price_history']) > 50:

                prices = [p['price'] for p in market_data['price_history']]

                pattern_analysis = await self.pattern_recognizer.identify_chart_patterns_with_llm(

                    symbol, prices

                )

            else:

                pattern_analysis = {}

            

            # Step 5: Machine Learning Predictions

            print("Generating ML predictions...")

            if len(market_data.get('price_history', [])) > 100:

                prediction_data = {

                    'market_data': market_data,

                    'sentiment': news_analysis,

                    'economic_events': economic_analysis.get('upcoming_events', [])

                }

                ml_predictions = await self.prediction_ensemble.predict_with_llm_ensemble(

                    prediction_data, 5

                )

            else:

                ml_predictions = {'ensemble_prediction': 0.0, 'confidence': 0.0}

            

            # Step 6: Comprehensive Analysis Synthesis

            analysis_data = {

                'symbol': symbol,

                'market_data': market_data,

                'news_analysis': news_analysis,

                'economic_analysis': economic_analysis,

                'pattern_analysis': pattern_analysis,

                'ml_predictions': ml_predictions,

                'timestamp': datetime.now()

            }

            

            # Step 7: Investment Decision

            print("Making investment decision...")

            recommendation = await self.decision_engine.make_comprehensive_investment_decision(

                symbol, analysis_data

            )

            

            # Step 8: Report Generation

            print("Generating comprehensive report...")

            report = await self.report_generator.generate_comprehensive_stock_report(

                symbol, recommendation, analysis_data, 'detailed_analysis'

            )

            

            # Store results

            self.current_analysis[symbol] = analysis_data

            self.recommendations[symbol] = recommendation

            

            print(f"Analysis complete for {symbol}")

            

            return {

                'symbol': symbol,

                'analysis': analysis_data,

                'recommendation': recommendation,

                'report': report,

                'status': 'success'

            }

            

        except Exception as e:

            print(f"Analysis error for {symbol}: {e}")

            return {

                'symbol': symbol,

                'status': 'error',

                'error': str(e)

            }

    

    async def collect_market_data(self, symbol: str) -> Dict:

        """Collect comprehensive market data for symbol"""

        

        # Simulate market data collection

        # In real implementation, this would connect to actual data feeds

        

        current_time = datetime.now()

        price_history = []

        

        # Generate sample price history

        base_price = 150.0

        for i in range(200):

            timestamp = current_time - timedelta(minutes=i)

            price = base_price + np.random.normal(0, 2) + np.sin(i/10) * 5

            volume = np.random.randint(1000000, 5000000)

            

            price_history.append({

                'price': max(price, 1.0),  # Ensure positive price

                'volume': volume,

                'timestamp': timestamp.timestamp() * 1000

            })

        

        price_history.reverse()  # Chronological order

        

        # Calculate technical indicators

        prices = [p['price'] for p in price_history]

        current_price = prices[-1]

        

        technical_indicators = {

            'sma_20': np.mean(prices[-20:]),

            'sma_50': np.mean(prices[-50:]) if len(prices) >= 50 else np.mean(prices),

            'rsi': self.calculate_rsi(prices),

            'macd': self.calculate_macd(prices),

            'volume_avg': np.mean([p['volume'] for p in price_history[-20:]])

        }

        

        return {

            'symbol': symbol,

            'current_price': current_price,

            'price_history': price_history,

            'technical_indicators': technical_indicators,

            'last_updated': current_time

        }

    

    def calculate_rsi(self, prices: List[float], period: int = 14) -> float:

        """Calculate RSI indicator"""

        if len(prices) < period + 1:

            return 50.0

        

        deltas = [prices[i] - prices[i-1] for i in range(1, len(prices))]

        gains = [delta if delta > 0 else 0 for delta in deltas]

        losses = [-delta if delta < 0 else 0 for delta in deltas]

        

        avg_gain = np.mean(gains[-period:])

        avg_loss = np.mean(losses[-period:])

        

        if avg_loss == 0:

            return 100.0

        

        rs = avg_gain / avg_loss

        rsi = 100 - (100 / (1 + rs))

        

        return rsi

    

    def calculate_macd(self, prices: List[float]) -> float:

        """Calculate MACD indicator"""

        if len(prices) < 26:

            return 0.0

        

        ema_12 = self.calculate_ema(prices, 12)

        ema_26 = self.calculate_ema(prices, 26)

        

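        # Compare against None explicitly: an EMA of exactly 0.0 is falsy but still valid

        return ema_12 - ema_26 if ema_12 is not None and ema_26 is not None else 0.0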

    

    def calculate_ema(self, prices: List[float], period: int) -> Optional[float]:

        """Calculate Exponential Moving Average"""

        if len(prices) < period:

            return None

        

        multiplier = 2 / (period + 1)

        ema = prices[0]

        

        for price in prices[1:]:

            ema = (price * multiplier) + (ema * (1 - multiplier))

        

        return ema

    

    async def run_portfolio_analysis(self, portfolio_symbols: List[str]) -> Dict:

        """Run analysis for entire portfolio"""

        

        print(f"Running portfolio analysis for {len(portfolio_symbols)} symbols...")

        

        portfolio_results = {}

        

        # Analyze each symbol

        for symbol in portfolio_symbols:

            result = await self.run_comprehensive_analysis(symbol)

            portfolio_results[symbol] = result

        

        # Generate portfolio-level insights

        portfolio_summary = await self.generate_portfolio_summary(portfolio_results)

        

        # Generate portfolio report

        portfolio_report = await self.report_generator.generate_portfolio_report(

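            # Pass only the recommendations produced by this run

            portfolio_results, {},

            [r['recommendation'] for r in portfolio_results.values() if r['status'] == 'success']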

        )

        

        return {

            'portfolio_results': portfolio_results,

            'portfolio_summary': portfolio_summary,

            'portfolio_report': portfolio_report,

            'analysis_timestamp': datetime.now()

        }

    

    async def generate_portfolio_summary(self, portfolio_results: Dict) -> Dict:

        """Generate high-level portfolio summary"""

        

        total_symbols = len(portfolio_results)

        successful_analyses = sum(1 for r in portfolio_results.values() if r['status'] == 'success')

        

        # Aggregate recommendations

        recommendations_summary = {}

        for symbol, result in portfolio_results.items():

            if result['status'] == 'success':

                action = result['recommendation'].action.value

                recommendations_summary[action] = recommendations_summary.get(action, 0) + 1

        

        # Calculate average confidence

        confidences = [r['recommendation'].confidence for r in portfolio_results.values() 

                      if r['status'] == 'success']

        avg_confidence = np.mean(confidences) if confidences else 0.0

        

        return {

            'total_symbols_analyzed': total_symbols,

            'successful_analyses': successful_analyses,

            'recommendations_breakdown': recommendations_summary,

            'average_confidence': avg_confidence,

            'analysis_completion_rate': successful_analyses / total_symbols if total_symbols > 0 else 0.0

        }

    

    async def interactive_query(self, user_question: str, symbol: Optional[str] = None) -> str:

        """Handle interactive user queries"""

        

        if symbol and symbol in self.current_analysis:

            context_data = {

                'analysis': self.current_analysis[symbol],

                'recommendation': self.recommendations.get(symbol)

            }

        else:

            context_data = {

                'portfolio_analysis': self.current_analysis,

                'recommendations': self.recommendations

            }

        

        response = await self.report_generator.generate_interactive_qa_response(

            user_question, context_data

        )

        

        return response

    

    def get_system_status(self) -> Dict:

        """Get current system status and statistics"""

        

        return {

            'system_status': self.system_status,

            'symbols_monitored': len(self.symbols),

            'active_analyses': len(self.current_analysis),

            'recommendations_generated': len(self.recommendations),

            'last_update': max([a.get('timestamp', datetime.min) 

                              for a in self.current_analysis.values()], 

                              default=datetime.min),

            'risk_tolerance': self.risk_tolerance

        }


async def main():

    """Main function demonstrating the complete system"""

    

    # Configuration

    api_keys = {

        'openai': 'your-openai-api-key',

        'market_data': 'your-market-data-api-key',

        'news': 'your-news-api-key'

    }

    

    symbols = ['AAPL', 'MSFT', 'GOOGL', 'TSLA', 'NVDA']

    risk_tolerance = 0.6  # Moderately aggressive

    

    # Initialize the agentic AI system

    ai_system = ComprehensiveAgenticAI(api_keys, symbols, risk_tolerance)

    

    print("Agentic AI Stock Analysis System Initialized")

    print("=" * 50)

    

    # Run analysis for a single stock

    print("\n1. Single Stock Analysis")

    print("-" * 30)

    apple_analysis = await ai_system.run_comprehensive_analysis('AAPL')

    

    if apple_analysis['status'] == 'success':

        recommendation = apple_analysis['recommendation']

        print(f"Recommendation for AAPL: {recommendation.action.value}")

        print(f"Confidence: {recommendation.confidence:.1%}")

        print(f"Target Price: ${recommendation.target_price}")

        print(f"Key Reasoning: {'; '.join(recommendation.reasoning[:3])}")

    

    # Run portfolio analysis

    print("\n2. Portfolio Analysis")

    print("-" * 30)

    portfolio_analysis = await ai_system.run_portfolio_analysis(['AAPL', 'MSFT', 'GOOGL'])

    

    portfolio_summary = portfolio_analysis['portfolio_summary']

    print("Portfolio Analysis Complete:")

    print(f"- Symbols Analyzed: {portfolio_summary['total_symbols_analyzed']}")

    print(f"- Success Rate: {portfolio_summary['analysis_completion_rate']:.1%}")

    print(f"- Average Confidence: {portfolio_summary['average_confidence']:.1%}")

    print(f"- Recommendations: {portfolio_summary['recommendations_breakdown']}")

    

    # Interactive query example

    print("\n3. Interactive Query")

    print("-" * 30)

    user_question = "What are the main risks for AAPL investment right now?"

    response = await ai_system.interactive_query(user_question, 'AAPL')

    print(f"Q: {user_question}")

    print(f"A: {response[:300]}...")

    

    # System status

    print("\n4. System Status")

    print("-" * 30)

    status = ai_system.get_system_status()

    print(f"System Status: {status['system_status']}")

    print(f"Symbols Monitored: {status['symbols_monitored']}")

    print(f"Active Analyses: {status['active_analyses']}")

    print(f"Last Update: {status['last_update']}")

    

    print("\nAgentic AI Analysis Complete!")


if __name__ == "__main__":

    asyncio.run(main())
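
The loop in run_portfolio_analysis awaits each symbol in turn. Because run_comprehensive_analysis is a coroutine that returns an error dictionary instead of raising, the per-symbol calls could in principle run concurrently. The following is a minimal sketch under stated assumptions: analyze_concurrently, fake_analysis, and max_concurrency are hypothetical names that do not appear in the listing, and the stand-in coroutine only simulates latency. Whether concurrency is appropriate depends on components not shown here, such as API rate limits.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def analyze_concurrently(
    symbols: List[str],
    analyze: Callable[[str], Awaitable[Dict]],
    max_concurrency: int = 3,
) -> Dict[str, Dict]:
    """Run `analyze` for every symbol, at most `max_concurrency` at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(symbol: str) -> Dict:
        async with semaphore:
            try:
                return await analyze(symbol)
            except Exception as exc:
                # Same error shape as the listing's except branch
                return {'symbol': symbol, 'status': 'error', 'error': str(exc)}

    results = await asyncio.gather(*(guarded(s) for s in symbols))
    # gather preserves input order, so zip pairs each symbol with its result
    return dict(zip(symbols, results))


async def fake_analysis(symbol: str) -> Dict:
    """Hypothetical stand-in for run_comprehensive_analysis."""
    await asyncio.sleep(0.01)  # simulate I/O latency
    if symbol == 'BAD':
        raise ValueError('no data')
    return {'symbol': symbol, 'status': 'success'}


if __name__ == "__main__":
    outcome = asyncio.run(
        analyze_concurrently(['AAPL', 'MSFT', 'BAD'], fake_analysis)
    )
    for symbol, result in outcome.items():
        print(symbol, result['status'])
```

In the listing, the equivalent change would be to pass self.run_comprehensive_analysis as the analyze argument. The semaphore is there to keep the number of simultaneous requests bounded.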



This complete running example demonstrates how the system components work together to produce stock analysis and investment recommendations. Market data is simulated in collect_market_data, so a real deployment would replace that method with connections to live data feeds. The system combines data collection, technical indicators, machine learning predictions, and LLM-enhanced reasoning to produce a recommendation, a confidence score, and a written report for each symbol.
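
The indicator helpers in the listing can be checked in isolation on inputs whose results are known in advance. The sketch below assumes standalone functions (rsi_simple, ema, and macd_line are hypothetical names) that mirror the logic of calculate_rsi, calculate_ema, and calculate_macd. Two properties of that logic are worth knowing: the RSI uses a plain mean over the last `period` price changes rather than Wilder's smoothed averages, and the EMA is seeded with the first price, so values may differ from those shown by charting tools. The MACD value is the MACD line only, without a signal line or histogram.

```python
from typing import List, Optional

import numpy as np


def rsi_simple(prices: List[float], period: int = 14) -> float:
    """RSI from simple means of the last `period` gains and losses."""
    if len(prices) < period + 1:
        return 50.0  # neutral fallback, as in the listing
    deltas = np.diff(prices)
    gains = np.where(deltas > 0, deltas, 0.0)
    losses = np.where(deltas < 0, -deltas, 0.0)
    avg_gain = float(np.mean(gains[-period:]))
    avg_loss = float(np.mean(losses[-period:]))
    if avg_loss == 0:
        return 100.0
    rs = avg_gain / avg_loss
    return 100.0 - 100.0 / (1.0 + rs)


def ema(prices: List[float], period: int) -> Optional[float]:
    """Exponential moving average seeded with the first price."""
    if len(prices) < period:
        return None
    multiplier = 2 / (period + 1)
    value = prices[0]
    for price in prices[1:]:
        value = price * multiplier + value * (1 - multiplier)
    return value


def macd_line(prices: List[float]) -> float:
    """MACD line: 12-period EMA minus 26-period EMA."""
    fast = ema(prices, 12)
    slow = ema(prices, 26)
    if fast is None or slow is None:
        return 0.0
    return fast - slow


if __name__ == "__main__":
    rising = [float(p) for p in range(1, 41)]
    falling = rising[::-1]
    alternating = [10.0 + (i % 2) for i in range(15)]  # 10, 11, 10, 11, ...

    print(rsi_simple(rising))       # only gains in the window
    print(rsi_simple(falling))      # only losses in the window
    print(rsi_simple(alternating))  # equal gains and losses
    print(macd_line(rising) > 0)    # fast EMA tracks a rising series more closely
```

A series with only gains should give an RSI of 100, a series with only losses an RSI of 0, and a series with equal gains and losses an RSI of 50. A steadily rising series should give a positive MACD line.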


The agentic AI system differs from traditional rule-based algorithmic trading systems in that LLMs are integrated throughout its architecture. This lets the system interpret context, reason about relationships between signals, and explain its decisions in plain language. Combining quantitative analysis with qualitative reasoning makes it a useful tool for investment professionals and individual investors alike.


The system's modular architecture allows individual components to be updated and improved independently while maintaining overall system integrity. The LLM integration lets the system adapt to new market conditions and evolving investment strategies, and as LLM technology advances, the system can incorporate new capabilities and improve its analytical sophistication.
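
One way to keep components independently replaceable is to have the analysis code depend on a small interface rather than on a concrete data source. The following is a minimal sketch, not the design used in the listing: MarketDataProvider, SimulatedProvider, StaticProvider, and TrendAnalyzer are hypothetical names introduced for illustration only.

```python
from typing import Dict, List, Protocol


class MarketDataProvider(Protocol):
    """Hypothetical interface: any object with this method can supply prices."""

    def get_prices(self, symbol: str) -> List[float]:
        ...


class SimulatedProvider:
    """Deterministic stand-in, similar in spirit to the simulated feed."""

    def get_prices(self, symbol: str) -> List[float]:
        return [150.0 + (i % 5) for i in range(50)]


class StaticProvider:
    """Serves fixed price lists, e.g. for tests or replaying stored data."""

    def __init__(self, data: Dict[str, List[float]]) -> None:
        self._data = data

    def get_prices(self, symbol: str) -> List[float]:
        return self._data[symbol]


class TrendAnalyzer:
    """Depends only on the interface, so providers can be swapped freely."""

    def __init__(self, provider: MarketDataProvider, window: int = 20) -> None:
        self.provider = provider
        self.window = window

    def is_above_average(self, symbol: str) -> bool:
        """True if the latest price exceeds the mean of the last `window` prices."""
        prices = self.provider.get_prices(symbol)
        recent = prices[-self.window:]
        return prices[-1] > sum(recent) / len(recent)


if __name__ == "__main__":
    simulated = TrendAnalyzer(SimulatedProvider())
    static = TrendAnalyzer(StaticProvider({'XYZ': [5.0, 4.0, 3.0, 2.0, 1.0]}), window=3)
    print(simulated.is_above_average('AAPL'))
    print(static.is_above_average('XYZ'))
```

Because TrendAnalyzer never names a concrete provider, replacing the simulated feed with a live one would only require a new class with the same get_prices method.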
