Sunday, September 28, 2025

LLM-BASED NEWS AGGREGATOR: BUILDING AN INTELLIGENT CONTENT CURATION SYSTEM




Introduction and Problem Statement

In today's information-saturated digital landscape, software engineers and technology professionals face an overwhelming challenge: staying current with relevant news across multiple domains while maintaining productivity in their primary work. Traditional news aggregators often present unfiltered streams of content without intelligent categorization or meaningful summarization. This places a significant time burden on users, who must manually sift through numerous articles to extract actionable insights.

An LLM-based news aggregator addresses this challenge by leveraging the natural language understanding capabilities of large language models to automatically discover, categorize, summarize, and present news content in a structured, user-friendly format. The system acts as an intelligent intermediary between raw web content and the end user, applying sophisticated text processing to transform chaotic information streams into organized, digestible knowledge packages.

The core value proposition lies in the system's ability to understand context, extract semantic meaning, and present information in a way that respects the user's time constraints while ensuring comprehensive coverage of their specified interests. Unlike traditional keyword-based aggregators, an LLM-powered system can understand nuanced relationships between topics, identify emerging trends, and provide contextual summaries that highlight the most relevant aspects of each news item.


System Architecture Overview

The architecture of an LLM-based news aggregator consists of several interconnected components that work together to create a seamless content discovery and presentation experience. At its foundation, the system operates on a pipeline architecture where raw web content flows through multiple processing stages before reaching the end user.

The data ingestion layer serves as the entry point for content discovery. This component continuously monitors various news sources, RSS feeds, and web publications to identify new content that matches user-specified topics. The ingestion process must be robust enough to handle different content formats, from traditional news websites to modern single-page applications that load content dynamically.

Following data ingestion, the content processing layer applies LLM-powered analysis to extract meaningful information from raw HTML and text content. This stage involves cleaning the extracted text, removing advertisements and navigation elements, and preparing the content for semantic analysis. The LLM component then performs topic classification, relevance scoring, and summary generation.

The storage and retrieval layer manages the processed content, maintaining indexes for efficient querying and ensuring that duplicate content is properly handled. This component also tracks publication dates, source credibility metrics, and user engagement patterns to inform future content recommendations.

Finally, the presentation layer organizes the processed content according to user preferences, whether by topic, publication date, or alphabetical ordering by title. This layer also handles the generation of the final output format, including URLs, summaries, and metadata presentation.
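
The following sketch illustrates how these layers might be wired together in code. It is a minimal, illustrative skeleton rather than part of the implementation developed later in this article: the stage names and data classes (RawItem, ProcessedItem, run_pipeline) are hypothetical placeholders that simply make the flow from ingestion through presentation explicit.


from dataclasses import dataclass
from typing import Callable, Iterable, List


@dataclass
class RawItem:
    url: str
    html: str


@dataclass
class ProcessedItem:
    url: str
    topic: str
    summary: str


def run_pipeline(
    fetch_raw_items: Callable[[], Iterable[RawItem]],
    process_item: Callable[[RawItem], ProcessedItem],
    store_item: Callable[[ProcessedItem], None],
    render_report: Callable[[List[ProcessedItem]], str],
) -> str:
    """Push raw content through ingestion, processing, storage, and presentation."""
    processed: List[ProcessedItem] = []
    for raw in fetch_raw_items():      # data ingestion layer
        item = process_item(raw)       # LLM-powered content processing layer
        store_item(item)               # storage and retrieval layer
        processed.append(item)
    return render_report(processed)    # presentation layer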


Core Components Deep Dive

LLM Integration represents the heart of the system's intelligence. The choice between local and remote LLM deployment significantly impacts both performance characteristics and operational considerations. Local LLM deployment offers several advantages including reduced latency, enhanced privacy control, and independence from external service availability. However, local deployment requires substantial computational resources and ongoing model management overhead.

When implementing local LLM integration, software engineers must consider hardware requirements, model selection, and optimization strategies. Modern transformer-based models require significant GPU memory; even mid-sized open models in the 7-13 billion parameter range typically need 16GB or more of VRAM for unquantized inference. The following code example demonstrates a basic local LLM integration using a popular open-source framework:

This code example shows how to initialize a local language model for news processing tasks. The AutoTokenizer and AutoModelForCausalLM classes from the transformers library provide a standardized interface for loading pre-trained models. The model chosen here, DialoGPT-medium, is a lightweight placeholder that keeps resource requirements modest; an instruction-tuned model would produce noticeably better summaries in practice. The torch.cuda.is_available() check ensures that GPU acceleration is used when available, falling back to CPU processing when necessary. The model.eval() call switches the model to evaluation mode, which disables dropout and other training-time behaviors during inference.


import torch

from transformers import AutoTokenizer, AutoModelForCausalLM

import requests

from bs4 import BeautifulSoup

import json

from datetime import datetime


class LocalLLMNewsProcessor:

    def __init__(self, model_name="microsoft/DialoGPT-medium"):

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

        self.model = AutoModelForCausalLM.from_pretrained(model_name)

        self.model.to(self.device)

        self.model.eval()

        

        if self.tokenizer.pad_token is None:

            self.tokenizer.pad_token = self.tokenizer.eos_token

    

    def summarize_article(self, article_text, max_length=150):

        prompt = f"Summarize the following news article in 2-3 sentences: {article_text[:1000]}"

        

        inputs = self.tokenizer.encode(prompt, return_tensors="pt", 

                                     max_length=512, truncation=True)

        inputs = inputs.to(self.device)

        

        with torch.no_grad():

            outputs = self.model.generate(

                inputs, 

                max_length=inputs.shape[1] + max_length,

                num_return_sequences=1,

                temperature=0.7,

                do_sample=True,

                pad_token_id=self.tokenizer.eos_token_id

            )

        

        summary = self.tokenizer.decode(outputs[0][inputs.shape[1]:], 

                                      skip_special_tokens=True)

        return summary.strip()

    

    def classify_topic(self, article_text):

        topics = ["politics", "technology", "sports", "science", "business", "entertainment"]

        prompt = f"Classify this article into one of these categories: {', '.join(topics)}. Article: {article_text[:500]}"

        

        inputs = self.tokenizer.encode(prompt, return_tensors="pt", 

                                     max_length=512, truncation=True)

        inputs = inputs.to(self.device)

        

        with torch.no_grad():

            outputs = self.model.generate(

                inputs,

                max_length=inputs.shape[1] + 50,

                num_return_sequences=1,

                temperature=0.3,

                do_sample=True,

                pad_token_id=self.tokenizer.eos_token_id

            )

        

        classification = self.tokenizer.decode(outputs[0][inputs.shape[1]:], 

                                             skip_special_tokens=True)

        

        for topic in topics:

            if topic.lower() in classification.lower():

                return topic

        return "general"


Remote LLM integration offers different trade-offs, providing access to more powerful models without local hardware constraints. Services like OpenAI's GPT models, Anthropic's Claude, or Google's Gemini offer state-of-the-art language understanding capabilities through API interfaces. However, remote integration introduces network latency, ongoing operational costs, and potential privacy concerns when processing sensitive content.

The following code example demonstrates remote LLM integration using OpenAI's API, showing how to implement the same functionality with cloud-based language models:

This remote integration example showcases the simplicity of cloud-based LLM services while highlighting important considerations such as API key management, rate limiting, and error handling. The OpenAI client provides a clean interface for interacting with powerful language models, but developers must implement robust retry logic and cost monitoring to ensure reliable operation in production environments.


import openai

import time

from typing import Optional, Dict, Any


class RemoteLLMNewsProcessor:

    def __init__(self, api_key: str, model: str = "gpt-3.5-turbo"):

        self.client = openai.OpenAI(api_key=api_key)

        self.model = model

        self.rate_limit_delay = 1.0  # seconds between requests

        

    def _make_api_call(self, messages: list, max_tokens: int = 150) -> Optional[str]:

        try:

            response = self.client.chat.completions.create(

                model=self.model,

                messages=messages,

                max_tokens=max_tokens,

                temperature=0.7

            )

            time.sleep(self.rate_limit_delay)  # Basic rate limiting

            return response.choices[0].message.content.strip()

        except Exception as e:

            print(f"API call failed: {e}")

            return None

    

    def summarize_article(self, article_text: str) -> str:

        messages = [

            {"role": "system", "content": "You are a news summarization expert. Provide concise, accurate summaries."},

            {"role": "user", "content": f"Summarize this news article in 2-3 sentences: {article_text[:2000]}"}

        ]

        

        summary = self._make_api_call(messages, max_tokens=150)

        return summary if summary else "Summary unavailable"

    

    def classify_topic(self, article_text: str) -> str:

        topics = ["politics", "technology", "sports", "science", "business", "entertainment", "general"]

        

        messages = [

            {"role": "system", "content": f"Classify news articles into these categories: {', '.join(topics)}. Respond with only the category name."},

            {"role": "user", "content": f"Classify this article: {article_text[:1000]}"}

        ]

        

        classification = self._make_api_call(messages, max_tokens=20)

        

        if classification:

            classification_lower = classification.lower().strip()

            for topic in topics:

                if topic in classification_lower:

                    return topic

        

        return "general"


Web Scraping and Data Collection forms the foundation of content acquisition for the news aggregator. Modern web scraping must handle diverse content delivery mechanisms, from traditional server-rendered HTML to complex JavaScript-driven single-page applications. The scraping component must be robust enough to extract clean, readable content while respecting website terms of service and implementing appropriate rate limiting.

The content extraction process involves multiple steps including HTML parsing, content area identification, and text cleaning. Many news websites embed articles within complex layouts that include advertisements, navigation menus, and related content suggestions. Effective content extraction requires sophisticated parsing logic that can identify the main article content while filtering out extraneous elements.

The following comprehensive example demonstrates a robust web scraping implementation that handles various content formats and extraction challenges:

This web scraping implementation demonstrates several important concepts for building a production-ready news aggregator. The NewsArticle dataclass provides a structured representation of extracted content, ensuring consistent data handling throughout the system. The WebScraper class implements multiple extraction strategies, starting with newspaper3k for automatic content extraction and falling back to manual BeautifulSoup parsing when needed.


import requests

from bs4 import BeautifulSoup

from newspaper import Article

from urllib.parse import urljoin, urlparse

import time

from dataclasses import dataclass

from datetime import datetime

from typing import List, Optional

import re


@dataclass

class NewsArticle:

    title: str

    content: str

    url: str

    publication_date: Optional[datetime]

    source: str

    topic: str = "general"

    summary: str = ""


class WebScraper:

    def __init__(self, delay_between_requests: float = 1.0):

        self.session = requests.Session()

        self.session.headers.update({

            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'

        })

        self.delay = delay_between_requests

        

    def extract_article(self, url: str) -> Optional[NewsArticle]:

        try:

            # First attempt using newspaper3k for automatic extraction

            article = Article(url)

            article.download()

            article.parse()

            

            if article.text and len(article.text) > 100:

                return NewsArticle(

                    title=article.title or "No title",

                    content=article.text,

                    url=url,

                    publication_date=article.publish_date,

                    source=urlparse(url).netloc

                )

            

            # Fallback to manual extraction

            return self._manual_extract(url)

            

        except Exception as e:

            print(f"Failed to extract article from {url}: {e}")

            return None

    

    def _manual_extract(self, url: str) -> Optional[NewsArticle]:

        try:

            response = self.session.get(url, timeout=10)

            response.raise_for_status()

            

            soup = BeautifulSoup(response.content, 'html.parser')

            

            # Remove unwanted elements

            for element in soup(['script', 'style', 'nav', 'footer', 'aside']):

                element.decompose()

            

            # Try to find title

            title = self._extract_title(soup)

            

            # Try to find main content

            content = self._extract_content(soup)

            

            if content and len(content) > 100:

                return NewsArticle(

                    title=title,

                    content=content,

                    url=url,

                    publication_date=self._extract_date(soup),

                    source=urlparse(url).netloc

                )

            

            return None

            

        except Exception as e:

            print(f"Manual extraction failed for {url}: {e}")

            return None

    

    def _extract_title(self, soup: BeautifulSoup) -> str:

        # Try multiple title extraction strategies

        title_selectors = [

            'h1.entry-title',

            'h1.post-title',

            'h1.article-title',

            'h1',

            'title'

        ]

        

        for selector in title_selectors:

            element = soup.select_one(selector)

            if element and element.get_text().strip():

                return element.get_text().strip()

        

        return "No title found"

    

    def _extract_content(self, soup: BeautifulSoup) -> str:

        # Try multiple content extraction strategies

        content_selectors = [

            'div.entry-content',

            'div.post-content',

            'div.article-content',

            'div.content',

            'article',

            'main'

        ]

        

        for selector in content_selectors:

            element = soup.select_one(selector)

            if element:

                # Clean up the content

                text = element.get_text(separator=' ', strip=True)

                # Remove extra whitespace

                text = re.sub(r'\s+', ' ', text)

                if len(text) > 100:

                    return text

        

        # Fallback: get all paragraph text

        paragraphs = soup.find_all('p')

        content = ' '.join([p.get_text().strip() for p in paragraphs if p.get_text().strip()])

        return content if len(content) > 100 else ""

    

    def _extract_date(self, soup: BeautifulSoup) -> Optional[datetime]:

        # Try to extract publication date

        date_selectors = [

            'time[datetime]',

            'meta[property="article:published_time"]',

            'meta[name="publish-date"]',

            '.publish-date',

            '.post-date'

        ]

        

        for selector in date_selectors:

            element = soup.select_one(selector)

            if element:

                date_str = element.get('datetime') or element.get('content') or element.get_text()

                if date_str:

                    try:

                        # Try to parse various date formats

                        return datetime.fromisoformat(date_str.replace('Z', '+00:00'))

                    except (ValueError, TypeError):

                        continue

        

        return None


class NewsSourceManager:

    def __init__(self):

        self.sources = {

            'technology': [

                'https://techcrunch.com/feed/',

                'https://www.theverge.com/rss/index.xml',

                'https://feeds.arstechnica.com/arstechnica/index'

            ],

            'science': [

                'https://www.sciencedaily.com/rss/all.xml',

                'https://www.nature.com/nature.rss'

            ],

            'politics': [

                'https://www.politico.com/rss/politicopicks.xml',

                'https://www.reuters.com/rssFeed/politicsNews'

            ]

        }

    

    def get_sources_for_topic(self, topic: str) -> List[str]:

        return self.sources.get(topic.lower(), [])

    

    def discover_articles_from_rss(self, rss_url: str, scraper: WebScraper) -> List[NewsArticle]:

        articles = []

        try:

            import feedparser

            feed = feedparser.parse(rss_url)

            

            for entry in feed.entries[:10]:  # Limit to recent articles

                article = scraper.extract_article(entry.link)

                if article:

                    articles.append(article)

                time.sleep(scraper.delay)

            

        except Exception as e:

            print(f"Failed to process RSS feed {rss_url}: {e}")

        

        return articles


The rate limiting implementation prevents overwhelming target websites and helps maintain good relationships with content providers. The session management with appropriate headers ensures that requests appear legitimate and reduces the likelihood of being blocked by anti-bot measures.
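
Beyond a fixed delay between requests, a production scraper can also consult each site's robots.txt file and enforce a per-domain crawl delay. The following sketch shows one way this courtesy layer might look using Python's standard urllib.robotparser module; the PoliteFetchPolicy class name and its defaults are illustrative assumptions rather than part of the WebScraper shown above.


import time
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser


class PoliteFetchPolicy:
    def __init__(self, user_agent: str = "NewsAggregatorBot", min_delay: float = 2.0):
        self.user_agent = user_agent
        self.min_delay = min_delay
        self._parsers = {}        # cached robots.txt parser per domain
        self._last_fetch = {}     # timestamp of the last request per domain

    def can_fetch(self, url: str) -> bool:
        domain = urlparse(url).netloc
        if domain not in self._parsers:
            parser = RobotFileParser()
            parser.set_url(f"https://{domain}/robots.txt")
            try:
                parser.read()
            except Exception:
                # Design choice for this sketch: treat an unreachable robots.txt as permissive
                parser.allow_all = True
            self._parsers[domain] = parser
        return self._parsers[domain].can_fetch(self.user_agent, url)

    def wait_if_needed(self, url: str) -> None:
        # Enforce a minimum spacing between requests to the same domain
        domain = urlparse(url).netloc
        elapsed = time.time() - self._last_fetch.get(domain, 0.0)
        if elapsed < self.min_delay:
            time.sleep(self.min_delay - elapsed)
        self._last_fetch[domain] = time.time()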

Content Processing and Summarization represents the core intelligence layer where raw extracted content transforms into structured, actionable information. This component leverages the LLM's natural language understanding capabilities to generate concise summaries that capture the essential information while maintaining readability and context.

The summarization process must balance comprehensiveness with brevity, ensuring that key facts, implications, and context remain intact while eliminating redundant or tangential information. Advanced summarization techniques can also extract key entities, sentiment, and relevance scores to provide additional metadata for content organization.
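
One way to obtain this additional metadata without extra model calls is to request a structured response in a single prompt. The sketch below assumes an OpenAI-style chat client such as the one used by the RemoteLLMNewsProcessor example earlier; the prompt wording, field names, and fallback behavior are illustrative assumptions, and real model output will not always be valid JSON.


import json


def summarize_with_metadata(client, model: str, article_text: str) -> dict:
    """Ask the model for a summary plus sentiment, relevance, and entities in one call."""
    messages = [
        {"role": "system",
         "content": ("Summarize the article in 2-3 sentences and return JSON with "
                     "keys: summary, sentiment (positive/neutral/negative), "
                     "relevance (0-10), key_entities (list of strings).")},
        {"role": "user", "content": article_text[:2000]},
    ]
    response = client.chat.completions.create(
        model=model, messages=messages, max_tokens=300, temperature=0.3
    )
    raw = response.choices[0].message.content
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        # Fall back to treating the whole reply as the summary when parsing fails
        return {"summary": raw.strip(), "sentiment": "neutral",
                "relevance": 5, "key_entities": []}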

Topic Classification and Organization enables the system to automatically categorize content according to user-specified interests. This functionality goes beyond simple keyword matching to understand semantic relationships and contextual relevance. The classification system must handle edge cases where articles span multiple topics or discuss emerging subjects that may not fit neatly into predefined categories.

The following example demonstrates an integrated content processing pipeline that combines summarization and classification:

This integrated processing pipeline demonstrates how the various components work together to transform raw article content into structured, categorized information. The ContentProcessor class serves as an orchestrator, coordinating between the LLM processor and additional analysis tools. The process_article method shows the complete workflow from raw content to final structured output.


from typing import Dict, List, Tuple

import re

from collections import Counter

import nltk

from nltk.corpus import stopwords

from nltk.tokenize import word_tokenize, sent_tokenize

# The tokenizers and stopword list used below require NLTK data files; fetch them once if missing
nltk.download('stopwords', quiet=True)
nltk.download('punkt', quiet=True)


class ContentProcessor:

    def __init__(self, llm_processor):

        self.llm_processor = llm_processor

        self.stop_words = set(stopwords.words('english'))

        

        # Topic keywords for fallback classification

        self.topic_keywords = {

            'technology': ['software', 'ai', 'artificial intelligence', 'computer', 'tech', 'digital', 'app', 'startup'],

            'politics': ['government', 'election', 'policy', 'congress', 'senate', 'president', 'political'],

            'science': ['research', 'study', 'discovery', 'experiment', 'scientific', 'laboratory'],

            'sports': ['game', 'team', 'player', 'championship', 'league', 'tournament', 'score'],

            'business': ['company', 'market', 'stock', 'economy', 'financial', 'revenue', 'profit'],

            'entertainment': ['movie', 'music', 'celebrity', 'film', 'actor', 'entertainment', 'show']

        }

    

    def process_article(self, article: NewsArticle) -> NewsArticle:

        # Generate summary

        article.summary = self.llm_processor.summarize_article(article.content)

        

        # Classify topic

        article.topic = self.classify_article_topic(article)

        

        # Extract additional metadata

        article.word_count = len(article.content.split())

        article.reading_time = max(1, article.word_count // 200)  # Approximate reading time

        article.key_entities = self.extract_key_entities(article.content)

        

        return article

    

    def classify_article_topic(self, article: NewsArticle) -> str:

        # Try LLM classification first

        llm_topic = self.llm_processor.classify_topic(article.content)

        

        if llm_topic and llm_topic != "general":

            return llm_topic

        

        # Fallback to keyword-based classification

        return self._keyword_based_classification(article.content)

    

    def _keyword_based_classification(self, content: str) -> str:

        content_lower = content.lower()

        topic_scores = {}

        

        for topic, keywords in self.topic_keywords.items():

            score = sum(content_lower.count(keyword) for keyword in keywords)

            topic_scores[topic] = score

        

        if topic_scores:

            best_topic = max(topic_scores, key=topic_scores.get)

            if topic_scores[best_topic] > 0:

                return best_topic

        

        return "general"

    

    def extract_key_entities(self, content: str) -> List[str]:

        # Simple entity extraction using frequency analysis

        words = word_tokenize(content.lower())

        words = [word for word in words if word.isalpha() and word not in self.stop_words]

        

        # Find most frequent words that might be entities

        word_freq = Counter(words)

        common_words = [word for word, freq in word_freq.most_common(10) if freq > 1]

        

        # Filter for likely entities (capitalized words in original text)

        entities = []

        sentences = sent_tokenize(content)

        for sentence in sentences:

            words_in_sentence = sentence.split()

            for word in words_in_sentence:

                clean_word = re.sub(r'[^\w]', '', word)

                if (clean_word.lower() in common_words and 

                    word[0].isupper() and 

                    len(clean_word) > 3 and

                    clean_word not in entities):

                    entities.append(clean_word)

        

        return entities[:5]  # Return top 5 entities


class NewsAggregator:

    def __init__(self, llm_processor, max_articles_per_topic: int = 20):

        self.content_processor = ContentProcessor(llm_processor)

        self.scraper = WebScraper()

        self.source_manager = NewsSourceManager()

        self.max_articles_per_topic = max_articles_per_topic

        self.processed_articles = {}

    

    def aggregate_news(self, topics: List[str]) -> Dict[str, List[NewsArticle]]:

        results = {}

        

        for topic in topics:

            print(f"Processing topic: {topic}")

            articles = []

            

            # Get RSS sources for this topic

            rss_sources = self.source_manager.get_sources_for_topic(topic)

            

            for rss_url in rss_sources:

                print(f"  Processing RSS feed: {rss_url}")

                topic_articles = self.source_manager.discover_articles_from_rss(rss_url, self.scraper)

                

                for article in topic_articles:

                    if len(articles) >= self.max_articles_per_topic:

                        break

                    

                    # Process the article

                    processed_article = self.content_processor.process_article(article)

                    articles.append(processed_article)

                

                if len(articles) >= self.max_articles_per_topic:

                    break

            

            results[topic] = articles

            print(f"  Found {len(articles)} articles for {topic}")

        

        return results

    

    def sort_articles(self, articles: List[NewsArticle], sort_by: str = "date") -> List[NewsArticle]:

        if sort_by == "date":

            return sorted(articles, 

                         key=lambda x: x.publication_date or datetime.min, 

                         reverse=True)

        elif sort_by == "title":

            return sorted(articles, key=lambda x: x.title.lower())

        elif sort_by == "source":

            return sorted(articles, key=lambda x: x.source.lower())

        else:

            return articles

    

    def format_output(self, aggregated_news: Dict[str, List[NewsArticle]], sort_by: str = "date") -> str:

        output = []

        output.append("=" * 80)

        output.append("NEWS AGGREGATION REPORT")

        output.append(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        output.append("=" * 80)

        

        for topic, articles in aggregated_news.items():

            if not articles:

                continue

                

            output.append(f"\n{topic.upper()} NEWS")

            output.append("-" * 40)

            

            sorted_articles = self.sort_articles(articles, sort_by)

            

            for i, article in enumerate(sorted_articles, 1):

                output.append(f"\n{i}. {article.title}")

                output.append(f"   Source: {article.source}")

                output.append(f"   URL: {article.url}")

                if article.publication_date:

                    output.append(f"   Published: {article.publication_date.strftime('%Y-%m-%d %H:%M')}")

                output.append(f"   Summary: {article.summary}")

                if hasattr(article, 'key_entities') and article.key_entities:

                    output.append(f"   Key entities: {', '.join(article.key_entities)}")

                if hasattr(article, 'reading_time'):

                    output.append(f"   Reading time: {article.reading_time} minutes")

        

        return "\n".join(output)


Tool Integration and Search Functionality extends the system's capabilities beyond basic content aggregation. Modern LLM frameworks often provide access to external tools and search APIs that can enhance content discovery and analysis. Integration with search engines, social media APIs, and specialized news databases can provide broader coverage and real-time updates.

The tool integration layer must handle API rate limits, authentication, and error recovery gracefully. Different tools may have varying response formats and reliability characteristics, requiring robust adapter patterns to maintain consistent internal data representations.
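
The following sketch illustrates the adapter pattern mentioned above. The two provider payload shapes are invented for illustration; the point is that each adapter translates its tool's response format into one shared internal SearchResult representation that the rest of the system can rely on.


from abc import ABC, abstractmethod
from typing import Dict, List


class SearchResult:
    def __init__(self, title: str, url: str, snippet: str):
        self.title = title
        self.url = url
        self.snippet = snippet


class SearchAdapter(ABC):
    @abstractmethod
    def to_results(self, raw_response: Dict) -> List[SearchResult]:
        ...


class ProviderAAdapter(SearchAdapter):
    def to_results(self, raw_response: Dict) -> List[SearchResult]:
        # Hypothetical provider A nests results under an "items" key
        return [SearchResult(item["title"], item["link"], item.get("snippet", ""))
                for item in raw_response.get("items", [])]


class ProviderBAdapter(SearchAdapter):
    def to_results(self, raw_response: Dict) -> List[SearchResult]:
        # Hypothetical provider B uses a flat "results" list with different field names
        return [SearchResult(r["headline"], r["url"], r.get("summary", ""))
                for r in raw_response.get("results", [])]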

Search functionality can operate at multiple levels, from initial content discovery to user-initiated queries within the aggregated content. Advanced search implementations can leverage semantic similarity matching to find related articles or identify trending topics across multiple sources.
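
A semantic search layer over already-aggregated articles might look like the following sketch. It assumes the third-party sentence-transformers package and its small all-MiniLM-L6-v2 embedding model; any embedding model exposing an encode() method could be substituted, and the in-memory index shown here is an illustration rather than a production vector store.


import numpy as np
from sentence_transformers import SentenceTransformer


class SemanticArticleIndex:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)
        self.articles = []          # list of NewsArticle-like objects
        self.embeddings = None      # matrix of L2-normalized embeddings

    def build(self, articles):
        self.articles = articles
        texts = [f"{a.title}. {a.summary or a.content[:500]}" for a in articles]
        vectors = self.model.encode(texts, convert_to_numpy=True)
        # Normalize so that a dot product equals cosine similarity
        self.embeddings = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)

    def query(self, text: str, top_k: int = 5):
        vector = self.model.encode([text], convert_to_numpy=True)[0]
        vector = vector / np.linalg.norm(vector)
        scores = self.embeddings @ vector
        best = np.argsort(scores)[::-1][:top_k]
        return [(self.articles[i], float(scores[i])) for i in best]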

Data Storage and Retrieval considerations become critical as the system scales to handle larger volumes of content and user requests. The storage layer must efficiently handle both structured metadata and unstructured content while supporting complex queries for content organization and retrieval.

Database design should accommodate the temporal nature of news content, where recent articles are accessed frequently while older content may be archived or purged. Indexing strategies must balance query performance with storage efficiency, particularly for full-text search capabilities.

The following example demonstrates a comprehensive storage and retrieval system:

This storage implementation showcases several important database design patterns for news aggregation systems. The SQLite database provides a lightweight, embedded solution suitable for development and smaller deployments, while the schema design supports efficient querying and content organization. The full-text search capabilities enable users to find relevant articles based on content similarity rather than just metadata matching.


import sqlite3

from typing import List, Optional, Dict, Any, Tuple

import json

from datetime import datetime, timedelta

import hashlib


class NewsDatabase:

    def __init__(self, db_path: str = "news_aggregator.db"):

        self.db_path = db_path

        self.init_database()

    

    def init_database(self):

        conn = sqlite3.connect(self.db_path)

        cursor = conn.cursor()

        

        # Create articles table

        cursor.execute('''

            CREATE TABLE IF NOT EXISTS articles (

                id INTEGER PRIMARY KEY AUTOINCREMENT,

                url_hash TEXT UNIQUE NOT NULL,

                title TEXT NOT NULL,

                content TEXT NOT NULL,

                summary TEXT,

                url TEXT NOT NULL,

                source TEXT NOT NULL,

                topic TEXT NOT NULL,

                publication_date DATETIME,

                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,

                word_count INTEGER,

                reading_time INTEGER,

                key_entities TEXT,

                processed BOOLEAN DEFAULT FALSE

            )

        ''')

        

        # Create full-text search table

        cursor.execute('''

            CREATE VIRTUAL TABLE IF NOT EXISTS articles_fts USING fts5(

                title, content, summary, key_entities,

                content='articles', content_rowid='id'

            )

        ''')

        

        # Create indexes for common queries

        cursor.execute('CREATE INDEX IF NOT EXISTS idx_topic ON articles(topic)')

        cursor.execute('CREATE INDEX IF NOT EXISTS idx_publication_date ON articles(publication_date)')

        cursor.execute('CREATE INDEX IF NOT EXISTS idx_source ON articles(source)')

        cursor.execute('CREATE INDEX IF NOT EXISTS idx_created_at ON articles(created_at)')

        

        conn.commit()

        conn.close()

    

    def _generate_url_hash(self, url: str) -> str:

        return hashlib.md5(url.encode()).hexdigest()

    

    def store_article(self, article: NewsArticle) -> bool:

        conn = sqlite3.connect(self.db_path)

        cursor = conn.cursor()

        

        try:

            url_hash = self._generate_url_hash(article.url)

            key_entities_json = json.dumps(getattr(article, 'key_entities', []))

            

            cursor.execute('''

                INSERT OR REPLACE INTO articles 

                (url_hash, title, content, summary, url, source, topic, 

                 publication_date, word_count, reading_time, key_entities, processed)

                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)

            ''', (

                url_hash, article.title, article.content, article.summary,

                article.url, article.source, article.topic,

                article.publication_date, getattr(article, 'word_count', 0),

                getattr(article, 'reading_time', 0), key_entities_json, True

            ))

            

            # Update FTS index

            cursor.execute('''

                INSERT OR REPLACE INTO articles_fts 

                (rowid, title, content, summary, key_entities)

                SELECT id, title, content, summary, key_entities 

                FROM articles WHERE url_hash = ?

            ''', (url_hash,))

            

            conn.commit()

            return True

            

        except sqlite3.Error as e:

            print(f"Database error storing article: {e}")

            return False

        finally:

            conn.close()

    

    def get_articles_by_topic(self, topic: str, limit: int = 50, 

                             days_back: int = 7) -> List[NewsArticle]:

        conn = sqlite3.connect(self.db_path)

        cursor = conn.cursor()

        

        cutoff_date = datetime.now() - timedelta(days=days_back)

        

        cursor.execute('''

            SELECT title, content, summary, url, source, topic, 

                   publication_date, word_count, reading_time, key_entities

            FROM articles 

            WHERE topic = ? AND created_at >= ?

            ORDER BY publication_date DESC, created_at DESC

            LIMIT ?

        ''', (topic, cutoff_date, limit))

        

        articles = []

        for row in cursor.fetchall():

            article = NewsArticle(

                title=row[0],

                content=row[1],

                url=row[3],

                publication_date=datetime.fromisoformat(row[6]) if row[6] else None,

                source=row[4],

                topic=row[5],

                summary=row[2]

            )

            article.word_count = row[7] or 0

            article.reading_time = row[8] or 0

            article.key_entities = json.loads(row[9]) if row[9] else []

            articles.append(article)

        

        conn.close()

        return articles

    

    def search_articles(self, query: str, limit: int = 20) -> List[NewsArticle]:

        conn = sqlite3.connect(self.db_path)

        cursor = conn.cursor()

        

        # Use FTS for content search

        cursor.execute('''

            SELECT a.title, a.content, a.summary, a.url, a.source, a.topic,

                   a.publication_date, a.word_count, a.reading_time, a.key_entities

            FROM articles_fts fts

            JOIN articles a ON a.id = fts.rowid

            WHERE articles_fts MATCH ?

            ORDER BY rank

            LIMIT ?

        ''', (query, limit))

        

        articles = []

        for row in cursor.fetchall():

            article = NewsArticle(

                title=row[0],

                content=row[1],

                url=row[3],

                publication_date=datetime.fromisoformat(row[6]) if row[6] else None,

                source=row[4],

                topic=row[5],

                summary=row[2]

            )

            article.word_count = row[7] or 0

            article.reading_time = row[8] or 0

            article.key_entities = json.loads(row[9]) if row[9] else []

            articles.append(article)

        

        conn.close()

        return articles

    

    def get_trending_topics(self, days_back: int = 1) -> List[Tuple[str, int]]:

        conn = sqlite3.connect(self.db_path)

        cursor = conn.cursor()

        

        cutoff_date = datetime.now() - timedelta(days=days_back)

        

        cursor.execute('''

            SELECT topic, COUNT(*) as article_count

            FROM articles

            WHERE created_at >= ?

            GROUP BY topic

            ORDER BY article_count DESC

        ''', (cutoff_date,))

        

        trends = cursor.fetchall()

        conn.close()

        return trends

    

    def cleanup_old_articles(self, days_to_keep: int = 30):

        conn = sqlite3.connect(self.db_path)

        cursor = conn.cursor()

        

        cutoff_date = datetime.now() - timedelta(days=days_to_keep)

        

        cursor.execute('DELETE FROM articles WHERE created_at < ?', (cutoff_date,))

        # Rebuild the external-content FTS index so it stays consistent after the deletions above
        cursor.execute("INSERT INTO articles_fts(articles_fts) VALUES('rebuild')")

        

        conn.commit()

        conn.close()

        

        print(f"Cleaned up articles older than {days_to_keep} days")


User Interface and Experience considerations focus on presenting the aggregated content in a format that maximizes usability while minimizing cognitive load. The interface must support different consumption patterns, from quick headline scanning to detailed article reading, while providing efficient navigation between topics and time periods.

Command-line interfaces offer simplicity and scriptability for technical users, while web-based interfaces can provide richer interaction models including real-time updates and personalized content recommendations. The choice of interface technology should align with the target user base and deployment environment.
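
For a command-line deployment, a thin argparse wrapper around the aggregator classes defined earlier in this article is often sufficient. The sketch below is illustrative: the flag names are assumptions, and it reuses the NewsAggregator and LocalLLMNewsProcessor classes from the previous examples.


import argparse


def build_cli_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="LLM-based news aggregator")
    parser.add_argument("--topics", nargs="+", default=["technology"],
                        help="Topics to aggregate, e.g. technology science politics")
    parser.add_argument("--sort-by", choices=["date", "title", "source"],
                        default="date", help="Ordering of articles within each topic")
    return parser


def run_cli() -> None:
    args = build_cli_parser().parse_args()
    # Reuses the aggregator and local LLM processor defined earlier in this article
    aggregator = NewsAggregator(LocalLLMNewsProcessor())
    news = aggregator.aggregate_news(args.topics)
    print(aggregator.format_output(news, sort_by=args.sort_by))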

Performance Considerations become increasingly important as the system scales to handle more sources, topics, and users. Content processing pipelines must be designed to handle concurrent operations efficiently, while storage systems need to support high-throughput read and write operations.

Caching strategies can significantly improve response times for frequently accessed content, while background processing can ensure that new content is available without blocking user interactions. Load balancing and horizontal scaling considerations become relevant for multi-user deployments.

The following example demonstrates a complete system integration with performance optimizations:

This complete system integration demonstrates how all components work together to create a functional news aggregator. The performance optimizations include connection pooling, concurrent processing, and intelligent caching to ensure responsive operation even with large content volumes. The main execution flow shows how a user would interact with the system to get categorized, summarized news content.



from concurrent.futures import ThreadPoolExecutor, as_completed

import threading

from typing import Dict, List, Optional, Set

import time


class PerformantNewsAggregator:

    def __init__(self, llm_processor, max_workers: int = 5):

        self.llm_processor = llm_processor

        self.database = NewsDatabase()

        self.content_processor = ContentProcessor(llm_processor)

        self.scraper = WebScraper()

        self.source_manager = NewsSourceManager()

        self.max_workers = max_workers

        self.cache = {}

        self.cache_lock = threading.Lock()

        self.cache_ttl = 3600  # 1 hour cache TTL

        

    def _get_cache_key(self, topics: List[str], sort_by: str) -> str:

        return f"{'_'.join(sorted(topics))}_{sort_by}"

    

    def _is_cache_valid(self, cache_entry: Dict) -> bool:

        return time.time() - cache_entry['timestamp'] < self.cache_ttl

    

    def get_cached_results(self, topics: List[str], sort_by: str) -> Optional[Dict]:

        cache_key = self._get_cache_key(topics, sort_by)

        with self.cache_lock:

            if cache_key in self.cache and self._is_cache_valid(self.cache[cache_key]):

                return self.cache[cache_key]['data']

        return None

    

    def cache_results(self, topics: List[str], sort_by: str, results: Dict):

        cache_key = self._get_cache_key(topics, sort_by)

        with self.cache_lock:

            self.cache[cache_key] = {

                'data': results,

                'timestamp': time.time()

            }

    

    def process_articles_concurrently(self, articles: List[NewsArticle]) -> List[NewsArticle]:

        processed_articles = []

        

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:

            # Submit processing tasks

            future_to_article = {

                executor.submit(self.content_processor.process_article, article): article 

                for article in articles

            }

            

            # Collect results as they complete

            for future in as_completed(future_to_article):

                try:

                    processed_article = future.result(timeout=30)  # 30 second timeout

                    processed_articles.append(processed_article)

                    

                    # Store in database immediately

                    self.database.store_article(processed_article)

                    

                except Exception as e:

                    original_article = future_to_article[future]

                    print(f"Failed to process article {original_article.url}: {e}")

        

        return processed_articles

    

    def aggregate_news_optimized(self, topics: List[str], sort_by: str = "date", 

                                use_cache: bool = True) -> Dict[str, List[NewsArticle]]:

        # Check cache first

        if use_cache:

            cached_results = self.get_cached_results(topics, sort_by)

            if cached_results:

                print("Returning cached results")

                return cached_results

        

        print("Fetching fresh content...")

        start_time = time.time()

        

        all_articles = []

        processed_urls = set()

        

        # Collect articles from all sources

        for topic in topics:

            print(f"Processing topic: {topic}")

            

            # First, try to get recent articles from database

            db_articles = self.database.get_articles_by_topic(topic, limit=10, days_back=1)

            topic_articles = list(db_articles)

            processed_urls.update(article.url for article in db_articles)

            

            # Then fetch new articles from RSS feeds

            rss_sources = self.source_manager.get_sources_for_topic(topic)

            

            for rss_url in rss_sources:

                try:

                    new_articles = self.source_manager.discover_articles_from_rss(rss_url, self.scraper)

                    

                    # Filter out already processed articles

                    fresh_articles = [

                        article for article in new_articles 

                        if article.url not in processed_urls

                    ]

                    

                    if fresh_articles:

                        # Process new articles

                        processed_new = self.process_articles_concurrently(fresh_articles)

                        topic_articles.extend(processed_new)

                        processed_urls.update(article.url for article in processed_new)

                    

                except Exception as e:

                    print(f"Error processing RSS feed {rss_url}: {e}")

            

            all_articles.extend(topic_articles)

        

        # Organize results by topic

        results = {}

        for topic in topics:

            topic_articles = [article for article in all_articles if article.topic == topic]

            

            # Sort articles according to user preference

            if sort_by == "date":

                topic_articles.sort(key=lambda x: x.publication_date or datetime.min, reverse=True)

            elif sort_by == "title":

                topic_articles.sort(key=lambda x: x.title.lower())

            elif sort_by == "source":

                topic_articles.sort(key=lambda x: x.source.lower())

            

            results[topic] = topic_articles[:20]  # Limit to top 20 per topic

        

        # Cache the results

        if use_cache:

            self.cache_results(topics, sort_by, results)

        

        processing_time = time.time() - start_time

        print(f"Content aggregation completed in {processing_time:.2f} seconds")

        

        return results

    

    def generate_report(self, topics: List[str], sort_by: str = "date", 

                       output_format: str = "text") -> str:

        aggregated_news = self.aggregate_news_optimized(topics, sort_by)

        

        if output_format == "text":

            return self._format_text_report(aggregated_news, sort_by)

        elif output_format == "json":

            return self._format_json_report(aggregated_news)

        else:

            raise ValueError(f"Unsupported output format: {output_format}")

    

    def _format_text_report(self, aggregated_news: Dict[str, List[NewsArticle]], 

                           sort_by: str) -> str:

        output = []

        output.append("=" * 80)

        output.append("INTELLIGENT NEWS AGGREGATION REPORT")

        output.append(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        output.append(f"Sorted by: {sort_by}")

        output.append("=" * 80)

        

        total_articles = sum(len(articles) for articles in aggregated_news.values())

        output.append(f"Total articles processed: {total_articles}")

        output.append("")

        

        for topic, articles in aggregated_news.items():

            if not articles:

                continue

                

            output.append(f"{topic.upper()} NEWS ({len(articles)} articles)")

            output.append("-" * 50)

            

            for i, article in enumerate(articles, 1):

                output.append(f"\n{i}. {article.title}")

                output.append(f"   Source: {article.source}")

                output.append(f"   URL: {article.url}")

                

                if article.publication_date:

                    output.append(f"   Published: {article.publication_date.strftime('%Y-%m-%d %H:%M')}")

                

                if article.summary:

                    # Wrap summary text for better readability

                    summary_lines = []

                    words = article.summary.split()

                    current_line = "   Summary: "

                    

                    for word in words:

                        if len(current_line + word) > 75:

                            summary_lines.append(current_line)

                            current_line = "            " + word

                        else:

                            current_line += word + " "

                    

                    if current_line.strip():

                        summary_lines.append(current_line)

                    

                    output.extend(summary_lines)

                

                if hasattr(article, 'key_entities') and article.key_entities:

                    output.append(f"   Key entities: {', '.join(article.key_entities)}")

                

                if hasattr(article, 'reading_time') and article.reading_time:

                    output.append(f"   Reading time: {article.reading_time} minutes")

            

            output.append("")

        

        # Add trending topics summary

        trending = self.database.get_trending_topics(days_back=1)

        if trending:

            output.append("TRENDING TOPICS (Last 24 hours)")

            output.append("-" * 30)

            for topic, count in trending[:5]:

                output.append(f"  {topic}: {count} articles")

        

        return "\n".join(output)

    

    def _format_json_report(self, aggregated_news: Dict[str, List[NewsArticle]]) -> str:

        json_data = {

            "timestamp": datetime.now().isoformat(),

            "topics": {}

        }

        

        for topic, articles in aggregated_news.items():

            json_data["topics"][topic] = []

            

            for article in articles:

                article_data = {

                    "title": article.title,

                    "url": article.url,

                    "source": article.source,

                    "summary": article.summary,

                    "publication_date": article.publication_date.isoformat() if article.publication_date else None,

                    "word_count": getattr(article, 'word_count', 0),

                    "reading_time": getattr(article, 'reading_time', 0),

                    "key_entities": getattr(article, 'key_entities', [])

                }

                json_data["topics"][topic].append(article_data)

        

        return json.dumps(json_data, indent=2, ensure_ascii=False)


# Example usage and main execution

def main():

    # Initialize the system with a local LLM processor

    print("Initializing LLM-based News Aggregator...")

    

    # Choose between local and remote LLM

    use_local_llm = True  # Set to False to use OpenAI API

    

    if use_local_llm:

        llm_processor = LocalLLMNewsProcessor()

        print("Using local LLM processor")

    else:

        # For remote LLM, you would need to provide your API key

        # llm_processor = RemoteLLMNewsProcessor(api_key="your-api-key-here")

        print("Remote LLM not configured, using local LLM")

        llm_processor = LocalLLMNewsProcessor()

    

    # Initialize the news aggregator

    aggregator = PerformantNewsAggregator(llm_processor, max_workers=3)

    

    # Define topics of interest

    topics = ["technology", "science", "politics"]

    

    print(f"Aggregating news for topics: {', '.join(topics)}")

    print("This may take a few minutes for initial processing...")

    

    try:

        # Generate the news report

        report = aggregator.generate_report(

            topics=topics,

            sort_by="date",  # Options: "date", "title", "source"

            output_format="text"  # Options: "text", "json"

        )

        

        # Display the report

        print(report)

        

        # Optionally save to file

        with open(f"news_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt", "w", encoding="utf-8") as f:

            f.write(report)

        

        print(f"\nReport saved to file: news_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt")

        

    except Exception as e:

        print(f"Error generating news report: {e}")

        import traceback

        traceback.print_exc()


if __name__ == "__main__":

    main()


Challenges and Solutions in building an LLM-based news aggregator encompass both technical and operational aspects. Content extraction reliability varies significantly across different website architectures, requiring robust fallback mechanisms and continuous adaptation to changing web technologies. Anti-bot measures implemented by news websites can interfere with automated content collection, necessitating respectful scraping practices and potentially requiring human-in-the-loop verification for critical sources.

LLM processing costs and latency can become significant factors when processing large volumes of content. Local LLM deployment addresses cost concerns but introduces infrastructure complexity and maintenance overhead. Hybrid approaches that use local models for basic processing and remote models for complex analysis can provide optimal cost-performance trade-offs.
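
A hybrid router can be sketched as a thin wrapper that sends routine items to the local model and escalates longer or higher-priority articles to the remote service. The example below reuses the LocalLLMNewsProcessor and RemoteLLMNewsProcessor classes from earlier sections; the 3,000-character threshold and the escalation rules are arbitrary illustrative heuristics, not tuned values.


class HybridLLMNewsProcessor:
    def __init__(self, local_processor, remote_processor, complexity_threshold: int = 3000):
        self.local = local_processor
        self.remote = remote_processor
        self.threshold = complexity_threshold

    def _is_complex(self, article_text: str, priority: bool) -> bool:
        # Simple heuristic: long articles or explicitly prioritized ones go remote
        return priority or len(article_text) > self.threshold

    def summarize_article(self, article_text: str, priority: bool = False) -> str:
        processor = self.remote if self._is_complex(article_text, priority) else self.local
        return processor.summarize_article(article_text)

    def classify_topic(self, article_text: str) -> str:
        # Classification stays local; escalate only when the local result is inconclusive
        topic = self.local.classify_topic(article_text)
        if topic == "general":
            topic = self.remote.classify_topic(article_text)
        return topic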

Content quality and accuracy present ongoing challenges, as LLM-generated summaries may occasionally introduce inaccuracies or miss important nuances. Implementing confidence scoring and human review processes for critical content categories can help maintain quality standards while preserving automation benefits.

Future Enhancements for LLM-based news aggregators include integration with real-time data streams, personalized content recommendation engines, and advanced sentiment analysis capabilities. Multi-modal content processing could extend the system to handle video and audio news sources, while integration with social media platforms could provide broader coverage of emerging stories and public sentiment.

Advanced analytics capabilities could identify trending topics, track story evolution over time, and provide predictive insights about developing news situations. Integration with fact-checking services and source credibility databases could enhance the reliability and trustworthiness of aggregated content.

The system architecture presented here provides a solid foundation for building sophisticated news aggregation tools that leverage the power of large language models while maintaining practical considerations for deployment and operation. The modular design allows for incremental enhancement and adaptation to changing requirements and technologies.

This comprehensive implementation demonstrates how modern AI capabilities can transform traditional content aggregation approaches, providing users with intelligent, contextual, and efficiently organized access to the vast landscape of digital news content. The combination of automated content discovery, intelligent processing, and structured presentation creates a powerful tool for staying informed in an increasingly complex information environment.
