Thursday, October 09, 2025

Building a Self-Learning LLM-Based Email Spam Detector: An Agentic AI Approach



Introduction and Core Concepts


Email spam detection has evolved significantly from simple rule-based filters to sophisticated machine learning systems. Traditional approaches rely on static patterns and predefined rules that quickly become obsolete as spammers adapt their techniques. A self-learning LLM-based spam detector represents a paradigm shift toward adaptive intelligence that can evolve with emerging threats.


An agentic AI system differs from conventional machine learning models in its ability to autonomously make decisions, learn from interactions, and adapt its behavior based on feedback. In the context of spam detection, this means the system can identify new spam patterns, update its understanding of legitimate communications, and refine its detection algorithms without explicit reprogramming.


The foundation of our approach rests on using a local Large Language Model that processes email content contextually rather than relying solely on keyword matching or statistical features. This contextual understanding allows the system to detect sophisticated social engineering attempts, phishing campaigns that use legitimate-looking language, and emerging fraud patterns that traditional filters might miss.


System Architecture Overview


Our spam detection system consists of several interconnected components that work together to create an intelligent, self-improving filter. The core architecture includes a local LLM inference engine, a feedback collection mechanism, a learning orchestrator, and an email processing pipeline.


The local LLM serves as the primary decision-making component, analyzing email content for spam indicators while maintaining privacy by keeping all data processing on-premises. The feedback collection mechanism captures user corrections and system performance metrics, feeding this information back into the learning loop. The learning orchestrator manages the continuous improvement process, deciding when and how to update the model based on accumulated feedback.


The email processing pipeline handles the integration with existing email systems, preprocessing incoming messages, and applying the spam detection logic in real-time. This pipeline also manages the quarantine and delivery decisions based on the LLM's confidence scores and learned patterns.
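
To make the quarantine and delivery decision concrete, here is a minimal sketch of how a detection verdict and its confidence score could be mapped to an action. The `Action` enum, the `RoutingPolicy` thresholds, and the `route_email` function are hypothetical names introduced for illustration; the threshold values are assumptions to tune against your own tolerance for false positives.

```python
from dataclasses import dataclass
from enum import Enum


class Action(Enum):
    DELIVER = "deliver"
    FLAG = "flag"              # deliver, but mark the message for user review
    QUARANTINE = "quarantine"


@dataclass(frozen=True)
class RoutingPolicy:
    # Hypothetical thresholds, not values prescribed by this article
    quarantine_threshold: float = 0.7
    flag_threshold: float = 0.4


def route_email(is_spam: bool, confidence: float,
                policy: RoutingPolicy = RoutingPolicy()) -> Action:
    """Map a detection verdict and confidence score to a delivery action."""
    if not is_spam:
        return Action.DELIVER
    if confidence >= policy.quarantine_threshold:
        return Action.QUARANTINE
    if confidence >= policy.flag_threshold:
        return Action.FLAG
    # Low-confidence spam verdicts are delivered rather than risk losing mail
    return Action.DELIVER
```

Keeping a middle "flag" tier is one possible design choice: it gives users a chance to correct borderline verdicts, which is exactly the feedback the learning loop needs.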


Setting Up the Local LLM Environment


Before implementing the spam detection logic, we need to establish a robust local LLM environment. We'll use Ollama as our local LLM runtime because it runs models entirely on local hardware, ships an official Python client, and supports a range of open models that can be prompted to perform text classification.


The following code example demonstrates the initial setup and configuration of our local LLM environment. This setup includes model loading, configuration management, and basic inference testing to ensure the system is ready for spam detection tasks.


import ollama

import json

import logging

from typing import Dict, List, Optional

from dataclasses import dataclass

from datetime import datetime


@dataclass

class SpamDetectionConfig:

    model_name: str = "llama2:7b"

    confidence_threshold: float = 0.7

    max_tokens: int = 512

    temperature: float = 0.1

    learning_rate: float = 0.001


class LocalLLMManager:

    def __init__(self, config: SpamDetectionConfig):

        self.config = config

        self.logger = logging.getLogger(__name__)

        self.model_loaded = False

        

    def initialize_model(self):

        """Initialize and verify the local LLM model"""

        try:

            # Pull the model if not already available

            ollama.pull(self.config.model_name)

            

            # Test basic inference capability

            test_response = ollama.generate(

                model=self.config.model_name,

                prompt="Test prompt for model verification",

                options={

                    'num_predict': 50,

                    'temperature': self.config.temperature

                }

            )

            

            if test_response and 'response' in test_response:

                self.model_loaded = True

                self.logger.info(f"Model {self.config.model_name} loaded successfully")

                return True

            else:

                raise Exception("Model test failed")

                

        except Exception as e:

            self.logger.error(f"Failed to initialize model: {str(e)}")

            return False

    

    def generate_response(self, prompt: str) -> Optional[str]:

        """Generate response from the local LLM"""

        if not self.model_loaded:

            self.logger.error("Model not loaded")

            return None

            

        try:

            response = ollama.generate(

                model=self.config.model_name,

                prompt=prompt,

                options={

                    'num_predict': self.config.max_tokens,

                    'temperature': self.config.temperature

                }

            )

            return response.get('response', '')

        except Exception as e:

            self.logger.error(f"Generation failed: {str(e)}")

            return None


This code establishes the foundation for our LLM-based system by creating a manager class that handles model initialization and basic inference operations. The SpamDetectionConfig dataclass centralizes all configuration parameters, making it easy to adjust the system's behavior as we learn from its performance. The LocalLLMManager class encapsulates the interaction with Ollama, providing error handling and logging capabilities that will be crucial for monitoring the system's operation in production.
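
Because every tunable value lives in one dataclass, it is cheap to check a configuration before the model is loaded. The following is a minimal sketch of that idea; `validate_config` is a hypothetical helper that is not part of the code above, and the dataclass is repeated only so the snippet runs on its own.

```python
from dataclasses import dataclass, replace
from typing import List


@dataclass
class SpamDetectionConfig:
    # Mirrors the fields defined above so this sketch is self-contained
    model_name: str = "llama2:7b"
    confidence_threshold: float = 0.7
    max_tokens: int = 512
    temperature: float = 0.1
    learning_rate: float = 0.001


def validate_config(config: SpamDetectionConfig) -> List[str]:
    """Return human-readable problems; an empty list means the config looks sane."""
    problems = []
    if not config.model_name.strip():
        problems.append("model_name must not be empty")
    if not 0.0 <= config.confidence_threshold <= 1.0:
        problems.append("confidence_threshold must be within [0, 1]")
    if config.max_tokens <= 0:
        problems.append("max_tokens must be positive")
    if config.temperature < 0.0:
        problems.append("temperature must be non-negative")
    return problems


# Derive a stricter variant without mutating the defaults
strict = replace(SpamDetectionConfig(), confidence_threshold=0.85)
```

Calling `validate_config` before `initialize_model` would surface a mistyped threshold immediately instead of after the first misclassified email.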


Core Spam Detection Implementation


The heart of our spam detection system lies in crafting effective prompts that guide the LLM to make accurate classification decisions. Unlike traditional machine learning approaches that rely on feature engineering, our LLM-based system uses carefully designed prompts that provide context, examples, and decision criteria.


The following implementation demonstrates how we structure the spam detection logic using prompt engineering techniques. The prompt asks the model to weigh multiple factors, including sender authenticity, content patterns, urgency indicators, and contextual anomalies, using only the subject, the sender address, and the first 2,000 characters of the body.


import json

import logging

import re

from datetime import datetime

from email.message import EmailMessage

from typing import Tuple, Dict, Any


class SpamDetector:

    def __init__(self, llm_manager: LocalLLMManager):

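        self.llm_manager = llm_manager

        # analyze_email logs parse failures, so the detector needs its own logger

        self.logger = logging.getLogger(__name__)

        self.detection_history = []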

        

    def create_detection_prompt(self, email_content: Dict[str, str]) -> str:

        """Create a comprehensive prompt for spam detection"""

        

        base_prompt = """

You are an expert email security analyst tasked with identifying spam and fraudulent emails. 

Analyze the following email carefully and determine if it is spam or legitimate.


Consider these factors in your analysis:

- Sender authenticity and reputation indicators

- Content quality and linguistic patterns

- Urgency tactics and pressure techniques

- Suspicious links or attachments

- Social engineering attempts

- Grammar and spelling inconsistencies

- Unusual formatting or structure


Email Details:

Subject: {subject}

From: {sender}

Content: {content}


Provide your analysis in the following JSON format:

{{

    "is_spam": true/false,

    "confidence": 0.0-1.0,

    "reasoning": "detailed explanation of your decision",

    "risk_factors": ["list", "of", "identified", "risks"],

    "spam_type": "phishing/scam/promotional/malware/legitimate"

}}


Be thorough in your analysis and explain your reasoning clearly.

"""

        

        return base_prompt.format(

            subject=email_content.get('subject', 'No Subject'),

            sender=email_content.get('from', 'Unknown Sender'),

            content=email_content.get('body', 'No Content')[:2000]  # Limit content length

        )

    

    def analyze_email(self, email_data: Dict[str, str]) -> Dict[str, Any]:

        """Analyze an email for spam indicators"""

        

        # Create detection prompt

        prompt = self.create_detection_prompt(email_data)

        

        # Get LLM response

        response = self.llm_manager.generate_response(prompt)

        

        if not response:

            return self._create_fallback_result()

        

        # Parse JSON response

        try:

            result = json.loads(self._extract_json_from_response(response))

            

            # Validate and sanitize result

            validated_result = self._validate_detection_result(result)

            

            # Store detection for learning

            self._record_detection(email_data, validated_result, response)

            

            return validated_result

            

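        except (ValueError, KeyError, TypeError) as e:

            # ValueError covers json.JSONDecodeError, the "No JSON found" case,

            # and a confidence value that cannot be converted to float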

            self.logger.error(f"Failed to parse LLM response: {str(e)}")

            return self._create_fallback_result()

    

    def _extract_json_from_response(self, response: str) -> str:

        """Extract JSON content from LLM response"""

        # Look for JSON block in the response

        json_match = re.search(r'\{.*\}', response, re.DOTALL)

        if json_match:

            return json_match.group(0)

        else:

            raise ValueError("No JSON found in response")

    

    def _validate_detection_result(self, result: Dict[str, Any]) -> Dict[str, Any]:

        """Validate and sanitize detection result"""

        validated = {

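            # bool("false") is True, so normalise string answers from the LLM explicitly

            'is_spam': str(result.get('is_spam', False)).strip().lower() in ('true', '1', 'yes'),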

            'confidence': max(0.0, min(1.0, float(result.get('confidence', 0.5)))),

            'reasoning': str(result.get('reasoning', 'No reasoning provided')),

            'risk_factors': result.get('risk_factors', []),

            'spam_type': result.get('spam_type', 'unknown'),

            'timestamp': datetime.now().isoformat()

        }

        return validated

    

    def _create_fallback_result(self) -> Dict[str, Any]:

        """Create fallback result when LLM analysis fails"""

        return {

            'is_spam': False,

            'confidence': 0.0,

            'reasoning': 'Analysis failed - defaulting to safe classification',

            'risk_factors': ['analysis_failure'],

            'spam_type': 'unknown',

            'timestamp': datetime.now().isoformat()

        }

    

    def _record_detection(self, email_data: Dict[str, str], 

                         result: Dict[str, Any], raw_response: str):

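        """Record detection for learning purposes"""

        # Built-in hash() is salted per process for strings, so derive a stable

        # identifier with SHA-256 instead (imported locally to keep this method

        # self-contained)

        import hashlib

        detection_record = {

            'email_hash': hashlib.sha256(

                email_data.get('body', '').encode('utf-8')

            ).hexdigest(),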

            'result': result,

            'raw_response': raw_response,

            'timestamp': datetime.now().isoformat()

        }

        self.detection_history.append(detection_record)


This implementation demonstrates how we leverage the LLM's natural language understanding capabilities to perform sophisticated spam detection. The create_detection_prompt method constructs a detailed prompt that guides the LLM through a systematic analysis process, considering multiple spam indicators that traditional filters might miss.


The analyze_email method orchestrates the entire detection process, from prompt creation to result validation. It includes robust error handling to ensure the system remains operational even when the LLM produces unexpected responses. The JSON-based response format allows us to extract structured information from the LLM's analysis, including confidence scores and detailed reasoning that can be used for further learning.
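
One fragile spot is pulling the JSON object out of free-form model output. The greedy pattern `\{.*\}` spans from the first opening brace to the last closing brace, so any stray braces in the model's commentary can produce a string that fails to parse. As an alternative sketch, the standard library's `json.JSONDecoder.raw_decode` can parse the first valid object and stop there; `extract_first_json_object` is a hypothetical helper name, not part of the implementation above.

```python
import json
from typing import Any, Dict, Optional


def extract_first_json_object(text: str) -> Optional[Dict[str, Any]]:
    """Return the first decodable JSON object embedded in text, or None."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode parses one JSON value starting at `start` and
            # ignores whatever follows it
            obj, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            obj = None
        if isinstance(obj, dict):
            return obj
        # Not valid JSON at this brace; try the next one
        start = text.find("{", start + 1)
    return None


noisy = 'Sure! {"is_spam": true, "confidence": 0.92} Note: {these} are not JSON.'
result = extract_first_json_object(noisy)
```

Returning `None` instead of raising lets the caller fall back to the same safe default it already uses when the model produces no response at all.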


Implementing the Agentic Learning Mechanism


The agentic aspect of our spam detector lies in its ability to learn from user feedback and adapt its detection strategies over time. This learning mechanism goes beyond simple retraining by implementing a continuous feedback loop that allows the system to evolve its understanding of spam patterns.


The learning system collects feedback from multiple sources including user corrections, false positive reports, and missed spam notifications. This feedback is then processed to identify patterns in the system's mistakes and generate improved prompts or detection strategies.


from collections import defaultdict, deque

import json

import logging

import sqlite3

from datetime import datetime

from typing import Any, Dict, List, Tuple

import numpy as np


class AgenticLearningSystem:

    def __init__(self, db_path: str = "spam_learning.db"):

        self.db_path = db_path

        self.feedback_buffer = deque(maxlen=1000)

        self.learning_patterns = defaultdict(list)

        self.adaptation_threshold = 10  # Minimum feedback items before adaptation

        self.initialize_database()

        

    def initialize_database(self):

        """Initialize SQLite database for persistent learning storage"""

        conn = sqlite3.connect(self.db_path)

        cursor = conn.cursor()

        

        cursor.execute('''

            CREATE TABLE IF NOT EXISTS feedback (

                id INTEGER PRIMARY KEY AUTOINCREMENT,

                email_hash TEXT,

                original_prediction TEXT,

                user_correction TEXT,

                feedback_type TEXT,

                timestamp TEXT,

                email_features TEXT

            )

        ''')

        

        cursor.execute('''

            CREATE TABLE IF NOT EXISTS learning_patterns (

                id INTEGER PRIMARY KEY AUTOINCREMENT,

                pattern_type TEXT,

                pattern_data TEXT,

                effectiveness_score REAL,

                usage_count INTEGER,

                last_updated TEXT

            )

        ''')

        

        conn.commit()

        conn.close()

    

    def collect_feedback(self, email_hash: str, original_result: Dict[str, Any], 

                        user_correction: str, feedback_type: str):

        """Collect user feedback for learning"""

        

        feedback_entry = {

            'email_hash': email_hash,

            'original_prediction': json.dumps(original_result),

            'user_correction': user_correction,

            'feedback_type': feedback_type,

            'timestamp': datetime.now().isoformat()

        }

        

        # Add to buffer for immediate processing

        self.feedback_buffer.append(feedback_entry)

        

        # Store in database for persistence

        self._store_feedback_in_db(feedback_entry)

        

        # Trigger learning if we have enough feedback

        if len(self.feedback_buffer) >= self.adaptation_threshold:

            self.trigger_learning_cycle()

    

    def _store_feedback_in_db(self, feedback: Dict[str, Any]):

        """Store feedback in persistent database"""

        conn = sqlite3.connect(self.db_path)

        cursor = conn.cursor()

        

        cursor.execute('''

            INSERT INTO feedback (email_hash, original_prediction, user_correction, 

                                feedback_type, timestamp, email_features)

            VALUES (?, ?, ?, ?, ?, ?)

        ''', (

            feedback['email_hash'],

            feedback['original_prediction'],

            feedback['user_correction'],

            feedback['feedback_type'],

            feedback['timestamp'],

            json.dumps({})  # Placeholder for email features

        ))

        

        conn.commit()

        conn.close()

    

    def trigger_learning_cycle(self):

        """Trigger a learning cycle to adapt detection strategies"""

        

        # Analyze recent feedback patterns

        false_positive_patterns = self._analyze_false_positives()

        false_negative_patterns = self._analyze_false_negatives()

        

        # Generate improved detection strategies

        new_strategies = self._generate_adaptive_strategies(

            false_positive_patterns, false_negative_patterns

        )

        

        # Update system prompts and detection logic

        self._update_detection_strategies(new_strategies)

        

        # Clear processed feedback from buffer

        self.feedback_buffer.clear()

    

    def _analyze_false_positives(self) -> List[Dict[str, Any]]:

        """Analyze patterns in false positive detections"""

        false_positives = [

            fb for fb in self.feedback_buffer 

            if fb['feedback_type'] == 'false_positive'

        ]

        

        patterns = []

        for fp in false_positives:

            original_pred = json.loads(fp['original_prediction'])

            

            # Extract common characteristics of false positives

            pattern = {

                'spam_type': original_pred.get('spam_type'),

                'confidence_range': self._get_confidence_range(original_pred['confidence']),

                'common_risk_factors': original_pred.get('risk_factors', []),

                'correction_type': fp['user_correction']

            }

            patterns.append(pattern)

        

        return self._cluster_similar_patterns(patterns)

    

    def _analyze_false_negatives(self) -> List[Dict[str, Any]]:

        """Analyze patterns in missed spam (false negatives)"""

        false_negatives = [

            fb for fb in self.feedback_buffer 

            if fb['feedback_type'] == 'false_negative'

        ]

        

        patterns = []

        for fn in false_negatives:

            original_pred = json.loads(fn['original_prediction'])

            

            # Identify what the system missed

            pattern = {

                'missed_indicators': self._extract_missed_indicators(fn),

                'actual_spam_type': fn['user_correction'],

                'original_confidence': original_pred['confidence'],

                'failed_detection_reasoning': original_pred.get('reasoning')

            }

            patterns.append(pattern)

        

        return self._cluster_similar_patterns(patterns)

    

    def _generate_adaptive_strategies(self, fp_patterns: List[Dict], 

                                   fn_patterns: List[Dict]) -> Dict[str, Any]:

        """Generate new detection strategies based on learned patterns"""

        

        strategies = {

            'prompt_adjustments': [],

            'confidence_calibration': {},

            'new_risk_factors': [],

            'refined_classifications': []

        }

        

        # Generate prompt adjustments for false positives

        for pattern in fp_patterns:

            if pattern['frequency'] > 3:  # Only adjust for recurring patterns

                adjustment = {

                    'type': 'reduce_sensitivity',

                    'condition': pattern['common_characteristics'],

                    'adjustment': f"Be more cautious when classifying emails with {pattern['common_characteristics']} as spam"

                }

                strategies['prompt_adjustments'].append(adjustment)

        

        # Generate enhanced detection for false negatives

        for pattern in fn_patterns:

            if pattern['frequency'] > 2:

                enhancement = {

                    'type': 'increase_sensitivity',

                    'indicators': pattern['missed_indicators'],

                    'adjustment': f"Pay special attention to {pattern['missed_indicators']} which may indicate {pattern['actual_spam_type']}"

                }

                strategies['prompt_adjustments'].append(enhancement)

        

        return strategies

    

    def _update_detection_strategies(self, strategies: Dict[str, Any]):

        """Update the spam detector with new strategies"""

        

        # Store new strategies in database

        conn = sqlite3.connect(self.db_path)

        cursor = conn.cursor()

        

        for strategy in strategies['prompt_adjustments']:

            cursor.execute('''

                INSERT INTO learning_patterns (pattern_type, pattern_data, 

                                             effectiveness_score, usage_count, last_updated)

                VALUES (?, ?, ?, ?, ?)

            ''', (

                strategy['type'],

                json.dumps(strategy),

                0.0,  # Initial effectiveness score

                0,    # Initial usage count

                datetime.now().isoformat()

            ))

        

        conn.commit()

        conn.close()

        

        # Log learning activity

        logging.info(f"Applied {len(strategies['prompt_adjustments'])} new detection strategies")


This learning system implements a sophisticated feedback loop that allows the spam detector to evolve its detection capabilities based on real-world performance. The system collects feedback from users when they correct false positives or report missed spam, then analyzes these corrections to identify systematic patterns in the detector's mistakes.


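The learning mechanism is designed to operate on two levels. At the immediate level, the strategy dictionary reserves a confidence_calibration entry for adjusting thresholds and detection sensitivity based on recent feedback, although the code shown here does not yet populate it. At a deeper level, the system stores prompt adjustments that incorporate new understanding about spam patterns and legitimate email characteristics, and these are later merged into the detection prompt.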
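
The listing above calls `_get_confidence_range` and `_cluster_similar_patterns` without defining them, and the strategy generator expects each clustered pattern to carry `frequency` and `common_characteristics` keys. The following is one possible sketch of those two helpers, written as plain functions. The bucket boundaries and the choice to group by a single key are assumptions made for illustration, and `_extract_missed_indicators` is deliberately left out because the stored feedback does not include the email text it would need.

```python
from typing import Any, Dict, List


def get_confidence_range(confidence: float) -> str:
    """Bucket a confidence score into a coarse label (boundaries are assumed)."""
    if confidence < 0.4:
        return "low"
    if confidence < 0.7:
        return "medium"
    return "high"


def cluster_similar_patterns(patterns: List[Dict[str, Any]],
                             key: str) -> List[Dict[str, Any]]:
    """Group patterns sharing the same value for `key` and count each group."""
    groups: Dict[str, Dict[str, Any]] = {}
    for pattern in patterns:
        label = str(pattern.get(key))
        if label not in groups:
            # Copy the first member so downstream code still finds the
            # original keys, then attach the group summary fields
            groups[label] = {**pattern, "frequency": 0,
                             "common_characteristics": label}
        groups[label]["frequency"] += 1
    # Most frequent clusters first
    return sorted(groups.values(), key=lambda g: g["frequency"], reverse=True)
```

With helpers like these, false positives could be clustered on `spam_type` and false negatives on `actual_spam_type`, which gives the frequency checks in `_generate_adaptive_strategies` something to compare against.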


Enhanced Prompt Engineering with Learning Integration


As the system learns from feedback, it needs to dynamically update its detection prompts to incorporate new knowledge. This requires a sophisticated prompt management system that can integrate learned patterns while maintaining the coherence and effectiveness of the original detection logic.


class AdaptivePromptManager:

    def __init__(self, learning_system: AgenticLearningSystem):

        self.learning_system = learning_system

        self.base_prompt_template = self._load_base_template()

        self.active_adaptations = []

        

    def _load_base_template(self) -> str:

        """Load the base prompt template for spam detection"""

        return """

You are an expert email security analyst with extensive experience in identifying spam and fraudulent emails.

Your analysis should be thorough, considering both obvious and subtle indicators of malicious intent.


{adaptive_instructions}


Analyze the following email and determine if it is spam or legitimate:


Email Details:

Subject: {subject}

From: {sender}

Content: {content}


{learned_patterns}


Consider these factors in your analysis:

- Sender authenticity and reputation indicators

- Content quality and linguistic patterns  

- Urgency tactics and pressure techniques

- Suspicious links or attachments

- Social engineering attempts

- Grammar and spelling inconsistencies

- Unusual formatting or structure


{specific_guidance}


Provide your analysis in JSON format:

{{

    "is_spam": true/false,

    "confidence": 0.0-1.0,

    "reasoning": "detailed explanation",

    "risk_factors": ["identified", "risks"],

    "spam_type": "classification"

}}

"""

    

    def generate_adaptive_prompt(self, email_data: Dict[str, str]) -> str:

        """Generate a prompt that incorporates learned adaptations"""

        

        # Load current learning patterns

        learned_patterns = self._get_relevant_patterns(email_data)

        

        # Generate adaptive instructions

        adaptive_instructions = self._create_adaptive_instructions(learned_patterns)

        

        # Create specific guidance based on email characteristics

        specific_guidance = self._generate_specific_guidance(email_data, learned_patterns)

        

        # Format the complete prompt

        return self.base_prompt_template.format(

            adaptive_instructions=adaptive_instructions,

            subject=email_data.get('subject', 'No Subject'),

            sender=email_data.get('from', 'Unknown Sender'),

            content=email_data.get('body', 'No Content')[:2000],

            learned_patterns=self._format_learned_patterns(learned_patterns),

            specific_guidance=specific_guidance

        )

    

    def _get_relevant_patterns(self, email_data: Dict[str, str]) -> List[Dict[str, Any]]:

        """Retrieve learning patterns relevant to the current email"""

        

        conn = sqlite3.connect(self.learning_system.db_path)

        cursor = conn.cursor()

        

        cursor.execute('''

            SELECT pattern_type, pattern_data, effectiveness_score, usage_count

            FROM learning_patterns

            WHERE effectiveness_score > 0.3

            ORDER BY effectiveness_score DESC, usage_count DESC

            LIMIT 10

        ''')

        

        patterns = []

        for row in cursor.fetchall():

            pattern = {

                'type': row[0],

                'data': json.loads(row[1]),

                'effectiveness': row[2],

                'usage_count': row[3]

            }

            patterns.append(pattern)

        

        conn.close()

        return patterns

    

    def _create_adaptive_instructions(self, patterns: List[Dict[str, Any]]) -> str:

        """Create adaptive instructions based on learned patterns"""

        

        if not patterns:

            return "Apply your standard spam detection methodology."

        

        instructions = ["Based on recent learning, pay special attention to:"]

        

        for pattern in patterns[:5]:  # Use top 5 most effective patterns

            if pattern['type'] == 'reduce_sensitivity':

                instructions.append(f"- Avoid over-classifying emails with {pattern['data'].get('condition', 'certain characteristics')} as spam")

            elif pattern['type'] == 'increase_sensitivity':

                instructions.append(f"- Be more vigilant for {pattern['data'].get('indicators', 'specific indicators')} which may indicate spam")

        

        return "\n".join(instructions)

    

    def _generate_specific_guidance(self, email_data: Dict[str, str], 

                                  patterns: List[Dict[str, Any]]) -> str:

        """Generate specific guidance based on email content and learned patterns"""

        

        guidance_items = []

        

        # Check for patterns that match current email characteristics

        subject = email_data.get('subject', '').lower()

        content = email_data.get('body', '').lower()

        sender = email_data.get('from', '').lower()

        

        # Apply learned pattern matching

        for pattern in patterns:

            if self._pattern_matches_email(pattern, email_data):

                if pattern['type'] == 'reduce_sensitivity':

                    guidance_items.append("Note: Similar emails have been incorrectly classified as spam in the past. Exercise caution.")

                elif pattern['type'] == 'increase_sensitivity':

                    guidance_items.append("Alert: This email shows characteristics similar to previously missed spam.")

        

        # Add contextual guidance based on email features

        if 'urgent' in subject or 'immediate' in content:

            guidance_items.append("Urgency language detected - verify legitimacy carefully.")

        

        if len(re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', content)) > 3:

            guidance_items.append("Multiple links detected - assess for suspicious destinations.")

        

        return "\n".join(guidance_items) if guidance_items else "Apply standard analysis procedures."

    

    def _pattern_matches_email(self, pattern: Dict[str, Any], 

                              email_data: Dict[str, str]) -> bool:

        """Check if a learned pattern matches the current email"""

        

        pattern_data = pattern['data']

        

        # Simple pattern matching - can be enhanced with more sophisticated logic

        if 'condition' in pattern_data:

            # 'condition' may be stored as a non-string value, so coerce before matching

            condition = str(pattern_data['condition']).lower()

            email_text = (email_data.get('subject', '') + ' ' + 

                         email_data.get('body', '')).lower()

            return condition in email_text

        

        if 'indicators' in pattern_data:

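            indicators = pattern_data['indicators']

            # A bare string would otherwise be iterated character by character

            if isinstance(indicators, str):

                indicators = [indicators]

            email_text = (email_data.get('subject', '') + ' ' +

                         email_data.get('body', '')).lower()

            return any(str(indicator).lower() in email_text for indicator in indicators)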

        

        return False

    

    def _format_learned_patterns(self, patterns: List[Dict[str, Any]]) -> str:

        """Format learned patterns for inclusion in prompt"""

        

        if not patterns:

            return ""

        

        formatted = ["Recent learning insights:"]

        for pattern in patterns[:3]:  # Include top 3 patterns

            effectiveness = pattern['effectiveness']

            pattern_desc = self._describe_pattern(pattern)

            formatted.append(f"- {pattern_desc} (effectiveness: {effectiveness:.2f})")

        

        return "\n".join(formatted)

    

    def _describe_pattern(self, pattern: Dict[str, Any]) -> str:

        """Create human-readable description of a learned pattern"""

        

        if pattern['type'] == 'reduce_sensitivity':

            return f"Avoid false positives for emails with {pattern['data'].get('condition', 'certain characteristics')}"

        elif pattern['type'] == 'increase_sensitivity':

            return f"Watch for {pattern['data'].get('indicators', 'specific indicators')} indicating potential spam"

        else:

            return f"Pattern type: {pattern['type']}"


This adaptive prompt management system represents the core of the agentic learning capability. It dynamically modifies the detection prompts based on accumulated learning, ensuring that the system becomes more accurate over time. The system maintains a balance between incorporating new knowledge and preserving the fundamental detection logic that works well.
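
One detail deserves attention: `_get_relevant_patterns` only selects rows whose effectiveness_score exceeds 0.3, while `_update_detection_strategies` stores every new pattern with a score of 0.0. Something therefore has to raise the score before a pattern can influence a prompt, and the code shown so far does not include that step. The sketch below is one hypothetical way to do it, using a running average over outcomes; `record_pattern_outcome` and the notion of a "helpful" outcome are assumptions, not part of the implementation above.

```python
import sqlite3
from datetime import datetime


def record_pattern_outcome(conn: sqlite3.Connection, pattern_id: int,
                           was_helpful: bool) -> float:
    """Fold one outcome into a pattern's running-average effectiveness score."""
    row = conn.execute(
        "SELECT effectiveness_score, usage_count FROM learning_patterns WHERE id = ?",
        (pattern_id,),
    ).fetchone()
    if row is None:
        raise KeyError(f"No learning pattern with id {pattern_id}")
    score, count = row
    new_score = (score * count + (1.0 if was_helpful else 0.0)) / (count + 1)
    conn.execute(
        "UPDATE learning_patterns SET effectiveness_score = ?, usage_count = ?, "
        "last_updated = ? WHERE id = ?",
        (new_score, count + 1, datetime.now().isoformat(), pattern_id),
    )
    conn.commit()
    return new_score


# Demo against an in-memory database with the same table layout as above
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE learning_patterns (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        pattern_type TEXT,
        pattern_data TEXT,
        effectiveness_score REAL,
        usage_count INTEGER,
        last_updated TEXT
    )
""")
conn.execute(
    "INSERT INTO learning_patterns (pattern_type, pattern_data, "
    "effectiveness_score, usage_count, last_updated) VALUES (?, ?, ?, ?, ?)",
    ("increase_sensitivity", "{}", 0.0, 0, datetime.now().isoformat()),
)
for outcome in (True, True, False):
    score = record_pattern_outcome(conn, 1, outcome)
```

After two helpful outcomes and one unhelpful one, the pattern's score is two thirds, which clears the 0.3 cutoff. A new pattern still needs some way to be tried at least once, for example by relaxing the cutoff for rows whose usage_count is zero.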


Email Integration and Processing Pipeline


To make our spam detector practical for real-world use, we need to integrate it with existing email systems. This requires building a processing pipeline that can handle various email formats, manage the flow of messages through the detection system, and implement appropriate actions based on detection results.
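
As a small illustration of the parsing step, the sketch below uses the standard library's modern email API. Passing `policy=policy.default` makes the parser return an `EmailMessage`, whose `get_body` method selects the preferred text part without walking the MIME tree by hand. The `parse_raw_email` helper and the sample message are hypothetical and are shown only as an alternative to manual part handling.

```python
from email import message_from_string, policy
from typing import Dict

RAW = (
    "From: Alice <alice@example.com>\n"
    "To: bob@example.com\n"
    "Subject: Quarterly report\n"
    "Message-ID: <abc123@example.com>\n"
    "MIME-Version: 1.0\n"
    "Content-Type: text/plain; charset=utf-8\n"
    "\n"
    "Hi Bob, the report is attached.\n"
)


def parse_raw_email(raw: str) -> Dict[str, str]:
    """Extract the fields the detector needs from a raw RFC 5322 message."""
    msg = message_from_string(raw, policy=policy.default)
    # Prefer the plain-text part and fall back to HTML if that is all there is
    body_part = msg.get_body(preferencelist=("plain", "html"))
    body = body_part.get_content() if body_part is not None else ""
    return {
        "subject": str(msg.get("Subject", "")),
        "from": str(msg.get("From", "")),
        "message_id": str(msg.get("Message-ID", "")),
        "body": body.strip(),
    }


parsed = parse_raw_email(RAW)
```

The returned dictionary uses the same `subject`, `from`, and `body` keys that the detector's prompt builder reads.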


import email

import imaplib

import smtplib

from email.mime.text import MIMEText

from email.mime.multipart import MIMEMultipart

import threading

import queue

import time


class EmailProcessor:

    def __init__(self, spam_detector: SpamDetector, 

                 learning_system: AgenticLearningSystem):

        self.spam_detector = spam_detector

        self.learning_system = learning_system

        self.processing_queue = queue.Queue()

        self.results_queue = queue.Queue()

        self.is_running = False

        self.worker_threads = []

        

    def start_processing(self, num_workers: int = 3):

        """Start the email processing pipeline"""

        self.is_running = True

        

        # Start worker threads

        for i in range(num_workers):

            worker = threading.Thread(target=self._process_emails_worker, 

                                    name=f"EmailWorker-{i}")

            worker.daemon = True

            worker.start()

            self.worker_threads.append(worker)

        

        # Start result handler

        result_handler = threading.Thread(target=self._handle_results)

        result_handler.daemon = True

        result_handler.start()

        

        logging.info(f"Email processing started with {num_workers} workers")

    

    def stop_processing(self):

        """Stop the email processing pipeline"""

        self.is_running = False

        

        # Wait for workers to finish

        for worker in self.worker_threads:

            worker.join(timeout=5)

        

        logging.info("Email processing stopped")

    

    def process_email_message(self, raw_email: str) -> Dict[str, Any]:

        """Process a single email message"""

        

        try:

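            # Parse email message; the modern policy yields an EmailMessage

            # and decodes RFC 2047 encoded headers such as Subject

            from email import policy

            email_msg = email.message_from_string(raw_email, policy=policy.default)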

            email_data = self._extract_email_data(email_msg)

            

            # Add to processing queue

            processing_item = {

                'email_data': email_data,

                'raw_message': raw_email,

                'timestamp': datetime.now().isoformat(),

                'message_id': email_data.get('message_id', 'unknown')

            }

            

            self.processing_queue.put(processing_item)

            

            return {'status': 'queued', 'message_id': processing_item['message_id']}

            

        except Exception as e:

            logging.error(f"Failed to process email: {str(e)}")

            return {'status': 'error', 'error': str(e)}

    

    def _extract_email_data(self, email_msg: email.message.EmailMessage) -> Dict[str, str]:

        """Extract relevant data from email message"""

        

        # Get basic headers

        email_data = {

            'subject': email_msg.get('Subject', ''),

            'from': email_msg.get('From', ''),

            'to': email_msg.get('To', ''),

            'date': email_msg.get('Date', ''),

            'message_id': email_msg.get('Message-ID', ''),

        }

        

        # Extract body content

        body_parts = []

        

        # Walk every part; for a non-multipart message walk() yields the message itself

        for part in email_msg.walk():

            content_type = part.get_content_type()

            if content_type not in ("text/plain", "text/html"):

                continue

            if part.get_content_disposition() == 'attachment':

                continue  # Attachments are summarized separately below

            payload = part.get_payload(decode=True) or b''

            charset = part.get_content_charset() or 'utf-8'

            try:

                text = payload.decode(charset, errors='ignore')

            except LookupError:  # Unknown or misspelled charset name

                text = payload.decode('utf-8', errors='ignore')

            if content_type == "text/html":

                # Simple HTML to text conversion

                text = self._html_to_text(text)

            body_parts.append(text)

        

        email_data['body'] = '\n'.join(body_parts)

        

        # Extract additional metadata

        email_data['attachments'] = self._get_attachment_info(email_msg)

        email_data['headers'] = dict(email_msg.items())

        

        return email_data

    

    def _html_to_text(self, html_content: str) -> str:

        """Convert HTML content to plain text"""

        # Simple HTML tag removal - can be enhanced with proper HTML parsing

        import re

        text = re.sub(r'<[^>]+>', '', html_content)

        text = re.sub(r'\s+', ' ', text)

        return text.strip()

    

    def _get_attachment_info(self, email_msg: email.message.EmailMessage) -> List[Dict[str, str]]:

        """Extract attachment information"""

        attachments = []

        

        if email_msg.is_multipart():

            for part in email_msg.walk():

                if part.get_content_disposition() == 'attachment':

                    filename = part.get_filename()

                    if filename:

                        attachments.append({

                            'filename': filename,

                            'content_type': part.get_content_type(),

                            'size': len(part.get_payload(decode=True) or b'')

                        })

        

        return attachments

    

    def _process_emails_worker(self):

        """Worker thread for processing emails"""

        

        while self.is_running:

            try:

                # Get email from queue with timeout

                processing_item = self.processing_queue.get(timeout=1)

                

                # Perform spam detection

                detection_result = self.spam_detector.analyze_email(

                    processing_item['email_data']

                )

                

                # Create processing result

                result = {

                    'message_id': processing_item['message_id'],

                    'detection_result': detection_result,

                    'email_data': processing_item['email_data'],

                    'processing_time': datetime.now().isoformat(),

                    'action_required': self._determine_action(detection_result)

                }

                

                # Add to results queue

                self.results_queue.put(result)

                

                # Mark task as done

                self.processing_queue.task_done()

                

            except queue.Empty:

                continue

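            except Exception as e:

                logging.error(f"Worker error: {str(e)}")

                # The item was dequeued, so mark it done or queue.join() would block

                self.processing_queue.task_done()

                continue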

    

    def _determine_action(self, detection_result: Dict[str, Any]) -> str:

        """Determine what action to take based on detection result"""

        

        is_spam = detection_result.get('is_spam', False)

        confidence = detection_result.get('confidence', 0.0)

        

        if is_spam and confidence > 0.8:

            return 'quarantine'

        elif is_spam and confidence > 0.6:

            return 'flag_suspicious'

        elif is_spam and confidence > 0.4:

            return 'mark_potential_spam'

        else:

            return 'deliver_normal'

    

    def _handle_results(self):

        """Handle processing results"""

        

        while self.is_running:

            try:

                result = self.results_queue.get(timeout=1)

                

                # Log result

                logging.info(f"Processed email {result['message_id']}: "

                           f"spam={result['detection_result']['is_spam']}, "

                           f"confidence={result['detection_result']['confidence']:.2f}, "

                           f"action={result['action_required']}")

                

                # Take appropriate action

                self._execute_action(result)

                

                # Mark task as done

                self.results_queue.task_done()

                

            except queue.Empty:

                continue

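            except Exception as e:

                logging.error(f"Result handler error: {str(e)}")

                # The result was dequeued, so mark it done or queue.join() would block

                self.results_queue.task_done()

                continue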

    

    def _execute_action(self, result: Dict[str, Any]):

        """Execute the determined action for an email"""

        

        action = result['action_required']

        email_data = result['email_data']

        detection_result = result['detection_result']

        

        if action == 'quarantine':

            self._quarantine_email(email_data, detection_result)

        elif action == 'flag_suspicious':

            self._flag_suspicious_email(email_data, detection_result)

        elif action == 'mark_potential_spam':

            self._mark_potential_spam(email_data, detection_result)

        else:

            self._deliver_normal(email_data)

    

    def _quarantine_email(self, email_data: Dict[str, str], 

                         detection_result: Dict[str, Any]):

        """Quarantine a spam email"""

        # Implementation would depend on email system integration

        logging.info(f"Quarantined email: {email_data.get('subject', 'No Subject')}")

        

        # Optionally notify administrator

        self._notify_admin_quarantine(email_data, detection_result)

    

    def _flag_suspicious_email(self, email_data: Dict[str, str], 

                              detection_result: Dict[str, Any]):

        """Flag email as suspicious but allow delivery"""

        logging.info(f"Flagged suspicious email: {email_data.get('subject', 'No Subject')}")

        

        # Add warning header or modify subject

        # Implementation depends on email system

    

    def _mark_potential_spam(self, email_data: Dict[str, str], 

                            detection_result: Dict[str, Any]):

        """Mark email as potential spam"""

        logging.info(f"Marked potential spam: {email_data.get('subject', 'No Subject')}")

        

        # Add spam score header or move to spam folder

        # Implementation depends on email system

    

    def _deliver_normal(self, email_data: Dict[str, str]):

        """Deliver email normally"""

        logging.debug(f"Delivered normally: {email_data.get('subject', 'No Subject')}")

    

    def _notify_admin_quarantine(self, email_data: Dict[str, str], 

                                detection_result: Dict[str, Any]):

        """Notify administrator of quarantined email"""

        # Implementation would send notification to admin

        pass


This email processing pipeline provides a foundation for integrating the spam detector with real email systems. The queue-based design decouples message intake from analysis, and the worker threads let several emails be analyzed concurrently. Overall throughput is still bounded by how many concurrent requests the local LLM can serve, so the number of workers should be tuned to the inference backend.
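

The queue-and-worker pattern behind the pipeline can be sketched in isolation. The version below is a simplified illustration, not the EmailProcessor itself: fake_analyze is a hypothetical stand-in for the LLM call, and shutdown uses sentinel objects instead of the is_running flag, so every queued email is processed before the workers exit.

```python
import queue
import threading


def fake_analyze(email_data: dict) -> dict:
    """Hypothetical stand-in for the LLM call: flags subjects containing 'free'."""
    is_spam = "free" in email_data["subject"].lower()
    return {"is_spam": is_spam, "confidence": 0.9 if is_spam else 0.1}


def run_pipeline(emails: list, num_workers: int = 3) -> dict:
    """Analyze all emails with a pool of worker threads and return the verdicts."""
    tasks: queue.Queue = queue.Queue()
    results = {}
    lock = threading.Lock()
    stop = object()  # Sentinel that tells one worker to exit

    def worker() -> None:
        while True:
            item = tasks.get()
            try:
                if item is stop:
                    return
                verdict = fake_analyze(item)
                with lock:
                    results[item["message_id"]] = verdict
            finally:
                # Always mark the item done, even if analysis raised
                tasks.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for thread in threads:
        thread.start()
    for item in emails:
        tasks.put(item)
    for _ in threads:
        tasks.put(stop)  # One sentinel per worker, queued after the real work
    tasks.join()
    for thread in threads:
        thread.join()
    return results


verdicts = run_pipeline([
    {"message_id": "a", "subject": "FREE prize inside"},
    {"message_id": "b", "subject": "Meeting notes"},
])
print(verdicts)
```

Placing task_done() in a finally block keeps the queue's bookkeeping correct when analysis fails, which matters as soon as any caller waits on the queue with join().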


The system maps confidence scores to graduated actions: spam verdicts with confidence above 0.8 are quarantined, those above 0.6 are flagged as suspicious, those above 0.4 are marked as potential spam, and everything else is delivered normally. This graduated handling reduces the risk of false positives affecting important communications while still providing protection against obvious spam.
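

The thresholds in _determine_action can also be expressed as a small table, which makes the boundary behavior easier to see and to tune. The following is a sketch of that idea rather than a replacement for the method; the names ACTION_THRESHOLDS and determine_action are our own for this example.

```python
from typing import Any, Dict, List, Tuple

# Ordered from strictest to most lenient; comparisons are strict (>),
# matching the thresholds used in _determine_action
ACTION_THRESHOLDS: List[Tuple[float, str]] = [
    (0.8, "quarantine"),
    (0.6, "flag_suspicious"),
    (0.4, "mark_potential_spam"),
]


def determine_action(detection_result: Dict[str, Any]) -> str:
    """Map a detection result to an action using the threshold table."""
    if not detection_result.get("is_spam", False):
        return "deliver_normal"
    confidence = detection_result.get("confidence", 0.0)
    for threshold, action in ACTION_THRESHOLDS:
        if confidence > threshold:
            return action
    # Spam verdicts at or below the lowest threshold are still delivered
    return "deliver_normal"


for confidence in (0.95, 0.8, 0.5, 0.3):
    result = {"is_spam": True, "confidence": confidence}
    print(confidence, determine_action(result))
```

Because the comparisons are strict, a confidence of exactly 0.8 falls into the flag_suspicious tier rather than quarantine, and a spam verdict with confidence 0.4 or lower is delivered normally.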


Performance Monitoring and Optimization


A production spam detection system requires comprehensive monitoring to ensure optimal performance and to identify areas for improvement. The monitoring system tracks detection accuracy, processing speed, resource utilization, and learning effectiveness.


import psutil

import time

from collections import defaultdict

from typing import Dict, List, Any

import matplotlib.pyplot as plt

import json

import logging

import sqlite3

import threading

import numpy as np


class PerformanceMonitor:

    def __init__(self, spam_detector: SpamDetector, 

                 learning_system: AgenticLearningSystem):

        self.spam_detector = spam_detector

        self.learning_system = learning_system

        self.metrics = defaultdict(list)

        self.start_time = time.time()

        self.monitoring_active = False

        

    def start_monitoring(self):

        """Start performance monitoring"""

        self.monitoring_active = True

        self.start_time = time.time()

        

        # Start monitoring thread

        monitor_thread = threading.Thread(target=self._monitoring_loop)

        monitor_thread.daemon = True

        monitor_thread.start()

        

        logging.info("Performance monitoring started")

    

    def stop_monitoring(self):

        """Stop performance monitoring"""

        self.monitoring_active = False

        logging.info("Performance monitoring stopped")

    

    def record_detection_metrics(self, email_data: Dict[str, str], 

                               detection_result: Dict[str, Any], 

                               processing_time: float):

        """Record metrics for a single detection"""

        

        timestamp = time.time()

        

        # Basic detection metrics

        self.metrics['detection_count'].append(timestamp)

        self.metrics['processing_time'].append(processing_time)

        self.metrics['confidence_scores'].append(detection_result.get('confidence', 0.0))

        self.metrics['spam_detections'].append(1 if detection_result.get('is_spam') else 0)

        

        # Content analysis metrics

        content_length = len(email_data.get('body', ''))

        self.metrics['content_length'].append(content_length)

        

        # Risk factor analysis

        risk_factors = detection_result.get('risk_factors', [])

        self.metrics['risk_factor_count'].append(len(risk_factors))

        

        # Update running statistics

        self._update_running_stats(timestamp, processing_time, detection_result)

    

    def record_feedback_metrics(self, feedback_type: str, correction_accuracy: float):

        """Record metrics for user feedback"""

        

        timestamp = time.time()

        self.metrics['feedback_events'].append(timestamp)

        self.metrics['feedback_types'].append(feedback_type)

        self.metrics['correction_accuracy'].append(correction_accuracy)

    

    def _monitoring_loop(self):

        """Main monitoring loop for system metrics"""

        

        while self.monitoring_active:

            try:

                # Collect system metrics

                cpu_usage = psutil.cpu_percent(interval=1)

                memory_usage = psutil.virtual_memory().percent

                

                timestamp = time.time()

                self.metrics['cpu_usage'].append((timestamp, cpu_usage))

                self.metrics['memory_usage'].append((timestamp, memory_usage))

                

                # Collect queue metrics if available

                if hasattr(self.spam_detector, 'processing_queue'):

                    queue_size = self.spam_detector.processing_queue.qsize()

                    self.metrics['queue_size'].append((timestamp, queue_size))

                

                time.sleep(60)  # Monitor every minute

                

            except Exception as e:

                logging.error(f"Monitoring error: {str(e)}")

                time.sleep(60)

    

    def _update_running_stats(self, timestamp: float, processing_time: float, 

                            detection_result: Dict[str, Any]):

        """Update running statistics"""

        

        # Calculate recent performance metrics

        recent_window = 3600  # 1 hour window

        cutoff_time = timestamp - recent_window

        

        # Filter recent detections, pairing each timestamp with its processing time

        recent = [

            (t, pt) for t, pt in zip(self.metrics['detection_count'],

                                     self.metrics['processing_time'])

            if t > cutoff_time

        ]

        recent_times = [t for t, _ in recent]

        recent_processing_times = [pt for _, pt in recent]

        

        if recent_processing_times:

            avg_processing_time = sum(recent_processing_times) / len(recent_processing_times)

            self.metrics['avg_processing_time_1h'] = avg_processing_time

            

        # Calculate detection rate

        if recent_times:

            detection_rate = len(recent_times) / (recent_window / 60)  # per minute

            self.metrics['detection_rate_1h'] = detection_rate

    

    def generate_performance_report(self) -> Dict[str, Any]:

        """Generate comprehensive performance report"""

        

        current_time = time.time()

        uptime = current_time - self.start_time

        

        report = {

            'system_info': {

                'uptime_hours': uptime / 3600,

                'total_detections': len(self.metrics['detection_count']),

                'total_feedback_events': len(self.metrics['feedback_events'])

            },

            'performance_metrics': self._calculate_performance_metrics(),

            'accuracy_metrics': self._calculate_accuracy_metrics(),

            'resource_usage': self._calculate_resource_metrics(),

            'learning_metrics': self._calculate_learning_metrics()

        }

        

        return report

    

    def _calculate_performance_metrics(self) -> Dict[str, float]:

        """Calculate performance-related metrics"""

        

        processing_times = self.metrics['processing_time']

        

        if not processing_times:

            return {}

        

        return {

            'avg_processing_time': sum(processing_times) / len(processing_times),

            'max_processing_time': max(processing_times),

            'min_processing_time': min(processing_times),

            'processing_time_std': np.std(processing_times) if len(processing_times) > 1 else 0,

            'detection_rate_per_minute': self.metrics.get('detection_rate_1h', 0)

        }

    

    def _calculate_accuracy_metrics(self) -> Dict[str, float]:

        """Calculate accuracy-related metrics"""

        

        feedback_types = self.metrics['feedback_types']

        

        if not feedback_types:

            return {'feedback_available': False}

        

        false_positives = feedback_types.count('false_positive')

        false_negatives = feedback_types.count('false_negative')

        correct_detections = feedback_types.count('correct')

        

        total_feedback = len(feedback_types)

        

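        # Note: these are shares of all feedback events, not the classical

        # FPR = FP / (FP + TN) and FNR = FN / (FN + TP) used in the test suite

        return {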

            'feedback_available': True,

            'false_positive_rate': false_positives / total_feedback if total_feedback > 0 else 0,

            'false_negative_rate': false_negatives / total_feedback if total_feedback > 0 else 0,

            'accuracy_rate': correct_detections / total_feedback if total_feedback > 0 else 0,

            'total_feedback_events': total_feedback

        }

    

    def _calculate_resource_metrics(self) -> Dict[str, float]:

        """Calculate resource usage metrics"""

        

        cpu_data = [usage for _, usage in self.metrics['cpu_usage']]

        memory_data = [usage for _, usage in self.metrics['memory_usage']]

        

        if not cpu_data:

            return {}

        

        return {

            'avg_cpu_usage': sum(cpu_data) / len(cpu_data),

            'max_cpu_usage': max(cpu_data),

            'avg_memory_usage': sum(memory_data) / len(memory_data),

            'max_memory_usage': max(memory_data)

        }

    

    def _calculate_learning_metrics(self) -> Dict[str, Any]:

        """Calculate learning system metrics"""

        

        # Get learning patterns from database

        conn = sqlite3.connect(self.learning_system.db_path)

        cursor = conn.cursor()

        

        cursor.execute('SELECT COUNT(*) FROM learning_patterns')

        total_patterns = cursor.fetchone()[0]

        

        cursor.execute('SELECT AVG(effectiveness_score) FROM learning_patterns WHERE effectiveness_score > 0')

        avg_effectiveness = cursor.fetchone()[0] or 0

        

        cursor.execute('SELECT COUNT(*) FROM feedback')

        total_feedback = cursor.fetchone()[0]

        

        conn.close()

        

        return {

            'total_learning_patterns': total_patterns,

            'average_pattern_effectiveness': avg_effectiveness,

            'total_feedback_collected': total_feedback,

            'learning_active': len(self.learning_system.feedback_buffer) > 0

        }

    

    def create_performance_dashboard(self, output_file: str = "performance_dashboard.png"):

        """Create visual performance dashboard"""

        

        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        fig.suptitle('Spam Detector Performance Dashboard', fontsize=16)

        

        # Processing time trend

        if self.metrics['processing_time']:

            axes[0, 0].plot(self.metrics['processing_time'])

            axes[0, 0].set_title('Processing Time Trend')

            axes[0, 0].set_ylabel('Time (seconds)')

            axes[0, 0].set_xlabel('Detection Number')

        

        # Confidence score distribution

        if self.metrics['confidence_scores']:

            axes[0, 1].hist(self.metrics['confidence_scores'], bins=20, alpha=0.7)

            axes[0, 1].set_title('Confidence Score Distribution')

            axes[0, 1].set_xlabel('Confidence Score')

            axes[0, 1].set_ylabel('Frequency')

        

        # CPU usage over time

        if self.metrics['cpu_usage']:

            times, cpu_values = zip(*self.metrics['cpu_usage'])

            axes[1, 0].plot(times, cpu_values)

            axes[1, 0].set_title('CPU Usage Over Time')

            axes[1, 0].set_ylabel('CPU Usage (%)')

            axes[1, 0].set_xlabel('Time')

        

        # Spam detection rate

        if self.metrics['spam_detections']:

            spam_rate = sum(self.metrics['spam_detections']) / len(self.metrics['spam_detections'])

            axes[1, 1].bar(['Spam', 'Ham'], [spam_rate, 1 - spam_rate])

            axes[1, 1].set_title('Spam vs Ham Detection Rate')

            axes[1, 1].set_ylabel('Proportion')

        

        plt.tight_layout()

        plt.savefig(output_file, dpi=300, bbox_inches='tight')

        plt.close()

        

        logging.info(f"Performance dashboard saved to {output_file}")


This monitoring system gives administrators insight into processing latency, resource usage, and feedback-derived accuracy, which helps them identify bottlenecks and track improvements. Note that the accuracy figures only reflect emails for which users submitted feedback. The visual dashboard gives a quick overview of system health and performance trends.
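

The one-hour statistics in _update_running_stats rescan the full metric history on every detection. For long-running deployments, a bounded rolling window is one possible alternative. The sketch below is an assumption-laden illustration: the RollingWindow class is hypothetical, and it expects timestamps to arrive in non-decreasing order.

```python
from collections import deque
from typing import Deque, Tuple


class RollingWindow:
    """Keep (timestamp, value) samples from the last window_seconds only."""

    def __init__(self, window_seconds: float = 3600.0):
        self.window_seconds = window_seconds
        self._samples: Deque[Tuple[float, float]] = deque()

    def add(self, timestamp: float, value: float) -> None:
        """Record a sample and drop everything older than the window."""
        self._samples.append((timestamp, value))
        cutoff = timestamp - self.window_seconds
        # Samples are ordered by time, so expired ones are always at the left
        while self._samples and self._samples[0][0] <= cutoff:
            self._samples.popleft()

    def average(self) -> float:
        """Mean value of the samples currently inside the window."""
        if not self._samples:
            return 0.0
        return sum(value for _, value in self._samples) / len(self._samples)

    def rate_per_minute(self) -> float:
        """Number of samples in the window, normalized to one minute."""
        return len(self._samples) / (self.window_seconds / 60)


window = RollingWindow(window_seconds=60)
window.add(0.0, 1.0)
window.add(30.0, 3.0)
print(window.average())   # both samples are inside the window
window.add(100.0, 5.0)    # evicts the samples taken at t=0 and t=30
print(window.average())
```

Memory use stays proportional to the number of detections inside the window, and each update only touches the samples that expire.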


Security Considerations and Hardening


Security is paramount in a spam detection system, as it processes sensitive email content and makes decisions that affect communication flow. The system must be hardened against various attack vectors while maintaining its effectiveness.


import hashlib

import hmac

import secrets

from cryptography.fernet import Fernet

from cryptography.hazmat.primitives import hashes

from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

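import base64

import json

import logging

import re

import time

from datetime import datetime

from typing import Any, Dict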


class SecurityManager:

    def __init__(self, master_key: str = None):

        self.master_key = master_key or self._generate_master_key()

        self.encryption_key = self._derive_encryption_key()

        self.cipher_suite = Fernet(self.encryption_key)

        self.rate_limiter = {}

        

    def _generate_master_key(self) -> str:

        """Generate a secure master key"""

        return secrets.token_urlsafe(32)

    

    def _derive_encryption_key(self) -> bytes:

        """Derive encryption key from master key"""

        password = self.master_key.encode()

        salt = b'spam_detector_salt'  # In production, use random salt

        kdf = PBKDF2HMAC(

            algorithm=hashes.SHA256(),

            length=32,

            salt=salt,

            iterations=100000,

        )

        key = base64.urlsafe_b64encode(kdf.derive(password))

        return key

    

    def encrypt_sensitive_data(self, data: str) -> str:

        """Encrypt sensitive data"""

        try:

            encrypted_data = self.cipher_suite.encrypt(data.encode())

            return base64.urlsafe_b64encode(encrypted_data).decode()

        except Exception as e:

            logging.error(f"Encryption failed: {str(e)}")

            return ""

    

    def decrypt_sensitive_data(self, encrypted_data: str) -> str:

        """Decrypt sensitive data"""

        try:

            decoded_data = base64.urlsafe_b64decode(encrypted_data.encode())

            decrypted_data = self.cipher_suite.decrypt(decoded_data)

            return decrypted_data.decode()

        except Exception as e:

            logging.error(f"Decryption failed: {str(e)}")

            return ""

    

    def sanitize_email_content(self, email_data: Dict[str, str]) -> Dict[str, str]:

        """Sanitize email content to prevent injection attacks"""

        

        sanitized = {}

        

        for key, value in email_data.items():

            if isinstance(value, str):

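                # Remove potentially dangerous characters, but keep tab, newline

                # and carriage return so the message's line structure survives

                sanitized_value = re.sub(r'[<>"\'\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', value)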

                # Limit length to prevent memory exhaustion

                sanitized_value = sanitized_value[:10000]

                sanitized[key] = sanitized_value

            else:

                sanitized[key] = value

        

        return sanitized

    

    def validate_input_data(self, email_data: Dict[str, str]) -> bool:

        """Validate input data for security"""

        

        required_fields = ['subject', 'from', 'body']

        

        # Check required fields

        for field in required_fields:

            if field not in email_data:

                logging.warning(f"Missing required field: {field}")

                return False

        

        # Check data types

        for key, value in email_data.items():

            if not isinstance(value, (str, list, dict)):

                logging.warning(f"Invalid data type for field {key}")

                return False

        

        # Check for suspicious patterns

        suspicious_patterns = [

            r'<script[^>]*>.*?</script>',

            r'javascript:',

            r'data:text/html',

            r'vbscript:',

        ]

        

        content = str(email_data.get('body', ''))

        for pattern in suspicious_patterns:

            # DOTALL lets the <script> pattern match blocks that span several lines

            if re.search(pattern, content, re.IGNORECASE | re.DOTALL):

                logging.warning(f"Suspicious pattern detected: {pattern}")

                return False

        

        return True

    

    def implement_rate_limiting(self, client_id: str, max_requests: int = 100, 

                              time_window: int = 3600) -> bool:

        """Implement rate limiting to prevent abuse"""

        

        current_time = time.time()

        

        if client_id not in self.rate_limiter:

            self.rate_limiter[client_id] = []

        

        # Clean old requests

        self.rate_limiter[client_id] = [

            req_time for req_time in self.rate_limiter[client_id]

            if current_time - req_time < time_window

        ]

        

        # Check rate limit

        if len(self.rate_limiter[client_id]) >= max_requests:

            logging.warning(f"Rate limit exceeded for client: {client_id}")

            return False

        

        # Add current request

        self.rate_limiter[client_id].append(current_time)

        return True

    

    def generate_audit_log(self, action: str, details: Dict[str, Any], 

                          client_id: str = None) -> str:

        """Generate audit log entry"""

        

        log_entry = {

            'timestamp': datetime.now().isoformat(),

            'action': action,

            'client_id': client_id,

            'details': details,

            'hash': self._generate_log_hash(action, details, client_id)

        }

        

        return json.dumps(log_entry)

    

    def _generate_log_hash(self, action: str, details: Dict[str, Any], 

                          client_id: str = None) -> str:

        """Generate hash for log integrity"""

        

        log_data = f"{action}:{client_id}:{json.dumps(details, sort_keys=True)}"

        return hmac.new(

            self.master_key.encode(),

            log_data.encode(),

            hashlib.sha256

        ).hexdigest()


class SecureSpamDetector(SpamDetector):

    """Security-hardened version of spam detector"""

    

    def __init__(self, llm_manager: LocalLLMManager, security_manager: SecurityManager):

        super().__init__(llm_manager)

        self.security_manager = security_manager

        

    def analyze_email(self, email_data: Dict[str, str], 

                     client_id: str = None) -> Dict[str, Any]:

        """Secure email analysis with validation and rate limiting"""

        

        # Rate limiting check

        if client_id and not self.security_manager.implement_rate_limiting(client_id):

            return {

                'error': 'Rate limit exceeded',

                'is_spam': False,

                'confidence': 0.0

            }

        

        # Input validation

        if not self.security_manager.validate_input_data(email_data):

            return {

                'error': 'Invalid input data',

                'is_spam': False,

                'confidence': 0.0

            }

        

        # Sanitize input

        sanitized_data = self.security_manager.sanitize_email_content(email_data)

        

        # Generate audit log

        audit_log = self.security_manager.generate_audit_log(

            'email_analysis',

            {

                'subject_hash': hashlib.sha256(sanitized_data.get('subject', '').encode()).hexdigest()[:16],

                'content_length': len(sanitized_data.get('body', ''))

            },

            client_id

        )

        logging.info(f"Audit: {audit_log}")

        

        # Perform analysis

        try:

            result = super().analyze_email(sanitized_data)

            

            # The 'reasoning' field is intentionally left unencrypted,

            # as it is needed for transparency

            

            return result

            

        except Exception as e:

            logging.error(f"Secure analysis failed: {str(e)}")

            return {

                'error': 'Analysis failed',

                'is_spam': False,

                'confidence': 0.0

            }


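This security framework provides multiple layers of protection, including input validation, rate limiting, data sanitization, encryption for sensitive data, and audit logging. Note that the secure wrapper returns is_spam set to False together with an 'error' key when validation, rate limiting, or analysis fails, so callers should check for 'error' rather than treat such results as clean mail. With that caveat, the wrapper lets the spam detection system operate more safely in production environments while maintaining its effectiveness.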
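

The audit log entries carry an HMAC, but the hash is only useful if something verifies it later. The sketch below shows what such a check could look like. It rebuilds the same string that _generate_log_hash signs; the function names sign_entry and verify_entry and the demo key are hypothetical.

```python
import hashlib
import hmac
import json
from typing import Any, Dict, Optional


def _payload(action: str, details: Dict[str, Any], client_id: Optional[str]) -> bytes:
    """Build the signed string in the same layout as _generate_log_hash."""
    return f"{action}:{client_id}:{json.dumps(details, sort_keys=True)}".encode()


def sign_entry(key: bytes, action: str, details: Dict[str, Any],
               client_id: Optional[str] = None) -> Dict[str, Any]:
    """Create a log entry with an HMAC-SHA256 integrity hash."""
    digest = hmac.new(key, _payload(action, details, client_id), hashlib.sha256).hexdigest()
    return {"action": action, "client_id": client_id, "details": details, "hash": digest}


def verify_entry(key: bytes, entry: Dict[str, Any]) -> bool:
    """Recompute the HMAC and compare it in constant time."""
    expected = hmac.new(
        key,
        _payload(entry["action"], entry["details"], entry["client_id"]),
        hashlib.sha256,
    ).hexdigest()
    return hmac.compare_digest(expected, entry["hash"])


key = b"demo-key-for-illustration-only"  # Assumption: real keys come from a secret store
entry = sign_entry(key, "email_analysis", {"content_length": 42}, "client-1")
tampered = dict(entry, details={"content_length": 43})
print(verify_entry(key, entry), verify_entry(key, tampered))
```

Note that this layout, like the original, does not cover the timestamp field, so a modified timestamp would go undetected unless it is added to the signed string.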


Testing and Validation Framework


A robust testing framework is essential for validating the spam detector's performance and ensuring its reliability across different scenarios. The testing system should evaluate both accuracy and security aspects of the detector.


import unittest

import tempfile

import shutil

from typing import Dict, List, Tuple

import random

import json

import logging


class SpamDetectorTestSuite:

    def __init__(self, spam_detector: SpamDetector):

        self.spam_detector = spam_detector

        self.test_results = []

        

    def load_test_dataset(self, spam_file: str, ham_file: str) -> List[Tuple[Dict[str, str], bool]]:

        """Load test dataset with spam and ham examples"""

        

        test_data = []

        

        # Load spam examples

        try:

            with open(spam_file, 'r', encoding='utf-8') as f:

                spam_emails = json.load(f)

                for spam_email in spam_emails:  # Avoid shadowing the email module

                    test_data.append((spam_email, True))  # True = spam

        except FileNotFoundError:

            logging.warning(f"Spam test file not found: {spam_file}")

        

        # Load ham examples

        try:

            with open(ham_file, 'r', encoding='utf-8') as f:

                ham_emails = json.load(f)

                for ham_email in ham_emails:  # Avoid shadowing the email module

                    test_data.append((ham_email, False))  # False = ham

        except FileNotFoundError:

            logging.warning(f"Ham test file not found: {ham_file}")

        

        # Shuffle the dataset

        random.shuffle(test_data)

        return test_data

    

    def run_accuracy_tests(self, test_data: List[Tuple[Dict[str, str], bool]]) -> Dict[str, float]:

        """Run accuracy tests on the dataset"""

        

        results = {

            'true_positives': 0,

            'true_negatives': 0,

            'false_positives': 0,

            'false_negatives': 0,

            'total_tests': len(test_data)

        }

        

        for email_data, is_spam in test_data:

            try:

                detection_result = self.spam_detector.analyze_email(email_data)

                predicted_spam = detection_result.get('is_spam', False)

                

                if is_spam and predicted_spam:

                    results['true_positives'] += 1

                elif not is_spam and not predicted_spam:

                    results['true_negatives'] += 1

                elif not is_spam and predicted_spam:

                    results['false_positives'] += 1

                elif is_spam and not predicted_spam:

                    results['false_negatives'] += 1

                

                # Store detailed result for analysis

                self.test_results.append({

                    'email_subject': email_data.get('subject', 'No Subject'),

                    'actual_spam': is_spam,

                    'predicted_spam': predicted_spam,

                    'confidence': detection_result.get('confidence', 0.0),

                    'reasoning': detection_result.get('reasoning', ''),

                    'correct': is_spam == predicted_spam

                })

                

            except Exception as e:

                logging.error(f"Test failed for email: {str(e)}")

                continue

        

        # Calculate metrics

        tp = results['true_positives']

        tn = results['true_negatives']

        fp = results['false_positives']

        fn = results['false_negatives']

        

        accuracy = (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) > 0 else 0

        precision = tp / (tp + fp) if (tp + fp) > 0 else 0

        recall = tp / (tp + fn) if (tp + fn) > 0 else 0

        f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

        

        results.update({

            'accuracy': accuracy,

            'precision': precision,

            'recall': recall,

            'f1_score': f1_score,

            'false_positive_rate': fp / (fp + tn) if (fp + tn) > 0 else 0,

            'false_negative_rate': fn / (fn + tp) if (fn + tp) > 0 else 0

        })

        

        return results

    

    def run_performance_tests(self, test_data: List[Tuple[Dict[str, str], bool]], 

                            num_iterations: int = 100) -> Dict[str, float]:

        """Run performance tests to measure processing speed"""

        

        processing_times = []

        

        # Select random subset for performance testing

        test_subset = random.sample(test_data, min(num_iterations, len(test_data)))

        

        for email_data, _ in test_subset:

            # perf_counter() is monotonic and high-resolution, so it suits interval timing
            start_time = time.perf_counter()

            

            try:

                self.spam_detector.analyze_email(email_data)

                processing_time = time.perf_counter() - start_time

                processing_times.append(processing_time)

                

            except Exception as e:

                logging.error(f"Performance test failed: {str(e)}")

                continue

        

        if not processing_times:

            return {'error': 'No successful performance tests'}

        

        return {

            'avg_processing_time': sum(processing_times) / len(processing_times),

            'min_processing_time': min(processing_times),

            'max_processing_time': max(processing_times),

            'std_processing_time': np.std(processing_times),

            'total_tests': len(processing_times)

        }

    

    def run_security_tests(self) -> Dict[str, bool]:

        """Run security tests to validate input handling"""

        

        security_results = {}

        

        # Test malicious input handling

        malicious_inputs = [

            {

                'subject': '<script>alert("xss")</script>',

                'from': 'test@example.com',

                'body': 'Normal content'

            },

            {

                'subject': 'Normal subject',

                'from': 'test@example.com',

                'body': 'javascript:void(0)' * 1000  # Large malicious content

            },

            {

                'subject': 'Test',

                'from': 'test@example.com',

                'body': '\x00\x01\x02' + 'A' * 50000  # Binary data and large content

            }

        ]

        

        for i, malicious_input in enumerate(malicious_inputs):

            try:

                result = self.spam_detector.analyze_email(malicious_input)

                # Should not crash and should return valid result

                security_results[f'malicious_input_test_{i}'] = (

                    isinstance(result, dict) and 

                    'is_spam' in result and 

                    'confidence' in result

                )

            except Exception as e:

                logging.error(f"Security test {i} failed: {str(e)}")

                security_results[f'malicious_input_test_{i}'] = False

        

        # Test rate limiting if security manager is available

        if hasattr(self.spam_detector, 'security_manager'):

            rate_limit_results = []

            test_email = {

                'subject': 'Rate limit test',

                'from': 'test@example.com',

                'body': 'Testing rate limiting functionality'

            }

            

            # Send multiple requests rapidly

            for _ in range(150):  # Exceed typical rate limit

                try:

                    result = self.spam_detector.analyze_email(test_email, client_id='test_client')

                    rate_limit_results.append('error' not in result)

                except Exception:

                    rate_limit_results.append(False)

            

            # Should have some failures due to rate limiting

            security_results['rate_limiting_active'] = not all(rate_limit_results)

        

        return security_results

    

    def generate_test_report(self, accuracy_results: Dict[str, float], 

                           performance_results: Dict[str, float],

                           security_results: Dict[str, bool]) -> str:

        """Generate comprehensive test report"""

        

        report = []

        report.append("SPAM DETECTOR TEST REPORT")

        report.append("=" * 50)

        report.append("")

        

        # Accuracy Results

        report.append("ACCURACY METRICS:")

        report.append(f"  Total Tests: {accuracy_results.get('total_tests', 0)}")

        report.append(f"  Accuracy: {accuracy_results.get('accuracy', 0):.3f}")

        report.append(f"  Precision: {accuracy_results.get('precision', 0):.3f}")

        report.append(f"  Recall: {accuracy_results.get('recall', 0):.3f}")

        report.append(f"  F1 Score: {accuracy_results.get('f1_score', 0):.3f}")

        report.append(f"  False Positive Rate: {accuracy_results.get('false_positive_rate', 0):.3f}")

        report.append(f"  False Negative Rate: {accuracy_results.get('false_negative_rate', 0):.3f}")

        report.append("")

        

        # Performance Results

        report.append("PERFORMANCE METRICS:")

        if 'error' not in performance_results:

            report.append(f"  Average Processing Time: {performance_results.get('avg_processing_time', 0):.3f}s")

            report.append(f"  Min Processing Time: {performance_results.get('min_processing_time', 0):.3f}s")

            report.append(f"  Max Processing Time: {performance_results.get('max_processing_time', 0):.3f}s")

            report.append(f"  Standard Deviation: {performance_results.get('std_processing_time', 0):.3f}s")

        else:

            report.append(f"  Error: {performance_results['error']}")

        report.append("")

        

        # Security Results

        report.append("SECURITY TEST RESULTS:")

        for test_name, passed in security_results.items():

            status = "PASS" if passed else "FAIL"

            report.append(f"  {test_name}: {status}")

        report.append("")

        

        # Recommendations

        report.append("RECOMMENDATIONS:")

        if accuracy_results.get('false_positive_rate', 0) > 0.05:

            report.append("  - High false positive rate detected. Consider adjusting confidence thresholds.")

        if accuracy_results.get('false_negative_rate', 0) > 0.10:

            report.append("  - High false negative rate detected. Consider enhancing detection patterns.")

        if performance_results.get('avg_processing_time', 0) > 2.0:

            report.append("  - Processing time is high. Consider optimizing LLM inference or prompt length.")

        if not all(security_results.values()):

            report.append("  - Some security tests failed. Review input validation and error handling.")

        

        return "\n".join(report)


def create_sample_test_data():

    """Create sample test data for demonstration"""

    

    spam_examples = [

        {

            'subject': 'URGENT: Claim your $1000 prize NOW!',

            'from': 'winner@fake-lottery.com',

            'body': 'Congratulations! You have won $1000 in our exclusive lottery. Click here immediately to claim your prize before it expires. Act now!'

        },

        {

            'subject': 'Your account will be suspended',

            'from': 'security@fake-bank.com',

            'body': 'Your account has suspicious activity. Please verify your credentials immediately by clicking this link or your account will be suspended within 24 hours.'

        },

        {

            'subject': 'Make money from home - guaranteed!',

            'from': 'opportunity@scam.com',

            'body': 'Work from home and make $5000 per week guaranteed! No experience needed. Send us your personal information to get started today.'

        }

    ]

    

    ham_examples = [

        {

            'subject': 'Meeting reminder for tomorrow',

            'from': 'colleague@company.com',

            'body': 'Hi, just a reminder about our project meeting tomorrow at 2 PM in conference room B. Please bring the latest reports.'

        },

        {

            'subject': 'Your order confirmation #12345',

            'from': 'orders@legitimate-store.com',

            'body': 'Thank you for your order. Your items will be shipped within 2-3 business days. You can track your order using the provided tracking number.'

        },

        {

            'subject': 'Newsletter: Tech industry updates',

            'from': 'newsletter@tech-news.com',

            'body': 'This week in technology: New developments in AI, cybersecurity trends, and startup funding news. Read more in our detailed analysis.'

        }

    ]

    

    # Save sample data to files

    with open('test_spam.json', 'w', encoding='utf-8') as f:  # match the encoding used by load_test_dataset

        json.dump(spam_examples, f, indent=2)

    

    with open('test_ham.json', 'w', encoding='utf-8') as f:  # match the encoding used by load_test_dataset

        json.dump(ham_examples, f, indent=2)

    

    return 'test_spam.json', 'test_ham.json'
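

To make the metric definitions used in run_accuracy_tests concrete, here is a small worked example. It is only a standalone sketch: the helper name metrics_from_counts and the counts passed to it are made up for illustration and are not part of the test suite above.

```python
def metrics_from_counts(tp: int, tn: int, fp: int, fn: int) -> dict:
    """Compute the same metrics as run_accuracy_tests from raw confusion counts."""
    total = tp + tn + fp + fn
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if (precision + recall) > 0 else 0.0)
    return {
        'accuracy': (tp + tn) / total if total > 0 else 0.0,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'false_positive_rate': fp / (fp + tn) if (fp + tn) > 0 else 0.0,
        'false_negative_rate': fn / (fn + tp) if (fn + tp) > 0 else 0.0,
    }


# Hypothetical run: 40 spam caught, 50 ham passed, 2 ham flagged, 8 spam missed
example = metrics_from_counts(tp=40, tn=50, fp=2, fn=8)
for name, value in example.items():
    print(f"{name}: {value:.3f}")
```

With these counts the accuracy is 90/100 = 0.900, while the false negative rate is 8/48, about 0.167. That is above the 0.10 threshold used in generate_test_report, so the report would recommend enhancing detection patterns even though overall accuracy looks healthy.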


Integration Example and Complete System Assembly


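With the individual components in place, we can assemble them into a complete spam detection system. The main() function below initializes each component in dependency order, runs the test suite against the sample data, processes two sample emails, records one piece of simulated user feedback, and then idles until interrupted.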


def main():

    """Main function demonstrating complete system integration"""

    

    # Initialize logging

    logging.basicConfig(

        level=logging.INFO,

        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',

        handlers=[

            logging.FileHandler('spam_detector.log'),

            logging.StreamHandler()

        ]

    )

    

    logger = logging.getLogger(__name__)

    logger.info("Starting LLM-based spam detection system")

    

    try:

        # Initialize configuration

        config = SpamDetectionConfig(

            model_name="llama2:7b",

            confidence_threshold=0.7,

            max_tokens=512,

            temperature=0.1

        )

        

        # Initialize LLM manager

        logger.info("Initializing local LLM manager")

        llm_manager = LocalLLMManager(config)

        if not llm_manager.initialize_model():

            logger.error("Failed to initialize LLM model")

            return False

        

        # Initialize security manager

        logger.info("Initializing security manager")

        security_manager = SecurityManager()

        

        # Initialize learning system

        logger.info("Initializing learning system")

        learning_system = AgenticLearningSystem("spam_learning.db")

        

        # Initialize adaptive prompt manager

        logger.info("Initializing adaptive prompt manager")

        prompt_manager = AdaptivePromptManager(learning_system)

        

        # Initialize secure spam detector

        logger.info("Initializing spam detector")

        spam_detector = SecureSpamDetector(llm_manager, security_manager)

        spam_detector.prompt_manager = prompt_manager  # Add prompt manager

        

        # Initialize performance monitor

        logger.info("Initializing performance monitor")

        performance_monitor = PerformanceMonitor(spam_detector, learning_system)

        performance_monitor.start_monitoring()

        

        # Initialize email processor

        logger.info("Initializing email processor")

        email_processor = EmailProcessor(spam_detector, learning_system)

        email_processor.start_processing(num_workers=3)

        

        # Run system tests

        logger.info("Running system tests")

        test_suite = SpamDetectorTestSuite(spam_detector)

        

        # Create sample test data

        spam_file, ham_file = create_sample_test_data()

        test_data = test_suite.load_test_dataset(spam_file, ham_file)

        

        # Run tests

        accuracy_results = test_suite.run_accuracy_tests(test_data)

        performance_results = test_suite.run_performance_tests(test_data)

        security_results = test_suite.run_security_tests()

        

        # Generate and display test report

        test_report = test_suite.generate_test_report(

            accuracy_results, performance_results, security_results

        )

        logger.info(f"Test Results:\n{test_report}")

        

        # Demonstrate email processing

        logger.info("Demonstrating email processing")

        sample_emails = [

            {

                'subject': 'Important business proposal',

                'from': 'partner@business.com',

                'body': 'We have an exciting business opportunity that could benefit both our companies. Would you be interested in discussing this further?'

            },

            {

                'subject': 'WINNER! Claim your prize now!',

                'from': 'prizes@suspicious.com',

                'body': 'You have won $10,000! Click this link immediately to claim your prize. This offer expires in 24 hours!'

            }

        ]

        

        for email_data in sample_emails:

            result = email_processor.process_email_message(

                f"Subject: {email_data['subject']}\nFrom: {email_data['from']}\n\n{email_data['body']}"

            )

            logger.info(f"Processed email: {result}")

        

        # Simulate user feedback for learning

        logger.info("Simulating user feedback for learning")

        learning_system.collect_feedback(

            email_hash="sample_hash_1",

            original_result={'is_spam': False, 'confidence': 0.3},

            user_correction="actually_spam",

            feedback_type="false_negative"

        )

        

        # Generate performance report

        logger.info("Generating performance report")

        performance_report = performance_monitor.generate_performance_report()

        logger.info(f"Performance Report: {json.dumps(performance_report, indent=2)}")

        

        # Keep system running for demonstration

        logger.info("System is running. Press Ctrl+C to stop.")

        try:

            while True:

                time.sleep(60)

                logger.info("System heartbeat - all components operational")

        except KeyboardInterrupt:

            logger.info("Shutdown signal received")

        

        # Cleanup

        logger.info("Shutting down system components")

        email_processor.stop_processing()

        performance_monitor.stop_monitoring()

        

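        # Generate final dashboard
        # (the dashboard step is cut off in this listing; the lines below only
        # close the try block opened above so that main() is syntactically complete)

        return True

    

    except Exception as e:

        logger.exception(f"Fatal error in spam detection system: {e}")

        return False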



Deployment Considerations and Production Readiness


When deploying this spam detection system in a production environment, several additional considerations must be addressed to ensure reliability, scalability, and maintainability.


The system architecture should be containerized using Docker to ensure consistent deployment across different environments. The local LLM component requires significant computational resources, so proper resource allocation and monitoring are essential. Consider implementing horizontal scaling by distributing the email processing load across multiple instances of the spam detector.
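

One simple way to spread load across several detector instances is to route each message by a hash of its sender. The sketch below is an assumption about how that routing might look, not part of the system above: the function name assign_instance is hypothetical, and it only computes an instance index without performing any network dispatch.

```python
import hashlib


def assign_instance(sender: str, num_instances: int) -> int:
    """Map a sender address to an instance index in [0, num_instances)."""
    if num_instances < 1:
        raise ValueError("num_instances must be at least 1")
    # Normalize so that 'Alice@Example.com' and 'alice@example.com' route together
    normalized = sender.strip().lower().encode('utf-8')
    digest = hashlib.sha256(normalized).digest()
    # Use the first 8 bytes of the digest as an unsigned integer
    return int.from_bytes(digest[:8], 'big') % num_instances


# Hypothetical usage: route two senders across three detector instances
for sender in ['partner@business.com', 'prizes@suspicious.com']:
    print(sender, '-> instance', assign_instance(sender, 3))
```

Hashing the sender keeps all mail from one address on the same instance, which can help with per-sender rate limiting. Note that plain modulo assignment reshuffles most senders whenever the instance count changes; a consistent-hashing scheme would reduce that churn.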


Database management becomes critical in production. The SQLite database used for learning storage should be replaced with a more robust solution like PostgreSQL for better concurrent access handling and data integrity. Implement proper database backup and recovery procedures to prevent loss of learned patterns and feedback data.
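

Until such a migration happens, the SQLite learning database can still be backed up safely while the system is running. The following is a minimal sketch using the online backup API in Python's standard sqlite3 module; the function name backup_sqlite and the backup path in the usage comment are assumptions, while spam_learning.db is the file name passed to AgenticLearningSystem in main().

```python
import sqlite3
from pathlib import Path


def backup_sqlite(source_path: str, backup_path: str) -> None:
    """Copy a SQLite database to backup_path using the online backup API."""
    # Create the destination directory if it does not exist yet
    Path(backup_path).parent.mkdir(parents=True, exist_ok=True)
    source = sqlite3.connect(source_path)
    target = sqlite3.connect(backup_path)
    try:
        # Connection.backup copies the database page by page and tolerates
        # other connections using the source database at the same time
        source.backup(target)
    finally:
        target.close()
        source.close()


# Hypothetical usage (paths are illustrative):
# backup_sqlite("spam_learning.db", "backups/spam_learning_backup.db")
```

Copying the database file directly while it is being written can produce a corrupt snapshot, which is why the sketch goes through the backup API instead. A scheduled job could call this function and rotate old backup files.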


Monitoring and alerting systems should be integrated to track system health, detection accuracy, and performance metrics in real-time. Set up alerts for unusual patterns such as sudden spikes in false positives, system errors, or performance degradation.
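

As an illustration of the false-positive alert mentioned above, the sketch below tracks a rolling window of verified ham messages and reports when the false positive rate exceeds a threshold. The class name FalsePositiveAlert and its parameters are hypothetical; the 0.05 default simply mirrors the threshold used in generate_test_report, and how verified labels reach the monitor is left open.

```python
from collections import deque


class FalsePositiveAlert:
    """Track recent ham verdicts and flag a spike in false positives."""

    def __init__(self, window_size: int = 200, threshold: float = 0.05,
                 min_samples: int = 20):
        # Each entry is True when a legitimate email was flagged as spam
        self.window = deque(maxlen=window_size)
        self.threshold = threshold
        self.min_samples = min_samples

    def record(self, predicted_spam: bool, actual_spam: bool) -> None:
        """Record one verified verdict; only ham messages affect the rate."""
        if not actual_spam:
            self.window.append(predicted_spam)

    def rate(self) -> float:
        """False positive rate over the current window (0.0 if empty)."""
        return sum(self.window) / len(self.window) if self.window else 0.0

    def should_alert(self) -> bool:
        """True once enough samples exist and the rate exceeds the threshold."""
        return (len(self.window) >= self.min_samples
                and self.rate() > self.threshold)
```

The min_samples guard keeps a single early mistake from triggering an alert, and the bounded deque means old verdicts age out so the rate reflects recent behavior rather than the lifetime average.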


The learning system requires careful tuning in production. Implement safeguards to prevent the system from learning incorrect patterns due to malicious feedback or systematic errors. Consider implementing a human-in-the-loop validation process for significant pattern changes.
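

One possible safeguard is to hold each correction until several distinct users have reported the same thing. The sketch below is an assumption about how such a gate could work, not the article's implementation: FeedbackGate, its quorum of three, and the user_id argument are all hypothetical, and the correction label reuses the "actually_spam" string from main().

```python
from collections import defaultdict


class FeedbackGate:
    """Hold user corrections until enough distinct users agree."""

    def __init__(self, min_distinct_users: int = 3):
        self.min_distinct_users = min_distinct_users
        # (email_hash, correction) -> set of user ids that reported it
        self._votes = defaultdict(set)

    def submit(self, email_hash: str, user_id: str, correction: str) -> bool:
        """Record a correction; return True once it has enough independent support."""
        voters = self._votes[(email_hash, correction)]
        voters.add(user_id)  # a set ignores repeated reports from the same user
        return len(voters) >= self.min_distinct_users


# Hypothetical usage: only the third independent report clears the gate
gate = FeedbackGate(min_distinct_users=3)
for user in ['u1', 'u1', 'u2', 'u3']:
    accepted = gate.submit('sample_hash_1', user, 'actually_spam')
    print(user, accepted)
```

A correction that clears the gate could then be passed to learning_system.collect_feedback, while corrections that never reach the quorum could be queued for human review. Because votes are counted per distinct user, one account repeating the same report cannot push a pattern into the learning loop on its own.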


Security hardening should include regular security audits, penetration testing, and compliance with relevant data protection regulations. Implement proper access controls, audit logging, and data retention policies.


Conclusion and Future Enhancements


This implementation shows how to build a self-learning spam detection system using a local LLM and agentic AI principles. It combines the contextual understanding of large language models with adaptive learning mechanisms that let the system evolve as spam patterns change.


The key advantages of this approach include privacy preservation through local processing, contextual analysis that can detect sophisticated social engineering attempts, and continuous learning that adapts to new threats without requiring manual rule updates.


Future enhancements could include integration with threat intelligence feeds, implementation of federated learning to share insights across multiple deployments while preserving privacy, and development of specialized models for different types of email content such as marketing communications or technical discussions.


The system provides a solid foundation for organizations seeking to implement advanced spam detection capabilities while maintaining control over their data and detection logic. The modular architecture allows for easy customization and extension based on specific organizational requirements and threat landscapes.
