Introduction and Problem Statement
In today's information-saturated digital landscape, software engineers and technology professionals face an overwhelming challenge: staying current with relevant news across multiple domains while maintaining productivity in their primary work. Traditional news aggregators often present unfiltered streams of content without intelligent categorization or meaningful summarization, which imposes a significant time burden on users who must manually sift through dozens of articles to extract actionable insights.
An LLM-based news aggregator addresses this challenge by leveraging the natural language understanding capabilities of large language models to automatically discover, categorize, summarize, and present news content in a structured, user-friendly format. The system acts as an intelligent intermediary between raw web content and the end user, applying sophisticated text processing to transform chaotic information streams into organized, digestible knowledge packages.
The core value proposition lies in the system's ability to understand context, extract semantic meaning, and present information in a way that respects the user's time constraints while ensuring comprehensive coverage of their specified interests. Unlike traditional keyword-based aggregators, an LLM-powered system can understand nuanced relationships between topics, identify emerging trends, and provide contextual summaries that highlight the most relevant aspects of each news item.
System Architecture Overview
The architecture of an LLM-based news aggregator consists of several interconnected components that work together to create a seamless content discovery and presentation experience. At its foundation, the system operates on a pipeline architecture where raw web content flows through multiple processing stages before reaching the end user.
The data ingestion layer serves as the entry point for content discovery. This component continuously monitors various news sources, RSS feeds, and web publications to identify new content that matches user-specified topics. The ingestion process must be robust enough to handle different content formats, from traditional news websites to modern single-page applications that load content dynamically.
Following data ingestion, the content processing layer applies LLM-powered analysis to extract meaningful information from raw HTML and text content. This stage involves cleaning the extracted text, removing advertisements and navigation elements, and preparing the content for semantic analysis. The LLM component then performs topic classification, relevance scoring, and summary generation.
The storage and retrieval layer manages the processed content, maintaining indexes for efficient querying and ensuring that duplicate content is properly handled. This component also tracks publication dates, source credibility metrics, and user engagement patterns to inform future content recommendations.
Finally, the presentation layer organizes the processed content according to user preferences, whether by topic, publication date, or alphabetical ordering by title. This layer also handles the generation of the final output format, including URLs, summaries, and metadata presentation.
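Before examining each component in depth, it helps to see the pipeline shape in code. The following minimal sketch illustrates how the four layers might be wired together; it is not part of the reference implementation developed later, and the function names and stub bodies are assumptions made purely for the sake of the example.
from typing import Dict, List

# Hypothetical stage stubs; fuller implementations of each stage appear later in this article.
def ingest(topics: List[str]) -> List[str]:
    """Return candidate article URLs for the requested topics (e.g. from RSS feeds)."""
    return []

def process(urls: List[str]) -> List[Dict]:
    """Extract, clean, classify, and summarize each article."""
    return [{"url": u, "topic": "general", "summary": ""} for u in urls]

def store(articles: List[Dict]) -> List[Dict]:
    """Deduplicate and persist processed articles; return what was kept."""
    seen, kept = set(), []
    for a in articles:
        if a["url"] not in seen:
            seen.add(a["url"])
            kept.append(a)
    return kept

def present(articles: List[Dict]) -> str:
    """Render stored articles grouped by topic for the end user."""
    return "\n".join(f"[{a['topic']}] {a['url']}" for a in articles)

def run_pipeline(topics: List[str]) -> str:
    return present(store(process(ingest(topics))))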
Core Components Deep Dive
LLM Integration represents the heart of the system's intelligence. The choice between local and remote LLM deployment significantly impacts both performance characteristics and operational considerations. Local LLM deployment offers several advantages including reduced latency, enhanced privacy control, and independence from external service availability. However, local deployment requires substantial computational resources and ongoing model management overhead.
When implementing local LLM integration, software engineers must consider hardware requirements, model selection, and optimization strategies. Modern transformer-based models require significant GPU memory, with larger models like GPT-3.5 equivalent architectures requiring 16GB or more of VRAM for efficient inference. The following code example demonstrates a basic local LLM integration using a popular open-source framework:
This code example shows how to initialize a local language model for news processing tasks. The AutoTokenizer and AutoModelForCausalLM classes from the transformers library provide a standardized interface for loading pre-trained models. The model selected here is a relatively lightweight option that keeps resource requirements modest; note that DialoGPT-medium is a conversational model, so an instruction-tuned model would generally produce better summaries, but the integration pattern is the same. The torch.cuda.is_available() check ensures that GPU acceleration is used when available, falling back to CPU processing when necessary. The model.eval() call switches the model to evaluation mode, which disables dropout during inference.
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import requests
from bs4 import BeautifulSoup
import json
from datetime import datetime
class LocalLLMNewsProcessor:
def __init__(self, model_name="microsoft/DialoGPT-medium"):
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForCausalLM.from_pretrained(model_name)
self.model.to(self.device)
self.model.eval()
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
def summarize_article(self, article_text, max_length=150):
prompt = f"Summarize the following news article in 2-3 sentences: {article_text[:1000]}"
inputs = self.tokenizer.encode(prompt, return_tensors="pt",
max_length=512, truncation=True)
inputs = inputs.to(self.device)
with torch.no_grad():
outputs = self.model.generate(
inputs,
max_length=inputs.shape[1] + max_length,
num_return_sequences=1,
temperature=0.7,
do_sample=True,
pad_token_id=self.tokenizer.eos_token_id
)
summary = self.tokenizer.decode(outputs[0][inputs.shape[1]:],
skip_special_tokens=True)
return summary.strip()
def classify_topic(self, article_text):
topics = ["politics", "technology", "sports", "science", "business", "entertainment"]
prompt = f"Classify this article into one of these categories: {', '.join(topics)}. Article: {article_text[:500]}"
inputs = self.tokenizer.encode(prompt, return_tensors="pt",
max_length=512, truncation=True)
inputs = inputs.to(self.device)
with torch.no_grad():
outputs = self.model.generate(
inputs,
max_length=inputs.shape[1] + 50,
num_return_sequences=1,
temperature=0.3,
do_sample=True,
pad_token_id=self.tokenizer.eos_token_id
)
classification = self.tokenizer.decode(outputs[0][inputs.shape[1]:],
skip_special_tokens=True)
for topic in topics:
if topic.lower() in classification.lower():
return topic
return "general"
Remote LLM integration offers different trade-offs, providing access to more powerful models without local hardware constraints. Services like OpenAI's GPT models, Anthropic's Claude, or Google's PaLM offer state-of-the-art language understanding capabilities through API interfaces. However, remote integration introduces network latency, ongoing operational costs, and potential privacy concerns when processing sensitive content.
The following code example demonstrates remote LLM integration using OpenAI's API, showing how to implement the same functionality with cloud-based language models:
This remote integration example showcases the simplicity of cloud-based LLM services while highlighting important considerations such as API key management, rate limiting, and error handling. The OpenAI client provides a clean interface for interacting with powerful language models, but developers must implement robust retry logic and cost monitoring to ensure reliable operation in production environments.
import openai
import time
from typing import Optional, Dict, Any
class RemoteLLMNewsProcessor:
def __init__(self, api_key: str, model: str = "gpt-3.5-turbo"):
self.client = openai.OpenAI(api_key=api_key)
self.model = model
self.rate_limit_delay = 1.0 # seconds between requests
def _make_api_call(self, messages: list, max_tokens: int = 150) -> Optional[str]:
try:
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
max_tokens=max_tokens,
temperature=0.7
)
time.sleep(self.rate_limit_delay) # Basic rate limiting
return response.choices[0].message.content.strip()
except Exception as e:
print(f"API call failed: {e}")
return None
def summarize_article(self, article_text: str) -> str:
messages = [
{"role": "system", "content": "You are a news summarization expert. Provide concise, accurate summaries."},
{"role": "user", "content": f"Summarize this news article in 2-3 sentences: {article_text[:2000]}"}
]
summary = self._make_api_call(messages, max_tokens=150)
return summary if summary else "Summary unavailable"
def classify_topic(self, article_text: str) -> str:
topics = ["politics", "technology", "sports", "science", "business", "entertainment", "general"]
messages = [
{"role": "system", "content": f"Classify news articles into these categories: {', '.join(topics)}. Respond with only the category name."},
{"role": "user", "content": f"Classify this article: {article_text[:1000]}"}
]
classification = self._make_api_call(messages, max_tokens=20)
if classification:
classification_lower = classification.lower().strip()
for topic in topics:
if topic in classification_lower:
return topic
return "general"
Web Scraping and Data Collection forms the foundation of content acquisition for the news aggregator. Modern web scraping must handle diverse content delivery mechanisms, from traditional server-rendered HTML to complex JavaScript-driven single-page applications. The scraping component must be robust enough to extract clean, readable content while respecting website terms of service and implementing appropriate rate limiting.
The content extraction process involves multiple steps including HTML parsing, content area identification, and text cleaning. Many news websites embed articles within complex layouts that include advertisements, navigation menus, and related content suggestions. Effective content extraction requires sophisticated parsing logic that can identify the main article content while filtering out extraneous elements.
The following comprehensive example demonstrates a robust web scraping implementation that handles various content formats and extraction challenges:
This web scraping implementation demonstrates several important concepts for building a production-ready news aggregator. The NewsArticle dataclass provides a structured representation of extracted content, ensuring consistent data handling throughout the system. The WebScraper class implements multiple extraction strategies, starting with newspaper3k for automatic content extraction and falling back to manual BeautifulSoup parsing when needed.
import requests
from bs4 import BeautifulSoup
from newspaper import Article
from urllib.parse import urljoin, urlparse
import time
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional
import re
@dataclass
class NewsArticle:
title: str
content: str
url: str
publication_date: Optional[datetime]
source: str
topic: str = "general"
summary: str = ""
class WebScraper:
def __init__(self, delay_between_requests: float = 1.0):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
self.delay = delay_between_requests
def extract_article(self, url: str) -> Optional[NewsArticle]:
try:
# First attempt using newspaper3k for automatic extraction
article = Article(url)
article.download()
article.parse()
if article.text and len(article.text) > 100:
return NewsArticle(
title=article.title or "No title",
content=article.text,
url=url,
publication_date=article.publish_date,
source=urlparse(url).netloc
)
# Fallback to manual extraction
return self._manual_extract(url)
except Exception as e:
print(f"Failed to extract article from {url}: {e}")
return None
def _manual_extract(self, url: str) -> Optional[NewsArticle]:
try:
response = self.session.get(url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
# Remove unwanted elements
for element in soup(['script', 'style', 'nav', 'footer', 'aside']):
element.decompose()
# Try to find title
title = self._extract_title(soup)
# Try to find main content
content = self._extract_content(soup)
if content and len(content) > 100:
return NewsArticle(
title=title,
content=content,
url=url,
publication_date=self._extract_date(soup),
source=urlparse(url).netloc
)
return None
except Exception as e:
print(f"Manual extraction failed for {url}: {e}")
return None
def _extract_title(self, soup: BeautifulSoup) -> str:
# Try multiple title extraction strategies
title_selectors = [
'h1.entry-title',
'h1.post-title',
'h1.article-title',
'h1',
'title'
]
for selector in title_selectors:
element = soup.select_one(selector)
if element and element.get_text().strip():
return element.get_text().strip()
return "No title found"
def _extract_content(self, soup: BeautifulSoup) -> str:
# Try multiple content extraction strategies
content_selectors = [
'div.entry-content',
'div.post-content',
'div.article-content',
'div.content',
'article',
'main'
]
for selector in content_selectors:
element = soup.select_one(selector)
if element:
# Clean up the content
text = element.get_text(separator=' ', strip=True)
# Remove extra whitespace
text = re.sub(r'\s+', ' ', text)
if len(text) > 100:
return text
# Fallback: get all paragraph text
paragraphs = soup.find_all('p')
content = ' '.join([p.get_text().strip() for p in paragraphs if p.get_text().strip()])
return content if len(content) > 100 else ""
def _extract_date(self, soup: BeautifulSoup) -> Optional[datetime]:
# Try to extract publication date
date_selectors = [
'time[datetime]',
'meta[property="article:published_time"]',
'meta[name="publish-date"]',
'.publish-date',
'.post-date'
]
for selector in date_selectors:
element = soup.select_one(selector)
if element:
date_str = element.get('datetime') or element.get('content') or element.get_text()
if date_str:
try:
# Try to parse various date formats
return datetime.fromisoformat(date_str.replace('Z', '+00:00'))
except (ValueError, TypeError):
continue
return None
class NewsSourceManager:
def __init__(self):
self.sources = {
'technology': [
'https://techcrunch.com/feed/',
'https://www.theverge.com/rss/index.xml',
'https://feeds.arstechnica.com/arstechnica/index'
],
'science': [
'https://www.sciencedaily.com/rss/all.xml',
'https://www.nature.com/nature.rss'
],
'politics': [
'https://www.politico.com/rss/politicopicks.xml',
'https://www.reuters.com/rssFeed/politicsNews'
]
}
def get_sources_for_topic(self, topic: str) -> List[str]:
return self.sources.get(topic.lower(), [])
def discover_articles_from_rss(self, rss_url: str, scraper: WebScraper) -> List[NewsArticle]:
articles = []
try:
import feedparser
feed = feedparser.parse(rss_url)
for entry in feed.entries[:10]: # Limit to recent articles
article = scraper.extract_article(entry.link)
if article:
articles.append(article)
time.sleep(scraper.delay)
except Exception as e:
print(f"Failed to process RSS feed {rss_url}: {e}")
return articles
The rate limiting implementation prevents overwhelming target websites and helps maintain good relationships with content providers. The session management with appropriate headers ensures that requests appear legitimate and reduces the likelihood of being blocked by anti-bot measures.
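A further courtesy that the scraper above does not implement is consulting each site's robots.txt before fetching. A minimal sketch using the standard-library urllib.robotparser might look like the following; the cache structure and the permissive fallback on network errors are design assumptions.
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser

_robot_cache = {}

def is_allowed(url: str, user_agent: str = "*") -> bool:
    """Check robots.txt for the URL's host, caching one parser per host."""
    parsed = urlparse(url)
    host = f"{parsed.scheme}://{parsed.netloc}"
    parser = _robot_cache.get(host)
    if parser is None:
        parser = RobotFileParser(f"{host}/robots.txt")
        try:
            parser.read()
        except Exception:
            return True  # If robots.txt cannot be fetched, assume crawling is allowed
        _robot_cache[host] = parser
    return parser.can_fetch(user_agent, url)

# Hypothetical integration point inside WebScraper.extract_article:
# if not is_allowed(url):
#     return None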
Content Processing and Summarization represents the core intelligence layer where raw extracted content transforms into structured, actionable information. This component leverages the LLM's natural language understanding capabilities to generate concise summaries that capture the essential information while maintaining readability and context.
The summarization process must balance comprehensiveness with brevity, ensuring that key facts, implications, and context remain intact while eliminating redundant or tangential information. Advanced summarization techniques can also extract key entities, sentiment, and relevance scores to provide additional metadata for content organization.
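The processors shown earlier sidestep one practical complication by truncating their input: long articles exceed typical context windows. A common workaround is map-reduce style summarization, sketched below under the assumption that summarize is any callable with the same signature as the summarize_article methods above; the chunk size is arbitrary.
from typing import Callable, List

def chunk_text(text: str, max_chars: int = 2000) -> List[str]:
    """Split text into chunks of roughly max_chars characters, breaking on whitespace."""
    words = text.split()
    chunks, current = [], ""
    for word in words:
        if len(current) + len(word) + 1 > max_chars and current:
            chunks.append(current)
            current = word
        else:
            current = f"{current} {word}".strip()
    if current:
        chunks.append(current)
    return chunks

def summarize_long_article(text: str, summarize: Callable[[str], str],
                           max_chars: int = 2000) -> str:
    """Summarize each chunk, then summarize the concatenation of the partial summaries."""
    chunks = chunk_text(text, max_chars)
    if len(chunks) == 1:
        return summarize(chunks[0])
    partial_summaries = [summarize(chunk) for chunk in chunks]
    return summarize(" ".join(partial_summaries))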
Topic Classification and Organization enables the system to automatically categorize content according to user-specified interests. This functionality goes beyond simple keyword matching to understand semantic relationships and contextual relevance. The classification system must handle edge cases where articles span multiple topics or discuss emerging subjects that may not fit neatly into predefined categories.
The following example demonstrates an integrated content processing pipeline that combines summarization and classification:
This integrated processing pipeline demonstrates how the various components work together to transform raw article content into structured, categorized information. The ContentProcessor class serves as an orchestrator, coordinating between the LLM processor and additional analysis tools. The process_article method shows the complete workflow from raw content to final structured output.
from typing import Dict, List, Tuple
import re
from collections import Counter
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize, sent_tokenize
# Download required NLTK resources if they are not already present
nltk.download('stopwords', quiet=True)
nltk.download('punkt', quiet=True)
class ContentProcessor:
def __init__(self, llm_processor):
self.llm_processor = llm_processor
self.stop_words = set(stopwords.words('english'))
# Topic keywords for fallback classification
self.topic_keywords = {
'technology': ['software', 'ai', 'artificial intelligence', 'computer', 'tech', 'digital', 'app', 'startup'],
'politics': ['government', 'election', 'policy', 'congress', 'senate', 'president', 'political'],
'science': ['research', 'study', 'discovery', 'experiment', 'scientific', 'laboratory'],
'sports': ['game', 'team', 'player', 'championship', 'league', 'tournament', 'score'],
'business': ['company', 'market', 'stock', 'economy', 'financial', 'revenue', 'profit'],
'entertainment': ['movie', 'music', 'celebrity', 'film', 'actor', 'entertainment', 'show']
}
def process_article(self, article: NewsArticle) -> NewsArticle:
# Generate summary
article.summary = self.llm_processor.summarize_article(article.content)
# Classify topic
article.topic = self.classify_article_topic(article)
# Extract additional metadata
article.word_count = len(article.content.split())
article.reading_time = max(1, article.word_count // 200) # Approximate reading time
article.key_entities = self.extract_key_entities(article.content)
return article
def classify_article_topic(self, article: NewsArticle) -> str:
# Try LLM classification first
llm_topic = self.llm_processor.classify_topic(article.content)
if llm_topic and llm_topic != "general":
return llm_topic
# Fallback to keyword-based classification
return self._keyword_based_classification(article.content)
def _keyword_based_classification(self, content: str) -> str:
content_lower = content.lower()
topic_scores = {}
for topic, keywords in self.topic_keywords.items():
score = sum(content_lower.count(keyword) for keyword in keywords)
topic_scores[topic] = score
if topic_scores:
best_topic = max(topic_scores, key=topic_scores.get)
if topic_scores[best_topic] > 0:
return best_topic
return "general"
def extract_key_entities(self, content: str) -> List[str]:
# Simple entity extraction using frequency analysis
words = word_tokenize(content.lower())
words = [word for word in words if word.isalpha() and word not in self.stop_words]
# Find most frequent words that might be entities
word_freq = Counter(words)
common_words = [word for word, freq in word_freq.most_common(10) if freq > 1]
# Filter for likely entities (capitalized words in original text)
entities = []
sentences = sent_tokenize(content)
for sentence in sentences:
words_in_sentence = sentence.split()
for word in words_in_sentence:
clean_word = re.sub(r'[^\w]', '', word)
if (clean_word.lower() in common_words and
word[0].isupper() and
len(clean_word) > 3 and
clean_word not in entities):
entities.append(clean_word)
return entities[:5] # Return top 5 entities
class NewsAggregator:
def __init__(self, llm_processor, max_articles_per_topic: int = 20):
self.content_processor = ContentProcessor(llm_processor)
self.scraper = WebScraper()
self.source_manager = NewsSourceManager()
self.max_articles_per_topic = max_articles_per_topic
self.processed_articles = {}
def aggregate_news(self, topics: List[str]) -> Dict[str, List[NewsArticle]]:
results = {}
for topic in topics:
print(f"Processing topic: {topic}")
articles = []
# Get RSS sources for this topic
rss_sources = self.source_manager.get_sources_for_topic(topic)
for rss_url in rss_sources:
print(f" Processing RSS feed: {rss_url}")
topic_articles = self.source_manager.discover_articles_from_rss(rss_url, self.scraper)
for article in topic_articles:
if len(articles) >= self.max_articles_per_topic:
break
# Process the article
processed_article = self.content_processor.process_article(article)
articles.append(processed_article)
if len(articles) >= self.max_articles_per_topic:
break
results[topic] = articles
print(f" Found {len(articles)} articles for {topic}")
return results
def sort_articles(self, articles: List[NewsArticle], sort_by: str = "date") -> List[NewsArticle]:
if sort_by == "date":
return sorted(articles,
key=lambda x: x.publication_date or datetime.min,
reverse=True)
elif sort_by == "title":
return sorted(articles, key=lambda x: x.title.lower())
elif sort_by == "source":
return sorted(articles, key=lambda x: x.source.lower())
else:
return articles
def format_output(self, aggregated_news: Dict[str, List[NewsArticle]], sort_by: str = "date") -> str:
output = []
output.append("=" * 80)
output.append("NEWS AGGREGATION REPORT")
output.append(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
output.append("=" * 80)
for topic, articles in aggregated_news.items():
if not articles:
continue
output.append(f"\n{topic.upper()} NEWS")
output.append("-" * 40)
sorted_articles = self.sort_articles(articles, sort_by)
for i, article in enumerate(sorted_articles, 1):
output.append(f"\n{i}. {article.title}")
output.append(f" Source: {article.source}")
output.append(f" URL: {article.url}")
if article.publication_date:
output.append(f" Published: {article.publication_date.strftime('%Y-%m-%d %H:%M')}")
output.append(f" Summary: {article.summary}")
if hasattr(article, 'key_entities') and article.key_entities:
output.append(f" Key entities: {', '.join(article.key_entities)}")
if hasattr(article, 'reading_time'):
output.append(f" Reading time: {article.reading_time} minutes")
return "\n".join(output)
Tool Integration and Search Functionality extends the system's capabilities beyond basic content aggregation. Modern LLM frameworks often provide access to external tools and search APIs that can enhance content discovery and analysis. Integration with search engines, social media APIs, and specialized news databases can provide broader coverage and real-time updates.
The tool integration layer must handle API rate limits, authentication, and error recovery gracefully. Different tools may have varying response formats and reliability characteristics, requiring robust adapter patterns to maintain consistent internal data representations.
Search functionality can operate at multiple levels, from initial content discovery to user-initiated queries within the aggregated content. Advanced search implementations can leverage semantic similarity matching to find related articles or identify trending topics across multiple sources.
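As an illustration of semantic similarity matching, the sketch below indexes processed articles with sentence embeddings and answers free-text queries by cosine similarity. It assumes the third-party sentence-transformers package and the all-MiniLM-L6-v2 model; both the model choice and the title-plus-summary indexing strategy are assumptions rather than requirements.
import numpy as np
from sentence_transformers import SentenceTransformer

class SemanticSearcher:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)
        self.embeddings = None
        self.articles = []

    def index(self, articles):
        """Embed each article's title and summary for later similarity queries."""
        self.articles = list(articles)
        texts = [f"{a.title}. {a.summary}" for a in self.articles]
        self.embeddings = self.model.encode(texts, normalize_embeddings=True)

    def search(self, query: str, top_k: int = 5):
        """Return (article, score) pairs ranked by cosine similarity to the query."""
        if self.embeddings is None or not self.articles:
            return []
        query_vec = self.model.encode([query], normalize_embeddings=True)[0]
        scores = np.dot(self.embeddings, query_vec)  # cosine similarity of normalized vectors
        ranked = np.argsort(scores)[::-1][:top_k]
        return [(self.articles[i], float(scores[i])) for i in ranked]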
Data Storage and Retrieval considerations become critical as the system scales to handle larger volumes of content and user requests. The storage layer must efficiently handle both structured metadata and unstructured content while supporting complex queries for content organization and retrieval.
Database design should accommodate the temporal nature of news content, where recent articles are accessed frequently while older content may be archived or purged. Indexing strategies must balance query performance with storage efficiency, particularly for full-text search capabilities.
The following example demonstrates a comprehensive storage and retrieval system:
This storage implementation showcases several important database design patterns for news aggregation systems. The SQLite database provides a lightweight, embedded solution suitable for development and smaller deployments, while the schema design supports efficient querying and content organization. The full-text search capabilities enable users to find relevant articles based on content similarity rather than just metadata matching.
import sqlite3
from typing import List, Optional, Dict, Any, Tuple
import json
from datetime import datetime, timedelta
import hashlib
class NewsDatabase:
def __init__(self, db_path: str = "news_aggregator.db"):
self.db_path = db_path
self.init_database()
def init_database(self):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Create articles table
cursor.execute('''
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url_hash TEXT UNIQUE NOT NULL,
title TEXT NOT NULL,
content TEXT NOT NULL,
summary TEXT,
url TEXT NOT NULL,
source TEXT NOT NULL,
topic TEXT NOT NULL,
publication_date DATETIME,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
word_count INTEGER,
reading_time INTEGER,
key_entities TEXT,
processed BOOLEAN DEFAULT FALSE
)
''')
# Create full-text search table
cursor.execute('''
CREATE VIRTUAL TABLE IF NOT EXISTS articles_fts USING fts5(
title, content, summary, key_entities,
content='articles', content_rowid='id'
)
''')
# Create indexes for common queries
cursor.execute('CREATE INDEX IF NOT EXISTS idx_topic ON articles(topic)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_publication_date ON articles(publication_date)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_source ON articles(source)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_created_at ON articles(created_at)')
conn.commit()
conn.close()
def _generate_url_hash(self, url: str) -> str:
return hashlib.md5(url.encode()).hexdigest()
def store_article(self, article: NewsArticle) -> bool:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
url_hash = self._generate_url_hash(article.url)
key_entities_json = json.dumps(getattr(article, 'key_entities', []))
cursor.execute('''
INSERT OR REPLACE INTO articles
(url_hash, title, content, summary, url, source, topic,
publication_date, word_count, reading_time, key_entities, processed)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
url_hash, article.title, article.content, article.summary,
article.url, article.source, article.topic,
article.publication_date, getattr(article, 'word_count', 0),
getattr(article, 'reading_time', 0), key_entities_json, True
))
# Update FTS index
cursor.execute('''
INSERT OR REPLACE INTO articles_fts
(rowid, title, content, summary, key_entities)
SELECT id, title, content, summary, key_entities
FROM articles WHERE url_hash = ?
''', (url_hash,))
conn.commit()
return True
except sqlite3.Error as e:
print(f"Database error storing article: {e}")
return False
finally:
conn.close()
def get_articles_by_topic(self, topic: str, limit: int = 50,
days_back: int = 7) -> List[NewsArticle]:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cutoff_date = datetime.now() - timedelta(days=days_back)
cursor.execute('''
SELECT title, content, summary, url, source, topic,
publication_date, word_count, reading_time, key_entities
FROM articles
WHERE topic = ? AND created_at >= ?
ORDER BY publication_date DESC, created_at DESC
LIMIT ?
''', (topic, cutoff_date, limit))
articles = []
for row in cursor.fetchall():
article = NewsArticle(
title=row[0],
content=row[1],
url=row[3],
publication_date=datetime.fromisoformat(row[6]) if row[6] else None,
source=row[4],
topic=row[5],
summary=row[2]
)
article.word_count = row[7] or 0
article.reading_time = row[8] or 0
article.key_entities = json.loads(row[9]) if row[9] else []
articles.append(article)
conn.close()
return articles
def search_articles(self, query: str, limit: int = 20) -> List[NewsArticle]:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Use FTS for content search
cursor.execute('''
SELECT a.title, a.content, a.summary, a.url, a.source, a.topic,
a.publication_date, a.word_count, a.reading_time, a.key_entities
FROM articles_fts fts
JOIN articles a ON a.id = fts.rowid
WHERE articles_fts MATCH ?
ORDER BY rank
LIMIT ?
''', (query, limit))
articles = []
for row in cursor.fetchall():
article = NewsArticle(
title=row[0],
content=row[1],
url=row[3],
publication_date=datetime.fromisoformat(row[6]) if row[6] else None,
source=row[4],
topic=row[5],
summary=row[2]
)
article.word_count = row[7] or 0
article.reading_time = row[8] or 0
article.key_entities = json.loads(row[9]) if row[9] else []
articles.append(article)
conn.close()
return articles
def get_trending_topics(self, days_back: int = 1) -> List[Tuple[str, int]]:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cutoff_date = datetime.now() - timedelta(days=days_back)
cursor.execute('''
SELECT topic, COUNT(*) as article_count
FROM articles
WHERE created_at >= ?
GROUP BY topic
ORDER BY article_count DESC
''', (cutoff_date,))
trends = cursor.fetchall()
conn.close()
return trends
def cleanup_old_articles(self, days_to_keep: int = 30):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cutoff_date = datetime.now() - timedelta(days=days_to_keep)
cursor.execute('DELETE FROM articles WHERE created_at < ?', (cutoff_date,))
cursor.execute('DELETE FROM articles_fts WHERE rowid NOT IN (SELECT id FROM articles)')
conn.commit()
conn.close()
print(f"Cleaned up articles older than {days_to_keep} days")
User Interface and Experience considerations focus on presenting the aggregated content in a format that maximizes usability while minimizing cognitive load. The interface must support different consumption patterns, from quick headline scanning to detailed article reading, while providing efficient navigation between topics and time periods.
Command-line interfaces offer simplicity and scriptability for technical users, while web-based interfaces can provide richer interaction models including real-time updates and personalized content recommendations. The choice of interface technology should align with the target user base and deployment environment.
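For the command-line route, a thin argparse wrapper like the hypothetical sketch below could expose topics, sort order, and output format as flags; the flag names are assumptions, and the wrapper simply delegates to the aggregator developed in the next example.
import argparse

def parse_args():
    parser = argparse.ArgumentParser(description="LLM-based news aggregator")
    parser.add_argument("--topics", nargs="+", default=["technology"],
                        help="Topics to aggregate, e.g. --topics technology science")
    parser.add_argument("--sort-by", choices=["date", "title", "source"], default="date",
                        help="Ordering of articles within each topic")
    parser.add_argument("--format", choices=["text", "json"], default="text",
                        help="Output format of the generated report")
    parser.add_argument("--output", default=None,
                        help="Optional file path to write the report to")
    return parser.parse_args()

# Hypothetical usage with the aggregator defined in the next example:
# args = parse_args()
# report = aggregator.generate_report(args.topics, sort_by=args.sort_by,
#                                     output_format=args.format)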
Performance Considerations become increasingly important as the system scales to handle more sources, topics, and users. Content processing pipelines must be designed to handle concurrent operations efficiently, while storage systems need to support high-throughput read and write operations.
Caching strategies can significantly improve response times for frequently accessed content, while background processing can ensure that new content is available without blocking user interactions. Load balancing and horizontal scaling considerations become relevant for multi-user deployments.
The following example demonstrates a complete system integration with performance optimizations:
This complete system integration demonstrates how all components work together to create a functional news aggregator. The performance optimizations include connection pooling, concurrent processing, and intelligent caching to ensure responsive operation even with large content volumes. The main execution flow shows how a user would interact with the system to get categorized, summarized news content.
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from typing import Dict, List, Optional
import time
class PerformantNewsAggregator:
def __init__(self, llm_processor, max_workers: int = 5):
self.llm_processor = llm_processor
self.database = NewsDatabase()
self.content_processor = ContentProcessor(llm_processor)
self.scraper = WebScraper()
self.source_manager = NewsSourceManager()
self.max_workers = max_workers
self.cache = {}
self.cache_lock = threading.Lock()
self.cache_ttl = 3600 # 1 hour cache TTL
def _get_cache_key(self, topics: List[str], sort_by: str) -> str:
return f"{'_'.join(sorted(topics))}_{sort_by}"
def _is_cache_valid(self, cache_entry: Dict) -> bool:
return time.time() - cache_entry['timestamp'] < self.cache_ttl
def get_cached_results(self, topics: List[str], sort_by: str) -> Optional[Dict]:
cache_key = self._get_cache_key(topics, sort_by)
with self.cache_lock:
if cache_key in self.cache and self._is_cache_valid(self.cache[cache_key]):
return self.cache[cache_key]['data']
return None
def cache_results(self, topics: List[str], sort_by: str, results: Dict):
cache_key = self._get_cache_key(topics, sort_by)
with self.cache_lock:
self.cache[cache_key] = {
'data': results,
'timestamp': time.time()
}
def process_articles_concurrently(self, articles: List[NewsArticle]) -> List[NewsArticle]:
processed_articles = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit processing tasks
future_to_article = {
executor.submit(self.content_processor.process_article, article): article
for article in articles
}
# Collect results as they complete
for future in as_completed(future_to_article):
try:
processed_article = future.result(timeout=30) # 30 second timeout
processed_articles.append(processed_article)
# Store in database immediately
self.database.store_article(processed_article)
except Exception as e:
original_article = future_to_article[future]
print(f"Failed to process article {original_article.url}: {e}")
return processed_articles
def aggregate_news_optimized(self, topics: List[str], sort_by: str = "date",
use_cache: bool = True) -> Dict[str, List[NewsArticle]]:
# Check cache first
if use_cache:
cached_results = self.get_cached_results(topics, sort_by)
if cached_results:
print("Returning cached results")
return cached_results
print("Fetching fresh content...")
start_time = time.time()
all_articles = []
processed_urls = set()
# Collect articles from all sources
for topic in topics:
print(f"Processing topic: {topic}")
# First, try to get recent articles from database
db_articles = self.database.get_articles_by_topic(topic, limit=10, days_back=1)
topic_articles = list(db_articles)
processed_urls.update(article.url for article in db_articles)
# Then fetch new articles from RSS feeds
rss_sources = self.source_manager.get_sources_for_topic(topic)
for rss_url in rss_sources:
try:
new_articles = self.source_manager.discover_articles_from_rss(rss_url, self.scraper)
# Filter out already processed articles
fresh_articles = [
article for article in new_articles
if article.url not in processed_urls
]
if fresh_articles:
# Process new articles
processed_new = self.process_articles_concurrently(fresh_articles)
topic_articles.extend(processed_new)
processed_urls.update(article.url for article in processed_new)
except Exception as e:
print(f"Error processing RSS feed {rss_url}: {e}")
all_articles.extend(topic_articles)
# Organize results by topic
results = {}
for topic in topics:
topic_articles = [article for article in all_articles if article.topic == topic]
# Sort articles according to user preference
if sort_by == "date":
topic_articles.sort(key=lambda x: x.publication_date or datetime.min, reverse=True)
elif sort_by == "title":
topic_articles.sort(key=lambda x: x.title.lower())
elif sort_by == "source":
topic_articles.sort(key=lambda x: x.source.lower())
results[topic] = topic_articles[:20] # Limit to top 20 per topic
# Cache the results
if use_cache:
self.cache_results(topics, sort_by, results)
processing_time = time.time() - start_time
print(f"Content aggregation completed in {processing_time:.2f} seconds")
return results
def generate_report(self, topics: List[str], sort_by: str = "date",
output_format: str = "text") -> str:
aggregated_news = self.aggregate_news_optimized(topics, sort_by)
if output_format == "text":
return self._format_text_report(aggregated_news, sort_by)
elif output_format == "json":
return self._format_json_report(aggregated_news)
else:
raise ValueError(f"Unsupported output format: {output_format}")
def _format_text_report(self, aggregated_news: Dict[str, List[NewsArticle]],
sort_by: str) -> str:
output = []
output.append("=" * 80)
output.append("INTELLIGENT NEWS AGGREGATION REPORT")
output.append(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
output.append(f"Sorted by: {sort_by}")
output.append("=" * 80)
total_articles = sum(len(articles) for articles in aggregated_news.values())
output.append(f"Total articles processed: {total_articles}")
output.append("")
for topic, articles in aggregated_news.items():
if not articles:
continue
output.append(f"{topic.upper()} NEWS ({len(articles)} articles)")
output.append("-" * 50)
for i, article in enumerate(articles, 1):
output.append(f"\n{i}. {article.title}")
output.append(f" Source: {article.source}")
output.append(f" URL: {article.url}")
if article.publication_date:
output.append(f" Published: {article.publication_date.strftime('%Y-%m-%d %H:%M')}")
if article.summary:
# Wrap summary text for better readability
summary_lines = []
words = article.summary.split()
current_line = " Summary: "
for word in words:
if len(current_line + word) > 75:
summary_lines.append(current_line)
current_line = "      " + word + " "
else:
current_line += word + " "
if current_line.strip():
summary_lines.append(current_line)
output.extend(summary_lines)
if hasattr(article, 'key_entities') and article.key_entities:
output.append(f" Key entities: {', '.join(article.key_entities)}")
if hasattr(article, 'reading_time') and article.reading_time:
output.append(f" Reading time: {article.reading_time} minutes")
output.append("")
# Add trending topics summary
trending = self.database.get_trending_topics(days_back=1)
if trending:
output.append("TRENDING TOPICS (Last 24 hours)")
output.append("-" * 30)
for topic, count in trending[:5]:
output.append(f" {topic}: {count} articles")
return "\n".join(output)
def _format_json_report(self, aggregated_news: Dict[str, List[NewsArticle]]) -> str:
json_data = {
"timestamp": datetime.now().isoformat(),
"topics": {}
}
for topic, articles in aggregated_news.items():
json_data["topics"][topic] = []
for article in articles:
article_data = {
"title": article.title,
"url": article.url,
"source": article.source,
"summary": article.summary,
"publication_date": article.publication_date.isoformat() if article.publication_date else None,
"word_count": getattr(article, 'word_count', 0),
"reading_time": getattr(article, 'reading_time', 0),
"key_entities": getattr(article, 'key_entities', [])
}
json_data["topics"][topic].append(article_data)
return json.dumps(json_data, indent=2, ensure_ascii=False)
# Example usage and main execution
def main():
# Initialize the system with a local LLM processor
print("Initializing LLM-based News Aggregator...")
# Choose between local and remote LLM
use_local_llm = True # Set to False to use OpenAI API
if use_local_llm:
llm_processor = LocalLLMNewsProcessor()
print("Using local LLM processor")
else:
# For remote LLM, you would need to provide your API key
# llm_processor = RemoteLLMNewsProcessor(api_key="your-api-key-here")
print("Remote LLM not configured, using local LLM")
llm_processor = LocalLLMNewsProcessor()
# Initialize the news aggregator
aggregator = PerformantNewsAggregator(llm_processor, max_workers=3)
# Define topics of interest
topics = ["technology", "science", "politics"]
print(f"Aggregating news for topics: {', '.join(topics)}")
print("This may take a few minutes for initial processing...")
try:
# Generate the news report
report = aggregator.generate_report(
topics=topics,
sort_by="date", # Options: "date", "title", "source"
output_format="text" # Options: "text", "json"
)
# Display the report
print(report)
# Optionally save to file
with open(f"news_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt", "w", encoding="utf-8") as f:
f.write(report)
print(f"\nReport saved to file: news_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt")
except Exception as e:
print(f"Error generating news report: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()
Challenges and Solutions in building an LLM-based news aggregator encompass both technical and operational aspects. Content extraction reliability varies significantly across different website architectures, requiring robust fallback mechanisms and continuous adaptation to changing web technologies. Anti-bot measures implemented by news websites can interfere with automated content collection, necessitating respectful scraping practices and potentially requiring human-in-the-loop verification for critical sources.
LLM processing costs and latency can become significant factors when processing large volumes of content. Local LLM deployment addresses cost concerns but introduces infrastructure complexity and maintenance overhead. Hybrid approaches that use local models for basic processing and remote models for complex analysis can provide optimal cost-performance trade-offs.
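A minimal sketch of such a hybrid split, reusing the two processor classes defined earlier, might keep classification on the local model and spend the API budget on user-facing summaries; the division of labor shown here is an assumption about where each model adds the most value.
class HybridLLMNewsProcessor:
    """Route cheap tasks to a local model and quality-sensitive tasks to a remote API."""

    def __init__(self, local_processor, remote_processor):
        self.local = local_processor    # e.g. LocalLLMNewsProcessor()
        self.remote = remote_processor  # e.g. RemoteLLMNewsProcessor(api_key=...)

    def classify_topic(self, article_text: str) -> str:
        # Topic classification tolerates a weaker model, so keep it local and free of API cost.
        return self.local.classify_topic(article_text)

    def summarize_article(self, article_text: str) -> str:
        # Summaries are user-facing, so prefer the remote model and fall back locally on failure.
        summary = self.remote.summarize_article(article_text)
        if summary and summary != "Summary unavailable":
            return summary
        return self.local.summarize_article(article_text)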
Content quality and accuracy present ongoing challenges, as LLM-generated summaries may occasionally introduce inaccuracies or miss important nuances. Implementing confidence scoring and human review processes for critical content categories can help maintain quality standards while preserving automation benefits.
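One lightweight way to approximate confidence scoring, offered purely as an illustrative heuristic rather than a substitute for proper evaluation, is to flag summaries that are suspiciously short or share too little vocabulary with the source article and queue them for human review.
def needs_human_review(article_text: str, summary: str, min_overlap: float = 0.3) -> bool:
    """Flag a summary for review if it is very short or overlaps too little with the article."""
    if len(summary.split()) < 10:
        return True
    article_words = {w.lower() for w in article_text.split() if len(w) > 4}
    summary_words = {w.lower() for w in summary.split() if len(w) > 4}
    if not summary_words:
        return True
    overlap = len(summary_words & article_words) / len(summary_words)
    return overlap < min_overlap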
Future Enhancements for LLM-based news aggregators include integration with real-time data streams, personalized content recommendation engines, and advanced sentiment analysis capabilities. Multi-modal content processing could extend the system to handle video and audio news sources, while integration with social media platforms could provide broader coverage of emerging stories and public sentiment.
Advanced analytics capabilities could identify trending topics, track story evolution over time, and provide predictive insights about developing news situations. Integration with fact-checking services and source credibility databases could enhance the reliability and trustworthiness of aggregated content.
The system architecture presented here provides a solid foundation for building sophisticated news aggregation tools that leverage the power of large language models while maintaining practical considerations for deployment and operation. The modular design allows for incremental enhancement and adaptation to changing requirements and technologies.
This comprehensive implementation demonstrates how modern AI capabilities can transform traditional content aggregation approaches, providing users with intelligent, contextual, and efficiently organized access to the vast landscape of digital news content. The combination of automated content discovery, intelligent processing, and structured presentation creates a powerful tool for staying informed in an increasingly complex information environment.