Introduction and Core Concepts
Email spam detection has evolved significantly from simple rule-based filters to sophisticated machine learning systems. Traditional approaches rely on static patterns and predefined rules that quickly become obsolete as spammers adapt their techniques. A self-learning LLM-based spam detector represents a paradigm shift toward adaptive intelligence that can evolve with emerging threats.
An agentic AI system differs from conventional machine learning models in its ability to autonomously make decisions, learn from interactions, and adapt its behavior based on feedback. In the context of spam detection, this means the system can identify new spam patterns, update its understanding of legitimate communications, and refine its detection algorithms without explicit reprogramming.
The foundation of our approach rests on using a local Large Language Model that processes email content contextually rather than relying solely on keyword matching or statistical features. This contextual understanding allows the system to detect sophisticated social engineering attempts, phishing campaigns that use legitimate-looking language, and emerging fraud patterns that traditional filters might miss.
System Architecture Overview
Our spam detection system consists of several interconnected components that work together to create an intelligent, self-improving filter. The core architecture includes a local LLM inference engine, a feedback collection mechanism, a learning orchestrator, and an email processing pipeline.
The local LLM serves as the primary decision-making component, analyzing email content for spam indicators while maintaining privacy by keeping all data processing on-premises. The feedback collection mechanism captures user corrections and system performance metrics, feeding this information back into the learning loop. The learning orchestrator manages the continuous improvement process, deciding when and how to update the model based on accumulated feedback.
The email processing pipeline handles the integration with existing email systems, preprocessing incoming messages, and applying the spam detection logic in real-time. This pipeline also manages the quarantine and delivery decisions based on the LLM's confidence scores and learned patterns.
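The sections that follow implement each of these components in turn. As a rough roadmap, the wiring between them looks like this (the class names refer to implementations introduced later in this article, so treat it as a sketch rather than runnable setup code):

config = SpamDetectionConfig()                                    # inference settings for the local model
llm_manager = LocalLLMManager(config)                             # local LLM inference engine
learning_system = AgenticLearningSystem("spam_learning.db")       # feedback store and learning orchestrator
spam_detector = SpamDetector(llm_manager)                         # primary decision-making component
email_processor = EmailProcessor(spam_detector, learning_system)  # email processing pipeline
email_processor.start_processing(num_workers=3)                   # begin handling incoming messages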
Setting Up the Local LLM Environment
Before implementing the spam detection logic, we need to establish a robust local LLM environment. We'll use Ollama as our local LLM runtime because it makes it straightforward to pull, manage, and run quantized open models on local hardware and exposes a simple Python API for inference, which is all this text-classification workload requires.
The following code example demonstrates the initial setup and configuration of our local LLM environment. This setup includes model loading, configuration management, and basic inference testing to ensure the system is ready for spam detection tasks.
import ollama
import json
import logging
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class SpamDetectionConfig:
model_name: str = "llama2:7b"
confidence_threshold: float = 0.7
max_tokens: int = 512
temperature: float = 0.1
learning_rate: float = 0.001
class LocalLLMManager:
def __init__(self, config: SpamDetectionConfig):
self.config = config
self.logger = logging.getLogger(__name__)
self.model_loaded = False
def initialize_model(self):
"""Initialize and verify the local LLM model"""
try:
# Pull the model if not already available
ollama.pull(self.config.model_name)
# Test basic inference capability
test_response = ollama.generate(
model=self.config.model_name,
prompt="Test prompt for model verification",
options={
'num_predict': 50,
'temperature': self.config.temperature
}
)
if test_response and 'response' in test_response:
self.model_loaded = True
self.logger.info(f"Model {self.config.model_name} loaded successfully")
return True
else:
raise Exception("Model test failed")
except Exception as e:
self.logger.error(f"Failed to initialize model: {str(e)}")
return False
def generate_response(self, prompt: str) -> Optional[str]:
"""Generate response from the local LLM"""
if not self.model_loaded:
self.logger.error("Model not loaded")
return None
try:
response = ollama.generate(
model=self.config.model_name,
prompt=prompt,
options={
'num_predict': self.config.max_tokens,
'temperature': self.config.temperature
}
)
return response.get('response', '')
except Exception as e:
self.logger.error(f"Generation failed: {str(e)}")
return None
This code establishes the foundation for our LLM-based system by creating a manager class that handles model initialization and basic inference operations. The SpamDetectionConfig dataclass centralizes all configuration parameters, making it easy to adjust the system's behavior as we learn from its performance. The LocalLLMManager class encapsulates the interaction with Ollama, providing error handling and logging capabilities that will be crucial for monitoring the system's operation in production.
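A minimal usage sketch follows, assuming Ollama is installed and running locally and that the llama2:7b model can be pulled; the prompt text is only an illustration:

# Smoke test for the local LLM environment (illustrative prompt).
config = SpamDetectionConfig(model_name="llama2:7b", temperature=0.1)
llm_manager = LocalLLMManager(config)

if llm_manager.initialize_model():
    answer = llm_manager.generate_response(
        "Reply with SPAM or HAM only: 'You won a free cruise, click now to claim!'"
    )
    print(answer)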
Core Spam Detection Implementation
The heart of our spam detection system lies in crafting effective prompts that guide the LLM to make accurate classification decisions. Unlike traditional machine learning approaches that rely on feature engineering, our LLM-based system uses carefully designed prompts that provide context, examples, and decision criteria.
The following implementation demonstrates how we structure the spam detection logic using prompt engineering techniques. The system analyzes email content by considering multiple factors including sender reputation, content patterns, urgency indicators, and contextual anomalies.
import hashlib
import re
from email.message import EmailMessage
from typing import Tuple, Dict, Any
class SpamDetector:
def __init__(self, llm_manager: LocalLLMManager):
self.llm_manager = llm_manager
self.logger = logging.getLogger(__name__)
self.detection_history = []
def create_detection_prompt(self, email_content: Dict[str, str]) -> str:
"""Create a comprehensive prompt for spam detection"""
base_prompt = """
You are an expert email security analyst tasked with identifying spam and fraudulent emails.
Analyze the following email carefully and determine if it is spam or legitimate.
Consider these factors in your analysis:
- Sender authenticity and reputation indicators
- Content quality and linguistic patterns
- Urgency tactics and pressure techniques
- Suspicious links or attachments
- Social engineering attempts
- Grammar and spelling inconsistencies
- Unusual formatting or structure
Email Details:
Subject: {subject}
From: {sender}
Content: {content}
Provide your analysis in the following JSON format:
{{
"is_spam": true/false,
"confidence": 0.0-1.0,
"reasoning": "detailed explanation of your decision",
"risk_factors": ["list", "of", "identified", "risks"],
"spam_type": "phishing/scam/promotional/malware/legitimate"
}}
Be thorough in your analysis and explain your reasoning clearly.
"""
return base_prompt.format(
subject=email_content.get('subject', 'No Subject'),
sender=email_content.get('from', 'Unknown Sender'),
content=email_content.get('body', 'No Content')[:2000] # Limit content length
)
def analyze_email(self, email_data: Dict[str, str]) -> Dict[str, Any]:
"""Analyze an email for spam indicators"""
# Create detection prompt
prompt = self.create_detection_prompt(email_data)
# Get LLM response
response = self.llm_manager.generate_response(prompt)
if not response:
return self._create_fallback_result()
# Parse JSON response
try:
result = json.loads(self._extract_json_from_response(response))
# Validate and sanitize result
validated_result = self._validate_detection_result(result)
# Store detection for learning
self._record_detection(email_data, validated_result, response)
return validated_result
except (json.JSONDecodeError, KeyError) as e:
self.logger.error(f"Failed to parse LLM response: {str(e)}")
return self._create_fallback_result()
def _extract_json_from_response(self, response: str) -> str:
"""Extract JSON content from LLM response"""
# Look for JSON block in the response
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
return json_match.group(0)
else:
raise ValueError("No JSON found in response")
def _validate_detection_result(self, result: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and sanitize detection result"""
validated = {
'is_spam': bool(result.get('is_spam', False)),
'confidence': max(0.0, min(1.0, float(result.get('confidence', 0.5)))),
'reasoning': str(result.get('reasoning', 'No reasoning provided')),
'risk_factors': result.get('risk_factors', []),
'spam_type': result.get('spam_type', 'unknown'),
'timestamp': datetime.now().isoformat()
}
return validated
def _create_fallback_result(self) -> Dict[str, Any]:
"""Create fallback result when LLM analysis fails"""
return {
'is_spam': False,
'confidence': 0.0,
'reasoning': 'Analysis failed - defaulting to safe classification',
'risk_factors': ['analysis_failure'],
'spam_type': 'unknown',
'timestamp': datetime.now().isoformat()
}
def _record_detection(self, email_data: Dict[str, str],
result: Dict[str, Any], raw_response: str):
"""Record detection for learning purposes"""
detection_record = {
'email_hash': hashlib.sha256(email_data.get('body', '').encode('utf-8')).hexdigest(),  # stable across runs, unlike built-in hash()
'result': result,
'raw_response': raw_response,
'timestamp': datetime.now().isoformat()
}
self.detection_history.append(detection_record)
This implementation demonstrates how we leverage the LLM's natural language understanding capabilities to perform sophisticated spam detection. The create_detection_prompt method constructs a detailed prompt that guides the LLM through a systematic analysis process, considering multiple spam indicators that traditional filters might miss.
The analyze_email method orchestrates the entire detection process, from prompt creation to result validation. It includes robust error handling to ensure the system remains operational even when the LLM produces unexpected responses. The JSON-based response format allows us to extract structured information from the LLM's analysis, including confidence scores and detailed reasoning that can be used for further learning.
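As a quick end-to-end check, assuming the LocalLLMManager from the previous section has been initialized, the detector can be exercised like this (the sample email is fabricated for illustration):

# Illustrative single-email check; llm_manager comes from the previous section.
detector = SpamDetector(llm_manager)
sample = {
    'subject': 'Your account will be suspended',
    'from': 'security@example-bank.test',
    'body': 'Verify your credentials immediately or your account will be closed within 24 hours.'
}
result = detector.analyze_email(sample)
print(result['is_spam'], result['confidence'], result['spam_type'])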
Implementing the Agentic Learning Mechanism
The agentic aspect of our spam detector lies in its ability to learn from user feedback and adapt its detection strategies over time. This learning mechanism goes beyond simple retraining by implementing a continuous feedback loop that allows the system to evolve its understanding of spam patterns.
The learning system collects feedback from multiple sources including user corrections, false positive reports, and missed spam notifications. This feedback is then processed to identify patterns in the system's mistakes and generate improved prompts or detection strategies.
from collections import defaultdict, deque
import sqlite3
from typing import List, Tuple
import numpy as np
class AgenticLearningSystem:
def __init__(self, db_path: str = "spam_learning.db"):
self.db_path = db_path
self.feedback_buffer = deque(maxlen=1000)
self.learning_patterns = defaultdict(list)
self.adaptation_threshold = 10 # Minimum feedback items before adaptation
self.initialize_database()
def initialize_database(self):
"""Initialize SQLite database for persistent learning storage"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS feedback (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email_hash TEXT,
original_prediction TEXT,
user_correction TEXT,
feedback_type TEXT,
timestamp TEXT,
email_features TEXT
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS learning_patterns (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pattern_type TEXT,
pattern_data TEXT,
effectiveness_score REAL,
usage_count INTEGER,
last_updated TEXT
)
''')
conn.commit()
conn.close()
def collect_feedback(self, email_hash: str, original_result: Dict[str, Any],
user_correction: str, feedback_type: str):
"""Collect user feedback for learning"""
feedback_entry = {
'email_hash': email_hash,
'original_prediction': json.dumps(original_result),
'user_correction': user_correction,
'feedback_type': feedback_type,
'timestamp': datetime.now().isoformat()
}
# Add to buffer for immediate processing
self.feedback_buffer.append(feedback_entry)
# Store in database for persistence
self._store_feedback_in_db(feedback_entry)
# Trigger learning if we have enough feedback
if len(self.feedback_buffer) >= self.adaptation_threshold:
self.trigger_learning_cycle()
def _store_feedback_in_db(self, feedback: Dict[str, Any]):
"""Store feedback in persistent database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO feedback (email_hash, original_prediction, user_correction,
feedback_type, timestamp, email_features)
VALUES (?, ?, ?, ?, ?, ?)
''', (
feedback['email_hash'],
feedback['original_prediction'],
feedback['user_correction'],
feedback['feedback_type'],
feedback['timestamp'],
json.dumps({}) # Placeholder for email features
))
conn.commit()
conn.close()
def trigger_learning_cycle(self):
"""Trigger a learning cycle to adapt detection strategies"""
# Analyze recent feedback patterns
false_positive_patterns = self._analyze_false_positives()
false_negative_patterns = self._analyze_false_negatives()
# Generate improved detection strategies
new_strategies = self._generate_adaptive_strategies(
false_positive_patterns, false_negative_patterns
)
# Update system prompts and detection logic
self._update_detection_strategies(new_strategies)
# Clear processed feedback from buffer
self.feedback_buffer.clear()
def _analyze_false_positives(self) -> List[Dict[str, Any]]:
"""Analyze patterns in false positive detections"""
false_positives = [
fb for fb in self.feedback_buffer
if fb['feedback_type'] == 'false_positive'
]
patterns = []
for fp in false_positives:
original_pred = json.loads(fp['original_prediction'])
# Extract common characteristics of false positives
pattern = {
'spam_type': original_pred.get('spam_type'),
'confidence_range': self._get_confidence_range(original_pred['confidence']),
'common_risk_factors': original_pred.get('risk_factors', []),
'correction_type': fp['user_correction']
}
patterns.append(pattern)
return self._cluster_similar_patterns(patterns)
def _analyze_false_negatives(self) -> List[Dict[str, Any]]:
"""Analyze patterns in missed spam (false negatives)"""
false_negatives = [
fb for fb in self.feedback_buffer
if fb['feedback_type'] == 'false_negative'
]
patterns = []
for fn in false_negatives:
original_pred = json.loads(fn['original_prediction'])
# Identify what the system missed
pattern = {
'missed_indicators': self._extract_missed_indicators(fn),
'actual_spam_type': fn['user_correction'],
'original_confidence': original_pred['confidence'],
'failed_detection_reasoning': original_pred.get('reasoning')
}
patterns.append(pattern)
return self._cluster_similar_patterns(patterns)
def _generate_adaptive_strategies(self, fp_patterns: List[Dict],
fn_patterns: List[Dict]) -> Dict[str, Any]:
"""Generate new detection strategies based on learned patterns"""
strategies = {
'prompt_adjustments': [],
'confidence_calibration': {},
'new_risk_factors': [],
'refined_classifications': []
}
# Generate prompt adjustments for false positives
for pattern in fp_patterns:
if pattern['frequency'] > 3: # Only adjust for recurring patterns
adjustment = {
'type': 'reduce_sensitivity',
'condition': pattern['common_characteristics'],
'adjustment': f"Be more cautious when classifying emails with {pattern['common_characteristics']} as spam"
}
strategies['prompt_adjustments'].append(adjustment)
# Generate enhanced detection for false negatives
for pattern in fn_patterns:
if pattern['frequency'] > 2:
enhancement = {
'type': 'increase_sensitivity',
'indicators': pattern['missed_indicators'],
'adjustment': f"Pay special attention to {pattern['missed_indicators']} which may indicate {pattern['actual_spam_type']}"
}
strategies['prompt_adjustments'].append(enhancement)
return strategies
def _update_detection_strategies(self, strategies: Dict[str, Any]):
"""Update the spam detector with new strategies"""
# Store new strategies in database
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
for strategy in strategies['prompt_adjustments']:
cursor.execute('''
INSERT INTO learning_patterns (pattern_type, pattern_data,
effectiveness_score, usage_count, last_updated)
VALUES (?, ?, ?, ?, ?)
''', (
strategy['type'],
json.dumps(strategy),
0.0, # Initial effectiveness score
0, # Initial usage count
datetime.now().isoformat()
))
conn.commit()
conn.close()
# Log learning activity
logging.info(f"Applied {len(strategies['prompt_adjustments'])} new detection strategies")
This learning system implements a sophisticated feedback loop that allows the spam detector to evolve its detection capabilities based on real-world performance. The system collects feedback from users when they correct false positives or report missed spam, then analyzes these corrections to identify systematic patterns in the detector's mistakes.
The learning mechanism operates on multiple levels. At the immediate level, it adjusts confidence thresholds and detection sensitivity based on recent feedback. At a deeper level, it modifies the prompts used for detection to incorporate new understanding about spam patterns and legitimate email characteristics.
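The class above calls three helpers that are not shown: _get_confidence_range, _cluster_similar_patterns, and _extract_missed_indicators. What follows is only a minimal sketch of how they might be implemented as methods of AgenticLearningSystem; in particular, the clustering simply groups identical pattern dictionaries and annotates each group with the 'frequency' and 'common_characteristics' fields that _generate_adaptive_strategies expects.

# Minimal sketches of the helpers AgenticLearningSystem relies on (methods of that class).
def _get_confidence_range(self, confidence: float) -> str:
    """Bucket a confidence score into a coarse range label."""
    if confidence < 0.4:
        return 'low'
    if confidence < 0.7:
        return 'medium'
    return 'high'

def _extract_missed_indicators(self, feedback: Dict[str, Any]) -> List[str]:
    """Derive indicators the detector missed; here we simply reuse the user's correction text."""
    return [feedback.get('user_correction', 'unspecified')]

def _cluster_similar_patterns(self, patterns: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Group identical pattern dictionaries and annotate each group with a frequency count."""
    clusters: Dict[str, Dict[str, Any]] = {}
    for pattern in patterns:
        key = json.dumps(pattern, sort_keys=True, default=str)
        if key not in clusters:
            clusters[key] = {
                **pattern,
                'frequency': 0,
                'common_characteristics': pattern.get('spam_type')
                    or ', '.join(map(str, pattern.get('common_risk_factors', [])))
                    or 'similar content',
            }
        clusters[key]['frequency'] += 1
    return list(clusters.values())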
Enhanced Prompt Engineering with Learning Integration
As the system learns from feedback, it needs to dynamically update its detection prompts to incorporate new knowledge. This requires a sophisticated prompt management system that can integrate learned patterns while maintaining the coherence and effectiveness of the original detection logic.
class AdaptivePromptManager:
def __init__(self, learning_system: AgenticLearningSystem):
self.learning_system = learning_system
self.base_prompt_template = self._load_base_template()
self.active_adaptations = []
def _load_base_template(self) -> str:
"""Load the base prompt template for spam detection"""
return """
You are an expert email security analyst with extensive experience in identifying spam and fraudulent emails.
Your analysis should be thorough, considering both obvious and subtle indicators of malicious intent.
{adaptive_instructions}
Analyze the following email and determine if it is spam or legitimate:
Email Details:
Subject: {subject}
From: {sender}
Content: {content}
{learned_patterns}
Consider these factors in your analysis:
- Sender authenticity and reputation indicators
- Content quality and linguistic patterns
- Urgency tactics and pressure techniques
- Suspicious links or attachments
- Social engineering attempts
- Grammar and spelling inconsistencies
- Unusual formatting or structure
{specific_guidance}
Provide your analysis in JSON format:
{{
"is_spam": true/false,
"confidence": 0.0-1.0,
"reasoning": "detailed explanation",
"risk_factors": ["identified", "risks"],
"spam_type": "classification"
}}
"""
def generate_adaptive_prompt(self, email_data: Dict[str, str]) -> str:
"""Generate a prompt that incorporates learned adaptations"""
# Load current learning patterns
learned_patterns = self._get_relevant_patterns(email_data)
# Generate adaptive instructions
adaptive_instructions = self._create_adaptive_instructions(learned_patterns)
# Create specific guidance based on email characteristics
specific_guidance = self._generate_specific_guidance(email_data, learned_patterns)
# Format the complete prompt
return self.base_prompt_template.format(
adaptive_instructions=adaptive_instructions,
subject=email_data.get('subject', 'No Subject'),
sender=email_data.get('from', 'Unknown Sender'),
content=email_data.get('body', 'No Content')[:2000],
learned_patterns=self._format_learned_patterns(learned_patterns),
specific_guidance=specific_guidance
)
def _get_relevant_patterns(self, email_data: Dict[str, str]) -> List[Dict[str, Any]]:
"""Retrieve learning patterns relevant to the current email"""
conn = sqlite3.connect(self.learning_system.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT pattern_type, pattern_data, effectiveness_score, usage_count
FROM learning_patterns
WHERE effectiveness_score > 0.3
ORDER BY effectiveness_score DESC, usage_count DESC
LIMIT 10
''')
patterns = []
for row in cursor.fetchall():
pattern = {
'type': row[0],
'data': json.loads(row[1]),
'effectiveness': row[2],
'usage_count': row[3]
}
patterns.append(pattern)
conn.close()
return patterns
def _create_adaptive_instructions(self, patterns: List[Dict[str, Any]]) -> str:
"""Create adaptive instructions based on learned patterns"""
if not patterns:
return "Apply your standard spam detection methodology."
instructions = ["Based on recent learning, pay special attention to:"]
for pattern in patterns[:5]: # Use top 5 most effective patterns
if pattern['type'] == 'reduce_sensitivity':
instructions.append(f"- Avoid over-classifying emails with {pattern['data'].get('condition', 'certain characteristics')} as spam")
elif pattern['type'] == 'increase_sensitivity':
instructions.append(f"- Be more vigilant for {pattern['data'].get('indicators', 'specific indicators')} which may indicate spam")
return "\n".join(instructions)
def _generate_specific_guidance(self, email_data: Dict[str, str],
patterns: List[Dict[str, Any]]) -> str:
"""Generate specific guidance based on email content and learned patterns"""
guidance_items = []
# Check for patterns that match current email characteristics
subject = email_data.get('subject', '').lower()
content = email_data.get('body', '').lower()
sender = email_data.get('from', '').lower()
# Apply learned pattern matching
for pattern in patterns:
if self._pattern_matches_email(pattern, email_data):
if pattern['type'] == 'reduce_sensitivity':
guidance_items.append(f"Note: Similar emails have been incorrectly classified as spam in the past. Exercise caution.")
elif pattern['type'] == 'increase_sensitivity':
guidance_items.append(f"Alert: This email shows characteristics similar to previously missed spam.")
# Add contextual guidance based on email features
if 'urgent' in subject or 'immediate' in content:
guidance_items.append("Urgency language detected - verify legitimacy carefully.")
if len(re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', content)) > 3:
guidance_items.append("Multiple links detected - assess for suspicious destinations.")
return "\n".join(guidance_items) if guidance_items else "Apply standard analysis procedures."
def _pattern_matches_email(self, pattern: Dict[str, Any],
email_data: Dict[str, str]) -> bool:
"""Check if a learned pattern matches the current email"""
pattern_data = pattern['data']
# Simple pattern matching - can be enhanced with more sophisticated logic
if 'condition' in pattern_data:
condition = pattern_data['condition'].lower()
email_text = (email_data.get('subject', '') + ' ' +
email_data.get('body', '')).lower()
return condition in email_text
if 'indicators' in pattern_data:
indicators = pattern_data['indicators']
email_text = (email_data.get('subject', '') + ' ' +
email_data.get('body', '')).lower()
return any(indicator.lower() in email_text for indicator in indicators)
return False
def _format_learned_patterns(self, patterns: List[Dict[str, Any]]) -> str:
"""Format learned patterns for inclusion in prompt"""
if not patterns:
return ""
formatted = ["Recent learning insights:"]
for pattern in patterns[:3]: # Include top 3 patterns
effectiveness = pattern['effectiveness']
pattern_desc = self._describe_pattern(pattern)
formatted.append(f"- {pattern_desc} (effectiveness: {effectiveness:.2f})")
return "\n".join(formatted)
def _describe_pattern(self, pattern: Dict[str, Any]) -> str:
"""Create human-readable description of a learned pattern"""
if pattern['type'] == 'reduce_sensitivity':
return f"Avoid false positives for emails with {pattern['data'].get('condition', 'certain characteristics')}"
elif pattern['type'] == 'increase_sensitivity':
return f"Watch for {pattern['data'].get('indicators', 'specific indicators')} indicating potential spam"
else:
return f"Pattern type: {pattern['type']}"
This adaptive prompt management system represents the core of the agentic learning capability. It dynamically modifies the detection prompts based on accumulated learning, ensuring that the system becomes more accurate over time. The system maintains a balance between incorporating new knowledge and preserving the fundamental detection logic that works well.
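One way to wire this into the detection flow, assuming the llm_manager and learning_system objects from the earlier sections, is to generate the adaptive prompt and feed it to the LLM directly; a minimal sketch:

# Illustrative use of the adaptive prompt in place of the static detection prompt.
prompt_manager = AdaptivePromptManager(learning_system)
email_data = {
    'subject': 'Invoice attached',
    'from': 'billing@vendor.example',
    'body': 'Please find attached the invoice for last month.'
}
adaptive_prompt = prompt_manager.generate_adaptive_prompt(email_data)
response = llm_manager.generate_response(adaptive_prompt)
print(response)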
Email Integration and Processing Pipeline
To make our spam detector practical for real-world use, we need to integrate it with existing email systems. This requires building a processing pipeline that can handle various email formats, manage the flow of messages through the detection system, and implement appropriate actions based on detection results.
import email
import imaplib
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import threading
import queue
import time
class EmailProcessor:
def __init__(self, spam_detector: SpamDetector,
learning_system: AgenticLearningSystem):
self.spam_detector = spam_detector
self.learning_system = learning_system
self.processing_queue = queue.Queue()
self.results_queue = queue.Queue()
self.is_running = False
self.worker_threads = []
def start_processing(self, num_workers: int = 3):
"""Start the email processing pipeline"""
self.is_running = True
# Start worker threads
for i in range(num_workers):
worker = threading.Thread(target=self._process_emails_worker,
name=f"EmailWorker-{i}")
worker.daemon = True
worker.start()
self.worker_threads.append(worker)
# Start result handler
result_handler = threading.Thread(target=self._handle_results)
result_handler.daemon = True
result_handler.start()
logging.info(f"Email processing started with {num_workers} workers")
def stop_processing(self):
"""Stop the email processing pipeline"""
self.is_running = False
# Wait for workers to finish
for worker in self.worker_threads:
worker.join(timeout=5)
logging.info("Email processing stopped")
def process_email_message(self, raw_email: str) -> Dict[str, Any]:
"""Process a single email message"""
try:
# Parse email message
email_msg = email.message_from_string(raw_email)
email_data = self._extract_email_data(email_msg)
# Add to processing queue
processing_item = {
'email_data': email_data,
'raw_message': raw_email,
'timestamp': datetime.now().isoformat(),
'message_id': email_data.get('message_id', 'unknown')
}
self.processing_queue.put(processing_item)
return {'status': 'queued', 'message_id': processing_item['message_id']}
except Exception as e:
logging.error(f"Failed to process email: {str(e)}")
return {'status': 'error', 'error': str(e)}
def _extract_email_data(self, email_msg: email.message.EmailMessage) -> Dict[str, str]:
"""Extract relevant data from email message"""
# Get basic headers
email_data = {
'subject': email_msg.get('Subject', ''),
'from': email_msg.get('From', ''),
'to': email_msg.get('To', ''),
'date': email_msg.get('Date', ''),
'message_id': email_msg.get('Message-ID', ''),
}
# Extract body content
body_parts = []
if email_msg.is_multipart():
for part in email_msg.walk():
if part.get_content_type() == "text/plain":
body_parts.append(part.get_payload(decode=True).decode('utf-8', errors='ignore'))
elif part.get_content_type() == "text/html":
# Simple HTML to text conversion
html_content = part.get_payload(decode=True).decode('utf-8', errors='ignore')
text_content = self._html_to_text(html_content)
body_parts.append(text_content)
else:
if email_msg.get_content_type() == "text/plain":
body_parts.append(email_msg.get_payload(decode=True).decode('utf-8', errors='ignore'))
email_data['body'] = '\n'.join(body_parts)
# Extract additional metadata
email_data['attachments'] = self._get_attachment_info(email_msg)
email_data['headers'] = dict(email_msg.items())
return email_data
def _html_to_text(self, html_content: str) -> str:
"""Convert HTML content to plain text"""
# Simple HTML tag removal - can be enhanced with proper HTML parsing
import re
text = re.sub(r'<[^>]+>', '', html_content)
text = re.sub(r'\s+', ' ', text)
return text.strip()
def _get_attachment_info(self, email_msg: email.message.EmailMessage) -> List[Dict[str, str]]:
"""Extract attachment information"""
attachments = []
if email_msg.is_multipart():
for part in email_msg.walk():
if part.get_content_disposition() == 'attachment':
filename = part.get_filename()
if filename:
attachments.append({
'filename': filename,
'content_type': part.get_content_type(),
'size': len(part.get_payload(decode=True)) if part.get_payload() else 0
})
return attachments
def _process_emails_worker(self):
"""Worker thread for processing emails"""
while self.is_running:
try:
# Get email from queue with timeout
processing_item = self.processing_queue.get(timeout=1)
# Perform spam detection
detection_result = self.spam_detector.analyze_email(
processing_item['email_data']
)
# Create processing result
result = {
'message_id': processing_item['message_id'],
'detection_result': detection_result,
'email_data': processing_item['email_data'],
'processing_time': datetime.now().isoformat(),
'action_required': self._determine_action(detection_result)
}
# Add to results queue
self.results_queue.put(result)
# Mark task as done
self.processing_queue.task_done()
except queue.Empty:
continue
except Exception as e:
logging.error(f"Worker error: {str(e)}")
continue
def _determine_action(self, detection_result: Dict[str, Any]) -> str:
"""Determine what action to take based on detection result"""
is_spam = detection_result.get('is_spam', False)
confidence = detection_result.get('confidence', 0.0)
if is_spam and confidence > 0.8:
return 'quarantine'
elif is_spam and confidence > 0.6:
return 'flag_suspicious'
elif is_spam and confidence > 0.4:
return 'mark_potential_spam'
else:
return 'deliver_normal'
def _handle_results(self):
"""Handle processing results"""
while self.is_running:
try:
result = self.results_queue.get(timeout=1)
# Log result
logging.info(f"Processed email {result['message_id']}: "
f"spam={result['detection_result']['is_spam']}, "
f"confidence={result['detection_result']['confidence']:.2f}, "
f"action={result['action_required']}")
# Take appropriate action
self._execute_action(result)
# Mark task as done
self.results_queue.task_done()
except queue.Empty:
continue
except Exception as e:
logging.error(f"Result handler error: {str(e)}")
continue
def _execute_action(self, result: Dict[str, Any]):
"""Execute the determined action for an email"""
action = result['action_required']
email_data = result['email_data']
detection_result = result['detection_result']
if action == 'quarantine':
self._quarantine_email(email_data, detection_result)
elif action == 'flag_suspicious':
self._flag_suspicious_email(email_data, detection_result)
elif action == 'mark_potential_spam':
self._mark_potential_spam(email_data, detection_result)
else:
self._deliver_normal(email_data)
def _quarantine_email(self, email_data: Dict[str, str],
detection_result: Dict[str, Any]):
"""Quarantine a spam email"""
# Implementation would depend on email system integration
logging.info(f"Quarantined email: {email_data.get('subject', 'No Subject')}")
# Optionally notify administrator
self._notify_admin_quarantine(email_data, detection_result)
def _flag_suspicious_email(self, email_data: Dict[str, str],
detection_result: Dict[str, Any]):
"""Flag email as suspicious but allow delivery"""
logging.info(f"Flagged suspicious email: {email_data.get('subject', 'No Subject')}")
# Add warning header or modify subject
# Implementation depends on email system
def _mark_potential_spam(self, email_data: Dict[str, str],
detection_result: Dict[str, Any]):
"""Mark email as potential spam"""
logging.info(f"Marked potential spam: {email_data.get('subject', 'No Subject')}")
# Add spam score header or move to spam folder
# Implementation depends on email system
def _deliver_normal(self, email_data: Dict[str, str]):
"""Deliver email normally"""
logging.debug(f"Delivered normally: {email_data.get('subject', 'No Subject')}")
def _notify_admin_quarantine(self, email_data: Dict[str, str],
detection_result: Dict[str, Any]):
"""Notify administrator of quarantined email"""
# Implementation would send notification to admin
pass
This email processing pipeline provides a robust foundation for integrating the spam detector with real email systems. The multi-threaded architecture ensures that email processing doesn't become a bottleneck, while the queue-based design allows for scalable handling of high email volumes.
The system implements different action levels based on confidence scores, allowing for nuanced handling of emails that fall into gray areas. This approach reduces the risk of false positives affecting important communications while still providing protection against obvious spam.
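A minimal driver for the pipeline, assuming the spam_detector and learning_system objects from the earlier sections, might look like this; the message itself is fabricated for illustration:

# Feed one raw RFC 822 message through the pipeline, then shut down cleanly.
processor = EmailProcessor(spam_detector, learning_system)
processor.start_processing(num_workers=2)

raw_email = (
    "Subject: Claim your prize now!\n"
    "From: prizes@suspicious.example\n"
    "Message-ID: <demo-001@local>\n"
    "\n"
    "You have won $10,000! Click this link immediately to claim your prize."
)
status = processor.process_email_message(raw_email)
print(status)  # e.g. {'status': 'queued', 'message_id': '<demo-001@local>'}

time.sleep(5)  # give the worker threads a moment to process the queue
processor.stop_processing()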
Performance Monitoring and Optimization
A production spam detection system requires comprehensive monitoring to ensure optimal performance and to identify areas for improvement. The monitoring system tracks detection accuracy, processing speed, resource utilization, and learning effectiveness.
import json
import sqlite3
import threading
import time
from collections import defaultdict
from typing import Dict, List, Any

import matplotlib.pyplot as plt
import numpy as np
import psutil
class PerformanceMonitor:
def __init__(self, spam_detector: SpamDetector,
learning_system: AgenticLearningSystem):
self.spam_detector = spam_detector
self.learning_system = learning_system
self.metrics = defaultdict(list)
self.start_time = time.time()
self.monitoring_active = False
def start_monitoring(self):
"""Start performance monitoring"""
self.monitoring_active = True
self.start_time = time.time()
# Start monitoring thread
monitor_thread = threading.Thread(target=self._monitoring_loop)
monitor_thread.daemon = True
monitor_thread.start()
logging.info("Performance monitoring started")
def stop_monitoring(self):
"""Stop performance monitoring"""
self.monitoring_active = False
logging.info("Performance monitoring stopped")
def record_detection_metrics(self, email_data: Dict[str, str],
detection_result: Dict[str, Any],
processing_time: float):
"""Record metrics for a single detection"""
timestamp = time.time()
# Basic detection metrics
self.metrics['detection_count'].append(timestamp)
self.metrics['processing_time'].append(processing_time)
self.metrics['confidence_scores'].append(detection_result.get('confidence', 0.0))
self.metrics['spam_detections'].append(1 if detection_result.get('is_spam') else 0)
# Content analysis metrics
content_length = len(email_data.get('body', ''))
self.metrics['content_length'].append(content_length)
# Risk factor analysis
risk_factors = detection_result.get('risk_factors', [])
self.metrics['risk_factor_count'].append(len(risk_factors))
# Update running statistics
self._update_running_stats(timestamp, processing_time, detection_result)
def record_feedback_metrics(self, feedback_type: str, correction_accuracy: float):
"""Record metrics for user feedback"""
timestamp = time.time()
self.metrics['feedback_events'].append(timestamp)
self.metrics['feedback_types'].append(feedback_type)
self.metrics['correction_accuracy'].append(correction_accuracy)
def _monitoring_loop(self):
"""Main monitoring loop for system metrics"""
while self.monitoring_active:
try:
# Collect system metrics
cpu_usage = psutil.cpu_percent(interval=1)
memory_usage = psutil.virtual_memory().percent
timestamp = time.time()
self.metrics['cpu_usage'].append((timestamp, cpu_usage))
self.metrics['memory_usage'].append((timestamp, memory_usage))
# Collect queue metrics if a processing queue has been attached to the detector
# (the queue normally lives on EmailProcessor, so attach it explicitly to monitor it)
if hasattr(self.spam_detector, 'processing_queue'):
queue_size = self.spam_detector.processing_queue.qsize()
self.metrics['queue_size'].append((timestamp, queue_size))
time.sleep(60) # Monitor every minute
except Exception as e:
logging.error(f"Monitoring error: {str(e)}")
time.sleep(60)
def _update_running_stats(self, timestamp: float, processing_time: float,
detection_result: Dict[str, Any]):
"""Update running statistics"""
# Calculate recent performance metrics
recent_window = 3600 # 1 hour window
cutoff_time = timestamp - recent_window
# Filter recent detections
recent_times = [t for t in self.metrics['detection_count'] if t > cutoff_time]
recent_processing_times = [
pt for i, pt in enumerate(self.metrics['processing_time'])
if i < len(recent_times) and self.metrics['detection_count'][i] > cutoff_time
]
if recent_processing_times:
avg_processing_time = sum(recent_processing_times) / len(recent_processing_times)
self.metrics['avg_processing_time_1h'] = avg_processing_time
# Calculate detection rate
if recent_times:
detection_rate = len(recent_times) / (recent_window / 60) # per minute
self.metrics['detection_rate_1h'] = detection_rate
def generate_performance_report(self) -> Dict[str, Any]:
"""Generate comprehensive performance report"""
current_time = time.time()
uptime = current_time - self.start_time
report = {
'system_info': {
'uptime_hours': uptime / 3600,
'total_detections': len(self.metrics['detection_count']),
'total_feedback_events': len(self.metrics['feedback_events'])
},
'performance_metrics': self._calculate_performance_metrics(),
'accuracy_metrics': self._calculate_accuracy_metrics(),
'resource_usage': self._calculate_resource_metrics(),
'learning_metrics': self._calculate_learning_metrics()
}
return report
def _calculate_performance_metrics(self) -> Dict[str, float]:
"""Calculate performance-related metrics"""
processing_times = self.metrics['processing_time']
if not processing_times:
return {}
return {
'avg_processing_time': sum(processing_times) / len(processing_times),
'max_processing_time': max(processing_times),
'min_processing_time': min(processing_times),
'processing_time_std': np.std(processing_times) if len(processing_times) > 1 else 0,
'detection_rate_per_minute': self.metrics.get('detection_rate_1h', 0)
}
def _calculate_accuracy_metrics(self) -> Dict[str, float]:
"""Calculate accuracy-related metrics"""
feedback_types = self.metrics['feedback_types']
if not feedback_types:
return {'feedback_available': False}
false_positives = feedback_types.count('false_positive')
false_negatives = feedback_types.count('false_negative')
correct_detections = feedback_types.count('correct')
total_feedback = len(feedback_types)
return {
'feedback_available': True,
'false_positive_rate': false_positives / total_feedback if total_feedback > 0 else 0,
'false_negative_rate': false_negatives / total_feedback if total_feedback > 0 else 0,
'accuracy_rate': correct_detections / total_feedback if total_feedback > 0 else 0,
'total_feedback_events': total_feedback
}
def _calculate_resource_metrics(self) -> Dict[str, float]:
"""Calculate resource usage metrics"""
cpu_data = [usage for _, usage in self.metrics['cpu_usage']]
memory_data = [usage for _, usage in self.metrics['memory_usage']]
if not cpu_data:
return {}
return {
'avg_cpu_usage': sum(cpu_data) / len(cpu_data),
'max_cpu_usage': max(cpu_data),
'avg_memory_usage': sum(memory_data) / len(memory_data),
'max_memory_usage': max(memory_data)
}
def _calculate_learning_metrics(self) -> Dict[str, Any]:
"""Calculate learning system metrics"""
# Get learning patterns from database
conn = sqlite3.connect(self.learning_system.db_path)
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM learning_patterns')
total_patterns = cursor.fetchone()[0]
cursor.execute('SELECT AVG(effectiveness_score) FROM learning_patterns WHERE effectiveness_score > 0')
avg_effectiveness = cursor.fetchone()[0] or 0
cursor.execute('SELECT COUNT(*) FROM feedback')
total_feedback = cursor.fetchone()[0]
conn.close()
return {
'total_learning_patterns': total_patterns,
'average_pattern_effectiveness': avg_effectiveness,
'total_feedback_collected': total_feedback,
'learning_active': len(self.learning_system.feedback_buffer) > 0
}
def create_performance_dashboard(self, output_file: str = "performance_dashboard.png"):
"""Create visual performance dashboard"""
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('Spam Detector Performance Dashboard', fontsize=16)
# Processing time trend
if self.metrics['processing_time']:
axes[0, 0].plot(self.metrics['processing_time'])
axes[0, 0].set_title('Processing Time Trend')
axes[0, 0].set_ylabel('Time (seconds)')
axes[0, 0].set_xlabel('Detection Number')
# Confidence score distribution
if self.metrics['confidence_scores']:
axes[0, 1].hist(self.metrics['confidence_scores'], bins=20, alpha=0.7)
axes[0, 1].set_title('Confidence Score Distribution')
axes[0, 1].set_xlabel('Confidence Score')
axes[0, 1].set_ylabel('Frequency')
# CPU usage over time
if self.metrics['cpu_usage']:
times, cpu_values = zip(*self.metrics['cpu_usage'])
axes[1, 0].plot(times, cpu_values)
axes[1, 0].set_title('CPU Usage Over Time')
axes[1, 0].set_ylabel('CPU Usage (%)')
axes[1, 0].set_xlabel('Time')
# Spam detection rate
if self.metrics['spam_detections']:
spam_rate = sum(self.metrics['spam_detections']) / len(self.metrics['spam_detections'])
axes[1, 1].bar(['Spam', 'Ham'], [spam_rate, 1 - spam_rate])
axes[1, 1].set_title('Spam vs Ham Detection Rate')
axes[1, 1].set_ylabel('Proportion')
plt.tight_layout()
plt.savefig(output_file, dpi=300, bbox_inches='tight')
plt.close()
logging.info(f"Performance dashboard saved to {output_file}")
This monitoring system provides comprehensive insights into the spam detector's performance, enabling administrators to identify bottlenecks, track accuracy improvements, and optimize resource usage. The visual dashboard helps in quickly understanding system health and performance trends.
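In practice the monitor is driven from the processing code by timing each detection and recording it; a minimal sketch, where sample_email is any email dictionary of the shape used in earlier sections:

# Record one timed detection and produce a report (sample_email is assumed).
monitor = PerformanceMonitor(spam_detector, learning_system)
monitor.start_monitoring()

start = time.time()
detection = spam_detector.analyze_email(sample_email)
monitor.record_detection_metrics(sample_email, detection, time.time() - start)

report = monitor.generate_performance_report()
print(json.dumps(report, indent=2, default=str))
monitor.create_performance_dashboard("performance_dashboard.png")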
Security Considerations and Hardening
Security is paramount in a spam detection system, as it processes sensitive email content and makes decisions that affect communication flow. The system must be hardened against various attack vectors while maintaining its effectiveness.
import hashlib
import hmac
import secrets
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
class SecurityManager:
def __init__(self, master_key: str = None):
self.master_key = master_key or self._generate_master_key()
self.encryption_key = self._derive_encryption_key()
self.cipher_suite = Fernet(self.encryption_key)
self.rate_limiter = {}
def _generate_master_key(self) -> str:
"""Generate a secure master key"""
return secrets.token_urlsafe(32)
def _derive_encryption_key(self) -> bytes:
"""Derive encryption key from master key"""
password = self.master_key.encode()
salt = b'spam_detector_salt' # In production, use random salt
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(password))
return key
def encrypt_sensitive_data(self, data: str) -> str:
"""Encrypt sensitive data"""
try:
encrypted_data = self.cipher_suite.encrypt(data.encode())
return base64.urlsafe_b64encode(encrypted_data).decode()
except Exception as e:
logging.error(f"Encryption failed: {str(e)}")
return ""
def decrypt_sensitive_data(self, encrypted_data: str) -> str:
"""Decrypt sensitive data"""
try:
decoded_data = base64.urlsafe_b64decode(encrypted_data.encode())
decrypted_data = self.cipher_suite.decrypt(decoded_data)
return decrypted_data.decode()
except Exception as e:
logging.error(f"Decryption failed: {str(e)}")
return ""
def sanitize_email_content(self, email_data: Dict[str, str]) -> Dict[str, str]:
"""Sanitize email content to prevent injection attacks"""
sanitized = {}
for key, value in email_data.items():
if isinstance(value, str):
# Remove potentially dangerous characters
sanitized_value = re.sub(r'[<>"\'\x00-\x1f\x7f-\x9f]', '', value)
# Limit length to prevent memory exhaustion
sanitized_value = sanitized_value[:10000]
sanitized[key] = sanitized_value
else:
sanitized[key] = value
return sanitized
def validate_input_data(self, email_data: Dict[str, str]) -> bool:
"""Validate input data for security"""
required_fields = ['subject', 'from', 'body']
# Check required fields
for field in required_fields:
if field not in email_data:
logging.warning(f"Missing required field: {field}")
return False
# Check data types
for key, value in email_data.items():
if not isinstance(value, (str, list, dict)):
logging.warning(f"Invalid data type for field {key}")
return False
# Check for suspicious patterns
suspicious_patterns = [
r'<script[^>]*>.*?</script>',
r'javascript:',
r'data:text/html',
r'vbscript:',
]
content = str(email_data.get('body', ''))
for pattern in suspicious_patterns:
if re.search(pattern, content, re.IGNORECASE):
logging.warning(f"Suspicious pattern detected: {pattern}")
return False
return True
def implement_rate_limiting(self, client_id: str, max_requests: int = 100,
time_window: int = 3600) -> bool:
"""Implement rate limiting to prevent abuse"""
current_time = time.time()
if client_id not in self.rate_limiter:
self.rate_limiter[client_id] = []
# Clean old requests
self.rate_limiter[client_id] = [
req_time for req_time in self.rate_limiter[client_id]
if current_time - req_time < time_window
]
# Check rate limit
if len(self.rate_limiter[client_id]) >= max_requests:
logging.warning(f"Rate limit exceeded for client: {client_id}")
return False
# Add current request
self.rate_limiter[client_id].append(current_time)
return True
def generate_audit_log(self, action: str, details: Dict[str, Any],
client_id: str = None) -> str:
"""Generate audit log entry"""
log_entry = {
'timestamp': datetime.now().isoformat(),
'action': action,
'client_id': client_id,
'details': details,
'hash': self._generate_log_hash(action, details, client_id)
}
return json.dumps(log_entry)
def _generate_log_hash(self, action: str, details: Dict[str, Any],
client_id: str = None) -> str:
"""Generate hash for log integrity"""
log_data = f"{action}:{client_id}:{json.dumps(details, sort_keys=True)}"
return hmac.new(
self.master_key.encode(),
log_data.encode(),
hashlib.sha256
).hexdigest()
class SecureSpamDetector(SpamDetector):
"""Security-hardened version of spam detector"""
def __init__(self, llm_manager: LocalLLMManager, security_manager: SecurityManager):
super().__init__(llm_manager)
self.security_manager = security_manager
def analyze_email(self, email_data: Dict[str, str],
client_id: str = None) -> Dict[str, Any]:
"""Secure email analysis with validation and rate limiting"""
# Rate limiting check
if client_id and not self.security_manager.implement_rate_limiting(client_id):
return {
'error': 'Rate limit exceeded',
'is_spam': False,
'confidence': 0.0
}
# Input validation
if not self.security_manager.validate_input_data(email_data):
return {
'error': 'Invalid input data',
'is_spam': False,
'confidence': 0.0
}
# Sanitize input
sanitized_data = self.security_manager.sanitize_email_content(email_data)
# Generate audit log
audit_log = self.security_manager.generate_audit_log(
'email_analysis',
{
'subject_hash': hashlib.sha256(sanitized_data.get('subject', '').encode()).hexdigest()[:16],
'content_length': len(sanitized_data.get('body', ''))
},
client_id
)
logging.info(f"Audit: {audit_log}")
# Perform analysis
try:
result = super().analyze_email(sanitized_data)
# Encrypt sensitive information in result if needed
if 'reasoning' in result:
# Don't encrypt reasoning as it's needed for transparency
pass
return result
except Exception as e:
logging.error(f"Secure analysis failed: {str(e)}")
return {
'error': 'Analysis failed',
'is_spam': False,
'confidence': 0.0
}
This security framework provides multiple layers of protection including input validation, rate limiting, data sanitization, encryption for sensitive data, and comprehensive audit logging. The secure wrapper ensures that the spam detection system can operate safely in production environments while maintaining its effectiveness.
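Usage mirrors the base detector, with an optional client identifier used for rate limiting and audit logging; a minimal sketch, again with a fabricated email:

# Hardened analysis with rate limiting keyed on a client identifier.
security_manager = SecurityManager()
secure_detector = SecureSpamDetector(llm_manager, security_manager)

result = secure_detector.analyze_email(
    {
        'subject': 'Password reset required',
        'from': 'it-support@example.test',
        'body': 'Your password expires today. Reset it using the link below.'
    },
    client_id='mail-gateway-01'
)
print(result)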
Testing and Validation Framework
A robust testing framework is essential for validating the spam detector's performance and ensuring its reliability across different scenarios. The testing system should evaluate both accuracy and security aspects of the detector.
import json
import logging
import random
import time
from typing import List, Tuple

import numpy as np
class SpamDetectorTestSuite:
def __init__(self, spam_detector: SpamDetector):
self.spam_detector = spam_detector
self.test_results = []
def load_test_dataset(self, spam_file: str, ham_file: str) -> List[Tuple[Dict[str, str], bool]]:
"""Load test dataset with spam and ham examples"""
test_data = []
# Load spam examples
try:
with open(spam_file, 'r', encoding='utf-8') as f:
spam_emails = json.load(f)
for email in spam_emails:
test_data.append((email, True)) # True = spam
except FileNotFoundError:
logging.warning(f"Spam test file not found: {spam_file}")
# Load ham examples
try:
with open(ham_file, 'r', encoding='utf-8') as f:
ham_emails = json.load(f)
for email in ham_emails:
test_data.append((email, False)) # False = ham
except FileNotFoundError:
logging.warning(f"Ham test file not found: {ham_file}")
# Shuffle the dataset
random.shuffle(test_data)
return test_data
def run_accuracy_tests(self, test_data: List[Tuple[Dict[str, str], bool]]) -> Dict[str, float]:
"""Run accuracy tests on the dataset"""
results = {
'true_positives': 0,
'true_negatives': 0,
'false_positives': 0,
'false_negatives': 0,
'total_tests': len(test_data)
}
for email_data, is_spam in test_data:
try:
detection_result = self.spam_detector.analyze_email(email_data)
predicted_spam = detection_result.get('is_spam', False)
if is_spam and predicted_spam:
results['true_positives'] += 1
elif not is_spam and not predicted_spam:
results['true_negatives'] += 1
elif not is_spam and predicted_spam:
results['false_positives'] += 1
elif is_spam and not predicted_spam:
results['false_negatives'] += 1
# Store detailed result for analysis
self.test_results.append({
'email_subject': email_data.get('subject', 'No Subject'),
'actual_spam': is_spam,
'predicted_spam': predicted_spam,
'confidence': detection_result.get('confidence', 0.0),
'reasoning': detection_result.get('reasoning', ''),
'correct': is_spam == predicted_spam
})
except Exception as e:
logging.error(f"Test failed for email: {str(e)}")
continue
# Calculate metrics
tp = results['true_positives']
tn = results['true_negatives']
fp = results['false_positives']
fn = results['false_negatives']
accuracy = (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) > 0 else 0
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
results.update({
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1_score': f1_score,
'false_positive_rate': fp / (fp + tn) if (fp + tn) > 0 else 0,
'false_negative_rate': fn / (fn + tp) if (fn + tp) > 0 else 0
})
return results
def run_performance_tests(self, test_data: List[Tuple[Dict[str, str], bool]],
num_iterations: int = 100) -> Dict[str, float]:
"""Run performance tests to measure processing speed"""
processing_times = []
# Select random subset for performance testing
test_subset = random.sample(test_data, min(num_iterations, len(test_data)))
for email_data, _ in test_subset:
start_time = time.time()
try:
self.spam_detector.analyze_email(email_data)
processing_time = time.time() - start_time
processing_times.append(processing_time)
except Exception as e:
logging.error(f"Performance test failed: {str(e)}")
continue
if not processing_times:
return {'error': 'No successful performance tests'}
return {
'avg_processing_time': sum(processing_times) / len(processing_times),
'min_processing_time': min(processing_times),
'max_processing_time': max(processing_times),
'std_processing_time': np.std(processing_times),
'total_tests': len(processing_times)
}
def run_security_tests(self) -> Dict[str, bool]:
"""Run security tests to validate input handling"""
security_results = {}
# Test malicious input handling
malicious_inputs = [
{
'subject': '<script>alert("xss")</script>',
'from': 'test@example.com',
'body': 'Normal content'
},
{
'subject': 'Normal subject',
'from': 'test@example.com',
'body': 'javascript:void(0)' * 1000 # Large malicious content
},
{
'subject': 'Test',
'from': 'test@example.com',
'body': '\x00\x01\x02' + 'A' * 50000 # Binary data and large content
}
]
for i, malicious_input in enumerate(malicious_inputs):
try:
result = self.spam_detector.analyze_email(malicious_input)
# Should not crash and should return valid result
security_results[f'malicious_input_test_{i}'] = (
isinstance(result, dict) and
'is_spam' in result and
'confidence' in result
)
except Exception as e:
logging.error(f"Security test {i} failed: {str(e)}")
security_results[f'malicious_input_test_{i}'] = False
# Test rate limiting if security manager is available
if hasattr(self.spam_detector, 'security_manager'):
rate_limit_results = []
test_email = {
'subject': 'Rate limit test',
'from': 'test@example.com',
'body': 'Testing rate limiting functionality'
}
# Send multiple requests rapidly
for _ in range(150): # Exceed typical rate limit
try:
result = self.spam_detector.analyze_email(test_email, client_id='test_client')
rate_limit_results.append('error' not in result)
except Exception:
rate_limit_results.append(False)
# Should have some failures due to rate limiting
security_results['rate_limiting_active'] = not all(rate_limit_results)
return security_results
def generate_test_report(self, accuracy_results: Dict[str, float],
performance_results: Dict[str, float],
security_results: Dict[str, bool]) -> str:
"""Generate comprehensive test report"""
report = []
report.append("SPAM DETECTOR TEST REPORT")
report.append("=" * 50)
report.append("")
# Accuracy Results
report.append("ACCURACY METRICS:")
report.append(f" Total Tests: {accuracy_results.get('total_tests', 0)}")
report.append(f" Accuracy: {accuracy_results.get('accuracy', 0):.3f}")
report.append(f" Precision: {accuracy_results.get('precision', 0):.3f}")
report.append(f" Recall: {accuracy_results.get('recall', 0):.3f}")
report.append(f" F1 Score: {accuracy_results.get('f1_score', 0):.3f}")
report.append(f" False Positive Rate: {accuracy_results.get('false_positive_rate', 0):.3f}")
report.append(f" False Negative Rate: {accuracy_results.get('false_negative_rate', 0):.3f}")
report.append("")
# Performance Results
report.append("PERFORMANCE METRICS:")
if 'error' not in performance_results:
report.append(f" Average Processing Time: {performance_results.get('avg_processing_time', 0):.3f}s")
report.append(f" Min Processing Time: {performance_results.get('min_processing_time', 0):.3f}s")
report.append(f" Max Processing Time: {performance_results.get('max_processing_time', 0):.3f}s")
report.append(f" Standard Deviation: {performance_results.get('std_processing_time', 0):.3f}s")
else:
report.append(f" Error: {performance_results['error']}")
report.append("")
# Security Results
report.append("SECURITY TEST RESULTS:")
for test_name, passed in security_results.items():
status = "PASS" if passed else "FAIL"
report.append(f" {test_name}: {status}")
report.append("")
# Recommendations
report.append("RECOMMENDATIONS:")
if accuracy_results.get('false_positive_rate', 0) > 0.05:
report.append(" - High false positive rate detected. Consider adjusting confidence thresholds.")
if accuracy_results.get('false_negative_rate', 0) > 0.10:
report.append(" - High false negative rate detected. Consider enhancing detection patterns.")
if performance_results.get('avg_processing_time', 0) > 2.0:
report.append(" - Processing time is high. Consider optimizing LLM inference or prompt length.")
if not all(security_results.values()):
report.append(" - Some security tests failed. Review input validation and error handling.")
return "\n".join(report)
def create_sample_test_data():
    """Create sample test data files for demonstration"""
    spam_examples = [
        {
            'subject': 'URGENT: Claim your $1000 prize NOW!',
            'from': 'winner@fake-lottery.com',
            'body': 'Congratulations! You have won $1000 in our exclusive lottery. Click here immediately to claim your prize before it expires. Act now!'
        },
        {
            'subject': 'Your account will be suspended',
            'from': 'security@fake-bank.com',
            'body': 'Your account has suspicious activity. Please verify your credentials immediately by clicking this link or your account will be suspended within 24 hours.'
        },
        {
            'subject': 'Make money from home - guaranteed!',
            'from': 'opportunity@scam.com',
            'body': 'Work from home and make $5000 per week guaranteed! No experience needed. Send us your personal information to get started today.'
        }
    ]

    ham_examples = [
        {
            'subject': 'Meeting reminder for tomorrow',
            'from': 'colleague@company.com',
            'body': 'Hi, just a reminder about our project meeting tomorrow at 2 PM in conference room B. Please bring the latest reports.'
        },
        {
            'subject': 'Your order confirmation #12345',
            'from': 'orders@legitimate-store.com',
            'body': 'Thank you for your order. Your items will be shipped within 2-3 business days. You can track your order using the provided tracking number.'
        },
        {
            'subject': 'Newsletter: Tech industry updates',
            'from': 'newsletter@tech-news.com',
            'body': 'This week in technology: New developments in AI, cybersecurity trends, and startup funding news. Read more in our detailed analysis.'
        }
    ]

    # Save sample data to JSON files for the test suite to load
    with open('test_spam.json', 'w') as f:
        json.dump(spam_examples, f, indent=2)
    with open('test_ham.json', 'w') as f:
        json.dump(ham_examples, f, indent=2)

    return 'test_spam.json', 'test_ham.json'
Integration Example and Complete System Assembly
Now we bring all of the components together into a complete, functional spam detection system. The integration example below shows how to initialize each component and run the entire pipeline end to end.
import time  # needed for the heartbeat loop in main()


def main():
    """Main function demonstrating complete system integration"""
    # Initialize logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('spam_detector.log'),
            logging.StreamHandler()
        ]
    )
    logger = logging.getLogger(__name__)
    logger.info("Starting LLM-based spam detection system")

    try:
        # Initialize configuration
        config = SpamDetectionConfig(
            model_name="llama2:7b",
            confidence_threshold=0.7,
            max_tokens=512,
            temperature=0.1
        )

        # Initialize LLM manager
        logger.info("Initializing local LLM manager")
        llm_manager = LocalLLMManager(config)
        if not llm_manager.initialize_model():
            logger.error("Failed to initialize LLM model")
            return False

        # Initialize security manager
        logger.info("Initializing security manager")
        security_manager = SecurityManager()

        # Initialize learning system
        logger.info("Initializing learning system")
        learning_system = AgenticLearningSystem("spam_learning.db")

        # Initialize adaptive prompt manager
        logger.info("Initializing adaptive prompt manager")
        prompt_manager = AdaptivePromptManager(learning_system)

        # Initialize secure spam detector
        logger.info("Initializing spam detector")
        spam_detector = SecureSpamDetector(llm_manager, security_manager)
        spam_detector.prompt_manager = prompt_manager  # Attach the adaptive prompt manager

        # Initialize performance monitor
        logger.info("Initializing performance monitor")
        performance_monitor = PerformanceMonitor(spam_detector, learning_system)
        performance_monitor.start_monitoring()

        # Initialize email processor
        logger.info("Initializing email processor")
        email_processor = EmailProcessor(spam_detector, learning_system)
        email_processor.start_processing(num_workers=3)

        # Run system tests
        logger.info("Running system tests")
        test_suite = SpamDetectorTestSuite(spam_detector)

        # Create sample test data
        spam_file, ham_file = create_sample_test_data()
        test_data = test_suite.load_test_dataset(spam_file, ham_file)

        # Run tests
        accuracy_results = test_suite.run_accuracy_tests(test_data)
        performance_results = test_suite.run_performance_tests(test_data)
        security_results = test_suite.run_security_tests()

        # Generate and display test report
        test_report = test_suite.generate_test_report(
            accuracy_results, performance_results, security_results
        )
        logger.info(f"Test Results:\n{test_report}")

        # Demonstrate email processing
        logger.info("Demonstrating email processing")
        sample_emails = [
            {
                'subject': 'Important business proposal',
                'from': 'partner@business.com',
                'body': 'We have an exciting business opportunity that could benefit both our companies. Would you be interested in discussing this further?'
            },
            {
                'subject': 'WINNER! Claim your prize now!',
                'from': 'prizes@suspicious.com',
                'body': 'You have won $10,000! Click this link immediately to claim your prize. This offer expires in 24 hours!'
            }
        ]
        for email_data in sample_emails:
            result = email_processor.process_email_message(
                f"Subject: {email_data['subject']}\nFrom: {email_data['from']}\n\n{email_data['body']}"
            )
            logger.info(f"Processed email: {result}")

        # Simulate user feedback for learning
        logger.info("Simulating user feedback for learning")
        learning_system.collect_feedback(
            email_hash="sample_hash_1",
            original_result={'is_spam': False, 'confidence': 0.3},
            user_correction="actually_spam",
            feedback_type="false_negative"
        )

        # Generate performance report
        logger.info("Generating performance report")
        performance_report = performance_monitor.generate_performance_report()
        logger.info(f"Performance Report: {json.dumps(performance_report, indent=2)}")

        # Keep system running for demonstration
        logger.info("System is running. Press Ctrl+C to stop.")
        try:
            while True:
                time.sleep(60)
                logger.info("System heartbeat - all components operational")
        except KeyboardInterrupt:
            logger.info("Shutdown signal received")

        # Cleanup
        logger.info("Shutting down system components")
        email_processor.stop_processing()
        performance_monitor.stop_monitoring()

        # Generate a final dashboard/summary before exit
        final_report = performance_monitor.generate_performance_report()
        logger.info(f"Final performance summary: {json.dumps(final_report, indent=2)}")
        logger.info("Shutdown complete")
        return True

    except Exception as e:
        logger.error(f"System error during startup or operation: {e}")
        return False


if __name__ == "__main__":
    main()
Deployment Considerations and Production Readiness
When deploying this spam detection system in a production environment, several additional considerations must be addressed to ensure reliability, scalability, and maintainability.
The system architecture should be containerized using Docker to ensure consistent deployment across different environments. The local LLM component requires significant computational resources, so proper resource allocation and monitoring are essential. Consider implementing horizontal scaling by distributing the email processing load across multiple instances of the spam detector.
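As one illustration of resource-aware configuration, the sketch below chooses a worker count for the email processing pool based on available CPU cores. The SPAM_DETECTOR_WORKERS environment variable and the specific limits are assumptions for this example rather than part of the system above.

import os

def choose_worker_count(default: int = 3) -> int:
    """Pick a worker count for the email processing pool.

    Honors an optional SPAM_DETECTOR_WORKERS environment variable (an
    assumption for this sketch) and otherwise caps workers at the number
    of CPU cores minus one, reserving capacity for local LLM inference.
    """
    env_value = os.environ.get("SPAM_DETECTOR_WORKERS")
    if env_value and env_value.isdigit():
        return max(1, int(env_value))
    cores = os.cpu_count() or default
    return max(1, min(default, cores - 1))

In main(), email_processor.start_processing(num_workers=choose_worker_count()) would then replace the hard-coded num_workers=3, letting each container instance size itself to the resources it is given.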
Database management becomes critical in production. The SQLite database used for learning storage should be replaced with a more robust solution like PostgreSQL for better concurrent access handling and data integrity. Implement proper database backup and recovery procedures to prevent loss of learned patterns and feedback data.
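The following sketch shows what a PostgreSQL-backed feedback store might look like using psycopg2. The table and column names are assumptions for illustration and would need to match the actual schema used by AgenticLearningSystem.

import json
import psycopg2  # assumes the psycopg2-binary package is installed

class PostgresFeedbackStore:
    """Sketch of a PostgreSQL alternative to the SQLite feedback storage."""

    def __init__(self, dsn: str):
        # e.g. dsn = "dbname=spam_learning user=spamsvc host=db"
        self.conn = psycopg2.connect(dsn)

    def save_feedback(self, email_hash: str, original_result: dict,
                      user_correction: str, feedback_type: str):
        # Connection context manager wraps the insert in a transaction
        with self.conn, self.conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO feedback (email_hash, original_result,
                                      user_correction, feedback_type)
                VALUES (%s, %s, %s, %s)
                """,
                (email_hash, json.dumps(original_result),
                 user_correction, feedback_type),
            )

Because PostgreSQL handles concurrent writers, multiple email processing workers can record feedback simultaneously without the locking issues SQLite can exhibit under load.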
Monitoring and alerting systems should be integrated to track system health, detection accuracy, and performance metrics in real time. Set up alerts for unusual patterns such as sudden spikes in false positives, system errors, or performance degradation.
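A simple threshold-based check like the one below can serve as a starting point. The metric keys and threshold values are illustrative assumptions and should be aligned with the structure actually returned by PerformanceMonitor.generate_performance_report().

def check_alert_conditions(metrics: dict, logger) -> list:
    """Evaluate a recent metrics summary against alert thresholds (illustrative values)."""
    alerts = []
    if metrics.get("false_positive_rate", 0) > 0.05:
        alerts.append("False positive rate above 5% over the last window")
    if metrics.get("avg_processing_time", 0) > 2.0:
        alerts.append("Average processing time above 2 seconds")
    if metrics.get("error_rate", 0) > 0.01:
        alerts.append("Error rate above 1%")
    for alert in alerts:
        # Replace the log call with an email, Slack, or pager integration in production
        logger.warning(f"ALERT: {alert}")
    return alerts

Running this check on a schedule (for example, every few minutes against the latest performance report) gives operators early warning before detection quality degrades noticeably.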
The learning system requires careful tuning in production. Implement safeguards to prevent the system from learning incorrect patterns due to malicious feedback or systematic errors. Consider implementing a human-in-the-loop validation process for significant pattern changes.
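One possible safeguard is a gating function that only lets well-supported, low-impact pattern changes through automatically and queues everything else for human review. The thresholds and the PENDING_REVIEW queue below are illustrative assumptions, not part of the learning orchestrator shown earlier.

PENDING_REVIEW = []  # stand-in for a real review queue or ticketing integration

def gate_pattern_update(pattern: str, supporting_feedback: int,
                        distinct_reporters: int, estimated_impact: float) -> bool:
    """Decide whether a learned pattern change may be applied automatically.

    Small, well-supported changes pass; anything with broad estimated impact
    is routed to human review instead of being applied by the learning loop.
    """
    if supporting_feedback < 5 or distinct_reporters < 3:
        return False  # not enough independent evidence yet
    if estimated_impact > 0.10:  # e.g. would affect more than 10% of recent traffic
        PENDING_REVIEW.append(pattern)
        return False
    return True

Requiring corrections from several distinct reporters also limits the damage a single malicious or mistaken user can do to the learned state.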
Security hardening should include regular security audits, penetration testing, and compliance with relevant data protection regulations. Implement proper access controls, audit logging, and data retention policies.
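As a minimal sketch of a data retention policy, the helper below purges old records from the SQLite learning database used in this article. The table and column names are assumptions and should be adjusted to the actual AgenticLearningSystem schema (or to the PostgreSQL replacement discussed above).

import sqlite3
from datetime import datetime, timedelta

def purge_expired_records(db_path: str = "spam_learning.db",
                          retention_days: int = 90):
    """Delete learning records older than the retention window.

    Assumes a `feedback` table with an ISO-format `timestamp` column.
    """
    cutoff = (datetime.now() - timedelta(days=retention_days)).isoformat()
    with sqlite3.connect(db_path) as conn:
        conn.execute("DELETE FROM feedback WHERE timestamp < ?", (cutoff,))
        conn.commit()

Scheduling this as a nightly job, together with audit logging of who accessed or corrected which messages, covers two of the most common compliance requirements for systems that process email content.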
Conclusion and Future Enhancements
This comprehensive implementation demonstrates how to build a sophisticated, self-learning spam detection system using local LLMs and agentic AI principles. The system combines the contextual understanding capabilities of large language models with adaptive learning mechanisms that allow it to evolve with changing spam patterns.
The key advantages of this approach include privacy preservation through local processing, contextual analysis that can detect sophisticated social engineering attempts, and continuous learning that adapts to new threats without requiring manual rule updates.
Future enhancements could include integration with threat intelligence feeds, implementation of federated learning to share insights across multiple deployments while preserving privacy, and development of specialized models for different types of email content such as marketing communications or technical discussions.
The system provides a solid foundation for organizations seeking to implement advanced spam detection capabilities while maintaining control over their data and detection logic. The modular architecture allows for easy customization and extension based on specific organizational requirements and threat landscapes.